diff --git a/Cargo.lock b/Cargo.lock index b9f880370..37431b3b2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5802,6 +5802,7 @@ dependencies = [ "scylla", "serde", "strum 0.25.0", + "time 0.3.25", "tokio", ] diff --git a/shotover-proxy/examples/windsock/profilers/sar.rs b/shotover-proxy/examples/windsock/profilers/sar.rs index 767b16c69..7514c7b47 100644 --- a/shotover-proxy/examples/windsock/profilers/sar.rs +++ b/shotover-proxy/examples/windsock/profilers/sar.rs @@ -8,15 +8,90 @@ use tokio::{ process::Command, sync::mpsc::{unbounded_channel, UnboundedReceiver}, }; +use windsock::{Goal, Metric, ReportArchive}; /// Reads the bench archive for `bench_name` from disk. /// Inserts the passed sar metrics for `instance_name`. /// Then writes the resulting archive back to disk -pub fn insert_sar_results_to_bench_archive(bench_name: &str, instance_name: &str, sar: ParsedSar) { - // TODO: Insert to archive instead of dumping to stdout - println!("sys_monitor results for {bench_name} - {instance_name}"); - println!("started at: {}", sar.started_at); - println!("{:#?}", sar.named_values); +pub fn insert_sar_results_to_bench_archive( + bench_name: &str, + instance_name: &str, + mut sar: ParsedSar, +) { + let mut report = ReportArchive::load(bench_name).unwrap(); + + // The bench will start after sar has started so we need to throw away all sar metrics that were recorded before the bench started. + let time_diff = report.bench_started_at - sar.started_at; + let inital_values_to_discard = time_diff.as_seconds_f32().round() as usize; + for values in sar.named_values.values_mut() { + values.drain(0..inital_values_to_discard); + } + + // use short names so we can keep each call on one line. 
+ let p = instance_name; + let s = &sar; + report.metrics.extend([ + metric(s, p, "CPU User", "%", "%user", Goal::SmallerIsBetter), + metric(s, p, "CPU System", "%", "%system", Goal::SmallerIsBetter), + metric(s, p, "CPU Nice", "%", "%nice", Goal::SmallerIsBetter), + metric(s, p, "CPU IO Wait", "%", "%iowait", Goal::SmallerIsBetter), + metric(s, p, "CPU Steal", "%", "%steal", Goal::SmallerIsBetter), + metric(s, p, "CPU Idle", "%", "%idle", Goal::BiggerIsBetter), + metric_with_formatter( + s, + p, + "Memory Used", + |value| { + // sar calls this a KB (kilobyte) but it's actually a KiB (kibibyte) + let value_kib: f32 = value.parse().unwrap(); + let value_mib = value_kib / 1024.0; + format!("{} MiB", value_mib) + }, + "kbmemused", + Goal::SmallerIsBetter, + ), + ]); + + report.save(); +} + +/// Shortcut for common metric formatting case +fn metric( + sar: &ParsedSar, + prefix: &str, + name: &str, + unit: &str, + sar_name: &str, + goal: Goal, +) -> Metric { + metric_with_formatter(sar, prefix, name, |x| format!("{x}{unit}"), sar_name, goal) +} + +/// Take a sar metric and transform it into a metric that can be stored in a bench archive +fn metric_with_formatter<F: Fn(&str) -> String>( + sar: &ParsedSar, + prefix: &str, + name: &str, + value_formatter: F, + sar_name: &str, + goal: Goal, +) -> Metric { + let name = if prefix.is_empty() { + name.to_owned() + } else { + format!("{prefix} - {name}") + }; + Metric::EachSecond { + name, + values: sar + .named_values + .get(sar_name) + .ok_or_else(|| format!("No key {} in {:?}", sar_name, sar.named_values)) + .unwrap() + .iter() + .map(|x| (x.parse().unwrap(), value_formatter(x), goal)) + .collect(), + } } /// parse lines of output from the sar command which looks like: diff --git a/windsock/Cargo.toml b/windsock/Cargo.toml index b3fc5289c..7f4576f4d 100644 --- a/windsock/Cargo.toml +++ b/windsock/Cargo.toml @@ -16,6 +16,7 @@ copy_dir = "0.1.2" docker-compose-runner = "0.2.0" serde = { workspace = true, features = ["derive"] } strum = { 
version = "0.25.0", features = ["derive"] } +time = { version = "0.3.25", features = ["serde"] } tokio.workspace = true [dev-dependencies] diff --git a/windsock/src/lib.rs b/windsock/src/lib.rs index 485f861f4..ce25e1c96 100644 --- a/windsock/src/lib.rs +++ b/windsock/src/lib.rs @@ -7,7 +7,8 @@ mod report; mod tables; pub use bench::{Bench, BenchParameters, BenchTask, Profiling}; -pub use report::{Report, ReportArchive}; +pub use report::{Metric, Report, ReportArchive}; +pub use tables::Goal; use anyhow::{anyhow, Result}; use bench::BenchState; diff --git a/windsock/src/report.rs b/windsock/src/report.rs index c75a30ed5..9dd45aeb4 100644 --- a/windsock/src/report.rs +++ b/windsock/src/report.rs @@ -1,8 +1,9 @@ -use crate::{bench::Tags, data::windsock_path}; +use crate::{bench::Tags, data::windsock_path, Goal}; use anyhow::{anyhow, Result}; use serde::{Deserialize, Serialize}; use std::{io::ErrorKind, path::PathBuf, time::Duration}; use strum::{EnumCount, EnumIter, IntoEnumIterator}; +use time::OffsetDateTime; use tokio::sync::mpsc::UnboundedReceiver; #[derive(Debug, Serialize, Deserialize)] @@ -94,8 +95,10 @@ type Percentiles = [Duration; Percentile::COUNT]; pub struct ReportArchive { pub(crate) running_in_release: bool, pub(crate) tags: Tags, + pub bench_started_at: OffsetDateTime, pub(crate) operations_report: Option<OperationsReport>, pub(crate) pubsub_report: Option<PubSubReport>, + pub metrics: Vec<Metric>, pub(crate) error_messages: Vec<String>, } @@ -130,6 +133,47 @@ pub(crate) struct PubSubReport { pub(crate) backlog_each_second: Vec, } +#[derive(Clone, Debug, Serialize, Deserialize)] +pub enum Metric { + Total { + name: String, + compare: f64, + value: String, + goal: Goal, + }, + EachSecond { + name: String, + values: Vec<(f64, String, Goal)>, + }, +} + +impl Metric { + pub(crate) fn identifier(&self) -> MetricIdentifier { + match self { + Metric::Total { name, .. } => MetricIdentifier::Total { + name: name.to_owned(), + }, + Metric::EachSecond { name, .. 
} => MetricIdentifier::EachSecond { + name: name.to_owned(), + }, + } + } + + #[allow(clippy::len_without_is_empty)] + pub(crate) fn len(&self) -> usize { + match self { + Metric::Total { .. } => 1, + Metric::EachSecond { values, .. } => values.len(), + } + } +} + +#[derive(PartialEq)] +pub enum MetricIdentifier { + Total { name: String }, + EachSecond { name: String }, +} + fn error_message_insertion(messages: &mut Vec<String>, new_message: String) { if !messages.contains(&new_message) { if messages.len() <= 5 { @@ -187,7 +231,7 @@ impl ReportArchive { reports } - fn save(&self) { + pub fn save(&self) { let path = self.path(); std::fs::create_dir_all(path.parent().unwrap()).unwrap(); std::fs::write(&path, bincode::serialize(self).unwrap()) @@ -241,7 +285,7 @@ pub(crate) async fn report_builder( running_in_release: bool, ) -> ReportArchive { let mut finished_in = None; - let mut started = false; + let mut started = None; let mut pubsub_report = None; let mut operations_report = None; let mut operation_times = vec![]; @@ -253,11 +297,11 @@ pub(crate) async fn report_builder( while let Some(report) = rx.recv().await { match report { Report::Start => { - started = true; + started = Some(OffsetDateTime::now_utc()); } Report::QueryCompletedIn(duration) => { let report = operations_report.get_or_insert_with(OperationsReport::default); - if started { + if started.is_some() { report.total += 1; total_operation_time += duration; operation_times.push(duration); @@ -272,7 +316,7 @@ pub(crate) async fn report_builder( message, } => { let report = operations_report.get_or_insert_with(OperationsReport::default); - if started { + if started.is_some() { error_message_insertion(&mut error_messages, message); report.total_errors += 1; total_operation_time += completed_in; @@ -280,7 +324,7 @@ pub(crate) async fn report_builder( } Report::ProduceCompletedIn(duration) => { let report = pubsub_report.get_or_insert_with(PubSubReport::default); - if started { + if started.is_some() { 
report.total_backlog += 1; report.total_produce += 1; total_produce_time += duration; @@ -296,7 +340,7 @@ pub(crate) async fn report_builder( message, } => { let report = pubsub_report.get_or_insert_with(PubSubReport::default); - if started { + if started.is_some() { error_message_insertion(&mut error_messages, message); report.total_produce_error += 1; total_produce_time += completed_in; @@ -304,7 +348,7 @@ pub(crate) async fn report_builder( } Report::ConsumeCompleted => { let report = pubsub_report.get_or_insert_with(PubSubReport::default); - if started { + if started.is_some() { report.total_backlog -= 1; report.total_consume += 1; match report.consume_each_second.last_mut() { @@ -315,7 +359,7 @@ pub(crate) async fn report_builder( } Report::ConsumeErrored { message } => { let report = pubsub_report.get_or_insert_with(PubSubReport::default); - if started { + if started.is_some() { error_message_insertion(&mut error_messages, message); report.total_consume_error += 1; } @@ -335,7 +379,7 @@ pub(crate) async fn report_builder( } } Report::FinishedIn(duration) => { - if !started { + if started.is_none() { panic!("The bench never returned Report::Start") } finished_in = Some(duration); @@ -376,11 +420,13 @@ pub(crate) async fn report_builder( } let archive = ReportArchive { + bench_started_at: started.unwrap(), running_in_release, tags, pubsub_report, error_messages, operations_report, + metrics: vec![], }; archive.save(); archive diff --git a/windsock/src/tables.rs b/windsock/src/tables.rs index 5c53f93bf..3b9f3a459 100644 --- a/windsock/src/tables.rs +++ b/windsock/src/tables.rs @@ -1,10 +1,12 @@ use crate::{ bench::Tags, filter::Filter, - report::{Percentile, ReportArchive}, + report::{MetricIdentifier, Percentile, ReportArchive}, + Metric, }; use anyhow::{Context, Result}; use console::{pad_str, pad_str_with, style, Alignment}; +use serde::{Deserialize, Serialize}; use std::{collections::HashSet, time::Duration}; use strum::IntoEnumIterator; @@ -503,6 +505,63 @@ 
fn base(reports: &[ReportColumn], table_type: &str) { } } + let mut metrics_to_display = vec![]; + for report in reports { + for metric in &report.current.metrics { + if !metrics_to_display.contains(&metric.identifier()) { + metrics_to_display.push(metric.identifier()) + } + } + } + for metric_identifier in metrics_to_display { + match &metric_identifier { + MetricIdentifier::Total { name } => { + rows.push(Row::measurements(reports, name, |report| { + report + .metrics + .iter() + .find(|metric| metric.identifier() == metric_identifier) + .map(|metric| match metric { + Metric::EachSecond { .. } => unreachable!(), + Metric::Total { + compare, + value, + goal, + .. + } => (*compare, value.to_owned(), *goal), + }) + })); + } + MetricIdentifier::EachSecond { name } => { + rows.push(Row::Heading(format!("{name} Each Second"))); + for i in 0..reports + .iter() + .map(|x| { + x.current + .metrics + .iter() + .find(|x| x.identifier() == metric_identifier) + .map(|metric| metric.len()) + .unwrap_or(0) + }) + .max() + .unwrap() + { + rows.push(Row::measurements(reports, &i.to_string(), |report| { + report + .metrics + .iter() + .find(|x| x.identifier() == metric_identifier) + .and_then(|metric| match metric { + Metric::Total { .. } => unreachable!(), + Metric::EachSecond { values, .. } => values.get(i).cloned(), + }) + })); + } + } + } + } + // the width of the legend column let legend_width: usize = rows .iter() @@ -514,7 +573,7 @@ fn base(reports: &[ReportColumn], table_type: &str) { }) .max() .unwrap_or(10); - // the width of the comparison compoenent of each column + // the width of the comparison component of each column let comparison_widths: Vec = reports .iter() .enumerate() @@ -712,7 +771,8 @@ struct Measurement { color: Color, } -enum Goal { +#[derive(Clone, Copy, Debug, Serialize, Deserialize)] +pub enum Goal { BiggerIsBetter, SmallerIsBetter, }