Skip to content

Commit

Permalink
KafkaSinkCluster scram_over_mtls - full integration tests (#1651)
Browse files Browse the repository at this point in the history
  • Loading branch information
rukai authored Jun 13, 2024
1 parent 796e346 commit 9f89390
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 4 deletions.
34 changes: 34 additions & 0 deletions shotover-proxy/tests/kafka_int_tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use pretty_assertions::assert_eq;
use rstest::rstest;
use std::time::Duration;
use std::time::Instant;
use test_cases::produce_consume_partitions1;
use test_cases::{assert_topic_creation_is_denied_due_to_acl, setup_basic_user_acls};
use test_helpers::connection::kafka::{KafkaConnectionBuilder, KafkaDriver};
use test_helpers::docker_compose::docker_compose;
Expand Down Expand Up @@ -390,6 +391,39 @@ async fn cluster_sasl_scram_over_mtls_single_shotover(#[case] driver: KafkaDrive
.await
.expect("Shotover did not shutdown within 10s");
}

// rerun same tests as before with different ordering
{
let shotover = shotover_process(
"tests/test-configs/kafka/cluster-sasl-scram-over-mtls/topology-single.yaml",
)
.start()
.await;

// admin requests sent by regular user are unsuccessful
assert_connection_fails_with_incorrect_password(driver, "basic_user").await;
let connection_basic = KafkaConnectionBuilder::new(driver, "127.0.0.1:9192")
.use_sasl_scram("basic_user", "basic_password");
assert_topic_creation_is_denied_due_to_acl(&connection_basic).await;
assert_connection_fails_with_incorrect_password(driver, "basic_user").await;

// admin requests sent by admin user are successful
// admin requests sent by regular user remain unsuccessful
let connection_super = KafkaConnectionBuilder::new(driver, "127.0.0.1:9192")
.use_sasl_scram("super_user", "super_password");
produce_consume_partitions1(&connection_super, "c3220ff0-9390-425d-a56d-9d880a339c8c")
.await;
assert_topic_creation_is_denied_due_to_acl(&connection_basic).await;
assert_connection_fails_with_incorrect_password(driver, "basic_user").await;
assert_connection_fails_with_incorrect_password(driver, "super_user").await;

tokio::time::timeout(
Duration::from_secs(10),
shotover.shutdown_and_then_consume_events(&[]),
)
.await
.expect("Shotover did not shutdown within 10s");
}
}

async fn assert_connection_fails_with_incorrect_password(driver: KafkaDriver, username: &str) {
Expand Down
11 changes: 7 additions & 4 deletions shotover/src/transforms/kafka/sink_cluster/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1603,17 +1603,20 @@ routing message to a random node so that:
}

async fn add_node_if_new(&mut self, new_node: KafkaNode) {
let new = self
let missing_from_shared = self
.nodes_shared
.read()
.await
.iter()
.all(|node| node.broker_id != new_node.broker_id);
if new {
if missing_from_shared {
self.nodes_shared.write().await.push(new_node);

self.update_local_nodes().await;
}

// We need to run this every time, not just when missing_from_shared.
// This is because nodes_shared could already contain new_node while its missing from `self.nodes`.
// This could happen when another KafkaSinkCluster instance updates nodes_shared just before we read from it.
self.update_local_nodes().await;
}

fn broker_within_rack(&self, broker_id: BrokerId) -> bool {
Expand Down
4 changes: 4 additions & 0 deletions shotover/src/transforms/kafka/sink_cluster/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use crate::tls::TlsConnector;
use crate::transforms::kafka::sink_cluster::SASL_SCRAM_MECHANISMS;
use anyhow::{anyhow, Context, Result};
use bytes::Bytes;
use derivative::Derivative;
use kafka_protocol::messages::{ApiKey, BrokerId, RequestHeader, SaslAuthenticateRequest};
use kafka_protocol::protocol::{Builder, StrBytes};
use kafka_protocol::ResponseError;
Expand Down Expand Up @@ -256,10 +257,13 @@ impl KafkaAddress {
}
}

#[derive(Derivative)]
#[derivative(Debug)]
pub struct KafkaNode {
pub broker_id: BrokerId,
pub rack: Option<StrBytes>,
pub kafka_address: KafkaAddress,
#[derivative(Debug = "ignore")]
connection: Option<SinkConnection>,
}

Expand Down

0 comments on commit 9f89390

Please sign in to comment.