Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 16 additions & 18 deletions crates/perry-ext-http-server/src/http2_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -688,22 +688,13 @@ pub unsafe extern "C" fn js_node_http2_server_listen(server_handle: i64, args_ar
});
});

// Bind `this` to the server for the `'listening'` listeners + the
// optional `cb` so `this.address().port` resolves inside the listen
// callback, matching Node (#2132).
let this_val = handle_to_pointer_f64(server_handle);
let listening_listeners = get_handle::<Http2SecureServer>(server_handle)
.and_then(|s| s.base.listeners.get("listening").cloned())
.unwrap_or_default();
with_implicit_this(this_val, || emit_no_arg_to_listeners(&listening_listeners));
if callback != 0 {
let raw = callback as *const RawClosureHeader;
let closure = JsClosure::from_raw(raw);
if !closure.is_null() {
with_implicit_this(this_val, || {
let _ = closure.call0();
});
}
// #4903 — queue the `'listening'` emit + the optional `cb` for the
// main-thread pump instead of firing synchronously; Node emits
// `'listening'` on a later tick, after `const server = ...` has been
// assigned. The pump binds `this` to the server when it fires them
// (#2132). See `server::drain_deferred_listen_for`.
if let Some(s) = get_handle_mut::<Http2SecureServer>(server_handle) {
crate::server::queue_deferred_listening_emit(&mut s.base, callback);
}

// Closes #604 — `listen()` is now non-blocking; the unified
Expand Down Expand Up @@ -844,6 +835,9 @@ pub(crate) fn try_recv_pending_h2_nonblocking(server_handle: i64) -> Option<Http
pub(crate) fn process_pending_h2(pending: HttpPendingRequest) {
let req_f64 = handle_to_pointer_f64(pending.request_handle);
let res_f64 = handle_to_pointer_f64(pending.response_handle);
// #4903 — Node invokes `'request'` listeners (and the `createServer`
// handler, which is one) with `this` bound to the server.
let server_this = handle_to_pointer_f64(pending.server_handle);
for cb in &pending.request_listeners {
if *cb == 0 {
continue;
Expand All @@ -852,7 +846,9 @@ pub(crate) fn process_pending_h2(pending: HttpPendingRequest) {
let raw = *cb as *const RawClosureHeader;
let closure = JsClosure::from_raw(raw);
if !closure.is_null() {
let _ = closure.call2(req_f64, res_f64);
with_implicit_this(server_this, || {
let _ = closure.call2(req_f64, res_f64);
});
}
js_promise_run_microtasks();
}
Expand All @@ -862,7 +858,9 @@ pub(crate) fn process_pending_h2(pending: HttpPendingRequest) {
let raw = pending.handler as *const RawClosureHeader;
let closure = JsClosure::from_raw(raw);
if !closure.is_null() {
let _ = closure.call2(req_f64, res_f64);
with_implicit_this(server_this, || {
let _ = closure.call2(req_f64, res_f64);
});
}
js_promise_run_microtasks();
}
Expand Down
34 changes: 16 additions & 18 deletions crates/perry-ext-http-server/src/https_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -247,22 +247,13 @@ pub unsafe extern "C" fn js_node_https_server_listen(server_handle: i64, args_ar
});
});

// Bind `this` to the server for the `'listening'` listeners + the
// optional `cb`, mirroring Node so `this.address().port` resolves
// inside the listen callback (#2132).
let this_val = handle_to_pointer_f64(server_handle);
let listening_listeners = get_handle::<HttpsServer>(server_handle)
.and_then(|s| s.base.listeners.get("listening").cloned())
.unwrap_or_default();
with_implicit_this(this_val, || emit_no_arg_to_listeners(&listening_listeners));
if callback != 0 {
let raw = callback as *const RawClosureHeader;
let closure = JsClosure::from_raw(raw);
if !closure.is_null() {
with_implicit_this(this_val, || {
let _ = closure.call0();
});
}
// #4903 — queue the `'listening'` emit + the optional `cb` for the
// main-thread pump instead of firing synchronously; Node emits
// `'listening'` on a later tick, after `const server = ...` has been
// assigned. The pump binds `this` to the server when it fires them
// (#2132). See `server::drain_deferred_listen_for`.
if let Some(s) = get_handle_mut::<HttpsServer>(server_handle) {
crate::server::queue_deferred_listening_emit(&mut s.base, callback);
}

// Closes #604 — `listen()` is now non-blocking. Pending requests
Expand Down Expand Up @@ -371,6 +362,9 @@ pub(crate) fn try_recv_pending_https_nonblocking(server_handle: i64) -> Option<H
pub(crate) fn process_pending_https(pending: HttpPendingRequest) {
let req_f64 = handle_to_pointer_f64(pending.request_handle);
let res_f64 = handle_to_pointer_f64(pending.response_handle);
// #4903 — Node invokes `'request'` listeners (and the `createServer`
// handler, which is one) with `this` bound to the server.
let server_this = handle_to_pointer_f64(pending.server_handle);
for cb in &pending.request_listeners {
if *cb == 0 {
continue;
Expand All @@ -379,7 +373,9 @@ pub(crate) fn process_pending_https(pending: HttpPendingRequest) {
let raw = *cb as *const RawClosureHeader;
let closure = JsClosure::from_raw(raw);
if !closure.is_null() {
let _ = closure.call2(req_f64, res_f64);
with_implicit_this(server_this, || {
let _ = closure.call2(req_f64, res_f64);
});
}
js_promise_run_microtasks();
}
Expand All @@ -389,7 +385,9 @@ pub(crate) fn process_pending_https(pending: HttpPendingRequest) {
let raw = pending.handler as *const RawClosureHeader;
let closure = JsClosure::from_raw(raw);
if !closure.is_null() {
let _ = closure.call2(req_f64, res_f64);
with_implicit_this(server_this, || {
let _ = closure.call2(req_f64, res_f64);
});
}
js_promise_run_microtasks();
}
Expand Down
6 changes: 6 additions & 0 deletions crates/perry-ext-http-server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,12 @@ fn scan_http_server_roots(visitor: &mut GcRootVisitor<'_>) {
fn scan_base_server_roots(server: &mut HttpServer, visitor: &mut GcRootVisitor<'_>) {
visitor.visit_i64_slot(&mut server.handler);
scan_listener_roots(&mut server.listeners, visitor);
// #4903 — listen callbacks queued for the deferred `'listening'`
// emit; a GC between `listen()` and the pump tick must not sweep
// them.
for cb in server.deferred_listen_cbs.iter_mut() {
visitor.visit_i64_slot(cb);
}
}

iter_handles_of_mut::<HttpServer, _>(|s| {
Expand Down
148 changes: 127 additions & 21 deletions crates/perry-ext-http-server/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,22 @@ pub struct HttpServer {
pub bound_host: String,
/// True between `.listen()` and `.close()`.
pub listening: bool,
/// #4903 — `'listening'` emit pending: set by `.listen()` after the
/// synchronous bind, consumed by the main-thread pump which fires the
/// `'listening'` listeners on the next tick. Node never emits
/// `'listening'` synchronously from inside `listen()`, so
/// `const server = createServer().listen(0, cb)` must see `server`
/// assigned (and late `.on('listening', ...)` registrations) before
/// any callback runs.
pub pending_listening_emit: bool,
/// #4903 — `listen(port, cb)` callbacks for the pending deferred emit.
/// Node registers the listen callback as a *once* `'listening'`
/// listener, so `listen()` also appends it to `listeners["listening"]`
/// (preserving Node's emit order vs. listeners added before/after
/// `listen()`); this list is what the pump removes from the live
/// listener list after the emit fires. Raw closure pointers; rooted
/// by the GC scanner in lib.rs.
pub deferred_listen_cbs: Vec<i64>,
/// Sent by `.close()` to wake the accept loop.
pub shutdown_tx: Option<oneshot::Sender<()>>,
/// Channel main thread drains in the event loop. Hyper service
Expand Down Expand Up @@ -102,6 +118,8 @@ impl HttpServer {
bound_port: 0,
bound_host: String::new(),
listening: false,
pending_listening_emit: false,
deferred_listen_cbs: Vec::new(),
shutdown_tx: None,
request_rx: None,
upgrade_rx: None,
Expand Down Expand Up @@ -472,25 +490,16 @@ pub unsafe extern "C" fn js_node_http_server_listen(server_handle: i64, args_arr
});
});

// Fire `'listening'` listeners + the optional `cb` argument. Node
// invokes both with `this` bound to the server, so the canonical
// `server.listen(0, function() { this.address().port })` idiom works
// (#2132). Set the implicit-`this` cell to the server's JS value
// (POINTER_TAG-boxed handle, identical to what `createServer`
// returned) for the duration of each callback, then restore.
let this_val = handle_to_pointer_f64(server_handle);
let listening_listeners = get_handle::<HttpServer>(server_handle)
.and_then(|s| s.listeners.get("listening").cloned())
.unwrap_or_default();
with_implicit_this(this_val, || emit_no_arg_to_listeners(&listening_listeners));
if callback != 0 {
let raw = callback as *const RawClosureHeader;
let closure = JsClosure::from_raw(raw);
if !closure.is_null() {
with_implicit_this(this_val, || {
let _ = closure.call0();
});
}
// #4903 — queue the `'listening'` emit + the optional `cb` argument for
// the main-thread pump instead of firing them synchronously. Node emits
// `'listening'` on a later event-loop tick, after the current synchronous
// script segment finishes; firing inline ran the callback before
// `const server = http.createServer().listen(0, cb)` had assigned
// `server`, so `server.address()` inside the callback threw
// "Cannot read properties of undefined". The pump fires both with
// `this` bound to the server (#2132), via `drain_deferred_listen_events`.
if let Some(s) = get_handle_mut::<HttpServer>(server_handle) {
queue_deferred_listening_emit(s, callback);
}

// Closes #604 — `listen()` is now non-blocking. The accept loop is
Expand Down Expand Up @@ -1024,6 +1033,79 @@ pub(crate) fn finalize_or_park_request(pending: &HttpPendingRequest) {
/// `(req, res) => res.end(...)` shape that the load-bearing #604
/// fixture uses works without this — the response oneshot fires
/// synchronously from inside `js_node_http_res_end`.
/// #4903 — record a pending `'listening'` emit on a server (http / https /
/// http2 all share the `HttpServer` base). Node registers the
/// `listen(port, cb)` callback as a *once* `'listening'` listener inside
/// `listen()`, so the callback goes into the live listener list (correct
/// emit order vs. listeners added before/after `listen()`) and into
/// `deferred_listen_cbs`, which the pump uses to remove it again after
/// the emit fires.
pub(crate) fn queue_deferred_listening_emit(s: &mut HttpServer, callback: i64) {
s.pending_listening_emit = true;
if callback != 0 {
s.listeners
.entry("listening".to_string())
.or_default()
.push(callback);
s.deferred_listen_cbs.push(callback);
}
}

/// #4903 — fire a server's queued `'listening'` listeners + `listen(cb)`
/// callbacks with implicit `this` bound to the server. Runs from the
/// main-thread pump, never from inside `listen()` itself: Node emits
/// `'listening'` on a later event-loop tick, so the listen callback only
/// runs after the current synchronous script segment (including the
/// `const server = ...` assignment) has finished, and `'listening'`
/// listeners registered after `listen()` returned still fire. The
/// listener snapshot is taken here at drain time for that same reason,
/// and the queue is detached (`mem::take`) before any callback runs so
/// a re-entrant `listen()` from a callback can't double-fire.
pub(crate) fn drain_deferred_listen_for<T, F>(server_handle: i64, base_of: F) -> i32
where
T: Send + Sync + 'static,
F: FnOnce(&mut T) -> &mut HttpServer,
{
let cbs: Vec<i64> = match get_handle_mut::<T>(server_handle) {
Some(t) => {
let s = base_of(t);
if !std::mem::take(&mut s.pending_listening_emit) {
return 0;
}
let snapshot = s.listeners.get("listening").cloned().unwrap_or_default();
// The `listen(port, cb)` callbacks are once-listeners: now that
// this emit has snapshotted them, drop them from the live list
// so a future emit / listener introspection doesn't see them.
let once: Vec<i64> = std::mem::take(&mut s.deferred_listen_cbs);
if let Some(ls) = s.listeners.get_mut("listening") {
for cb in &once {
if let Some(pos) = ls.iter().position(|x| x == cb) {
ls.remove(pos);
}
}
}
snapshot
}
None => return 0,
};
let this_val = handle_to_pointer_f64(server_handle);
let mut fired = 0i32;
for cb in cbs {
if cb == 0 {
continue;
}
let raw = cb as *const RawClosureHeader;
let closure = unsafe { JsClosure::from_raw(raw) };
if !closure.is_null() {
with_implicit_this(this_val, || {
let _ = unsafe { closure.call0() };
});
fired += 1;
}
}
fired
}

#[no_mangle]
pub extern "C" fn js_node_http_server_process_pending() -> i32 {
let mut count = 0i32;
Expand All @@ -1038,6 +1120,10 @@ pub extern "C" fn js_node_http_server_process_pending() -> i32 {
let mut http_handles: Vec<i64> = Vec::new();
perry_ffi::iter_handle_ids_of::<HttpServer, _>(|id| http_handles.push(id));
for h in http_handles {
// #4903 — fire the deferred `'listening'` emit + listen callbacks
// before draining requests: the listen callback is usually what
// kicks off the client request in the first place.
count += drain_deferred_listen_for::<HttpServer, _>(h, |s| s);
// Drain upgrades first so they don't get starved by a busy
// request stream.
while let Some(up) = try_recv_upgrade(h) {
Expand All @@ -1060,6 +1146,8 @@ pub extern "C" fn js_node_http_server_process_pending() -> i32 {
https_handles.push(id)
});
for h in https_handles {
count +=
drain_deferred_listen_for::<crate::https_server::HttpsServer, _>(h, |s| &mut s.base);
while let Some(p) = crate::https_server::try_recv_pending_https_nonblocking(h) {
crate::https_server::process_pending_https(p);
count += 1;
Expand All @@ -1071,6 +1159,9 @@ pub extern "C" fn js_node_http_server_process_pending() -> i32 {
h2_handles.push(id)
});
for h in h2_handles {
count += drain_deferred_listen_for::<crate::http2_server::Http2SecureServer, _>(h, |s| {
&mut s.base
});
count += crate::http2_server::process_pending_h2_events();
while let Some(p) = crate::http2_server::try_recv_pending_h2_nonblocking(h) {
crate::http2_server::process_pending_h2(p);
Expand All @@ -1087,6 +1178,11 @@ fn server_is_active(s: &HttpServer) -> bool {
if s.listening {
return true;
}
// #4903 — a queued `'listening'` emit / listen callback must keep the
// loop alive until the pump fires it, even if `close()` already ran.
if s.pending_listening_emit || !s.deferred_listen_cbs.is_empty() {
return true;
}
// Even if the user has called close(), the channels may still
// hold queued items the pump needs to drain on a subsequent tick
// before the program can exit cleanly.
Expand Down Expand Up @@ -1139,6 +1235,10 @@ fn process_pending(pending: HttpPendingRequest) {
let res_f64 = handle_to_pointer_f64(pending.response_handle);

// Fire `'request'` listeners (Node's `server.on('request', ...)`).
// Node's emitter invokes them with `this` bound to the server, so the
// `function (req, res) { this.address().port }` handler idiom works
// (#4903). Bind for the synchronous call only — microtasks run outside.
let server_this = handle_to_pointer_f64(pending.server_handle);
for cb in &pending.request_listeners {
if *cb == 0 {
continue;
Expand All @@ -1147,7 +1247,9 @@ fn process_pending(pending: HttpPendingRequest) {
let raw = *cb as *const RawClosureHeader;
let closure = JsClosure::from_raw(raw);
if !closure.is_null() {
let _ = closure.call2(req_f64, res_f64);
with_implicit_this(server_this, || {
let _ = closure.call2(req_f64, res_f64);
});
}
js_promise_run_microtasks();
}
Expand All @@ -1167,7 +1269,11 @@ fn process_pending(pending: HttpPendingRequest) {
let raw = pending.handler as *const RawClosureHeader;
let closure = JsClosure::from_raw(raw);
if !closure.is_null() {
let _ = closure.call2(req_f64, res_f64);
// `createServer(handler)` registers `handler` as a
// `'request'` listener — same `this` = server binding.
with_implicit_this(server_this, || {
let _ = closure.call2(req_f64, res_f64);
});
}
js_promise_run_microtasks();
}
Expand Down
Loading
Loading