diff --git a/shotover-proxy/tests/kafka_int_tests/test_cases.rs b/shotover-proxy/tests/kafka_int_tests/test_cases.rs index 1879e10ab..cc38f343b 100644 --- a/shotover-proxy/tests/kafka_int_tests/test_cases.rs +++ b/shotover-proxy/tests/kafka_int_tests/test_cases.rs @@ -14,7 +14,7 @@ use tokio_bin_process::BinProcess; async fn admin_setup(connection_builder: &KafkaConnectionBuilder) { let admin = connection_builder.connect_admin().await; admin - .create_topics(&[ + .create_topics_and_wait(&[ NewTopic { name: "partitions1", num_partitions: 1, @@ -442,7 +442,7 @@ pub async fn produce_consume_partitions1_kafka_node_goes_down( { let admin = connection_builder.connect_admin().await; admin - .create_topics(&[NewTopic { + .create_topics_and_wait(&[NewTopic { name: topic_name, num_partitions: 1, replication_factor: 3, @@ -568,7 +568,7 @@ pub async fn produce_consume_partitions1_shotover_nodes_go_down( { let admin = connection_builder.connect_admin().await; admin - .create_topics(&[NewTopic { + .create_topics_and_wait(&[NewTopic { name: topic_name, num_partitions: 1, replication_factor: 3, @@ -1204,16 +1204,13 @@ async fn produce_consume_acks0(connection_builder: &KafkaConnectionBuilder) { pub async fn test_broker_idle_timeout(connection_builder: &KafkaConnectionBuilder) { let admin = connection_builder.connect_admin().await; admin - .create_topics(&[NewTopic { + .create_topics_and_wait(&[NewTopic { name: "partitions3", num_partitions: 3, replication_factor: 1, }]) .await; - // cpp driver hits race condition here - tokio::time::sleep(Duration::from_secs(2)).await; - let mut producer = connection_builder.connect_producer("all", 0).await; let mut consumer = connection_builder .connect_consumer( @@ -1286,7 +1283,7 @@ pub async fn standard_test_suite(connection_builder: &KafkaConnectionBuilder) { let admin = connection_builder.connect_admin().await; admin.delete_topics(&["partitions1"]).await; admin - .create_topics(&[NewTopic { + .create_topics_and_wait(&[NewTopic { name: "partitions1", num_partitions: 1, replication_factor: 1, @@ -1368,7 +1365,7 @@ pub async fn cluster_test_suite(connection_builder: &KafkaConnectionBuilder) { standard_test_suite(connection_builder).await; let admin = connection_builder.connect_admin().await; admin - .create_topics(&[ + .create_topics_and_wait(&[ NewTopic { name: "partitions1_rf3", num_partitions: 1, @@ -1381,7 +1378,6 @@ pub async fn cluster_test_suite(connection_builder: &KafkaConnectionBuilder) { }, ]) .await; - tokio::time::sleep(Duration::from_secs(10)).await; produce_consume_partitions1(connection_builder, "partitions1_rf3").await; produce_consume_partitions3(connection_builder, "partitions3_rf3", 1, 500).await; } @@ -1428,7 +1424,7 @@ pub async fn assert_topic_creation_is_denied_due_to_acl(connection: &KafkaConnec // * The request succeeds because user has AclOperation::Describe. // * But no topic is found since the topic creation was denied. assert_eq!( - admin.describe_topic("acl_check_topic").await.unwrap_err().to_string(), + admin.describe_topics(&["acl_check_topic"]).await.unwrap_err().to_string(), "org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host this topic-partition.\n" ) } diff --git a/test-helpers/src/connection/kafka/java.rs b/test-helpers/src/connection/kafka/java.rs index d4cee9fbb..fcf76f449 100644 --- a/test-helpers/src/connection/kafka/java.rs +++ b/test-helpers/src/connection/kafka/java.rs @@ -2,6 +2,7 @@ use super::{ Acl, AclOperation, AclPermissionType, AlterConfig, ConsumerConfig, ExpectedResponse, ListOffsetsResultInfo, NewPartition, NewTopic, OffsetAndMetadata, OffsetSpec, ProduceResult, Record, ResourcePatternType, ResourceSpecifier, ResourceType, TopicDescription, TopicPartition, + TopicPartitionInfo, }; use crate::connection::java::{Jvm, Value}; use anyhow::Result; @@ -437,16 +438,39 @@ impl KafkaAdminJava { self.create_topics_fallible(topics).await.unwrap(); } - pub async fn describe_topic(&self, topic_name: &str) -> Result { - let topics = self - .jvm - .new_list("java.lang.String", vec![self.jvm.new_string(topic_name)]); + pub async fn describe_topics(&self, topic_names: &[&str]) -> Result> { + let topics = self.jvm.new_list( + "java.lang.String", + topic_names + .iter() + .map(|topic| self.jvm.new_string(topic)) + .collect(), + ); - self.admin + let topic_names_to_info = self + .admin .call("describeTopics", vec![topics]) .call_async_fallible("allTopicNames", vec![]) .await?; - Ok(TopicDescription {}) + + Ok(topic_names_to_info + .call("values", vec![]) + .call("iterator", vec![]) + .into_iter() + .map(|java_topic_description| { + let java_topic_description = + java_topic_description.cast("org.apache.kafka.clients.admin.TopicDescription"); + TopicDescription { + topic_name: java_topic_description.call("name", vec![]).into_rust(), + partitions: java_topic_description + .call("partitions", vec![]) + .call("iterator", vec![]) + .into_iter() + .map(|_| TopicPartitionInfo {}) + .collect(), + } + }) + .collect()) } pub async fn create_topics_fallible(&self, topics: &[NewTopic<'_>]) -> Result<()> { diff --git a/test-helpers/src/connection/kafka/mod.rs b/test-helpers/src/connection/kafka/mod.rs index 69ec0700f..5909916bf 100644 --- a/test-helpers/src/connection/kafka/mod.rs +++ b/test-helpers/src/connection/kafka/mod.rs @@ -1,7 +1,7 @@ use pretty_assertions::assert_eq; use std::{ collections::{HashMap, HashSet}, - time::Duration, + time::{Duration, Instant}, }; #[cfg(feature = "kafka-cpp-driver-tests")] @@ -348,11 +348,56 @@ impl KafkaAdmin { } } - pub async fn describe_topic(&self, topic_name: &str) -> Result { + pub async fn create_topics_and_wait(&self, topics: &[NewTopic<'_>]) { + self.create_topics(topics).await; + + match self { + #[cfg(feature = "kafka-cpp-driver-tests")] + KafkaAdmin::Cpp(_) => { + // rdkafka-rs driver doesnt support describe_topics so just wait instead :/ + tokio::time::sleep(Duration::from_secs(5)).await; + } + KafkaAdmin::Java(_) => { + let instant = Instant::now(); + let topics_to_wait_for: Vec<&str> = topics.iter().map(|x| x.name).collect(); + loop { + if self.are_topics_ready(&topics_to_wait_for).await { + break; + } else { + tokio::time::sleep(Duration::from_millis(1)).await; + if instant.elapsed() > Duration::from_secs(30) { + panic!("Timedout while waiting for created topics to be available. Was waiting for topics {topics_to_wait_for:?}") + } + } + } + } + } + } + + async fn are_topics_ready(&self, topics_to_wait_for: &[&str]) -> bool { match self { #[cfg(feature = "kafka-cpp-driver-tests")] KafkaAdmin::Cpp(_) => unimplemented!(), - KafkaAdmin::Java(java) => java.describe_topic(topic_name).await, + KafkaAdmin::Java(java) => match java.describe_topics(topics_to_wait_for).await { + Ok(topics) => { + for topic in topics { + if topic.partitions.is_empty() { + return false; + } + } + true + } + + Err(_) => false, + }, + } + } + + pub async fn describe_topics(&self, topic_names: &[&str]) -> Result> { + match self { + #[cfg(feature = "kafka-cpp-driver-tests")] + KafkaAdmin::Cpp(_) => unimplemented!(), + KafkaAdmin::Java(java) => java.describe_topics(topic_names).await, } } @@ -494,11 +539,13 @@ pub enum AclPermissionType { #[derive(Debug)] pub struct TopicDescription { - // None of our tests actually make use of the contents of TopicDescription, - // instead they just check if the describe succeeded or failed, - // so this is intentionally left empty for now + pub topic_name: String, + pub partitions: Vec, } +#[derive(Debug)] +pub struct TopicPartitionInfo {} + pub enum OffsetSpec { Earliest, Latest,