Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion core/api-doc-config.generated.json
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"_generated": "Auto-generated by extract-jsdoc.js on 2026-06-08T08:29:02.679Z. Do not edit manually.",
"_generated": "Auto-generated by extract-jsdoc.js on 2026-06-08T08:31:38.004Z. Do not edit manually.",
"methods": {
"has": {
"summary": "HTTP verb for the endpoint (e.g. GET, POST). */",
Expand Down
22 changes: 17 additions & 5 deletions core/src/exchanges/gemini-titan/websocket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,10 @@ export class GeminiWebSocket {
try {
const message = JSON.parse(data.toString());
this.handleMessage(message);
} catch {
// Ignore unparseable messages
} catch (error) {
logger.warn('[gemini-titan] failed to parse or handle message', {
error: error instanceof Error ? error.message : String(error),
});
}
});

Expand Down Expand Up @@ -212,7 +214,7 @@ export class GeminiWebSocket {
size: number,
sortOrder: 'asc' | 'desc',
): void {
const idx = levels.findIndex(l => Math.abs(l.price - price) < 0.0001);
const idx = levels.findIndex(l => l.price === price);

if (size === 0) {
if (idx !== -1) levels.splice(idx, 1);
Expand Down Expand Up @@ -267,7 +269,12 @@ export class GeminiWebSocket {

if (!this.isConnected) {
this.connect().catch((err: unknown) => {
logger.warn(`[gemini-titan] connect failed during watchOrderBook('${symbol}'): ${err instanceof Error ? err.message : String(err)}`);
logger.warn(`[gemini-titan] connect failed during watchOrderBook('${symbol}')`, {
error: err instanceof Error ? err.message : String(err),
});
if (!this.isTerminated) {
this.scheduleReconnect();
}
});
} else {
this.sendSubscribe([`${symbol}@depth20`]);
Expand Down Expand Up @@ -301,7 +308,12 @@ export class GeminiWebSocket {

if (!this.isConnected) {
this.connect().catch((err: unknown) => {
logger.warn(`[gemini-titan] connect failed during watchTrades('${symbol}'): ${err instanceof Error ? err.message : String(err)}`);
logger.warn(`[gemini-titan] connect failed during watchTrades('${symbol}')`, {
error: err instanceof Error ? err.message : String(err),
});
if (!this.isTerminated) {
this.scheduleReconnect();
}
});
} else {
this.sendSubscribe([`${symbol}@trade`]);
Expand Down
30 changes: 21 additions & 9 deletions core/src/exchanges/kalshi/websocket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -480,13 +480,15 @@ export class KalshiWebSocket {
// The resolver will be fulfilled once the connection is (re)established
// and data arrives.
if (!this.isConnected) {
this.connect().catch(() => {
// Connection failed — scheduleReconnect is already queued via the
// close handler, so we intentionally swallow here. The pending
// resolver will be resolved when data arrives on the next connection.
this.connect().catch((err) => {
logger.warn("Kalshi WebSocket connect failed during subscribeToOrderbook", {
error: String(err),
});
if (!this.isTerminated) {
this.scheduleReconnect();
}
});
} else {
// Already connected — ensure subscription message is sent
this.subscribeToOrderbook([ticker]);
}

Expand Down Expand Up @@ -519,8 +521,13 @@ export class KalshiWebSocket {

// Attempt connection — if it fails, scheduleReconnect handles recovery.
if (!this.isConnected) {
this.connect().catch(() => {
// Swallow — scheduleReconnect will retry. Resolvers stay pending.
this.connect().catch((err) => {
logger.warn("Kalshi WebSocket connect failed during subscribeToOrderbooks", {
error: String(err),
});
if (!this.isTerminated) {
this.scheduleReconnect();
}
});
} else if (newTickers.length > 0) {
this.subscribeToOrderbook(newTickers);
Expand Down Expand Up @@ -565,8 +572,13 @@ export class KalshiWebSocket {
}

if (!this.isConnected) {
this.connect().catch(() => {
// Swallow — scheduleReconnect will retry. Resolvers stay pending.
this.connect().catch((err) => {
logger.warn("Kalshi WebSocket connect failed during subscribeToTrades", {
error: String(err),
});
if (!this.isTerminated) {
this.scheduleReconnect();
}
});
} else {
this.subscribeToTrades([ticker]);
Expand Down
40 changes: 23 additions & 17 deletions core/src/exchanges/opinion/websocket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -242,10 +242,7 @@ export class OpinionWebSocket {
return;
}

const existing = this.orderBooks.get(marketId);
const book: OrderBook = existing
? { ...existing }
: { bids: [], asks: [], timestamp: Date.now() };
const book: OrderBook = this.orderBooks.get(marketId) ?? { bids: [], asks: [], timestamp: Date.now() };

if (side === "bids") {
book.bids = applyLevelUpdate(book.bids, price, size, "desc");
Expand Down Expand Up @@ -333,8 +330,11 @@ export class OpinionWebSocket {
}

if (!this.isConnected) {
this.connect().catch(() => {
// Swallow — scheduleReconnect will retry. Resolvers stay pending.
this.connect().catch((err) => {
logger.warn("Opinion WebSocket connect failed during watchOrderBook", { error: String(err) });
if (!this.isTerminated) {
this.scheduleReconnect();
}
});
} else {
this.sendSubscribe("market.depth.diff", marketId);
Expand Down Expand Up @@ -370,8 +370,11 @@ export class OpinionWebSocket {
}

if (!this.isConnected) {
this.connect().catch(() => {
// Swallow — scheduleReconnect will retry. Resolvers stay pending.
this.connect().catch((err) => {
logger.warn("Opinion WebSocket connect failed during watchTrades", { error: String(err) });
if (!this.isTerminated) {
this.scheduleReconnect();
}
});
} else {
this.sendSubscribe("market.last.trade", marketId);
Expand Down Expand Up @@ -457,29 +460,32 @@ export class OpinionWebSocket {

/**
* Apply an absolute-size level update to one side of the book.
* A size of 0 (or NaN) removes the level. Returns a new sorted array.
* A size of 0 (or NaN) removes the level. Mutates the provided array in place
* and keeps it sorted.
*/
function applyLevelUpdate(
levels: readonly OrderLevel[],
levels: OrderLevel[],
price: number,
size: number,
sortOrder: "asc" | "desc",
): OrderLevel[] {
const updated = levels.filter(
(l) => Math.abs(l.price - price) >= 0.0001,
);
const idx = levels.findIndex((l) => l.price === price);

if (idx !== -1) {
levels.splice(idx, 1);
}

if (!isNaN(size) && size > 0) {
updated.push({ price, size });
levels.push({ price, size });
}

if (sortOrder === "desc") {
updated.sort((a, b) => b.price - a.price);
levels.sort((a, b) => b.price - a.price);
} else {
updated.sort((a, b) => a.price - b.price);
levels.sort((a, b) => a.price - b.price);
}

return updated;
return levels;
}

/**
Expand Down
5 changes: 3 additions & 2 deletions core/src/exchanges/polymarket_us/websocket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import type {
MarketBook,
} from 'polymarket-us';
import { OrderBook, QueuedPromise, Trade } from '../../types';
import { logger } from '../../utils/logger';
import { DEFAULT_WATCH_TIMEOUT_MS, withWatchTimeout } from '../../utils/watch-timeout';
import { fromAmount } from './price';
import { PolymarketUSNormalizer } from './normalizer';
Expand Down Expand Up @@ -124,8 +125,8 @@ export class PolymarketUSWebSocket {
if (this.socket) {
try {
this.socket.close();
} catch {
// Ignore close errors.
} catch (error) {
logger.debug('[polymarket_us] error during socket close', { error: String(error) });
}
this.socket = null;
}
Expand Down
9 changes: 6 additions & 3 deletions core/src/feeds/binance/binance-feed.ts
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ export class BinanceFeed extends BaseDataFeed {

protected watchTickerImpl(symbol: string, callback: (ticker: Ticker) => void): () => void {
const sub: Subscription = { symbol, callback };
this.subscriptions = [...this.subscriptions, sub];
this.subscriptions.push(sub);
this.ensureConnected().catch((err: unknown) => {
logger.error('[BinanceFeed] initial connect failed in watchTickerImpl', {
error: err instanceof Error ? err.message : String(err),
Expand Down Expand Up @@ -213,7 +213,10 @@ export class BinanceFeed extends BaseDataFeed {
let msg: BinanceRelayMessage;
try {
msg = JSON.parse(text) as BinanceRelayMessage;
} catch {
} catch (error) {
logger.debug('[BinanceFeed] failed to parse relay message', {
error: error instanceof Error ? error.message : String(error),
});
return;
}

Expand All @@ -222,7 +225,7 @@ export class BinanceFeed extends BaseDataFeed {
const event = msg as BinanceRelayTradeEvent;
const ticker = normalizeTradeToTicker(event);

this.latestTickers = new Map(this.latestTickers).set(ticker.symbol, ticker);
this.latestTickers.set(ticker.symbol, ticker);

for (const sub of this.subscriptions) {
if (sub.symbol === ticker.symbol) {
Expand Down
7 changes: 5 additions & 2 deletions core/src/feeds/chainlink/chainlink-feed.ts
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ export class ChainlinkFeed extends BaseDataFeed {

protected watchTickerImpl(symbol: string, callback: (ticker: Ticker) => void): () => void {
const sub: Subscription = { symbol: symbol.toUpperCase(), callback };
this.subscriptions = [...this.subscriptions, sub];
this.subscriptions.push(sub);
this.ensureConnected().catch((err: unknown) => {
logger.error('[ChainlinkFeed] initial connect failed in watchTickerImpl', { error: err instanceof Error ? err.message : String(err) });
});
Expand Down Expand Up @@ -384,7 +384,10 @@ export class ChainlinkFeed extends BaseDataFeed {
let msg: ChainlinkWsMessage;
try {
msg = JSON.parse(text) as ChainlinkWsMessage;
} catch {
} catch (error) {
logger.debug('[ChainlinkFeed] failed to parse relay message', {
error: error instanceof Error ? error.message : String(error),
});
return;
}

Expand Down
Loading