diff --git a/crates/perry-ext-http-server/src/http2_server.rs b/crates/perry-ext-http-server/src/http2_server.rs index ec45ba2ae5..88bf510a0b 100644 --- a/crates/perry-ext-http-server/src/http2_server.rs +++ b/crates/perry-ext-http-server/src/http2_server.rs @@ -688,22 +688,13 @@ pub unsafe extern "C" fn js_node_http2_server_listen(server_handle: i64, args_ar }); }); - // Bind `this` to the server for the `'listening'` listeners + the - // optional `cb` so `this.address().port` resolves inside the listen - // callback, matching Node (#2132). - let this_val = handle_to_pointer_f64(server_handle); - let listening_listeners = get_handle::(server_handle) - .and_then(|s| s.base.listeners.get("listening").cloned()) - .unwrap_or_default(); - with_implicit_this(this_val, || emit_no_arg_to_listeners(&listening_listeners)); - if callback != 0 { - let raw = callback as *const RawClosureHeader; - let closure = JsClosure::from_raw(raw); - if !closure.is_null() { - with_implicit_this(this_val, || { - let _ = closure.call0(); - }); - } + // #4903 — queue the `'listening'` emit + the optional `cb` for the + // main-thread pump instead of firing synchronously; Node emits + // `'listening'` on a later tick, after `const server = ...` has been + // assigned. The pump binds `this` to the server when it fires them + // (#2132). See `server::drain_deferred_listen_for`. + if let Some(s) = get_handle_mut::(server_handle) { + crate::server::queue_deferred_listening_emit(&mut s.base, callback); } // Closes #604 — `listen()` is now non-blocking; the unified @@ -844,6 +835,9 @@ pub(crate) fn try_recv_pending_h2_nonblocking(server_handle: i64) -> Option(server_handle) - .and_then(|s| s.base.listeners.get("listening").cloned()) - .unwrap_or_default(); - with_implicit_this(this_val, || emit_no_arg_to_listeners(&listening_listeners)); - if callback != 0 { - let raw = callback as *const RawClosureHeader; - let closure = JsClosure::from_raw(raw); - if !closure.is_null() { - with_implicit_this(this_val, || { - let _ = closure.call0(); - }); - } + // #4903 — queue the `'listening'` emit + the optional `cb` for the + // main-thread pump instead of firing synchronously; Node emits + // `'listening'` on a later tick, after `const server = ...` has been + // assigned. The pump binds `this` to the server when it fires them + // (#2132). See `server::drain_deferred_listen_for`. + if let Some(s) = get_handle_mut::(server_handle) { + crate::server::queue_deferred_listening_emit(&mut s.base, callback); } // Closes #604 — `listen()` is now non-blocking. Pending requests @@ -371,6 +362,9 @@ pub(crate) fn try_recv_pending_https_nonblocking(server_handle: i64) -> Option) { fn scan_base_server_roots(server: &mut HttpServer, visitor: &mut GcRootVisitor<'_>) { visitor.visit_i64_slot(&mut server.handler); scan_listener_roots(&mut server.listeners, visitor); + // #4903 — listen callbacks queued for the deferred `'listening'` + // emit; a GC between `listen()` and the pump tick must not sweep + // them. + for cb in server.deferred_listen_cbs.iter_mut() { + visitor.visit_i64_slot(cb); + } } iter_handles_of_mut::(|s| { diff --git a/crates/perry-ext-http-server/src/server.rs b/crates/perry-ext-http-server/src/server.rs index 93d3818840..5f4ae9affb 100644 --- a/crates/perry-ext-http-server/src/server.rs +++ b/crates/perry-ext-http-server/src/server.rs @@ -51,6 +51,22 @@ pub struct HttpServer { pub bound_host: String, /// True between `.listen()` and `.close()`. pub listening: bool, + /// #4903 — `'listening'` emit pending: set by `.listen()` after the + /// synchronous bind, consumed by the main-thread pump which fires the + /// `'listening'` listeners on the next tick. Node never emits + /// `'listening'` synchronously from inside `listen()`, so + /// `const server = createServer().listen(0, cb)` must see `server` + /// assigned (and late `.on('listening', ...)` registrations) before + /// any callback runs. + pub pending_listening_emit: bool, + /// #4903 — `listen(port, cb)` callbacks for the pending deferred emit. + /// Node registers the listen callback as a *once* `'listening'` + /// listener, so `listen()` also appends it to `listeners["listening"]` + /// (preserving Node's emit order vs. listeners added before/after + /// `listen()`); this list is what the pump removes from the live + /// listener list after the emit fires. Raw closure pointers; rooted + /// by the GC scanner in lib.rs. + pub deferred_listen_cbs: Vec, /// Sent by `.close()` to wake the accept loop. pub shutdown_tx: Option>, /// Channel main thread drains in the event loop. Hyper service @@ -102,6 +118,8 @@ impl HttpServer { bound_port: 0, bound_host: String::new(), listening: false, + pending_listening_emit: false, + deferred_listen_cbs: Vec::new(), shutdown_tx: None, request_rx: None, upgrade_rx: None, @@ -472,25 +490,16 @@ pub unsafe extern "C" fn js_node_http_server_listen(server_handle: i64, args_arr }); }); - // Fire `'listening'` listeners + the optional `cb` argument. Node - // invokes both with `this` bound to the server, so the canonical - // `server.listen(0, function() { this.address().port })` idiom works - // (#2132). Set the implicit-`this` cell to the server's JS value - // (POINTER_TAG-boxed handle, identical to what `createServer` - // returned) for the duration of each callback, then restore. - let this_val = handle_to_pointer_f64(server_handle); - let listening_listeners = get_handle::(server_handle) - .and_then(|s| s.listeners.get("listening").cloned()) - .unwrap_or_default(); - with_implicit_this(this_val, || emit_no_arg_to_listeners(&listening_listeners)); - if callback != 0 { - let raw = callback as *const RawClosureHeader; - let closure = JsClosure::from_raw(raw); - if !closure.is_null() { - with_implicit_this(this_val, || { - let _ = closure.call0(); - }); - } + // #4903 — queue the `'listening'` emit + the optional `cb` argument for + // the main-thread pump instead of firing them synchronously. Node emits + // `'listening'` on a later event-loop tick, after the current synchronous + // script segment finishes; firing inline ran the callback before + // `const server = http.createServer().listen(0, cb)` had assigned + // `server`, so `server.address()` inside the callback threw + // "Cannot read properties of undefined". The pump fires both with + // `this` bound to the server (#2132), via `drain_deferred_listen_events`. + if let Some(s) = get_handle_mut::(server_handle) { + queue_deferred_listening_emit(s, callback); } // Closes #604 — `listen()` is now non-blocking. The accept loop is @@ -1024,6 +1033,79 @@ pub(crate) fn finalize_or_park_request(pending: &HttpPendingRequest) { /// `(req, res) => res.end(...)` shape that the load-bearing #604 /// fixture uses works without this — the response oneshot fires /// synchronously from inside `js_node_http_res_end`. +/// #4903 — record a pending `'listening'` emit on a server (http / https / +/// http2 all share the `HttpServer` base). Node registers the +/// `listen(port, cb)` callback as a *once* `'listening'` listener inside +/// `listen()`, so the callback goes into the live listener list (correct +/// emit order vs. listeners added before/after `listen()`) and into +/// `deferred_listen_cbs`, which the pump uses to remove it again after +/// the emit fires. +pub(crate) fn queue_deferred_listening_emit(s: &mut HttpServer, callback: i64) { + s.pending_listening_emit = true; + if callback != 0 { + s.listeners + .entry("listening".to_string()) + .or_default() + .push(callback); + s.deferred_listen_cbs.push(callback); + } +} + +/// #4903 — fire a server's queued `'listening'` listeners + `listen(cb)` +/// callbacks with implicit `this` bound to the server. Runs from the +/// main-thread pump, never from inside `listen()` itself: Node emits +/// `'listening'` on a later event-loop tick, so the listen callback only +/// runs after the current synchronous script segment (including the +/// `const server = ...` assignment) has finished, and `'listening'` +/// listeners registered after `listen()` returned still fire. The +/// listener snapshot is taken here at drain time for that same reason, +/// and the queue is detached (`mem::take`) before any callback runs so +/// a re-entrant `listen()` from a callback can't double-fire. +pub(crate) fn drain_deferred_listen_for(server_handle: i64, base_of: F) -> i32 +where + T: Send + Sync + 'static, + F: FnOnce(&mut T) -> &mut HttpServer, +{ + let cbs: Vec = match get_handle_mut::(server_handle) { + Some(t) => { + let s = base_of(t); + if !std::mem::take(&mut s.pending_listening_emit) { + return 0; + } + let snapshot = s.listeners.get("listening").cloned().unwrap_or_default(); + // The `listen(port, cb)` callbacks are once-listeners: now that + // this emit has snapshotted them, drop them from the live list + // so a future emit / listener introspection doesn't see them. + let once: Vec = std::mem::take(&mut s.deferred_listen_cbs); + if let Some(ls) = s.listeners.get_mut("listening") { + for cb in &once { + if let Some(pos) = ls.iter().position(|x| x == cb) { + ls.remove(pos); + } + } + } + snapshot + } + None => return 0, + }; + let this_val = handle_to_pointer_f64(server_handle); + let mut fired = 0i32; + for cb in cbs { + if cb == 0 { + continue; + } + let raw = cb as *const RawClosureHeader; + let closure = unsafe { JsClosure::from_raw(raw) }; + if !closure.is_null() { + with_implicit_this(this_val, || { + let _ = unsafe { closure.call0() }; + }); + fired += 1; + } + } + fired +} + #[no_mangle] pub extern "C" fn js_node_http_server_process_pending() -> i32 { let mut count = 0i32; @@ -1038,6 +1120,10 @@ pub extern "C" fn js_node_http_server_process_pending() -> i32 { let mut http_handles: Vec = Vec::new(); perry_ffi::iter_handle_ids_of::(|id| http_handles.push(id)); for h in http_handles { + // #4903 — fire the deferred `'listening'` emit + listen callbacks + // before draining requests: the listen callback is usually what + // kicks off the client request in the first place. + count += drain_deferred_listen_for::(h, |s| s); // Drain upgrades first so they don't get starved by a busy // request stream. while let Some(up) = try_recv_upgrade(h) { @@ -1060,6 +1146,8 @@ pub extern "C" fn js_node_http_server_process_pending() -> i32 { https_handles.push(id) }); for h in https_handles { + count += + drain_deferred_listen_for::(h, |s| &mut s.base); while let Some(p) = crate::https_server::try_recv_pending_https_nonblocking(h) { crate::https_server::process_pending_https(p); count += 1; @@ -1071,6 +1159,9 @@ pub extern "C" fn js_node_http_server_process_pending() -> i32 { h2_handles.push(id) }); for h in h2_handles { + count += drain_deferred_listen_for::(h, |s| { + &mut s.base + }); count += crate::http2_server::process_pending_h2_events(); while let Some(p) = crate::http2_server::try_recv_pending_h2_nonblocking(h) { crate::http2_server::process_pending_h2(p); @@ -1087,6 +1178,11 @@ fn server_is_active(s: &HttpServer) -> bool { if s.listening { return true; } + // #4903 — a queued `'listening'` emit / listen callback must keep the + // loop alive until the pump fires it, even if `close()` already ran. + if s.pending_listening_emit || !s.deferred_listen_cbs.is_empty() { + return true; + } // Even if the user has called close(), the channels may still // hold queued items the pump needs to drain on a subsequent tick // before the program can exit cleanly. @@ -1139,6 +1235,10 @@ fn process_pending(pending: HttpPendingRequest) { let res_f64 = handle_to_pointer_f64(pending.response_handle); // Fire `'request'` listeners (Node's `server.on('request', ...)`). + // Node's emitter invokes them with `this` bound to the server, so the + // `function (req, res) { this.address().port }` handler idiom works + // (#4903). Bind for the synchronous call only — microtasks run outside. + let server_this = handle_to_pointer_f64(pending.server_handle); for cb in &pending.request_listeners { if *cb == 0 { continue; @@ -1147,7 +1247,9 @@ fn process_pending(pending: HttpPendingRequest) { let raw = *cb as *const RawClosureHeader; let closure = JsClosure::from_raw(raw); if !closure.is_null() { - let _ = closure.call2(req_f64, res_f64); + with_implicit_this(server_this, || { + let _ = closure.call2(req_f64, res_f64); + }); } js_promise_run_microtasks(); } @@ -1167,7 +1269,11 @@ fn process_pending(pending: HttpPendingRequest) { let raw = pending.handler as *const RawClosureHeader; let closure = JsClosure::from_raw(raw); if !closure.is_null() { - let _ = closure.call2(req_f64, res_f64); + // `createServer(handler)` registers `handler` as a + // `'request'` listener — same `this` = server binding. + with_implicit_this(server_this, || { + let _ = closure.call2(req_f64, res_f64); + }); } js_promise_run_microtasks(); } diff --git a/crates/perry/tests/issue_4903_listen_callback_deferred.rs b/crates/perry/tests/issue_4903_listen_callback_deferred.rs new file mode 100644 index 0000000000..c0269585fe --- /dev/null +++ b/crates/perry/tests/issue_4903_listen_callback_deferred.rs @@ -0,0 +1,130 @@ +//! Regression test for #4903: the `listen(port, cb)` callback (and the +//! `'listening'` emit) must fire on a later event-loop tick, after the +//! current synchronous script segment finishes — never synchronously from +//! inside `listen()`. Pre-fix, the canonical Node corpus shape +//! `const server = http.createServer().listen(0, cb)` ran `cb` before +//! `server` was assigned, so `server.address().port` threw +//! "Cannot read properties of undefined (reading 'address')". +//! +//! Also covers the sibling `this`-binding half of the ticket: Node invokes +//! `'listening'` listeners, the listen callback, and `'request'` listeners +//! (including the `createServer(handler)` handler) with `this` bound to the +//! server, so `this.address().port` works inside both. + +use std::path::PathBuf; +use std::process::Command; + +fn perry_bin() -> PathBuf { + PathBuf::from(env!("CARGO_BIN_EXE_perry")) +} + +fn compile_and_run(dir: &std::path::Path, source: &str) -> String { + let entry = dir.join("main.ts"); + let output = dir.join("main_bin"); + std::fs::write(&entry, source).expect("write entry"); + + let compile = Command::new(perry_bin()) + .current_dir(dir) + .arg("compile") + .arg(&entry) + .arg("-o") + .arg(&output) + .output() + .expect("run perry compile"); + assert!( + compile.status.success(), + "perry compile failed\nstdout:\n{}\nstderr:\n{}", + String::from_utf8_lossy(&compile.stdout), + String::from_utf8_lossy(&compile.stderr) + ); + + let run = Command::new(&output) + .current_dir(dir) + .output() + .expect("run compiled binary"); + assert!( + run.status.success(), + "compiled binary failed\nstatus: {:?}\nstdout:\n{}\nstderr:\n{}", + run.status, + String::from_utf8_lossy(&run.stdout), + String::from_utf8_lossy(&run.stderr) + ); + String::from_utf8_lossy(&run.stdout).into_owned() +} + +/// The chained corpus shape: the listen callback must see `server` already +/// assigned (deferred emit), `server.address()` must return a real +/// `{ address, family, port }`, and a full request round-trip must work with +/// `this` bound to the server inside the request handler. +#[test] +fn listen_callback_is_deferred_and_this_bound() { + let dir = tempfile::tempdir().expect("tempdir"); + let stdout = compile_and_run( + dir.path(), + r#" +const http = require('http'); +const server = http.createServer(function (req: any, res: any) { + // #4903 — `'request'` listeners run with `this` = server. + console.log('handler-this-port-matches', (this as any).address().port === server.address().port); + res.end('ok'); +}).listen(0, function () { + // Pre-fix this line printed `undefined` (callback ran before assignment). + console.log('cb-sees-server', typeof server); + const addr = server.address(); + console.log('addr', typeof addr, addr.family, typeof addr.port, addr.port > 0); + // `this` inside the listen callback is also the server. + console.log('cb-this-port-matches', (this as any).address().port === addr.port); + http.get({ port: addr.port, path: '/' }, (res: any) => { + let body = ''; + res.on('data', (c: any) => { body += c; }); + res.on('end', () => { + console.log('body', body); + server.close(); + }); + }); +}); +// Must print BEFORE the listen callback: the emit is deferred to a later tick. +console.log('sync-after-listen'); +"#, + ); + assert_eq!( + stdout, + "sync-after-listen\n\ + cb-sees-server object\n\ + addr object IPv4 number true\n\ + cb-this-port-matches true\n\ + handler-this-port-matches true\n\ + body ok\n", + "listen callback must fire post-tick with `this` = server, and \ + server.address() must expose the bound ephemeral port" + ); +} + +/// `'listening'` listeners registered AFTER `listen()` returned must still +/// fire (the emit happens on a later tick, as in Node), and listeners +/// registered before `listen()` fire ahead of the listen callback. +#[test] +fn late_listening_listener_fires() { + let dir = tempfile::tempdir().expect("tempdir"); + let stdout = compile_and_run( + dir.path(), + r#" +const http = require('http'); +const server = http.createServer(); +server.on('listening', () => console.log('pre-listener')); +server.listen(0, () => { + console.log('listen-cb'); +}); +server.on('listening', () => { + console.log('late-listener'); + server.close(); +}); +console.log('sync-tail'); +"#, + ); + assert_eq!( + stdout, "sync-tail\npre-listener\nlisten-cb\nlate-listener\n", + "'listening' must emit once on a later tick, in Node's listener order \ + (pre-registered, listen callback, late-registered)" + ); +}