From e5eaa49c87218a63e91eac369d6e559ef89c260b Mon Sep 17 00:00:00 2001
From: Lucas Kent
Date: Thu, 19 Sep 2024 12:26:53 +1000
Subject: [PATCH] windsock: measure consume latency

---
 .github/workflows/windsock_benches.yaml      |  2 +-
 Cargo.lock                                   |  4 +-
 shotover-proxy/Cargo.toml                    |  2 +-
 .../benches/windsock/kafka/bench.rs          | 43 +++++++++++++++----
 shotover-proxy/benches/windsock/kafka/mod.rs |  2 +-
 5 files changed, 40 insertions(+), 13 deletions(-)

diff --git a/.github/workflows/windsock_benches.yaml b/.github/workflows/windsock_benches.yaml
index b02e8232c..0714d4149 100644
--- a/.github/workflows/windsock_benches.yaml
+++ b/.github/workflows/windsock_benches.yaml
@@ -39,7 +39,7 @@ jobs:
         # run some extra cases that arent handled by nextest
         cargo windsock local-run --bench-length-seconds 5 --operations-per-second 100 --profilers flamegraph db=cassandra,compression=none,connection_count=1,driver=scylla,operation=read_i64,protocol=v4,shotover=standard,topology=single
         cargo windsock local-run --bench-length-seconds 5 --operations-per-second 100 --profilers samply db=cassandra,compression=none,connection_count=1,driver=scylla,operation=read_i64,protocol=v4,shotover=standard,topology=single
-        cargo windsock local-run --bench-length-seconds 5 --operations-per-second 100 --profilers sys_monitor db=kafka,shotover=standard,size=1B,topology=single
+        cargo windsock local-run --bench-length-seconds 5 --operations-per-second 100 --profilers sys_monitor db=kafka,shotover=standard,size=12B,topology=single
         cargo windsock local-run --bench-length-seconds 5 --operations-per-second 100 --profilers shotover_metrics db=redis,encryption=none,operation=get,shotover=standard,topology=single
     - name: Ensure that tests did not create or modify any files that arent .gitignore'd
       run: |
diff --git a/Cargo.lock b/Cargo.lock
index 7b006bf0e..b4a2941c6 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -5934,9 +5934,9 @@ checksum = "589f6da84c646204747d1270a2a5661ea66ed1cced2631d546fdfb155959f9ec"

 [[package]]
 name = "windsock"
-version = "0.1.1"
+version = "0.2.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "a4fe4435a550d5c9d0361a420bd02d3fd77c7cb15d22743e7857597a5fdb8388"
+checksum = "8f186f03f8f547e6eb32a314c55e52754e3315eb299c9d27e4d1758eb1e254f8"
 dependencies = [
  "anyhow",
  "async-trait",
diff --git a/shotover-proxy/Cargo.toml b/shotover-proxy/Cargo.toml
index e927d5160..ec0591b25 100644
--- a/shotover-proxy/Cargo.toml
+++ b/shotover-proxy/Cargo.toml
@@ -46,7 +46,7 @@ tokio-bin-process.workspace = true
 rustls-pemfile = "2.0.0"
 rustls-pki-types = "1.1.0"
 aws-throwaway.workspace = true
-windsock = "0.1.0"
+windsock = "0.2.0"
 regex = "1.7.0"
 opensearch = { version = "2.1.0", default-features = false, features = ["rustls-tls"] }
 serde_json = "1.0.103"
diff --git a/shotover-proxy/benches/windsock/kafka/bench.rs b/shotover-proxy/benches/windsock/kafka/bench.rs
index 036121d4f..ebb52333f 100644
--- a/shotover-proxy/benches/windsock/kafka/bench.rs
+++ b/shotover-proxy/benches/windsock/kafka/bench.rs
@@ -18,6 +18,7 @@ use shotover::transforms::kafka::sink_cluster::{KafkaSinkClusterConfig, Shotover
 use shotover::transforms::kafka::sink_single::KafkaSinkSingleConfig;
 use shotover::transforms::TransformConfig;
 use std::sync::Arc;
+use std::time::SystemTime;
 use std::{collections::HashMap, time::Duration};
 use test_helpers::connection::kafka::cpp::rdkafka::admin::{
     AdminClient, AdminOptions, NewTopic, TopicReplication,
@@ -27,6 +28,7 @@ use test_helpers::connection::kafka::cpp::rdkafka::config::ClientConfig;
 use test_helpers::connection::kafka::cpp::rdkafka::consumer::{Consumer, StreamConsumer};
 use test_helpers::connection::kafka::cpp::rdkafka::producer::{FutureProducer, FutureRecord};
 use test_helpers::connection::kafka::cpp::rdkafka::util::Timeout;
+use test_helpers::connection::kafka::cpp::rdkafka::Message;
 use test_helpers::docker_compose::docker_compose;
 use tokio::{sync::mpsc::UnboundedSender, task::JoinHandle, time::Instant};
 use windsock::{Bench, BenchParameters, Profiling, Report};
@@ -39,7 +41,9 @@ pub struct KafkaBench {

 #[derive(Clone)]
 pub enum Size {
-    B1,
+    // The smallest possible size is 12 bytes since any smaller would be
+    // unable to hold the timestamp used for measuring consumer latency
+    B12,
     KB1,
     KB100,
 }
@@ -252,7 +256,7 @@ impl Bench for KafkaBench {
             self.topology.to_tag(),
             self.shotover.to_tag(),
             match self.message_size {
-                Size::B1 => ("size".to_owned(), "1B".to_owned()),
+                Size::B12 => ("size".to_owned(), "12B".to_owned()),
                 Size::KB1 => ("size".to_owned(), "1KB".to_owned()),
                 Size::KB100 => ("size".to_owned(), "100KB".to_owned()),
             },
@@ -440,12 +444,12 @@ impl Bench for KafkaBench {
             .unwrap();

         let message = match &self.message_size {
-            Size::B1 => vec![0; 1],
+            Size::B12 => vec![0; 12],
             Size::KB1 => vec![0; 1024],
             Size::KB100 => vec![0; 1024 * 100],
         };

-        let producer = BenchTaskProducerKafka { producer, message };
+        let mut producer = BenchTaskProducerKafka { producer, message };

         // ensure topic exists
         producer.produce_one().await.unwrap();
@@ -498,8 +502,9 @@ struct BenchTaskProducerKafka {

 #[async_trait]
 impl BenchTaskProducer for BenchTaskProducerKafka {
-    async fn produce_one(&self) -> Result<(), String> {
+    async fn produce_one(&mut self) -> Result<(), String> {
         // key is set to None which will result in round robin routing between all brokers
+        serialize_system_time(SystemTime::now(), &mut self.message);
         let record: FutureRecord<(), _> = FutureRecord::to("topic_foo").payload(&self.message);
         self.producer
             .send(record, Timeout::Never)
@@ -514,7 +519,12 @@ async fn consume(consumer: &StreamConsumer, reporter: UnboundedSender<Report>) {
     let mut stream = consumer.stream();
     loop {
         let report = match stream.next().await.unwrap() {
-            Ok(_) => Report::ConsumeCompleted,
+            Ok(record) => {
+                let produce_instant: SystemTime =
+                    deserialize_system_time(record.payload().unwrap());
+                // If time has gone backwards the results of this benchmark are compromised, so just unwrap elapsed()
+                Report::ConsumeCompletedIn(Some(produce_instant.elapsed().unwrap()))
+            }
             Err(err) => Report::ConsumeErrored {
                 message: format!("{err:?}"),
             },
@@ -526,6 +536,23 @@ async fn consume(consumer: &StreamConsumer, reporter: UnboundedSender<Report>) {
     }
 }

+/// Writes the system time into the first 12 bytes
+fn serialize_system_time(time: SystemTime, dest: &mut [u8]) {
+    let duration_since_epoch = time.duration_since(SystemTime::UNIX_EPOCH).unwrap();
+    let secs = duration_since_epoch.as_secs();
+    let nanos = duration_since_epoch.subsec_nanos();
+    dest[0..8].copy_from_slice(&secs.to_be_bytes());
+    dest[8..12].copy_from_slice(&nanos.to_be_bytes());
+}
+
+fn deserialize_system_time(source: &[u8]) -> SystemTime {
+    let secs = u64::from_be_bytes(source[0..8].try_into().unwrap());
+    let nanos = u32::from_be_bytes(source[8..12].try_into().unwrap());
+    SystemTime::UNIX_EPOCH
+        .checked_add(Duration::new(secs, nanos))
+        .unwrap()
+}
+
 fn spawn_consumer_tasks(
     consumer: StreamConsumer,
     reporter: UnboundedSender<Report>,
@@ -547,7 +574,7 @@ fn spawn_consumer_tasks(

 #[async_trait]
 pub trait BenchTaskProducer: Clone + Send + Sync + 'static {
-    async fn produce_one(&self) -> Result<(), String>;
+    async fn produce_one(&mut self) -> Result<(), String>;

     async fn spawn_tasks(
         self,
@@ -561,7 +588,7 @@ pub trait BenchTaskProducer: Clone + Send + Sync + 'static {
         let allocated_time_per_op = operations_per_second
             .map(|ops| (Duration::from_secs(1) * task_count as u32) / ops as u32);
         for _ in 0..task_count {
-            let task = self.clone();
+            let mut task = self.clone();
             let reporter = reporter.clone();
             tasks.push(tokio::spawn(async move {
                 let mut interval = allocated_time_per_op.map(tokio::time::interval);
diff --git a/shotover-proxy/benches/windsock/kafka/mod.rs b/shotover-proxy/benches/windsock/kafka/mod.rs
index d5c2100f6..2a38593ab 100644
--- a/shotover-proxy/benches/windsock/kafka/mod.rs
+++ b/shotover-proxy/benches/windsock/kafka/mod.rs
@@ -16,7 +16,7 @@ pub fn benches() -> Vec<ShotoverBench> {
             KafkaTopology::Cluster1,
             KafkaTopology::Cluster3
         ],
-        [Size::B1, Size::KB1, Size::KB100]
+        [Size::B12, Size::KB1, Size::KB100]
     )
     .map(|(shotover, topology, size)| {
         Box::new(KafkaBench::new(shotover, topology, size)) as ShotoverBench
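
Reviewer note on the encoding (not part of the patch itself): serialize_system_time
packs the produce-side wall clock into the first 12 bytes of the payload as seconds
since the Unix epoch (big-endian u64, bytes 0..8) plus the subsecond nanoseconds
(big-endian u32, bytes 8..12), and deserialize_system_time reverses this on the
consume side so that elapsed() yields the produce-to-consume latency. Below is a
minimal standalone sketch of the round trip; the two functions are copied from the
patch, while the main() harness is hypothetical and added purely for illustration:

    use std::time::{Duration, SystemTime};

    /// Writes the system time into the first 12 bytes of dest:
    /// bytes 0..8 = seconds since the Unix epoch (big-endian u64),
    /// bytes 8..12 = subsecond nanoseconds (big-endian u32).
    fn serialize_system_time(time: SystemTime, dest: &mut [u8]) {
        let duration_since_epoch = time.duration_since(SystemTime::UNIX_EPOCH).unwrap();
        dest[0..8].copy_from_slice(&duration_since_epoch.as_secs().to_be_bytes());
        dest[8..12].copy_from_slice(&duration_since_epoch.subsec_nanos().to_be_bytes());
    }

    /// Reverses serialize_system_time.
    fn deserialize_system_time(source: &[u8]) -> SystemTime {
        let secs = u64::from_be_bytes(source[0..8].try_into().unwrap());
        let nanos = u32::from_be_bytes(source[8..12].try_into().unwrap());
        SystemTime::UNIX_EPOCH
            .checked_add(Duration::new(secs, nanos))
            .unwrap()
    }

    // Hypothetical harness, not in the patch: round-trip a 12-byte payload
    // (the new Size::B12 case) and compute the latency the consumer reports.
    fn main() {
        let mut message = vec![0u8; 12];
        let produced_at = SystemTime::now();
        serialize_system_time(produced_at, &mut message);

        // The round trip is lossless because SystemTime is stored as whole
        // seconds plus nanoseconds, exactly what the 12 bytes carry.
        let decoded = deserialize_system_time(&message);
        assert_eq!(decoded, produced_at);

        // elapsed() panics if the clock went backwards, matching the patch's
        // reasoning that such results would be compromised anyway.
        println!("consume latency: {:?}", decoded.elapsed().unwrap());
    }

Producer and consumer must agree on byte order for this to work across hosts; the
patch uses big-endian throughout.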