diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 58752e09..0041f0c3 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -88,6 +88,10 @@ jobs: test -f src/generated/Types.swift || exit 1 test -f src/generated/types.dart || exit 1 + - name: Run tests + working-directory: packages/gql + run: bun run test + - name: Verify generated types are committed (no drift) run: | # `bun run generate` is expected to be idempotent: running it diff --git a/.github/workflows/deploy-kit.yml b/.github/workflows/deploy-kit.yml index 86cf25b5..f5be2694 100644 --- a/.github/workflows/deploy-kit.yml +++ b/.github/workflows/deploy-kit.yml @@ -129,11 +129,16 @@ jobs: # runtime rather than failing the build. # Mixpanel is passed as a BuildKit secret, not an ARG/ENV pair, # so Docker's secret-name check does not flag a public SPA token. + # A short hash of the token is also passed as a build arg purely to + # bust the layer cache when the token rotates (BuildKit secret values + # are not part of the cache key). BUILD_FLAGS=( --build-arg "VITE_KIT_CONVEX_URL=$VITE_KIT_CONVEX_URL" --build-arg "VITE_KIT_SENTRY_DSN=$VITE_KIT_SENTRY_DSN" ) if [ -n "$VITE_KIT_MIXPANEL_TOKEN" ]; then + MIXPANEL_TOKEN_HASH=$(printf '%s' "$VITE_KIT_MIXPANEL_TOKEN" | sha256sum | cut -c1-16) + BUILD_FLAGS+=(--build-arg "VITE_KIT_MIXPANEL_TOKEN_HASH=$MIXPANEL_TOKEN_HASH") BUILD_FLAGS+=(--build-secret "VITE_KIT_MIXPANEL_TOKEN=$VITE_KIT_MIXPANEL_TOKEN") fi diff --git a/.vscode/launch.json b/.vscode/launch.json index e383c089..956d99b3 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -130,7 +130,7 @@ "runtimeExecutable": "bash", "runtimeArgs": [ "-lc", - "([ -d ../../node_modules ] || (echo '📦 Installing workspace dependencies...' && cd ../.. && bun install)) && (bun run dev:convex & bun run dev:server & bun run dev; kill 0)" + "npx --yes kill-port 3000 5173 || true && ([ -d ../../node_modules ] || (echo '📦 Installing workspace dependencies...' && cd ../.. && bun install)) && trap 'kill 0' INT TERM EXIT && (bun run dev:convex & bun run dev:server & bun run dev; wait)" ], "cwd": "${workspaceFolder}/packages/kit", "console": "integratedTerminal" diff --git a/bun.lock b/bun.lock index 9ea5ed61..78f62bdf 100644 --- a/bun.lock +++ b/bun.lock @@ -73,6 +73,7 @@ "handlebars": "^4.7.8", "ts-node": "^10.9.2", "typescript": "^5.9.2", + "vitest": "^4", }, }, "packages/kit": { diff --git a/packages/gql/package.json b/packages/gql/package.json index 7dd62db6..e8cb7b15 100644 --- a/packages/gql/package.json +++ b/packages/gql/package.json @@ -18,7 +18,8 @@ "generate:kotlin": "bun codegen/index.ts kotlin", "generate:dart": "bun codegen/index.ts dart", "generate:gdscript": "bun codegen/index.ts gdscript", - "sync": "bun scripts/sync-to-platforms.mjs" + "sync": "bun scripts/sync-to-platforms.mjs", + "test": "vitest run src" }, "keywords": [ "graphql", @@ -36,7 +37,8 @@ "graphql": "^16.11.0", "handlebars": "^4.7.8", "ts-node": "^10.9.2", - "typescript": "^5.9.2" + "typescript": "^5.9.2", + "vitest": "^4" }, "packageManager": "bun@1.1.0" } diff --git a/packages/gql/src/webhook-client.sse.test.ts b/packages/gql/src/webhook-client.sse.test.ts index a77eac94..a9dab21a 100644 --- a/packages/gql/src/webhook-client.sse.test.ts +++ b/packages/gql/src/webhook-client.sse.test.ts @@ -1,3 +1,5 @@ +import { createServer, type Server } from "node:http"; +import type { AddressInfo } from "node:net"; import { describe, expect, it } from "vitest"; import { @@ -114,37 +116,43 @@ class FetchEventSource implements WebhookEventStream { } } -describe("connectWebhookStream SSE integration", () => { - it("receives typed SSE frames from an actual HTTP stream", async () => { - const encoder = new TextEncoder(); - const server = Bun.serve({ - port: 0, - fetch: () => - new Response( - new ReadableStream({ - start(controller) { - controller.enqueue( - encoder.encode( - [ - "id: uuid-typed-sse", - "event: SubscriptionRenewed", - `data: ${JSON.stringify(validEvent)}`, - "", - "", - ].join("\n"), - ), - ); - controller.close(); - }, +function startSseServer(frame: string): Promise<{ + port: number; + close(): Promise; +}> { + return new Promise((resolve, reject) => { + const server: Server = createServer((_req, res) => { + res.writeHead(200, { + "content-type": "text/event-stream", + "cache-control": "no-cache", + }); + res.write(frame); + res.end(); + }); + server.once("error", reject); + server.listen(0, "127.0.0.1", () => { + const address = server.address() as AddressInfo; + resolve({ + port: address.port, + close: () => + new Promise((closeResolve) => { + server.close(() => closeResolve()); }), - { - headers: { - "content-type": "text/event-stream", - "cache-control": "no-cache", - }, - }, - ), + }); }); + }); +} + +describe("connectWebhookStream SSE integration", () => { + it("receives typed SSE frames from an actual HTTP stream", async () => { + const frame = [ + "id: uuid-typed-sse", + "event: SubscriptionRenewed", + `data: ${JSON.stringify(validEvent)}`, + "", + "", + ].join("\n"); + const server = await startSseServer(frame); let listener: { close(): void } | null = null; try { @@ -176,7 +184,7 @@ describe("connectWebhookStream SSE integration", () => { expect(event.purchaseToken).toBe("token-typed-sse"); } finally { listener?.close(); - server.stop(true); + await server.close(); } }); }); diff --git a/packages/kit/Dockerfile b/packages/kit/Dockerfile index bbfa53c1..2949279b 100644 --- a/packages/kit/Dockerfile +++ b/packages/kit/Dockerfile @@ -48,7 +48,18 @@ ARG VITE_KIT_SENTRY_DSN ENV VITE_KIT_SENTRY_DSN=${VITE_KIT_SENTRY_DSN} # Mixpanel project token is public by design and baked into the SPA when set, # but pass it via BuildKit secret so Docker does not flag TOKEN-named ARG/ENV. +# BuildKit secret values are not part of the layer cache key, so a non-secret +# fingerprint of the token is passed as a build arg purely to bust the +# `bun run build:all` cache when the token rotates — without it, GHA / Fly's +# remote builder would happily reuse the prior layer and ship a stale token. +ARG VITE_KIT_MIXPANEL_TOKEN_HASH= +# VITE_KIT_MIXPANEL_TOKEN_HASH must be expanded inside the RUN command +# string for BuildKit to fold it into the layer cache key — an unused +# ARG does not affect caching, so the rotation-busting effect would +# silently disappear if we only declared it above. The build script +# does not consume the hash; it only exists as a cache fingerprint. RUN --mount=type=secret,id=VITE_KIT_MIXPANEL_TOKEN,required=false \ + VITE_KIT_MIXPANEL_TOKEN_HASH="${VITE_KIT_MIXPANEL_TOKEN_HASH}" \ VITE_KIT_MIXPANEL_TOKEN="$(cat /run/secrets/VITE_KIT_MIXPANEL_TOKEN 2>/dev/null || true)" \ bun run build:all diff --git a/packages/kit/scripts/deploy-prod.sh b/packages/kit/scripts/deploy-prod.sh index f28b3800..168f711b 100755 --- a/packages/kit/scripts/deploy-prod.sh +++ b/packages/kit/scripts/deploy-prod.sh @@ -63,6 +63,18 @@ fi if [ -n "${VITE_KIT_MIXPANEL_TOKEN:-}" ]; then # Public SPA config, passed as a BuildKit secret to avoid Docker's # TOKEN-named ARG/ENV warning while still baking it into the bundle. + # The hash arg is purely a cache buster — BuildKit secret values do + # not participate in the layer cache key, so without it a token + # rotation would happily reuse the prior `bun run build:all` layer. + if command -v sha256sum >/dev/null 2>&1; then + MIXPANEL_TOKEN_HASH=$(printf '%s' "$VITE_KIT_MIXPANEL_TOKEN" | sha256sum | cut -c1-16) + elif command -v shasum >/dev/null 2>&1; then + MIXPANEL_TOKEN_HASH=$(printf '%s' "$VITE_KIT_MIXPANEL_TOKEN" | shasum -a 256 | cut -c1-16) + else + echo "error: need sha256sum (most Linux) or shasum (macOS) on PATH to hash the Mixpanel token." >&2 + exit 1 + fi + BUILD_FLAGS+=(--build-arg "VITE_KIT_MIXPANEL_TOKEN_HASH=$MIXPANEL_TOKEN_HASH") BUILD_FLAGS+=(--build-secret "VITE_KIT_MIXPANEL_TOKEN=$VITE_KIT_MIXPANEL_TOKEN") fi diff --git a/packages/kit/server/api/v1/webhookStreamDrain.test.ts b/packages/kit/server/api/v1/webhookStreamDrain.test.ts index f7156111..2c931c52 100644 --- a/packages/kit/server/api/v1/webhookStreamDrain.test.ts +++ b/packages/kit/server/api/v1/webhookStreamDrain.test.ts @@ -94,6 +94,83 @@ describe("drainWebhookEventBatches", () => { }); }); + it("steps past a saturated millisecond cohort hidden by the query cap", async () => { + // Simulate the original webhookEventsSince bug shape: a same- + // millisecond burst that fills exactly `limit` rows so the helper + // keeps looping, then the next query at (sinceMs, afterCreationTime) + // lies and returns [] (mirroring an in-memory filter that drops + // the rest of the cohort past the take() cap). Without the + // saturated-cohort fallback the helper would declare drain + // complete and miss the post-cohort event. + const limit = 5; + const cohort = Array.from({ length: limit }, (_, index) => ({ + id: `cohort-${index}`, + receivedAt: 5_000, + _creationTime: index + 1, + })); + const postCohort = { + id: "post-cohort", + receivedAt: 5_001, + _creationTime: 100, + }; + const loadBatch = async ({ + sinceMs, + afterCreationTime, + }: { + sinceMs: number; + afterCreationTime?: number; + limit: number; + }) => { + if (sinceMs === 5_000 && afterCreationTime === undefined) { + return cohort; + } + if (sinceMs === 5_000 && afterCreationTime !== undefined) { + // The buggy underlying query: claims the cohort is exhausted + // even though we've only walked a take()-capped slice. + return []; + } + if (sinceMs >= 5_001) { + return [postCohort]; + } + return []; + }; + + const delivered: string[] = []; + const fallbacks: Array<{ + cursor: { sinceMs: number; afterCreationTime?: number }; + nextSinceMs: number; + }> = []; + const result = await drainWebhookEventBatches({ + initialCursor: { sinceMs: 5_000 }, + limit, + maxIterations: 10, + loadBatch, + seen: makeSeen(), + writeEvent: async (_event, id) => { + delivered.push(id); + }, + onSaturatedCohortFallback: ({ cursor, nextSinceMs }) => { + fallbacks.push({ cursor, nextSinceMs }); + }, + }); + + expect(delivered).toEqual([ + "cohort-0", + "cohort-1", + "cohort-2", + "cohort-3", + "cohort-4", + "post-cohort", + ]); + expect(fallbacks).toEqual([ + { + cursor: { sinceMs: 5_000, afterCreationTime: 5 }, + nextSinceMs: 5_001, + }, + ]); + expect(result.cursor.sinceMs).toBe(5_001); + }); + it("advances the cursor even when duplicate ids are skipped", async () => { const events = [ { id: "already-sent", receivedAt: 2_001, _creationTime: 1 }, @@ -117,4 +194,113 @@ describe("drainWebhookEventBatches", () => { afterCreationTime: 2, }); }); + + it("does NOT bump sinceMs when a full page merely ends at the cursor ms but spans multiple receivedAts", async () => { + // Reviewer-flagged edge (coderabbit): a full page whose tail + // happens to land at the cursor millisecond is NOT a saturated + // cohort — bumping sinceMs there would skip a late-arriving event + // at the same ms that the next normal query would have picked up. + const limit = 5; + const mixedPage = [ + { id: "mix-0", receivedAt: 8_000, _creationTime: 1 }, + { id: "mix-1", receivedAt: 8_001, _creationTime: 2 }, + { id: "mix-2", receivedAt: 8_002, _creationTime: 3 }, + { id: "mix-3", receivedAt: 8_003, _creationTime: 4 }, + { id: "mix-4", receivedAt: 8_003, _creationTime: 5 }, + ]; + let calls = 0; + const loadBatch = async () => { + calls += 1; + return calls === 1 ? mixedPage : []; + }; + + let fallbackFired = false; + const result = await drainWebhookEventBatches({ + initialCursor: { sinceMs: 7_999 }, + limit, + maxIterations: 10, + loadBatch, + seen: makeSeen(), + writeEvent: async () => {}, + onSaturatedCohortFallback: () => { + fallbackFired = true; + }, + }); + + expect(fallbackFired).toBe(false); + expect(result.cursor.sinceMs).toBe(8_003); + }); + + it("triggers the saturated-cohort fallback even when the page is fully dedup'd", async () => { + // Reviewer-flagged edge: gating the fallback on delivered events + // would let a full same-ms page of duplicates break early instead + // of advancing past the cohort. + const limit = 5; + const cohort = Array.from({ length: limit }, (_, index) => ({ + id: `dedup-${index}`, + receivedAt: 7_000, + _creationTime: index + 1, + })); + const postCohort = { + id: "post-cohort", + receivedAt: 7_001, + _creationTime: 100, + }; + const loadBatch = async ({ + sinceMs, + afterCreationTime, + }: { + sinceMs: number; + afterCreationTime?: number; + limit: number; + }) => { + if (sinceMs === 7_000 && afterCreationTime === undefined) { + return cohort; + } + if (sinceMs === 7_000 && afterCreationTime !== undefined) { + return []; + } + if (sinceMs >= 7_001) { + return [postCohort]; + } + return []; + }; + + const delivered: string[] = []; + const result = await drainWebhookEventBatches({ + initialCursor: { sinceMs: 7_000 }, + limit, + maxIterations: 10, + loadBatch, + seen: makeSeen(cohort.map((event) => event.id)), + writeEvent: async (_event, id) => { + delivered.push(id); + }, + }); + + expect(delivered).toEqual(["post-cohort"]); + expect(result.cursor.sinceMs).toBe(7_001); + }); + + it("leaves a failed event eligible for retry by adding to `seen` only after writeEvent succeeds", async () => { + const events = [{ id: "will-fail", receivedAt: 9_001, _creationTime: 1 }]; + const seen = makeSeen(); + let attempts = 0; + + await expect( + drainWebhookEventBatches({ + initialCursor: { sinceMs: 9_000 }, + maxIterations: 10, + loadBatch: makePagedLoader(events), + seen, + writeEvent: async () => { + attempts += 1; + throw new Error("network blip"); + }, + }), + ).rejects.toThrow("network blip"); + + expect(attempts).toBe(1); + expect(seen.has("will-fail")).toBe(false); + }); }); diff --git a/packages/kit/server/api/v1/webhookStreamDrain.ts b/packages/kit/server/api/v1/webhookStreamDrain.ts index f28c35bb..07273d60 100644 --- a/packages/kit/server/api/v1/webhookStreamDrain.ts +++ b/packages/kit/server/api/v1/webhookStreamDrain.ts @@ -8,6 +8,18 @@ export type WebhookStreamSeenSet = { add(id: string): void; }; +// Minimal contract the helper needs from a webhook event row. The +// caller may pass through additional fields (the SSE writer reads +// `type`, the dashboard reads payload bodies etc.); the index +// signature lets those flow through without forcing every callsite +// to widen back to `Record`. +export type WebhookStreamEvent = { + id?: unknown; + receivedAt?: unknown; + _creationTime?: unknown; + [key: string]: unknown; +}; + export type DrainWebhookEventBatchesOptions = { initialCursor: WebhookStreamCursor; limit?: number; @@ -15,13 +27,19 @@ export type DrainWebhookEventBatchesOptions = { isAborted?: () => boolean; loadBatch: ( cursor: WebhookStreamCursor & { limit: number }, - ) => Promise>>; + ) => Promise; seen: WebhookStreamSeenSet; - writeEvent: (event: Record, id: string) => Promise; + writeEvent: (event: WebhookStreamEvent, id: string) => Promise; onIterationLimit?: (state: { iterations: number; cursor: WebhookStreamCursor; }) => void; + onSaturatedCohortFallback?: (state: { + iterations: number; + cursor: WebhookStreamCursor; + nextSinceMs: number; + limit: number; + }) => void; }; export type DrainWebhookEventBatchesResult = { @@ -31,6 +49,16 @@ export type DrainWebhookEventBatchesResult = { hitIterationLimit: boolean; }; +/** + * Drains webhook event pages using a moving `(receivedAt, _creationTime)` + * cursor. Events are processed sequentially so the caller can preserve SSE + * ordering and backpressure; an event id is added to `seen` only after + * `writeEvent` succeeds, leaving failed writes retryable on the next pass. + * Abort checks stop further work without throwing, and `maxIterations` bounds + * the loop for safety. When the source query appears to truncate a saturated + * same-millisecond cohort, the helper reports the fallback and advances + * `sinceMs` by one millisecond before retrying. + */ export async function drainWebhookEventBatches( options: DrainWebhookEventBatchesOptions, ): Promise { @@ -42,6 +70,19 @@ export async function drainWebhookEventBatches( let delivered = 0; let iterations = 0; let hitIterationLimit = false; + // Tracks whether the previous iteration *observed* events at + // `cursor.sinceMs` (delivered or deduped). Used by the saturated- + // cohort fallback below so a single millisecond's burst that + // exceeds the underlying query's row cap can still make forward + // progress even when the whole page was already in `seen`. + let lastObservedReceivedAt: number | null = null; + // Tighter gate for the saturated-cohort fallback: only true when + // the previous page (a) was full (`length === limit`) and (b) every + // event sat at `cursor.sinceMs`. A mixed full page that merely ends + // at the cursor ms is not evidence of saturation — bumping `sinceMs` + // there would risk skipping a late-arriving same-ms event that the + // next normal query would have picked up. + let lastPageWasSaturatedCohort = false; while (!options.isAborted?.()) { if (iterations >= options.maxIterations) { @@ -53,26 +94,76 @@ export async function drainWebhookEventBatches( const batch = await options.loadBatch({ ...cursor, limit }); if (!batch.length) { + // Saturated-cohort fallback: if the previous iteration's full + // page was pinned to this millisecond and the next query came + // back empty, the underlying query may have hidden the rest of + // that cohort behind its own row cap (e.g. boundaryTail.take + // (limit) returning a partial slice of a same-ms burst). + // Advance past the millisecond and try once more before + // declaring drain complete. + if ( + lastPageWasSaturatedCohort && + lastObservedReceivedAt === cursor.sinceMs + ) { + options.onSaturatedCohortFallback?.({ + iterations, + cursor: { ...cursor }, + nextSinceMs: cursor.sinceMs + 1, + limit, + }); + cursor.sinceMs += 1; + cursor.afterCreationTime = undefined; + lastObservedReceivedAt = null; + lastPageWasSaturatedCohort = false; + continue; + } break; } let advanced = false; + // A page is a "saturated cohort" only when every event in it + // shares one receivedAt value. Track the first event's + // receivedAt and flip the flag if any later event differs — + // comparing against `cursor.sinceMs` doesn't work because + // `advanceCursor` bumps the cursor to each event's receivedAt + // mid-iteration, making the equality trivially true. + let pageReceivedAt: number | null = null; + let pageAllSameReceivedAt = true; for (const event of batch) { if (options.isAborted?.()) break; if (advanceCursor(cursor, event)) { advanced = true; } + if (typeof event.receivedAt === "number") { + lastObservedReceivedAt = event.receivedAt; + if (pageReceivedAt === null) { + pageReceivedAt = event.receivedAt; + } else if (event.receivedAt !== pageReceivedAt) { + pageAllSameReceivedAt = false; + } + } else { + pageAllSameReceivedAt = false; + } const id = typeof event.id === "string" ? event.id : null; if (!id || options.seen.has(id)) { continue; } - options.seen.add(id); + // Add to `seen` *after* writeEvent succeeds so a thrown writer + // leaves the event eligible for retry on the next drain pass. + // The await keeps the loop sequential, so there's no in-batch + // race where the same id could be dispatched twice. await options.writeEvent(event, id); + options.seen.add(id); delivered += 1; } + lastPageWasSaturatedCohort = + batch.length >= limit && + pageAllSameReceivedAt && + pageReceivedAt === cursor.sinceMs; + if (!advanced || batch.length < limit) { break; } @@ -88,7 +179,7 @@ export async function drainWebhookEventBatches( function advanceCursor( cursor: WebhookStreamCursor, - event: Record, + event: WebhookStreamEvent, ): boolean { const receivedAt = typeof event.receivedAt === "number" ? event.receivedAt : null; diff --git a/packages/kit/server/api/v1/webhooks.ts b/packages/kit/server/api/v1/webhooks.ts index 8f81ab70..651a5a5a 100644 --- a/packages/kit/server/api/v1/webhooks.ts +++ b/packages/kit/server/api/v1/webhooks.ts @@ -658,6 +658,23 @@ webhooks.get("/stream/:apiKey", async (c) => { { iterations, liveCursor: cursor.sinceMs }, ); }, + onSaturatedCohortFallback: ({ + iterations, + cursor, + nextSinceMs, + limit, + }) => { + console.warn( + "[webhooks/stream] live drain saturated same-ms cohort fallback", + { + iterations, + limit, + liveCursor: cursor.sinceMs, + afterCreationTime: cursor.afterCreationTime, + nextLiveCursor: nextSinceMs, + }, + ); + }, }); liveCursor = result.cursor.sinceMs; liveCreationCursor = result.cursor.afterCreationTime;