Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 48 additions & 6 deletions crates/perry-runtime/src/node_stream_readable_read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,13 @@ pub(super) fn read_stream_available_default(stream: f64) -> f64 {
return read_stream_object_mode_chunk(stream);
}

// `read()` with no size argument returns ONE chunk — the head of the
// internal buffer — not the whole buffer concatenated. This matches Node:
// `howMuchToRead(NaN)` yields a single buffered entry, so a `readable`
// drain loop (`while ((c = r.read()) !== null)`) and `for await` both see
// chunk boundaries preserved. Sized `read(n)` (read_stream_exact_size)
// still spans chunks. (#1545)
// `read()` with no size argument mirrors Node's `howMuchToRead(NaN)`:
// a FLOWING stream consumes ONE chunk (the buffer head) so 'data'
// emission preserves chunk boundaries, while a paused stream drains the
// entire internal buffer and returns it as a single value — Node only
// takes `state.buffer.first()` when `state.flowing && state.length`.
// Sized `read(n)` (read_stream_exact_size) still spans chunks.
// (#1545, #2484)
let mut values = Vec::new();
if let Some(chunks) = readable_hidden_chunks(stream) {
push_chunk_values(chunks, &mut values, 0);
Expand All @@ -48,6 +49,10 @@ pub(super) fn read_stream_available_default(stream: f64) -> f64 {
return f64::from_bits(TAG_NULL);
}

if !readable_is_flowing(stream) {
return drain_whole_buffer(stream, values);
}

let head = values.remove(0);
let mut remaining_len = 0usize;
for value in &values {
Expand All @@ -73,6 +78,43 @@ pub(super) fn read_stream_available_default(stream: f64) -> f64 {
buffer_value_from_bytes(&bytes)
}

/// Paused-mode `read()` with no size: consume every buffered chunk and return
/// them as one concatenated value (Node's `howMuchToRead(NaN)` returns
/// `state.length` when the stream is not flowing).
fn drain_whole_buffer(stream: f64, mut values: Vec<f64>) -> f64 {
clear_readable_buffer(stream);
mark_disturbed(stream);
clear_pending_readable_chunks(stream);
if stream_hidden_ended(stream) {
queue_readable_event(stream);
schedule_readable_end(stream);
}

if readable_encoding_tag(stream).is_some() {
let mut decoded = Vec::with_capacity(values.len());
for value in values {
if let Some(value) = super::decode_readable_chunk_for_encoding(stream, value) {
decoded.push(value);
}
}
values = decoded;
if values.is_empty() {
return f64::from_bits(TAG_NULL);
}
if values.len() == 1 {
return values[0];
}
let result = crate::string::js_string_concat_chain(values.as_ptr(), values.len() as i32);
return f64::from_bits(JSValue::string_ptr(result).bits());
}

let mut bytes = Vec::new();
for value in &values {
append_chunk_bytes(*value, &mut bytes, 0);
}
buffer_value_from_bytes(&bytes)
}

pub(super) fn read_stream_exact_size(stream: f64, size: f64) -> f64 {
invoke_read_once(stream);
if size <= 0.0 {
Expand Down
Loading