-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathsequence_test.go
More file actions
297 lines (235 loc) · 8.91 KB
/
sequence_test.go
File metadata and controls
297 lines (235 loc) · 8.91 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
package blobcache
import (
"errors"
"sync"
"sync/atomic"
"testing"
"github.com/miretskiy/blobcache/internal/index"
"github.com/miretskiy/blobcache/internal/xmap"
"github.com/stretchr/testify/require"
"github.com/zeebo/xxh3"
)
// TestMemTable_LifecycleGuard checks the sealed-sequence guard on MemTable.Put:
// once maxSealedSeq is set, a Put whose seqID is less than or equal to it must
// fail with errSequenceTooOld, while a strictly greater seqID is accepted.
func TestMemTable_LifecycleGuard(t *testing.T) {
	cfg := defaultConfig(t.TempDir())
	cfg.WriteBufferSize = 4 << 10 // 4KB

	batcher := &MockBatcher{}
	health := &MockHealthReporter{}
	mt := NewMemTable(cfg, batcher, health, nil, nil, newSegmentIDProvider(cfg.Path, cfg.Shards))
	defer mt.Close()

	// put issues a single write; only seqID, the low key word and the
	// payload strings vary between the cases below.
	put := func(seq, lo uint64, k, v string) error {
		return mt.Put(seq, xmap.Key{Lo: lo, Hi: 0}, []byte(k), []byte(v))
	}

	// Baseline: nothing has been sealed yet, so seqID=100 goes through.
	require.NoError(t, put(100, 1, "key1", "value1"))

	// Pretend a rotation sealed everything up to and including seqID=100.
	mt.mu.Lock()
	mt.mu.maxSealedSeq = 100
	mt.mu.Unlock()

	// Below the sealed watermark: rejected.
	err := put(50, 2, "key2", "value2")
	require.True(t, errors.Is(err, errSequenceTooOld), "expected errSequenceTooOld, got: %v", err)

	// Exactly at the sealed watermark: rejected as well.
	err = put(100, 3, "key3", "value3")
	require.True(t, errors.Is(err, errSequenceTooOld), "expected errSequenceTooOld for equal seqID")

	// Just above the sealed watermark: accepted.
	err = put(101, 4, "key4", "value4")
	require.NoError(t, err)
}
// TestMemTable_ConcurrencyGuard writes the same key three times with seqIDs
// 100, 200 and 150 (in that order) and checks that the active memtable's index
// ends up holding seqID=200, i.e. the late write with the lower seqID did not
// replace the newer one.
func TestMemTable_ConcurrencyGuard(t *testing.T) {
	cfg := defaultConfig(t.TempDir())
	cfg.WriteBufferSize = 64 << 10 // 64KB to avoid rotation

	batcher := &MockBatcher{}
	health := &MockHealthReporter{}
	mt := NewMemTable(cfg, batcher, health, nil, nil, newSegmentIDProvider(cfg.Path, cfg.Shards))
	defer mt.Close()

	key := xmap.Key{Lo: 12345, Hi: 0}
	keyBytes := []byte("test-key")

	// Order matters: older, newer, then an in-between seqID that must lose.
	writes := []struct {
		seq   uint64
		value []byte
	}{
		{seq: 100, value: []byte("old-value")},
		{seq: 200, value: []byte("new-value")},
		{seq: 150, value: []byte("middle-value")},
	}
	for _, w := range writes {
		// Every Put is expected to return nil, including the stale one
		// (the original notes the write itself succeeds: space is reserved).
		require.NoError(t, mt.Put(w.seq, key, keyBytes, w.value))
	}

	// Grab the active memtable under the lock, then inspect its index.
	mt.mu.Lock()
	active := mt.mu.active
	mt.mu.Unlock()

	rec, ok := active.index.Get(key)
	require.True(t, ok)
	require.Equal(t, uint64(200), rec.SeqID, "index should have the newest seqID")
}
// TestMemTable_ConcurrentWritesSameKey launches numWriters goroutines that all
// Put the same key, each with a distinct seqID in 1..numWriters, and checks
// that the active memtable's index ends up with the highest seqID.
//
// Put errors are recorded per writer and asserted on the test goroutine after
// wg.Wait(): require.* calls t.FailNow, which the testing package only allows
// from the goroutine running the test function.
func TestMemTable_ConcurrentWritesSameKey(t *testing.T) {
	cfg := defaultConfig(t.TempDir())
	cfg.WriteBufferSize = 1 << 20 // 1MB to avoid rotation

	mb := &MockBatcher{}
	mh := &MockHealthReporter{}
	mt := NewMemTable(cfg, mb, mh, nil, nil, newSegmentIDProvider(cfg.Path, cfg.Shards))
	defer mt.Close()

	key := xmap.Key{Lo: 99999, Hi: 0}
	keyBytes := []byte("concurrent-key")

	const numWriters = 100
	var (
		wg            sync.WaitGroup
		maxSeqWritten atomic.Uint64
	)
	// One slot per writer; each goroutine touches only its own slot, and
	// wg.Wait() orders those writes before the reads below.
	putErrs := make([]error, numWriters)

	for i := 0; i < numWriters; i++ {
		wg.Add(1)
		go func(slot int, seq uint64) {
			defer wg.Done()

			value := make([]byte, 100)
			if err := mt.Put(seq, key, keyBytes, value); err != nil {
				putErrs[slot] = err
				return
			}

			// Track the highest seqID successfully written (CAS max).
			for {
				current := maxSeqWritten.Load()
				if seq <= current || maxSeqWritten.CompareAndSwap(current, seq) {
					break
				}
			}
		}(i, uint64(i+1))
	}
	wg.Wait()

	for slot, err := range putErrs {
		require.NoError(t, err, "Put with seqID=%d failed", slot+1)
	}
	require.Equal(t, uint64(numWriters), maxSeqWritten.Load(),
		"highest successfully written seqID should be %d", numWriters)

	// Verify the index has the highest seqID
	mt.mu.Lock()
	active := mt.mu.active
	mt.mu.Unlock()

	record, found := active.index.Get(key)
	require.True(t, found)
	require.Equal(t, uint64(numWriters), record.SeqID,
		"index should have seqID=%d (the highest), got %d", numWriters, record.SeqID)
}
// TestMemTable_RotationUpdatesMaxSealedSeq verifies that rotation captures
// currentMaxSeq: after five small writes (seqIDs 100..500) and one large write
// intended to overflow the 1KB buffer, maxSealedSeq must be at least 500 and a
// subsequent Put with seqID=400 must fail with errSequenceTooOld.
func TestMemTable_RotationUpdatesMaxSealedSeq(t *testing.T) {
	cfg := defaultConfig(t.TempDir())
	cfg.WriteBufferSize = 1 << 10 // 1KB - small to trigger rotation

	mb := &MockBatcher{}
	mh := &MockHealthReporter{}
	mt := NewMemTable(cfg, mb, mh, nil, nil, newSegmentIDProvider(cfg.Path, cfg.Shards))
	defer mt.Close()

	// Write some data with increasing seqIDs
	keyBytes := []byte("key")
	for i := 1; i <= 5; i++ {
		err := mt.Put(uint64(i*100), xmap.Key{Lo: uint64(i), Hi: 0}, keyBytes, make([]byte, 100))
		require.NoError(t, err)
	}

	// Snapshot currentMaxSeq under the lock but assert only after releasing it.
	// A failing require calls t.FailNow, which unwinds straight into the
	// deferred mt.Close(); leaving mt.mu locked at that point risks a hang
	// (NOTE(review): presumably Close needs mt.mu — confirm).
	mt.mu.Lock()
	currentMax := mt.mu.active.currentMaxSeq
	mt.mu.Unlock()
	require.Equal(t, uint64(500), currentMax)

	// Trigger rotation by writing large value
	err := mt.Put(600, xmap.Key{Lo: 999, Hi: 0}, []byte("large-key"), make([]byte, 800))
	require.NoError(t, err)

	// Wait for any pending writes
	mt.Drain()

	// After rotation, maxSealedSeq should be at least 500
	mt.mu.Lock()
	maxSealed := mt.mu.maxSealedSeq
	mt.mu.Unlock()
	require.GreaterOrEqual(t, maxSealed, uint64(500),
		"maxSealedSeq should be >= 500 after rotation, got %d", maxSealed)

	// Writing with seqID <= maxSealed should now fail
	err = mt.Put(400, xmap.Key{Lo: 1000, Hi: 0}, []byte("fail-key"), []byte("should-fail"))
	require.True(t, errors.Is(err, errSequenceTooOld))
}
// --- Cache Retry Loop Tests ---
// mockSequenceVendor is a sequence-ID source whose counter tests can set
// directly, which lets them decide exactly which seqID the next caller gets.
type mockSequenceVendor struct {
	// seq holds the most recently issued ID; tests reposition it via Store.
	seq atomic.Uint64
}

// NextSeq atomically increments the counter and returns the incremented value,
// so after seq.Store(n) the next call yields n+1.
func (v *mockSequenceVendor) NextSeq() uint64 {
	next := v.seq.Add(1)
	return next
}
// TestCache_RetryLoop_ZombieResurrection verifies that putWithRetry acquires a
// fresh seqID on rejection: the first seqID handed to Put is deliberately stale
// (equal to maxSealedSeq), so the write can only land if the cache retries with
// a newer one.
func TestCache_RetryLoop_ZombieResurrection(t *testing.T) {
	tmpDir := t.TempDir()

	// We'll use a sequence vendor that we control
	seqVendor := &mockSequenceVendor{}
	seqVendor.seq.Store(100) // Start at 100

	cache, err := New(tmpDir,
		WithWriteBufferSize(64<<10),
		WithTestingKnobs(&TestingKnobs{
			SequenceVendor: seqVendor,
		}),
	)
	require.NoError(t, err)
	defer cache.Close()

	// Simulate a zombie scenario:
	// 1. maxSealedSeq is forced to 200 (as if a rotation had sealed up to 200).
	// 2. The vendor is positioned at 199, so the first attempt draws seqID=200.
	// 3. seqID=200 is not newer than maxSealedSeq, so the memtable is expected
	//    to reject it with errSequenceTooOld.
	// 4. The retry draws seqID=201, which is newer and is expected to succeed.
	cache.memTable.mu.Lock()
	cache.memTable.mu.maxSealedSeq = 200
	cache.memTable.mu.Unlock()

	// Position the vendor so the FIRST seqID is stale and only the retry can
	// succeed. (Storing 200 here would hand out 201 immediately and never
	// exercise the retry path.)
	seqVendor.seq.Store(199)

	// Put should retry and eventually succeed
	require.NoError(t, cache.Put([]byte("zombie-key"), []byte("zombie-value")))

	// Verify the write succeeded by reading back
	data, found := cache.Get([]byte("zombie-key"))
	require.True(t, found, "zombie write should have succeeded after retry")
	require.Equal(t, []byte("zombie-value"), data)

	// The final seqID should be > 200
	cache.memTable.mu.Lock()
	active := cache.memTable.mu.active
	cache.memTable.mu.Unlock()

	h := xxh3.Hash128([]byte("zombie-key"))
	record, found := active.index.Get(xmap.Key(h))
	require.True(t, found)
	require.Greater(t, record.SeqID, uint64(200),
		"final seqID should be > maxSealedSeq(200), got %d", record.SeqID)
}
// TestCache_RetryLoop_IdempotentSuccess covers the case where a stale writer
// is rejected but a newer version of the same key is already indexed: Put is
// expected to report success without replacing the stored value.
func TestCache_RetryLoop_IdempotentSuccess(t *testing.T) {
	dir := t.TempDir()

	// Controlled vendor, so each Put below draws a seqID chosen by the test.
	vendor := &mockSequenceVendor{}
	vendor.seq.Store(100)

	cache, err := New(dir,
		WithWriteBufferSize(64<<10),
		WithTestingKnobs(&TestingKnobs{
			SequenceVendor: vendor,
		}),
	)
	require.NoError(t, err)
	defer cache.Close()

	key := []byte("idempotent-key")
	hash := xxh3.Hash128(key)

	// indexed reports whether the cache-level index currently has the key.
	indexed := func() bool {
		_, ok := cache.index.Get(index.Key(hash))
		return ok
	}

	// Initial write; with the vendor at 100 the next seqID handed out is 101.
	require.NoError(t, cache.Put(key, []byte("first-value")))
	cache.Drain()
	require.True(t, indexed())

	// Overwrite with a "newer" value: vendor at 199 means NextSeq returns 200.
	vendor.seq.Store(199)
	require.NoError(t, cache.Put(key, []byte("newer-value")))
	cache.Drain()
	require.True(t, indexed())

	// Zombie setup: the next seqID will be 50, and maxSealedSeq=300 forces the
	// memtable to reject it.
	vendor.seq.Store(49)
	cache.memTable.mu.Lock()
	cache.memTable.mu.maxSealedSeq = 300 // Force rejection
	cache.memTable.mu.Unlock()

	// Expected flow for the zombie Put (seqID=50):
	//   - rejected with errSequenceTooOld, since 50 <= 300
	//   - the indexed record's seqID (200) is >= 50
	//   - so Put returns nil: a newer version already exists
	require.NoError(t, cache.Put(key, []byte("zombie-value")))

	// The stored value must be untouched by the zombie write.
	got, ok := cache.Get(key)
	require.True(t, ok)
	require.Equal(t, []byte("newer-value"), got, "should still have newer value, not zombie")
}