diff --git a/client/src/metrics.rs b/client/src/metrics.rs index 8f92ba96f..332013151 100644 --- a/client/src/metrics.rs +++ b/client/src/metrics.rs @@ -18,6 +18,7 @@ use tower::Service; #[derive(Clone, derive_more::DebugCustom)] #[debug(fmt = "MetricsContext {{ attribs: {kvs:?}, poll_is_long: {poll_is_long} }}")] pub struct MetricsContext { + meter: Arc, kvs: MetricAttributes, poll_is_long: bool, @@ -66,19 +67,15 @@ impl MetricsContext { unit: "ms".into(), description: "Histogram of client long-poll request latencies".into(), }), + meter, } } - /// Extend an existing metrics context with new attributes, returning a new one - pub(crate) fn with_new_attrs(&self, new_kvs: impl IntoIterator) -> Self { - let mut r = self.clone(); - r.add_new_attrs(new_kvs); - r - } - - /// Add new attributes to the context, mutating it - pub(crate) fn add_new_attrs(&mut self, new_kvs: impl IntoIterator) { - self.kvs.add_new_attrs(new_kvs); + /// Mutate this metrics context with new attributes + pub(crate) fn with_new_attrs(&mut self, new_kvs: impl IntoIterator) { + self.kvs = self + .meter + .extend_attributes(self.kvs.clone(), new_kvs.into()); } pub(crate) fn set_is_long_poll(&mut self) { @@ -152,19 +149,18 @@ impl Service> for GrpcMetricSvc { let metrics = self .metrics .clone() - .map(|m| { + .map(|mut m| { // Attach labels from client wrapper if let Some(other_labels) = req.extensions_mut().remove::() { m.with_new_attrs(other_labels.labels) - } else { - m } + m }) .and_then(|mut metrics| { // Attach method name label if possible req.uri().to_string().rsplit_once('/').map(|split_tup| { let method_name = split_tup.1; - metrics.add_new_attrs([svc_operation(method_name.to_string())]); + metrics.with_new_attrs([svc_operation(method_name.to_string())]); if LONG_POLL_METHOD_NAMES.contains(&method_name) { metrics.set_is_long_poll(); } diff --git a/core-api/src/telemetry/metrics.rs b/core-api/src/telemetry/metrics.rs index 546e94db0..f41415f01 100644 --- a/core-api/src/telemetry/metrics.rs +++ b/core-api/src/telemetry/metrics.rs @@ -1,10 +1,26 @@ -use std::{borrow::Cow, collections::HashSet, fmt::Debug, sync::Arc}; +use std::{ + any::Any, + borrow::Cow, + fmt::Debug, + sync::{Arc, OnceLock}, +}; /// Implementors of this trait are expected to be defined in each language's bridge. /// The implementor is responsible for the allocation/instantiation of new metric meters which /// Core has requested. pub trait CoreMeter: Send + Sync + Debug { - fn new_attributes(&self, attribs: MetricsAttributesOptions) -> MetricAttributes; + /// Given some k/v pairs, create a return a new instantiated instance of metric attributes. + /// Only [MetricAttributes] created by this meter can be used when calling record on instruments + /// created by this meter. + fn new_attributes(&self, attribs: NewAttributes) -> MetricAttributes; + /// Extend some existing attributes with new values. Implementations should create new instances + /// when doing so, rather than mutating whatever is backing the passed in `existing` attributes. + /// Ideally that new instance retains a ref to the extended old attribute, promoting re-use. + fn extend_attributes( + &self, + existing: MetricAttributes, + attribs: NewAttributes, + ) -> MetricAttributes; fn counter(&self, params: MetricParameters) -> Arc; fn histogram(&self, params: MetricParameters) -> Arc; fn gauge(&self, params: MetricParameters) -> Arc; @@ -36,48 +52,22 @@ impl From<&'static str> for MetricParameters { #[derive(derive_more::Constructor, Clone, Debug)] pub struct TemporalMeter { pub inner: Arc, - pub default_attribs: MetricsAttributesOptions, -} - -#[derive(Debug, Clone)] -pub enum MetricEvent { - Create { - params: MetricParameters, - id: u64, - kind: MetricKind, - }, - CreateAttributes { - id: u64, - attributes: Vec, - }, - Update { - id: u64, - attributes: LangMetricAttributes, - update: MetricUpdateVal, - }, -} -#[derive(Debug, Clone, Copy)] -pub enum MetricKind { - Counter, - Gauge, - Histogram, -} -#[derive(Debug, Clone, Copy)] -pub enum MetricUpdateVal { - // Currently all deltas are natural numbers - Delta(u64), - // Currently all values are natural numbers - Value(u64), -} - -pub trait MetricCallBufferer: Send + Sync { - fn retrieve(&self) -> Vec; + pub default_attribs: NewAttributes, } impl CoreMeter for Arc { - fn new_attributes(&self, attribs: MetricsAttributesOptions) -> MetricAttributes { + fn new_attributes(&self, attribs: NewAttributes) -> MetricAttributes { self.as_ref().new_attributes(attribs) } + + fn extend_attributes( + &self, + existing: MetricAttributes, + attribs: NewAttributes, + ) -> MetricAttributes { + self.as_ref().extend_attributes(existing, attribs) + } + fn counter(&self, params: MetricParameters) -> Arc { self.as_ref().counter(params) } @@ -98,60 +88,37 @@ pub enum MetricAttributes { OTel { kvs: Arc>, }, - Lang(LangMetricAttributes), -} -#[derive(Clone, Debug)] -pub struct LangMetricAttributes { - /// A set of references to attributes stored in lang memory. All referenced attributes should - /// be attached to the metric when recording. - pub ids: HashSet, - /// If populated, these key values should also be used in addition to the referred-to - /// existing attributes when recording - pub new_attributes: Vec, + Buffer(BufferAttributes), + Dynamic(Arc), } -impl MetricAttributes { - /// Extend existing metrics attributes with others, returning a new instance - pub fn merge(&self, other: MetricAttributes) -> Self { - let mut me = self.clone(); - match (&mut me, other) { - #[cfg(feature = "otel_impls")] - (MetricAttributes::OTel { ref mut kvs }, MetricAttributes::OTel { kvs: other_kvs }) => { - Arc::make_mut(kvs).extend((*other_kvs).clone()); - } - (MetricAttributes::Lang(ref mut l), MetricAttributes::Lang(ol)) => { - l.ids.extend(ol.ids); - l.new_attributes.extend(ol.new_attributes); - } - _ => panic!("Cannot merge metric attributes of different kinds"), - } - me - } - - /// Mutate self to add new kvs - pub fn add_new_attrs(&mut self, new_kvs: impl IntoIterator) { - match self { - #[cfg(feature = "otel_impls")] - MetricAttributes::OTel { ref mut kvs, .. } => { - Arc::make_mut(kvs).extend(new_kvs.into_iter().map(Into::into)); - } - MetricAttributes::Lang(ref mut attrs, ..) => { - attrs.new_attributes.extend(new_kvs); - } - } - } +/// A reference to some attributes created lang side. +pub trait CustomMetricAttributes: Debug + Send + Sync { + /// Must be implemented to work around existing type system restrictions, see + /// [here](https://internals.rust-lang.org/t/downcast-not-from-any-but-from-any-trait/16736/12) + fn as_any(self: Arc) -> Arc; } /// Options that are attached to metrics on a per-call basis #[derive(Clone, Debug, Default, derive_more::Constructor)] -pub struct MetricsAttributesOptions { +pub struct NewAttributes { pub attributes: Vec, } -impl MetricsAttributesOptions { +impl NewAttributes { pub fn extend(&mut self, new_kvs: impl IntoIterator) { self.attributes.extend(new_kvs) } } +impl From for NewAttributes +where + I: IntoIterator, +{ + fn from(value: I) -> Self { + Self { + attributes: value.into_iter().collect(), + } + } +} /// A K/V pair that can be used to label a specific recording of a metric #[derive(Clone, Debug)] @@ -197,14 +164,92 @@ pub trait Gauge: Send + Sync { fn record(&self, value: u64, attributes: &MetricAttributes); } +#[derive(Debug, Clone)] +pub enum MetricEvent { + Create { + params: MetricParameters, + /// One you receive this event, call `set` on this with the initialized instrument reference + populate_into: LazyBufferInstrument, + kind: MetricKind, + }, + CreateAttributes { + /// One you receive this event, call `set` on this with the initialized attributes + populate_into: BufferAttributes, + /// If not `None`, use these already-initialized attributes as the base (extended with + /// `attributes`) for the ones you are about to initialize. + append_from: Option, + attributes: Vec, + }, + Update { + instrument: LazyBufferInstrument, + attributes: BufferAttributes, + update: MetricUpdateVal, + }, +} +#[derive(Debug, Clone, Copy)] +pub enum MetricKind { + Counter, + Gauge, + Histogram, +} +#[derive(Debug, Clone, Copy)] +pub enum MetricUpdateVal { + // Currently all deltas are natural numbers + Delta(u64), + // Currently all values are natural numbers + Value(u64), +} + +pub trait MetricCallBufferer: Send + Sync { + fn retrieve(&self) -> Vec>; +} + +/// A lazy reference to some metrics buffer attributes +pub type BufferAttributes = LazyRef>; + +/// Types lang uses to contain references to its lang-side defined instrument references must +/// implement this marker trait +pub trait BufferInstrumentRef {} +/// A lazy reference to a metrics buffer instrument +pub type LazyBufferInstrument = LazyRef>; + +#[derive(Debug, Clone)] +pub struct LazyRef { + to_be_initted: Arc>, +} +impl LazyRef { + pub fn hole() -> Self { + Self { + to_be_initted: Arc::new(OnceLock::new()), + } + } + + /// Get the reference you previously initialized + /// + /// # Panics + /// If `set` has not already been called. You must set the reference before using it. + pub fn get(&self) -> &T { + self.to_be_initted + .get() + .expect("You must initialize the reference before using it") + } + + /// Assigns a value to fill this reference. + /// Returns according to semantics of [OnceLock]. + pub fn set(&self, val: T) -> Result<(), T> { + self.to_be_initted.set(val) + } +} + #[derive(Debug)] pub struct NoOpCoreMeter; impl CoreMeter for NoOpCoreMeter { - fn new_attributes(&self, _: MetricsAttributesOptions) -> MetricAttributes { - MetricAttributes::Lang(LangMetricAttributes { - ids: HashSet::new(), - new_attributes: vec![], - }) + fn new_attributes(&self, _: NewAttributes) -> MetricAttributes { + MetricAttributes::Dynamic(Arc::new(NoOpAttributes)) + } + + fn extend_attributes(&self, existing: MetricAttributes, _: NewAttributes) -> MetricAttributes { + existing } fn counter(&self, _: MetricParameters) -> Arc { @@ -231,6 +276,14 @@ impl Gauge for NoOpInstrument { fn record(&self, _: u64, _: &MetricAttributes) {} } +#[derive(Debug, Clone)] +pub struct NoOpAttributes; +impl CustomMetricAttributes for NoOpAttributes { + fn as_any(self: Arc) -> Arc { + self as Arc + } +} + #[cfg(feature = "otel_impls")] mod otel_impls { use super::*; diff --git a/core/src/telemetry/metrics.rs b/core/src/telemetry/metrics.rs index 7f2dafb1d..50b01c489 100644 --- a/core/src/telemetry/metrics.rs +++ b/core/src/telemetry/metrics.rs @@ -21,20 +21,12 @@ use opentelemetry_sdk::{ runtime, AttributeSet, }; use parking_lot::RwLock; -use std::{ - collections::{HashMap, HashSet}, - net::SocketAddr, - sync::{ - atomic::{AtomicU64, Ordering}, - Arc, - }, - time::Duration, -}; +use std::{collections::HashMap, fmt::Debug, net::SocketAddr, sync::Arc, time::Duration}; use temporal_sdk_core_api::telemetry::{ metrics::{ - CoreMeter, Counter, Gauge, Histogram, LangMetricAttributes, MetricAttributes, - MetricCallBufferer, MetricEvent, MetricKeyValue, MetricKind, MetricParameters, - MetricUpdateVal, MetricsAttributesOptions, NoOpCoreMeter, + BufferAttributes, BufferInstrumentRef, CoreMeter, Counter, Gauge, Histogram, + LazyBufferInstrument, MetricAttributes, MetricCallBufferer, MetricEvent, MetricKeyValue, + MetricKind, MetricParameters, MetricUpdateVal, NewAttributes, NoOpCoreMeter, }, OtelCollectorOptions, PrometheusExporterOptions, }; @@ -111,10 +103,9 @@ impl MetricsContext { &self, new_attrs: impl IntoIterator, ) -> Self { - let as_attrs = self.meter.new_attributes(MetricsAttributesOptions::new( - new_attrs.into_iter().collect(), - )); - let kvs = self.kvs.merge(as_attrs); + let kvs = self + .meter + .extend_attributes(self.kvs.clone(), new_attrs.into()); Self { kvs, instruments: self.instruments.clone(), @@ -637,44 +628,69 @@ pub fn start_prometheus_metric_exporter( /// Buffers [MetricEvent]s for periodic consumption by lang #[derive(Debug)] -pub struct MetricsCallBuffer { - instrument_ids: AtomicU64, - attribute_ids: AtomicU64, - calls_rx: crossbeam::channel::Receiver, - calls_tx: crossbeam::channel::Sender, +pub struct MetricsCallBuffer +where + I: BufferInstrumentRef, +{ + calls_rx: crossbeam::channel::Receiver>, + calls_tx: crossbeam::channel::Sender>, } -impl MetricsCallBuffer { + +impl MetricsCallBuffer +where + I: Clone + BufferInstrumentRef, +{ /// Create a new buffer with the given capacity pub fn new(buffer_size: usize) -> Self { let (calls_tx, calls_rx) = crossbeam::channel::bounded(buffer_size); - MetricsCallBuffer { - instrument_ids: AtomicU64::new(0), - attribute_ids: AtomicU64::new(0), - calls_rx, - calls_tx, - } + MetricsCallBuffer { calls_rx, calls_tx } } - fn new_instrument(&self, params: MetricParameters, kind: MetricKind) -> BufferInstrument { - let id = self.instrument_ids.fetch_add(1, Ordering::AcqRel); - let _ = self.calls_tx.send(MetricEvent::Create { params, id, kind }); + fn new_instrument(&self, params: MetricParameters, kind: MetricKind) -> BufferInstrument { + let hole = LazyBufferInstrument::hole(); + let _ = self.calls_tx.send(MetricEvent::Create { + params, + kind, + populate_into: hole.clone(), + }); BufferInstrument { kind, - id, + instrument_ref: hole, tx: self.calls_tx.clone(), } } } -impl CoreMeter for MetricsCallBuffer { - fn new_attributes(&self, opts: MetricsAttributesOptions) -> MetricAttributes { - let id = self.attribute_ids.fetch_add(1, Ordering::AcqRel); + +impl CoreMeter for MetricsCallBuffer +where + I: BufferInstrumentRef + Debug + Send + Sync + Clone + 'static, +{ + fn new_attributes(&self, opts: NewAttributes) -> MetricAttributes { + let ba = BufferAttributes::hole(); let _ = self.calls_tx.send(MetricEvent::CreateAttributes { - id, + populate_into: ba.clone(), + append_from: None, attributes: opts.attributes, }); - MetricAttributes::Lang(LangMetricAttributes { - ids: HashSet::from([id]), - new_attributes: vec![], - }) + MetricAttributes::Buffer(ba) + } + + fn extend_attributes( + &self, + existing: MetricAttributes, + attribs: NewAttributes, + ) -> MetricAttributes { + if let MetricAttributes::Buffer(ol) = existing { + let ba = BufferAttributes::hole(); + let _ = self.calls_tx.send(MetricEvent::CreateAttributes { + populate_into: ba.clone(), + append_from: Some(ol), + attributes: attribs.attributes, + }); + MetricAttributes::Buffer(ba) + } else { + dbg_panic!("Must use buffer attributes with a buffer metric implementation"); + existing + } } fn counter(&self, params: MetricParameters) -> Arc { @@ -689,25 +705,31 @@ impl CoreMeter for MetricsCallBuffer { Arc::new(self.new_instrument(params, MetricKind::Gauge)) } } -impl MetricCallBufferer for MetricsCallBuffer { - fn retrieve(&self) -> Vec { +impl MetricCallBufferer for MetricsCallBuffer +where + I: Send + Sync + BufferInstrumentRef, +{ + fn retrieve(&self) -> Vec> { self.calls_rx.try_iter().collect() } } -struct BufferInstrument { +struct BufferInstrument { kind: MetricKind, - id: u64, - tx: crossbeam::channel::Sender, + instrument_ref: LazyBufferInstrument, + tx: crossbeam::channel::Sender>, } -impl BufferInstrument { +impl BufferInstrument +where + I: Clone + BufferInstrumentRef, +{ fn send(&self, value: u64, attributes: &MetricAttributes) { let attributes = match attributes { - MetricAttributes::Lang(l) => l.clone(), + MetricAttributes::Buffer(l) => l.clone(), _ => panic!("MetricsCallBuffer only works with MetricAttributes::Lang"), }; let _ = self.tx.send(MetricEvent::Update { - id: self.id, + instrument: self.instrument_ref.clone(), update: match self.kind { MetricKind::Counter => MetricUpdateVal::Delta(value), MetricKind::Gauge | MetricKind::Histogram => MetricUpdateVal::Value(value), @@ -716,17 +738,26 @@ impl BufferInstrument { }); } } -impl Counter for BufferInstrument { +impl Counter for BufferInstrument +where + I: BufferInstrumentRef + Send + Sync + Clone, +{ fn add(&self, value: u64, attributes: &MetricAttributes) { self.send(value, attributes) } } -impl Gauge for BufferInstrument { +impl Gauge for BufferInstrument +where + I: BufferInstrumentRef + Send + Sync + Clone, +{ fn record(&self, value: u64, attributes: &MetricAttributes) { self.send(value, attributes) } } -impl Histogram for BufferInstrument { +impl Histogram for BufferInstrument +where + I: BufferInstrumentRef + Send + Sync + Clone, +{ fn record(&self, value: u64, attributes: &MetricAttributes) { self.send(value, attributes) } @@ -735,12 +766,26 @@ impl Histogram for BufferInstrument { #[derive(Debug)] pub struct CoreOtelMeter(Meter); impl CoreMeter for CoreOtelMeter { - fn new_attributes(&self, attribs: MetricsAttributesOptions) -> MetricAttributes { + fn new_attributes(&self, attribs: NewAttributes) -> MetricAttributes { MetricAttributes::OTel { kvs: Arc::new(attribs.attributes.into_iter().map(KeyValue::from).collect()), } } + fn extend_attributes( + &self, + existing: MetricAttributes, + attribs: NewAttributes, + ) -> MetricAttributes { + if let MetricAttributes::OTel { mut kvs } = existing { + Arc::make_mut(&mut kvs).extend(attribs.attributes.into_iter().map(Into::into)); + MetricAttributes::OTel { kvs } + } else { + dbg_panic!("Must use OTel attributes with an OTel metric implementation"); + existing + } + } + fn counter(&self, params: MetricParameters) -> Arc { Arc::new( self.0 @@ -782,10 +827,18 @@ pub(crate) struct PrefixedMetricsMeter { meter: CM, } impl CoreMeter for PrefixedMetricsMeter { - fn new_attributes(&self, attribs: MetricsAttributesOptions) -> MetricAttributes { + fn new_attributes(&self, attribs: NewAttributes) -> MetricAttributes { self.meter.new_attributes(attribs) } + fn extend_attributes( + &self, + existing: MetricAttributes, + attribs: NewAttributes, + ) -> MetricAttributes { + self.meter.extend_attributes(existing, attribs) + } + fn counter(&self, mut params: MetricParameters) -> Arc { params.name = (self.prefix.clone() + &*params.name).into(); self.meter.counter(params) @@ -805,9 +858,36 @@ impl CoreMeter for PrefixedMetricsMeter { #[cfg(test)] mod tests { use super::*; - use temporal_sdk_core_api::telemetry::METRIC_PREFIX; + use std::any::Any; + use temporal_sdk_core_api::telemetry::{ + metrics::{BufferInstrumentRef, CustomMetricAttributes}, + METRIC_PREFIX, + }; use tracing::subscriber::NoSubscriber; + #[derive(Debug)] + struct DummyCustomAttrs(usize); + impl CustomMetricAttributes for DummyCustomAttrs { + fn as_any(self: Arc) -> Arc { + self as Arc + } + } + impl DummyCustomAttrs { + fn as_id(ba: &BufferAttributes) -> usize { + let as_dum = ba + .get() + .clone() + .as_any() + .downcast::() + .unwrap(); + as_dum.0 + } + } + + #[derive(Debug, Clone)] + struct DummyInstrumentRef(usize); + impl BufferInstrumentRef for DummyInstrumentRef {} + #[test] fn test_buffered_core_context() { let no_op_subscriber = Arc::new(NoSubscriber::new()); @@ -822,55 +902,62 @@ mod tests { let mc = MetricsContext::top_level("foo".to_string(), "q".to_string(), &telem_instance); mc.cache_eviction(); let events = call_buffer.retrieve(); - assert_matches!( + let a1 = assert_matches!( &events[0], MetricEvent::CreateAttributes { - id: 0, - attributes + populate_into, + append_from: None, + attributes, } if attributes[0].key == "service_name" && attributes[1].key == "namespace" && attributes[2].key == "task_queue" + => populate_into ); + a1.set(Arc::new(DummyCustomAttrs(1))).unwrap(); // Verify all metrics are created. This number will need to get updated any time a metric // is added. - let num_metrics = 22; + let num_metrics = 23; #[allow(clippy::needless_range_loop)] // Sorry clippy, this reads easier. for metric_num in 1..=num_metrics { - assert_matches!(&events[metric_num], - MetricEvent::Create { id, .. } - if *id == (metric_num - 1) as u64 + let hole = assert_matches!(&events[metric_num], + MetricEvent::Create { populate_into, .. } + => populate_into ); + hole.set(Arc::new(DummyInstrumentRef(metric_num))).unwrap(); } assert_matches!( - &events[num_metrics + 2], // +2 for attrib creation (at start), then this update + &events[num_metrics + 1], // +1 for attrib creation (at start), then this update MetricEvent::Update { - id: 22, + instrument, attributes, update: MetricUpdateVal::Delta(1) } - if attributes.ids == HashSet::from([0]) + if DummyCustomAttrs::as_id(attributes) == 1 && instrument.get().0 == num_metrics ); // Verify creating a new context with new attributes merges them properly let mc2 = mc.with_new_attrs([MetricKeyValue::new("gotta", "go fast")]); mc2.wf_task_latency(Duration::from_secs(1)); let events = call_buffer.retrieve(); - assert_matches!( + let a2 = assert_matches!( &events[0], MetricEvent::CreateAttributes { - id: 1, + populate_into, + append_from: Some(eh), attributes } - if attributes[0].key == "gotta" + if attributes[0].key == "gotta" && DummyCustomAttrs::as_id(eh) == 1 + => populate_into ); + a2.set(Arc::new(DummyCustomAttrs(2))).unwrap(); assert_matches!( &events[1], MetricEvent::Update { - id: 10, + instrument, attributes, update: MetricUpdateVal::Value(1000) // milliseconds } - if attributes.ids == HashSet::from([0, 1]) + if DummyCustomAttrs::as_id(attributes) == 2 && instrument.get().0 == 11 ); } @@ -892,10 +979,10 @@ mod tests { description: "a counter".into(), unit: "bleezles".into(), }); - let attrs_1 = call_buffer.new_attributes(MetricsAttributesOptions { + let attrs_1 = call_buffer.new_attributes(NewAttributes { attributes: vec![MetricKeyValue::new("hi", "yo")], }); - let attrs_2 = call_buffer.new_attributes(MetricsAttributesOptions { + let attrs_2 = call_buffer.new_attributes(NewAttributes { attributes: vec![MetricKeyValue::new("run", "fast")], }); ctr.add(1, &attrs_1); @@ -904,75 +991,87 @@ mod tests { let mut calls = call_buffer.retrieve(); calls.reverse(); - assert_matches!( + let ctr_1 = assert_matches!( calls.pop(), Some(MetricEvent::Create { params, - id: 0, + populate_into, kind: MetricKind::Counter }) if params.name == "ctr" + => populate_into ); - assert_matches!( + ctr_1.set(Arc::new(DummyInstrumentRef(1))).unwrap(); + let hist_2 = assert_matches!( calls.pop(), Some(MetricEvent::Create { params, - id: 1, + populate_into, kind: MetricKind::Histogram }) if params.name == "histo" + => populate_into ); - assert_matches!( + hist_2.set(Arc::new(DummyInstrumentRef(2))).unwrap(); + let gauge_3 = assert_matches!( calls.pop(), Some(MetricEvent::Create { params, - id: 2, + populate_into, kind: MetricKind::Gauge }) if params.name == "gauge" + => populate_into ); - assert_matches!( + gauge_3.set(Arc::new(DummyInstrumentRef(3))).unwrap(); + let a1 = assert_matches!( calls.pop(), Some(MetricEvent::CreateAttributes { - id: 0, + populate_into, + append_from: None, attributes }) if attributes[0].key == "hi" + => populate_into ); - assert_matches!( + a1.set(Arc::new(DummyCustomAttrs(1))).unwrap(); + let a2 = assert_matches!( calls.pop(), Some(MetricEvent::CreateAttributes { - id: 1, + populate_into, + append_from: None, attributes }) if attributes[0].key == "run" + => populate_into ); + a2.set(Arc::new(DummyCustomAttrs(2))).unwrap(); assert_matches!( calls.pop(), Some(MetricEvent::Update{ - id: 0, + instrument, attributes, update: MetricUpdateVal::Delta(1) }) - if attributes.ids == HashSet::from([0]) + if DummyCustomAttrs::as_id(&attributes) == 1 && instrument.get().0 == 1 ); assert_matches!( calls.pop(), Some(MetricEvent::Update{ - id: 1, + instrument, attributes, update: MetricUpdateVal::Value(2) }) - if attributes.ids == HashSet::from([0]) + if DummyCustomAttrs::as_id(&attributes) == 1 && instrument.get().0 == 2 ); assert_matches!( calls.pop(), Some(MetricEvent::Update{ - id: 2, + instrument, attributes, update: MetricUpdateVal::Value(3) }) - if attributes.ids == HashSet::from([1]) + if DummyCustomAttrs::as_id(&attributes) == 2&& instrument.get().0 == 3 ); } } diff --git a/core/src/telemetry/mod.rs b/core/src/telemetry/mod.rs index dadc4324c..244f0e6ad 100644 --- a/core/src/telemetry/mod.rs +++ b/core/src/telemetry/mod.rs @@ -29,7 +29,7 @@ use std::{ }, }; use temporal_sdk_core_api::telemetry::{ - metrics::{CoreMeter, MetricKeyValue, MetricsAttributesOptions, TemporalMeter}, + metrics::{CoreMeter, MetricKeyValue, NewAttributes, TemporalMeter}, CoreLog, CoreTelemetry, Logger, MetricTemporality, TelemetryOptions, }; use tracing::{Level, Subscriber}; @@ -92,7 +92,7 @@ impl TelemetryInstance { pub fn get_temporal_metric_meter(&self) -> Option { self.metrics.clone().map(|m| { let kvs = self.default_kvs(); - let attribs = MetricsAttributesOptions::new(kvs); + let attribs = NewAttributes::new(kvs); TemporalMeter::new( Arc::new(PrefixedMetricsMeter::new(self.metric_prefix.clone(), m)) as Arc, @@ -105,7 +105,7 @@ impl TelemetryInstance { pub fn get_metric_meter(&self) -> Option { self.metrics.clone().map(|m| { let kvs = self.default_kvs(); - let attribs = MetricsAttributesOptions::new(kvs); + let attribs = NewAttributes::new(kvs); TemporalMeter::new(m, attribs) }) }