From 99ad549a0d48ca9f3bcb75a335376dc7b197946d Mon Sep 17 00:00:00 2001
From: Lucas Kent <rubickent@gmail.com>
Date: Wed, 18 Oct 2023 18:59:02 +1100
Subject: [PATCH] windsock: fix ops instability

---
 .cargo/config.toml                           |   1 +
 Cargo.lock                                   |   2 +
 shotover-proxy/benches/windsock/cassandra.rs |   2 +-
 windsock/Cargo.toml                          |   2 +
 windsock/src/bench.rs                        | 106 ++++++++++++-------
 5 files changed, 74 insertions(+), 39 deletions(-)

diff --git a/.cargo/config.toml b/.cargo/config.toml
index 3e1c66273..fdfdc6365 100644
--- a/.cargo/config.toml
+++ b/.cargo/config.toml
@@ -9,5 +9,6 @@ linker = "aarch64-linux-gnu-gcc"
 
 [alias]
 windsock = "test --release --bench windsock --features alpha-transforms,rdkafka-driver-tests --"
+windsock-profile = "test --profile profiling --bench windsock --features alpha-transforms,rdkafka-driver-tests --"
 windsock-debug = "test --bench windsock --features alpha-transforms,rdkafka-driver-tests --"
 windsock-cloud-docker = "run --package windsock-cloud-docker --"
diff --git a/Cargo.lock b/Cargo.lock
index 2b2d8dca0..6cb21a3b8 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -5658,11 +5658,13 @@ dependencies = [
  "console",
  "copy_dir",
  "docker-compose-runner",
+ "futures",
  "scylla",
  "serde",
  "strum 0.25.0",
  "time",
  "tokio",
+ "tokio-stream",
 ]
 
 [[package]]
diff --git a/shotover-proxy/benches/windsock/cassandra.rs b/shotover-proxy/benches/windsock/cassandra.rs
index eceb452ac..7a721520e 100644
--- a/shotover-proxy/benches/windsock/cassandra.rs
+++ b/shotover-proxy/benches/windsock/cassandra.rs
@@ -55,7 +55,7 @@ use tokio_bin_process::bin_path;
 use uuid::Uuid;
 use windsock::{Bench, BenchParameters, BenchTask, Profiling, Report};
 
-const ROW_COUNT: usize = 1000;
+const ROW_COUNT: usize = 100000;
 
 #[derive(Clone)]
 pub enum Compression {
diff --git a/windsock/Cargo.toml b/windsock/Cargo.toml
index df22a391d..97e93efff 100644
--- a/windsock/Cargo.toml
+++ b/windsock/Cargo.toml
@@ -13,10 +13,12 @@ bincode.workspace = true
 clap.workspace = true
 console = "0.15.5"
 copy_dir = "0.1.2"
+futures.workspace = true
 serde = { workspace = true, features = ["derive"] }
 strum = { version = "0.25.0", features = ["derive"] }
 time = { version = "0.3.25", features = ["serde"] }
 tokio.workspace = true
+tokio-stream = "0.1.14"
 
 [dev-dependencies]
 scylla = { version = "0.10.0", features = ["ssl"] }
diff --git a/windsock/src/bench.rs b/windsock/src/bench.rs
index 733d7eb05..9b149c14b 100644
--- a/windsock/src/bench.rs
+++ b/windsock/src/bench.rs
@@ -3,6 +3,7 @@ use crate::report::{report_builder, Report, ReportArchive};
 use crate::tables::ReportColumn;
 use anyhow::Result;
 use async_trait::async_trait;
+use futures::StreamExt;
 use serde::{Deserialize, Serialize};
 use std::collections::{HashMap, HashSet};
 use std::fmt::Write;
@@ -10,6 +11,8 @@ use std::path::PathBuf;
 use std::time::{Duration, Instant};
 use tokio::sync::mpsc::UnboundedSender;
 use tokio::task::JoinHandle;
+use tokio_stream::wrappers::IntervalStream;
+use tokio_stream::Stream;
 
 pub struct BenchState {
     bench: Box<dyn Bench>,
@@ -284,45 +287,72 @@ pub trait BenchTask: Clone + Send + Sync + 'static {
         reporter: UnboundedSender<Report>,
         operations_per_second: Option<u64>,
     ) -> Vec<JoinHandle<()>> {
-        let mut tasks = vec![];
-        // 100 is a generally nice amount of tasks to have, but if we have more tasks than OPS the throughput is very unstable
-        let task_count = operations_per_second.map(|x| x.min(100)).unwrap_or(100);
-
-        let allocated_time_per_op = operations_per_second
-            .map(|ops| (Duration::from_secs(1) * task_count as u32) / ops as u32);
-        for i in 0..task_count {
-            let task = self.clone();
-            let reporter = reporter.clone();
-            tasks.push(tokio::spawn(async move {
-                // spread load out over a second
-                tokio::time::sleep(Duration::from_nanos((1_000_000_000 / task_count) * i)).await;
-
-                let mut interval = allocated_time_per_op.map(tokio::time::interval);
-
-                loop {
-                    if let Some(interval) = &mut interval {
-                        interval.tick().await;
-                    }
-
-                    let operation_start = Instant::now();
-                    let report = match task.run_one_operation().await {
-                        Ok(()) => Report::QueryCompletedIn(operation_start.elapsed()),
-                        Err(message) => Report::QueryErrored {
-                            completed_in: operation_start.elapsed(),
-                            message,
-                        },
-                    };
-                    if reporter.send(report).is_err() {
-                        // The benchmark has completed and the reporter no longer wants to receive reports so just shutdown
-                        return;
-                    }
-                }
-            }));
+        match operations_per_second {
+            Some(ops) => {
+                spawn_tasks_inner(
+                    self.clone(),
+                    interval_stream(ops),
+                    reporter,
+                    operations_per_second,
+                )
+                .await
+            }
+            None => {
+                spawn_tasks_inner(
+                    self.clone(),
+                    futures::stream::repeat_with(|| ()),
+                    reporter,
+                    operations_per_second,
+                )
+                .await
+            }
         }
+    }
+}
 
-        // sleep until all tasks have started running
-        tokio::time::sleep(Duration::from_secs(1)).await;
+async fn spawn_tasks_inner<T: BenchTask, I>(
+    task: T,
+    stream: impl Stream<Item = I> + std::marker::Unpin + Send + 'static,
+    reporter: UnboundedSender<Report>,
+    operations_per_second: Option<u64>,
+) -> Vec<JoinHandle<()>> {
+    vec![tokio::spawn(async move {
+        // 500 concurrent in-flight operations is a generally nice amount, but if we have more in flight than OPS the throughput is very unstable
+        let task_count = operations_per_second.map(|x| x.min(500)).unwrap_or(500);
+
+        let mut result_stream = stream
+            // unconstrained to workaround quadratic complexity of buffer_unordered()
+            .map(|_| {
+                tokio::task::unconstrained(async {
+                    let start = Instant::now();
+                    let result = task.run_one_operation().await;
+                    (result, start.elapsed()) // TODO: should elapsed happen here or after the `.next().await`?
+                })
+            })
+            .buffer_unordered(task_count as usize);
+
+        // TODO: will this adjust to multiple tokio threads? what happens if I make each run_one_operation a task?
+
+        while let Some((res, elapsed)) = result_stream.next().await {
+            let report = match res {
+                Ok(()) => Report::QueryCompletedIn(elapsed),
+                Err(message) => Report::QueryErrored {
+                    completed_in: elapsed,
+                    message,
+                },
+            };
+            if reporter.send(report).is_err() {
+                // The benchmark has completed and the reporter no longer wants to receive reports so just shutdown
+                break;
+            }
+        }
+    })]
+}
 
-        tasks
-    }
+/// Create a stream that emits the configured events per second
+fn interval_stream(events_per_second: u64) -> IntervalStream {
+    let mut interval =
+        tokio::time::interval(Duration::from_nanos(1_000_000_000 / events_per_second));
+    interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
+    IntervalStream::new(interval)
 }