Skip to content

Commit

Permalink
Otel upgrade to fix gauge aggregation bug
Browse files Browse the repository at this point in the history
  • Loading branch information
Sushisource committed Nov 20, 2024
1 parent 8ca2100 commit cf75766
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 51 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ derive_more = { version = "1.0", features = ["constructor", "display", "from", "
thiserror = "2"
tonic = "0.12"
tonic-build = "0.12"
opentelemetry = { version = "0.24", features = ["metrics"] }
opentelemetry = { version = "0.26", features = ["metrics"] }
prost = "0.13"
prost-types = "0.13"

Expand Down
6 changes: 3 additions & 3 deletions core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,9 @@ itertools = "0.13"
lru = "0.12"
mockall = "0.13"
opentelemetry = { workspace = true, features = ["metrics"], optional = true }
opentelemetry_sdk = { version = "0.24", features = ["rt-tokio", "metrics"], optional = true }
opentelemetry-otlp = { version = "0.17", features = ["tokio", "metrics", "tls"], optional = true }
opentelemetry-prometheus = { version = "0.17", optional = true }
opentelemetry_sdk = { version = "0.26", features = ["rt-tokio", "metrics"], optional = true }
opentelemetry-otlp = { version = "0.26", features = ["tokio", "metrics", "tls"], optional = true }
opentelemetry-prometheus = { git = "https://github.com/open-telemetry/opentelemetry-rust.git", rev = "e911383", optional = true }
parking_lot = { version = "0.12", features = ["send_guard"] }
pid = "4.0"
pin-project = "1.0"
Expand Down
69 changes: 25 additions & 44 deletions core/src/telemetry/otel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,8 @@ use opentelemetry::{
use opentelemetry_otlp::WithExportConfig;
use opentelemetry_sdk::{
metrics::{
data::Temporality,
new_view,
reader::{AggregationSelector, DefaultAggregationSelector, TemporalitySelector},
Aggregation, Instrument, InstrumentKind, MeterProviderBuilder, PeriodicReader,
SdkMeterProvider, View,
data::Temporality, new_view, reader::TemporalitySelector, Aggregation, Instrument,
InstrumentKind, MeterProviderBuilder, PeriodicReader, SdkMeterProvider, View,
},
runtime, Resource,
};
Expand All @@ -38,38 +35,6 @@ use temporal_sdk_core_api::telemetry::{
use tokio::task::AbortHandle;
use tonic::{metadata::MetadataMap, transport::ClientTlsConfig};

/// Chooses appropriate aggregators for our metrics
#[derive(Debug, Clone)]
struct SDKAggSelector {
use_seconds: bool,
default: DefaultAggregationSelector,
}

impl SDKAggSelector {
fn new(use_seconds: bool) -> Self {
Self {
use_seconds,
default: Default::default(),
}
}
}

impl AggregationSelector for SDKAggSelector {
fn aggregation(&self, kind: InstrumentKind) -> Aggregation {
match kind {
InstrumentKind::Histogram => Aggregation::ExplicitBucketHistogram {
boundaries: if self.use_seconds {
DEFAULT_S_BUCKETS.to_vec()
} else {
DEFAULT_MS_BUCKETS.to_vec()
},
record_min_max: true,
},
_ => self.default.aggregation(kind),
}
}
}

fn histo_view(
metric_name: &'static str,
use_seconds: bool,
Expand Down Expand Up @@ -130,6 +95,24 @@ pub(super) fn augment_meter_provider_with_defaults(
),
)?)
}
// Fallback default
mpb = mpb.with_view(new_view(
{
let mut i = Instrument::new();
i.kind = Some(InstrumentKind::Histogram);
i
},
opentelemetry_sdk::metrics::Stream::new().aggregation(
Aggregation::ExplicitBucketHistogram {
boundaries: if use_seconds {
DEFAULT_S_BUCKETS.to_vec()
} else {
DEFAULT_MS_BUCKETS.to_vec()
},
record_min_max: true,
},
),
)?);
Ok(mpb.with_resource(default_resource(global_tags)))
}

Expand All @@ -144,10 +127,9 @@ pub fn build_otlp_metric_exporter(
}
let exporter = exporter
.with_metadata(MetadataMap::from_headers((&opts.headers).try_into()?))
.build_metrics_exporter(
Box::new(SDKAggSelector::new(opts.use_seconds_for_durations)),
Box::new(metric_temporality_to_selector(opts.metric_temporality)),
)?;
.build_metrics_exporter(Box::new(metric_temporality_to_selector(
opts.metric_temporality,
)))?;
let reader = PeriodicReader::builder(exporter, runtime::Tokio)
.with_interval(opts.metric_periodicity)
.build();
Expand Down Expand Up @@ -178,8 +160,7 @@ pub struct StartedPromServer {
pub fn start_prometheus_metric_exporter(
opts: PrometheusExporterOptions,
) -> Result<StartedPromServer, anyhow::Error> {
let (srv, exporter) =
PromServer::new(&opts, SDKAggSelector::new(opts.use_seconds_for_durations))?;
let (srv, exporter) = PromServer::new(&opts)?;
let meter_provider = augment_meter_provider_with_defaults(
MeterProviderBuilder::default().with_reader(exporter),
&opts.global_tags,
Expand All @@ -202,7 +183,7 @@ pub fn start_prometheus_metric_exporter(

#[derive(Debug)]
pub struct CoreOtelMeter {
meter: Meter,
pub meter: Meter,
use_seconds_for_durations: bool,
// we have to hold on to the provider otherwise otel automatically shuts it down on drop
// for whatever crazy reason
Expand Down
3 changes: 0 additions & 3 deletions core/src/telemetry/prometheus_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ use hyper_util::{
server::conn::auto,
};
use opentelemetry_prometheus::PrometheusExporter;
use opentelemetry_sdk::metrics::reader::AggregationSelector;
use prometheus::{Encoder, Registry, TextEncoder};
use std::net::{SocketAddr, TcpListener};
use temporal_sdk_core_api::telemetry::PrometheusExporterOptions;
Expand All @@ -20,11 +19,9 @@ pub(super) struct PromServer {
impl PromServer {
pub(super) fn new(
opts: &PrometheusExporterOptions,
aggregation: impl AggregationSelector + 'static,
) -> Result<(Self, PrometheusExporter), anyhow::Error> {
let registry = Registry::new();
let exporter = opentelemetry_prometheus::exporter()
.with_aggregation_selector(aggregation)
.without_scope_info()
.with_registry(registry.clone());
let exporter = if !opts.counters_total_suffix {
Expand Down

0 comments on commit cf75766

Please sign in to comment.