diff --git a/apps/app/src/react-app/domains/settings/pages/cloud-workers-view.tsx b/apps/app/src/react-app/domains/settings/pages/cloud-workers-view.tsx index e2b08b5207..a57dd36081 100644 --- a/apps/app/src/react-app/domains/settings/pages/cloud-workers-view.tsx +++ b/apps/app/src/react-app/domains/settings/pages/cloud-workers-view.tsx @@ -3,6 +3,7 @@ import * as React from "react"; import { toast } from "@/components/ui/sonner"; import { Button } from "@/components/ui/button"; +import { Input } from "@/components/ui/input"; import { Separator } from "@/components/ui/separator"; import { t } from "@/i18n"; import { useCloudSession } from "@/react-app/domains/settings/cloud/cloud-session-provider"; @@ -13,6 +14,11 @@ export type CloudWorkersViewProps = { connectRemoteWorkspace: (input: { openworkHostUrl?: string | null; openworkToken?: string | null; + openworkClientToken?: string | null; + openworkHostToken?: string | null; + openworkDenBaseUrl?: string | null; + openworkDenOrgId?: string | null; + openworkDenWorkerId?: string | null; directory?: string | null; displayName?: string | null; }) => Promise; @@ -23,11 +29,18 @@ export function CloudWorkersView({ connectRemoteWorkspace, onOpenAccount, }: CloudWorkersViewProps) { - const { activeOrganization: activeOrg, authToken, client, isSignedIn, user } = useCloudSession(); + const { activeOrganization: activeOrg, authToken, baseUrl, client, isSignedIn, user } = useCloudSession(); const [workersBusy, setWorkersBusy] = React.useState(false); const [openingWorkerId, setOpeningWorkerId] = React.useState(null); + const [attachBusy, setAttachBusy] = React.useState(false); const [workers, setWorkers] = React.useState([]); const [workersError, setWorkersError] = React.useState(null); + const [staticWorkerForm, setStaticWorkerForm] = React.useState({ + name: "LAN static worker", + url: "", + clientToken: "", + hostToken: "", + }); const activeOrgId = activeOrg?.id ?? ""; const refreshWorkers = React.useCallback( @@ -82,7 +95,7 @@ export function CloudWorkersView({ try { const tokens = await client.getWorkerTokens(workerId, activeOrgId); const openworkUrl = tokens.openworkUrl?.trim() ?? ""; - const accessToken = tokens.ownerToken?.trim() || tokens.clientToken?.trim() || ""; + const accessToken = tokens.clientToken?.trim() || tokens.ownerToken?.trim() || ""; if (!openworkUrl || !accessToken) { throw new Error(t("den.error_worker_not_ready")); } @@ -90,6 +103,11 @@ export function CloudWorkersView({ const ok = await connectRemoteWorkspace({ openworkHostUrl: openworkUrl, openworkToken: accessToken, + openworkClientToken: tokens.clientToken?.trim() || null, + openworkHostToken: tokens.hostToken?.trim() || null, + openworkDenBaseUrl: baseUrl, + openworkDenOrgId: activeOrgId, + openworkDenWorkerId: workerId, directory: null, displayName: workerName, }); @@ -108,9 +126,47 @@ export function CloudWorkersView({ setOpeningWorkerId(null); } }, - [activeOrgId, client, connectRemoteWorkspace], + [activeOrgId, baseUrl, client, connectRemoteWorkspace], ); + const attachStaticWorker = React.useCallback(async () => { + if (!activeOrgId) { + setWorkersError(t("den.error_choose_org")); + return; + } + + const name = staticWorkerForm.name.trim(); + const url = staticWorkerForm.url.trim(); + const clientToken = staticWorkerForm.clientToken.trim(); + const hostToken = staticWorkerForm.hostToken.trim(); + if (!name || !url || !clientToken || !hostToken) { + setWorkersError("Name, URL, client token, and host token are required to attach a static worker."); + return; + } + + setAttachBusy(true); + setWorkersError(null); + try { + const worker = await client.attachStaticWorker(activeOrgId, { + name, + url, + clientToken, + hostToken, + }); + setWorkers((current) => [worker, ...current.filter((entry) => entry.workerId !== worker.workerId)]); + setStaticWorkerForm((current) => ({ ...current, url: "", clientToken: "", hostToken: "" })); + toast.success(`Attached ${worker.workerName}`); + void refreshWorkers(true); + } catch (error) { + const status = typeof error === "object" && error !== null && "status" in error ? Number((error as { status?: unknown }).status) : null; + setWorkersError(status === 403 + ? "Only organization owners and admins can attach static workers. Ask an operator to register this worker." + : error instanceof Error ? error.message : "Static worker attach failed."); + } finally { + setAttachBusy(false); + } + }, [activeOrgId, client, refreshWorkers, staticWorkerForm]); + if (!isSignedIn) { return ( @@ -130,6 +186,45 @@ export function CloudWorkersView({ return ( + +
+
+
Admin/operator: attach LAN static worker
+
+ Organization owners and admins can register a pre-running OpenWork worker without manual database changes. The URL and tokens must match the worker container environment. +
+
+
+ setStaticWorkerForm((current) => ({ ...current, name: event.currentTarget.value }))} + placeholder="Worker name" + /> + setStaticWorkerForm((current) => ({ ...current, url: event.currentTarget.value }))} + placeholder="http://192.168.1.50:8787" + /> + setStaticWorkerForm((current) => ({ ...current, clientToken: event.currentTarget.value }))} + placeholder="OPENWORK_TOKEN" + type="password" + /> + setStaticWorkerForm((current) => ({ ...current, hostToken: event.currentTarget.value }))} + placeholder="OPENWORK_HOST_TOKEN" + type="password" + /> +
+
+ +
+
+
= 0 ? workspaceIndex : legacyIndex; + if (mountIndex >= 0 && segments[mountIndex + 1]) { + const prefix = segments.slice(0, mountIndex).join("/"); + url.pathname = prefix ? `/${prefix}` : "/"; + } + return url.toString().replace(/\/+$/, ""); + } catch { + return raw.replace(/\/(?:workspace|w)\/[^/?#]+.*$/, "").replace(/\/+$/, "") || raw; + } +} + +export function isDesktopFetchAllowedForWorkspaces(url, workspaces) { + let parsed; + try { + parsed = new URL(trim(url)); + } catch { + return false; + } + if (!["http:", "https:"].includes(parsed.protocol)) return false; + + for (const workspace of Array.isArray(workspaces) ? workspaces : []) { + if (workspace?.workspaceType !== "remote") continue; + for (const candidate of [workspace.baseUrl, workspace.openworkHostUrl]) { + const stripped = stripOpenworkWorkspaceMount(candidate); + if (!stripped) continue; + try { + if (new URL(stripped).origin === parsed.origin) return true; + } catch { + // Ignore malformed persisted values; they are not valid fetch targets. + } + } + } + return false; +} diff --git a/apps/desktop/electron/remote-workspace.test.mjs b/apps/desktop/electron/remote-workspace.test.mjs index c7ba555332..c82fd3a016 100644 --- a/apps/desktop/electron/remote-workspace.test.mjs +++ b/apps/desktop/electron/remote-workspace.test.mjs @@ -2,6 +2,7 @@ import { describe, it } from "node:test"; import assert from "node:assert/strict"; import { + isDesktopFetchAllowedForWorkspaces, openworkWorkspaceDisplayName, selectOpenworkWorkspaceForConnection, } from "./remote-workspace.mjs"; @@ -87,6 +88,26 @@ describe("selectOpenworkWorkspaceForConnection", () => { }); }); +describe("isDesktopFetchAllowedForWorkspaces", () => { + const workspaces = [ + { + workspaceType: "remote", + baseUrl: "https://worker.example.com/w/rem_ws_123", + openworkHostUrl: "https://worker.example.com", + }, + { workspaceType: "local", baseUrl: "https://ignored.example.com" }, + ]; + + it("allows configured remote workspace origins", () => { + assert.equal(isDesktopFetchAllowedForWorkspaces("https://worker.example.com/workspaces", workspaces), true); + }); + + it("rejects unconfigured origins and non-HTTP protocols", () => { + assert.equal(isDesktopFetchAllowedForWorkspaces("https://attacker.example.com/workspaces", workspaces), false); + assert.equal(isDesktopFetchAllowedForWorkspaces("file:///etc/passwd", workspaces), false); + }); +}); + describe("openworkWorkspaceDisplayName", () => { it("prefers display fields before id", () => { assert.equal( diff --git a/ee/apps/den-api/src/db.ts b/ee/apps/den-api/src/db.ts index bf48dfb132..a8fa313c07 100644 --- a/ee/apps/den-api/src/db.ts +++ b/ee/apps/den-api/src/db.ts @@ -1,8 +1,9 @@ import { createDenDb } from "@openwork-ee/den-db" import { env } from "./env.js" -export const { db } = createDenDb({ +export const denDb = createDenDb({ databaseUrl: env.databaseUrl, mode: env.dbMode, planetscale: env.planetscale, }) +export const { client: dbClient, db } = denDb diff --git a/ee/apps/den-api/src/env.ts b/ee/apps/den-api/src/env.ts index 61cbd19614..478245884e 100644 --- a/ee/apps/den-api/src/env.ts +++ b/ee/apps/den-api/src/env.ts @@ -34,8 +34,17 @@ const EnvSchema = z.object({ PORT: z.string().optional(), CORS_ORIGINS: z.string().optional(), WORKER_PROXY_PORT: z.string().optional(), - PROVISIONER_MODE: z.enum(["stub", "render", "daytona"]).optional(), + PROVISIONER_MODE: z.enum(["stub", "render", "daytona", "static"]).optional(), WORKER_URL_TEMPLATE: z.string().optional(), + STATIC_WORKER_URLS: z.string().optional(), + STATIC_WORKER_HEALTH_PATH: z.string().optional(), + STATIC_WORKER_HEALTHCHECK_TIMEOUT_MS: z.string().optional(), + STATIC_WORKER_HEALTHCHECK_INTERVAL_MS: z.string().optional(), + STATIC_WORKER_RESERVATION_TTL_MS: z.string().optional(), + STATIC_WORKER_TOKEN_MAP_JSON: z.string().optional(), + STATIC_WORKER_ATTACH_ALLOW_PRIVATE: z.string().optional(), + STATIC_WORKER_ATTACH_ALLOWED_HOSTS: z.string().optional(), + STATIC_WORKER_ATTACH_ALLOWED_CIDRS: z.string().optional(), WORKER_ACTIVITY_BASE_URL: z.string().optional(), OPENWORK_DAYTONA_ENV_PATH: z.string().optional(), RENDER_API_BASE: z.string().optional(), @@ -123,6 +132,17 @@ const EnvSchema = z.object({ } } } + + if (value.PROVISIONER_MODE === "static") { + const staticConfig = parseStaticWorkersEnv(value) + for (const issue of staticConfig.issues) { + ctx.addIssue({ + code: z.ZodIssueCode.custom, + message: issue.message, + path: [issue.path], + }) + } + } }) const parsed = EnvSchema.parse(process.env) @@ -147,6 +167,214 @@ function normalizeOrigin(origin: string) { return value.replace(/\/+$/, "") } +type StaticWorkersEnvInput = { + STATIC_WORKER_URLS?: string + STATIC_WORKER_HEALTH_PATH?: string + STATIC_WORKER_HEALTHCHECK_TIMEOUT_MS?: string + STATIC_WORKER_HEALTHCHECK_INTERVAL_MS?: string + STATIC_WORKER_RESERVATION_TTL_MS?: string + STATIC_WORKER_TOKEN_MAP_JSON?: string + STATIC_WORKER_ATTACH_ALLOW_PRIVATE?: string + STATIC_WORKER_ATTACH_ALLOWED_HOSTS?: string + STATIC_WORKER_ATTACH_ALLOWED_CIDRS?: string +} + +type StaticWorkersEnvIssue = { + path: keyof StaticWorkersEnvInput + message: string +} + +function parsePositiveInteger(value: string | undefined, fallback: number) { + const raw = value?.trim() + if (!raw) { + return fallback + } + const parsedValue = Number(raw) + return Number.isInteger(parsedValue) && parsedValue > 0 ? parsedValue : null +} + +function normalizeStaticWorkerUrl(value: string) { + const parsedUrl = new URL(value.trim()) + parsedUrl.hash = "" + parsedUrl.search = "" + parsedUrl.pathname = parsedUrl.pathname.replace(/\/+$/, "") + const serialized = parsedUrl.toString().replace(/\/+$/, "") + return serialized +} + +function isRecord(value: unknown): value is Record { + return typeof value === "object" && value !== null && !Array.isArray(value) +} + +function parseStaticWorkerTokenPair(value: unknown) { + if (!isRecord(value)) { + return null + } + const clientToken = typeof value.clientToken === "string" ? value.clientToken.trim() : "" + const hostToken = typeof value.hostToken === "string" ? value.hostToken.trim() : "" + return clientToken && hostToken ? { clientToken, hostToken } : null +} + +export function parseStaticWorkersEnv(input: StaticWorkersEnvInput) { + const issues: StaticWorkersEnvIssue[] = [] + const urls: string[] = [] + const seenUrls = new Set() + const tokenMap: Record = {} + + for (const rawUrl of splitCsv(input.STATIC_WORKER_URLS)) { + let normalizedUrl: string + try { + const parsedUrl = new URL(rawUrl) + if (parsedUrl.protocol !== "http:" && parsedUrl.protocol !== "https:") { + issues.push({ + path: "STATIC_WORKER_URLS", + message: "STATIC_WORKER_URLS entries must use http or https URLs", + }) + continue + } + normalizedUrl = normalizeStaticWorkerUrl(rawUrl) + } catch { + issues.push({ + path: "STATIC_WORKER_URLS", + message: "STATIC_WORKER_URLS entries must be valid URLs", + }) + continue + } + + if (seenUrls.has(normalizedUrl)) { + issues.push({ + path: "STATIC_WORKER_URLS", + message: `STATIC_WORKER_URLS contains duplicate URL ${normalizedUrl}`, + }) + continue + } + + seenUrls.add(normalizedUrl) + urls.push(normalizedUrl) + } + + if (urls.length === 0) { + issues.push({ + path: "STATIC_WORKER_URLS", + message: "STATIC_WORKER_URLS is required when PROVISIONER_MODE=static", + }) + } + + const rawTokenMap = optionalString(input.STATIC_WORKER_TOKEN_MAP_JSON) + if (!rawTokenMap) { + issues.push({ + path: "STATIC_WORKER_TOKEN_MAP_JSON", + message: "STATIC_WORKER_TOKEN_MAP_JSON is required when PROVISIONER_MODE=static", + }) + } else { + try { + const parsedTokenMap = JSON.parse(rawTokenMap) as unknown + if (!isRecord(parsedTokenMap)) { + issues.push({ + path: "STATIC_WORKER_TOKEN_MAP_JSON", + message: "STATIC_WORKER_TOKEN_MAP_JSON must be a JSON object keyed by worker URL", + }) + } else { + for (const [rawUrl, rawPair] of Object.entries(parsedTokenMap)) { + let normalizedUrl: string + try { + normalizedUrl = normalizeStaticWorkerUrl(rawUrl) + } catch { + issues.push({ + path: "STATIC_WORKER_TOKEN_MAP_JSON", + message: "STATIC_WORKER_TOKEN_MAP_JSON keys must be valid worker URLs", + }) + continue + } + + const pair = parseStaticWorkerTokenPair(rawPair) + if (!pair) { + issues.push({ + path: "STATIC_WORKER_TOKEN_MAP_JSON", + message: `STATIC_WORKER_TOKEN_MAP_JSON entry for ${normalizedUrl} must include non-empty clientToken and hostToken`, + }) + continue + } + tokenMap[normalizedUrl] = pair + } + + for (const url of urls) { + if (!tokenMap[url]) { + issues.push({ + path: "STATIC_WORKER_TOKEN_MAP_JSON", + message: `STATIC_WORKER_TOKEN_MAP_JSON is missing token pair for ${url}`, + }) + } + } + + const configuredUrls = new Set(urls) + for (const url of Object.keys(tokenMap)) { + if (!configuredUrls.has(url)) { + issues.push({ + path: "STATIC_WORKER_TOKEN_MAP_JSON", + message: `STATIC_WORKER_TOKEN_MAP_JSON contains token pair for unconfigured URL ${url}`, + }) + } + } + } + } catch { + issues.push({ + path: "STATIC_WORKER_TOKEN_MAP_JSON", + message: "STATIC_WORKER_TOKEN_MAP_JSON must be valid JSON", + }) + } + } + + const healthPath = optionalString(input.STATIC_WORKER_HEALTH_PATH) ?? "/health" + if (!healthPath.startsWith("/") || healthPath.startsWith("//") || healthPath.includes("?")) { + issues.push({ + path: "STATIC_WORKER_HEALTH_PATH", + message: "STATIC_WORKER_HEALTH_PATH must be an absolute path such as /health", + }) + } + + const healthcheckTimeoutMs = parsePositiveInteger(input.STATIC_WORKER_HEALTHCHECK_TIMEOUT_MS, 10000) + if (healthcheckTimeoutMs === null) { + issues.push({ + path: "STATIC_WORKER_HEALTHCHECK_TIMEOUT_MS", + message: "STATIC_WORKER_HEALTHCHECK_TIMEOUT_MS must be a positive integer", + }) + } + + const healthcheckIntervalMs = parsePositiveInteger(input.STATIC_WORKER_HEALTHCHECK_INTERVAL_MS, 1000) + if (healthcheckIntervalMs === null) { + issues.push({ + path: "STATIC_WORKER_HEALTHCHECK_INTERVAL_MS", + message: "STATIC_WORKER_HEALTHCHECK_INTERVAL_MS must be a positive integer", + }) + } + + const reservationTtlMs = parsePositiveInteger(input.STATIC_WORKER_RESERVATION_TTL_MS, 300000) + if (reservationTtlMs === null) { + issues.push({ + path: "STATIC_WORKER_RESERVATION_TTL_MS", + message: "STATIC_WORKER_RESERVATION_TTL_MS must be a positive integer", + }) + } + + const allowPrivateAttach = (input.STATIC_WORKER_ATTACH_ALLOW_PRIVATE ?? "false").trim().toLowerCase() === "true" + const attachAllowedHosts = splitCsv(input.STATIC_WORKER_ATTACH_ALLOWED_HOSTS).map((host) => host.toLowerCase()) + const attachAllowedCidrs = splitCsv(input.STATIC_WORKER_ATTACH_ALLOWED_CIDRS) + + return { + urls, + healthPath, + healthcheckTimeoutMs: healthcheckTimeoutMs ?? 10000, + healthcheckIntervalMs: healthcheckIntervalMs ?? 1000, + reservationTtlMs: reservationTtlMs ?? 300000, + tokenMap, + allowPrivateAttach, + attachAllowedHosts, + attachAllowedCidrs, + issues, + } +} + const corsOrigins = splitCsv(parsed.CORS_ORIGINS).map((origin) => normalizeOrigin(origin)) const betterAuthTrustedOrigins = splitCsv(parsed.DEN_BETTER_AUTH_TRUSTED_ORIGINS) .map((origin) => normalizeOrigin(origin)) @@ -159,6 +387,7 @@ const requireEmailVerification = parsed.DEN_REQUIRE_EMAIL_VERIFICATION === undef ? !devMode : parsed.DEN_REQUIRE_EMAIL_VERIFICATION.trim().toLowerCase() !== "false" const port = Number(parsed.PORT ?? "8790") +const staticWorkers = parseStaticWorkersEnv(parsed) const daytonaSandboxPublic = (parsed.DAYTONA_SANDBOX_PUBLIC ?? "false").toLowerCase() === "true" @@ -223,6 +452,17 @@ export const env = { corsOrigins, provisionerMode: parsed.PROVISIONER_MODE ?? "daytona", workerUrlTemplate: parsed.WORKER_URL_TEMPLATE, + staticWorkers: { + urls: staticWorkers.urls, + healthPath: staticWorkers.healthPath, + healthcheckTimeoutMs: staticWorkers.healthcheckTimeoutMs, + healthcheckIntervalMs: staticWorkers.healthcheckIntervalMs, + reservationTtlMs: staticWorkers.reservationTtlMs, + tokenMap: staticWorkers.tokenMap, + allowPrivateAttach: staticWorkers.allowPrivateAttach, + attachAllowedHosts: staticWorkers.attachAllowedHosts, + attachAllowedCidrs: staticWorkers.attachAllowedCidrs, + }, workerActivityBaseUrl: optionalString(parsed.WORKER_ACTIVITY_BASE_URL) ?? parsed.BETTER_AUTH_URL.trim().replace(/\/+$/, ""), diff --git a/ee/apps/den-api/src/routes/workers/core.ts b/ee/apps/den-api/src/routes/workers/core.ts index 2d0be1fe80..9728a37619 100644 --- a/ee/apps/den-api/src/routes/workers/core.ts +++ b/ee/apps/den-api/src/routes/workers/core.ts @@ -1,17 +1,21 @@ -import { desc, eq } from "@openwork-ee/den-db/drizzle" -import { WorkerTable, WorkerTokenTable } from "@openwork-ee/den-db/schema" +import { and, desc, eq, inArray } from "@openwork-ee/den-db/drizzle" +import { WorkerInstanceTable, WorkerTable, WorkerTokenTable } from "@openwork-ee/den-db/schema" import { createDenTypeId, normalizeDenTypeId } from "@openwork-ee/utils/typeid" import type { Hono } from "hono" +import type { MiddlewareHandler } from "hono" import { describeRoute } from "hono-openapi" import { z } from "zod" import { db } from "../../db.js" -import { jsonValidator, paramValidator, queryValidator, requireUserMiddleware, resolveUserOrganizationsMiddleware } from "../../middleware/index.js" +import { env } from "../../env.js" +import { jsonValidator, paramValidator, queryValidator, requireUserMiddleware, resolveOrganizationContextMiddleware, resolveUserOrganizationsMiddleware } from "../../middleware/index.js" import { denTypeIdSchema, emptyResponse, forbiddenSchema, invalidRequestSchema, jsonResponse, notFoundSchema, unauthorizedSchema } from "../../openapi.js" import { getOrganizationLimitStatus } from "../../organization-limits.js" import { getRequiredUserEmail } from "../../user.js" import type { WorkerRouteVariables } from "./shared.js" import { continueCloudProvisioning, + attachStaticWorkerSchema, + canAttachStaticWorkerForMember, createWorkerSchema, deleteWorkerCascade, getLatestWorkerInstance, @@ -20,10 +24,14 @@ import { listWorkersQuerySchema, parseWorkerIdParam, requireCloudAccessOrPayment, + shouldUseSignupAutoExistingWorker, toInstanceResponse, toWorkerResponse, token, updateWorkerSchema, + type ValidatedStaticWorkerAttachUrl, + validateResolvedStaticWorkerAttachUrl, + withStaticAssignmentLock, workerIdParamSchema, } from "./shared.js" @@ -79,6 +87,15 @@ const workerCreateResponseSchema = z.object({ }), }).meta({ ref: "WorkerCreateResponse" }) +const staticWorkerAttachResponseSchema = z.object({ + worker: workerSchema, + instance: workerInstanceSchema, + launch: z.object({ + mode: z.literal("attached"), + pollAfterMs: z.literal(0), + }), +}).meta({ ref: "StaticWorkerAttachResponse" }) + const workerTokensResponseSchema = z.object({ tokens: z.object({ owner: z.string(), @@ -91,6 +108,8 @@ const workerTokensResponseSchema = z.object({ }).nullable(), }).meta({ ref: "WorkerTokensResponse" }) +type WorkerRow = typeof WorkerTable.$inferSelect + const organizationUnavailableSchema = z.object({ error: z.literal("organization_unavailable"), }).meta({ ref: "OrganizationUnavailableError" }) @@ -124,6 +143,310 @@ const workerRuntimeUnavailableSchema = z.object({ message: z.string(), })).meta({ ref: "WorkerConnectionError" }) +async function getExistingSignupAutoWorkerResponse(input: { + orgId: WorkerRow["org_id"] + userId: NonNullable +}) { + const rows = await db + .select() + .from(WorkerTable) + .where(and( + eq(WorkerTable.org_id, input.orgId), + eq(WorkerTable.created_by_user_id, input.userId), + eq(WorkerTable.destination, "cloud"), + inArray(WorkerTable.status, ["provisioning", "healthy"]), + )) + .orderBy(desc(WorkerTable.created_at)) + .limit(1) + + const existingWorker = rows[0] + if (!existingWorker) { + return null + } + + const instance = await getLatestWorkerInstance(existingWorker.id) + const tokensAndConnect = await getWorkerTokensAndConnect(existingWorker) + const tokens = "tokens" in tokensAndConnect ? tokensAndConnect.tokens : undefined + if (!tokens) { + return null + } + + return { + worker: toWorkerResponse(existingWorker, input.userId), + tokens, + instance: toInstanceResponse(instance), + launch: { mode: "existing", pollAfterMs: 0 }, + } +} + +export async function fetchStaticWorker(url: string, path: string, headers: Record) { + return fetch(`${url}${path}`, { + method: "GET", + redirect: "manual", + headers: { Accept: "application/json", ...headers }, + signal: AbortSignal.timeout(env.staticWorkers.healthcheckTimeoutMs), + }) +} + +function formatIpForUrl(address: string) { + return address.includes(":") ? `[${address}]` : address +} + +export async function fetchPinnedStaticWorker(target: ValidatedStaticWorkerAttachUrl, path: string, headers: Record) { + const original = new URL(target.url) + const resolvedAddress = original.protocol === "http:" ? target.resolvedAddresses[0]?.address : undefined + const pinned = resolvedAddress ? new URL(target.url) : original + if (resolvedAddress) { + pinned.hostname = formatIpForUrl(resolvedAddress) + } + const hostHeader = original.port ? `${original.hostname}:${original.port}` : original.hostname + return fetch(`${pinned.toString().replace(/\/+$/, "")}${path}`, { + method: "GET", + redirect: "manual", + headers: { + Accept: "application/json", + ...(resolvedAddress && original.protocol === "http:" ? { Host: hostHeader } : {}), + ...headers, + }, + signal: AbortSignal.timeout(env.staticWorkers.healthcheckTimeoutMs), + }) +} + +export async function assertStaticWorkerReachable(url: string | ValidatedStaticWorkerAttachUrl, clientToken: string, hostToken: string) { + const fetchWorker = typeof url === "string" + ? (path: string, headers: Record) => fetchStaticWorker(url, path, headers) + : (path: string, headers: Record) => fetchPinnedStaticWorker(url, path, headers) + + const clientResponse = await fetchWorker("/workspaces", { + Authorization: `Bearer ${clientToken}`, + }) + + if (!clientResponse.ok) { + throw new Error(`Worker rejected the provided client token with HTTP ${clientResponse.status}`) + } + + const hostResponse = await fetchWorker("/env/keys", { + "X-OpenWork-Host-Token": hostToken, + }) + + if (!hostResponse.ok) { + throw new Error(`Worker rejected the provided host token with HTTP ${hostResponse.status}`) + } +} + +type StaticAttachTx = Pick +type StaticAttachInput = z.infer +type StaticAttachRouteDeps = { + middlewares?: MiddlewareHandler<{ Variables: WorkerRouteVariables }>[] + data?: StaticAttachTx + lookup?: Parameters[2] + fetchReachable?: (url: ValidatedStaticWorkerAttachUrl, clientToken: string, hostToken: string) => Promise + lock?: (run: (tx: StaticAttachTx) => Promise) => Promise + getWorkerLimit?: typeof getOrganizationLimitStatus +} + +async function findActiveStaticWorkerByUrl(tx: Pick, normalizedUrl: string) { + return tx + .select({ id: WorkerInstanceTable.id }) + .from(WorkerInstanceTable) + .where( + and( + eq(WorkerInstanceTable.provider, "static"), + eq(WorkerInstanceTable.url, normalizedUrl), + inArray(WorkerInstanceTable.status, ["provisioning", "healthy"]), + ), + ) + .limit(1) +} + +function staticAttachDuplicateResponse() { + return { + error: "worker_url_already_attached", + message: "This static worker URL is already attached to an active Den worker.", + } +} + +export function registerStaticWorkerAttachRoute(app: Hono<{ Variables: WorkerRouteVariables }>, deps: StaticAttachRouteDeps = {}) { + const routeMiddlewares = deps.middlewares ?? [requireUserMiddleware, resolveOrganizationContextMiddleware, jsonValidator(attachStaticWorkerSchema)] + const data = deps.data ?? db + const fetchReachable = deps.fetchReachable ?? assertStaticWorkerReachable + const lock = deps.lock ?? ((run) => withStaticAssignmentLock(run)) + const getWorkerLimit = deps.getWorkerLimit ?? getOrganizationLimitStatus + + app.post( + "/v1/workers/static-attach", + describeRoute({ + tags: ["Workers"], + summary: "Attach static worker", + description: "Registers a pre-running LAN/OpenWork worker for the active organization using its existing runtime URL and tokens.", + responses: { + 201: jsonResponse("Static worker attached successfully.", staticWorkerAttachResponseSchema), + 400: jsonResponse("The static worker attach payload was invalid.", invalidRequestSchema), + 401: jsonResponse("The caller must be signed in to attach workers.", unauthorizedSchema), + 403: jsonResponse("Only organization owners and admins can attach static workers.", forbiddenSchema), + 409: jsonResponse("The organization has reached its worker limit or the URL is already attached.", orgLimitReachedSchema.or(z.object({ error: z.literal("worker_url_already_attached"), message: z.string() }))), + }, + }), + ...(routeMiddlewares as never[]), + async (c) => { + const user = c.get("user") + const orgId = c.get("activeOrganizationId") + const organizationContext = c.get("organizationContext") + const input = c.req.valid("json" as never) as StaticAttachInput + + if (!user?.id) { + return c.json({ error: "unauthorized" }, 401) + } + + if (!orgId) { + return c.json({ error: "organization_unavailable" }, 400) + } + + const normalizedOrgId = normalizeDenTypeId("organization", orgId) + const normalizedUserId = normalizeDenTypeId("user", user.id) + + if (!organizationContext || !canAttachStaticWorkerForMember(organizationContext)) { + return c.json({ + error: "forbidden", + message: "Only organization owners and admins can attach static workers.", + }, 403) + } + + const validatedUrl = await validateResolvedStaticWorkerAttachUrl(input.url, { + allowPrivate: env.staticWorkers.allowPrivateAttach, + allowedHosts: env.staticWorkers.attachAllowedHosts, + allowedCidrs: env.staticWorkers.attachAllowedCidrs, + }, deps.lookup) + if (!validatedUrl.ok) { + return c.json({ error: "invalid_request", message: validatedUrl.message }, 400) + } + + const normalizedUrl = validatedUrl.url + const existing = await findActiveStaticWorkerByUrl(data, normalizedUrl) + if (existing.length > 0) { + return c.json(staticAttachDuplicateResponse(), 409) + } + + try { + await fetchReachable(validatedUrl, input.clientToken.trim(), input.hostToken.trim()) + } catch (error) { + return c.json({ + error: "invalid_request", + message: "Static worker verification failed with the provided URL and tokens.", + }, 400) + } + + const workerId = createDenTypeId("worker") + const instanceId = createDenTypeId("workerInstance") + const activityToken = input.activityToken?.trim() || token() + const now = new Date() + + const insertResult = await lock(async (tx) => { + const duplicateRows = await findActiveStaticWorkerByUrl(tx, normalizedUrl) + if (duplicateRows.length > 0) { + return { status: "duplicate" as const } + } + + const workerLimit = await getWorkerLimit(normalizedOrgId, "workers") + if (workerLimit.exceeded) { + return { status: "limit" as const, workerLimit } + } + + await tx.insert(WorkerTable).values({ + id: workerId, + org_id: normalizedOrgId, + created_by_user_id: normalizedUserId, + name: input.name, + description: input.description?.trim() || null, + destination: "cloud", + status: "healthy", + image_version: null, + workspace_path: null, + sandbox_backend: "static", + } as never) + + await tx.insert(WorkerTokenTable).values([ + { + id: createDenTypeId("workerToken"), + worker_id: workerId, + scope: "host", + token: input.hostToken.trim(), + }, + { + id: createDenTypeId("workerToken"), + worker_id: workerId, + scope: "client", + token: input.clientToken.trim(), + }, + { + id: createDenTypeId("workerToken"), + worker_id: workerId, + scope: "activity", + token: activityToken, + }, + ] as never) + + await tx.insert(WorkerInstanceTable).values({ + id: instanceId, + worker_id: workerId, + provider: "static", + region: "on-prem", + url: normalizedUrl, + status: "healthy", + } as never) + return { status: "inserted" as const } + }) + + if (insertResult.status === "duplicate") { + return c.json(staticAttachDuplicateResponse(), 409) + } + + if (insertResult.status === "limit") { + return c.json({ + error: "org_limit_reached", + limitType: "workers", + limit: insertResult.workerLimit.limit, + currentCount: insertResult.workerLimit.currentCount, + message: `This workspace currently supports up to ${insertResult.workerLimit.limit} workers. Contact support to increase the limit.`, + }, 409) + } + + return c.json({ + worker: toWorkerResponse( + { + id: workerId, + org_id: normalizedOrgId, + created_by_user_id: normalizedUserId, + name: input.name, + description: input.description?.trim() || null, + destination: "cloud", + status: "healthy", + image_version: null, + workspace_path: null, + sandbox_backend: "static", + last_heartbeat_at: null, + last_active_at: null, + created_at: now, + updated_at: now, + }, + normalizedUserId, + ), + instance: toInstanceResponse({ + id: instanceId, + worker_id: workerId, + provider: "static", + region: "on-prem", + url: normalizedUrl, + status: "healthy", + created_at: now, + updated_at: now, + }), + launch: { mode: "attached", pollAfterMs: 0 }, + }, 201) + }, + ) +} + export function registerWorkerCoreRoutes(app: Hono) { app.get( "/v1/workers", @@ -197,6 +520,16 @@ export function registerWorkerCoreRoutes) + app.get( "/v1/workers/:id", describeRoute({ diff --git a/ee/apps/den-api/src/routes/workers/shared.ts b/ee/apps/den-api/src/routes/workers/shared.ts index cbf1f01d25..f55402f637 100644 --- a/ee/apps/den-api/src/routes/workers/shared.ts +++ b/ee/apps/den-api/src/routes/workers/shared.ts @@ -1,5 +1,7 @@ import { randomBytes } from "node:crypto" -import { and, asc, desc, eq, isNull } from "@openwork-ee/den-db/drizzle" +import { lookup as dnsLookup } from "node:dns/promises" +import { isIP } from "node:net" +import { and, asc, desc, eq, inArray, isNull, sql } from "@openwork-ee/den-db/drizzle" import { AuditEventTable, AuthUserTable, @@ -13,23 +15,48 @@ import { import { createDenTypeId, normalizeDenTypeId } from "@openwork-ee/utils/typeid" import { z } from "zod" import { requireCloudWorkerAccess } from "../../billing/polar.js" -import { db } from "../../db.js" +import { db, dbClient } from "../../db.js" import { env } from "../../env.js" -import type { UserOrganizationsContext } from "../../middleware/index.js" +import type { OrganizationContextVariables, UserOrganizationsContext } from "../../middleware/index.js" import { denTypeIdSchema } from "../../openapi.js" import type { AuthContextVariables } from "../../session.js" -import { deprovisionWorker, provisionWorker } from "../../workers/provisioner.js" +import { + checkStaticWorkerHealth, + deprovisionWorker, + getStaticWorkerTokenPairForUrl, + normalizeStaticWorkerUrl, + provisionWorker, + selectStaticWorkerUrlFromPool, + verifyStaticWorkerRuntimeAccess, +} from "../../workers/provisioner.js" import { customDomainForWorker } from "../../workers/vanity-domain.js" export const createWorkerSchema = z.object({ name: z.string().min(1), description: z.string().optional(), destination: z.enum(["local", "cloud"]), + source: z.enum(["manual", "signup_auto"]).optional(), workspacePath: z.string().optional(), sandboxBackend: z.string().optional(), imageVersion: z.string().optional(), }) +export function shouldUseSignupAutoExistingWorker(input: { + destination: "local" | "cloud" + source?: "manual" | "signup_auto" +}) { + return input.destination === "cloud" && input.source === "signup_auto" +} + +export const attachStaticWorkerSchema = z.object({ + name: z.string().trim().min(1).max(255), + description: z.string().trim().max(1024).optional(), + url: z.string().url(), + clientToken: z.string().trim().min(1).max(128), + hostToken: z.string().trim().min(1).max(128), + activityToken: z.string().trim().min(1).max(128).optional(), +}) + export const updateWorkerSchema = z.object({ name: z.string().trim().min(1).max(255), }) @@ -50,12 +77,21 @@ export const workerIdParamSchema = z.object({ }) export type WorkerRouteVariables = AuthContextVariables & Partial + & Partial type WorkerRow = typeof WorkerTable.$inferSelect type WorkerInstanceRow = typeof WorkerInstanceTable.$inferSelect export type WorkerId = WorkerRow["id"] type OrgId = typeof MemberTable.$inferSelect.organizationId type UserId = typeof AuthUserTable.$inferSelect.id +type StaticAssignmentDb = Pick +type MySqlLockConnection = { + query: (statement: string, values?: unknown[]) => Promise + release: () => void +} +type MySqlLockPool = { + getConnection: () => Promise +} export const token = () => randomBytes(32).toString("hex") @@ -63,6 +99,262 @@ export function parseWorkerIdParam(value: string): WorkerId { return normalizeDenTypeId("worker", value) } +export function normalizeWorkerRuntimeUrl(value: string): string { + const parsed = new URL(value.trim()) + parsed.hash = "" + parsed.search = "" + parsed.username = "" + parsed.password = "" + parsed.pathname = parsed.pathname.replace(/\/+$/, "") + return parsed.toString().replace(/\/+$/, "") +} + +export type StaticWorkerAttachUrlPolicy = { + allowPrivate: boolean + allowedHosts: readonly string[] + allowedCidrs: readonly string[] +} + +export type DnsLookupAddress = { + address: string + family: 4 | 6 +} + +export type StaticWorkerDnsLookup = (hostname: string) => Promise + +export type ValidatedStaticWorkerAttachUrl = { + ok: true + url: string + resolvedAddresses: DnsLookupAddress[] +} + +export function canAttachStaticWorkerForMember(payload: { currentMember: { isOwner: boolean; role: string } }) { + return payload.currentMember.isOwner || payload.currentMember.role.split(",").map((role) => role.trim()).includes("admin") +} + +function parseIpv4(value: string) { + const parts = value.split(".") + if (parts.length !== 4) { + return null + } + let result = 0 + for (const part of parts) { + if (!/^\d{1,3}$/.test(part)) { + return null + } + const octet = Number(part) + if (octet < 0 || octet > 255) { + return null + } + result = (result << 8) + octet + } + return result >>> 0 +} + +function ipv4InCidr(host: string, cidr: string) { + const [base, prefixRaw] = cidr.split("/") + const ip = parseIpv4(host) + const baseIp = parseIpv4(base ?? "") + const prefix = Number(prefixRaw) + if (ip === null || baseIp === null || !Number.isInteger(prefix) || prefix < 0 || prefix > 32) { + return false + } + const mask = prefix === 0 ? 0 : (0xffffffff << (32 - prefix)) >>> 0 + return (ip & mask) === (baseIp & mask) +} + +function parseIpv6(value: string): bigint | null { + const normalized = value.toLowerCase() + const zoneIndex = normalized.indexOf("%") + const withoutZone = zoneIndex === -1 ? normalized : normalized.slice(0, zoneIndex) + const [headRaw, tailRaw, extra] = withoutZone.split("::") + if (extra !== undefined) { + return null + } + + const parsePart = (part: string) => { + if (!part) { + return [] as number[] + } + const entries = part.split(":") + const words: number[] = [] + for (const entry of entries) { + if (!entry) { + return null + } + if (entry.includes(".")) { + const ipv4 = parseIpv4(entry) + if (ipv4 === null) { + return null + } + words.push((ipv4 >>> 16) & 0xffff, ipv4 & 0xffff) + continue + } + if (!/^[0-9a-f]{1,4}$/.test(entry)) { + return null + } + words.push(Number.parseInt(entry, 16)) + } + return words + } + + const head = parsePart(headRaw ?? "") + const tail = parsePart(tailRaw ?? "") + if (!head || !tail) { + return null + } + + const missing = tailRaw === undefined ? 0 : 8 - head.length - tail.length + if (missing < 0) { + return null + } + const words = [...head, ...Array.from({ length: missing }, () => 0), ...tail] + if (words.length !== 8) { + return null + } + + return words.reduce((result, word) => (result << 16n) + BigInt(word), 0n) +} + +function ipv6InCidr(host: string, cidr: string) { + const [base, prefixRaw] = cidr.split("/") + const ip = parseIpv6(host) + const baseIp = parseIpv6(base ?? "") + const prefix = Number(prefixRaw) + if (ip === null || baseIp === null || !Number.isInteger(prefix) || prefix < 0 || prefix > 128) { + return false + } + const hostBits = 128 - prefix + const mask = prefix === 0 ? 0n : ((1n << 128n) - 1n) ^ ((1n << BigInt(hostBits)) - 1n) + return (ip & mask) === (baseIp & mask) +} + +function ipInCidr(host: string, cidr: string) { + return isIP(host) === 4 ? ipv4InCidr(host, cidr) : isIP(host) === 6 ? ipv6InCidr(host, cidr) : false +} + +function isPrivateIpv4(hostname: string) { + const ip = parseIpv4(hostname) + if (ip === null) { + return false + } + return ipv4InCidr(hostname, "10.0.0.0/8") + || ipv4InCidr(hostname, "172.16.0.0/12") + || ipv4InCidr(hostname, "192.168.0.0/16") + || ipv4InCidr(hostname, "127.0.0.0/8") + || ipv4InCidr(hostname, "169.254.0.0/16") + || ip === 0 +} + +function isUnsafeIpv6(hostname: string) { + const ip = parseIpv6(hostname) + if (ip === null) { + return false + } + return ip === 0n + || ip === 1n + || ipv6InCidr(hostname, "fc00::/7") + || ipv6InCidr(hostname, "fe80::/10") + || ipv6InCidr(hostname, "::ffff:0:0/96") +} + +function isUnsafeAddress(hostname: string) { + return isPrivateIpv4(hostname) || isUnsafeIpv6(hostname) +} + +function isLocalHostname(hostname: string) { + const normalized = hostname.toLowerCase() + return normalized === "localhost" || normalized.endsWith(".local") || normalized.endsWith(".localhost") +} + +export function validateStaticWorkerAttachUrl(value: string, policy: StaticWorkerAttachUrlPolicy) { + let parsed: URL + try { + parsed = new URL(value.trim()) + } catch { + return { ok: false as const, message: "Worker URL must be a valid URL." } + } + + if (parsed.protocol !== "http:" && parsed.protocol !== "https:") { + return { ok: false as const, message: "Worker URL must use http or https." } + } + if (parsed.username || parsed.password) { + return { ok: false as const, message: "Worker URL must not include credentials." } + } + if (parsed.search || parsed.hash) { + return { ok: false as const, message: "Worker URL must not include query parameters or fragments." } + } + + const hostname = parsed.hostname.toLowerCase() + const allowedHosts = new Set(policy.allowedHosts.map((host) => host.trim().toLowerCase()).filter(Boolean)) + const hostExplicitlyAllowed = allowedHosts.has(hostname) + const cidrAllowed = policy.allowedCidrs.some((cidr) => ipInCidr(hostname, cidr.trim())) + const privateOrLocal = isUnsafeAddress(hostname) || isLocalHostname(hostname) + + if (privateOrLocal && !policy.allowPrivate && !hostExplicitlyAllowed && !cidrAllowed) { + return { + ok: false as const, + message: "Private and LAN worker URLs require explicit on-prem attach policy or an allowed host/CIDR.", + } + } + + return { ok: true as const, url: normalizeWorkerRuntimeUrl(value), resolvedAddresses: [] as DnsLookupAddress[] } +} + +async function defaultDnsLookup(hostname: string) { + if (isIP(hostname)) { + return [{ address: hostname, family: isIP(hostname) as 4 | 6 }] + } + return dnsLookup(hostname, { all: true }) as Promise +} + +export async function validateResolvedStaticWorkerAttachUrl( + value: string, + policy: StaticWorkerAttachUrlPolicy, + lookup: StaticWorkerDnsLookup = defaultDnsLookup, +) { + const basic = validateStaticWorkerAttachUrl(value, policy) + if (!basic.ok) { + return basic + } + + const parsed = new URL(basic.url) + const hostname = parsed.hostname.toLowerCase() + const allowedHosts = new Set(policy.allowedHosts.map((host) => host.trim().toLowerCase()).filter(Boolean)) + const hostExplicitlyAllowed = allowedHosts.has(hostname) + + let addresses: DnsLookupAddress[] + try { + addresses = await lookup(hostname) + } catch { + return { ok: false as const, message: "Worker URL hostname could not be resolved." } + } + + if (addresses.length === 0) { + return { ok: false as const, message: "Worker URL hostname could not be resolved." } + } + + if (parsed.protocol === "https:" && !isIP(hostname) && !hostExplicitlyAllowed) { + return { + ok: false as const, + message: "HTTPS static worker attach hostnames must be explicitly allowed by on-prem attach policy.", + } + } + + for (const entry of addresses) { + const address = entry.address.toLowerCase() + const cidrAllowed = policy.allowedCidrs.some((cidr) => ipInCidr(address, cidr.trim())) + if (isUnsafeAddress(address) && !policy.allowPrivate && !cidrAllowed) { + return { + ok: false as const, + message: "Worker URL resolves to a private, loopback, link-local, or metadata address that is not explicitly allowed.", + } + } + } + + return { ...basic, resolvedAddresses: addresses } +} + export function parseUserId(value: string): UserId { return normalizeDenTypeId("user", value) } @@ -71,9 +363,8 @@ function isRecord(value: unknown): value is Record { return typeof value === "object" && value !== null } -function normalizeUrl(value: string): string { - return value.trim().replace(/\/+$/, "") -} +const normalizeUrl = normalizeWorkerRuntimeUrl +const STATIC_PROVISIONING_LOCK_NAME = "den_static_provisioner_assignment" function parseWorkspaceSelection(payload: unknown): { workspaceId: string; openworkUrl: string } | null { if (!isRecord(payload) || !Array.isArray(payload.items)) { @@ -284,6 +575,198 @@ export async function getLatestWorkerInstance(workerId: WorkerId) { return rows[0] ?? null } +async function getUnavailableStaticWorkerUrls() { + if (env.provisionerMode !== "static") { + return [] + } + + const rows = await db + .select({ url: WorkerInstanceTable.url }) + .from(WorkerInstanceTable) + .where( + and( + eq(WorkerInstanceTable.provider, "static"), + inArray(WorkerInstanceTable.status, ["provisioning", "healthy"]), + ), + ) + + return rows.map((row) => normalizeUrl(row.url)).filter(Boolean) +} + +function staticReservationStaleBefore() { + return new Date(Date.now() - env.staticWorkers.reservationTtlMs) +} + +async function markStaleStaticReservationsFailed(tx: StaticAssignmentDb) { + const staleRows = await tx + .select({ workerId: WorkerInstanceTable.worker_id }) + .from(WorkerInstanceTable) + .where( + and( + eq(WorkerInstanceTable.provider, "static"), + eq(WorkerInstanceTable.status, "provisioning"), + sql`${WorkerInstanceTable.updated_at} < ${staticReservationStaleBefore()}`, + ), + ) + const staleWorkerIds = [...new Set(staleRows.map((row) => row.workerId))] + if (staleWorkerIds.length > 0) { + await tx + .update(WorkerTable) + .set({ status: "failed" }) + .where(inArray(WorkerTable.id, staleWorkerIds)) + } + + await tx + .update(WorkerInstanceTable) + .set({ status: "failed" }) + .where( + and( + eq(WorkerInstanceTable.provider, "static"), + eq(WorkerInstanceTable.status, "provisioning"), + sql`${WorkerInstanceTable.updated_at} < ${staticReservationStaleBefore()}`, + ), + ) +} + +export function readMySqlLockAcquired(result: unknown) { + const rows = Array.isArray(result) && Array.isArray(result[0]) ? result[0] : result + if (!Array.isArray(rows)) { + return 0 + } + const first = rows[0] + if (first && typeof first === "object" && "acquired" in first) { + return Number((first as { acquired: unknown }).acquired) + } + return 0 +} + +function getMySqlLockPool(): MySqlLockPool { + if (dbClient && typeof dbClient === "object" && "getConnection" in dbClient && typeof dbClient.getConnection === "function") { + return dbClient as MySqlLockPool + } + throw new Error("Static worker assignment locking requires MySQL DB_MODE") +} + +export async function withStaticAssignmentLockUsing(input: { + pool: MySqlLockPool + transaction: (run: (tx: StaticAssignmentDb) => Promise) => Promise + run: (tx: StaticAssignmentDb) => Promise +}) { + const connection = await input.pool.getConnection() + let lockAcquired = false + + try { + const lockRows = await connection.query(`SELECT GET_LOCK(?, 10) AS acquired`, [STATIC_PROVISIONING_LOCK_NAME]) + const acquired = readMySqlLockAcquired(lockRows) + + if (acquired !== 1) { + throw new Error("Timed out waiting for static worker assignment lock") + } + + lockAcquired = true + return await input.transaction(input.run) + } finally { + if (lockAcquired) { + await connection.query(`SELECT RELEASE_LOCK(?)`, [STATIC_PROVISIONING_LOCK_NAME]) + } + connection.release() + } +} + +export async function withStaticAssignmentLock(run: (tx: StaticAssignmentDb) => Promise) { + return withStaticAssignmentLockUsing({ + pool: getMySqlLockPool(), + transaction: (callback) => db.transaction(callback), + run, + }) +} + +async function reserveStaticWorkerInstance(input: { + workerId: WorkerId +}) { + return withStaticAssignmentLock(async (tx) => { + await markStaleStaticReservationsFailed(tx) + + const rows = await tx + .select({ url: WorkerInstanceTable.url }) + .from(WorkerInstanceTable) + .where( + and( + eq(WorkerInstanceTable.provider, "static"), + inArray(WorkerInstanceTable.status, ["provisioning", "healthy"]), + ), + ) + + const url = normalizeStaticWorkerUrl(selectStaticWorkerUrlFromPool(input.workerId, { + ...env.staticWorkers, + unavailableUrls: rows.map((row) => row.url), + })) + const tokens = getStaticWorkerTokenPairForUrl(url, env.staticWorkers) + const instanceId = createDenTypeId("workerInstance") + + await tx.insert(WorkerInstanceTable).values({ + id: instanceId, + worker_id: input.workerId, + provider: "static", + region: "on-prem", + url, + status: "provisioning", + }) + + await tx + .update(WorkerTokenTable) + .set({ token: tokens.hostToken }) + .where(and(eq(WorkerTokenTable.worker_id, input.workerId), eq(WorkerTokenTable.scope, "host"))) + + await tx + .update(WorkerTokenTable) + .set({ token: tokens.clientToken }) + .where(and(eq(WorkerTokenTable.worker_id, input.workerId), eq(WorkerTokenTable.scope, "client"))) + + return { instanceId, url, tokens } + }) +} + +async function continueStaticCloudProvisioning(input: { + workerId: WorkerId + name: string + hostToken: string + clientToken: string + activityToken: string +}) { + const reservation = await reserveStaticWorkerInstance({ workerId: input.workerId }) + + try { + await checkStaticWorkerHealth(reservation.url, env.staticWorkers) + await verifyStaticWorkerRuntimeAccess(reservation.url, reservation.tokens, env.staticWorkers) + + await db.transaction(async (tx) => { + await tx + .update(WorkerTable) + .set({ status: "healthy" }) + .where(eq(WorkerTable.id, input.workerId)) + + await tx + .update(WorkerInstanceTable) + .set({ status: "healthy" }) + .where(eq(WorkerInstanceTable.id, reservation.instanceId)) + }) + } catch (error) { + await db.transaction(async (tx) => { + await tx + .update(WorkerTable) + .set({ status: "failed" }) + .where(eq(WorkerTable.id, input.workerId)) + + await tx + .update(WorkerInstanceTable) + .set({ status: "failed" }) + .where(eq(WorkerInstanceTable.id, reservation.instanceId)) + }) + throw error + } +} + export function toInstanceResponse(instance: WorkerInstanceRow | null) { if (!instance) { return null @@ -327,12 +810,18 @@ export async function continueCloudProvisioning(input: { activityToken: string }) { try { + if (env.provisionerMode === "static") { + await continueStaticCloudProvisioning(input) + return + } + const provisioned = await provisionWorker({ workerId: input.workerId, name: input.name, hostToken: input.hostToken, clientToken: input.clientToken, activityToken: input.activityToken, + unavailableStaticWorkerUrls: await getUnavailableStaticWorkerUrls(), }) await db diff --git a/ee/apps/den-api/src/workers/provisioner.ts b/ee/apps/den-api/src/workers/provisioner.ts index dcf952a34e..132a0639c0 100644 --- a/ee/apps/den-api/src/workers/provisioner.ts +++ b/ee/apps/den-api/src/workers/provisioner.ts @@ -17,6 +17,7 @@ export type ProvisionInput = { hostToken: string clientToken: string activityToken: string + unavailableStaticWorkerUrls?: string[] } export type ProvisionedInstance = { @@ -26,6 +27,26 @@ export type ProvisionedInstance = { region?: string } +export type StaticWorkerTokenPair = { + clientToken: string + hostToken: string +} + +export type StaticWorkerConfig = { + urls: string[] + healthPath: string + healthcheckTimeoutMs: number + healthcheckIntervalMs: number + reservationTtlMs?: number + unavailableUrls?: string[] + tokenMap?: Record +} + +type StaticWorkerReservation = { + workerId: string + expiresAt: number +} + type RenderService = { id: string name?: string @@ -54,6 +75,7 @@ const terminalDeployStates = new Set([ ]) const sleep = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms)) +const staticWorkerReservations = new Map() const slug = (value: string) => value @@ -135,25 +157,219 @@ async function waitForDeployLive(serviceId: string) { async function waitForHealth( url: string, timeoutMs = env.render.healthcheckTimeoutMs, + intervalMs = env.render.pollIntervalMs, + healthPath = "/health", ) { - const healthUrl = `${url.replace(/\/$/, "")}/health` + const normalizedPath = healthPath.startsWith("/") ? healthPath : `/${healthPath}` + const healthUrl = `${url.replace(/\/$/, "")}${normalizedPath}` const startedAt = Date.now() while (Date.now() - startedAt < timeoutMs) { + const remainingMs = timeoutMs - (Date.now() - startedAt) + const controller = new AbortController() + const timeout = setTimeout(() => controller.abort(), remainingMs) + try { - const response = await fetch(healthUrl, { method: "GET" }) + const response = await fetch(healthUrl, { + method: "GET", + signal: controller.signal, + }) if (response.ok) { return } } catch { // ignore transient network failures while the instance boots + } finally { + clearTimeout(timeout) + } + + const elapsedMs = Date.now() - startedAt + const sleepMs = Math.min(intervalMs, Math.max(timeoutMs - elapsedMs, 0)) + if (sleepMs > 0) { + await sleep(sleepMs) } - await sleep(env.render.pollIntervalMs) } throw new Error(`Timed out waiting for worker health endpoint ${healthUrl}`) } +export function normalizeStaticWorkerUrl(value: string) { + const parsedUrl = new URL(value.trim()) + parsedUrl.hash = "" + parsedUrl.search = "" + parsedUrl.pathname = parsedUrl.pathname.replace(/\/+$/, "") + return parsedUrl.toString().replace(/\/+$/, "") +} + +function safeNormalizeWorkerUrl(value: string) { + try { + return normalizeStaticWorkerUrl(value) + } catch { + return "" + } +} + +export function getStaticWorkerTokenPairForUrl(url: string, config: StaticWorkerConfig) { + const normalizedUrl = normalizeStaticWorkerUrl(url) + const pair = config.tokenMap?.[normalizedUrl] + if (!pair?.clientToken?.trim() || !pair.hostToken?.trim()) { + throw new Error(`STATIC_WORKER_TOKEN_MAP_JSON is missing token pair for ${normalizedUrl}`) + } + return { + clientToken: pair.clientToken.trim(), + hostToken: pair.hostToken.trim(), + } +} + +async function fetchStaticWorkerRuntime(url: string, path: string, headers: Record, timeoutMs: number) { + return fetch(`${url.replace(/\/$/, "")}${path}`, { + method: "GET", + redirect: "manual", + headers: { Accept: "application/json", ...headers }, + signal: AbortSignal.timeout(timeoutMs), + }) +} + +export async function verifyStaticWorkerRuntimeAccess( + url: string, + tokens: StaticWorkerTokenPair, + config: Pick, +) { + const clientResponse = await fetchStaticWorkerRuntime(url, "/workspaces", { + Authorization: `Bearer ${tokens.clientToken}`, + }, config.healthcheckTimeoutMs) + if (!clientResponse.ok) { + throw new Error(`Worker rejected configured static client token with HTTP ${clientResponse.status}`) + } + + const hostResponse = await fetchStaticWorkerRuntime(url, "/env/keys", { + "X-OpenWork-Host-Token": tokens.hostToken, + }, config.healthcheckTimeoutMs) + if (!hostResponse.ok) { + throw new Error(`Worker rejected configured static host token with HTTP ${hostResponse.status}`) + } +} + +function pruneExpiredStaticWorkerReservations(now = Date.now()) { + for (const [url, reservation] of staticWorkerReservations) { + if (reservation.expiresAt <= now) { + staticWorkerReservations.delete(url) + } + } +} + +function selectStaticWorkerUrl(workerId: string, config: StaticWorkerConfig) { + pruneExpiredStaticWorkerReservations() + + return selectStaticWorkerUrlFromPool(workerId, config, true) +} + +export function selectStaticWorkerUrlFromPool( + workerId: string, + config: StaticWorkerConfig, + includeInProcessReservations = false, +) { + if (includeInProcessReservations) { + pruneExpiredStaticWorkerReservations() + } + + const urls = config.urls.map(safeNormalizeWorkerUrl).filter(Boolean) + if (urls.length === 0) { + throw new Error("STATIC_WORKER_URLS is required when PROVISIONER_MODE=static") + } + + const unavailableUrls = new Set( + (config.unavailableUrls ?? []).map(safeNormalizeWorkerUrl).filter(Boolean), + ) + if (includeInProcessReservations) { + for (const [url, reservation] of staticWorkerReservations) { + if (reservation.workerId !== workerId) { + unavailableUrls.add(url) + } + } + } + + const availableUrls = urls.filter((url) => !unavailableUrls.has(url)) + + if (availableUrls.length === 0) { + throw new Error( + "No available static worker URL remains; all configured STATIC_WORKER_URLS are already assigned to active workers", + ) + } + + let hash = 0 + for (const char of workerId) { + hash = (hash * 31 + char.charCodeAt(0)) >>> 0 + } + + return availableUrls[hash % availableUrls.length]! +} + +function reserveStaticWorkerUrl(workerId: string, url: string, ttlMs: number) { + if (ttlMs <= 0) { + return + } + staticWorkerReservations.set(url, { + workerId, + expiresAt: Date.now() + ttlMs, + }) +} + +export function releaseStaticWorkerUrl(workerId: string, url: string) { + const reservation = staticWorkerReservations.get(url) + if (reservation?.workerId === workerId) { + staticWorkerReservations.delete(url) + } +} + +export async function provisionStaticWorker( + input: ProvisionInput, + config: StaticWorkerConfig = env.staticWorkers, +): Promise { + const reservationTtlMs = config.reservationTtlMs ?? 300000 + const url = selectStaticWorkerUrl(input.workerId, { + ...config, + unavailableUrls: config.unavailableUrls ?? input.unavailableStaticWorkerUrls, + }) + reserveStaticWorkerUrl(input.workerId, url, reservationTtlMs) + + try { + await waitForHealth( + url, + config.healthcheckTimeoutMs, + config.healthcheckIntervalMs, + config.healthPath, + ) + const tokens = config.tokenMap?.[url] + if (tokens) { + await verifyStaticWorkerRuntimeAccess(url, tokens, config) + } + } catch (error) { + releaseStaticWorkerUrl(input.workerId, url) + throw error + } + + if (reservationTtlMs <= 0) { + releaseStaticWorkerUrl(input.workerId, url) + } + + return { + provider: "static", + url, + status: "healthy", + region: "on-prem", + } +} + +export async function checkStaticWorkerHealth(url: string, config: StaticWorkerConfig) { + await waitForHealth( + url, + config.healthcheckTimeoutMs, + config.healthcheckIntervalMs, + config.healthPath, + ) +} + async function listRenderServices(limit = 200) { const rows: RenderService[] = [] let cursor: string | undefined @@ -343,6 +559,10 @@ export async function provisionWorker( return provisionWorkerOnDaytona(input) } + if (env.provisionerMode === "static") { + return provisionStaticWorker(input) + } + const template = env.workerUrlTemplate ?? "https://workers.local/{workerId}" const url = template.replace("{workerId}", input.workerId) return { @@ -356,6 +576,13 @@ export async function deprovisionWorker(input: { workerId: WorkerId instanceUrl: string | null }) { + if (env.provisionerMode === "static") { + if (input.instanceUrl) { + releaseStaticWorkerUrl(input.workerId, safeNormalizeWorkerUrl(input.instanceUrl)) + } + return + } + if (env.provisionerMode === "daytona") { await deprovisionWorkerOnDaytona(input.workerId) return diff --git a/ee/apps/den-api/test/provisioner-static.test.ts b/ee/apps/den-api/test/provisioner-static.test.ts new file mode 100644 index 0000000000..6370e1e1e8 --- /dev/null +++ b/ee/apps/den-api/test/provisioner-static.test.ts @@ -0,0 +1,828 @@ +import { afterAll, beforeAll, expect, test } from "bun:test" +import { Hono } from "hono" +import { jsonValidator } from "../src/middleware/validation.js" +import { WorkerInstanceTable, WorkerTable, WorkerTokenTable } from "@openwork-ee/den-db/schema" +import { createDenTypeId } from "@openwork-ee/utils/typeid" +import type { ProvisionedInstance, StaticWorkerConfig } from "../src/workers/provisioner.js" + +function seedRequiredEnv() { + process.env.DATABASE_URL = process.env.DATABASE_URL ?? "mysql://root:password@127.0.0.1:3306/openwork_test" + process.env.DEN_DB_ENCRYPTION_KEY = process.env.DEN_DB_ENCRYPTION_KEY ?? "x".repeat(32) + process.env.BETTER_AUTH_SECRET = process.env.BETTER_AUTH_SECRET ?? "y".repeat(32) + process.env.BETTER_AUTH_URL = process.env.BETTER_AUTH_URL ?? "http://127.0.0.1:8790" + process.env.CORS_ORIGINS = process.env.CORS_ORIGINS ?? "http://127.0.0.1:8790" + process.env.PROVISIONER_MODE = "static" + process.env.STATIC_WORKER_URLS = process.env.STATIC_WORKER_URLS ?? "http://127.0.0.1:8787" + process.env.STATIC_WORKER_TOKEN_MAP_JSON = process.env.STATIC_WORKER_TOKEN_MAP_JSON ?? '{"http://127.0.0.1:8787":{"clientToken":"static-client-token","hostToken":"static-host-token"}}' + process.env.STATIC_WORKER_ATTACH_ALLOW_PRIVATE = "true" +} + +let provisionerModule: typeof import("../src/workers/provisioner.js") +let envModule: typeof import("../src/env.js") +let workersSharedModule: typeof import("../src/routes/workers/shared.js") +let workersCoreModule: typeof import("../src/routes/workers/core.js") +let server: ReturnType +let staticWorkerUrl: string + +function staticWorkerConfig(overrides: Partial = {}): StaticWorkerConfig { + return { + urls: [staticWorkerUrl], + healthPath: "/health", + healthcheckTimeoutMs: 1000, + healthcheckIntervalMs: 10, + reservationTtlMs: 0, + ...overrides, + } +} + +function createFakeStaticAttachStore() { + const instances: Array> = [] + const workers: Array> = [] + const tokens: Array> = [] + const selectedUrl = { value: "" } + + const data = { + select() { + return { + from(table: unknown) { + return { + where() { + return { + async limit() { + if (table !== WorkerInstanceTable) { + return [] + } + return instances + .filter((entry) => entry.provider === "static" + && entry.url === selectedUrl.value + && (entry.status === "provisioning" || entry.status === "healthy")) + .map((entry) => ({ id: entry.id })) + }, + } + }, + } + }, + } + }, + insert(table: unknown) { + return { + async values(value: unknown) { + const values = Array.isArray(value) ? value : [value] + if (table === WorkerInstanceTable) { + instances.push(...values as Record[]) + selectedUrl.value = String((values[0] as Record).url ?? "") + } else if (table === WorkerTokenTable) { + tokens.push(...values as Record[]) + } else if (table === WorkerTable) { + workers.push(...values as Record[]) + } + }, + } + }, + } + + return { data, instances, workers, tokens, selectedUrl } +} + +function createStaticAttachRouteApp(input: { + role?: string + isOwner?: boolean + store?: ReturnType + fetchReachable?: typeof workersCoreModule.assertStaticWorkerReachable + lookup?: Parameters[2] +}) { + const app = new Hono() + const store = input.store ?? createFakeStaticAttachStore() + const userId = createDenTypeId("user") + const orgId = createDenTypeId("organization") + const memberId = createDenTypeId("member") + + workersCoreModule.registerStaticWorkerAttachRoute(app as never, { + data: store.data as never, + lookup: input.lookup ?? (async () => [{ address: "203.0.113.10", family: 4 }]), + fetchReachable: input.fetchReachable ?? (async () => undefined), + getWorkerLimit: async () => ({ exceeded: false, limit: 10, currentCount: 0 }), + lock: async (run) => run(store.data as never), + middlewares: [ + async (c, next) => { + c.set("user", { id: userId, email: "admin@example.com", name: "Admin" }) + c.set("activeOrganizationId", orgId) + c.set("organizationContext", { + organization: { id: orgId }, + currentMember: { + id: memberId, + userId, + role: input.role ?? "admin", + createdAt: new Date(), + isOwner: input.isOwner ?? false, + }, + }) + await next() + }, + jsonValidator(workersSharedModule.attachStaticWorkerSchema), + ] as never, + }) + return { app, store } +} + +async function postStaticAttach(app: Hono, overrides: Record = {}) { + return app.request("http://den.local/v1/workers/static-attach", { + method: "POST", + headers: { "content-type": "application/json" }, + body: JSON.stringify({ + name: "Static Attach Route Worker", + url: "http://worker.example.com", + clientToken: "valid-client-token", + hostToken: "valid-host-token", + ...overrides, + }), + }) +} + +beforeAll(async () => { + seedRequiredEnv() + server = Bun.serve({ + port: 0, + fetch(request) { + const url = new URL(request.url) + const authorization = request.headers.get("authorization") + const hostToken = request.headers.get("x-openwork-host-token") + if (url.pathname === "/health") { + return Response.json({ ok: true }) + } + if (url.pathname === "/workspaces") { + return authorization === "Bearer valid-client-token" + ? Response.json({ items: [], activeId: null }) + : Response.json({ error: "unauthorized" }, { status: 401 }) + } + if (url.pathname === "/env/keys") { + return hostToken === "valid-host-token" + ? Response.json({ keys: [] }) + : Response.json({ error: "forbidden" }, { status: 403 }) + } + if (url.pathname === "/redirect-workspaces") { + return new Response(null, { status: 302, headers: { location: "/workspaces" } }) + } + if (url.pathname === "/hang-health") { + return new Promise(() => {}) + } + return new Response("not found", { status: 404 }) + }, + }) + staticWorkerUrl = `http://127.0.0.1:${server.port}` + process.env.STATIC_WORKER_URLS = staticWorkerUrl + process.env.STATIC_WORKER_TOKEN_MAP_JSON = JSON.stringify({ + [staticWorkerUrl]: { clientToken: "valid-client-token", hostToken: "valid-host-token" }, + }) + envModule = await import("../src/env.js") + provisionerModule = await import("../src/workers/provisioner.js") + workersSharedModule = await import("../src/routes/workers/shared.js") + workersCoreModule = await import("../src/routes/workers/core.js") +}) + +afterAll(() => { + server.stop(true) +}) + +test("static provisioner assigns a configured healthy worker URL", async () => { + const provisioned = await provisionerModule.provisionStaticWorker( + { + workerId: "worker_static_health_123", + name: "Static Health", + hostToken: "host-token", + clientToken: "client-token", + activityToken: "activity-token", + }, + staticWorkerConfig(), + ) + + expect(provisioned).toEqual({ + provider: "static", + region: "on-prem", + status: "healthy", + url: staticWorkerUrl, + }) +}) + +test("static provisioner verifies configured token-map tokens against worker runtime", async () => { + const provisioned = await provisionerModule.provisionStaticWorker( + { + workerId: "worker_static_token_map_auth_123", + name: "Static Token Map Auth", + hostToken: "generated-host-token-not-used", + clientToken: "generated-client-token-not-used", + activityToken: "activity-token", + }, + staticWorkerConfig({ + tokenMap: { + [staticWorkerUrl]: { clientToken: "valid-client-token", hostToken: "valid-host-token" }, + }, + }), + ) + + expect(provisioned.url).toBe(staticWorkerUrl) + expect(provisioned.status).toBe("healthy") + + await expect(provisionerModule.provisionStaticWorker( + { + workerId: "worker_static_token_map_reject_123", + name: "Static Token Map Reject", + hostToken: "generated-host-token-not-used", + clientToken: "generated-client-token-not-used", + activityToken: "activity-token", + }, + staticWorkerConfig({ + tokenMap: { + [staticWorkerUrl]: { clientToken: "invalid-client-token", hostToken: "valid-host-token" }, + }, + }), + )).rejects.toThrow("Worker rejected configured static client token") +}) + +test("static provisioner skips URLs already assigned to active workers", async () => { + const provisioned = await provisionerModule.provisionStaticWorker( + { + workerId: "worker_static_available_url_123", + name: "Static Available URL", + hostToken: "host-token", + clientToken: "client-token", + activityToken: "activity-token", + unavailableStaticWorkerUrls: ["http://127.0.0.1:1/"], + }, + staticWorkerConfig({ + urls: ["http://127.0.0.1:1/", staticWorkerUrl], + }), + ) + + expect(provisioned.url).toBe(staticWorkerUrl) + expect(provisioned.status).toBe("healthy") +}) + +test("static provisioner fails clearly when every configured URL is already active", async () => { + await expect(provisionerModule.provisionStaticWorker( + { + workerId: "worker_static_exhausted_123", + name: "Static Exhausted", + hostToken: "host-token", + clientToken: "client-token", + activityToken: "activity-token", + unavailableStaticWorkerUrls: [staticWorkerUrl], + }, + staticWorkerConfig({ + urls: [staticWorkerUrl], + }), + )).rejects.toThrow("No available static worker URL remains") +}) + +test("static selector exhausts against DB-recomputed active normalized URLs", () => { + expect(() => provisionerModule.selectStaticWorkerUrlFromPool( + "worker_static_db_active_exhausted_123", + staticWorkerConfig({ + urls: [staticWorkerUrl], + unavailableUrls: [`${staticWorkerUrl}/`], + }), + )).toThrow("No available static worker URL remains") +}) + +test("static selector allows failed or stopped URLs when DB active set excludes them", () => { + const selected = provisionerModule.selectStaticWorkerUrlFromPool( + "worker_static_db_failed_reuse_123", + staticWorkerConfig({ + urls: [staticWorkerUrl], + unavailableUrls: [], + }), + ) + + expect(selected).toBe(staticWorkerUrl) +}) + +test("static DB assignment lock releases only after reservation transaction completes", async () => { + const events: string[] = [] + const result = await workersSharedModule.withStaticAssignmentLockUsing({ + pool: { + async getConnection() { + return { + async query(statement: string) { + if (statement.includes("GET_LOCK")) { + events.push("lock:acquired") + return [[{ acquired: 1 }], []] + } + if (statement.includes("RELEASE_LOCK")) { + events.push("lock:released") + return [[{}], []] + } + throw new Error(`unexpected query: ${statement}`) + }, + release() { + events.push("connection:released") + }, + } + }, + }, + async transaction(run) { + events.push("transaction:started") + const value = await run({} as never) + events.push("transaction:committed") + return value + }, + async run() { + events.push("reservation:inserted") + return "reserved" + }, + }) + + expect(result).toBe("reserved") + expect(events).toEqual([ + "lock:acquired", + "transaction:started", + "reservation:inserted", + "transaction:committed", + "lock:released", + "connection:released", + ]) +}) + +test("static attach permission gate allows owners and admins only", () => { + expect(workersSharedModule.canAttachStaticWorkerForMember({ currentMember: { isOwner: true, role: "member" } })).toBe(true) + expect(workersSharedModule.canAttachStaticWorkerForMember({ currentMember: { isOwner: false, role: "admin" } })).toBe(true) + expect(workersSharedModule.canAttachStaticWorkerForMember({ currentMember: { isOwner: false, role: "member" } })).toBe(false) +}) + +test("static attach route requires authentication", async () => { + const app = new Hono() + workersCoreModule.registerWorkerCoreRoutes(app) + + const response = await app.request("http://den.local/v1/workers/static-attach", { + method: "POST", + headers: { "content-type": "application/json" }, + body: JSON.stringify({ name: "Static", url: staticWorkerUrl, clientToken: "valid-client-token", hostToken: "valid-host-token" }), + }) + + expect(response.status).toBe(401) + await expect(response.json()).resolves.toEqual({ error: "unauthorized" }) +}) + +test("static attach route succeeds for organization admin without token echo", async () => { + const { app, store } = createStaticAttachRouteApp({ role: "admin" }) + const response = await postStaticAttach(app) + const payload = await response.json() as Record + + expect(response.status).toBe(201) + expect(payload.worker).toBeTruthy() + expect(payload.instance).toBeTruthy() + expect(JSON.stringify(payload).includes("valid-client-token")).toBe(false) + expect(JSON.stringify(payload).includes("valid-host-token")).toBe(false) + expect(store.tokens.map((entry) => entry.scope).sort()).toEqual(["activity", "client", "host"]) +}) + +test("static attach route rejects ordinary organization members", async () => { + const { app } = createStaticAttachRouteApp({ role: "member", isOwner: false }) + const response = await postStaticAttach(app) + + expect(response.status).toBe(403) + await expect(response.json()).resolves.toMatchObject({ error: "forbidden" }) +}) + +test("static attach route rejects duplicate URLs before verification", async () => { + const store = createFakeStaticAttachStore() + store.instances.push({ id: "existing", provider: "static", url: "http://worker.example.com", status: "healthy" }) + store.selectedUrl.value = "http://worker.example.com" + const { app } = createStaticAttachRouteApp({ store }) + const response = await postStaticAttach(app) + + expect(response.status).toBe(409) + await expect(response.json()).resolves.toMatchObject({ error: "worker_url_already_attached" }) +}) + +test("static attach route re-checks duplicate URL inside lock before insert", async () => { + const store = createFakeStaticAttachStore() + const app = new Hono() + workersCoreModule.registerStaticWorkerAttachRoute(app as never, { + data: store.data as never, + lookup: async () => [{ address: "203.0.113.10", family: 4 }], + fetchReachable: async () => undefined, + getWorkerLimit: async () => ({ exceeded: false, limit: 10, currentCount: 0 }), + lock: async (run) => { + store.instances.push({ id: "raced", provider: "static", url: "http://worker.example.com", status: "healthy" }) + store.selectedUrl.value = "http://worker.example.com" + return run(store.data as never) + }, + middlewares: [ + async (c, next) => { + const userId = createDenTypeId("user") + const orgId = createDenTypeId("organization") + c.set("user", { id: userId, email: "admin@example.com" }) + c.set("activeOrganizationId", orgId) + c.set("organizationContext", { currentMember: { id: createDenTypeId("member"), userId, role: "admin", createdAt: new Date(), isOwner: false } }) + await next() + }, + jsonValidator(workersSharedModule.attachStaticWorkerSchema), + ] as never, + }) + + const response = await postStaticAttach(app) + expect(response.status).toBe(409) + expect(store.workers).toHaveLength(0) +}) + +test("static attach route checks worker quota inside lock before insert", async () => { + const store = createFakeStaticAttachStore() + let lockActive = false + let quotaCheckedInsideLock = false + const app = new Hono() + workersCoreModule.registerStaticWorkerAttachRoute(app as never, { + data: store.data as never, + lookup: async () => [{ address: "203.0.113.10", family: 4 }], + fetchReachable: async () => undefined, + getWorkerLimit: async () => { + quotaCheckedInsideLock = lockActive + return { exceeded: true, limit: 1, currentCount: 1 } + }, + lock: async (run) => { + lockActive = true + try { + return await run(store.data as never) + } finally { + lockActive = false + } + }, + middlewares: [ + async (c, next) => { + const userId = createDenTypeId("user") + const orgId = createDenTypeId("organization") + c.set("user", { id: userId, email: "admin@example.com" }) + c.set("activeOrganizationId", orgId) + c.set("organizationContext", { currentMember: { id: createDenTypeId("member"), userId, role: "admin", createdAt: new Date(), isOwner: false } }) + await next() + }, + jsonValidator(workersSharedModule.attachStaticWorkerSchema), + ] as never, + }) + + const response = await postStaticAttach(app) + expect(response.status).toBe(409) + await expect(response.json()).resolves.toMatchObject({ error: "org_limit_reached" }) + expect(quotaCheckedInsideLock).toBe(true) + expect(store.workers).toHaveLength(0) +}) + +test("static attach route rejects invalid URL", async () => { + const { app } = createStaticAttachRouteApp({}) + const response = await postStaticAttach(app, { url: "ftp://worker.example.com" }) + + expect(response.status).toBe(400) + await expect(response.json()).resolves.toMatchObject({ error: "invalid_request" }) +}) + +test("static attach route rejects invalid client and host tokens", async () => { + const clientFailure = createStaticAttachRouteApp({ + fetchReachable: async () => { throw new Error("arbitrary upstream text valid-client-token") }, + }) + const clientResponse = await postStaticAttach(clientFailure.app) + const clientPayload = await clientResponse.json() as Record + expect(clientResponse.status).toBe(400) + expect(clientPayload.message).toBe("Static worker verification failed with the provided URL and tokens.") + expect(JSON.stringify(clientPayload).includes("valid-client-token")).toBe(false) + + const hostFailure = createStaticAttachRouteApp({ + fetchReachable: async () => { throw new Error("arbitrary upstream text valid-host-token") }, + }) + const hostResponse = await postStaticAttach(hostFailure.app) + const hostPayload = await hostResponse.json() as Record + expect(hostResponse.status).toBe(400) + expect(hostPayload.message).toBe("Static worker verification failed with the provided URL and tokens.") + expect(JSON.stringify(hostPayload).includes("valid-host-token")).toBe(false) +}) + +test("static attach URL policy rejects unsafe URLs and allows explicit on-prem hosts", () => { + const defaultPolicy = { allowPrivate: false, allowedHosts: [], allowedCidrs: [] } + + expect(workersSharedModule.validateStaticWorkerAttachUrl("ftp://worker.example.com", defaultPolicy)).toMatchObject({ ok: false }) + expect(workersSharedModule.validateStaticWorkerAttachUrl("http://user:pass@worker.example.com", defaultPolicy)).toMatchObject({ ok: false }) + expect(workersSharedModule.validateStaticWorkerAttachUrl("http://worker.example.com/?token=abc", defaultPolicy)).toMatchObject({ ok: false }) + expect(workersSharedModule.validateStaticWorkerAttachUrl("http://127.0.0.1:8787", defaultPolicy)).toMatchObject({ ok: false }) + expect(workersSharedModule.validateStaticWorkerAttachUrl("http://127.0.0.1:8787", { ...defaultPolicy, allowedCidrs: ["127.0.0.0/8"] })).toMatchObject({ + ok: true, + url: "http://127.0.0.1:8787", + }) + expect(workersSharedModule.validateStaticWorkerAttachUrl("http://lan-worker.local:8787", { ...defaultPolicy, allowedHosts: ["lan-worker.local"] })).toMatchObject({ ok: true }) +}) + +test("static attach URL policy blocks DNS names resolving to unsafe IPv4 and IPv6 addresses", async () => { + await expect(workersSharedModule.validateResolvedStaticWorkerAttachUrl( + "http://public-name.example.com", + { allowPrivate: false, allowedHosts: [], allowedCidrs: [] }, + async () => [{ address: "127.0.0.1", family: 4 }], + )).resolves.toMatchObject({ ok: false }) + + await expect(workersSharedModule.validateResolvedStaticWorkerAttachUrl( + "http://public-name.example.com", + { allowPrivate: false, allowedHosts: [], allowedCidrs: [] }, + async () => [{ address: "fe80::1", family: 6 }], + )).resolves.toMatchObject({ ok: false }) + + await expect(workersSharedModule.validateResolvedStaticWorkerAttachUrl( + "http://public-name.example.com", + { allowPrivate: false, allowedHosts: [], allowedCidrs: ["127.0.0.0/8"] }, + async () => [{ address: "127.0.0.1", family: 4 }], + )).resolves.toMatchObject({ ok: true }) + + await expect(workersSharedModule.validateResolvedStaticWorkerAttachUrl( + "http://allowed-host.example.com", + { allowPrivate: false, allowedHosts: ["allowed-host.example.com"], allowedCidrs: [] }, + async () => [{ address: "::1", family: 6 }], + )).resolves.toMatchObject({ ok: false }) +}) + +test("static attach URL policy allows explicitly allow-listed HTTPS hostnames", async () => { + await expect(workersSharedModule.validateResolvedStaticWorkerAttachUrl( + "https://worker.example.com", + { allowPrivate: false, allowedHosts: ["worker.example.com"], allowedCidrs: [] }, + async () => [{ address: "203.0.113.10", family: 4 }], + )).resolves.toMatchObject({ ok: true }) +}) + +test("static attach URL policy rejects non-allow-listed HTTPS hostnames", async () => { + await expect(workersSharedModule.validateResolvedStaticWorkerAttachUrl( + "https://worker.example.com", + { allowPrivate: false, allowedHosts: [], allowedCidrs: [] }, + async () => [{ address: "203.0.113.10", family: 4 }], + )).resolves.toMatchObject({ ok: false }) +}) + +test("static attach URL policy still rejects allow-listed HTTPS hostnames resolving to unsafe addresses", async () => { + await expect(workersSharedModule.validateResolvedStaticWorkerAttachUrl( + "https://worker.example.com", + { allowPrivate: false, allowedHosts: ["worker.example.com"], allowedCidrs: [] }, + async () => [{ address: "127.0.0.1", family: 4 }], + )).resolves.toMatchObject({ ok: false }) +}) + +test("static attach verification fetch uses the validated IP address instead of re-resolving hostname", async () => { + const seenHosts: string[] = [] + const pinServer = Bun.serve({ + port: 0, + fetch(request) { + seenHosts.push(request.headers.get("host") ?? "") + const url = new URL(request.url) + if (url.pathname === "/workspaces") { + return Response.json({ items: [] }) + } + if (url.pathname === "/env/keys") { + return Response.json({ keys: [] }) + } + return new Response("not found", { status: 404 }) + }, + }) + try { + const target = await workersSharedModule.validateResolvedStaticWorkerAttachUrl( + `http://rebinding-worker.test:${pinServer.port}`, + { allowPrivate: false, allowedHosts: [], allowedCidrs: ["127.0.0.0/8"] }, + async () => [{ address: "127.0.0.1", family: 4 }], + ) + expect(target.ok).toBe(true) + if (target.ok) { + await workersCoreModule.assertStaticWorkerReachable(target, "valid-client-token", "valid-host-token") + } + expect(seenHosts).toEqual([`rebinding-worker.test:${pinServer.port}`, `rebinding-worker.test:${pinServer.port}`]) + } finally { + pinServer.stop(true) + } +}) + +test("static attach verification keeps HTTPS hostname for certificate validation", async () => { + const originalFetch = globalThis.fetch + const requestedUrls: string[] = [] + globalThis.fetch = ((input: RequestInfo | URL) => { + requestedUrls.push(String(input)) + return Promise.resolve(Response.json({ ok: true })) + }) as typeof fetch + try { + await workersCoreModule.assertStaticWorkerReachable({ + ok: true, + url: "https://worker.example.com:8787", + resolvedAddresses: [{ address: "203.0.113.10", family: 4 }], + }, "valid-client-token", "valid-host-token") + expect(requestedUrls).toEqual([ + "https://worker.example.com:8787/workspaces", + "https://worker.example.com:8787/env/keys", + ]) + } finally { + globalThis.fetch = originalFetch + } +}) + +test("static attach worker token verification succeeds without following redirects", async () => { + await expect(workersCoreModule.assertStaticWorkerReachable(staticWorkerUrl, "valid-client-token", "valid-host-token")).resolves.toBeUndefined() + + const redirectResponse = await workersCoreModule.fetchStaticWorker(staticWorkerUrl, "/redirect-workspaces", {}) + expect(redirectResponse.status).toBe(302) +}) + +test("static attach worker token verification rejects invalid client and host tokens without echoing tokens", async () => { + await expect(workersCoreModule.assertStaticWorkerReachable(staticWorkerUrl, "invalid-client-token", "valid-host-token")) + .rejects.toThrow("Worker rejected the provided client token with HTTP 401") + await expect(workersCoreModule.assertStaticWorkerReachable(staticWorkerUrl, "valid-client-token", "invalid-host-token")) + .rejects.toThrow("Worker rejected the provided host token with HTTP 403") + + try { + await workersCoreModule.assertStaticWorkerReachable(staticWorkerUrl, "invalid-client-token", "valid-host-token") + } catch (error) { + const message = error instanceof Error ? error.message : String(error) + expect(message.includes("invalid-client-token")).toBe(false) + expect(message.includes("valid-host-token")).toBe(false) + } +}) + +test("static provisioner fails clearly when no worker URLs are configured", async () => { + await expect(provisionerModule.provisionStaticWorker( + { + workerId: "worker_static_missing_url_123", + name: "Static Missing URL", + hostToken: "host-token", + clientToken: "client-token", + activityToken: "activity-token", + }, + staticWorkerConfig({ + urls: [], + }), + )).rejects.toThrow("STATIC_WORKER_URLS is required when PROVISIONER_MODE=static") +}) + +test("static provisioner fails clearly when health check does not pass", async () => { + await expect(provisionerModule.provisionStaticWorker( + { + workerId: "worker_static_unhealthy_123", + name: "Static Unhealthy", + hostToken: "host-token", + clientToken: "client-token", + activityToken: "activity-token", + }, + staticWorkerConfig({ + urls: [`${staticWorkerUrl}/missing`], + healthcheckTimeoutMs: 50, + }), + )).rejects.toThrow("Timed out waiting for worker health endpoint") +}) + +test("static provisioner aborts a hanging health check within the configured timeout", async () => { + const startedAt = performance.now() + + await expect(provisionerModule.provisionStaticWorker( + { + workerId: "worker_static_hanging_health_123", + name: "Static Hanging Health", + hostToken: "host-token", + clientToken: "client-token", + activityToken: "activity-token", + }, + staticWorkerConfig({ + healthPath: "/hang-health", + healthcheckTimeoutMs: 75, + }), + )).rejects.toThrow("Timed out waiting for worker health endpoint") + + expect(performance.now() - startedAt).toBeLessThan(1000) +}) + +test("static env validation requires config only when static mode is enabled", () => { + expect(envModule.parseStaticWorkersEnv({ STATIC_WORKER_URLS: undefined }).issues).toContainEqual({ + path: "STATIC_WORKER_URLS", + message: "STATIC_WORKER_URLS is required when PROVISIONER_MODE=static", + }) + expect(envModule.parseStaticWorkersEnv({ STATIC_WORKER_URLS: undefined }).issues).toContainEqual({ + path: "STATIC_WORKER_TOKEN_MAP_JSON", + message: "STATIC_WORKER_TOKEN_MAP_JSON is required when PROVISIONER_MODE=static", + }) +}) + +test("static env validation normalizes URLs and rejects duplicate normalized URLs", () => { + const parsed = envModule.parseStaticWorkersEnv({ + STATIC_WORKER_URLS: "https://Worker.Example.com/, https://worker.example.com", + STATIC_WORKER_TOKEN_MAP_JSON: JSON.stringify({ + "https://worker.example.com": { clientToken: "client-token", hostToken: "host-token" }, + }), + }) + + expect(parsed.urls).toEqual(["https://worker.example.com"]) + expect(parsed.issues.some((issue) => issue.message.includes("duplicate URL https://worker.example.com"))).toBe(true) +}) + +test("static env validation rejects invalid URL, protocol, health path, and timeout values", () => { + const parsed = envModule.parseStaticWorkersEnv({ + STATIC_WORKER_URLS: "not-a-url, ftp://worker.example.com", + STATIC_WORKER_HEALTH_PATH: "health?ready=1", + STATIC_WORKER_HEALTHCHECK_TIMEOUT_MS: "0", + STATIC_WORKER_HEALTHCHECK_INTERVAL_MS: "NaN", + STATIC_WORKER_RESERVATION_TTL_MS: "-1", + STATIC_WORKER_TOKEN_MAP_JSON: "not-json", + }) + + expect(parsed.issues.some((issue) => issue.message === "STATIC_WORKER_URLS entries must be valid URLs")).toBe(true) + expect(parsed.issues.some((issue) => issue.message === "STATIC_WORKER_URLS entries must use http or https URLs")).toBe(true) + expect(parsed.issues.some((issue) => issue.path === "STATIC_WORKER_HEALTH_PATH")).toBe(true) + expect(parsed.issues.some((issue) => issue.path === "STATIC_WORKER_HEALTHCHECK_TIMEOUT_MS")).toBe(true) + expect(parsed.issues.some((issue) => issue.path === "STATIC_WORKER_HEALTHCHECK_INTERVAL_MS")).toBe(true) + expect(parsed.issues.some((issue) => issue.path === "STATIC_WORKER_RESERVATION_TTL_MS")).toBe(true) + expect(parsed.issues.some((issue) => issue.message === "STATIC_WORKER_TOKEN_MAP_JSON must be valid JSON")).toBe(true) +}) + +test("static env validation requires one token-map pair per configured worker URL", () => { + const parsed = envModule.parseStaticWorkersEnv({ + STATIC_WORKER_URLS: "http://worker-a.example.com,http://worker-b.example.com", + STATIC_WORKER_TOKEN_MAP_JSON: JSON.stringify({ + "http://worker-a.example.com/": { clientToken: "client-a", hostToken: "host-a" }, + "http://worker-extra.example.com": { clientToken: "client-extra", hostToken: "host-extra" }, + }), + }) + + expect(parsed.tokenMap["http://worker-a.example.com"]).toEqual({ clientToken: "client-a", hostToken: "host-a" }) + expect(parsed.issues.some((issue) => issue.message.includes("missing token pair for http://worker-b.example.com"))).toBe(true) + expect(parsed.issues.some((issue) => issue.message.includes("unconfigured URL http://worker-extra.example.com"))).toBe(true) +}) + +test("static provisioner in-process reservations prevent concurrent duplicate assignment", async () => { + const config = staticWorkerConfig({ reservationTtlMs: 1000 }) + const first = provisionerModule.provisionStaticWorker( + { + workerId: "worker_static_concurrent_a_123", + name: "Static Concurrent A", + hostToken: "host-token", + clientToken: "client-token", + activityToken: "activity-token", + }, + config, + ) + const second = provisionerModule.provisionStaticWorker( + { + workerId: "worker_static_concurrent_b_123", + name: "Static Concurrent B", + hostToken: "host-token", + clientToken: "client-token", + activityToken: "activity-token", + }, + config, + ) + + const results = await Promise.allSettled([first, second]) + expect(results.filter((result) => result.status === "fulfilled")).toHaveLength(1) + expect(results.filter((result) => result.status === "rejected")).toHaveLength(1) + + const fulfilled = results.find((result): result is PromiseFulfilledResult => result.status === "fulfilled") + expect(fulfilled?.value.url).toBe(staticWorkerUrl) + await provisionerModule.deprovisionWorker({ workerId: "worker_static_concurrent_a_123", instanceUrl: staticWorkerUrl }) + await provisionerModule.deprovisionWorker({ workerId: "worker_static_concurrent_b_123", instanceUrl: staticWorkerUrl }) +}) + +test("static provisioner releases failed health reservations for reuse", async () => { + await expect(provisionerModule.provisionStaticWorker( + { + workerId: "worker_static_failed_cleanup_123", + name: "Static Failed Cleanup", + hostToken: "host-token", + clientToken: "client-token", + activityToken: "activity-token", + }, + staticWorkerConfig({ healthPath: "/missing", healthcheckTimeoutMs: 50, reservationTtlMs: 1000 }), + )).rejects.toThrow("Timed out waiting for worker health endpoint") + + const provisioned = await provisionerModule.provisionStaticWorker( + { + workerId: "worker_static_failed_cleanup_reuse_123", + name: "Static Failed Cleanup Reuse", + hostToken: "host-token", + clientToken: "client-token", + activityToken: "activity-token", + }, + staticWorkerConfig(), + ) + expect(provisioned.url).toBe(staticWorkerUrl) +}) + +test("static provisioner recovers stale reservations for reuse", async () => { + const first = await provisionerModule.provisionStaticWorker( + { + workerId: "worker_static_stale_a_123", + name: "Static Stale A", + hostToken: "host-token", + clientToken: "client-token", + activityToken: "activity-token", + }, + staticWorkerConfig({ reservationTtlMs: 1 }), + ) + expect(first.url).toBe(staticWorkerUrl) + + await new Promise((resolve) => setTimeout(resolve, 5)) + + const second = await provisionerModule.provisionStaticWorker( + { + workerId: "worker_static_stale_b_123", + name: "Static Stale B", + hostToken: "host-token", + clientToken: "client-token", + activityToken: "activity-token", + }, + staticWorkerConfig(), + ) + expect(second.url).toBe(staticWorkerUrl) +})