diff --git a/shotover-proxy/tests/kafka_int_tests/mod.rs b/shotover-proxy/tests/kafka_int_tests/mod.rs
index 55836542c..cd273c1ff 100644
--- a/shotover-proxy/tests/kafka_int_tests/mod.rs
+++ b/shotover-proxy/tests/kafka_int_tests/mod.rs
@@ -404,8 +404,8 @@ async fn cluster_2_racks_multi_shotover(#[case] driver: KafkaDriver) {
 #[cfg_attr(feature = "kafka-cpp-driver-tests", case::cpp(KafkaDriver::Cpp))]
 #[case::java(KafkaDriver::Java)]
 #[tokio::test(flavor = "multi_thread")] // multi_thread is needed since java driver will block when consuming, causing shotover logs to not appear
-async fn cluster_2_racks_multi_shotover_offline(#[case] driver: KafkaDriver) {
-    let docker_compose =
+async fn cluster_2_racks_multi_shotover_one_shotover_node_goes_down(#[case] driver: KafkaDriver) {
+    let _docker_compose =
         docker_compose("tests/test-configs/kafka/cluster-2-racks/docker-compose.yaml");
 
     // One shotover instance per rack
@@ -425,15 +425,17 @@ async fn cluster_2_racks_multi_shotover_offline(#[case] driver: KafkaDriver) {
     }
 
     let connection_builder = KafkaConnectionBuilder::new(driver, "127.0.0.1:9192");
+    test_cases::cluster_test_suite(&connection_builder).await;
 
-    test_cases::produce_consume_partitions1_shotover_node_goes_down(
-        driver,
-        &docker_compose,
-        &connection_builder,
-        "shotover_node_goes_down_test",
-        "shotover1",
+    tokio::time::timeout(
+        Duration::from_secs(10),
+        shotovers.remove(0).shutdown_and_then_consume_events(&[]),
     )
-    .await;
+    .await
+    .expect("Shotover did not shutdown within 10s");
+
+    // Wait for the other shotover node to detect the down peer
+    tokio::time::sleep(Duration::from_secs(10)).await;
 
     for shotover in shotovers {
         tokio::time::timeout(
diff --git a/shotover-proxy/tests/kafka_int_tests/test_cases.rs b/shotover-proxy/tests/kafka_int_tests/test_cases.rs
index b22d1e4b5..42414fda4 100644
--- a/shotover-proxy/tests/kafka_int_tests/test_cases.rs
+++ b/shotover-proxy/tests/kafka_int_tests/test_cases.rs
@@ -539,102 +539,6 @@ pub async fn produce_consume_partitions1_kafka_node_goes_down(
     }
 }
 
-pub async fn produce_consume_partitions1_shotover_node_goes_down(
-    driver: KafkaDriver,
-    docker_compose: &DockerCompose,
-    connection_builder: &KafkaConnectionBuilder,
-    topic_name: &str,
-    shotover_node_to_kill: &str,
-) {
-    if driver.is_cpp() {
-        return;
-    }
-    {
-        let admin = connection_builder.connect_admin().await;
-        admin
-            .create_topics(&[NewTopic {
-                name: topic_name,
-                num_partitions: 1,
-                replication_factor: 3,
-            }])
-            .await;
-    }
-
-    {
-        let producer = connection_builder.connect_producer("all", 0).await;
-        // create an initial record to force kafka to create the topic if it doesnt yet exist
-        producer
-            .assert_produce(
-                Record {
-                    payload: "initial",
-                    topic_name,
-                    key: Some("Key".into()),
-                },
-                Some(0),
-            )
-            .await;
-
-        let mut consumer = connection_builder
-            .connect_consumer(
-                ConsumerConfig::consume_from_topic(topic_name.to_owned())
-                    .with_group("shotover_node_goes_down_test_group"),
-            )
-            .await;
-        consumer
-            .assert_consume(ExpectedResponse {
-                message: "initial".to_owned(),
-                key: Some("Key".to_owned()),
-                topic_name: topic_name.to_owned(),
-                offset: Some(0),
-            })
-            .await;
-
-        docker_compose.kill_service(shotover_node_to_kill);
-
-        // create and consume records
-        for i in 0..5 {
-            producer
-                .assert_produce(
-                    Record {
-                        payload: "Message1",
-                        topic_name,
-                        key: Some("Key".into()),
-                    },
-                    Some(i * 2 + 1),
-                )
-                .await;
-            producer
-                .assert_produce(
-                    Record {
-                        payload: "Message2",
-                        topic_name,
-                        key: None,
-                    },
-                    Some(i * 2 + 2),
-                )
-                .await;
-
-            consumer
-                .assert_consume(ExpectedResponse {
-                    message: "Message1".to_owned(),
-                    key: Some("Key".to_owned()),
-                    topic_name: topic_name.to_owned(),
-                    offset: Some(i * 2 + 1),
-                })
-                .await;
-
-            consumer
-                .assert_consume(ExpectedResponse {
-                    message: "Message2".to_owned(),
-                    key: None,
-                    topic_name: topic_name.to_owned(),
-                    offset: Some(i * 2 + 2),
-                })
-                .await;
-        }
-    }
-}
-
 pub async fn produce_consume_commit_offsets_partitions1(
     connection_builder: &KafkaConnectionBuilder,
     topic_name: &str,
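
Reviewer note: after this patch, cluster_2_racks_multi_shotover_one_shotover_node_goes_down runs the shared test_cases::cluster_test_suite, shuts down the first shotover instance under a 10 second deadline, sleeps 10 seconds so the surviving rack's instance can detect the down peer, and then shuts down the remaining instances via the pre-existing loop; the bespoke produce_consume_partitions1_shotover_node_goes_down helper it previously called is deleted from test_cases.rs as now unused. For reviewers unfamiliar with the deadline-plus-grace-period pattern, here is a minimal self-contained sketch; it assumes only tokio with the "full" feature enabled, and fake_shutdown is a hypothetical stand-in for shotover's shutdown_and_then_consume_events(&[]) test helper, not an API from this repo:

    use std::time::Duration;

    // Hypothetical stand-in for `shutdown_and_then_consume_events(&[])`:
    // some async shutdown that is expected to complete quickly.
    async fn fake_shutdown() {
        tokio::time::sleep(Duration::from_millis(100)).await;
    }

    #[tokio::main]
    async fn main() {
        // Race the shutdown against a 10s deadline; `tokio::time::timeout`
        // returns Err(Elapsed) if the future does not finish in time, which
        // `expect` turns into a test failure with a readable message.
        tokio::time::timeout(Duration::from_secs(10), fake_shutdown())
            .await
            .expect("Shotover did not shutdown within 10s");

        // Grace period so a surviving peer has time to observe the node as down.
        tokio::time::sleep(Duration::from_secs(10)).await;
    }

The expect on the timeout keeps a hung shutdown from wedging CI, while the fixed sleep trades ten seconds of test runtime for determinism in peer-down detection.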