From cb545ca42775b01845b901cb3dccdf30491a0441 Mon Sep 17 00:00:00 2001 From: Priyanshubhartistm Date: Mon, 20 Apr 2026 22:56:45 +0530 Subject: [PATCH] fix(transport): correlate resources/updated notifications --- ...tr-server-transport.dedup-response.test.ts | 124 +++++++++++- src/transport/nostr-server-transport.ts | 178 +++++++++++++++++- .../nostr-server/correlation-store.ts | 13 ++ 3 files changed, 312 insertions(+), 3 deletions(-) diff --git a/src/transport/nostr-server-transport.dedup-response.test.ts b/src/transport/nostr-server-transport.dedup-response.test.ts index 4c8da3e..ccc5ac2 100644 --- a/src/transport/nostr-server-transport.dedup-response.test.ts +++ b/src/transport/nostr-server-transport.dedup-response.test.ts @@ -1,7 +1,11 @@ import { describe, it, expect } from 'bun:test'; import type { RelayHandler } from '../core/interfaces.js'; import type { NostrEvent } from 'nostr-tools'; -import type { JSONRPCResponse } from '@modelcontextprotocol/sdk/types.js'; +import type { + JSONRPCMessage, + JSONRPCRequest, + JSONRPCResponse, +} from '@modelcontextprotocol/sdk/types.js'; import { NostrServerTransport } from './nostr-server-transport.js'; import { PrivateKeySigner } from '../signer/private-key-signer.js'; import { EncryptionMode } from '../core/interfaces.js'; @@ -10,6 +14,7 @@ import { GIFT_WRAP_KIND, NOSTR_TAGS, } from '../core/constants.js'; +import { waitFor } from '../core/utils/test.utils.js'; function makeCountingRelayHandler(counter: { publishCalls: number; @@ -260,4 +265,121 @@ describe.serial('NostrServerTransport duplicate response prevention', () => { expect(session.hasSentCommonTags).toBe(true); }); + + it('delivers notifications/resources/updated only to correlated subscribed clients', async () => { + const publishedEvents: NostrEvent[] = []; + + const transport = new NostrServerTransport({ + signer: new PrivateKeySigner('1'.repeat(64)), + relayHandler: makeCapturingRelayHandler(publishedEvents), + encryptionMode: EncryptionMode.DISABLED, + }); + + const state = transport.getInternalStateForTesting(); + const clientA = 'a'.repeat(64); + const clientB = 'b'.repeat(64); + + const [sessionA] = state.sessionStore.getOrCreateSession(clientA, false); + const [sessionB] = state.sessionStore.getOrCreateSession(clientB, false); + sessionA.isInitialized = true; + sessionB.isInitialized = true; + + const subscribeEventA = 'e'.repeat(64); + const subscribeEventB = 'f'.repeat(64); + + const subscribeA: JSONRPCRequest = { + jsonrpc: '2.0', + id: 'sub-a', + method: 'resources/subscribe', + params: { uri: 'resource://alpha' }, + }; + const subscribeB: JSONRPCRequest = { + jsonrpc: '2.0', + id: 'sub-b', + method: 'resources/subscribe', + params: { uri: 'resource://beta' }, + }; + + transport['handleIncomingRequest'](subscribeEventA, subscribeA, clientA); + transport['handleIncomingRequest'](subscribeEventB, subscribeB, clientB); + + await transport.send({ + jsonrpc: '2.0', + id: subscribeEventA, + result: {}, + } as JSONRPCResponse); + await transport.send({ + jsonrpc: '2.0', + id: subscribeEventB, + result: {}, + } as JSONRPCResponse); + + publishedEvents.length = 0; + + const update: JSONRPCMessage = { + jsonrpc: '2.0', + method: 'notifications/resources/updated', + params: { uri: 'resource://alpha' }, + }; + + await transport.send(update); + + expect(publishedEvents).toHaveLength(1); + expect( + publishedEvents[0].tags.some( + (tag) => tag[0] === NOSTR_TAGS.PUBKEY && tag[1] === clientA, + ), + ).toBe(true); + expect( + publishedEvents[0].tags.some( + (tag) => tag[0] === NOSTR_TAGS.PUBKEY && tag[1] === clientB, + ), + ).toBe(false); + expect( + publishedEvents[0].tags.some( + (tag) => tag[0] === NOSTR_TAGS.EVENT_ID && tag[1] === subscribeEventA, + ), + ).toBe(true); + }); + + it('continues broadcasting non-correlated notifications to initialized sessions', async () => { + const publishedEvents: NostrEvent[] = []; + + const transport = new NostrServerTransport({ + signer: new PrivateKeySigner('1'.repeat(64)), + relayHandler: makeCapturingRelayHandler(publishedEvents), + encryptionMode: EncryptionMode.DISABLED, + }); + + const state = transport.getInternalStateForTesting(); + const clientA = 'a'.repeat(64); + const clientB = 'b'.repeat(64); + + const [sessionA] = state.sessionStore.getOrCreateSession(clientA, false); + const [sessionB] = state.sessionStore.getOrCreateSession(clientB, false); + sessionA.isInitialized = true; + sessionB.isInitialized = true; + + await transport.send({ + jsonrpc: '2.0', + method: 'notifications/custom', + params: { ok: true }, + }); + + await waitFor({ + produce: () => (publishedEvents.length >= 2 ? true : undefined), + timeoutMs: 2_000, + }); + + const recipients = new Set( + publishedEvents + .map((event) => + event.tags.find((tag) => tag[0] === NOSTR_TAGS.PUBKEY)?.[1], + ) + .filter((pubkey): pubkey is string => typeof pubkey === 'string'), + ); + + expect(recipients.has(clientA)).toBe(true); + expect(recipients.has(clientB)).toBe(true); + }); }); diff --git a/src/transport/nostr-server-transport.ts b/src/transport/nostr-server-transport.ts index eaaa72c..5dd0939 100644 --- a/src/transport/nostr-server-transport.ts +++ b/src/transport/nostr-server-transport.ts @@ -3,7 +3,10 @@ import { ListPromptsResultSchema, ListResourcesResultSchema, ListResourceTemplatesResultSchema, + ResourceUpdatedNotificationSchema, ListToolsResultSchema, + SubscribeRequestSchema, + UnsubscribeRequestSchema, isJSONRPCRequest, isJSONRPCNotification, type JSONRPCMessage, @@ -32,7 +35,10 @@ import { EncryptionMode, GiftWrapMode } from '../core/interfaces.js'; import { NostrEvent } from 'nostr-tools'; import { LogLevel } from '../core/utils/logger.js'; import { injectClientPubkey, withTimeout } from '../core/utils/utils.js'; -import { CorrelationStore } from './nostr-server/correlation-store.js'; +import { + CorrelationStore, + type EventRoute, +} from './nostr-server/correlation-store.js'; import { ClientSession, SessionStore } from './nostr-server/session-store.js'; import { LruCache } from '../core/utils/lru-cache.js'; import { ApplesauceRelayPool } from '../relay/applesauce-relay-pool.js'; @@ -158,6 +164,7 @@ export class NostrServerTransport private readonly sessionStore: SessionStore; private readonly correlationStore: CorrelationStore; + private readonly resourceSubscriptionsByUri: Map>; private readonly authorizationPolicy: AuthorizationPolicy; private readonly announcementManager: AnnouncementManager; private readonly injectClientPubkey: boolean; @@ -227,6 +234,8 @@ export class NostrServerTransport return; // Don't call onClientSessionEvicted for vetoed eviction } + this.removeResourceSubscriptionsForClient(clientPubkey); + if (this.onClientSessionEvicted) { Promise.resolve( this.onClientSessionEvicted({ clientPubkey, session }), @@ -251,6 +260,8 @@ export class NostrServerTransport }, }); + this.resourceSubscriptionsByUri = new Map>(); + // Initialize announcement manager this.announcementManager = new AnnouncementManager({ serverInfo: options.serverInfo, @@ -371,6 +382,7 @@ export class NostrServerTransport await this.disconnect(); this.sessionStore.clear(); this.correlationStore.clear(); + this.resourceSubscriptionsByUri.clear(); this.seenEventIds.clear(); this.oversizedReceiver.clear(); this.onclose?.(); @@ -543,15 +555,103 @@ export class NostrServerTransport // Register the event route in the correlation store const progressToken = request.params?._meta?.progressToken; + const parsedSubscribeRequest = SubscribeRequestSchema.safeParse(request); + const parsedUnsubscribeRequest = UnsubscribeRequestSchema.safeParse(request); + const resourceUri = + parsedSubscribeRequest.success || parsedUnsubscribeRequest.success + ? request.params?.uri + : undefined; + this.correlationStore.registerEventRoute( eventId, clientPubkey, originalRequestId, progressToken ? String(progressToken) : undefined, wrapKind, + request.method, + typeof resourceUri === 'string' ? resourceUri : undefined, + ); + } + + private registerResourceSubscription( + clientPubkey: string, + resourceUri: string, + correlatedEventId: string, + ): void { + let subscribers = this.resourceSubscriptionsByUri.get(resourceUri); + if (!subscribers) { + subscribers = new Map(); + this.resourceSubscriptionsByUri.set(resourceUri, subscribers); + } + + subscribers.set(clientPubkey, correlatedEventId); + } + + private unregisterResourceSubscription( + clientPubkey: string, + resourceUri: string, + ): void { + const subscribers = this.resourceSubscriptionsByUri.get(resourceUri); + if (!subscribers) { + return; + } + + subscribers.delete(clientPubkey); + if (subscribers.size === 0) { + this.resourceSubscriptionsByUri.delete(resourceUri); + } + } + + private removeResourceSubscriptionsForClient(clientPubkey: string): void { + for (const [resourceUri, subscribers] of this.resourceSubscriptionsByUri) { + subscribers.delete(clientPubkey); + if (subscribers.size === 0) { + this.resourceSubscriptionsByUri.delete(resourceUri); + } + } + } + + private getResourceSubscribers(resourceUri: string): Array<{ + clientPubkey: string; + correlatedEventId: string; + }> { + const subscribers = this.resourceSubscriptionsByUri.get(resourceUri); + if (!subscribers) { + return []; + } + + return Array.from( + subscribers, + ([clientPubkey, correlatedEventId]) => ({ + clientPubkey, + correlatedEventId, + }), ); } + private applyResourceSubscriptionResult( + route: EventRoute, + nostrEventId: string, + response: JSONRPCResponse | JSONRPCErrorResponse, + ): void { + if (!isJSONRPCResultResponse(response) || !route.resourceUri) { + return; + } + + if (route.requestMethod === 'resources/subscribe') { + this.registerResourceSubscription( + route.clientPubkey, + route.resourceUri, + nostrEventId, + ); + return; + } + + if (route.requestMethod === 'resources/unsubscribe') { + this.unregisterResourceSubscription(route.clientPubkey, route.resourceUri); + } + } + /** * Handles incoming notifications. * @param clientPubkey The client's public key. @@ -607,6 +707,8 @@ export class NostrServerTransport return; } + this.applyResourceSubscriptionResult(route, nostrEventId, response); + // Restore the original request ID in the response response.id = route.originalRequestId; @@ -676,7 +778,6 @@ export class NostrServerTransport ): Promise { try { // Special handling for progress notifications - // TODO: Add handling for `notifications/resources/updated`, as they need to be associated with an id if ( isJSONRPCNotification(notification) && notification.method === 'notifications/progress' && @@ -706,6 +807,79 @@ export class NostrServerTransport return; } + // `notifications/resources/updated` must be delivered only to correlated + // subscribers, never via the generic broadcast fan-out. + if ( + isJSONRPCNotification(notification) && + notification.method === 'notifications/resources/updated' + ) { + const parsedNotification = + ResourceUpdatedNotificationSchema.safeParse(notification); + + if (!parsedNotification.success) { + this.logger.warn('Invalid resources/updated notification payload', { + notification, + }); + return; + } + + const resourceUri = parsedNotification.data.params.uri; + const progressToken = parsedNotification.data.params._meta?.progressToken; + + if (progressToken !== undefined) { + const token = String(progressToken); + const nostrEventId = + this.correlationStore.getEventIdByProgressToken(token); + + if (nostrEventId) { + const route = this.correlationStore.getEventRoute(nostrEventId); + if (route) { + await this.sendNotification( + route.clientPubkey, + notification, + nostrEventId, + ); + return; + } + } + } + + const subscribers = this.getResourceSubscribers(resourceUri).filter( + ({ clientPubkey }) => { + const session = this.sessionStore.getSession(clientPubkey); + return !!session?.isInitialized; + }, + ); + + if (subscribers.length === 0) { + this.logger.debug( + 'Dropping resources/updated notification with no correlated subscribers', + { resourceUri }, + ); + return; + } + + await Promise.all( + subscribers.map(async ({ clientPubkey, correlatedEventId }) => { + try { + await this.sendNotification( + clientPubkey, + notification, + correlatedEventId, + ); + } catch (error) { + this.logger.error('Error sending resources/updated notification', { + error: error instanceof Error ? error.message : String(error), + clientPubkey, + resourceUri, + }); + } + }), + ); + + return; + } + // Use TaskQueue for outbound notification broadcasting to prevent event loop blocking for (const [ clientPubkey, diff --git a/src/transport/nostr-server/correlation-store.ts b/src/transport/nostr-server/correlation-store.ts index d4e3bac..0f514c7 100644 --- a/src/transport/nostr-server/correlation-store.ts +++ b/src/transport/nostr-server/correlation-store.ts @@ -18,6 +18,12 @@ export interface EventRoute { /** Optional progress token for this request */ progressToken?: string; + /** Optional original JSON-RPC method for request-specific post-processing. */ + requestMethod?: string; + + /** Optional resource URI for resources/subscribe and resources/unsubscribe requests. */ + resourceUri?: string; + /** * Optional gift wrap kind used for the correlated request. * @@ -85,6 +91,9 @@ export class CorrelationStore { * @param clientPubkey The client's public key * @param originalRequestId The original JSON-RPC request ID * @param progressToken Optional progress token for this request + * @param wrapKind Optional inbound gift-wrap kind for reply mirroring + * @param requestMethod Optional original JSON-RPC method + * @param resourceUri Optional resource URI for subscribe/unsubscribe tracking */ registerEventRoute( eventId: string, @@ -92,12 +101,16 @@ export class CorrelationStore { originalRequestId: string | number, progressToken?: string, wrapKind?: number, + requestMethod?: string, + resourceUri?: string, ): void { const route: EventRoute = { clientPubkey, originalRequestId, progressToken, wrapKind, + requestMethod, + resourceUri, }; this.eventRoutes.set(eventId, route);