Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion crates/perry-runtime/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,8 @@ pub use object::{
};
pub use promise::{js_is_promise, js_promise_run_microtasks, js_promise_state, js_promise_value};
pub use promise::{
js_promise_new, js_promise_reject, js_promise_rejected, js_promise_resolve, js_promise_resolved,
js_promise_mark_internally_handled, js_promise_new, js_promise_reject, js_promise_rejected,
js_promise_resolve, js_promise_resolved,
};
pub use string::js_string_from_bytes;
pub use value::{
Expand Down
24 changes: 21 additions & 3 deletions crates/perry-runtime/src/node_stream_event_emitter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -690,6 +690,19 @@ fn capture_rejections_enabled(stream: f64) -> bool {
super::has_truthy_hidden(stream, super::hidden_capture_rejections_key())
}

/// Mark an async listener's returned promise handled when its rejection is to
/// be swallowed (Node's Readable `data` path), so it is not reported as an
/// unhandled rejection at program end (#1545).
fn swallow_listener_rejection(result: f64) {
if crate::promise::js_value_is_promise(result) == 0 {
return;
}
let promise = crate::value::js_nanbox_get_pointer(result) as *mut crate::promise::Promise;
if !promise.is_null() {
crate::promise::mark_rejection_handled(promise);
}
}

fn capture_listener_rejection(stream: f64, result: f64) {
if crate::promise::js_value_is_promise(result) == 0 {
return;
Expand Down Expand Up @@ -755,13 +768,18 @@ pub(super) fn emit_stream_event(stream: f64, event: f64, args: &[f64]) -> f64 {
}
// Node's Readable data delivery path does not route async `data` listener
// rejections through captureRejections; custom EventEmitter-style events do.
let capture_rejections = capture_rejections_enabled(stream)
&& !super::string_value_eq(event, b"error")
&& !super::string_value_eq(event, b"data");
let is_data = super::string_value_eq(event, b"data");
let capture_rejections =
capture_rejections_enabled(stream) && !super::string_value_eq(event, b"error") && !is_data;
for (listener, _) in snapshot {
let result = call_listener_args(stream, listener, args);
if capture_rejections {
capture_listener_rejection(stream, result);
} else if is_data {
// Node's Readable swallows a rejection returned by an async `data`
// listener — it is neither captured to `error` nor surfaced as an
// unhandled rejection. Mark it handled so it stays silent (#1545).
swallow_listener_rejection(result);
}
}
f64::from_bits(super::TAG_TRUE)
Expand Down
14 changes: 12 additions & 2 deletions crates/perry-runtime/src/node_stream_iter_helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,14 @@ pub(super) fn settle_result(value: f64) -> Result<f64, f64> {
unsafe {
match (*p).state {
crate::promise::PromiseState::Fulfilled => return Ok((*p).value),
crate::promise::PromiseState::Rejected => return Err((*p).reason),
crate::promise::PromiseState::Rejected => {
// We consumed the rejection by reading `reason` directly
// (no `.then`/`.catch` was attached), so clear it from the
// unhandled-rejection set — Node treats a helper-observed
// callback rejection as handled (#1545).
crate::promise::mark_rejection_handled(p);
return Err((*p).reason);
}
crate::promise::PromiseState::Pending => {}
}
}
Expand All @@ -288,7 +295,10 @@ pub(super) fn settle_result(value: f64) -> Result<f64, f64> {
unsafe {
match (*p).state {
crate::promise::PromiseState::Fulfilled => Ok((*p).value),
crate::promise::PromiseState::Rejected => Err((*p).reason),
crate::promise::PromiseState::Rejected => {
crate::promise::mark_rejection_handled(p);
Err((*p).reason)
}
crate::promise::PromiseState::Pending => Ok(current),
}
}
Expand Down
13 changes: 11 additions & 2 deletions crates/perry-runtime/src/node_stream_pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,13 @@ pub(super) fn settle_pipeline_value_with_origin(value: f64) -> Result<PipelineSe
fulfilled_promise: true,
})
}
crate::promise::PromiseState::Rejected => return Err((*promise).reason),
crate::promise::PromiseState::Rejected => {
// Reason consumed by direct read (no reaction attached);
// mark handled so it is not reported as an unhandled
// rejection at program end (#1545).
crate::promise::mark_rejection_handled(promise);
return Err((*promise).reason);
}
crate::promise::PromiseState::Pending => {}
}
}
Expand Down Expand Up @@ -362,7 +368,10 @@ pub(super) fn settle_pipeline_value_with_origin(value: f64) -> Result<PipelineSe
value: (*promise).value,
fulfilled_promise: true,
}),
crate::promise::PromiseState::Rejected => Err((*promise).reason),
crate::promise::PromiseState::Rejected => {
crate::promise::mark_rejection_handled(promise);
Err((*promise).reason)
}
crate::promise::PromiseState::Pending => Ok(PipelineSettledValue {
value,
fulfilled_promise: false,
Expand Down
47 changes: 20 additions & 27 deletions crates/perry-runtime/src/node_stream_readable_read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,12 @@ pub(super) fn read_stream_available_default(stream: f64) -> f64 {
return read_stream_object_mode_chunk(stream);
}

// `read()` with no size argument drains the entire internal buffer and
// returns it as a single value (Node semantics). The chunks are stored as
// separate entries to preserve boundaries for sized `read(n)` calls, so we
// gather them all here and concatenate before clearing the buffer.
// `read()` with no size argument returns ONE chunk — the head of the
// internal buffer — not the whole buffer concatenated. This matches Node:
// `howMuchToRead(NaN)` yields a single buffered entry, so a `readable`
// drain loop (`while ((c = r.read()) !== null)`) and `for await` both see
// chunk boundaries preserved. Sized `read(n)` (read_stream_exact_size)
// still spans chunks. (#1545)
let mut values = Vec::new();
if let Some(chunks) = readable_hidden_chunks(stream) {
push_chunk_values(chunks, &mut values, 0);
Expand All @@ -46,37 +48,28 @@ pub(super) fn read_stream_available_default(stream: f64) -> f64 {
return f64::from_bits(TAG_NULL);
}

clear_readable_buffer(stream);
let head = values.remove(0);
let mut remaining_len = 0usize;
for value in &values {
let mut bytes = Vec::new();
append_chunk_bytes(*value, &mut bytes, 0);
remaining_len += bytes.len();
}
set_readable_buffer_values(stream, &values, remaining_len);
mark_disturbed(stream);
clear_pending_readable_chunks(stream);
if stream_hidden_ended(stream) {
if stream_hidden_ended(stream) && remaining_len == 0 {
clear_pending_readable_chunks(stream);
queue_readable_event(stream);
schedule_readable_end(stream);
}

let encoded = readable_encoding_tag(stream).is_some();
if encoded {
let mut decoded = Vec::with_capacity(values.len());
for value in values {
if let Some(value) = super::decode_readable_chunk_for_encoding(stream, value) {
decoded.push(value);
}
}
values = decoded;
if values.is_empty() {
return f64::from_bits(TAG_NULL);
}
if values.len() == 1 {
return values[0];
}
let result = crate::string::js_string_concat_chain(values.as_ptr(), values.len() as i32);
return f64::from_bits(JSValue::string_ptr(result).bits());
if readable_encoding_tag(stream).is_some() {
return super::decode_readable_chunk_for_encoding(stream, head)
.unwrap_or(f64::from_bits(TAG_NULL));
}

let mut bytes = Vec::new();
for value in &values {
append_chunk_bytes(*value, &mut bytes, 0);
}
append_chunk_bytes(head, &mut bytes, 0);
buffer_value_from_bytes(&bytes)
}

Expand Down
7 changes: 4 additions & 3 deletions crates/perry-runtime/src/promise/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,10 @@ pub(crate) use then::{
promise_prototype_catch_thunk, promise_prototype_finally_thunk, promise_prototype_then_thunk,
};
pub use then::{
js_promise_bound_method, js_promise_catch, js_promise_finally, js_promise_free, js_promise_new,
js_promise_reason, js_promise_reject, js_promise_resolve, js_promise_resolve_with_promise,
js_promise_result, js_promise_state, js_promise_then, js_promise_value,
js_promise_bound_method, js_promise_catch, js_promise_finally, js_promise_free,
js_promise_mark_internally_handled, js_promise_new, js_promise_reason, js_promise_reject,
js_promise_resolve, js_promise_resolve_with_promise, js_promise_result, js_promise_state,
js_promise_then, js_promise_value,
};

#[cfg(test)]
Expand Down
45 changes: 44 additions & 1 deletion crates/perry-runtime/src/promise/then.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,45 @@ unsafe fn store_promise_next_slot(

thread_local! {
static UNHANDLED_REJECTIONS: RefCell<Vec<usize>> = const { RefCell::new(Vec::new()) };
/// Promises the runtime owns and observes through internal channels — a
/// WHATWG reader/writer `closed` promise, a `[[closeRequest]]`, etc. Node
/// marks these `markPromiseAsHandled` at creation so that an abort / error
/// / cancel that later rejects them is never surfaced as an unhandled
/// rejection. We mirror that with a persistent membership set consulted at
/// rejection-track time. Stays empty for non-stream programs, so the hot
/// reject path pays nothing (#1545).
static INTERNALLY_HANDLED: RefCell<std::collections::HashSet<usize>> =
RefCell::new(std::collections::HashSet::new());
}

/// Mark a promise as internally handled (Node's `markPromiseAsHandled`): a
/// later rejection of it is never reported as unhandled. Used by the WHATWG
/// stream implementation for the internal `closed` / `closeRequest` promises it
/// settles on abort/error/cancel without a user-attached reaction (#1545).
#[no_mangle]
pub extern "C" fn js_promise_mark_internally_handled(promise: *mut Promise) {
if promise.is_null() {
return;
}
INTERNALLY_HANDLED.with(|s| {
s.borrow_mut().insert(promise as usize);
});
// If it already rejected before being marked, drop it from the set now.
mark_rejection_handled(promise);
}

/// Keep the stdlib-facing marker alive through the dead-strip pass on the
/// PERRY_NO_AUTO_OPTIMIZE prebuilt-lib link (same pattern as the program-end
/// hook anchor below).
#[used]
static KEEP_PROMISE_MARK_INTERNALLY_HANDLED: extern "C" fn(*mut Promise) =
js_promise_mark_internally_handled;

fn is_internally_handled(promise: *mut Promise) -> bool {
INTERNALLY_HANDLED.with(|s| {
let s = s.borrow();
!s.is_empty() && s.contains(&(promise as usize))
})
}

/// Record a rejection that has no reaction attached yet.
Expand Down Expand Up @@ -691,7 +730,11 @@ pub extern "C" fn js_promise_reject(promise: *mut Promise, reason: f64) {
// is non-null so `has_normal_handler` is true here and the
// rejection propagates to `next`, whose own settlement re-runs
// this check — the leaf unhandled promise is the one tracked.
track_unhandled_rejection(promise);
// Promises the runtime marked internally handled (stream `closed`
// promises, etc.) are never reported (#1545).
if !is_internally_handled(promise) {
track_unhandled_rejection(promise);
}
}
if has_settle_listeners
|| !promise_all_states.is_empty()
Expand Down
21 changes: 16 additions & 5 deletions crates/perry-stdlib/src/streams.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,24 @@
use perry_runtime::{
js_array_alloc, js_array_push, js_closure_call0, js_closure_call1, js_closure_call2,
js_nanbox_get_pointer, js_object_alloc, js_object_get_field_by_name, js_object_set_field,
js_object_set_field_by_name, js_object_set_keys, js_promise_new, js_promise_reject,
js_promise_resolve, js_string_from_bytes, ClosureHeader, JSValue, ObjectHeader, Promise,
js_object_set_field_by_name, js_object_set_keys, js_promise_mark_internally_handled,
js_promise_new, js_promise_reject, js_promise_resolve, js_string_from_bytes, ClosureHeader,
JSValue, ObjectHeader, Promise,
};
use std::collections::{HashMap, VecDeque};
use std::os::raw::c_int;
use std::sync::Mutex;

/// Allocate a promise the stream machinery owns and observes internally — the
/// reader/writer `closed`, writer `ready`, and `[[closeRequest]]` promises.
/// Node marks these `markPromiseAsHandled` so that an abort / error / cancel
/// that rejects them is never surfaced as an unhandled rejection (#1545).
pub(crate) fn internal_promise() -> *mut Promise {
let p = js_promise_new();
js_promise_mark_internally_handled(p);
p
}

mod byob;
mod pipe;
mod strategy;
Expand Down Expand Up @@ -604,8 +615,8 @@ fn alloc_writable_with_strategy(
strategy_size_cb: i64,
) -> usize {
let id = next_id(&NEXT_STREAM_ID);
let ready = js_promise_new();
let closed = js_promise_new();
let ready = internal_promise();
let closed = internal_promise();
js_promise_resolve(ready, f64::from_bits(TAG_UNDEFINED));
WRITABLE_STREAMS.lock().unwrap().insert(
id,
Expand Down Expand Up @@ -935,7 +946,7 @@ pub unsafe extern "C" fn js_readable_stream_get_reader_with_options(
true
} else {
let reader_id = next_id(&NEXT_STREAM_ID);
let closed_p = js_promise_new();
let closed_p = internal_promise();
if s.state == ReadableState::Closed {
js_promise_resolve(closed_p, f64::from_bits(TAG_UNDEFINED));
} else if s.state == ReadableState::Errored {
Expand Down
2 changes: 1 addition & 1 deletion crates/perry-stdlib/src/streams/byob.rs
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ pub(super) unsafe fn get_reader_for_stream(stream_id: usize, is_byob: bool) -> u
throw_type_error("ReadableStream is locked");
}
let reader_id = next_id(&NEXT_STREAM_ID);
let closed_p = js_promise_new();
let closed_p = internal_promise();
if s.state == ReadableState::Closed {
js_promise_resolve(closed_p, f64::from_bits(TAG_UNDEFINED));
} else if s.state == ReadableState::Errored {
Expand Down
4 changes: 2 additions & 2 deletions crates/perry-stdlib/src/streams/transform.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,8 @@ unsafe fn alloc_transform_stream_with_strategies(
// Allocate writable side; its write_cb is synthesized via the
// dispatcher table below to invoke transform(chunk, controller).
let writable_id = next_id(&NEXT_STREAM_ID);
let ready = js_promise_new();
let closed = js_promise_new();
let ready = internal_promise();
let closed = internal_promise();
js_promise_resolve(ready, f64::from_bits(TAG_UNDEFINED));
WRITABLE_STREAMS.lock().unwrap().insert(
writable_id,
Expand Down
2 changes: 1 addition & 1 deletion crates/perry-stdlib/src/streams/writable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ fn sync_writer_ready_promise(stream_id: usize, writer_id: usize, ready: *mut Pro
}

pub(super) unsafe fn install_writable_backpressure_ready(stream_id: usize, writer_id: usize) {
let ready = js_promise_new();
let ready = internal_promise();
if let Some(s) = WRITABLE_STREAMS.lock().unwrap().get_mut(&stream_id) {
s.ready_promise = ready;
}
Expand Down
Loading