Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add shotover metrics profiler #1378

Merged
merged 5 commits into from
Dec 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/windsock_benches.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ jobs:
cargo windsock --bench-length-seconds 5 --operations-per-second 100 --profilers flamegraph --name cassandra,compression=none,driver=scylla,operation=read_i64,protocol=v4,shotover=standard,topology=single
cargo windsock --bench-length-seconds 5 --operations-per-second 100 --profilers samply --name cassandra,compression=none,driver=scylla,operation=read_i64,protocol=v4,shotover=standard,topology=single
cargo windsock --bench-length-seconds 5 --operations-per-second 100 --profilers sys_monitor --name kafka,shotover=standard,size=1B,topology=single
cargo windsock --bench-length-seconds 5 --operations-per-second 100 --profilers shotover_metrics --name redis,encryption=none,operation=get,shotover=standard,topology=single

# windsock/examples/cassandra.rs - this can stay here until windsock is moved to its own repo
cargo run --release --example cassandra -- --bench-length-seconds 5 --operations-per-second 100
Expand Down
14 changes: 14 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions shotover-proxy/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ license = "Apache-2.0"
shotover = { path = "../shotover" }

[dev-dependencies]
prometheus-parse = "0.2.4"
reqwest.workspace = true
scylla.workspace = true
anyhow.workspace = true
tokio.workspace = true
Expand Down
3 changes: 2 additions & 1 deletion shotover-proxy/benches/windsock/cassandra.rs
Original file line number Diff line number Diff line change
Expand Up @@ -552,7 +552,8 @@ impl Bench for CassandraBench {
}
}
let mut profiler =
CloudProfilerRunner::new(self.name(), profiling, profiler_instances).await;
CloudProfilerRunner::new(self.name(), profiling, profiler_instances, &shotover_ip)
.await;

let cassandra_nodes = vec![
AwsNodeInfo {
Expand Down
6 changes: 3 additions & 3 deletions shotover-proxy/benches/windsock/kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -246,12 +246,12 @@ impl Bench for KafkaBench {
}
}

let mut profiler =
CloudProfilerRunner::new(self.name(), profiling, profiler_instances).await;

let kafka_ip = kafka_instance1.instance.private_ip().to_string();
let shotover_ip = shotover_instance.instance.private_ip().to_string();

// The shotover metrics profiler scrapes the metrics endpoint of the shotover instance,
// so it must be given shotover's address rather than the kafka node's address.
let mut profiler =
    CloudProfilerRunner::new(self.name(), profiling, profiler_instances, &shotover_ip)
        .await;

let kafka_instances = vec![
kafka_instance1.clone(),
kafka_instance2.clone(),
Expand Down
39 changes: 38 additions & 1 deletion shotover-proxy/benches/windsock/profilers.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use self::samply::Samply;
use self::{samply::Samply, shotover_metrics::ShotoverMetrics};
use crate::common::Shotover;
use anyhow::Result;
use aws_throwaway::Ec2Instance;
Expand All @@ -11,14 +11,17 @@ use windsock::Profiling;
mod perf_flamegraph;
mod samply;
mod sar;
mod shotover_metrics;

/// Holds which profilers were requested for a bench together with the state of the
/// profilers that have been started.
pub struct ProfilerRunner {
    /// Name of the bench; handed to the profilers that write results into the bench archive.
    bench_name: String,
    /// True when "flamegraph" is listed in the requested profilers.
    run_flamegraph: bool,
    /// True when "samply" is listed in the requested profilers.
    run_samply: bool,
    /// True when "shotover_metrics" is listed in the requested profilers.
    run_shotover_metrics: bool,
    /// True when "sys_monitor" is listed in the requested profilers.
    run_sys_monitor: bool,
    /// Taken from `Profiling::results_path`; passed to profilers that write result files.
    results_path: PathBuf,
    /// Handle of the running perf profiler, `None` until it has been started.
    perf: Option<Perf>,
    /// Running shotover metrics collector, `None` until it has been started.
    shotover_metrics: Option<ShotoverMetrics>,
    /// Handle of the running samply profiler, `None` until it has been started.
    samply: Option<Samply>,
    /// Receiver of sar output lines, `None` until the system monitor has been started.
    sys_monitor: Option<UnboundedReceiver<Result<String>>>,
}
Expand All @@ -32,14 +35,19 @@ impl ProfilerRunner {
let run_sys_monitor = profiling
.profilers_to_use
.contains(&"sys_monitor".to_owned());
let run_shotover_metrics = profiling
.profilers_to_use
.contains(&"shotover_metrics".to_owned());

ProfilerRunner {
bench_name,
run_flamegraph,
run_sys_monitor,
run_samply,
run_shotover_metrics,
results_path: profiling.results_path,
perf: None,
shotover_metrics: None,
samply: None,
sys_monitor: None,
}
Expand All @@ -58,6 +66,15 @@ impl ProfilerRunner {
} else {
None
};
self.shotover_metrics = if self.run_shotover_metrics {
if shotover.is_some() {
Some(ShotoverMetrics::new(self.bench_name.clone(), "localhost"))
} else {
panic!("shotover_metrics not supported when benching without shotover")
}
} else {
None
};
self.samply = if self.run_samply {
if let Some(shotover) = &shotover {
Some(Samply::run(self.results_path.clone(), shotover.child().id().unwrap()).await)
Expand Down Expand Up @@ -91,6 +108,9 @@ impl Drop for ProfilerRunner {
if let Some(samply) = self.samply.take() {
samply.wait();
}
if let Some(shotover_metrics) = self.shotover_metrics.take() {
shotover_metrics.insert_results_to_bench_archive();
}
if let Some(mut rx) = self.sys_monitor.take() {
sar::insert_sar_results_to_bench_archive(&self.bench_name, "", sar::parse_sar(&mut rx));
}
Expand All @@ -100,28 +120,41 @@ impl Drop for ProfilerRunner {
/// State of the profilers started for a bench whose services run on remote instances.
pub struct CloudProfilerRunner {
    /// Name of the bench the collected results belong to.
    bench_name: String,
    /// Maps an instance name to the receiver of the sar output lines for that instance.
    /// Empty when the "sys_monitor" profiler was not requested.
    monitor_instances: HashMap<String, UnboundedReceiver<Result<String>>>,
    /// Running shotover metrics collector, `None` when "shotover_metrics" was not requested.
    shotover_metrics: Option<ShotoverMetrics>,
}

impl CloudProfilerRunner {
/// Starts every requested profiler that is supported for cloud benches.
///
/// * "sys_monitor" starts sar on each entry of `instances`, keyed by the instance name.
/// * "shotover_metrics" starts a collector that scrapes the metrics endpoint at `shotover_ip`.
pub async fn new(
    bench_name: String,
    profiling: Profiling,
    instances: HashMap<String, &Ec2Instance>,
    shotover_ip: &str,
) -> Self {
    let requested = &profiling.profilers_to_use;
    let sys_monitor_requested = requested.contains(&"sys_monitor".to_owned());
    let shotover_metrics_requested = requested.contains(&"shotover_metrics".to_owned());

    // sar is started on one instance after the other, the map stays empty when not requested
    let mut monitor_instances = HashMap::new();
    if sys_monitor_requested {
        for (instance_name, instance) in instances {
            let sar_rx = sar::run_sar_remote(instance).await;
            monitor_instances.insert(instance_name, sar_rx);
        }
    }

    let shotover_metrics = shotover_metrics_requested
        .then(|| ShotoverMetrics::new(bench_name.clone(), shotover_ip));

    CloudProfilerRunner {
        bench_name,
        monitor_instances,
        shotover_metrics,
    }
}

Expand All @@ -133,6 +166,9 @@ impl CloudProfilerRunner {
sar::parse_sar(instance_rx),
);
}
if let Some(shotover_metrics) = self.shotover_metrics.take() {
shotover_metrics.insert_results_to_bench_archive();
}
}
}

Expand All @@ -144,6 +180,7 @@ pub fn supported_profilers(shotover: Shotover) -> Vec<String> {
"flamegraph".to_owned(),
"samply".to_owned(),
"sys_monitor".to_owned(),
"shotover_metrics".to_owned(),
]
}
}
170 changes: 170 additions & 0 deletions shotover-proxy/benches/windsock/profilers/shotover_metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
use prometheus_parse::Value;
use std::{collections::HashMap, time::Duration};
use time::OffsetDateTime;
use tokio::task::JoinHandle;
use tokio::{sync::mpsc, time::MissedTickBehavior};
use windsock::{Goal, LatencyPercentile, Metric, ReportArchive};

/// Handle to a background task that periodically scrapes shotover's prometheus endpoint.
pub struct ShotoverMetrics {
    /// Nothing is ever sent on this channel; dropping the sender is what tells the
    /// collection task to stop scraping.
    shutdown_tx: mpsc::Sender<()>,
    /// The spawned task that collects the metrics and, once stopped, writes them into the
    /// bench's report archive.
    task: JoinHandle<()>,
}

/// One unparsed response body from the metrics endpoint.
pub struct RawPrometheusExposition {
    /// UTC time recorded when the response was received.
    timestamp: OffsetDateTime,
    /// The response body as text, parsed later as prometheus exposition format.
    content: String,
}
/// Maps a series key of the form `name{labels}` to the values sampled for that series,
/// in the order they were collected.
type ParsedMetrics = HashMap<String, Vec<Value>>;

impl ShotoverMetrics {
    /// Spawns a task that scrapes `http://{shotover_ip}:9001/metrics` until
    /// [`ShotoverMetrics::insert_results_to_bench_archive`] is called.
    ///
    /// Once stopped, the task loads the report archive of `bench_name`, appends the collected
    /// metrics to it and saves it again.
    ///
    /// # Panics
    /// The spawned task panics when the report archive cannot be loaded or when a collected
    /// response is not valid prometheus exposition format.
    pub fn new(bench_name: String, shotover_ip: &str) -> Self {
        let url = format!("http://{shotover_ip}:9001/metrics");
        let (shutdown_tx, shutdown_rx) = mpsc::channel(1);
        let task = tokio::spawn(async move {
            // collect metrics until shutdown
            let raw_metrics = Self::collect_metrics(shutdown_rx, &url).await;

            // we are now shutting down and need to process and save all collected metrics
            let mut report = ReportArchive::load(&bench_name).unwrap();
            let parsed = Self::parse_metrics(raw_metrics, &report);
            report.metrics.extend(Self::windsock_metrics(parsed));
            report.save();
        });
        ShotoverMetrics { task, shutdown_tx }
    }

    /// Parses the raw responses and groups the sampled values by series.
    ///
    /// Responses that were not received after `report.bench_started_at` are ignored.
    /// The key of each series is `name{labels}` where a leading `shotover_` is removed
    /// from the metric name.
    ///
    /// # Panics
    /// Panics when a response cannot be parsed as prometheus exposition format.
    pub fn parse_metrics(
        raw_metrics: Vec<RawPrometheusExposition>,
        report: &ReportArchive,
    ) -> ParsedMetrics {
        let mut result: ParsedMetrics = HashMap::new();
        for raw_metric in raw_metrics {
            if raw_metric.timestamp > report.bench_started_at {
                let metrics = prometheus_parse::Scrape::parse(
                    raw_metric.content.lines().map(|x| Ok(x.to_owned())),
                )
                .unwrap();
                for sample in metrics.samples {
                    let key = format!(
                        "{}{{{}}}",
                        sample
                            .metric
                            .strip_prefix("shotover_")
                            .unwrap_or(&sample.metric),
                        sample.labels
                    );

                    result.entry(key).or_default().push(sample.value);
                }
            }
        }
        result
    }

    /// Converts the parsed series into windsock metrics.
    ///
    /// * Gauges are reported as one value per sample.
    /// * Counters are reported as the increase since the previous sample, the first sample
    ///   is compared against 0.
    /// * Summaries are reported as latency percentiles taken from the last sample only.
    /// * Series without any sample are skipped, any other metric type is logged and skipped.
    ///
    /// The result is ordered by name, with latency metrics moved to the top and failure
    /// metrics moved to the bottom.
    ///
    /// # Panics
    /// Panics when the type of a series changes between samples.
    pub fn windsock_metrics(parsed_metrics: ParsedMetrics) -> Vec<Metric> {
        let mut result = vec![];
        for (name, value) in parsed_metrics {
            // The first sample decides the type of the series.
            // A series without samples has no type and nothing to report.
            let Some(first) = value.first() else {
                continue;
            };
            match first {
                Value::Gauge(_) => {
                    result.push(Metric::EachSecond {
                        name,
                        values: value
                            .iter()
                            .map(|x| {
                                let Value::Gauge(x) = x else {
                                    panic!("metric type changed during bench run")
                                };
                                (*x, x.to_string(), Goal::None)
                            })
                            .collect(),
                    });
                }
                Value::Counter(_) => {
                    let mut prev = 0.0;
                    result.push(Metric::EachSecond {
                        name,
                        values: value
                            .iter()
                            .map(|x| {
                                let Value::Counter(x) = x else {
                                    panic!("metric type changed during bench run")
                                };
                                let diff = x - prev;
                                prev = *x;
                                (diff, diff.to_string(), Goal::None)
                            })
                            .collect(),
                    });
                }
                Value::Summary(_) => {
                    let last = value.last().unwrap();
                    let Value::Summary(summary) = last else {
                        panic!("metric type changed during bench run")
                    };
                    let values = summary
                        .iter()
                        .map(|x| LatencyPercentile {
                            value: x.count,
                            // NOTE(review): the display assumes the summary is reported in
                            // seconds, confirm against the metrics shotover exposes.
                            value_display: format!("{:.4}ms", x.count * 1000.0),
                            quantile: x.quantile.to_string(),
                        })
                        .collect();
                    result.push(Metric::LatencyPercentiles { name, values });
                }
                _ => {
                    tracing::warn!("Unused shotover metric: {name}")
                }
            }
        }
        // The key is an allocated string, so build it once per metric instead of once per
        // comparison. The sort is stable, the same as sort_by_key.
        result.sort_by_cached_key(|x| {
            let name = x.name();
            // move latency metrics to the top
            if name.starts_with("chain_latency") || name.starts_with("transform_latency") {
                format!("aaa_{name}")
            }
            // move failure metrics to the bottom.
            else if name.starts_with("transform_failures")
                || name.starts_with("failed_requests")
                || name.starts_with("chain_failures")
            {
                format!("zzz_{name}")
            } else {
                name.to_owned()
            }
        });
        result
    }

    /// Requests `url` once per second until the sending half of `shutdown_rx` is dropped
    /// and returns every response body that was received, in the order of collection.
    ///
    /// A request that fails, takes longer than 3 seconds or whose body cannot be read is
    /// logged and skipped, the endpoint is expected to be unavailable until shotover is up.
    async fn collect_metrics(
        mut shutdown_rx: mpsc::Receiver<()>,
        url: &str,
    ) -> Vec<RawPrometheusExposition> {
        let mut results = vec![];

        let mut interval = tokio::time::interval(Duration::from_secs(1));
        interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
        loop {
            tokio::select! {
                _ = shutdown_rx.recv() => break,
                _ = interval.tick() => {
                    match tokio::time::timeout(Duration::from_secs(3), reqwest::get(url)).await {
                        Ok(Ok(response)) => {
                            // take the timestamp before reading the body
                            let timestamp = OffsetDateTime::now_utc();
                            match response.text().await {
                                Ok(content) => results.push(RawPrometheusExposition { timestamp, content }),
                                Err(err) => tracing::debug!("Failed to read response from metrics endpoint, error was {err:?}"),
                            }
                        }
                        Ok(Err(err)) => tracing::debug!("Failed to request from metrics endpoint, probably not up yet, error was {err:?}"),
                        // Unlike a refused connection, an unreachable host ends in a timeout.
                        Err(_) => tracing::debug!("Timed out requesting from metrics endpoint, probably not up yet"),
                    }
                }
            }
        }
        results
    }

    /// Stops the collection task and blocks until it has written the collected metrics
    /// into the report archive.
    ///
    /// # Panics
    /// Panics when the collection task panicked.
    pub fn insert_results_to_bench_archive(self) {
        // dropping the sender is the shutdown signal for the collection task
        std::mem::drop(self.shutdown_tx);
        // TODO: make this function + caller async, let's do this in a follow up PR to avoid making this PR even more complex.
        futures::executor::block_on(async { self.task.await.unwrap() })
    }
}
6 changes: 4 additions & 2 deletions shotover-proxy/benches/windsock/redis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -222,12 +222,14 @@ impl Bench for RedisBench {
profiler_instances.insert("redis".to_owned(), &instance.instance);
}
}
let mut profiler =
CloudProfilerRunner::new(self.name(), profiling, profiler_instances).await;

let redis_ip = redis_instances.private_ips()[0].to_string();
let shotover_ip = shotover_instance.instance.private_ip().to_string();

let mut profiler =
CloudProfilerRunner::new(self.name(), profiling, profiler_instances, &shotover_ip)
.await;

let (_, running_shotover) = futures::join!(
redis_instances.run(self.encryption),
self.run_aws_shotover(shotover_instance.clone(), redis_ip.clone())
Expand Down
Loading
Loading