diff --git a/shotover-proxy/benches/windsock/kafka/bench.rs b/shotover-proxy/benches/windsock/kafka/bench.rs
index 537eb5267..dbf396a65 100644
--- a/shotover-proxy/benches/windsock/kafka/bench.rs
+++ b/shotover-proxy/benches/windsock/kafka/bench.rs
@@ -18,13 +18,15 @@ use shotover::transforms::kafka::sink_single::KafkaSinkSingleConfig;
 use shotover::transforms::TransformConfig;
 use std::sync::Arc;
 use std::{collections::HashMap, time::Duration};
+use test_helpers::connection::kafka::rdkafka::admin::{
+    AdminClient, AdminOptions, NewTopic, TopicReplication,
+};
+use test_helpers::connection::kafka::rdkafka::client::DefaultClientContext;
+use test_helpers::connection::kafka::rdkafka::config::ClientConfig;
+use test_helpers::connection::kafka::rdkafka::consumer::{Consumer, StreamConsumer};
+use test_helpers::connection::kafka::rdkafka::producer::{FutureProducer, FutureRecord};
+use test_helpers::connection::kafka::rdkafka::util::Timeout;
 use test_helpers::docker_compose::docker_compose;
-use test_helpers::rdkafka::admin::{AdminClient, AdminOptions, NewTopic, TopicReplication};
-use test_helpers::rdkafka::client::DefaultClientContext;
-use test_helpers::rdkafka::config::ClientConfig;
-use test_helpers::rdkafka::consumer::{Consumer, StreamConsumer};
-use test_helpers::rdkafka::producer::{FutureProducer, FutureRecord};
-use test_helpers::rdkafka::util::Timeout;
 use tokio::{sync::mpsc::UnboundedSender, task::JoinHandle, time::Instant};
 use windsock::{Bench, BenchParameters, Profiling, Report};
 
diff --git a/shotover-proxy/tests/kafka_int_tests/mod.rs b/shotover-proxy/tests/kafka_int_tests/mod.rs
index 75e19c997..e6ff541f2 100644
--- a/shotover-proxy/tests/kafka_int_tests/mod.rs
+++ b/shotover-proxy/tests/kafka_int_tests/mod.rs
@@ -8,6 +8,9 @@ use test_helpers::docker_compose::docker_compose;
 #[cfg(feature = "rdkafka-driver-tests")]
 mod test_cases;
 
+#[cfg(feature = "rdkafka-driver-tests")]
+use test_helpers::connection::kafka::KafkaConnectionBuilder;
+
 #[cfg(feature = "rdkafka-driver-tests")]
 #[tokio::test]
 async fn passthrough_standard() {
@@ -17,7 +20,8 @@ async fn passthrough_standard() {
         .start()
         .await;
 
-    test_cases::basic("127.0.0.1:9192").await;
+    let connection_builder = KafkaConnectionBuilder::new("127.0.0.1:9192");
+    test_cases::basic(connection_builder).await;
 
     tokio::time::timeout(
         Duration::from_secs(10),
@@ -38,7 +42,8 @@ async fn passthrough_tls() {
         .start()
         .await;
 
-    test_cases::basic("127.0.0.1:9192").await;
+    let connection_builder = KafkaConnectionBuilder::new("127.0.0.1:9192");
+    test_cases::basic(connection_builder).await;
 
     tokio::time::timeout(
         Duration::from_secs(10),
@@ -57,7 +62,8 @@ async fn passthrough_encode() {
         .start()
         .await;
 
-    test_cases::basic("127.0.0.1:9192").await;
+    let connection_builder = KafkaConnectionBuilder::new("127.0.0.1:9192");
+    test_cases::basic(connection_builder).await;
 
     shotover.shutdown_and_then_consume_events(&[]).await;
 }
@@ -71,7 +77,9 @@ async fn passthrough_sasl() {
         .start()
         .await;
 
-    test_cases::basic_sasl("127.0.0.1:9192").await;
+    let connection_builder =
+        KafkaConnectionBuilder::new("127.0.0.1:9192").use_sasl("user", "password");
+    test_cases::basic(connection_builder).await;
 
     shotover.shutdown_and_then_consume_events(&[]).await;
 }
@@ -86,7 +94,9 @@ async fn passthrough_sasl_encode() {
         .start()
         .await;
 
-    test_cases::basic_sasl("127.0.0.1:9192").await;
+    let connection_builder =
+        KafkaConnectionBuilder::new("127.0.0.1:9192").use_sasl("user", "password");
+    test_cases::basic(connection_builder).await;
 
     shotover.shutdown_and_then_consume_events(&[]).await;
 }
@@ -99,7 +109,8 @@ async fn cluster_single_shotover() {
         .start()
         .await;
 
-    test_cases::basic("127.0.0.1:9192").await;
+    let connection_builder = KafkaConnectionBuilder::new("127.0.0.1:9192");
+    test_cases::basic(connection_builder).await;
 
     tokio::time::timeout(
         Duration::from_secs(10),
@@ -128,7 +139,8 @@ async fn cluster_multi_shotover() {
         );
     }
 
-    test_cases::basic("127.0.0.1:9192").await;
+    let connection_builder = KafkaConnectionBuilder::new("127.0.0.1:9192");
+    test_cases::basic(connection_builder).await;
 
     for shotover in shotovers {
         tokio::time::timeout(
diff --git a/shotover-proxy/tests/kafka_int_tests/test_cases.rs b/shotover-proxy/tests/kafka_int_tests/test_cases.rs
index a7d6be1cf..82b90eca3 100644
--- a/shotover-proxy/tests/kafka_int_tests/test_cases.rs
+++ b/shotover-proxy/tests/kafka_int_tests/test_cases.rs
@@ -1,18 +1,14 @@
 use std::time::Duration;
-use test_helpers::rdkafka::admin::{
-    AdminClient, AdminOptions, AlterConfig, NewPartitions, NewTopic, OwnedResourceSpecifier,
-    ResourceSpecifier, TopicReplication,
+use test_helpers::connection::kafka::rdkafka::admin::{
+    AdminOptions, AlterConfig, NewPartitions, NewTopic, OwnedResourceSpecifier, ResourceSpecifier,
+    TopicReplication,
 };
-use test_helpers::rdkafka::client::DefaultClientContext;
-use test_helpers::rdkafka::config::ClientConfig;
-use test_helpers::rdkafka::consumer::{Consumer, StreamConsumer};
-use test_helpers::rdkafka::producer::{FutureProducer, FutureRecord};
-use test_helpers::rdkafka::types::RDKafkaErrorCode;
-use test_helpers::rdkafka::util::Timeout;
-use test_helpers::rdkafka::Message;
+use test_helpers::connection::kafka::rdkafka::types::RDKafkaErrorCode;
+use test_helpers::connection::kafka::rdkafka::util::Timeout;
+use test_helpers::connection::kafka::{ExpectedResponse, KafkaConnectionBuilder, Record};
 
-async fn admin(config: ClientConfig) {
-    let admin: AdminClient<DefaultClientContext> = config.create().unwrap();
+async fn admin(connection_builder: &KafkaConnectionBuilder) {
+    let admin = connection_builder.connect_admin().await;
     admin
         .create_topics(
             &[
@@ -109,8 +105,8 @@ async fn admin(config: ClientConfig) {
     }
 }
 
-async fn admin_cleanup(config: ClientConfig) {
-    let admin: AdminClient<DefaultClientContext> = config.create().unwrap();
+async fn admin_cleanup(connection_builder: &KafkaConnectionBuilder) {
+    let admin = connection_builder.connect_admin().await;
     let results = admin
         // The cpp driver will lock up when running certain commands after a delete_groups if the delete_groups is targeted at a group that doesnt exist.
         // So just make sure to run it against a group that does exist.
@@ -132,198 +128,87 @@ async fn admin_cleanup(config: ClientConfig) {
     }
 }
 
-async fn assert_produce(
-    producer: &FutureProducer,
-    record: Record<'_>,
-    expected_offset: Option<i64>,
-) {
-    let send = match record.key {
-        Some(key) => producer
-            .send_result(
-                FutureRecord::to(record.topic_name)
-                    .payload(record.payload)
-                    .key(key),
-            )
-            .unwrap(),
-        None => producer
-            .send_result(FutureRecord::<(), _>::to(record.topic_name).payload(record.payload))
-            .unwrap(),
-    };
-    let delivery_status = tokio::time::timeout(Duration::from_secs(30), send)
-        .await
-        .expect("Timeout while receiving from producer")
-        .unwrap()
-        .unwrap();
+async fn produce_consume(connection_builder: &KafkaConnectionBuilder, topic_name: &str, i: i64) {
+    let producer = connection_builder.connect_producer(1).await;
 
-    if let Some(offset) = expected_offset {
-        assert_eq!(delivery_status.1, offset, "Unexpected offset");
-    }
-}
-
-struct Record<'a> {
-    payload: &'a str,
-    topic_name: &'a str,
-    key: Option<&'a str>,
-}
-
-async fn assert_consume(consumer: &StreamConsumer, response: ExpectedResponse<'_>) {
-    let message = tokio::time::timeout(Duration::from_secs(30), consumer.recv())
-        .await
-        .expect("Timeout while receiving from consumer")
-        .unwrap();
-    let contents = message.payload_view::<str>().unwrap().unwrap();
-    assert_eq!(response.message, contents);
-    assert_eq!(
-        response.key,
-        message.key().map(|x| std::str::from_utf8(x).unwrap())
-    );
-    assert_eq!(response.topic_name, message.topic());
-    assert_eq!(response.offset, message.offset());
-}
-
-struct ExpectedResponse<'a> {
-    message: &'a str,
-    key: Option<&'a str>,
-    topic_name: &'a str,
-    offset: i64,
-}
-
-async fn produce_consume(client: ClientConfig, topic_name: &str, i: i64) {
-    let producer: FutureProducer = client
-        .clone()
-        .set("message.timeout.ms", "5000")
-        .create()
-        .unwrap();
-
-    assert_produce(
-        &producer,
-        Record {
-            payload: "Message1",
-            topic_name,
-            key: Some("Key"),
-        },
-        Some(i * 2),
-    )
-    .await;
-    assert_produce(
-        &producer,
-        Record {
-            payload: "Message2",
-            topic_name,
-            key: None,
-        },
-        Some(i * 2 + 1),
-    )
-    .await;
+    producer
+        .assert_produce(
+            Record {
+                payload: "Message1",
+                topic_name,
+                key: Some("Key"),
+            },
+            Some(i * 2),
+        )
+        .await;
+    producer
+        .assert_produce(
+            Record {
+                payload: "Message2",
+                topic_name,
+                key: None,
+            },
+            Some(i * 2 + 1),
+        )
+        .await;
 
-    let consumer: StreamConsumer = client
-        .clone()
-        .set("group.id", "some_group")
-        .set("session.timeout.ms", "6000")
-        .set("auto.offset.reset", "earliest")
-        .set("enable.auto.commit", "false")
-        .create()
-        .unwrap();
-    consumer.subscribe(&[topic_name]).unwrap();
+    let consumer = connection_builder.connect_consumer(topic_name).await;
 
-    assert_consume(
-        &consumer,
-        ExpectedResponse {
+    consumer
+        .assert_consume(ExpectedResponse {
             message: "Message1",
             key: Some("Key"),
             topic_name,
             offset: 0,
-        },
-    )
-    .await;
-    assert_consume(
-        &consumer,
-        ExpectedResponse {
+        })
+        .await;
+    consumer
+        .assert_consume(ExpectedResponse {
             message: "Message2",
             key: None,
             topic_name,
             offset: 1,
-        },
-    )
-    .await;
+        })
+        .await;
 }
 
-async fn produce_consume_acks0(client: ClientConfig) {
+async fn produce_consume_acks0(connection_builder: &KafkaConnectionBuilder) {
     let topic_name = "acks0";
-    let producer: FutureProducer = client
-        .clone()
-        .set("message.timeout.ms", "5000")
-        .set("acks", "0")
-        .create()
-        .unwrap();
+    let producer = connection_builder.connect_producer(0).await;
 
     for _ in 0..10 {
-        assert_produce(
-            &producer,
-            Record {
-                payload: "MessageAcks0",
-                topic_name,
-                key: Some("KeyAcks0"),
-            },
-            None,
-        )
-        .await;
+        producer
+            .assert_produce(
+                Record {
+                    payload: "MessageAcks0",
+                    topic_name,
+                    key: Some("KeyAcks0"),
+                },
+                None,
+            )
+            .await;
     }
 
-    let consumer: StreamConsumer = client
-        .clone()
-        .set("group.id", "some_group")
-        .set("session.timeout.ms", "6000")
-        .set("auto.offset.reset", "earliest")
-        .set("enable.auto.commit", "false")
-        .create()
-        .unwrap();
-    consumer.subscribe(&[topic_name]).unwrap();
+    let consumer = connection_builder.connect_consumer(topic_name).await;
 
     for j in 0..10 {
-        assert_consume(
-            &consumer,
-            ExpectedResponse {
+        consumer
+            .assert_consume(ExpectedResponse {
                 message: "MessageAcks0",
                 key: Some("KeyAcks0"),
                 topic_name,
                 offset: j,
-            },
-        )
-        .await;
-    }
-}
-
-pub async fn basic(address: &str) {
-    let mut client = ClientConfig::new();
-    client
-        .set("bootstrap.servers", address)
-        // internal driver debug logs are emitted to tokio tracing, assuming the appropriate filter is used by the tracing subscriber
-        .set("debug", "all");
-    admin(client.clone()).await;
-    for i in 0..2 {
-        produce_consume(client.clone(), "partitions1", i).await;
-        produce_consume(client.clone(), "partitions3", i).await;
-        produce_consume_acks0(client.clone()).await;
+            })
+            .await;
     }
-    admin_cleanup(client.clone()).await;
 }
 
-pub async fn basic_sasl(address: &str) {
-    let mut client = ClientConfig::new();
-    client
-        .set("bootstrap.servers", address)
-        .set("sasl.mechanisms", "PLAIN")
-        .set("sasl.username", "user")
-        .set("sasl.password", "password")
-        .set("security.protocol", "SASL_PLAINTEXT")
-        // internal driver debug logs are emitted to tokio tracing, assuming the appropriate filter is used by the tracing subscriber
-        .set("debug", "all");
-    admin(client.clone()).await;
+pub async fn basic(connection_builder: KafkaConnectionBuilder) {
+    admin(&connection_builder).await;
     for i in 0..2 {
-        produce_consume(client.clone(), "partitions1", i).await;
-        produce_consume(client.clone(), "partitions3", i).await;
-        produce_consume_acks0(client.clone()).await;
+        produce_consume(&connection_builder, "partitions1", i).await;
+        produce_consume(&connection_builder, "partitions3", i).await;
+        produce_consume_acks0(&connection_builder).await;
     }
-    admin_cleanup(client.clone()).await;
+    admin_cleanup(&connection_builder).await;
 }
diff --git a/test-helpers/src/connection/kafka.rs b/test-helpers/src/connection/kafka.rs
new file mode 100644
index 000000000..53e75e3c9
--- /dev/null
+++ b/test-helpers/src/connection/kafka.rs
@@ -0,0 +1,130 @@
+// Allow direct usage of the APIs when the feature is enabled
+pub use rdkafka;
+
+use rdkafka::admin::AdminClient;
+use rdkafka::client::DefaultClientContext;
+use rdkafka::config::ClientConfig;
+use rdkafka::consumer::{Consumer, StreamConsumer};
+use rdkafka::producer::{FutureProducer, FutureRecord};
+
+use rdkafka::Message;
+use std::time::Duration;
+
+pub struct KafkaConnectionBuilder {
+    client: ClientConfig,
+}
+
+impl KafkaConnectionBuilder {
+    pub fn new(address: &str) -> Self {
+        let mut client = ClientConfig::new();
+        client
+            .set("bootstrap.servers", address)
+            // internal driver debug logs are emitted to tokio tracing, assuming the appropriate filter is used by the tracing subscriber
+            .set("debug", "all");
+        KafkaConnectionBuilder { client }
+    }
+
+    pub fn use_sasl(mut self, user: &str, pass: &str) -> Self {
+        self.client.set("sasl.mechanisms", "PLAIN");
+        self.client.set("sasl.username", user);
+        self.client.set("sasl.password", pass);
+        self.client.set("security.protocol", "SASL_PLAINTEXT");
+        self
+    }
+
+    pub async fn connect_producer(&self, acks: i32) -> KafkaProducer {
+        KafkaProducer {
+            producer: self
+                .client
+                .clone()
+                .set("message.timeout.ms", "5000")
+                .set("acks", &acks.to_string())
+                .create()
+                .unwrap(),
+        }
+    }
+
+    pub async fn connect_consumer(&self, topic_name: &str) -> KafkaConsumer {
+        let consumer: StreamConsumer = self
+            .client
+            .clone()
+            .set("group.id", "some_group")
+            .set("session.timeout.ms", "6000")
+            .set("auto.offset.reset", "earliest")
+            .set("enable.auto.commit", "false")
+            .create()
+            .unwrap();
+        consumer.subscribe(&[topic_name]).unwrap();
+        KafkaConsumer { consumer }
+    }
+
+    pub async fn connect_admin(&self) -> AdminClient<DefaultClientContext> {
+        self.client.create().unwrap()
+    }
+}
+
+pub struct KafkaProducer {
+    producer: FutureProducer,
+}
+
+impl KafkaProducer {
+    pub async fn assert_produce(&self, record: Record<'_>, expected_offset: Option<i64>) {
+        let send = match record.key {
+            Some(key) => self
+                .producer
+                .send_result(
+                    FutureRecord::to(record.topic_name)
+                        .payload(record.payload)
+                        .key(key),
+                )
+                .unwrap(),
+            None => self
+                .producer
+                .send_result(FutureRecord::<(), _>::to(record.topic_name).payload(record.payload))
+                .unwrap(),
+        };
+        let delivery_status = tokio::time::timeout(Duration::from_secs(30), send)
+            .await
+            .expect("Timeout while receiving from producer")
+            .unwrap()
+            .unwrap();
+
+        if let Some(offset) = expected_offset {
+            assert_eq!(delivery_status.1, offset, "Unexpected offset");
+        }
+    }
+}
+
+pub struct Record<'a> {
+    pub payload: &'a str,
+    pub topic_name: &'a str,
+    pub key: Option<&'a str>,
+}
+
+pub struct KafkaConsumer {
+    consumer: StreamConsumer,
+}
+
+impl KafkaConsumer {
+    pub async fn assert_consume(&self, response: ExpectedResponse<'_>) {
+        let message = tokio::time::timeout(Duration::from_secs(30), self.consumer.recv())
+            .await
+            .expect("Timeout while receiving from consumer")
+            .unwrap();
+        let contents = message.payload_view::<str>().unwrap().unwrap();
+        assert_eq!(response.message, contents);
+        assert_eq!(
+            response.key,
+            message.key().map(|x| std::str::from_utf8(x).unwrap())
+        );
+        assert_eq!(response.topic_name, message.topic());
+        assert_eq!(response.offset, message.offset());
+    }
+}
+
+pub struct ExpectedResponse<'a> {
+    pub message: &'a str,
+    pub key: Option<&'a str>,
+    pub topic_name: &'a str,
+    pub offset: i64,
+}
diff --git a/test-helpers/src/connection/mod.rs b/test-helpers/src/connection/mod.rs
index 22f6a7436..3db4295fd 100644
--- a/test-helpers/src/connection/mod.rs
+++ b/test-helpers/src/connection/mod.rs
@@ -1,3 +1,6 @@
 pub mod cassandra;
+
+#[cfg(feature = "rdkafka-driver-tests")]
+pub mod kafka;
 // redis_connection is named differently to the cassandra module because it contains raw functions instead of a struct with methods
 pub mod redis_connection;
diff --git a/test-helpers/src/lib.rs b/test-helpers/src/lib.rs
index 4fe74b3e6..b704c8f48 100644
--- a/test-helpers/src/lib.rs
+++ b/test-helpers/src/lib.rs
@@ -9,9 +9,6 @@ mod test_tracing;
 use anyhow::{anyhow, Result};
 use subprocess::{Exec, Redirection};
 
-#[cfg(feature = "rdkafka-driver-tests")]
-pub use rdkafka;
-
 /// Runs a command and returns the output as a string.
 ///
 /// Both stderr and stdout are returned in the result.
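For reviewers, here is a minimal sketch of how a test drives the `KafkaConnectionBuilder` API this patch introduces. The wrapper function `example`, the address, the topic name, and the offsets are illustrative only, taken from the integration tests above; they are not part of the helper itself.

```rust
use test_helpers::connection::kafka::{ExpectedResponse, KafkaConnectionBuilder, Record};

// Hypothetical test body; real tests receive the builder from kafka_int_tests/mod.rs.
async fn example() {
    // Plain connection; the SASL tests instead chain `.use_sasl("user", "password")`.
    let connection_builder = KafkaConnectionBuilder::new("127.0.0.1:9192");

    // `connect_producer(1)` sets acks=1; the acks=0 tests call `connect_producer(0)`
    // and pass `None` below to skip the offset assertion.
    let producer = connection_builder.connect_producer(1).await;
    producer
        .assert_produce(
            Record {
                payload: "Message1",
                topic_name: "partitions1",
                key: Some("Key"),
            },
            Some(0), // expected offset of the produced record
        )
        .await;

    // The consumer is pre-configured with group.id "some_group" and
    // auto.offset.reset "earliest", and is already subscribed to the topic.
    let consumer = connection_builder.connect_consumer("partitions1").await;
    consumer
        .assert_consume(ExpectedResponse {
            message: "Message1",
            key: Some("Key"),
            topic_name: "partitions1",
            offset: 0,
        })
        .await;
}
```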