Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/core/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -134,3 +134,7 @@ export const announcementMethods: AnnouncementMethods = {

export const INITIALIZE_METHOD = 'initialize';
export const NOTIFICATIONS_INITIALIZED_METHOD = 'notifications/initialized';
export const NOTIFICATIONS_RESOURCES_UPDATED_METHOD =
'notifications/resources/updated';
export const RESOURCES_SUBSCRIBE_METHOD = 'resources/subscribe';
export const RESOURCES_UNSUBSCRIBE_METHOD = 'resources/unsubscribe';
94 changes: 90 additions & 4 deletions src/transport/nostr-server-transport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ import {
GIFT_WRAP_KIND,
NOSTR_TAGS,
NOTIFICATIONS_INITIALIZED_METHOD,
NOTIFICATIONS_RESOURCES_UPDATED_METHOD,
RESOURCES_SUBSCRIBE_METHOD,
RESOURCES_UNSUBSCRIBE_METHOD,
decryptMessage,
DEFAULT_LRU_SIZE,
} from '../core/index.js';
Expand Down Expand Up @@ -182,6 +185,12 @@ export class NostrServerTransport
/** Receives inbound oversized-transfer frames from clients (client→server requests). */
private readonly oversizedReceiver: OversizedTransferReceiver;

/**
* Tracks which clients have subscribed to which resource URIs.
* Maps resource URI → set of subscribed client public keys.
*/
private readonly resourceSubscriptions = new Map<string, Set<string>>();

// Oversized-transfer sender settings (for server→client responses)
private readonly oversizedEnabled: boolean;
private readonly oversizedThreshold: number;
Expand Down Expand Up @@ -213,6 +222,7 @@ export class NostrServerTransport
// Clean up all correlation data for evicted session
const removedCount =
this.correlationStore.removeRoutesForClient(clientPubkey);
this.removeResourceSubscriptionsForClient(clientPubkey);
this.logger.info(
`Evicted session for ${clientPubkey} (removed ${removedCount} routes)`,
);
Expand Down Expand Up @@ -379,13 +389,51 @@ export class NostrServerTransport
this.correlationStore.clear();
this.seenEventIds.clear();
this.oversizedReceiver.clear();
this.resourceSubscriptions.clear();
this.onclose?.();
} catch (error) {
this.onerror?.(error instanceof Error ? error : new Error(String(error)));
this.logAndRethrowError('Error closing NostrServerTransport', error);
}
}

/**
* Records a client's subscription to a resource URI.
*/
private addResourceSubscription(uri: string, clientPubkey: string): void {
let subscribers = this.resourceSubscriptions.get(uri);
if (!subscribers) {
subscribers = new Set();
this.resourceSubscriptions.set(uri, subscribers);
}
subscribers.add(clientPubkey);
}

/**
* Removes a client's subscription to a resource URI.
*/
private removeResourceSubscription(uri: string, clientPubkey: string): void {
const subscribers = this.resourceSubscriptions.get(uri);
if (subscribers) {
subscribers.delete(clientPubkey);
if (subscribers.size === 0) {
this.resourceSubscriptions.delete(uri);
}
}
}

/**
* Removes all resource subscriptions for a given client (used on session eviction).
*/
private removeResourceSubscriptionsForClient(clientPubkey: string): void {
for (const [uri, subscribers] of this.resourceSubscriptions) {
subscribers.delete(clientPubkey);
if (subscribers.size === 0) {
this.resourceSubscriptions.delete(uri);
}
}
}

/**
* Sends JSON-RPC messages over the Nostr transport.
* @param message The JSON-RPC message to send.
Expand Down Expand Up @@ -556,6 +604,15 @@ export class NostrServerTransport
progressToken ? String(progressToken) : undefined,
wrapKind,
);

const uri = request.params?.uri;
if (typeof uri === 'string') {
if (request.method === RESOURCES_SUBSCRIBE_METHOD) {
this.addResourceSubscription(uri, clientPubkey);
} else if (request.method === RESOURCES_UNSUBSCRIBE_METHOD) {
this.removeResourceSubscription(uri, clientPubkey);
}
}
}

/**
Expand Down Expand Up @@ -681,13 +738,13 @@ export class NostrServerTransport
notification: JSONRPCMessage,
): Promise<void> {
try {
// Special handling for progress notifications
// TODO: Add handling for `notifications/resources/updated`, as they need to be associated with an id
if (
isJSONRPCNotification(notification) &&
if (!isJSONRPCNotification(notification)) {
// Broadcast path below handles non-notification edge cases.
} else if (
notification.method === 'notifications/progress' &&
notification.params?.progressToken
) {
// Targeted routing for progress notifications
const token = String(notification.params.progressToken);

// Use O(1) lookup for progress token routing
Expand All @@ -710,6 +767,35 @@ export class NostrServerTransport
this.logger.error('Progress token not found', { token });
this.onerror?.(error);
return;
} else if (
notification.method === NOTIFICATIONS_RESOURCES_UPDATED_METHOD
) {
const uri = notification.params?.uri;
if (typeof uri === 'string') {
const subscribers = this.resourceSubscriptions.get(uri);
if (subscribers && subscribers.size > 0) {
for (const clientPubkey of subscribers) {
const session = this.sessionStore.getSession(clientPubkey);
if (session?.isInitialized) {
this.taskQueue.add(async () => {
try {
await this.sendNotification(clientPubkey, notification);
} catch (error) {
this.logger.error('Error sending resource update', {
error:
error instanceof Error
? error.message
: String(error),
clientPubkey,
uri,
});
}
});
}
}
}
return;
}
}

// Use TaskQueue for outbound notification broadcasting to prevent event loop blocking
Expand Down
Loading