Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 70 additions & 12 deletions native-bridge/src/network/bridge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

use super::{NetworkEvent, NetworkManager, NetworkMode};
use crate::audio::AudioBridgeHandle;
use crate::network::codec::{OpusCodec, OpusConfig};
use crate::network::osp::{OspMessageType, TrackStateMessage, TransportAction};
use parking_lot::RwLock;
use std::sync::Arc;
Expand Down Expand Up @@ -71,21 +72,33 @@ pub struct AudioNetworkBridge {
running: Arc<std::sync::atomic::AtomicBool>,
/// Transport event sender for metronome/playback sync
transport_tx: RwLock<Option<broadcast::Sender<TransportEvent>>>,
/// Opus codec for encoding local audio before P2P transmission.
/// The bridge encodes once and sends the encoded data to P2P peers,
/// avoiding redundant encoding in the P2P layer.
codec: Arc<OpusCodec>,
}

impl AudioNetworkBridge {
pub fn new(
config: BridgeConfig,
network: Arc<NetworkManager>,
audio: AudioBridgeHandle,
) -> Self {
Self {
) -> super::Result<Self> {
let codec = Arc::new(OpusCodec::new(OpusConfig {
sample_rate: config.sample_rate,
channels: config.channels,
frame_size: config.frame_size,
..OpusConfig::low_latency()
})?);

Ok(Self {
config,
network,
audio,
running: Arc::new(std::sync::atomic::AtomicBool::new(false)),
transport_tx: RwLock::new(None),
}
codec,
})
}

/// Subscribe to transport events (clock sync, tempo changes, etc.)
Expand Down Expand Up @@ -143,6 +156,7 @@ impl AudioNetworkBridge {
// Spawn outgoing audio sender on dedicated thread
let audio = self.audio.clone();
let network = self.network.clone();
let codec = self.codec.clone();
let config = self.config.clone();
let running = self.running.clone();
std::thread::Builder::new()
Expand All @@ -152,7 +166,7 @@ impl AudioNetworkBridge {
.enable_all()
.build()
.expect("Failed to create runtime");
rt.block_on(Self::audio_send_loop(audio, network, config, running));
rt.block_on(Self::audio_send_loop(audio, network, codec, config, running));
})
.expect("Failed to spawn audio sender thread");

Expand Down Expand Up @@ -198,10 +212,18 @@ impl AudioNetworkBridge {
match event {
NetworkEvent::AudioReceived {
user_id,
track_id: _,
track_id,
samples,
} => {
// Push audio to the remote user's buffer in the audio engine
// Feed received P2P/relay audio into the AudioEngine's remote user buffer.
// This handles both P2P-decoded and relay-decoded audio uniformly.
// The P2P layer has already Opus-decoded the audio; we receive PCM here.
debug!(
"Routing {} audio samples from user {} track {} to audio engine",
samples.len(),
user_id,
track_id,
);
audio.push_remote_audio(&user_id, &samples);
}

Expand Down Expand Up @@ -337,10 +359,19 @@ impl AudioNetworkBridge {
}
}

/// Audio send loop - gets audio from engine and sends to network
/// Audio send loop - gets audio from engine and sends to network.
///
/// Handles dual-path operation:
/// 1. Opus-encodes audio once at the bridge level
/// 2. Sends pre-encoded data to P2P peers via NetworkManager (unreliable UDP)
/// 3. Sends raw PCM to relay when in Relay or Hybrid mode
///
/// This avoids redundant Opus encoding: the bridge encodes once, and P2P
/// peers receive the encoded data directly without re-encoding.
async fn audio_send_loop(
audio: AudioBridgeHandle,
network: Arc<NetworkManager>,
codec: Arc<OpusCodec>,
config: BridgeConfig,
running: Arc<std::sync::atomic::AtomicBool>,
) {
Expand All @@ -367,10 +398,37 @@ impl AudioNetworkBridge {
// Accumulate until we have a full frame
frame_buffer.extend_from_slice(&samples);

// Send complete frames
// Send complete frames via both P2P and relay paths
while frame_buffer.len() >= samples_per_frame {
let frame: Vec<f32> = frame_buffer.drain(..samples_per_frame).collect();
network.send_audio(config.local_track_id, frame);
let sample_count =
(frame.len() / config.channels as usize) as u16;

// Opus-encode once at the bridge level for P2P transmission.
// The encoded data goes directly to P2P peers without re-encoding.
match codec.encoder.encode(&frame) {
Ok(opus_data) => {
// Send pre-encoded audio to P2P peers (unreliable UDP).
// This uses the P2PEncodedAudio message type which bypasses
// the mode-based routing and goes directly to P2P.
network.send_p2p_encoded_audio(
config.local_track_id,
opus_data,
config.channels,
sample_count,
);
}
Err(e) => {
debug!("Opus encode failed in bridge send loop: {}", e);
}
}

// Also send raw PCM to relay for Relay and Hybrid modes.
// In P2P-only mode, relay is not running so this is a no-op.
let mode = network.mode();
if mode == NetworkMode::Relay || mode == NetworkMode::Hybrid {
network.send_relay_audio(config.local_track_id, frame);
}
}
}
}
Expand All @@ -381,11 +439,11 @@ pub fn create_and_start_bridge(
network: Arc<NetworkManager>,
audio: AudioBridgeHandle,
event_rx: broadcast::Receiver<NetworkEvent>,
) -> AudioNetworkBridge {
) -> super::Result<AudioNetworkBridge> {
let config = BridgeConfig::default();
let bridge = AudioNetworkBridge::new(config, network, audio);
let bridge = AudioNetworkBridge::new(config, network, audio)?;
bridge.start(event_rx);
bridge
Ok(bridge)
}

#[cfg(test)]
Expand Down
88 changes: 88 additions & 0 deletions native-bridge/src/network/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,21 @@ pub enum OutgoingMessage {
bpm: f32,
time_sig: (u8, u8),
},
/// Pre-encoded audio for direct P2P transmission.
/// Bypasses mode-based routing and Opus encoding in the P2P layer,
/// since the bridge has already Opus-encoded the audio.
P2PEncodedAudio {
track_id: u8,
opus_data: Vec<u8>,
channels: u8,
sample_count: u16,
},
/// Audio for relay only. Used by the bridge for dual-path operation
/// where P2P audio is sent separately via P2PEncodedAudio.
RelayAudio {
track_id: u8,
samples: Vec<f32>,
},
}

/// Network manager state
Expand Down Expand Up @@ -369,6 +384,44 @@ impl NetworkManager {
}
}

/// Send pre-encoded audio frame to P2P peers.
/// Bypasses the standard outgoing queue's Opus encoding since the
/// bridge has already encoded the audio. Sent via the outgoing channel
/// so socket I/O occurs on the correct tokio runtime.
pub fn send_p2p_encoded_audio(
&self,
track_id: u8,
opus_data: Vec<u8>,
channels: u8,
sample_count: u16,
) {
if let Some(tx) = self.outgoing_tx.read().as_ref() {
let _ = tx.send(OutgoingMessage::P2PEncodedAudio {
track_id,
opus_data,
channels,
sample_count,
});
}
}

/// Send audio frame to relay only.
/// Used by the bridge for dual-path operation where P2P audio
/// is sent separately via send_p2p_encoded_audio.
pub fn send_relay_audio(&self, track_id: u8, samples: Vec<f32>) {
if let Some(tx) = self.outgoing_tx.read().as_ref() {
let _ = tx.send(OutgoingMessage::RelayAudio {
track_id,
samples,
});
}
}

/// Get access to the audio codec for encoding/decoding
pub fn codec(&self) -> &OpusCodec {
&self.codec
}

/// Get current mode
pub fn mode(&self) -> NetworkMode {
self.state.read().mode
Expand Down Expand Up @@ -496,6 +549,7 @@ impl NetworkManager {

// Forward P2P events to our event channel
let event_tx = self.event_tx.read().clone();
let peers_for_events = self.peers.clone();
if let Some(tx) = event_tx {
tokio::spawn(async move {
while let Ok(event) = rx.recv().await {
Expand All @@ -508,6 +562,10 @@ impl NetworkManager {
});
}
P2PEvent::PeerDisconnected { peer_id, reason } => {
// Mark peer as audio-inactive on disconnect
if let Some(peer) = peers_for_events.get(peer_id) {
peer.set_audio_active(false);
}
let _ = tx.send(NetworkEvent::PeerDisconnected {
user_id: peer_id.to_string(),
reason,
Expand Down Expand Up @@ -582,6 +640,30 @@ impl NetworkManager {
}

async fn send_message(&self, msg: OutgoingMessage) -> Result<()> {
match msg {
// Pre-encoded audio sent directly to P2P peers (bypass mode-based routing)
OutgoingMessage::P2PEncodedAudio {
track_id,
opus_data,
channels,
sample_count,
} => {
self.p2p
.broadcast_encoded_audio(track_id, &opus_data, channels, sample_count)
.await
}
// Audio sent directly to relay (bypass mode-based routing)
OutgoingMessage::RelayAudio { track_id, samples } => {
self.relay.send_audio(track_id, &samples).await
}
// Standard messages routed by current network mode
other => self.send_mode_routed(other).await,
}
}

/// Route standard outgoing messages based on current network mode.
/// Handles Audio, Control, and ClockSync messages for P2P, Relay, and Hybrid modes.
async fn send_mode_routed(&self, msg: OutgoingMessage) -> Result<()> {
match self.mode() {
NetworkMode::P2P => {
match msg {
Expand Down Expand Up @@ -613,6 +695,8 @@ impl NetworkManager {
// Broadcast clock sync to all peers
self.p2p.broadcast_control(OspMessageType::ClockSync, payload).await?;
}
// P2PEncodedAudio and RelayAudio are handled in send_message
_ => {}
}
}
NetworkMode::Relay => match msg {
Expand Down Expand Up @@ -644,6 +728,8 @@ impl NetworkManager {
.send_control(OspMessageType::ClockSync, payload)
.await?;
}
// P2PEncodedAudio and RelayAudio are handled in send_message
_ => {}
},
NetworkMode::Hybrid => {
// Send via both (relay handles fan-out, P2P for low-latency to nearby)
Expand Down Expand Up @@ -684,6 +770,8 @@ impl NetworkManager {
.send_control(OspMessageType::ClockSync, payload)
.await?;
}
// P2PEncodedAudio and RelayAudio are handled in send_message
_ => {}
}
}
NetworkMode::Disconnected => {}
Expand Down
42 changes: 41 additions & 1 deletion native-bridge/src/network/p2p.rs
Original file line number Diff line number Diff line change
Expand Up @@ -262,8 +262,10 @@ impl P2PNetwork {
samples.clone(),
);

// Update last seen
// Update last seen and audio statistics
peer.touch();
peer.set_audio_active(true);
peer.record_audio_received(frame.data.len());

// Emit event
let _ = event_tx.send(P2PEvent::AudioReceived {
Expand Down Expand Up @@ -541,6 +543,44 @@ impl P2PNetwork {
Ok(())
}

/// Broadcast pre-encoded audio data to all connected peers via UDP.
/// Uses unreliable delivery (no retransmission) since real-time audio
/// tolerates packet loss better than retransmission latency.
/// The caller provides already Opus-encoded data to avoid redundant encoding
/// when the bridge has already encoded for other paths (e.g., browser WebSocket).
pub async fn broadcast_encoded_audio(
&self,
track_id: u8,
opus_data: &[u8],
channels: u8,
sample_count: u16,
) -> Result<()> {
let local_id = *self.local_peer_id.read();
let frame = AudioFrameMessage {
user_id: local_id,
track_id,
codec: 1, // Opus
channels,
sample_count,
data: opus_data.to_vec(),
};

let payload = frame.to_bytes();

// Broadcast to all connected peers via unreliable UDP (audio is realtime,
// retransmission would add latency worse than the occasional lost packet)
for peer in self.peers.connected() {
if let Err(e) = self
.send_packet(&peer, OspMessageType::AudioFrame, payload.clone(), false)
.await
{
warn!("Failed to broadcast encoded audio to peer {}: {}", peer.id, e);
}
}

Ok(())
}

/// Queue audio for sending (called from audio thread)
pub fn queue_audio(&self, track_id: u8, samples: Vec<f32>) {
let mut outgoing = self.outgoing_audio.write();
Expand Down
Loading
Loading