Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Directly implement Measure trait for metric aggregates #2371

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions opentelemetry-sdk/src/metrics/instrument.rs
Original file line number Diff line number Diff line change
@@ -285,7 +285,7 @@ pub(crate) struct ResolvedMeasures<T> {
impl<T: Copy + 'static> SyncInstrument<T> for ResolvedMeasures<T> {
fn measure(&self, val: T, attrs: &[KeyValue]) {
for measure in &self.measures {
measure.call(val, attrs)
measure.measure(val, attrs)
}
}
}
@@ -304,7 +304,7 @@ impl<T> Observable<T> {
impl<T: Copy + Send + Sync + 'static> AsyncInstrument<T> for Observable<T> {
fn observe(&self, measurement: T, attrs: &[KeyValue]) {
for measure in &self.measures {
measure.call(measurement, attrs)
measure.measure(measurement, attrs)
}
}
}
85 changes: 44 additions & 41 deletions opentelemetry-sdk/src/metrics/internal/aggregate.rs
Original file line number Diff line number Diff line change
@@ -21,15 +21,26 @@ pub(crate) fn is_under_cardinality_limit(size: usize) -> bool {

/// Receives measurements to be aggregated.
pub(crate) trait Measure<T>: Send + Sync + 'static {
fn call(&self, measurement: T, attrs: &[KeyValue]);
fn measure(&self, measurement: T, attrs: &[KeyValue]);
}

impl<F, T> Measure<T> for F
struct FilteredMeasureInstrument<I> {
instrument: Arc<I>,
filter: Filter,
}

impl<I, T> Measure<T> for FilteredMeasureInstrument<I>
where
F: Fn(T, &[KeyValue]) + Send + Sync + 'static,
T: 'static,
I: Measure<T>,
{
fn call(&self, measurement: T, attrs: &[KeyValue]) {
self(measurement, attrs)
fn measure(&self, measurement: T, attrs: &[KeyValue]) {
let filtered_attrs: Vec<KeyValue> = attrs
.iter()
.filter(|kv| (self.filter)(kv))
.cloned()
.collect();
self.instrument.measure(measurement, &filtered_attrs);
}
}

@@ -83,28 +94,24 @@ impl<T: Number> AggregateBuilder<T> {
}
}

/// Wraps the passed in measure with an attribute filtering function.
fn filter(&self, f: impl Measure<T>) -> impl Measure<T> {
let filter = self.filter.clone();
move |n, attrs: &[KeyValue]| {
if let Some(filter) = &filter {
let filtered_attrs: Vec<KeyValue> =
attrs.iter().filter(|kv| filter(kv)).cloned().collect();
f.call(n, &filtered_attrs);
} else {
f.call(n, attrs);
};
fn maybe_filtered_measurement(&self, instrument: Arc<impl Measure<T>>) -> Arc<dyn Measure<T>> {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any advice on naming?
both for maybe_filtered_measurement function, and for FilteredMeasureInstrument...

if let Some(filter) = &self.filter {
Arc::new(FilteredMeasureInstrument {
filter: filter.clone(),
instrument,
})
} else {
instrument
}
}

/// Builds a last-value aggregate function input and output.
pub(crate) fn last_value(&self) -> (impl Measure<T>, impl ComputeAggregation) {
let lv_filter = Arc::new(LastValue::new());
let lv_agg = Arc::clone(&lv_filter);
pub(crate) fn last_value(&self) -> (Arc<dyn Measure<T>>, impl ComputeAggregation) {
let lv_agg = Arc::new(LastValue::new());
let t = self.temporality;

(
self.filter(move |n, a: &[KeyValue]| lv_filter.measure(n, a)),
self.maybe_filtered_measurement(lv_agg.clone()),
move |dest: Option<&mut dyn Aggregation>| {
let g = dest.and_then(|d| d.as_mut().downcast_mut::<Gauge<T>>());
let mut new_agg = if g.is_none() {
@@ -132,13 +139,12 @@ impl<T: Number> AggregateBuilder<T> {
pub(crate) fn precomputed_sum(
&self,
monotonic: bool,
) -> (impl Measure<T>, impl ComputeAggregation) {
let s = Arc::new(PrecomputedSum::new(monotonic));
let agg_sum = Arc::clone(&s);
) -> (Arc<dyn Measure<T>>, impl ComputeAggregation) {
let agg_sum = Arc::new(PrecomputedSum::new(monotonic));
let t = self.temporality;

(
self.filter(move |n, a: &[KeyValue]| s.measure(n, a)),
self.maybe_filtered_measurement(agg_sum.clone()),
move |dest: Option<&mut dyn Aggregation>| match t {
Some(Temporality::Delta) => agg_sum.delta(dest),
_ => agg_sum.cumulative(dest),
@@ -147,13 +153,12 @@ impl<T: Number> AggregateBuilder<T> {
}

/// Builds a sum aggregate function input and output.
pub(crate) fn sum(&self, monotonic: bool) -> (impl Measure<T>, impl ComputeAggregation) {
let s = Arc::new(Sum::new(monotonic));
let agg_sum = Arc::clone(&s);
pub(crate) fn sum(&self, monotonic: bool) -> (Arc<dyn Measure<T>>, impl ComputeAggregation) {
let agg_sum = Arc::new(Sum::new(monotonic));
let t = self.temporality;

(
self.filter(move |n, a: &[KeyValue]| s.measure(n, a)),
self.maybe_filtered_measurement(agg_sum.clone()),
move |dest: Option<&mut dyn Aggregation>| match t {
Some(Temporality::Delta) => agg_sum.delta(dest),
_ => agg_sum.cumulative(dest),
@@ -167,13 +172,12 @@ impl<T: Number> AggregateBuilder<T> {
boundaries: Vec<f64>,
record_min_max: bool,
record_sum: bool,
) -> (impl Measure<T>, impl ComputeAggregation) {
let h = Arc::new(Histogram::new(boundaries, record_min_max, record_sum));
let agg_h = Arc::clone(&h);
) -> (Arc<dyn Measure<T>>, impl ComputeAggregation) {
let agg_h = Arc::new(Histogram::new(boundaries, record_min_max, record_sum));
let t = self.temporality;

(
self.filter(move |n, a: &[KeyValue]| h.measure(n, a)),
self.maybe_filtered_measurement(agg_h.clone()),
move |dest: Option<&mut dyn Aggregation>| match t {
Some(Temporality::Delta) => agg_h.delta(dest),
_ => agg_h.cumulative(dest),
@@ -188,18 +192,17 @@ impl<T: Number> AggregateBuilder<T> {
max_scale: i8,
record_min_max: bool,
record_sum: bool,
) -> (impl Measure<T>, impl ComputeAggregation) {
let h = Arc::new(ExpoHistogram::new(
) -> (Arc<dyn Measure<T>>, impl ComputeAggregation) {
let agg_h = Arc::new(ExpoHistogram::new(
max_size,
max_scale,
record_min_max,
record_sum,
));
let agg_h = Arc::clone(&h);
let t = self.temporality;

(
self.filter(move |n, a: &[KeyValue]| h.measure(n, a)),
self.maybe_filtered_measurement(agg_h.clone()),
move |dest: Option<&mut dyn Aggregation>| match t {
Some(Temporality::Delta) => agg_h.delta(dest),
_ => agg_h.cumulative(dest),
@@ -231,7 +234,7 @@ mod tests {
}],
};
let new_attributes = [KeyValue::new("b", 2)];
measure.call(2, &new_attributes[..]);
measure.measure(2, &new_attributes[..]);

let (count, new_agg) = agg.call(Some(&mut a));

@@ -272,7 +275,7 @@ mod tests {
is_monotonic: false,
};
let new_attributes = [KeyValue::new("b", 2)];
measure.call(3, &new_attributes[..]);
measure.measure(3, &new_attributes[..]);

let (count, new_agg) = agg.call(Some(&mut a));

@@ -315,7 +318,7 @@ mod tests {
is_monotonic: false,
};
let new_attributes = [KeyValue::new("b", 2)];
measure.call(3, &new_attributes[..]);
measure.measure(3, &new_attributes[..]);

let (count, new_agg) = agg.call(Some(&mut a));

@@ -354,7 +357,7 @@ mod tests {
},
};
let new_attributes = [KeyValue::new("b", 2)];
measure.call(3, &new_attributes[..]);
measure.measure(3, &new_attributes[..]);

let (count, new_agg) = agg.call(Some(&mut a));

@@ -406,7 +409,7 @@ mod tests {
},
};
let new_attributes = [KeyValue::new("b", 2)];
measure.call(3, &new_attributes[..]);
measure.measure(3, &new_attributes[..]);

let (count, new_agg) = agg.call(Some(&mut a));

41 changes: 20 additions & 21 deletions opentelemetry-sdk/src/metrics/internal/exponential_histogram.rs
Original file line number Diff line number Diff line change
@@ -8,7 +8,7 @@ use crate::metrics::{
Temporality,
};

use super::{Aggregator, Number, ValueMap};
use super::{Aggregator, Measure, Number, ValueMap};

pub(crate) const EXPO_MAX_SCALE: i8 = 20;
pub(crate) const EXPO_MIN_SCALE: i8 = -10;
@@ -355,6 +355,19 @@ pub(crate) struct ExpoHistogram<T: Number> {
record_min_max: bool,
}

impl<T: Number> Measure<T> for ExpoHistogram<T> {
fn measure(&self, measurement: T, attrs: &[KeyValue]) {
let f_value = measurement.into_float();
// Ignore NaN and infinity.
// Only makes sense if T is f64, maybe this could be no-op for other cases?
if !f_value.is_finite() {
return;
}

self.value_map.measure(measurement, attrs);
}
}

impl<T: Number> ExpoHistogram<T> {
/// Create a new exponential histogram.
pub(crate) fn new(
@@ -374,17 +387,6 @@ impl<T: Number> ExpoHistogram<T> {
}
}

pub(crate) fn measure(&self, value: T, attrs: &[KeyValue]) {
let f_value = value.into_float();
// Ignore NaN and infinity.
// Only makes sense if T is f64, maybe this could be no-op for other cases?
if !f_value.is_finite() {
return;
}

self.value_map.measure(value, attrs);
}

pub(crate) fn delta(
&self,
dest: Option<&mut dyn Aggregation>,
@@ -510,7 +512,7 @@ impl<T: Number> ExpoHistogram<T> {

#[cfg(test)]
mod tests {
use std::ops::Neg;
use std::{ops::Neg, sync::Arc};

use crate::metrics::internal::{self, AggregateBuilder};

@@ -1217,12 +1219,9 @@ mod tests {
}

fn box_val<T>(
(m, ca): (impl internal::Measure<T>, impl internal::ComputeAggregation),
) -> (
Box<dyn internal::Measure<T>>,
Box<dyn internal::ComputeAggregation>,
) {
(Box::new(m), Box::new(ca))
(m, ca): (Arc<dyn Measure<T>>, impl internal::ComputeAggregation),
) -> (Arc<dyn Measure<T>>, Box<dyn internal::ComputeAggregation>) {
(m, Box::new(ca))
}

fn hist_aggregation<T: Number + From<u32>>() {
@@ -1236,7 +1235,7 @@ mod tests {
name: &'static str,
build: Box<
dyn Fn() -> (
Box<dyn internal::Measure<T>>,
Arc<dyn internal::Measure<T>>,
Box<dyn internal::ComputeAggregation>,
),
>,
@@ -1435,7 +1434,7 @@ mod tests {
let mut count = 0;
for n in test.input {
for v in n {
in_fn.call(v, &[])
in_fn.measure(v, &[])
}
count = out_fn.call(Some(got.as_mut())).0
}
28 changes: 15 additions & 13 deletions opentelemetry-sdk/src/metrics/internal/histogram.rs
Original file line number Diff line number Diff line change
@@ -7,8 +7,8 @@ use crate::metrics::data::{self, Aggregation};
use crate::metrics::Temporality;
use opentelemetry::KeyValue;

use super::ValueMap;
use super::{Aggregator, Number};
use super::{Measure, ValueMap};

impl<T> Aggregator for Mutex<Buckets<T>>
where
@@ -73,6 +73,20 @@ pub(crate) struct Histogram<T: Number> {
start: Mutex<SystemTime>,
}

impl<T: Number> Measure<T> for Histogram<T> {
fn measure(&self, measurement: T, attrs: &[KeyValue]) {
let f = measurement.into_float();
// This search will return an index in the range `[0, bounds.len()]`, where
// it will return `bounds.len()` if value is greater than the last element
// of `bounds`. This aligns with the buckets in that the length of buckets
// is `bounds.len()+1`, with the last bucket representing:
// `(bounds[bounds.len()-1], +∞)`.
let index = self.bounds.partition_point(|&x| x < f);

self.value_map.measure((measurement, index), attrs);
}
}

impl<T: Number> Histogram<T> {
#[allow(unused_mut)]
pub(crate) fn new(mut bounds: Vec<f64>, record_min_max: bool, record_sum: bool) -> Self {
@@ -93,18 +107,6 @@ impl<T: Number> Histogram<T> {
}
}

pub(crate) fn measure(&self, measurement: T, attrs: &[KeyValue]) {
let f = measurement.into_float();
// This search will return an index in the range `[0, bounds.len()]`, where
// it will return `bounds.len()` if value is greater than the last element
// of `bounds`. This aligns with the buckets in that the length of buckets
// is `bounds.len()+1`, with the last bucket representing:
// `(bounds[bounds.len()-1], +∞)`.
let index = self.bounds.partition_point(|&x| x < f);

self.value_map.measure((measurement, index), attrs);
}

pub(crate) fn delta(
&self,
dest: Option<&mut dyn Aggregation>,
13 changes: 7 additions & 6 deletions opentelemetry-sdk/src/metrics/internal/last_value.rs
Original file line number Diff line number Diff line change
@@ -3,7 +3,7 @@ use std::{mem::replace, ops::DerefMut, sync::Mutex, time::SystemTime};
use crate::metrics::data::DataPoint;
use opentelemetry::KeyValue;

use super::{Aggregator, AtomicTracker, AtomicallyUpdate, Number, ValueMap};
use super::{Aggregator, AtomicTracker, AtomicallyUpdate, Measure, Number, ValueMap};

/// this is reused by PrecomputedSum
pub(crate) struct Assign<T>
@@ -43,6 +43,12 @@ pub(crate) struct LastValue<T: Number> {
start: Mutex<SystemTime>,
}

impl<T: Number> Measure<T> for LastValue<T> {
fn measure(&self, measurement: T, attrs: &[KeyValue]) {
self.value_map.measure(measurement, attrs);
}
}

impl<T: Number> LastValue<T> {
pub(crate) fn new() -> Self {
LastValue {
@@ -51,11 +57,6 @@ impl<T: Number> LastValue<T> {
}
}

pub(crate) fn measure(&self, measurement: T, attrs: &[KeyValue]) {
// The argument index is not applicable to LastValue.
self.value_map.measure(measurement, attrs);
}

pub(crate) fn compute_aggregation_delta(&self, dest: &mut Vec<DataPoint<T>>) {
let t = SystemTime::now();
let prev_start = self
Loading