diff --git a/crates/perry-runtime/src/lib.rs b/crates/perry-runtime/src/lib.rs index ec611bcedb..d812aa174e 100644 --- a/crates/perry-runtime/src/lib.rs +++ b/crates/perry-runtime/src/lib.rs @@ -234,7 +234,8 @@ pub use object::{ }; pub use promise::{js_is_promise, js_promise_run_microtasks, js_promise_state, js_promise_value}; pub use promise::{ - js_promise_new, js_promise_reject, js_promise_rejected, js_promise_resolve, js_promise_resolved, + js_promise_mark_internally_handled, js_promise_new, js_promise_reject, js_promise_rejected, + js_promise_resolve, js_promise_resolved, }; pub use string::js_string_from_bytes; pub use value::{ diff --git a/crates/perry-runtime/src/node_stream_event_emitter.rs b/crates/perry-runtime/src/node_stream_event_emitter.rs index 2c2bb9ed66..a5a49ec6c3 100644 --- a/crates/perry-runtime/src/node_stream_event_emitter.rs +++ b/crates/perry-runtime/src/node_stream_event_emitter.rs @@ -690,6 +690,19 @@ fn capture_rejections_enabled(stream: f64) -> bool { super::has_truthy_hidden(stream, super::hidden_capture_rejections_key()) } +/// Mark an async listener's returned promise handled when its rejection is to +/// be swallowed (Node's Readable `data` path), so it is not reported as an +/// unhandled rejection at program end (#1545). +fn swallow_listener_rejection(result: f64) { + if crate::promise::js_value_is_promise(result) == 0 { + return; + } + let promise = crate::value::js_nanbox_get_pointer(result) as *mut crate::promise::Promise; + if !promise.is_null() { + crate::promise::mark_rejection_handled(promise); + } +} + fn capture_listener_rejection(stream: f64, result: f64) { if crate::promise::js_value_is_promise(result) == 0 { return; @@ -755,13 +768,18 @@ pub(super) fn emit_stream_event(stream: f64, event: f64, args: &[f64]) -> f64 { } // Node's Readable data delivery path does not route async `data` listener // rejections through captureRejections; custom EventEmitter-style events do. - let capture_rejections = capture_rejections_enabled(stream) - && !super::string_value_eq(event, b"error") - && !super::string_value_eq(event, b"data"); + let is_data = super::string_value_eq(event, b"data"); + let capture_rejections = + capture_rejections_enabled(stream) && !super::string_value_eq(event, b"error") && !is_data; for (listener, _) in snapshot { let result = call_listener_args(stream, listener, args); if capture_rejections { capture_listener_rejection(stream, result); + } else if is_data { + // Node's Readable swallows a rejection returned by an async `data` + // listener — it is neither captured to `error` nor surfaced as an + // unhandled rejection. Mark it handled so it stays silent (#1545). + swallow_listener_rejection(result); } } f64::from_bits(super::TAG_TRUE) diff --git a/crates/perry-runtime/src/node_stream_iter_helpers.rs b/crates/perry-runtime/src/node_stream_iter_helpers.rs index 87363fd2f3..47ada1cd5e 100644 --- a/crates/perry-runtime/src/node_stream_iter_helpers.rs +++ b/crates/perry-runtime/src/node_stream_iter_helpers.rs @@ -265,7 +265,14 @@ pub(super) fn settle_result(value: f64) -> Result { unsafe { match (*p).state { crate::promise::PromiseState::Fulfilled => return Ok((*p).value), - crate::promise::PromiseState::Rejected => return Err((*p).reason), + crate::promise::PromiseState::Rejected => { + // We consumed the rejection by reading `reason` directly + // (no `.then`/`.catch` was attached), so clear it from the + // unhandled-rejection set — Node treats a helper-observed + // callback rejection as handled (#1545). + crate::promise::mark_rejection_handled(p); + return Err((*p).reason); + } crate::promise::PromiseState::Pending => {} } } @@ -288,7 +295,10 @@ pub(super) fn settle_result(value: f64) -> Result { unsafe { match (*p).state { crate::promise::PromiseState::Fulfilled => Ok((*p).value), - crate::promise::PromiseState::Rejected => Err((*p).reason), + crate::promise::PromiseState::Rejected => { + crate::promise::mark_rejection_handled(p); + Err((*p).reason) + } crate::promise::PromiseState::Pending => Ok(current), } } diff --git a/crates/perry-runtime/src/node_stream_pipeline.rs b/crates/perry-runtime/src/node_stream_pipeline.rs index 7823ec3678..303e480c32 100644 --- a/crates/perry-runtime/src/node_stream_pipeline.rs +++ b/crates/perry-runtime/src/node_stream_pipeline.rs @@ -327,7 +327,13 @@ pub(super) fn settle_pipeline_value_with_origin(value: f64) -> Result return Err((*promise).reason), + crate::promise::PromiseState::Rejected => { + // Reason consumed by direct read (no reaction attached); + // mark handled so it is not reported as an unhandled + // rejection at program end (#1545). + crate::promise::mark_rejection_handled(promise); + return Err((*promise).reason); + } crate::promise::PromiseState::Pending => {} } } @@ -362,7 +368,10 @@ pub(super) fn settle_pipeline_value_with_origin(value: f64) -> Result Err((*promise).reason), + crate::promise::PromiseState::Rejected => { + crate::promise::mark_rejection_handled(promise); + Err((*promise).reason) + } crate::promise::PromiseState::Pending => Ok(PipelineSettledValue { value, fulfilled_promise: false, diff --git a/crates/perry-runtime/src/node_stream_readable_read.rs b/crates/perry-runtime/src/node_stream_readable_read.rs index 768db25091..9a3de45094 100644 --- a/crates/perry-runtime/src/node_stream_readable_read.rs +++ b/crates/perry-runtime/src/node_stream_readable_read.rs @@ -30,10 +30,12 @@ pub(super) fn read_stream_available_default(stream: f64) -> f64 { return read_stream_object_mode_chunk(stream); } - // `read()` with no size argument drains the entire internal buffer and - // returns it as a single value (Node semantics). The chunks are stored as - // separate entries to preserve boundaries for sized `read(n)` calls, so we - // gather them all here and concatenate before clearing the buffer. + // `read()` with no size argument returns ONE chunk — the head of the + // internal buffer — not the whole buffer concatenated. This matches Node: + // `howMuchToRead(NaN)` yields a single buffered entry, so a `readable` + // drain loop (`while ((c = r.read()) !== null)`) and `for await` both see + // chunk boundaries preserved. Sized `read(n)` (read_stream_exact_size) + // still spans chunks. (#1545) let mut values = Vec::new(); if let Some(chunks) = readable_hidden_chunks(stream) { push_chunk_values(chunks, &mut values, 0); @@ -46,37 +48,28 @@ pub(super) fn read_stream_available_default(stream: f64) -> f64 { return f64::from_bits(TAG_NULL); } - clear_readable_buffer(stream); + let head = values.remove(0); + let mut remaining_len = 0usize; + for value in &values { + let mut bytes = Vec::new(); + append_chunk_bytes(*value, &mut bytes, 0); + remaining_len += bytes.len(); + } + set_readable_buffer_values(stream, &values, remaining_len); mark_disturbed(stream); - clear_pending_readable_chunks(stream); - if stream_hidden_ended(stream) { + if stream_hidden_ended(stream) && remaining_len == 0 { + clear_pending_readable_chunks(stream); queue_readable_event(stream); schedule_readable_end(stream); } - let encoded = readable_encoding_tag(stream).is_some(); - if encoded { - let mut decoded = Vec::with_capacity(values.len()); - for value in values { - if let Some(value) = super::decode_readable_chunk_for_encoding(stream, value) { - decoded.push(value); - } - } - values = decoded; - if values.is_empty() { - return f64::from_bits(TAG_NULL); - } - if values.len() == 1 { - return values[0]; - } - let result = crate::string::js_string_concat_chain(values.as_ptr(), values.len() as i32); - return f64::from_bits(JSValue::string_ptr(result).bits()); + if readable_encoding_tag(stream).is_some() { + return super::decode_readable_chunk_for_encoding(stream, head) + .unwrap_or(f64::from_bits(TAG_NULL)); } let mut bytes = Vec::new(); - for value in &values { - append_chunk_bytes(*value, &mut bytes, 0); - } + append_chunk_bytes(head, &mut bytes, 0); buffer_value_from_bytes(&bytes) } diff --git a/crates/perry-runtime/src/promise/mod.rs b/crates/perry-runtime/src/promise/mod.rs index f5a79ee4e5..19a623a427 100644 --- a/crates/perry-runtime/src/promise/mod.rs +++ b/crates/perry-runtime/src/promise/mod.rs @@ -65,9 +65,10 @@ pub(crate) use then::{ promise_prototype_catch_thunk, promise_prototype_finally_thunk, promise_prototype_then_thunk, }; pub use then::{ - js_promise_bound_method, js_promise_catch, js_promise_finally, js_promise_free, js_promise_new, - js_promise_reason, js_promise_reject, js_promise_resolve, js_promise_resolve_with_promise, - js_promise_result, js_promise_state, js_promise_then, js_promise_value, + js_promise_bound_method, js_promise_catch, js_promise_finally, js_promise_free, + js_promise_mark_internally_handled, js_promise_new, js_promise_reason, js_promise_reject, + js_promise_resolve, js_promise_resolve_with_promise, js_promise_result, js_promise_state, + js_promise_then, js_promise_value, }; #[cfg(test)] diff --git a/crates/perry-runtime/src/promise/then.rs b/crates/perry-runtime/src/promise/then.rs index 49ea579b3c..e072c671e4 100644 --- a/crates/perry-runtime/src/promise/then.rs +++ b/crates/perry-runtime/src/promise/then.rs @@ -48,6 +48,45 @@ unsafe fn store_promise_next_slot( thread_local! { static UNHANDLED_REJECTIONS: RefCell> = const { RefCell::new(Vec::new()) }; + /// Promises the runtime owns and observes through internal channels — a + /// WHATWG reader/writer `closed` promise, a `[[closeRequest]]`, etc. Node + /// marks these `markPromiseAsHandled` at creation so that an abort / error + /// / cancel that later rejects them is never surfaced as an unhandled + /// rejection. We mirror that with a persistent membership set consulted at + /// rejection-track time. Stays empty for non-stream programs, so the hot + /// reject path pays nothing (#1545). + static INTERNALLY_HANDLED: RefCell> = + RefCell::new(std::collections::HashSet::new()); +} + +/// Mark a promise as internally handled (Node's `markPromiseAsHandled`): a +/// later rejection of it is never reported as unhandled. Used by the WHATWG +/// stream implementation for the internal `closed` / `closeRequest` promises it +/// settles on abort/error/cancel without a user-attached reaction (#1545). +#[no_mangle] +pub extern "C" fn js_promise_mark_internally_handled(promise: *mut Promise) { + if promise.is_null() { + return; + } + INTERNALLY_HANDLED.with(|s| { + s.borrow_mut().insert(promise as usize); + }); + // If it already rejected before being marked, drop it from the set now. + mark_rejection_handled(promise); +} + +/// Keep the stdlib-facing marker alive through the dead-strip pass on the +/// PERRY_NO_AUTO_OPTIMIZE prebuilt-lib link (same pattern as the program-end +/// hook anchor below). +#[used] +static KEEP_PROMISE_MARK_INTERNALLY_HANDLED: extern "C" fn(*mut Promise) = + js_promise_mark_internally_handled; + +fn is_internally_handled(promise: *mut Promise) -> bool { + INTERNALLY_HANDLED.with(|s| { + let s = s.borrow(); + !s.is_empty() && s.contains(&(promise as usize)) + }) } /// Record a rejection that has no reaction attached yet. @@ -691,7 +730,11 @@ pub extern "C" fn js_promise_reject(promise: *mut Promise, reason: f64) { // is non-null so `has_normal_handler` is true here and the // rejection propagates to `next`, whose own settlement re-runs // this check — the leaf unhandled promise is the one tracked. - track_unhandled_rejection(promise); + // Promises the runtime marked internally handled (stream `closed` + // promises, etc.) are never reported (#1545). + if !is_internally_handled(promise) { + track_unhandled_rejection(promise); + } } if has_settle_listeners || !promise_all_states.is_empty() diff --git a/crates/perry-stdlib/src/streams.rs b/crates/perry-stdlib/src/streams.rs index 02d6555265..335497986f 100644 --- a/crates/perry-stdlib/src/streams.rs +++ b/crates/perry-stdlib/src/streams.rs @@ -28,13 +28,24 @@ use perry_runtime::{ js_array_alloc, js_array_push, js_closure_call0, js_closure_call1, js_closure_call2, js_nanbox_get_pointer, js_object_alloc, js_object_get_field_by_name, js_object_set_field, - js_object_set_field_by_name, js_object_set_keys, js_promise_new, js_promise_reject, - js_promise_resolve, js_string_from_bytes, ClosureHeader, JSValue, ObjectHeader, Promise, + js_object_set_field_by_name, js_object_set_keys, js_promise_mark_internally_handled, + js_promise_new, js_promise_reject, js_promise_resolve, js_string_from_bytes, ClosureHeader, + JSValue, ObjectHeader, Promise, }; use std::collections::{HashMap, VecDeque}; use std::os::raw::c_int; use std::sync::Mutex; +/// Allocate a promise the stream machinery owns and observes internally — the +/// reader/writer `closed`, writer `ready`, and `[[closeRequest]]` promises. +/// Node marks these `markPromiseAsHandled` so that an abort / error / cancel +/// that rejects them is never surfaced as an unhandled rejection (#1545). +pub(crate) fn internal_promise() -> *mut Promise { + let p = js_promise_new(); + js_promise_mark_internally_handled(p); + p +} + mod byob; mod pipe; mod strategy; @@ -604,8 +615,8 @@ fn alloc_writable_with_strategy( strategy_size_cb: i64, ) -> usize { let id = next_id(&NEXT_STREAM_ID); - let ready = js_promise_new(); - let closed = js_promise_new(); + let ready = internal_promise(); + let closed = internal_promise(); js_promise_resolve(ready, f64::from_bits(TAG_UNDEFINED)); WRITABLE_STREAMS.lock().unwrap().insert( id, @@ -935,7 +946,7 @@ pub unsafe extern "C" fn js_readable_stream_get_reader_with_options( true } else { let reader_id = next_id(&NEXT_STREAM_ID); - let closed_p = js_promise_new(); + let closed_p = internal_promise(); if s.state == ReadableState::Closed { js_promise_resolve(closed_p, f64::from_bits(TAG_UNDEFINED)); } else if s.state == ReadableState::Errored { diff --git a/crates/perry-stdlib/src/streams/byob.rs b/crates/perry-stdlib/src/streams/byob.rs index 0142667444..7a7ca441e2 100644 --- a/crates/perry-stdlib/src/streams/byob.rs +++ b/crates/perry-stdlib/src/streams/byob.rs @@ -212,7 +212,7 @@ pub(super) unsafe fn get_reader_for_stream(stream_id: usize, is_byob: bool) -> u throw_type_error("ReadableStream is locked"); } let reader_id = next_id(&NEXT_STREAM_ID); - let closed_p = js_promise_new(); + let closed_p = internal_promise(); if s.state == ReadableState::Closed { js_promise_resolve(closed_p, f64::from_bits(TAG_UNDEFINED)); } else if s.state == ReadableState::Errored { diff --git a/crates/perry-stdlib/src/streams/transform.rs b/crates/perry-stdlib/src/streams/transform.rs index ee87996232..32029bc5d7 100644 --- a/crates/perry-stdlib/src/streams/transform.rs +++ b/crates/perry-stdlib/src/streams/transform.rs @@ -75,8 +75,8 @@ unsafe fn alloc_transform_stream_with_strategies( // Allocate writable side; its write_cb is synthesized via the // dispatcher table below to invoke transform(chunk, controller). let writable_id = next_id(&NEXT_STREAM_ID); - let ready = js_promise_new(); - let closed = js_promise_new(); + let ready = internal_promise(); + let closed = internal_promise(); js_promise_resolve(ready, f64::from_bits(TAG_UNDEFINED)); WRITABLE_STREAMS.lock().unwrap().insert( writable_id, diff --git a/crates/perry-stdlib/src/streams/writable.rs b/crates/perry-stdlib/src/streams/writable.rs index 0d0228fb39..9eb2405cec 100644 --- a/crates/perry-stdlib/src/streams/writable.rs +++ b/crates/perry-stdlib/src/streams/writable.rs @@ -239,7 +239,7 @@ fn sync_writer_ready_promise(stream_id: usize, writer_id: usize, ready: *mut Pro } pub(super) unsafe fn install_writable_backpressure_ready(stream_id: usize, writer_id: usize) { - let ready = js_promise_new(); + let ready = internal_promise(); if let Some(s) = WRITABLE_STREAMS.lock().unwrap().get_mut(&stream_id) { s.ready_promise = ready; }