-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathoptions.go
More file actions
328 lines (277 loc) · 12.4 KB
/
options.go
File metadata and controls
328 lines (277 loc) · 12.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
package blobcache
import (
"hash"
"hash/crc32"
"github.com/miretskiy/blobcache/compression"
"github.com/miretskiy/blobcache/internal/wal"
"github.com/miretskiy/dio/iosched"
"github.com/miretskiy/dio/sys"
)
// DegradedMode controls how the cache behaves when entering degraded mode.
// The values declared in this file are DegradedMemoryOnly (the zero value)
// and DegradedPanic.
type DegradedMode int
// Supported DegradedMode values.
const (
	// DegradedMemoryOnly continues operating as a memory-only cache (default).
	// This is the resilient option for production caches.
	// It is the zero value of DegradedMode.
	DegradedMemoryOnly DegradedMode = iota
	// DegradedPanic panics with a stack trace when degraded mode is triggered.
	// Use this for debugging and benchmarking to catch issues immediately.
	DegradedPanic
)
// IOConfig holds I/O strategy settings.
type IOConfig struct {
	// FDataSync uses fdatasync for durability. defaultConfig leaves it
	// disabled; WithWAL forces it on.
	FDataSync bool
	// Fadvise uses fadvise to provide data access hints to the kernel.
	// defaultConfig initializes it from sys.UseFadvise.
	Fadvise bool
	// DirectIOWrite uses O_DIRECT (Linux) or F_NOCACHE (Darwin) for segment
	// writes. defaultConfig enables it.
	DirectIOWrite bool
	// DirectIORead uses O_DIRECT (Linux) or F_NOCACHE (Darwin) for segment
	// reads. defaultConfig does not set it, so it starts out false.
	DirectIORead bool
}
// ResilienceConfig holds data integrity settings.
type ResilienceConfig struct {
	// ChecksumHasher is the hash factory for checksums (nil = disabled).
	// WithChecksum installs a CRC-32 (IEEE) factory; WithChecksumHash
	// installs a caller-supplied one.
	ChecksumHasher Hasher
	// VerifyOnRead verifies checksums on reads.
	VerifyOnRead bool
}
// CompressionConfig holds compression strategy settings.
type CompressionConfig struct {
	// Codec is the compression algorithm (None, Zstd, LZ4, S2).
	// defaultConfig uses compression.CodexNone.
	Codec compression.Codex
	// Level is the compression level (Default, Speed, Best).
	// defaultConfig uses compression.CompressionDefault.
	Level compression.Level
	// MinSize is the threshold below which blobs are not compressed
	// (default: 512).
	MinSize int64
}
// WALConfig holds write-ahead log settings.
type WALConfig struct {
	// Enabled turns on the WAL for durability (default: false).
	Enabled bool
	// Config is embedded so its fields (for example Flags, which
	// WithWALFlags sets) are reachable directly on WALConfig.
	wal.Config
}
// config holds internal configuration. defaultConfig produces the baseline
// values and each Option mutates one or more fields through apply.
type config struct {
	// Path is the value passed to defaultConfig.
	Path string
	// MaxSize is set by WithMaxSize (defaultConfig: 0).
	MaxSize int64
	// Shards is set by WithShards. defaultConfig uses 0, which it documents
	// as auto-computed from MaxSize/WriteBufferSize.
	Shards int
	// --- Slab Configuration ---
	// WriteBufferSize is the size of one memory slab (defaultConfig: 64MB).
	WriteBufferSize int64
	// MaxInflightSlabs is the max number of slabs queueing for flush
	// (defaultConfig: 6).
	MaxInflightSlabs int
	// MaxCachedSlabs is the max number of slabs kept in memory for reading
	// (defaultConfig: 8).
	MaxCachedSlabs int
	// FlushConcurrency is set by WithFlushConcurrency (defaultConfig: 2).
	FlushConcurrency int
	// BloomFPRate is set by WithBloomFPRate (defaultConfig: 0.01).
	BloomFPRate float64
	// BloomEstimatedKeys is set by WithBloomEstimatedKeys
	// (defaultConfig: 1,000,000).
	BloomEstimatedKeys int
	// IO holds the I/O strategy settings.
	IO IOConfig
	// Resilience holds the data integrity settings.
	Resilience ResilienceConfig
	// Compression holds the compression strategy settings.
	Compression CompressionConfig
	// WAL holds the write-ahead log settings.
	WAL WALConfig
	// DegradedMode selects how to handle degraded mode (default: memory-only).
	DegradedMode DegradedMode
	// TrustHash skips key verification on reads (cache mode optimization).
	// defaultConfig sets it to true; WithWAL resets it to false.
	TrustHash bool
	// BallastSize is the heap ballast size in bytes (default: 1GB, 0 = disabled).
	BallastSize int
	// CompactionWasteThreshold is the waste ratio that triggers segment
	// rewrite (WAL mode, default: 0.25).
	CompactionWasteThreshold float64
	// IOScheduler is the pluggable I/O backend for reads (default: direct
	// POSIX). defaultConfig leaves it nil.
	IOScheduler *iosched.BlockingIO
	// MaxReadConcurrency limits how many goroutines can simultaneously block
	// in pread. 0 = unlimited (default). When set, excess readers block on
	// a channel (Go scheduler, no OS thread consumed) rather than in a
	// syscall, preventing thread explosion. See DESIGN.md §11.6.
	// Recommended: runtime.GOMAXPROCS(0)*2 when using PreadScheduler.
	// Not needed with URingScheduler (coordinator bounds threads to 1).
	MaxReadConcurrency int
	// --- Read Cache Configuration ---
	// ReadCacheSlabs is the number of read cache slabs (0 = disabled).
	ReadCacheSlabs int
	// ReadCacheSlabSize is the size of each read cache slab
	// (default: WriteBufferSize).
	ReadCacheSlabSize int64
	// ReadCacheMaxItemSize is the max item size to cache
	// (0 = slabSize/64, -1 = no limit).
	ReadCacheMaxItemSize int64
	// knobs holds the testing hooks installed by WithTestingKnobs (nil by default).
	knobs *TestingKnobs
}
// Option configures BlobCache. An Option mutates the internal config through
// the unexported apply method, so implementations can only be defined inside
// this package; callers obtain them from the With* constructors.
type Option interface {
	// apply writes this option's setting into the given config.
	apply(*config)
}
// funcOpt adapts a plain func(*config) so that it satisfies Option.
type funcOpt func(*config)

// apply invokes the wrapped function on cfg.
func (f funcOpt) apply(cfg *config) {
	f(cfg)
}
// WithMaxSize returns an Option that sets the configured MaxSize to size.
// The default configuration leaves MaxSize at 0.
func WithMaxSize(size int64) Option {
	setMaxSize := func(cfg *config) {
		cfg.MaxSize = size
	}
	return funcOpt(setMaxSize)
}
// WithShards returns an Option that sets the shard count to n.
// The default configuration uses 0, which it documents as auto-computed
// from MaxSize/WriteBufferSize.
func WithShards(n int) Option {
	setShards := func(cfg *config) {
		cfg.Shards = n
	}
	return funcOpt(setShards)
}
// WithWriteBufferSize returns an Option that sets the size, in bytes, of the
// memory slabs used for buffering writes.
// Default: 64MB.
func WithWriteBufferSize(bytes int64) Option {
	setWriteBufferSize := func(cfg *config) {
		cfg.WriteBufferSize = bytes
	}
	return funcOpt(setWriteBufferSize)
}
// WithMaxInflightSlabs returns an Option that sets how many slabs may be
// queued for flushing before backpressure (blocking writes) kicks in.
// Default: 6.
func WithMaxInflightSlabs(n int) Option {
	setMaxInflight := func(cfg *config) {
		cfg.MaxInflightSlabs = n
	}
	return funcOpt(setMaxInflight)
}
// WithMaxCachedSlabs returns an Option that sets how many sealed slabs are
// kept in memory for reading. A larger value improves read performance for
// recently written data at the cost of RAM.
// Set to 0 to disable the in-memory read cache (all reads go to disk).
// Default: 8.
func WithMaxCachedSlabs(n int) Option {
	setMaxCached := func(cfg *config) {
		cfg.MaxCachedSlabs = n
	}
	return funcOpt(setMaxCached)
}
// WithBloomFPRate returns an Option that sets the bloom filter
// false-positive rate stored in the configuration.
// Default: 0.01.
func WithBloomFPRate(rate float64) Option {
	setRate := func(cfg *config) {
		cfg.BloomFPRate = rate
	}
	return funcOpt(setRate)
}
// WithBloomEstimatedKeys returns an Option that sets the estimated number of
// keys stored in the configuration for the bloom filter.
// Default: 1,000,000.
func WithBloomEstimatedKeys(n int) Option {
	setEstimate := func(cfg *config) {
		cfg.BloomEstimatedKeys = n
	}
	return funcOpt(setEstimate)
}
// WithChecksum returns an Option that enables checksums using CRC-32 with
// the IEEE polynomial. Each call of the installed factory yields a fresh
// hasher from crc32.NewIEEE.
func WithChecksum() Option {
	newCRC32 := func() hash.Hash32 {
		return crc32.NewIEEE()
	}
	install := func(cfg *config) {
		cfg.Resilience.ChecksumHasher = newCRC32
	}
	return funcOpt(install)
}
// WithChecksumHash returns an Option that installs factory as the checksum
// hash factory. A nil factory leaves checksums disabled.
func WithChecksumHash(factory Hasher) Option {
	install := func(cfg *config) {
		cfg.Resilience.ChecksumHasher = factory
	}
	return funcOpt(install)
}
// WithFDataSync returns an Option that turns the use of fdatasync for
// durability on or off.
// Default: false (WithWAL forces it to true).
func WithFDataSync(enabled bool) Option {
	setSync := func(cfg *config) {
		cfg.IO.FDataSync = enabled
	}
	return funcOpt(setSync)
}
// WithVerifyOnRead returns an Option that turns checksum verification on
// reads on or off.
func WithVerifyOnRead(enabled bool) Option {
	setVerify := func(cfg *config) {
		cfg.Resilience.VerifyOnRead = enabled
	}
	return funcOpt(setVerify)
}
// WithFlushConcurrency returns an Option that sets the configured flush
// concurrency to n.
// Default: 2.
func WithFlushConcurrency(n int) Option {
	setConcurrency := func(cfg *config) {
		cfg.FlushConcurrency = n
	}
	return funcOpt(setConcurrency)
}
// WithFadvise returns an Option that turns the use of fadvise (data access
// hints to the kernel) on or off.
// Default: the value of sys.UseFadvise.
func WithFadvise(enabled bool) Option {
	setFadvise := func(cfg *config) {
		cfg.IO.Fadvise = enabled
	}
	return funcOpt(setFadvise)
}
// WithDirectIOWrite returns an Option that turns O_DIRECT (Linux) or
// F_NOCACHE (Darwin) for segment writes on or off.
// Default: true.
func WithDirectIOWrite(enabled bool) Option {
	setDirectWrite := func(cfg *config) {
		cfg.IO.DirectIOWrite = enabled
	}
	return funcOpt(setDirectWrite)
}
// WithDirectIORead returns an Option that turns O_DIRECT (Linux) or
// F_NOCACHE (Darwin) for segment reads on or off. Direct reads bypass the
// kernel page cache, which can reduce tail latency under heavy write I/O
// pressure.
// Default: false (leverage kernel page cache for reads).
func WithDirectIORead(enabled bool) Option {
	setDirectRead := func(cfg *config) {
		cfg.IO.DirectIORead = enabled
	}
	return funcOpt(setDirectRead)
}
// WithReadConcurrency returns an Option that caps the number of goroutines
// allowed to block in pread at the same time (stored as MaxReadConcurrency;
// 0 = unlimited, the default). Callers beyond the cap wait on a Go channel
// (no OS thread consumed) instead of in a syscall, which prevents thread
// explosion under high cold-read concurrency. See DESIGN.md §11.6.
//
// Recommended: runtime.GOMAXPROCS(0)*2 when using PreadScheduler.
// Not needed with URingScheduler (its coordinator already bounds OS threads to 1).
func WithReadConcurrency(n int) Option {
	setLimit := func(cfg *config) {
		cfg.MaxReadConcurrency = n
	}
	return funcOpt(setLimit)
}
// WithCompression returns an Option that selects the compression codec.
// Compression is performed in the calling goroutine during Put() to distribute
// CPU load and prevent flush workers from becoming bottlenecks.
// Default: compression.CodexNone (compression disabled).
func WithCompression(codec compression.Codex) Option {
	setCodec := func(cfg *config) {
		cfg.Compression.Codec = codec
	}
	return funcOpt(setCodec)
}
// WithCompressionLevel returns an Option that sets the compression level.
// Default: compression.CompressionDefault.
func WithCompressionLevel(level compression.Level) Option {
	setLevel := func(cfg *config) {
		cfg.Compression.Level = level
	}
	return funcOpt(setLevel)
}
// WithCompressionMinSize returns an Option that sets the minimum blob size
// for compression. Blobs smaller than this are stored uncompressed.
// Default: 512.
func WithCompressionMinSize(size int64) Option {
	setMinSize := func(cfg *config) {
		cfg.Compression.MinSize = size
	}
	return funcOpt(setMinSize)
}
// WithTestingKnobs returns an Option that installs testing hooks for error
// injection and behavior overrides. The pointer is stored as-is, not copied.
func WithTestingKnobs(knobs *TestingKnobs) Option {
	install := func(cfg *config) {
		cfg.knobs = knobs
	}
	return funcOpt(install)
}
// WithWAL returns an Option that enables the write-ahead log for durability.
// When enabled, all writes are logged to WAL before being acknowledged.
// This transforms blobcache from an ephemeral cache into durable storage.
//
// Besides setting WAL.Enabled, the option also forces IO.FDataSync to true
// and TrustHash to false, overwriting whatever those fields held before.
func WithWAL() Option {
	enableWAL := func(cfg *config) {
		// CAS mode: keys must be verified so hash collisions are detected.
		cfg.TrustHash = false
		// A WAL without data sync would not actually be durable.
		cfg.IO.FDataSync = true
		cfg.WAL.Enabled = true
	}
	return funcOpt(enableWAL)
}
// WithWALFlags returns an Option that replaces the file flags used for WAL
// writes.
// FlDirectIO: bypass OS page cache (default: enabled)
// FlDSync: fdatasync after writes (default: enabled)
// FlSync: full fsync after writes
// Use sys.SyncNone (0) for testing only.
// The default configuration uses sys.FlDirectIO | sys.SyncData.
func WithWALFlags(flags sys.OpenFlag) Option {
	setFlags := func(cfg *config) {
		cfg.WAL.Flags = flags
	}
	return funcOpt(setFlags)
}
// WithDegradedMode returns an Option that selects how the cache handles
// degraded mode.
// DegradedMemoryOnly (default): continues operating as memory-only cache
// DegradedPanic: panics with stack trace (use for debugging/benchmarking)
func WithDegradedMode(mode DegradedMode) Option {
	setMode := func(cfg *config) {
		cfg.DegradedMode = mode
	}
	return funcOpt(setMode)
}
// WithTrustHash returns an Option that controls whether key verification is
// skipped on reads. When true, the cache trusts that hash collisions are rare
// enough that checking the stored key against the requested key is
// unnecessary.
// Default: true for cache mode, false for CAS mode (WAL enabled).
// Cache mode: hash collision = wrong data served (acceptable for cache)
// CAS mode: hash collision = data corruption (must verify)
//
// Note that WithWAL also writes this field (it sets it to false).
func WithTrustHash(enabled bool) Option {
	setTrust := func(cfg *config) {
		cfg.TrustHash = enabled
	}
	return funcOpt(setTrust)
}
// WithBallast returns an Option that sets the heap ballast size in bytes.
// By default, the cache allocates a 1GB ballast at startup.
// This keeps the heap larger so GC triggers less frequently, reducing GC overhead.
// Use WithBallast(0) to disable, or WithBallast(10<<30) for 10GB, etc.
// Negative sizes are clamped to 0.
func WithBallast(size int) Option {
	// Clamp once up front; size is a by-value parameter, so the result is
	// the same as clamping when the option is applied.
	ballast := max(0, size)
	setBallast := func(cfg *config) {
		cfg.BallastSize = ballast
	}
	return funcOpt(setBallast)
}
// WithCompactionWasteThreshold returns an Option that sets the waste ratio
// that triggers segment rewrite in WAL mode. When a segment's tombstone ratio
// exceeds this threshold, it becomes a candidate for compaction.
// The ratio is stored as given; this option performs no range validation.
// Default: 0.25 (25%).
func WithCompactionWasteThreshold(ratio float64) Option {
	setThreshold := func(cfg *config) {
		cfg.CompactionWasteThreshold = ratio
	}
	return funcOpt(setThreshold)
}
// WithIOScheduler returns an Option that sets the I/O backend used for
// segment reads.
// Default: direct POSIX pread. Use [iosched.NewDefaultIO] to get
// io_uring on Linux or [iosched.NewBlockingIO]([iosched.NewURingScheduler])
// for explicit control.
func WithIOScheduler(sched *iosched.BlockingIO) Option {
	setScheduler := func(cfg *config) {
		cfg.IOScheduler = sched
	}
	return funcOpt(setScheduler)
}
// WithReadCacheSlabs returns an Option that sets the number of slabs in the
// optional second-tier read cache for disk-resident blobs. The primary read
// path (Librarian) handles recently written data with zero-copy
// sub-microsecond latency; this cache is only useful for temporally distant
// reads or when the kernel page cache is under external pressure.
// Each arena is ReadCacheSlabSize bytes. Total memory = n * slabSize.
// Default: 0 (disabled — Librarian + kernel page cache is sufficient for most workloads).
func WithReadCacheSlabs(n int) Option {
	setSlabs := func(cfg *config) {
		cfg.ReadCacheSlabs = n
	}
	return funcOpt(setSlabs)
}
// WithReadCacheSlabSize returns an Option that sets the size of each read
// cache slab.
// Default: same as WriteBufferSize.
func WithReadCacheSlabSize(size int64) Option {
	setSlabSize := func(cfg *config) {
		cfg.ReadCacheSlabSize = size
	}
	return funcOpt(setSlabSize)
}
// WithReadCacheMaxItemSize returns an Option that sets the maximum item size
// eligible for caching. Items larger than this bypass the read cache and are
// served directly from disk, which prevents large items from rapidly filling
// slabs and evicting many smaller items.
// Default: slabSize/64 (ensuring at least 64 items per slab).
// Use -1 to disable the limit (cache all items regardless of size).
func WithReadCacheMaxItemSize(size int64) Option {
	setMaxItem := func(cfg *config) {
		cfg.ReadCacheMaxItemSize = size
	}
	return funcOpt(setMaxItem)
}
// defaultConfig returns the baseline configuration for a cache rooted at
// path, before any Option is applied. Fields that are not assigned below keep
// their zero value: MaxSize (0), Shards (0 = auto-computed from
// MaxSize/WriteBufferSize), IO.FDataSync (false), IO.DirectIORead (false)
// and WAL.Enabled (false), among others.
func defaultConfig(path string) config {
	cfg := config{Path: path}

	// Slab sizing and flushing.
	cfg.WriteBufferSize = 64 << 20 // 64MB
	cfg.MaxInflightSlabs = 6
	cfg.MaxCachedSlabs = 8 // 8 slabs x 64MB keeps ~512MB of recently written data in RAM
	cfg.FlushConcurrency = 2

	// Bloom filter sizing.
	cfg.BloomFPRate = 0.01
	cfg.BloomEstimatedKeys = 1_000_000

	cfg.DegradedMode = DegradedMemoryOnly
	cfg.TrustHash = true                // Cache mode: trust hash, skip key verification
	cfg.BallastSize = 1 << 30           // 1GB heap ballast to reduce GC frequency
	cfg.CompactionWasteThreshold = 0.25 // 25% waste triggers segment rewrite

	// I/O strategy: FDataSync stays off unless WithFDataSync/WithWAL enable it.
	cfg.IO.Fadvise = sys.UseFadvise
	cfg.IO.DirectIOWrite = true

	cfg.Compression = CompressionConfig{
		Codec:   compression.CodexNone, // Disabled by default
		Level:   compression.CompressionDefault,
		MinSize: 512, // Don't compress small blobs
	}

	// The WAL is disabled by default, but its flags are pre-populated.
	cfg.WAL.Flags = sys.FlDirectIO | sys.SyncData

	return cfg
}