diff --git a/index.js b/index.js index 7493f53..05168e7 100644 --- a/index.js +++ b/index.js @@ -17,6 +17,8 @@ const createVideoRoutes = require('./routes/video'); const { buildAuditLogCsv } = require('./src/utils/export/auditLogCsv'); const { buildAuditLogPdf } = require('./src/utils/export/auditLogPdf'); const { getRequestIp } = require('./src/utils/requestIp'); +const { getRedisClient, closeRedisClient } = require('./src/config/redis'); +const { createRateLimiter } = require('./middleware/rateLimiter'); /** * Create the Express application with injectable services for testing. @@ -54,6 +56,19 @@ function createApp(dependencies = {}) { next(); }); + // Leaky-bucket rate limiting per wallet address (requires Redis). + if (dependencies.rateLimiter) { + app.use('/api', dependencies.rateLimiter); + } else if (process.env.REDIS_URL || process.env.REDIS_HOST) { + app.use('/api', createRateLimiter({ + redis: getRedisClient(), + bucketCapacity: Number(process.env.RATE_LIMIT_CAPACITY || 60), + leakRatePerSecond: Number(process.env.RATE_LIMIT_LEAK_RATE || 1), + blockDurationSeconds: Number(process.env.RATE_LIMIT_BLOCK_SECONDS || 300), + sybilThreshold: Number(process.env.SYBIL_THRESHOLD || 3), + })); + } + app.get('/', (req, res) => { res.json({ project: 'SubStream Protocol', @@ -353,6 +368,112 @@ const port = Number(process.env.PORT || 3000); if (require.main === module) { app.listen(port, () => console.log(`SubStream API running on port ${port}`)); +const cors = require('cors'); +const dotenv = require('dotenv'); + +// Load environment variables +dotenv.config(); + +const app = express(); +const port = process.env.PORT || 3000; + +// Middleware +app.use(cors()); +app.use(express.json({ limit: "10mb" })); +app.use(express.urlencoded({ extended: true })); + +// Leaky-bucket rate limiting per wallet address +if (process.env.REDIS_URL || process.env.REDIS_HOST) { + const { createRateLimiter: createRL } = require('./middleware/rateLimiter'); + const { getRedisClient: getRC } = require('./src/config/redis'); + app.use(createRL({ + redis: getRC(), + bucketCapacity: Number(process.env.RATE_LIMIT_CAPACITY || 60), + leakRatePerSecond: Number(process.env.RATE_LIMIT_LEAK_RATE || 1), + blockDurationSeconds: Number(process.env.RATE_LIMIT_BLOCK_SECONDS || 300), + sybilThreshold: Number(process.env.SYBIL_THRESHOLD || 3), + })); +} + +// Routes +app.use('/auth', require('./routes/auth')); +app.use('/content', require('./routes/content')); +app.use('/analytics', require('./routes/analytics')); +app.use('/storage', require('./routes/storage')); +app.use('/posts', require('./routes/posts')); +app.use("/auth", require("./routes/auth")); +app.use("/auth", require("./routes/stellarAuth")); +app.use("/content", require("./routes/content")); +app.use("/analytics", require("./routes/analytics")); +app.use("/storage", require("./routes/storage")); + +// Health check endpoint +app.get("/health", (req, res) => { + res.json({ + status: "healthy", + timestamp: new Date().toISOString(), + version: "1.0.0", + services: { + auth: 'active', + content: 'active', + analytics: 'active', + storage: 'active', + posts: 'active' + } + auth: "active", + content: "active", + analytics: "active", + storage: "active", + }, + }); +}); + +// Root endpoint +app.get("/", (req, res) => { + res.json({ + project: "SubStream Protocol", + status: "Active", + contract: "CAOUX2FZ65IDC4F2X7LJJ2SVF23A35CCTZB7KVVN475JCLKTTU4CEY6L", + version: "1.0.0", + endpoints: { + auth: '/auth', + content: '/content', + analytics: '/analytics', + storage: '/storage', + posts: '/posts', + health: '/health' + } + auth: "/auth", + content: "/content", + analytics: "/analytics", + storage: "/storage", + health: "/health", + }, + }); +}); + +// Error handling middleware +app.use((err, req, res, next) => { + console.error("Unhandled error:", err); + res.status(500).json({ + success: false, + error: "Internal server error", + }); +}); + +// 404 handler +app.use('*', (req, res) => { + res.status(404).json({ + success: false, + error: "Endpoint not found", + }); +}); + +if (require.main === module) { + app.listen(port, () => { + console.log(`SubStream API running on port ${port}`); + console.log(`Health check: http://localhost:${port}/health`); + }); } module.exports = app; diff --git a/middleware/rateLimiter.js b/middleware/rateLimiter.js new file mode 100644 index 0000000..5e692de --- /dev/null +++ b/middleware/rateLimiter.js @@ -0,0 +1,121 @@ +/** + * Express middleware – per-wallet Leaky Bucket rate limiting with Sybil flagging. + * + * Usage: + * const { createRateLimiter } = require('./middleware/rateLimiter'); + * app.use('/api', createRateLimiter({ redis, bucketCapacity: 60 })); + * + * The middleware extracts the wallet address from (in priority order): + * 1. req.user.address (set by auth middleware) + * 2. req.user.publicKey (Stellar auth) + * 3. req.body.walletAddress + * 4. req.query.walletAddress || req.query.publicKey + * + * When a wallet cannot be determined the request falls through to the + * next middleware so unauthenticated routes still work. + */ + +const { + LeakyBucketRateLimiter, +} = require("../src/services/leakyBucketRateLimiter"); +const { + SybilAnalysisService, +} = require("../src/services/sybilAnalysisService"); +const { getRequestIp } = require("../src/utils/requestIp"); + +/** + * Extract the wallet / public-key identifier from the request. + * + * @param {import('express').Request} req + * @returns {string|null} + */ +function extractWallet(req) { + if (req.user?.address) return req.user.address; + if (req.user?.publicKey) return req.user.publicKey; + if (req.body?.walletAddress) return req.body.walletAddress; + if (req.query?.walletAddress) return req.query.walletAddress; + if (req.query?.publicKey) return req.query.publicKey; + return null; +} + +/** + * Create the rate-limiting middleware. + * + * @param {object} options + * @param {import('ioredis').Redis} options.redis Redis client instance. + * @param {number} [options.bucketCapacity=60] Max burst size. + * @param {number} [options.leakRatePerSecond=1] Tokens drained per second. + * @param {number} [options.blockDurationSeconds=300] Temp-block length after overflow. + * @param {number} [options.sybilThreshold=3] Violations before Sybil flag. + * @param {boolean} [options.skipIfNoWallet=true] Pass through when wallet is unknown. + * @returns {import('express').RequestHandler} + */ +function createRateLimiter(options = {}) { + const { redis, skipIfNoWallet = true } = options; + + if (!redis) { + throw new Error("createRateLimiter requires a redis client instance"); + } + + const limiter = new LeakyBucketRateLimiter(redis, { + bucketCapacity: options.bucketCapacity, + leakRatePerSecond: options.leakRatePerSecond, + blockDurationSeconds: options.blockDurationSeconds, + sybilThreshold: options.sybilThreshold, + }); + + const sybil = new SybilAnalysisService(redis, { + flagThreshold: options.sybilThreshold, + }); + + return async function rateLimiterMiddleware(req, res, next) { + const wallet = extractWallet(req); + + if (!wallet) { + if (skipIfNoWallet) return next(); + return res.status(400).json({ + success: false, + error: "Wallet address required for rate-limited endpoints", + }); + } + + try { + const result = await limiter.consume(wallet); + + // Always attach rate-limit headers so clients can self-throttle. + res.set("X-RateLimit-Limit", String(result.capacity)); + res.set( + "X-RateLimit-Remaining", + String(Math.max(0, Math.floor(result.capacity - result.currentLevel))), + ); + + if (result.allowed) { + return next(); + } + + // --- Request denied --- + + // Flag for Sybil analysis when violations cross the threshold. + if (result.violations !== undefined) { + await sybil.evaluate(wallet, result.violations, { + endpoint: req.originalUrl, + ip: getRequestIp(req), + }); + } + + res.set("Retry-After", String(result.retryAfterSeconds)); + + return res.status(429).json({ + success: false, + error: "Rate limit exceeded. You have been temporarily blocked.", + retryAfterSeconds: result.retryAfterSeconds, + }); + } catch (err) { + // If Redis is unavailable, fail open so the API stays usable. + console.error("[RateLimiter] Redis error – failing open:", err.message); + return next(); + } + }; +} + +module.exports = { createRateLimiter, extractWallet }; diff --git a/package-lock.json b/package-lock.json index 2d109d4..317d12d 100644 --- a/package-lock.json +++ b/package-lock.json @@ -15,6 +15,7 @@ "ethers": "^6.8.1", "express": "^5.2.1", "form-data": "^4.0.0", + "ioredis": "^5.6.1", "ipfs-http-client": "^60.0.1", "jsonwebtoken": "^9.0.2", "multer": "^1.4.5-lts.1", @@ -570,6 +571,12 @@ "node": ">=14" } }, + "node_modules/@ioredis/commands": { + "version": "1.5.1", + "resolved": "https://registry.npmjs.org/@ioredis/commands/-/commands-1.5.1.tgz", + "integrity": "sha512-JH8ZL/ywcJyR9MmJ5BNqZllXNZQqQbnVZOqpPQqE1vHiFgAw4NHbvE0FOduNU8IX9babitBT46571OnPTT0Zcw==", + "license": "MIT" + }, "node_modules/@ipld/car": { "version": "3.2.4", "resolved": "https://registry.npmjs.org/@ipld/car/-/car-3.2.4.tgz", @@ -2783,6 +2790,15 @@ "node": ">=12" } }, + "node_modules/cluster-key-slot": { + "version": "1.1.2", + "resolved": "https://registry.npmjs.org/cluster-key-slot/-/cluster-key-slot-1.1.2.tgz", + "integrity": "sha512-RMr0FhtfXemyinomL4hrWcYJxmX6deFdCxpJzhDttxgO1+bcCnkk+9drydLVDmAMG7NE6aN/fl4F7ucU/90gAA==", + "license": "Apache-2.0", + "engines": { + "node": ">=0.10.0" + } + }, "node_modules/co": { "version": "4.6.0", "resolved": "https://registry.npmjs.org/co/-/co-4.6.0.tgz", @@ -3132,6 +3148,15 @@ "node": ">=0.4.0" } }, + "node_modules/denque": { + "version": "2.1.0", + "resolved": "https://registry.npmjs.org/denque/-/denque-2.1.0.tgz", + "integrity": "sha512-HVQE3AAb/pxF8fQAoiqpvg9i3evqug3hoiwakOyZAwJm+6vZehbkYXZ0l4JxS+I3QxM97v5aaRNhj8v5oBhekw==", + "license": "Apache-2.0", + "engines": { + "node": ">=0.10" + } + }, "node_modules/depd": { "version": "2.0.0", "license": "MIT", @@ -4307,6 +4332,30 @@ "integrity": "sha512-+WvfEZnFUhRwFxgz+QCQi7UC6o9AM0EHM9bpIe2Nhqb100NHCsTvNAn4eJgvgV2/tmLo1MP9nGxQKEcZTAueLA==", "license": "Apache-2.0 OR MIT" }, + "node_modules/ioredis": { + "version": "5.10.1", + "resolved": "https://registry.npmjs.org/ioredis/-/ioredis-5.10.1.tgz", + "integrity": "sha512-HuEDBTI70aYdx1v6U97SbNx9F1+svQKBDo30o0b9fw055LMepzpOOd0Ccg9Q6tbqmBSJaMuY0fB7yw9/vjBYCA==", + "license": "MIT", + "dependencies": { + "@ioredis/commands": "1.5.1", + "cluster-key-slot": "^1.1.0", + "debug": "^4.3.4", + "denque": "^2.1.0", + "lodash.defaults": "^4.2.0", + "lodash.isarguments": "^3.1.0", + "redis-errors": "^1.2.0", + "redis-parser": "^3.0.0", + "standard-as-callback": "^2.1.0" + }, + "engines": { + "node": ">=12.22.0" + }, + "funding": { + "type": "opencollective", + "url": "https://opencollective.com/ioredis" + } + }, "node_modules/ip-regex": { "version": "4.3.0", "resolved": "https://registry.npmjs.org/ip-regex/-/ip-regex-4.3.0.tgz", @@ -4323,241 +4372,6 @@ "node": ">= 0.10" } }, - "node_modules/ipfs-car": { - "version": "0.7.0", - "resolved": "https://registry.npmjs.org/ipfs-car/-/ipfs-car-0.7.0.tgz", - "integrity": "sha512-9ser6WWZ1ZMTCGbcVkRXUzOrpQ4SIiLfzIEnk+3LQsXbV09yeZg3ijhRuEXozEIYE68Go9JmOFshamsK9iKlNQ==", - "license": "(Apache-2.0 AND MIT)", - "dependencies": { - "@ipld/car": "^3.2.3", - "@web-std/blob": "^3.0.1", - "bl": "^5.0.0", - "blockstore-core": "^1.0.2", - "browser-readablestream-to-it": "^1.0.2", - "idb-keyval": "^6.0.3", - "interface-blockstore": "^2.0.2", - "ipfs-core-types": "^0.8.3", - "ipfs-core-utils": "^0.12.1", - "ipfs-unixfs-exporter": "^7.0.4", - "ipfs-unixfs-importer": "^9.0.4", - "ipfs-utils": "^9.0.2", - "it-all": "^1.0.5", - "it-last": "^1.0.5", - "it-pipe": "^1.1.0", - "meow": "^9.0.0", - "move-file": "^2.1.0", - "multiformats": "^9.6.3", - "stream-to-it": "^0.2.3", - "streaming-iterables": "^6.0.0", - "uint8arrays": "^3.0.0" - }, - "bin": { - "🚘": "dist/cjs/cli/cli.js", - "ipfs-car": "dist/cjs/cli/cli.js" - } - }, - "node_modules/ipfs-car/node_modules/any-signal": { - "version": "2.1.2", - "resolved": "https://registry.npmjs.org/any-signal/-/any-signal-2.1.2.tgz", - "integrity": "sha512-B+rDnWasMi/eWcajPcCWSlYc7muXOrcYrqgyzcdKisl2H/WTlQ0gip1KyQfr0ZlxJdsuWCj/LWwQm7fhyhRfIQ==", - "license": "MIT", - "dependencies": { - "abort-controller": "^3.0.0", - "native-abort-controller": "^1.0.3" - } - }, - "node_modules/ipfs-car/node_modules/blob-to-it": { - "version": "1.0.4", - "resolved": "https://registry.npmjs.org/blob-to-it/-/blob-to-it-1.0.4.tgz", - "integrity": "sha512-iCmk0W4NdbrWgRRuxOriU8aM5ijeVLI61Zulsmg/lUHNr7pYjoj+U77opLefNagevtrrbMt3JQ5Qip7ar178kA==", - "license": "ISC", - "dependencies": { - "browser-readablestream-to-it": "^1.0.3" - } - }, - "node_modules/ipfs-car/node_modules/browser-readablestream-to-it": { - "version": "1.0.3", - "resolved": "https://registry.npmjs.org/browser-readablestream-to-it/-/browser-readablestream-to-it-1.0.3.tgz", - "integrity": "sha512-+12sHB+Br8HIh6VAMVEG5r3UXCyESIgDW7kzk3BjIXa43DVqVwL7GC5TW3jeh+72dtcH99pPVpw0X8i0jt+/kw==", - "license": "ISC" - }, - "node_modules/ipfs-car/node_modules/interface-datastore": { - "version": "6.1.1", - "resolved": "https://registry.npmjs.org/interface-datastore/-/interface-datastore-6.1.1.tgz", - "integrity": "sha512-AmCS+9CT34pp2u0QQVXjKztkuq3y5T+BIciuiHDDtDZucZD8VudosnSdUyXJV6IsRkN5jc4RFDhCk1O6Q3Gxjg==", - "license": "MIT", - "dependencies": { - "interface-store": "^2.0.2", - "nanoid": "^3.0.2", - "uint8arrays": "^3.0.0" - } - }, - "node_modules/ipfs-car/node_modules/interface-store": { - "version": "2.0.2", - "resolved": "https://registry.npmjs.org/interface-store/-/interface-store-2.0.2.tgz", - "integrity": "sha512-rScRlhDcz6k199EkHqT8NpM87ebN89ICOzILoBHgaG36/WX50N32BnU/kpZgCGPLhARRAWUUX5/cyaIjt7Kipg==", - "license": "(Apache-2.0 OR MIT)" - }, - "node_modules/ipfs-car/node_modules/ipfs-core-types": { - "version": "0.8.4", - "resolved": "https://registry.npmjs.org/ipfs-core-types/-/ipfs-core-types-0.8.4.tgz", - "integrity": "sha512-sbRZA1QX3xJ6ywTiVQZMOxhlhp4osAZX2SXx3azOLxAtxmGWDMkHYt722VV4nZ2GyJy8qyk5GHQIZ0uvQnpaTg==", - "deprecated": "js-IPFS has been deprecated in favour of Helia - please see https://github.com/ipfs/js-ipfs/issues/4336 for details", - "license": "(Apache-2.0 OR MIT)", - "dependencies": { - "interface-datastore": "^6.0.2", - "multiaddr": "^10.0.0", - "multiformats": "^9.4.13" - } - }, - "node_modules/ipfs-car/node_modules/ipfs-core-utils": { - "version": "0.12.2", - "resolved": "https://registry.npmjs.org/ipfs-core-utils/-/ipfs-core-utils-0.12.2.tgz", - "integrity": "sha512-RfxP3rPhXuqKIUmTAUhmee6fmaV3A7LMnjOUikRKpSyqESz/DR7aGK7tbttMxkZdkSEr0rFXlqbyb0vVwmn0wQ==", - "deprecated": "js-IPFS has been deprecated in favour of Helia - please see https://github.com/ipfs/js-ipfs/issues/4336 for details", - "license": "MIT", - "dependencies": { - "any-signal": "^2.1.2", - "blob-to-it": "^1.0.1", - "browser-readablestream-to-it": "^1.0.1", - "debug": "^4.1.1", - "err-code": "^3.0.1", - "ipfs-core-types": "^0.8.4", - "ipfs-unixfs": "^6.0.3", - "ipfs-utils": "^9.0.2", - "it-all": "^1.0.4", - "it-map": "^1.0.4", - "it-peekable": "^1.0.2", - "it-to-stream": "^1.0.0", - "merge-options": "^3.0.4", - "multiaddr": "^10.0.0", - "multiaddr-to-uri": "^8.0.0", - "multiformats": "^9.4.13", - "nanoid": "^3.1.23", - "parse-duration": "^1.0.0", - "timeout-abort-controller": "^1.1.1", - "uint8arrays": "^3.0.0" - } - }, - "node_modules/ipfs-car/node_modules/ipfs-unixfs": { - "version": "6.0.9", - "resolved": "https://registry.npmjs.org/ipfs-unixfs/-/ipfs-unixfs-6.0.9.tgz", - "integrity": "sha512-0DQ7p0/9dRB6XCb0mVCTli33GzIzSVx5udpJuVM47tGcD+W+Bl4LsnoLswd3ggNnNEakMv1FdoFITiEnchXDqQ==", - "license": "Apache-2.0 OR MIT", - "dependencies": { - "err-code": "^3.0.1", - "protobufjs": "^6.10.2" - }, - "engines": { - "node": ">=16.0.0", - "npm": ">=7.0.0" - } - }, - "node_modules/ipfs-car/node_modules/it-all": { - "version": "1.0.6", - "resolved": "https://registry.npmjs.org/it-all/-/it-all-1.0.6.tgz", - "integrity": "sha512-3cmCc6Heqe3uWi3CVM/k51fa/XbMFpQVzFoDsV0IZNHSQDyAXl3c4MjHkFX5kF3922OGj7Myv1nSEUgRtcuM1A==", - "license": "ISC" - }, - "node_modules/ipfs-car/node_modules/it-last": { - "version": "1.0.6", - "resolved": "https://registry.npmjs.org/it-last/-/it-last-1.0.6.tgz", - "integrity": "sha512-aFGeibeiX/lM4bX3JY0OkVCFkAw8+n9lkukkLNivbJRvNz8lI3YXv5xcqhFUV2lDJiraEK3OXRDbGuevnnR67Q==", - "license": "ISC" - }, - "node_modules/ipfs-car/node_modules/it-map": { - "version": "1.0.6", - "resolved": "https://registry.npmjs.org/it-map/-/it-map-1.0.6.tgz", - "integrity": "sha512-XT4/RM6UHIFG9IobGlQPFQUrlEKkU4eBUFG3qhWhfAdh1JfF2x11ShCrKCdmZ0OiZppPfoLuzcfA4cey6q3UAQ==", - "license": "ISC" - }, - "node_modules/ipfs-car/node_modules/it-peekable": { - "version": "1.0.3", - "resolved": "https://registry.npmjs.org/it-peekable/-/it-peekable-1.0.3.tgz", - "integrity": "sha512-5+8zemFS+wSfIkSZyf0Zh5kNN+iGyccN02914BY4w/Dj+uoFEoPSvj5vaWn8pNZJNSxzjW0zHRxC3LUb2KWJTQ==", - "license": "ISC" - }, - "node_modules/ipfs-car/node_modules/long": { - "version": "4.0.0", - "resolved": "https://registry.npmjs.org/long/-/long-4.0.0.tgz", - "integrity": "sha512-XsP+KhQif4bjX1kbuSiySJFNAehNxgLb6hPRGJ9QsUr8ajHkuXGdrHmFUTUUXhDwVX2R5bY4JNZEwbUiMhV+MA==", - "license": "Apache-2.0" - }, - "node_modules/ipfs-car/node_modules/multiformats": { - "version": "9.9.0", - "resolved": "https://registry.npmjs.org/multiformats/-/multiformats-9.9.0.tgz", - "integrity": "sha512-HoMUjhH9T8DDBNT+6xzkrd9ga/XiBI4xLr58LJACwK6G3HTOPeMz4nB4KJs33L2BelrIJa7P0VuNaVF3hMYfjg==", - "license": "(Apache-2.0 AND MIT)" - }, - "node_modules/ipfs-car/node_modules/nanoid": { - "version": "3.3.11", - "resolved": "https://registry.npmjs.org/nanoid/-/nanoid-3.3.11.tgz", - "integrity": "sha512-N8SpfPUnUp1bK+PMYW8qSWdl9U+wwNWI4QKxOYDy9JAro3WMX7p2OeVRF9v+347pnakNevPmiHhNmZ2HbFA76w==", - "funding": [ - { - "type": "github", - "url": "https://github.com/sponsors/ai" - } - ], - "license": "MIT", - "bin": { - "nanoid": "bin/nanoid.cjs" - }, - "engines": { - "node": "^10 || ^12 || ^13.7 || ^14 || >=15.0.1" - } - }, - "node_modules/ipfs-car/node_modules/protobufjs": { - "version": "6.11.4", - "resolved": "https://registry.npmjs.org/protobufjs/-/protobufjs-6.11.4.tgz", - "integrity": "sha512-5kQWPaJHi1WoCpjTGszzQ32PG2F4+wRY6BmAT4Vfw56Q2FZ4YZzK20xUYQH4YkfehY1e6QSICrJquM6xXZNcrw==", - "hasInstallScript": true, - "license": "BSD-3-Clause", - "dependencies": { - "@protobufjs/aspromise": "^1.1.2", - "@protobufjs/base64": "^1.1.2", - "@protobufjs/codegen": "^2.0.4", - "@protobufjs/eventemitter": "^1.1.0", - "@protobufjs/fetch": "^1.1.0", - "@protobufjs/float": "^1.0.2", - "@protobufjs/inquire": "^1.1.0", - "@protobufjs/path": "^1.1.2", - "@protobufjs/pool": "^1.1.0", - "@protobufjs/utf8": "^1.1.0", - "@types/long": "^4.0.1", - "@types/node": ">=13.7.0", - "long": "^4.0.0" - }, - "bin": { - "pbjs": "bin/pbjs", - "pbts": "bin/pbts" - } - }, - "node_modules/ipfs-car/node_modules/retimer": { - "version": "2.0.0", - "resolved": "https://registry.npmjs.org/retimer/-/retimer-2.0.0.tgz", - "integrity": "sha512-KLXY85WkEq2V2bKex/LOO1ViXVn2KGYe4PYysAdYdjmraYIUsVkXu8O4am+8+5UbaaGl1qho4aqAAPHNQ4GSbg==", - "license": "MIT" - }, - "node_modules/ipfs-car/node_modules/timeout-abort-controller": { - "version": "1.1.1", - "resolved": "https://registry.npmjs.org/timeout-abort-controller/-/timeout-abort-controller-1.1.1.tgz", - "integrity": "sha512-BsF9i3NAJag6T0ZEjki9j654zoafI2X6ayuNd6Tp8+Ul6Tr5s4jo973qFeiWrRSweqvskC+AHDKUmIW4b7pdhQ==", - "license": "MIT", - "dependencies": { - "abort-controller": "^3.0.0", - "retimer": "^2.0.0" - } - }, - "node_modules/ipfs-car/node_modules/uint8arrays": { - "version": "3.1.1", - "resolved": "https://registry.npmjs.org/uint8arrays/-/uint8arrays-3.1.1.tgz", - "integrity": "sha512-+QJa8QRnbdXVpHYjLoTpJIdCTiw9Ir62nocClWuXIq2JIh4Uta0cQsTSpFL678p2CN8B+XSApwcU+pQEqVpKWg==", - "license": "MIT", - "dependencies": { - "multiformats": "^9.4.2" - } - }, "node_modules/ipfs-core-types": { "version": "0.14.1", "resolved": "https://registry.npmjs.org/ipfs-core-types/-/ipfs-core-types-0.14.1.tgz", @@ -6380,12 +6194,24 @@ "node": ">=8" } }, + "node_modules/lodash.defaults": { + "version": "4.2.0", + "resolved": "https://registry.npmjs.org/lodash.defaults/-/lodash.defaults-4.2.0.tgz", + "integrity": "sha512-qjxPLHd3r5DnsdGacqOMU6pb/avJzdh9tFX2ymgoZE27BmjXrNy/y4LoaiTeAb+O3gL8AfpJGtqfX/ae2leYYQ==", + "license": "MIT" + }, "node_modules/lodash.includes": { "version": "4.3.0", "resolved": "https://registry.npmjs.org/lodash.includes/-/lodash.includes-4.3.0.tgz", "integrity": "sha512-W3Bx6mdkRTGtlJISOvVD/lbqjTlPPUDTMnlXZFnVwi9NKJ6tiAk6LVdlhZMm17VZisqhKcgzpO5Wz91PCt5b0w==", "license": "MIT" }, + "node_modules/lodash.isarguments": { + "version": "3.1.0", + "resolved": "https://registry.npmjs.org/lodash.isarguments/-/lodash.isarguments-3.1.0.tgz", + "integrity": "sha512-chi4NHZlZqZD18a0imDHnZPrDeBbTtVN7GXMwuGdRH9qotxAjYs3aVLKc7zNOG9eddR5Ksd8rvFEBc9SsggPpg==", + "license": "MIT" + }, "node_modules/lodash.isboolean": { "version": "3.0.3", "resolved": "https://registry.npmjs.org/lodash.isboolean/-/lodash.isboolean-3.0.3.tgz", @@ -7788,6 +7614,27 @@ "node": ">=8" } }, + "node_modules/redis-errors": { + "version": "1.2.0", + "resolved": "https://registry.npmjs.org/redis-errors/-/redis-errors-1.2.0.tgz", + "integrity": "sha512-1qny3OExCf0UvUV/5wpYKf2YwPcOqXzkwKKSmKHiE6ZMQs5heeE/c8eXK+PNllPvmjgAbfnsbpkGZWy8cBpn9w==", + "license": "MIT", + "engines": { + "node": ">=4" + } + }, + "node_modules/redis-parser": { + "version": "3.0.0", + "resolved": "https://registry.npmjs.org/redis-parser/-/redis-parser-3.0.0.tgz", + "integrity": "sha512-DJnGAeenTdpMEH6uAJRK/uiyEIH9WVsUmoLwzudwGJUwZPp80PDBWPHXSAGNPwNvIXAbe7MSUB1zQFugFml66A==", + "license": "MIT", + "dependencies": { + "redis-errors": "^1.0.0" + }, + "engines": { + "node": ">=4" + } + }, "node_modules/require-addon": { "version": "1.2.0", "resolved": "https://registry.npmjs.org/require-addon/-/require-addon-1.2.0.tgz", @@ -8252,6 +8099,12 @@ "node": ">=10" } }, + "node_modules/standard-as-callback": { + "version": "2.1.0", + "resolved": "https://registry.npmjs.org/standard-as-callback/-/standard-as-callback-2.1.0.tgz", + "integrity": "sha512-qoRRSyROncaz1z0mvYqIE4lCd9p2R90i6GxW3uZv5ucSu8tU7B5HXUP1gG8pVZsYNVaXjk8ClXHPttLyxAL48A==", + "license": "MIT" + }, "node_modules/statuses": { "version": "2.0.2", "license": "MIT", @@ -8977,6 +8830,34 @@ "w3name": "^1.0.6" } }, + "node_modules/web3.storage/node_modules/@ipld/dag-pb": { + "version": "2.1.18", + "resolved": "https://registry.npmjs.org/@ipld/dag-pb/-/dag-pb-2.1.18.tgz", + "integrity": "sha512-ZBnf2fuX9y3KccADURG5vb9FaOeMjFkCrNysB0PtftME/4iCTjxfaLoNq/IAh5fTqUOMXvryN6Jyka4ZGuMLIg==", + "license": "(Apache-2.0 AND MIT)", + "dependencies": { + "multiformats": "^9.5.4" + } + }, + "node_modules/web3.storage/node_modules/any-signal": { + "version": "2.1.2", + "resolved": "https://registry.npmjs.org/any-signal/-/any-signal-2.1.2.tgz", + "integrity": "sha512-B+rDnWasMi/eWcajPcCWSlYc7muXOrcYrqgyzcdKisl2H/WTlQ0gip1KyQfr0ZlxJdsuWCj/LWwQm7fhyhRfIQ==", + "license": "MIT", + "dependencies": { + "abort-controller": "^3.0.0", + "native-abort-controller": "^1.0.3" + } + }, + "node_modules/web3.storage/node_modules/blob-to-it": { + "version": "1.0.4", + "resolved": "https://registry.npmjs.org/blob-to-it/-/blob-to-it-1.0.4.tgz", + "integrity": "sha512-iCmk0W4NdbrWgRRuxOriU8aM5ijeVLI61Zulsmg/lUHNr7pYjoj+U77opLefNagevtrrbMt3JQ5Qip7ar178kA==", + "license": "ISC", + "dependencies": { + "browser-readablestream-to-it": "^1.0.3" + } + }, "node_modules/web3.storage/node_modules/browser-readablestream-to-it": { "version": "1.0.3", "resolved": "https://registry.npmjs.org/browser-readablestream-to-it/-/browser-readablestream-to-it-1.0.3.tgz", @@ -8992,12 +8873,240 @@ "cborg": "cli.js" } }, + "node_modules/web3.storage/node_modules/interface-datastore": { + "version": "6.1.1", + "resolved": "https://registry.npmjs.org/interface-datastore/-/interface-datastore-6.1.1.tgz", + "integrity": "sha512-AmCS+9CT34pp2u0QQVXjKztkuq3y5T+BIciuiHDDtDZucZD8VudosnSdUyXJV6IsRkN5jc4RFDhCk1O6Q3Gxjg==", + "license": "MIT", + "dependencies": { + "interface-store": "^2.0.2", + "nanoid": "^3.0.2", + "uint8arrays": "^3.0.0" + } + }, + "node_modules/web3.storage/node_modules/interface-store": { + "version": "2.0.2", + "resolved": "https://registry.npmjs.org/interface-store/-/interface-store-2.0.2.tgz", + "integrity": "sha512-rScRlhDcz6k199EkHqT8NpM87ebN89ICOzILoBHgaG36/WX50N32BnU/kpZgCGPLhARRAWUUX5/cyaIjt7Kipg==", + "license": "(Apache-2.0 OR MIT)" + }, + "node_modules/web3.storage/node_modules/ipfs-car": { + "version": "0.7.0", + "resolved": "https://registry.npmjs.org/ipfs-car/-/ipfs-car-0.7.0.tgz", + "integrity": "sha512-9ser6WWZ1ZMTCGbcVkRXUzOrpQ4SIiLfzIEnk+3LQsXbV09yeZg3ijhRuEXozEIYE68Go9JmOFshamsK9iKlNQ==", + "license": "(Apache-2.0 AND MIT)", + "dependencies": { + "@ipld/car": "^3.2.3", + "@web-std/blob": "^3.0.1", + "bl": "^5.0.0", + "blockstore-core": "^1.0.2", + "browser-readablestream-to-it": "^1.0.2", + "idb-keyval": "^6.0.3", + "interface-blockstore": "^2.0.2", + "ipfs-core-types": "^0.8.3", + "ipfs-core-utils": "^0.12.1", + "ipfs-unixfs-exporter": "^7.0.4", + "ipfs-unixfs-importer": "^9.0.4", + "ipfs-utils": "^9.0.2", + "it-all": "^1.0.5", + "it-last": "^1.0.5", + "it-pipe": "^1.1.0", + "meow": "^9.0.0", + "move-file": "^2.1.0", + "multiformats": "^9.6.3", + "stream-to-it": "^0.2.3", + "streaming-iterables": "^6.0.0", + "uint8arrays": "^3.0.0" + }, + "bin": { + "🚘": "dist/cjs/cli/cli.js", + "ipfs-car": "dist/cjs/cli/cli.js" + } + }, + "node_modules/web3.storage/node_modules/ipfs-core-types": { + "version": "0.8.4", + "resolved": "https://registry.npmjs.org/ipfs-core-types/-/ipfs-core-types-0.8.4.tgz", + "integrity": "sha512-sbRZA1QX3xJ6ywTiVQZMOxhlhp4osAZX2SXx3azOLxAtxmGWDMkHYt722VV4nZ2GyJy8qyk5GHQIZ0uvQnpaTg==", + "deprecated": "js-IPFS has been deprecated in favour of Helia - please see https://github.com/ipfs/js-ipfs/issues/4336 for details", + "license": "(Apache-2.0 OR MIT)", + "dependencies": { + "interface-datastore": "^6.0.2", + "multiaddr": "^10.0.0", + "multiformats": "^9.4.13" + } + }, + "node_modules/web3.storage/node_modules/ipfs-core-utils": { + "version": "0.12.2", + "resolved": "https://registry.npmjs.org/ipfs-core-utils/-/ipfs-core-utils-0.12.2.tgz", + "integrity": "sha512-RfxP3rPhXuqKIUmTAUhmee6fmaV3A7LMnjOUikRKpSyqESz/DR7aGK7tbttMxkZdkSEr0rFXlqbyb0vVwmn0wQ==", + "deprecated": "js-IPFS has been deprecated in favour of Helia - please see https://github.com/ipfs/js-ipfs/issues/4336 for details", + "license": "MIT", + "dependencies": { + "any-signal": "^2.1.2", + "blob-to-it": "^1.0.1", + "browser-readablestream-to-it": "^1.0.1", + "debug": "^4.1.1", + "err-code": "^3.0.1", + "ipfs-core-types": "^0.8.4", + "ipfs-unixfs": "^6.0.3", + "ipfs-utils": "^9.0.2", + "it-all": "^1.0.4", + "it-map": "^1.0.4", + "it-peekable": "^1.0.2", + "it-to-stream": "^1.0.0", + "merge-options": "^3.0.4", + "multiaddr": "^10.0.0", + "multiaddr-to-uri": "^8.0.0", + "multiformats": "^9.4.13", + "nanoid": "^3.1.23", + "parse-duration": "^1.0.0", + "timeout-abort-controller": "^1.1.1", + "uint8arrays": "^3.0.0" + } + }, + "node_modules/web3.storage/node_modules/ipfs-unixfs": { + "version": "6.0.9", + "resolved": "https://registry.npmjs.org/ipfs-unixfs/-/ipfs-unixfs-6.0.9.tgz", + "integrity": "sha512-0DQ7p0/9dRB6XCb0mVCTli33GzIzSVx5udpJuVM47tGcD+W+Bl4LsnoLswd3ggNnNEakMv1FdoFITiEnchXDqQ==", + "license": "Apache-2.0 OR MIT", + "dependencies": { + "err-code": "^3.0.1", + "protobufjs": "^6.10.2" + }, + "engines": { + "node": ">=16.0.0", + "npm": ">=7.0.0" + } + }, + "node_modules/web3.storage/node_modules/ipfs-unixfs-importer": { + "version": "9.0.10", + "resolved": "https://registry.npmjs.org/ipfs-unixfs-importer/-/ipfs-unixfs-importer-9.0.10.tgz", + "integrity": "sha512-W+tQTVcSmXtFh7FWYWwPBGXJ1xDgREbIyI1E5JzDcimZLIyT5gGMfxR3oKPxxWj+GKMpP5ilvMQrbsPzWcm3Fw==", + "license": "Apache-2.0 OR MIT", + "dependencies": { + "@ipld/dag-pb": "^2.0.2", + "@multiformats/murmur3": "^1.0.3", + "bl": "^5.0.0", + "err-code": "^3.0.1", + "hamt-sharding": "^2.0.0", + "interface-blockstore": "^2.0.3", + "ipfs-unixfs": "^6.0.0", + "it-all": "^1.0.5", + "it-batch": "^1.0.8", + "it-first": "^1.0.6", + "it-parallel-batch": "^1.0.9", + "merge-options": "^3.0.4", + "multiformats": "^9.4.2", + "rabin-wasm": "^0.1.4", + "uint8arrays": "^3.0.0" + }, + "engines": { + "node": ">=16.0.0", + "npm": ">=7.0.0" + } + }, + "node_modules/web3.storage/node_modules/it-all": { + "version": "1.0.6", + "resolved": "https://registry.npmjs.org/it-all/-/it-all-1.0.6.tgz", + "integrity": "sha512-3cmCc6Heqe3uWi3CVM/k51fa/XbMFpQVzFoDsV0IZNHSQDyAXl3c4MjHkFX5kF3922OGj7Myv1nSEUgRtcuM1A==", + "license": "ISC" + }, + "node_modules/web3.storage/node_modules/it-first": { + "version": "1.0.7", + "resolved": "https://registry.npmjs.org/it-first/-/it-first-1.0.7.tgz", + "integrity": "sha512-nvJKZoBpZD/6Rtde6FXqwDqDZGF1sCADmr2Zoc0hZsIvnE449gRFnGctxDf09Bzc/FWnHXAdaHVIetY6lrE0/g==", + "license": "ISC" + }, + "node_modules/web3.storage/node_modules/it-last": { + "version": "1.0.6", + "resolved": "https://registry.npmjs.org/it-last/-/it-last-1.0.6.tgz", + "integrity": "sha512-aFGeibeiX/lM4bX3JY0OkVCFkAw8+n9lkukkLNivbJRvNz8lI3YXv5xcqhFUV2lDJiraEK3OXRDbGuevnnR67Q==", + "license": "ISC" + }, + "node_modules/web3.storage/node_modules/it-map": { + "version": "1.0.6", + "resolved": "https://registry.npmjs.org/it-map/-/it-map-1.0.6.tgz", + "integrity": "sha512-XT4/RM6UHIFG9IobGlQPFQUrlEKkU4eBUFG3qhWhfAdh1JfF2x11ShCrKCdmZ0OiZppPfoLuzcfA4cey6q3UAQ==", + "license": "ISC" + }, + "node_modules/web3.storage/node_modules/it-peekable": { + "version": "1.0.3", + "resolved": "https://registry.npmjs.org/it-peekable/-/it-peekable-1.0.3.tgz", + "integrity": "sha512-5+8zemFS+wSfIkSZyf0Zh5kNN+iGyccN02914BY4w/Dj+uoFEoPSvj5vaWn8pNZJNSxzjW0zHRxC3LUb2KWJTQ==", + "license": "ISC" + }, + "node_modules/web3.storage/node_modules/long": { + "version": "4.0.0", + "resolved": "https://registry.npmjs.org/long/-/long-4.0.0.tgz", + "integrity": "sha512-XsP+KhQif4bjX1kbuSiySJFNAehNxgLb6hPRGJ9QsUr8ajHkuXGdrHmFUTUUXhDwVX2R5bY4JNZEwbUiMhV+MA==", + "license": "Apache-2.0" + }, "node_modules/web3.storage/node_modules/multiformats": { "version": "9.9.0", "resolved": "https://registry.npmjs.org/multiformats/-/multiformats-9.9.0.tgz", "integrity": "sha512-HoMUjhH9T8DDBNT+6xzkrd9ga/XiBI4xLr58LJACwK6G3HTOPeMz4nB4KJs33L2BelrIJa7P0VuNaVF3hMYfjg==", "license": "(Apache-2.0 AND MIT)" }, + "node_modules/web3.storage/node_modules/nanoid": { + "version": "3.3.11", + "resolved": "https://registry.npmjs.org/nanoid/-/nanoid-3.3.11.tgz", + "integrity": "sha512-N8SpfPUnUp1bK+PMYW8qSWdl9U+wwNWI4QKxOYDy9JAro3WMX7p2OeVRF9v+347pnakNevPmiHhNmZ2HbFA76w==", + "funding": [ + { + "type": "github", + "url": "https://github.com/sponsors/ai" + } + ], + "license": "MIT", + "bin": { + "nanoid": "bin/nanoid.cjs" + }, + "engines": { + "node": "^10 || ^12 || ^13.7 || ^14 || >=15.0.1" + } + }, + "node_modules/web3.storage/node_modules/protobufjs": { + "version": "6.11.4", + "resolved": "https://registry.npmjs.org/protobufjs/-/protobufjs-6.11.4.tgz", + "integrity": "sha512-5kQWPaJHi1WoCpjTGszzQ32PG2F4+wRY6BmAT4Vfw56Q2FZ4YZzK20xUYQH4YkfehY1e6QSICrJquM6xXZNcrw==", + "hasInstallScript": true, + "license": "BSD-3-Clause", + "dependencies": { + "@protobufjs/aspromise": "^1.1.2", + "@protobufjs/base64": "^1.1.2", + "@protobufjs/codegen": "^2.0.4", + "@protobufjs/eventemitter": "^1.1.0", + "@protobufjs/fetch": "^1.1.0", + "@protobufjs/float": "^1.0.2", + "@protobufjs/inquire": "^1.1.0", + "@protobufjs/path": "^1.1.2", + "@protobufjs/pool": "^1.1.0", + "@protobufjs/utf8": "^1.1.0", + "@types/long": "^4.0.1", + "@types/node": ">=13.7.0", + "long": "^4.0.0" + }, + "bin": { + "pbjs": "bin/pbjs", + "pbts": "bin/pbts" + } + }, + "node_modules/web3.storage/node_modules/retimer": { + "version": "2.0.0", + "resolved": "https://registry.npmjs.org/retimer/-/retimer-2.0.0.tgz", + "integrity": "sha512-KLXY85WkEq2V2bKex/LOO1ViXVn2KGYe4PYysAdYdjmraYIUsVkXu8O4am+8+5UbaaGl1qho4aqAAPHNQ4GSbg==", + "license": "MIT" + }, + "node_modules/web3.storage/node_modules/timeout-abort-controller": { + "version": "1.1.1", + "resolved": "https://registry.npmjs.org/timeout-abort-controller/-/timeout-abort-controller-1.1.1.tgz", + "integrity": "sha512-BsF9i3NAJag6T0ZEjki9j654zoafI2X6ayuNd6Tp8+Ul6Tr5s4jo973qFeiWrRSweqvskC+AHDKUmIW4b7pdhQ==", + "license": "MIT", + "dependencies": { + "abort-controller": "^3.0.0", + "retimer": "^2.0.0" + } + }, "node_modules/web3.storage/node_modules/uint8arrays": { "version": "3.1.1", "resolved": "https://registry.npmjs.org/uint8arrays/-/uint8arrays-3.1.1.tgz", diff --git a/package.json b/package.json index 40c3a54..b564e54 100644 --- a/package.json +++ b/package.json @@ -19,6 +19,7 @@ "express": "^5.2.1", "fluent-ffmpeg": "^2.1.2", "form-data": "^4.0.0", + "ioredis": "^5.6.1", "ipfs-http-client": "^60.0.1", "jsonwebtoken": "^9.0.2", "multer": "^1.4.5-lts.1", diff --git a/rateLimiter.test.js b/rateLimiter.test.js new file mode 100644 index 0000000..24f2ff4 --- /dev/null +++ b/rateLimiter.test.js @@ -0,0 +1,443 @@ +/** + * Tests for the Leaky Bucket rate limiter, Sybil analysis, and middleware. + * + * Uses a simple in-memory Redis mock so the suite runs without a real Redis + * instance. + */ + +const { + LeakyBucketRateLimiter, +} = require("./src/services/leakyBucketRateLimiter"); +const { SybilAnalysisService } = require("./src/services/sybilAnalysisService"); +const { + createRateLimiter, + extractWallet, +} = require("./middleware/rateLimiter"); + +// --------------------------------------------------------------------------- +// Minimal Redis mock (supports only the commands the services actually use) +// --------------------------------------------------------------------------- +class RedisMock { + constructor() { + this.store = new Map(); // key -> string value + this.hashes = new Map(); // key -> Map(field -> value) + this.sortedSets = new Map(); // key -> Map(member -> score) + this.ttls = new Map(); // key -> expiry timestamp + } + + // --- strings --- + async get(key) { + this._evict(key); + return this.store.get(key) ?? null; + } + async set(key, value, ...args) { + this.store.set(key, String(value)); + if (args[0] === "EX" && typeof args[1] === "number") { + this.ttls.set(key, Date.now() + args[1] * 1000); + } + return "OK"; + } + async incr(key) { + const cur = parseInt(this.store.get(key) || "0", 10); + const next = cur + 1; + this.store.set(key, String(next)); + return next; + } + async del(...keys) { + let count = 0; + for (const k of keys) { + if (this.store.delete(k)) count++; + if (this.hashes.delete(k)) count++; + if (this.sortedSets.delete(k)) count++; + this.ttls.delete(k); + } + return count; + } + async ttl(key) { + const ex = this.ttls.get(key); + if (!ex) return -1; + const remaining = Math.ceil((ex - Date.now()) / 1000); + return remaining > 0 ? remaining : -2; + } + async expire(key, seconds) { + this.ttls.set(key, Date.now() + seconds * 1000); + return 1; + } + + // --- hashes --- + async hmset(key, obj) { + if (!this.hashes.has(key)) this.hashes.set(key, new Map()); + const h = this.hashes.get(key); + for (const [f, v] of Object.entries(obj)) h.set(f, String(v)); + return "OK"; + } + async hmget(key, ...fields) { + const h = this.hashes.get(key); + return fields.map((f) => (h ? (h.get(f) ?? null) : null)); + } + async hgetall(key) { + const h = this.hashes.get(key); + if (!h) return {}; + const result = {}; + for (const [f, v] of h) result[f] = v; + return result; + } + + // --- sorted sets --- + async zadd(key, score, member) { + if (!this.sortedSets.has(key)) this.sortedSets.set(key, new Map()); + this.sortedSets.get(key).set(member, score); + return 1; + } + async zscore(key, member) { + const ss = this.sortedSets.get(key); + if (!ss) return null; + const s = ss.get(member); + return s !== undefined ? String(s) : null; + } + async zrevrange(key, start, stop, withScores) { + const ss = this.sortedSets.get(key); + if (!ss) return []; + const entries = [...ss.entries()] + .sort((a, b) => b[1] - a[1]) + .slice(start, stop + 1); + const result = []; + for (const [member, score] of entries) { + result.push(member); + if (withScores === "WITHSCORES") result.push(String(score)); + } + return result; + } + async zrem(key, member) { + const ss = this.sortedSets.get(key); + if (!ss) return 0; + return ss.delete(member) ? 1 : 0; + } + + // --- eval (Lua emulation) --- + async eval(script, numkeys, key, capacity, leakRate, now) { + capacity = Number(capacity); + leakRate = Number(leakRate); + now = Number(now); + + const [levelStr, lastDripStr] = await this.hmget(key, "level", "lastDrip"); + let level = parseFloat(levelStr) || 0; + const lastDrip = parseFloat(lastDripStr) || now; + + const elapsed = now - lastDrip; + const leaked = elapsed * leakRate; + level = Math.max(0, level - leaked); + + if (level + 1 > capacity) { + await this.hmset(key, { level: String(level), lastDrip: String(now) }); + return [0, String(level), String(capacity)]; + } + + level = level + 1; + await this.hmset(key, { level: String(level), lastDrip: String(now) }); + return [1, String(level), String(capacity)]; + } + + // TTL eviction helper + _evict(key) { + const ex = this.ttls.get(key); + if (ex && Date.now() > ex) { + this.store.delete(key); + this.hashes.delete(key); + this.ttls.delete(key); + } + } +} + +// --------------------------------------------------------------------------- +// LeakyBucketRateLimiter +// --------------------------------------------------------------------------- +describe("LeakyBucketRateLimiter", () => { + let redis; + let limiter; + + beforeEach(() => { + redis = new RedisMock(); + limiter = new LeakyBucketRateLimiter(redis, { + bucketCapacity: 5, + leakRatePerSecond: 1, + blockDurationSeconds: 10, + sybilThreshold: 2, + }); + }); + + it("allows requests within capacity", async () => { + const result = await limiter.consume("WALLET_A"); + expect(result.allowed).toBe(true); + expect(result.blocked).toBe(false); + expect(result.currentLevel).toBeGreaterThan(0); + }); + + it("rejects and blocks when bucket overflows", async () => { + // Fill bucket to capacity. + for (let i = 0; i < 5; i++) { + const r = await limiter.consume("WALLET_B"); + expect(r.allowed).toBe(true); + } + + // Next request should be rejected. + const rejected = await limiter.consume("WALLET_B"); + expect(rejected.allowed).toBe(false); + expect(rejected.blocked).toBe(true); + expect(rejected.retryAfterSeconds).toBe(10); + }); + + it("returns blocked status on subsequent attempts while blocked", async () => { + for (let i = 0; i < 5; i++) await limiter.consume("WALLET_C"); + await limiter.consume("WALLET_C"); // triggers block + + const attempt = await limiter.consume("WALLET_C"); + expect(attempt.allowed).toBe(false); + expect(attempt.blocked).toBe(true); + }); + + it("tracks violations", async () => { + for (let i = 0; i < 5; i++) await limiter.consume("WALLET_D"); + const overflow = await limiter.consume("WALLET_D"); + expect(overflow.violations).toBe(1); + + const count = await limiter.getViolationCount("WALLET_D"); + expect(count).toBe(1); + }); + + it("isBlocked returns correct state", async () => { + expect(await limiter.isBlocked("WALLET_E")).toBe(false); + for (let i = 0; i < 5; i++) await limiter.consume("WALLET_E"); + await limiter.consume("WALLET_E"); + expect(await limiter.isBlocked("WALLET_E")).toBe(true); + }); + + it("unblock clears the block", async () => { + for (let i = 0; i < 5; i++) await limiter.consume("WALLET_F"); + await limiter.consume("WALLET_F"); + expect(await limiter.isBlocked("WALLET_F")).toBe(true); + + await limiter.unblock("WALLET_F"); + expect(await limiter.isBlocked("WALLET_F")).toBe(false); + }); + + it("reset clears all state", async () => { + for (let i = 0; i < 5; i++) await limiter.consume("WALLET_G"); + await limiter.consume("WALLET_G"); + await limiter.reset("WALLET_G"); + + expect(await limiter.isBlocked("WALLET_G")).toBe(false); + expect(await limiter.getViolationCount("WALLET_G")).toBe(0); + + const fresh = await limiter.consume("WALLET_G"); + expect(fresh.allowed).toBe(true); + }); +}); + +// --------------------------------------------------------------------------- +// SybilAnalysisService +// --------------------------------------------------------------------------- +describe("SybilAnalysisService", () => { + let redis; + let sybil; + + beforeEach(() => { + redis = new RedisMock(); + sybil = new SybilAnalysisService(redis, { flagThreshold: 3 }); + }); + + it("does not flag when below threshold", async () => { + const result = await sybil.evaluate("WALLET_1", 2); + expect(result.flagged).toBe(false); + }); + + it("flags when violations meet threshold", async () => { + const result = await sybil.evaluate("WALLET_2", 3, { + endpoint: "/api/cdn/token", + ip: "1.2.3.4", + }); + expect(result.flagged).toBe(true); + expect(await sybil.isFlagged("WALLET_2")).toBe(true); + }); + + it("stores detail metadata", async () => { + await sybil.evaluate("WALLET_3", 5, { + endpoint: "/api/test", + ip: "10.0.0.1", + }); + const detail = await sybil.getDetail("WALLET_3"); + expect(detail).not.toBeNull(); + expect(detail.violations).toBe("5"); + expect(detail.lastEndpoint).toBe("/api/test"); + expect(detail.lastIp).toBe("10.0.0.1"); + }); + + it("getTopFlagged returns ranked wallets", async () => { + await sybil.evaluate("w1", 3); + await sybil.evaluate("w2", 10); + await sybil.evaluate("w3", 5); + + const top = await sybil.getTopFlagged(2); + expect(top).toHaveLength(2); + expect(top[0].wallet).toBe("w2"); + expect(top[0].violations).toBe(10); + }); + + it("unflag removes the wallet from flagged set", async () => { + await sybil.evaluate("WALLET_4", 4); + expect(await sybil.isFlagged("WALLET_4")).toBe(true); + await sybil.unflag("WALLET_4"); + expect(await sybil.isFlagged("WALLET_4")).toBe(false); + }); +}); + +// --------------------------------------------------------------------------- +// extractWallet helper +// --------------------------------------------------------------------------- +describe("extractWallet", () => { + it("prefers req.user.address", () => { + const req = { + user: { address: "0xABC" }, + body: { walletAddress: "0xOther" }, + query: {}, + }; + expect(extractWallet(req)).toBe("0xABC"); + }); + + it("falls back to req.user.publicKey", () => { + const req = { user: { publicKey: "GPUBKEY" }, body: {}, query: {} }; + expect(extractWallet(req)).toBe("GPUBKEY"); + }); + + it("falls back to req.body.walletAddress", () => { + const req = { body: { walletAddress: "0xBody" }, query: {} }; + expect(extractWallet(req)).toBe("0xBody"); + }); + + it("falls back to query params", () => { + const req = { body: {}, query: { publicKey: "GQUERY" } }; + expect(extractWallet(req)).toBe("GQUERY"); + }); + + it("returns null when no wallet is present", () => { + const req = { body: {}, query: {} }; + expect(extractWallet(req)).toBeNull(); + }); +}); + +// --------------------------------------------------------------------------- +// createRateLimiter middleware +// --------------------------------------------------------------------------- +describe("createRateLimiter middleware", () => { + let redis; + let middleware; + + const mockRes = () => { + const res = { + _status: null, + _json: null, + _headers: {}, + status(code) { + res._status = code; + return res; + }, + json(body) { + res._json = body; + return res; + }, + set(key, value) { + res._headers[key] = value; + }, + }; + return res; + }; + + beforeEach(() => { + redis = new RedisMock(); + middleware = createRateLimiter({ + redis, + bucketCapacity: 3, + leakRatePerSecond: 1, + blockDurationSeconds: 10, + sybilThreshold: 2, + }); + }); + + it("throws if no redis client is provided", () => { + expect(() => createRateLimiter({})).toThrow("redis client"); + }); + + it("passes through when no wallet is identified", async () => { + const req = { body: {}, query: {} }; + const res = mockRes(); + const next = jest.fn(); + await middleware(req, res, next); + expect(next).toHaveBeenCalled(); + }); + + it("allows requests within the limit", async () => { + const req = { + user: { address: "0xFan" }, + body: {}, + query: {}, + originalUrl: "/api/test", + }; + const res = mockRes(); + const next = jest.fn(); + await middleware(req, res, next); + expect(next).toHaveBeenCalled(); + expect(res._headers["X-RateLimit-Limit"]).toBeDefined(); + }); + + it("returns 429 when rate limit is exceeded", async () => { + for (let i = 0; i < 3; i++) { + const req = { + user: { address: "0xBot" }, + body: {}, + query: {}, + originalUrl: "/api/x", + }; + await middleware(req, mockRes(), jest.fn()); + } + + const req = { + user: { address: "0xBot" }, + body: {}, + query: {}, + originalUrl: "/api/x", + }; + const res = mockRes(); + const next = jest.fn(); + await middleware(req, res, next); + + expect(next).not.toHaveBeenCalled(); + expect(res._status).toBe(429); + expect(res._json.error).toMatch(/Rate limit exceeded/); + expect(res._headers["Retry-After"]).toBeDefined(); + }); + + it("fails open when Redis throws", async () => { + const brokenRedis = { + get: () => { + throw new Error("connection refused"); + }, + eval: () => { + throw new Error("connection refused"); + }, + }; + + const mw = createRateLimiter({ redis: brokenRedis, bucketCapacity: 5 }); + const req = { + user: { address: "0xOops" }, + body: {}, + query: {}, + originalUrl: "/api/y", + }; + const res = mockRes(); + const next = jest.fn(); + await mw(req, res, next); + + // Should call next() instead of crashing. + expect(next).toHaveBeenCalled(); + }); +}); diff --git a/src/config/redis.js b/src/config/redis.js new file mode 100644 index 0000000..2892cd8 --- /dev/null +++ b/src/config/redis.js @@ -0,0 +1,65 @@ +const Redis = require("ioredis"); + +let redisClient = null; + +/** + * Create or return the singleton Redis client. + * + * Supports configuration via environment variables: + * REDIS_URL – full connection string (e.g. redis://user:pass@host:6379) + * REDIS_HOST – hostname (default 127.0.0.1) + * REDIS_PORT – port (default 6379) + * REDIS_PASSWORD – password (optional) + * REDIS_DB – database index (default 0) + * + * @param {object} [opts] Override options forwarded to ioredis. + * @returns {import('ioredis').Redis} + */ +function getRedisClient(opts = {}) { + if (redisClient) return redisClient; + + const url = process.env.REDIS_URL; + + if (url) { + redisClient = new Redis(url, { + maxRetriesPerRequest: 3, + retryStrategy: (times) => Math.min(times * 200, 5000), + ...opts, + }); + } else { + redisClient = new Redis({ + host: process.env.REDIS_HOST || "127.0.0.1", + port: Number(process.env.REDIS_PORT || 6379), + password: process.env.REDIS_PASSWORD || undefined, + db: Number(process.env.REDIS_DB || 0), + maxRetriesPerRequest: 3, + retryStrategy: (times) => Math.min(times * 200, 5000), + ...opts, + }); + } + + redisClient.on("error", (err) => { + console.error("[Redis] connection error:", err.message); + }); + + return redisClient; +} + +/** + * Gracefully close the Redis connection (e.g. during shutdown). + */ +async function closeRedisClient() { + if (redisClient) { + await redisClient.quit(); + redisClient = null; + } +} + +/** + * Replace the singleton – useful for injecting a mock in tests. + */ +function setRedisClient(client) { + redisClient = client; +} + +module.exports = { getRedisClient, closeRedisClient, setRedisClient }; diff --git a/src/services/leakyBucketRateLimiter.js b/src/services/leakyBucketRateLimiter.js new file mode 100644 index 0000000..02d3405 --- /dev/null +++ b/src/services/leakyBucketRateLimiter.js @@ -0,0 +1,187 @@ +/** + * Leaky Bucket Rate Limiter backed by Redis. + * + * The algorithm models a bucket that: + * - Has a fixed capacity (burst size). + * - Leaks at a constant rate (requests / second). + * - Each incoming request adds 1 unit to the bucket. + * - If the bucket is full the request is rejected. + * + * All state is stored in Redis so the limiter works across multiple + * server instances. + * + * Redis keys used per wallet: + * ratelimit:{wallet} – hash { level, lastDrip } + * ratelimit:blocked:{wallet} – string with TTL (temporary block) + */ + +const BUCKET_KEY_PREFIX = "ratelimit:"; +const BLOCK_KEY_PREFIX = "ratelimit:blocked:"; + +// Lua script executed atomically in Redis. +// Returns: [allowed (0|1), currentLevel, bucketCapacity] +const LEAKY_BUCKET_LUA = ` +local key = KEYS[1] +local capacity = tonumber(ARGV[1]) +local leakRate = tonumber(ARGV[2]) +local now = tonumber(ARGV[3]) + +local data = redis.call('HMGET', key, 'level', 'lastDrip') +local level = tonumber(data[1]) or 0 +local lastDrip = tonumber(data[2]) or now + +-- Leak tokens since the last request +local elapsed = now - lastDrip +local leaked = elapsed * leakRate +level = math.max(0, level - leaked) + +-- Try to add the new request +if level + 1 > capacity then + -- Bucket full – reject + redis.call('HMSET', key, 'level', level, 'lastDrip', now) + redis.call('EXPIRE', key, math.ceil(capacity / leakRate) + 60) + return {0, tostring(level), tostring(capacity)} +end + +level = level + 1 +redis.call('HMSET', key, 'level', level, 'lastDrip', now) +redis.call('EXPIRE', key, math.ceil(capacity / leakRate) + 60) +return {1, tostring(level), tostring(capacity)} +`; + +class LeakyBucketRateLimiter { + /** + * @param {import('ioredis').Redis} redisClient + * @param {object} [options] + * @param {number} [options.bucketCapacity=60] Max burst size. + * @param {number} [options.leakRatePerSecond=1] Tokens leaked per second. + * @param {number} [options.blockDurationSeconds=300] How long to block after overflow (5 min). + * @param {number} [options.sybilThreshold=3] Consecutive blocks before Sybil flagging. + */ + constructor(redisClient, options = {}) { + this.redis = redisClient; + this.bucketCapacity = options.bucketCapacity ?? 60; + this.leakRatePerSecond = options.leakRatePerSecond ?? 1; + this.blockDurationSeconds = options.blockDurationSeconds ?? 300; + this.sybilThreshold = options.sybilThreshold ?? 3; + } + + /** + * Consume one token for the given wallet. + * + * @param {string} wallet Wallet address (Stellar public key or Ethereum address). + * @returns {Promise<{allowed: boolean, blocked: boolean, currentLevel: number, + * capacity: number, retryAfterSeconds: number | null}>} + */ + async consume(wallet) { + const normalizedWallet = wallet.toLowerCase(); + const blockKey = `${BLOCK_KEY_PREFIX}${normalizedWallet}`; + + // 1. Check if wallet is already temporarily blocked. + const blocked = await this.redis.get(blockKey); + if (blocked) { + const ttl = await this.redis.ttl(blockKey); + return { + allowed: false, + blocked: true, + currentLevel: this.bucketCapacity, + capacity: this.bucketCapacity, + retryAfterSeconds: ttl > 0 ? ttl : this.blockDurationSeconds, + }; + } + + // 2. Run the atomic leaky-bucket script. + const bucketKey = `${BUCKET_KEY_PREFIX}${normalizedWallet}`; + const nowSeconds = Date.now() / 1000; + + const [allowed, levelStr, capacityStr] = await this.redis.eval( + LEAKY_BUCKET_LUA, + 1, + bucketKey, + this.bucketCapacity, + this.leakRatePerSecond, + nowSeconds, + ); + + const currentLevel = parseFloat(levelStr); + const capacity = parseFloat(capacityStr); + + if (allowed === 1) { + return { + allowed: true, + blocked: false, + currentLevel, + capacity, + retryAfterSeconds: null, + }; + } + + // 3. Bucket overflow – impose temporary block and increment violation counter. + await this.redis.set(blockKey, "1", "EX", this.blockDurationSeconds); + + const violationKey = `ratelimit:violations:${normalizedWallet}`; + const violations = await this.redis.incr(violationKey); + // Expire violations counter after 24 hours so old infractions don't persist forever. + await this.redis.expire(violationKey, 86400); + + return { + allowed: false, + blocked: true, + currentLevel, + capacity, + retryAfterSeconds: this.blockDurationSeconds, + violations, + }; + } + + /** + * Return the current violation count for a wallet. + * + * @param {string} wallet + * @returns {Promise} + */ + async getViolationCount(wallet) { + const count = await this.redis.get( + `ratelimit:violations:${wallet.toLowerCase()}`, + ); + return count ? parseInt(count, 10) : 0; + } + + /** + * Check whether a wallet is currently blocked. + * + * @param {string} wallet + * @returns {Promise} + */ + async isBlocked(wallet) { + const res = await this.redis.get( + `${BLOCK_KEY_PREFIX}${wallet.toLowerCase()}`, + ); + return res !== null; + } + + /** + * Manually unblock a wallet (admin action). + * + * @param {string} wallet + */ + async unblock(wallet) { + await this.redis.del(`${BLOCK_KEY_PREFIX}${wallet.toLowerCase()}`); + } + + /** + * Reset all rate-limit state for a wallet. + * + * @param {string} wallet + */ + async reset(wallet) { + const w = wallet.toLowerCase(); + await this.redis.del( + `${BUCKET_KEY_PREFIX}${w}`, + `${BLOCK_KEY_PREFIX}${w}`, + `ratelimit:violations:${w}`, + ); + } +} + +module.exports = { LeakyBucketRateLimiter }; diff --git a/src/services/sybilAnalysisService.js b/src/services/sybilAnalysisService.js new file mode 100644 index 0000000..c820bad --- /dev/null +++ b/src/services/sybilAnalysisService.js @@ -0,0 +1,129 @@ +/** + * Sybil Analysis Service + * + * Tracks wallets that repeatedly exceed rate limits and flags them for + * review. Flagged wallets are stored in a Redis sorted set keyed by + * violation count so operators can query the worst offenders first. + * + * Redis structures: + * sybil:flagged – sorted set (score = violation count, member = wallet) + * sybil:details:{wallet} – hash with detailed flag metadata + */ + +const FLAGGED_SET_KEY = "sybil:flagged"; +const DETAIL_KEY_PREFIX = "sybil:details:"; + +class SybilAnalysisService { + /** + * @param {import('ioredis').Redis} redisClient + * @param {object} [options] + * @param {number} [options.flagThreshold=3] Violations required for flagging. + * @param {number} [options.detailTtlSeconds=604800] How long detail records persist (7 days). + */ + constructor(redisClient, options = {}) { + this.redis = redisClient; + this.flagThreshold = options.flagThreshold ?? 3; + this.detailTtlSeconds = options.detailTtlSeconds ?? 604800; + } + + /** + * Evaluate a wallet after a rate-limit violation and flag it if the + * threshold has been reached. + * + * @param {string} wallet + * @param {number} violations Current cumulative violation count. + * @param {object} [meta] Extra metadata (IP, endpoint, etc.). + * @returns {Promise<{flagged: boolean, violations: number}>} + */ + async evaluate(wallet, violations, meta = {}) { + const normalizedWallet = wallet.toLowerCase(); + + if (violations < this.flagThreshold) { + return { flagged: false, violations }; + } + + // Add / update in the sorted set (score = violation count). + await this.redis.zadd(FLAGGED_SET_KEY, violations, normalizedWallet); + + // Store granular details for investigation. + const detailKey = `${DETAIL_KEY_PREFIX}${normalizedWallet}`; + await this.redis.hmset(detailKey, { + wallet: normalizedWallet, + violations: String(violations), + flaggedAt: new Date().toISOString(), + lastEndpoint: meta.endpoint || "", + lastIp: meta.ip || "", + reason: "Exceeded rate-limit violation threshold", + }); + await this.redis.expire(detailKey, this.detailTtlSeconds); + + console.warn( + `[SybilAnalysis] Wallet ${normalizedWallet} flagged – ${violations} violations`, + ); + + return { flagged: true, violations }; + } + + /** + * Check whether a wallet was previously flagged. + * + * @param {string} wallet + * @returns {Promise} + */ + async isFlagged(wallet) { + const score = await this.redis.zscore( + FLAGGED_SET_KEY, + wallet.toLowerCase(), + ); + return score !== null; + } + + /** + * Retrieve the detail record for a flagged wallet. + * + * @param {string} wallet + * @returns {Promise} + */ + async getDetail(wallet) { + const data = await this.redis.hgetall( + `${DETAIL_KEY_PREFIX}${wallet.toLowerCase()}`, + ); + return data && Object.keys(data).length > 0 ? data : null; + } + + /** + * Return the top N most-violated wallets. + * + * @param {number} [count=20] + * @returns {Promise>} + */ + async getTopFlagged(count = 20) { + const results = await this.redis.zrevrange( + FLAGGED_SET_KEY, + 0, + count - 1, + "WITHSCORES", + ); + const entries = []; + for (let i = 0; i < results.length; i += 2) { + entries.push({ + wallet: results[i], + violations: parseInt(results[i + 1], 10), + }); + } + return entries; + } + + /** + * Remove a wallet from the flagged set (admin clearance). + * + * @param {string} wallet + */ + async unflag(wallet) { + const w = wallet.toLowerCase(); + await this.redis.zrem(FLAGGED_SET_KEY, w); + await this.redis.del(`${DETAIL_KEY_PREFIX}${w}`); + } +} + +module.exports = { SybilAnalysisService };