From 92a29e0c2799c057c18866977bd51302064d0cf1 Mon Sep 17 00:00:00 2001
From: Antman316 <164426891+Antman316@users.noreply.github.com>
Date: Wed, 20 May 2026 15:38:45 -0400
Subject: [PATCH] Add Composio OpenAI embedding provider

---
Review notes (ignored by `git am`, not part of the commit message):

* Embeddings for `composio-openai:<model>` are produced by spawning the
  executable named by COMPOSIO_CLI as
  `execute OPENAI_CREATE_EMBEDDINGS -d <json>`, at most 20 inputs per
  invocation (COMPOSIO_EMBEDDING_BATCH_SIZE), one invocation at a time.
* parseComposioEnvelope() hands everything from the first `{` in stdout to
  JSON.parse. A `{` inside a banner line printed before the envelope, or
  any output after the envelope, therefore surfaces as a bare SyntaxError
  rather than an AIConfigError.
* The request JSON travels as a single argv element. Presumably a large
  sub-batch can exceed the OS argument-size limit -- TODO confirm whether
  the CLI can read its payload from stdin or a file instead.
* The file named by `outputFilePath` is read but never removed by this
  code -- confirm whether the CLI cleans it up.
* Assumptions to confirm before applying: context lines use two-space
  nesting (fall back to `git apply --ignore-whitespace` if the target
  differs), and the `embedSubBatch` context line is assumed to read
  `): Promise<Float32Array[]> {`.

 src/core/ai/dims.ts                       |  16 +++
 src/core/ai/gateway.ts                    | 128 ++++++++++++++++++++++
 src/core/ai/recipes/composio-openai.ts    |  26 +++++
 src/core/ai/recipes/index.ts              |   2 +
 src/core/ai/types.ts                      |   1 +
 test/ai/composio-openai-embedding.test.ts |  71 ++++++++++++
 6 files changed, 244 insertions(+)
 create mode 100644 src/core/ai/recipes/composio-openai.ts
 create mode 100644 test/ai/composio-openai-embedding.test.ts

diff --git a/src/core/ai/dims.ts b/src/core/ai/dims.ts
index 4f170e8fc..6b559c41f 100644
--- a/src/core/ai/dims.ts
+++ b/src/core/ai/dims.ts
@@ -135,6 +135,22 @@ export function dimsProviderOptions(
       }
       return undefined;
     }
+    case 'composio-openai': {
+      // Composio proxies OpenAI's embeddings endpoint, so it honors the same
+      // dimensions contract as native OpenAI text-embedding-3 models.
+      if (modelId.startsWith('text-embedding-3')) {
+        if (isOpenAITextEmbedding3Model(modelId) && !isValidOpenAITextEmbedding3Dim(modelId, dims)) {
+          const max = maxOpenAITextEmbedding3Dim(modelId)!;
+          throw new AIConfigError(
+            `OpenAI model "${modelId}" supports embedding_dimensions in 1..${max}, got ${dims}.`,
+            `Set \`embedding_dimensions\` to a value between 1 and ${max} ` +
+              `(\`gbrain config set embedding_dimensions ${Math.min(1024, max)}\` is a common default).`,
+          );
+        }
+        return { openai: { dimensions: dims } };
+      }
+      return undefined;
+    }
     case 'native-google': {
       if (modelId.startsWith('gemini-embedding') || modelId === 'text-embedding-004') {
         return { google: { outputDimensionality: dims } };
diff --git a/src/core/ai/gateway.ts b/src/core/ai/gateway.ts
index 54a525775..07840ead0 100644
--- a/src/core/ai/gateway.ts
+++ b/src/core/ai/gateway.ts
@@ -28,6 +28,7 @@ import { createGoogleGenerativeAI } from '@ai-sdk/google';
 import { createAnthropic } from '@ai-sdk/anthropic';
 import { createOpenAICompatible } from '@ai-sdk/openai-compatible';
 import { z } from 'zod';
+import { readFileSync } from 'fs';
 
 import type {
   AIGatewayConfig,
@@ -185,6 +186,7 @@ export class VoyageResponseTooLargeError extends Error {
  * pinning the Voyage name. Unification is a follow-up cleanup.
  */
 const MAX_ZEROENTROPY_RESPONSE_BYTES = 256 * 1024 * 1024;
+const COMPOSIO_EMBEDDING_BATCH_SIZE = 20;
 
 export class ZeroEntropyResponseTooLargeError extends Error {
   constructor(message: string) {
@@ -953,6 +955,12 @@ function instantiateEmbedding(recipe: Recipe, modelId: string, cfg: AIGatewayCon
       throw new AIConfigError(
         `Anthropic has no embedding model. Use openai or google for embeddings.`,
       );
+    case 'composio-openai':
+      if (!cfg.env.COMPOSIO_CLI) throw new AIConfigError(
+        `Composio OpenAI embedding requires COMPOSIO_CLI.`,
+        recipe.setup_hint,
+      );
+      return { composioCli: cfg.env.COMPOSIO_CLI, modelId, env: cfg.env };
     case 'openai-compatible': {
       // D12=A: unified auth via Recipe.resolveAuth (or default).
       const auth = applyResolveAuth(recipe, cfg, 'embedding');
@@ -985,6 +993,121 @@ function instantiateEmbedding(recipe: Recipe, modelId: string, cfg: AIGatewayCon
   }
 }
 
+function parseComposioEnvelope(stdout: string): any {
+  const firstJson = stdout.indexOf('{');
+  if (firstJson < 0) {
+    throw new AIConfigError(
+      `Composio CLI returned no JSON envelope.`,
+      `Check COMPOSIO_CLI and run \`$COMPOSIO_CLI whoami\`.`,
+    );
+  }
+  const envelope = JSON.parse(stdout.slice(firstJson));
+  if (!envelope.successful) {
+    throw new AIConfigError(
+      `Composio OpenAI embeddings failed: ${envelope.error || 'unknown error'}`,
+      `Check the Composio OpenAI connection and retry.`,
+    );
+  }
+  if (envelope.storedInFile && envelope.outputFilePath) {
+    return JSON.parse(readFileSync(envelope.outputFilePath, 'utf-8'));
+  }
+  return envelope;
+}
+
+function extractComposioEmbeddingRows(payload: any): Array<{ index?: number; embedding: number[] }> {
+  const rows =
+    payload?.data?.data ??
+    payload?.data?.response?.data?.data ??
+    payload?.response?.data?.data ??
+    payload?.response?.data ??
+    payload?.data ??
+    [];
+  if (!Array.isArray(rows)) {
+    throw new AIConfigError(
+      `Composio OpenAI embeddings response did not include a data array.`,
+      `Inspect the Composio OPENAI_CREATE_EMBEDDINGS tool output schema.`,
+    );
+  }
+  return rows.slice().sort((a, b) => Number(a?.index ?? 0) - Number(b?.index ?? 0));
+}
+
+async function embedComposioOpenAISubBatch(
+  texts: string[],
+  modelId: string,
+  expectedDims: number,
+  composioCli: string,
+  env: Record<string, string | undefined>,
+): Promise<Float32Array[]> {
+  const proc = Bun.spawn([
+    composioCli,
+    'execute',
+    'OPENAI_CREATE_EMBEDDINGS',
+    '-d',
+    JSON.stringify({
+      model: modelId,
+      input: texts,
+      dimensions: expectedDims,
+      encoding_format: 'float',
+    }),
+  ], {
+    stdout: 'pipe',
+    stderr: 'pipe',
+    env,
+  });
+  const [stdout, stderr, exitCode] = await Promise.all([
+    new Response(proc.stdout).text(),
+    new Response(proc.stderr).text(),
+    proc.exited,
+  ]);
+  if (exitCode !== 0) {
+    throw new AITransientError(
+      `Composio CLI exited ${exitCode}: ${stderr.trim() || stdout.trim()}`,
+      `Check COMPOSIO_CLI and the Composio OpenAI connection.`,
+    );
+  }
+  const payload = parseComposioEnvelope(stdout);
+  const rows = extractComposioEmbeddingRows(payload);
+  if (rows.length !== texts.length) {
+    throw new AIConfigError(
+      `Composio OpenAI returned ${rows.length} embedding(s) for ${texts.length} input(s).`,
+      `Retry the import after checking provider health; partial embedding responses are not safe to index.`,
+    );
+  }
+  return rows.map(row => {
+    if (!Array.isArray(row.embedding)) {
+      throw new AIConfigError(
+        `Composio OpenAI embedding row did not include a numeric embedding array.`,
+        `Inspect the Composio OPENAI_CREATE_EMBEDDINGS tool output schema.`,
+      );
+    }
+    if (row.embedding.length !== expectedDims) {
+      throw new AIConfigError(
+        `Embedding dim mismatch: model ${modelId} returned ${row.embedding.length} but schema expects ${expectedDims}.`,
+        `Run \`gbrain migrate --embedding-model composio-openai:${modelId} --embedding-dimensions ${row.embedding.length}\` or change models.`,
+      );
+    }
+    return new Float32Array(row.embedding);
+  });
+}
+
+async function embedComposioOpenAI(
+  texts: string[],
+  model: { composioCli: string; modelId: string; env: Record<string, string | undefined> },
+  expectedDims: number,
+): Promise<Float32Array[]> {
+  const out: Float32Array[] = [];
+  for (let i = 0; i < texts.length; i += COMPOSIO_EMBEDDING_BATCH_SIZE) {
+    out.push(...await embedComposioOpenAISubBatch(
+      texts.slice(i, i + COMPOSIO_EMBEDDING_BATCH_SIZE),
+      model.modelId,
+      expectedDims,
+      model.composioCli,
+      model.env,
+    ));
+  }
+  return out;
+}
+
 /** Minimum sub-batch size before we give up splitting and just throw. */
 const MIN_SUB_BATCH = 1;
 
@@ -1239,6 +1362,11 @@ async function embedSubBatch(
   opts?: EmbedOpts,
 ): Promise<Float32Array[]> {
   try {
+    if (recipe.implementation === 'composio-openai') {
+      const embeddings = await embedComposioOpenAI(texts, model, expectedDims);
+      recordSubBatchSuccess(recipe);
+      return embeddings;
+    }
     const result = await _embedTransport({
       model,
       values: texts,
diff --git a/src/core/ai/recipes/composio-openai.ts b/src/core/ai/recipes/composio-openai.ts
new file mode 100644
index 000000000..77ee2d65e
--- /dev/null
+++ b/src/core/ai/recipes/composio-openai.ts
@@ -0,0 +1,26 @@
+import type { Recipe } from '../types.ts';
+
+export const composioOpenAI: Recipe = {
+  id: 'composio-openai',
+  name: 'Composio OpenAI',
+  tier: 'native',
+  implementation: 'composio-openai',
+  auth_env: {
+    required: ['COMPOSIO_CLI'],
+    setup_url: 'https://composio.dev',
+  },
+  touchpoints: {
+    embedding: {
+      models: ['text-embedding-3-large', 'text-embedding-3-small'],
+      default_dims: 1536,
+      dims_options: [256, 512, 768, 1024, 1536, 3072],
+      cost_per_1m_tokens_usd: 0.13,
+      price_last_verified: '2026-04-20',
+      max_batch_tokens: 120000,
+      chars_per_token: 4,
+      safety_factor: 0.8,
+    },
+  },
+  setup_hint:
+    'Install and authenticate the Composio CLI, then set COMPOSIO_CLI to its absolute path. The raw OpenAI key remains inside Composio.',
+};
diff --git a/src/core/ai/recipes/index.ts b/src/core/ai/recipes/index.ts
index 5915c9a92..35e998fdc 100644
--- a/src/core/ai/recipes/index.ts
+++ b/src/core/ai/recipes/index.ts
@@ -21,6 +21,7 @@ import { dashscope } from './dashscope.ts';
 import { zhipu } from './zhipu.ts';
 import { azureOpenAI } from './azure-openai.ts';
 import { zeroentropyai } from './zeroentropyai.ts';
+import { composioOpenAI } from './composio-openai.ts';
 
 const ALL: Recipe[] = [
   openai,
@@ -38,6 +39,7 @@ const ALL: Recipe[] = [
   zhipu,
   azureOpenAI,
   zeroentropyai,
+  composioOpenAI,
 ];
 
 /** Map from `provider:id` key to recipe. */
diff --git a/src/core/ai/types.ts b/src/core/ai/types.ts
index ad6ab0279..c5082e583 100644
--- a/src/core/ai/types.ts
+++ b/src/core/ai/types.ts
@@ -22,6 +22,7 @@ export type Implementation =
   | 'native-openai'
   | 'native-google'
   | 'native-anthropic'
+  | 'composio-openai'
   | 'openai-compatible';
 
 export interface EmbeddingTouchpoint {
diff --git a/test/ai/composio-openai-embedding.test.ts b/test/ai/composio-openai-embedding.test.ts
new file mode 100644
index 000000000..f4d6f6d68
--- /dev/null
+++ b/test/ai/composio-openai-embedding.test.ts
@@ -0,0 +1,71 @@
+import { describe, test, expect, beforeEach } from 'bun:test';
+import { chmodSync, mkdtempSync, writeFileSync } from 'fs';
+import { join } from 'path';
+import { tmpdir } from 'os';
+import {
+  configureGateway,
+  embed,
+  isAvailable,
+  resetGateway,
+} from '../../src/core/ai/gateway.ts';
+import { resolveRecipe } from '../../src/core/ai/model-resolver.ts';
+
+function fakeComposioCli(): string {
+  const dir = mkdtempSync(join(tmpdir(), 'gbrain-composio-openai-'));
+  const cli = join(dir, 'composio');
+  writeFileSync(cli, `#!/bin/sh
+if [ "$1" != "execute" ] || [ "$2" != "OPENAI_CREATE_EMBEDDINGS" ]; then
+  echo "unexpected args: $*" >&2
+  exit 2
+fi
+out='${dir}/embeddings-output.json'
+cat > "$out" <<'JSON'
+{"data":{"data":[{"object":"embedding","index":0,"embedding":[0,0.01,0.02,0.03]},{"object":"embedding","index":1,"embedding":[1,1.01,1.02,1.03]}],"model":"text-embedding-3-small"}}
+JSON
+echo 'Update available: ignored by parser'
+echo '{"successful":true,"storedInFile":true,"outputFilePath":"${dir}/embeddings-output.json"}'
+`);
+  chmodSync(cli, 0o700);
+  return cli;
+}
+
+describe('Composio OpenAI embedding provider', () => {
+  beforeEach(() => resetGateway());
+
+  test('recipe is registered and availability is keyed by COMPOSIO_CLI', () => {
+    const { recipe, parsed } = resolveRecipe('composio-openai:text-embedding-3-small');
+    expect(recipe.id).toBe('composio-openai');
+    expect(parsed.modelId).toBe('text-embedding-3-small');
+
+    configureGateway({
+      embedding_model: 'composio-openai:text-embedding-3-small',
+      embedding_dimensions: 4,
+      env: {},
+    });
+    expect(isAvailable('embedding')).toBe(false);
+
+    resetGateway();
+    configureGateway({
+      embedding_model: 'composio-openai:text-embedding-3-small',
+      embedding_dimensions: 4,
+      env: { COMPOSIO_CLI: fakeComposioCli() },
+    });
+    expect(isAvailable('embedding')).toBe(true);
+  });
+
+  test('embeds through the Composio CLI without OPENAI_API_KEY', async () => {
+    configureGateway({
+      embedding_model: 'composio-openai:text-embedding-3-small',
+      embedding_dimensions: 4,
+      env: { COMPOSIO_CLI: fakeComposioCli() },
+    });
+
+    const vectors = await embed(['first document', 'second document']);
+
+    expect(vectors).toHaveLength(2);
+    expect(vectors[0][0]).toBeCloseTo(0);
+    expect(vectors[0][3]).toBeCloseTo(0.03);
+    expect(vectors[1][0]).toBeCloseTo(1);
+    expect(vectors[1][3]).toBeCloseTo(1.03);
+  });
+});