-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathgenerate-bucket-parameters.mjs
More file actions
259 lines (221 loc) · 7.51 KB
/
generate-bucket-parameters.mjs
File metadata and controls
259 lines (221 loc) · 7.51 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
import { createHash, randomBytes } from 'crypto';
import { MongoClient, ObjectId, Binary, Long, UUID } from 'mongodb';
// ----------------------------------------------
// Config (edit these)
// ----------------------------------------------
// Connection target; both have env-var overrides for CI / local runs.
const MONGO_URI = process.env.MONGO_URI ?? 'mongodb://localhost:27017';
const DB_NAME = process.env.MONGO_DB ?? 'powersync';
// Value written as `key.g` on every generated document (and used to count
// pre-existing docs when resuming).
const GROUP_ID = 1;
const TOTAL_BUCKET_PARAMETER_DOCS = 50_000_000; // total docs for bucket_parameters
const PARAMETERS_PER_DOC = 1; // entries in each doc's bucket_parameters array
const PARAM_VALUE_SIZE_BYTES = 120; // payload size for bucket_parameters entries
const LOOKUP_SIZE_BYTES = 16; // byte length of the binary `lookup` field
// Each synthetic source row is assigned between MIN and MAX ops (docs) total.
const MIN_OPS_PER_SOURCE_KEY = 1;
const MAX_OPS_PER_SOURCE_KEY = 20;
// How many source rows are kept "active" (eligible for new ops) at once.
const ACTIVE_ROW_POOL_SIZE = 10_000;
// insertMany batching thresholds: flush at whichever limit is hit first.
const BATCH_MAX_DOCS = 5_000;
const BATCH_MAX_BYTES = 32 * 1024 * 1024; // 32MB approx client-side limit
const INSERT_LOG_INTERVAL_MS = 1_000; // min gap between progress log lines
// ----------------------------------------------
// Helpers
// ----------------------------------------------
/**
 * Uniform random integer in the inclusive range [min, max].
 * @param {number} min - lower bound (inclusive)
 * @param {number} max - upper bound (inclusive)
 * @returns {number} integer in [min, max]
 */
function randomInt(min, max) {
  const span = max - min + 1;
  return Math.floor(Math.random() * span) + min;
}
/**
 * Build the binary `lookup` value: a LOOKUP_SIZE_BYTES buffer whose first
 * 8 bytes are the two 32-bit halves packed big-endian (high word first);
 * the remaining bytes stay zeroed.
 * @param {number} high - upper 32 bits (coerced to uint32)
 * @param {number} low - lower 32 bits (coerced to uint32)
 * @returns {Buffer} fixed-size lookup key buffer
 */
function buildLookupBuffer(high, low) {
  const out = Buffer.alloc(LOOKUP_SIZE_BYTES);
  for (const [offset, word] of [[0, high], [4, low]]) {
    out.writeUInt32BE(word >>> 0, offset);
  }
  return out;
}
// Manages a pool of synthetic source-row ids, each carrying a random budget of
// "ops" (documents still to be generated for that row). The pool keeps at most
// ACTIVE_ROW_POOL_SIZE rows active at a time and lazily mints new row ids
// (`${prefix}1`, `${prefix}2`, ...) as budgets are exhausted, until `totalOps`
// ops have been handed out in total.
function createRowIdPool(totalOps, prefix) {
  let nextKey = 1;
  // Ops not yet assigned to any row id (drained by fillPool).
  let remainingOpsToAssign = totalOps;
  // Ops assigned but not yet consumed via allocateDocsForEntry.
  let remainingOps = totalOps;
  // Active entries; each entry caches its own index in this array so it can
  // be removed in O(1) via swap-with-last.
  const active = [];
  const addEntry = (remaining) => {
    const entry = { rowId: `${prefix}${nextKey}`, remaining, index: active.length };
    active.push(entry);
    nextKey += 1;
    return entry;
  };
  // O(1) removal: pop the last entry and, if it wasn't the one being removed,
  // move it into the removed entry's slot (updating its cached index).
  const removeEntry = (entry) => {
    const idx = entry.index;
    const last = active.pop();
    if (last && last !== entry) {
      active[idx] = last;
      last.index = idx;
    }
  };
  // Top up the active pool with fresh row ids, each given a random op budget
  // (clamped so we never assign more than the global total).
  const fillPool = () => {
    while (active.length < ACTIVE_ROW_POOL_SIZE && remainingOpsToAssign > 0) {
      const remaining = Math.min(randomInt(MIN_OPS_PER_SOURCE_KEY, MAX_OPS_PER_SOURCE_KEY), remainingOpsToAssign);
      remainingOpsToAssign -= remaining;
      addEntry(remaining);
    }
  };
  // Pick up to maxKeys distinct active entries uniformly at random.
  // Distinctness is by pool index; rejection-sampling is cheap because
  // count <= active.length.
  const pickDistinctEntries = (maxKeys) => {
    fillPool();
    if (active.length === 0) {
      return [];
    }
    const count = Math.min(maxKeys, active.length);
    const picked = [];
    const used = new Set();
    while (picked.length < count) {
      const idx = randomInt(0, active.length - 1);
      if (used.has(idx)) {
        continue;
      }
      used.add(idx);
      picked.push(active[idx]);
    }
    return picked;
  };
  // Consume 1..min(maxDocs, entry.remaining) ops from an entry, retiring the
  // entry from the pool once its budget hits zero. Returns the count consumed.
  const allocateDocsForEntry = (entry, maxDocs) => {
    const count = randomInt(1, Math.min(maxDocs, entry.remaining));
    entry.remaining -= count;
    remainingOps -= count;
    if (entry.remaining === 0) {
      removeEntry(entry);
    }
    return count;
  };
  return {
    pickDistinctEntries,
    allocateDocsForEntry,
    // Ops still available across the whole pool (assigned + unassigned).
    remaining: () => remainingOps
  };
}
/**
 * Deterministically derive a UUID from an arbitrary string: take the first
 * 16 bytes of its SHA-1 digest, then stamp the version and variant bits so
 * the result is shaped like an RFC 4122 version-5 UUID. The same input
 * always yields the same UUID.
 * @param {string} value - source string (here: a synthetic row id)
 * @returns {UUID} mongodb UUID wrapping the 16 derived bytes
 */
function uuidFromString(value) {
  const digest = createHash('sha1').update(value).digest();
  const raw = Buffer.from(digest.subarray(0, 16));
  raw[6] = (raw[6] & 0x0f) | 0x50; // version nibble -> 5
  raw[8] = (raw[8] & 0x3f) | 0x80; // variant bits -> RFC 4122
  return new UUID(raw);
}
/**
 * Throttled progress logger for bulk inserts. Accumulates inserted-doc
 * counts and prints a docs/sec rate line at most once per
 * INSERT_LOG_INTERVAL_MS; flush() forces a final line for any remainder.
 * @param {string} label - prefix for each log line
 * @param {number} totalDocs - grand total, shown as the denominator
 * @returns {{record(docs: number): void, flush(): void}}
 */
function createInsertLogger(label, totalDocs) {
  let windowStart = Date.now();
  let windowDocs = 0;
  let totalSoFar = 0;
  const emit = (now, isFinal = false) => {
    if (windowDocs === 0) {
      // Nothing inserted since the last line; just restart the window.
      windowStart = now;
      return;
    }
    // Floor at 1ms to avoid divide-by-zero on back-to-back calls.
    const elapsedSec = Math.max((now - windowStart) / 1000, 0.001);
    const rate = windowDocs / elapsedSec;
    const suffix = isFinal ? ' [final]' : '';
    console.log(
      `${label}: ${rate.toFixed(0)} docs/s (${totalSoFar.toLocaleString()}/${totalDocs.toLocaleString()})` +
      `${suffix}`
    );
    windowDocs = 0;
    windowStart = now;
  };
  return {
    record(docs) {
      windowDocs += docs;
      totalSoFar += docs;
      const now = Date.now();
      if (now - windowStart >= INSERT_LOG_INTERVAL_MS) {
        emit(now);
      }
    },
    flush() {
      emit(Date.now(), true);
    }
  };
}
// Entry point: connects to MongoDB and bulk-generates TOTAL_BUCKET_PARAMETER_DOCS
// synthetic documents in the `bucket_parameters` collection. Documents are
// grouped under shared binary `lookup` values (1-5 distinct source rows per
// lookup, 1-10 docs per row), batched into unordered insertMany calls, with
// throttled progress logging throughout.
async function main() {
  const client = new MongoClient(MONGO_URI, { maxPoolSize: 10 });
  await client.connect();
  const db = client.db(DB_NAME);
  const bucketParameters = db.collection('bucket_parameters');
  // Random run id keeps row ids from colliding with earlier runs.
  const runId = randomBytes(6).toString('hex');
  // Existing docs in this group are used to pick a non-overlapping op_id start.
  const existingCount = await bucketParameters.countDocuments({ 'key.g': GROUP_ID });
  // Random high word shared by every lookup buffer in this run.
  const lookupHigh = randomBytes(4).readUInt32BE(0);
  await bucketParameters.createIndex(
    {
      'key.g': 1,
      lookup: 1,
      _id: 1
    },
    { name: 'lookup1' }
  );
  // Each generated doc references one of four fake source tables at random.
  const sourceTableIds = [new ObjectId(), new ObjectId(), new ObjectId(), new ObjectId()];
  const bucketParametersRowPool = createRowIdPool(TOTAL_BUCKET_PARAMETER_DOCS, `row_${runId}_`);
  // Stride of 2 past existing docs leaves a gap between runs — assumes earlier
  // runs used the same scheme. NOTE(review): confirm this matches prior data.
  let opId = existingCount * 2 + 1;
  console.log(`Generating bucket_parameters (${TOTAL_BUCKET_PARAMETER_DOCS.toLocaleString()} docs)...`);
  console.log(`Run id: ${runId}, op_id start: ${opId.toLocaleString()}, lookup_hi: ${lookupHigh}`);
  const bucketParametersLogger = createInsertLogger('bucket_parameters insert', TOTAL_BUCKET_PARAMETER_DOCS);
  let paramBatch = [];
  let paramBatchBytes = 0; // rough client-side estimate, not exact BSON size
  let paramWritten = 0;
  let lookupId = 1; // low word of the lookup buffer, incremented per group
  while (paramWritten < TOTAL_BUCKET_PARAMETER_DOCS) {
    let remainingForLookup = TOTAL_BUCKET_PARAMETER_DOCS - paramWritten;
    // 1-5 distinct source rows share each lookup value.
    let keyCount = randomInt(1, 5);
    if (keyCount > remainingForLookup) {
      keyCount = remainingForLookup;
    }
    const entries = bucketParametersRowPool.pickDistinctEntries(keyCount);
    if (entries.length === 0) {
      throw new Error('Row id pool exhausted unexpectedly');
    }
    const currentLookupId = lookupId;
    const lookupBuffer = buildLookupBuffer(lookupHigh, currentLookupId);
    // Shared Binary instance is safe: the buffer is never mutated after build.
    const lookup = new Binary(lookupBuffer);
    lookupId += 1;
    for (let i = 0; i < entries.length && paramWritten < TOTAL_BUCKET_PARAMETER_DOCS; i++) {
      const entry = entries[i];
      // Reserve at least one doc for each of the rows still to be processed
      // in this lookup group, so none of them ends up with zero.
      const keysLeft = entries.length - i - 1;
      remainingForLookup = TOTAL_BUCKET_PARAMETER_DOCS - paramWritten;
      const maxForKey = Math.min(10, remainingForLookup - keysLeft);
      if (maxForKey <= 0) {
        break;
      }
      const docsForKey = bucketParametersRowPool.allocateDocsForEntry(entry, maxForKey);
      // Deterministic source key: same rowId always maps to the same UUID.
      const sourceKey = uuidFromString(entry.rowId);
      const sourceTableId = sourceTableIds[randomInt(0, sourceTableIds.length - 1)];
      for (let d = 0; d < docsForKey && paramWritten < TOTAL_BUCKET_PARAMETER_DOCS; d++) {
        const params = [];
        for (let p = 0; p < PARAMETERS_PER_DOC; p++) {
          params.push({
            value: `test_${opId}_${p}`
          });
        }
        const doc = {
          _id: Long.fromNumber(opId),
          key: {
            g: GROUP_ID,
            t: sourceTableId,
            k: sourceKey
          },
          lookup,
          bucket_parameters: params
        };
        paramBatch.push(doc);
        // Approximate per-doc size: payload plus ~200 bytes of envelope.
        paramBatchBytes += PARAM_VALUE_SIZE_BYTES * PARAMETERS_PER_DOC + 200;
        paramWritten += 1;
        opId += 1;
        // Flush when either batching threshold is reached.
        if (paramBatch.length >= BATCH_MAX_DOCS || paramBatchBytes >= BATCH_MAX_BYTES) {
          await bucketParameters.insertMany(paramBatch, { ordered: false });
          bucketParametersLogger.record(paramBatch.length);
          paramBatch = [];
          paramBatchBytes = 0;
        }
      }
    }
  }
  // Flush any partial final batch.
  if (paramBatch.length > 0) {
    await bucketParameters.insertMany(paramBatch, { ordered: false });
    bucketParametersLogger.record(paramBatch.length);
  }
  bucketParametersLogger.flush();
  console.log(`bucket_parameters: done (${paramWritten.toLocaleString()} docs)`);
  await client.close();
}
// Entry point: run the generator; report the error and exit non-zero on failure.
// (Top-level await is valid in an .mjs module.)
try {
  await main();
} catch (err) {
  console.error(err);
  process.exit(1);
}