Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 20 additions & 63 deletions crates/perry-ext-events/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,9 @@ use target_helpers::{
};
mod module_iterators;
use module_iterators::{
events_on_abort_listener, events_on_queue_listener, events_once_event_target_listener,
first_rest_arg_or_undefined, rest_array_or_empty,
events_on_abort_listener, events_on_queue_listener, events_once_abort_listener,
events_once_event_target_listener, events_once_stream_reject_listener,
events_once_stream_resolve_listener,
};

const MIN_HEAP_POINTER: u64 = 0x1000;
Expand Down Expand Up @@ -81,6 +82,15 @@ extern "C" {
);
fn js_promise_resolve(promise: *mut Promise, value: f64);
fn js_promise_reject(promise: *mut Promise, reason: f64);
// events.once allocates its Promise via perry-ffi's `JsPromise::new()`
// (→ `perry_ffi_promise_new` → a registered native-async token that pins
// the Promise and reports the event loop as "busy"). Because we settle
// synchronously through `js_promise_resolve`/`js_promise_reject` above —
// bypassing the deferred completion machinery that would normally retire
// the token — the token leaks and `js_native_async_has_active()` keeps the
// loop alive forever (the `events.once(em, name)` + `emit` hang). Drop the
// orphaned token right after each synchronous settle.
fn js_native_async_drop_promise_token(promise: *mut Promise);
fn js_promise_then(
promise: *mut Promise,
on_fulfilled: *mut RawClosureHeader,
Expand Down Expand Up @@ -1092,6 +1102,7 @@ unsafe fn drain_pending_once_promises(
// Synchronous resolve — see the comment on the extern at the
// top of this file for why we bypass `JsPromise::resolve`.
js_promise_resolve(pending.promise, f64::from_bits(bits));
js_native_async_drop_promise_token(pending.promise);
}
}

Expand All @@ -1114,6 +1125,7 @@ unsafe fn reject_pending_once_promises_for_error(
cleanup_pending_abort_listener(&pending);
if !pending.promise.is_null() {
js_promise_reject(pending.promise, error_value);
js_native_async_drop_promise_token(pending.promise);
rejected_any = true;
}
}
Expand Down Expand Up @@ -1562,69 +1574,11 @@ pub unsafe extern "C" fn js_event_emitter_raw_listeners(
// Module-level helpers — `events.once(em, name)` / `events.on(em, name)` /
// `events.getEventListeners(em, name)` / `events.listenerCount(em, name)` /
// `events.setMaxListeners(n, em)` / `events.getMaxListeners(em)`.
// The `events.once` listener trampolines (abort / stream-resolve /
// stream-reject) live in `module_iterators.rs` alongside
// `events_once_event_target_listener`.
// ============================================================================

extern "C" fn events_once_abort_listener(closure: *const RawClosureHeader) -> f64 {
unsafe {
let handle = js_closure_get_capture_ptr(closure, 0) as Handle;
let promise = js_closure_get_capture_ptr(closure, 1) as *mut Promise;
let pending = get_event_emitter_mut(handle)
.and_then(|emitter| remove_pending_once_promise(emitter, promise));
if let Some(pending) = pending {
cleanup_pending_abort_listener(&pending);
if !pending.promise.is_null() {
js_promise_reject(pending.promise, js_abort_error_value());
}
}
}
undefined_value()
}

extern "C" fn events_once_stream_resolve_listener(
closure: *const RawClosureHeader,
rest: f64,
) -> f64 {
unsafe {
let promise = js_closure_get_capture_ptr(closure, 0) as *mut Promise;
let handle = js_closure_get_capture_ptr(closure, 1) as Handle;
let error_listener = js_closure_get_capture_ptr(closure, 2);
let error_event_ptr = js_closure_get_capture_ptr(closure, 3);
if promise.is_null() {
return undefined_value();
}
if handle != 0 && error_listener != 0 && error_event_ptr != 0 {
let error_event =
f64::from_bits(nanbox_string_bits(error_event_ptr as *mut StringHeader));
let error_listener_value = nanbox_pointer_bits(error_listener);
let _ =
js_node_stream_method_remove_listener(handle, error_event, error_listener_value);
}
js_promise_resolve(promise, rest_array_or_empty(rest));
}
undefined_value()
}

extern "C" fn events_once_stream_reject_listener(
closure: *const RawClosureHeader,
rest: f64,
) -> f64 {
unsafe {
let promise = js_closure_get_capture_ptr(closure, 0) as *mut Promise;
let handle = js_closure_get_capture_ptr(closure, 1) as Handle;
let event_name_ptr = js_closure_get_capture_ptr(closure, 2);
let resolve_listener = js_closure_get_capture_ptr(closure, 3);
if handle != 0 && event_name_ptr != 0 && resolve_listener != 0 {
let event = f64::from_bits(nanbox_string_bits(event_name_ptr as *mut StringHeader));
let resolve_listener_value = nanbox_pointer_bits(resolve_listener);
let _ = js_node_stream_method_remove_listener(handle, event, resolve_listener_value);
}
if !promise.is_null() {
js_promise_reject(promise, first_rest_arg_or_undefined(rest));
}
}
undefined_value()
}

/// `events.once(emitter, eventName[, options])` — returns a Promise that resolves
/// to the args array from the next matching event.
///
Expand All @@ -1651,6 +1605,7 @@ pub unsafe extern "C" fn js_events_once(
target_value,
)),
);
js_native_async_drop_promise_token(raw);
return raw;
}
};
Expand All @@ -1661,11 +1616,13 @@ pub unsafe extern "C" fn js_events_once(
Ok(signal) => signal,
Err(error) => {
js_promise_reject(raw, error);
js_native_async_drop_promise_token(raw);
return raw;
}
};
if signal.is_some_and(signal_is_aborted) {
js_promise_reject(raw, js_abort_error_value());
js_native_async_drop_promise_token(raw);
return raw;
}
if let EventHelperTarget::EventEmitter(handle) = target {
Expand Down
65 changes: 65 additions & 0 deletions crates/perry-ext-events/src/module_iterators.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,71 @@ pub(super) extern "C" fn events_once_event_target_listener(
let mut args = js_array_alloc(0);
args = js_array_push_f64(args, arg0);
js_promise_resolve(promise, nanbox_pointer_bits(args as i64));
js_native_async_drop_promise_token(promise);
}
}
undefined_value()
}

pub(super) extern "C" fn events_once_abort_listener(closure: *const RawClosureHeader) -> f64 {
unsafe {
let handle = js_closure_get_capture_ptr(closure, 0) as Handle;
let promise = js_closure_get_capture_ptr(closure, 1) as *mut Promise;
let pending = get_event_emitter_mut(handle)
.and_then(|emitter| remove_pending_once_promise(emitter, promise));
if let Some(pending) = pending {
cleanup_pending_abort_listener(&pending);
if !pending.promise.is_null() {
js_promise_reject(pending.promise, js_abort_error_value());
js_native_async_drop_promise_token(pending.promise);
}
}
}
undefined_value()
}

pub(super) extern "C" fn events_once_stream_resolve_listener(
closure: *const RawClosureHeader,
rest: f64,
) -> f64 {
unsafe {
let promise = js_closure_get_capture_ptr(closure, 0) as *mut Promise;
let handle = js_closure_get_capture_ptr(closure, 1) as Handle;
let error_listener = js_closure_get_capture_ptr(closure, 2);
let error_event_ptr = js_closure_get_capture_ptr(closure, 3);
if promise.is_null() {
return undefined_value();
}
if handle != 0 && error_listener != 0 && error_event_ptr != 0 {
let error_event =
f64::from_bits(nanbox_string_bits(error_event_ptr as *mut StringHeader));
let error_listener_value = nanbox_pointer_bits(error_listener);
let _ =
js_node_stream_method_remove_listener(handle, error_event, error_listener_value);
}
js_promise_resolve(promise, rest_array_or_empty(rest));
js_native_async_drop_promise_token(promise);
}
undefined_value()
}

pub(super) extern "C" fn events_once_stream_reject_listener(
closure: *const RawClosureHeader,
rest: f64,
) -> f64 {
unsafe {
let promise = js_closure_get_capture_ptr(closure, 0) as *mut Promise;
let handle = js_closure_get_capture_ptr(closure, 1) as Handle;
let event_name_ptr = js_closure_get_capture_ptr(closure, 2);
let resolve_listener = js_closure_get_capture_ptr(closure, 3);
if handle != 0 && event_name_ptr != 0 && resolve_listener != 0 {
let event = f64::from_bits(nanbox_string_bits(event_name_ptr as *mut StringHeader));
let resolve_listener_value = nanbox_pointer_bits(resolve_listener);
let _ = js_node_stream_method_remove_listener(handle, event, resolve_listener_value);
}
if !promise.is_null() {
js_promise_reject(promise, first_rest_arg_or_undefined(rest));
js_native_async_drop_promise_token(promise);
}
}
undefined_value()
Expand Down
5 changes: 3 additions & 2 deletions crates/perry-runtime/src/promise/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,9 @@ pub use native_async::{
js_native_async_completion_new, js_native_async_completion_promise,
js_native_async_completion_reject_bits, js_native_async_completion_reject_promise_bits,
js_native_async_completion_reject_string, js_native_async_completion_resolve_bits,
js_native_async_completion_resolve_promise_bits, js_native_async_has_active,
js_native_async_process_pending, scan_native_async_completion_roots_mut, NativeAsyncCompletion,
js_native_async_completion_resolve_promise_bits, js_native_async_drop_promise_token,
js_native_async_has_active, js_native_async_process_pending,
scan_native_async_completion_roots_mut, NativeAsyncCompletion,
PERRY_NATIVE_ASYNC_ALREADY_COMPLETED, PERRY_NATIVE_ASYNC_CLEANUP_ON_CANCEL,
PERRY_NATIVE_ASYNC_CLEANUP_ON_REJECT, PERRY_NATIVE_ASYNC_CLEANUP_ON_SUCCESS,
PERRY_NATIVE_ASYNC_INVALID, PERRY_NATIVE_ASYNC_OK, PERRY_NATIVE_ASYNC_THREAD_MAIN,
Expand Down
32 changes: 32 additions & 0 deletions crates/perry-runtime/src/promise/native_async.rs
Original file line number Diff line number Diff line change
Expand Up @@ -482,6 +482,38 @@ pub extern "C" fn js_native_async_process_pending() -> i32 {
processed
}

/// Drop the native-async completion token associated with a Promise that was
/// settled *synchronously* (via `js_promise_resolve`/`js_promise_reject`)
/// outside the deferred completion machinery.
///
/// Ext crates such as `perry-ext-events` allocate their `events.once` Promise
/// through perry-ffi's `JsPromise::new()` → `perry_ffi_promise_new()`, which
/// registers a native-async token (and pins the Promise) so a worker can
/// resolve it later. `events.once`, however, settles synchronously from
/// `emit(...)` and deliberately bypasses the deferred resolve path (see the
/// extern comment in perry-ext-events). That bypass never runs
/// `js_native_async_process_pending`, so the token stays in the registry
/// forever and `js_native_async_has_active()` keeps reporting work — the
/// process hangs after the awaited event already fired (the
/// `events.once(emitter, name)` + `emit` hang). Calling this right after the
/// synchronous settle removes the orphaned token (mirroring the cleanup
/// `js_native_async_process_pending` performs) so the event loop can drain.
#[no_mangle]
pub extern "C" fn js_native_async_drop_promise_token(promise: *mut Promise) {
if promise.is_null() {
return;
}
let token_ptr = {
let registry = crate::gc::lock_gc_root_registry(registry());
registry.by_promise.get(&(promise as usize)).copied()
};
if let Some(token_ptr) = token_ptr {
let token = unsafe { &*(token_ptr as *const NativeAsyncCompletion) };
token.state.store(STATE_COMPLETED, Ordering::Release);
remove_token_from_registry(token_ptr, promise as usize);
}
}

/// Return 1 while there are live or queued native async completions.
#[no_mangle]
pub extern "C" fn js_native_async_has_active() -> i32 {
Expand Down
Loading