diff --git a/universal-refiner/scripts/stress/event-store-soak-worker.mjs b/universal-refiner/scripts/stress/event-store-soak-worker.mjs index 0208038..0feaca4 100644 --- a/universal-refiner/scripts/stress/event-store-soak-worker.mjs +++ b/universal-refiner/scripts/stress/event-store-soak-worker.mjs @@ -2,13 +2,14 @@ import { EventStore } from "../../dist/src/history/event-store.js"; const workerId = process.argv[2]; const durationMs = Number.parseInt(process.argv[3] || "10000", 10); +const minOperations = Number.parseInt(process.argv[4] || "0", 10); const deadline = Date.now() + durationMs; const counts = { events: 0, prompts: 0, executions: 0, operations: 0 }; const store = EventStore.getInstance(); let index = 0; let lastPromptId; -while (Date.now() < deadline) { +while (Date.now() < deadline || counts.operations < minOperations) { const id = `soak-${workerId}-${index}`; const operation = index % 3; diff --git a/universal-refiner/scripts/stress/event-store-soak.mjs b/universal-refiner/scripts/stress/event-store-soak.mjs index 1f36455..2306bfb 100644 --- a/universal-refiner/scripts/stress/event-store-soak.mjs +++ b/universal-refiner/scripts/stress/event-store-soak.mjs @@ -25,13 +25,14 @@ export async function runEventStoreSoak(options = {}) { const workers = readPositiveInteger(options.workers, 4, "workers"); const durationMs = readPositiveInteger(options.durationMs, 10_000, "durationMs"); const minOperations = readPositiveInteger(options.minOperations, workers * 10, "minOperations"); + const minOperationsPerWorker = Math.ceil(minOperations / workers); const maxLossRatio = readRatio(options.maxLossRatio, 0, "maxLossRatio"); const directory = await mkdtemp(join(tmpdir(), "prompt-refiner-soak-")); const databasePath = join(directory, "events.db"); try { const results = await Promise.all(Array.from({ length: workers }, async (_, index) => { - const result = await runProcess(process.execPath, [workerScript, String(index), String(durationMs)], { + const result = await runProcess(process.execPath, [workerScript, String(index), String(durationMs), String(minOperationsPerWorker)], { cwd: repoRoot, env: { ...process.env, PROMPT_REFINER_GLOBAL_DIR: directory, PROMPT_REFINER_LOG_LEVEL: "error" }, timeoutMs: durationMs + 30_000, diff --git a/universal-refiner/src/core/dashboard.ts b/universal-refiner/src/core/dashboard.ts index 52a0868..6e2ff31 100644 --- a/universal-refiner/src/core/dashboard.ts +++ b/universal-refiner/src/core/dashboard.ts @@ -9,12 +9,12 @@ import { streamSSE } from "hono/streaming"; import { getDisplayVersion } from "./version.js"; import { RuntimeLogger } from "./logger.js"; import { ConfigManager } from "./config.js"; +import { randomUUID } from "crypto"; import { TimelineProvider } from "../history/timeline.js"; import { EventStore } from "../history/event-store.js"; import { AutoPilotStatus } from "./autopilot-status.js"; import { createABEvaluationRecord } from "../evaluation/prompt-evaluator.js"; -import { randomUUID } from "crypto"; const __dirname = path.dirname(fileURLToPath(import.meta.url)); @@ -394,6 +394,110 @@ export class CommandCenterDashboard { return c.json({ error: "Provider health unavailable" }, 500); } }); + app.post("/proxy/v1/chat/completions", async (c) => { + const selectedPath = this.resolveSelectedPath(c.req.query("project")); + try { + if (!isJsonContentType(c.req.header("content-type"))) { + return c.json({ error: "Proxy requests must use application/json" }, 415); + } + + const config = ConfigManager.getSemanticConfig(selectedPath); + let upstreamBase: URL; + try { + upstreamBase = new URL(config.baseUrl.endsWith("/") ? config.baseUrl : `${config.baseUrl}/`); + } catch { + return c.json({ error: "Configured semantic baseUrl is invalid" }, 400); + } + if (!config.allowNonLoopback && !isLoopbackHost(upstreamBase.hostname)) { + return c.json({ error: "Proxy upstream must be loopback unless allowNonLoopback is enabled" }, 403); + } + + let body: any; + try { + body = await c.req.json(); + } catch { + return c.json({ error: "Proxy request body must be valid JSON" }, 400); + } + + const messages = Array.isArray(body.messages) ? body.messages : []; + const lastMessage = messages[messages.length - 1]; + const rawPrompt = typeof lastMessage?.content === "string" && lastMessage.content.trim() + ? lastMessage.content + : "Unknown proxy prompt"; + + const store = EventStore.getInstance(); + const repoId = store.ensureRepository(selectedPath).id; + const promptId = `prm_${randomUUID()}`; + const execId = `exec_${randomUUID()}`; + store.recordPrompt({ + id: promptId, + client: "API_PROXY", + agent_name: "ProxyClient", + raw_prompt: rawPrompt, + repo_id: repoId, + }); + + const upstreamUrl = new URL("chat/completions", upstreamBase).toString(); + let upstreamResponse: Response; + try { + upstreamResponse = await fetch(upstreamUrl, { + method: "POST", + headers: { + "Content-Type": "application/json", + ...(c.req.header("authorization") ? { "Authorization": c.req.header("authorization") as string } : {}), + }, + body: JSON.stringify(body), + }); + } catch (error) { + const message = error instanceof Error ? error.message : String(error); + store.recordExecution({ + id: execId, + prompt_id: promptId, + workflow_name: "proxy_forward", + executor_name: "LocalLLM", + status: "failed", + result_summary: `Network error reaching upstream: ${message}`, + artifacts_json: JSON.stringify({ error: redactSensitive(message) }), + }); + return c.json({ error: "Proxy request failed" }, 502); + } + + if (!upstreamResponse.ok) { + const errText = await upstreamResponse.text(); + store.recordExecution({ + id: execId, + prompt_id: promptId, + workflow_name: "proxy_forward", + executor_name: "LocalLLM", + status: "failed", + result_summary: `Upstream error: ${upstreamResponse.status} ${upstreamResponse.statusText}`, + artifacts_json: JSON.stringify({ error: redactSensitive(errText) }), + }); + return new Response(errText, { + status: upstreamResponse.status, + headers: upstreamResponse.headers, + }); + } + + store.recordExecution({ + id: execId, + prompt_id: promptId, + workflow_name: "proxy_forward", + executor_name: "LocalLLM", + status: "completed", + result_summary: `Proxied successfully to ${sanitizeEndpoint(config.baseUrl)}`, + artifacts_json: "{}", + }); + + return new Response(upstreamResponse.body, { + status: upstreamResponse.status, + headers: upstreamResponse.headers, + }); + } catch (error) { + this.logRouteError("proxy/v1/chat/completions", error, selectedPath); + return c.json({ error: "Proxy request failed" }, 502); + } + }); app.get("/api/tournaments", async (c) => { const selectedPath = this.resolveSelectedPath(c.req.query("project")); diff --git a/universal-refiner/tests/proxy.test.ts b/universal-refiner/tests/proxy.test.ts new file mode 100644 index 0000000..b30b360 --- /dev/null +++ b/universal-refiner/tests/proxy.test.ts @@ -0,0 +1,205 @@ +// secret-scan: allow-fixture +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; +import * as fs from "node:fs"; +import * as os from "node:os"; +import * as path from "node:path"; +import { CommandCenterDashboard } from "../src/core/dashboard.js"; +import { ConfigManager, type SemanticConfig } from "../src/core/config.js"; +import { EventStore } from "../src/history/event-store.js"; + +describe("dashboard OpenAI-compatible proxy route", () => { + let testDir: string; + let repoDir: string; + let store: EventStore; + const semanticConfig: SemanticConfig = { + localEnabled: true, + mcpSamplingEnabled: false, + baseUrl: "http://127.0.0.1:9000/v1", + models: ["gemma3:12b"], + timeoutMs: 1_000, + temperature: 0.2, + allowNonLoopback: false, + }; + + beforeEach(() => { + testDir = fs.mkdtempSync(path.join(os.tmpdir(), "proxy-route-")); + repoDir = path.join(testDir, "repo"); + fs.mkdirSync(repoDir, { recursive: true }); + process.env.PROMPT_REFINER_GLOBAL_DIR = path.join(testDir, "global"); + (EventStore as unknown as { instance: EventStore | null }).instance = null; + store = EventStore.getInstance(); + vi.spyOn(ConfigManager, "getSemanticConfig").mockReturnValue(semanticConfig); + }); + + afterEach(() => { + vi.restoreAllMocks(); + store.close(); + (EventStore as unknown as { instance: EventStore | null }).instance = null; + delete process.env.PROMPT_REFINER_GLOBAL_DIR; + fs.rmSync(testDir, { recursive: true, force: true }); + }); + + it("records the prompt, forwards the request, and records a completed execution", async () => { + const fetchMock = vi.fn().mockResolvedValue(new Response("mock LLM response", { + status: 200, + headers: { "content-type": "text/plain" }, + })); + vi.stubGlobal("fetch", fetchMock); + const app = CommandCenterDashboard.createApp(repoDir); + + const response = await app.request("/proxy/v1/chat/completions", { + method: "POST", + headers: { + "content-type": "application/json", + authorization: "Bearer local-token", + }, + body: JSON.stringify({ + messages: [{ role: "user", content: "Improve this prompt" }], + }), + }); + + expect(response.status).toBe(200); + expect(await response.text()).toBe("mock LLM response"); + expect(fetchMock).toHaveBeenCalledWith("http://127.0.0.1:9000/v1/chat/completions", expect.objectContaining({ + method: "POST", + headers: expect.objectContaining({ + "Content-Type": "application/json", + "Authorization": "Bearer local-token", + }), + })); + + const db = (store as any).db; + const prompt = db.prepare("SELECT * FROM prompts WHERE client = ?").get("API_PROXY"); + expect(prompt.raw_prompt).toBe("Improve this prompt"); + const execution = db.prepare("SELECT * FROM executions WHERE prompt_id = ?").get(prompt.id); + expect(execution).toMatchObject({ + workflow_name: "proxy_forward", + executor_name: "LocalLLM", + status: "completed", + artifacts_json: "{}", + }); + }); + + it("supports trailing-slash base URLs and falls back when no prompt message is present", async () => { + vi.spyOn(ConfigManager, "getSemanticConfig").mockReturnValueOnce({ + ...semanticConfig, + baseUrl: "http://127.0.0.1:9000/v1/", + }); + const fetchMock = vi.fn().mockResolvedValue(new Response("ok", { status: 200 })); + vi.stubGlobal("fetch", fetchMock); + const app = CommandCenterDashboard.createApp(repoDir); + + const response = await app.request("/proxy/v1/chat/completions", { + method: "POST", + headers: { "content-type": "application/json" }, + body: JSON.stringify({ messages: "not-an-array" }), + }); + + expect(response.status).toBe(200); + expect(fetchMock).toHaveBeenCalledWith("http://127.0.0.1:9000/v1/chat/completions", expect.objectContaining({ + headers: { "Content-Type": "application/json" }, + })); + const db = (store as any).db; + expect(db.prepare("SELECT raw_prompt FROM prompts WHERE client = ?").get("API_PROXY")) + .toEqual({ raw_prompt: "Unknown proxy prompt" }); + }); + + it("rejects malformed proxy requests before forwarding", async () => { + const fetchMock = vi.fn(); + vi.stubGlobal("fetch", fetchMock); + const app = CommandCenterDashboard.createApp(repoDir); + + expect((await app.request("/proxy/v1/chat/completions", { method: "POST", body: "{}" })).status).toBe(415); + expect((await app.request("/proxy/v1/chat/completions", { + method: "POST", + headers: { "content-type": "application/json" }, + body: "{", + })).status).toBe(400); + + vi.spyOn(ConfigManager, "getSemanticConfig").mockReturnValueOnce({ + ...semanticConfig, + baseUrl: "https://remote.example/v1", + allowNonLoopback: false, + }); + expect((await app.request("/proxy/v1/chat/completions", { + method: "POST", + headers: { "content-type": "application/json" }, + body: "{}", + })).status).toBe(403); + + vi.spyOn(ConfigManager, "getSemanticConfig").mockReturnValueOnce({ + ...semanticConfig, + baseUrl: "not a url", + }); + expect((await app.request("/proxy/v1/chat/completions", { + method: "POST", + headers: { "content-type": "application/json" }, + body: "{}", + })).status).toBe(400); + expect(fetchMock).not.toHaveBeenCalled(); + }); + + it("records failed executions for upstream HTTP and network failures", async () => { + const app = CommandCenterDashboard.createApp(repoDir); + vi.stubGlobal("fetch", vi.fn() + .mockResolvedValueOnce(new Response("token=secret-value", { status: 503, statusText: "Unavailable" })) + .mockRejectedValueOnce(new Error("connection refused with token=secret-value"))); + + const upstreamFailure = await app.request("/proxy/v1/chat/completions", { + method: "POST", + headers: { "content-type": "application/json" }, + body: JSON.stringify({ messages: [{ role: "user", content: "First" }] }), + }); + expect(upstreamFailure.status).toBe(503); + expect(await upstreamFailure.text()).toBe("token=secret-value"); + + const networkFailure = await app.request("/proxy/v1/chat/completions", { + method: "POST", + headers: { "content-type": "application/json" }, + body: JSON.stringify({ messages: [{ role: "user", content: "Second" }] }), + }); + expect(networkFailure.status).toBe(502); + expect(await networkFailure.json()).toEqual({ error: "Proxy request failed" }); + + const db = (store as any).db; + const failures = db.prepare("SELECT * FROM executions WHERE status = 'failed' ORDER BY started_at ASC").all(); + expect(failures).toHaveLength(2); + expect(failures[0].result_summary).toContain("Upstream error: 503"); + expect(failures[0].artifacts_json).toContain("token=[REDACTED]"); + expect(failures[1].result_summary).toContain("Network error reaching upstream"); + expect(failures[1].artifacts_json).toContain("token=[REDACTED]"); + }); + + it("records non-Error network failures", async () => { + const app = CommandCenterDashboard.createApp(repoDir); + vi.stubGlobal("fetch", vi.fn().mockRejectedValue("string failure token=secret-value")); + + const response = await app.request("/proxy/v1/chat/completions", { + method: "POST", + headers: { "content-type": "application/json" }, + body: JSON.stringify({ messages: [{ role: "user", content: "Prompt" }] }), + }); + + expect(response.status).toBe(502); + const db = (store as any).db; + const failure = db.prepare("SELECT * FROM executions WHERE status = 'failed'").get(); + expect(failure.result_summary).toContain("string failure"); + expect(failure.artifacts_json).toContain("token=[REDACTED]"); + }); + + it("returns a sanitized route failure when proxy bookkeeping fails", async () => { + const app = CommandCenterDashboard.createApp(repoDir); + vi.spyOn(EventStore, "getInstance").mockImplementationOnce(() => { + throw new Error("store secret"); + }); + + const response = await app.request("/proxy/v1/chat/completions", { + method: "POST", + headers: { "content-type": "application/json" }, + body: JSON.stringify({ messages: [] }), + }); + + expect(response.status).toBe(502); + expect(await response.json()).toEqual({ error: "Proxy request failed" }); + }); +});