Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
113 changes: 103 additions & 10 deletions src/commands/serve-http.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,86 @@ import {
*/
export const HEALTH_TIMEOUT_MS = 3000;

type SourceScopeAuditOutcome =
| 'default_allowed_scope'
| 'default_caller_source'
| 'explicit_allowed'
| 'clamped_to_allowed_sources'
| 'rejected_unauthorized'
| 'rejected_ambiguous_all';

interface SourceScopeAudit {
caller_source_id: string;
requested_source_id: string | null;
allowed_sources: string[];
effective_source_ids: string[];
outcome: SourceScopeAuditOutcome;
}

export function resolveIngestSourceId(authInfo: AuthInfo, requestedSourceId?: string | null): string {
const tokenSourceId = authInfo.sourceId || 'default';
const requested = requestedSourceId?.trim();
if (!requested) return tokenSourceId;
if (requested === tokenSourceId) return tokenSourceId;
throw new Error('Requested ingest source is outside caller write scope');
}

function buildSourceScopeAudit(
operation: string,
params: unknown,
authInfo: AuthInfo,
tokenSourceId: string,
): SourceScopeAudit | null {
const sourceScopedOps = new Set(['query', 'search', 'list_pages', 'get_page', 'think']);
if (!sourceScopedOps.has(operation)) return null;

const allowedSources = authInfo.allowedSources?.length
? authInfo.allowedSources
: [tokenSourceId];
const requested = params && typeof params === 'object' && !Array.isArray(params)
? (params as Record<string, unknown>).source_id
: undefined;
const requestedSourceId = typeof requested === 'string' ? requested : null;

if (!requestedSourceId) {
const effective = operation === 'get_page' ? [tokenSourceId] : allowedSources;
return {
caller_source_id: tokenSourceId,
requested_source_id: null,
allowed_sources: allowedSources,
effective_source_ids: effective,
outcome: operation === 'get_page' ? 'default_caller_source' : 'default_allowed_scope',
};
}

if (requestedSourceId === '__all__') {
return {
caller_source_id: tokenSourceId,
requested_source_id: requestedSourceId,
allowed_sources: allowedSources,
effective_source_ids: operation === 'get_page' ? [] : allowedSources,
outcome: operation === 'get_page' ? 'rejected_ambiguous_all' : 'clamped_to_allowed_sources',
};
}

const allowed = allowedSources.includes(requestedSourceId);
return {
caller_source_id: tokenSourceId,
requested_source_id: requestedSourceId,
allowed_sources: allowedSources,
effective_source_ids: allowed ? [requestedSourceId] : [],
outcome: allowed ? 'explicit_allowed' : 'rejected_unauthorized',
};
}

function attachSourceScopeAudit(paramsSummary: unknown, audit: SourceScopeAudit | null): unknown {
if (!audit) return paramsSummary;
if (paramsSummary && typeof paramsSummary === 'object' && !Array.isArray(paramsSummary)) {
return { ...(paramsSummary as Record<string, unknown>), source_scope: audit };
}
return { source_scope: audit };
}

/**
* v0.36.1.x #1024: bootstrap token resolution.
*
Expand Down Expand Up @@ -1387,12 +1467,6 @@ export async function runServeHttp(engine: BrainEngine, options: ServeHttpOption
// cast, so reads return real objects and `params->>'op'` returns
// 'tools/list'. Pre-existing string-shaped rows are normalized by
// migration v41 in src/core/migrate.ts.
const safeParamsSummary = summarizeMcpParams(name, params);
const logParamsObj: unknown = logFullParams
? (params || null)
: (safeParamsSummary || null);
const broadcastParams = logFullParams ? (params || {}) : safeParamsSummary;

// v0.31 (D12 / eE1): refactor the inlined op.handler call to go through
// src/mcp/dispatch.ts so HTTP MCP shares the same dispatch path as
// stdio MCP. The dispatcher does param validation, OperationContext
Expand All @@ -1410,6 +1484,14 @@ export async function runServeHttp(engine: BrainEngine, options: ServeHttpOption
// has source_id set; legacy bearer tokens default to 'default' in
// verifyAccessToken. The env-fallback is gone.
const tokenSourceId = authInfo.sourceId ?? 'default';
const sourceScopeAudit = buildSourceScopeAudit(name, params, authInfo, tokenSourceId);
const safeParamsSummary = summarizeMcpParams(name, params);
const logParamsObj: unknown = logFullParams
? attachSourceScopeAudit(params || null, sourceScopeAudit)
: attachSourceScopeAudit(safeParamsSummary || null, sourceScopeAudit);
const broadcastParams = logFullParams
? attachSourceScopeAudit(params || {}, sourceScopeAudit)
: attachSourceScopeAudit(safeParamsSummary, sourceScopeAudit);

let toolResult: Awaited<ReturnType<typeof dispatchToolCall>>;
try {
Expand Down Expand Up @@ -1543,9 +1625,11 @@ export async function runServeHttp(engine: BrainEngine, options: ServeHttpOption
// (file-watcher, inbox-folder, cron-scheduler) while serve --http hosts
// the network surface and submits Minion jobs directly.
//
// Auth: existing OAuth `write` scope. Rate limit: 100 events / 10s per
// IP (reuses the IP-keyed pattern from ccRateLimiter; a future tweak
// could key on authInfo.clientId for fairer per-agent fairness).
// Auth: existing OAuth `write` scope. Events are bound to the OAuth
// client's source_id; X-Gbrain-Source-Id may only repeat that value and
// cannot widen write authority. Rate limit: 100 events / 10s per IP
// (reuses the IP-keyed pattern from ccRateLimiter; a future tweak could
// key on authInfo.clientId for fairer per-agent fairness).
// Payload cap: 1 MB default. Content-type allowlist: markdown, plain,
// HTML, JSON. Binary content is REJECTED with HTTP 415 in v1 — the
// binary-upload flow ships as a separate route in a later wave when
Expand Down Expand Up @@ -1679,7 +1763,16 @@ export async function runServeHttp(engine: BrainEngine, options: ServeHttpOption
const content = body.toString('utf8');
const contentHash = computeContentHash(content);
const sourceUri = (req.header('x-gbrain-source-uri') || `mcp-webhook:${authInfo.clientId}:${Date.now()}`).slice(0, 1024);
const sourceId = (req.header('x-gbrain-source-id') || `webhook-${authInfo.clientId}`).slice(0, 256);
let sourceId: string;
try {
sourceId = resolveIngestSourceId(authInfo, req.header('x-gbrain-source-id')).slice(0, 256);
} catch (err) {
res.status(403).json({
error: 'permission_denied',
message: err instanceof Error ? err.message : 'Requested ingest source is outside caller write scope',
});
return;
}
const callerSlug = req.header('x-gbrain-slug');

const event: IngestionEvent = {
Expand Down
65 changes: 51 additions & 14 deletions src/core/operations.ts
Original file line number Diff line number Diff line change
Expand Up @@ -421,6 +421,48 @@ export function sourceScopeOpts(ctx: OperationContext): { sourceId?: string; sou
return {};
}

export function sourceScopeOptsForRequest(
ctx: OperationContext,
requestedSourceId?: string,
opts: { allowAll?: boolean } = {},
): { sourceId?: string; sourceIds?: string[] } {
if (!requestedSourceId) return sourceScopeOpts(ctx);

const allowAll = opts.allowAll !== false;

if (ctx.remote === true) {
const allowed = ctx.auth?.allowedSources && ctx.auth.allowedSources.length > 0
? ctx.auth.allowedSources
: ctx.sourceId
? [ctx.sourceId]
: [];

if (requestedSourceId === '__all__') {
if (!allowAll) {
throw new OperationError('permission_denied', 'Requested source is outside caller read scope');
}
if (allowed.length === 0) {
throw new OperationError('permission_denied', 'Requested source is outside caller read scope');
}
return { sourceIds: allowed };
}

if (!allowed.includes(requestedSourceId)) {
throw new OperationError('permission_denied', 'Requested source is outside caller read scope');
}

return { sourceId: requestedSourceId };
}

if (requestedSourceId === '__all__') {
if (!allowAll) {
throw new OperationError('invalid_params', "source_id='__all__' is not supported for this operation");
}
return {};
}
return { sourceId: requestedSourceId };
}

export interface Operation {
name: string;
description: string;
Expand Down Expand Up @@ -456,17 +498,16 @@ const get_page: Operation = {
slug: { type: 'string', required: true, description: 'Page slug' },
fuzzy: { type: 'boolean', description: 'Enable fuzzy slug resolution (default: false)' },
include_deleted: { type: 'boolean', description: 'v0.26.5: surface soft-deleted pages with deleted_at populated (default: false). Used by restore workflows.' },
source_id: { type: 'string', description: "Read from a specific source within caller read scope. '__all__' is rejected because slug resolution is ambiguous across sources." },
},
handler: async (ctx, p) => {
const slug = p.slug as string;
const fuzzy = (p.fuzzy as boolean) || false;
const includeDeleted = (p.include_deleted as boolean) === true;
// v0.31.8 (D20): thread ctx.sourceId through read-side ops. Only pass
// sourceId when it's set on ctx — when unset (local CLI default chain
// resolves to no source), the engine two-branch query falls through to
// the cross-source view, preserving pre-v0.31.8 behavior. MCP callers
// (stdio + HTTP) populate ctx.sourceId via the transport layer.
const sourceOpts = ctx.sourceId ? { sourceId: ctx.sourceId } : {};
const sourceIdParam = typeof p.source_id === 'string' ? p.source_id : undefined;
const sourceOpts = sourceIdParam
? sourceScopeOptsForRequest(ctx, sourceIdParam, { allowAll: false })
: (ctx.sourceId ? { sourceId: ctx.sourceId } : {});

let page = await ctx.engine.getPage(slug, { includeDeleted, ...sourceOpts });
let resolved_slug: string | undefined;
Expand Down Expand Up @@ -1340,15 +1381,11 @@ const query: Operation = {
typeof p.embedding_column === 'string' && p.embedding_column.length > 0
? (p.embedding_column as string)
: undefined;
// Explicit per-call source_id must win over ctx.sourceId. The special
// __all__ value opts out of source filtering for local cross-source search.
// Explicit per-call source_id is allowed only inside the caller's read
// scope for remote OAuth clients. For local CLI callers, preserve the
// historical override semantics including `__all__`.
const sourceIdParam = typeof p.source_id === 'string' ? p.source_id : undefined;
const querySourceScope =
sourceIdParam !== undefined
? sourceIdParam === '__all__'
? {}
: { sourceId: sourceIdParam }
: sourceScopeOpts(ctx);
const querySourceScope = sourceScopeOptsForRequest(ctx, sourceIdParam, { allowAll: true });

// v0.27.1: image-similarity branch. Bypasses hybridSearch (which is
// text-only); embeds the image via embedMultimodal and runs a direct
Expand Down
16 changes: 12 additions & 4 deletions src/core/pglite-engine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3530,9 +3530,11 @@ export class PGLiteEngine implements BrainEngine {

async searchTakes(
query: string,
opts: { limit?: number; takesHoldersAllowList?: string[] } = {},
opts: SearchOpts & { takesHoldersAllowList?: string[] } = {},
): Promise<TakeHit[]> {
const limit = clampSearchLimit(opts.limit, 30, 100);
const scopedSourceIds = opts.sourceIds && opts.sourceIds.length > 0 ? opts.sourceIds : null;
const scopedSourceId = scopedSourceIds ? null : (opts.sourceId ?? null);
const { rows } = await this.db.query(
`SELECT t.id AS take_id, t.page_id, p.slug AS page_slug, t.row_num,
t.claim, t.kind, t.holder, t.weight,
Expand All @@ -3542,18 +3544,22 @@ export class PGLiteEngine implements BrainEngine {
WHERE t.active
AND t.claim % $1
AND ($2::text[] IS NULL OR t.holder = ANY($2::text[]))
AND ($4::text[] IS NULL OR p.source_id = ANY($4::text[]))
AND ($5::text IS NULL OR p.source_id = $5)
ORDER BY score DESC, t.weight DESC
LIMIT $3`,
[query, opts.takesHoldersAllowList ?? null, limit]
[query, opts.takesHoldersAllowList ?? null, limit, scopedSourceIds, scopedSourceId]
);
return rows as unknown as TakeHit[];
}

async searchTakesVector(
embedding: Float32Array,
opts: { limit?: number; takesHoldersAllowList?: string[] } = {},
opts: SearchOpts & { takesHoldersAllowList?: string[] } = {},
): Promise<TakeHit[]> {
const limit = clampSearchLimit(opts.limit, 30, 100);
const scopedSourceIds = opts.sourceIds && opts.sourceIds.length > 0 ? opts.sourceIds : null;
const scopedSourceId = scopedSourceIds ? null : (opts.sourceId ?? null);
const vec = `[${Array.from(embedding).join(',')}]`;
const { rows } = await this.db.query(
`SELECT t.id AS take_id, t.page_id, p.slug AS page_slug, t.row_num,
Expand All @@ -3564,9 +3570,11 @@ export class PGLiteEngine implements BrainEngine {
WHERE t.active
AND t.embedding IS NOT NULL
AND ($2::text[] IS NULL OR t.holder = ANY($2::text[]))
AND ($4::text[] IS NULL OR p.source_id = ANY($4::text[]))
AND ($5::text IS NULL OR p.source_id = $5)
ORDER BY t.embedding <=> $1::vector
LIMIT $3`,
[vec, opts.takesHoldersAllowList ?? null, limit]
[vec, opts.takesHoldersAllowList ?? null, limit, scopedSourceIds, scopedSourceId]
);
return rows as unknown as TakeHit[];
}
Expand Down
20 changes: 20 additions & 0 deletions src/core/postgres-engine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3544,6 +3544,8 @@ export class PostgresEngine implements BrainEngine {
async searchTakes(query: string, opts: SearchOpts & { takesHoldersAllowList?: string[] } = {}): Promise<TakeHit[]> {
const sql = this.sql;
const limit = clampSearchLimit(opts.limit, 30, 100);
const scopedSourceIds = opts.sourceIds && opts.sourceIds.length > 0 ? opts.sourceIds : null;
const scopedSourceId = scopedSourceIds ? null : (opts.sourceId ?? null);
const rows = await sql`
SELECT t.id AS take_id, t.page_id, p.slug AS page_slug, t.row_num,
t.claim, t.kind, t.holder, t.weight,
Expand All @@ -3556,6 +3558,14 @@ export class PostgresEngine implements BrainEngine {
${opts.takesHoldersAllowList ?? null}::text[] IS NULL
OR t.holder = ANY(${opts.takesHoldersAllowList ?? null}::text[])
)
AND (
${scopedSourceIds}::text[] IS NULL
OR p.source_id = ANY(${scopedSourceIds}::text[])
)
AND (
${scopedSourceId}::text IS NULL
OR p.source_id = ${scopedSourceId}
)
ORDER BY score DESC, t.weight DESC
LIMIT ${limit}
`;
Expand All @@ -3568,6 +3578,8 @@ export class PostgresEngine implements BrainEngine {
): Promise<TakeHit[]> {
const sql = this.sql;
const limit = clampSearchLimit(opts.limit, 30, 100);
const scopedSourceIds = opts.sourceIds && opts.sourceIds.length > 0 ? opts.sourceIds : null;
const scopedSourceId = scopedSourceIds ? null : (opts.sourceId ?? null);
const vec = `[${Array.from(embedding).join(',')}]`;
const rows = await sql`
SELECT t.id AS take_id, t.page_id, p.slug AS page_slug, t.row_num,
Expand All @@ -3581,6 +3593,14 @@ export class PostgresEngine implements BrainEngine {
${opts.takesHoldersAllowList ?? null}::text[] IS NULL
OR t.holder = ANY(${opts.takesHoldersAllowList ?? null}::text[])
)
AND (
${scopedSourceIds}::text[] IS NULL
OR p.source_id = ANY(${scopedSourceIds}::text[])
)
AND (
${scopedSourceId}::text IS NULL
OR p.source_id = ${scopedSourceId}
)
ORDER BY t.embedding <=> ${vec}::vector
LIMIT ${limit}
`;
Expand Down
16 changes: 15 additions & 1 deletion src/core/think/gather.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,12 @@ export interface ThinkGatherOpts {
questionEmbedding?: Float32Array;
/** When set, MCP-bound calls forward this allow-list to takes_search. Local CLI leaves unset. */
takesHoldersAllowList?: string[];
/** Source scope for single-source callers. Federated array wins when both are present. */
sourceId?: string;
/** Federated read source scope for OAuth callers. */
allowedSources?: string[];
/** Remote trust marker forwarded by operation handlers. */
remote?: boolean;
}

export interface ThinkGatherResult {
Expand Down Expand Up @@ -105,11 +111,17 @@ export async function runGather(
// Sanitize the question for any path that includes it in an LLM prompt.
// (Direct DB search is fine — those are parameterized queries.)
const sanitizedQuestion = sanitizeQueryForPrompt(opts.question);
const sourceScope = opts.allowedSources && opts.allowedSources.length > 0
? { sourceIds: opts.allowedSources }
: opts.sourceId
? { sourceId: opts.sourceId }
: {};

// Stream 1: hybrid page search (existing primitive).
const pagesPromise = hybridSearch(engine, opts.question, {
limit: gatherLimit,
expansion: false, // think provides its own anchor + graph context; no need for re-expansion
...sourceScope,
}).catch((e) => {
process.stderr.write(`[think.gather] hybrid stream failed: ${(e as Error).message}\n`);
return [] as SearchResult[];
Expand All @@ -119,6 +131,7 @@ export async function runGather(
const takesKwPromise = engine.searchTakes(opts.question, {
limit: takesLimit,
takesHoldersAllowList: opts.takesHoldersAllowList,
...sourceScope,
}).catch((e) => {
process.stderr.write(`[think.gather] takes-keyword stream failed: ${(e as Error).message}\n`);
return [] as TakeHit[];
Expand All @@ -129,6 +142,7 @@ export async function runGather(
? engine.searchTakesVector(opts.questionEmbedding, {
limit: takesLimit,
takesHoldersAllowList: opts.takesHoldersAllowList,
...sourceScope,
}).catch((e) => {
process.stderr.write(`[think.gather] takes-vector stream failed: ${(e as Error).message}\n`);
return [] as TakeHit[];
Expand All @@ -137,7 +151,7 @@ export async function runGather(

// Stream 4: graph walk (anchor only).
const graphPromise: Promise<string[]> = opts.anchor
? engine.traversePaths(opts.anchor, { depth: graphDepth, direction: 'both' })
? engine.traversePaths(opts.anchor, { depth: graphDepth, direction: 'both', ...sourceScope })
.then(paths => {
const slugs = new Set<string>([opts.anchor!]);
for (const p of paths) {
Expand Down
Loading