Skip to content

Commit

Permalink
windsock: add sys monitor profiler - archive support (#1277)
Browse files Browse the repository at this point in the history
  • Loading branch information
rukai authored Aug 19, 2023
1 parent 2d4a27d commit b23e5e1
Show file tree
Hide file tree
Showing 6 changed files with 204 additions and 20 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

85 changes: 80 additions & 5 deletions shotover-proxy/examples/windsock/profilers/sar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,90 @@ use tokio::{
process::Command,
sync::mpsc::{unbounded_channel, UnboundedReceiver},
};
use windsock::{Goal, Metric, ReportArchive};

/// Reads the bench archive for `bench_name` from disk.
/// Inserts the passed sar metrics for `instance_name`.
/// Then writes the resulting archive back to disk
pub fn insert_sar_results_to_bench_archive(bench_name: &str, instance_name: &str, sar: ParsedSar) {
// TODO: Insert to archive instead of dumping to stdout
println!("sys_monitor results for {bench_name} - {instance_name}");
println!("started at: {}", sar.started_at);
println!("{:#?}", sar.named_values);
pub fn insert_sar_results_to_bench_archive(
bench_name: &str,
instance_name: &str,
mut sar: ParsedSar,
) {
let mut report = ReportArchive::load(bench_name).unwrap();

// The bench will start after sar has started so we need to throw away all sar metrics that were recorded before the bench started.
let time_diff = report.bench_started_at - sar.started_at;
let inital_values_to_discard = time_diff.as_seconds_f32().round() as usize;
for values in sar.named_values.values_mut() {
values.drain(0..inital_values_to_discard);
}

// use short names so we can keep each call on one line.
let p = instance_name;
let s = &sar;
report.metrics.extend([
metric(s, p, "CPU User", "%", "%user", Goal::SmallerIsBetter),
metric(s, p, "CPU System", "%", "%system", Goal::SmallerIsBetter),
metric(s, p, "CPU Nice", "%", "%nice", Goal::SmallerIsBetter),
metric(s, p, "CPU IO Wait", "%", "%iowait", Goal::SmallerIsBetter),
metric(s, p, "CPU Steal", "%", "%steal", Goal::SmallerIsBetter),
metric(s, p, "CPU Idle", "%", "%idle", Goal::BiggerIsBetter),
metric_with_formatter(
s,
p,
"Memory Used",
|value| {
// sar calls this a KB (kilobyte) but its actually a KiB (kibibyte)
let value_kib: f32 = value.parse().unwrap();
let value_mib = value_kib / 1024.0;
format!("{} MiB", value_mib)
},
"kbmemused",
Goal::SmallerIsBetter,
),
]);

report.save();
}

/// Shortcut for common metric formatting case
fn metric(
sar: &ParsedSar,
prefix: &str,
name: &str,
unit: &str,
sar_name: &str,
goal: Goal,
) -> Metric {
metric_with_formatter(sar, prefix, name, |x| format!("{x}{unit}"), sar_name, goal)
}

/// Take a sars metric and transform it into a metric that can be stored in a bench archive
fn metric_with_formatter<F: Fn(&str) -> String>(
sar: &ParsedSar,
prefix: &str,
name: &str,
value_formatter: F,
sar_name: &str,
goal: Goal,
) -> Metric {
let name = if prefix.is_empty() {
name.to_owned()
} else {
format!("{prefix} - {name}")
};
Metric::EachSecond {
name,
values: sar
.named_values
.get(sar_name)
.ok_or_else(|| format!("No key {} in {:?}", sar_name, sar.named_values))
.unwrap()
.iter()
.map(|x| (x.parse().unwrap(), value_formatter(x), goal))
.collect(),
}
}

/// parse lines of output from the sar command which looks like:
Expand Down
1 change: 1 addition & 0 deletions windsock/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ copy_dir = "0.1.2"
docker-compose-runner = "0.2.0"
serde = { workspace = true, features = ["derive"] }
strum = { version = "0.25.0", features = ["derive"] }
time = { version = "0.3.25", features = ["serde"] }
tokio.workspace = true

[dev-dependencies]
Expand Down
3 changes: 2 additions & 1 deletion windsock/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ mod report;
mod tables;

pub use bench::{Bench, BenchParameters, BenchTask, Profiling};
pub use report::{Report, ReportArchive};
pub use report::{Metric, Report, ReportArchive};
pub use tables::Goal;

use anyhow::{anyhow, Result};
use bench::BenchState;
Expand Down
68 changes: 57 additions & 11 deletions windsock/src/report.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
use crate::{bench::Tags, data::windsock_path};
use crate::{bench::Tags, data::windsock_path, Goal};
use anyhow::{anyhow, Result};
use serde::{Deserialize, Serialize};
use std::{io::ErrorKind, path::PathBuf, time::Duration};
use strum::{EnumCount, EnumIter, IntoEnumIterator};
use time::OffsetDateTime;
use tokio::sync::mpsc::UnboundedReceiver;

#[derive(Debug, Serialize, Deserialize)]
Expand Down Expand Up @@ -94,8 +95,10 @@ type Percentiles = [Duration; Percentile::COUNT];
pub struct ReportArchive {
pub(crate) running_in_release: bool,
pub(crate) tags: Tags,
pub bench_started_at: OffsetDateTime,
pub(crate) operations_report: Option<OperationsReport>,
pub(crate) pubsub_report: Option<PubSubReport>,
pub metrics: Vec<Metric>,
pub(crate) error_messages: Vec<String>,
}

Expand Down Expand Up @@ -130,6 +133,47 @@ pub(crate) struct PubSubReport {
pub(crate) backlog_each_second: Vec<i64>,
}

#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum Metric {
Total {
name: String,
compare: f64,
value: String,
goal: Goal,
},
EachSecond {
name: String,
values: Vec<(f64, String, Goal)>,
},
}

impl Metric {
pub(crate) fn identifier(&self) -> MetricIdentifier {
match self {
Metric::Total { name, .. } => MetricIdentifier::Total {
name: name.to_owned(),
},
Metric::EachSecond { name, .. } => MetricIdentifier::EachSecond {
name: name.to_owned(),
},
}
}

#[allow(clippy::len_without_is_empty)]
pub(crate) fn len(&self) -> usize {
match self {
Metric::Total { .. } => 1,
Metric::EachSecond { values, .. } => values.len(),
}
}
}

#[derive(PartialEq)]
pub enum MetricIdentifier {
Total { name: String },
EachSecond { name: String },
}

fn error_message_insertion(messages: &mut Vec<String>, new_message: String) {
if !messages.contains(&new_message) {
if messages.len() <= 5 {
Expand Down Expand Up @@ -187,7 +231,7 @@ impl ReportArchive {
reports
}

fn save(&self) {
pub fn save(&self) {
let path = self.path();
std::fs::create_dir_all(path.parent().unwrap()).unwrap();
std::fs::write(&path, bincode::serialize(self).unwrap())
Expand Down Expand Up @@ -241,7 +285,7 @@ pub(crate) async fn report_builder(
running_in_release: bool,
) -> ReportArchive {
let mut finished_in = None;
let mut started = false;
let mut started = None;
let mut pubsub_report = None;
let mut operations_report = None;
let mut operation_times = vec![];
Expand All @@ -253,11 +297,11 @@ pub(crate) async fn report_builder(
while let Some(report) = rx.recv().await {
match report {
Report::Start => {
started = true;
started = Some(OffsetDateTime::now_utc());
}
Report::QueryCompletedIn(duration) => {
let report = operations_report.get_or_insert_with(OperationsReport::default);
if started {
if started.is_some() {
report.total += 1;
total_operation_time += duration;
operation_times.push(duration);
Expand All @@ -272,15 +316,15 @@ pub(crate) async fn report_builder(
message,
} => {
let report = operations_report.get_or_insert_with(OperationsReport::default);
if started {
if started.is_some() {
error_message_insertion(&mut error_messages, message);
report.total_errors += 1;
total_operation_time += completed_in;
}
}
Report::ProduceCompletedIn(duration) => {
let report = pubsub_report.get_or_insert_with(PubSubReport::default);
if started {
if started.is_some() {
report.total_backlog += 1;
report.total_produce += 1;
total_produce_time += duration;
Expand All @@ -296,15 +340,15 @@ pub(crate) async fn report_builder(
message,
} => {
let report = pubsub_report.get_or_insert_with(PubSubReport::default);
if started {
if started.is_some() {
error_message_insertion(&mut error_messages, message);
report.total_produce_error += 1;
total_produce_time += completed_in;
}
}
Report::ConsumeCompleted => {
let report = pubsub_report.get_or_insert_with(PubSubReport::default);
if started {
if started.is_some() {
report.total_backlog -= 1;
report.total_consume += 1;
match report.consume_each_second.last_mut() {
Expand All @@ -315,7 +359,7 @@ pub(crate) async fn report_builder(
}
Report::ConsumeErrored { message } => {
let report = pubsub_report.get_or_insert_with(PubSubReport::default);
if started {
if started.is_some() {
error_message_insertion(&mut error_messages, message);
report.total_consume_error += 1;
}
Expand All @@ -335,7 +379,7 @@ pub(crate) async fn report_builder(
}
}
Report::FinishedIn(duration) => {
if !started {
if started.is_none() {
panic!("The bench never returned Report::Start")
}
finished_in = Some(duration);
Expand Down Expand Up @@ -376,11 +420,13 @@ pub(crate) async fn report_builder(
}

let archive = ReportArchive {
bench_started_at: started.unwrap(),
running_in_release,
tags,
pubsub_report,
error_messages,
operations_report,
metrics: vec![],
};
archive.save();
archive
Expand Down
66 changes: 63 additions & 3 deletions windsock/src/tables.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
use crate::{
bench::Tags,
filter::Filter,
report::{Percentile, ReportArchive},
report::{MetricIdentifier, Percentile, ReportArchive},
Metric,
};
use anyhow::{Context, Result};
use console::{pad_str, pad_str_with, style, Alignment};
use serde::{Deserialize, Serialize};
use std::{collections::HashSet, time::Duration};
use strum::IntoEnumIterator;

Expand Down Expand Up @@ -503,6 +505,63 @@ fn base(reports: &[ReportColumn], table_type: &str) {
}
}

let mut metrics_to_display = vec![];
for report in reports {
for metric in &report.current.metrics {
if !metrics_to_display.contains(&metric.identifier()) {
metrics_to_display.push(metric.identifier())
}
}
}
for metric_identifier in metrics_to_display {
match &metric_identifier {
MetricIdentifier::Total { name } => {
rows.push(Row::measurements(reports, name, |report| {
report
.metrics
.iter()
.find(|metric| metric.identifier() == metric_identifier)
.map(|metric| match metric {
Metric::EachSecond { .. } => unreachable!(),
Metric::Total {
compare,
value,
goal,
..
} => (*compare, value.to_owned(), *goal),
})
}));
}
MetricIdentifier::EachSecond { name } => {
rows.push(Row::Heading(format!("{name} Each Second")));
for i in 0..reports
.iter()
.map(|x| {
x.current
.metrics
.iter()
.find(|x| x.identifier() == metric_identifier)
.map(|metric| metric.len())
.unwrap_or(0)
})
.max()
.unwrap()
{
rows.push(Row::measurements(reports, &i.to_string(), |report| {
report
.metrics
.iter()
.find(|x| x.identifier() == metric_identifier)
.and_then(|metric| match metric {
Metric::Total { .. } => unreachable!(),
Metric::EachSecond { values, .. } => values.get(i).cloned(),
})
}));
}
}
}
}

// the width of the legend column
let legend_width: usize = rows
.iter()
Expand All @@ -514,7 +573,7 @@ fn base(reports: &[ReportColumn], table_type: &str) {
})
.max()
.unwrap_or(10);
// the width of the comparison compoenent of each column
// the width of the comparison component of each column
let comparison_widths: Vec<usize> = reports
.iter()
.enumerate()
Expand Down Expand Up @@ -712,7 +771,8 @@ struct Measurement {
color: Color,
}

enum Goal {
#[derive(Clone, Copy, Debug, Serialize, Deserialize)]
pub enum Goal {
BiggerIsBetter,
SmallerIsBetter,
}
Expand Down

0 comments on commit b23e5e1

Please sign in to comment.