From 3dc2bb1200c4e08a0c124d0e271093b52a9db157 Mon Sep 17 00:00:00 2001 From: jlfetter1 <71606450+jlfetter1@users.noreply.github.com> Date: Sun, 24 May 2026 12:35:01 -0400 Subject: [PATCH 1/3] Clamp remote source overrides --- src/core/operations.ts | 65 +++++++++--- src/core/pglite-engine.ts | 16 ++- src/core/postgres-engine.ts | 20 ++++ src/core/think/gather.ts | 16 ++- src/core/think/index.ts | 3 + test/e2e/source-isolation-pglite.test.ts | 128 +++++++++++++++++++++++ 6 files changed, 229 insertions(+), 19 deletions(-) diff --git a/src/core/operations.ts b/src/core/operations.ts index d69ac7d40..ca065c51c 100644 --- a/src/core/operations.ts +++ b/src/core/operations.ts @@ -421,6 +421,48 @@ export function sourceScopeOpts(ctx: OperationContext): { sourceId?: string; sou return {}; } +export function sourceScopeOptsForRequest( + ctx: OperationContext, + requestedSourceId?: string, + opts: { allowAll?: boolean } = {}, +): { sourceId?: string; sourceIds?: string[] } { + if (!requestedSourceId) return sourceScopeOpts(ctx); + + const allowAll = opts.allowAll !== false; + + if (ctx.remote === true) { + const allowed = ctx.auth?.allowedSources && ctx.auth.allowedSources.length > 0 + ? ctx.auth.allowedSources + : ctx.sourceId + ? [ctx.sourceId] + : []; + + if (requestedSourceId === '__all__') { + if (!allowAll) { + throw new OperationError('permission_denied', 'Requested source is outside caller read scope'); + } + if (allowed.length === 0) { + throw new OperationError('permission_denied', 'Requested source is outside caller read scope'); + } + return { sourceIds: allowed }; + } + + if (!allowed.includes(requestedSourceId)) { + throw new OperationError('permission_denied', 'Requested source is outside caller read scope'); + } + + return { sourceId: requestedSourceId }; + } + + if (requestedSourceId === '__all__') { + if (!allowAll) { + throw new OperationError('invalid_params', "source_id='__all__' is not supported for this operation"); + } + return {}; + } + return { sourceId: requestedSourceId }; +} + export interface Operation { name: string; description: string; @@ -456,17 +498,16 @@ const get_page: Operation = { slug: { type: 'string', required: true, description: 'Page slug' }, fuzzy: { type: 'boolean', description: 'Enable fuzzy slug resolution (default: false)' }, include_deleted: { type: 'boolean', description: 'v0.26.5: surface soft-deleted pages with deleted_at populated (default: false). Used by restore workflows.' }, + source_id: { type: 'string', description: "Read from a specific source within caller read scope. '__all__' is rejected because slug resolution is ambiguous across sources." }, }, handler: async (ctx, p) => { const slug = p.slug as string; const fuzzy = (p.fuzzy as boolean) || false; const includeDeleted = (p.include_deleted as boolean) === true; - // v0.31.8 (D20): thread ctx.sourceId through read-side ops. Only pass - // sourceId when it's set on ctx — when unset (local CLI default chain - // resolves to no source), the engine two-branch query falls through to - // the cross-source view, preserving pre-v0.31.8 behavior. MCP callers - // (stdio + HTTP) populate ctx.sourceId via the transport layer. - const sourceOpts = ctx.sourceId ? { sourceId: ctx.sourceId } : {}; + const sourceIdParam = typeof p.source_id === 'string' ? p.source_id : undefined; + const sourceOpts = sourceIdParam + ? sourceScopeOptsForRequest(ctx, sourceIdParam, { allowAll: false }) + : (ctx.sourceId ? { sourceId: ctx.sourceId } : {}); let page = await ctx.engine.getPage(slug, { includeDeleted, ...sourceOpts }); let resolved_slug: string | undefined; @@ -1340,15 +1381,11 @@ const query: Operation = { typeof p.embedding_column === 'string' && p.embedding_column.length > 0 ? (p.embedding_column as string) : undefined; - // Explicit per-call source_id must win over ctx.sourceId. The special - // __all__ value opts out of source filtering for local cross-source search. + // Explicit per-call source_id is allowed only inside the caller's read + // scope for remote OAuth clients. For local CLI callers, preserve the + // historical override semantics including `__all__`. const sourceIdParam = typeof p.source_id === 'string' ? p.source_id : undefined; - const querySourceScope = - sourceIdParam !== undefined - ? sourceIdParam === '__all__' - ? {} - : { sourceId: sourceIdParam } - : sourceScopeOpts(ctx); + const querySourceScope = sourceScopeOptsForRequest(ctx, sourceIdParam, { allowAll: true }); // v0.27.1: image-similarity branch. Bypasses hybridSearch (which is // text-only); embeds the image via embedMultimodal and runs a direct diff --git a/src/core/pglite-engine.ts b/src/core/pglite-engine.ts index a507c8393..a90c6f927 100644 --- a/src/core/pglite-engine.ts +++ b/src/core/pglite-engine.ts @@ -3530,9 +3530,11 @@ export class PGLiteEngine implements BrainEngine { async searchTakes( query: string, - opts: { limit?: number; takesHoldersAllowList?: string[] } = {}, + opts: SearchOpts & { takesHoldersAllowList?: string[] } = {}, ): Promise { const limit = clampSearchLimit(opts.limit, 30, 100); + const scopedSourceIds = opts.sourceIds && opts.sourceIds.length > 0 ? opts.sourceIds : null; + const scopedSourceId = scopedSourceIds ? null : (opts.sourceId ?? null); const { rows } = await this.db.query( `SELECT t.id AS take_id, t.page_id, p.slug AS page_slug, t.row_num, t.claim, t.kind, t.holder, t.weight, @@ -3542,18 +3544,22 @@ export class PGLiteEngine implements BrainEngine { WHERE t.active AND t.claim % $1 AND ($2::text[] IS NULL OR t.holder = ANY($2::text[])) + AND ($4::text[] IS NULL OR p.source_id = ANY($4::text[])) + AND ($5::text IS NULL OR p.source_id = $5) ORDER BY score DESC, t.weight DESC LIMIT $3`, - [query, opts.takesHoldersAllowList ?? null, limit] + [query, opts.takesHoldersAllowList ?? null, limit, scopedSourceIds, scopedSourceId] ); return rows as unknown as TakeHit[]; } async searchTakesVector( embedding: Float32Array, - opts: { limit?: number; takesHoldersAllowList?: string[] } = {}, + opts: SearchOpts & { takesHoldersAllowList?: string[] } = {}, ): Promise { const limit = clampSearchLimit(opts.limit, 30, 100); + const scopedSourceIds = opts.sourceIds && opts.sourceIds.length > 0 ? opts.sourceIds : null; + const scopedSourceId = scopedSourceIds ? null : (opts.sourceId ?? null); const vec = `[${Array.from(embedding).join(',')}]`; const { rows } = await this.db.query( `SELECT t.id AS take_id, t.page_id, p.slug AS page_slug, t.row_num, @@ -3564,9 +3570,11 @@ export class PGLiteEngine implements BrainEngine { WHERE t.active AND t.embedding IS NOT NULL AND ($2::text[] IS NULL OR t.holder = ANY($2::text[])) + AND ($4::text[] IS NULL OR p.source_id = ANY($4::text[])) + AND ($5::text IS NULL OR p.source_id = $5) ORDER BY t.embedding <=> $1::vector LIMIT $3`, - [vec, opts.takesHoldersAllowList ?? null, limit] + [vec, opts.takesHoldersAllowList ?? null, limit, scopedSourceIds, scopedSourceId] ); return rows as unknown as TakeHit[]; } diff --git a/src/core/postgres-engine.ts b/src/core/postgres-engine.ts index cb15de3b6..92ce6ca65 100644 --- a/src/core/postgres-engine.ts +++ b/src/core/postgres-engine.ts @@ -3544,6 +3544,8 @@ export class PostgresEngine implements BrainEngine { async searchTakes(query: string, opts: SearchOpts & { takesHoldersAllowList?: string[] } = {}): Promise { const sql = this.sql; const limit = clampSearchLimit(opts.limit, 30, 100); + const scopedSourceIds = opts.sourceIds && opts.sourceIds.length > 0 ? opts.sourceIds : null; + const scopedSourceId = scopedSourceIds ? null : (opts.sourceId ?? null); const rows = await sql` SELECT t.id AS take_id, t.page_id, p.slug AS page_slug, t.row_num, t.claim, t.kind, t.holder, t.weight, @@ -3556,6 +3558,14 @@ export class PostgresEngine implements BrainEngine { ${opts.takesHoldersAllowList ?? null}::text[] IS NULL OR t.holder = ANY(${opts.takesHoldersAllowList ?? null}::text[]) ) + AND ( + ${scopedSourceIds}::text[] IS NULL + OR p.source_id = ANY(${scopedSourceIds}::text[]) + ) + AND ( + ${scopedSourceId}::text IS NULL + OR p.source_id = ${scopedSourceId} + ) ORDER BY score DESC, t.weight DESC LIMIT ${limit} `; @@ -3568,6 +3578,8 @@ export class PostgresEngine implements BrainEngine { ): Promise { const sql = this.sql; const limit = clampSearchLimit(opts.limit, 30, 100); + const scopedSourceIds = opts.sourceIds && opts.sourceIds.length > 0 ? opts.sourceIds : null; + const scopedSourceId = scopedSourceIds ? null : (opts.sourceId ?? null); const vec = `[${Array.from(embedding).join(',')}]`; const rows = await sql` SELECT t.id AS take_id, t.page_id, p.slug AS page_slug, t.row_num, @@ -3581,6 +3593,14 @@ export class PostgresEngine implements BrainEngine { ${opts.takesHoldersAllowList ?? null}::text[] IS NULL OR t.holder = ANY(${opts.takesHoldersAllowList ?? null}::text[]) ) + AND ( + ${scopedSourceIds}::text[] IS NULL + OR p.source_id = ANY(${scopedSourceIds}::text[]) + ) + AND ( + ${scopedSourceId}::text IS NULL + OR p.source_id = ${scopedSourceId} + ) ORDER BY t.embedding <=> ${vec}::vector LIMIT ${limit} `; diff --git a/src/core/think/gather.ts b/src/core/think/gather.ts index 11fcc3d1b..03eb7efb0 100644 --- a/src/core/think/gather.ts +++ b/src/core/think/gather.ts @@ -34,6 +34,12 @@ export interface ThinkGatherOpts { questionEmbedding?: Float32Array; /** When set, MCP-bound calls forward this allow-list to takes_search. Local CLI leaves unset. */ takesHoldersAllowList?: string[]; + /** Source scope for single-source callers. Federated array wins when both are present. */ + sourceId?: string; + /** Federated read source scope for OAuth callers. */ + allowedSources?: string[]; + /** Remote trust marker forwarded by operation handlers. */ + remote?: boolean; } export interface ThinkGatherResult { @@ -105,11 +111,17 @@ export async function runGather( // Sanitize the question for any path that includes it in an LLM prompt. // (Direct DB search is fine — those are parameterized queries.) const sanitizedQuestion = sanitizeQueryForPrompt(opts.question); + const sourceScope = opts.allowedSources && opts.allowedSources.length > 0 + ? { sourceIds: opts.allowedSources } + : opts.sourceId + ? { sourceId: opts.sourceId } + : {}; // Stream 1: hybrid page search (existing primitive). const pagesPromise = hybridSearch(engine, opts.question, { limit: gatherLimit, expansion: false, // think provides its own anchor + graph context; no need for re-expansion + ...sourceScope, }).catch((e) => { process.stderr.write(`[think.gather] hybrid stream failed: ${(e as Error).message}\n`); return [] as SearchResult[]; @@ -119,6 +131,7 @@ export async function runGather( const takesKwPromise = engine.searchTakes(opts.question, { limit: takesLimit, takesHoldersAllowList: opts.takesHoldersAllowList, + ...sourceScope, }).catch((e) => { process.stderr.write(`[think.gather] takes-keyword stream failed: ${(e as Error).message}\n`); return [] as TakeHit[]; @@ -129,6 +142,7 @@ export async function runGather( ? engine.searchTakesVector(opts.questionEmbedding, { limit: takesLimit, takesHoldersAllowList: opts.takesHoldersAllowList, + ...sourceScope, }).catch((e) => { process.stderr.write(`[think.gather] takes-vector stream failed: ${(e as Error).message}\n`); return [] as TakeHit[]; @@ -137,7 +151,7 @@ export async function runGather( // Stream 4: graph walk (anchor only). const graphPromise: Promise = opts.anchor - ? engine.traversePaths(opts.anchor, { depth: graphDepth, direction: 'both' }) + ? engine.traversePaths(opts.anchor, { depth: graphDepth, direction: 'both', ...sourceScope }) .then(paths => { const slugs = new Set([opts.anchor!]); for (const p of paths) { diff --git a/src/core/think/index.ts b/src/core/think/index.ts index cae87182b..be26482fb 100644 --- a/src/core/think/index.ts +++ b/src/core/think/index.ts @@ -239,6 +239,9 @@ export async function runThink( anchor: opts.anchor, questionEmbedding, takesHoldersAllowList: opts.takesHoldersAllowList, + sourceId: opts.sourceId, + allowedSources: opts.allowedSources, + remote: opts.remote, }); // Render evidence blocks for the prompt diff --git a/test/e2e/source-isolation-pglite.test.ts b/test/e2e/source-isolation-pglite.test.ts index 71567d87a..f60208895 100644 --- a/test/e2e/source-isolation-pglite.test.ts +++ b/test/e2e/source-isolation-pglite.test.ts @@ -84,6 +84,23 @@ beforeEach(async () => { chunk_source: 'compiled_truth', token_count: 11, }], { sourceId: 'src-b' }); + + await engine.executeRaw( + `INSERT INTO sources (id, name, config) VALUES ('shared', 'shared', '{}'::jsonb) ON CONFLICT DO NOTHING`, + ); + await engine.putPage('docs/shared-only', { + type: 'guide', + title: 'Shared Only', + compiled_truth: 'Shared-only canonical content. Important context here.', + timeline: '', + frontmatter: {}, + }, { sourceId: 'shared' }); + await engine.upsertChunks('docs/shared-only', [{ + chunk_index: 0, + chunk_text: 'Shared-only canonical content. Important context here.', + chunk_source: 'compiled_truth', + token_count: 7, + }], { sourceId: 'shared' }); }); describe('v0.34.1 source-isolation regression (#861)', () => { @@ -264,4 +281,115 @@ describe('v0.34.1 source-isolation regression (#861)', () => { expect(r.source_id).toBe('default'); } }); + + test('query source_id override rejects unauthorized remote source', async () => { + const { operations } = await import('../../src/core/operations.ts'); + const queryOp = operations.find(o => o.name === 'query'); + const ctx = { + engine, + config: { engine: 'pglite' as const }, + logger: { info: () => {}, warn: () => {}, error: () => {} }, + dryRun: false, + remote: true, + sourceId: 'default', + auth: { + token: 'test', + clientId: 'test', + scopes: ['read'], + sourceId: 'default', + allowedSources: ['default'], + }, + }; + await expect( + queryOp!.handler(ctx as any, { query: 'Bob lives only', source_id: 'src-b' }), + ).rejects.toThrow('Requested source is outside caller read scope'); + }); + + test('query source_id=__all__ clamps to allowed sources for remote callers', async () => { + const { operations } = await import('../../src/core/operations.ts'); + const queryOp = operations.find(o => o.name === 'query'); + const ctx = { + engine, + config: { engine: 'pglite' as const }, + logger: { info: () => {}, warn: () => {}, error: () => {} }, + dryRun: false, + remote: true, + sourceId: 'default', + auth: { + token: 'test', + clientId: 'test', + scopes: ['read'], + sourceId: 'default', + allowedSources: ['default'], + }, + }; + const result = await queryOp!.handler(ctx as any, { query: 'Important context', source_id: '__all__' }); + const rows = result as Array<{ source_id?: string }>; + expect(rows.length).toBeGreaterThan(0); + for (const r of rows) { + expect(r.source_id).toBe('default'); + } + }); + + test('get_page can read an explicitly allowed shared source', async () => { + const { operations } = await import('../../src/core/operations.ts'); + const getPageOp = operations.find(o => o.name === 'get_page'); + const ctx = { + engine, + config: { engine: 'pglite' as const }, + logger: { info: () => {}, warn: () => {}, error: () => {} }, + dryRun: false, + remote: true, + sourceId: 'default', + auth: { + token: 'test', + clientId: 'test', + scopes: ['read'], + sourceId: 'default', + allowedSources: ['default', 'shared'], + }, + }; + const page = await getPageOp!.handler(ctx as any, { slug: 'docs/shared-only', source_id: 'shared' }); + expect((page as { source_id?: string }).source_id).toBe('shared'); + expect((page as { title?: string }).title).toBe('Shared Only'); + }); + + test('get_page rejects an explicit unauthorized source', async () => { + const { operations } = await import('../../src/core/operations.ts'); + const getPageOp = operations.find(o => o.name === 'get_page'); + const ctx = { + engine, + config: { engine: 'pglite' as const }, + logger: { info: () => {}, warn: () => {}, error: () => {} }, + dryRun: false, + remote: true, + sourceId: 'default', + auth: { + token: 'test', + clientId: 'test', + scopes: ['read'], + sourceId: 'default', + allowedSources: ['default', 'shared'], + }, + }; + await expect( + getPageOp!.handler(ctx as any, { slug: 'people/bob', source_id: 'src-b' }), + ).rejects.toThrow('Requested source is outside caller read scope'); + }); + + test('think gather applies federated source scope to page, take, and graph streams', async () => { + const { runGather } = await import('../../src/core/think/gather.ts'); + const result = await runGather(engine, { + question: 'Important context', + gatherLimit: 20, + allowedSources: ['default'], + remote: true, + }); + for (const p of result.pages as Array<{ source_id?: string }>) { + expect(p.source_id).toBe('default'); + } + for (const t of result.takes as Array<{ source_id?: string }>) { + if (t.source_id) expect(t.source_id).toBe('default'); + } + }); }); From aacac3023fb7108aa41a3889d0c354f27c659154 Mon Sep 17 00:00:00 2001 From: jlfetter1 <71606450+jlfetter1@users.noreply.github.com> Date: Sun, 24 May 2026 13:06:36 -0400 Subject: [PATCH 2/3] Audit source scope decisions --- src/commands/serve-http.ts | 86 +++++++++++++++++++++++++++++++++++--- 1 file changed, 80 insertions(+), 6 deletions(-) diff --git a/src/commands/serve-http.ts b/src/commands/serve-http.ts index 7d2ee1e1e..34eb0ede4 100644 --- a/src/commands/serve-http.ts +++ b/src/commands/serve-http.ts @@ -52,6 +52,78 @@ import { */ export const HEALTH_TIMEOUT_MS = 3000; +type SourceScopeAuditOutcome = + | 'default_allowed_scope' + | 'default_caller_source' + | 'explicit_allowed' + | 'clamped_to_allowed_sources' + | 'rejected_unauthorized' + | 'rejected_ambiguous_all'; + +interface SourceScopeAudit { + caller_source_id: string; + requested_source_id: string | null; + allowed_sources: string[]; + effective_source_ids: string[]; + outcome: SourceScopeAuditOutcome; +} + +function buildSourceScopeAudit( + operation: string, + params: unknown, + authInfo: AuthInfo, + tokenSourceId: string, +): SourceScopeAudit | null { + const sourceScopedOps = new Set(['query', 'search', 'list_pages', 'get_page', 'think']); + if (!sourceScopedOps.has(operation)) return null; + + const allowedSources = authInfo.allowedSources?.length + ? authInfo.allowedSources + : [tokenSourceId]; + const requested = params && typeof params === 'object' && !Array.isArray(params) + ? (params as Record).source_id + : undefined; + const requestedSourceId = typeof requested === 'string' ? requested : null; + + if (!requestedSourceId) { + const effective = operation === 'get_page' ? [tokenSourceId] : allowedSources; + return { + caller_source_id: tokenSourceId, + requested_source_id: null, + allowed_sources: allowedSources, + effective_source_ids: effective, + outcome: operation === 'get_page' ? 'default_caller_source' : 'default_allowed_scope', + }; + } + + if (requestedSourceId === '__all__') { + return { + caller_source_id: tokenSourceId, + requested_source_id: requestedSourceId, + allowed_sources: allowedSources, + effective_source_ids: operation === 'get_page' ? [] : allowedSources, + outcome: operation === 'get_page' ? 'rejected_ambiguous_all' : 'clamped_to_allowed_sources', + }; + } + + const allowed = allowedSources.includes(requestedSourceId); + return { + caller_source_id: tokenSourceId, + requested_source_id: requestedSourceId, + allowed_sources: allowedSources, + effective_source_ids: allowed ? [requestedSourceId] : [], + outcome: allowed ? 'explicit_allowed' : 'rejected_unauthorized', + }; +} + +function attachSourceScopeAudit(paramsSummary: unknown, audit: SourceScopeAudit | null): unknown { + if (!audit) return paramsSummary; + if (paramsSummary && typeof paramsSummary === 'object' && !Array.isArray(paramsSummary)) { + return { ...(paramsSummary as Record), source_scope: audit }; + } + return { source_scope: audit }; +} + /** * v0.36.1.x #1024: bootstrap token resolution. * @@ -1387,12 +1459,6 @@ export async function runServeHttp(engine: BrainEngine, options: ServeHttpOption // cast, so reads return real objects and `params->>'op'` returns // 'tools/list'. Pre-existing string-shaped rows are normalized by // migration v41 in src/core/migrate.ts. - const safeParamsSummary = summarizeMcpParams(name, params); - const logParamsObj: unknown = logFullParams - ? (params || null) - : (safeParamsSummary || null); - const broadcastParams = logFullParams ? (params || {}) : safeParamsSummary; - // v0.31 (D12 / eE1): refactor the inlined op.handler call to go through // src/mcp/dispatch.ts so HTTP MCP shares the same dispatch path as // stdio MCP. The dispatcher does param validation, OperationContext @@ -1410,6 +1476,14 @@ export async function runServeHttp(engine: BrainEngine, options: ServeHttpOption // has source_id set; legacy bearer tokens default to 'default' in // verifyAccessToken. The env-fallback is gone. const tokenSourceId = authInfo.sourceId ?? 'default'; + const sourceScopeAudit = buildSourceScopeAudit(name, params, authInfo, tokenSourceId); + const safeParamsSummary = summarizeMcpParams(name, params); + const logParamsObj: unknown = logFullParams + ? attachSourceScopeAudit(params || null, sourceScopeAudit) + : attachSourceScopeAudit(safeParamsSummary || null, sourceScopeAudit); + const broadcastParams = logFullParams + ? attachSourceScopeAudit(params || {}, sourceScopeAudit) + : attachSourceScopeAudit(safeParamsSummary, sourceScopeAudit); let toolResult: Awaited>; try { From cad7ed9127c0a1274a7d01bfcfb6e2011a01ed75 Mon Sep 17 00:00:00 2001 From: jlfetter1 <71606450+jlfetter1@users.noreply.github.com> Date: Sun, 24 May 2026 21:20:59 -0400 Subject: [PATCH 3/3] Bind ingest source to OAuth client --- src/commands/serve-http.ts | 27 ++++++- test/e2e/serve-http-ingest-webhook.test.ts | 9 ++- test/e2e/source-isolation-pglite.test.ts | 87 ++++++++++++++++++++++ test/ingestion/ingest-source-scope.test.ts | 39 ++++++++++ 4 files changed, 155 insertions(+), 7 deletions(-) create mode 100644 test/ingestion/ingest-source-scope.test.ts diff --git a/src/commands/serve-http.ts b/src/commands/serve-http.ts index 34eb0ede4..cf671c918 100644 --- a/src/commands/serve-http.ts +++ b/src/commands/serve-http.ts @@ -68,6 +68,14 @@ interface SourceScopeAudit { outcome: SourceScopeAuditOutcome; } +export function resolveIngestSourceId(authInfo: AuthInfo, requestedSourceId?: string | null): string { + const tokenSourceId = authInfo.sourceId || 'default'; + const requested = requestedSourceId?.trim(); + if (!requested) return tokenSourceId; + if (requested === tokenSourceId) return tokenSourceId; + throw new Error('Requested ingest source is outside caller write scope'); +} + function buildSourceScopeAudit( operation: string, params: unknown, @@ -1617,9 +1625,11 @@ export async function runServeHttp(engine: BrainEngine, options: ServeHttpOption // (file-watcher, inbox-folder, cron-scheduler) while serve --http hosts // the network surface and submits Minion jobs directly. // - // Auth: existing OAuth `write` scope. Rate limit: 100 events / 10s per - // IP (reuses the IP-keyed pattern from ccRateLimiter; a future tweak - // could key on authInfo.clientId for fairer per-agent fairness). + // Auth: existing OAuth `write` scope. Events are bound to the OAuth + // client's source_id; X-Gbrain-Source-Id may only repeat that value and + // cannot widen write authority. Rate limit: 100 events / 10s per IP + // (reuses the IP-keyed pattern from ccRateLimiter; a future tweak could + // key on authInfo.clientId for fairer per-agent fairness). // Payload cap: 1 MB default. Content-type allowlist: markdown, plain, // HTML, JSON. Binary content is REJECTED with HTTP 415 in v1 — the // binary-upload flow ships as a separate route in a later wave when @@ -1753,7 +1763,16 @@ export async function runServeHttp(engine: BrainEngine, options: ServeHttpOption const content = body.toString('utf8'); const contentHash = computeContentHash(content); const sourceUri = (req.header('x-gbrain-source-uri') || `mcp-webhook:${authInfo.clientId}:${Date.now()}`).slice(0, 1024); - const sourceId = (req.header('x-gbrain-source-id') || `webhook-${authInfo.clientId}`).slice(0, 256); + let sourceId: string; + try { + sourceId = resolveIngestSourceId(authInfo, req.header('x-gbrain-source-id')).slice(0, 256); + } catch (err) { + res.status(403).json({ + error: 'permission_denied', + message: err instanceof Error ? err.message : 'Requested ingest source is outside caller write scope', + }); + return; + } const callerSlug = req.header('x-gbrain-slug'); const event: IngestionEvent = { diff --git a/test/e2e/serve-http-ingest-webhook.test.ts b/test/e2e/serve-http-ingest-webhook.test.ts index 70d514ec5..e21f6d91d 100644 --- a/test/e2e/serve-http-ingest-webhook.test.ts +++ b/test/e2e/serve-http-ingest-webhook.test.ts @@ -20,7 +20,7 @@ * processor-skillpack hint * 4. Happy path: text/markdown → 200/202 with job_id in response * 5. Header overrides: X-Gbrain-Slug is forwarded; X-Gbrain-Source-Id - * tags the event + * is accepted only when it matches the OAuth client's bound source * 6. Idempotency: same content + same client → job_id returned twice * should match (queue dedup on (client_id, content_hash)) * @@ -332,7 +332,7 @@ describeE2E('serve-http POST /ingest webhook (v0.38)', () => { // test/ingestion/ingest-capture.test.ts). }); - test('X-Gbrain-Source-Id header is accepted', async () => { + test('X-Gbrain-Source-Id header cannot override the OAuth client source', async () => { const token = await mintToken('read write'); const res = await postIngest( token, @@ -340,7 +340,10 @@ describeE2E('serve-http POST /ingest webhook (v0.38)', () => { '# source-id header test', { 'X-Gbrain-Source-Id': 'zapier-webhook' }, ); - expect([200, 202]).toContain(res.status); + expect(res.status).toBe(403); + const body = (await res.json()) as { error?: string; message?: string }; + expect(body.error).toBe('permission_denied'); + expect(body.message).toContain('outside caller write scope'); }); test('X-Gbrain-Source-Uri header is accepted', async () => { diff --git a/test/e2e/source-isolation-pglite.test.ts b/test/e2e/source-isolation-pglite.test.ts index f60208895..3c0bf5d37 100644 --- a/test/e2e/source-isolation-pglite.test.ts +++ b/test/e2e/source-isolation-pglite.test.ts @@ -88,6 +88,9 @@ beforeEach(async () => { await engine.executeRaw( `INSERT INTO sources (id, name, config) VALUES ('shared', 'shared', '{}'::jsonb) ON CONFLICT DO NOTHING`, ); + await engine.executeRaw( + `INSERT INTO sources (id, name, config) VALUES ('capture-events', 'capture-events', '{}'::jsonb) ON CONFLICT DO NOTHING`, + ); await engine.putPage('docs/shared-only', { type: 'guide', title: 'Shared Only', @@ -101,6 +104,20 @@ beforeEach(async () => { chunk_source: 'compiled_truth', token_count: 7, }], { sourceId: 'shared' }); + + await engine.putPage('events/ambient-capture-only', { + type: 'event', + title: 'Ambient Capture Only', + compiled_truth: 'Ambient capture event stream should stay isolated. Important context here.', + timeline: '', + frontmatter: {}, + }, { sourceId: 'capture-events' }); + await engine.upsertChunks('events/ambient-capture-only', [{ + chunk_index: 0, + chunk_text: 'Ambient capture event stream should stay isolated. Important context here.', + chunk_source: 'compiled_truth', + token_count: 10, + }], { sourceId: 'capture-events' }); }); describe('v0.34.1 source-isolation regression (#861)', () => { @@ -392,4 +409,74 @@ describe('v0.34.1 source-isolation regression (#861)', () => { if (t.source_id) expect(t.source_id).toBe('default'); } }); + + test('automated capture OAuth writer cannot widen read scope with source overrides', async () => { + const { operations } = await import('../../src/core/operations.ts'); + const queryOp = operations.find(o => o.name === 'query'); + const getPageOp = operations.find(o => o.name === 'get_page'); + const ctx = { + engine, + config: { engine: 'pglite' as const }, + logger: { info: () => {}, warn: () => {}, error: () => {} }, + dryRun: false, + remote: true, + sourceId: 'capture-events', + auth: { + token: 'test', + clientId: 'simon-ambient-capture', + scopes: ['read', 'write'], + sourceId: 'capture-events', + allowedSources: ['capture-events'], + }, + }; + + await expect( + queryOp!.handler(ctx as any, { query: 'Shared-only canonical', source_id: 'shared' }), + ).rejects.toThrow('Requested source is outside caller read scope'); + await expect( + getPageOp!.handler(ctx as any, { slug: 'docs/shared-only', source_id: 'shared' }), + ).rejects.toThrow('Requested source is outside caller read scope'); + + const allResult = await queryOp!.handler(ctx as any, { + query: 'Important context', + source_id: '__all__', + limit: 20, + }); + for (const r of allResult as Array<{ source_id?: string }>) { + expect(r.source_id).toBe('capture-events'); + } + }); + + test('other OAuth clients cannot read an automated capture source unless explicitly granted', async () => { + const { operations } = await import('../../src/core/operations.ts'); + const searchOp = operations.find(o => o.name === 'search'); + const getPageOp = operations.find(o => o.name === 'get_page'); + const ctx = { + engine, + config: { engine: 'pglite' as const }, + logger: { info: () => {}, warn: () => {}, error: () => {} }, + dryRun: false, + remote: true, + sourceId: 'default', + auth: { + token: 'test', + clientId: 'jarvis-openclaw', + scopes: ['read', 'write'], + sourceId: 'default', + allowedSources: ['default', 'shared'], + }, + }; + + const searchResult = await searchOp!.handler(ctx as any, { + query: 'Ambient capture event stream', + limit: 20, + }); + for (const r of searchResult as Array<{ source_id?: string }>) { + expect(r.source_id).not.toBe('capture-events'); + } + + await expect( + getPageOp!.handler(ctx as any, { slug: 'events/ambient-capture-only', source_id: 'capture-events' }), + ).rejects.toThrow('Requested source is outside caller read scope'); + }); }); diff --git a/test/ingestion/ingest-source-scope.test.ts b/test/ingestion/ingest-source-scope.test.ts new file mode 100644 index 000000000..ce15cb115 --- /dev/null +++ b/test/ingestion/ingest-source-scope.test.ts @@ -0,0 +1,39 @@ +import { describe, expect, test } from 'bun:test'; +import { resolveIngestSourceId } from '../../src/commands/serve-http.ts'; +import type { AuthInfo } from '../../src/core/operations.ts'; + +describe('POST /ingest source scope resolution', () => { + const auth: AuthInfo = { + token: 'test', + clientId: 'simon-ambient-capture', + scopes: ['read', 'write'], + sourceId: 'capture-events', + allowedSources: ['capture-events'], + }; + + test('defaults writes to the OAuth client source', () => { + expect(resolveIngestSourceId(auth)).toBe('capture-events'); + }); + + test('accepts an explicit source only when it matches the OAuth client source', () => { + expect(resolveIngestSourceId(auth, 'capture-events')).toBe('capture-events'); + }); + + test('rejects source header attempts to write into another source', () => { + expect(() => resolveIngestSourceId(auth, 'shared')).toThrow( + 'Requested ingest source is outside caller write scope', + ); + }); + + test('legacy auth without source falls back to default, not caller-controlled source', () => { + const legacyAuth: AuthInfo = { + token: 'test', + clientId: 'legacy-client', + scopes: ['read', 'write'], + }; + expect(resolveIngestSourceId(legacyAuth)).toBe('default'); + expect(() => resolveIngestSourceId(legacyAuth, 'shared')).toThrow( + 'Requested ingest source is outside caller write scope', + ); + }); +});