diff --git a/crates/perry-runtime/src/node_stream_readable_read.rs b/crates/perry-runtime/src/node_stream_readable_read.rs index 9a3de45094..2d7f261182 100644 --- a/crates/perry-runtime/src/node_stream_readable_read.rs +++ b/crates/perry-runtime/src/node_stream_readable_read.rs @@ -30,12 +30,13 @@ pub(super) fn read_stream_available_default(stream: f64) -> f64 { return read_stream_object_mode_chunk(stream); } - // `read()` with no size argument returns ONE chunk — the head of the - // internal buffer — not the whole buffer concatenated. This matches Node: - // `howMuchToRead(NaN)` yields a single buffered entry, so a `readable` - // drain loop (`while ((c = r.read()) !== null)`) and `for await` both see - // chunk boundaries preserved. Sized `read(n)` (read_stream_exact_size) - // still spans chunks. (#1545) + // `read()` with no size argument mirrors Node's `howMuchToRead(NaN)`: + // a FLOWING stream consumes ONE chunk (the buffer head) so 'data' + // emission preserves chunk boundaries, while a paused stream drains the + // entire internal buffer and returns it as a single value — Node only + // takes `state.buffer.first()` when `state.flowing && state.length`. + // Sized `read(n)` (read_stream_exact_size) still spans chunks. + // (#1545, #2484) let mut values = Vec::new(); if let Some(chunks) = readable_hidden_chunks(stream) { push_chunk_values(chunks, &mut values, 0); @@ -48,6 +49,10 @@ pub(super) fn read_stream_available_default(stream: f64) -> f64 { return f64::from_bits(TAG_NULL); } + if !readable_is_flowing(stream) { + return drain_whole_buffer(stream, values); + } + let head = values.remove(0); let mut remaining_len = 0usize; for value in &values { @@ -73,6 +78,43 @@ pub(super) fn read_stream_available_default(stream: f64) -> f64 { buffer_value_from_bytes(&bytes) } +/// Paused-mode `read()` with no size: consume every buffered chunk and return +/// them as one concatenated value (Node's `howMuchToRead(NaN)` returns +/// `state.length` when the stream is not flowing). +fn drain_whole_buffer(stream: f64, mut values: Vec) -> f64 { + clear_readable_buffer(stream); + mark_disturbed(stream); + clear_pending_readable_chunks(stream); + if stream_hidden_ended(stream) { + queue_readable_event(stream); + schedule_readable_end(stream); + } + + if readable_encoding_tag(stream).is_some() { + let mut decoded = Vec::with_capacity(values.len()); + for value in values { + if let Some(value) = super::decode_readable_chunk_for_encoding(stream, value) { + decoded.push(value); + } + } + values = decoded; + if values.is_empty() { + return f64::from_bits(TAG_NULL); + } + if values.len() == 1 { + return values[0]; + } + let result = crate::string::js_string_concat_chain(values.as_ptr(), values.len() as i32); + return f64::from_bits(JSValue::string_ptr(result).bits()); + } + + let mut bytes = Vec::new(); + for value in &values { + append_chunk_bytes(*value, &mut bytes, 0); + } + buffer_value_from_bytes(&bytes) +} + pub(super) fn read_stream_exact_size(stream: f64, size: f64) -> f64 { invoke_read_once(stream); if size <= 0.0 {