Skip to content

Commit

Permalink
Move time from DataPoint to Sum/Gauge
Browse files Browse the repository at this point in the history
  • Loading branch information
fraillt committed Dec 3, 2024
1 parent 506a4f9 commit 3f87777
Show file tree
Hide file tree
Showing 9 changed files with 132 additions and 156 deletions.
8 changes: 4 additions & 4 deletions opentelemetry-proto/src/transform/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -295,8 +295,8 @@ pub mod tonic {
.iter()
.map(|dp| TonicNumberDataPoint {
attributes: dp.attributes.iter().map(Into::into).collect(),
start_time_unix_nano: to_nanos(dp.start_time),
time_unix_nano: to_nanos(dp.time),
start_time_unix_nano: to_nanos(sum.start_time),
time_unix_nano: to_nanos(sum.time),

Check warning on line 299 in opentelemetry-proto/src/transform/metrics.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-proto/src/transform/metrics.rs#L298-L299

Added lines #L298 - L299 were not covered by tests
exemplars: dp.exemplars.iter().map(Into::into).collect(),
flags: TonicDataPointFlags::default() as u32,
value: Some(dp.value.into()),
Expand All @@ -319,8 +319,8 @@ pub mod tonic {
.iter()
.map(|dp| TonicNumberDataPoint {
attributes: dp.attributes.iter().map(Into::into).collect(),
start_time_unix_nano: to_nanos(dp.start_time),
time_unix_nano: to_nanos(dp.time),
start_time_unix_nano: to_nanos(gauge.start_time),
time_unix_nano: to_nanos(gauge.time),

Check warning on line 323 in opentelemetry-proto/src/transform/metrics.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-proto/src/transform/metrics.rs#L322-L323

Added lines #L322 - L323 were not covered by tests
exemplars: dp.exemplars.iter().map(Into::into).collect(),
flags: TonicDataPointFlags::default() as u32,
value: Some(dp.value.into()),
Expand Down
16 changes: 8 additions & 8 deletions opentelemetry-sdk/src/metrics/data/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,10 @@ pub trait Aggregation: fmt::Debug + any::Any + Send + Sync {
pub struct Gauge<T> {
/// Represents individual aggregated measurements with unique attributes.
pub data_points: Vec<DataPoint<T>>,
/// The time when the time series was started.
pub start_time: SystemTime,
/// The time when the time series was recorded.
pub time: SystemTime,
}

impl<T: fmt::Debug + Send + Sync + 'static> Aggregation for Gauge<T> {
Expand All @@ -74,6 +78,10 @@ impl<T: fmt::Debug + Send + Sync + 'static> Aggregation for Gauge<T> {
pub struct Sum<T> {
/// Represents individual aggregated measurements with unique attributes.
pub data_points: Vec<DataPoint<T>>,
/// The time when the time series was started.
pub start_time: SystemTime,
/// The time when the time series was recorded.
pub time: SystemTime,
/// Describes if the aggregation is reported as the change from the last report
/// time, or the cumulative changes since a fixed start time.
pub temporality: Temporality,
Expand All @@ -96,10 +104,6 @@ pub struct DataPoint<T> {
/// Attributes is the set of key value pairs that uniquely identify the
/// time series.
pub attributes: Vec<KeyValue>,
/// The time when the time series was started.
pub start_time: SystemTime,
/// The time when the time series was recorded.
pub time: SystemTime,
/// The value of this data point.
pub value: T,
/// The sampled [Exemplar]s collected during the time series.
Expand All @@ -110,8 +114,6 @@ impl<T: Copy> Clone for DataPoint<T> {
fn clone(&self) -> Self {
Self {
attributes: self.attributes.clone(),
start_time: self.start_time,
time: self.time,
value: self.value,
exemplars: self.exemplars.clone(),
}
Expand Down Expand Up @@ -338,8 +340,6 @@ mod tests {
fn validate_cloning_data_points() {
let data_type = DataPoint {
attributes: vec![KeyValue::new("key", "value")],
start_time: std::time::SystemTime::now(),
time: std::time::SystemTime::now(),
value: 0u32,
exemplars: vec![Exemplar {
filtered_attributes: vec![],
Expand Down
51 changes: 14 additions & 37 deletions opentelemetry-sdk/src/metrics/internal/aggregate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,7 @@ use std::{marker, sync::Arc};

use opentelemetry::KeyValue;

use crate::metrics::{
data::{Aggregation, Gauge},
Temporality,
};
use crate::metrics::{data::Aggregation, Temporality};

use super::{
exponential_histogram::ExpoHistogram, histogram::Histogram, last_value::LastValue,
Expand Down Expand Up @@ -99,31 +96,15 @@ impl<T: Number> AggregateBuilder<T> {

/// Builds a last-value aggregate function input and output.
pub(crate) fn last_value(&self) -> (impl Measure<T>, impl ComputeAggregation) {
let lv_filter = Arc::new(LastValue::new());
let lv_agg = Arc::clone(&lv_filter);
let lv = Arc::new(LastValue::new());
let agg_lv = Arc::clone(&lv);
let t = self.temporality;

(
self.filter(move |n, a: &[KeyValue]| lv_filter.measure(n, a)),
move |dest: Option<&mut dyn Aggregation>| {
let g = dest.and_then(|d| d.as_mut().downcast_mut::<Gauge<T>>());
let mut new_agg = if g.is_none() {
Some(Gauge {
data_points: vec![],
})
} else {
None
};
let g = g.unwrap_or_else(|| new_agg.as_mut().expect("present if g is none"));

match t {
Some(Temporality::Delta) => {
lv_agg.compute_aggregation_delta(&mut g.data_points)
}
_ => lv_agg.compute_aggregation_cumulative(&mut g.data_points),
}

(g.data_points.len(), new_agg.map(|a| Box::new(a) as Box<_>))
self.filter(move |n, a: &[KeyValue]| lv.measure(n, a)),
move |dest: Option<&mut dyn Aggregation>| match t {
Some(Temporality::Delta) => agg_lv.delta(dest),
_ => agg_lv.cumulative(dest),
},
)
}
Expand Down Expand Up @@ -211,7 +192,7 @@ impl<T: Number> AggregateBuilder<T> {
#[cfg(test)]
mod tests {
use crate::metrics::data::{
DataPoint, ExponentialBucket, ExponentialHistogram, ExponentialHistogramDataPoint,
DataPoint, ExponentialBucket, ExponentialHistogram, ExponentialHistogramDataPoint, Gauge,
Histogram, HistogramDataPoint, Sum,
};
use std::{time::SystemTime, vec};
Expand All @@ -224,11 +205,11 @@ mod tests {
let mut a = Gauge {
data_points: vec![DataPoint {
attributes: vec![KeyValue::new("a", 1)],
start_time: SystemTime::now(),
time: SystemTime::now(),
value: 1u64,
exemplars: vec![],
}],
start_time: SystemTime::now(),
time: SystemTime::now(),
};
let new_attributes = [KeyValue::new("b", 2)];
measure.call(2, &new_attributes[..]);
Expand All @@ -251,19 +232,17 @@ mod tests {
data_points: vec![
DataPoint {
attributes: vec![KeyValue::new("a1", 1)],
start_time: SystemTime::now(),
time: SystemTime::now(),
value: 1u64,
exemplars: vec![],
},
DataPoint {
attributes: vec![KeyValue::new("a2", 1)],
start_time: SystemTime::now(),
time: SystemTime::now(),
value: 2u64,
exemplars: vec![],
},
],
start_time: SystemTime::now(),
time: SystemTime::now(),
temporality: if temporality == Temporality::Delta {
Temporality::Cumulative
} else {
Expand Down Expand Up @@ -294,19 +273,17 @@ mod tests {
data_points: vec![
DataPoint {
attributes: vec![KeyValue::new("a1", 1)],
start_time: SystemTime::now(),
time: SystemTime::now(),
value: 1u64,
exemplars: vec![],
},
DataPoint {
attributes: vec![KeyValue::new("a2", 1)],
start_time: SystemTime::now(),
time: SystemTime::now(),
value: 2u64,
exemplars: vec![],
},
],
start_time: SystemTime::now(),
time: SystemTime::now(),
temporality: if temporality == Temporality::Delta {
Temporality::Cumulative
} else {
Expand Down
58 changes: 4 additions & 54 deletions opentelemetry-sdk/src/metrics/internal/exponential_histogram.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1440,15 +1440,14 @@ mod tests {
count = out_fn.call(Some(got.as_mut())).0
}

assert_aggregation_eq::<T>(Box::new(test.want), got, true, test.name);
assert_aggregation_eq::<T>(Box::new(test.want), got, test.name);
assert_eq!(test.want_count, count, "{}", test.name);
}
}

fn assert_aggregation_eq<T: Number + PartialEq>(
a: Box<dyn Aggregation>,
b: Box<dyn Aggregation>,
ignore_timestamp: bool,
test_name: &'static str,
) {
assert_eq!(
Expand All @@ -1467,13 +1466,7 @@ mod tests {
test_name
);
for (a, b) in a.data_points.iter().zip(b.data_points.iter()) {
assert_data_points_eq(
a,
b,
ignore_timestamp,
"mismatching gauge data points",
test_name,
);
assert_data_points_eq(a, b, "mismatching gauge data points", test_name);

Check warning on line 1469 in opentelemetry-sdk/src/metrics/internal/exponential_histogram.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/metrics/internal/exponential_histogram.rs#L1469

Added line #L1469 was not covered by tests
}
} else if let Some(a) = a.as_any().downcast_ref::<data::Sum<T>>() {
let b = b.as_any().downcast_ref::<data::Sum<T>>().unwrap();
Expand All @@ -1494,13 +1487,7 @@ mod tests {
test_name
);
for (a, b) in a.data_points.iter().zip(b.data_points.iter()) {
assert_data_points_eq(
a,
b,
ignore_timestamp,
"mismatching sum data points",
test_name,
);
assert_data_points_eq(a, b, "mismatching sum data points", test_name);

Check warning on line 1490 in opentelemetry-sdk/src/metrics/internal/exponential_histogram.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/metrics/internal/exponential_histogram.rs#L1490

Added line #L1490 was not covered by tests
}
} else if let Some(a) = a.as_any().downcast_ref::<data::Histogram<T>>() {
let b = b.as_any().downcast_ref::<data::Histogram<T>>().unwrap();
Expand All @@ -1516,13 +1503,7 @@ mod tests {
test_name
);
for (a, b) in a.data_points.iter().zip(b.data_points.iter()) {
assert_hist_data_points_eq(
a,
b,
ignore_timestamp,
"mismatching hist data points",
test_name,
);
assert_hist_data_points_eq(a, b, "mismatching hist data points", test_name);

Check warning on line 1506 in opentelemetry-sdk/src/metrics/internal/exponential_histogram.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/metrics/internal/exponential_histogram.rs#L1506

Added line #L1506 was not covered by tests
}
} else if let Some(a) = a.as_any().downcast_ref::<data::ExponentialHistogram<T>>() {
let b = b
Expand All @@ -1544,7 +1525,6 @@ mod tests {
assert_exponential_hist_data_points_eq(
a,
b,
ignore_timestamp,
"mismatching hist data points",
test_name,
);
Expand All @@ -1557,7 +1537,6 @@ mod tests {
fn assert_data_points_eq<T: Number>(
a: &data::DataPoint<T>,
b: &data::DataPoint<T>,
ignore_timestamp: bool,
message: &'static str,
test_name: &'static str,
) {
Expand All @@ -1567,21 +1546,11 @@ mod tests {
test_name, message
);
assert_eq!(a.value, b.value, "{}: {} value", test_name, message);

if !ignore_timestamp {
assert_eq!(
a.start_time, b.start_time,
"{}: {} start time",
test_name, message
);
assert_eq!(a.time, b.time, "{}: {} time", test_name, message);
}
}

fn assert_hist_data_points_eq<T: Number>(
a: &data::HistogramDataPoint<T>,
b: &data::HistogramDataPoint<T>,
ignore_timestamp: bool,
message: &'static str,
test_name: &'static str,
) {
Expand All @@ -1600,21 +1569,11 @@ mod tests {
assert_eq!(a.min, b.min, "{}: {} min", test_name, message);
assert_eq!(a.max, b.max, "{}: {} max", test_name, message);
assert_eq!(a.sum, b.sum, "{}: {} sum", test_name, message);

if !ignore_timestamp {
assert_eq!(
a.start_time, b.start_time,
"{}: {} start time",
test_name, message
);
assert_eq!(a.time, b.time, "{}: {} time", test_name, message);
}
}

fn assert_exponential_hist_data_points_eq<T: Number>(
a: &data::ExponentialHistogramDataPoint<T>,
b: &data::ExponentialHistogramDataPoint<T>,
ignore_timestamp: bool,
message: &'static str,
test_name: &'static str,
) {
Expand Down Expand Up @@ -1645,14 +1604,5 @@ mod tests {
"{}: {} neg",
test_name, message
);

if !ignore_timestamp {
assert_eq!(
a.start_time, b.start_time,
"{}: {} start time",
test_name, message
);
assert_eq!(a.time, b.time, "{}: {} time", test_name, message);
}
}
}
Loading

0 comments on commit 3f87777

Please sign in to comment.