diff --git a/Cargo.toml b/Cargo.toml index 86025646a..94817fcf2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -75,7 +75,7 @@ toml = "0.8" apollo-parser = "0.8.5" # HTTP client -reqwest = { version = "0.12", default-features = false, features = ["json", "rustls-tls-native-roots"] } +reqwest = { version = "0.12", default-features = false, features = ["blocking", "json", "rustls-tls-native-roots"] } # WebSocket tokio-tungstenite = { version = "0.26", features = ["rustls-tls-native-roots"] } diff --git a/crates/openshell-supervisor-network/src/inspection.rs b/crates/openshell-supervisor-network/src/inspection.rs new file mode 100644 index 000000000..093779692 --- /dev/null +++ b/crates/openshell-supervisor-network/src/inspection.rs @@ -0,0 +1,336 @@ +// SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +//! Minimal inspection primitives for runtime-boundary experimentation. +//! +//! This module intentionally does not introduce control-plane configuration or +//! plugin registration. It provides a small decision vocabulary that the +//! supervisor network path can invoke when a caller wires in an inspector. + +use crate::l7::provider::L7Request; +use crate::l7::relay::L7EvalContext; +use miette::{Result, miette}; +use serde::{Deserialize, Serialize}; +use serde_json::Value as Json; +use std::sync::{Arc, LazyLock}; + +static REQUEST_INSPECTOR: LazyLock>> = LazyLock::new(|| { + DemoInspectorClient::from_env().map(|client| { + let inspector: Arc = Arc::new(client); + inspector + }) +}); + +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +#[serde(tag = "kind", rename_all = "snake_case")] +pub enum InspectionTarget { + LlmRequest { + provider: String, + request: Json, + }, + ToolRequest { + tool_name: String, + input: Json, + }, + HttpRequest { + method: String, + path: String, + headers: Vec<(String, String)>, + body: Vec, + }, +} + +#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)] +pub struct InspectionContext { + pub sandbox_id: Option, + pub scope_id: Option, + pub provider: Option, +} + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct Finding { + pub code: String, + pub message: String, +} + +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +#[serde(tag = "kind", rename_all = "snake_case")] +pub enum InspectionDecision { + Allow, + Deny { + reason: String, + findings: Vec, + }, + Mutate { + target: InspectionTarget, + findings: Vec, + }, +} + +pub trait Inspector: Send + Sync { + fn inspect( + &self, + target: InspectionTarget, + ctx: &InspectionContext, + ) -> Result; +} + +#[derive(Debug, Serialize)] +struct InspectionRequestEnvelope<'a> { + target: &'a InspectionTarget, + context: &'a InspectionContext, +} + +#[derive(Clone)] +pub struct DemoInspectorClient { + endpoint: String, + client: reqwest::blocking::Client, +} + +impl DemoInspectorClient { + fn from_env() -> Option { + let endpoint = std::env::var("OPENSHELL_REQUEST_INSPECTOR_URL") + .ok() + .map(|value| value.trim().to_string()) + .filter(|value| !value.is_empty())?; + let client = reqwest::blocking::Client::builder() + .timeout(std::time::Duration::from_secs(5)) + .build() + .expect("request inspector client configuration should be valid"); + Some(Self { endpoint, client }) + } +} + +impl Inspector for DemoInspectorClient { + fn inspect( + &self, + target: InspectionTarget, + ctx: &InspectionContext, + ) -> Result { + let response = self + .client + .post(&self.endpoint) + .json(&InspectionRequestEnvelope { + target: &target, + context: ctx, + }) + .send() + .map_err(|error| miette!("request inspector call failed: {error}"))?; + if !response.status().is_success() { + return Err(miette!( + "request inspector returned non-success status {}", + response.status() + )); + } + response + .json::() + .map_err(|error| miette!("request inspector response decode failed: {error}")) + } +} + +pub fn request_inspector_from_env() -> Option> { + REQUEST_INSPECTOR.clone() +} + +#[allow(dead_code)] +pub(crate) enum HttpInspectionOutcome { + Allow, + Deny { + reason: String, + findings: Vec, + }, + Mutate { + findings: Vec, + }, +} + +pub(crate) fn inspect_http_request( + req: &mut L7Request, + ctx: &L7EvalContext, +) -> Result { + let Some(inspector) = ctx.request_inspector.as_ref() else { + return Ok(HttpInspectionOutcome::Allow); + }; + + let headers = parse_headers(&req.raw_header)?; + let decision = inspector.inspect( + InspectionTarget::HttpRequest { + method: req.action.clone(), + path: req.target.clone(), + headers, + body: Vec::new(), + }, + &InspectionContext::default(), + )?; + + match decision { + InspectionDecision::Allow => Ok(HttpInspectionOutcome::Allow), + InspectionDecision::Deny { reason, findings } => { + Ok(HttpInspectionOutcome::Deny { reason, findings }) + } + InspectionDecision::Mutate { target, findings } => match target { + InspectionTarget::HttpRequest { + method, + path, + headers, + .. + } => { + if method != req.action { + return Err(miette!( + "http inspection mutation attempted to rewrite method after policy evaluation" + )); + } + if path != req.target { + return Err(miette!( + "http inspection mutation attempted to rewrite path after policy evaluation" + )); + } + rewrite_headers(req, &headers)?; + Ok(HttpInspectionOutcome::Mutate { findings }) + } + other => Err(miette!( + "http inspection returned non-http target for mutation: {other:?}" + )), + }, + } +} + +fn header_end(raw: &[u8]) -> Result { + raw.windows(4) + .position(|window| window == b"\r\n\r\n") + .map(|index| index + 4) + .ok_or_else(|| miette!("http request headers missing CRLF terminator")) +} + +fn parse_request_line(raw: &[u8]) -> Result<(String, String, String)> { + let eol = raw + .windows(2) + .position(|window| window == b"\r\n") + .ok_or_else(|| miette!("http request line missing CRLF"))?; + let line = std::str::from_utf8(&raw[..eol]).map_err(|error| miette!(error.to_string()))?; + let mut parts = line.split_whitespace(); + let method = parts + .next() + .ok_or_else(|| miette!("missing http method"))? + .to_string(); + let path = parts + .next() + .ok_or_else(|| miette!("missing http target"))? + .to_string(); + let version = parts + .next() + .ok_or_else(|| miette!("missing http version"))? + .to_string(); + Ok((method, path, version)) +} + +fn parse_headers(raw: &[u8]) -> Result> { + let header_end = header_end(raw)?; + let header_str = + std::str::from_utf8(&raw[..header_end]).map_err(|error| miette!(error.to_string()))?; + let mut headers = Vec::new(); + for line in header_str.lines().skip(1) { + let line = line.trim_end_matches('\r'); + if line.is_empty() { + break; + } + let Some((name, value)) = line.split_once(':') else { + return Err(miette!("malformed http header line")); + }; + headers.push((name.trim().to_string(), value.trim().to_string())); + } + Ok(headers) +} + +fn rewrite_headers(req: &mut L7Request, headers: &[(String, String)]) -> Result<()> { + let header_end = header_end(&req.raw_header)?; + let (_, _, version) = parse_request_line(&req.raw_header)?; + let overflow = req.raw_header[header_end..].to_vec(); + + let mut raw = format!("{} {} {}\r\n", req.action, req.target, version).into_bytes(); + for (name, value) in headers { + raw.extend_from_slice(name.as_bytes()); + raw.extend_from_slice(b": "); + raw.extend_from_slice(value.as_bytes()); + raw.extend_from_slice(b"\r\n"); + } + raw.extend_from_slice(b"\r\n"); + raw.extend_from_slice(&overflow); + req.raw_header = raw; + Ok(()) +} + +#[cfg(test)] +mod tests { + use super::*; + use std::io::{Read, Write}; + use std::net::TcpListener; + use std::thread; + + fn spawn_demo_server(response_body: &'static str) -> String { + let listener = TcpListener::bind("127.0.0.1:0").expect("bind demo listener"); + let addr = listener.local_addr().expect("local addr"); + thread::spawn(move || { + let (mut stream, _) = listener.accept().expect("accept inspector client"); + let mut buffer = [0u8; 4096]; + let _ = stream.read(&mut buffer).expect("read request"); + let response = format!( + "HTTP/1.1 200 OK\r\ncontent-type: application/json\r\ncontent-length: {}\r\n\r\n{}", + response_body.len(), + response_body + ); + stream + .write_all(response.as_bytes()) + .expect("write response"); + }); + format!("http://{addr}") + } + + #[test] + fn demo_inspector_client_round_trips_mutation() { + let endpoint = spawn_demo_server( + r#"{"kind":"mutate","target":{"kind":"http_request","method":"GET","path":"/demo","headers":[["x-inspected","true"]],"body":[]},"findings":[{"code":"header_injected","message":"added demo header"}]}"#, + ); + let client = DemoInspectorClient { + endpoint, + client: reqwest::blocking::Client::builder() + .timeout(std::time::Duration::from_secs(5)) + .build() + .expect("client"), + }; + + let decision = client + .inspect( + InspectionTarget::HttpRequest { + method: "GET".to_string(), + path: "/demo".to_string(), + headers: Vec::new(), + body: Vec::new(), + }, + &InspectionContext::default(), + ) + .expect("inspection should succeed"); + + match decision { + InspectionDecision::Mutate { target, findings } => { + assert_eq!( + findings, + vec![Finding { + code: "header_injected".to_string(), + message: "added demo header".to_string(), + }] + ); + assert_eq!( + target, + InspectionTarget::HttpRequest { + method: "GET".to_string(), + path: "/demo".to_string(), + headers: vec![("x-inspected".to_string(), "true".to_string())], + body: Vec::new(), + } + ); + } + other => panic!("expected mutation decision, got {other:?}"), + } + } +} diff --git a/crates/openshell-supervisor-network/src/l7/graphql.rs b/crates/openshell-supervisor-network/src/l7/graphql.rs index 82c35720e..f5ca728ac 100644 --- a/crates/openshell-supervisor-network/src/l7/graphql.rs +++ b/crates/openshell-supervisor-network/src/l7/graphql.rs @@ -804,6 +804,7 @@ network_policies: activity_tx: None, dynamic_credentials: None, token_grant_resolver: None, + request_inspector: None, }; let request_info = crate::l7::L7RequestInfo { action: req.action, diff --git a/crates/openshell-supervisor-network/src/l7/relay.rs b/crates/openshell-supervisor-network/src/l7/relay.rs index 3054a4530..7671a1425 100644 --- a/crates/openshell-supervisor-network/src/l7/relay.rs +++ b/crates/openshell-supervisor-network/src/l7/relay.rs @@ -11,6 +11,7 @@ use crate::l7::provider::{L7Provider, RelayOutcome}; use crate::l7::rest::WebSocketExtensionMode; use crate::l7::{EnforcementMode, L7EndpointConfig, L7Protocol, L7RequestInfo}; use crate::opa::{PolicyGenerationGuard, TunnelPolicyEngine}; +use crate::{inspection, inspection::HttpInspectionOutcome}; use miette::{IntoDiagnostic, Result, miette}; use openshell_core::activity::{ActivitySender, try_record_activity}; use openshell_core::secrets::{self, SecretResolver}; @@ -51,6 +52,9 @@ pub struct L7EvalContext { /// Dynamic token grant resolver for endpoint-bound credentials. pub(crate) token_grant_resolver: Option>, + /// Optional request inspector invoked on parsed HTTP requests before + /// credential or token-grant injection. + pub request_inspector: Option>, } #[derive(Default)] @@ -356,6 +360,28 @@ where let _ = &eval_target; if allowed || (config.enforcement == EnforcementMode::Audit && !force_deny) { + match inspection::inspect_http_request(&mut req, ctx)? { + HttpInspectionOutcome::Allow => {} + HttpInspectionOutcome::Mutate { .. } => {} + HttpInspectionOutcome::Deny { reason, .. } => { + crate::l7::rest::RestProvider::default() + .deny_with_redacted_target( + &req, + &ctx.policy_name, + &reason, + client, + Some(&redacted_target), + Some(crate::l7::rest::DenyResponseContext { + host: Some(&ctx.host), + port: Some(ctx.port), + binary: Some(&ctx.binary_path), + }), + ) + .await?; + return Ok(()); + } + } + let outcome = crate::l7::rest::relay_http_request_with_options_guarded( &req, client, @@ -780,6 +806,29 @@ where let _ = &eval_target; if allowed || config.enforcement == EnforcementMode::Audit { + let mut req = req; + match inspection::inspect_http_request(&mut req, ctx)? { + HttpInspectionOutcome::Allow => {} + HttpInspectionOutcome::Mutate { .. } => {} + HttpInspectionOutcome::Deny { reason, .. } => { + provider + .deny_with_redacted_target( + &req, + &ctx.policy_name, + &reason, + client, + Some(&redacted_target), + Some(crate::l7::rest::DenyResponseContext { + host: Some(&ctx.host), + port: Some(ctx.port), + binary: Some(&ctx.binary_path), + }), + ) + .await?; + return Ok(()); + } + } + let req_with_auth = match crate::l7::token_grant_injection::inject_if_needed(req, ctx).await { Ok(req) => req, @@ -1221,7 +1270,7 @@ where } // Read next request from client. - let req = match provider.parse_request(client).await { + let mut req = match provider.parse_request(client).await { Ok(Some(req)) => req, Ok(None) => break, // Client closed connection. Err(e) => { @@ -1284,6 +1333,17 @@ where ocsf_emit!(event); } + match inspection::inspect_http_request(&mut req, ctx)? { + HttpInspectionOutcome::Allow => {} + HttpInspectionOutcome::Mutate { .. } => {} + HttpInspectionOutcome::Deny { reason, .. } => { + provider + .deny(&req, &ctx.policy_name, &reason, client) + .await?; + return Ok(()); + } + } + let req_with_auth = match crate::l7::token_grant_injection::inject_if_needed(req, ctx).await { Ok(req) => req, @@ -1354,8 +1414,12 @@ where #[cfg(test)] mod tests { use super::*; + use crate::inspection::{ + Finding, InspectionContext, InspectionDecision, InspectionTarget, Inspector, + }; use crate::opa::{NetworkInput, OpaEngine}; use std::path::PathBuf; + use std::sync::Arc; use tokio::io::{AsyncRead, AsyncReadExt, AsyncWriteExt}; const TEST_POLICY: &str = include_str!("../../data/sandbox-policy.rego"); @@ -1424,6 +1488,7 @@ network_policies: activity_tx: None, dynamic_credentials: Some(fixture.dynamic_credentials()), token_grant_resolver: Some(fixture.resolver()), + request_inspector: None, }; (config, tunnel_engine, ctx, fixture) @@ -1467,6 +1532,7 @@ network_policies: activity_tx: None, dynamic_credentials: Some(fixture.dynamic_credentials()), token_grant_resolver: Some(fixture.resolver()), + request_inspector: None, }; (generation_guard, ctx, fixture) @@ -1482,6 +1548,61 @@ network_policies: .count() } + struct HeaderMutationInspector; + + impl Inspector for HeaderMutationInspector { + fn inspect( + &self, + target: InspectionTarget, + _ctx: &InspectionContext, + ) -> Result { + match target { + InspectionTarget::HttpRequest { + method, + path, + mut headers, + body, + } => { + headers.push(("X-Inspected".to_string(), "true".to_string())); + Ok(InspectionDecision::Mutate { + target: InspectionTarget::HttpRequest { + method, + path, + headers, + body, + }, + findings: vec![Finding { + code: "header_added".to_string(), + message: "inspector added header".to_string(), + }], + }) + } + other => Err(miette!("unexpected inspection target: {other:?}")), + } + } + } + + struct DenyInspector; + + impl Inspector for DenyInspector { + fn inspect( + &self, + target: InspectionTarget, + _ctx: &InspectionContext, + ) -> Result { + match target { + InspectionTarget::HttpRequest { .. } => Ok(InspectionDecision::Deny { + reason: "blocked by test inspector".to_string(), + findings: vec![Finding { + code: "blocked".to_string(), + message: "inspector denied request".to_string(), + }], + }), + other => Err(miette!("unexpected inspection target: {other:?}")), + } + } + } + #[test] fn parse_rejection_detail_adds_l7_hint_for_encoded_slash() { let detail = parse_rejection_detail( @@ -1743,6 +1864,106 @@ network_policies: fixture.assert_one_request("api.example.test\t8080\t/v1/**\tprovider:access_token"); } + #[tokio::test] + async fn passthrough_relay_inspector_denies_before_upstream_forward() { + let (generation_guard, mut ctx, _fixture) = + passthrough_token_grant_relay_context(Ok("grant-token")); + ctx.request_inspector = Some(Arc::new(DenyInspector)); + let (mut app, mut relay_client) = tokio::io::duplex(8192); + let (mut relay_upstream, mut upstream) = tokio::io::duplex(8192); + let relay = tokio::spawn(async move { + relay_passthrough_with_credentials( + &mut relay_client, + &mut relay_upstream, + &ctx, + &generation_guard, + ) + .await + }); + + app.write_all( + b"GET /v1/projects HTTP/1.1\r\nHost: api.example.test\r\nConnection: close\r\n\r\n", + ) + .await + .unwrap(); + + let mut client_response = [0u8; 512]; + let n = tokio::time::timeout( + std::time::Duration::from_secs(1), + app.read(&mut client_response), + ) + .await + .expect("deny response should reach client") + .unwrap(); + let client_response = String::from_utf8_lossy(&client_response[..n]); + assert!(client_response.contains("403 Forbidden")); + assert!(client_response.contains("blocked by test inspector")); + + let mut upstream_request = [0u8; 128]; + let n = tokio::time::timeout( + std::time::Duration::from_secs(1), + upstream.read(&mut upstream_request), + ) + .await + .expect("upstream should close without forwarded data") + .unwrap(); + assert_eq!(n, 0, "denied request must not reach upstream"); + + tokio::time::timeout(std::time::Duration::from_secs(1), relay) + .await + .expect("relay should finish") + .unwrap() + .unwrap(); + } + + #[tokio::test] + async fn l7_rest_relay_inspector_mutates_headers_before_forwarding() { + let (config, tunnel_engine, mut ctx, _fixture) = + rest_token_grant_relay_context(Ok("grant-token")); + ctx.request_inspector = Some(Arc::new(HeaderMutationInspector)); + let (mut app, mut relay_client) = tokio::io::duplex(8192); + let (mut relay_upstream, mut upstream) = tokio::io::duplex(8192); + let relay = tokio::spawn(async move { + relay_with_inspection( + &config, + tunnel_engine, + &mut relay_client, + &mut relay_upstream, + &ctx, + ) + .await + }); + + app.write_all( + b"GET /v1/projects HTTP/1.1\r\nHost: api.example.test\r\nConnection: close\r\n\r\n", + ) + .await + .unwrap(); + + let mut upstream_request = [0u8; 1024]; + let n = tokio::time::timeout( + std::time::Duration::from_secs(1), + upstream.read(&mut upstream_request), + ) + .await + .expect("request should reach upstream") + .unwrap(); + let upstream_request = String::from_utf8_lossy(&upstream_request[..n]); + assert!(upstream_request.contains("X-Inspected: true\r\n")); + assert!(upstream_request.contains("Authorization: Bearer grant-token\r\n")); + + upstream + .write_all(b"HTTP/1.1 204 No Content\r\nContent-Length: 0\r\nConnection: close\r\n\r\n") + .await + .unwrap(); + + tokio::time::timeout(std::time::Duration::from_secs(1), relay) + .await + .expect("relay should finish") + .unwrap() + .unwrap(); + } + #[test] fn websocket_text_policy_requires_explicit_message_rule() { let data = r#" @@ -1786,6 +2007,7 @@ network_policies: activity_tx: None, dynamic_credentials: None, token_grant_resolver: None, + request_inspector: None, }; let request = L7RequestInfo { action: "WEBSOCKET_TEXT".into(), @@ -1844,6 +2066,7 @@ network_policies: activity_tx: None, dynamic_credentials: None, token_grant_resolver: None, + request_inspector: None, }; let (mut app, mut relay_client) = tokio::io::duplex(8192); @@ -1951,6 +2174,7 @@ network_policies: activity_tx: None, dynamic_credentials: None, token_grant_resolver: None, + request_inspector: None, }; let (mut app, mut relay_client) = tokio::io::duplex(8192); @@ -2071,6 +2295,7 @@ network_policies: activity_tx: None, dynamic_credentials: None, token_grant_resolver: None, + request_inspector: None, }; let (mut app, mut relay_client) = tokio::io::duplex(8192); @@ -2244,6 +2469,7 @@ network_policies: activity_tx: None, dynamic_credentials: None, token_grant_resolver: None, + request_inspector: None, }; let (mut app, mut relay_client) = tokio::io::duplex(8192); @@ -2334,6 +2560,7 @@ network_policies: activity_tx: None, dynamic_credentials: None, token_grant_resolver: None, + request_inspector: None, }; let (mut app, mut relay_client) = tokio::io::duplex(8192); diff --git a/crates/openshell-supervisor-network/src/l7/token_grant_injection.rs b/crates/openshell-supervisor-network/src/l7/token_grant_injection.rs index 0d7c18e99..a3e4d51f2 100644 --- a/crates/openshell-supervisor-network/src/l7/token_grant_injection.rs +++ b/crates/openshell-supervisor-network/src/l7/token_grant_injection.rs @@ -735,6 +735,7 @@ mod tests { activity_tx: None, dynamic_credentials: Some(fixture.dynamic_credentials()), token_grant_resolver: Some(fixture.resolver()), + request_inspector: None, }; let req = L7Request { action: "GET".to_string(), @@ -772,6 +773,7 @@ mod tests { activity_tx: None, dynamic_credentials: Some(fixture.dynamic_credentials()), token_grant_resolver: Some(fixture.resolver()), + request_inspector: None, }; let req = L7Request { action: "GET".to_string(), diff --git a/crates/openshell-supervisor-network/src/l7/websocket.rs b/crates/openshell-supervisor-network/src/l7/websocket.rs index 31aa35509..27e3c43fa 100644 --- a/crates/openshell-supervisor-network/src/l7/websocket.rs +++ b/crates/openshell-supervisor-network/src/l7/websocket.rs @@ -1273,6 +1273,7 @@ network_policies: activity_tx: None, dynamic_credentials: None, token_grant_resolver: None, + request_inspector: None, }; let (mut client_write, mut relay_read) = tokio::io::duplex(MAX_TEXT_MESSAGE_BYTES + 1024); let (mut relay_write, mut upstream_read) = tokio::io::duplex(MAX_TEXT_MESSAGE_BYTES + 1024); diff --git a/crates/openshell-supervisor-network/src/lib.rs b/crates/openshell-supervisor-network/src/lib.rs index a559a57e6..73733b85f 100644 --- a/crates/openshell-supervisor-network/src/lib.rs +++ b/crates/openshell-supervisor-network/src/lib.rs @@ -10,6 +10,7 @@ pub mod identity; pub mod inference_routes; +pub mod inspection; pub mod l7; pub mod opa; pub mod policy_local; diff --git a/crates/openshell-supervisor-network/src/proxy.rs b/crates/openshell-supervisor-network/src/proxy.rs index d467b022e..8af478e89 100644 --- a/crates/openshell-supervisor-network/src/proxy.rs +++ b/crates/openshell-supervisor-network/src/proxy.rs @@ -970,6 +970,7 @@ async fn handle_tcp_connection( token_grant_resolver: dynamic_credentials .as_ref() .map(|_| crate::l7::token_grant_injection::default_resolver()), + request_inspector: crate::inspection::request_inspector_from_env(), }; if effective_tls_skip { @@ -3215,6 +3216,7 @@ async fn handle_forward_proxy( token_grant_resolver: dynamic_credentials .as_ref() .map(|_| crate::l7::token_grant_injection::default_resolver()), + request_inspector: crate::inspection::request_inspector_from_env(), }; let mut l7_activity_pending = false; @@ -4293,6 +4295,7 @@ mod tests { activity_tx: None, dynamic_credentials: Some(fixture.dynamic_credentials()), token_grant_resolver: Some(fixture.resolver()), + request_inspector: None, }; (ctx, fixture) @@ -4351,6 +4354,7 @@ mod tests { activity_tx: None, dynamic_credentials: None, token_grant_resolver: None, + request_inspector: None, }; (config, tunnel_engine, ctx) } @@ -4519,6 +4523,7 @@ mod tests { activity_tx: None, dynamic_credentials: None, token_grant_resolver: None, + request_inspector: None, }; let query_params = std::collections::HashMap::new(); @@ -4562,6 +4567,7 @@ mod tests { activity_tx: None, dynamic_credentials: None, token_grant_resolver: None, + request_inspector: None, }; let query_params = std::collections::HashMap::new(); let config = websocket_l7_config(crate::l7::L7Protocol::Rest, false); diff --git a/examples/relay-inspection-demo/README.md b/examples/relay-inspection-demo/README.md new file mode 100644 index 000000000..99d5f7032 --- /dev/null +++ b/examples/relay-inspection-demo/README.md @@ -0,0 +1,32 @@ +# Relay Inspection Demo + +This directory holds a tiny shared inspector service for the Hermes + NeMo Relay + OpenShell demo. + +Start the service: + +```shell +python3 examples/relay-inspection-demo/inspector_service.py --host 127.0.0.1 --port 7777 +``` + +Point Hermes at it: + +```shell +export HERMES_NEMO_RELAY_INSPECTOR_URL=http://127.0.0.1:7777/inspect +``` + +Point OpenShell at it: + +```shell +export OPENSHELL_REQUEST_INSPECTOR_URL=http://127.0.0.1:7777/inspect +``` + +Demo behavior: + +- `llm_request` + - redacts `alice@example.com` from semantic request payloads +- `tool_request` + - denies inputs containing `DROP TABLE` + - annotates `{"query": "books"}` with `relay_inspected: true` +- `http_request` + - denies requests to `/blocked` + - injects `x-inspected: true` on allowed requests that do not already carry it diff --git a/examples/relay-inspection-demo/inspector_service.py b/examples/relay-inspection-demo/inspector_service.py new file mode 100644 index 000000000..e90c830a1 --- /dev/null +++ b/examples/relay-inspection-demo/inspector_service.py @@ -0,0 +1,153 @@ +#!/usr/bin/env python3 +# SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +import argparse +import json +from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer + + +def redact_text(value: str) -> str: + return value.replace("alice@example.com", "[redacted-email]") + + +def redact_json(value): + if isinstance(value, str): + return redact_text(value) + if isinstance(value, list): + return [redact_json(item) for item in value] + if isinstance(value, dict): + return {key: redact_json(item) for key, item in value.items()} + return value + + +def inspect(payload: dict) -> dict: + target = payload.get("target") or {} + kind = target.get("kind") + + if kind == "llm_request": + request = target.get("request") + if isinstance(request, dict): + redacted = redact_json(request) + if redacted != request: + return { + "kind": "mutate", + "target": { + "kind": "llm_request", + "provider": target.get("provider", ""), + "request": redacted, + }, + "findings": [ + { + "code": "pii_redacted", + "message": "redacted email address from llm request", + } + ], + } + return {"kind": "allow"} + + if kind == "tool_request": + tool_name = target.get("tool_name", "") + tool_input = target.get("input") + encoded = json.dumps(tool_input, sort_keys=True) + if "DROP TABLE" in encoded: + return { + "kind": "deny", + "reason": f"blocked dangerous tool input for {tool_name}", + "findings": [ + { + "code": "dangerous_tool_input", + "message": "tool input matched blocked sql pattern", + } + ], + } + if isinstance(tool_input, dict) and tool_input.get("query") == "books": + mutated = dict(tool_input) + mutated["relay_inspected"] = True + return { + "kind": "mutate", + "target": { + "kind": "tool_request", + "tool_name": tool_name, + "input": mutated, + }, + "findings": [ + { + "code": "tool_request_annotated", + "message": "annotated tool request for demo visibility", + } + ], + } + return {"kind": "allow"} + + if kind == "http_request": + path = target.get("path", "") + headers = target.get("headers") or [] + if path == "/blocked": + return { + "kind": "deny", + "reason": "blocked outbound request by path", + "findings": [ + { + "code": "blocked_path", + "message": "http request matched blocked demo path", + } + ], + } + header_names = {name.lower() for name, _ in headers if isinstance(name, str)} + if "x-inspected" not in header_names: + mutated = list(headers) + mutated.append(("x-inspected", "true")) + return { + "kind": "mutate", + "target": { + "kind": "http_request", + "method": target.get("method", ""), + "path": path, + "headers": mutated, + "body": target.get("body", []), + }, + "findings": [ + { + "code": "header_injected", + "message": "added x-inspected header at runtime boundary", + } + ], + } + return {"kind": "allow"} + + return {"kind": "allow"} + + +class Handler(BaseHTTPRequestHandler): + def do_POST(self): + if self.path != "/inspect": + self.send_error(404) + return + length = int(self.headers.get("Content-Length", "0")) + body = self.rfile.read(length) + payload = json.loads(body or b"{}") + response = inspect(payload) + encoded = json.dumps(response).encode("utf-8") + self.send_response(200) + self.send_header("Content-Type", "application/json") + self.send_header("Content-Length", str(len(encoded))) + self.end_headers() + self.wfile.write(encoded) + + def log_message(self, *_args): + return + + +def main() -> None: + parser = argparse.ArgumentParser() + parser.add_argument("--host", default="127.0.0.1") + parser.add_argument("--port", type=int, default=7777) + args = parser.parse_args() + server = ThreadingHTTPServer((args.host, args.port), Handler) + print(f"relay inspection demo listening on http://{args.host}:{args.port}/inspect") + server.serve_forever() + + +if __name__ == "__main__": + main()