From 131d8a223f44d7857caf5ab3be81f0c5d917ddc1 Mon Sep 17 00:00:00 2001
From: Jeremy Cook
Date: Thu, 21 May 2026 12:15:35 -0700
Subject: [PATCH] Add payload size metrics

Record logical workflow and activity payload sizes alongside client-side
gRPC message body sizes so users can detect payload growth before hitting
transport limits.

This adds workflow_payload_size metrics for workflow activation inputs and
successful completion results, activity_payload_size metrics for activity
inputs and successful results, and rpc_message_size metrics for client gRPC
request and response bodies. The metrics use byte units, shared
payload-size histogram buckets that cover the 4 MiB boundary, and
low-cardinality message_direction labels for request and response series.

Payload metric names and message_direction labels live in the common crate
so client and core emitters use the same names and cardinality values. The
client transport wrapper records streamed DATA frame bytes without changing
the body, while workflow and activity metrics count payload data plus
metadata key/value bytes.

Tests cover payload byte accounting, Prometheus series labels/counts for
activity and workflow payload metrics, request and response RPC
message-size series, and non-zero activity payload sums.
--- crates/client/src/lib.rs | 5 +- crates/client/src/metrics.rs | 110 +++++++++++++- crates/common/src/telemetry/metrics.rs | 52 ++++++- crates/common/src/telemetry/otel.rs | 21 ++- crates/sdk-core/src/telemetry/metrics.rs | 32 +++- crates/sdk-core/src/worker/activities.rs | 79 ++++++++-- crates/sdk-core/src/worker/workflow/mod.rs | 55 ++++++- .../tests/integ_tests/metrics_tests.rs | 142 +++++++++++++++++- 8 files changed, 461 insertions(+), 35 deletions(-) diff --git a/crates/client/src/lib.rs b/crates/client/src/lib.rs index 1f1f6ef05..83a9661a8 100644 --- a/crates/client/src/lib.rs +++ b/crates/client/src/lib.rs @@ -36,7 +36,10 @@ pub use async_activity_handle::{ ActivityHeartbeatResponse, ActivityIdentifier, AsyncActivityHandle, }; -pub use metrics::{LONG_REQUEST_LATENCY_HISTOGRAM_NAME, REQUEST_LATENCY_HISTOGRAM_NAME}; +pub use metrics::{ + LONG_REQUEST_LATENCY_HISTOGRAM_NAME, REQUEST_LATENCY_HISTOGRAM_NAME, + RPC_MESSAGE_SIZE_HISTOGRAM_NAME, +}; pub use options_structs::*; pub use replaceable::SharedReplaceableClient; pub use retry::RetryOptions; diff --git a/crates/client/src/metrics.rs b/crates/client/src/metrics.rs index 92e3ba778..fc2d264f2 100644 --- a/crates/client/src/metrics.rs +++ b/crates/client/src/metrics.rs @@ -1,8 +1,10 @@ use crate::{AttachMetricLabels, CallType, callback_based, dbg_panic}; +use bytes::Bytes; use futures_util::{ FutureExt, TryFutureExt, future::{BoxFuture, Either}, }; +use http_body_util::BodyExt; use std::{ fmt, task::{Context, Poll}, @@ -11,13 +13,16 @@ use std::{ use temporalio_common::telemetry::{ TaskQueueLabelStrategy, metrics::{ - Counter, CounterBase, HistogramDuration, HistogramDurationBase, MetricAttributable, - MetricAttributes, MetricKeyValue, MetricParameters, TemporalMeter, + Counter, CounterBase, Histogram, HistogramBase, HistogramDuration, HistogramDurationBase, + MESSAGE_DIRECTION_REQUEST, MESSAGE_DIRECTION_RESPONSE, MetricAttributable, + MetricAttributes, MetricKeyValue, MetricParameters, TemporalMeter, 
message_direction, }, }; use tonic::{Code, body::Body, transport::Channel}; use tower::Service; +pub use temporalio_common::telemetry::metrics::RPC_MESSAGE_SIZE_HISTOGRAM_NAME; + /// The string name (which may be prefixed) for this metric pub static REQUEST_LATENCY_HISTOGRAM_NAME: &str = "request_latency"; /// The string name (which may be prefixed) for this metric @@ -40,6 +45,7 @@ struct Instruments { svc_request_latency: HistogramDuration, long_svc_request_latency: HistogramDuration, + rpc_message_size: Histogram, } impl MetricsContext { @@ -75,6 +81,11 @@ impl MetricsContext { unit: "duration".into(), description: "Histogram of client long-poll request latencies".into(), }), + rpc_message_size: tm.histogram(MetricParameters { + name: RPC_MESSAGE_SIZE_HISTOGRAM_NAME.into(), + unit: "By".into(), + description: "Histogram of client gRPC request and response body sizes".into(), + }), }; Self { poll_is_long: false, @@ -109,8 +120,14 @@ impl MetricsContext { .long_svc_request_latency .with_attributes(self.meter.get_default_attributes()) }) - .map(|v| { + .and_then(|v| { self.instruments.long_svc_request_latency = v; + self.instruments + .rpc_message_size + .with_attributes(self.meter.get_default_attributes()) + }) + .map(|v| { + self.instruments.rpc_message_size = v; }) .inspect_err(|e| { dbg_panic!("Failed to extend client metrics attributes: {:?}", e); @@ -166,6 +183,10 @@ impl MetricsContext { self.instruments.svc_request_latency.records(dur); } } + + pub(crate) fn rpc_message_size(&self, size_bytes: u64) { + self.instruments.rpc_message_size.records(size_bytes); + } } const KEY_NAMESPACE: &str = "namespace"; @@ -216,6 +237,44 @@ fn code_as_screaming_snake(code: &Code) -> &'static str { } } +struct BodySizeRecorder { + size_bytes: u64, + record: Box, +} + +impl BodySizeRecorder { + fn new(record: impl Fn(u64) + Send + Sync + 'static) -> Self { + Self { + size_bytes: 0, + record: Box::new(record), + } + } + + fn add_frame(&mut self, frame: &Bytes) { + 
self.size_bytes += frame.len() as u64; + } +} + +impl Drop for BodySizeRecorder { + fn drop(&mut self) { + (self.record)(self.size_bytes); + } +} + +fn body_with_size_recorder(body: Body, recorder: BodySizeRecorder) -> Body { + let mut recorder = recorder; + Body::new(body.map_frame(move |frame| { + if let Some(data) = frame.data_ref() { + recorder.add_frame(data); + } + frame + })) +} + +fn body_size_recorder(metrics: MetricsContext) -> BodySizeRecorder { + BodySizeRecorder::new(move |size_bytes| metrics.rpc_message_size(size_bytes)) +} + /// Implements metrics functionality for gRPC (really, any http) calls #[derive(Debug, Clone)] pub(crate) struct GrpcMetricSvc { @@ -294,6 +353,11 @@ impl Service> for GrpcMetricSvc { metrics }) }); + if let Some(metrics) = metrics.as_ref() { + let mut req_metrics = metrics.clone(); + req_metrics.with_new_attrs([message_direction(MESSAGE_DIRECTION_REQUEST)]); + req = req.map(|body| body_with_size_recorder(body, body_size_recorder(req_metrics))); + } let callfut = match &mut self.inner { ChannelOrGrpcOverride::Channel(inner) => { Either::Left(inner.call(req).map_err(Into::into)) @@ -306,7 +370,7 @@ impl Service> for GrpcMetricSvc { async move { let started = Instant::now(); let res = callfut.await; - if let Some(metrics) = metrics { + if let Some(metrics) = metrics.as_ref() { metrics.record_svc_req_latency(started.elapsed()); match res { Ok(ref ok_res) => { @@ -339,8 +403,44 @@ impl Service> for GrpcMetricSvc { } } } - res + match (res, metrics) { + (Ok(res), Some(metrics)) => { + let mut resp_metrics = metrics; + resp_metrics.with_new_attrs([message_direction(MESSAGE_DIRECTION_RESPONSE)]); + Ok(res.map(|body| { + body_with_size_recorder(body, body_size_recorder(resp_metrics)) + })) + } + (res, _) => res, + } } .boxed() } } + +#[cfg(test)] +mod tests { + use super::*; + use http_body_util::Full; + use std::sync::{ + Arc, + atomic::{AtomicU64, Ordering}, + }; + + #[tokio::test] + async fn body_size_recorder_counts_data_frames() { + 
let recorded = Arc::new(AtomicU64::new(0)); + let recorded_clone = recorded.clone(); + let body = Body::new(Full::new(Bytes::from_static(b"hello"))); + let body = body_with_size_recorder( + body, + BodySizeRecorder::new(move |size_bytes| { + recorded_clone.store(size_bytes, Ordering::Relaxed); + }), + ); + + let _ = body.collect().await.unwrap(); + + assert_eq!(recorded.load(Ordering::Relaxed), 5); + } +} diff --git a/crates/common/src/telemetry/metrics.rs b/crates/common/src/telemetry/metrics.rs index 6dc427216..12cd24276 100644 --- a/crates/common/src/telemetry/metrics.rs +++ b/crates/common/src/telemetry/metrics.rs @@ -28,6 +28,23 @@ pub const ACTIVITY_SCHED_TO_START_LATENCY_HISTOGRAM_NAME: &str = "activity_schedule_to_start_latency"; /// The string name (which may be prefixed) for this metric pub const ACTIVITY_EXEC_LATENCY_HISTOGRAM_NAME: &str = "activity_execution_latency"; +/// The string name (which may be prefixed) for this metric +pub const WORKFLOW_PAYLOAD_SIZE_HISTOGRAM_NAME: &str = "workflow_payload_size"; +/// The string name (which may be prefixed) for this metric +pub const ACTIVITY_PAYLOAD_SIZE_HISTOGRAM_NAME: &str = "activity_payload_size"; +/// The string name (which may be prefixed) for this metric +pub const RPC_MESSAGE_SIZE_HISTOGRAM_NAME: &str = "rpc_message_size"; + +const KEY_MESSAGE_DIRECTION: &str = "message_direction"; +/// Kept shared so all payload-size emitters use the same request direction label. +pub const MESSAGE_DIRECTION_REQUEST: &str = "request"; +/// Kept shared so all payload-size emitters use the same response direction label. +pub const MESSAGE_DIRECTION_RESPONSE: &str = "response"; + +/// Keeps the payload-size label key consistent across client and core emitters. +pub fn message_direction(direction: &'static str) -> MetricKeyValue { + MetricKeyValue::new(KEY_MESSAGE_DIRECTION, direction) +} /// Helps define buckets once in terms of millis, but also generates a seconds version macro_rules! 
define_latency_buckets { @@ -37,12 +54,7 @@ macro_rules! define_latency_buckets { pub(super) static $sec_name: &[f64] = &[$( $bucket / 1000.0, )*]; )* - /// Returns the default histogram buckets that lang should use for a given metric name if - /// they have not been overridden by the user. If `use_seconds` is true, returns buckets - /// in terms of seconds rather than milliseconds. - /// - /// The name must *not* be prefixed with `temporal_` - pub fn default_buckets_for(histo_name: &str, use_seconds: bool) -> &'static [f64] { + fn default_duration_buckets_for(histo_name: &str, use_seconds: bool) -> &'static [f64] { match histo_name { $( $metric_name => { if use_seconds { &$sec_name } else { &$name } }, @@ -104,6 +116,34 @@ define_latency_buckets!( ) ); +pub(super) static PAYLOAD_SIZE_BUCKETS: &[f64] = &[ + 128.0, + 512.0, + 1_024.0, + 4_096.0, + 16_384.0, + 65_536.0, + 262_144.0, + 1_048_576.0, + 4_194_304.0, + 16_777_216.0, + 67_108_864.0, +]; + +/// Returns the default histogram buckets that lang should use for a given metric name if they have +/// not been overridden by the user. If `use_seconds` is true, duration metric buckets are returned +/// in terms of seconds rather than milliseconds. +/// +/// The name must *not* be prefixed with `temporal_`. +pub fn default_buckets_for(histo_name: &str, use_seconds: bool) -> &'static [f64] { + match histo_name { + WORKFLOW_PAYLOAD_SIZE_HISTOGRAM_NAME + | ACTIVITY_PAYLOAD_SIZE_HISTOGRAM_NAME + | RPC_MESSAGE_SIZE_HISTOGRAM_NAME => PAYLOAD_SIZE_BUCKETS, + _ => default_duration_buckets_for(histo_name, use_seconds), + } +} + /// Implementors of this trait are expected to be defined in each language's bridge. /// The implementor is responsible for the allocation/instantiation of new metric meters which /// Core has requested. 
diff --git a/crates/common/src/telemetry/otel.rs b/crates/common/src/telemetry/otel.rs index 0668ee37f..b40abc3ba 100644 --- a/crates/common/src/telemetry/otel.rs +++ b/crates/common/src/telemetry/otel.rs @@ -2,11 +2,13 @@ use super::{ HistogramBucketOverrides, MetricTemporality, OtelCollectorOptions, OtlpProtocol, TELEM_SERVICE_NAME, metrics::{ - ACTIVITY_EXEC_LATENCY_HISTOGRAM_NAME, ACTIVITY_SCHED_TO_START_LATENCY_HISTOGRAM_NAME, - CoreMeter, Counter, DEFAULT_MS_BUCKETS, DEFAULT_S_BUCKETS, Gauge, GaugeF64, Histogram, - HistogramBase, HistogramDuration, HistogramDurationBase, HistogramF64, HistogramF64Base, - MetricAttributable, MetricAttributes, MetricParameters, NewAttributes, UpDownCounter, - WORKFLOW_E2E_LATENCY_HISTOGRAM_NAME, WORKFLOW_TASK_EXECUTION_LATENCY_HISTOGRAM_NAME, + ACTIVITY_EXEC_LATENCY_HISTOGRAM_NAME, ACTIVITY_PAYLOAD_SIZE_HISTOGRAM_NAME, + ACTIVITY_SCHED_TO_START_LATENCY_HISTOGRAM_NAME, CoreMeter, Counter, DEFAULT_MS_BUCKETS, + DEFAULT_S_BUCKETS, Gauge, GaugeF64, Histogram, HistogramBase, HistogramDuration, + HistogramDurationBase, HistogramF64, HistogramF64Base, MetricAttributable, + MetricAttributes, MetricParameters, NewAttributes, RPC_MESSAGE_SIZE_HISTOGRAM_NAME, + UpDownCounter, WORKFLOW_E2E_LATENCY_HISTOGRAM_NAME, WORKFLOW_PAYLOAD_SIZE_HISTOGRAM_NAME, + WORKFLOW_TASK_EXECUTION_LATENCY_HISTOGRAM_NAME, WORKFLOW_TASK_REPLAY_LATENCY_HISTOGRAM_NAME, WORKFLOW_TASK_SCHED_TO_START_LATENCY_HISTOGRAM_NAME, default_buckets_for, }, @@ -94,6 +96,15 @@ pub(super) fn augment_meter_provider_with_defaults( ACTIVITY_EXEC_LATENCY_HISTOGRAM_NAME, use_seconds, )); + mpb = mpb.with_view(histo_view( + WORKFLOW_PAYLOAD_SIZE_HISTOGRAM_NAME, + use_seconds, + )); + mpb = mpb.with_view(histo_view( + ACTIVITY_PAYLOAD_SIZE_HISTOGRAM_NAME, + use_seconds, + )); + mpb = mpb.with_view(histo_view(RPC_MESSAGE_SIZE_HISTOGRAM_NAME, use_seconds)); // Fallback default mpb = mpb.with_view(move |ins: &Instrument| { if ins.kind() == InstrumentKind::Histogram { diff --git 
a/crates/sdk-core/src/telemetry/metrics.rs b/crates/sdk-core/src/telemetry/metrics.rs index 98edbfed1..5a8a79f7e 100644 --- a/crates/sdk-core/src/telemetry/metrics.rs +++ b/crates/sdk-core/src/telemetry/metrics.rs @@ -9,6 +9,7 @@ use std::{ sync::{Arc, atomic::AtomicU64}, time::Duration, }; +pub(crate) use temporalio_common::telemetry::metrics::message_direction; use temporalio_common::{ protos::temporal::api::{enums::v1::WorkflowTaskFailedCause, failure::v1::Failure}, telemetry::metrics::{core::*, *}, @@ -40,12 +41,14 @@ struct Instruments { wf_task_sched_to_start_latency: HistogramDuration, wf_task_replay_latency: HistogramDuration, wf_task_execution_latency: HistogramDuration, + wf_payload_size: Histogram, act_poll_no_task: Counter, act_task_received_counter: Counter, act_execution_failed: Counter, act_sched_to_start_latency: HistogramDuration, act_exec_latency: HistogramDuration, act_exec_succeeded_latency: HistogramDuration, + act_payload_size: Histogram, la_execution_cancelled: Counter, la_execution_failed: Counter, la_exec_latency: HistogramDuration, @@ -118,7 +121,7 @@ impl MetricsContext { instruments.update_attributes(tm.get_default_attributes()); Self { instruments: Arc::new(instruments), - meter: self.meter.clone(), + meter: tm, in_memory_metrics: self.in_memory_metrics.clone(), } } @@ -182,6 +185,10 @@ impl MetricsContext { self.instruments.wf_task_replay_latency.records(dur); } + pub(crate) fn wf_payload_size(&self, size_bytes: u64) { + self.instruments.wf_payload_size.records(size_bytes); + } + /// An activity long poll timed out pub(crate) fn act_poll_timeout(&self) { self.instruments.act_poll_no_task.adds(1); @@ -213,6 +220,10 @@ impl MetricsContext { self.instruments.act_exec_latency.records(dur); } + pub(crate) fn act_payload_size(&self, size_bytes: u64) { + self.instruments.act_payload_size.records(size_bytes); + } + pub(crate) fn la_execution_cancelled(&self) { self.instruments.la_execution_cancelled.adds(1); } @@ -383,6 +394,12 @@ impl 
Instruments { unit: "duration".into(), description: "Histogram of workflow task execution (not replay) latencies".into(), }), + wf_payload_size: meter.histogram(MetricParameters { + name: WORKFLOW_PAYLOAD_SIZE_HISTOGRAM_NAME.into(), + unit: "By".into(), + description: "Histogram of workflow input and successful result payload sizes" + .into(), + }), act_poll_no_task: meter.counter(MetricParameters { name: "activity_poll_no_task".into(), description: "Count of activity task queue poll timeouts (no new task)".into(), @@ -414,6 +431,12 @@ impl Instruments { description: "Histogram of activity execution latencies for successful activities" .into(), }), + act_payload_size: meter.histogram(MetricParameters { + name: ACTIVITY_PAYLOAD_SIZE_HISTOGRAM_NAME.into(), + unit: "By".into(), + description: "Histogram of activity input and successful result payload sizes" + .into(), + }), la_execution_cancelled: meter.counter(MetricParameters { name: "local_activity_execution_cancelled".into(), description: "Count of local activity executions that were cancelled".into(), @@ -535,6 +558,8 @@ impl Instruments { .update_attributes(new_attributes.clone()); self.wf_task_execution_latency .update_attributes(new_attributes.clone()); + self.wf_payload_size + .update_attributes(new_attributes.clone()); self.act_poll_no_task .update_attributes(new_attributes.clone()); self.act_task_received_counter @@ -547,6 +572,8 @@ impl Instruments { .update_attributes(new_attributes.clone()); self.act_exec_succeeded_latency .update_attributes(new_attributes.clone()); + self.act_payload_size + .update_attributes(new_attributes.clone()); self.la_execution_cancelled .update_attributes(new_attributes.clone()); self.la_execution_failed @@ -737,6 +764,7 @@ pub(crate) fn activity_type(ty: String) -> MetricKeyValue { pub(crate) fn workflow_type(ty: String) -> MetricKeyValue { MetricKeyValue::new(KEY_WF_TYPE, ty) } + pub(crate) fn workflow_worker_type() -> MetricKeyValue { MetricKeyValue::new(KEY_WORKER_TYPE, 
"WorkflowWorker") } @@ -1187,7 +1215,7 @@ mod tests { a2.set(Arc::new(DummyCustomAttrs(2))).unwrap(); // Verify all metrics are created. This number will need to get updated any time a metric // is added. - let num_metrics = 35; + let num_metrics = 37; #[allow(clippy::needless_range_loop)] // Sorry clippy, this reads easier. for metric_num in 2..=num_metrics + 1 { let hole = assert_matches!(&events[metric_num], diff --git a/crates/sdk-core/src/worker/activities.rs b/crates/sdk-core/src/worker/activities.rs index e7bdc3d0d..bfccbe778 100644 --- a/crates/sdk-core/src/worker/activities.rs +++ b/crates/sdk-core/src/worker/activities.rs @@ -14,7 +14,8 @@ use crate::{ }, pollers::{BoxedActPoller, PermittedTqResp, TrackedPermittedTqResp, new_activity_task_poller}, telemetry::metrics::{ - MetricsContext, activity_type, eager, should_record_failure_metric, workflow_type, + MetricsContext, activity_type, eager, message_direction, should_record_failure_metric, + workflow_type, }, worker::{ ActivitySlotKind, PollError, @@ -36,16 +37,22 @@ use std::{ }, time::{Duration, Instant, SystemTime}, }; -use temporalio_common::protos::{ - coresdk::{ - ActivityHeartbeat, ActivitySlotInfo, - activity_result::{self as ar, activity_execution_result as aer}, - activity_task::{ActivityCancelReason, ActivityCancellationDetails, ActivityTask}, - }, - temporal::api::{ - failure::v1::{ApplicationFailureInfo, CanceledFailureInfo, Failure, failure::FailureInfo}, - workflowservice::v1::PollActivityTaskQueueResponse, +use temporalio_common::{ + protos::{ + coresdk::{ + ActivityHeartbeat, ActivitySlotInfo, + activity_result::{self as ar, activity_execution_result as aer}, + activity_task::{ActivityCancelReason, ActivityCancellationDetails, ActivityTask}, + }, + temporal::api::{ + common::v1::{Payload, Payloads}, + failure::v1::{ + ApplicationFailureInfo, CanceledFailureInfo, Failure, failure::FailureInfo, + }, + workflowservice::v1::PollActivityTaskQueueResponse, + }, }, + 
telemetry::metrics::{MESSAGE_DIRECTION_REQUEST, MESSAGE_DIRECTION_RESPONSE}, }; use tokio::{ join, @@ -325,9 +332,11 @@ impl WorkerActivityTasks { outstanding_activity_tasks.remove(&task_token) }; if let Some(act_info) = act_info { + let activity_type_name = act_info.base.activity_type.clone(); + let workflow_type_name = act_info.base.workflow_type.clone(); let act_metrics = self.metrics.with_new_attrs([ - activity_type(act_info.base.activity_type), - workflow_type(act_info.base.workflow_type), + activity_type(activity_type_name.clone()), + workflow_type(workflow_type_name.clone()), ]); Span::current().record("workflow_id", act_info.base.workflow_id); Span::current().record("run_id", act_info.base.workflow_run_id); @@ -352,6 +361,13 @@ impl WorkerActivityTasks { let maybe_net_err = match status { aer::Status::WillCompleteAsync(_) => None, aer::Status::Completed(ar::Success { result }) => { + self.metrics + .with_new_attrs([ + activity_type(activity_type_name.clone()), + workflow_type(workflow_type_name.clone()), + message_direction(MESSAGE_DIRECTION_RESPONSE), + ]) + .act_payload_size(maybe_payload_size(result.as_ref())); if let Some(sched_time) = act_info .base .scheduled_time @@ -557,6 +573,13 @@ where eager(is_eager), ]) .act_task_received(); + self.metrics + .with_new_attrs([ + activity_type(activity_type_name.to_owned()), + workflow_type(wf_type.name.clone()), + message_direction(MESSAGE_DIRECTION_REQUEST), + ]) + .act_payload_size(payloads_size(task.resp.input.as_ref())); } } // There could be an else statement here but since the response @@ -733,6 +756,25 @@ impl ActivitiesFromWFTsHandle { } } +fn payloads_size(payloads: Option<&Payloads>) -> u64 { + payloads + .map(|payloads| payloads.payloads.iter().map(payload_size).sum()) + .unwrap_or_default() +} + +fn maybe_payload_size(payload: Option<&Payload>) -> u64 { + payload.map(payload_size).unwrap_or_default() +} + +fn payload_size(payload: &Payload) -> u64 { + let metadata_size: usize = payload + .metadata + 
.iter() + .map(|(key, value)| key.len() + value.len()) + .sum(); + (payload.data.len() + metadata_size) as u64 +} + fn worker_shutdown_failure() -> Failure { Failure { message: "Worker is shutting down and this activity did not complete in time".to_string(), @@ -762,6 +804,19 @@ mod tests { use crossbeam_utils::atomic::AtomicCell; use temporalio_common::protos::coresdk::activity_result::ActivityExecutionResult; + #[test] + fn payload_size_counts_data_and_metadata_bytes() { + let payload = Payload { + metadata: HashMap::from([("encoding".to_owned(), b"json/plain".to_vec())]), + data: b"{\"job_id\":\"abc\"}".to_vec(), + ..Default::default() + }; + + let expected_size = payload.data.len() + "encoding".len() + b"json/plain".len(); + + assert_eq!(payload_size(&payload), expected_size as u64); + } + #[tokio::test] async fn per_worker_ratelimit() { let mut mock_client = mock_worker_client(); diff --git a/crates/sdk-core/src/worker/workflow/mod.rs b/crates/sdk-core/src/worker/workflow/mod.rs index 9c4b8bad9..9884a35ed 100644 --- a/crates/sdk-core/src/worker/workflow/mod.rs +++ b/crates/sdk-core/src/worker/workflow/mod.rs @@ -75,7 +75,7 @@ use temporalio_common::{ temporal::api::{ command::v1::{Command as ProtoCommand, Command, command::Attributes}, common::v1::{ - Memo, MeteringMetadata, RetryPolicy, SearchAttributes, WorkflowExecution, + Memo, MeteringMetadata, Payload, RetryPolicy, SearchAttributes, WorkflowExecution, }, enums::v1::{VersioningBehavior, WorkflowTaskFailedCause}, failure::v1::{ApplicationFailureInfo, failure::FailureInfo}, @@ -86,7 +86,10 @@ use temporalio_common::{ workflowservice::v1::{PollActivityTaskQueueResponse, get_system_info_response}, }, }, - telemetry::set_trace_subscriber_for_current_thread, + telemetry::{ + metrics::{MESSAGE_DIRECTION_REQUEST, MESSAGE_DIRECTION_RESPONSE}, + set_trace_subscriber_for_current_thread, + }, }; use tokio::{ sync::{ @@ -287,6 +290,7 @@ impl Workflows { match al { ActivationOrAuto::LangActivation(mut act) | 
ActivationOrAuto::ReadyForQueries(mut act) => { + record_workflow_activation_payload_sizes(&self.metrics, &act); prepare_to_ship_activation(&mut act); debug!(activation=%act, "Sending activation to lang"); break Ok(act); @@ -370,6 +374,7 @@ impl Workflows { } else { 0 }; + record_workflow_completion_payload_sizes(&run_metrics, &commands); let mut maybe_record_terminal_metric = detect_terminal_command(&commands); let mut completion = WorkflowTaskCompletion { task_token: task_token.clone(), @@ -1709,6 +1714,52 @@ fn prepare_to_ship_activation(wfa: &mut WorkflowActivation) { }); } +fn record_workflow_activation_payload_sizes( + metric_context: &MetricsContext, + wfa: &WorkflowActivation, +) { + for job in &wfa.jobs { + if let Some(workflow_activation_job::Variant::InitializeWorkflow(init)) = &job.variant { + metric_context + .with_new_attrs([ + metrics::workflow_type(init.workflow_type.clone()), + metrics::message_direction(MESSAGE_DIRECTION_REQUEST), + ]) + .wf_payload_size(payloads_size(&init.arguments)); + } + } +} + +fn record_workflow_completion_payload_sizes(metric_context: &MetricsContext, commands: &[Command]) { + for command in commands { + if let Some(Attributes::CompleteWorkflowExecutionCommandAttributes(complete)) = + &command.attributes + { + metric_context + .with_new_attrs([metrics::message_direction(MESSAGE_DIRECTION_RESPONSE)]) + .wf_payload_size( + complete + .result + .as_ref() + .map_or(0, |payloads| payloads_size(&payloads.payloads)), + ); + } + } +} + +fn payloads_size(payloads: &[Payload]) -> u64 { + payloads.iter().map(payload_size).sum() +} + +fn payload_size(payload: &Payload) -> u64 { + let metadata_size: usize = payload + .metadata + .iter() + .map(|(key, value)| key.len() + value.len()) + .sum(); + (payload.data.len() + metadata_size) as u64 +} + fn make_grpc_message_too_large_failure() -> Failure { Failure { failure: Some( diff --git a/crates/sdk-core/tests/integ_tests/metrics_tests.rs b/crates/sdk-core/tests/integ_tests/metrics_tests.rs 
index 782549977..7446d13b1 100644 --- a/crates/sdk-core/tests/integ_tests/metrics_tests.rs +++ b/crates/sdk-core/tests/integ_tests/metrics_tests.rs @@ -64,8 +64,8 @@ use temporalio_common::{ }; use temporalio_macros::{activities, workflow, workflow_methods}; use temporalio_sdk::{ - ActivityOptions, CancellableFuture, LocalActivityOptions, NexusOperationOptions, - WorkflowContext, WorkflowResult, + ActivityOptions, CancellableFuture, ChildWorkflowOptions, LocalActivityOptions, + NexusOperationOptions, WorkflowContext, WorkflowResult, activities::{ActivityContext, ActivityError}, }; use temporalio_sdk_core::{ @@ -87,6 +87,16 @@ pub(crate) async fn get_text(endpoint: String) -> String { reqwest::get(endpoint).await.unwrap().text().await.unwrap() } +fn metric_value(body: &str, metric_prefix: &str) -> f64 { + body.lines() + .find(|line| line.starts_with(metric_prefix)) + .and_then(|line| line.rsplit_once(' ')) + .unwrap_or_else(|| panic!("missing metric line starting with {metric_prefix}")) + .1 + .parse() + .unwrap_or_else(|err| panic!("metric value for {metric_prefix} should parse: {err}")) +} + #[rstest::rstest] #[tokio::test] async fn prometheus_metrics_exported( @@ -939,6 +949,40 @@ async fn activity_metrics() { namespace=\"{NAMESPACE}\",service_name=\"temporal-core-sdk\",\ task_queue=\"{task_queue}\",workflow_type=\"{wf_type}\"}} 1" ))); + assert!(body.contains(&format!( + "temporal_activity_payload_size_count{{activity_type=\"pass_fail_act\",\ + message_direction=\"request\",namespace=\"{NAMESPACE}\",\ + service_name=\"temporal-core-sdk\",task_queue=\"{task_queue}\",\ + workflow_type=\"{wf_type}\"}} 2" + ))); + assert!(body.contains(&format!( + "temporal_activity_payload_size_count{{activity_type=\"pass_fail_act\",\ + message_direction=\"response\",namespace=\"{NAMESPACE}\",\ + service_name=\"temporal-core-sdk\",task_queue=\"{task_queue}\",\ + workflow_type=\"{wf_type}\"}} 1" + ))); + assert!( + metric_value( + &body, + &format!( + 
"temporal_activity_payload_size_sum{{activity_type=\"pass_fail_act\",\ + message_direction=\"request\",namespace=\"{NAMESPACE}\",\ + service_name=\"temporal-core-sdk\",task_queue=\"{task_queue}\",\ + workflow_type=\"{wf_type}\"}}" + ) + ) > 0.0 + ); + assert!( + metric_value( + &body, + &format!( + "temporal_activity_payload_size_sum{{activity_type=\"pass_fail_act\",\ + message_direction=\"response\",namespace=\"{NAMESPACE}\",\ + service_name=\"temporal-core-sdk\",task_queue=\"{task_queue}\",\ + workflow_type=\"{wf_type}\"}}" + ) + ) > 0.0 + ); assert!(body.contains(&format!( "temporal_local_activity_total{{activity_type=\"pass_fail_act\",namespace=\"{NAMESPACE}\",\ @@ -971,6 +1015,100 @@ async fn activity_metrics() { ))); } +#[tokio::test] +async fn payload_size_metrics() { + let (telemopts, addr, _aborter) = prom_metrics(None); + let rt = CoreRuntime::new_assume_tokio(get_integ_runtime_options(telemopts)).unwrap(); + let wf_name = "payload_size_metrics"; + let mut starter = CoreWfStarter::new_with_runtime(wf_name, rt); + starter.sdk_config.task_types = WorkerTaskTypes::workflow_only(); + let mut worker = starter.worker().await; + + #[workflow] + #[derive(Default)] + struct PayloadMetricsChild; + + #[workflow_methods] + impl PayloadMetricsChild { + #[run] + async fn run(_ctx: &mut WorkflowContext, input: String) -> WorkflowResult { + Ok(format!("child-result:{input}")) + } + } + + #[workflow] + #[derive(Default)] + struct PayloadMetricsParent; + + #[workflow_methods] + impl PayloadMetricsParent { + #[run] + async fn run(ctx: &mut WorkflowContext, input: String) -> WorkflowResult { + let started = ctx + .child_workflow( + PayloadMetricsChild::run, + "child-input".to_string(), + ChildWorkflowOptions { + workflow_id: "payload-size-child".to_string(), + ..Default::default() + }, + ) + .await + .expect("child workflow should start"); + let child_result = started.result().await?; + Ok(format!("parent-result:{input}:{child_result}")) + } + } + + 
worker.register_workflow::(); + worker.register_workflow::(); + let task_queue = starter.get_task_queue().to_owned(); + let workflow_id = wf_name.to_owned(); + worker + .submit_workflow( + PayloadMetricsParent::run, + "parent-input".to_string(), + WorkflowStartOptions::new(task_queue.clone(), workflow_id).build(), + ) + .await + .unwrap(); + worker.run_until_done().await.unwrap(); + + let body = get_text(format!("http://{addr}/metrics")).await; + let parent_wf_type = PayloadMetricsParent::name(); + let child_wf_type = PayloadMetricsChild::name(); + assert!(body.contains(&format!( + "temporal_workflow_payload_size_count{{message_direction=\"request\",\ + namespace=\"{NAMESPACE}\",service_name=\"temporal-core-sdk\",\ + task_queue=\"{task_queue}\",workflow_type=\"{parent_wf_type}\"}} 1" + ))); + assert!(body.contains(&format!( + "temporal_workflow_payload_size_count{{message_direction=\"response\",\ + namespace=\"{NAMESPACE}\",service_name=\"temporal-core-sdk\",\ + task_queue=\"{task_queue}\",workflow_type=\"{parent_wf_type}\"}} 1" + ))); + assert!(body.contains(&format!( + "temporal_workflow_payload_size_count{{message_direction=\"request\",\ + namespace=\"{NAMESPACE}\",service_name=\"temporal-core-sdk\",\ + task_queue=\"{task_queue}\",workflow_type=\"{child_wf_type}\"}} 1" + ))); + assert!(body.contains(&format!( + "temporal_workflow_payload_size_count{{message_direction=\"response\",\ + namespace=\"{NAMESPACE}\",service_name=\"temporal-core-sdk\",\ + task_queue=\"{task_queue}\",workflow_type=\"{child_wf_type}\"}} 1" + ))); + assert!(body.contains(&format!( + "temporal_rpc_message_size_count{{message_direction=\"request\",\ + namespace=\"{NAMESPACE}\",operation=\"RespondWorkflowTaskCompleted\",\ + service_name=\"temporal-core-sdk\"}} " + ))); + assert!(body.contains(&format!( + "temporal_rpc_message_size_count{{message_direction=\"response\",\ + namespace=\"{NAMESPACE}\",operation=\"RespondWorkflowTaskCompleted\",\ + service_name=\"temporal-core-sdk\"}} " + ))); +} 
+ #[tokio::test] async fn nexus_metrics() { let (telemopts, addr, _aborter) = prom_metrics(None);