diff --git a/src/core/constants.rs b/src/core/constants.rs index 6e2a117..095d784 100644 --- a/src/core/constants.rs +++ b/src/core/constants.rs @@ -97,6 +97,13 @@ pub const INITIALIZE_METHOD: &str = "initialize"; /// MCP protocol method for the initialized notification pub const NOTIFICATIONS_INITIALIZED_METHOD: &str = "notifications/initialized"; +/// Sentinel request ID for the announcement auto-publish flow. +/// +/// Synthetic initialize and capability-list requests use this ID so the +/// worker routes responses to the announcement handler rather than the +/// normal client response path. +pub const ANNOUNCEMENT_REQUEST_ID: &str = "announcement"; + /// Kinds that should never be encrypted (public announcements) pub const UNENCRYPTED_KINDS: &[u16] = &[ SERVER_ANNOUNCEMENT_KIND, @@ -211,6 +218,13 @@ mod tests { ); } + #[test] + fn test_announcement_request_id() { + assert_eq!(ANNOUNCEMENT_REQUEST_ID, "announcement"); + // Must differ from the stateless synthetic sentinel used by the worker + assert_ne!(ANNOUNCEMENT_REQUEST_ID, "contextvm-stateless-init"); + } + #[test] fn test_unencrypted_kinds_contains_all_announcements() { assert!(UNENCRYPTED_KINDS.contains(&SERVER_ANNOUNCEMENT_KIND)); diff --git a/src/rmcp_transport/worker.rs b/src/rmcp_transport/worker.rs index 023354f..da83b79 100644 --- a/src/rmcp_transport/worker.rs +++ b/src/rmcp_transport/worker.rs @@ -3,6 +3,7 @@ //! This file defines wrapper types that bind existing ContextVM Nostr //! transports to rmcp's worker abstraction. +use crate::core::constants::ANNOUNCEMENT_REQUEST_ID; use crate::core::error::Result; use crate::core::types::{JsonRpcMessage, JsonRpcNotification, JsonRpcRequest}; use crate::transport::client::{NostrClientTransport, NostrClientTransportConfig}; @@ -118,6 +119,10 @@ impl Worker for NostrServerWorker { .await .map_err(WorkerQuitReason::fatal_context("starting server transport"))?; + // CEP-6: Spawn auto-publish after start() so the worker's select loop + // is running when synthetic messages arrive through message_tx. + self.transport.spawn_announcements(); + let mut rx = self.transport.take_message_receiver().ok_or_else(|| { WorkerQuitReason::fatal( Self::Error::Other("server message receiver already taken".to_string()), @@ -366,6 +371,17 @@ impl NostrServerWorker { ) })?; + if event_id == ANNOUNCEMENT_REQUEST_ID { + tracing::debug!( + target: LOG_TARGET, + "Routing announcement response to handler" + ); + return self + .transport + .handle_announcement_response(JsonRpcMessage::Response(resp)) + .await; + } + if event_id == STATELESS_SYNTHETIC_EVENT_ID { tracing::debug!( target: LOG_TARGET, @@ -386,6 +402,17 @@ impl NostrServerWorker { ) })?; + if event_id == ANNOUNCEMENT_REQUEST_ID { + tracing::debug!( + target: LOG_TARGET, + "Routing announcement error to handler" + ); + return self + .transport + .handle_announcement_response(JsonRpcMessage::ErrorResponse(resp)) + .await; + } + if event_id == STATELESS_SYNTHETIC_EVENT_ID { tracing::debug!( target: LOG_TARGET, @@ -517,4 +544,29 @@ mod tests { &synthetic_initialize_message() )); } + + #[test] + fn test_announcement_sentinel_differs_from_stateless_sentinel() { + assert_ne!(ANNOUNCEMENT_REQUEST_ID, STATELESS_SYNTHETIC_EVENT_ID); + } + + #[test] + fn test_announcement_response_id_detected() { + let response = JsonRpcMessage::Response(JsonRpcResponse { + jsonrpc: "2.0".to_string(), + id: serde_json::json!(ANNOUNCEMENT_REQUEST_ID), + result: serde_json::json!({ + "protocolVersion": "2025-11-25", + "capabilities": {}, + "serverInfo": { "name": "test" } + }), + }); + + if let JsonRpcMessage::Response(ref resp) = response { + let event_id = resp.id.as_str().unwrap(); + assert_eq!(event_id, ANNOUNCEMENT_REQUEST_ID); + // Must not be confused with the stateless synthetic sentinel + assert!(!is_synthetic_initialize_message(&response)); + } + } } diff --git a/src/transport/server/announcement_manager.rs b/src/transport/server/announcement_manager.rs index 1abf888..bc81d62 100644 --- a/src/transport/server/announcement_manager.rs +++ b/src/transport/server/announcement_manager.rs @@ -4,14 +4,24 @@ //! announcements (kinds 11316–11320) and CEP-35 first-response discovery. use std::sync::{Arc, Mutex}; +use std::time::Duration; use nostr_sdk::prelude::*; +use tokio::sync::Notify; +use super::IncomingRequest; use crate::core::constants::*; use crate::core::error::{Error, Result}; use crate::core::types::*; use crate::relay::RelayPoolTrait; +const LOG_TARGET: &str = "contextvm_sdk::transport::server::announcement"; + +/// Default timeout waiting for the rmcp handler to respond to the synthetic +/// initialize request during announcement auto-publish. +#[cfg_attr(not(feature = "rmcp"), allow(dead_code))] +const ANNOUNCEMENT_INIT_TIMEOUT: Duration = Duration::from_secs(10); + /// Manages tag composition and publishing for server announcements. /// /// Handles CEP-6 announcement event publishing (kinds 11316–11320) and @@ -34,15 +44,31 @@ pub(crate) struct AnnouncementManager { pricing_tags: Vec, /// Cached result of `get_common_tags()`. Invalidated by tag setters. cached_common_tags: Mutex>>, + /// Channel for injecting synthetic MCP messages into the transport's inbound queue. + /// Wrapped in `Option` so it can be dropped during `close()` — otherwise this + /// clone keeps the message channel alive after `message_tx` is taken. + dispatch_fn: Option>, + /// Notifier signaled when the announcement init response has been processed. + init_notify: Arc, + /// Whether the announcement initialization has completed. + /// Only read by `handle_announcement_response`, which is called from the + /// rmcp worker — unused when the `rmcp` feature is disabled. + #[cfg_attr(not(feature = "rmcp"), allow(dead_code))] + initialized: Mutex, } impl AnnouncementManager { /// Create a new announcement manager. + /// + /// `dispatch_fn` is a clone of the transport's `message_tx` channel, used to + /// inject synthetic MCP messages (initialize, notifications/initialized, + /// capability list requests) during the auto-publish flow. pub fn new( relay_pool: Arc, server_info: Option, encryption_mode: EncryptionMode, gift_wrap_mode: GiftWrapMode, + dispatch_fn: tokio::sync::mpsc::UnboundedSender, ) -> Self { Self { relay_pool, @@ -53,6 +79,9 @@ impl AnnouncementManager { internal_common_tags: Vec::new(), pricing_tags: Vec::new(), cached_common_tags: Mutex::new(None), + dispatch_fn: Some(dispatch_fn), + init_notify: Arc::new(Notify::new()), + initialized: Mutex::new(false), } } @@ -318,6 +347,234 @@ impl AnnouncementManager { gift_wrap_mode: self.gift_wrap_mode, } } + + /// Drop the dispatch channel clone so `close()` can fully shut down the + /// message channel. + pub(crate) fn shutdown(&mut self) { + self.dispatch_fn.take(); + } + + // ── Auto-publish orchestration ──────────────────────────────── + + /// Handle a response to a synthetic announcement request. + /// + /// Schema-matches the result to determine which event kind to publish: + /// `InitializeResult` → 11316, `ListToolsResult` → 11317, + /// `ListResourcesResult` → 11318, `ListResourceTemplatesResult` → 11319, + /// `ListPromptsResult` → 11320. + /// + /// On `InitializeResult`, dispatches `notifications/initialized` via + /// `dispatch_fn` **before** signaling `init_notify` — this ordering is + /// critical so the notification enters the worker queue before any + /// capability-list requests. + #[cfg_attr(not(feature = "rmcp"), allow(dead_code))] + pub(crate) async fn handle_announcement_response( + &self, + response: JsonRpcMessage, + ) -> Result<()> { + let result = match &response { + JsonRpcMessage::Response(resp) => &resp.result, + JsonRpcMessage::ErrorResponse(resp) => { + tracing::warn!( + target: LOG_TARGET, + error_code = resp.error.code, + error_message = %resp.error.message, + "Announcement request returned error, skipping publish" + ); + // If init hasn't completed yet, signal so publish_public_announcements + // doesn't hang waiting for the Notify. + let mut flag = self.initialized.lock().unwrap_or_else(|e| e.into_inner()); + if !*flag { + *flag = true; + drop(flag); + self.init_notify.notify_one(); + } + return Ok(()); + } + _ => return Ok(()), + }; + + // Determine event kind from response schema. + let kind = + if result.get("protocolVersion").is_some() || result.get("capabilities").is_some() { + Some(SERVER_ANNOUNCEMENT_KIND) + } else if result.get("tools").is_some() { + Some(TOOLS_LIST_KIND) + } else if result.get("resources").is_some() { + Some(RESOURCES_LIST_KIND) + } else if result.get("resourceTemplates").is_some() { + Some(RESOURCETEMPLATES_LIST_KIND) + } else if result.get("prompts").is_some() { + Some(PROMPTS_LIST_KIND) + } else { + tracing::warn!( + target: LOG_TARGET, + "Announcement response has unrecognized schema, skipping publish" + ); + None + }; + + if let Some(kind) = kind { + let content = serde_json::to_string(result)?; + let tags = self.get_announcement_tags(kind); + let builder = EventBuilder::new(Kind::Custom(kind), content).tags(tags); + match self.relay_pool.publish(builder).await { + Ok(id) => tracing::info!( + target: LOG_TARGET, + event_id = %id, + kind, + "Published announcement event" + ), + Err(e) => tracing::warn!( + target: LOG_TARGET, + error = %e, + kind, + "Failed to publish announcement event" + ), + } + + // For InitializeResult: dispatch notifications/initialized and signal Notify. + if kind == SERVER_ANNOUNCEMENT_KIND { + // Critical ordering: dispatch notifications/initialized FIRST so it + // enters the worker queue before capability-list requests. + if let Some(ref tx) = self.dispatch_fn { + let _ = tx.send(IncomingRequest { + message: JsonRpcMessage::Notification(JsonRpcNotification { + jsonrpc: "2.0".to_string(), + method: NOTIFICATIONS_INITIALIZED_METHOD.to_string(), + params: None, + }), + client_pubkey: ANNOUNCEMENT_REQUEST_ID.to_string(), + event_id: ANNOUNCEMENT_REQUEST_ID.to_string(), + is_encrypted: false, + }); + } + + // THEN signal the Notify — publish_public_announcements will dispatch + // capability-list requests after this, ensuring they arrive after the + // initialized notification in the worker queue. + let mut flag = self.initialized.lock().unwrap_or_else(|e| e.into_inner()); + *flag = true; + drop(flag); + self.init_notify.notify_one(); + } + } + + Ok(()) + } + + /// Spawn the auto-publish orchestration task. + /// + /// Returns a `JoinHandle` that the caller should track for cleanup. + /// The task dispatches a synthetic `initialize` request, waits for the + /// response, then dispatches capability-list requests. + #[cfg_attr(not(feature = "rmcp"), allow(dead_code))] + pub(crate) fn spawn_publish_public_announcements( + &self, + cancel: tokio_util::sync::CancellationToken, + ) -> tokio::task::JoinHandle<()> { + let dispatch_fn = self + .dispatch_fn + .clone() + .expect("dispatch_fn must be set before spawning announcements"); + let init_notify = Arc::clone(&self.init_notify); + tokio::spawn(publish_public_announcements( + dispatch_fn, + init_notify, + cancel, + )) + } +} + +/// Auto-publish orchestration: dispatches synthetic requests and waits for init. +/// +/// Standalone async function (not a method) so it can be moved into a spawned task +/// without borrowing the `AnnouncementManager`. +#[cfg_attr(not(feature = "rmcp"), allow(dead_code))] +async fn publish_public_announcements( + dispatch_fn: tokio::sync::mpsc::UnboundedSender, + init_notify: Arc, + cancel: tokio_util::sync::CancellationToken, +) { + tracing::info!(target: LOG_TARGET, "Starting auto-publish of server announcements"); + + // Dispatch synthetic initialize request. + let init_request = JsonRpcMessage::Request(JsonRpcRequest { + jsonrpc: "2.0".to_string(), + id: serde_json::json!(ANNOUNCEMENT_REQUEST_ID), + method: INITIALIZE_METHOD.to_string(), + params: Some(serde_json::json!({ + "protocolVersion": crate::core::constants::mcp_protocol_version(), + "capabilities": {}, + "clientInfo": { + "name": "contextvm-announcement-client", + "version": "0.1.0" + } + })), + }); + if dispatch_fn + .send(IncomingRequest { + message: init_request, + client_pubkey: ANNOUNCEMENT_REQUEST_ID.to_string(), + event_id: ANNOUNCEMENT_REQUEST_ID.to_string(), + is_encrypted: false, + }) + .is_err() + { + tracing::warn!( + target: LOG_TARGET, + "Transport channel closed before init request could be sent" + ); + return; + } + + // Wait for handle_announcement_response to signal completion of the init + // response, with cancellation support so close() isn't blocked. + tokio::select! { + _ = cancel.cancelled() => { + tracing::info!(target: LOG_TARGET, "Announcement publish cancelled during init wait"); + return; + } + result = tokio::time::timeout(ANNOUNCEMENT_INIT_TIMEOUT, init_notify.notified()) => { + match result { + Ok(()) => tracing::info!( + target: LOG_TARGET, + "Announcement init complete, dispatching capability list requests" + ), + Err(_) => tracing::warn!( + target: LOG_TARGET, + timeout_secs = ANNOUNCEMENT_INIT_TIMEOUT.as_secs(), + "Announcement init timed out, proceeding with capability list requests" + ), + } + } + } + + // Dispatch all four capability-list requests at once (no per-request await). + for method in &[ + "tools/list", + "resources/list", + "resources/templates/list", + "prompts/list", + ] { + let request = JsonRpcMessage::Request(JsonRpcRequest { + jsonrpc: "2.0".to_string(), + id: serde_json::json!(ANNOUNCEMENT_REQUEST_ID), + method: method.to_string(), + params: None, + }); + let _ = dispatch_fn.send(IncomingRequest { + message: request, + client_pubkey: ANNOUNCEMENT_REQUEST_ID.to_string(), + event_id: ANNOUNCEMENT_REQUEST_ID.to_string(), + is_encrypted: false, + }); + } + + tracing::info!( + target: LOG_TARGET, + "Dispatched all announcement capability list requests" + ); } /// Cloneable snapshot of tag-building state for the event loop. @@ -375,10 +632,11 @@ mod tests { gift_wrap_mode: GiftWrapMode, server_info: Option, ) -> AnnouncementManager { - // Tests only exercise tag building; relay pool is unused. + // Tests only exercise tag building; relay pool and dispatch channel are unused. use crate::relay::mock::MockRelayPool; let pool: Arc = Arc::new(MockRelayPool::new()); - AnnouncementManager::new(pool, server_info, encryption_mode, gift_wrap_mode) + let (tx, _rx) = tokio::sync::mpsc::unbounded_channel(); + AnnouncementManager::new(pool, server_info, encryption_mode, gift_wrap_mode, tx) } // ── 1. Server info tags ──────────────────────────────────────── @@ -562,4 +820,246 @@ mod tests { "Kind 11317 should include pricing tags" ); } + + // ── 11. Auto-publish: handle_announcement_response ─────────── + + fn make_manager_with_pool( + server_info: Option, + ) -> ( + AnnouncementManager, + Arc, + tokio::sync::mpsc::UnboundedReceiver, + ) { + use crate::relay::mock::MockRelayPool; + let pool = Arc::new(MockRelayPool::new()); + let (tx, rx) = tokio::sync::mpsc::unbounded_channel(); + let mgr = AnnouncementManager::new( + Arc::clone(&pool) as Arc, + server_info, + EncryptionMode::Disabled, + GiftWrapMode::Optional, + tx, + ); + (mgr, pool, rx) + } + + #[tokio::test] + async fn handle_announcement_response_publishes_init_result() { + let (mgr, pool, mut rx) = make_manager_with_pool(None); + + let response = JsonRpcMessage::Response(JsonRpcResponse { + jsonrpc: "2.0".to_string(), + id: serde_json::json!(ANNOUNCEMENT_REQUEST_ID), + result: serde_json::json!({ + "protocolVersion": "2025-11-25", + "capabilities": {}, + "serverInfo": { "name": "test-server", "version": "0.1.0" } + }), + }); + + mgr.handle_announcement_response(response).await.unwrap(); + + // Verify kind 11316 event published + let events = pool.stored_events().await; + assert_eq!(events.len(), 1); + assert_eq!(events[0].kind, Kind::Custom(SERVER_ANNOUNCEMENT_KIND)); + + // Verify notifications/initialized dispatched + let notif = rx + .try_recv() + .expect("should dispatch notifications/initialized"); + assert_eq!( + notif.message.method(), + Some(NOTIFICATIONS_INITIALIZED_METHOD) + ); + assert_eq!(notif.client_pubkey, ANNOUNCEMENT_REQUEST_ID); + + // Verify initialized flag set + assert!(*mgr.initialized.lock().unwrap()); + } + + #[tokio::test] + async fn handle_announcement_response_publishes_tools_list() { + let (mgr, pool, _rx) = make_manager_with_pool(None); + + let response = JsonRpcMessage::Response(JsonRpcResponse { + jsonrpc: "2.0".to_string(), + id: serde_json::json!(ANNOUNCEMENT_REQUEST_ID), + result: serde_json::json!({ + "tools": [{ "name": "echo", "description": "Echo tool" }] + }), + }); + + mgr.handle_announcement_response(response).await.unwrap(); + + let events = pool.stored_events().await; + assert_eq!(events.len(), 1); + assert_eq!(events[0].kind, Kind::Custom(TOOLS_LIST_KIND)); + } + + #[tokio::test] + async fn handle_announcement_response_publishes_resources_list() { + let (mgr, pool, _rx) = make_manager_with_pool(None); + + let response = JsonRpcMessage::Response(JsonRpcResponse { + jsonrpc: "2.0".to_string(), + id: serde_json::json!(ANNOUNCEMENT_REQUEST_ID), + result: serde_json::json!({ "resources": [] }), + }); + + mgr.handle_announcement_response(response).await.unwrap(); + + let events = pool.stored_events().await; + assert_eq!(events.len(), 1); + assert_eq!(events[0].kind, Kind::Custom(RESOURCES_LIST_KIND)); + } + + #[tokio::test] + async fn handle_announcement_response_publishes_resource_templates_list() { + let (mgr, pool, _rx) = make_manager_with_pool(None); + + let response = JsonRpcMessage::Response(JsonRpcResponse { + jsonrpc: "2.0".to_string(), + id: serde_json::json!(ANNOUNCEMENT_REQUEST_ID), + result: serde_json::json!({ "resourceTemplates": [] }), + }); + + mgr.handle_announcement_response(response).await.unwrap(); + + let events = pool.stored_events().await; + assert_eq!(events.len(), 1); + assert_eq!(events[0].kind, Kind::Custom(RESOURCETEMPLATES_LIST_KIND)); + } + + #[tokio::test] + async fn handle_announcement_response_publishes_prompts_list() { + let (mgr, pool, _rx) = make_manager_with_pool(None); + + let response = JsonRpcMessage::Response(JsonRpcResponse { + jsonrpc: "2.0".to_string(), + id: serde_json::json!(ANNOUNCEMENT_REQUEST_ID), + result: serde_json::json!({ "prompts": [] }), + }); + + mgr.handle_announcement_response(response).await.unwrap(); + + let events = pool.stored_events().await; + assert_eq!(events.len(), 1); + assert_eq!(events[0].kind, Kind::Custom(PROMPTS_LIST_KIND)); + } + + #[tokio::test] + async fn handle_announcement_response_error_signals_notify_without_publishing() { + let (mgr, pool, _rx) = make_manager_with_pool(None); + + let response = JsonRpcMessage::ErrorResponse(JsonRpcErrorResponse { + jsonrpc: "2.0".to_string(), + id: serde_json::json!(ANNOUNCEMENT_REQUEST_ID), + error: JsonRpcError { + code: -32600, + message: "test error".to_string(), + data: None, + }, + }); + + mgr.handle_announcement_response(response).await.unwrap(); + + // No events published + assert!(pool.stored_events().await.is_empty()); + // But initialized flag is set (to unblock publish_public_announcements) + assert!(*mgr.initialized.lock().unwrap()); + } + + #[tokio::test] + async fn handle_announcement_response_unknown_schema_no_publish() { + let (mgr, pool, _rx) = make_manager_with_pool(None); + + let response = JsonRpcMessage::Response(JsonRpcResponse { + jsonrpc: "2.0".to_string(), + id: serde_json::json!(ANNOUNCEMENT_REQUEST_ID), + result: serde_json::json!({ "unknown": "data" }), + }); + + mgr.handle_announcement_response(response).await.unwrap(); + assert!(pool.stored_events().await.is_empty()); + } + + // ── 12. Auto-publish: publish_public_announcements ─────────── + + #[tokio::test] + async fn publish_public_announcements_dispatches_init_then_capability_lists() { + let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::(); + let init_notify = Arc::new(Notify::new()); + let cancel = tokio_util::sync::CancellationToken::new(); + + let init_notify_clone = Arc::clone(&init_notify); + let handle = tokio::spawn(publish_public_announcements(tx, init_notify_clone, cancel)); + + // First message should be the synthetic initialize request. + let init_msg = tokio::time::timeout(Duration::from_secs(1), rx.recv()) + .await + .expect("should receive init request within 1s") + .expect("channel should not be closed"); + assert_eq!(init_msg.message.method(), Some(INITIALIZE_METHOD)); + assert_eq!(init_msg.client_pubkey, ANNOUNCEMENT_REQUEST_ID); + assert_eq!(init_msg.event_id, ANNOUNCEMENT_REQUEST_ID); + assert!(!init_msg.is_encrypted); + + // Signal init complete — the task should then dispatch capability lists. + init_notify.notify_one(); + + // Should receive 4 capability list requests in order. + let expected_methods = [ + "tools/list", + "resources/list", + "resources/templates/list", + "prompts/list", + ]; + for expected_method in &expected_methods { + let msg = tokio::time::timeout(Duration::from_secs(1), rx.recv()) + .await + .expect("should receive capability request within 1s") + .expect("channel should not be closed"); + assert_eq!(msg.message.method(), Some(*expected_method)); + assert_eq!(msg.client_pubkey, ANNOUNCEMENT_REQUEST_ID); + } + + handle.await.unwrap(); + } + + #[tokio::test] + async fn handle_init_result_dispatches_notification_before_notify_signal() { + let (mgr, _pool, mut rx) = make_manager_with_pool(None); + + // Clone the Notify so we can also wait on it from the test. + let init_notify = Arc::clone(&mgr.init_notify); + let notified = init_notify.notified(); + + let response = JsonRpcMessage::Response(JsonRpcResponse { + jsonrpc: "2.0".to_string(), + id: serde_json::json!(ANNOUNCEMENT_REQUEST_ID), + result: serde_json::json!({ + "protocolVersion": "2025-11-25", + "capabilities": {}, + "serverInfo": { "name": "test" } + }), + }); + + mgr.handle_announcement_response(response).await.unwrap(); + + // The Notify should have been signaled. + tokio::time::timeout(Duration::from_millis(100), notified) + .await + .expect("init_notify should have been signaled"); + + // And the notifications/initialized message should already be in the + // channel (dispatched BEFORE the Notify signal). + let notif = rx + .try_recv() + .expect("notification should be queued before Notify"); + assert_eq!( + notif.message.method(), + Some(NOTIFICATIONS_INITIALIZED_METHOD) + ); + } } diff --git a/src/transport/server/mod.rs b/src/transport/server/mod.rs index 6eef49d..1eeb368 100644 --- a/src/transport/server/mod.rs +++ b/src/transport/server/mod.rs @@ -215,6 +215,7 @@ impl NostrServerTransport { config.server_info.clone(), config.encryption_mode, config.gift_wrap_mode, + tx.clone(), ), base: BaseTransport { relay_pool, @@ -256,6 +257,7 @@ impl NostrServerTransport { config.server_info.clone(), config.encryption_mode, config.gift_wrap_mode, + tx.clone(), ), base: BaseTransport { relay_pool, @@ -430,6 +432,7 @@ impl NostrServerTransport { for handle in self.task_handles.drain(..) { let _ = handle.await; } + self.announcement_manager.shutdown(); self.message_tx.take(); self.base.disconnect().await?; self.sessions.clear().await; @@ -705,6 +708,34 @@ impl NostrServerTransport { self.announcement_manager.delete_announcements(reason).await } + /// Spawn the CEP-6 auto-publish task if `is_announced_server` is set. + /// + /// Called by the rmcp worker after `start()` — not in `start()` itself — + /// because the auto-publish flow injects synthetic MCP requests that + /// require an rmcp handler to produce responses. + #[cfg_attr(not(feature = "rmcp"), allow(dead_code))] + pub(crate) fn spawn_announcements(&mut self) { + if self.config.is_announced_server { + let handle = self + .announcement_manager + .spawn_publish_public_announcements(self.cancellation_token.child_token()); + self.task_handles.push(handle); + } + } + + /// Forward an announcement response to the announcement manager for publishing. + /// + /// Called by the worker when a response with the announcement sentinel ID arrives. + #[cfg_attr(not(feature = "rmcp"), allow(dead_code))] + pub(crate) async fn handle_announcement_response( + &self, + response: JsonRpcMessage, + ) -> Result<()> { + self.announcement_manager + .handle_announcement_response(response) + .await + } + /// Publish tools list from rmcp typed tool descriptors. #[cfg(feature = "rmcp")] pub async fn publish_tools_typed(&self, tools: Vec) -> Result {