Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions AGENTS.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ npm run db:deploy # Deploy migrations (prod)
| `DISPATCH_DATABASE_URL` | No | Alternative database URL alias — used if `DATABASE_URL` is not set |
| `NEXTAUTH_SECRET` | No | NextAuth.js secret |
| `NEXTAUTH_URL` | No | NextAuth.js URL |
| `DISPATCH_SCHEDULER_ENABLED` | No | `true` runs periodic jobs (issue sync) in-process instead of via external cronjobs. Off by default. Confine to a single replica. |
| `DISPATCH_SYNC_INTERVAL_MS` | No | Scheduled-sync interval when the in-app scheduler is enabled (default 900000 = 15m) |

Resolution order: `DATABASE_URL` > `DISPATCH_DATABASE_URL`. `DISPATCH_AGENT_TOKEN` for agent API bearer auth.

Expand Down
17 changes: 17 additions & 0 deletions src/instrumentation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,21 @@
*/
export async function register() {
// Lane config init moved to src/lib/lane-config.ts module-load side effect.

// In-app periodic scheduler (opt-in via DISPATCH_SCHEDULER_ENABLED). Node
// runtime only — register() also runs in the edge runtime, which has no
// timers/loopback. Dynamic import so the edge bundle never pulls it in. The
// scheduler fires loopback HTTP POSTs (no shared module state with routes),
// so the Turbopack chunk-graph isolation noted above does not affect it.
if (process.env.NEXT_RUNTIME !== "nodejs") return;
const { schedulerConfigFromEnv, startScheduler } = await import("@/lib/scheduler");
startScheduler(schedulerConfigFromEnv(process.env), {
fetch,
setInterval,
setTimeout,
log: (message, error) =>
error !== undefined
? console.error(`[scheduler] ${message}`, error)
: console.log(`[scheduler] ${message}`),
});
}
103 changes: 103 additions & 0 deletions src/lib/scheduler.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
import { describe, it, expect, vi } from "vitest";
import { schedulerConfigFromEnv, runJob, startScheduler, type SchedulerConfig, type SchedulerDeps } from "./scheduler";

function fakeDeps(overrides: Partial<SchedulerDeps> = {}): SchedulerDeps & { logs: Array<[string, unknown]> } {
const logs: Array<[string, unknown]> = [];
return {
fetch: vi.fn(async () => new Response(null, { status: 200 })) as unknown as typeof fetch,
setInterval: vi.fn(() => "interval-handle"),
setTimeout: vi.fn(() => "timeout-handle"),
log: (m, e) => logs.push([m, e]),
logs,
...overrides,
};
}

const CONFIG: SchedulerConfig = {
enabled: true,
baseUrl: "http://127.0.0.1:3000",
token: "tok",
startupDelayMs: 5000,
jobs: [{ name: "sync", path: "/api/sync/scheduled", body: { issues: true }, intervalMs: 900000 }],
};

describe("schedulerConfigFromEnv", () => {
it("is disabled unless DISPATCH_SCHEDULER_ENABLED=true", () => {
expect(schedulerConfigFromEnv({}).enabled).toBe(false);
expect(schedulerConfigFromEnv({ DISPATCH_SCHEDULER_ENABLED: "true" }).enabled).toBe(true);
});

it("defaults the sync job to 15m and builds a loopback base from PORT", () => {
const c = schedulerConfigFromEnv({ PORT: "8080", DISPATCH_AGENT_TOKEN: "t" });
expect(c.baseUrl).toBe("http://127.0.0.1:8080");
expect(c.token).toBe("t");
const sync = c.jobs.find((j) => j.name === "sync")!;
expect(sync.path).toBe("/api/sync/scheduled");
expect(sync.body).toEqual({ issues: true });
expect(sync.intervalMs).toBe(15 * 60 * 1000);
});

it("honors DISPATCH_SYNC_INTERVAL_MS and falls back on garbage", () => {
expect(schedulerConfigFromEnv({ DISPATCH_SYNC_INTERVAL_MS: "60000" }).jobs[0].intervalMs).toBe(60000);
expect(schedulerConfigFromEnv({ DISPATCH_SYNC_INTERVAL_MS: "nope" }).jobs[0].intervalMs).toBe(15 * 60 * 1000);
expect(schedulerConfigFromEnv({ PORT: "" }).baseUrl).toBe("http://127.0.0.1:3000");
});
});

describe("runJob", () => {
it("POSTs JSON with bearer auth to baseUrl+path", async () => {
const deps = fakeDeps();
await runJob(CONFIG.jobs[0], CONFIG, deps);
expect(deps.fetch).toHaveBeenCalledWith("http://127.0.0.1:3000/api/sync/scheduled", {
method: "POST",
headers: { "Content-Type": "application/json", Authorization: "Bearer tok" },
body: JSON.stringify({ issues: true }),
});
expect(deps.logs).toHaveLength(0); // 200 -> quiet
});

it("treats 409 as expected (lock held), not an error", async () => {
const deps = fakeDeps({ fetch: vi.fn(async () => new Response(null, { status: 409 })) as unknown as typeof fetch });
await runJob(CONFIG.jobs[0], CONFIG, deps);
expect(deps.logs).toHaveLength(0);
});

it("logs non-ok statuses without throwing", async () => {
const deps = fakeDeps({ fetch: vi.fn(async () => new Response(null, { status: 500 })) as unknown as typeof fetch });
await expect(runJob(CONFIG.jobs[0], CONFIG, deps)).resolves.toBeUndefined();
expect(deps.logs[0][0]).toContain("HTTP 500");
});

it("swallows fetch rejection (a transient failure must not kill the interval)", async () => {
const deps = fakeDeps({ fetch: vi.fn(async () => { throw new Error("econnrefused"); }) as unknown as typeof fetch });
await expect(runJob(CONFIG.jobs[0], CONFIG, deps)).resolves.toBeUndefined();
expect(deps.logs[0][0]).toContain("failed");
});
});

describe("startScheduler", () => {
it("no-ops when disabled", () => {
const deps = fakeDeps();
expect(startScheduler({ ...CONFIG, enabled: false }, deps)).toEqual([]);
expect(deps.setTimeout).not.toHaveBeenCalled();
});

it("no-ops (and warns) when enabled but token is missing", () => {
const deps = fakeDeps();
expect(startScheduler({ ...CONFIG, token: "" }, deps)).toEqual([]);
expect(deps.setTimeout).not.toHaveBeenCalled();
expect(deps.logs.some(([m]) => m.includes("DISPATCH_AGENT_TOKEN is unset"))).toBe(true);
});

it("schedules each job after the startup delay, then on an interval", () => {
const deps = fakeDeps();
startScheduler(CONFIG, deps);
expect(deps.setTimeout).toHaveBeenCalledTimes(1);
expect(deps.setTimeout).toHaveBeenCalledWith(expect.any(Function), 5000);
// fire the startup timer -> it runs once and arms the interval
const startupCb = (deps.setTimeout as ReturnType<typeof vi.fn>).mock.calls[0][0] as () => void;
startupCb();
expect(deps.fetch).toHaveBeenCalledTimes(1);
expect(deps.setInterval).toHaveBeenCalledWith(expect.any(Function), 900000);
});
});
125 changes: 125 additions & 0 deletions src/lib/scheduler.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
/**
* In-app periodic scheduler.
*
* Runs periodic work (issue sync today; groomer/pr-followup/prune to follow)
* from inside the Next.js server process instead of external Kubernetes
* cronjobs, wired from `src/instrumentation.ts` `register()`.
*
* Jobs fire as **loopback HTTP POSTs to the app's own endpoints**, not direct
* function calls. This is deliberate:
* - It routes through the real Next.js router (the route handler's module
* graph), avoiding the Turbopack standalone chunk-graph isolation that the
* instrumentation.ts / lane-config.ts comments describe — a scheduler in
* the instrumentation chunk that imported job functions directly could run
* against a different module instance (stale lane config, etc.).
* - It reuses each endpoint's existing auth + locking for free (e.g. the sync
* endpoint's DB lock makes concurrent fires collapse to one via 409).
*
* Opt-in via DISPATCH_SCHEDULER_ENABLED so dev/CI don't spin timers, and so it
* can be confined to a single replica until per-job locks exist for the others.
*/

export interface ScheduledJob {
name: string;
/** App-relative path, e.g. "/api/sync/scheduled". */
path: string;
/** JSON body POSTed to the endpoint. */
body: unknown;
intervalMs: number;
}

export interface SchedulerConfig {
enabled: boolean;
/** Loopback base, e.g. "http://127.0.0.1:3000". */
baseUrl: string;
/** Bearer token for the endpoints (DISPATCH_AGENT_TOKEN). */
token: string;
/** Delay before the first fire, so the HTTP server is listening. */
startupDelayMs: number;
jobs: ScheduledJob[];
}

export interface SchedulerDeps {
fetch: typeof fetch;
setInterval: (fn: () => void, ms: number) => unknown;
setTimeout: (fn: () => void, ms: number) => unknown;
log: (message: string, error?: unknown) => void;
}

const DEFAULT_SYNC_INTERVAL_MS = 15 * 60 * 1000; // 15m
const DEFAULT_STARTUP_DELAY_MS = 5 * 1000;

function intFromEnv(raw: string | undefined, fallback: number): number {
const n = Number(raw);
return Number.isFinite(n) && n > 0 ? n : fallback;
}

/**
* Build the scheduler config from the environment. Only the sync job is wired
* today; groomer/pr-followup/prune-closed are added in follow-up issues (they
* need their own concurrency guards first).
*/
export function schedulerConfigFromEnv(env: Record<string, string | undefined>): SchedulerConfig {
const port = env.PORT && env.PORT.trim() !== "" ? env.PORT.trim() : "3000";
return {
enabled: env.DISPATCH_SCHEDULER_ENABLED === "true",
baseUrl: `http://127.0.0.1:${port}`,
token: env.DISPATCH_AGENT_TOKEN ?? "",
startupDelayMs: intFromEnv(env.DISPATCH_SCHEDULER_STARTUP_DELAY_MS, DEFAULT_STARTUP_DELAY_MS),
jobs: [
{
name: "sync",
path: "/api/sync/scheduled",
body: { issues: true },
intervalMs: intFromEnv(env.DISPATCH_SYNC_INTERVAL_MS, DEFAULT_SYNC_INTERVAL_MS),
},
],
};
}

/** Fire one job. Never throws — a transient failure must not kill the interval. */
export async function runJob(job: ScheduledJob, config: SchedulerConfig, deps: SchedulerDeps): Promise<void> {
try {
const res = await deps.fetch(`${config.baseUrl}${job.path}`, {
method: "POST",
headers: {
"Content-Type": "application/json",
Authorization: `Bearer ${config.token}`,
},
body: JSON.stringify(job.body),
});
// 409 = another run holds the lock (expected under concurrency); not an error.
if (!res.ok && res.status !== 409) {
deps.log(`job "${job.name}" -> HTTP ${res.status}`);
}
} catch (error) {
deps.log(`job "${job.name}" failed`, error);
}
}

/**
* Start the scheduler. No-op (returns []) when disabled or when no token is
* configured. Each job runs once after startupDelayMs, then every intervalMs.
* Returns the interval handles (for teardown in tests).
*/
export function startScheduler(config: SchedulerConfig, deps: SchedulerDeps): unknown[] {
if (!config.enabled) {
deps.log("disabled (set DISPATCH_SCHEDULER_ENABLED=true to enable)");
return [];
}
if (!config.token) {
deps.log("enabled but DISPATCH_AGENT_TOKEN is unset; not scheduling any jobs");
return [];
}

const handles: unknown[] = [];
for (const job of config.jobs) {
deps.log(`scheduling "${job.name}" every ${job.intervalMs}ms -> ${job.path}`);
deps.setTimeout(() => {
void runJob(job, config, deps);
const handle = deps.setInterval(() => void runJob(job, config, deps), job.intervalMs);
handles.push(handle);
}, config.startupDelayMs);
}
return handles;
}