//
// All subscriptions go through `eth_subscribe`, even the Sentrix-native
// channels (sentrix_finalized, sentrix_validatorSet, sentrix_tokenOps,
// sentrix_stakingOps, sentrix_jail). The chain dispatches them by
// channel name — there is no separate `sentrix_subscribe` method, which
// is a common source of confusion.
//
// Recommended usage: instantiate `SubscriptionManager` once per process
// and call `.subscribe()` repeatedly. The manager multiplexes every
// subscription over one socket, sends keepalive pings every 30 s, and
// transparently re-subscribes after reconnect. The single-shot
// `subscribe(network, channel, opts)` helper is convenient for one-off
// scripts but opens its own socket — avoid it in production code that
// listens to multiple channels.
816
917import WebSocket from "ws" ;
1018import { getSpec , type SentrixNetwork } from "../network.js" ;
@@ -22,6 +30,44 @@ export type Channel =
2230 | "sentrix_stakingOps"
2331 | "sentrix_jail" ;
2432
// Payload types, one per channel. Consumers that opt into
// `subscribeTyped<C>()` get a precise payload type; the original
// untyped `subscribe()` path stays for back-compat with existing apps.
/**
 * Payload type for the `newHeads` channel.
 *
 * Every field is typed as a `0x`-prefixed string; nothing here decodes
 * the values.
 */
export interface NewHeadsPayload {
  number: `0x${string}`;
  hash: `0x${string}`;
  parentHash: `0x${string}`;
  timestamp: `0x${string}`;
  miner: `0x${string}`;
}
/**
 * Payload type for the `logs` channel.
 *
 * All fields except `removed` are typed as `0x`-prefixed strings
 * (`topics` is an array of them); nothing here decodes the values.
 */
export interface LogsPayload {
  address: `0x${string}`;
  topics: `0x${string}`[];
  data: `0x${string}`;
  blockNumber: `0x${string}`;
  transactionHash: `0x${string}`;
  logIndex: `0x${string}`;
  removed: boolean;
}
/** Payload type for the `sentrix_finalized` channel. */
export type SentrixFinalizedPayload = { height: number; hash: `0x${string}` };

/** Payload type for the `sentrix_validatorSet` channel. */
export type SentrixValidatorSetPayload = { epoch: number; validators: `0x${string}`[] };

/**
 * Payload type for the native ops channels. Deliberately loose for now:
 * the chain shape is still stabilising and a precise type would lag the
 * source. Apps cast at the use site if they need a stricter shape.
 */
export type SentrixOpsPayload = Record<string, unknown>;
58+
/**
 * Maps each channel name to the payload type delivered on it.
 * `subscribeTyped` indexes this map (`ChannelPayloadMap[C]`) to type the
 * `onMessage` argument.
 */
export interface ChannelPayloadMap {
  newHeads: NewHeadsPayload;
  logs: LogsPayload;
  newPendingTransactions: `0x${string}`;
  syncing:
    | boolean
    | {
        startingBlock: `0x${string}`;
        currentBlock: `0x${string}`;
        highestBlock: `0x${string}`;
      };
  sentrix_finalized: SentrixFinalizedPayload;
  sentrix_validatorSet: SentrixValidatorSetPayload;
  sentrix_tokenOps: SentrixOpsPayload;
  sentrix_stakingOps: SentrixOpsPayload;
  sentrix_jail: SentrixOpsPayload;
}
70+
2571export interface SubscribeOptions {
2672 /** Override the WS URL. */
2773 wsUrl ?: string ;
@@ -38,10 +84,11 @@ export interface Subscription {
3884 close ( ) : Promise < void > ;
3985}
4086
41- /** Open a single subscription on a fresh WS connection. Use this when
42- * you want one channel and don't mind a dedicated socket. For
43- * multi-channel usage, instantiate `SubscriptionManager` once and
44- * call `subscribe` repeatedly to share a single connection. */
87+ /** Open a single subscription on a fresh WS connection. Convenience for
88+ * one-off scripts. For multi-channel usage, instantiate
89+ * `SubscriptionManager` once and call `subscribe` repeatedly to share
90+ * a single connection — opening N sockets for N channels burns server
91+ * file descriptors and breaks the per-IP connection cap. */
4592export function subscribe (
4693 network : SentrixNetwork ,
4794 channel : Channel ,
@@ -60,19 +107,38 @@ interface InternalSub {
60107 channel : Channel ;
61108 serverId : string ;
62109 onMessage : ( payload : unknown , channel : Channel ) => void ;
110+ onError ?: ( err : Error ) => void ;
111+ filter ?: Record < string , unknown > ;
112+ }
113+
/** Bookkeeping for one in-flight `eth_subscribe` request awaiting its response. */
interface PendingSubscribe {
  /** Channel being subscribed to; included in the rejection message on an error response. */
  channel: Channel;
  /** Settles the request with the subscription id returned by the server. */
  resolve: (serverId: string) => void;
  /** Fails the request (error response, socket error, or socket close). */
  reject: (err: Error) => void;
}
64119
65120/** Multiplexes many subscriptions over one WebSocket. Reconnects with
66- * exponential backoff on close (1s → 2s → 4s → 8s → 16s, capped 30s);
67- * re-subscribes registered channels after each reconnect. */
121+ * exponential backoff on close (1s → 2s → 4s → 8s → 16s → 30s capped);
122+ * re-subscribes registered channels after each reconnect. Sends a
123+ * WebSocket ping frame every 30 s to keep middleboxes (NAT, ALB, Caddy)
124+ * from killing the connection during quiet periods. */
68125export class SubscriptionManager {
69126 private readonly wsUrl : string ;
70127 private ws : WebSocket | null = null ;
71128 private nextId = 1 ;
72129 private subs = new Map < string , InternalSub > ( ) ; // serverId → sub
73- private pending = new Map < number , ( id : string ) => void > ( ) ; // jsonrpc id → resolver
130+ private pending = new Map < number , PendingSubscribe > ( ) ; // jsonrpc id → callback pair
74131 private backoffMs = 1000 ;
75132 private closed = false ;
133+ private pingTimer : ReturnType < typeof setInterval > | null = null ;
134+ /** Last time a frame (subscribe-response, event, or pong) arrived.
135+ * Used by the keepalive interval to detect a half-open connection. */
136+ private lastFrameAt = 0 ;
/** Interval between keepalive pings, and how long the socket may go
 * without any frame before it is considered dead and a reconnect is
 * forced. Both are fixed class constants. */
140+ private static readonly KEEPALIVE_INTERVAL_MS = 30_000 ;
141+ private static readonly STALE_TIMEOUT_MS = 90_000 ;
76142
77143 constructor ( network : SentrixNetwork , wsUrl ?: string ) {
78144 this . wsUrl = wsUrl ?? getSpec ( network ) . wsUrl ;
@@ -88,20 +154,26 @@ export class SubscriptionManager {
88154 if ( channel === "logs" && opts . filter ) params . push ( opts . filter ) ;
89155
90156 const serverId = await new Promise < string > ( ( resolve , reject ) => {
91- this . pending . set ( id , resolve ) ;
157+ this . pending . set ( id , { channel , resolve, reject } ) ;
92158 const payload = { jsonrpc : "2.0" , id, method : "eth_subscribe" , params } ;
93159 try {
94160 this . ws ! . send ( JSON . stringify ( payload ) ) ;
95161 } catch ( err ) {
96162 this . pending . delete ( id ) ;
97- reject ( err ) ;
163+ reject ( err as Error ) ;
98164 }
99165 setTimeout ( ( ) => {
100166 if ( this . pending . delete ( id ) ) reject ( new Error ( `subscribe ${ channel } timed out` ) ) ;
101167 } , 10_000 ) ;
102168 } ) ;
103169
104- this . subs . set ( serverId , { channel, serverId, onMessage : opts . onMessage } ) ;
170+ this . subs . set ( serverId , {
171+ channel,
172+ serverId,
173+ onMessage : opts . onMessage ,
174+ onError : opts . onError ,
175+ filter : opts . filter ,
176+ } ) ;
105177
106178 return {
107179 close : async ( ) => {
@@ -115,13 +187,51 @@ export class SubscriptionManager {
115187 } ;
116188 }
117189
/**
 * Typed variant of `subscribe`.
 *
 * The payload type handed to `opts.onMessage` is looked up from the
 * channel through `ChannelPayloadMap`, so `subscribeTyped("newHeads", ...)`
 * sees `NewHeadsPayload` rather than `unknown`. The narrowing is a
 * compile-time cast only; the payload is not validated at runtime.
 *
 * @param channel Channel to subscribe to.
 * @param opts Optional `filter`, the typed `onMessage` handler, and an
 *   optional `onError` handler; all are forwarded to `subscribe`.
 * @returns Whatever `subscribe` returns for the same channel.
 */
subscribeTyped<C extends Channel>(
  channel: C,
  opts: {
    filter?: Record<string, unknown>;
    onMessage: (payload: ChannelPayloadMap[C]) => void;
    onError?: (err: Error) => void;
  },
): Promise<Subscription> {
  // `subscribe` hands payloads over untyped; re-type them for the caller.
  const deliverTyped = (raw: unknown) => opts.onMessage(raw as ChannelPayloadMap[C]);

  return this.subscribe(channel, {
    filter: opts.filter,
    onMessage: deliverTyped,
    onError: opts.onError,
  });
}
207+
/**
 * Shut the manager down: mark it closed so the socket's close handler
 * does not schedule a reconnect, stop the keepalive timer, and close the
 * socket if there is one.
 */
close(): void {
  this.closed = true;

  const timer = this.pingTimer;
  if (timer !== null) {
    clearInterval(timer);
    this.pingTimer = null;
  }

  if (this.ws) this.ws.close();
  this.ws = null;
}
124218
/**
 * Diagnostic snapshot, e.g. for ops dashboards or debug pages.
 *
 * @returns `socketState` ("open" or "connecting" when the socket is in
 *   that ready state, otherwise "closed" — including when there is no
 *   socket), the number of registered subscriptions, and whole seconds
 *   since the last frame (`null` if no frame has been recorded yet).
 */
status(): {
  socketState: "open" | "connecting" | "closed";
  subs: number;
  secondsSinceLastFrame: number | null;
} {
  const secondsSinceLastFrame =
    this.lastFrameAt === 0 ? null : Math.floor((Date.now() - this.lastFrameAt) / 1000);

  // With no socket, `readyState` is undefined and falls through to "closed".
  const readyState = this.ws?.readyState;
  const socketState =
    readyState === WebSocket.OPEN
      ? "open"
      : readyState === WebSocket.CONNECTING
        ? "connecting"
        : "closed";

  return { socketState, subs: this.subs.size, secondsSinceLastFrame };
}
234+
125235 private ensureSocket ( onError ?: ( err : Error ) => void ) : Promise < void > {
126236 if ( this . ws && this . ws . readyState === WebSocket . OPEN ) return Promise . resolve ( ) ;
127237 return new Promise ( ( resolve , reject ) => {
@@ -130,60 +240,133 @@ export class SubscriptionManager {
130240
131241 ws . on ( "open" , ( ) => {
132242 this . backoffMs = 1000 ;
243+ this . lastFrameAt = Date . now ( ) ;
244+ this . startKeepalive ( ) ;
133245 resolve ( ) ;
134246 } ) ;
135247
136248 ws . on ( "message" , ( raw : WebSocket . RawData ) => {
137- const msg = JSON . parse ( raw . toString ( ) ) ;
138- // Subscribe-response (one-time): { id, result: "0x..." }
139- if ( typeof msg . id === "number" && typeof msg . result === "string" ) {
249+ this . lastFrameAt = Date . now ( ) ;
250+ let msg : { id ?: number ; result ?: string ; method ?: string ; params ?: { subscription ?: string ; result ?: unknown } ; error ?: { message : string } } ;
251+ try {
252+ msg = JSON . parse ( raw . toString ( ) ) ;
253+ } catch {
254+ // Malformed frame — drop. Can happen on edge buffer fragmentation.
255+ return ;
256+ }
257+ // Subscribe-response (one-time): { id, result: "0x..." } OR { id, error }
258+ if ( typeof msg . id === "number" && ( typeof msg . result === "string" || msg . error ) ) {
140259 const cb = this . pending . get ( msg . id ) ;
141- if ( cb ) {
142- this . pending . delete ( msg . id ) ;
143- cb ( msg . result ) ;
260+ if ( ! cb ) return ;
261+ this . pending . delete ( msg . id ) ;
262+ if ( msg . error ) {
263+ cb . reject ( new Error ( `eth_subscribe ${ cb . channel } : ${ msg . error . message } ` ) ) ;
264+ } else if ( typeof msg . result === "string" ) {
265+ cb . resolve ( msg . result ) ;
144266 }
145267 return ;
146268 }
147269 // Stream event: { method: "eth_subscription", params: { subscription, result } }
148270 if ( msg . method === "eth_subscription" ) {
149- const sid = msg . params ?. subscription as string | undefined ;
271+ const sid = msg . params ?. subscription ;
150272 if ( ! sid ) return ;
151273 const sub = this . subs . get ( sid ) ;
152- sub ?. onMessage ( msg . params . result , sub . channel ) ;
274+ sub ?. onMessage ( msg . params ! . result , sub . channel ) ;
153275 }
154276 } ) ;
155277
278+ // Pong frame from server keepalive ping. Resets the
279+ // last-frame timestamp so a long-quiet subscription (eg
280+ // sentrix_jail with no events for hours) doesn't trip the
281+ // stale-connection guard.
282+ ws . on ( "pong" , ( ) => {
283+ this . lastFrameAt = Date . now ( ) ;
284+ } ) ;
285+
156286 ws . on ( "error" , ( err ) => {
157287 if ( onError ) onError ( err ) ;
158- if ( this . pending . size > 0 ) {
159- for ( const r of this . pending . keys ( ) ) this . pending . delete ( r ) ;
160- reject ( err ) ;
288+ // Reject every pending subscribe — pre-fix only the first
289+ // pending caller saw the rejection, the rest hung until their
290+ // 10 s timeout fired one-by-one. Now they all get the same
291+ // surfaced error immediately.
292+ for ( const [ id , p ] of this . pending ) {
293+ this . pending . delete ( id ) ;
294+ p . reject ( err ) ;
161295 }
296+ // Surface to per-sub error handlers too.
297+ for ( const sub of this . subs . values ( ) ) sub . onError ?.( err ) ;
298+ reject ( err ) ;
162299 } ) ;
163300
164301 ws . on ( "close" , ( ) => {
302+ if ( this . pingTimer ) {
303+ clearInterval ( this . pingTimer ) ;
304+ this . pingTimer = null ;
305+ }
306+ // Reject any pending subscribes — same race fix as the error
307+ // path. Without this a subscribe in flight when the close
308+ // lands resolves never (the stream-event path can't fire on
309+ // a closed socket).
310+ for ( const [ id , p ] of this . pending ) {
311+ this . pending . delete ( id ) ;
312+ p . reject ( new Error ( "websocket closed before subscribe response" ) ) ;
313+ }
165314 if ( this . closed ) return ;
166315 // Reconnect with exponential backoff, then re-subscribe.
167316 const wait = Math . min ( this . backoffMs , 30_000 ) ;
168317 this . backoffMs = Math . min ( this . backoffMs * 2 , 30_000 ) ;
169318 setTimeout ( ( ) => {
170319 this . ensureSocket ( onError )
171320 . then ( ( ) => this . resubscribeAll ( onError ) )
172- . catch ( ( ) => { } ) ;
321+ . catch ( ( ) => {
322+ /* will retry via the next close event */
323+ } ) ;
173324 } , wait ) ;
174325 } ) ;
175326 } ) ;
176327 }
177328
/**
 * (Re)start the keepalive timer.
 *
 * Every `KEEPALIVE_INTERVAL_MS` the timer checks the open socket: if no
 * frame has arrived for more than `STALE_TIMEOUT_MS` the socket is
 * terminated so its close handler can run the reconnect path; otherwise a
 * WebSocket ping frame is sent. The ping is meant to keep idle-timeout
 * middleboxes (reverse proxies, NAT, load balancers) from dropping a
 * quiet connection.
 */
private startKeepalive(): void {
  // Only one timer at a time — drop any timer left from a previous socket.
  if (this.pingTimer) clearInterval(this.pingTimer);

  const tick = (): void => {
    const socket = this.ws;
    if (!socket || socket.readyState !== WebSocket.OPEN) return;

    const silentForMs = Date.now() - this.lastFrameAt;
    const looksHalfOpen = silentForMs > SubscriptionManager.STALE_TIMEOUT_MS;

    try {
      if (looksHalfOpen) {
        // No frames (not even pongs) for too long: force the socket down
        // so the close handler fires and reconnects.
        socket.terminate();
      } else {
        socket.ping();
      }
    } catch {
      /* socket may have closed mid-call; the close handler will recover */
    }
  };

  this.pingTimer = setInterval(tick, SubscriptionManager.KEEPALIVE_INTERVAL_MS);
}
356+
/**
 * Re-issue every registered subscription on the current socket.
 *
 * Does nothing unless the socket is open. The registry is snapshotted and
 * cleared first, because each successful `subscribe` call registers the
 * subscription again under the id the server returns for it. Each
 * subscription keeps its own `filter` and `onError`; the `onError`
 * argument is only a fallback for subscriptions that have none.
 *
 * @param onError Fallback error handler for subscriptions without their own.
 */
private async resubscribeAll(onError?: (err: Error) => void): Promise<void> {
  if (!this.ws || this.ws.readyState !== WebSocket.OPEN) return;

  const previous = Array.from(this.subs.values());
  this.subs.clear();

  // One at a time, in registration order.
  for (const sub of previous) {
    const reportError = sub.onError ?? onError;
    try {
      await this.subscribe(sub.channel, {
        filter: sub.filter,
        onMessage: sub.onMessage,
        onError: reportError,
      });
    } catch (err) {
      // NOTE(review): a subscription whose re-subscribe fails is not put
      // back into the registry, so later reconnects will not retry it —
      // confirm this is intended.
      reportError?.(err as Error);
    }
  }
}
0 commit comments