diff --git a/shotover-proxy/tests/kafka_int_tests/mod.rs b/shotover-proxy/tests/kafka_int_tests/mod.rs index 589236694..bff1272ac 100644 --- a/shotover-proxy/tests/kafka_int_tests/mod.rs +++ b/shotover-proxy/tests/kafka_int_tests/mod.rs @@ -5,6 +5,7 @@ use pretty_assertions::assert_eq; use rstest::rstest; use std::time::Duration; use std::time::Instant; +use test_cases::produce_consume_partitions1; use test_cases::{assert_topic_creation_is_denied_due_to_acl, setup_basic_user_acls}; use test_helpers::connection::kafka::{KafkaConnectionBuilder, KafkaDriver}; use test_helpers::docker_compose::docker_compose; @@ -390,6 +391,39 @@ async fn cluster_sasl_scram_over_mtls_single_shotover(#[case] driver: KafkaDrive .await .expect("Shotover did not shutdown within 10s"); } + + // rerun same tests as before with different ordering + { + let shotover = shotover_process( + "tests/test-configs/kafka/cluster-sasl-scram-over-mtls/topology-single.yaml", + ) + .start() + .await; + + // admin requests sent by regular user are unsuccessful + assert_connection_fails_with_incorrect_password(driver, "basic_user").await; + let connection_basic = KafkaConnectionBuilder::new(driver, "127.0.0.1:9192") + .use_sasl_scram("basic_user", "basic_password"); + assert_topic_creation_is_denied_due_to_acl(&connection_basic).await; + assert_connection_fails_with_incorrect_password(driver, "basic_user").await; + + // admin requests sent by admin user are successful + // admin requests sent by regular user remain unsuccessful + let connection_super = KafkaConnectionBuilder::new(driver, "127.0.0.1:9192") + .use_sasl_scram("super_user", "super_password"); + produce_consume_partitions1(&connection_super, "c3220ff0-9390-425d-a56d-9d880a339c8c") + .await; + assert_topic_creation_is_denied_due_to_acl(&connection_basic).await; + assert_connection_fails_with_incorrect_password(driver, "basic_user").await; + assert_connection_fails_with_incorrect_password(driver, "super_user").await; + + tokio::time::timeout( + Duration::from_secs(10), + shotover.shutdown_and_then_consume_events(&[]), + ) + .await + .expect("Shotover did not shutdown within 10s"); + } } async fn assert_connection_fails_with_incorrect_password(driver: KafkaDriver, username: &str) { diff --git a/shotover/src/transforms/kafka/sink_cluster/mod.rs b/shotover/src/transforms/kafka/sink_cluster/mod.rs index 88c3184b0..dae3d7db9 100644 --- a/shotover/src/transforms/kafka/sink_cluster/mod.rs +++ b/shotover/src/transforms/kafka/sink_cluster/mod.rs @@ -1603,17 +1603,20 @@ routing message to a random node so that: } async fn add_node_if_new(&mut self, new_node: KafkaNode) { - let new = self + let missing_from_shared = self .nodes_shared .read() .await .iter() .all(|node| node.broker_id != new_node.broker_id); - if new { + if missing_from_shared { self.nodes_shared.write().await.push(new_node); - - self.update_local_nodes().await; } + + // We need to run this every time, not just when missing_from_shared. + // This is because nodes_shared could already contain new_node while its missing from `self.nodes`. + // This could happen when another KafkaSinkCluster instance updates nodes_shared just before we read from it. + self.update_local_nodes().await; } fn broker_within_rack(&self, broker_id: BrokerId) -> bool { diff --git a/shotover/src/transforms/kafka/sink_cluster/node.rs b/shotover/src/transforms/kafka/sink_cluster/node.rs index 84011703a..a814294f4 100644 --- a/shotover/src/transforms/kafka/sink_cluster/node.rs +++ b/shotover/src/transforms/kafka/sink_cluster/node.rs @@ -8,6 +8,7 @@ use crate::tls::TlsConnector; use crate::transforms::kafka::sink_cluster::SASL_SCRAM_MECHANISMS; use anyhow::{anyhow, Context, Result}; use bytes::Bytes; +use derivative::Derivative; use kafka_protocol::messages::{ApiKey, BrokerId, RequestHeader, SaslAuthenticateRequest}; use kafka_protocol::protocol::{Builder, StrBytes}; use kafka_protocol::ResponseError; @@ -256,10 +257,13 @@ impl KafkaAddress { } } +#[derive(Derivative)] +#[derivative(Debug)] pub struct KafkaNode { pub broker_id: BrokerId, pub rack: Option, pub kafka_address: KafkaAddress, + #[derivative(Debug = "ignore")] connection: Option, }