Skip to content

Commit 0e40733

Browse files
committed
Fix local get after publish and cache reliability
- Add pending storage cache fallback in get-command so gets work right after publish - Add merkle root index in pending-storage-service for fast operationId lookups - Clean up merkle root index entries on cache removal to prevent memory leak - Add retry logic in publish-finalization-command for reading cached assertion data - Reduce cache retry window from 100s to 25s (5 retries x 5s) for faster failure detection - Add getPublishOperationIdByUal repository method for cache lookups Made-with: Cursor
1 parent 4e25e00 commit 0e40733

6 files changed

Lines changed: 124 additions & 15 deletions

File tree

src/commands/protocols/get/sender/get-command.js

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ class GetCommand extends Command {
3030
this.cryptoService = ctx.cryptoService;
3131
this.messagingService = ctx.messagingService;
3232
this.tripleStoreModuleManager = ctx.tripleStoreModuleManager;
33+
this.pendingStorageService = ctx.pendingStorageService;
3334
}
3435

3536
async handleError(operationId, blockchain, errorMessage, errorType) {
@@ -262,6 +263,49 @@ class GetCommand extends Command {
262263
}
263264
this.logger.debug(`Could not find asset with UAL: ${ual} locally`);
264265

266+
try {
267+
const latestMerkleRoot =
268+
await this.blockchainModuleManager.getKnowledgeCollectionLatestMerkleRoot(
269+
blockchain,
270+
contract,
271+
knowledgeCollectionId,
272+
);
273+
if (latestMerkleRoot) {
274+
const publishOpId =
275+
this.pendingStorageService.getOperationIdByMerkleRoot(latestMerkleRoot);
276+
if (publishOpId) {
277+
const cachedAssertion = await this.pendingStorageService.getCachedDataset(
278+
publishOpId,
279+
);
280+
if (
281+
cachedAssertion &&
282+
(cachedAssertion.public?.length || cachedAssertion.private?.length)
283+
) {
284+
const cachedResponseData = { assertion: cachedAssertion };
285+
286+
this.logger.info(
287+
`Serving asset ${ual} from pending storage cache (merkleRoot: ${latestMerkleRoot})`,
288+
);
289+
await this.operationService.markOperationAsCompleted(
290+
operationId,
291+
blockchain,
292+
cachedResponseData,
293+
[
294+
OPERATION_ID_STATUS.GET.GET_LOCAL_END,
295+
OPERATION_ID_STATUS.GET.GET_END,
296+
OPERATION_ID_STATUS.COMPLETED,
297+
],
298+
);
299+
return Command.empty();
300+
}
301+
}
302+
}
303+
} catch (cacheErr) {
304+
this.logger.debug(
305+
`Pending storage cache fallback failed for ${ual}: ${cacheErr.message}`,
306+
);
307+
}
308+
265309
await this.operationIdService.emitChangeEvent(
266310
OPERATION_ID_STATUS.GET.GET_LOCAL_END,
267311
operationId,

src/commands/protocols/publish/publish-finalization-command.js

Lines changed: 46 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -137,14 +137,42 @@ class PublishFinalizationCommand extends Command {
137137
const node = { id: publisherPeerId, protocol: networkProtocols[0] };
138138

139139
const message = { ual, publishOperationId, blockchain, operationId };
140-
// TODO: Add retry logic maybe
141-
const response = await this.messagingService.sendProtocolMessage(
142-
node,
143-
operationId,
144-
message,
145-
NETWORK_MESSAGE_TYPES.REQUESTS.PROTOCOL_REQUEST,
146-
NETWORK_MESSAGE_TIMEOUT_MILLS.FINALITY.REQUEST,
147-
);
140+
141+
const maxFinalityAttempts = 3;
142+
const backoffDelays = [0, 5_000, 10_000];
143+
let response;
144+
let lastError;
145+
146+
for (let attempt = 0; attempt < maxFinalityAttempts; attempt += 1) {
147+
if (backoffDelays[attempt] > 0) {
148+
// eslint-disable-next-line no-await-in-loop
149+
await new Promise((r) => {
150+
setTimeout(r, backoffDelays[attempt]);
151+
});
152+
}
153+
try {
154+
// eslint-disable-next-line no-await-in-loop
155+
response = await this.messagingService.sendProtocolMessage(
156+
node,
157+
operationId,
158+
message,
159+
NETWORK_MESSAGE_TYPES.REQUESTS.PROTOCOL_REQUEST,
160+
NETWORK_MESSAGE_TIMEOUT_MILLS.FINALITY.REQUEST,
161+
);
162+
lastError = null;
163+
break;
164+
} catch (err) {
165+
lastError = err;
166+
this.logger.warn(
167+
`Finality request to publisher ${publisherPeerId} failed ` +
168+
`(attempt ${attempt + 1}/${maxFinalityAttempts}): ${err.message}`,
169+
);
170+
}
171+
}
172+
173+
if (lastError) {
174+
throw lastError;
175+
}
148176

149177
await this.messagingService.handleProtocolResponse(
150178
response,
@@ -196,16 +224,21 @@ class PublishFinalizationCommand extends Command {
196224
return cachedData;
197225
} catch (error) {
198226
attempt += 1;
199-
// eslint-disable-next-line no-await-in-loop
200-
await new Promise((resolve) => {
201-
setTimeout(resolve, RETRY_DELAY_READ_CACHED_PUBLISH_DATA);
202-
});
227+
if (attempt < MAX_RETRIES_READ_CACHED_PUBLISH_DATA) {
228+
this.logger.debug(
229+
`[Cache] Read attempt ${attempt}/${MAX_RETRIES_READ_CACHED_PUBLISH_DATA} ` +
230+
`failed for publishOperationId: ${publishOperationId}, retrying in ${RETRY_DELAY_READ_CACHED_PUBLISH_DATA}ms...`,
231+
);
232+
// eslint-disable-next-line no-await-in-loop
233+
await new Promise((resolve) => {
234+
setTimeout(resolve, RETRY_DELAY_READ_CACHED_PUBLISH_DATA);
235+
});
236+
}
203237
}
204238
}
205239
this.logger.warn(
206240
`[Cache] Exhausted retries reading cached publish data (publishOperationId: ${publishOperationId}, path: ${datasetPath}).`,
207241
);
208-
// TODO: Mark this operation as failed
209242
throw new Error('Failed to read cached publish data');
210243
}
211244

src/constants/constants.js

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1067,8 +1067,8 @@ export const LOCAL_INSERT_FOR_ASSET_SYNC_RETRY_DELAY = 1000;
10671067
export const LOCAL_INSERT_FOR_CURATED_PARANET_MAX_ATTEMPTS = 5;
10681068
export const LOCAL_INSERT_FOR_CURATED_PARANET_RETRY_DELAY = 1000;
10691069

1070-
export const MAX_RETRIES_READ_CACHED_PUBLISH_DATA = 10;
1071-
export const RETRY_DELAY_READ_CACHED_PUBLISH_DATA = 10 * 1000;
1070+
export const MAX_RETRIES_READ_CACHED_PUBLISH_DATA = 5;
1071+
export const RETRY_DELAY_READ_CACHED_PUBLISH_DATA = 5 * 1000;
10721072

10731073
export const TRIPLE_STORE_REPOSITORY = {
10741074
DKG: 'dkg',

src/modules/repository/implementation/sequelize/repositories/finality-status-repository.js

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,15 @@ class FinalityStatusRepository {
1414
async saveFinalityAck(operationId, ual, peerId, options) {
1515
return this.model.upsert({ operationId, ual, peerId }, options);
1616
}
17+
18+
async getPublishOperationIdByUal(ual, options) {
19+
const record = await this.model.findOne({
20+
where: { ual },
21+
attributes: ['operationId'],
22+
...options,
23+
});
24+
return record?.operationId ?? null;
25+
}
1726
}
1827

1928
export default FinalityStatusRepository;

src/modules/repository/repository-module-manager.js

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -467,6 +467,10 @@ class RepositoryModuleManager extends BaseModuleManager {
467467
return this.getRepository('finality_status').getFinalityAcksCount(ual, options);
468468
}
469469

470+
async getPublishOperationIdByUal(ual, options = {}) {
471+
return this.getRepository('finality_status').getPublishOperationIdByUal(ual, options);
472+
}
473+
470474
async getLatestRandomSamplingChallengeRecordForBlockchainId(blockchainId, limit = 1) {
471475
return this.getRepository(
472476
'random_sampling_challenge',

src/service/pending-storage-service.js

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,13 +10,16 @@ class PendingStorageService {
1010
this.fileService = ctx.fileService;
1111
this.repositoryModuleManager = ctx.repositoryModuleManager; // this is not used
1212
this.tripleStoreService = ctx.tripleStoreService; // this is not used
13+
this._merkleRootIndex = new Map();
1314
}
1415

1516
async cacheDataset(operationId, datasetRoot, dataset, remotePeerId) {
1617
this.logger.debug(
1718
`Caching ${datasetRoot} dataset root, operation id: ${operationId} in file in pending storage`,
1819
);
1920

21+
this._merkleRootIndex.set(datasetRoot, operationId);
22+
2023
await this.fileService.writeContentsToFile(
2124
this.fileService.getPendingStorageCachePath(),
2225
operationId,
@@ -28,6 +31,10 @@ class PendingStorageService {
2831
);
2932
}
3033

34+
getOperationIdByMerkleRoot(merkleRoot) {
35+
return this._merkleRootIndex.get(merkleRoot) ?? null;
36+
}
37+
3138
async getCachedDataset(operationId) {
3239
this.logger.debug(`Retrieving cached dataset for ${operationId} from pending storage`);
3340

@@ -94,6 +101,7 @@ class PendingStorageService {
94101

95102
const createdDate = fileStats.mtime;
96103
if (createdDate.getTime() + expirationTimeMillis < now) {
104+
this._removeMerkleRootIndexEntry(file);
97105
await this.fileService.removeFile(filePath);
98106
this.logger.debug(`Deleted expired file: ${filePath}`);
99107
return true;
@@ -155,6 +163,8 @@ class PendingStorageService {
155163
`Removing cached assertion for ual: ${ual} operation id: ${operationId} from file in ${repository} pending storage`,
156164
);
157165

166+
this._removeMerkleRootIndexEntry(operationId);
167+
158168
const pendingAssertionPath = await this.fileService.getPendingStorageDocumentPath(
159169
operationId,
160170
);
@@ -177,6 +187,15 @@ class PendingStorageService {
177187
}
178188
}
179189

190+
_removeMerkleRootIndexEntry(operationId) {
191+
for (const [root, opId] of this._merkleRootIndex) {
192+
if (opId === operationId) {
193+
this._merkleRootIndex.delete(root);
194+
break;
195+
}
196+
}
197+
}
198+
180199
async getPendingState(operationId) {
181200
return this.fileService.getPendingStorageLatestDocument(operationId);
182201
}

0 commit comments

Comments
 (0)