From c75679706df1a0e0eb5c625bdb899da382f39d6d Mon Sep 17 00:00:00 2001 From: woywro Date: Tue, 24 Mar 2026 11:35:09 +0100 Subject: [PATCH] Revert "revert: undo all AIW-3 changes (poll workflow migration)" This reverts commit 2a8672147bcae4a0d35628fc4efab62976a6c2a5. --- .env.example | 4 +- .github/workflows/post-deploy.yml | 18 +++ docs/SPEC.md | 37 +++--- env.ts | 8 +- nitro.config.ts | 1 + src/routes/cron/poll.get.ts | 132 --------------------- src/routes/poll/start.get.test.ts | 139 ++++++++++++++++++++++ src/routes/poll/start.get.ts | 61 ++++++++++ src/workflows/poll.ts | 189 ++++++++++++++++++++++++++++++ vercel.json | 9 +- 10 files changed, 437 insertions(+), 161 deletions(-) create mode 100644 .github/workflows/post-deploy.yml delete mode 100644 src/routes/cron/poll.get.ts create mode 100644 src/routes/poll/start.get.test.ts create mode 100644 src/routes/poll/start.get.ts create mode 100644 src/workflows/poll.ts diff --git a/.env.example b/.env.example index 8ea1f6d..30223c5 100644 --- a/.env.example +++ b/.env.example @@ -38,8 +38,8 @@ POLL_INTERVAL_MS=300000 # VERCEL_TEAM_ID= # VERCEL_PROJECT_ID= -# Cron auth -CRON_SECRET= +# Deploy hook auth +DEPLOY_HOOK_SECRET= # Workflow (local dev only) WORKFLOW_POSTGRES_URL=postgresql://localhost:5432/ai_workflow diff --git a/.github/workflows/post-deploy.yml b/.github/workflows/post-deploy.yml new file mode 100644 index 0000000..5245e80 --- /dev/null +++ b/.github/workflows/post-deploy.yml @@ -0,0 +1,18 @@ +name: Start poll workflow after deploy + +on: + deployment_status: + +jobs: + start-poll: + if: github.event.deployment_status.state == 'success' + runs-on: ubuntu-latest + steps: + - name: Trigger poll workflow + run: | + URL="${{ github.event.deployment_status.target_url }}/poll/start" + echo "Calling: $URL" + curl -sf -X GET \ + -H "Authorization: Bearer ${{ secrets.DEPLOY_HOOK_SECRET }}" \ + -H "x-vercel-protection-bypass: ${{ secrets.VERCEL_AUTOMATION_BYPASS }}" \ + "$URL" diff --git a/docs/SPEC.md b/docs/SPEC.md index 4d53d75..0f5d2f7 100644 --- a/docs/SPEC.md +++ b/docs/SPEC.md @@ -69,10 +69,13 @@ Important boundary: ### 3.1 Main Components -1. **Poller** (Vercel Cron) - - Runs on a configurable interval (`POLL_INTERVAL_MS`, default 5 min). +1. **Poller** (Vercel Workflow with sleep) + - Runs as a long-lived Vercel Workflow that sleeps between poll cycles (`POLL_INTERVAL_MS`, default 15s). - Queries the issue tracker for tickets in the AI column. - For each discovered ticket, starts a Vercel Workflow run if one is not already active. + - Started via `GET /poll/start` route which cancels any existing poll run and starts a fresh one. + A GitHub Action triggers this route on every successful deployment — no Vercel Cron needed. + Protected by `DEPLOY_HOOK_SECRET` bearer token. 2. **Issue Tracker Adapter** - Reads ticket data (description, acceptance criteria, comments, labels). @@ -202,7 +205,7 @@ All runtime config lives in environment variables, validated at startup. Key config groups: - **Sandbox:** concurrency limit (`MAX_CONCURRENT_AGENTS`), job timeout (`JOB_TIMEOUT_MS`). -- **Polling:** interval (`POLL_INTERVAL_MS`). +- **Polling:** interval between cycles (`POLL_INTERVAL_MS`, default 15s). - **Issue Tracker:** adapter kind (`ISSUE_TRACKER_KIND`), project key (`JIRA_PROJECT_KEY`), credentials. - **Messaging:** Chat SDK credentials (`CHAT_SDK_API_KEY`), channel config. @@ -255,8 +258,8 @@ transition. ### 8.1 Polling -The poller (Vercel Cron) runs every `POLL_INTERVAL_MS` and queries the issue tracker for tickets -in the AI column. For each discovered ticket: +The poller runs as a long-lived Vercel Workflow that sleeps `POLL_INTERVAL_MS` (default 15s) +between cycles and queries the issue tracker for tickets in the AI column. For each discovered ticket: 1. Check if a Vercel Workflow run is already active for this ticket — if so, skip. 2. Determine run type based on ticket state: @@ -616,17 +619,21 @@ Notifications are best-effort — never block the workflow. ### 16.1 Poller ``` -on_poll(): - tickets = issueTrackerAdapter.searchTickets("column = AI") +poll_workflow(): + while true: + ticketKeys = issueTrackerAdapter.searchTickets("column = AI") - for ticket in tickets: - if hasActiveWorkflowRun(ticket.id): continue - if atConcurrencyLimit(): break + for key in ticketKeys: + if hasActiveWorkflowRun(key): continue + if atConcurrencyLimit(): break - workflow.start("ticket_workflow", { - ticketId: ticket.id, - identifier: ticket.identifier - }) + workflow.start("ticket_workflow", { + ticketId: key, + identifier: key + }) + + reconcileRegistry(ticketKeys) + sleep(POLL_INTERVAL_MS) ``` ### 16.2 Ticket Workflow (Vercel Workflow) @@ -765,7 +772,7 @@ run_fixing_feedback(ticketId, existingPR): ### 18.1 Required for MVP -- [ ] Poller — Vercel Cron that queries issue tracker and dispatches workflow runs. +- [ ] Poller — GitHub Action deploy hook that starts the poll workflow on each deployment. - [ ] Issue Tracker adapter (Jira first). - [ ] VCS adapter (GitHub). - [ ] Messaging adapter (Chat SDK — chat-sdk.dev — for Slack/Teams). diff --git a/env.ts b/env.ts index 41ed804..3958c3d 100644 --- a/env.ts +++ b/env.ts @@ -36,16 +36,16 @@ export const env = createEnv({ JOB_TIMEOUT_MS: z.coerce.number().int().positive().default(1_800_000), // Polling - POLL_INTERVAL_MS: z.coerce.number().int().positive().default(300_000), + POLL_INTERVAL_MS: z.coerce.number().int().positive().default(15_000), + + // Deploy hook auth + DEPLOY_HOOK_SECRET: z.string().min(1).optional(), // Vercel (optional — auto via OIDC on Vercel) VERCEL_TOKEN: z.string().min(1).optional(), VERCEL_TEAM_ID: z.string().min(1).optional(), VERCEL_PROJECT_ID: z.string().min(1).optional(), - // Cron - CRON_SECRET: z.string().min(1).optional(), - // Redis (run registry) AI_WORKFLOW_KV_REST_API_URL: z.string().url(), AI_WORKFLOW_KV_REST_API_TOKEN: z.string().min(1), diff --git a/nitro.config.ts b/nitro.config.ts index 7dd4624..96eaeb5 100644 --- a/nitro.config.ts +++ b/nitro.config.ts @@ -5,4 +5,5 @@ export default defineNitroConfig({ modules: ["workflow/nitro"], compatibilityDate: "2025-01-01", srcDir: "src", + ignore: ["**/*.test.ts"], }); diff --git a/src/routes/cron/poll.get.ts b/src/routes/cron/poll.get.ts deleted file mode 100644 index 443d04f..0000000 --- a/src/routes/cron/poll.get.ts +++ /dev/null @@ -1,132 +0,0 @@ -import { defineEventHandler, getHeader, createError } from "h3"; -import { start, getRun } from "workflow/api"; -import { env } from "../../../env.js"; -import { createAdapters } from "../../lib/adapters.js"; -import { implementationWorkflow } from "../../workflows/implementation.js"; -import { reviewFixWorkflow } from "../../workflows/review-fix.js"; -import { logger } from "../../lib/logger.js"; - -async function getActiveSandboxCount(): Promise { - try { - const { Sandbox } = await import("@vercel/sandbox"); - const { json } = await Sandbox.list({ limit: 100 }); - return json.sandboxes.filter((s: any) => s.status === "running").length; - } catch { - return 0; - } -} - -export default defineEventHandler(async (event) => { - // Verify Vercel Cron auth - if (env.CRON_SECRET) { - const auth = getHeader(event, "authorization"); - if (auth !== `Bearer ${env.CRON_SECRET}`) { - throw createError({ statusCode: 401, statusMessage: "Unauthorized" }); - } - } - - const { issueTracker, vcs, runRegistry } = createAdapters(); - - // Search for tickets in AI column - const jql = `project = ${env.JIRA_PROJECT_KEY} AND status = "${env.COLUMN_AI}"`; - const ticketKeys = await issueTracker.searchTickets(jql); - - logger.info({ ticketCount: ticketKeys.length }, "poll_discovered_tickets"); - - // Concurrency control (spec Section 8.2) - const activeSandboxes = await getActiveSandboxCount(); - const availableSlots = Math.max(0, env.MAX_CONCURRENT_AGENTS - activeSandboxes); - if (availableSlots === 0) { - logger.info({ active: activeSandboxes, max: env.MAX_CONCURRENT_AGENTS }, "poll_at_capacity"); - return { status: "ok", discovered: ticketKeys.length, started: 0, reason: "at_capacity" }; - } - - const started: string[] = []; - - for (const key of ticketKeys) { - if (started.length >= availableSlots) break; - - // Atomically claim the ticket to prevent duplicate dispatches - const claimed = await runRegistry.claim(key, "claiming"); - if (!claimed) { - logger.info({ ticketKey: key }, "poll_ticket_already_claimed"); - continue; - } - - try { - const ticket = await issueTracker.fetchTicket(key); - const branchName = `blazebot/${ticket.identifier.toLowerCase()}`; - const existingPR = await vcs.findPR(branchName); - - if (existingPR) { - const handle = await start(reviewFixWorkflow, [ticket.id, branchName]); - await runRegistry.register(ticket.identifier, handle.runId); - logger.info( - { ticketId: ticket.id, identifier: ticket.identifier, runId: handle.runId }, - "workflow_started_review_fix", - ); - } else { - const handle = await start(implementationWorkflow, [ticket.id]); - await runRegistry.register(ticket.identifier, handle.runId); - logger.info( - { ticketId: ticket.id, identifier: ticket.identifier, runId: handle.runId }, - "workflow_started_implementation", - ); - } - - started.push(ticket.identifier); - } catch (err) { - // Release the claim if dispatch failed so the ticket can be retried - await runRegistry.unregister(key).catch(() => {}); - logger.warn( - { ticketKey: key, error: (err as Error).message }, - "poll_ticket_dispatch_error", - ); - } - } - - // Reconcile registry: cancel stale runs and clean up dead entries - const aiColumnSet = new Set(ticketKeys); - const activeRuns = await runRegistry.listAll(); - let cancelled = 0; - let cleaned = 0; - - for (const { ticketKey, runId } of activeRuns) { - if (aiColumnSet.has(ticketKey)) { - // Ticket is still in AI column — verify the run is actually alive - try { - const run = getRun(runId); - const status = await run.status; - if (status === "completed" || status === "failed" || status === "cancelled") { - await runRegistry.unregister(ticketKey); - logger.info({ ticketKey, runId, status }, "poll_cleaned_dead_run"); - cleaned++; - } - } catch { - // Run not found or status check failed — clean up so ticket can be retried - await runRegistry.unregister(ticketKey).catch(() => {}); - logger.warn({ ticketKey, runId }, "poll_cleaned_unreachable_run"); - cleaned++; - } - continue; - } - - // Ticket left the AI column — cancel and unregister - try { - const run = getRun(runId); - await run.cancel(); - await runRegistry.unregister(ticketKey); - logger.info({ ticketKey, runId }, "poll_cancelled_stale_run"); - cancelled++; - } catch (err) { - // Run may already be finished — unregister to clean up - await runRegistry.unregister(ticketKey).catch(() => {}); - logger.warn( - { ticketKey, runId, error: (err as Error).message }, - "poll_stale_run_cleanup_error", - ); - } - } - - return { status: "ok", discovered: ticketKeys.length, started: started.length, cancelled, cleaned }; -}); diff --git a/src/routes/poll/start.get.test.ts b/src/routes/poll/start.get.test.ts new file mode 100644 index 0000000..bfccfc7 --- /dev/null +++ b/src/routes/poll/start.get.test.ts @@ -0,0 +1,139 @@ +import { describe, it, expect, vi, beforeEach } from "vitest"; + +const { mockRedis, mockStart, mockGetRun } = vi.hoisted(() => ({ + mockRedis: { + get: vi.fn(), + set: vi.fn(), + del: vi.fn(), + }, + mockStart: vi.fn(), + mockGetRun: vi.fn(), +})); + +vi.mock("@upstash/redis", () => ({ + Redis: vi.fn(() => mockRedis), +})); + +vi.mock("workflow/api", () => ({ + start: (...args: any[]) => mockStart(...args), + getRun: (...args: any[]) => mockGetRun(...args), +})); + +vi.mock("../../workflows/poll.js", () => ({ + pollWorkflow: vi.fn(), +})); + +vi.mock("../../../env.js", () => ({ + env: { + AI_WORKFLOW_KV_REST_API_URL: "https://fake.upstash.io", + AI_WORKFLOW_KV_REST_API_TOKEN: "fake-token", + }, +})); + +vi.mock("../../lib/logger.js", () => ({ + logger: { info: vi.fn(), warn: vi.fn() }, +})); + +import handler from "./start.get.js"; +import { env } from "../../../env.js"; + +const handle = + typeof handler === "function" ? handler : (handler as any).handler; + +describe("GET /poll/start", () => { + beforeEach(() => { + vi.clearAllMocks(); + }); + + it("starts a new workflow when none exists", async () => { + mockRedis.set.mockResolvedValueOnce("OK"); + mockRedis.get.mockResolvedValueOnce(null); + mockStart.mockResolvedValueOnce({ runId: "run_new" }); + mockRedis.set.mockResolvedValueOnce("OK"); + + const result = await handle({} as any); + + expect(result).toEqual({ + status: "restarted", + runId: "run_new", + cancelledRunId: null, + }); + expect(mockRedis.set).toHaveBeenCalledWith( + "blazebot:poll-workflow", + "run_new", + ); + expect(mockRedis.del).toHaveBeenCalledWith("blazebot:poll-workflow:lock"); + }); + + it("cancels existing workflow and starts a new one", async () => { + mockRedis.set.mockResolvedValueOnce("OK"); + mockRedis.get.mockResolvedValueOnce("run_old"); + const mockCancel = vi.fn().mockResolvedValueOnce(undefined); + mockGetRun.mockReturnValueOnce({ cancel: mockCancel }); + mockRedis.del.mockResolvedValueOnce(1); + mockStart.mockResolvedValueOnce({ runId: "run_new" }); + mockRedis.set.mockResolvedValueOnce("OK"); + + const result = await handle({} as any); + + expect(result).toEqual({ + status: "restarted", + runId: "run_new", + cancelledRunId: "run_old", + }); + expect(mockCancel).toHaveBeenCalled(); + }); + + it("starts new workflow even when cancel of existing throws", async () => { + mockRedis.set.mockResolvedValueOnce("OK"); + mockRedis.get.mockResolvedValueOnce("run_gone"); + mockGetRun.mockImplementationOnce(() => { + throw new Error("not found"); + }); + mockRedis.del.mockResolvedValueOnce(1); + mockStart.mockResolvedValueOnce({ runId: "run_fresh" }); + mockRedis.set.mockResolvedValueOnce("OK"); + + const result = await handle({} as any); + + expect(result).toEqual({ + status: "restarted", + runId: "run_fresh", + cancelledRunId: "run_gone", + }); + }); + + it("returns lock_contention when another start is in progress", async () => { + mockRedis.set.mockResolvedValueOnce(null); + + const result = await handle({} as any); + + expect(result).toEqual({ + status: "lock_contention", + message: "Another start request is in progress", + }); + expect(mockStart).not.toHaveBeenCalled(); + }); + + it("rejects requests with wrong bearer token when DEPLOY_HOOK_SECRET is set", async () => { + (env as any).DEPLOY_HOOK_SECRET = "real-secret"; + + const mockEvent = { + node: { + req: { + headers: { + authorization: "Bearer wrong-secret", + }, + }, + }, + }; + + try { + await expect(handle(mockEvent as any)).rejects.toMatchObject({ + statusCode: 401, + }); + } finally { + delete (env as any).DEPLOY_HOOK_SECRET; + } + }); +}); diff --git a/src/routes/poll/start.get.ts b/src/routes/poll/start.get.ts new file mode 100644 index 0000000..5fa67be --- /dev/null +++ b/src/routes/poll/start.get.ts @@ -0,0 +1,61 @@ +import { defineEventHandler, getHeader, createError } from "h3"; +import { start, getRun } from "workflow/api"; +import { Redis } from "@upstash/redis"; +import { env } from "../../../env.js"; +import { pollWorkflow } from "../../workflows/poll.js"; +import { logger } from "../../lib/logger.js"; + +const POLL_WORKFLOW_KEY = "blazebot:poll-workflow"; +const LOCK_KEY = "blazebot:poll-workflow:lock"; +const LOCK_TTL_S = 30; + +const redis = new Redis({ + url: env.AI_WORKFLOW_KV_REST_API_URL, + token: env.AI_WORKFLOW_KV_REST_API_TOKEN, +}); + +async function cancelExisting(): Promise { + const runId = await redis.get(POLL_WORKFLOW_KEY); + if (!runId) return null; + + try { + const run = getRun(runId); + await run.cancel(); + logger.info({ runId }, "poll_workflow_cancelled"); + } catch { + // already dead or not found + } + + await redis.del(POLL_WORKFLOW_KEY); + return runId; +} + +export default defineEventHandler(async (event) => { + if (env.DEPLOY_HOOK_SECRET) { + const auth = getHeader(event, "authorization"); + if (auth !== `Bearer ${env.DEPLOY_HOOK_SECRET}`) { + throw createError({ statusCode: 401, statusMessage: "Unauthorized" }); + } + } + + const acquired = await redis.set(LOCK_KEY, "1", { nx: true, ex: LOCK_TTL_S }); + if (!acquired) { + return { + status: "lock_contention", + message: "Another start request is in progress", + }; + } + + try { + const cancelledRunId = await cancelExisting(); + const handle = await start(pollWorkflow); + await redis.set(POLL_WORKFLOW_KEY, handle.runId); + logger.info({ runId: handle.runId, cancelledRunId }, "poll_workflow_started"); + return { status: "restarted", runId: handle.runId, cancelledRunId }; + } catch (err) { + logger.error({ error: (err as Error).message }, "poll_workflow_start_failed"); + throw createError({ statusCode: 500, statusMessage: (err as Error).message }); + } finally { + await redis.del(LOCK_KEY); + } +}); diff --git a/src/workflows/poll.ts b/src/workflows/poll.ts new file mode 100644 index 0000000..b32948e --- /dev/null +++ b/src/workflows/poll.ts @@ -0,0 +1,189 @@ +import { sleep } from "workflow"; + +async function discoverTickets(): Promise { + "use step"; + const { env } = await import("../../env.js"); + const { createStepAdapters } = await import("../lib/step-adapters.js"); + const { logger } = await import("../lib/logger.js"); + + const { issueTracker } = createStepAdapters(); + + const jql = `project = ${env.JIRA_PROJECT_KEY} AND status = "${env.COLUMN_AI}"`; + const ticketKeys = await issueTracker.searchTickets(jql); + + logger.info({ ticketCount: ticketKeys.length }, "poll_discovered_tickets"); + + return ticketKeys; +} + +async function dispatchTickets(ticketKeys: string[]): Promise { + "use step"; + const { env } = await import("../../env.js"); + const { createStepAdapters } = await import("../lib/step-adapters.js"); + const { logger } = await import("../lib/logger.js"); + const { Sandbox } = await import("@vercel/sandbox"); + const { start } = await import("workflow/api"); + const { implementationWorkflow } = await import("./implementation.js"); + const { reviewFixWorkflow } = await import("./review-fix.js"); + + const { issueTracker, vcs, runRegistry } = createStepAdapters(); + + let activeSandboxes = 0; + try { + let nextCursor: number | null = null; + do { + const { json } = await Sandbox.list({ + limit: 100, + ...(nextCursor != null ? { until: nextCursor } : {}), + }); + activeSandboxes += json.sandboxes.filter( + (s: any) => s.status === "running", + ).length; + nextCursor = json.pagination.next; + } while (nextCursor != null); + } catch { + // If we can't check, assume 0 and let sandbox provisioning fail if truly at capacity + } + + const availableSlots = Math.max( + 0, + env.MAX_CONCURRENT_AGENTS - activeSandboxes, + ); + if (availableSlots === 0) { + logger.info( + { active: activeSandboxes, max: env.MAX_CONCURRENT_AGENTS }, + "poll_at_capacity", + ); + return 0; + } + + const started: string[] = []; + + for (const key of ticketKeys) { + if (started.length >= availableSlots) break; + + const claimed = await runRegistry.claim(key, "claiming"); + if (!claimed) { + logger.info({ ticketKey: key }, "poll_ticket_already_claimed"); + continue; + } + + try { + const ticket = await issueTracker.fetchTicket(key); + const branchName = `blazebot/${ticket.identifier.toLowerCase()}`; + const existingPR = await vcs.findPR(branchName); + + if (existingPR) { + const handle = await start(reviewFixWorkflow, [ticket.id, branchName]); + await runRegistry.register(ticket.identifier, handle.runId); + logger.info( + { + ticketId: ticket.id, + identifier: ticket.identifier, + runId: handle.runId, + }, + "workflow_started_review_fix", + ); + } else { + const handle = await start(implementationWorkflow, [ticket.id]); + await runRegistry.register(ticket.identifier, handle.runId); + logger.info( + { + ticketId: ticket.id, + identifier: ticket.identifier, + runId: handle.runId, + }, + "workflow_started_implementation", + ); + } + + started.push(ticket.identifier); + } catch (err) { + await runRegistry.unregister(key).catch(() => {}); + logger.warn( + { ticketKey: key, error: (err as Error).message }, + "poll_ticket_dispatch_error", + ); + } + } + + return started.length; +} + +async function reconcileRegistry( + ticketKeys: string[], +): Promise<{ cancelled: number; cleaned: number }> { + "use step"; + const { createStepAdapters } = await import("../lib/step-adapters.js"); + const { logger } = await import("../lib/logger.js"); + const { getRun } = await import("workflow/api"); + + const { runRegistry } = createStepAdapters(); + + const aiColumnSet = new Set(ticketKeys); + const activeRuns = await runRegistry.listAll(); + let cancelled = 0; + let cleaned = 0; + + for (const { ticketKey, runId } of activeRuns) { + if (aiColumnSet.has(ticketKey)) { + try { + const run = getRun(runId); + const status = await run.status; + if ( + status === "completed" || + status === "failed" || + status === "cancelled" + ) { + await runRegistry.unregister(ticketKey); + logger.info({ ticketKey, runId, status }, "poll_cleaned_dead_run"); + cleaned++; + } + } catch { + await runRegistry.unregister(ticketKey).catch(() => {}); + logger.warn({ ticketKey, runId }, "poll_cleaned_unreachable_run"); + cleaned++; + } + continue; + } + + try { + const run = getRun(runId); + await run.cancel(); + await runRegistry.unregister(ticketKey); + logger.info({ ticketKey, runId }, "poll_cancelled_stale_run"); + cancelled++; + } catch (err) { + await runRegistry.unregister(ticketKey).catch(() => {}); + logger.warn( + { ticketKey, runId, error: (err as Error).message }, + "poll_stale_run_cleanup_error", + ); + } + } + + return { cancelled, cleaned }; +} + +async function logCycleError(message: string): Promise { + "use step"; + const { logger } = await import("../lib/logger.js"); + logger.warn({ error: message }, "poll_cycle_failed"); +} + +export async function pollWorkflow() { + "use workflow"; + + const { env } = await import("../../env.js"); + + while (true) { + try { + const ticketKeys = await discoverTickets(); + await dispatchTickets(ticketKeys); + await reconcileRegistry(ticketKeys); + } catch (err) { + await logCycleError((err as Error).message); + } + await sleep(env.POLL_INTERVAL_MS); + } +} diff --git a/vercel.json b/vercel.json index e1fe15d..0967ef4 100644 --- a/vercel.json +++ b/vercel.json @@ -1,8 +1 @@ -{ - "crons": [ - { - "path": "/cron/poll", - "schedule": "* * * * *" - } - ] -} +{}