diff --git a/Cargo.lock b/Cargo.lock index 0665501..8ceb285 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -91,6 +91,52 @@ version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c08606f8c3cbf4ce6ec8e28fb0014a2c086708fe954eaa885384a6165172e7e8" +[[package]] +name = "axum" +version = "0.8.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "31b698c5f9a010f6573133b09e0de5408834d0c82f8d7475a89fc1867a71cd90" +dependencies = [ + "axum-core", + "bytes", + "futures-util", + "http", + "http-body", + "http-body-util", + "hyper", + "hyper-util", + "itoa", + "matchit", + "memchr", + "mime", + "percent-encoding", + "pin-project-lite", + "serde_core", + "sync_wrapper", + "tokio", + "tower", + "tower-layer", + "tower-service", +] + +[[package]] +name = "axum-core" +version = "0.5.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "08c78f31d7b1291f7ee735c1c6780ccde7785daae9a9206026862dab7d8792d1" +dependencies = [ + "bytes", + "futures-core", + "http", + "http-body", + "http-body-util", + "mime", + "pin-project-lite", + "sync_wrapper", + "tower-layer", + "tower-service", +] + [[package]] name = "backon" version = "1.6.0" @@ -860,8 +906,9 @@ dependencies = [ [[package]] name = "koprs" -version = "0.9.2" +version = "0.9.5" dependencies = [ + "axum", "bytes", "chrono", "futures", @@ -872,6 +919,7 @@ dependencies = [ "k8s-openapi", "kube", "kube-runtime", + "prometheus", "schemars", "serde", "serde_json", @@ -881,19 +929,6 @@ dependencies = [ "tracing", ] -[[package]] -name = "koprs-derive" -version = "0.1.0" -dependencies = [ - "proc-macro2", - "quote", - "syn", -] - -[[package]] -name = "koprs-gen" -version = "0.1.0" - [[package]] name = "kube" version = "1.1.0" @@ -1040,6 +1075,12 @@ dependencies = [ "regex-automata", ] +[[package]] +name = "matchit" +version = "0.8.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "47e1ffaa40ddd1f3ed91f717a33c8c0ee23fff369e3aa8772b9605cc1d22f4c3" + [[package]] name = "memchr" version = "2.8.0" @@ -1063,6 +1104,24 @@ dependencies = [ "windows-sys 0.61.2", ] +[[package]] +name = "multicontroller-operator" +version = "0.1.0" +dependencies = [ + "anyhow", + "chrono", + "k8s-openapi", + "koprs", + "kube", + "schemars", + "serde", + "serde_json", + "thiserror 1.0.69", + "tokio", + "tracing", + "tracing-subscriber", +] + [[package]] name = "nu-ansi-term" version = "0.50.3" @@ -1231,6 +1290,20 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "prometheus" +version = "0.14.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3ca5326d8d0b950a9acd87e6a3f94745394f62e4dae1b1ee22b2bc0c394af43a" +dependencies = [ + "cfg-if", + "fnv", + "lazy_static", + "memchr", + "parking_lot", + "thiserror 2.0.18", +] + [[package]] name = "quote" version = "1.0.45" diff --git a/Cargo.toml b/Cargo.toml index 6a2c5be..c6a8611 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,9 +1,9 @@ [workspace] -members = ["crates/*", "examples/configmapsync"] +members = ["crates/*", "examples/configmapsync", "examples/multicontroller"] resolver = "2" [workspace.package] -version = "0.9.2" +version = "0.9.5" edition = "2024" license = "MIT" repository = "https://github.com/bartvanbenthem/koprs" @@ -14,8 +14,4 @@ kube = { version = "1.1.0", features = ["runtime", "derive"] } kube-runtime = "1.1.0" tokio = { version = "1", features = ["full"] } serde = { version = "1", features = ["derive"] } -schemars = "0.8" -syn = { version = "2", features = ["full"] } -quote = "1" -proc-macro2 = "1" -inventory = "0.3" \ No newline at end of file +schemars = "0.8" \ No newline at end of file diff --git a/README.md b/README.md index 22bf554..e87f300 100644 --- a/README.md +++ b/README.md @@ -10,16 +10,13 @@ The Operator SDK for Rust is a framework that uses [`kube`](https://github.com/k * Tools for scaffolding and code generation to bootstrap a new project fast * Extensions to cover common Operator use cases -This repository contains the core framework, its proc macros, and the manifest generation -tooling for CRDs and RBAC. +This repository contains the core framework. ## Crates | Crate | Description | Docs | |-------|-------------|------| -| [`koprs`](./crates/koprs) | Core generic runtime framework | [![docs.rs](https://img.shields.io/docsrs/koprs)](https://docs.rs/koprs) [![crates.io](https://img.shields.io/crates/v/koprs)](https://crates.io/crates/koprs) | -| [`koprs-derive`](./crates/koprs-derive) | Proc macros — implementation detail | [![docs.rs](https://img.shields.io/docsrs/koprs-derive)](https://docs.rs/koprs-derive) [![crates.io](https://img.shields.io/crates/v/koprs-derive)](https://crates.io/crates/koprs-derive) | -| [`koprs-gen`](./crates/koprs-gen) | CRD and RBAC manifest generation CLI | [![docs.rs](https://img.shields.io/docsrs/koprs-gen)](https://docs.rs/koprs-gen) [![crates.io](https://img.shields.io/crates/v/koprs-gen)](https://crates.io/crates/koprs-gen) | +| [`koprs`](./crates/koprs) ([README](./crates/koprs/README.md)) | Core generic runtime framework | [![docs.rs](https://img.shields.io/docsrs/koprs)](https://docs.rs/koprs) [![crates.io](https://img.shields.io/crates/v/koprs)](https://crates.io/crates/koprs) | ## Workspace layout @@ -27,22 +24,105 @@ tooling for CRDs and RBAC. koprs/ ├── Cargo.toml # workspace manifest ├── Cargo.lock -└── crates/ - ├── koprs/ # core library - ├── koprs-derive/ # proc macros - └── koprs-gen/ # codegen CLI +├── crates/ +│ └── koprs/ # core library +└── examples/ + ├── configmapsync/ # single CRD, single controller + └── multicontroller/ # multiple CRDs, multiple controllers in one operator ``` ## Getting started If you are here to build a Kubernetes operator, you want [`koprs`](./crates/koprs). Start there. -For a working end-to-end example, see the [configmapsync operator](./examples/configmapsync/README.md). - +For working end-to-end examples, see: + +* [configmapsync](./examples/configmapsync/README.md) — a single CRD reconciled by one controller; the best starting point. +* [multicontroller](./examples/multicontroller/README.md) — multiple CRDs (`SecretSync`, `ServiceAccountSync`) each reconciled by its own controller, run side by side in one operator binary. + +### Minimal example + +A `koprs` operator boils down to three pieces: a CRD type, a [`Reconciler`](./crates/koprs/src/controller.rs), +and a [`ControllerBuilder`](./crates/koprs/src/controller.rs) that wires it all together. + +```rust,no_run +use std::sync::Arc; +use std::time::Duration; + +use kube::{Api, Client, CustomResource, ResourceExt}; +use schemars::JsonSchema; +use serde::{Deserialize, Serialize}; + +use koprs::controller::{Action, Context, ControllerBuilder, Reconciler}; +use koprs::error::KubeGenericError; +use koprs::status::patch_status_namespaced; + +/// The `Greeting` CRD — `kube::CustomResource` derives the type, its CRD spec, +/// and the generated `Greeting` struct (spec + status + metadata) in one go. +#[derive(CustomResource, Debug, Clone, Serialize, Deserialize, Default, JsonSchema)] +#[kube( + group = "example.io", + version = "v1alpha1", + kind = "Greeting", + namespaced, + status = "GreetingStatus" +)] +pub struct GreetingSpec { + pub message: String, +} + +#[derive(Debug, Clone, Serialize, Deserialize, Default, JsonSchema)] +pub struct GreetingStatus { + pub ready: bool, +} + +struct GreetingReconciler; + +impl Reconciler for GreetingReconciler { + type Error = KubeGenericError; + + async fn reconcile(&self, cr: Arc, ctx: Arc) -> Result { + let name = cr.name_any(); + let namespace = cr + .namespace() + .ok_or(KubeGenericError::MissingMetadata("namespace".into()))?; + + // Mark the resource ready — replace with your own reconciliation logic. + patch_status_namespaced::( + ctx.client.clone(), + &namespace, + &name, + GreetingStatus { ready: true }, + "greeting-operator", + ) + .await?; + + Ok(Action::requeue(Duration::from_secs(300))) + } + // error_policy defaults to requeue(30s) — override it for custom backoff +} + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + let client = Client::try_default().await?; + let api: Api = Api::all(client.clone()); + let ctx = Context::new(client); + + ControllerBuilder::new(api) + .health_port(8080) + .graceful_shutdown() + .run(GreetingReconciler, ctx) + .await?; + + Ok(()) +} +``` -If you want to generate CRD or RBAC manifests from your annotated Rust types, you want [`koprs-gen`](./crates/koprs-gen). -> [!WARNING] -> `koprs-gen` is under development and not yet available. +For finalizers, owned-resource reconciliation, garbage collection, events, and leader +election, see the [configmapsync operator](./examples/configmapsync/README.md) — it +walks through the same building blocks in a complete, runnable operator. To see how to +run several CRDs and controllers from a single operator binary, see +[multicontroller](./examples/multicontroller/README.md). ## Contributing @@ -96,19 +176,15 @@ unit tests, integration tests, coverage, release build, docs, and audit. ### Publishing -`publish.sh` handles the full pre-flight and publishes all three crates to crates.io in -dependency order — `koprs-derive`, `koprs`, `koprs-gen`. +`publish.sh` handles the full pre-flight and publishes the crate to crates.io. ```bash -./scripts/publish.sh # full pre-flight + publish all crates +./scripts/publish.sh # full pre-flight + publish ./scripts/publish.sh --dry-run # stop before cargo publish ./scripts/publish.sh --skip-ci # skip CI checks, publish only ./scripts/publish.sh --crate koprs # publish a single crate ``` -A 20 second delay is applied between each crate to allow crates.io to index before the -next crate resolves it as a registry dependency. - See the [CI script docs](./scripts/cargo-ci.sh) for the full list of flags. ## License diff --git a/crates/crates.md b/crates/crates.md new file mode 100644 index 0000000..94964b1 --- /dev/null +++ b/crates/crates.md @@ -0,0 +1 @@ +crates \ No newline at end of file diff --git a/crates/koprs-derive/.gitignore b/crates/koprs-derive/.gitignore deleted file mode 100644 index a5ff07f..0000000 --- a/crates/koprs-derive/.gitignore +++ /dev/null @@ -1,8 +0,0 @@ -/target - - -# Added by cargo -# -# already existing elements were commented out - -#/target diff --git a/crates/koprs-derive/Cargo.toml b/crates/koprs-derive/Cargo.toml deleted file mode 100644 index 5497fb6..0000000 --- a/crates/koprs-derive/Cargo.toml +++ /dev/null @@ -1,13 +0,0 @@ -[package] -name = "koprs-derive" -version = "0.1.0" -edition = "2024" -license = "MIT" - -[lib] -proc-macro = true - -[dependencies] -proc-macro2 = "1" -quote = "1" -syn = { version = "2", features = ["full"] } diff --git a/crates/koprs-derive/README.md b/crates/koprs-derive/README.md deleted file mode 100644 index 70e1b0f..0000000 --- a/crates/koprs-derive/README.md +++ /dev/null @@ -1,36 +0,0 @@ -# KOPRS Derive - -Procedural macros for [`koprs`](https://crates.io/crates/koprs), the Kubernetes operator library for Rust. - -This crate is an **implementation detail** of `koprs`. If you are building a Kubernetes operator, depend on `koprs` directly, it re-exports everything you need. - ---- - -## What it provides - -`koprs-derive` exposes the `#[derive(KoResource)]` macro. It generates the trait implementations required to make a custom CRD type work with `koprs`'s generic resource utilities: - -- `kube::Resource` (with `DynamicType = ()`) -- `k8s_openapi::Metadata` (with `Ty = ObjectMeta`) -- The `koprs` marker traits `NamespacedResource` / `ClusterResource` - -Without the macro you must implement these by hand. With it: - -```rust -use koprs_derive::KoResource; -use k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta; -use serde::{Deserialize, Serialize}; - -#[derive(Clone, Debug, Serialize, Deserialize, KoResource)] -pub struct MyOperatorCrd { - pub metadata: ObjectMeta, - pub spec: MySpec, - pub status: Option, -} -``` - ---- - -## License - -MIT diff --git a/crates/koprs-derive/src/lib.rs b/crates/koprs-derive/src/lib.rs deleted file mode 100644 index 76d5341..0000000 --- a/crates/koprs-derive/src/lib.rs +++ /dev/null @@ -1,18 +0,0 @@ -extern crate proc_macro; - -use proc_macro::TokenStream; -use proc_macro2::TokenStream as TokenStream2; -use syn::DeriveInput; - -#[cfg(test)] -mod tests; - -pub(crate) fn expand_ko_resource(_input: &DeriveInput) -> TokenStream2 { - TokenStream2::new() -} - -#[proc_macro_derive(KoResource)] -pub fn ko_resource_derive(input: TokenStream) -> TokenStream { - let ast = syn::parse_macro_input!(input as DeriveInput); - expand_ko_resource(&ast).into() -} diff --git a/crates/koprs-derive/src/tests/ko_resource.rs b/crates/koprs-derive/src/tests/ko_resource.rs deleted file mode 100644 index 2f66920..0000000 --- a/crates/koprs-derive/src/tests/ko_resource.rs +++ /dev/null @@ -1,42 +0,0 @@ -// src/tests/ko_resource.rs -#[cfg(test)] -mod ko_resource_tests { - use proc_macro2::TokenStream; - use quote::quote; - - use crate::expand_ko_resource; - - fn parse(ts: TokenStream) -> syn::DeriveInput { - syn::parse2(ts).expect("failed to parse test input") - } - - #[test] - fn expand_produces_no_tokens_for_stub() { - let input = parse(quote! { - struct MyResource { - metadata: String, - } - }); - let output = expand_ko_resource(&input); - assert!(output.is_empty()); - } - - #[test] - fn expand_accepts_struct_with_named_fields() { - let input = parse(quote! { - struct Foo { - a: u32, - b: String, - } - }); - let output = expand_ko_resource(&input); - assert!(output.is_empty()); - } - - #[test] - fn expand_accepts_unit_struct() { - let input = parse(quote! { struct Bar; }); - let output = expand_ko_resource(&input); - assert!(output.is_empty()); - } -} diff --git a/crates/koprs-derive/src/tests/mod.rs b/crates/koprs-derive/src/tests/mod.rs deleted file mode 100644 index b59d81a..0000000 --- a/crates/koprs-derive/src/tests/mod.rs +++ /dev/null @@ -1,2 +0,0 @@ -// src/tests/mod.rs -mod ko_resource; diff --git a/crates/koprs-gen/.gitignore b/crates/koprs-gen/.gitignore deleted file mode 100644 index a5ff07f..0000000 --- a/crates/koprs-gen/.gitignore +++ /dev/null @@ -1,8 +0,0 @@ -/target - - -# Added by cargo -# -# already existing elements were commented out - -#/target diff --git a/crates/koprs-gen/Cargo.toml b/crates/koprs-gen/Cargo.toml deleted file mode 100644 index f5c165c..0000000 --- a/crates/koprs-gen/Cargo.toml +++ /dev/null @@ -1,7 +0,0 @@ -[package] -name = "koprs-gen" -version = "0.1.0" -edition = "2024" -license = "MIT" - -[dependencies] diff --git a/crates/koprs-gen/src/main.rs b/crates/koprs-gen/src/main.rs deleted file mode 100644 index e7a11a9..0000000 --- a/crates/koprs-gen/src/main.rs +++ /dev/null @@ -1,3 +0,0 @@ -fn main() { - println!("Hello, world!"); -} diff --git a/crates/koprs/Cargo.toml b/crates/koprs/Cargo.toml index 1b7b3c3..4a883b8 100644 --- a/crates/koprs/Cargo.toml +++ b/crates/koprs/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "koprs" -version = "0.9.2" +version = "0.9.5" edition = "2024" description = "A reusable, ergonomic library that streamlines Kubernetes operator development, allowing developers to build controllers with significantly less code." license = "MIT" @@ -29,6 +29,8 @@ http-body-util = "0.1.3" hyper = { version = "1", features = ["http1", "server"] } hyper-util = { version = "0.1", features = ["tokio"] } bytes = "1" +axum = { version = "0.8.9", default-features = false, features = ["http1", "tokio"] } +prometheus = { version = "0.14.0", default-features = false } [dev-dependencies] tokio = { version = "1.0", features = ["macros", "rt-multi-thread", "test-util"] } @@ -36,4 +38,4 @@ tower-test = "0.4" http = "1" [features] -integration = [] \ No newline at end of file +integration = [] diff --git a/crates/koprs/README.md b/crates/koprs/README.md index 4d44497..c790380 100644 --- a/crates/koprs/README.md +++ b/crates/koprs/README.md @@ -57,6 +57,7 @@ A reusable, ergonomic library that eliminates Kubernetes operator boilerplate by | Method | What it provides | |--------|-----------------| | `.health_port(port)` | `GET /healthz` (liveness) + `GET /readyz` (readiness) HTTP server | +| `.metrics_port(port)` | `GET /metrics` Prometheus endpoint — reconcile counts, error counts, and duration histograms, recorded automatically around every reconcile | | `.graceful_shutdown()` | Clean stop on SIGTERM or Ctrl+C | | `.leader_election(ns, name)` | Kubernetes Lease-based HA — only one replica reconciles at a time | | `.leader_election_timings(dur, renew, retry)` | Override lease duration, renew period, and retry period (call after `.leader_election()`) | @@ -128,6 +129,7 @@ koprs = { path = "../koprs" } | `finalizers` | Add and remove finalizers | | `gc` | Garbage collect orphaned resources | | `watcher` | `watch` (signal), `watch_objects` (resource data), `watch_events` (applied + deleted); `WatchEvent` type | +| `observability` | `Metrics` — Prometheus collectors for reconcile counts, errors, and durations; wired in via `.metrics_port()` | | `owners` | Owner references, child wiring, `ObjectRef` sets, `owner_label_mapper`, and mapper closures | | `scope` | `Cluster` and `Namespaced` scope markers for compile-time API selection | | `traits` | `KubeResource`, `NamespacedResource`, `ClusterResource` trait aliases; `is_being_deleted` helper | @@ -408,6 +410,28 @@ patch_annotations::(client.clone(), Namespaced("my-ns"), "my-cr", &[("m ensure_namespace(client.clone(), "my-ns", "my-operator").await?; ``` +### Observability + +`ControllerBuilder::metrics_port` starts a Prometheus endpoint that records +reconcile counts, error counts (by kind and error), and reconcile latency +histograms automatically — no manual instrumentation needed: + +```rust +use koprs::controller::ControllerBuilder; + +ControllerBuilder::new(api) + .metrics_port(9090) + .run(MyReconciler, ctx) + .await?; +``` + +`GET /metrics` then serves `koprs_reconciliations_total`, +`koprs_reconcile_errors_total`, and `koprs_reconcile_duration_seconds` in +Prometheus text-exposition format. Use [`Metrics`](src/observability.rs) +directly if you need the collectors outside of `ControllerBuilder` — for +example to register them on your own `Registry` or record custom reconcile +outcomes. + --- ### Error handling @@ -464,6 +488,7 @@ src/tests/ ├── meta.rs ├── finalizers.rs ├── gc.rs +├── observability.rs ├── owners.rs ├── watcher.rs ├── scope.rs diff --git a/crates/koprs/src/controller.rs b/crates/koprs/src/controller.rs index f54e071..2a66367 100644 --- a/crates/koprs/src/controller.rs +++ b/crates/koprs/src/controller.rs @@ -58,6 +58,7 @@ use kube::Client; use tracing::{debug, error, info, warn}; use crate::error::{KubeGenericError, Result}; +use crate::observability::{Metrics, serve_metrics}; use crate::traits::KubeResource; // --------------------------------------------------------------------------- @@ -502,6 +503,7 @@ where pub(crate) watcher_config: watcher::Config, pub(crate) configure: Option>, pub(crate) health_port: Option, + pub(crate) metrics_port: Option, pub(crate) graceful_shutdown: bool, pub(crate) reconcile_timeout: Option, pub(crate) leader_election: Option, @@ -525,6 +527,7 @@ where watcher_config: watcher::Config::default(), configure: None, health_port: None, + metrics_port: None, graceful_shutdown: false, reconcile_timeout: None, leader_election: None, @@ -545,6 +548,20 @@ where self } + /// Start a Prometheus metrics server on `0.0.0.0:`. + /// + /// `GET /metrics` returns reconcile counts, error counts (by kind and + /// error), and reconcile duration histograms in Prometheus text + /// exposition format. Recording happens automatically around every + /// reconcile — see [`Metrics`] for the full list of collectors. + /// + /// If the port is already in use, [`run`](ControllerBuilder::run) returns + /// an error before the controller loop starts. + pub fn metrics_port(mut self, port: u16) -> Self { + self.metrics_port = Some(port); + self + } + /// Stop the controller loop cleanly on SIGTERM or Ctrl+C. /// /// The loop stops accepting new work; reconciles already running inside @@ -785,6 +802,18 @@ where tokio::spawn(serve_health(listener, ready.clone())); } + // --- Metrics --- + let metrics = if let Some(port) = self.metrics_port { + let registry = prometheus::Registry::new(); + let metrics = Arc::new(Metrics::new_registered(®istry)?); + let listener = tokio::net::TcpListener::bind(("0.0.0.0", port)).await?; + info!(port, "Metrics server listening"); + tokio::spawn(serve_metrics(listener, registry)); + Some(metrics) + } else { + None + }; + // --- Stop signal channel (shared by shutdown + lease loss) --- let (stop_tx, mut stop_rx) = tokio::sync::watch::channel(false); let has_stop = self.graceful_shutdown || self.leader_election.is_some(); @@ -822,13 +851,17 @@ where let error_policy_r = reconciler.clone(); let ready_ref = ready.clone(); let reconcile_timeout = self.reconcile_timeout; + let kind_owned = kind.to_string(); let run_loop = ctl .run( move |cr, ctx| { let r = reconciler.clone(); + let metrics = metrics.clone(); + let kind = kind_owned.clone(); async move { - if let Some(t) = reconcile_timeout { + let started = std::time::Instant::now(); + let result = if let Some(t) = reconcile_timeout { match tokio::time::timeout(t, r.reconcile(cr, ctx)).await { Ok(result) => result, Err(_) => { @@ -838,7 +871,16 @@ where } } else { r.reconcile(cr, ctx).await + }; + if let Some(m) = &metrics { + match &result { + Ok(_) => m.record_success(&kind, started.elapsed()), + Err(e) => { + m.record_failure(&kind, &e.to_string(), started.elapsed()) + } + } } + result } }, move |cr, err, ctx| error_policy_r.error_policy(cr, err, ctx), diff --git a/crates/koprs/src/lib.rs b/crates/koprs/src/lib.rs index 87741b1..a8bc0af 100755 --- a/crates/koprs/src/lib.rs +++ b/crates/koprs/src/lib.rs @@ -36,6 +36,7 @@ pub mod events; pub mod finalizers; pub mod gc; pub mod meta; +pub mod observability; pub mod owners; pub mod resources; pub mod scope; diff --git a/crates/koprs/src/observability.rs b/crates/koprs/src/observability.rs new file mode 100644 index 0000000..57332f7 --- /dev/null +++ b/crates/koprs/src/observability.rs @@ -0,0 +1,185 @@ +//! Operator observability — Prometheus metrics for the reconciliation loop. +//! +//! Provides [`Metrics`], a small set of Prometheus collectors that track the +//! three numbers every operator dashboard needs: how many reconciles ran, how +//! many failed (and why), and how long they took. +//! [`ControllerBuilder`][crate::controller::ControllerBuilder] wires this in +//! automatically when `.metrics_port()` is set — recording around every reconcile and serving the result on +//! `GET /metrics` in Prometheus text-exposition format. +//! +//! # Quick start +//! +//! ```no_run +//! use koprs::controller::{Action, Context, ControllerBuilder, Reconciler}; +//! use koprs::error::KubeGenericError; +//! use kube::Client; +//! use std::sync::Arc; +//! +//! struct MyOperator; +//! type MyCR = k8s_openapi::api::core::v1::ConfigMap; +//! +//! impl Reconciler for MyOperator { +//! type Error = KubeGenericError; +//! async fn reconcile(&self, _cr: Arc, _ctx: Arc) -> Result { +//! Ok(Action::await_change()) +//! } +//! } +//! +//! # async fn example(client: Client) -> Result<(), KubeGenericError> { +//! let ctx = Context::new(client.clone()); +//! let api = kube::Api::::namespaced(client, "my-namespace"); +//! ControllerBuilder::new(api) +//! .metrics_port(9090) +//! .run(MyOperator, ctx) +//! .await?; +//! # Ok(()) +//! # } +//! ``` + +use std::time::Duration; + +use prometheus::{Encoder, HistogramVec, IntCounter, IntCounterVec, Opts, Registry, TextEncoder}; + +use crate::error::{KubeGenericError, Result}; + +// --------------------------------------------------------------------------- +// Metrics +// --------------------------------------------------------------------------- + +/// Prometheus collectors for the reconciliation loop. +/// +/// | Metric | Type | Labels | Meaning | +/// |--------|------|--------|---------| +/// | `koprs_reconciliations_total` | counter | — | Reconciles completed, success or failure | +/// | `koprs_reconcile_errors_total` | counter | `kind`, `error` | Failed reconciles, by resource kind and error | +/// | `koprs_reconcile_duration_seconds` | histogram | `kind` | Reconcile latency, by resource kind | +/// +/// Construct with [`Metrics::new`] and register with a [`Registry`] via +/// [`Metrics::register`] — or use [`Metrics::new_registered`] to do both at +/// once. [`ControllerBuilder`][crate::controller::ControllerBuilder] does this +/// for you when `.metrics_port()` is set. +#[derive(Clone, Debug)] +pub struct Metrics { + reconciliations: IntCounter, + errors: IntCounterVec, + reconcile_duration: HistogramVec, +} + +impl Metrics { + /// Create the collectors without registering them. + pub fn new() -> Self { + let reconciliations = IntCounter::new( + "koprs_reconciliations_total", + "Total number of reconciles completed", + ) + .expect("static metric options are valid"); + + let errors = IntCounterVec::new( + Opts::new( + "koprs_reconcile_errors_total", + "Total number of failed reconciles", + ), + &["kind", "error"], + ) + .expect("static metric options are valid"); + + let reconcile_duration = HistogramVec::new( + prometheus::HistogramOpts::new( + "koprs_reconcile_duration_seconds", + "Reconcile latency in seconds", + ) + .buckets(vec![0.01, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 15.0, 60.0]), + &["kind"], + ) + .expect("static metric options are valid"); + + Self { + reconciliations, + errors, + reconcile_duration, + } + } + + /// Register all collectors with `registry`. + /// + /// Returns [`KubeGenericError::Internal`] if a collector with the same + /// name is already registered. + pub fn register(self, registry: &Registry) -> Result { + registry + .register(Box::new(self.reconciliations.clone())) + .map_err(|e| KubeGenericError::Internal(format!("failed to register metrics: {e}")))?; + registry + .register(Box::new(self.errors.clone())) + .map_err(|e| KubeGenericError::Internal(format!("failed to register metrics: {e}")))?; + registry + .register(Box::new(self.reconcile_duration.clone())) + .map_err(|e| KubeGenericError::Internal(format!("failed to register metrics: {e}")))?; + Ok(self) + } + + /// Create the collectors and register them with `registry` in one step. + pub fn new_registered(registry: &Registry) -> Result { + Self::new().register(registry) + } + + /// Record a successful reconcile of `kind` that took `duration`. + pub fn record_success(&self, kind: &str, duration: Duration) { + self.reconciliations.inc(); + self.reconcile_duration + .with_label_values(&[kind]) + .observe(duration.as_secs_f64()); + } + + /// Record a failed reconcile of `kind` that took `duration`, labelled + /// with the error's `Display` representation. + pub fn record_failure(&self, kind: &str, error: &str, duration: Duration) { + self.reconciliations.inc(); + self.errors.with_label_values(&[kind, error]).inc(); + self.reconcile_duration + .with_label_values(&[kind]) + .observe(duration.as_secs_f64()); + } +} + +impl Default for Metrics { + fn default() -> Self { + Self::new() + } +} + +// --------------------------------------------------------------------------- +// Internal: metrics server +// --------------------------------------------------------------------------- + +/// Render every metric family in `registry` as Prometheus text exposition format. +pub(crate) fn render(registry: &Registry) -> Result { + let metric_families = registry.gather(); + let mut buffer = Vec::new(); + TextEncoder::new() + .encode(&metric_families, &mut buffer) + .map_err(|e| KubeGenericError::Internal(format!("failed to encode metrics: {e}")))?; + String::from_utf8(buffer) + .map_err(|e| KubeGenericError::Internal(format!("metrics output was not valid UTF-8: {e}"))) +} + +/// Serve `GET /metrics` on an already-bound listener, rendering `registry` +/// in Prometheus text exposition format on every request. +pub(crate) async fn serve_metrics(listener: tokio::net::TcpListener, registry: Registry) { + use axum::Router; + use axum::extract::State; + use axum::http::StatusCode; + use axum::routing::get; + + async fn metrics_handler(State(registry): State) -> (StatusCode, String) { + match render(®istry) { + Ok(body) => (StatusCode::OK, body), + Err(e) => (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()), + } + } + + let app = Router::new() + .route("/metrics", get(metrics_handler)) + .with_state(registry); + + let _ = axum::serve(listener, app).await; +} diff --git a/crates/koprs/src/tests/mod.rs b/crates/koprs/src/tests/mod.rs index e90076a..82a2671 100755 --- a/crates/koprs/src/tests/mod.rs +++ b/crates/koprs/src/tests/mod.rs @@ -6,6 +6,7 @@ mod events; mod finalizers; mod gc; mod meta; +mod observability; mod owners; mod resources; mod scope; diff --git a/crates/koprs/src/tests/observability.rs b/crates/koprs/src/tests/observability.rs new file mode 100644 index 0000000..d1fdf3a --- /dev/null +++ b/crates/koprs/src/tests/observability.rs @@ -0,0 +1,137 @@ +// src/tests/observability.rs + +#[cfg(test)] +mod observability_tests { + use std::time::Duration; + + use prometheus::Registry; + + use crate::observability::{Metrics, render, serve_metrics}; + + // ----------------------------------------------------------------------- + // Metrics + // ----------------------------------------------------------------------- + + #[test] + fn new_registered_registers_all_collectors() { + let registry = Registry::new(); + let metrics = Metrics::new_registered(®istry).expect("registration succeeds"); + + // Vec-based collectors only appear in `gather()` once a label + // combination has been observed at least once. + metrics.record_success("ConfigMap", Duration::from_millis(10)); + metrics.record_failure("ConfigMap", "boom", Duration::from_millis(10)); + + let names: Vec = registry + .gather() + .into_iter() + .map(|mf| mf.name().to_string()) + .collect(); + + assert!(names.contains(&"koprs_reconciliations_total".to_string())); + assert!(names.contains(&"koprs_reconcile_errors_total".to_string())); + assert!(names.contains(&"koprs_reconcile_duration_seconds".to_string())); + } + + #[test] + fn registering_twice_fails() { + let registry = Registry::new(); + Metrics::new_registered(®istry).expect("first registration succeeds"); + let err = Metrics::new_registered(®istry).expect_err("duplicate registration fails"); + assert!(matches!(err, crate::error::KubeGenericError::Internal(_))); + } + + #[test] + fn record_success_increments_total_but_not_errors() { + let registry = Registry::new(); + let metrics = Metrics::new_registered(®istry).unwrap(); + + metrics.record_success("ConfigMap", Duration::from_millis(5)); + metrics.record_success("ConfigMap", Duration::from_millis(5)); + + let output = render(®istry).unwrap(); + assert!(output.contains("koprs_reconciliations_total 2")); + assert!(!output.contains("koprs_reconcile_errors_total")); + } + + #[test] + fn record_failure_increments_total_and_labelled_error_counter() { + let registry = Registry::new(); + let metrics = Metrics::new_registered(®istry).unwrap(); + + metrics.record_failure("ConfigMap", "not found", Duration::from_millis(1)); + + let output = render(®istry).unwrap(); + assert!(output.contains("koprs_reconciliations_total 1")); + assert!( + output + .contains(r#"koprs_reconcile_errors_total{error="not found",kind="ConfigMap"} 1"#) + ); + } + + #[test] + fn render_includes_duration_histogram_for_recorded_kind() { + let registry = Registry::new(); + let metrics = Metrics::new_registered(®istry).unwrap(); + + metrics.record_success("ConfigMap", Duration::from_millis(100)); + + let output = render(®istry).unwrap(); + assert!(output.contains("koprs_reconcile_duration_seconds_count{kind=\"ConfigMap\"} 1")); + } + + #[test] + fn render_on_empty_registry_is_empty() { + let registry = Registry::new(); + assert_eq!(render(®istry).unwrap(), ""); + } + + // ----------------------------------------------------------------------- + // serve_metrics — exercised via real TCP + // ----------------------------------------------------------------------- + + /// Bind a random port, start serve_metrics, return the port. + async fn start_metrics_server(registry: Registry) -> u16 { + let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap(); + let port = listener.local_addr().unwrap().port(); + tokio::spawn(serve_metrics(listener, registry)); + port + } + + /// Send a minimal HTTP/1.1 GET and return the full raw response. + async fn http_get(port: u16, path: &str) -> String { + use tokio::io::{AsyncReadExt, AsyncWriteExt}; + let mut stream = tokio::net::TcpStream::connect(format!("127.0.0.1:{port}")) + .await + .unwrap(); + let req = format!("GET {path} HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n\r\n"); + stream.write_all(req.as_bytes()).await.unwrap(); + let mut buf = Vec::new(); + stream.read_to_end(&mut buf).await.unwrap(); + String::from_utf8_lossy(&buf).into_owned() + } + + #[tokio::test] + async fn metrics_endpoint_returns_200_and_renders_registry() { + let registry = Registry::new(); + let metrics = Metrics::new_registered(®istry).unwrap(); + metrics.record_success("ConfigMap", Duration::from_millis(20)); + + let port = start_metrics_server(registry).await; + let resp = http_get(port, "/metrics").await; + + assert!(resp.starts_with("HTTP/1.1 200 OK"), "got: {resp}"); + assert!( + resp.contains("koprs_reconciliations_total 1"), + "expected metric in body, got: {resp}" + ); + } + + #[tokio::test] + async fn unknown_path_returns_404() { + let registry = Registry::new(); + let port = start_metrics_server(registry).await; + let resp = http_get(port, "/nope").await; + assert!(resp.starts_with("HTTP/1.1 404"), "got: {resp}"); + } +} diff --git a/examples/configmapsync/src/main.rs b/examples/configmapsync/src/main.rs index 9303d0d..c5bad89 100644 --- a/examples/configmapsync/src/main.rs +++ b/examples/configmapsync/src/main.rs @@ -9,6 +9,7 @@ // // Operational features: // .health_port(8080) — GET /healthz + GET /readyz for pod probes +// .metrics_port(9090) — GET /metrics — Prometheus reconcile counts/errors/durations // .graceful_shutdown() — clean stop on SIGTERM / Ctrl+C // .leader_election(...) — Kubernetes Lease-based HA; only one replica reconciles // .reconcile_timeout(300s) — kills and requeues reconciles stuck longer than 5 minutes @@ -67,6 +68,7 @@ async fn main() -> anyhow::Result<()> { owner_label_mapper("configmapsync.example.io/owner"), ) .health_port(8080) + .metrics_port(9090) .graceful_shutdown() .leader_election(operator_ns, "configmapsync-operator-leader") .reconcile_timeout(Duration::from_secs(300)) diff --git a/examples/configmapsync/src/reconciler.rs b/examples/configmapsync/src/reconciler.rs index 0e9f9f0..114ee5e 100644 --- a/examples/configmapsync/src/reconciler.rs +++ b/examples/configmapsync/src/reconciler.rs @@ -6,7 +6,7 @@ use std::sync::Arc; use k8s_openapi::api::core::v1::ConfigMap; use kube::ResourceExt; use tokio::time::Duration; -use tracing::{error, info, warn}; +use tracing::{error, info}; use koprs::controller::{Action, Context, Reconciler}; use koprs::error::KubeGenericError; @@ -176,8 +176,8 @@ impl Reconciler for ConfigMapSyncReconciler { error: &KubeGenericError, _ctx: Arc, ) -> Action { - warn!(cr = %cr.name_any(), error = %error, "reconcile failed — retrying in 30s"); - Action::requeue(Duration::from_secs(30)) + error!(cr = %cr.name_any(), error = %error, "reconcile failed — retrying in 5s"); + Action::requeue(Duration::from_secs(5)) } } diff --git a/examples/multicontroller/.gitignore b/examples/multicontroller/.gitignore new file mode 100644 index 0000000..ea8c4bf --- /dev/null +++ b/examples/multicontroller/.gitignore @@ -0,0 +1 @@ +/target diff --git a/examples/multicontroller/Cargo.toml b/examples/multicontroller/Cargo.toml new file mode 100644 index 0000000..836093f --- /dev/null +++ b/examples/multicontroller/Cargo.toml @@ -0,0 +1,22 @@ +[package] +name = "multicontroller-operator" +version = "0.1.0" +edition.workspace = true + +[[bin]] +name = "multicontroller-operator" +path = "src/main.rs" + +[dependencies] +koprs = { path = "../../crates/koprs" } +kube = { workspace = true } +k8s-openapi = { version = "0.25.0", features = ["v1_33"] } +schemars.workspace = true +serde = { workspace = true } +serde_json = "1.0" +tokio = { workspace = true } +tracing = "0.1" +tracing-subscriber = { version = "0.3", features = ["env-filter"] } +thiserror = "1" +anyhow = "1" +chrono = "0.4" diff --git a/examples/multicontroller/README.md b/examples/multicontroller/README.md new file mode 100644 index 0000000..1a34bb4 --- /dev/null +++ b/examples/multicontroller/README.md @@ -0,0 +1,199 @@ +# multicontroller-operator + +A Kubernetes operator written in Rust that runs **two independent controllers +side by side in a single process**, each managing its own CRD kind: + +| Controller | Watches CR | Manages | +|---|---|---| +| `SecretSync` controller | `SecretSync` | `Secret` | +| `ServiceAccountSync` controller | `ServiceAccountSync` | `ServiceAccount` | + +This is the pattern to reach for when an operator owns more than one CRD: build +each controller independently with its own `ControllerBuilder`, then drive both +loops on the same Tokio runtime. Within each loop, multiple CRs of that kind +are also reconciled concurrently — so reconciliation is parallel both *across* +controllers and *within* each controller. + +## How it works + +### Custom Resources + +**`SecretSync`** — creates/maintains a `Secret` in a target namespace from +plaintext key/value pairs: + +```yaml +apiVersion: example.io/v1alpha1 +kind: SecretSync +metadata: + name: db-credentials + namespace: default +spec: + targetNamespace: production + stringData: + username: app-user + password: change-me +``` + +This creates a Secret named `ss-db-credentials` in the `production` namespace. + +**`ServiceAccountSync`** — creates/maintains a `ServiceAccount` in a target +namespace with the given image-pull secrets: + +```yaml +apiVersion: example.io/v1alpha1 +kind: ServiceAccountSync +metadata: + name: app-runner + namespace: default +spec: + targetNamespace: production + automountToken: false + imagePullSecrets: + - regcred +``` + +This creates a ServiceAccount named `sas-app-runner` in the `production` namespace. + +### Reconcile loop + +Both reconcilers (`secretsync.rs`, `serviceaccountsync.rs`) follow the same +shape — only the managed resource type differs: + +1. **Adds a finalizer** to the CR to prevent deletion before cleanup runs. +2. **Applies the managed resource** (`ss-` / `sas-`) in the + target namespace using Server-Side Apply, labelled with + `app.kubernetes.io/managed-by=multicontroller-sync`. +3. **Garbage collects** stale resources previously owned by the CR. +4. **Stamps a label** — adds `multicontroller.example.io/synced-to=` + to the CR. +5. **Patches status** — writes `ready`, `message`, and a `Ready=True` condition + in a single SSA patch. +6. **On deletion** — removes the synced resource, then strips the finalizer. + +Each controller requeues every **300 seconds** for drift correction, and +retries after **5 seconds** on error. + +### Running controllers concurrently + +[`src/main.rs`](src/main.rs) builds two `ControllerBuilder`s — one per CRD — +and drives both with `tokio::try_join!`: + +```rust +let secretsync_controller = ControllerBuilder::new(secretsync_api) + .health_port(8080) + .metrics_port(9090) + .leader_election(operator_ns.clone(), "secretsync-operator-leader") + .concurrency(4) + .run(SecretSyncReconciler, secret_ctx); + +let serviceaccountsync_controller = ControllerBuilder::new(serviceaccountsync_api) + .health_port(8081) + .metrics_port(9091) + .leader_election(operator_ns, "serviceaccountsync-operator-leader") + .concurrency(4) + .run(ServiceAccountSyncReconciler, serviceaccount_ctx); + +tokio::try_join!(secretsync_controller, serviceaccountsync_controller)?; +``` + +A few things worth noting about composing controllers this way: + +- Each `.run(...)` future drives its own watch + reconcile loop; polling them + together means CRs of *either* kind are picked up and reconciled in parallel — + a burst of `SecretSync` updates doesn't block `ServiceAccountSync` reconciles. +- `.concurrency(n)` additionally lets each controller reconcile up to `n` CRs + of *its own* kind in parallel, so concurrency happens both across and within + controllers. +- Operational features that bind shared resources need distinct values per + controller: each gets its own health port (`8080`/`8081`), its own metrics + port (`9090`/`9091`), and its own leader lease name, so the two controllers + can be elected leader independently. +- `tokio::try_join!` propagates the first error and cancels the other loop — + the same fail-fast behavior a single `.run()` call would have. Use + `tokio::join!` instead if controllers should keep running independently of + each other's failures. + +### Status + +``` +kubectl get secretsyncs +NAME TARGET READY +db-credentials production true + +kubectl get serviceaccountsyncs +NAME TARGET READY +app-runner production true +``` + +## Prerequisites + +- Kubernetes cluster (1.26+) +- `kubectl` configured to point at the cluster +- Rust toolchain (edition 2024) — only needed to build from source + +## Deploy + +### 1. Install the CRDs, then apply the example CRs + +The CRDs must be fully established before Kubernetes will accept instances of them. +Apply in two steps: + +```bash +kubectl apply -f manifests/crd-secretsync.yaml -f manifests/crd-serviceaccountsync.yaml +kubectl apply -f manifests/example-cr.yaml +``` + +### 2. Build and run the operator + +#### Local (out-of-cluster) + +```bash +RUST_LOG=info cargo run --release +``` + +The operator uses the kubeconfig from `~/.kube/config` (or the `KUBECONFIG` env var) when running out-of-cluster. + +#### In-cluster + +Build a container image from the binary and deploy it as a `Deployment` with a `ServiceAccount` that has the necessary RBAC permissions (see below), then point it at the cluster by mounting the in-cluster service account token (the default when no kubeconfig is present). + +### Required RBAC permissions + +The operator needs the following permissions: + +| Resource | Verbs | +|---|---| +| `secretsyncs` (example.io) | get, list, watch, patch, update | +| `secretsyncs/status` (example.io) | patch, update | +| `serviceaccountsyncs` (example.io) | get, list, watch, patch, update | +| `serviceaccountsyncs/status` (example.io) | patch, update | +| `secrets` (core) | get, list, watch, create, update, patch, delete | +| `serviceaccounts` (core) | get, list, watch, create, update, patch, delete | +| `leases` (coordination.k8s.io) | get, list, watch, create, update, patch | + +## Project structure + +``` +src/ + main.rs — wires up and runs both controllers concurrently + secretsync.rs — SecretSync reconciler (manages Secrets) + serviceaccountsync.rs — ServiceAccountSync reconciler (manages ServiceAccounts) + types.rs — SecretSync and ServiceAccountSync CRD definitions +manifests/ + crd-secretsync.yaml + crd-serviceaccountsync.yaml + example-cr.yaml +Cargo.toml +``` + +## Dependencies + +| Crate | Version | Purpose | +|---|---|---| +| `kube` | 1.1.0 | Kubernetes client + controller runtime | +| `k8s-openapi` | 0.25.0 (v1_33) | Typed Kubernetes API objects | +| `koprs` | 0.6.1 | Helper abstractions (SSA, finalizers, status patching, conditions, label patching, GC) | +| `tokio` | 1.0 | Async runtime | +| `tracing` / `tracing-subscriber` | 0.1 / 0.3 | Structured logging | + +Log level is controlled via the `RUST_LOG` environment variable (default: `info`). diff --git a/examples/multicontroller/manifests/crd-secretsync.yaml b/examples/multicontroller/manifests/crd-secretsync.yaml new file mode 100644 index 0000000..abdd62e --- /dev/null +++ b/examples/multicontroller/manifests/crd-secretsync.yaml @@ -0,0 +1,75 @@ +# manifests/crd-secretsync.yaml +# +# CustomResourceDefinition for SecretSync. +# Apply with: kubectl apply -f manifests/crd-secretsync.yaml +apiVersion: apiextensions.k8s.io/v1 +kind: CustomResourceDefinition +metadata: + name: secretsyncs.example.io +spec: + group: example.io + names: + kind: SecretSync + listKind: SecretSyncList + plural: secretsyncs + singular: secretsync + shortNames: + - ssy + scope: Namespaced + versions: + - name: v1alpha1 + served: true + storage: true + subresources: + status: {} + additionalPrinterColumns: + - name: Target + type: string + jsonPath: .spec.targetNamespace + - name: Ready + type: string + jsonPath: .status.ready + schema: + openAPIV3Schema: + type: object + properties: + spec: + type: object + required: [targetNamespace, stringData] + properties: + targetNamespace: + type: string + description: Namespace where the Secret will be created. + stringData: + type: object + additionalProperties: + type: string + description: Plaintext key/value pairs to place in the Secret's stringData. + status: + type: object + properties: + ready: + type: boolean + message: + type: string + conditions: + type: array + items: + type: object + required: [type, status, reason, message, lastTransitionTime] + properties: + type: + type: string + status: + type: string + enum: ["True", "False", "Unknown"] + reason: + type: string + message: + type: string + lastTransitionTime: + type: string + format: date-time + observedGeneration: + type: integer + format: int64 diff --git a/examples/multicontroller/manifests/crd-serviceaccountsync.yaml b/examples/multicontroller/manifests/crd-serviceaccountsync.yaml new file mode 100644 index 0000000..17497d6 --- /dev/null +++ b/examples/multicontroller/manifests/crd-serviceaccountsync.yaml @@ -0,0 +1,78 @@ +# manifests/crd-serviceaccountsync.yaml +# +# CustomResourceDefinition for ServiceAccountSync. +# Apply with: kubectl apply -f manifests/crd-serviceaccountsync.yaml +apiVersion: apiextensions.k8s.io/v1 +kind: CustomResourceDefinition +metadata: + name: serviceaccountsyncs.example.io +spec: + group: example.io + names: + kind: ServiceAccountSync + listKind: ServiceAccountSyncList + plural: serviceaccountsyncs + singular: serviceaccountsync + shortNames: + - sasy + scope: Namespaced + versions: + - name: v1alpha1 + served: true + storage: true + subresources: + status: {} + additionalPrinterColumns: + - name: Target + type: string + jsonPath: .spec.targetNamespace + - name: Ready + type: string + jsonPath: .status.ready + schema: + openAPIV3Schema: + type: object + properties: + spec: + type: object + required: [targetNamespace] + properties: + targetNamespace: + type: string + description: Namespace where the ServiceAccount will be created. + automountToken: + type: boolean + description: Whether pods using this ServiceAccount automount its token. + imagePullSecrets: + type: array + items: + type: string + description: Names of image-pull secrets to attach to the ServiceAccount. + status: + type: object + properties: + ready: + type: boolean + message: + type: string + conditions: + type: array + items: + type: object + required: [type, status, reason, message, lastTransitionTime] + properties: + type: + type: string + status: + type: string + enum: ["True", "False", "Unknown"] + reason: + type: string + message: + type: string + lastTransitionTime: + type: string + format: date-time + observedGeneration: + type: integer + format: int64 diff --git a/examples/multicontroller/manifests/example-cr.yaml b/examples/multicontroller/manifests/example-cr.yaml new file mode 100644 index 0000000..10f5457 --- /dev/null +++ b/examples/multicontroller/manifests/example-cr.yaml @@ -0,0 +1,25 @@ +# manifests/example-cr.yaml +# +# Example CRs for both controllers managed by this operator. +# Apply with: kubectl apply -f manifests/example-cr.yaml +apiVersion: example.io/v1alpha1 +kind: SecretSync +metadata: + name: db-credentials + namespace: default +spec: + targetNamespace: production + stringData: + username: app-user + password: change-me +--- +apiVersion: example.io/v1alpha1 +kind: ServiceAccountSync +metadata: + name: app-runner + namespace: default +spec: + targetNamespace: production + automountToken: false + imagePullSecrets: + - regcred diff --git a/examples/multicontroller/src/main.rs b/examples/multicontroller/src/main.rs new file mode 100644 index 0000000..7204e95 --- /dev/null +++ b/examples/multicontroller/src/main.rs @@ -0,0 +1,96 @@ +// src/main.rs +// +// Demonstrates running multiple independent controllers — each managing its +// own CRD kind — concurrently inside a single operator process. +// +// SecretSync controller — reconciles SecretSync CRs, manages Secrets +// ServiceAccountSync controller — reconciles ServiceAccountSync CRs, manages ServiceAccounts +// +// Each `ControllerBuilder::run(...)` call returns a future that drives its own +// watch + reconcile loop. Running them with `tokio::try_join!` polls both +// loops on the same runtime: CRs of either kind are picked up and reconciled +// in parallel, and within each loop multiple CRs of that kind are reconciled +// concurrently up to `.concurrency(n)`. +// +// Operational features (composed identically on both controllers): +// .health_port(...) — GET /healthz + GET /readyz per controller (distinct ports) +// .metrics_port(...) — GET /metrics per controller (distinct ports) — Prometheus +// reconcile counts/errors/durations +// .graceful_shutdown() — clean stop on SIGTERM / Ctrl+C +// .leader_election(...) — Kubernetes Lease-based HA (distinct lease names) +// .reconcile_timeout(...) — kills and requeues reconciles stuck too long +// .concurrency(n) — reconcile up to n CRs of that kind in parallel + +mod secretsync; +mod serviceaccountsync; +mod types; + +use std::time::Duration; + +use kube::{Api, Client}; +use tracing::info; + +use koprs::controller::{Context, ControllerBuilder}; + +use crate::secretsync::SecretSyncReconciler; +use crate::serviceaccountsync::ServiceAccountSyncReconciler; +use crate::types::{SecretSync, ServiceAccountSync}; + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + tracing_subscriber::fmt() + .with_env_filter( + tracing_subscriber::EnvFilter::try_from_default_env().unwrap_or_else(|_| "info".into()), + ) + .init(); + + info!("starting multicontroller-operator"); + + let client = Client::try_default().await?; + + // The operator namespace is injected via the downward API in production: + // env: + // - name: OPERATOR_NAMESPACE + // valueFrom: + // fieldRef: + // fieldPath: metadata.namespace + let operator_ns = std::env::var("OPERATOR_NAMESPACE").unwrap_or_else(|_| "default".to_string()); + + // ----------------------------------------------------------------------- + // Controller A — SecretSync + // ----------------------------------------------------------------------- + let secretsync_api: Api = Api::all(client.clone()); + let secret_ctx = Context::new(client.clone()); + + let secretsync_controller = ControllerBuilder::new(secretsync_api) + .health_port(8080) + .metrics_port(9090) + .graceful_shutdown() + .leader_election(operator_ns.clone(), "secretsync-operator-leader") + .reconcile_timeout(Duration::from_secs(300)) + .concurrency(4) + .run(SecretSyncReconciler, secret_ctx); + + // ----------------------------------------------------------------------- + // Controller B — ServiceAccountSync + // ----------------------------------------------------------------------- + let serviceaccountsync_api: Api = Api::all(client.clone()); + let serviceaccount_ctx = Context::new(client.clone()); + + let serviceaccountsync_controller = ControllerBuilder::new(serviceaccountsync_api) + .health_port(8081) + .metrics_port(9091) + .graceful_shutdown() + .leader_election(operator_ns, "serviceaccountsync-operator-leader") + .reconcile_timeout(Duration::from_secs(300)) + .concurrency(4) + .run(ServiceAccountSyncReconciler, serviceaccount_ctx); + + // Drive both controller loops on the same runtime: each polls its own + // watch stream and reconciles its own CRs independently and in parallel. + // If either returns an error, the other is cancelled and the error is + // propagated — mirroring how a single `.run()` call would fail the process. + let ((), ()) = tokio::try_join!(secretsync_controller, serviceaccountsync_controller)?; + + Ok(()) +} diff --git a/examples/multicontroller/src/secretsync.rs b/examples/multicontroller/src/secretsync.rs new file mode 100644 index 0000000..99857a8 --- /dev/null +++ b/examples/multicontroller/src/secretsync.rs @@ -0,0 +1,204 @@ +// src/secretsync.rs +// +// Reconciler for the SecretSync CRD — ensures a Secret with the spec's +// string data exists in the target namespace. Structurally identical to +// the ServiceAccountSync reconciler; the two run as independent controllers +// (see main.rs) so CRs of either kind are reconciled concurrently. + +use std::collections::BTreeMap; +use std::sync::Arc; + +use k8s_openapi::api::core::v1::Secret; +use kube::ResourceExt; +use tokio::time::Duration; +use tracing::{error, info}; + +use koprs::controller::{Action, Context, Reconciler}; +use koprs::error::KubeGenericError; +use koprs::events::{EventType, record_event}; +use koprs::finalizers::{add_finalizer_namespaced, remove_finalizers}; +use koprs::gc::gc_resources; +use koprs::is_being_deleted; +use koprs::meta::ObjectMetaBuilder; +use koprs::resources::{EnsureOutcome, delete_resource, ensure_resource, patch_labels}; +use koprs::scope::Namespaced; +use koprs::status::{make_condition, patch_status_namespaced, upsert_condition}; + +use crate::types::{SecretSync, SecretSyncStatus}; + +const FINALIZER: &str = "multicontroller.example.io/secretsync-cleanup"; +const FIELD_MANAGER: &str = "multicontroller-secretsync"; +const MANAGED_LABEL: &str = "app.kubernetes.io/managed-by=multicontroller-secretsync"; + +pub struct SecretSyncReconciler; + +impl Reconciler for SecretSyncReconciler { + type Error = KubeGenericError; + + async fn reconcile( + &self, + cr: Arc, + ctx: Arc, + ) -> Result { + let client = ctx.client.clone(); + let name = cr.name_any(); + let namespace = cr + .namespace() + .ok_or(KubeGenericError::MissingMetadata("namespace".into()))?; + + info!(cr = %name, ns = %namespace, "reconciling SecretSync"); + + // ------------------------------------------------------------------- + // Deletion path + // ------------------------------------------------------------------- + if is_being_deleted(&*cr) { + info!(cr = %name, "deletion timestamp set — running cleanup"); + + let target_ns = &cr.spec.target_namespace; + let secret_name = secret_name(&name); + + match delete_resource::(client.clone(), Namespaced(target_ns), &secret_name) + .await + { + Ok(true) => info!(secret = %secret_name, ns = %target_ns, "deleted synced Secret"), + Ok(false) => info!(secret = %secret_name, "Secret was already gone"), + Err(e) => { + error!(error = %e, "failed to delete Secret during cleanup"); + return Err(e.into()); + } + } + + remove_finalizers::(client.clone(), Namespaced(&namespace), &name) + .await?; + info!(cr = %name, "finalizer removed — deletion complete"); + return Ok(Action::await_change()); + } + + // ------------------------------------------------------------------- + // Normal reconcile path + // ------------------------------------------------------------------- + + // 1. Ensure finalizer is present. + add_finalizer_namespaced::(client.clone(), &cr, FINALIZER).await?; + + // 2. Build and ensure the desired Secret. + let target_ns = &cr.spec.target_namespace; + let secret_name = secret_name(&name); + let desired_secret = build_secret(&secret_name, target_ns, &name, &cr.spec.string_data); + + let outcome = ensure_resource::( + client.clone(), + Namespaced(target_ns), + &desired_secret, + FIELD_MANAGER, + ) + .await?; + info!(secret = %secret_name, ns = %target_ns, "applied Secret"); + + if outcome.was_changed() { + let (reason, note) = match &outcome { + EnsureOutcome::Created(_) => ( + "SecretCreated", + format!("Secret '{secret_name}' created in namespace '{target_ns}'"), + ), + EnsureOutcome::Updated(_) => ( + "SecretDriftCorrected", + format!("Secret '{secret_name}' corrected in namespace '{target_ns}'"), + ), + EnsureOutcome::Unchanged(_) => unreachable!(), + }; + record_event( + client.clone(), + &*cr, + EventType::Normal, + "Sync", + reason, + note, + FIELD_MANAGER, + ) + .await?; + } + + // 3. Garbage-collect stale Secrets previously owned by this CR. + gc_resources::(client.clone(), Namespaced(target_ns), MANAGED_LABEL, |s| { + s.name_any() == secret_name + }) + .await?; + + // 4. Stamp the target namespace as a label on the CR. + patch_labels::( + client.clone(), + Namespaced(&namespace), + &name, + &[("multicontroller.example.io/synced-to", target_ns)], + ) + .await?; + + // 5. Write the full status in one SSA patch. + let generation = cr.metadata.generation; + let status_message = format!("Secret '{secret_name}' synced to namespace '{target_ns}'"); + + let mut conditions = cr + .status + .as_ref() + .map(|s| s.conditions.clone()) + .unwrap_or_default(); + upsert_condition( + &mut conditions, + make_condition("Ready", "True", "SecretSynced", &status_message, generation), + ); + + patch_status_namespaced::( + client.clone(), + &namespace, + &name, + SecretSyncStatus { + ready: true, + message: status_message, + conditions, + }, + FIELD_MANAGER, + ) + .await?; + + info!(cr = %name, "reconcile complete"); + Ok(Action::requeue(Duration::from_secs(300))) + } + + fn error_policy( + &self, + cr: Arc, + error: &KubeGenericError, + _ctx: Arc, + ) -> Action { + error!(cr = %cr.name_any(), error = %error, "reconcile failed — retrying in 5s"); + Action::requeue(Duration::from_secs(5)) + } +} + +// --------------------------------------------------------------------------- +// Helpers +// --------------------------------------------------------------------------- + +fn secret_name(cr_name: &str) -> String { + format!("ss-{cr_name}") +} + +fn build_secret( + name: &str, + namespace: &str, + owner_cr: &str, + string_data: &BTreeMap, +) -> Secret { + Secret { + metadata: ObjectMetaBuilder::new() + .name(name) + .namespace(namespace) + .label("app.kubernetes.io/managed-by", "multicontroller-secretsync") + .label("multicontroller.example.io/owner", owner_cr) + .build(), + type_: Some("Opaque".to_string()), + string_data: Some(string_data.clone()), + ..Default::default() + } +} diff --git a/examples/multicontroller/src/serviceaccountsync.rs b/examples/multicontroller/src/serviceaccountsync.rs new file mode 100644 index 0000000..527f088 --- /dev/null +++ b/examples/multicontroller/src/serviceaccountsync.rs @@ -0,0 +1,236 @@ +// src/serviceaccountsync.rs +// +// Reconciler for the ServiceAccountSync CRD — ensures a ServiceAccount with +// the spec's image-pull secrets exists in the target namespace. Structurally +// identical to the SecretSync reconciler; the two run as independent +// controllers (see main.rs) so CRs of either kind are reconciled concurrently. + +use std::sync::Arc; + +use k8s_openapi::api::core::v1::{LocalObjectReference, ServiceAccount}; +use kube::ResourceExt; +use tokio::time::Duration; +use tracing::{error, info}; + +use koprs::controller::{Action, Context, Reconciler}; +use koprs::error::KubeGenericError; +use koprs::events::{EventType, record_event}; +use koprs::finalizers::{add_finalizer_namespaced, remove_finalizers}; +use koprs::gc::gc_resources; +use koprs::is_being_deleted; +use koprs::meta::ObjectMetaBuilder; +use koprs::resources::{EnsureOutcome, delete_resource, ensure_resource, patch_labels}; +use koprs::scope::Namespaced; +use koprs::status::{make_condition, patch_status_namespaced, upsert_condition}; + +use crate::types::{ServiceAccountSync, ServiceAccountSyncStatus}; + +const FINALIZER: &str = "multicontroller.example.io/serviceaccountsync-cleanup"; +const FIELD_MANAGER: &str = "multicontroller-serviceaccountsync"; +const MANAGED_LABEL: &str = "app.kubernetes.io/managed-by=multicontroller-serviceaccountsync"; + +pub struct ServiceAccountSyncReconciler; + +impl Reconciler for ServiceAccountSyncReconciler { + type Error = KubeGenericError; + + async fn reconcile( + &self, + cr: Arc, + ctx: Arc, + ) -> Result { + let client = ctx.client.clone(); + let name = cr.name_any(); + let namespace = cr + .namespace() + .ok_or(KubeGenericError::MissingMetadata("namespace".into()))?; + + info!(cr = %name, ns = %namespace, "reconciling ServiceAccountSync"); + + // ------------------------------------------------------------------- + // Deletion path + // ------------------------------------------------------------------- + if is_being_deleted(&*cr) { + info!(cr = %name, "deletion timestamp set — running cleanup"); + + let target_ns = &cr.spec.target_namespace; + let sa_name = service_account_name(&name); + + match delete_resource::( + client.clone(), + Namespaced(target_ns), + &sa_name, + ) + .await + { + Ok(true) => info!(sa = %sa_name, ns = %target_ns, "deleted synced ServiceAccount"), + Ok(false) => info!(sa = %sa_name, "ServiceAccount was already gone"), + Err(e) => { + error!(error = %e, "failed to delete ServiceAccount during cleanup"); + return Err(e.into()); + } + } + + remove_finalizers::( + client.clone(), + Namespaced(&namespace), + &name, + ) + .await?; + info!(cr = %name, "finalizer removed — deletion complete"); + return Ok(Action::await_change()); + } + + // ------------------------------------------------------------------- + // Normal reconcile path + // ------------------------------------------------------------------- + + // 1. Ensure finalizer is present. + add_finalizer_namespaced::(client.clone(), &cr, FINALIZER).await?; + + // 2. Build and ensure the desired ServiceAccount. + let target_ns = &cr.spec.target_namespace; + let sa_name = service_account_name(&name); + let desired_sa = build_service_account( + &sa_name, + target_ns, + &name, + cr.spec.automount_token, + &cr.spec.image_pull_secrets, + ); + + let outcome = ensure_resource::( + client.clone(), + Namespaced(target_ns), + &desired_sa, + FIELD_MANAGER, + ) + .await?; + info!(sa = %sa_name, ns = %target_ns, "applied ServiceAccount"); + + if outcome.was_changed() { + let (reason, note) = match &outcome { + EnsureOutcome::Created(_) => ( + "ServiceAccountCreated", + format!("ServiceAccount '{sa_name}' created in namespace '{target_ns}'"), + ), + EnsureOutcome::Updated(_) => ( + "ServiceAccountDriftCorrected", + format!("ServiceAccount '{sa_name}' corrected in namespace '{target_ns}'"), + ), + EnsureOutcome::Unchanged(_) => unreachable!(), + }; + record_event( + client.clone(), + &*cr, + EventType::Normal, + "Sync", + reason, + note, + FIELD_MANAGER, + ) + .await?; + } + + // 3. Garbage-collect stale ServiceAccounts previously owned by this CR. + gc_resources::( + client.clone(), + Namespaced(target_ns), + MANAGED_LABEL, + |sa| sa.name_any() == sa_name, + ) + .await?; + + // 4. Stamp the target namespace as a label on the CR. + patch_labels::( + client.clone(), + Namespaced(&namespace), + &name, + &[("multicontroller.example.io/synced-to", target_ns)], + ) + .await?; + + // 5. Write the full status in one SSA patch. + let generation = cr.metadata.generation; + let status_message = + format!("ServiceAccount '{sa_name}' synced to namespace '{target_ns}'"); + + let mut conditions = cr + .status + .as_ref() + .map(|s| s.conditions.clone()) + .unwrap_or_default(); + upsert_condition( + &mut conditions, + make_condition( + "Ready", + "True", + "ServiceAccountSynced", + &status_message, + generation, + ), + ); + + patch_status_namespaced::( + client.clone(), + &namespace, + &name, + ServiceAccountSyncStatus { + ready: true, + message: status_message, + conditions, + }, + FIELD_MANAGER, + ) + .await?; + + info!(cr = %name, "reconcile complete"); + Ok(Action::requeue(Duration::from_secs(300))) + } + + fn error_policy( + &self, + cr: Arc, + error: &KubeGenericError, + _ctx: Arc, + ) -> Action { + error!(cr = %cr.name_any(), error = %error, "reconcile failed — retrying in 5s"); + Action::requeue(Duration::from_secs(5)) + } +} + +// --------------------------------------------------------------------------- +// Helpers +// --------------------------------------------------------------------------- + +fn service_account_name(cr_name: &str) -> String { + format!("sas-{cr_name}") +} + +fn build_service_account( + name: &str, + namespace: &str, + owner_cr: &str, + automount_token: bool, + image_pull_secrets: &[String], +) -> ServiceAccount { + let refs: Vec = image_pull_secrets + .iter() + .map(|n| LocalObjectReference { name: n.clone() }) + .collect(); + + ServiceAccount { + metadata: ObjectMetaBuilder::new() + .name(name) + .namespace(namespace) + .label( + "app.kubernetes.io/managed-by", + "multicontroller-serviceaccountsync", + ) + .label("multicontroller.example.io/owner", owner_cr) + .build(), + automount_service_account_token: Some(automount_token), + image_pull_secrets: if refs.is_empty() { None } else { Some(refs) }, + ..Default::default() + } +} diff --git a/examples/multicontroller/src/types.rs b/examples/multicontroller/src/types.rs new file mode 100644 index 0000000..518993d --- /dev/null +++ b/examples/multicontroller/src/types.rs @@ -0,0 +1,88 @@ +// src/types.rs +// +// Defines two independent CRDs that this operator manages concurrently: +// +// SecretSync — ensures a Secret with the given string data exists +// in a target namespace. +// ServiceAccountSync — ensures a ServiceAccount with the given image-pull +// secrets exists in a target namespace. +// +// Each CRD is reconciled by its own controller (see secretsync.rs and +// serviceaccountsync.rs); main.rs runs both controllers side by side. + +use std::collections::BTreeMap; + +use koprs::status::KoprsCondition; +use kube::CustomResource; +use schemars::JsonSchema; +use serde::{Deserialize, Serialize}; + +/// Spec section of the SecretSync CRD. +#[derive(CustomResource, Debug, Clone, Serialize, Deserialize, Default, JsonSchema)] +#[serde(rename_all = "camelCase")] +#[kube( + group = "example.io", + version = "v1alpha1", + kind = "SecretSync", + namespaced, + status = "SecretSyncStatus", + printcolumn = r#"{"name":"Target","type":"string","jsonPath":".spec.targetNamespace"}"#, + printcolumn = r#"{"name":"Ready","type":"string","jsonPath":".status.ready"}"# +)] +pub struct SecretSyncSpec { + /// Namespace where the Secret should be created/maintained. + pub target_namespace: String, + /// Plaintext key/value pairs to populate in the Secret's `stringData`. + pub string_data: BTreeMap, +} + +/// Status section written back by the operator after each reconcile. +/// Must derive JsonSchema because SecretSync's CustomResource derive requires it. +#[derive(Debug, Clone, Serialize, Deserialize, Default, JsonSchema)] +#[serde(rename_all = "camelCase")] +pub struct SecretSyncStatus { + #[serde(default)] + pub ready: bool, + #[serde(default)] + pub message: String, + /// Standard Kubernetes conditions array. + #[serde(default, skip_serializing_if = "Vec::is_empty")] + pub conditions: Vec, +} + +/// Spec section of the ServiceAccountSync CRD. +#[derive(CustomResource, Debug, Clone, Serialize, Deserialize, Default, JsonSchema)] +#[serde(rename_all = "camelCase")] +#[kube( + group = "example.io", + version = "v1alpha1", + kind = "ServiceAccountSync", + namespaced, + status = "ServiceAccountSyncStatus", + printcolumn = r#"{"name":"Target","type":"string","jsonPath":".spec.targetNamespace"}"#, + printcolumn = r#"{"name":"Ready","type":"string","jsonPath":".status.ready"}"# +)] +pub struct ServiceAccountSyncSpec { + /// Namespace where the ServiceAccount should be created/maintained. + pub target_namespace: String, + /// Whether pods using this ServiceAccount automount its token. + #[serde(default)] + pub automount_token: bool, + /// Names of image-pull secrets to attach to the ServiceAccount. + #[serde(default)] + pub image_pull_secrets: Vec, +} + +/// Status section written back by the operator after each reconcile. +/// Must derive JsonSchema because ServiceAccountSync's CustomResource derive requires it. +#[derive(Debug, Clone, Serialize, Deserialize, Default, JsonSchema)] +#[serde(rename_all = "camelCase")] +pub struct ServiceAccountSyncStatus { + #[serde(default)] + pub ready: bool, + #[serde(default)] + pub message: String, + /// Standard Kubernetes conditions array. + #[serde(default, skip_serializing_if = "Vec::is_empty")] + pub conditions: Vec, +} diff --git a/scripts/publish.sh b/scripts/publish.sh index 1d26cd0..a01d334 100755 --- a/scripts/publish.sh +++ b/scripts/publish.sh @@ -1,11 +1,6 @@ #!/usr/bin/env bash # scripts/publish.sh — publish koprs workspace crates to crates.io # -# Publishes in dependency order: -# 1. koprs-derive -# 2. koprs -# 3. koprs-gen -# # Usage: # ./scripts/publish.sh # full pre-flight + publish # ./scripts/publish.sh --dry-run # stop before `cargo publish` @@ -39,7 +34,7 @@ while [[ $# -gt 0 ]]; do echo "" echo " --dry-run run all checks but stop before cargo publish" echo " --skip-ci skip CI checks, go straight to packaging + publish" - echo " --crate publish a single crate (koprs-derive | koprs | koprs-gen)" + echo " --crate publish a single crate (koprs)" exit 0 ;; *) die "unknown argument: $1" ;; @@ -52,10 +47,7 @@ SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" ROOT_DIR="$(cd "${SCRIPT_DIR}/.." && pwd)" cd "${ROOT_DIR}" -# ── crate publish order ──────────────────────────────────────────────────────── -# Must follow dependency order — each crate must be on crates.io before the -# next one can reference it as a registry dependency. -ALL_CRATES=("koprs-derive" "koprs" "koprs-gen") +ALL_CRATES=("koprs") if [[ -n "${SINGLE_CRATE}" ]]; then valid=false diff --git a/todo.md b/todo.md deleted file mode 100644 index a980ab3..0000000 --- a/todo.md +++ /dev/null @@ -1,13 +0,0 @@ -Your operator binary -│ -├── uses koprs::controller::ControllerBuilder ← ties the loop together -├── uses koprs::crd::apply_crd ← startup bootstrap -├── uses koprs::health ← pod probes -├── uses koprs::leader ← HA -├── uses koprs::shutdown ← SIGTERM drain -│ -├── annotates CRDs with #[derive(KoResource)] from koprs-derive -│ └── emits inventory registrations for koprs-gen -│ -└── runs cargo run --bin generate ← koprs-gen writes manifests/ -