Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 12 additions & 16 deletions server/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,12 +79,13 @@ pub fn run_server_with_registry(config: Config, registry: FilterRegistry, config
init_runtime_limits(&config.runtime);
warn_insecure_key_permissions(&config);

let state = build_server_state(&config, &registry);
let health_registry = build_health_registry(&config.clusters);
let state = build_server_state(&config, &registry, &health_registry);

info!("initializing server");
let mut server = PingoraServerRuntime::new(&config);
let _cert_shutdowns = register_protocols(&mut server, &config, &state.pipelines);
register_admin_endpoints(&mut server, &config, &state.health_registry, &state.kv_stores);
register_admin_endpoints(&mut server, &config, health_registry, &state.kv_stores);

let _watcher = spawn_watcher(config_path, config, registry, state);

Expand All @@ -101,38 +102,34 @@ pub fn run_server_with_registry(config: Config, registry: FilterRegistry, config
struct ServerState {
/// Resolved filter pipelines per listener.
pipelines: Arc<ListenerPipelines>,
/// Cluster health state.
health_registry: HealthRegistry,
/// KV store registry.
kv_stores: praxis_core::kv::KvStoreRegistry,
/// Health check cancellation token.
health_shutdown: Arc<Mutex<CancellationToken>>,
}

/// Build filter pipelines, health checks, and registries.
fn build_server_state(config: &Config, registry: &FilterRegistry) -> ServerState {
fn build_server_state(config: &Config, registry: &FilterRegistry, health_registry: &HealthRegistry) -> ServerState {
info!("building filter pipelines");
let health_registry = build_health_registry(&config.clusters);
let kv_stores = praxis_core::kv::KvStoreRegistry::new();
#[cfg(feature = "ai-inference")]
let response_stores = praxis_filter::ResponseStoreRegistry::new();

let pipelines = resolve_pipelines(
config,
registry,
&health_registry,
health_registry,
&kv_stores,
#[cfg(feature = "ai-inference")]
&response_stores,
)
.unwrap_or_else(|e| fatal(&e));

let health_shutdown = Arc::new(Mutex::new(CancellationToken::new()));
spawn_health_check_tasks(config, &health_registry, &health_shutdown);
spawn_health_check_tasks(config, Arc::clone(health_registry), &health_shutdown);

ServerState {
pipelines: Arc::new(pipelines),
health_registry,
kv_stores,
health_shutdown,
}
Expand All @@ -146,7 +143,7 @@ fn build_server_state(config: &Config, registry: &FilterRegistry) -> ServerState
fn register_protocols(
server: &mut PingoraServerRuntime,
config: &Config,
pipelines: &Arc<ListenerPipelines>,
pipelines: &ListenerPipelines,
) -> CertWatcherShutdowns {
let mut all_shutdowns = Vec::new();

Expand Down Expand Up @@ -177,10 +174,10 @@ fn spawn_watcher(
let path = config_path?;
let handle = crate::watcher::spawn_config_watcher(crate::watcher::WatcherParams {
config_path: path,
health_shutdown: Arc::clone(&state.health_shutdown),
health_shutdown: state.health_shutdown,
initial_config: config,
kv_stores: state.kv_stores,
pipelines: Arc::clone(&state.pipelines),
pipelines: state.pipelines,
registry: Arc::new(registry),
shutdown: CancellationToken::new(),
});
Expand All @@ -195,14 +192,14 @@ fn spawn_watcher(
fn register_admin_endpoints(
server: &mut PingoraServerRuntime,
config: &Config,
health_registry: &HealthRegistry,
health_registry: HealthRegistry,
kv_stores: &praxis_core::kv::KvStoreRegistry,
) {
if let Some(admin_addr) = &config.admin.address {
praxis_protocol::http::pingora::health::add_admin_endpoints_to_pingora_server(
server.server_mut(),
admin_addr,
Some(Arc::clone(health_registry)),
Some(health_registry),
Some(kv_stores.clone()),
config.admin.verbose,
);
Expand Down Expand Up @@ -386,7 +383,7 @@ fn warn_insecure_key_permissions(_config: &Config) {}
#[expect(clippy::expect_used, reason = "fatal")]
fn spawn_health_check_tasks(
config: &Config,
registry: &HealthRegistry,
registry: HealthRegistry,
health_shutdown: &Arc<Mutex<CancellationToken>>,
) {
if registry.is_empty() {
Expand All @@ -395,7 +392,6 @@ fn spawn_health_check_tasks(

let shutdown = health_shutdown.lock().expect("health shutdown lock").clone();
let clusters = config.clusters.clone();
let registry = Arc::clone(registry);

std::thread::spawn(move || {
let rt = tokio::runtime::Builder::new_current_thread()
Expand Down
Loading