diff --git a/server/src/server.rs b/server/src/server.rs index 1a29c548..588e7d93 100644 --- a/server/src/server.rs +++ b/server/src/server.rs @@ -79,12 +79,13 @@ pub fn run_server_with_registry(config: Config, registry: FilterRegistry, config init_runtime_limits(&config.runtime); warn_insecure_key_permissions(&config); - let state = build_server_state(&config, ®istry); + let health_registry = build_health_registry(&config.clusters); + let state = build_server_state(&config, ®istry, &health_registry); info!("initializing server"); let mut server = PingoraServerRuntime::new(&config); let _cert_shutdowns = register_protocols(&mut server, &config, &state.pipelines); - register_admin_endpoints(&mut server, &config, &state.health_registry, &state.kv_stores); + register_admin_endpoints(&mut server, &config, health_registry, &state.kv_stores); let _watcher = spawn_watcher(config_path, config, registry, state); @@ -101,8 +102,6 @@ pub fn run_server_with_registry(config: Config, registry: FilterRegistry, config struct ServerState { /// Resolved filter pipelines per listener. pipelines: Arc, - /// Cluster health state. - health_registry: HealthRegistry, /// KV store registry. kv_stores: praxis_core::kv::KvStoreRegistry, /// Health check cancellation token. @@ -110,9 +109,8 @@ struct ServerState { } /// Build filter pipelines, health checks, and registries. -fn build_server_state(config: &Config, registry: &FilterRegistry) -> ServerState { +fn build_server_state(config: &Config, registry: &FilterRegistry, health_registry: &HealthRegistry) -> ServerState { info!("building filter pipelines"); - let health_registry = build_health_registry(&config.clusters); let kv_stores = praxis_core::kv::KvStoreRegistry::new(); #[cfg(feature = "ai-inference")] let response_stores = praxis_filter::ResponseStoreRegistry::new(); @@ -120,7 +118,7 @@ fn build_server_state(config: &Config, registry: &FilterRegistry) -> ServerState let pipelines = resolve_pipelines( config, registry, - &health_registry, + health_registry, &kv_stores, #[cfg(feature = "ai-inference")] &response_stores, @@ -128,11 +126,10 @@ fn build_server_state(config: &Config, registry: &FilterRegistry) -> ServerState .unwrap_or_else(|e| fatal(&e)); let health_shutdown = Arc::new(Mutex::new(CancellationToken::new())); - spawn_health_check_tasks(config, &health_registry, &health_shutdown); + spawn_health_check_tasks(config, Arc::clone(health_registry), &health_shutdown); ServerState { pipelines: Arc::new(pipelines), - health_registry, kv_stores, health_shutdown, } @@ -146,7 +143,7 @@ fn build_server_state(config: &Config, registry: &FilterRegistry) -> ServerState fn register_protocols( server: &mut PingoraServerRuntime, config: &Config, - pipelines: &Arc, + pipelines: &ListenerPipelines, ) -> CertWatcherShutdowns { let mut all_shutdowns = Vec::new(); @@ -177,10 +174,10 @@ fn spawn_watcher( let path = config_path?; let handle = crate::watcher::spawn_config_watcher(crate::watcher::WatcherParams { config_path: path, - health_shutdown: Arc::clone(&state.health_shutdown), + health_shutdown: state.health_shutdown, initial_config: config, kv_stores: state.kv_stores, - pipelines: Arc::clone(&state.pipelines), + pipelines: state.pipelines, registry: Arc::new(registry), shutdown: CancellationToken::new(), }); @@ -195,14 +192,14 @@ fn spawn_watcher( fn register_admin_endpoints( server: &mut PingoraServerRuntime, config: &Config, - health_registry: &HealthRegistry, + health_registry: HealthRegistry, kv_stores: &praxis_core::kv::KvStoreRegistry, ) { if let Some(admin_addr) = &config.admin.address { praxis_protocol::http::pingora::health::add_admin_endpoints_to_pingora_server( server.server_mut(), admin_addr, - Some(Arc::clone(health_registry)), + Some(health_registry), Some(kv_stores.clone()), config.admin.verbose, ); @@ -386,7 +383,7 @@ fn warn_insecure_key_permissions(_config: &Config) {} #[expect(clippy::expect_used, reason = "fatal")] fn spawn_health_check_tasks( config: &Config, - registry: &HealthRegistry, + registry: HealthRegistry, health_shutdown: &Arc>, ) { if registry.is_empty() { @@ -395,7 +392,6 @@ fn spawn_health_check_tasks( let shutdown = health_shutdown.lock().expect("health shutdown lock").clone(); let clusters = config.clusters.clone(); - let registry = Arc::clone(registry); std::thread::spawn(move || { let rt = tokio::runtime::Builder::new_current_thread()