KafkaSinkCluster: run java driver against standard test suite #1645

Merged · 2 commits · Jun 4, 2024
24 changes: 4 additions & 20 deletions shotover-proxy/tests/kafka_int_tests/mod.rs
@@ -265,11 +265,7 @@ async fn cluster_1_rack_multi_shotover(#[case] driver: KafkaDriver) {
}

let connection_builder = KafkaConnectionBuilder::new(driver, "127.0.0.1:9192");
-match driver {
-    #[cfg(feature = "kafka-cpp-driver-tests")]
-    KafkaDriver::Cpp => test_cases::standard_test_suite(connection_builder).await,
-    KafkaDriver::Java => test_cases::minimal_test_suite(connection_builder).await,
-}
+test_cases::standard_test_suite(connection_builder).await;

for shotover in shotovers {
tokio::time::timeout(
@@ -306,11 +302,7 @@ async fn cluster_2_racks_multi_shotover(#[case] driver: KafkaDriver) {
}

let connection_builder = KafkaConnectionBuilder::new(driver, "127.0.0.1:9192");
-match driver {
-    #[cfg(feature = "kafka-cpp-driver-tests")]
-    KafkaDriver::Cpp => test_cases::standard_test_suite(connection_builder).await,
-    KafkaDriver::Java => test_cases::minimal_test_suite(connection_builder).await,
-}
+test_cases::standard_test_suite(connection_builder).await;

for shotover in shotovers {
tokio::time::timeout(
@@ -410,11 +402,7 @@ async fn cluster_sasl_scram_over_mtls_multi_shotover(#[case] driver: KafkaDriver) {

let connection_builder =
KafkaConnectionBuilder::new(driver, "127.0.0.1:9192").use_sasl_scram("user", "password");
-match driver {
-    #[cfg(feature = "kafka-cpp-driver-tests")]
-    KafkaDriver::Cpp => test_cases::standard_test_suite(connection_builder).await,
-    KafkaDriver::Java => test_cases::minimal_test_suite(connection_builder).await,
-}
+test_cases::standard_test_suite(connection_builder).await;

for shotover in shotovers {
tokio::time::timeout(
@@ -450,11 +438,7 @@ async fn cluster_sasl_plain_multi_shotover(#[case] driver: KafkaDriver) {

let connection_builder =
KafkaConnectionBuilder::new(driver, "127.0.0.1:9192").use_sasl_plain("user", "password");
-match driver {
-    #[cfg(feature = "kafka-cpp-driver-tests")]
-    KafkaDriver::Cpp => test_cases::standard_test_suite(connection_builder).await,
-    KafkaDriver::Java => test_cases::minimal_test_suite(connection_builder).await,
-}
+test_cases::standard_test_suite(connection_builder).await;

// Test invalid credentials
// We perform the regular test suite first in an attempt to catch a scenario
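All four test sites above drop the per-driver `match` in favor of a single unconditional call, and the next file deletes the now-unused `minimal_test_suite` fallback. As a minimal sketch of the resulting pattern, assuming the rstest parameterization implied by the `#[case] driver: KafkaDriver` signatures (the test name here is hypothetical and the exact attributes in the repo may differ):

```rust
use rstest::rstest;

// Hypothetical consolidated test showing the post-change shape: both the
// cpp and java cases now run the full standard_test_suite; there is no
// longer a java-only minimal_test_suite branch.
#[rstest]
#[cfg_attr(feature = "kafka-cpp-driver-tests", case::cpp(KafkaDriver::Cpp))]
#[case::java(KafkaDriver::Java)]
#[tokio::test]
async fn example_cluster_test(#[case] driver: KafkaDriver) {
    let connection_builder = KafkaConnectionBuilder::new(driver, "127.0.0.1:9192");
    // Every driver now takes the same path.
    test_cases::standard_test_suite(connection_builder).await;
}
```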
11 changes: 0 additions & 11 deletions shotover-proxy/tests/kafka_int_tests/test_cases.rs
@@ -243,14 +243,3 @@ pub async fn standard_test_suite(connection_builder: KafkaConnectionBuilder) {
produce_consume_acks0(&connection_builder).await;
connection_builder.admin_cleanup().await;
}

-// TODO: get all tests passing on the standard_test_suite and then delete this function
-pub async fn minimal_test_suite(connection_builder: KafkaConnectionBuilder) {
-    admin_setup(&connection_builder).await;
-    produce_consume_partitions1(&connection_builder, "partitions1").await;
-    // fails due to missing metadata on the unknown_topic (out of bounds error)
-    //produce_consume_partitions1(&connection_builder, "unknown_topic").await;
-    produce_consume_partitions3(&connection_builder).await;
-    produce_consume_acks0(&connection_builder).await;
-    connection_builder.admin_cleanup().await;
-}
92 changes: 53 additions & 39 deletions shotover/src/transforms/kafka/sink_cluster/mod.rs
@@ -571,10 +571,18 @@ impl KafkaSinkCluster {
..
})) => {
for topic in metadata.topics.values() {
-if let Some(err) = ResponseError::try_from_code(topic.error_code) {
-    return Err(anyhow!(
-        "Kafka responded to Metadata request with error {err:?}"
-    ));
+match ResponseError::try_from_code(topic.error_code) {
+    Some(ResponseError::UnknownTopicOrPartition) => {
+        // We need to look up all topics sent to us by the client,
+        // but the client may request a topic that doesn't exist.
+    }
+    Some(err) => {
+        // Some other kind of error; better to terminate the connection.
+        return Err(anyhow!(
+            "Kafka responded to Metadata request with error {err:?}"
+        ));
+    }
+    None => {}
}
}
self.process_metadata_response(metadata).await
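In short, the hunk above stops treating every Metadata error as fatal. A minimal self-contained sketch of the same matching, assuming `ResponseError::try_from_code` from the kafka-protocol crate (the helper name `check_topic_error` is hypothetical):

```rust
use anyhow::{anyhow, Result};
use kafka_protocol::ResponseError; // assumed import path; may be re-exported elsewhere

// Hypothetical helper mirroring the logic above: tolerate the one error
// code that is expected during routing, fail on everything else.
fn check_topic_error(error_code: i16) -> Result<()> {
    match ResponseError::try_from_code(error_code) {
        // Expected: the client may ask for a topic that does not exist yet;
        // this must not tear down the connection.
        Some(ResponseError::UnknownTopicOrPartition) => Ok(()),
        // Any other broker error is unexpected here, so bail out.
        Some(err) => Err(anyhow!(
            "Kafka responded to Metadata request with error {err:?}"
        )),
        // error_code == 0: success.
        None => Ok(()),
    }
}
```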
@@ -699,7 +707,12 @@ impl KafkaSinkCluster {
let destination = match connection {
Some(connection) => connection,
None => {
tracing::warn!("no known partition leader for {topic_name:?}, routing message to a random node so that a NOT_LEADER_OR_FOLLOWER or similar error is returned to the client");
tracing::debug!(
r#"no known partition leader for {topic_name:?}
routing message to a random node so that:
* if auto topic creation is enabled, auto topic creation will occur
* if auto topic creation is disabled a NOT_LEADER_OR_FOLLOWER is returned to the client"#
);
self.nodes.choose(&mut self.rng).unwrap().broker_id
}
};
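A small self-contained sketch of the fallback described in the new log message, with `KafkaNode` and `BrokerId` reduced to stand-in types (the real definitions live elsewhere in this module):

```rust
use rand::seq::SliceRandom;

// Stand-in types; the real definitions are elsewhere in sink_cluster.
#[derive(Clone, Copy)]
struct BrokerId(i32);
struct KafkaNode {
    broker_id: BrokerId,
}

// With no known leader for the topic, route to any broker: either auto
// topic creation kicks in server-side, or the broker answers with
// NOT_LEADER_OR_FOLLOWER and the client handles it as usual.
fn fallback_destination(nodes: &[KafkaNode], rng: &mut impl rand::Rng) -> BrokerId {
    nodes
        .choose(rng)
        .expect("at least one broker is known")
        .broker_id
}
```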
@@ -1346,40 +1359,41 @@ impl KafkaSinkCluster {

self.controller_broker.set(metadata.controller_id);

-for topic in &metadata.topics {
-    let mut partitions: Vec<_> = topic
-        .1
-        .partitions
-        .iter()
-        .map(|partition| Partition {
-            index: partition.partition_index,
-            leader_id: partition.leader_id,
-            shotover_rack_replica_nodes: partition
-                .replica_nodes
-                .iter()
-                .cloned()
-                .filter(|replica_node_id| self.broker_within_rack(*replica_node_id))
-                .collect(),
-            external_rack_replica_nodes: partition
-                .replica_nodes
-                .iter()
-                .cloned()
-                .filter(|replica_node_id| !self.broker_within_rack(*replica_node_id))
-                .collect(),
-        })
-        .collect();
-    partitions.sort_by_key(|x| x.index);
-    if !topic.0.is_empty() {
-        self.topic_by_name.insert(
-            topic.0.clone(),
-            Topic {
-                partitions: partitions.clone(),
-            },
-        );
-    }
-    if !topic.1.topic_id.is_nil() {
-        self.topic_by_id
-            .insert(topic.1.topic_id, Topic { partitions });
+for (topic_name, topic) in &metadata.topics {
+    if ResponseError::try_from_code(topic.error_code).is_none() {
+        let mut partitions: Vec<_> = topic
+            .partitions
+            .iter()
+            .map(|partition| Partition {
+                index: partition.partition_index,
+                leader_id: partition.leader_id,
+                shotover_rack_replica_nodes: partition
+                    .replica_nodes
+                    .iter()
+                    .cloned()
+                    .filter(|replica_node_id| self.broker_within_rack(*replica_node_id))
+                    .collect(),
+                external_rack_replica_nodes: partition
+                    .replica_nodes
+                    .iter()
+                    .cloned()
+                    .filter(|replica_node_id| !self.broker_within_rack(*replica_node_id))
+                    .collect(),
+            })
+            .collect();
+        partitions.sort_by_key(|x| x.index);
+        if !topic_name.is_empty() {
+            self.topic_by_name.insert(
+                topic_name.clone(),
+                Topic {
+                    partitions: partitions.clone(),
+                },
+            );
+        }
+        if !topic.topic_id.is_nil() {
+            self.topic_by_id
+                .insert(topic.topic_id, Topic { partitions });
+        }
+    }
}
}
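The last hunk wraps the whole caching body in an `is_none()` guard, so topics whose metadata carries an error code never reach `topic_by_name` or `topic_by_id`. A simplified, self-contained sketch of that guard (the flat tuple input and the stand-in types are hypothetical; the real code works on kafka-protocol structs and also splits replica nodes by rack):

```rust
use std::collections::HashMap;

// Stand-ins for the real metadata/protocol types.
struct Partition {
    index: i32,
    leader_id: i32,
}
struct Topic {
    partitions: Vec<Partition>,
}

// `*error_code != 0` plays the role of ResponseError::try_from_code(..).is_none():
// code 0 means "no error" in the Kafka protocol.
fn cache_topics(
    metadata: &[(String, i16, Vec<(i32, i32)>)], // (name, error_code, [(partition_index, leader_id)])
    topic_by_name: &mut HashMap<String, Topic>,
) {
    for (name, error_code, raw_partitions) in metadata {
        if *error_code != 0 {
            // Errored topic (e.g. UNKNOWN_TOPIC_OR_PARTITION): skip it so no
            // empty placeholder entry is ever cached.
            continue;
        }
        let mut partitions: Vec<Partition> = raw_partitions
            .iter()
            .map(|&(index, leader_id)| Partition { index, leader_id })
            .collect();
        partitions.sort_by_key(|p| p.index);
        if !name.is_empty() {
            topic_by_name.insert(name.clone(), Topic { partitions });
        }
    }
}
```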