From 95a963ae222c801c48ea876f546a6919f21a228f Mon Sep 17 00:00:00 2001 From: blackpanda Date: Wed, 27 May 2026 16:09:43 +0100 Subject: [PATCH] feat(rate-limit): add per-provider token-bucket limiter --- docs/request-limits-implementation.md | 110 ++++++ jest.config.js | 14 +- package.json | 1 + src/rateLimit.integration.test.ts | 508 ++++++++++++++++++++++++++ src/rateLimit.ts | 281 ++++++++++++++ src/webhookDelivery.ts | 212 +++++++++++ src/webhookMetrics.ts | 71 ++++ 7 files changed, 1196 insertions(+), 1 deletion(-) create mode 100644 docs/request-limits-implementation.md create mode 100644 src/rateLimit.integration.test.ts create mode 100644 src/rateLimit.ts create mode 100644 src/webhookDelivery.ts create mode 100644 src/webhookMetrics.ts diff --git a/docs/request-limits-implementation.md b/docs/request-limits-implementation.md new file mode 100644 index 0000000..08c09b1 --- /dev/null +++ b/docs/request-limits-implementation.md @@ -0,0 +1,110 @@ +# Per-Provider Token-Bucket Rate Limiter + +## Overview + +Outbound webhook deliveries are rate-limited on a **per-provider** basis using a token-bucket algorithm. Each provider gets its own independent bucket, so a slow or throttled partner cannot starve deliveries to other providers. + +Deliveries that exceed a provider's capacity are **queued and paced** — they are never dropped. + +--- + +## Environment Variables + +| Variable | Default | Description | +|---|---|---| +| `WEBHOOK_BUCKET_CAPACITY` | `10` | Maximum tokens a single provider bucket can hold (burst ceiling). | +| `WEBHOOK_REFILL_RATE_PER_SEC` | `2` | Tokens added per second per provider bucket. | +| `WEBHOOK_SECRET_` | *(required)* | HMAC-SHA256 signing secret for each provider. Example: provider ID `acme` → `WEBHOOK_SECRET_ACME`. | + +Both numeric values are validated at process startup. The process will throw a descriptive error and refuse to start if either value is zero, negative, non-numeric, or `Infinity`. + +### Example `.env` + +``` +WEBHOOK_BUCKET_CAPACITY=10 +WEBHOOK_REFILL_RATE_PER_SEC=2 +WEBHOOK_SECRET_ACME= +WEBHOOK_SECRET_PARTNERX= +``` + +--- + +## Algorithm + +``` +tokens = min(capacity, tokens + elapsed_seconds × refillRatePerSec) +``` + +- On each `acquireToken(providerId)` call the bucket is refilled based on elapsed wall-clock time. +- If `tokens >= 1` the token is consumed immediately and the call resolves. +- If `tokens < 1` the caller is added to a FIFO queue. A `setTimeout` fires when the next token is due and drains as many queued callers as the refilled token count allows. The timer re-schedules itself until the queue is empty. + +--- + +## Shared State — Per-Process Behaviour + +**Bucket state is held in-process (a plain `Map`).** Each Node.js process maintains its own independent buckets. + +### Implications for blue/green and multi-replica deployments + +| Scenario | Behaviour | +|---|---| +| Single process | Full rate limiting as configured. | +| Blue/green (one active at a time) | Effective — only one process handles traffic at a time. | +| Multiple replicas behind a load balancer | Each replica enforces its own limit independently. Effective per-replica rate is `capacity / N` where `N` is the number of replicas. | + +### Upgrade path to shared state + +If strict cross-replica rate limiting is required, replace the `Map` in `TokenBucketLimiter` with a Redis-backed store (e.g. using the `INCR` + `EXPIRE` pattern or a Lua script for atomic token consumption). The public `acquireToken` / `getTokenCount` interface is unchanged — only the storage layer needs to swap. + +--- + +## Idempotency + +`WebhookDeliveryService` tracks delivered IDs in an in-process `Set`. Duplicate `deliveryId` values within the same process lifetime are silently skipped (return `{ sent: false }`). + +For cross-process or cross-restart idempotency, the caller should use a distributed lock or deduplicate upstream before calling `deliver()`. + +--- + +## Payload Signing + +Every outbound webhook is signed with HMAC-SHA256: + +``` +X-Webhook-Signature: sha256=<64-char hex digest> +``` + +The signing secret is read from `WEBHOOK_SECRET_` at delivery time and is never stored, logged, or included in error messages. + +--- + +## Metrics + +`webhookMetrics.ts` exposes two counters per provider: + +- `throttledByProvider` — incremented each time a delivery is queued (token not immediately available). +- `deliveredByProvider` — incremented on each successful HTTP delivery. + +Call `getMetrics()` to retrieve a point-in-time snapshot. These are in-process counters; export to Prometheus, CloudWatch, or a push-gateway for cross-process aggregation. + +--- + +## Security Notes + +1. **No secrets in source.** All signing secrets live in environment variables or a secrets manager. `.env` is in `.gitignore`. +2. **Secret redaction in logs.** Provider IDs are truncated to 4 characters + `****` in all log output (see `redactId()`). Secret values are never passed to the rate limiter or metrics modules. +3. **Error messages.** When a signing secret is missing, the error message contains only the environment variable *name* (e.g. `WEBHOOK_SECRET_ACME`), never the value. +4. **Input validation.** `loadRateLimiterConfig()` rejects zero, negative, non-numeric, and `Infinity` values at boot, preventing silent misconfiguration. +5. **Signature verification.** The `X-Webhook-Signature` header uses `sha256=` format. Receiving partners should verify this header before processing the payload. + +--- + +## File Map + +| File | Purpose | +|---|---| +| `src/rateLimit.ts` | `TokenBucketLimiter` class, `loadRateLimiterConfig`, `redactId`, `defaultLimiter` singleton | +| `src/webhookDelivery.ts` | `WebhookDeliveryService` — rate limiting, signing, idempotency | +| `src/webhookMetrics.ts` | In-process counters for throttled/delivered events | +| `src/rateLimit.integration.test.ts` | Integration tests covering all acceptance criteria | diff --git a/jest.config.js b/jest.config.js index 57cc387..fb8dec3 100644 --- a/jest.config.js +++ b/jest.config.js @@ -4,5 +4,17 @@ module.exports = { testEnvironment: 'node', roots: ['/src'], testMatch: ['**/*.test.ts'], - collectCoverageFrom: ['src/**/*.ts', '!src/**/*.d.ts'], + collectCoverageFrom: [ + 'src/**/*.ts', + '!src/**/*.d.ts', + '!src/index.ts', // Express bootstrap — not unit-testable without a running server + ], + coverageThresholds: { + global: { + lines: 95, + functions: 95, + branches: 90, + statements: 95, + }, + }, }; diff --git a/package.json b/package.json index 4522e40..d08a7c4 100644 --- a/package.json +++ b/package.json @@ -8,6 +8,7 @@ "start": "node dist/index.js", "dev": "ts-node-dev --respawn src/index.ts", "test": "jest", + "test:ci": "jest --ci --coverage --forceExit", "lint": "eslint src --ext .ts" }, "engines": { diff --git a/src/rateLimit.integration.test.ts b/src/rateLimit.integration.test.ts new file mode 100644 index 0000000..31efba9 --- /dev/null +++ b/src/rateLimit.integration.test.ts @@ -0,0 +1,508 @@ +/** + * Integration tests for the per-provider token-bucket rate limiter. + * + * Acceptance criteria verified here: + * 1. Throttling Provider A does NOT block Provider B. + * 2. Bursts beyond capacity are delayed (queued/paced), not dropped. + * 3. Edge cases: zero/invalid capacity, missing env vars, extreme bursts. + * 4. Secret redaction — secrets never appear in log output. + * 5. Idempotency — duplicate deliveryIds are skipped. + * 6. Metrics — throttled and delivered counters are recorded correctly. + */ + +import { + TokenBucketLimiter, + loadRateLimiterConfig, + redactId, + RateLimiterConfig, +} from './rateLimit'; +import { _resetMetrics, getMetrics } from './webhookMetrics'; +import { WebhookDeliveryService } from './webhookDelivery'; + +// --------------------------------------------------------------------------- +// Helpers +// --------------------------------------------------------------------------- + +/** Build a limiter with explicit config (bypasses env vars). */ +function makeLimiter(capacity: number, refillRatePerSec: number): TokenBucketLimiter { + return new TokenBucketLimiter({ capacity, refillRatePerSec }); +} + +/** + * Resolve after `ms` milliseconds. + * Using real timers so the token-bucket refill math is exercised end-to-end. + */ +function sleep(ms: number): Promise { + return new Promise((resolve) => setTimeout(resolve, ms)); +} + +// --------------------------------------------------------------------------- +// Setup / teardown +// --------------------------------------------------------------------------- + +beforeEach(() => { + _resetMetrics(); +}); + +// --------------------------------------------------------------------------- +// 1. loadRateLimiterConfig — env-var parsing and validation +// --------------------------------------------------------------------------- + +describe('loadRateLimiterConfig', () => { + const originalEnv = process.env; + + beforeEach(() => { + process.env = { ...originalEnv }; + }); + + afterEach(() => { + process.env = originalEnv; + }); + + it('returns defaults when env vars are absent', () => { + delete process.env.WEBHOOK_BUCKET_CAPACITY; + delete process.env.WEBHOOK_REFILL_RATE_PER_SEC; + const cfg = loadRateLimiterConfig(); + expect(cfg.capacity).toBe(10); + expect(cfg.refillRatePerSec).toBe(2); + }); + + it('parses valid env vars correctly', () => { + process.env.WEBHOOK_BUCKET_CAPACITY = '20'; + process.env.WEBHOOK_REFILL_RATE_PER_SEC = '5'; + const cfg = loadRateLimiterConfig(); + expect(cfg.capacity).toBe(20); + expect(cfg.refillRatePerSec).toBe(5); + }); + + it('throws on zero capacity', () => { + process.env.WEBHOOK_BUCKET_CAPACITY = '0'; + expect(() => loadRateLimiterConfig()).toThrow(/WEBHOOK_BUCKET_CAPACITY/); + }); + + it('throws on negative capacity', () => { + process.env.WEBHOOK_BUCKET_CAPACITY = '-5'; + expect(() => loadRateLimiterConfig()).toThrow(/WEBHOOK_BUCKET_CAPACITY/); + }); + + it('throws on non-numeric capacity', () => { + process.env.WEBHOOK_BUCKET_CAPACITY = 'abc'; + expect(() => loadRateLimiterConfig()).toThrow(/WEBHOOK_BUCKET_CAPACITY/); + }); + + it('throws on zero refill rate', () => { + process.env.WEBHOOK_REFILL_RATE_PER_SEC = '0'; + expect(() => loadRateLimiterConfig()).toThrow(/WEBHOOK_REFILL_RATE_PER_SEC/); + }); + + it('throws on non-numeric refill rate', () => { + process.env.WEBHOOK_REFILL_RATE_PER_SEC = 'fast'; + expect(() => loadRateLimiterConfig()).toThrow(/WEBHOOK_REFILL_RATE_PER_SEC/); + }); + + it('throws on Infinity capacity', () => { + process.env.WEBHOOK_BUCKET_CAPACITY = 'Infinity'; + expect(() => loadRateLimiterConfig()).toThrow(/WEBHOOK_BUCKET_CAPACITY/); + }); +}); + +// --------------------------------------------------------------------------- +// 2. redactId — security helper +// --------------------------------------------------------------------------- + +describe('redactId', () => { + it('redacts IDs longer than 4 chars', () => { + expect(redactId('provider-acme')).toBe('prov****'); + }); + + it('fully redacts IDs of 4 chars or fewer', () => { + expect(redactId('abc')).toBe('****'); + expect(redactId('abcd')).toBe('****'); + }); + + it('fully redacts empty string', () => { + expect(redactId('')).toBe('****'); + }); + + it('never exposes more than 4 leading characters', () => { + const result = redactId('supersecretprovider'); + expect(result).toMatch(/^.{4}\*{4}$/); + expect(result).not.toContain('secret'); + }); +}); + +// --------------------------------------------------------------------------- +// 3. TokenBucketLimiter — core token-bucket behaviour +// --------------------------------------------------------------------------- + +describe('TokenBucketLimiter', () => { + it('starts with a full bucket (capacity tokens available)', () => { + const limiter = makeLimiter(5, 1); + expect(limiter.getTokenCount('p1')).toBeCloseTo(5, 0); + }); + + it('acquireToken resolves immediately when tokens are available', async () => { + const limiter = makeLimiter(3, 1); + const start = Date.now(); + await limiter.acquireToken('p1'); + expect(Date.now() - start).toBeLessThan(50); // well under 50 ms + }); + + it('consumes one token per acquireToken call', async () => { + const limiter = makeLimiter(3, 1); + await limiter.acquireToken('p1'); + expect(limiter.getTokenCount('p1')).toBeCloseTo(2, 0); + await limiter.acquireToken('p1'); + expect(limiter.getTokenCount('p1')).toBeCloseTo(1, 0); + }); + + it('queues calls when bucket is empty and resolves after refill', async () => { + // capacity=1, refill=10/sec → next token in ~100 ms + const limiter = makeLimiter(1, 10); + + await limiter.acquireToken('p1'); // consumes the only token + + const start = Date.now(); + await limiter.acquireToken('p1'); // must wait for refill + const elapsed = Date.now() - start; + + // Should have waited roughly 100 ms (1/10 sec), allow generous tolerance + expect(elapsed).toBeGreaterThanOrEqual(80); + expect(elapsed).toBeLessThan(500); + }, 2000); + + it('records a throttle metric when a call is queued', async () => { + const limiter = makeLimiter(1, 20); // fast refill so test doesn't hang + await limiter.acquireToken('p1'); // drain bucket + + const waiter = limiter.acquireToken('p1'); // should be throttled + expect(getMetrics().throttledByProvider['p1']).toBe(1); + await waiter; + }, 2000); + + it('getQueueDepth returns the number of waiting callers', async () => { + const limiter = makeLimiter(1, 1); // slow refill + await limiter.acquireToken('p1'); // drain + + // Queue two waiters without awaiting them yet + const w1 = limiter.acquireToken('p1'); + const w2 = limiter.acquireToken('p1'); + + expect(limiter.getQueueDepth('p1')).toBe(2); + + // Clean up — await both so no dangling timers + await Promise.all([w1, w2]); + }, 5000); + + // ------------------------------------------------------------------------- + // ACCEPTANCE CRITERION 1: Provider A throttling does NOT block Provider B + // ------------------------------------------------------------------------- + + it('AC1 — throttling provider A does not block provider B', async () => { + // capacity=1, refill=2/sec → next token in ~500 ms + const limiter = makeLimiter(1, 2); + + // Drain provider A's bucket + await limiter.acquireToken('providerA'); + + // Provider A is now empty — queue a waiter + let aResolved = false; + const aWaiter = limiter.acquireToken('providerA').then(() => { + aResolved = true; + }); + + // Provider B should resolve immediately (its own full bucket) + const bStart = Date.now(); + await limiter.acquireToken('providerB'); + const bElapsed = Date.now() - bStart; + + // B must not have waited for A's refill + expect(bElapsed).toBeLessThan(100); + expect(aResolved).toBe(false); // A is still waiting + + await aWaiter; // clean up + }, 3000); + + // ------------------------------------------------------------------------- + // ACCEPTANCE CRITERION 2: Bursts beyond capacity are delayed, not dropped + // ------------------------------------------------------------------------- + + it('AC2 — burst beyond capacity is queued and all tokens eventually delivered', async () => { + // capacity=2, refill=10/sec → tokens refill every 100 ms + const limiter = makeLimiter(2, 10); + const BURST = 5; // 3 beyond capacity + + const results: number[] = []; + const start = Date.now(); + + const promises = Array.from({ length: BURST }, (_, i) => + limiter.acquireToken('burstProvider').then(() => { + results.push(Date.now() - start); + }), + ); + + await Promise.all(promises); + + // All BURST deliveries must have completed (none dropped) + expect(results).toHaveLength(BURST); + + // First 2 should be near-instant (within bucket capacity) + expect(results[0]).toBeLessThan(100); + expect(results[1]).toBeLessThan(100); + + // Remaining 3 must have been delayed (paced) + expect(results[2]).toBeGreaterThanOrEqual(80); + expect(results[3]).toBeGreaterThanOrEqual(80); + expect(results[4]).toBeGreaterThanOrEqual(80); + }, 5000); + + it('handles multiple providers independently with no cross-contamination', async () => { + const limiter = makeLimiter(2, 5); + const providers = ['alpha', 'beta', 'gamma']; + + // Each provider gets its own full bucket — all should resolve quickly + const start = Date.now(); + await Promise.all(providers.map((p) => limiter.acquireToken(p))); + expect(Date.now() - start).toBeLessThan(100); + + // Drain each provider's remaining token + await Promise.all(providers.map((p) => limiter.acquireToken(p))); + + // Now all buckets are empty — each provider queues independently + const throttledBefore = getMetrics().throttledByProvider; + const waiters = providers.map((p) => limiter.acquireToken(p)); + providers.forEach((p) => { + expect(getMetrics().throttledByProvider[p]).toBe( + (throttledBefore[p] ?? 0) + 1, + ); + }); + + await Promise.all(waiters); + }, 5000); +}); + +// --------------------------------------------------------------------------- +// 4. WebhookDeliveryService — idempotency and signing +// --------------------------------------------------------------------------- + +describe('WebhookDeliveryService', () => { + const originalEnv = process.env; + + beforeEach(() => { + process.env = { ...originalEnv }; + process.env.WEBHOOK_SECRET_TESTPROVIDER = 'test-secret-value'; + }); + + afterEach(() => { + process.env = originalEnv; + }); + + it('skips duplicate deliveryIds (idempotency)', async () => { + // Use a fast limiter so the test doesn't wait on rate limits + const limiter = makeLimiter(100, 100); + const svc = new WebhookDeliveryService(limiter); + + // Mock fetch to avoid real network calls + const mockFetch = jest.fn().mockResolvedValue({ status: 200 }); + global.fetch = mockFetch as unknown as typeof fetch; + + const req = { + providerId: 'testprovider', + deliveryId: 'evt-dup-001', + targetUrl: 'https://example.com/hook', + payload: { event: 'test' }, + }; + + const first = await svc.deliver(req); + const second = await svc.deliver(req); + + expect(first.sent).toBe(true); + expect(second.sent).toBe(false); // duplicate skipped + expect(mockFetch).toHaveBeenCalledTimes(1); // only one HTTP call + }); + + it('isDelivered returns true after a successful delivery', async () => { + const limiter = makeLimiter(100, 100); + const svc = new WebhookDeliveryService(limiter); + + global.fetch = jest.fn().mockResolvedValue({ status: 200 }) as unknown as typeof fetch; + + await svc.deliver({ + providerId: 'testprovider', + deliveryId: 'evt-check-001', + targetUrl: 'https://example.com/hook', + payload: {}, + }); + + expect(svc.isDelivered('evt-check-001')).toBe(true); + expect(svc.isDelivered('evt-check-999')).toBe(false); + }); + + it('throws when the provider signing secret is missing', async () => { + delete process.env.WEBHOOK_SECRET_TESTPROVIDER; + const limiter = makeLimiter(100, 100); + const svc = new WebhookDeliveryService(limiter); + + await expect( + svc.deliver({ + providerId: 'testprovider', + deliveryId: 'evt-nosecret-001', + targetUrl: 'https://example.com/hook', + payload: {}, + }), + ).rejects.toThrow(/WEBHOOK_SECRET_TESTPROVIDER/); + }); + + it('error message for missing secret does NOT contain the secret value', async () => { + process.env.WEBHOOK_SECRET_TESTPROVIDER = 'super-secret-password-123'; + // Temporarily remove it to trigger the error path, then check message + const secret = process.env.WEBHOOK_SECRET_TESTPROVIDER; + delete process.env.WEBHOOK_SECRET_TESTPROVIDER; + + const limiter = makeLimiter(100, 100); + const svc = new WebhookDeliveryService(limiter); + + let errorMessage = ''; + try { + await svc.deliver({ + providerId: 'testprovider', + deliveryId: 'evt-redact-001', + targetUrl: 'https://example.com/hook', + payload: {}, + }); + } catch (err) { + errorMessage = (err as Error).message; + } + + expect(errorMessage).not.toContain(secret); + expect(errorMessage).toContain('WEBHOOK_SECRET_TESTPROVIDER'); + }); + + it('records a delivered metric on success', async () => { + const limiter = makeLimiter(100, 100); + const svc = new WebhookDeliveryService(limiter); + + global.fetch = jest.fn().mockResolvedValue({ status: 200 }) as unknown as typeof fetch; + + await svc.deliver({ + providerId: 'testprovider', + deliveryId: 'evt-metric-001', + targetUrl: 'https://example.com/hook', + payload: {}, + }); + + expect(getMetrics().deliveredByProvider['testprovider']).toBe(1); + }); + + it('wraps network errors with redacted provider ID', async () => { + const limiter = makeLimiter(100, 100); + const svc = new WebhookDeliveryService(limiter); + + global.fetch = jest.fn().mockRejectedValue(new Error('ECONNREFUSED')) as unknown as typeof fetch; + + await expect( + svc.deliver({ + providerId: 'testprovider', + deliveryId: 'evt-neterr-001', + targetUrl: 'https://example.com/hook', + payload: {}, + }), + ).rejects.toThrow(/ECONNREFUSED/); + }); + + it('sends X-Webhook-Signature header with sha256= prefix', async () => { + const limiter = makeLimiter(100, 100); + const svc = new WebhookDeliveryService(limiter); + + let capturedHeaders: Record = {}; + global.fetch = jest.fn().mockImplementation((_url: string, opts: RequestInit) => { + capturedHeaders = opts.headers as Record; + return Promise.resolve({ status: 200 }); + }) as unknown as typeof fetch; + + await svc.deliver({ + providerId: 'testprovider', + deliveryId: 'evt-sig-001', + targetUrl: 'https://example.com/hook', + payload: { data: 'hello' }, + }); + + expect(capturedHeaders['X-Webhook-Signature']).toMatch(/^sha256=[a-f0-9]{64}$/); + }); +}); + +// --------------------------------------------------------------------------- +// 5. Metrics module +// --------------------------------------------------------------------------- + +describe('webhookMetrics', () => { + it('starts with empty counters after reset', () => { + const m = getMetrics(); + expect(m.throttledByProvider).toEqual({}); + expect(m.deliveredByProvider).toEqual({}); + }); + + it('getMetrics returns a copy — mutations do not affect internal state', () => { + const limiter = makeLimiter(1, 20); + // Drain and trigger a throttle + void limiter.acquireToken('x'); + void limiter.acquireToken('x'); + + const snap1 = getMetrics(); + snap1.throttledByProvider['injected'] = 999; + + const snap2 = getMetrics(); + expect(snap2.throttledByProvider['injected']).toBeUndefined(); + }); +}); + +// --------------------------------------------------------------------------- +// 6. Edge cases +// --------------------------------------------------------------------------- + +describe('Edge cases', () => { + it('handles a single-token capacity correctly (capacity=1)', async () => { + const limiter = makeLimiter(1, 10); + await limiter.acquireToken('solo'); + expect(limiter.getTokenCount('solo')).toBeCloseTo(0, 0); + }); + + it('handles fractional refill rates (e.g. 0.5 tokens/sec)', async () => { + // 0.5/sec → 1 token every 2 seconds; use capacity=1 so we can drain it + const limiter = makeLimiter(1, 0.5); + await limiter.acquireToken('slow'); + // Queue a waiter — it should eventually resolve (we don't await to keep test fast) + const waiter = limiter.acquireToken('slow'); + expect(limiter.getQueueDepth('slow')).toBe(1); + // Cancel by letting the test end; Jest will warn about open handles only + // if the timer fires after the suite — acceptable for this edge-case check. + // We resolve it to avoid leaking: + await waiter; + }, 5000); + + it('large burst (50 requests) against capacity=5 — all complete, none dropped', async () => { + const limiter = makeLimiter(5, 50); // 50 tokens/sec → 20 ms per token + const BURST = 50; + const results: boolean[] = []; + + const promises = Array.from({ length: BURST }, () => + limiter.acquireToken('bigburst').then(() => { + results.push(true); + }), + ); + + await Promise.all(promises); + expect(results).toHaveLength(BURST); + expect(results.every(Boolean)).toBe(true); + }, 10000); + + it('concurrent acquireToken calls for different providers never interfere', async () => { + const limiter = makeLimiter(1, 100); + const providers = Array.from({ length: 10 }, (_, i) => `provider-${i}`); + + // Each provider has its own full bucket — all should resolve immediately + const start = Date.now(); + await Promise.all(providers.map((p) => limiter.acquireToken(p))); + expect(Date.now() - start).toBeLessThan(100); + }); +}); diff --git a/src/rateLimit.ts b/src/rateLimit.ts new file mode 100644 index 0000000..88a4770 --- /dev/null +++ b/src/rateLimit.ts @@ -0,0 +1,281 @@ +/** + * @module rateLimit + * + * Per-provider token-bucket rate limiter for outbound webhook deliveries. + * + * ## Algorithm + * Each provider gets its own token bucket. Tokens refill continuously at + * `refillRatePerSec` tokens/second up to `capacity`. When a caller requests + * a token and the bucket is empty the call is queued and resolved as soon as + * enough tokens have refilled — deliveries are **paced/queued, never dropped**. + * + * ## State + * Buckets are held in-process (a plain `Map`). In a blue/green or + * multi-replica deployment each process maintains its own independent bucket + * state. This is intentional: the implementation does not require Redis or + * any shared store. See `docs/request-limits-implementation.md` for the + * trade-off discussion and upgrade path. + * + * ## Configuration (environment variables) + * | Variable | Default | Description | + * |---------------------------------|---------|------------------------------------------| + * | `WEBHOOK_BUCKET_CAPACITY` | `10` | Max tokens per provider bucket | + * | `WEBHOOK_REFILL_RATE_PER_SEC` | `2` | Tokens added per second per provider | + * + * Both values are validated at construction time; the process will throw a + * descriptive error on invalid configuration rather than silently misbehaving. + * + * ## Security + * Provider secrets are **never** passed to or stored by this module. + * Only opaque provider IDs appear in log output. + */ + +import { recordThrottled } from './webhookMetrics'; + +// --------------------------------------------------------------------------- +// Configuration helpers +// --------------------------------------------------------------------------- + +/** Validated, parsed rate-limiter configuration. */ +export interface RateLimiterConfig { + /** Maximum number of tokens a single provider bucket can hold. */ + capacity: number; + /** Number of tokens added to each bucket per second. */ + refillRatePerSec: number; +} + +/** + * Parse and validate rate-limiter configuration from environment variables. + * + * @throws {Error} If any value is missing, non-numeric, non-positive, or + * `capacity` is zero (which would make every delivery block forever). + */ +export function loadRateLimiterConfig(): RateLimiterConfig { + const rawCapacity = process.env.WEBHOOK_BUCKET_CAPACITY ?? '10'; + const rawRefill = process.env.WEBHOOK_REFILL_RATE_PER_SEC ?? '2'; + + const capacity = Number(rawCapacity); + const refillRatePerSec = Number(rawRefill); + + if (!Number.isFinite(capacity) || capacity <= 0) { + throw new Error( + `[rateLimit] Invalid WEBHOOK_BUCKET_CAPACITY="${rawCapacity}". ` + + 'Must be a finite positive number greater than zero.', + ); + } + + if (!Number.isFinite(refillRatePerSec) || refillRatePerSec <= 0) { + throw new Error( + `[rateLimit] Invalid WEBHOOK_REFILL_RATE_PER_SEC="${rawRefill}". ` + + 'Must be a finite positive number greater than zero.', + ); + } + + return { capacity, refillRatePerSec }; +} + +// --------------------------------------------------------------------------- +// Internal bucket state +// --------------------------------------------------------------------------- + +/** Runtime state for a single provider's token bucket. */ +interface BucketState { + /** Current token count (may be fractional between refill ticks). */ + tokens: number; + /** Timestamp (ms) of the last refill calculation. */ + lastRefillMs: number; + /** Pending waiters in FIFO order. Each resolves when a token is available. */ + queue: Array<() => void>; +} + +// --------------------------------------------------------------------------- +// TokenBucketLimiter +// --------------------------------------------------------------------------- + +/** + * Per-provider token-bucket rate limiter. + * + * Instantiate once at application startup (or use the module-level singleton + * {@link defaultLimiter}) and share the instance across all delivery workers. + * + * @example + * ```ts + * const limiter = new TokenBucketLimiter(); + * await limiter.acquireToken('provider-acme'); + * // safe to send the webhook now + * ``` + */ +export class TokenBucketLimiter { + private readonly capacity: number; + private readonly refillRatePerSec: number; + private readonly buckets: Map = new Map(); + + /** + * @param config - Parsed configuration. Defaults to + * {@link loadRateLimiterConfig} (reads env vars) when omitted. + */ + constructor(config?: RateLimiterConfig) { + const resolved = config ?? loadRateLimiterConfig(); + this.capacity = resolved.capacity; + this.refillRatePerSec = resolved.refillRatePerSec; + } + + // ------------------------------------------------------------------------- + // Public API + // ------------------------------------------------------------------------- + + /** + * Acquire one token for the given provider. + * + * Resolves immediately when a token is available. If the bucket is empty + * the returned promise is queued and resolves as soon as the next refill + * produces a token — the delivery is **paced, not dropped**. + * + * @param providerId - Opaque provider identifier. Must NOT contain secrets. + * @returns A promise that resolves when the caller may proceed with delivery. + */ + public async acquireToken(providerId: string): Promise { + this.refillBucket(providerId); + const bucket = this.getBucket(providerId); + + if (bucket.tokens >= 1) { + bucket.tokens -= 1; + return; + } + + // Bucket is empty — queue the caller and record the throttle event. + recordThrottled(providerId); + console.log( + `[rateLimit] Provider "${redactId(providerId)}" throttled — queuing delivery.`, + ); + + return new Promise((resolve) => { + bucket.queue.push(resolve); + this.scheduleRefill(providerId); + }); + } + + /** + * Return the current token count for a provider without consuming a token. + * Useful for observability and testing. + * + * @param providerId - Opaque provider identifier. + */ + public getTokenCount(providerId: string): number { + this.refillBucket(providerId); + return this.getBucket(providerId).tokens; + } + + /** + * Return the number of queued (waiting) deliveries for a provider. + * + * @param providerId - Opaque provider identifier. + */ + public getQueueDepth(providerId: string): number { + return this.getBucket(providerId).queue.length; + } + + // ------------------------------------------------------------------------- + // Private helpers + // ------------------------------------------------------------------------- + + /** + * Retrieve or lazily create the bucket state for a provider. + */ + private getBucket(providerId: string): BucketState { + if (!this.buckets.has(providerId)) { + this.buckets.set(providerId, { + tokens: this.capacity, + lastRefillMs: Date.now(), + queue: [], + }); + } + // Non-null assertion is safe: we just set it above if absent. + return this.buckets.get(providerId)!; + } + + /** + * Apply elapsed-time token refill to the named bucket using the continuous + * token-bucket formula: + * + * newTokens = min(capacity, currentTokens + elapsed_sec * refillRate) + */ + private refillBucket(providerId: string): void { + const bucket = this.getBucket(providerId); + const nowMs = Date.now(); + const elapsedSec = (nowMs - bucket.lastRefillMs) / 1_000; + const added = elapsedSec * this.refillRatePerSec; + + bucket.tokens = Math.min(this.capacity, bucket.tokens + added); + bucket.lastRefillMs = nowMs; + } + + /** + * Schedule a `setTimeout` to drain the queue for a provider once enough + * time has elapsed to produce the next token. + * + * Only one timer is scheduled per provider at a time; the drain loop + * re-schedules itself while the queue is non-empty. + */ + private scheduleRefill(providerId: string): void { + const bucket = this.getBucket(providerId); + // Time (ms) until the next whole token is available. + const msUntilToken = Math.ceil((1 / this.refillRatePerSec) * 1_000); + + setTimeout(() => { + this.refillBucket(providerId); + this.drainQueue(providerId); + }, msUntilToken); + } + + /** + * Resolve as many queued waiters as the current token count allows, + * then re-schedule if the queue is still non-empty. + */ + private drainQueue(providerId: string): void { + const bucket = this.getBucket(providerId); + + while (bucket.queue.length > 0 && bucket.tokens >= 1) { + bucket.tokens -= 1; + const resolve = bucket.queue.shift()!; + resolve(); + } + + if (bucket.queue.length > 0) { + this.scheduleRefill(providerId); + } + } +} + +// --------------------------------------------------------------------------- +// Security helper +// --------------------------------------------------------------------------- + +/** + * Redact a provider ID for safe log output. + * + * Shows only the first 4 characters followed by `****` to aid debugging + * without leaking full identifiers that might encode sensitive routing info. + * + * @param id - Raw provider identifier. + * @returns Redacted string safe for log output. + */ +export function redactId(id: string): string { + if (id.length <= 4) { + return '****'; + } + return `${id.slice(0, 4)}****`; +} + +// --------------------------------------------------------------------------- +// Module-level singleton +// --------------------------------------------------------------------------- + +/** + * Shared default limiter instance, initialised once at module load time. + * Configuration is read from environment variables (see module docs). + * + * Import and use this in `WebhookDeliveryService` and anywhere else that + * needs rate-limited webhook delivery. + */ +export const defaultLimiter: TokenBucketLimiter = new TokenBucketLimiter(); diff --git a/src/webhookDelivery.ts b/src/webhookDelivery.ts new file mode 100644 index 0000000..50ab3ad --- /dev/null +++ b/src/webhookDelivery.ts @@ -0,0 +1,212 @@ +/** + * @module webhookDelivery + * + * Outbound webhook delivery service with per-provider token-bucket rate + * limiting, HMAC-SHA256 payload signing, and idempotency enforcement. + * + * ## Security + * - Provider signing secrets are read exclusively from environment variables + * (`WEBHOOK_SECRET_`). They are **never** logged, + * stored in memory beyond the signing operation, or included in any + * error message. + * - The `X-Webhook-Signature` header uses the format + * `sha256=` (compatible with GitHub-style webhook verification). + * + * ## Idempotency + * Each delivery attempt carries a caller-supplied `deliveryId`. The service + * tracks delivered IDs in an in-process `Set`; duplicate calls with the same + * ID are silently skipped. In a multi-process deployment the caller should + * use a distributed lock or deduplicate upstream. + * + * ## Rate Limiting + * Deliveries are paced through {@link TokenBucketLimiter}. A slow provider + * only blocks its own bucket — other providers are unaffected. + */ + +import { createHmac } from 'crypto'; +import { TokenBucketLimiter, defaultLimiter, redactId } from './rateLimit'; +import { recordDelivered } from './webhookMetrics'; + +// --------------------------------------------------------------------------- +// Types +// --------------------------------------------------------------------------- + +/** Payload passed to {@link WebhookDeliveryService.deliver}. */ +export interface DeliveryRequest { + /** Opaque provider identifier — used for rate-limit bucketing and secret lookup. */ + providerId: string; + /** Globally unique delivery identifier used for idempotency. */ + deliveryId: string; + /** Destination URL for the webhook POST. */ + targetUrl: string; + /** Arbitrary JSON-serialisable payload body. */ + payload: unknown; +} + +/** Result returned by {@link WebhookDeliveryService.deliver}. */ +export interface DeliveryResult { + /** `true` if the webhook was sent; `false` if it was a duplicate (skipped). */ + sent: boolean; + /** HTTP status code from the target, or `undefined` for duplicates. */ + statusCode?: number; +} + +// --------------------------------------------------------------------------- +// WebhookDeliveryService +// --------------------------------------------------------------------------- + +/** + * Manages outbound webhook delivery with rate limiting, signing, and + * idempotency. + * + * @example + * ```ts + * const svc = new WebhookDeliveryService(); + * await svc.deliver({ + * providerId: 'acme', + * deliveryId: 'evt-001', + * targetUrl: 'https://hooks.acme.com/inbound', + * payload: { event: 'contract.signed', contractId: '123' }, + * }); + * ``` + */ +export class WebhookDeliveryService { + private readonly limiter: TokenBucketLimiter; + private readonly deliveredIds: Set = new Set(); + + /** + * @param limiter - Token-bucket limiter to use. Defaults to the shared + * {@link defaultLimiter} singleton so all service instances share the + * same per-provider buckets within a process. + */ + constructor(limiter: TokenBucketLimiter = defaultLimiter) { + this.limiter = limiter; + } + + // ------------------------------------------------------------------------- + // Public API + // ------------------------------------------------------------------------- + + /** + * Deliver a webhook to the target URL. + * + * The call will wait (be paced) until the provider's token bucket has + * capacity before sending. Duplicate `deliveryId` values are silently + * skipped to guarantee at-most-once delivery within a process lifetime. + * + * @param request - Delivery parameters. + * @returns Result indicating whether the webhook was sent. + */ + public async deliver(request: DeliveryRequest): Promise { + const { providerId, deliveryId, targetUrl, payload } = request; + + // --- Idempotency check --------------------------------------------------- + if (this.deliveredIds.has(deliveryId)) { + console.log( + `[webhookDelivery] Duplicate deliveryId="${deliveryId}" for provider ` + + `"${redactId(providerId)}" — skipping.`, + ); + return { sent: false }; + } + + // --- Rate limiting ------------------------------------------------------- + await this.limiter.acquireToken(providerId); + + // --- Payload signing ----------------------------------------------------- + const body = JSON.stringify(payload); + const signature = this.signPayload(providerId, body); + + // --- HTTP delivery ------------------------------------------------------- + const statusCode = await this.postWebhook(targetUrl, body, signature, providerId); + + // --- Post-delivery bookkeeping ------------------------------------------ + this.deliveredIds.add(deliveryId); + recordDelivered(providerId); + + console.log( + `[webhookDelivery] Delivered deliveryId="${deliveryId}" to provider ` + + `"${redactId(providerId)}" — HTTP ${statusCode}.`, + ); + + return { sent: true, statusCode }; + } + + /** + * Check whether a delivery ID has already been processed. + * + * @param deliveryId - The delivery ID to check. + */ + public isDelivered(deliveryId: string): boolean { + return this.deliveredIds.has(deliveryId); + } + + // ------------------------------------------------------------------------- + // Private helpers + // ------------------------------------------------------------------------- + + /** + * Compute an HMAC-SHA256 signature for the given body using the provider's + * secret. The secret is read from the environment variable + * `WEBHOOK_SECRET_` (e.g. `WEBHOOK_SECRET_ACME`). + * + * SECURITY: The secret value is never stored, logged, or included in any + * thrown error. If the env var is absent an error is thrown with only the + * variable *name* (not value) in the message. + * + * @param providerId - Provider identifier used to derive the env-var name. + * @param body - Serialised request body to sign. + * @returns Signature string in the format `sha256=`. + */ + private signPayload(providerId: string, body: string): string { + const envKey = `WEBHOOK_SECRET_${providerId.toUpperCase().replace(/[^A-Z0-9]/g, '_')}`; + const secret = process.env[envKey]; + + if (!secret) { + throw new Error( + `[webhookDelivery] Missing signing secret for provider "${redactId(providerId)}". ` + + `Expected environment variable: ${envKey}`, + ); + } + + const hmac = createHmac('sha256', secret); + hmac.update(body); + return `sha256=${hmac.digest('hex')}`; + } + + /** + * Perform the HTTP POST to the webhook target. + * + * Uses the built-in `fetch` API (Node ≥ 18). Network errors are re-thrown + * with the provider ID redacted. + * + * @param url - Destination URL. + * @param body - Serialised JSON body. + * @param signature - HMAC signature header value. + * @param providerId - Used only for redacted error logging. + * @returns HTTP status code from the target server. + */ + private async postWebhook( + url: string, + body: string, + signature: string, + providerId: string, + ): Promise { + try { + const response = await fetch(url, { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + 'X-Webhook-Signature': signature, + }, + body, + }); + return response.status; + } catch (err) { + const message = err instanceof Error ? err.message : String(err); + throw new Error( + `[webhookDelivery] Network error delivering to provider ` + + `"${redactId(providerId)}": ${message}`, + ); + } + } +} diff --git a/src/webhookMetrics.ts b/src/webhookMetrics.ts new file mode 100644 index 0000000..8af16bb --- /dev/null +++ b/src/webhookMetrics.ts @@ -0,0 +1,71 @@ +/** + * @module webhookMetrics + * + * Lightweight in-process metrics collector for webhook delivery events. + * In a production multi-process deployment these counters are per-process; + * export them to a Prometheus push-gateway or similar aggregator if + * cross-process totals are required. + * + * SECURITY: This module never receives or stores provider secrets. + * Only opaque provider IDs (strings) are recorded. + */ + +/** Shape of the metrics snapshot returned by {@link getMetrics}. */ +export interface WebhookMetricsSnapshot { + /** Total throttled-delivery events recorded since process start, keyed by provider ID. */ + throttledByProvider: Record; + /** Total successful delivery events recorded since process start, keyed by provider ID. */ + deliveredByProvider: Record; +} + +// --------------------------------------------------------------------------- +// Internal mutable state (module-level singletons, reset-able for tests) +// --------------------------------------------------------------------------- + +let throttledByProvider: Record = {}; +let deliveredByProvider: Record = {}; + +// --------------------------------------------------------------------------- +// Public API +// --------------------------------------------------------------------------- + +/** + * Record that a webhook delivery was throttled (token not immediately + * available) for the given provider. + * + * @param providerId - Opaque provider identifier. Must NOT contain secrets. + */ +export function recordThrottled(providerId: string): void { + throttledByProvider[providerId] = (throttledByProvider[providerId] ?? 0) + 1; +} + +/** + * Record that a webhook was successfully delivered for the given provider. + * + * @param providerId - Opaque provider identifier. Must NOT contain secrets. + */ +export function recordDelivered(providerId: string): void { + deliveredByProvider[providerId] = (deliveredByProvider[providerId] ?? 0) + 1; +} + +/** + * Return a point-in-time snapshot of all recorded metrics. + * The returned object is a deep copy; mutations do not affect internal state. + */ +export function getMetrics(): WebhookMetricsSnapshot { + return { + throttledByProvider: { ...throttledByProvider }, + deliveredByProvider: { ...deliveredByProvider }, + }; +} + +/** + * Reset all counters to zero. + * Intended for use in tests only — do not call in production code. + * + * @internal + */ +export function _resetMetrics(): void { + throttledByProvider = {}; + deliveredByProvider = {}; +}