Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions packages/nodes/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ export {
endNode,
delayNode,
webhookTriggerNode,
loopNode,
ConditionalInputSchema,
ConditionalOutputSchema,
ConditionSchema,
Expand All @@ -14,6 +15,9 @@ export {
DelayOutputSchema,
WebhookTriggerInputSchema,
WebhookTriggerOutputSchema,
LoopInputSchema,
LoopOutputSchema,
LoopErrorSchema,
} from './logic/index.js'

export type {
Expand All @@ -27,6 +31,9 @@ export type {
DelayOutput,
WebhookTriggerInput,
WebhookTriggerOutput,
LoopInput,
LoopOutput,
LoopError,
} from './logic/index.js'

// Transform nodes
Expand Down Expand Up @@ -311,6 +318,7 @@ import { conditionalNode } from './logic/index.js'
import { endNode } from './logic/index.js'
import { delayNode } from './logic/index.js'
import { webhookTriggerNode } from './logic/index.js'
import { loopNode } from './logic/index.js'
import { mapNode, filterNode, sortNode } from './transform/index.js'
import { httpRequestNode, breadNode } from './examples/index.js'
import {
Expand Down Expand Up @@ -364,6 +372,7 @@ export const builtInNodes = [
endNode,
delayNode,
webhookTriggerNode,
loopNode,
// Transform
mapNode,
filterNode,
Expand Down
99 changes: 99 additions & 0 deletions packages/nodes/src/logic/__tests__/loop.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
import { describe, it, expect } from 'vitest';
import { loopNode, LoopInputSchema } from '../loop';

Copilot AI Apr 3, 2026

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Test imports should follow the project’s ESM convention of using .js extensions (see webhook-trigger.test.ts). import { loopNode, LoopInputSchema } from '../loop'; is inconsistent and can break under strict ESM resolution. Update to import from ../loop.js.

Suggested change
import { loopNode, LoopInputSchema } from '../loop';
import { loopNode, LoopInputSchema } from '../loop.js';

Copilot uses AI. Check for mistakes.

describe('loopNode', () => {
it('should have correct metadata', () => {
expect(loopNode.type).toBe('loop');
expect(loopNode.category).toBe('logic');
expect(loopNode.name).toBe('Loop');
});

it('should validate valid input', () => {
const result = LoopInputSchema.safeParse({
items: [1, 2, 3],
});
expect(result.success).toBe(true);
});

it('should apply defaults', () => {
const result = LoopInputSchema.parse({
items: [1, 2, 3],
});
expect(result.concurrency).toBe(1);
expect(result.delayMs).toBe(0);
expect(result.continueOnError).toBe(false);
});

it('should reject invalid concurrency', () => {
const result = LoopInputSchema.safeParse({
items: [1],
concurrency: 0,
});
expect(result.success).toBe(false);
});

it('should reject negative delay', () => {
const result = LoopInputSchema.safeParse({
items: [1],
delayMs: -1,
});
expect(result.success).toBe(false);
});

it('should handle empty array', async () => {
const mockContext = {
userId: 'test',
workflowExecutionId: 'test',
credentials: {},
variables: {},
interpolate: (s: string) => s,
evaluateJsonPath: (s: string) => s,
};
Comment on lines +44 to +51

Copilot AI Apr 3, 2026

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The mocked execution context here doesn’t match NodeExecutionContext (it provides interpolate/evaluateJsonPath but omits resolveNestedPath) and then is cast to any. This weakens the tests because they won’t fail if loopNode later starts relying on resolveNestedPath like other logic nodes do. Prefer a minimal, correctly-shaped context (include resolveNestedPath) and avoid as any where possible.

Copilot uses AI. Check for mistakes.

const result = await loopNode.executor(
{ items: [], concurrency: 1, delayMs: 0, continueOnError: false },
mockContext as any
);

expect(result.success).toBe(true);
expect(result.output?.results).toEqual([]);
});

it('should process items sequentially', async () => {
const mockContext = {
userId: 'test',
workflowExecutionId: 'test',
credentials: {},
variables: {},
interpolate: (s: string) => s,
evaluateJsonPath: (s: string) => s,
};

const result = await loopNode.executor(
{ items: ['a', 'b', 'c'], concurrency: 1, delayMs: 0, continueOnError: false },
mockContext as any
);

expect(result.success).toBe(true);
expect(result.output?.results).toEqual(['a', 'b', 'c']);
});

it('should process items concurrently', async () => {
const mockContext = {
userId: 'test',
workflowExecutionId: 'test',
credentials: {},
variables: {},
interpolate: (s: string) => s,
evaluateJsonPath: (s: string) => s,
};

const result = await loopNode.executor(
{ items: [1, 2, 3, 4, 5], concurrency: 3, delayMs: 0, continueOnError: false },
mockContext as any
);

expect(result.success).toBe(true);
expect(result.output?.results).toEqual([1, 2, 3, 4, 5]);
});
Comment on lines +62 to +98

Copilot AI Apr 3, 2026

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These tests cover metadata and basic schema defaults, but they don’t exercise key behavior introduced by this node: delayMs timing, continueOnError/error accumulation, or that concurrency batching preserves order when work is actually performed. Add unit tests that (1) simulate a per-item failure and assert errors + results placeholder behavior, and (2) use fake timers to assert delays are applied between iterations/batches.

Copilot uses AI. Check for mistakes.
});
4 changes: 3 additions & 1 deletion packages/nodes/src/logic/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,6 @@ export { webhookTriggerNode } from './webhook-trigger.js';
export { WebhookTriggerInputSchema, WebhookTriggerOutputSchema } from './webhook-trigger.js';
export type { WebhookTriggerInput, WebhookTriggerOutput } from './webhook-trigger.js';


export { loopNode } from './loop.js';
export type { LoopInput, LoopOutput, LoopError } from './loop.js';
export { LoopInputSchema, LoopOutputSchema, LoopErrorSchema } from './loop.js';
192 changes: 192 additions & 0 deletions packages/nodes/src/logic/loop.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,192 @@
import { z } from 'zod';
import { defineNode } from '@jam-nodes/core';

/**
* Input schema for loop node
*/
export const LoopInputSchema = z.object({
/** Array of items to iterate over */
items: z.array(z.unknown()),
/** Max parallel executions (default: 1 for sequential) */
concurrency: z.number().min(1).max(100).default(1),
/** Delay between iterations in milliseconds */
delayMs: z.number().min(0).max(60000).default(0),
/** Continue processing remaining items if one fails */
continueOnError: z.boolean().default(false),
});

export type LoopInput = z.infer<typeof LoopInputSchema>;

/**
* Error entry for a failed iteration
*/
export const LoopErrorSchema = z.object({
index: z.number(),
error: z.string(),
});

export type LoopError = z.infer<typeof LoopErrorSchema>;

/**
* Output schema for loop node
*/
export const LoopOutputSchema = z.object({
results: z.array(z.unknown()),
errors: z.array(LoopErrorSchema).optional(),
});

export type LoopOutput = z.infer<typeof LoopOutputSchema>;

/**
* Sleep utility for rate limiting between iterations
*/
function sleep(ms: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, ms));
}

/**
* Process items sequentially with optional delay
*/
async function processSequential(
items: unknown[],
delayMs: number,
continueOnError: boolean
): Promise<{ results: unknown[]; errors: LoopError[] }> {
const results: unknown[] = [];
const errors: LoopError[] = [];

for (let i = 0; i < items.length; i++) {
try {
results.push(items[i]);
} catch (err) {
const errorMsg = err instanceof Error ? err.message : String(err);
errors.push({ index: i, error: errorMsg });
if (!continueOnError) {
break;
}
results.push(null);
}
Comment on lines +53 to +68

Copilot AI Apr 3, 2026

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

processSequential currently just pushes the original items[i] into results. This operation cannot throw for normal arrays, so the try/catch, errors collection, and continueOnError behavior are effectively dead code. If this node is meant to execute per-item work (e.g., invoking child node execution), that processing callback needs to happen here so failures can be captured; otherwise the node should be simplified to remove the misleading error-handling paths.

Suggested change
continueOnError: boolean
): Promise<{ results: unknown[]; errors: LoopError[] }> {
const results: unknown[] = [];
const errors: LoopError[] = [];
for (let i = 0; i < items.length; i++) {
try {
results.push(items[i]);
} catch (err) {
const errorMsg = err instanceof Error ? err.message : String(err);
errors.push({ index: i, error: errorMsg });
if (!continueOnError) {
break;
}
results.push(null);
}
_continueOnError: boolean
): Promise<{ results: unknown[]; errors: LoopError[] }> {
const results: unknown[] = [];
const errors: LoopError[] = [];
for (let i = 0; i < items.length; i++) {
results.push(items[i]);

Copilot uses AI. Check for mistakes.

if (delayMs > 0 && i < items.length - 1) {
await sleep(delayMs);
}
}

return { results, errors };
}

/**
* Process items with concurrency limit and optional delay
*/
async function processConcurrent(
items: unknown[],
concurrency: number,
delayMs: number,
continueOnError: boolean
): Promise<{ results: unknown[]; errors: LoopError[] }> {
const results: unknown[] = new Array(items.length).fill(null);
const errors: LoopError[] = [];
let stopped = false;

// Process in batches of `concurrency`
for (let batchStart = 0; batchStart < items.length; batchStart += concurrency) {
if (stopped) break;

const batchEnd = Math.min(batchStart + concurrency, items.length);
const batch = items.slice(batchStart, batchEnd);

const batchPromises = batch.map(async (item, batchIndex) => {
const globalIndex = batchStart + batchIndex;
try {
results[globalIndex] = item;
} catch (err) {
const errorMsg = err instanceof Error ? err.message : String(err);
errors.push({ index: globalIndex, error: errorMsg });
if (!continueOnError) {
stopped = true;
}
}
});

await Promise.all(batchPromises);

if (delayMs > 0 && batchEnd < items.length && !stopped) {
Comment on lines +89 to +113

Copilot AI Apr 3, 2026

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

processConcurrent assigns results[globalIndex] = item inside a try/catch, but that assignment (and reading item) will not throw in typical usage. As a result, errors will never populate and continueOnError/stopped won't have any real effect. If per-item execution is intended, introduce an actual per-item processing step that can fail and be captured; otherwise remove this error-handling logic to avoid giving a false sense of resilience.

Suggested change
let stopped = false;
// Process in batches of `concurrency`
for (let batchStart = 0; batchStart < items.length; batchStart += concurrency) {
if (stopped) break;
const batchEnd = Math.min(batchStart + concurrency, items.length);
const batch = items.slice(batchStart, batchEnd);
const batchPromises = batch.map(async (item, batchIndex) => {
const globalIndex = batchStart + batchIndex;
try {
results[globalIndex] = item;
} catch (err) {
const errorMsg = err instanceof Error ? err.message : String(err);
errors.push({ index: globalIndex, error: errorMsg });
if (!continueOnError) {
stopped = true;
}
}
});
await Promise.all(batchPromises);
if (delayMs > 0 && batchEnd < items.length && !stopped) {
void continueOnError;
// Process in batches of `concurrency`
for (let batchStart = 0; batchStart < items.length; batchStart += concurrency) {
const batchEnd = Math.min(batchStart + concurrency, items.length);
const batch = items.slice(batchStart, batchEnd);
const batchPromises = batch.map(async (item, batchIndex) => {
const globalIndex = batchStart + batchIndex;
results[globalIndex] = item;
});
await Promise.all(batchPromises);
if (delayMs > 0 && batchEnd < items.length) {

Copilot uses AI. Check for mistakes.
await sleep(delayMs);
}
}

return { results, errors };
}

/**
* Loop node - iterate over an array of items with rate limiting and concurrency control.
*
* Processes each item in the array, supporting both sequential and parallel execution.
* In a workflow context, the execution engine handles running child nodes for each item;
* this node manages the iteration, concurrency, delay, and error tracking.
*
Comment on lines +121 to +127

Copilot AI Apr 3, 2026

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The docstring/PR description says the loop node manages iteration while the execution engine runs child nodes for each item, but this implementation never invokes any child execution and simply echoes items back as results. This makes concurrency/rate limiting largely meaningless and doesn't meet the stated behavior of “executing child nodes for each item”. Either integrate with the runtime’s child-node execution mechanism (so results reflect per-item execution outputs) or update the node/PR description to reflect that this is currently a pass-through iterator.

Copilot uses AI. Check for mistakes.
* @example
* ```typescript
* // Sequential with rate limiting
* {
* items: ['a@example.com', 'b@example.com'],
* delayMs: 200,
* continueOnError: true
* }
*
* // Parallel with concurrency limit
* {
* items: [1, 2, 3, 4, 5],
* concurrency: 3,
* delayMs: 100
* }
* ```
*/
export const loopNode = defineNode({
type: 'loop',
name: 'Loop',
description: 'Iterate over an array of items with rate limiting and concurrency control',
category: 'logic',
inputSchema: LoopInputSchema,
outputSchema: LoopOutputSchema,
capabilities: {
supportsRerun: true,
supportsCancel: true,
},
executor: async (input) => {
try {
if (!Array.isArray(input.items) || input.items.length === 0) {
return {
success: true,
output: {
results: [],
errors: [],

Copilot AI Apr 3, 2026

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the empty-items fast path, errors is returned as an empty array even though the schema marks it as optional and the non-empty path omits it when there are no errors. For consistency (and to match the documented output shape), return errors: undefined (or omit the property) when there are no errors.

Suggested change
errors: [],
errors: undefined,

Copilot uses AI. Check for mistakes.
},
};
}

const { results, errors } =
input.concurrency === 1
? await processSequential(input.items, input.delayMs, input.continueOnError)
: await processConcurrent(
input.items,
input.concurrency,
input.delayMs,
input.continueOnError
);

return {
success: true,
output: {
results,
errors: errors.length > 0 ? errors : undefined,
},
};
} catch (error) {
return {
success: false,
error: error instanceof Error ? error.message : 'Loop operation failed',
};
}
},
});
Loading