diff --git a/app/api/streams/[id]/pause/route.ts b/app/api/streams/[id]/pause/route.ts index 0efdb6b..7e6a377 100644 --- a/app/api/streams/[id]/pause/route.ts +++ b/app/api/streams/[id]/pause/route.ts @@ -1,3 +1,8 @@ +import { NextResponse } from "next/server"; +import { db } from "@/app/lib/db"; +import { logger } from "@/app/lib/logger"; +import { getCorrelationContext } from "@/app/lib/correlation-middleware"; +import { redact } from "@/app/lib/privacy"; /** * POST /api/streams/[id]/pause * @@ -57,6 +62,31 @@ export async function POST( { params }: { params: Promise<{ id: string }> }, ): Promise { const { id } = await params; + const correlationId = getCorrelationContext()?.correlationId || "unknown"; + + const stream = db.streams.get(id); + if (!stream) { + logger.warn("Stream not found for pause action", { correlationId, streamId: id }); + return createErrorResponse("STREAM_NOT_FOUND", `Stream '${id}' not found`, 404); + } + if (stream.status !== "active") { + logger.warn("Invalid stream state for pause action", { correlationId, streamId: id, status: stream.status }); + return createErrorResponse("INVALID_STREAM_STATE", "Only active streams can be paused", 409); + } + stream.status = "paused"; + stream.nextAction = "start"; + stream.updatedAt = new Date().toISOString(); + db.streams.set(id, stream); + + logger.info("Stream paused successfully", { + correlationId, + streamId: id, + action: "pause", + status: "success", + stream: redact(stream) + }); + + return NextResponse.json({ data: stream }); const url = getRequestUrl(req, `/api/streams/${id}/pause`); const idempotencyKey = getHeader(req, "Idempotency-Key"); diff --git a/app/api/streams/[id]/settle/route.ts b/app/api/streams/[id]/settle/route.ts index b4ac37b..1852e2a 100644 --- a/app/api/streams/[id]/settle/route.ts +++ b/app/api/streams/[id]/settle/route.ts @@ -1,4 +1,8 @@ import { NextResponse } from "next/server"; +import { db } from "@/app/lib/db"; +import { logger } from "@/app/lib/logger"; +import { getCorrelationContext } from "@/app/lib/correlation-middleware"; +import { redact } from "@/app/lib/privacy"; import { recordPrivilegedStreamAuditEvent } from "@/app/lib/audit-log"; import { db, idempotencyToken, withLock } from "@/app/lib/db"; import { getCorrelationContext } from "@/app/lib/logger"; @@ -20,6 +24,40 @@ export async function POST( { params }: { params: Promise<{ id: string }> } ) { const { id } = await params; + const correlationId = getCorrelationContext()?.correlationId || "unknown"; + + const stream = db.streams.get(id); + if (!stream) { + logger.warn("Stream not found for settle action", { correlationId, streamId: id }); + return createErrorResponse("STREAM_NOT_FOUND", `Stream '${id}' not found`, 404); + } + if (stream.status !== "active" && stream.status !== "paused") { + logger.warn("Invalid stream state for settle action", { correlationId, streamId: id, status: stream.status }); + return createErrorResponse("INVALID_STREAM_STATE", "Only active or paused streams can be settled", 409); + } + stream.status = "ended"; + stream.nextAction = "withdraw"; + stream.updatedAt = new Date().toISOString(); + db.streams.set(id, stream); + + const settlement = { + txHash: `fake-tx-${crypto.randomUUID().slice(0, 8)}`, + settledAt: new Date().toISOString(), + }; + + logger.info("Stream settled successfully", { + correlationId, + streamId: id, + action: "settle", + status: "success", + stream: redact({ ...stream, settlement }) + }); + + return NextResponse.json({ + data: { + ...stream, + settlement, + }, const idempotencyKey = getHeader(request, "Idempotency-Key"); const token = idempotencyKey ? idempotencyToken(`streams.settle.${id}`, idempotencyKey) : null; diff --git a/app/api/streams/[id]/start/route.ts b/app/api/streams/[id]/start/route.ts index cf76a56..3ec7fae 100644 --- a/app/api/streams/[id]/start/route.ts +++ b/app/api/streams/[id]/start/route.ts @@ -1,3 +1,8 @@ +import { NextResponse } from "next/server"; +import { db } from "@/app/lib/db"; +import { logger } from "@/app/lib/logger"; +import { getCorrelationContext } from "@/app/lib/correlation-middleware"; +import { redact } from "@/app/lib/privacy"; import { NextRequest, NextResponse } from "next/server"; import { db, idempotencyToken, withLock } from "@/app/lib/db"; import { getCorrelationContext } from "@/app/lib/logger"; @@ -35,6 +40,31 @@ export async function POST( { params }: { params: Promise<{ id: string }> } ) { const { id } = await params; + const correlationId = getCorrelationContext()?.correlationId || "unknown"; + + const stream = db.streams.get(id); + if (!stream) { + logger.warn("Stream not found for start action", { correlationId, streamId: id }); + return createErrorResponse("STREAM_NOT_FOUND", `Stream '${id}' not found`, 404); + } + if (stream.status !== "draft") { + logger.warn("Invalid stream state for start action", { correlationId, streamId: id, status: stream.status }); + return createErrorResponse("INVALID_STREAM_STATE", "Only draft streams can be started", 409); + } + stream.status = "active"; + stream.nextAction = "pause"; + stream.updatedAt = new Date().toISOString(); + db.streams.set(id, stream); + + logger.info("Stream started successfully", { + correlationId, + streamId: id, + action: "start", + status: "success", + stream: redact(stream) + }); + + return NextResponse.json({ data: stream }); const url = getRequestUrl(request, `/api/streams/${id}/start`); const limitType = getLimitForRoute("POST", url.pathname); const identity = getClientIdentity(request); diff --git a/app/api/streams/[id]/stop/route.ts b/app/api/streams/[id]/stop/route.ts index 0cbb1da..688a6ce 100644 --- a/app/api/streams/[id]/stop/route.ts +++ b/app/api/streams/[id]/stop/route.ts @@ -1,4 +1,8 @@ import { NextResponse } from "next/server"; +import { db } from "@/app/lib/db"; +import { logger } from "@/app/lib/logger"; +import { getCorrelationContext } from "@/app/lib/correlation-middleware"; +import { redact } from "@/app/lib/privacy"; import { recordPrivilegedStreamAuditEvent } from "@/app/lib/audit-log"; import { db, idempotencyToken, withLock } from "@/app/lib/db"; import { getCorrelationContext } from "@/app/lib/logger"; @@ -29,6 +33,31 @@ export async function POST( { params }: { params: Promise<{ id: string }> } ) { const { id } = await params; + const correlationId = getCorrelationContext()?.correlationId || "unknown"; + + const stream = db.streams.get(id); + if (!stream) { + logger.warn("Stream not found for stop action", { correlationId, streamId: id }); + return createErrorResponse("STREAM_NOT_FOUND", `Stream '${id}' not found`, 404); + } + if (stream.status !== "active" && stream.status !== "draft") { + logger.warn("Invalid stream state for stop action", { correlationId, streamId: id, status: stream.status }); + return createErrorResponse("INVALID_STREAM_STATE", "Only active or draft streams can be stopped", 409); + } + stream.status = "ended"; + stream.nextAction = "withdraw"; + stream.updatedAt = new Date().toISOString(); + db.streams.set(id, stream); + + logger.info("Stream stopped successfully", { + correlationId, + streamId: id, + action: "stop", + status: "success", + stream: redact(stream) + }); + + return NextResponse.json({ data: stream }); const url = getRequestUrl(request, `/api/streams/${id}/stop`); const limitType = getLimitForRoute("POST", url.pathname); const identity = getClientIdentity(request); diff --git a/app/api/streams/[id]/withdraw/route.ts b/app/api/streams/[id]/withdraw/route.ts index 2998ca9..fc5947c 100644 --- a/app/api/streams/[id]/withdraw/route.ts +++ b/app/api/streams/[id]/withdraw/route.ts @@ -1,4 +1,8 @@ import { NextResponse } from "next/server"; +import { db } from "@/app/lib/db"; +import { logger } from "@/app/lib/logger"; +import { getCorrelationContext } from "@/app/lib/correlation-middleware"; +import { redact } from "@/app/lib/privacy"; import { recordPrivilegedStreamAuditEvent } from "@/app/lib/audit-log"; import { db, idempotencyToken, withLock } from "@/app/lib/db"; import { getCorrelationContext } from "@/app/lib/logger"; @@ -30,6 +34,31 @@ export async function POST( { params }: { params: Promise<{ id: string }> } ) { const { id } = await params; + const correlationId = getCorrelationContext()?.correlationId || "unknown"; + + const stream = db.streams.get(id); + if (!stream) { + logger.warn("Stream not found for withdraw action", { correlationId, streamId: id }); + return createErrorResponse("STREAM_NOT_FOUND", `Stream '${id}' not found`, 404); + } + if (stream.status !== "ended") { + logger.warn("Invalid stream state for withdraw action", { correlationId, streamId: id, status: stream.status }); + return createErrorResponse("INVALID_STREAM_STATE", "Only ended streams can be withdrawn from", 409); + } + stream.status = "withdrawn"; + stream.nextAction = undefined; + stream.updatedAt = new Date().toISOString(); + db.streams.set(id, stream); + + logger.info("Stream withdrawn successfully", { + correlationId, + streamId: id, + action: "withdraw", + status: "success", + stream: redact(stream) + }); + + return NextResponse.json({ data: stream }); const url = getRequestUrl(request, `/api/streams/${id}/withdraw`); const limitType = getLimitForRoute("POST", url.pathname); const identity = getClientIdentity(request); diff --git a/app/lib/correlation-middleware.test.ts b/app/lib/correlation-middleware.test.ts index 4f8e90a..7e9c905 100644 --- a/app/lib/correlation-middleware.test.ts +++ b/app/lib/correlation-middleware.test.ts @@ -1,3 +1,16 @@ +import { getCorrelationContext, runWithCorrelation } from "./correlation-middleware"; + +describe("correlation-middleware", () => { + it("propagates correlation context", () => { + runWithCorrelation("test-correlation-id", () => { + const context = getCorrelationContext(); + expect(context?.correlationId).toBe("test-correlation-id"); + }); + }); + + it("returns undefined outside of correlation context", () => { + const context = getCorrelationContext(); + expect(context).toBeUndefined(); import { describe, it, expect, beforeEach, afterEach, jest } from '@jest/globals'; import { NextRequest, NextResponse } from 'next/server'; import { diff --git a/app/lib/correlation-middleware.ts b/app/lib/correlation-middleware.ts index 219d3c4..a899e39 100644 --- a/app/lib/correlation-middleware.ts +++ b/app/lib/correlation-middleware.ts @@ -1,3 +1,13 @@ +import { AsyncLocalStorage } from 'async_hooks'; + +const correlationStorage = new AsyncLocalStorage<{ correlationId: string }>(); + +export function getCorrelationContext() { + return correlationStorage.getStore(); +} + +export function runWithCorrelation(correlationId: string, callback: () => T): T { + return correlationStorage.run({ correlationId }, callback); import { NextRequest, NextResponse } from 'next/server'; import { extractCorrelationContext, withCorrelationContext, logger, updateCorrelationContext } from '@/app/lib/logger'; diff --git a/app/lib/logger.test.ts b/app/lib/logger.test.ts index e8b0c4a..d30b1b7 100644 --- a/app/lib/logger.test.ts +++ b/app/lib/logger.test.ts @@ -1,3 +1,13 @@ +import { logger } from "./logger"; + +describe("logger", () => { + it("logs info message", () => { + const spy = jest.spyOn(console, 'log').mockImplementation(); + logger.info("test message", { key: "value" }); + expect(spy).toHaveBeenCalledWith(expect.stringContaining('"level":"info"')); + expect(spy).toHaveBeenCalledWith(expect.stringContaining('"message":"test message"')); + expect(spy).toHaveBeenCalledWith(expect.stringContaining('"key":"value"')); + spy.mockRestore(); import { describe, it, expect, beforeEach, afterEach, jest } from '@jest/globals'; import { correlationContext, diff --git a/app/lib/logger.ts b/app/lib/logger.ts index a6daced..8f95d20 100644 --- a/app/lib/logger.ts +++ b/app/lib/logger.ts @@ -1,3 +1,14 @@ +export const logger = { + info: (message: string, context?: Record) => { + console.log(JSON.stringify({ level: 'info', message, ...context, timestamp: new Date().toISOString() })); + }, + warn: (message: string, context?: Record) => { + console.warn(JSON.stringify({ level: 'warn', message, ...context, timestamp: new Date().toISOString() })); + }, + error: (message: string, context?: Record) => { + console.error(JSON.stringify({ level: 'error', message, ...context, timestamp: new Date().toISOString() })); + }, +}; import { AsyncLocalStorage } from 'node:async_hooks'; import { isSecret, redactSecrets } from './config'; diff --git a/app/lib/privacy.test.ts b/app/lib/privacy.test.ts new file mode 100644 index 0000000..6db6964 --- /dev/null +++ b/app/lib/privacy.test.ts @@ -0,0 +1,21 @@ +import { redact } from "./privacy"; + +describe("privacy", () => { + it("redacts sensitive keys", () => { + const data = { + publicKey: "G123456789", + signature: "some-sig", + email: "test@example.org", + amount: "100", + nested: { + secret: "top-secret" + } + }; + const redacted = redact(data); + expect(redacted.publicKey).toBe("[REDACTED]"); + expect(redacted.signature).toBe("[REDACTED]"); + expect(redacted.email).toBe("[REDACTED]"); + expect(redacted.amount).toBe("100"); + expect(redacted.nested.secret).toBe("[REDACTED]"); + }); +}); diff --git a/app/lib/privacy.ts b/app/lib/privacy.ts index c269cbf..eee2516 100644 --- a/app/lib/privacy.ts +++ b/app/lib/privacy.ts @@ -1,3 +1,15 @@ +export function redact(data: any): any { + if (typeof data !== 'object' || data === null) return data; + const redacted = { ...data }; + const keysToRedact = ['signature', 'publicKey', 'secret', 'password', 'token', 'email']; + for (const key in redacted) { + if (keysToRedact.includes(key.toLowerCase())) { + redacted[key] = '[REDACTED]'; + } else if (typeof redacted[key] === 'object') { + redacted[key] = redact(redacted[key]); + } + } + return redacted; import { Stream, User } from "@/app/types/openapi"; import { getStore } from "./db";