Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion components/persistor/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,5 @@ RUN apk update && apk add --no-cache bash
WORKDIR /app

COPY ./wait-for-it.sh .
#COPY ./ ./
COPY ./ ./
RUN npm install --production=false
3 changes: 3 additions & 0 deletions components/persistor/HISTORY.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
## 11.0.1
* In the past if a remote object is saved without a transaction, the persistSave would save the db first, followed by remote doc uploads. If there are intermittent connectivity issues with the doc, the doc wouldn't properly save into remote storage, causing the db and remote storage to be out of sync. In cases where updates are wrapped around transactions however, remote docs get uploaded first, followed by db save. In this case if there are db save errors, the remote docs are deleted as part of transaction rollback logic.
With this change, we are reversing the order of docs saved for the non-transaction cases to also be consistent with transaction cases. We do strongly recommend the use of transactions especially when saving remote docs.
## 11.0.0
* Just a major version bump to prepare for parallel es2020. This would allow us to start pegging versions of persistor on amorphic, so that persistor's minor or patched versions are not automatically pulled in when bumped.
## 10.0.0
Expand Down
2 changes: 1 addition & 1 deletion components/persistor/docker-compose-debug.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ services:
database-test:
image: postgres:9.4-alpine
ports:
- 5432
- '5432:5432'
environment:
- POSTGRES_USER=persistor
- POSTGRES_PASSWORD=persistor
Expand Down
83 changes: 63 additions & 20 deletions components/persistor/lib/knex/update.ts
Original file line number Diff line number Diff line change
Expand Up @@ -247,27 +247,70 @@ module.exports = function (PersistObjectTemplate) {
}
});

promises.push(this.saveKnexPojo(obj, pojo, isDocumentUpdate ? obj._id : null, txn, logger));

return Promise.all(promises) // update sql saves first
.then(() => (
Promise.all(
remoteUpdateFns.map(
// We want to wait to execute remote updates until the sql has resolved
remoteUpdateFn => remoteUpdateFn()
)
).then((updateData: Array<UploadDocumentResponse>) => {
if (txn) {
txn.remoteObjects = txn.remoteObjects || new Map();
for (const update of updateData) {
txn.remoteObjects.set(update.key, update.versionId);
}
}
})
)) // update remote objects second
.then(function() {
return obj;
// changing the order of execution. In the past, for cases without transaction, we saved db first followed by remote doc, if any.
// In case of connection errors to remote doc server, the db would get saved but the remote docs wouldnt causing a disconnect.
// Now, we are changing the order of execution to save remote doc first followed by db. This way on db failures, we can delete the docs.
// In case of connection issues
let remoteObjects: Map<any, any> = new Map();
try {
// first save documents, if any
const remoteDocs = remoteUpdateFns.map(
(remoteUpdateFn) => remoteUpdateFn()
);
if (remoteDocs && remoteDocs.length > 0) {
const updateData: Array<UploadDocumentResponse> = await Promise.all(remoteDocs);
setRemoteObjects(updateData, remoteObjects);
}
// next, save db records
promises.push(this.saveKnexPojo(obj, pojo, isDocumentUpdate ? obj._id : null, txn, logger));
await Promise.all(promises);
}
catch(error) {
// if transaction is used then simply throw as it would be handled appropriately by rollback logic.
if (txn) {
throw error;
}

//otherwise delete docs, log docs that could not be deleted and then throw;
let toDeletePromiseArr = [];

remoteObjects.forEach((versionId: string, key: string) => {
toDeletePromiseArr.push(
remoteDocService.deleteDocument(key, this.bucketName, versionId)
.catch(error => {
(logger || this.logger).error({
module: moduleName,
function: functionName,
category: 'milestone',
message: 'unable to rollback remote document with key:' + key + ' and bucket: ' + this.bucketName,
error: {
message: error && error.message,
stack: error && error.stack,
isHumanRelated: false,
name: error&& error.name
}
});
})
);
});
await Promise.all(toDeletePromiseArr);
throw error;
}

return obj;

function setRemoteObjects(updateData: Array<UploadDocumentResponse>, remoteObjects: Map<any, any>) {

if (txn && !txn.remoteObjects) {
txn.remoteObjects = remoteObjects;
}

remoteObjects = (txn && txn.remoteObjects) || remoteObjects;
for (const update of updateData) {
remoteObjects.set(update.key, update.versionId);
}

}

function log(defineProperty, pojo, prop) {
if (defineProperty.logChanges)
Expand Down
52 changes: 26 additions & 26 deletions components/persistor/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion components/persistor/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"name": "@haventech/persistor",
"description": "A subclass of supertype that serializes to and reconstitutes from MongoDB or SQL databases",
"homepage": "https://github.com/haven-life/persistor",
"version": "11.0.0",
"version": "11.0.1",
"main": "dist/index.js",
"types": "dist/index.d.ts",
"dependencies": {
Expand Down
80 changes: 79 additions & 1 deletion components/persistor/test/persist_banking_s3.js
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,85 @@ describe('Banking from pgsql Example persist_banking_s3', function () {
});
});

context('when uploading a banking document', () => {
context('when uploading a banking document, without transaction', () => {
let bankingDocumentStr = null;

beforeEach(() => {
sinon.replace(S3RemoteDocClient.prototype, 'downloadDocument', () => (
Promise.resolve(bankingDocumentStr)
));
});

context('it succeeds', () => {
beforeEach(() => {
bankingDocumentStr = 'this should be stored remotely';
sinon.replace(S3RemoteDocClient.prototype, 'uploadDocument', () => (
Promise.resolve()
));

sandbox.spy(S3RemoteDocClient.prototype, 'uploadDocument');
});

afterEach(() => {
sandbox.restore();
});

it('should send with correct mime type', async () => {
let fetchedNoBankDocCust = await Customer.getFromPersistWithId(noBankingDocumentCustomer._id);
fetchedNoBankDocCust.bankingDocument = bankingDocumentStr;
await fetchedNoBankDocCust.persistSave();

fetchedNoBankDocCust = await Customer.getFromPersistWithId(noBankingDocumentCustomer._id);
expect(S3RemoteDocClient.prototype.uploadDocument.calledOnce).to.eql(true);
expect(S3RemoteDocClient.prototype.uploadDocument.getCall(0).args.length).to.eql(4);
expect(S3RemoteDocClient.prototype.uploadDocument.getCall(0).args[3]).to.eql('application/pdf');
});

it('should save to remote store', async () => {
let fetchedNoBankDocCust = await Customer.getFromPersistWithId(noBankingDocumentCustomer._id);
fetchedNoBankDocCust.bankingDocument = bankingDocumentStr;
await fetchedNoBankDocCust.persistSave();

fetchedNoBankDocCust = await Customer.getFromPersistWithId(noBankingDocumentCustomer._id);
expect(fetchedNoBankDocCust.bankingDocument).to.eql(bankingDocumentStr);
});
});

context('it errors', () => {
let uploadError;
beforeEach(() => {
uploadError = new Error('Upload Failed');
sinon.replace(S3RemoteDocClient.prototype, 'uploadDocument', () => (
Promise.reject(uploadError)
));
});

it('should rollback transaction on failure to save to remote store', async () => {
let fetchedNoBankDocCust;
let updateVersion;
try {
fetchedNoBankDocCust = await Customer.getFromPersistWithId(noBankingDocumentCustomer._id);
updateVersion = fetchedNoBankDocCust.version;
fetchedNoBankDocCust.bankingDocument = 'this should have rolled back';
await fetchedNoBankDocCust.persistSave();
expect.fail('Expected transaction to fail');
} catch (e) {
if (e instanceof AssertionError) {
throw e;
}
expect(e).to.eql(uploadError);

fetchedNoBankDocCust = await Customer.getFromPersistWithId(noBankingDocumentCustomer._id);
expect(fetchedNoBankDocCust.bankingDocument).to.not.equal('this should have rolled back');
expect(fetchedNoBankDocCust.bankingDocument).to.equal(bankingDocumentStr);
expect(updateVersion).to.equal(fetchedNoBankDocCust.version);
}
});
});
});


context('when uploading a banking document, with transaction', () => {
let bankingDocumentStr = null;

beforeEach(() => {
Expand Down
Loading