diff --git a/crates/client/src/lib.rs b/crates/client/src/lib.rs index 906469137..bd93883da 100644 --- a/crates/client/src/lib.rs +++ b/crates/client/src/lib.rs @@ -36,7 +36,10 @@ pub use async_activity_handle::{ ActivityHeartbeatResponse, ActivityIdentifier, AsyncActivityHandle, }; -pub use metrics::{LONG_REQUEST_LATENCY_HISTOGRAM_NAME, REQUEST_LATENCY_HISTOGRAM_NAME}; +pub use metrics::{ + LONG_REQUEST_LATENCY_HISTOGRAM_NAME, REQUEST_LATENCY_HISTOGRAM_NAME, + RPC_MESSAGE_SIZE_HISTOGRAM_NAME, +}; pub use options_structs::*; pub use replaceable::SharedReplaceableClient; pub use retry::RetryOptions; diff --git a/crates/client/src/metrics.rs b/crates/client/src/metrics.rs index 92e3ba778..fc2d264f2 100644 --- a/crates/client/src/metrics.rs +++ b/crates/client/src/metrics.rs @@ -1,8 +1,10 @@ use crate::{AttachMetricLabels, CallType, callback_based, dbg_panic}; +use bytes::Bytes; use futures_util::{ FutureExt, TryFutureExt, future::{BoxFuture, Either}, }; +use http_body_util::BodyExt; use std::{ fmt, task::{Context, Poll}, @@ -11,13 +13,16 @@ use std::{ use temporalio_common::telemetry::{ TaskQueueLabelStrategy, metrics::{ - Counter, CounterBase, HistogramDuration, HistogramDurationBase, MetricAttributable, - MetricAttributes, MetricKeyValue, MetricParameters, TemporalMeter, + Counter, CounterBase, Histogram, HistogramBase, HistogramDuration, HistogramDurationBase, + MESSAGE_DIRECTION_REQUEST, MESSAGE_DIRECTION_RESPONSE, MetricAttributable, + MetricAttributes, MetricKeyValue, MetricParameters, TemporalMeter, message_direction, }, }; use tonic::{Code, body::Body, transport::Channel}; use tower::Service; +pub use temporalio_common::telemetry::metrics::RPC_MESSAGE_SIZE_HISTOGRAM_NAME; + /// The string name (which may be prefixed) for this metric pub static REQUEST_LATENCY_HISTOGRAM_NAME: &str = "request_latency"; /// The string name (which may be prefixed) for this metric @@ -40,6 +45,7 @@ struct Instruments { svc_request_latency: HistogramDuration, long_svc_request_latency: HistogramDuration, + rpc_message_size: Histogram, } impl MetricsContext { @@ -75,6 +81,11 @@ impl MetricsContext { unit: "duration".into(), description: "Histogram of client long-poll request latencies".into(), }), + rpc_message_size: tm.histogram(MetricParameters { + name: RPC_MESSAGE_SIZE_HISTOGRAM_NAME.into(), + unit: "By".into(), + description: "Histogram of client gRPC request and response body sizes".into(), + }), }; Self { poll_is_long: false, @@ -109,8 +120,14 @@ impl MetricsContext { .long_svc_request_latency .with_attributes(self.meter.get_default_attributes()) }) - .map(|v| { + .and_then(|v| { self.instruments.long_svc_request_latency = v; + self.instruments + .rpc_message_size + .with_attributes(self.meter.get_default_attributes()) + }) + .map(|v| { + self.instruments.rpc_message_size = v; }) .inspect_err(|e| { dbg_panic!("Failed to extend client metrics attributes: {:?}", e); @@ -166,6 +183,10 @@ impl MetricsContext { self.instruments.svc_request_latency.records(dur); } } + + pub(crate) fn rpc_message_size(&self, size_bytes: u64) { + self.instruments.rpc_message_size.records(size_bytes); + } } const KEY_NAMESPACE: &str = "namespace"; @@ -216,6 +237,44 @@ fn code_as_screaming_snake(code: &Code) -> &'static str { } } +struct BodySizeRecorder { + size_bytes: u64, + record: Box, +} + +impl BodySizeRecorder { + fn new(record: impl Fn(u64) + Send + Sync + 'static) -> Self { + Self { + size_bytes: 0, + record: Box::new(record), + } + } + + fn add_frame(&mut self, frame: &Bytes) { + self.size_bytes += frame.len() as u64; + } +} + +impl Drop for BodySizeRecorder { + fn drop(&mut self) { + (self.record)(self.size_bytes); + } +} + +fn body_with_size_recorder(body: Body, recorder: BodySizeRecorder) -> Body { + let mut recorder = recorder; + Body::new(body.map_frame(move |frame| { + if let Some(data) = frame.data_ref() { + recorder.add_frame(data); + } + frame + })) +} + +fn body_size_recorder(metrics: MetricsContext) -> BodySizeRecorder { + BodySizeRecorder::new(move |size_bytes| metrics.rpc_message_size(size_bytes)) +} + /// Implements metrics functionality for gRPC (really, any http) calls #[derive(Debug, Clone)] pub(crate) struct GrpcMetricSvc { @@ -294,6 +353,11 @@ impl Service> for GrpcMetricSvc { metrics }) }); + if let Some(metrics) = metrics.as_ref() { + let mut req_metrics = metrics.clone(); + req_metrics.with_new_attrs([message_direction(MESSAGE_DIRECTION_REQUEST)]); + req = req.map(|body| body_with_size_recorder(body, body_size_recorder(req_metrics))); + } let callfut = match &mut self.inner { ChannelOrGrpcOverride::Channel(inner) => { Either::Left(inner.call(req).map_err(Into::into)) @@ -306,7 +370,7 @@ impl Service> for GrpcMetricSvc { async move { let started = Instant::now(); let res = callfut.await; - if let Some(metrics) = metrics { + if let Some(metrics) = metrics.as_ref() { metrics.record_svc_req_latency(started.elapsed()); match res { Ok(ref ok_res) => { @@ -339,8 +403,44 @@ impl Service> for GrpcMetricSvc { } } } - res + match (res, metrics) { + (Ok(res), Some(metrics)) => { + let mut resp_metrics = metrics; + resp_metrics.with_new_attrs([message_direction(MESSAGE_DIRECTION_RESPONSE)]); + Ok(res.map(|body| { + body_with_size_recorder(body, body_size_recorder(resp_metrics)) + })) + } + (res, _) => res, + } } .boxed() } } + +#[cfg(test)] +mod tests { + use super::*; + use http_body_util::Full; + use std::sync::{ + Arc, + atomic::{AtomicU64, Ordering}, + }; + + #[tokio::test] + async fn body_size_recorder_counts_data_frames() { + let recorded = Arc::new(AtomicU64::new(0)); + let recorded_clone = recorded.clone(); + let body = Body::new(Full::new(Bytes::from_static(b"hello"))); + let body = body_with_size_recorder( + body, + BodySizeRecorder::new(move |size_bytes| { + recorded_clone.store(size_bytes, Ordering::Relaxed); + }), + ); + + let _ = body.collect().await.unwrap(); + + assert_eq!(recorded.load(Ordering::Relaxed), 5); + } +} diff --git a/crates/common/src/telemetry/metrics.rs b/crates/common/src/telemetry/metrics.rs index 6dc427216..12cd24276 100644 --- a/crates/common/src/telemetry/metrics.rs +++ b/crates/common/src/telemetry/metrics.rs @@ -28,6 +28,23 @@ pub const ACTIVITY_SCHED_TO_START_LATENCY_HISTOGRAM_NAME: &str = "activity_schedule_to_start_latency"; /// The string name (which may be prefixed) for this metric pub const ACTIVITY_EXEC_LATENCY_HISTOGRAM_NAME: &str = "activity_execution_latency"; +/// The string name (which may be prefixed) for this metric +pub const WORKFLOW_PAYLOAD_SIZE_HISTOGRAM_NAME: &str = "workflow_payload_size"; +/// The string name (which may be prefixed) for this metric +pub const ACTIVITY_PAYLOAD_SIZE_HISTOGRAM_NAME: &str = "activity_payload_size"; +/// The string name (which may be prefixed) for this metric +pub const RPC_MESSAGE_SIZE_HISTOGRAM_NAME: &str = "rpc_message_size"; + +const KEY_MESSAGE_DIRECTION: &str = "message_direction"; +/// Kept shared so all payload-size emitters use the same request direction label. +pub const MESSAGE_DIRECTION_REQUEST: &str = "request"; +/// Kept shared so all payload-size emitters use the same response direction label. +pub const MESSAGE_DIRECTION_RESPONSE: &str = "response"; + +/// Keeps the payload-size label key consistent across client and core emitters. +pub fn message_direction(direction: &'static str) -> MetricKeyValue { + MetricKeyValue::new(KEY_MESSAGE_DIRECTION, direction) +} /// Helps define buckets once in terms of millis, but also generates a seconds version macro_rules! define_latency_buckets { @@ -37,12 +54,7 @@ macro_rules! define_latency_buckets { pub(super) static $sec_name: &[f64] = &[$( $bucket / 1000.0, )*]; )* - /// Returns the default histogram buckets that lang should use for a given metric name if - /// they have not been overridden by the user. If `use_seconds` is true, returns buckets - /// in terms of seconds rather than milliseconds. - /// - /// The name must *not* be prefixed with `temporal_` - pub fn default_buckets_for(histo_name: &str, use_seconds: bool) -> &'static [f64] { + fn default_duration_buckets_for(histo_name: &str, use_seconds: bool) -> &'static [f64] { match histo_name { $( $metric_name => { if use_seconds { &$sec_name } else { &$name } }, @@ -104,6 +116,34 @@ define_latency_buckets!( ) ); +pub(super) static PAYLOAD_SIZE_BUCKETS: &[f64] = &[ + 128.0, + 512.0, + 1_024.0, + 4_096.0, + 16_384.0, + 65_536.0, + 262_144.0, + 1_048_576.0, + 4_194_304.0, + 16_777_216.0, + 67_108_864.0, +]; + +/// Returns the default histogram buckets that lang should use for a given metric name if they have +/// not been overridden by the user. If `use_seconds` is true, duration metric buckets are returned +/// in terms of seconds rather than milliseconds. +/// +/// The name must *not* be prefixed with `temporal_`. +pub fn default_buckets_for(histo_name: &str, use_seconds: bool) -> &'static [f64] { + match histo_name { + WORKFLOW_PAYLOAD_SIZE_HISTOGRAM_NAME + | ACTIVITY_PAYLOAD_SIZE_HISTOGRAM_NAME + | RPC_MESSAGE_SIZE_HISTOGRAM_NAME => PAYLOAD_SIZE_BUCKETS, + _ => default_duration_buckets_for(histo_name, use_seconds), + } +} + /// Implementors of this trait are expected to be defined in each language's bridge. /// The implementor is responsible for the allocation/instantiation of new metric meters which /// Core has requested. diff --git a/crates/common/src/telemetry/otel.rs b/crates/common/src/telemetry/otel.rs index 0668ee37f..b40abc3ba 100644 --- a/crates/common/src/telemetry/otel.rs +++ b/crates/common/src/telemetry/otel.rs @@ -2,11 +2,13 @@ use super::{ HistogramBucketOverrides, MetricTemporality, OtelCollectorOptions, OtlpProtocol, TELEM_SERVICE_NAME, metrics::{ - ACTIVITY_EXEC_LATENCY_HISTOGRAM_NAME, ACTIVITY_SCHED_TO_START_LATENCY_HISTOGRAM_NAME, - CoreMeter, Counter, DEFAULT_MS_BUCKETS, DEFAULT_S_BUCKETS, Gauge, GaugeF64, Histogram, - HistogramBase, HistogramDuration, HistogramDurationBase, HistogramF64, HistogramF64Base, - MetricAttributable, MetricAttributes, MetricParameters, NewAttributes, UpDownCounter, - WORKFLOW_E2E_LATENCY_HISTOGRAM_NAME, WORKFLOW_TASK_EXECUTION_LATENCY_HISTOGRAM_NAME, + ACTIVITY_EXEC_LATENCY_HISTOGRAM_NAME, ACTIVITY_PAYLOAD_SIZE_HISTOGRAM_NAME, + ACTIVITY_SCHED_TO_START_LATENCY_HISTOGRAM_NAME, CoreMeter, Counter, DEFAULT_MS_BUCKETS, + DEFAULT_S_BUCKETS, Gauge, GaugeF64, Histogram, HistogramBase, HistogramDuration, + HistogramDurationBase, HistogramF64, HistogramF64Base, MetricAttributable, + MetricAttributes, MetricParameters, NewAttributes, RPC_MESSAGE_SIZE_HISTOGRAM_NAME, + UpDownCounter, WORKFLOW_E2E_LATENCY_HISTOGRAM_NAME, WORKFLOW_PAYLOAD_SIZE_HISTOGRAM_NAME, + WORKFLOW_TASK_EXECUTION_LATENCY_HISTOGRAM_NAME, WORKFLOW_TASK_REPLAY_LATENCY_HISTOGRAM_NAME, WORKFLOW_TASK_SCHED_TO_START_LATENCY_HISTOGRAM_NAME, default_buckets_for, }, @@ -94,6 +96,15 @@ pub(super) fn augment_meter_provider_with_defaults( ACTIVITY_EXEC_LATENCY_HISTOGRAM_NAME, use_seconds, )); + mpb = mpb.with_view(histo_view( + WORKFLOW_PAYLOAD_SIZE_HISTOGRAM_NAME, + use_seconds, + )); + mpb = mpb.with_view(histo_view( + ACTIVITY_PAYLOAD_SIZE_HISTOGRAM_NAME, + use_seconds, + )); + mpb = mpb.with_view(histo_view(RPC_MESSAGE_SIZE_HISTOGRAM_NAME, use_seconds)); // Fallback default mpb = mpb.with_view(move |ins: &Instrument| { if ins.kind() == InstrumentKind::Histogram { diff --git a/crates/sdk-core/src/telemetry/metrics.rs b/crates/sdk-core/src/telemetry/metrics.rs index 98edbfed1..5a8a79f7e 100644 --- a/crates/sdk-core/src/telemetry/metrics.rs +++ b/crates/sdk-core/src/telemetry/metrics.rs @@ -9,6 +9,7 @@ use std::{ sync::{Arc, atomic::AtomicU64}, time::Duration, }; +pub(crate) use temporalio_common::telemetry::metrics::message_direction; use temporalio_common::{ protos::temporal::api::{enums::v1::WorkflowTaskFailedCause, failure::v1::Failure}, telemetry::metrics::{core::*, *}, @@ -40,12 +41,14 @@ struct Instruments { wf_task_sched_to_start_latency: HistogramDuration, wf_task_replay_latency: HistogramDuration, wf_task_execution_latency: HistogramDuration, + wf_payload_size: Histogram, act_poll_no_task: Counter, act_task_received_counter: Counter, act_execution_failed: Counter, act_sched_to_start_latency: HistogramDuration, act_exec_latency: HistogramDuration, act_exec_succeeded_latency: HistogramDuration, + act_payload_size: Histogram, la_execution_cancelled: Counter, la_execution_failed: Counter, la_exec_latency: HistogramDuration, @@ -118,7 +121,7 @@ impl MetricsContext { instruments.update_attributes(tm.get_default_attributes()); Self { instruments: Arc::new(instruments), - meter: self.meter.clone(), + meter: tm, in_memory_metrics: self.in_memory_metrics.clone(), } } @@ -182,6 +185,10 @@ impl MetricsContext { self.instruments.wf_task_replay_latency.records(dur); } + pub(crate) fn wf_payload_size(&self, size_bytes: u64) { + self.instruments.wf_payload_size.records(size_bytes); + } + /// An activity long poll timed out pub(crate) fn act_poll_timeout(&self) { self.instruments.act_poll_no_task.adds(1); @@ -213,6 +220,10 @@ impl MetricsContext { self.instruments.act_exec_latency.records(dur); } + pub(crate) fn act_payload_size(&self, size_bytes: u64) { + self.instruments.act_payload_size.records(size_bytes); + } + pub(crate) fn la_execution_cancelled(&self) { self.instruments.la_execution_cancelled.adds(1); } @@ -383,6 +394,12 @@ impl Instruments { unit: "duration".into(), description: "Histogram of workflow task execution (not replay) latencies".into(), }), + wf_payload_size: meter.histogram(MetricParameters { + name: WORKFLOW_PAYLOAD_SIZE_HISTOGRAM_NAME.into(), + unit: "By".into(), + description: "Histogram of workflow input and successful result payload sizes" + .into(), + }), act_poll_no_task: meter.counter(MetricParameters { name: "activity_poll_no_task".into(), description: "Count of activity task queue poll timeouts (no new task)".into(), @@ -414,6 +431,12 @@ impl Instruments { description: "Histogram of activity execution latencies for successful activities" .into(), }), + act_payload_size: meter.histogram(MetricParameters { + name: ACTIVITY_PAYLOAD_SIZE_HISTOGRAM_NAME.into(), + unit: "By".into(), + description: "Histogram of activity input and successful result payload sizes" + .into(), + }), la_execution_cancelled: meter.counter(MetricParameters { name: "local_activity_execution_cancelled".into(), description: "Count of local activity executions that were cancelled".into(), @@ -535,6 +558,8 @@ impl Instruments { .update_attributes(new_attributes.clone()); self.wf_task_execution_latency .update_attributes(new_attributes.clone()); + self.wf_payload_size + .update_attributes(new_attributes.clone()); self.act_poll_no_task .update_attributes(new_attributes.clone()); self.act_task_received_counter @@ -547,6 +572,8 @@ impl Instruments { .update_attributes(new_attributes.clone()); self.act_exec_succeeded_latency .update_attributes(new_attributes.clone()); + self.act_payload_size + .update_attributes(new_attributes.clone()); self.la_execution_cancelled .update_attributes(new_attributes.clone()); self.la_execution_failed @@ -737,6 +764,7 @@ pub(crate) fn activity_type(ty: String) -> MetricKeyValue { pub(crate) fn workflow_type(ty: String) -> MetricKeyValue { MetricKeyValue::new(KEY_WF_TYPE, ty) } + pub(crate) fn workflow_worker_type() -> MetricKeyValue { MetricKeyValue::new(KEY_WORKER_TYPE, "WorkflowWorker") } @@ -1187,7 +1215,7 @@ mod tests { a2.set(Arc::new(DummyCustomAttrs(2))).unwrap(); // Verify all metrics are created. This number will need to get updated any time a metric // is added. - let num_metrics = 35; + let num_metrics = 37; #[allow(clippy::needless_range_loop)] // Sorry clippy, this reads easier. for metric_num in 2..=num_metrics + 1 { let hole = assert_matches!(&events[metric_num], diff --git a/crates/sdk-core/src/worker/activities.rs b/crates/sdk-core/src/worker/activities.rs index e7bdc3d0d..bfccbe778 100644 --- a/crates/sdk-core/src/worker/activities.rs +++ b/crates/sdk-core/src/worker/activities.rs @@ -14,7 +14,8 @@ use crate::{ }, pollers::{BoxedActPoller, PermittedTqResp, TrackedPermittedTqResp, new_activity_task_poller}, telemetry::metrics::{ - MetricsContext, activity_type, eager, should_record_failure_metric, workflow_type, + MetricsContext, activity_type, eager, message_direction, should_record_failure_metric, + workflow_type, }, worker::{ ActivitySlotKind, PollError, @@ -36,16 +37,22 @@ use std::{ }, time::{Duration, Instant, SystemTime}, }; -use temporalio_common::protos::{ - coresdk::{ - ActivityHeartbeat, ActivitySlotInfo, - activity_result::{self as ar, activity_execution_result as aer}, - activity_task::{ActivityCancelReason, ActivityCancellationDetails, ActivityTask}, - }, - temporal::api::{ - failure::v1::{ApplicationFailureInfo, CanceledFailureInfo, Failure, failure::FailureInfo}, - workflowservice::v1::PollActivityTaskQueueResponse, +use temporalio_common::{ + protos::{ + coresdk::{ + ActivityHeartbeat, ActivitySlotInfo, + activity_result::{self as ar, activity_execution_result as aer}, + activity_task::{ActivityCancelReason, ActivityCancellationDetails, ActivityTask}, + }, + temporal::api::{ + common::v1::{Payload, Payloads}, + failure::v1::{ + ApplicationFailureInfo, CanceledFailureInfo, Failure, failure::FailureInfo, + }, + workflowservice::v1::PollActivityTaskQueueResponse, + }, }, + telemetry::metrics::{MESSAGE_DIRECTION_REQUEST, MESSAGE_DIRECTION_RESPONSE}, }; use tokio::{ join, @@ -325,9 +332,11 @@ impl WorkerActivityTasks { outstanding_activity_tasks.remove(&task_token) }; if let Some(act_info) = act_info { + let activity_type_name = act_info.base.activity_type.clone(); + let workflow_type_name = act_info.base.workflow_type.clone(); let act_metrics = self.metrics.with_new_attrs([ - activity_type(act_info.base.activity_type), - workflow_type(act_info.base.workflow_type), + activity_type(activity_type_name.clone()), + workflow_type(workflow_type_name.clone()), ]); Span::current().record("workflow_id", act_info.base.workflow_id); Span::current().record("run_id", act_info.base.workflow_run_id); @@ -352,6 +361,13 @@ impl WorkerActivityTasks { let maybe_net_err = match status { aer::Status::WillCompleteAsync(_) => None, aer::Status::Completed(ar::Success { result }) => { + self.metrics + .with_new_attrs([ + activity_type(activity_type_name.clone()), + workflow_type(workflow_type_name.clone()), + message_direction(MESSAGE_DIRECTION_RESPONSE), + ]) + .act_payload_size(maybe_payload_size(result.as_ref())); if let Some(sched_time) = act_info .base .scheduled_time @@ -557,6 +573,13 @@ where eager(is_eager), ]) .act_task_received(); + self.metrics + .with_new_attrs([ + activity_type(activity_type_name.to_owned()), + workflow_type(wf_type.name.clone()), + message_direction(MESSAGE_DIRECTION_REQUEST), + ]) + .act_payload_size(payloads_size(task.resp.input.as_ref())); } } // There could be an else statement here but since the response @@ -733,6 +756,25 @@ impl ActivitiesFromWFTsHandle { } } +fn payloads_size(payloads: Option<&Payloads>) -> u64 { + payloads + .map(|payloads| payloads.payloads.iter().map(payload_size).sum()) + .unwrap_or_default() +} + +fn maybe_payload_size(payload: Option<&Payload>) -> u64 { + payload.map(payload_size).unwrap_or_default() +} + +fn payload_size(payload: &Payload) -> u64 { + let metadata_size: usize = payload + .metadata + .iter() + .map(|(key, value)| key.len() + value.len()) + .sum(); + (payload.data.len() + metadata_size) as u64 +} + fn worker_shutdown_failure() -> Failure { Failure { message: "Worker is shutting down and this activity did not complete in time".to_string(), @@ -762,6 +804,19 @@ mod tests { use crossbeam_utils::atomic::AtomicCell; use temporalio_common::protos::coresdk::activity_result::ActivityExecutionResult; + #[test] + fn payload_size_counts_data_and_metadata_bytes() { + let payload = Payload { + metadata: HashMap::from([("encoding".to_owned(), b"json/plain".to_vec())]), + data: b"{\"job_id\":\"abc\"}".to_vec(), + ..Default::default() + }; + + let expected_size = payload.data.len() + "encoding".len() + b"json/plain".len(); + + assert_eq!(payload_size(&payload), expected_size as u64); + } + #[tokio::test] async fn per_worker_ratelimit() { let mut mock_client = mock_worker_client(); diff --git a/crates/sdk-core/src/worker/workflow/mod.rs b/crates/sdk-core/src/worker/workflow/mod.rs index 9c4b8bad9..9884a35ed 100644 --- a/crates/sdk-core/src/worker/workflow/mod.rs +++ b/crates/sdk-core/src/worker/workflow/mod.rs @@ -75,7 +75,7 @@ use temporalio_common::{ temporal::api::{ command::v1::{Command as ProtoCommand, Command, command::Attributes}, common::v1::{ - Memo, MeteringMetadata, RetryPolicy, SearchAttributes, WorkflowExecution, + Memo, MeteringMetadata, Payload, RetryPolicy, SearchAttributes, WorkflowExecution, }, enums::v1::{VersioningBehavior, WorkflowTaskFailedCause}, failure::v1::{ApplicationFailureInfo, failure::FailureInfo}, @@ -86,7 +86,10 @@ use temporalio_common::{ workflowservice::v1::{PollActivityTaskQueueResponse, get_system_info_response}, }, }, - telemetry::set_trace_subscriber_for_current_thread, + telemetry::{ + metrics::{MESSAGE_DIRECTION_REQUEST, MESSAGE_DIRECTION_RESPONSE}, + set_trace_subscriber_for_current_thread, + }, }; use tokio::{ sync::{ @@ -287,6 +290,7 @@ impl Workflows { match al { ActivationOrAuto::LangActivation(mut act) | ActivationOrAuto::ReadyForQueries(mut act) => { + record_workflow_activation_payload_sizes(&self.metrics, &act); prepare_to_ship_activation(&mut act); debug!(activation=%act, "Sending activation to lang"); break Ok(act); @@ -370,6 +374,7 @@ impl Workflows { } else { 0 }; + record_workflow_completion_payload_sizes(&run_metrics, &commands); let mut maybe_record_terminal_metric = detect_terminal_command(&commands); let mut completion = WorkflowTaskCompletion { task_token: task_token.clone(), @@ -1709,6 +1714,52 @@ fn prepare_to_ship_activation(wfa: &mut WorkflowActivation) { }); } +fn record_workflow_activation_payload_sizes( + metric_context: &MetricsContext, + wfa: &WorkflowActivation, +) { + for job in &wfa.jobs { + if let Some(workflow_activation_job::Variant::InitializeWorkflow(init)) = &job.variant { + metric_context + .with_new_attrs([ + metrics::workflow_type(init.workflow_type.clone()), + metrics::message_direction(MESSAGE_DIRECTION_REQUEST), + ]) + .wf_payload_size(payloads_size(&init.arguments)); + } + } +} + +fn record_workflow_completion_payload_sizes(metric_context: &MetricsContext, commands: &[Command]) { + for command in commands { + if let Some(Attributes::CompleteWorkflowExecutionCommandAttributes(complete)) = + &command.attributes + { + metric_context + .with_new_attrs([metrics::message_direction(MESSAGE_DIRECTION_RESPONSE)]) + .wf_payload_size( + complete + .result + .as_ref() + .map_or(0, |payloads| payloads_size(&payloads.payloads)), + ); + } + } +} + +fn payloads_size(payloads: &[Payload]) -> u64 { + payloads.iter().map(payload_size).sum() +} + +fn payload_size(payload: &Payload) -> u64 { + let metadata_size: usize = payload + .metadata + .iter() + .map(|(key, value)| key.len() + value.len()) + .sum(); + (payload.data.len() + metadata_size) as u64 +} + fn make_grpc_message_too_large_failure() -> Failure { Failure { failure: Some( diff --git a/crates/sdk-core/tests/integ_tests/metrics_tests.rs b/crates/sdk-core/tests/integ_tests/metrics_tests.rs index 782549977..7446d13b1 100644 --- a/crates/sdk-core/tests/integ_tests/metrics_tests.rs +++ b/crates/sdk-core/tests/integ_tests/metrics_tests.rs @@ -64,8 +64,8 @@ use temporalio_common::{ }; use temporalio_macros::{activities, workflow, workflow_methods}; use temporalio_sdk::{ - ActivityOptions, CancellableFuture, LocalActivityOptions, NexusOperationOptions, - WorkflowContext, WorkflowResult, + ActivityOptions, CancellableFuture, ChildWorkflowOptions, LocalActivityOptions, + NexusOperationOptions, WorkflowContext, WorkflowResult, activities::{ActivityContext, ActivityError}, }; use temporalio_sdk_core::{ @@ -87,6 +87,16 @@ pub(crate) async fn get_text(endpoint: String) -> String { reqwest::get(endpoint).await.unwrap().text().await.unwrap() } +fn metric_value(body: &str, metric_prefix: &str) -> f64 { + body.lines() + .find(|line| line.starts_with(metric_prefix)) + .and_then(|line| line.rsplit_once(' ')) + .unwrap_or_else(|| panic!("missing metric line starting with {metric_prefix}")) + .1 + .parse() + .unwrap_or_else(|err| panic!("metric value for {metric_prefix} should parse: {err}")) +} + #[rstest::rstest] #[tokio::test] async fn prometheus_metrics_exported( @@ -939,6 +949,40 @@ async fn activity_metrics() { namespace=\"{NAMESPACE}\",service_name=\"temporal-core-sdk\",\ task_queue=\"{task_queue}\",workflow_type=\"{wf_type}\"}} 1" ))); + assert!(body.contains(&format!( + "temporal_activity_payload_size_count{{activity_type=\"pass_fail_act\",\ + message_direction=\"request\",namespace=\"{NAMESPACE}\",\ + service_name=\"temporal-core-sdk\",task_queue=\"{task_queue}\",\ + workflow_type=\"{wf_type}\"}} 2" + ))); + assert!(body.contains(&format!( + "temporal_activity_payload_size_count{{activity_type=\"pass_fail_act\",\ + message_direction=\"response\",namespace=\"{NAMESPACE}\",\ + service_name=\"temporal-core-sdk\",task_queue=\"{task_queue}\",\ + workflow_type=\"{wf_type}\"}} 1" + ))); + assert!( + metric_value( + &body, + &format!( + "temporal_activity_payload_size_sum{{activity_type=\"pass_fail_act\",\ + message_direction=\"request\",namespace=\"{NAMESPACE}\",\ + service_name=\"temporal-core-sdk\",task_queue=\"{task_queue}\",\ + workflow_type=\"{wf_type}\"}}" + ) + ) > 0.0 + ); + assert!( + metric_value( + &body, + &format!( + "temporal_activity_payload_size_sum{{activity_type=\"pass_fail_act\",\ + message_direction=\"response\",namespace=\"{NAMESPACE}\",\ + service_name=\"temporal-core-sdk\",task_queue=\"{task_queue}\",\ + workflow_type=\"{wf_type}\"}}" + ) + ) > 0.0 + ); assert!(body.contains(&format!( "temporal_local_activity_total{{activity_type=\"pass_fail_act\",namespace=\"{NAMESPACE}\",\ @@ -971,6 +1015,100 @@ async fn activity_metrics() { ))); } +#[tokio::test] +async fn payload_size_metrics() { + let (telemopts, addr, _aborter) = prom_metrics(None); + let rt = CoreRuntime::new_assume_tokio(get_integ_runtime_options(telemopts)).unwrap(); + let wf_name = "payload_size_metrics"; + let mut starter = CoreWfStarter::new_with_runtime(wf_name, rt); + starter.sdk_config.task_types = WorkerTaskTypes::workflow_only(); + let mut worker = starter.worker().await; + + #[workflow] + #[derive(Default)] + struct PayloadMetricsChild; + + #[workflow_methods] + impl PayloadMetricsChild { + #[run] + async fn run(_ctx: &mut WorkflowContext, input: String) -> WorkflowResult { + Ok(format!("child-result:{input}")) + } + } + + #[workflow] + #[derive(Default)] + struct PayloadMetricsParent; + + #[workflow_methods] + impl PayloadMetricsParent { + #[run] + async fn run(ctx: &mut WorkflowContext, input: String) -> WorkflowResult { + let started = ctx + .child_workflow( + PayloadMetricsChild::run, + "child-input".to_string(), + ChildWorkflowOptions { + workflow_id: "payload-size-child".to_string(), + ..Default::default() + }, + ) + .await + .expect("child workflow should start"); + let child_result = started.result().await?; + Ok(format!("parent-result:{input}:{child_result}")) + } + } + + worker.register_workflow::(); + worker.register_workflow::(); + let task_queue = starter.get_task_queue().to_owned(); + let workflow_id = wf_name.to_owned(); + worker + .submit_workflow( + PayloadMetricsParent::run, + "parent-input".to_string(), + WorkflowStartOptions::new(task_queue.clone(), workflow_id).build(), + ) + .await + .unwrap(); + worker.run_until_done().await.unwrap(); + + let body = get_text(format!("http://{addr}/metrics")).await; + let parent_wf_type = PayloadMetricsParent::name(); + let child_wf_type = PayloadMetricsChild::name(); + assert!(body.contains(&format!( + "temporal_workflow_payload_size_count{{message_direction=\"request\",\ + namespace=\"{NAMESPACE}\",service_name=\"temporal-core-sdk\",\ + task_queue=\"{task_queue}\",workflow_type=\"{parent_wf_type}\"}} 1" + ))); + assert!(body.contains(&format!( + "temporal_workflow_payload_size_count{{message_direction=\"response\",\ + namespace=\"{NAMESPACE}\",service_name=\"temporal-core-sdk\",\ + task_queue=\"{task_queue}\",workflow_type=\"{parent_wf_type}\"}} 1" + ))); + assert!(body.contains(&format!( + "temporal_workflow_payload_size_count{{message_direction=\"request\",\ + namespace=\"{NAMESPACE}\",service_name=\"temporal-core-sdk\",\ + task_queue=\"{task_queue}\",workflow_type=\"{child_wf_type}\"}} 1" + ))); + assert!(body.contains(&format!( + "temporal_workflow_payload_size_count{{message_direction=\"response\",\ + namespace=\"{NAMESPACE}\",service_name=\"temporal-core-sdk\",\ + task_queue=\"{task_queue}\",workflow_type=\"{child_wf_type}\"}} 1" + ))); + assert!(body.contains(&format!( + "temporal_rpc_message_size_count{{message_direction=\"request\",\ + namespace=\"{NAMESPACE}\",operation=\"RespondWorkflowTaskCompleted\",\ + service_name=\"temporal-core-sdk\"}} " + ))); + assert!(body.contains(&format!( + "temporal_rpc_message_size_count{{message_direction=\"response\",\ + namespace=\"{NAMESPACE}\",operation=\"RespondWorkflowTaskCompleted\",\ + service_name=\"temporal-core-sdk\"}} " + ))); +} + #[tokio::test] async fn nexus_metrics() { let (telemopts, addr, _aborter) = prom_metrics(None);