Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,8 @@ The OTLP exporter is the default, or can be explicitly selected with `--exporter
| --otlp-exporter-tls-ca-pem | | |
| --otlp-exporter-tls-skip-verify | | |
| --otlp-exporter-request-timeout | 5s | |
| --otlp-exporter-pool-idle-timeout | 30s | |
| --otlp-exporter-pool-max-idle-per-host | 100 | |
| --otlp-exporter-retry-initial-backoff | (uses global exporter default) | |
| --otlp-exporter-retry-max-backoff | (uses global exporter default) | |
| --otlp-exporter-retry-max-elapsed-time | (uses global exporter default) | |
Expand Down
10 changes: 9 additions & 1 deletion src/exporters/awsemf/cloudwatch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,16 @@ impl Cloudwatch {
HeaderValue::from_static("application/x-amz-json-1.1"),
);

use crate::exporters::http::client::{
DEFAULT_POOL_IDLE_TIMEOUT, DEFAULT_POOL_MAX_IDLE_PER_HOST,
};
// Use the existing HTTP client builder
let client = build_hyper_client(Config::default(), false)?;
let client = build_hyper_client(
Config::default(),
false,
DEFAULT_POOL_IDLE_TIMEOUT,
DEFAULT_POOL_MAX_IDLE_PER_HOST,
)?;

Ok(Self {
endpoint,
Expand Down
11 changes: 10 additions & 1 deletion src/exporters/awsemf/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,16 @@ impl AwsEmfExporterBuilder {
flush_receiver: Option<FlushReceiver>,
aws_creds_provider: AwsCredsProvider,
) -> Result<ExporterType<'a, ResourceMetrics, DefaultHTTPAcknowledger>, BoxError> {
let client = Client::build(tls::Config::default(), Protocol::Http, Default::default())?;
use crate::exporters::http::client::{
DEFAULT_POOL_IDLE_TIMEOUT, DEFAULT_POOL_MAX_IDLE_PER_HOST,
};
let client = Client::build(
tls::Config::default(),
Protocol::Http,
Default::default(),
DEFAULT_POOL_IDLE_TIMEOUT,
DEFAULT_POOL_MAX_IDLE_PER_HOST,
)?;
let dim_filter = Arc::new(DimensionFilter::new(
self.config.include_dimensions.clone(),
self.config.exclude_dimensions.clone(),
Expand Down
11 changes: 10 additions & 1 deletion src/exporters/clickhouse/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,16 @@ impl ClickhouseExporterBuilder {
Resource: Clone + Send + Sync + 'static,
Transformer: TransformPayload<Resource>,
{
let client = Client::build(tls::Config::default(), Protocol::Http, Default::default())?;
use crate::exporters::http::client::{
DEFAULT_POOL_IDLE_TIMEOUT, DEFAULT_POOL_MAX_IDLE_PER_HOST,
};
let client = Client::build(
tls::Config::default(),
Protocol::Http,
Default::default(),
DEFAULT_POOL_IDLE_TIMEOUT,
DEFAULT_POOL_MAX_IDLE_PER_HOST,
)?;

let transformer = Transformer::new(self.config.compression.clone(), self.config.use_json);

Expand Down
11 changes: 10 additions & 1 deletion src/exporters/datadog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,16 @@ impl DatadogExporterBuilder {
rx: BoundedReceiver<Vec<Message<ResourceSpans>>>,
flush_receiver: Option<FlushReceiver>,
) -> Result<ExporterType<'a, ResourceSpans>, BoxError> {
let client = Client::build(tls::Config::default(), Protocol::Http, Default::default())?;
use crate::exporters::http::client::{
DEFAULT_POOL_IDLE_TIMEOUT, DEFAULT_POOL_MAX_IDLE_PER_HOST,
};
let client = Client::build(
tls::Config::default(),
Protocol::Http,
Default::default(),
DEFAULT_POOL_IDLE_TIMEOUT,
DEFAULT_POOL_MAX_IDLE_PER_HOST,
)?;

let transformer = Transformer::new(self.environment, self.hostname);

Expand Down
26 changes: 21 additions & 5 deletions src/exporters/http/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ use tonic::Status;
use tower::{BoxError, Service};
use tracing::warn;

// Default connection pool settings
pub const DEFAULT_POOL_IDLE_TIMEOUT: Duration = Duration::from_secs(30);
pub const DEFAULT_POOL_MAX_IDLE_PER_HOST: usize = 100;

#[derive(Clone, Debug, PartialEq)]
pub enum Protocol {
Grpc,
Expand Down Expand Up @@ -78,6 +82,8 @@ pub trait ResponseDecode<T> {
pub(crate) fn build_hyper_client<ReqBody>(
tls_config: Config,
http2_only: bool,
pool_idle_timeout: Duration,
pool_max_idle_per_host: usize,
) -> Result<HyperClient<HttpsConnector<HttpConnector>, ReqBody>, BoxError>
where
ReqBody: Body + Send,
Expand All @@ -92,9 +98,8 @@ where
.build();

let client = hyper_util::client::legacy::Client::builder(TokioExecutor::new())
// todo: make configurable
.pool_idle_timeout(Duration::from_secs(30))
.pool_max_idle_per_host(100)
.pool_idle_timeout(pool_idle_timeout)
.pool_max_idle_per_host(pool_max_idle_per_host)
.http2_only(http2_only)
.timer(TokioTimer::new())
.build::<_, ReqBody>(https);
Expand All @@ -107,8 +112,19 @@ where
ReqBody: Body + Send,
<ReqBody as Body>::Data: Send,
{
pub fn build(tls_config: Config, protocol: Protocol, decoder: Dec) -> Result<Self, BoxError> {
let inner = build_hyper_client(tls_config, protocol == Protocol::Grpc)?;
pub fn build(
tls_config: Config,
protocol: Protocol,
decoder: Dec,
pool_idle_timeout: Duration,
pool_max_idle_per_host: usize,
) -> Result<Self, BoxError> {
let inner = build_hyper_client(
tls_config,
protocol == Protocol::Grpc,
pool_idle_timeout,
pool_max_idle_per_host,
)?;

Ok(Self {
inner,
Expand Down
19 changes: 17 additions & 2 deletions src/exporters/otlp/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use std::future::Future;
use std::marker::PhantomData;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Duration;
use tonic::codegen::Service;
use tower::BoxError;

Expand Down Expand Up @@ -189,16 +190,30 @@ where
sent: RotelCounter<u64>,
send_failed: RotelCounter<u64>,
signing_builder: AwsSigningServiceBuilder,
pool_idle_timeout: Duration,
pool_max_idle_per_host: usize,
) -> Result<Self, Box<dyn Error + Send + Sync>> {
let client = match protocol {
Protocol::Grpc => {
let decoder: GrpcDecoder<T> = GrpcDecoder::new(send_failed.clone());
let http_client = Client::build(tls_config, HttpProtocol::Grpc, decoder)?;
let http_client = Client::build(
tls_config,
HttpProtocol::Grpc,
decoder,
pool_idle_timeout,
pool_max_idle_per_host,
)?;
UnifiedClientType::Grpc(signing_builder.build(http_client))
}
Protocol::Http => {
let decoder: HttpDecoder<T> = HttpDecoder::new(send_failed.clone());
let http_client = Client::build(tls_config, HttpProtocol::Http, decoder)?;
let http_client = Client::build(
tls_config,
HttpProtocol::Http,
decoder,
pool_idle_timeout,
pool_max_idle_per_host,
)?;
UnifiedClientType::Http(signing_builder.build(http_client))
}
};
Expand Down
15 changes: 15 additions & 0 deletions src/exporters/otlp/config.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
// SPDX-License-Identifier: Apache-2.0

use crate::exporters::http::client::{DEFAULT_POOL_IDLE_TIMEOUT, DEFAULT_POOL_MAX_IDLE_PER_HOST};
use crate::exporters::http::retry::RetryConfig;
use crate::exporters::http::tls::{Config, ConfigBuilder};
use crate::exporters::otlp::{
Expand All @@ -25,6 +26,8 @@ pub struct OTLPExporterConfig {
pub(crate) encode_drain_max_time: Duration,
pub(crate) export_drain_max_time: Duration,
pub(crate) tls_cfg_builder: ConfigBuilder,
pub(crate) pool_idle_timeout: Duration,
pub(crate) pool_max_idle_per_host: usize,
}

impl OTLPExporterConfig {
Expand All @@ -46,6 +49,8 @@ impl OTLPExporterConfig {
request_timeout: DEFAULT_REQUEST_TIMEOUT,
encode_drain_max_time: Duration::from_secs(2),
export_drain_max_time: Duration::from_secs(3),
pool_idle_timeout: DEFAULT_POOL_IDLE_TIMEOUT,
pool_max_idle_per_host: DEFAULT_POOL_MAX_IDLE_PER_HOST,
}
}

Expand Down Expand Up @@ -122,4 +127,14 @@ impl OTLPExporterConfig {
self.export_drain_max_time = max_time;
self
}

pub fn with_pool_idle_timeout(mut self, timeout: Duration) -> Self {
self.pool_idle_timeout = timeout;
self
}

pub fn with_pool_max_idle_per_host(mut self, max_idle: usize) -> Self {
self.pool_max_idle_per_host = max_idle;
self
}
}
6 changes: 6 additions & 0 deletions src/exporters/otlp/exporter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,8 @@ pub fn build_traces_exporter(
sent,
send_failed.clone(),
aws_signing,
traces_config.pool_idle_timeout,
traces_config.pool_max_idle_per_host,
)?;

let retry_policy = RetryPolicy::new(
Expand Down Expand Up @@ -223,6 +225,8 @@ pub fn build_logs_exporter(
sent,
send_failed.clone(),
aws_signing,
logs_config.pool_idle_timeout,
logs_config.pool_max_idle_per_host,
)?;

let retry_policy = RetryPolicy::new(
Expand Down Expand Up @@ -305,6 +309,8 @@ fn _build_metrics_exporter(
sent,
send_failed.clone(),
aws_signing,
metrics_config.pool_idle_timeout,
metrics_config.pool_max_idle_per_host,
)?;

let retry_policy = RetryPolicy::new(
Expand Down
11 changes: 10 additions & 1 deletion src/exporters/xray/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,16 @@ impl XRayExporterBuilder {
environment: String,
creds_provider: AwsCredsProvider,
) -> Result<ExporterType<'a, ResourceSpans>, BoxError> {
let client = Client::build(tls::Config::default(), Protocol::Http, Default::default())?;
use crate::exporters::http::client::{
DEFAULT_POOL_IDLE_TIMEOUT, DEFAULT_POOL_MAX_IDLE_PER_HOST,
};
let client = Client::build(
tls::Config::default(),
Protocol::Http,
Default::default(),
DEFAULT_POOL_IDLE_TIMEOUT,
DEFAULT_POOL_MAX_IDLE_PER_HOST,
)?;
let transformer = Transformer::new(environment);

let req_builder =
Expand Down
25 changes: 24 additions & 1 deletion src/init/otlp_exporter.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::exporters::http::client::{DEFAULT_POOL_IDLE_TIMEOUT, DEFAULT_POOL_MAX_IDLE_PER_HOST};
use crate::exporters::otlp;
use crate::exporters::otlp::config::OTLPExporterConfig;
use crate::exporters::otlp::{CompressionEncoding, Endpoint, Protocol};
Expand Down Expand Up @@ -112,6 +113,24 @@ pub struct OTLPExporterBaseArgs {
#[serde(with = "humantime_serde")]
pub request_timeout: std::time::Duration,

/// Connection pool idle timeout - How long idle connections remain in the pool before being closed.
#[arg(
long("otlp-exporter-pool-idle-timeout"),
env = "ROTEL_OTLP_EXPORTER_POOL_IDLE_TIMEOUT",
default_value = "30s",
value_parser = humantime::parse_duration,
)]
#[serde(with = "humantime_serde")]
pub pool_idle_timeout: Duration,

/// Maximum idle connections per host - Controls connection reuse for keep-alive.
#[arg(
long("otlp-exporter-pool-max-idle-per-host"),
env = "ROTEL_OTLP_EXPORTER_POOL_MAX_IDLE_PER_HOST",
default_value = "100"
)]
pub pool_max_idle_per_host: usize,

#[command(flatten)]
#[serde(flatten)]
pub retry: OtlpRetryArgs,
Expand Down Expand Up @@ -142,6 +161,8 @@ impl Default for OTLPExporterBaseArgs {
},
tls_skip_verify: false,
request_timeout: Duration::from_secs(5),
pool_idle_timeout: DEFAULT_POOL_IDLE_TIMEOUT,
pool_max_idle_per_host: DEFAULT_POOL_MAX_IDLE_PER_HOST,
retry: Default::default(),
}
}
Expand Down Expand Up @@ -631,7 +652,9 @@ impl OTLPExporterBaseArgs {
.with_tls_skip_verify(self.tls_skip_verify)
.with_headers(custom_headers.as_slice())
.with_request_timeout(self.request_timeout.into())
.with_compression_encoding(self.compression.into());
.with_compression_encoding(self.compression.into())
.with_pool_idle_timeout(self.pool_idle_timeout)
.with_pool_max_idle_per_host(self.pool_max_idle_per_host);

if let Some(tls_cert_file) = self.cert_group.tls_cert_file {
builder = builder.with_cert_file(tls_cert_file.as_str());
Expand Down