From 6003851c6a42832839bb9fb8a017c355d2d8a4df Mon Sep 17 00:00:00 2001 From: Lucas Kent Date: Tue, 11 Jun 2024 12:43:41 +1000 Subject: [PATCH] Replace smoke_test with produce_consume_partitions1 --- shotover-proxy/tests/kafka_int_tests/mod.rs | 5 +++-- .../tests/kafka_int_tests/test_cases.rs | 17 ----------------- .../src/transforms/kafka/sink_cluster/mod.rs | 11 +++++++---- 3 files changed, 10 insertions(+), 23 deletions(-) diff --git a/shotover-proxy/tests/kafka_int_tests/mod.rs b/shotover-proxy/tests/kafka_int_tests/mod.rs index bc2abc6d0..bff1272ac 100644 --- a/shotover-proxy/tests/kafka_int_tests/mod.rs +++ b/shotover-proxy/tests/kafka_int_tests/mod.rs @@ -5,7 +5,7 @@ use pretty_assertions::assert_eq; use rstest::rstest; use std::time::Duration; use std::time::Instant; -use test_cases::smoke_test; +use test_cases::produce_consume_partitions1; use test_cases::{assert_topic_creation_is_denied_due_to_acl, setup_basic_user_acls}; use test_helpers::connection::kafka::{KafkaConnectionBuilder, KafkaDriver}; use test_helpers::docker_compose::docker_compose; @@ -411,7 +411,8 @@ async fn cluster_sasl_scram_over_mtls_single_shotover(#[case] driver: KafkaDrive // admin requests sent by regular user remain unsuccessful let connection_super = KafkaConnectionBuilder::new(driver, "127.0.0.1:9192") .use_sasl_scram("super_user", "super_password"); - smoke_test(&connection_super).await; + produce_consume_partitions1(&connection_super, "c3220ff0-9390-425d-a56d-9d880a339c8c") + .await; assert_topic_creation_is_denied_due_to_acl(&connection_basic).await; assert_connection_fails_with_incorrect_password(driver, "basic_user").await; assert_connection_fails_with_incorrect_password(driver, "super_user").await; diff --git a/shotover-proxy/tests/kafka_int_tests/test_cases.rs b/shotover-proxy/tests/kafka_int_tests/test_cases.rs index 21712bc4e..1578da805 100644 --- a/shotover-proxy/tests/kafka_int_tests/test_cases.rs +++ b/shotover-proxy/tests/kafka_int_tests/test_cases.rs @@ -290,20 +290,3 @@ pub async fn assert_topic_creation_is_denied_due_to_acl(connection: &KafkaConnec "org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host this topic-partition.\n" ) } - -// A quick test without running the whole test suite -// Assumes that admin_setup or standard_test_suite has already been run. -pub async fn smoke_test(connection_builder: &KafkaConnectionBuilder) { - let producer = connection_builder.connect_producer(1).await; - - producer - .assert_produce( - Record { - payload: "initial", - topic_name: "partitions1", - key: Some("Key"), - }, - None, - ) - .await; -} diff --git a/shotover/src/transforms/kafka/sink_cluster/mod.rs b/shotover/src/transforms/kafka/sink_cluster/mod.rs index 88c3184b0..08b3120e2 100644 --- a/shotover/src/transforms/kafka/sink_cluster/mod.rs +++ b/shotover/src/transforms/kafka/sink_cluster/mod.rs @@ -1603,17 +1603,20 @@ routing message to a random node so that: } async fn add_node_if_new(&mut self, new_node: KafkaNode) { - let new = self + let missing_from_shared = self .nodes_shared .read() .await .iter() .all(|node| node.broker_id != new_node.broker_id); - if new { + if missing_from_shared { self.nodes_shared.write().await.push(new_node); - - self.update_local_nodes().await; } + + // We need to run this every time, not just when `missing_from_shared`. + // This is because nodes_shared could already contain this node while its missing from local nodes. + // This could happen when another KafkaSinkCluster instance updates nodes_shared just before we read from it. + self.update_local_nodes().await; } fn broker_within_rack(&self, broker_id: BrokerId) -> bool {