Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 69 additions & 23 deletions src/app/api/stream/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ const MAX_CONNECTIONS_PER_USER = 4;
// while being 7.5x cheaper per connection than the previous 2 s interval.
const POLL_INTERVAL_MS = 15_000;

// Keep SSE connections bounded even if a proxy or client fails to send an
// abort signal. EventSource will reconnect automatically when the stream ends.
const MAX_CONNECTION_DURATION_MS = 5 * 60 * 1000;

export async function GET(req: NextRequest) {
const session = await getServerSession(authOptions);
if (!session?.githubId || !session.githubLogin) {
Expand Down Expand Up @@ -51,14 +55,59 @@ export async function GET(req: NextRequest) {

let lastCheckedSyncedAt: string | null = null;
let lastCheckedUnreadCount: number | null = null;

let isClosed = false;
let cleanupStream: (() => void) | null = null;

const stream = new ReadableStream({
start(controller) {
let isClosed = false;
let interval: ReturnType<typeof setInterval>;
let maxDurationTimeout: ReturnType<typeof setTimeout>;

const releaseConnectionSlot = () => {
const remaining = activeStreamConnections.get(userId) ?? 1;
if (remaining <= 1) {
activeStreamConnections.delete(userId);
} else {
activeStreamConnections.set(userId, remaining - 1);
}
};

const closeStream = () => {
if (isClosed) {
return;
}

isClosed = true;
clearInterval(interval);
clearTimeout(maxDurationTimeout);

try {
controller.close();
} catch {
// The stream may already be closed/canceled by the client.
}

releaseConnectionSlot();
};

const safeEnqueue = (message: string) => {
if (isClosed) {
return;
}

try {
controller.enqueue(message);
} catch {
closeStream();
}
};

const checkData = async () => {
if (isClosed) return;
try {
if (isClosed) {
return;
}

const { data: goals } = await supabaseAdmin
.from("goals")
.select("last_synced_at")
Expand Down Expand Up @@ -92,8 +141,8 @@ export async function GET(req: NextRequest) {
lastCheckedUnreadCount = currentUnreadCount;
}

if (hasChanges && !isClosed) {
controller.enqueue(`data: ${JSON.stringify(payload)}\n\n`);
if (hasChanges) {
safeEnqueue(`data: ${JSON.stringify(payload)}\n\n`);
}
} catch (error) {
console.error("SSE Polling Error:", error);
Expand All @@ -103,31 +152,28 @@ export async function GET(req: NextRequest) {
// Register the interval and abort handler synchronously so they are
// guaranteed to be in place before any async work begins. This prevents
// a race where abort() fires before the listener is attached.
const interval = setInterval(() => {
if (!isClosed) checkData();
interval = setInterval(() => {
checkData();
}, POLL_INTERVAL_MS);

req.signal.addEventListener("abort", () => {
isClosed = true;
clearInterval(interval);
try {
controller.close();
} catch (e) {
// ignore already closed
}
maxDurationTimeout = setTimeout(
closeStream,
MAX_CONNECTION_DURATION_MS
);

// Decrement the connection counter so the slot becomes available again.
const remaining = activeStreamConnections.get(userId) ?? 1;
if (remaining <= 1) {
activeStreamConnections.delete(userId);
} else {
activeStreamConnections.set(userId, remaining - 1);
}
});
cleanupStream = closeStream;
if (req.signal.aborted) {
closeStream();
} else {
req.signal.addEventListener("abort", closeStream, { once: true });
}

// Kick off the first poll immediately (non-blocking).
checkData();
},
cancel() {
cleanupStream?.();
},
});

return new Response(stream, {
Expand Down
49 changes: 48 additions & 1 deletion test/sse-stream-route.test.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import { beforeEach, describe, expect, it, vi } from "vitest";
// @vitest-environment node

import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import { NextRequest } from "next/server";
import { activeStreamConnections } from "@/lib/sse";

Expand Down Expand Up @@ -33,6 +35,14 @@ function makeAbortableRequest(): { req: NextRequest; abort: () => void } {
return { req, abort: () => controller.abort() };
}

function makePreAbortedRequest(): NextRequest {
const controller = new AbortController();
controller.abort();
return new NextRequest("http://localhost/api/stream", {
signal: controller.signal,
});
}

function setupSupabase(
goalSyncedAt: string | null = "2026-01-01T00:00:00Z",
unreadCount = 0
Expand Down Expand Up @@ -69,6 +79,10 @@ function setupSupabase(
// ─── tests ───────────────────────────────────────────────────────────────────

describe("GET /api/stream — SSE stream route", () => {
afterEach(() => {
vi.useRealTimers();
});

beforeEach(() => {
vi.clearAllMocks();
activeStreamConnections.clear();
Expand Down Expand Up @@ -215,6 +229,39 @@ describe("GET /api/stream — SSE stream route", () => {
expect(activeStreamConnections.get("user-1")).toBe(2);
});

it("releases the connection slot when the request is already aborted", async () => {
const { GET } = await import("@/app/api/stream/route");

await GET(makePreAbortedRequest());

expect(activeStreamConnections.has("user-1")).toBe(false);
});

it("closes stale streams and releases their slot after the max duration", async () => {
vi.useFakeTimers();
const { GET } = await import("@/app/api/stream/route");

await GET(makeRequest());
expect(activeStreamConnections.get("user-1")).toBe(1);

await vi.advanceTimersByTimeAsync(5 * 60 * 1000);

expect(activeStreamConnections.has("user-1")).toBe(false);
});

it("releases the connection slot when the stream reader is canceled", async () => {
const { GET } = await import("@/app/api/stream/route");

const res = await GET(makeRequest());
const reader = res.body?.getReader();

expect(activeStreamConnections.get("user-1")).toBe(1);

await reader?.cancel();

expect(activeStreamConnections.has("user-1")).toBe(false);
});

// ── response headers ──────────────────────────────────────────────────

it("includes Cache-Control: no-cache header", async () => {
Expand Down
Loading