-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathreplication_buffer.go
More file actions
64 lines (54 loc) · 1.65 KB
/
replication_buffer.go
File metadata and controls
64 lines (54 loc) · 1.65 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
package iceberg
import (
"github.com/transferia/transferia/pkg/abstract"
)
// tableBuffer accumulates ChangeItems for a single table between flush cycles.
// It tracks estimated memory usage and the maximum LSN seen for WAL position checkpointing.
// All fields are cleared back to their zero values by DrainAndReset.
type tableBuffer struct {
// items holds the buffered change items in the order they were appended.
items []abstract.ChangeItem
// sizeBytes is the running sum of estimateItemSize over the buffered items.
sizeBytes int64
// lastSchema is the TableSchema of the most recently appended item that
// carried a non-nil schema; it stays nil until such an item arrives.
lastSchema *abstract.TableSchema
// maxLSN is the largest LSN among the buffered items (0 when empty).
maxLSN uint64
}
// newTableBuffer allocates an empty tableBuffer: no items, zero estimated
// size, no schema, and a max LSN of 0.
func newTableBuffer() *tableBuffer {
	return new(tableBuffer)
}
// Append stores item in the buffer and folds it into the running totals:
// the estimated byte size, the highest LSN observed so far, and the most
// recent non-nil table schema.
func (b *tableBuffer) Append(item abstract.ChangeItem) {
	// Items without a schema leave the previously remembered one in place.
	if schema := item.TableSchema; schema != nil {
		b.lastSchema = schema
	}
	if lsn := item.LSN; b.maxLSN < lsn {
		b.maxLSN = lsn
	}
	b.sizeBytes += estimateItemSize(item)
	b.items = append(b.items, item)
}
// DrainAndReset hands the buffered contents to the caller and leaves the
// buffer empty.
// The results are, in order: the buffered items, the last table schema seen,
// and the highest LSN seen.
func (b *tableBuffer) DrainAndReset() ([]abstract.ChangeItem, *abstract.TableSchema, uint64) {
	// Snapshot the state first; the fields are wiped right after.
	drained, schema, lsn := b.items, b.lastSchema, b.maxLSN

	// The items slice is dropped rather than truncated, so the buffer no
	// longer shares a backing array with the slice returned to the caller.
	b.items, b.sizeBytes, b.lastSchema, b.maxLSN = nil, 0, nil, 0

	return drained, schema, lsn
}
// SizeBytes returns the estimated memory usage of the buffer in bytes.
// The value is the sum of estimateItemSize over every item appended since
// the buffer was created or last drained; it is an estimate, not a measurement.
func (b *tableBuffer) SizeBytes() int64 {
return b.sizeBytes
}
// Len returns the number of items currently held in the buffer.
// It is 0 for a fresh buffer and immediately after DrainAndReset.
func (b *tableBuffer) Len() int {
return len(b.items)
}
// estimateItemSize returns an approximate size in bytes for a ChangeItem.
// A positive item.Size.Values is used as-is; otherwise the size is guessed
// from the number of column values plus a fixed overhead.
func estimateItemSize(item abstract.ChangeItem) int64 {
	const (
		perValueBytes = 64 // rough footprint assumed for one column value
		overheadBytes = 32 // fixed per-item overhead
	)

	if reported := item.Size.Values; reported > 0 {
		return int64(reported)
	}
	return int64(len(item.ColumnValues)*perValueBytes + overheadBytes)
}