From 7e846095017fbc8a45d6f94f305925ac71661656 Mon Sep 17 00:00:00 2001 From: Attila Pix Date: Mon, 2 Mar 2026 13:58:56 -0800 Subject: [PATCH] Make the migration lock atomic --- CHANGELOG.md | 6 ++++++ README.md | 44 +++++++++++++++++++++++++++++++++++++++ lib/actions/down.js | 9 ++++---- lib/actions/up.js | 10 ++++----- lib/utils/lock.js | 31 ++++++++++++++------------- test/actions/down.test.js | 34 ++++++++++++++---------------- test/actions/up.test.js | 41 +++++++++++++++--------------------- 7 files changed, 109 insertions(+), 66 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 181a51b..a7e27ca 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,12 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/) +## [Unreleased] +- Make migration lock acquisition atomic + - Replace separate existence-check + activate steps with a single atomic `acquire` operation + - Eliminates the race condition where concurrent processes could both pass the lock check and run migrations simultaneously + - Update README with documentation on the locking feature, configuration options, and error messages + ## [14.0.7] - 2025-12-03 - Reorganize test mocks structure - Move __mocks__ directory from project root to test directory diff --git a/README.md b/README.md index 64cf984..f20f439 100644 --- a/README.md +++ b/README.md @@ -358,6 +358,50 @@ module.exports = { }; ```` +### Using the locking feature to prevent concurrent migrations + +migrate-mongo can use a lock collection to ensure that only one migration process runs at a time. This is useful in environments where multiple instances of your application may start up simultaneously and all attempt to run migrations. + +#### How it works + +When `up` or `down` is called, migrate-mongo atomically acquires a lock against the lock collection. Because this is a single atomic operation, only one concurrent caller will succeed — the first one gets the lock, and all others are immediately rejected with an error. + +Once migrations finish, the lock is released automatically. + +#### Configuration + +Two config options control this feature: + +```javascript +// The mongodb collection where the lock will be stored. +lockCollectionName: "changelog_lock", + +// TTL in seconds for the lock document. This acts as a safety net to +// automatically expire stale locks left behind by a crashed process. +lockTtl: 60, +``` + +#### Enabling / disabling the locking feature + +Set the `lockCollectionName` and set the `lockTtl` to a positive number to enable the locking feature. If the `lockCollectionName` is not set, or the `lockTtl` is not a positive number, locking is disabled. + +#### Errors + +If a lock is already in place when `up` or `down` is called, an error is thrown: + +``` +Could not migrate up, a lock is in place. +``` +``` +Could not migrate down, a lock is in place. +``` + +If the lock collection itself cannot be written to, the error will be: + +``` +Could not create a lock: +``` + ### Using a file hash algorithm to enable re-running updated files There are use cases where it may make sense to not treat scripts as immutable items. An example would be a simple collection with lookup values where you just can wipe and recreate the entire collection all at the same time. diff --git a/lib/actions/down.js b/lib/actions/down.js index 9bc0492..e959036 100644 --- a/lib/actions/down.js +++ b/lib/actions/down.js @@ -40,14 +40,15 @@ export default async (db, client) => { } } - if (await lock.exist(db)) { - throw new Error("Could not migrate down, a lock is in place."); - } + let acquired; try { - await lock.activate(db); + acquired = await lock.acquire(db); } catch(err) { throw new Error(`Could not create a lock: ${err.message}`); } + if (!acquired) { + throw new Error("Could not migrate down, a lock is in place."); + } try { for (const item of itemsToRollback) { diff --git a/lib/actions/up.js b/lib/actions/up.js index 405e021..80e2458 100644 --- a/lib/actions/up.js +++ b/lib/actions/up.js @@ -9,15 +9,15 @@ export default async (db, client) => { const migrated = []; const migrationBlock = Date.now(); - if (await lock.exist(db)) { - throw new Error("Could not migrate up, a lock is in place."); - } - + let acquired; try { - await lock.activate(db); + acquired = await lock.acquire(db); } catch(err) { throw new Error(`Could not create a lock: ${err.message}`); } + if (!acquired) { + throw new Error("Could not migrate up, a lock is in place."); + } const migrateItem = async item => { try { diff --git a/lib/utils/lock.js b/lib/utils/lock.js index 25e8d62..bbe9041 100644 --- a/lib/utils/lock.js +++ b/lib/utils/lock.js @@ -7,33 +7,34 @@ async function getLockCollection(db) { } const lockCollection = db.collection(lockCollectionName); - lockCollection.createIndex({ createdAt: 1 }, { expireAfterSeconds: lockTtl }); + await lockCollection.createIndex({ createdAt: 1 }, { expireAfterSeconds: lockTtl }); return lockCollection; } -async function exist(db) { +async function acquire(db) { const lockCollection = await getLockCollection(db); if (!lockCollection) { - return false; + return true; } - const foundLocks = await lockCollection.find({}).toArray(); - - return foundLocks.length > 0; -} - -async function activate(db) { - const lockCollection = await getLockCollection(db); - if (lockCollection) { - await lockCollection.insertOne({ createdAt: new Date() }); + try { + const result = await lockCollection.updateOne( + { _id: 'lock' }, + { $setOnInsert: { createdAt: new Date() } }, + { upsert: true } + ); + return result.upsertedCount === 1; + } catch (err) { + if (err.code === 11000) return false; + throw err; } } async function clear(db) { const lockCollection = await getLockCollection(db); if (lockCollection) { - await lockCollection.deleteMany({}); + await lockCollection.deleteOne({ _id: 'lock' }); } } -export { exist, activate, clear }; -export default { exist, activate, clear }; +export { acquire, clear }; +export default { acquire, clear }; diff --git a/test/actions/down.test.js b/test/actions/down.test.js index da70c39..86b2364 100644 --- a/test/actions/down.test.js +++ b/test/actions/down.test.js @@ -41,15 +41,10 @@ describe("down", () => { } function mockChangelogLockCollection() { - const findStub = { - toArray: vi.fn().mockResolvedValue([]) - }; - return { - insertOne: vi.fn().mockResolvedValue(), + updateOne: vi.fn().mockResolvedValue({ upsertedCount: 1 }), createIndex: vi.fn().mockResolvedValue(), - find: vi.fn().mockReturnValue(findStub), - deleteMany: vi.fn().mockResolvedValue(), + deleteOne: vi.fn().mockResolvedValue(), }; } @@ -158,9 +153,8 @@ describe("down", () => { it("should lock if feature is enabled", async() => { await down(db); expect(changelogLockCollection.createIndex).toHaveBeenCalled(); - expect(changelogLockCollection.find).toHaveBeenCalled(); - expect(changelogLockCollection.insertOne).toHaveBeenCalled(); - expect(changelogLockCollection.deleteMany).toHaveBeenCalled(); + expect(changelogLockCollection.updateOne).toHaveBeenCalled(); + expect(changelogLockCollection.deleteOne).toHaveBeenCalled(); }); it("should ignore lock if feature is disabled", async() => { @@ -169,25 +163,29 @@ describe("down", () => { lockCollectionName: "changelog_lock", lockTtl: 0 }); - changelogLockCollection.find.mockReturnValue({ - toArray: vi.fn().mockResolvedValue([{ createdAt: new Date() }]) - }); await down(db); expect(changelogLockCollection.createIndex).not.toHaveBeenCalled(); - expect(changelogLockCollection.find).not.toHaveBeenCalled(); + expect(changelogLockCollection.updateOne).not.toHaveBeenCalled(); + expect(changelogLockCollection.deleteOne).not.toHaveBeenCalled(); }); it("should yield an error when unable to create a lock", async() => { - changelogLockCollection.insertOne.mockRejectedValue(new Error("Kernel panic")); + changelogLockCollection.updateOne.mockRejectedValue(new Error("Kernel panic")); await expect(down(db)).rejects.toThrow("Could not create a lock: Kernel panic"); }); it("should yield an error when changelog is locked", async() => { - changelogLockCollection.find.mockReturnValue({ - toArray: vi.fn().mockResolvedValue([{ createdAt: new Date() }]) - }); + changelogLockCollection.updateOne.mockResolvedValue({ upsertedCount: 0 }); + + await expect(down(db)).rejects.toThrow("Could not migrate down, a lock is in place."); + }); + + it("should yield an error when changelog is locked due to a concurrent acquire (E11000)", async() => { + const dupKeyError = new Error("E11000 duplicate key error"); + dupKeyError.code = 11000; + changelogLockCollection.updateOne.mockRejectedValue(dupKeyError); await expect(down(db)).rejects.toThrow("Could not migrate down, a lock is in place."); }); diff --git a/test/actions/up.test.js b/test/actions/up.test.js index 2ea0d8e..8a48d05 100644 --- a/test/actions/up.test.js +++ b/test/actions/up.test.js @@ -42,15 +42,10 @@ describe("up", () => { } function mockChangelogLockCollection() { - const findStub = { - toArray: vi.fn().mockResolvedValue([]) - }; - return { - insertOne: vi.fn().mockResolvedValue(), + updateOne: vi.fn().mockResolvedValue({ upsertedCount: 1 }), createIndex: vi.fn().mockResolvedValue(), - find: vi.fn().mockReturnValue(findStub), - deleteMany: vi.fn().mockResolvedValue(), + deleteOne: vi.fn().mockResolvedValue(), }; } @@ -153,9 +148,6 @@ describe("up", () => { lockTtl: 0, useFileHash: true, }); - changelogLockCollection.find.mockReturnValue({ - toArray: vi.fn().mockResolvedValue([{ createdAt: new Date() }]) - }); vi.useFakeTimers(); vi.setSystemTime(new Date("2016-06-09T08:07:00.077Z")); @@ -222,9 +214,8 @@ describe("up", () => { it("should lock if feature is enabled", async() => { await up(db); expect(changelogLockCollection.createIndex).toHaveBeenCalled(); - expect(changelogLockCollection.find).toHaveBeenCalled(); - expect(changelogLockCollection.insertOne).toHaveBeenCalled(); - expect(changelogLockCollection.deleteMany).toHaveBeenCalled(); + expect(changelogLockCollection.updateOne).toHaveBeenCalled(); + expect(changelogLockCollection.deleteOne).toHaveBeenCalled(); }); it("should ignore lock if feature is disabled", async() => { @@ -233,28 +224,30 @@ describe("up", () => { lockCollectionName: "changelog_lock", lockTtl: 0 }); - changelogLockCollection.find.mockReturnValue({ - toArray: vi.fn().mockResolvedValue([{ createdAt: new Date() }]) - }); await up(db); expect(changelogLockCollection.createIndex).not.toHaveBeenCalled(); - expect(changelogLockCollection.find).not.toHaveBeenCalled(); - expect(changelogLockCollection.insertOne).not.toHaveBeenCalled(); - expect(changelogLockCollection.deleteMany).not.toHaveBeenCalled(); + expect(changelogLockCollection.updateOne).not.toHaveBeenCalled(); + expect(changelogLockCollection.deleteOne).not.toHaveBeenCalled(); }); it("should yield an error when unable to create a lock", async() => { - changelogLockCollection.insertOne.mockRejectedValue(new Error("Kernel panic")); + changelogLockCollection.updateOne.mockRejectedValue(new Error("Kernel panic")); await expect(up(db)).rejects.toThrow("Could not create a lock: Kernel panic"); }); it("should yield an error when changelog is locked", async() => { - changelogLockCollection.find.mockReturnValue({ - toArray: vi.fn().mockResolvedValue([{ createdAt: new Date() }]) - }); - + changelogLockCollection.updateOne.mockResolvedValue({ upsertedCount: 0 }); + + await expect(up(db)).rejects.toThrow("Could not migrate up, a lock is in place."); + }); + + it("should yield an error when changelog is locked due to a concurrent acquire (E11000)", async() => { + const dupKeyError = new Error("E11000 duplicate key error"); + dupKeyError.code = 11000; + changelogLockCollection.updateOne.mockRejectedValue(dupKeyError); + await expect(up(db)).rejects.toThrow("Could not migrate up, a lock is in place."); }); });