From 77e53f5dc74428b0d6022492f32e1232e2e1f641 Mon Sep 17 00:00:00 2001
From: Lucas Kent
Date: Fri, 1 Dec 2023 09:07:22 +1100
Subject: [PATCH] Refactor

---
 .../windsock/profilers/shotover_metrics.rs | 249 ++++++++++--------
 1 file changed, 137 insertions(+), 112 deletions(-)

diff --git a/shotover-proxy/benches/windsock/profilers/shotover_metrics.rs b/shotover-proxy/benches/windsock/profilers/shotover_metrics.rs
index 6410c1720..b3fe30a62 100644
--- a/shotover-proxy/benches/windsock/profilers/shotover_metrics.rs
+++ b/shotover-proxy/benches/windsock/profilers/shotover_metrics.rs
@@ -1,143 +1,168 @@
 use prometheus_parse::Value;
 use std::{collections::HashMap, time::Duration};
-use tokio::sync::mpsc;
+use time::OffsetDateTime;
 use tokio::task::JoinHandle;
+use tokio::{sync::mpsc, time::MissedTickBehavior};
 use windsock::{Goal, Metric, ReportArchive};

 pub struct ShotoverMetrics {
     shutdown_tx: mpsc::Sender<()>,
     task: JoinHandle<()>,
 }
+
+pub struct RawPrometheusExposition {
+    timestamp: OffsetDateTime,
+    content: String,
+}
 type ParsedMetrics = HashMap<String, Vec<Value>>;

 impl ShotoverMetrics {
     pub fn new(bench_name: String, shotover_ip: &str) -> Self {
         let url = format!("http://{shotover_ip}:9001/metrics");
-        let (shutdown_tx, mut shutdown_rx) = mpsc::channel(1);
+        let (shutdown_tx, shutdown_rx) = mpsc::channel(1);
         let task = tokio::spawn(async move {
-            let mut results: ParsedMetrics = HashMap::new();
-            loop {
-                tokio::select! {
-                    _ = shutdown_rx.recv() => {
-                        break;
-                    }
-                    _ = Self::get_metrics(&mut results, &url) => {}
-                }
-            }
+            // collect metrics until shutdown
+            let raw_metrics = Self::collect_metrics(shutdown_rx, &url).await;
+
+            // we are now shutting down and need to process and save all collected metrics
+            let mut report = ReportArchive::load(&bench_name).unwrap();
+            let parsed = Self::parse_metrics(raw_metrics, &report);
+            report.metrics.extend(Self::windsock_metrics(parsed));
+            report.save();
+        });
+        ShotoverMetrics { task, shutdown_tx }
+    }

-            // The bench will start after sar has started so we need to throw away all sar metrics that were recorded before the bench started.
-            // let time_diff = report.bench_started_at - sar.started_at;
-            // let inital_values_to_discard = time_diff.as_seconds_f32().round() as usize;
-            // for values in sar.named_values.values_mut() {
-            //     values.drain(0..inital_values_to_discard);
-            // }
+    pub fn parse_metrics(
+        raw_metrics: Vec<RawPrometheusExposition>,
+        report: &ReportArchive,
+    ) -> ParsedMetrics {
+        let mut parsed_metrics: ParsedMetrics = HashMap::new();
+        for raw_metric in raw_metrics {
+            if raw_metric.timestamp > report.bench_started_at {
+                let metrics = prometheus_parse::Scrape::parse(
+                    raw_metric.content.lines().map(|x| Ok(x.to_owned())),
+                )
+                .unwrap();
+                for sample in metrics.samples {
+                    let key = format!(
+                        "{}{{{}}}",
+                        sample
+                            .metric
+                            .strip_prefix("shotover_")
+                            .unwrap_or(&sample.metric),
+                        sample.labels
+                    );

-            let mut new_metrics = vec![];
-            for (name, value) in results {
-                match value[0] {
-                    Value::Gauge(_) => {
-                        new_metrics.push(Metric::EachSecond {
-                            name,
-                            values: value
-                                .iter()
-                                .map(|x| {
-                                    let Value::Gauge(x) = x else {
-                                        panic!("metric type changed during bench run")
-                                    };
-                                    (*x, x.to_string(), Goal::SmallerIsBetter) // TODO: Goal::None
-                                })
-                                .collect(),
-                        });
-                    }
-                    Value::Counter(_) => {
-                        let mut prev = 0.0;
-                        new_metrics.push(Metric::EachSecond {
-                            name,
-                            values: value
-                                .iter()
-                                .map(|x| {
-                                    let Value::Counter(x) = x else {
-                                        panic!("metric type changed during bench run")
-                                    };
-                                    let diff = x - prev;
-                                    prev = *x;
-                                    (diff, diff.to_string(), Goal::SmallerIsBetter)
-                                    // TODO: Goal::None
-                                })
-                                .collect(),
-                        });
-                    }
-                    Value::Summary(_) => {
-                        let last = value.last().unwrap();
-                        let Value::Summary(summary) = last else {
-                            panic!("metric type changed during bench run")
-                        };
-                        let values = summary
+                    parsed_metrics.entry(key).or_default().push(sample.value);
+                }
+            }
+        }
+        parsed_metrics
+    }
+    pub fn windsock_metrics(parsed_metrics: ParsedMetrics) -> Vec<Metric> {
+        let mut new_metrics = vec![];
+        for (name, value) in parsed_metrics {
+            match value[0] {
+                Value::Gauge(_) => {
+                    new_metrics.push(Metric::EachSecond {
+                        name,
+                        values: value
                             .iter()
                             .map(|x| {
-                                (
-                                    x.count,
-                                    format!("{} - {:.4}ms", x.quantile, x.count * 1000.0),
-                                    Goal::SmallerIsBetter,
-                                )
+                                let Value::Gauge(x) = x else {
+                                    panic!("metric type changed during bench run")
+                                };
+                                (*x, x.to_string(), Goal::SmallerIsBetter) // TODO: Goal::None
                             })
-                            .collect();
-                        // TODO: add a Metric::QuantileLatency and use instead
-                        new_metrics.push(Metric::EachSecond { name, values });
-                    }
-                    _ => {
-                        tracing::warn!("Unused shotover metric: {name}")
-                    }
+                            .collect(),
+                    });
                 }
-            }
-            new_metrics.sort_by_key(|x| {
-                let name = x.name();
-                // move latency metrics to the top
-                if name.starts_with("chain_latency") || name.starts_with("transform_latency") {
-                    format!("aaa_{name}")
+                Value::Counter(_) => {
+                    let mut prev = 0.0;
+                    new_metrics.push(Metric::EachSecond {
+                        name,
+                        values: value
+                            .iter()
+                            .map(|x| {
+                                let Value::Counter(x) = x else {
+                                    panic!("metric type changed during bench run")
+                                };
+                                let diff = x - prev;
+                                prev = *x;
+                                (diff, diff.to_string(), Goal::SmallerIsBetter)
+                                // TODO: Goal::None
+                            })
+                            .collect(),
+                    });
                 }
-                // move failure metrics to the bottom.
-                else if name.starts_with("transform_failures")
-                    || name.starts_with("failed_requests")
-                    || name.starts_with("chain_failures")
-                {
-                    format!("zzz_{name}")
-                } else {
-                    name.to_owned()
+                Value::Summary(_) => {
+                    let last = value.last().unwrap();
+                    let Value::Summary(summary) = last else {
+                        panic!("metric type changed during bench run")
+                    };
+                    let values = summary
+                        .iter()
+                        .map(|x| {
+                            (
+                                x.count,
+                                format!("{} - {:.4}ms", x.quantile, x.count * 1000.0),
+                                Goal::SmallerIsBetter,
+                            )
+                        })
+                        .collect();
+                    // TODO: add a Metric::QuantileLatency and use instead
+                    new_metrics.push(Metric::EachSecond { name, values });
                 }
-            });
-
-            let mut report = ReportArchive::load(&bench_name).unwrap();
-            report.metrics.extend(new_metrics);
-            report.save();
+                _ => {
+                    tracing::warn!("Unused shotover metric: {name}")
+                }
+            }
+        }
+        new_metrics.sort_by_key(|x| {
+            let name = x.name();
+            // move latency metrics to the top
+            if name.starts_with("chain_latency") || name.starts_with("transform_latency") {
+                format!("aaa_{name}")
+            }
+            // move failure metrics to the bottom.
+            else if name.starts_with("transform_failures")
+                || name.starts_with("failed_requests")
+                || name.starts_with("chain_failures")
+            {
+                format!("zzz_{name}")
+            } else {
+                name.to_owned()
+            }
         });
-        ShotoverMetrics { task, shutdown_tx }
+        new_metrics
     }

-    async fn get_metrics(results: &mut ParsedMetrics, url: &str) {
-        let metrics = tokio::time::timeout(Duration::from_secs(3), reqwest::get(url))
-            .await
-            .unwrap()
-            .unwrap()
-            .text()
-            .await
-            .unwrap();
+    async fn collect_metrics(
+        mut shutdown_rx: mpsc::Receiver<()>,
+        url: &str,
+    ) -> Vec<RawPrometheusExposition> {
+        let mut results = vec![];

-        let metrics =
-            prometheus_parse::Scrape::parse(metrics.lines().map(|x| Ok(x.to_owned()))).unwrap();
-        for sample in metrics.samples {
-            let key = format!(
-                "{}{{{}}}",
-                sample
-                    .metric
-                    .strip_prefix("shotover_")
-                    .unwrap_or(&sample.metric),
-                sample.labels
-            );
-
-            results.entry(key).or_default().push(sample.value);
+        let mut interval = tokio::time::interval(Duration::from_secs(1));
+        interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
+        loop {
+            tokio::select! {
+                _ = shutdown_rx.recv() => break,
+                _ = interval.tick() => {
+                    match tokio::time::timeout(Duration::from_secs(3), reqwest::get(url)).await.unwrap() {
+                        Ok(response) => {
+                            results.push(RawPrometheusExposition {
+                                timestamp: OffsetDateTime::now_utc(),
+                                content: response.text().await.unwrap(),
+                            });
+                        }
+                        Err(err) => tracing::debug!("Failed to request from metrics endpoint, probably not up yet, error was {err:?}")
+                    }
+                }
+            }
         }
-        tokio::time::sleep(Duration::from_secs(1)).await
+        results
     }

     pub fn insert_results_to_bench_archive(self) {