Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 11 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,9 @@ use examples_support::load_generator::models::Generator;
use examples_support::load_generator::{generator::ControlledRateLoadGenerator, models::StopType};

use opentelemetry_api::global;
use opentelemetry_api::metrics::MetricsError;
use opentelemetry_sdk::metrics::reader::DefaultTemporalitySelector;
use opentelemetry_sdk::metrics::{MeterProvider, PeriodicReader};
use opentelemetry_sdk::runtime;
use opentelemetry_stdout::MetricsExporterBuilder;
use rand::Rng;
use rust_decimal::prelude::FromPrimitive;
use talos_metrics::opentel::aggregation_selector::CustomHistogramSelector;
use talos_metrics::opentel::printer::MetricsToStringPrinter;
use talos_metrics::opentel::scaling::ScalingConfig;
use talos_metrics::otel::init_otel_metrics;
use talos_rdkafka_utils::kafka_config::KafkaConfig;
use tokio::{signal, task::JoinHandle, try_join};

Expand All @@ -31,8 +24,6 @@ struct LaunchParams {
target_rate: f32,
threads: u64,
accounts: u64,
scaling_config: HashMap<String, f32>,
metric_print_raw: bool,
}

/// Connects to database, to kafka certification topic as talos agent and as cohort replicator.
Expand Down Expand Up @@ -118,23 +109,7 @@ async fn main() -> Result<(), String> {

let db_config = DatabaseConfig::from_env(Some("COHORT"))?;

let printer = MetricsToStringPrinter::new(params.threads, params.metric_print_raw, ScalingConfig { ratios: params.scaling_config });
let (tx_metrics, rx_metrics) = tokio::sync::watch::channel("".to_string());
let exporter = MetricsExporterBuilder::default()
.with_aggregation_selector(CustomHistogramSelector::new_with_4k_buckets()?)
.with_temporality_selector(DefaultTemporalitySelector::new())
.with_encoder(move |_writer, data| {
let report = printer.print(&data).map_err(MetricsError::Other)?;
tx_metrics.send(report).map_err(|e| MetricsError::Other(e.to_string()))?;
Ok(())
})
.build();

let reader = PeriodicReader::builder(exporter, runtime::Tokio).build();

let meter_provider = MeterProvider::builder().with_reader(reader).build();
let meter_provider_copy = meter_provider.clone();
global::set_meter_provider(meter_provider);
let _ = init_otel_metrics(sdk_config.otel_telemetry.grpc_endpoint.clone());

let meter = global::meter("banking_cohort");
let meter = Arc::new(meter);
Expand Down Expand Up @@ -178,9 +153,6 @@ async fn main() -> Result<(), String> {
}

shutdown_tracer_provider();
let _ = meter_provider_copy.shutdown();
let report = rx_metrics.borrow();
log::warn!("{}", *report);
Ok(())
}

Expand All @@ -200,8 +172,6 @@ async fn get_params() -> Result<LaunchParams, String> {
let mut accounts: Option<u64> = None;
let mut target_rate: Option<f32> = None;
let mut stop_type: Option<StopType> = None;
let mut scaling_config: Option<HashMap<String, f32>> = None;
let mut metric_print_raw = None;

if args.len() >= 3 {
let mut i = 1;
Expand All @@ -228,34 +198,6 @@ async fn get_params() -> Result<LaunchParams, String> {
let count: u64 = param_value.parse().unwrap();
stop_type = Some(StopType::LimitGeneratedTransactions { count })
}
} else if param_name.eq("--metric_print_raw") {
metric_print_raw = Some(true);
} else if param_name.eq("--metric_scaling") {
let param_value = &args[i + 1];
let mut cfg: HashMap<String, f32> = HashMap::new();
for spec in param_value.replace(' ', "").split(',') {
if let Some(i) = spec.find('=') {
let metric: String = spec[..i].into();
let scale_factor_raw: String = spec[i + 1..].into();
let scale_factor = match scale_factor_raw.parse::<f32>() {
Err(e) => {
log::error!(
"Unable to parse scaling factor for metric '{}'. No scaling will be applied. Pasing: '{}'. Error: {}.",
metric,
scale_factor_raw,
e
);
1_f32
}
Ok(scale_factor) => scale_factor,
};
cfg.insert(metric, scale_factor);
}
}

if !cfg.is_empty() {
scaling_config = Some(cfg);
}
}

i += 2;
Expand All @@ -271,8 +213,6 @@ async fn get_params() -> Result<LaunchParams, String> {
stop_type,
threads: threads.unwrap(),
accounts,
scaling_config: scaling_config.unwrap_or_default(),
metric_print_raw: metric_print_raw.is_some(),
})
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use talos_common_utils::env_var;
use talos_common_utils::{env_var, env_var_with_defaults};
use talos_messenger_actions::messenger_with_kafka::{messenger_with_kafka, Configuration};
use talos_messenger_core::utlis::{create_whitelist_actions_from_str, ActionsParserConfig};
use talos_rdkafka_utils::kafka_config::KafkaConfig;
Expand Down Expand Up @@ -31,6 +31,7 @@ async fn main() {
};

let actions_from_env = env_var!("TALOS_MESSENGER_ACTIONS_WHITELIST");
let otel_grpc_endpoint = env_var_with_defaults!("OTEL_EXPORTER_OTLP_ENDPOINT", Option::<String>);
let allowed_actions = create_whitelist_actions_from_str(&actions_from_env, &ActionsParserConfig::default());

let config = Configuration {
Expand All @@ -40,6 +41,7 @@ async fn main() {
channel_buffers: None,
commit_size: Some(2_000),
commit_frequency: None,
otel_grpc_endpoint,
};

messenger_with_kafka(config).await.unwrap();
Expand Down
1 change: 1 addition & 0 deletions packages/talos_certifier/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ tracing-subscriber = { version = "0.3.19", features = [

# internal crates
talos_suffix = { path = "../talos_suffix", version = "0.3.17" }
talos_metrics = { path = "../talos_metrics", version = "0.3.17" }
talos_common_utils = { path = "../talos_common_utils", version = "0.3.17" }

uuid = { version = "1.4.1", features = ["v4"] }
Expand Down
1 change: 0 additions & 1 deletion packages/talos_certifier/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ pub mod core;
pub mod errors;
pub mod healthcheck;
pub mod model;
pub mod otel;
pub mod ports;
pub mod services;
pub mod talos_certifier_service;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ use crate::{self as Adapters, KafkaConsumer};
use std::sync::{atomic::AtomicI64, Arc};
use talos_certifier::core::SystemService;
use talos_certifier::model::{CandidateMessage, DecisionMessage};
use talos_certifier::otel::init_otel_metrics;
use talos_certifier::ports::DecisionStore;

use talos_certifier::services::{CertifierServiceConfig, MessageReceiverServiceConfig};
Expand All @@ -16,6 +15,7 @@ use talos_certifier::{
services::{CertifierService, DecisionOutboxService, MessageReceiverService},
talos_certifier_service::{TalosCertifierService, TalosCertifierServiceBuilder},
};
use talos_metrics::otel::init_otel_metrics;
use talos_rdkafka_utils::kafka_config::KafkaConfig;
use talos_suffix::core::SuffixConfig;
use tokio::sync::{broadcast, mpsc};
Expand Down
9 changes: 9 additions & 0 deletions packages/talos_messenger_actions/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -56,15 +56,24 @@ toml_parser = { workspace = true }
toml_writer = { workspace = true }
# *** End - Adding to fix napi build error on GH actions.


# Otel packages
opentelemetry = { version = "0.28.0", default-features = false, features = [
"trace",
] }

# Talos packages
talos_certifier = { path = "../talos_certifier", version = "0.3.17" }
talos_suffix = { path = "../talos_suffix", version = "0.3.17" }
talos_certifier_adapters = { path = "../talos_certifier_adapters", version = "0.3.17" }
talos_common_utils = { path = "../talos_common_utils", version = "0.3.17" }
talos_rdkafka_utils = { path = "../talos_rdkafka_utils", version = "0.3.17" }
talos_messenger_core = { path = "../talos_messenger_core", version = "0.3.17" }
talos_metrics = { path = "../talos_metrics", version = "0.3.17" }


[dev-dependencies]
mockall = { version = "0.11.3" }
tokio-test = { version = "0.4.2" }
rand = { version = "0.8.5" }
opentelemetry_sdk = { version = "0.28.0", features = ["metrics", "rt-tokio"] }
1 change: 1 addition & 0 deletions packages/talos_messenger_actions/src/kafka/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,4 @@ pub mod context;
pub mod models;
pub mod producer;
pub mod service;
mod utils;
Loading
Loading