Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 12 additions & 3 deletions core/src/exchanges/limitless/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -309,14 +309,20 @@ export class LimitlessClient {
* Cancel a specific order by ID.
*/
async cancelOrder(orderId: string) {
return await this.orderClient!.cancel(orderId);
if (!this.orderClient) {
throw new Error('[limitless] Order client not initialized -- trading credentials required');
}
return await this.orderClient.cancel(orderId);
}

/**
* Cancel all orders for a specific market.
*/
async cancelAllOrders(marketSlug: string) {
return await this.orderClient!.cancelAll(marketSlug);
if (!this.orderClient) {
throw new Error('[limitless] Order client not initialized -- trading credentials required');
}
return await this.orderClient.cancelAll(marketSlug);
}

/**
Expand Down Expand Up @@ -385,7 +391,10 @@ export class LimitlessClient {
});
const contract = new Contract(USDC_ADDRESS, ABI, provider);

const balance = await contract.balanceOf(this.signer!.address);
if (!this.signer) {
throw new Error('[limitless] Signer not initialized -- wallet private key required');
}
const balance = await contract.balanceOf(this.signer.address);
const decimals = await contract.decimals(); // Should be 6

return parseFloat(utils.formatUnits(balance, decimals));
Expand Down
6 changes: 4 additions & 2 deletions core/src/exchanges/limitless/normalizer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,12 @@ export class LimitlessNormalizer implements IExchangeNormalizer<LimitlessRawMark
}).sort((a, b) => a.timestamp - b.timestamp);

if (params.start) {
candles = candles.filter((c) => c.timestamp >= params.start!.getTime());
const start = params.start;
candles = candles.filter((c) => c.timestamp >= start.getTime());
}
if (params.end) {
candles = candles.filter((c) => c.timestamp <= params.end!.getTime());
const end = params.end;
candles = candles.filter((c) => c.timestamp <= end.getTime());
}
if (params.limit) {
candles = candles.slice(0, params.limit);
Expand Down
45 changes: 37 additions & 8 deletions core/src/exchanges/limitless/websocket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,8 @@ export class LimitlessWebSocket {
// 1. If we have buffered data, return it immediately
const buffer = this.orderbookBuffers.get(marketSlug);
if (buffer && buffer.length > 0) {
return buffer.shift()!;
const entry = buffer.shift();
if (entry) return entry;
}

// 2. Special case: If this is the FIRST call for this market and we have no data,
Expand Down Expand Up @@ -152,15 +153,31 @@ export class LimitlessWebSocket {

// Wait for WebSocket update with timeout
try {
const resolverEntry: { resolve: (ob: OrderBook) => void; reject: (err: any) => void } = {
resolve: () => {},
reject: () => {},
};

const wsUpdatePromise = new Promise<OrderBook>((resolve, reject) => {
resolverEntry.resolve = resolve;
resolverEntry.reject = reject;
if (!this.orderbookResolvers.has(marketSlug)) {
this.orderbookResolvers.set(marketSlug, []);
}
this.orderbookResolvers.get(marketSlug)!.push({ resolve, reject });
const resolvers = this.orderbookResolvers.get(marketSlug);
if (resolvers) {
resolvers.push(resolverEntry);
}
});

const timeoutPromise = new Promise<OrderBook>((resolve) => {
setTimeout(async () => {
// Timeout won the race -- remove the stale resolver (#372)
const resolvers = this.orderbookResolvers.get(marketSlug);
if (resolvers) {
const idx = resolvers.indexOf(resolverEntry);
if (idx !== -1) resolvers.splice(idx, 1);
}
// Timeout: fetch REST snapshot as fallback
try {
this.lastOrderbookTimestamps.set(marketSlug, Date.now());
Expand Down Expand Up @@ -278,6 +295,14 @@ export class LimitlessWebSocket {
async close(): Promise<void> {
this.orderbookCallbacks.clear();
this.priceCallbacks.clear();
// Reject any pending resolvers before clearing (#372)
for (const [, resolvers] of this.orderbookResolvers) {
for (const resolver of resolvers) {
resolver.reject(new Error('WebSocket closed'));
}
}
this.orderbookResolvers.clear();
this.orderbookBuffers.clear();
await this.client.disconnect();
this.watcher.close();
}
Expand Down Expand Up @@ -315,17 +340,21 @@ export class LimitlessWebSocket {
const resolvers = this.orderbookResolvers.get(marketSlug) || [];
if (resolvers.length > 0) {
// If someone is waiting, give it to them immediately
const resolver = resolvers.shift()!;
resolver.resolve(pmxtOrderbook);
const resolver = resolvers.shift();
if (resolver) {
resolver.resolve(pmxtOrderbook);
}
} else {
// Otherwise, buffer it for the next call
if (!this.orderbookBuffers.has(marketSlug)) {
this.orderbookBuffers.set(marketSlug, []);
}
const buffer = this.orderbookBuffers.get(marketSlug)!;
buffer.push(pmxtOrderbook);
// Keep buffer size reasonable
if (buffer.length > 100) buffer.shift();
const buffer = this.orderbookBuffers.get(marketSlug);
if (buffer) {
buffer.push(pmxtOrderbook);
// Keep buffer size reasonable
if (buffer.length > 100) buffer.shift();
}
}
});

Expand Down
Loading