From 6f226d4ff385889cd8ff088665e7bc806fa8af95 Mon Sep 17 00:00:00 2001 From: SebastianLevano Date: Tue, 26 May 2026 09:09:42 -0500 Subject: [PATCH] =?UTF-8?q?feat(api,web,infra):=20phase=204=20=E2=80=94=20?= =?UTF-8?q?async=20AI=20pipeline=20(extract=20=E2=86=92=20summarize=20?= =?UTF-8?q?=E2=86=92=20classify)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Documents are now processed end-to-end after upload: text extraction, AI summary and classification, surfaced in a detail page. Deployed to sa-east-1 and smoke-tested live with real gpt-4o-mini. Pipeline (separate workers, EventBridge fan-out): S3 ObjectCreated → EventBridge → SQS(ingest)+DLQ → extract-worker (pdf-parse/mammoth → text to S3, status extracted) → emits DocumentExtracted → EventBridge rule (2 targets) → SQS(summarize)+SQS(classify) → workers (gpt-4o-mini) → ai_analyses rows → atomic ready-join flips status to ready. Backend: - migration: ai_analyses table + documents.{category,tags,language, text_s3_key,page_count}. - AiProvider abstraction: OpenAiProvider (JSON-schema structured outputs, validated with zod) + MockAiProvider; getAiProvider() falls back to the mock when OPENAI_API_KEY is absent (offline/free tests). Versioned prompts. - AiAnalysesRepo + DocumentsRepo lifecycle methods (claimForExtraction, markExtracted, markAnalyzing, setClassification, markReadyIfAnalysesComplete). - SQS worker runner with per-record batchItemFailures; extract/summarize/ classify handlers. GET /v1/documents/{id} (document + analyses). - pdf-parse pinned to v1 (inner import to avoid its debug block); mammoth for DOCX; openai bundled; @aws-sdk/client-eventbridge external. Infra: - pipeline-stack: 3 SQS queues + DLQs, EventBridge rules (S3→ingest, DocumentExtracted→fan-out), 3 worker Lambdas (SQS event sources, reportBatchItemFailures, S3/secret/PutEvents grants). - storage-stack: eventBridgeEnabled on the uploads bucket. - api-stack: GET /v1/documents/{id} route. populate-dev-secrets uploads optional OPENAI_API_KEY. Frontend: - document-detail page (/documents/:id): summary, bullets, category, tags, status badge, 3s polling that stops when processing ends. List rows link to detail and poll while any document is processing. Tests: analysis schemas, MockAiProvider/getAiProvider, pipeline integration (extract→summarize→classify→ready + extract-failure). Integration 43/43 (Docker), web-e2e 6/6 (chromium), cdk synth bundles all workers, infra tsc. --- .../api/src/handlers/documents/get/handler.ts | 39 ++++ apps/api/src/handlers/workers/_shared.ts | 18 ++ .../src/handlers/workers/classify/handler.ts | 42 ++++ .../src/handlers/workers/extract/handler.ts | 68 +++++++ .../workers/pipeline.integration.spec.ts | 179 ++++++++++++++++++ apps/api/src/handlers/workers/runner.ts | 47 +++++ .../src/handlers/workers/summarize/handler.ts | 42 ++++ apps/api/src/lib/ai/ai.spec.ts | 33 ++++ apps/api/src/lib/ai/index.ts | 16 ++ apps/api/src/lib/ai/mock-provider.ts | 28 +++ apps/api/src/lib/ai/openai-provider.ts | 138 ++++++++++++++ apps/api/src/lib/ai/prompts/classify/v1.ts | 16 ++ apps/api/src/lib/ai/prompts/summary/v1.ts | 18 ++ apps/api/src/lib/ai/provider.ts | 28 +++ apps/api/src/lib/events.ts | 38 ++++ apps/api/src/lib/extract/text-extractor.ts | 38 ++++ apps/api/src/lib/storage/s3.ts | 32 ++++ apps/api/src/repositories/ai-analyses-repo.ts | 88 +++++++++ apps/api/src/repositories/documents-repo.ts | 95 ++++++++++ apps/api/src/types/vendor.d.ts | 24 +++ apps/web-e2e/src/document-detail.spec.ts | 111 +++++++++++ apps/web/src/app/app.routes.ts | 6 + .../documents/document-detail.page.ts | 145 ++++++++++++++ .../app/features/documents/document-status.ts | 27 +++ .../app/features/documents/documents.page.ts | 42 ++-- .../features/documents/documents.service.ts | 8 + infra/bin/clouddocs.ts | 12 ++ infra/lib/stacks/api-stack.ts | 7 + infra/lib/stacks/pipeline-stack.ts | 142 ++++++++++++++ infra/lib/stacks/storage-stack.ts | 3 + libs/shared-types/src/index.ts | 1 + .../shared-types/src/schemas/analysis.spec.ts | 35 ++++ libs/shared-types/src/schemas/analysis.ts | 58 ++++++ libs/shared-types/src/schemas/documents.ts | 5 + package.json | 5 + pnpm-lock.yaml | 168 ++++++++++++++++ .../migrations/1779802004247_ai-analyses.sql | 42 ++++ tools/scripts/populate-dev-secrets.ts | 21 +- 38 files changed, 1845 insertions(+), 20 deletions(-) create mode 100644 apps/api/src/handlers/documents/get/handler.ts create mode 100644 apps/api/src/handlers/workers/_shared.ts create mode 100644 apps/api/src/handlers/workers/classify/handler.ts create mode 100644 apps/api/src/handlers/workers/extract/handler.ts create mode 100644 apps/api/src/handlers/workers/pipeline.integration.spec.ts create mode 100644 apps/api/src/handlers/workers/runner.ts create mode 100644 apps/api/src/handlers/workers/summarize/handler.ts create mode 100644 apps/api/src/lib/ai/ai.spec.ts create mode 100644 apps/api/src/lib/ai/index.ts create mode 100644 apps/api/src/lib/ai/mock-provider.ts create mode 100644 apps/api/src/lib/ai/openai-provider.ts create mode 100644 apps/api/src/lib/ai/prompts/classify/v1.ts create mode 100644 apps/api/src/lib/ai/prompts/summary/v1.ts create mode 100644 apps/api/src/lib/ai/provider.ts create mode 100644 apps/api/src/lib/events.ts create mode 100644 apps/api/src/lib/extract/text-extractor.ts create mode 100644 apps/api/src/repositories/ai-analyses-repo.ts create mode 100644 apps/api/src/types/vendor.d.ts create mode 100644 apps/web-e2e/src/document-detail.spec.ts create mode 100644 apps/web/src/app/features/documents/document-detail.page.ts create mode 100644 apps/web/src/app/features/documents/document-status.ts create mode 100644 infra/lib/stacks/pipeline-stack.ts create mode 100644 libs/shared-types/src/schemas/analysis.spec.ts create mode 100644 libs/shared-types/src/schemas/analysis.ts create mode 100644 tools/migrations/1779802004247_ai-analyses.sql diff --git a/apps/api/src/handlers/documents/get/handler.ts b/apps/api/src/handlers/documents/get/handler.ts new file mode 100644 index 0000000..be4a0a2 --- /dev/null +++ b/apps/api/src/handlers/documents/get/handler.ts @@ -0,0 +1,39 @@ +import type { DocumentDetailResponse } from '@clouddocs/shared-types'; + +import { NotFoundError, ValidationError } from '../../../lib/errors'; +import { DocumentsRepo, toDocument } from '../../../repositories/documents-repo'; +import { AiAnalysesRepo, toAiAnalysis } from '../../../repositories/ai-analyses-repo'; +import { + compose, + jsonResponse, + withActiveOrg, + withAuth, + withErrorHandler, + withRequestLogger, + withSecrets, + type LambdaHandler, +} from '../../../middlewares'; + +/** GET /v1/documents/{id} — document detail with its AI analyses. */ +export const handler: LambdaHandler = withSecrets( + withRequestLogger( + compose(withErrorHandler)( + withAuth( + withActiveOrg(async (ctx) => { + const id = ctx.event.pathParameters?.['id']; + if (!id) throw new ValidationError('Document id missing from path.'); + + const doc = await new DocumentsRepo(ctx.orgId).findById(id); + if (!doc) throw new NotFoundError('Document not found.'); + + const analyses = await new AiAnalysesRepo(ctx.orgId).listForDocument(id); + const body: DocumentDetailResponse = { + document: toDocument(doc), + analyses: analyses.map(toAiAnalysis), + }; + return jsonResponse(200, body); + }), + ), + ), + ), +); diff --git a/apps/api/src/handlers/workers/_shared.ts b/apps/api/src/handlers/workers/_shared.ts new file mode 100644 index 0000000..fcf66b5 --- /dev/null +++ b/apps/api/src/handlers/workers/_shared.ts @@ -0,0 +1,18 @@ +import type { AnalysisKind } from '@clouddocs/shared-types'; +import type { DocumentExtractedDetail } from '../../lib/events'; + +/** + * The analysis kinds that must all exist before a document flips to `ready`. + * Both analyze workers reference this so the ready-join stays consistent. + */ +export const READY_KINDS: readonly AnalysisKind[] = ['summary', 'classification']; + +export function isExtractedDetail(detail: unknown): detail is DocumentExtractedDetail { + return ( + typeof detail === 'object' && + detail !== null && + typeof (detail as DocumentExtractedDetail).documentId === 'string' && + typeof (detail as DocumentExtractedDetail).orgId === 'string' && + typeof (detail as DocumentExtractedDetail).textS3Key === 'string' + ); +} diff --git a/apps/api/src/handlers/workers/classify/handler.ts b/apps/api/src/handlers/workers/classify/handler.ts new file mode 100644 index 0000000..98e5b17 --- /dev/null +++ b/apps/api/src/handlers/workers/classify/handler.ts @@ -0,0 +1,42 @@ +/** + * classify-worker — triggered by the `DocumentExtracted` fan-out. Reads the + * extracted text, asks the AI provider to categorize it, stores an + * `ai_analyses` row, mirrors category + tags onto the document (for list + * filtering in Phase 5), and runs the ready-join. + */ +import { getAiProvider } from '../../../lib/ai'; +import { getObjectText } from '../../../lib/storage/s3'; +import { AiAnalysesRepo } from '../../../repositories/ai-analyses-repo'; +import { DocumentsRepo } from '../../../repositories/documents-repo'; +import { sqsWorker } from '../runner'; +import { isExtractedDetail, READY_KINDS } from '../_shared'; + +export const handler = sqsWorker('classify', async (envelope, log) => { + if (!isExtractedDetail(envelope.detail)) { + log.warn({ envelope }, 'classify: unexpected event detail, skipping'); + return; + } + const { orgId, documentId, textS3Key } = envelope.detail; + + const docs = new DocumentsRepo(orgId); + await docs.markAnalyzing(documentId); + + const text = await getObjectText(textS3Key); + const { data, usage } = await getAiProvider().classify(text); + + const analyses = new AiAnalysesRepo(orgId); + await analyses.create({ + documentId, + kind: 'classification', + model: usage.model, + promptVersion: usage.promptVersion, + ...(usage.inputTokens != null ? { inputTokens: usage.inputTokens } : {}), + ...(usage.outputTokens != null ? { outputTokens: usage.outputTokens } : {}), + ...(usage.costUsd != null ? { costUsd: usage.costUsd } : {}), + result: data, + }); + await docs.setClassification(documentId, { category: data.category, tags: data.tags }); + + const ready = await docs.markReadyIfAnalysesComplete(documentId, READY_KINDS, READY_KINDS.length); + log.info({ documentId, category: data.category, ready }, 'classify: done'); +}); diff --git a/apps/api/src/handlers/workers/extract/handler.ts b/apps/api/src/handlers/workers/extract/handler.ts new file mode 100644 index 0000000..1e9f792 --- /dev/null +++ b/apps/api/src/handlers/workers/extract/handler.ts @@ -0,0 +1,68 @@ +/** + * extract-worker — SQS-triggered (S3 ObjectCreated → EventBridge → ingest queue). + * + * Downloads the uploaded file, extracts plain text (pdf-parse / mammoth), stores + * it in S3, advances the document to `extracted`, and emits a `DocumentExtracted` + * event that fans out to the summarize + classify queues. + */ +import { DocumentsRepo } from '../../../repositories/documents-repo'; +import { buildTextKey, getObjectBytes, putText } from '../../../lib/storage/s3'; +import { extractText } from '../../../lib/extract/text-extractor'; +import { publishDocumentExtracted } from '../../../lib/events'; +import { sqsWorker, type EventBridgeEnvelope } from '../runner'; + +interface S3ObjectCreatedDetail { + bucket: { name: string }; + object: { key: string }; +} + +/** raw-uploads/{orgId}/{documentId}/{filename} → ids. */ +function parseRawKey(key: string): { orgId: string; documentId: string } | null { + const parts = decodeURIComponent(key).split('/'); + if (parts.length < 4 || parts[0] !== 'raw-uploads') return null; + return { orgId: parts[1]!, documentId: parts[2]! }; +} + +export const handler = sqsWorker('extract', async (envelope: EventBridgeEnvelope, log) => { + const detail = envelope.detail as S3ObjectCreatedDetail; + const key = detail?.object?.key; + if (!key) { + log.warn({ envelope }, 'extract: event missing object key, skipping'); + return; + } + + const ids = parseRawKey(key); + if (!ids) { + log.warn({ key }, 'extract: key not under raw-uploads/, skipping'); + return; + } + const { orgId, documentId } = ids; + const repo = new DocumentsRepo(orgId); + + const doc = await repo.findById(documentId); + if (!doc) { + log.warn({ orgId, documentId }, 'extract: no document row for key, skipping'); + return; + } + + const claimed = await repo.claimForExtraction(documentId); + if (!claimed) { + log.info({ documentId, status: doc.status }, 'extract: already claimed/processed, skipping'); + return; + } + + try { + const bytes = await getObjectBytes(claimed.s3_key); + const { text, pageCount } = await extractText(bytes, claimed.mime_type); + const textS3Key = buildTextKey(orgId, documentId); + await putText(textS3Key, text); + await repo.markExtracted(documentId, { textS3Key, ...(pageCount ? { pageCount } : {}) }); + await publishDocumentExtracted({ orgId, documentId, textS3Key }); + log.info({ documentId, pageCount, chars: text.length }, 'extract: done'); + } catch (err) { + // Terminal for this document: mark failed and consume the message rather + // than retry a file we can't parse. Transient infra blips are rare in dev. + log.error({ err, documentId }, 'extract: failed'); + await repo.setStatus(documentId, 'failed', 'Text extraction failed.'); + } +}); diff --git a/apps/api/src/handlers/workers/pipeline.integration.spec.ts b/apps/api/src/handlers/workers/pipeline.integration.spec.ts new file mode 100644 index 0000000..e935891 --- /dev/null +++ b/apps/api/src/handlers/workers/pipeline.integration.spec.ts @@ -0,0 +1,179 @@ +/** + * Document AI pipeline integration test against a real Postgres. + * + * Same safety gate as the other suites (RUN_INTEGRATION=1 + localhost DB). S3, + * the text extractor and EventBridge are mocked, and the AI provider falls back + * to the deterministic MockAiProvider (no OPENAI_API_KEY), so the suite is + * offline and free — it exercises the worker → DB wiring and the ready-join. + */ +import { afterAll, beforeAll, beforeEach, describe, expect, it, vi } from 'vitest'; +import type { Context, SQSEvent } from 'aws-lambda'; + +const RUN = process.env['RUN_INTEGRATION'] === '1'; +const INTEGRATION_URL = + process.env['INTEGRATION_DATABASE_URL'] ?? + 'postgres://clouddocs:clouddocs@localhost:5434/clouddocs'; +const LOCAL_HOST_RE = /(?:^|@|\/\/)(?:localhost|127\.0\.0\.1|0\.0\.0\.0)(?::|\/|$)/; +const URL_LOCAL = LOCAL_HOST_RE.test(INTEGRATION_URL); + +if (RUN && URL_LOCAL) { + process.env['DATABASE_URL'] = INTEGRATION_URL; + process.env['UPLOADS_BUCKET'] = 'test-bucket'; + // Force the mock AI provider regardless of what .env.local carries. + delete process.env['OPENAI_API_KEY']; +} + +const EXTRACTED_TEXT = 'This is an INVOICE for consulting services. Total due: $1000.'; + +vi.mock('../../lib/storage/s3', async (importOriginal) => { + const actual = await importOriginal(); + return { + ...actual, + getObjectBytes: vi.fn(async () => Buffer.from('%PDF-1.4 fake')), + getObjectText: vi.fn(async () => EXTRACTED_TEXT), + putText: vi.fn(async () => undefined), + }; +}); +vi.mock('../../lib/extract/text-extractor', () => ({ + extractText: vi.fn(async () => ({ text: EXTRACTED_TEXT, pageCount: 2 })), +})); +vi.mock('../../lib/events', async (importOriginal) => { + const actual = await importOriginal(); + return { ...actual, publishDocumentExtracted: vi.fn(async () => undefined) }; +}); + +import { closePool, query } from '../../lib/db/client'; +import { buildRawKey, buildTextKey } from '../../lib/storage/s3'; +import { DOCUMENT_EXTRACTED, EVENT_SOURCE } from '../../lib/events'; +import { DocumentsRepo } from '../../repositories/documents-repo'; +import { AiAnalysesRepo } from '../../repositories/ai-analyses-repo'; +import { handler as extractHandler } from './extract/handler'; +import { handler as summarizeHandler } from './summarize/handler'; +import { handler as classifyHandler } from './classify/handler'; + +const lambdaCtx = {} as Context; +const noCb = undefined as never; + +/** Wrap an EventBridge envelope as a one-record SQS event. */ +function sqsEventFor(envelope: unknown): SQSEvent { + return { + Records: [ + { + messageId: 'm1', + receiptHandle: 'r1', + body: JSON.stringify(envelope), + attributes: {} as never, + messageAttributes: {}, + md5OfBody: '', + eventSource: 'aws:sqs', + eventSourceARN: 'arn:test', + awsRegion: 'sa-east-1', + }, + ], + }; +} + +async function seedDoc(): Promise<{ orgId: string; docId: string; s3Key: string }> { + const orgs = await query<{ id: string }>( + `INSERT INTO organizations (name, slug) VALUES ('Acme', 'acme') RETURNING id`, + ); + const orgId = orgs[0]!.id; + const users = await query<{ id: string }>( + `INSERT INTO users (email, password_hash) VALUES ('demo@test.com', 'x') RETURNING id`, + ); + const userId = users[0]!.id; + await query(`INSERT INTO memberships (user_id, org_id, role) VALUES ($1, $2, 'owner')`, [ + userId, + orgId, + ]); + const repo = new DocumentsRepo(orgId); + const crypto = await import('node:crypto'); + const docId = crypto.randomUUID(); + const s3Key = buildRawKey(orgId, docId, 'invoice.pdf'); + await repo.create({ + id: docId, + uploadedBy: userId, + filename: 'invoice.pdf', + mimeType: 'application/pdf', + sizeBytes: 1234, + s3Key, + }); + await repo.setStatus(docId, 'uploaded'); + return { orgId, docId, s3Key }; +} + +describe.skipIf(!RUN || !URL_LOCAL)('document AI pipeline', () => { + beforeAll(() => { + if (!process.env['JWT_PRIVATE_KEY'] || !process.env['JWT_PUBLIC_KEY']) { + throw new Error('JWT keys must be set for integration tests.'); + } + }); + + beforeEach(async () => { + await query( + 'TRUNCATE ai_analyses, documents, refresh_tokens, invitations, memberships, organizations, users RESTART IDENTITY CASCADE', + ); + }); + + afterAll(async () => { + await closePool(); + }); + + it('extract → summarize → classify → ready', async () => { + const { orgId, docId, s3Key } = await seedDoc(); + + // 1. extract: S3 ObjectCreated envelope + await extractHandler( + sqsEventFor({ + source: 'aws.s3', + 'detail-type': 'Object Created', + detail: { bucket: { name: 'test-bucket' }, object: { key: s3Key } }, + }), + lambdaCtx, + noCb, + ); + + const docs = new DocumentsRepo(orgId); + let doc = await docs.findById(docId); + expect(doc?.status).toBe('extracted'); + expect(doc?.text_s3_key).toBe(buildTextKey(orgId, docId)); + expect(doc?.page_count).toBe(2); + + // 2. analyze fan-out: DocumentExtracted envelope to both workers + const extractedEnvelope = { + source: EVENT_SOURCE, + 'detail-type': DOCUMENT_EXTRACTED, + detail: { orgId, documentId: docId, textS3Key: buildTextKey(orgId, docId) }, + }; + await summarizeHandler(sqsEventFor(extractedEnvelope), lambdaCtx, noCb); + await classifyHandler(sqsEventFor(extractedEnvelope), lambdaCtx, noCb); + + // 3. assert: ready, classified, two analyses + doc = await docs.findById(docId); + expect(doc?.status).toBe('ready'); + expect(doc?.category).toBe('Invoice'); // mock detects "invoice" in the text + expect(doc?.tags.length).toBeGreaterThan(0); + + const analyses = await new AiAnalysesRepo(orgId).listForDocument(docId); + expect(analyses.map((a) => a.kind).sort()).toEqual(['classification', 'summary']); + }); + + it('extract marks the document failed when extraction throws', async () => { + const { orgId, docId, s3Key } = await seedDoc(); + const extractor = await import('../../lib/extract/text-extractor'); + vi.mocked(extractor.extractText).mockRejectedValueOnce(new Error('corrupt pdf')); + + await extractHandler( + sqsEventFor({ + source: 'aws.s3', + 'detail-type': 'Object Created', + detail: { bucket: { name: 'test-bucket' }, object: { key: s3Key } }, + }), + lambdaCtx, + noCb, + ); + + const doc = await new DocumentsRepo(orgId).findById(docId); + expect(doc?.status).toBe('failed'); + }); +}); diff --git a/apps/api/src/handlers/workers/runner.ts b/apps/api/src/handlers/workers/runner.ts new file mode 100644 index 0000000..d8b82a3 --- /dev/null +++ b/apps/api/src/handlers/workers/runner.ts @@ -0,0 +1,47 @@ +/** + * Shared SQS worker runner for the document pipeline. + * + * All worker queues are fed by EventBridge (S3 events → ingest queue; our + * `DocumentExtracted` event → analyze queues), so each SQS record body is an + * EventBridge envelope. The runner: + * - loads Secrets Manager values into env (DB URL, OpenAI key) once per cold + * start, like the HTTP handlers' `withSecrets`; + * - processes each record independently and reports per-record failures via + * `batchItemFailures` so only failed messages are retried (the Lambda's + * event source must have ReportBatchItemFailures enabled). + */ +import type { SQSBatchResponse, SQSEvent, SQSHandler } from 'aws-lambda'; + +import { loadSecretsIntoEnv } from '../../lib/secrets'; +import { createLogger, type RequestLogger } from '../../lib/logger'; + +export interface EventBridgeEnvelope { + source: string; + 'detail-type': string; + detail: unknown; +} + +export type WorkerRecordHandler = ( + envelope: EventBridgeEnvelope, + log: RequestLogger, +) => Promise; + +export function sqsWorker(workerName: string, handle: WorkerRecordHandler): SQSHandler { + return async (event: SQSEvent): Promise => { + await loadSecretsIntoEnv(); + const log = createLogger({ worker: workerName }); + const batchItemFailures: SQSBatchResponse['batchItemFailures'] = []; + + for (const record of event.Records) { + try { + const envelope = JSON.parse(record.body) as EventBridgeEnvelope; + await handle(envelope, log); + } catch (err) { + log.error({ err, messageId: record.messageId }, `${workerName} failed a record`); + batchItemFailures.push({ itemIdentifier: record.messageId }); + } + } + + return { batchItemFailures }; + }; +} diff --git a/apps/api/src/handlers/workers/summarize/handler.ts b/apps/api/src/handlers/workers/summarize/handler.ts new file mode 100644 index 0000000..677e2ca --- /dev/null +++ b/apps/api/src/handlers/workers/summarize/handler.ts @@ -0,0 +1,42 @@ +/** + * summarize-worker — triggered by the `DocumentExtracted` fan-out. Reads the + * extracted text, asks the AI provider for a summary, stores it as an + * `ai_analyses` row, and (if classification is also done) flips the document to + * `ready` via the atomic ready-join. + */ +import { getAiProvider } from '../../../lib/ai'; +import { getObjectText } from '../../../lib/storage/s3'; +import { AiAnalysesRepo } from '../../../repositories/ai-analyses-repo'; +import { DocumentsRepo } from '../../../repositories/documents-repo'; +import { sqsWorker } from '../runner'; +import { isExtractedDetail, READY_KINDS } from '../_shared'; + +export const handler = sqsWorker('summarize', async (envelope, log) => { + if (!isExtractedDetail(envelope.detail)) { + log.warn({ envelope }, 'summarize: unexpected event detail, skipping'); + return; + } + const { orgId, documentId, textS3Key } = envelope.detail; + + const docs = new DocumentsRepo(orgId); + await docs.markAnalyzing(documentId); + + const text = await getObjectText(textS3Key); + const { data, usage } = await getAiProvider().summarize(text); + + const analyses = new AiAnalysesRepo(orgId); + await analyses.create({ + documentId, + kind: 'summary', + model: usage.model, + promptVersion: usage.promptVersion, + ...(usage.inputTokens != null ? { inputTokens: usage.inputTokens } : {}), + ...(usage.outputTokens != null ? { outputTokens: usage.outputTokens } : {}), + ...(usage.costUsd != null ? { costUsd: usage.costUsd } : {}), + result: data, + }); + if (data.language) await docs.setLanguage(documentId, data.language); + + const ready = await docs.markReadyIfAnalysesComplete(documentId, READY_KINDS, READY_KINDS.length); + log.info({ documentId, ready }, 'summarize: done'); +}); diff --git a/apps/api/src/lib/ai/ai.spec.ts b/apps/api/src/lib/ai/ai.spec.ts new file mode 100644 index 0000000..4a18420 --- /dev/null +++ b/apps/api/src/lib/ai/ai.spec.ts @@ -0,0 +1,33 @@ +import { describe, expect, it } from 'vitest'; + +import { ClassifyResultSchema, SummaryResultSchema } from '@clouddocs/shared-types'; + +import { MockAiProvider } from './mock-provider'; +import { getAiProvider } from './index'; + +describe('MockAiProvider', () => { + const provider = new MockAiProvider(); + + it('returns a schema-valid summary', async () => { + const { data } = await provider.summarize('some document text'); + expect(() => SummaryResultSchema.parse(data)).not.toThrow(); + }); + + it('returns a schema-valid classification and detects "invoice"', async () => { + const { data } = await provider.classify('This is an INVOICE for services'); + expect(() => ClassifyResultSchema.parse(data)).not.toThrow(); + expect(data.category).toBe('Invoice'); + }); +}); + +describe('getAiProvider', () => { + it('falls back to the mock provider when no OPENAI_API_KEY is set', () => { + const prev = process.env['OPENAI_API_KEY']; + delete process.env['OPENAI_API_KEY']; + try { + expect(getAiProvider()).toBeInstanceOf(MockAiProvider); + } finally { + if (prev !== undefined) process.env['OPENAI_API_KEY'] = prev; + } + }); +}); diff --git a/apps/api/src/lib/ai/index.ts b/apps/api/src/lib/ai/index.ts new file mode 100644 index 0000000..cf416aa --- /dev/null +++ b/apps/api/src/lib/ai/index.ts @@ -0,0 +1,16 @@ +export type { AiProvider, AiResult, AiUsage } from './provider'; +export { OpenAiProvider } from './openai-provider'; +export { MockAiProvider } from './mock-provider'; + +/** + * Returns the AI provider for the current environment: the real OpenAI client + * when a key is present, otherwise the deterministic mock (local/dev without a + * key, and tests). + */ +import type { AiProvider } from './provider'; +import { OpenAiProvider } from './openai-provider'; +import { MockAiProvider } from './mock-provider'; + +export function getAiProvider(): AiProvider { + return process.env['OPENAI_API_KEY'] ? new OpenAiProvider() : new MockAiProvider(); +} diff --git a/apps/api/src/lib/ai/mock-provider.ts b/apps/api/src/lib/ai/mock-provider.ts new file mode 100644 index 0000000..492b70b --- /dev/null +++ b/apps/api/src/lib/ai/mock-provider.ts @@ -0,0 +1,28 @@ +/** + * Deterministic {@link AiProvider} for tests and local runs without an API key. + * Returns canned-but-plausible results derived from the input so assertions can + * be specific. + */ +import type { ClassifyResult, SummaryResult } from '@clouddocs/shared-types'; + +import type { AiProvider, AiResult } from './provider'; + +export class MockAiProvider implements AiProvider { + async summarize(text: string): Promise> { + const data: SummaryResult = { + summary: `Mock summary of a ${text.length}-character document.`, + bullets: ['First key point', 'Second key point'], + language: 'en', + }; + return { data, usage: { model: 'mock', promptVersion: 'mock', costUsd: 0 } }; + } + + async classify(text: string): Promise> { + const data: ClassifyResult = { + category: text.toLowerCase().includes('invoice') ? 'Invoice' : 'Other', + confidence: 0.9, + tags: ['mock', 'test'], + }; + return { data, usage: { model: 'mock', promptVersion: 'mock', costUsd: 0 } }; + } +} diff --git a/apps/api/src/lib/ai/openai-provider.ts b/apps/api/src/lib/ai/openai-provider.ts new file mode 100644 index 0000000..29e29f9 --- /dev/null +++ b/apps/api/src/lib/ai/openai-provider.ts @@ -0,0 +1,138 @@ +/** + * OpenAI implementation of {@link AiProvider} using gpt-4o-mini with structured + * (JSON-schema) outputs. We pass an explicit JSON schema to `response_format` + * and validate the result with our own zod schema, rather than the SDK's + * `zodResponseFormat` helper, to stay decoupled from its Zod-version coupling. + */ +import OpenAI from 'openai'; + +import { + ClassifyResultSchema, + DOCUMENT_CATEGORIES, + SummaryResultSchema, + type ClassifyResult, + type SummaryResult, +} from '@clouddocs/shared-types'; + +import { AppError } from '../errors'; +import type { AiProvider, AiResult, AiUsage } from './provider'; +import { SUMMARY_PROMPT_VERSION, SUMMARY_SYSTEM, summaryUserPrompt } from './prompts/summary/v1'; +import { + CLASSIFY_PROMPT_VERSION, + CLASSIFY_SYSTEM, + classifyUserPrompt, +} from './prompts/classify/v1'; + +const MODEL = 'gpt-4o-mini'; +/** Bound input cost: ~6-8K tokens. gpt-4o-mini handles 128K but we don't need it. */ +const MAX_INPUT_CHARS = 24_000; +/** gpt-4o-mini pricing (USD per token), 2025 rates. */ +const INPUT_USD_PER_TOKEN = 0.15 / 1_000_000; +const OUTPUT_USD_PER_TOKEN = 0.6 / 1_000_000; + +const SUMMARY_JSON_SCHEMA: Record = { + type: 'object', + additionalProperties: false, + properties: { + summary: { type: 'string' }, + bullets: { type: 'array', items: { type: 'string' } }, + language: { type: 'string', description: 'ISO 639-1 code' }, + }, + required: ['summary', 'bullets', 'language'], +}; + +const CLASSIFY_JSON_SCHEMA: Record = { + type: 'object', + additionalProperties: false, + properties: { + category: { type: 'string', enum: [...DOCUMENT_CATEGORIES] }, + confidence: { type: 'number' }, + tags: { type: 'array', items: { type: 'string' } }, + }, + required: ['category', 'confidence', 'tags'], +}; + +export class OpenAiProvider implements AiProvider { + private readonly client: OpenAI; + + constructor(apiKey = process.env['OPENAI_API_KEY']) { + if (!apiKey) throw new Error('OPENAI_API_KEY is not set.'); + this.client = new OpenAI({ apiKey }); + } + + async summarize(text: string): Promise> { + const { parsed, usage } = await this.complete( + SUMMARY_SYSTEM, + summaryUserPrompt(truncate(text)), + 'summary', + SUMMARY_JSON_SCHEMA, + SUMMARY_PROMPT_VERSION, + ); + const data = SummaryResultSchema.parse(parsed); + return { data, usage }; + } + + async classify(text: string): Promise> { + const { parsed, usage } = await this.complete( + CLASSIFY_SYSTEM, + classifyUserPrompt(truncate(text)), + 'classification', + CLASSIFY_JSON_SCHEMA, + CLASSIFY_PROMPT_VERSION, + ); + const data = ClassifyResultSchema.parse(parsed); + return { data, usage }; + } + + private async complete( + system: string, + user: string, + schemaName: string, + schema: Record, + promptVersion: string, + ): Promise<{ parsed: unknown; usage: AiUsage }> { + const completion = await this.client.chat.completions.create({ + model: MODEL, + messages: [ + { role: 'system', content: system }, + { role: 'user', content: user }, + ], + response_format: { + type: 'json_schema', + json_schema: { name: schemaName, strict: true, schema }, + }, + }); + + const content = completion.choices[0]?.message.content; + if (!content) throw new AppError('ai_error', 'OpenAI returned no content.', 502); + + let parsed: unknown; + try { + parsed = JSON.parse(content); + } catch { + throw new AppError('ai_error', 'OpenAI returned non-JSON content.', 502); + } + + const inputTokens = completion.usage?.prompt_tokens; + const outputTokens = completion.usage?.completion_tokens; + return { + parsed, + usage: { + model: MODEL, + promptVersion, + inputTokens, + outputTokens, + costUsd: estimateCost(inputTokens, outputTokens), + }, + }; + } +} + +function truncate(text: string): string { + return text.length > MAX_INPUT_CHARS ? text.slice(0, MAX_INPUT_CHARS) : text; +} + +function estimateCost(inputTokens?: number, outputTokens?: number): number | undefined { + if (inputTokens == null || outputTokens == null) return undefined; + return inputTokens * INPUT_USD_PER_TOKEN + outputTokens * OUTPUT_USD_PER_TOKEN; +} diff --git a/apps/api/src/lib/ai/prompts/classify/v1.ts b/apps/api/src/lib/ai/prompts/classify/v1.ts new file mode 100644 index 0000000..18ddc12 --- /dev/null +++ b/apps/api/src/lib/ai/prompts/classify/v1.ts @@ -0,0 +1,16 @@ +import { DOCUMENT_CATEGORIES } from '@clouddocs/shared-types'; + +/** Classification prompt v1. */ +export const CLASSIFY_PROMPT_VERSION = 'classify.v1'; + +export const CLASSIFY_SYSTEM = [ + 'You are a document classifier for a document-management product.', + `Assign the document to exactly one category from this list: ${DOCUMENT_CATEGORIES.join(', ')}.`, + 'Use "Other" only when none of the specific categories clearly fit.', + 'Also provide a confidence between 0 and 1 and up to 8 short, lowercase topical tags.', + 'Base your answer only on the provided text.', +].join('\n'); + +export function classifyUserPrompt(text: string): string { + return `Classify the following document:\n\n"""\n${text}\n"""`; +} diff --git a/apps/api/src/lib/ai/prompts/summary/v1.ts b/apps/api/src/lib/ai/prompts/summary/v1.ts new file mode 100644 index 0000000..d4c5d25 --- /dev/null +++ b/apps/api/src/lib/ai/prompts/summary/v1.ts @@ -0,0 +1,18 @@ +/** + * Summary prompt v1. Versioned so we can re-run old documents under a new + * prompt and compare (the version is stored on every ai_analyses row). + */ +export const SUMMARY_PROMPT_VERSION = 'summary.v1'; + +export const SUMMARY_SYSTEM = [ + 'You are a precise document summarizer for a document-management product.', + 'Given the extracted text of a document, produce:', + '- a concise 2-4 sentence summary of what the document is and its key point,', + '- up to 7 short bullet points covering the most important facts,', + '- the ISO 639-1 language code of the document.', + 'Be factual; do not invent details that are not in the text.', +].join('\n'); + +export function summaryUserPrompt(text: string): string { + return `Summarize the following document:\n\n"""\n${text}\n"""`; +} diff --git a/apps/api/src/lib/ai/provider.ts b/apps/api/src/lib/ai/provider.ts new file mode 100644 index 0000000..35407bb --- /dev/null +++ b/apps/api/src/lib/ai/provider.ts @@ -0,0 +1,28 @@ +/** + * Provider-agnostic AI interface (plan §10.5). Use cases and workers depend on + * this, never on the OpenAI SDK directly, so we can swap providers (or a mock + * in tests) without touching business logic. + * + * Embeddings + chat (RAG) land in a later phase; Phase 4 only needs the two + * synchronous analysis calls. + */ +import type { ClassifyResult, SummaryResult } from '@clouddocs/shared-types'; + +export interface AiUsage { + model: string; + promptVersion: string; + inputTokens?: number; + outputTokens?: number; + /** Estimated USD cost, if the provider reports token usage. */ + costUsd?: number; +} + +export interface AiResult { + data: T; + usage: AiUsage; +} + +export interface AiProvider { + summarize(text: string): Promise>; + classify(text: string): Promise>; +} diff --git a/apps/api/src/lib/events.ts b/apps/api/src/lib/events.ts new file mode 100644 index 0000000..9e2c391 --- /dev/null +++ b/apps/api/src/lib/events.ts @@ -0,0 +1,38 @@ +/** + * EventBridge publishing for the document pipeline. The extract worker emits a + * `DocumentExtracted` event; an EventBridge rule fans it out to the summarize + * and classify queues (see infra/pipeline-stack). Keeping the source/detail-type + * constants here means producer and the CDK rule can't drift. + */ +import { EventBridgeClient, PutEventsCommand } from '@aws-sdk/client-eventbridge'; + +export const EVENT_SOURCE = 'clouddocs.documents'; +export const DOCUMENT_EXTRACTED = 'DocumentExtracted'; + +export interface DocumentExtractedDetail { + orgId: string; + documentId: string; + textS3Key: string; +} + +let client: EventBridgeClient | undefined; + +function bus(): EventBridgeClient { + client ??= new EventBridgeClient({}); + return client; +} + +export async function publishDocumentExtracted(detail: DocumentExtractedDetail): Promise { + await bus().send( + new PutEventsCommand({ + Entries: [ + { + Source: EVENT_SOURCE, + DetailType: DOCUMENT_EXTRACTED, + Detail: JSON.stringify(detail), + // Default bus; the rule lives there too. + }, + ], + }), + ); +} diff --git a/apps/api/src/lib/extract/text-extractor.ts b/apps/api/src/lib/extract/text-extractor.ts new file mode 100644 index 0000000..c431329 --- /dev/null +++ b/apps/api/src/lib/extract/text-extractor.ts @@ -0,0 +1,38 @@ +/** + * Text extraction for the supported document types. PDF via `pdf-parse` + * (imported from its inner entrypoint to avoid the package's debug block that + * reads a sample file at import time when bundled), DOCX via `mammoth`. + */ +import pdfParse from 'pdf-parse/lib/pdf-parse.js'; +import { extractRawText } from 'mammoth'; + +import { AppError } from '../errors'; + +const PDF = 'application/pdf'; +const DOCX = 'application/vnd.openxmlformats-officedocument.wordprocessingml.document'; + +export interface ExtractedText { + text: string; + pageCount?: number; +} + +export async function extractText(bytes: Buffer, mimeType: string): Promise { + if (mimeType === PDF) { + const result = await pdfParse(bytes); + return { text: normalize(result.text), pageCount: result.numpages }; + } + if (mimeType === DOCX) { + const result = await extractRawText({ buffer: bytes }); + return { text: normalize(result.value) }; + } + throw new AppError('unsupported_media_type', `Cannot extract text from ${mimeType}.`, 415); +} + +/** Collapse excessive whitespace; keeps payloads (and token costs) sane. */ +function normalize(text: string): string { + return text + .replace(/\r\n/g, '\n') + .replace(/[ \t]+\n/g, '\n') + .replace(/\n{3,}/g, '\n\n') + .trim(); +} diff --git a/apps/api/src/lib/storage/s3.ts b/apps/api/src/lib/storage/s3.ts index 47b4ce6..84d33bc 100644 --- a/apps/api/src/lib/storage/s3.ts +++ b/apps/api/src/lib/storage/s3.ts @@ -15,6 +15,11 @@ import { getSignedUrl } from '@aws-sdk/s3-request-presigner'; export const UPLOAD_URL_TTL_SECONDS = 5 * 60; export const DOWNLOAD_URL_TTL_SECONDS = 15 * 60; +/** Key prefix for extracted plain text, mirroring the raw-uploads layout. */ +export function buildTextKey(orgId: string, documentId: string): string { + return `extracted-text/${orgId}/${documentId}.txt`; +} + let client: S3Client | undefined; function s3(): S3Client { @@ -76,3 +81,30 @@ export async function presignDownload( const url = await getSignedUrl(s3(), command, { expiresIn: DOWNLOAD_URL_TTL_SECONDS }); return { url, expiresInSeconds: DOWNLOAD_URL_TTL_SECONDS }; } + +/** Download an object's raw bytes (used by the extract worker). */ +export async function getObjectBytes(key: string): Promise { + const result = await s3().send(new GetObjectCommand({ Bucket: bucketName(), Key: key })); + if (!result.Body) throw new Error(`S3 object ${key} has no body.`); + const bytes = await result.Body.transformToByteArray(); + return Buffer.from(bytes); +} + +/** Download an object's text content (used by the analysis workers). */ +export async function getObjectText(key: string): Promise { + const result = await s3().send(new GetObjectCommand({ Bucket: bucketName(), Key: key })); + if (!result.Body) throw new Error(`S3 object ${key} has no body.`); + return result.Body.transformToString('utf8'); +} + +/** Store UTF-8 text at a key (extracted document text). */ +export async function putText(key: string, text: string): Promise { + await s3().send( + new PutObjectCommand({ + Bucket: bucketName(), + Key: key, + Body: text, + ContentType: 'text/plain; charset=utf-8', + }), + ); +} diff --git a/apps/api/src/repositories/ai-analyses-repo.ts b/apps/api/src/repositories/ai-analyses-repo.ts new file mode 100644 index 0000000..9802d32 --- /dev/null +++ b/apps/api/src/repositories/ai-analyses-repo.ts @@ -0,0 +1,88 @@ +/** + * AI analyses repository — org-scoped. Stores one row per analysis run; a + * re-analysis inserts a new row (history) rather than overwriting. + */ +import type { AiAnalysis, AnalysisKind } from '@clouddocs/shared-types'; + +import { OrgScopedRepository } from './org-scoped-repository'; + +export type AiAnalysisRow = { + id: string; + document_id: string; + org_id: string; + kind: AnalysisKind; + model: string; + prompt_version: string; + input_tokens: number | null; + output_tokens: number | null; + cost_usd: string | null; + result: unknown; + created_at: Date; +}; + +export interface CreateAnalysisInput { + documentId: string; + kind: AnalysisKind; + model: string; + promptVersion: string; + inputTokens?: number; + outputTokens?: number; + costUsd?: number; + result: unknown; +} + +export class AiAnalysesRepo extends OrgScopedRepository { + async create(input: CreateAnalysisInput): Promise { + const rows = await this.scopedQuery( + `INSERT INTO ai_analyses + (org_id, document_id, kind, model, prompt_version, input_tokens, output_tokens, cost_usd, result) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) + RETURNING *`, + [ + input.documentId, + input.kind, + input.model, + input.promptVersion, + input.inputTokens ?? null, + input.outputTokens ?? null, + input.costUsd ?? null, + JSON.stringify(input.result), + ], + ); + const row = rows[0]; + if (!row) throw new Error('Insert into ai_analyses returned no row.'); + return row; + } + + async listForDocument(documentId: string): Promise { + return this.scopedQuery( + `SELECT * FROM ai_analyses + WHERE org_id = $1 AND document_id = $2 + ORDER BY created_at DESC`, + [documentId], + ); + } + + /** Distinct count of the given kinds present for a document (for the ready-join). */ + async countKinds(documentId: string, kinds: readonly AnalysisKind[]): Promise { + const rows = await this.scopedQuery<{ n: string }>( + `SELECT count(DISTINCT kind)::text AS n FROM ai_analyses + WHERE org_id = $1 AND document_id = $2 AND kind = ANY($3)`, + [documentId, [...kinds]], + ); + return Number(rows[0]?.n ?? 0); + } +} + +/** Map a DB row to the API `AiAnalysis` shape. */ +export function toAiAnalysis(row: AiAnalysisRow): AiAnalysis { + return { + id: row.id, + documentId: row.document_id, + kind: row.kind, + model: row.model, + promptVersion: row.prompt_version, + result: row.result, + createdAt: new Date(row.created_at).toISOString(), + }; +} diff --git a/apps/api/src/repositories/documents-repo.ts b/apps/api/src/repositories/documents-repo.ts index e852444..8aa3244 100644 --- a/apps/api/src/repositories/documents-repo.ts +++ b/apps/api/src/repositories/documents-repo.ts @@ -21,6 +21,11 @@ export type DocumentRow = { s3_key: string; status: DocumentStatus; error: string | null; + text_s3_key: string | null; + page_count: number | null; + language: string | null; + category: string | null; + tags: string[]; metadata: Record; created_at: Date; updated_at: Date; @@ -74,6 +79,92 @@ export class DocumentsRepo extends OrgScopedRepository { return rows[0]; } + /** + * Atomically claim a document for extraction: pending_upload|uploaded → + * extracting. Returns the row if this call won the claim, undefined if it was + * already claimed/processed (idempotency + race-safety against duplicate S3 + * events and the client's /complete call). + */ + async claimForExtraction(id: string): Promise { + const rows = await this.scopedQuery( + `UPDATE documents SET status = 'extracting' + WHERE org_id = $1 AND id = $2 AND status IN ('pending_upload', 'uploaded') + RETURNING *`, + [id], + ); + return rows[0]; + } + + /** Record extraction output and move to `extracted` (extract worker). */ + async markExtracted( + id: string, + input: { textS3Key: string; pageCount?: number; language?: string }, + ): Promise { + const rows = await this.scopedQuery( + `UPDATE documents + SET status = 'extracted', text_s3_key = $3, page_count = $4, language = COALESCE($5, language) + WHERE org_id = $1 AND id = $2 + RETURNING *`, + [id, input.textS3Key, input.pageCount ?? null, input.language ?? null], + ); + return rows[0]; + } + + /** Move `extracted` → `analyzing` (first analyze worker to run; idempotent). */ + async markAnalyzing(id: string): Promise { + await this.scopedQuery( + `UPDATE documents SET status = 'analyzing' + WHERE org_id = $1 AND id = $2 AND status = 'extracted'`, + [id], + ); + } + + /** Persist the classifier output onto the document (classify worker). */ + async setClassification( + id: string, + input: { category: string; tags: string[] }, + ): Promise { + const rows = await this.scopedQuery( + `UPDATE documents SET category = $3, tags = $4 + WHERE org_id = $1 AND id = $2 + RETURNING *`, + [id, input.category, input.tags], + ); + return rows[0]; + } + + /** Set the detected document language (summarize worker). */ + async setLanguage(id: string, language: string): Promise { + await this.scopedQuery(`UPDATE documents SET language = $3 WHERE org_id = $1 AND id = $2`, [ + id, + language, + ]); + } + + /** + * Atomically flips the document to `ready` once at least `requiredCount` + * distinct analysis kinds exist for it. Idempotent and race-safe: the worker + * that commits its analysis last is the one whose call sees the full set and + * wins; earlier calls match zero rows. Returns true if it set `ready`. + */ + async markReadyIfAnalysesComplete( + id: string, + kinds: readonly string[], + requiredCount: number, + ): Promise { + const rows = await this.scopedQuery<{ id: string }>( + `UPDATE documents SET status = 'ready' + WHERE org_id = $1 AND id = $2 AND status <> 'ready' + AND ( + SELECT count(DISTINCT kind) FROM ai_analyses + WHERE document_id = $2 AND org_id = $1 AND kind = ANY($3) + ) >= $4 + RETURNING id`, + [id, kinds as string[], requiredCount], + ); + return rows.length > 0; + } + /** * Keyset pagination, newest first. Pass the previous page's last `id` as * `cursor` to get the next page. Fetches one extra row to know whether more @@ -117,6 +208,10 @@ export function toDocument(row: DocumentRow): Document { sizeBytes: Number(row.size_bytes), status: row.status, error: row.error, + category: row.category, + tags: row.tags ?? [], + language: row.language, + pageCount: row.page_count, createdAt: new Date(row.created_at).toISOString(), updatedAt: new Date(row.updated_at).toISOString(), }; diff --git a/apps/api/src/types/vendor.d.ts b/apps/api/src/types/vendor.d.ts new file mode 100644 index 0000000..b9aff55 --- /dev/null +++ b/apps/api/src/types/vendor.d.ts @@ -0,0 +1,24 @@ +// Ambient declarations for vendored libs whose published types we can't use +// directly: pdf-parse's inner entrypoint (imported to dodge its debug block) +// and mammoth (ships no types). + +declare module 'pdf-parse/lib/pdf-parse.js' { + interface PdfParseResult { + numpages: number; + numrender: number; + info: unknown; + metadata: unknown; + text: string; + version: string; + } + function pdfParse(dataBuffer: Buffer, options?: Record): Promise; + export default pdfParse; +} + +declare module 'mammoth' { + interface MammothResult { + value: string; + messages: Array<{ type: string; message: string }>; + } + export function extractRawText(input: { buffer: Buffer }): Promise; +} diff --git a/apps/web-e2e/src/document-detail.spec.ts b/apps/web-e2e/src/document-detail.spec.ts new file mode 100644 index 0000000..d159592 --- /dev/null +++ b/apps/web-e2e/src/document-detail.spec.ts @@ -0,0 +1,111 @@ +import { test, expect, type Page } from '@playwright/test'; + +/** + * Document detail + polling e2e. The API is mocked at the network layer. The + * detail endpoint returns `analyzing` on the first poll and `ready` (with a + * summary + classification) afterwards, so the test verifies the page shows the + * processing state and then auto-updates without a manual refresh. + */ + +const ORG_ID = '33333333-3333-4333-8333-333333333333'; +const DOC_ID = '44444444-4444-4444-8444-444444444444'; + +const SESSION = { + user: { + id: '11111111-1111-4111-8111-111111111111', + email: 'ada@example.com', + displayName: 'Ada Lovelace', + avatarUrl: null, + emailVerified: false, + createdAt: '2026-05-25T00:00:00.000Z', + }, + memberships: [ + { + id: '22222222-2222-4222-8222-222222222222', + orgId: ORG_ID, + role: 'owner', + organization: { + id: ORG_ID, + name: 'Analytical Engines', + slug: 'analytical-engines', + plan: 'free', + createdAt: '2026-05-25T00:00:00.000Z', + }, + }, + ], + tokens: { accessToken: 'mock.access.token', accessTokenExpiresAt: '2026-05-25T00:15:00.000Z' }, +}; + +const json = (body: unknown, status = 200) => ({ + status, + contentType: 'application/json', + body: JSON.stringify(body), +}); + +function makeDoc(status: 'analyzing' | 'ready') { + const ready = status === 'ready'; + return { + id: DOC_ID, + orgId: ORG_ID, + uploadedBy: SESSION.user.id, + filename: 'invoice.pdf', + mimeType: 'application/pdf', + sizeBytes: 2048, + status, + error: null, + category: ready ? 'Invoice' : null, + tags: ready ? ['finance', 'q2'] : [], + language: ready ? 'en' : null, + pageCount: 2, + createdAt: '2026-05-26T10:00:00.000Z', + updatedAt: '2026-05-26T10:00:00.000Z', + }; +} + +async function stubAuthenticatedBoot(page: Page): Promise { + await page.route('**/v1/auth/refresh', (route) => route.fulfill(json(SESSION))); +} + +test('detail page shows processing, then auto-updates to the analysis', async ({ page }) => { + await stubAuthenticatedBoot(page); + + let calls = 0; + await page.route(`**/v1/documents/${DOC_ID}`, (route) => { + calls += 1; + if (calls === 1) { + return route.fulfill(json({ document: makeDoc('analyzing'), analyses: [] })); + } + return route.fulfill( + json({ + document: makeDoc('ready'), + analyses: [ + { + id: '55555555-5555-4555-8555-555555555555', + documentId: DOC_ID, + kind: 'summary', + model: 'gpt-4o-mini', + promptVersion: 'summary.v1', + result: { + summary: 'This invoice bills consulting services.', + bullets: ['Total due: $1000', 'Net 30 terms'], + language: 'en', + }, + createdAt: '2026-05-26T10:01:00.000Z', + }, + ], + }), + ); + }); + + await page.goto(`/documents/${DOC_ID}`); + + // First poll: processing. + await expect(page.getByTestId('processing')).toBeVisible(); + + // A later poll flips to ready: summary + classification render, badge updates. + await expect(page.getByTestId('summary')).toContainText('consulting services', { + timeout: 10_000, + }); + await expect(page.getByTestId('detail-status')).toHaveText('ready'); + await expect(page.getByText('Invoice', { exact: true })).toBeVisible(); +}); diff --git a/apps/web/src/app/app.routes.ts b/apps/web/src/app/app.routes.ts index 1a99c36..74a5807 100644 --- a/apps/web/src/app/app.routes.ts +++ b/apps/web/src/app/app.routes.ts @@ -30,6 +30,12 @@ export const appRoutes: Route[] = [ loadComponent: () => import('./features/documents/documents.page').then((m) => m.DocumentsPage), }, + { + path: 'documents/:id', + title: 'Document · CloudDocs AI', + loadComponent: () => + import('./features/documents/document-detail.page').then((m) => m.DocumentDetailPage), + }, ], }, { diff --git a/apps/web/src/app/features/documents/document-detail.page.ts b/apps/web/src/app/features/documents/document-detail.page.ts new file mode 100644 index 0000000..c8a8655 --- /dev/null +++ b/apps/web/src/app/features/documents/document-detail.page.ts @@ -0,0 +1,145 @@ +import { ChangeDetectionStrategy, Component, computed, inject, input, signal } from '@angular/core'; +import { takeUntilDestroyed } from '@angular/core/rxjs-interop'; +import { RouterLink } from '@angular/router'; +import { Subscription, switchMap, timer } from 'rxjs'; + +import type { AiAnalysis, Document, SummaryResult } from '@clouddocs/shared-types'; + +import { apiErrorMessage } from '../../shared/utils/api-error'; +import { DocumentsService } from './documents.service'; +import { isProcessing, statusBadgeClass } from './document-status'; + +@Component({ + selector: 'app-document-detail', + changeDetection: ChangeDetectionStrategy.OnPush, + imports: [RouterLink], + template: ` + ← Documents + + @if (loadError()) { +

+ {{ loadError() }} +

+ } @else if (doc(); as d) { +
+
+

{{ d.filename }}

+

+ {{ formatSize(d.sizeBytes) }} + @if (d.pageCount) { + · {{ d.pageCount }} pages + } + @if (d.language) { + · {{ d.language }} + } +

+
+ {{ d.status }} +
+ + @if (processing()) { +
+ + Analyzing document… this updates automatically. +
+ } @else if (d.status === 'failed') { +
+ Processing failed{{ d.error ? ': ' + d.error : '' }}. +
+ } @else { + + @if (d.category) { +
+

Classification

+
+ + {{ d.category }} + + @for (tag of d.tags; track tag) { + + {{ tag }} + + } +
+
+ } + + + @if (summary(); as s) { +
+

Summary

+

{{ s.summary }}

+ @if (s.bullets.length) { +
    + @for (b of s.bullets; track b) { +
  • {{ b }}
  • + } +
+ } +
+ } + } + } @else { +

Loading…

+ } + `, +}) +export class DocumentDetailPage { + private readonly api = inject(DocumentsService); + + /** Bound from the route param via withComponentInputBinding. */ + readonly id = input.required(); + + protected readonly doc = signal(null); + protected readonly analyses = signal([]); + protected readonly loadError = signal(null); + + protected readonly processing = computed(() => { + const d = this.doc(); + return d ? isProcessing(d.status) : false; + }); + + protected readonly summary = computed(() => { + const row = this.analyses().find((a) => a.kind === 'summary'); + return row ? (row.result as SummaryResult) : null; + }); + + private readonly poll: Subscription; + + constructor() { + // Poll every 3s; the server response tells us when to stop (status leaves + // the processing set). takeUntilDestroyed cleans up on navigation. + this.poll = timer(0, 3000) + .pipe( + switchMap(() => this.api.get(this.id())), + takeUntilDestroyed(), + ) + .subscribe({ + next: (res) => { + this.doc.set(res.document); + this.analyses.set(res.analyses); + if (!isProcessing(res.document.status)) this.poll.unsubscribe(); + }, + error: (err) => this.loadError.set(apiErrorMessage(err, 'Could not load the document.')), + }); + } + + protected badgeClass = statusBadgeClass; + + protected formatSize(bytes: number): string { + if (bytes < 1024) return `${bytes} B`; + if (bytes < 1024 * 1024) return `${(bytes / 1024).toFixed(0)} KB`; + return `${(bytes / (1024 * 1024)).toFixed(1)} MB`; + } +} diff --git a/apps/web/src/app/features/documents/document-status.ts b/apps/web/src/app/features/documents/document-status.ts new file mode 100644 index 0000000..aa4f8b5 --- /dev/null +++ b/apps/web/src/app/features/documents/document-status.ts @@ -0,0 +1,27 @@ +import type { Document } from '@clouddocs/shared-types'; + +/** Statuses that mean "the pipeline is still working" — drive polling + spinners. */ +const PROCESSING: ReadonlySet = new Set([ + 'pending_upload', + 'uploaded', + 'extracting', + 'extracted', + 'analyzing', +]); + +export function isProcessing(status: Document['status']): boolean { + return PROCESSING.has(status); +} + +export function statusBadgeClass(status: Document['status']): string { + switch (status) { + case 'ready': + return 'border-emerald-500/40 bg-emerald-500/10 text-emerald-400'; + case 'failed': + return 'border-danger/40 bg-danger/10 text-danger'; + case 'pending_upload': + return 'border-border-strong bg-surface-3 text-text-dim'; + default: + return 'border-brand-500/40 bg-brand-500/10 text-brand-300'; + } +} diff --git a/apps/web/src/app/features/documents/documents.page.ts b/apps/web/src/app/features/documents/documents.page.ts index b250edc..c70ce5e 100644 --- a/apps/web/src/app/features/documents/documents.page.ts +++ b/apps/web/src/app/features/documents/documents.page.ts @@ -1,4 +1,7 @@ import { ChangeDetectionStrategy, Component, inject, OnInit, signal } from '@angular/core'; +import { takeUntilDestroyed } from '@angular/core/rxjs-interop'; +import { RouterLink } from '@angular/router'; +import { timer } from 'rxjs'; import type { Document } from '@clouddocs/shared-types'; @@ -6,6 +9,7 @@ import { apiErrorMessage } from '../../shared/utils/api-error'; import { DocumentsService } from './documents.service'; import { UploadDropzoneComponent } from './components/upload-dropzone.component'; import { UploadService, validateFile, type UploadHandle } from './upload.service'; +import { isProcessing, statusBadgeClass } from './document-status'; interface RejectedFile { name: string; @@ -15,7 +19,7 @@ interface RejectedFile { @Component({ selector: 'app-documents', changeDetection: ChangeDetectionStrategy.OnPush, - imports: [UploadDropzoneComponent], + imports: [UploadDropzoneComponent, RouterLink], template: `
@@ -90,8 +94,17 @@ interface RejectedFile { @for (doc of documents(); track doc.id) { - - {{ doc.filename }} + + + {{ doc.filename }} + @if (doc.category) { + {{ doc.category }} + } + {{ formatSize(doc.sizeBytes) }} ([]); protected readonly rejected = signal([]); + constructor() { + // While any document is still being processed, re-fetch the list every 3s + // so statuses (and categories) update without a manual refresh. + timer(3000, 3000) + .pipe(takeUntilDestroyed()) + .subscribe(() => { + if (this.documents().some((d) => isProcessing(d.status))) this.refresh(); + }); + } + ngOnInit(): void { this.refresh(); } @@ -188,16 +211,5 @@ export class DocumentsPage implements OnInit { }); } - protected statusClass(status: Document['status']): string { - switch (status) { - case 'ready': - return 'border-emerald-500/40 bg-emerald-500/10 text-emerald-400'; - case 'failed': - return 'border-danger/40 bg-danger/10 text-danger'; - case 'pending_upload': - return 'border-border-strong bg-surface-3 text-text-dim'; - default: - return 'border-brand-500/40 bg-brand-500/10 text-brand-300'; - } - } + protected readonly statusClass = statusBadgeClass; } diff --git a/apps/web/src/app/features/documents/documents.service.ts b/apps/web/src/app/features/documents/documents.service.ts index 7985ff5..5039df2 100644 --- a/apps/web/src/app/features/documents/documents.service.ts +++ b/apps/web/src/app/features/documents/documents.service.ts @@ -6,6 +6,7 @@ import type { CreateDocumentDto, CreateDocumentResponse, Document, + DocumentDetailResponse, DocumentListResponse, DownloadResponse, } from '@clouddocs/shared-types'; @@ -54,6 +55,13 @@ export class DocumentsService { return this.http.get(this.url, { headers: this.orgHeaders(), params }); } + /** Document detail + its AI analyses. */ + get(id: string): Observable { + return this.http.get(`${this.url}/${id}`, { + headers: this.orgHeaders(), + }); + } + download(id: string): Observable { return this.http.get(`${this.url}/${id}/download`, { headers: this.orgHeaders(), diff --git a/infra/bin/clouddocs.ts b/infra/bin/clouddocs.ts index eef3425..c9b2f74 100644 --- a/infra/bin/clouddocs.ts +++ b/infra/bin/clouddocs.ts @@ -5,6 +5,7 @@ import { loadConfig } from '../lib/config'; import { NetworkStack } from '../lib/stacks/network-stack'; import { StorageStack } from '../lib/stacks/storage-stack'; import { ApiStack } from '../lib/stacks/api-stack'; +import { PipelineStack } from '../lib/stacks/pipeline-stack'; import { ObservabilityStack } from '../lib/stacks/observability-stack'; const app = new cdk.App(); @@ -40,6 +41,15 @@ const api = new ApiStack(app, `${config.resourcePrefix}-api`, { uploadsBucket: storage.uploadsBucket, }); +const pipeline = new PipelineStack(app, `${config.resourcePrefix}-pipeline`, { + env: config.env, + description: 'Async AI pipeline: EventBridge + SQS + extract/summarize/classify workers.', + tags, + config, + uploadsBucket: storage.uploadsBucket, + apiSecret: api.apiSecret, +}); + const observability = new ObservabilityStack(app, `${config.resourcePrefix}-observability`, { // Billing metrics only exist in us-east-1 — pin the stack there regardless of app region. env: { account: config.env.account, region: 'us-east-1' }, @@ -53,6 +63,8 @@ const observability = new ObservabilityStack(app, `${config.resourcePrefix}-obse // listing it here keeps the dependency graph readable. api.addDependency(storage); api.addDependency(network); +pipeline.addDependency(storage); +pipeline.addDependency(api); // shares the API secret observability.addDependency(api); app.synth(); diff --git a/infra/lib/stacks/api-stack.ts b/infra/lib/stacks/api-stack.ts index 9ed7d3b..b9723c2 100644 --- a/infra/lib/stacks/api-stack.ts +++ b/infra/lib/stacks/api-stack.ts @@ -89,6 +89,13 @@ const DOC_ROUTES: readonly DocRoute[] = [ path: '/v1/documents', description: 'Keyset-paginated list of the active org documents.', }, + { + id: 'Get', + handlerDir: 'get', + method: apigwv2.HttpMethod.GET, + path: '/v1/documents/{id}', + description: 'Document detail with its AI analyses.', + }, { id: 'Complete', handlerDir: 'complete', diff --git a/infra/lib/stacks/pipeline-stack.ts b/infra/lib/stacks/pipeline-stack.ts new file mode 100644 index 0000000..a6655b3 --- /dev/null +++ b/infra/lib/stacks/pipeline-stack.ts @@ -0,0 +1,142 @@ +import * as path from 'node:path'; +import * as cdk from 'aws-cdk-lib'; +import * as logs from 'aws-cdk-lib/aws-logs'; +import * as sqs from 'aws-cdk-lib/aws-sqs'; +import * as events from 'aws-cdk-lib/aws-events'; +import * as targets from 'aws-cdk-lib/aws-events-targets'; +import { SqsEventSource } from 'aws-cdk-lib/aws-lambda-event-sources'; +import type * as s3 from 'aws-cdk-lib/aws-s3'; +import type * as secretsmanager from 'aws-cdk-lib/aws-secretsmanager'; +import type { Construct } from 'constructs'; +import type { InfraConfig } from '../config'; +import { NodejsHandler } from '../constructs/nodejs-handler'; + +export interface PipelineStackProps extends cdk.StackProps { + readonly config: InfraConfig; + readonly uploadsBucket: s3.IBucket; + readonly apiSecret: secretsmanager.ISecret; +} + +const WORKSPACE_ROOT = path.resolve(__dirname, '..', '..', '..'); +const HANDLERS_ROOT = path.join(WORKSPACE_ROOT, 'apps', 'api', 'src', 'handlers'); + +// Mirror of apps/api/src/lib/events.ts (separate build, can't import across it). +const EVENT_SOURCE = 'clouddocs.documents'; +const DOCUMENT_EXTRACTED = 'DocumentExtracted'; + +/** + * Async AI pipeline (plan §2.2): + * S3 ObjectCreated → EventBridge → ingest queue → extract-worker + * → `DocumentExtracted` event → EventBridge fan-out → summarize + classify + * queues → workers → ai_analyses → document `ready`. + * + * Each queue has a dead-letter queue (maxReceiveCount 3); the workers report + * per-record failures so only the failed message is retried. + */ +export class PipelineStack extends cdk.Stack { + constructor(scope: Construct, id: string, props: PipelineStackProps) { + super(scope, id, props); + + const { config, uploadsBucket, apiSecret } = props; + const prefix = config.resourcePrefix; + + const makeQueue = (name: string): sqs.Queue => { + const dlq = new sqs.Queue(this, `${name}Dlq`, { + queueName: `${prefix}-${name}-dlq`, + retentionPeriod: cdk.Duration.days(14), + }); + return new sqs.Queue(this, `${name}Queue`, { + queueName: `${prefix}-${name}`, + // Generous: a worker may wait on OpenAI. Keep > worker timeout. + visibilityTimeout: cdk.Duration.seconds(120), + deadLetterQueue: { queue: dlq, maxReceiveCount: 3 }, + }); + }; + + const ingestQueue = makeQueue('doc-ingest'); + const summarizeQueue = makeQueue('doc-summarize'); + const classifyQueue = makeQueue('doc-classify'); + + // S3 ObjectCreated (raw-uploads/) → ingest queue. + new events.Rule(this, 'DocUploadedRule', { + ruleName: `${prefix}-doc-uploaded`, + description: 'Route new raw uploads to the extract worker.', + eventPattern: { + source: ['aws.s3'], + detailType: ['Object Created'], + detail: { + bucket: { name: [uploadsBucket.bucketName] }, + object: { key: [{ prefix: 'raw-uploads/' }] }, + }, + }, + targets: [new targets.SqsQueue(ingestQueue)], + }); + + // DocumentExtracted → fan out to both analysis queues. + new events.Rule(this, 'DocExtractedRule', { + ruleName: `${prefix}-doc-extracted`, + description: 'Fan out extracted documents to summarize + classify.', + eventPattern: { + source: [EVENT_SOURCE], + detailType: [DOCUMENT_EXTRACTED], + }, + targets: [new targets.SqsQueue(summarizeQueue), new targets.SqsQueue(classifyQueue)], + }); + + const defaultBus = events.EventBus.fromEventBusName(this, 'DefaultBus', 'default'); + + const makeWorker = ( + id: string, + handlerDir: string, + queue: sqs.Queue, + opts: { memorySize: number; canWriteBucket: boolean; canPutEvents: boolean }, + ): void => { + const fn = new NodejsHandler(this, id, { + functionName: `${prefix}-worker-${handlerDir}`, + entry: path.join(HANDLERS_ROOT, 'workers', handlerDir, 'handler.ts'), + environment: { + STAGE: config.stage, + SECRET_ARN: apiSecret.secretArn, + UPLOADS_BUCKET: uploadsBucket.bucketName, + LOG_LEVEL: config.stage === 'prod' ? 'info' : 'debug', + }, + memorySize: opts.memorySize, + timeout: cdk.Duration.seconds(90), + minify: config.stage === 'prod', + sourceMap: config.stage !== 'prod', + logRetention: logs.RetentionDays.TWO_WEEKS, + logRemovalPolicy: + config.stage === 'prod' ? cdk.RemovalPolicy.RETAIN : cdk.RemovalPolicy.DESTROY, + }); + + apiSecret.grantRead(fn.function); + if (opts.canWriteBucket) uploadsBucket.grantReadWrite(fn.function); + else uploadsBucket.grantRead(fn.function); + if (opts.canPutEvents) defaultBus.grantPutEventsTo(fn.function); + + fn.function.addEventSource( + new SqsEventSource(queue, { batchSize: 1, reportBatchItemFailures: true }), + ); + }; + + // extract reads raw bytes, writes extracted text, and emits an event. + makeWorker('ExtractWorker', 'extract', ingestQueue, { + memorySize: 1024, // pdf-parse/pdfjs is memory-hungry + canWriteBucket: true, + canPutEvents: true, + }); + // analysis workers only read the extracted text + call OpenAI. + makeWorker('SummarizeWorker', 'summarize', summarizeQueue, { + memorySize: 512, + canWriteBucket: false, + canPutEvents: false, + }); + makeWorker('ClassifyWorker', 'classify', classifyQueue, { + memorySize: 512, + canWriteBucket: false, + canPutEvents: false, + }); + + new cdk.CfnOutput(this, 'IngestQueueUrl', { value: ingestQueue.queueUrl }); + } +} diff --git a/infra/lib/stacks/storage-stack.ts b/infra/lib/stacks/storage-stack.ts index f26c714..8381ff7 100644 --- a/infra/lib/stacks/storage-stack.ts +++ b/infra/lib/stacks/storage-stack.ts @@ -25,6 +25,9 @@ export class StorageStack extends cdk.Stack { encryption: s3.BucketEncryption.S3_MANAGED, enforceSSL: true, versioned: true, + // Emit S3 events to EventBridge so the Phase 4 ingest pipeline can react + // to ObjectCreated without per-bucket notification wiring. + eventBridgeEnabled: true, // dev only — protect prod once we get there. removalPolicy: config.stage === 'prod' ? cdk.RemovalPolicy.RETAIN : cdk.RemovalPolicy.DESTROY, autoDeleteObjects: config.stage !== 'prod', diff --git a/libs/shared-types/src/index.ts b/libs/shared-types/src/index.ts index 81b0b01..71ab828 100644 --- a/libs/shared-types/src/index.ts +++ b/libs/shared-types/src/index.ts @@ -9,3 +9,4 @@ export * from './schemas/health'; export * from './schemas/auth'; export * from './schemas/orgs'; export * from './schemas/documents'; +export * from './schemas/analysis'; diff --git a/libs/shared-types/src/schemas/analysis.spec.ts b/libs/shared-types/src/schemas/analysis.spec.ts new file mode 100644 index 0000000..cd467c4 --- /dev/null +++ b/libs/shared-types/src/schemas/analysis.spec.ts @@ -0,0 +1,35 @@ +import { describe, expect, it } from 'vitest'; + +import { ClassifyResultSchema, SummaryResultSchema } from './analysis'; + +describe('SummaryResultSchema', () => { + it('accepts a well-formed summary', () => { + const v = { summary: 'A short summary.', bullets: ['a', 'b'], language: 'en' }; + expect(SummaryResultSchema.parse(v)).toEqual(v); + }); + + it('caps bullets at 7', () => { + expect(() => + SummaryResultSchema.parse({ summary: 's', bullets: Array(8).fill('x'), language: 'en' }), + ).toThrow(); + }); +}); + +describe('ClassifyResultSchema', () => { + it('accepts a known category with confidence in range', () => { + const v = { category: 'Invoice', confidence: 0.92, tags: ['finance'] }; + expect(ClassifyResultSchema.parse(v)).toEqual(v); + }); + + it('rejects an unknown category', () => { + expect(() => + ClassifyResultSchema.parse({ category: 'Spaceship', confidence: 0.5, tags: [] }), + ).toThrow(); + }); + + it('rejects confidence outside 0..1', () => { + expect(() => + ClassifyResultSchema.parse({ category: 'Other', confidence: 1.5, tags: [] }), + ).toThrow(); + }); +}); diff --git a/libs/shared-types/src/schemas/analysis.ts b/libs/shared-types/src/schemas/analysis.ts new file mode 100644 index 0000000..588bca5 --- /dev/null +++ b/libs/shared-types/src/schemas/analysis.ts @@ -0,0 +1,58 @@ +import { z } from 'zod'; + +import { DocumentSchema } from './documents'; + +/** Kinds of AI analysis. Phase 4 produces summary + classification. */ +export const AnalysisKindSchema = z.enum(['summary', 'classification', 'entities', 'keywords']); +export type AnalysisKind = z.infer; + +/** Predefined document categories the classifier picks from (plan §8 #6). */ +export const DOCUMENT_CATEGORIES = [ + 'Contract', + 'Invoice', + 'Report', + 'Resume', + 'Letter', + 'Presentation', + 'Other', +] as const; +export const DocumentCategorySchema = z.enum(DOCUMENT_CATEGORIES); +export type DocumentCategory = z.infer; + +/** + * Structured output of the summary prompt. This is the exact shape we ask + * OpenAI to return (json schema) and what we store in `ai_analyses.result`. + */ +export const SummaryResultSchema = z.object({ + summary: z.string(), + bullets: z.array(z.string()).max(7), + language: z.string().describe('ISO 639-1 code of the document language'), +}); +export type SummaryResult = z.infer; + +/** Structured output of the classification prompt. */ +export const ClassifyResultSchema = z.object({ + category: DocumentCategorySchema, + confidence: z.number().min(0).max(1), + tags: z.array(z.string()).max(8), +}); +export type ClassifyResult = z.infer; + +/** A stored analysis row as returned by the API. */ +export const AiAnalysisSchema = z.object({ + id: z.uuid(), + documentId: z.uuid(), + kind: AnalysisKindSchema, + model: z.string(), + promptVersion: z.string(), + result: z.unknown(), + createdAt: z.iso.datetime(), +}); +export type AiAnalysis = z.infer; + +/** GET /v1/documents/{id} — the document plus all its analyses. */ +export const DocumentDetailResponseSchema = z.object({ + document: DocumentSchema, + analyses: z.array(AiAnalysisSchema), +}); +export type DocumentDetailResponse = z.infer; diff --git a/libs/shared-types/src/schemas/documents.ts b/libs/shared-types/src/schemas/documents.ts index ac5265d..cf3425d 100644 --- a/libs/shared-types/src/schemas/documents.ts +++ b/libs/shared-types/src/schemas/documents.ts @@ -36,6 +36,11 @@ export const DocumentSchema = z.object({ sizeBytes: z.number().int().nonnegative(), status: DocumentStatusSchema, error: z.string().nullable(), + // Populated by the AI pipeline (Phase 4); null until processed. + category: z.string().nullable(), + tags: z.array(z.string()), + language: z.string().nullable(), + pageCount: z.number().int().nonnegative().nullable(), createdAt: z.iso.datetime(), updatedAt: z.iso.datetime(), }); diff --git a/package.json b/package.json index 540193f..ecb77af 100644 --- a/package.json +++ b/package.json @@ -31,6 +31,7 @@ "@angular/cli": "~21.2.0", "@angular/compiler-cli": "~21.2.0", "@angular/language-service": "~21.2.0", + "@aws-sdk/client-eventbridge": "^3.1053.0", "@aws-sdk/client-s3": "^3.1053.0", "@aws-sdk/client-secrets-manager": "^3.1052.0", "@aws-sdk/s3-request-presigner": "^3.1053.0", @@ -55,6 +56,7 @@ "@swc/helpers": "0.5.18", "@types/aws-lambda": "^8.10.161", "@types/node": "20.19.9", + "@types/pdf-parse": "^1.1.5", "@types/pg": "^8.20.0", "@typescript-eslint/utils": "^8.40.0", "angular-eslint": "^21.2.0", @@ -89,6 +91,9 @@ "@angular/platform-browser": "~21.2.0", "@angular/router": "~21.2.0", "@tailwindcss/postcss": "^4.3.0", + "mammoth": "^1.12.0", + "openai": "^6.39.0", + "pdf-parse": "^1.1.4", "postcss": "^8.5.15", "rxjs": "~7.8.0", "tailwindcss": "^4.3.0" diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index b482288..643e9f8 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -32,6 +32,15 @@ importers: '@tailwindcss/postcss': specifier: ^4.3.0 version: 4.3.0 + mammoth: + specifier: ^1.12.0 + version: 1.12.0 + openai: + specifier: ^6.39.0 + version: 6.39.0(ws@8.18.0)(zod@4.4.3) + pdf-parse: + specifier: ^1.1.4 + version: 1.1.4 postcss: specifier: ^8.5.15 version: 8.5.15 @@ -60,6 +69,9 @@ importers: '@angular/language-service': specifier: ~21.2.0 version: 21.2.13 + '@aws-sdk/client-eventbridge': + specifier: ^3.1053.0 + version: 3.1053.0 '@aws-sdk/client-s3': specifier: ^3.1053.0 version: 3.1053.0 @@ -132,6 +144,9 @@ importers: '@types/node': specifier: 20.19.9 version: 20.19.9 + '@types/pdf-parse': + specifier: ^1.1.5 + version: 1.1.5 '@types/pg': specifier: ^8.20.0 version: 8.20.0 @@ -514,6 +529,10 @@ packages: '@aws-crypto/util@5.2.0': resolution: {integrity: sha512-4RkU9EsI6ZpBve5fseQlGNUWKMa1RLPQ1dnjnQoe07ldfIzcsGb5hC5W0Dm7u423KWzawlrpbjXBrXCEv9zazQ==} + '@aws-sdk/client-eventbridge@3.1053.0': + resolution: {integrity: sha512-0ziD09G2HTSd+z0khUhPphk5tkG21j4+AhnY2ADq0QuSXYRZlpzEHqCiyYOuyJtqW284NJaJH+6DSK4xOwe9mA==} + engines: {node: '>=20.0.0'} + '@aws-sdk/client-s3@3.1053.0': resolution: {integrity: sha512-/oGxoB6p1Nqs935Blt+v1o+anSCEf2n3RjIrcLz84i4cn2Gr+Z7JpDdUkG5+74r5ctqEPG7k/phTGbJ9fNKnHg==} engines: {node: '>=20.0.0'} @@ -3446,6 +3465,9 @@ packages: '@types/parse-json@4.0.2': resolution: {integrity: sha512-dISoDXWWQwUquiKsyZ4Ng+HX2KsPL7LyHKHQwgGFEA3IaKac4Obd+h2a/a6waisAoepJlBcx9paWqjA8/HVjCw==} + '@types/pdf-parse@1.1.5': + resolution: {integrity: sha512-kBfrSXsloMnUJOKi25s3+hRmkycHfLK6A09eRGqF/N8BkQoPUmaCr+q8Cli5FnfohEz/rsv82zAiPz/LXtOGhA==} + '@types/pg@8.20.0': resolution: {integrity: sha512-bEPFOaMAHTEP1EzpvHTbmwR8UsFyHSKsRisLIHVMXnpNefSbGA1bD6CVy+qKjGSqmZqNqBDV2azOBo8TgkcVow==} @@ -3750,6 +3772,10 @@ packages: '@webassemblyjs/wast-printer@1.14.1': resolution: {integrity: sha512-kPSSXE6De1XOR820C90RIo2ogvZG+c3KiHzqUoO/F34Y2shGzesfqv7o57xrxovZJH/MetF5UjroJ/R/3isoiw==} + '@xmldom/xmldom@0.8.13': + resolution: {integrity: sha512-KRYzxepc14G/CEpEGc3Yn+JKaAeT63smlDr+vjB8jRfgTBBI9wRj/nkQEO+ucV8p8I9bfKLWp37uHgFrbntPvw==} + engines: {node: '>=10.0.0'} + '@xtuc/ieee754@1.2.0': resolution: {integrity: sha512-DX8nKgqcGwsc0eJSqYt5lwP4DH5FlHnmuWWBRy7X0NcaGR0ZtuyeESgMwTYVEtxmsNGY+qit4QYT/MIYTOTPeA==} @@ -4052,6 +4078,9 @@ packages: bl@4.1.0: resolution: {integrity: sha512-1W07cM9gS6DcLperZfFSj+bWLtaPGSOHWhPiGzXmvVJbRLdG82sH/Kn8EtW1VqWVA54AKf2h5k5BbnIbwF3h6w==} + bluebird@3.4.7: + resolution: {integrity: sha512-iD3898SR7sWVRHbiQv+sHUtHnMvC1o3nW5rAcqnq3uOn07DSAppZYUkIGslDz6gXC7HfunPe7YVBgoEJASPcHA==} + body-parser@1.20.5: resolution: {integrity: sha512-3grm+/2tUOvu2cjJkvsIxrv/wVpfXQW4PsQHYm7yk4vfpu7Ekl6nEsYBoJUL6qDwZUx8wUhQ8tR2qz+ad9c9OA==} engines: {node: '>= 0.8', npm: 1.2.8000 || >= 1.4.16} @@ -4598,6 +4627,9 @@ packages: engines: {node: '>= 16.0.0'} hasBin: true + dingbat-to-unicode@1.0.1: + resolution: {integrity: sha512-98l0sW87ZT58pU4i61wa2OHwxbiYSbuxsCBozaVnYX2iCnr3bLM3fIes1/ej7h1YdOKuKt/MLs706TVnALA65w==} + dns-packet@5.6.1: resolution: {integrity: sha512-l4gcSouhcgIKRvyy99RNVOgxXiicE+2jZoNmaNmZ6JXiGajBOJAesk1OBlJuM5k2c+eudGdLxDqXuPCKIj6kpw==} engines: {node: '>=6'} @@ -4635,6 +4667,9 @@ packages: resolution: {integrity: sha512-nI4U3TottKAcAD9LLud4Cb7b2QztQMUEfHbvhTH09bqXTxnSie8WnjPALV/WMCrJZ6UV/qHJ6L03OqO3LcdYZw==} engines: {node: '>=12'} + duck@0.1.12: + resolution: {integrity: sha512-wkctla1O6VfP89gQ+J/yDesM0S7B7XLXjKGzXxMDVFg7uEn706niAtyYovKbyq1oT9YwDcly721/iUWoc8MVRg==} + dunder-proto@1.0.1: resolution: {integrity: sha512-KIN/nDJBQRcXw0MLVhZE9iQHmG68qAVIBg9CqmUYjmQIhgij9U5MFvrqkUL5FbtyyzZuOeOt0zdeRe4UY7ct+A==} engines: {node: '>= 0.4'} @@ -5344,6 +5379,9 @@ packages: engines: {node: '>=0.10.0'} hasBin: true + immediate@3.0.6: + resolution: {integrity: sha512-XXOFtyqDjNDAQxVfYxuF7g9Il/IbWmmlQg2MYKOH8ExIT1qg6xc4zyS3HaEEATgs1btfzxq15ciUiY7gjSXRGQ==} + immutable@5.1.5: resolution: {integrity: sha512-t7xcm2siw+hlUM68I+UEOK+z84RzmN59as9DZ7P1l0994DKUWV7UXBMQZVxaoMSRQ+PBZbHCOoBt7a2wxOMt+A==} @@ -5752,6 +5790,9 @@ packages: resolution: {integrity: sha512-POQXvpdL69+CluYsillJ7SUhKvytYjW9vG/GKpnf+xP8UWgYEM/RaMzHHofbALDiKbbP1W8UEYmgGl39WkPZsg==} engines: {'0': node >= 0.2.0} + jszip@3.10.1: + resolution: {integrity: sha512-xXDvecyTpGLrqFrvkrUSoxxfJI5AH7U8zxxtVclpsUtMCq4JQ290LY8AW5c7Ggnr/Y/oK+bQMbqK2qmtk3pN4g==} + keyv@4.5.4: resolution: {integrity: sha512-oxVHkHR/EJf2CNXnWxRLW6mg7JyCCUcG0DtEGmL2ctUo1PNTin1PUil+r/+4r5MpVgC/fn1kjsx7mjSujKqIpw==} @@ -5800,6 +5841,9 @@ packages: webpack: optional: true + lie@3.3.0: + resolution: {integrity: sha512-UaiMJzeWRlEujzAuw5LokY1L5ecNQYZKfmyZ9L7wDHb/p5etKaxXhohBcrw0EYby+G/NA52vRSN4N39dxHAIwQ==} + lightningcss-android-arm64@1.32.0: resolution: {integrity: sha512-YK7/ClTt4kAK0vo6w3X+Pnm0D2cf2vPHbhOXdoNti1Ga0al1P4TBZhwjATvjNwLEBCnKvjJc2jQgHXH0NEwlAg==} engines: {node: '>= 12.0.0'} @@ -5970,6 +6014,9 @@ packages: long-timeout@0.1.1: resolution: {integrity: sha512-BFRuQUqc7x2NWxfJBCyUrN8iYUYznzL9JROmRz1gZ6KlOIgmoD+njPVbb+VNn2nGMKggMsK79iUNErillsrx7w==} + lop@0.4.2: + resolution: {integrity: sha512-RefILVDQ4DKoRZsJ4Pj22TxE3omDO47yFpkIBoDKzkqPRISs5U1cnAdg/5583YPkWPaLIYHOKRMQSvjFsO26cw==} + lru-cache@10.4.3: resolution: {integrity: sha512-JNAzZcXrCt42VGLuYz0zfAzDfAvJWW6AfYlDBQyDV5DClI2m5sAmK+OIO7s59XfsRsWHp02jAJrRadPRGTt6SQ==} @@ -6002,6 +6049,11 @@ packages: makeerror@1.0.12: resolution: {integrity: sha512-JmqCvUhmt43madlpFzG4BQzG2Z3m6tvQDNKdClZnO3VbIudJYmxsT0FNJMeiB2+JTSlTQTSbU8QdesVmwJcmLg==} + mammoth@1.12.0: + resolution: {integrity: sha512-cwnK1RIcRdDMi2HRx2EXGYlxqIEh0Oo3bLhorgnsVJi2UkbX1+jKxuBNR9PC5+JaX7EkmJxFPmo6mjLpqShI2w==} + engines: {node: '>=12.0.0'} + hasBin: true + math-intrinsics@1.1.0: resolution: {integrity: sha512-/IXtbwEk5HTPyEwyKX6hGkYXxM9nbj64B+ilVJnC/R6B0pH5G4V3b0pVbL7DBj4tkhBAppbQUlf6F6Xl9LHu1g==} engines: {node: '>= 0.4'} @@ -6209,6 +6261,9 @@ packages: node-addon-api@7.1.1: resolution: {integrity: sha512-5m3bsyrjFWE1xf7nz7YXdN4udnVtXK6/Yfgn5qnahL6bCkf2yKt4k3nuTKAtT4r3IG8JNR2ncsIMdZuAzJjHQQ==} + node-ensure@0.0.0: + resolution: {integrity: sha512-DRI60hzo2oKN1ma0ckc6nQWlHU69RH6xN0sjQTjMpChPfTYvKZdcQFfdYK2RWbJcKyUizSIy/l8OTGxMAM1QDw==} + node-fetch@2.7.0: resolution: {integrity: sha512-c4FRfUm/dbcWZ7U+1Wq0AwCyFL+3nt2bEw05wfxSz+DWpWsitgmSgYmy2dQdWyKC1694ELPqMs/YzUSNozLt8A==} engines: {node: 4.x || >=6.0.0} @@ -6361,10 +6416,25 @@ packages: resolution: {integrity: sha512-7x81NCL719oNbsq/3mh+hVrAWmFuEYUqrq/Iw3kUzH8ReypT9QQ0BLoJS7/G9k6N81XjW4qHWtjWwe/9eLy1EQ==} engines: {node: '>=12'} + openai@6.39.0: + resolution: {integrity: sha512-O61LIsimY3acVabwvomwFhwrnN36yvHY2quIfy9keEcFytGgWeV35yLHQ6NVMLSBxRpHmcg2yuhCnlu2HT4pLQ==} + hasBin: true + peerDependencies: + ws: ^8.18.0 + zod: ^3.25 || ^4.0 + peerDependenciesMeta: + ws: + optional: true + zod: + optional: true + opener@1.5.2: resolution: {integrity: sha512-ur5UIdyw5Y7yEj9wLzhqXiy6GZ3Mwx0yGI+5sMn2r0N0v3cKJvUmFH5yPP+WXh9e0xfyzyJX95D8l088DNFj7A==} hasBin: true + option@0.2.4: + resolution: {integrity: sha512-pkEqbDyl8ou5cpq+VsnQbe/WlEy5qS7xPzMS1U55OCG9KPvwFD46zDbxQIj3egJSFc3D+XhYOPUzz49zQAVy7A==} + optionator@0.9.4: resolution: {integrity: sha512-6IpQ7mKUxRcZNLIObR0hz7lxsapSSIYNZJwXPGeF0mTVqGKFIXj1DQcMoT22S3ROcLyY/rz0PWaWZ9ayWmad9g==} engines: {node: '>= 0.8.0'} @@ -6427,6 +6497,9 @@ packages: engines: {node: ^20.17.0 || >=22.9.0} hasBin: true + pako@1.0.11: + resolution: {integrity: sha512-4hLB8Py4zZce5s4yd9XzopqwVv/yGNhV1Bl8NTmCq1763HeK2+EwVTv+leGeL13Dnh2wfbqowVPXCIO0z4taYw==} + parent-module@1.0.1: resolution: {integrity: sha512-GQ2EWRpQV8/o+Aw8YqtfZZPfNRWZYkbidE9k5rpl/hC3vtHHBfGm2Ifi6qWV+coDGkrUKZAxE3Lot5kcsRlh+g==} engines: {node: '>=6'} @@ -6507,6 +6580,10 @@ packages: pathe@2.0.3: resolution: {integrity: sha512-WUjGcAqP1gQacoQe+OBJsFA7Ld4DyXuUIjZ5cc75cLHvJ7dtNsTugphxIADwspS+AraAUePCKrSVtPLFj/F88w==} + pdf-parse@1.1.4: + resolution: {integrity: sha512-XRIRcLgk6ZnUbsHsYXExMw+krrPE81hJ6FQPLdBNhhBefqIQKXu/WeTgNBGSwPrfU0v+UCEwn7AoAUOsVKHFvQ==} + engines: {node: '>=6.8.1'} + pg-cloudflare@1.4.0: resolution: {integrity: sha512-Vo7z/6rrQYxpNRylp4Tlob2elzbh+N/MOQbxFVWCxS7oEx6jF53GTJFxK2WWpKuBRkmiin4Mt+xofFDjx09R0A==} @@ -7299,6 +7376,9 @@ packages: resolution: {integrity: sha512-xRXBn0pPqQTVQiC8wyQrKs2MOlX24zQ0POGaj0kultvoOCstBQM5yvOhAVSUwOMjQtTvsPWoNCHfPGwaaQJhTw==} engines: {node: '>= 18'} + setimmediate@1.0.5: + resolution: {integrity: sha512-MATJdZp8sLqDl/68LfQmbP8zKPLQNV6BIZoIgrscFDQ+RsvK/BxeDQOgyxKKoh0y/8h3BqVFnCqQ/gd+reiIXA==} + setprototypeof@1.2.0: resolution: {integrity: sha512-E5LDX7Wrp85Kil5bhZv46j8jOeboKq5JMmYM3gVGdGH8xFpPWXUMsNrlODCrkoxMEeNi/XZIwuRvY4XNwYMJpw==} @@ -7811,6 +7891,9 @@ packages: engines: {node: '>=14.17'} hasBin: true + underscore@1.13.8: + resolution: {integrity: sha512-DXtD3ZtEQzc7M8m4cXotyHR+FAS18C64asBYY5vqZexfYryNNnDc02W4hKg3rdQuqOYas1jkseX0+nZXjTXnvQ==} + undici-types@6.21.0: resolution: {integrity: sha512-iwDZqg0QAGrg9Rav5H4n0M64c3mkR59cJ6wQp+7C4nI0gsmExaedaYLNO44eT4AtBBwjbTiGPMlt2Md0T9H9JQ==} @@ -8179,6 +8262,10 @@ packages: resolution: {integrity: sha512-k8KO9hrMyNk6tUWqUfkTEZbezRRpONVOzUTnc97VnCvyj6Tf9lyUR9EDAIeiVLv56jsMcoXEwjW8Kv5yPY52lw==} engines: {node: '>=16.0.0'} + xmlbuilder@10.1.1: + resolution: {integrity: sha512-OyzrcFLL/nb6fMGHbiRDuPup9ljBycsdCypwuyg5AAHvyWzGfChJpCXMG88AGTIMFhGZ9RccFN1e6lhg3hkwKg==} + engines: {node: '>=4.0'} + xmlchars@2.2.0: resolution: {integrity: sha512-JZnDKK8B0RCDw84FNdDAIpZK+JuJw+s7Lz8nksI7SIuU3UXJJslUthsi+uWBUYOwPFwW7W7PRLRfUKpxjtjFCw==} @@ -8652,6 +8739,20 @@ snapshots: '@smithy/util-utf8': 2.3.0 tslib: 2.8.1 + '@aws-sdk/client-eventbridge@3.1053.0': + dependencies: + '@aws-crypto/sha256-browser': 5.2.0 + '@aws-crypto/sha256-js': 5.2.0 + '@aws-sdk/core': 3.974.13 + '@aws-sdk/credential-provider-node': 3.972.44 + '@aws-sdk/signature-v4-multi-region': 3.996.28 + '@aws-sdk/types': 3.973.9 + '@smithy/core': 3.24.4 + '@smithy/fetch-http-handler': 5.4.4 + '@smithy/node-http-handler': 4.7.4 + '@smithy/types': 4.14.2 + tslib: 2.8.1 + '@aws-sdk/client-s3@3.1053.0': dependencies: '@aws-crypto/sha1-browser': 5.2.0 @@ -12230,6 +12331,10 @@ snapshots: '@types/parse-json@4.0.2': {} + '@types/pdf-parse@1.1.5': + dependencies: + '@types/node': 20.19.9 + '@types/pg@8.20.0': dependencies: '@types/node': 20.19.9 @@ -12563,6 +12668,8 @@ snapshots: '@webassemblyjs/ast': 1.14.1 '@xtuc/long': 4.2.2 + '@xmldom/xmldom@0.8.13': {} + '@xtuc/ieee754@1.2.0': {} '@xtuc/long@4.2.2': {} @@ -12911,6 +13018,8 @@ snapshots: inherits: 2.0.4 readable-stream: 3.6.2 + bluebird@3.4.7: {} + body-parser@1.20.5: dependencies: bytes: 3.1.2 @@ -13460,6 +13569,8 @@ snapshots: dependencies: address: 2.0.3 + dingbat-to-unicode@1.0.1: {} + dns-packet@5.6.1: dependencies: '@leichtgewicht/ip-codec': 2.0.5 @@ -13501,6 +13612,10 @@ snapshots: dotenv@17.4.2: {} + duck@0.1.12: + dependencies: + underscore: 1.13.8 + dunder-proto@1.0.1: dependencies: call-bind-apply-helpers: 1.0.2 @@ -14325,6 +14440,8 @@ snapshots: image-size@0.5.5: optional: true + immediate@3.0.6: {} + immutable@5.1.5: {} import-fresh@3.3.1: @@ -14852,6 +14969,13 @@ snapshots: jsonparse@1.3.1: {} + jszip@3.10.1: + dependencies: + lie: 3.3.0 + pako: 1.0.11 + readable-stream: 2.3.8 + setimmediate: 1.0.5 + keyv@4.5.4: dependencies: json-buffer: 3.0.1 @@ -14902,6 +15026,10 @@ snapshots: optionalDependencies: webpack: 5.106.2(@swc/core@1.15.8(@swc/helpers@0.5.18))(esbuild@0.27.3)(lightningcss@1.32.0)(postcss@8.5.15) + lie@3.3.0: + dependencies: + immediate: 3.0.6 + lightningcss-android-arm64@1.32.0: optional: true @@ -15070,6 +15198,12 @@ snapshots: long-timeout@0.1.1: {} + lop@0.4.2: + dependencies: + duck: 0.1.12 + option: 0.2.4 + underscore: 1.13.8 + lru-cache@10.4.3: {} lru-cache@11.5.0: {} @@ -15115,6 +15249,19 @@ snapshots: dependencies: tmpl: 1.0.5 + mammoth@1.12.0: + dependencies: + '@xmldom/xmldom': 0.8.13 + argparse: 1.0.10 + base64-js: 1.5.1 + bluebird: 3.4.7 + dingbat-to-unicode: 1.0.1 + jszip: 3.10.1 + lop: 0.4.2 + path-is-absolute: 1.0.1 + underscore: 1.13.8 + xmlbuilder: 10.1.1 + math-intrinsics@1.1.0: {} mdn-data@2.0.28: {} @@ -15296,6 +15443,8 @@ snapshots: node-addon-api@7.1.1: optional: true + node-ensure@0.0.0: {} + node-fetch@2.7.0(encoding@0.1.13): dependencies: whatwg-url: 5.0.0 @@ -15573,8 +15722,15 @@ snapshots: is-docker: 2.2.1 is-wsl: 2.2.0 + openai@6.39.0(ws@8.18.0)(zod@4.4.3): + optionalDependencies: + ws: 8.18.0 + zod: 4.4.3 + opener@1.5.2: {} + option@0.2.4: {} + optionator@0.9.4: dependencies: deep-is: 0.1.4 @@ -15693,6 +15849,8 @@ snapshots: transitivePeerDependencies: - supports-color + pako@1.0.11: {} + parent-module@1.0.1: dependencies: callsites: 3.1.0 @@ -15758,6 +15916,10 @@ snapshots: pathe@2.0.3: {} + pdf-parse@1.1.4: + dependencies: + node-ensure: 0.0.0 + pg-cloudflare@1.4.0: optional: true @@ -16559,6 +16721,8 @@ snapshots: transitivePeerDependencies: - supports-color + setimmediate@1.0.5: {} + setprototypeof@1.2.0: {} shallow-clone@3.0.1: @@ -17053,6 +17217,8 @@ snapshots: typescript@5.9.3: {} + underscore@1.13.8: {} + undici-types@6.21.0: {} undici@6.25.0: {} @@ -17432,6 +17598,8 @@ snapshots: xml-naming@0.1.0: {} + xmlbuilder@10.1.1: {} + xmlchars@2.2.0: {} xtend@4.0.2: {} diff --git a/tools/migrations/1779802004247_ai-analyses.sql b/tools/migrations/1779802004247_ai-analyses.sql new file mode 100644 index 0000000..16a125c --- /dev/null +++ b/tools/migrations/1779802004247_ai-analyses.sql @@ -0,0 +1,42 @@ +-- Up Migration + +-- Document columns the AI pipeline fills in (Phase 4) and Phase 5 search uses. +-- Added nullable so existing rows stay valid; populated by the workers. +ALTER TABLE documents + ADD COLUMN text_s3_key TEXT, -- extracted plain text in S3 + ADD COLUMN page_count INT, + ADD COLUMN language TEXT, -- ISO 639-1, from extraction/classify + ADD COLUMN category TEXT, -- classify result + ADD COLUMN tags TEXT[] NOT NULL DEFAULT '{}'; + +CREATE INDEX idx_documents_tags ON documents USING GIN (tags); + +-- AI analyses (plan §5.1). One row per (document, kind); a re-run inserts a new +-- row with a bumped prompt_version, so we keep history rather than overwrite. +CREATE TABLE ai_analyses ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + document_id UUID NOT NULL REFERENCES documents(id) ON DELETE CASCADE, + org_id UUID NOT NULL REFERENCES organizations(id) ON DELETE CASCADE, + kind TEXT NOT NULL CHECK (kind IN ('summary', 'classification', 'entities', 'keywords')), + model TEXT NOT NULL, + prompt_version TEXT NOT NULL, + input_tokens INT, + output_tokens INT, + cost_usd NUMERIC(10, 6), + result JSONB NOT NULL, + created_at TIMESTAMPTZ NOT NULL DEFAULT now() +); +CREATE INDEX idx_ai_analyses_doc ON ai_analyses(document_id, kind); +CREATE INDEX idx_ai_analyses_org_created ON ai_analyses(org_id, created_at DESC); + + +-- Down Migration + +DROP TABLE IF EXISTS ai_analyses; +DROP INDEX IF EXISTS idx_documents_tags; +ALTER TABLE documents + DROP COLUMN IF EXISTS text_s3_key, + DROP COLUMN IF EXISTS page_count, + DROP COLUMN IF EXISTS language, + DROP COLUMN IF EXISTS category, + DROP COLUMN IF EXISTS tags; diff --git a/tools/scripts/populate-dev-secrets.ts b/tools/scripts/populate-dev-secrets.ts index 63b02f4..7446390 100644 --- a/tools/scripts/populate-dev-secrets.ts +++ b/tools/scripts/populate-dev-secrets.ts @@ -1,11 +1,12 @@ /** - * Uploads DATABASE_URL + JWT_PRIVATE_KEY + JWT_PUBLIC_KEY from `.env.local` - * into the AWS Secrets Manager secret `clouddocs/{stage}/api`. Run after - * each CDK deploy that touches the API stack: + * Uploads DATABASE_URL + JWT_PRIVATE_KEY + JWT_PUBLIC_KEY (required) and + * OPENAI_API_KEY (optional, Phase 4 AI workers) from `.env.local` into the AWS + * Secrets Manager secret `clouddocs/{stage}/api`. Run after each CDK deploy + * that touches the API stack: * * AWS_PROFILE=clouddocs-dev pnpm secrets:put:dev * - * Prereqs: `.env.local` exists with the three keys; AWS credentials in + * Prereqs: `.env.local` exists with the required keys; AWS credentials in * scope; the secret already created by CDK. */ import { readFileSync } from 'node:fs'; @@ -16,6 +17,8 @@ const STAGE = process.env.STAGE ?? 'dev'; const REGION = process.env.AWS_REGION ?? 'sa-east-1'; const SECRET_NAME = `clouddocs/${STAGE}/api`; const REQUIRED_KEYS = ['DATABASE_URL', 'JWT_PRIVATE_KEY', 'JWT_PUBLIC_KEY'] as const; +/** Uploaded only when present in .env.local. */ +const OPTIONAL_KEYS = ['OPENAI_API_KEY'] as const; function parseEnvLocal(): Record { const path = resolve(process.cwd(), '.env.local'); @@ -56,13 +59,21 @@ async function main(): Promise { } payload[key] = value; } + const uploaded: string[] = [...REQUIRED_KEYS]; + for (const key of OPTIONAL_KEYS) { + const value = env[key]; + if (typeof value === 'string' && value.length > 0) { + payload[key] = value; + uploaded.push(key); + } + } const client = new SecretsManagerClient({ region: REGION }); await client.send( new PutSecretValueCommand({ SecretId: SECRET_NAME, SecretString: JSON.stringify(payload) }), ); - console.log(`Updated ${SECRET_NAME} in ${REGION} with: ${REQUIRED_KEYS.join(', ')}`); + console.log(`Updated ${SECRET_NAME} in ${REGION} with: ${uploaded.join(', ')}`); } main().catch((err) => {