From 92cf9eb6979b9711b36c8b14b2f4d6b282c8af23 Mon Sep 17 00:00:00 2001 From: hyochan Date: Mon, 4 May 2026 07:53:35 +0900 Subject: [PATCH 1/6] chore(kit): drain helper extraction + ci pins + buildkit secret - Extract drainWebhookEventBatches helper with unit tests; webhooks.ts now delegates the live SSE drain loop with the same advance/abort semantics so the cohort + iteration-limit edges are testable. - Drop the redundant by_project_and_received_and_creation Convex index (Convex auto-appends _creationTime, so by_project_and_received already serves both webhookEventsSince and latestWebhookEventsSince). - Add real-HTTP SSE integration test for connectWebhookStream using a Bun.serve + fetch-based EventSource shim. - Pass VITE_KIT_MIXPANEL_TOKEN as a BuildKit secret instead of an ARG/ENV pair (Dockerfile + deploy-kit.yml + deploy-prod.sh) to silence Docker's TOKEN-named warning while still inlining the public token. - Bump actions/checkout v4 -> v6, docker/setup-buildx v3 -> v4, and docker/build-push v6 -> v7 across CI workflows. Co-Authored-By: Claude Opus 4.7 (1M context) --- .claude/guides/09-kit-package.md | 30 +-- .github/workflows/ci-expo-iap.yml | 4 +- .github/workflows/ci-flutter-iap.yml | 2 +- .github/workflows/ci-godot-iap.yml | 2 +- .github/workflows/ci-kmp-iap.yml | 2 +- .github/workflows/ci-react-native-iap.yml | 4 +- .github/workflows/ci.yml | 12 +- .github/workflows/dependabot-bun-lockfile.yml | 2 +- .github/workflows/deploy-kit.yml | 18 +- .github/workflows/publish-flutter.yml | 2 +- .github/workflows/release-apple.yml | 4 +- .github/workflows/release-expo.yml | 6 +- .github/workflows/release-flutter.yml | 6 +- .github/workflows/release-godot.yml | 6 +- .github/workflows/release-google.yml | 4 +- .github/workflows/release-kmp.yml | 6 +- .github/workflows/release-react-native.yml | 6 +- .github/workflows/release.yml | 2 +- packages/gql/src/webhook-client.sse.test.ts | 182 ++++++++++++++++++ packages/kit/Dockerfile | 12 +- packages/kit/README.md | 17 +- packages/kit/convex/_generated/api.d.ts | 2 + packages/kit/convex/schema.ts | 20 +- packages/kit/convex/webhooks/query.ts | 8 +- packages/kit/scripts/deploy-prod.sh | 10 +- .../server/api/v1/webhookStreamDrain.test.ts | 120 ++++++++++++ .../kit/server/api/v1/webhookStreamDrain.ts | 119 ++++++++++++ packages/kit/server/api/v1/webhooks.ts | 78 +++----- 28 files changed, 545 insertions(+), 141 deletions(-) create mode 100644 packages/gql/src/webhook-client.sse.test.ts create mode 100644 packages/kit/server/api/v1/webhookStreamDrain.test.ts create mode 100644 packages/kit/server/api/v1/webhookStreamDrain.ts diff --git a/.claude/guides/09-kit-package.md b/.claude/guides/09-kit-package.md index 2b52ccfd..b4f20aba 100644 --- a/.claude/guides/09-kit-package.md +++ b/.claude/guides/09-kit-package.md @@ -6,14 +6,14 @@ Hosted receipt-validation SaaS at [kit.openiap.dev](https://kit.openiap.dev) — `packages/kit` is the only package in this monorepo that is **a deployable application, not a publishable library**. Treat it differently: -| Aspect | apple/google/gql/docs | kit | -| ------------------- | --------------------------- | -------------------------------------------------- | -| Output | Library / static site | Running SaaS (Fly.io machine) | -| Type SSOT | `packages/gql` (GraphQL IR) | Independent Convex schema | -| Deploy trigger | Tagged release | `main` push (paths-filtered to `packages/kit/**`) | +| Aspect | apple/google/gql/docs | kit | +| ------------------- | --------------------------- | ------------------------------------------------------ | +| Output | Library / static site | Running SaaS (Fly.io machine) | +| Type SSOT | `packages/gql` (GraphQL IR) | Independent Convex schema | +| Deploy trigger | Tagged release | `main` push (paths-filtered to `packages/kit/**`) | | GQL type-sync chain | Yes | **No** — kit does not consume `@hyodotdev/openiap-gql` | -| `private: true` | Mixed | Yes — never publish to npm | -| User-facing brand | `openiap-*` | `IAPKit` (managed by OpenIAP) | +| `private: true` | Mixed | Yes — never publish to npm | +| User-facing brand | `openiap-*` | `IAPKit` (managed by OpenIAP) | ## Internal Layout @@ -60,13 +60,13 @@ If the hook fails, fix the underlying issue and re-stage; never bypass with `--n GitHub Actions secrets (set on `hyodotdev/openiap`): -| Secret | Real secret? | Purpose | -| ------------------------- | ------------ | --------------------------------------------------------- | -| `KIT_FLY_API_TOKEN` | ✅ yes | `flyctl deploy` auth — keep private | -| `KIT_CONVEX_DEPLOY_KEY` | ✅ yes | Convex function deploy (optional — step skips if absent) | -| `VITE_KIT_CONVEX_URL` | ⚠️ public | Build arg for SPA — visible in deployed JS bundle | -| `VITE_KIT_SENTRY_DSN` | ⚠️ public | Build arg for SPA (optional) | -| `VITE_KIT_MIXPANEL_TOKEN` | ⚠️ public | Build arg for SPA (optional, analytics opt-in) | +| Secret | Real secret? | Purpose | +| ------------------------- | ------------ | -------------------------------------------------------- | +| `KIT_FLY_API_TOKEN` | ✅ yes | `flyctl deploy` auth — keep private | +| `KIT_CONVEX_DEPLOY_KEY` | ✅ yes | Convex function deploy (optional — step skips if absent) | +| `VITE_KIT_CONVEX_URL` | ⚠️ public | Build arg for SPA — visible in deployed JS bundle | +| `VITE_KIT_SENTRY_DSN` | ⚠️ public | Build arg for SPA (optional) | +| `VITE_KIT_MIXPANEL_TOKEN` | ⚠️ public | BuildKit secret for SPA (optional, analytics opt-in) | `VITE_KIT_*` values are inlined into the SPA bundle at `bun run build`; treat them as public configuration, not secrets. @@ -83,6 +83,6 @@ Monorepo `.vscode/launch.json` has a single kit entry: **🧰 Kit: Dev (Vite + H ## When You Touch Kit - Stay paths-aware. The deploy workflow only fires on `packages/kit/**` changes. -- Add new env vars to `.env.example` first (template), then `.env.local` (dev) and `.env.production` (manual prod fallback). For `VITE_KIT_*` vars, also update the Dockerfile ARG/ENV pair, the `Deploy` step in `deploy-kit.yml`, and the GitHub secrets. +- Add new env vars to `.env.example` first (template), then `.env.local` (dev) and `.env.production` (manual prod fallback). For `VITE_KIT_*` vars, also update the Docker build-time injection in `Dockerfile`, the `Deploy` step in `deploy-kit.yml`, and the GitHub secrets. Use BuildKit secrets for TOKEN-named public SPA values to avoid Docker secret-name warnings. - For server-runtime-only secrets (Stripe / Resend / GitHub OAuth), use the Convex dashboard, not these files. - Keep dashboard text English-only. Inline string literals; do not reintroduce i18next. diff --git a/.github/workflows/ci-expo-iap.yml b/.github/workflows/ci-expo-iap.yml index 67f6b7fa..aa30aa22 100644 --- a/.github/workflows/ci-expo-iap.yml +++ b/.github/workflows/ci-expo-iap.yml @@ -26,7 +26,7 @@ jobs: runs-on: ubuntu-latest timeout-minutes: 10 steps: - - uses: actions/checkout@v4 + - uses: actions/checkout@v6 with: fetch-depth: 1 @@ -51,7 +51,7 @@ jobs: runs-on: ubuntu-latest timeout-minutes: 15 steps: - - uses: actions/checkout@v4 + - uses: actions/checkout@v6 with: fetch-depth: 1 diff --git a/.github/workflows/ci-flutter-iap.yml b/.github/workflows/ci-flutter-iap.yml index 44c6cbff..5911a41b 100644 --- a/.github/workflows/ci-flutter-iap.yml +++ b/.github/workflows/ci-flutter-iap.yml @@ -26,7 +26,7 @@ jobs: runs-on: ubuntu-latest timeout-minutes: 15 steps: - - uses: actions/checkout@v4 + - uses: actions/checkout@v6 with: fetch-depth: 1 diff --git a/.github/workflows/ci-godot-iap.yml b/.github/workflows/ci-godot-iap.yml index c2e796ce..7ebfa5d8 100644 --- a/.github/workflows/ci-godot-iap.yml +++ b/.github/workflows/ci-godot-iap.yml @@ -26,7 +26,7 @@ jobs: runs-on: ubuntu-latest timeout-minutes: 5 steps: - - uses: actions/checkout@v4 + - uses: actions/checkout@v6 with: fetch-depth: 1 diff --git a/.github/workflows/ci-kmp-iap.yml b/.github/workflows/ci-kmp-iap.yml index c39fbcb8..c17d67cb 100644 --- a/.github/workflows/ci-kmp-iap.yml +++ b/.github/workflows/ci-kmp-iap.yml @@ -24,7 +24,7 @@ jobs: runs-on: ubuntu-latest timeout-minutes: 15 steps: - - uses: actions/checkout@v4 + - uses: actions/checkout@v6 with: fetch-depth: 1 diff --git a/.github/workflows/ci-react-native-iap.yml b/.github/workflows/ci-react-native-iap.yml index 7081d898..d367592e 100644 --- a/.github/workflows/ci-react-native-iap.yml +++ b/.github/workflows/ci-react-native-iap.yml @@ -26,7 +26,7 @@ jobs: runs-on: ubuntu-latest timeout-minutes: 10 steps: - - uses: actions/checkout@v4 + - uses: actions/checkout@v6 with: fetch-depth: 1 @@ -55,7 +55,7 @@ jobs: runs-on: ubuntu-latest timeout-minutes: 15 steps: - - uses: actions/checkout@v4 + - uses: actions/checkout@v6 with: fetch-depth: 1 diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 0ec81959..58752e09 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -21,7 +21,7 @@ jobs: agent: ${{ steps.filter.outputs.agent }} steps: - name: Checkout - uses: actions/checkout@v4 + uses: actions/checkout@v6 - name: Check for changes uses: dorny/paths-filter@v4 @@ -59,7 +59,7 @@ jobs: if: needs.changes.outputs.gql == 'true' steps: - name: Checkout - uses: actions/checkout@v4 + uses: actions/checkout@v6 - name: Setup Bun uses: oven-sh/setup-bun@v2 @@ -118,7 +118,7 @@ jobs: if: needs.changes.outputs.android == 'true' steps: - name: Checkout - uses: actions/checkout@v4 + uses: actions/checkout@v6 - name: Setup Java uses: actions/setup-java@v5 @@ -168,7 +168,7 @@ jobs: if: needs.changes.outputs.ios == 'true' steps: - name: Checkout - uses: actions/checkout@v4 + uses: actions/checkout@v6 - name: Setup Bun uses: oven-sh/setup-bun@v2 @@ -204,7 +204,7 @@ jobs: if: needs.changes.outputs.docs == 'true' steps: - name: Checkout - uses: actions/checkout@v4 + uses: actions/checkout@v6 - name: Setup Bun uses: oven-sh/setup-bun@v2 @@ -244,7 +244,7 @@ jobs: if: needs.changes.outputs.agent == 'true' steps: - name: Checkout - uses: actions/checkout@v4 + uses: actions/checkout@v6 - name: Setup Bun uses: oven-sh/setup-bun@v2 diff --git a/.github/workflows/dependabot-bun-lockfile.yml b/.github/workflows/dependabot-bun-lockfile.yml index 12134cd6..887618e5 100644 --- a/.github/workflows/dependabot-bun-lockfile.yml +++ b/.github/workflows/dependabot-bun-lockfile.yml @@ -23,7 +23,7 @@ jobs: runs-on: ubuntu-latest steps: - name: Checkout PR branch - uses: actions/checkout@v4 + uses: actions/checkout@v6 with: ref: ${{ github.event.pull_request.head.ref }} token: ${{ secrets.GITHUB_TOKEN }} diff --git a/.github/workflows/deploy-kit.yml b/.github/workflows/deploy-kit.yml index 50de8425..86cf25b5 100644 --- a/.github/workflows/deploy-kit.yml +++ b/.github/workflows/deploy-kit.yml @@ -65,14 +65,14 @@ jobs: run: ./scripts/smoke-server.sh - name: Set up Docker Buildx - uses: docker/setup-buildx-action@v3 + uses: docker/setup-buildx-action@v4 - name: Docker build (PR-level Dockerfile check) # The Deploy job below also builds the Docker image, but only on # push-to-main. Building here on PRs catches Dockerfile/Bun-image # incompatibilities before merge (see PR #119, where bun 1.3.13 # changed --filter hoisting and broke the per-package COPY). - uses: docker/build-push-action@v6 + uses: docker/build-push-action@v7 with: context: ${{ github.workspace }} file: packages/kit/Dockerfile @@ -127,12 +127,20 @@ jobs: # VITE_KIT_SENTRY_DSN / VITE_KIT_MIXPANEL_TOKEN are optional — # if unset, the SPA skips the corresponding analytics init at # runtime rather than failing the build. + # Mixpanel is passed as a BuildKit secret, not an ARG/ENV pair, + # so Docker's secret-name check does not flag a public SPA token. + BUILD_FLAGS=( + --build-arg "VITE_KIT_CONVEX_URL=$VITE_KIT_CONVEX_URL" + --build-arg "VITE_KIT_SENTRY_DSN=$VITE_KIT_SENTRY_DSN" + ) + if [ -n "$VITE_KIT_MIXPANEL_TOKEN" ]; then + BUILD_FLAGS+=(--build-secret "VITE_KIT_MIXPANEL_TOKEN=$VITE_KIT_MIXPANEL_TOKEN") + fi + flyctl deploy --remote-only \ --config packages/kit/fly.toml \ --dockerfile packages/kit/Dockerfile \ - --build-arg VITE_KIT_CONVEX_URL="$VITE_KIT_CONVEX_URL" \ - --build-arg VITE_KIT_SENTRY_DSN="$VITE_KIT_SENTRY_DSN" \ - --build-arg VITE_KIT_MIXPANEL_TOKEN="$VITE_KIT_MIXPANEL_TOKEN" + "${BUILD_FLAGS[@]}" - name: Setup Bun (for convex deploy) if: ${{ env.KIT_CONVEX_DEPLOY_KEY != '' }} diff --git a/.github/workflows/publish-flutter.yml b/.github/workflows/publish-flutter.yml index e61c00f3..39549029 100644 --- a/.github/workflows/publish-flutter.yml +++ b/.github/workflows/publish-flutter.yml @@ -18,7 +18,7 @@ jobs: working-directory: libraries/flutter_inapp_purchase steps: - name: Checkout - uses: actions/checkout@v4 + uses: actions/checkout@v6 - name: Setup Flutter uses: subosito/flutter-action@v2 diff --git a/.github/workflows/release-apple.yml b/.github/workflows/release-apple.yml index 27a2b063..ee494211 100644 --- a/.github/workflows/release-apple.yml +++ b/.github/workflows/release-apple.yml @@ -46,7 +46,7 @@ jobs: timeout-minutes: 30 steps: - name: Checkout - uses: actions/checkout@v4 + uses: actions/checkout@v6 - name: Select Xcode run: | @@ -66,7 +66,7 @@ jobs: steps: - name: Checkout - uses: actions/checkout@v4 + uses: actions/checkout@v6 with: fetch-depth: 0 diff --git a/.github/workflows/release-expo.yml b/.github/workflows/release-expo.yml index 3c6593be..27530f59 100644 --- a/.github/workflows/release-expo.yml +++ b/.github/workflows/release-expo.yml @@ -44,7 +44,7 @@ jobs: working-directory: libraries/expo-iap steps: - name: Checkout - uses: actions/checkout@v4 + uses: actions/checkout@v6 - name: Setup Node.js uses: actions/setup-node@v6 @@ -90,7 +90,7 @@ jobs: working-directory: libraries/expo-iap steps: - name: Checkout - uses: actions/checkout@v4 + uses: actions/checkout@v6 - name: Setup Node.js uses: actions/setup-node@v6 @@ -142,7 +142,7 @@ jobs: working-directory: libraries/expo-iap steps: - name: Checkout - uses: actions/checkout@v4 + uses: actions/checkout@v6 with: fetch-depth: 0 diff --git a/.github/workflows/release-flutter.yml b/.github/workflows/release-flutter.yml index b6767143..a59196ce 100644 --- a/.github/workflows/release-flutter.yml +++ b/.github/workflows/release-flutter.yml @@ -43,7 +43,7 @@ jobs: working-directory: libraries/flutter_inapp_purchase steps: - name: Checkout - uses: actions/checkout@v4 + uses: actions/checkout@v6 - name: Setup Flutter uses: subosito/flutter-action@v2 @@ -85,7 +85,7 @@ jobs: working-directory: libraries/flutter_inapp_purchase steps: - name: Checkout - uses: actions/checkout@v4 + uses: actions/checkout@v6 - name: Setup Flutter uses: subosito/flutter-action@v2 @@ -134,7 +134,7 @@ jobs: working-directory: libraries/flutter_inapp_purchase steps: - name: Checkout - uses: actions/checkout@v4 + uses: actions/checkout@v6 with: fetch-depth: 0 token: ${{ secrets.DEPENDENCY_UPDATE_PAT }} diff --git a/.github/workflows/release-godot.yml b/.github/workflows/release-godot.yml index 20c1414b..94192b7a 100644 --- a/.github/workflows/release-godot.yml +++ b/.github/workflows/release-godot.yml @@ -41,7 +41,7 @@ jobs: working-directory: libraries/godot-iap steps: - name: Checkout - uses: actions/checkout@v4 + uses: actions/checkout@v6 - name: Setup Java uses: actions/setup-java@v5 @@ -69,7 +69,7 @@ jobs: working-directory: libraries/godot-iap steps: - name: Checkout - uses: actions/checkout@v4 + uses: actions/checkout@v6 - name: Verify iOS Frameworks run: | @@ -93,7 +93,7 @@ jobs: working-directory: libraries/godot-iap steps: - name: Checkout - uses: actions/checkout@v4 + uses: actions/checkout@v6 with: fetch-depth: 0 diff --git a/.github/workflows/release-google.yml b/.github/workflows/release-google.yml index 552c74d4..fed06f06 100644 --- a/.github/workflows/release-google.yml +++ b/.github/workflows/release-google.yml @@ -35,7 +35,7 @@ jobs: timeout-minutes: 20 steps: - name: Checkout - uses: actions/checkout@v4 + uses: actions/checkout@v6 - name: Set up JDK 17 uses: actions/setup-java@v5 @@ -64,7 +64,7 @@ jobs: steps: - name: Checkout - uses: actions/checkout@v4 + uses: actions/checkout@v6 with: fetch-depth: 0 diff --git a/.github/workflows/release-kmp.yml b/.github/workflows/release-kmp.yml index 5b48e328..6eeb412a 100644 --- a/.github/workflows/release-kmp.yml +++ b/.github/workflows/release-kmp.yml @@ -43,7 +43,7 @@ jobs: run: working-directory: libraries/kmp-iap steps: - - uses: actions/checkout@v4 + - uses: actions/checkout@v6 - name: Set up JDK 17 uses: actions/setup-java@v5 @@ -67,7 +67,7 @@ jobs: run: working-directory: libraries/kmp-iap steps: - - uses: actions/checkout@v4 + - uses: actions/checkout@v6 - name: Set up JDK 17 uses: actions/setup-java@v5 @@ -102,7 +102,7 @@ jobs: run: working-directory: libraries/kmp-iap steps: - - uses: actions/checkout@v4 + - uses: actions/checkout@v6 with: fetch-depth: 0 diff --git a/.github/workflows/release-react-native.yml b/.github/workflows/release-react-native.yml index 528db98d..6f361078 100644 --- a/.github/workflows/release-react-native.yml +++ b/.github/workflows/release-react-native.yml @@ -44,7 +44,7 @@ jobs: working-directory: libraries/react-native-iap steps: - name: Checkout - uses: actions/checkout@v4 + uses: actions/checkout@v6 - name: Setup Node.js uses: actions/setup-node@v6 @@ -83,7 +83,7 @@ jobs: working-directory: libraries/react-native-iap steps: - name: Checkout - uses: actions/checkout@v4 + uses: actions/checkout@v6 - name: Setup Node.js uses: actions/setup-node@v6 @@ -140,7 +140,7 @@ jobs: working-directory: libraries/react-native-iap steps: - name: Checkout - uses: actions/checkout@v4 + uses: actions/checkout@v6 with: fetch-depth: 0 diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index f6c1f50a..a905f5ef 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -22,7 +22,7 @@ jobs: runs-on: ubuntu-latest steps: - name: Checkout repository - uses: actions/checkout@v4 + uses: actions/checkout@v6 with: fetch-depth: 0 diff --git a/packages/gql/src/webhook-client.sse.test.ts b/packages/gql/src/webhook-client.sse.test.ts new file mode 100644 index 00000000..a77eac94 --- /dev/null +++ b/packages/gql/src/webhook-client.sse.test.ts @@ -0,0 +1,182 @@ +import { describe, expect, it } from "vitest"; + +import { + connectWebhookStream, + type WebhookEventPayload, + type WebhookEventStream, +} from "./webhook-client"; + +const validEvent: WebhookEventPayload = { + id: "uuid-typed-sse", + type: "SubscriptionRenewed", + source: "AppleAppStoreServerNotificationsV2", + platform: "IOS", + environment: "Production", + projectId: "project-1", + occurredAt: 1_711_000_000_000, + receivedAt: 1_711_000_001_000, + purchaseToken: "token-typed-sse", + productId: "com.example.premium", + subscriptionState: "Active", +}; + +class FetchEventSource implements WebhookEventStream { + onmessage: ((event: { data: string; lastEventId?: string }) => void) | null = + null; + onerror: ((error: unknown) => void) | null = null; + + private readonly controller = new AbortController(); + private readonly listeners = new Map< + string, + Array<(event: { data: string; lastEventId?: string }) => void> + >(); + private closed = false; + + constructor(url: string) { + void this.read(url); + } + + addEventListener( + type: string, + listener: (event: { data: string; lastEventId?: string }) => void, + ) { + const listeners = this.listeners.get(type) ?? []; + listeners.push(listener); + this.listeners.set(type, listeners); + } + + close() { + this.closed = true; + this.controller.abort(); + } + + private async read(url: string) { + try { + const response = await fetch(url, { signal: this.controller.signal }); + if (!response.body) { + throw new Error("SSE response has no body"); + } + + const reader = response.body.getReader(); + const decoder = new TextDecoder(); + let buffer = ""; + while (!this.closed) { + const { value, done } = await reader.read(); + if (done) break; + buffer += decoder.decode(value, { stream: true }); + let separator = buffer.indexOf("\n\n"); + while (separator !== -1) { + const frame = buffer.slice(0, separator); + buffer = buffer.slice(separator + 2); + this.dispatch(frame); + separator = buffer.indexOf("\n\n"); + } + } + } catch (error) { + if (!this.closed) { + this.onerror?.(error); + } + } + } + + private dispatch(frame: string) { + if (!frame.trim()) return; + + let eventType = "message"; + let lastEventId: string | undefined; + const data: string[] = []; + + for (const rawLine of frame.split(/\r?\n/)) { + if (!rawLine || rawLine.startsWith(":")) continue; + const colonIndex = rawLine.indexOf(":"); + const field = colonIndex === -1 ? rawLine : rawLine.slice(0, colonIndex); + let value = colonIndex === -1 ? "" : rawLine.slice(colonIndex + 1); + if (value.startsWith(" ")) { + value = value.slice(1); + } + + if (field === "event") { + eventType = value || "message"; + } else if (field === "id") { + lastEventId = value; + } else if (field === "data") { + data.push(value); + } + } + + const event = { data: data.join("\n"), lastEventId }; + if (eventType === "message") { + this.onmessage?.(event); + } + for (const listener of this.listeners.get(eventType) ?? []) { + listener(event); + } + } +} + +describe("connectWebhookStream SSE integration", () => { + it("receives typed SSE frames from an actual HTTP stream", async () => { + const encoder = new TextEncoder(); + const server = Bun.serve({ + port: 0, + fetch: () => + new Response( + new ReadableStream({ + start(controller) { + controller.enqueue( + encoder.encode( + [ + "id: uuid-typed-sse", + "event: SubscriptionRenewed", + `data: ${JSON.stringify(validEvent)}`, + "", + "", + ].join("\n"), + ), + ); + controller.close(); + }, + }), + { + headers: { + "content-type": "text/event-stream", + "cache-control": "no-cache", + }, + }, + ), + }); + + let listener: { close(): void } | null = null; + try { + const event = await new Promise( + (resolve, reject) => { + const timeout = setTimeout( + () => reject(new Error("Timed out waiting for SSE event")), + 2_000, + ); + + listener = connectWebhookStream({ + apiKey: "test-key", + baseUrl: `http://127.0.0.1:${server.port}`, + eventSourceFactory: (url) => new FetchEventSource(url), + onEvent: (received) => { + clearTimeout(timeout); + resolve(received); + }, + onError: (error) => { + clearTimeout(timeout); + reject(error); + }, + }); + }, + ); + + expect(event.id).toBe("uuid-typed-sse"); + expect(event.type).toBe("SubscriptionRenewed"); + expect(event.purchaseToken).toBe("token-typed-sse"); + } finally { + listener?.close(); + server.stop(true); + } + }); +}); diff --git a/packages/kit/Dockerfile b/packages/kit/Dockerfile index 7069242d..bbfa53c1 100644 --- a/packages/kit/Dockerfile +++ b/packages/kit/Dockerfile @@ -1,3 +1,5 @@ +# syntax=docker/dockerfile:1.7 + FROM oven/bun:1.3.13-slim AS base # Build context is the monorepo root (see deploy-kit.yml + scripts/deploy-prod.sh). @@ -44,11 +46,11 @@ ENV VITE_KIT_CONVEX_URL=${VITE_KIT_CONVEX_URL} # a build arg instead of committing to the repo so it rotates cleanly. ARG VITE_KIT_SENTRY_DSN ENV VITE_KIT_SENTRY_DSN=${VITE_KIT_SENTRY_DSN} -# Mixpanel project token — also public by design (exposed in the SPA -# bundle to identify users client-side). Empty value disables init. -ARG VITE_KIT_MIXPANEL_TOKEN -ENV VITE_KIT_MIXPANEL_TOKEN=${VITE_KIT_MIXPANEL_TOKEN} -RUN bun run build:all +# Mixpanel project token is public by design and baked into the SPA when set, +# but pass it via BuildKit secret so Docker does not flag TOKEN-named ARG/ENV. +RUN --mount=type=secret,id=VITE_KIT_MIXPANEL_TOKEN,required=false \ + VITE_KIT_MIXPANEL_TOKEN="$(cat /run/secrets/VITE_KIT_MIXPANEL_TOKEN 2>/dev/null || true)" \ + bun run build:all # --- Runtime image (single binary + static assets) ----------------------- FROM debian:bookworm-slim AS runtime diff --git a/packages/kit/README.md b/packages/kit/README.md index 095556a9..a910824b 100644 --- a/packages/kit/README.md +++ b/packages/kit/README.md @@ -209,17 +209,16 @@ SMOKE_PORT=3200 ./scripts/smoke-server.sh ```bash flyctl auth login -# Source your production env first; do not paste the Convex URL here. -source .env.production -flyctl secrets set VITE_KIT_CONVEX_URL="$VITE_KIT_CONVEX_URL" -flyctl deploy \ - --build-arg VITE_KIT_CONVEX_URL="$VITE_KIT_CONVEX_URL" \ - --build-arg VITE_KIT_SENTRY_DSN="$VITE_KIT_SENTRY_DSN" # optional +# Fill .env.production from .env.example first. +bun run deploy:prod ``` -`VITE_*` values have to be passed as **build args**, not just runtime +`VITE_*` values have to be passed at **build time**, not just runtime secrets — Vite inlines them into the SPA bundle at `bun run build` -time. Omitting `VITE_KIT_SENTRY_DSN` is fine; the SPA skips Sentry init. +time. The deploy script sends `VITE_KIT_CONVEX_URL` and +`VITE_KIT_SENTRY_DSN` as build args, and sends `VITE_KIT_MIXPANEL_TOKEN` +as a BuildKit secret only to avoid Docker's TOKEN-named ARG/ENV warning. +Omitting Sentry or Mixpanel is fine; the SPA skips those integrations. Server-side runtime secrets (read by the compiled Bun binary at boot) are set once with `flyctl secrets set`: @@ -250,7 +249,7 @@ namespace them away from other monorepo secrets): | `KIT_CONVEX_DEPLOY_KEY` | ✅ yes | Convex function deploy (optional — step skips if absent) | | `VITE_KIT_CONVEX_URL` | ⚠️ public | Build arg for SPA — visible in deployed JS bundle | | `VITE_KIT_SENTRY_DSN` | ⚠️ public | Build arg for SPA (optional — SPA skips init if absent) | -| `VITE_KIT_MIXPANEL_TOKEN` | ⚠️ public | Build arg for SPA (optional — analytics opt-in) | +| `VITE_KIT_MIXPANEL_TOKEN` | ⚠️ public | BuildKit secret for SPA (optional — analytics opt-in) | If `KIT_CONVEX_DEPLOY_KEY` is **not** set, the Convex deploy step prints a skip message and exits 0 — you'll need to run diff --git a/packages/kit/convex/_generated/api.d.ts b/packages/kit/convex/_generated/api.d.ts index a92cd711..30f95d3d 100644 --- a/packages/kit/convex/_generated/api.d.ts +++ b/packages/kit/convex/_generated/api.d.ts @@ -58,6 +58,7 @@ import type * as subscriptions_internal from "../subscriptions/internal.js"; import type * as subscriptions_monthlyMicros from "../subscriptions/monthlyMicros.js"; import type * as subscriptions_mutation from "../subscriptions/mutation.js"; import type * as subscriptions_query from "../subscriptions/query.js"; +import type * as subscriptions_selectLatest from "../subscriptions/selectLatest.js"; import type * as subscriptions_stateMachine from "../subscriptions/stateMachine.js"; import type * as subscriptions_stats from "../subscriptions/stats.js"; import type * as userProfiles_action from "../userProfiles/action.js"; @@ -134,6 +135,7 @@ declare const fullApi: ApiFromModules<{ "subscriptions/monthlyMicros": typeof subscriptions_monthlyMicros; "subscriptions/mutation": typeof subscriptions_mutation; "subscriptions/query": typeof subscriptions_query; + "subscriptions/selectLatest": typeof subscriptions_selectLatest; "subscriptions/stateMachine": typeof subscriptions_stateMachine; "subscriptions/stats": typeof subscriptions_stats; "userProfiles/action": typeof userProfiles_action; diff --git a/packages/kit/convex/schema.ts b/packages/kit/convex/schema.ts index 3a42ba44..f576aaa4 100644 --- a/packages/kit/convex/schema.ts +++ b/packages/kit/convex/schema.ts @@ -515,6 +515,14 @@ const schema = defineSchema({ }) .index("by_project", ["projectId"]) .index("by_purchase_token", ["purchaseToken"]) + // (projectId, receivedAt, _creationTime) — Convex appends + // `_creationTime` automatically. Used by the SSE backfill + // `webhookEventsSince` query so the boundary-cohort tail past the + // millisecond cursor can be walked directly via + // `gt("_creationTime", afterCreationTime)` instead of an in-memory + // filter that would silently drop pages when a single + // millisecond's burst exceeds the take() cap (PR #124 + // (https://github.com/hyodotdev/openiap/pull/124) review). .index("by_project_and_received", ["projectId", "receivedAt"]) .index("by_received_at", ["receivedAt"]) // Lookup helper used by the SSE stream's `Last-Event-ID` cursor @@ -525,18 +533,6 @@ const schema = defineSchema({ .index("by_project_and_notification_id", [ "projectId", "sourceNotificationId", - ]) - // Composite (projectId, receivedAt, _creationTime) for the SSE - // backfill `webhookEventsSince` query — lets the boundary-cohort - // tail past the millisecond cursor be walked directly via the - // index (`gt("_creationTime", afterCreationTime)`) instead of an - // in-memory filter that would silently drop pages when a single - // millisecond's burst exceeds the take() cap (PR #124 - // (https://github.com/hyodotdev/openiap/pull/124) review). - .index("by_project_and_received_and_creation", [ - "projectId", - "receivedAt", - "_creationTime", ]), // Dedup table for webhook payloads. Insertion uses diff --git a/packages/kit/convex/webhooks/query.ts b/packages/kit/convex/webhooks/query.ts index 2b6be531..76842881 100644 --- a/packages/kit/convex/webhooks/query.ts +++ b/packages/kit/convex/webhooks/query.ts @@ -170,7 +170,7 @@ export const webhookEventsSince = query({ if (args.afterCreationTime !== undefined) { const boundaryTail = await ctx.db .query("webhookEvents") - .withIndex("by_project_and_received_and_creation", (q) => + .withIndex("by_project_and_received", (q) => q .eq("projectId", project._id) .eq("receivedAt", args.sinceMs) @@ -183,7 +183,7 @@ export const webhookEventsSince = query({ if (events.length < limit) { const postBoundary = await ctx.db .query("webhookEvents") - .withIndex("by_project_and_received_and_creation", (q) => + .withIndex("by_project_and_received", (q) => q.eq("projectId", project._id).gt("receivedAt", args.sinceMs), ) .order("asc") @@ -197,7 +197,7 @@ export const webhookEventsSince = query({ // matches up to the limit. const boundary = await ctx.db .query("webhookEvents") - .withIndex("by_project_and_received_and_creation", (q) => + .withIndex("by_project_and_received", (q) => q.eq("projectId", project._id).eq("receivedAt", args.sinceMs), ) .order("asc") @@ -234,7 +234,7 @@ export const latestWebhookEventsSince = query({ const limit = Math.min(Math.max(args.limit ?? 100, 1), 500); const latest = await ctx.db .query("webhookEvents") - .withIndex("by_project_and_received_and_creation", (q) => + .withIndex("by_project_and_received", (q) => q.eq("projectId", project._id).gte("receivedAt", args.sinceMs), ) .order("desc") diff --git a/packages/kit/scripts/deploy-prod.sh b/packages/kit/scripts/deploy-prod.sh index e30950e8..f28b3800 100755 --- a/packages/kit/scripts/deploy-prod.sh +++ b/packages/kit/scripts/deploy-prod.sh @@ -56,16 +56,18 @@ esac echo "Deploying to Fly with VITE_KIT_CONVEX_URL=$VITE_KIT_CONVEX_URL" -BUILD_ARGS=(--build-arg "VITE_KIT_CONVEX_URL=$VITE_KIT_CONVEX_URL") +BUILD_FLAGS=(--build-arg "VITE_KIT_CONVEX_URL=$VITE_KIT_CONVEX_URL") if [ -n "${VITE_KIT_SENTRY_DSN:-}" ]; then - BUILD_ARGS+=(--build-arg "VITE_KIT_SENTRY_DSN=$VITE_KIT_SENTRY_DSN") + BUILD_FLAGS+=(--build-arg "VITE_KIT_SENTRY_DSN=$VITE_KIT_SENTRY_DSN") fi if [ -n "${VITE_KIT_MIXPANEL_TOKEN:-}" ]; then - BUILD_ARGS+=(--build-arg "VITE_KIT_MIXPANEL_TOKEN=$VITE_KIT_MIXPANEL_TOKEN") + # Public SPA config, passed as a BuildKit secret to avoid Docker's + # TOKEN-named ARG/ENV warning while still baking it into the bundle. + BUILD_FLAGS+=(--build-secret "VITE_KIT_MIXPANEL_TOKEN=$VITE_KIT_MIXPANEL_TOKEN") fi cd "$REPO_ROOT" exec flyctl deploy --app openiap-kit \ --config packages/kit/fly.toml \ --dockerfile packages/kit/Dockerfile \ - "${BUILD_ARGS[@]}" "$@" + "${BUILD_FLAGS[@]}" "$@" diff --git a/packages/kit/server/api/v1/webhookStreamDrain.test.ts b/packages/kit/server/api/v1/webhookStreamDrain.test.ts new file mode 100644 index 00000000..f7156111 --- /dev/null +++ b/packages/kit/server/api/v1/webhookStreamDrain.test.ts @@ -0,0 +1,120 @@ +import { describe, expect, it } from "vitest"; + +import { drainWebhookEventBatches } from "./webhookStreamDrain"; + +type EventRow = { + id: string; + receivedAt: number; + _creationTime: number; +}; + +function makeSeen(initialIds: string[] = []) { + const seen = new Set(initialIds); + return { + has: (id: string) => seen.has(id), + add: (id: string) => { + seen.add(id); + }, + }; +} + +function makePagedLoader(events: EventRow[]) { + return async ({ + sinceMs, + afterCreationTime, + limit, + }: { + sinceMs: number; + afterCreationTime?: number; + limit: number; + }) => + events + .filter((event) => { + if (event.receivedAt > sinceMs) return true; + if (event.receivedAt !== sinceMs) return false; + return ( + afterCreationTime === undefined || + event._creationTime > afterCreationTime + ); + }) + .slice(0, limit); +} + +describe("drainWebhookEventBatches", () => { + it("walks beyond the first 500-event page with a moving cursor", async () => { + const events = Array.from({ length: 1_201 }, (_, index) => ({ + id: `event-${index}`, + receivedAt: 1_000 + index + 1, + _creationTime: 10_000 + index, + })); + const delivered: string[] = []; + + const result = await drainWebhookEventBatches({ + initialCursor: { sinceMs: 1_000 }, + maxIterations: 10, + loadBatch: makePagedLoader(events), + seen: makeSeen(), + writeEvent: async (_event, id) => { + delivered.push(id); + }, + }); + + expect(delivered).toHaveLength(1_201); + expect(delivered.at(0)).toBe("event-0"); + expect(delivered.at(-1)).toBe("event-1200"); + expect(result.cursor).toEqual({ + sinceMs: 2_201, + afterCreationTime: 11_200, + }); + expect(result.hitIterationLimit).toBe(false); + }); + + it("advances through a same-receivedAt cohort larger than one page", async () => { + const events = Array.from({ length: 1_001 }, (_, index) => ({ + id: `same-ms-${index}`, + receivedAt: 2_000, + _creationTime: index + 1, + })); + const delivered: string[] = []; + + const result = await drainWebhookEventBatches({ + initialCursor: { sinceMs: 2_000, afterCreationTime: 0 }, + maxIterations: 10, + loadBatch: makePagedLoader(events), + seen: makeSeen(), + writeEvent: async (_event, id) => { + delivered.push(id); + }, + }); + + expect(delivered).toHaveLength(1_001); + expect(result.cursor).toEqual({ + sinceMs: 2_000, + afterCreationTime: 1_001, + }); + }); + + it("advances the cursor even when duplicate ids are skipped", async () => { + const events = [ + { id: "already-sent", receivedAt: 2_001, _creationTime: 1 }, + { id: "new-event", receivedAt: 2_002, _creationTime: 2 }, + ]; + const delivered: string[] = []; + + const result = await drainWebhookEventBatches({ + initialCursor: { sinceMs: 2_000 }, + maxIterations: 10, + loadBatch: makePagedLoader(events), + seen: makeSeen(["already-sent"]), + writeEvent: async (_event, id) => { + delivered.push(id); + }, + }); + + expect(delivered).toEqual(["new-event"]); + expect(result.cursor).toEqual({ + sinceMs: 2_002, + afterCreationTime: 2, + }); + }); +}); diff --git a/packages/kit/server/api/v1/webhookStreamDrain.ts b/packages/kit/server/api/v1/webhookStreamDrain.ts new file mode 100644 index 00000000..f28c35bb --- /dev/null +++ b/packages/kit/server/api/v1/webhookStreamDrain.ts @@ -0,0 +1,119 @@ +export type WebhookStreamCursor = { + sinceMs: number; + afterCreationTime?: number; +}; + +export type WebhookStreamSeenSet = { + has(id: string): boolean; + add(id: string): void; +}; + +export type DrainWebhookEventBatchesOptions = { + initialCursor: WebhookStreamCursor; + limit?: number; + maxIterations: number; + isAborted?: () => boolean; + loadBatch: ( + cursor: WebhookStreamCursor & { limit: number }, + ) => Promise>>; + seen: WebhookStreamSeenSet; + writeEvent: (event: Record, id: string) => Promise; + onIterationLimit?: (state: { + iterations: number; + cursor: WebhookStreamCursor; + }) => void; +}; + +export type DrainWebhookEventBatchesResult = { + cursor: WebhookStreamCursor; + delivered: number; + iterations: number; + hitIterationLimit: boolean; +}; + +export async function drainWebhookEventBatches( + options: DrainWebhookEventBatchesOptions, +): Promise { + const limit = options.limit ?? 500; + const cursor: WebhookStreamCursor = { + sinceMs: options.initialCursor.sinceMs, + afterCreationTime: options.initialCursor.afterCreationTime, + }; + let delivered = 0; + let iterations = 0; + let hitIterationLimit = false; + + while (!options.isAborted?.()) { + if (iterations >= options.maxIterations) { + hitIterationLimit = true; + options.onIterationLimit?.({ iterations, cursor: { ...cursor } }); + break; + } + iterations += 1; + + const batch = await options.loadBatch({ ...cursor, limit }); + if (!batch.length) { + break; + } + + let advanced = false; + for (const event of batch) { + if (options.isAborted?.()) break; + + if (advanceCursor(cursor, event)) { + advanced = true; + } + + const id = typeof event.id === "string" ? event.id : null; + if (!id || options.seen.has(id)) { + continue; + } + options.seen.add(id); + await options.writeEvent(event, id); + delivered += 1; + } + + if (!advanced || batch.length < limit) { + break; + } + } + + return { + cursor, + delivered, + iterations, + hitIterationLimit, + }; +} + +function advanceCursor( + cursor: WebhookStreamCursor, + event: Record, +): boolean { + const receivedAt = + typeof event.receivedAt === "number" ? event.receivedAt : null; + if (receivedAt === null) { + return false; + } + + const creationTime = + typeof event._creationTime === "number" ? event._creationTime : undefined; + + if (receivedAt > cursor.sinceMs) { + cursor.sinceMs = receivedAt; + cursor.afterCreationTime = creationTime; + return true; + } + + if ( + receivedAt === cursor.sinceMs && + creationTime !== undefined && + (cursor.afterCreationTime === undefined || + creationTime > cursor.afterCreationTime) + ) { + cursor.afterCreationTime = creationTime; + return true; + } + + return false; +} diff --git a/packages/kit/server/api/v1/webhooks.ts b/packages/kit/server/api/v1/webhooks.ts index ae39243b..8f81ab70 100644 --- a/packages/kit/server/api/v1/webhooks.ts +++ b/packages/kit/server/api/v1/webhooks.ts @@ -6,6 +6,7 @@ import { ConvexClient } from "convex/browser"; import { api } from "@/convex"; import { client, convexUrlForRealtime, handleConvexError } from "../../convex"; +import { drainWebhookEventBatches } from "./webhookStreamDrain"; // Shared reactive client for the SSE webhook stream. We keep a // SINGLE WebSocket open to Convex regardless of how many SDK clients @@ -622,54 +623,22 @@ webhooks.get("/stream/:apiKey", async (c) => { try { do { liveDrainRequested = false; - let iterations = 0; - while (!aborted) { - if (iterations >= DRAIN_MAX_ITERATIONS) { - console.warn( - "[webhooks/stream] live drain hit DRAIN_MAX_ITERATIONS", - { iterations, liveCursor }, - ); - break; - } - iterations += 1; - const batch = (await client.query( - api.webhooks.query.webhookEventsSince, - { + const result = await drainWebhookEventBatches({ + initialCursor: { + sinceMs: liveCursor, + afterCreationTime: liveCreationCursor, + }, + maxIterations: DRAIN_MAX_ITERATIONS, + isAborted: () => aborted, + loadBatch: async ({ sinceMs, afterCreationTime, limit }) => + await client.query(api.webhooks.query.webhookEventsSince, { apiKey, - sinceMs: liveCursor, - afterCreationTime: liveCreationCursor, - limit: 500, - }, - )) as Array>; - if (!batch.length) { - break; - } - - let advanced = false; - for (const event of batch) { - if (aborted) break; - const receivedAt = - typeof event.receivedAt === "number" ? event.receivedAt : null; - const creationTime = - typeof event._creationTime === "number" - ? event._creationTime - : undefined; - if ( - receivedAt !== null && - (receivedAt > liveCursor || - (receivedAt === liveCursor && - creationTime !== undefined && - (liveCreationCursor === undefined || - creationTime > liveCreationCursor))) - ) { - liveCursor = receivedAt; - liveCreationCursor = creationTime; - advanced = true; - } - - const id = typeof event.id === "string" ? event.id : null; - if (!id || seen.has(id)) continue; - seen.add(id); + sinceMs, + afterCreationTime, + limit, + }), + seen, + writeEvent: async (event, id) => { await stream .writeSSE({ id, @@ -682,11 +651,16 @@ webhooks.get("/stream/:apiKey", async (c) => { .catch((err) => { console.error("[webhooks/stream] live write failed", err); }); - } - if (!advanced || batch.length < 500) { - break; - } - } + }, + onIterationLimit: ({ iterations, cursor }) => { + console.warn( + "[webhooks/stream] live drain hit DRAIN_MAX_ITERATIONS", + { iterations, liveCursor: cursor.sinceMs }, + ); + }, + }); + liveCursor = result.cursor.sinceMs; + liveCreationCursor = result.cursor.afterCreationTime; } while (liveDrainRequested && !aborted); } catch (error) { console.error("[webhooks/stream] live drain failed", error); From 267f0edb24df6366868e6916b59d2a4d630ae6c6 Mon Sep 17 00:00:00 2001 From: hyochan Date: Mon, 4 May 2026 17:45:20 +0900 Subject: [PATCH 2/6] fix(kit): address PR review (Mixpanel cache, gql tests in CI, drain fallback) - Add VITE_KIT_MIXPANEL_TOKEN_HASH cache-bust ARG so a token rotation invalidates the cached `bun run build:all` layer (BuildKit secret values are not part of the layer cache key, so without it GHA / Fly's remote builder would reuse the prior layer and ship a stale token). - Wire packages/gql vitest tests into CI: add a `test` script and a Run tests step in the test-gql job. Existing webhook-client tests weren't being exercised on PRs. - Port the saturated-cohort fallback from the original Phase 1 drain into drainWebhookEventBatches so the helper makes forward progress even when an underlying query truncates a same-millisecond cohort past its take() cap. Add a unit test covering that scenario. Co-Authored-By: Claude Opus 4.7 (1M context) --- .github/workflows/ci.yml | 4 ++ .github/workflows/deploy-kit.yml | 5 ++ packages/gql/package.json | 3 +- packages/kit/Dockerfile | 5 ++ packages/kit/scripts/deploy-prod.sh | 5 ++ .../server/api/v1/webhookStreamDrain.test.ts | 64 +++++++++++++++++++ .../kit/server/api/v1/webhookStreamDrain.ts | 21 ++++++ 7 files changed, 106 insertions(+), 1 deletion(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 58752e09..0041f0c3 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -88,6 +88,10 @@ jobs: test -f src/generated/Types.swift || exit 1 test -f src/generated/types.dart || exit 1 + - name: Run tests + working-directory: packages/gql + run: bun run test + - name: Verify generated types are committed (no drift) run: | # `bun run generate` is expected to be idempotent: running it diff --git a/.github/workflows/deploy-kit.yml b/.github/workflows/deploy-kit.yml index 86cf25b5..f5be2694 100644 --- a/.github/workflows/deploy-kit.yml +++ b/.github/workflows/deploy-kit.yml @@ -129,11 +129,16 @@ jobs: # runtime rather than failing the build. # Mixpanel is passed as a BuildKit secret, not an ARG/ENV pair, # so Docker's secret-name check does not flag a public SPA token. + # A short hash of the token is also passed as a build arg purely to + # bust the layer cache when the token rotates (BuildKit secret values + # are not part of the cache key). BUILD_FLAGS=( --build-arg "VITE_KIT_CONVEX_URL=$VITE_KIT_CONVEX_URL" --build-arg "VITE_KIT_SENTRY_DSN=$VITE_KIT_SENTRY_DSN" ) if [ -n "$VITE_KIT_MIXPANEL_TOKEN" ]; then + MIXPANEL_TOKEN_HASH=$(printf '%s' "$VITE_KIT_MIXPANEL_TOKEN" | sha256sum | cut -c1-16) + BUILD_FLAGS+=(--build-arg "VITE_KIT_MIXPANEL_TOKEN_HASH=$MIXPANEL_TOKEN_HASH") BUILD_FLAGS+=(--build-secret "VITE_KIT_MIXPANEL_TOKEN=$VITE_KIT_MIXPANEL_TOKEN") fi diff --git a/packages/gql/package.json b/packages/gql/package.json index 7dd62db6..41085dab 100644 --- a/packages/gql/package.json +++ b/packages/gql/package.json @@ -18,7 +18,8 @@ "generate:kotlin": "bun codegen/index.ts kotlin", "generate:dart": "bun codegen/index.ts dart", "generate:gdscript": "bun codegen/index.ts gdscript", - "sync": "bun scripts/sync-to-platforms.mjs" + "sync": "bun scripts/sync-to-platforms.mjs", + "test": "bun test src" }, "keywords": [ "graphql", diff --git a/packages/kit/Dockerfile b/packages/kit/Dockerfile index bbfa53c1..da149886 100644 --- a/packages/kit/Dockerfile +++ b/packages/kit/Dockerfile @@ -48,6 +48,11 @@ ARG VITE_KIT_SENTRY_DSN ENV VITE_KIT_SENTRY_DSN=${VITE_KIT_SENTRY_DSN} # Mixpanel project token is public by design and baked into the SPA when set, # but pass it via BuildKit secret so Docker does not flag TOKEN-named ARG/ENV. +# BuildKit secret values are not part of the layer cache key, so a non-secret +# fingerprint of the token is passed as a build arg purely to bust the +# `bun run build:all` cache when the token rotates — without it, GHA / Fly's +# remote builder would happily reuse the prior layer and ship a stale token. +ARG VITE_KIT_MIXPANEL_TOKEN_HASH= RUN --mount=type=secret,id=VITE_KIT_MIXPANEL_TOKEN,required=false \ VITE_KIT_MIXPANEL_TOKEN="$(cat /run/secrets/VITE_KIT_MIXPANEL_TOKEN 2>/dev/null || true)" \ bun run build:all diff --git a/packages/kit/scripts/deploy-prod.sh b/packages/kit/scripts/deploy-prod.sh index f28b3800..b87e14e8 100755 --- a/packages/kit/scripts/deploy-prod.sh +++ b/packages/kit/scripts/deploy-prod.sh @@ -63,6 +63,11 @@ fi if [ -n "${VITE_KIT_MIXPANEL_TOKEN:-}" ]; then # Public SPA config, passed as a BuildKit secret to avoid Docker's # TOKEN-named ARG/ENV warning while still baking it into the bundle. + # The hash arg is purely a cache buster — BuildKit secret values do + # not participate in the layer cache key, so without it a token + # rotation would happily reuse the prior `bun run build:all` layer. + MIXPANEL_TOKEN_HASH=$(printf '%s' "$VITE_KIT_MIXPANEL_TOKEN" | shasum -a 256 | cut -c1-16) + BUILD_FLAGS+=(--build-arg "VITE_KIT_MIXPANEL_TOKEN_HASH=$MIXPANEL_TOKEN_HASH") BUILD_FLAGS+=(--build-secret "VITE_KIT_MIXPANEL_TOKEN=$VITE_KIT_MIXPANEL_TOKEN") fi diff --git a/packages/kit/server/api/v1/webhookStreamDrain.test.ts b/packages/kit/server/api/v1/webhookStreamDrain.test.ts index f7156111..b3ed4d51 100644 --- a/packages/kit/server/api/v1/webhookStreamDrain.test.ts +++ b/packages/kit/server/api/v1/webhookStreamDrain.test.ts @@ -94,6 +94,70 @@ describe("drainWebhookEventBatches", () => { }); }); + it("steps past a saturated millisecond cohort hidden by the query cap", async () => { + // Simulate the original webhookEventsSince bug shape: a same- + // millisecond burst that fills exactly `limit` rows so the helper + // keeps looping, then the next query at (sinceMs, afterCreationTime) + // lies and returns [] (mirroring an in-memory filter that drops + // the rest of the cohort past the take() cap). Without the + // saturated-cohort fallback the helper would declare drain + // complete and miss the post-cohort event. + const limit = 5; + const cohort = Array.from({ length: limit }, (_, index) => ({ + id: `cohort-${index}`, + receivedAt: 5_000, + _creationTime: index + 1, + })); + const postCohort = { + id: "post-cohort", + receivedAt: 5_001, + _creationTime: 100, + }; + const loadBatch = async ({ + sinceMs, + afterCreationTime, + }: { + sinceMs: number; + afterCreationTime?: number; + limit: number; + }) => { + if (sinceMs === 5_000 && afterCreationTime === undefined) { + return cohort; + } + if (sinceMs === 5_000 && afterCreationTime !== undefined) { + // The buggy underlying query: claims the cohort is exhausted + // even though we've only walked a take()-capped slice. + return []; + } + if (sinceMs >= 5_001) { + return [postCohort]; + } + return []; + }; + + const delivered: string[] = []; + const result = await drainWebhookEventBatches({ + initialCursor: { sinceMs: 5_000 }, + limit, + maxIterations: 10, + loadBatch, + seen: makeSeen(), + writeEvent: async (_event, id) => { + delivered.push(id); + }, + }); + + expect(delivered).toEqual([ + "cohort-0", + "cohort-1", + "cohort-2", + "cohort-3", + "cohort-4", + "post-cohort", + ]); + expect(result.cursor.sinceMs).toBe(5_001); + }); + it("advances the cursor even when duplicate ids are skipped", async () => { const events = [ { id: "already-sent", receivedAt: 2_001, _creationTime: 1 }, diff --git a/packages/kit/server/api/v1/webhookStreamDrain.ts b/packages/kit/server/api/v1/webhookStreamDrain.ts index f28c35bb..7f1fcc62 100644 --- a/packages/kit/server/api/v1/webhookStreamDrain.ts +++ b/packages/kit/server/api/v1/webhookStreamDrain.ts @@ -42,6 +42,12 @@ export async function drainWebhookEventBatches( let delivered = 0; let iterations = 0; let hitIterationLimit = false; + // Tracks whether the previous iteration delivered events stuck at + // `cursor.sinceMs`. Used by the saturated-cohort fallback below so a + // single millisecond's burst that exceeds the underlying query's row + // cap can still make forward progress instead of declaring drain + // complete on the next empty batch. + let lastDeliveredReceivedAt: number | null = null; while (!options.isAborted?.()) { if (iterations >= options.maxIterations) { @@ -53,6 +59,18 @@ export async function drainWebhookEventBatches( const batch = await options.loadBatch({ ...cursor, limit }); if (!batch.length) { + // Saturated-cohort fallback: if the previous iteration delivered + // events at this millisecond and the next query came back empty, + // the underlying query may have hidden the rest of that cohort + // behind its own row cap (e.g. boundaryTail.take(limit) returning + // a partial slice of a same-ms burst). Advance past the + // millisecond and try once more before declaring drain complete. + if (lastDeliveredReceivedAt === cursor.sinceMs) { + cursor.sinceMs += 1; + cursor.afterCreationTime = undefined; + lastDeliveredReceivedAt = null; + continue; + } break; } @@ -71,6 +89,9 @@ export async function drainWebhookEventBatches( options.seen.add(id); await options.writeEvent(event, id); delivered += 1; + if (typeof event.receivedAt === "number") { + lastDeliveredReceivedAt = event.receivedAt; + } } if (!advanced || batch.length < limit) { From 2af6471c1d83f483b459c7f1b0d7adf75d885479 Mon Sep 17 00:00:00 2001 From: hyochan Date: Mon, 4 May 2026 17:57:58 +0900 Subject: [PATCH 3/6] fix(kit): tighten drain helper event type + portable sha256 in deploy-prod - Replace `Record` in drainWebhookEventBatches with a WebhookStreamEvent type that names the fields the helper actually reads (`id`, `receivedAt`, `_creationTime`) while still allowing pass-through fields the SSE writer needs (`type`, payload bodies). - deploy-prod.sh: prefer `sha256sum` (most Linux) and fall back to `shasum -a 256` (macOS); error out with a clear message when neither is on PATH so a deployer doesn't ship without the cache- bust arg. Co-Authored-By: Claude Opus 4.7 (1M context) --- packages/kit/scripts/deploy-prod.sh | 9 ++++++++- .../kit/server/api/v1/webhookStreamDrain.ts | 18 +++++++++++++++--- 2 files changed, 23 insertions(+), 4 deletions(-) diff --git a/packages/kit/scripts/deploy-prod.sh b/packages/kit/scripts/deploy-prod.sh index b87e14e8..168f711b 100755 --- a/packages/kit/scripts/deploy-prod.sh +++ b/packages/kit/scripts/deploy-prod.sh @@ -66,7 +66,14 @@ if [ -n "${VITE_KIT_MIXPANEL_TOKEN:-}" ]; then # The hash arg is purely a cache buster — BuildKit secret values do # not participate in the layer cache key, so without it a token # rotation would happily reuse the prior `bun run build:all` layer. - MIXPANEL_TOKEN_HASH=$(printf '%s' "$VITE_KIT_MIXPANEL_TOKEN" | shasum -a 256 | cut -c1-16) + if command -v sha256sum >/dev/null 2>&1; then + MIXPANEL_TOKEN_HASH=$(printf '%s' "$VITE_KIT_MIXPANEL_TOKEN" | sha256sum | cut -c1-16) + elif command -v shasum >/dev/null 2>&1; then + MIXPANEL_TOKEN_HASH=$(printf '%s' "$VITE_KIT_MIXPANEL_TOKEN" | shasum -a 256 | cut -c1-16) + else + echo "error: need sha256sum (most Linux) or shasum (macOS) on PATH to hash the Mixpanel token." >&2 + exit 1 + fi BUILD_FLAGS+=(--build-arg "VITE_KIT_MIXPANEL_TOKEN_HASH=$MIXPANEL_TOKEN_HASH") BUILD_FLAGS+=(--build-secret "VITE_KIT_MIXPANEL_TOKEN=$VITE_KIT_MIXPANEL_TOKEN") fi diff --git a/packages/kit/server/api/v1/webhookStreamDrain.ts b/packages/kit/server/api/v1/webhookStreamDrain.ts index 7f1fcc62..52fbaa24 100644 --- a/packages/kit/server/api/v1/webhookStreamDrain.ts +++ b/packages/kit/server/api/v1/webhookStreamDrain.ts @@ -8,6 +8,18 @@ export type WebhookStreamSeenSet = { add(id: string): void; }; +// Minimal contract the helper needs from a webhook event row. The +// caller may pass through additional fields (the SSE writer reads +// `type`, the dashboard reads payload bodies etc.); the index +// signature lets those flow through without forcing every callsite +// to widen back to `Record`. +export type WebhookStreamEvent = { + id?: unknown; + receivedAt?: unknown; + _creationTime?: unknown; + [key: string]: unknown; +}; + export type DrainWebhookEventBatchesOptions = { initialCursor: WebhookStreamCursor; limit?: number; @@ -15,9 +27,9 @@ export type DrainWebhookEventBatchesOptions = { isAborted?: () => boolean; loadBatch: ( cursor: WebhookStreamCursor & { limit: number }, - ) => Promise>>; + ) => Promise; seen: WebhookStreamSeenSet; - writeEvent: (event: Record, id: string) => Promise; + writeEvent: (event: WebhookStreamEvent, id: string) => Promise; onIterationLimit?: (state: { iterations: number; cursor: WebhookStreamCursor; @@ -109,7 +121,7 @@ export async function drainWebhookEventBatches( function advanceCursor( cursor: WebhookStreamCursor, - event: Record, + event: WebhookStreamEvent, ): boolean { const receivedAt = typeof event.receivedAt === "number" ? event.receivedAt : null; From 19af666f240e923d2187a2ff517e7132f16fb711 Mon Sep 17 00:00:00 2001 From: hyochan Date: Mon, 4 May 2026 23:30:09 +0900 Subject: [PATCH 4/6] fix(kit): address round-4 review (cache key, fallback gating, retry, vitest) - Dockerfile: expand VITE_KIT_MIXPANEL_TOKEN_HASH inside the RUN command so BuildKit folds it into the layer cache key. An unused ARG is ignored by the cache, which silently undid the rotation-busting effect. - webhookStreamDrain: gate the saturated-cohort fallback on lastObservedReceivedAt instead of lastDeliveredReceivedAt so a full same-millisecond page of dedup'd events still advances past the cohort. - webhookStreamDrain: add the event id to `seen` only after writeEvent succeeds. A thrown writer leaves the event eligible for retry on the next drain pass; the loop is sequential so no in-batch race. - Add tests for both behaviors (dedup'd cohort fallback, write-failure retryability). - packages/gql: switch the test script from `bun test src` to `vitest run src` and add vitest as a devDep so the SSE integration test runs under the same runner as the rest of the monorepo. - Rewrite the SSE integration test to use Node's `http.createServer` instead of `Bun.serve` so it runs under vitest on the Node pool. Co-Authored-By: Claude Opus 4.7 (1M context) --- bun.lock | 1 + packages/gql/package.json | 5 +- packages/gql/src/webhook-client.sse.test.ts | 68 +++++++++-------- packages/kit/Dockerfile | 6 ++ .../server/api/v1/webhookStreamDrain.test.ts | 73 +++++++++++++++++++ .../kit/server/api/v1/webhookStreamDrain.ts | 42 ++++++----- 6 files changed, 145 insertions(+), 50 deletions(-) diff --git a/bun.lock b/bun.lock index 9ea5ed61..78f62bdf 100644 --- a/bun.lock +++ b/bun.lock @@ -73,6 +73,7 @@ "handlebars": "^4.7.8", "ts-node": "^10.9.2", "typescript": "^5.9.2", + "vitest": "^4", }, }, "packages/kit": { diff --git a/packages/gql/package.json b/packages/gql/package.json index 41085dab..e8cb7b15 100644 --- a/packages/gql/package.json +++ b/packages/gql/package.json @@ -19,7 +19,7 @@ "generate:dart": "bun codegen/index.ts dart", "generate:gdscript": "bun codegen/index.ts gdscript", "sync": "bun scripts/sync-to-platforms.mjs", - "test": "bun test src" + "test": "vitest run src" }, "keywords": [ "graphql", @@ -37,7 +37,8 @@ "graphql": "^16.11.0", "handlebars": "^4.7.8", "ts-node": "^10.9.2", - "typescript": "^5.9.2" + "typescript": "^5.9.2", + "vitest": "^4" }, "packageManager": "bun@1.1.0" } diff --git a/packages/gql/src/webhook-client.sse.test.ts b/packages/gql/src/webhook-client.sse.test.ts index a77eac94..a9dab21a 100644 --- a/packages/gql/src/webhook-client.sse.test.ts +++ b/packages/gql/src/webhook-client.sse.test.ts @@ -1,3 +1,5 @@ +import { createServer, type Server } from "node:http"; +import type { AddressInfo } from "node:net"; import { describe, expect, it } from "vitest"; import { @@ -114,37 +116,43 @@ class FetchEventSource implements WebhookEventStream { } } -describe("connectWebhookStream SSE integration", () => { - it("receives typed SSE frames from an actual HTTP stream", async () => { - const encoder = new TextEncoder(); - const server = Bun.serve({ - port: 0, - fetch: () => - new Response( - new ReadableStream({ - start(controller) { - controller.enqueue( - encoder.encode( - [ - "id: uuid-typed-sse", - "event: SubscriptionRenewed", - `data: ${JSON.stringify(validEvent)}`, - "", - "", - ].join("\n"), - ), - ); - controller.close(); - }, +function startSseServer(frame: string): Promise<{ + port: number; + close(): Promise; +}> { + return new Promise((resolve, reject) => { + const server: Server = createServer((_req, res) => { + res.writeHead(200, { + "content-type": "text/event-stream", + "cache-control": "no-cache", + }); + res.write(frame); + res.end(); + }); + server.once("error", reject); + server.listen(0, "127.0.0.1", () => { + const address = server.address() as AddressInfo; + resolve({ + port: address.port, + close: () => + new Promise((closeResolve) => { + server.close(() => closeResolve()); }), - { - headers: { - "content-type": "text/event-stream", - "cache-control": "no-cache", - }, - }, - ), + }); }); + }); +} + +describe("connectWebhookStream SSE integration", () => { + it("receives typed SSE frames from an actual HTTP stream", async () => { + const frame = [ + "id: uuid-typed-sse", + "event: SubscriptionRenewed", + `data: ${JSON.stringify(validEvent)}`, + "", + "", + ].join("\n"); + const server = await startSseServer(frame); let listener: { close(): void } | null = null; try { @@ -176,7 +184,7 @@ describe("connectWebhookStream SSE integration", () => { expect(event.purchaseToken).toBe("token-typed-sse"); } finally { listener?.close(); - server.stop(true); + await server.close(); } }); }); diff --git a/packages/kit/Dockerfile b/packages/kit/Dockerfile index da149886..2949279b 100644 --- a/packages/kit/Dockerfile +++ b/packages/kit/Dockerfile @@ -53,7 +53,13 @@ ENV VITE_KIT_SENTRY_DSN=${VITE_KIT_SENTRY_DSN} # `bun run build:all` cache when the token rotates — without it, GHA / Fly's # remote builder would happily reuse the prior layer and ship a stale token. ARG VITE_KIT_MIXPANEL_TOKEN_HASH= +# VITE_KIT_MIXPANEL_TOKEN_HASH must be expanded inside the RUN command +# string for BuildKit to fold it into the layer cache key — an unused +# ARG does not affect caching, so the rotation-busting effect would +# silently disappear if we only declared it above. The build script +# does not consume the hash; it only exists as a cache fingerprint. RUN --mount=type=secret,id=VITE_KIT_MIXPANEL_TOKEN,required=false \ + VITE_KIT_MIXPANEL_TOKEN_HASH="${VITE_KIT_MIXPANEL_TOKEN_HASH}" \ VITE_KIT_MIXPANEL_TOKEN="$(cat /run/secrets/VITE_KIT_MIXPANEL_TOKEN 2>/dev/null || true)" \ bun run build:all diff --git a/packages/kit/server/api/v1/webhookStreamDrain.test.ts b/packages/kit/server/api/v1/webhookStreamDrain.test.ts index b3ed4d51..7b37cd2f 100644 --- a/packages/kit/server/api/v1/webhookStreamDrain.test.ts +++ b/packages/kit/server/api/v1/webhookStreamDrain.test.ts @@ -181,4 +181,77 @@ describe("drainWebhookEventBatches", () => { afterCreationTime: 2, }); }); + + it("triggers the saturated-cohort fallback even when the page is fully dedup'd", async () => { + // Reviewer-flagged edge: gating the fallback on delivered events + // would let a full same-ms page of duplicates break early instead + // of advancing past the cohort. + const limit = 5; + const cohort = Array.from({ length: limit }, (_, index) => ({ + id: `dedup-${index}`, + receivedAt: 7_000, + _creationTime: index + 1, + })); + const postCohort = { + id: "post-cohort", + receivedAt: 7_001, + _creationTime: 100, + }; + const loadBatch = async ({ + sinceMs, + afterCreationTime, + }: { + sinceMs: number; + afterCreationTime?: number; + limit: number; + }) => { + if (sinceMs === 7_000 && afterCreationTime === undefined) { + return cohort; + } + if (sinceMs === 7_000 && afterCreationTime !== undefined) { + return []; + } + if (sinceMs >= 7_001) { + return [postCohort]; + } + return []; + }; + + const delivered: string[] = []; + const result = await drainWebhookEventBatches({ + initialCursor: { sinceMs: 7_000 }, + limit, + maxIterations: 10, + loadBatch, + seen: makeSeen(cohort.map((event) => event.id)), + writeEvent: async (_event, id) => { + delivered.push(id); + }, + }); + + expect(delivered).toEqual(["post-cohort"]); + expect(result.cursor.sinceMs).toBe(7_001); + }); + + it("leaves a failed event eligible for retry by adding to `seen` only after writeEvent succeeds", async () => { + const events = [{ id: "will-fail", receivedAt: 9_001, _creationTime: 1 }]; + const seen = makeSeen(); + let attempts = 0; + + await expect( + drainWebhookEventBatches({ + initialCursor: { sinceMs: 9_000 }, + maxIterations: 10, + loadBatch: makePagedLoader(events), + seen, + writeEvent: async () => { + attempts += 1; + throw new Error("network blip"); + }, + }), + ).rejects.toThrow("network blip"); + + expect(attempts).toBe(1); + expect(seen.has("will-fail")).toBe(false); + }); }); diff --git a/packages/kit/server/api/v1/webhookStreamDrain.ts b/packages/kit/server/api/v1/webhookStreamDrain.ts index 52fbaa24..b71eb45d 100644 --- a/packages/kit/server/api/v1/webhookStreamDrain.ts +++ b/packages/kit/server/api/v1/webhookStreamDrain.ts @@ -54,12 +54,12 @@ export async function drainWebhookEventBatches( let delivered = 0; let iterations = 0; let hitIterationLimit = false; - // Tracks whether the previous iteration delivered events stuck at - // `cursor.sinceMs`. Used by the saturated-cohort fallback below so a - // single millisecond's burst that exceeds the underlying query's row - // cap can still make forward progress instead of declaring drain - // complete on the next empty batch. - let lastDeliveredReceivedAt: number | null = null; + // Tracks whether the previous iteration *observed* events at + // `cursor.sinceMs` (delivered or deduped). Used by the saturated- + // cohort fallback below so a single millisecond's burst that + // exceeds the underlying query's row cap can still make forward + // progress even when the whole page was already in `seen`. + let lastObservedReceivedAt: number | null = null; while (!options.isAborted?.()) { if (iterations >= options.maxIterations) { @@ -71,16 +71,18 @@ export async function drainWebhookEventBatches( const batch = await options.loadBatch({ ...cursor, limit }); if (!batch.length) { - // Saturated-cohort fallback: if the previous iteration delivered - // events at this millisecond and the next query came back empty, - // the underlying query may have hidden the rest of that cohort - // behind its own row cap (e.g. boundaryTail.take(limit) returning - // a partial slice of a same-ms burst). Advance past the - // millisecond and try once more before declaring drain complete. - if (lastDeliveredReceivedAt === cursor.sinceMs) { + // Saturated-cohort fallback: if the previous iteration saw + // events stuck at this millisecond and the next query came back + // empty, the underlying query may have hidden the rest of that + // cohort behind its own row cap (e.g. boundaryTail.take(limit) + // returning a partial slice of a same-ms burst). Advance past + // the millisecond and try once more before declaring drain + // complete. We gate on observation rather than delivery so a + // full page of dedup'd same-ms events still advances the cursor. + if (lastObservedReceivedAt === cursor.sinceMs) { cursor.sinceMs += 1; cursor.afterCreationTime = undefined; - lastDeliveredReceivedAt = null; + lastObservedReceivedAt = null; continue; } break; @@ -93,17 +95,21 @@ export async function drainWebhookEventBatches( if (advanceCursor(cursor, event)) { advanced = true; } + if (typeof event.receivedAt === "number") { + lastObservedReceivedAt = event.receivedAt; + } const id = typeof event.id === "string" ? event.id : null; if (!id || options.seen.has(id)) { continue; } - options.seen.add(id); + // Add to `seen` *after* writeEvent succeeds so a thrown writer + // leaves the event eligible for retry on the next drain pass. + // The await keeps the loop sequential, so there's no in-batch + // race where the same id could be dispatched twice. await options.writeEvent(event, id); + options.seen.add(id); delivered += 1; - if (typeof event.receivedAt === "number") { - lastDeliveredReceivedAt = event.receivedAt; - } } if (!advanced || batch.length < limit) { From 162dceba32990e7fad1c7f3c0d801369bd34c0ad Mon Sep 17 00:00:00 2001 From: hyochan Date: Tue, 5 May 2026 01:23:51 +0900 Subject: [PATCH 5/6] fix(kit): document drain fallback observability --- .../server/api/v1/webhookStreamDrain.test.ts | 13 +++++++++++ .../kit/server/api/v1/webhookStreamDrain.ts | 22 +++++++++++++++++++ packages/kit/server/api/v1/webhooks.ts | 17 ++++++++++++++ 3 files changed, 52 insertions(+) diff --git a/packages/kit/server/api/v1/webhookStreamDrain.test.ts b/packages/kit/server/api/v1/webhookStreamDrain.test.ts index 7b37cd2f..c8ea8dcd 100644 --- a/packages/kit/server/api/v1/webhookStreamDrain.test.ts +++ b/packages/kit/server/api/v1/webhookStreamDrain.test.ts @@ -136,6 +136,10 @@ describe("drainWebhookEventBatches", () => { }; const delivered: string[] = []; + const fallbacks: Array<{ + cursor: { sinceMs: number; afterCreationTime?: number }; + nextSinceMs: number; + }> = []; const result = await drainWebhookEventBatches({ initialCursor: { sinceMs: 5_000 }, limit, @@ -145,6 +149,9 @@ describe("drainWebhookEventBatches", () => { writeEvent: async (_event, id) => { delivered.push(id); }, + onSaturatedCohortFallback: ({ cursor, nextSinceMs }) => { + fallbacks.push({ cursor, nextSinceMs }); + }, }); expect(delivered).toEqual([ @@ -155,6 +162,12 @@ describe("drainWebhookEventBatches", () => { "cohort-4", "post-cohort", ]); + expect(fallbacks).toEqual([ + { + cursor: { sinceMs: 5_000, afterCreationTime: 5 }, + nextSinceMs: 5_001, + }, + ]); expect(result.cursor.sinceMs).toBe(5_001); }); diff --git a/packages/kit/server/api/v1/webhookStreamDrain.ts b/packages/kit/server/api/v1/webhookStreamDrain.ts index b71eb45d..5801be2b 100644 --- a/packages/kit/server/api/v1/webhookStreamDrain.ts +++ b/packages/kit/server/api/v1/webhookStreamDrain.ts @@ -34,6 +34,12 @@ export type DrainWebhookEventBatchesOptions = { iterations: number; cursor: WebhookStreamCursor; }) => void; + onSaturatedCohortFallback?: (state: { + iterations: number; + cursor: WebhookStreamCursor; + nextSinceMs: number; + limit: number; + }) => void; }; export type DrainWebhookEventBatchesResult = { @@ -43,6 +49,16 @@ export type DrainWebhookEventBatchesResult = { hitIterationLimit: boolean; }; +/** + * Drains webhook event pages using a moving `(receivedAt, _creationTime)` + * cursor. Events are processed sequentially so the caller can preserve SSE + * ordering and backpressure; an event id is added to `seen` only after + * `writeEvent` succeeds, leaving failed writes retryable on the next pass. + * Abort checks stop further work without throwing, and `maxIterations` bounds + * the loop for safety. When the source query appears to truncate a saturated + * same-millisecond cohort, the helper reports the fallback and advances + * `sinceMs` by one millisecond before retrying. + */ export async function drainWebhookEventBatches( options: DrainWebhookEventBatchesOptions, ): Promise { @@ -80,6 +96,12 @@ export async function drainWebhookEventBatches( // complete. We gate on observation rather than delivery so a // full page of dedup'd same-ms events still advances the cursor. if (lastObservedReceivedAt === cursor.sinceMs) { + options.onSaturatedCohortFallback?.({ + iterations, + cursor: { ...cursor }, + nextSinceMs: cursor.sinceMs + 1, + limit, + }); cursor.sinceMs += 1; cursor.afterCreationTime = undefined; lastObservedReceivedAt = null; diff --git a/packages/kit/server/api/v1/webhooks.ts b/packages/kit/server/api/v1/webhooks.ts index 8f81ab70..651a5a5a 100644 --- a/packages/kit/server/api/v1/webhooks.ts +++ b/packages/kit/server/api/v1/webhooks.ts @@ -658,6 +658,23 @@ webhooks.get("/stream/:apiKey", async (c) => { { iterations, liveCursor: cursor.sinceMs }, ); }, + onSaturatedCohortFallback: ({ + iterations, + cursor, + nextSinceMs, + limit, + }) => { + console.warn( + "[webhooks/stream] live drain saturated same-ms cohort fallback", + { + iterations, + limit, + liveCursor: cursor.sinceMs, + afterCreationTime: cursor.afterCreationTime, + nextLiveCursor: nextSinceMs, + }, + ); + }, }); liveCursor = result.cursor.sinceMs; liveCreationCursor = result.cursor.afterCreationTime; From f61f2e0c2488020554550a39e5460f22e0fea8d8 Mon Sep 17 00:00:00 2001 From: hyochan Date: Tue, 5 May 2026 02:13:46 +0900 Subject: [PATCH 6/6] fix(kit): tighten saturated-cohort gate + harden Kit dev launch config - webhookStreamDrain: gate the saturated-cohort fallback on the prior page being a true same-receivedAt cohort (`batch.length === limit`, every event sharing one receivedAt, that receivedAt matching `cursor.sinceMs`). A mixed full page that merely ends at the cursor ms is no longer evidence of saturation, so we don't risk skipping a late-arriving same-ms event the next normal query would have picked up. Add a regression test asserting the fallback does NOT fire on a mixed full page ending at the cursor ms. - launch.json: prepend `npx --yes kill-port 3000 5173 || true` and install a `trap 'kill 0' INT TERM EXIT` for the Kit Dev config. Without it, stopping the debug session left bun's :3000 server alive across runs and the next launch crashed with EADDRINUSE. Co-Authored-By: Claude Opus 4.7 (1M context) --- .vscode/launch.json | 2 +- .../server/api/v1/webhookStreamDrain.test.ts | 36 ++++++++++++++ .../kit/server/api/v1/webhookStreamDrain.ts | 48 +++++++++++++++---- 3 files changed, 76 insertions(+), 10 deletions(-) diff --git a/.vscode/launch.json b/.vscode/launch.json index e383c089..956d99b3 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -130,7 +130,7 @@ "runtimeExecutable": "bash", "runtimeArgs": [ "-lc", - "([ -d ../../node_modules ] || (echo '📦 Installing workspace dependencies...' && cd ../.. && bun install)) && (bun run dev:convex & bun run dev:server & bun run dev; kill 0)" + "npx --yes kill-port 3000 5173 || true && ([ -d ../../node_modules ] || (echo '📦 Installing workspace dependencies...' && cd ../.. && bun install)) && trap 'kill 0' INT TERM EXIT && (bun run dev:convex & bun run dev:server & bun run dev; wait)" ], "cwd": "${workspaceFolder}/packages/kit", "console": "integratedTerminal" diff --git a/packages/kit/server/api/v1/webhookStreamDrain.test.ts b/packages/kit/server/api/v1/webhookStreamDrain.test.ts index c8ea8dcd..2c931c52 100644 --- a/packages/kit/server/api/v1/webhookStreamDrain.test.ts +++ b/packages/kit/server/api/v1/webhookStreamDrain.test.ts @@ -195,6 +195,42 @@ describe("drainWebhookEventBatches", () => { }); }); + it("does NOT bump sinceMs when a full page merely ends at the cursor ms but spans multiple receivedAts", async () => { + // Reviewer-flagged edge (coderabbit): a full page whose tail + // happens to land at the cursor millisecond is NOT a saturated + // cohort — bumping sinceMs there would skip a late-arriving event + // at the same ms that the next normal query would have picked up. + const limit = 5; + const mixedPage = [ + { id: "mix-0", receivedAt: 8_000, _creationTime: 1 }, + { id: "mix-1", receivedAt: 8_001, _creationTime: 2 }, + { id: "mix-2", receivedAt: 8_002, _creationTime: 3 }, + { id: "mix-3", receivedAt: 8_003, _creationTime: 4 }, + { id: "mix-4", receivedAt: 8_003, _creationTime: 5 }, + ]; + let calls = 0; + const loadBatch = async () => { + calls += 1; + return calls === 1 ? mixedPage : []; + }; + + let fallbackFired = false; + const result = await drainWebhookEventBatches({ + initialCursor: { sinceMs: 7_999 }, + limit, + maxIterations: 10, + loadBatch, + seen: makeSeen(), + writeEvent: async () => {}, + onSaturatedCohortFallback: () => { + fallbackFired = true; + }, + }); + + expect(fallbackFired).toBe(false); + expect(result.cursor.sinceMs).toBe(8_003); + }); + it("triggers the saturated-cohort fallback even when the page is fully dedup'd", async () => { // Reviewer-flagged edge: gating the fallback on delivered events // would let a full same-ms page of duplicates break early instead diff --git a/packages/kit/server/api/v1/webhookStreamDrain.ts b/packages/kit/server/api/v1/webhookStreamDrain.ts index 5801be2b..07273d60 100644 --- a/packages/kit/server/api/v1/webhookStreamDrain.ts +++ b/packages/kit/server/api/v1/webhookStreamDrain.ts @@ -76,6 +76,13 @@ export async function drainWebhookEventBatches( // exceeds the underlying query's row cap can still make forward // progress even when the whole page was already in `seen`. let lastObservedReceivedAt: number | null = null; + // Tighter gate for the saturated-cohort fallback: only true when + // the previous page (a) was full (`length === limit`) and (b) every + // event sat at `cursor.sinceMs`. A mixed full page that merely ends + // at the cursor ms is not evidence of saturation — bumping `sinceMs` + // there would risk skipping a late-arriving same-ms event that the + // next normal query would have picked up. + let lastPageWasSaturatedCohort = false; while (!options.isAborted?.()) { if (iterations >= options.maxIterations) { @@ -87,15 +94,17 @@ export async function drainWebhookEventBatches( const batch = await options.loadBatch({ ...cursor, limit }); if (!batch.length) { - // Saturated-cohort fallback: if the previous iteration saw - // events stuck at this millisecond and the next query came back - // empty, the underlying query may have hidden the rest of that - // cohort behind its own row cap (e.g. boundaryTail.take(limit) - // returning a partial slice of a same-ms burst). Advance past - // the millisecond and try once more before declaring drain - // complete. We gate on observation rather than delivery so a - // full page of dedup'd same-ms events still advances the cursor. - if (lastObservedReceivedAt === cursor.sinceMs) { + // Saturated-cohort fallback: if the previous iteration's full + // page was pinned to this millisecond and the next query came + // back empty, the underlying query may have hidden the rest of + // that cohort behind its own row cap (e.g. boundaryTail.take + // (limit) returning a partial slice of a same-ms burst). + // Advance past the millisecond and try once more before + // declaring drain complete. + if ( + lastPageWasSaturatedCohort && + lastObservedReceivedAt === cursor.sinceMs + ) { options.onSaturatedCohortFallback?.({ iterations, cursor: { ...cursor }, @@ -105,12 +114,21 @@ export async function drainWebhookEventBatches( cursor.sinceMs += 1; cursor.afterCreationTime = undefined; lastObservedReceivedAt = null; + lastPageWasSaturatedCohort = false; continue; } break; } let advanced = false; + // A page is a "saturated cohort" only when every event in it + // shares one receivedAt value. Track the first event's + // receivedAt and flip the flag if any later event differs — + // comparing against `cursor.sinceMs` doesn't work because + // `advanceCursor` bumps the cursor to each event's receivedAt + // mid-iteration, making the equality trivially true. + let pageReceivedAt: number | null = null; + let pageAllSameReceivedAt = true; for (const event of batch) { if (options.isAborted?.()) break; @@ -119,6 +137,13 @@ export async function drainWebhookEventBatches( } if (typeof event.receivedAt === "number") { lastObservedReceivedAt = event.receivedAt; + if (pageReceivedAt === null) { + pageReceivedAt = event.receivedAt; + } else if (event.receivedAt !== pageReceivedAt) { + pageAllSameReceivedAt = false; + } + } else { + pageAllSameReceivedAt = false; } const id = typeof event.id === "string" ? event.id : null; @@ -134,6 +159,11 @@ export async function drainWebhookEventBatches( delivered += 1; } + lastPageWasSaturatedCohort = + batch.length >= limit && + pageAllSameReceivedAt && + pageReceivedAt === cursor.sinceMs; + if (!advanced || batch.length < limit) { break; }