Skip to content

Commit

Permalink
windsock: measure consume latency (#1753)
Browse files Browse the repository at this point in the history
  • Loading branch information
rukai authored Sep 20, 2024
1 parent 4853f84 commit 4693aab
Show file tree
Hide file tree
Showing 5 changed files with 46 additions and 13 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/windsock_benches.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ jobs:
# run some extra cases that aren't handled by nextest
cargo windsock local-run --bench-length-seconds 5 --operations-per-second 100 --profilers flamegraph db=cassandra,compression=none,connection_count=1,driver=scylla,operation=read_i64,protocol=v4,shotover=standard,topology=single
cargo windsock local-run --bench-length-seconds 5 --operations-per-second 100 --profilers samply db=cassandra,compression=none,connection_count=1,driver=scylla,operation=read_i64,protocol=v4,shotover=standard,topology=single
cargo windsock local-run --bench-length-seconds 5 --operations-per-second 100 --profilers sys_monitor db=kafka,shotover=standard,size=1B,topology=single
cargo windsock local-run --bench-length-seconds 5 --operations-per-second 100 --profilers sys_monitor db=kafka,shotover=standard,size=12B,topology=single
cargo windsock local-run --bench-length-seconds 5 --operations-per-second 100 --profilers shotover_metrics db=redis,encryption=none,operation=get,shotover=standard,topology=single
- name: Ensure that tests did not create or modify any files that arent .gitignore'd
run: |
Expand Down
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion shotover-proxy/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ tokio-bin-process.workspace = true
rustls-pemfile = "2.0.0"
rustls-pki-types = "1.1.0"
aws-throwaway.workspace = true
windsock = "0.1.0"
windsock = "0.2.0"
regex = "1.7.0"
opensearch = { version = "2.1.0", default-features = false, features = ["rustls-tls"] }
serde_json = "1.0.103"
Expand Down
49 changes: 41 additions & 8 deletions shotover-proxy/benches/windsock/kafka/bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use shotover::transforms::kafka::sink_cluster::{KafkaSinkClusterConfig, Shotover
use shotover::transforms::kafka::sink_single::KafkaSinkSingleConfig;
use shotover::transforms::TransformConfig;
use std::sync::Arc;
use std::time::SystemTime;
use std::{collections::HashMap, time::Duration};
use test_helpers::connection::kafka::cpp::rdkafka::admin::{
AdminClient, AdminOptions, NewTopic, TopicReplication,
Expand All @@ -27,6 +28,7 @@ use test_helpers::connection::kafka::cpp::rdkafka::config::ClientConfig;
use test_helpers::connection::kafka::cpp::rdkafka::consumer::{Consumer, StreamConsumer};
use test_helpers::connection::kafka::cpp::rdkafka::producer::{FutureProducer, FutureRecord};
use test_helpers::connection::kafka::cpp::rdkafka::util::Timeout;
use test_helpers::connection::kafka::cpp::rdkafka::Message;
use test_helpers::docker_compose::docker_compose;
use tokio::{sync::mpsc::UnboundedSender, task::JoinHandle, time::Instant};
use windsock::{Bench, BenchParameters, Profiling, Report};
Expand All @@ -39,7 +41,9 @@ pub struct KafkaBench {

#[derive(Clone)]
pub enum Size {
B1,
// The smallest possible size is 12 bytes since any smaller would be
// unable to hold the timestamp used for measuring consumer latency
B12,
KB1,
KB100,
}
Expand Down Expand Up @@ -252,7 +256,7 @@ impl Bench for KafkaBench {
self.topology.to_tag(),
self.shotover.to_tag(),
match self.message_size {
Size::B1 => ("size".to_owned(), "1B".to_owned()),
Size::B12 => ("size".to_owned(), "12B".to_owned()),
Size::KB1 => ("size".to_owned(), "1KB".to_owned()),
Size::KB100 => ("size".to_owned(), "100KB".to_owned()),
},
Expand Down Expand Up @@ -440,12 +444,12 @@ impl Bench for KafkaBench {
.unwrap();

let message = match &self.message_size {
Size::B1 => vec![0; 1],
Size::B12 => vec![0; 12],
Size::KB1 => vec![0; 1024],
Size::KB100 => vec![0; 1024 * 100],
};

let producer = BenchTaskProducerKafka { producer, message };
let mut producer = BenchTaskProducerKafka { producer, message };

// ensure topic exists
producer.produce_one().await.unwrap();
Expand Down Expand Up @@ -498,7 +502,10 @@ struct BenchTaskProducerKafka {

#[async_trait]
impl BenchTaskProducer for BenchTaskProducerKafka {
async fn produce_one(&self) -> Result<(), String> {
async fn produce_one(&mut self) -> Result<(), String> {
// overwrite timestamp portion of the message with current timestamp
serialize_system_time(SystemTime::now(), &mut self.message);

// key is set to None which will result in round robin routing between all brokers
let record: FutureRecord<(), _> = FutureRecord::to("topic_foo").payload(&self.message);
self.producer
Expand All @@ -514,7 +521,12 @@ async fn consume(consumer: &StreamConsumer, reporter: UnboundedSender<Report>) {
let mut stream = consumer.stream();
loop {
let report = match stream.next().await.unwrap() {
Ok(_) => Report::ConsumeCompleted,
Ok(record) => {
let produce_instant: SystemTime =
deserialize_system_time(record.payload().unwrap());
// If time has gone backwards the results of this benchmark are compromised, so just unwrap elapsed()
Report::ConsumeCompletedIn(Some(produce_instant.elapsed().unwrap()))
}
Err(err) => Report::ConsumeErrored {
message: format!("{err:?}"),
},
Expand All @@ -526,6 +538,27 @@ async fn consume(consumer: &StreamConsumer, reporter: UnboundedSender<Report>) {
}
}

/// Writes `time` into the first 12 bytes of `dest`: the seconds since the
/// unix epoch as a big-endian u64, followed by the subsecond nanoseconds
/// as a big-endian u32.
///
/// SystemTime is used instead of Instant because Instant does not expose
/// a way to retrieve the inner value and it is therefore impossible to
/// serialize/deserialize.
///
/// Panics if `dest` is shorter than 12 bytes or `time` predates the unix epoch.
fn serialize_system_time(time: SystemTime, dest: &mut [u8]) {
    let since_epoch = time.duration_since(SystemTime::UNIX_EPOCH).unwrap();
    dest[..8].copy_from_slice(&since_epoch.as_secs().to_be_bytes());
    dest[8..12].copy_from_slice(&since_epoch.subsec_nanos().to_be_bytes());
}

/// Reconstructs a SystemTime from the first 12 bytes of `source`; the
/// inverse of `serialize_system_time` (big-endian u64 seconds, then
/// big-endian u32 nanoseconds).
///
/// Panics if `source` is shorter than 12 bytes or the decoded duration
/// overflows SystemTime.
fn deserialize_system_time(source: &[u8]) -> SystemTime {
    let (sec_bytes, nano_bytes) = source[..12].split_at(8);
    let secs = u64::from_be_bytes(sec_bytes.try_into().unwrap());
    let nanos = u32::from_be_bytes(nano_bytes.try_into().unwrap());
    SystemTime::UNIX_EPOCH
        .checked_add(Duration::new(secs, nanos))
        .unwrap()
}

fn spawn_consumer_tasks(
consumer: StreamConsumer,
reporter: UnboundedSender<Report>,
Expand All @@ -547,7 +580,7 @@ fn spawn_consumer_tasks(

#[async_trait]
pub trait BenchTaskProducer: Clone + Send + Sync + 'static {
async fn produce_one(&self) -> Result<(), String>;
async fn produce_one(&mut self) -> Result<(), String>;

async fn spawn_tasks(
self,
Expand All @@ -561,7 +594,7 @@ pub trait BenchTaskProducer: Clone + Send + Sync + 'static {
let allocated_time_per_op = operations_per_second
.map(|ops| (Duration::from_secs(1) * task_count as u32) / ops as u32);
for _ in 0..task_count {
let task = self.clone();
let mut task = self.clone();
let reporter = reporter.clone();
tasks.push(tokio::spawn(async move {
let mut interval = allocated_time_per_op.map(tokio::time::interval);
Expand Down
2 changes: 1 addition & 1 deletion shotover-proxy/benches/windsock/kafka/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ pub fn benches() -> Vec<ShotoverBench> {
KafkaTopology::Cluster1,
KafkaTopology::Cluster3
],
[Size::B1, Size::KB1, Size::KB100]
[Size::B12, Size::KB1, Size::KB100]
)
.map(|(shotover, topology, size)| {
Box::new(KafkaBench::new(shotover, topology, size)) as ShotoverBench
Expand Down

0 comments on commit 4693aab

Please sign in to comment.