From f82b0db0ac27b0c08b2eb2d793e249e5f9d5cbeb Mon Sep 17 00:00:00 2001 From: Jory Irving Date: Wed, 1 Jul 2026 15:01:23 -0600 Subject: [PATCH] feat(scheduler): in-app scheduler; internalize scheduled sync MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds an opt-in in-process scheduler (DISPATCH_SCHEDULER_ENABLED) wired from instrumentation.ts register(), so periodic work runs inside the server instead of external Kubernetes cronjobs. First job: scheduled issue sync (/api/sync/scheduled), DISPATCH_SYNC_INTERVAL_MS (default 15m). Jobs fire as loopback HTTP POSTs to the app's own endpoints, not direct function calls: this routes through the real route module graph (avoiding the Turbopack standalone chunk-graph isolation that instrumentation.ts/lane-config warn about) and reuses each endpoint's auth + DB lock (concurrent fires 409). The sync endpoint is globally DB-locked, so it's safe to internalize now. Node runtime only (dynamic import so the edge bundle skips it). runJob never throws — a transient failure can't kill the interval; 409 is treated as expected. Scheduler logic is pure + dependency-injected (fetch/timers) so it's unit-tested without a server (10 tests). groomer/pr-followup/prune-closed follow in #503/#504 (they need their own concurrency guards first). Closes #502 --- AGENTS.md | 2 + src/instrumentation.ts | 17 ++++++ src/lib/scheduler.test.ts | 103 +++++++++++++++++++++++++++++++ src/lib/scheduler.ts | 125 ++++++++++++++++++++++++++++++++++++++ 4 files changed, 247 insertions(+) create mode 100644 src/lib/scheduler.test.ts create mode 100644 src/lib/scheduler.ts diff --git a/AGENTS.md b/AGENTS.md index c8cc8907..8109df49 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -41,6 +41,8 @@ npm run db:deploy # Deploy migrations (prod) | `DISPATCH_DATABASE_URL` | No | Alternative database URL alias — used if `DATABASE_URL` is not set | | `NEXTAUTH_SECRET` | No | NextAuth.js secret | | `NEXTAUTH_URL` | No | NextAuth.js URL | +| `DISPATCH_SCHEDULER_ENABLED` | No | `true` runs periodic jobs (issue sync) in-process instead of via external cronjobs. Off by default. Confine to a single replica. | +| `DISPATCH_SYNC_INTERVAL_MS` | No | Scheduled-sync interval when the in-app scheduler is enabled (default 900000 = 15m) | Resolution order: `DATABASE_URL` > `DISPATCH_DATABASE_URL`. `DISPATCH_AGENT_TOKEN` for agent API bearer auth. diff --git a/src/instrumentation.ts b/src/instrumentation.ts index 0f500c73..1f69ac5d 100644 --- a/src/instrumentation.ts +++ b/src/instrumentation.ts @@ -9,4 +9,21 @@ */ export async function register() { // Lane config init moved to src/lib/lane-config.ts module-load side effect. + + // In-app periodic scheduler (opt-in via DISPATCH_SCHEDULER_ENABLED). Node + // runtime only — register() also runs in the edge runtime, which has no + // timers/loopback. Dynamic import so the edge bundle never pulls it in. The + // scheduler fires loopback HTTP POSTs (no shared module state with routes), + // so the Turbopack chunk-graph isolation noted above does not affect it. + if (process.env.NEXT_RUNTIME !== "nodejs") return; + const { schedulerConfigFromEnv, startScheduler } = await import("@/lib/scheduler"); + startScheduler(schedulerConfigFromEnv(process.env), { + fetch, + setInterval, + setTimeout, + log: (message, error) => + error !== undefined + ? console.error(`[scheduler] ${message}`, error) + : console.log(`[scheduler] ${message}`), + }); } diff --git a/src/lib/scheduler.test.ts b/src/lib/scheduler.test.ts new file mode 100644 index 00000000..42eb2435 --- /dev/null +++ b/src/lib/scheduler.test.ts @@ -0,0 +1,103 @@ +import { describe, it, expect, vi } from "vitest"; +import { schedulerConfigFromEnv, runJob, startScheduler, type SchedulerConfig, type SchedulerDeps } from "./scheduler"; + +function fakeDeps(overrides: Partial = {}): SchedulerDeps & { logs: Array<[string, unknown]> } { + const logs: Array<[string, unknown]> = []; + return { + fetch: vi.fn(async () => new Response(null, { status: 200 })) as unknown as typeof fetch, + setInterval: vi.fn(() => "interval-handle"), + setTimeout: vi.fn(() => "timeout-handle"), + log: (m, e) => logs.push([m, e]), + logs, + ...overrides, + }; +} + +const CONFIG: SchedulerConfig = { + enabled: true, + baseUrl: "http://127.0.0.1:3000", + token: "tok", + startupDelayMs: 5000, + jobs: [{ name: "sync", path: "/api/sync/scheduled", body: { issues: true }, intervalMs: 900000 }], +}; + +describe("schedulerConfigFromEnv", () => { + it("is disabled unless DISPATCH_SCHEDULER_ENABLED=true", () => { + expect(schedulerConfigFromEnv({}).enabled).toBe(false); + expect(schedulerConfigFromEnv({ DISPATCH_SCHEDULER_ENABLED: "true" }).enabled).toBe(true); + }); + + it("defaults the sync job to 15m and builds a loopback base from PORT", () => { + const c = schedulerConfigFromEnv({ PORT: "8080", DISPATCH_AGENT_TOKEN: "t" }); + expect(c.baseUrl).toBe("http://127.0.0.1:8080"); + expect(c.token).toBe("t"); + const sync = c.jobs.find((j) => j.name === "sync")!; + expect(sync.path).toBe("/api/sync/scheduled"); + expect(sync.body).toEqual({ issues: true }); + expect(sync.intervalMs).toBe(15 * 60 * 1000); + }); + + it("honors DISPATCH_SYNC_INTERVAL_MS and falls back on garbage", () => { + expect(schedulerConfigFromEnv({ DISPATCH_SYNC_INTERVAL_MS: "60000" }).jobs[0].intervalMs).toBe(60000); + expect(schedulerConfigFromEnv({ DISPATCH_SYNC_INTERVAL_MS: "nope" }).jobs[0].intervalMs).toBe(15 * 60 * 1000); + expect(schedulerConfigFromEnv({ PORT: "" }).baseUrl).toBe("http://127.0.0.1:3000"); + }); +}); + +describe("runJob", () => { + it("POSTs JSON with bearer auth to baseUrl+path", async () => { + const deps = fakeDeps(); + await runJob(CONFIG.jobs[0], CONFIG, deps); + expect(deps.fetch).toHaveBeenCalledWith("http://127.0.0.1:3000/api/sync/scheduled", { + method: "POST", + headers: { "Content-Type": "application/json", Authorization: "Bearer tok" }, + body: JSON.stringify({ issues: true }), + }); + expect(deps.logs).toHaveLength(0); // 200 -> quiet + }); + + it("treats 409 as expected (lock held), not an error", async () => { + const deps = fakeDeps({ fetch: vi.fn(async () => new Response(null, { status: 409 })) as unknown as typeof fetch }); + await runJob(CONFIG.jobs[0], CONFIG, deps); + expect(deps.logs).toHaveLength(0); + }); + + it("logs non-ok statuses without throwing", async () => { + const deps = fakeDeps({ fetch: vi.fn(async () => new Response(null, { status: 500 })) as unknown as typeof fetch }); + await expect(runJob(CONFIG.jobs[0], CONFIG, deps)).resolves.toBeUndefined(); + expect(deps.logs[0][0]).toContain("HTTP 500"); + }); + + it("swallows fetch rejection (a transient failure must not kill the interval)", async () => { + const deps = fakeDeps({ fetch: vi.fn(async () => { throw new Error("econnrefused"); }) as unknown as typeof fetch }); + await expect(runJob(CONFIG.jobs[0], CONFIG, deps)).resolves.toBeUndefined(); + expect(deps.logs[0][0]).toContain("failed"); + }); +}); + +describe("startScheduler", () => { + it("no-ops when disabled", () => { + const deps = fakeDeps(); + expect(startScheduler({ ...CONFIG, enabled: false }, deps)).toEqual([]); + expect(deps.setTimeout).not.toHaveBeenCalled(); + }); + + it("no-ops (and warns) when enabled but token is missing", () => { + const deps = fakeDeps(); + expect(startScheduler({ ...CONFIG, token: "" }, deps)).toEqual([]); + expect(deps.setTimeout).not.toHaveBeenCalled(); + expect(deps.logs.some(([m]) => m.includes("DISPATCH_AGENT_TOKEN is unset"))).toBe(true); + }); + + it("schedules each job after the startup delay, then on an interval", () => { + const deps = fakeDeps(); + startScheduler(CONFIG, deps); + expect(deps.setTimeout).toHaveBeenCalledTimes(1); + expect(deps.setTimeout).toHaveBeenCalledWith(expect.any(Function), 5000); + // fire the startup timer -> it runs once and arms the interval + const startupCb = (deps.setTimeout as ReturnType).mock.calls[0][0] as () => void; + startupCb(); + expect(deps.fetch).toHaveBeenCalledTimes(1); + expect(deps.setInterval).toHaveBeenCalledWith(expect.any(Function), 900000); + }); +}); diff --git a/src/lib/scheduler.ts b/src/lib/scheduler.ts new file mode 100644 index 00000000..bc6d8b05 --- /dev/null +++ b/src/lib/scheduler.ts @@ -0,0 +1,125 @@ +/** + * In-app periodic scheduler. + * + * Runs periodic work (issue sync today; groomer/pr-followup/prune to follow) + * from inside the Next.js server process instead of external Kubernetes + * cronjobs, wired from `src/instrumentation.ts` `register()`. + * + * Jobs fire as **loopback HTTP POSTs to the app's own endpoints**, not direct + * function calls. This is deliberate: + * - It routes through the real Next.js router (the route handler's module + * graph), avoiding the Turbopack standalone chunk-graph isolation that the + * instrumentation.ts / lane-config.ts comments describe — a scheduler in + * the instrumentation chunk that imported job functions directly could run + * against a different module instance (stale lane config, etc.). + * - It reuses each endpoint's existing auth + locking for free (e.g. the sync + * endpoint's DB lock makes concurrent fires collapse to one via 409). + * + * Opt-in via DISPATCH_SCHEDULER_ENABLED so dev/CI don't spin timers, and so it + * can be confined to a single replica until per-job locks exist for the others. + */ + +export interface ScheduledJob { + name: string; + /** App-relative path, e.g. "/api/sync/scheduled". */ + path: string; + /** JSON body POSTed to the endpoint. */ + body: unknown; + intervalMs: number; +} + +export interface SchedulerConfig { + enabled: boolean; + /** Loopback base, e.g. "http://127.0.0.1:3000". */ + baseUrl: string; + /** Bearer token for the endpoints (DISPATCH_AGENT_TOKEN). */ + token: string; + /** Delay before the first fire, so the HTTP server is listening. */ + startupDelayMs: number; + jobs: ScheduledJob[]; +} + +export interface SchedulerDeps { + fetch: typeof fetch; + setInterval: (fn: () => void, ms: number) => unknown; + setTimeout: (fn: () => void, ms: number) => unknown; + log: (message: string, error?: unknown) => void; +} + +const DEFAULT_SYNC_INTERVAL_MS = 15 * 60 * 1000; // 15m +const DEFAULT_STARTUP_DELAY_MS = 5 * 1000; + +function intFromEnv(raw: string | undefined, fallback: number): number { + const n = Number(raw); + return Number.isFinite(n) && n > 0 ? n : fallback; +} + +/** + * Build the scheduler config from the environment. Only the sync job is wired + * today; groomer/pr-followup/prune-closed are added in follow-up issues (they + * need their own concurrency guards first). + */ +export function schedulerConfigFromEnv(env: Record): SchedulerConfig { + const port = env.PORT && env.PORT.trim() !== "" ? env.PORT.trim() : "3000"; + return { + enabled: env.DISPATCH_SCHEDULER_ENABLED === "true", + baseUrl: `http://127.0.0.1:${port}`, + token: env.DISPATCH_AGENT_TOKEN ?? "", + startupDelayMs: intFromEnv(env.DISPATCH_SCHEDULER_STARTUP_DELAY_MS, DEFAULT_STARTUP_DELAY_MS), + jobs: [ + { + name: "sync", + path: "/api/sync/scheduled", + body: { issues: true }, + intervalMs: intFromEnv(env.DISPATCH_SYNC_INTERVAL_MS, DEFAULT_SYNC_INTERVAL_MS), + }, + ], + }; +} + +/** Fire one job. Never throws — a transient failure must not kill the interval. */ +export async function runJob(job: ScheduledJob, config: SchedulerConfig, deps: SchedulerDeps): Promise { + try { + const res = await deps.fetch(`${config.baseUrl}${job.path}`, { + method: "POST", + headers: { + "Content-Type": "application/json", + Authorization: `Bearer ${config.token}`, + }, + body: JSON.stringify(job.body), + }); + // 409 = another run holds the lock (expected under concurrency); not an error. + if (!res.ok && res.status !== 409) { + deps.log(`job "${job.name}" -> HTTP ${res.status}`); + } + } catch (error) { + deps.log(`job "${job.name}" failed`, error); + } +} + +/** + * Start the scheduler. No-op (returns []) when disabled or when no token is + * configured. Each job runs once after startupDelayMs, then every intervalMs. + * Returns the interval handles (for teardown in tests). + */ +export function startScheduler(config: SchedulerConfig, deps: SchedulerDeps): unknown[] { + if (!config.enabled) { + deps.log("disabled (set DISPATCH_SCHEDULER_ENABLED=true to enable)"); + return []; + } + if (!config.token) { + deps.log("enabled but DISPATCH_AGENT_TOKEN is unset; not scheduling any jobs"); + return []; + } + + const handles: unknown[] = []; + for (const job of config.jobs) { + deps.log(`scheduling "${job.name}" every ${job.intervalMs}ms -> ${job.path}`); + deps.setTimeout(() => { + void runJob(job, config, deps); + const handle = deps.setInterval(() => void runJob(job, config, deps), job.intervalMs); + handles.push(handle); + }, config.startupDelayMs); + } + return handles; +}