Skip to content
Open
1 change: 1 addition & 0 deletions components/persistor/.gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,5 @@ coverage
dist
.nyc_output
test/supertype/*.js
lib/utils/*.js
lib/persistable.js
42 changes: 31 additions & 11 deletions components/persistor/lib/api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,16 @@
*/

import { PersistorTransaction, RemoteDocConnectionOptions } from './types';
import { PersistorCtx } from './knex/PersistorCtx';
import { PersistorUtils } from './utils/PersistorUtils';

module.exports = function (PersistObjectTemplate, baseClassForPersist) {
const moduleName = `persistor/lib/api`;
let supertypeRequire = require('@haventech/supertype');
let statsDHelper = supertypeRequire.StatsdHelper;

var Promise = require('bluebird');
var _ = require('underscore');

function getTime() {
return process.hrtime();
}
Expand Down Expand Up @@ -165,10 +166,11 @@ module.exports = function (PersistObjectTemplate, baseClassForPersist) {
* @param {object} idMap id mapper for cached objects
* @param {bool} isRefresh force load
* @param {object} logger objecttemplate logger
* @param {date} asOfDate load objects based on the date provided
* @returns {object}
* @legacy Use persistorFetchById instead
*/
template.getFromPersistWithId = async function (id, cascade, isTransient, idMap, isRefresh, logger) {
template.getFromPersistWithId = async function (id, cascade, isTransient, idMap, isRefresh, logger, asOfDate) {
const functionName = 'getFromPersistWithId';
(logger || PersistObjectTemplate.logger).debug({
module: moduleName,
Expand All @@ -184,7 +186,7 @@ module.exports = function (PersistObjectTemplate, baseClassForPersist) {

let getQuery = (dbType == PersistObjectTemplate.DB_Mongo ?
PersistObjectTemplate.getFromPersistWithMongoId(template, id, cascade, isTransient, idMap, logger) :
PersistObjectTemplate.getFromPersistWithKnexId(template, id, cascade, isTransient, idMap, isRefresh, logger));
PersistorCtx.checkAndExecuteWithContext(asOfDate, PersistObjectTemplate.getFromPersistWithKnexId.bind(PersistObjectTemplate, template, id, cascade, isTransient, idMap, isRefresh, logger)));

const name = 'getFromPersistWithId';
return getQuery
Expand All @@ -210,10 +212,11 @@ module.exports = function (PersistObjectTemplate, baseClassForPersist) {
* @param {object} idMap id mapper for cached objects
* @param {bool} options {@TODO}
* @param {object} logger objecttemplate logger
* @param {date} asOfDate load objects based on the date provided
* @returns {object}
* @legacy in favor of persistorFetchByQuery
*/
template.getFromPersistWithQuery = async function (query, cascade, start, limit, isTransient, idMap, options, logger) {
template.getFromPersistWithQuery = async function (query, cascade, start, limit, isTransient, idMap, options, logger, asOfDate) {
const functionName = 'getFromPersistWithQuery';
(logger || PersistObjectTemplate.logger).debug({
module: moduleName,
Expand All @@ -228,7 +231,7 @@ module.exports = function (PersistObjectTemplate, baseClassForPersist) {

let getQuery = (dbType == PersistObjectTemplate.DB_Mongo ?
PersistObjectTemplate.getFromPersistWithMongoQuery(template, query, cascade, start, limit, isTransient, idMap, options, logger) :
PersistObjectTemplate.getFromPersistWithKnexQuery(null, template, query, cascade, start, limit, isTransient, idMap, options, undefined, undefined, logger));
PersistorCtx.checkAndExecuteWithContext(asOfDate, PersistObjectTemplate.getFromPersistWithKnexQuery.bind(PersistObjectTemplate, null, template, query, cascade, start, limit, isTransient, idMap, options, undefined, undefined, logger)));


const name = 'getFromPersistWithQuery';
Expand Down Expand Up @@ -286,6 +289,7 @@ module.exports = function (PersistObjectTemplate, baseClassForPersist) {
PersistObjectTemplate._validateParams(options, 'fetchSchema', template);

options = options || {};
const idMap = PersistorCtx.persistorCacheCtx || null;

var persistObjectTemplate = options.session || PersistObjectTemplate;
(options.logger || persistObjectTemplate.logger).debug({
Expand All @@ -300,7 +304,7 @@ module.exports = function (PersistObjectTemplate, baseClassForPersist) {
var dbType = persistObjectTemplate.getDB(persistObjectTemplate.getDBAlias(template.__collection__)).type;
let fetchQuery = (dbType == persistObjectTemplate.DB_Mongo ?
persistObjectTemplate.getFromPersistWithMongoId(template, id, options.fetch, options.transient, null, options.logger) :
persistObjectTemplate.getFromPersistWithKnexId(template, id, options.fetch, options.transient, null, null, options.logger, options.enableChangeTracking, options.projection));
PersistorCtx.checkAndExecuteWithContext(options.asOfDate, persistObjectTemplate.getFromPersistWithKnexId.bind(persistObjectTemplate, template, id, options.fetch, options.transient, idMap, null, options.logger, options.enableChangeTracking, options.projection)));

const name = 'persistorFetchById';
return fetchQuery
Expand All @@ -326,6 +330,8 @@ module.exports = function (PersistObjectTemplate, baseClassForPersist) {
PersistObjectTemplate._validateParams(options, 'persistSchema', template);

options = options || {};
const idMap = PersistorCtx.persistorCacheCtx || null;

var dbType = PersistObjectTemplate.getDB(PersistObjectTemplate.getDBAlias(template.__collection__)).type;
let deleteQuery = dbType == PersistObjectTemplate.DB_Mongo ?
PersistObjectTemplate.deleteFromPersistWithMongoQuery(template, query, options.logger) :
Expand Down Expand Up @@ -358,6 +364,8 @@ module.exports = function (PersistObjectTemplate, baseClassForPersist) {

options = options || {};
var persistObjectTemplate = options.session || PersistObjectTemplate;
const idMap = PersistorCtx.persistorCacheCtx || null;

var logger = options.logger || persistObjectTemplate.logger;
logger.debug({
module: moduleName,
Expand All @@ -374,9 +382,9 @@ module.exports = function (PersistObjectTemplate, baseClassForPersist) {
let fetchQuery = (dbType == persistObjectTemplate.DB_Mongo ?
persistObjectTemplate.getFromPersistWithMongoQuery(template, query, options.fetch, options.start,
options.limit, options.transient, options.order, options.order, logger) :
persistObjectTemplate.getFromPersistWithKnexQuery(null, template, query, options.fetch, options.start,
options.limit, options.transient, null, options.order,
undefined, undefined, logger, options.enableChangeTracking, options.projection));
PersistorCtx.checkAndExecuteWithContext(options.asOfDate, persistObjectTemplate.getFromPersistWithKnexQuery.bind(persistObjectTemplate, null, template, query, options.fetch, options.start,
options.limit, options.transient, idMap, options.order,
undefined, undefined, logger, options.enableChangeTracking, options.projection)));

const name = 'persistorFetchByQuery';
return fetchQuery
Expand Down Expand Up @@ -1320,6 +1328,18 @@ module.exports = function (PersistObjectTemplate, baseClassForPersist) {
return this.__defaultTransaction__;
};

PersistObjectTemplate.setPersistorCacheContext = function (cacheContext) {
return PersistorCtx.setExecutionContext(cacheContext);
};

Object.defineProperty(PersistObjectTemplate, 'persistorCacheCtxKey', {
get: function () {
return PersistorCtx.persistorCacheCtxKey;
},
enumerable: false,
configurable: false
})

PersistObjectTemplate.commit = async function commit(options) {
var time = getTime();
PersistObjectTemplate._validateParams(options, 'commitSchema');
Expand Down Expand Up @@ -1396,7 +1416,7 @@ module.exports = function (PersistObjectTemplate, baseClassForPersist) {
templates.push(template);
}
}
return Promise.map(templates, action, { concurrency: concurrency || 1 });
return PersistorUtils.asyncMap(templates, concurrency || 1, action);
}

};
80 changes: 80 additions & 0 deletions components/persistor/lib/knex/PersistorCtx.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
import { AsyncLocalStorage } from 'async_hooks'

type CtxProps = { name: string, properties: {} }

class ExecutionCtx {
private readonly _asOfDate;

constructor(asOfDate: Date) {
this._asOfDate = asOfDate
}

get asOfDate() {
return this._asOfDate;
}

}


export class PersistorCtx {
static persistorExnCtxKey = '#persistor-exn-ctx';
static persistorCacheCtxKey = '#persistor-cache-ctx';
private static _asyncLocalStorage: AsyncLocalStorage<CtxProps>;

private static get asyncLocalStorage() {
return this._asyncLocalStorage || (this._asyncLocalStorage = new AsyncLocalStorage<CtxProps>());
}

static async checkAndExecuteWithContext(asOfDate: Date, callback: () => any ) {
if (!asOfDate) {
const response = await callback();
return Promise.resolve(response);
}

const ctxProps = {
name: `${new Date().getTime()}`,
properties: { [this.persistorExnCtxKey]: new ExecutionCtx(asOfDate) },
};
return this.asyncLocalStorage.run(ctxProps, async () => {
const response = await callback();
return Promise.resolve(response);
});
}

static get executionCtx() {
if (!this._asyncLocalStorage) {
return;
}
const store = this.asyncLocalStorage.getStore() as CtxProps;
if (!store) {
return;
}
const exnCtx: ExecutionCtx = store.properties[this.persistorExnCtxKey];
return exnCtx;
}

static get persistorCacheCtx() {
if (!this._asyncLocalStorage) {
return;
}
const store = this.asyncLocalStorage.getStore() as CtxProps;
if (!store) {
return;
}
return store.properties[this.persistorCacheCtxKey];
}

static setExecutionContext(asyncLocalStorage: AsyncLocalStorage<CtxProps>) {
const store = asyncLocalStorage.getStore() as CtxProps;
if (!store) {
return;
}

const exnCtx: ExecutionCtx = store.properties[this.persistorCacheCtxKey];
if (!exnCtx) {
throw Error(`ExecutionCtx can only be set from outside with #persistor-cache-ctx property in the store.`)
}

this._asyncLocalStorage = asyncLocalStorage;
}
}
Loading