Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion universal-refiner/scripts/stress/event-store-soak-worker.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,14 @@ import { EventStore } from "../../dist/src/history/event-store.js";

const workerId = process.argv[2];
const durationMs = Number.parseInt(process.argv[3] || "10000", 10);
const minOperations = Number.parseInt(process.argv[4] || "0", 10);
const deadline = Date.now() + durationMs;
const counts = { events: 0, prompts: 0, executions: 0, operations: 0 };
const store = EventStore.getInstance();
let index = 0;
let lastPromptId;

while (Date.now() < deadline) {
while (Date.now() < deadline || counts.operations < minOperations) {
const id = `soak-${workerId}-${index}`;
const operation = index % 3;

Expand Down
3 changes: 2 additions & 1 deletion universal-refiner/scripts/stress/event-store-soak.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,14 @@ export async function runEventStoreSoak(options = {}) {
const workers = readPositiveInteger(options.workers, 4, "workers");
const durationMs = readPositiveInteger(options.durationMs, 10_000, "durationMs");
const minOperations = readPositiveInteger(options.minOperations, workers * 10, "minOperations");
const minOperationsPerWorker = Math.ceil(minOperations / workers);
const maxLossRatio = readRatio(options.maxLossRatio, 0, "maxLossRatio");
const directory = await mkdtemp(join(tmpdir(), "prompt-refiner-soak-"));
const databasePath = join(directory, "events.db");

try {
const results = await Promise.all(Array.from({ length: workers }, async (_, index) => {
const result = await runProcess(process.execPath, [workerScript, String(index), String(durationMs)], {
const result = await runProcess(process.execPath, [workerScript, String(index), String(durationMs), String(minOperationsPerWorker)], {
cwd: repoRoot,
env: { ...process.env, PROMPT_REFINER_GLOBAL_DIR: directory, PROMPT_REFINER_LOG_LEVEL: "error" },
timeoutMs: durationMs + 30_000,
Expand Down
106 changes: 105 additions & 1 deletion universal-refiner/src/core/dashboard.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,12 @@ import { streamSSE } from "hono/streaming";
import { getDisplayVersion } from "./version.js";
import { RuntimeLogger } from "./logger.js";
import { ConfigManager } from "./config.js";
import { randomUUID } from "crypto";

import { TimelineProvider } from "../history/timeline.js";
import { EventStore } from "../history/event-store.js";
import { AutoPilotStatus } from "./autopilot-status.js";
import { createABEvaluationRecord } from "../evaluation/prompt-evaluator.js";
import { randomUUID } from "crypto";

const __dirname = path.dirname(fileURLToPath(import.meta.url));

Expand Down Expand Up @@ -394,6 +394,110 @@ export class CommandCenterDashboard {
return c.json({ error: "Provider health unavailable" }, 500);
}
});
app.post("/proxy/v1/chat/completions", async (c) => {
const selectedPath = this.resolveSelectedPath(c.req.query("project"));
try {
if (!isJsonContentType(c.req.header("content-type"))) {
return c.json({ error: "Proxy requests must use application/json" }, 415);
}

const config = ConfigManager.getSemanticConfig(selectedPath);
let upstreamBase: URL;
try {
upstreamBase = new URL(config.baseUrl.endsWith("/") ? config.baseUrl : `${config.baseUrl}/`);
} catch {
return c.json({ error: "Configured semantic baseUrl is invalid" }, 400);
}
if (!config.allowNonLoopback && !isLoopbackHost(upstreamBase.hostname)) {
return c.json({ error: "Proxy upstream must be loopback unless allowNonLoopback is enabled" }, 403);
}

let body: any;
try {
body = await c.req.json();
} catch {
return c.json({ error: "Proxy request body must be valid JSON" }, 400);
}

const messages = Array.isArray(body.messages) ? body.messages : [];
const lastMessage = messages[messages.length - 1];
const rawPrompt = typeof lastMessage?.content === "string" && lastMessage.content.trim()
? lastMessage.content
: "Unknown proxy prompt";

const store = EventStore.getInstance();
const repoId = store.ensureRepository(selectedPath).id;
const promptId = `prm_${randomUUID()}`;
const execId = `exec_${randomUUID()}`;
store.recordPrompt({
id: promptId,
client: "API_PROXY",
agent_name: "ProxyClient",
raw_prompt: rawPrompt,
repo_id: repoId,
});

const upstreamUrl = new URL("chat/completions", upstreamBase).toString();
let upstreamResponse: Response;
try {
upstreamResponse = await fetch(upstreamUrl, {
method: "POST",
headers: {
"Content-Type": "application/json",
...(c.req.header("authorization") ? { "Authorization": c.req.header("authorization") as string } : {}),
},
body: JSON.stringify(body),
});
} catch (error) {
const message = error instanceof Error ? error.message : String(error);
store.recordExecution({
id: execId,
prompt_id: promptId,
workflow_name: "proxy_forward",
executor_name: "LocalLLM",
status: "failed",
result_summary: `Network error reaching upstream: ${message}`,
artifacts_json: JSON.stringify({ error: redactSensitive(message) }),
});
return c.json({ error: "Proxy request failed" }, 502);
}

if (!upstreamResponse.ok) {
const errText = await upstreamResponse.text();
store.recordExecution({
id: execId,
prompt_id: promptId,
workflow_name: "proxy_forward",
executor_name: "LocalLLM",
status: "failed",
result_summary: `Upstream error: ${upstreamResponse.status} ${upstreamResponse.statusText}`,
artifacts_json: JSON.stringify({ error: redactSensitive(errText) }),
});
return new Response(errText, {
status: upstreamResponse.status,
headers: upstreamResponse.headers,
});
}

store.recordExecution({
id: execId,
prompt_id: promptId,
workflow_name: "proxy_forward",
executor_name: "LocalLLM",
status: "completed",
result_summary: `Proxied successfully to ${sanitizeEndpoint(config.baseUrl)}`,
artifacts_json: "{}",
});

return new Response(upstreamResponse.body, {
status: upstreamResponse.status,
headers: upstreamResponse.headers,
});
} catch (error) {
this.logRouteError("proxy/v1/chat/completions", error, selectedPath);
return c.json({ error: "Proxy request failed" }, 502);
}
});

app.get("/api/tournaments", async (c) => {
const selectedPath = this.resolveSelectedPath(c.req.query("project"));
Expand Down
205 changes: 205 additions & 0 deletions universal-refiner/tests/proxy.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,205 @@
// secret-scan: allow-fixture
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import * as fs from "node:fs";
import * as os from "node:os";
import * as path from "node:path";
import { CommandCenterDashboard } from "../src/core/dashboard.js";
import { ConfigManager, type SemanticConfig } from "../src/core/config.js";
import { EventStore } from "../src/history/event-store.js";

describe("dashboard OpenAI-compatible proxy route", () => {
let testDir: string;
let repoDir: string;
let store: EventStore;
const semanticConfig: SemanticConfig = {
localEnabled: true,
mcpSamplingEnabled: false,
baseUrl: "http://127.0.0.1:9000/v1",
models: ["gemma3:12b"],
timeoutMs: 1_000,
temperature: 0.2,
allowNonLoopback: false,
};

beforeEach(() => {
testDir = fs.mkdtempSync(path.join(os.tmpdir(), "proxy-route-"));
repoDir = path.join(testDir, "repo");
fs.mkdirSync(repoDir, { recursive: true });
process.env.PROMPT_REFINER_GLOBAL_DIR = path.join(testDir, "global");
(EventStore as unknown as { instance: EventStore | null }).instance = null;
store = EventStore.getInstance();
vi.spyOn(ConfigManager, "getSemanticConfig").mockReturnValue(semanticConfig);
});

afterEach(() => {
vi.restoreAllMocks();
store.close();
(EventStore as unknown as { instance: EventStore | null }).instance = null;
delete process.env.PROMPT_REFINER_GLOBAL_DIR;
fs.rmSync(testDir, { recursive: true, force: true });
});

it("records the prompt, forwards the request, and records a completed execution", async () => {
const fetchMock = vi.fn().mockResolvedValue(new Response("mock LLM response", {
status: 200,
headers: { "content-type": "text/plain" },
}));
vi.stubGlobal("fetch", fetchMock);
const app = CommandCenterDashboard.createApp(repoDir);

const response = await app.request("/proxy/v1/chat/completions", {
method: "POST",
headers: {
"content-type": "application/json",
authorization: "Bearer local-token",
},
body: JSON.stringify({
messages: [{ role: "user", content: "Improve this prompt" }],
}),
});

expect(response.status).toBe(200);
expect(await response.text()).toBe("mock LLM response");
expect(fetchMock).toHaveBeenCalledWith("http://127.0.0.1:9000/v1/chat/completions", expect.objectContaining({
method: "POST",
headers: expect.objectContaining({
"Content-Type": "application/json",
"Authorization": "Bearer local-token",
}),
}));

const db = (store as any).db;
const prompt = db.prepare("SELECT * FROM prompts WHERE client = ?").get("API_PROXY");
expect(prompt.raw_prompt).toBe("Improve this prompt");
const execution = db.prepare("SELECT * FROM executions WHERE prompt_id = ?").get(prompt.id);
expect(execution).toMatchObject({
workflow_name: "proxy_forward",
executor_name: "LocalLLM",
status: "completed",
artifacts_json: "{}",
});
});

it("supports trailing-slash base URLs and falls back when no prompt message is present", async () => {
vi.spyOn(ConfigManager, "getSemanticConfig").mockReturnValueOnce({
...semanticConfig,
baseUrl: "http://127.0.0.1:9000/v1/",
});
const fetchMock = vi.fn().mockResolvedValue(new Response("ok", { status: 200 }));
vi.stubGlobal("fetch", fetchMock);
const app = CommandCenterDashboard.createApp(repoDir);

const response = await app.request("/proxy/v1/chat/completions", {
method: "POST",
headers: { "content-type": "application/json" },
body: JSON.stringify({ messages: "not-an-array" }),
});

expect(response.status).toBe(200);
expect(fetchMock).toHaveBeenCalledWith("http://127.0.0.1:9000/v1/chat/completions", expect.objectContaining({
headers: { "Content-Type": "application/json" },
}));
const db = (store as any).db;
expect(db.prepare("SELECT raw_prompt FROM prompts WHERE client = ?").get("API_PROXY"))
.toEqual({ raw_prompt: "Unknown proxy prompt" });
});

it("rejects malformed proxy requests before forwarding", async () => {
const fetchMock = vi.fn();
vi.stubGlobal("fetch", fetchMock);
const app = CommandCenterDashboard.createApp(repoDir);

expect((await app.request("/proxy/v1/chat/completions", { method: "POST", body: "{}" })).status).toBe(415);
expect((await app.request("/proxy/v1/chat/completions", {
method: "POST",
headers: { "content-type": "application/json" },
body: "{",
})).status).toBe(400);

vi.spyOn(ConfigManager, "getSemanticConfig").mockReturnValueOnce({
...semanticConfig,
baseUrl: "https://remote.example/v1",
allowNonLoopback: false,
});
expect((await app.request("/proxy/v1/chat/completions", {
method: "POST",
headers: { "content-type": "application/json" },
body: "{}",
})).status).toBe(403);

vi.spyOn(ConfigManager, "getSemanticConfig").mockReturnValueOnce({
...semanticConfig,
baseUrl: "not a url",
});
expect((await app.request("/proxy/v1/chat/completions", {
method: "POST",
headers: { "content-type": "application/json" },
body: "{}",
})).status).toBe(400);
expect(fetchMock).not.toHaveBeenCalled();
});

it("records failed executions for upstream HTTP and network failures", async () => {
const app = CommandCenterDashboard.createApp(repoDir);
vi.stubGlobal("fetch", vi.fn()
.mockResolvedValueOnce(new Response("token=secret-value", { status: 503, statusText: "Unavailable" }))
.mockRejectedValueOnce(new Error("connection refused with token=secret-value")));

const upstreamFailure = await app.request("/proxy/v1/chat/completions", {
method: "POST",
headers: { "content-type": "application/json" },
body: JSON.stringify({ messages: [{ role: "user", content: "First" }] }),
});
expect(upstreamFailure.status).toBe(503);
expect(await upstreamFailure.text()).toBe("token=secret-value");

const networkFailure = await app.request("/proxy/v1/chat/completions", {
method: "POST",
headers: { "content-type": "application/json" },
body: JSON.stringify({ messages: [{ role: "user", content: "Second" }] }),
});
expect(networkFailure.status).toBe(502);
expect(await networkFailure.json()).toEqual({ error: "Proxy request failed" });

const db = (store as any).db;
const failures = db.prepare("SELECT * FROM executions WHERE status = 'failed' ORDER BY started_at ASC").all();
expect(failures).toHaveLength(2);
expect(failures[0].result_summary).toContain("Upstream error: 503");
expect(failures[0].artifacts_json).toContain("token=[REDACTED]");
expect(failures[1].result_summary).toContain("Network error reaching upstream");
expect(failures[1].artifacts_json).toContain("token=[REDACTED]");
});

it("records non-Error network failures", async () => {
const app = CommandCenterDashboard.createApp(repoDir);
vi.stubGlobal("fetch", vi.fn().mockRejectedValue("string failure token=secret-value"));

const response = await app.request("/proxy/v1/chat/completions", {
method: "POST",
headers: { "content-type": "application/json" },
body: JSON.stringify({ messages: [{ role: "user", content: "Prompt" }] }),
});

expect(response.status).toBe(502);
const db = (store as any).db;
const failure = db.prepare("SELECT * FROM executions WHERE status = 'failed'").get();
expect(failure.result_summary).toContain("string failure");
expect(failure.artifacts_json).toContain("token=[REDACTED]");
});

it("returns a sanitized route failure when proxy bookkeeping fails", async () => {
const app = CommandCenterDashboard.createApp(repoDir);
vi.spyOn(EventStore, "getInstance").mockImplementationOnce(() => {
throw new Error("store secret");
});

const response = await app.request("/proxy/v1/chat/completions", {
method: "POST",
headers: { "content-type": "application/json" },
body: JSON.stringify({ messages: [] }),
});

expect(response.status).toBe(502);
expect(await response.json()).toEqual({ error: "Proxy request failed" });
});
});