Skip to content

Commit 24091a0

Browse files
author
Appu
authored
Merge pull request #42 from bongodevs/dev
v1.0.8
2 parents 43ec6f0 + 8a95005 commit 24091a0

File tree

32 files changed

+643
-521
lines changed

32 files changed

+643
-521
lines changed

Cargo.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "rustalink"
3-
version = "1.0.7"
3+
version = "1.0.8"
44
edition = "2024"
55
license = "Apache-2.0"
66
authors = ["appujet", "notdeltaxd", "contributors"]

src/audio/effects/tape.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -79,10 +79,8 @@ impl TapeEffect {
7979
}
8080
}
8181

82-
/// Process a PCM frame. Unlike the previous implementation which worked in-place,
83-
/// this uses an internal resampling buffer to handle rate changes correctly.
8482
pub fn process(&mut self, frame: &mut [i16]) {
85-
if frame.is_empty() {
83+
if frame.is_empty() || !self.is_active() {
8684
return;
8785
}
8886

@@ -156,7 +154,8 @@ impl TapeEffect {
156154

157155
if self.read_pos > (self.sample_rate as f64 * channels as f64 * 2.0) {
158156
let integral = (self.read_pos.floor() as usize / channels) * channels;
159-
self.input_buffer.drain(0..integral);
157+
self.input_buffer.copy_within(integral.., 0);
158+
self.input_buffer.truncate(self.input_buffer.len() - integral);
160159
self.read_pos -= integral as f64;
161160
}
162161
}

src/audio/effects/volume.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,6 @@ impl VolumeEffect {
126126
value.signum() * limited.min(INT16_MAX_F)
127127
}
128128

129-
/// Process a stereo interleaved i16 frame **in-place**.
130129
pub fn process(&mut self, frame: &mut [i16]) {
131130
let sample_count = frame.len();
132131
if sample_count == 0 {
@@ -158,6 +157,10 @@ impl VolumeEffect {
158157
(v, v)
159158
};
160159

160+
if !self.fade_active && (gain_start - 1.0).abs() < 0.0001 {
161+
return;
162+
}
163+
161164
let step = if sample_count > 1 {
162165
(gain_end - gain_start) / (sample_count - 1) as f32
163166
} else {

src/audio/processor.rs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ pub struct AudioProcessor {
4949
source_rate: u32,
5050
channels: usize,
5151
config: PlayerConfig,
52+
recoverable_errors: u32,
5253
}
5354

5455
impl AudioProcessor {
@@ -119,6 +120,7 @@ impl AudioProcessor {
119120
source_rate: sample_rate,
120121
channels,
121122
config,
123+
recoverable_errors: 0,
122124
})
123125
}
124126

@@ -152,6 +154,7 @@ impl AudioProcessor {
152154

153155
match self.decoder.decode(&packet) {
154156
Ok(decoded) => {
157+
self.recoverable_errors = 0;
155158
let spec = *decoded.spec();
156159
let mut buf = self.sample_buf.take().unwrap_or_else(|| {
157160
SampleBuffer::<i16>::new(decoded.capacity() as u64, spec)
@@ -260,7 +263,14 @@ impl AudioProcessor {
260263
self.sample_buf = Some(buf);
261264
}
262265
Err(Error::IoError(e)) if e.kind() == ErrorKind::UnexpectedEof => break,
263-
Err(Error::DecodeError(e)) => warn!("Decode error (recoverable): {e}"),
266+
Err(Error::DecodeError(e)) => {
267+
self.recoverable_errors += 1;
268+
if self.recoverable_errors == 1 {
269+
warn!("Decode error (recoverable): {e}");
270+
} else if self.recoverable_errors.is_multiple_of(100) {
271+
warn!("Decode error (recoverable, x{}): {e}", self.recoverable_errors);
272+
}
273+
}
264274
Err(e) => {
265275
self.send_error(format!("Decode error: {e}"));
266276
return Err(e);

src/audio/resample/sinc.rs

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -7,30 +7,35 @@ pub struct SincResampler {
77
index: f32,
88
channels: usize,
99
taps: usize,
10-
window: Vec<f32>,
10+
table: Vec<f32>,
1111
buffer: Vec<VecDeque<f32>>,
1212
}
1313

1414
impl SincResampler {
1515
pub fn new(source_rate: u32, target_rate: u32, channels: usize) -> Self {
1616
let taps = 32;
17-
let mut window = Vec::with_capacity(taps);
17+
let mut table = Vec::with_capacity(taps);
1818
let m = taps as f32 - 1.0;
19+
let half_taps = (taps / 2) as f32;
1920

2021
for i in 0..taps {
22+
let offset = i as f32 - half_taps;
23+
2124
let a0 = 0.42;
2225
let a1 = 0.5;
2326
let a2 = 0.08;
2427
let pi_n_m = 2.0 * std::f32::consts::PI * i as f32 / m;
25-
window.push(a0 - a1 * pi_n_m.cos() + a2 * (2.0 * pi_n_m).cos());
28+
let window = a0 - a1 * pi_n_m.cos() + a2 * (2.0 * pi_n_m).cos();
29+
30+
table.push(Self::sinc(offset) * window);
2631
}
2732

2833
Self {
2934
ratio: source_rate as f32 / target_rate as f32,
3035
index: 0.0,
3136
channels,
3237
taps,
33-
window,
38+
table,
3439
buffer: vec![VecDeque::from(vec![0.0; taps]); channels],
3540
}
3641
}
@@ -45,7 +50,6 @@ impl SincResampler {
4550

4651
pub fn process(&mut self, input: &[i16], output: &mut PooledBuffer) {
4752
let num_frames = input.len() / self.channels;
48-
let half_taps = (self.taps / 2) as f32;
4953

5054
for frame in 0..num_frames {
5155
for ch in 0..self.channels {
@@ -58,8 +62,7 @@ impl SincResampler {
5862
let mut sum = 0.0;
5963

6064
for i in 0..self.taps {
61-
let offset = (i as f32 - half_taps) - self.index;
62-
sum += self.buffer[ch][i] * Self::sinc(offset) * self.window[i];
65+
sum += self.buffer[ch][i] * self.table[i];
6366
}
6467

6568
output.push(sum.clamp(i16::MIN as f32, i16::MAX as f32) as i16);

src/audio/source/http/prefetcher.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -252,7 +252,6 @@ pub fn prefetch_loop(
252252
state.buffered += n;
253253
state.chunks.push_back(chunk);
254254
cvar.notify_all();
255-
retry_count = 0;
256255
}
257256
Ok(None) => {
258257
response = None;
@@ -274,7 +273,7 @@ pub fn prefetch_loop(
274273
retry_count += 1;
275274

276275
if retry_count > MAX_FETCH_RETRIES {
277-
warn!("prefetch: read failed fatally: {}", e);
276+
warn!("prefetch: read failed fatally after {} retries: {}", MAX_FETCH_RETRIES, e);
278277
let (lock, cvar) = &*shared;
279278
let mut state = lock.lock();
280279
state.error = Some(e.to_string());

src/common/logger/writer.rs

Lines changed: 33 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ pub struct CircularFileWriter {
1919
struct WriterState {
2020
file: Option<File>,
2121
lines_since_prune: u32,
22+
is_pruning: bool,
2223
}
2324

2425
impl CircularFileWriter {
@@ -29,6 +30,7 @@ impl CircularFileWriter {
2930
state: Arc::new(Mutex::new(WriterState {
3031
file: None,
3132
lines_since_prune: 0,
33+
is_pruning: false,
3234
})),
3335
}
3436
}
@@ -45,26 +47,42 @@ impl CircularFileWriter {
4547
Ok(state.file.as_mut().expect("file was just opened"))
4648
}
4749

48-
fn prune(&self, state: &mut WriterState) -> io::Result<()> {
49-
// Close file before pruning
50-
state.file = None;
50+
fn spawn_prune(&self) {
51+
let path = self.path.clone();
52+
let max_lines = self.max_lines;
53+
let state_arc = self.state.clone();
5154

52-
if !Path::new(&self.path).exists() {
55+
std::thread::spawn(move || {
56+
if let Err(e) = Self::do_prune(&path, max_lines) {
57+
eprintln!("Failed to prune log file '{}': {}", path, e);
58+
}
59+
let mut state = state_arc.lock();
60+
state.is_pruning = false;
61+
});
62+
}
63+
64+
fn do_prune(path: &str, max_lines: u32) -> io::Result<()> {
65+
if !Path::new(path).exists() {
5366
return Ok(());
5467
}
5568

5669
let lines: Vec<String> = {
57-
let file = File::open(&self.path)?;
70+
let file = File::open(path)?;
5871
let reader = BufReader::new(file);
5972
reader.lines().collect::<Result<_, _>>()?
6073
};
6174

62-
if lines.len() > self.max_lines as usize {
63-
let start = lines.len() - self.max_lines as usize;
64-
let mut file = File::create(&self.path)?;
65-
for line in &lines[start..] {
66-
writeln!(file, "{}", line)?;
75+
if lines.len() > max_lines as usize {
76+
let start = lines.len() - max_lines as usize;
77+
// Atomic-ish replacement: write to .tmp then rename
78+
let tmp_path = format!("{}.tmp", path);
79+
{
80+
let mut file = File::create(&tmp_path)?;
81+
for line in &lines[start..] {
82+
writeln!(file, "{}", line)?;
83+
}
6784
}
85+
std::fs::rename(tmp_path, path)?;
6886
}
6987
Ok(())
7088
}
@@ -73,19 +91,19 @@ impl CircularFileWriter {
7391
impl io::Write for CircularFileWriter {
7492
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
7593
let mut state = self.state.lock();
94+
7695
let file = self.ensure_file_open(&mut state)?;
77-
7896
file.write_all(buf)?;
7997

8098
let new_lines = buf.iter().filter(|&&b| b == b'\n').count() as u32;
8199
state.lines_since_prune += new_lines;
82100

83101
let prune_threshold = (self.max_lines / 10).max(50);
84-
if state.lines_since_prune >= prune_threshold {
85-
if let Err(e) = self.prune(&mut state) {
86-
eprintln!("Failed to prune log file '{}': {}", self.path, e);
87-
}
102+
if state.lines_since_prune >= prune_threshold && !state.is_pruning {
103+
state.is_pruning = true;
88104
state.lines_since_prune = 0;
105+
state.file = None; // Close file so rename can happen on Windows if needed
106+
self.spawn_prune();
89107
}
90108

91109
Ok(buf.len())

src/gateway/encryption.rs

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ pub struct DaveHandler {
2626
protocol_version: u16,
2727
pending_transitions: HashMap<u16, u16>,
2828
external_sender_set: bool,
29+
saved_external_sender: Option<Vec<u8>>,
2930
pending_proposals: Vec<Vec<u8>>,
3031
pending_handshake: Vec<(Vec<u8>, bool)>,
3132
was_ready: bool,
@@ -43,6 +44,7 @@ impl DaveHandler {
4344
protocol_version: 0,
4445
pending_transitions: HashMap::new(),
4546
external_sender_set: false,
47+
saved_external_sender: None,
4648
pending_proposals: Vec::new(),
4749
pending_handshake: Vec::new(),
4850
was_ready: false,
@@ -92,13 +94,31 @@ impl DaveHandler {
9294
self.was_ready = false;
9395

9496
debug!("DAVE session setup (v{})", version);
95-
session.create_key_package().map_err(map_boxed_err)
97+
let key_package = session.create_key_package().map_err(map_boxed_err)?;
98+
99+
if let Some(saved) = self.saved_external_sender.clone() {
100+
if let Some(sess) = &mut self.session {
101+
match sess.set_external_sender(&saved) {
102+
Ok(()) => {
103+
self.external_sender_set = true;
104+
debug!("DAVE re-applied saved external sender after epoch reset");
105+
}
106+
Err(e) => {
107+
warn!("DAVE failed to re-apply saved external sender: {e}");
108+
self.saved_external_sender = None;
109+
}
110+
}
111+
}
112+
}
113+
114+
Ok(key_package)
96115
}
97116

98117
pub fn reset(&mut self) {
99118
self.protocol_version = 0;
100119
self.pending_transitions.clear();
101120
self.external_sender_set = false;
121+
self.saved_external_sender = None;
102122
self.pending_proposals.clear();
103123
self.pending_handshake.clear();
104124
self.was_ready = false;
@@ -143,6 +163,7 @@ impl DaveHandler {
143163
if let Some(session) = &mut self.session {
144164
session.set_external_sender(data).map_err(map_boxed_err)?;
145165
self.external_sender_set = true;
166+
self.saved_external_sender = Some(data.to_vec());
146167

147168
if !self.pending_proposals.is_empty() {
148169
debug!(

src/gateway/session/handler.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -312,6 +312,7 @@ impl<'a> SessionState<'a> {
312312
{
313313
let mut state = self.persistent_state.lock().await;
314314
state.ssrc = self.ssrc;
315+
state.selected_mode = Some(self.selected_mode.clone());
315316
}
316317

317318
if self.gateway.channel_id.0 > 0 {
@@ -413,6 +414,7 @@ impl<'a> SessionState<'a> {
413414
state.udp_addr = Some(addr);
414415
state.session_key = Some(key);
415416
state.ssrc = self.ssrc;
417+
state.selected_mode = Some(self.selected_mode.clone());
416418
}
417419

418420
self.launch_speak_loop(addr, key).await;
@@ -456,16 +458,19 @@ impl<'a> SessionState<'a> {
456458

457459
self.backoff.reset();
458460

459-
let (addr, key, ssrc) = {
461+
let (addr, key, ssrc, mode) = {
460462
let state = self.persistent_state.lock().await;
461-
(state.udp_addr, state.session_key, state.ssrc)
463+
(state.udp_addr, state.session_key, state.ssrc, state.selected_mode.clone())
462464
};
463465

464466
match (addr, key) {
465467
(Some(addr), Some(key)) => {
466468
self.udp_addr = Some(addr);
467469
self.session_key = Some(key);
468470
self.ssrc = ssrc;
471+
if let Some(m) = mode {
472+
self.selected_mode = m;
473+
}
469474

470475
self.launch_speak_loop(addr, key).await;
471476
self.send_json(

0 commit comments

Comments
 (0)