Skip to content

Commit

Permalink
Allow CoreMeter implementations to use their own custom attribute r…
Browse files Browse the repository at this point in the history
…eference (#608)
  • Loading branch information
Sushisource authored Sep 26, 2023
1 parent 9056826 commit e9a6f06
Show file tree
Hide file tree
Showing 4 changed files with 338 additions and 190 deletions.
24 changes: 10 additions & 14 deletions client/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use tower::Service;
#[derive(Clone, derive_more::DebugCustom)]
#[debug(fmt = "MetricsContext {{ attribs: {kvs:?}, poll_is_long: {poll_is_long} }}")]
pub struct MetricsContext {
meter: Arc<dyn CoreMeter>,
kvs: MetricAttributes,
poll_is_long: bool,

Expand Down Expand Up @@ -66,19 +67,15 @@ impl MetricsContext {
unit: "ms".into(),
description: "Histogram of client long-poll request latencies".into(),
}),
meter,
}
}

/// Extend an existing metrics context with new attributes, returning a new one
pub(crate) fn with_new_attrs(&self, new_kvs: impl IntoIterator<Item = MetricKeyValue>) -> Self {
let mut r = self.clone();
r.add_new_attrs(new_kvs);
r
}

/// Add new attributes to the context, mutating it
pub(crate) fn add_new_attrs(&mut self, new_kvs: impl IntoIterator<Item = MetricKeyValue>) {
self.kvs.add_new_attrs(new_kvs);
/// Mutate this metrics context with new attributes
pub(crate) fn with_new_attrs(&mut self, new_kvs: impl IntoIterator<Item = MetricKeyValue>) {
self.kvs = self
.meter
.extend_attributes(self.kvs.clone(), new_kvs.into());
}

pub(crate) fn set_is_long_poll(&mut self) {
Expand Down Expand Up @@ -152,19 +149,18 @@ impl Service<http::Request<BoxBody>> for GrpcMetricSvc {
let metrics = self
.metrics
.clone()
.map(|m| {
.map(|mut m| {
// Attach labels from client wrapper
if let Some(other_labels) = req.extensions_mut().remove::<AttachMetricLabels>() {
m.with_new_attrs(other_labels.labels)
} else {
m
}
m
})
.and_then(|mut metrics| {
// Attach method name label if possible
req.uri().to_string().rsplit_once('/').map(|split_tup| {
let method_name = split_tup.1;
metrics.add_new_attrs([svc_operation(method_name.to_string())]);
metrics.with_new_attrs([svc_operation(method_name.to_string())]);
if LONG_POLL_METHOD_NAMES.contains(&method_name) {
metrics.set_is_long_poll();
}
Expand Down
225 changes: 139 additions & 86 deletions core-api/src/telemetry/metrics.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,26 @@
use std::{borrow::Cow, collections::HashSet, fmt::Debug, sync::Arc};
use std::{
any::Any,
borrow::Cow,
fmt::Debug,
sync::{Arc, OnceLock},
};

/// Implementors of this trait are expected to be defined in each language's bridge.
/// The implementor is responsible for the allocation/instantiation of new metric meters which
/// Core has requested.
pub trait CoreMeter: Send + Sync + Debug {
fn new_attributes(&self, attribs: MetricsAttributesOptions) -> MetricAttributes;
/// Given some k/v pairs, create a return a new instantiated instance of metric attributes.
/// Only [MetricAttributes] created by this meter can be used when calling record on instruments
/// created by this meter.
fn new_attributes(&self, attribs: NewAttributes) -> MetricAttributes;
/// Extend some existing attributes with new values. Implementations should create new instances
/// when doing so, rather than mutating whatever is backing the passed in `existing` attributes.
/// Ideally that new instance retains a ref to the extended old attribute, promoting re-use.
fn extend_attributes(
&self,
existing: MetricAttributes,
attribs: NewAttributes,
) -> MetricAttributes;
fn counter(&self, params: MetricParameters) -> Arc<dyn Counter>;
fn histogram(&self, params: MetricParameters) -> Arc<dyn Histogram>;
fn gauge(&self, params: MetricParameters) -> Arc<dyn Gauge>;
Expand Down Expand Up @@ -36,48 +52,22 @@ impl From<&'static str> for MetricParameters {
#[derive(derive_more::Constructor, Clone, Debug)]
pub struct TemporalMeter {
pub inner: Arc<dyn CoreMeter>,
pub default_attribs: MetricsAttributesOptions,
}

#[derive(Debug, Clone)]
pub enum MetricEvent {
Create {
params: MetricParameters,
id: u64,
kind: MetricKind,
},
CreateAttributes {
id: u64,
attributes: Vec<MetricKeyValue>,
},
Update {
id: u64,
attributes: LangMetricAttributes,
update: MetricUpdateVal,
},
}
#[derive(Debug, Clone, Copy)]
pub enum MetricKind {
Counter,
Gauge,
Histogram,
}
#[derive(Debug, Clone, Copy)]
pub enum MetricUpdateVal {
// Currently all deltas are natural numbers
Delta(u64),
// Currently all values are natural numbers
Value(u64),
}

pub trait MetricCallBufferer: Send + Sync {
fn retrieve(&self) -> Vec<MetricEvent>;
pub default_attribs: NewAttributes,
}

impl CoreMeter for Arc<dyn CoreMeter> {
fn new_attributes(&self, attribs: MetricsAttributesOptions) -> MetricAttributes {
fn new_attributes(&self, attribs: NewAttributes) -> MetricAttributes {
self.as_ref().new_attributes(attribs)
}

fn extend_attributes(
&self,
existing: MetricAttributes,
attribs: NewAttributes,
) -> MetricAttributes {
self.as_ref().extend_attributes(existing, attribs)
}

fn counter(&self, params: MetricParameters) -> Arc<dyn Counter> {
self.as_ref().counter(params)
}
Expand All @@ -98,60 +88,37 @@ pub enum MetricAttributes {
OTel {
kvs: Arc<Vec<opentelemetry::KeyValue>>,
},
Lang(LangMetricAttributes),
}
#[derive(Clone, Debug)]
pub struct LangMetricAttributes {
/// A set of references to attributes stored in lang memory. All referenced attributes should
/// be attached to the metric when recording.
pub ids: HashSet<u64>,
/// If populated, these key values should also be used in addition to the referred-to
/// existing attributes when recording
pub new_attributes: Vec<MetricKeyValue>,
Buffer(BufferAttributes),
Dynamic(Arc<dyn CustomMetricAttributes>),
}

impl MetricAttributes {
/// Extend existing metrics attributes with others, returning a new instance
pub fn merge(&self, other: MetricAttributes) -> Self {
let mut me = self.clone();
match (&mut me, other) {
#[cfg(feature = "otel_impls")]
(MetricAttributes::OTel { ref mut kvs }, MetricAttributes::OTel { kvs: other_kvs }) => {
Arc::make_mut(kvs).extend((*other_kvs).clone());
}
(MetricAttributes::Lang(ref mut l), MetricAttributes::Lang(ol)) => {
l.ids.extend(ol.ids);
l.new_attributes.extend(ol.new_attributes);
}
_ => panic!("Cannot merge metric attributes of different kinds"),
}
me
}

/// Mutate self to add new kvs
pub fn add_new_attrs(&mut self, new_kvs: impl IntoIterator<Item = MetricKeyValue>) {
match self {
#[cfg(feature = "otel_impls")]
MetricAttributes::OTel { ref mut kvs, .. } => {
Arc::make_mut(kvs).extend(new_kvs.into_iter().map(Into::into));
}
MetricAttributes::Lang(ref mut attrs, ..) => {
attrs.new_attributes.extend(new_kvs);
}
}
}
/// A reference to some attributes created lang side.
pub trait CustomMetricAttributes: Debug + Send + Sync {
/// Must be implemented to work around existing type system restrictions, see
/// [here](https://internals.rust-lang.org/t/downcast-not-from-any-but-from-any-trait/16736/12)
fn as_any(self: Arc<Self>) -> Arc<dyn Any + Send + Sync>;
}

/// Options that are attached to metrics on a per-call basis
#[derive(Clone, Debug, Default, derive_more::Constructor)]
pub struct MetricsAttributesOptions {
pub struct NewAttributes {
pub attributes: Vec<MetricKeyValue>,
}
impl MetricsAttributesOptions {
impl NewAttributes {
pub fn extend(&mut self, new_kvs: impl IntoIterator<Item = MetricKeyValue>) {
self.attributes.extend(new_kvs)
}
}
impl<I> From<I> for NewAttributes
where
I: IntoIterator<Item = MetricKeyValue>,
{
fn from(value: I) -> Self {
Self {
attributes: value.into_iter().collect(),
}
}
}

/// A K/V pair that can be used to label a specific recording of a metric
#[derive(Clone, Debug)]
Expand Down Expand Up @@ -197,14 +164,92 @@ pub trait Gauge: Send + Sync {
fn record(&self, value: u64, attributes: &MetricAttributes);
}

#[derive(Debug, Clone)]
pub enum MetricEvent<I: BufferInstrumentRef> {
Create {
params: MetricParameters,
/// One you receive this event, call `set` on this with the initialized instrument reference
populate_into: LazyBufferInstrument<I>,
kind: MetricKind,
},
CreateAttributes {
/// One you receive this event, call `set` on this with the initialized attributes
populate_into: BufferAttributes,
/// If not `None`, use these already-initialized attributes as the base (extended with
/// `attributes`) for the ones you are about to initialize.
append_from: Option<BufferAttributes>,
attributes: Vec<MetricKeyValue>,
},
Update {
instrument: LazyBufferInstrument<I>,
attributes: BufferAttributes,
update: MetricUpdateVal,
},
}
#[derive(Debug, Clone, Copy)]
pub enum MetricKind {
Counter,
Gauge,
Histogram,
}
#[derive(Debug, Clone, Copy)]
pub enum MetricUpdateVal {
// Currently all deltas are natural numbers
Delta(u64),
// Currently all values are natural numbers
Value(u64),
}

pub trait MetricCallBufferer<I: BufferInstrumentRef>: Send + Sync {
fn retrieve(&self) -> Vec<MetricEvent<I>>;
}

/// A lazy reference to some metrics buffer attributes
pub type BufferAttributes = LazyRef<Arc<dyn CustomMetricAttributes + 'static>>;

/// Types lang uses to contain references to its lang-side defined instrument references must
/// implement this marker trait
pub trait BufferInstrumentRef {}
/// A lazy reference to a metrics buffer instrument
pub type LazyBufferInstrument<T> = LazyRef<Arc<T>>;

#[derive(Debug, Clone)]
pub struct LazyRef<T> {
to_be_initted: Arc<OnceLock<T>>,
}
impl<T> LazyRef<T> {
pub fn hole() -> Self {
Self {
to_be_initted: Arc::new(OnceLock::new()),
}
}

/// Get the reference you previously initialized
///
/// # Panics
/// If `set` has not already been called. You must set the reference before using it.
pub fn get(&self) -> &T {
self.to_be_initted
.get()
.expect("You must initialize the reference before using it")
}

/// Assigns a value to fill this reference.
/// Returns according to semantics of [OnceLock].
pub fn set(&self, val: T) -> Result<(), T> {
self.to_be_initted.set(val)
}
}

#[derive(Debug)]
pub struct NoOpCoreMeter;
impl CoreMeter for NoOpCoreMeter {
fn new_attributes(&self, _: MetricsAttributesOptions) -> MetricAttributes {
MetricAttributes::Lang(LangMetricAttributes {
ids: HashSet::new(),
new_attributes: vec![],
})
fn new_attributes(&self, _: NewAttributes) -> MetricAttributes {
MetricAttributes::Dynamic(Arc::new(NoOpAttributes))
}

fn extend_attributes(&self, existing: MetricAttributes, _: NewAttributes) -> MetricAttributes {
existing
}

fn counter(&self, _: MetricParameters) -> Arc<dyn Counter> {
Expand All @@ -231,6 +276,14 @@ impl Gauge for NoOpInstrument {
fn record(&self, _: u64, _: &MetricAttributes) {}
}

#[derive(Debug, Clone)]
pub struct NoOpAttributes;
impl CustomMetricAttributes for NoOpAttributes {
fn as_any(self: Arc<Self>) -> Arc<dyn Any + Send + Sync> {
self as Arc<dyn Any + Send + Sync>
}
}

#[cfg(feature = "otel_impls")]
mod otel_impls {
use super::*;
Expand Down
Loading

0 comments on commit e9a6f06

Please sign in to comment.