Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions src/core/ai/dims.ts
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,22 @@ export function dimsProviderOptions(
}
return undefined;
}
case 'composio-openai': {
// Composio proxies OpenAI's embeddings endpoint, so it honors the same
// dimensions contract as native OpenAI text-embedding-3 models.
if (modelId.startsWith('text-embedding-3')) {
if (isOpenAITextEmbedding3Model(modelId) && !isValidOpenAITextEmbedding3Dim(modelId, dims)) {
const max = maxOpenAITextEmbedding3Dim(modelId)!;
throw new AIConfigError(
`OpenAI model "${modelId}" supports embedding_dimensions in 1..${max}, got ${dims}.`,
`Set \`embedding_dimensions\` to a value between 1 and ${max} ` +
`(\`gbrain config set embedding_dimensions ${Math.min(1024, max)}\` is a common default).`,
);
}
return { openai: { dimensions: dims } };
}
return undefined;
}
case 'native-google': {
if (modelId.startsWith('gemini-embedding') || modelId === 'text-embedding-004') {
return { google: { outputDimensionality: dims } };
Expand Down
128 changes: 128 additions & 0 deletions src/core/ai/gateway.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import { createGoogleGenerativeAI } from '@ai-sdk/google';
import { createAnthropic } from '@ai-sdk/anthropic';
import { createOpenAICompatible } from '@ai-sdk/openai-compatible';
import { z } from 'zod';
import { readFileSync } from 'fs';

import type {
AIGatewayConfig,
Expand Down Expand Up @@ -185,6 +186,7 @@ export class VoyageResponseTooLargeError extends Error {
* pinning the Voyage name. Unification is a follow-up cleanup.
*/
const MAX_ZEROENTROPY_RESPONSE_BYTES = 256 * 1024 * 1024;
const COMPOSIO_EMBEDDING_BATCH_SIZE = 20;

Comment on lines +189 to 190
export class ZeroEntropyResponseTooLargeError extends Error {
constructor(message: string) {
Expand Down Expand Up @@ -953,6 +955,12 @@ function instantiateEmbedding(recipe: Recipe, modelId: string, cfg: AIGatewayCon
throw new AIConfigError(
`Anthropic has no embedding model. Use openai or google for embeddings.`,
);
case 'composio-openai':
if (!cfg.env.COMPOSIO_CLI) throw new AIConfigError(
`Composio OpenAI embedding requires COMPOSIO_CLI.`,
recipe.setup_hint,
);
return { composioCli: cfg.env.COMPOSIO_CLI, modelId, env: cfg.env };
case 'openai-compatible': {
// D12=A: unified auth via Recipe.resolveAuth (or default).
const auth = applyResolveAuth(recipe, cfg, 'embedding');
Expand Down Expand Up @@ -985,6 +993,121 @@ function instantiateEmbedding(recipe: Recipe, modelId: string, cfg: AIGatewayCon
}
}

function parseComposioEnvelope(stdout: string): any {
const firstJson = stdout.indexOf('{');
if (firstJson < 0) {
throw new AIConfigError(
`Composio CLI returned no JSON envelope.`,
`Check COMPOSIO_CLI and run \`$COMPOSIO_CLI whoami\`.`,
);
}
const envelope = JSON.parse(stdout.slice(firstJson));
if (!envelope.successful) {
Comment on lines +996 to +1005
throw new AIConfigError(
`Composio OpenAI embeddings failed: ${envelope.error || 'unknown error'}`,
`Check the Composio OpenAI connection and retry.`,
);
}
if (envelope.storedInFile && envelope.outputFilePath) {
return JSON.parse(readFileSync(envelope.outputFilePath, 'utf-8'));
}
return envelope;
}

function extractComposioEmbeddingRows(payload: any): Array<{ index?: number; embedding: number[] }> {
const rows =
payload?.data?.data ??
payload?.data?.response?.data?.data ??
payload?.response?.data?.data ??
payload?.response?.data ??
payload?.data ??
[];
if (!Array.isArray(rows)) {
throw new AIConfigError(
`Composio OpenAI embeddings response did not include a data array.`,
`Inspect the Composio OPENAI_CREATE_EMBEDDINGS tool output schema.`,
);
}
return rows.slice().sort((a, b) => Number(a?.index ?? 0) - Number(b?.index ?? 0));
}

async function embedComposioOpenAISubBatch(
texts: string[],
modelId: string,
expectedDims: number,
composioCli: string,
env: Record<string, string | undefined>,
): Promise<Float32Array[]> {
const proc = Bun.spawn([
composioCli,
'execute',
'OPENAI_CREATE_EMBEDDINGS',
'-d',
JSON.stringify({
model: modelId,
input: texts,
dimensions: expectedDims,
encoding_format: 'float',
}),
], {
stdout: 'pipe',
stderr: 'pipe',
env,
});
Comment on lines +1041 to +1056
const [stdout, stderr, exitCode] = await Promise.all([
new Response(proc.stdout).text(),
new Response(proc.stderr).text(),
proc.exited,
]);
if (exitCode !== 0) {
throw new AITransientError(
`Composio CLI exited ${exitCode}: ${stderr.trim() || stdout.trim()}`,
`Check COMPOSIO_CLI and the Composio OpenAI connection.`,
);
}
const payload = parseComposioEnvelope(stdout);
const rows = extractComposioEmbeddingRows(payload);
if (rows.length !== texts.length) {
throw new AIConfigError(
`Composio OpenAI returned ${rows.length} embedding(s) for ${texts.length} input(s).`,
`Retry the import after checking provider health; partial embedding responses are not safe to index.`,
);
}
return rows.map(row => {
if (!Array.isArray(row.embedding)) {
throw new AIConfigError(
`Composio OpenAI embedding row did not include a numeric embedding array.`,
`Inspect the Composio OPENAI_CREATE_EMBEDDINGS tool output schema.`,
);
}
if (row.embedding.length !== expectedDims) {
throw new AIConfigError(
`Embedding dim mismatch: model ${modelId} returned ${row.embedding.length} but schema expects ${expectedDims}.`,
`Run \`gbrain migrate --embedding-model composio-openai:${modelId} --embedding-dimensions ${row.embedding.length}\` or change models.`,
);
}
return new Float32Array(row.embedding);
});
}

async function embedComposioOpenAI(
texts: string[],
model: { composioCli: string; modelId: string; env: Record<string, string | undefined> },
expectedDims: number,
): Promise<Float32Array[]> {
const out: Float32Array[] = [];
for (let i = 0; i < texts.length; i += COMPOSIO_EMBEDDING_BATCH_SIZE) {
out.push(...await embedComposioOpenAISubBatch(
texts.slice(i, i + COMPOSIO_EMBEDDING_BATCH_SIZE),
model.modelId,
expectedDims,
model.composioCli,
model.env,
));
}
return out;
}

/** Minimum sub-batch size before we give up splitting and just throw. */
const MIN_SUB_BATCH = 1;

Expand Down Expand Up @@ -1239,6 +1362,11 @@ async function embedSubBatch(
opts?: EmbedOpts,
): Promise<Float32Array[]> {
try {
if (recipe.implementation === 'composio-openai') {
const embeddings = await embedComposioOpenAI(texts, model, expectedDims);
recordSubBatchSuccess(recipe);
return embeddings;
}
Comment on lines +1365 to +1369
const result = await _embedTransport({
model,
values: texts,
Expand Down
26 changes: 26 additions & 0 deletions src/core/ai/recipes/composio-openai.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import type { Recipe } from '../types.ts';

export const composioOpenAI: Recipe = {
id: 'composio-openai',
name: 'Composio OpenAI',
tier: 'native',
implementation: 'composio-openai',
auth_env: {
required: ['COMPOSIO_CLI'],
setup_url: 'https://composio.dev',
},
touchpoints: {
embedding: {
models: ['text-embedding-3-large', 'text-embedding-3-small'],
default_dims: 1536,
dims_options: [256, 512, 768, 1024, 1536, 3072],
cost_per_1m_tokens_usd: 0.13,
price_last_verified: '2026-04-20',
max_batch_tokens: 120000,
chars_per_token: 4,
safety_factor: 0.8,
},
},
setup_hint:
'Install and authenticate the Composio CLI, then set COMPOSIO_CLI to its absolute path. The raw OpenAI key remains inside Composio.',
};
2 changes: 2 additions & 0 deletions src/core/ai/recipes/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import { dashscope } from './dashscope.ts';
import { zhipu } from './zhipu.ts';
import { azureOpenAI } from './azure-openai.ts';
import { zeroentropyai } from './zeroentropyai.ts';
import { composioOpenAI } from './composio-openai.ts';

const ALL: Recipe[] = [
openai,
Expand All @@ -38,6 +39,7 @@ const ALL: Recipe[] = [
zhipu,
azureOpenAI,
zeroentropyai,
composioOpenAI,
];

/** Map from `provider:id` key to recipe. */
Expand Down
1 change: 1 addition & 0 deletions src/core/ai/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ export type Implementation =
| 'native-openai'
| 'native-google'
| 'native-anthropic'
| 'composio-openai'
| 'openai-compatible';

export interface EmbeddingTouchpoint {
Expand Down
71 changes: 71 additions & 0 deletions test/ai/composio-openai-embedding.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
import { describe, test, expect, beforeEach } from 'bun:test';
import { chmodSync, mkdtempSync, writeFileSync } from 'fs';
import { join } from 'path';
import { tmpdir } from 'os';
import {
configureGateway,
embed,
isAvailable,
resetGateway,
} from '../../src/core/ai/gateway.ts';
import { resolveRecipe } from '../../src/core/ai/model-resolver.ts';

function fakeComposioCli(): string {
const dir = mkdtempSync(join(tmpdir(), 'gbrain-composio-openai-'));
const cli = join(dir, 'composio');
writeFileSync(cli, `#!/bin/sh
if [ "$1" != "execute" ] || [ "$2" != "OPENAI_CREATE_EMBEDDINGS" ]; then
echo "unexpected args: $*" >&2
exit 2
fi
out='${dir}/embeddings-output.json'
cat > "$out" <<'JSON'
{"data":{"data":[{"object":"embedding","index":0,"embedding":[0,0.01,0.02,0.03]},{"object":"embedding","index":1,"embedding":[1,1.01,1.02,1.03]}],"model":"text-embedding-3-small"}}
JSON
echo 'Update available: ignored by parser'
echo '{"successful":true,"storedInFile":true,"outputFilePath":"${dir}/embeddings-output.json"}'
`);
chmodSync(cli, 0o700);
return cli;
}

describe('Composio OpenAI embedding provider', () => {
beforeEach(() => resetGateway());

test('recipe is registered and availability is keyed by COMPOSIO_CLI', () => {
const { recipe, parsed } = resolveRecipe('composio-openai:text-embedding-3-small');
expect(recipe.id).toBe('composio-openai');
expect(parsed.modelId).toBe('text-embedding-3-small');

configureGateway({
embedding_model: 'composio-openai:text-embedding-3-small',
embedding_dimensions: 4,
env: {},
});
expect(isAvailable('embedding')).toBe(false);

resetGateway();
configureGateway({
embedding_model: 'composio-openai:text-embedding-3-small',
embedding_dimensions: 4,
env: { COMPOSIO_CLI: fakeComposioCli() },
});
expect(isAvailable('embedding')).toBe(true);
});

test('embeds through the Composio CLI without OPENAI_API_KEY', async () => {
configureGateway({
embedding_model: 'composio-openai:text-embedding-3-small',
embedding_dimensions: 4,
env: { COMPOSIO_CLI: fakeComposioCli() },
});

const vectors = await embed(['first document', 'second document']);

expect(vectors).toHaveLength(2);
expect(vectors[0][0]).toBeCloseTo(0);
expect(vectors[0][3]).toBeCloseTo(0.03);
expect(vectors[1][0]).toBeCloseTo(1);
expect(vectors[1][3]).toBeCloseTo(1.03);
});
});