Refactor kafka_int_tests to use connection creator helper #1495

Merged 2 commits on Feb 23, 2024
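The refactor replaces the raw rdkafka ClientConfig plumbing that each test previously assembled by hand with a shared KafkaConnectionBuilder from test_helpers::connection::kafka. The helper's implementation is not part of this diff, so the sketch below only reconstructs the shape the tests imply: the settings are copied from the test code this PR deletes, while the struct layout and internals are assumptions.

// Sketch only, not the real implementation: reconstructed from the call sites
// in this PR. Inside test_helpers the rdkafka crate is used directly.
use rdkafka::config::ClientConfig;

pub struct KafkaConnectionBuilder {
    client: ClientConfig,
}

impl KafkaConnectionBuilder {
    pub fn new(address: &str) -> Self {
        let mut client = ClientConfig::new();
        client
            .set("bootstrap.servers", address)
            // Internal driver debug logs are emitted to tokio tracing, assuming
            // the appropriate filter is used by the tracing subscriber.
            .set("debug", "all");
        KafkaConnectionBuilder { client }
    }

    // Folds in the SASL settings previously duplicated in test_cases::basic_sasl,
    // so a SASL test differs from a plaintext one by this single call.
    pub fn use_sasl(mut self, user: &str, pass: &str) -> Self {
        self.client
            .set("sasl.mechanisms", "PLAIN")
            .set("sasl.username", user)
            .set("sasl.password", pass)
            .set("security.protocol", "SASL_PLAINTEXT");
        self
    }
}

With the configuration centralized, a test picks up debug logging, timeouts, and SASL by construction instead of repeating .set(...) chains.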
14 changes: 8 additions & 6 deletions shotover-proxy/benches/windsock/kafka/bench.rs
@@ -18,13 +18,15 @@ use shotover::transforms::kafka::sink_single::KafkaSinkSingleConfig;
 use shotover::transforms::TransformConfig;
 use std::sync::Arc;
 use std::{collections::HashMap, time::Duration};
+use test_helpers::connection::kafka::rdkafka::admin::{
+    AdminClient, AdminOptions, NewTopic, TopicReplication,
+};
+use test_helpers::connection::kafka::rdkafka::client::DefaultClientContext;
+use test_helpers::connection::kafka::rdkafka::config::ClientConfig;
+use test_helpers::connection::kafka::rdkafka::consumer::{Consumer, StreamConsumer};
+use test_helpers::connection::kafka::rdkafka::producer::{FutureProducer, FutureRecord};
+use test_helpers::connection::kafka::rdkafka::util::Timeout;
 use test_helpers::docker_compose::docker_compose;
-use test_helpers::rdkafka::admin::{AdminClient, AdminOptions, NewTopic, TopicReplication};
-use test_helpers::rdkafka::client::DefaultClientContext;
-use test_helpers::rdkafka::config::ClientConfig;
-use test_helpers::rdkafka::consumer::{Consumer, StreamConsumer};
-use test_helpers::rdkafka::producer::{FutureProducer, FutureRecord};
-use test_helpers::rdkafka::util::Timeout;
 use tokio::{sync::mpsc::UnboundedSender, task::JoinHandle, time::Instant};
 use windsock::{Bench, BenchParameters, Profiling, Report};
 
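The bench only needs its import paths updated: the rdkafka re-export has moved from the root of test_helpers into the new connection::kafka module, presumably a plain re-export along these lines (module path assumed):

// Somewhere in test_helpers::connection::kafka: re-export the driver crate so
// tests and benches resolve the same rdkafka version through test_helpers.
pub use rdkafka;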
26 changes: 19 additions & 7 deletions shotover-proxy/tests/kafka_int_tests/mod.rs
@@ -8,6 +8,9 @@ use test_helpers::docker_compose::docker_compose;
 #[cfg(feature = "rdkafka-driver-tests")]
 mod test_cases;
 
+#[cfg(feature = "rdkafka-driver-tests")]
+use test_helpers::connection::kafka::KafkaConnectionBuilder;
+
 #[cfg(feature = "rdkafka-driver-tests")]
 #[tokio::test]
 async fn passthrough_standard() {
@@ -17,7 +20,8 @@ async fn passthrough_standard() {
         .start()
         .await;
 
-    test_cases::basic("127.0.0.1:9192").await;
+    let connection_builder = KafkaConnectionBuilder::new("127.0.0.1:9192");
+    test_cases::basic(connection_builder).await;
 
     tokio::time::timeout(
         Duration::from_secs(10),
@@ -38,7 +42,8 @@ async fn passthrough_tls() {
         .start()
         .await;
 
-    test_cases::basic("127.0.0.1:9192").await;
+    let connection_builder = KafkaConnectionBuilder::new("127.0.0.1:9192");
+    test_cases::basic(connection_builder).await;
 
     tokio::time::timeout(
         Duration::from_secs(10),
@@ -57,7 +62,8 @@ async fn passthrough_encode() {
         .start()
         .await;
 
-    test_cases::basic("127.0.0.1:9192").await;
+    let connection_builder = KafkaConnectionBuilder::new("127.0.0.1:9192");
+    test_cases::basic(connection_builder).await;
 
     shotover.shutdown_and_then_consume_events(&[]).await;
 }
@@ -71,7 +77,9 @@ async fn passthrough_sasl() {
         .start()
         .await;
 
-    test_cases::basic_sasl("127.0.0.1:9192").await;
+    let connection_builder =
+        KafkaConnectionBuilder::new("127.0.0.1:9192").use_sasl("user", "password");
+    test_cases::basic(connection_builder).await;
 
     shotover.shutdown_and_then_consume_events(&[]).await;
 }
@@ -86,7 +94,9 @@ async fn passthrough_sasl_encode() {
         .start()
         .await;
 
-    test_cases::basic_sasl("127.0.0.1:9192").await;
+    let connection_builder =
+        KafkaConnectionBuilder::new("127.0.0.1:9192").use_sasl("user", "password");
+    test_cases::basic(connection_builder).await;
 
    shotover.shutdown_and_then_consume_events(&[]).await;
 }
@@ -99,7 +109,8 @@ async fn cluster_single_shotover() {
         .start()
         .await;
 
-    test_cases::basic("127.0.0.1:9192").await;
+    let connection_builder = KafkaConnectionBuilder::new("127.0.0.1:9192");
+    test_cases::basic(connection_builder).await;
 
     tokio::time::timeout(
         Duration::from_secs(10),
@@ -128,7 +139,8 @@ async fn cluster_multi_shotover() {
         );
     }
 
-    test_cases::basic("127.0.0.1:9192").await;
+    let connection_builder = KafkaConnectionBuilder::new("127.0.0.1:9192");
+    test_cases::basic(connection_builder).await;
 
     for shotover in shotovers {
         tokio::time::timeout(
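Every test now builds a KafkaConnectionBuilder and hands it to test_cases::basic; use_sasl is what lets the two SASL tests reuse basic instead of the separate basic_sasl that the next file deletes. Continuing the sketch above, the connection methods presumably centralize the producer and consumer settings removed from test_cases.rs. Only the call sites are confirmed by the diff; in the real helper these methods appear to return wrapper types carrying the assert_produce/assert_consume methods used below, which this sketch simplifies to the raw rdkafka types.

// Sketch continued from the builder above; settings copied from the deleted
// test code, method bodies and return types are assumptions.
use rdkafka::admin::AdminClient;
use rdkafka::client::DefaultClientContext;
use rdkafka::consumer::{Consumer, StreamConsumer};
use rdkafka::producer::FutureProducer;

impl KafkaConnectionBuilder {
    pub async fn connect_admin(&self) -> AdminClient<DefaultClientContext> {
        self.client.clone().create().unwrap()
    }

    pub async fn connect_producer(&self, acks: i32) -> FutureProducer {
        self.client
            .clone()
            .set("message.timeout.ms", "5000")
            .set("acks", acks.to_string())
            .create()
            .unwrap()
    }

    pub async fn connect_consumer(&self, topic_name: &str) -> StreamConsumer {
        let consumer: StreamConsumer = self
            .client
            .clone()
            .set("group.id", "some_group")
            .set("session.timeout.ms", "6000")
            .set("auto.offset.reset", "earliest")
            .set("enable.auto.commit", "false")
            .create()
            .unwrap();
        consumer.subscribe(&[topic_name]).unwrap();
        consumer
    }
}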
243 changes: 64 additions & 179 deletions shotover-proxy/tests/kafka_int_tests/test_cases.rs
@@ -1,18 +1,14 @@
 use std::time::Duration;
-use test_helpers::rdkafka::admin::{
-    AdminClient, AdminOptions, AlterConfig, NewPartitions, NewTopic, OwnedResourceSpecifier,
-    ResourceSpecifier, TopicReplication,
+use test_helpers::connection::kafka::rdkafka::admin::{
+    AdminOptions, AlterConfig, NewPartitions, NewTopic, OwnedResourceSpecifier, ResourceSpecifier,
+    TopicReplication,
 };
-use test_helpers::rdkafka::client::DefaultClientContext;
-use test_helpers::rdkafka::config::ClientConfig;
-use test_helpers::rdkafka::consumer::{Consumer, StreamConsumer};
-use test_helpers::rdkafka::producer::{FutureProducer, FutureRecord};
-use test_helpers::rdkafka::types::RDKafkaErrorCode;
-use test_helpers::rdkafka::util::Timeout;
-use test_helpers::rdkafka::Message;
+use test_helpers::connection::kafka::rdkafka::types::RDKafkaErrorCode;
+use test_helpers::connection::kafka::rdkafka::util::Timeout;
+use test_helpers::connection::kafka::{ExpectedResponse, KafkaConnectionBuilder, Record};
 
-async fn admin(config: ClientConfig) {
-    let admin: AdminClient<DefaultClientContext> = config.create().unwrap();
+async fn admin(connection_builder: &KafkaConnectionBuilder) {
+    let admin = connection_builder.connect_admin().await;
     admin
         .create_topics(
             &[
@@ -109,8 +105,8 @@ async fn admin(config: ClientConfig) {
     }
 }
 
-async fn admin_cleanup(config: ClientConfig) {
-    let admin: AdminClient<DefaultClientContext> = config.create().unwrap();
+async fn admin_cleanup(connection_builder: &KafkaConnectionBuilder) {
+    let admin = connection_builder.connect_admin().await;
     let results = admin
         // The cpp driver will lock up when running certain commands after a delete_groups if the delete_groups is targeted at a group that doesnt exist.
         // So just make sure to run it against a group that does exist.
@@ -132,198 +128,87 @@ async fn admin_cleanup(config: ClientConfig) {
     }
 }
 
-async fn assert_produce(
-    producer: &FutureProducer,
-    record: Record<'_>,
-    expected_offset: Option<i64>,
-) {
-    let send = match record.key {
-        Some(key) => producer
-            .send_result(
-                FutureRecord::to(record.topic_name)
-                    .payload(record.payload)
-                    .key(key),
-            )
-            .unwrap(),
-        None => producer
-            .send_result(FutureRecord::<(), _>::to(record.topic_name).payload(record.payload))
-            .unwrap(),
-    };
-    let delivery_status = tokio::time::timeout(Duration::from_secs(30), send)
-        .await
-        .expect("Timeout while receiving from producer")
-        .unwrap()
-        .unwrap();
+async fn produce_consume(connection_builder: &KafkaConnectionBuilder, topic_name: &str, i: i64) {
+    let producer = connection_builder.connect_producer(1).await;
 
-    if let Some(offset) = expected_offset {
-        assert_eq!(delivery_status.1, offset, "Unexpected offset");
-    }
-}
-
-struct Record<'a> {
-    payload: &'a str,
-    topic_name: &'a str,
-    key: Option<&'a str>,
-}
-
-async fn assert_consume(consumer: &StreamConsumer, response: ExpectedResponse<'_>) {
-    let message = tokio::time::timeout(Duration::from_secs(30), consumer.recv())
-        .await
-        .expect("Timeout while receiving from consumer")
-        .unwrap();
-    let contents = message.payload_view::<str>().unwrap().unwrap();
-    assert_eq!(response.message, contents);
-    assert_eq!(
-        response.key,
-        message.key().map(|x| std::str::from_utf8(x).unwrap())
-    );
-    assert_eq!(response.topic_name, message.topic());
-    assert_eq!(response.offset, message.offset());
-}
-
-struct ExpectedResponse<'a> {
-    message: &'a str,
-    key: Option<&'a str>,
-    topic_name: &'a str,
-    offset: i64,
-}
-
-async fn produce_consume(client: ClientConfig, topic_name: &str, i: i64) {
-    let producer: FutureProducer = client
-        .clone()
-        .set("message.timeout.ms", "5000")
-        .create()
-        .unwrap();
-
-    assert_produce(
-        &producer,
-        Record {
-            payload: "Message1",
-            topic_name,
-            key: Some("Key"),
-        },
-        Some(i * 2),
-    )
-    .await;
-    assert_produce(
-        &producer,
-        Record {
-            payload: "Message2",
-            topic_name,
-            key: None,
-        },
-        Some(i * 2 + 1),
-    )
-    .await;
+    producer
+        .assert_produce(
+            Record {
+                payload: "Message1",
+                topic_name,
+                key: Some("Key"),
+            },
+            Some(i * 2),
+        )
+        .await;
+    producer
+        .assert_produce(
+            Record {
+                payload: "Message2",
+                topic_name,
+                key: None,
+            },
+            Some(i * 2 + 1),
+        )
+        .await;
 
-    let consumer: StreamConsumer = client
-        .clone()
-        .set("group.id", "some_group")
-        .set("session.timeout.ms", "6000")
-        .set("auto.offset.reset", "earliest")
-        .set("enable.auto.commit", "false")
-        .create()
-        .unwrap();
-    consumer.subscribe(&[topic_name]).unwrap();
+    let consumer = connection_builder.connect_consumer(topic_name).await;
 
-    assert_consume(
-        &consumer,
-        ExpectedResponse {
+    consumer
+        .assert_consume(ExpectedResponse {
             message: "Message1",
             key: Some("Key"),
             topic_name,
             offset: 0,
-        },
-    )
-    .await;
-    assert_consume(
-        &consumer,
-        ExpectedResponse {
+        })
+        .await;
+    consumer
+        .assert_consume(ExpectedResponse {
             message: "Message2",
             key: None,
             topic_name,
             offset: 1,
-        },
-    )
-    .await;
+        })
+        .await;
 }
 
-async fn produce_consume_acks0(client: ClientConfig) {
+async fn produce_consume_acks0(connection_builder: &KafkaConnectionBuilder) {
     let topic_name = "acks0";
-    let producer: FutureProducer = client
-        .clone()
-        .set("message.timeout.ms", "5000")
-        .set("acks", "0")
-        .create()
-        .unwrap();
+    let producer = connection_builder.connect_producer(0).await;
 
     for _ in 0..10 {
-        assert_produce(
-            &producer,
-            Record {
-                payload: "MessageAcks0",
-                topic_name,
-                key: Some("KeyAcks0"),
-            },
-            None,
-        )
-        .await;
+        producer
+            .assert_produce(
+                Record {
+                    payload: "MessageAcks0",
+                    topic_name,
+                    key: Some("KeyAcks0"),
+                },
+                None,
+            )
+            .await;
     }
 
-    let consumer: StreamConsumer = client
-        .clone()
-        .set("group.id", "some_group")
-        .set("session.timeout.ms", "6000")
-        .set("auto.offset.reset", "earliest")
-        .set("enable.auto.commit", "false")
-        .create()
-        .unwrap();
-    consumer.subscribe(&[topic_name]).unwrap();
+    let consumer = connection_builder.connect_consumer(topic_name).await;
 
     for j in 0..10 {
-        assert_consume(
-            &consumer,
-            ExpectedResponse {
+        consumer
+            .assert_consume(ExpectedResponse {
                 message: "MessageAcks0",
                 key: Some("KeyAcks0"),
                 topic_name,
                 offset: j,
-            },
-        )
-        .await;
+            })
+            .await;
     }
 }
 
-pub async fn basic(address: &str) {
-    let mut client = ClientConfig::new();
-    client
-        .set("bootstrap.servers", address)
-        // internal driver debug logs are emitted to tokio tracing, assuming the appropriate filter is used by the tracing subscriber
-        .set("debug", "all");
-    admin(client.clone()).await;
-    for i in 0..2 {
-        produce_consume(client.clone(), "partitions1", i).await;
-        produce_consume(client.clone(), "partitions3", i).await;
-        produce_consume_acks0(client.clone()).await;
-    }
-    admin_cleanup(client.clone()).await;
-}
-
-pub async fn basic_sasl(address: &str) {
-    let mut client = ClientConfig::new();
-    client
-        .set("bootstrap.servers", address)
-        .set("sasl.mechanisms", "PLAIN")
-        .set("sasl.username", "user")
-        .set("sasl.password", "password")
-        .set("security.protocol", "SASL_PLAINTEXT")
-        // internal driver debug logs are emitted to tokio tracing, assuming the appropriate filter is used by the tracing subscriber
-        .set("debug", "all");
-    admin(client.clone()).await;
+pub async fn basic(connection_builder: KafkaConnectionBuilder) {
+    admin(&connection_builder).await;
     for i in 0..2 {
-        produce_consume(client.clone(), "partitions1", i).await;
-        produce_consume(client.clone(), "partitions3", i).await;
-        produce_consume_acks0(client.clone()).await;
+        produce_consume(&connection_builder, "partitions1", i).await;
+        produce_consume(&connection_builder, "partitions3", i).await;
+        produce_consume_acks0(&connection_builder).await;
     }
-    admin_cleanup(client.clone()).await;
+    admin_cleanup(&connection_builder).await;
 }
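The old free functions assert_produce and assert_consume survive as methods on the helper's connection wrappers, and Record/ExpectedResponse now come from test_helpers instead of being redeclared in each test file. As a sketch of the producer side after the move (the KafkaProducer wrapper name is hypothetical; the method body is the free function deleted above, reworked to borrow the wrapped FutureProducer):

// Sketch: an assumed wrapper carrying the deleted assert_produce logic.
use rdkafka::producer::{FutureProducer, FutureRecord};
use std::time::Duration;

pub struct Record<'a> {
    pub payload: &'a str,
    pub topic_name: &'a str,
    pub key: Option<&'a str>,
}

pub struct KafkaProducer {
    producer: FutureProducer,
}

impl KafkaProducer {
    pub async fn assert_produce(&self, record: Record<'_>, expected_offset: Option<i64>) {
        let send = match record.key {
            Some(key) => self
                .producer
                .send_result(
                    FutureRecord::to(record.topic_name)
                        .payload(record.payload)
                        .key(key),
                )
                .unwrap(),
            None => self
                .producer
                .send_result(FutureRecord::<(), _>::to(record.topic_name).payload(record.payload))
                .unwrap(),
        };
        // The delivery report resolves to a (partition, offset) pair.
        let delivery_status = tokio::time::timeout(Duration::from_secs(30), send)
            .await
            .expect("Timeout while receiving from producer")
            .unwrap()
            .unwrap();

        if let Some(offset) = expected_offset {
            assert_eq!(delivery_status.1, offset, "Unexpected offset");
        }
    }
}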