From 87c99cf7cac2c74183c48d5e3183866bcec72ab7 Mon Sep 17 00:00:00 2001 From: Gethyl Kurian Date: Mon, 13 Apr 2026 20:41:32 +1000 Subject: [PATCH 1/9] chore: move otel initialisation from talos_certifier to talos_metrics package chore: consolidating otel initialisation from talos_metrics package --- Cargo.lock | 9 +- .../examples/cohort_banking_with_sdk.rs | 64 +---------- packages/talos_certifier/Cargo.toml | 1 + packages/talos_certifier/src/lib.rs | 1 - .../src/certifier_kafka_pg.rs | 2 +- packages/talos_metrics/Cargo.toml | 11 +- packages/talos_metrics/src/lib.rs | 1 + .../src/opentel/aggregation_selector.rs | 102 ++++++++---------- packages/talos_metrics/src/opentel/mod.rs | 2 +- .../src/otel.rs | 0 10 files changed, 68 insertions(+), 125 deletions(-) rename packages/{talos_certifier => talos_metrics}/src/otel.rs (100%) diff --git a/Cargo.lock b/Cargo.lock index c2762cff..c68a1078 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3266,6 +3266,7 @@ dependencies = [ "serde_json", "strum 0.24.1", "talos_common_utils", + "talos_metrics", "talos_suffix", "thiserror 1.0.69", "time", @@ -3428,13 +3429,17 @@ dependencies = [ "env_logger", "log", "once_cell", - "opentelemetry 0.20.0", + "opentelemetry 0.28.0", + "opentelemetry-otlp", "opentelemetry-stdout 0.1.0", "opentelemetry_api", - "opentelemetry_sdk 0.20.0", + "opentelemetry_sdk 0.28.0", "serde", "serde_json", + "strum 0.24.1", + "thiserror 1.0.69", "time", + "tracing", ] [[package]] diff --git a/examples/cohort_banking_with_sdk/examples/cohort_banking_with_sdk.rs b/examples/cohort_banking_with_sdk/examples/cohort_banking_with_sdk.rs index 7c6ac891..8d8dbba2 100644 --- a/examples/cohort_banking_with_sdk/examples/cohort_banking_with_sdk.rs +++ b/examples/cohort_banking_with_sdk/examples/cohort_banking_with_sdk.rs @@ -10,16 +10,9 @@ use examples_support::load_generator::models::Generator; use examples_support::load_generator::{generator::ControlledRateLoadGenerator, models::StopType}; use opentelemetry_api::global; -use opentelemetry_api::metrics::MetricsError; -use opentelemetry_sdk::metrics::reader::DefaultTemporalitySelector; -use opentelemetry_sdk::metrics::{MeterProvider, PeriodicReader}; -use opentelemetry_sdk::runtime; -use opentelemetry_stdout::MetricsExporterBuilder; use rand::Rng; use rust_decimal::prelude::FromPrimitive; -use talos_metrics::opentel::aggregation_selector::CustomHistogramSelector; -use talos_metrics::opentel::printer::MetricsToStringPrinter; -use talos_metrics::opentel::scaling::ScalingConfig; +use talos_metrics::otel::init_otel_metrics; use talos_rdkafka_utils::kafka_config::KafkaConfig; use tokio::{signal, task::JoinHandle, try_join}; @@ -31,8 +24,6 @@ struct LaunchParams { target_rate: f32, threads: u64, accounts: u64, - scaling_config: HashMap, - metric_print_raw: bool, } /// Connects to database, to kafka certification topic as talos agent and as cohort replicator. @@ -118,23 +109,7 @@ async fn main() -> Result<(), String> { let db_config = DatabaseConfig::from_env(Some("COHORT"))?; - let printer = MetricsToStringPrinter::new(params.threads, params.metric_print_raw, ScalingConfig { ratios: params.scaling_config }); - let (tx_metrics, rx_metrics) = tokio::sync::watch::channel("".to_string()); - let exporter = MetricsExporterBuilder::default() - .with_aggregation_selector(CustomHistogramSelector::new_with_4k_buckets()?) - .with_temporality_selector(DefaultTemporalitySelector::new()) - .with_encoder(move |_writer, data| { - let report = printer.print(&data).map_err(MetricsError::Other)?; - tx_metrics.send(report).map_err(|e| MetricsError::Other(e.to_string()))?; - Ok(()) - }) - .build(); - - let reader = PeriodicReader::builder(exporter, runtime::Tokio).build(); - - let meter_provider = MeterProvider::builder().with_reader(reader).build(); - let meter_provider_copy = meter_provider.clone(); - global::set_meter_provider(meter_provider); + let _ = init_otel_metrics(sdk_config.otel_telemetry.grpc_endpoint.clone()); let meter = global::meter("banking_cohort"); let meter = Arc::new(meter); @@ -178,9 +153,6 @@ async fn main() -> Result<(), String> { } shutdown_tracer_provider(); - let _ = meter_provider_copy.shutdown(); - let report = rx_metrics.borrow(); - log::warn!("{}", *report); Ok(()) } @@ -200,8 +172,6 @@ async fn get_params() -> Result { let mut accounts: Option = None; let mut target_rate: Option = None; let mut stop_type: Option = None; - let mut scaling_config: Option> = None; - let mut metric_print_raw = None; if args.len() >= 3 { let mut i = 1; @@ -228,34 +198,6 @@ async fn get_params() -> Result { let count: u64 = param_value.parse().unwrap(); stop_type = Some(StopType::LimitGeneratedTransactions { count }) } - } else if param_name.eq("--metric_print_raw") { - metric_print_raw = Some(true); - } else if param_name.eq("--metric_scaling") { - let param_value = &args[i + 1]; - let mut cfg: HashMap = HashMap::new(); - for spec in param_value.replace(' ', "").split(',') { - if let Some(i) = spec.find('=') { - let metric: String = spec[..i].into(); - let scale_factor_raw: String = spec[i + 1..].into(); - let scale_factor = match scale_factor_raw.parse::() { - Err(e) => { - log::error!( - "Unable to parse scaling factor for metric '{}'. No scaling will be applied. Pasing: '{}'. Error: {}.", - metric, - scale_factor_raw, - e - ); - 1_f32 - } - Ok(scale_factor) => scale_factor, - }; - cfg.insert(metric, scale_factor); - } - } - - if !cfg.is_empty() { - scaling_config = Some(cfg); - } } i += 2; @@ -271,8 +213,6 @@ async fn get_params() -> Result { stop_type, threads: threads.unwrap(), accounts, - scaling_config: scaling_config.unwrap_or_default(), - metric_print_raw: metric_print_raw.is_some(), }) } diff --git a/packages/talos_certifier/Cargo.toml b/packages/talos_certifier/Cargo.toml index 68669ce0..2d3fb938 100644 --- a/packages/talos_certifier/Cargo.toml +++ b/packages/talos_certifier/Cargo.toml @@ -58,6 +58,7 @@ tracing-subscriber = { version = "0.3.19", features = [ # internal crates talos_suffix = { path = "../talos_suffix", version = "0.3.17" } +talos_metrics = { path = "../talos_metrics", version = "0.3.17" } talos_common_utils = { path = "../talos_common_utils", version = "0.3.17" } uuid = { version = "1.4.1", features = ["v4"] } diff --git a/packages/talos_certifier/src/lib.rs b/packages/talos_certifier/src/lib.rs index 8ec9e17b..78579fe8 100644 --- a/packages/talos_certifier/src/lib.rs +++ b/packages/talos_certifier/src/lib.rs @@ -4,7 +4,6 @@ pub mod core; pub mod errors; pub mod healthcheck; pub mod model; -pub mod otel; pub mod ports; pub mod services; pub mod talos_certifier_service; diff --git a/packages/talos_certifier_adapters/src/certifier_kafka_pg.rs b/packages/talos_certifier_adapters/src/certifier_kafka_pg.rs index f30ec0b7..405bcef2 100644 --- a/packages/talos_certifier_adapters/src/certifier_kafka_pg.rs +++ b/packages/talos_certifier_adapters/src/certifier_kafka_pg.rs @@ -5,7 +5,6 @@ use crate::{self as Adapters, KafkaConsumer}; use std::sync::{atomic::AtomicI64, Arc}; use talos_certifier::core::SystemService; use talos_certifier::model::{CandidateMessage, DecisionMessage}; -use talos_certifier::otel::init_otel_metrics; use talos_certifier::ports::DecisionStore; use talos_certifier::services::{CertifierServiceConfig, MessageReceiverServiceConfig}; @@ -16,6 +15,7 @@ use talos_certifier::{ services::{CertifierService, DecisionOutboxService, MessageReceiverService}, talos_certifier_service::{TalosCertifierService, TalosCertifierServiceBuilder}, }; +use talos_metrics::otel::init_otel_metrics; use talos_rdkafka_utils::kafka_config::KafkaConfig; use talos_suffix::core::SuffixConfig; use tokio::sync::{broadcast, mpsc}; diff --git a/packages/talos_metrics/Cargo.toml b/packages/talos_metrics/Cargo.toml index fbd33a00..0d48568c 100644 --- a/packages/talos_metrics/Cargo.toml +++ b/packages/talos_metrics/Cargo.toml @@ -13,13 +13,20 @@ description = "Metric utilities used in Talos" once_cell = { version = "1.18.0" } opentelemetry_api = { version = "0.20.0", features = ["metrics"] } opentelemetry-stdout = { version = "0.1.0", features = ["metrics"] } -opentelemetry_sdk = { version = "0.20.0", features = ["metrics", "rt-tokio"] } -opentelemetry = { version = "0.20.0", features = ["metrics"] } +opentelemetry_sdk = { version = "0.28.0", features = ["metrics", "rt-tokio"] } +opentelemetry = { version = "0.28.0", features = ["metrics"] } +opentelemetry-otlp = { version = "0.28.0", features = ["trace", "grpc-tonic"] } serde = { workspace = true } serde_json = { workspace = true } env_logger = { workspace = true } log = { workspace = true } +tracing = { version = "0.1.41", features = ["log"] } time = { workspace = true } + +# Error +thiserror = "1.0.31" +# +strum = { version = "0.24", features = ["derive"] } diff --git a/packages/talos_metrics/src/lib.rs b/packages/talos_metrics/src/lib.rs index 5fc7db15..864ed143 100644 --- a/packages/talos_metrics/src/lib.rs +++ b/packages/talos_metrics/src/lib.rs @@ -1,2 +1,3 @@ pub mod model; pub mod opentel; +pub mod otel; diff --git a/packages/talos_metrics/src/opentel/aggregation_selector.rs b/packages/talos_metrics/src/opentel/aggregation_selector.rs index b336681c..27051933 100644 --- a/packages/talos_metrics/src/opentel/aggregation_selector.rs +++ b/packages/talos_metrics/src/opentel/aggregation_selector.rs @@ -1,56 +1,46 @@ -use opentelemetry_sdk::metrics::{reader::AggregationSelector, Aggregation, InstrumentKind}; - -use super::buckets::BUCKETS_4K as BUCKETS; - -#[derive(Debug)] -pub struct CustomHistogramSelector { - buckets: Vec, -} - -impl Default for CustomHistogramSelector { - fn default() -> Self { - CustomHistogramSelector { - buckets: vec![0.0, 10.0, 100.0, 500.0, 1_000.0, 10_000.0], - } - } -} - -impl CustomHistogramSelector { - pub fn new_with_4k_buckets() -> Result { - let mut buckets: Vec = Vec::new(); - for b in BUCKETS { - buckets.push(b as f64) - } - - Ok(Self { buckets }) - } - pub fn new2(csv_buckets: &str) -> Result { - let buckets_iter = csv_buckets.split(','); - - let mut buckets: Vec = Vec::new(); - for txt in buckets_iter { - let cleaned = txt.replace(' ', ""); - let parsed = cleaned - .parse::() - .map_err(|e| format!("Cannot convert this '{}' to f64. Error: {}", cleaned, e))?; - buckets.push(parsed); - } - - Ok(Self { buckets }) - } -} - -impl AggregationSelector for CustomHistogramSelector { - fn aggregation(&self, kind: InstrumentKind) -> Aggregation { - match kind { - InstrumentKind::Counter | InstrumentKind::UpDownCounter | InstrumentKind::ObservableCounter | InstrumentKind::ObservableUpDownCounter => { - Aggregation::Sum - } - InstrumentKind::ObservableGauge => Aggregation::LastValue, - InstrumentKind::Histogram => Aggregation::ExplicitBucketHistogram { - boundaries: self.buckets.clone(), - record_min_max: true, - }, - } - } -} +// use opentelemetry_sdk::metrics::{new_view, Aggregation, Instrument, InstrumentKind, Stream}; + +// use super::buckets::BUCKETS_4K as BUCKETS; + +// #[derive(Debug, Clone)] +// pub struct CustomHistogramSelector { +// buckets: Vec, +// } + +// impl Default for CustomHistogramSelector { +// fn default() -> Self { +// CustomHistogramSelector { +// buckets: vec![0.0, 10.0, 100.0, 500.0, 1_000.0, 10_000.0], +// } +// } +// } + +// impl CustomHistogramSelector { +// pub fn new_with_4k_buckets() -> Result { +// let buckets = BUCKETS.iter().map(|&b| b as f64).collect(); +// Ok(Self { buckets }) +// } + +// pub fn new2(csv_buckets: &str) -> Result { +// let buckets = csv_buckets +// .split(',') +// .map(|txt| { +// let cleaned = txt.replace(' ', ""); +// cleaned +// .parse::() +// .map_err(|e| format!("Cannot convert this '{}' to f64. Error: {}", cleaned, e)) +// }) +// .collect::, _>>()?; + +// Ok(Self { buckets }) +// } + +// pub fn into_view(self) -> Arc Option + Send + Sync> { +// let criteria = Instrument::new().kind(InstrumentKind::Histogram); +// let mask = Stream::new().aggregation(Aggregation::ExplicitBucketHistogram { +// boundaries: self.buckets, +// record_min_max: true, +// }); +// Arc::new(new_view(criteria, mask)) +// } +// } diff --git a/packages/talos_metrics/src/opentel/mod.rs b/packages/talos_metrics/src/opentel/mod.rs index d19eb4bb..352afc02 100644 --- a/packages/talos_metrics/src/opentel/mod.rs +++ b/packages/talos_metrics/src/opentel/mod.rs @@ -1,4 +1,4 @@ -pub mod aggregation_selector; +// pub mod aggregation_selector; pub mod buckets; pub mod model; pub mod printer; diff --git a/packages/talos_certifier/src/otel.rs b/packages/talos_metrics/src/otel.rs similarity index 100% rename from packages/talos_certifier/src/otel.rs rename to packages/talos_metrics/src/otel.rs From cad5dff06a94b15ff0c1d609cf1b75d26692f4f7 Mon Sep 17 00:00:00 2001 From: Gethyl Kurian Date: Tue, 14 Apr 2026 10:08:39 +1000 Subject: [PATCH 2/9] feat: add suffix related otel metrics for messenger --- Cargo.lock | 2 ++ .../examples/messenger_using_kafka.rs | 4 +++- packages/talos_messenger_actions/Cargo.toml | 8 +++++++ .../src/messenger_with_kafka.rs | 24 +++++++++++++++++-- 4 files changed, 35 insertions(+), 3 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c68a1078..15025295 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3367,6 +3367,7 @@ dependencies = [ "litemap", "log", "mockall", + "opentelemetry 0.28.0", "rand 0.8.5", "rdkafka 0.34.0", "serde", @@ -3376,6 +3377,7 @@ dependencies = [ "talos_certifier_adapters", "talos_common_utils", "talos_messenger_core", + "talos_metrics", "talos_rdkafka_utils", "talos_suffix", "thiserror 1.0.69", diff --git a/examples/messenger_using_kafka/examples/messenger_using_kafka.rs b/examples/messenger_using_kafka/examples/messenger_using_kafka.rs index ad3de29c..871b7c32 100644 --- a/examples/messenger_using_kafka/examples/messenger_using_kafka.rs +++ b/examples/messenger_using_kafka/examples/messenger_using_kafka.rs @@ -1,4 +1,4 @@ -use talos_common_utils::env_var; +use talos_common_utils::{env_var, env_var_with_defaults}; use talos_messenger_actions::messenger_with_kafka::{messenger_with_kafka, Configuration}; use talos_messenger_core::utlis::{create_whitelist_actions_from_str, ActionsParserConfig}; use talos_rdkafka_utils::kafka_config::KafkaConfig; @@ -31,6 +31,7 @@ async fn main() { }; let actions_from_env = env_var!("TALOS_MESSENGER_ACTIONS_WHITELIST"); + let otel_grpc_endpoint = env_var_with_defaults!("OTEL_EXPORTER_OTLP_ENDPOINT", Option::); let allowed_actions = create_whitelist_actions_from_str(&actions_from_env, &ActionsParserConfig::default()); let config = Configuration { @@ -40,6 +41,7 @@ async fn main() { channel_buffers: None, commit_size: Some(2_000), commit_frequency: None, + otel_grpc_endpoint, }; messenger_with_kafka(config).await.unwrap(); diff --git a/packages/talos_messenger_actions/Cargo.toml b/packages/talos_messenger_actions/Cargo.toml index 61bce27a..150e38e8 100644 --- a/packages/talos_messenger_actions/Cargo.toml +++ b/packages/talos_messenger_actions/Cargo.toml @@ -56,12 +56,20 @@ toml_parser = { workspace = true } toml_writer = { workspace = true } # *** End - Adding to fix napi build error on GH actions. + +# Otel packages +opentelemetry = { version = "0.28.0", default-features = false, features = [ + "trace", +] } + +# Talos packages talos_certifier = { path = "../talos_certifier", version = "0.3.17" } talos_suffix = { path = "../talos_suffix", version = "0.3.17" } talos_certifier_adapters = { path = "../talos_certifier_adapters", version = "0.3.17" } talos_common_utils = { path = "../talos_common_utils", version = "0.3.17" } talos_rdkafka_utils = { path = "../talos_rdkafka_utils", version = "0.3.17" } talos_messenger_core = { path = "../talos_messenger_core", version = "0.3.17" } +talos_metrics = { path = "../talos_metrics", version = "0.3.17" } [dev-dependencies] diff --git a/packages/talos_messenger_actions/src/messenger_with_kafka.rs b/packages/talos_messenger_actions/src/messenger_with_kafka.rs index 9604b41a..dc4dfe6f 100644 --- a/packages/talos_messenger_actions/src/messenger_with_kafka.rs +++ b/packages/talos_messenger_actions/src/messenger_with_kafka.rs @@ -1,6 +1,7 @@ use ahash::HashMap; use async_trait::async_trait; use log::debug; +use opentelemetry::global; use rdkafka::producer::ProducerContext; use talos_certifier::ports::{errors::MessagePublishError, MessageReciever}; use talos_certifier_adapters::KafkaConsumer; @@ -11,8 +12,12 @@ use talos_messenger_core::{ suffix::MessengerCandidate, talos_messenger_service::TalosMessengerService, }; +use talos_metrics::otel::init_otel_metrics; use talos_rdkafka_utils::kafka_config::KafkaConfig; -use talos_suffix::{core::SuffixConfig, Suffix}; +use talos_suffix::{ + core::{SuffixConfig, SuffixMetricsConfig}, + Suffix, +}; use tokio::sync::mpsc; use crate::kafka::{ @@ -110,9 +115,16 @@ pub struct Configuration { pub commit_size: Option, /// Commit issuing frequency. pub commit_frequency: Option, + /// Otel grpc endpoint + pub otel_grpc_endpoint: Option, } pub async fn messenger_with_kafka(config: Configuration) -> MessengerServiceResult { + // initialise otel + let _ = init_otel_metrics(config.otel_grpc_endpoint); + + let meter = global::meter("talos_messenger"); + let kafka_consumer = KafkaConsumer::new(&config.kafka_config); // Subscribe to topic. @@ -133,7 +145,15 @@ pub async fn messenger_with_kafka(config: Configuration) -> MessengerServiceResu let tx_feedback_channel_clone = tx_feedback_channel.clone(); // START - Inbound service - let suffix: Suffix = Suffix::with_config(config.suffix_config.unwrap_or_default(), None); + let suffix: Suffix = Suffix::with_config( + config.suffix_config.unwrap_or_default(), + Some(( + SuffixMetricsConfig { + prefix: "talos_messenger".into(), + }, + meter.clone(), + )), + ); let inbound_service_config = MessengerInboundServiceConfig::new(config.allowed_actions, config.commit_size, config.commit_frequency); let inbound_service = MessengerInboundService::new(kafka_consumer, tx_actions_channel, rx_feedback_channel, suffix, inbound_service_config); // END - Inbound service From 36ce09747479e72441af8ec022c22bb577258ad4 Mon Sep 17 00:00:00 2001 From: Gethyl Kurian Date: Tue, 14 Apr 2026 15:24:36 +1000 Subject: [PATCH 3/9] feat: add action and feedback channel capacity otel metrics for messenger --- Cargo.lock | 1 + .../src/messenger_with_kafka.rs | 9 ++++++- packages/talos_messenger_core/Cargo.toml | 5 ++++ .../src/services/inbound_service.rs | 27 +++++++++++++++++++ .../src/tests/test_utils.rs | 4 +++ 5 files changed, 45 insertions(+), 1 deletion(-) diff --git a/Cargo.lock b/Cargo.lock index 15025295..f0439fcd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3404,6 +3404,7 @@ dependencies = [ "futures-util", "log", "mockall", + "opentelemetry 0.28.0", "rand 0.8.5", "rdkafka 0.34.0", "serde", diff --git a/packages/talos_messenger_actions/src/messenger_with_kafka.rs b/packages/talos_messenger_actions/src/messenger_with_kafka.rs index dc4dfe6f..2995b35f 100644 --- a/packages/talos_messenger_actions/src/messenger_with_kafka.rs +++ b/packages/talos_messenger_actions/src/messenger_with_kafka.rs @@ -155,7 +155,14 @@ pub async fn messenger_with_kafka(config: Configuration) -> MessengerServiceResu )), ); let inbound_service_config = MessengerInboundServiceConfig::new(config.allowed_actions, config.commit_size, config.commit_frequency); - let inbound_service = MessengerInboundService::new(kafka_consumer, tx_actions_channel, rx_feedback_channel, suffix, inbound_service_config); + let inbound_service = MessengerInboundService::new( + kafka_consumer, + tx_actions_channel, + rx_feedback_channel, + suffix, + inbound_service_config, + meter.clone(), + ); // END - Inbound service // START - Publish service diff --git a/packages/talos_messenger_core/Cargo.toml b/packages/talos_messenger_core/Cargo.toml index b4eb1b2d..01f8f18f 100644 --- a/packages/talos_messenger_core/Cargo.toml +++ b/packages/talos_messenger_core/Cargo.toml @@ -35,6 +35,11 @@ thiserror = { version = "1.0.31" } # Kafka rdkafka = { version = "0.34.0", features = ["sasl"] } +# Otel packages +opentelemetry = { version = "0.28.0", default-features = false, features = [ + "trace", +] } + # Time time = { version = "0.3.30" } diff --git a/packages/talos_messenger_core/src/services/inbound_service.rs b/packages/talos_messenger_core/src/services/inbound_service.rs index 11aa95dd..070d5d28 100644 --- a/packages/talos_messenger_core/src/services/inbound_service.rs +++ b/packages/talos_messenger_core/src/services/inbound_service.rs @@ -4,6 +4,7 @@ use ahash::{HashMap, HashMapExt}; use async_trait::async_trait; use log::{debug, error, info, warn}; +use opentelemetry::metrics::{Gauge, Meter}; use talos_certifier::{model::DecisionMessageTrait, ports::MessageReciever, ChannelMessage}; use talos_suffix::{core::SuffixMeta, Suffix, SuffixTrait}; use time::{format_description::well_known::Rfc3339, OffsetDateTime}; @@ -44,6 +45,22 @@ impl MessengerInboundServiceConfig { } } +pub struct MessengerInboundOtelMetrics { + /// Metric on the actions channel capacity + otel_actions_channel_capacity_gauge: Gauge, + /// Metric on the feedback channel capacity + otel_feedback_channel_capacity_gauge: Gauge, +} + +impl MessengerInboundOtelMetrics { + pub(crate) fn new(meter: &Meter) -> Self { + Self { + otel_actions_channel_capacity_gauge: meter.u64_gauge("talos_messenger_actions_channel_capacity".to_string()).build(), + otel_feedback_channel_capacity_gauge: meter.u64_gauge("talos_messenger_feedback_channel_capacity".to_string()).build(), + } + } +} + pub struct MessengerInboundService where M: MessageReciever> + Send + Sync + 'static, @@ -59,6 +76,8 @@ where last_committed_version: u64, /// The next version ready to be send for commit. next_version_to_commit: u64, + /// Otel metrics + otel_metrics: MessengerInboundOtelMetrics, } impl MessengerInboundService @@ -71,8 +90,10 @@ where rx_feedback_channel: mpsc::Receiver, suffix: Suffix, config: MessengerInboundServiceConfig, + otel_meter: Meter, ) -> Self { let commit_interval = tokio::time::interval(Duration::from_millis(config.commit_frequency_ms as u64)); + let otel_metrics = MessengerInboundOtelMetrics::new(&otel_meter); Self { message_receiver, tx_actions_channel, @@ -82,6 +103,7 @@ where commit_interval, last_committed_version: 0, next_version_to_commit: 0, + otel_metrics, } } /// Get next versions with their commit actions to process. @@ -114,6 +136,10 @@ where // Mark item as in process self.suffix.set_item_state(ver, SuffixItemState::Processing); + + self.otel_metrics + .otel_actions_channel_capacity_gauge + .record(self.tx_actions_channel.capacity() as u64, &[]); } Ok(()) @@ -224,6 +250,7 @@ where tokio::select! { // Receive feedback from publisher. Some(feedback_result) = self.rx_feedback_channel.recv() => { + self.otel_metrics.otel_feedback_channel_capacity_gauge.record(self.rx_feedback_channel.capacity() as u64, &[]); match feedback_result { MessengerChannelFeedback::Error(version, key, message_error) => { error!("Failed to process version={version} with error={message_error:?}"); diff --git a/packages/talos_messenger_core/src/tests/test_utils.rs b/packages/talos_messenger_core/src/tests/test_utils.rs index 7a48834f..11fdd521 100644 --- a/packages/talos_messenger_core/src/tests/test_utils.rs +++ b/packages/talos_messenger_core/src/tests/test_utils.rs @@ -1,6 +1,7 @@ use ahash::HashMap; use async_trait::async_trait; use log::{debug, error}; +use opentelemetry::global; use strum::{Display, EnumString}; use talos_certifier::{ errors::SystemServiceError, @@ -271,12 +272,15 @@ impl MessengerServiceTester Date: Wed, 15 Apr 2026 10:56:18 +1000 Subject: [PATCH 4/9] feat: capture deserialise and publish message related error counter otel metrics for messenger --- .../src/kafka/service.rs | 48 ++++++++++++++++++- .../src/messenger_with_kafka.rs | 15 +----- .../src/services/inbound_service.rs | 4 +- .../src/tests/test_utils.rs | 2 +- 4 files changed, 52 insertions(+), 17 deletions(-) diff --git a/packages/talos_messenger_actions/src/kafka/service.rs b/packages/talos_messenger_actions/src/kafka/service.rs index b823c974..615cb745 100644 --- a/packages/talos_messenger_actions/src/kafka/service.rs +++ b/packages/talos_messenger_actions/src/kafka/service.rs @@ -3,6 +3,10 @@ use std::sync::Arc; use async_trait::async_trait; use futures_util::future::join_all; use log::{debug, error, info}; +use opentelemetry::{ + metrics::{Counter, Meter}, + KeyValue, +}; use time::{format_description::well_known::Rfc3339, OffsetDateTime}; use tokio::sync::mpsc; @@ -15,11 +19,48 @@ use talos_messenger_core::{ use super::models::KafkaAction; +pub const METRIC_ERROR_TYPES: &str = "error_type"; + +#[derive(Debug)] +pub struct KafkaActionServiceOtelMetrics { + /// Metric counter on the actions errors + otel_actions_error_counters: Counter, +} + +impl KafkaActionServiceOtelMetrics { + pub(crate) fn new(meter: &Meter) -> Self { + Self { + otel_actions_error_counters: meter.u64_counter("talos_messenger_errors".to_string()).build(), + } + } +} + #[derive(Debug)] pub struct KafkaActionService + Send + Sync + 'static> { pub publisher: Arc, pub rx_actions_channel: mpsc::Receiver, pub tx_feedback_channel: mpsc::Sender, + pub otel_metrics: KafkaActionServiceOtelMetrics, +} + +impl KafkaActionService +where + M: MessengerPublisher + Send + Sync, +{ + pub fn new( + publisher: Arc, + rx_actions_channel: mpsc::Receiver, + tx_feedback_channel: mpsc::Sender, + otel_meter: &Meter, + ) -> Self { + let otel_metrics = KafkaActionServiceOtelMetrics::new(otel_meter); + Self { + publisher, + rx_actions_channel, + tx_feedback_channel, + otel_metrics, + } + } } #[async_trait] @@ -51,9 +92,11 @@ where let timestamp = OffsetDateTime::now_utc().format(&Rfc3339).ok().unwrap(); headers.insert(MessengerStateTransitionTimestamps::EndOnCommitActions.to_string(), timestamp); + let otel_actions_error_counters = self.otel_metrics.otel_actions_error_counters.clone(); async move { if let Err(publish_error) = publisher.send(version, action, headers, total_len).await { - error!("Failed to publish message for version={version} with error {publish_error}") + error!("Failed to publish message for version={version} with error {publish_error}"); + otel_actions_error_counters.add(1, &[KeyValue::new(METRIC_ERROR_TYPES, "publish_message")]); } } }); @@ -66,6 +109,9 @@ where err.data, err.reason ); + self.otel_metrics + .otel_actions_error_counters + .add(1, &[KeyValue::new(METRIC_ERROR_TYPES, "deserialise_message")]); } } } diff --git a/packages/talos_messenger_actions/src/messenger_with_kafka.rs b/packages/talos_messenger_actions/src/messenger_with_kafka.rs index 2995b35f..ff73654a 100644 --- a/packages/talos_messenger_actions/src/messenger_with_kafka.rs +++ b/packages/talos_messenger_actions/src/messenger_with_kafka.rs @@ -155,14 +155,7 @@ pub async fn messenger_with_kafka(config: Configuration) -> MessengerServiceResu )), ); let inbound_service_config = MessengerInboundServiceConfig::new(config.allowed_actions, config.commit_size, config.commit_frequency); - let inbound_service = MessengerInboundService::new( - kafka_consumer, - tx_actions_channel, - rx_feedback_channel, - suffix, - inbound_service_config, - meter.clone(), - ); + let inbound_service = MessengerInboundService::new(kafka_consumer, tx_actions_channel, rx_feedback_channel, suffix, inbound_service_config, &meter); // END - Inbound service // START - Publish service @@ -172,11 +165,7 @@ pub async fn messenger_with_kafka(config: Configuration) -> MessengerServiceResu let kafka_producer = KafkaProducer::with_context(&config.kafka_config, custom_context); let messenger_kafka_publisher = MessengerKafkaPublisher { publisher: kafka_producer }; - let publish_service = KafkaActionService { - publisher: messenger_kafka_publisher.into(), - rx_actions_channel, - tx_feedback_channel, - }; + let publish_service = KafkaActionService::new(messenger_kafka_publisher.into(), rx_actions_channel, tx_feedback_channel, &meter); // END - Publish service let messenger_service = TalosMessengerService { diff --git a/packages/talos_messenger_core/src/services/inbound_service.rs b/packages/talos_messenger_core/src/services/inbound_service.rs index 070d5d28..a7e92ee7 100644 --- a/packages/talos_messenger_core/src/services/inbound_service.rs +++ b/packages/talos_messenger_core/src/services/inbound_service.rs @@ -90,10 +90,10 @@ where rx_feedback_channel: mpsc::Receiver, suffix: Suffix, config: MessengerInboundServiceConfig, - otel_meter: Meter, + otel_meter: &Meter, ) -> Self { let commit_interval = tokio::time::interval(Duration::from_millis(config.commit_frequency_ms as u64)); - let otel_metrics = MessengerInboundOtelMetrics::new(&otel_meter); + let otel_metrics = MessengerInboundOtelMetrics::new(otel_meter); Self { message_receiver, tx_actions_channel, diff --git a/packages/talos_messenger_core/src/tests/test_utils.rs b/packages/talos_messenger_core/src/tests/test_utils.rs index 11fdd521..7899296f 100644 --- a/packages/talos_messenger_core/src/tests/test_utils.rs +++ b/packages/talos_messenger_core/src/tests/test_utils.rs @@ -280,7 +280,7 @@ impl MessengerServiceTester Date: Thu, 16 Apr 2026 10:43:09 +1000 Subject: [PATCH 5/9] feat: capture counter for completed items with reason as dimension for otel metrics for messenger --- .../src/services/inbound_service.rs | 36 +++++++++++++------ packages/talos_messenger_core/src/suffix.rs | 2 +- 2 files changed, 26 insertions(+), 12 deletions(-) diff --git a/packages/talos_messenger_core/src/services/inbound_service.rs b/packages/talos_messenger_core/src/services/inbound_service.rs index a7e92ee7..3ace180a 100644 --- a/packages/talos_messenger_core/src/services/inbound_service.rs +++ b/packages/talos_messenger_core/src/services/inbound_service.rs @@ -4,7 +4,10 @@ use ahash::{HashMap, HashMapExt}; use async_trait::async_trait; use log::{debug, error, info, warn}; -use opentelemetry::metrics::{Gauge, Meter}; +use opentelemetry::{ + metrics::{Counter, Gauge, Meter}, + KeyValue, +}; use talos_certifier::{model::DecisionMessageTrait, ports::MessageReciever, ChannelMessage}; use talos_suffix::{core::SuffixMeta, Suffix, SuffixTrait}; use time::{format_description::well_known::Rfc3339, OffsetDateTime}; @@ -23,6 +26,8 @@ use crate::{ utlis::get_allowed_commit_actions, }; +pub const METRIC_COMPLETION_REASON: &str = "completion_reason"; + #[derive(Debug)] pub struct MessengerInboundServiceConfig { /// commit size decides when the offsets can be committed. @@ -50,6 +55,8 @@ pub struct MessengerInboundOtelMetrics { otel_actions_channel_capacity_gauge: Gauge, /// Metric on the feedback channel capacity otel_feedback_channel_capacity_gauge: Gauge, + /// Metric on candidates processed + otel_candidates_processed: Counter, } impl MessengerInboundOtelMetrics { @@ -57,6 +64,7 @@ impl MessengerInboundOtelMetrics { Self { otel_actions_channel_capacity_gauge: meter.u64_gauge("talos_messenger_actions_channel_capacity".to_string()).build(), otel_feedback_channel_capacity_gauge: meter.u64_gauge("talos_messenger_feedback_channel_capacity".to_string()).build(), + otel_candidates_processed: meter.u64_counter("talos_messenger_candidates_processed".to_string()).build(), } } } @@ -162,6 +170,13 @@ where } } + fn mark_suffix_item_complete(&mut self, version: u64, complete_reason: SuffixItemCompleteStateReason) { + self.otel_metrics + .otel_candidates_processed + .add(1, &[KeyValue::new(METRIC_COMPLETION_REASON, complete_reason.to_string())]); + self.suffix.set_item_state(version, SuffixItemState::Complete(complete_reason)); + } + pub(crate) fn suffix_pruning(&mut self) { // Check prune eligibility by looking at the prune meta info. if let Some(index_to_prune) = self.suffix.get_safe_prune_index() { @@ -180,9 +195,7 @@ where pub(crate) fn check_and_update_all_actions_complete(&mut self, version: u64, reason: SuffixItemCompleteStateReason) { match self.suffix.are_all_actions_complete_for_version(version) { Ok(is_completed) if is_completed => { - self.suffix.set_item_state(version, SuffixItemState::Complete(reason)); - - // self.all_completed_versions.push(version); + self.mark_suffix_item_complete(version, reason); if let Some((_, new_prune_version)) = self.suffix.update_prune_index_from_version(version) { self.update_commit_offset(new_prune_version); @@ -214,7 +227,6 @@ where Some(SuffixItemState::Complete(..)) => { self.suffix .set_item_state(version, SuffixItemState::Complete(SuffixItemCompleteStateReason::ErrorProcessing)); - self.suffix.increment_item_action_count(version, action_key); self.check_and_update_all_actions_complete(version, SuffixItemCompleteStateReason::ErrorProcessing); debug!( @@ -284,14 +296,13 @@ where let filter_actions = get_allowed_commit_actions(commit_actions, &self.config.allowed_actions); if filter_actions.is_empty() { // There are on_commit actions, but not the ones required by messenger - item_to_update.item.set_state(SuffixItemState::Complete(SuffixItemCompleteStateReason::NoRelavantCommitActions)); + self.mark_suffix_item_complete(version, SuffixItemCompleteStateReason::NoRelavantCommitActions); } else { item_to_update.item.set_commit_action(filter_actions); } } else { // No on_commit actions - item_to_update.item.set_state(SuffixItemState::Complete(SuffixItemCompleteStateReason::NoCommitActions)); - + self.mark_suffix_item_complete(version, SuffixItemCompleteStateReason::NoCommitActions); } }; @@ -309,6 +320,11 @@ where // TODO: GK - no hardcoded filters on headers let headers: HashMap = decision.headers.into_iter().filter(|(key, _)| key.as_str() != "messageType").collect(); self.suffix.update_item_decision(version, decision.decision_version, &decision.message, headers); + if decision.message.is_abort(){ + self.otel_metrics + .otel_candidates_processed + .add(1, &[KeyValue::new(METRIC_COMPLETION_REASON, SuffixItemCompleteStateReason::Aborted.to_string())]); + } // Look for any early `Complete(..)` state, and update the `prune_index` and `commit_offset`. if let Ok(Some(suffix_item)) = self.suffix.get(version){ @@ -334,9 +350,7 @@ where Err(error) => { // Catch the error propogated, and if it has a version, mark the item as completed. if let Some(version) = error.version { - if let Some(item_to_update) = self.suffix.get_mut(version){ - item_to_update.item.set_state(SuffixItemState::Complete(SuffixItemCompleteStateReason::ErrorProcessing)); - } + self.mark_suffix_item_complete(version, SuffixItemCompleteStateReason::ErrorProcessing); } error!("error consuming message....{:?}", error); }, diff --git a/packages/talos_messenger_core/src/suffix.rs b/packages/talos_messenger_core/src/suffix.rs index 8e50789d..7add00f1 100644 --- a/packages/talos_messenger_core/src/suffix.rs +++ b/packages/talos_messenger_core/src/suffix.rs @@ -137,7 +137,7 @@ impl From for MessengerStateTransitionTimestamps { } } -#[derive(Debug, Serialize, Deserialize, Clone, Eq, PartialEq)] +#[derive(Debug, Serialize, Deserialize, Clone, Eq, PartialEq, Display)] pub enum SuffixItemCompleteStateReason { /// When the decision is an abort Aborted, From d0cc682e03731dc6b570fc30722c885d4424b1f3 Mon Sep 17 00:00:00 2001 From: Gethyl Kurian Date: Tue, 21 Apr 2026 12:11:51 +1000 Subject: [PATCH 6/9] chore(metrics): capture duration between various points in histogram for messenger --- .../talos_messenger_actions/src/kafka/mod.rs | 1 + .../src/kafka/service.rs | 94 ++++++++++++++++++- .../src/kafka/utils.rs | 78 +++++++++++++++ 3 files changed, 170 insertions(+), 3 deletions(-) create mode 100644 packages/talos_messenger_actions/src/kafka/utils.rs diff --git a/packages/talos_messenger_actions/src/kafka/mod.rs b/packages/talos_messenger_actions/src/kafka/mod.rs index 1f5827d2..5ee10392 100644 --- a/packages/talos_messenger_actions/src/kafka/mod.rs +++ b/packages/talos_messenger_actions/src/kafka/mod.rs @@ -2,3 +2,4 @@ pub mod context; pub mod models; pub mod producer; pub mod service; +mod utils; diff --git a/packages/talos_messenger_actions/src/kafka/service.rs b/packages/talos_messenger_actions/src/kafka/service.rs index 615cb745..11bfc711 100644 --- a/packages/talos_messenger_actions/src/kafka/service.rs +++ b/packages/talos_messenger_actions/src/kafka/service.rs @@ -1,10 +1,11 @@ use std::sync::Arc; +use ahash::HashMap; use async_trait::async_trait; use futures_util::future::join_all; use log::{debug, error, info}; use opentelemetry::{ - metrics::{Counter, Meter}, + metrics::{Counter, Histogram, Meter}, KeyValue, }; use time::{format_description::well_known::Rfc3339, OffsetDateTime}; @@ -17,6 +18,8 @@ use talos_messenger_core::{ utlis::get_actions_deserialised, }; +use crate::kafka::utils::{parse_header_timestamps, time_diff_ms}; + use super::models::KafkaAction; pub const METRIC_ERROR_TYPES: &str = "error_type"; @@ -25,12 +28,90 @@ pub const METRIC_ERROR_TYPES: &str = "error_type"; pub struct KafkaActionServiceOtelMetrics { /// Metric counter on the actions errors otel_actions_error_counters: Counter, + /// Metric histogram candidate received in messenger to publish on_commit actions + otel_candidate_received_to_publish_action_histogram: Histogram, + /// Metric histogram decision received in messenger to publish on_commit actions + otel_decision_received_to_publish_action_histogram: Histogram, + /// Metric histogram decision received in messenger to pick on_commit actions to be published + /// A decision received doesn't automatically mean the actions will be published if there are messages + /// prior to it with dependency waiting to be published. + otel_decision_received_to_pick_action_histogram: Histogram, + /// Metric histogram for on_commit action picked to when it was published. + otel_action_picked_published_histogram: Histogram, + /// Metric histogram for duration from initiator to action published. + otel_initiator_publish_action_histogram: Histogram, } impl KafkaActionServiceOtelMetrics { pub(crate) fn new(meter: &Meter) -> Self { Self { otel_actions_error_counters: meter.u64_counter("talos_messenger_errors".to_string()).build(), + otel_candidate_received_to_publish_action_histogram: meter + .f64_histogram("talos_messenger_candidate_received_publish_action".to_string()) + .with_description("Duration in talos messenger from when the candidate was received to when the on_commit action was published") + .with_unit("ms") + .build(), + otel_decision_received_to_publish_action_histogram: meter + .f64_histogram("talos_messenger_decision_received_publish_action".to_string()) + .with_description("Duration in talos messenger from when the decision was received to when the on_commit action was published") + .with_unit("ms") + .build(), + otel_decision_received_to_pick_action_histogram: meter + .f64_histogram("talos_messenger_decision_received_pick_action".to_string()) + .with_description("Duration in talos messenger from when the decision was received to when the on_commit action was picked") + .with_unit("ms") + .build(), + otel_action_picked_published_histogram: meter + .f64_histogram("talos_messenger_action_picked_published".to_string()) + .with_description("Duration in talos messenger from action picked to process till published the on_commit action") + .with_unit("ms") + .build(), + otel_initiator_publish_action_histogram: meter + .f64_histogram("talos_initiator_messenger_publish_action".to_string()) + .with_description("Duration from candidate created in initiator to talos messenger published the on_commit action") + .with_unit("ms") + .build(), + } + } + + pub(crate) fn record_metrics_from_kafka_header(&mut self, headers: &HashMap) { + let end_on_commit_time = OffsetDateTime::now_utc(); + + let candidate_received_time = headers + .get(&MessengerStateTransitionTimestamps::CandidateReceived.to_string()) + .and_then(|t| parse_header_timestamps(t)); + let decision_received_time = headers + .get(&MessengerStateTransitionTimestamps::DecisionReceived.to_string()) + .and_then(|t| parse_header_timestamps(t)); + let start_on_commit_actions_time = headers + .get(&MessengerStateTransitionTimestamps::StartOnCommitActions.to_string()) + .and_then(|t| parse_header_timestamps(t)); + let initiator_candidate_created_time = headers.get("cohortCandidateCreatedAtTimestamp").and_then(|t| parse_header_timestamps(t)); + // Publish the following metrics + // 1. Candidate received to Messenger on_commit publish + if let Some(candidate_received_time) = candidate_received_time { + self.otel_candidate_received_to_publish_action_histogram + .record(time_diff_ms(candidate_received_time, end_on_commit_time) as f64, &[]); + } + // 2. Decision received to Messenger on_commit publish + if let Some(decision_received_time) = decision_received_time { + self.otel_decision_received_to_publish_action_histogram + .record(time_diff_ms(decision_received_time, end_on_commit_time) as f64, &[]); + } + // 3. Decision received to Messenger picked on_commit messages + if let (Some(decision_received_time), Some(on_commit_actions_picked_time)) = (decision_received_time, start_on_commit_actions_time) { + self.otel_decision_received_to_pick_action_histogram + .record(time_diff_ms(decision_received_time, on_commit_actions_picked_time) as f64, &[]); + } + // 4. on_commit message picked to published - (Picked in Inbound Service and passed to Action Service from where it will be published) + if let Some(start_on_commit_actions_time) = start_on_commit_actions_time { + self.otel_action_picked_published_histogram + .record(time_diff_ms(start_on_commit_actions_time, end_on_commit_time) as f64, &[]); + } + // 5. Initiator to on_commit message published + if let Some(initiator_candidate_created_time) = initiator_candidate_created_time { + self.otel_initiator_publish_action_histogram + .record(time_diff_ms(initiator_candidate_created_time, end_on_commit_time) as f64, &[]); } } } @@ -89,10 +170,17 @@ where let publish_vec = actions.into_iter().map(|action| { let publisher = self.publisher.clone(); let mut headers = headers_cloned.clone(); - let timestamp = OffsetDateTime::now_utc().format(&Rfc3339).ok().unwrap(); + if let Ok(end_on_commit_timestamp_string) = OffsetDateTime::now_utc().format(&Rfc3339) { + headers.insert( + MessengerStateTransitionTimestamps::EndOnCommitActions.to_string(), + end_on_commit_timestamp_string, + ); + } + // Record the otel metrics for durations at between various points + self.otel_metrics.record_metrics_from_kafka_header(&headers); - headers.insert(MessengerStateTransitionTimestamps::EndOnCommitActions.to_string(), timestamp); let otel_actions_error_counters = self.otel_metrics.otel_actions_error_counters.clone(); + async move { if let Err(publish_error) = publisher.send(version, action, headers, total_len).await { error!("Failed to publish message for version={version} with error {publish_error}"); diff --git a/packages/talos_messenger_actions/src/kafka/utils.rs b/packages/talos_messenger_actions/src/kafka/utils.rs new file mode 100644 index 00000000..a7d426e1 --- /dev/null +++ b/packages/talos_messenger_actions/src/kafka/utils.rs @@ -0,0 +1,78 @@ +use time::{format_description::well_known::Rfc3339, OffsetDateTime}; + +pub(crate) fn time_diff_ms(start: OffsetDateTime, end: OffsetDateTime) -> i128 { + let diff = end - start; + diff.whole_microseconds() / 1_000 +} + +pub(crate) fn parse_header_timestamps(timestamp: &str) -> Option { + let timestamp = timestamp.trim(); + + // Validate empty timestamp + if timestamp.is_empty() { + return None; + } + // Parse timestamp in RFC3339 date string (eg. 2026-04-19T20:53:11.407Z) + if let Ok(ts) = OffsetDateTime::parse(timestamp, &Rfc3339) { + return Some(ts); + } + // Parse timestamp. If the length is less than 13 characters, it doesn't have milliseconds. + if timestamp.len() == 13 { + // Parse timestamp in epoch number (eg. 1776631991407) + if let Ok(ts) = timestamp.parse::() { + return OffsetDateTime::from_unix_timestamp_nanos(ts as i128 * 1_000_000).ok(); + } + } + + None +} + +#[cfg(test)] +mod tests { + use time::{format_description::well_known::Rfc3339, OffsetDateTime}; + + use crate::kafka::utils::parse_header_timestamps; + + // Test for valid date string + #[test] + fn test_fn_handle_valid_date_string() { + let date = "2026-04-19T20:53:11.407Z"; + let expected = OffsetDateTime::parse(date, &Rfc3339).unwrap(); + + assert!(parse_header_timestamps(date).is_some()); + assert_eq!(parse_header_timestamps(date).unwrap(), expected); + } + // Test for invalid date string + #[test] + fn test_fn_handle_incorrect_date_string() { + let date = "2026-049T20:53:11.407Z"; + + assert!(parse_header_timestamps(date).is_none()); + } + + // Test for valid epoch date + #[test] + fn test_fn_handle_valid_epoch_date() { + let date = 1776631991407_i128; + let date_string = date.to_string(); + let expected = OffsetDateTime::from_unix_timestamp_nanos(date * 1_000_000).unwrap(); + + let result = parse_header_timestamps(&date_string); + assert!(result.is_some()); + assert_eq!(result.unwrap(), expected); + } + // Test for invalid epoch date + #[test] + fn test_fn_handle_incorrect_epoch_date() { + let date = "1234567"; + + assert!(parse_header_timestamps(date).is_none()); + } + // Tets for empty string + #[test] + fn test_fn_handle_empty_string_input_date() { + let date = ""; + + assert!(parse_header_timestamps(date).is_none()); + } +} From 14314a140124f75420bec68d1b551b6b6e0703cf Mon Sep 17 00:00:00 2001 From: Gethyl Kurian Date: Thu, 23 Apr 2026 13:05:51 +1000 Subject: [PATCH 7/9] chore(metrics): capture messenger internal metrics in microseconds precision --- .../src/kafka/service.rs | 22 ++++++++++--------- .../src/kafka/utils.rs | 5 +++-- 2 files changed, 15 insertions(+), 12 deletions(-) diff --git a/packages/talos_messenger_actions/src/kafka/service.rs b/packages/talos_messenger_actions/src/kafka/service.rs index 11bfc711..36a5704d 100644 --- a/packages/talos_messenger_actions/src/kafka/service.rs +++ b/packages/talos_messenger_actions/src/kafka/service.rs @@ -18,7 +18,7 @@ use talos_messenger_core::{ utlis::get_actions_deserialised, }; -use crate::kafka::utils::{parse_header_timestamps, time_diff_ms}; +use crate::kafka::utils::{parse_header_timestamps, time_diff_micoseconds}; use super::models::KafkaAction; @@ -54,17 +54,17 @@ impl KafkaActionServiceOtelMetrics { otel_decision_received_to_publish_action_histogram: meter .f64_histogram("talos_messenger_decision_received_publish_action".to_string()) .with_description("Duration in talos messenger from when the decision was received to when the on_commit action was published") - .with_unit("ms") + .with_unit("us") .build(), otel_decision_received_to_pick_action_histogram: meter .f64_histogram("talos_messenger_decision_received_pick_action".to_string()) .with_description("Duration in talos messenger from when the decision was received to when the on_commit action was picked") - .with_unit("ms") + .with_unit("us") .build(), otel_action_picked_published_histogram: meter .f64_histogram("talos_messenger_action_picked_published".to_string()) .with_description("Duration in talos messenger from action picked to process till published the on_commit action") - .with_unit("ms") + .with_unit("us") .build(), otel_initiator_publish_action_histogram: meter .f64_histogram("talos_initiator_messenger_publish_action".to_string()) @@ -91,27 +91,29 @@ impl KafkaActionServiceOtelMetrics { // 1. Candidate received to Messenger on_commit publish if let Some(candidate_received_time) = candidate_received_time { self.otel_candidate_received_to_publish_action_histogram - .record(time_diff_ms(candidate_received_time, end_on_commit_time) as f64, &[]); + .record(time_diff_micoseconds(candidate_received_time, end_on_commit_time) as f64 / 1_000_f64, &[]); } // 2. Decision received to Messenger on_commit publish if let Some(decision_received_time) = decision_received_time { self.otel_decision_received_to_publish_action_histogram - .record(time_diff_ms(decision_received_time, end_on_commit_time) as f64, &[]); + .record(time_diff_micoseconds(decision_received_time, end_on_commit_time) as f64, &[]); } // 3. Decision received to Messenger picked on_commit messages if let (Some(decision_received_time), Some(on_commit_actions_picked_time)) = (decision_received_time, start_on_commit_actions_time) { self.otel_decision_received_to_pick_action_histogram - .record(time_diff_ms(decision_received_time, on_commit_actions_picked_time) as f64, &[]); + .record(time_diff_micoseconds(decision_received_time, on_commit_actions_picked_time) as f64, &[]); } // 4. on_commit message picked to published - (Picked in Inbound Service and passed to Action Service from where it will be published) if let Some(start_on_commit_actions_time) = start_on_commit_actions_time { self.otel_action_picked_published_histogram - .record(time_diff_ms(start_on_commit_actions_time, end_on_commit_time) as f64, &[]); + .record(time_diff_micoseconds(start_on_commit_actions_time, end_on_commit_time) as f64, &[]); } // 5. Initiator to on_commit message published if let Some(initiator_candidate_created_time) = initiator_candidate_created_time { - self.otel_initiator_publish_action_histogram - .record(time_diff_ms(initiator_candidate_created_time, end_on_commit_time) as f64, &[]); + self.otel_initiator_publish_action_histogram.record( + time_diff_micoseconds(initiator_candidate_created_time, end_on_commit_time) as f64 / 1_000_f64, + &[], + ); } } } diff --git a/packages/talos_messenger_actions/src/kafka/utils.rs b/packages/talos_messenger_actions/src/kafka/utils.rs index a7d426e1..03d67e3d 100644 --- a/packages/talos_messenger_actions/src/kafka/utils.rs +++ b/packages/talos_messenger_actions/src/kafka/utils.rs @@ -1,8 +1,9 @@ use time::{format_description::well_known::Rfc3339, OffsetDateTime}; -pub(crate) fn time_diff_ms(start: OffsetDateTime, end: OffsetDateTime) -> i128 { +/// Get the time difference in micro-seconds +pub(crate) fn time_diff_micoseconds(start: OffsetDateTime, end: OffsetDateTime) -> i128 { let diff = end - start; - diff.whole_microseconds() / 1_000 + diff.whole_microseconds() } pub(crate) fn parse_header_timestamps(timestamp: &str) -> Option { From 4fe97356bd9693ee53d52ce21a9414f3ca10d3d2 Mon Sep 17 00:00:00 2001 From: Gethyl Kurian Date: Tue, 28 Apr 2026 10:34:39 +1000 Subject: [PATCH 8/9] chore: ignore codecoverage for otel struct --- packages/talos_messenger_actions/src/kafka/service.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/packages/talos_messenger_actions/src/kafka/service.rs b/packages/talos_messenger_actions/src/kafka/service.rs index 36a5704d..9e5e9055 100644 --- a/packages/talos_messenger_actions/src/kafka/service.rs +++ b/packages/talos_messenger_actions/src/kafka/service.rs @@ -24,6 +24,7 @@ use super::models::KafkaAction; pub const METRIC_ERROR_TYPES: &str = "error_type"; +// $coverage:ignore-start #[derive(Debug)] pub struct KafkaActionServiceOtelMetrics { /// Metric counter on the actions errors @@ -117,6 +118,7 @@ impl KafkaActionServiceOtelMetrics { } } } +// $coverage:ignore-end #[derive(Debug)] pub struct KafkaActionService + Send + Sync + 'static> { From de56816d5d871ee30e9115f3e1e304d30c110ddc Mon Sep 17 00:00:00 2001 From: Gethyl Kurian Date: Tue, 28 Apr 2026 11:57:52 +1000 Subject: [PATCH 9/9] test: unit test for messenger kafka action service metrics struct --- Cargo.lock | 1 + packages/talos_messenger_actions/Cargo.toml | 1 + .../src/kafka/service.rs | 124 +++++++++++++++++- 3 files changed, 124 insertions(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index f0439fcd..79a562d7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3368,6 +3368,7 @@ dependencies = [ "log", "mockall", "opentelemetry 0.28.0", + "opentelemetry_sdk 0.28.0", "rand 0.8.5", "rdkafka 0.34.0", "serde", diff --git a/packages/talos_messenger_actions/Cargo.toml b/packages/talos_messenger_actions/Cargo.toml index 150e38e8..18b2890a 100644 --- a/packages/talos_messenger_actions/Cargo.toml +++ b/packages/talos_messenger_actions/Cargo.toml @@ -76,3 +76,4 @@ talos_metrics = { path = "../talos_metrics", version = "0.3.17" } mockall = { version = "0.11.3" } tokio-test = { version = "0.4.2" } rand = { version = "0.8.5" } +opentelemetry_sdk = { version = "0.28.0", features = ["metrics", "rt-tokio"] } diff --git a/packages/talos_messenger_actions/src/kafka/service.rs b/packages/talos_messenger_actions/src/kafka/service.rs index 9e5e9055..db3f93ca 100644 --- a/packages/talos_messenger_actions/src/kafka/service.rs +++ b/packages/talos_messenger_actions/src/kafka/service.rs @@ -24,7 +24,6 @@ use super::models::KafkaAction; pub const METRIC_ERROR_TYPES: &str = "error_type"; -// $coverage:ignore-start #[derive(Debug)] pub struct KafkaActionServiceOtelMetrics { /// Metric counter on the actions errors @@ -118,7 +117,6 @@ impl KafkaActionServiceOtelMetrics { } } } -// $coverage:ignore-end #[derive(Debug)] pub struct KafkaActionService + Send + Sync + 'static> { @@ -235,3 +233,125 @@ where todo!() } } + +#[cfg(test)] +mod tests { + use ahash::AHashMap; + use ahash::HashMap; + use async_trait::async_trait; + use opentelemetry::metrics::Meter; + use opentelemetry::metrics::MeterProvider; + use opentelemetry_sdk::error::OTelSdkResult; + use opentelemetry_sdk::metrics::data::ResourceMetrics; + use opentelemetry_sdk::metrics::exporter::PushMetricExporter; + use opentelemetry_sdk::metrics::{SdkMeterProvider, Temporality}; + use std::sync::{Arc, Mutex}; + use talos_messenger_core::suffix::MessengerStateTransitionTimestamps; + + use crate::kafka::service::KafkaActionServiceOtelMetrics; + + #[derive(Debug, Clone, Default)] + struct TestExporter { + metrics: Arc>>, // store metric names + } + + #[async_trait] + impl PushMetricExporter for TestExporter { + async fn export(&self, metrics: &mut ResourceMetrics) -> OTelSdkResult { + for scope in &metrics.scope_metrics { + for metric in &scope.metrics { + self.metrics.lock().unwrap().push(metric.name.to_string()); + } + } + Ok(()) + } + + async fn force_flush(&self) -> OTelSdkResult { + Ok(()) + } + + fn shutdown(&self) -> OTelSdkResult { + Ok(()) + } + + fn temporality(&self) -> Temporality { + Temporality::Cumulative + } + } + + fn setup_meter() -> (Meter, TestExporter, SdkMeterProvider) { + let exporter = TestExporter::default(); + let provider = SdkMeterProvider::builder().with_periodic_exporter(exporter.clone()).build(); + let meter = provider.meter("test"); + (meter, exporter, provider) + } + + fn base_headers() -> HashMap { + HashMap::from_iter([ + ( + MessengerStateTransitionTimestamps::CandidateReceived.to_string(), + "2026-04-19T20:53:11.000Z".to_string(), + ), + ( + MessengerStateTransitionTimestamps::DecisionReceived.to_string(), + "2026-04-19T20:53:11.100Z".to_string(), + ), + ( + MessengerStateTransitionTimestamps::StartOnCommitActions.to_string(), + "2026-04-19T20:53:11.200Z".to_string(), + ), + ("cohortCandidateCreatedAtTimestamp".to_string(), "2026-04-19T20:53:10.000Z".to_string()), + ]) + } + + #[tokio::test] + async fn test_metric_struct_all_headers_present() { + let (meter, exporter, provider) = setup_meter(); + let mut metric = KafkaActionServiceOtelMetrics::new(&meter); + + metric.record_metrics_from_kafka_header(&base_headers()); + + // Before flush there are no metrics available + let recorded = exporter.metrics.lock().unwrap(); + assert_eq!(recorded.len(), 0); + drop(recorded); + + // Flush the metrics recorded + provider.force_flush().unwrap(); + + // After flush, all the metrics are available + let recorded = exporter.metrics.lock().unwrap(); + assert_eq!(recorded.len(), 5); + assert!(recorded.contains(&"talos_messenger_candidate_received_publish_action".to_string())); + assert!(recorded.contains(&"talos_messenger_decision_received_publish_action".to_string())); + assert!(recorded.contains(&"talos_messenger_decision_received_pick_action".to_string())); + assert!(recorded.contains(&"talos_messenger_action_picked_published".to_string())); + assert!(recorded.contains(&"talos_initiator_messenger_publish_action".to_string())); + } + + #[tokio::test] + async fn test_empty_headers_records_nothing() { + let (meter, exporter, provider) = setup_meter(); + let mut metric = KafkaActionServiceOtelMetrics::new(&meter); + + metric.record_metrics_from_kafka_header(&AHashMap::new()); + provider.force_flush().unwrap(); + + let finished = exporter.metrics.lock().unwrap(); + assert!(finished.is_empty()); + } + + #[tokio::test] + async fn test_missing_candidate_received_skips_histogram_1() { + let (meter, exporter, provider) = setup_meter(); + let mut metric = KafkaActionServiceOtelMetrics::new(&meter); + let mut headers = base_headers(); + headers.remove(&MessengerStateTransitionTimestamps::CandidateReceived.to_string()); + + metric.record_metrics_from_kafka_header(&headers); + provider.force_flush().unwrap(); + + let recorded = exporter.metrics.lock().unwrap(); + assert!(!recorded.contains(&"talos_messenger_candidate_received_publish_action".to_string())); + } +}