From 9235171fbc4ed12115a7398ed734aa9d9db10250 Mon Sep 17 00:00:00 2001 From: "Samuel EF. Tinnerholm" Date: Sun, 24 May 2026 19:48:09 +0300 Subject: [PATCH] fix: polymarket websocket non-null guards + bounded buffers + connection timeouts Fixes #243 Fixes #245 Fixes #247 Fixes #334 Fixes #380 --- core/src/exchanges/polymarket/websocket.ts | 88 +++++++++++++++------- 1 file changed, 61 insertions(+), 27 deletions(-) diff --git a/core/src/exchanges/polymarket/websocket.ts b/core/src/exchanges/polymarket/websocket.ts index 6bb280e7..f81b75d1 100644 --- a/core/src/exchanges/polymarket/websocket.ts +++ b/core/src/exchanges/polymarket/websocket.ts @@ -61,8 +61,14 @@ export interface PolymarketWebSocketConfig { watchTimeoutMs?: number; /** API credentials for the authenticated user channel (fills/orders). */ userChannelCreds?: PolymarketUserChannelCreds; + /** Timeout in ms for WebSocket connections to open (default: 30000). */ + connectionTimeoutMs?: number; } +const DEFAULT_CONNECTION_TIMEOUT_MS = 30_000; +const MAX_PENDING_TRADES_PER_ASSET = 1000; +const MAX_USER_CALLBACKS = 100; + const POLYMARKET_MARKET_WS_URL = 'wss://ws-subscriptions-clob.polymarket.com/ws/market'; /** @@ -102,10 +108,8 @@ export class PolymarketWebSocket { // Return a promise that resolves on the next orderbook update const dataPromise = new Promise((resolve, reject) => { - if (!this.orderBookResolvers.has(outcomeId)) { - this.orderBookResolvers.set(outcomeId, []); - } - this.orderBookResolvers.get(outcomeId)!.push({ resolve, reject }); + const existing = this.orderBookResolvers.get(outcomeId) ?? []; + this.orderBookResolvers.set(outcomeId, [...existing, { resolve, reject }]); }); return withWatchTimeout( @@ -152,10 +156,8 @@ export class PolymarketWebSocket { // Otherwise wait for the next trade const dataPromise = new Promise((resolve, reject) => { - if (!this.tradeResolvers.has(outcomeId)) { - this.tradeResolvers.set(outcomeId, []); - } - this.tradeResolvers.get(outcomeId)!.push({ resolve, reject }); + const existing = this.tradeResolvers.get(outcomeId) ?? []; + this.tradeResolvers.set(outcomeId, [...existing, { resolve, reject }]); }); return withWatchTimeout( @@ -197,7 +199,14 @@ export class PolymarketWebSocket { 'User channel requires API credentials. Pass userChannelCreds in PolymarketWebSocketConfig.', ); } - this.userCallbacks.push(callback); + if (!this.userCallbacks.includes(callback)) { + if (this.userCallbacks.length >= MAX_USER_CALLBACKS) { + throw new Error( + `Maximum user callback limit (${MAX_USER_CALLBACKS}) reached. Call unwatchUserFills() to clear existing callbacks.`, + ); + } + this.userCallbacks = [...this.userCallbacks, callback]; + } this.userConditionIds = [...new Set([...this.userConditionIds, ...conditionIds])]; if (this.userWs) { @@ -221,20 +230,37 @@ export class PolymarketWebSocket { private async connectUserChannel(creds: PolymarketUserChannelCreds): Promise { const WebSocket = (await import('ws')).default; const url = 'wss://ws-subscriptions-clob.polymarket.com/ws/user'; + const timeoutMs = this.config.connectionTimeoutMs ?? DEFAULT_CONNECTION_TIMEOUT_MS; this.userWs = new WebSocket(url); - this.userWs.on('open', () => { - logger.debug('[polymarket-ws] user channel connected'); - this.sendUserSubscription(creds); + await new Promise((resolve, reject) => { + const timeout = setTimeout(() => { + this.userWs?.terminate(); + this.userWs = null; + reject(new Error(`Polymarket user channel connection timed out after ${timeoutMs}ms`)); + }, timeoutMs); + + this.userWs.on('open', () => { + clearTimeout(timeout); + logger.info('[polymarket-ws] user channel connected'); + this.sendUserSubscription(creds); + + // Ping every 10 seconds to keep the connection alive. + if (this.userPingInterval) clearInterval(this.userPingInterval); + this.userPingInterval = setInterval(() => { + if (this.userWs?.readyState === WebSocket.OPEN) { + this.userWs.ping(); + } + }, 10_000); + resolve(); + }); - // Ping every 10 seconds to keep the connection alive. - if (this.userPingInterval) clearInterval(this.userPingInterval); - this.userPingInterval = setInterval(() => { - if (this.userWs?.readyState === WebSocket.OPEN) { - this.userWs.ping(); - } - }, 10_000); + this.userWs.on('error', (err: Error) => { + clearTimeout(timeout); + logger.error('[polymarket-ws] user channel error', { error: err.message }); + reject(err); + }); }); this.userWs.on('message', (raw: Buffer) => { @@ -257,10 +283,6 @@ export class PolymarketWebSocket { logger.warn('[polymarket-ws] user channel disconnected, reconnecting in 5s'); this.scheduleUserReconnect(creds); }); - - this.userWs.on('error', (err: Error) => { - logger.error('[polymarket-ws] user channel error', { error: err.message }); - }); } private sendUserSubscription(creds: PolymarketUserChannelCreds): void { @@ -324,11 +346,21 @@ export class PolymarketWebSocket { private async ensureInitialized() { if (this.initializationPromise) return this.initializationPromise; + const timeoutMs = this.config.connectionTimeoutMs ?? DEFAULT_CONNECTION_TIMEOUT_MS; + this.initializationPromise = new Promise((resolve, reject) => { const WebSocket = require('ws'); this.ws = new WebSocket(POLYMARKET_MARKET_WS_URL); + const timeout = setTimeout(() => { + this.ws?.terminate(); + this.ws = null; + this.initializationPromise = undefined; + reject(new Error(`Polymarket market channel connection timed out after ${timeoutMs}ms`)); + }, timeoutMs); + this.ws.on('open', () => { + clearTimeout(timeout); resolve(); }); @@ -350,6 +382,7 @@ export class PolymarketWebSocket { }); this.ws.on('error', (err: Error) => { + clearTimeout(timeout); logger.error('[polymarket-ws] WebSocket error', { error: err.message }); reject(err); }); @@ -444,10 +477,11 @@ export class PolymarketWebSocket { resolvers.forEach((r) => r.resolve([trade])); this.tradeResolvers.set(id, []); } else { - if (!this.pendingTrades.has(id)) { - this.pendingTrades.set(id, []); - } - this.pendingTrades.get(id)!.push(trade); + const pending = this.pendingTrades.get(id) ?? []; + const updated = pending.length >= MAX_PENDING_TRADES_PER_ASSET + ? [...pending.slice(1), trade] + : [...pending, trade]; + this.pendingTrades.set(id, updated); } }