Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 30 additions & 13 deletions core/src/exchanges/kalshi/websocket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,15 @@ export class KalshiWebSocket {
private reconnectTimer?: NodeJS.Timeout;
private connectionPromise?: Promise<void>;
private isTerminated = false;
private static readonly CONNECTION_TIMEOUT_MS = 30_000;

constructor(auth: KalshiAuth, config: KalshiWebSocketConfig = {}) {
this.auth = auth;
this.config = config;
this.wsUrl = config.wsUrl!; // wsUrl must be provided by caller (from KalshiExchange)
if (!config.wsUrl) {
throw new Error('KalshiWebSocket: wsUrl is required in config');
}
this.wsUrl = config.wsUrl;
}

private async connect(): Promise<void> {
Expand Down Expand Up @@ -71,7 +75,21 @@ export class KalshiWebSocket {

this.ws = new WebSocket(this.wsUrl, { headers });

// Connection timeout: close the socket if not connected within 30s
const connectionTimeout = setTimeout(() => {
if (!this.isConnected && this.ws) {
logger.error("Kalshi WebSocket connection timed out", {
timeoutMs: KalshiWebSocket.CONNECTION_TIMEOUT_MS,
});
this.ws.close();
this.isConnecting = false;
this.connectionPromise = undefined;
reject(new Error(`Kalshi WebSocket connection timed out after ${KalshiWebSocket.CONNECTION_TIMEOUT_MS}ms`));
}
}, KalshiWebSocket.CONNECTION_TIMEOUT_MS);

this.ws.on("open", () => {
clearTimeout(connectionTimeout);
this.isConnected = true;
this.isConnecting = false;
this.connectionPromise = undefined;
Expand Down Expand Up @@ -100,13 +118,15 @@ export class KalshiWebSocket {
});

this.ws.on("error", (error: Error) => {
clearTimeout(connectionTimeout);
logger.error("Kalshi WebSocket error", { error: String(error) });
this.isConnecting = false;
this.connectionPromise = undefined;
reject(error);
});

this.ws.on("close", () => {
clearTimeout(connectionTimeout);
if (!this.isTerminated) {
logger.info("Kalshi WebSocket closed");
this.scheduleReconnect();
Expand Down Expand Up @@ -472,10 +492,9 @@ export class KalshiWebSocket {

// Return a promise that resolves on the next orderbook update
const dataPromise = new Promise<OrderBook>((resolve, reject) => {
if (!this.orderBookResolvers.has(ticker)) {
this.orderBookResolvers.set(ticker, []);
}
this.orderBookResolvers.get(ticker)!.push({ resolve, reject });
const resolvers = this.orderBookResolvers.get(ticker) ?? [];
resolvers.push({ resolve, reject });
this.orderBookResolvers.set(ticker, resolvers);
});

return withWatchTimeout(
Expand Down Expand Up @@ -511,15 +530,14 @@ export class KalshiWebSocket {
const dataPromise = Promise.all(
tickers.map((ticker) =>
new Promise<[string, OrderBook]>((resolve, reject) => {
if (!this.orderBookResolvers.has(ticker)) {
this.orderBookResolvers.set(ticker, []);
}
this.orderBookResolvers.get(ticker)!.push({
const resolvers = this.orderBookResolvers.get(ticker) ?? [];
resolvers.push({
resolve: (book: OrderBook | PromiseLike<OrderBook>) => {
Promise.resolve(book).then((b) => resolve([ticker, b]));
},
reject,
});
this.orderBookResolvers.set(ticker, resolvers);
}),
),
);
Expand Down Expand Up @@ -555,10 +573,9 @@ export class KalshiWebSocket {
}

const dataPromise = new Promise<Trade[]>((resolve, reject) => {
if (!this.tradeResolvers.has(ticker)) {
this.tradeResolvers.set(ticker, []);
}
this.tradeResolvers.get(ticker)!.push({ resolve, reject });
const resolvers = this.tradeResolvers.get(ticker) ?? [];
resolvers.push({ resolve, reject });
this.tradeResolvers.set(ticker, resolvers);
});

return withWatchTimeout(
Expand Down
Loading