diff --git a/crates/perry-api-manifest/src/entries.rs b/crates/perry-api-manifest/src/entries.rs index 4137b23c3c..343d5fbfa4 100644 --- a/crates/perry-api-manifest/src/entries.rs +++ b/crates/perry-api-manifest/src/entries.rs @@ -5344,6 +5344,8 @@ pub static API_MANIFEST: &[ApiEntry] = &[ method("http", "__get_method", true, Some("IncomingMessage")), method("http", "__get_url", true, Some("IncomingMessage")), method("http", "__get_httpVersion", true, Some("IncomingMessage")), + method("http", "__get_httpVersionMajor", true, Some("IncomingMessage")), + method("http", "__get_httpVersionMinor", true, Some("IncomingMessage")), method("http", "__get_complete", true, Some("IncomingMessage")), method("http", "__get_aborted", true, Some("IncomingMessage")), method("http", "__get_destroyed", true, Some("IncomingMessage")), diff --git a/crates/perry-codegen/src/ext_registry.rs b/crates/perry-codegen/src/ext_registry.rs index d06aab3d0d..c86e6a98dc 100644 --- a/crates/perry-codegen/src/ext_registry.rs +++ b/crates/perry-codegen/src/ext_registry.rs @@ -224,6 +224,7 @@ const FFI_REGISTRY: &[(&str, OwnerKind)] = &[ ("js_http_client_request_abort", OwnerKind::WellKnown("http")), ("js_http_client_request_destroy", OwnerKind::WellKnown("http")), ("js_http_client_request_noop_undefined", OwnerKind::WellKnown("http")), + ("js_http_client_request_flush_headers", OwnerKind::WellKnown("http")), ("js_http_client_request_method", OwnerKind::WellKnown("http")), ("js_http_client_request_protocol", OwnerKind::WellKnown("http")), ("js_http_client_request_host", OwnerKind::WellKnown("http")), @@ -294,6 +295,8 @@ const FFI_REGISTRY: &[(&str, OwnerKind)] = &[ ("js_node_http_im_method", OwnerKind::WellKnown("http")), ("js_node_http_im_url", OwnerKind::WellKnown("http")), ("js_node_http_im_http_version", OwnerKind::WellKnown("http")), + ("js_node_http_im_http_version_major", OwnerKind::WellKnown("http")), + ("js_node_http_im_http_version_minor", OwnerKind::WellKnown("http")), ("js_node_http_im_complete", OwnerKind::WellKnown("http")), ("js_node_http_im_aborted", OwnerKind::WellKnown("http")), ("js_node_http_im_destroyed", OwnerKind::WellKnown("http")), diff --git a/crates/perry-codegen/src/lower_call/native_table/http_client.rs b/crates/perry-codegen/src/lower_call/native_table/http_client.rs index fd8841e9ad..43dba9e93d 100644 --- a/crates/perry-codegen/src/lower_call/native_table/http_client.rs +++ b/crates/perry-codegen/src/lower_call/native_table/http_client.rs @@ -239,7 +239,7 @@ pub(super) const HTTP_CLIENT_ROWS: &[NativeModSig] = &[ ), cr( "flushHeaders", - "js_http_client_request_noop_undefined", + "js_http_client_request_flush_headers", &[NA_F64, NA_F64], NR_F64, ), diff --git a/crates/perry-codegen/src/lower_call/native_table/http_server.rs b/crates/perry-codegen/src/lower_call/native_table/http_server.rs index 32be182c13..0851b7e2b0 100644 --- a/crates/perry-codegen/src/lower_call/native_table/http_server.rs +++ b/crates/perry-codegen/src/lower_call/native_table/http_server.rs @@ -721,6 +721,24 @@ pub(super) const HTTP_SERVER_ROWS: &[NativeModSig] = &[ args: &[], ret: NR_STR, }, + NativeModSig { + module: "http", + has_receiver: true, + method: "__get_httpVersionMajor", + class_filter: Some("IncomingMessage"), + runtime: "js_node_http_im_http_version_major", + args: &[], + ret: NR_F64, + }, + NativeModSig { + module: "http", + has_receiver: true, + method: "__get_httpVersionMinor", + class_filter: Some("IncomingMessage"), + runtime: "js_node_http_im_http_version_minor", + args: &[], + ret: NR_F64, + }, NativeModSig { module: "http", has_receiver: true, diff --git a/crates/perry-codegen/src/runtime_decls/stdlib_ffi_part2.rs b/crates/perry-codegen/src/runtime_decls/stdlib_ffi_part2.rs index d24c1e6d10..cee98d44d7 100644 --- a/crates/perry-codegen/src/runtime_decls/stdlib_ffi_part2.rs +++ b/crates/perry-codegen/src/runtime_decls/stdlib_ffi_part2.rs @@ -12,4 +12,13 @@ pub(crate) fn declare_stdlib_ffi_part2(module: &mut LlModule) { module.declare_function("js_node_http_server_unref", I64, &[I64]); module.declare_function("js_node_https_server_ref", I64, &[I64]); module.declare_function("js_node_https_server_unref", I64, &[I64]); + // Streaming-bodies PR — client flushHeaders early dispatch + numeric + // httpVersion halves. + module.declare_function( + "js_http_client_request_flush_headers", + DOUBLE, + &[I64, DOUBLE, DOUBLE], + ); + module.declare_function("js_node_http_im_http_version_major", DOUBLE, &[I64]); + module.declare_function("js_node_http_im_http_version_minor", DOUBLE, &[I64]); } diff --git a/crates/perry-ext-http-server/src/handle_dispatch.rs b/crates/perry-ext-http-server/src/handle_dispatch.rs index ad11571963..598db848ef 100644 --- a/crates/perry-ext-http-server/src/handle_dispatch.rs +++ b/crates/perry-ext-http-server/src/handle_dispatch.rs @@ -580,6 +580,12 @@ pub unsafe extern "C" fn js_ext_http_incoming_message_dispatch_method( "httpVersion" | "__get_httpVersion" => { string_ptr_value(js_node_http_im_http_version(handle)) } + "httpVersionMajor" | "__get_httpVersionMajor" => { + crate::request::incoming_http_version_part(handle, false) + } + "httpVersionMinor" | "__get_httpVersionMinor" => { + crate::request::incoming_http_version_part(handle, true) + } "__get_complete" => bool_value(js_node_http_im_complete(handle) != 0), "__get_aborted" => bool_value(js_node_http_im_aborted(handle) != 0), "__get_destroyed" => bool_value(js_node_http_im_destroyed(handle) != 0), @@ -850,6 +856,8 @@ pub unsafe extern "C" fn js_ext_http_incoming_message_dispatch_property( "method" => string_ptr_value(js_node_http_im_method(handle)), "url" => string_ptr_value(js_node_http_im_url(handle)), "httpVersion" => string_ptr_value(js_node_http_im_http_version(handle)), + "httpVersionMajor" => crate::request::incoming_http_version_part(handle, false), + "httpVersionMinor" => crate::request::incoming_http_version_part(handle, true), "headers" => json_string_value(js_node_http_im_headers_json(handle)), "rawHeaders" => json_string_value(js_node_http_im_raw_headers_json(handle)), "headersDistinct" => json_string_value(js_node_http_im_headers_distinct_json(handle)), diff --git a/crates/perry-ext-http-server/src/http2_server.rs b/crates/perry-ext-http-server/src/http2_server.rs index ca05eead9a..c661f079de 100644 --- a/crates/perry-ext-http-server/src/http2_server.rs +++ b/crates/perry-ext-http-server/src/http2_server.rs @@ -967,7 +967,7 @@ fn synthesize_default_h2_stream_response(stream_handle: i64) { status_message: None, headers, trailers: Vec::new(), - body: Vec::new(), + body: crate::response::ShapeBody::Full(Vec::new()), }; if let Some(tx) = stream.response_tx.take() { let _ = tx.send(shape); @@ -1987,7 +1987,7 @@ fn end_server_h2_stream(handle: i64, body: Vec) { status_message: None, headers, trailers: Vec::new(), - body, + body: crate::response::ShapeBody::Full(body), }; if let Some(tx) = stream.response_tx.take() { let _ = tx.send(shape); diff --git a/crates/perry-ext-http-server/src/request.rs b/crates/perry-ext-http-server/src/request.rs index 34e4f26363..e1872af61e 100644 --- a/crates/perry-ext-http-server/src/request.rs +++ b/crates/perry-ext-http-server/src/request.rs @@ -207,6 +207,40 @@ pub extern "C" fn js_node_http_im_http_version(handle: i64) -> *mut StringHeader alloc_string(&s).as_raw() } +/// `req.httpVersionMajor` — numeric major half of `httpVersion`. +#[no_mangle] +pub extern "C" fn js_node_http_im_http_version_major(handle: i64) -> f64 { + incoming_http_version_part(handle, false) +} + +/// `req.httpVersionMinor` — numeric minor half of `httpVersion`. +#[no_mangle] +pub extern "C" fn js_node_http_im_http_version_minor(handle: i64) -> f64 { + incoming_http_version_part(handle, true) +} + +/// `req.httpVersionMajor` / `req.httpVersionMinor` — numeric halves of +/// `httpVersion` ("1.0" → 1 / 0). +pub(crate) fn incoming_http_version_part(handle: i64, minor: bool) -> f64 { + let version = get_handle::(handle) + .map(|im| im.http_version.clone()) + .unwrap_or_else(|| "1.1".to_string()); + let mut parts = version.split('.'); + let major = parts + .next() + .and_then(|p| p.parse::().ok()) + .unwrap_or(1.0); + let minor_v = parts + .next() + .and_then(|p| p.parse::().ok()) + .unwrap_or(1.0); + if minor { + minor_v + } else { + major + } +} + /// `req.headers` — JSON-stringify the lowercase-keyed header map. /// Returned as a NaN-boxed STRING — TS-side parses with `JSON.parse` /// at the binding wrapper. (Returning a runtime ObjectHeader directly diff --git a/crates/perry-ext-http-server/src/response.rs b/crates/perry-ext-http-server/src/response.rs index 45df296df5..499a77ce97 100644 --- a/crates/perry-ext-http-server/src/response.rs +++ b/crates/perry-ext-http-server/src/response.rs @@ -92,6 +92,66 @@ struct TrailerBody { trailers: Option, } +/// One frame of a streaming response body: a data chunk from +/// `res.write(...)`, or the trailer block from `res.addTrailers` delivered +/// after the final chunk. +pub enum StreamFrame { + Data(Bytes), + Trailers(HeaderMap), +} + +/// Streaming response body — frames flow from the JS thread +/// (`res.write`/`res.end`) through an unbounded channel into hyper. The +/// channel closing (sender dropped at `.end()`) ends the body. Size hint +/// stays unknown so hyper uses chunked transfer-encoding, matching Node's +/// wire behavior for a response whose headers flush before the body is +/// complete. `in_flight` tracks bytes queued but not yet handed to hyper — +/// the JS side reads it for the `res.write()` backpressure return and the +/// `'drain'` edge. +pub struct ChannelBody { + rx: tokio::sync::mpsc::UnboundedReceiver, + in_flight: std::sync::Arc, +} + +impl Body for ChannelBody { + type Data = Bytes; + type Error = Infallible; + + fn poll_frame( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll, Self::Error>>> { + match self.rx.poll_recv(cx) { + Poll::Ready(Some(StreamFrame::Data(b))) => { + self.in_flight + .fetch_sub(b.len(), std::sync::atomic::Ordering::AcqRel); + Poll::Ready(Some(Ok(Frame::data(b)))) + } + Poll::Ready(Some(StreamFrame::Trailers(t))) => { + Poll::Ready(Some(Ok(Frame::trailers(t)))) + } + Poll::Ready(None) => Poll::Ready(None), + Poll::Pending => Poll::Pending, + } + } + + fn size_hint(&self) -> SizeHint { + SizeHint::new() + } +} + +/// The body half of a [`HyperResponseShape`]: fully buffered (the classic +/// single-shot `res.end(body)` path, which keeps Content-Length semantics) +/// or streaming (headers flushed early by `res.flushHeaders()` / +/// `res.write(...)`, body frames following over a channel). +pub enum ShapeBody { + Full(Vec), + Stream { + rx: tokio::sync::mpsc::UnboundedReceiver, + in_flight: std::sync::Arc, + }, +} + impl Body for TrailerBody { type Data = Bytes; type Error = Infallible; @@ -149,8 +209,21 @@ pub struct ServerResponse { /// Body chunks accumulated by `.write(chunk)` calls. Assembled /// + flushed when `.end()` is called. pub buffered_body: Vec, - /// One-shot back to hyper's service fn — taken on `.end()`. + /// One-shot back to hyper's service fn — taken on `.end()`, or earlier + /// by `begin_streaming` when the headers flush before the body is done. pub response_tx: Option>, + /// Live body channel once the response head has been flushed early + /// (`res.flushHeaders()` / first `res.write(...)`). `Some` means + /// streaming mode: subsequent chunks go straight to the wire and + /// `.end()` closes the channel instead of sending a buffered shape. + pub stream_tx: Option>, + /// Bytes written to the stream channel but not yet handed to hyper. + /// Backs the `res.write()` backpressure return (`false` past the HWM) + /// and the `'drain'` edge the pump emits when it sinks below it again. + pub stream_in_flight: Option>, + /// True after a streaming `res.write()` returned `false`; the pump + /// fires `'drain'` (once) when the in-flight count drops below the HWM. + pub needs_drain: bool, /// Event-name → list of registered listener closure pointers. pub listeners: HashMap>, /// #4904: true for `new http.ServerResponse(req)` instances (and any @@ -175,7 +248,7 @@ pub struct HyperResponseShape { pub status_message: Option, pub headers: Vec<(String, String)>, pub trailers: Vec<(String, String)>, - pub body: Vec, + pub body: ShapeBody, } impl HyperResponseShape { @@ -184,12 +257,30 @@ impl HyperResponseShape { pub fn into_hyper(self) -> Response { let mut builder = Response::builder().status(StatusCode::from_u16(self.status).unwrap_or(StatusCode::OK)); + // `res.statusMessage = 'Custom Message'` must reach the HTTP/1 + // status line (test-http-status-message reads it off the raw + // socket). hyper emits it via the ReasonPhrase extension. + if let Some(msg) = self.status_message.as_deref() { + if !msg.is_empty() { + if let Ok(reason) = hyper::ext::ReasonPhrase::try_from(msg.to_string()) { + if let Some(ext) = builder.extensions_mut() { + ext.insert(reason); + } + } + } + } for (k, v) in self.headers { builder = builder.header(k, v); } + let full = match self.body { + ShapeBody::Stream { rx, in_flight } => { + return builder.body(ChannelBody { rx, in_flight }.boxed()).unwrap(); + } + ShapeBody::Full(bytes) => bytes, + }; let trailers = self.trailers; let body = if trailers.is_empty() { - Full::new(Bytes::from(self.body)).boxed() + Full::new(Bytes::from(full)).boxed() } else { let mut map = HeaderMap::new(); for (name, value) in trailers { @@ -201,7 +292,7 @@ impl HyperResponseShape { } } TrailerBody { - body: Some(Bytes::from(self.body)), + body: Some(Bytes::from(full)), trailers: Some(map), } .boxed() @@ -284,6 +375,9 @@ impl ServerResponse { outgoing_message_only: false, buffered_body: Vec::new(), response_tx: Some(response_tx), + stream_tx: None, + stream_in_flight: None, + needs_drain: false, listeners: HashMap::new(), standalone: false, standalone_socket: f64::from_bits(TAG_UNDEFINED), @@ -837,15 +931,46 @@ pub unsafe extern "C" fn js_node_http_res_write_head( } } -/// `res.write(chunk)` — append to the buffered body. Returns 1 -/// (always-flushed for the buffered MVP — Node's contract is "false -/// = call drain", which we sidestep by buffering). +/// Send `bytes` over the live stream channel, charging the in-flight +/// counter. Returns `Some(below_hwm)` when the response is streaming +/// (`begin_streaming` succeeded now or earlier), `None` when it isn't — +/// the caller falls back to the legacy buffered path. +fn stream_write(handle: i64, bytes: &[u8]) -> Option { + if !begin_streaming(handle) { + return None; + } + let sr = get_handle_mut::(handle)?; + let tx = sr.stream_tx.as_ref()?; + let in_flight = sr.stream_in_flight.as_ref()?; + let queued = + in_flight.fetch_add(bytes.len(), std::sync::atomic::Ordering::AcqRel) + bytes.len(); + let _ = tx.send(StreamFrame::Data(Bytes::copy_from_slice(bytes))); + let below_hwm = queued <= DEFAULT_HIGH_WATER_MARK; + if !below_hwm { + sr.needs_drain = true; + } + Some(below_hwm) +} + +/// `res.write(chunk)` — flush the head on first write (Node's behavior) +/// and stream the chunk to the wire; falls back to buffering for handle +/// flavors that can't stream. Returns 0 (backpressure: "wait for drain") +/// once the queued-but-unsent bytes pass the HWM, else 1. #[no_mangle] pub extern "C" fn js_node_http_res_write(handle: i64, chunk: f64) -> i32 { let bytes = match jsvalue_to_body_bytes(chunk) { Some(b) => b, None => return 1, }; + let ended = get_handle::(handle) + .map(|sr| sr.writable_ended) + .unwrap_or(true); + if ended { + return 1; + } + if let Some(below_hwm) = stream_write(handle, &bytes) { + return below_hwm as i32; + } if let Some(sr) = get_handle_mut::(handle) { if !sr.writable_ended { sr.headers_sent = true; @@ -895,6 +1020,22 @@ pub extern "C" fn js_node_http_res_write_full( ) -> f64 { let callback = pick_trailing_callback(arg2, arg3); let bytes = jsvalue_to_body_bytes(chunk); + let ended = get_handle::(handle) + .map(|sr| sr.writable_ended) + .unwrap_or(true); + if ended { + return f64::from_bits(TAG_TRUE); + } + if let Some(b) = &bytes { + if let Some(below_hwm) = stream_write(handle, b) { + // Streaming: the chunk is on its way to the wire, so the write + // callback fires now rather than queueing for `.end()`. + if callback != 0 { + call_closure0(callback); + } + return f64::from_bits(if below_hwm { TAG_TRUE } else { TAG_FALSE }); + } + } let mut below_hwm = true; if let Some(sr) = get_handle_mut::(handle) { if !sr.writable_ended { @@ -1012,6 +1153,38 @@ fn finalize_buffered_end(handle: i64, chunk: f64) -> Option<(Vec, Vec) if sr.writable_ended { return None; } + + // Streaming mode: the head already went to the wire. Send the final + // chunk + trailer block as frames and close the channel — hyper ends + // the (chunked) body when the sender drops. + if let Some(tx) = sr.stream_tx.take() { + if let Some(c) = final_chunk { + if let Some(in_flight) = sr.stream_in_flight.as_ref() { + in_flight.fetch_add(c.len(), std::sync::atomic::Ordering::AcqRel); + } + let _ = tx.send(StreamFrame::Data(Bytes::from(c))); + } + let trailers = sr.snapshot_trailers(); + if !trailers.is_empty() { + let mut map = HeaderMap::new(); + for (name, value) in trailers { + if let (Ok(name), Ok(value)) = ( + HeaderName::from_bytes(name.as_bytes()), + HeaderValue::from_str(&value), + ) { + map.insert(name, value); + } + } + let _ = tx.send(StreamFrame::Trailers(map)); + } + sr.writable_ended = true; + sr.writable_finished = true; + sr.needs_drain = false; + let finish_listeners = sr.listeners.get("finish").cloned().unwrap_or_default(); + let close_listeners = sr.listeners.get("close").cloned().unwrap_or_default(); + return Some((finish_listeners, close_listeners)); + } + if let Some(c) = final_chunk { sr.buffered_body.extend_from_slice(&c); } @@ -1026,7 +1199,7 @@ fn finalize_buffered_end(handle: i64, chunk: f64) -> Option<(Vec, Vec) status_message: sr.status_message.clone(), headers, trailers, - body, + body: ShapeBody::Full(body), }; let finish_listeners = sr.listeners.get("finish").cloned().unwrap_or_default(); let close_listeners = sr.listeners.get("close").cloned().unwrap_or_default(); @@ -1048,13 +1221,107 @@ pub extern "C" fn js_node_http_res_end(handle: i64, chunk: f64) { } } +/// Flush the response head to the wire now and switch the response into +/// streaming mode: the status line + headers go back to hyper immediately +/// with a channel-backed body, and subsequent `res.write(...)` chunks flow +/// straight to the client (chunked transfer-encoding unless the handler set +/// Content-Length). This is what makes Node shapes like "send headers, keep +/// the response open, write later" (SSE, long-poll, `res.flushHeaders()`, +/// `res.write()` before an async gap) observable client-side before +/// `.end()`. +/// +/// Returns `true` when the response is in streaming mode after the call. +/// Standalone (`assignSocket`) and bare `OutgoingMessage` handles keep the +/// buffered path, as does a response whose connection already died. +pub(crate) fn begin_streaming(handle: i64) -> bool { + let Some(sr) = get_handle_mut::(handle) else { + return false; + }; + if sr.writable_ended { + return false; + } + if sr.stream_tx.is_some() { + return true; + } + if sr.standalone || sr.outgoing_message_only { + return false; + } + let receiver_alive = sr + .response_tx + .as_ref() + .map(|tx| !tx.is_closed()) + .unwrap_or(false); + if !receiver_alive { + return false; + } + let (tx, rx) = tokio::sync::mpsc::unbounded_channel(); + let in_flight = std::sync::Arc::new(std::sync::atomic::AtomicUsize::new(0)); + let shape = HyperResponseShape { + status: sr.status_code, + status_message: sr.status_message.clone(), + headers: sr.snapshot_headers(), + trailers: Vec::new(), + body: ShapeBody::Stream { + rx, + in_flight: in_flight.clone(), + }, + }; + sr.headers_sent = true; + let oneshot_tx = sr.response_tx.take().expect("checked above"); + if oneshot_tx.send(shape).is_err() { + return false; + } + if !sr.buffered_body.is_empty() { + let first = std::mem::take(&mut sr.buffered_body); + in_flight.fetch_add(first.len(), std::sync::atomic::Ordering::AcqRel); + let _ = tx.send(StreamFrame::Data(Bytes::from(first))); + } + sr.stream_tx = Some(tx); + sr.stream_in_flight = Some(in_flight); + true +} + +/// If a streaming response previously hit backpressure (`res.write()` +/// returned `false`) and its queued bytes have since drained below the +/// HWM, clear the flag and return its `'drain'` listeners for the caller +/// (the pump) to fire. Empty otherwise. +pub(crate) fn take_drain_listeners_if_ready(handle: i64) -> Vec { + let Some(sr) = get_handle_mut::(handle) else { + return Vec::new(); + }; + if !sr.needs_drain || sr.writable_ended { + return Vec::new(); + } + let below = sr + .stream_in_flight + .as_ref() + .map(|c| c.load(std::sync::atomic::Ordering::Acquire) <= DEFAULT_HIGH_WATER_MARK) + .unwrap_or(false); + if !below { + return Vec::new(); + } + sr.needs_drain = false; + sr.listeners.get("drain").cloned().unwrap_or_default() +} + +/// True when a streaming response's connection died under it (hyper +/// dropped the body receiver — client disconnect / server close). +pub(crate) fn stream_receiver_gone(handle: i64) -> bool { + get_handle::(handle) + .and_then(|sr| sr.stream_tx.as_ref().map(|tx| tx.is_closed())) + .unwrap_or(false) +} + /// `res.flushHeaders()` — Node sends headers immediately even before -/// any body. Phase 1 marks the response as headers-sent (our actual -/// flush is unified at `.end()` time since we buffer). +/// any body. Flushes the head to the wire and switches to the streaming +/// body path; falls back to marking headers-sent for handle flavors that +/// can't stream (standalone / bare OutgoingMessage). #[no_mangle] pub extern "C" fn js_node_http_res_flush_headers(handle: i64) { - if let Some(sr) = get_handle_mut::(handle) { - sr.headers_sent = true; + if !begin_streaming(handle) { + if let Some(sr) = get_handle_mut::(handle) { + sr.headers_sent = true; + } } } diff --git a/crates/perry-ext-http-server/src/server.rs b/crates/perry-ext-http-server/src/server.rs index 3aa3b645d0..59cce0e97e 100644 --- a/crates/perry-ext-http-server/src/server.rs +++ b/crates/perry-ext-http-server/src/server.rs @@ -995,7 +995,7 @@ async fn handle_request( Err(_) => Vec::new(), }; - let im = IncomingMessage::new( + let mut im = IncomingMessage::new( method, url, headers_lower, @@ -1004,6 +1004,14 @@ async fn handle_request( peer.ip().to_string(), peer.port(), ); + // `req.httpVersion` reflects the wire version, not a constant — an + // HTTP/1.0 request must read "1.0" (test-http-1.0 asserts all three + // httpVersion fields). + im.http_version = match http_version { + hyper::Version::HTTP_10 => "1.0".to_string(), + hyper::Version::HTTP_2 => "2.0".to_string(), + _ => "1.1".to_string(), + }; let im_handle = alloc_incoming_message(im); let (response_tx, response_rx) = oneshot::channel::(); @@ -1272,6 +1280,7 @@ fn has_in_flight_requests() -> bool { fn reap_in_flight_requests() { // (request_handle, response_handle, needs_synthesize) let mut to_finalize: Vec<(i64, i64, bool)> = Vec::new(); + let mut drain_listeners: Vec> = Vec::new(); { let mut guard = match IN_FLIGHT.lock() { Ok(g) => g, @@ -1283,14 +1292,24 @@ fn reap_in_flight_requests() { let now = Instant::now(); guard.retain(|e| { let ended = response_writable_ended(e.response_handle); + if !ended { + // Streaming backpressure cleared — fire `'drain'` (outside + // the lock) so `res.on('drain')` producer loops resume. + let ls = crate::response::take_drain_listeners_if_ready(e.response_handle); + if !ls.is_empty() { + drain_listeners.push(ls); + } + } // #4905: the per-request oneshot receiver died with its // connection task (client disconnected / closeAllConnections) // — the response can never be flushed, so don't pin the event - // loop for the rest of the grace window. + // loop for the rest of the grace window. A streaming response + // whose body receiver dropped is the same edge. let peer_gone = get_handle::(e.response_handle) .and_then(|sr| sr.response_tx.as_ref()) .map(|tx| tx.is_closed()) - .unwrap_or(false); + .unwrap_or(false) + || crate::response::stream_receiver_gone(e.response_handle); let expired = now >= e.deadline; if ended || expired || peer_gone { to_finalize.push(( @@ -1308,6 +1327,9 @@ fn reap_in_flight_requests() { } }); } + for ls in drain_listeners { + crate::request::emit_no_arg_to_listeners(&ls); + } // Finalize outside the lock — `synthesize_default_response_if_needed` // and `drop_handle` don't touch `IN_FLIGHT`, but keeping them off the // lock avoids any future re-entrancy surprise. @@ -1702,6 +1724,12 @@ pub(crate) fn synthesize_default_response_if_needed(response_handle: i64) { sr.writable_ended = true; sr.headers_sent = true; sr.writable_finished = true; + // Streaming response whose handler never called `.end()`: the + // head is already on the wire — just close the body channel. + if sr.stream_tx.take().is_some() { + sr.needs_drain = false; + return; + } let body = std::mem::take(&mut sr.buffered_body); // `snapshot_headers` expands array-valued headers (e.g. // Set-Cookie) into one entry per element so they emit a separate @@ -1717,7 +1745,7 @@ pub(crate) fn synthesize_default_response_if_needed(response_handle: i64) { status_message: sr.status_message.clone(), headers, trailers: Vec::new(), - body, + body: crate::response::ShapeBody::Full(body), }; if let Some(tx) = sr.response_tx.take() { let _ = tx.send(shape); diff --git a/crates/perry-ext-http/src/agent.rs b/crates/perry-ext-http/src/agent.rs index 6aa4d3ade6..ce022a0f17 100644 --- a/crates/perry-ext-http/src/agent.rs +++ b/crates/perry-ext-http/src/agent.rs @@ -211,7 +211,6 @@ pub(crate) fn client_for_agent(handle: Handle) -> reqwest::Client { }; let built = reqwest::Client::builder() - .user_agent(concat!("perry/", env!("CARGO_PKG_VERSION"))) .pool_max_idle_per_host(pool_max_idle) .pool_idle_timeout(idle_timeout) .tcp_keepalive(std::time::Duration::from_secs(60)) diff --git a/crates/perry-ext-http/src/client_dispatch.rs b/crates/perry-ext-http/src/client_dispatch.rs index f8f32ddef6..6c726aab3d 100644 --- a/crates/perry-ext-http/src/client_dispatch.rs +++ b/crates/perry-ext-http/src/client_dispatch.rs @@ -100,6 +100,12 @@ pub(crate) fn dispatch_request( for (k, v) in &headers { req = req.header(k.as_str(), v.as_str()); } + // Node's default agent is keep-alive (v19+) and sends the + // header explicitly; servers reading `req.headers.connection` + // expect it. + if !headers.keys().any(|k| k.eq_ignore_ascii_case("connection")) { + req = req.header("Connection", "keep-alive"); + } if let Some(ms) = timeout_ms { req = req.timeout(std::time::Duration::from_millis(ms)); } else { @@ -109,7 +115,7 @@ pub(crate) fn dispatch_request( req = req.body(body); } match req.send().await { - Ok(response) => { + Ok(mut response) => { let status = response.status().as_u16(); let status_message = response .status() @@ -122,19 +128,42 @@ pub(crate) fn dispatch_request( hdrs.push((k.to_string(), s.to_string())); } } - let body = response - .bytes() - .await - .map(|b| b.to_vec()) - .unwrap_or_default(); - push_event(PendingHttpEvent::Response { + // Streaming delivery: hand the head to the main thread + // as soon as it arrives, then pump body chunks as they + // come off the socket. Client code can react to the + // headers (timers, destroy, data listeners) while the + // server is still producing the body — Node's model. + push_event(PendingHttpEvent::ResponseHead { request_handle, status, status_message, headers: hdrs, - trailers: Vec::new(), - body, }); + loop { + match response.chunk().await { + Ok(Some(bytes)) => { + push_event(PendingHttpEvent::ResponseChunk { + request_handle, + chunk: bytes.to_vec(), + }); + } + Ok(None) => { + push_event(PendingHttpEvent::ResponseEnd { request_handle }); + break; + } + Err(e) => { + if e.is_timeout() { + push_event(PendingHttpEvent::Timeout { request_handle }); + } else { + push_event(PendingHttpEvent::Error { + request_handle, + error_message: e.to_string(), + }); + } + break; + } + } + } } Err(e) => { // #4905: surface transport deadlines as the 'timeout' diff --git a/crates/perry-ext-http/src/client_events.rs b/crates/perry-ext-http/src/client_events.rs index d18e03c3c2..aa4df821ef 100644 --- a/crates/perry-ext-http/src/client_events.rs +++ b/crates/perry-ext-http/src/client_events.rs @@ -200,6 +200,168 @@ pub(crate) unsafe fn handle_response_event( fire_request_close_once(request_handle); } +/// Drain handler for `PendingHttpEvent::ResponseHead` (streaming path): +/// build the IncomingMessage handle with an empty body, remember it on the +/// request, and fire the factory callback + `'response'` listeners. Body +/// chunks and the end edge arrive as separate events. +/// +/// # Safety +/// +/// Same listener-liveness contract as [`fire_request_event_listeners`]. +pub(crate) unsafe fn handle_response_head_event( + request_handle: Handle, + status: u16, + status_message: String, + headers: Vec<(String, String)>, +) { + // A destroyed request delivers nothing. + let destroyed = + with_handle_mut::(request_handle, |req| req.completed) + .unwrap_or(true); + if destroyed { + return; + } + + let mut headers_map = HashMap::new(); + for (k, v) in headers { + headers_map.insert(k, v); + } + let incoming = register_handle(IncomingMessageHandle { + status_code: status, + status_message, + headers: headers_map, + trailers: HashMap::new(), + body: Vec::new(), + listeners: HashMap::new(), + encoding: None, + }); + let response_callback = with_handle_mut::(request_handle, |req| { + req.incoming_handle = incoming; + req.response_callback + }) + .unwrap_or(0); + + let arg = f64::from_bits(POINTER_TAG | (incoming as u64 & PTR_MASK)); + if response_callback != 0 { + let closure = JsClosure::from_raw(response_callback as *const RawClosureHeader); + let _ = closure.call1(arg); + } + let response_listeners = get_handle_mut::(request_handle) + .and_then(|r| r.listeners.get("response").cloned()) + .unwrap_or_default(); + for cb in response_listeners { + if cb != 0 { + let closure = JsClosure::from_raw(cb as *const RawClosureHeader); + let _ = closure.call1(arg); + } + } +} + +/// Drain handler for `PendingHttpEvent::ResponseChunk`: deliver to the +/// message's `'data'` listeners, or buffer until `'end'` when none are +/// registered yet (listeners typically attach inside the response +/// callback, which has already run by the time chunks drain). +/// +/// # Safety +/// +/// Same listener-liveness contract as [`fire_request_event_listeners`]. +pub(crate) unsafe fn handle_response_chunk_event(request_handle: Handle, chunk: Vec) { + let (incoming, done) = get_handle_mut::(request_handle) + .map(|r| (r.incoming_handle, r.completed)) + .unwrap_or((0, true)); + // `completed` mid-stream means the request was destroyed — chunks + // never arrive after the end edge, so this only suppresses delivery + // into a torn-down exchange. + if incoming == 0 || done { + return; + } + let (data_listeners, encoding) = get_handle_mut::(incoming) + .map(|r| { + ( + r.listeners.get("data").cloned().unwrap_or_default(), + r.encoding.clone(), + ) + }) + .unwrap_or_default(); + if data_listeners.is_empty() { + if let Some(im) = get_handle_mut::(incoming) { + im.body.extend_from_slice(&chunk); + } + return; + } + let arg = body_chunk_value(&chunk, encoding.as_deref()); + if arg.to_bits() == TAG_UNDEFINED { + return; + } + for cb in data_listeners { + if cb != 0 { + let closure = JsClosure::from_raw(cb as *const RawClosureHeader); + let _ = closure.call1(arg); + } + } +} + +/// Drain handler for `PendingHttpEvent::ResponseEnd`: flush any buffered +/// chunks to late-registered `'data'` listeners, fire `'end'` on the +/// message, then `'close'` on the request. +/// +/// # Safety +/// +/// Same listener-liveness contract as [`fire_request_event_listeners`]. +pub(crate) unsafe fn handle_response_end_event(request_handle: Handle) { + let (incoming, was_done) = + with_handle_mut::(request_handle, |req| { + let was = req.completed; + req.completed = true; + (req.incoming_handle, was) + }) + .unwrap_or((0, true)); + // `was_done` means the request was destroyed mid-stream — the + // teardown already emitted its own error/close edges. + if incoming == 0 || was_done { + return; + } + + let (data_listeners, encoding, buffered) = get_handle_mut::(incoming) + .map(|r| { + ( + r.listeners.get("data").cloned().unwrap_or_default(), + r.encoding.clone(), + std::mem::take(&mut r.body), + ) + }) + .unwrap_or_default(); + if !data_listeners.is_empty() && !buffered.is_empty() { + let arg = body_chunk_value(&buffered, encoding.as_deref()); + if arg.to_bits() != TAG_UNDEFINED { + for cb in data_listeners { + if cb != 0 { + let closure = JsClosure::from_raw(cb as *const RawClosureHeader); + let _ = closure.call1(arg); + } + } + } + } else if !buffered.is_empty() { + // Nobody consumed the body — keep it on the handle for any + // late reader. + if let Some(im) = get_handle_mut::(incoming) { + im.body = buffered; + } + } + + let end_listeners = get_handle_mut::(incoming) + .and_then(|r| r.listeners.get("end").cloned()) + .unwrap_or_default(); + for cb in end_listeners { + if cb != 0 { + let closure = JsClosure::from_raw(cb as *const RawClosureHeader); + let _ = closure.call0(); + } + } + + fire_request_close_once(request_handle); +} + /// Drain handler for `PendingHttpEvent::Error`: `'error'` listeners then /// `'close'`, suppressed entirely once the request already completed /// (e.g. a `req.destroy()` raced the transport failure). diff --git a/crates/perry-ext-http/src/client_request_surface.rs b/crates/perry-ext-http/src/client_request_surface.rs index 7dcdce3622..2ce0e6bb64 100644 --- a/crates/perry-ext-http/src/client_request_surface.rs +++ b/crates/perry-ext-http/src/client_request_surface.rs @@ -262,10 +262,24 @@ pub extern "C" fn js_http_client_request_get_headers(handle: Handle) -> f64 { #[no_mangle] pub extern "C" fn js_http_client_request_abort(handle: Handle) -> f64 { if is_client_request_handle(handle) { - with_state_mut(handle, |state| { + let already = with_state_mut(handle, |state| { + let was = state.aborted; state.aborted = true; state.destroyed = true; + was }); + if !already { + // Node: `abort()` tears the exchange down — a later `end()` + // must not dispatch, no `'error'` fires, and the (legacy) + // `'abort'` event precedes the once-only `'close'`. + with_handle_mut::(handle, |req| { + req.completed = true; + }); + unsafe { + client_events::fire_request_event_listeners(handle, "abort"); + } + client_events::fire_request_close_once(handle); + } } undefined_value() } @@ -320,6 +334,21 @@ pub extern "C" fn js_http_client_request_noop_undefined( undefined_value() } +/// Static-dispatch route for `req.flushHeaders()` — dispatch the exchange +/// now for body-less requests (Node puts the head on the wire immediately). +/// +/// # Safety +/// FFI entry; `handle` must be a live `ClientRequestHandle` id (or absent). +#[no_mangle] +pub unsafe extern "C" fn js_http_client_request_flush_headers( + handle: Handle, + _arg0: f64, + _arg1: f64, +) -> f64 { + crate::client_request_flush_headers(handle); + undefined_value() +} + #[no_mangle] pub extern "C" fn js_http_client_request_aborted(handle: Handle) -> f64 { state_bool(handle, "aborted") @@ -569,9 +598,11 @@ fn dispatch_method(handle: Handle, method: &str, args: &[f64]) -> Option { }); handle_value(handle) } - "flushHeaders" | "cork" | "uncork" | "setNoDelay" | "setSocketKeepAlive" => { + "flushHeaders" => { + unsafe { crate::client_request_flush_headers(handle) }; undefined_value() } + "cork" | "uncork" | "setNoDelay" | "setSocketKeepAlive" => undefined_value(), _ => return None, }) } diff --git a/crates/perry-ext-http/src/lib.rs b/crates/perry-ext-http/src/lib.rs index 54e68c7f9a..ea208dd5eb 100644 --- a/crates/perry-ext-http/src/lib.rs +++ b/crates/perry-ext-http/src/lib.rs @@ -60,6 +60,11 @@ mod client_request_surface; // stay under the 2000-line lint cap. mod tls_client; +// Raw-socket trailer-aware HTTP/1.1 client (`TE: trailers` bypass) + +// response parser, extracted to keep `lib.rs` under the 2000-line lint cap. +mod plain_client; +use plain_client::{dispatch_plain_http_request, parse_http_response}; + // Async reqwest dispatch (`dispatch_request` + TLS-client selection), // extracted to keep `lib.rs` under the 2000-line lint cap. mod client_dispatch; @@ -111,6 +116,25 @@ pub(crate) enum PendingHttpEvent { trailers: Vec<(String, String)>, body: Vec, }, + /// Streaming delivery (reqwest path): the response head arrived — fire + /// the `http.request` callback / `'response'` listeners now; body + /// chunks follow as [`PendingHttpEvent::ResponseChunk`]s. This is what + /// lets client code observe headers (and start timers / destroy the + /// request) while the server is still writing. + ResponseHead { + request_handle: Handle, + status: u16, + status_message: String, + headers: Vec<(String, String)>, + }, + /// One streamed body chunk following a `ResponseHead`. + ResponseChunk { + request_handle: Handle, + chunk: Vec, + }, + /// The streamed body finished — `'end'` on the message, `'close'` on + /// the request. + ResponseEnd { request_handle: Handle }, Error { request_handle: Handle, error_message: String, @@ -132,7 +156,6 @@ lazy_static! { /// session cache. Without this each request allocs a fresh /// reqwest::Client (~250 KB) and the memory never gets reused. pub(crate) static ref HTTP_CLIENT: reqwest::Client = reqwest::Client::builder() - .user_agent(concat!("perry/", env!("CARGO_PKG_VERSION"))) .pool_idle_timeout(std::time::Duration::from_secs(90)) .pool_max_idle_per_host(16) .tcp_keepalive(std::time::Duration::from_secs(60)) @@ -229,213 +252,6 @@ fn map_to_js_object(map: &HashMap) -> f64 { out } -fn expects_response_trailers(headers: &HashMap) -> bool { - headers.iter().any(|(name, value)| { - name.eq_ignore_ascii_case("te") - && value - .split(',') - .any(|part| part.trim().eq_ignore_ascii_case("trailers")) - }) -} - -pub(crate) async fn dispatch_plain_http_request( - request_handle: Handle, - method: &str, - url: &str, - headers: &HashMap, - body: &[u8], - timeout_ms: Option, -) -> Option> { - if !expects_response_trailers(headers) { - return None; - } - let parsed = match reqwest::Url::parse(url) { - Ok(u) if u.scheme() == "http" => u, - _ => return None, - }; - let host = match parsed.host_str() { - Some(h) => h.to_string(), - None => return Some(Err("missing host".to_string())), - }; - let port = parsed.port_or_known_default().unwrap_or(80); - let mut path = parsed.path().to_string(); - if path.is_empty() { - path.push('/'); - } - if let Some(q) = parsed.query() { - path.push('?'); - path.push_str(q); - } - - let fut = async { - let mut stream = tokio::net::TcpStream::connect((host.as_str(), port)).await?; - let host_header = if parsed.port().is_some() { - format!("{}:{}", host, port) - } else { - host.clone() - }; - let mut req = format!("{} {} HTTP/1.1\r\nHost: {}\r\n", method, path, host_header); - let mut has_content_length = false; - for (k, v) in headers { - if k.eq_ignore_ascii_case("content-length") { - has_content_length = true; - } - if k.eq_ignore_ascii_case("connection") { - // The raw trailer-aware path reads until EOF after the final - // chunk/trailer block. Force close here so an explicit - // `Connection: keep-alive` cannot hang until timeout. - continue; - } - req.push_str(k); - req.push_str(": "); - req.push_str(v); - req.push_str("\r\n"); - } - req.push_str("Connection: close\r\n"); - if !body.is_empty() && !has_content_length { - req.push_str(&format!("Content-Length: {}\r\n", body.len())); - } - req.push_str("\r\n"); - stream.write_all(req.as_bytes()).await?; - if !body.is_empty() { - stream.write_all(body).await?; - } - - let mut raw = Vec::new(); - stream.read_to_end(&mut raw).await?; - Ok::, std::io::Error>(raw) - }; - - let raw = match timeout_ms { - Some(ms) => match tokio::time::timeout(std::time::Duration::from_millis(ms), fut).await { - Ok(r) => r, - Err(_) => return Some(Err("request timed out".to_string())), - }, - None => match tokio::time::timeout(std::time::Duration::from_secs(30), fut).await { - Ok(r) => r, - Err(_) => return Some(Err("request timed out".to_string())), - }, - }; - let raw = match raw { - Ok(r) => r, - Err(e) => return Some(Err(e.to_string())), - }; - - match parse_http_response(&raw) { - Ok(parsed) => { - push_event(PendingHttpEvent::Response { - request_handle, - status: parsed.status, - status_message: parsed.status_message, - headers: parsed.headers, - trailers: parsed.trailers, - body: parsed.body, - }); - Some(Ok(())) - } - Err(e) => Some(Err(e)), - } -} - -/// A parsed HTTP/1.1 response message (status line + headers + decoded body -/// + trailers). Produced by [`parse_http_response`]. -struct ParsedHttpResponse { - status: u16, - status_message: String, - headers: Vec<(String, String)>, - trailers: Vec<(String, String)>, - body: Vec, -} - -/// Parse a raw HTTP/1.1 response (the bytes read off a socket) into status / -/// headers / decoded body / trailers. Decodes `Transfer-Encoding: chunked` -/// (including a trailer block) and honors `Content-Length`; with neither it -/// treats the remainder as the body (read-until-EOF transports). Shared by -/// the trailer-aware reqwest-bypass path ([`dispatch_plain_http_request`]) -/// and the #2154 `agent.createConnection` socket path -/// ([`dispatch_request_over_socket`]). -fn parse_http_response(raw: &[u8]) -> Result { - let Some(header_end) = raw.windows(4).position(|w| w == b"\r\n\r\n") else { - return Err("invalid HTTP response".to_string()); - }; - let head = String::from_utf8_lossy(&raw[..header_end]); - let mut lines = head.split("\r\n"); - let status_line = lines.next().unwrap_or_default(); - let mut status_parts = status_line.splitn(3, ' '); - let _version = status_parts.next(); - let status = status_parts - .next() - .and_then(|s| s.parse::().ok()) - .unwrap_or(0); - let status_message = status_parts.next().unwrap_or("").to_string(); - let mut hdrs = Vec::new(); - let mut is_chunked = false; - let mut content_length: Option = None; - for line in lines { - if let Some((name, value)) = line.split_once(':') { - let name = name.trim().to_ascii_lowercase(); - let value = value.trim().to_string(); - if name == "transfer-encoding" && value.to_ascii_lowercase().contains("chunked") { - is_chunked = true; - } - if name == "content-length" { - content_length = value.parse::().ok(); - } - hdrs.push((name, value)); - } - } - let payload = &raw[header_end + 4..]; - let mut decoded = Vec::new(); - let mut trailers = Vec::new(); - if is_chunked { - let mut pos = 0; - while pos < payload.len() { - let Some(line_end_rel) = payload[pos..].windows(2).position(|w| w == b"\r\n") else { - break; - }; - let line_end = pos + line_end_rel; - let size_line = String::from_utf8_lossy(&payload[pos..line_end]); - let size_hex = size_line.split(';').next().unwrap_or("").trim(); - let size = usize::from_str_radix(size_hex, 16).unwrap_or(0); - pos = line_end + 2; - if size == 0 { - if pos <= payload.len() { - let rest = &payload[pos..]; - let trailer_end = rest - .windows(4) - .position(|w| w == b"\r\n\r\n") - .unwrap_or(rest.len()); - let trailer_text = String::from_utf8_lossy(&rest[..trailer_end]); - for line in trailer_text.split("\r\n") { - if let Some((name, value)) = line.split_once(':') { - trailers - .push((name.trim().to_ascii_lowercase(), value.trim().to_string())); - } - } - } - break; - } - if pos + size > payload.len() { - break; - } - decoded.extend_from_slice(&payload[pos..pos + size]); - pos += size + 2; - } - } else if let Some(len) = content_length { - decoded.extend_from_slice(&payload[..payload.len().min(len)]); - } else { - decoded.extend_from_slice(payload); - } - - Ok(ParsedHttpResponse { - status, - status_message, - headers: hdrs, - trailers, - body: decoded, - }) -} - // ------------------------------------------------------------------ // Handle types // ------------------------------------------------------------------ @@ -451,6 +267,10 @@ pub struct ClientRequestHandle { listeners: HashMap>, timeout_ms: Option, ended: bool, + /// `flushHeaders()` dispatched the exchange before `end()` was called; + /// the eventual `end()` still owes the write/finish/end callback + /// ordering exactly once. + flushed_early: bool, /// #4909 — `write(chunk, cb)` callbacks queued until the body is /// flushed at `end()` (Node fires them once the chunk hits the /// transport; our buffered MVP flushes everything at `end()`). @@ -476,6 +296,11 @@ pub struct ClientRequestHandle { /// Client-side TLS options (#4906): `rejectUnauthorized` / `ca` / /// `checkServerIdentity`. Default = no customization (pooled client). tls: tls_client::TlsOptions, + /// The IncomingMessage handle created when a streamed `ResponseHead` + /// arrived; later `ResponseChunk` / `ResponseEnd` events route to it. + /// `0` until the head is delivered (and always for the full-buffer + /// delivery paths). + incoming_handle: Handle, } // SAFETY: closure pointers point into program-global code/data and @@ -635,6 +460,7 @@ fn make_request_handle( listeners: HashMap::new(), timeout_ms, ended: false, + flushed_early: false, pending_write_callbacks: Vec::new(), end_callback: 0, completed: false, @@ -642,6 +468,7 @@ fn make_request_handle( close_emitted: false, agent_handle, tls: tls_client::TlsOptions::default(), + incoming_handle: 0, }); // #4909 — `options.timeout` arms the inactivity timer as soon as the // socket exists in Node, not at `end()`; a request that is never @@ -1150,6 +977,12 @@ pub unsafe extern "C" fn js_http_client_request_end(handle: Handle, body_f64: f6 } pub(crate) unsafe fn client_request_end_impl(handle: Handle, body_f64: f64) -> Handle { + // An aborted/destroyed request never dispatches — Node's `abort()` + // before `end()` means the server must not see the request and no + // `'error'` fires (test-http-abort-before-end). + if client_request_surface::request_destroyed(handle) { + return handle; + } if let Some(body) = client_outgoing::chunk_to_bytes(body_f64) { with_handle_mut::(handle, |req| { req.body.extend_from_slice(&body); @@ -1158,10 +991,17 @@ pub(crate) unsafe fn client_request_end_impl(handle: Handle, body_f64: f64) -> H let snapshot = with_handle_mut::(handle, |req| { if req.ended { - return None; + // Already dispatched by `flushHeaders()` — the exchange is in + // flight, but this `end()` still owes its write/finish/end + // callback ordering (once). + if req.flushed_early { + req.flushed_early = false; + return Err(true); + } + return Err(false); } req.ended = true; - Some(( + Ok(( req.method.clone(), req.url.clone(), req.headers.clone(), @@ -1172,19 +1012,82 @@ pub(crate) unsafe fn client_request_end_impl(handle: Handle, body_f64: f64) -> H )) }); - let snapshot = match snapshot.flatten() { - Some(s) => s, + let snapshot = match snapshot { + Some(Ok(s)) => s, + Some(Err(owes_flush)) => { + if owes_flush { + push_event(PendingHttpEvent::Flushed { + request_handle: handle, + }); + } + return handle; + } None => return handle, }; - let (method, url, headers, body, timeout_ms, agent_handle, tls) = snapshot; - // #4909 — queue the flush notification before dispatching so the // write/end callbacks and `'finish'` drain ahead of any `'response'`. push_event(PendingHttpEvent::Flushed { request_handle: handle, }); + dispatch_request_snapshot(handle, snapshot); + handle +} + +/// `req.flushHeaders()` — Node opens the connection and puts the request +/// head on the wire immediately. Our transport sends a complete request in +/// one shot, so for a request with no buffered body (and a method that +/// doesn't usually carry one) this dispatches the exchange now; a later +/// `end()` only drains the callback ordering. Requests that already +/// buffered body bytes (or use body-carrying methods) keep the +/// dispatch-at-`end()` behavior, since the head can't go out alone. +pub(crate) unsafe fn client_request_flush_headers(handle: Handle) { + if client_request_surface::request_destroyed(handle) { + return; + } + let snapshot = with_handle_mut::(handle, |req| { + if req.ended || !req.body.is_empty() { + return None; + } + let method = req.method.to_ascii_uppercase(); + if !matches!(method.as_str(), "GET" | "HEAD" | "DELETE" | "OPTIONS") { + return None; + } + req.ended = true; + req.flushed_early = true; + Some(( + req.method.clone(), + req.url.clone(), + req.headers.clone(), + Vec::new(), + req.timeout_ms, + req.agent_handle, + req.tls.clone(), + )) + }) + .flatten(); + if let Some(snapshot) = snapshot { + dispatch_request_snapshot(handle, snapshot); + } +} + +type RequestSnapshot = ( + String, + String, + HashMap, + Vec, + Option, + Handle, + tls_client::TlsOptions, +); + +/// The shared dispatch tail of `end()` / `flushHeaders()`: route through the +/// agent's `createConnection` / `createSocket` override when present, else +/// the reqwest path. +unsafe fn dispatch_request_snapshot(handle: Handle, snapshot: RequestSnapshot) { + let (method, url, headers, body, timeout_ms, agent_handle, tls) = snapshot; + // #2154 — if the agent supplied a `createConnection` / `createSocket` // override, invoke it here on the main thread (JS closure calls must not // run on a tokio worker) and run the HTTP exchange over the socket it @@ -1200,7 +1103,7 @@ pub(crate) unsafe fn client_request_end_impl(handle: Handle, body_f64: f64) -> H // fall through to reqwest after dispatching it. if agent::create_socket_override(agent_handle) != 0 { invoke_create_socket(handle, agent_handle, &host, port, &path); - return handle; + return; } if let Some(socket_id) = agent::try_create_connection_socket(agent_handle, &host, port, &path) @@ -1213,7 +1116,7 @@ pub(crate) unsafe fn client_request_end_impl(handle: Handle, body_f64: f64) -> H dispatch_request_over_socket( handle, method, url, headers, body, timeout_ms, socket_id, ); - return handle; + return; } } } @@ -1228,7 +1131,6 @@ pub(crate) unsafe fn client_request_end_impl(handle: Handle, body_f64: f64) -> H agent_handle, tls, ); - handle } /// Parse a request URL into the `(host, port, path)` an @@ -1647,6 +1549,28 @@ pub unsafe extern "C" fn js_http_process_pending() -> i32 { body, ); } + PendingHttpEvent::ResponseHead { + request_handle, + status, + status_message, + headers, + } => { + client_events::handle_response_head_event( + request_handle, + status, + status_message, + headers, + ); + } + PendingHttpEvent::ResponseChunk { + request_handle, + chunk, + } => { + client_events::handle_response_chunk_event(request_handle, chunk); + } + PendingHttpEvent::ResponseEnd { request_handle } => { + client_events::handle_response_end_event(request_handle); + } PendingHttpEvent::Error { request_handle, error_message, diff --git a/crates/perry-ext-http/src/plain_client.rs b/crates/perry-ext-http/src/plain_client.rs new file mode 100644 index 0000000000..2f0d3cd304 --- /dev/null +++ b/crates/perry-ext-http/src/plain_client.rs @@ -0,0 +1,219 @@ +//! Raw-socket HTTP/1.1 client path used when the request asks for response +//! trailers (`TE: trailers`) — reqwest's body API drops trailer blocks, so +//! this bypass speaks HTTP/1.1 over a plain TcpStream and parses the +//! response (chunked decoding + trailer block) itself. The parser is shared +//! with the #2154 `agent.createConnection` socket path in `lib.rs`. + +use std::collections::HashMap; + +use perry_ffi::Handle; +use tokio::io::{AsyncReadExt, AsyncWriteExt}; + +use crate::{push_event, PendingHttpEvent}; + +fn expects_response_trailers(headers: &HashMap) -> bool { + headers.iter().any(|(name, value)| { + name.eq_ignore_ascii_case("te") + && value + .split(',') + .any(|part| part.trim().eq_ignore_ascii_case("trailers")) + }) +} + +pub(crate) async fn dispatch_plain_http_request( + request_handle: Handle, + method: &str, + url: &str, + headers: &HashMap, + body: &[u8], + timeout_ms: Option, +) -> Option> { + if !expects_response_trailers(headers) { + return None; + } + let parsed = match reqwest::Url::parse(url) { + Ok(u) if u.scheme() == "http" => u, + _ => return None, + }; + let host = match parsed.host_str() { + Some(h) => h.to_string(), + None => return Some(Err("missing host".to_string())), + }; + let port = parsed.port_or_known_default().unwrap_or(80); + let mut path = parsed.path().to_string(); + if path.is_empty() { + path.push('/'); + } + if let Some(q) = parsed.query() { + path.push('?'); + path.push_str(q); + } + + let fut = async { + let mut stream = tokio::net::TcpStream::connect((host.as_str(), port)).await?; + let host_header = if parsed.port().is_some() { + format!("{}:{}", host, port) + } else { + host.clone() + }; + let mut req = format!("{} {} HTTP/1.1\r\nHost: {}\r\n", method, path, host_header); + let mut has_content_length = false; + for (k, v) in headers { + if k.eq_ignore_ascii_case("content-length") { + has_content_length = true; + } + if k.eq_ignore_ascii_case("connection") { + // The raw trailer-aware path reads until EOF after the final + // chunk/trailer block. Force close here so an explicit + // `Connection: keep-alive` cannot hang until timeout. + continue; + } + req.push_str(k); + req.push_str(": "); + req.push_str(v); + req.push_str("\r\n"); + } + req.push_str("Connection: close\r\n"); + if !body.is_empty() && !has_content_length { + req.push_str(&format!("Content-Length: {}\r\n", body.len())); + } + req.push_str("\r\n"); + stream.write_all(req.as_bytes()).await?; + if !body.is_empty() { + stream.write_all(body).await?; + } + + let mut raw = Vec::new(); + stream.read_to_end(&mut raw).await?; + Ok::, std::io::Error>(raw) + }; + + let raw = match timeout_ms { + Some(ms) => match tokio::time::timeout(std::time::Duration::from_millis(ms), fut).await { + Ok(r) => r, + Err(_) => return Some(Err("request timed out".to_string())), + }, + None => match tokio::time::timeout(std::time::Duration::from_secs(30), fut).await { + Ok(r) => r, + Err(_) => return Some(Err("request timed out".to_string())), + }, + }; + let raw = match raw { + Ok(r) => r, + Err(e) => return Some(Err(e.to_string())), + }; + + match parse_http_response(&raw) { + Ok(parsed) => { + push_event(PendingHttpEvent::Response { + request_handle, + status: parsed.status, + status_message: parsed.status_message, + headers: parsed.headers, + trailers: parsed.trailers, + body: parsed.body, + }); + Some(Ok(())) + } + Err(e) => Some(Err(e)), + } +} + +/// A parsed HTTP/1.1 response message (status line + headers + decoded body +/// + trailers). Produced by [`parse_http_response`]. +pub(crate) struct ParsedHttpResponse { + pub(crate) status: u16, + pub(crate) status_message: String, + pub(crate) headers: Vec<(String, String)>, + pub(crate) trailers: Vec<(String, String)>, + pub(crate) body: Vec, +} + +/// Parse a raw HTTP/1.1 response (the bytes read off a socket) into status / +/// headers / decoded body / trailers. Decodes `Transfer-Encoding: chunked` +/// (including a trailer block) and honors `Content-Length`; with neither it +/// treats the remainder as the body (read-until-EOF transports). Shared by +/// the trailer-aware reqwest-bypass path ([`dispatch_plain_http_request`]) +/// and the #2154 `agent.createConnection` socket path +/// ([`dispatch_request_over_socket`]). +pub(crate) fn parse_http_response(raw: &[u8]) -> Result { + let Some(header_end) = raw.windows(4).position(|w| w == b"\r\n\r\n") else { + return Err("invalid HTTP response".to_string()); + }; + let head = String::from_utf8_lossy(&raw[..header_end]); + let mut lines = head.split("\r\n"); + let status_line = lines.next().unwrap_or_default(); + let mut status_parts = status_line.splitn(3, ' '); + let _version = status_parts.next(); + let status = status_parts + .next() + .and_then(|s| s.parse::().ok()) + .unwrap_or(0); + let status_message = status_parts.next().unwrap_or("").to_string(); + let mut hdrs = Vec::new(); + let mut is_chunked = false; + let mut content_length: Option = None; + for line in lines { + if let Some((name, value)) = line.split_once(':') { + let name = name.trim().to_ascii_lowercase(); + let value = value.trim().to_string(); + if name == "transfer-encoding" && value.to_ascii_lowercase().contains("chunked") { + is_chunked = true; + } + if name == "content-length" { + content_length = value.parse::().ok(); + } + hdrs.push((name, value)); + } + } + let payload = &raw[header_end + 4..]; + let mut decoded = Vec::new(); + let mut trailers = Vec::new(); + if is_chunked { + let mut pos = 0; + while pos < payload.len() { + let Some(line_end_rel) = payload[pos..].windows(2).position(|w| w == b"\r\n") else { + break; + }; + let line_end = pos + line_end_rel; + let size_line = String::from_utf8_lossy(&payload[pos..line_end]); + let size_hex = size_line.split(';').next().unwrap_or("").trim(); + let size = usize::from_str_radix(size_hex, 16).unwrap_or(0); + pos = line_end + 2; + if size == 0 { + if pos <= payload.len() { + let rest = &payload[pos..]; + let trailer_end = rest + .windows(4) + .position(|w| w == b"\r\n\r\n") + .unwrap_or(rest.len()); + let trailer_text = String::from_utf8_lossy(&rest[..trailer_end]); + for line in trailer_text.split("\r\n") { + if let Some((name, value)) = line.split_once(':') { + trailers + .push((name.trim().to_ascii_lowercase(), value.trim().to_string())); + } + } + } + break; + } + if pos + size > payload.len() { + break; + } + decoded.extend_from_slice(&payload[pos..pos + size]); + pos += size + 2; + } + } else if let Some(len) = content_length { + decoded.extend_from_slice(&payload[..payload.len().min(len)]); + } else { + decoded.extend_from_slice(payload); + } + + Ok(ParsedHttpResponse { + status, + status_message, + headers: hdrs, + trailers, + body: decoded, + }) +} diff --git a/crates/perry-ext-http/src/tests.rs b/crates/perry-ext-http/src/tests.rs index 4f18d7f48f..b63c198f45 100644 --- a/crates/perry-ext-http/src/tests.rs +++ b/crates/perry-ext-http/src/tests.rs @@ -65,6 +65,7 @@ fn gc_mutable_scanner_rewrites_request_response_listener_roots() { listeners: request_listeners, timeout_ms: None, ended: false, + flushed_early: false, pending_write_callbacks: Vec::new(), end_callback: 0, completed: false, @@ -72,6 +73,7 @@ fn gc_mutable_scanner_rewrites_request_response_listener_roots() { close_emitted: false, agent_handle: 0, tls: crate::tls_client::TlsOptions::default(), + incoming_handle: 0, }); let mut incoming_listeners = HashMap::new(); diff --git a/crates/perry-ext-http/src/tls_client.rs b/crates/perry-ext-http/src/tls_client.rs index 22b05d19d3..2e1ba3232f 100644 --- a/crates/perry-ext-http/src/tls_client.rs +++ b/crates/perry-ext-http/src/tls_client.rs @@ -82,9 +82,8 @@ impl TlsOptions { &self, pool: Option<(bool, f64, f64)>, ) -> Result { - let mut builder = reqwest::Client::builder() - .user_agent(concat!("perry/", env!("CARGO_PKG_VERSION"))) - .tcp_keepalive(std::time::Duration::from_secs(60)); + let mut builder = + reqwest::Client::builder().tcp_keepalive(std::time::Duration::from_secs(60)); if self.accept_invalid_certs() { builder = builder.danger_accept_invalid_certs(true); diff --git a/crates/perry-hir/src/lower/expr_member.rs b/crates/perry-hir/src/lower/expr_member.rs index 30ca2f5db1..c9530759ca 100644 --- a/crates/perry-hir/src/lower/expr_member.rs +++ b/crates/perry-hir/src/lower/expr_member.rs @@ -1594,6 +1594,8 @@ fn lower_member_inner(ctx: &mut LoweringContext, member: &ast::MemberExpr) -> Re | ("IncomingMessage", "method") | ("IncomingMessage", "url") | ("IncomingMessage", "httpVersion") + | ("IncomingMessage", "httpVersionMajor") + | ("IncomingMessage", "httpVersionMinor") | ("IncomingMessage", "complete") | ("IncomingMessage", "aborted") | ("IncomingMessage", "destroyed") @@ -2807,6 +2809,8 @@ fn is_http_incoming_message_runtime_property_name(prop: &str) -> bool { "method" | "url" | "httpVersion" + | "httpVersionMajor" + | "httpVersionMinor" | "headers" | "rawHeaders" | "headersDistinct" diff --git a/crates/perry-stdlib/src/common/dispatch.rs b/crates/perry-stdlib/src/common/dispatch.rs index e618b4495e..b3f47e5e1c 100644 --- a/crates/perry-stdlib/src/common/dispatch.rs +++ b/crates/perry-stdlib/src/common/dispatch.rs @@ -2042,6 +2042,8 @@ pub unsafe extern "C" fn js_handle_property_dispatch( | "url" | "rawBody" | "httpVersion" + | "httpVersionMajor" + | "httpVersionMinor" | "headers" | "rawHeaders" | "headersDistinct" diff --git a/crates/perry-stdlib/src/http/client_request_surface.rs b/crates/perry-stdlib/src/http/client_request_surface.rs index 30ec0932e5..7be0a8c070 100644 --- a/crates/perry-stdlib/src/http/client_request_surface.rs +++ b/crates/perry-stdlib/src/http/client_request_surface.rs @@ -246,6 +246,19 @@ pub extern "C" fn js_http_client_request_noop_undefined( undefined_value() } +/// Twin of perry-ext-http's `js_http_client_request_flush_headers` for +/// non-auto-optimize links: the stdlib client dispatches the whole exchange +/// at `end()`, so flushHeaders stays a no-op here. +#[no_mangle] +pub extern "C" fn js_http_client_request_flush_headers( + handle: Handle, + _arg0: f64, + _arg1: f64, +) -> f64 { + let _ = handle; + undefined_value() +} + #[no_mangle] pub extern "C" fn js_http_client_request_aborted(handle: Handle) -> f64 { state_bool(handle, "aborted") diff --git a/docs/src/api/reference.md b/docs/src/api/reference.md index 994566740e..25509810ab 100644 --- a/docs/src/api/reference.md +++ b/docs/src/api/reference.md @@ -2,7 +2,7 @@ This page is auto-generated from Perry's compile-time API manifest (`perry-api-manifest::API_MANIFEST`). It is the source of truth for what `perry compile` accepts; references to symbols not listed here produce `R005 UnimplementedApi` (issue #463). Stubs (#464) are flagged ⚠ — they link cleanly but no-op at runtime on the chosen target. -Total: 2800 entries across 115 modules. +Total: 2802 entries across 115 modules. ## Modules @@ -1455,6 +1455,8 @@ Total: 2800 entries across 115 modules. - `__get_headersTimeout` — instance *(class: `HttpServer`)* - `__get_host` — instance *(class: `ClientRequest`)* - `__get_httpVersion` — instance *(class: `IncomingMessage`)* +- `__get_httpVersionMajor` — instance *(class: `IncomingMessage`)* +- `__get_httpVersionMinor` — instance *(class: `IncomingMessage`)* - `__get_keepAlive` — instance *(class: `Agent`)* - `__get_keepAliveMsecs` — instance *(class: `Agent`)* - `__get_keepAliveTimeout` — instance *(class: `HttpServer`)*