-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathsegment_drain_test.go
More file actions
251 lines (212 loc) · 7.75 KB
/
segment_drain_test.go
File metadata and controls
251 lines (212 loc) · 7.75 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
package blobcache
import (
"fmt"
"os"
"testing"
"github.com/miretskiy/blobcache/internal/index"
"github.com/stretchr/testify/require"
"github.com/zeebo/xxh3"
)
// TestSegmentDrain_Basic validates the full pressure-driven drain flow:
// when on-disk waste exceeds MaxSize/2, drain the sparsest segments.
func TestSegmentDrain_Basic(t *testing.T) {
	dir := t.TempDir()
	cache, err := New(dir,
		WithMaxCachedSlabs(0),      // Force disk path
		WithWriteBufferSize(1<<20), // 1MB segments for faster testing
		WithMaxSize(2<<20),         // 2MB — waste allowance = 1MB
	)
	require.NoError(t, err)
	cache.Start()
	defer cache.Close()

	// Fill roughly one segment: 20 items at 40KB each (~800KB total).
	const numItems = 20
	payload := make([]byte, 40_000)
	keys := make([][]byte, numItems)
	for i := 0; i < numItems; i++ {
		keys[i] = fmt.Appendf(nil, "drain-key-%04d", i)
		require.NoError(t, cache.Put(keys[i], payload))
	}
	cache.Drain()

	// Resolve the segment that holds the first item.
	keyHash := xxh3.Hash128(keys[0])
	entry, ok := cache.index.Get(keyHash)
	require.True(t, ok)
	targetSeg := entry.SegmentID

	// Remove all but the first item — this segment becomes the sparsest (~40KB live).
	for i := 1; i < numItems; i++ {
		require.NoError(t, cache.Delete(keys[i]))
	}

	// Add filler segments to satisfy BOTH the cooling period AND waste pressure.
	// Every filler key is unique so each filler segment retains live data;
	// otherwise the empty fillers would be sparsest and drained before the target.
	fillers := cache.MaxCachedSlabs + index.CoolingPeriodMargin + 1
	if fillers < 4 {
		fillers = 4
	}
	for i := 0; i < fillers; i++ {
		filler := fmt.Appendf(nil, "filler-%04d", i)
		require.NoError(t, cache.Put(filler, make([]byte, 200_000)))
		cache.Drain()
	}

	// Invoke drain directly — waste pressure should evict the sparsest segment.
	require.NoError(t, cache.maybeDrainSegments())

	// The backing file of the drained segment must be gone.
	segPath := getSegmentPath(cache.Path, cache.Shards, targetSeg)
	_, err = os.Stat(segPath)
	require.True(t, os.IsNotExist(err), "segment file should be deleted after drain")

	// The surviving live item was drained from RAM, so it now misses...
	_, hit := cache.Get(keys[0])
	require.False(t, hit, "drained live item should become a cache miss")

	// ...and its hash is no longer present in the RAM index.
	_, ok = cache.index.Get(keyHash)
	require.False(t, ok, "drained item should be removed from RAM index")
}
// TestSegmentDrain_NoPressure verifies drain is a no-op when on-disk waste
// is within the allowance (MaxSize/2).
func TestSegmentDrain_NoPressure(t *testing.T) {
	dir := t.TempDir()
	cache, err := New(dir,
		WithMaxCachedSlabs(0),
		WithWriteBufferSize(1<<20),
		WithMaxSize(100<<20), // 100MB — waste allowance = 50MB, way above a few 1MB segments
	)
	require.NoError(t, err)
	cache.Start()
	defer cache.Close()

	// Populate one segment's worth of items.
	const numItems = 20
	payload := make([]byte, 40_000)
	keys := make([][]byte, numItems)
	for i := 0; i < numItems; i++ {
		keys[i] = fmt.Appendf(nil, "nopressure-key-%04d", i)
		require.NoError(t, cache.Put(keys[i], payload))
	}
	cache.Drain()

	// Resolve the segment that holds the first item.
	keyHash := xxh3.Hash128(keys[0])
	entry, ok := cache.index.Get(keyHash)
	require.True(t, ok)
	targetSeg := entry.SegmentID

	// Drop half the items — the segment has waste, but not enough pressure.
	for i := 0; i < numItems/2; i++ {
		require.NoError(t, cache.Delete(keys[i]))
	}

	// Push the target segment past the cooling period with fillers.
	fillers := cache.MaxCachedSlabs + index.CoolingPeriodMargin + 1
	for i := 0; i < fillers; i++ {
		filler := fmt.Appendf(nil, "filler-%04d", i)
		require.NoError(t, cache.Put(filler, make([]byte, 200_000)))
		cache.Drain()
	}

	// Drain directly — with waste well inside the allowance, nothing happens.
	require.NoError(t, cache.maybeDrainSegments())

	// The segment file must still be on disk.
	segPath := getSegmentPath(cache.Path, cache.Shards, targetSeg)
	_, err = os.Stat(segPath)
	require.NoError(t, err, "segment file should still exist (no waste pressure)")

	// All remaining live items must still be readable.
	for i := numItems / 2; i < numItems; i++ {
		_, hit := cache.Get(keys[i])
		require.True(t, hit, "live item %d should still be readable", i)
	}
}
// TestSegmentDrain_CoolingPeriod verifies that recent segments are not drained
// even under waste pressure.
func TestSegmentDrain_CoolingPeriod(t *testing.T) {
	dir := t.TempDir()
	cache, err := New(dir,
		WithMaxCachedSlabs(0),
		WithWriteBufferSize(1<<20),
		WithMaxSize(1<<20), // 1MB — waste allowance = 512KB, immediate pressure
	)
	require.NoError(t, err)
	cache.Start()
	defer cache.Close()

	// Populate one segment's worth of items.
	const numItems = 20
	payload := make([]byte, 40_000)
	keys := make([][]byte, numItems)
	for i := 0; i < numItems; i++ {
		keys[i] = fmt.Appendf(nil, "cool-key-%04d", i)
		require.NoError(t, cache.Put(keys[i], payload))
	}
	cache.Drain()

	// Resolve the segment that holds the first item.
	keyHash := xxh3.Hash128(keys[0])
	entry, ok := cache.index.Get(keyHash)
	require.True(t, ok)
	targetSeg := entry.SegmentID

	// Drop 95% of the items, creating heavy waste in the segment.
	for i := 1; i < numItems; i++ {
		require.NoError(t, cache.Delete(keys[i]))
	}

	// Deliberately do NOT create enough segments to leave the cooling period.
	// Drain should be a no-op: the segment is too recent.
	require.NoError(t, cache.maybeDrainSegments())

	// The segment file is still present while inside the cooling period.
	segPath := getSegmentPath(cache.Path, cache.Shards, targetSeg)
	_, err = os.Stat(segPath)
	require.NoError(t, err, "segment file should still exist (within cooling period)")

	// Now age the segment out of the cooling period with unique-key fillers
	// so the filler segments stay populated.
	fillers := cache.MaxCachedSlabs + index.CoolingPeriodMargin + 1
	if fillers < 4 {
		fillers = 4
	}
	for i := 0; i < fillers; i++ {
		filler := fmt.Appendf(nil, "filler-%04d", i)
		require.NoError(t, cache.Put(filler, make([]byte, 200_000)))
		cache.Drain()
	}

	// Drain again — this time the sparse segment is eligible and gets drained.
	require.NoError(t, cache.maybeDrainSegments())

	// The segment file must now be gone.
	_, err = os.Stat(segPath)
	require.True(t, os.IsNotExist(err), "segment file should be deleted after cooling period")
}
// TestSegmentDrain_WALModeSkipped verifies that segment drain is NOT triggered
// in WAL/CAS mode (WAL mode uses segment compaction instead of drain).
func TestSegmentDrain_WALModeSkipped(t *testing.T) {
	dir := t.TempDir()
	cache, err := New(dir,
		WithWAL(),
		WithMaxCachedSlabs(0),
		WithWriteBufferSize(1<<20),
		WithMaxSize(2<<20),                // 2MB — would trigger drain in cache mode
		WithCompactionWasteThreshold(1.0), // Disable compaction to isolate drain behavior
	)
	require.NoError(t, err)
	cache.Start()
	defer cache.Close()

	// Populate one segment's worth of items.
	const numItems = 20
	payload := make([]byte, 40_000)
	keys := make([][]byte, numItems)
	for i := 0; i < numItems; i++ {
		keys[i] = fmt.Appendf(nil, "wal-key-%04d", i)
		require.NoError(t, cache.Put(keys[i], payload))
	}
	cache.Drain()

	// Resolve the segment that holds the first item.
	keyHash := xxh3.Hash128(keys[0])
	entry, ok := cache.index.Get(keyHash)
	require.True(t, ok)
	targetSeg := entry.SegmentID

	// Drop 95% of the items, creating heavy waste in the segment.
	for i := 1; i < numItems; i++ {
		require.NoError(t, cache.Delete(keys[i]))
	}

	// Build up enough filler segments for cooling AND waste pressure.
	fillers := cache.MaxCachedSlabs + index.CoolingPeriodMargin + 1
	if fillers < 4 {
		fillers = 4
	}
	for i := 0; i < fillers; i++ {
		filler := fmt.Appendf(nil, "filler-%04d", i)
		require.NoError(t, cache.Put(filler, make([]byte, 200_000)))
		cache.Drain()
	}

	// In WAL mode, maintenanceWorker skips drain (Phase 3 is guarded by c.wal == nil).
	// With compaction disabled, the segment file must still be present.
	segPath := getSegmentPath(cache.Path, cache.Shards, targetSeg)
	_, err = os.Stat(segPath)
	require.NoError(t, err, "WAL mode should NOT drain segments")

	// The surviving live item remains readable in WAL mode.
	_, hit := cache.Get(keys[0])
	require.True(t, hit, "live item should still be readable in WAL mode")
}