diff --git a/packages/nodes/src/index.ts b/packages/nodes/src/index.ts index 97e62dc..31a81dd 100644 --- a/packages/nodes/src/index.ts +++ b/packages/nodes/src/index.ts @@ -4,6 +4,7 @@ export { endNode, delayNode, webhookTriggerNode, + loopNode, ConditionalInputSchema, ConditionalOutputSchema, ConditionSchema, @@ -14,6 +15,9 @@ export { DelayOutputSchema, WebhookTriggerInputSchema, WebhookTriggerOutputSchema, + LoopInputSchema, + LoopOutputSchema, + LoopErrorSchema, } from './logic/index.js' export type { @@ -27,6 +31,9 @@ export type { DelayOutput, WebhookTriggerInput, WebhookTriggerOutput, + LoopInput, + LoopOutput, + LoopError, } from './logic/index.js' // Transform nodes @@ -311,6 +318,7 @@ import { conditionalNode } from './logic/index.js' import { endNode } from './logic/index.js' import { delayNode } from './logic/index.js' import { webhookTriggerNode } from './logic/index.js' +import { loopNode } from './logic/index.js' import { mapNode, filterNode, sortNode } from './transform/index.js' import { httpRequestNode, breadNode } from './examples/index.js' import { @@ -364,6 +372,7 @@ export const builtInNodes = [ endNode, delayNode, webhookTriggerNode, + loopNode, // Transform mapNode, filterNode, diff --git a/packages/nodes/src/logic/__tests__/loop.test.ts b/packages/nodes/src/logic/__tests__/loop.test.ts new file mode 100644 index 0000000..52a8409 --- /dev/null +++ b/packages/nodes/src/logic/__tests__/loop.test.ts @@ -0,0 +1,99 @@ +import { describe, it, expect } from 'vitest'; +import { loopNode, LoopInputSchema } from '../loop'; + +describe('loopNode', () => { + it('should have correct metadata', () => { + expect(loopNode.type).toBe('loop'); + expect(loopNode.category).toBe('logic'); + expect(loopNode.name).toBe('Loop'); + }); + + it('should validate valid input', () => { + const result = LoopInputSchema.safeParse({ + items: [1, 2, 3], + }); + expect(result.success).toBe(true); + }); + + it('should apply defaults', () => { + const result = LoopInputSchema.parse({ + items: [1, 2, 3], + }); + expect(result.concurrency).toBe(1); + expect(result.delayMs).toBe(0); + expect(result.continueOnError).toBe(false); + }); + + it('should reject invalid concurrency', () => { + const result = LoopInputSchema.safeParse({ + items: [1], + concurrency: 0, + }); + expect(result.success).toBe(false); + }); + + it('should reject negative delay', () => { + const result = LoopInputSchema.safeParse({ + items: [1], + delayMs: -1, + }); + expect(result.success).toBe(false); + }); + + it('should handle empty array', async () => { + const mockContext = { + userId: 'test', + workflowExecutionId: 'test', + credentials: {}, + variables: {}, + interpolate: (s: string) => s, + evaluateJsonPath: (s: string) => s, + }; + + const result = await loopNode.executor( + { items: [], concurrency: 1, delayMs: 0, continueOnError: false }, + mockContext as any + ); + + expect(result.success).toBe(true); + expect(result.output?.results).toEqual([]); + }); + + it('should process items sequentially', async () => { + const mockContext = { + userId: 'test', + workflowExecutionId: 'test', + credentials: {}, + variables: {}, + interpolate: (s: string) => s, + evaluateJsonPath: (s: string) => s, + }; + + const result = await loopNode.executor( + { items: ['a', 'b', 'c'], concurrency: 1, delayMs: 0, continueOnError: false }, + mockContext as any + ); + + expect(result.success).toBe(true); + expect(result.output?.results).toEqual(['a', 'b', 'c']); + }); + + it('should process items concurrently', async () => { + const mockContext = { + userId: 'test', + workflowExecutionId: 'test', + credentials: {}, + variables: {}, + interpolate: (s: string) => s, + evaluateJsonPath: (s: string) => s, + }; + + const result = await loopNode.executor( + { items: [1, 2, 3, 4, 5], concurrency: 3, delayMs: 0, continueOnError: false }, + mockContext as any + ); + + expect(result.success).toBe(true); + expect(result.output?.results).toEqual([1, 2, 3, 4, 5]); + }); +}); diff --git a/packages/nodes/src/logic/index.ts b/packages/nodes/src/logic/index.ts index 554d3e7..ce1ba00 100644 --- a/packages/nodes/src/logic/index.ts +++ b/packages/nodes/src/logic/index.ts @@ -24,4 +24,6 @@ export { webhookTriggerNode } from './webhook-trigger.js'; export { WebhookTriggerInputSchema, WebhookTriggerOutputSchema } from './webhook-trigger.js'; export type { WebhookTriggerInput, WebhookTriggerOutput } from './webhook-trigger.js'; - +export { loopNode } from './loop.js'; +export type { LoopInput, LoopOutput, LoopError } from './loop.js'; +export { LoopInputSchema, LoopOutputSchema, LoopErrorSchema } from './loop.js'; diff --git a/packages/nodes/src/logic/loop.ts b/packages/nodes/src/logic/loop.ts new file mode 100644 index 0000000..0741d3f --- /dev/null +++ b/packages/nodes/src/logic/loop.ts @@ -0,0 +1,192 @@ +import { z } from 'zod'; +import { defineNode } from '@jam-nodes/core'; + +/** + * Input schema for loop node + */ +export const LoopInputSchema = z.object({ + /** Array of items to iterate over */ + items: z.array(z.unknown()), + /** Max parallel executions (default: 1 for sequential) */ + concurrency: z.number().min(1).max(100).default(1), + /** Delay between iterations in milliseconds */ + delayMs: z.number().min(0).max(60000).default(0), + /** Continue processing remaining items if one fails */ + continueOnError: z.boolean().default(false), +}); + +export type LoopInput = z.infer; + +/** + * Error entry for a failed iteration + */ +export const LoopErrorSchema = z.object({ + index: z.number(), + error: z.string(), +}); + +export type LoopError = z.infer; + +/** + * Output schema for loop node + */ +export const LoopOutputSchema = z.object({ + results: z.array(z.unknown()), + errors: z.array(LoopErrorSchema).optional(), +}); + +export type LoopOutput = z.infer; + +/** + * Sleep utility for rate limiting between iterations + */ +function sleep(ms: number): Promise { + return new Promise((resolve) => setTimeout(resolve, ms)); +} + +/** + * Process items sequentially with optional delay + */ +async function processSequential( + items: unknown[], + delayMs: number, + continueOnError: boolean +): Promise<{ results: unknown[]; errors: LoopError[] }> { + const results: unknown[] = []; + const errors: LoopError[] = []; + + for (let i = 0; i < items.length; i++) { + try { + results.push(items[i]); + } catch (err) { + const errorMsg = err instanceof Error ? err.message : String(err); + errors.push({ index: i, error: errorMsg }); + if (!continueOnError) { + break; + } + results.push(null); + } + + if (delayMs > 0 && i < items.length - 1) { + await sleep(delayMs); + } + } + + return { results, errors }; +} + +/** + * Process items with concurrency limit and optional delay + */ +async function processConcurrent( + items: unknown[], + concurrency: number, + delayMs: number, + continueOnError: boolean +): Promise<{ results: unknown[]; errors: LoopError[] }> { + const results: unknown[] = new Array(items.length).fill(null); + const errors: LoopError[] = []; + let stopped = false; + + // Process in batches of `concurrency` + for (let batchStart = 0; batchStart < items.length; batchStart += concurrency) { + if (stopped) break; + + const batchEnd = Math.min(batchStart + concurrency, items.length); + const batch = items.slice(batchStart, batchEnd); + + const batchPromises = batch.map(async (item, batchIndex) => { + const globalIndex = batchStart + batchIndex; + try { + results[globalIndex] = item; + } catch (err) { + const errorMsg = err instanceof Error ? err.message : String(err); + errors.push({ index: globalIndex, error: errorMsg }); + if (!continueOnError) { + stopped = true; + } + } + }); + + await Promise.all(batchPromises); + + if (delayMs > 0 && batchEnd < items.length && !stopped) { + await sleep(delayMs); + } + } + + return { results, errors }; +} + +/** + * Loop node - iterate over an array of items with rate limiting and concurrency control. + * + * Processes each item in the array, supporting both sequential and parallel execution. + * In a workflow context, the execution engine handles running child nodes for each item; + * this node manages the iteration, concurrency, delay, and error tracking. + * + * @example + * ```typescript + * // Sequential with rate limiting + * { + * items: ['a@example.com', 'b@example.com'], + * delayMs: 200, + * continueOnError: true + * } + * + * // Parallel with concurrency limit + * { + * items: [1, 2, 3, 4, 5], + * concurrency: 3, + * delayMs: 100 + * } + * ``` + */ +export const loopNode = defineNode({ + type: 'loop', + name: 'Loop', + description: 'Iterate over an array of items with rate limiting and concurrency control', + category: 'logic', + inputSchema: LoopInputSchema, + outputSchema: LoopOutputSchema, + capabilities: { + supportsRerun: true, + supportsCancel: true, + }, + executor: async (input) => { + try { + if (!Array.isArray(input.items) || input.items.length === 0) { + return { + success: true, + output: { + results: [], + errors: [], + }, + }; + } + + const { results, errors } = + input.concurrency === 1 + ? await processSequential(input.items, input.delayMs, input.continueOnError) + : await processConcurrent( + input.items, + input.concurrency, + input.delayMs, + input.continueOnError + ); + + return { + success: true, + output: { + results, + errors: errors.length > 0 ? errors : undefined, + }, + }; + } catch (error) { + return { + success: false, + error: error instanceof Error ? error.message : 'Loop operation failed', + }; + } + }, +});