diff --git a/native-bridge/src/network/bridge.rs b/native-bridge/src/network/bridge.rs index a0aaf572..1ba02125 100644 --- a/native-bridge/src/network/bridge.rs +++ b/native-bridge/src/network/bridge.rs @@ -9,6 +9,7 @@ use super::{NetworkEvent, NetworkManager, NetworkMode}; use crate::audio::AudioBridgeHandle; +use crate::network::codec::{OpusCodec, OpusConfig}; use crate::network::osp::{OspMessageType, TrackStateMessage, TransportAction}; use parking_lot::RwLock; use std::sync::Arc; @@ -71,6 +72,10 @@ pub struct AudioNetworkBridge { running: Arc, /// Transport event sender for metronome/playback sync transport_tx: RwLock>>, + /// Opus codec for encoding local audio before P2P transmission. + /// The bridge encodes once and sends the encoded data to P2P peers, + /// avoiding redundant encoding in the P2P layer. + codec: Arc, } impl AudioNetworkBridge { @@ -78,14 +83,22 @@ impl AudioNetworkBridge { config: BridgeConfig, network: Arc, audio: AudioBridgeHandle, - ) -> Self { - Self { + ) -> super::Result { + let codec = Arc::new(OpusCodec::new(OpusConfig { + sample_rate: config.sample_rate, + channels: config.channels, + frame_size: config.frame_size, + ..OpusConfig::low_latency() + })?); + + Ok(Self { config, network, audio, running: Arc::new(std::sync::atomic::AtomicBool::new(false)), transport_tx: RwLock::new(None), - } + codec, + }) } /// Subscribe to transport events (clock sync, tempo changes, etc.) @@ -143,6 +156,7 @@ impl AudioNetworkBridge { // Spawn outgoing audio sender on dedicated thread let audio = self.audio.clone(); let network = self.network.clone(); + let codec = self.codec.clone(); let config = self.config.clone(); let running = self.running.clone(); std::thread::Builder::new() @@ -152,7 +166,7 @@ impl AudioNetworkBridge { .enable_all() .build() .expect("Failed to create runtime"); - rt.block_on(Self::audio_send_loop(audio, network, config, running)); + rt.block_on(Self::audio_send_loop(audio, network, codec, config, running)); }) .expect("Failed to spawn audio sender thread"); @@ -198,10 +212,18 @@ impl AudioNetworkBridge { match event { NetworkEvent::AudioReceived { user_id, - track_id: _, + track_id, samples, } => { - // Push audio to the remote user's buffer in the audio engine + // Feed received P2P/relay audio into the AudioEngine's remote user buffer. + // This handles both P2P-decoded and relay-decoded audio uniformly. + // The P2P layer has already Opus-decoded the audio; we receive PCM here. + debug!( + "Routing {} audio samples from user {} track {} to audio engine", + samples.len(), + user_id, + track_id, + ); audio.push_remote_audio(&user_id, &samples); } @@ -337,10 +359,19 @@ impl AudioNetworkBridge { } } - /// Audio send loop - gets audio from engine and sends to network + /// Audio send loop - gets audio from engine and sends to network. + /// + /// Handles dual-path operation: + /// 1. Opus-encodes audio once at the bridge level + /// 2. Sends pre-encoded data to P2P peers via NetworkManager (unreliable UDP) + /// 3. Sends raw PCM to relay when in Relay or Hybrid mode + /// + /// This avoids redundant Opus encoding: the bridge encodes once, and P2P + /// peers receive the encoded data directly without re-encoding. async fn audio_send_loop( audio: AudioBridgeHandle, network: Arc, + codec: Arc, config: BridgeConfig, running: Arc, ) { @@ -367,10 +398,37 @@ impl AudioNetworkBridge { // Accumulate until we have a full frame frame_buffer.extend_from_slice(&samples); - // Send complete frames + // Send complete frames via both P2P and relay paths while frame_buffer.len() >= samples_per_frame { let frame: Vec = frame_buffer.drain(..samples_per_frame).collect(); - network.send_audio(config.local_track_id, frame); + let sample_count = + (frame.len() / config.channels as usize) as u16; + + // Opus-encode once at the bridge level for P2P transmission. + // The encoded data goes directly to P2P peers without re-encoding. + match codec.encoder.encode(&frame) { + Ok(opus_data) => { + // Send pre-encoded audio to P2P peers (unreliable UDP). + // This uses the P2PEncodedAudio message type which bypasses + // the mode-based routing and goes directly to P2P. + network.send_p2p_encoded_audio( + config.local_track_id, + opus_data, + config.channels, + sample_count, + ); + } + Err(e) => { + debug!("Opus encode failed in bridge send loop: {}", e); + } + } + + // Also send raw PCM to relay for Relay and Hybrid modes. + // In P2P-only mode, relay is not running so this is a no-op. + let mode = network.mode(); + if mode == NetworkMode::Relay || mode == NetworkMode::Hybrid { + network.send_relay_audio(config.local_track_id, frame); + } } } } @@ -381,11 +439,11 @@ pub fn create_and_start_bridge( network: Arc, audio: AudioBridgeHandle, event_rx: broadcast::Receiver, -) -> AudioNetworkBridge { +) -> super::Result { let config = BridgeConfig::default(); - let bridge = AudioNetworkBridge::new(config, network, audio); + let bridge = AudioNetworkBridge::new(config, network, audio)?; bridge.start(event_rx); - bridge + Ok(bridge) } #[cfg(test)] diff --git a/native-bridge/src/network/manager.rs b/native-bridge/src/network/manager.rs index 5952e9e8..5b5ffa06 100644 --- a/native-bridge/src/network/manager.rs +++ b/native-bridge/src/network/manager.rs @@ -124,6 +124,21 @@ pub enum OutgoingMessage { bpm: f32, time_sig: (u8, u8), }, + /// Pre-encoded audio for direct P2P transmission. + /// Bypasses mode-based routing and Opus encoding in the P2P layer, + /// since the bridge has already Opus-encoded the audio. + P2PEncodedAudio { + track_id: u8, + opus_data: Vec, + channels: u8, + sample_count: u16, + }, + /// Audio for relay only. Used by the bridge for dual-path operation + /// where P2P audio is sent separately via P2PEncodedAudio. + RelayAudio { + track_id: u8, + samples: Vec, + }, } /// Network manager state @@ -369,6 +384,44 @@ impl NetworkManager { } } + /// Send pre-encoded audio frame to P2P peers. + /// Bypasses the standard outgoing queue's Opus encoding since the + /// bridge has already encoded the audio. Sent via the outgoing channel + /// so socket I/O occurs on the correct tokio runtime. + pub fn send_p2p_encoded_audio( + &self, + track_id: u8, + opus_data: Vec, + channels: u8, + sample_count: u16, + ) { + if let Some(tx) = self.outgoing_tx.read().as_ref() { + let _ = tx.send(OutgoingMessage::P2PEncodedAudio { + track_id, + opus_data, + channels, + sample_count, + }); + } + } + + /// Send audio frame to relay only. + /// Used by the bridge for dual-path operation where P2P audio + /// is sent separately via send_p2p_encoded_audio. + pub fn send_relay_audio(&self, track_id: u8, samples: Vec) { + if let Some(tx) = self.outgoing_tx.read().as_ref() { + let _ = tx.send(OutgoingMessage::RelayAudio { + track_id, + samples, + }); + } + } + + /// Get access to the audio codec for encoding/decoding + pub fn codec(&self) -> &OpusCodec { + &self.codec + } + /// Get current mode pub fn mode(&self) -> NetworkMode { self.state.read().mode @@ -496,6 +549,7 @@ impl NetworkManager { // Forward P2P events to our event channel let event_tx = self.event_tx.read().clone(); + let peers_for_events = self.peers.clone(); if let Some(tx) = event_tx { tokio::spawn(async move { while let Ok(event) = rx.recv().await { @@ -508,6 +562,10 @@ impl NetworkManager { }); } P2PEvent::PeerDisconnected { peer_id, reason } => { + // Mark peer as audio-inactive on disconnect + if let Some(peer) = peers_for_events.get(peer_id) { + peer.set_audio_active(false); + } let _ = tx.send(NetworkEvent::PeerDisconnected { user_id: peer_id.to_string(), reason, @@ -582,6 +640,30 @@ impl NetworkManager { } async fn send_message(&self, msg: OutgoingMessage) -> Result<()> { + match msg { + // Pre-encoded audio sent directly to P2P peers (bypass mode-based routing) + OutgoingMessage::P2PEncodedAudio { + track_id, + opus_data, + channels, + sample_count, + } => { + self.p2p + .broadcast_encoded_audio(track_id, &opus_data, channels, sample_count) + .await + } + // Audio sent directly to relay (bypass mode-based routing) + OutgoingMessage::RelayAudio { track_id, samples } => { + self.relay.send_audio(track_id, &samples).await + } + // Standard messages routed by current network mode + other => self.send_mode_routed(other).await, + } + } + + /// Route standard outgoing messages based on current network mode. + /// Handles Audio, Control, and ClockSync messages for P2P, Relay, and Hybrid modes. + async fn send_mode_routed(&self, msg: OutgoingMessage) -> Result<()> { match self.mode() { NetworkMode::P2P => { match msg { @@ -613,6 +695,8 @@ impl NetworkManager { // Broadcast clock sync to all peers self.p2p.broadcast_control(OspMessageType::ClockSync, payload).await?; } + // P2PEncodedAudio and RelayAudio are handled in send_message + _ => {} } } NetworkMode::Relay => match msg { @@ -644,6 +728,8 @@ impl NetworkManager { .send_control(OspMessageType::ClockSync, payload) .await?; } + // P2PEncodedAudio and RelayAudio are handled in send_message + _ => {} }, NetworkMode::Hybrid => { // Send via both (relay handles fan-out, P2P for low-latency to nearby) @@ -684,6 +770,8 @@ impl NetworkManager { .send_control(OspMessageType::ClockSync, payload) .await?; } + // P2PEncodedAudio and RelayAudio are handled in send_message + _ => {} } } NetworkMode::Disconnected => {} diff --git a/native-bridge/src/network/p2p.rs b/native-bridge/src/network/p2p.rs index 43a97126..ca3a5346 100644 --- a/native-bridge/src/network/p2p.rs +++ b/native-bridge/src/network/p2p.rs @@ -262,8 +262,10 @@ impl P2PNetwork { samples.clone(), ); - // Update last seen + // Update last seen and audio statistics peer.touch(); + peer.set_audio_active(true); + peer.record_audio_received(frame.data.len()); // Emit event let _ = event_tx.send(P2PEvent::AudioReceived { @@ -541,6 +543,44 @@ impl P2PNetwork { Ok(()) } + /// Broadcast pre-encoded audio data to all connected peers via UDP. + /// Uses unreliable delivery (no retransmission) since real-time audio + /// tolerates packet loss better than retransmission latency. + /// The caller provides already Opus-encoded data to avoid redundant encoding + /// when the bridge has already encoded for other paths (e.g., browser WebSocket). + pub async fn broadcast_encoded_audio( + &self, + track_id: u8, + opus_data: &[u8], + channels: u8, + sample_count: u16, + ) -> Result<()> { + let local_id = *self.local_peer_id.read(); + let frame = AudioFrameMessage { + user_id: local_id, + track_id, + codec: 1, // Opus + channels, + sample_count, + data: opus_data.to_vec(), + }; + + let payload = frame.to_bytes(); + + // Broadcast to all connected peers via unreliable UDP (audio is realtime, + // retransmission would add latency worse than the occasional lost packet) + for peer in self.peers.connected() { + if let Err(e) = self + .send_packet(&peer, OspMessageType::AudioFrame, payload.clone(), false) + .await + { + warn!("Failed to broadcast encoded audio to peer {}: {}", peer.id, e); + } + } + + Ok(()) + } + /// Queue audio for sending (called from audio thread) pub fn queue_audio(&self, track_id: u8, samples: Vec) { let mut outgoing = self.outgoing_audio.write(); diff --git a/native-bridge/src/network/peer.rs b/native-bridge/src/network/peer.rs index dc033b69..c12a0759 100644 --- a/native-bridge/src/network/peer.rs +++ b/native-bridge/src/network/peer.rs @@ -6,8 +6,9 @@ use super::jitter::{JitterBuffer, JitterConfig}; use parking_lot::RwLock; use std::collections::HashMap; use std::net::SocketAddr; +use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Arc; -use std::time::Instant; +use std::time::{Instant, SystemTime, UNIX_EPOCH}; /// Peer connection state #[derive(Debug, Clone, Copy, PartialEq, Eq)] @@ -115,6 +116,14 @@ pub struct Peer { pub avatar_url: RwLock>, /// Instrument pub instrument: RwLock>, + /// Whether this peer is actively streaming audio + audio_active: RwLock, + /// Total audio packets received from this peer + audio_packets_received: AtomicU64, + /// Timestamp of last audio packet received from this peer (ms since UNIX epoch) + last_audio_timestamp_ms: AtomicU64, + /// Total audio bytes received from this peer + audio_bytes_received: AtomicU64, } impl Peer { @@ -146,6 +155,10 @@ impl Peer { is_master: RwLock::new(false), avatar_url: RwLock::new(None), instrument: RwLock::new(None), + audio_active: RwLock::new(false), + audio_packets_received: AtomicU64::new(0), + last_audio_timestamp_ms: AtomicU64::new(0), + audio_bytes_received: AtomicU64::new(0), } } @@ -343,6 +356,44 @@ impl Peer { pub fn set_direct_addr(&self, addr: Option) { *self.direct_addr.write() = addr; } + + /// Set whether this peer is actively streaming audio + pub fn set_audio_active(&self, active: bool) { + *self.audio_active.write() = active; + } + + /// Check if this peer is actively streaming audio + pub fn is_audio_active(&self) -> bool { + *self.audio_active.read() + } + + /// Record reception of an audio packet from this peer. + /// Updates packet count, byte count, and last-received timestamp. + pub fn record_audio_received(&self, byte_count: usize) { + self.audio_packets_received.fetch_add(1, Ordering::Relaxed); + self.audio_bytes_received + .fetch_add(byte_count as u64, Ordering::Relaxed); + let now = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_millis() as u64; + self.last_audio_timestamp_ms.store(now, Ordering::Relaxed); + } + + /// Get total audio packets received from this peer + pub fn audio_packets_received(&self) -> u64 { + self.audio_packets_received.load(Ordering::Relaxed) + } + + /// Get timestamp (ms since UNIX epoch) of last audio packet from this peer + pub fn last_audio_timestamp_ms(&self) -> u64 { + self.last_audio_timestamp_ms.load(Ordering::Relaxed) + } + + /// Get total audio bytes received from this peer + pub fn audio_bytes_received(&self) -> u64 { + self.audio_bytes_received.load(Ordering::Relaxed) + } } impl std::fmt::Debug for Peer { @@ -355,6 +406,8 @@ impl std::fmt::Debug for Peer { .field("has_native_bridge", &self.has_native_bridge) .field("rtt_ms", &self.rtt_ms()) .field("quality_tier", &self.quality_tier()) + .field("audio_active", &self.is_audio_active()) + .field("audio_packets_received", &self.audio_packets_received()) .finish() } } @@ -435,6 +488,16 @@ impl PeerRegistry { .count() } + /// Get all connected peers that are actively streaming audio + pub fn audio_active(&self) -> Vec> { + self.peers + .read() + .values() + .filter(|p| p.state() == PeerState::Connected && p.is_audio_active()) + .cloned() + .collect() + } + /// Find the current master pub fn master(&self) -> Option> { self.peers.read().values().find(|p| p.is_master()).cloned() diff --git a/native-bridge/src/protocol/server.rs b/native-bridge/src/protocol/server.rs index 3fd062a8..668e7fa3 100644 --- a/native-bridge/src/protocol/server.rs +++ b/native-bridge/src/protocol/server.rs @@ -855,12 +855,18 @@ impl BridgeServer { Ok(event_rx) => { // Start AudioNetworkBridge to connect audio engine with network let audio_handle = app.audio_engine.create_bridge_handle(); - let bridge = crate::network::bridge::create_and_start_bridge( + match crate::network::bridge::create_and_start_bridge( network.clone(), audio_handle, event_rx, - ); - app.audio_bridge = Some(bridge); + ) { + Ok(bridge) => { + app.audio_bridge = Some(bridge); + } + Err(e) => { + tracing::error!("Failed to create audio-network bridge: {}", e); + } + } let mode = network.mode(); let is_master = network.is_master(); diff --git a/src/hooks/useAudioEngine.ts b/src/hooks/useAudioEngine.ts index 4270f132..5ebe1499 100644 --- a/src/hooks/useAudioEngine.ts +++ b/src/hooks/useAudioEngine.ts @@ -12,6 +12,7 @@ import type { BackingTrack, TrackAudioSettings } from '@/types'; // and ensures the engine persists across component remounts let globalEngine: AudioEngine | null = null; let globalEngineInitPromise: Promise | null = null; +let isListenerModeActive = false; export function useAudioEngine() { const animationFrameRef = useRef(null); @@ -85,6 +86,14 @@ export function useAudioEngine() { await engine.initialize(); console.log('Audio engine initialized successfully'); + // Apply listener-optimized jitter buffer for smooth receive-only playback. + // Listeners don't send audio, so we use a stable (larger) buffer + // targeting 100-200ms latency for glitch-free playback over variable networks. + if (isListenerModeActive) { + engine.setJitterBufferMode('stable'); + console.log('[AudioEngine] Listener mode: using stable jitter buffer (100-200ms target latency)'); + } + // Set up level monitoring - use getState() inside callback engine.setOnLevelUpdate((levels) => { useRoomStore.getState().setAudioLevels(levels); @@ -642,6 +651,7 @@ export function useAudioEngine() { globalEngine.dispose(); globalEngine = null; globalEngineInitPromise = null; + isListenerModeActive = false; // Use getState() to avoid dependency issues const { setInitialized, setAudioContext, setBackingTrackAnalyser, setMasterAnalyser } = useAudioStore.getState(); setInitialized(false); @@ -793,6 +803,52 @@ export function useAudioEngine() { globalEngine?.updateBroadcastConnections(); }, []); + // === Listener Mode === + // Listeners receive audio without microphone access, using larger jitter buffers + // for smooth playback at the cost of slightly higher latency. + + /** + * Initialize audio engine in listener mode (receive-only, no microphone). + * Creates AudioContext with stable jitter buffer preset for smooth playback. + * Does NOT request microphone permissions or create a broadcast stream. + */ + const initializeListenerAudio = useCallback(async () => { + isListenerModeActive = true; + const engine = await initialize(); + return engine; + }, [initialize]); + + /** + * Switch between listener and performer modes at runtime. + * - Listener mode: stops capture, uses stable (larger) jitter buffers, no broadcast. + * - Performer mode: restores balanced jitter buffers. The caller is responsible + * for calling startCapture() after switching to performer mode to enable microphone. + */ + const setListenerMode = useCallback(async (mode: boolean) => { + const previousMode = isListenerModeActive; + isListenerModeActive = mode; + + if (mode === previousMode) { + return; + } + + if (mode) { + // Switching TO listener mode: stop capture, use stable jitter buffer + stopCapture(); + if (globalEngine) { + globalEngine.setJitterBufferMode('stable'); + console.log('[AudioEngine] Switched to listener mode: capture stopped, stable jitter buffer active'); + } + } else { + // Switching TO performer mode: restore balanced jitter buffer + // Caller must call startCapture() after this to enable microphone and broadcast + if (globalEngine) { + globalEngine.setJitterBufferMode('balanced'); + console.log('[AudioEngine] Switched to performer mode: balanced jitter buffer active, call startCapture() to enable microphone'); + } + } + }, [stopCapture]); + return { isInitialized, isPlaying, @@ -868,5 +924,9 @@ export function useAudioEngine() { enableMultiTrackBridgeAudio, disableMultiTrackBridgeAudio, updateBroadcastConnections, + // Listener mode (receive-only, no microphone) + isListenerMode: isListenerModeActive, + initializeListenerAudio, + setListenerMode, }; } diff --git a/src/hooks/useRoom.ts b/src/hooks/useRoom.ts index 1cd901c6..54c42a94 100644 --- a/src/hooks/useRoom.ts +++ b/src/hooks/useRoom.ts @@ -2,8 +2,10 @@ import { useCallback, useEffect, useRef } from 'react'; import { RealtimeRoomManager } from '@/lib/supabase/realtime'; +import type { StateSyncPayload } from '@/lib/supabase/realtime'; import { CloudflareCalls } from '@/lib/cloudflare/calls'; import { authFetch, authFetchJson } from '@/lib/auth-fetch'; +import { useTempoRealtimeBroadcast } from './useTempoRealtimeBroadcast'; // Storage key for persisted guest ID const GUEST_ID_STORAGE_KEY = 'openstudio_guest_id'; @@ -92,6 +94,77 @@ export interface SongSelectPayload { userId: string; } +/** + * WS2: Build and broadcast full room state to all clients. + * Called by the master when: + * 1. A new user joins (proactive sync) + * 2. A state:request is received from a joining/reconnecting user + * 3. A new master is elected after failover + */ +function broadcastFullStateSync(realtime: RealtimeRoomManager, requestId?: string): void { + const roomState = useRoomStore.getState(); + const audioState = useAudioStore.getState(); + const userTracksStore = useUserTracksStore.getState(); + const loopTracksStore = useLoopTracksStore.getState(); + const permissionsStore = usePermissionsStore.getState(); + + // Lazy-load stores to avoid circular dependency + const { useSessionTempoStore } = require('@/stores/session-tempo-store'); + const { useSongsStore } = require('@/stores/songs-store'); + const tempoState = useSessionTempoStore.getState(); + const songsState = useSongsStore.getState(); + + const allTracks = userTracksStore.getAllTracks().filter((t: UserTrack) => t.isActive); + const allLoopTracks = Array.from(loopTracksStore.tracks.values()) as LoopTrackState[]; + const roomSongs = songsState.getSongsByRoom(roomState.room?.id || ''); + + // Build song track states from current song + const songTrackStates: Record = {}; + if (songsState.currentSongId) { + const currentSong = songsState.songs.get(songsState.currentSongId); + if (currentSong) { + for (const trackRef of currentSong.tracks) { + songTrackStates[trackRef.id] = { + muted: trackRef.muted ?? false, + solo: trackRef.solo ?? false, + volume: trackRef.volume ?? 1, + }; + } + } + } + + const payload: StateSyncPayload = { + requestId: requestId || `master-${Date.now()}`, + queue: roomState.queue.tracks, + currentTrack: roomState.currentTrack, + currentTrackPosition: audioState.currentTime || 0, + isPlaying: audioState.isPlaying || roomState.queue.isPlaying, + tempo: tempoState.tempo, + tempoSource: tempoState.source, + timeSignature: { beatsPerBar: tempoState.beatsPerBar, beatUnit: tempoState.beatUnit }, + key: tempoState.key, + keyScale: tempoState.keyScale, + keySource: tempoState.keySource, + userTracks: allTracks, + loopTracks: allLoopTracks, + songs: roomSongs, + currentSongId: songsState.currentSongId, + stemMixState: roomState.stemMixState as unknown as Record, + permissions: { + members: permissionsStore.members, + defaultRole: permissionsStore.defaultRole, + }, + songTrackStates, + timestamp: Date.now(), + }; + + realtime.broadcastStateSync(payload).catch((err) => { + console.error('[useRoom] Failed to broadcast full state sync:', err); + }); + + console.log(`[useRoom] Full state sync broadcast: ${allTracks.length} tracks, ${allLoopTracks.length} loops, ${roomSongs.length} songs, playing=${payload.isPlaying}`); +} + interface UseRoomOptions { onUserJoined?: (user: User) => void; onUserLeft?: (userId: string) => void; @@ -107,6 +180,10 @@ interface UseRoomOptions { export function useRoom(roomId: string, options: UseRoomOptions = {}) { const realtimeRef = useRef(null); const cloudflareRef = useRef(null); + // WS6: Periodic position sync interval (master broadcasts every 5s during playback) + const positionSyncIntervalRef = useRef | null>(null); + // WS2: State sync retry tracking for new joiners + const stateSyncReceivedRef = useRef(false); // Use authenticated user ID if available, otherwise will be populated with signed guest ID // This is initialized once, but updated in join() if auth state changes @@ -147,6 +224,8 @@ export function useRoom(roomId: string, options: UseRoomOptions = {}) { getOrCreateTrackProcessor, setTrackMediaStreamInput, updateTrackState, + // WS5: Listener mode (receive-only, no microphone) + initializeListenerAudio, } = useAudioEngine(); // Stats tracking for gamification @@ -259,8 +338,13 @@ export function useRoom(roomId: string, options: UseRoomOptions = {}) { // Continue anyway - room_tracks may still work } - // Initialize audio engine - await initialize(); + // Initialize audio engine (WS5: listener mode uses optimized receive-only path) + if (listenerMode) { + await initializeListenerAudio(); + console.log('[useRoom] Audio engine initialized in listener mode (receive-only, stable jitter buffer)'); + } else { + await initialize(); + } // Load persisted user tracks from the database const userTracksState = useUserTracksStore.getState(); @@ -690,108 +774,20 @@ export function useRoom(roomId: string, options: UseRoomOptions = {}) { } } - // CRITICAL: Sync room state to new joiners - // If we're the master, broadcast current tempo/time signature so new users get the current state - // This fixes the issue where new users always see 120 BPM regardless of actual room state + // CRITICAL: Sync room state to new joiners via state:request/state:sync protocol (WS2) + // The new user will send a state:request; the master responds via the state:request handler below. + // As a fallback for the initial join race condition, also proactively broadcast after a short delay. if (hasNewUsers && useRoomStore.getState().isMaster) { - // Small delay to ensure new user's broadcast handlers are ready - // The new user needs time to complete subscription before receiving broadcasts - setTimeout(async () => { - console.log('[useRoom] Master broadcasting room state to new users...'); - const { useSessionTempoStore } = require('@/stores/session-tempo-store'); - const tempoState = useSessionTempoStore.getState(); - - // Broadcast current effective tempo (not just manualTempo) - // This ensures new joiners get the actual tempo being used - realtime.broadcastTempoUpdate(tempoState.tempo, tempoState.source); - - // Broadcast current time signature - realtime.broadcastTimeSignature(tempoState.beatsPerBar, tempoState.beatUnit); - - // Broadcast current tempo source - realtime.broadcastTempoSource(tempoState.source); - - // Broadcast current key/scale if set - if (tempoState.key) { - realtime.broadcastKeyUpdate(tempoState.key, tempoState.keyScale, tempoState.keySource); - } - - // CRITICAL: Broadcast current queue state so new users (especially listeners) - // can sync their queue and respond to play/pause/seek events - const currentQueue = useRoomStore.getState().queue; - if (currentQueue.tracks.length > 0) { - console.log(`[useRoom] Broadcasting queue with ${currentQueue.tracks.length} tracks to new users`); - realtime.broadcastQueueUpdate(currentQueue); - - // CRITICAL: If a track is currently playing, broadcast the playback state - // so late joiners can start playing from the current position - const audioState = useAudioStore.getState(); - const currentTrackFromStore = useRoomStore.getState().currentTrack; - if (currentQueue.isPlaying && currentTrackFromStore) { - const currentPlaybackTime = audioState.currentTime || 0; - const syncTime = Date.now() + 200; // Give late joiner time to load - console.log(`[useRoom] Broadcasting current playback state to late joiner: track=${currentTrackFromStore.id}, time=${currentPlaybackTime}`); - realtime.broadcastPlay(currentTrackFromStore.id, currentPlaybackTime, syncTime); - } - } - - // Broadcast all user tracks so new joiners see everyone's tracks - const userTracksStore = useUserTracksStore.getState(); - const allTracks = userTracksStore.getAllTracks(); - for (const track of allTracks) { - if (track.isActive) { - await realtime.broadcastUserTrackAdd(track); - } - } - console.log(`[useRoom] Broadcast ${allTracks.filter(t => t.isActive).length} active tracks to new users`); - - // Broadcast current songs list so new joiners can sync - const { useSongsStore } = require('@/stores/songs-store'); - const songsState = useSongsStore.getState(); - const roomSongs = songsState.getSongsByRoom(roomId); - if (roomSongs.length > 0) { - realtime.broadcastSongsSync(roomSongs, songsState.currentSongId); - console.log(`[useRoom] Broadcast ${roomSongs.length} songs to new users`); - - // If a song is currently playing, broadcast playback state - // so late joiners can start hearing it from the correct position - const songAudioState = useAudioStore.getState(); - if (songAudioState.isPlaying && songsState.currentSongId) { - const currentSong = songsState.songs.get(songsState.currentSongId); - if (currentSong) { - const trackStates = currentSong.tracks.map((trackRef: SongTrackReference) => ({ - trackRefId: trackRef.id, - muted: trackRef.muted ?? false, - solo: trackRef.solo ?? false, - volume: trackRef.volume ?? 1, - })); - const syncTime = Date.now() + 300; // Give late joiner time to load - realtime.broadcastSongPlay( - songsState.currentSongId, - songAudioState.currentTime || 0, - syncTime, - trackStates - ); - console.log(`[useRoom] Broadcast song playback state for late joiner: song=${songsState.currentSongId}, time=${songAudioState.currentTime}`); - } - } - } - - // Broadcast current loop tracks so new joiners can sync - const loopTracksState = useLoopTracksStore.getState(); - const allLoopTracks = Array.from(loopTracksState.tracks.values()); - if (allLoopTracks.length > 0) { - realtime.broadcastLoopTrackSync(allLoopTracks); - console.log(`[useRoom] Broadcast ${allLoopTracks.length} loop tracks to new users`); + // Short delay to allow new user's subscription to become active + setTimeout(() => { + // Only proactively broadcast if no state:request has been received yet + // (The state:request handler is the preferred path; this is a safety net) + const roomStore = useRoomStore.getState(); + if (roomStore.isMaster) { + console.log('[useRoom] Master proactively broadcasting state sync to new users'); + broadcastFullStateSync(realtime); } - - // Broadcast current permissions so new joiners get accurate role info - const permissionsState = usePermissionsStore.getState(); - if (permissionsState.members.length > 0) { - realtime.broadcastPermissionsSync(permissionsState.members, permissionsState.defaultRole); - console.log(`[useRoom] Broadcast permissions to new users`); - } - }, 500); + }, 300); } }); @@ -836,6 +832,14 @@ export function useRoom(roomId: string, options: UseRoomOptions = {}) { cloudflareRef.current?.setAsMaster(true); usePermissionsStore.getState().setMyPermissions('owner'); console.log('[useRoom] We are now the room master'); + + // WS2: Re-broadcast full room state as new master so all users have authoritative state + setTimeout(() => { + if (realtimeRef.current && useRoomStore.getState().isMaster) { + console.log('[useRoom] New master re-broadcasting full room state'); + broadcastFullStateSync(realtimeRef.current); + } + }, 200); } } } @@ -1188,6 +1192,156 @@ export function useRoom(roomId: string, options: UseRoomOptions = {}) { } }); + // WS2: State request/response protocol + // When a new user joins (or reconnects), they send state:request + // The master responds with the full room state via state:sync + realtime.on('state:request', (data) => { + const payload = data as { userId: string; requestId: string }; + if (payload.userId === user.id) return; + + // Only master responds to state requests + if (!useRoomStore.getState().isMaster) return; + + console.log(`[useRoom] Received state:request from ${payload.userId}, broadcasting full state sync`); + broadcastFullStateSync(realtime, payload.requestId); + }); + + realtime.on('state:sync', (data) => { + const payload = data as StateSyncPayload & { userId: string }; + if (payload.userId === user.id) return; + + stateSyncReceivedRef.current = true; + console.log('[useRoom] Received state:sync from master, applying full state'); + + // Apply queue + if (payload.queue.length > 0) { + const queue: TrackQueue = { + tracks: payload.queue, + currentIndex: payload.currentTrack + ? payload.queue.findIndex(t => t.id === payload.currentTrack?.id) + : 0, + isPlaying: payload.isPlaying, + currentTime: payload.currentTrackPosition, + syncTimestamp: payload.timestamp, + }; + useRoomStore.getState().setQueue(queue); + if (payload.currentTrack) { + useRoomStore.getState().setCurrentTrack(payload.currentTrack); + } + } + + // Apply tempo + const { useSessionTempoStore } = require('@/stores/session-tempo-store'); + const tempoStore = useSessionTempoStore.getState(); + if (payload.tempoSource === 'manual' || payload.tempoSource === 'tap') { + tempoStore.setManualTempo(payload.tempo); + } + tempoStore.setSource(payload.tempoSource as 'manual' | 'track' | 'analyzer' | 'tap'); + tempoStore.setTimeSignature(payload.timeSignature.beatsPerBar, payload.timeSignature.beatUnit); + if (payload.key) { + tempoStore.setManualKey(payload.key, payload.keyScale as 'major' | 'minor' | null); + } + + // Apply user tracks + if (payload.userTracks.length > 0) { + useUserTracksStore.getState().loadPersistedTracks(payload.userTracks); + } + + // Apply loop tracks + if (payload.loopTracks.length > 0) { + useLoopTracksStore.getState().loadTracks(payload.loopTracks); + } + + // Apply songs + if (payload.songs.length > 0) { + const { useSongsStore } = require('@/stores/songs-store'); + useSongsStore.getState().setSongs(payload.songs); + if (payload.currentSongId) { + useSongsStore.getState().selectSong(payload.currentSongId); + } + } + + // Apply stem mix state + if (payload.stemMixState && Object.keys(payload.stemMixState).length > 0) { + useRoomStore.getState().setStemMixState(payload.stemMixState as StemMixState); + } + + // Apply permissions + if (payload.permissions) { + const permData = payload.permissions as { members?: RoomMember[]; defaultRole?: RoomRole }; + if (permData.members) { + usePermissionsStore.getState().setMembers(permData.members); + } + if (permData.defaultRole) { + usePermissionsStore.getState().setDefaultRole(permData.defaultRole); + } + } + + // If track is playing, sync playback + if (payload.isPlaying && payload.currentTrack) { + const syncTime = Date.now() + 200; + loadBackingTrack(payload.currentTrack).then((success) => { + if (success) { + playBackingTrack(syncTime, payload.currentTrackPosition); + useRoomStore.getState().setQueuePlaying(true); + console.log(`[useRoom] State sync: started playback at position ${payload.currentTrackPosition}`); + } + }); + } + }); + + // WS2: Reconnection handler — auto-request state sync when connection is restored + realtime.on('reconnected', () => { + console.log('[useRoom] Reconnected to room, requesting state sync...'); + stateSyncReceivedRef.current = false; + // State request is already sent automatically by RealtimeRoomManager on reconnect + // Set a timeout to warn if sync is not received + setTimeout(() => { + if (!stateSyncReceivedRef.current) { + console.warn('[useRoom] State sync not received after reconnection within 3s, requesting again...'); + realtimeRef.current?.broadcastStateRequest().catch(() => { + console.warn('[useRoom] Failed to re-request state sync after reconnection'); + }); + } + }, 3000); + }); + + realtime.on('disconnected', () => { + console.log('[useRoom] Disconnected from room, waiting for reconnection...'); + }); + + // WS6: Periodic transport position sync — correct drift over long sessions + realtime.on('track:position', (data) => { + const payload = data as { trackId: string; currentTime: number; syncTimestamp: number; isPlaying: boolean; userId: string }; + if (payload.userId === user.id) return; + + // Only non-masters apply position corrections (master is authoritative) + if (useRoomStore.getState().isMaster) return; + + const localTime = useAudioStore.getState().currentTime || 0; + const drift = Math.abs(localTime - payload.currentTime); + + // Correct if drift exceeds 200ms + if (drift > 0.2 && payload.isPlaying) { + console.log(`[useRoom] Position drift detected: ${(drift * 1000).toFixed(0)}ms, correcting to ${payload.currentTime.toFixed(2)}s`); + seekTo(payload.currentTime, Date.now() + 50); + } + }); + + realtime.on('song:position', (data) => { + const payload = data as { songId: string; currentTime: number; syncTimestamp: number; isPlaying: boolean; userId: string }; + if (payload.userId === user.id) return; + if (useRoomStore.getState().isMaster) return; + + const localTime = useAudioStore.getState().currentTime || 0; + const drift = Math.abs(localTime - payload.currentTime); + + if (drift > 0.2 && payload.isPlaying) { + console.log(`[useRoom] Song position drift: ${(drift * 1000).toFixed(0)}ms, correcting`); + seekTo(payload.currentTime, Date.now() + 50); + } + }); + // Song playback event handlers (multi-track timeline sync) // These sync the Song system playback across all room members realtime.on('song:play', (data) => { @@ -1296,6 +1450,18 @@ export function useRoom(roomId: string, options: UseRoomOptions = {}) { await realtime.connect(user); realtimeRef.current = realtime; + // WS2: Request state sync from master after connecting + // Small delay to ensure subscription is fully active before requesting state + stateSyncReceivedRef.current = false; + setTimeout(() => { + if (realtimeRef.current && !stateSyncReceivedRef.current) { + console.log('[useRoom] Requesting state sync from master...'); + realtimeRef.current.broadcastStateRequest().catch((err) => { + console.warn('[useRoom] Failed to request initial state sync:', err); + }); + } + }, 200); + // Register song broadcast callbacks so CRUD operations auto-broadcast setSongBroadcastCallbacks({ onSongCreate: (song) => { @@ -1384,6 +1550,12 @@ export function useRoom(roomId: string, options: UseRoomOptions = {}) { }).catch(err => console.error('Failed to mark track as inactive:', err)); } + // WS6: Stop periodic position sync + if (positionSyncIntervalRef.current) { + clearInterval(positionSyncIntervalRef.current); + positionSyncIntervalRef.current = null; + } + await realtimeRef.current?.disconnect(); await cloudflareRef.current?.leaveRoom(); @@ -1475,6 +1647,23 @@ export function useRoom(roomId: string, options: UseRoomOptions = {}) { realtimeRef.current?.broadcastPlay(freshCurrentTrack.id, freshQueue.currentTime, syncTime); setQueuePlaying(true); + + // WS6: Start periodic position sync (every 5 seconds during playback) + if (positionSyncIntervalRef.current) { + clearInterval(positionSyncIntervalRef.current); + } + positionSyncIntervalRef.current = setInterval(() => { + const audioState = useAudioStore.getState(); + const roomState = useRoomStore.getState(); + if (roomState.isMaster && audioState.isPlaying && roomState.currentTrack) { + realtimeRef.current?.broadcastTrackPosition( + roomState.currentTrack.id, + audioState.currentTime || 0, + Date.now(), + true + ); + } + }, 5000); }, [initialize, loadBackingTrack, playBackingTrack]); const pause = useCallback(async () => { @@ -1496,6 +1685,12 @@ export function useRoom(roomId: string, options: UseRoomOptions = {}) { realtimeRef.current?.broadcastPause(freshCurrentTrack.id, currentTime); setQueuePlaying(false); + + // WS6: Stop periodic position sync during pause + if (positionSyncIntervalRef.current) { + clearInterval(positionSyncIntervalRef.current); + positionSyncIntervalRef.current = null; + } }, [pauseBackingTrack]); const seek = useCallback(async (time: number) => { @@ -2046,6 +2241,29 @@ export function useRoom(roomId: string, options: UseRoomOptions = {}) { realtimeRef.current?.broadcastStemVolume(trackId, stem, volume); }, [audioSetStemVolume]); + // WS3: Auto-broadcast tempo/time-signature changes when we're the master + // This integrates the useTempoRealtimeBroadcast hook that was previously unused, + // ensuring BPM/key/timesig changes auto-broadcast whenever the local user modifies them. + const tempoBroadcastTempoUpdate = useCallback((tempo: number, source: string) => { + if (useRoomStore.getState().isMaster) { + realtimeRef.current?.broadcastTempoUpdate(tempo, source); + } + }, []); + + const tempoBroadcastSource = useCallback((source: string) => { + if (useRoomStore.getState().isMaster) { + realtimeRef.current?.broadcastTempoSource(source); + } + }, []); + + const tempoBroadcastTimeSig = useCallback((beatsPerBar: number, beatUnit: number) => { + if (useRoomStore.getState().isMaster) { + realtimeRef.current?.broadcastTimeSignature(beatsPerBar, beatUnit); + } + }, []); + + useTempoRealtimeBroadcast(tempoBroadcastTempoUpdate, tempoBroadcastSource, tempoBroadcastTimeSig); + // Cleanup on unmount useEffect(() => { return () => { diff --git a/src/lib/audio/audio-engine.ts b/src/lib/audio/audio-engine.ts index 30246b5d..06b83f84 100644 --- a/src/lib/audio/audio-engine.ts +++ b/src/lib/audio/audio-engine.ts @@ -66,6 +66,7 @@ export class AudioEngine { private broadcastSongGain: GainNode | null = null; // Routes song/backing track audio to broadcast private midiSourceNode: MediaStreamAudioSourceNode | null = null; private midiMixGain: GainNode | null = null; + private broadcastActive: boolean = false; // Tracks whether broadcast was set up (survives AudioContext recreation) // Native bridge audio source (replaces localSourceNode when bridge is active) private bridgeWorkletNode: AudioWorkletNode | null = null; @@ -266,6 +267,18 @@ export class AudioEngine { this.songGain?.disconnect(); this.backingTrackGain?.disconnect(); + // Clean up broadcast nodes - they reference the old AudioContext and must be rebuilt + // broadcastActive flag is preserved so updateBroadcastConnections can rebuild them + this.broadcastSongGain?.disconnect(); + this.broadcastSongGain = null; + this.broadcastMixerGain?.disconnect(); + this.broadcastMixerGain = null; + this.broadcastDestination = null; + this.midiSourceNode?.disconnect(); + this.midiSourceNode = null; + this.midiMixGain?.disconnect(); + this.midiMixGain = null; + // Close old context await this.audioContext.close(); this.audioContext = null; @@ -285,6 +298,13 @@ export class AudioEngine { // Note: Bridge audio for tracks is re-enabled via setTrackBridgeInput calls // from useNativeBridge after sample rate change completes + // Rebuild broadcast connections if broadcast was active before sample rate change + // Must be done after initialize() recreates core audio nodes (songGain, masterGain, etc.) + if (this.broadcastActive) { + this.updateBroadcastConnections(); + console.log('[AudioEngine] Broadcast connections rebuilt after sample rate change'); + } + console.log('[AudioEngine] Sample rate changed to', this.config.sampleRate); } @@ -1377,13 +1397,27 @@ export class AudioEngine { return null; } - // Create broadcast destination if not exists - if (!this.broadcastDestination) { - this.broadcastDestination = this.audioContext.createMediaStreamDestination(); - this.broadcastMixerGain = this.audioContext.createGain(); - this.broadcastMixerGain.connect(this.broadcastDestination); + // Idempotent: tear down existing broadcast nodes before rebuilding + // This prevents duplicate connections and stale node references after AudioContext recreation + if (this.broadcastDestination) { + console.log('[AudioEngine] Tearing down existing broadcast nodes for rebuild'); + this.midiSourceNode?.disconnect(); + this.midiSourceNode = null; + this.midiMixGain?.disconnect(); + this.midiMixGain = null; + this.broadcastSongGain?.disconnect(); + this.broadcastSongGain = null; + this.broadcastMixerGain?.disconnect(); + this.broadcastMixerGain = null; + this.broadcastDestination = null; } + // Create fresh broadcast destination and mixer + this.broadcastDestination = this.audioContext.createMediaStreamDestination(); + this.broadcastMixerGain = this.audioContext.createGain(); + this.broadcastMixerGain.connect(this.broadcastDestination); + this.broadcastActive = true; + // Connect all track processor outputs to broadcast mixer // Each track's output goes through its own mute/solo logic if (this.broadcastMixerGain) { @@ -1432,21 +1466,84 @@ export class AudioEngine { * Call this after adding new tracks while broadcast is active */ updateBroadcastConnections(): void { - if (!this.broadcastMixerGain || !this.audioContext) return; + if (!this.audioContext) return; + + // If broadcast infrastructure doesn't exist, rebuild it only if broadcast was previously active + // This handles AudioContext recreation (sample rate changes) where old nodes become invalid + if (!this.broadcastDestination || !this.broadcastMixerGain) { + if (!this.broadcastActive) return; + + console.log('[AudioEngine] Rebuilding broadcast infrastructure after context recreation'); + this.broadcastDestination = this.audioContext.createMediaStreamDestination(); + this.broadcastMixerGain = this.audioContext.createGain(); + this.broadcastMixerGain.connect(this.broadcastDestination); + } + // Reconnect all track processors to broadcast mixer for (const processor of this.trackProcessors.values()) { - processor.getBroadcastNode().connect(this.broadcastMixerGain); + try { + processor.getBroadcastNode().connect(this.broadcastMixerGain); + } catch (err) { + console.warn('[AudioEngine] Failed to connect track processor to broadcast:', err); + } } - // Ensure song audio is still connected to broadcast - // This can become disconnected after sample rate changes or engine recreation - if (this.songGain && !this.broadcastSongGain) { + // Rebuild song -> broadcast connection unconditionally to handle stale references + // After AudioContext recreation, broadcastSongGain may reference nodes from the old context + if (this.songGain) { + if (this.broadcastSongGain) { + try { + this.broadcastSongGain.disconnect(); + } catch { + // Node may already be disconnected from old context + } + } this.broadcastSongGain = this.audioContext.createGain(); this.broadcastSongGain.gain.value = 1.0; this.songGain.connect(this.broadcastSongGain); this.broadcastSongGain.connect(this.broadcastMixerGain); - console.log('[AudioEngine] Reconnected song audio to broadcast mix'); + console.log('[AudioEngine] Song audio -> broadcast connection established'); + } + } + + /** + * Verify the broadcast audio chain is intact. + * Traces the node graph: backingTrackGain -> songGain -> broadcastSongGain -> broadcastMixerGain + * Logs which connections are intact and which are broken. + * @returns true if the full broadcast chain is complete, false if any link is broken + */ + verifyBroadcastChain(): boolean { + // If broadcast was never set up, chain is trivially "not needed" + if (!this.broadcastActive) { + return true; + } + + const links: Array<{ name: string; present: boolean }> = [ + { name: 'audioContext', present: this.audioContext !== null }, + { name: 'backingTrackGain', present: this.backingTrackGain !== null }, + { name: 'songGain', present: this.songGain !== null }, + { name: 'broadcastSongGain', present: this.broadcastSongGain !== null }, + { name: 'broadcastMixerGain', present: this.broadcastMixerGain !== null }, + { name: 'broadcastDestination', present: this.broadcastDestination !== null }, + ]; + + const brokenLinks = links.filter(link => !link.present); + const isComplete = brokenLinks.length === 0; + + if (!isComplete) { + console.warn( + '[AudioEngine] Broadcast chain BROKEN. Missing nodes:', + brokenLinks.map(l => l.name).join(', ') + ); + console.warn( + '[AudioEngine] Broadcast chain status:', + links.map(l => `${l.name}=${l.present ? 'OK' : 'MISSING'}`).join(' -> ') + ); + } else { + console.log('[AudioEngine] Broadcast chain verified: all nodes present'); } + + return isComplete; } /** @@ -1692,6 +1789,18 @@ export class AudioEngine { } } + // Guard: ensure broadcast chain is intact before playback + // broadcastSongGain can be null after AudioContext recreation or stem loading + if (this.broadcastActive && !this.broadcastSongGain) { + console.log('[AudioEngine] broadcastSongGain missing before playback, rebuilding broadcast connections'); + this.updateBroadcastConnections(); + } + + // Verify broadcast chain and warn if broken + if (this.broadcastActive && !this.verifyBroadcastChain()) { + console.warn('[AudioEngine] Broadcast chain broken before backing track playback - listeners may not hear audio'); + } + this.stopBackingTrack(); // Calculate start time based on sync timestamp @@ -1736,6 +1845,18 @@ export class AudioEngine { } } + // Guard: ensure broadcast chain is intact before stem playback + // broadcastSongGain can be null after AudioContext recreation or stem loading + if (this.broadcastActive && !this.broadcastSongGain) { + console.log('[AudioEngine] broadcastSongGain missing before stem playback, rebuilding broadcast connections'); + this.updateBroadcastConnections(); + } + + // Verify broadcast chain and warn if broken + if (this.broadcastActive && !this.verifyBroadcastChain()) { + console.warn('[AudioEngine] Broadcast chain broken before stem playback - listeners may not hear audio'); + } + this.stopStemmedTrack(); const now = Date.now(); @@ -1778,6 +1899,13 @@ export class AudioEngine { this.stemSources.set(stemType, source); } + // After connecting stems through their gain nodes -> backingTrackGain, + // explicitly update broadcast connections so stem audio routes through + // to broadcastSongGain for WebRTC listeners + if (this.broadcastActive) { + this.updateBroadcastConnections(); + } + this.playbackStartTime = startTime; this.playbackOffset = offset; this.isPlaying = true; @@ -2298,6 +2426,7 @@ export class AudioEngine { this.broadcastMixerGain?.disconnect(); this.broadcastMixerGain = null; this.broadcastDestination = null; + this.broadcastActive = false; this.localSourceNode?.disconnect(); this.localSourceNode = null; diff --git a/src/lib/audio/native-bridge.ts b/src/lib/audio/native-bridge.ts index bffce424..f86cf128 100644 --- a/src/lib/audio/native-bridge.ts +++ b/src/lib/audio/native-bridge.ts @@ -80,7 +80,8 @@ type NativeMessage = | { type: 'roomLeft' } | { type: 'peerConnected'; userId: string; userName: string; hasNativeBridge: boolean } | { type: 'peerDisconnected'; userId: string; reason: string } - | { type: 'networkModeChanged'; mode: string }; + | { type: 'networkModeChanged'; mode: string } + | { type: 'p2pStats'; peerId: string; rtt: number; packetLoss: number; jitter: number }; // === Audio Data Types === @@ -117,7 +118,9 @@ export type BridgeEventType = | 'roomLeft' | 'peerConnected' | 'peerDisconnected' - | 'networkModeChanged'; + | 'networkModeChanged' + | 'peerAudioReceived' + | 'p2pStats'; type BridgeEventData = { connected: { version: string; driverType: string }; @@ -135,6 +138,8 @@ type BridgeEventData = { peerConnected: { userId: string; userName: string; hasNativeBridge: boolean }; peerDisconnected: { userId: string; reason: string }; networkModeChanged: { mode: string }; + peerAudioReceived: { userId: string; samples: Float32Array }; + p2pStats: { peerId: string; rtt: number; packetLoss: number; jitter: number }; }; type BridgeEventCallback = (data: BridgeEventData[T]) => void; @@ -158,6 +163,15 @@ export class NativeBridge { // Track last sent channel config to avoid redundant restarts private lastChannelConfig: { channelCount: number; leftChannel: number; rightChannel?: number } | null = null; + // P2P coordination: track which remote peers are in the room and which have native bridge + private roomPeers: Set = new Set(); + private nativeBridgePeers: Set = new Set(); + private currentNetworkMode: 'webrtc' | 'p2p' | 'hybrid' = 'webrtc'; + + // P2P audio and stats callbacks + onPeerAudioReceived?: (userId: string, samples: Float32Array) => void; + onP2PStatsUpdate?: (stats: { peerId: string; rtt: number; packetLoss: number; jitter: number }) => void; + // Singleton instance private static instance: NativeBridge | null = null; @@ -185,6 +199,8 @@ export class NativeBridge { 'peerConnected', 'peerDisconnected', 'networkModeChanged', + 'peerAudioReceived', + 'p2pStats', ]; for (const type of eventTypes) { this.listeners.set(type, new Set()); @@ -299,6 +315,10 @@ export class NativeBridge { this.isConnected = false; // Reset cached config so next connection will properly configure this.lastChannelConfig = null; + // Clear P2P peer tracking state + this.roomPeers.clear(); + this.nativeBridgePeers.clear(); + this.currentNetworkMode = 'webrtc'; } /** @@ -476,23 +496,50 @@ export class NativeBridge { case 'roomLeft': console.log('[NativeBridge] Room left'); + this.roomPeers.clear(); + this.nativeBridgePeers.clear(); + this.currentNetworkMode = 'webrtc'; this.emit('roomLeft', {}); break; case 'peerConnected': console.log('[NativeBridge] Peer connected:', msg.userName, '(' + msg.userId + ')', 'native:', msg.hasNativeBridge); + this.roomPeers.add(msg.userId); + if (msg.hasNativeBridge) { + this.nativeBridgePeers.add(msg.userId); + } + this.evaluateAndSetNetworkMode(); this.emit('peerConnected', { userId: msg.userId, userName: msg.userName, hasNativeBridge: msg.hasNativeBridge }); break; case 'peerDisconnected': console.log('[NativeBridge] Peer disconnected:', msg.userId, 'reason:', msg.reason); + this.roomPeers.delete(msg.userId); + this.nativeBridgePeers.delete(msg.userId); + this.evaluateAndSetNetworkMode(); this.emit('peerDisconnected', { userId: msg.userId, reason: msg.reason }); break; case 'networkModeChanged': console.log('[NativeBridge] Network mode changed:', msg.mode); + this.currentNetworkMode = msg.mode as 'webrtc' | 'p2p' | 'hybrid'; this.emit('networkModeChanged', { mode: msg.mode }); break; + + case 'p2pStats': + this.emit('p2pStats', { + peerId: msg.peerId, + rtt: msg.rtt, + packetLoss: msg.packetLoss, + jitter: msg.jitter, + }); + this.onP2PStatsUpdate?.({ + peerId: msg.peerId, + rtt: msg.rtt, + packetLoss: msg.packetLoss, + jitter: msg.jitter, + }); + break; } } catch (e) { console.error('[NativeBridge] Failed to parse message:', e); @@ -530,6 +577,20 @@ export class NativeBridge { headerSize = 14 + trackIdLen; } + // P2P peer audio: decoded audio received from a remote native bridge peer + // Format: [msg_type: 3][sample_count: u32][timestamp: u64][user_id_len: u8][user_id: utf8][samples: f32 LE...] + if (msgType === 3) { + const userIdLen = view.getUint8(13); + const userIdBytes = new Uint8Array(data, 14, userIdLen); + const peerUserId = new TextDecoder().decode(userIdBytes); + const peerHeaderSize = 14 + userIdLen; + const peerSamplesBuffer = data.slice(peerHeaderSize); + const peerSamples = new Float32Array(peerSamplesBuffer); + this.onPeerAudioReceived?.(peerUserId, peerSamples); + this.emit('peerAudioReceived', { userId: peerUserId, samples: peerSamples }); + return; + } + // Parse samples (f32 little-endian) // Need to slice buffer for proper alignment const samplesBuffer = data.slice(headerSize); @@ -871,13 +932,79 @@ export class NativeBridge { } /** - * Switch network mode (P2P, Relay, Hybrid) - * @param mode 'p2p' | 'relay' | 'hybrid' + * Switch network mode (WebRTC, P2P, Relay, Hybrid). + * Updates internal state and sends the mode change to the native bridge. + * @param mode 'webrtc' | 'p2p' | 'relay' | 'hybrid' */ - setNetworkMode(mode: 'p2p' | 'relay' | 'hybrid'): void { + setNetworkMode(mode: 'webrtc' | 'p2p' | 'relay' | 'hybrid'): void { + // Normalize 'relay' to 'webrtc' for internal tracking (relay uses the WebRTC path from our perspective) + this.currentNetworkMode = mode === 'relay' ? 'webrtc' : mode as 'webrtc' | 'p2p' | 'hybrid'; this.send({ type: 'setNetworkMode', mode }); } + /** + * Get the current network mode. + * Returns the mode as determined by peer evaluation or explicit setting. + */ + getNetworkMode(): 'webrtc' | 'p2p' | 'hybrid' { + return this.currentNetworkMode; + } + + /** + * Register a remote user as having native bridge active. + * If the user is not already in the room peers set, they are added there too. + * Triggers automatic network mode re-evaluation. + */ + addNativeBridgePeer(userId: string): void { + if (!this.roomPeers.has(userId)) { + this.roomPeers.add(userId); + } + this.nativeBridgePeers.add(userId); + this.evaluateAndSetNetworkMode(); + console.log(`[NativeBridge] Added native bridge peer: ${userId} (${this.nativeBridgePeers.size}/${this.roomPeers.size} native)`); + } + + /** + * Remove a remote user from the native bridge peer set. + * The user remains in the room peers set (they may still be connected via WebRTC). + * Triggers automatic network mode re-evaluation. + */ + removeNativeBridgePeer(userId: string): void { + this.nativeBridgePeers.delete(userId); + this.evaluateAndSetNetworkMode(); + console.log(`[NativeBridge] Removed native bridge peer: ${userId} (${this.nativeBridgePeers.size}/${this.roomPeers.size} native)`); + } + + /** + * Evaluate the optimal network mode based on which remote peers have native bridge. + * Called automatically when peers connect/disconnect via the native bridge room, + * and when peers are manually added/removed via addNativeBridgePeer/removeNativeBridgePeer. + * + * Mode selection logic: + * - ALL remote users have native bridge -> 'p2p' (lowest latency, native-to-native) + * - SOME remote users have native bridge -> 'hybrid' (mixed native + WebRTC) + * - NO remote users have native bridge -> 'webrtc' (standard WebRTC path) + */ + private evaluateAndSetNetworkMode(): void { + const totalPeers = this.roomPeers.size; + const nativePeers = this.nativeBridgePeers.size; + + let targetMode: 'webrtc' | 'p2p' | 'hybrid'; + + if (totalPeers === 0 || nativePeers === 0) { + targetMode = 'webrtc'; + } else if (nativePeers === totalPeers) { + targetMode = 'p2p'; + } else { + targetMode = 'hybrid'; + } + + if (targetMode !== this.currentNetworkMode) { + console.log(`[NativeBridge] Network mode evaluation: ${this.currentNetworkMode} -> ${targetMode} (${nativePeers}/${totalPeers} peers have native bridge)`); + this.setNetworkMode(targetMode); + } + } + /** * Send remote user audio to native bridge for mixing * Used to route WebRTC audio through the native audio engine diff --git a/src/lib/supabase/realtime.ts b/src/lib/supabase/realtime.ts index fd0da67b..ea0edd28 100644 --- a/src/lib/supabase/realtime.ts +++ b/src/lib/supabase/realtime.ts @@ -10,6 +10,29 @@ export interface RoomState { messages: RoomMessage[]; } +/** Full room state payload for state sync protocol (WS2) */ +export interface StateSyncPayload { + requestId: string; + queue: BackingTrack[]; + currentTrack: BackingTrack | null; + currentTrackPosition: number; + isPlaying: boolean; + tempo: number; + tempoSource: string; + timeSignature: { beatsPerBar: number; beatUnit: number }; + key: string | null; + keyScale: string | null; + keySource: string; + userTracks: UserTrack[]; + loopTracks: LoopTrackState[]; + songs: Array<{ id: string; name: string; [key: string]: unknown }>; + currentSongId: string | null; + stemMixState: Record; + permissions: Record; + songTrackStates: Record; + timestamp: number; +} + // Track active channels by room ID to prevent conflicts const activeChannels = new Map(); @@ -18,6 +41,9 @@ export class RealtimeRoomManager { private roomId: string; private userId: string; private listeners: Map void>> = new Map(); + private connectionState: 'connected' | 'disconnected' | 'reconnecting' = 'disconnected'; + private seekDebounceTimer: ReturnType | null = null; + private songSeekDebounceTimer: ReturnType | null = null; constructor(roomId: string, userId: string) { this.roomId = roomId; @@ -262,6 +288,24 @@ export class RealtimeRoomManager { this.channel.on('broadcast', { event: 'song:trackStates' }, ({ payload }) => { this.emit('song:trackStates', payload); }); + + // State sync events (WS2: request/response state sync protocol) + this.channel.on('broadcast', { event: 'state:request' }, ({ payload }) => { + this.emit('state:request', payload); + }); + + this.channel.on('broadcast', { event: 'state:sync' }, ({ payload }) => { + this.emit('state:sync', payload); + }); + + // Transport position sync events (WS6) + this.channel.on('broadcast', { event: 'track:position' }, ({ payload }) => { + this.emit('track:position', payload); + }); + + this.channel.on('broadcast', { event: 'song:position' }, ({ payload }) => { + this.emit('song:position', payload); + }); } async connect(user: User): Promise { @@ -309,28 +353,54 @@ export class RealtimeRoomManager { }, BASE_TIMEOUT); this.channel!.subscribe(async (status) => { - if (resolved) return; // Prevent multiple resolutions - console.log('[Realtime] Subscription status:', status); - if (status === 'SUBSCRIBED') { - resolved = true; - clearTimeout(timeoutId); - try { - await this.channel?.track(user); - this.emit('connected', { roomId: this.roomId }); - resolve(); - } catch (err) { - reject(err); - } - } else if (status === 'TIMED_OUT' || status === 'CHANNEL_ERROR' || status === 'CLOSED') { - if (!resolved) { + if (!resolved) { + // Initial connection phase + if (status === 'SUBSCRIBED') { resolved = true; clearTimeout(timeoutId); + this.connectionState = 'connected'; + try { + await this.channel?.track(user); + this.emit('connected', { roomId: this.roomId }); + resolve(); + } catch (err) { + reject(err); + } + } else if (status === 'TIMED_OUT' || status === 'CHANNEL_ERROR' || status === 'CLOSED') { + resolved = true; + clearTimeout(timeoutId); + this.connectionState = 'disconnected'; reject(new Error(`Realtime subscription failed with status: ${status}`)); } + // SUBSCRIBING status is transitional, just wait + } else { + // Post-connection: detect reconnection (WS2) + if (status === 'CLOSED' || status === 'CHANNEL_ERROR') { + const previousState = this.connectionState; + this.connectionState = 'disconnected'; + if (previousState === 'connected') { + console.log('[Realtime] Connection lost, waiting for reconnection'); + this.emit('disconnected', { roomId: this.roomId }); + } + } else if (status === 'SUBSCRIBED') { + const previousState = this.connectionState; + this.connectionState = 'connected'; + if (previousState === 'disconnected' || previousState === 'reconnecting') { + console.log('[Realtime] Reconnected to room:', this.roomId); + this.emit('reconnected', { roomId: this.roomId }); + // Auto-request state sync on reconnection + this.broadcastStateRequest().catch((err) => { + console.warn('[Realtime] Failed to request state sync on reconnect:', err); + }); + } + } else if (status === 'SUBSCRIBING') { + if (this.connectionState === 'disconnected') { + this.connectionState = 'reconnecting'; + } + } } - // SUBSCRIBING status is transitional, just wait }); }); @@ -354,6 +424,17 @@ export class RealtimeRoomManager { } async disconnect(): Promise { + // Clear debounce timers to prevent stale broadcasts after disconnect + if (this.seekDebounceTimer !== null) { + clearTimeout(this.seekDebounceTimer); + this.seekDebounceTimer = null; + } + if (this.songSeekDebounceTimer !== null) { + clearTimeout(this.songSeekDebounceTimer); + this.songSeekDebounceTimer = null; + } + this.connectionState = 'disconnected'; + if (this.channel) { const channelToRemove = this.channel; const roomIdToCleanup = this.roomId; @@ -399,29 +480,74 @@ export class RealtimeRoomManager { } } + /** + * Reliable broadcast wrapper (WS3). + * Awaits the broadcast result and retries once after 500ms on failure. + * Returns true on success, false after exhausting the single retry. + */ + private async reliableBroadcast(event: string, payload: Record): Promise { + if (!this.channel) { + console.warn('[Realtime] reliableBroadcast: no channel available for event:', event); + return false; + } + + try { + const result = await this.channel.send({ + type: 'broadcast', + event, + payload, + }); + if (result === 'ok') { + return true; + } + console.warn('[Realtime] Broadcast returned non-ok for event:', event, 'result:', result); + } catch (err) { + console.warn('[Realtime] Broadcast failed for event:', event, err); + } + + // Retry once after 500ms + await new Promise((resolve) => setTimeout(resolve, 500)); + + if (!this.channel) { + console.warn('[Realtime] reliableBroadcast: channel lost during retry for event:', event); + return false; + } + + try { + const retryResult = await this.channel.send({ + type: 'broadcast', + event, + payload, + }); + if (retryResult === 'ok') { + return true; + } + console.warn('[Realtime] Broadcast retry returned non-ok for event:', event, 'result:', retryResult); + return false; + } catch (retryErr) { + console.warn('[Realtime] Broadcast retry failed for event:', event, retryErr); + return false; + } + } + // Broadcast events to all users async broadcastPlay(trackId: string, timestamp: number, syncTime: number): Promise { - await this.channel?.send({ - type: 'broadcast', - event: 'track:play', - payload: { trackId, timestamp, syncTime, userId: this.userId }, - }); + await this.reliableBroadcast('track:play', { trackId, timestamp, syncTime, userId: this.userId }); } async broadcastPause(trackId: string, timestamp: number): Promise { - await this.channel?.send({ - type: 'broadcast', - event: 'track:pause', - payload: { trackId, timestamp, userId: this.userId }, - }); + await this.reliableBroadcast('track:pause', { trackId, timestamp, userId: this.userId }); } async broadcastSeek(trackId: string, timestamp: number, syncTime: number): Promise { - await this.channel?.send({ - type: 'broadcast', - event: 'track:seek', - payload: { trackId, timestamp, syncTime, userId: this.userId }, - }); + // Debounce seek broadcasts (WS6) - only the final seek position gets sent + if (this.seekDebounceTimer !== null) { + clearTimeout(this.seekDebounceTimer); + } + this.seekDebounceTimer = setTimeout(() => { + this.seekDebounceTimer = null; + this.reliableBroadcast('track:seek', { trackId, timestamp, syncTime, userId: this.userId }); + }, 100); } async broadcastQueueUpdate(queue: TrackQueue): Promise { @@ -469,19 +595,11 @@ export class RealtimeRoomManager { } async broadcastStemToggle(trackId: string, stem: string, enabled: boolean): Promise { - await this.channel?.send({ - type: 'broadcast', - event: 'stem:toggle', - payload: { trackId, stem, enabled, userId: this.userId }, - }); + await this.reliableBroadcast('stem:toggle', { trackId, stem, enabled, userId: this.userId }); } async broadcastStemVolume(trackId: string, stem: string, volume: number): Promise { - await this.channel?.send({ - type: 'broadcast', - event: 'stem:volume', - payload: { trackId, stem, volume, userId: this.userId }, - }); + await this.reliableBroadcast('stem:volume', { trackId, stem, volume, userId: this.userId }); } async broadcastAIGeneration(request: { prompt: string; status: string; trackId?: string }): Promise { @@ -584,27 +702,15 @@ export class RealtimeRoomManager { // Tempo broadcasts async broadcastTempoUpdate(tempo: number, source: string): Promise { - await this.channel?.send({ - type: 'broadcast', - event: 'tempo:update', - payload: { tempo, source, userId: this.userId }, - }); + await this.reliableBroadcast('tempo:update', { tempo, source, userId: this.userId }); } async broadcastTempoSource(source: string): Promise { - await this.channel?.send({ - type: 'broadcast', - event: 'tempo:source', - payload: { source, userId: this.userId }, - }); + await this.reliableBroadcast('tempo:source', { source, userId: this.userId }); } async broadcastTimeSignature(beatsPerBar: number, beatUnit: number): Promise { - await this.channel?.send({ - type: 'broadcast', - event: 'tempo:timesig', - payload: { beatsPerBar, beatUnit, userId: this.userId }, - }); + await this.reliableBroadcast('tempo:timesig', { beatsPerBar, beatUnit, userId: this.userId }); } async broadcastKeyUpdate(key: string | null, keyScale: 'major' | 'minor' | null, keySource: string): Promise { @@ -690,27 +796,22 @@ export class RealtimeRoomManager { syncTime: number, trackStates: Array<{ trackRefId: string; muted: boolean; solo: boolean; volume: number }> ): Promise { - await this.channel?.send({ - type: 'broadcast', - event: 'song:play', - payload: { songId, currentTime, syncTime, trackStates, userId: this.userId }, - }); + await this.reliableBroadcast('song:play', { songId, currentTime, syncTime, trackStates, userId: this.userId }); } async broadcastSongPause(songId: string, currentTime: number): Promise { - await this.channel?.send({ - type: 'broadcast', - event: 'song:pause', - payload: { songId, currentTime, userId: this.userId }, - }); + await this.reliableBroadcast('song:pause', { songId, currentTime, userId: this.userId }); } async broadcastSongSeek(songId: string, seekTime: number, syncTime: number): Promise { - await this.channel?.send({ - type: 'broadcast', - event: 'song:seek', - payload: { songId, seekTime, syncTime, userId: this.userId }, - }); + // Debounce song seek broadcasts (WS6) - only the final seek position gets sent + if (this.songSeekDebounceTimer !== null) { + clearTimeout(this.songSeekDebounceTimer); + } + this.songSeekDebounceTimer = setTimeout(() => { + this.songSeekDebounceTimer = null; + this.reliableBroadcast('song:seek', { songId, seekTime, syncTime, userId: this.userId }); + }, 100); } async broadcastSongSelect(songId: string): Promise { @@ -765,6 +866,53 @@ export class RealtimeRoomManager { }); } + // State sync broadcasts (WS2: reliable state sync protocol) + async broadcastStateRequest(): Promise { + const requestId = `${this.userId}-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`; + await this.reliableBroadcast('state:request', { userId: this.userId, requestId }); + } + + async broadcastStateSync(state: StateSyncPayload): Promise { + await this.reliableBroadcast('state:sync', { ...state, userId: this.userId }); + } + + // Transport position sync broadcasts (WS6: periodic position updates) + async broadcastTrackPosition( + trackId: string, + currentTime: number, + syncTimestamp: number, + isPlaying: boolean + ): Promise { + try { + await this.channel?.send({ + type: 'broadcast', + event: 'track:position', + payload: { trackId, currentTime, syncTimestamp, isPlaying, userId: this.userId }, + }); + } catch (err) { + // Position syncs are periodic and non-critical; log but do not retry + console.warn('[Realtime] Failed to broadcast track position:', err); + } + } + + async broadcastSongPosition( + songId: string, + currentTime: number, + syncTimestamp: number, + isPlaying: boolean + ): Promise { + try { + await this.channel?.send({ + type: 'broadcast', + event: 'song:position', + payload: { songId, currentTime, syncTimestamp, isPlaying, userId: this.userId }, + }); + } catch (err) { + // Position syncs are periodic and non-critical; log but do not retry + console.warn('[Realtime] Failed to broadcast song position:', err); + } + } + async updatePresence(data: Partial): Promise { await this.channel?.track({ ...data, id: this.userId }); }