Skip to content

Commit dfaffcb

Browse files
committed
feat(audio): streamed music decode — OGG/MP3 no longer fully decoded into RAM
A 5-minute 44.1kHz stereo track was previously held as ~57MB of f32 PCM; it is now ~5MB of compressed bytes (shared via an Arc) plus ~1.5s of decoded PCM in the ring buffer. audio/stream.rs: a per-playback decode worker (lewton and minimp3 are both incremental) feeds 32k-sample chunks over the existing lock-free SPSC ring, using sleep-based backpressure; looping restarts the decoder inside the worker so loop seams can't underrun; a stop flag (set by StreamConsumer::drop — i.e. on stop/replace/teardown) kills the worker. The render-side MusicVoice grows a Stream source alongside Full: a chunk-pulling mix loop, silence on underrun (playback resumes on the next callback), and the shared playing/position atomics maintained as before. WAV stays full-decode (it is already PCM); wasm32 falls back to full decode (no threads) — both go through the same load_music_bytes entry point that the FFI loader now calls. Non-goal kept explicit: no resampling yet — non-44.1kHz tracks play at the same wrong pitch they did before (tracked as a follow-up). 97 shared tests green, including a stop-flag/backpressure test.
1 parent c903483 commit dfaffcb

4 files changed

Lines changed: 406 additions & 33 deletions

File tree

native/shared/src/audio/mod.rs

Lines changed: 66 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
mod decode;
3030
mod render;
3131
mod spsc;
32+
pub mod stream;
3233

3334
pub use decode::{decode_audio, parse_ogg, parse_wav};
3435
#[cfg(feature = "mp3")]
@@ -53,8 +54,20 @@ pub struct MusicShared {
5354
pub position: AtomicUsize,
5455
}
5556

57+
enum MusicSource {
58+
/// Fully decoded PCM (WAV always; everything on wasm32).
59+
Full(Arc<SoundData>),
60+
/// Compressed bytes decoded by a background worker at play time.
61+
#[cfg(not(target_arch = "wasm32"))]
62+
Streamed {
63+
kind: stream::StreamKind,
64+
bytes: Arc<Vec<u8>>,
65+
channels: u16,
66+
},
67+
}
68+
5669
struct MusicEntry {
57-
data: Arc<SoundData>,
70+
source: MusicSource,
5871
shared: Arc<MusicShared>,
5972
volume: f32,
6073
looping: bool,
@@ -174,8 +187,50 @@ impl AudioMixer {
174187
// ------------------------------------------------------------ music
175188

176189
pub fn load_music(&mut self, data: SoundData) -> f64 {
190+
self.alloc_music(MusicSource::Full(Arc::new(data)))
191+
}
192+
193+
/// Load music from raw file bytes, streaming the decode when the
194+
/// format supports it (OGG/MP3 on native — keeps ~5 MB of compressed
195+
/// bytes resident instead of ~57 MB of PCM for a 5-minute track).
196+
/// WAV — and everything on wasm32, which has no threads — falls back
197+
/// to full decode. Returns 0 on undecodable data.
198+
pub fn load_music_bytes(&mut self, path: &str, data: Vec<u8>) -> f64 {
199+
#[cfg(not(target_arch = "wasm32"))]
200+
{
201+
let lower = path.to_ascii_lowercase();
202+
let kind = if lower.ends_with(".ogg") {
203+
Some(stream::StreamKind::Ogg)
204+
} else {
205+
#[cfg(feature = "mp3")]
206+
if lower.ends_with(".mp3") {
207+
Some(stream::StreamKind::Mp3)
208+
} else {
209+
None
210+
}
211+
#[cfg(not(feature = "mp3"))]
212+
None
213+
};
214+
if let Some(kind) = kind {
215+
if let Some((_rate, channels)) = stream::probe(kind, &data) {
216+
return self.alloc_music(MusicSource::Streamed {
217+
kind,
218+
bytes: Arc::new(data),
219+
channels,
220+
});
221+
}
222+
// Mislabelled file — fall through to sniffing decode.
223+
}
224+
}
225+
match decode_audio(path, &data) {
226+
Some(s) => self.load_music(s),
227+
None => 0.0,
228+
}
229+
}
230+
231+
fn alloc_music(&mut self, source: MusicSource) -> f64 {
177232
self.music.alloc(MusicEntry {
178-
data: Arc::new(data),
233+
source,
179234
shared: Arc::new(MusicShared {
180235
playing: AtomicBool::new(false),
181236
position: AtomicUsize::new(0),
@@ -191,9 +246,17 @@ impl AudioMixer {
191246
// moment play_music returns (the render thread confirms on its
192247
// next callback).
193248
m.shared.playing.store(true, Ordering::Relaxed);
249+
let payload = match &m.source {
250+
MusicSource::Full(data) => render::MusicPayload::Full(data.clone()),
251+
#[cfg(not(target_arch = "wasm32"))]
252+
MusicSource::Streamed { kind, bytes, channels } => render::MusicPayload::Stream {
253+
consumer: stream::start(*kind, bytes.clone(), m.looping),
254+
channels: *channels,
255+
},
256+
};
194257
let cmd = Cmd::PlayMusic {
195258
music_id: handle.to_bits(),
196-
data: m.data.clone(),
259+
payload,
197260
shared: m.shared.clone(),
198261
volume: m.volume,
199262
looping: m.looping,

native/shared/src/audio/render.rs

Lines changed: 111 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@
1111
//! voice playing it finishes, then the Arc drops on this thread.
1212
1313
use super::spsc::Consumer;
14+
#[cfg(not(target_arch = "wasm32"))]
15+
use super::stream::{StreamConsumer, StreamMsg};
1416
use super::{MusicShared, SoundData};
1517
use std::sync::atomic::Ordering;
1618
use std::sync::Arc;
@@ -30,7 +32,7 @@ pub enum Cmd {
3032
SetSoundVolume { sound_id: u64, volume: f32 },
3133
PlayMusic {
3234
music_id: u64,
33-
data: Arc<SoundData>,
35+
payload: MusicPayload,
3436
shared: Arc<MusicShared>,
3537
volume: f32,
3638
looping: bool,
@@ -49,13 +51,35 @@ struct Voice {
4951
spatial: Option<[f32; 3]>,
5052
}
5153

54+
/// How a music voice gets its samples.
55+
pub enum MusicPayload {
56+
Full(Arc<SoundData>),
57+
/// Chunks arrive from a background decode worker; the worker handles
58+
/// looping internally (End only arrives for finished non-loop tracks).
59+
#[cfg(not(target_arch = "wasm32"))]
60+
Stream { consumer: StreamConsumer, channels: u16 },
61+
}
62+
63+
enum MusicSamples {
64+
Full { data: Arc<SoundData>, position: usize },
65+
#[cfg(not(target_arch = "wasm32"))]
66+
Stream {
67+
consumer: StreamConsumer,
68+
channels: u16,
69+
current: Vec<f32>,
70+
offset: usize,
71+
ended: bool,
72+
},
73+
}
74+
5275
struct MusicVoice {
5376
music_id: u64,
54-
data: Arc<SoundData>,
77+
samples: MusicSamples,
5578
shared: Arc<MusicShared>,
56-
position: usize,
5779
volume: f32,
5880
looping: bool,
81+
/// Total samples consumed (drives the shared position mirror).
82+
consumed: usize,
5983
}
6084

6185
pub struct AudioRenderer {
@@ -94,12 +118,23 @@ impl AudioRenderer {
94118
}
95119
}
96120
}
97-
Cmd::PlayMusic { music_id, data, shared, volume, looping } => {
121+
Cmd::PlayMusic { music_id, payload, shared, volume, looping } => {
98122
// Restart-from-zero semantics (matches the old mixer).
99123
self.music.retain(|m| m.music_id != music_id);
100124
shared.playing.store(true, Ordering::Relaxed);
101125
shared.position.store(0, Ordering::Relaxed);
102-
self.music.push(MusicVoice { music_id, data, shared, position: 0, volume, looping });
126+
let samples = match payload {
127+
MusicPayload::Full(data) => MusicSamples::Full { data, position: 0 },
128+
#[cfg(not(target_arch = "wasm32"))]
129+
MusicPayload::Stream { consumer, channels } => MusicSamples::Stream {
130+
consumer,
131+
channels,
132+
current: Vec::new(),
133+
offset: 0,
134+
ended: false,
135+
},
136+
};
137+
self.music.push(MusicVoice { music_id, samples, shared, volume, looping, consumed: 0 });
103138
}
104139
Cmd::StopMusic { music_id } => {
105140
if let Some(m) = self.music.iter().position(|m| m.music_id == music_id) {
@@ -192,31 +227,81 @@ impl AudioRenderer {
192227
self.music.retain_mut(|m| {
193228
let vol = m.volume * master;
194229
let mut i = 0;
195-
while i < output.len() && m.position < m.data.samples.len() {
196-
if m.data.channels == 1 {
197-
let sample = m.data.samples[m.position] * vol;
198-
output[i] += sample;
199-
if i + 1 < output.len() {
200-
output[i + 1] += sample;
230+
let mut finished = false;
231+
match &mut m.samples {
232+
MusicSamples::Full { data, position } => {
233+
while i < output.len() && *position < data.samples.len() {
234+
if data.channels == 1 {
235+
let sample = data.samples[*position] * vol;
236+
output[i] += sample;
237+
if i + 1 < output.len() {
238+
output[i + 1] += sample;
239+
}
240+
*position += 1;
241+
i += 2;
242+
} else {
243+
output[i] += data.samples[*position] * vol;
244+
*position += 1;
245+
i += 1;
246+
}
201247
}
202-
m.position += 1;
203-
i += 2;
204-
} else {
205-
output[i] += m.data.samples[m.position] * vol;
206-
m.position += 1;
207-
i += 1;
248+
if *position >= data.samples.len() {
249+
if m.looping {
250+
*position = 0;
251+
} else {
252+
finished = true;
253+
}
254+
}
255+
m.consumed = *position;
208256
}
209-
}
210-
if m.position >= m.data.samples.len() {
211-
if m.looping {
212-
m.position = 0;
213-
} else {
214-
m.shared.playing.store(false, Ordering::Relaxed);
215-
m.shared.position.store(0, Ordering::Relaxed);
216-
return false;
257+
#[cfg(not(target_arch = "wasm32"))]
258+
MusicSamples::Stream { consumer, channels, current, offset, ended } => {
259+
let mono = *channels == 1;
260+
while i < output.len() {
261+
if *offset >= current.len() {
262+
// Refill from the decode worker's ring.
263+
match consumer.rx.pop() {
264+
Some(StreamMsg::Chunk(c)) => {
265+
*current = c;
266+
*offset = 0;
267+
}
268+
Some(StreamMsg::End) => {
269+
*ended = true;
270+
break;
271+
}
272+
// Underrun: worker is behind (cold start
273+
// or scheduling hiccup) — emit silence for
274+
// the rest of this callback, resume next.
275+
None => break,
276+
}
277+
}
278+
if mono {
279+
let sample = current[*offset] * vol;
280+
output[i] += sample;
281+
if i + 1 < output.len() {
282+
output[i + 1] += sample;
283+
}
284+
*offset += 1;
285+
m.consumed += 1;
286+
i += 2;
287+
} else {
288+
output[i] += current[*offset] * vol;
289+
*offset += 1;
290+
m.consumed += 1;
291+
i += 1;
292+
}
293+
}
294+
if *ended && *offset >= current.len() {
295+
finished = true;
296+
}
217297
}
218298
}
219-
m.shared.position.store(m.position, Ordering::Relaxed);
299+
if finished {
300+
m.shared.playing.store(false, Ordering::Relaxed);
301+
m.shared.position.store(0, Ordering::Relaxed);
302+
return false;
303+
}
304+
m.shared.position.store(m.consumed, Ordering::Relaxed);
220305
true
221306
});
222307
}

0 commit comments

Comments
 (0)