Test kafka with replication_factor 3
rukai committed Aug 30, 2024
1 parent 57a1362 commit c821fae
Showing 6 changed files with 53 additions and 43 deletions.
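
This commit points the clustered integration tests at a new cluster_test_suite, which creates topics with replication_factor: 3 and runs produce/consume checks against them. Two supporting changes: connect_producer now takes the acks config as a string so tests can request acks=all, and KafkaSinkCluster routes fetch requests to the partition leader instead of choosing a replica within shotover's rack.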
16 changes: 8 additions & 8 deletions shotover-proxy/tests/kafka_int_tests/mod.rs
@@ -115,7 +115,7 @@ async fn cluster_tls(#[case] driver: KafkaDriver) {

    let connection_builder = KafkaConnectionBuilder::new(driver, "127.0.0.1:9192")
        .use_tls("tests/test-configs/kafka/tls/certs");
-    test_cases::standard_test_suite(&connection_builder).await;
+    test_cases::cluster_test_suite(&connection_builder).await;

    tokio::time::timeout(
        Duration::from_secs(10),
@@ -139,7 +139,7 @@ async fn cluster_mtls(#[case] driver: KafkaDriver) {

    let connection_builder = KafkaConnectionBuilder::new(driver, "127.0.0.1:9192")
        .use_tls("tests/test-configs/kafka/tls/certs");
-    test_cases::standard_test_suite(&connection_builder).await;
+    test_cases::cluster_test_suite(&connection_builder).await;

    tokio::time::timeout(
        Duration::from_secs(10),
@@ -254,7 +254,7 @@ async fn cluster_1_rack_single_shotover(#[case] driver: KafkaDriver) {
        .await;

    let connection_builder = KafkaConnectionBuilder::new(driver, "127.0.0.1:9192");
-    test_cases::standard_test_suite(&connection_builder).await;
+    test_cases::cluster_test_suite(&connection_builder).await;

    tokio::time::timeout(
        Duration::from_secs(10),
@@ -287,7 +287,7 @@ async fn cluster_1_rack_multi_shotover(#[case] driver: KafkaDriver) {
    }

    let connection_builder = KafkaConnectionBuilder::new(driver, "127.0.0.1:9192");
-    test_cases::standard_test_suite(&connection_builder).await;
+    test_cases::cluster_test_suite(&connection_builder).await;

    for shotover in shotovers {
        tokio::time::timeout(
@@ -324,7 +324,7 @@ async fn cluster_2_racks_multi_shotover(#[case] driver: KafkaDriver) {
    }

    let connection_builder = KafkaConnectionBuilder::new(driver, "127.0.0.1:9192");
-    test_cases::standard_test_suite(&connection_builder).await;
+    test_cases::cluster_test_suite(&connection_builder).await;

    for shotover in shotovers {
        tokio::time::timeout(
@@ -394,7 +394,7 @@ async fn cluster_sasl_scram_over_mtls_single_shotover(#[case] driver: KafkaDrive
    let connection_super = KafkaConnectionBuilder::new(driver, "127.0.0.1:9192")
        .use_sasl_scram("super_user", "super_password");
    setup_basic_user_acls(&connection_super, "basic_user").await;
-    test_cases::standard_test_suite(&connection_super).await;
+    test_cases::cluster_test_suite(&connection_super).await;
    assert_connection_fails_with_incorrect_password(driver, "super_user").await;

    // admin requests sent by basic user are unsuccessful
@@ -505,7 +505,7 @@ async fn cluster_sasl_scram_over_mtls_multi_shotover(#[case] driver: KafkaDriver
    let instant = Instant::now();
    let connection_builder = KafkaConnectionBuilder::new(driver, "127.0.0.1:9192")
        .use_sasl_scram("super_user", "super_password");
-    test_cases::standard_test_suite(&connection_builder).await;
+    test_cases::cluster_test_suite(&connection_builder).await;

    // Wait 20s since we started the initial run to ensure that we hit the 15s token lifetime limit
    tokio::time::sleep_until((instant + Duration::from_secs(20)).into()).await;
@@ -549,7 +549,7 @@ async fn cluster_sasl_plain_multi_shotover(#[case] driver: KafkaDriver) {

    let connection_builder =
        KafkaConnectionBuilder::new(driver, "127.0.0.1:9192").use_sasl_plain("user", "password");
-    test_cases::standard_test_suite(&connection_builder).await;
+    test_cases::cluster_test_suite(&connection_builder).await;

    // Test invalid credentials
    // We perform the regular test suite first in an attempt to catch a scenario
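
For context, each of the tests touched above follows the same shape; here is a minimal sketch of one (the docker-compose path, topology path, and shutdown helper name are assumptions pieced together from the fragments visible in this diff, not verbatim repo code):

#[rstest]
#[tokio::test(flavor = "multi_thread")]
async fn cluster_1_rack_single_shotover(#[case] driver: KafkaDriver) {
    // Bring up a 3-broker kafka cluster plus shotover (paths illustrative).
    let _docker_compose =
        docker_compose("tests/test-configs/kafka/cluster-1-rack/docker-compose.yaml");
    let shotover =
        shotover_process("tests/test-configs/kafka/cluster-1-rack/topology-single.yaml")
            .start()
            .await;

    let connection_builder = KafkaConnectionBuilder::new(driver, "127.0.0.1:9192");
    // Previously standard_test_suite; cluster_test_suite adds the rf=3 topics.
    test_cases::cluster_test_suite(&connection_builder).await;

    // Each test asserts that shotover shuts down cleanly within 10s.
    tokio::time::timeout(
        Duration::from_secs(10),
        shotover.shutdown_and_then_consume_events(&[]),
    )
    .await
    .expect("shotover timed out shutting down");
}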
38 changes: 31 additions & 7 deletions shotover-proxy/tests/kafka_int_tests/test_cases.rs
@@ -68,7 +68,7 @@ pub async fn produce_consume_partitions1(
    topic_name: &str,
) {
    {
-        let producer = connection_builder.connect_producer(1).await;
+        let producer = connection_builder.connect_producer("all").await;
        // create an initial record to force kafka to create the topic if it doesn't yet exist
        producer
            .assert_produce(
@@ -174,7 +174,7 @@ pub async fn produce_consume_commit_offsets_partitions1(
    topic_name: &str,
) {
    {
-        let producer = connection_builder.connect_producer(1).await;
+        let producer = connection_builder.connect_producer("1").await;
        producer
            .assert_produce(
                Record {
@@ -295,9 +295,11 @@ pub async fn produce_consume_commit_offsets_partitions1(
    }
}

-async fn produce_consume_partitions3(connection_builder: &KafkaConnectionBuilder) {
-    let topic_name = "partitions3";
-    let producer = connection_builder.connect_producer(1).await;
+async fn produce_consume_partitions3(
+    connection_builder: &KafkaConnectionBuilder,
+    topic_name: &str,
+) {
+    let producer = connection_builder.connect_producer("1").await;
    let mut consumer = connection_builder
        .connect_consumer(topic_name, "some_group")
        .await;
@@ -346,7 +348,7 @@ async fn produce_consume_partitions3(connection_builder: &KafkaConnectionBuilder

async fn produce_consume_acks0(connection_builder: &KafkaConnectionBuilder) {
    let topic_name = "acks0";
-    let producer = connection_builder.connect_producer(0).await;
+    let producer = connection_builder.connect_producer("0").await;

    for _ in 0..10 {
        producer
@@ -382,7 +384,7 @@ pub async fn standard_test_suite(connection_builder: &KafkaConnectionBuilder) {
    produce_consume_partitions1(connection_builder, "partitions1").await;
    produce_consume_partitions1(connection_builder, "unknown_topic").await;
    produce_consume_commit_offsets_partitions1(connection_builder, "partitions1_with_offset").await;
-    produce_consume_partitions3(connection_builder).await;
+    produce_consume_partitions3(connection_builder, "partitions3").await;

    // Only run this test case on the java driver,
    // since even without going through shotover the cpp driver fails this test.
@@ -405,6 +407,28 @@ pub async fn standard_test_suite(connection_builder: &KafkaConnectionBuilder) {
    connection_builder.admin_cleanup().await;
}

+pub async fn cluster_test_suite(connection_builder: &KafkaConnectionBuilder) {
+    //standard_test_suite(connection_builder).await;
+    let admin = connection_builder.connect_admin().await;
+    admin
+        .create_topics(&[
+            NewTopic {
+                name: "partitions1_rf3",
+                num_partitions: 1,
+                replication_factor: 3,
+            },
+            NewTopic {
+                name: "partitions3_rf3",
+                num_partitions: 3,
+                replication_factor: 3,
+            },
+        ])
+        .await;
+    tokio::time::sleep(std::time::Duration::from_secs(10)).await;
+    produce_consume_partitions1(connection_builder, "partitions1_rf3").await;
+    //produce_consume_partitions3(connection_builder, "partitions3_rf3").await;
+}
+
pub async fn setup_basic_user_acls(connection: &KafkaConnectionBuilder, username: &str) {
    let admin = connection.connect_admin().await;
    admin
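
One judgment call worth noting: the fixed 10 second sleep gives the brokers time to elect leaders for the newly created rf=3 partitions before the first produce. A polling alternative, sketched here against the raw rdkafka API rather than the repo's test-helper wrappers (so everything below is an assumption, not code from this commit), would avoid the fixed delay:

use std::time::{Duration, Instant};

use rdkafka::config::ClientConfig;
use rdkafka::consumer::{BaseConsumer, Consumer};

/// Poll broker metadata until every partition of `topic` has an elected leader.
fn wait_for_leaders(topic: &str, timeout: Duration) {
    let consumer: BaseConsumer = ClientConfig::new()
        .set("bootstrap.servers", "127.0.0.1:9192")
        .create()
        .unwrap();
    let deadline = Instant::now() + timeout;
    loop {
        // fetch_metadata(Some(topic), ..) returns metadata for just that topic.
        let metadata = consumer
            .fetch_metadata(Some(topic), Duration::from_secs(5))
            .unwrap();
        let ready = metadata.topics().iter().any(|t| {
            !t.partitions().is_empty() && t.partitions().iter().all(|p| p.leader() != -1)
        });
        if ready {
            return;
        }
        assert!(
            Instant::now() < deadline,
            "timed out waiting for leaders of {topic}"
        );
        std::thread::sleep(Duration::from_millis(100));
    }
}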
34 changes: 10 additions & 24 deletions shotover/src/transforms/kafka/sink_cluster/mod.rs
@@ -825,32 +825,18 @@ routing message to a random node so that:
                    let destination = if let Some(partition) =
                        topic_meta.partitions.get(partition_index)
                    {
-                        if let Some(node) = self
-                            .nodes
-                            .iter_mut()
-                            .filter(|node| {
-                                partition
-                                    .shotover_rack_replica_nodes
-                                    .contains(&node.broker_id)
-                            })
-                            .choose(&mut self.rng)
-                        {
-                            node.broker_id
-                        } else {
-                            tracing::debug!(
-                                "Routing fetch request to replica outside of shotover's rack"
-                            );
-                            self.nodes
-                                .iter_mut()
-                                .filter(|node| {
-                                    partition
-                                        .external_rack_replica_nodes
-                                        .contains(&node.broker_id)
-                                })
-                                .choose(&mut self.rng)
-                                .unwrap()
-                                .broker_id
-                        }
+                        // While technically kafka has some support for fetching from replicas, it's quite weird.
+                        // See https://cwiki.apache.org/confluence/display/KAFKA/KIP-392%3A+Allow+consumers+to+fetch+from+closest+replica
+                        // We should never route to replica_nodes from the metadata response ourselves.
+                        // Instead, when it's available, we can make use of the preferred_read_replica field in the fetch response as an optimization.
+                        // However it's always correct to route to partition.leader_id, which is what we do here.
+                        if partition.leader_id == -1 {
+                            let topic_name = Self::format_topic_name(&topic);
+                            tracing::warn!(
+                                "leader_id is unknown for topic {topic_name} at partition index {partition_index}"
+                            );
+                        }
+                        partition.leader_id
                    } else {
                        let partition_len = topic_meta.partitions.len();
                        let topic_name = Self::format_topic_name(&topic);
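
The net effect is easier to see side by side; here is a standalone sketch of the two strategies with a hypothetical, simplified Partition type (not the transform's real types):

use rand::seq::IteratorRandom;

// Hypothetical stand-in for the partition metadata the transform tracks.
struct Partition {
    leader_id: i32,
    shotover_rack_replica_nodes: Vec<i32>,
}

// Old strategy: fetch from a random replica in shotover's rack. Kafka only
// supports follower reads via KIP-392's preferred_read_replica mechanism,
// so steering fetches at arbitrary replicas ourselves is not generally correct.
fn route_to_replica(partition: &Partition, rng: &mut impl rand::Rng) -> Option<i32> {
    partition.shotover_rack_replica_nodes.iter().copied().choose(rng)
}

// New strategy: always route to the partition leader, which is correct for
// any replication factor. leader_id == -1 means no leader is elected yet;
// the transform logs a warning in that case but still returns the id.
fn route_to_leader(partition: &Partition) -> i32 {
    if partition.leader_id == -1 {
        tracing::warn!("leader_id is unknown");
    }
    partition.leader_id
}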
4 changes: 2 additions & 2 deletions test-helpers/src/connection/kafka/cpp.rs
@@ -50,13 +50,13 @@ impl KafkaConnectionBuilderCpp {
        self
    }

-    pub async fn connect_producer(&self, acks: i32) -> KafkaProducerCpp {
+    pub async fn connect_producer(&self, acks: &str) -> KafkaProducerCpp {
        KafkaProducerCpp {
            producer: self
                .client
                .clone()
                .set("message.timeout.ms", "5000")
-                .set("acks", acks.to_string())
+                .set("acks", acks)
                .create()
                .unwrap(),
        }
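
The i32 to &str change exists because librdkafka's acks property is a string option that accepts "all" (wait for every in-sync replica) as well as "0" and "1", and "all" is what producing to the new rf=3 topics calls for. Roughly, in plain rdkafka terms (a sketch, not this repo's wrapper):

use rdkafka::config::ClientConfig;
use rdkafka::producer::FutureProducer;

fn producer_with_acks(acks: &str) -> FutureProducer {
    ClientConfig::new()
        .set("bootstrap.servers", "127.0.0.1:9192")
        .set("message.timeout.ms", "5000")
        // acks is a string: "0", "1", or "all" ("-1" is an alias for "all"),
        // so an i32 parameter cannot express the "all" case.
        .set("acks", acks)
        .create()
        .unwrap()
}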
2 changes: 1 addition & 1 deletion test-helpers/src/connection/kafka/java.rs
@@ -79,7 +79,7 @@ impl KafkaConnectionBuilderJava {
        self
    }

-    pub async fn connect_producer(&self, acks: i32) -> KafkaProducerJava {
+    pub async fn connect_producer(&self, acks: &str) -> KafkaProducerJava {
        let mut config = self.base_config.clone();
        config.insert("acks".to_owned(), acks.to_string());
        config.insert(
2 changes: 1 addition & 1 deletion test-helpers/src/connection/kafka/mod.rs
@@ -57,7 +57,7 @@ impl KafkaConnectionBuilder {
        }
    }

-    pub async fn connect_producer(&self, acks: i32) -> KafkaProducer {
+    pub async fn connect_producer(&self, acks: &str) -> KafkaProducer {
        match self {
            #[cfg(feature = "kafka-cpp-driver-tests")]
            Self::Cpp(cpp) => KafkaProducer::Cpp(cpp.connect_producer(acks).await),
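
Call sites now pass the value exactly as kafka's config string expects it, as the updated tests show:

let producer = connection_builder.connect_producer("all").await;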
