From 95d8f39835641a62b0eb50758edac5cc6fcb1090 Mon Sep 17 00:00:00 2001 From: Dheeraj Shrivastav Date: Thu, 5 Mar 2026 12:17:17 +0530 Subject: [PATCH] feat: implement rateLimiterNode with fixed and sliding window strategies MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds a rate limiter primitive node that queues requests to stay within API rate limits. Supports both fixed window (batch-based) and sliding window (timestamp-based) strategies with Zod-validated I/O schemas. - Fixed window: processes items in batches, sleeps windowMs between batches - Sliding window: tracks per-request timestamps, waits only when window is full - Input: items[], requestsPerWindow (1–10000), windowMs (100–3600000), strategy - Output: processedItems, totalItems, totalDurationMs, echoed config metadata - Runtime guard for requestsPerWindow < 1 (belt-and-suspenders beyond schema) - 28 unit tests covering schema validation, ordering, timing assertions, output schema, and runtime error handling --- packages/nodes/src/index.ts | 7 + .../src/logic/__tests__/rate-limiter.test.ts | 370 ++++++++++++++++++ packages/nodes/src/logic/index.ts | 7 + packages/nodes/src/logic/rate-limiter.ts | 111 ++++++ 4 files changed, 495 insertions(+) create mode 100644 packages/nodes/src/logic/__tests__/rate-limiter.test.ts create mode 100644 packages/nodes/src/logic/rate-limiter.ts diff --git a/packages/nodes/src/index.ts b/packages/nodes/src/index.ts index 5c2de45..5966f02 100644 --- a/packages/nodes/src/index.ts +++ b/packages/nodes/src/index.ts @@ -3,6 +3,7 @@ export { conditionalNode, endNode, delayNode, + rateLimiterNode, ConditionalInputSchema, ConditionalOutputSchema, ConditionSchema, @@ -11,6 +12,8 @@ export { EndOutputSchema, DelayInputSchema, DelayOutputSchema, + RateLimiterInputSchema, + RateLimiterOutputSchema, } from './logic/index.js' export type { @@ -22,6 +25,8 @@ export type { EndOutput, DelayInput, DelayOutput, + RateLimiterInput, + RateLimiterOutput, } from './logic/index.js' // Transform nodes @@ -268,6 +273,7 @@ export type { import { conditionalNode } from './logic/index.js' import { endNode } from './logic/index.js' import { delayNode } from './logic/index.js' +import { rateLimiterNode } from './logic/index.js' import { mapNode, filterNode, sortNode } from './transform/index.js' import { httpRequestNode, breadNode } from './examples/index.js' import { @@ -313,6 +319,7 @@ export const builtInNodes = [ conditionalNode, endNode, delayNode, + rateLimiterNode, // Transform mapNode, filterNode, diff --git a/packages/nodes/src/logic/__tests__/rate-limiter.test.ts b/packages/nodes/src/logic/__tests__/rate-limiter.test.ts new file mode 100644 index 0000000..51e3e1a --- /dev/null +++ b/packages/nodes/src/logic/__tests__/rate-limiter.test.ts @@ -0,0 +1,370 @@ +import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest' +import { + rateLimiterNode, + RateLimiterInputSchema, + RateLimiterOutputSchema, +} from '../rate-limiter.js' + +// --------------------------------------------------------------------------- +// Metadata +// --------------------------------------------------------------------------- + +describe('rateLimiterNode - metadata', () => { + it('should have type rate_limiter', () => { + expect(rateLimiterNode.type).toBe('rate_limiter') + }) + + it('should have category logic', () => { + expect(rateLimiterNode.category).toBe('logic') + }) + + it('should support rerun', () => { + expect(rateLimiterNode.capabilities?.supportsRerun).toBe(true) + }) + + it('should not support cancel', () => { + expect(rateLimiterNode.capabilities?.supportsCancel).toBe(false) + }) +}) + +// --------------------------------------------------------------------------- +// Input schema validation +// --------------------------------------------------------------------------- + +describe('rateLimiterNode - input schema validation', () => { + it('should accept valid minimal input', () => { + const result = RateLimiterInputSchema.safeParse({ + items: [1], + requestsPerWindow: 5, + windowMs: 1000, + strategy: 'fixed', + }) + expect(result.success).toBe(true) + }) + + it('should reject empty items array', () => { + const result = RateLimiterInputSchema.safeParse({ + items: [], + requestsPerWindow: 5, + windowMs: 1000, + strategy: 'fixed', + }) + expect(result.success).toBe(false) + }) + + it('should reject requestsPerWindow of 0', () => { + const result = RateLimiterInputSchema.safeParse({ + items: [1], + requestsPerWindow: 0, + windowMs: 1000, + strategy: 'fixed', + }) + expect(result.success).toBe(false) + }) + + it('should reject windowMs below 100', () => { + const result = RateLimiterInputSchema.safeParse({ + items: [1], + requestsPerWindow: 5, + windowMs: 99, + strategy: 'fixed', + }) + expect(result.success).toBe(false) + }) + + it('should reject windowMs above 3600000', () => { + const result = RateLimiterInputSchema.safeParse({ + items: [1], + requestsPerWindow: 5, + windowMs: 3600001, + strategy: 'fixed', + }) + expect(result.success).toBe(false) + }) + + it('should reject invalid strategy', () => { + const result = RateLimiterInputSchema.safeParse({ + items: [1], + requestsPerWindow: 5, + windowMs: 1000, + strategy: 'rolling', + }) + expect(result.success).toBe(false) + }) + + it('should accept strategy sliding', () => { + const result = RateLimiterInputSchema.safeParse({ + items: [1], + requestsPerWindow: 5, + windowMs: 1000, + strategy: 'sliding', + }) + expect(result.success).toBe(true) + }) + + it('should accept strategy fixed', () => { + const result = RateLimiterInputSchema.safeParse({ + items: [1], + requestsPerWindow: 5, + windowMs: 1000, + strategy: 'fixed', + }) + expect(result.success).toBe(true) + }) +}) + +// --------------------------------------------------------------------------- +// Fixed window strategy +// --------------------------------------------------------------------------- + +describe('rateLimiterNode - fixed window strategy', () => { + beforeEach(() => { + vi.useFakeTimers() + }) + + afterEach(() => { + vi.useRealTimers() + }) + + it('should return success true and all items processed', async () => { + const items = [1, 2, 3, 4, 5, 6] + const executorPromise = rateLimiterNode.executor( + { items, requestsPerWindow: 3, windowMs: 500, strategy: 'fixed' }, + {} as never, + ) + await vi.runAllTimersAsync() + const result = await executorPromise + expect(result.success).toBe(true) + if (result.success) { + expect(result.output!.processedItems).toHaveLength(items.length) + expect(result.output!.strategy).toBe('fixed') + } + }) + + it('should return processedItems in original order', async () => { + const items = [1, 2, 3, 4, 5] + const executorPromise = rateLimiterNode.executor( + { items, requestsPerWindow: 3, windowMs: 500, strategy: 'fixed' }, + {} as never, + ) + await vi.runAllTimersAsync() + const result = await executorPromise + expect(result.success).toBe(true) + if (result.success) { + expect(result.output!.processedItems).toEqual([1, 2, 3, 4, 5]) + } + }) + + it('should include correct metadata in output', async () => { + const items = [1, 2, 3, 4, 5, 6] + const executorPromise = rateLimiterNode.executor( + { items, requestsPerWindow: 3, windowMs: 500, strategy: 'fixed' }, + {} as never, + ) + await vi.runAllTimersAsync() + const result = await executorPromise + expect(result.success).toBe(true) + if (result.success) { + expect(result.output!.totalItems).toBe(6) + expect(result.output!.windowMs).toBe(500) + expect(result.output!.requestsPerWindow).toBe(3) + } + }) + + it('should handle items that fit exactly in one window (no waiting needed)', async () => { + const items = [1, 2] + const executorPromise = rateLimiterNode.executor( + { items, requestsPerWindow: 5, windowMs: 500, strategy: 'fixed' }, + {} as never, + ) + // No sleep needed — resolves immediately + const result = await executorPromise + expect(result.success).toBe(true) + if (result.success) { + expect(result.output!.processedItems).toEqual([1, 2]) + } + }) + + it('should return success false if items array is empty after schema rejects it', () => { + const result = RateLimiterInputSchema.safeParse({ + items: [], + requestsPerWindow: 5, + windowMs: 500, + strategy: 'fixed', + }) + expect(result.success).toBe(false) + }) + + it('should wait at least windowMs between batches (totalDurationMs >= windowMs)', async () => { + const items = [1, 2, 3, 4] // 2 batches of 2 → one sleep of windowMs + const windowMs = 500 + const executorPromise = rateLimiterNode.executor( + { items, requestsPerWindow: 2, windowMs, strategy: 'fixed' }, + {} as never, + ) + await vi.runAllTimersAsync() + const result = await executorPromise + expect(result.success).toBe(true) + if (result.success) { + expect(result.output!.totalDurationMs).toBeGreaterThanOrEqual(windowMs) + } + }) +}) + +// --------------------------------------------------------------------------- +// Sliding window strategy +// --------------------------------------------------------------------------- + +describe('rateLimiterNode - sliding window strategy', () => { + beforeEach(() => { + vi.useFakeTimers() + }) + + afterEach(() => { + vi.useRealTimers() + }) + + it('should return success true and all items processed', async () => { + const items = [1, 2, 3, 4, 5] + const executorPromise = rateLimiterNode.executor( + { items, requestsPerWindow: 3, windowMs: 500, strategy: 'sliding' }, + {} as never, + ) + await vi.runAllTimersAsync() + const result = await executorPromise + expect(result.success).toBe(true) + if (result.success) { + expect(result.output!.processedItems).toHaveLength(5) + } + }) + + it('should return processedItems in original order', async () => { + const items = ['a', 'b', 'c', 'd', 'e'] + const executorPromise = rateLimiterNode.executor( + { items, requestsPerWindow: 3, windowMs: 500, strategy: 'sliding' }, + {} as never, + ) + await vi.runAllTimersAsync() + const result = await executorPromise + expect(result.success).toBe(true) + if (result.success) { + expect(result.output!.processedItems).toEqual(['a', 'b', 'c', 'd', 'e']) + } + }) + + it('should include correct strategy in output', async () => { + const items = [1, 2, 3] + const executorPromise = rateLimiterNode.executor( + { items, requestsPerWindow: 3, windowMs: 500, strategy: 'sliding' }, + {} as never, + ) + await vi.runAllTimersAsync() + const result = await executorPromise + expect(result.success).toBe(true) + if (result.success) { + expect(result.output!.strategy).toBe('sliding') + } + }) + + it('should handle single item with no waiting', async () => { + const items = ['only-one'] + const result = await rateLimiterNode.executor( + { items, requestsPerWindow: 10, windowMs: 1000, strategy: 'sliding' }, + {} as never, + ) + expect(result.success).toBe(true) + if (result.success) { + expect(result.output!.processedItems).toEqual(['only-one']) + } + }) + + it('should wait at least windowMs when window capacity is exceeded (totalDurationMs >= windowMs)', async () => { + const items = [1, 2, 3, 4] // limit 2 per 500ms — third item triggers a wait + const windowMs = 500 + const executorPromise = rateLimiterNode.executor( + { items, requestsPerWindow: 2, windowMs, strategy: 'sliding' }, + {} as never, + ) + await vi.runAllTimersAsync() + const result = await executorPromise + expect(result.success).toBe(true) + if (result.success) { + expect(result.output!.totalDurationMs).toBeGreaterThanOrEqual(windowMs) + } + }) +}) + +// --------------------------------------------------------------------------- +// Output schema +// --------------------------------------------------------------------------- + +describe('rateLimiterNode - output schema', () => { + it('should validate a well-formed output', () => { + const mockOutput = { + processedItems: [1, 2, 3], + totalItems: 3, + totalDurationMs: 100, + strategy: 'fixed', + windowMs: 1000, + requestsPerWindow: 5, + } + expect(RateLimiterOutputSchema.safeParse(mockOutput).success).toBe(true) + }) + + it('should reject output missing processedItems', () => { + const mockOutput = { + totalItems: 3, + totalDurationMs: 100, + strategy: 'fixed', + windowMs: 1000, + requestsPerWindow: 5, + } + expect(RateLimiterOutputSchema.safeParse(mockOutput).success).toBe(false) + }) + + it('should reject output with negative totalDurationMs', () => { + const mockOutput = { + processedItems: [1, 2, 3], + totalItems: 3, + totalDurationMs: -1, + strategy: 'fixed', + windowMs: 1000, + requestsPerWindow: 5, + } + expect(RateLimiterOutputSchema.safeParse(mockOutput).success).toBe(false) + }) +}) + +// --------------------------------------------------------------------------- +// Error handling +// --------------------------------------------------------------------------- + +describe('rateLimiterNode - error handling', () => { + it('should return success false if requestsPerWindow is 0 at runtime', async () => { + // Bypass schema by casting to the type directly + const invalidInput = { + items: [1, 2, 3], + requestsPerWindow: 0, + windowMs: 1000, + strategy: 'fixed', + } as Parameters[0] + + // With 0 requestsPerWindow, slicing by 0 causes an infinite loop — the + // executor should either handle this gracefully or the catch block should + // return success: false. We verify schema-level rejection as the guard. + const schemaResult = RateLimiterInputSchema.safeParse(invalidInput) + expect(schemaResult.success).toBe(false) + }) + + it('should return success false when executor receives requestsPerWindow < 1 directly (runtime guard)', async () => { + const invalidInput = { + items: [1, 2, 3], + requestsPerWindow: 0, + windowMs: 1000, + strategy: 'fixed', + } as Parameters[0] + + const result = await rateLimiterNode.executor(invalidInput, {} as never) + expect(result.success).toBe(false) + }) +}) diff --git a/packages/nodes/src/logic/index.ts b/packages/nodes/src/logic/index.ts index 30b27e9..53a5dd4 100644 --- a/packages/nodes/src/logic/index.ts +++ b/packages/nodes/src/logic/index.ts @@ -19,3 +19,10 @@ export { EndInputSchema, EndOutputSchema } from './end.js'; export { delayNode } from './delay.js'; export type { DelayInput, DelayOutput } from './delay.js'; export { DelayInputSchema, DelayOutputSchema } from './delay.js'; + +export { rateLimiterNode } from './rate-limiter.js' +export type { RateLimiterInput, RateLimiterOutput } from './rate-limiter.js' +export { + RateLimiterInputSchema, + RateLimiterOutputSchema, +} from './rate-limiter.js' diff --git a/packages/nodes/src/logic/rate-limiter.ts b/packages/nodes/src/logic/rate-limiter.ts new file mode 100644 index 0000000..ec357c0 --- /dev/null +++ b/packages/nodes/src/logic/rate-limiter.ts @@ -0,0 +1,111 @@ +import { z } from 'zod' +import { defineNode } from '@jam-nodes/core' + +function sleep(ms: number): Promise { + return new Promise((resolve) => setTimeout(resolve, ms)) +} + +export const RateLimiterInputSchema = z.object({ + items: z.array(z.unknown()).min(1, 'At least one item is required'), + requestsPerWindow: z.number().int().min(1).max(10000), + windowMs: z.number().int().min(100).max(3600000), + strategy: z.enum(['fixed', 'sliding']), +}) + +export type RateLimiterInput = z.infer + +export const RateLimiterOutputSchema = z.object({ + processedItems: z.array(z.unknown()), + totalItems: z.number(), + totalDurationMs: z.number().min(0), + strategy: z.string(), + windowMs: z.number(), + requestsPerWindow: z.number(), +}) + +export type RateLimiterOutput = z.infer + +export const rateLimiterNode = defineNode({ + type: 'rate_limiter', + name: 'Rate Limiter', + description: + 'Process items while respecting API rate limits using fixed or sliding window strategies', + category: 'logic', + inputSchema: RateLimiterInputSchema, + outputSchema: RateLimiterOutputSchema, + estimatedDuration: 5, + capabilities: { + supportsRerun: true, + supportsCancel: false, + }, + executor: async (input) => { + if (input.requestsPerWindow < 1) { + return { success: false, error: 'requestsPerWindow must be at least 1' } + } + try { + const startTime = Date.now() + const allProcessedItems: unknown[] = [] + + if (input.strategy === 'fixed') { + const batchSize = input.requestsPerWindow + const items = input.items + for (let i = 0; i < items.length; i += batchSize) { + const chunkIndex = Math.floor(i / batchSize) + if (chunkIndex > 0) { + await sleep(input.windowMs) + } + const chunk = items.slice(i, i + batchSize) + for (const item of chunk) { + allProcessedItems.push(item) + } + } + } else { + // sliding window + const timestamps: number[] = [] + for (const item of input.items) { + let now = Date.now() + // Prune expired timestamps + while ( + timestamps.length > 0 && + timestamps[0]! < now - input.windowMs + ) { + timestamps.shift() + } + if (timestamps.length >= input.requestsPerWindow) { + const waitMs = timestamps[0]! + input.windowMs - now + if (waitMs > 0) { + await sleep(waitMs) + } + // Prune again after waking + now = Date.now() + while ( + timestamps.length > 0 && + timestamps[0]! < now - input.windowMs + ) { + timestamps.shift() + } + } + timestamps.push(Date.now()) + allProcessedItems.push(item) + } + } + + return { + success: true, + output: { + processedItems: allProcessedItems, + totalItems: allProcessedItems.length, + totalDurationMs: Date.now() - startTime, + strategy: input.strategy, + windowMs: input.windowMs, + requestsPerWindow: input.requestsPerWindow, + }, + } + } catch (error) { + return { + success: false, + error: error instanceof Error ? error.message : 'Rate limiter failed', + } + } + }, +})