From fb01cf2b3b5c05490147a94b3f4a7903a0cbbe5e Mon Sep 17 00:00:00 2001 From: Lucas Kent Date: Wed, 29 Nov 2023 13:55:43 +1100 Subject: [PATCH 1/4] Add shotover metrics profiler --- .github/workflows/windsock_benches.yaml | 1 + Cargo.lock | 14 ++ shotover-proxy/Cargo.toml | 2 + shotover-proxy/benches/windsock/cassandra.rs | 3 +- shotover-proxy/benches/windsock/kafka.rs | 6 +- shotover-proxy/benches/windsock/profilers.rs | 39 ++++- .../windsock/profilers/shotover_metrics.rs | 148 ++++++++++++++++++ shotover-proxy/benches/windsock/redis.rs | 6 +- windsock/src/report.rs | 7 + 9 files changed, 219 insertions(+), 7 deletions(-) create mode 100644 shotover-proxy/benches/windsock/profilers/shotover_metrics.rs diff --git a/.github/workflows/windsock_benches.yaml b/.github/workflows/windsock_benches.yaml index 2586e0ba1..0b48ce96b 100644 --- a/.github/workflows/windsock_benches.yaml +++ b/.github/workflows/windsock_benches.yaml @@ -40,6 +40,7 @@ jobs: cargo windsock --bench-length-seconds 5 --operations-per-second 100 --profilers flamegraph --name cassandra,compression=none,driver=scylla,operation=read_i64,protocol=v4,shotover=standard,topology=single cargo windsock --bench-length-seconds 5 --operations-per-second 100 --profilers samply --name cassandra,compression=none,driver=scylla,operation=read_i64,protocol=v4,shotover=standard,topology=single cargo windsock --bench-length-seconds 5 --operations-per-second 100 --profilers sys_monitor --name kafka,shotover=standard,size=1B,topology=single + cargo windsock --bench-length-seconds 5 --operations-per-second 100 --profilers shotover_metrics --name redis,encryption=none,operation=get,shotover=standard,topology=single # windsock/examples/cassandra.rs - this can stay here until windsock is moved to its own repo cargo run --release --example cassandra -- --bench-length-seconds 5 --operations-per-second 100 diff --git a/Cargo.lock b/Cargo.lock index b465054fb..6ed87e434 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3411,6 +3411,18 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "prometheus-parse" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0c2aa5feb83bf4b2c8919eaf563f51dbab41183de73ba2353c0e03cd7b6bd892" +dependencies = [ + "chrono", + "itertools 0.10.5", + "once_cell", + "regex", +] + [[package]] name = "quanta" version = "0.11.1" @@ -4450,11 +4462,13 @@ dependencies = [ "itertools 0.12.0", "nix 0.27.1", "opensearch", + "prometheus-parse", "rand 0.8.5", "rand_distr", "redis", "redis-protocol", "regex", + "reqwest", "rstest", "rstest_reuse", "rustls-pemfile", diff --git a/shotover-proxy/Cargo.toml b/shotover-proxy/Cargo.toml index 583886d28..dd34b5259 100644 --- a/shotover-proxy/Cargo.toml +++ b/shotover-proxy/Cargo.toml @@ -11,6 +11,8 @@ license = "Apache-2.0" shotover = { path = "../shotover" } [dev-dependencies] +prometheus-parse = "0.2.4" +reqwest.workspace = true scylla.workspace = true anyhow.workspace = true tokio.workspace = true diff --git a/shotover-proxy/benches/windsock/cassandra.rs b/shotover-proxy/benches/windsock/cassandra.rs index fd0c2b802..aedeb3230 100644 --- a/shotover-proxy/benches/windsock/cassandra.rs +++ b/shotover-proxy/benches/windsock/cassandra.rs @@ -552,7 +552,8 @@ impl Bench for CassandraBench { } } let mut profiler = - CloudProfilerRunner::new(self.name(), profiling, profiler_instances).await; + CloudProfilerRunner::new(self.name(), profiling, profiler_instances, &shotover_ip) + .await; let cassandra_nodes = vec![ AwsNodeInfo { diff --git a/shotover-proxy/benches/windsock/kafka.rs b/shotover-proxy/benches/windsock/kafka.rs index 6ffca3f48..9e8a8520f 100644 --- a/shotover-proxy/benches/windsock/kafka.rs +++ b/shotover-proxy/benches/windsock/kafka.rs @@ -245,12 +245,12 @@ impl Bench for KafkaBench { } } - let mut profiler = - CloudProfilerRunner::new(self.name(), profiling, profiler_instances).await; - let kafka_ip = kafka_instance1.instance.private_ip().to_string(); let shotover_ip = shotover_instance.instance.private_ip().to_string(); + let mut profiler = + CloudProfilerRunner::new(self.name(), profiling, profiler_instances, &kafka_ip).await; + let kafka_instances = vec![ kafka_instance1.clone(), kafka_instance2.clone(), diff --git a/shotover-proxy/benches/windsock/profilers.rs b/shotover-proxy/benches/windsock/profilers.rs index c84a031fd..f5eb02bf8 100644 --- a/shotover-proxy/benches/windsock/profilers.rs +++ b/shotover-proxy/benches/windsock/profilers.rs @@ -1,4 +1,4 @@ -use self::samply::Samply; +use self::{samply::Samply, shotover_metrics::ShotoverMetrics}; use crate::common::Shotover; use anyhow::Result; use aws_throwaway::Ec2Instance; @@ -11,14 +11,17 @@ use windsock::Profiling; mod perf_flamegraph; mod samply; mod sar; +mod shotover_metrics; pub struct ProfilerRunner { bench_name: String, run_flamegraph: bool, run_samply: bool, + run_shotover_metrics: bool, run_sys_monitor: bool, results_path: PathBuf, perf: Option, + shotover_metrics: Option, samply: Option, sys_monitor: Option>>, } @@ -32,14 +35,19 @@ impl ProfilerRunner { let run_sys_monitor = profiling .profilers_to_use .contains(&"sys_monitor".to_owned()); + let run_shotover_metrics = profiling + .profilers_to_use + .contains(&"shotover_metrics".to_owned()); ProfilerRunner { bench_name, run_flamegraph, run_sys_monitor, run_samply, + run_shotover_metrics, results_path: profiling.results_path, perf: None, + shotover_metrics: None, samply: None, sys_monitor: None, } @@ -58,6 +66,15 @@ impl ProfilerRunner { } else { None }; + self.shotover_metrics = if self.run_shotover_metrics { + if shotover.is_some() { + Some(ShotoverMetrics::new(self.bench_name.clone(), "localhost")) + } else { + panic!("shotover_metrics not supported when benching without shotover") + } + } else { + None + }; self.samply = if self.run_samply { if let Some(shotover) = &shotover { Some(Samply::run(self.results_path.clone(), shotover.child().id().unwrap()).await) @@ -91,6 +108,9 @@ impl Drop for ProfilerRunner { if let Some(samply) = self.samply.take() { samply.wait(); } + if let Some(shotover_metrics) = self.shotover_metrics.take() { + shotover_metrics.insert_results_to_bench_archive(); + } if let Some(mut rx) = self.sys_monitor.take() { sar::insert_sar_results_to_bench_archive(&self.bench_name, "", sar::parse_sar(&mut rx)); } @@ -100,6 +120,7 @@ impl Drop for ProfilerRunner { pub struct CloudProfilerRunner { bench_name: String, monitor_instances: HashMap>>, + shotover_metrics: Option, } impl CloudProfilerRunner { @@ -107,11 +128,16 @@ impl CloudProfilerRunner { bench_name: String, profiling: Profiling, instances: HashMap, + shotover_ip: &str, ) -> Self { let run_sys_monitor = profiling .profilers_to_use .contains(&"sys_monitor".to_owned()); + let run_shotover_metrics = profiling + .profilers_to_use + .contains(&"shotover_metrics".to_owned()); + let mut monitor_instances = HashMap::new(); if run_sys_monitor { for (name, instance) in instances { @@ -119,9 +145,16 @@ impl CloudProfilerRunner { } } + let shotover_metrics = if run_shotover_metrics { + Some(ShotoverMetrics::new(bench_name.clone(), shotover_ip)) + } else { + None + }; + CloudProfilerRunner { bench_name, monitor_instances, + shotover_metrics, } } @@ -133,6 +166,9 @@ impl CloudProfilerRunner { sar::parse_sar(instance_rx), ); } + if let Some(shotover_metrics) = self.shotover_metrics.take() { + shotover_metrics.insert_results_to_bench_archive(); + } } } @@ -144,6 +180,7 @@ pub fn supported_profilers(shotover: Shotover) -> Vec { "flamegraph".to_owned(), "samply".to_owned(), "sys_monitor".to_owned(), + "shotover_metrics".to_owned(), ] } } diff --git a/shotover-proxy/benches/windsock/profilers/shotover_metrics.rs b/shotover-proxy/benches/windsock/profilers/shotover_metrics.rs new file mode 100644 index 000000000..6410c1720 --- /dev/null +++ b/shotover-proxy/benches/windsock/profilers/shotover_metrics.rs @@ -0,0 +1,148 @@ +use prometheus_parse::Value; +use std::{collections::HashMap, time::Duration}; +use tokio::sync::mpsc; +use tokio::task::JoinHandle; +use windsock::{Goal, Metric, ReportArchive}; + +pub struct ShotoverMetrics { + shutdown_tx: mpsc::Sender<()>, + task: JoinHandle<()>, +} +type ParsedMetrics = HashMap>; + +impl ShotoverMetrics { + pub fn new(bench_name: String, shotover_ip: &str) -> Self { + let url = format!("http://{shotover_ip}:9001/metrics"); + let (shutdown_tx, mut shutdown_rx) = mpsc::channel(1); + let task = tokio::spawn(async move { + let mut results: ParsedMetrics = HashMap::new(); + loop { + tokio::select! { + _ = shutdown_rx.recv() => { + break; + } + _ = Self::get_metrics(&mut results, &url) => {} + } + } + + // The bench will start after sar has started so we need to throw away all sar metrics that were recorded before the bench started. + // let time_diff = report.bench_started_at - sar.started_at; + // let inital_values_to_discard = time_diff.as_seconds_f32().round() as usize; + // for values in sar.named_values.values_mut() { + // values.drain(0..inital_values_to_discard); + // } + + let mut new_metrics = vec![]; + for (name, value) in results { + match value[0] { + Value::Gauge(_) => { + new_metrics.push(Metric::EachSecond { + name, + values: value + .iter() + .map(|x| { + let Value::Gauge(x) = x else { + panic!("metric type changed during bench run") + }; + (*x, x.to_string(), Goal::SmallerIsBetter) // TODO: Goal::None + }) + .collect(), + }); + } + Value::Counter(_) => { + let mut prev = 0.0; + new_metrics.push(Metric::EachSecond { + name, + values: value + .iter() + .map(|x| { + let Value::Counter(x) = x else { + panic!("metric type changed during bench run") + }; + let diff = x - prev; + prev = *x; + (diff, diff.to_string(), Goal::SmallerIsBetter) + // TODO: Goal::None + }) + .collect(), + }); + } + Value::Summary(_) => { + let last = value.last().unwrap(); + let Value::Summary(summary) = last else { + panic!("metric type changed during bench run") + }; + let values = summary + .iter() + .map(|x| { + ( + x.count, + format!("{} - {:.4}ms", x.quantile, x.count * 1000.0), + Goal::SmallerIsBetter, + ) + }) + .collect(); + // TODO: add a Metric::QuantileLatency and use instead + new_metrics.push(Metric::EachSecond { name, values }); + } + _ => { + tracing::warn!("Unused shotover metric: {name}") + } + } + } + new_metrics.sort_by_key(|x| { + let name = x.name(); + // move latency metrics to the top + if name.starts_with("chain_latency") || name.starts_with("transform_latency") { + format!("aaa_{name}") + } + // move failure metrics to the bottom. + else if name.starts_with("transform_failures") + || name.starts_with("failed_requests") + || name.starts_with("chain_failures") + { + format!("zzz_{name}") + } else { + name.to_owned() + } + }); + + let mut report = ReportArchive::load(&bench_name).unwrap(); + report.metrics.extend(new_metrics); + report.save(); + }); + ShotoverMetrics { task, shutdown_tx } + } + + async fn get_metrics(results: &mut ParsedMetrics, url: &str) { + let metrics = tokio::time::timeout(Duration::from_secs(3), reqwest::get(url)) + .await + .unwrap() + .unwrap() + .text() + .await + .unwrap(); + + let metrics = + prometheus_parse::Scrape::parse(metrics.lines().map(|x| Ok(x.to_owned()))).unwrap(); + for sample in metrics.samples { + let key = format!( + "{}{{{}}}", + sample + .metric + .strip_prefix("shotover_") + .unwrap_or(&sample.metric), + sample.labels + ); + + results.entry(key).or_default().push(sample.value); + } + tokio::time::sleep(Duration::from_secs(1)).await + } + + pub fn insert_results_to_bench_archive(self) { + std::mem::drop(self.shutdown_tx); + // TODO: asyncify it all or something + futures::executor::block_on(async { self.task.await.unwrap() }) + } +} diff --git a/shotover-proxy/benches/windsock/redis.rs b/shotover-proxy/benches/windsock/redis.rs index bb13653ee..6e7b51485 100644 --- a/shotover-proxy/benches/windsock/redis.rs +++ b/shotover-proxy/benches/windsock/redis.rs @@ -222,12 +222,14 @@ impl Bench for RedisBench { profiler_instances.insert("redis".to_owned(), &instance.instance); } } - let mut profiler = - CloudProfilerRunner::new(self.name(), profiling, profiler_instances).await; let redis_ip = redis_instances.private_ips()[0].to_string(); let shotover_ip = shotover_instance.instance.private_ip().to_string(); + let mut profiler = + CloudProfilerRunner::new(self.name(), profiling, profiler_instances, &shotover_ip) + .await; + let (_, running_shotover) = futures::join!( redis_instances.run(self.encryption), self.run_aws_shotover(shotover_instance.clone(), redis_ip.clone()) diff --git a/windsock/src/report.rs b/windsock/src/report.rs index 8a69893e8..a9b6db51f 100644 --- a/windsock/src/report.rs +++ b/windsock/src/report.rs @@ -161,6 +161,13 @@ pub enum Metric { } impl Metric { + pub fn name(&self) -> &str { + match self { + Metric::Total { name, .. } => name, + Metric::EachSecond { name, .. } => name, + } + } + pub(crate) fn identifier(&self) -> MetricIdentifier { match self { Metric::Total { name, .. } => MetricIdentifier::Total { From 77e53f5dc74428b0d6022492f32e1232e2e1f641 Mon Sep 17 00:00:00 2001 From: Lucas Kent Date: Fri, 1 Dec 2023 09:07:22 +1100 Subject: [PATCH 2/4] Refactor --- .../windsock/profilers/shotover_metrics.rs | 249 ++++++++++-------- 1 file changed, 137 insertions(+), 112 deletions(-) diff --git a/shotover-proxy/benches/windsock/profilers/shotover_metrics.rs b/shotover-proxy/benches/windsock/profilers/shotover_metrics.rs index 6410c1720..b3fe30a62 100644 --- a/shotover-proxy/benches/windsock/profilers/shotover_metrics.rs +++ b/shotover-proxy/benches/windsock/profilers/shotover_metrics.rs @@ -1,143 +1,168 @@ use prometheus_parse::Value; use std::{collections::HashMap, time::Duration}; -use tokio::sync::mpsc; +use time::OffsetDateTime; use tokio::task::JoinHandle; +use tokio::{sync::mpsc, time::MissedTickBehavior}; use windsock::{Goal, Metric, ReportArchive}; pub struct ShotoverMetrics { shutdown_tx: mpsc::Sender<()>, task: JoinHandle<()>, } + +pub struct RawPrometheusExposition { + timestamp: OffsetDateTime, + content: String, +} type ParsedMetrics = HashMap>; impl ShotoverMetrics { pub fn new(bench_name: String, shotover_ip: &str) -> Self { let url = format!("http://{shotover_ip}:9001/metrics"); - let (shutdown_tx, mut shutdown_rx) = mpsc::channel(1); + let (shutdown_tx, shutdown_rx) = mpsc::channel(1); let task = tokio::spawn(async move { - let mut results: ParsedMetrics = HashMap::new(); - loop { - tokio::select! { - _ = shutdown_rx.recv() => { - break; - } - _ = Self::get_metrics(&mut results, &url) => {} - } - } + // collect metrics until shutdown + let raw_metrics = Self::collect_metrics(shutdown_rx, &url).await; + + // we are now shutting down and need to process and save all collected metrics + let mut report = ReportArchive::load(&bench_name).unwrap(); + let parsed = Self::parse_metrics(raw_metrics, &report); + report.metrics.extend(Self::windsock_metrics(parsed)); + report.save(); + }); + ShotoverMetrics { task, shutdown_tx } + } - // The bench will start after sar has started so we need to throw away all sar metrics that were recorded before the bench started. - // let time_diff = report.bench_started_at - sar.started_at; - // let inital_values_to_discard = time_diff.as_seconds_f32().round() as usize; - // for values in sar.named_values.values_mut() { - // values.drain(0..inital_values_to_discard); - // } + pub fn parse_metrics( + raw_metrics: Vec, + report: &ReportArchive, + ) -> ParsedMetrics { + let mut parsed_metrics: ParsedMetrics = HashMap::new(); + for raw_metric in raw_metrics { + if raw_metric.timestamp > report.bench_started_at { + let metrics = prometheus_parse::Scrape::parse( + raw_metric.content.lines().map(|x| Ok(x.to_owned())), + ) + .unwrap(); + for sample in metrics.samples { + let key = format!( + "{}{{{}}}", + sample + .metric + .strip_prefix("shotover_") + .unwrap_or(&sample.metric), + sample.labels + ); - let mut new_metrics = vec![]; - for (name, value) in results { - match value[0] { - Value::Gauge(_) => { - new_metrics.push(Metric::EachSecond { - name, - values: value - .iter() - .map(|x| { - let Value::Gauge(x) = x else { - panic!("metric type changed during bench run") - }; - (*x, x.to_string(), Goal::SmallerIsBetter) // TODO: Goal::None - }) - .collect(), - }); - } - Value::Counter(_) => { - let mut prev = 0.0; - new_metrics.push(Metric::EachSecond { - name, - values: value - .iter() - .map(|x| { - let Value::Counter(x) = x else { - panic!("metric type changed during bench run") - }; - let diff = x - prev; - prev = *x; - (diff, diff.to_string(), Goal::SmallerIsBetter) - // TODO: Goal::None - }) - .collect(), - }); - } - Value::Summary(_) => { - let last = value.last().unwrap(); - let Value::Summary(summary) = last else { - panic!("metric type changed during bench run") - }; - let values = summary + parsed_metrics.entry(key).or_default().push(sample.value); + } + } + } + parsed_metrics + } + pub fn windsock_metrics(parsed_metrics: ParsedMetrics) -> Vec { + let mut new_metrics = vec![]; + for (name, value) in parsed_metrics { + match value[0] { + Value::Gauge(_) => { + new_metrics.push(Metric::EachSecond { + name, + values: value .iter() .map(|x| { - ( - x.count, - format!("{} - {:.4}ms", x.quantile, x.count * 1000.0), - Goal::SmallerIsBetter, - ) + let Value::Gauge(x) = x else { + panic!("metric type changed during bench run") + }; + (*x, x.to_string(), Goal::SmallerIsBetter) // TODO: Goal::None }) - .collect(); - // TODO: add a Metric::QuantileLatency and use instead - new_metrics.push(Metric::EachSecond { name, values }); - } - _ => { - tracing::warn!("Unused shotover metric: {name}") - } + .collect(), + }); } - } - new_metrics.sort_by_key(|x| { - let name = x.name(); - // move latency metrics to the top - if name.starts_with("chain_latency") || name.starts_with("transform_latency") { - format!("aaa_{name}") + Value::Counter(_) => { + let mut prev = 0.0; + new_metrics.push(Metric::EachSecond { + name, + values: value + .iter() + .map(|x| { + let Value::Counter(x) = x else { + panic!("metric type changed during bench run") + }; + let diff = x - prev; + prev = *x; + (diff, diff.to_string(), Goal::SmallerIsBetter) + // TODO: Goal::None + }) + .collect(), + }); } - // move failure metrics to the bottom. - else if name.starts_with("transform_failures") - || name.starts_with("failed_requests") - || name.starts_with("chain_failures") - { - format!("zzz_{name}") - } else { - name.to_owned() + Value::Summary(_) => { + let last = value.last().unwrap(); + let Value::Summary(summary) = last else { + panic!("metric type changed during bench run") + }; + let values = summary + .iter() + .map(|x| { + ( + x.count, + format!("{} - {:.4}ms", x.quantile, x.count * 1000.0), + Goal::SmallerIsBetter, + ) + }) + .collect(); + // TODO: add a Metric::QuantileLatency and use instead + new_metrics.push(Metric::EachSecond { name, values }); } - }); - - let mut report = ReportArchive::load(&bench_name).unwrap(); - report.metrics.extend(new_metrics); - report.save(); + _ => { + tracing::warn!("Unused shotover metric: {name}") + } + } + } + new_metrics.sort_by_key(|x| { + let name = x.name(); + // move latency metrics to the top + if name.starts_with("chain_latency") || name.starts_with("transform_latency") { + format!("aaa_{name}") + } + // move failure metrics to the bottom. + else if name.starts_with("transform_failures") + || name.starts_with("failed_requests") + || name.starts_with("chain_failures") + { + format!("zzz_{name}") + } else { + name.to_owned() + } }); - ShotoverMetrics { task, shutdown_tx } + new_metrics } - async fn get_metrics(results: &mut ParsedMetrics, url: &str) { - let metrics = tokio::time::timeout(Duration::from_secs(3), reqwest::get(url)) - .await - .unwrap() - .unwrap() - .text() - .await - .unwrap(); + async fn collect_metrics( + mut shutdown_rx: mpsc::Receiver<()>, + url: &str, + ) -> Vec { + let mut results = vec![]; - let metrics = - prometheus_parse::Scrape::parse(metrics.lines().map(|x| Ok(x.to_owned()))).unwrap(); - for sample in metrics.samples { - let key = format!( - "{}{{{}}}", - sample - .metric - .strip_prefix("shotover_") - .unwrap_or(&sample.metric), - sample.labels - ); - - results.entry(key).or_default().push(sample.value); + let mut interval = tokio::time::interval(Duration::from_secs(1)); + interval.set_missed_tick_behavior(MissedTickBehavior::Delay); + loop { + tokio::select! { + _ = shutdown_rx.recv() => break, + _ = interval.tick() => { + match tokio::time::timeout(Duration::from_secs(3), reqwest::get(url)).await.unwrap() { + Ok(response) => { + results.push(RawPrometheusExposition { + timestamp: OffsetDateTime::now_utc(), + content: response.text().await.unwrap(), + }); + } + Err(err) => tracing::debug!("Failed to request from metrics endpoint, probably not up yet, error was {err:?}") + } + } + } } - tokio::time::sleep(Duration::from_secs(1)).await + results } pub fn insert_results_to_bench_archive(self) { From 400b14bb206273a7a941b03ddfcdba3f13a81284 Mon Sep 17 00:00:00 2001 From: Lucas Kent Date: Fri, 1 Dec 2023 11:58:39 +1100 Subject: [PATCH 3/4] Goal::None --- .../benches/windsock/profilers/shotover_metrics.rs | 5 ++--- windsock/src/tables.rs | 1 + 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/shotover-proxy/benches/windsock/profilers/shotover_metrics.rs b/shotover-proxy/benches/windsock/profilers/shotover_metrics.rs index b3fe30a62..a9a85dea2 100644 --- a/shotover-proxy/benches/windsock/profilers/shotover_metrics.rs +++ b/shotover-proxy/benches/windsock/profilers/shotover_metrics.rs @@ -73,7 +73,7 @@ impl ShotoverMetrics { let Value::Gauge(x) = x else { panic!("metric type changed during bench run") }; - (*x, x.to_string(), Goal::SmallerIsBetter) // TODO: Goal::None + (*x, x.to_string(), Goal::None) }) .collect(), }); @@ -90,8 +90,7 @@ impl ShotoverMetrics { }; let diff = x - prev; prev = *x; - (diff, diff.to_string(), Goal::SmallerIsBetter) - // TODO: Goal::None + (diff, diff.to_string(), Goal::None) }) .collect(), }); diff --git a/windsock/src/tables.rs b/windsock/src/tables.rs index 95431d0ed..239d5c13e 100644 --- a/windsock/src/tables.rs +++ b/windsock/src/tables.rs @@ -783,6 +783,7 @@ struct Measurement { pub enum Goal { BiggerIsBetter, SmallerIsBetter, + None, } enum Color { From d0d5840dcc144962ff26d97b983a171257cad342 Mon Sep 17 00:00:00 2001 From: Lucas Kent Date: Fri, 1 Dec 2023 12:28:44 +1100 Subject: [PATCH 4/4] Percentiles --- .../windsock/profilers/shotover_metrics.rs | 34 +++++++-------- windsock/src/lib.rs | 5 ++- windsock/src/report.rs | 27 ++++++++++++ windsock/src/tables.rs | 42 ++++++++++++++++++- 4 files changed, 87 insertions(+), 21 deletions(-) diff --git a/shotover-proxy/benches/windsock/profilers/shotover_metrics.rs b/shotover-proxy/benches/windsock/profilers/shotover_metrics.rs index a9a85dea2..18eeb5f09 100644 --- a/shotover-proxy/benches/windsock/profilers/shotover_metrics.rs +++ b/shotover-proxy/benches/windsock/profilers/shotover_metrics.rs @@ -3,7 +3,7 @@ use std::{collections::HashMap, time::Duration}; use time::OffsetDateTime; use tokio::task::JoinHandle; use tokio::{sync::mpsc, time::MissedTickBehavior}; -use windsock::{Goal, Metric, ReportArchive}; +use windsock::{Goal, LatencyPercentile, Metric, ReportArchive}; pub struct ShotoverMetrics { shutdown_tx: mpsc::Sender<()>, @@ -37,7 +37,7 @@ impl ShotoverMetrics { raw_metrics: Vec, report: &ReportArchive, ) -> ParsedMetrics { - let mut parsed_metrics: ParsedMetrics = HashMap::new(); + let mut result: ParsedMetrics = HashMap::new(); for raw_metric in raw_metrics { if raw_metric.timestamp > report.bench_started_at { let metrics = prometheus_parse::Scrape::parse( @@ -54,18 +54,19 @@ impl ShotoverMetrics { sample.labels ); - parsed_metrics.entry(key).or_default().push(sample.value); + result.entry(key).or_default().push(sample.value); } } } - parsed_metrics + result } + pub fn windsock_metrics(parsed_metrics: ParsedMetrics) -> Vec { - let mut new_metrics = vec![]; + let mut result = vec![]; for (name, value) in parsed_metrics { match value[0] { Value::Gauge(_) => { - new_metrics.push(Metric::EachSecond { + result.push(Metric::EachSecond { name, values: value .iter() @@ -80,7 +81,7 @@ impl ShotoverMetrics { } Value::Counter(_) => { let mut prev = 0.0; - new_metrics.push(Metric::EachSecond { + result.push(Metric::EachSecond { name, values: value .iter() @@ -102,23 +103,20 @@ impl ShotoverMetrics { }; let values = summary .iter() - .map(|x| { - ( - x.count, - format!("{} - {:.4}ms", x.quantile, x.count * 1000.0), - Goal::SmallerIsBetter, - ) + .map(|x| LatencyPercentile { + value: x.count, + value_display: format!("{:.4}ms", x.count * 1000.0), + quantile: x.quantile.to_string(), }) .collect(); - // TODO: add a Metric::QuantileLatency and use instead - new_metrics.push(Metric::EachSecond { name, values }); + result.push(Metric::LatencyPercentiles { name, values }); } _ => { tracing::warn!("Unused shotover metric: {name}") } } } - new_metrics.sort_by_key(|x| { + result.sort_by_key(|x| { let name = x.name(); // move latency metrics to the top if name.starts_with("chain_latency") || name.starts_with("transform_latency") { @@ -134,7 +132,7 @@ impl ShotoverMetrics { name.to_owned() } }); - new_metrics + result } async fn collect_metrics( @@ -166,7 +164,7 @@ impl ShotoverMetrics { pub fn insert_results_to_bench_archive(self) { std::mem::drop(self.shutdown_tx); - // TODO: asyncify it all or something + // TODO: make this function + caller async, lets do this in a follow up PR to avoid making this PR even more complex. futures::executor::block_on(async { self.task.await.unwrap() }) } } diff --git a/windsock/src/lib.rs b/windsock/src/lib.rs index b6b69adaa..2d64f8c77 100644 --- a/windsock/src/lib.rs +++ b/windsock/src/lib.rs @@ -8,7 +8,10 @@ mod report; mod tables; pub use bench::{Bench, BenchParameters, BenchTask, Profiling}; -pub use report::{ExternalReport, Metric, OperationsReport, PubSubReport, Report, ReportArchive}; +pub use report::{ + ExternalReport, LatencyPercentile, Metric, OperationsReport, PubSubReport, Report, + ReportArchive, +}; pub use tables::Goal; use anyhow::{anyhow, Result}; diff --git a/windsock/src/report.rs b/windsock/src/report.rs index a9b6db51f..cd20d53cc 100644 --- a/windsock/src/report.rs +++ b/windsock/src/report.rs @@ -158,6 +158,27 @@ pub enum Metric { name: String, values: Vec<(f64, String, Goal)>, }, + LatencyPercentiles { + name: String, + values: Vec, + }, +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct LatencyPercentile { + pub quantile: String, + pub value: f64, + pub value_display: String, +} + +impl LatencyPercentile { + pub(crate) fn to_measurement(&self) -> (f64, String, Goal) { + ( + self.value, + self.value_display.clone(), + Goal::SmallerIsBetter, + ) + } } impl Metric { @@ -165,6 +186,7 @@ impl Metric { match self { Metric::Total { name, .. } => name, Metric::EachSecond { name, .. } => name, + Metric::LatencyPercentiles { name, .. } => name, } } @@ -176,6 +198,9 @@ impl Metric { Metric::EachSecond { name, .. } => MetricIdentifier::EachSecond { name: name.to_owned(), }, + Metric::LatencyPercentiles { name, .. } => MetricIdentifier::LatencyPercentiles { + name: name.to_owned(), + }, } } @@ -184,6 +209,7 @@ impl Metric { match self { Metric::Total { .. } => 1, Metric::EachSecond { values, .. } => values.len(), + Metric::LatencyPercentiles { values, .. } => values.len(), } } } @@ -192,6 +218,7 @@ impl Metric { pub enum MetricIdentifier { Total { name: String }, EachSecond { name: String }, + LatencyPercentiles { name: String }, } fn error_message_insertion(messages: &mut Vec, new_message: String) { diff --git a/windsock/src/tables.rs b/windsock/src/tables.rs index 239d5c13e..c33f69971 100644 --- a/windsock/src/tables.rs +++ b/windsock/src/tables.rs @@ -530,13 +530,13 @@ fn base(reports: &[ReportColumn], table_type: &str) { .iter() .find(|metric| metric.identifier() == metric_identifier) .map(|metric| match metric { - Metric::EachSecond { .. } => unreachable!(), Metric::Total { compare, value, goal, .. } => (*compare, value.to_owned(), *goal), + _ => unreachable!(), }) })); } @@ -561,12 +561,50 @@ fn base(reports: &[ReportColumn], table_type: &str) { .iter() .find(|x| x.identifier() == metric_identifier) .and_then(|metric| match metric { - Metric::Total { .. } => unreachable!(), Metric::EachSecond { values, .. } => values.get(i).cloned(), + _ => unreachable!(), }) })); } } + MetricIdentifier::LatencyPercentiles { name } => { + rows.push(Row::Heading(format!("{name} Percentiles"))); + for (i, largest_col) in reports + .iter() + .map(|x| { + x.current + .metrics + .iter() + .find(|x| x.identifier() == metric_identifier) + .map(|metric| match metric { + Metric::LatencyPercentiles { values, .. } => values.clone(), + _ => unreachable!(), + }) + .unwrap_or(vec![]) + }) + .max_by_key(|x| x.len()) + .unwrap() + .into_iter() + .enumerate() + { + rows.push(Row::measurements( + reports, + &largest_col.quantile, + |report| { + report + .metrics + .iter() + .find(|x| x.identifier() == metric_identifier) + .and_then(|metric| match metric { + Metric::LatencyPercentiles { values, .. } => { + values.get(i).map(|x| x.to_measurement()) + } + _ => unreachable!(), + }) + }, + )); + } + } } }