From f94ae5b40a4fbb5080dca2851493791c28526b97 Mon Sep 17 00:00:00 2001 From: Rick Date: Tue, 13 Jan 2026 14:12:40 +0000 Subject: [PATCH 1/2] fix: properly stop event processing when indexer.stop() is called Previously, calling indexer.stop() would not properly stop the onEvent handlers from processing new blocks. This was because: 1. The WebSocket subscription was never unsubscribed 2. The WebSocket connection was never disconnected 3. In-flight event handlers could still run after stop() was called This fix: - Adds early return in subscription handler when indexer is stopped - Unsubscribes from newHeads subscription before closing connections - Gracefully handles "Invalid subscription id" errors (code 66) - Disconnects WebSocket connection to prevent reconnection attempts Closes #54 Co-Authored-By: Claude Opus 4.5 --- src/core/indexer.ts | 33 +++++++++++++++++++++++++++++++++ 1 file changed, 33 insertions(+) diff --git a/src/core/indexer.ts b/src/core/indexer.ts index 441af23..0742d45 100644 --- a/src/core/indexer.ts +++ b/src/core/indexer.ts @@ -151,6 +151,11 @@ export class StarknetIndexer { this.newHeadsSubscription = await this.wsChannel.subscribeNewHeads(); this.newHeadsSubscription.on(async (data) => { + // Skip processing if indexer has been stopped + if (!this.started) { + return; + } + await this.withErrorHandling( 'Processing new head', async () => { @@ -552,6 +557,34 @@ export class StarknetIndexer { } this.stopProgressUiLoop(); + // Unsubscribe from new heads subscription to stop event processing + if (this.newHeadsSubscription) { + this.logger.info('[WebSocket] Unsubscribing from new heads...'); + try { + if (!this.newHeadsSubscription.isClosed) { + await this.newHeadsSubscription.unsubscribe(); + } + this.logger.info('[WebSocket] Successfully unsubscribed from new heads'); + } catch (error: any) { + // Error code 66 means "Invalid subscription id" - subscription already closed on server + const isAlreadyClosed = + error?.message?.includes('code":66') || error?.message?.includes('Invalid subscription'); + if (isAlreadyClosed) { + this.logger.debug('[WebSocket] Subscription already closed on server'); + } else { + this.logger.error('[WebSocket] Error unsubscribing from new heads:', error); + } + } + } + this.newHeadsSubscription = undefined; + + // Close WebSocket connection + if (this.wsChannel.isConnected()) { + this.logger.info('[WebSocket] Closing connection...'); + this.wsChannel.disconnect(); + this.logger.info('[WebSocket] Connection closed'); + } + // Close persistent database connection if (this.dbHandler.isConnected()) { this.logger.info('[Database] Closing connection...'); From c0ac621e3a1cac8dda16fbcaa6b05c94b03b6cd3 Mon Sep 17 00:00:00 2001 From: Rick Date: Mon, 19 Jan 2026 11:31:11 +0000 Subject: [PATCH 2/2] test: update tests to match fix --- package-lock.json | 10 ++++++++++ src/core/__tests__/indexer.test.ts | 14 ++++++++++++-- 2 files changed, 22 insertions(+), 2 deletions(-) diff --git a/package-lock.json b/package-lock.json index ee81ebf..2eea87d 100644 --- a/package-lock.json +++ b/package-lock.json @@ -83,6 +83,7 @@ "integrity": "sha512-UlLAnTPrFdNGoFtbSXwcGFQBtQZJCNjaN6hQNP3UPvuNXT1i82N26KL3dZeIpNalWywr9IuQuncaAfUaS1g6sQ==", "dev": true, "license": "MIT", + "peer": true, "dependencies": { "@ampproject/remapping": "^2.2.0", "@babel/code-frame": "^7.27.1", @@ -1630,6 +1631,7 @@ "integrity": "sha512-3MyiDfrfLeK06bi/g9DqJxP5pV74LNv4rFTyvGDmT3x2p1yp1lOd+qYZfiRPIOf/oON+WRZR5wxxuF85qOar+w==", "dev": true, "license": "MIT", + "peer": true, "dependencies": { "@typescript-eslint/scope-manager": "8.35.1", "@typescript-eslint/types": "8.35.1", @@ -1862,6 +1864,7 @@ "integrity": "sha512-NZyJarBfL7nWwIq+FDL6Zp/yHEhePMNnnJ0y3qfieCrmNvYct8uvtiV41UvlSe6apAfk0fY1FbWx+NwfmpvtTg==", "dev": true, "license": "MIT", + "peer": true, "bin": { "acorn": "bin/acorn" }, @@ -2237,6 +2240,7 @@ } ], "license": "MIT", + "peer": true, "dependencies": { "caniuse-lite": "^1.0.30001726", "electron-to-chromium": "^1.5.173", @@ -2965,6 +2969,7 @@ "deprecated": "This version is no longer supported. Please see https://eslint.org/version-support for other options.", "dev": true, "license": "MIT", + "peer": true, "dependencies": { "@eslint-community/eslint-utils": "^4.2.0", "@eslint-community/regexpp": "^4.6.1", @@ -3021,6 +3026,7 @@ "integrity": "sha512-zc1UmCpNltmVY34vuLRV61r1K27sWuX39E+uyUnY8xS2Bex88VV9cugG+UZbRSRGtGyFboj+D8JODyme1plMpw==", "dev": true, "license": "MIT", + "peer": true, "bin": { "eslint-config-prettier": "bin/cli.js" }, @@ -4200,6 +4206,7 @@ "integrity": "sha512-NIy3oAFp9shda19hy4HK0HRTWKtPJmGdnvywu01nOqNC2vZg+Z+fvJDxpMQA88eb2I9EcafcdjYgsDthnYTvGw==", "dev": true, "license": "MIT", + "peer": true, "dependencies": { "@jest/core": "^29.7.0", "@jest/types": "^29.6.3", @@ -6119,6 +6126,7 @@ "resolved": "https://registry.npmjs.org/pg/-/pg-8.16.3.tgz", "integrity": "sha512-enxc1h0jA/aq5oSDMvqyW3q89ra6XIIDZgCX9vkMrnz5DFTw/Ny3Li2lFQ+pt3L6MCgm/5o2o8HW9hiJji+xvw==", "license": "MIT", + "peer": true, "dependencies": { "pg-connection-string": "^2.9.1", "pg-pool": "^3.10.1", @@ -6395,6 +6403,7 @@ "integrity": "sha512-I7AIg5boAr5R0FFtJ6rCfD+LFsWHp81dolrFD8S79U9tb8Az2nGrJncnMSnys+bpQJfRUzqs9hnA81OAA3hCuQ==", "dev": true, "license": "MIT", + "peer": true, "bin": { "prettier": "bin/prettier.cjs" }, @@ -7459,6 +7468,7 @@ "integrity": "sha512-p1diW6TqL9L07nNxvRMM7hMMw4c5XOo/1ibL4aAIGmSAt9slTE1Xgw5KWuof2uTOvCg9BY7ZRi+GaF+7sfgPeQ==", "dev": true, "license": "Apache-2.0", + "peer": true, "bin": { "tsc": "bin/tsc", "tsserver": "bin/tsserver" diff --git a/src/core/__tests__/indexer.test.ts b/src/core/__tests__/indexer.test.ts index 90e1693..edca4e4 100644 --- a/src/core/__tests__/indexer.test.ts +++ b/src/core/__tests__/indexer.test.ts @@ -527,7 +527,12 @@ describe('StarknetIndexer', () => { isConnected: jest.fn().mockReturnValue(true), on: jest.fn(), waitForConnection: jest.fn().mockResolvedValue(undefined), - subscribeNewHeads: jest.fn().mockResolvedValue({ on: jest.fn() }), + subscribeNewHeads: jest.fn().mockResolvedValue({ + on: jest.fn(), + isClosed: false, + unsubscribe: jest.fn().mockResolvedValue(true), + }), + disconnect: jest.fn(), }; mockIndexer = new StarknetIndexer({ @@ -929,8 +934,13 @@ describe('StarknetIndexer', () => { mockWsChannel = { on: jest.fn(), waitForConnection: jest.fn().mockResolvedValue(undefined), - subscribeNewHeads: jest.fn().mockResolvedValue({ on: jest.fn() }), + subscribeNewHeads: jest.fn().mockResolvedValue({ + on: jest.fn(), + isClosed: false, + unsubscribe: jest.fn().mockResolvedValue(true), + }), isConnected: jest.fn().mockReturnValue(true), + disconnect: jest.fn(), }; mockIndexer = new StarknetIndexer({