diff --git a/Cargo.lock b/Cargo.lock index dc83ae0..189931f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -43,6 +43,15 @@ dependencies = [ "subtle", ] +[[package]] +name = "aho-corasick" +version = "1.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ddd31a130427c27518df266943a5308ed92d4b226cc639f5a8f1002816174301" +dependencies = [ + "memchr", +] + [[package]] name = "allocator-api2" version = "0.2.21" @@ -619,6 +628,15 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "crossbeam-channel" +version = "0.5.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "82b8f8f868b36967f9606790d1903570de9ceaf870a7bf9fbbd3016d636a2cb2" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "crossbeam-utils" version = "0.8.21" @@ -1647,6 +1665,9 @@ dependencies = [ "sha2", "tokio", "toml", + "tracing", + "tracing-appender", + "tracing-subscriber", ] [[package]] @@ -2250,6 +2271,15 @@ dependencies = [ "syn", ] +[[package]] +name = "matchers" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d1525a2a28c7f4fa0fc98bb91ae755d1e2d1505079e05539e35bc876b5d65ae9" +dependencies = [ + "regex-automata", +] + [[package]] name = "matchit" version = "0.8.4" @@ -2449,6 +2479,15 @@ dependencies = [ "minimal-lexical", ] +[[package]] +name = "nu-ansi-term" +version = "0.50.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7957b9740744892f114936ab4a57b3f487491bbeafaf8083688b16841a4240e5" +dependencies = [ + "windows-sys 0.61.2", +] + [[package]] name = "num-bigint" version = "0.4.6" @@ -2924,6 +2963,23 @@ dependencies = [ "bitflags", ] +[[package]] +name = "regex-automata" +version = "0.4.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e1dd4122fc1595e8162618945476892eefca7b88c52820e74af6262213cae8f" +dependencies = [ + "aho-corasick", + "memchr", + "regex-syntax", +] + +[[package]] +name = "regex-syntax" +version = "0.8.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dc897dd8d9e8bd1ed8cdad82b5966c3e0ecae09fb1907d58efaa013543185d0a" + [[package]] name = "reqwest" version = "0.12.28" @@ -3271,6 +3327,15 @@ dependencies = [ "digest", ] +[[package]] +name = "sharded-slab" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f40ca3c46823713e0d4209592e8d6e826aa57e928f09752619fc696c499637f6" +dependencies = [ + "lazy_static", +] + [[package]] name = "shlex" version = "1.3.0" @@ -3531,6 +3596,15 @@ dependencies = [ "syn", ] +[[package]] +name = "thread_local" +version = "1.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f60246a4944f24f6e018aa17cdeffb7818b76356965d03b07d6a9886e8962185" +dependencies = [ + "cfg-if", +] + [[package]] name = "time" version = "0.3.47" @@ -3750,6 +3824,18 @@ dependencies = [ "tracing-core", ] +[[package]] +name = "tracing-appender" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "786d480bce6247ab75f005b14ae1624ad978d3029d9113f0a22fa1ac773faeaf" +dependencies = [ + "crossbeam-channel", + "thiserror 2.0.18", + "time", + "tracing-subscriber", +] + [[package]] name = "tracing-attributes" version = "0.1.31" @@ -3768,6 +3854,36 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "db97caf9d906fbde555dd62fa95ddba9eecfd14cb388e4f491a66d74cd5fb79a" dependencies = [ "once_cell", + "valuable", +] + +[[package]] +name = "tracing-log" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ee855f1f400bd0e5c02d150ae5de3840039a3f54b025156404e34c23c03f47c3" +dependencies = [ + "log", + "once_cell", + "tracing-core", +] + +[[package]] +name = "tracing-subscriber" +version = "0.3.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cb7f578e5945fb242538965c2d0b04418d38ec25c79d160cd279bf0731c8d319" +dependencies = [ + "matchers", + "nu-ansi-term", + "once_cell", + "regex-automata", + "sharded-slab", + "smallvec", + "thread_local", + "tracing", + "tracing-core", + "tracing-log", ] [[package]] @@ -3900,6 +4016,12 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "valuable" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba73ea9cf16a25df0c8caa16c51acb937d5712a8429db78a3ee29d5dcacd3a65" + [[package]] name = "version_check" version = "0.9.5" diff --git a/crates/jaunt-host/Cargo.toml b/crates/jaunt-host/Cargo.toml index 0b09ca2..2a330a1 100644 --- a/crates/jaunt-host/Cargo.toml +++ b/crates/jaunt-host/Cargo.toml @@ -19,9 +19,12 @@ serde_json = "1" rmp-serde = "1" toml = "0.8" hostname = "0.4" -nix = { version = "0.29", features = ["user"] } +nix = { version = "0.29", features = ["user", "signal", "process"] } hmac = "0.12" sha2 = "0.10" +tracing = "0.1" +tracing-subscriber = { version = "0.3", features = ["env-filter"] } +tracing-appender = "0.2" [package.metadata.deb] maintainer = "moukrea " diff --git a/crates/jaunt-host/src/main.rs b/crates/jaunt-host/src/main.rs index 1a76e0b..9787c21 100644 --- a/crates/jaunt-host/src/main.rs +++ b/crates/jaunt-host/src/main.rs @@ -3,10 +3,12 @@ mod config; mod files; mod node; mod pairing_server; +mod pid; mod profile; mod snag; use clap::{Parser, Subcommand}; +use tracing::error; #[derive(Parser)] #[command(name = "jaunt-host", about = "Jaunt host daemon", version)] @@ -17,15 +19,23 @@ struct Cli { #[derive(Subcommand)] enum Command { - /// Start the host daemon (default) - Serve, - /// Generate a pairing profile and wait for a peer to connect + /// Start the host daemon — accepts connections from paired devices + Serve { + /// Run as a background daemon + #[arg(short, long)] + daemon: bool, + }, + /// Pair a new device — generates a PIN and waits for connection Pair, /// Manage paired devices Devices { #[command(subcommand)] action: DeviceAction, }, + /// Stop a running daemon + Stop, + /// Show daemon status + Status, } #[derive(Subcommand)] @@ -36,13 +46,91 @@ enum DeviceAction { Revoke { peer_id: String }, } +fn init_logging(daemon: bool) { + use tracing_subscriber::{fmt, EnvFilter}; + + let filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info")); + + if daemon { + // Daemon mode: log to file + let log_dir = dirs_log(); + std::fs::create_dir_all(&log_dir).ok(); + let file_appender = tracing_appender::rolling::daily(&log_dir, "jaunt-host.log"); + fmt() + .with_env_filter(filter) + .with_writer(file_appender) + .with_ansi(false) + .init(); + } else { + // Foreground: log to stderr + fmt() + .with_env_filter(filter) + .with_writer(std::io::stderr) + .with_target(false) + .init(); + } +} + +fn dirs_log() -> std::path::PathBuf { + if let Ok(dir) = std::env::var("XDG_DATA_HOME") { + std::path::PathBuf::from(dir).join("jaunt") + } else if let Ok(home) = std::env::var("HOME") { + std::path::PathBuf::from(home) + .join(".local") + .join("share") + .join("jaunt") + } else { + std::path::PathBuf::from("/tmp/jaunt") + } +} + #[tokio::main(flavor = "current_thread")] async fn main() { let cli = Cli::parse(); + + // Commands that don't need logging or config + match &cli.command { + Some(Command::Stop) => { + pid::cmd_stop(); + return; + } + Some(Command::Status) => { + pid::cmd_status(); + return; + } + _ => {} + } + let config = config::JauntConfig::load(); + let is_daemon = matches!(&cli.command, Some(Command::Serve { daemon: true })); + + init_logging(is_daemon); let result = match cli.command { - None | Some(Command::Serve) => node::run_host(config).await, + None | Some(Command::Serve { .. }) => { + // Check for existing instance + if let Err(msg) = pid::acquire() { + error!("{msg}"); + eprintln!("error: {msg}"); + std::process::exit(1); + } + + // Daemonize if requested + if is_daemon { + if let Err(e) = nix::unistd::daemon(true, false) { + error!("Failed to daemonize: {e}"); + eprintln!("error: failed to daemonize: {e}"); + pid::release(); + std::process::exit(1); + } + // Re-write PID after fork (PID changed) + pid::write_current(); + } + + let res = node::run_host(config).await; + pid::release(); + res + } Some(Command::Pair) => node::run_pair(config).await, Some(Command::Devices { action }) => match action { DeviceAction::List => { @@ -63,9 +151,11 @@ async fn main() { Ok(()) } }, + Some(Command::Stop) | Some(Command::Status) => unreachable!(), }; if let Err(e) = result { + error!("{e}"); eprintln!("error: {e}"); std::process::exit(1); } diff --git a/crates/jaunt-host/src/node.rs b/crates/jaunt-host/src/node.rs index 9a3706a..53f01e8 100644 --- a/crates/jaunt-host/src/node.rs +++ b/crates/jaunt-host/src/node.rs @@ -1,9 +1,10 @@ -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::sync::Arc; use cairn_p2p::{CairnConfig, Event, Node, Session, StorageBackend, TurnServer}; use jaunt_protocol::messages::*; use tokio::sync::RwLock; +use tracing::{debug, error, info, trace, warn}; use crate::approval::ApprovalStore; use crate::config::JauntConfig; @@ -23,114 +24,56 @@ struct PtyAttachment { /// Shared state for per-peer PTY attachments. type Attachments = Arc>>; +/// Run the host daemon — accepts connections from already-paired devices. +/// Does NOT generate PINs or start the pairing server. +/// Use `jaunt-host pair` to add new devices. pub async fn run_host(config: JauntConfig) -> Result<(), String> { - // Check snag is available let snag = SnagBridge::new(); snag.check_available()?; - // Build cairn config and start transport let cairn_config = build_cairn_config(&config); let node = cairn_p2p::create_and_start_with_config(cairn_config) .await .map_err(|e| format!("failed to create cairn node: {e}"))?; - // Collect listen addresses. Print all useful ones (skip Docker bridges). - // Profile only includes /ws (for browser clients), but the host accepts - // TCP, QUIC, and WS — native clients use the best available transport. let all_addrs = node.listen_addresses().await; let is_useful = |a: &&String| -> bool { !a.contains("/172.") || a.contains("/172.16.") }; let useful_addrs: Vec<&String> = all_addrs.iter().filter(is_useful).collect(); for addr in &useful_addrs { - eprintln!(" Listen: {addr}"); + debug!("Listen: {addr}"); } - // Profile gets only /ws addrs (browser transport constraint) + let ws_addrs: Vec = useful_addrs .iter() .filter(|a| a.ends_with("/ws")) .map(|a| a.to_string()) .collect(); - // Generate connection profile (includes cairn listen addresses for browser clients) - let (conn_profile, profile_url) = - profile::generate_qr_profile(&node, &config, &ws_addrs).await?; - let pin_result = profile::generate_pin_profile(&node, &config, &ws_addrs).await; - let pin = pin_result - .as_ref() - .map(|(_, pin)| pin.clone()) - .unwrap_or_default(); - - // Register as a Kademlia PROVIDER under a PIN-derived key. - // Clients compute the same key, call get_providers(), and get our PeerId. - // Provider records are the core IPFS DHT mechanism — they work reliably. - if let Some(sender) = node.swarm_sender().cloned() { - let pin_key = { - use hmac::{Hmac, Mac}; - use sha2::Sha256; - type HmacSha256 = Hmac; - let mut mac = HmacSha256::new_from_slice(b"jaunt-pin-v1").expect("HMAC key"); - mac.update(pin.as_bytes()); - let hash = mac.finalize().into_bytes(); - // Wrap in identity multihash format: [0x00 (identity), 0x20 (32 bytes), ...hash] - // This matches js-libp2p's CID.multihash.bytes used by findProviders() - let mut mh = Vec::with_capacity(2 + hash.len()); - mh.push(0x00); // identity hash function code - mh.push(hash.len() as u8); // digest length - mh.extend_from_slice(&hash); - mh - }; - tokio::spawn(async move { - // Wait for DHT bootstrap to complete before publishing. - tokio::time::sleep(std::time::Duration::from_secs(8)).await; - eprintln!(" DHT: Publishing PIN provider record..."); - match sender.kad_start_providing(pin_key).await { - Ok(()) => { - eprintln!(" DHT: PIN discoverable (provider record confirmed by DHT peers)") - } - Err(e) => eprintln!(" DHT: PIN publish failed — PIN discovery will not work: {e}"), - } - }); - } - - // Start the pairing HTTP server so browsers can fetch the profile via PIN - let _pairing_addr = match pairing_server::start_pairing_server(Arc::new(RwLock::new( - pairing_server::PairingState { - pin: pin.clone(), - profile: conn_profile, - }, - ))) - .await - { - Ok(addr) => Some(addr), - Err(e) => { - eprintln!(" Warning: pairing server failed to start: {e}"); - None - } - }; - - // Initialize file browser for cairn RPC handler let file_browser = if config.files.enabled { Some(FileBrowser::new(&config)) } else { None }; - // Load approval store let mut approval_store = ApprovalStore::load(); - - // Per-peer PTY attachment tracking let attachments: Attachments = Arc::new(RwLock::new(HashMap::new())); - // Display status + // Known jaunt clients — peers that sent RPC messages or completed pairing. + // Only these get INFO-level connection/disconnection logs. + let mut known_clients: HashSet = HashSet::new(); + // Seed from approval store + for device in approval_store.list() { + known_clients.insert(device.peer_id.clone()); + } + let host_name = hostname::get() .map(|h| h.to_string_lossy().into_owned()) .unwrap_or_else(|_| "unknown".to_string()); - // Find the primary LAN IP for the pairing display let lan_ip = ws_addrs .iter() .find(|a| !a.contains("/127.") && a.ends_with("/ws")) .and_then(|a| { - // Extract IP from multiaddr like /ip4/192.168.1.119/tcp/35833/ws let parts: Vec<&str> = a.split('/').collect(); parts.get(2).map(|ip| ip.to_string()) }); @@ -140,187 +83,208 @@ pub async fn run_host(config: JauntConfig) -> Result<(), String> { .map(|p| p.to_string()) .unwrap_or_default(); - eprintln!("Jaunt host daemon started"); - eprintln!(" Host: {host_name}"); - eprintln!(" Tier: {}", config.tier_label()); - eprintln!(); - eprintln!(" ┌─ Connect from anywhere ──────────────────────┐"); - eprintln!(" │ PIN: {pin:<42}│"); - eprintln!(" │ PeerId: {peer_id_display}"); - eprintln!(" │ │"); - eprintln!(" │ Enter the PIN in the Jaunt app to connect. │"); - eprintln!(" │ Works over the internet — no IP needed. │"); - eprintln!(" └───────────────────────────────────────────────┘"); - eprintln!(); - eprintln!(" URL: {profile_url}"); + info!("Jaunt host daemon started"); + info!(" Host: {host_name}"); + info!(" PeerId: {peer_id_display}"); + info!(" Tier: {}", config.tier_label()); if let Some(ref ip) = lan_ip { - eprintln!(" LAN: {ip} (same network only)"); + info!(" LAN: {ip}"); } - eprintln!(" Devices: {}", approval_store.list().len()); - eprintln!(); - eprintln!("Waiting for connections..."); + info!(" Devices: {}", approval_store.list().len()); + info!("Accepting connections from paired devices. Use `jaunt-host pair` to add new devices."); - // Event loop - loop { - let event = match node.recv_event().await { - Some(e) => e, - None => break, - }; + // Signal handling for graceful shutdown + let mut sigterm = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate()) + .map_err(|e| format!("failed to register SIGTERM handler: {e}"))?; - match event { - Event::PairingCompleted { ref peer_id } => { - eprintln!("Pairing completed: {peer_id}"); - if config.server.require_approval { - approval_store.approve(peer_id, "device"); - approval_store.save(); - eprintln!(" Auto-approved device: {peer_id}"); - } + // Event loop with signal handling + loop { + tokio::select! { + event = node.recv_event() => { + let event = match event { + Some(e) => e, + None => break, + }; + handle_event( + &event, &node, &config, &snag, &file_browser, + &mut approval_store, &attachments, &mut known_clients, + ).await; } - Event::StateChanged { - ref peer_id, - ref state, - } => { - eprintln!("Peer {peer_id}: {state}"); + _ = tokio::signal::ctrl_c() => { + info!("Received SIGINT, shutting down..."); + break; } - Event::MessageReceived { - ref peer_id, - ref channel, - ref data, - } => { - if !approval_store.is_approved(peer_id) { - // Auto-approve: the peer connected via cairn transport - // which already authenticated via Noise XX handshake. - approval_store.approve(peer_id, "cairn-transport"); - approval_store.save(); - eprintln!("Auto-approved peer: {peer_id}"); - } + _ = sigterm.recv() => { + info!("Received SIGTERM, shutting down..."); + break; + } + } + } - // Route by tag byte (application-layer multiplexing). - // cairn strips channel names, so we can't rely on them. - // First byte: TAG_RPC (0x01) or TAG_PTY (0x02). - // Fall back to channel name for legacy/untagged messages. - if data.is_empty() { - continue; - } + info!("Jaunt host daemon stopped"); + Ok(()) +} - let tag = data[0]; - let payload = &data[1..]; - - match tag { - TAG_RPC => { - eprintln!("RPC from {peer_id} ({} bytes, tagged)", payload.len()); - - let request = match jaunt_protocol::decode_request(payload) { - Ok(r) => r, - Err(e) => { - let response = RpcResponse::Error { - code: 1, - message: format!("decode error: {e}"), - }; - send_rpc_response(&node, peer_id, &response).await; - continue; - } - }; +/// Handle a single event from the cairn node. +#[allow(clippy::too_many_arguments)] +async fn handle_event( + event: &Event, + node: &Node, + config: &JauntConfig, + snag: &SnagBridge, + file_browser: &Option, + approval_store: &mut ApprovalStore, + attachments: &Attachments, + known_clients: &mut HashSet, +) { + match event { + Event::PairingCompleted { ref peer_id } => { + info!("Pairing completed: {peer_id}"); + known_clients.insert(peer_id.clone()); + if config.server.require_approval { + approval_store.approve(peer_id, "device"); + approval_store.save(); + info!("Auto-approved device: {peer_id}"); + } + } + Event::StateChanged { + ref peer_id, + ref state, + } => { + if known_clients.contains(peer_id) { + info!("Client {peer_id}: {state}"); + } else { + trace!("DHT peer {peer_id}: {state}"); + } + } + Event::MessageReceived { + ref peer_id, + ref channel, + ref data, + } => { + if !approval_store.is_approved(peer_id) { + approval_store.approve(peer_id, "cairn-transport"); + approval_store.save(); + info!("Auto-approved peer: {peer_id}"); + } + known_clients.insert(peer_id.clone()); - match request { - RpcRequest::SessionAttach { ref target } => { - eprintln!(" SessionAttach target={target}"); - handle_session_attach(&node, peer_id, target, &snag, &attachments) - .await; - } - RpcRequest::SessionDetach {} => { - eprintln!(" SessionDetach"); - handle_session_detach(peer_id, &attachments).await; - let response = RpcResponse::Ok(RpcData::Empty {}); - send_rpc_response(&node, peer_id, &response).await; - } - RpcRequest::Resize { cols, rows } => { - let att = attachments.read().await; - if let Some(attachment) = att.get(peer_id) { - let mut guard = attachment.writer.lock().await; - if let Some(ref mut w) = *guard { - let _ = w.send_resize(cols, rows).await; - } + if data.is_empty() { + return; + } + + let tag = data[0]; + let payload = &data[1..]; + + match tag { + TAG_RPC => { + debug!("RPC from {peer_id} ({} bytes)", payload.len()); + + let request = match jaunt_protocol::decode_request(payload) { + Ok(r) => r, + Err(e) => { + let response = RpcResponse::Error { + code: 1, + message: format!("decode error: {e}"), + }; + send_rpc_response(node, peer_id, &response).await; + return; + } + }; + + match request { + RpcRequest::SessionAttach { ref target } => { + info!( + "Session attach: peer={} target={target}", + &peer_id[..16.min(peer_id.len())] + ); + handle_session_attach(node, peer_id, target, snag, attachments).await; + } + RpcRequest::SessionDetach {} => { + info!("Session detach: peer={}", &peer_id[..16.min(peer_id.len())]); + handle_session_detach(peer_id, attachments).await; + let response = RpcResponse::Ok(RpcData::Empty {}); + send_rpc_response(node, peer_id, &response).await; + } + RpcRequest::Resize { cols, rows } => { + let att = attachments.read().await; + if let Some(attachment) = att.get(peer_id) { + let mut guard = attachment.writer.lock().await; + if let Some(ref mut w) = *guard { + let _ = w.send_resize(cols, rows).await; } - let response = RpcResponse::Ok(RpcData::Empty {}); - send_rpc_response(&node, peer_id, &response).await; } - _ => { - let response = handle_rpc_request(&request, &snag, &file_browser); - if let RpcResponse::Error { message, .. } = &response { - eprintln!(" RPC error: {message}"); - } - send_rpc_response(&node, peer_id, &response).await; + let response = RpcResponse::Ok(RpcData::Empty {}); + send_rpc_response(node, peer_id, &response).await; + } + _ => { + let response = handle_rpc_request(&request, snag, file_browser); + if let RpcResponse::Error { message, .. } = &response { + warn!("RPC error: {message}"); } + send_rpc_response(node, peer_id, &response).await; } } - TAG_PTY => { - // PTY input from browser: forward to attached snag session - let att = attachments.read().await; - if let Some(attachment) = att.get(peer_id) { - let mut guard = attachment.writer.lock().await; - if let Some(ref mut w) = *guard { - if let Err(e) = w.send_pty_input(payload).await { - eprintln!("PTY send to {} failed: {e}", attachment.target); - } + } + TAG_PTY => { + let att = attachments.read().await; + if let Some(attachment) = att.get(peer_id) { + let mut guard = attachment.writer.lock().await; + if let Some(ref mut w) = *guard { + if let Err(e) = w.send_pty_input(payload).await { + warn!("PTY send to {} failed: {e}", attachment.target); } } } - _ => { - // Legacy fallback: try routing by channel name - match channel.as_str() { - "rpc" => { - eprintln!( - "RPC from {peer_id} ({} bytes, legacy channel)", - data.len() - ); - let request = match jaunt_protocol::decode_request(data) { - Ok(r) => r, - Err(e) => { - let response = RpcResponse::Error { - code: 1, - message: format!("decode error: {e}"), - }; - send_rpc_response(&node, peer_id, &response).await; - continue; - } - }; - let response = handle_rpc_request(&request, &snag, &file_browser); - send_rpc_response(&node, peer_id, &response).await; - } - "pty" => { - let att = attachments.read().await; - if let Some(attachment) = att.get(peer_id) { - let mut guard = attachment.writer.lock().await; - if let Some(ref mut w) = *guard { - if let Err(e) = w.send_pty_input(data).await { - eprintln!( - "PTY send to {} failed: {e}", - attachment.target - ); - } + } + _ => { + // Legacy fallback: try routing by channel name + match channel.as_str() { + "rpc" => { + debug!("RPC from {peer_id} ({} bytes, legacy channel)", data.len()); + let request = match jaunt_protocol::decode_request(data) { + Ok(r) => r, + Err(e) => { + let response = RpcResponse::Error { + code: 1, + message: format!("decode error: {e}"), + }; + send_rpc_response(node, peer_id, &response).await; + return; + } + }; + let response = handle_rpc_request(&request, snag, file_browser); + send_rpc_response(node, peer_id, &response).await; + } + "pty" => { + let att = attachments.read().await; + if let Some(attachment) = att.get(peer_id) { + let mut guard = attachment.writer.lock().await; + if let Some(ref mut w) = *guard { + if let Err(e) = w.send_pty_input(data).await { + warn!("PTY send to {} failed: {e}", attachment.target); } } } - _ => {} } + _ => {} } } } - Event::Error { ref error } => { - eprintln!("Error: {error}"); + } + Event::Error { ref error } => { + // Dial errors from DHT peers are expected noise + if error.contains("dial failed") || error.contains("Failed to negotiate") { + debug!("{error}"); + } else { + warn!("{error}"); } - _ => {} } + _ => {} } - - Ok(()) } // Application-layer message tags. -// cairn's dispatch_incoming strips channel names, so ALL messages arrive with -// channel "". We prefix every message with a 1-byte tag so both sides can -// distinguish RPC traffic from PTY traffic. const TAG_RPC: u8 = 0x01; const TAG_PTY: u8 = 0x02; @@ -336,23 +300,22 @@ async fn send_rpc_response(node: &Node, peer_id: &str, response: &RpcResponse) { match session.open_channel("rpc").await { Ok(ch) => match session.send(&ch, &tagged).await { Ok(_) => { - eprintln!(" Sent {} bytes response (tagged RPC)", resp_data.len()) + debug!("Sent {} bytes response", resp_data.len()); } - Err(e) => eprintln!(" Send failed: {e}"), + Err(e) => warn!("Send failed: {e}"), }, - Err(e) => eprintln!(" Open channel failed: {e}"), + Err(e) => warn!("Open channel failed: {e}"), } } else { - eprintln!(" No session for peer {peer_id}"); + debug!("No session for peer {peer_id}"); } } - Err(e) => eprintln!(" Encode response failed: {e}"), + Err(e) => error!("Encode response failed: {e}"), } } /// Handle SessionAttach: respond with Ok, then spawn a background task that -/// runs `snag output --follow` and streams PTY output to the browser -/// via the cairn session's "pty" channel. +/// streams PTY output to the browser via the cairn session's "pty" channel. async fn handle_session_attach( node: &Node, peer_id: &str, @@ -360,10 +323,8 @@ async fn handle_session_attach( snag: &SnagBridge, attachments: &Attachments, ) { - // Kill any existing attachment for this peer first handle_session_detach(peer_id, attachments).await; - // Verify the target session exists match snag.session_info(target) { Ok(_info) => {} Err(e) => { @@ -376,25 +337,22 @@ async fn handle_session_attach( } } - // Send the Ok response immediately so the browser knows attach succeeded let response = RpcResponse::Ok(RpcData::Empty {}); send_rpc_response(node, peer_id, &response).await; - // Get the cairn session handle for this peer let sessions = node.sessions().await; let session = match sessions.get(peer_id) { Some(s) => s.clone(), None => { - eprintln!(" No cairn session for peer {peer_id}, cannot start PTY forwarding"); + warn!("No cairn session for peer {peer_id}, cannot start PTY forwarding"); return; } }; - // Attach to the snag session via the daemon's Unix socket let (scrollback, snag_attachment) = match crate::snag::SnagAttachment::attach(target).await { Ok(r) => r, Err(e) => { - eprintln!(" SnagAttachment::attach failed: {e}"); + warn!("SnagAttachment::attach failed: {e}"); let response = RpcResponse::Error { code: 8, message: format!("attach failed: {e}"), @@ -404,11 +362,9 @@ async fn handle_session_attach( } }; - // Split into reader (for output forwarding task) and writer (for input from event loop) let (reader, writer) = snag_attachment.split(); let writer = Arc::new(tokio::sync::Mutex::new(Some(writer))); - // Send scrollback so the terminal isn't blank (tagged as PTY output) if !scrollback.is_empty() { if let Ok(ch) = session.open_channel("pty").await { let sb_bytes = scrollback.as_bytes(); @@ -416,11 +372,10 @@ async fn handle_session_attach( tagged.push(TAG_PTY); tagged.extend_from_slice(sb_bytes); let _ = session.send(&ch, &tagged).await; - eprintln!(" Sent {} bytes scrollback (tagged PTY)", sb_bytes.len()); + debug!("Sent {} bytes scrollback", sb_bytes.len()); } } - // Spawn the PTY output forwarding task let peer_id_owned = peer_id.to_string(); let task = tokio::spawn(async move { pty_output_forwarder(session, reader, &peer_id_owned).await; @@ -436,11 +391,12 @@ async fn handle_session_attach( }, ); - eprintln!(" PTY forwarding started for peer {peer_id} -> session {target}"); + info!( + "PTY forwarding: peer {} -> session {target}", + &peer_id[..16.min(peer_id.len())] + ); } -/// Long-running task: reads PTY output from the SnagAttachmentReader and forwards -/// each chunk to the browser via the cairn session's "pty" channel. async fn pty_output_forwarder( session: Session, mut reader: crate::snag::SnagAttachmentReader, @@ -449,7 +405,7 @@ async fn pty_output_forwarder( let pty_channel = match session.open_channel("pty").await { Ok(ch) => ch, Err(e) => { - eprintln!(" PTY forwarder: failed to open pty channel: {e}"); + warn!("PTY forwarder: failed to open pty channel: {e}"); return; } }; @@ -461,13 +417,12 @@ async fn pty_output_forwarder( tagged.push(TAG_PTY); tagged.extend_from_slice(&data); if let Err(e) = session.send(&pty_channel, &tagged).await { - eprintln!(" PTY forwarder: send failed for peer {peer_id}: {e}"); + debug!("PTY forwarder: send failed for peer {peer_id}: {e}"); break; } } Ok(crate::snag::PtyReadResult::SessionEvent { event, session_id }) => { - eprintln!(" PTY forwarder: session event '{event}' for peer {peer_id}"); - // Forward the event to the web client via RPC channel + debug!("PTY forwarder: session event '{event}' for peer {peer_id}"); let resp = jaunt_protocol::RpcResponse::SessionEvent { event, session_id }; let rpc_channel = match session.open_channel("rpc").await { Ok(ch) => ch, @@ -483,29 +438,25 @@ async fn pty_output_forwarder( break; } Ok(crate::snag::PtyReadResult::Eof) => { - eprintln!(" PTY forwarder: EOF for peer {peer_id}"); + debug!("PTY forwarder: EOF for peer {peer_id}"); break; } Err(e) => { - eprintln!(" PTY forwarder: read error for peer {peer_id}: {e}"); + debug!("PTY forwarder: read error for peer {peer_id}: {e}"); break; } } } } -/// Handle SessionDetach: abort the PTY forwarding task. async fn handle_session_detach(peer_id: &str, attachments: &Attachments) { if let Some(attachment) = attachments.write().await.remove(peer_id) { attachment.abort_handle.abort(); - eprintln!( - " Detached peer {peer_id} from session {}", - attachment.target - ); + debug!("Detached peer {peer_id} from session {}", attachment.target); } } -/// Interactive pairing: displays PIN + URL, waits for a peer, approves it. +/// Interactive pairing: generates PIN, starts HTTP server, waits for a device to pair. pub async fn run_pair(config: JauntConfig) -> Result<(), String> { let cairn_config = build_cairn_config(&config); let node = cairn_p2p::create_and_start_with_config(cairn_config) @@ -521,7 +472,7 @@ pub async fn run_pair(config: JauntConfig) -> Result<(), String> { .map(|a| a.to_string()) .collect(); - let (_conn_profile, profile_url) = + let (conn_profile, profile_url) = profile::generate_qr_profile(&node, &config, &ws_addrs).await?; let pin_result = profile::generate_pin_profile(&node, &config, &ws_addrs).await; let pin = pin_result @@ -529,11 +480,52 @@ pub async fn run_pair(config: JauntConfig) -> Result<(), String> { .map(|(_, pin)| pin.clone()) .unwrap_or_default(); + // Register as a Kademlia PROVIDER under a PIN-derived key + if let Some(sender) = node.swarm_sender().cloned() { + let pin_key = derive_pin_key(&pin); + tokio::spawn(async move { + tokio::time::sleep(std::time::Duration::from_secs(8)).await; + info!("DHT: Publishing PIN provider record..."); + match sender.kad_start_providing(pin_key).await { + Ok(()) => info!("DHT: PIN discoverable (confirmed by DHT peers)"), + Err(e) => warn!("DHT: PIN publish failed: {e}"), + } + }); + } + + // Start the pairing HTTP server + let _pairing_addr = match pairing_server::start_pairing_server(Arc::new(RwLock::new( + pairing_server::PairingState { + pin: pin.clone(), + profile: conn_profile, + }, + ))) + .await + { + Ok(addr) => { + debug!("Pairing server listening on {addr}"); + Some(addr) + } + Err(e) => { + warn!("Pairing server failed to start: {e}"); + None + } + }; + let peer_id_str = node .libp2p_peer_id() .map(|p| p.to_string()) .unwrap_or_default(); + let lan_ip = ws_addrs + .iter() + .find(|a| !a.contains("/127.") && a.ends_with("/ws")) + .and_then(|a| { + let parts: Vec<&str> = a.split('/').collect(); + parts.get(2).map(|ip| ip.to_string()) + }); + + // User-facing output for the pairing UI eprintln!(); eprintln!(" ┌─────────────────────────────────────┐"); eprintln!(" │ JAUNT PAIRING MODE │"); @@ -545,6 +537,9 @@ pub async fn run_pair(config: JauntConfig) -> Result<(), String> { &peer_id_str[..24.min(peer_id_str.len())] ); eprintln!(" URL: {profile_url}"); + if let Some(ref ip) = lan_ip { + eprintln!(" LAN: {ip} (same network only)"); + } eprintln!(); eprintln!(" Waiting for a device to connect..."); eprintln!(); @@ -553,15 +548,13 @@ pub async fn run_pair(config: JauntConfig) -> Result<(), String> { loop { match node.recv_event().await { - Some(Event::MessageReceived { ref peer_id, .. }) => { + Some(Event::PairingCompleted { ref peer_id }) + | Some(Event::MessageReceived { ref peer_id, .. }) => { if !approval_store.is_approved(peer_id) { approval_store.approve(peer_id, "paired"); approval_store.save(); } - eprintln!( - " ✓ Device paired: {}...", - &peer_id[..24.min(peer_id.len())] - ); + eprintln!(" Device paired: {}...", &peer_id[..24.min(peer_id.len())]); eprintln!(" Run `jaunt-host serve` to start accepting connections."); break; } @@ -569,7 +562,17 @@ pub async fn run_pair(config: JauntConfig) -> Result<(), String> { ref peer_id, ref state, }) => { - eprintln!(" Peer {}...: {state}", &peer_id[..16.min(peer_id.len())]); + trace!( + "Pairing: peer {}...: {state}", + &peer_id[..16.min(peer_id.len())] + ); + } + Some(Event::Error { ref error }) => { + if error.contains("dial failed") { + trace!("{error}"); + } else { + debug!("{error}"); + } } None => break, _ => {} @@ -578,6 +581,20 @@ pub async fn run_pair(config: JauntConfig) -> Result<(), String> { Ok(()) } +fn derive_pin_key(pin: &str) -> Vec { + use hmac::{Hmac, Mac}; + use sha2::Sha256; + type HmacSha256 = Hmac; + let mut mac = HmacSha256::new_from_slice(b"jaunt-pin-v1").expect("HMAC key"); + mac.update(pin.as_bytes()); + let hash = mac.finalize().into_bytes(); + let mut mh = Vec::with_capacity(2 + hash.len()); + mh.push(0x00); + mh.push(hash.len() as u8); + mh.extend_from_slice(&hash); + mh +} + fn build_cairn_config(config: &JauntConfig) -> CairnConfig { let mut cairn = CairnConfig::default(); @@ -601,14 +618,12 @@ fn build_cairn_config(config: &JauntConfig) -> CairnConfig { path: JauntConfig::config_dir().join("cairn-data"), }; - // Jaunt-specific discovery namespace to avoid collisions with other cairn apps cairn.app_identifier = Some("jaunt".to_string()); cairn } /// Handle an RPC request and produce a synchronous response. -/// SessionAttach/SessionDetach/Resize are handled separately in the event loop. fn handle_rpc_request( request: &RpcRequest, snag: &SnagBridge, @@ -707,7 +722,6 @@ fn handle_rpc_request( message: "file browser disabled".into(), }, }, - // These are handled in the event loop, not here RpcRequest::SessionAttach { .. } | RpcRequest::SessionDetach {} | RpcRequest::Resize { .. } => RpcResponse::Ok(RpcData::Empty {}), diff --git a/crates/jaunt-host/src/pid.rs b/crates/jaunt-host/src/pid.rs new file mode 100644 index 0000000..ea9ff3d --- /dev/null +++ b/crates/jaunt-host/src/pid.rs @@ -0,0 +1,107 @@ +//! PID file management for single-instance enforcement. + +use std::fs; +use std::path::PathBuf; + +/// Get the PID file path. +/// Prefers $XDG_RUNTIME_DIR, falls back to ~/.config/jaunt/. +fn pid_path() -> PathBuf { + if let Ok(dir) = std::env::var("XDG_RUNTIME_DIR") { + PathBuf::from(dir).join("jaunt-host.pid") + } else { + crate::config::JauntConfig::config_dir().join("jaunt-host.pid") + } +} + +/// Read PID from the PID file. Returns None if file doesn't exist or is invalid. +fn read_pid() -> Option { + fs::read_to_string(pid_path()) + .ok() + .and_then(|s| s.trim().parse().ok()) +} + +/// Check if a process with the given PID is alive. +fn is_alive(pid: u32) -> bool { + // kill(pid, 0) checks if process exists without sending a signal + nix::sys::signal::kill(nix::unistd::Pid::from_raw(pid as i32), None).is_ok() +} + +/// Acquire the PID lock. Returns Err with message if another instance is running. +pub fn acquire() -> Result<(), String> { + if let Some(pid) = read_pid() { + if is_alive(pid) { + return Err(format!( + "Another jaunt-host is running (PID {pid}). \ + Stop it with `jaunt-host stop` or `kill {pid}`." + )); + } + // Stale PID file — remove it + let _ = fs::remove_file(pid_path()); + } + write_current(); + Ok(()) +} + +/// Write the current process PID to the PID file. +pub fn write_current() { + let path = pid_path(); + if let Some(parent) = path.parent() { + let _ = fs::create_dir_all(parent); + } + let _ = fs::write(&path, format!("{}", std::process::id())); +} + +/// Remove the PID file. +pub fn release() { + let _ = fs::remove_file(pid_path()); +} + +/// `jaunt-host stop` — send SIGTERM to the running daemon. +pub fn cmd_stop() { + match read_pid() { + Some(pid) if is_alive(pid) => { + let nix_pid = nix::unistd::Pid::from_raw(pid as i32); + eprintln!("Stopping jaunt-host (PID {pid})..."); + let _ = nix::sys::signal::kill(nix_pid, nix::sys::signal::Signal::SIGTERM); + + // Wait up to 5 seconds for graceful shutdown + for _ in 0..50 { + if !is_alive(pid) { + eprintln!("Stopped."); + let _ = fs::remove_file(pid_path()); + return; + } + std::thread::sleep(std::time::Duration::from_millis(100)); + } + + // Force kill + eprintln!("Sending SIGKILL..."); + let _ = nix::sys::signal::kill(nix_pid, nix::sys::signal::Signal::SIGKILL); + let _ = fs::remove_file(pid_path()); + eprintln!("Killed."); + } + Some(_) => { + eprintln!("jaunt-host is not running (stale PID file). Cleaning up."); + let _ = fs::remove_file(pid_path()); + } + None => { + eprintln!("jaunt-host is not running."); + } + } +} + +/// `jaunt-host status` — check if daemon is running. +pub fn cmd_status() { + match read_pid() { + Some(pid) if is_alive(pid) => { + println!("jaunt-host is running (PID {pid})"); + } + Some(_) => { + println!("jaunt-host is not running (stale PID file)"); + let _ = fs::remove_file(pid_path()); + } + None => { + println!("jaunt-host is not running"); + } + } +} diff --git a/packaging/systemd/jaunt-host.service b/packaging/systemd/jaunt-host.service new file mode 100644 index 0000000..5ddebe4 --- /dev/null +++ b/packaging/systemd/jaunt-host.service @@ -0,0 +1,22 @@ +[Unit] +Description=Jaunt host daemon — P2P remote shell access +Documentation=https://github.com/moukrea/jaunt +After=network-online.target +Wants=network-online.target + +[Service] +Type=simple +ExecStart=/usr/bin/jaunt-host serve +ExecStop=/usr/bin/jaunt-host stop +Restart=on-failure +RestartSec=5 + +# Security hardening +NoNewPrivileges=yes +ProtectSystem=strict +ProtectHome=read-only +ReadWritePaths=%h/.config/jaunt %h/.local/share/jaunt +PrivateTmp=yes + +[Install] +WantedBy=default.target