-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathkinesis.go
More file actions
151 lines (116 loc) · 3.46 KB
/
kinesis.go
File metadata and controls
151 lines (116 loc) · 3.46 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
package dlog
import (
"errors"
"fmt"
"sync"
"time"
"github.com/AdRoll/goamz/kinesis"
)
type KinesisInterface interface {
PutRecords(streamName string, records []kinesis.PutRecordsRequestEntry) (resp *kinesis.PutRecordsResponse, err error)
CreateStream(name string, shardCount int) error
DescribeStream(name string) (resp *kinesis.StreamDescription, err error)
DeleteStream(name string) error
}
type kinesisMock struct {
// Mapping from steam name to batches of batches
storage map[string][][]kinesis.PutRecordsRequestEntry
// simulate lantency that sync to Kinesis
putRecordLatency time.Duration
// created streams' names
streamNames []string
// lock to solve concurrent call
lock sync.RWMutex
}
func newKinesisMock(putRecordsLatency time.Duration) *kinesisMock {
return &kinesisMock{
storage: make(map[string][][]kinesis.PutRecordsRequestEntry),
putRecordLatency: putRecordsLatency,
streamNames: make([]string, 0),
}
}
func (mock *kinesisMock) PutRecords(streamName string, records []kinesis.PutRecordsRequestEntry) (resp *kinesis.PutRecordsResponse, err error) {
mock.lock.Lock()
defer mock.lock.Unlock()
if !streamNameRegexp.MatchString(streamName) {
return nil, fmt.Errorf("Invalid stream name %s", streamName)
}
if !mock.find(streamName) {
return nil, fmt.Errorf("Not found stream %s", streamName)
}
if len(records) == 0 {
return nil, errors.New("records length == 0")
}
time.Sleep(mock.putRecordLatency)
mock.storage[streamName] = append(mock.storage[streamName], records)
return &kinesis.PutRecordsResponse{
FailedRecordCount: 0, // Always success.
Records: nil}, nil
}
func (mock *kinesisMock) CreateStream(name string, shardCount int) error {
mock.lock.Lock()
defer mock.lock.Unlock()
if !streamNameRegexp.MatchString(name) {
return fmt.Errorf("Invalid stream name %s", name)
}
if mock.find(name) {
return fmt.Errorf("Stream already exists %s", name)
}
mock.streamNames = append(mock.streamNames, name)
return nil
}
func (mock *kinesisMock) DescribeStream(name string) (resp *kinesis.StreamDescription, err error) {
mock.lock.RLock()
defer mock.lock.RUnlock()
if !streamNameRegexp.MatchString(name) {
return nil, fmt.Errorf("Invalid stream name %s", name)
}
if !mock.find(name) {
return nil, fmt.Errorf("Not found stream %s", name)
}
resp = &kinesis.StreamDescription{
StreamName: name,
StreamStatus: "Active",
}
return resp, nil
}
func (mock *kinesisMock) DeleteStream(name string) error {
mock.lock.Lock()
defer mock.lock.Unlock()
if !streamNameRegexp.MatchString(name) {
return fmt.Errorf("Invalid stream name %s", name)
}
if !mock.find(name) {
return fmt.Errorf("Not found stream %s", name)
}
newStreamNames := make([]string, 0, len(mock.streamNames)-1)
for _, v := range mock.streamNames {
if v != name {
newStreamNames = append(newStreamNames, v)
}
}
mock.streamNames = newStreamNames
return nil
}
func (mock *kinesisMock) find(streamName string) bool {
if len(mock.streamNames) <= 0 {
return false
}
for _, v := range mock.streamNames {
if v == streamName {
return true
}
}
return false
}
type brokenKinesisMock struct {
*kinesisMock
}
func newBrokenKinesisMock() *brokenKinesisMock {
return &brokenKinesisMock{
kinesisMock: newKinesisMock(0),
}
}
func (mock *brokenKinesisMock) PutRecords(streamName string, records []kinesis.PutRecordsRequestEntry) (resp *kinesis.PutRecordsResponse, err error) {
return nil, fmt.Errorf("Kinesis is broken")
}