Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions app/api/streams/[id]/pause/route.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
import { NextResponse } from "next/server";
import { db } from "@/app/lib/db";
import { logger } from "@/app/lib/logger";
import { getCorrelationContext } from "@/app/lib/correlation-middleware";
import { redact } from "@/app/lib/privacy";
/**
* POST /api/streams/[id]/pause
*
Expand Down Expand Up @@ -57,6 +62,31 @@ export async function POST(
{ params }: { params: Promise<{ id: string }> },
): Promise<NextResponse> {
const { id } = await params;
const correlationId = getCorrelationContext()?.correlationId || "unknown";

const stream = db.streams.get(id);
if (!stream) {
logger.warn("Stream not found for pause action", { correlationId, streamId: id });
return createErrorResponse("STREAM_NOT_FOUND", `Stream '${id}' not found`, 404);
}
if (stream.status !== "active") {
logger.warn("Invalid stream state for pause action", { correlationId, streamId: id, status: stream.status });
return createErrorResponse("INVALID_STREAM_STATE", "Only active streams can be paused", 409);
}
stream.status = "paused";
stream.nextAction = "start";
stream.updatedAt = new Date().toISOString();
db.streams.set(id, stream);

logger.info("Stream paused successfully", {
correlationId,
streamId: id,
action: "pause",
status: "success",
stream: redact(stream)
});

return NextResponse.json({ data: stream });
const url = getRequestUrl(req, `/api/streams/${id}/pause`);
const idempotencyKey = getHeader(req, "Idempotency-Key");

Expand Down
38 changes: 38 additions & 0 deletions app/api/streams/[id]/settle/route.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
import { NextResponse } from "next/server";
import { db } from "@/app/lib/db";
import { logger } from "@/app/lib/logger";
import { getCorrelationContext } from "@/app/lib/correlation-middleware";
import { redact } from "@/app/lib/privacy";
import { recordPrivilegedStreamAuditEvent } from "@/app/lib/audit-log";
import { db, idempotencyToken, withLock } from "@/app/lib/db";
import { getCorrelationContext } from "@/app/lib/logger";
Expand All @@ -20,6 +24,40 @@ export async function POST(
{ params }: { params: Promise<{ id: string }> }
) {
const { id } = await params;
const correlationId = getCorrelationContext()?.correlationId || "unknown";

const stream = db.streams.get(id);
if (!stream) {
logger.warn("Stream not found for settle action", { correlationId, streamId: id });
return createErrorResponse("STREAM_NOT_FOUND", `Stream '${id}' not found`, 404);
}
if (stream.status !== "active" && stream.status !== "paused") {
logger.warn("Invalid stream state for settle action", { correlationId, streamId: id, status: stream.status });
return createErrorResponse("INVALID_STREAM_STATE", "Only active or paused streams can be settled", 409);
}
stream.status = "ended";
stream.nextAction = "withdraw";
stream.updatedAt = new Date().toISOString();
db.streams.set(id, stream);

const settlement = {
txHash: `fake-tx-${crypto.randomUUID().slice(0, 8)}`,
settledAt: new Date().toISOString(),
};

logger.info("Stream settled successfully", {
correlationId,
streamId: id,
action: "settle",
status: "success",
stream: redact({ ...stream, settlement })
});

return NextResponse.json({
data: {
...stream,
settlement,
},
const idempotencyKey = getHeader(request, "Idempotency-Key");
const token = idempotencyKey ? idempotencyToken(`streams.settle.${id}`, idempotencyKey) : null;

Expand Down
30 changes: 30 additions & 0 deletions app/api/streams/[id]/start/route.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
import { NextResponse } from "next/server";
import { db } from "@/app/lib/db";
import { logger } from "@/app/lib/logger";
import { getCorrelationContext } from "@/app/lib/correlation-middleware";
import { redact } from "@/app/lib/privacy";
import { NextRequest, NextResponse } from "next/server";
import { db, idempotencyToken, withLock } from "@/app/lib/db";
import { getCorrelationContext } from "@/app/lib/logger";
Expand Down Expand Up @@ -35,6 +40,31 @@ export async function POST(
{ params }: { params: Promise<{ id: string }> }
) {
const { id } = await params;
const correlationId = getCorrelationContext()?.correlationId || "unknown";

const stream = db.streams.get(id);
if (!stream) {
logger.warn("Stream not found for start action", { correlationId, streamId: id });
return createErrorResponse("STREAM_NOT_FOUND", `Stream '${id}' not found`, 404);
}
if (stream.status !== "draft") {
logger.warn("Invalid stream state for start action", { correlationId, streamId: id, status: stream.status });
return createErrorResponse("INVALID_STREAM_STATE", "Only draft streams can be started", 409);
}
stream.status = "active";
stream.nextAction = "pause";
stream.updatedAt = new Date().toISOString();
db.streams.set(id, stream);

logger.info("Stream started successfully", {
correlationId,
streamId: id,
action: "start",
status: "success",
stream: redact(stream)
});

return NextResponse.json({ data: stream });
const url = getRequestUrl(request, `/api/streams/${id}/start`);
const limitType = getLimitForRoute("POST", url.pathname);
const identity = getClientIdentity(request);
Expand Down
29 changes: 29 additions & 0 deletions app/api/streams/[id]/stop/route.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
import { NextResponse } from "next/server";
import { db } from "@/app/lib/db";
import { logger } from "@/app/lib/logger";
import { getCorrelationContext } from "@/app/lib/correlation-middleware";
import { redact } from "@/app/lib/privacy";
import { recordPrivilegedStreamAuditEvent } from "@/app/lib/audit-log";
import { db, idempotencyToken, withLock } from "@/app/lib/db";
import { getCorrelationContext } from "@/app/lib/logger";
Expand Down Expand Up @@ -29,6 +33,31 @@ export async function POST(
{ params }: { params: Promise<{ id: string }> }
) {
const { id } = await params;
const correlationId = getCorrelationContext()?.correlationId || "unknown";

const stream = db.streams.get(id);
if (!stream) {
logger.warn("Stream not found for stop action", { correlationId, streamId: id });
return createErrorResponse("STREAM_NOT_FOUND", `Stream '${id}' not found`, 404);
}
if (stream.status !== "active" && stream.status !== "draft") {
logger.warn("Invalid stream state for stop action", { correlationId, streamId: id, status: stream.status });
return createErrorResponse("INVALID_STREAM_STATE", "Only active or draft streams can be stopped", 409);
}
stream.status = "ended";
stream.nextAction = "withdraw";
stream.updatedAt = new Date().toISOString();
db.streams.set(id, stream);

logger.info("Stream stopped successfully", {
correlationId,
streamId: id,
action: "stop",
status: "success",
stream: redact(stream)
});

return NextResponse.json({ data: stream });
const url = getRequestUrl(request, `/api/streams/${id}/stop`);
const limitType = getLimitForRoute("POST", url.pathname);
const identity = getClientIdentity(request);
Expand Down
29 changes: 29 additions & 0 deletions app/api/streams/[id]/withdraw/route.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
import { NextResponse } from "next/server";
import { db } from "@/app/lib/db";
import { logger } from "@/app/lib/logger";
import { getCorrelationContext } from "@/app/lib/correlation-middleware";
import { redact } from "@/app/lib/privacy";
import { recordPrivilegedStreamAuditEvent } from "@/app/lib/audit-log";
import { db, idempotencyToken, withLock } from "@/app/lib/db";
import { getCorrelationContext } from "@/app/lib/logger";
Expand Down Expand Up @@ -30,6 +34,31 @@ export async function POST(
{ params }: { params: Promise<{ id: string }> }
) {
const { id } = await params;
const correlationId = getCorrelationContext()?.correlationId || "unknown";

const stream = db.streams.get(id);
if (!stream) {
logger.warn("Stream not found for withdraw action", { correlationId, streamId: id });
return createErrorResponse("STREAM_NOT_FOUND", `Stream '${id}' not found`, 404);
}
if (stream.status !== "ended") {
logger.warn("Invalid stream state for withdraw action", { correlationId, streamId: id, status: stream.status });
return createErrorResponse("INVALID_STREAM_STATE", "Only ended streams can be withdrawn from", 409);
}
stream.status = "withdrawn";
stream.nextAction = undefined;
stream.updatedAt = new Date().toISOString();
db.streams.set(id, stream);

logger.info("Stream withdrawn successfully", {
correlationId,
streamId: id,
action: "withdraw",
status: "success",
stream: redact(stream)
});

return NextResponse.json({ data: stream });
const url = getRequestUrl(request, `/api/streams/${id}/withdraw`);
const limitType = getLimitForRoute("POST", url.pathname);
const identity = getClientIdentity(request);
Expand Down
13 changes: 13 additions & 0 deletions app/lib/correlation-middleware.test.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,16 @@
import { getCorrelationContext, runWithCorrelation } from "./correlation-middleware";

describe("correlation-middleware", () => {
it("propagates correlation context", () => {
runWithCorrelation("test-correlation-id", () => {
const context = getCorrelationContext();
expect(context?.correlationId).toBe("test-correlation-id");
});
});

it("returns undefined outside of correlation context", () => {
const context = getCorrelationContext();
expect(context).toBeUndefined();
import { describe, it, expect, beforeEach, afterEach, jest } from '@jest/globals';
import { NextRequest, NextResponse } from 'next/server';
import {
Expand Down
10 changes: 10 additions & 0 deletions app/lib/correlation-middleware.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,13 @@
import { AsyncLocalStorage } from 'async_hooks';

const correlationStorage = new AsyncLocalStorage<{ correlationId: string }>();

export function getCorrelationContext() {
return correlationStorage.getStore();
}

export function runWithCorrelation<T>(correlationId: string, callback: () => T): T {
return correlationStorage.run({ correlationId }, callback);
import { NextRequest, NextResponse } from 'next/server';
import { extractCorrelationContext, withCorrelationContext, logger, updateCorrelationContext } from '@/app/lib/logger';

Expand Down
10 changes: 10 additions & 0 deletions app/lib/logger.test.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,13 @@
import { logger } from "./logger";

describe("logger", () => {
it("logs info message", () => {
const spy = jest.spyOn(console, 'log').mockImplementation();
logger.info("test message", { key: "value" });
expect(spy).toHaveBeenCalledWith(expect.stringContaining('"level":"info"'));
expect(spy).toHaveBeenCalledWith(expect.stringContaining('"message":"test message"'));
expect(spy).toHaveBeenCalledWith(expect.stringContaining('"key":"value"'));
spy.mockRestore();
import { describe, it, expect, beforeEach, afterEach, jest } from '@jest/globals';
import {
correlationContext,
Expand Down
11 changes: 11 additions & 0 deletions app/lib/logger.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,14 @@
export const logger = {
info: (message: string, context?: Record<string, any>) => {
console.log(JSON.stringify({ level: 'info', message, ...context, timestamp: new Date().toISOString() }));
},
warn: (message: string, context?: Record<string, any>) => {
console.warn(JSON.stringify({ level: 'warn', message, ...context, timestamp: new Date().toISOString() }));
},
error: (message: string, context?: Record<string, any>) => {
console.error(JSON.stringify({ level: 'error', message, ...context, timestamp: new Date().toISOString() }));
},
};
import { AsyncLocalStorage } from 'node:async_hooks';
import { isSecret, redactSecrets } from './config';

Expand Down
21 changes: 21 additions & 0 deletions app/lib/privacy.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
import { redact } from "./privacy";

describe("privacy", () => {
it("redacts sensitive keys", () => {
const data = {
publicKey: "G123456789",
signature: "some-sig",
email: "test@example.org",
amount: "100",
nested: {
secret: "top-secret"
}
};
const redacted = redact(data);
expect(redacted.publicKey).toBe("[REDACTED]");
expect(redacted.signature).toBe("[REDACTED]");
expect(redacted.email).toBe("[REDACTED]");
expect(redacted.amount).toBe("100");
expect(redacted.nested.secret).toBe("[REDACTED]");
});
});
12 changes: 12 additions & 0 deletions app/lib/privacy.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,15 @@
export function redact(data: any): any {
if (typeof data !== 'object' || data === null) return data;
const redacted = { ...data };
const keysToRedact = ['signature', 'publicKey', 'secret', 'password', 'token', 'email'];
for (const key in redacted) {
if (keysToRedact.includes(key.toLowerCase())) {
redacted[key] = '[REDACTED]';
} else if (typeof redacted[key] === 'object') {
redacted[key] = redact(redacted[key]);
}
}
return redacted;
import { Stream, User } from "@/app/types/openapi";
import { getStore } from "./db";

Expand Down
Loading