diff --git a/src/core/constants.ts b/src/core/constants.ts index bd6d450..3cb139e 100644 --- a/src/core/constants.ts +++ b/src/core/constants.ts @@ -134,3 +134,7 @@ export const announcementMethods: AnnouncementMethods = { export const INITIALIZE_METHOD = 'initialize'; export const NOTIFICATIONS_INITIALIZED_METHOD = 'notifications/initialized'; +export const NOTIFICATIONS_RESOURCES_UPDATED_METHOD = + 'notifications/resources/updated'; +export const RESOURCES_SUBSCRIBE_METHOD = 'resources/subscribe'; +export const RESOURCES_UNSUBSCRIBE_METHOD = 'resources/unsubscribe'; diff --git a/src/transport/nostr-server-transport.ts b/src/transport/nostr-server-transport.ts index bfac1fc..3684b36 100644 --- a/src/transport/nostr-server-transport.ts +++ b/src/transport/nostr-server-transport.ts @@ -25,6 +25,9 @@ import { GIFT_WRAP_KIND, NOSTR_TAGS, NOTIFICATIONS_INITIALIZED_METHOD, + NOTIFICATIONS_RESOURCES_UPDATED_METHOD, + RESOURCES_SUBSCRIBE_METHOD, + RESOURCES_UNSUBSCRIBE_METHOD, decryptMessage, DEFAULT_LRU_SIZE, } from '../core/index.js'; @@ -182,6 +185,12 @@ export class NostrServerTransport /** Receives inbound oversized-transfer frames from clients (client→server requests). */ private readonly oversizedReceiver: OversizedTransferReceiver; + /** + * Tracks which clients have subscribed to which resource URIs. + * Maps resource URI → set of subscribed client public keys. + */ + private readonly resourceSubscriptions = new Map>(); + // Oversized-transfer sender settings (for server→client responses) private readonly oversizedEnabled: boolean; private readonly oversizedThreshold: number; @@ -213,6 +222,7 @@ export class NostrServerTransport // Clean up all correlation data for evicted session const removedCount = this.correlationStore.removeRoutesForClient(clientPubkey); + this.removeResourceSubscriptionsForClient(clientPubkey); this.logger.info( `Evicted session for ${clientPubkey} (removed ${removedCount} routes)`, ); @@ -379,6 +389,7 @@ export class NostrServerTransport this.correlationStore.clear(); this.seenEventIds.clear(); this.oversizedReceiver.clear(); + this.resourceSubscriptions.clear(); this.onclose?.(); } catch (error) { this.onerror?.(error instanceof Error ? error : new Error(String(error))); @@ -386,6 +397,43 @@ export class NostrServerTransport } } + /** + * Records a client's subscription to a resource URI. + */ + private addResourceSubscription(uri: string, clientPubkey: string): void { + let subscribers = this.resourceSubscriptions.get(uri); + if (!subscribers) { + subscribers = new Set(); + this.resourceSubscriptions.set(uri, subscribers); + } + subscribers.add(clientPubkey); + } + + /** + * Removes a client's subscription to a resource URI. + */ + private removeResourceSubscription(uri: string, clientPubkey: string): void { + const subscribers = this.resourceSubscriptions.get(uri); + if (subscribers) { + subscribers.delete(clientPubkey); + if (subscribers.size === 0) { + this.resourceSubscriptions.delete(uri); + } + } + } + + /** + * Removes all resource subscriptions for a given client (used on session eviction). + */ + private removeResourceSubscriptionsForClient(clientPubkey: string): void { + for (const [uri, subscribers] of this.resourceSubscriptions) { + subscribers.delete(clientPubkey); + if (subscribers.size === 0) { + this.resourceSubscriptions.delete(uri); + } + } + } + /** * Sends JSON-RPC messages over the Nostr transport. * @param message The JSON-RPC message to send. @@ -556,6 +604,15 @@ export class NostrServerTransport progressToken ? String(progressToken) : undefined, wrapKind, ); + + const uri = request.params?.uri; + if (typeof uri === 'string') { + if (request.method === RESOURCES_SUBSCRIBE_METHOD) { + this.addResourceSubscription(uri, clientPubkey); + } else if (request.method === RESOURCES_UNSUBSCRIBE_METHOD) { + this.removeResourceSubscription(uri, clientPubkey); + } + } } /** @@ -681,13 +738,13 @@ export class NostrServerTransport notification: JSONRPCMessage, ): Promise { try { - // Special handling for progress notifications - // TODO: Add handling for `notifications/resources/updated`, as they need to be associated with an id - if ( - isJSONRPCNotification(notification) && + if (!isJSONRPCNotification(notification)) { + // Broadcast path below handles non-notification edge cases. + } else if ( notification.method === 'notifications/progress' && notification.params?.progressToken ) { + // Targeted routing for progress notifications const token = String(notification.params.progressToken); // Use O(1) lookup for progress token routing @@ -710,6 +767,35 @@ export class NostrServerTransport this.logger.error('Progress token not found', { token }); this.onerror?.(error); return; + } else if ( + notification.method === NOTIFICATIONS_RESOURCES_UPDATED_METHOD + ) { + const uri = notification.params?.uri; + if (typeof uri === 'string') { + const subscribers = this.resourceSubscriptions.get(uri); + if (subscribers && subscribers.size > 0) { + for (const clientPubkey of subscribers) { + const session = this.sessionStore.getSession(clientPubkey); + if (session?.isInitialized) { + this.taskQueue.add(async () => { + try { + await this.sendNotification(clientPubkey, notification); + } catch (error) { + this.logger.error('Error sending resource update', { + error: + error instanceof Error + ? error.message + : String(error), + clientPubkey, + uri, + }); + } + }); + } + } + } + return; + } } // Use TaskQueue for outbound notification broadcasting to prevent event loop blocking