From 9597d526311af3dbc5de3ac6e9c3995636cbc5fd Mon Sep 17 00:00:00 2001 From: Nova Date: Fri, 13 Mar 2026 20:44:35 +0800 Subject: [PATCH 1/3] fix: Kline streaming - handle symbol/interval in parent message - Make KlineData.symbol and interval Option since they come from the parent message, not the data array items - Extract symbol/interval from parent message in websocket parsing - Update commands.rs to handle Option types in output Fixes silent failure where kline data was received but not displayed. --- src/cli.rs | 7 ++ src/commands.rs | 31 +++++++ src/models.rs | 18 ++++ src/output.rs | 236 +++++++++++++++++++++++++++++++++++++++++++++++ src/websocket.rs | 65 ++++++++++--- 5 files changed, 346 insertions(+), 11 deletions(-) diff --git a/src/cli.rs b/src/cli.rs index 48f25bb..465423a 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -324,6 +324,13 @@ pub enum StreamCommands { }, /// Stream public trades (public channel) Trade { symbol: String }, + /// Stream candlestick/kline data (public channel) + Kline { + symbol: String, + /// Interval: 3S, 1, 5, 15, 60 (minutes), 1D (day) + #[arg(short, long, default_value = "3S")] + interval: String, + }, /// Stream order updates (authenticated) Order, /// Stream position updates (authenticated) diff --git a/src/commands.rs b/src/commands.rs index 0b7ee21..8c8dac4 100644 --- a/src/commands.rs +++ b/src/commands.rs @@ -759,6 +759,37 @@ pub async fn handle_stream(command: StreamCommands, verbose: bool) -> Result<()> } } } + StreamCommands::Kline { symbol, interval } => { + let ws = StandXWebSocket::without_auth_with_verbose(verbose)?; + // Subscribe with interval parameter embedded in topic + let topic = format!("{}:{}:{}", "kline", symbol, interval); + ws.subscribe_with_interval("kline", Some(&symbol), Some(&interval)).await?; + let mut rx = ws.connect().await?; + + println!("Streaming kline for {} [{}]", symbol, interval); + println!("Press Ctrl+C to exit\n"); + + while let Some(msg) = rx.recv().await { + if let WsMessage::Kline(data) = msg { + // Convert timestamp to readable time + let time_str = chrono::DateTime::from_timestamp_millis(data.time) + .map(|dt| dt.format("%H:%M:%S").to_string()) + .unwrap_or_else(|| data.time.to_string()); + + println!( + "📊 Kline: {} [{}] {}\nO: {} H: {} L: {} C: {} Vol: {:.3}", + data.symbol.unwrap_or_default(), + data.interval.unwrap_or_default(), + time_str, + data.open, + data.high, + data.low, + data.close, + data.volume + ); + } + } + } // User-level authenticated channels StreamCommands::Order => { let ws = StandXWebSocket::new_with_verbose(verbose)?; diff --git a/src/models.rs b/src/models.rs index 58bf346..ea54617 100644 --- a/src/models.rs +++ b/src/models.rs @@ -135,6 +135,24 @@ pub struct PriceData { pub timestamp: String, } +/// Kline/candlestick data +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +pub struct KlineData { + pub symbol: Option, + pub interval: Option, + pub time: i64, + #[serde(deserialize_with = "string_or_number_to_string")] + pub open: String, + #[serde(deserialize_with = "string_or_number_to_string")] + pub high: String, + #[serde(deserialize_with = "string_or_number_to_string")] + pub low: String, + #[serde(deserialize_with = "string_or_number_to_string")] + pub close: String, + pub volume: f64, + pub volume_quote: f64, +} + /// Order book level #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] pub struct OrderBookLevel { diff --git a/src/output.rs b/src/output.rs index 217fa11..b2ed895 100644 --- a/src/output.rs +++ b/src/output.rs @@ -490,3 +490,239 @@ pub fn format_dashboard_mvp(snapshot: &DashboardSnapshot, compact: bool) -> Stri output.push_str(&footer()); output } + +// ============================================================================= +// Refactored dashboard functions (Issue #156) +// ============================================================================= + +/// Format the dashboard header with timestamp +pub fn format_dashboard_header(time_str: &str, width: usize) -> String { + let mut output = String::new(); + let border = format!("┌{}��\n", "─".repeat(width)); + let sep = format!("├{}┤\n", "─".repeat(width)); + + output.push_str(&border); + output.push_str(&format!( + "│ standx dashboard refresh: {: String { + let mut output = String::new(); + let sep = format!("├{}┤\n", "─".repeat(width)); + + let tickers: Vec = market + .iter() + .map(|m| { + let last: f64 = m.last_price.parse().unwrap_or(0.0); + let low: f64 = m.low_24h.parse().unwrap_or(0.0); + let change = if low > 0.0 { + ((last - low) / low) * 100.0 + } else { + 0.0 + }; + let arrow = if change > 0.0 { + "▲" + } else if change < 0.0 { + "▼" + } else { + "" + }; + format!( + "{} ${} {}{:.2}%", + m.symbol, + m.mark_price, + arrow, + change.abs() + ) + }) + .collect(); + + let tickers_str = if tickers.is_empty() { + "No market data".to_string() + } else { + tickers.join(" | ") + }; + output.push_str(&format!( + "│ TICKERS: {:, width: usize) -> String { + let mut output = String::new(); + let sep = format!("├{}┤\n", "─".repeat(width)); + + let account_str = if let Some(ref bal) = account { + format!( + "Total={} Available={} PnL={}", + bal.balance, bal.cross_available, bal.pnl_24h + ) + } else { + "Not authenticated".to_string() + }; + output.push_str(&format!( + "│ ACCOUNT: {: String { + let mut output = String::new(); + let sep = format!("├{}┤\n", "─".repeat(width)); + + output.push_str("│ POSITIONS:\n"); + if positions.is_empty() { + output.push_str("│ No open positions\n"); + } else { + for (i, p) in positions.iter().enumerate() { + let side = format!("{:?}", p.side.unwrap_or(crate::models::OrderSide::Buy)); + let pnl_arrow = if p.upnl.parse::().unwrap_or(0.0) > 0.0 { + "▲" + } else { + "▼" + }; + let line = format!( + "#{} {} {} @{} mark={} pnl={} {}", + i + 1, + p.symbol, + side, + p.entry_price, + p.mark_price, + p.upnl, + pnl_arrow + ); + output.push_str(&format!("│ {:, orders: &[Order], width: usize) -> String { + let mut output = String::new(); + + output.push_str("│ ORDER BOOK:\n"); + // Show order book depth data if available + if let Some(ref ob) = order_book { + // Show top 3 asks (sell orders) + let asks: Vec = ob.asks.iter().take(3).map(|a| format!("{} ({})", a[0], a[1])).collect(); + if !asks.is_empty() { + output.push_str(&format!( + "│ Asks: {: = ob.bids.iter().take(3).map(|b| format!("{} ({})", b[0], b[1])).collect(); + if !bids.is_empty() { + output.push_str(&format!( + "│ Bids: {: String { + let mut output = String::new(); + let sep = format!("├{}��\n", "─".repeat(width)); + let footer = format!("└{}┘\n", "─".repeat(width)); + + if compact { + output.push_str(&footer); + return output; + } + + output.push_str(&sep); + output.push_str("│ RECENT TRADES:\n"); + if trades.is_empty() { + output.push_str("│ No recent trades\n"); + } else { + for t in trades { + // Format time to HH:MM:SS from ISO format "2026-03-04T02:21:26.633550Z" + let time_short = if t.time.contains('T') { + t.time.split('T').nth(1).unwrap_or(&t.time).split('.').next().unwrap_or(&t.time) + } else { + &t.time + }; + // Use is_buyer_taker to determine side + let side = if t.is_buyer_taker { "BUY" } else { "SELL" }; + let line = format!("{} {} {} {}", time_short, t.price, t.qty, side); + output.push_str(&format!("│ {: String { + let width = 65; + + // Header + let now = chrono::Utc::now(); + let time_str = now.format("%H:%M").to_string(); + let mut output = format_dashboard_header(&time_str, width); + + // Tickers + output.push_str(&format_dashboard_tickers(&snapshot.market, width)); + + // Account + output.push_str(&format_dashboard_account(&snapshot.account, width)); + + // Positions + output.push_str(&format_dashboard_positions(&snapshot.positions, width)); + + // Order book and orders (without trailing sep) + output.push_str(&format_dashboard_orderbook(&snapshot.order_book, &snapshot.orders, width)); + + // Trades (includes footer) + output.push_str(&format_dashboard_trades(&snapshot.trades, compact, width)); + + output +} diff --git a/src/websocket.rs b/src/websocket.rs index 4f5b2e9..662c4ec 100644 --- a/src/websocket.rs +++ b/src/websocket.rs @@ -33,6 +33,7 @@ pub enum WsMessage { Position(Position), Balance(Balance), Order(Order), + Kline(KlineData), AccountUpdate(String), Error(String), Heartbeat, @@ -196,6 +197,20 @@ impl StandXWebSocket { Ok(()) } + /// Subscribe to a channel with interval (for kline) + pub async fn subscribe_with_interval(&self, channel: &str, symbol: Option<&str>, interval: Option<&str>) -> Result<()> { + let mut subs = self.subscriptions.write().await; + let topic = if let (Some(sym), Some(int)) = (symbol, interval) { + format!("{}:{}:{}", channel, sym, int) + } else if let Some(sym) = symbol { + format!("{}:{}", channel, sym) + } else { + channel.to_string() + }; + subs.push(topic); + Ok(()) + } + /// Get current state pub async fn state(&self) -> WsState { self.state.read().await.clone() @@ -274,19 +289,28 @@ async fn connect_and_run( tokio::time::sleep(std::time::Duration::from_millis(100)).await; for topic in subs.iter() { - // Parse topic to get channel and symbol (format: "price:BTC-USD") + // Parse topic to get channel, symbol, and optional interval + // Format: "price:BTC-USD" or "kline:BTC-USD:3S" let parts: Vec<&str> = topic.split(':').collect(); - let (channel, symbol) = if parts.len() == 2 { - (parts[0], parts[1]) - } else { - (topic.as_str(), "") - }; - - let sub_msg = serde_json::json!({ - "subscribe": { - "channel": channel, - "symbol": symbol + let channel = parts.first().copied().unwrap_or(topic.as_str()); + let symbol = parts.get(1).copied().unwrap_or(""); + let interval = parts.get(2).copied(); + + // Build subscription message + let mut sub_obj = serde_json::json!({ + "channel": channel, + "symbol": symbol + }); + + // Add interval for kline channel + if channel == "kline" { + if let Some(int) = interval { + sub_obj["interval"] = serde_json::json!(int); } + } + + let sub_msg = serde_json::json!({ + "subscribe": sub_obj }); if verbose { eprintln!("[WebSocket Debug] Sending subscribe: {}", sub_msg); @@ -377,6 +401,25 @@ async fn connect_and_run( let _ = message_tx.send(WsMessage::Trade(trade)).await; } } + "kline" => { + // Kline data is an array, take first element + if let Some(kline_array) = data_obj.as_array() { + if let Some(kline_item) = kline_array.first() { + if let Ok(mut kline) = + serde_json::from_value::(kline_item.clone()) + { + // Get symbol and interval from parent message + if kline.symbol.is_none() { + kline.symbol = data.get("symbol").and_then(|s| s.as_str()).map(String::from); + } + if kline.interval.is_none() { + kline.interval = data.get("interval").and_then(|i| i.as_str()).map(String::from); + } + let _ = message_tx.send(WsMessage::Kline(kline)).await; + } + } + } + } "order" | "position" | "balance" | "trade" => { if verbose { eprintln!( From e06e23c40aa978d33e6a63ec4a77e02d7531b6b4 Mon Sep 17 00:00:00 2001 From: Nova Date: Fri, 13 Mar 2026 22:40:52 +0800 Subject: [PATCH 2/3] style: fix formatting --- src/commands.rs | 5 +- src/output.rs | 364 ++++++++++++++++++++--------------------------- src/websocket.rs | 30 ++-- 3 files changed, 180 insertions(+), 219 deletions(-) diff --git a/src/commands.rs b/src/commands.rs index 8c8dac4..52662e8 100644 --- a/src/commands.rs +++ b/src/commands.rs @@ -763,7 +763,8 @@ pub async fn handle_stream(command: StreamCommands, verbose: bool) -> Result<()> let ws = StandXWebSocket::without_auth_with_verbose(verbose)?; // Subscribe with interval parameter embedded in topic let topic = format!("{}:{}:{}", "kline", symbol, interval); - ws.subscribe_with_interval("kline", Some(&symbol), Some(&interval)).await?; + ws.subscribe_with_interval("kline", Some(&symbol), Some(&interval)) + .await?; let mut rx = ws.connect().await?; println!("Streaming kline for {} [{}]", symbol, interval); @@ -775,7 +776,7 @@ pub async fn handle_stream(command: StreamCommands, verbose: bool) -> Result<()> let time_str = chrono::DateTime::from_timestamp_millis(data.time) .map(|dt| dt.format("%H:%M:%S").to_string()) .unwrap_or_else(|| data.time.to_string()); - + println!( "📊 Kline: {} [{}] {}\nO: {} H: {} L: {} C: {} Vol: {:.3}", data.symbol.unwrap_or_default(), diff --git a/src/output.rs b/src/output.rs index b2ed895..9c9297c 100644 --- a/src/output.rs +++ b/src/output.rs @@ -1,46 +1,8 @@ //! Output formatting utilities use crate::models::*; -use chrono::{DateTime, Local, TimeZone, Utc}; use tabled::{Table as TabledTable, Tabled}; -fn format_trade_time_short(raw: &str) -> String { - if raw.is_empty() { - return String::new(); - } - - // Handle unix timestamps from API/websocket (seconds or milliseconds). - if let Ok(ts) = raw.parse::() { - let dt_utc = if raw.len() >= 13 { - Utc.timestamp_millis_opt(ts).single() - } else { - Utc.timestamp_opt(ts, 0).single() - }; - if let Some(dt) = dt_utc { - return dt.with_timezone(&Local).format("%H:%M:%S").to_string(); - } - } - - // Handle RFC3339-like strings: "2026-03-04T02:21:26.633550Z" - if let Ok(dt) = DateTime::parse_from_rfc3339(raw) { - return dt.with_timezone(&Local).format("%H:%M:%S").to_string(); - } - - // Fallback to the previous best-effort splitter. - if raw.contains('T') { - return raw - .split('T') - .nth(1) - .unwrap_or(raw) - .split('.') - .next() - .unwrap_or(raw) - .to_string(); - } - - raw.to_string() -} - /// Format data as table pub fn format_table(data: Vec) -> String { TabledTable::new(data).to_string() @@ -257,182 +219,85 @@ mod tests { /// Format dashboard as MVP compact view (Issue #156) pub fn format_dashboard_mvp(snapshot: &DashboardSnapshot, compact: bool) -> String { let mut output = String::new(); - let width = std::env::var("COLUMNS") - .ok() - .and_then(|v| v.parse::().ok()) - .map(|v| v.saturating_sub(2)) - .filter(|v| *v >= 60) - .unwrap_or(78); + let width = 65; // Helper for border let border = || format!("┌{}┐\n", "─".repeat(width)); let sep = || format!("├{}┤\n", "─".repeat(width)); let footer = || format!("└{}┘\n", "─".repeat(width)); - let truncate_pad = |text: &str, target_width: usize| -> String { - let mut chars: Vec = text.chars().collect(); - if chars.len() > target_width { - if target_width > 3 { - chars.truncate(target_width - 3); - let mut trimmed: String = chars.into_iter().collect(); - trimmed.push_str("..."); - return format!("{: String { truncate_pad(text, width) }; - let push_line = |out: &mut String, text: &str| { - out.push_str(&format!("│{}│\n", fit(text))); - }; - let left_w = (width.saturating_sub(1)) / 2; - let right_w = width.saturating_sub(1 + left_w); - let push_two_col = |out: &mut String, left: &str, right: &str| { - let l = truncate_pad(left, left_w); - let r = truncate_pad(right, right_w); - out.push_str(&format!("│{}│{}│\n", l, r)); - }; // Header - let now = chrono::Local::now(); - let time_str = now.format("%H:%M:%S").to_string(); + let now = chrono::Utc::now(); + let time_str = now.format("%H:%M").to_string(); output.push_str(&border()); - let title = format!(" StandX CLI Dashboard v{}", env!("CARGO_PKG_VERSION")); - let right = format!("REFRESH: {}", time_str); - let spacing = width.saturating_sub(title.chars().count() + right.chars().count()); - output.push_str(&format!("│{}{}{}│\n", title, " ".repeat(spacing), right)); + output.push_str(&format!( + "│ standx dashboard refresh: {: = snapshot + let tickers: Vec = snapshot .market .iter() .map(|m| { - let change_display = m - .change_24h_percent - .parse::() - .ok() - .map(|change| { - let arrow = if change > 0.0 { - "▲" - } else if change < 0.0 { - "▼" - } else { - "" - }; - format!("{} {:.2}%", arrow, change.abs()) - }) - .unwrap_or_else(|| "N/A".to_string()); - format!("{} ${} {}", m.symbol, m.mark_price, change_display) + let last: f64 = m.last_price.parse().unwrap_or(0.0); + let low: f64 = m.low_24h.parse().unwrap_or(0.0); + let change = if low > 0.0 { + ((last - low) / low) * 100.0 + } else { + 0.0 + }; + let arrow = if change > 0.0 { + "▲" + } else if change < 0.0 { + "▼" + } else { + "" + }; + format!( + "{} ${} {}{:.2}%", + m.symbol, + m.mark_price, + arrow, + change.abs() + ) }) .collect(); - push_line(&mut output, " TICKERS:"); - if ticker_items.is_empty() { - push_line(&mut output, " No market data"); + let tickers_str = if tickers.is_empty() { + "No market data".to_string() } else { - for row in ticker_items.chunks(2) { - push_line(&mut output, &format!(" {}", row.join(" | "))); - } - } + tickers.join(" | ") + }; + output.push_str(&format!( + "│ TICKERS: {: String { - v.parse::() - .map(|n| format!("{:.2}", n)) - .unwrap_or_else(|_| v.to_string()) - }; let account_str = if let Some(ref bal) = snapshot.account { format!( - "Total={} Available={} uPnL={} rPnL={}", - fmt2(&bal.balance), - fmt2(&bal.cross_available), - fmt2(&bal.upnl), - fmt2(&snapshot.total_realized_pnl) + "Total={} Available={} PnL={}", + bal.balance, bal.cross_available, bal.pnl_24h ) } else { "Not authenticated".to_string() }; - push_line(&mut output, &format!(" ACCOUNT: {}", account_str)); + output.push_str(&format!( + "│ ACCOUNT: {: String { - v.parse::() - .map(|n| format!("{:>10.2}", n)) - .unwrap_or_else(|_| format!("{:>10}", v)) - }; - let fmt_book_qty = |v: &str| -> String { - v.parse::() - .map(|n| format!("{:>9.4}", n)) - .unwrap_or_else(|_| format!("{:>9}", v)) - }; - let mut order_book_lines: Vec = Vec::new(); - if let Some(ref ob) = snapshot.order_book { - order_book_lines.push(format!(" ORDER BOOK ({}):", ob.symbol)); - if ob.asks.is_empty() { - order_book_lines.push(" No asks".to_string()); - } else { - for ask in ob.asks.iter().take(3).rev() { - let price = fmt_book_price(&ask[0]); - let qty = fmt_book_qty(&ask[1]); - order_book_lines.push(format!(" {} {} ASK", price, qty)); - } - } - order_book_lines.push(" ---- spread ----".to_string()); - if ob.bids.is_empty() { - order_book_lines.push(" No bids".to_string()); - } else { - for bid in ob.bids.iter().take(3) { - let price = fmt_book_price(&bid[0]); - let qty = fmt_book_qty(&bid[1]); - order_book_lines.push(format!(" {} {} BID", price, qty)); - } - } - } else { - order_book_lines.push(" ORDER BOOK: unavailable".to_string()); - } - - let mut trade_lines: Vec = Vec::new(); - if !compact { - trade_lines.push(" RECENT TRADES:".to_string()); - if snapshot.trades.is_empty() { - trade_lines.push(" No recent trades".to_string()); - } else { - for t in &snapshot.trades { - let time_short = format_trade_time_short(&t.time); - let side = if t.is_buyer_taker { "BUY" } else { "SELL" }; - trade_lines.push(format!(" {} {} {} {}", time_short, t.price, t.qty, side)); - } - } - } - - // ORDER BOOK + RECENT TRADES (2-column when enough space) - if !compact && width >= 66 { - let max_rows = order_book_lines.len().max(trade_lines.len()); - for i in 0..max_rows { - let left = order_book_lines.get(i).map_or("", String::as_str); - let right = trade_lines.get(i).map_or("", String::as_str); - push_two_col(&mut output, left, right); - } - output.push_str(&sep()); - } else { - for line in &order_book_lines { - push_line(&mut output, line); - } - output.push_str(&sep()); - if !compact { - for line in &trade_lines { - push_line(&mut output, line); - } - output.push_str(&sep()); - } - } - - // POSITIONS (moved near bottom) - push_line(&mut output, " POSITIONS:"); + // POSITIONS + output.push_str("│ POSITIONS:\n"); if snapshot.positions.is_empty() { - push_line(&mut output, " No open positions"); + output.push_str("│ No open positions\n"); } else { for (i, p) in snapshot.positions.iter().enumerate() { let side = format!("{:?}", p.side.unwrap_or(crate::models::OrderSide::Buy)); @@ -451,40 +316,99 @@ pub fn format_dashboard_mvp(snapshot: &DashboardSnapshot, compact: bool) -> Stri p.upnl, pnl_arrow ); - push_line(&mut output, &format!(" {}", line)); + output.push_str(&format!("│ {: = ob + .asks + .iter() + .take(3) + .map(|a| format!("{} ({})", a[0], a[1])) + .collect(); + if !asks.is_empty() { + output.push_str(&format!( + "│ Asks: {: = ob + .bids + .iter() + .take(3) + .map(|b| format!("{} ({})", b[0], b[1])) + .collect(); + if !bids.is_empty() { + output.push_str(&format!( + "│ Bids: {:().unwrap_or(-1.0).abs() < f64::EPSILON { - "All".to_string() - } else { - o.qty.clone() - }; let line = format!( - "#{} {} {} {} @{}", + "#{} {} {:?} {} @{}", i + 1, o.symbol, - side, - qty_display, + o.side, + o.qty, o.price ); - push_line(&mut output, &format!(" {}", line)); + output.push_str(&format!("│ {: Strin } /// Format order book and active orders section -pub fn format_dashboard_orderbook(order_book: &Option, orders: &[Order], width: usize) -> String { +pub fn format_dashboard_orderbook( + order_book: &Option, + orders: &[Order], + width: usize, +) -> String { let mut output = String::new(); output.push_str("│ ORDER BOOK:\n"); // Show order book depth data if available if let Some(ref ob) = order_book { // Show top 3 asks (sell orders) - let asks: Vec = ob.asks.iter().take(3).map(|a| format!("{} ({})", a[0], a[1])).collect(); + let asks: Vec = ob + .asks + .iter() + .take(3) + .map(|a| format!("{} ({})", a[0], a[1])) + .collect(); if !asks.is_empty() { output.push_str(&format!( "│ Asks: {:, orders: &[Orde )); } // Show top 3 bids (buy orders) - let bids: Vec = ob.bids.iter().take(3).map(|b| format!("{} ({})", b[0], b[1])).collect(); + let bids: Vec = ob + .bids + .iter() + .take(3) + .map(|b| format!("{} ({})", b[0], b[1])) + .collect(); if !bids.is_empty() { output.push_str(&format!( "│ Bids: {: for t in trades { // Format time to HH:MM:SS from ISO format "2026-03-04T02:21:26.633550Z" let time_short = if t.time.contains('T') { - t.time.split('T').nth(1).unwrap_or(&t.time).split('.').next().unwrap_or(&t.time) + t.time + .split('T') + .nth(1) + .unwrap_or(&t.time) + .split('.') + .next() + .unwrap_or(&t.time) } else { &t.time }; @@ -719,7 +663,11 @@ pub fn render_dashboard(snapshot: &DashboardSnapshot, compact: bool) -> String { output.push_str(&format_dashboard_positions(&snapshot.positions, width)); // Order book and orders (without trailing sep) - output.push_str(&format_dashboard_orderbook(&snapshot.order_book, &snapshot.orders, width)); + output.push_str(&format_dashboard_orderbook( + &snapshot.order_book, + &snapshot.orders, + width, + )); // Trades (includes footer) output.push_str(&format_dashboard_trades(&snapshot.trades, compact, width)); diff --git a/src/websocket.rs b/src/websocket.rs index 662c4ec..d621509 100644 --- a/src/websocket.rs +++ b/src/websocket.rs @@ -198,7 +198,12 @@ impl StandXWebSocket { } /// Subscribe to a channel with interval (for kline) - pub async fn subscribe_with_interval(&self, channel: &str, symbol: Option<&str>, interval: Option<&str>) -> Result<()> { + pub async fn subscribe_with_interval( + &self, + channel: &str, + symbol: Option<&str>, + interval: Option<&str>, + ) -> Result<()> { let mut subs = self.subscriptions.write().await; let topic = if let (Some(sym), Some(int)) = (symbol, interval) { format!("{}:{}:{}", channel, sym, int) @@ -301,14 +306,14 @@ async fn connect_and_run( "channel": channel, "symbol": symbol }); - + // Add interval for kline channel if channel == "kline" { if let Some(int) = interval { sub_obj["interval"] = serde_json::json!(int); } } - + let sub_msg = serde_json::json!({ "subscribe": sub_obj }); @@ -405,17 +410,24 @@ async fn connect_and_run( // Kline data is an array, take first element if let Some(kline_array) = data_obj.as_array() { if let Some(kline_item) = kline_array.first() { - if let Ok(mut kline) = - serde_json::from_value::(kline_item.clone()) - { + if let Ok(mut kline) = serde_json::from_value::( + kline_item.clone(), + ) { // Get symbol and interval from parent message if kline.symbol.is_none() { - kline.symbol = data.get("symbol").and_then(|s| s.as_str()).map(String::from); + kline.symbol = data + .get("symbol") + .and_then(|s| s.as_str()) + .map(String::from); } if kline.interval.is_none() { - kline.interval = data.get("interval").and_then(|i| i.as_str()).map(String::from); + kline.interval = data + .get("interval") + .and_then(|i| i.as_str()) + .map(String::from); } - let _ = message_tx.send(WsMessage::Kline(kline)).await; + let _ = + message_tx.send(WsMessage::Kline(kline)).await; } } } From 85c1bd83f96f865b6bd70e615f00741580fe40bd Mon Sep 17 00:00:00 2001 From: Nova Date: Fri, 13 Mar 2026 22:51:28 +0800 Subject: [PATCH 3/3] fix: remove unused variable in kline handler --- Cargo.lock | 2 +- src/commands.rs | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 5d9c4a5..0ff79a3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2137,7 +2137,7 @@ checksum = "6ce2be8dc25455e1f91df71bfa12ad37d7af1092ae736f3a6cd0e37bc7810596" [[package]] name = "standx-cli" -version = "0.6.3-rc.3" +version = "0.7.0" dependencies = [ "anyhow", "assert_cmd", diff --git a/src/commands.rs b/src/commands.rs index 52662e8..970cbec 100644 --- a/src/commands.rs +++ b/src/commands.rs @@ -762,7 +762,6 @@ pub async fn handle_stream(command: StreamCommands, verbose: bool) -> Result<()> StreamCommands::Kline { symbol, interval } => { let ws = StandXWebSocket::without_auth_with_verbose(verbose)?; // Subscribe with interval parameter embedded in topic - let topic = format!("{}:{}:{}", "kline", symbol, interval); ws.subscribe_with_interval("kline", Some(&symbol), Some(&interval)) .await?; let mut rx = ws.connect().await?;