Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,12 @@ All notable changes to this project will be documented in this file.

The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)

## [Unreleased]
- Make migration lock acquisition atomic
- Replace separate existence-check + activate steps with a single atomic `acquire` operation
- Eliminates the race condition where concurrent processes could both pass the lock check and run migrations simultaneously
- Update README with documentation on the locking feature, configuration options, and error messages

## [14.0.7] - 2025-12-03
- Reorganize test mocks structure
- Move __mocks__ directory from project root to test directory
Expand Down
44 changes: 44 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,50 @@ module.exports = {
};
````

### Using the locking feature to prevent concurrent migrations

migrate-mongo can use a lock collection to ensure that only one migration process runs at a time. This is useful in environments where multiple instances of your application may start up simultaneously and all attempt to run migrations.

#### How it works

When `up` or `down` is called, migrate-mongo atomically acquires a lock against the lock collection. Because this is a single atomic operation, only one concurrent caller will succeed — the first one gets the lock, and all others are immediately rejected with an error.

Once migrations finish, the lock is released automatically.

#### Configuration

Two config options control this feature:

```javascript
// The mongodb collection where the lock will be stored.
lockCollectionName: "changelog_lock",

// TTL in seconds for the lock document. This acts as a safety net to
// automatically expire stale locks left behind by a crashed process.
lockTtl: 60,
```

#### Enabling / disabling the locking feature

Set the `lockCollectionName` and set the `lockTtl` to a positive number to enable the locking feature. If the `lockCollectionName` is not set, or the `lockTtl` is not a positive number, locking is disabled.

#### Errors

If a lock is already in place when `up` or `down` is called, an error is thrown:

```
Could not migrate up, a lock is in place.
```
```
Could not migrate down, a lock is in place.
```

If the lock collection itself cannot be written to, the error will be:

```
Could not create a lock: <reason>
```

### Using a file hash algorithm to enable re-running updated files
There are use cases where it may make sense to not treat scripts as immutable items. An example would be a simple collection with lookup values where you just can wipe and recreate the entire collection all at the same time.

Expand Down
9 changes: 5 additions & 4 deletions lib/actions/down.js
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,15 @@ export default async (db, client) => {
}
}

if (await lock.exist(db)) {
throw new Error("Could not migrate down, a lock is in place.");
}
let acquired;
try {
await lock.activate(db);
acquired = await lock.acquire(db);
} catch(err) {
throw new Error(`Could not create a lock: ${err.message}`);
}
if (!acquired) {
throw new Error("Could not migrate down, a lock is in place.");
}

try {
for (const item of itemsToRollback) {
Expand Down
10 changes: 5 additions & 5 deletions lib/actions/up.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,15 @@ export default async (db, client) => {
const migrated = [];
const migrationBlock = Date.now();

if (await lock.exist(db)) {
throw new Error("Could not migrate up, a lock is in place.");
}

let acquired;
try {
await lock.activate(db);
acquired = await lock.acquire(db);
} catch(err) {
throw new Error(`Could not create a lock: ${err.message}`);
}
if (!acquired) {
throw new Error("Could not migrate up, a lock is in place.");
}

const migrateItem = async item => {
try {
Expand Down
31 changes: 16 additions & 15 deletions lib/utils/lock.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,33 +7,34 @@ async function getLockCollection(db) {
}

const lockCollection = db.collection(lockCollectionName);
lockCollection.createIndex({ createdAt: 1 }, { expireAfterSeconds: lockTtl });
await lockCollection.createIndex({ createdAt: 1 }, { expireAfterSeconds: lockTtl });
return lockCollection;
}

async function exist(db) {
async function acquire(db) {
const lockCollection = await getLockCollection(db);
if (!lockCollection) {
return false;
return true;
}
const foundLocks = await lockCollection.find({}).toArray();

return foundLocks.length > 0;
}

async function activate(db) {
const lockCollection = await getLockCollection(db);
if (lockCollection) {
await lockCollection.insertOne({ createdAt: new Date() });
try {
const result = await lockCollection.updateOne(
{ _id: 'lock' },
{ $setOnInsert: { createdAt: new Date() } },
{ upsert: true }
);
return result.upsertedCount === 1;
} catch (err) {
if (err.code === 11000) return false;
throw err;
}
}

async function clear(db) {
const lockCollection = await getLockCollection(db);
if (lockCollection) {
await lockCollection.deleteMany({});
await lockCollection.deleteOne({ _id: 'lock' });
}
}

export { exist, activate, clear };
export default { exist, activate, clear };
export { acquire, clear };
export default { acquire, clear };
34 changes: 16 additions & 18 deletions test/actions/down.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,10 @@ describe("down", () => {
}

function mockChangelogLockCollection() {
const findStub = {
toArray: vi.fn().mockResolvedValue([])
};

return {
insertOne: vi.fn().mockResolvedValue(),
updateOne: vi.fn().mockResolvedValue({ upsertedCount: 1 }),
createIndex: vi.fn().mockResolvedValue(),
find: vi.fn().mockReturnValue(findStub),
deleteMany: vi.fn().mockResolvedValue(),
deleteOne: vi.fn().mockResolvedValue(),
};
}

Expand Down Expand Up @@ -158,9 +153,8 @@ describe("down", () => {
it("should lock if feature is enabled", async() => {
await down(db);
expect(changelogLockCollection.createIndex).toHaveBeenCalled();
expect(changelogLockCollection.find).toHaveBeenCalled();
expect(changelogLockCollection.insertOne).toHaveBeenCalled();
expect(changelogLockCollection.deleteMany).toHaveBeenCalled();
expect(changelogLockCollection.updateOne).toHaveBeenCalled();
expect(changelogLockCollection.deleteOne).toHaveBeenCalled();
});

it("should ignore lock if feature is disabled", async() => {
Expand All @@ -169,25 +163,29 @@ describe("down", () => {
lockCollectionName: "changelog_lock",
lockTtl: 0
});
changelogLockCollection.find.mockReturnValue({
toArray: vi.fn().mockResolvedValue([{ createdAt: new Date() }])
});

await down(db);
expect(changelogLockCollection.createIndex).not.toHaveBeenCalled();
expect(changelogLockCollection.find).not.toHaveBeenCalled();
expect(changelogLockCollection.updateOne).not.toHaveBeenCalled();
expect(changelogLockCollection.deleteOne).not.toHaveBeenCalled();
});

it("should yield an error when unable to create a lock", async() => {
changelogLockCollection.insertOne.mockRejectedValue(new Error("Kernel panic"));
changelogLockCollection.updateOne.mockRejectedValue(new Error("Kernel panic"));

await expect(down(db)).rejects.toThrow("Could not create a lock: Kernel panic");
});

it("should yield an error when changelog is locked", async() => {
changelogLockCollection.find.mockReturnValue({
toArray: vi.fn().mockResolvedValue([{ createdAt: new Date() }])
});
changelogLockCollection.updateOne.mockResolvedValue({ upsertedCount: 0 });

await expect(down(db)).rejects.toThrow("Could not migrate down, a lock is in place.");
});

it("should yield an error when changelog is locked due to a concurrent acquire (E11000)", async() => {
const dupKeyError = new Error("E11000 duplicate key error");
dupKeyError.code = 11000;
changelogLockCollection.updateOne.mockRejectedValue(dupKeyError);

await expect(down(db)).rejects.toThrow("Could not migrate down, a lock is in place.");
});
Expand Down
41 changes: 17 additions & 24 deletions test/actions/up.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -42,15 +42,10 @@ describe("up", () => {
}

function mockChangelogLockCollection() {
const findStub = {
toArray: vi.fn().mockResolvedValue([])
};

return {
insertOne: vi.fn().mockResolvedValue(),
updateOne: vi.fn().mockResolvedValue({ upsertedCount: 1 }),
createIndex: vi.fn().mockResolvedValue(),
find: vi.fn().mockReturnValue(findStub),
deleteMany: vi.fn().mockResolvedValue(),
deleteOne: vi.fn().mockResolvedValue(),
};
}

Expand Down Expand Up @@ -153,9 +148,6 @@ describe("up", () => {
lockTtl: 0,
useFileHash: true,
});
changelogLockCollection.find.mockReturnValue({
toArray: vi.fn().mockResolvedValue([{ createdAt: new Date() }])
});

vi.useFakeTimers();
vi.setSystemTime(new Date("2016-06-09T08:07:00.077Z"));
Expand Down Expand Up @@ -222,9 +214,8 @@ describe("up", () => {
it("should lock if feature is enabled", async() => {
await up(db);
expect(changelogLockCollection.createIndex).toHaveBeenCalled();
expect(changelogLockCollection.find).toHaveBeenCalled();
expect(changelogLockCollection.insertOne).toHaveBeenCalled();
expect(changelogLockCollection.deleteMany).toHaveBeenCalled();
expect(changelogLockCollection.updateOne).toHaveBeenCalled();
expect(changelogLockCollection.deleteOne).toHaveBeenCalled();
});

it("should ignore lock if feature is disabled", async() => {
Expand All @@ -233,28 +224,30 @@ describe("up", () => {
lockCollectionName: "changelog_lock",
lockTtl: 0
});
changelogLockCollection.find.mockReturnValue({
toArray: vi.fn().mockResolvedValue([{ createdAt: new Date() }])
});

await up(db);
expect(changelogLockCollection.createIndex).not.toHaveBeenCalled();
expect(changelogLockCollection.find).not.toHaveBeenCalled();
expect(changelogLockCollection.insertOne).not.toHaveBeenCalled();
expect(changelogLockCollection.deleteMany).not.toHaveBeenCalled();
expect(changelogLockCollection.updateOne).not.toHaveBeenCalled();
expect(changelogLockCollection.deleteOne).not.toHaveBeenCalled();
});

it("should yield an error when unable to create a lock", async() => {
changelogLockCollection.insertOne.mockRejectedValue(new Error("Kernel panic"));
changelogLockCollection.updateOne.mockRejectedValue(new Error("Kernel panic"));

await expect(up(db)).rejects.toThrow("Could not create a lock: Kernel panic");
});

it("should yield an error when changelog is locked", async() => {
changelogLockCollection.find.mockReturnValue({
toArray: vi.fn().mockResolvedValue([{ createdAt: new Date() }])
});

changelogLockCollection.updateOne.mockResolvedValue({ upsertedCount: 0 });

await expect(up(db)).rejects.toThrow("Could not migrate up, a lock is in place.");
});

it("should yield an error when changelog is locked due to a concurrent acquire (E11000)", async() => {
const dupKeyError = new Error("E11000 duplicate key error");
dupKeyError.code = 11000;
changelogLockCollection.updateOne.mockRejectedValue(dupKeyError);

await expect(up(db)).rejects.toThrow("Could not migrate up, a lock is in place.");
});
});