Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 61 additions & 27 deletions core/src/exchanges/polymarket/websocket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,14 @@ export interface PolymarketWebSocketConfig {
watchTimeoutMs?: number;
/** API credentials for the authenticated user channel (fills/orders). */
userChannelCreds?: PolymarketUserChannelCreds;
/** Timeout in ms for WebSocket connections to open (default: 30000). */
connectionTimeoutMs?: number;
}

const DEFAULT_CONNECTION_TIMEOUT_MS = 30_000;
const MAX_PENDING_TRADES_PER_ASSET = 1000;
const MAX_USER_CALLBACKS = 100;

const POLYMARKET_MARKET_WS_URL = 'wss://ws-subscriptions-clob.polymarket.com/ws/market';

/**
Expand Down Expand Up @@ -102,10 +108,8 @@ export class PolymarketWebSocket {

// Return a promise that resolves on the next orderbook update
const dataPromise = new Promise<OrderBook>((resolve, reject) => {
if (!this.orderBookResolvers.has(outcomeId)) {
this.orderBookResolvers.set(outcomeId, []);
}
this.orderBookResolvers.get(outcomeId)!.push({ resolve, reject });
const existing = this.orderBookResolvers.get(outcomeId) ?? [];
this.orderBookResolvers.set(outcomeId, [...existing, { resolve, reject }]);
});

return withWatchTimeout(
Expand Down Expand Up @@ -152,10 +156,8 @@ export class PolymarketWebSocket {

// Otherwise wait for the next trade
const dataPromise = new Promise<Trade[]>((resolve, reject) => {
if (!this.tradeResolvers.has(outcomeId)) {
this.tradeResolvers.set(outcomeId, []);
}
this.tradeResolvers.get(outcomeId)!.push({ resolve, reject });
const existing = this.tradeResolvers.get(outcomeId) ?? [];
this.tradeResolvers.set(outcomeId, [...existing, { resolve, reject }]);
});

return withWatchTimeout(
Expand Down Expand Up @@ -197,7 +199,14 @@ export class PolymarketWebSocket {
'User channel requires API credentials. Pass userChannelCreds in PolymarketWebSocketConfig.',
);
}
this.userCallbacks.push(callback);
if (!this.userCallbacks.includes(callback)) {
if (this.userCallbacks.length >= MAX_USER_CALLBACKS) {
throw new Error(
`Maximum user callback limit (${MAX_USER_CALLBACKS}) reached. Call unwatchUserFills() to clear existing callbacks.`,
);
}
this.userCallbacks = [...this.userCallbacks, callback];
}
this.userConditionIds = [...new Set([...this.userConditionIds, ...conditionIds])];

if (this.userWs) {
Expand All @@ -221,20 +230,37 @@ export class PolymarketWebSocket {
private async connectUserChannel(creds: PolymarketUserChannelCreds): Promise<void> {
const WebSocket = (await import('ws')).default;
const url = 'wss://ws-subscriptions-clob.polymarket.com/ws/user';
const timeoutMs = this.config.connectionTimeoutMs ?? DEFAULT_CONNECTION_TIMEOUT_MS;

this.userWs = new WebSocket(url);

this.userWs.on('open', () => {
logger.debug('[polymarket-ws] user channel connected');
this.sendUserSubscription(creds);
await new Promise<void>((resolve, reject) => {
const timeout = setTimeout(() => {
this.userWs?.terminate();
this.userWs = null;
reject(new Error(`Polymarket user channel connection timed out after ${timeoutMs}ms`));
}, timeoutMs);

this.userWs.on('open', () => {
clearTimeout(timeout);
logger.info('[polymarket-ws] user channel connected');
this.sendUserSubscription(creds);

// Ping every 10 seconds to keep the connection alive.
if (this.userPingInterval) clearInterval(this.userPingInterval);
this.userPingInterval = setInterval(() => {
if (this.userWs?.readyState === WebSocket.OPEN) {
this.userWs.ping();
}
}, 10_000);
resolve();
});

// Ping every 10 seconds to keep the connection alive.
if (this.userPingInterval) clearInterval(this.userPingInterval);
this.userPingInterval = setInterval(() => {
if (this.userWs?.readyState === WebSocket.OPEN) {
this.userWs.ping();
}
}, 10_000);
this.userWs.on('error', (err: Error) => {
clearTimeout(timeout);
logger.error('[polymarket-ws] user channel error', { error: err.message });
reject(err);
});
});

this.userWs.on('message', (raw: Buffer) => {
Expand All @@ -257,10 +283,6 @@ export class PolymarketWebSocket {
logger.warn('[polymarket-ws] user channel disconnected, reconnecting in 5s');
this.scheduleUserReconnect(creds);
});

this.userWs.on('error', (err: Error) => {
logger.error('[polymarket-ws] user channel error', { error: err.message });
});
}

private sendUserSubscription(creds: PolymarketUserChannelCreds): void {
Expand Down Expand Up @@ -324,11 +346,21 @@ export class PolymarketWebSocket {
private async ensureInitialized() {
if (this.initializationPromise) return this.initializationPromise;

const timeoutMs = this.config.connectionTimeoutMs ?? DEFAULT_CONNECTION_TIMEOUT_MS;

this.initializationPromise = new Promise<void>((resolve, reject) => {
const WebSocket = require('ws');
this.ws = new WebSocket(POLYMARKET_MARKET_WS_URL);

const timeout = setTimeout(() => {
this.ws?.terminate();
this.ws = null;
this.initializationPromise = undefined;
reject(new Error(`Polymarket market channel connection timed out after ${timeoutMs}ms`));
}, timeoutMs);

this.ws.on('open', () => {
clearTimeout(timeout);
resolve();
});

Expand All @@ -350,6 +382,7 @@ export class PolymarketWebSocket {
});

this.ws.on('error', (err: Error) => {
clearTimeout(timeout);
logger.error('[polymarket-ws] WebSocket error', { error: err.message });
reject(err);
});
Expand Down Expand Up @@ -444,10 +477,11 @@ export class PolymarketWebSocket {
resolvers.forEach((r) => r.resolve([trade]));
this.tradeResolvers.set(id, []);
} else {
if (!this.pendingTrades.has(id)) {
this.pendingTrades.set(id, []);
}
this.pendingTrades.get(id)!.push(trade);
const pending = this.pendingTrades.get(id) ?? [];
const updated = pending.length >= MAX_PENDING_TRADES_PER_ASSET
? [...pending.slice(1), trade]
: [...pending, trade];
this.pendingTrades.set(id, updated);
}
}

Expand Down
Loading