@@ -189,106 +189,12 @@ export function createWorkflowEngine(
189189 return 'false'
190190 }
191191
192- // ── Process a single node ──────────────────────────────────────
193- async function processNode (
192+ // ── Retry-and-record helper ────────────────────────────────────
193+ /** Execute a non-condition node with retry logic and record the result. */
194+ async function executeWithRetry (
194195 node : WorkflowNode ,
195- scheduler : ReturnType < typeof createScheduler > ,
196- loopEdgesByCondition : Map < string , WorkflowEdge [ ] > ,
197- loopCounters : Map < string , number > ,
198- ) : Promise < void > {
199- if ( stopped ) return
200-
201- // Condition nodes: evaluate inline, no process spawned
202- if ( node . type === 'condition' ) {
203- const condStartTime = Date . now ( )
204- nodeExecCount . set ( node . id , ( nodeExecCount . get ( node . id ) ?? 0 ) + 1 )
205-
206- push ( workflow . id , {
207- type : 'node:started' ,
208- workflowId : workflow . id ,
209- nodeId : node . id ,
210- message : `Evaluating ${ node . name } ` ,
211- } )
212-
213- const branch = evaluateCondition ( node )
214- push ( workflow . id , {
215- type : 'node:done' ,
216- workflowId : workflow . id ,
217- nodeId : node . id ,
218- message : `Condition: ${ branch } ` ,
219- branch,
220- } )
221- scheduler . resolveCondition ( node . id , branch )
222-
223- const condFinishTime = Date . now ( )
224- const condNodeRun : WorkflowNodeRun = {
225- nodeId : node . id ,
226- nodeName : node . name ,
227- status : 'done' ,
228- startedAt : condStartTime ,
229- finishedAt : condFinishTime ,
230- durationMs : condFinishTime - condStartTime ,
231- branchTaken : branch ,
232- }
233- const condExecN = nodeExecCount . get ( node . id ) ?? 1
234- if ( condExecN > 1 ) condNodeRun . loopIterations = condExecN
235- recorder . recordNode ( condNodeRun )
236-
237- // Handle loop edges
238- const condLoops = loopEdgesByCondition . get ( node . id ) ?? [ ]
239- for ( const le of condLoops ) {
240- if ( le . branch === branch ) {
241- const count = ( loopCounters . get ( le . id ) ?? 0 ) + 1
242- loopCounters . set ( le . id , count )
243- if ( count <= ( le . maxIterations ?? 1 ) ) {
244- push ( workflow . id , {
245- type : 'node:loopIteration' ,
246- workflowId : workflow . id ,
247- nodeId : node . id ,
248- iteration : count ,
249- maxIterations : le . maxIterations ,
250- message : `Loop iteration ${ String ( count ) } /${ String ( le . maxIterations ) } ` ,
251- } )
252- const resetIds = scheduler . resetLoopSubgraph ( le . toNodeId , node . id )
253- // REL-7: Clear loop counters for inner loop edges within the reset subgraph
254- // so nested loops restart correctly on each outer iteration.
255- // BUG-5/CDX-5: Also check toNodeId is in resetIds — prevents sibling loop
256- // edges from the same condition node from having their counters cleared
257- for ( const innerLoops of loopEdgesByCondition . values ( ) ) {
258- for ( const innerLe of innerLoops ) {
259- if (
260- innerLe . id !== le . id &&
261- resetIds . has ( innerLe . fromNodeId ) &&
262- resetIds . has ( innerLe . toNodeId )
263- ) {
264- loopCounters . delete ( innerLe . id )
265- }
266- }
267- }
268- // PERF-4: Clear output maps for re-executing nodes to prevent unbounded growth
269- for ( const nid of resetIds ) {
270- nodeOutputs . delete ( nid )
271- conditionOutputs . delete ( nid )
272- }
273- }
274- }
275- }
276- return
277- }
278-
279- // Build context summary from upstream node outputs
280- const upstreamEdges = workflow . edges . filter (
281- ( e ) => e . toNodeId === node . id && e . edgeType !== 'loop' ,
282- )
283- const contextSummary = upstreamEdges
284- . map ( ( e ) => {
285- const out = nodeOutputs . get ( e . fromNodeId )
286- return out ? `[${ e . fromNodeId } ]: ${ out . slice ( - 4000 ) } ` : ''
287- } )
288- . filter ( Boolean )
289- . join ( '\n\n' )
290-
291- // Run with retry
196+ contextSummary : string ,
197+ ) : Promise < 'success' | 'failed' | 'stopped' > {
292198 const maxAttempts = ( node . retryCount ?? 0 ) + 1
293199 const retryDelay = node . retryDelayMs ?? 2000
294200 let lastError : Error | undefined
@@ -297,7 +203,7 @@ export function createWorkflowEngine(
297203 nodeExecCount . set ( node . id , ( nodeExecCount . get ( node . id ) ?? 0 ) + 1 )
298204
299205 for ( let attempt = 1 ; attempt <= maxAttempts ; attempt ++ ) {
300- if ( stopped ) return
206+ if ( stopped ) return 'stopped'
301207 if ( attempt > 1 ) {
302208 push ( workflow . id , {
303209 type : 'node:retry' ,
@@ -337,7 +243,7 @@ export function createWorkflowEngine(
337243 message : node . message ?? 'Waiting for user to continue...' ,
338244 } )
339245 await onCheckpoint ( node . id )
340- if ( stopped ) return
246+ if ( stopped ) return 'stopped'
341247 push ( workflow . id , {
342248 type : 'node:resumed' ,
343249 workflowId : workflow . id ,
@@ -354,7 +260,6 @@ export function createWorkflowEngine(
354260 nodeId : node . id ,
355261 message : `${ node . name } completed` ,
356262 } )
357- scheduler . completeNode ( node . id )
358263
359264 // Record success in run history
360265 const doneTime = Date . now ( )
@@ -371,7 +276,7 @@ export function createWorkflowEngine(
371276 if ( execN > 1 ) doneNodeRun . loopIterations = execN
372277 recorder . recordNode ( doneNodeRun )
373278
374- return // success, no more retries
279+ return 'success'
375280 } catch ( err ) {
376281 runningNodeIds . delete ( node . id )
377282 lastError = err instanceof Error ? err : new Error ( String ( err ) )
@@ -407,11 +312,123 @@ export function createWorkflowEngine(
407312 if ( errExecN > 1 ) errNodeRun . loopIterations = errExecN
408313 recorder . recordNode ( errNodeRun )
409314
410- if ( node . continueOnError ) {
411- scheduler . completeNode ( node . id ) // treat as done for scheduling
315+ return 'failed'
316+ }
317+
318+ // ── Process a single node ──────────────────────────────────────
// Runs one workflow node and reports the outcome to the scheduler.
//   - Does nothing when the shared `stopped` flag is already set.
//   - 'condition' nodes are evaluated inline via evaluateCondition(): the chosen
//     branch is published, passed to scheduler.resolveCondition(), recorded, and
//     then the loop edges registered for this node under that branch are handled.
//   - Every other node is run through executeWithRetry() with a context summary
//     built from upstream outputs; the returned status selects between
//     scheduler.completeNode() and scheduler.failNode().
// Parameters:
//   node                 - the node to process
//   scheduler            - receives the resolve / complete / fail / reset calls
//   loopEdgesByCondition - loop edges, looked up here by the condition node's id
//   loopCounters         - per-loop-edge counters keyed by edge id (mutated here)
// Resolves with no value; results are communicated through push(), the
// recorder, the scheduler and the `stopped` flag.
319+ async function processNode (
320+ node : WorkflowNode ,
321+ scheduler : ReturnType < typeof createScheduler > ,
322+ loopEdgesByCondition : Map < string , WorkflowEdge [ ] > ,
323+ loopCounters : Map < string , number > ,
324+ ) : Promise < void > {
// Once the stopped flag is set, no further nodes are processed.
325+ if ( stopped ) return
326+
327+ // Condition nodes: evaluate inline, no process spawned
328+ if ( node . type === 'condition' ) {
329+ const condStartTime = Date . now ( )
// Bump this node's execution counter (treated as 0 when no entry exists yet).
330+ nodeExecCount . set ( node . id , ( nodeExecCount . get ( node . id ) ?? 0 ) + 1 )
331+
332+ push ( workflow . id , {
333+ type : 'node:started' ,
334+ workflowId : workflow . id ,
335+ nodeId : node . id ,
336+ message : `Evaluating ${ node . name } ` ,
337+ } )
338+
339+ const branch = evaluateCondition ( node )
340+ push ( workflow . id , {
341+ type : 'node:done' ,
342+ workflowId : workflow . id ,
343+ nodeId : node . id ,
344+ message : `Condition: ${ branch } ` ,
345+ branch,
346+ } )
// Hand the evaluated branch to the scheduler before recording and loop handling.
347+ scheduler . resolveCondition ( node . id , branch )
348+
349+ const condFinishTime = Date . now ( )
350+ const condNodeRun : WorkflowNodeRun = {
351+ nodeId : node . id ,
352+ nodeName : node . name ,
353+ status : 'done' ,
354+ startedAt : condStartTime ,
355+ finishedAt : condFinishTime ,
356+ durationMs : condFinishTime - condStartTime ,
357+ branchTaken : branch ,
358+ }
// loopIterations is only attached when this node has executed more than once.
359+ const condExecN = nodeExecCount . get ( node . id ) ?? 1
360+ if ( condExecN > 1 ) condNodeRun . loopIterations = condExecN
361+ recorder . recordNode ( condNodeRun )
362+
363+ // Handle loop edges
364+ const condLoops = loopEdgesByCondition . get ( node . id ) ?? [ ]
365+ for ( const le of condLoops ) {
// Only loop edges attached to the branch that was just taken are considered.
366+ if ( le . branch === branch ) {
// The counter is incremented first, then compared against the limit;
// a missing maxIterations is treated as a limit of 1.
367+ const count = ( loopCounters . get ( le . id ) ?? 0 ) + 1
368+ loopCounters . set ( le . id , count )
369+ if ( count <= ( le . maxIterations ?? 1 ) ) {
// NOTE(review): the event field and message below use le.maxIterations
// directly, so an edge without maxIterations yields "…/undefined" even
// though the check above defaults the limit to 1 — confirm whether the
// default should be reported here as well.
370+ push ( workflow . id , {
371+ type : 'node:loopIteration' ,
372+ workflowId : workflow . id ,
373+ nodeId : node . id ,
374+ iteration : count ,
375+ maxIterations : le . maxIterations ,
376+ message : `Loop iteration ${ String ( count ) } /${ String ( le . maxIterations ) } ` ,
377+ } )
// resetIds is the collection of node ids returned by the scheduler for this
// loop reset; it is queried with has() and iterated below.
378+ const resetIds = scheduler . resetLoopSubgraph ( le . toNodeId , node . id )
379+ // REL-7: Clear loop counters for inner loop edges within the reset subgraph
380+ // so nested loops restart correctly on each outer iteration.
381+ // BUG-5/CDX-5: Also check toNodeId is in resetIds — prevents sibling loop
382+ // edges from the same condition node from having their counters cleared
383+ for ( const innerLoops of loopEdgesByCondition . values ( ) ) {
384+ for ( const innerLe of innerLoops ) {
// The edge currently being iterated (le) keeps its counter; only other
// loop edges with both endpoints inside resetIds are cleared.
385+ if (
386+ innerLe . id !== le . id &&
387+ resetIds . has ( innerLe . fromNodeId ) &&
388+ resetIds . has ( innerLe . toNodeId )
389+ ) {
390+ loopCounters . delete ( innerLe . id )
391+ }
392+ }
393+ }
394+ // PERF-4: Clear output maps for re-executing nodes to prevent unbounded growth
395+ for ( const nid of resetIds ) {
396+ nodeOutputs . delete ( nid )
397+ conditionOutputs . delete ( nid )
398+ }
399+ }
400+ }
401+ }
// Condition nodes finish here and never reach the retry path below.
402+ return
403+ }
404+
405+ // Build context summary from upstream node outputs
// Upstream edges are those pointing at this node, excluding edges of type 'loop'.
406+ const upstreamEdges = workflow . edges . filter (
407+ ( e ) => e . toNodeId === node . id && e . edgeType !== 'loop' ,
408+ )
// Each upstream output contributes at most its last 4000 characters, labelled
// with the source node id; missing or empty outputs are dropped before joining.
409+ const contextSummary = upstreamEdges
410+ . map ( ( e ) => {
411+ const out = nodeOutputs . get ( e . fromNodeId )
412+ return out ? `[${ e . fromNodeId } ]: ${ out . slice ( - 4000 ) } ` : ''
413+ } )
414+ . filter ( Boolean )
415+ . join ( '\n\n' )
416+
417+ // Run with retry, record result
418+ const result = await executeWithRetry ( node , contextSummary )
419+
// A 'stopped' result returns without notifying the scheduler.
420+ if ( result === 'stopped' ) return
421+
422+ if ( result === 'success' ) {
423+ scheduler . completeNode ( node . id )
412424 } else {
413- scheduler . failNode ( node . id )
414- stopped = true
425+ // result === 'failed'
426+ if ( node . continueOnError ) {
427+ scheduler . completeNode ( node . id ) // treat as done for scheduling
428+ } else {
// Without continueOnError the node is marked failed and the shared
// stopped flag is set.
429+ scheduler . failNode ( node . id )
430+ stopped = true
431+ }
415432 }
416433 }
417434
0 commit comments