Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,10 @@ jobs:
test -f src/generated/Types.swift || exit 1
test -f src/generated/types.dart || exit 1

- name: Run tests
working-directory: packages/gql
run: bun run test
Comment thread
hyochan marked this conversation as resolved.

- name: Verify generated types are committed (no drift)
run: |
# `bun run generate` is expected to be idempotent: running it
Expand Down
5 changes: 5 additions & 0 deletions .github/workflows/deploy-kit.yml
Original file line number Diff line number Diff line change
Expand Up @@ -129,11 +129,16 @@ jobs:
# runtime rather than failing the build.
# Mixpanel is passed as a BuildKit secret, not an ARG/ENV pair,
# so Docker's secret-name check does not flag a public SPA token.
# A short hash of the token is also passed as a build arg purely to
# bust the layer cache when the token rotates (BuildKit secret values
# are not part of the cache key).
BUILD_FLAGS=(
--build-arg "VITE_KIT_CONVEX_URL=$VITE_KIT_CONVEX_URL"
--build-arg "VITE_KIT_SENTRY_DSN=$VITE_KIT_SENTRY_DSN"
)
if [ -n "$VITE_KIT_MIXPANEL_TOKEN" ]; then
MIXPANEL_TOKEN_HASH=$(printf '%s' "$VITE_KIT_MIXPANEL_TOKEN" | sha256sum | cut -c1-16)
BUILD_FLAGS+=(--build-arg "VITE_KIT_MIXPANEL_TOKEN_HASH=$MIXPANEL_TOKEN_HASH")
BUILD_FLAGS+=(--build-secret "VITE_KIT_MIXPANEL_TOKEN=$VITE_KIT_MIXPANEL_TOKEN")
fi

Expand Down
2 changes: 1 addition & 1 deletion .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@
"runtimeExecutable": "bash",
"runtimeArgs": [
"-lc",
"([ -d ../../node_modules ] || (echo '📦 Installing workspace dependencies...' && cd ../.. && bun install)) && (bun run dev:convex & bun run dev:server & bun run dev; kill 0)"
"npx --yes kill-port 3000 5173 || true && ([ -d ../../node_modules ] || (echo '📦 Installing workspace dependencies...' && cd ../.. && bun install)) && trap 'kill 0' INT TERM EXIT && (bun run dev:convex & bun run dev:server & bun run dev; wait)"
],
"cwd": "${workspaceFolder}/packages/kit",
"console": "integratedTerminal"
Expand Down
1 change: 1 addition & 0 deletions bun.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 4 additions & 2 deletions packages/gql/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@
"generate:kotlin": "bun codegen/index.ts kotlin",
"generate:dart": "bun codegen/index.ts dart",
"generate:gdscript": "bun codegen/index.ts gdscript",
"sync": "bun scripts/sync-to-platforms.mjs"
"sync": "bun scripts/sync-to-platforms.mjs",
"test": "vitest run src"
},
"keywords": [
"graphql",
Expand All @@ -36,7 +37,8 @@
"graphql": "^16.11.0",
"handlebars": "^4.7.8",
"ts-node": "^10.9.2",
"typescript": "^5.9.2"
"typescript": "^5.9.2",
"vitest": "^4"
},
"packageManager": "bun@1.1.0"
}
68 changes: 38 additions & 30 deletions packages/gql/src/webhook-client.sse.test.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import { createServer, type Server } from "node:http";
import type { AddressInfo } from "node:net";
import { describe, expect, it } from "vitest";

import {
Expand Down Expand Up @@ -114,37 +116,43 @@ class FetchEventSource implements WebhookEventStream {
}
}

describe("connectWebhookStream SSE integration", () => {
it("receives typed SSE frames from an actual HTTP stream", async () => {
const encoder = new TextEncoder();
const server = Bun.serve({
port: 0,
fetch: () =>
new Response(
new ReadableStream({
start(controller) {
controller.enqueue(
encoder.encode(
[
"id: uuid-typed-sse",
"event: SubscriptionRenewed",
`data: ${JSON.stringify(validEvent)}`,
"",
"",
].join("\n"),
),
);
controller.close();
},
function startSseServer(frame: string): Promise<{
port: number;
close(): Promise<void>;
}> {
return new Promise((resolve, reject) => {
const server: Server = createServer((_req, res) => {
res.writeHead(200, {
"content-type": "text/event-stream",
"cache-control": "no-cache",
});
res.write(frame);
res.end();
});
server.once("error", reject);
server.listen(0, "127.0.0.1", () => {
const address = server.address() as AddressInfo;
resolve({
port: address.port,
close: () =>
new Promise<void>((closeResolve) => {
server.close(() => closeResolve());
}),
{
headers: {
"content-type": "text/event-stream",
"cache-control": "no-cache",
},
},
),
});
});
});
}

describe("connectWebhookStream SSE integration", () => {
it("receives typed SSE frames from an actual HTTP stream", async () => {
const frame = [
"id: uuid-typed-sse",
"event: SubscriptionRenewed",
`data: ${JSON.stringify(validEvent)}`,
"",
"",
].join("\n");
const server = await startSseServer(frame);

let listener: { close(): void } | null = null;
try {
Expand Down Expand Up @@ -176,7 +184,7 @@ describe("connectWebhookStream SSE integration", () => {
expect(event.purchaseToken).toBe("token-typed-sse");
} finally {
listener?.close();
server.stop(true);
await server.close();
}
});
});
11 changes: 11 additions & 0 deletions packages/kit/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,18 @@ ARG VITE_KIT_SENTRY_DSN
ENV VITE_KIT_SENTRY_DSN=${VITE_KIT_SENTRY_DSN}
# Mixpanel project token is public by design and baked into the SPA when set,
# but pass it via BuildKit secret so Docker does not flag TOKEN-named ARG/ENV.
# BuildKit secret values are not part of the layer cache key, so a non-secret
# fingerprint of the token is passed as a build arg purely to bust the
# `bun run build:all` cache when the token rotates — without it, GHA / Fly's
# remote builder would happily reuse the prior layer and ship a stale token.
ARG VITE_KIT_MIXPANEL_TOKEN_HASH=
# VITE_KIT_MIXPANEL_TOKEN_HASH must be expanded inside the RUN command
# string for BuildKit to fold it into the layer cache key — an unused
# ARG does not affect caching, so the rotation-busting effect would
# silently disappear if we only declared it above. The build script
# does not consume the hash; it only exists as a cache fingerprint.
RUN --mount=type=secret,id=VITE_KIT_MIXPANEL_TOKEN,required=false \
Comment thread
hyochan marked this conversation as resolved.
Comment thread
hyochan marked this conversation as resolved.
VITE_KIT_MIXPANEL_TOKEN_HASH="${VITE_KIT_MIXPANEL_TOKEN_HASH}" \
VITE_KIT_MIXPANEL_TOKEN="$(cat /run/secrets/VITE_KIT_MIXPANEL_TOKEN 2>/dev/null || true)" \
bun run build:all
Comment thread
hyochan marked this conversation as resolved.

Expand Down
12 changes: 12 additions & 0 deletions packages/kit/scripts/deploy-prod.sh
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,18 @@ fi
if [ -n "${VITE_KIT_MIXPANEL_TOKEN:-}" ]; then
# Public SPA config, passed as a BuildKit secret to avoid Docker's
# TOKEN-named ARG/ENV warning while still baking it into the bundle.
# The hash arg is purely a cache buster — BuildKit secret values do
# not participate in the layer cache key, so without it a token
# rotation would happily reuse the prior `bun run build:all` layer.
if command -v sha256sum >/dev/null 2>&1; then
MIXPANEL_TOKEN_HASH=$(printf '%s' "$VITE_KIT_MIXPANEL_TOKEN" | sha256sum | cut -c1-16)
elif command -v shasum >/dev/null 2>&1; then
MIXPANEL_TOKEN_HASH=$(printf '%s' "$VITE_KIT_MIXPANEL_TOKEN" | shasum -a 256 | cut -c1-16)
else
echo "error: need sha256sum (most Linux) or shasum (macOS) on PATH to hash the Mixpanel token." >&2
exit 1
fi
BUILD_FLAGS+=(--build-arg "VITE_KIT_MIXPANEL_TOKEN_HASH=$MIXPANEL_TOKEN_HASH")
BUILD_FLAGS+=(--build-secret "VITE_KIT_MIXPANEL_TOKEN=$VITE_KIT_MIXPANEL_TOKEN")
fi

Expand Down
186 changes: 186 additions & 0 deletions packages/kit/server/api/v1/webhookStreamDrain.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,83 @@ describe("drainWebhookEventBatches", () => {
});
});

it("steps past a saturated millisecond cohort hidden by the query cap", async () => {
// Simulate the original webhookEventsSince bug shape: a same-
// millisecond burst that fills exactly `limit` rows so the helper
// keeps looping, then the next query at (sinceMs, afterCreationTime)
// lies and returns [] (mirroring an in-memory filter that drops
// the rest of the cohort past the take() cap). Without the
// saturated-cohort fallback the helper would declare drain
// complete and miss the post-cohort event.
const limit = 5;
const cohort = Array.from({ length: limit }, (_, index) => ({
id: `cohort-${index}`,
receivedAt: 5_000,
_creationTime: index + 1,
}));
const postCohort = {
id: "post-cohort",
receivedAt: 5_001,
_creationTime: 100,
};
const loadBatch = async ({
sinceMs,
afterCreationTime,
}: {
sinceMs: number;
afterCreationTime?: number;
limit: number;
}) => {
if (sinceMs === 5_000 && afterCreationTime === undefined) {
return cohort;
}
if (sinceMs === 5_000 && afterCreationTime !== undefined) {
// The buggy underlying query: claims the cohort is exhausted
// even though we've only walked a take()-capped slice.
return [];
}
if (sinceMs >= 5_001) {
return [postCohort];
}
return [];
};

const delivered: string[] = [];
const fallbacks: Array<{
cursor: { sinceMs: number; afterCreationTime?: number };
nextSinceMs: number;
}> = [];
const result = await drainWebhookEventBatches({
initialCursor: { sinceMs: 5_000 },
limit,
maxIterations: 10,
loadBatch,
seen: makeSeen(),
writeEvent: async (_event, id) => {
delivered.push(id);
},
onSaturatedCohortFallback: ({ cursor, nextSinceMs }) => {
fallbacks.push({ cursor, nextSinceMs });
},
});

expect(delivered).toEqual([
"cohort-0",
"cohort-1",
"cohort-2",
"cohort-3",
"cohort-4",
"post-cohort",
]);
expect(fallbacks).toEqual([
{
cursor: { sinceMs: 5_000, afterCreationTime: 5 },
nextSinceMs: 5_001,
},
]);
expect(result.cursor.sinceMs).toBe(5_001);
});

it("advances the cursor even when duplicate ids are skipped", async () => {
const events = [
{ id: "already-sent", receivedAt: 2_001, _creationTime: 1 },
Expand All @@ -117,4 +194,113 @@ describe("drainWebhookEventBatches", () => {
afterCreationTime: 2,
});
});

it("does NOT bump sinceMs when a full page merely ends at the cursor ms but spans multiple receivedAts", async () => {
// Reviewer-flagged edge (coderabbit): a full page whose tail
// happens to land at the cursor millisecond is NOT a saturated
// cohort — bumping sinceMs there would skip a late-arriving event
// at the same ms that the next normal query would have picked up.
const limit = 5;
const mixedPage = [
{ id: "mix-0", receivedAt: 8_000, _creationTime: 1 },
{ id: "mix-1", receivedAt: 8_001, _creationTime: 2 },
{ id: "mix-2", receivedAt: 8_002, _creationTime: 3 },
{ id: "mix-3", receivedAt: 8_003, _creationTime: 4 },
{ id: "mix-4", receivedAt: 8_003, _creationTime: 5 },
];
let calls = 0;
const loadBatch = async () => {
calls += 1;
return calls === 1 ? mixedPage : [];
};

let fallbackFired = false;
const result = await drainWebhookEventBatches({
initialCursor: { sinceMs: 7_999 },
limit,
maxIterations: 10,
loadBatch,
seen: makeSeen(),
writeEvent: async () => {},
onSaturatedCohortFallback: () => {
fallbackFired = true;
},
});

expect(fallbackFired).toBe(false);
expect(result.cursor.sinceMs).toBe(8_003);
});

it("triggers the saturated-cohort fallback even when the page is fully dedup'd", async () => {
// Reviewer-flagged edge: gating the fallback on delivered events
// would let a full same-ms page of duplicates break early instead
// of advancing past the cohort.
const limit = 5;
const cohort = Array.from({ length: limit }, (_, index) => ({
id: `dedup-${index}`,
receivedAt: 7_000,
_creationTime: index + 1,
}));
const postCohort = {
id: "post-cohort",
receivedAt: 7_001,
_creationTime: 100,
};
const loadBatch = async ({
sinceMs,
afterCreationTime,
}: {
sinceMs: number;
afterCreationTime?: number;
limit: number;
}) => {
if (sinceMs === 7_000 && afterCreationTime === undefined) {
return cohort;
}
if (sinceMs === 7_000 && afterCreationTime !== undefined) {
return [];
}
if (sinceMs >= 7_001) {
return [postCohort];
}
return [];
};

const delivered: string[] = [];
const result = await drainWebhookEventBatches({
initialCursor: { sinceMs: 7_000 },
limit,
maxIterations: 10,
loadBatch,
seen: makeSeen(cohort.map((event) => event.id)),
writeEvent: async (_event, id) => {
delivered.push(id);
},
});

expect(delivered).toEqual(["post-cohort"]);
expect(result.cursor.sinceMs).toBe(7_001);
});

it("leaves a failed event eligible for retry by adding to `seen` only after writeEvent succeeds", async () => {
const events = [{ id: "will-fail", receivedAt: 9_001, _creationTime: 1 }];
const seen = makeSeen();
let attempts = 0;

await expect(
drainWebhookEventBatches({
initialCursor: { sinceMs: 9_000 },
maxIterations: 10,
loadBatch: makePagedLoader(events),
seen,
writeEvent: async () => {
attempts += 1;
throw new Error("network blip");
},
}),
).rejects.toThrow("network blip");

expect(attempts).toBe(1);
expect(seen.has("will-fail")).toBe(false);
});
});
Loading