From 07c8ae8bc57bb5ef1d8bd2c81f2eb01f78ba80ec Mon Sep 17 00:00:00 2001 From: Deploy Bot Date: Fri, 10 Apr 2026 07:03:58 -0400 Subject: [PATCH] fix(websocket): handle broadcast Lagged error + ping keepalive MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When the broadcast channel's recv() returns Lagged (client fell behind), the server previously treated it as fatal and dropped the connection. The client immediately reconnected, creating a visible 2-4s freeze cycle. Fix: - Lagged error: skip missed frames with continue instead of break - Add 30s ping/pong keepalive to prevent reverse proxy idle timeouts (Caddy, nginx default 60s idle → drops long-lived WS connections) Tested with 11 ESP32-S3 nodes streaming at ~20fps per node: sustained 154 frames over 8 seconds, zero disconnects. Fixes dashboard freezing reported in multi-node deployments. --- .../wifi-densepose-sensing-server/src/main.rs | 19 +++++++++++++++++-- 1 file changed, 17 insertions(+), 2 deletions(-) diff --git a/rust-port/wifi-densepose-rs/crates/wifi-densepose-sensing-server/src/main.rs b/rust-port/wifi-densepose-rs/crates/wifi-densepose-sensing-server/src/main.rs index 029287c1c..12c482359 100644 --- a/rust-port/wifi-densepose-rs/crates/wifi-densepose-sensing-server/src/main.rs +++ b/rust-port/wifi-densepose-rs/crates/wifi-densepose-sensing-server/src/main.rs @@ -1883,6 +1883,10 @@ async fn handle_ws_client(mut socket: WebSocket, state: SharedState) { info!("WebSocket client connected (sensing)"); + // Ping keepalive to prevent proxy idle timeouts (Caddy, nginx, etc.) + let mut ping_interval = tokio::time::interval(std::time::Duration::from_secs(30)); + ping_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); + loop { tokio::select! { msg = rx.recv() => { @@ -1892,13 +1896,24 @@ async fn handle_ws_client(mut socket: WebSocket, state: SharedState) { break; } } - Err(_) => break, + // Lagged: client fell behind — skip missed frames, don't disconnect. + Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => { + tracing::debug!("WS client lagged by {n} frames, skipping"); + continue; + } + Err(_) => break, // channel closed + } + } + _ = ping_interval.tick() => { + if socket.send(Message::Ping(vec![].into())).await.is_err() { + break; } } msg = socket.recv() => { match msg { Some(Ok(Message::Close(_))) | None => break, - _ => {} // ignore client messages + Some(Ok(Message::Pong(_))) => {} // keepalive response + _ => {} // ignore other client messages } } }