diff --git a/crates/perry-runtime/src/node_stream_readable_read.rs b/crates/perry-runtime/src/node_stream_readable_read.rs index 2d7f261182..2b42e0dd7a 100644 --- a/crates/perry-runtime/src/node_stream_readable_read.rs +++ b/crates/perry-runtime/src/node_stream_readable_read.rs @@ -49,7 +49,13 @@ pub(super) fn read_stream_available_default(stream: f64) -> f64 { return f64::from_bits(TAG_NULL); } - if !readable_is_flowing(stream) { + // Node's `howMuchToRead(NaN)` (internal/streams/readable, v26): WITHOUT a + // string decoder, `read()` always returns just the head chunk — one buffer + // at a time — even when the stream is paused. Only a *decoded* (setEncoding) + // paused stream concatenates the entire buffer into a single string. So we + // drain the whole buffer here only when paused AND a decoder is active; + // otherwise fall through to the head-only path below. + if !readable_is_flowing(stream) && readable_encoding_tag(stream).is_some() { return drain_whole_buffer(stream, values); } @@ -270,7 +276,11 @@ pub(super) fn read_stream_object_mode_chunk(stream: f64) -> f64 { sync_pending_readable_chunks_to_buffer(stream); if stream_hidden_ended(stream) && remaining == 0.0 { clear_pending_readable_chunks(stream); - queue_readable_event(stream); + // Node never emits a final `readable` just to hand the consumer a + // null at EOF: once `read()` returns the last buffered item from an + // ended stream it transitions straight to `end` (endReadable). A + // re-queued `readable` here makes a fixed-count consumer observe an + // extra `read() === null` pair. (internal/streams/readable, v26) schedule_readable_end(stream); } chunk diff --git a/crates/perry-runtime/src/node_stream_readwrite.rs b/crates/perry-runtime/src/node_stream_readwrite.rs index 4b9f77acdf..2ad1459fe3 100644 --- a/crates/perry-runtime/src/node_stream_readwrite.rs +++ b/crates/perry-runtime/src/node_stream_readwrite.rs @@ -461,6 +461,13 @@ pub(super) fn pipe_stream_to_destination(stream: f64, dest: f64, end_dest: bool) if !end_dest { add_pipe_no_end_destination(stream, dest); } + // A flowing destination (e.g. a piped-into PassThrough/Duplex) must consume + // each chunk from its own readable buffer when it emits 'data' live — + // otherwise the chunk lingers in the buffer and the destination's drain + // microtask re-emits it, duplicating every piped chunk. `pipeline()` already + // marks both ends; `pipe()` needs the same on the destination. (matches + // mark_live_pipe_consume_on_emit usage in node_stream_pipeline.rs) + mark_live_pipe_consume_on_emit(dest); install_pipe_destination_listeners(stream, dest); let _ = emit_stream_event(dest, string_value(b"pipe"), &[stream]); set_readable_flowing(stream, f64::from_bits(TAG_TRUE)); diff --git a/crates/perry-runtime/src/node_stream_tests_extra.rs b/crates/perry-runtime/src/node_stream_tests_extra.rs index 30f634fbdf..76e49e3ca8 100644 --- a/crates/perry-runtime/src/node_stream_tests_extra.rs +++ b/crates/perry-runtime/src/node_stream_tests_extra.rs @@ -467,8 +467,15 @@ fn readable_unshift_prepends_chunk_and_returns_hwm_signal() { ); assert_eq!(js_node_stream_method_readable_length(handle), 11.0); - let joined = js_node_stream_method_read(handle, f64::from_bits(TAG_UNDEFINED)); - assert_eq!(stream_test_buffer_bytes(joined), b"hello world"); + // Node v26 `read()` with no size and no decoder returns the head chunk only + // — one buffer at a time — not the whole concatenated buffer. The unshifted + // chunk becomes the new head, so the first read yields "hello " and "world" + // remains for the next read. (internal/streams/readable, howMuchToRead(NaN)) + let head = js_node_stream_method_read(handle, f64::from_bits(TAG_UNDEFINED)); + assert_eq!(stream_test_buffer_bytes(head), b"hello "); + assert_eq!(js_node_stream_method_readable_length(handle), 5.0); + let tail = js_node_stream_method_read(handle, f64::from_bits(TAG_UNDEFINED)); + assert_eq!(stream_test_buffer_bytes(tail), b"world"); assert_eq!(js_node_stream_method_readable_length(handle), 0.0); let opts = crate::object::js_object_alloc(0, 1); @@ -493,8 +500,13 @@ fn readable_unshift_after_eof_before_end_prepends_chunk() { TAG_TRUE ); - let joined = js_node_stream_method_read(handle, f64::from_bits(TAG_UNDEFINED)); - assert_eq!(stream_test_buffer_bytes(joined), b"hello world"); + // Chunk-at-a-time (no decoder): `read()` returns the unshifted head + // ("hello ") and leaves "world" for the next read, even though EOF was + // already signalled via push(null). Matches node v26. + let head = js_node_stream_method_read(handle, f64::from_bits(TAG_UNDEFINED)); + assert_eq!(stream_test_buffer_bytes(head), b"hello "); + let tail = js_node_stream_method_read(handle, f64::from_bits(TAG_UNDEFINED)); + assert_eq!(stream_test_buffer_bytes(tail), b"world"); } fn stream_test_buffer_bytes(value: f64) -> Vec {