diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 6032deb..1ebf063 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -87,7 +87,7 @@ and the three client suites. ## Checked-In Generated Code -Four directories contain checked-in `buf generate` output and **must be +Five directories contain checked-in `buf generate` output and **must be regenerated** whenever `connectrpc-codegen` output changes (or the buffa dependency is bumped): @@ -95,6 +95,7 @@ dependency is bumped): - `examples/eliza/src/generated/` - `examples/multiservice/src/generated/` - `benches/rpc/src/generated/` +- `connectrpc-health/src/generated/` Regenerate all of them with: diff --git a/Cargo.toml b/Cargo.toml index f1c3033..7a6c048 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,5 +1,5 @@ [workspace] -members = ["connectrpc", "connectrpc-codegen", "connectrpc-build", "conformance", "examples/eliza", "examples/middleware", "examples/mtls-identity", "examples/multiservice", "examples/streaming-tour", "examples/wasm-client", "tests/streaming", "benches/rpc", "benches/rpc-tonic"] +members = ["connectrpc", "connectrpc-codegen", "connectrpc-build", "connectrpc-health", "conformance", "examples/eliza", "examples/middleware", "examples/mtls-identity", "examples/multiservice", "examples/streaming-tour", "examples/wasm-client", "tests/streaming", "benches/rpc", "benches/rpc-tonic"] resolver = "2" [workspace.package] diff --git a/README.md b/README.md index e603691..ca4d66f 100644 --- a/README.md +++ b/README.md @@ -26,6 +26,7 @@ connectrpc provides: - **`connectrpc`** — A Tower-based runtime library implementing the Connect protocol - **`protoc-gen-connect-rust`** — A `protoc` plugin that generates service traits, clients, and message types - **`connectrpc-build`** — `build.rs` integration for generating code at build time +- **`connectrpc-health`** — The standard `grpc.health.v1.Health` service, for `grpc_health_probe` / kubelet gRPC probes / service-mesh health checks The runtime is built on [`tower::Service`](https://docs.rs/tower/latest/tower/trait.Service.html), making it framework-agnostic. It integrates with any tower-compatible HTTP framework including [Axum](https://docs.rs/axum), [Hyper](https://docs.rs/hyper), and others. @@ -221,6 +222,10 @@ use std::sync::Arc; let service = Arc::new(MyGreetService); let connect = service.register(ConnectRouter::new()); +// Plain HTTP liveness probe for `kubectl`'s httpGet style. For the +// standard gRPC Health protocol (grpc_health_probe, kubelet `grpc:` +// probes), mount `connectrpc_health::HealthService` on the Connect +// router instead — see docs/guide.md#health-checking. let app = Router::new() .route("/health", get(|| async { "OK" })) .fallback_service(connect.into_axum_service()); @@ -301,6 +306,7 @@ The Quick Start above shows the unary path. For everything else, see the user gu - **Interceptors** (typed, async per-RPC middleware for unary and streaming calls) - see [docs/guide.md#interceptors](docs/guide.md#interceptors). Interceptors see the resolved `Spec`, headers, deadline, and a lazily decoded message body, and can rewrite or short-circuit the call - the equivalent of `connect-go`'s `WithInterceptors`. - **Tower middleware on the server** (gzip, raw header rewriting, generic HTTP concerns below the RPC layer) - see [docs/guide.md#tower-middleware](docs/guide.md#tower-middleware) and [`examples/middleware/`](examples/middleware) for a custom auth layer that stamps caller identity into request extensions. - **TLS / mTLS** - see [docs/guide.md#tls](docs/guide.md#tls) and [`examples/eliza/README.md`](examples/eliza/README.md) for cert generation and `Server::with_tls` / `HttpClient::with_tls` patterns. +- **gRPC health checking** (`grpc.health.v1.Health`, used by `grpc_health_probe`, kubelet `grpc:` probes, and service meshes) - see [docs/guide.md#health-checking](docs/guide.md#health-checking) and the [`connectrpc-health`](connectrpc-health/) crate. ## Feature Flags @@ -353,6 +359,49 @@ serde_json = "1" http-body = "1" ``` +### Optional: gate the client behind a Cargo feature + +If you want a server-only build of your crate to drop the +`connectrpc/client` transport stack, opt in to the cfg gate. With +`buf generate`: + +```yaml +# buf.gen.yaml +plugins: + - local: protoc-gen-connect-rust + out: src/gen/connect + opt: [buffa_module=crate::proto, gate_client_feature] +``` + +Or with `connectrpc-build` in `build.rs`: + +```rust +// build.rs +connectrpc_build::Config::new() + .files(&["proto/greet.proto"]) + .includes(&["proto/"]) + .gate_client_feature(true) + .compile()?; +``` + +The codegen then prefixes every emitted `FooClient` struct and its +`impl` block with `#[cfg(feature = "client")]`. Declare the feature in +your `Cargo.toml` to forward it through to the runtime dep: + +```toml +[features] +default = ["client"] +client = ["connectrpc/client"] + +[dependencies] +connectrpc = { version = "0.6", features = ["server"] } # no "client" +``` + +`cargo build --no-default-features` now leaves out the `FooClient` items +*and* drops `connectrpc/client` (the HTTP/2 transport stack) from the +dependency graph. See `connectrpc-health` for the minimal example. The +option is opt-in; the default emission is unconditional. + ## Protocol Support | Protocol | Status | diff --git a/Taskfile.yaml b/Taskfile.yaml index 200e157..4f54bdd 100644 --- a/Taskfile.yaml +++ b/Taskfile.yaml @@ -122,6 +122,12 @@ tasks: cmds: - buf generate + connectrpc-health:generate: + desc: Regenerate connectrpc-health Rust code from the vendored grpc.health.v1 proto + dir: "{{.ROOT_DIR}}/connectrpc-health" + cmds: + - buf generate + example:multiservice:server: desc: Run the multi-service server (greet, math, well-known types) cmds: @@ -374,6 +380,7 @@ tasks: - task: example:multiservice:generate - task: conformance:generate - task: bench:generate + - task: connectrpc-health:generate # =========================================================================== diff --git a/connectrpc-build/src/lib.rs b/connectrpc-build/src/lib.rs index 8a29b46..4bc763d 100644 --- a/connectrpc-build/src/lib.rs +++ b/connectrpc-build/src/lib.rs @@ -173,6 +173,23 @@ impl Config { self } + /// Prefix every generated `FooClient` struct and its `impl` block + /// with `#[cfg(feature = "client")]` (default: `false`). + /// + /// Opt in when you want a server-only build of your crate to drop + /// the `connectrpc/client` transport stack from its dependency + /// graph. The consumer crate then declares a `client` Cargo feature + /// that forwards to `connectrpc/client`; see the `# Client-side cfg + /// gate` section in [`connectrpc_codegen::codegen::generate`]'s + /// docs for the minimal pattern. With the option off (the default), + /// generated client items are unconditional — external consumers + /// don't have to declare any Cargo feature. + #[must_use] + pub fn gate_client_feature(mut self, enabled: bool) -> Self { + self.options.gate_client_feature = enabled; + self + } + /// Replace the underlying buffa [`CodeGenConfig`] wholesale. /// /// Any buffa knob not surfaced as a builder method here can be set this @@ -626,12 +643,14 @@ mod tests { .strict_utf8_mapping(true) .generate_json(false) .emit_register_fn(false) + .gate_client_feature(true) .include_file("_inc.rs"); assert_eq!(cfg.files.len(), 2); assert_eq!(cfg.includes.len(), 1); assert!(cfg.options.buffa.strict_utf8_mapping); assert!(!cfg.options.buffa.generate_json); assert!(!cfg.options.buffa.emit_register_fn); + assert!(cfg.options.gate_client_feature); assert_eq!(cfg.include_file.as_deref(), Some("_inc.rs")); } @@ -641,10 +660,71 @@ mod tests { assert!(!cfg.options.buffa.strict_utf8_mapping); assert!(cfg.options.buffa.generate_json); assert!(cfg.options.buffa.emit_register_fn); + // `gate_client_feature` defaults off — build.rs consumers don't + // have to declare a `client` Cargo feature unless they opt in. + assert!(!cfg.options.gate_client_feature); assert!(cfg.emit_rerun_directives); assert!(matches!(cfg.descriptor_source, DescriptorSource::Protoc)); } + /// End-to-end through `Config`: with `gate_client_feature(true)`, + /// the generated `__connect.rs` contains `#[cfg(feature = "client")]` + /// on the `EchoServiceClient` struct + impl. Without the opt-in, the + /// cfg attr is absent. Uses the same `echo.fds.bin` fixture as + /// [`compile_precompiled_descriptor_set`]. + #[test] + fn compile_gate_client_feature_emits_cfg_attr() { + let fixture = format!("{}/tests/fixtures/echo.fds.bin", env!("CARGO_MANIFEST_DIR")); + + // Opt-in: cfg attrs present on the client items. + let out_with = tempfile::tempdir().unwrap(); + Config::new() + .descriptor_set(&fixture) + .files(&["echo.proto"]) + .out_dir(out_with.path()) + .gate_client_feature(true) + .emit_rerun_directives(false) + .compile() + .expect("compile with gate_client_feature=true"); + let gated = std::fs::read_to_string(out_with.path().join("echo.__connect.rs")) + .expect("read gated __connect.rs"); + let cfg_count = gated.matches("#[cfg(feature = \"client\")]").count(); + assert_eq!( + cfg_count, 2, + "expected exactly 2 cfg attrs (struct + impl) with \ + gate_client_feature=true; got {cfg_count}:\n{gated}" + ); + // Sanity: the server-side trait + ext trait must not be gated. + for marker in ["pub trait EchoService", "pub trait EchoServiceExt"] { + let idx = gated + .find(marker) + .unwrap_or_else(|| panic!("expected `{marker}` in output:\n{gated}")); + let prefix = &gated[..idx]; + assert!( + !prefix.trim_end().ends_with("#[cfg(feature = \"client\")]"), + "`{marker}` must not be gated:\n{gated}" + ); + } + + // Opt-out (default): no cfg attrs anywhere in the same file. + let out_without = tempfile::tempdir().unwrap(); + Config::new() + .descriptor_set(&fixture) + .files(&["echo.proto"]) + .out_dir(out_without.path()) + .emit_rerun_directives(false) + .compile() + .expect("compile with default options"); + let ungated = std::fs::read_to_string(out_without.path().join("echo.__connect.rs")) + .expect("read default __connect.rs"); + assert!( + !ungated.contains("#[cfg(feature ="), + "default emission must not emit any cfg attr — external \ + consumers should not need to declare a `client` Cargo \ + feature unless they opt in. Got:\n{ungated}" + ); + } + #[test] fn config_emit_rerun_directives_toggle() { let cfg = Config::new().emit_rerun_directives(false); diff --git a/connectrpc-codegen/src/codegen.rs b/connectrpc-codegen/src/codegen.rs index 3933aa7..30d6308 100644 --- a/connectrpc-codegen/src/codegen.rs +++ b/connectrpc-codegen/src/codegen.rs @@ -57,13 +57,22 @@ pub struct Options { /// catch-all so every type resolves); it is ignored by /// [`generate_files`] (the unified `super::`-relative path). pub buffa: CodeGenConfig, + + /// When `true`, prefix every emitted `FooClient` struct and its + /// `impl` block with `#[cfg(feature = "client")]`. Opt in when + /// the consuming crate wants to give server-only deployments a way + /// to drop the client transport stack from their dependency graph. + pub gate_client_feature: bool, } impl Default for Options { fn default() -> Self { let mut buffa = CodeGenConfig::default(); buffa.generate_json = true; - Self { buffa } + Self { + buffa, + gate_client_feature: false, + } } } @@ -83,6 +92,7 @@ fn emit_service_files( proto_file: &[FileDescriptorProto], file_to_generate: &[String], resolver: &TypeResolver<'_>, + gate_client_feature: bool, ) -> Result> { let mut out = Vec::new(); // Dedup state shared across the whole batch, not per file: @@ -94,6 +104,7 @@ fn emit_service_files( // because the stitcher mounts sibling files into one module. let mut batch = BatchState { colliding_aliases: collect_alias_collisions(proto_file, file_to_generate), + gate_client_feature, ..BatchState::default() }; for file_name in file_to_generate { @@ -163,7 +174,12 @@ pub fn generate_files( .map_err(|e| anyhow::anyhow!("buffa-codegen failed: {e}"))?; let resolver = TypeResolver::new(proto_file, file_to_generate, &config, false); - let service_files = emit_service_files(proto_file, file_to_generate, &resolver)?; + let service_files = emit_service_files( + proto_file, + file_to_generate, + &resolver, + options.gate_client_feature, + )?; if config.file_per_package { // Under `file_per_package` buffa emits one `.rs` @@ -291,7 +307,12 @@ pub fn generate_services( let config = options.to_buffa_config(); let resolver = TypeResolver::new(proto_file, file_to_generate, &config, true); - let mut files = emit_service_files(proto_file, file_to_generate, &resolver)?; + let mut files = emit_service_files( + proto_file, + file_to_generate, + &resolver, + options.gate_client_feature, + )?; if config.file_per_package { // Collapse the per-proto split into one `.rs` per @@ -423,6 +444,29 @@ pub fn generate_services( /// `register_types(&mut TypeRegistry)` aggregator. See /// [`CodeGenConfig::emit_register_fn`]. Ignored in this plugin (no message /// types emitted); accepted for compatibility with the unified path. +/// - `gate_client_feature` — prefix every emitted `FooClient` +/// struct and its `impl` block with `#[cfg(feature = "client")]`. +/// +/// # Client-side cfg gate +/// +/// When `gate_client_feature` is set, the consumer crate must declare +/// a Cargo feature literally named `client`. Without it, the generated +/// `FooClient` items will be absent from the crate namespace. +/// +/// Two consumer patterns: +/// +/// 1. **Dep-forwarding** (`client = ["connectrpc/client"]`, with +/// `connectrpc = { ..., features = ["server"] }` and no `"client"` +/// in that dep's feature list): turns the gate into a real +/// server-only escape hatch. Disabling the feature drops +/// `connectrpc/client` (and its transport stack) from the +/// dependency graph entirely. This is the intended use; see +/// `connectrpc-health` for the minimal example. +/// +/// 2. **Marker** (`client = []`, no forwarding): satisfies the gate +/// without slimming the dependency graph. Use only when you want +/// the cfg infrastructure in place but aren't ready to gate the +/// dep yet. pub fn generate(request: &CodeGeneratorRequest) -> Result { let mut options = Options::default(); @@ -467,12 +511,13 @@ pub fn generate(request: &CodeGeneratorRequest) -> Result "strict_utf8_mapping" => options.buffa.strict_utf8_mapping = true, "no_json" => options.buffa.generate_json = false, "no_register_fn" => options.buffa.emit_register_fn = false, + "gate_client_feature" => options.gate_client_feature = true, _ => { return Err(anyhow::anyhow!( "unknown plugin option: {opt:?}. Supported: \ buffa_module=, extern_path==, \ file_per_package, strict_utf8_mapping, no_json, \ - no_register_fn" + no_register_fn, gate_client_feature" )); } } @@ -699,6 +744,12 @@ struct BatchState { /// /// [#75]: https://github.com/anthropics/connect-rust/issues/75 colliding_aliases: std::collections::BTreeSet<(String, String)>, + /// Mirrors [`Options::gate_client_feature`]. When `true`, prefix + /// each emitted `FooClient` struct + `impl` with + /// `#[cfg(feature = "client")]`. Threaded here so it propagates + /// through the per-file emission loop without changing every + /// helper's signature. + gate_client_feature: bool, } fn generate_connect_services( @@ -1242,6 +1293,18 @@ let owned = client.{example_method}(request).await?.into_owned(); ```"# ); let client_doc_tokens = doc_attrs(&client_doc); + // Opt-in `#[cfg(feature = "client")]` on every client-side item. + // + // INVARIANT: any future emission referencing + // `::connectrpc::client::*` (an additional `impl`, a free fn, a + // sibling trait, …) must also be prefixed with `#client_cfg_attr`. + // The `no_ungated_client_references` test enforces this by scanning + // the formatted output under the opt-in path. + let client_cfg_attr: TokenStream = if batch.gate_client_feature { + quote! { #[cfg(feature = "client")] } + } else { + TokenStream::new() + }; // Per-method `Spec` constants. Stable, allocation-free metadata that the // dispatcher threads into `RequestContext::spec` and that user code can @@ -1294,12 +1357,14 @@ let owned = client.{example_method}(request).await?.into_owned(); #service_server #client_doc_tokens + #client_cfg_attr #[derive(Clone)] pub struct #client_name { transport: T, config: ::connectrpc::client::ClientConfig, } + #client_cfg_attr impl #client_name where T: ::connectrpc::client::ClientTransport, @@ -1926,6 +1991,7 @@ fn find_comment(source_info: &SourceCodeInfo, target_path: &[i32]) -> Option String { + let file = minimal_file( + Some("example.v1"), + ".example.v1.PingReq", + ".example.v1.PingResp", + &["PingReq", "PingResp"], + ); + let config = buffa_codegen::CodeGenConfig::default(); + let target = file.name.clone().into_iter().collect::>(); + let resolver = TypeResolver::new(std::slice::from_ref(&file), &target, &config, false); + let service = &file.service[0]; + let batch = BatchState { + colliding_aliases: collect_alias_collisions(std::slice::from_ref(&file), &target), + gate_client_feature, + ..BatchState::default() + }; + format_token_stream(&generate_service(&file, service, &resolver, &batch).unwrap()).unwrap() + } + + #[test] + fn default_emission_has_no_client_cfg() { + // CRITICAL invariant: with the option unset, codegen emits zero + // `#[cfg(feature = "client")]` attrs. External users with their + // own protos must not be forced to declare a Cargo feature. + let out = format_minimal_service(false); + assert!( + !out.contains("#[cfg(feature ="), + "default emission must not emit any cfg attr — external \ + consumers should not need to declare a `client` Cargo \ + feature unless they explicitly opt in via the \ + `gate_client_feature` plugin option:\n{out}" + ); + } + + #[test] + fn client_items_gated_when_opt_in() { + // When `gate_client_feature` is set, the `FooClient` struct + + // impl carry `#[cfg(feature = "client")]`. Exactly two attrs: + // one on the struct, one on the impl block. (All `_with_options` + // methods live inside the impl and inherit the gate.) + let out = format_minimal_service(true); + let cfg_count = out.matches("#[cfg(feature = \"client\")]").count(); + assert_eq!( + cfg_count, 2, + "expected exactly two #[cfg(feature = \"client\")] attrs (one on \ + `pub struct PingServiceClient`, one on its `impl` block); got \ + {cfg_count}:\n{out}" + ); + } + + #[test] + fn server_items_never_carry_client_cfg() { + // The trait, ext trait, and monomorphic dispatcher live on the + // server side; nothing about them should be feature-gated even + // under the opt-in path. + let out = format_minimal_service(true); + for marker in [ + "pub trait PingService", + "pub trait PingServiceExt", + "pub struct PingServiceServer", + "pub const PING_SERVICE_SERVICE_NAME", + ] { + let idx = out + .find(marker) + .unwrap_or_else(|| panic!("expected `{marker}` in output:\n{out}")); + let prefix = &out[..idx]; + assert!( + !prefix.trim_end().ends_with("#[cfg(feature = \"client\")]"), + "`{marker}` must not be preceded by a client cfg attr — \ + server-side items are always compiled in:\n{out}" + ); + } + } + + /// The strongest invariant: every reference to + /// `::connectrpc::client::*` (or the unqualified `connectrpc::client::` + /// — should not appear, but guard anyway) must live inside an item + /// (or ancestor module/item) carrying `#[cfg(feature = "client")]`. + /// Catches the missing-gate regression that a count-only test cannot + /// detect: e.g. a future `impl Default for FooClient` that the + /// contributor forgot to prefix. + /// + /// Walks recursively into `Item::Mod` bodies so a gate on a parent + /// module implicitly covers its children — avoids false-positives + /// where a wrapper `pub mod gated { #[cfg(...)] … }` would flag the + /// outer module just because its rendered body mentions + /// `::connectrpc::client::`. + #[test] + fn no_ungated_client_references() { + // Only relevant under the opt-in path — that's where the + // invariant ("every `::connectrpc::client::*` reference lives + // inside a gated item") is meaningful. + let out = format_minimal_service(true); + let parsed: syn::File = syn::parse_str(&out).expect("output parses"); + + let mut offenders: Vec = Vec::new(); + scan_items_for_ungated_client_refs(&parsed.items, false, &mut offenders); + assert!( + offenders.is_empty(), + "every item that mentions `::connectrpc::client::*` must be \ + prefixed with `#[cfg(feature = \"client\")]`. Offenders:\n{}\n\nFull output:\n{out}", + offenders.join("\n") + ); + } + + /// Predicate: is this attribute `#[cfg(feature = "client")]`? + /// Stringifies the attr to avoid coupling to syn's parsed `Meta` + /// shape across versions. + fn is_client_feature_cfg(attr: &syn::Attribute) -> bool { + attr.path().is_ident("cfg") + && attr + .to_token_stream() + .to_string() + .contains("feature = \"client\"") + } + + /// Render `ts` through prettyplease (matching the spacing of the + /// rest of the codegen test surface) and check for any reference + /// to `::connectrpc::client::` or `connectrpc :: client ::` (the + /// pre-prettyplease form, defensive). + fn mentions_connectrpc_client(ts: TokenStream) -> bool { + let rendered = format_token_stream(&ts).unwrap_or_default(); + rendered.contains("::connectrpc::client::") || rendered.contains("connectrpc :: client ::") + } + + /// Recursive walker for `no_ungated_client_references`. For each + /// item: if the item or any ancestor is `#[cfg(feature = "client")]`, + /// it's gated and we skip. Otherwise, if its rendered tokens + /// mention `::connectrpc::client::`, push an offender entry. + /// `Item::Mod` recurses into its children so a parent-level gate + /// implicitly covers them. + /// + /// Item kinds the codegen doesn't currently emit at top level + /// (`Use`, `Static`, `Macro`, `ForeignMod`, `Union`, `TraitAlias`, + /// `ExternCrate`, `Verbatim`, …) still go through the textual scan + /// via the fallthrough arm — they're not gated by anything we can + /// inspect, so if their token rendering mentions + /// `::connectrpc::client::` they're flagged. This is the defensive + /// shape: a future emission that introduces e.g. an ungated + /// `use ::connectrpc::client::ClientConfig;` at module scope must + /// not slip past the invariant test. + fn scan_items_for_ungated_client_refs( + items: &[syn::Item], + ancestor_gated: bool, + offenders: &mut Vec, + ) { + for item in items { + // Extract attrs for the kinds we explicitly model. For + // everything else we treat the item as not self-gated and + // fall through to the textual scan — better a false + // positive on an exotic ungated emission than silently + // missing a real one. + let (attrs, ident): (&[syn::Attribute], String) = match item { + syn::Item::Struct(s) => (&s.attrs, s.ident.to_string()), + syn::Item::Impl(i) => ( + &i.attrs, + format!("impl-block for {}", ToTokens::to_token_stream(&i.self_ty)), + ), + syn::Item::Fn(f) => (&f.attrs, f.sig.ident.to_string()), + syn::Item::Trait(t) => (&t.attrs, t.ident.to_string()), + syn::Item::Const(c) => (&c.attrs, c.ident.to_string()), + syn::Item::Type(t) => (&t.attrs, t.ident.to_string()), + syn::Item::Static(s) => (&s.attrs, s.ident.to_string()), + syn::Item::Use(u) => (&u.attrs, "use-item".to_string()), + syn::Item::ExternCrate(e) => (&e.attrs, e.ident.to_string()), + syn::Item::Macro(m) => ( + &m.attrs, + m.ident + .as_ref() + .map(syn::Ident::to_string) + .unwrap_or_else(|| "macro-item".to_string()), + ), + syn::Item::ForeignMod(f) => (&f.attrs, "extern-block".to_string()), + syn::Item::Union(u) => (&u.attrs, u.ident.to_string()), + syn::Item::TraitAlias(t) => (&t.attrs, t.ident.to_string()), + syn::Item::Enum(e) => (&e.attrs, e.ident.to_string()), + syn::Item::Mod(m) => { + let self_gated = m.attrs.iter().any(is_client_feature_cfg); + let gated = ancestor_gated || self_gated; + if let Some((_brace, children)) = &m.content { + scan_items_for_ungated_client_refs(children, gated, offenders); + } + // Don't fall through — the textual scan on a Mod's + // tokens would render its children too and double-count. + continue; + } + // `Item::Verbatim` and any future syn variant: we can't + // inspect attrs, so assume not self-gated and let the + // textual scan decide. + _ => (&[][..], "".to_string()), + }; + let self_gated = attrs.iter().any(is_client_feature_cfg); + let gated = ancestor_gated || self_gated; + if gated { + continue; + } + if mentions_connectrpc_client(ToTokens::to_token_stream(item)) { + offenders.push(format!( + "ungated reference to ::connectrpc::client in `{ident}`" + )); + } + } + } + + /// Verify the recursive scanner: a parent module gated on `client` + /// covers its children (no false-positive); an ungated parent + /// containing an ungated child gets flagged via the child, not the + /// parent's textual rendering (no double-counting). + #[test] + fn ungated_scanner_handles_nested_modules() { + // Case 1: gated parent + ungated-looking child → no offenders. + let parsed: syn::File = syn::parse_str( + r#" + #[cfg(feature = "client")] + pub mod gated_parent { + pub struct WithClientRef { + field: ::connectrpc::client::ClientConfig, + } + } + "#, + ) + .unwrap(); + let mut offenders = Vec::new(); + scan_items_for_ungated_client_refs(&parsed.items, false, &mut offenders); + assert!( + offenders.is_empty(), + "parent-level cfg must cover children: {offenders:?}" + ); + + // Case 2: ungated parent + ungated child referencing client → exactly + // ONE offender (the inner struct), not two (parent + child). + let parsed: syn::File = syn::parse_str( + r#" + pub mod ungated_parent { + pub struct WithClientRef { + field: ::connectrpc::client::ClientConfig, + } + } + "#, + ) + .unwrap(); + let mut offenders = Vec::new(); + scan_items_for_ungated_client_refs(&parsed.items, false, &mut offenders); + assert_eq!( + offenders.len(), + 1, + "exactly one offender expected (the inner struct), not the wrapping \ + module: {offenders:?}" + ); + assert!( + offenders[0].contains("WithClientRef"), + "offender should name the inner struct: {:?}", + offenders[0] + ); + + // Case 3: ungated parent containing a gated child → no offenders. + let parsed: syn::File = syn::parse_str( + r#" + pub mod outer { + #[cfg(feature = "client")] + pub struct GatedClient { + field: ::connectrpc::client::ClientConfig, + } + } + "#, + ) + .unwrap(); + let mut offenders = Vec::new(); + scan_items_for_ungated_client_refs(&parsed.items, false, &mut offenders); + assert!( + offenders.is_empty(), + "self-gating child inside ungated module must be OK: {offenders:?}" + ); + } + + /// Regression: the scanner must not silently skip `syn::Item` variants + /// the codegen doesn't currently emit. A future ungated + /// `use ::connectrpc::client::ClientConfig;` or a `static` + /// referencing the client module would have slipped past the + /// earlier `_ => continue` catch-all; the expanded variant arms + + /// fallthrough textual scan catch it now. + #[test] + fn ungated_scanner_catches_use_and_static_items() { + // Item::Use, ungated → flagged. + let parsed: syn::File = syn::parse_str("use ::connectrpc::client::ClientConfig;").unwrap(); + let mut offenders = Vec::new(); + scan_items_for_ungated_client_refs(&parsed.items, false, &mut offenders); + assert_eq!( + offenders.len(), + 1, + "ungated `use ::connectrpc::client::*` must be flagged: {offenders:?}" + ); + + // Item::Use, gated → OK. + let parsed: syn::File = + syn::parse_str("#[cfg(feature = \"client\")] use ::connectrpc::client::ClientConfig;") + .unwrap(); + let mut offenders = Vec::new(); + scan_items_for_ungated_client_refs(&parsed.items, false, &mut offenders); + assert!( + offenders.is_empty(), + "gated `use ::connectrpc::client::*` must NOT be flagged: {offenders:?}" + ); + + // Item::Static, ungated, referencing client module → flagged. + let parsed: syn::File = + syn::parse_str("static FOO: &str = stringify!(::connectrpc::client::ClientConfig);") + .unwrap(); + let mut offenders = Vec::new(); + scan_items_for_ungated_client_refs(&parsed.items, false, &mut offenders); + assert_eq!( + offenders.len(), + 1, + "ungated `static FOO` mentioning ::connectrpc::client must be flagged: \ + {offenders:?}" + ); + } + + #[test] + fn client_cfg_round_trips_through_prettyplease() { + // Sanity: prettyplease formats the cfg attr to exactly the + // canonical spelling we grep for in the count test. If a future + // formatting change reshapes the attribute (e.g. inserts spaces), + // the count test would silently report zero matches — make sure + // we'd notice. + let out = format_minimal_service(true); + // The exact rendered form prettyplease uses; if this assertion + // ever fails we need to update the other test's grep pattern. + assert!( + out.contains("#[cfg(feature = \"client\")]"), + "prettyplease no longer renders the cfg attr as expected; \ + update the grep pattern in client_items_always_gated:\n{out}" + ); + } + + #[test] + fn multi_service_in_one_file_each_client_is_gated() { + // Two services in the same file → 4 cfg attrs (2 per FooClient). + // Catches a regression where the cfg interpolation accidentally + // moved outside the per-service token block. + let make_service = |name: &str| ServiceDescriptorProto { + name: Some(name.into()), + method: vec![MethodDescriptorProto { + name: Some("Ping".into()), + input_type: Some(".example.v1.PingReq".into()), + output_type: Some(".example.v1.PingResp".into()), + ..Default::default() + }], + ..Default::default() + }; + let file = FileDescriptorProto { + name: Some("two.proto".into()), + package: Some("example.v1".into()), + service: vec![make_service("Alpha"), make_service("Beta")], + message_type: vec![ + DescriptorProto { + name: Some("PingReq".into()), + ..Default::default() + }, + DescriptorProto { + name: Some("PingResp".into()), + ..Default::default() + }, + ], + ..Default::default() + }; + let config = buffa_codegen::CodeGenConfig::default(); + let target = vec!["two.proto".to_string()]; + let resolver = TypeResolver::new(std::slice::from_ref(&file), &target, &config, false); + let mut batch = BatchState { + colliding_aliases: collect_alias_collisions(std::slice::from_ref(&file), &target), + gate_client_feature: true, + ..BatchState::default() + }; + let ts = generate_connect_services(&file, &resolver, &mut batch).unwrap(); + let out = format_token_stream(&ts).unwrap(); + let cfg_count = out.matches("#[cfg(feature = \"client\")]").count(); + assert_eq!( + cfg_count, 4, + "expected 4 client cfg attrs (2 per service * 2 services); got \ + {cfg_count}:\n{out}" + ); + // Both client structs are present, both gated. + for client_struct in ["pub struct AlphaClient", "pub struct BetaClient"] { + let idx = out + .find(client_struct) + .unwrap_or_else(|| panic!("expected `{client_struct}` in output:\n{out}")); + let prefix = &out[..idx]; + assert!( + prefix.trim_end().ends_with("#[derive(Clone)]") + || prefix.contains("#[cfg(feature = \"client\")]"), + "`{client_struct}` must have a client cfg attr in its \ + attribute cluster:\n{out}" + ); + } + } + + #[test] + fn plugin_accepts_gate_client_feature_flag() { + // The current option is a bare flag (no `=value`). + let request = CodeGeneratorRequest { + parameter: Some("buffa_module=crate::proto,gate_client_feature".into()), + file_to_generate: vec![], + proto_file: vec![], + ..Default::default() + }; + generate(&request).expect("gate_client_feature should be a recognized plugin option"); + } + + #[test] + fn plugin_rejects_old_client_feature_value_form() { + // The previous design used `client_feature=` with an + // arbitrary feature name. That option was renamed to the bare + // flag `gate_client_feature` (the feature name is fixed as + // `client`). A stale buf.gen.yaml using the old form must fail + // loudly, not silently no-op. + let request = CodeGeneratorRequest { + parameter: Some("buffa_module=crate::proto,client_feature=client".into()), + file_to_generate: vec![], + proto_file: vec![], + ..Default::default() + }; + let err = generate(&request) + .expect_err("legacy `client_feature=…` option must now fail as unknown"); + let msg = err.to_string(); + assert!( + msg.contains("client_feature"), + "error should name the offending option: {msg}" + ); + assert!( + msg.contains("unknown plugin option"), + "error should say the option is unknown: {msg}" + ); + } + #[test] fn plugin_file_per_package_collapses_output() { // End-to-end through the protoc entry point: one `.rs` diff --git a/connectrpc-health/Cargo.toml b/connectrpc-health/Cargo.toml new file mode 100644 index 0000000..733cabe --- /dev/null +++ b/connectrpc-health/Cargo.toml @@ -0,0 +1,42 @@ +[package] +name = "connectrpc-health" +version = "0.6.1" +edition.workspace = true +rust-version.workspace = true +license.workspace = true +repository.workspace = true +readme = "../README.md" +description = "gRPC health-checking service for connectrpc (wire-compatible with grpc_health_v1)" +keywords = ["connectrpc", "grpc", "health", "rpc"] +categories = ["network-programming", "web-programming::http-server"] + +[features] +default = ["client"] +# Pulls in `connectrpc/client` and exposes the generated `HealthClient` +# re-export for callers that want to probe a health server (integration +# tests, sidecar tooling, dev binaries). Server-only deployments opt out +# with `default-features = false` to drop the client transport stack +# from the dependency graph. +client = ["connectrpc/client"] + +[dependencies] +connectrpc = { path = "../connectrpc", version = "0.6", features = ["server"] } + +# Protobuf + serde (pulled in by the generated module paths) +buffa = { workspace = true } +serde = { workspace = true } +serde_json = { workspace = true } +http-body = { workspace = true } + +# Async +tokio = { workspace = true, features = ["sync", "macros"] } +tokio-stream = { workspace = true, features = ["sync"] } +futures = { workspace = true } + +[dev-dependencies] +connectrpc = { path = "../connectrpc", features = ["axum", "client", "server"] } +tokio = { workspace = true, features = ["full"] } +axum = { workspace = true, features = ["tokio", "http1"] } + +[lints] +workspace = true diff --git a/connectrpc-health/buf.gen.yaml b/connectrpc-health/buf.gen.yaml new file mode 100644 index 0000000..0fe27c1 --- /dev/null +++ b/connectrpc-health/buf.gen.yaml @@ -0,0 +1,20 @@ +version: v2 +plugins: + - local: ../../buffa/target/release/protoc-gen-buffa + out: src/generated/buffa + opt: [views=true, json=true] + - local: ../../buffa/target/release/protoc-gen-buffa-packaging + out: src/generated/buffa + strategy: all + - local: ../target/release/protoc-gen-connect-rust + out: src/generated/connect + # `gate_client_feature` puts every emitted `HealthClient` item + # behind `#[cfg(feature = "client")]` so a server-only build of + # `connectrpc-health` can drop `connectrpc/client` from the + # dependency graph. The feature is wired up in this crate's + # Cargo.toml under `[features] client = ["connectrpc/client"]`. + opt: [buffa_module=crate::proto, gate_client_feature] + - local: ../../buffa/target/release/protoc-gen-buffa-packaging + out: src/generated/connect + strategy: all + opt: [filter=services] diff --git a/connectrpc-health/buf.yaml b/connectrpc-health/buf.yaml new file mode 100644 index 0000000..355a79a --- /dev/null +++ b/connectrpc-health/buf.yaml @@ -0,0 +1,6 @@ +# Local proto module configuration for connectrpc-health. +# The generated code is checked into src/generated/. +# To regenerate: task connectrpc-health:generate +version: v2 +modules: + - path: proto diff --git a/connectrpc-health/proto/grpc/health/v1/health.proto b/connectrpc-health/proto/grpc/health/v1/health.proto new file mode 100644 index 0000000..44a3563 --- /dev/null +++ b/connectrpc-health/proto/grpc/health/v1/health.proto @@ -0,0 +1,64 @@ +// Copyright 2015 The gRPC Authors All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// The canonical version of this proto can be found at +// https://github.com/grpc/grpc-proto/blob/master/grpc/health/v1/health.proto + +syntax = "proto3"; + +package grpc.health.v1; + +option csharp_namespace = "Grpc.Health.V1"; +option go_package = "google.golang.org/grpc/health/grpc_health_v1"; +option java_multiple_files = true; +option java_outer_classname = "HealthProto"; +option java_package = "io.grpc.health.v1"; + +message HealthCheckRequest { + string service = 1; +} + +message HealthCheckResponse { + enum ServingStatus { + UNKNOWN = 0; + SERVING = 1; + NOT_SERVING = 2; + SERVICE_UNKNOWN = 3; // Used only by the Watch method. + } + + ServingStatus status = 1; +} + +service Health { + // Check returns the serving status of the requested service. If the + // service name is empty, the response covers the whole server. + rpc Check(HealthCheckRequest) returns (HealthCheckResponse); + + // Watch performs a watch for the serving status of the requested service. + // The server will immediately send back a message indicating the current + // serving status. It will then subsequently send a new message whenever + // the service's serving status changes. + // + // If the requested service is unknown when the call is received, the + // server will send a message setting the serving status to SERVICE_UNKNOWN + // but will *not* terminate the call. If at some future point, the serving + // status of the service becomes known, the server will send a new message + // with the service's serving status. + // + // If the call terminates with status UNIMPLEMENTED, then the client should + // assume this method is not supported and should not retry the call. If + // the call terminates with any other status (including OK), then the + // client should retry the call with appropriate exponential backoff. + rpc Watch(HealthCheckRequest) returns (stream HealthCheckResponse); +} diff --git a/connectrpc-health/src/checker.rs b/connectrpc-health/src/checker.rs new file mode 100644 index 0000000..711d492 --- /dev/null +++ b/connectrpc-health/src/checker.rs @@ -0,0 +1,109 @@ +//! The [`Checker`] trait that user code implements. + +use std::future::Future; +use std::pin::Pin; +use std::task::{Context, Poll}; + +use connectrpc::ConnectError; +use futures::Stream; +use tokio::sync::watch; +use tokio_stream::wrappers::WatchStream; + +use crate::Status; + +/// Reports the health of services on this server. +/// +/// # Example +/// +/// ```no_run +/// use connectrpc::ConnectError; +/// use connectrpc_health::{Checker, Status}; +/// +/// struct MyChecker; +/// +/// impl Checker for MyChecker { +/// async fn check(&self, service: &str) -> Result { +/// match service { +/// "" | "acme.user.v1.UserService" => Ok(Status::Serving), +/// _ => Err(ConnectError::not_found(format!("unknown service {service}"))), +/// } +/// } +/// } +/// ``` +pub trait Checker: Send + Sync + 'static { + /// Check the health of `service`. An empty `service` asks for the + /// whole-process status. + /// + /// # Errors + /// + /// Return `Err(ConnectError::not_found(_))` for any service the + /// implementation doesn't recognize. + fn check(&self, service: &str) -> impl Future> + Send; + + /// Subscribe to status changes for `service`. The returned + /// [`StatusStream`] yields the current status immediately, then a + /// new value on every subsequent change. Updates may be coalesced. + /// + /// # Default body returns `Unimplemented` + /// + /// The provided default implementation reports + /// [`Unimplemented`](::connectrpc::ErrorCode::Unimplemented), which is + /// fine for Check-only deployments — kubelet's `grpc:` probe and + /// `grpc_health_probe` only call Check. **If your callers stream + /// Watch (service meshes, gRPC clients with health-based balancing), + /// override this** or they will see every Watch RPC fail. + /// [`StaticChecker`](crate::StaticChecker) provides a working + /// `watch` for the common static-status case. + /// + /// # Errors + /// + /// Overrides should return `Err(ConnectError::not_found(_))` for + /// unknown services. + fn watch( + &self, + service: &str, + ) -> impl Future> + Send { + let _ = service; + async { + Err(ConnectError::unimplemented( + "watching health state is not supported", + )) + } + } +} + +/// A stream of [`Status`] updates produced by [`Checker::watch`]. +pub struct StatusStream { + inner: Pin + Send + 'static>>, +} + +impl StatusStream { + /// Wrap an arbitrary [`Stream`] of status updates. + #[must_use] + pub fn new(stream: impl Stream + Send + 'static) -> Self { + Self { + inner: Box::pin(stream), + } + } + + /// Wrap a [`tokio::sync::watch::Receiver`]. Preferred over + /// [`new`](Self::new) when the checker already has a `watch::Sender`. + #[must_use] + pub fn from_watch(receiver: watch::Receiver) -> Self { + Self::new(WatchStream::new(receiver)) + } +} + +impl Stream for StatusStream { + type Item = Status; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.inner.as_mut().poll_next(cx) + } +} + +impl std::fmt::Debug for StatusStream { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("StatusStream").finish_non_exhaustive() + } +} diff --git a/connectrpc-health/src/generated/buffa/grpc.health.v1.health.__view.rs b/connectrpc-health/src/generated/buffa/grpc.health.v1.health.__view.rs new file mode 100644 index 0000000..f58bd89 --- /dev/null +++ b/connectrpc-health/src/generated/buffa/grpc.health.v1.health.__view.rs @@ -0,0 +1,360 @@ +// @generated by buffa-codegen. DO NOT EDIT. +// source: grpc/health/v1/health.proto + +#[derive(Clone, Debug, Default)] +pub struct HealthCheckRequestView<'a> { + /// Field 1: `service` + pub service: &'a str, + pub __buffa_unknown_fields: ::buffa::UnknownFieldsView<'a>, +} +impl<'a> HealthCheckRequestView<'a> { + /// Decode from `buf`, enforcing a recursion depth limit for nested messages. + /// + /// Called by [`::buffa::MessageView::decode_view`] with [`::buffa::RECURSION_LIMIT`] + /// and by generated sub-message decode arms with `depth - 1`. + /// + /// **Not part of the public API.** Named with a leading underscore to + /// signal that it is for generated-code use only. + #[doc(hidden)] + pub fn _decode_depth( + buf: &'a [u8], + depth: u32, + ) -> ::core::result::Result { + let mut view = Self::default(); + view._merge_into_view(buf, depth)?; + ::core::result::Result::Ok(view) + } + /// Merge fields from `buf` into this view (proto merge semantics). + /// + /// Repeated fields append; singular fields last-wins; singular + /// MESSAGE fields merge recursively. Used by sub-message decode + /// arms when the same field appears multiple times on the wire. + /// + /// **Not part of the public API.** + #[doc(hidden)] + pub fn _merge_into_view( + &mut self, + buf: &'a [u8], + depth: u32, + ) -> ::core::result::Result<(), ::buffa::DecodeError> { + let _ = depth; + #[allow(unused_variables)] + let view = self; + let mut cur: &'a [u8] = buf; + while !cur.is_empty() { + let before_tag = cur; + let tag = ::buffa::encoding::Tag::decode(&mut cur)?; + match tag.field_number() { + 1u32 => { + if tag.wire_type() != ::buffa::encoding::WireType::LengthDelimited { + return ::core::result::Result::Err(::buffa::DecodeError::WireTypeMismatch { + field_number: 1u32, + expected: 2u8, + actual: tag.wire_type() as u8, + }); + } + view.service = ::buffa::types::borrow_str(&mut cur)?; + } + _ => { + ::buffa::encoding::skip_field_depth(tag, &mut cur, depth)?; + let span_len = before_tag.len() - cur.len(); + view.__buffa_unknown_fields.push_raw(&before_tag[..span_len]); + } + } + } + ::core::result::Result::Ok(()) + } +} +impl<'a> ::buffa::MessageView<'a> for HealthCheckRequestView<'a> { + type Owned = super::super::HealthCheckRequest; + fn decode_view(buf: &'a [u8]) -> ::core::result::Result { + Self::_decode_depth(buf, ::buffa::RECURSION_LIMIT) + } + fn decode_view_with_limit( + buf: &'a [u8], + depth: u32, + ) -> ::core::result::Result { + Self::_decode_depth(buf, depth) + } + fn to_owned_message(&self) -> super::super::HealthCheckRequest { + self.to_owned_from_source(None) + } + #[allow(clippy::useless_conversion, clippy::needless_update)] + fn to_owned_from_source( + &self, + __buffa_src: ::core::option::Option<&::buffa::bytes::Bytes>, + ) -> super::super::HealthCheckRequest { + #[allow(unused_imports)] + use ::buffa::alloc::string::ToString as _; + let _ = __buffa_src; + super::super::HealthCheckRequest { + service: self.service.to_string(), + __buffa_unknown_fields: self + .__buffa_unknown_fields + .to_owned() + .unwrap_or_default() + .into(), + ..::core::default::Default::default() + } + } +} +impl<'a> ::buffa::ViewEncode<'a> for HealthCheckRequestView<'a> { + #[allow(clippy::needless_borrow, clippy::let_and_return)] + fn compute_size(&self, _cache: &mut ::buffa::SizeCache) -> u32 { + #[allow(unused_imports)] + use ::buffa::Enumeration as _; + let mut size = 0u32; + if !self.service.is_empty() { + size += 1u32 + ::buffa::types::string_encoded_len(&self.service) as u32; + } + size += self.__buffa_unknown_fields.encoded_len() as u32; + size + } + #[allow(clippy::needless_borrow)] + fn write_to( + &self, + _cache: &mut ::buffa::SizeCache, + buf: &mut impl ::buffa::bytes::BufMut, + ) { + #[allow(unused_imports)] + use ::buffa::Enumeration as _; + if !self.service.is_empty() { + ::buffa::encoding::Tag::new( + 1u32, + ::buffa::encoding::WireType::LengthDelimited, + ) + .encode(buf); + ::buffa::types::encode_string(&self.service, buf); + } + self.__buffa_unknown_fields.write_to(buf); + } +} +/// Serializes this view as protobuf JSON. +/// +/// Implicit-presence fields with default values are omitted, `required` +/// fields are always emitted, explicit-presence (`optional`) fields are +/// emitted only when set, bytes fields are base64-encoded, and enum +/// values are their proto name strings. +/// +/// This impl uses `serialize_map(None)` because the number of emitted +/// fields depends on default-omission rules; serializers that require +/// known map lengths (e.g. `bincode`) will return a runtime error. +/// Use the owned message type for those formats. +impl<'__a> ::serde::Serialize for HealthCheckRequestView<'__a> { + fn serialize<__S: ::serde::Serializer>( + &self, + __s: __S, + ) -> ::core::result::Result<__S::Ok, __S::Error> { + use ::serde::ser::SerializeMap as _; + let mut __map = __s.serialize_map(::core::option::Option::None)?; + if !::buffa::json_helpers::skip_if::is_empty_str(self.service) { + __map.serialize_entry("service", self.service)?; + } + __map.end() + } +} +impl<'a> ::buffa::MessageName for HealthCheckRequestView<'a> { + const PACKAGE: &'static str = "grpc.health.v1"; + const NAME: &'static str = "HealthCheckRequest"; + const FULL_NAME: &'static str = "grpc.health.v1.HealthCheckRequest"; + const TYPE_URL: &'static str = "type.googleapis.com/grpc.health.v1.HealthCheckRequest"; +} +impl<'v> ::buffa::DefaultViewInstance for HealthCheckRequestView<'v> { + fn default_view_instance<'a>() -> &'a Self + where + Self: 'a, + { + static VALUE: ::buffa::__private::OnceBox> = ::buffa::__private::OnceBox::new(); + VALUE + .get_or_init(|| ::buffa::alloc::boxed::Box::new( + >::default(), + )) + } +} +impl ::buffa::ViewReborrow for HealthCheckRequestView<'static> { + type Reborrowed<'b> = HealthCheckRequestView<'b>; + fn reborrow<'b>(this: &'b Self) -> &'b Self::Reborrowed<'b> { + this + } +} +#[derive(Clone, Debug, Default)] +pub struct HealthCheckResponseView<'a> { + /// Field 1: `status` + pub status: ::buffa::EnumValue, + pub __buffa_unknown_fields: ::buffa::UnknownFieldsView<'a>, +} +impl<'a> HealthCheckResponseView<'a> { + /// Decode from `buf`, enforcing a recursion depth limit for nested messages. + /// + /// Called by [`::buffa::MessageView::decode_view`] with [`::buffa::RECURSION_LIMIT`] + /// and by generated sub-message decode arms with `depth - 1`. + /// + /// **Not part of the public API.** Named with a leading underscore to + /// signal that it is for generated-code use only. + #[doc(hidden)] + pub fn _decode_depth( + buf: &'a [u8], + depth: u32, + ) -> ::core::result::Result { + let mut view = Self::default(); + view._merge_into_view(buf, depth)?; + ::core::result::Result::Ok(view) + } + /// Merge fields from `buf` into this view (proto merge semantics). + /// + /// Repeated fields append; singular fields last-wins; singular + /// MESSAGE fields merge recursively. Used by sub-message decode + /// arms when the same field appears multiple times on the wire. + /// + /// **Not part of the public API.** + #[doc(hidden)] + pub fn _merge_into_view( + &mut self, + buf: &'a [u8], + depth: u32, + ) -> ::core::result::Result<(), ::buffa::DecodeError> { + let _ = depth; + #[allow(unused_variables)] + let view = self; + let mut cur: &'a [u8] = buf; + while !cur.is_empty() { + let before_tag = cur; + let tag = ::buffa::encoding::Tag::decode(&mut cur)?; + match tag.field_number() { + 1u32 => { + if tag.wire_type() != ::buffa::encoding::WireType::Varint { + return ::core::result::Result::Err(::buffa::DecodeError::WireTypeMismatch { + field_number: 1u32, + expected: 0u8, + actual: tag.wire_type() as u8, + }); + } + view.status = ::buffa::EnumValue::from( + ::buffa::types::decode_int32(&mut cur)?, + ); + } + _ => { + ::buffa::encoding::skip_field_depth(tag, &mut cur, depth)?; + let span_len = before_tag.len() - cur.len(); + view.__buffa_unknown_fields.push_raw(&before_tag[..span_len]); + } + } + } + ::core::result::Result::Ok(()) + } +} +impl<'a> ::buffa::MessageView<'a> for HealthCheckResponseView<'a> { + type Owned = super::super::HealthCheckResponse; + fn decode_view(buf: &'a [u8]) -> ::core::result::Result { + Self::_decode_depth(buf, ::buffa::RECURSION_LIMIT) + } + fn decode_view_with_limit( + buf: &'a [u8], + depth: u32, + ) -> ::core::result::Result { + Self::_decode_depth(buf, depth) + } + fn to_owned_message(&self) -> super::super::HealthCheckResponse { + self.to_owned_from_source(None) + } + #[allow(clippy::useless_conversion, clippy::needless_update)] + fn to_owned_from_source( + &self, + __buffa_src: ::core::option::Option<&::buffa::bytes::Bytes>, + ) -> super::super::HealthCheckResponse { + #[allow(unused_imports)] + use ::buffa::alloc::string::ToString as _; + let _ = __buffa_src; + super::super::HealthCheckResponse { + status: self.status, + __buffa_unknown_fields: self + .__buffa_unknown_fields + .to_owned() + .unwrap_or_default() + .into(), + ..::core::default::Default::default() + } + } +} +impl<'a> ::buffa::ViewEncode<'a> for HealthCheckResponseView<'a> { + #[allow(clippy::needless_borrow, clippy::let_and_return)] + fn compute_size(&self, _cache: &mut ::buffa::SizeCache) -> u32 { + #[allow(unused_imports)] + use ::buffa::Enumeration as _; + let mut size = 0u32; + { + let val = self.status.to_i32(); + if val != 0 { + size += 1u32 + ::buffa::types::int32_encoded_len(val) as u32; + } + } + size += self.__buffa_unknown_fields.encoded_len() as u32; + size + } + #[allow(clippy::needless_borrow)] + fn write_to( + &self, + _cache: &mut ::buffa::SizeCache, + buf: &mut impl ::buffa::bytes::BufMut, + ) { + #[allow(unused_imports)] + use ::buffa::Enumeration as _; + { + let val = self.status.to_i32(); + if val != 0 { + ::buffa::encoding::Tag::new(1u32, ::buffa::encoding::WireType::Varint) + .encode(buf); + ::buffa::types::encode_int32(val, buf); + } + } + self.__buffa_unknown_fields.write_to(buf); + } +} +/// Serializes this view as protobuf JSON. +/// +/// Implicit-presence fields with default values are omitted, `required` +/// fields are always emitted, explicit-presence (`optional`) fields are +/// emitted only when set, bytes fields are base64-encoded, and enum +/// values are their proto name strings. +/// +/// This impl uses `serialize_map(None)` because the number of emitted +/// fields depends on default-omission rules; serializers that require +/// known map lengths (e.g. `bincode`) will return a runtime error. +/// Use the owned message type for those formats. +impl<'__a> ::serde::Serialize for HealthCheckResponseView<'__a> { + fn serialize<__S: ::serde::Serializer>( + &self, + __s: __S, + ) -> ::core::result::Result<__S::Ok, __S::Error> { + use ::serde::ser::SerializeMap as _; + let mut __map = __s.serialize_map(::core::option::Option::None)?; + if !::buffa::json_helpers::skip_if::is_default_enum_value(&self.status) { + __map.serialize_entry("status", &self.status)?; + } + __map.end() + } +} +impl<'a> ::buffa::MessageName for HealthCheckResponseView<'a> { + const PACKAGE: &'static str = "grpc.health.v1"; + const NAME: &'static str = "HealthCheckResponse"; + const FULL_NAME: &'static str = "grpc.health.v1.HealthCheckResponse"; + const TYPE_URL: &'static str = "type.googleapis.com/grpc.health.v1.HealthCheckResponse"; +} +impl<'v> ::buffa::DefaultViewInstance for HealthCheckResponseView<'v> { + fn default_view_instance<'a>() -> &'a Self + where + Self: 'a, + { + static VALUE: ::buffa::__private::OnceBox> = ::buffa::__private::OnceBox::new(); + VALUE + .get_or_init(|| ::buffa::alloc::boxed::Box::new( + >::default(), + )) + } +} +impl ::buffa::ViewReborrow for HealthCheckResponseView<'static> { + type Reborrowed<'b> = HealthCheckResponseView<'b>; + fn reborrow<'b>(this: &'b Self) -> &'b Self::Reborrowed<'b> { + this + } +} diff --git a/connectrpc-health/src/generated/buffa/grpc.health.v1.health.rs b/connectrpc-health/src/generated/buffa/grpc.health.v1.health.rs new file mode 100644 index 0000000..529d15e --- /dev/null +++ b/connectrpc-health/src/generated/buffa/grpc.health.v1.health.rs @@ -0,0 +1,418 @@ +// @generated by buffa-codegen. DO NOT EDIT. +// source: grpc/health/v1/health.proto + +#[derive(Clone, PartialEq, Default)] +#[derive(::serde::Serialize, ::serde::Deserialize)] +#[serde(default)] +pub struct HealthCheckRequest { + /// Field 1: `service` + #[serde( + rename = "service", + with = "::buffa::json_helpers::proto_string", + skip_serializing_if = "::buffa::json_helpers::skip_if::is_empty_str" + )] + pub service: ::buffa::alloc::string::String, + #[serde(skip)] + #[doc(hidden)] + pub __buffa_unknown_fields: ::buffa::UnknownFields, +} +impl ::core::fmt::Debug for HealthCheckRequest { + fn fmt(&self, f: &mut ::core::fmt::Formatter<'_>) -> ::core::fmt::Result { + f.debug_struct("HealthCheckRequest").field("service", &self.service).finish() + } +} +impl HealthCheckRequest { + /// Protobuf type URL for this message, for use with `Any::pack` and + /// `Any::unpack_if`. + /// + /// Format: `type.googleapis.com/` + pub const TYPE_URL: &'static str = "type.googleapis.com/grpc.health.v1.HealthCheckRequest"; +} +impl ::buffa::DefaultInstance for HealthCheckRequest { + fn default_instance() -> &'static Self { + static VALUE: ::buffa::__private::OnceBox = ::buffa::__private::OnceBox::new(); + VALUE.get_or_init(|| ::buffa::alloc::boxed::Box::new(Self::default())) + } +} +impl ::buffa::MessageName for HealthCheckRequest { + const PACKAGE: &'static str = "grpc.health.v1"; + const NAME: &'static str = "HealthCheckRequest"; + const FULL_NAME: &'static str = "grpc.health.v1.HealthCheckRequest"; + const TYPE_URL: &'static str = "type.googleapis.com/grpc.health.v1.HealthCheckRequest"; +} +impl ::buffa::Message for HealthCheckRequest { + /// Returns the total encoded size in bytes. + /// + /// The result is a `u32`; the protobuf specification requires all + /// messages to fit within 2 GiB (2,147,483,647 bytes), so a + /// compliant message will never overflow this type. + #[allow(clippy::let_and_return)] + fn compute_size(&self, _cache: &mut ::buffa::SizeCache) -> u32 { + #[allow(unused_imports)] + use ::buffa::Enumeration as _; + let mut size = 0u32; + if !self.service.is_empty() { + size += 1u32 + ::buffa::types::string_encoded_len(&self.service) as u32; + } + size += self.__buffa_unknown_fields.encoded_len() as u32; + size + } + fn write_to( + &self, + _cache: &mut ::buffa::SizeCache, + buf: &mut impl ::buffa::bytes::BufMut, + ) { + #[allow(unused_imports)] + use ::buffa::Enumeration as _; + if !self.service.is_empty() { + ::buffa::encoding::Tag::new( + 1u32, + ::buffa::encoding::WireType::LengthDelimited, + ) + .encode(buf); + ::buffa::types::encode_string(&self.service, buf); + } + self.__buffa_unknown_fields.write_to(buf); + } + fn merge_field( + &mut self, + tag: ::buffa::encoding::Tag, + buf: &mut impl ::buffa::bytes::Buf, + depth: u32, + ) -> ::core::result::Result<(), ::buffa::DecodeError> { + #[allow(unused_imports)] + use ::buffa::bytes::Buf as _; + #[allow(unused_imports)] + use ::buffa::Enumeration as _; + match tag.field_number() { + 1u32 => { + if tag.wire_type() != ::buffa::encoding::WireType::LengthDelimited { + return ::core::result::Result::Err(::buffa::DecodeError::WireTypeMismatch { + field_number: 1u32, + expected: 2u8, + actual: tag.wire_type() as u8, + }); + } + ::buffa::types::merge_string(&mut self.service, buf)?; + } + _ => { + self.__buffa_unknown_fields + .push(::buffa::encoding::decode_unknown_field(tag, buf, depth)?); + } + } + ::core::result::Result::Ok(()) + } + fn clear(&mut self) { + self.service.clear(); + self.__buffa_unknown_fields.clear(); + } +} +impl ::buffa::ExtensionSet for HealthCheckRequest { + const PROTO_FQN: &'static str = "grpc.health.v1.HealthCheckRequest"; + fn unknown_fields(&self) -> &::buffa::UnknownFields { + &self.__buffa_unknown_fields + } + fn unknown_fields_mut(&mut self) -> &mut ::buffa::UnknownFields { + &mut self.__buffa_unknown_fields + } +} +impl ::buffa::json_helpers::ProtoElemJson for HealthCheckRequest { + fn serialize_proto_json( + v: &Self, + s: S, + ) -> ::core::result::Result { + ::serde::Serialize::serialize(v, s) + } + fn deserialize_proto_json<'de, D: ::serde::Deserializer<'de>>( + d: D, + ) -> ::core::result::Result { + ::deserialize(d) + } +} +#[doc(hidden)] +pub const __HEALTH_CHECK_REQUEST_JSON_ANY: ::buffa::type_registry::JsonAnyEntry = ::buffa::type_registry::JsonAnyEntry { + type_url: "type.googleapis.com/grpc.health.v1.HealthCheckRequest", + to_json: ::buffa::type_registry::any_to_json::, + from_json: ::buffa::type_registry::any_from_json::, + is_wkt: false, +}; +#[derive(Clone, PartialEq, Default)] +#[derive(::serde::Serialize, ::serde::Deserialize)] +#[serde(default)] +pub struct HealthCheckResponse { + /// Field 1: `status` + #[serde( + rename = "status", + with = "::buffa::json_helpers::proto_enum", + skip_serializing_if = "::buffa::json_helpers::skip_if::is_default_enum_value" + )] + pub status: ::buffa::EnumValue, + #[serde(skip)] + #[doc(hidden)] + pub __buffa_unknown_fields: ::buffa::UnknownFields, +} +impl ::core::fmt::Debug for HealthCheckResponse { + fn fmt(&self, f: &mut ::core::fmt::Formatter<'_>) -> ::core::fmt::Result { + f.debug_struct("HealthCheckResponse").field("status", &self.status).finish() + } +} +impl HealthCheckResponse { + /// Protobuf type URL for this message, for use with `Any::pack` and + /// `Any::unpack_if`. + /// + /// Format: `type.googleapis.com/` + pub const TYPE_URL: &'static str = "type.googleapis.com/grpc.health.v1.HealthCheckResponse"; +} +impl ::buffa::DefaultInstance for HealthCheckResponse { + fn default_instance() -> &'static Self { + static VALUE: ::buffa::__private::OnceBox = ::buffa::__private::OnceBox::new(); + VALUE.get_or_init(|| ::buffa::alloc::boxed::Box::new(Self::default())) + } +} +impl ::buffa::MessageName for HealthCheckResponse { + const PACKAGE: &'static str = "grpc.health.v1"; + const NAME: &'static str = "HealthCheckResponse"; + const FULL_NAME: &'static str = "grpc.health.v1.HealthCheckResponse"; + const TYPE_URL: &'static str = "type.googleapis.com/grpc.health.v1.HealthCheckResponse"; +} +impl ::buffa::Message for HealthCheckResponse { + /// Returns the total encoded size in bytes. + /// + /// The result is a `u32`; the protobuf specification requires all + /// messages to fit within 2 GiB (2,147,483,647 bytes), so a + /// compliant message will never overflow this type. + #[allow(clippy::let_and_return)] + fn compute_size(&self, _cache: &mut ::buffa::SizeCache) -> u32 { + #[allow(unused_imports)] + use ::buffa::Enumeration as _; + let mut size = 0u32; + { + let val = self.status.to_i32(); + if val != 0 { + size += 1u32 + ::buffa::types::int32_encoded_len(val) as u32; + } + } + size += self.__buffa_unknown_fields.encoded_len() as u32; + size + } + fn write_to( + &self, + _cache: &mut ::buffa::SizeCache, + buf: &mut impl ::buffa::bytes::BufMut, + ) { + #[allow(unused_imports)] + use ::buffa::Enumeration as _; + { + let val = self.status.to_i32(); + if val != 0 { + ::buffa::encoding::Tag::new(1u32, ::buffa::encoding::WireType::Varint) + .encode(buf); + ::buffa::types::encode_int32(val, buf); + } + } + self.__buffa_unknown_fields.write_to(buf); + } + fn merge_field( + &mut self, + tag: ::buffa::encoding::Tag, + buf: &mut impl ::buffa::bytes::Buf, + depth: u32, + ) -> ::core::result::Result<(), ::buffa::DecodeError> { + #[allow(unused_imports)] + use ::buffa::bytes::Buf as _; + #[allow(unused_imports)] + use ::buffa::Enumeration as _; + match tag.field_number() { + 1u32 => { + if tag.wire_type() != ::buffa::encoding::WireType::Varint { + return ::core::result::Result::Err(::buffa::DecodeError::WireTypeMismatch { + field_number: 1u32, + expected: 0u8, + actual: tag.wire_type() as u8, + }); + } + self.status = ::buffa::EnumValue::from( + ::buffa::types::decode_int32(buf)?, + ); + } + _ => { + self.__buffa_unknown_fields + .push(::buffa::encoding::decode_unknown_field(tag, buf, depth)?); + } + } + ::core::result::Result::Ok(()) + } + fn clear(&mut self) { + self.status = ::buffa::EnumValue::from(0); + self.__buffa_unknown_fields.clear(); + } +} +impl ::buffa::ExtensionSet for HealthCheckResponse { + const PROTO_FQN: &'static str = "grpc.health.v1.HealthCheckResponse"; + fn unknown_fields(&self) -> &::buffa::UnknownFields { + &self.__buffa_unknown_fields + } + fn unknown_fields_mut(&mut self) -> &mut ::buffa::UnknownFields { + &mut self.__buffa_unknown_fields + } +} +impl ::buffa::json_helpers::ProtoElemJson for HealthCheckResponse { + fn serialize_proto_json( + v: &Self, + s: S, + ) -> ::core::result::Result { + ::serde::Serialize::serialize(v, s) + } + fn deserialize_proto_json<'de, D: ::serde::Deserializer<'de>>( + d: D, + ) -> ::core::result::Result { + ::deserialize(d) + } +} +#[doc(hidden)] +pub const __HEALTH_CHECK_RESPONSE_JSON_ANY: ::buffa::type_registry::JsonAnyEntry = ::buffa::type_registry::JsonAnyEntry { + type_url: "type.googleapis.com/grpc.health.v1.HealthCheckResponse", + to_json: ::buffa::type_registry::any_to_json::, + from_json: ::buffa::type_registry::any_from_json::, + is_wkt: false, +}; +pub mod health_check_response { + #[allow(unused_imports)] + use super::*; + #[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)] + #[repr(i32)] + pub enum ServingStatus { + UNKNOWN = 0i32, + SERVING = 1i32, + NOT_SERVING = 2i32, + /// Used only by the Watch method. + SERVICE_UNKNOWN = 3i32, + } + impl ::core::default::Default for ServingStatus { + fn default() -> Self { + Self::UNKNOWN + } + } + impl ::serde::Serialize for ServingStatus { + fn serialize( + &self, + s: S, + ) -> ::core::result::Result { + s.serialize_str(::buffa::Enumeration::proto_name(self)) + } + } + impl<'de> ::serde::Deserialize<'de> for ServingStatus { + fn deserialize>( + d: D, + ) -> ::core::result::Result { + struct _V; + impl ::serde::de::Visitor<'_> for _V { + type Value = ServingStatus; + fn expecting( + &self, + f: &mut ::core::fmt::Formatter<'_>, + ) -> ::core::fmt::Result { + f.write_str( + concat!( + "a string, integer, or null for ", stringify!(ServingStatus) + ), + ) + } + fn visit_str( + self, + v: &str, + ) -> ::core::result::Result { + ::from_proto_name(v) + .ok_or_else(|| { ::serde::de::Error::unknown_variant(v, &[]) }) + } + fn visit_i64( + self, + v: i64, + ) -> ::core::result::Result { + let v32 = i32::try_from(v) + .map_err(|_| { + ::serde::de::Error::custom( + ::buffa::alloc::format!("enum value {v} out of i32 range"), + ) + })?; + ::from_i32(v32) + .ok_or_else(|| { + ::serde::de::Error::custom( + ::buffa::alloc::format!("unknown enum value {v32}"), + ) + }) + } + fn visit_u64( + self, + v: u64, + ) -> ::core::result::Result { + let v32 = i32::try_from(v) + .map_err(|_| { + ::serde::de::Error::custom( + ::buffa::alloc::format!("enum value {v} out of i32 range"), + ) + })?; + ::from_i32(v32) + .ok_or_else(|| { + ::serde::de::Error::custom( + ::buffa::alloc::format!("unknown enum value {v32}"), + ) + }) + } + fn visit_unit( + self, + ) -> ::core::result::Result { + ::core::result::Result::Ok(::core::default::Default::default()) + } + } + d.deserialize_any(_V) + } + } + impl ::buffa::json_helpers::ProtoElemJson for ServingStatus { + fn serialize_proto_json( + v: &Self, + s: S, + ) -> ::core::result::Result { + ::serde::Serialize::serialize(v, s) + } + fn deserialize_proto_json<'de, D: ::serde::Deserializer<'de>>( + d: D, + ) -> ::core::result::Result { + ::deserialize(d) + } + } + impl ::buffa::Enumeration for ServingStatus { + fn from_i32(value: i32) -> ::core::option::Option { + match value { + 0i32 => ::core::option::Option::Some(Self::UNKNOWN), + 1i32 => ::core::option::Option::Some(Self::SERVING), + 2i32 => ::core::option::Option::Some(Self::NOT_SERVING), + 3i32 => ::core::option::Option::Some(Self::SERVICE_UNKNOWN), + _ => ::core::option::Option::None, + } + } + fn to_i32(&self) -> i32 { + *self as i32 + } + fn proto_name(&self) -> &'static str { + match self { + Self::UNKNOWN => "UNKNOWN", + Self::SERVING => "SERVING", + Self::NOT_SERVING => "NOT_SERVING", + Self::SERVICE_UNKNOWN => "SERVICE_UNKNOWN", + } + } + fn from_proto_name(name: &str) -> ::core::option::Option { + match name { + "UNKNOWN" => ::core::option::Option::Some(Self::UNKNOWN), + "SERVING" => ::core::option::Option::Some(Self::SERVING), + "NOT_SERVING" => ::core::option::Option::Some(Self::NOT_SERVING), + "SERVICE_UNKNOWN" => ::core::option::Option::Some(Self::SERVICE_UNKNOWN), + _ => ::core::option::Option::None, + } + } + fn values() -> &'static [Self] { + &[Self::UNKNOWN, Self::SERVING, Self::NOT_SERVING, Self::SERVICE_UNKNOWN] + } + } +} diff --git a/connectrpc-health/src/generated/buffa/grpc.health.v1.mod.rs b/connectrpc-health/src/generated/buffa/grpc.health.v1.mod.rs new file mode 100644 index 0000000..1d6c905 --- /dev/null +++ b/connectrpc-health/src/generated/buffa/grpc.health.v1.mod.rs @@ -0,0 +1,34 @@ +// @generated by buffa-codegen. DO NOT EDIT. + +include!("grpc.health.v1.health.rs"); +#[allow( + non_camel_case_types, + dead_code, + unused_imports, + unused_qualifications, + clippy::derivable_impls, + clippy::match_single_binding, + clippy::uninlined_format_args, + clippy::doc_lazy_continuation, + clippy::module_inception +)] +pub mod __buffa { + #[allow(unused_imports)] + use super::*; + pub mod view { + #[allow(unused_imports)] + use super::*; + include!("grpc.health.v1.health.__view.rs"); + } + /// Register this package's `Any` type entries and extension entries. + pub fn register_types(reg: &mut ::buffa::type_registry::TypeRegistry) { + reg.register_json_any(super::__HEALTH_CHECK_REQUEST_JSON_ANY); + reg.register_json_any(super::__HEALTH_CHECK_RESPONSE_JSON_ANY); + } +} +#[doc(inline)] +pub use self::__buffa::view::HealthCheckRequestView; +#[doc(inline)] +pub use self::__buffa::view::HealthCheckResponseView; +#[doc(inline)] +pub use self::__buffa::register_types; diff --git a/connectrpc-health/src/generated/buffa/mod.rs b/connectrpc-health/src/generated/buffa/mod.rs new file mode 100644 index 0000000..b4702e2 --- /dev/null +++ b/connectrpc-health/src/generated/buffa/mod.rs @@ -0,0 +1,16 @@ +// @generated by buffa-codegen. DO NOT EDIT. +#![allow(non_camel_case_types, dead_code, unused_imports, unused_qualifications, clippy::derivable_impls, clippy::match_single_binding, clippy::uninlined_format_args, clippy::doc_lazy_continuation, clippy::module_inception)] + +#[allow(non_camel_case_types, dead_code, unused_imports, unused_qualifications, clippy::derivable_impls, clippy::match_single_binding, clippy::uninlined_format_args, clippy::doc_lazy_continuation, clippy::module_inception)] +pub mod grpc { + use super::*; + #[allow(non_camel_case_types, dead_code, unused_imports, unused_qualifications, clippy::derivable_impls, clippy::match_single_binding, clippy::uninlined_format_args, clippy::doc_lazy_continuation, clippy::module_inception)] + pub mod health { + use super::*; + #[allow(non_camel_case_types, dead_code, unused_imports, unused_qualifications, clippy::derivable_impls, clippy::match_single_binding, clippy::uninlined_format_args, clippy::doc_lazy_continuation, clippy::module_inception)] + pub mod v1 { + use super::*; + include!("grpc.health.v1.mod.rs"); + } + } +} diff --git a/connectrpc-health/src/generated/connect/grpc.health.v1.health.__connect.rs b/connectrpc-health/src/generated/connect/grpc.health.v1.health.__connect.rs new file mode 100644 index 0000000..1181fdd --- /dev/null +++ b/connectrpc-health/src/generated/connect/grpc.health.v1.health.__connect.rs @@ -0,0 +1,496 @@ +///Shorthand for `OwnedView>`. +pub type OwnedHealthCheckRequestView = ::buffa::view::OwnedView< + crate::proto::grpc::health::v1::__buffa::view::HealthCheckRequestView<'static>, +>; +///Shorthand for `OwnedView>`. +pub type OwnedHealthCheckResponseView = ::buffa::view::OwnedView< + crate::proto::grpc::health::v1::__buffa::view::HealthCheckResponseView<'static>, +>; +impl ::connectrpc::Encodable +for crate::proto::grpc::health::v1::__buffa::view::HealthCheckResponseView<'_> { + fn encode( + &self, + codec: ::connectrpc::CodecFormat, + ) -> ::std::result::Result<::buffa::bytes::Bytes, ::connectrpc::ConnectError> { + ::connectrpc::__codegen::encode_view_body(self, codec) + } +} +impl ::connectrpc::Encodable +for ::buffa::view::OwnedView< + crate::proto::grpc::health::v1::__buffa::view::HealthCheckResponseView<'static>, +> { + fn encode( + &self, + codec: ::connectrpc::CodecFormat, + ) -> ::std::result::Result<::buffa::bytes::Bytes, ::connectrpc::ConnectError> { + ::connectrpc::__codegen::encode_view_body(&**self, codec) + } +} +/// Full service name for this service. +pub const HEALTH_SERVICE_NAME: &str = "grpc.health.v1.Health"; +/// Static [`Spec`](::connectrpc::Spec) for the server-side `Check` RPC. +/// +/// The dispatcher surfaces this on +/// [`RequestContext::spec`](::connectrpc::RequestContext::spec). +pub const HEALTH_CHECK_SPEC: ::connectrpc::Spec = ::connectrpc::Spec::server( + "/grpc.health.v1.Health/Check", + ::connectrpc::StreamType::Unary, + ) + .with_idempotency_level(::connectrpc::IdempotencyLevel::Unknown); +/// Static [`Spec`](::connectrpc::Spec) for the server-side `Watch` RPC. +/// +/// The dispatcher surfaces this on +/// [`RequestContext::spec`](::connectrpc::RequestContext::spec). +pub const HEALTH_WATCH_SPEC: ::connectrpc::Spec = ::connectrpc::Spec::server( + "/grpc.health.v1.Health/Watch", + ::connectrpc::StreamType::ServerStream, + ) + .with_idempotency_level(::connectrpc::IdempotencyLevel::Unknown); +/// Server trait for Health. +/// +/// # Implementing handlers +/// +/// Handlers receive requests as `OwnedFooView` (an alias for +/// `OwnedView>`), which gives zero-copy borrowed access +/// to fields (e.g. `request.name` is a `&str` into the decoded buffer). +/// The view can be held across `.await` points. When two RPC types in +/// the same package would alias to the same `Owned<…>View` name (e.g. +/// a local message plus an imported one with the same short name), the +/// alias is suppressed for both and the request type is spelled as +/// `OwnedView<…View<'static>>` directly in the trait signature. +/// +/// Implement methods with plain `async fn`; the returned future satisfies +/// the `Send` bound automatically. See the +/// [buffa user guide](https://github.com/anthropics/buffa/blob/main/docs/guide.md#ownedview-in-async-trait-implementations) +/// for zero-copy access patterns and when `to_owned_message()` is needed. +/// +/// The `impl Encodable` return bound accepts the owned `Out`, the +/// generated `OutView<'_>` / `OwnedOutView`, +/// [`MaybeBorrowed`](::connectrpc::MaybeBorrowed), or +/// [`PreEncoded`](::connectrpc::PreEncoded) for handlers that encode a +/// non-`'static` view internally and pass the bytes across the handler +/// boundary. View bodies are not emitted for output types mapped via +/// `extern_path` (the impl would be an orphan); return owned for +/// WKT/extern outputs. +/// +/// Server-streaming and bidi-streaming methods return +/// `ServiceStream + Send + use>`. The +/// `use` precise-capturing clause excludes `&self`'s lifetime +/// (unary methods use `use<'a, Self>` and may borrow), so stream items +/// must be `'static`. To stream view-encoded data, encode each item +/// inside the stream body and yield +/// [`PreEncoded`](::connectrpc::PreEncoded) — see its `# Streaming +/// example` doc. +#[allow(clippy::type_complexity)] +pub trait Health: Send + Sync + 'static { + /// Check returns the serving status of the requested service. If the + /// service name is empty, the response covers the whole server. + /// + /// `'a` lets the response body borrow from `&self` (e.g. server-resident state). + fn check<'a>( + &'a self, + ctx: ::connectrpc::RequestContext, + request: OwnedHealthCheckRequestView, + ) -> impl ::std::future::Future< + Output = ::connectrpc::ServiceResult< + impl ::connectrpc::Encodable< + crate::proto::grpc::health::v1::HealthCheckResponse, + > + Send + use<'a, Self>, + >, + > + Send; + /// Watch performs a watch for the serving status of the requested service. + /// The server will immediately send back a message indicating the current + /// serving status. It will then subsequently send a new message whenever + /// the service's serving status changes. + /// If the requested service is unknown when the call is received, the + /// server will send a message setting the serving status to SERVICE_UNKNOWN + /// but will *not* terminate the call. If at some future point, the serving + /// status of the service becomes known, the server will send a new message + /// with the service's serving status. + /// If the call terminates with status UNIMPLEMENTED, then the client should + /// assume this method is not supported and should not retry the call. If + /// the call terminates with any other status (including OK), then the + /// client should retry the call with appropriate exponential backoff. + fn watch( + &self, + ctx: ::connectrpc::RequestContext, + request: OwnedHealthCheckRequestView, + ) -> impl ::std::future::Future< + Output = ::connectrpc::ServiceResult< + ::connectrpc::ServiceStream< + impl ::connectrpc::Encodable< + crate::proto::grpc::health::v1::HealthCheckResponse, + > + Send + use, + >, + >, + > + Send; +} +/// Extension trait for registering a service implementation with a Router. +/// +/// This trait is automatically implemented for all types that implement the service trait. +/// +/// # Example +/// +/// ```rust,ignore +/// use std::sync::Arc; +/// +/// let service = Arc::new(MyServiceImpl); +/// let router = service.register(Router::new()); +/// ``` +pub trait HealthExt: Health { + /// Register this service implementation with a Router. + /// + /// Takes ownership of the `Arc` and returns a new Router with + /// this service's methods registered. + fn register( + self: ::std::sync::Arc, + router: ::connectrpc::Router, + ) -> ::connectrpc::Router; +} +impl HealthExt for S { + fn register( + self: ::std::sync::Arc, + router: ::connectrpc::Router, + ) -> ::connectrpc::Router { + router + .route_view( + HEALTH_SERVICE_NAME, + "Check", + { + let svc = ::std::sync::Arc::clone(&self); + ::connectrpc::view_handler_fn(move |ctx, req, format| { + let svc = ::std::sync::Arc::clone(&svc); + async move { + svc.check(ctx, req) + .await? + .encode::< + crate::proto::grpc::health::v1::HealthCheckResponse, + >(format) + } + }) + }, + ) + .with_spec(HEALTH_CHECK_SPEC) + .route_view_server_stream::< + _, + _, + crate::proto::grpc::health::v1::HealthCheckResponse, + >( + HEALTH_SERVICE_NAME, + "Watch", + ::connectrpc::view_streaming_handler_fn({ + let svc = ::std::sync::Arc::clone(&self); + move |ctx, req| { + let svc = ::std::sync::Arc::clone(&svc); + async move { svc.watch(ctx, req).await } + } + }), + ) + .with_spec(HEALTH_WATCH_SPEC) + } +} +/// Monomorphic dispatcher for `Health`. +/// +/// Unlike `.register(Router)` which type-erases each method into an `Arc` stored in a `HashMap`, this struct dispatches via a compile-time `match` on method name: no vtable, no hash lookup. +/// +/// # Example +/// +/// ```rust,ignore +/// use connectrpc::ConnectRpcService; +/// +/// let server = HealthServer::new(MyImpl); +/// let service = ConnectRpcService::new(server); +/// // hand `service` to axum/hyper as a fallback_service +/// ``` +pub struct HealthServer { + inner: ::std::sync::Arc, +} +impl HealthServer { + /// Wrap a service implementation in a monomorphic dispatcher. + pub fn new(service: T) -> Self { + Self { + inner: ::std::sync::Arc::new(service), + } + } + /// Wrap an already-`Arc`'d service implementation. + pub fn from_arc(inner: ::std::sync::Arc) -> Self { + Self { inner } + } +} +impl Clone for HealthServer { + fn clone(&self) -> Self { + Self { + inner: ::std::sync::Arc::clone(&self.inner), + } + } +} +impl ::connectrpc::Dispatcher for HealthServer { + #[inline] + fn lookup( + &self, + path: &str, + ) -> Option<::connectrpc::dispatcher::codegen::MethodDescriptor> { + let method = path.strip_prefix("grpc.health.v1.Health/")?; + match method { + "Check" => { + Some( + ::connectrpc::dispatcher::codegen::MethodDescriptor::unary(false) + .with_spec(HEALTH_CHECK_SPEC), + ) + } + "Watch" => { + Some( + ::connectrpc::dispatcher::codegen::MethodDescriptor::server_streaming() + .with_spec(HEALTH_WATCH_SPEC), + ) + } + _ => None, + } + } + fn call_unary( + &self, + path: &str, + ctx: ::connectrpc::RequestContext, + request: ::connectrpc::Payload, + format: ::connectrpc::CodecFormat, + ) -> ::connectrpc::dispatcher::codegen::UnaryResult { + let Some(method) = path.strip_prefix("grpc.health.v1.Health/") else { + return ::connectrpc::dispatcher::codegen::unimplemented_unary(path); + }; + let _ = (&ctx, &request, &format); + match method { + "Check" => { + let svc = ::std::sync::Arc::clone(&self.inner); + Box::pin(async move { + let req = ::connectrpc::dispatcher::codegen::decode_request_view::< + crate::proto::grpc::health::v1::__buffa::view::HealthCheckRequestView, + >(request.encoded()?, format)?; + svc.check(ctx, req) + .await? + .encode::< + crate::proto::grpc::health::v1::HealthCheckResponse, + >(format) + }) + } + _ => ::connectrpc::dispatcher::codegen::unimplemented_unary(path), + } + } + fn call_server_streaming( + &self, + path: &str, + ctx: ::connectrpc::RequestContext, + request: ::buffa::bytes::Bytes, + format: ::connectrpc::CodecFormat, + ) -> ::connectrpc::dispatcher::codegen::StreamingResult { + let Some(method) = path.strip_prefix("grpc.health.v1.Health/") else { + return ::connectrpc::dispatcher::codegen::unimplemented_streaming(path); + }; + let _ = (&ctx, &request, &format); + match method { + "Watch" => { + let svc = ::std::sync::Arc::clone(&self.inner); + Box::pin(async move { + let req = ::connectrpc::dispatcher::codegen::decode_request_view::< + crate::proto::grpc::health::v1::__buffa::view::HealthCheckRequestView, + >(request, format)?; + let resp = svc.watch(ctx, req).await?; + Ok( + resp + .map_body(|s| ::connectrpc::dispatcher::codegen::encode_response_stream::< + crate::proto::grpc::health::v1::HealthCheckResponse, + _, + _, + >(s, format)), + ) + }) + } + _ => ::connectrpc::dispatcher::codegen::unimplemented_streaming(path), + } + } + fn call_client_streaming( + &self, + path: &str, + ctx: ::connectrpc::RequestContext, + requests: ::connectrpc::dispatcher::codegen::RequestStream, + format: ::connectrpc::CodecFormat, + ) -> ::connectrpc::dispatcher::codegen::UnaryResult { + let Some(method) = path.strip_prefix("grpc.health.v1.Health/") else { + return ::connectrpc::dispatcher::codegen::unimplemented_unary(path); + }; + let _ = (&ctx, &requests, &format); + match method { + _ => ::connectrpc::dispatcher::codegen::unimplemented_unary(path), + } + } + fn call_bidi_streaming( + &self, + path: &str, + ctx: ::connectrpc::RequestContext, + requests: ::connectrpc::dispatcher::codegen::RequestStream, + format: ::connectrpc::CodecFormat, + ) -> ::connectrpc::dispatcher::codegen::StreamingResult { + let Some(method) = path.strip_prefix("grpc.health.v1.Health/") else { + return ::connectrpc::dispatcher::codegen::unimplemented_streaming(path); + }; + let _ = (&ctx, &requests, &format); + match method { + _ => ::connectrpc::dispatcher::codegen::unimplemented_streaming(path), + } + } +} +/// Client for this service. +/// +/// Generic over `T: ClientTransport`. For **gRPC** (HTTP/2), use +/// `Http2Connection` — it has honest `poll_ready` and composes with +/// `tower::balance` for multi-connection load balancing. For **Connect +/// over HTTP/1.1** (or unknown protocol), use `HttpClient`. +/// +/// # Example (gRPC / HTTP/2) +/// +/// ```rust,ignore +/// use connectrpc::client::{Http2Connection, ClientConfig}; +/// use connectrpc::Protocol; +/// +/// let uri: http::Uri = "http://localhost:8080".parse()?; +/// let conn = Http2Connection::connect_plaintext(uri.clone()).await?.shared(1024); +/// let config = ClientConfig::new(uri).with_protocol(Protocol::Grpc); +/// +/// let client = HealthClient::new(conn, config); +/// let response = client.check(request).await?; +/// ``` +/// +/// # Example (Connect / HTTP/1.1 or ALPN) +/// +/// ```rust,ignore +/// use connectrpc::client::{HttpClient, ClientConfig}; +/// +/// let http = HttpClient::plaintext(); // cleartext http:// only +/// let config = ClientConfig::new("http://localhost:8080".parse()?); +/// +/// let client = HealthClient::new(http, config); +/// let response = client.check(request).await?; +/// ``` +/// +/// # Working with the response +/// +/// Unary calls return [`UnaryResponse>`](::connectrpc::client::UnaryResponse). +/// The `OwnedView` derefs to the view, so field access is zero-copy: +/// +/// ```rust,ignore +/// let resp = client.check(request).await?.into_view(); +/// let name: &str = resp.name; // borrow into the response buffer +/// ``` +/// +/// If you need the owned struct (e.g. to store or pass by value), use +/// [`into_owned()`](::connectrpc::client::UnaryResponse::into_owned): +/// +/// ```rust,ignore +/// let owned = client.check(request).await?.into_owned(); +/// ``` +#[cfg(feature = "client")] +#[derive(Clone)] +pub struct HealthClient { + transport: T, + config: ::connectrpc::client::ClientConfig, +} +#[cfg(feature = "client")] +impl HealthClient +where + T: ::connectrpc::client::ClientTransport, + ::Error: ::std::fmt::Display, +{ + /// Create a new client with the given transport and configuration. + pub fn new(transport: T, config: ::connectrpc::client::ClientConfig) -> Self { + Self { transport, config } + } + /// Get the client configuration. + pub fn config(&self) -> &::connectrpc::client::ClientConfig { + &self.config + } + /// Get a mutable reference to the client configuration. + pub fn config_mut(&mut self) -> &mut ::connectrpc::client::ClientConfig { + &mut self.config + } + /// Call the Check RPC. Sends a request to /grpc.health.v1.Health/Check. + pub async fn check( + &self, + request: crate::proto::grpc::health::v1::HealthCheckRequest, + ) -> Result< + ::connectrpc::client::UnaryResponse< + ::buffa::view::OwnedView< + crate::proto::grpc::health::v1::__buffa::view::HealthCheckResponseView< + 'static, + >, + >, + >, + ::connectrpc::ConnectError, + > { + self.check_with_options(request, ::connectrpc::client::CallOptions::default()) + .await + } + /// Call the Check RPC with explicit per-call options. Options override [`ClientConfig`](::connectrpc::client::ClientConfig) defaults. + pub async fn check_with_options( + &self, + request: crate::proto::grpc::health::v1::HealthCheckRequest, + options: ::connectrpc::client::CallOptions, + ) -> Result< + ::connectrpc::client::UnaryResponse< + ::buffa::view::OwnedView< + crate::proto::grpc::health::v1::__buffa::view::HealthCheckResponseView< + 'static, + >, + >, + >, + ::connectrpc::ConnectError, + > { + ::connectrpc::client::call_unary( + &self.transport, + &self.config, + HEALTH_SERVICE_NAME, + "Check", + request, + options, + ) + .await + } + /// Call the Watch RPC. Sends a request to /grpc.health.v1.Health/Watch. + pub async fn watch( + &self, + request: crate::proto::grpc::health::v1::HealthCheckRequest, + ) -> Result< + ::connectrpc::client::ServerStream< + T::ResponseBody, + crate::proto::grpc::health::v1::__buffa::view::HealthCheckResponseView< + 'static, + >, + >, + ::connectrpc::ConnectError, + > { + self.watch_with_options(request, ::connectrpc::client::CallOptions::default()) + .await + } + /// Call the Watch RPC with explicit per-call options. Options override [`ClientConfig`](::connectrpc::client::ClientConfig) defaults. + pub async fn watch_with_options( + &self, + request: crate::proto::grpc::health::v1::HealthCheckRequest, + options: ::connectrpc::client::CallOptions, + ) -> Result< + ::connectrpc::client::ServerStream< + T::ResponseBody, + crate::proto::grpc::health::v1::__buffa::view::HealthCheckResponseView< + 'static, + >, + >, + ::connectrpc::ConnectError, + > { + ::connectrpc::client::call_server_stream( + &self.transport, + &self.config, + HEALTH_SERVICE_NAME, + "Watch", + request, + options, + ) + .await + } +} diff --git a/connectrpc-health/src/generated/connect/grpc.health.v1.mod.rs b/connectrpc-health/src/generated/connect/grpc.health.v1.mod.rs new file mode 100644 index 0000000..0104edf --- /dev/null +++ b/connectrpc-health/src/generated/connect/grpc.health.v1.mod.rs @@ -0,0 +1,2 @@ +// @generated by connectrpc-codegen. DO NOT EDIT. +include!("grpc.health.v1.health.__connect.rs"); diff --git a/connectrpc-health/src/generated/connect/mod.rs b/connectrpc-health/src/generated/connect/mod.rs new file mode 100644 index 0000000..b4702e2 --- /dev/null +++ b/connectrpc-health/src/generated/connect/mod.rs @@ -0,0 +1,16 @@ +// @generated by buffa-codegen. DO NOT EDIT. +#![allow(non_camel_case_types, dead_code, unused_imports, unused_qualifications, clippy::derivable_impls, clippy::match_single_binding, clippy::uninlined_format_args, clippy::doc_lazy_continuation, clippy::module_inception)] + +#[allow(non_camel_case_types, dead_code, unused_imports, unused_qualifications, clippy::derivable_impls, clippy::match_single_binding, clippy::uninlined_format_args, clippy::doc_lazy_continuation, clippy::module_inception)] +pub mod grpc { + use super::*; + #[allow(non_camel_case_types, dead_code, unused_imports, unused_qualifications, clippy::derivable_impls, clippy::match_single_binding, clippy::uninlined_format_args, clippy::doc_lazy_continuation, clippy::module_inception)] + pub mod health { + use super::*; + #[allow(non_camel_case_types, dead_code, unused_imports, unused_qualifications, clippy::derivable_impls, clippy::match_single_binding, clippy::uninlined_format_args, clippy::doc_lazy_continuation, clippy::module_inception)] + pub mod v1 { + use super::*; + include!("grpc.health.v1.mod.rs"); + } + } +} diff --git a/connectrpc-health/src/lib.rs b/connectrpc-health/src/lib.rs new file mode 100644 index 0000000..340f735 --- /dev/null +++ b/connectrpc-health/src/lib.rs @@ -0,0 +1,102 @@ +//! gRPC health-checking service for `connectrpc`. +//! +//! Wire-compatible with [`grpc.health.v1.Health`], so `grpc_health_probe`, +//! `grpcurl`, Kubernetes' gRPC liveness probes, and any other client of the +//! standard gRPC health protocol just work. +//! +//! Non-empty unregistered services return `Err(ConnectError::not_found(_))` +//! from both `Check` and `Watch`; the empty service auto-subscribes on +//! `Watch` and returns `Serving` on `Check` by default — see +//! [`HealthService`]'s `# Unknown services` section for how this relates +//! to the gRPC Health spec. +//! +//! # Cargo features +//! +//! * **`client`** (on by default) — re-exports the generated +//! `HealthClient` for in-process probes, integration tests, and +//! sidecar tooling. Pulls in `connectrpc`'s `client` feature (the +//! HTTP/2 transport stack). Server-only deployments drop it with +//! `connectrpc-health = { version = "0.6", default-features = false }`; +//! `use connectrpc_health::HealthClient` then becomes an unresolved +//! import (the type is gone), but the dependency graph loses +//! `connectrpc/client`. +//! +//! # Writing a custom `Checker` +//! +//! [`StaticChecker`] covers most servers. If you implement [`Checker`] +//! yourself (e.g. report `NotServing` while a database connection is +//! down), note that the **default `watch` implementation returns +//! `Unimplemented`** — fine for kubelet / `grpc_health_probe` (they only +//! call `Check`), but service meshes (Linkerd, Istio) and gRPC clients +//! with health-based load balancing call `Watch` too. Override the +//! method if your probes need it. See [`Checker::watch`] for details. +//! +//! # Quick start +//! +//! ```no_run +//! use connectrpc::Router; +//! use connectrpc_health::{install_static, Status}; +//! +//! // In real code, pass the generated `*_SERVICE_NAME` constant — +//! // the literal below is a stand-in. +//! let (router, health) = install_static(Router::new(), [ +//! "acme.user.v1.UserService", +//! ]); +//! +//! // Later, when something goes wrong. `set_status` errors on unknown +//! // names; here the name was just registered above, so `.expect` +//! // documents the invariant. +//! health +//! .set_status("acme.user.v1.UserService", Status::NotServing) +//! .expect("registered above"); +//! +//! // ...and at shutdown. `shutdown()` flips every registered service, +//! // including the empty whole-process entry seeded on construction. +//! health.shutdown(); +//! # drop(router); +//! ``` +//! +//! For custom logic (probing a database, propagating dependency state), +//! implement [`Checker`] directly and wrap it in [`HealthService::new`] +//! / [`HealthService::from_arc`]. +//! +//! [`grpc.health.v1.Health`]: https://github.com/grpc/grpc-proto/blob/master/grpc/health/v1/health.proto + +mod checker; +mod service; +mod static_checker; +mod status; + +#[path = "generated/connect/mod.rs"] +mod connect; +#[path = "generated/buffa/mod.rs"] +mod proto; + +pub use checker::{Checker, StatusStream}; +pub use service::{HealthService, install_static}; +pub use static_checker::{StaticChecker, UnknownServiceError}; +pub use status::Status; + +/// Generated client for calling a `grpc.health.v1.Health` server. +/// +/// Gated on the `client` Cargo feature (on by default). Server-only +/// deployments turn off default features to drop the `connectrpc/client` +/// transport stack from their dependency graph. +#[cfg(feature = "client")] +pub use connect::grpc::health::v1::HealthClient; + +/// Generated extension trait that adds `.register(router)` to any +/// `Arc where S: Health`. Import it to register a [`HealthService`]. +pub use connect::grpc::health::v1::HealthExt; + +/// Fully-qualified protobuf service name: `"grpc.health.v1.Health"`. +pub use connect::grpc::health::v1::HEALTH_SERVICE_NAME; + +/// Re-exports of the generated `grpc.health.v1` wire types — request and +/// response messages, `ServingStatus`, the `*_SPEC` constants. Downstream +/// crates can build probe loops without regenerating the proto. +pub mod wire { + pub use crate::connect::grpc::health::v1::{HEALTH_CHECK_SPEC, HEALTH_WATCH_SPEC}; + pub use crate::proto::grpc::health::v1::health_check_response::ServingStatus; + pub use crate::proto::grpc::health::v1::{HealthCheckRequest, HealthCheckResponse}; +} diff --git a/connectrpc-health/src/service.rs b/connectrpc-health/src/service.rs new file mode 100644 index 0000000..8fbed67 --- /dev/null +++ b/connectrpc-health/src/service.rs @@ -0,0 +1,451 @@ +//! The bridge from a user [`Checker`] to the generated `grpc.health.v1.Health` +//! service trait. + +use std::sync::Arc; + +use buffa::view::OwnedView; +use connectrpc::{ConnectError, RequestContext, Response, Router, ServiceResult, ServiceStream}; +use futures::StreamExt; + +use crate::connect::grpc::health::v1::{Health, HealthExt}; +use crate::proto::grpc::health::v1::{ + HealthCheckRequestView, HealthCheckResponse, health_check_response::ServingStatus, +}; +use crate::{Checker, StaticChecker}; + +/// gRPC-compatible health service backed by a user-supplied [`Checker`]. +/// +/// Wraps any `Checker` and exposes it as the wire-format +/// `grpc.health.v1.Health` service. For the common case of a +/// [`StaticChecker`]-backed setup, prefer the [`install_static`] +/// free function; reach for `HealthService` directly when you implement +/// [`Checker`] yourself. +/// +/// ```no_run +/// use std::sync::Arc; +/// use connectrpc::Router; +/// use connectrpc_health::{HealthExt, HealthService, StaticChecker}; +/// +/// let checker = Arc::new(StaticChecker::with_services([ +/// "acme.user.v1.UserService", +/// ])); +/// let service = Arc::new(HealthService::from_arc(Arc::clone(&checker))); +/// let router = service.register(Router::new()); +/// ``` +/// +/// `HealthService::new(checker)` is the move-in shorthand; use +/// [`from_arc`](Self::from_arc) when you keep your own clone of the +/// `Arc` to flip status from outside the service. +/// +/// # Unknown services +/// +/// Non-empty unregistered services surface as +/// `Err(ConnectError::not_found(_))` from both `Check` and `Watch`; the +/// empty service is pre-registered with [`Status::Serving`] and behaves +/// like any other service (see [`StaticChecker`]'s `# Empty service name` +/// section). The [gRPC Health spec] additionally specifies a +/// `SERVICE_UNKNOWN` keep-stream-open flow for `Watch` that this crate +/// does not implement (matching the Go `connectrpc.com/grpchealth` +/// reference). Probes that treat any error as failure — kubelet, +/// `grpc_health_probe`, Linkerd, Istio — work unchanged. +/// +/// [`Status::Serving`]: crate::Status::Serving +/// [`StaticChecker`]: crate::StaticChecker +/// +/// [gRPC Health spec]: https://github.com/grpc/grpc/blob/master/doc/health-checking.md +pub struct HealthService { + checker: Arc, +} + +impl HealthService { + /// Wrap a checker by value; it is moved into a fresh `Arc`. + #[must_use] + pub fn new(checker: C) -> Self { + Self { + checker: Arc::new(checker), + } + } + + /// Wrap a checker that is already inside an `Arc`. Use this when + /// you keep your own clone of the `Arc` to flip status from + /// outside the service. + #[must_use] + pub fn from_arc(checker: Arc) -> Self { + Self { checker } + } + + /// Return a fresh `Arc` handle to the inner checker. One atomic + /// increment per call; safe to keep, store, or pass into another + /// `HealthService::from_arc` to mount the same checker behind a + /// second service. + #[must_use] + pub fn checker(&self) -> Arc { + Arc::clone(&self.checker) + } +} + +/// One-line installation for the static-checker happy path. +/// +/// Builds a [`StaticChecker`] pre-populated with `services` (each +/// reporting [`Status::Serving`](crate::Status::Serving)), wraps it in +/// a [`HealthService`], registers that service on `router`, and hands +/// back both the updated router and a shared `Arc` for +/// status mutation. +/// +/// Pass the generated `*_SERVICE_NAME` constants from your service +/// stubs to avoid drifting from the wire name a probe will ask about. +/// +/// See the crate-level [Quick start](crate#quick-start) for an +/// end-to-end example including status flips and shutdown. +/// +/// # Caveat on destructuring discards +/// +/// `#[must_use]` only fires when the *whole* return value is dropped at +/// statement position (`install_static(...);`). Any pattern binding — +/// `let _ = install_static(...);`, +/// `let (router, _) = install_static(...);`, +/// `let (_, _) = install_static(...);` — counts as a use by the lint +/// (the `Arc` is still dropped at end of statement, but +/// the lint stays quiet). Also note that the input `router` is moved +/// into the function; if you drop the returned tuple you've lost both +/// the mount and the only handle for status mutation. Bind both halves +/// of the tuple even if you don't immediately call the checker. +#[must_use = "install_static returns (Router, Arc) — \ + drop either and you've lost the mount or the only handle \ + for status mutation"] +pub fn install_static(router: Router, services: I) -> (Router, Arc) +where + I: IntoIterator, + S: Into, +{ + let checker = Arc::new(StaticChecker::with_services(services)); + let service = Arc::new(HealthService::from_arc(Arc::clone(&checker))); + let router = service.register(router); + (router, checker) +} + +impl Clone for HealthService { + fn clone(&self) -> Self { + Self { + checker: Arc::clone(&self.checker), + } + } +} + +impl std::fmt::Debug for HealthService { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("HealthService").finish_non_exhaustive() + } +} + +impl Health for HealthService { + async fn check( + &self, + _ctx: RequestContext, + request: OwnedView>, + ) -> ServiceResult { + let status = self.checker.check(request.service).await?; + Response::ok(HealthCheckResponse { + status: ServingStatus::from(status).into(), + ..Default::default() + }) + } + + async fn watch( + &self, + _ctx: RequestContext, + request: OwnedView>, + ) -> ServiceResult> { + let stream = self.checker.watch(request.service).await?; + Response::stream_ok(stream.map(|status| { + Ok::<_, ConnectError>(HealthCheckResponse { + status: ServingStatus::from(status).into(), + ..Default::default() + }) + })) + } +} + +// Integration tests drive the server through the generated client, +// which is feature-gated. `client` is default-on; `cargo test +// --no-default-features` skips this module. +#[cfg(all(test, feature = "client"))] +mod tests { + use std::sync::Arc; + use std::time::Duration; + + use connectrpc::Router; + use connectrpc::client::{ClientConfig, HttpClient}; + use tokio::net::TcpListener; + + use super::*; + use crate::connect::grpc::health::v1::{HealthClient, HealthExt}; + use crate::proto::grpc::health::v1::HealthCheckRequest; + use crate::{StaticChecker, Status}; + + /// Spin up a Health server on a free port and hand back the address + /// and a client targeting it. The server runs until the test exits. + async fn spawn_health_server( + checker: Arc, + ) -> (HealthClient, std::net::SocketAddr) { + let service = Arc::new(HealthService::from_arc(checker)); + let router = service.register(Router::new()); + let app = router.into_axum_router(); + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let addr = listener.local_addr().unwrap(); + tokio::spawn(async move { + axum::serve(listener, app).await.unwrap(); + }); + let config = ClientConfig::new(format!("http://{addr}").parse().unwrap()); + let client = HealthClient::new(HttpClient::plaintext(), config); + (client, addr) + } + + #[tokio::test] + async fn check_serving_service() { + let checker = Arc::new(StaticChecker::with_services(["acme.A"])); + let (client, _addr) = spawn_health_server(checker).await; + + let resp = client + .check(HealthCheckRequest { + service: "acme.A".into(), + ..Default::default() + }) + .await + .unwrap(); + assert_eq!(resp.view().status, ServingStatus::SERVING); + } + + #[tokio::test] + async fn check_empty_service_returns_serving() { + let checker = Arc::new(StaticChecker::new()); + let (client, _addr) = spawn_health_server(checker).await; + + let resp = client.check(HealthCheckRequest::default()).await.unwrap(); + assert_eq!(resp.view().status, ServingStatus::SERVING); + } + + #[tokio::test] + async fn check_unknown_service_returns_not_found() { + let checker = Arc::new(StaticChecker::new()); + let (client, _addr) = spawn_health_server(checker).await; + + let err = client + .check(HealthCheckRequest { + service: "acme.NoSuch".into(), + ..Default::default() + }) + .await + .unwrap_err(); + assert_eq!(err.code, connectrpc::ErrorCode::NotFound); + } + + #[tokio::test] + async fn check_reflects_not_serving_after_update() { + let checker = Arc::new(StaticChecker::with_services(["acme.A"])); + let (client, _addr) = spawn_health_server(Arc::clone(&checker)).await; + + checker.set_status("acme.A", Status::NotServing).unwrap(); + + let resp = client + .check(HealthCheckRequest { + service: "acme.A".into(), + ..Default::default() + }) + .await + .unwrap(); + assert_eq!(resp.view().status, ServingStatus::NOT_SERVING); + } + + /// End-to-end companion to `dropping_watch_stream_releases_subscriber` + /// in `static_checker`: when a client drops its handle to the Watch + /// RPC mid-stream, the server's spawned response future is cancelled, + /// the response body's stream chain (boxed `Map` → `StatusStream` → + /// `WatchStream`) is dropped, and the underlying + /// [`tokio::sync::watch::Receiver`] is released back to the + /// [`watch::Sender`]'s receiver count. The unit test pins the + /// `WatchStream`→`Receiver` rung in isolation; this test ties the + /// chain together through axum + hyper so a regression in any + /// intermediate layer (axum's response cancellation, connectrpc's + /// body driver, the codegen's stream wrapper) surfaces here. + #[tokio::test] + async fn client_disconnect_releases_server_side_subscriber() { + let checker = Arc::new(StaticChecker::with_services(["acme.A"])); + let (client, _addr) = spawn_health_server(Arc::clone(&checker)).await; + + let before = checker + .receiver_count_for("acme.A") + .expect("registered service must have a Sender"); + + // Open Watch and pull the initial frame. The pull is + // load-bearing: `client.watch(...).await` returns once the + // request headers are written, but the server's `subscribe()` + // call happens inside the spawned handler future. Without a + // round-trip-forcing read, the next `receiver_count_for` read + // could observe `before` (handler not yet polled to subscribe) + // and the +1 assertion would race. + let mut stream = client + .watch(HealthCheckRequest { + service: "acme.A".into(), + ..Default::default() + }) + .await + .unwrap(); + let first = stream + .message() + .await + .unwrap() + .expect("expected initial Watch message"); + assert_eq!(first.status, ServingStatus::SERVING); + + // Server-side receiver count must be elevated by exactly 1. + assert_eq!( + checker.receiver_count_for("acme.A"), + Some(before + 1), + "server-side Receiver must be live while the client holds \ + the stream open" + ); + + // Simulate client disconnect. `drop(stream)` is the load-bearing + // step: it closes the HTTP/2 response body's receive half, which + // h2 surfaces to the server's body-driver task as cancellation. + // `drop(client)` is belt-and-braces — the HttpClient owns the + // connection pool but not this RPC's lifetime; for an already + // cancelled stream it's a no-op. + drop(stream); + drop(client); + + // Cancellation propagation through axum+hyper is asynchronous; + // poll with a tight tick and a generous overall budget. Two + // seconds is well above the observed propagation latency + // (sub-100ms locally) and tolerates slow CI. + let deadline = std::time::Instant::now() + Duration::from_secs(2); + loop { + let count = checker + .receiver_count_for("acme.A") + .expect("entry still registered post-disconnect"); + if count == before { + break; + } + if std::time::Instant::now() >= deadline { + panic!( + "server-side Receiver was not released within 2s of \ + client disconnect — expected {before}, still {count}" + ); + } + tokio::time::sleep(Duration::from_millis(20)).await; + } + } + + #[tokio::test] + async fn watch_streams_initial_then_changes() { + let checker = Arc::new(StaticChecker::with_services(["acme.A"])); + let (client, _addr) = spawn_health_server(Arc::clone(&checker)).await; + + let mut stream = client + .watch(HealthCheckRequest { + service: "acme.A".into(), + ..Default::default() + }) + .await + .unwrap(); + + // First message is the current state. + let initial = stream + .message() + .await + .unwrap() + .expect("expected initial Watch message"); + assert_eq!(initial.status, ServingStatus::SERVING); + + // Update fires a follow-up message. + checker.set_status("acme.A", Status::NotServing).unwrap(); + let after = tokio::time::timeout(Duration::from_secs(2), stream.message()) + .await + .expect("watch did not deliver update within timeout") + .unwrap() + .expect("expected follow-up Watch message"); + assert_eq!(after.status, ServingStatus::NOT_SERVING); + } + + #[tokio::test] + async fn checker_accessor_returns_shared_arc() { + let svc = HealthService::new(StaticChecker::with_services(["acme.A"])); + // Mutating through the accessor must be visible to the service. + svc.checker() + .set_status("acme.A", Status::NotServing) + .unwrap(); + let (client, _addr) = spawn_health_server(svc.checker()).await; + let resp = client + .check(HealthCheckRequest { + service: "acme.A".into(), + ..Default::default() + }) + .await + .unwrap(); + assert_eq!(resp.view().status, ServingStatus::NOT_SERVING); + } + + #[tokio::test] + async fn install_static_mounts_and_returns_mutable_checker() { + use crate::install_static; + let (router, health) = install_static(Router::new(), ["acme.A"]); + let app = router.into_axum_router(); + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let addr = listener.local_addr().unwrap(); + tokio::spawn(async move { + axum::serve(listener, app).await.unwrap(); + }); + let config = ClientConfig::new(format!("http://{addr}").parse().unwrap()); + let client = HealthClient::new(HttpClient::plaintext(), config); + + // Default: Serving. + let resp = client + .check(HealthCheckRequest { + service: "acme.A".into(), + ..Default::default() + }) + .await + .unwrap(); + assert_eq!(resp.view().status, ServingStatus::SERVING); + + // Flip via the returned handle. + health.set_status("acme.A", Status::NotServing).unwrap(); + let resp = client + .check(HealthCheckRequest { + service: "acme.A".into(), + ..Default::default() + }) + .await + .unwrap(); + assert_eq!(resp.view().status, ServingStatus::NOT_SERVING); + } + + #[tokio::test] + async fn watch_unimplemented_when_checker_does_not_support_it() { + struct CheckOnly; + impl Checker for CheckOnly { + async fn check(&self, _service: &str) -> Result { + Ok(Status::Serving) + } + // No watch override → default returns Unimplemented. + } + let svc = Arc::new(HealthService::new(CheckOnly)); + let router = svc.register(Router::new()); + let app = router.into_axum_router(); + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let addr = listener.local_addr().unwrap(); + tokio::spawn(async move { + axum::serve(listener, app).await.unwrap(); + }); + let config = ClientConfig::new(format!("http://{addr}").parse().unwrap()); + let client: HealthClient = HealthClient::new(HttpClient::plaintext(), config); + + let mut stream = client.watch(HealthCheckRequest::default()).await.unwrap(); + // Server-streaming RPCs surface errors via the trailers — `message()` + // returns `Ok(None)` and the error lands on `stream.error()`. + assert!(stream.message().await.unwrap().is_none()); + let err = stream.error().expect("expected Unimplemented error"); + assert_eq!(err.code, connectrpc::ErrorCode::Unimplemented); + } +} diff --git a/connectrpc-health/src/static_checker.rs b/connectrpc-health/src/static_checker.rs new file mode 100644 index 0000000..1e147bf --- /dev/null +++ b/connectrpc-health/src/static_checker.rs @@ -0,0 +1,726 @@ +//! A shared-state [`Checker`] suitable for most servers. + +use std::collections::HashMap; +use std::collections::hash_map::Entry; +use std::sync::Mutex; + +use connectrpc::ConnectError; +use tokio::sync::watch; + +use crate::checker::StatusStream; +use crate::{Checker, Status}; + +/// In-memory checker backed by a `HashMap`. +/// +/// `Send + Sync`; clone the `Arc` you wrap it in to share across tasks. +/// +/// # Registration is explicit +/// +/// Every name a probe might ask about must be registered first — either +/// up front via [`with_services`](Self::with_services) (the common case; +/// pass the generated `*_SERVICE_NAME` constants) or at runtime via +/// [`register`](Self::register). [`set_status`](Self::set_status) refuses +/// unknown names and returns [`UnknownServiceError`], so a typo'd name +/// surfaces at the call site instead of silently shadowing the real +/// entry. This matches connect-go's `grpchealth.StaticChecker.SetStatus`. +/// +/// # Empty service name +/// +/// The empty string represents the whole-process status. It is always +/// pre-registered with [`Status::Serving`], so `check("")` and +/// `watch("")` behave like any other registered service — and +/// [`shutdown`](Self::shutdown) flips it alongside the user-registered +/// services. Unregistered non-empty services return `NotFound` from +/// both `check` and `watch`. +pub struct StaticChecker { + services: Mutex>>, +} + +/// Returned by [`StaticChecker::set_status`] when called with a name +/// that hasn't been registered. Register first via +/// [`StaticChecker::register`] (or up front via +/// [`StaticChecker::with_services`]) to avoid this. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct UnknownServiceError { + service: String, +} + +impl UnknownServiceError { + /// The unknown service name that triggered the error. + #[must_use] + pub fn service(&self) -> &str { + &self.service + } +} + +impl std::fmt::Display for UnknownServiceError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + // Quote with `{:?}` so the empty whole-process name and any + // whitespace-only typos render visibly in log lines. + write!(f, "unknown service {:?}", self.service) + } +} + +impl std::error::Error for UnknownServiceError {} + +impl StaticChecker { + /// Create a checker with only the whole-process entry (`""`) seeded + /// at [`Status::Serving`]. + #[must_use] + pub fn new() -> Self { + Self::with_services(std::iter::empty::<&str>()) + } + + /// Create a checker pre-populated with the given services and the + /// whole-process entry (`""`), each reporting [`Status::Serving`]. + /// Pass the generated `*_SERVICE_NAME` constant to avoid typos. + /// + /// The synthetic `""` entry is chained *before* the user entries, + /// so a user-supplied `""` (or any duplicate key) wins the last-write + /// when the iterator is collected into the underlying `HashMap`. + /// Duplicate non-empty names also last-write-win — pass each name + /// once. + #[must_use] + pub fn with_services(services: I) -> Self + where + I: IntoIterator, + S: Into, + { + // Empty entry first → user entries chained after → HashMap + // collect is last-write-wins → a user-supplied `""` overrides + // the synthetic one. + let map = std::iter::once((String::new(), watch::channel(Status::Serving).0)) + .chain( + services + .into_iter() + .map(|s| (s.into(), watch::channel(Status::Serving).0)), + ) + .collect(); + Self { + services: Mutex::new(map), + } + } + + /// Register `service` with an initial [`Status::Serving`]. Returns + /// `true` if the service was newly registered, `false` if it was + /// already known (existing status and subscribers preserved). + /// + /// Takes `impl Into` (consumes / allocates) because the + /// name becomes a map key on the `Vacant` branch. The companion + /// [`set_status`](Self::set_status) takes `impl AsRef` instead + /// — it only borrows for the lookup, no insertion path. + /// + /// `register("")` is a no-op in the common case (the whole-process + /// entry is seeded at construction by [`new`](Self::new) / + /// [`with_services`](Self::with_services)); it returns `true` only + /// after [`remove_service("")`](Self::remove_service), where it + /// installs a fresh `Sender` (in-flight `Watch` subscribers were + /// already terminated by the `remove_service` call that dropped + /// the prior `Sender`). + #[must_use = "the bool indicates whether the name was newly registered \ + (true) or already present (false); ignore with `let _ = …` \ + if you don't care which"] + pub fn register(&self, service: impl Into) -> bool { + self.register_with_status(service, Status::Serving) + } + + /// Register `service` with the given initial `status`. Returns + /// `true` if newly registered, `false` if already known (status + /// preserved — use [`set_status`](Self::set_status) to update an + /// existing entry). Use this to bring a service up as + /// [`Status::NotServing`] while you wait for its dependencies. + #[must_use = "the bool indicates whether the name was newly registered \ + (true) or already present (false); ignore with `let _ = …` \ + if you don't care which"] + pub fn register_with_status(&self, service: impl Into, status: Status) -> bool { + let mut services = self.lock(); + match services.entry(service.into()) { + Entry::Vacant(slot) => { + slot.insert(watch::channel(status).0); + true + } + Entry::Occupied(_) => false, + } + } + + /// Update the status of a previously [`register`](Self::register)ed + /// service. Existing `Watch` subscribers are notified; no-op + /// transitions are suppressed. + /// + /// Takes `impl AsRef` (borrowing, no allocation) because — unlike + /// [`register`](Self::register) — there's no insertion: a `String`, + /// `&String`, or `&str` all work without cloning. + /// + /// # Errors + /// + /// Returns [`UnknownServiceError`] if `service` was never registered. + /// This is the strict, typo-catching variant — call + /// [`register`](Self::register) (or + /// [`with_services`](Self::with_services) up front) before flipping + /// status. + pub fn set_status( + &self, + service: impl AsRef, + status: Status, + ) -> Result<(), UnknownServiceError> { + let service = service.as_ref(); + let services = self.lock(); + let Some(sender) = services.get(service) else { + return Err(UnknownServiceError { + service: service.to_string(), + }); + }; + sender.send_if_modified(|current| { + if *current == status { + false + } else { + *current = status; + true + } + }); + Ok(()) + } + + /// Remove `service` from the registry. Returns `true` if it was + /// present. Active `Watch` streams complete when the underlying + /// [`watch::Sender`] is dropped; subsequent `Check`/`Watch` calls + /// for the name return `NotFound`. The whole-process entry (`""`) + /// is removable like any other — call only if you really mean it. + pub fn remove_service(&self, service: &str) -> bool { + self.lock().remove(service).is_some() + } + + /// Mark every registered service [`Status::NotServing`], including + /// the whole-process `""` entry. Call this from your shutdown handler + /// before draining traffic. Services registered after `shutdown` are + /// unaffected. + pub fn shutdown(&self) { + let services = self.lock(); + for sender in services.values() { + sender.send_if_modified(|status| { + if *status == Status::NotServing { + false + } else { + *status = Status::NotServing; + true + } + }); + } + } + + /// Snapshot of every registered service name. Includes the + /// whole-process entry `""` unless a caller explicitly removed it + /// via [`remove_service("")`](Self::remove_service); covers names + /// supplied to [`with_services`](Self::with_services), + /// [`register`](Self::register), and + /// [`register_with_status`](Self::register_with_status), minus + /// anything removed since. + #[must_use] + pub fn services(&self) -> Vec { + self.lock().keys().cloned().collect() + } + + // Poison recovery: the wrapped state is `Status` + `watch::Sender`, + // both safe to observe after a panic, so we keep going instead of + // turning the next handler call into a second panic. + fn lock(&self) -> std::sync::MutexGuard<'_, HashMap>> { + self.services + .lock() + .unwrap_or_else(std::sync::PoisonError::into_inner) + } + + /// Live `watch::Receiver` count for `service`, or `None` if not + /// registered. Used by the in-crate end-to-end Watch-disconnect + /// tests to observe server-side subscriber cleanup. + /// + /// Monotonic-per-subscriber: the count can only rise via an + /// explicit `Sender::subscribe()` call and can only fall when a + /// `Receiver` is dropped, so once a Watch RPC's server-side + /// `Receiver` is live the count cannot transiently dip below the + /// subscribed baseline until that `Receiver` actually drops. Safe + /// to poll from a test without `Acquire` fences beyond the + /// `Mutex`-guarded map access. + #[cfg(test)] + pub(crate) fn receiver_count_for(&self, service: &str) -> Option { + self.lock().get(service).map(watch::Sender::receiver_count) + } +} + +impl Default for StaticChecker { + fn default() -> Self { + Self::new() + } +} + +impl Checker for StaticChecker { + async fn check(&self, service: &str) -> Result { + // Block-scope the guard: forces `Send`-incompatible state to drop + // before any future code path could grow an `.await`. + let snapshot = { + let services = self.lock(); + services.get(service).map(|sender| *sender.borrow()) + }; + snapshot.ok_or_else(|| ConnectError::not_found(format!("unknown service {service}"))) + } + + async fn watch(&self, service: &str) -> Result { + let receiver = { + let services = self.lock(); + services.get(service).map(watch::Sender::subscribe) + }; + receiver + .map(StatusStream::from_watch) + .ok_or_else(|| ConnectError::not_found(format!("unknown service {service}"))) + } +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use futures::StreamExt; + + use super::*; + + #[tokio::test] + async fn check_unknown_service_returns_not_found() { + let checker = StaticChecker::new(); + let err = checker.check("acme.NoSuch").await.unwrap_err(); + assert_eq!(err.code, connectrpc::ErrorCode::NotFound); + } + + #[tokio::test] + async fn check_empty_service_defaults_to_serving() { + let checker = StaticChecker::new(); + assert_eq!(checker.check("").await.unwrap(), Status::Serving); + } + + #[tokio::test] + async fn with_services_seeds_serving() { + let checker = StaticChecker::with_services(["acme.A", "acme.B"]); + assert_eq!(checker.check("acme.A").await.unwrap(), Status::Serving); + assert_eq!(checker.check("acme.B").await.unwrap(), Status::Serving); + } + + /// `with_services` doesn't panic or deadlock when the user list + /// contains duplicates (including a duplicate `""`); the entries + /// collapse last-write-wins under `HashMap::collect`. The surviving + /// Sender must be live — a `watch`/`set_status` round-trip exercises + /// the dedup'd entry to catch any "orphaned Sender" regression. + #[tokio::test] + async fn with_services_duplicates_collapse_cleanly() { + let checker = StaticChecker::with_services(["foo", "foo", ""]); + let mut names = checker.services(); + names.sort(); + assert_eq!(names, vec!["", "foo"]); + + // Exercise both surviving entries end-to-end. + for name in ["foo", ""] { + let mut stream = checker.watch(name).await.unwrap(); + assert_eq!(stream.next().await.unwrap(), Status::Serving); + checker.set_status(name, Status::NotServing).unwrap(); + let next = tokio::time::timeout(std::time::Duration::from_secs(1), stream.next()) + .await + .expect("subscriber must receive update after dedup") + .unwrap(); + assert_eq!(next, Status::NotServing); + // Reset for the next iteration. + checker.set_status(name, Status::Serving).unwrap(); + } + } + + #[tokio::test] + async fn set_status_unknown_returns_error() { + let checker = StaticChecker::new(); + let err = checker + .set_status("acme.A", Status::NotServing) + .unwrap_err(); + assert_eq!(err.service(), "acme.A"); + } + + #[tokio::test] + async fn register_then_set_status_works() { + let checker = StaticChecker::new(); + assert!(checker.register("acme.A")); + checker.set_status("acme.A", Status::NotServing).unwrap(); + assert_eq!(checker.check("acme.A").await.unwrap(), Status::NotServing); + } + + #[tokio::test] + async fn register_is_idempotent_and_preserves_status() { + let checker = StaticChecker::with_services(["acme.A"]); + checker.set_status("acme.A", Status::NotServing).unwrap(); + // Re-registering an existing name must not reset its status. + assert!(!checker.register("acme.A")); + assert_eq!(checker.check("acme.A").await.unwrap(), Status::NotServing); + } + + #[tokio::test] + async fn register_with_status_seeds_initial_value() { + let checker = StaticChecker::new(); + assert!(checker.register_with_status("acme.A", Status::NotServing)); + assert_eq!(checker.check("acme.A").await.unwrap(), Status::NotServing); + } + + /// Mirror of `register_is_idempotent_and_preserves_status` for the + /// `register_with_status` variant: when the entry is already + /// registered, the call must return `false`, preserve the existing + /// status, AND keep the same `watch::Sender` so in-flight + /// subscribers stay connected. Pins the Occupied branch against + /// refactors that either overwrite the status or swap the sender + /// while still returning `false`. + #[tokio::test] + async fn register_with_status_is_idempotent_and_preserves_status() { + let checker = StaticChecker::with_services(["acme.A"]); + checker.set_status("acme.A", Status::NotServing).unwrap(); + + // Subscribe BEFORE the redundant register. A regression that + // swaps the underlying `Sender` (e.g. `slot.insert(channel(s).0)` + // returning `false`) would orphan this subscriber — visible + // here as a missed update on the post-register `set_status`. + let mut stream = checker.watch("acme.A").await.unwrap(); + assert_eq!(stream.next().await.unwrap(), Status::NotServing); + + // Re-register with a status that conflicts with the live one; + // existing status must win, the call must return `false`. + assert!(!checker.register_with_status("acme.A", Status::Serving)); + assert_eq!(checker.check("acme.A").await.unwrap(), Status::NotServing); + + // Now flip the status through the SAME entry. If the prior + // register call swapped the Sender out from under our + // subscriber, this update never arrives. + checker.set_status("acme.A", Status::Serving).unwrap(); + let next = tokio::time::timeout(std::time::Duration::from_secs(1), stream.next()) + .await + .expect("subscriber must survive register_with_status no-op") + .unwrap(); + assert_eq!(next, Status::Serving); + } + + #[tokio::test] + async fn remove_service_returns_true_and_makes_subsequent_checks_not_found() { + let checker = StaticChecker::with_services(["acme.A"]); + assert!(checker.remove_service("acme.A")); + let err = checker.check("acme.A").await.unwrap_err(); + assert_eq!(err.code, connectrpc::ErrorCode::NotFound); + } + + #[tokio::test] + async fn remove_service_returns_false_for_unknown() { + let checker = StaticChecker::new(); + assert!(!checker.remove_service("acme.NoSuch")); + } + + #[tokio::test] + async fn shutdown_marks_all_not_serving() { + let checker = StaticChecker::with_services(["acme.A", "acme.B"]); + checker.shutdown(); + assert_eq!(checker.check("acme.A").await.unwrap(), Status::NotServing); + assert_eq!(checker.check("acme.B").await.unwrap(), Status::NotServing); + } + + #[tokio::test] + async fn shutdown_leaves_post_registered_services_alone() { + let checker = StaticChecker::with_services(["acme.A"]); + checker.shutdown(); + // Registering after shutdown is the documented escape hatch — the + // new service must come up Serving, not NotServing. + assert!(checker.register("acme.B")); + assert_eq!(checker.check("acme.B").await.unwrap(), Status::Serving); + } + + #[tokio::test] + async fn shutdown_is_noop_for_already_not_serving() { + let checker = StaticChecker::with_services(["acme.A"]); + checker.set_status("acme.A", Status::NotServing).unwrap(); + let mut stream = checker.watch("acme.A").await.unwrap(); + assert_eq!(stream.next().await.unwrap(), Status::NotServing); + + // Already NotServing → shutdown must not emit a notification. + checker.shutdown(); + tokio::select! { + item = stream.next() => panic!("unexpected notification on no-op shutdown: {item:?}"), + () = tokio::time::sleep(std::time::Duration::from_millis(50)) => {} + } + } + + #[tokio::test] + async fn watch_streams_initial_and_changes() { + let checker = StaticChecker::with_services(["acme.A"]); + let mut stream = checker.watch("acme.A").await.unwrap(); + + // Initial value is the current state. + assert_eq!(stream.next().await.unwrap(), Status::Serving); + + // Update fires the subscriber. + checker.set_status("acme.A", Status::NotServing).unwrap(); + let next = tokio::time::timeout(std::time::Duration::from_secs(1), stream.next()) + .await + .expect("watch did not deliver update within timeout") + .unwrap(); + assert_eq!(next, Status::NotServing); + } + + #[tokio::test] + async fn watch_unknown_service_returns_not_found() { + let checker = StaticChecker::new(); + let err = checker.watch("acme.NoSuch").await.unwrap_err(); + assert_eq!(err.code, connectrpc::ErrorCode::NotFound); + } + + #[tokio::test] + async fn watch_empty_service_subscribes() { + let checker = StaticChecker::new(); + let mut stream = checker.watch("").await.unwrap(); + assert_eq!(stream.next().await.unwrap(), Status::Serving); + } + + // Both subscribers see updates from the same Sender — a regression + // in `register` that swapped the entry's Sender out from under + // existing subscribers would break this silently. + #[tokio::test] + async fn concurrent_watchers_of_registered_service_share_a_sender() { + let checker = Arc::new(StaticChecker::with_services(["acme.A"])); + let mut a = checker.watch("acme.A").await.unwrap(); + let mut b = checker.watch("acme.A").await.unwrap(); + + assert_eq!(a.next().await.unwrap(), Status::Serving); + assert_eq!(b.next().await.unwrap(), Status::Serving); + + checker.set_status("acme.A", Status::NotServing).unwrap(); + let a_next = tokio::time::timeout(std::time::Duration::from_secs(1), a.next()) + .await + .expect("subscriber A did not receive update") + .unwrap(); + let b_next = tokio::time::timeout(std::time::Duration::from_secs(1), b.next()) + .await + .expect("subscriber B did not receive update") + .unwrap(); + assert_eq!(a_next, Status::NotServing); + assert_eq!(b_next, Status::NotServing); + } + + // Regression test: earlier code inserted a fresh Sender on every + // watch("") call, orphaning prior subscribers. + #[tokio::test] + async fn concurrent_watchers_of_empty_service_share_a_sender() { + let checker = Arc::new(StaticChecker::new()); + let mut a = checker.watch("").await.unwrap(); + let mut b = checker.watch("").await.unwrap(); + + // Both see the initial value. + assert_eq!(a.next().await.unwrap(), Status::Serving); + assert_eq!(b.next().await.unwrap(), Status::Serving); + + // A single update must reach both subscribers. + checker.set_status("", Status::NotServing).unwrap(); + let a_next = tokio::time::timeout(std::time::Duration::from_secs(1), a.next()) + .await + .expect("subscriber A did not receive update") + .unwrap(); + let b_next = tokio::time::timeout(std::time::Duration::from_secs(1), b.next()) + .await + .expect("subscriber B did not receive update") + .unwrap(); + assert_eq!(a_next, Status::NotServing); + assert_eq!(b_next, Status::NotServing); + } + + #[tokio::test] + async fn set_same_status_does_not_notify() { + let checker = StaticChecker::with_services(["acme.A"]); + let mut stream = checker.watch("acme.A").await.unwrap(); + assert_eq!(stream.next().await.unwrap(), Status::Serving); + + // No change → no notification. + checker.set_status("acme.A", Status::Serving).unwrap(); + tokio::select! { + item = stream.next() => panic!("unexpected notification on no-op set_status: {item:?}"), + () = tokio::time::sleep(std::time::Duration::from_millis(50)) => {} + } + } + + #[tokio::test] + async fn services_lists_every_registered_name() { + let checker = StaticChecker::with_services(["acme.A", "acme.B"]); + assert!(checker.register("acme.C")); + checker.set_status("acme.C", Status::NotServing).unwrap(); + + let mut names = checker.services(); + names.sort(); + // The whole-process "" entry is always present. + assert_eq!(names, vec!["", "acme.A", "acme.B", "acme.C"]); + } + + #[tokio::test] + async fn shutdown_flips_whole_process_entry() { + let checker = StaticChecker::new(); + assert_eq!(checker.check("").await.unwrap(), Status::Serving); + checker.shutdown(); + assert_eq!(checker.check("").await.unwrap(), Status::NotServing); + } + + // Cancelling a Watch RPC must release the underlying watch::Receiver + // so the Sender stops holding state for a dead subscriber. + #[tokio::test] + async fn dropping_watch_stream_releases_subscriber() { + let checker = StaticChecker::with_services(["acme.A"]); + + let receiver_count_before = { + let services = checker.services.lock().unwrap(); + services.get("acme.A").unwrap().receiver_count() + }; + + let stream = checker.watch("acme.A").await.unwrap(); + let receiver_count_during = { + let services = checker.services.lock().unwrap(); + services.get("acme.A").unwrap().receiver_count() + }; + assert_eq!(receiver_count_during, receiver_count_before + 1); + + drop(stream); + // `WatchStream` drops its `Receiver` synchronously, so the + // `Sender` observes the decrement immediately. + let receiver_count_after = { + let services = checker.services.lock().unwrap(); + services.get("acme.A").unwrap().receiver_count() + }; + assert_eq!(receiver_count_after, receiver_count_before); + } + + #[tokio::test] + async fn concurrent_set_status_does_not_panic() { + let checker = Arc::new(StaticChecker::with_services(["acme.race"])); + let mut handles = Vec::new(); + for i in 0..50 { + let c = Arc::clone(&checker); + handles.push(tokio::spawn(async move { + let status = if i % 2 == 0 { + Status::Serving + } else { + Status::NotServing + }; + c.set_status("acme.race", status).unwrap(); + })); + } + for h in handles { + h.await.unwrap(); + } + let final_status = checker.check("acme.race").await.unwrap(); + assert!(matches!(final_status, Status::Serving | Status::NotServing)); + } + + /// N tasks race to register the same name. Exactly one must observe + /// `Vacant` and return `true`; the rest see `Occupied` and return + /// `false`. A subscriber wrapped around the first sender must still + /// receive updates after the race — i.e. no register call orphaned + /// the live `Sender` by replacing it. + /// + /// Uses `flavor = "multi_thread"` so the 50 spawned tasks actually + /// contend for the `Mutex`; the default `current_thread` runtime + /// would serialize them and the test name would be misleading. + #[tokio::test(flavor = "multi_thread", worker_threads = 4)] + async fn concurrent_register_only_one_wins_and_subscriber_survives() { + let checker = Arc::new(StaticChecker::new()); + // Use a fresh contended name so exactly one register call sees + // `Vacant` and returns `true`; the rest see `Occupied` and + // return `false`. After the race, subscribe and verify the + // surviving `Sender` propagates updates. + let mut handles = Vec::new(); + for _ in 0..50 { + let c = Arc::clone(&checker); + handles.push(tokio::spawn(async move { c.register("acme.race") })); + } + let mut winners = 0; + for h in handles { + if h.await.unwrap() { + winners += 1; + } + } + assert_eq!( + winners, 1, + "exactly one register call must succeed under contention" + ); + + // Subscribe AFTER the race, then update — the live Sender is the + // one the winner installed; this subscriber must receive it. + let mut stream = checker.watch("acme.race").await.unwrap(); + assert_eq!(stream.next().await.unwrap(), Status::Serving); + checker.set_status("acme.race", Status::NotServing).unwrap(); + let next = tokio::time::timeout(std::time::Duration::from_secs(1), stream.next()) + .await + .expect("subscriber did not receive update") + .unwrap(); + assert_eq!(next, Status::NotServing); + } + + /// `register("")` is normally a no-op (the whole-process entry is + /// seeded at construction), but after `remove_service("")` the slot + /// is `Vacant` again — registering re-inserts and returns `true`. + /// The doc comment on `register` calls this case out explicitly; + /// this test pins the behavior. + #[tokio::test] + async fn register_after_remove_inserts_fresh() { + let checker = StaticChecker::new(); + assert!(!checker.register(""), "default `\"\"` entry is seeded"); + assert!(checker.remove_service(""), "remove must succeed"); + assert!( + checker.register(""), + "register after remove must re-insert and return true" + ); + assert_eq!(checker.check("").await.unwrap(), Status::Serving); + } + + /// `set_status` takes `impl AsRef`. Verify the three common + /// call shapes all type-check without coaxing: `&str`, `String`, + /// and `&String`. A regression that tightened the bound to `&str` + /// would break two of these. + #[tokio::test] + async fn set_status_accepts_str_string_and_borrowed_string() { + let checker = StaticChecker::with_services(["acme.A"]); + + // &str + checker.set_status("acme.A", Status::NotServing).unwrap(); + assert_eq!(checker.check("acme.A").await.unwrap(), Status::NotServing); + + // String (owned, by value) + let owned: String = "acme.A".to_string(); + checker.set_status(owned, Status::Serving).unwrap(); + assert_eq!(checker.check("acme.A").await.unwrap(), Status::Serving); + + // &String — should coerce via AsRef without an explicit `.as_str()` + let s: String = "acme.A".to_string(); + checker.set_status(&s, Status::NotServing).unwrap(); + assert_eq!(checker.check("acme.A").await.unwrap(), Status::NotServing); + } + + /// `UnknownServiceError` carries the offending name verbatim, + /// including empty / whitespace-only / non-ASCII forms. Verify the + /// public accessors return them faithfully and `Display` keeps them + /// visible (via `{:?}` debug-quoting). + #[test] + fn unknown_service_error_preserves_name() { + let err = UnknownServiceError { + service: String::new(), + }; + assert_eq!(err.service(), ""); + let s = err.to_string(); + assert!( + s.contains("\"\""), + "empty name must render as `\"\"` so logs distinguish \ + it from a missing field: got {s:?}" + ); + + let err = UnknownServiceError { + service: "acme.❄.frozen".into(), + }; + assert_eq!(err.service(), "acme.❄.frozen"); + // Debug-quoting escapes non-ASCII to \u{...}. The intent is + // documented on the Display impl; this test pins it so a + // change to plain quoting surfaces here first. + assert!( + err.to_string().contains("acme."), + "Display must include the prefix verbatim: {err}" + ); + } +} diff --git a/connectrpc-health/src/status.rs b/connectrpc-health/src/status.rs new file mode 100644 index 0000000..a4836c3 --- /dev/null +++ b/connectrpc-health/src/status.rs @@ -0,0 +1,135 @@ +//! The `Status` enum returned by checkers and streamed by watchers. +//! +//! `Status` is the Rust-side type for [`Checker`](crate::Checker) impls and +//! [`StaticChecker`](crate::StaticChecker); the server maps it to +//! [`wire::ServingStatus`](crate::wire::ServingStatus) automatically. Reach +//! for the wire enum only when decoding a raw `HealthCheckResponse` off the +//! network (e.g. in a probe loop). `SERVICE_UNKNOWN` is intentionally not +//! represented. Unknown services surface as `NotFound` instead. + +use crate::proto::grpc::health::v1::health_check_response::ServingStatus; + +/// Health status of a single service or of the whole server. +/// +/// `Status::default()` is [`Status::Unknown`] (the proto wire default), not +/// [`Status::Serving`] — [`StaticChecker::with_services`] seeds new entries +/// with `Serving` because that's almost always what registering a service +/// means in practice. +/// +/// [`StaticChecker::with_services`]: crate::StaticChecker::with_services +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default)] +pub enum Status { + /// The implementation has not (yet) determined whether the service + /// is healthy. + #[default] + Unknown, + /// The service is ready to accept requests. + Serving, + /// The process is up but the service is intentionally not accepting + /// requests (e.g. a dependency is down, or the service is draining + /// in preparation for shutdown). + NotServing, +} + +impl Status { + /// Lowercase string representation, useful for logging. + #[must_use] + pub fn as_str(self) -> &'static str { + match self { + Self::Unknown => "unknown", + Self::Serving => "serving", + Self::NotServing => "not_serving", + } + } +} + +impl std::fmt::Display for Status { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.write_str(self.as_str()) + } +} + +impl From for ServingStatus { + fn from(value: Status) -> Self { + match value { + Status::Unknown => ServingStatus::UNKNOWN, + Status::Serving => ServingStatus::SERVING, + Status::NotServing => ServingStatus::NOT_SERVING, + } + } +} + +impl From for Status { + /// Map the wire enum back to [`Status`]. Useful when decoding a raw + /// `HealthCheckResponse` off the network — e.g. a probe loop holding + /// the response payload before re-encoding it. + /// + /// `SERVICE_UNKNOWN` maps to [`Status::Unknown`]: this crate + /// surfaces unknown services as a transport-layer `NotFound` + /// rather than as a wire status, so the round-trip is + /// information-preserving for the three statuses [`Status`] + /// models. The match is exhaustive — if the generated + /// `grpc.health.v1` proto ever grows a fifth variant, this site + /// fails to compile and forces an explicit mapping decision. + fn from(value: ServingStatus) -> Self { + match value { + ServingStatus::SERVING => Status::Serving, + ServingStatus::NOT_SERVING => Status::NotServing, + ServingStatus::UNKNOWN | ServingStatus::SERVICE_UNKNOWN => Status::Unknown, + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn default_is_unknown() { + assert_eq!(Status::default(), Status::Unknown); + } + + #[test] + fn as_str_lowercase() { + assert_eq!(Status::Unknown.as_str(), "unknown"); + assert_eq!(Status::Serving.as_str(), "serving"); + assert_eq!(Status::NotServing.as_str(), "not_serving"); + } + + #[test] + fn maps_to_serving_status() { + assert_eq!(ServingStatus::from(Status::Unknown), ServingStatus::UNKNOWN); + assert_eq!(ServingStatus::from(Status::Serving), ServingStatus::SERVING); + assert_eq!( + ServingStatus::from(Status::NotServing), + ServingStatus::NOT_SERVING, + ); + } + + #[test] + fn maps_from_serving_status() { + assert_eq!(Status::from(ServingStatus::SERVING), Status::Serving); + assert_eq!(Status::from(ServingStatus::NOT_SERVING), Status::NotServing); + assert_eq!(Status::from(ServingStatus::UNKNOWN), Status::Unknown); + // SERVICE_UNKNOWN collapses to Unknown, not NotServing — the + // wire signal is "we couldn't tell you", not "we're definitely + // down". The match is exhaustive at the impl site, so a future + // proto variant would force an explicit mapping decision there. + assert_eq!( + Status::from(ServingStatus::SERVICE_UNKNOWN), + Status::Unknown + ); + } + + #[test] + fn round_trip_through_serving_status() { + for status in [Status::Unknown, Status::Serving, Status::NotServing] { + let wire = ServingStatus::from(status); + assert_eq!( + Status::from(wire), + status, + "round trip failed for {status:?}" + ); + } + } +} diff --git a/docs/guide.md b/docs/guide.md index a9b55a5..22d8a93 100644 --- a/docs/guide.md +++ b/docs/guide.md @@ -17,6 +17,7 @@ with the [README quick start](../README.md#quick-start) and the - [Tower middleware](#tower-middleware) - [Interceptors](#interceptors) - [Hosting](#hosting) +- [Health checking](#health-checking) - [Clients](#clients) - [Errors and status codes](#errors-and-status-codes) - [Compression](#compression) @@ -31,6 +32,7 @@ with the [README quick start](../README.md#quick-start) and the | `connectrpc` | Tower-based runtime: server dispatcher, client transports, codec, compression | | `protoc-gen-connect-rust` (binary, in `connectrpc-codegen`) | `protoc` plugin that generates service stubs | | `connectrpc-build` | `build.rs` integration that runs the codegen at build time | +| `connectrpc-health` | The standard `grpc.health.v1.Health` service for liveness / readiness probes ([Health checking](#health-checking)) | Add the runtime to your `Cargo.toml`: @@ -977,6 +979,82 @@ mtls-identity example demonstrates `serve_tls` end-to-end with cert-SAN identity extraction and an ACL keyed on it. +## Health checking + +The `connectrpc-health` crate implements the standard +`grpc.health.v1.Health` service. Mount it on your Connect router and +clients like `grpc_health_probe`, kubelet's `grpc:` probe, and gRPC-aware +service meshes (Linkerd, Istio) just work. + +This is the gRPC protocol — different from the plain HTTP `GET /health` +route shown earlier in the [Hosting](#hosting) section. Keep the HTTP +route for `httpGet:` probes; add the gRPC service for `grpc:` probes. + +```toml +[dependencies] +connectrpc = { version = "0.6", features = ["server"] } +connectrpc-health = "0.6" +``` + +```rust,no_run +use connectrpc::Router; +use connectrpc_health::{install_static, Status}; + +// `install_static` registers every name with `Status::Serving`; use the +// generated `*_SERVICE_NAME` constants so the registered name matches +// exactly what clients ask for. The whole-process `""` entry is seeded +// for you, so probes that don't pass a service name also work. +# mod proto { pub mod greet { pub mod v1 { +# pub const GREET_SERVICE_SERVICE_NAME: &str = "greet.v1.GreetService"; +# } } } +let (router, health) = install_static(Router::new(), [ + proto::greet::v1::GREET_SERVICE_SERVICE_NAME, +]); + +// Flip status when something goes wrong. `set_status` errors on an +// unknown name, so typos surface immediately instead of silently +// shadowing the real entry. +health + .set_status(proto::greet::v1::GREET_SERVICE_SERVICE_NAME, Status::NotServing) + .expect("registered above"); + +// At shutdown, drain. `shutdown()` flips every registered service, +// including the empty whole-process entry: +health.shutdown(); +# drop(router); +``` + +For custom logic (e.g. report `NotServing` while a database connection +is down), implement the `Checker` trait directly and wrap it in +`HealthService::new(...)` or `HealthService::from_arc(...)`. The default +`Checker::watch` body returns `Unimplemented`, which is fine for +Check-only probes; override it if your probes call Watch. + +The `HealthClient` (for in-process probes, integration tests, sidecar +tooling) is gated on a `client` Cargo feature that is **on by default**. +Server-only deployments turn it off: + +```toml +[dependencies] +connectrpc = { version = "0.6", features = ["server"] } +connectrpc-health = { version = "0.6", default-features = false } +``` + +That drops `connectrpc/client` (the HTTP/2 transport stack) from the +dependency graph entirely. `use connectrpc_health::HealthClient` then +becomes an unresolved import, but the binary stays lean. + +**Unknown services on `Watch`.** Non-empty unregistered services return +`Err(ConnectError::not_found(_))` from both `Check` and `Watch`; the +empty service auto-subscribes on `Watch` and returns `Serving` on +`Check` by default. The gRPC Health spec additionally describes a +`SERVICE_UNKNOWN` keep-stream-open flow for `Watch` that this crate +does not implement, matching the Go `connectrpc.com/grpchealth` +reference. Every probe that treats any error as a failure — kubelet's +`grpc:` probe, `grpc_health_probe`, Linkerd, Istio — works unchanged. +See `HealthService`'s `# Unknown services` section in the crate docs +for the full context. + ## Production hardening ### Deadline policy