Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 39 additions & 0 deletions apps/api/src/handlers/documents/get/handler.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
import type { DocumentDetailResponse } from '@clouddocs/shared-types';

import { NotFoundError, ValidationError } from '../../../lib/errors';
import { DocumentsRepo, toDocument } from '../../../repositories/documents-repo';
import { AiAnalysesRepo, toAiAnalysis } from '../../../repositories/ai-analyses-repo';
import {
compose,
jsonResponse,
withActiveOrg,
withAuth,
withErrorHandler,
withRequestLogger,
withSecrets,
type LambdaHandler,
} from '../../../middlewares';

/** GET /v1/documents/{id} — document detail with its AI analyses. */
export const handler: LambdaHandler = withSecrets(
withRequestLogger(
compose(withErrorHandler)(
withAuth(
withActiveOrg(async (ctx) => {
const id = ctx.event.pathParameters?.['id'];
if (!id) throw new ValidationError('Document id missing from path.');

const doc = await new DocumentsRepo(ctx.orgId).findById(id);
if (!doc) throw new NotFoundError('Document not found.');

const analyses = await new AiAnalysesRepo(ctx.orgId).listForDocument(id);
const body: DocumentDetailResponse = {
document: toDocument(doc),
analyses: analyses.map(toAiAnalysis),
};
return jsonResponse(200, body);
}),
),
),
),
);
18 changes: 18 additions & 0 deletions apps/api/src/handlers/workers/_shared.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import type { AnalysisKind } from '@clouddocs/shared-types';
import type { DocumentExtractedDetail } from '../../lib/events';

/**
* The analysis kinds that must all exist before a document flips to `ready`.
* Both analyze workers reference this so the ready-join stays consistent.
*/
export const READY_KINDS: readonly AnalysisKind[] = ['summary', 'classification'];

export function isExtractedDetail(detail: unknown): detail is DocumentExtractedDetail {
return (
typeof detail === 'object' &&
detail !== null &&
typeof (detail as DocumentExtractedDetail).documentId === 'string' &&
typeof (detail as DocumentExtractedDetail).orgId === 'string' &&
typeof (detail as DocumentExtractedDetail).textS3Key === 'string'
);
}
42 changes: 42 additions & 0 deletions apps/api/src/handlers/workers/classify/handler.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/**
* classify-worker — triggered by the `DocumentExtracted` fan-out. Reads the
* extracted text, asks the AI provider to categorize it, stores an
* `ai_analyses` row, mirrors category + tags onto the document (for list
* filtering in Phase 5), and runs the ready-join.
*/
import { getAiProvider } from '../../../lib/ai';
import { getObjectText } from '../../../lib/storage/s3';
import { AiAnalysesRepo } from '../../../repositories/ai-analyses-repo';
import { DocumentsRepo } from '../../../repositories/documents-repo';
import { sqsWorker } from '../runner';
import { isExtractedDetail, READY_KINDS } from '../_shared';

export const handler = sqsWorker('classify', async (envelope, log) => {
if (!isExtractedDetail(envelope.detail)) {
log.warn({ envelope }, 'classify: unexpected event detail, skipping');
return;
}
const { orgId, documentId, textS3Key } = envelope.detail;

const docs = new DocumentsRepo(orgId);
await docs.markAnalyzing(documentId);

const text = await getObjectText(textS3Key);
const { data, usage } = await getAiProvider().classify(text);

const analyses = new AiAnalysesRepo(orgId);
await analyses.create({
documentId,
kind: 'classification',
model: usage.model,
promptVersion: usage.promptVersion,
...(usage.inputTokens != null ? { inputTokens: usage.inputTokens } : {}),
...(usage.outputTokens != null ? { outputTokens: usage.outputTokens } : {}),
...(usage.costUsd != null ? { costUsd: usage.costUsd } : {}),
result: data,
});
await docs.setClassification(documentId, { category: data.category, tags: data.tags });

const ready = await docs.markReadyIfAnalysesComplete(documentId, READY_KINDS, READY_KINDS.length);
log.info({ documentId, category: data.category, ready }, 'classify: done');
});
68 changes: 68 additions & 0 deletions apps/api/src/handlers/workers/extract/handler.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/**
* extract-worker — SQS-triggered (S3 ObjectCreated → EventBridge → ingest queue).
*
* Downloads the uploaded file, extracts plain text (pdf-parse / mammoth), stores
* it in S3, advances the document to `extracted`, and emits a `DocumentExtracted`
* event that fans out to the summarize + classify queues.
*/
import { DocumentsRepo } from '../../../repositories/documents-repo';
import { buildTextKey, getObjectBytes, putText } from '../../../lib/storage/s3';
import { extractText } from '../../../lib/extract/text-extractor';
import { publishDocumentExtracted } from '../../../lib/events';
import { sqsWorker, type EventBridgeEnvelope } from '../runner';

interface S3ObjectCreatedDetail {
bucket: { name: string };
object: { key: string };
}

/** raw-uploads/{orgId}/{documentId}/{filename} → ids. */
function parseRawKey(key: string): { orgId: string; documentId: string } | null {
const parts = decodeURIComponent(key).split('/');
if (parts.length < 4 || parts[0] !== 'raw-uploads') return null;
return { orgId: parts[1]!, documentId: parts[2]! };
}

export const handler = sqsWorker('extract', async (envelope: EventBridgeEnvelope, log) => {
const detail = envelope.detail as S3ObjectCreatedDetail;
const key = detail?.object?.key;
if (!key) {
log.warn({ envelope }, 'extract: event missing object key, skipping');
return;
}

const ids = parseRawKey(key);
if (!ids) {
log.warn({ key }, 'extract: key not under raw-uploads/, skipping');
return;
}
const { orgId, documentId } = ids;
const repo = new DocumentsRepo(orgId);

const doc = await repo.findById(documentId);
if (!doc) {
log.warn({ orgId, documentId }, 'extract: no document row for key, skipping');
return;
}

const claimed = await repo.claimForExtraction(documentId);
if (!claimed) {
log.info({ documentId, status: doc.status }, 'extract: already claimed/processed, skipping');
return;
}

try {
const bytes = await getObjectBytes(claimed.s3_key);
const { text, pageCount } = await extractText(bytes, claimed.mime_type);
const textS3Key = buildTextKey(orgId, documentId);
await putText(textS3Key, text);
await repo.markExtracted(documentId, { textS3Key, ...(pageCount ? { pageCount } : {}) });
await publishDocumentExtracted({ orgId, documentId, textS3Key });
log.info({ documentId, pageCount, chars: text.length }, 'extract: done');
} catch (err) {
// Terminal for this document: mark failed and consume the message rather
// than retry a file we can't parse. Transient infra blips are rare in dev.
log.error({ err, documentId }, 'extract: failed');
await repo.setStatus(documentId, 'failed', 'Text extraction failed.');
}
});
179 changes: 179 additions & 0 deletions apps/api/src/handlers/workers/pipeline.integration.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
/**
* Document AI pipeline integration test against a real Postgres.
*
* Same safety gate as the other suites (RUN_INTEGRATION=1 + localhost DB). S3,
* the text extractor and EventBridge are mocked, and the AI provider falls back
* to the deterministic MockAiProvider (no OPENAI_API_KEY), so the suite is
* offline and free — it exercises the worker → DB wiring and the ready-join.
*/
import { afterAll, beforeAll, beforeEach, describe, expect, it, vi } from 'vitest';
import type { Context, SQSEvent } from 'aws-lambda';

const RUN = process.env['RUN_INTEGRATION'] === '1';
const INTEGRATION_URL =
process.env['INTEGRATION_DATABASE_URL'] ??
'postgres://clouddocs:clouddocs@localhost:5434/clouddocs';
const LOCAL_HOST_RE = /(?:^|@|\/\/)(?:localhost|127\.0\.0\.1|0\.0\.0\.0)(?::|\/|$)/;
const URL_LOCAL = LOCAL_HOST_RE.test(INTEGRATION_URL);

if (RUN && URL_LOCAL) {
process.env['DATABASE_URL'] = INTEGRATION_URL;
process.env['UPLOADS_BUCKET'] = 'test-bucket';
// Force the mock AI provider regardless of what .env.local carries.
delete process.env['OPENAI_API_KEY'];
}

const EXTRACTED_TEXT = 'This is an INVOICE for consulting services. Total due: $1000.';

vi.mock('../../lib/storage/s3', async (importOriginal) => {
const actual = await importOriginal<typeof import('../../lib/storage/s3')>();
return {
...actual,
getObjectBytes: vi.fn(async () => Buffer.from('%PDF-1.4 fake')),
getObjectText: vi.fn(async () => EXTRACTED_TEXT),
putText: vi.fn(async () => undefined),
};
});
vi.mock('../../lib/extract/text-extractor', () => ({
extractText: vi.fn(async () => ({ text: EXTRACTED_TEXT, pageCount: 2 })),
}));
vi.mock('../../lib/events', async (importOriginal) => {
const actual = await importOriginal<typeof import('../../lib/events')>();
return { ...actual, publishDocumentExtracted: vi.fn(async () => undefined) };
});

import { closePool, query } from '../../lib/db/client';
import { buildRawKey, buildTextKey } from '../../lib/storage/s3';
import { DOCUMENT_EXTRACTED, EVENT_SOURCE } from '../../lib/events';
import { DocumentsRepo } from '../../repositories/documents-repo';
import { AiAnalysesRepo } from '../../repositories/ai-analyses-repo';
import { handler as extractHandler } from './extract/handler';
import { handler as summarizeHandler } from './summarize/handler';
import { handler as classifyHandler } from './classify/handler';

const lambdaCtx = {} as Context;
const noCb = undefined as never;

/** Wrap an EventBridge envelope as a one-record SQS event. */
function sqsEventFor(envelope: unknown): SQSEvent {
return {
Records: [
{
messageId: 'm1',
receiptHandle: 'r1',
body: JSON.stringify(envelope),
attributes: {} as never,
messageAttributes: {},
md5OfBody: '',
eventSource: 'aws:sqs',
eventSourceARN: 'arn:test',
awsRegion: 'sa-east-1',
},
],
};
}

async function seedDoc(): Promise<{ orgId: string; docId: string; s3Key: string }> {
const orgs = await query<{ id: string }>(
`INSERT INTO organizations (name, slug) VALUES ('Acme', 'acme') RETURNING id`,
);
const orgId = orgs[0]!.id;
const users = await query<{ id: string }>(
`INSERT INTO users (email, password_hash) VALUES ('demo@test.com', 'x') RETURNING id`,
);
const userId = users[0]!.id;
await query(`INSERT INTO memberships (user_id, org_id, role) VALUES ($1, $2, 'owner')`, [
userId,
orgId,
]);
const repo = new DocumentsRepo(orgId);
const crypto = await import('node:crypto');
const docId = crypto.randomUUID();
const s3Key = buildRawKey(orgId, docId, 'invoice.pdf');
await repo.create({
id: docId,
uploadedBy: userId,
filename: 'invoice.pdf',
mimeType: 'application/pdf',
sizeBytes: 1234,
s3Key,
});
await repo.setStatus(docId, 'uploaded');
return { orgId, docId, s3Key };
}

describe.skipIf(!RUN || !URL_LOCAL)('document AI pipeline', () => {
beforeAll(() => {
if (!process.env['JWT_PRIVATE_KEY'] || !process.env['JWT_PUBLIC_KEY']) {
throw new Error('JWT keys must be set for integration tests.');
}
});

beforeEach(async () => {
await query(
'TRUNCATE ai_analyses, documents, refresh_tokens, invitations, memberships, organizations, users RESTART IDENTITY CASCADE',
);
});

afterAll(async () => {
await closePool();
});

it('extract → summarize → classify → ready', async () => {
const { orgId, docId, s3Key } = await seedDoc();

// 1. extract: S3 ObjectCreated envelope
await extractHandler(
sqsEventFor({
source: 'aws.s3',
'detail-type': 'Object Created',
detail: { bucket: { name: 'test-bucket' }, object: { key: s3Key } },
}),
lambdaCtx,
noCb,
);

const docs = new DocumentsRepo(orgId);
let doc = await docs.findById(docId);
expect(doc?.status).toBe('extracted');
expect(doc?.text_s3_key).toBe(buildTextKey(orgId, docId));
expect(doc?.page_count).toBe(2);

// 2. analyze fan-out: DocumentExtracted envelope to both workers
const extractedEnvelope = {
source: EVENT_SOURCE,
'detail-type': DOCUMENT_EXTRACTED,
detail: { orgId, documentId: docId, textS3Key: buildTextKey(orgId, docId) },
};
await summarizeHandler(sqsEventFor(extractedEnvelope), lambdaCtx, noCb);
await classifyHandler(sqsEventFor(extractedEnvelope), lambdaCtx, noCb);

// 3. assert: ready, classified, two analyses
doc = await docs.findById(docId);
expect(doc?.status).toBe('ready');
expect(doc?.category).toBe('Invoice'); // mock detects "invoice" in the text
expect(doc?.tags.length).toBeGreaterThan(0);

const analyses = await new AiAnalysesRepo(orgId).listForDocument(docId);
expect(analyses.map((a) => a.kind).sort()).toEqual(['classification', 'summary']);
});

it('extract marks the document failed when extraction throws', async () => {
const { orgId, docId, s3Key } = await seedDoc();
const extractor = await import('../../lib/extract/text-extractor');
vi.mocked(extractor.extractText).mockRejectedValueOnce(new Error('corrupt pdf'));

await extractHandler(
sqsEventFor({
source: 'aws.s3',
'detail-type': 'Object Created',
detail: { bucket: { name: 'test-bucket' }, object: { key: s3Key } },
}),
lambdaCtx,
noCb,
);

const doc = await new DocumentsRepo(orgId).findById(docId);
expect(doc?.status).toBe('failed');
});
});
47 changes: 47 additions & 0 deletions apps/api/src/handlers/workers/runner.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/**
* Shared SQS worker runner for the document pipeline.
*
* All worker queues are fed by EventBridge (S3 events → ingest queue; our
* `DocumentExtracted` event → analyze queues), so each SQS record body is an
* EventBridge envelope. The runner:
* - loads Secrets Manager values into env (DB URL, OpenAI key) once per cold
* start, like the HTTP handlers' `withSecrets`;
* - processes each record independently and reports per-record failures via
* `batchItemFailures` so only failed messages are retried (the Lambda's
* event source must have ReportBatchItemFailures enabled).
*/
import type { SQSBatchResponse, SQSEvent, SQSHandler } from 'aws-lambda';

import { loadSecretsIntoEnv } from '../../lib/secrets';
import { createLogger, type RequestLogger } from '../../lib/logger';

export interface EventBridgeEnvelope {
source: string;
'detail-type': string;
detail: unknown;
}

export type WorkerRecordHandler = (
envelope: EventBridgeEnvelope,
log: RequestLogger,
) => Promise<void>;

export function sqsWorker(workerName: string, handle: WorkerRecordHandler): SQSHandler {
return async (event: SQSEvent): Promise<SQSBatchResponse> => {
await loadSecretsIntoEnv();
const log = createLogger({ worker: workerName });
const batchItemFailures: SQSBatchResponse['batchItemFailures'] = [];

for (const record of event.Records) {
try {
const envelope = JSON.parse(record.body) as EventBridgeEnvelope;
await handle(envelope, log);
} catch (err) {
log.error({ err, messageId: record.messageId }, `${workerName} failed a record`);
batchItemFailures.push({ itemIdentifier: record.messageId });
}
}

return { batchItemFailures };
};
}
Loading
Loading