diff --git a/ee/apps/den-api/src/db.ts b/ee/apps/den-api/src/db.ts index bf48dfb132..a8fa313c07 100644 --- a/ee/apps/den-api/src/db.ts +++ b/ee/apps/den-api/src/db.ts @@ -1,8 +1,9 @@ import { createDenDb } from "@openwork-ee/den-db" import { env } from "./env.js" -export const { db } = createDenDb({ +export const denDb = createDenDb({ databaseUrl: env.databaseUrl, mode: env.dbMode, planetscale: env.planetscale, }) +export const { client: dbClient, db } = denDb diff --git a/ee/apps/den-api/src/env.ts b/ee/apps/den-api/src/env.ts index 61cbd19614..1266b49273 100644 --- a/ee/apps/den-api/src/env.ts +++ b/ee/apps/den-api/src/env.ts @@ -34,8 +34,17 @@ const EnvSchema = z.object({ PORT: z.string().optional(), CORS_ORIGINS: z.string().optional(), WORKER_PROXY_PORT: z.string().optional(), - PROVISIONER_MODE: z.enum(["stub", "render", "daytona"]).optional(), + PROVISIONER_MODE: z.enum(["stub", "render", "daytona", "static"]).optional(), WORKER_URL_TEMPLATE: z.string().optional(), + STATIC_WORKER_URLS: z.string().optional(), + STATIC_WORKER_HEALTH_PATH: z.string().optional(), + STATIC_WORKER_HEALTHCHECK_TIMEOUT_MS: z.string().optional(), + STATIC_WORKER_HEALTHCHECK_INTERVAL_MS: z.string().optional(), + STATIC_WORKER_RESERVATION_TTL_MS: z.string().optional(), + STATIC_WORKER_TOKEN_MAP_JSON: z.string().optional(), + STATIC_WORKER_ATTACH_ALLOW_PRIVATE: z.string().optional(), + STATIC_WORKER_ATTACH_ALLOWED_HOSTS: z.string().optional(), + STATIC_WORKER_ATTACH_ALLOWED_CIDRS: z.string().optional(), WORKER_ACTIVITY_BASE_URL: z.string().optional(), OPENWORK_DAYTONA_ENV_PATH: z.string().optional(), RENDER_API_BASE: z.string().optional(), @@ -123,6 +132,17 @@ const EnvSchema = z.object({ } } } + + if (value.PROVISIONER_MODE === "static") { + const staticConfig = parseStaticWorkersEnv(value) + for (const issue of staticConfig.issues) { + ctx.addIssue({ + code: z.ZodIssueCode.custom, + message: issue.message, + path: [issue.path], + }) + } + } }) const parsed = EnvSchema.parse(process.env) @@ -147,6 +167,226 @@ function normalizeOrigin(origin: string) { return value.replace(/\/+$/, "") } +type StaticWorkersEnvInput = { + STATIC_WORKER_URLS?: string + STATIC_WORKER_HEALTH_PATH?: string + STATIC_WORKER_HEALTHCHECK_TIMEOUT_MS?: string + STATIC_WORKER_HEALTHCHECK_INTERVAL_MS?: string + STATIC_WORKER_RESERVATION_TTL_MS?: string + STATIC_WORKER_TOKEN_MAP_JSON?: string + STATIC_WORKER_ATTACH_ALLOW_PRIVATE?: string + STATIC_WORKER_ATTACH_ALLOWED_HOSTS?: string + STATIC_WORKER_ATTACH_ALLOWED_CIDRS?: string +} + +type StaticWorkersEnvIssue = { + path: keyof StaticWorkersEnvInput + message: string +} + +type StaticWorkerTokenPair = { + clientToken: string + hostToken: string +} + +function parsePositiveInteger(value: string | undefined, fallback: number) { + const raw = value?.trim() + if (!raw) { + return fallback + } + const parsedValue = Number(raw) + return Number.isInteger(parsedValue) && parsedValue > 0 ? parsedValue : null +} + +function normalizeStaticWorkerUrl(value: string) { + const parsedUrl = new URL(value.trim()) + parsedUrl.hash = "" + parsedUrl.search = "" + parsedUrl.pathname = parsedUrl.pathname.replace(/\/+$/, "") + const serialized = parsedUrl.toString().replace(/\/+$/, "") + return serialized +} + +function isRecord(value: unknown): value is Record { + return typeof value === "object" && value !== null && !Array.isArray(value) +} + +function parseStaticWorkerTokenPair(value: unknown) { + if (!isRecord(value)) { + return null + } + const clientToken = typeof value.clientToken === "string" ? value.clientToken.trim() : "" + const hostToken = typeof value.hostToken === "string" ? value.hostToken.trim() : "" + return clientToken && hostToken ? { clientToken, hostToken } : null +} + +export function parseStaticWorkersEnv(input: StaticWorkersEnvInput) { + const issues: StaticWorkersEnvIssue[] = [] + const urls: string[] = [] + const seenUrls = new Set() + const tokenMap: Record = {} + + for (const rawUrl of splitCsv(input.STATIC_WORKER_URLS)) { + let normalizedUrl: string + try { + const parsedUrl = new URL(rawUrl) + if (parsedUrl.protocol !== "http:" && parsedUrl.protocol !== "https:") { + issues.push({ + path: "STATIC_WORKER_URLS", + message: "STATIC_WORKER_URLS entries must use http or https URLs", + }) + continue + } + normalizedUrl = normalizeStaticWorkerUrl(rawUrl) + } catch { + issues.push({ + path: "STATIC_WORKER_URLS", + message: "STATIC_WORKER_URLS entries must be valid URLs", + }) + continue + } + + if (seenUrls.has(normalizedUrl)) { + issues.push({ + path: "STATIC_WORKER_URLS", + message: `STATIC_WORKER_URLS contains duplicate URL ${normalizedUrl}`, + }) + continue + } + + seenUrls.add(normalizedUrl) + urls.push(normalizedUrl) + } + + if (urls.length === 0) { + issues.push({ + path: "STATIC_WORKER_URLS", + message: "STATIC_WORKER_URLS is required when PROVISIONER_MODE=static", + }) + } + + const rawTokenMap = optionalString(input.STATIC_WORKER_TOKEN_MAP_JSON) + if (!rawTokenMap) { + issues.push({ + path: "STATIC_WORKER_TOKEN_MAP_JSON", + message: "STATIC_WORKER_TOKEN_MAP_JSON is required when PROVISIONER_MODE=static", + }) + } else { + try { + const parsedTokenMap: unknown = JSON.parse(rawTokenMap) + if (!isRecord(parsedTokenMap)) { + issues.push({ + path: "STATIC_WORKER_TOKEN_MAP_JSON", + message: "STATIC_WORKER_TOKEN_MAP_JSON must be a JSON object keyed by worker URL", + }) + } else { + for (const [rawUrl, rawPair] of Object.entries(parsedTokenMap)) { + let normalizedUrl: string + try { + normalizedUrl = normalizeStaticWorkerUrl(rawUrl) + } catch { + issues.push({ + path: "STATIC_WORKER_TOKEN_MAP_JSON", + message: "STATIC_WORKER_TOKEN_MAP_JSON keys must be valid worker URLs", + }) + continue + } + + const pair = parseStaticWorkerTokenPair(rawPair) + if (!pair) { + issues.push({ + path: "STATIC_WORKER_TOKEN_MAP_JSON", + message: `STATIC_WORKER_TOKEN_MAP_JSON entry for ${normalizedUrl} must include non-empty clientToken and hostToken`, + }) + continue + } + if (Object.prototype.hasOwnProperty.call(tokenMap, normalizedUrl)) { + issues.push({ + path: "STATIC_WORKER_TOKEN_MAP_JSON", + message: `STATIC_WORKER_TOKEN_MAP_JSON contains duplicate key for ${normalizedUrl}`, + }) + continue + } + tokenMap[normalizedUrl] = pair + } + + for (const url of urls) { + if (!tokenMap[url]) { + issues.push({ + path: "STATIC_WORKER_TOKEN_MAP_JSON", + message: `STATIC_WORKER_TOKEN_MAP_JSON is missing token pair for ${url}`, + }) + } + } + + const configuredUrls = new Set(urls) + for (const url of Object.keys(tokenMap)) { + if (!configuredUrls.has(url)) { + issues.push({ + path: "STATIC_WORKER_TOKEN_MAP_JSON", + message: `STATIC_WORKER_TOKEN_MAP_JSON contains token pair for unconfigured URL ${url}`, + }) + } + } + } + } catch { + issues.push({ + path: "STATIC_WORKER_TOKEN_MAP_JSON", + message: "STATIC_WORKER_TOKEN_MAP_JSON must be valid JSON", + }) + } + } + + const healthPath = optionalString(input.STATIC_WORKER_HEALTH_PATH) ?? "/health" + if (!healthPath.startsWith("/") || healthPath.startsWith("//") || healthPath.includes("?")) { + issues.push({ + path: "STATIC_WORKER_HEALTH_PATH", + message: "STATIC_WORKER_HEALTH_PATH must be an absolute path such as /health", + }) + } + + const healthcheckTimeoutMs = parsePositiveInteger(input.STATIC_WORKER_HEALTHCHECK_TIMEOUT_MS, 10000) + if (healthcheckTimeoutMs === null) { + issues.push({ + path: "STATIC_WORKER_HEALTHCHECK_TIMEOUT_MS", + message: "STATIC_WORKER_HEALTHCHECK_TIMEOUT_MS must be a positive integer", + }) + } + + const healthcheckIntervalMs = parsePositiveInteger(input.STATIC_WORKER_HEALTHCHECK_INTERVAL_MS, 1000) + if (healthcheckIntervalMs === null) { + issues.push({ + path: "STATIC_WORKER_HEALTHCHECK_INTERVAL_MS", + message: "STATIC_WORKER_HEALTHCHECK_INTERVAL_MS must be a positive integer", + }) + } + + const reservationTtlMs = parsePositiveInteger(input.STATIC_WORKER_RESERVATION_TTL_MS, 300000) + if (reservationTtlMs === null) { + issues.push({ + path: "STATIC_WORKER_RESERVATION_TTL_MS", + message: "STATIC_WORKER_RESERVATION_TTL_MS must be a positive integer", + }) + } + + const allowPrivateAttach = (input.STATIC_WORKER_ATTACH_ALLOW_PRIVATE ?? "false").trim().toLowerCase() === "true" + const attachAllowedHosts = splitCsv(input.STATIC_WORKER_ATTACH_ALLOWED_HOSTS).map((host) => host.toLowerCase()) + const attachAllowedCidrs = splitCsv(input.STATIC_WORKER_ATTACH_ALLOWED_CIDRS) + + return { + urls, + healthPath, + healthcheckTimeoutMs: healthcheckTimeoutMs ?? 10000, + healthcheckIntervalMs: healthcheckIntervalMs ?? 1000, + reservationTtlMs: reservationTtlMs ?? 300000, + tokenMap, + allowPrivateAttach, + attachAllowedHosts, + attachAllowedCidrs, + issues, + } +} + const corsOrigins = splitCsv(parsed.CORS_ORIGINS).map((origin) => normalizeOrigin(origin)) const betterAuthTrustedOrigins = splitCsv(parsed.DEN_BETTER_AUTH_TRUSTED_ORIGINS) .map((origin) => normalizeOrigin(origin)) @@ -159,6 +399,7 @@ const requireEmailVerification = parsed.DEN_REQUIRE_EMAIL_VERIFICATION === undef ? !devMode : parsed.DEN_REQUIRE_EMAIL_VERIFICATION.trim().toLowerCase() !== "false" const port = Number(parsed.PORT ?? "8790") +const staticWorkers = parseStaticWorkersEnv(parsed) const daytonaSandboxPublic = (parsed.DAYTONA_SANDBOX_PUBLIC ?? "false").toLowerCase() === "true" @@ -223,6 +464,17 @@ export const env = { corsOrigins, provisionerMode: parsed.PROVISIONER_MODE ?? "daytona", workerUrlTemplate: parsed.WORKER_URL_TEMPLATE, + staticWorkers: { + urls: staticWorkers.urls, + healthPath: staticWorkers.healthPath, + healthcheckTimeoutMs: staticWorkers.healthcheckTimeoutMs, + healthcheckIntervalMs: staticWorkers.healthcheckIntervalMs, + reservationTtlMs: staticWorkers.reservationTtlMs, + tokenMap: staticWorkers.tokenMap, + allowPrivateAttach: staticWorkers.allowPrivateAttach, + attachAllowedHosts: staticWorkers.attachAllowedHosts, + attachAllowedCidrs: staticWorkers.attachAllowedCidrs, + }, workerActivityBaseUrl: optionalString(parsed.WORKER_ACTIVITY_BASE_URL) ?? parsed.BETTER_AUTH_URL.trim().replace(/\/+$/, ""), diff --git a/ee/apps/den-api/src/routes/workers/core.ts b/ee/apps/den-api/src/routes/workers/core.ts index 2d0be1fe80..bdd6b9b3cf 100644 --- a/ee/apps/den-api/src/routes/workers/core.ts +++ b/ee/apps/den-api/src/routes/workers/core.ts @@ -1,10 +1,11 @@ -import { desc, eq } from "@openwork-ee/den-db/drizzle" +import { and, desc, eq, inArray } from "@openwork-ee/den-db/drizzle" import { WorkerTable, WorkerTokenTable } from "@openwork-ee/den-db/schema" import { createDenTypeId, normalizeDenTypeId } from "@openwork-ee/utils/typeid" import type { Hono } from "hono" import { describeRoute } from "hono-openapi" import { z } from "zod" import { db } from "../../db.js" +import { env } from "../../env.js" import { jsonValidator, paramValidator, queryValidator, requireUserMiddleware, resolveUserOrganizationsMiddleware } from "../../middleware/index.js" import { denTypeIdSchema, emptyResponse, forbiddenSchema, invalidRequestSchema, jsonResponse, notFoundSchema, unauthorizedSchema } from "../../openapi.js" import { getOrganizationLimitStatus } from "../../organization-limits.js" @@ -20,6 +21,7 @@ import { listWorkersQuerySchema, parseWorkerIdParam, requireCloudAccessOrPayment, + shouldUseSignupAutoExistingWorker, toInstanceResponse, toWorkerResponse, token, @@ -91,6 +93,8 @@ const workerTokensResponseSchema = z.object({ }).nullable(), }).meta({ ref: "WorkerTokensResponse" }) +type WorkerRow = typeof WorkerTable.$inferSelect + const organizationUnavailableSchema = z.object({ error: z.literal("organization_unavailable"), }).meta({ ref: "OrganizationUnavailableError" }) @@ -124,6 +128,42 @@ const workerRuntimeUnavailableSchema = z.object({ message: z.string(), })).meta({ ref: "WorkerConnectionError" }) +async function getExistingSignupAutoWorkerResponse(input: { + orgId: WorkerRow["org_id"] + userId: NonNullable +}) { + const rows = await db + .select() + .from(WorkerTable) + .where(and( + eq(WorkerTable.org_id, input.orgId), + eq(WorkerTable.created_by_user_id, input.userId), + eq(WorkerTable.destination, "cloud"), + inArray(WorkerTable.status, ["provisioning", "healthy"]), + )) + .orderBy(desc(WorkerTable.created_at)) + .limit(1) + + const existingWorker = rows[0] + if (!existingWorker) { + return null + } + + const instance = await getLatestWorkerInstance(existingWorker.id) + const tokensAndConnect = await getWorkerTokensAndConnect(existingWorker) + const tokens = "tokens" in tokensAndConnect ? tokensAndConnect.tokens : undefined + if (!tokens) { + return null + } + + return { + worker: toWorkerResponse(existingWorker, input.userId), + tokens, + instance: toInstanceResponse(instance), + launch: { mode: "existing", pollAfterMs: 0 }, + } +} + export function registerWorkerCoreRoutes(app: Hono) { app.get( "/v1/workers", @@ -197,6 +237,16 @@ export function registerWorkerCoreRoutes +type MySqlLockConnection = { + query: (statement: string, values?: unknown[]) => Promise + release: () => void +} +type MySqlLockPool = { + getConnection: () => Promise +} export const token = () => randomBytes(32).toString("hex") @@ -75,6 +99,8 @@ function normalizeUrl(value: string): string { return value.trim().replace(/\/+$/, "") } +const STATIC_PROVISIONING_LOCK_NAME = "den_static_provisioner_assignment" + function parseWorkspaceSelection(payload: unknown): { workspaceId: string; openworkUrl: string } | null { if (!isRecord(payload) || !Array.isArray(payload.items)) { return null @@ -284,6 +310,198 @@ export async function getLatestWorkerInstance(workerId: WorkerId) { return rows[0] ?? null } +async function getUnavailableStaticWorkerUrls() { + if (env.provisionerMode !== "static") { + return [] + } + + const rows = await db + .select({ url: WorkerInstanceTable.url }) + .from(WorkerInstanceTable) + .where( + and( + eq(WorkerInstanceTable.provider, "static"), + inArray(WorkerInstanceTable.status, ["provisioning", "healthy"]), + ), + ) + + return rows.map((row) => normalizeUrl(row.url)).filter(Boolean) +} + +function staticReservationStaleBefore() { + return new Date(Date.now() - env.staticWorkers.reservationTtlMs) +} + +async function markStaleStaticReservationsFailed(tx: StaticAssignmentDb) { + const staleRows = await tx + .select({ workerId: WorkerInstanceTable.worker_id }) + .from(WorkerInstanceTable) + .where( + and( + eq(WorkerInstanceTable.provider, "static"), + eq(WorkerInstanceTable.status, "provisioning"), + sql`${WorkerInstanceTable.updated_at} < ${staticReservationStaleBefore()}`, + ), + ) + const staleWorkerIds = [...new Set(staleRows.map((row) => row.workerId))] + if (staleWorkerIds.length > 0) { + await tx + .update(WorkerTable) + .set({ status: "failed" }) + .where(inArray(WorkerTable.id, staleWorkerIds)) + } + + await tx + .update(WorkerInstanceTable) + .set({ status: "failed" }) + .where( + and( + eq(WorkerInstanceTable.provider, "static"), + eq(WorkerInstanceTable.status, "provisioning"), + sql`${WorkerInstanceTable.updated_at} < ${staticReservationStaleBefore()}`, + ), + ) +} + +export function readMySqlLockAcquired(result: unknown) { + const rows = Array.isArray(result) && Array.isArray(result[0]) ? result[0] : result + if (!Array.isArray(rows)) { + return 0 + } + const first = rows[0] + if (first && typeof first === "object" && "acquired" in first) { + return Number((first as { acquired: unknown }).acquired) + } + return 0 +} + +function getMySqlLockPool(): MySqlLockPool { + if (dbClient && typeof dbClient === "object" && "getConnection" in dbClient && typeof dbClient.getConnection === "function") { + return dbClient as MySqlLockPool + } + throw new Error("Static worker assignment locking requires MySQL DB_MODE") +} + +export async function withStaticAssignmentLockUsing(input: { + pool: MySqlLockPool + transaction: (run: (tx: StaticAssignmentDb) => Promise) => Promise + run: (tx: StaticAssignmentDb) => Promise +}) { + const connection = await input.pool.getConnection() + let lockAcquired = false + + try { + const lockRows = await connection.query(`SELECT GET_LOCK(?, 10) AS acquired`, [STATIC_PROVISIONING_LOCK_NAME]) + const acquired = readMySqlLockAcquired(lockRows) + + if (acquired !== 1) { + throw new Error("Timed out waiting for static worker assignment lock") + } + + lockAcquired = true + return await input.transaction(input.run) + } finally { + if (lockAcquired) { + await connection.query(`SELECT RELEASE_LOCK(?)`, [STATIC_PROVISIONING_LOCK_NAME]) + } + connection.release() + } +} + +export async function withStaticAssignmentLock(run: (tx: StaticAssignmentDb) => Promise) { + return withStaticAssignmentLockUsing({ + pool: getMySqlLockPool(), + transaction: (callback) => db.transaction(callback), + run, + }) +} + +async function reserveStaticWorkerInstance(input: { + workerId: WorkerId +}) { + return withStaticAssignmentLock(async (tx) => { + await markStaleStaticReservationsFailed(tx) + + const rows = await tx + .select({ url: WorkerInstanceTable.url }) + .from(WorkerInstanceTable) + .where( + and( + eq(WorkerInstanceTable.provider, "static"), + inArray(WorkerInstanceTable.status, ["provisioning", "healthy"]), + ), + ) + + const url = normalizeStaticWorkerUrl(selectStaticWorkerUrlFromPool(input.workerId, { + ...env.staticWorkers, + unavailableUrls: rows.map((row) => row.url), + })) + const tokens = getStaticWorkerTokenPairForUrl(url, env.staticWorkers) + const instanceId = createDenTypeId("workerInstance") + + await tx.insert(WorkerInstanceTable).values({ + id: instanceId, + worker_id: input.workerId, + provider: "static", + region: "on-prem", + url, + status: "provisioning", + }) + + await tx + .update(WorkerTokenTable) + .set({ token: tokens.hostToken }) + .where(and(eq(WorkerTokenTable.worker_id, input.workerId), eq(WorkerTokenTable.scope, "host"))) + + await tx + .update(WorkerTokenTable) + .set({ token: tokens.clientToken }) + .where(and(eq(WorkerTokenTable.worker_id, input.workerId), eq(WorkerTokenTable.scope, "client"))) + + return { instanceId, url, tokens } + }) +} + +async function continueStaticCloudProvisioning(input: { + workerId: WorkerId + name: string + hostToken: string + clientToken: string + activityToken: string +}) { + const reservation = await reserveStaticWorkerInstance({ workerId: input.workerId }) + + try { + await checkStaticWorkerHealth(reservation.url, env.staticWorkers) + await verifyStaticWorkerRuntimeAccess(reservation.url, reservation.tokens, env.staticWorkers) + + await db.transaction(async (tx) => { + await tx + .update(WorkerTable) + .set({ status: "healthy" }) + .where(eq(WorkerTable.id, input.workerId)) + + await tx + .update(WorkerInstanceTable) + .set({ status: "healthy" }) + .where(eq(WorkerInstanceTable.id, reservation.instanceId)) + }) + } catch (error) { + await db.transaction(async (tx) => { + await tx + .update(WorkerTable) + .set({ status: "failed" }) + .where(eq(WorkerTable.id, input.workerId)) + + await tx + .update(WorkerInstanceTable) + .set({ status: "failed" }) + .where(eq(WorkerInstanceTable.id, reservation.instanceId)) + }) + throw error + } +} + export function toInstanceResponse(instance: WorkerInstanceRow | null) { if (!instance) { return null @@ -327,12 +545,18 @@ export async function continueCloudProvisioning(input: { activityToken: string }) { try { + if (env.provisionerMode === "static") { + await continueStaticCloudProvisioning(input) + return + } + const provisioned = await provisionWorker({ workerId: input.workerId, name: input.name, hostToken: input.hostToken, clientToken: input.clientToken, activityToken: input.activityToken, + unavailableStaticWorkerUrls: await getUnavailableStaticWorkerUrls(), }) await db diff --git a/ee/apps/den-api/src/workers/provisioner.ts b/ee/apps/den-api/src/workers/provisioner.ts index dcf952a34e..d144211ba3 100644 --- a/ee/apps/den-api/src/workers/provisioner.ts +++ b/ee/apps/den-api/src/workers/provisioner.ts @@ -17,6 +17,7 @@ export type ProvisionInput = { hostToken: string clientToken: string activityToken: string + unavailableStaticWorkerUrls?: string[] } export type ProvisionedInstance = { @@ -26,6 +27,26 @@ export type ProvisionedInstance = { region?: string } +export type StaticWorkerTokenPair = { + clientToken: string + hostToken: string +} + +export type StaticWorkerConfig = { + urls: string[] + healthPath: string + healthcheckTimeoutMs: number + healthcheckIntervalMs: number + reservationTtlMs?: number + unavailableUrls?: string[] + tokenMap?: Record +} + +type StaticWorkerReservation = { + workerId: string + expiresAt: number +} + type RenderService = { id: string name?: string @@ -54,6 +75,7 @@ const terminalDeployStates = new Set([ ]) const sleep = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms)) +const staticWorkerReservations = new Map() const slug = (value: string) => value @@ -135,25 +157,219 @@ async function waitForDeployLive(serviceId: string) { async function waitForHealth( url: string, timeoutMs = env.render.healthcheckTimeoutMs, + intervalMs = env.render.pollIntervalMs, + healthPath = "/health", ) { - const healthUrl = `${url.replace(/\/$/, "")}/health` + const normalizedPath = healthPath.startsWith("/") ? healthPath : `/${healthPath}` + const healthUrl = `${url.replace(/\/$/, "")}${normalizedPath}` const startedAt = Date.now() while (Date.now() - startedAt < timeoutMs) { + const remainingMs = timeoutMs - (Date.now() - startedAt) + const controller = new AbortController() + const timeout = setTimeout(() => controller.abort(), remainingMs) + try { - const response = await fetch(healthUrl, { method: "GET" }) + const response = await fetch(healthUrl, { + method: "GET", + signal: controller.signal, + }) if (response.ok) { return } } catch { // ignore transient network failures while the instance boots + } finally { + clearTimeout(timeout) + } + + const elapsedMs = Date.now() - startedAt + const sleepMs = Math.min(intervalMs, Math.max(timeoutMs - elapsedMs, 0)) + if (sleepMs > 0) { + await sleep(sleepMs) } - await sleep(env.render.pollIntervalMs) } throw new Error(`Timed out waiting for worker health endpoint ${healthUrl}`) } +export function normalizeStaticWorkerUrl(value: string) { + const parsedUrl = new URL(value.trim()) + parsedUrl.hash = "" + parsedUrl.search = "" + parsedUrl.pathname = parsedUrl.pathname.replace(/\/+$/, "") + return parsedUrl.toString().replace(/\/+$/, "") +} + +function safeNormalizeWorkerUrl(value: string) { + try { + return normalizeStaticWorkerUrl(value) + } catch { + return "" + } +} + +export function getStaticWorkerTokenPairForUrl(url: string, config: StaticWorkerConfig) { + const normalizedUrl = normalizeStaticWorkerUrl(url) + const pair = config.tokenMap?.[normalizedUrl] + if (!pair?.clientToken.trim() || !pair.hostToken.trim()) { + throw new Error(`STATIC_WORKER_TOKEN_MAP_JSON is missing token pair for ${normalizedUrl}`) + } + return { + clientToken: pair.clientToken.trim(), + hostToken: pair.hostToken.trim(), + } +} + +async function fetchStaticWorkerRuntime(url: string, path: string, headers: Record, timeoutMs: number) { + return fetch(`${url.replace(/\/$/, "")}${path}`, { + method: "GET", + redirect: "manual", + headers: { Accept: "application/json", ...headers }, + signal: AbortSignal.timeout(timeoutMs), + }) +} + +export async function verifyStaticWorkerRuntimeAccess( + url: string, + tokens: StaticWorkerTokenPair, + config: Pick, +) { + const clientResponse = await fetchStaticWorkerRuntime(url, "/workspaces", { + Authorization: `Bearer ${tokens.clientToken}`, + }, config.healthcheckTimeoutMs) + if (!clientResponse.ok) { + throw new Error(`Worker rejected configured static client token with HTTP ${clientResponse.status}`) + } + + const hostResponse = await fetchStaticWorkerRuntime(url, "/env/keys", { + "X-OpenWork-Host-Token": tokens.hostToken, + }, config.healthcheckTimeoutMs) + if (!hostResponse.ok) { + throw new Error(`Worker rejected configured static host token with HTTP ${hostResponse.status}`) + } +} + +function pruneExpiredStaticWorkerReservations(now = Date.now()) { + for (const [url, reservation] of staticWorkerReservations) { + if (reservation.expiresAt <= now) { + staticWorkerReservations.delete(url) + } + } +} + +function selectStaticWorkerUrl(workerId: string, config: StaticWorkerConfig) { + pruneExpiredStaticWorkerReservations() + + return selectStaticWorkerUrlFromPool(workerId, config, true) +} + +export function selectStaticWorkerUrlFromPool( + workerId: string, + config: StaticWorkerConfig, + includeInProcessReservations = false, +) { + if (includeInProcessReservations) { + pruneExpiredStaticWorkerReservations() + } + + const urls = config.urls.map(safeNormalizeWorkerUrl).filter(Boolean) + if (urls.length === 0) { + throw new Error("STATIC_WORKER_URLS is required when PROVISIONER_MODE=static") + } + + const unavailableUrls = new Set( + (config.unavailableUrls ?? []).map(safeNormalizeWorkerUrl).filter(Boolean), + ) + if (includeInProcessReservations) { + for (const [url, reservation] of staticWorkerReservations) { + if (reservation.workerId !== workerId) { + unavailableUrls.add(url) + } + } + } + + const availableUrls = urls.filter((url) => !unavailableUrls.has(url)) + + if (availableUrls.length === 0) { + throw new Error( + "No available static worker URL remains; all configured STATIC_WORKER_URLS are already assigned to active workers", + ) + } + + let hash = 0 + for (const char of workerId) { + hash = (hash * 31 + char.charCodeAt(0)) >>> 0 + } + + return availableUrls[hash % availableUrls.length]! +} + +function reserveStaticWorkerUrl(workerId: string, url: string, ttlMs: number) { + if (ttlMs <= 0) { + return + } + staticWorkerReservations.set(url, { + workerId, + expiresAt: Date.now() + ttlMs, + }) +} + +export function releaseStaticWorkerUrl(workerId: string, url: string) { + const reservation = staticWorkerReservations.get(url) + if (reservation?.workerId === workerId) { + staticWorkerReservations.delete(url) + } +} + +export async function provisionStaticWorker( + input: ProvisionInput, + config: StaticWorkerConfig = env.staticWorkers, +): Promise { + const reservationTtlMs = config.reservationTtlMs ?? 300000 + const url = selectStaticWorkerUrl(input.workerId, { + ...config, + unavailableUrls: config.unavailableUrls ?? input.unavailableStaticWorkerUrls, + }) + reserveStaticWorkerUrl(input.workerId, url, reservationTtlMs) + + try { + await waitForHealth( + url, + config.healthcheckTimeoutMs, + config.healthcheckIntervalMs, + config.healthPath, + ) + const tokens = config.tokenMap?.[url] + if (tokens) { + await verifyStaticWorkerRuntimeAccess(url, tokens, config) + } + } catch (error) { + releaseStaticWorkerUrl(input.workerId, url) + throw error + } + + if (reservationTtlMs <= 0) { + releaseStaticWorkerUrl(input.workerId, url) + } + + return { + provider: "static", + url, + status: "healthy", + region: "on-prem", + } +} + +export async function checkStaticWorkerHealth(url: string, config: StaticWorkerConfig) { + await waitForHealth( + url, + config.healthcheckTimeoutMs, + config.healthcheckIntervalMs, + config.healthPath, + ) +} + async function listRenderServices(limit = 200) { const rows: RenderService[] = [] let cursor: string | undefined @@ -343,6 +559,10 @@ export async function provisionWorker( return provisionWorkerOnDaytona(input) } + if (env.provisionerMode === "static") { + return provisionStaticWorker(input) + } + const template = env.workerUrlTemplate ?? "https://workers.local/{workerId}" const url = template.replace("{workerId}", input.workerId) return { @@ -356,6 +576,13 @@ export async function deprovisionWorker(input: { workerId: WorkerId instanceUrl: string | null }) { + if (env.provisionerMode === "static") { + if (input.instanceUrl) { + releaseStaticWorkerUrl(input.workerId, safeNormalizeWorkerUrl(input.instanceUrl)) + } + return + } + if (env.provisionerMode === "daytona") { await deprovisionWorkerOnDaytona(input.workerId) return diff --git a/ee/apps/den-api/test/provisioner-static.test.ts b/ee/apps/den-api/test/provisioner-static.test.ts new file mode 100644 index 0000000000..09779ef95e --- /dev/null +++ b/ee/apps/den-api/test/provisioner-static.test.ts @@ -0,0 +1,316 @@ +import { afterAll, beforeAll, expect, test } from "bun:test" +import type { ProvisionedInstance, StaticWorkerConfig } from "../src/workers/provisioner.js" + +function seedRequiredEnv() { + process.env.DATABASE_URL = process.env.DATABASE_URL ?? "mysql://root:password@127.0.0.1:3306/openwork_test" + process.env.DEN_DB_ENCRYPTION_KEY = process.env.DEN_DB_ENCRYPTION_KEY ?? "x".repeat(32) + process.env.BETTER_AUTH_SECRET = process.env.BETTER_AUTH_SECRET ?? "y".repeat(32) + process.env.BETTER_AUTH_URL = process.env.BETTER_AUTH_URL ?? "http://127.0.0.1:8790" + process.env.CORS_ORIGINS = process.env.CORS_ORIGINS ?? "http://127.0.0.1:8790" + process.env.PROVISIONER_MODE = "static" + process.env.STATIC_WORKER_URLS = process.env.STATIC_WORKER_URLS ?? "http://127.0.0.1:8787" + process.env.STATIC_WORKER_TOKEN_MAP_JSON = process.env.STATIC_WORKER_TOKEN_MAP_JSON ?? '{"http://127.0.0.1:8787":{"clientToken":"static-client-token","hostToken":"static-host-token"}}' + process.env.STATIC_WORKER_ATTACH_ALLOW_PRIVATE = "true" +} + +let provisionerModule: typeof import("../src/workers/provisioner.js") +let envModule: typeof import("../src/env.js") +let server: ReturnType +let staticWorkerUrl: string + +function staticWorkerConfig(overrides: Partial = {}): StaticWorkerConfig { + return { + urls: [staticWorkerUrl], + healthPath: "/health", + healthcheckTimeoutMs: 1000, + healthcheckIntervalMs: 10, + reservationTtlMs: 0, + ...overrides, + } +} + +beforeAll(async () => { + seedRequiredEnv() + server = Bun.serve({ + port: 0, + fetch(request) { + const url = new URL(request.url) + const authorization = request.headers.get("authorization") + const hostToken = request.headers.get("x-openwork-host-token") + if (url.pathname === "/health") { + return Response.json({ ok: true }) + } + if (url.pathname === "/workspaces") { + return authorization === "Bearer valid-client-token" + ? Response.json({ items: [], activeId: null }) + : Response.json({ error: "unauthorized" }, { status: 401 }) + } + if (url.pathname === "/env/keys") { + return hostToken === "valid-host-token" + ? Response.json({ keys: [] }) + : Response.json({ error: "forbidden" }, { status: 403 }) + } + if (url.pathname === "/redirect-workspaces") { + return new Response(null, { status: 302, headers: { location: "/workspaces" } }) + } + if (url.pathname === "/hang-health") { + return new Promise(() => {}) + } + return new Response("not found", { status: 404 }) + }, + }) + staticWorkerUrl = `http://127.0.0.1:${server.port}` + envModule = await import("../src/env.js") + provisionerModule = await import("../src/workers/provisioner.js") +}) + +afterAll(() => { + server.stop(true) +}) + +test("static provisioner assigns a configured healthy worker URL", async () => { + const provisioned = await provisionerModule.provisionStaticWorker( + { + workerId: "worker_static_health_123", + name: "Static Health", + hostToken: "host-token", + clientToken: "client-token", + activityToken: "activity-token", + }, + staticWorkerConfig(), + ) + + expect(provisioned).toEqual({ + provider: "static", + region: "on-prem", + status: "healthy", + url: staticWorkerUrl, + }) +}) + +test("static provisioner skips URLs already assigned to active workers", async () => { + const provisioned = await provisionerModule.provisionStaticWorker( + { + workerId: "worker_static_available_url_123", + name: "Static Available URL", + hostToken: "host-token", + clientToken: "client-token", + activityToken: "activity-token", + unavailableStaticWorkerUrls: ["http://127.0.0.1:1/"], + }, + staticWorkerConfig({ + urls: ["http://127.0.0.1:1/", staticWorkerUrl], + }), + ) + + expect(provisioned.url).toBe(staticWorkerUrl) + expect(provisioned.status).toBe("healthy") +}) + +test("static provisioner fails clearly when every configured URL is already active", async () => { + await expect(provisionerModule.provisionStaticWorker( + { + workerId: "worker_static_exhausted_123", + name: "Static Exhausted", + hostToken: "host-token", + clientToken: "client-token", + activityToken: "activity-token", + unavailableStaticWorkerUrls: [staticWorkerUrl], + }, + staticWorkerConfig({ + urls: [staticWorkerUrl], + }), + )).rejects.toThrow("No available static worker URL remains") +}) + +test("static selector exhausts against DB-recomputed active normalized URLs", () => { + expect(() => provisionerModule.selectStaticWorkerUrlFromPool( + "worker_static_db_active_exhausted_123", + staticWorkerConfig({ + urls: [staticWorkerUrl], + unavailableUrls: [`${staticWorkerUrl}/`], + }), + )).toThrow("No available static worker URL remains") +}) + +test("static selector allows failed or stopped URLs when DB active set excludes them", () => { + const selected = provisionerModule.selectStaticWorkerUrlFromPool( + "worker_static_db_failed_reuse_123", + staticWorkerConfig({ + urls: [staticWorkerUrl], + unavailableUrls: [], + }), + ) + + expect(selected).toBe(staticWorkerUrl) +}) + +test("static provisioner fails clearly when no worker URLs are configured", async () => { + await expect(provisionerModule.provisionStaticWorker( + { + workerId: "worker_static_missing_url_123", + name: "Static Missing URL", + hostToken: "host-token", + clientToken: "client-token", + activityToken: "activity-token", + }, + staticWorkerConfig({ + urls: [], + }), + )).rejects.toThrow("STATIC_WORKER_URLS is required when PROVISIONER_MODE=static") +}) + +test("static provisioner fails clearly when health check does not pass", async () => { + await expect(provisionerModule.provisionStaticWorker( + { + workerId: "worker_static_unhealthy_123", + name: "Static Unhealthy", + hostToken: "host-token", + clientToken: "client-token", + activityToken: "activity-token", + }, + staticWorkerConfig({ + urls: [`${staticWorkerUrl}/missing`], + healthcheckTimeoutMs: 50, + }), + )).rejects.toThrow("Timed out waiting for worker health endpoint") +}) + +test("static provisioner aborts a hanging health check within the configured timeout", async () => { + const startedAt = performance.now() + + await expect(provisionerModule.provisionStaticWorker( + { + workerId: "worker_static_hanging_health_123", + name: "Static Hanging Health", + hostToken: "host-token", + clientToken: "client-token", + activityToken: "activity-token", + }, + staticWorkerConfig({ + healthPath: "/hang-health", + healthcheckTimeoutMs: 75, + }), + )).rejects.toThrow("Timed out waiting for worker health endpoint") + + expect(performance.now() - startedAt).toBeLessThan(1000) +}) + +test("static env validation requires config only when static mode is enabled", () => { + expect(envModule.parseStaticWorkersEnv({ STATIC_WORKER_URLS: undefined }).issues).toContainEqual({ + path: "STATIC_WORKER_URLS", + message: "STATIC_WORKER_URLS is required when PROVISIONER_MODE=static", + }) +}) + +test("static env validation normalizes URLs and rejects duplicate normalized URLs", () => { + const parsed = envModule.parseStaticWorkersEnv({ + STATIC_WORKER_URLS: "https://Worker.Example.com/, https://worker.example.com", + }) + + expect(parsed.urls).toEqual(["https://worker.example.com"]) + expect(parsed.issues.some((issue) => issue.message.includes("duplicate URL https://worker.example.com"))).toBe(true) +}) + +test("static env validation rejects invalid URL, protocol, health path, and timeout values", () => { + const parsed = envModule.parseStaticWorkersEnv({ + STATIC_WORKER_URLS: "not-a-url, ftp://worker.example.com", + STATIC_WORKER_HEALTH_PATH: "health?ready=1", + STATIC_WORKER_HEALTHCHECK_TIMEOUT_MS: "0", + STATIC_WORKER_HEALTHCHECK_INTERVAL_MS: "NaN", + STATIC_WORKER_RESERVATION_TTL_MS: "-1", + }) + + expect(parsed.issues.some((issue) => issue.message === "STATIC_WORKER_URLS entries must be valid URLs")).toBe(true) + expect(parsed.issues.some((issue) => issue.message === "STATIC_WORKER_URLS entries must use http or https URLs")).toBe(true) + expect(parsed.issues.some((issue) => issue.path === "STATIC_WORKER_HEALTH_PATH")).toBe(true) + expect(parsed.issues.some((issue) => issue.path === "STATIC_WORKER_HEALTHCHECK_TIMEOUT_MS")).toBe(true) + expect(parsed.issues.some((issue) => issue.path === "STATIC_WORKER_HEALTHCHECK_INTERVAL_MS")).toBe(true) + expect(parsed.issues.some((issue) => issue.path === "STATIC_WORKER_RESERVATION_TTL_MS")).toBe(true) +}) + +test("static provisioner in-process reservations prevent concurrent duplicate assignment", async () => { + const config = staticWorkerConfig({ reservationTtlMs: 1000 }) + const first = provisionerModule.provisionStaticWorker( + { + workerId: "worker_static_concurrent_a_123", + name: "Static Concurrent A", + hostToken: "host-token", + clientToken: "client-token", + activityToken: "activity-token", + }, + config, + ) + const second = provisionerModule.provisionStaticWorker( + { + workerId: "worker_static_concurrent_b_123", + name: "Static Concurrent B", + hostToken: "host-token", + clientToken: "client-token", + activityToken: "activity-token", + }, + config, + ) + + const results = await Promise.allSettled([first, second]) + expect(results.filter((result) => result.status === "fulfilled")).toHaveLength(1) + expect(results.filter((result) => result.status === "rejected")).toHaveLength(1) + + const fulfilled = results.find((result): result is PromiseFulfilledResult => result.status === "fulfilled") + expect(fulfilled?.value.url).toBe(staticWorkerUrl) + await provisionerModule.deprovisionWorker({ workerId: "worker_static_concurrent_a_123", instanceUrl: staticWorkerUrl }) + await provisionerModule.deprovisionWorker({ workerId: "worker_static_concurrent_b_123", instanceUrl: staticWorkerUrl }) +}) + +test("static provisioner releases failed health reservations for reuse", async () => { + await expect(provisionerModule.provisionStaticWorker( + { + workerId: "worker_static_failed_cleanup_123", + name: "Static Failed Cleanup", + hostToken: "host-token", + clientToken: "client-token", + activityToken: "activity-token", + }, + staticWorkerConfig({ healthPath: "/missing", healthcheckTimeoutMs: 50, reservationTtlMs: 1000 }), + )).rejects.toThrow("Timed out waiting for worker health endpoint") + + const provisioned = await provisionerModule.provisionStaticWorker( + { + workerId: "worker_static_failed_cleanup_reuse_123", + name: "Static Failed Cleanup Reuse", + hostToken: "host-token", + clientToken: "client-token", + activityToken: "activity-token", + }, + staticWorkerConfig(), + ) + expect(provisioned.url).toBe(staticWorkerUrl) +}) + +test("static provisioner recovers stale reservations for reuse", async () => { + const first = await provisionerModule.provisionStaticWorker( + { + workerId: "worker_static_stale_a_123", + name: "Static Stale A", + hostToken: "host-token", + clientToken: "client-token", + activityToken: "activity-token", + }, + staticWorkerConfig({ reservationTtlMs: 1 }), + ) + expect(first.url).toBe(staticWorkerUrl) + + await new Promise((resolve) => setTimeout(resolve, 5)) + + const second = await provisionerModule.provisionStaticWorker( + { + workerId: "worker_static_stale_b_123", + name: "Static Stale B", + hostToken: "host-token", + clientToken: "client-token", + activityToken: "activity-token", + }, + staticWorkerConfig(), + ) + expect(second.url).toBe(staticWorkerUrl) +}) diff --git a/ee/apps/den-web/app/(den)/_lib/den-flow.ts b/ee/apps/den-web/app/(den)/_lib/den-flow.ts index 3cb38f61bc..def4207222 100644 --- a/ee/apps/den-web/app/(den)/_lib/den-flow.ts +++ b/ee/apps/den-web/app/(den)/_lib/den-flow.ts @@ -148,6 +148,8 @@ export type OnboardingIntent = { authMethod: AuthMethod; }; +export type OnboardingAutoLaunchDecision = "idle" | "wait" | "complete_existing" | "launch"; + type PosthogClient = { capture?: (eventName: string, properties?: Record) => void; identify?: (distinctId?: string, properties?: Record) => void; @@ -253,6 +255,40 @@ export function deriveOnboardingWorkerName(user: AuthUser): string { return normalizeWorkerName(`${owner}${suffix}`); } +export function getOnboardingAutoLaunchDecision(input: { + userId: string | null; + activeOrganizationId: string | null; + onboardingIntent: OnboardingIntent | null; + billingSummary: BillingSummary | null; + workersLoadedOnce: boolean; + ownedWorkerCount: number; + launchBusy: boolean; + currentAutoLaunchKey: string | null; +}): OnboardingAutoLaunchDecision { + if (!input.userId || !input.onboardingIntent?.shouldLaunch || input.onboardingIntent.completed) { + return "idle"; + } + + if (!input.activeOrganizationId || !input.billingSummary || !input.workersLoadedOnce) { + return "wait"; + } + + if (input.ownedWorkerCount > 0) { + return "complete_existing"; + } + + if (input.billingSummary.featureGateEnabled && !input.billingSummary.hasActivePlan) { + return "wait"; + } + + if (input.launchBusy) { + return "wait"; + } + + const autoLaunchKey = `${input.activeOrganizationId}:${input.onboardingIntent.workerName || DEFAULT_WORKER_NAME}`; + return input.currentAutoLaunchKey === autoLaunchKey ? "wait" : "launch"; +} + export function getSocialCallbackUrl(): string { try { const origin = typeof window !== "undefined" ? window.location.origin : OPENWORK_AUTH_CALLBACK_BASE_URL || "https://app.openworklabs.com"; diff --git a/ee/apps/den-web/app/(den)/_providers/den-flow-provider.tsx b/ee/apps/den-web/app/(den)/_providers/den-flow-provider.tsx index 2db51d2a83..666c2eeab1 100644 --- a/ee/apps/den-web/app/(den)/_providers/den-flow-provider.tsx +++ b/ee/apps/den-web/app/(den)/_providers/den-flow-provider.tsx @@ -31,6 +31,7 @@ import { getBillingSummary, getEmailDomain, getErrorMessage, + getOnboardingAutoLaunchDecision, getOrgLimitError, getRuntimeServiceLabel, getSocialCallbackUrl, @@ -215,6 +216,7 @@ export function DenFlowProvider({ children }: { children: ReactNode }) { const [runtimeBusy, setRuntimeBusy] = useState(false); const [runtimeError, setRuntimeError] = useState(null); const [runtimeUpgradeBusy, setRuntimeUpgradeBusy] = useState(false); + const [activeOrganizationId, setActiveOrganizationId] = useState(null); const [onboardingIntent, setOnboardingIntent] = useState(null); const onboardingAutoLaunchKeyRef = useRef(null); @@ -906,6 +908,7 @@ export function DenFlowProvider({ children }: { children: ReactNode }) { const { response, payload } = await requestJson("/v1/me/orgs", { method: "GET", headers }, 12000); if (!response.ok) { + setActiveOrganizationId(null); return { orgs: [], activeOrgId: null, @@ -913,7 +916,9 @@ export function DenFlowProvider({ children }: { children: ReactNode }) { }; } - return parseOrgListPayload(payload); + const orgDirectory = parseOrgListPayload(payload); + setActiveOrganizationId(orgDirectory.activeOrgId ?? orgDirectory.orgs[0]?.id ?? null); + return orgDirectory; } async function resolveDashboardRoute() { @@ -1189,6 +1194,7 @@ export function DenFlowProvider({ children }: { children: ReactNode }) { setBillingSummary(null); setBillingError(null); setOrgLimitError(null); + setActiveOrganizationId(null); setBillingBusy(false); setBillingLoadedOnce(false); setTokenFetchedForWorkerId(null); @@ -1724,6 +1730,7 @@ export function DenFlowProvider({ children }: { children: ReactNode }) { useEffect(() => { if (!user) { + setActiveOrganizationId(null); setWorkers([]); setWorkersLoadedOnce(false); setWorkersError(null); @@ -1917,36 +1924,35 @@ export function DenFlowProvider({ children }: { children: ReactNode }) { }, [desktopAuthRequested, user?.id, authToken, desktopRedirectUrl, desktopRedirectBusy, desktopRedirectAttempted, desktopAuthScheme]); useEffect(() => { - if (!user || !onboardingPending) { - onboardingAutoLaunchKeyRef.current = null; - return; - } - - if (!billingSummary) { - return; - } + const decision = getOnboardingAutoLaunchDecision({ + userId: user?.id ?? null, + activeOrganizationId, + onboardingIntent, + billingSummary, + workersLoadedOnce, + ownedWorkerCount, + launchBusy, + currentAutoLaunchKey: onboardingAutoLaunchKeyRef.current, + }); - if (billingSummary.featureGateEnabled && !billingSummary.hasActivePlan) { + if (decision === "idle") { + onboardingAutoLaunchKeyRef.current = null; return; } - if (ownedWorkerCount > 0) { + if (decision === "complete_existing") { markOnboardingComplete(); return; } - if (launchBusy) { - return; - } - - const autoLaunchKey = `${user.id}:${onboardingIntent?.workerName ?? DEFAULT_WORKER_NAME}`; - if (onboardingAutoLaunchKeyRef.current === autoLaunchKey) { + if (decision !== "launch" || !user || !onboardingIntent) { return; } + const autoLaunchKey = `${activeOrganizationId}:${onboardingIntent.workerName || DEFAULT_WORKER_NAME}`; onboardingAutoLaunchKeyRef.current = autoLaunchKey; - markOnboardingComplete(); - }, [billingSummary?.featureGateEnabled, billingSummary?.hasActivePlan, launchBusy, onboardingIntent?.workerName, onboardingPending, ownedWorkerCount, user?.id]); + void launchWorker({ source: "signup_auto", workerNameOverride: onboardingIntent.workerName }); + }, [activeOrganizationId, billingSummary, launchBusy, onboardingIntent, onboardingPending, ownedWorkerCount, user?.id, workersLoadedOnce]); useEffect(() => { if (!user) {