Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 58 additions & 0 deletions src/lib/groomer/groomer-lock.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/**
* Groomer run lock.
*
* A DB-backed single-row lock that serializes hosted-groomer runs. Without it,
* two concurrent runs can both select the same candidate before either acquires
* the per-issue lease (selection in selector.ts and lease acquisition in run.ts
* are not atomic), causing a duplicate LLM call + duplicate label writes.
*
* Mirrors src/lib/sync-lock.ts, reusing the generic `sync_lock` table with a
* distinct id ("groomer") — its `syncRunId` column is a plain nullable string
* (no FK), used here to hold a random lock token. First writer wins; a stale
* lock (>30 min) is reclaimed; the lock is released in a try/finally.
*/

import { randomUUID } from "crypto";
import { prisma } from "@/lib/prisma";

const LOCK_ID = "groomer" as const;
const MAX_AGE_MS = 30 * 60 * 1000; // 30 minutes

export type GroomerLock = { locked: true; token: string } | { locked: false };

/** Attempt to acquire the groomer run lock. */
export async function acquireGroomerLock(): Promise<GroomerLock> {
const existing = await prisma.syncLock.findUnique({ where: { id: LOCK_ID } });
if (existing && existing.syncRunId) {
const age = Date.now() - existing.acquiredAt.getTime();
if (age < MAX_AGE_MS) {
return { locked: false };
}
// Stale lock — clear it and proceed.
await prisma.syncLock.delete({ where: { id: LOCK_ID } });
}

const token = randomUUID();
try {
await prisma.$transaction(async (tx) => {
// Double-check inside the transaction for race safety.
const stillExisting = await tx.syncLock.findUnique({ where: { id: LOCK_ID } });
if (stillExisting && stillExisting.syncRunId) {
throw new Error("already_locked");
}
await tx.syncLock.create({ data: { id: LOCK_ID, syncRunId: token, acquiredAt: new Date() } });
});
} catch (err) {
if (err instanceof Error && err.message === "already_locked") {
return { locked: false };
}
throw err;
}

return { locked: true, token };
}

/** Release the groomer run lock. Conditional on the token so we never release another run's lock. */
export async function releaseGroomerLock(token: string): Promise<void> {
await prisma.syncLock.deleteMany({ where: { id: LOCK_ID, syncRunId: token } });
}
26 changes: 26 additions & 0 deletions src/lib/groomer/run.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ const { mocks } = vi.hoisted(() => ({
addIssueLabel: vi.fn(),
removeIssueLabel: vi.fn(),
buildRepositoryContext: vi.fn(),
acquireGroomerLock: vi.fn(),
releaseGroomerLock: vi.fn(),
prisma: {
automationRepo: { findUnique: vi.fn() },
groomingRun: { create: vi.fn(), update: vi.fn(), findFirst: vi.fn() },
Expand Down Expand Up @@ -84,6 +86,11 @@ vi.mock("./repository-context", () => ({
buildRepositoryContext: mocks.buildRepositoryContext,
}));

vi.mock("./groomer-lock", () => ({
acquireGroomerLock: mocks.acquireGroomerLock,
releaseGroomerLock: mocks.releaseGroomerLock,
}));

import { runHostedGroomer } from "./run";

const mockCandidate: GroomingCandidate = {
Expand Down Expand Up @@ -138,6 +145,8 @@ describe("runHostedGroomer", () => {
mocks.findActiveLeasesForIssue.mockResolvedValue([]);
mocks.upsertLease.mockResolvedValue({ created: true, lease: { id: "lease-1" } });
mocks.releaseLease.mockResolvedValue({ id: "lease-1" });
mocks.acquireGroomerLock.mockResolvedValue({ locked: true, token: "lock-token" });
mocks.releaseGroomerLock.mockResolvedValue(undefined);
mocks.prisma.automationRepo.findUnique.mockResolvedValue(mockAutomationRepo);
mocks.prisma.groomingRun.create.mockResolvedValue(mockGroomingRun);
mocks.prisma.groomingRun.update.mockResolvedValue({ ...mockGroomingRun, stage: "planned" });
Expand All @@ -164,6 +173,23 @@ describe("runHostedGroomer", () => {
expect(mocks.callGroomerLLM).not.toHaveBeenCalled();
});

it("bails without selecting when the groomer lock is held", async () => {
mocks.acquireGroomerLock.mockResolvedValue({ locked: false });

const result = await runHostedGroomer();

expect(result).toBeNull();
expect(mocks.selectGroomingCandidate).not.toHaveBeenCalled();
expect(mocks.releaseGroomerLock).not.toHaveBeenCalled();
});

it("releases the groomer lock after a run completes", async () => {
await runHostedGroomer();

expect(mocks.acquireGroomerLock).toHaveBeenCalledTimes(1);
expect(mocks.releaseGroomerLock).toHaveBeenCalledWith("lock-token");
});

it("dry-run creates and completes groomingRun and result has groomingRunId", async () => {
mocks.getHostedGroomerConfig.mockReturnValue({ ...mockConfig, dryRun: true });

Expand Down
21 changes: 21 additions & 0 deletions src/lib/groomer/run.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { prisma } from "@/lib/prisma";
import { addIssueComment, updateIssueLabels, updateIssueTitleAndBody } from "@/lib/github";
import { findActiveLeasesForIssue, releaseLease, upsertLease } from "@/lib/lease";
import { acquireGroomerLock, releaseGroomerLock } from "./groomer-lock";
import { selectGroomingCandidate } from "./selector";
import { buildIssueContext, fetchIssueComments } from "./context";
import { callGroomerLLM } from "./llm";
Expand Down Expand Up @@ -47,6 +48,8 @@ export interface GroomerDeps {
releaseLease: typeof releaseLease;
prisma: typeof prisma;
buildRepositoryContext: typeof buildRepositoryContext;
acquireGroomerLock: typeof acquireGroomerLock;
releaseGroomerLock: typeof releaseGroomerLock;
}

const defaultDeps: GroomerDeps = {
Expand All @@ -64,11 +67,29 @@ const defaultDeps: GroomerDeps = {
releaseLease,
prisma,
buildRepositoryContext,
acquireGroomerLock,
releaseGroomerLock,
};

export async function runHostedGroomer(
options: RunHostedGroomerOptions = {},
deps: GroomerDeps = defaultDeps,
): Promise<GroomerRunResult | null> {
// Serialize runs behind a DB lock: without it, two concurrent groomer runs
// can select the same candidate before either acquires the per-issue lease
// (selection and lease acquisition are not atomic), double-grooming the issue.
const lock = await deps.acquireGroomerLock();
if (!lock.locked) return null;
try {
return await executeGroomerRun(options, deps);
} finally {
await deps.releaseGroomerLock(lock.token);
}
}

async function executeGroomerRun(
options: RunHostedGroomerOptions = {},
deps: GroomerDeps = defaultDeps,
): Promise<GroomerRunResult | null> {
const config = deps.getConfig();
const dryRun = options.dryRun ?? config.dryRun;
Expand Down