-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmemtable_test.go
More file actions
173 lines (146 loc) · 4.1 KB
/
memtable_test.go
File metadata and controls
173 lines (146 loc) · 4.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
package blobcache
import (
"fmt"
"os"
"path/filepath"
"sync"
"testing"
"golang.org/x/sys/unix"
"github.com/miretskiy/blobcache/base"
"github.com/miretskiy/blobcache/internal/index"
"github.com/miretskiy/blobcache/internal/record"
"github.com/miretskiy/blobcache/internal/xmap"
"github.com/stretchr/testify/require"
)
// TrackedPool wraps the production MmapPool for deterministic cleanup.
// It records any unpooled (one-off) mmap regions handed out by
// AcquireAligned so Teardown can munmap them explicitly at test end.
type TrackedPool struct {
	*MmapPool
	mu           sync.Mutex // guards extraRegions
	extraRegions [][]byte   // unpooled mmap regions to release in Teardown
}
// NewTrackedPool builds a TrackedPool backed by a pathless MmapPool with
// the given capacity (number of pooled slabs) and slab size in bytes.
func NewTrackedPool(capacity int, slabSize int64) *TrackedPool {
	return &TrackedPool{
		MmapPool: NewMmapPool("", slabSize, capacity),
	}
}
// AcquireAligned delegates to the wrapped pool. When the pool hands back a
// one-off unpooled allocation, the region is remembered so Teardown can
// munmap it deterministically.
func (tp *TrackedPool) AcquireAligned(size int64) *MmapBuffer {
	buf := tp.MmapPool.AcquireAligned(size)
	if buf.IsPooled() {
		// Pooled buffers are owned and reclaimed by the pool itself.
		return buf
	}
	tp.mu.Lock()
	defer tp.mu.Unlock()
	tp.extraRegions = append(tp.extraRegions, buf.Bytes())
	return buf
}
// Teardown munmaps every tracked one-off region and resets the tracking
// list. Pooled slabs are left to the wrapped MmapPool.
func (tp *TrackedPool) Teardown() {
	tp.mu.Lock()
	defer tp.mu.Unlock()
	for _, region := range tp.extraRegions {
		// Best-effort cleanup; an unmap failure is not actionable in tests.
		_ = unix.Munmap(region)
	}
	tp.extraRegions = nil
}
// MockBatcher implements the Batcher interface for testing.
// It accumulates flushed footer entries, converted to index items and
// grouped per segment, so tests can assert on what was flushed.
type MockBatcher struct {
	mu      sync.Mutex              // guards Batches and Count
	Batches map[uint32][]index.Item // flushed items keyed by segment ID
	Count   int                     // total entries seen across all PutBatch calls
}
// PutBatch records one flushed segment's footer entries. Each entry is
// converted to an index.Item (the same projection the real batcher would
// perform) and appended under segID for later test assertions.
func (m *MockBatcher) PutBatch(segID uint32, entries []record.FooterEntry, _ uint64) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	if m.Batches == nil {
		m.Batches = make(map[uint32][]index.Item)
	}
	for k := range entries {
		entry := &entries[k]
		// Physical length on disk = record header + key bytes + payload.
		totalLen := int64(record.HeaderSize) + int64(entry.KeyLen) + entry.PhysicalSize
		it := index.Item{
			Key:         entry.Key,
			SegmentID:   segID,
			Offset:      uint32(entry.Pos),
			PhysicalLen: uint32(totalLen),
		}
		it.SetCompression(entry.Compression())
		m.Batches[segID] = append(m.Batches[segID], it)
	}
	m.Count += len(entries)
	return nil
}
// MockHealthReporter tracks desyncs or disk failures.
// ReportError latches the error and degraded flag for test assertions.
type MockHealthReporter struct {
	mu           sync.Mutex // guards ReportedErr and DegradedFlag
	ReportedErr  error      // last error passed to ReportError
	DegradedFlag bool       // set once ReportError has been called
}
// mockLibrarian is a no-op Publisher used to satisfy NewMemTable's
// publisher dependency; these tests do not assert on slab publication.
type mockLibrarian struct{}

// Publish intentionally does nothing.
func (m mockLibrarian) Publish(slab *SharedSlab) {
}

// Compile-time check that mockLibrarian implements Publisher.
var _ Publisher = mockLibrarian{}
// ReportError latches err and flips the reporter into the degraded state.
func (m *MockHealthReporter) ReportError(err error) {
	m.mu.Lock()
	m.DegradedFlag = true
	m.ReportedErr = err
	m.mu.Unlock()
}
// IsDegraded reports whether ReportError has been called.
func (m *MockHealthReporter) IsDegraded() bool {
	m.mu.Lock()
	degraded := m.DegradedFlag
	m.mu.Unlock()
	return degraded
}
// ReportBlobError is intentionally a no-op; per-blob error reporting is
// not asserted in these tests.
func (m *MockHealthReporter) ReportBlobError(key Key, errno base.BlobErrno) {
	// No-op for tests
}
// TestMemTable_Integration_Rotation ingests enough blobs to overflow the
// write buffer several times, then verifies that (1) no flush error put the
// table into degraded mode, (2) every record reached the Batcher, (3) the
// writes rotated across multiple segments, and (4) the segment files exist
// on disk.
func TestMemTable_Integration_Rotation(t *testing.T) {
	tmpDir := t.TempDir()
	cfg := config{
		Path:             tmpDir,
		WriteBufferSize:  512 * 1024,
		MaxInflightSlabs: 4,
		FlushConcurrency: 2,
		Shards:           1,
	}
	// Create segment directory structure (normally done by checkOrInitialize).
	require.NoError(t, os.MkdirAll(filepath.Join(tmpDir, "segments", "0000"), 0o755))
	mb := &MockBatcher{}
	mh := &MockHealthReporter{}
	mt := NewMemTable(cfg, mb, mh, &mockLibrarian{}, nil, newSegmentIDProvider(cfg.Path, cfg.Shards))
	defer mt.Close()

	// Ingest blobs to force rotation across multiple segments:
	// 20 x 100KB well exceeds the 512KB write buffer.
	const (
		blobCount = 20
		blobSize  = 100 * 1024
	)
	data := make([]byte, blobSize)
	for i := range data {
		data[i] = byte(i % 256)
	}
	for i := 0; i < blobCount; i++ {
		key := xmap.Key{Lo: uint64(i), Hi: 0}
		keyBytes := fmt.Appendf(nil, "test-key-%04d", i)
		require.NoError(t, mt.Put(uint64(i+1), key, keyBytes, data))
	}
	mt.Drain()

	// Check if any errors were reported during flush. Use the mutex-guarded
	// accessor and a locked read: flush goroutines write these fields, so a
	// bare field read is a data race under -race.
	if mh.IsDegraded() {
		mh.mu.Lock()
		reported := mh.ReportedErr
		mh.mu.Unlock()
		t.Fatalf("MemTable entered degraded mode: %v", reported)
	}

	// Snapshot the batcher's state under its lock for the same reason.
	mb.mu.Lock()
	gotCount := mb.Count
	gotSegments := len(mb.Batches)
	mb.mu.Unlock()
	// 1. Verify all records hit the Batcher.
	require.Equal(t, blobCount, gotCount)
	// 2. Verify rotation occurred.
	require.GreaterOrEqual(t, gotSegments, 2, "Should have flushed multiple segments")

	// 3. Verify physical segments exist on disk. WalkDir avoids the per-entry
	// Lstat that filepath.Walk performs.
	segCount := 0
	err := filepath.WalkDir(tmpDir, func(path string, d os.DirEntry, err error) error {
		if err != nil {
			return err
		}
		if !d.IsDir() && filepath.Ext(path) == ".seg" {
			segCount++
		}
		return nil
	})
	require.NoError(t, err)
	require.GreaterOrEqual(t, segCount, 2)
}