Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
105 changes: 95 additions & 10 deletions apps/api/src/__tests__/notes.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1593,6 +1593,31 @@ describe("Chunked upload flow", () => {
expect(res.status).toBe(500);
expect((await res.json()).error).toBe("Corrupted upload session");
});

it("returns 500 when metadata is valid JSON but has the wrong shape", async () => {
const { json: initJson } = await initUpload({ chunkCount: 1 });
const chunk = chunkData("test-chunk");
await app.request(`/api/v1/notes/upload/${initJson.uploadId}/chunks/0`, {
method: "PUT",
headers: { "Content-Type": "application/octet-stream", "X-Chunk-Hash": sha256hex(chunk) },
body: chunk as BodyInit,
});

const { uploads: uploadsTable } = await import("../db/schema.js");
const { eq } = await import("drizzle-orm");
// Valid JSON but missing every required metadata field.
db.update(uploadsTable)
.set({ metadata: JSON.stringify({ unexpected: "shape" }) })
.where(eq(uploadsTable.id, initJson.uploadId))
.run();

const res = await app.request(`/api/v1/notes/upload/${initJson.uploadId}/complete`, {
method: "POST",
headers: authHeaders(),
});
expect(res.status).toBe(500);
expect((await res.json()).error).toBe("Corrupted upload session");
});
});

describe("GET /api/v1/notes/:id/stream", () => {
Expand Down Expand Up @@ -1850,6 +1875,30 @@ describe("GET /api/v1/notes/:id/stream", () => {
expect(res.headers.get("X-Salt")).toBe(testSalt);
expect(res.headers.get("X-Has-Password")).toBe("true");
});

it("streams chunks in order with read-ahead prefetching (more chunks than window)", async () => {
const chunkCount = 6;
const { id } = await createChunkedNote({ chunkCount });

const res = await app.request(`/api/v1/notes/${id}/stream`);
expect(res.status).toBe(200);
expect(res.headers.get("X-Chunk-Count")).toBe(String(chunkCount));

// Decode the length-prefixed framing and verify every chunk arrives
// in order with its original (client-encrypted) content.
const body = new Uint8Array(await res.arrayBuffer());
const view = new DataView(body.buffer, body.byteOffset, body.byteLength);
const frames: string[] = [];
let offset = 0;
while (offset < body.byteLength) {
const len = view.getUint32(offset);
offset += 4;
frames.push(Buffer.from(body.subarray(offset, offset + len)).toString());
offset += len;
}

expect(frames).toEqual(Array.from({ length: chunkCount }, (_, i) => `chunk-data-${String(i)}`));
});
});

describe("GET /api/v1/notes/:id/stream edge cases", () => {
Expand Down Expand Up @@ -1885,6 +1934,7 @@ describe("GET /api/v1/notes/:id/stream edge cases", () => {

async function createChunkedNoteViaApp(
targetApp: ReturnType<typeof createAppWithCustomStorage>,
chunkCount = 1,
): Promise<{ id: string; deleteToken: string }> {
const initRes = await targetApp.request("/api/v1/notes/upload/init", {
method: "POST",
Expand All @@ -1896,20 +1946,22 @@ describe("GET /api/v1/notes/:id/stream edge cases", () => {
expiresIn: 3600,
maxReads: 0,
fileCount: 1,
chunkCount: 1,
chunkCount,
}),
});
const { uploadId } = (await initRes.json()) as { uploadId: string };

const chunk = chunkData("chunk-data-0");
await targetApp.request(`/api/v1/notes/upload/${uploadId}/chunks/0`, {
method: "PUT",
headers: {
"Content-Type": "application/octet-stream",
"X-Chunk-Hash": sha256hex(chunk),
},
body: chunk as BodyInit,
});
for (let i = 0; i < chunkCount; i++) {
const chunk = chunkData(`chunk-data-${String(i)}`);
await targetApp.request(`/api/v1/notes/upload/${uploadId}/chunks/${String(i)}`, {
method: "PUT",
headers: {
"Content-Type": "application/octet-stream",
"X-Chunk-Hash": sha256hex(chunk),
},
body: chunk as BodyInit,
});
}

const completeRes = await targetApp.request(`/api/v1/notes/upload/${uploadId}/complete`, {
method: "POST",
Expand All @@ -1918,6 +1970,39 @@ describe("GET /api/v1/notes/:id/stream edge cases", () => {
return completeRes.json() as Promise<{ id: string; deleteToken: string }>;
}

it("errors the stream when a later chunk read fails mid-stream", async () => {
const realStorage = new LocalStorage(TEST_FILES_PATH);
// Create a 2-chunk note using real storage first
const { id } = await createChunkedNoteViaApp(createAppWithCustomStorage(db, realStorage), 2);

// Stream through a storage whose chunk-1 read fails (chunk 0 still works,
// so the pre-flight check passes and headers are already sent).
const failingStorage: StorageBackend = {
save: (noteId, data) => realStorage.save(noteId, data),
read: (key) => realStorage.read(key),
delete: (key) => realStorage.delete(key),
saveChunk: (noteId, idx, data) => realStorage.saveChunk(noteId, idx, data),
readChunk: (noteId, idx) =>
idx === 0
? realStorage.readChunk(noteId, idx)
: Promise.reject(new Error("mid-stream read failure")),
deleteChunks: (noteId, cnt) => realStorage.deleteChunks(noteId, cnt),
};
const failApp = createAppWithCustomStorage(db, failingStorage);

const res = await failApp.request(`/api/v1/notes/${id}/stream`);
expect(res.status).toBe(200); // Headers committed before the failure

// Consuming the body must surface the stream error
let streamFailed = false;
try {
await res.arrayBuffer();
} catch {
streamFailed = true;
}
expect(streamFailed).toBe(true);
});

it("handles corrupted chunk data (too small for auth tag)", async () => {
const realStorage = new LocalStorage(TEST_FILES_PATH);
// Create note using real storage first
Expand Down
8 changes: 4 additions & 4 deletions apps/api/src/middleware/rateLimit.ts
Original file line number Diff line number Diff line change
Expand Up @@ -96,15 +96,15 @@ export function createRateLimit(options: RateLimitOptions): RateLimitResult {

if (existing === undefined || existing.resetAt <= now) {
if (store.size >= MAX_STORE_SIZE && existing === undefined) {
let evicted = false;
// Bulk-evict every expired entry in one sweep: freeing all reusable
// slots at once (instead of one per request) keeps a full store from
// rejecting new clients under sustained traffic.
for (const [key, entry] of store) {
if (entry.resetAt <= now) {
store.delete(key);
evicted = true;
break;
}
}
if (!evicted) {
if (store.size >= MAX_STORE_SIZE) {
return c.json({ error: "Too many requests" }, 429);
}
}
Expand Down
54 changes: 42 additions & 12 deletions apps/api/src/routes/notes/chunked.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@ import type { OpenAPIHono } from "@hono/zod-openapi";
import { SECRETSTREAM_ABYTES, serverDecrypt, serverEncrypt } from "@secret/crypto";
import {
chunkedUploadInitSchema,
isValidNoteId,
NOTE_ID_LENGTH,
UPLOAD_ID_LENGTH,
UPLOAD_SESSION_TTL,
uploadSessionMetadataSchema,
} from "@secret/shared";
import { eq } from "drizzle-orm";
import { nanoid } from "nanoid";
Expand Down Expand Up @@ -149,20 +151,20 @@ export function registerChunkedRoutes(app: OpenAPIHono<NotesEnv>): void {
return c.json({ error: "Upload incomplete" }, 400);
}

let meta: {
streamHeader: string;
clientNonce: string;
hasPassword: boolean;
expiresIn: number;
maxReads: number;
fileCount: number;
salt?: string;
};
// Re-validate the persisted metadata instead of trusting it blindly:
// a corrupted row (bad JSON or missing fields) must not produce a
// malformed note.
let parsedMetadata: unknown;
try {
meta = JSON.parse(session.metadata) as typeof meta;
parsedMetadata = JSON.parse(session.metadata);
} catch {
return c.json({ error: "Corrupted upload session" }, 500);
}
const metaResult = uploadSessionMetadataSchema.safeParse(parsedMetadata);
if (!metaResult.success) {
return c.json({ error: "Corrupted upload session" }, 500);
}
const meta = metaResult.data;

const now = new Date();
const expiresAt = new Date(now.getTime() + meta.expiresIn * 1000);
Expand Down Expand Up @@ -219,7 +221,7 @@ export function registerChunkedRoutes(app: OpenAPIHono<NotesEnv>): void {

app.get("/:id/stream", async (c) => {
const id = c.req.param("id");
if (!id || !/^[A-Za-z0-9_-]+$/.test(id) || id.length !== NOTE_ID_LENGTH) {
if (!id || !isValidNoteId(id)) {
return c.json({ error: "Invalid note ID" }, 400);
}

Expand Down Expand Up @@ -268,11 +270,39 @@ export function registerChunkedRoutes(app: OpenAPIHono<NotesEnv>): void {
throw err;
}

// Read-ahead window: keep a few chunk reads in flight so per-chunk
// storage latency (one round-trip per chunk on S3) overlaps with
// decryption and streaming instead of accumulating sequentially.
// Rejections are captured as values so an un-awaited prefetch can
// never surface as an unhandled promise rejection.
const PREFETCH_CHUNKS = 4;
type ChunkRead = { ok: true; data: Buffer } | { ok: false; reason: unknown };
const readChunkSafe = (index: number): Promise<ChunkRead> =>
(index === 0 ? Promise.resolve(firstChunk) : storage.readChunk(id, index)).then(
(data) => ({ ok: true as const, data }),
(reason: unknown) => ({ ok: false as const, reason }),
);

const stream = new ReadableStream({
async start(controller) {
try {
const readAhead: Promise<ChunkRead>[] = [];
let nextToFetch = 0;

for (let i = 0; i < chunkCount; i++) {
const storedData = i === 0 ? firstChunk : await storage.readChunk(id, i);
// Top up the read-ahead window.
while (nextToFetch < chunkCount && readAhead.length < PREFETCH_CHUNKS) {
readAhead.push(readChunkSafe(nextToFetch));
nextToFetch += 1;
}

const chunkRead = await (readAhead.shift() as Promise<ChunkRead>);
if (!chunkRead.ok) {
controller.error(chunkRead.reason);
return;
}

const storedData = chunkRead.data;
const iv = storedData.subarray(0, IV_LENGTH);
const encrypted = storedData.subarray(IV_LENGTH);

Expand Down
12 changes: 6 additions & 6 deletions apps/api/src/routes/notes/openapi-routes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,17 @@ import {
createNoteResponseSchema,
createNoteSchema,
deleteNoteResponseSchema,
NOTE_ID_LENGTH,
noteExistsResponseSchema,
noteIdSchema,
noteNotFoundResponseSchema,
readNoteResponseSchema,
} from "@secret/shared";

const noteIdParam = z
.string()
.regex(/^[A-Za-z0-9_-]+$/, "Invalid note ID format")
.length(NOTE_ID_LENGTH, `Note ID must be ${String(NOTE_ID_LENGTH)} characters`)
.openapi({ param: { name: "id", in: "path" }, example: "aBcDeFgHiJkL" });
// Reuse the shared note ID schema so the format is defined in one place.
const noteIdParam = noteIdSchema.openapi({
param: { name: "id", in: "path" },
example: "aBcDeFgHiJkL",
});

export const createNoteRoute = createRoute({
method: "post",
Expand Down
4 changes: 2 additions & 2 deletions apps/api/src/routes/notes/standard.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { timingSafeEqual } from "node:crypto";
import type { OpenAPIHono } from "@hono/zod-openapi";
import { createNoteMultipartSchema, NOTE_ID_LENGTH } from "@secret/shared";
import { createNoteMultipartSchema, isValidNoteId } from "@secret/shared";
import { eq } from "drizzle-orm";
import { notes } from "../../db/schema.js";
import { deleteOrSchedule } from "../../pendingDeletions.js";
Expand Down Expand Up @@ -128,7 +128,7 @@ export function registerStandardRoutes(app: OpenAPIHono<NotesEnv>): void {

app.get("/:id/raw", async (c) => {
const id = c.req.param("id");
if (!id || !/^[A-Za-z0-9_-]+$/.test(id) || id.length !== NOTE_ID_LENGTH) {
if (!id || !isValidNoteId(id)) {
return c.json({ error: "Invalid note ID" }, 400);
}

Expand Down
10 changes: 2 additions & 8 deletions apps/api/src/storage/local.ts
Original file line number Diff line number Diff line change
Expand Up @@ -82,15 +82,9 @@ export class LocalStorage implements StorageBackend {

async deleteChunks(noteId: string, chunkCount: number): Promise<void> {
assertChunkCount(chunkCount);
// All chunks of a note live under its own directory, so a single
// recursive removal replaces per-chunk unlink round-trips.
const dirPath = this.assertSafePath(join(this.filesPath, noteId));
for (let i = 0; i < chunkCount; i++) {
try {
const filePath = this.assertSafePath(join(dirPath, `chunk_${String(i)}`));
await unlink(filePath);
} catch {
/* chunk already deleted or missing */
}
}
try {
await rm(dirPath, { recursive: true });
} catch {
Expand Down
Loading