Skip to content

Commit

Permalink
windsock: fix ops instability
Browse files Browse the repository at this point in the history
  • Loading branch information
rukai committed Dec 20, 2023
1 parent 9ab82f7 commit 69cf551
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 38 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions windsock/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,12 @@ bincode.workspace = true
clap.workspace = true
console = "0.15.5"
copy_dir = "0.1.2"
futures.workspace = true
serde = { workspace = true, features = ["derive"] }
strum = { version = "0.25.0", features = ["derive"] }
time = { version = "0.3.25", features = ["serde"] }
tokio.workspace = true
tokio-stream = "0.1.14"

[dev-dependencies]
scylla = { version = "0.10.0", features = ["ssl"] }
Expand Down
107 changes: 69 additions & 38 deletions windsock/src/bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,16 @@ use crate::report::{report_builder, Report, ReportArchive};
use crate::tables::ReportColumn;
use anyhow::Result;
use async_trait::async_trait;
use futures::StreamExt;
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, HashSet};
use std::fmt::Write;
use std::path::PathBuf;
use std::time::{Duration, Instant};
use tokio::sync::mpsc::UnboundedSender;
use tokio::task::JoinHandle;
use tokio_stream::wrappers::IntervalStream;
use tokio_stream::Stream;

pub struct BenchState {
bench: Box<dyn Bench>,
Expand Down Expand Up @@ -284,45 +287,73 @@ pub trait BenchTask: Clone + Send + Sync + 'static {
reporter: UnboundedSender<Report>,
operations_per_second: Option<u64>,
) -> Vec<JoinHandle<()>> {
let mut tasks = vec![];
// 100 is a generally nice amount of tasks to have, but if we have more tasks than OPS the throughput is very unstable
let task_count = operations_per_second.map(|x| x.min(100)).unwrap_or(100);

let allocated_time_per_op = operations_per_second
.map(|ops| (Duration::from_secs(1) * task_count as u32) / ops as u32);
for i in 0..task_count {
let task = self.clone();
let reporter = reporter.clone();
tasks.push(tokio::spawn(async move {
// spread load out over a second
tokio::time::sleep(Duration::from_nanos((1_000_000_000 / task_count) * i)).await;

let mut interval = allocated_time_per_op.map(tokio::time::interval);

loop {
if let Some(interval) = &mut interval {
interval.tick().await;
}

let operation_start = Instant::now();
let report = match task.run_one_operation().await {
Ok(()) => Report::QueryCompletedIn(operation_start.elapsed()),
Err(message) => Report::QueryErrored {
completed_in: operation_start.elapsed(),
message,
},
};
if reporter.send(report).is_err() {
// The benchmark has completed and the reporter no longer wants to receive reports so just shutdown
return;
}
}
}));
match operations_per_second {
Some(ops) => {
spawn_tasks_inner(
self.clone(),
interval_stream(ops),
reporter,
operations_per_second,
)
.await
}
None => {
spawn_tasks_inner(
self.clone(),
futures::stream::repeat_with(|| ()),
reporter,
operations_per_second,
)
.await
}
}
}
}

// sleep until all tasks have started running
tokio::time::sleep(Duration::from_secs(1)).await;
async fn spawn_tasks_inner<T: BenchTask, I>(
task: T,
stream: impl Stream<Item = I> + std::marker::Unpin + Send + 'static,
reporter: UnboundedSender<Report>,
operations_per_second: Option<u64>,
) -> Vec<JoinHandle<()>> {
// TODO: remove vec
vec![tokio::spawn(async move {
// 100 is a generally nice amount of tasks to have, but if we have more tasks than OPS the throughput is very unstable
let task_count = operations_per_second.map(|x| x.min(500)).unwrap_or(500);

let mut result_stream = stream
// unconstrained to workaround quadratic complexity of buffer_unordered()
.map(|_| {
tokio::task::unconstrained(async {
let start = Instant::now();
let result = task.run_one_operation().await;
(result, start.elapsed())
})
})
.buffer_unordered(task_count as usize);

// TODO: will this adjust to multiple tokio threads? what happens if I make each run_one_operation a task?

while let Some((res, elapsed)) = result_stream.next().await {
let report = match res {
Ok(()) => Report::QueryCompletedIn(elapsed),
Err(message) => Report::QueryErrored {
completed_in: elapsed,
message,
},
};
if reporter.send(report).is_err() {
// The benchmark has completed and the reporter no longer wants to receive reports so just shutdown
break;
}
}
})]
}

tasks
}
/// Create a stream that emits the configured events per second
fn interval_stream(events_per_second: u64) -> IntervalStream {
let mut interval =
tokio::time::interval(Duration::from_nanos(1_000_000_000 / events_per_second));
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
IntervalStream::new(interval)
}

0 comments on commit 69cf551

Please sign in to comment.