Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
178 changes: 169 additions & 9 deletions src/commands/protocols/get/sender/get-command.js
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ class GetCommand extends Command {
this.cryptoService = ctx.cryptoService;
this.messagingService = ctx.messagingService;
this.tripleStoreModuleManager = ctx.tripleStoreModuleManager;
this.pendingStorageService = ctx.pendingStorageService;
}

async handleError(operationId, blockchain, errorMessage, errorType) {
Expand Down Expand Up @@ -72,19 +73,67 @@ class GetCommand extends Command {
OPERATION_ID_STATUS.GET.GET_VALIDATE_ASSET_START,
);

const { isValid, errorMessage } = await this.validateUAL(
operationId,
blockchain,
contract,
knowledgeCollectionId,
ual,
);
const maxGetRetries = 3;
const getRetryDelayMs = 5_000;
let ualValidationPassed = false;
let ualValidationError = null;

for (let attempt = 1; attempt <= maxGetRetries; attempt += 1) {
try {
// eslint-disable-next-line no-await-in-loop
const { isValid, errorMessage: valMsg } = await this.validateUAL(
operationId,
blockchain,
contract,
knowledgeCollectionId,
ual,
);
if (isValid) {
ualValidationPassed = true;
break;
}
ualValidationError = valMsg;
} catch (err) {
ualValidationError = `UAL validation failed: ${err.message}`;
}

if (!isValid) {
if (!ualValidationPassed) {
try {
// eslint-disable-next-line no-await-in-loop
const cachedResult = await this._tryCacheFallback(
blockchain,
contract,
knowledgeCollectionId,
knowledgeAssetId,
ual,
operationId,
paranetNodesAccessPolicy,
contentType,
);
if (cachedResult) {
return cachedResult;
}
} catch (_cacheErr) {
// cache fallback also failed, will retry
}
}

if (attempt < maxGetRetries) {
this.logger.debug(
`Get validation/cache attempt ${attempt}/${maxGetRetries} failed for ${ual}, retrying in ${getRetryDelayMs}ms`,
);
// eslint-disable-next-line no-await-in-loop
await new Promise((resolve) => {
setTimeout(resolve, getRetryDelayMs);
});
}
}

if (!ualValidationPassed) {
await this.handleError(
operationId,
blockchain,
errorMessage,
ualValidationError,
ERROR_TYPE.GET.GET_VALIDATE_ASSET_ERROR,
);
return Command.empty();
Expand Down Expand Up @@ -262,6 +311,26 @@ class GetCommand extends Command {
}
this.logger.debug(`Could not find asset with UAL: ${ual} locally`);

try {
const cachedResult = await this._tryCacheFallback(
blockchain,
contract,
knowledgeCollectionId,
knowledgeAssetId,
ual,
operationId,
paranetNodesAccessPolicy,
contentType,
);
if (cachedResult) {
return cachedResult;
}
} catch (cacheErr) {
this.logger.debug(
`Pending storage cache fallback failed for ${ual}: ${cacheErr.message}`,
);
}

await this.operationIdService.emitChangeEvent(
OPERATION_ID_STATUS.GET.GET_LOCAL_END,
operationId,
Expand Down Expand Up @@ -391,6 +460,97 @@ class GetCommand extends Command {
return Command.empty();
}

async _tryCacheFallback(
blockchain,
contract,
knowledgeCollectionId,
knowledgeAssetId,
ual,
operationId,
paranetNodesAccessPolicy,
contentType,
) {
if (knowledgeAssetId) return null;

const latestMerkleRoot =
await this.blockchainModuleManager.getKnowledgeCollectionLatestMerkleRoot(
blockchain,
contract,
knowledgeCollectionId,
);
if (!latestMerkleRoot) return null;

const publishOpId = this.pendingStorageService.getOperationIdByMerkleRoot(latestMerkleRoot);
if (!publishOpId) return null;

const cachedAssertion = await this.pendingStorageService.getCachedDataset(publishOpId);
if (
!cachedAssertion ||
(!cachedAssertion.public?.length && !cachedAssertion.private?.length)
) {
return null;
}

const filteredAssertion = this._filterAssertionByContentType(cachedAssertion, contentType);
if (!filteredAssertion.public?.length && !filteredAssertion.private?.length) {
return null;
}

let cachePassed = true;
if (paranetNodesAccessPolicy === PARANET_ACCESS_POLICY.PERMISSIONED) {
if (Array.isArray(filteredAssertion.public)) {
const shouldHavePrivate = filteredAssertion.public.some((triple) =>
triple.includes(`${PRIVATE_ASSERTION_PREDICATE}`),
);
if (shouldHavePrivate) {
cachePassed = filteredAssertion.private?.length > 0;
}
} else {
cachePassed = false;
}
}

if (!cachePassed) return null;

const cachedResponseData = { assertion: filteredAssertion };
const isValid = await this.validateResponse(
cachedResponseData,
blockchain,
contract,
knowledgeCollectionId,
knowledgeAssetId,
paranetNodesAccessPolicy,
contentType,
);
if (!isValid) return null;

this.logger.info(
`Serving asset ${ual} from pending storage cache (merkleRoot: ${latestMerkleRoot})`,
);
await this.operationService.markOperationAsCompleted(
operationId,
blockchain,
cachedResponseData,
[
OPERATION_ID_STATUS.GET.GET_LOCAL_END,
OPERATION_ID_STATUS.GET.GET_END,
OPERATION_ID_STATUS.COMPLETED,
],
);
return Command.empty();
}

_filterAssertionByContentType(assertion, contentType) {
if (!contentType || contentType === 'all') return assertion;
if (contentType === 'public') {
return { public: assertion.public || [] };
}
if (contentType === 'private') {
return { private: assertion.private || [] };
}
return assertion;
}

async validateUAL(operationId, blockchain, contract, knowledgeCollectionId, ual) {
const isUAL = this.ualService.isUAL(ual);

Expand Down
59 changes: 46 additions & 13 deletions src/commands/protocols/publish/publish-finalization-command.js
Original file line number Diff line number Diff line change
Expand Up @@ -137,14 +137,42 @@ class PublishFinalizationCommand extends Command {
const node = { id: publisherPeerId, protocol: networkProtocols[0] };

const message = { ual, publishOperationId, blockchain, operationId };
// TODO: Add retry logic maybe
const response = await this.messagingService.sendProtocolMessage(
node,
operationId,
message,
NETWORK_MESSAGE_TYPES.REQUESTS.PROTOCOL_REQUEST,
NETWORK_MESSAGE_TIMEOUT_MILLS.FINALITY.REQUEST,
);

const maxFinalityAttempts = 3;
const backoffDelays = [0, 5_000, 10_000];
let response;
let lastError;

for (let attempt = 0; attempt < maxFinalityAttempts; attempt += 1) {
if (backoffDelays[attempt] > 0) {
// eslint-disable-next-line no-await-in-loop
await new Promise((r) => {
setTimeout(r, backoffDelays[attempt]);
});
}
try {
// eslint-disable-next-line no-await-in-loop
response = await this.messagingService.sendProtocolMessage(
node,
operationId,
message,
NETWORK_MESSAGE_TYPES.REQUESTS.PROTOCOL_REQUEST,
NETWORK_MESSAGE_TIMEOUT_MILLS.FINALITY.REQUEST,
);
lastError = null;
break;
} catch (err) {
lastError = err;
this.logger.warn(
`Finality request to publisher ${publisherPeerId} failed ` +
`(attempt ${attempt + 1}/${maxFinalityAttempts}): ${err.message}`,
);
}
}

if (lastError) {
throw lastError;
}

await this.messagingService.handleProtocolResponse(
response,
Expand Down Expand Up @@ -196,16 +224,21 @@ class PublishFinalizationCommand extends Command {
return cachedData;
} catch (error) {
attempt += 1;
// eslint-disable-next-line no-await-in-loop
await new Promise((resolve) => {
setTimeout(resolve, RETRY_DELAY_READ_CACHED_PUBLISH_DATA);
});
if (attempt < MAX_RETRIES_READ_CACHED_PUBLISH_DATA) {
this.logger.debug(
`[Cache] Read attempt ${attempt}/${MAX_RETRIES_READ_CACHED_PUBLISH_DATA} ` +
`failed for publishOperationId: ${publishOperationId}, retrying in ${RETRY_DELAY_READ_CACHED_PUBLISH_DATA}ms...`,
);
// eslint-disable-next-line no-await-in-loop
await new Promise((resolve) => {
setTimeout(resolve, RETRY_DELAY_READ_CACHED_PUBLISH_DATA);
});
}
}
}
this.logger.warn(
`[Cache] Exhausted retries reading cached publish data (publishOperationId: ${publishOperationId}, path: ${datasetPath}).`,
);
// TODO: Mark this operation as failed
throw new Error('Failed to read cached publish data');
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ class PublishReplicationCommand extends Command {
this.signatureService = ctx.signatureService;
this.cryptoService = ctx.cryptoService;
this.messagingService = ctx.messagingService;
this.pendingStorageService = ctx.pendingStorageService;

this.errorType = ERROR_TYPE.LOCAL_STORE.LOCAL_STORE_ERROR;
}
Expand Down Expand Up @@ -133,6 +134,14 @@ class PublishReplicationCommand extends Command {
return Command.empty();
}
const { dataset } = await this.operationIdService.getCachedOperationIdData(operationId);

await this.pendingStorageService.cacheDataset(
operationId,
datasetRoot,
dataset,
currentPeerId,
);

const message = {
dataset: dataset.public,
datasetRoot,
Expand Down
4 changes: 2 additions & 2 deletions src/constants/constants.js
Original file line number Diff line number Diff line change
Expand Up @@ -1067,8 +1067,8 @@ export const LOCAL_INSERT_FOR_ASSET_SYNC_RETRY_DELAY = 1000;
export const LOCAL_INSERT_FOR_CURATED_PARANET_MAX_ATTEMPTS = 5;
export const LOCAL_INSERT_FOR_CURATED_PARANET_RETRY_DELAY = 1000;

export const MAX_RETRIES_READ_CACHED_PUBLISH_DATA = 10;
export const RETRY_DELAY_READ_CACHED_PUBLISH_DATA = 10 * 1000;
export const MAX_RETRIES_READ_CACHED_PUBLISH_DATA = 5;
export const RETRY_DELAY_READ_CACHED_PUBLISH_DATA = 5 * 1000;

export const TRIPLE_STORE_REPOSITORY = {
DKG: 'dkg',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,15 @@ class FinalityStatusRepository {
async saveFinalityAck(operationId, ual, peerId, options) {
return this.model.upsert({ operationId, ual, peerId }, options);
}

async getPublishOperationIdByUal(ual, options) {
const record = await this.model.findOne({
where: { ual },
attributes: ['operationId'],
...options,
});
return record?.operationId ?? null;
}
}

export default FinalityStatusRepository;
4 changes: 4 additions & 0 deletions src/modules/repository/repository-module-manager.js
Original file line number Diff line number Diff line change
Expand Up @@ -467,6 +467,10 @@ class RepositoryModuleManager extends BaseModuleManager {
return this.getRepository('finality_status').getFinalityAcksCount(ual, options);
}

async getPublishOperationIdByUal(ual, options = {}) {
return this.getRepository('finality_status').getPublishOperationIdByUal(ual, options);
}

async getLatestRandomSamplingChallengeRecordForBlockchainId(blockchainId, limit = 1) {
return this.getRepository(
'random_sampling_challenge',
Expand Down
Loading
Loading