Skip to content

Commit

Permalink
Refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
rukai committed Nov 30, 2023
1 parent fb01cf2 commit 77e53f5
Showing 1 changed file with 137 additions and 112 deletions.
249 changes: 137 additions & 112 deletions shotover-proxy/benches/windsock/profilers/shotover_metrics.rs
Original file line number Diff line number Diff line change
@@ -1,143 +1,168 @@
use prometheus_parse::Value;
use std::{collections::HashMap, time::Duration};
use tokio::sync::mpsc;
use time::OffsetDateTime;
use tokio::task::JoinHandle;
use tokio::{sync::mpsc, time::MissedTickBehavior};
use windsock::{Goal, Metric, ReportArchive};

/// Collects prometheus metrics from a running shotover instance in a background
/// task and folds them into the windsock report archive on shutdown.
pub struct ShotoverMetrics {
// Sending on this channel tells the background task to stop collecting.
shutdown_tx: mpsc::Sender<()>,
// Handle to the spawned collection/processing task.
task: JoinHandle<()>,
}

/// One raw scrape of shotover's prometheus `/metrics` endpoint.
pub struct RawPrometheusExposition {
// When the scrape was taken; used to discard samples recorded before the bench started.
timestamp: OffsetDateTime,
// The raw text body returned by the metrics endpoint.
content: String,
}
// Metric key (name + labels) mapped to its samples in collection order.
type ParsedMetrics = HashMap<String, Vec<Value>>;

impl ShotoverMetrics {
pub fn new(bench_name: String, shotover_ip: &str) -> Self {
let url = format!("http://{shotover_ip}:9001/metrics");
let (shutdown_tx, mut shutdown_rx) = mpsc::channel(1);
let (shutdown_tx, shutdown_rx) = mpsc::channel(1);
let task = tokio::spawn(async move {
let mut results: ParsedMetrics = HashMap::new();
loop {
tokio::select! {
_ = shutdown_rx.recv() => {
break;
}
_ = Self::get_metrics(&mut results, &url) => {}
}
}
// collect metrics until shutdown
let raw_metrics = Self::collect_metrics(shutdown_rx, &url).await;

// we are now shutting down and need to process and save all collected metrics
let mut report = ReportArchive::load(&bench_name).unwrap();
let parsed = Self::parse_metrics(raw_metrics, &report);
report.metrics.extend(Self::windsock_metrics(parsed));
report.save();
});
ShotoverMetrics { task, shutdown_tx }
}

// The bench will start after sar has started so we need to throw away all sar metrics that were recorded before the bench started.
// let time_diff = report.bench_started_at - sar.started_at;
// let inital_values_to_discard = time_diff.as_seconds_f32().round() as usize;
// for values in sar.named_values.values_mut() {
// values.drain(0..inital_values_to_discard);
// }
pub fn parse_metrics(
raw_metrics: Vec<RawPrometheusExposition>,
report: &ReportArchive,
) -> ParsedMetrics {
let mut parsed_metrics: ParsedMetrics = HashMap::new();
for raw_metric in raw_metrics {
if raw_metric.timestamp > report.bench_started_at {
let metrics = prometheus_parse::Scrape::parse(
raw_metric.content.lines().map(|x| Ok(x.to_owned())),
)
.unwrap();
for sample in metrics.samples {
let key = format!(
"{}{{{}}}",
sample
.metric
.strip_prefix("shotover_")
.unwrap_or(&sample.metric),
sample.labels
);

let mut new_metrics = vec![];
for (name, value) in results {
match value[0] {
Value::Gauge(_) => {
new_metrics.push(Metric::EachSecond {
name,
values: value
.iter()
.map(|x| {
let Value::Gauge(x) = x else {
panic!("metric type changed during bench run")
};
(*x, x.to_string(), Goal::SmallerIsBetter) // TODO: Goal::None
})
.collect(),
});
}
Value::Counter(_) => {
let mut prev = 0.0;
new_metrics.push(Metric::EachSecond {
name,
values: value
.iter()
.map(|x| {
let Value::Counter(x) = x else {
panic!("metric type changed during bench run")
};
let diff = x - prev;
prev = *x;
(diff, diff.to_string(), Goal::SmallerIsBetter)
// TODO: Goal::None
})
.collect(),
});
}
Value::Summary(_) => {
let last = value.last().unwrap();
let Value::Summary(summary) = last else {
panic!("metric type changed during bench run")
};
let values = summary
parsed_metrics.entry(key).or_default().push(sample.value);
}
}
}
parsed_metrics
}
pub fn windsock_metrics(parsed_metrics: ParsedMetrics) -> Vec<Metric> {
let mut new_metrics = vec![];
for (name, value) in parsed_metrics {
match value[0] {
Value::Gauge(_) => {
new_metrics.push(Metric::EachSecond {
name,
values: value
.iter()
.map(|x| {
(
x.count,
format!("{} - {:.4}ms", x.quantile, x.count * 1000.0),
Goal::SmallerIsBetter,
)
let Value::Gauge(x) = x else {
panic!("metric type changed during bench run")
};
(*x, x.to_string(), Goal::SmallerIsBetter) // TODO: Goal::None
})
.collect();
// TODO: add a Metric::QuantileLatency and use instead
new_metrics.push(Metric::EachSecond { name, values });
}
_ => {
tracing::warn!("Unused shotover metric: {name}")
}
.collect(),
});
}
}
new_metrics.sort_by_key(|x| {
let name = x.name();
// move latency metrics to the top
if name.starts_with("chain_latency") || name.starts_with("transform_latency") {
format!("aaa_{name}")
Value::Counter(_) => {
let mut prev = 0.0;
new_metrics.push(Metric::EachSecond {
name,
values: value
.iter()
.map(|x| {
let Value::Counter(x) = x else {
panic!("metric type changed during bench run")
};
let diff = x - prev;
prev = *x;
(diff, diff.to_string(), Goal::SmallerIsBetter)
// TODO: Goal::None
})
.collect(),
});
}
// move failure metrics to the bottom.
else if name.starts_with("transform_failures")
|| name.starts_with("failed_requests")
|| name.starts_with("chain_failures")
{
format!("zzz_{name}")
} else {
name.to_owned()
Value::Summary(_) => {
let last = value.last().unwrap();
let Value::Summary(summary) = last else {
panic!("metric type changed during bench run")
};
let values = summary
.iter()
.map(|x| {
(
x.count,
format!("{} - {:.4}ms", x.quantile, x.count * 1000.0),
Goal::SmallerIsBetter,
)
})
.collect();
// TODO: add a Metric::QuantileLatency and use instead
new_metrics.push(Metric::EachSecond { name, values });
}
});

let mut report = ReportArchive::load(&bench_name).unwrap();
report.metrics.extend(new_metrics);
report.save();
_ => {
tracing::warn!("Unused shotover metric: {name}")
}
}
}
new_metrics.sort_by_key(|x| {
let name = x.name();
// move latency metrics to the top
if name.starts_with("chain_latency") || name.starts_with("transform_latency") {
format!("aaa_{name}")
}
// move failure metrics to the bottom.
else if name.starts_with("transform_failures")
|| name.starts_with("failed_requests")
|| name.starts_with("chain_failures")
{
format!("zzz_{name}")
} else {
name.to_owned()
}
});
ShotoverMetrics { task, shutdown_tx }
new_metrics
}

async fn get_metrics(results: &mut ParsedMetrics, url: &str) {
let metrics = tokio::time::timeout(Duration::from_secs(3), reqwest::get(url))
.await
.unwrap()
.unwrap()
.text()
.await
.unwrap();
async fn collect_metrics(
mut shutdown_rx: mpsc::Receiver<()>,
url: &str,
) -> Vec<RawPrometheusExposition> {
let mut results = vec![];

let metrics =
prometheus_parse::Scrape::parse(metrics.lines().map(|x| Ok(x.to_owned()))).unwrap();
for sample in metrics.samples {
let key = format!(
"{}{{{}}}",
sample
.metric
.strip_prefix("shotover_")
.unwrap_or(&sample.metric),
sample.labels
);

results.entry(key).or_default().push(sample.value);
let mut interval = tokio::time::interval(Duration::from_secs(1));
interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
loop {
tokio::select! {
_ = shutdown_rx.recv() => break,
_ = interval.tick() => {
match tokio::time::timeout(Duration::from_secs(3), reqwest::get(url)).await.unwrap() {
Ok(response) => {
results.push(RawPrometheusExposition {
timestamp: OffsetDateTime::now_utc(),
content: response.text().await.unwrap(),
});
}
Err(err) => tracing::debug!("Failed to request from metrics endpoint, probably not up yet, error was {err:?}")
}
}
}
}
tokio::time::sleep(Duration::from_secs(1)).await
results
}

pub fn insert_results_to_bench_archive(self) {
Expand Down

0 comments on commit 77e53f5

Please sign in to comment.