windsock: measure consume latency #1753

Merged: 1 commit, Sep 20, 2024
2 changes: 1 addition & 1 deletion .github/workflows/windsock_benches.yaml
@@ -39,7 +39,7 @@ jobs:
# run some extra cases that arent handled by nextest
cargo windsock local-run --bench-length-seconds 5 --operations-per-second 100 --profilers flamegraph db=cassandra,compression=none,connection_count=1,driver=scylla,operation=read_i64,protocol=v4,shotover=standard,topology=single
cargo windsock local-run --bench-length-seconds 5 --operations-per-second 100 --profilers samply db=cassandra,compression=none,connection_count=1,driver=scylla,operation=read_i64,protocol=v4,shotover=standard,topology=single
-cargo windsock local-run --bench-length-seconds 5 --operations-per-second 100 --profilers sys_monitor db=kafka,shotover=standard,size=1B,topology=single
+cargo windsock local-run --bench-length-seconds 5 --operations-per-second 100 --profilers sys_monitor db=kafka,shotover=standard,size=12B,topology=single
cargo windsock local-run --bench-length-seconds 5 --operations-per-second 100 --profilers shotover_metrics db=redis,encryption=none,operation=get,shotover=standard,topology=single
- name: Ensure that tests did not create or modify any files that arent .gitignore'd
run: |
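As a usage note (not part of the diff): the sys_monitor case above can be reproduced locally by running the updated invocation directly from a shotover checkout:

cargo windsock local-run --bench-length-seconds 5 --operations-per-second 100 --profilers sys_monitor db=kafka,shotover=standard,size=12B,topology=single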
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion shotover-proxy/Cargo.toml
@@ -46,7 +46,7 @@ tokio-bin-process.workspace = true
rustls-pemfile = "2.0.0"
rustls-pki-types = "1.1.0"
aws-throwaway.workspace = true
windsock = "0.1.0"
windsock = "0.2.0"
regex = "1.7.0"
opensearch = { version = "2.1.0", default-features = false, features = ["rustls-tls"] }
serde_json = "1.0.103"
49 changes: 41 additions & 8 deletions shotover-proxy/benches/windsock/kafka/bench.rs
@@ -18,6 +18,7 @@ use shotover::transforms::kafka::sink_cluster::{KafkaSinkClusterConfig, Shotover
use shotover::transforms::kafka::sink_single::KafkaSinkSingleConfig;
use shotover::transforms::TransformConfig;
use std::sync::Arc;
+use std::time::SystemTime;
use std::{collections::HashMap, time::Duration};
use test_helpers::connection::kafka::cpp::rdkafka::admin::{
AdminClient, AdminOptions, NewTopic, TopicReplication,
@@ -27,6 +28,7 @@ use test_helpers::connection::kafka::cpp::rdkafka::config::ClientConfig;
use test_helpers::connection::kafka::cpp::rdkafka::consumer::{Consumer, StreamConsumer};
use test_helpers::connection::kafka::cpp::rdkafka::producer::{FutureProducer, FutureRecord};
use test_helpers::connection::kafka::cpp::rdkafka::util::Timeout;
+use test_helpers::connection::kafka::cpp::rdkafka::Message;
use test_helpers::docker_compose::docker_compose;
use tokio::{sync::mpsc::UnboundedSender, task::JoinHandle, time::Instant};
use windsock::{Bench, BenchParameters, Profiling, Report};
@@ -39,7 +41,9 @@ pub struct KafkaBench {

#[derive(Clone)]
pub enum Size {
-B1,
+// The smallest possible size is 12 bytes since any smaller would be
+// unable to hold the timestamp used for measuring consumer latency
+B12,
KB1,
KB100,
}
@@ -252,7 +256,7 @@ impl Bench for KafkaBench {
self.topology.to_tag(),
self.shotover.to_tag(),
match self.message_size {
-Size::B1 => ("size".to_owned(), "1B".to_owned()),
+Size::B12 => ("size".to_owned(), "12B".to_owned()),
Size::KB1 => ("size".to_owned(), "1KB".to_owned()),
Size::KB100 => ("size".to_owned(), "100KB".to_owned()),
},
@@ -440,12 +444,12 @@ impl Bench for KafkaBench {
.unwrap();

let message = match &self.message_size {
-Size::B1 => vec![0; 1],
+Size::B12 => vec![0; 12],
Size::KB1 => vec![0; 1024],
Size::KB100 => vec![0; 1024 * 100],
};

-let producer = BenchTaskProducerKafka { producer, message };
+let mut producer = BenchTaskProducerKafka { producer, message };

// ensure topic exists
producer.produce_one().await.unwrap();
@@ -498,7 +502,10 @@ struct BenchTaskProducerKafka {

#[async_trait]
impl BenchTaskProducer for BenchTaskProducerKafka {
-async fn produce_one(&self) -> Result<(), String> {
+async fn produce_one(&mut self) -> Result<(), String> {
+// overwrite timestamp portion of the message with current timestamp
+serialize_system_time(SystemTime::now(), &mut self.message);
+
// key is set to None which will result in round robin routing between all brokers
let record: FutureRecord<(), _> = FutureRecord::to("topic_foo").payload(&self.message);
self.producer
@@ -514,7 +521,12 @@ async fn consume(consumer: &StreamConsumer, reporter: UnboundedSender<Report>) {
let mut stream = consumer.stream();
loop {
let report = match stream.next().await.unwrap() {
-Ok(_) => Report::ConsumeCompleted,
+Ok(record) => {
+let produce_instant: SystemTime =
+deserialize_system_time(record.payload().unwrap());
+// If time has gone backwards the results of this benchmark are compromised, so just unwrap elapsed()
+Report::ConsumeCompletedIn(Some(produce_instant.elapsed().unwrap()))
+}
Err(err) => Report::ConsumeErrored {
message: format!("{err:?}"),
},
@@ -526,6 +538,27 @@ async fn consume(consumer: &StreamConsumer, reporter: UnboundedSender<Report>) {
}
}

+/// Writes the system time into the first 12 bytes.
+///
+/// SystemTime is used instead of Instant because Instant does not expose
+/// a way to retrieve the inner value and it is therefore impossible to serialize/deserialize.
+fn serialize_system_time(time: SystemTime, dest: &mut [u8]) {
+let duration_since_epoch = time.duration_since(SystemTime::UNIX_EPOCH).unwrap();
+let secs = duration_since_epoch.as_secs();
+let nanos = duration_since_epoch.subsec_nanos();
+dest[0..8].copy_from_slice(&secs.to_be_bytes());
+dest[8..12].copy_from_slice(&nanos.to_be_bytes());
+}
+
+/// Reads the system time from the first 12 bytes
+fn deserialize_system_time(source: &[u8]) -> SystemTime {
+let secs = u64::from_be_bytes(source[0..8].try_into().unwrap());
+let nanos = u32::from_be_bytes(source[8..12].try_into().unwrap());
+SystemTime::UNIX_EPOCH
+.checked_add(Duration::new(secs, nanos))
+.unwrap()
+}

fn spawn_consumer_tasks(
consumer: StreamConsumer,
reporter: UnboundedSender<Report>,
@@ -547,7 +580,7 @@ fn spawn_consumer_tasks(

#[async_trait]
pub trait BenchTaskProducer: Clone + Send + Sync + 'static {
-async fn produce_one(&self) -> Result<(), String>;
+async fn produce_one(&mut self) -> Result<(), String>;

async fn spawn_tasks(
self,
@@ -561,7 +594,7 @@ pub trait BenchTaskProducer: Clone + Send + Sync + 'static {
let allocated_time_per_op = operations_per_second
.map(|ops| (Duration::from_secs(1) * task_count as u32) / ops as u32);
for _ in 0..task_count {
-let task = self.clone();
+let mut task = self.clone();
let reporter = reporter.clone();
tasks.push(tokio::spawn(async move {
let mut interval = allocated_time_per_op.map(tokio::time::interval);
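For readers skimming the diff, the core mechanism is small enough to sketch standalone. The two helpers below are copied verbatim from the diff above; the main harness is illustrative only and not part of this PR:

use std::time::{Duration, SystemTime};

/// Writes the system time into the first 12 bytes
/// (8-byte seconds + 4-byte subsecond nanos, big-endian).
fn serialize_system_time(time: SystemTime, dest: &mut [u8]) {
    let duration_since_epoch = time.duration_since(SystemTime::UNIX_EPOCH).unwrap();
    let secs = duration_since_epoch.as_secs();
    let nanos = duration_since_epoch.subsec_nanos();
    dest[0..8].copy_from_slice(&secs.to_be_bytes());
    dest[8..12].copy_from_slice(&nanos.to_be_bytes());
}

/// Reads the system time back from the first 12 bytes.
fn deserialize_system_time(source: &[u8]) -> SystemTime {
    let secs = u64::from_be_bytes(source[0..8].try_into().unwrap());
    let nanos = u32::from_be_bytes(source[8..12].try_into().unwrap());
    SystemTime::UNIX_EPOCH
        .checked_add(Duration::new(secs, nanos))
        .unwrap()
}

fn main() {
    // 12 bytes is the minimum message size: 8 for the u64 seconds plus 4 for
    // the u32 subsecond nanos, which is why Size::B1 became Size::B12.
    let mut message = vec![0u8; 12];
    serialize_system_time(SystemTime::now(), &mut message);

    // The consumer recovers the embedded produce time and reports the
    // difference to "now" as the consume latency.
    let produced_at = deserialize_system_time(&message);
    println!("consume latency: {:?}", produced_at.elapsed().unwrap());
}

In the real bench the consumer wraps this duration in Report::ConsumeCompletedIn and sends it to the windsock reporter channel.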
2 changes: 1 addition & 1 deletion shotover-proxy/benches/windsock/kafka/mod.rs
@@ -16,7 +16,7 @@ pub fn benches() -> Vec<ShotoverBench> {
KafkaTopology::Cluster1,
KafkaTopology::Cluster3
],
-[Size::B1, Size::KB1, Size::KB100]
+[Size::B12, Size::KB1, Size::KB100]
)
.map(|(shotover, topology, size)| {
Box::new(KafkaBench::new(shotover, topology, size)) as ShotoverBench