feat(nodes): add loopNode - iterate over arrays with rate limiting#66
feat(nodes): add loopNode - iterate over arrays with rate limiting#66FelixNg1022 wants to merge 1 commit into
Conversation
Implements a loop primitive node with: - Sequential iteration by default - Optional parallel execution with configurable concurrency limit - Rate limiting via delayMs between iterations/batches - Per-item error handling with continueOnError option - Zod schemas for input/output validation - Unit tests for metadata, schema validation, and execution Closes wespreadjam#9
There was a problem hiding this comment.
Pull request overview
This PR introduces a new loopNode under packages/nodes intended to support iterating over arrays with configurable concurrency and delay, and wires it into the public exports and built-in node list.
Changes:
- Added a new
loopNodeimplementation with Zod input/output schemas. - Exported the node + schemas/types from the logic index and package root, and registered it in
builtInNodes. - Added initial Vitest unit tests for metadata and basic validation/execution paths.
Reviewed changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated 7 comments.
| File | Description |
|---|---|
| packages/nodes/src/logic/loop.ts | Adds the new loopNode implementation and schemas. |
| packages/nodes/src/logic/index.ts | Re-exports loopNode and its schemas/types from the logic barrel. |
| packages/nodes/src/logic/tests/loop.test.ts | Adds unit tests for loop node metadata and basic execution. |
| packages/nodes/src/index.ts | Re-exports loop node artifacts from the package root and adds it to builtInNodes. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| continueOnError: boolean | ||
| ): Promise<{ results: unknown[]; errors: LoopError[] }> { | ||
| const results: unknown[] = []; | ||
| const errors: LoopError[] = []; | ||
|
|
||
| for (let i = 0; i < items.length; i++) { | ||
| try { | ||
| results.push(items[i]); | ||
| } catch (err) { | ||
| const errorMsg = err instanceof Error ? err.message : String(err); | ||
| errors.push({ index: i, error: errorMsg }); | ||
| if (!continueOnError) { | ||
| break; | ||
| } | ||
| results.push(null); | ||
| } |
There was a problem hiding this comment.
processSequential currently just pushes the original items[i] into results. This operation cannot throw for normal arrays, so the try/catch, errors collection, and continueOnError behavior are effectively dead code. If this node is meant to execute per-item work (e.g., invoking child node execution), that processing callback needs to happen here so failures can be captured; otherwise the node should be simplified to remove the misleading error-handling paths.
| continueOnError: boolean | |
| ): Promise<{ results: unknown[]; errors: LoopError[] }> { | |
| const results: unknown[] = []; | |
| const errors: LoopError[] = []; | |
| for (let i = 0; i < items.length; i++) { | |
| try { | |
| results.push(items[i]); | |
| } catch (err) { | |
| const errorMsg = err instanceof Error ? err.message : String(err); | |
| errors.push({ index: i, error: errorMsg }); | |
| if (!continueOnError) { | |
| break; | |
| } | |
| results.push(null); | |
| } | |
| _continueOnError: boolean | |
| ): Promise<{ results: unknown[]; errors: LoopError[] }> { | |
| const results: unknown[] = []; | |
| const errors: LoopError[] = []; | |
| for (let i = 0; i < items.length; i++) { | |
| results.push(items[i]); |
| let stopped = false; | ||
|
|
||
| // Process in batches of `concurrency` | ||
| for (let batchStart = 0; batchStart < items.length; batchStart += concurrency) { | ||
| if (stopped) break; | ||
|
|
||
| const batchEnd = Math.min(batchStart + concurrency, items.length); | ||
| const batch = items.slice(batchStart, batchEnd); | ||
|
|
||
| const batchPromises = batch.map(async (item, batchIndex) => { | ||
| const globalIndex = batchStart + batchIndex; | ||
| try { | ||
| results[globalIndex] = item; | ||
| } catch (err) { | ||
| const errorMsg = err instanceof Error ? err.message : String(err); | ||
| errors.push({ index: globalIndex, error: errorMsg }); | ||
| if (!continueOnError) { | ||
| stopped = true; | ||
| } | ||
| } | ||
| }); | ||
|
|
||
| await Promise.all(batchPromises); | ||
|
|
||
| if (delayMs > 0 && batchEnd < items.length && !stopped) { |
There was a problem hiding this comment.
processConcurrent assigns results[globalIndex] = item inside a try/catch, but that assignment (and reading item) will not throw in typical usage. As a result, errors will never populate and continueOnError/stopped won't have any real effect. If per-item execution is intended, introduce an actual per-item processing step that can fail and be captured; otherwise remove this error-handling logic to avoid giving a false sense of resilience.
| let stopped = false; | |
| // Process in batches of `concurrency` | |
| for (let batchStart = 0; batchStart < items.length; batchStart += concurrency) { | |
| if (stopped) break; | |
| const batchEnd = Math.min(batchStart + concurrency, items.length); | |
| const batch = items.slice(batchStart, batchEnd); | |
| const batchPromises = batch.map(async (item, batchIndex) => { | |
| const globalIndex = batchStart + batchIndex; | |
| try { | |
| results[globalIndex] = item; | |
| } catch (err) { | |
| const errorMsg = err instanceof Error ? err.message : String(err); | |
| errors.push({ index: globalIndex, error: errorMsg }); | |
| if (!continueOnError) { | |
| stopped = true; | |
| } | |
| } | |
| }); | |
| await Promise.all(batchPromises); | |
| if (delayMs > 0 && batchEnd < items.length && !stopped) { | |
| void continueOnError; | |
| // Process in batches of `concurrency` | |
| for (let batchStart = 0; batchStart < items.length; batchStart += concurrency) { | |
| const batchEnd = Math.min(batchStart + concurrency, items.length); | |
| const batch = items.slice(batchStart, batchEnd); | |
| const batchPromises = batch.map(async (item, batchIndex) => { | |
| const globalIndex = batchStart + batchIndex; | |
| results[globalIndex] = item; | |
| }); | |
| await Promise.all(batchPromises); | |
| if (delayMs > 0 && batchEnd < items.length) { |
| /** | ||
| * Loop node - iterate over an array of items with rate limiting and concurrency control. | ||
| * | ||
| * Processes each item in the array, supporting both sequential and parallel execution. | ||
| * In a workflow context, the execution engine handles running child nodes for each item; | ||
| * this node manages the iteration, concurrency, delay, and error tracking. | ||
| * |
There was a problem hiding this comment.
The docstring/PR description says the loop node manages iteration while the execution engine runs child nodes for each item, but this implementation never invokes any child execution and simply echoes items back as results. This makes concurrency/rate limiting largely meaningless and doesn't meet the stated behavior of “executing child nodes for each item”. Either integrate with the runtime’s child-node execution mechanism (so results reflect per-item execution outputs) or update the node/PR description to reflect that this is currently a pass-through iterator.
| success: true, | ||
| output: { | ||
| results: [], | ||
| errors: [], |
There was a problem hiding this comment.
In the empty-items fast path, errors is returned as an empty array even though the schema marks it as optional and the non-empty path omits it when there are no errors. For consistency (and to match the documented output shape), return errors: undefined (or omit the property) when there are no errors.
| errors: [], | |
| errors: undefined, |
| it('should process items sequentially', async () => { | ||
| const mockContext = { | ||
| userId: 'test', | ||
| workflowExecutionId: 'test', | ||
| credentials: {}, | ||
| variables: {}, | ||
| interpolate: (s: string) => s, | ||
| evaluateJsonPath: (s: string) => s, | ||
| }; | ||
|
|
||
| const result = await loopNode.executor( | ||
| { items: ['a', 'b', 'c'], concurrency: 1, delayMs: 0, continueOnError: false }, | ||
| mockContext as any | ||
| ); | ||
|
|
||
| expect(result.success).toBe(true); | ||
| expect(result.output?.results).toEqual(['a', 'b', 'c']); | ||
| }); | ||
|
|
||
| it('should process items concurrently', async () => { | ||
| const mockContext = { | ||
| userId: 'test', | ||
| workflowExecutionId: 'test', | ||
| credentials: {}, | ||
| variables: {}, | ||
| interpolate: (s: string) => s, | ||
| evaluateJsonPath: (s: string) => s, | ||
| }; | ||
|
|
||
| const result = await loopNode.executor( | ||
| { items: [1, 2, 3, 4, 5], concurrency: 3, delayMs: 0, continueOnError: false }, | ||
| mockContext as any | ||
| ); | ||
|
|
||
| expect(result.success).toBe(true); | ||
| expect(result.output?.results).toEqual([1, 2, 3, 4, 5]); | ||
| }); |
There was a problem hiding this comment.
These tests cover metadata and basic schema defaults, but they don’t exercise key behavior introduced by this node: delayMs timing, continueOnError/error accumulation, or that concurrency batching preserves order when work is actually performed. Add unit tests that (1) simulate a per-item failure and assert errors + results placeholder behavior, and (2) use fake timers to assert delays are applied between iterations/batches.
| @@ -0,0 +1,99 @@ | |||
| import { describe, it, expect } from 'vitest'; | |||
| import { loopNode, LoopInputSchema } from '../loop'; | |||
There was a problem hiding this comment.
Test imports should follow the project’s ESM convention of using .js extensions (see webhook-trigger.test.ts). import { loopNode, LoopInputSchema } from '../loop'; is inconsistent and can break under strict ESM resolution. Update to import from ../loop.js.
| import { loopNode, LoopInputSchema } from '../loop'; | |
| import { loopNode, LoopInputSchema } from '../loop.js'; |
| const mockContext = { | ||
| userId: 'test', | ||
| workflowExecutionId: 'test', | ||
| credentials: {}, | ||
| variables: {}, | ||
| interpolate: (s: string) => s, | ||
| evaluateJsonPath: (s: string) => s, | ||
| }; |
There was a problem hiding this comment.
The mocked execution context here doesn’t match NodeExecutionContext (it provides interpolate/evaluateJsonPath but omits resolveNestedPath) and then is cast to any. This weakens the tests because they won’t fail if loopNode later starts relying on resolveNestedPath like other logic nodes do. Prefer a minimal, correctly-shaped context (include resolveNestedPath) and avoid as any where possible.
Summary
loopNodeprimitive for iterating over arrays with rate limiting and concurrency controldelayMsbetween iterations/batchescontinueOnErroroptionCloses #9
Input/Output Schema
Input:
Output:
Test plan
pnpm testpasses — new unit tests for loopNode