From 1b4dd55e2a074071493642c938b27f20d3b60baf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ralph=20K=C3=BCpper?= Date: Sat, 13 Jun 2026 18:57:23 +0200 Subject: [PATCH] fix(stream): node v26 read() chunk-at-a-time, no spurious EOF readable, pipe dedup MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Closes the largest real-gap cluster in test-parity/node-suite/stream by matching node v26's internal/streams/readable byte-for-byte. Three distinct root causes (6 previously-failing node-suite tests): 1. read() with no size returns the HEAD chunk, not the whole buffer. node v26 howMuchToRead(NaN) takes a fast path: WITHOUT a string decoder it always returns state.buffer[bufferIndex].length (one buffer at a time), even when paused. Only a setEncoding (decoded) paused stream concatenates the whole buffer into a single string. Perry drained the entire buffer whenever paused, so multi-chunk paused read() over-concatenated. Fixes: readable-mode-only, read-in-readable-listener, unshift-after-read, iter-yields-buffer-no-encoding. 2. objectMode read() of the last buffered item no longer emits a final 'readable'. node never emits 'readable' just to hand the consumer a null at EOF — once read() returns the last item from an ended stream it transitions straight to 'end' (endReadable). The extra emit made a fixed-count consumer observe a spurious read()===null pair. Fixes: object-mode-read-returns-object. 3. pipe() destinations now consume-on-emit. A flowing destination (piped-into PassThrough/Duplex) must drop each chunk from its own readable buffer when it emits 'data' live; otherwise the chunk lingers and the destination's drain microtask re-emits it, duplicating every piped chunk. pipeline() already marks both ends via mark_live_pipe_consume_on_emit; pipe() now marks the destination too. Fixes: pipe/repipe-after-unpipe. Also updates two perry-runtime unit tests in node_stream_tests_extra.rs that asserted the pre-v26 drain-whole behavior (read() after unshift returning the concatenated buffer). Verified against node v26: read() returns the unshifted head chunk and leaves the remainder, so the tests now assert chunk-at-a-time. Verification (node v26.3.0, macOS): the 6 target tests pass; a focused 232-test regression over the read/pipe/objectMode/flowing/transform/duplex/ consumers surface is clean (226 pass, 6 node-fail, 0 diffs, 0 new regressions); pipe fan-out and chained-pipe cross-checks match node. cargo test -p perry-runtime --lib green (the Date.prototype global-state test passes in isolation, a pre-existing flake unrelated to this change). Not addressed (separate codegen gap): subclass/extends-{readable,writable, duplex,transform} still throw "Class extends value is not a constructor" — user-defined `class X extends Readable` requires native-stream subclassing support in codegen (synthesized super() initialising native stream state on `this`, routing push()/_read to the user object), which is a much larger, independent change. --- .../src/node_stream_readable_read.rs | 14 +++++++++++-- .../src/node_stream_readwrite.rs | 7 +++++++ .../src/node_stream_tests_extra.rs | 20 +++++++++++++++---- 3 files changed, 35 insertions(+), 6 deletions(-) diff --git a/crates/perry-runtime/src/node_stream_readable_read.rs b/crates/perry-runtime/src/node_stream_readable_read.rs index 2d7f261182..2b42e0dd7a 100644 --- a/crates/perry-runtime/src/node_stream_readable_read.rs +++ b/crates/perry-runtime/src/node_stream_readable_read.rs @@ -49,7 +49,13 @@ pub(super) fn read_stream_available_default(stream: f64) -> f64 { return f64::from_bits(TAG_NULL); } - if !readable_is_flowing(stream) { + // Node's `howMuchToRead(NaN)` (internal/streams/readable, v26): WITHOUT a + // string decoder, `read()` always returns just the head chunk — one buffer + // at a time — even when the stream is paused. Only a *decoded* (setEncoding) + // paused stream concatenates the entire buffer into a single string. So we + // drain the whole buffer here only when paused AND a decoder is active; + // otherwise fall through to the head-only path below. + if !readable_is_flowing(stream) && readable_encoding_tag(stream).is_some() { return drain_whole_buffer(stream, values); } @@ -270,7 +276,11 @@ pub(super) fn read_stream_object_mode_chunk(stream: f64) -> f64 { sync_pending_readable_chunks_to_buffer(stream); if stream_hidden_ended(stream) && remaining == 0.0 { clear_pending_readable_chunks(stream); - queue_readable_event(stream); + // Node never emits a final `readable` just to hand the consumer a + // null at EOF: once `read()` returns the last buffered item from an + // ended stream it transitions straight to `end` (endReadable). A + // re-queued `readable` here makes a fixed-count consumer observe an + // extra `read() === null` pair. (internal/streams/readable, v26) schedule_readable_end(stream); } chunk diff --git a/crates/perry-runtime/src/node_stream_readwrite.rs b/crates/perry-runtime/src/node_stream_readwrite.rs index 4b9f77acdf..2ad1459fe3 100644 --- a/crates/perry-runtime/src/node_stream_readwrite.rs +++ b/crates/perry-runtime/src/node_stream_readwrite.rs @@ -461,6 +461,13 @@ pub(super) fn pipe_stream_to_destination(stream: f64, dest: f64, end_dest: bool) if !end_dest { add_pipe_no_end_destination(stream, dest); } + // A flowing destination (e.g. a piped-into PassThrough/Duplex) must consume + // each chunk from its own readable buffer when it emits 'data' live — + // otherwise the chunk lingers in the buffer and the destination's drain + // microtask re-emits it, duplicating every piped chunk. `pipeline()` already + // marks both ends; `pipe()` needs the same on the destination. (matches + // mark_live_pipe_consume_on_emit usage in node_stream_pipeline.rs) + mark_live_pipe_consume_on_emit(dest); install_pipe_destination_listeners(stream, dest); let _ = emit_stream_event(dest, string_value(b"pipe"), &[stream]); set_readable_flowing(stream, f64::from_bits(TAG_TRUE)); diff --git a/crates/perry-runtime/src/node_stream_tests_extra.rs b/crates/perry-runtime/src/node_stream_tests_extra.rs index 30f634fbdf..76e49e3ca8 100644 --- a/crates/perry-runtime/src/node_stream_tests_extra.rs +++ b/crates/perry-runtime/src/node_stream_tests_extra.rs @@ -467,8 +467,15 @@ fn readable_unshift_prepends_chunk_and_returns_hwm_signal() { ); assert_eq!(js_node_stream_method_readable_length(handle), 11.0); - let joined = js_node_stream_method_read(handle, f64::from_bits(TAG_UNDEFINED)); - assert_eq!(stream_test_buffer_bytes(joined), b"hello world"); + // Node v26 `read()` with no size and no decoder returns the head chunk only + // — one buffer at a time — not the whole concatenated buffer. The unshifted + // chunk becomes the new head, so the first read yields "hello " and "world" + // remains for the next read. (internal/streams/readable, howMuchToRead(NaN)) + let head = js_node_stream_method_read(handle, f64::from_bits(TAG_UNDEFINED)); + assert_eq!(stream_test_buffer_bytes(head), b"hello "); + assert_eq!(js_node_stream_method_readable_length(handle), 5.0); + let tail = js_node_stream_method_read(handle, f64::from_bits(TAG_UNDEFINED)); + assert_eq!(stream_test_buffer_bytes(tail), b"world"); assert_eq!(js_node_stream_method_readable_length(handle), 0.0); let opts = crate::object::js_object_alloc(0, 1); @@ -493,8 +500,13 @@ fn readable_unshift_after_eof_before_end_prepends_chunk() { TAG_TRUE ); - let joined = js_node_stream_method_read(handle, f64::from_bits(TAG_UNDEFINED)); - assert_eq!(stream_test_buffer_bytes(joined), b"hello world"); + // Chunk-at-a-time (no decoder): `read()` returns the unshifted head + // ("hello ") and leaves "world" for the next read, even though EOF was + // already signalled via push(null). Matches node v26. + let head = js_node_stream_method_read(handle, f64::from_bits(TAG_UNDEFINED)); + assert_eq!(stream_test_buffer_bytes(head), b"hello "); + let tail = js_node_stream_method_read(handle, f64::from_bits(TAG_UNDEFINED)); + assert_eq!(stream_test_buffer_bytes(tail), b"world"); } fn stream_test_buffer_bytes(value: f64) -> Vec {