Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions components/persistor/.gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,5 @@ coverage
dist
.nyc_output
test/supertype/*.js
lib/utils/*.js
lib/persistable.js
23 changes: 13 additions & 10 deletions components/persistor/lib/api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,16 @@
*/

import { PersistorTransaction, RemoteDocConnectionOptions } from './types';
import { PersistorCtx } from './knex/PersistorCtx';
import { PersistorUtils } from './utils/PersistorUtils';

module.exports = function (PersistObjectTemplate, baseClassForPersist) {
const moduleName = `persistor/lib/api`;
let supertypeRequire = require('@haventech/supertype');
let statsDHelper = supertypeRequire.StatsdHelper;

var Promise = require('bluebird');
var _ = require('underscore');

function getTime() {
return process.hrtime();
}
Expand Down Expand Up @@ -165,10 +166,11 @@ module.exports = function (PersistObjectTemplate, baseClassForPersist) {
* @param {object} idMap id mapper for cached objects
* @param {bool} isRefresh force load
* @param {object} logger objecttemplate logger
* @param {date} asOfDate load objects based on the date provided
* @returns {object}
* @legacy Use persistorFetchById instead
*/
template.getFromPersistWithId = async function (id, cascade, isTransient, idMap, isRefresh, logger) {
template.getFromPersistWithId = async function (id, cascade, isTransient, idMap, isRefresh, logger, asOfDate) {
const functionName = 'getFromPersistWithId';
(logger || PersistObjectTemplate.logger).debug({
module: moduleName,
Expand All @@ -184,7 +186,7 @@ module.exports = function (PersistObjectTemplate, baseClassForPersist) {

let getQuery = (dbType == PersistObjectTemplate.DB_Mongo ?
PersistObjectTemplate.getFromPersistWithMongoId(template, id, cascade, isTransient, idMap, logger) :
PersistObjectTemplate.getFromPersistWithKnexId(template, id, cascade, isTransient, idMap, isRefresh, logger));
PersistorCtx.checkAndExecuteWithContext(asOfDate, PersistObjectTemplate.getFromPersistWithKnexId.bind(PersistObjectTemplate, template, id, cascade, isTransient, idMap, isRefresh, logger)));

const name = 'getFromPersistWithId';
return getQuery
Expand All @@ -210,10 +212,11 @@ module.exports = function (PersistObjectTemplate, baseClassForPersist) {
* @param {object} idMap id mapper for cached objects
* @param {bool} options {@TODO}
* @param {object} logger objecttemplate logger
* @param {date} asOfDate load objects based on the date provided
* @returns {object}
* @legacy in favor of persistorFetchByQuery
*/
template.getFromPersistWithQuery = async function (query, cascade, start, limit, isTransient, idMap, options, logger) {
template.getFromPersistWithQuery = async function (query, cascade, start, limit, isTransient, idMap, options, logger, asOfDate) {
const functionName = 'getFromPersistWithQuery';
(logger || PersistObjectTemplate.logger).debug({
module: moduleName,
Expand All @@ -228,7 +231,7 @@ module.exports = function (PersistObjectTemplate, baseClassForPersist) {

let getQuery = (dbType == PersistObjectTemplate.DB_Mongo ?
PersistObjectTemplate.getFromPersistWithMongoQuery(template, query, cascade, start, limit, isTransient, idMap, options, logger) :
PersistObjectTemplate.getFromPersistWithKnexQuery(null, template, query, cascade, start, limit, isTransient, idMap, options, undefined, undefined, logger));
PersistorCtx.checkAndExecuteWithContext(asOfDate, PersistObjectTemplate.getFromPersistWithKnexQuery.bind(PersistObjectTemplate, null, template, query, cascade, start, limit, isTransient, idMap, options, undefined, undefined, logger)));


const name = 'getFromPersistWithQuery';
Expand Down Expand Up @@ -300,7 +303,7 @@ module.exports = function (PersistObjectTemplate, baseClassForPersist) {
var dbType = persistObjectTemplate.getDB(persistObjectTemplate.getDBAlias(template.__collection__)).type;
let fetchQuery = (dbType == persistObjectTemplate.DB_Mongo ?
persistObjectTemplate.getFromPersistWithMongoId(template, id, options.fetch, options.transient, null, options.logger) :
persistObjectTemplate.getFromPersistWithKnexId(template, id, options.fetch, options.transient, null, null, options.logger, options.enableChangeTracking, options.projection));
PersistorCtx.checkAndExecuteWithContext(options.asOfDate, persistObjectTemplate.getFromPersistWithKnexId.bind(persistObjectTemplate, template, id, options.fetch, options.transient, null, null, options.logger, options.enableChangeTracking, options.projection)));

const name = 'persistorFetchById';
return fetchQuery
Expand Down Expand Up @@ -374,9 +377,9 @@ module.exports = function (PersistObjectTemplate, baseClassForPersist) {
let fetchQuery = (dbType == persistObjectTemplate.DB_Mongo ?
persistObjectTemplate.getFromPersistWithMongoQuery(template, query, options.fetch, options.start,
options.limit, options.transient, options.order, options.order, logger) :
persistObjectTemplate.getFromPersistWithKnexQuery(null, template, query, options.fetch, options.start,
PersistorCtx.checkAndExecuteWithContext(options.asOfDate, persistObjectTemplate.getFromPersistWithKnexQuery.bind(persistObjectTemplate, null, template, query, options.fetch, options.start,
options.limit, options.transient, null, options.order,
undefined, undefined, logger, options.enableChangeTracking, options.projection));
undefined, undefined, logger, options.enableChangeTracking, options.projection)));

const name = 'persistorFetchByQuery';
return fetchQuery
Expand Down Expand Up @@ -1396,7 +1399,7 @@ module.exports = function (PersistObjectTemplate, baseClassForPersist) {
templates.push(template);
}
}
return Promise.map(templates, action, { concurrency: concurrency || 1 });
return PersistorUtils.asyncMap(templates, concurrency || 1, action);
}

};
65 changes: 65 additions & 0 deletions components/persistor/lib/knex/PersistorCtx.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
import { AsyncLocalStorage } from 'async_hooks'

type CtxProps = { name: string, properties: {} }

class ExecutionCtx {
private readonly _asOfDate;

constructor(asOfDate: Date) {
this._asOfDate = asOfDate
}

get asOfDate() {
return this._asOfDate;
}
}

export class PersistorCtx {
static persistorExnCtxKey = '#persistor-exn-ctx';
static persistorCacheCtxKey = '#persistor-cache-ctx';
private static _asyncLocalStorage: AsyncLocalStorage<CtxProps>;

private static get asyncLocalStorage() {
return this._asyncLocalStorage || (this._asyncLocalStorage = new AsyncLocalStorage<CtxProps>());
}

static checkAndExecuteWithContext(asOfDate: Date, callback: () => any ) {
if (!asOfDate) {
return callback();
}

const ctxProps = {
name: `${new Date().getTime()}`,
properties: { [this.persistorExnCtxKey]: new ExecutionCtx(asOfDate) },
};
return this.asyncLocalStorage.run(ctxProps, async () => {
return await callback();
});
}

static get executionCtx() {
if (!this._asyncLocalStorage) {
return;
}
const store = this.asyncLocalStorage.getStore() as CtxProps;
if (!store) {
return;
}
const exnCtx: ExecutionCtx = store.properties[this.persistorExnCtxKey];
return exnCtx;
}

static set setExecutionContext(asyncLocalStorage: AsyncLocalStorage<CtxProps>) {
const store = asyncLocalStorage.getStore() as CtxProps;
if (!store) {
return;
}

const exnCtx: ExecutionCtx = store.properties[this.persistorCacheCtxKey];
if (!exnCtx) {
throw Error(`ExecutionCtx can only be set from outside with #persistor-cache-ctx property in the store.`)
}

this._asyncLocalStorage = asyncLocalStorage;
}
}
Loading