Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
222 changes: 193 additions & 29 deletions datadog-sidecar-ffi/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use datadog_sidecar::config::LogMethod;
use datadog_sidecar::crashtracker::crashtracker_unix_socket_path;
use datadog_sidecar::one_way_shared_memory::{OneWayShmReader, ReaderOpener};
use datadog_sidecar::service::agent_info::AgentInfoReader;
use datadog_sidecar::service::telemetry::InternalTelemetryAction;
use datadog_sidecar::service::{
blocking::{self, SidecarTransport},
DynamicInstrumentationConfigState, InstanceId, QueueId, RuntimeMetadata,
Expand All @@ -39,6 +40,8 @@ use libdd_common_ffi::{self as ffi, MaybeError};
#[cfg(windows)]
use libdd_crashtracker_ffi::Metadata;
use libdd_dogstatsd_client::DogStatsDActionOwned;
use libdd_telemetry::data::metrics::{MetricNamespace, MetricType};
use libdd_telemetry::metrics::MetricContext;
use libdd_telemetry::{
data::{self, Dependency, Integration},
worker::{LifecycleAction, LogIdentifier, TelemetryActions},
Expand Down Expand Up @@ -683,6 +686,60 @@ fn char_slice_to_string(slice: CharSlice) -> Result<String, String> {
.map_err(|e| format!("Failed to convert CharSlice to String: {e}"))
}

struct TelemetryContext {
instance_id: InstanceId,
service_name: String,
env_name: String,
}

impl TelemetryContext {
fn from_ffi(
session_id_ffi: CharSlice,
runtime_id_ffi: CharSlice,
service_name_ffi: CharSlice,
env_name_ffi: CharSlice,
) -> Result<Self, String> {
if session_id_ffi.is_empty() {
return Err("Null or empty session_id".into());
}
if runtime_id_ffi.is_empty() {
return Err("Null or empty runtime_id".into());
}
if service_name_ffi.is_empty() {
return Err("Null or empty service_name".into());
}
if env_name_ffi.is_empty() {
return Err("Null or empty env_name".into());
}

Ok(Self {
instance_id: InstanceId::new(
char_slice_to_string(session_id_ffi)?,
char_slice_to_string(runtime_id_ffi)?,
),
service_name: char_slice_to_string(service_name_ffi)?,
env_name: char_slice_to_string(env_name_ffi)?,
})
}

/// Sends a telemetry action through the internal telemetry channel
fn send_action(self, action: InternalTelemetryAction) -> Result<(), String> {
let sender = get_telemetry_action_sender()
.map_err(|e| format!("Failed to get telemetry action sender: {e}"))?;

let msg = InternalTelemetryActions {
instance_id: self.instance_id,
service_name: self.service_name,
env_name: self.env_name,
actions: vec![action],
};

sender
.try_send(msg)
.map_err(|e| format!("Failed to send telemetry action: {e}"))
}
}

#[allow(clippy::too_many_arguments)]
fn ddog_sidecar_enqueue_telemetry_log_impl(
session_id_ffi: CharSlice,
Expand All @@ -696,31 +753,19 @@ fn ddog_sidecar_enqueue_telemetry_log_impl(
tags_ffi: Option<NonNull<CharSlice>>,
is_sensitive: bool,
) -> Result<(), String> {
if session_id_ffi.is_empty()
|| runtime_id_ffi.is_empty()
|| service_name_ffi.is_empty()
|| env_name_ffi.is_empty()
|| identifier_ffi.is_empty()
|| message_ffi.is_empty()
{
if identifier_ffi.is_empty() || message_ffi.is_empty() {
return Err("Null or empty required arguments".into());
}

let sender = match get_telemetry_action_sender() {
Ok(s) => s,
Err(e) => {
return Err(format!("Failed to get telemetry action sender: {e}"));
}
};
let ctx = TelemetryContext::from_ffi(
session_id_ffi,
runtime_id_ffi,
service_name_ffi,
env_name_ffi,
)?;

let instance_id = InstanceId::new(
char_slice_to_string(session_id_ffi)?,
char_slice_to_string(runtime_id_ffi)?,
);
let service_name: String = char_slice_to_string(service_name_ffi)?;
let env_name: String = char_slice_to_string(env_name_ffi)?;
let identifier: String = char_slice_to_string(identifier_ffi)?;
let message: String = char_slice_to_string(message_ffi)?;
let identifier = char_slice_to_string(identifier_ffi)?;
let message = char_slice_to_string(message_ffi)?;

let stack_trace = stack_trace_ffi
.map(|s| char_slice_to_string(*unsafe { s.as_ref() }))
Expand All @@ -746,17 +791,136 @@ fn ddog_sidecar_enqueue_telemetry_log_impl(
};
let log_action = TelemetryActions::AddLog((log_id, log_data));

let msg = InternalTelemetryActions {
instance_id,
service_name,
env_name,
actions: vec![log_action],
ctx.send_action(InternalTelemetryAction::TelemetryAction(log_action))
}

/// Enqueues a telemetry point to be processed internally.
///
/// # Safety
/// Pointers must be valid, strings must be null-terminated if not null.
#[no_mangle]
pub unsafe extern "C" fn ddog_sidecar_enqueue_telemetry_point(
session_id_ffi: CharSlice,
runtime_id_ffi: CharSlice,
service_name_ffi: CharSlice,
env_name_ffi: CharSlice,
metric_name_ffi: CharSlice,
value: f64,
tags_ffi: Option<NonNull<CharSlice>>,
) -> MaybeError {
try_c!(ddog_sidecar_enqueue_telemetry_point_impl(
session_id_ffi,
runtime_id_ffi,
service_name_ffi,
env_name_ffi,
metric_name_ffi,
value,
tags_ffi,
));
MaybeError::None
}

fn ddog_sidecar_enqueue_telemetry_point_impl(
session_id_ffi: CharSlice,
runtime_id_ffi: CharSlice,
service_name_ffi: CharSlice,
env_name_ffi: CharSlice,
metric_name_ffi: CharSlice,
value: f64,
tags_ffi: Option<NonNull<CharSlice>>,
) -> Result<(), String> {
if metric_name_ffi.is_empty() {
return Err("Null or empty metric_name".into());
}

let ctx = TelemetryContext::from_ffi(
session_id_ffi,
runtime_id_ffi,
service_name_ffi,
env_name_ffi,
)?;

let metric_name = char_slice_to_string(metric_name_ffi)?;

fn get_tags(tags_slice: CharSlice) -> Result<Vec<Tag>, String> {
let tags = char_slice_to_string(tags_slice)?;
let (tags, error) = libdd_common::tag::parse_tags(tags.as_str());
if let Some(error) = error {
return Err(error.to_string());
}
Ok(tags)
}

let tags = match tags_ffi {
Some(tags_slice) => get_tags(*unsafe { tags_slice.as_ref() })?,
None => Vec::default(),
};

match sender.try_send(msg) {
Ok(_) => Ok(()),
Err(err) => Err(format!("Failed to send telemetry action: {err}")),
ctx.send_action(InternalTelemetryAction::AddMetricPoint((
value,
metric_name,
tags,
)))
}

/// Registers a telemetry metric to be processed internally.
///
/// # Safety
/// Pointers must be valid, strings must be null-terminated if not null.
#[no_mangle]
pub unsafe extern "C" fn ddog_sidecar_enqueue_telemetry_metric(
session_id_ffi: CharSlice,
runtime_id_ffi: CharSlice,
service_name_ffi: CharSlice,
env_name_ffi: CharSlice,
metric_name_ffi: CharSlice,
metric_type: MetricType,
metric_namespace: MetricNamespace,
) -> MaybeError {
try_c!(ddog_sidecar_enqueue_telemetry_metric_impl(
session_id_ffi,
runtime_id_ffi,
service_name_ffi,
env_name_ffi,
metric_name_ffi,
metric_type,
metric_namespace,
));
MaybeError::None
}

#[allow(clippy::too_many_arguments)]
fn ddog_sidecar_enqueue_telemetry_metric_impl(
session_id_ffi: CharSlice,
runtime_id_ffi: CharSlice,
service_name_ffi: CharSlice,
env_name_ffi: CharSlice,
metric_name_ffi: CharSlice,
metric_type: MetricType,
metric_namespace: MetricNamespace,
) -> Result<(), String> {
if metric_name_ffi.is_empty() {
return Err("Null or empty metric_name".into());
}

let ctx = TelemetryContext::from_ffi(
session_id_ffi,
runtime_id_ffi,
service_name_ffi,
env_name_ffi,
)?;

let metric_name = char_slice_to_string(metric_name_ffi)?;

ctx.send_action(InternalTelemetryAction::RegisterTelemetryMetric(
MetricContext {
name: metric_name,
tags: Vec::default(),
metric_type,
common: true,
namespace: metric_namespace,
},
))
}

/// Sends a trace to the sidecar via shared memory.
Expand Down
42 changes: 34 additions & 8 deletions datadog-sidecar/src/service/telemetry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -364,12 +364,19 @@ pub fn path_for_telemetry(service: &str, env: &str) -> CString {
CString::new(path).unwrap()
}

#[derive(Debug)]
pub enum InternalTelemetryAction {
TelemetryAction(TelemetryActions),
RegisterTelemetryMetric(MetricContext),
AddMetricPoint((f64, String, Vec<Tag>)),
}

#[derive(Debug)]
pub struct InternalTelemetryActions {
pub instance_id: InstanceId,
pub service_name: String,
pub env_name: String,
pub actions: Vec<TelemetryActions>,
pub actions: Vec<InternalTelemetryAction>,
}

pub fn get_telemetry_action_sender() -> Result<mpsc::Sender<InternalTelemetryActions>> {
Expand Down Expand Up @@ -398,14 +405,33 @@ pub(crate) async fn telemetry_action_receiver_task(sidecar: SidecarServer) {
);
let client = telemetry_client.lock_or_panic().worker.clone();

for action in actions.actions {
let action_str = format!("{action:?}");
match client.send_msg(action).await {
Ok(_) => {
debug!("Sent telemetry action to TelemetryWorker: {action_str}");
for it_action in actions.actions {
match it_action {
InternalTelemetryAction::TelemetryAction(action) => {
let action_str = format!("{action:?}");
match client.send_msg(action).await {
Ok(_) => {
debug!("Sent telemetry action to TelemetryWorker: {action_str}");
}
Err(e) => {
warn!("Failed to send telemetry action {action_str} to TelemetryWorker: {e}");
}
}
}
Err(e) => {
warn!("Failed to send telemetry action {action_str} to TelemetryWorker: {e}");
InternalTelemetryAction::RegisterTelemetryMetric(metric) => {
debug!("Registered telemetry metric: {metric:?}");
telemetry_client.lock_or_panic().register_metric(metric);
}
InternalTelemetryAction::AddMetricPoint((value, name, tags)) => {
let actions_point = telemetry_client
.lock_or_panic()
.to_telemetry_point((name, value, tags));
match client.send_msg(actions_point).await {
Ok(_) => {}
Err(e) => {
warn!("Failed to send telemetry point to TelemetryWorker: {e}");
}
}
}
}
}
Expand Down
Loading