From d31c10556df81636a36b24264b1133b5d0bf25e7 Mon Sep 17 00:00:00 2001 From: Spencer Judge Date: Thu, 21 Nov 2024 08:51:21 -0800 Subject: [PATCH] Custom metric buckets (#844) --- Cargo.toml | 3 +- client/Cargo.toml | 2 +- client/src/lib.rs | 4 +- client/src/metrics.rs | 9 +- core-api/Cargo.toml | 2 +- core-api/src/telemetry.rs | 28 +++- core-api/src/telemetry/metrics.rs | 26 +++ core/Cargo.toml | 8 +- core/src/telemetry/metrics.rs | 41 +++-- core/src/telemetry/mod.rs | 7 +- core/src/telemetry/otel.rs | 209 +++++++++--------------- core/src/telemetry/prometheus_server.rs | 3 - sdk-core-protos/Cargo.toml | 2 +- sdk/Cargo.toml | 2 +- test-utils/Cargo.toml | 2 +- tests/integ_tests/metrics_tests.rs | 72 +++++--- tests/integ_tests/workflow_tests.rs | 2 +- 17 files changed, 234 insertions(+), 188 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 59f2c6c1d..583f59f78 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,9 +9,10 @@ license-file = "LICENSE.txt" [workspace.dependencies] derive_builder = "0.20" derive_more = { version = "1.0", features = ["constructor", "display", "from", "into", "debug"] } +thiserror = "2" tonic = "0.12" tonic-build = "0.12" -opentelemetry = { version = "0.24", features = ["metrics"] } +opentelemetry = { version = "0.26", features = ["metrics"] } prost = "0.13" prost-types = "0.13" diff --git a/client/Cargo.toml b/client/Cargo.toml index ca0ca14fc..7e5f57110 100644 --- a/client/Cargo.toml +++ b/client/Cargo.toml @@ -30,7 +30,7 @@ opentelemetry = { workspace = true, features = ["metrics"], optional = true } parking_lot = "0.12" prost-types = { workspace = true } slotmap = "1.0" -thiserror = "1.0" +thiserror = { workspace = true } tokio = "1.1" tonic = { workspace = true, features = ["tls", "tls-roots"] } tower = { version = "0.5", features = ["util"] } diff --git a/client/src/lib.rs b/client/src/lib.rs index 8f5e743db..778257e5d 100644 --- a/client/src/lib.rs +++ b/client/src/lib.rs @@ -18,6 +18,7 @@ pub use crate::{ proxy::HttpConnectProxyOptions, retry::{CallType, RetryClient, RETRYABLE_ERROR_CODES}, }; +pub use metrics::{LONG_REQUEST_LATENCY_HISTOGRAM_NAME, REQUEST_LATENCY_HISTOGRAM_NAME}; pub use raw::{CloudService, HealthService, OperatorService, TestService, WorkflowService}; pub use temporal_sdk_core_protos::temporal::api::{ enums::v1::ArchivalState, @@ -42,13 +43,12 @@ use crate::{ use backoff::{exponential, ExponentialBackoff, SystemClock}; use http::{uri::InvalidUri, Uri}; use parking_lot::RwLock; -use std::sync::OnceLock; use std::{ collections::HashMap, fmt::{Debug, Formatter}, ops::{Deref, DerefMut}, str::FromStr, - sync::Arc, + sync::{Arc, OnceLock}, time::{Duration, Instant}, }; use temporal_sdk_core_api::telemetry::metrics::TemporalMeter; diff --git a/client/src/metrics.rs b/client/src/metrics.rs index 784c1c057..56e998881 100644 --- a/client/src/metrics.rs +++ b/client/src/metrics.rs @@ -12,6 +12,11 @@ use temporal_sdk_core_api::telemetry::metrics::{ use tonic::{body::BoxBody, transport::Channel, Code}; use tower::Service; +/// The string name (which may be prefixed) for this metric +pub static REQUEST_LATENCY_HISTOGRAM_NAME: &str = "request_latency"; +/// The string name (which may be prefixed) for this metric +pub static LONG_REQUEST_LATENCY_HISTOGRAM_NAME: &str = "long_request_latency"; + /// Used to track context associated with metrics, and record/update them // Possible improvement: make generic over some type tag so that methods are only exposed if the // appropriate k/vs have already been set. 
@@ -58,12 +63,12 @@ impl MetricsContext { unit: "".into(), }), svc_request_latency: meter.histogram_duration(MetricParameters { - name: "request_latency".into(), + name: REQUEST_LATENCY_HISTOGRAM_NAME.into(), unit: "duration".into(), description: "Histogram of client request latencies".into(), }), long_svc_request_latency: meter.histogram_duration(MetricParameters { - name: "long_request_latency".into(), + name: LONG_REQUEST_LATENCY_HISTOGRAM_NAME.into(), unit: "duration".into(), description: "Histogram of client long-poll request latencies".into(), }), diff --git a/core-api/Cargo.toml b/core-api/Cargo.toml index d9725660d..63e903f7f 100644 --- a/core-api/Cargo.toml +++ b/core-api/Cargo.toml @@ -23,7 +23,7 @@ opentelemetry = { workspace = true, optional = true } prost = { workspace = true } prost-types = { workspace = true } serde_json = "1.0" -thiserror = "1.0" +thiserror = { workspace = true } tonic = { workspace = true } tracing-core = "0.1" url = "2.3" diff --git a/core-api/src/telemetry.rs b/core-api/src/telemetry.rs index 7d9a5b2e4..75597aa0c 100644 --- a/core-api/src/telemetry.rs +++ b/core-api/src/telemetry.rs @@ -68,6 +68,9 @@ pub struct OtelCollectorOptions { /// If set to true, use f64 seconds for durations instead of u64 milliseconds #[builder(default)] pub use_seconds_for_durations: bool, + /// Overrides for histogram buckets. Units depend on the value of `use_seconds_for_durations`. + #[builder(default)] + pub histogram_bucket_overrides: HistogramBucketOverrides, } /// Options for exporting metrics to Prometheus @@ -78,15 +81,33 @@ pub struct PrometheusExporterOptions { #[builder(default)] pub global_tags: HashMap<String, String>, /// If set true, all counters will include a "_total" suffix - #[builder(default = "false")] + #[builder(default)] pub counters_total_suffix: bool, /// If set true, all histograms will include the unit in their name as a suffix. /// Ex: "_milliseconds". - #[builder(default = "false")] + #[builder(default)] pub unit_suffix: bool, /// If set to true, use f64 seconds for durations instead of u64 milliseconds #[builder(default)] pub use_seconds_for_durations: bool, + /// Overrides for histogram buckets. Units depend on the value of `use_seconds_for_durations`. + #[builder(default)] + pub histogram_bucket_overrides: HistogramBucketOverrides, +} + +/// Allows overriding the buckets used by histogram metrics +#[derive(Debug, Clone, Default)] +pub struct HistogramBucketOverrides { + /// Overrides where the key is the metric name and the value is the list of bucket boundaries. + /// The metric name is matched regardless of name prefixing, if any; i.e., the name acts like + /// `*metric_name`. + /// + /// The string names of core's built-in histogram metrics are publicly available in the + /// `core::telemetry` module and the `client` crate. + /// + /// See [here](https://docs.rs/opentelemetry_sdk/latest/opentelemetry_sdk/metrics/enum.Aggregation.html#variant.ExplicitBucketHistogram.field.boundaries) + /// for the exact meaning of boundaries. + pub overrides: HashMap<String, Vec<f64>>, } /// Control where logs go @@ -102,7 +123,8 @@ pub enum Logger { /// An [EnvFilter](https://docs.rs/tracing-subscriber/latest/tracing_subscriber/struct.EnvFilter.html) filter string. filter: String, }, - // Push logs to Lang. Can used with temporal_sdk_core::telemetry::CoreLogBufferedConsumer to buffer. + /// Push logs to Lang. Can be used with + /// temporal_sdk_core::telemetry::log_export::CoreLogBufferedConsumer to buffer.
Push { /// An [EnvFilter](https://docs.rs/tracing-subscriber/latest/tracing_subscriber/struct.EnvFilter.html) filter string. filter: String, diff --git a/core-api/src/telemetry/metrics.rs b/core-api/src/telemetry/metrics.rs index 5bdeb7d55..5a8de90e9 100644 --- a/core-api/src/telemetry/metrics.rs +++ b/core-api/src/telemetry/metrics.rs @@ -377,6 +377,32 @@ mod otel_impls { } } + impl Gauge for metrics::Gauge { + fn record(&self, value: u64, attributes: &MetricAttributes) { + if let MetricAttributes::OTel { kvs } = attributes { + self.record(value, kvs); + } else { + debug_assert!( + false, + "Must use OTel attributes with an OTel metric implementation" + ); + } + } + } + + impl GaugeF64 for metrics::Gauge { + fn record(&self, value: f64, attributes: &MetricAttributes) { + if let MetricAttributes::OTel { kvs } = attributes { + self.record(value, kvs); + } else { + debug_assert!( + false, + "Must use OTel attributes with an OTel metric implementation" + ); + } + } + } + impl Histogram for metrics::Histogram { fn record(&self, value: u64, attributes: &MetricAttributes) { if let MetricAttributes::OTel { kvs } = attributes { diff --git a/core/Cargo.toml b/core/Cargo.toml index d065a484f..562032468 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -43,9 +43,9 @@ itertools = "0.13" lru = "0.12" mockall = "0.13" opentelemetry = { workspace = true, features = ["metrics"], optional = true } -opentelemetry_sdk = { version = "0.24", features = ["rt-tokio", "metrics"], optional = true } -opentelemetry-otlp = { version = "0.17", features = ["tokio", "metrics", "tls"], optional = true } -opentelemetry-prometheus = { version = "0.17", optional = true } +opentelemetry_sdk = { version = "0.26", features = ["rt-tokio", "metrics"], optional = true } +opentelemetry-otlp = { version = "0.26", features = ["tokio", "metrics", "tls"], optional = true } +opentelemetry-prometheus = { git = "https://github.com/open-telemetry/opentelemetry-rust.git", rev = "e911383", optional = true } parking_lot = { version = "0.12", features = ["send_guard"] } pid = "4.0" pin-project = "1.0" @@ -61,7 +61,7 @@ siphasher = "1.0" slotmap = "1.0" sysinfo = { version = "0.32", default-features = false, features = ["system"] } tar = { version = "0.4", optional = true } -thiserror = "1.0" +thiserror = { workspace = true } tokio = { version = "1.37", features = ["rt", "rt-multi-thread", "parking_lot", "time", "fs", "process"] } tokio-util = { version = "0.7", features = ["io", "io-util"] } tokio-stream = "0.1" diff --git a/core/src/telemetry/metrics.rs b/core/src/telemetry/metrics.rs index 3a375a50f..6aef75bd8 100644 --- a/core/src/telemetry/metrics.rs +++ b/core/src/telemetry/metrics.rs @@ -292,7 +292,7 @@ impl Instruments { unit: "".into(), }), wf_e2e_latency: meter.histogram_duration(MetricParameters { - name: WF_E2E_LATENCY_NAME.into(), + name: WORKFLOW_E2E_LATENCY_HISTOGRAM_NAME.into(), unit: "duration".into(), description: "Histogram of total workflow execution latencies".into(), }), @@ -312,17 +312,17 @@ impl Instruments { unit: "".into(), }), wf_task_sched_to_start_latency: meter.histogram_duration(MetricParameters { - name: WF_TASK_SCHED_TO_START_LATENCY_NAME.into(), + name: WORKFLOW_TASK_SCHED_TO_START_LATENCY_HISTOGRAM_NAME.into(), unit: "duration".into(), description: "Histogram of workflow task schedule-to-start latencies".into(), }), wf_task_replay_latency: meter.histogram_duration(MetricParameters { - name: WF_TASK_REPLAY_LATENCY_NAME.into(), + name: WORKFLOW_TASK_REPLAY_LATENCY_HISTOGRAM_NAME.into(), unit: 
"duration".into(), description: "Histogram of workflow task replay latencies".into(), }), wf_task_execution_latency: meter.histogram_duration(MetricParameters { - name: WF_TASK_EXECUTION_LATENCY_NAME.into(), + name: WORKFLOW_TASK_EXECUTION_LATENCY_HISTOGRAM_NAME.into(), unit: "duration".into(), description: "Histogram of workflow task execution (not replay) latencies".into(), }), @@ -342,12 +342,12 @@ impl Instruments { unit: "".into(), }), act_sched_to_start_latency: meter.histogram_duration(MetricParameters { - name: ACT_SCHED_TO_START_LATENCY_NAME.into(), + name: ACTIVITY_SCHED_TO_START_LATENCY_HISTOGRAM_NAME.into(), unit: "duration".into(), description: "Histogram of activity schedule-to-start latencies".into(), }), act_exec_latency: meter.histogram_duration(MetricParameters { - name: ACT_EXEC_LATENCY_NAME.into(), + name: ACTIVITY_EXEC_LATENCY_HISTOGRAM_NAME.into(), unit: "duration".into(), description: "Histogram of activity execution latencies".into(), }), @@ -496,13 +496,20 @@ pub(crate) fn failure_reason(reason: FailureReason) -> MetricKeyValue { MetricKeyValue::new(KEY_TASK_FAILURE_TYPE, reason.to_string()) } -pub(super) const WF_E2E_LATENCY_NAME: &str = "workflow_endtoend_latency"; -pub(super) const WF_TASK_SCHED_TO_START_LATENCY_NAME: &str = +/// The string name (which may be prefixed) for this metric +pub const WORKFLOW_E2E_LATENCY_HISTOGRAM_NAME: &str = "workflow_endtoend_latency"; +/// The string name (which may be prefixed) for this metric +pub const WORKFLOW_TASK_SCHED_TO_START_LATENCY_HISTOGRAM_NAME: &str = "workflow_task_schedule_to_start_latency"; -pub(super) const WF_TASK_REPLAY_LATENCY_NAME: &str = "workflow_task_replay_latency"; -pub(super) const WF_TASK_EXECUTION_LATENCY_NAME: &str = "workflow_task_execution_latency"; -pub(super) const ACT_SCHED_TO_START_LATENCY_NAME: &str = "activity_schedule_to_start_latency"; -pub(super) const ACT_EXEC_LATENCY_NAME: &str = "activity_execution_latency"; +/// The string name (which may be prefixed) for this metric +pub const WORKFLOW_TASK_REPLAY_LATENCY_HISTOGRAM_NAME: &str = "workflow_task_replay_latency"; +/// The string name (which may be prefixed) for this metric +pub const WORKFLOW_TASK_EXECUTION_LATENCY_HISTOGRAM_NAME: &str = "workflow_task_execution_latency"; +/// The string name (which may be prefixed) for this metric +pub const ACTIVITY_SCHED_TO_START_LATENCY_HISTOGRAM_NAME: &str = + "activity_schedule_to_start_latency"; +/// The string name (which may be prefixed) for this metric +pub const ACTIVITY_EXEC_LATENCY_HISTOGRAM_NAME: &str = "activity_execution_latency"; pub(super) const NUM_POLLERS_NAME: &str = "num_pollers"; pub(super) const TASK_SLOTS_AVAILABLE_NAME: &str = "worker_task_slots_available"; pub(super) const TASK_SLOTS_USED_NAME: &str = "worker_task_slots_used"; @@ -533,7 +540,7 @@ macro_rules! define_latency_buckets { define_latency_buckets!( ( - WF_E2E_LATENCY_NAME, + WORKFLOW_E2E_LATENCY_HISTOGRAM_NAME, WF_LATENCY_MS_BUCKETS, WF_LATENCY_S_BUCKETS, [ @@ -556,19 +563,21 @@ define_latency_buckets!( ] ), ( - WF_TASK_EXECUTION_LATENCY_NAME | WF_TASK_REPLAY_LATENCY_NAME, + WORKFLOW_TASK_EXECUTION_LATENCY_HISTOGRAM_NAME + | WORKFLOW_TASK_REPLAY_LATENCY_HISTOGRAM_NAME, WF_TASK_MS_BUCKETS, WF_TASK_S_BUCKETS, [1., 10., 20., 50., 100., 200., 500., 1000.] ), ( - ACT_EXEC_LATENCY_NAME, + ACTIVITY_EXEC_LATENCY_HISTOGRAM_NAME, ACT_EXE_MS_BUCKETS, ACT_EXE_S_BUCKETS, [50., 100., 500., 1000., 5000., 10_000., 60_000.] 
), ( - WF_TASK_SCHED_TO_START_LATENCY_NAME | ACT_SCHED_TO_START_LATENCY_NAME, + WORKFLOW_TASK_SCHED_TO_START_LATENCY_HISTOGRAM_NAME + | ACTIVITY_SCHED_TO_START_LATENCY_HISTOGRAM_NAME, TASK_SCHED_TO_START_MS_BUCKETS, TASK_SCHED_TO_START_S_BUCKETS, [100., 500., 1000., 5000., 10_000., 100_000., 1_000_000.] diff --git a/core/src/telemetry/mod.rs b/core/src/telemetry/mod.rs index e6886618c..38e0f304b 100644 --- a/core/src/telemetry/mod.rs +++ b/core/src/telemetry/mod.rs @@ -9,7 +9,12 @@ mod otel; mod prometheus_server; #[cfg(feature = "otel")] -pub use metrics::{default_buckets_for, MetricsCallBuffer}; +pub use metrics::{ + default_buckets_for, MetricsCallBuffer, ACTIVITY_EXEC_LATENCY_HISTOGRAM_NAME, + ACTIVITY_SCHED_TO_START_LATENCY_HISTOGRAM_NAME, WORKFLOW_E2E_LATENCY_HISTOGRAM_NAME, + WORKFLOW_TASK_EXECUTION_LATENCY_HISTOGRAM_NAME, WORKFLOW_TASK_REPLAY_LATENCY_HISTOGRAM_NAME, + WORKFLOW_TASK_SCHED_TO_START_LATENCY_HISTOGRAM_NAME, +}; #[cfg(feature = "otel")] pub use otel::{build_otlp_metric_exporter, start_prometheus_metric_exporter}; diff --git a/core/src/telemetry/otel.rs b/core/src/telemetry/otel.rs index 770b6da42..c4bca1c52 100644 --- a/core/src/telemetry/otel.rs +++ b/core/src/telemetry/otel.rs @@ -1,9 +1,11 @@ use super::{ default_buckets_for, metrics::{ - ACT_EXEC_LATENCY_NAME, ACT_SCHED_TO_START_LATENCY_NAME, DEFAULT_MS_BUCKETS, - WF_E2E_LATENCY_NAME, WF_TASK_EXECUTION_LATENCY_NAME, WF_TASK_REPLAY_LATENCY_NAME, - WF_TASK_SCHED_TO_START_LATENCY_NAME, + ACTIVITY_EXEC_LATENCY_HISTOGRAM_NAME, ACTIVITY_SCHED_TO_START_LATENCY_HISTOGRAM_NAME, + DEFAULT_MS_BUCKETS, WORKFLOW_E2E_LATENCY_HISTOGRAM_NAME, + WORKFLOW_TASK_EXECUTION_LATENCY_HISTOGRAM_NAME, + WORKFLOW_TASK_REPLAY_LATENCY_HISTOGRAM_NAME, + WORKFLOW_TASK_SCHED_TO_START_LATENCY_HISTOGRAM_NAME, }, prometheus_server::PromServer, TELEM_SERVICE_NAME, @@ -17,58 +19,22 @@ use opentelemetry::{ use opentelemetry_otlp::WithExportConfig; use opentelemetry_sdk::{ metrics::{ - data::Temporality, - new_view, - reader::{AggregationSelector, DefaultAggregationSelector, TemporalitySelector}, - Aggregation, AttributeSet, Instrument, InstrumentKind, MeterProviderBuilder, - PeriodicReader, SdkMeterProvider, View, + data::Temporality, new_view, reader::TemporalitySelector, Aggregation, Instrument, + InstrumentKind, MeterProviderBuilder, PeriodicReader, SdkMeterProvider, View, }, runtime, Resource, }; -use parking_lot::RwLock; use std::{collections::HashMap, net::SocketAddr, sync::Arc, time::Duration}; use temporal_sdk_core_api::telemetry::{ metrics::{ CoreMeter, Counter, Gauge, GaugeF64, Histogram, HistogramDuration, HistogramF64, MetricAttributes, MetricParameters, NewAttributes, }, - MetricTemporality, OtelCollectorOptions, PrometheusExporterOptions, + HistogramBucketOverrides, MetricTemporality, OtelCollectorOptions, PrometheusExporterOptions, }; use tokio::task::AbortHandle; use tonic::{metadata::MetadataMap, transport::ClientTlsConfig}; -/// Chooses appropriate aggregators for our metrics -#[derive(Debug, Clone)] -struct SDKAggSelector { - use_seconds: bool, - default: DefaultAggregationSelector, -} - -impl SDKAggSelector { - fn new(use_seconds: bool) -> Self { - Self { - use_seconds, - default: Default::default(), - } - } -} - -impl AggregationSelector for SDKAggSelector { - fn aggregation(&self, kind: InstrumentKind) -> Aggregation { - match kind { - InstrumentKind::Histogram => Aggregation::ExplicitBucketHistogram { - boundaries: if self.use_seconds { - DEFAULT_S_BUCKETS.to_vec() - } else { - DEFAULT_MS_BUCKETS.to_vec() - }, - record_min_max: 
true, - }, - _ => self.default.aggregation(kind), - } - } -} - fn histo_view( metric_name: &'static str, use_seconds: bool, @@ -89,71 +55,65 @@ pub(super) fn augment_meter_provider_with_defaults( mpb: MeterProviderBuilder, global_tags: &HashMap, use_seconds: bool, + bucket_overrides: HistogramBucketOverrides, ) -> opentelemetry::metrics::Result { // Some histograms are actually gauges, but we have to use histograms otherwise they forget // their value between collections since we don't use callbacks. - Ok(mpb - .with_view(histo_view(WF_E2E_LATENCY_NAME, use_seconds)?) - .with_view(histo_view(WF_TASK_EXECUTION_LATENCY_NAME, use_seconds)?) - .with_view(histo_view(WF_TASK_REPLAY_LATENCY_NAME, use_seconds)?) + let mut mpb = mpb .with_view(histo_view( - WF_TASK_SCHED_TO_START_LATENCY_NAME, + WORKFLOW_E2E_LATENCY_HISTOGRAM_NAME, use_seconds, )?) - .with_view(histo_view(ACT_SCHED_TO_START_LATENCY_NAME, use_seconds)?) - .with_view(histo_view(ACT_EXEC_LATENCY_NAME, use_seconds)?) - .with_resource(default_resource(global_tags))) -} - -/// OTel has no built-in synchronous Gauge. Histograms used to be able to serve that purpose, but -/// they broke that. Lovely. So, we need to implement one by hand. -pub(crate) struct MemoryGauge { - labels_to_values: Arc>>, -} - -macro_rules! impl_memory_gauge { - ($ty:ty, $gauge_fn:ident, $observe_fn:ident) => { - impl MemoryGauge<$ty> { - fn new(params: MetricParameters, meter: &Meter) -> Self { - let gauge = meter - .$gauge_fn(params.name) - .with_unit(params.unit) - .with_description(params.description) - .init(); - let map = Arc::new(RwLock::new(HashMap::::new())); - let map_c = map.clone(); - meter - .register_callback(&[gauge.as_any()], move |o| { - // This whole thing is... extra stupid. - // See https://github.com/open-telemetry/opentelemetry-rust/issues/1181 - // The performance is likely bad here, but, given this is only called when - // metrics are exported it should be livable for now. - let map_rlock = map_c.read(); - for (kvs, val) in map_rlock.iter() { - let kvs: Vec<_> = kvs - .iter() - .map(|(k, v)| KeyValue::new(k.clone(), v.clone())) - .collect(); - o.$observe_fn(&gauge, *val, kvs.as_slice()) - } - }) - .expect("instrument must exist we just created it"); - MemoryGauge { - labels_to_values: map, - } - } - } - }; -} -impl_memory_gauge!(u64, u64_observable_gauge, observe_u64); -impl_memory_gauge!(f64, f64_observable_gauge, observe_f64); - -impl MemoryGauge { - fn record(&self, val: U, kvs: &[KeyValue]) { - self.labels_to_values - .write() - .insert(AttributeSet::from(kvs), val); + .with_view(histo_view( + WORKFLOW_TASK_EXECUTION_LATENCY_HISTOGRAM_NAME, + use_seconds, + )?) + .with_view(histo_view( + WORKFLOW_TASK_REPLAY_LATENCY_HISTOGRAM_NAME, + use_seconds, + )?) + .with_view(histo_view( + WORKFLOW_TASK_SCHED_TO_START_LATENCY_HISTOGRAM_NAME, + use_seconds, + )?) + .with_view(histo_view( + ACTIVITY_SCHED_TO_START_LATENCY_HISTOGRAM_NAME, + use_seconds, + )?) + .with_view(histo_view( + ACTIVITY_EXEC_LATENCY_HISTOGRAM_NAME, + use_seconds, + )?); + for (name, buckets) in bucket_overrides.overrides { + mpb = mpb.with_view(new_view( + Instrument::new().name(format!("*{name}")), + opentelemetry_sdk::metrics::Stream::new().aggregation( + Aggregation::ExplicitBucketHistogram { + boundaries: buckets, + record_min_max: true, + }, + ), + )?) 
} + // Fallback default + mpb = mpb.with_view(new_view( + { + let mut i = Instrument::new(); + i.kind = Some(InstrumentKind::Histogram); + i + }, + opentelemetry_sdk::metrics::Stream::new().aggregation( + Aggregation::ExplicitBucketHistogram { + boundaries: if use_seconds { + DEFAULT_S_BUCKETS.to_vec() + } else { + DEFAULT_MS_BUCKETS.to_vec() + }, + record_min_max: true, + }, + ), + )?); + Ok(mpb.with_resource(default_resource(global_tags))) } /// Create an OTel meter that can be used as a [CoreMeter] to export metrics over OTLP. @@ -167,10 +127,9 @@ pub fn build_otlp_metric_exporter( } let exporter = exporter .with_metadata(MetadataMap::from_headers((&opts.headers).try_into()?)) - .build_metrics_exporter( - Box::new(SDKAggSelector::new(opts.use_seconds_for_durations)), - Box::new(metric_temporality_to_selector(opts.metric_temporality)), - )?; + .build_metrics_exporter(Box::new(metric_temporality_to_selector( + opts.metric_temporality, + )))?; let reader = PeriodicReader::builder(exporter, runtime::Tokio) .with_interval(opts.metric_periodicity) .build(); @@ -178,6 +137,7 @@ pub fn build_otlp_metric_exporter( MeterProviderBuilder::default().with_reader(reader), &opts.global_tags, opts.use_seconds_for_durations, + opts.histogram_bucket_overrides, )? .build(); Ok::<_, anyhow::Error>(CoreOtelMeter { @@ -200,12 +160,12 @@ pub struct StartedPromServer { pub fn start_prometheus_metric_exporter( opts: PrometheusExporterOptions, ) -> Result { - let (srv, exporter) = - PromServer::new(&opts, SDKAggSelector::new(opts.use_seconds_for_durations))?; + let (srv, exporter) = PromServer::new(&opts)?; let meter_provider = augment_meter_provider_with_defaults( MeterProviderBuilder::default().with_reader(exporter), &opts.global_tags, opts.use_seconds_for_durations, + opts.histogram_bucket_overrides, )? 
.build(); let bound_addr = srv.bound_addr()?; @@ -223,7 +183,7 @@ pub fn start_prometheus_metric_exporter( #[derive(Debug)] pub struct CoreOtelMeter { - meter: Meter, + pub meter: Meter, use_seconds_for_durations: bool, // we have to hold on to the provider otherwise otel automatically shuts it down on drop // for whatever crazy reason @@ -292,11 +252,23 @@ impl CoreMeter for CoreOtelMeter { } fn gauge(&self, params: MetricParameters) -> Arc { - Arc::new(MemoryGauge::::new(params, &self.meter)) + Arc::new( + self.meter + .u64_gauge(params.name) + .with_unit(params.unit) + .with_description(params.description) + .init(), + ) } fn gauge_f64(&self, params: MetricParameters) -> Arc { - Arc::new(MemoryGauge::::new(params, &self.meter)) + Arc::new( + self.meter + .f64_gauge(params.name) + .with_unit(params.unit) + .with_description(params.description) + .init(), + ) } } @@ -315,25 +287,6 @@ impl HistogramDuration for DurationHistogram { } } -impl Gauge for MemoryGauge { - fn record(&self, value: u64, attributes: &MetricAttributes) { - if let MetricAttributes::OTel { kvs } = attributes { - self.record(value, kvs); - } else { - dbg_panic!("Must use OTel attributes with an OTel metric implementation"); - } - } -} -impl GaugeF64 for MemoryGauge { - fn record(&self, value: f64, attributes: &MetricAttributes) { - if let MetricAttributes::OTel { kvs } = attributes { - self.record(value, kvs); - } else { - dbg_panic!("Must use OTel attributes with an OTel metric implementation"); - } - } -} - fn default_resource_instance() -> &'static Resource { use std::sync::OnceLock; diff --git a/core/src/telemetry/prometheus_server.rs b/core/src/telemetry/prometheus_server.rs index 9eefa700c..e49ddf9a1 100644 --- a/core/src/telemetry/prometheus_server.rs +++ b/core/src/telemetry/prometheus_server.rs @@ -5,7 +5,6 @@ use hyper_util::{ server::conn::auto, }; use opentelemetry_prometheus::PrometheusExporter; -use opentelemetry_sdk::metrics::reader::AggregationSelector; use prometheus::{Encoder, Registry, TextEncoder}; use std::net::{SocketAddr, TcpListener}; use temporal_sdk_core_api::telemetry::PrometheusExporterOptions; @@ -20,11 +19,9 @@ pub(super) struct PromServer { impl PromServer { pub(super) fn new( opts: &PrometheusExporterOptions, - aggregation: impl AggregationSelector + 'static, ) -> Result<(Self, PrometheusExporter), anyhow::Error> { let registry = Registry::new(); let exporter = opentelemetry_prometheus::exporter() - .with_aggregation_selector(aggregation) .without_scope_info() .with_registry(registry.clone()); let exporter = if !opts.counters_total_suffix { diff --git a/sdk-core-protos/Cargo.toml b/sdk-core-protos/Cargo.toml index d1a3687ec..477a980d9 100644 --- a/sdk-core-protos/Cargo.toml +++ b/sdk-core-protos/Cargo.toml @@ -25,7 +25,7 @@ prost-wkt-types = "0.6" rand = { version = "0.8", optional = true } serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" -thiserror = "1.0" +thiserror = { workspace = true } tonic = { workspace = true } uuid = { version = "1.1", features = ["v4"], optional = true } diff --git a/sdk/Cargo.toml b/sdk/Cargo.toml index e019b4b14..0a05059f2 100644 --- a/sdk/Cargo.toml +++ b/sdk/Cargo.toml @@ -12,7 +12,7 @@ categories = ["development-tools"] [dependencies] async-trait = "0.1" -thiserror = "1.0" +thiserror = { workspace = true } anyhow = "1.0" derive_more = { workspace = true } futures-util = { version = "0.3", default-features = false } diff --git a/test-utils/Cargo.toml b/test-utils/Cargo.toml index 96c10cc01..2abf5d0cd 100644 --- a/test-utils/Cargo.toml 
+++ b/test-utils/Cargo.toml @@ -30,7 +30,7 @@ temporal-client = { path = "../client" } temporal-sdk = { path = "../sdk" } temporal-sdk-core = { path = "../core" } temporal-sdk-core-api = { path = "../core-api" } -thiserror = "1.0" +thiserror = { workspace = true } tokio = "1.1" tokio-util = { version = "0.7" } tracing = "0.1" diff --git a/tests/integ_tests/metrics_tests.rs b/tests/integ_tests/metrics_tests.rs index 39d4d9e16..16477d193 100644 --- a/tests/integ_tests/metrics_tests.rs +++ b/tests/integ_tests/metrics_tests.rs @@ -1,7 +1,9 @@ use anyhow::anyhow; use assert_matches::assert_matches; -use std::{env, net::SocketAddr, sync::Arc, time::Duration}; -use temporal_client::{WorkflowClientTrait, WorkflowOptions, WorkflowService}; +use std::{collections::HashMap, env, net::SocketAddr, sync::Arc, time::Duration}; +use temporal_client::{ + WorkflowClientTrait, WorkflowOptions, WorkflowService, REQUEST_LATENCY_HISTOGRAM_NAME, +}; use temporal_sdk::{ ActContext, ActivityError, ActivityOptions, CancellableFuture, LocalActivityOptions, WfContext, }; @@ -13,8 +15,8 @@ use temporal_sdk_core::{ use temporal_sdk_core_api::{ telemetry::{ metrics::{CoreMeter, MetricAttributes, MetricParameters}, - OtelCollectorOptionsBuilder, PrometheusExporterOptionsBuilder, TelemetryOptions, - TelemetryOptionsBuilder, + HistogramBucketOverrides, OtelCollectorOptionsBuilder, PrometheusExporterOptions, + PrometheusExporterOptionsBuilder, TelemetryOptions, TelemetryOptionsBuilder, }, worker::WorkerConfigBuilder, Worker, @@ -62,19 +64,16 @@ impl Drop for AbortOnDrop { } pub(crate) fn prom_metrics( - use_seconds: bool, - show_units: bool, + options_override: Option, ) -> (TelemetryOptions, SocketAddr, AbortOnDrop) { - let mut telemopts = get_integ_telem_options(); - let prom_info = start_prometheus_metric_exporter( + let prom_exp_opts = options_override.unwrap_or_else(|| { PrometheusExporterOptionsBuilder::default() .socket_addr(ANY_PORT.parse().unwrap()) - .use_seconds_for_durations(use_seconds) - .unit_suffix(show_units) .build() - .unwrap(), - ) - .unwrap(); + .unwrap() + }); + let mut telemopts = get_integ_telem_options(); + let prom_info = start_prometheus_metric_exporter(prom_exp_opts).unwrap(); telemopts.metrics = Some(prom_info.meter as Arc); ( telemopts, @@ -87,8 +86,24 @@ pub(crate) fn prom_metrics( #[rstest::rstest] #[tokio::test] -async fn prometheus_metrics_exported(#[values(true, false)] use_seconds_latency: bool) { - let (telemopts, addr, _aborter) = prom_metrics(use_seconds_latency, false); +async fn prometheus_metrics_exported( + #[values(true, false)] use_seconds_latency: bool, + #[values(true, false)] custom_buckets: bool, +) { + let mut opts_builder = PrometheusExporterOptionsBuilder::default(); + opts_builder + .socket_addr(ANY_PORT.parse().unwrap()) + .use_seconds_for_durations(use_seconds_latency); + if custom_buckets { + opts_builder.histogram_bucket_overrides(HistogramBucketOverrides { + overrides: { + let mut hm = HashMap::new(); + hm.insert(REQUEST_LATENCY_HISTOGRAM_NAME.to_string(), vec![1337.0]); + hm + }, + }); + } + let (telemopts, addr, _aborter) = prom_metrics(Some(opts_builder.build().unwrap())); let rt = CoreRuntime::new_assume_tokio(telemopts).unwrap(); let opts = get_integ_server_options(); let mut raw_client = opts @@ -109,7 +124,12 @@ async fn prometheus_metrics_exported(#[values(true, false)] use_seconds_latency: assert!(body.contains( "temporal_request_latency_count{operation=\"GetSystemInfo\",service_name=\"temporal-core-sdk\"} 1" )); - if use_seconds_latency { + if 
custom_buckets { + assert!(body.contains( + "temporal_request_latency_bucket{\ + operation=\"GetSystemInfo\",service_name=\"temporal-core-sdk\",le=\"1337\"}" + )); + } else if use_seconds_latency { assert!(body.contains( "temporal_request_latency_bucket{\ operation=\"GetSystemInfo\",service_name=\"temporal-core-sdk\",le=\"0.05\"}" @@ -132,12 +152,13 @@ async fn prometheus_metrics_exported(#[values(true, false)] use_seconds_latency: }, ); let body = get_text(format!("http://{addr}/metrics")).await; + println!("{}", &body); assert!(body.contains("\nmygauge 42")); } #[tokio::test] async fn one_slot_worker_reports_available_slot() { - let (telemopts, addr, _aborter) = prom_metrics(false, false); + let (telemopts, addr, _aborter) = prom_metrics(None); let tq = "one_slot_worker_tq"; let rt = CoreRuntime::new_assume_tokio(telemopts).unwrap(); @@ -373,7 +394,7 @@ async fn query_of_closed_workflow_doesnt_tick_terminal_metric( )] completion: workflow_command::Variant, ) { - let (telemopts, addr, _aborter) = prom_metrics(false, false); + let (telemopts, addr, _aborter) = prom_metrics(None); let rt = CoreRuntime::new_assume_tokio(telemopts).unwrap(); let mut starter = CoreWfStarter::new_with_runtime("query_of_closed_workflow_doesnt_tick_terminal_metric", rt); @@ -500,7 +521,7 @@ fn runtime_new() { CoreRuntime::new(get_integ_telem_options(), TokioRuntimeBuilder::default()).unwrap(); let handle = rt.tokio_handle(); let _rt = handle.enter(); - let (telemopts, addr, _aborter) = prom_metrics(false, false); + let (telemopts, addr, _aborter) = prom_metrics(None); rt.telemetry_mut() .attach_late_init_metrics(telemopts.metrics.unwrap()); let opts = get_integ_server_options(); @@ -525,7 +546,14 @@ async fn latency_metrics( #[values(true, false)] use_seconds_latency: bool, #[values(true, false)] show_units: bool, ) { - let (telemopts, addr, _aborter) = prom_metrics(use_seconds_latency, show_units); + let (telemopts, addr, _aborter) = prom_metrics(Some( + PrometheusExporterOptionsBuilder::default() + .socket_addr(ANY_PORT.parse().unwrap()) + .use_seconds_for_durations(use_seconds_latency) + .unit_suffix(show_units) + .build() + .unwrap(), + )); let rt = CoreRuntime::new_assume_tokio(telemopts).unwrap(); let mut starter = CoreWfStarter::new_with_runtime("latency_metrics", rt); let worker = starter.get_worker().await; @@ -561,7 +589,7 @@ async fn latency_metrics( #[tokio::test] async fn request_fail_codes() { - let (telemopts, addr, _aborter) = prom_metrics(false, false); + let (telemopts, addr, _aborter) = prom_metrics(None); let rt = CoreRuntime::new_assume_tokio(telemopts).unwrap(); let opts = get_integ_server_options(); let mut client = opts @@ -625,7 +653,7 @@ async fn request_fail_codes_otel() { #[tokio::test] async fn activity_metrics() { - let (telemopts, addr, _aborter) = prom_metrics(false, false); + let (telemopts, addr, _aborter) = prom_metrics(None); let rt = CoreRuntime::new_assume_tokio(telemopts).unwrap(); let wf_name = "activity_metrics"; let mut starter = CoreWfStarter::new_with_runtime(wf_name, rt); diff --git a/tests/integ_tests/workflow_tests.rs b/tests/integ_tests/workflow_tests.rs index c2f006330..673ec9081 100644 --- a/tests/integ_tests/workflow_tests.rs +++ b/tests/integ_tests/workflow_tests.rs @@ -753,7 +753,7 @@ async fn build_id_correct_in_wf_info() { async fn nondeterminism_errors_fail_workflow_when_configured_to( #[values(true, false)] whole_worker: bool, ) { - let (telemopts, addr, _aborter) = metrics_tests::prom_metrics(false, false); + let (telemopts, addr, _aborter) = 
metrics_tests::prom_metrics(None); let rt = CoreRuntime::new_assume_tokio(telemopts).unwrap(); let wf_name = "nondeterminism_errors_fail_workflow_when_configured_to"; let mut starter = CoreWfStarter::new_with_runtime(wf_name, rt);
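For reference, a minimal usage sketch of the `histogram_bucket_overrides` option introduced by this patch, mirroring what the integration test above does. This is not part of the patch: the bind address, bucket values, and the `#[tokio::main]` harness are illustrative assumptions, and it presumes core is built with its telemetry/Prometheus features enabled (the exporter spawns its scrape server onto the ambient Tokio runtime). The API names themselves (`PrometheusExporterOptionsBuilder`, `HistogramBucketOverrides`, `start_prometheus_metric_exporter`, `REQUEST_LATENCY_HISTOGRAM_NAME`) are taken from the diff.

use std::collections::HashMap;
use temporal_client::REQUEST_LATENCY_HISTOGRAM_NAME;
use temporal_sdk_core::telemetry::start_prometheus_metric_exporter;
use temporal_sdk_core_api::telemetry::{HistogramBucketOverrides, PrometheusExporterOptionsBuilder};

#[tokio::main]
async fn main() {
    // Boundaries are interpreted in milliseconds here because
    // use_seconds_for_durations is left at its default (false).
    let mut overrides = HashMap::new();
    overrides.insert(
        REQUEST_LATENCY_HISTOGRAM_NAME.to_string(),
        vec![10.0, 50.0, 100.0, 500.0, 1000.0],
    );
    let opts = PrometheusExporterOptionsBuilder::default()
        .socket_addr("0.0.0.0:9464".parse().unwrap()) // illustrative bind address
        .histogram_bucket_overrides(HistogramBucketOverrides { overrides })
        .build()
        .unwrap();
    // Starts the scrape endpoint; the returned meter is what gets attached to
    // TelemetryOptions, as prom_metrics() does in the tests above.
    let started = start_prometheus_metric_exporter(opts).unwrap();
    let _meter = started.meter;
}

With an override such as the test's `vec![1337.0]`, the exported histogram carries only the `le="1337"` bucket (plus the implicit `+Inf` bucket), which is exactly what the `custom_buckets` branch of `prometheus_metrics_exported` asserts.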