Skip to content

Commit

Permalink
?
Browse files Browse the repository at this point in the history
  • Loading branch information
rukai committed Feb 8, 2024
1 parent 7478e03 commit f8fd4a1
Showing 1 changed file with 25 additions and 8 deletions.
33 changes: 25 additions & 8 deletions shotover-proxy/benches/windsock/kafka/bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ use shotover::transforms::TransformConfig;
use std::sync::Arc;
use std::{collections::HashMap, time::Duration};
use test_helpers::docker_compose::docker_compose;
use test_helpers::rdkafka::admin::{AdminClient, AdminOptions, NewTopic, TopicReplication};
use test_helpers::rdkafka::client::DefaultClientContext;
use test_helpers::rdkafka::config::ClientConfig;
use test_helpers::rdkafka::consumer::{Consumer, StreamConsumer};
use test_helpers::rdkafka::producer::{FutureProducer, FutureRecord};
Expand Down Expand Up @@ -392,9 +394,28 @@ impl Bench for KafkaBench {
// only one string field so we just directly store the value in resources
let broker_address = resources;

let admin: AdminClient<DefaultClientContext> = ClientConfig::new()
.set("bootstrap.servers", broker_address)
.create()
.unwrap();
admin
.create_topics(
&[NewTopic {
name: "topic_foo",
num_partitions: 3,
replication: TopicReplication::Fixed(1),
config: vec![],
}],
&AdminOptions::new()
.operation_timeout(Some(Timeout::After(Duration::from_secs(60)))),
)
.await
.unwrap();

let producer: FutureProducer = ClientConfig::new()
.set("bootstrap.servers", broker_address)
.set("message.timeout.ms", "5000")
.set("debug", "all")
.set("message.timeout.ms", "30000")
.create()
.unwrap();

Expand Down Expand Up @@ -458,14 +479,10 @@ struct BenchTaskProducerKafka {
#[async_trait]
impl BenchTaskProducer for BenchTaskProducerKafka {
async fn produce_one(&self) -> Result<(), String> {
let key = rand::random::<[u8; 4]>();
// key is set to None which will result in round robin routing between all brokers
let record: FutureRecord<(), _> = FutureRecord::to("topic_foo").payload(&self.message);
self.producer
.send(
FutureRecord::to("topic_foo")
.payload(&self.message)
.key(&key),
Timeout::Never,
)
.send(record, Timeout::Never)
.await
// Take just the error, ignoring the message contents because large messages result in unreadable noise in the logs.
.map_err(|e| format!("{:?}", e.0))
Expand Down

0 comments on commit f8fd4a1

Please sign in to comment.