diff --git a/crates/openshell-supervisor-network/src/l7/relay.rs b/crates/openshell-supervisor-network/src/l7/relay.rs index 3054a4530..830e3461b 100644 --- a/crates/openshell-supervisor-network/src/l7/relay.rs +++ b/crates/openshell-supervisor-network/src/l7/relay.rs @@ -136,6 +136,43 @@ fn emit_parse_rejection(ctx: &L7EvalContext, detail: &str, engine_type: &str) { emit_activity(ctx, true, "l7_parse_rejection"); } +fn engine_type_for_protocol(protocol: L7Protocol) -> &'static str { + match protocol { + L7Protocol::Graphql => "l7-graphql", + L7Protocol::Websocket => "l7-websocket", + L7Protocol::Rest | L7Protocol::Sql => "l7", + } +} + +async fn deny_h2c_upgrade_if_requested( + req: &crate::l7::provider::L7Request, + config: &L7EndpointConfig, + ctx: &L7EvalContext, + client: &mut C, +) -> Result +where + C: AsyncRead + AsyncWrite + Unpin + Send, +{ + if !crate::l7::rest::request_is_h2c_upgrade(&req.raw_header) { + return Ok(false); + } + + emit_parse_rejection( + ctx, + crate::l7::rest::UNSUPPORTED_H2C_UPGRADE_DETAIL, + engine_type_for_protocol(config.protocol), + ); + crate::l7::rest::RestProvider::default() + .deny( + req, + &ctx.policy_name, + crate::l7::rest::UNSUPPORTED_H2C_UPGRADE_DETAIL, + client, + ) + .await?; + Ok(true) +} + /// Run protocol-aware L7 inspection on a tunnel. /// /// This replaces `copy_bidirectional` for L7-enabled endpoints. @@ -239,6 +276,10 @@ where return Ok(()); }; + if deny_h2c_upgrade_if_requested(&req, config, ctx, client).await? { + return Ok(()); + } + let graphql_info = if config.protocol == L7Protocol::Graphql { match crate::l7::graphql::inspect_graphql_request( client, @@ -662,6 +703,10 @@ where } }; + if deny_h2c_upgrade_if_requested(&req, config, ctx, client).await? { + return Ok(()); + } + if close_if_stale(engine.generation_guard(), ctx) { return Ok(()); } @@ -933,6 +978,10 @@ where let req = parsed.request; let graphql_info = parsed.info; + if deny_h2c_upgrade_if_requested(&req, config, ctx, client).await? { + return Ok(()); + } + if close_if_stale(engine.generation_guard(), ctx) { return Ok(()); } diff --git a/crates/openshell-supervisor-network/src/l7/rest.rs b/crates/openshell-supervisor-network/src/l7/rest.rs index 4f46d24ba..3ae46417f 100644 --- a/crates/openshell-supervisor-network/src/l7/rest.rs +++ b/crates/openshell-supervisor-network/src/l7/rest.rs @@ -22,6 +22,22 @@ use tracing::debug; const MAX_HEADER_BYTES: usize = 16384; // 16 KiB for HTTP headers const MAX_REWRITE_BODY_BYTES: usize = 256 * 1024; const RELAY_BUF_SIZE: usize = 8192; +const HTTP_METHOD_PREFIXES: &[&[u8]] = &[ + b"GET ", + b"HEAD ", + b"POST ", + b"PUT ", + b"DELETE ", + b"PATCH ", + b"OPTIONS ", + b"CONNECT ", + b"TRACE ", +]; +pub(crate) const HTTP2_PRIOR_KNOWLEDGE_PREFACE: &[u8] = b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n"; +pub(crate) const UNSUPPORTED_H2C_UPGRADE_DETAIL: &str = + "HTTP/2 cleartext upgrade (h2c) is not supported for L7-inspected endpoints"; +const MIN_HTTP2_PREFACE_DETECTION_BYTES: usize = 8; + /// Idle timeout for `relay_until_eof`. If no data arrives within this window /// the body is considered complete. Prevents blocking on servers that keep /// the TCP connection alive after the response body (common with CDN keep-alive). @@ -913,6 +929,36 @@ pub(crate) fn request_is_websocket_upgrade(raw_header: &[u8]) -> bool { validate_websocket_upgrade_request(&raw_header[..header_end]).unwrap_or(false) } +pub(crate) fn request_is_h2c_upgrade(raw_header: &[u8]) -> bool { + let header_end = raw_header + .windows(4) + .position(|w| w == b"\r\n\r\n") + .map_or(raw_header.len(), |p| p + 4); + let Ok(header_str) = std::str::from_utf8(&raw_header[..header_end]) else { + return false; + }; + + let mut upgrade_h2c = false; + let mut connection_upgrade = false; + + for line in header_str.lines().skip(1) { + let Some((name, value)) = line.split_once(':') else { + continue; + }; + let name = name.trim(); + let value = value.trim(); + if name.eq_ignore_ascii_case("upgrade") && header_value_contains_token(value, "h2c") { + upgrade_h2c = true; + } + if name.eq_ignore_ascii_case("connection") && header_value_contains_token(value, "upgrade") + { + connection_upgrade = true; + } + } + + upgrade_h2c && connection_upgrade +} + fn rewrite_websocket_extensions_for_mode( raw_header: &[u8], mode: WebSocketExtensionMode, @@ -1962,18 +2008,27 @@ where /// /// Checks for common HTTP methods at the start of the stream. pub fn looks_like_http(peek: &[u8]) -> bool { - const METHODS: &[&[u8]] = &[ - b"GET ", - b"HEAD ", - b"POST ", - b"PUT ", - b"DELETE ", - b"PATCH ", - b"OPTIONS ", - b"CONNECT ", - b"TRACE ", - ]; - METHODS.iter().any(|m| peek.starts_with(m)) + HTTP_METHOD_PREFIXES + .iter() + .any(|method| peek.starts_with(method)) +} + +pub(crate) fn could_be_http_request_prefix(peek: &[u8]) -> bool { + !peek.is_empty() + && HTTP_METHOD_PREFIXES + .iter() + .any(|method| peek.len() < method.len() && method.starts_with(peek)) +} + +pub fn looks_like_http2_prior_knowledge(peek: &[u8]) -> bool { + peek.len() >= MIN_HTTP2_PREFACE_DETECTION_BYTES + && HTTP2_PRIOR_KNOWLEDGE_PREFACE.starts_with(peek) +} + +pub(crate) fn could_be_http2_prior_knowledge_prefix(peek: &[u8]) -> bool { + !peek.is_empty() + && peek.len() < MIN_HTTP2_PREFACE_DETECTION_BYTES + && HTTP2_PRIOR_KNOWLEDGE_PREFACE.starts_with(peek) } /// Check if an IO error represents a benign connection close. @@ -2919,10 +2974,26 @@ mod tests { assert!(looks_like_http(b"GET / HTTP/1.1\r\n")); assert!(looks_like_http(b"POST /api HTTP/1.1\r\n")); assert!(looks_like_http(b"DELETE /foo HTTP/1.1\r\n")); + assert!(could_be_http_request_prefix(b"GE")); + assert!(!could_be_http_request_prefix(b"GET ")); assert!(!looks_like_http(b"\x00\x00\x00\x08")); // Postgres + assert!(!looks_like_http(HTTP2_PRIOR_KNOWLEDGE_PREFACE)); assert!(!looks_like_http(b"HELLO")); // Unknown } + #[test] + fn http2_prior_knowledge_detection() { + assert!(looks_like_http2_prior_knowledge( + HTTP2_PRIOR_KNOWLEDGE_PREFACE + )); + assert!(looks_like_http2_prior_knowledge( + &HTTP2_PRIOR_KNOWLEDGE_PREFACE[..8] + )); + assert!(could_be_http2_prior_knowledge_prefix(b"PRI * H")); + assert!(!looks_like_http2_prior_knowledge(b"PRI * H")); + assert!(!looks_like_http2_prior_knowledge(b"PRI / HTTP/1.1\r\n")); + } + #[test] fn test_parse_status_code() { assert_eq!( @@ -4160,6 +4231,20 @@ mod tests { assert!(!validate_websocket_upgrade_request(raw).expect("h2c request should parse")); } + #[test] + fn h2c_upgrade_detection_requires_upgrade_token_and_connection_upgrade() { + let raw = b"GET /h2c HTTP/1.1\r\nHost: example.com\r\nUpgrade: h2c\r\nConnection: keep-alive, Upgrade\r\nHTTP2-Settings: AAMAAABkAAQAAP__\r\n\r\n"; + assert!(request_is_h2c_upgrade(raw)); + + let missing_connection = b"GET /h2c HTTP/1.1\r\nHost: example.com\r\nUpgrade: h2c\r\n\r\n"; + assert!(!request_is_h2c_upgrade(missing_connection)); + + let websocket = format!( + "GET /ws HTTP/1.1\r\nHost: example.com\r\nUpgrade: websocket\r\nConnection: Upgrade\r\nSec-WebSocket-Key: {VALID_WS_KEY}\r\nSec-WebSocket-Version: 13\r\n\r\n" + ); + assert!(!request_is_h2c_upgrade(websocket.as_bytes())); + } + #[test] fn strip_websocket_extensions_removes_extension_negotiation() { let raw = format!( diff --git a/crates/openshell-supervisor-network/src/proxy.rs b/crates/openshell-supervisor-network/src/proxy.rs index d467b022e..691382469 100644 --- a/crates/openshell-supervisor-network/src/proxy.rs +++ b/crates/openshell-supervisor-network/src/proxy.rs @@ -31,6 +31,15 @@ use tokio::task::JoinHandle; use tracing::{debug, warn}; const MAX_HEADER_BYTES: usize = 8192; +const TUNNEL_PROTOCOL_PEEK_BYTES: usize = crate::l7::rest::HTTP2_PRIOR_KNOWLEDGE_PREFACE.len(); +#[cfg(not(test))] +const TUNNEL_PROTOCOL_PEEK_TIMEOUT: std::time::Duration = std::time::Duration::from_millis(100); +#[cfg(test)] +const TUNNEL_PROTOCOL_PEEK_TIMEOUT: std::time::Duration = std::time::Duration::from_millis(10); +#[cfg(not(test))] +const TUNNEL_PROTOCOL_PEEK_POLL: std::time::Duration = std::time::Duration::from_millis(5); +#[cfg(test)] +const TUNNEL_PROTOCOL_PEEK_POLL: std::time::Duration = std::time::Duration::from_millis(1); const INFERENCE_LOCAL_HOST: &str = "inference.local"; const INFERENCE_LOCAL_PORT: u16 = 443; @@ -318,6 +327,80 @@ fn l7_inspection_active(l7_route: Option<&L7RouteSnapshot>) -> bool { l7_route.is_some_and(|route| !route.configs.is_empty()) } +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +enum TunnelProtocol { + Tls, + Http1, + H2cPriorKnowledge, + Unsupported, +} + +fn classify_tunnel_protocol(peek: &[u8]) -> TunnelProtocol { + if crate::l7::tls::looks_like_tls(peek) { + return TunnelProtocol::Tls; + } + if crate::l7::rest::looks_like_http(peek) { + return TunnelProtocol::Http1; + } + if crate::l7::rest::looks_like_http2_prior_knowledge(peek) { + return TunnelProtocol::H2cPriorKnowledge; + } + TunnelProtocol::Unsupported +} + +fn could_be_tls_prefix(peek: &[u8]) -> bool { + matches!(peek, [0x16] | [0x16, 0x03]) +} + +fn could_be_supported_tunnel_protocol_prefix(peek: &[u8]) -> bool { + could_be_tls_prefix(peek) + || crate::l7::rest::could_be_http_request_prefix(peek) + || crate::l7::rest::could_be_http2_prior_knowledge_prefix(peek) +} + +fn unsupported_l7_tunnel_protocol_detail( + tunnel_protocol: TunnelProtocol, + should_inspect_l7: bool, +) -> Option<&'static str> { + if !should_inspect_l7 { + return None; + } + + match tunnel_protocol { + TunnelProtocol::H2cPriorKnowledge => { + Some("HTTP/2 prior-knowledge (h2c) is not supported for L7-inspected endpoints") + } + TunnelProtocol::Unsupported => { + Some("Unsupported tunnel protocol for L7-inspected endpoint") + } + TunnelProtocol::Tls | TunnelProtocol::Http1 => None, + } +} + +async fn peek_tunnel_protocol(client: &TcpStream) -> Result> { + let mut peek_buf = [0u8; TUNNEL_PROTOCOL_PEEK_BYTES]; + let deadline = tokio::time::Instant::now() + TUNNEL_PROTOCOL_PEEK_TIMEOUT; + + loop { + let n = client.peek(&mut peek_buf).await.into_diagnostic()?; + if n == 0 { + return Ok(None); + } + + let peek = &peek_buf[..n]; + let protocol = classify_tunnel_protocol(peek); + if protocol != TunnelProtocol::Unsupported + || !could_be_supported_tunnel_protocol_prefix(peek) + || n == peek_buf.len() + || tokio::time::Instant::now() >= deadline + { + return Ok(Some(protocol)); + } + + tokio::time::sleep(TUNNEL_PROTOCOL_PEEK_POLL).await; + } +} + fn emit_connect_activity_if_l4_only( tx: &Option, l7_route: Option<&L7RouteSnapshot>, @@ -985,17 +1068,14 @@ async fn handle_tcp_connection( return Ok(()); } - // Auto-detect TLS by peeking the first bytes. - let mut peek_buf = [0u8; 8]; - let n = client.peek(&mut peek_buf).await.into_diagnostic()?; - if n == 0 { + // Auto-detect the tunnel payload. L7-configured endpoints must only + // enter relays that can enforce their configured protocol; unsupported + // bytes fail closed below instead of falling through to raw relay. + let Some(tunnel_protocol) = peek_tunnel_protocol(&client).await? else { return Ok(()); - } - - let is_tls = crate::l7::tls::looks_like_tls(&peek_buf[..n]); - let is_http = crate::l7::rest::looks_like_http(&peek_buf[..n]); + }; - if is_tls { + if tunnel_protocol == TunnelProtocol::Tls { // TLS detected — terminate unconditionally. if let Some(ref tls) = tls_state { let tls_result = async { @@ -1095,7 +1175,7 @@ async fn handle_tcp_connection( .await .into_diagnostic()?; } - } else if is_http { + } else if tunnel_protocol == TunnelProtocol::Http1 { // Plaintext HTTP detected. if let Some(route) = l7_route.as_ref().filter(|route| !route.configs.is_empty()) { let tunnel_engine = match opa_engine.clone_engine_for_tunnel(route.generation) { @@ -1178,6 +1258,51 @@ async fn handle_tcp_connection( } } } else { + if let Some(protocol_detail) = + unsupported_l7_tunnel_protocol_detail(tunnel_protocol, should_inspect_l7) + { + let event = NetworkActivityBuilder::new(openshell_ocsf::ctx::ctx()) + .activity(ActivityId::Open) + .action(ActionId::Denied) + .disposition(DispositionId::Blocked) + .severity(SeverityId::Medium) + .status(StatusId::Failure) + .dst_endpoint(Endpoint::from_domain(&host_lc, port)) + .src_endpoint_addr(peer_addr.ip(), peer_addr.port()) + .actor_process( + Process::from_bypass(&binary_str, &pid_str, &ancestors_str) + .with_cmd_line(&cmdline_str), + ) + .firewall_rule(policy_str, "l7") + .message(format!( + "CONNECT_L7 blocked unsupported tunnel protocol for {host_lc}:{port}" + )) + .status_detail(protocol_detail) + .build(); + ocsf_emit!(event); + emit_activity_simple(activity_tx.as_ref(), true, "l7_parse_rejection"); + emit_denial( + &denial_tx, + &host_lc, + port, + &binary_str, + &decision, + protocol_detail, + "connect-l7-parse-rejection", + ); + respond( + &mut client, + &build_json_error_response( + 403, + "Forbidden", + "unsupported_l7_protocol", + protocol_detail, + ), + ) + .await?; + return Ok(()); + } + // Neither TLS nor HTTP — raw binary relay. debug!( host = %host_lc, @@ -3336,6 +3461,52 @@ async fn handle_forward_proxy( .await?; return Ok(()); }; + if crate::l7::rest::request_is_h2c_upgrade(&forward_request_bytes) { + let event = HttpActivityBuilder::new(openshell_ocsf::ctx::ctx()) + .activity(ActivityId::Other) + .action(ActionId::Denied) + .disposition(DispositionId::Blocked) + .severity(SeverityId::Medium) + .status(StatusId::Failure) + .http_request(HttpRequest::new( + method, + OcsfUrl::new("http", &host_lc, &path, port), + )) + .dst_endpoint(Endpoint::from_domain(&host_lc, port)) + .src_endpoint(Endpoint::from_ip(peer_addr.ip(), peer_addr.port())) + .actor_process( + Process::from_bypass(&binary_str, &pid_str, &ancestors_str) + .with_cmd_line(&cmdline_str), + ) + .firewall_rule(policy_str, "l7") + .message(format!( + "FORWARD_L7 denied unsupported h2c upgrade for {method} {host_lc}:{port}{path}" + )) + .status_detail(crate::l7::rest::UNSUPPORTED_H2C_UPGRADE_DETAIL) + .build(); + ocsf_emit!(event); + emit_activity_simple(activity_tx, true, "l7_parse_rejection"); + emit_denial_simple( + denial_tx, + &host_lc, + port, + &binary_str, + &decision, + crate::l7::rest::UNSUPPORTED_H2C_UPGRADE_DETAIL, + "forward-l7-parse-rejection", + ); + respond( + client, + &build_json_error_response( + 403, + "Forbidden", + "unsupported_l7_protocol", + crate::l7::rest::UNSUPPORTED_H2C_UPGRADE_DETAIL, + ), + ) + .await?; + return Ok(()); + } forward_websocket_request = crate::l7::rest::request_is_websocket_upgrade(&forward_request_bytes); websocket_extensions = crate::l7::relay::websocket_extension_mode(&l7_config.config); @@ -4098,6 +4269,123 @@ mod tests { } } + #[test] + fn tunnel_protocol_classification_detects_supported_protocols() { + assert_eq!( + classify_tunnel_protocol(&[0x16, 0x03, 0x03, 0x00]), + TunnelProtocol::Tls + ); + assert_eq!( + classify_tunnel_protocol(b"GET / HTTP/1.1\r\n"), + TunnelProtocol::Http1 + ); + assert_eq!( + classify_tunnel_protocol(&crate::l7::rest::HTTP2_PRIOR_KNOWLEDGE_PREFACE[..8]), + TunnelProtocol::H2cPriorKnowledge + ); + assert_eq!( + classify_tunnel_protocol(b"SSH-2.0-OpenSSH\r\n"), + TunnelProtocol::Unsupported + ); + } + + #[test] + fn tunnel_protocol_prefix_detection_waits_for_partial_supported_prefixes() { + assert!(could_be_supported_tunnel_protocol_prefix(&[0x16])); + assert!(could_be_supported_tunnel_protocol_prefix(b"GE")); + assert!(could_be_supported_tunnel_protocol_prefix(b"PRI * H")); + assert!(!could_be_supported_tunnel_protocol_prefix(b"SSH")); + } + + #[tokio::test] + async fn h2c_prior_knowledge_is_blocked_for_l7_tunnel() { + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let addr = listener.local_addr().unwrap(); + let mut client = TcpStream::connect(addr).await.unwrap(); + let (server, _) = listener.accept().await.unwrap(); + + client + .write_all(crate::l7::rest::HTTP2_PRIOR_KNOWLEDGE_PREFACE) + .await + .unwrap(); + + let protocol = peek_tunnel_protocol(&server) + .await + .expect("peek should succeed") + .expect("client sent bytes"); + assert_eq!(protocol, TunnelProtocol::H2cPriorKnowledge); + assert_eq!( + unsupported_l7_tunnel_protocol_detail(protocol, true), + Some("HTTP/2 prior-knowledge (h2c) is not supported for L7-inspected endpoints") + ); + assert_eq!(unsupported_l7_tunnel_protocol_detail(protocol, false), None); + } + + #[tokio::test] + async fn h2c_upgrade_request_on_l7_relay_is_denied_without_upstream_write() { + let data = r#" +network_policies: + rest_api: + name: rest_api + endpoints: + - host: h2c.example.test + port: 80 + path: "/allowed" + protocol: rest + enforcement: enforce + rules: + - allow: + method: GET + path: "/allowed" + binaries: + - { path: /usr/bin/node } +"#; + let (config, tunnel_engine, ctx) = + forward_websocket_policy_parts(data, "h2c.example.test", 80, "/allowed", "rest_api"); + let (mut app, mut proxy_client) = tokio::io::duplex(8192); + let (mut proxy_upstream, mut upstream) = tokio::io::duplex(8192); + let request = b"GET /allowed HTTP/1.1\r\n\ + Host: h2c.example.test\r\n\ + Upgrade: h2c\r\n\ + Connection: keep-alive, Upgrade\r\n\ + HTTP2-Settings: AAMAAABkAAQAAP__\r\n\r\n"; + + app.write_all(request).await.unwrap(); + app.shutdown().await.unwrap(); + + crate::l7::relay::relay_with_inspection( + &config, + tunnel_engine, + &mut proxy_client, + &mut proxy_upstream, + &ctx, + ) + .await + .expect("h2c upgrade should be handled as a policy denial"); + + drop(proxy_client); + drop(proxy_upstream); + + let mut response = Vec::new(); + app.read_to_end(&mut response).await.unwrap(); + let response = String::from_utf8(response).unwrap(); + assert!( + response.starts_with("HTTP/1.1 403 Forbidden\r\n"), + "expected h2c upgrade to be denied, got: {response}" + ); + assert!( + response.contains(crate::l7::rest::UNSUPPORTED_H2C_UPGRADE_DETAIL), + "denial should explain unsupported h2c upgrade, got: {response}" + ); + + let mut leaked = Vec::new(); + upstream.read_to_end(&mut leaked).await.unwrap(); + assert!( + leaked.is_empty(), + "h2c upgrade request must not be written to upstream" + ); + } + #[test] fn connect_activity_is_skipped_when_l7_will_count_the_request() { let (tx, mut rx) = mpsc::channel(4);