diff --git a/Cargo.lock b/Cargo.lock index 80e334e..8a5b108 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -593,6 +593,26 @@ dependencies = [ "winnow", ] +[[package]] +name = "console_error_panic_hook" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a06aeb73f470f66dcdbf7223caeebb85984942f22f1adb2a088cf9668146bbbc" +dependencies = [ + "cfg-if", + "wasm-bindgen", +] + +[[package]] +name = "console_log" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "be8aed40e4edbf4d3b4431ab260b63fdc40f5780a4766824329ea0f1eefe3c0f" +dependencies = [ + "log", + "web-sys", +] + [[package]] name = "const-oid" version = "0.9.6" @@ -1186,6 +1206,8 @@ dependencies = [ name = "exoclaw-ui" version = "0.1.0" dependencies = [ + "console_error_panic_hook", + "console_log", "futures", "gloo-net", "gloo-timers", diff --git a/src/agent/mod.rs b/src/agent/mod.rs index b829aca..f574908 100644 --- a/src/agent/mod.rs +++ b/src/agent/mod.rs @@ -91,52 +91,84 @@ impl AgentRunner { return Ok(()); } - // Create an internal channel to collect events from this LLM call - let (inner_tx, mut inner_rx) = mpsc::channel::(32); - - provider - .call_streaming(¤t_messages, tools, system_prompt, inner_tx) - .await?; - - // Collect events, forwarding text/usage/error to client, - // collecting tool_use calls for dispatch - let mut tool_calls: Vec<(String, String, serde_json::Value)> = Vec::new(); + let tool_calls: Vec<(String, String, serde_json::Value)> = { + // Create an internal channel to collect events from this LLM call. + // We must drain this channel while provider streaming is in-flight; + // otherwise long responses can fill the buffer and deadlock. + let (inner_tx, mut inner_rx) = mpsc::channel::(32); + let mut tool_calls: Vec<(String, String, serde_json::Value)> = Vec::new(); + let mut provider_done = false; + let mut provider_error: Option = None; + + let provider_call = + provider.call_streaming(¤t_messages, tools, system_prompt, inner_tx); + tokio::pin!(provider_call); + + loop { + tokio::select! { + result = &mut provider_call, if !provider_done => { + provider_done = true; + if let Err(e) = result { + provider_error = Some(e); + } + } + maybe_event = inner_rx.recv() => { + let Some(event) = maybe_event else { + if !provider_done { + if let Err(e) = (&mut provider_call).await { + provider_error = Some(e); + } + } + break; + }; - while let Some(event) = inner_rx.recv().await { - match event { - AgentEvent::Text(ref _t) => { - let _ = tx.send(event).await; - } - AgentEvent::ToolUse { - ref id, - ref name, - ref input, - } => { - // Forward to client so they can observe - let _ = tx - .send(AgentEvent::ToolUse { - id: id.clone(), - name: name.clone(), - input: input.clone(), - }) - .await; - tool_calls.push((id.clone(), name.clone(), input.clone())); - } - AgentEvent::Usage { .. } => { - let _ = tx.send(event).await; - } - AgentEvent::Error(ref _e) => { - let _ = tx.send(event).await; - } - AgentEvent::Done => { - // Don't forward Done yet — we may need to continue the loop + match event { + AgentEvent::Text(ref _t) => { + let _ = tx.send(event).await; + } + AgentEvent::ToolUse { + ref id, + ref name, + ref input, + } => { + // Forward to client so they can observe + let _ = tx + .send(AgentEvent::ToolUse { + id: id.clone(), + name: name.clone(), + input: input.clone(), + }) + .await; + tool_calls.push((id.clone(), name.clone(), input.clone())); + } + AgentEvent::Usage { .. } => { + let _ = tx.send(event).await; + } + AgentEvent::Error(ref _e) => { + let _ = tx.send(event).await; + } + AgentEvent::Done => { + // Don't forward Done yet — we may need to continue the loop + } + AgentEvent::ToolResult { .. } => { + // Shouldn't come from provider, but forward if it does + let _ = tx.send(event).await; + } + } + } } - AgentEvent::ToolResult { .. } => { - // Shouldn't come from provider, but forward if it does - let _ = tx.send(event).await; + + if provider_done && inner_rx.is_closed() && inner_rx.is_empty() { + break; } } - } + + if let Some(e) = provider_error { + return Err(e); + } + + tool_calls + }; // If no tool calls, we're done if tool_calls.is_empty() { @@ -366,3 +398,73 @@ impl Default for AgentRunner { Self::new() } } + +#[cfg(test)] +mod tests { + use super::{AgentEvent, AgentRunner, providers}; + use crate::sandbox::PluginHost; + use async_trait::async_trait; + use std::sync::Arc; + use tokio::sync::{RwLock, mpsc}; + use tokio::time::{Duration, timeout}; + + struct BurstProvider { + chunks: usize, + } + + #[async_trait] + impl providers::LlmProvider for BurstProvider { + async fn call_streaming( + &self, + _messages: &[serde_json::Value], + _tools: &[serde_json::Value], + _system_prompt: Option<&str>, + tx: mpsc::Sender, + ) -> anyhow::Result<()> { + for i in 0..self.chunks { + tx.send(AgentEvent::Text(format!("chunk-{i}"))).await?; + } + tx.send(AgentEvent::Done).await?; + Ok(()) + } + } + + #[tokio::test] + async fn run_with_tools_drains_stream_while_provider_is_running() { + let runner = AgentRunner::new(); + let provider = BurstProvider { chunks: 64 }; + let plugins = Arc::new(RwLock::new(PluginHost::new())); + let (tx, mut rx) = mpsc::channel::(256); + + timeout( + Duration::from_secs(5), + runner.run_with_tools( + &provider, + vec![serde_json::json!({ + "role": "user", + "content": "hello", + })], + &[], + None, + &plugins, + tx, + ), + ) + .await + .expect("runner should not deadlock") + .expect("runner should succeed"); + + let mut text_count = 0usize; + let mut saw_done = false; + while let Ok(event) = rx.try_recv() { + match event { + AgentEvent::Text(_) => text_count += 1, + AgentEvent::Done => saw_done = true, + _ => {} + } + } + + assert_eq!(text_count, 64); + assert!(saw_done); + } +} diff --git a/src/agent/providers.rs b/src/agent/providers.rs index b47cbbe..4890ba5 100644 --- a/src/agent/providers.rs +++ b/src/agent/providers.rs @@ -2,7 +2,8 @@ use async_trait::async_trait; use futures::StreamExt; use reqwest::Client; use tokio::sync::mpsc; -use tracing::debug; +use tokio::time::{Duration, timeout}; +use tracing::{debug, warn}; use super::AgentEvent; @@ -16,6 +17,46 @@ fn openai_endpoint() -> String { .unwrap_or_else(|_| "https://api.openai.com/v1/chat/completions".to_string()) } +const PROVIDER_REQUEST_TIMEOUT_SECS: u64 = 45; +const PROVIDER_STREAM_IDLE_TIMEOUT_SECS: u64 = 45; + +fn pop_next_sse_event(buffer: &mut String) -> Option { + if buffer.contains('\r') { + // SSE allows CRLF, LF, and CR. Normalize everything to LF so splitting + // on "\n\n" is robust across providers and proxies. + *buffer = buffer.replace("\r\n", "\n").replace('\r', "\n"); + } + + let pos = buffer.find("\n\n")?; + let event = buffer[..pos].to_string(); + buffer.replace_range(..pos + 2, ""); + Some(event) +} + +fn parse_sse_fields(event_text: &str) -> (String, String) { + let mut event_type = String::new(); + let mut data_lines = Vec::new(); + + for raw in event_text.lines() { + if raw.is_empty() || raw.starts_with(':') { + continue; + } + + let Some((field, value)) = raw.split_once(':') else { + continue; + }; + let value = value.strip_prefix(' ').unwrap_or(value); + + match field { + "event" => event_type = value.to_string(), + "data" => data_lines.push(value.to_string()), + _ => {} + } + } + + (event_type, data_lines.join("\n")) +} + /// Trait for LLM provider implementations. #[async_trait] pub trait LlmProvider: Send + Sync { @@ -78,6 +119,14 @@ impl LlmProvider for AnthropicProvider { system_prompt: Option<&str>, tx: mpsc::Sender, ) -> anyhow::Result<()> { + debug!( + provider = "anthropic", + model = %self.model, + message_count = messages.len(), + tool_count = tools.len(), + "starting provider stream" + ); + let mut body = serde_json::json!({ "model": self.model, "max_tokens": self.max_tokens, @@ -93,15 +142,24 @@ impl LlmProvider for AnthropicProvider { body["tools"] = serde_json::json!(tools); } - let response = self - .client - .post(anthropic_endpoint()) - .header("x-api-key", &self.api_key) - .header("anthropic-version", "2023-06-01") - .header("content-type", "application/json") - .json(&body) - .send() - .await?; + let response = timeout( + Duration::from_secs(PROVIDER_REQUEST_TIMEOUT_SECS), + self.client + .post(anthropic_endpoint()) + .header("x-api-key", &self.api_key) + .header("anthropic-version", "2023-06-01") + .header("content-type", "application/json") + .json(&body) + .send(), + ) + .await + .map_err(|_| anyhow::anyhow!("provider request timed out"))??; + + debug!( + provider = "anthropic", + status = %response.status(), + "provider response received" + ); if !response.status().is_success() { let status = response.status(); @@ -121,29 +179,46 @@ impl LlmProvider for AnthropicProvider { let mut input_tokens: u32 = 0; let mut output_tokens: u32 = 0; - while let Some(chunk) = stream.next().await { - let chunk = chunk?; + loop { + let chunk = match timeout( + Duration::from_secs(PROVIDER_STREAM_IDLE_TIMEOUT_SECS), + stream.next(), + ) + .await + { + Ok(Some(chunk)) => chunk?, + Ok(None) => { + debug!(provider = "anthropic", "provider stream closed"); + break; + } + Err(_) => { + return Err(anyhow::anyhow!( + "provider stream idle timeout after {}s", + PROVIDER_STREAM_IDLE_TIMEOUT_SECS + )); + } + }; + debug!( + provider = "anthropic", + chunk_bytes = chunk.len(), + "received stream chunk" + ); buffer.push_str(&String::from_utf8_lossy(&chunk)); - while let Some(pos) = buffer.find("\n\n") { - let event_text = buffer[..pos].to_string(); - buffer = buffer[pos + 2..].to_string(); - - // Parse SSE event type and data - let mut event_type = String::new(); - let mut data = String::new(); - for line in event_text.lines() { - if let Some(et) = line.strip_prefix("event: ") { - event_type = et.to_string(); - } else if let Some(d) = line.strip_prefix("data: ") { - data = d.to_string(); - } - } + while let Some(event_text) = pop_next_sse_event(&mut buffer) { + let (event_type, data) = parse_sse_fields(&event_text); if data.is_empty() || data == "[DONE]" { continue; } + debug!( + provider = "anthropic", + event = %event_type, + data_bytes = data.len(), + "parsed SSE event" + ); + let parsed: serde_json::Value = match serde_json::from_str(&data) { Ok(v) => v, Err(e) => { @@ -235,6 +310,7 @@ impl LlmProvider for AnthropicProvider { }) .await; let _ = tx.send(AgentEvent::Done).await; + debug!(provider = "anthropic", "provider stream completed"); return Ok(()); } @@ -243,6 +319,10 @@ impl LlmProvider for AnthropicProvider { } } + warn!( + provider = "anthropic", + "provider stream ended without explicit message_stop" + ); let _ = tx .send(AgentEvent::Usage { input_tokens, @@ -281,6 +361,14 @@ impl LlmProvider for OpenAiProvider { system_prompt: Option<&str>, tx: mpsc::Sender, ) -> anyhow::Result<()> { + debug!( + provider = "openai", + model = %self.model, + message_count = messages.len(), + tool_count = tools.len(), + "starting provider stream" + ); + // Prepend system message if provided let mut all_messages = Vec::new(); if let Some(system) = system_prompt { @@ -303,14 +391,23 @@ impl LlmProvider for OpenAiProvider { body["tools"] = serde_json::json!(tools); } - let response = self - .client - .post(openai_endpoint()) - .header("Authorization", format!("Bearer {}", self.api_key)) - .header("content-type", "application/json") - .json(&body) - .send() - .await?; + let response = timeout( + Duration::from_secs(PROVIDER_REQUEST_TIMEOUT_SECS), + self.client + .post(openai_endpoint()) + .header("Authorization", format!("Bearer {}", self.api_key)) + .header("content-type", "application/json") + .json(&body) + .send(), + ) + .await + .map_err(|_| anyhow::anyhow!("provider request timed out"))??; + + debug!( + provider = "openai", + status = %response.status(), + "provider response received" + ); if !response.status().is_success() { let status = response.status(); @@ -329,15 +426,42 @@ impl LlmProvider for OpenAiProvider { let mut input_tokens: u32 = 0; let mut output_tokens: u32 = 0; - while let Some(chunk) = stream.next().await { - let chunk = chunk?; + loop { + let chunk = match timeout( + Duration::from_secs(PROVIDER_STREAM_IDLE_TIMEOUT_SECS), + stream.next(), + ) + .await + { + Ok(Some(chunk)) => chunk?, + Ok(None) => { + debug!(provider = "openai", "provider stream closed"); + break; + } + Err(_) => { + return Err(anyhow::anyhow!( + "provider stream idle timeout after {}s", + PROVIDER_STREAM_IDLE_TIMEOUT_SECS + )); + } + }; + debug!( + provider = "openai", + chunk_bytes = chunk.len(), + "received stream chunk" + ); buffer.push_str(&String::from_utf8_lossy(&chunk)); - while let Some(pos) = buffer.find("\n\n") { - let event = buffer[..pos].to_string(); - buffer = buffer[pos + 2..].to_string(); - - if let Some(data) = event.strip_prefix("data: ") { + while let Some(event_text) = pop_next_sse_event(&mut buffer) { + let (_event_type, data) = parse_sse_fields(&event_text); + if !data.is_empty() { + debug!( + provider = "openai", + data_bytes = data.len(), + "parsed SSE event" + ); + } + if !data.is_empty() { if data == "[DONE]" { let _ = tx .send(AgentEvent::Usage { @@ -346,10 +470,11 @@ impl LlmProvider for OpenAiProvider { }) .await; let _ = tx.send(AgentEvent::Done).await; + debug!(provider = "openai", "provider stream completed"); return Ok(()); } - let parsed: serde_json::Value = match serde_json::from_str(data) { + let parsed: serde_json::Value = match serde_json::from_str(&data) { Ok(v) => v, Err(_) => continue, }; @@ -430,6 +555,10 @@ impl LlmProvider for OpenAiProvider { } } + warn!( + provider = "openai", + "provider stream ended without explicit [DONE]" + ); let _ = tx .send(AgentEvent::Usage { input_tokens, @@ -528,3 +657,34 @@ pub fn build_tools_for_provider( _ => build_anthropic_tools(schemas), // default to Anthropic format } } + +#[cfg(test)] +mod tests { + use super::{parse_sse_fields, pop_next_sse_event}; + + #[test] + fn pop_next_sse_event_handles_crlf() { + let mut buffer = "event: ping\r\ndata: {\"ok\":true}\r\n\r\nrest".to_string(); + let event = pop_next_sse_event(&mut buffer).expect("event"); + assert_eq!(event, "event: ping\ndata: {\"ok\":true}"); + assert_eq!(buffer, "rest"); + } + + #[test] + fn pop_next_sse_event_handles_cr_only() { + let mut buffer = "data: one\r\rdata: two\r\r".to_string(); + let first = pop_next_sse_event(&mut buffer).expect("first"); + let second = pop_next_sse_event(&mut buffer).expect("second"); + assert_eq!(first, "data: one"); + assert_eq!(second, "data: two"); + } + + #[test] + fn parse_sse_fields_collects_multiline_data() { + let (event_type, data) = parse_sse_fields( + "event: message_delta\n:data-comment\ndata: {\"a\":1}\ndata: {\"b\":2}", + ); + assert_eq!(event_type, "message_delta"); + assert_eq!(data, "{\"a\":1}\n{\"b\":2}"); + } +} diff --git a/src/gateway/protocol.rs b/src/gateway/protocol.rs index 83862d3..decef0c 100644 --- a/src/gateway/protocol.rs +++ b/src/gateway/protocol.rs @@ -1,7 +1,7 @@ use serde::{Deserialize, Serialize}; use std::sync::Arc; use tokio::sync::mpsc; -use tracing::warn; +use tracing::{debug, info, warn}; use super::server::AppState; use crate::agent::AgentEvent; @@ -10,12 +10,28 @@ use crate::types::Message as AgentMessage; #[derive(Deserialize)] struct RpcRequest { - id: String, + id: RpcId, method: String, #[serde(default)] params: serde_json::Value, } +#[derive(Debug, Deserialize)] +#[serde(untagged)] +enum RpcId { + String(String), + Number(serde_json::Number), +} + +impl RpcId { + fn into_string(self) -> String { + match self { + RpcId::String(value) => value, + RpcId::Number(value) => value.to_string(), + } + } +} + /// Parameters for the `chat.send` RPC method. #[derive(Debug, Deserialize)] pub struct ChatSendParams { @@ -54,6 +70,17 @@ pub enum RpcResult { }, } +fn event_kind(event: &AgentEvent) -> &'static str { + match event { + AgentEvent::Text(_) => "text", + AgentEvent::ToolUse { .. } => "tool_use", + AgentEvent::ToolResult { .. } => "tool_result", + AgentEvent::Usage { .. } => "usage", + AgentEvent::Done => "done", + AgentEvent::Error(_) => "error", + } +} + /// Handle an incoming JSON-RPC-style message. pub async fn handle_rpc(msg: &str, state: &Arc) -> RpcResult { let req: RpcRequest = match serde_json::from_str(msg) { @@ -70,10 +97,12 @@ pub async fn handle_rpc(msg: &str, state: &Arc) -> RpcResult { } }; + let request_id = req.id.into_string(); + match req.method.as_str() { "ping" => { let resp = RpcResponse { - id: req.id, + id: request_id, result: Some(serde_json::json!("pong")), error: None, }; @@ -83,7 +112,7 @@ pub async fn handle_rpc(msg: &str, state: &Arc) -> RpcResult { "status" => { let router = state.router.read().await; let resp = RpcResponse { - id: req.id, + id: request_id, result: Some(serde_json::json!({ "version": env!("CARGO_PKG_VERSION"), "plugins": state.plugins.read().await.count(), @@ -99,7 +128,7 @@ pub async fn handle_rpc(msg: &str, state: &Arc) -> RpcResult { Ok(p) => p, Err(e) => { let resp = RpcResponse { - id: req.id, + id: request_id, result: None, error: Some(format!("invalid chat.send params: {e}")), }; @@ -107,13 +136,13 @@ pub async fn handle_rpc(msg: &str, state: &Arc) -> RpcResult { } }; - handle_chat_send(req.id, params, state).await + handle_chat_send(request_id, params, state).await } "plugin.list" => { let plugins = state.plugins.read().await; let resp = RpcResponse { - id: req.id, + id: request_id, result: Some(serde_json::json!(plugins.list())), error: None, }; @@ -122,7 +151,7 @@ pub async fn handle_rpc(msg: &str, state: &Arc) -> RpcResult { _ => { let resp = RpcResponse { - id: req.id, + id: request_id, result: None, error: Some(format!("unknown method: {}", req.method)), }; @@ -148,6 +177,14 @@ async fn handle_chat_send( params.team.as_deref(), ) }; + info!( + request_id = %request_id, + session = %route.session_key, + agent = %route.agent_id, + provider = %state.config.agent.provider, + message_chars = params.content.chars().count(), + "chat.send accepted" + ); // 2. Get/create session and append user message { @@ -224,10 +261,17 @@ async fn handle_chat_send( let plugins = Arc::clone(&state.plugins); let budget_config = state.config.budgets.clone(); let session_lock = state.session_lock(&route.session_key).await; + let relay_request_id = request_id.clone(); + let runner_request_id = request_id.clone(); // Metering relay: intercepts events to record usage, then forwards to client. tokio::spawn(async move { while let Some(event) = meter_rx.recv().await { + debug!( + request_id = %relay_request_id, + event = event_kind(&event), + "relaying agent event" + ); // Record usage when we see a Usage event (T031/T033) if let AgentEvent::Usage { input_tokens, @@ -246,14 +290,26 @@ async fn handle_chat_send( ); } if tx.send(event).await.is_err() { + debug!(request_id = %relay_request_id, "client receiver dropped"); break; } } + debug!(request_id = %relay_request_id, "agent relay channel closed"); }); tokio::spawn(async move { // Serialize all processing for this session across connections. + debug!( + request_id = %runner_request_id, + session = %session_key, + "waiting for session lock" + ); let _session_guard = session_lock.lock().await; + debug!( + request_id = %runner_request_id, + session = %session_key, + "acquired session lock" + ); let runner = crate::agent::AgentRunner::new(); let result = runner @@ -268,10 +324,21 @@ async fn handle_chat_send( .await; if let Err(e) = result { + warn!( + request_id = %runner_request_id, + session = %session_key, + "provider run failed: {e}" + ); let _ = meter_tx .send(AgentEvent::Error(format!("provider error: {e}"))) .await; let _ = meter_tx.send(AgentEvent::Done).await; + } else { + debug!( + request_id = %runner_request_id, + session = %session_key, + "provider run completed" + ); } // Collect assistant response text and append to session diff --git a/src/gateway/server.rs b/src/gateway/server.rs index 7f3e759..f1157b0 100644 --- a/src/gateway/server.rs +++ b/src/gateway/server.rs @@ -12,7 +12,7 @@ use rust_embed::Embed; use std::collections::HashMap; use std::sync::Arc; use tokio::sync::{Mutex, RwLock}; -use tracing::{info, warn}; +use tracing::{debug, info, warn}; #[derive(Embed)] #[folder = "ui/dist/"] @@ -227,9 +227,24 @@ async fn handle_connection(mut socket: WebSocket, state: Arc) { user_content, mut rx, } => { + info!( + request_id = %id, + session = %session_key, + "starting websocket event stream" + ); // Stream AgentEvents as JSON frames to the client let mut assistant_text = String::new(); + let mut saw_done = false; + let mut sent_frames: usize = 0; while let Some(event) = rx.recv().await { + let event_kind = match &event { + AgentEvent::Text(_) => "text", + AgentEvent::ToolUse { .. } => "tool_use", + AgentEvent::ToolResult { .. } => "tool_result", + AgentEvent::Usage { .. } => "usage", + AgentEvent::Done => "done", + AgentEvent::Error(_) => "error", + }; let frame = match &event { AgentEvent::Text(text) => { assistant_text.push_str(text); @@ -301,10 +316,24 @@ async fn handle_connection(mut socket: WebSocket, state: Arc) { let frame_str = serde_json::to_string(&frame).unwrap_or_default(); if socket.send(Message::Text(frame_str.into())).await.is_err() { // Client disconnected mid-stream + debug!( + request_id = %id, + session = %session_key, + "client disconnected during stream send" + ); break; } + sent_frames += 1; + debug!( + request_id = %id, + session = %session_key, + sent_frames, + event = event_kind, + "sent stream frame" + ); if is_done { + saw_done = true; // Append collected assistant text to session if !assistant_text.is_empty() { let mut store = state.store.write().await; @@ -329,6 +358,14 @@ async fn handle_connection(mut socket: WebSocket, state: Arc) { break; } } + if !saw_done { + warn!( + request_id = %id, + session = %session_key, + sent_frames, + "stream ended without done" + ); + } } } } diff --git a/src/main.rs b/src/main.rs index 6c5185c..31ce07f 100644 --- a/src/main.rs +++ b/src/main.rs @@ -59,9 +59,10 @@ enum PluginAction { #[tokio::main] async fn main() -> anyhow::Result<()> { - tracing_subscriber::fmt() - .with_env_filter(EnvFilter::from_default_env()) - .init(); + let env_filter = EnvFilter::try_from_default_env() + .or_else(|_| EnvFilter::try_new("warn,exoclaw=info")) + .unwrap_or_else(|_| EnvFilter::new("warn")); + tracing_subscriber::fmt().with_env_filter(env_filter).init(); let cli = Cli::parse(); diff --git a/tests/protocol_test.rs b/tests/protocol_test.rs index 0f0f9e5..b3db0a7 100644 --- a/tests/protocol_test.rs +++ b/tests/protocol_test.rs @@ -38,6 +38,18 @@ async fn ping_returns_pong() { assert_eq!(parsed["result"], "pong"); } +#[tokio::test] +async fn ping_accepts_numeric_id() { + let state = build_state(ExoclawConfig::default()); + let result = handle_rpc(r#"{"id":1,"method":"ping"}"#, &state).await; + let RpcResult::Response(resp) = result else { + panic!("expected response"); + }; + let parsed: serde_json::Value = serde_json::from_str(&resp).unwrap(); + assert_eq!(parsed["id"], "1"); + assert_eq!(parsed["result"], "pong"); +} + #[tokio::test] async fn status_returns_version_plugins_sessions() { let state = build_state(ExoclawConfig::default()); diff --git a/ui/Cargo.toml b/ui/Cargo.toml index 34e87d0..d9e99b0 100644 --- a/ui/Cargo.toml +++ b/ui/Cargo.toml @@ -32,6 +32,8 @@ web-sys = { version = "0.3", features = [ js-sys = "0.3" futures = "0.3" log = "0.4" +console_log = "1" +console_error_panic_hook = "0.1" [dev-dependencies] wasm-bindgen-test = "0.3" diff --git a/ui/src/components/input.rs b/ui/src/components/input.rs index 77ace41..ff69257 100644 --- a/ui/src/components/input.rs +++ b/ui/src/components/input.rs @@ -1,6 +1,12 @@ use crate::state::ChatState; use crate::ws; +use futures::{FutureExt, StreamExt}; +use gloo_net::websocket::Message; +use gloo_timers::future::TimeoutFuture; use leptos::prelude::*; +use log::{debug, warn}; + +const STREAM_IDLE_TIMEOUT_MS: u32 = 60_000; #[component] pub fn MessageInput() -> impl IntoView { @@ -24,19 +30,37 @@ pub fn MessageInput() -> impl IntoView { match conn { Ok(mut conn) => { state.is_connected.set(true); - if let Err(e) = ws::send_chat(&mut conn.write, &content, 1).await { + let request_id = format!("web-{}", js_sys::Date::now() as u64); + if let Err(e) = ws::send_chat(&mut conn.write, &content, &request_id).await { state.add_error(e); state.is_streaming.set(false); state.is_connected.set(false); return; } - use futures::StreamExt; - use gloo_net::websocket::Message; + loop { + let next_msg = conn.read.next().fuse(); + let timeout = TimeoutFuture::new(STREAM_IDLE_TIMEOUT_MS).fuse(); + futures::pin_mut!(next_msg, timeout); + + let msg = futures::select! { + msg = next_msg => msg, + _ = timeout => { + warn!("stream timed out for request_id={request_id}"); + state.complete_message(); + state.add_error("Timed out waiting for model response.".to_string()); + state.is_streaming.set(false); + state.is_connected.set(false); + return; + } + }; - while let Some(msg) = conn.read.next().await { match msg { - Ok(Message::Text(text)) => { + Some(Ok(Message::Text(text))) => { + debug!( + "received text websocket frame (request_id={request_id}, bytes={})", + text.len() + ); if let Some(event) = ws::parse_event(&text) { match event { ws::StreamEvent::Text(t) => { @@ -62,14 +86,26 @@ pub fn MessageInput() -> impl IntoView { } } } - Ok(Message::Bytes(_)) => {} - Err(e) => { + Some(Ok(Message::Bytes(_))) => {} + Some(Err(e)) => { state.complete_message(); state.add_error(format!("WebSocket error: {}", e)); state.is_streaming.set(false); state.is_connected.set(false); break; } + None => { + warn!("websocket closed before stream completion"); + if state.is_streaming.get() { + state.complete_message(); + state.add_error( + "Connection closed before response completed.".to_string(), + ); + state.is_streaming.set(false); + } + state.is_connected.set(false); + break; + } } } } diff --git a/ui/src/lib.rs b/ui/src/lib.rs index c342ade..dabfa39 100644 --- a/ui/src/lib.rs +++ b/ui/src/lib.rs @@ -12,5 +12,8 @@ use wasm_bindgen::prelude::*; #[cfg(not(test))] #[wasm_bindgen(start)] pub fn start() { + console_error_panic_hook::set_once(); + let _ = console_log::init_with_level(log::Level::Info); + log::info!("exoclaw-ui boot"); leptos::mount::mount_to_body(App); } diff --git a/ui/src/ws.rs b/ui/src/ws.rs index f1cdd85..4b6832e 100644 --- a/ui/src/ws.rs +++ b/ui/src/ws.rs @@ -1,6 +1,7 @@ use futures::{FutureExt, SinkExt, StreamExt}; use gloo_net::websocket::{Message, futures::WebSocket}; use gloo_timers::future::TimeoutFuture; +use log::{debug, info, warn}; use serde_json::{Value, json}; #[derive(Debug, Clone, PartialEq, Eq)] @@ -21,7 +22,15 @@ fn ws_url() -> Result { } pub fn parse_event(msg: &str) -> Option { - let v: Value = serde_json::from_str(msg).ok()?; + let v: Value = match serde_json::from_str(msg) { + Ok(v) => v, + Err(e) => { + warn!("failed to parse websocket frame as JSON: {e}"); + return Some(StreamEvent::Error( + "invalid response from gateway".to_string(), + )); + } + }; // Handle JSON-RPC error responses (no "event" field, has "error" field) if let Some(err) = v.get("error") { @@ -54,7 +63,14 @@ pub fn parse_event(msg: &str) -> Option { .unwrap_or("unknown error"); Some(StreamEvent::Error(data.to_string())) } - _ => None, + "usage" | "tool_result" => { + debug!("ignoring non-render event frame: {event}"); + None + } + other => { + debug!("unknown stream event: {other}"); + None + } } } @@ -65,6 +81,7 @@ pub struct WsConnection { pub async fn connect(token: Option) -> Result { let url = ws_url()?; + info!("opening websocket connection to {url}"); let ws = WebSocket::open(&url).map_err(|e| format!("WebSocket open failed: {}", e))?; let (mut write, mut read) = ws.split(); @@ -94,6 +111,7 @@ pub async fn connect(token: Option) -> Result { if v.get("ok").and_then(|v| v.as_bool()) != Some(true) { return Err("authentication failed".to_string()); } + info!("websocket authenticated"); } else { // In no-token mode, loopback gateway sends {"ok":true,...} immediately. // Token-protected gateway waits for an auth frame and sends nothing. @@ -131,7 +149,10 @@ pub async fn connect(token: Option) -> Result { } Some(Some(Err(e))) => return Err(format!("read response failed: {}", e)), Some(None) => return Err("connection closed during handshake".to_string()), - None => return Err("authentication required".to_string()), + None => { + debug!("no handshake ack in timeout window; treating as auth-required"); + return Err("authentication required".to_string()); + } } } @@ -141,8 +162,12 @@ pub async fn connect(token: Option) -> Result { pub async fn send_chat( write: &mut futures::stream::SplitSink, content: &str, - id: u32, + id: &str, ) -> Result<(), String> { + info!( + "sending chat request id={id} chars={}", + content.chars().count() + ); let msg = json!({ "jsonrpc": "2.0", "id": id,