diff --git a/Cargo.toml b/Cargo.toml index 3005b381d..583f59f78 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,7 +12,7 @@ derive_more = { version = "1.0", features = ["constructor", "display", "from", " thiserror = "2" tonic = "0.12" tonic-build = "0.12" -opentelemetry = { version = "0.24", features = ["metrics"] } +opentelemetry = { version = "0.26", features = ["metrics"] } prost = "0.13" prost-types = "0.13" diff --git a/core/Cargo.toml b/core/Cargo.toml index 3d5221f47..562032468 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -43,9 +43,9 @@ itertools = "0.13" lru = "0.12" mockall = "0.13" opentelemetry = { workspace = true, features = ["metrics"], optional = true } -opentelemetry_sdk = { version = "0.24", features = ["rt-tokio", "metrics"], optional = true } -opentelemetry-otlp = { version = "0.17", features = ["tokio", "metrics", "tls"], optional = true } -opentelemetry-prometheus = { version = "0.17", optional = true } +opentelemetry_sdk = { version = "0.26", features = ["rt-tokio", "metrics"], optional = true } +opentelemetry-otlp = { version = "0.26", features = ["tokio", "metrics", "tls"], optional = true } +opentelemetry-prometheus = { git = "https://github.com/open-telemetry/opentelemetry-rust.git", rev = "e911383", optional = true } parking_lot = { version = "0.12", features = ["send_guard"] } pid = "4.0" pin-project = "1.0" diff --git a/core/src/telemetry/otel.rs b/core/src/telemetry/otel.rs index 138c0a817..c4bca1c52 100644 --- a/core/src/telemetry/otel.rs +++ b/core/src/telemetry/otel.rs @@ -19,11 +19,8 @@ use opentelemetry::{ use opentelemetry_otlp::WithExportConfig; use opentelemetry_sdk::{ metrics::{ - data::Temporality, - new_view, - reader::{AggregationSelector, DefaultAggregationSelector, TemporalitySelector}, - Aggregation, Instrument, InstrumentKind, MeterProviderBuilder, PeriodicReader, - SdkMeterProvider, View, + data::Temporality, new_view, reader::TemporalitySelector, Aggregation, Instrument, + InstrumentKind, MeterProviderBuilder, PeriodicReader, SdkMeterProvider, View, }, runtime, Resource, }; @@ -38,38 +35,6 @@ use temporal_sdk_core_api::telemetry::{ use tokio::task::AbortHandle; use tonic::{metadata::MetadataMap, transport::ClientTlsConfig}; -/// Chooses appropriate aggregators for our metrics -#[derive(Debug, Clone)] -struct SDKAggSelector { - use_seconds: bool, - default: DefaultAggregationSelector, -} - -impl SDKAggSelector { - fn new(use_seconds: bool) -> Self { - Self { - use_seconds, - default: Default::default(), - } - } -} - -impl AggregationSelector for SDKAggSelector { - fn aggregation(&self, kind: InstrumentKind) -> Aggregation { - match kind { - InstrumentKind::Histogram => Aggregation::ExplicitBucketHistogram { - boundaries: if self.use_seconds { - DEFAULT_S_BUCKETS.to_vec() - } else { - DEFAULT_MS_BUCKETS.to_vec() - }, - record_min_max: true, - }, - _ => self.default.aggregation(kind), - } - } -} - fn histo_view( metric_name: &'static str, use_seconds: bool, @@ -130,6 +95,24 @@ pub(super) fn augment_meter_provider_with_defaults( ), )?) } + // Fallback default + mpb = mpb.with_view(new_view( + { + let mut i = Instrument::new(); + i.kind = Some(InstrumentKind::Histogram); + i + }, + opentelemetry_sdk::metrics::Stream::new().aggregation( + Aggregation::ExplicitBucketHistogram { + boundaries: if use_seconds { + DEFAULT_S_BUCKETS.to_vec() + } else { + DEFAULT_MS_BUCKETS.to_vec() + }, + record_min_max: true, + }, + ), + )?); Ok(mpb.with_resource(default_resource(global_tags))) } @@ -144,10 +127,9 @@ pub fn build_otlp_metric_exporter( } let exporter = exporter .with_metadata(MetadataMap::from_headers((&opts.headers).try_into()?)) - .build_metrics_exporter( - Box::new(SDKAggSelector::new(opts.use_seconds_for_durations)), - Box::new(metric_temporality_to_selector(opts.metric_temporality)), - )?; + .build_metrics_exporter(Box::new(metric_temporality_to_selector( + opts.metric_temporality, + )))?; let reader = PeriodicReader::builder(exporter, runtime::Tokio) .with_interval(opts.metric_periodicity) .build(); @@ -178,8 +160,7 @@ pub struct StartedPromServer { pub fn start_prometheus_metric_exporter( opts: PrometheusExporterOptions, ) -> Result { - let (srv, exporter) = - PromServer::new(&opts, SDKAggSelector::new(opts.use_seconds_for_durations))?; + let (srv, exporter) = PromServer::new(&opts)?; let meter_provider = augment_meter_provider_with_defaults( MeterProviderBuilder::default().with_reader(exporter), &opts.global_tags, @@ -202,7 +183,7 @@ pub fn start_prometheus_metric_exporter( #[derive(Debug)] pub struct CoreOtelMeter { - meter: Meter, + pub meter: Meter, use_seconds_for_durations: bool, // we have to hold on to the provider otherwise otel automatically shuts it down on drop // for whatever crazy reason diff --git a/core/src/telemetry/prometheus_server.rs b/core/src/telemetry/prometheus_server.rs index 9eefa700c..e49ddf9a1 100644 --- a/core/src/telemetry/prometheus_server.rs +++ b/core/src/telemetry/prometheus_server.rs @@ -5,7 +5,6 @@ use hyper_util::{ server::conn::auto, }; use opentelemetry_prometheus::PrometheusExporter; -use opentelemetry_sdk::metrics::reader::AggregationSelector; use prometheus::{Encoder, Registry, TextEncoder}; use std::net::{SocketAddr, TcpListener}; use temporal_sdk_core_api::telemetry::PrometheusExporterOptions; @@ -20,11 +19,9 @@ pub(super) struct PromServer { impl PromServer { pub(super) fn new( opts: &PrometheusExporterOptions, - aggregation: impl AggregationSelector + 'static, ) -> Result<(Self, PrometheusExporter), anyhow::Error> { let registry = Registry::new(); let exporter = opentelemetry_prometheus::exporter() - .with_aggregation_selector(aggregation) .without_scope_info() .with_registry(registry.clone()); let exporter = if !opts.counters_total_suffix {