diff --git a/shotover-proxy/tests/kafka_int_tests/mod.rs b/shotover-proxy/tests/kafka_int_tests/mod.rs
index dac4d64ff..34de2a307 100644
--- a/shotover-proxy/tests/kafka_int_tests/mod.rs
+++ b/shotover-proxy/tests/kafka_int_tests/mod.rs
@@ -265,11 +265,7 @@ async fn cluster_1_rack_multi_shotover(#[case] driver: KafkaDriver) {
     }
 
     let connection_builder = KafkaConnectionBuilder::new(driver, "127.0.0.1:9192");
-    match driver {
-        #[cfg(feature = "kafka-cpp-driver-tests")]
-        KafkaDriver::Cpp => test_cases::standard_test_suite(connection_builder).await,
-        KafkaDriver::Java => test_cases::minimal_test_suite(connection_builder).await,
-    }
+    test_cases::standard_test_suite(connection_builder).await;
 
     for shotover in shotovers {
         tokio::time::timeout(
@@ -306,11 +302,7 @@ async fn cluster_2_racks_multi_shotover(#[case] driver: KafkaDriver) {
     }
 
     let connection_builder = KafkaConnectionBuilder::new(driver, "127.0.0.1:9192");
-    match driver {
-        #[cfg(feature = "kafka-cpp-driver-tests")]
-        KafkaDriver::Cpp => test_cases::standard_test_suite(connection_builder).await,
-        KafkaDriver::Java => test_cases::minimal_test_suite(connection_builder).await,
-    }
+    test_cases::standard_test_suite(connection_builder).await;
 
     for shotover in shotovers {
         tokio::time::timeout(
@@ -410,11 +402,7 @@ async fn cluster_sasl_scram_over_mtls_multi_shotover(#[case] driver: KafkaDriver
     let connection_builder =
         KafkaConnectionBuilder::new(driver, "127.0.0.1:9192").use_sasl_scram("user", "password");
 
-    match driver {
-        #[cfg(feature = "kafka-cpp-driver-tests")]
-        KafkaDriver::Cpp => test_cases::standard_test_suite(connection_builder).await,
-        KafkaDriver::Java => test_cases::minimal_test_suite(connection_builder).await,
-    }
+    test_cases::standard_test_suite(connection_builder).await;
 
     for shotover in shotovers {
         tokio::time::timeout(
@@ -450,11 +438,7 @@ async fn cluster_sasl_plain_multi_shotover(#[case] driver: KafkaDriver) {
     let connection_builder =
         KafkaConnectionBuilder::new(driver, "127.0.0.1:9192").use_sasl_plain("user", "password");
 
-    match driver {
-        #[cfg(feature = "kafka-cpp-driver-tests")]
-        KafkaDriver::Cpp => test_cases::standard_test_suite(connection_builder).await,
-        KafkaDriver::Java => test_cases::minimal_test_suite(connection_builder).await,
-    }
+    test_cases::standard_test_suite(connection_builder).await;
 
     // Test invalid credentials
     // We perform the regular test suite first in an attempt to catch a scenario
diff --git a/shotover-proxy/tests/kafka_int_tests/test_cases.rs b/shotover-proxy/tests/kafka_int_tests/test_cases.rs
index 368d43270..1cad5dc81 100644
--- a/shotover-proxy/tests/kafka_int_tests/test_cases.rs
+++ b/shotover-proxy/tests/kafka_int_tests/test_cases.rs
@@ -243,14 +243,3 @@ pub async fn standard_test_suite(connection_builder: KafkaConnectionBuilder) {
     produce_consume_acks0(&connection_builder).await;
     connection_builder.admin_cleanup().await;
 }
-
-// TODO: get all tests passing on the standard_test_suite and then delete this function
-pub async fn minimal_test_suite(connection_builder: KafkaConnectionBuilder) {
-    admin_setup(&connection_builder).await;
-    produce_consume_partitions1(&connection_builder, "partitions1").await;
-    // fails due to missing metadata on the unknown_topic (out of bounds error)
-    //produce_consume_partitions1(&connection_builder, "unknown_topic").await;
-    produce_consume_partitions3(&connection_builder).await;
-    produce_consume_acks0(&connection_builder).await;
-    connection_builder.admin_cleanup().await;
-}
diff --git a/shotover/src/transforms/kafka/sink_cluster/mod.rs b/shotover/src/transforms/kafka/sink_cluster/mod.rs
index e398697b1..edf131cd2 100644
--- a/shotover/src/transforms/kafka/sink_cluster/mod.rs
+++ b/shotover/src/transforms/kafka/sink_cluster/mod.rs
@@ -571,10 +571,18 @@ impl KafkaSinkCluster {
                     ..
                 })) => {
                     for topic in metadata.topics.values() {
-                        if let Some(err) = ResponseError::try_from_code(topic.error_code) {
-                            return Err(anyhow!(
-                                "Kafka responded to Metadata request with error {err:?}"
-                            ));
+                        match ResponseError::try_from_code(topic.error_code) {
+                            Some(ResponseError::UnknownTopicOrPartition) => {
+                                // We need to look up all topics sent to us by the client,
+                                // but the client may request a topic that doesn't exist.
+                            }
+                            Some(err) => {
+                                // Some other kind of error, better to terminate the connection
+                                return Err(anyhow!(
+                                    "Kafka responded to Metadata request with error {err:?}"
+                                ));
+                            }
+                            None => {}
                         }
                     }
                     self.process_metadata_response(metadata).await
@@ -699,7 +707,12 @@ impl KafkaSinkCluster {
             let destination = match connection {
                 Some(connection) => connection,
                 None => {
-                    tracing::warn!("no known partition leader for {topic_name:?}, routing message to a random node so that a NOT_LEADER_OR_FOLLOWER or similar error is returned to the client");
+                    tracing::debug!(
+                        r#"no known partition leader for {topic_name:?}
+routing message to a random node so that:
+* if auto topic creation is enabled, auto topic creation will occur
+* if auto topic creation is disabled, a NOT_LEADER_OR_FOLLOWER error is returned to the client"#
+                    );
                     self.nodes.choose(&mut self.rng).unwrap().broker_id
                 }
             };
@@ -1346,40 +1359,41 @@ impl KafkaSinkCluster {
 
         self.controller_broker.set(metadata.controller_id);
 
-        for topic in &metadata.topics {
-            let mut partitions: Vec<_> = topic
-                .1
-                .partitions
-                .iter()
-                .map(|partition| Partition {
-                    index: partition.partition_index,
-                    leader_id: partition.leader_id,
-                    shotover_rack_replica_nodes: partition
-                        .replica_nodes
-                        .iter()
-                        .cloned()
-                        .filter(|replica_node_id| self.broker_within_rack(*replica_node_id))
-                        .collect(),
-                    external_rack_replica_nodes: partition
-                        .replica_nodes
-                        .iter()
-                        .cloned()
-                        .filter(|replica_node_id| !self.broker_within_rack(*replica_node_id))
-                        .collect(),
-                })
-                .collect();
-            partitions.sort_by_key(|x| x.index);
-            if !topic.0.is_empty() {
-                self.topic_by_name.insert(
-                    topic.0.clone(),
-                    Topic {
-                        partitions: partitions.clone(),
-                    },
-                );
-            }
-            if !topic.1.topic_id.is_nil() {
-                self.topic_by_id
-                    .insert(topic.1.topic_id, Topic { partitions });
+        for (topic_name, topic) in &metadata.topics {
+            if ResponseError::try_from_code(topic.error_code).is_none() {
+                let mut partitions: Vec<_> = topic
+                    .partitions
+                    .iter()
+                    .map(|partition| Partition {
+                        index: partition.partition_index,
+                        leader_id: partition.leader_id,
+                        shotover_rack_replica_nodes: partition
+                            .replica_nodes
+                            .iter()
+                            .cloned()
+                            .filter(|replica_node_id| self.broker_within_rack(*replica_node_id))
+                            .collect(),
+                        external_rack_replica_nodes: partition
+                            .replica_nodes
+                            .iter()
+                            .cloned()
+                            .filter(|replica_node_id| !self.broker_within_rack(*replica_node_id))
+                            .collect(),
+                    })
+                    .collect();
+                partitions.sort_by_key(|x| x.index);
+                if !topic_name.is_empty() {
+                    self.topic_by_name.insert(
+                        topic_name.clone(),
+                        Topic {
+                            partitions: partitions.clone(),
+                        },
+                    );
+                }
+                if !topic.topic_id.is_nil() {
+                    self.topic_by_id
+                        .insert(topic.topic_id, Topic { partitions });
+                }
             }
         }
     }