-
Notifications
You must be signed in to change notification settings - Fork 1.7k
Expand file tree
/
Copy pathinverted_index.hpp
More file actions
354 lines (282 loc) · 13.1 KB
/
inverted_index.hpp
File metadata and controls
354 lines (282 loc) · 13.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
#pragma once
#include <algorithm>
#include <cmath>
#include <cstdint>
#include <cstring>
#include <functional>
#include <limits>
#include <memory>
#include <queue>
#include <shared_mutex>
#include <string>
#include <thread>
#include <type_traits>
#include <unordered_map>
#include <utility>
#include <vector>
#if defined(__x86_64__) || defined(_M_X64)
# include <immintrin.h>
#elif defined(__aarch64__) || defined(_M_ARM64)
# if defined(USE_SVE2)
# include <arm_sve.h>
# endif
# include <arm_neon.h>
#endif // defined(__x86_64__) || defined(_M_X64)
#include "mdbx/mdbx.h"
#include "../core/types.hpp"
#include "../utils/log.hpp"
#include "../utils/settings.hpp"
#include "sparse_vector.hpp"
namespace ndd {
// Sentinel document id: the largest value representable by ndd::idInt.
// PostingListIterator::remainingEntries() reports 0 when an iterator's
// current_doc_id equals this value.
// Declared `inline constexpr` so every translation unit that includes this header
// refers to the same entity instead of getting its own internal-linkage copy.
inline constexpr ndd::idInt EXHAUSTED_DOC_ID = std::numeric_limits<ndd::idInt>::max();
#pragma pack(push, 1)
// Per-term metadata stored under the reserved metadata key for that term.
// Because this struct is a stored format, its size and copyability are pinned
// at compile time below: reordering or widening a field fails the build instead
// of silently changing what is read back from existing data.
struct PostingListHeader {
    // Entry counters for the term. NOTE(review): presumably "entries" counts all
    // stored postings and "live" excludes deleted ones - confirm against the writer.
    uint32_t nr_entries = 0;
    uint32_t nr_live_entries = 0;
    // Maximum value recorded for the term.
    float max_value = 0.0f;
};
static_assert(sizeof(PostingListHeader) == 12,
              "PostingListHeader is a stored format; its size must stay 12 bytes");
static_assert(std::is_trivially_copyable_v<PostingListHeader>,
              "PostingListHeader must remain byte-copyable");
// Header that prefixes each on-disk (term_id, block_nr) payload.
// The layout is part of the on-disk format, so its size and copyability are
// pinned at compile time below.
struct BlockHeader {
    // Per-block entry counters; both are 16-bit, so a block holds at most 65535 entries.
    uint16_t nr_entries = 0;
    uint16_t nr_live_in_block = 0;
    // Maximum value recorded for this block.
    float max_value = 0.0f;
};
static_assert(sizeof(BlockHeader) == 8,
              "BlockHeader is an on-disk format; its size must stay 8 bytes");
static_assert(std::is_trivially_copyable_v<BlockHeader>,
              "BlockHeader must remain byte-copyable");
// Single metadata row stored at packPostingKey(kMetadataTermId, kSuperBlockBlockNr).
// Checked on initialize() to reject incompatible databases.
// The row is a stored format, so its one-byte size and copyability are pinned
// at compile time below.
struct SuperBlock {
    // Version tag of the stored format; defaults to 0.
    uint8_t format_version = 0;
};
static_assert(sizeof(SuperBlock) == 1,
              "SuperBlock is a stored format; its size must stay 1 byte");
static_assert(std::is_trivially_copyable_v<SuperBlock>,
              "SuperBlock must remain byte-copyable");
#pragma pack(pop)
// Fully decoded posting entry used only in update/delete paths.
struct PostingListEntry {
ndd::idInt doc_id;
float value;
PostingListEntry() : doc_id(0), value(0.0f) {}
PostingListEntry(ndd::idInt id, float val) : doc_id(id), value(val) {}
};
struct ScoredDoc {
ndd::idInt doc_id;
float score;
ScoredDoc(ndd::idInt id, float s) : doc_id(id), score(s) {}
bool operator<(const ScoredDoc& other) const {
// Reverse ordering so std::priority_queue behaves like a min-heap on score.
return score > other.score;
}
};
// MDBX-backed sparse inverted index. Search walks zero-copy block views directly,
// while update/delete paths decode a block into PostingListEntry objects, merge,
// and write the block back.
//
// This header carries the declarations plus a handful of small inline helpers
// (key packing, doc-id <-> block arithmetic, value decoding); everything else is
// defined out of line.
class InvertedIndex {
public:
    // Arguments correspond to the env_, vocab_size_, index_id_ and sparse_model_
    // members below. The MDBX_env is held as a raw pointer and the defaulted
    // destructor does not touch it.
    InvertedIndex(MDBX_env* env,
                  size_t vocab_size,
                  const std::string& index_id,
                  ndd::SparseScoringModel sparse_model =
                          ndd::SparseScoringModel::DEFAULT);
    ~InvertedIndex() = default;

    // Returns false on failure. Per the SuperBlock comment in this file, this is
    // where the stored format version is checked to reject incompatible databases.
    bool initialize();

    // Mutating entry points. Both run inside the caller-supplied MDBX transaction
    // and report success through the bool return.
    bool addDocumentsBatch(MDBX_txn* txn,
                           const std::vector<std::pair<ndd::idInt, SparseVector>>& docs);
    bool removeDocument(MDBX_txn* txn, ndd::idInt doc_id, const SparseVector& vec);

    size_t getTermCount() const;
    size_t getVocabSize() const;

    // Top-k search returning (doc_id, score) pairs; `filter` is optional.
    // The second overload additionally takes the total document count.
    // NOTE(review): presumably total_nr_docs feeds get_IDF() - confirm.
    // NOTE: a literal 0 as the third argument converts equally well to size_t and
    // to a null pointer, so such a call is ambiguous; pass nullptr or a typed value.
    std::vector<std::pair<ndd::idInt, float>> search(const SparseVector& query,
                                                     size_t k,
                                                     const ndd::RoaringBitmap* filter = nullptr);
    std::vector<std::pair<ndd::idInt, float>> search(const SparseVector& query,
                                                     size_t k,
                                                     size_t total_nr_docs,
                                                     const ndd::RoaringBitmap* filter = nullptr);

private:
    friend class InvertedIndexTestPeer;

    MDBX_env* env_;
    MDBX_dbi blocked_term_postings_dbi_;
    size_t vocab_size_;
    std::string index_id_;
    ndd::SparseScoringModel sparse_model_;

    // Cached per-term max values loaded from posting-list metadata. Search uses this
    // to skip absent terms quickly and to compute pruning bounds.
    std::unordered_map<uint32_t, float> term_info_;

    // NOTE(review): presumably guards term_info_ / index state - confirm in the .cpp.
    mutable std::shared_mutex mutex_;

    // Position of a document inside its block.
    using BlockOffset = uint16_t;

    // Documents per block. This is the maximum BlockOffset value (65535), so
    // doc_id % kBlockCapacity yields offsets in [0, 65534].
    static constexpr uint32_t kBlockCapacity = std::numeric_limits<BlockOffset>::max();

    // Sentinel IDs reserved for metadata rows in blocked_term_postings.
    static constexpr uint32_t kMetadataTermId = std::numeric_limits<uint32_t>::max();
    static constexpr uint32_t kMetadataBlockNr = std::numeric_limits<uint32_t>::max();
    static constexpr uint32_t kSuperBlockBlockNr = 0;

    // 8-bit value codec, declared here and defined out of line.
    static inline uint8_t quantize(float val, float max_val);
    static inline float dequantize(uint8_t val, float max_val);

    // True when a and b differ by at most settings::NEAR_ZERO.
    static inline bool nearEqual(float a, float b) {
        return std::fabs(a - b) <= settings::NEAR_ZERO;
    }

    // Key packing is term_id in high 32 bits and block_nr in low 32 bits.
    // This keeps all keys for a term contiguous so range scans can seek to
    // [pack(term, 0), pack(term, UINT32_MAX)] efficiently.
    static inline uint64_t packPostingKey(uint32_t term_id, uint32_t block_nr) {
        return (static_cast<uint64_t>(term_id) << 32) | static_cast<uint64_t>(block_nr);
    }

    // Inverse of packPostingKey: high 32 bits.
    static inline uint32_t unpackTermId(uint64_t packed_key) {
        return static_cast<uint32_t>(packed_key >> 32);
    }

    // Inverse of packPostingKey: low 32 bits.
    static inline uint32_t unpackBlockNr(uint64_t packed_key) {
        return static_cast<uint32_t>(packed_key & 0xFFFFFFFFULL);
    }

    // Block that holds doc_id (integer division by kBlockCapacity).
    static inline uint32_t docToBlockNr(ndd::idInt doc_id) {
        return static_cast<uint32_t>(doc_id / kBlockCapacity);
    }

    // Offset of doc_id within its block (remainder modulo kBlockCapacity).
    static inline BlockOffset docToBlockOffset(ndd::idInt doc_id) {
        return static_cast<BlockOffset>(doc_id % kBlockCapacity);
    }

    // Rebuilds a doc id from (block_nr, block_offset). The product is computed in
    // 64 bits before narrowing to ndd::idInt.
    static inline ndd::idInt blockOffsetToDocId(uint32_t block_nr, BlockOffset block_offset) {
        const uint64_t base = static_cast<uint64_t>(block_nr)
                              * static_cast<uint64_t>(kBlockCapacity);
        return static_cast<ndd::idInt>(base + static_cast<uint64_t>(block_offset));
    }

    // Zero-copy view into a block payload owned by MDBX. Pointers remain valid only while
    // the surrounding transaction/cursor stays alive and on the same record.
    // Fields are default-initialised so a view that was never filled holds null
    // pointers and a zero count instead of indeterminate values.
    struct BlockView {
        const BlockOffset* doc_offsets = nullptr;
        const void* values = nullptr;
        uint32_t count = 0;
        uint8_t value_bits = 0;
        float max_value = 0.0f;
    };

    // Cursor-backed iterator over one term. It only keeps the current block in memory and
    // advances across MDBX records as search/pruning consumes entries.
    // All state is default-initialised, so an iterator on which init() has not yet
    // run has null pointers and reports itself as exhausted.
    struct PostingListIterator {
        uint32_t term_id = 0;
        float term_weight = 0.0f;
        float global_max = 0.0f;
        const InvertedIndex* index = nullptr;

        // Cursor positioned somewhere within this term's contiguous MDBX key range.
        MDBX_cursor* cursor = nullptr;
        uint32_t current_block_nr = 0;

        // Zero-copy pointers for the current block.
        const BlockOffset* doc_offsets = nullptr;
        const void* values_ptr = nullptr;
        // NOTE(review): presumably the entry count of the current block - confirm.
        uint32_t data_size = 0;
        // 32 means values_ptr addresses floats; any other value is treated as
        // one uint8_t per entry (see valueAt / isLiveAt).
        uint8_t value_bits = 0;
        float max_value = 0.0f;

        uint32_t current_entry_idx = 0;
        ndd::idInt current_doc_id = EXHAUSTED_DOC_ID;

        // This is maintained incrementally from posting-list metadata,
        // so pruning can estimate list length without scanning all blocks.
        uint32_t remaining_entries = 0;

#ifdef NDD_INV_IDX_PRUNE_DEBUG
        uint32_t initial_entries = 0;
        uint32_t pruned_entries = 0;
#endif  // NDD_INV_IDX_PRUNE_DEBUG

        void init(MDBX_cursor* cursor,
                  uint32_t term_id,
                  float term_weight,
                  float global_max,
                  uint32_t total_entries,
                  const InvertedIndex* index);

        // Decoded value of entry `idx` in the current block: read directly when the
        // block stores floats, otherwise dequantised against the block's max_value.
        // No bounds check is performed on idx.
        // NOTE(review): the float path assumes the value array is suitably aligned
        // for float loads - confirm against the block encoder.
        inline float valueAt(uint32_t idx) const {
            if (value_bits == 32) {
                return static_cast<const float*>(values_ptr)[idx];
            }
            return dequantize(static_cast<const uint8_t*>(values_ptr)[idx], max_value);
        }

        // An entry counts as live when its stored value is strictly positive.
        inline bool isLiveAt(uint32_t idx) const {
            if (value_bits == 32) {
                return static_cast<const float*>(values_ptr)[idx] > 0.0f;
            }
            return static_cast<const uint8_t*>(values_ptr)[idx] > 0;
        }

        inline float currentValue() const {
            return valueAt(current_entry_idx);
        }

        void advanceToNextLive();
        void next();
        void advance(ndd::idInt target_doc_id);

        // Largest contribution this term can make to any document's score.
        float upperBound() const {
            return global_max * term_weight;
        }

        // Remaining-entry estimate; 0 once the iterator is exhausted.
        uint32_t remainingEntries() const {
            if (current_doc_id == EXHAUSTED_DOC_ID) return 0;
            return remaining_entries;
        }

        bool loadNextBlock();
        bool loadFirstBlock();
        bool parseCurrentKV(const MDBX_val& key, const MDBX_val& data);

        // Saturating decrement of remaining_entries (never wraps below zero).
        inline void consumeEntries(uint32_t count) {
            // Pruning relies on remaining_entries being conservative and monotonic.
            if (count >= remaining_entries) {
                remaining_entries = 0;
            } else {
                remaining_entries -= count;
            }
        }

        // Doc id of offset 0 in the current block.
        inline ndd::idInt currentBlockBaseDocId() const {
            return blockOffsetToDocId(current_block_nr, 0);
        }

        // Doc id of entry `idx` in the current block. No bounds check on idx.
        inline ndd::idInt docIdAt(uint32_t idx) const {
            return blockOffsetToDocId(current_block_nr, doc_offsets[idx]);
        }

    private:
        // Maps an 8-bit code linearly onto [0, max_val]; a max_val at or below
        // settings::NEAR_ZERO decodes every code to 0.
        static inline float dequantize(uint8_t val, float max_val) {
            if (max_val <= settings::NEAR_ZERO) return 0.0f;
            return static_cast<float>(val) * (max_val / UINT8_MAX);
        }
    };

    // Scan helpers, defined out of line.
    size_t findDocIdSIMD(const uint32_t* doc_ids,
                         size_t size,
                         size_t start_idx,
                         uint32_t target) const;
    size_t findNextLiveSIMD(const uint8_t* values,
                            size_t size,
                            size_t start_idx) const;

    template <bool StoreFloats>
    static bool accumulateBatchScores(PostingListIterator* it,
                                      ndd::idInt batch_start,
                                      uint32_t batch_end_block_nr,
                                      BlockOffset batch_end_block_offset,
                                      float* scores_buf,
                                      float term_weight);

    // Per-term PostingListHeader access. out_found is optional.
    PostingListHeader readPostingListHeader(MDBX_txn* txn,
                                            uint32_t term_id,
                                            bool* out_found = nullptr) const;
    bool writePostingListHeader(MDBX_txn* txn,
                                uint32_t term_id,
                                const PostingListHeader& header);
    bool deletePostingListHeader(MDBX_txn* txn, uint32_t term_id);

    // Block-level load/save/delete used by the update and delete paths, which work
    // on decoded PostingListEntry vectors.
    bool loadBlockEntries(MDBX_txn* txn,
                          uint32_t term_id,
                          uint32_t block_nr,
                          std::vector<PostingListEntry>* entries,
                          uint32_t* out_live_in_block,
                          float* out_max_value,
                          bool* out_found) const;
    bool saveBlockEntries(MDBX_txn* txn,
                          uint32_t term_id,
                          uint32_t block_nr,
                          const std::vector<PostingListEntry>& entries,
                          uint32_t live_in_block,
                          float max_val);
    bool deleteBlock(MDBX_txn* txn, uint32_t term_id, uint32_t block_nr);

    // Fills *out_view from a raw MDBX value; returns false on failure.
    bool parseBlockViewFromValue(const MDBX_val& data,
                                 uint32_t block_nr,
                                 BlockView* out_view) const;

    // Invokes callback(block_nr, data) for the blocks of term_id.
    // NOTE(review): presumably a false return from the callback stops the walk - confirm.
    bool iterateTermBlocks(
            MDBX_txn* txn,
            uint32_t term_id,
            const std::function<bool(uint32_t block_nr, const MDBX_val& data)>& callback) const;

    float recomputeGlobalMaxFromBlocks(MDBX_txn* txn, uint32_t term_id) const;

    static void applyHeaderDelta(PostingListHeader& header,
                                 int64_t total_delta,
                                 int64_t live_delta);
    static float get_IDF(size_t total_nr_docs, size_t nr_live_docs_with_term);

    bool loadTermInfo();

    // SuperBlock (format-version row) access and validation.
    bool readSuperBlock(MDBX_txn* txn, SuperBlock* out, bool* out_found) const;
    bool writeSuperBlock(MDBX_txn* txn, const SuperBlock& sb);
    bool validateSuperBlock(MDBX_txn* txn);

    // Implementations behind the public addDocumentsBatch / removeDocument.
    bool addDocumentsBatchInternal(
            MDBX_txn* txn,
            const std::vector<std::pair<ndd::idInt, SparseVector>>& docs);
    bool removeDocumentInternal(MDBX_txn* txn,
                                ndd::idInt doc_id,
                                const SparseVector& vec);

    void pruneLongest(std::vector<PostingListIterator*>& iters, float min_score);
};
// Debug-statistics printers for the sparse search path and the sparse update path.
// Only the declarations appear in this header; what gets printed is decided by the
// out-of-line definitions (presumably counters gathered in debug builds - confirm).
void printSparseSearchDebugStats();
void printSparseUpdateDebugStats();
} // namespace ndd