Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 3 additions & 38 deletions src/main/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { app, BrowserWindow, ipcMain, safeStorage, screen } from 'electron'
import { app, BrowserWindow, safeStorage, screen } from 'electron'
import { join } from 'path'
import { readFileSync } from 'fs'
import { createPtyManager, type PtyManager } from './pty-manager'
Expand All @@ -13,7 +13,6 @@ import type { WorkflowEngine } from './workflow-engine'
import { createWorktreeManager, type WorktreeManager } from './worktree-manager'
import { createWslGitPort } from './git-port'
import { createCostTracker, type CostTracker } from './cost-tracker'
import { SAFE_ID_RE } from './validation'

/** Read persisted theme at startup to match BrowserWindow background to the active theme */
const THEME_BG0: Record<string, string> = {
Expand Down Expand Up @@ -48,6 +47,7 @@ import {
registerSkillHandlers,
registerWorktreeHandlers,
registerHomeHandlers,
registerCostHandlers,
costHistory,
reviewTracker,
} from './ipc'
Expand Down Expand Up @@ -266,42 +266,7 @@ app
costTracker = createCostTracker(mainWindow, [createClaudeAdapter(), createCodexAdapter()])
}

ipcMain.handle(
'cost:bind',
(
_,
sessionId: string,
opts: { agent: string; projectPath: string; cwd: string; spawnAt: number },
) => {
// R3-01: Validate sessionId with SAFE_ID_RE consistent with all other IPC handlers
if (typeof sessionId !== 'string' || !SAFE_ID_RE.test(sessionId)) {
throw new Error('cost:bind requires a valid sessionId')
}
if (!opts || typeof opts !== 'object') {
throw new Error('cost:bind requires an opts object')
}
if (typeof opts.agent !== 'string' || !opts.agent) {
throw new Error('cost:bind requires a non-empty agent')
}
if (typeof opts.cwd !== 'string' || !opts.cwd) {
throw new Error('cost:bind requires a non-empty cwd')
}
// R2-23: Validate spawnAt and projectPath types
if (typeof opts.spawnAt !== 'number' || !Number.isFinite(opts.spawnAt)) {
throw new Error('cost:bind requires a finite numeric spawnAt')
}
if (opts.projectPath !== undefined && typeof opts.projectPath !== 'string') {
throw new Error('cost:bind requires a string projectPath')
}
costTracker?.bindSession(sessionId, opts)
},
)
ipcMain.handle('cost:unbind', (_, sessionId: string) => {
if (typeof sessionId !== 'string' || !SAFE_ID_RE.test(sessionId)) {
throw new Error('cost:unbind requires a valid sessionId')
}
costTracker?.unbindSession(sessionId)
})
registerCostHandlers(() => costTracker)

// Warn renderer if encryption is unavailable (secrets stored as plaintext)
if (!safeStorage.isEncryptionAvailable() && mainWindow) {
Expand Down
1 change: 1 addition & 0 deletions src/main/ipc/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,4 @@ export { registerUtilHandlers } from './ipc-utils'
export { registerSkillHandlers } from './ipc-skills'
export { registerWorktreeHandlers } from './ipc-worktree'
export { registerHomeHandlers, costHistory, reviewTracker } from './ipc-home'
export { registerCostHandlers } from './ipc-cost'
44 changes: 44 additions & 0 deletions src/main/ipc/ipc-cost.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
import { ipcMain } from 'electron'
import { SAFE_ID_RE } from '../validation'
import type { CostTracker } from '../cost-tracker'

/**
* Cost IPC handlers: bind/unbind session cost tracking.
*/
export function registerCostHandlers(getCostTracker: () => CostTracker | null): void {
ipcMain.handle(
'cost:bind',
(
_,
sessionId: string,
opts: { agent: string; projectPath: string; cwd: string; spawnAt: number },
) => {
if (typeof sessionId !== 'string' || !SAFE_ID_RE.test(sessionId)) {
throw new Error('cost:bind requires a valid sessionId')
}
if (!opts || typeof opts !== 'object') {
throw new Error('cost:bind requires an opts object')
}
if (typeof opts.agent !== 'string' || !opts.agent) {
throw new Error('cost:bind requires a non-empty agent')
}
if (typeof opts.cwd !== 'string' || !opts.cwd) {
throw new Error('cost:bind requires a non-empty cwd')
}
if (typeof opts.spawnAt !== 'number' || !Number.isFinite(opts.spawnAt)) {
throw new Error('cost:bind requires a finite numeric spawnAt')
}
if (opts.projectPath !== undefined && typeof opts.projectPath !== 'string') {
throw new Error('cost:bind requires a string projectPath')
}
getCostTracker()?.bindSession(sessionId, opts)
},
)

ipcMain.handle('cost:unbind', (_, sessionId: string) => {
if (typeof sessionId !== 'string' || !SAFE_ID_RE.test(sessionId)) {
throw new Error('cost:unbind requires a valid sessionId')
}
getCostTracker()?.unbindSession(sessionId)
})
}
231 changes: 124 additions & 107 deletions src/main/workflow-engine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -189,106 +189,12 @@ export function createWorkflowEngine(
return 'false'
}

// ── Process a single node ──────────────────────────────────────
async function processNode(
// ── Retry-and-record helper ────────────────────────────────────
/** Execute a non-condition node with retry logic and record the result. */
async function executeWithRetry(
node: WorkflowNode,
scheduler: ReturnType<typeof createScheduler>,
loopEdgesByCondition: Map<string, WorkflowEdge[]>,
loopCounters: Map<string, number>,
): Promise<void> {
if (stopped) return

// Condition nodes: evaluate inline, no process spawned
if (node.type === 'condition') {
const condStartTime = Date.now()
nodeExecCount.set(node.id, (nodeExecCount.get(node.id) ?? 0) + 1)

push(workflow.id, {
type: 'node:started',
workflowId: workflow.id,
nodeId: node.id,
message: `Evaluating ${node.name}`,
})

const branch = evaluateCondition(node)
push(workflow.id, {
type: 'node:done',
workflowId: workflow.id,
nodeId: node.id,
message: `Condition: ${branch}`,
branch,
})
scheduler.resolveCondition(node.id, branch)

const condFinishTime = Date.now()
const condNodeRun: WorkflowNodeRun = {
nodeId: node.id,
nodeName: node.name,
status: 'done',
startedAt: condStartTime,
finishedAt: condFinishTime,
durationMs: condFinishTime - condStartTime,
branchTaken: branch,
}
const condExecN = nodeExecCount.get(node.id) ?? 1
if (condExecN > 1) condNodeRun.loopIterations = condExecN
recorder.recordNode(condNodeRun)

// Handle loop edges
const condLoops = loopEdgesByCondition.get(node.id) ?? []
for (const le of condLoops) {
if (le.branch === branch) {
const count = (loopCounters.get(le.id) ?? 0) + 1
loopCounters.set(le.id, count)
if (count <= (le.maxIterations ?? 1)) {
push(workflow.id, {
type: 'node:loopIteration',
workflowId: workflow.id,
nodeId: node.id,
iteration: count,
maxIterations: le.maxIterations,
message: `Loop iteration ${String(count)}/${String(le.maxIterations)}`,
})
const resetIds = scheduler.resetLoopSubgraph(le.toNodeId, node.id)
// REL-7: Clear loop counters for inner loop edges within the reset subgraph
// so nested loops restart correctly on each outer iteration.
// BUG-5/CDX-5: Also check toNodeId is in resetIds — prevents sibling loop
// edges from the same condition node from having their counters cleared
for (const innerLoops of loopEdgesByCondition.values()) {
for (const innerLe of innerLoops) {
if (
innerLe.id !== le.id &&
resetIds.has(innerLe.fromNodeId) &&
resetIds.has(innerLe.toNodeId)
) {
loopCounters.delete(innerLe.id)
}
}
}
// PERF-4: Clear output maps for re-executing nodes to prevent unbounded growth
for (const nid of resetIds) {
nodeOutputs.delete(nid)
conditionOutputs.delete(nid)
}
}
}
}
return
}

// Build context summary from upstream node outputs
const upstreamEdges = workflow.edges.filter(
(e) => e.toNodeId === node.id && e.edgeType !== 'loop',
)
const contextSummary = upstreamEdges
.map((e) => {
const out = nodeOutputs.get(e.fromNodeId)
return out ? `[${e.fromNodeId}]: ${out.slice(-4000)}` : ''
})
.filter(Boolean)
.join('\n\n')

// Run with retry
contextSummary: string,
): Promise<'success' | 'failed' | 'stopped'> {
const maxAttempts = (node.retryCount ?? 0) + 1
const retryDelay = node.retryDelayMs ?? 2000
let lastError: Error | undefined
Expand All @@ -297,7 +203,7 @@ export function createWorkflowEngine(
nodeExecCount.set(node.id, (nodeExecCount.get(node.id) ?? 0) + 1)

for (let attempt = 1; attempt <= maxAttempts; attempt++) {
if (stopped) return
if (stopped) return 'stopped'
if (attempt > 1) {
push(workflow.id, {
type: 'node:retry',
Expand Down Expand Up @@ -337,7 +243,7 @@ export function createWorkflowEngine(
message: node.message ?? 'Waiting for user to continue...',
})
await onCheckpoint(node.id)
if (stopped) return
if (stopped) return 'stopped'
push(workflow.id, {
type: 'node:resumed',
workflowId: workflow.id,
Expand All @@ -354,7 +260,6 @@ export function createWorkflowEngine(
nodeId: node.id,
message: `${node.name} completed`,
})
scheduler.completeNode(node.id)

// Record success in run history
const doneTime = Date.now()
Expand All @@ -371,7 +276,7 @@ export function createWorkflowEngine(
if (execN > 1) doneNodeRun.loopIterations = execN
recorder.recordNode(doneNodeRun)

return // success, no more retries
return 'success'
} catch (err) {
runningNodeIds.delete(node.id)
lastError = err instanceof Error ? err : new Error(String(err))
Expand Down Expand Up @@ -407,11 +312,123 @@ export function createWorkflowEngine(
if (errExecN > 1) errNodeRun.loopIterations = errExecN
recorder.recordNode(errNodeRun)

if (node.continueOnError) {
scheduler.completeNode(node.id) // treat as done for scheduling
return 'failed'
}

// ── Process a single node ──────────────────────────────────────
async function processNode(
node: WorkflowNode,
scheduler: ReturnType<typeof createScheduler>,
loopEdgesByCondition: Map<string, WorkflowEdge[]>,
loopCounters: Map<string, number>,
): Promise<void> {
if (stopped) return

// Condition nodes: evaluate inline, no process spawned
if (node.type === 'condition') {
const condStartTime = Date.now()
nodeExecCount.set(node.id, (nodeExecCount.get(node.id) ?? 0) + 1)

push(workflow.id, {
type: 'node:started',
workflowId: workflow.id,
nodeId: node.id,
message: `Evaluating ${node.name}`,
})

const branch = evaluateCondition(node)
push(workflow.id, {
type: 'node:done',
workflowId: workflow.id,
nodeId: node.id,
message: `Condition: ${branch}`,
branch,
})
scheduler.resolveCondition(node.id, branch)

const condFinishTime = Date.now()
const condNodeRun: WorkflowNodeRun = {
nodeId: node.id,
nodeName: node.name,
status: 'done',
startedAt: condStartTime,
finishedAt: condFinishTime,
durationMs: condFinishTime - condStartTime,
branchTaken: branch,
}
const condExecN = nodeExecCount.get(node.id) ?? 1
if (condExecN > 1) condNodeRun.loopIterations = condExecN
recorder.recordNode(condNodeRun)

// Handle loop edges
const condLoops = loopEdgesByCondition.get(node.id) ?? []
for (const le of condLoops) {
if (le.branch === branch) {
const count = (loopCounters.get(le.id) ?? 0) + 1
loopCounters.set(le.id, count)
if (count <= (le.maxIterations ?? 1)) {
push(workflow.id, {
type: 'node:loopIteration',
workflowId: workflow.id,
nodeId: node.id,
iteration: count,
maxIterations: le.maxIterations,
message: `Loop iteration ${String(count)}/${String(le.maxIterations)}`,
})
const resetIds = scheduler.resetLoopSubgraph(le.toNodeId, node.id)
// REL-7: Clear loop counters for inner loop edges within the reset subgraph
// so nested loops restart correctly on each outer iteration.
// BUG-5/CDX-5: Also check toNodeId is in resetIds — prevents sibling loop
// edges from the same condition node from having their counters cleared
for (const innerLoops of loopEdgesByCondition.values()) {
for (const innerLe of innerLoops) {
if (
innerLe.id !== le.id &&
resetIds.has(innerLe.fromNodeId) &&
resetIds.has(innerLe.toNodeId)
) {
loopCounters.delete(innerLe.id)
}
}
}
// PERF-4: Clear output maps for re-executing nodes to prevent unbounded growth
for (const nid of resetIds) {
nodeOutputs.delete(nid)
conditionOutputs.delete(nid)
}
}
}
}
return
}

// Build context summary from upstream node outputs
const upstreamEdges = workflow.edges.filter(
(e) => e.toNodeId === node.id && e.edgeType !== 'loop',
)
const contextSummary = upstreamEdges
.map((e) => {
const out = nodeOutputs.get(e.fromNodeId)
return out ? `[${e.fromNodeId}]: ${out.slice(-4000)}` : ''
})
.filter(Boolean)
.join('\n\n')

// Run with retry, record result
const result = await executeWithRetry(node, contextSummary)

if (result === 'stopped') return

if (result === 'success') {
scheduler.completeNode(node.id)
} else {
scheduler.failNode(node.id)
stopped = true
// result === 'failed'
if (node.continueOnError) {
scheduler.completeNode(node.id) // treat as done for scheduling
} else {
scheduler.failNode(node.id)
stopped = true
}
}
}

Expand Down
Loading