chore(kit): drain helper extraction + ci pins + buildkit secret#125
Conversation
- Extract drainWebhookEventBatches helper with unit tests; webhooks.ts now delegates the live SSE drain loop with the same advance/abort semantics so the cohort + iteration-limit edges are testable. - Drop the redundant by_project_and_received_and_creation Convex index (Convex auto-appends _creationTime, so by_project_and_received already serves both webhookEventsSince and latestWebhookEventsSince). - Add real-HTTP SSE integration test for connectWebhookStream using a Bun.serve + fetch-based EventSource shim. - Pass VITE_KIT_MIXPANEL_TOKEN as a BuildKit secret instead of an ARG/ENV pair (Dockerfile + deploy-kit.yml + deploy-prod.sh) to silence Docker's TOKEN-named warning while still inlining the public token. - Bump actions/checkout v4 -> v6, docker/setup-buildx v3 -> v4, and docker/build-push v6 -> v7 across CI workflows. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
|
/gemini review |
|
Warning Rate limit exceeded
To keep reviews running without waiting, you can enable usage-based add-on for your organization. This allows additional reviews beyond the hourly cap. Account admins can enable it under billing. ⌛ How to resolve this issue?After the wait time has elapsed, a review can be triggered using the We recommend that you space out your commits to avoid hitting the rate limit. 🚦 How do rate limits work?CodeRabbit enforces hourly rate limits for each developer per organization. Our paid plans have higher rate limits than the trial, open-source and free plans. In all cases, we re-allow further reviews after a brief timeout. Please see our FAQ for further information. ℹ️ Review info⚙️ Run configurationConfiguration used: defaults Review profile: CHILL Plan: Pro Run ID: 📒 Files selected for processing (3)
📝 WalkthroughWalkthroughUpdates CI workflow checkout and Docker action versions, refactors Mixpanel token handling to use BuildKit secrets with a cache-busting hash, tightens webhook stream drain typing and saturated-cohort fallback logic (with tests), and adds a vitest test script for the gql package. ChangesCI and Tooling Action Upgrades
Mixpanel Token Security & Build Cache
Webhook Stream Drain Enhancement
GQL Test Script & SSE Test Adjustment
Estimated code review effort🎯 4 (Complex) | ⏱️ ~45 minutes Possibly related PRs
Suggested labels
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Review rate limit: 0/1 reviews remaining, refill in 10 minutes and 21 seconds.Comment |
There was a problem hiding this comment.
Code Review
This pull request refactors the webhook SSE streaming logic in the kit package by introducing a centralized drainWebhookEventBatches utility and optimizing Convex database indexes to leverage automatic _creationTime appending. It also updates the Docker build configuration to use BuildKit secrets for the Mixpanel token to avoid security-related naming warnings and adds comprehensive integration tests for the SSE client. However, a critical issue was identified: the generated API types reference a new subscriptions/selectLatest module, but the corresponding implementation file is missing from the PR, which will lead to build failures.
There was a problem hiding this comment.
Pull request overview
This PR refactors and hardens the kit webhook SSE drain path (by extracting the batch-drain loop into a standalone helper with unit tests), simplifies Convex indexing for webhook backfill queries, and updates deployment/CI plumbing (BuildKit secret injection for Mixpanel + GitHub Actions action version bumps). It also adds a real-HTTP SSE integration test for the GQL webhook client.
Changes:
- Extract
drainWebhookEventBatchesintowebhookStreamDrain.tsand add focused unit tests;webhooks.tsdelegates to the helper. - Remove the redundant Convex index and update webhook queries to use
by_project_and_received. - Switch Mixpanel token injection to BuildKit secrets (Dockerfile + deploy scripts/workflows) and bump several CI action pins.
Reviewed changes
Copilot reviewed 27 out of 28 changed files in this pull request and generated 2 comments.
Show a summary per file
| File | Description |
|---|---|
| packages/kit/server/api/v1/webhooks.ts | Delegates the live SSE drain loop to the extracted drain helper. |
| packages/kit/server/api/v1/webhookStreamDrain.ts | New helper implementing paged draining with cursor advancement + iteration limiting. |
| packages/kit/server/api/v1/webhookStreamDrain.test.ts | New unit tests covering paging, same-ms cohorts, and dedupe cursor advancement. |
| packages/kit/scripts/deploy-prod.sh | Passes Mixpanel via BuildKit secret and renames build args array to BUILD_FLAGS. |
| packages/kit/convex/webhooks/query.ts | Switches queries to by_project_and_received index. |
| packages/kit/convex/schema.ts | Removes the redundant index and documents _creationTime behavior for SSE backfill. |
| packages/kit/convex/_generated/api.d.ts | Regenerates Convex API typings (adds subscriptions/selectLatest). |
| packages/kit/README.md | Updates production deploy instructions and BuildKit secret notes. |
| packages/kit/Dockerfile | Uses BuildKit secret mount for Mixpanel token during bun run build:all; adds Dockerfile syntax directive. |
| packages/gql/src/webhook-client.sse.test.ts | Adds a real-HTTP SSE integration test using Bun.serve + a fetch-based EventSource shim. |
| .github/workflows/release.yml | Bumps actions/checkout version pin. |
| .github/workflows/release-react-native.yml | Bumps actions/checkout version pin. |
| .github/workflows/release-kmp.yml | Bumps actions/checkout version pin. |
| .github/workflows/release-google.yml | Bumps actions/checkout version pin. |
| .github/workflows/release-godot.yml | Bumps actions/checkout version pin. |
| .github/workflows/release-flutter.yml | Bumps actions/checkout version pin. |
| .github/workflows/release-expo.yml | Bumps actions/checkout version pin. |
| .github/workflows/release-apple.yml | Bumps actions/checkout version pin. |
| .github/workflows/publish-flutter.yml | Bumps actions/checkout version pin. |
| .github/workflows/deploy-kit.yml | Bumps Docker actions and switches Mixpanel injection to BuildKit secret at deploy time. |
| .github/workflows/dependabot-bun-lockfile.yml | Bumps actions/checkout version pin. |
| .github/workflows/ci.yml | Bumps actions/checkout version pin across jobs. |
| .github/workflows/ci-react-native-iap.yml | Bumps actions/checkout version pin. |
| .github/workflows/ci-kmp-iap.yml | Bumps actions/checkout version pin. |
| .github/workflows/ci-godot-iap.yml | Bumps actions/checkout version pin. |
| .github/workflows/ci-flutter-iap.yml | Bumps actions/checkout version pin. |
| .github/workflows/ci-expo-iap.yml | Bumps actions/checkout version pin. |
| .claude/guides/09-kit-package.md | Updates kit operational guidance around BuildKit-secret usage for TOKEN-named public config. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
There was a problem hiding this comment.
Code Review
This pull request refactors the webhook SSE streaming logic in packages/kit by introducing a drainWebhookEventBatches helper to manage paged event loading and cursor advancement. It optimizes Convex database indexes for these queries and updates the Docker build process to use BuildKit secrets for public tokens, preventing Docker secret-name warnings. Additionally, a new integration test for the SSE client was added to packages/gql. Review feedback indicates that the drainWebhookEventBatches helper lacks the "saturated-cohort fallback" logic found in the original implementation, which is necessary to handle event bursts that exceed the page limit within a single millisecond.
There was a problem hiding this comment.
🧹 Nitpick comments (2)
packages/kit/server/api/v1/webhookStreamDrain.ts (1)
1-33: 💤 Low valueConsider using
interfacefor object shape definitions.The coding guidelines prefer
interfaceovertypefor defining object shapes. This is a minor style consideration.♻️ Optional: Convert types to interfaces
-export type WebhookStreamCursor = { +export interface WebhookStreamCursor { sinceMs: number; afterCreationTime?: number; -}; +} -export type WebhookStreamSeenSet = { +export interface WebhookStreamSeenSet { has(id: string): boolean; add(id: string): void; -}; +} -export type DrainWebhookEventBatchesOptions = { +export interface DrainWebhookEventBatchesOptions { // ... same fields -}; +} -export type DrainWebhookEventBatchesResult = { +export interface DrainWebhookEventBatchesResult { // ... same fields -}; +}As per coding guidelines: "Prefer interface for defining object shapes in TypeScript"
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@packages/kit/server/api/v1/webhookStreamDrain.ts` around lines 1 - 33, Replace the exported object-shape type aliases with interfaces to follow the project's TypeScript style guide: change WebhookStreamCursor, WebhookStreamSeenSet, DrainWebhookEventBatchesOptions, and DrainWebhookEventBatchesResult from "export type X = { ... }" to "export interface X { ... }", preserving all property names, optional markers, method signatures (including the loadBatch and writeEvent signatures on DrainWebhookEventBatchesOptions), and exported names so behavior and types remain identical.packages/kit/Dockerfile (1)
1-1: 💤 Low valueOptional: Consider pinning the Dockerfile syntax to a specific patch version.
docker/dockerfile:1.7resolves to the latest1.7.xpatch, which could shift under you if Docker releases a new patch. Pinning to a fully-qualified tag (e.g.,docker/dockerfile:1.7.1) would make the build hermetic.🔧 Suggested change
-# syntax=docker/dockerfile:1.7 +# syntax=docker/dockerfile:1.7.1🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@packages/kit/Dockerfile` at line 1, The Dockerfile's syntax directive ("# syntax=docker/dockerfile:1.7") is using a floating patch tag; update that directive to a fully-qualified patch tag (for example change "docker/dockerfile:1.7" to a specific patch like "docker/dockerfile:1.7.1") to pin the build to a stable parser version and make the build hermetic.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Nitpick comments:
In `@packages/kit/Dockerfile`:
- Line 1: The Dockerfile's syntax directive ("# syntax=docker/dockerfile:1.7")
is using a floating patch tag; update that directive to a fully-qualified patch
tag (for example change "docker/dockerfile:1.7" to a specific patch like
"docker/dockerfile:1.7.1") to pin the build to a stable parser version and make
the build hermetic.
In `@packages/kit/server/api/v1/webhookStreamDrain.ts`:
- Around line 1-33: Replace the exported object-shape type aliases with
interfaces to follow the project's TypeScript style guide: change
WebhookStreamCursor, WebhookStreamSeenSet, DrainWebhookEventBatchesOptions, and
DrainWebhookEventBatchesResult from "export type X = { ... }" to "export
interface X { ... }", preserving all property names, optional markers, method
signatures (including the loadBatch and writeEvent signatures on
DrainWebhookEventBatchesOptions), and exported names so behavior and types
remain identical.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: f085103f-bbbe-485d-b46f-9c75dfdefa4b
⛔ Files ignored due to path filters (1)
packages/kit/convex/_generated/api.d.tsis excluded by!**/_generated/**
📒 Files selected for processing (27)
.claude/guides/09-kit-package.md.github/workflows/ci-expo-iap.yml.github/workflows/ci-flutter-iap.yml.github/workflows/ci-godot-iap.yml.github/workflows/ci-kmp-iap.yml.github/workflows/ci-react-native-iap.yml.github/workflows/ci.yml.github/workflows/dependabot-bun-lockfile.yml.github/workflows/deploy-kit.yml.github/workflows/publish-flutter.yml.github/workflows/release-apple.yml.github/workflows/release-expo.yml.github/workflows/release-flutter.yml.github/workflows/release-godot.yml.github/workflows/release-google.yml.github/workflows/release-kmp.yml.github/workflows/release-react-native.yml.github/workflows/release.ymlpackages/gql/src/webhook-client.sse.test.tspackages/kit/Dockerfilepackages/kit/README.mdpackages/kit/convex/schema.tspackages/kit/convex/webhooks/query.tspackages/kit/scripts/deploy-prod.shpackages/kit/server/api/v1/webhookStreamDrain.test.tspackages/kit/server/api/v1/webhookStreamDrain.tspackages/kit/server/api/v1/webhooks.ts
…allback) - Add VITE_KIT_MIXPANEL_TOKEN_HASH cache-bust ARG so a token rotation invalidates the cached `bun run build:all` layer (BuildKit secret values are not part of the layer cache key, so without it GHA / Fly's remote builder would reuse the prior layer and ship a stale token). - Wire packages/gql vitest tests into CI: add a `test` script and a Run tests step in the test-gql job. Existing webhook-client tests weren't being exercised on PRs. - Port the saturated-cohort fallback from the original Phase 1 drain into drainWebhookEventBatches so the helper makes forward progress even when an underlying query truncates a same-millisecond cohort past its take() cap. Add a unit test covering that scenario. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
|
/gemini review |
|
@coderabbitai review |
✅ Actions performedReview triggered.
|
There was a problem hiding this comment.
🧹 Nitpick comments (2)
packages/kit/server/api/v1/webhookStreamDrain.test.ts (2)
5-9: ⚡ Quick winUse interfaces for object shapes in this TypeScript test.
This file uses a type alias and inline object type literals for structured shapes; the repo guideline prefers interfaces.
♻️ Suggested refactor
-type EventRow = { +interface EventRow { id: string; receivedAt: number; _creationTime: number; -}; +} + +interface BatchCursorInput { + sinceMs: number; + afterCreationTime?: number; + limit: number; +} function makePagedLoader(events: EventRow[]) { return async ({ sinceMs, afterCreationTime, limit, - }: { - sinceMs: number; - afterCreationTime?: number; - limit: number; - }) => + }: BatchCursorInput) => events .filter((event) => { if (event.receivedAt > sinceMs) return true; if (event.receivedAt !== sinceMs) return false; return ( afterCreationTime === undefined || event._creationTime > afterCreationTime ); }) .slice(0, limit); }As per coding guidelines:
**/*.{ts,tsx}: Prefer interface for defining object shapes in TypeScript.Also applies to: 26-30, 120-123
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@packages/kit/server/api/v1/webhookStreamDrain.test.ts` around lines 5 - 9, Replace the type alias EventRow with an equivalent interface and update any inline object type literals in webhookStreamDrain.test.ts to use interfaces as well; specifically convert the type alias named EventRow into an interface declaration and replace other inline shapes referenced around the test helpers/assertions (the inline event/response shapes used near the top and inside the tests) with named interfaces so the file follows the repo guideline preferring interfaces for object shapes. Ensure exported/used names remain the same and update any references to the former type alias (EventRow) to the new interface name.
43-184: ⚡ Quick winAdd explicit tests for
isAbortedand iteration-limit callbacks.Great coverage on paging/cohorts/duplicates; adding one test each for abort short-circuit and
onIterationLimit/hitIterationLimitbehavior would lock down the remaining control-flow branches in the helper.As per coding guidelines:
**/*.test.{ts,tsx,js,jsx}: Write unit tests for all utility functions and business logic.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@packages/kit/server/api/v1/webhookStreamDrain.test.ts` around lines 43 - 184, Add two tests to drainWebhookEventBatches: one that passes an isAborted function which returns true after the first iteration and asserts writeEvent is not invoked and the returned result indicates the drain stopped (e.g., hitIterationLimit false and cursor unchanged), and another that uses a very small maxIterations with a spy on onIterationLimit (or checks result.hitIterationLimit) to assert the callback is invoked and hitIterationLimit becomes true; locate and modify the existing tests in webhookStreamDrain.test.ts and use the same helpers (makePagedLoader, makeSeen) while referencing the drainWebhookEventBatches parameters isAborted, onIterationLimit, maxIterations, writeEvent, loadBatch, and result.hitIterationLimit to implement the assertions.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Nitpick comments:
In `@packages/kit/server/api/v1/webhookStreamDrain.test.ts`:
- Around line 5-9: Replace the type alias EventRow with an equivalent interface
and update any inline object type literals in webhookStreamDrain.test.ts to use
interfaces as well; specifically convert the type alias named EventRow into an
interface declaration and replace other inline shapes referenced around the test
helpers/assertions (the inline event/response shapes used near the top and
inside the tests) with named interfaces so the file follows the repo guideline
preferring interfaces for object shapes. Ensure exported/used names remain the
same and update any references to the former type alias (EventRow) to the new
interface name.
- Around line 43-184: Add two tests to drainWebhookEventBatches: one that passes
an isAborted function which returns true after the first iteration and asserts
writeEvent is not invoked and the returned result indicates the drain stopped
(e.g., hitIterationLimit false and cursor unchanged), and another that uses a
very small maxIterations with a spy on onIterationLimit (or checks
result.hitIterationLimit) to assert the callback is invoked and
hitIterationLimit becomes true; locate and modify the existing tests in
webhookStreamDrain.test.ts and use the same helpers (makePagedLoader, makeSeen)
while referencing the drainWebhookEventBatches parameters isAborted,
onIterationLimit, maxIterations, writeEvent, loadBatch, and
result.hitIterationLimit to implement the assertions.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 2a254792-4a47-4f80-a288-e218aeaffed5
📒 Files selected for processing (7)
.github/workflows/ci.yml.github/workflows/deploy-kit.ymlpackages/gql/package.jsonpackages/kit/Dockerfilepackages/kit/scripts/deploy-prod.shpackages/kit/server/api/v1/webhookStreamDrain.test.tspackages/kit/server/api/v1/webhookStreamDrain.ts
✅ Files skipped from review due to trivial changes (2)
- packages/gql/package.json
- .github/workflows/ci.yml
🚧 Files skipped from review as they are similar to previous changes (4)
- packages/kit/scripts/deploy-prod.sh
- .github/workflows/deploy-kit.yml
- packages/kit/Dockerfile
- packages/kit/server/api/v1/webhookStreamDrain.ts
There was a problem hiding this comment.
Code Review
This pull request refactors the webhook SSE streaming logic in the kit package by introducing a drainWebhookEventBatches helper to improve cursor management and handle saturated millisecond cohorts. It updates the Convex schema to leverage automatic _creationTime indexing and modifies the Docker build process to use BuildKit secrets for public tokens, ensuring proper cache busting. Additionally, it adds integration tests for the SSE client and the drain helper. Feedback suggests improving type safety for event objects and ensuring the portability of hashing commands in the deployment scripts.
…-prod - Replace `Record<string, unknown>` in drainWebhookEventBatches with a WebhookStreamEvent type that names the fields the helper actually reads (`id`, `receivedAt`, `_creationTime`) while still allowing pass-through fields the SSE writer needs (`type`, payload bodies). - deploy-prod.sh: prefer `sha256sum` (most Linux) and fall back to `shasum -a 256` (macOS); error out with a clear message when neither is on PATH so a deployer doesn't ship without the cache- bust arg. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
|
/gemini review |
|
@coderabbitai review |
✅ Actions performedReview triggered.
|
There was a problem hiding this comment.
Code Review
This pull request refactors the webhook SSE stream logic by introducing a drainWebhookEventBatches helper to manage cursors and handle saturated-cohort progress. It simplifies Convex indexes by removing a redundant composite index, updates the Docker build process to use BuildKit secrets for the Mixpanel token to avoid warnings and improve cache management, and adds comprehensive tests for the new drain logic and SSE client. Feedback was provided regarding a potential edge case in the drain helper where missing event fields could cause the process to stop prematurely.
…drain-helper # Conflicts: # .github/workflows/deploy-kit.yml # packages/kit/Dockerfile # packages/kit/scripts/deploy-prod.sh # packages/kit/server/api/v1/webhookStreamDrain.test.ts # packages/kit/server/api/v1/webhookStreamDrain.ts
There was a problem hiding this comment.
🧹 Nitpick comments (3)
packages/kit/server/api/v1/webhookStreamDrain.ts (3)
11-21: ⚡ Quick winUse an
interfacehere and make the existing docs real JSDoc.
WebhookStreamEventis now a public object contract, sointerfacefits the repo convention better thantype, and converting the block comment to/** ... */makes the intent show up in editor tooltips for callers.♻️ Proposed change
-// Minimal contract the helper needs from a webhook event row. The -// caller may pass through additional fields (the SSE writer reads -// `type`, the dashboard reads payload bodies etc.); the index -// signature lets those flow through without forcing every callsite -// to widen back to `Record<string, unknown>`. -export type WebhookStreamEvent = { +/** + * Minimal contract the helper needs from a webhook event row. The + * caller may pass through additional fields (the SSE writer reads + * `type`, the dashboard reads payload bodies etc.); the index + * signature lets those flow through without forcing every callsite + * to widen back to `Record<string, unknown>`. + */ +export interface WebhookStreamEvent { id?: unknown; receivedAt?: unknown; _creationTime?: unknown; [key: string]: unknown; -}; +}As per coding guidelines,
**/*.{ts,tsx}: Prefer interface for defining object shapes in TypeScript, and**/*.{ts,tsx,js,jsx}: Add JSDoc comments for public functions and exported APIs.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@packages/kit/server/api/v1/webhookStreamDrain.ts` around lines 11 - 21, Replace the exported type alias WebhookStreamEvent with an exported interface WebhookStreamEvent and convert the leading block comment into a JSDoc comment (/** ... */) so the contract is discoverable in editor tooltips; keep the same optional fields (id, receivedAt, _creationTime) and the index signature ([key: string]: unknown) so existing callsites remain compatible and update any references if your editor flags them.
46-48: ⚡ Quick winAdd JSDoc to the exported drain helper.
drainWebhookEventBatchesnow has subtle guarantees around cursor advancement, dedupe, abort behavior, and the same-ms fallback. A short JSDoc block would make that contract much easier to consume correctly from other call sites.As per coding guidelines,
**/*.{ts,tsx,js,jsx}: Add JSDoc comments for public functions and exported APIs.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@packages/kit/server/api/v1/webhookStreamDrain.ts` around lines 46 - 48, Add a concise JSDoc block above the exported function drainWebhookEventBatches describing its contract: explain how cursor advancement is handled (when and how the cursor is advanced), deduplication guarantees (what is deduped and by what key), abort behavior (how an AbortSignal affects processing and cursor updates), and the same-millisecond fallback policy (how events with identical timestamps are ordered/handled). Mention the types of the parameters and return value (DrainWebhookEventBatchesOptions -> DrainWebhookEventBatchesResult) and note any side effects (e.g., persistent cursor/state updates) and error conditions callers should expect.
74-85: Consider surfacing when the saturated-cohort fallback fires.This branch intentionally skips the rest of one millisecond by bumping
sinceMs, so a metric or callback here would make burst-induced drops much easier to diagnose in production.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@packages/kit/server/api/v1/webhookStreamDrain.ts` around lines 74 - 85, The saturated-cohort fallback branch (where lastDeliveredReceivedAt === cursor.sinceMs and you increment cursor.sinceMs, reset cursor.afterCreationTime and clear lastDeliveredReceivedAt) should emit an observability signal so burst-induced drops are visible in production; update that branch in webhookStreamDrain.ts to call your metrics/logging hook (e.g., increment a counter like saturatedCohortFallback, or invoke a provided callback/onSaturatedCohort) and include context (cursor.sinceMs, any stream id) before advancing the cursor, ensuring the metric/log call occurs immediately before cursor.sinceMs += 1 and preserves existing behavior.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Nitpick comments:
In `@packages/kit/server/api/v1/webhookStreamDrain.ts`:
- Around line 11-21: Replace the exported type alias WebhookStreamEvent with an
exported interface WebhookStreamEvent and convert the leading block comment into
a JSDoc comment (/** ... */) so the contract is discoverable in editor tooltips;
keep the same optional fields (id, receivedAt, _creationTime) and the index
signature ([key: string]: unknown) so existing callsites remain compatible and
update any references if your editor flags them.
- Around line 46-48: Add a concise JSDoc block above the exported function
drainWebhookEventBatches describing its contract: explain how cursor advancement
is handled (when and how the cursor is advanced), deduplication guarantees (what
is deduped and by what key), abort behavior (how an AbortSignal affects
processing and cursor updates), and the same-millisecond fallback policy (how
events with identical timestamps are ordered/handled). Mention the types of the
parameters and return value (DrainWebhookEventBatchesOptions ->
DrainWebhookEventBatchesResult) and note any side effects (e.g., persistent
cursor/state updates) and error conditions callers should expect.
- Around line 74-85: The saturated-cohort fallback branch (where
lastDeliveredReceivedAt === cursor.sinceMs and you increment cursor.sinceMs,
reset cursor.afterCreationTime and clear lastDeliveredReceivedAt) should emit an
observability signal so burst-induced drops are visible in production; update
that branch in webhookStreamDrain.ts to call your metrics/logging hook (e.g.,
increment a counter like saturatedCohortFallback, or invoke a provided
callback/onSaturatedCohort) and include context (cursor.sinceMs, any stream id)
before advancing the cursor, ensuring the metric/log call occurs immediately
before cursor.sinceMs += 1 and preserves existing behavior.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: b8f7e831-e7be-457f-95bb-9d713ec1f26a
📒 Files selected for processing (2)
packages/kit/scripts/deploy-prod.shpackages/kit/server/api/v1/webhookStreamDrain.ts
|
/gemini review |
There was a problem hiding this comment.
Code Review
This pull request introduces a cache-busting mechanism for Mixpanel tokens in the Docker build process by hashing the token and passing it as a build argument. It also refactors the webhook stream draining logic to handle saturated millisecond cohorts, preventing premature termination when query limits are reached, and adds a corresponding test case. Feedback includes ensuring the token hash is explicitly referenced in the Dockerfile's RUN command to effectively invalidate the cache and reordering the event processing logic to guarantee at-least-once delivery by marking events as seen only after a successful write.
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 7 out of 7 changed files in this pull request and generated 4 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
…vitest) - Dockerfile: expand VITE_KIT_MIXPANEL_TOKEN_HASH inside the RUN command so BuildKit folds it into the layer cache key. An unused ARG is ignored by the cache, which silently undid the rotation-busting effect. - webhookStreamDrain: gate the saturated-cohort fallback on lastObservedReceivedAt instead of lastDeliveredReceivedAt so a full same-millisecond page of dedup'd events still advances past the cohort. - webhookStreamDrain: add the event id to `seen` only after writeEvent succeeds. A thrown writer leaves the event eligible for retry on the next drain pass; the loop is sequential so no in-batch race. - Add tests for both behaviors (dedup'd cohort fallback, write-failure retryability). - packages/gql: switch the test script from `bun test src` to `vitest run src` and add vitest as a devDep so the SSE integration test runs under the same runner as the rest of the monorepo. - Rewrite the SSE integration test to use Node's `http.createServer` instead of `Bun.serve` so it runs under vitest on the Node pool. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
|
/gemini review |
|
@coderabbitai review |
✅ Actions performedReview triggered.
|
There was a problem hiding this comment.
Code Review
This pull request implements a saturated-cohort fallback in the webhook draining logic to handle large bursts of events occurring within the same millisecond and ensures that events are only marked as processed after a successful write, allowing for retries on failure. Additionally, it migrates SSE integration tests to use Node's native HTTP server and introduces a cache-busting mechanism in the Docker build process for Mixpanel tokens. Feedback was provided regarding the sequential nature of event processing, which might impact throughput.
| // leaves the event eligible for retry on the next drain pass. | ||
| // The await keeps the loop sequential, so there's no in-batch | ||
| // race where the same id could be dispatched twice. | ||
| await options.writeEvent(event, id); |
There was a problem hiding this comment.
The await on options.writeEvent inside the loop makes the drain process sequential. If writeEvent is slow or experiences transient failures, this could significantly delay the processing of subsequent events in the batch. Consider if parallel processing or a more robust retry mechanism is needed if throughput is a concern.
There was a problem hiding this comment.
The sequential await is intentional and load-bearing here, not a perf oversight:
- SSE ordering: the only caller (
webhooks.ts) writes throughstream.writeSSEto a Hono SSE response. Clients reconnect withLast-Event-Idand rely on monotonically-increasing IDs — parallel writes would interleave events from a single batch and break consumer cursor semantics. - Same-batch dedupe: we just (this PR, prior reviewer suggestion) moved
options.seen.add(id)to afterwriteEventso a thrown writer leaves the event retryable. That move is only safe because the loop is sequential — underPromise.alltwo events with the same id could both pass theseen.hascheck before either finishes writing. - Cursor advancement:
lastObservedReceivedAtand the cohort fallback rely on the in-order traversal to decide when to bumpcursor.sinceMs.
Throughput is bounded by the SSE channel's natural ordering anyway, and live drain pages are 500 events at a time. If a particular writeEvent is slow that's a backpressure signal we want to honor (the SSE consumer is overwhelmed), not paper over with concurrency.
There was a problem hiding this comment.
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
packages/kit/server/api/v1/webhooks.ts (1)
641-653:⚠️ Potential issue | 🟠 Major | ⚡ Quick winDon't swallow live-write failures inside
writeEvent.
stream.writeSSE(...).catch(...)turns a transport failure into a resolvedwriteEvent, sodrainWebhookEventBatches()still callsseen.add(id)and the undelivered event is never retried. Let the promise reject after logging so the new helper semantics actually preserve retryability.🛠️ Minimal change
writeEvent: async (event, id) => { - await stream - .writeSSE({ - id, - event: - typeof event.type === "string" - ? event.type - : "WebhookEvent", - data: JSON.stringify(redactWebhookEventForStream(event)), - }) - .catch((err) => { - console.error("[webhooks/stream] live write failed", err); - }); + try { + await stream.writeSSE({ + id, + event: + typeof event.type === "string" + ? event.type + : "WebhookEvent", + data: JSON.stringify(redactWebhookEventForStream(event)), + }); + } catch (err) { + console.error("[webhooks/stream] live write failed", err); + throw err; + } },🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@packages/kit/server/api/v1/webhooks.ts` around lines 641 - 653, The writeEvent helper currently swallows transport errors by attaching .catch(...) to stream.writeSSE, causing drainWebhookEventBatches() to mark events as seen (seen.add(id)) even when delivery failed; change writeEvent (the async function writeEvent in webhooks stream writer) to let stream.writeSSE rethrow after logging so failures propagate: await stream.writeSSE(...) without a terminal .catch, or catch the error, call console.error("[webhooks/stream] live write failed", err) and then rethrow the error so drainWebhookEventBatches() can detect the rejection and avoid calling seen.add(id) and allow retries.
🧹 Nitpick comments (2)
packages/kit/server/api/v1/webhookStreamDrain.ts (1)
16-42: ⚡ Quick winPrefer interfaces for the new exported object contracts.
WebhookStreamEventand the new callback state payloads are exported object shapes. Converting them to namedinterfaces would keep this helper API consistent with the repo's TypeScript convention.As per coding guidelines,
**/*.{ts,tsx}: Prefer interface for defining object shapes in TypeScript.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@packages/kit/server/api/v1/webhookStreamDrain.ts` around lines 16 - 42, Convert the exported object "type" aliases to exported "interface"s: change WebhookStreamEvent to an exported interface, and change DrainWebhookEventBatchesOptions from a type to an exported interface while preserving all property names and signatures (including loadBatch, seen, writeEvent, isAborted, limit, maxIterations, initialCursor, and the optional callbacks). Also replace the inline anonymous state payloads used in onIterationLimit and onSaturatedCohortFallback with named exported interfaces (e.g., WebhookStreamIterationState and WebhookStreamSaturatedCohortState) and reference those names in the DrainWebhookEventBatchesOptions callbacks so the shapes are declared as interfaces consistently.packages/gql/src/webhook-client.sse.test.ts (1)
119-122: ⚡ Quick winUse a named interface for the server handle.
The inline
Promise<{ port: number; close(): Promise<void> }>return type is another object-shape declaration. A smallinterfacekeeps this test helper aligned with the repo's TypeScript convention.♻️ Minimal refactor
+interface SseTestServer { + port: number; + close(): Promise<void>; +} + -function startSseServer(frame: string): Promise<{ - port: number; - close(): Promise<void>; -}> { +function startSseServer(frame: string): Promise<SseTestServer> {As per coding guidelines,
**/*.{ts,tsx}: Prefer interface for defining object shapes in TypeScript.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@packages/gql/src/webhook-client.sse.test.ts` around lines 119 - 122, Replace the inline return shape on startSseServer with a named interface: define an interface (e.g., SseServerHandle) with properties port: number and close(): Promise<void>, then update the function signature of startSseServer to return Promise<SseServerHandle> and adjust any imports/exports if needed so the test helper follows the project's TypeScript convention.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@packages/kit/server/api/v1/webhookStreamDrain.ts`:
- Around line 90-109: The saturated-cohort fallback is too broad: change the
predicate from just lastObservedReceivedAt === cursor.sinceMs to also require
evidence the previous page was truly saturated (e.g. the prior query returned at
least limit rows or was otherwise truncated). Add a boolean flag (e.g.
lastPageWasFull or lastPageTruncated) that you set when processing the previous
fetch (set true when results.length >= limit or when you detect truncation), and
update the check to: lastObservedReceivedAt === cursor.sinceMs &&
lastPageWasFull before triggering options.onSaturatedCohortFallback and bumping
cursor.sinceMs; clear/reset that flag when you advance normally. Also add a
regression test that simulates a full page ending at the cursor millisecond but
not truncated and asserts the fallback does NOT advance sinceMs.
---
Outside diff comments:
In `@packages/kit/server/api/v1/webhooks.ts`:
- Around line 641-653: The writeEvent helper currently swallows transport errors
by attaching .catch(...) to stream.writeSSE, causing drainWebhookEventBatches()
to mark events as seen (seen.add(id)) even when delivery failed; change
writeEvent (the async function writeEvent in webhooks stream writer) to let
stream.writeSSE rethrow after logging so failures propagate: await
stream.writeSSE(...) without a terminal .catch, or catch the error, call
console.error("[webhooks/stream] live write failed", err) and then rethrow the
error so drainWebhookEventBatches() can detect the rejection and avoid calling
seen.add(id) and allow retries.
---
Nitpick comments:
In `@packages/gql/src/webhook-client.sse.test.ts`:
- Around line 119-122: Replace the inline return shape on startSseServer with a
named interface: define an interface (e.g., SseServerHandle) with properties
port: number and close(): Promise<void>, then update the function signature of
startSseServer to return Promise<SseServerHandle> and adjust any imports/exports
if needed so the test helper follows the project's TypeScript convention.
In `@packages/kit/server/api/v1/webhookStreamDrain.ts`:
- Around line 16-42: Convert the exported object "type" aliases to exported
"interface"s: change WebhookStreamEvent to an exported interface, and change
DrainWebhookEventBatchesOptions from a type to an exported interface while
preserving all property names and signatures (including loadBatch, seen,
writeEvent, isAborted, limit, maxIterations, initialCursor, and the optional
callbacks). Also replace the inline anonymous state payloads used in
onIterationLimit and onSaturatedCohortFallback with named exported interfaces
(e.g., WebhookStreamIterationState and WebhookStreamSaturatedCohortState) and
reference those names in the DrainWebhookEventBatchesOptions callbacks so the
shapes are declared as interfaces consistently.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: ae93446e-ce2b-4fac-8f15-bbb7a186be88
⛔ Files ignored due to path filters (1)
bun.lockis excluded by!**/*.lock
📒 Files selected for processing (6)
packages/gql/package.jsonpackages/gql/src/webhook-client.sse.test.tspackages/kit/Dockerfilepackages/kit/server/api/v1/webhookStreamDrain.test.tspackages/kit/server/api/v1/webhookStreamDrain.tspackages/kit/server/api/v1/webhooks.ts
- webhookStreamDrain: gate the saturated-cohort fallback on the prior page being a true same-receivedAt cohort (`batch.length === limit`, every event sharing one receivedAt, that receivedAt matching `cursor.sinceMs`). A mixed full page that merely ends at the cursor ms is no longer evidence of saturation, so we don't risk skipping a late-arriving same-ms event the next normal query would have picked up. Add a regression test asserting the fallback does NOT fire on a mixed full page ending at the cursor ms. - launch.json: prepend `npx --yes kill-port 3000 5173 || true` and install a `trap 'kill 0' INT TERM EXIT` for the Kit Dev config. Without it, stopping the debug session left bun's :3000 server alive across runs and the next launch crashed with EADDRINUSE. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Summary
drainWebhookEventBatchesinto a standalone helper (packages/kit/server/api/v1/webhookStreamDrain.ts) with unit tests; the SSE live-drain loop inwebhooks.tsnow delegates to it, preserving the same advance/abort semantics so cohort and iteration-limit edges are testable in isolation.by_project_and_received_and_creationConvex index. Convex auto-appends_creationTimeto every index, soby_project_and_receivedalready serves bothwebhookEventsSinceandlatestWebhookEventsSincequeries.connectWebhookStreaminpackages/gqlusingBun.serveplus afetch-basedEventSourceshim.VITE_KIT_MIXPANEL_TOKENas a BuildKit secret instead of anARG/ENVpair (Dockerfile,deploy-kit.yml,scripts/deploy-prod.sh) to silence Docker's TOKEN-named warning while still inlining the public token into the SPA bundle.actions/checkout@v4→v6,docker/setup-buildx-action@v3→v4, anddocker/build-push-action@v6→v7across CI workflows.Test plan
packages/kitlint, typecheck, tests (337 passed), smoke server probe — passed locally via pre-push gatepackages/gqlwebhook-client.sse.test.tsruns against a real Bun-served SSE endpoint — 1 pass / 3 expect() callsdeploy-kit.ymldeploy step passes Mixpanel token via--build-secret; Dockerfile mount usesrequired=falseso the PR-level build (which does not pass the secret) still succeedsactions/checkout@,setup-buildx-action@,build-push-action@shows only@v6,@v4,@v7)🤖 Generated with Claude Code
Summary by CodeRabbit
Bug Fixes / Reliability
Tests
Chores