diff --git a/apps/backend/src/app/api/deployments/[id]/logs/stream/route.test.ts b/apps/backend/src/app/api/deployments/[id]/logs/stream/route.test.ts new file mode 100644 index 0000000..072a25a --- /dev/null +++ b/apps/backend/src/app/api/deployments/[id]/logs/stream/route.test.ts @@ -0,0 +1,167 @@ +import { describe, it, expect, beforeEach, afterEach, vi } from 'vitest'; +import { NextRequest } from 'next/server'; +import { GET } from './route'; + +const mockSupabase = { + from: vi.fn().mockReturnThis(), + select: vi.fn().mockReturnThis(), + eq: vi.fn().mockReturnThis(), + single: vi.fn(), + gt: vi.fn().mockReturnThis(), + order: vi.fn().mockReturnThis(), + range: vi.fn().mockReturnThis(), +}; + +const mockUser = { id: 'test-user' }; + +const mockDeployment = { user_id: 'test-user', id: 'test-deployment' }; + +describe('GET /api/deployments/[id]/logs/stream', () => { + beforeEach(() => { + vi.clearAllMocks(); + vi.useFakeTimers(); + }); + + afterEach(() => { + vi.useRealTimers(); + }); + + it('should initialize stream connection for authorized user', async () => { + mockSupabase.single.mockResolvedValueOnce({ + data: mockDeployment, + error: null, + }); + + const req = new NextRequest('http://localhost/api/deployments/test-id/logs/stream'); + + const response = await GET(req, { + params: { id: 'test-id' }, + user: mockUser, + supabase: mockSupabase, + } as any); + + expect(response.status).toBe(200); + expect(response.headers.get('content-type')).toBe('text/event-stream'); + expect(response.headers.get('cache-control')).toBe('no-cache'); + expect(response.headers.get('connection')).toBe('keep-alive'); + }); + + it('should return 404 for non-existent deployment', async () => { + mockSupabase.single.mockResolvedValueOnce({ + data: null, + error: null, + }); + + const req = new NextRequest('http://localhost/api/deployments/test-id/logs/stream'); + + const response = await GET(req, { + params: { id: 'test-id' }, + user: mockUser, + supabase: mockSupabase, + } as any); + + expect(response.status).toBe(404); + }); + + it('should return 404 for deployment not owned by user', async () => { + mockSupabase.single.mockResolvedValueOnce({ + data: { user_id: 'other-user', id: 'test-deployment' }, + error: null, + }); + + const req = new NextRequest('http://localhost/api/deployments/test-id/logs/stream'); + + const response = await GET(req, { + params: { id: 'test-id' }, + user: mockUser, + supabase: mockSupabase, + } as any); + + expect(response.status).toBe(404); + }); + + it('should validate since parameter format', async () => { + mockSupabase.single.mockResolvedValueOnce({ + data: mockDeployment, + error: null, + }); + + const req = new NextRequest( + 'http://localhost/api/deployments/test-id/logs/stream?since=invalid-date', + ); + + const response = await GET(req, { + params: { id: 'test-id' }, + user: mockUser, + supabase: mockSupabase, + } as any); + + expect(response.status).toBe(400); + const body = await response.json(); + expect(body.error).toContain('Invalid since parameter'); + }); + + it('should accept valid ISO 8601 since parameter', async () => { + mockSupabase.single.mockResolvedValueOnce({ + data: mockDeployment, + error: null, + }); + + const isoDate = new Date().toISOString(); + const req = new NextRequest( + `http://localhost/api/deployments/test-id/logs/stream?since=${encodeURIComponent(isoDate)}`, + ); + + const response = await GET(req, { + params: { id: 'test-id' }, + user: mockUser, + supabase: mockSupabase, + } as any); + + expect(response.status).toBe(200); + expect(response.headers.get('content-type')).toBe('text/event-stream'); + }); + + it('should set proper CORS headers', async () => { + mockSupabase.single.mockResolvedValueOnce({ + data: mockDeployment, + error: null, + }); + + const req = new NextRequest('http://localhost/api/deployments/test-id/logs/stream'); + + const response = await GET(req, { + params: { id: 'test-id' }, + user: mockUser, + supabase: mockSupabase, + } as any); + + expect(response.headers.get('access-control-allow-origin')).toBe('*'); + expect(response.headers.get('access-control-allow-methods')).toBe('GET'); + expect(response.headers.get('access-control-allow-headers')).toBe('Content-Type'); + }); + + it('should handle stream cancellation gracefully', async () => { + mockSupabase.single.mockResolvedValueOnce({ + data: mockDeployment, + error: null, + }); + + const req = new NextRequest('http://localhost/api/deployments/test-id/logs/stream'); + + const response = await GET(req, { + params: { id: 'test-id' }, + user: mockUser, + supabase: mockSupabase, + } as any); + + expect(response.status).toBe(200); + + // Simulate reading the stream + const reader = response.body?.getReader(); + expect(reader).toBeDefined(); + + // Cancel the stream + await reader?.cancel(); + }); +}); diff --git a/apps/backend/src/app/api/deployments/[id]/logs/stream/route.ts b/apps/backend/src/app/api/deployments/[id]/logs/stream/route.ts new file mode 100644 index 0000000..bd8abce --- /dev/null +++ b/apps/backend/src/app/api/deployments/[id]/logs/stream/route.ts @@ -0,0 +1,239 @@ +/** + * GET /api/deployments/[id]/logs/stream + * + * Server-Sent Events (SSE) endpoint for streaming deployment logs in real-time. + * Returns a continuous stream of log entries and heartbeat events. + * + * Authentication: requires a valid Supabase session (401 if missing). + * Ownership: the authenticated user must own the deployment. + * Non-owners and missing deployments both return 404. + * + * Query parameters: + * since ISO 8601 Start streaming logs created after this timestamp (optional) + * level log level Filter by log level (optional) + * stage stage name Filter by deployment stage (optional) + * + * Response format (text/event-stream): + * event: log + * data: {"id": "...", "deploymentId": "...", "timestamp": "...", "level": "...", "message": "..."} + * + * event: heartbeat + * data: {"timestamp": "..."} + * + * event: error + * data: {"error": "..."} + * + * Responses: + * 200 — SSE stream established + * 400 — Invalid query parameters + * 401 — Not authenticated + * 404 — Deployment not found (or not owned by caller) + * 500 — Unexpected server error + * + * Issue: #605 + * Branch: feat/issue-069-deployment-log-sse-streaming + */ + +import { NextRequest, NextResponse } from 'next/server'; +import { withAuth } from '@/lib/api/with-auth'; +import { + deploymentLogsService, + type ExtendedLogsQueryParams, +} from '@/services/deployment-logs.service'; + +const HEARTBEAT_INTERVAL = 30000; // 30 seconds +const POLL_INTERVAL = 2000; // 2 seconds +const MAX_STREAM_DURATION = 24 * 60 * 60 * 1000; // 24 hours + +class SSEStreamManager { + private encoder = new TextEncoder(); + private controller: ReadableStreamDefaultController | null = null; + private lastLogId: string | null = null; + private lastTimestamp: string | null = null; + private pollTimeout: NodeJS.Timeout | null = null; + private heartbeatTimeout: NodeJS.Timeout | null = null; + private streamStartTime: number; + private closed = false; + + constructor() { + this.streamStartTime = Date.now(); + } + + async initialize( + controller: ReadableStreamDefaultController, + deploymentId: string, + since: string | undefined, + supabase: any, + user: any, + ): Promise { + this.controller = controller; + this.lastTimestamp = since || new Date(Date.now() - 60000).toISOString(); // Default to last minute + + // Send initial connection event + this.sendEvent('connected', { deploymentId, timestamp: new Date().toISOString() }); + + // Start polling for new logs + this.startPolling(deploymentId, supabase); + + // Start heartbeat + this.startHeartbeat(); + } + + private startPolling(deploymentId: string, supabase: any): void { + const poll = async () => { + if (this.closed) return; + + try { + const params: ExtendedLogsQueryParams = { + page: 1, + limit: 100, + order: 'asc', + since: this.lastTimestamp, + }; + + const result = await deploymentLogsService.getLogs( + deploymentId, + params, + supabase, + ); + + // Send any new logs + for (const log of result.data) { + if (!this.lastLogId || log.id !== this.lastLogId) { + this.sendEvent('log', log); + this.lastLogId = log.id; + } + } + + // Update last timestamp to avoid re-fetching the same logs + if (result.data.length > 0) { + const lastLog = result.data[result.data.length - 1]; + this.lastTimestamp = lastLog.timestamp; + } + + // Check if stream duration exceeded + if (Date.now() - this.streamStartTime > MAX_STREAM_DURATION) { + this.sendEvent('end', { + reason: 'Stream duration limit reached', + timestamp: new Date().toISOString(), + }); + this.close(); + return; + } + } catch (err: unknown) { + const msg = err instanceof Error ? err.message : 'Polling failed'; + console.error('[sse-stream] polling error:', err); + this.sendEvent('error', { error: msg }); + // Don't close on error, continue polling + } + + // Schedule next poll + if (!this.closed) { + this.pollTimeout = setTimeout(poll, POLL_INTERVAL); + } + }; + + poll(); + } + + private startHeartbeat(): void { + const heartbeat = () => { + if (this.closed) return; + + this.sendEvent('heartbeat', { timestamp: new Date().toISOString() }); + + if (!this.closed) { + this.heartbeatTimeout = setTimeout(heartbeat, HEARTBEAT_INTERVAL); + } + }; + + this.heartbeatTimeout = setTimeout(heartbeat, HEARTBEAT_INTERVAL); + } + + private sendEvent(eventType: string, data: Record): void { + if (!this.controller || this.closed) return; + + try { + const eventStr = `event: ${eventType}\ndata: ${JSON.stringify(data)}\n\n`; + const encoded = this.encoder.encode(eventStr); + this.controller.enqueue(encoded); + } catch (err: unknown) { + console.error('[sse-stream] failed to send event:', err); + this.close(); + } + } + + close(): void { + if (this.closed) return; + this.closed = true; + + if (this.pollTimeout) clearTimeout(this.pollTimeout); + if (this.heartbeatTimeout) clearTimeout(this.heartbeatTimeout); + + try { + this.controller?.close(); + } catch (err: unknown) { + console.error('[sse-stream] error closing stream:', err); + } + } +} + +export const GET = withAuth(async (req: NextRequest, { params, user, supabase }) => { + const deploymentId = (params as { id: string }).id; + + // Ownership check + const { data: deployment } = await supabase + .from('deployments') + .select('user_id') + .eq('id', deploymentId) + .single(); + + if (!deployment || deployment.user_id !== user.id) { + return NextResponse.json({ error: 'Deployment not found' }, { status: 404 }); + } + + // Validate query parameters + const since = req.nextUrl.searchParams.get('since') ?? undefined; + if (since) { + const d = new Date(since); + if (isNaN(d.getTime())) { + return NextResponse.json( + { error: 'Invalid since parameter' }, + { status: 400 }, + ); + } + } + + try { + const manager = new SSEStreamManager(); + + const stream = new ReadableStream({ + start(controller) { + manager.initialize(controller, deploymentId, since, supabase, user).catch( + (err: unknown) => { + console.error('[sse-stream] initialization error:', err); + controller.error(err); + }, + ); + }, + cancel() { + manager.close(); + }, + }); + + return new NextResponse(stream, { + headers: { + 'Content-Type': 'text/event-stream', + 'Cache-Control': 'no-cache', + 'Connection': 'keep-alive', + 'Access-Control-Allow-Origin': '*', + 'Access-Control-Allow-Methods': 'GET', + 'Access-Control-Allow-Headers': 'Content-Type', + }, + }); + } catch (err: unknown) { + const msg = err instanceof Error ? err.message : 'Failed to establish stream'; + console.error('[sse-stream] unexpected error:', err); + return NextResponse.json({ error: msg }, { status: 500 }); + } +});