-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathcolumnar_memtable.h
More file actions
1426 lines (1339 loc) · 59.2 KB
/
columnar_memtable.h
File metadata and controls
1426 lines (1339 loc) · 59.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
#ifndef COLUMNAR_MEMTABLE_H
#define COLUMNAR_MEMTABLE_H
#include <algorithm>
#include <array>
#include <atomic>
#include <cmath>
#include <condition_variable>
#include <cstring>
#include <deque> // For the thread ID pool
#include <functional>
#include <future>
#include <iostream>
#include <map>
#include <memory>
#include <mutex>
#include <numeric>
#include <optional>
#include <queue>
#include <stdexcept>
#include <string>
#include <string_view>
#include <thread>
#include <tuple>
#include <utility>
#include <vector>
#define XXH_INLINE_ALL
#include "xxhash.h"
// --- Forward Declarations ---
enum class RecordType;
struct RecordRef;
class ColumnarBlock;
class Sorter;
class SortedColumnarBlock;
class FlushIterator;
class CompactingIterator;
class FlashActiveBlock;
class BloomFilter;
class ColumnarRecordArena;
class ColumnarMemTable;
// --- Core Utility Structures ---
struct XXHasher {
std::size_t operator()(const std::string_view key) const noexcept { return XXH3_64bits(key.data(), key.size()); }
};
class SpinLock {
public:
void lock() noexcept {
for (;;) {
if (!lock_.exchange(true, std::memory_order_acquire)) {
return;
}
while (lock_.load(std::memory_order_relaxed)) {
__builtin_ia32_pause();
}
}
}
void unlock() noexcept { lock_.store(false, std::memory_order_release); }
private:
std::atomic<bool> lock_ = {false};
};
inline uint64_t load_u64_prefix(std::string_view sv) {
if (sv.size() >= 8) {
uint64_t prefix;
memcpy(&prefix, sv.data(), 8);
return prefix;
}
uint64_t prefix = 0;
if (!sv.empty()) {
memcpy(&prefix, sv.data(), sv.size());
}
return prefix;
}
enum class RecordType { Put, Delete };
struct RecordRef {
std::string_view key;
std::string_view value;
RecordType type;
};
// --- Bloom Filter ---
class BloomFilter {
public:
explicit BloomFilter(size_t num_entries, double false_positive_rate = 0.01);
void Add(std::string_view key);
bool MayContain(std::string_view key) const;
size_t ApproximateMemoryUsage() const { return bits_.capacity() / 8; }
private:
static std::array<uint64_t, 2> Hash(std::string_view key);
std::vector<bool> bits_;
int num_hashes_;
};
inline BloomFilter::BloomFilter(size_t n, double p) {
if (n == 0) n = 1;
size_t m = -1.44 * n * std::log(p);
bits_ = std::vector<bool>((m + 7) & ~7, false);
num_hashes_ = 0.7 * (double(bits_.size()) / n);
if (num_hashes_ < 1) num_hashes_ = 1;
if (num_hashes_ > 8) num_hashes_ = 8;
}
inline void BloomFilter::Add(std::string_view key) {
auto h = Hash(key);
for (int i = 0; i < num_hashes_; ++i) {
uint64_t hash = h[0] + i * h[1];
if (!bits_.empty()) bits_[hash % bits_.size()] = true;
}
}
inline bool BloomFilter::MayContain(std::string_view key) const {
if (bits_.empty()) return true;
auto h = Hash(key);
for (int i = 0; i < num_hashes_; ++i) {
uint64_t hash = h[0] + i * h[1];
if (!bits_[hash % bits_.size()]) return false;
}
return true;
}
inline std::array<uint64_t, 2> BloomFilter::Hash(std::string_view key) {
XXH128_hash_t const hash_val = XXH3_128bits(key.data(), key.size());
return {hash_val.low64, hash_val.high64};
}
// --- Columnar MemTable Components ---
struct StoredRecord {
RecordRef record;
std::atomic<bool> ready{false};
};
// Manages thread IDs, allowing for efficient recycling to keep the ID range small.
class ThreadIdManager {
public:
static constexpr size_t kMaxThreads = 256;
static uint32_t GetId() {
thread_local ThreadIdRecycler instance;
return instance.id;
}
private:
struct ThreadIdRecycler {
uint32_t id;
ThreadIdRecycler() {
std::lock_guard<SpinLock> lock(pool_lock_);
if (!recycled_ids_.empty()) {
id = recycled_ids_.front();
recycled_ids_.pop_front();
} else {
id = next_id_.fetch_add(1, std::memory_order_relaxed);
if (id >= kMaxThreads) {
next_id_.fetch_sub(1, std::memory_order_relaxed);
throw std::runtime_error("Exceeded kMaxThreads. Increase the compile-time constant.");
}
}
}
~ThreadIdRecycler() {
std::lock_guard<SpinLock> lock(pool_lock_);
recycled_ids_.push_back(id);
}
};
// Use inline static to define and initialize in header for C++17+
static inline std::atomic<uint32_t> next_id_{0};
static inline std::deque<uint32_t> recycled_ids_;
static inline SpinLock pool_lock_;
};
// Thread-local arena for storing record data (keys and values) efficiently.
// Data is allocated in chunks to reduce fragmentation and contention.
class ColumnarRecordArena {
private:
friend class ColumnarMemTable;
struct DataChunk {
static constexpr size_t kRecordCapacity = 256;
static constexpr size_t kBufferCapacity = 32 * 1024;
// Combine record index and buffer position into one atomic to prevent races.
// High 32 bits for record index, low 32 bits for buffer position.
std::atomic<uint64_t> positions_{0};
std::array<StoredRecord, kRecordCapacity> records;
alignas(64) char buffer[kBufferCapacity];
};
struct alignas(64) ThreadLocalData {
std::vector<std::unique_ptr<DataChunk>> chunks;
DataChunk* current_chunk = nullptr;
// Add a lock to prevent multiple threads from switching the chunk for the same TLS simultaneously.
SpinLock chunk_switch_lock;
ThreadLocalData() { AddNewChunk(); }
void AddNewChunk() {
chunks.push_back(std::make_unique<DataChunk>());
current_chunk = chunks.back().get();
}
};
public:
ColumnarRecordArena();
~ColumnarRecordArena();
const StoredRecord* AllocateAndAppend(std::string_view key, std::string_view value, RecordType type);
std::vector<const StoredRecord*> AllocateAndAppendBatch(
const std::vector<std::pair<std::string_view, std::string_view>>& batch, RecordType type);
size_t size() const { return size_.load(std::memory_order_acquire); }
size_t ApproximateMemoryUsage() const { return memory_usage_.load(std::memory_order_relaxed); }
uint32_t GetMaxThreadIdSeen() const { return max_tid_seen_.load(std::memory_order_acquire); }
const std::array<std::atomic<ThreadLocalData*>, ThreadIdManager::kMaxThreads>& GetAllTlsData() const {
return all_tls_data_;
}
private:
ThreadLocalData* GetTlsData();
std::array<std::atomic<ThreadLocalData*>, ThreadIdManager::kMaxThreads> all_tls_data_{};
std::vector<ThreadLocalData*> owned_tls_data_;
SpinLock owner_lock_;
std::atomic<size_t> size_;
std::atomic<size_t> memory_usage_{0};
std::atomic<uint32_t> max_tid_seen_{0};
};
inline ColumnarRecordArena::ColumnarRecordArena() : size_(0) {}
inline ColumnarRecordArena::~ColumnarRecordArena() {
std::lock_guard<SpinLock> lock(owner_lock_);
for (auto* ptr : owned_tls_data_) {
delete ptr;
}
}
inline ColumnarRecordArena::ThreadLocalData* ColumnarRecordArena::GetTlsData() {
uint32_t tid = ThreadIdManager::GetId();
uint32_t current_max = max_tid_seen_.load(std::memory_order_relaxed);
while (tid > current_max) {
if (max_tid_seen_.compare_exchange_weak(current_max, tid, std::memory_order_release,
std::memory_order_relaxed)) {
break;
}
}
ThreadLocalData* my_data = all_tls_data_[tid].load(std::memory_order_acquire);
if (my_data == nullptr) {
auto* new_data = new ThreadLocalData();
ThreadLocalData* expected_null = nullptr;
if (all_tls_data_[tid].compare_exchange_strong(expected_null, new_data, std::memory_order_release,
std::memory_order_acquire)) {
std::lock_guard<SpinLock> lock(owner_lock_);
owned_tls_data_.push_back(new_data);
my_data = new_data;
memory_usage_.fetch_add(sizeof(ThreadLocalData) + sizeof(DataChunk), std::memory_order_relaxed);
} else {
delete new_data;
my_data = expected_null;
}
}
return my_data;
}
// Rewritten AllocateAndAppend with atomic 64-bit CAS for race-free allocation.
// The logic now internally handles chunk switches, simplifying the caller.
inline const StoredRecord* ColumnarRecordArena::AllocateAndAppend(std::string_view key, std::string_view value,
RecordType type) {
ThreadLocalData* tls_data = GetTlsData();
size_t required_size = key.size() + value.size();
// Prevent allocation of single items larger than the buffer.
if (required_size > DataChunk::kBufferCapacity) {
return nullptr;
}
uint32_t record_idx;
uint32_t buffer_offset;
while (true) {
DataChunk* chunk = tls_data->current_chunk;
uint64_t old_pos = chunk->positions_.load(std::memory_order_relaxed);
while (true) {
uint32_t old_ridx = static_cast<uint32_t>(old_pos >> 32);
uint32_t old_bpos = static_cast<uint32_t>(old_pos);
if (old_ridx >= DataChunk::kRecordCapacity || old_bpos + required_size > DataChunk::kBufferCapacity) {
break; // Chunk is full, need to switch.
}
uint64_t new_pos = (static_cast<uint64_t>(old_ridx + 1) << 32) | (old_bpos + required_size);
if (chunk->positions_.compare_exchange_weak(old_pos, new_pos, std::memory_order_acq_rel)) {
record_idx = old_ridx;
buffer_offset = old_bpos;
goto allocation_success; // Exit both loops
}
// CAS failed, another thread updated positions_. Retry with the new old_pos.
}
// If we're here, the chunk is full. Acquire lock to switch it.
std::lock_guard<SpinLock> lock(tls_data->chunk_switch_lock);
// Re-check if another thread already switched the chunk while we waited for the lock.
if (chunk == tls_data->current_chunk) {
tls_data->AddNewChunk();
memory_usage_.fetch_add(sizeof(DataChunk), std::memory_order_relaxed);
}
// Loop again to try allocating in the new chunk.
}
allocation_success:
DataChunk* final_chunk = tls_data->current_chunk; // The chunk where allocation succeeded
char* key_mem = final_chunk->buffer + buffer_offset;
memcpy(key_mem, key.data(), key.size());
char* val_mem = key_mem + key.size();
memcpy(val_mem, value.data(), value.size());
StoredRecord& record_slot = final_chunk->records[record_idx];
record_slot.record = {{key_mem, key.size()}, {val_mem, value.size()}, type};
record_slot.ready.store(true, std::memory_order_release);
size_.fetch_add(1, std::memory_order_release);
memory_usage_.fetch_add(required_size + sizeof(StoredRecord), std::memory_order_relaxed);
return &record_slot;
}
// Rewritten batch allocation with the same robust CAS-based approach.
inline std::vector<const StoredRecord*> ColumnarRecordArena::AllocateAndAppendBatch(
const std::vector<std::pair<std::string_view, std::string_view>>& batch, RecordType type) {
std::vector<const StoredRecord*> results;
if (batch.empty()) return results;
results.reserve(batch.size());
ThreadLocalData* tls_data = GetTlsData();
size_t batch_offset = 0;
while (batch_offset < batch.size()) {
DataChunk* chunk = tls_data->current_chunk;
uint64_t old_pos = chunk->positions_.load(std::memory_order_relaxed);
uint32_t allocated_record_idx = 0;
uint32_t allocated_buffer_pos = 0;
uint32_t records_to_alloc = 0;
size_t buffer_needed = 0;
while (true) { // CAS loop
uint32_t old_ridx = static_cast<uint32_t>(old_pos >> 32);
uint32_t old_bpos = static_cast<uint32_t>(old_pos);
records_to_alloc = 0;
buffer_needed = 0;
for (size_t i = batch_offset; i < batch.size(); ++i) {
const auto& [key, value] = batch[i];
size_t item_size = key.size() + value.size();
if (old_ridx + records_to_alloc < DataChunk::kRecordCapacity &&
old_bpos + buffer_needed + item_size <= DataChunk::kBufferCapacity) {
records_to_alloc++;
buffer_needed += item_size;
} else {
break;
}
}
if (records_to_alloc == 0) {
break; // Not enough space for even one item, need to switch chunk.
}
uint64_t new_pos = (static_cast<uint64_t>(old_ridx + records_to_alloc) << 32) | (old_bpos + buffer_needed);
if (chunk->positions_.compare_exchange_weak(old_pos, new_pos, std::memory_order_acq_rel)) {
allocated_record_idx = old_ridx;
allocated_buffer_pos = old_bpos;
goto batch_allocation_success;
}
}
// If we're here, the chunk is full for this sub-batch.
{
std::lock_guard<SpinLock> lock(tls_data->chunk_switch_lock);
if (chunk == tls_data->current_chunk) {
// Handle oversized items that will never fit.
const auto& [key, value] = batch[batch_offset];
if (key.size() + value.size() > DataChunk::kBufferCapacity) {
batch_offset++; // Skip this item and continue.
continue;
}
tls_data->AddNewChunk();
memory_usage_.fetch_add(sizeof(DataChunk), std::memory_order_relaxed);
}
continue; // Retry with the new chunk.
}
batch_allocation_success:
size_t current_buffer_offset_in_batch = 0;
for (uint32_t i = 0; i < records_to_alloc; ++i) {
const auto& [key, value] = batch[batch_offset + i];
size_t item_size = key.size() + value.size();
char* key_mem = chunk->buffer + allocated_buffer_pos + current_buffer_offset_in_batch;
memcpy(key_mem, key.data(), key.size());
char* val_mem = key_mem + key.size();
memcpy(val_mem, value.data(), value.size());
StoredRecord& record_slot = chunk->records[allocated_record_idx + i];
record_slot.record = {{key_mem, key.size()}, {val_mem, value.size()}, type};
record_slot.ready.store(true, std::memory_order_release);
results.push_back(&record_slot);
current_buffer_offset_in_batch += item_size;
}
size_.fetch_add(records_to_alloc, std::memory_order_release);
memory_usage_.fetch_add(buffer_needed + records_to_alloc * sizeof(StoredRecord), std::memory_order_relaxed);
batch_offset += records_to_alloc;
}
return results;
}
class ConcurrentStringHashMap {
public:
static constexpr uint8_t EMPTY_TAG = 0xFF, LOCKED_TAG = 0xFE;
private:
struct alignas(64) Slot {
std::atomic<uint8_t> tag;
uint64_t full_hash;
std::string_view key;
std::atomic<const StoredRecord*> record;
};
std::unique_ptr<Slot[]> slots_;
size_t capacity_, capacity_mask_;
XXHasher hasher_;
public:
ConcurrentStringHashMap(const ConcurrentStringHashMap&) = delete;
ConcurrentStringHashMap& operator=(const ConcurrentStringHashMap&) = delete;
static size_t calculate_power_of_2(size_t n) { return n == 0 ? 1 : 1UL << (64 - __builtin_clzll(n - 1)); }
explicit ConcurrentStringHashMap(size_t build_size);
void Insert(std::string_view key, const StoredRecord* new_record);
const StoredRecord* Find(std::string_view key) const;
};
inline ConcurrentStringHashMap::ConcurrentStringHashMap(size_t build_size) {
size_t capacity = calculate_power_of_2(build_size * 1.5 + 64);
capacity_ = capacity;
capacity_mask_ = capacity - 1;
slots_ = std::make_unique<Slot[]>(capacity_);
for (size_t i = 0; i < capacity_; ++i) {
slots_[i].tag.store(EMPTY_TAG, std::memory_order_relaxed);
slots_[i].record.store(nullptr, std::memory_order_relaxed);
}
}
inline void ConcurrentStringHashMap::Insert(std::string_view key, const StoredRecord* new_record) {
uint64_t hash = hasher_(key);
uint8_t tag = (hash >> 56);
if (tag >= LOCKED_TAG) tag = 0;
size_t pos = hash & capacity_mask_;
const size_t initial_pos = pos;
while (true) {
uint8_t current_tag = slots_[pos].tag.load(std::memory_order_acquire);
if (current_tag == tag && slots_[pos].full_hash == hash && slots_[pos].key == key) {
slots_[pos].record.store(new_record, std::memory_order_release);
return;
}
if (current_tag == EMPTY_TAG) {
uint8_t expected_empty = EMPTY_TAG;
if (slots_[pos].tag.compare_exchange_strong(expected_empty, LOCKED_TAG, std::memory_order_acq_rel)) {
slots_[pos].key = key;
slots_[pos].full_hash = hash;
slots_[pos].record.store(new_record, std::memory_order_relaxed);
slots_[pos].tag.store(tag, std::memory_order_release);
return;
}
continue;
}
pos = (pos + 1) & capacity_mask_;
if (pos == initial_pos) {
throw std::runtime_error("ConcurrentStringHashMap is full. Consider increasing capacity.");
}
}
}
inline const StoredRecord* ConcurrentStringHashMap::Find(std::string_view key) const {
uint64_t hash = hasher_(key);
uint8_t tag = (hash >> 56);
if (tag >= LOCKED_TAG) tag = 0;
size_t pos = hash & capacity_mask_;
const size_t initial_pos = pos;
do {
uint8_t current_tag = slots_[pos].tag.load(std::memory_order_acquire);
if (current_tag == EMPTY_TAG) return nullptr;
if (current_tag == tag && slots_[pos].full_hash == hash && slots_[pos].key == key) {
const StoredRecord* rec = slots_[pos].record.load(std::memory_order_acquire);
if (rec && rec->ready.load(std::memory_order_acquire)) {
return rec;
}
return nullptr;
}
pos = (pos + 1) & capacity_mask_;
} while (pos != initial_pos);
return nullptr;
}
class FlashActiveBlock {
friend class ColumnarMemTable;
public:
explicit FlashActiveBlock(size_t cap) : index_(cap) {}
~FlashActiveBlock() = default;
// With the new Arena, TryAdd only fails if the block is sealed or the item is too large.
// The caller no longer needs to retry on chunk-full conditions.
bool TryAdd(std::string_view key, std::string_view value, RecordType type) {
if (is_sealed()) return false;
const StoredRecord* record_ptr = data_log_.AllocateAndAppend(key, value, type);
if (record_ptr) {
index_.Insert(record_ptr->record.key, record_ptr);
return true;
}
return false;
}
bool TryAddBatch(const std::vector<std::pair<std::string_view, std::string_view>>& batch, RecordType type) {
if (is_sealed()) return false;
auto record_ptrs = data_log_.AllocateAndAppendBatch(batch, type);
if (is_sealed()) return false; // Re-check after allocation
for (const auto* record_ptr : record_ptrs) {
if (record_ptr) {
index_.Insert(record_ptr->record.key, record_ptr);
}
}
return true;
}
std::optional<RecordRef> Get(std::string_view key) const {
const StoredRecord* record_ptr = index_.Find(key);
return record_ptr ? std::optional<RecordRef>(record_ptr->record) : std::nullopt;
}
size_t size() const { return data_log_.size(); }
size_t ApproximateMemoryUsage() const { return data_log_.ApproximateMemoryUsage() + sizeof(index_); }
void Seal() { sealed_.store(true, std::memory_order_release); }
bool is_sealed() const { return sealed_.load(std::memory_order_acquire); }
private:
ColumnarRecordArena data_log_;
ConcurrentStringHashMap index_;
std::atomic<bool> sealed_{false};
};
class ColumnarBlock {
public:
class SimpleArena {
public:
char* AllocateRaw(size_t bytes) {
if (current_block_idx_ < 0 || blocks_[current_block_idx_].pos + bytes > blocks_[current_block_idx_].size) {
size_t bs = std::max(bytes, (size_t)4096);
blocks_.emplace_back(bs);
current_block_idx_++;
}
Block& b = blocks_[current_block_idx_];
char* r = b.data.get() + b.pos;
b.pos += bytes;
return r;
}
std::string_view AllocateAndCopy(std::string_view d) {
char* m = AllocateRaw(d.size());
if (!d.empty()) memcpy(m, d.data(), d.size());
return {m, d.size()};
}
size_t ApproximateMemoryUsage() const {
size_t total = 0;
for (const auto& block : blocks_) total += block.size;
return total;
}
private:
struct Block {
std::unique_ptr<char[]> data;
size_t pos, size;
explicit Block(size_t s) : data(new char[s]), pos(0), size(s) {}
};
std::vector<Block> blocks_;
int current_block_idx_ = -1;
};
SimpleArena arena;
std::vector<std::string_view> keys, values;
std::vector<RecordType> types;
void Add(std::string_view k, std::string_view v, RecordType t) {
keys.push_back(arena.AllocateAndCopy(k));
values.push_back(arena.AllocateAndCopy(v));
types.push_back(t);
}
size_t size() const { return keys.size(); }
bool empty() const { return keys.empty(); }
void Clear() {
keys.clear();
values.clear();
types.clear();
arena = SimpleArena();
}
size_t ApproximateMemoryUsage() const {
size_t total = arena.ApproximateMemoryUsage();
total += keys.capacity() * sizeof(std::string_view);
total += values.capacity() * sizeof(std::string_view);
total += types.capacity() * sizeof(RecordType);
return total;
}
};
class Sorter {
public:
virtual ~Sorter() = default;
virtual std::vector<uint32_t> Sort(const ColumnarBlock& block) const = 0;
};
class StdSorter : public Sorter {
public:
std::vector<uint32_t> Sort(const ColumnarBlock& block) const override {
if (block.empty()) return {};
std::vector<uint32_t> indices(block.size());
std::iota(indices.begin(), indices.end(), 0);
std::stable_sort(indices.begin(), indices.end(),
[&block](uint32_t a, uint32_t b) { return block.keys[a] < block.keys[b]; });
return indices;
}
};
class ParallelRadixSorter : public Sorter {
public:
std::vector<uint32_t> Sort(const ColumnarBlock& block) const override {
if (block.empty()) return {};
std::vector<uint32_t> indices(block.size());
std::iota(indices.begin(), indices.end(), 0);
unsigned int num_threads = std::thread::hardware_concurrency();
if (num_threads == 0) num_threads = 1;
radix_sort_msd_parallel(indices.begin(), indices.end(), 0, num_threads, block);
return indices;
}
private:
static constexpr size_t kSequentialSortThreshold = 2048;
static constexpr size_t kRadixAlphabetSize = 256;
using Iterator = std::vector<uint32_t>::iterator;
static inline int get_char_at(std::string_view s, size_t depth) {
return depth < s.size() ? static_cast<unsigned char>(s[depth]) : -1;
}
void radix_sort_msd_sequential(Iterator begin, Iterator end, size_t depth, const ColumnarBlock& block) const {
if (static_cast<size_t>(std::distance(begin, end)) <= 1) return;
if (static_cast<size_t>(std::distance(begin, end)) <= kSequentialSortThreshold) {
std::stable_sort(begin, end, [&](uint32_t a, uint32_t b) {
return block.keys[a].substr(std::min(depth, block.keys[a].size())) <
block.keys[b].substr(std::min(depth, block.keys[b].size()));
});
return;
}
std::vector<uint32_t> buckets[kRadixAlphabetSize];
std::vector<uint32_t> finished_strings;
for (auto it = begin; it != end; ++it) {
int char_code = get_char_at(block.keys[*it], depth);
if (char_code == -1) {
finished_strings.push_back(*it);
} else {
buckets[char_code].push_back(*it);
}
}
auto current = begin;
std::copy(finished_strings.begin(), finished_strings.end(), current);
current += finished_strings.size();
for (size_t i = 0; i < kRadixAlphabetSize; ++i) {
if (!buckets[i].empty()) {
auto bucket_begin = current;
std::copy(buckets[i].begin(), buckets[i].end(), bucket_begin);
current += buckets[i].size();
radix_sort_msd_sequential(bucket_begin, current, depth + 1, block);
}
}
}
void radix_sort_msd_parallel(Iterator begin, Iterator end, size_t depth, unsigned int num_threads,
const ColumnarBlock& block) const {
const size_t size = std::distance(begin, end);
if (size <= kSequentialSortThreshold || num_threads <= 1) {
radix_sort_msd_sequential(begin, end, depth, block);
return;
}
std::vector<size_t> bucket_counts(kRadixAlphabetSize + 1, 0);
for (auto it = begin; it != end; ++it) {
bucket_counts[get_char_at(block.keys[*it], depth) + 1]++;
}
std::vector<size_t> bucket_offsets(kRadixAlphabetSize + 2, 0);
for (size_t i = 0; i < kRadixAlphabetSize + 1; ++i) {
bucket_offsets[i + 1] = bucket_offsets[i] + bucket_counts[i];
}
std::vector<uint32_t> sorted_output(size);
std::vector<size_t> current_offsets = bucket_offsets;
for (auto it = begin; it != end; ++it) {
uint32_t val = *it;
int char_code = get_char_at(block.keys[val], depth);
sorted_output[current_offsets[char_code + 1]++] = val;
}
std::copy(sorted_output.begin(), sorted_output.end(), begin);
std::vector<std::future<void>> futures;
for (size_t i = 1; i < kRadixAlphabetSize + 1; ++i) {
size_t bucket_size = bucket_counts[i];
if (bucket_size == 0) continue;
Iterator bucket_begin = begin + bucket_offsets[i];
Iterator bucket_end = begin + bucket_offsets[i + 1];
if (futures.size() < num_threads - 1 && bucket_size > kSequentialSortThreshold) {
// Improved thread distribution logic.
futures.push_back(std::async(
std::launch::async, [this, bucket_begin, bucket_end, depth, num_threads, &block, &futures] {
unsigned int threads_for_child = std::max(1u, num_threads / (unsigned int)(futures.size() + 1));
radix_sort_msd_parallel(bucket_begin, bucket_end, depth + 1, threads_for_child, block);
}));
} else {
radix_sort_msd_sequential(bucket_begin, bucket_end, depth + 1, block);
}
}
for (auto& f : futures) {
f.get();
}
}
};
class SortedColumnarBlock {
public:
class Iterator;
static constexpr size_t kSparseIndexSampleRate = 16;
explicit SortedColumnarBlock(std::shared_ptr<ColumnarBlock> block, const Sorter& sorter,
bool build_bloom_filter = true);
bool MayContain(std::string_view key) const;
std::optional<RecordRef> Get(std::string_view key) const;
std::string_view min_key() const { return min_key_; }
std::string_view max_key() const { return max_key_; }
Iterator begin() const;
bool empty() const { return sorted_indices_.empty(); }
size_t size() const { return sorted_indices_.size(); }
size_t ApproximateMemoryUsage() const {
size_t usage = sizeof(*this);
if (block_data_) usage += block_data_->ApproximateMemoryUsage();
usage += sorted_indices_.capacity() * sizeof(uint32_t);
if (bloom_filter_) usage += bloom_filter_->ApproximateMemoryUsage();
usage += sparse_index_.capacity() * sizeof(std::pair<std::string_view, size_t>);
return usage;
}
private:
friend class Iterator;
std::shared_ptr<ColumnarBlock> block_data_;
std::vector<uint32_t> sorted_indices_;
std::string_view min_key_, max_key_;
std::unique_ptr<BloomFilter> bloom_filter_;
std::vector<std::pair<std::string_view, size_t>> sparse_index_;
};
inline SortedColumnarBlock::SortedColumnarBlock(std::shared_ptr<ColumnarBlock> b, const Sorter& s,
bool build_bloom_filter)
: block_data_(std::move(b)) {
sorted_indices_ = s.Sort(*block_data_);
if (sorted_indices_.empty()) {
min_key_ = {};
max_key_ = {};
return;
}
min_key_ = block_data_->keys[sorted_indices_.front()];
max_key_ = block_data_->keys[sorted_indices_.back()];
constexpr size_t kBloomFilterThreshold = 256 * 1024;
if (build_bloom_filter && block_data_->size() < kBloomFilterThreshold) {
bloom_filter_ = std::make_unique<BloomFilter>(block_data_->size());
for (size_t i = 0; i < block_data_->size(); ++i) {
bloom_filter_->Add(block_data_->keys[i]);
}
}
sparse_index_.reserve(sorted_indices_.size() / kSparseIndexSampleRate + 1);
for (size_t i = 0; i < sorted_indices_.size(); i += kSparseIndexSampleRate)
sparse_index_.emplace_back(block_data_->keys[sorted_indices_[i]], i);
}
inline bool SortedColumnarBlock::MayContain(std::string_view key) const {
if (empty() || key < min_key_ || key > max_key_) return false;
if (!bloom_filter_) {
return true;
}
return bloom_filter_->MayContain(key);
}
inline std::optional<RecordRef> SortedColumnarBlock::Get(std::string_view key) const {
if (!MayContain(key)) return std::nullopt;
auto sparse_it = std::lower_bound(sparse_index_.begin(), sparse_index_.end(), key,
[](const auto& a, auto b) { return a.first < b; });
auto start_it = sorted_indices_.begin();
if (sparse_it != sparse_index_.begin()) start_it += (sparse_it - 1)->second;
auto end_it = sorted_indices_.end();
if (sparse_it != sparse_index_.end()) {
end_it = sorted_indices_.begin() + sparse_it->second + kSparseIndexSampleRate;
if (end_it > sorted_indices_.end()) end_it = sorted_indices_.end();
}
auto it = std::lower_bound(start_it, end_it, key,
[&](uint32_t i, std::string_view k) { return block_data_->keys[i] < k; });
if (it == end_it || block_data_->keys[*it] != key) {
return std::nullopt;
}
// stable_sort ensures that for identical keys, the one inserted later appears later in the sorted list.
// We want the latest version, so find the end of the range of equal keys and take the one just before it.
auto range_end =
std::upper_bound(it, end_it, key, [&](std::string_view k, uint32_t i) { return k < block_data_->keys[i]; });
uint32_t latest_idx = *std::prev(range_end);
return RecordRef{block_data_->keys[latest_idx], block_data_->values[latest_idx], block_data_->types[latest_idx]};
}
class SortedColumnarBlock::Iterator {
public:
Iterator(const SortedColumnarBlock* b, size_t p) : block_(b), pos_(p) {}
RecordRef operator*() const {
uint32_t i = block_->sorted_indices_[pos_];
return {block_->block_data_->keys[i], block_->block_data_->values[i], block_->block_data_->types[i]};
}
void Next() { ++pos_; }
bool IsValid() const { return block_ && pos_ < block_->sorted_indices_.size(); }
private:
const SortedColumnarBlock* block_;
size_t pos_;
};
inline SortedColumnarBlock::Iterator SortedColumnarBlock::begin() const { return Iterator(this, 0); }
class FlushIterator {
public:
explicit FlushIterator(const std::vector<std::shared_ptr<const SortedColumnarBlock>>& sources);
bool IsValid() const { return !min_heap_.empty(); }
RecordRef Get() const { return min_heap_.top().record; }
void Next();
private:
struct HeapNode {
RecordRef record;
uint64_t key_prefix;
size_t source_index;
// Use original logic with key_prefix optimization
bool operator>(const HeapNode& o) const {
if (key_prefix != o.key_prefix) return key_prefix > o.key_prefix;
if (record.key != o.record.key) return record.key > o.record.key;
// To ensure stability, use source index as a tie-breaker.
// Iterators created later (for newer data) should have a higher index.
// A smaller source_index means older data, so it should have higher priority (come first).
return source_index > o.source_index;
}
};
std::vector<SortedColumnarBlock::Iterator> iterators_;
std::priority_queue<HeapNode, std::vector<HeapNode>, std::greater<HeapNode>> min_heap_;
};
inline FlushIterator::FlushIterator(const std::vector<std::shared_ptr<const SortedColumnarBlock>>& sources) {
iterators_.reserve(sources.size());
for (size_t i = 0; i < sources.size(); ++i) {
if (sources[i]) {
iterators_.emplace_back(sources[i]->begin());
} else {
iterators_.emplace_back(nullptr, 0);
}
if (iterators_.back().IsValid()) {
RecordRef rec = *iterators_.back();
uint64_t prefix = load_u64_prefix(rec.key);
min_heap_.push({rec, prefix, i});
}
}
}
inline void FlushIterator::Next() {
if (!IsValid()) return;
HeapNode n = min_heap_.top();
min_heap_.pop();
iterators_[n.source_index].Next();
if (iterators_[n.source_index].IsValid()) {
RecordRef rec = *iterators_[n.source_index];
uint64_t prefix = load_u64_prefix(rec.key);
min_heap_.push({rec, prefix, n.source_index});
}
}
class CompactingIterator {
public:
template <typename It>
explicit CompactingIterator(std::unique_ptr<It> s);
bool IsValid() const { return is_valid_; }
RecordRef Get() const { return current_record_; }
void Next() { FindNext(); }
private:
struct ItConcept {
virtual ~ItConcept() = default;
virtual bool IsValid() const = 0;
virtual RecordRef Get() const = 0;
virtual void Next() = 0;
};
template <typename It>
struct ItWrapper final : public ItConcept {
explicit ItWrapper(std::unique_ptr<It> i) : iter_(std::move(i)) {}
bool IsValid() const override { return iter_->IsValid(); }
RecordRef Get() const override { return iter_->Get(); }
void Next() override { iter_->Next(); }
std::unique_ptr<It> iter_;
};
void FindNext();
std::unique_ptr<ItConcept> source_;
RecordRef current_record_;
bool is_valid_ = false;
};
template <typename It>
inline CompactingIterator::CompactingIterator(std::unique_ptr<It> s)
: source_(std::make_unique<ItWrapper<It>>(std::move(s))) {
FindNext();
}
inline void CompactingIterator::FindNext() {
while (source_->IsValid()) {
RecordRef latest_record = source_->Get();
source_->Next();
while (source_->IsValid() && source_->Get().key == latest_record.key) {
latest_record = source_->Get();
source_->Next();
}
if (latest_record.type == RecordType::Put) {
current_record_ = latest_record;
is_valid_ = true;
return;
}
}
is_valid_ = false;
}
class ColumnarMemTable : public std::enable_shared_from_this<ColumnarMemTable> {
public:
using GetResult = std::optional<std::string_view>;
using MultiGetResult = std::map<std::string_view, GetResult, std::less<>>;
~ColumnarMemTable() {
{
std::lock_guard<std::mutex> lock(queue_mutex_);
stop_background_thread_ = true;
}
queue_cond_.notify_one();
if (background_thread_.joinable()) background_thread_.join();
}
ColumnarMemTable(const ColumnarMemTable&) = delete;
ColumnarMemTable& operator=(const ColumnarMemTable&) = delete;
// Adopt factory pattern for safe lifetime management with background thread.
static std::shared_ptr<ColumnarMemTable> Create(
size_t active_block_size_bytes = 16 * 1024 * 48, bool enable_compaction = false,
std::shared_ptr<Sorter> sorter = std::make_shared<ParallelRadixSorter>(), size_t num_shards = 16) {
struct MakeSharedEnabler : public ColumnarMemTable {
MakeSharedEnabler(size_t active_block_size_bytes, bool enable_compaction, std::shared_ptr<Sorter> sorter,
size_t num_shards)
: ColumnarMemTable(active_block_size_bytes, enable_compaction, std::move(sorter), num_shards) {}
};
auto table = std::make_shared<MakeSharedEnabler>(active_block_size_bytes, enable_compaction, std::move(sorter),
num_shards);
table->StartBackgroundThread();
return table;
}
void Put(std::string_view key, std::string_view value) { Insert(key, value, RecordType::Put); }
void Delete(std::string_view key) { Insert(key, "", RecordType::Delete); }
GetResult Get(std::string_view key) const;
MultiGetResult MultiGet(const std::vector<std::string_view>& keys) const;
void PutBatch(const std::vector<std::pair<std::string_view, std::string_view>>& batch);
void WaitForBackgroundWork();
std::unique_ptr<CompactingIterator> NewCompactingIterator();
size_t ApproximateMemoryUsage() const;
private:
// Make constructor private for factory pattern.
explicit ColumnarMemTable(size_t active_block_size_bytes, bool enable_compaction, std::shared_ptr<Sorter> sorter,
size_t num_shards)
: active_block_threshold_(std::max((size_t)1, active_block_size_bytes / 116)),
enable_compaction_(enable_compaction),
sorter_(std::move(sorter)),
num_shards_(num_shards > 0 ? 1UL << (63 - __builtin_clzll(num_shards - 1)) : 1),
shard_mask_(num_shards_ - 1) {
for (size_t i = 0; i < num_shards_; ++i) {
shards_.push_back(std::make_unique<Shard>(active_block_threshold_));
}
}
void StartBackgroundThread() { background_thread_ = std::thread(&ColumnarMemTable::BackgroundWorkerLoop, this); }
struct ImmutableState {
using SortedBlockList = std::vector<std::shared_ptr<const SortedColumnarBlock>>;
using SealedBlockList = std::vector<std::shared_ptr<FlashActiveBlock>>;
std::shared_ptr<const SealedBlockList> sealed_blocks;
std::shared_ptr<const SortedBlockList> blocks;
ImmutableState()
: sealed_blocks(std::make_shared<const SealedBlockList>()),
blocks(std::make_shared<const SortedBlockList>()) {}
};
struct alignas(64) Shard {
std::shared_ptr<FlashActiveBlock> active_block_;
std::shared_ptr<const ImmutableState> immutable_state_;
std::atomic<uint64_t> version_{0};
SpinLock seal_mutex_;
Shard(size_t active_block_threshold) {
active_block_ = std::make_shared<FlashActiveBlock>(active_block_threshold);
immutable_state_ = std::make_shared<const ImmutableState>();
}
};
struct BackgroundWorkItem {
std::shared_ptr<FlashActiveBlock> block;
std::unique_ptr<std::promise<void>> promise;
size_t shard_idx;
};
const size_t active_block_threshold_;