Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 12 additions & 2 deletions crates/perry-runtime/src/node_stream_readable_read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,13 @@ pub(super) fn read_stream_available_default(stream: f64) -> f64 {
return f64::from_bits(TAG_NULL);
}

if !readable_is_flowing(stream) {
// Node's `howMuchToRead(NaN)` (internal/streams/readable, v26): WITHOUT a
// string decoder, `read()` always returns just the head chunk — one buffer
// at a time — even when the stream is paused. Only a *decoded* (setEncoding)
// paused stream concatenates the entire buffer into a single string. So we
// drain the whole buffer here only when paused AND a decoder is active;
// otherwise fall through to the head-only path below.
if !readable_is_flowing(stream) && readable_encoding_tag(stream).is_some() {
return drain_whole_buffer(stream, values);
}

Expand Down Expand Up @@ -270,7 +276,11 @@ pub(super) fn read_stream_object_mode_chunk(stream: f64) -> f64 {
sync_pending_readable_chunks_to_buffer(stream);
if stream_hidden_ended(stream) && remaining == 0.0 {
clear_pending_readable_chunks(stream);
queue_readable_event(stream);
// Node never emits a final `readable` just to hand the consumer a
// null at EOF: once `read()` returns the last buffered item from an
// ended stream it transitions straight to `end` (endReadable). A
// re-queued `readable` here makes a fixed-count consumer observe an
// extra `read() === null` pair. (internal/streams/readable, v26)
schedule_readable_end(stream);
}
chunk
Expand Down
7 changes: 7 additions & 0 deletions crates/perry-runtime/src/node_stream_readwrite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -461,6 +461,13 @@ pub(super) fn pipe_stream_to_destination(stream: f64, dest: f64, end_dest: bool)
if !end_dest {
add_pipe_no_end_destination(stream, dest);
}
// A flowing destination (e.g. a piped-into PassThrough/Duplex) must consume
// each chunk from its own readable buffer when it emits 'data' live —
// otherwise the chunk lingers in the buffer and the destination's drain
// microtask re-emits it, duplicating every piped chunk. `pipeline()` already
// marks both ends; `pipe()` needs the same on the destination. (matches
// mark_live_pipe_consume_on_emit usage in node_stream_pipeline.rs)
mark_live_pipe_consume_on_emit(dest);
install_pipe_destination_listeners(stream, dest);
let _ = emit_stream_event(dest, string_value(b"pipe"), &[stream]);
set_readable_flowing(stream, f64::from_bits(TAG_TRUE));
Expand Down
20 changes: 16 additions & 4 deletions crates/perry-runtime/src/node_stream_tests_extra.rs
Original file line number Diff line number Diff line change
Expand Up @@ -467,8 +467,15 @@ fn readable_unshift_prepends_chunk_and_returns_hwm_signal() {
);
assert_eq!(js_node_stream_method_readable_length(handle), 11.0);

let joined = js_node_stream_method_read(handle, f64::from_bits(TAG_UNDEFINED));
assert_eq!(stream_test_buffer_bytes(joined), b"hello world");
// Node v26 `read()` with no size and no decoder returns the head chunk only
// — one buffer at a time — not the whole concatenated buffer. The unshifted
// chunk becomes the new head, so the first read yields "hello " and "world"
// remains for the next read. (internal/streams/readable, howMuchToRead(NaN))
let head = js_node_stream_method_read(handle, f64::from_bits(TAG_UNDEFINED));
assert_eq!(stream_test_buffer_bytes(head), b"hello ");
assert_eq!(js_node_stream_method_readable_length(handle), 5.0);
let tail = js_node_stream_method_read(handle, f64::from_bits(TAG_UNDEFINED));
assert_eq!(stream_test_buffer_bytes(tail), b"world");
assert_eq!(js_node_stream_method_readable_length(handle), 0.0);

let opts = crate::object::js_object_alloc(0, 1);
Expand All @@ -493,8 +500,13 @@ fn readable_unshift_after_eof_before_end_prepends_chunk() {
TAG_TRUE
);

let joined = js_node_stream_method_read(handle, f64::from_bits(TAG_UNDEFINED));
assert_eq!(stream_test_buffer_bytes(joined), b"hello world");
// Chunk-at-a-time (no decoder): `read()` returns the unshifted head
// ("hello ") and leaves "world" for the next read, even though EOF was
// already signalled via push(null). Matches node v26.
let head = js_node_stream_method_read(handle, f64::from_bits(TAG_UNDEFINED));
assert_eq!(stream_test_buffer_bytes(head), b"hello ");
let tail = js_node_stream_method_read(handle, f64::from_bits(TAG_UNDEFINED));
assert_eq!(stream_test_buffer_bytes(tail), b"world");
}

fn stream_test_buffer_bytes(value: f64) -> Vec<u8> {
Expand Down