Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 12 additions & 2 deletions src/core/__tests__/indexer.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -527,7 +527,12 @@ describe('StarknetIndexer', () => {
isConnected: jest.fn().mockReturnValue(true),
on: jest.fn(),
waitForConnection: jest.fn().mockResolvedValue(undefined),
subscribeNewHeads: jest.fn().mockResolvedValue({ on: jest.fn() }),
subscribeNewHeads: jest.fn().mockResolvedValue({
on: jest.fn(),
isClosed: false,
unsubscribe: jest.fn().mockResolvedValue(true),
}),
disconnect: jest.fn(),
};

mockIndexer = new StarknetIndexer({
Expand Down Expand Up @@ -929,8 +934,13 @@ describe('StarknetIndexer', () => {
mockWsChannel = {
on: jest.fn(),
waitForConnection: jest.fn().mockResolvedValue(undefined),
subscribeNewHeads: jest.fn().mockResolvedValue({ on: jest.fn() }),
subscribeNewHeads: jest.fn().mockResolvedValue({
on: jest.fn(),
isClosed: false,
unsubscribe: jest.fn().mockResolvedValue(true),
}),
isConnected: jest.fn().mockReturnValue(true),
disconnect: jest.fn(),
};

mockIndexer = new StarknetIndexer({
Expand Down
33 changes: 33 additions & 0 deletions src/core/indexer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,11 @@ export class StarknetIndexer {
this.newHeadsSubscription = await this.wsChannel.subscribeNewHeads();

this.newHeadsSubscription.on(async (data) => {
// Skip processing if indexer has been stopped
if (!this.started) {
return;
}

await this.withErrorHandling(
'Processing new head',
async () => {
Expand Down Expand Up @@ -552,6 +557,34 @@ export class StarknetIndexer {
}
this.stopProgressUiLoop();

// Unsubscribe from new heads subscription to stop event processing
if (this.newHeadsSubscription) {
this.logger.info('[WebSocket] Unsubscribing from new heads...');
try {
if (!this.newHeadsSubscription.isClosed) {
await this.newHeadsSubscription.unsubscribe();
}
this.logger.info('[WebSocket] Successfully unsubscribed from new heads');
} catch (error: any) {
// Error code 66 means "Invalid subscription id" - subscription already closed on server
const isAlreadyClosed =
error?.message?.includes('code":66') || error?.message?.includes('Invalid subscription');
if (isAlreadyClosed) {
this.logger.debug('[WebSocket] Subscription already closed on server');
} else {
this.logger.error('[WebSocket] Error unsubscribing from new heads:', error);
}
}
}
this.newHeadsSubscription = undefined;

// Close WebSocket connection
if (this.wsChannel.isConnected()) {
this.logger.info('[WebSocket] Closing connection...');
this.wsChannel.disconnect();
this.logger.info('[WebSocket] Connection closed');
}

// Close persistent database connection
if (this.dbHandler.isConnected()) {
this.logger.info('[Database] Closing connection...');
Expand Down
Loading