
feat: add Vercel Workflow (WDK) for durable agent execution#268

Open
rossmanko wants to merge 57 commits into main from
feat/vercel-workflow-agent

Conversation

@rossmanko
Contributor

@rossmanko rossmanko commented Mar 4, 2026

Migrate agent mode to Vercel Workflow to support execution up to 1 hour, removing the 800s serverless function timeout limitation. Feature-flagged via NEXT_PUBLIC_USE_WORKFLOW_AGENT.

  • Add workflow orchestrator and step with full agent logic
  • Add /api/agent-workflow route with pre-processing via prepareAgentPayload
  • Wire abort/cancel logic using existing Redis pub/sub infrastructure
  • Extract rate-limit serialization to shared module
  • Update next.config.ts with withWorkflow, middleware matcher exclusion
  • Feature-flag client routing in chat.tsx
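The client-side routing in the last bullet can be sketched roughly as follows; this is a minimal illustration, and `selectEndpoint` plus the mode names are hypothetical, not the actual chat.tsx code:

```typescript
// Hypothetical sketch of the feature-flagged routing: when the flag is on and
// the user is in agent mode, requests go to the workflow route instead of /api/chat.
type ChatMode = "agent" | "ask";

function selectEndpoint(mode: ChatMode, useWorkflowAgent: boolean): string {
  return useWorkflowAgent && mode === "agent" ? "/api/agent-workflow" : "/api/chat";
}

// In the app, the flag would come from the environment:
// const useWorkflowAgent = process.env.NEXT_PUBLIC_USE_WORKFLOW_AGENT === "true";
```

Reading the flag once and branching per mode keeps ask mode on the existing serverless path while agent mode opts into the durable workflow.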

Summary by CodeRabbit

  • New Features

    • Agent workflow support: long-running, resumable agent runs with streaming UI messages, reconnection, and user-triggered cancellation; optional workflow-backed chat transport (feature-flagged).
    • Rate-limit serialization for safe cross-boundary payloads.
  • Infrastructure

    • Workflow runtime integrated and workflow-aware logging added; app config wrapped to enable workflows.
  • Tests

    • Added workflow transport mocks and test mapping.
  • Bug Fixes

    • Improved stream reconnect and API error handling for more robust client behavior.

@vercel

vercel bot commented Mar 4, 2026

The latest updates on your projects.

Project: hackerai | Deployment: Ready | Actions: Preview, Comment | Updated (UTC): Mar 20, 2026 3:55pm


@coderabbitai

coderabbitai bot commented Mar 4, 2026

Note

Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the reviews.auto_review.auto_pause_after_reviewed_commits setting.

Use the following commands to manage reviews:

  • @coderabbitai resume to resume automatic reviews.
  • @coderabbitai review to trigger a single review.

📝 Walkthrough

Adds durable workflow-based agent support: new workflow and step, start/cancel/stream server routes, client hybrid transport to run/reconnect workflows, rate-limit serialization, Axiom workflow logger, middleware/config updates, mocks, and supporting utilities.

Changes

Cohort / File(s) — Summary

- Workflow API & Start (app/api/agent-workflow/route.ts, app/api/agent-workflow/cancel/route.ts, app/api/agent-workflow/[id]/stream/route.ts): New endpoints that start workflows, cancel runs, and stream workflow output; they export maxDuration constants and handle auth, run lookup, streaming/reconnect semantics, error scoping to ChatSDKError, and graceful finish streams.
- Workflow Implementation (workflows/agent-workflow.ts, workflows/agent-step.ts): New durable workflow and primary step runAgentStep implementing the agent loop: streaming UI messages into the workflow, cancellation (Redis), sandbox/file handling, usage accounting, provider fallback/retry, persistence, and cleanup.
- Client Chat Integration (app/components/chat.tsx): Introduces a hybrid transport (feature-flagged by NEXT_PUBLIC_USE_WORKFLOW_AGENT) choosing between the default and workflow transports; centralizes message preparation; captures workflow run IDs for reconnect/cancel; routes to /api/agent-workflow and the reconnect stream endpoint when active.
- Rate Limit Serialization & Payloads (lib/api/rate-limit-serialization.ts, lib/api/prepare-agent-payload.ts): Adds SerializableRateLimitInfo with serialize/deserialize helpers; prepareAgentPayload gains allowedMode (`"agent"
- Logging & Types (lib/logger.ts, lib/api/chat-logger.ts, lib/axiom/workflow.ts): Added ChatApiEndpoint type and updated event/builder/config types. Added workflowAxiomLogger configured with an Axiom transport for workflow logs.
- Utilities & Error Handling (lib/utils.ts, lib/api/chat-logger.ts): fetchWithErrorHandlers now falls back to a ChatSDKError with the HTTP status when response JSON parsing fails; ChatLoggerConfig.endpoint generalized to ChatApiEndpoint.
- Streaming Reconnect & Chat Stream (app/api/chat/[id]/stream/route.ts): Enhanced reconnect-flow instrumentation, restructured resumable stream handling, explicit abort/timeout wiring, and ReadableStream pull/cancel control with added logs.
- Middleware, Next Config & Packages (middleware.ts, next.config.ts, package.json): Excluded .well-known/workflow/ in the middleware matcher; wrapped the Next config with withWorkflow(nextConfig); added runtime deps workflow and @workflow/ai.
- Mocks & Tests (__mocks__/workflow-ai.ts, jest.config.js): Added a WorkflowChatTransport mock returning a ReadableStream; mapped @workflow/ai to the mock in the Jest config.
- Other minor changes (lib/axiom/workflow.ts, lib/utils.ts, lib/api/chat-logger.ts): New Axiom logger for workflows; minor type/signature updates and safer error parsing in fetch helpers.
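The rate-limit serialization cohort above exists because Date values do not survive JSON/workflow boundaries. A minimal sketch of the idea, with field names beyond resetTime being illustrative rather than the repository's actual types:

```typescript
// Dates are converted to epoch milliseconds before crossing the boundary
// and rebuilt on the other side; everything else is already JSON-safe.
interface RateLimitInfo {
  remaining: number;
  limit: number;
  resetTime: Date;
}

interface SerializableRateLimitInfo {
  remaining: number;
  limit: number;
  resetTime: number; // epoch milliseconds
}

function serializeRateLimitInfo(info: RateLimitInfo): SerializableRateLimitInfo {
  return { ...info, resetTime: info.resetTime.getTime() };
}

function deserializeRateLimitInfo(info: SerializableRateLimitInfo): RateLimitInfo {
  return { ...info, resetTime: new Date(info.resetTime) };
}
```

Round-tripping through serialize/deserialize yields an object equal to the original, which is what lets the payload pass safely through the workflow runtime.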

Sequence Diagram

sequenceDiagram
    participant Client
    participant ChatUI as Chat Component
    participant StartAPI as /api/agent-workflow
    participant WorkflowEngine as Workflow Runtime
    participant AgentStep as runAgentStep
    participant LLM as LLM Provider
    participant DB as Database

    Client->>ChatUI: send messages (select agent mode)
    ChatUI->>StartAPI: POST prepareAgentPayload -> start workflow
    StartAPI->>WorkflowEngine: start(agentWorkflow, payload)
    WorkflowEngine->>AgentStep: invoke runAgentStep(payload)
    AgentStep->>LLM: stream completions / tool calls
    LLM-->>AgentStep: stream tokens / tool results
    AgentStep->>DB: persist messages, files, usage
    AgentStep-->>WorkflowEngine: write UI stream events
    Client->>StartAPI: GET /api/agent-workflow/{id}/stream (reconnect)
    StartAPI->>WorkflowEngine: proxy stream data to client

Estimated Code Review Effort

🎯 4 (Complex) | ⏱️ ~45 minutes


Poem

🐇 I hopped into workflows, neat and spry,
Streams and runs beneath the sky,
Messages zipped, reconnect in sight,
Cancels and logs all tidy and bright,
A tiny rabbit cheers: "Agent, fly!"

🚥 Pre-merge checks | ✅ 2 | ❌ 1

❌ Failed checks (1 warning)

- Docstring Coverage — ⚠️ Warning: Docstring coverage is 41.67%, below the required threshold of 80.00%. Resolution: write docstrings for the functions missing them to satisfy the coverage threshold.

✅ Passed checks (2 passed)

- Description Check — ✅ Passed: Check skipped; CodeRabbit's high-level summary is enabled.
- Title Check — ✅ Passed: The title clearly summarizes the main change: adding Vercel Workflow (WDK) integration for durable agent execution with extended timeout support.



@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 2

🧹 Nitpick comments (1)
lib/api/rate-limit-serialization.ts (1)

51-58: Prefer returning RateLimitInfo from deserializeRateLimitInfo.

The inline return type can drift from the canonical RateLimitInfo contract over time. Using RateLimitInfo directly keeps this utility aligned with the source type.

♻️ Suggested change
-export function deserializeRateLimitInfo(info: SerializableRateLimitInfo): {
-  remaining: number;
-  resetTime: Date;
-  limit: number;
-  session?: { remaining: number; limit: number; resetTime: Date };
-  weekly?: { remaining: number; limit: number; resetTime: Date };
-  extraUsagePointsDeducted?: number;
-} {
+export function deserializeRateLimitInfo(
+  info: SerializableRateLimitInfo,
+): RateLimitInfo {
   return {
     ...info,
     resetTime: new Date(info.resetTime),
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@lib/api/rate-limit-serialization.ts` around lines 51 - 58, Change the return
type of deserializeRateLimitInfo from the inline object signature to the
canonical RateLimitInfo type to keep the utility aligned with the source
contract; update the function signature for deserializeRateLimitInfo to return
RateLimitInfo and ensure RateLimitInfo (and SerializableRateLimitInfo if not
already) is imported from the module that defines the canonical types, then run
type checks to fix any mismatches between the current implementation and the
RateLimitInfo shape.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@app/api/agent-workflow/route.ts`:
- Around line 17-30: The analytics call after starting the durable run
(start(agentWorkflow, [payload]) -> run) must not be allowed to throw or block
the HTTP response; wrap PostHogClient usage (PostHogClient(), posthog.capture,
posthog.flush) in a non-failing, non-blocking path: check posthog, then call
capture/flush inside a try/catch and swallow/log errors (or run them as
fire-and-forget so you don't await flush), ensuring any exceptions from posthog
do not propagate to the caller and do not affect the started run.

In `@workflows/agent-step.ts`:
- Around line 668-680: The writer is only having its lock released but never
closed, so after draining uiStream (the loop using reader.read()) ensure you
call wWriter.close() to signal EOF to consumers; specifically, in the finally
block (after the loop completes) call await wWriter.close() (or wWriter.close()
if synchronous) before calling wWriter.releaseLock(), and keep
reader.releaseLock() for the uiStream reader to fully clean up; this change
involves the uiStream.getReader()/reader, writable.getWriter()/wWriter logic
shown in the diff.

---

Nitpick comments:
In `@lib/api/rate-limit-serialization.ts`:
- Around line 51-58: Change the return type of deserializeRateLimitInfo from the
inline object signature to the canonical RateLimitInfo type to keep the utility
aligned with the source contract; update the function signature for
deserializeRateLimitInfo to return RateLimitInfo and ensure RateLimitInfo (and
SerializableRateLimitInfo if not already) is imported from the module that
defines the canonical types, then run type checks to fix any mismatches between
the current implementation and the RateLimitInfo shape.

ℹ️ Review info

Configuration used: defaults

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 7a4de25 and 3939caf.

⛔ Files ignored due to path filters (1)
  • pnpm-lock.yaml is excluded by !**/pnpm-lock.yaml
📒 Files selected for processing (11)
  • app/api/agent-workflow/route.ts
  • app/components/chat.tsx
  • lib/api/chat-logger.ts
  • lib/api/prepare-agent-payload.ts
  • lib/api/rate-limit-serialization.ts
  • lib/logger.ts
  • middleware.ts
  • next.config.ts
  • package.json
  • workflows/agent-step.ts
  • workflows/agent-workflow.ts

…den resumable stream infrastructure

- Install @workflow/ai and use WorkflowChatTransport for agent mode with
  auto-reconnection on Vercel function timeouts (while(!gotFinish) loop)
- Create hybrid proxy transport that delegates to WorkflowChatTransport
  for agent mode and DefaultChatTransport for ask mode
- Add /api/agent-workflow/[id]/stream endpoint returning empty finish
  streams for completed runs (prevents reconnect retry loops)
- Add /api/agent-workflow/cancel endpoint for stop button support
- Remove workflow fallback from /api/chat/[id]/stream (now handled by
  WorkflowChatTransport directly)
- Add prepareForNewStream cleanup in agent-step.ts outer catch to
  prevent stale active_stream_id on workflow crashes
- Add startIndex validation, Axiom logging, UsageRefundTracker,
  incomplete response fallback, and error emission to agent step
- Add route-level error handling with ChatSDKError and chatLogger
- Add @workflow/ai Jest mock for ESM compatibility in tests

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
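The ESM-compatibility mock mentioned in the last bullet can be sketched like this; the mock's surface is an assumption, since only the methods the tests actually touch need to exist:

```typescript
// __mocks__/workflow-ai.ts (sketch): a stand-in for @workflow/ai that returns
// an already-closed ReadableStream so tests never hit the real workflow runtime.
export class WorkflowChatTransport {
  async sendMessages(): Promise<ReadableStream<Uint8Array>> {
    return new ReadableStream<Uint8Array>({
      start(controller) {
        controller.close(); // empty stream: consumers see immediate EOF
      },
    });
  }
}

// jest.config.js would then map the real module to the mock, e.g.:
// moduleNameMapper: { "^@workflow/ai$": "<rootDir>/__mocks__/workflow-ai.ts" }
```

Mapping the package in moduleNameMapper sidesteps Jest's difficulty transforming ESM-only dependencies while keeping test code unchanged.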
@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 7

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
app/components/chat.tsx (1)

865-879: ⚠️ Potential issue | 🟠 Major

Use active_stream_id as a cancellation fallback.

At Line 871, stop only reads workflowRunIdRef.current. After reconnect/page reload, this ref can be empty while chatDataRef.current.active_stream_id is still wrun_*, so the server workflow may continue running after user stop.

Suggested fix
-        const runId = workflowRunIdRef.current;
+        const activeStreamId = chatDataRef.current?.active_stream_id;
+        const runId =
+          workflowRunIdRef.current ??
+          (activeStreamId?.startsWith("wrun_") ? activeStreamId : null);
         if (runId) {
           workflowRunIdRef.current = null;
           fetch("/api/agent-workflow/cancel", {
             method: "POST",
             headers: { "Content-Type": "application/json" },
             body: JSON.stringify({ runId }),
           }).catch(() => {});
         }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@app/components/chat.tsx` around lines 865 - 879, The stop callback currently
cancels agentLong or uses workflowRunIdRef.current to cancel the server
workflow; add a fallback to read chatDataRef.current.active_stream_id when
workflowRunIdRef.current is empty (and ensure it matches the expected "wrun_*"
form), use that value as runId for the POST to /api/agent-workflow/cancel, and
then clear workflowRunIdRef.current and chatDataRef.current.active_stream_id as
appropriate; update the stop function (referencing stop, agentLong,
workflowRunIdRef, chatDataRef, and the fetch("/api/agent-workflow/cancel") call)
so the server workflow is cancelled even after reconnects or reloads.
♻️ Duplicate comments (2)
app/api/agent-workflow/route.ts (1)

23-40: ⚠️ Potential issue | 🔴 Critical

Do not let post-start side-effects fail the request after a durable run is created.

After Line 23 starts the workflow, Line 27 (startStream) and Line 39 (await posthog.flush()) can still fail and return an error, which can trigger client retries and duplicate runs.

🛡️ Suggested hardening
     const run = await start(agentWorkflow, [payload]);

-    await startStream({ chatId: payload.chatId, streamId: run.runId });
+    try {
+      await startStream({ chatId: payload.chatId, streamId: run.runId });
+    } catch (streamPersistError) {
+      console.warn("[agent-workflow] failed to persist active stream id", streamPersistError);
+    }

     const posthog = PostHogClient();
     if (posthog) {
-      posthog.capture({
-        distinctId: payload.userId,
-        event: "hackerai-agent-workflow",
-        properties: {
-          regenerate: payload.regenerate,
-          ...(payload.subscription && { subscription: payload.subscription }),
-        },
-      });
-      await posthog.flush();
+      try {
+        posthog.capture({
+          distinctId: payload.userId,
+          event: "hackerai-agent-workflow",
+          properties: {
+            regenerate: payload.regenerate,
+            ...(payload.subscription && { subscription: payload.subscription }),
+          },
+        });
+        void posthog.flush();
+      } catch (analyticsError) {
+        console.warn("[agent-workflow] posthog capture failed", analyticsError);
+      }
     }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@app/api/agent-workflow/route.ts` around lines 23 - 40, After creating the
durable run via start(agentWorkflow, [payload]) you must not let ancillary
side-effects abort the request; wrap calls to startStream({ chatId:
payload.chatId, streamId: run.runId }) and the PostHog block (PostHogClient(),
posthog.capture(...), await posthog.flush()) in separate try/catch blocks, log
any caught errors (e.g., processLogger.error or console.error) and do not
rethrow them so the route returns success even if these post-start actions fail;
keep the call to start(...) intact and only swallow/log failures from
startStream and posthog.flush/capture.
workflows/agent-step.ts (1)

962-974: ⚠️ Potential issue | 🟠 Major

Close the workflow writer after the read loop completes.

At Line 964, the writer is acquired and only released (Line 973), never closed. Release does not signal stream completion.

Suggested fix
   try {
     while (true) {
       const { done, value } = await reader.read();
       if (done) break;
       await wWriter.write(value);
     }
+    await wWriter.close();
   } finally {
     reader.releaseLock();
     wWriter.releaseLock();
   }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@workflows/agent-step.ts` around lines 962 - 974, The writable stream writer
acquired via writable.getWriter() is never closed, only released, so change the
finally block to close the writer after the read-loop completes: after the loop
finishes (or in the finally), call await wWriter.close() (or wWriter.close() if
synchronous) before wWriter.releaseLock(); ensure you still release
reader.releaseLock() and guard the close call by checking wWriter is
defined/locked to avoid exceptions; reference uiStream.getReader(),
writable.getWriter(), reader.releaseLock(), and wWriter.releaseLock().
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@app/api/agent-workflow/`[id]/stream/route.ts:
- Around line 40-58: Before creating the stream, enforce that the authenticated
user is allowed to access the requested runId: after calling getRun(runId) and
before reading run.status or calling run.getReadable, fetch the request's
authenticated user (e.g., from req or your auth helper) and compare it against
the run's owner or allowedUsers property on the run object; if the user is not
authorized, return a 403/emptyFinishResponse (or appropriate error) instead of
proceeding to createUIMessageStreamResponse; update the logic around getRun,
status check, and run.getReadable to include this authorization gate.
- Around line 30-33: Wrap the call to getUserIDAndPro in a try/catch inside the
route handler and explicitly handle ChatSDKError: if catch(e) and e instanceof
ChatSDKError return new Response("Unauthorized", { status: 401 }); otherwise
rethrow or let the error propagate so non-auth errors produce the appropriate
5xx handling; keep the existing userId null-check (const { userId } = await
getUserIDAndPro(...)) logic but move it into the try block to ensure auth
exceptions are caught and converted to the 401 response.

In `@app/api/agent-workflow/cancel/route.ts`:
- Around line 7-10: The route currently treats all errors from getUserIDAndPro
as internal errors; update the cancel route handler to detect and map
authentication/authorization failures from getUserIDAndPro to proper HTTP
responses (e.g., return 401 or 403) while only converting unexpected exceptions
into 500s. Locate the calls to getUserIDAndPro in this file (both at the top
block and the second occurrence around lines 25-31) and change the catch logic
to inspect the thrown error (use error type, code, or message) to return new
Response("Unauthorized", { status: 401 }) or new Response("Forbidden", { status:
403 }) for auth-related errors, and rethrow or return new Response("Internal
Server Error", { status: 500 }) for other errors.
- Around line 17-23: The cancel endpoint calls getRun(runId) and cancels the run
without verifying ownership (uses run.status and run.cancel), allowing any
authenticated user to cancel others' runs; before calling run.cancel(), fetch
the current user's id from the session/request (the existing auth/session helper
used elsewhere in this route), compare it to the run's owner id (e.g.,
run.ownerId or run.userId on the returned run object), and if they don't match
(unless the caller has an admin role), return a 403 response instead of
proceeding to call run.cancel().

In `@app/api/chat/`[id]/stream/route.ts:
- Around line 119-191: The pull handler currently adds a new abort listener on
every invocation (causing buildup) and several exit paths (done/AbortError/other
errors) call controller.close()/controller.error() without stopping
cancellationSubscriber or cancelling the reader; move the abort listener
registration off the per-chunk pull (attach it once when creating
abortableStream or use a single shared handler bound to abortController.signal)
and ensure every exit/cleanup path (the done branch, AbortError branch, and the
generic catch branch) calls cancellationSubscriber.stop() and reader.cancel()
(and clear preemptiveTimeout) before closing/erroring the controller; also keep
the existing logging/flush behavior for preemptive cases.
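The listener-buildup fix described above amounts to registering the abort handler once, in start(), instead of on every pull(), and funneling every exit path through one cleanup function. A rough sketch under those assumptions; `makeAbortableStream` and the cleanup callback are illustrative, not the route's actual code:

```typescript
// One abort listener for the stream's lifetime, shared cleanup on every exit.
function makeAbortableStream(
  source: ReadableStream<Uint8Array>,
  signal: AbortSignal,
  cleanup: () => void, // e.g. stop the cancellation subscriber, clear timeouts
): ReadableStream<Uint8Array> {
  const reader = source.getReader();
  let finished = false;
  const finish = () => {
    if (finished) return;
    finished = true;
    cleanup();
    reader.cancel().catch(() => {});
  };
  return new ReadableStream<Uint8Array>({
    start(controller) {
      // Registered once here, not inside pull(), so listeners never pile up.
      signal.addEventListener(
        "abort",
        () => {
          finish();
          try { controller.close(); } catch { /* already closed or errored */ }
        },
        { once: true },
      );
    },
    async pull(controller) {
      try {
        const { done, value } = await reader.read();
        if (done) { finish(); controller.close(); return; }
        controller.enqueue(value);
      } catch (err) {
        finish();
        controller.error(err);
      }
    },
    cancel: finish,
  });
}
```

Because finish() is idempotent, the done branch, the error branch, an abort, and a downstream cancel can all hit it without double-running the cleanup.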

In `@lib/axiom/workflow.ts`:
- Line 8: Replace the non-null assertion on process.env.AXIOM_DATASET used in
the transport config (the dataset property in lib/axiom/workflow.ts) with an
explicit guard: read const dataset = process.env.AXIOM_DATASET; if dataset is
falsy, throw a clear Error (e.g. "AXIOM_DATASET environment variable is
required") so the process fails fast; then use that dataset variable in the
transport/config where dataset: process.env.AXIOM_DATASET! currently appears.

In `@lib/utils.ts`:
- Around line 35-44: The JSON-parsing branch currently assumes the parsed object
is a valid SDK error; add a validation step (e.g., an isValidErrorPayload type
guard) that checks parsed.code is a known ErrorCode string and parsed.cause is a
string (or acceptable type) before constructing new ChatSDKError; if validation
fails, fall back to throwing the generic ChatSDKError with the status message.
Update the try/catch in the response handling (where response.json() is called
and ChatSDKError is constructed) to only throw new ChatSDKError(code as
ErrorCode, cause) when the payload passes the validator, otherwise throw the
existing `bad_request:api` ChatSDKError so malformed JSON won’t be
misclassified.
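The validation step suggested for fetchWithErrorHandlers can be sketched as a type guard like the following; the set of error codes here is illustrative, since the real list lives with ChatSDKError:

```typescript
// Only trust a parsed JSON body as an SDK error if it actually matches the shape.
const KNOWN_ERROR_CODES = new Set([
  "bad_request:api", // assumed codes, for illustration only
  "unauthorized:chat",
  "rate_limit:chat",
]);

function isValidErrorPayload(
  parsed: unknown,
): parsed is { code: string; cause: string } {
  if (typeof parsed !== "object" || parsed === null) return false;
  const candidate = parsed as { code?: unknown; cause?: unknown };
  return (
    typeof candidate.code === "string" &&
    KNOWN_ERROR_CODES.has(candidate.code) &&
    typeof candidate.cause === "string"
  );
}
```

Callers would construct the typed ChatSDKError only when the guard passes and fall back to the generic `bad_request:api` error otherwise.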

---

Outside diff comments:
In `@app/components/chat.tsx`:
- Around line 865-879: The stop callback currently cancels agentLong or uses
workflowRunIdRef.current to cancel the server workflow; add a fallback to read
chatDataRef.current.active_stream_id when workflowRunIdRef.current is empty (and
ensure it matches the expected "wrun_*" form), use that value as runId for the
POST to /api/agent-workflow/cancel, and then clear workflowRunIdRef.current and
chatDataRef.current.active_stream_id as appropriate; update the stop function
(referencing stop, agentLong, workflowRunIdRef, chatDataRef, and the
fetch("/api/agent-workflow/cancel") call) so the server workflow is cancelled
even after reconnects or reloads.

---

Duplicate comments:
In `@app/api/agent-workflow/route.ts`:
- Around line 23-40: After creating the durable run via start(agentWorkflow,
[payload]) you must not let ancillary side-effects abort the request; wrap calls
to startStream({ chatId: payload.chatId, streamId: run.runId }) and the PostHog
block (PostHogClient(), posthog.capture(...), await posthog.flush()) in separate
try/catch blocks, log any caught errors (e.g., processLogger.error or
console.error) and do not rethrow them so the route returns success even if
these post-start actions fail; keep the call to start(...) intact and only
swallow/log failures from startStream and posthog.flush/capture.

In `@workflows/agent-step.ts`:
- Around line 962-974: The writable stream writer acquired via
writable.getWriter() is never closed, only released, so change the finally block
to close the writer after the read-loop completes: after the loop finishes (or
in the finally), call await wWriter.close() (or wWriter.close() if synchronous)
before wWriter.releaseLock(); ensure you still release reader.releaseLock() and
guard the close call by checking wWriter is defined/locked to avoid exceptions;
reference uiStream.getReader(), writable.getWriter(), reader.releaseLock(), and
wWriter.releaseLock().

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: a74b7611-25b9-4aba-91bf-7d0fa38b99c7

📥 Commits

Reviewing files that changed from the base of the PR and between 3939caf and 0fc80f7.

⛔ Files ignored due to path filters (1)
  • pnpm-lock.yaml is excluded by !**/pnpm-lock.yaml
📒 Files selected for processing (12)
  • __mocks__/workflow-ai.ts
  • app/api/agent-workflow/[id]/stream/route.ts
  • app/api/agent-workflow/cancel/route.ts
  • app/api/agent-workflow/route.ts
  • app/api/chat/[id]/stream/route.ts
  • app/components/chat.tsx
  • jest.config.js
  • lib/axiom/workflow.ts
  • lib/logger.ts
  • lib/utils.ts
  • package.json
  • workflows/agent-step.ts
🚧 Files skipped from review as they are similar to previous changes (1)
  • package.json

…etion

Two issues prevented WorkflowChatTransport from transitioning to "ready":

1. The agent step's manual pipe loop released the writable writer lock
   without closing it, so the Workflow never signaled end-of-stream.
   Replaced with pipeTo() which closes the writable when the readable
   ends, handles backpressure, and aborts on error.

2. The reconnect endpoint's emptyFinishResponse() created an empty
   stream with no finish chunk, causing the transport's reconnect loop
   to retry indefinitely. Now explicitly writes a finish chunk.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
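The pipeTo() replacement described in point 1 can be sketched as follows. Unlike a manual read/write loop that only releases locks, pipeTo() closes the destination when the source ends (assuming the default preventClose behavior), which is exactly the end-of-stream signal that was missing:

```typescript
// pipeTo() handles backpressure, closes `writable` on EOF (signaling
// end-of-stream to consumers), and aborts it if the readable errors.
async function forwardUiStream(
  uiStream: ReadableStream<Uint8Array>,
  writable: WritableStream<Uint8Array>,
): Promise<void> {
  await uiStream.pipeTo(writable);
}
```

This is a sketch of the idea rather than the exact agent-step code; the function name is illustrative.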
@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

♻️ Duplicate comments (2)
app/api/agent-workflow/[id]/stream/route.ts (2)

34-37: ⚠️ Potential issue | 🟠 Major

Handle auth exceptions from getUserIDAndPro explicitly.

getUserIDAndPro(req) can throw on auth failure; currently that can bypass the 401 branch and surface as 5xx.

Suggested fix
+import { ChatSDKError } from "@/lib/errors";
@@
-  const { userId } = await getUserIDAndPro(req);
+  let userId: string;
+  try {
+    ({ userId } = await getUserIDAndPro(req));
+  } catch (error) {
+    if (error instanceof ChatSDKError) {
+      return new Response("Unauthorized", { status: 401 });
+    }
+    throw error;
+  }
   if (!userId) {
     return new Response("Unauthorized", { status: 401 });
   }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@app/api/agent-workflow/`[id]/stream/route.ts around lines 34 - 37,
getUserIDAndPro(req) can throw on auth failures; wrap the call to
getUserIDAndPro inside a try/catch around the const { userId } = await
getUserIDAndPro(req) in route handler and if it throws return a
Response("Unauthorized", { status: 401 }) (or map auth-specific errors to 401)
instead of letting errors bubble to a 5xx; ensure you still handle the case
where the call succeeds but userId is falsy by keeping the existing if (!userId)
return 401 logic.

44-52: ⚠️ Potential issue | 🔴 Critical

Authorize runId access before streaming.

After generic auth, the handler streams by runId without proving that the run belongs to the authenticated user. That can expose another user’s stream if runId leaks.

#!/bin/bash
# Verify whether run ownership is enforced anywhere for workflow stream reconnects.
# Expected: a clear check tying runId -> chat/session owner -> authenticated userId.

rg -n -C3 --type=ts '\bgetRun\s*\('
rg -n -C3 --type=ts 'active_stream_id|stream_id|runId|workflow.*id'
rg -n -C3 --type=ts 'userId.*(===|==).*run|run.*(owner|user|chat)'

If no ownership gate exists, add one before status/readable access and return 403 (or a finish response) for unauthorized run IDs.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@app/api/agent-workflow/`[id]/stream/route.ts around lines 44 - 52, Ensure the
handler authorizes access to the requested runId before streaming: after calling
getRun(runId) (or before any await run.status or reading run.readable), verify
the run’s owner/associated user or session matches the authenticated userId
(e.g., compare run.ownerId/run.userId/run.sessionOwner to req.user.id or the
session principal). If the ownership check fails, return a forbidden response
(HTTP 403) or the existing emptyFinishResponse() so the reconnect loop
terminates; add this check near the getRun/run.status usage and short-circuit
before any stream access.
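A minimal sketch of such an ownership gate, assuming the run can be resolved to a chat whose owner is recorded; getChatOwner, the run fields, and the in-memory lookup are hypothetical stand-ins, not this repository's API:

```typescript
// Resolve the run to its chat, then require the chat's owner to match the caller.
type WorkflowRun = { runId: string; chatId: string };

// Stand-in for a real database lookup keyed by chat id.
const CHAT_OWNERS: Record<string, string> = { chat_1: "user_1" };

function getChatOwner(chatId: string): string | null {
  return CHAT_OWNERS[chatId] ?? null;
}

function isAuthorizedForRun(run: WorkflowRun, userId: string): boolean {
  const owner = getChatOwner(run.chatId);
  return owner !== null && owner === userId;
}

// In the route handler, a failed check would short-circuit before any stream access:
// if (!isAuthorizedForRun(run, userId)) return new Response("Forbidden", { status: 403 });
```

Denying on a missing owner record (rather than allowing) keeps the gate fail-closed if the run-to-chat mapping is ever incomplete.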
🧹 Nitpick comments (1)
workflows/agent-step.ts (1)

635-708: Extract shared finalize/persist flow to one helper.

Fallback and normal onFinish branches duplicate chat-update, file metadata, and message persistence logic. This is drift-prone and harder to maintain.

Also applies to: 791-897

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@workflows/agent-step.ts` around lines 635 - 708, The finalize/persist logic
in the onFinish branches is duplicated; extract the shared flow into a single
helper (e.g., finalizeAndPersistStream) that accepts parameters needed by both
paths (chatId, userId, generatedTitle or temporary flag, streamFinishReason,
streamUsage, responseModel, fallbackStartTime, summarizationParts,
retryMessages, retryMessageId, mode, selectedModelOverride, sandboxManager).
Move calls to getTodoManager().mergeWith, updateChat/prepareForNewStream,
getFileAccumulator().getAll, mapping newFileIds, looping retryMessages and
calling saveMessage, and sendFileMetadataToStream into that helper, and call
deleteTempStreamForBackend only from the temporary branch; replace the
duplicated blocks in both locations (including the other block at lines
mentioned in the comment) with calls to this new helper, preserving existing
parameter values and behavior.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@workflows/agent-step.ts`:
- Around line 175-183: The call to createCancellationSubscriber(...) can throw
before the inner stream try/catch runs, skipping refund/cleanup; wrap the
creation in its own try/catch (around the const cancellationSubscriber = await
createCancellationSubscriber({...})), and in the catch invoke the same
refund/cleanup/error-handling path you use inside the stream (e.g., call the
existing refund function or cleanup logic and set subscriberStopped as needed),
then rethrow or return to ensure consistent behavior; ensure you still propagate
or handle the error so upstream code knows creation failed.

---

Duplicate comments:
In `@app/api/agent-workflow/`[id]/stream/route.ts:
- Around line 34-37: getUserIDAndPro(req) can throw on auth failures; wrap the
call to getUserIDAndPro inside a try/catch around the const { userId } = await
getUserIDAndPro(req) in route handler and if it throws return a
Response("Unauthorized", { status: 401 }) (or map auth-specific errors to 401)
instead of letting errors bubble to a 5xx; ensure you still handle the case
where the call succeeds but userId is falsy by keeping the existing if (!userId)
return 401 logic.
- Around line 44-52: Ensure the handler authorizes access to the requested runId
before streaming: after calling getRun(runId) (or before any await run.status or
reading run.readable), verify the run’s owner/associated user or session matches
the authenticated userId (e.g., compare run.ownerId/run.userId/run.sessionOwner
to req.user.id or the session principal). If the ownership check fails, return a
forbidden response (HTTP 403) or the existing emptyFinishResponse() so the
reconnect loop terminates; add this check near the getRun/run.status usage and
short-circuit before any stream access.
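The first duplicate comment (mapping an auth throw to 401) can be sketched as control flow. The real `getUserIDAndPro` is async; it is stubbed synchronously here so the branching is runnable standalone:

```typescript
// Synchronous stand-in for the project's async getUserIDAndPro helper.
function getUserIDAndProSync(shouldThrow: boolean): { userId: string | null } {
  if (shouldThrow) throw new Error("invalid token"); // simulate a throwing auth layer
  return { userId: null };
}

// Status the route should respond with, per the suggested guard.
function streamAuthStatus(shouldThrow: boolean): number {
  let userId: string | null;
  try {
    ({ userId } = getUserIDAndProSync(shouldThrow));
  } catch {
    return 401; // auth throw → 401, instead of bubbling to a 5xx
  }
  if (!userId) return 401; // keep the existing falsy-userId guard
  return 200;
}
```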


ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: b0d57704-02c1-439a-83aa-4ac4024c73b5

📥 Commits

Reviewing files that changed from the base of the PR and between 0fc80f7 and a838035.

📒 Files selected for processing (2)
  • app/api/agent-workflow/[id]/stream/route.ts
  • workflows/agent-step.ts

Remove PostHog client usage from the agent workflow's POST handler, dropping the event-capture and flush logic. This simplifies the handler without affecting its core functionality.

@coderabbitai coderabbitai bot left a comment


🧹 Nitpick comments (1)
app/components/chat.tsx (1)

471-472: Move ref sync out of render for React-idiomatic purity.

At lines 471-472, mutating setMessagesRef.current during render violates React 19 principles. React's compiler explicitly discourages reading or writing ref.current in render to preserve render purity. Sync the ref in useEffect instead:

-  // Update ref so transport callbacks can access setMessages (breaks circular dependency)
-  setMessagesRef.current = setMessages;
+  // Update ref so transport callbacks can access setMessages (breaks circular dependency)
+  useEffect(() => {
+    setMessagesRef.current = setMessages;
+  }, [setMessages]);
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@app/components/chat.tsx` around lines 471 - 472, Move the synchronous
mutation of setMessagesRef.current out of render and into an effect: remove the
line "setMessagesRef.current = setMessages" from the render body and instead set
it inside a useEffect (e.g., useEffect(() => { setMessagesRef.current =
setMessages }, [setMessages])) within the same component so the ref is updated
after render in an idiomatic React way; reference setMessagesRef and setMessages
when updating and ensure the useEffect is declared inside the Chat component (or
the component that defines setMessages).

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 2f3cdcd9-95fc-41b9-adf3-df36aecf83fe

📥 Commits

Reviewing files that changed from the base of the PR and between 4cb4113 and 8181263.

⛔ Files ignored due to path filters (1)
  • pnpm-lock.yaml is excluded by !**/pnpm-lock.yaml
📒 Files selected for processing (1)
  • app/components/chat.tsx

Remove the NEXT_PUBLIC_USE_WORKFLOW_AGENT environment variable gate
so the workflow transport is always used for agent mode.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…reaming

Uses Convex real-time active_stream_id and active_trigger_run_id fields
to display a spinning loader icon on any chat currently generating a response.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Extract the ~800 lines of shared logic between chat-handler.ts and
agent-step.ts into a new agent-stream-core.ts module. Both files are
now thin wrappers over createAgentStreamExecute(), reducing agent-step
from 967 to 133 lines and chat-handler from 1306 to 389 lines.

Also fixes several gaps in the workflow path vs the serverless path:
- Adopt UsageTracker (was using raw variables, missing cache tokens,
  sandbox cost tracking, and usage record logging)
- Add missing tools/providerOptions args to runSummarizationStep
- Track summarization usage (cache read/write tokens, cost)
- Pass onToolCost callback to createTools for per-tool cost tracking

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
When clicking abort, the WorkflowChatTransport's reconnect loop would
fire ~8+ duplicate stream?startIndex=N requests because the workflow
run status remained "running" while backend cleanup propagated. Now we
call run.cancel() via /api/agent-workflow/cancel in parallel with the
existing Convex cancellation, so the stream route immediately returns
a finish response and the transport stops retrying.

Also wraps stop() in try/catch to handle the expected
"BodyStreamBuffer was aborted" error from aborting mid-read.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
- Configure app/.well-known/workflow/v1/step/route.js maxDuration to 800s
- Allows workflow step handler to run up to Vercel's max function duration

Made-with: Cursor
The reader should run until __done or the function is killed. Step
preemption is handled by the workflow's timeBudgetMs, not the reader.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…get to 180s

- Send keepalive data chunk every 5s when XREAD blocks to prevent
  Vercel idle timeout from killing the streaming response
- Remove "insufficient time remaining" guard that refused to start
  tool execution near budget expiry (tool preemption still active)
- Bump timeBudgetMs to 180s for testing

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
The keepalive { type: "data", data: [] } fails UIMessageChunk schema
validation, causing reconnection to fail after 10 consecutive errors.
Remove it — the transport's reconnection mechanism works without it.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
useAutoResume previously only called resumeStream() when the last
message was from the user. Now also resumes when there's an active
stream ID (rstream_* or wrun_*), enabling reconnection even when
the assistant has already started responding.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
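The broadened resume condition reduces to a small predicate. The `rstream_`/`wrun_` prefixes come from the commit message; the function shape is illustrative:

```typescript
// Resume when the last message is from the user (original behavior) OR
// when an active stream ID indicates an in-flight Redis/workflow stream.
function shouldAutoResume(
  lastMessageRole: "user" | "assistant",
  activeStreamId: string | null,
): boolean {
  if (lastMessageRole === "user") return true;
  return (
    activeStreamId !== null &&
    (activeStreamId.startsWith("rstream_") || activeStreamId.startsWith("wrun_"))
  );
}
```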
mschead and others added 2 commits March 17, 2026 22:07
…nk ordering errors

Strip the abort signal from fetch in WorkflowChatTransport and proxy the
response body through a reader we cancel ourselves on abort — pending reads
resolve with {done:true} instead of throwing "BodyStreamBuffer was aborted",
which eliminates the console.error noise and the reconnection cascade that
followed.

Also guard onError against AbortError and auto-clear UIMessageStreamError
("No tool invocation found for tool call ID") so the chat recovers to a
ready state instead of being stuck on a red error banner.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…k replay

Add resetStream() to clear old chunks from the Redis stream before starting
a new workflow run. Without this, regenerating a chat could replay tool-result
chunks from the previous run, causing "No tool invocation found" errors on
the client.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Remove the Redis streaming feature flag (hardcoded to 100%) and always
use Redis Streams transport. Delete the unused Vercel Workflow stream
reconnect route, wrun_ prefix handling, verbose logStep instrumentation,
abort entry/exit logging, and debug console.logs scattered across the
workflow pipeline.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Remove the legacy agent-long mode, Trigger.dev task, streaming hook,
and API routes. Keep "agent-long" only in Convex schema unions to avoid
breaking old documents. Old chats with agent-long slug now map to agent
mode on load.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
mschead and others added 2 commits March 18, 2026 17:00
Add a "Long" toggle next to the mode selector that controls whether
agent requests use the serverless path (/api/agent) or the durable
Vercel Workflow path (/api/agent-workflow + Redis Streams). The toggle
is persisted to localStorage, only visible in agent mode, and disabled
while streaming to prevent mid-run transport switching.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…nnect

After workflow completion, WorkflowChatTransport reconnects and replays
all chunks from Redis (startIndex=0), causing duplicate Reasoning blocks
in the UI. Fix by checking active_stream_id in the redis-stream GET
route — if cleared (workflow done), return an empty finish response so
the transport stops reconnecting. Also wrap controller.close() in the
__done handler with try/catch to prevent crash when client disconnects
before the sentinel is processed.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Set workflow step budget to 750s (50s buffer before Vercel 800s limit)
and tool preemption buffer to 5min (commands preempt at ~450s mark).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
mschead and others added 2 commits March 19, 2026 13:11
… completion

When a workflow finishes but active_stream_id is never cleared (or the
Redis stream TTL expires before the client reconnects), the redis-stream
GET route enters an infinite XREAD BLOCK loop with no data, leaving the
chat permanently frozen.

Two-layer fix:
- Route checks streamKeyExists() upfront and returns a finish response
  immediately if the key is gone (fast path for expired TTL).
- createRedisChunkReadable() tracks consecutive empty reads and breaks
  out after ~30s of silence (safety net for crashed workflows).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
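A minimal model of the two-layer fix, with read results stubbed as arrays (the real reader blocks on XREAD). The empty-read limit is an assumption standing in for "~30s of silence"; the `__done` sentinel name comes from earlier commits:

```typescript
const EMPTY_READ_LIMIT = 6; // e.g. 6 blocking reads × ~5s ≈ 30s of silence

// Fast path: if the stream key is gone (TTL expired), finish immediately.
// Safety net: break out after EMPTY_READ_LIMIT consecutive empty reads.
function readUntilStale(streamKeyExists: boolean, reads: Array<string[] | null>): string[] {
  if (!streamKeyExists) return ["finish"];
  const out: string[] = [];
  let emptyReads = 0;
  for (const batch of reads) {
    if (!batch || batch.length === 0) {
      if (++emptyReads >= EMPTY_READ_LIMIT) break; // crashed workflow — give up
      continue;
    }
    emptyReads = 0; // data arrived, reset the stale counter
    out.push(...batch);
    if (batch.includes("__done")) break; // normal completion
  }
  out.push("finish");
  return out;
}
```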
When the reconnect endpoint detects a stale workflow (active_stream_id
set but Redis key gone), clear active_stream_id so the sidebar loading
indicator stops showing for completed chats.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…ow streaming

Bug A — startIndex mismatch: WorkflowChatTransport sends a sequential
chunk counter as startIndex but Redis XREAD expects timestamp-based
stream IDs. Add resolveStartId() to convert chunk counter to correct
Redis stream ID via XRANGE, preventing full stream replay on reconnect.

Bug B — missing __done sentinel: await appendChunk and markStreamDone
in agent-step instead of fire-and-forget, ensuring the sentinel is
always written before the step exits.

Bug C — infinite reconnect loop: emit a synthetic finish chunk when
the stale timeout fires so WorkflowChatTransport stops reconnecting.

Bug D — premature finish during checkpoint transitions: write a
__checkpoint marker to Redis when the step checkpoints. The reader
resets its stale counter and extends timeout to 2min (3min absolute
cap) so the next step has time to start.

Bug E — warm-up reasoning noise: suppress reasoning and step-start
chunks at the beginning of continuation steps until the model produces
real content, preventing UI flood after checkpoint.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
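The Bug A fix can be sketched over an in-memory stand-in for XRANGE. Entry IDs use Redis's `<ms>-<seq>` shape for illustration; the mapping rule is that a client that has seen `startIndex` chunks should resume with the ID of entry `startIndex - 1`, since XREAD returns entries strictly after the given ID:

```typescript
// Map a sequential chunk counter to a Redis stream entry ID.
// `entries` stands in for an XRANGE over the whole stream.
function resolveStartId(
  entries: Array<[id: string, chunk: string]>,
  startIndex: number,
): string {
  if (startIndex <= 0) return "0"; // XREAD from the beginning
  const lastSeen = entries[startIndex - 1];
  if (lastSeen) return lastSeen[0]; // resume strictly after the last-seen entry
  // counter ahead of the stream: resume after the newest entry, no replay
  return entries.length > 0 ? entries[entries.length - 1][0] : "0";
}
```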
mschead and others added 2 commits March 20, 2026 11:21
Merge origin/main into feat/vercel-workflow-agent, resolving conflicts in
ChatInputToolbar, chat.tsx, GlobalState, tools/index, chat-handler, utils,
and package.json. Kept feature branch's workflow transport, agentLongMode,
and createAgentStreamExecute refactor while incorporating main's ask-only
model selector, HackingSuggestions, metadata in ChatSDKError, statusRef
pattern, and updated workos/dependency versions. Removed dead
tauriCmdServer references.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
- Replace Redis reader with run.getReadable() in POST handler so chunks
  flow directly from the AI to the client (same perf as normal agent mode).
  Redis is now a cache for reconnects, not the primary transport.
- Add 15s heartbeat interval in workflow step to prevent the Redis reader's
  30s stale timeout from firing during long tool executions (nmap, sqlmap).
- Handle __heartbeat sentinel in Redis chunk reader.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
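Putting the sentinel handling together, the chunk loop in the Redis reader might look like the toy below. The `__heartbeat` and `__done` names come from the commit messages above; everything else is illustrative:

```typescript
// Drain raw Redis chunks: heartbeats keep the connection alive without
// emitting data, and the __done sentinel closes the stream.
function drainChunks(raw: string[]): { emitted: string[]; sawHeartbeat: boolean } {
  const emitted: string[] = [];
  let sawHeartbeat = false;
  for (const chunk of raw) {
    if (chunk === "__heartbeat") {
      sawHeartbeat = true; // reset the stale timer; emit nothing to the client
      continue;
    }
    if (chunk === "__done") break; // workflow finished — close the response
    emitted.push(chunk);
  }
  return { emitted, sawHeartbeat };
}
```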