Skip to content

Commit 8e21407

Browse files
ZhiXiao-Linclaude
andcommitted
perf: reduce per-request allocation pressure in hot path
- Async access log via unbounded channel (JSON serialization off hot path) - Replace header_map HashMap with direct http::HeaderMap access (~20 fewer String allocs per request); update RouterTable::match_request and Rule::matches signatures accordingly - reqwest pool: tcp_nodelay(true), pool_idle_timeout(90s) Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
1 parent cd21555 commit 8e21407

7 files changed

Lines changed: 130 additions & 71 deletions

File tree

src/entrypoint.rs

Lines changed: 21 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -90,8 +90,10 @@ pub struct GatewayState {
9090
pub mirrors: HashMap<String, Arc<crate::service::TrafficMirror>>,
9191
/// Failover selectors: service_name → FailoverSelector
9292
pub failovers: HashMap<String, Arc<crate::service::FailoverSelector>>,
93-
/// Structured access log
93+
/// Structured access log (counter + background task target)
9494
pub access_log: Arc<crate::observability::access_log::AccessLog>,
95+
/// Channel for fire-and-forget log entries — background task does JSON + tracing
96+
pub log_tx: tokio::sync::mpsc::UnboundedSender<crate::observability::access_log::AccessLogEntry>,
9597
/// Sticky session managers (only for services with sticky config)
9698
pub sticky_managers: HashMap<String, Arc<StickySessionManager>>,
9799
/// Passive health checkers for all services
@@ -327,7 +329,7 @@ async fn start_tcp_entrypoint(
327329
tokio::spawn(async move {
328330
let _permit = permit;
329331

330-
let headers = HashMap::new();
332+
let headers = http::HeaderMap::new();
331333
if let Some(route) = state
332334
.router_table
333335
.match_request(None, "/", "TCP", &headers, &ep_name)
@@ -381,7 +383,7 @@ async fn start_udp_entrypoint(
381383
max_sessions: Option<usize>,
382384
state: Arc<GatewayState>,
383385
) -> Result<tokio::task::JoinHandle<()>> {
384-
let headers = HashMap::new();
386+
let headers = http::HeaderMap::new();
385387
let upstream_addr = state
386388
.router_table
387389
.match_request(None, "/", "UDP", &headers, &name)
@@ -446,14 +448,6 @@ async fn handle_http_request(
446448
let method_str = req.method().as_str().to_string();
447449
let uri = req.uri().clone();
448450

449-
// Collect headers into a plain map for routing and middleware context.
450-
let mut header_map = HashMap::new();
451-
for (key, value) in req.headers().iter() {
452-
if let Ok(v) = value.to_str() {
453-
header_map.insert(key.as_str().to_string(), v.to_string());
454-
}
455-
}
456-
457451
// Detect protocol from request headers.
458452
let is_ws = crate::proxy::websocket::is_websocket_upgrade(req.headers());
459453
let is_grpc = crate::proxy::grpc::is_grpc_request(req.headers());
@@ -462,7 +456,7 @@ async fn handle_http_request(
462456
let access_tracker = state.access_log.start_request();
463457

464458
// Extract incoming trace context and create a child span.
465-
let trace_ctx = crate::observability::tracing::extract_trace_context(&header_map)
459+
let trace_ctx = crate::observability::tracing::extract_trace_context(req.headers())
466460
.map(|ctx| ctx.child())
467461
.unwrap_or_else(crate::observability::tracing::TraceContext::new_root);
468462

@@ -471,7 +465,7 @@ async fn handle_http_request(
471465
host.as_deref(),
472466
&path,
473467
&method_str,
474-
&header_map,
468+
req.headers(),
475469
&entrypoint,
476470
) {
477471
Some(route) => route,
@@ -644,8 +638,10 @@ async fn handle_http_request(
644638
.sticky_managers
645639
.get(&route.service_name)
646640
.and_then(|mgr| {
647-
let session_id = header_map
641+
let session_id = req_parts
642+
.headers
648643
.get("cookie")
644+
.and_then(|v| v.to_str().ok())
649645
.and_then(|cookie| mgr.extract_session_id(cookie))
650646
.map(|s| s.to_string());
651647
match mgr.select_backend(session_id.as_deref(), lb.backends()) {
@@ -751,7 +747,7 @@ async fn handle_http_request(
751747
{
752748
Ok(grpc_resp) => {
753749
let status_code = grpc_resp.http_status.as_u16();
754-
state.access_log.record(&access_tracker.build_entry(
750+
let _ = state.log_tx.send(access_tracker.build_entry(
755751
remote_addr.ip().to_string(),
756752
method_str,
757753
path,
@@ -761,7 +757,7 @@ async fn handle_http_request(
761757
Some(backend.url.clone()),
762758
Some(route.router_name.clone()),
763759
Some(entrypoint),
764-
header_map.get("user-agent").cloned(),
760+
req_parts.headers.get("user-agent").and_then(|v| v.to_str().ok()).map(|s| s.to_string()),
765761
));
766762

767763
// Record passive health from HTTP status.
@@ -806,7 +802,7 @@ async fn handle_http_request(
806802
}
807803
Err(e) => {
808804
tracing::error!(error = %e, backend = backend.url, "gRPC proxy error");
809-
state.access_log.record(&access_tracker.build_entry(
805+
let _ = state.log_tx.send(access_tracker.build_entry(
810806
remote_addr.ip().to_string(),
811807
method_str,
812808
path,
@@ -816,7 +812,7 @@ async fn handle_http_request(
816812
Some(backend.url.clone()),
817813
Some(route.router_name.clone()),
818814
Some(entrypoint),
819-
header_map.get("user-agent").cloned(),
815+
req_parts.headers.get("user-agent").and_then(|v| v.to_str().ok()).map(|s| s.to_string()),
820816
));
821817
if let Some(phc) = state.passive_health.get(&route.service_name) {
822818
phc.record_error(&backend, 502);
@@ -864,7 +860,7 @@ async fn handle_http_request(
864860
{
865861
Ok(stream_resp) => {
866862
let status_code = stream_resp.status.as_u16();
867-
state.access_log.record(&access_tracker.build_entry(
863+
let _ = state.log_tx.send(access_tracker.build_entry(
868864
remote_addr.ip().to_string(),
869865
method_str,
870866
path,
@@ -874,7 +870,7 @@ async fn handle_http_request(
874870
Some(backend.url.clone()),
875871
Some(route.router_name.clone()),
876872
Some(entrypoint),
877-
header_map.get("user-agent").cloned(),
873+
req_parts.headers.get("user-agent").and_then(|v| v.to_str().ok()).map(|s| s.to_string()),
878874
));
879875

880876
if let Some(phc) = state.passive_health.get(&route.service_name) {
@@ -982,7 +978,7 @@ async fn handle_http_request(
982978
Ok(proxy_resp) => {
983979
let status_code = proxy_resp.status.as_u16();
984980

985-
state.access_log.record(&access_tracker.build_entry(
981+
let _ = state.log_tx.send(access_tracker.build_entry(
986982
remote_addr.ip().to_string(),
987983
method_str,
988984
path,
@@ -992,7 +988,7 @@ async fn handle_http_request(
992988
Some(backend.url.clone()),
993989
Some(route.router_name.clone()),
994990
Some(entrypoint),
995-
header_map.get("user-agent").cloned(),
991+
req_parts.headers.get("user-agent").and_then(|v| v.to_str().ok()).map(|s| s.to_string()),
996992
));
997993

998994
// Passive health: record 5xx errors.
@@ -1051,7 +1047,7 @@ async fn handle_http_request(
10511047
phc.record_error(&backend, 502);
10521048
}
10531049

1054-
state.access_log.record(&access_tracker.build_entry(
1050+
let _ = state.log_tx.send(access_tracker.build_entry(
10551051
remote_addr.ip().to_string(),
10561052
method_str,
10571053
path,
@@ -1061,7 +1057,7 @@ async fn handle_http_request(
10611057
Some(backend.url.clone()),
10621058
Some(route.router_name.clone()),
10631059
Some(entrypoint),
1064-
header_map.get("user-agent").cloned(),
1060+
req_parts.headers.get("user-agent").and_then(|v| v.to_str().ok()).map(|s| s.to_string()),
10651061
));
10661062

10671063
state.metrics.record_request(502, 0);
@@ -1130,6 +1126,7 @@ mod tests {
11301126
mirrors: HashMap::new(),
11311127
failovers: HashMap::new(),
11321128
access_log: Arc::new(crate::observability::access_log::AccessLog::new()),
1129+
log_tx: tokio::sync::mpsc::unbounded_channel().0,
11331130
sticky_managers: HashMap::new(),
11341131
passive_health: HashMap::new(),
11351132
metrics: Arc::new(crate::observability::metrics::GatewayMetrics::new()),

src/gateway.rs

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,10 @@ impl Gateway {
122122
let middleware_configs = Arc::new(config.middlewares.clone());
123123
let pipeline_cache = Arc::new(build_pipeline_cache(&config, &middleware_configs));
124124

125+
let access_log = Arc::new(crate::observability::access_log::AccessLog::new());
126+
let (log_tx, log_rx) = tokio::sync::mpsc::unbounded_channel();
127+
spawn_log_task(log_rx, access_log.clone());
128+
125129
let gw_state = Arc::new(entrypoint::GatewayState {
126130
router_table,
127131
service_registry,
@@ -132,7 +136,8 @@ impl Gateway {
132136
scaling: scaling_state,
133137
mirrors,
134138
failovers,
135-
access_log: Arc::new(crate::observability::access_log::AccessLog::new()),
139+
access_log,
140+
log_tx,
136141
sticky_managers: build_sticky_managers(&config),
137142
passive_health: build_passive_health(&config),
138143
metrics: self.metrics.clone(),
@@ -365,6 +370,10 @@ impl Gateway {
365370
let middleware_configs = Arc::new(new_config.middlewares.clone());
366371
let pipeline_cache = Arc::new(build_pipeline_cache(&new_config, &middleware_configs));
367372

373+
let access_log = Arc::new(crate::observability::access_log::AccessLog::new());
374+
let (log_tx, log_rx) = tokio::sync::mpsc::unbounded_channel();
375+
spawn_log_task(log_rx, access_log.clone());
376+
368377
let gw_state = Arc::new(entrypoint::GatewayState {
369378
router_table,
370379
service_registry,
@@ -375,7 +384,8 @@ impl Gateway {
375384
scaling: scaling_state,
376385
mirrors,
377386
failovers,
378-
access_log: Arc::new(crate::observability::access_log::AccessLog::new()),
387+
access_log,
388+
log_tx,
379389
sticky_managers: build_sticky_managers(&new_config),
380390
passive_health: build_passive_health(&new_config),
381391
metrics: self.metrics.clone(),
@@ -809,6 +819,19 @@ fn spawn_autoscaler(
809819
Some(handle)
810820
}
811821

822+
/// Spawn a background task that drains the access log channel and serializes entries.
823+
/// This keeps JSON serialization and tracing off the request hot path.
824+
fn spawn_log_task(
825+
mut rx: tokio::sync::mpsc::UnboundedReceiver<crate::observability::access_log::AccessLogEntry>,
826+
access_log: Arc<crate::observability::access_log::AccessLog>,
827+
) {
828+
tokio::spawn(async move {
829+
while let Some(entry) = rx.recv().await {
830+
access_log.record(&entry);
831+
}
832+
});
833+
}
834+
812835
/// Pre-compile middleware pipelines for all routers — avoids per-request Pipeline::from_config.
813836
fn build_pipeline_cache(
814837
config: &GatewayConfig,

src/observability/access_log.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ impl AccessLog {
5757
}
5858
}
5959

60-
/// Record and emit a log entry
60+
/// Record and emit a log entry (called from background logging task)
6161
pub fn record(&self, entry: &AccessLogEntry) {
6262
self.total_entries.fetch_add(1, Ordering::Relaxed);
6363
tracing::info!(
@@ -75,6 +75,13 @@ impl AccessLog {
7575
);
7676
}
7777

78+
/// Increment request counter only — for use with async logging channel.
79+
/// Callers send the entry to the log channel; a background task calls record().
80+
#[allow(dead_code)]
81+
pub fn count(&self) {
82+
self.total_entries.fetch_add(1, Ordering::Relaxed);
83+
}
84+
7885
/// Get total number of logged entries
7986
#[allow(dead_code)]
8087
pub fn total_entries(&self) -> u64 {

src/observability/tracing.rs

Lines changed: 43 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -184,32 +184,38 @@ pub enum PropagationFormat {
184184
B3,
185185
}
186186

187-
/// Extract trace context from request headers
188-
pub fn extract_trace_context(headers: &HashMap<String, String>) -> Option<TraceContext> {
187+
/// Extract trace context from request headers.
188+
///
189+
/// Accepts `&http::HeaderMap` directly — avoids the HashMap<String,String>
190+
/// allocation that was previously needed to convert hyper headers.
191+
pub fn extract_trace_context(headers: &http::HeaderMap) -> Option<TraceContext> {
192+
let hdr = |name: &str| -> Option<&str> {
193+
headers.get(name).and_then(|v| v.to_str().ok())
194+
};
195+
189196
// Try W3C traceparent first
190-
if let Some(traceparent) = headers.get(TRACEPARENT_HEADER) {
197+
if let Some(traceparent) = hdr(TRACEPARENT_HEADER) {
191198
if let Some(mut ctx) = TraceContext::from_traceparent(traceparent) {
192-
ctx.trace_state = headers.get(TRACESTATE_HEADER).cloned();
199+
ctx.trace_state = hdr(TRACESTATE_HEADER).map(|s| s.to_string());
193200
return Some(ctx);
194201
}
195202
}
196203

197204
// Try B3 single header
198-
if let Some(b3) = headers.get(B3_HEADER) {
205+
if let Some(b3) = hdr(B3_HEADER) {
199206
return TraceContext::from_b3_single(b3);
200207
}
201208

202209
// Try B3 multi-header
203-
if let Some(trace_id) = headers.get(B3_TRACE_ID_HEADER) {
204-
if let Some(span_id) = headers.get(B3_SPAN_ID_HEADER) {
205-
let sampled = headers
206-
.get(B3_SAMPLED_HEADER)
210+
if let Some(trace_id) = hdr(B3_TRACE_ID_HEADER) {
211+
if let Some(span_id) = hdr(B3_SPAN_ID_HEADER) {
212+
let sampled = hdr(B3_SAMPLED_HEADER)
207213
.map(|s| s == "1" || s == "true")
208214
.unwrap_or(true);
209215

210216
return Some(TraceContext {
211-
trace_id: trace_id.clone(),
212-
parent_span_id: span_id.clone(),
217+
trace_id: trace_id.to_string(),
218+
parent_span_id: span_id.to_string(),
213219
span_id: generate_span_id(),
214220
trace_flags: if sampled { 1 } else { 0 },
215221
trace_state: None,
@@ -542,12 +548,17 @@ mod tests {
542548

543549
#[test]
544550
fn test_extract_w3c() {
545-
let mut headers = HashMap::new();
551+
let mut headers = http::HeaderMap::new();
546552
headers.insert(
547-
"traceparent".to_string(),
548-
"00-0af7651916cd43dd8448eb211c80319c-00f067aa0ba902b7-01".to_string(),
553+
http::header::HeaderName::from_static("traceparent"),
554+
"00-0af7651916cd43dd8448eb211c80319c-00f067aa0ba902b7-01"
555+
.parse()
556+
.unwrap(),
557+
);
558+
headers.insert(
559+
http::header::HeaderName::from_static("tracestate"),
560+
"vendor=value".parse().unwrap(),
549561
);
550-
headers.insert("tracestate".to_string(), "vendor=value".to_string());
551562

552563
let ctx = extract_trace_context(&headers).unwrap();
553564
assert_eq!(ctx.trace_id, "0af7651916cd43dd8448eb211c80319c");
@@ -556,10 +567,12 @@ mod tests {
556567

557568
#[test]
558569
fn test_extract_b3_single() {
559-
let mut headers = HashMap::new();
570+
let mut headers = http::HeaderMap::new();
560571
headers.insert(
561-
"b3".to_string(),
562-
"463ac35c9f6413ad48485a3953bb6124-0020000000000001-1".to_string(),
572+
http::header::HeaderName::from_static("b3"),
573+
"463ac35c9f6413ad48485a3953bb6124-0020000000000001-1"
574+
.parse()
575+
.unwrap(),
563576
);
564577

565578
let ctx = extract_trace_context(&headers).unwrap();
@@ -568,13 +581,19 @@ mod tests {
568581

569582
#[test]
570583
fn test_extract_b3_multi() {
571-
let mut headers = HashMap::new();
584+
let mut headers = http::HeaderMap::new();
585+
headers.insert(
586+
http::header::HeaderName::from_static("x-b3-traceid"),
587+
"463ac35c9f6413ad48485a3953bb6124".parse().unwrap(),
588+
);
589+
headers.insert(
590+
http::header::HeaderName::from_static("x-b3-spanid"),
591+
"0020000000000001".parse().unwrap(),
592+
);
572593
headers.insert(
573-
"x-b3-traceid".to_string(),
574-
"463ac35c9f6413ad48485a3953bb6124".to_string(),
594+
http::header::HeaderName::from_static("x-b3-sampled"),
595+
"1".parse().unwrap(),
575596
);
576-
headers.insert("x-b3-spanid".to_string(), "0020000000000001".to_string());
577-
headers.insert("x-b3-sampled".to_string(), "1".to_string());
578597

579598
let ctx = extract_trace_context(&headers).unwrap();
580599
assert_eq!(ctx.trace_id, "463ac35c9f6413ad48485a3953bb6124");
@@ -583,7 +602,7 @@ mod tests {
583602

584603
#[test]
585604
fn test_extract_no_trace() {
586-
let headers = HashMap::new();
605+
let headers = http::HeaderMap::new();
587606
assert!(extract_trace_context(&headers).is_none());
588607
}
589608

src/proxy/http_proxy.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@ impl HttpProxy {
2323
let client = reqwest::Client::builder()
2424
.timeout(timeout)
2525
.pool_max_idle_per_host(100)
26+
.pool_idle_timeout(Duration::from_secs(90))
27+
.tcp_nodelay(true)
2628
.build()
2729
.unwrap_or_default();
2830

0 commit comments

Comments
 (0)