From 5110b1b2a982b971d854f21716a261fa04415114 Mon Sep 17 00:00:00 2001 From: auraroom Date: Thu, 28 May 2026 21:53:31 +0100 Subject: [PATCH 1/8] feat(stream): add exponential backoff for event stream reconnects Introduce a Backoff helper used when Soroban getEvents polling fails so monitor can retry with increasing delays instead of hammering the RPC. Co-authored-by: Cursor --- src/utils/stream.rs | 32 ++++++++++++++++++++++++++++++-- 1 file changed, 30 insertions(+), 2 deletions(-) diff --git a/src/utils/stream.rs b/src/utils/stream.rs index b887657f..fef35392 100644 --- a/src/utils/stream.rs +++ b/src/utils/stream.rs @@ -1,5 +1,4 @@ use anyhow::{Context, Result}; -use rand::Rng; use serde::Deserialize; use std::thread; use std::time::Duration; @@ -13,6 +12,36 @@ pub struct SorobanEventStream { backoff: Backoff, } +#[derive(Debug, Clone)] +struct Backoff { + attempt: u32, + base_ms: u64, + max_ms: u64, +} + +impl Default for Backoff { + fn default() -> Self { + Self { + attempt: 0, + base_ms: 500, + max_ms: 30_000, + } + } +} + +impl Backoff { + fn reset(&mut self) { + self.attempt = 0; + } + + fn next_delay(&mut self) -> Duration { + let exp = self.attempt.min(6); + self.attempt = self.attempt.saturating_add(1); + let ms = (self.base_ms.saturating_mul(1_u64 << exp)).min(self.max_ms); + Duration::from_millis(ms) + } +} + impl SorobanEventStream { pub fn new(rpc_url: String, contract_id: String) -> Self { Self { @@ -101,7 +130,6 @@ pub struct SorobanEvent { pub ledger: u32, pub id: String, #[serde(default)] - #[allow(dead_code)] pub topic: Vec, pub value: serde_json::Value, } From f65ad965c9308a02d7e444730416cb530393a474 Mon Sep 17 00:00:00 2001 From: auraroom Date: Thu, 28 May 2026 21:53:40 +0100 Subject: [PATCH 2/8] feat(stream): add EventStreamFilters for Soroban getEvents RPC Add filter struct and builder methods so callers can set event type, topic segments, and value matching before polling getEvents. Co-authored-by: Cursor --- src/utils/stream.rs | 114 ++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 109 insertions(+), 5 deletions(-) diff --git a/src/utils/stream.rs b/src/utils/stream.rs index fef35392..f2440785 100644 --- a/src/utils/stream.rs +++ b/src/utils/stream.rs @@ -1,8 +1,18 @@ use anyhow::{Context, Result}; +use base64::{engine::general_purpose::STANDARD as BASE64, Engine}; use serde::Deserialize; +use stellar_xdr::curr::{Limited, Limits, ScSymbol, ScVal, WriteXdr}; use std::thread; use std::time::Duration; +/// RPC and client-side filters for Soroban `getEvents`. +#[derive(Debug, Clone, Default)] +pub struct EventStreamFilters { + pub event_type: Option, + pub topic_segments: Option>, + pub value_match: Option, +} + #[derive(Debug, Clone)] pub struct SorobanEventStream { rpc_url: String, @@ -10,6 +20,7 @@ pub struct SorobanEventStream { cursor: Option, poll_interval: Duration, backoff: Backoff, + filters: EventStreamFilters, } #[derive(Debug, Clone)] @@ -50,6 +61,7 @@ impl SorobanEventStream { cursor: None, poll_interval: Duration::from_secs(2), backoff: Backoff::default(), + filters: EventStreamFilters::default(), } } @@ -58,16 +70,34 @@ impl SorobanEventStream { self } + pub fn with_filters(mut self, filters: EventStreamFilters) -> Self { + self.filters = filters; + self + } + + pub fn with_event_type(mut self, event_type: impl Into) -> Self { + self.filters.event_type = Some(event_type.into()); + self + } + + pub fn with_topic_segments(mut self, segments: Vec) -> Self { + self.filters.topic_segments = Some(segments); + self + } + + pub fn with_value_match(mut self, pattern: impl Into) -> Self { + self.filters.value_match = Some(pattern.into()); + self + } + pub fn next_batch(&mut self) -> Result> { + let filter = self.build_rpc_filter(); let request = serde_json::json!({ "jsonrpc": "2.0", "id": 1, "method": "getEvents", "params": { - "filters": [{ - "type": "contract", - "contractIds": [self.contract_id], - }], + "filters": [filter], "pagination": { "cursor": self.cursor, "limit": 10 @@ -98,7 +128,14 @@ impl SorobanEventStream { self.cursor = result.cursor; self.backoff.reset(); - Ok(result.events) + + let events = result + .events + .into_iter() + .filter(|event| event_matches_value(event, &self.filters)) + .collect(); + + Ok(events) } pub fn sleep(&self) { @@ -108,6 +145,73 @@ impl SorobanEventStream { pub fn sleep_backoff(&mut self) { thread::sleep(self.backoff.next_delay()); } + + fn build_rpc_filter(&self) -> serde_json::Value { + let event_type = self + .filters + .event_type + .as_deref() + .unwrap_or("contract"); + + let mut filter = serde_json::json!({ + "type": event_type, + "contractIds": [self.contract_id], + }); + + if let Some(ref segments) = self.filters.topic_segments { + let encoded: Result> = + segments.iter().map(|s| encode_topic_segment(s)).collect(); + if let Ok(topic_row) = encoded { + if !topic_row.is_empty() { + filter["topics"] = serde_json::json!([topic_row]); + } + } + } + + filter + } +} + +fn event_matches_value(event: &SorobanEvent, filters: &EventStreamFilters) -> bool { + let Some(ref pattern) = filters.value_match else { + return true; + }; + if pattern.is_empty() { + return true; + } + let haystack = event.value.to_string().to_lowercase(); + haystack.contains(&pattern.to_lowercase()) +} + +fn encode_topic_segment(segment: &str) -> Result { + let trimmed = segment.trim(); + if trimmed.is_empty() { + anyhow::bail!("topic segment cannot be empty"); + } + if trimmed == "*" || trimmed == "**" { + return Ok(trimmed.to_string()); + } + if looks_like_base64(trimmed) { + return Ok(trimmed.to_string()); + } + + let symbol = ScSymbol( + trimmed + .as_bytes() + .try_into() + .with_context(|| format!("invalid topic symbol '{}'", trimmed))?, + ); + let scval = ScVal::Symbol(symbol); + let mut bytes = Vec::new(); + scval.write_xdr(&mut Limited::new(&mut bytes, Limits::none()))?; + Ok(BASE64.encode(bytes)) +} + +fn looks_like_base64(value: &str) -> bool { + value.len() >= 8 + && value + .chars() + .all(|c| c.is_ascii_alphanumeric() || c == '+' || c == '/' || c == '=') } #[derive(Debug, Deserialize)] From a169f2f42c8d56dd7636a2421962b60392b34f32 Mon Sep 17 00:00:00 2001 From: auraroom Date: Thu, 28 May 2026 21:53:43 +0100 Subject: [PATCH 3/8] feat(monitor): add --type, --topic, --value, and --balance-alert flags Extend the monitor CLI with Soroban event filter options and a watchman threshold for low wallet balance alerts. Co-authored-by: Cursor --- src/commands/monitor.rs | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/src/commands/monitor.rs b/src/commands/monitor.rs index 99646e7e..e4fcad9c 100644 --- a/src/commands/monitor.rs +++ b/src/commands/monitor.rs @@ -20,6 +20,18 @@ pub struct MonitorArgs { #[arg(long)] pub follow: bool, + /// Soroban event type filter (contract, system, diagnostic) + #[arg(long = "type")] + pub event_type: Option, + + /// Topic filter: comma-separated segment matchers (* wildcards supported) + #[arg(long)] + pub topic: Option, + + /// Match emitted event value (substring match on JSON payload) + #[arg(long)] + pub value: Option, + /// Wallet name from starforge config to monitor #[arg(long)] pub wallet: Option, @@ -28,6 +40,10 @@ pub struct MonitorArgs { #[arg(long)] pub threshold: Option, + /// Alert when wallet XLM balance drops below this amount (watchman) + #[arg(long)] + pub balance_alert: Option, + /// Network to use (overrides config) #[arg(long)] pub network: Option, From b2d0913bc163a0ddb2433b2821a6fb9f926955a6 Mon Sep 17 00:00:00 2001 From: auraroom Date: Thu, 28 May 2026 21:53:59 +0100 Subject: [PATCH 4/8] feat(notifications): add alert helper for watchman notifications Emit a prominent terminal warning, bell, and OS notification when the monitor watchman detects a low wallet balance. Co-authored-by: Cursor --- src/utils/notifications.rs | 33 +++++++++++++++++++++++++++++++++ 1 file changed, 33 insertions(+) diff --git a/src/utils/notifications.rs b/src/utils/notifications.rs index aa919f9b..e19ca9c4 100644 --- a/src/utils/notifications.rs +++ b/src/utils/notifications.rs @@ -1,4 +1,5 @@ use colored::*; +use std::process::Command; pub fn info(message: &str) { println!(" {} {}", "•".bright_blue(), message); @@ -11,3 +12,35 @@ pub fn success(message: &str) { pub fn warn(message: &str) { eprintln!(" {} {}", "!".yellow().bold(), message); } + +/// Terminal alert with optional OS notification (watchman). +pub fn alert(message: &str) { + eprintln!( + "\n {} {}\n", + "⚠ ALERT:".red().bold(), + message.bright_white().bold() + ); + print!("\x07"); + let _ = std::io::Write::flush(&mut std::io::stdout()); + try_system_notification(message); +} + +fn try_system_notification(message: &str) { + let escaped = message.replace('\\', "\\\\").replace('"', "\\\""); + + #[cfg(target_os = "macos")] + { + let script = format!( + "display notification \"{}\" with title \"StarForge\"", + escaped + ); + let _ = Command::new("osascript").args(["-e", &script]).status(); + } + + #[cfg(target_os = "linux")] + { + let _ = Command::new("notify-send") + .args(["StarForge", message]) + .status(); + } +} From 4cafb9bc1d2003776035a1eef77063d90fb7f4e6 Mon Sep 17 00:00:00 2001 From: auraroom Date: Thu, 28 May 2026 21:54:08 +0100 Subject: [PATCH 5/8] feat(monitor): route advanced filter flags into contract monitoring Pass --type, --topic, --value, and --follow from the CLI into the contract event monitor so Soroban getEvents filters can be applied server-side. Co-authored-by: Cursor --- src/commands/monitor.rs | 28 +++++++++++++++++++++------- 1 file changed, 21 insertions(+), 7 deletions(-) diff --git a/src/commands/monitor.rs b/src/commands/monitor.rs index e4fcad9c..d4289d2e 100644 --- a/src/commands/monitor.rs +++ b/src/commands/monitor.rs @@ -1,4 +1,7 @@ -use crate::utils::{config, horizon, notifications, print as p, soroban, stream::SorobanEventStream}; +use crate::utils::{ + config, horizon, notifications, print as p, soroban, + stream::{EventStreamFilters, SorobanEventStream}, +}; use anyhow::Result; use clap::Args; use std::sync::{ @@ -65,12 +68,23 @@ pub fn handle(args: MonitorArgs) -> Result<()> { println!(); match (&args.contract, &args.wallet) { - (Some(contract_id), None) => { - monitor_contract(contract_id, args.events.as_deref(), network, args.interval) - } - (None, Some(wallet_name)) => { - monitor_wallet(wallet_name, args.threshold, network, args.interval) - } + (Some(contract_id), None) => monitor_contract( + contract_id, + args.events.as_deref(), + args.event_type.as_deref(), + args.topic.as_deref(), + args.value.as_deref(), + network, + args.interval, + args.follow, + ), + (None, Some(wallet_name)) => monitor_wallet( + wallet_name, + args.threshold, + args.balance_alert, + network, + args.interval, + ), _ => anyhow::bail!("Specify either --contract or --wallet (but not both)"), } } From 6c976e2e8d33c990e817f806ab04c92963d75dbf Mon Sep 17 00:00:00 2001 From: auraroom Date: Thu, 28 May 2026 21:54:24 +0100 Subject: [PATCH 6/8] feat(monitor): apply Soroban getEvents filters for contract streaming Build EventStreamFilters from --type, --topic, and --value, then poll the RPC with server-side filters plus legacy --events matching. Co-authored-by: Cursor --- src/commands/monitor.rs | 82 ++++++++++++++++++++++++++++++++--------- 1 file changed, 64 insertions(+), 18 deletions(-) diff --git a/src/commands/monitor.rs b/src/commands/monitor.rs index d4289d2e..c27faae3 100644 --- a/src/commands/monitor.rs +++ b/src/commands/monitor.rs @@ -92,19 +92,46 @@ pub fn handle(args: MonitorArgs) -> Result<()> { fn monitor_contract( contract_id: &str, events_filter: Option<&str>, + event_type: Option<&str>, + topic: Option<&str>, + value: Option<&str>, network: &str, interval: u64, follow: bool, ) -> Result<()> { config::validate_contract_id(contract_id)?; - let filter_set: Option> = events_filter.map(|s| { + let legacy_filter_set: Option> = events_filter.map(|s| { s.split(',') .map(|x| x.trim().to_lowercase()) .filter(|x| !x.is_empty()) .collect() }); + let mut stream_filters = EventStreamFilters::default(); + if let Some(t) = event_type { + let normalized = t.trim().to_lowercase(); + if !normalized.is_empty() { + stream_filters.event_type = Some(normalized); + } + } + if let Some(topic_filter) = topic { + let segments: Vec = topic_filter + .split(',') + .map(|s| s.trim().to_string()) + .filter(|s| !s.is_empty()) + .collect(); + if !segments.is_empty() { + stream_filters.topic_segments = Some(segments); + } + } + if let Some(value_match) = value { + let trimmed = value_match.trim(); + if !trimmed.is_empty() { + stream_filters.value_match = Some(trimmed.to_string()); + } + } + let rpc_url = soroban::rpc_url(network); notifications::info(&format!( @@ -112,27 +139,46 @@ fn monitor_contract( rpc_url )); - let mut stream = - SorobanEventStream::new(rpc_url, contract_id.to_string()).with_poll_interval(interval); - loop { - let batch = stream.next_batch()?; - for event in batch { - let as_text = event.value.to_string(); - if let Some(ref filters) = filter_set { - let mut matches = false; - for f in filters { - if as_text.to_lowercase().contains(f) { - matches = true; - break; + let running = Arc::new(AtomicBool::new(true)); + { + let running = Arc::clone(&running); + ctrlc::set_handler(move || { + running.store(false, Ordering::SeqCst); + })?; + } + + let mut stream = SorobanEventStream::new(rpc_url, contract_id.to_string()) + .with_poll_interval(interval) + .with_filters(stream_filters); + + let mut printed_any = false; + + while running.load(Ordering::SeqCst) { + match stream.next_batch() { + Ok(batch) => { + for event in batch { + let as_text = event.value.to_string(); + let topic_text = event.topic.join(","); + let matches_legacy = legacy_filter_set.as_ref().is_none_or(|filters| { + filters.iter().any(|f| { + as_text.to_lowercase().contains(f) + || topic_text.to_lowercase().contains(f) + }) + }); + + if matches_legacy { + printed_any = true; + notifications::success(&format!( + "Ledger {} event {}: {}", + event.ledger, event.id, as_text + )); } - printed_any = true; - notifications::success(&format!( - "Ledger {} event {}: {}", - event.ledger, event.id, as_text - )); } if !follow { + if !printed_any { + notifications::warn("No matching events in the latest batch."); + } break; } stream.sleep(); From f1bf78c1f6f35e0f5591150d8d07577356061e3b Mon Sep 17 00:00:00 2001 From: auraroom Date: Thu, 28 May 2026 21:54:45 +0100 Subject: [PATCH 7/8] feat(monitor): enable balance-alert watchman for wallet monitoring Validate --balance-alert, announce watchman mode, and support graceful shutdown while polling wallet balances. Co-authored-by: Cursor --- src/commands/monitor.rs | 31 ++++++++++++++++++++++++++++--- 1 file changed, 28 insertions(+), 3 deletions(-) diff --git a/src/commands/monitor.rs b/src/commands/monitor.rs index c27faae3..79e778d3 100644 --- a/src/commands/monitor.rs +++ b/src/commands/monitor.rs @@ -202,6 +202,7 @@ fn monitor_contract( fn monitor_wallet( wallet_name: &str, threshold: Option, + balance_alert: Option, network: &str, interval: u64, ) -> Result<()> { @@ -213,8 +214,20 @@ fn monitor_wallet( .ok_or_else(|| anyhow::anyhow!("Wallet '{}' not found", wallet_name))?; let threshold = threshold.unwrap_or(0.0); - if threshold <= 0.0 { - notifications::warn("No --threshold provided; will print balance changes only."); + if threshold <= 0.0 && balance_alert.is_none() { + notifications::warn( + "No --threshold or --balance-alert provided; will print balance changes only.", + ); + } + + if let Some(alert_level) = balance_alert { + if alert_level <= 0.0 { + anyhow::bail!("--balance-alert must be greater than zero"); + } + notifications::info(&format!( + "Watchman enabled: alert when balance drops below {:.7} XLM.", + alert_level + )); } notifications::info(&format!( @@ -223,7 +236,17 @@ fn monitor_wallet( )); let mut last_balance: Option = None; - loop { + let mut low_balance_alerted = false; + + let running = Arc::new(AtomicBool::new(true)); + { + let running = Arc::clone(&running); + ctrlc::set_handler(move || { + running.store(false, Ordering::SeqCst); + })?; + } + + while running.load(Ordering::SeqCst) { let account = horizon::fetch_account(&wallet.public_key, network)?; let native = account .balances @@ -249,4 +272,6 @@ fn monitor_wallet( std::thread::sleep(std::time::Duration::from_secs(interval.max(1))); } + + Ok(()) } From a8d7d0696f02ce038dbe90d6e978fc2ece02b2e6 Mon Sep 17 00:00:00 2001 From: auraroom Date: Thu, 28 May 2026 21:54:50 +0100 Subject: [PATCH 8/8] feat(monitor): fire watchman alert when balance drops below threshold Notify the terminal and OS when wallet XLM falls under --balance-alert, and reset the latch after the balance recovers. Co-authored-by: Cursor --- src/commands/monitor.rs | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/src/commands/monitor.rs b/src/commands/monitor.rs index 79e778d3..ec8d0185 100644 --- a/src/commands/monitor.rs +++ b/src/commands/monitor.rs @@ -270,6 +270,20 @@ fn monitor_wallet( )); } + if let Some(alert_level) = balance_alert { + if native < alert_level { + if !low_balance_alerted { + notifications::alert(&format!( + "Balance {:.7} XLM dropped below watchman threshold {:.7} XLM", + native, alert_level + )); + low_balance_alerted = true; + } + } else { + low_balance_alerted = false; + } + } + std::thread::sleep(std::time::Duration::from_secs(interval.max(1))); }