diff --git a/shotover-proxy/tests/kafka_int_tests/mod.rs b/shotover-proxy/tests/kafka_int_tests/mod.rs
index fe235f36f..1dabe796c 100644
--- a/shotover-proxy/tests/kafka_int_tests/mod.rs
+++ b/shotover-proxy/tests/kafka_int_tests/mod.rs
@@ -375,6 +375,49 @@ async fn cluster_sasl_scram_over_mtls_single_shotover(#[case] driver: KafkaDrive
         .expect("Shotover did not shutdown within 10s");
 }
 
+#[rstest]
+//#[cfg_attr(feature = "kafka-cpp-driver-tests", case::cpp(KafkaDriver::Cpp))] // CPP driver does not support scram
+#[case::java(KafkaDriver::Java)]
+#[tokio::test(flavor = "multi_thread")] // multi_thread is needed since java driver will block when consuming, causing shotover logs to not appear
+async fn cluster_sasl_scram_over_mtls_multi_shotover(#[case] driver: KafkaDriver) {
+    test_helpers::cert::generate_kafka_test_certs();
+
+    let _docker_compose =
+        docker_compose("tests/test-configs/kafka/cluster-sasl-scram-over-mtls/docker-compose.yaml");
+
+    let mut shotovers = vec![];
+    for i in 1..4 {
+        shotovers.push(
+            shotover_process(&format!(
+                "tests/test-configs/kafka/cluster-sasl-scram-over-mtls/topology{i}.yaml"
+            ))
+            .with_config(&format!(
+                "tests/test-configs/shotover-config/config{i}.yaml"
+            ))
+            .with_log_name(&format!("shotover{i}"))
+            .start()
+            .await,
+        );
+    }
+
+    let connection_builder =
+        KafkaConnectionBuilder::new(driver, "127.0.0.1:9192").use_sasl_scram("user", "password");
+    match driver {
+        #[cfg(feature = "kafka-cpp-driver-tests")]
+        KafkaDriver::Cpp => test_cases::standard_test_suite(connection_builder).await,
+        KafkaDriver::Java => test_cases::minimal_test_suite(connection_builder).await,
+    }
+
+    for shotover in shotovers {
+        tokio::time::timeout(
+            Duration::from_secs(10),
+            shotover.shutdown_and_then_consume_events(&multi_shotover_events(driver)),
+        )
+        .await
+        .expect("Shotover did not shutdown within 10s");
+    }
+}
+
 #[rstest]
 #[cfg_attr(feature = "kafka-cpp-driver-tests", case::cpp(KafkaDriver::Cpp))]
 #[case::java(KafkaDriver::Java)]
diff --git a/shotover-proxy/tests/test-configs/kafka/cluster-sasl-scram-over-mtls/topology-single.yaml b/shotover-proxy/tests/test-configs/kafka/cluster-sasl-scram-over-mtls/topology-single.yaml
index 5ec2ed943..7a1b0fd0a 100644
--- a/shotover-proxy/tests/test-configs/kafka/cluster-sasl-scram-over-mtls/topology-single.yaml
+++ b/shotover-proxy/tests/test-configs/kafka/cluster-sasl-scram-over-mtls/topology-single.yaml
@@ -12,7 +12,7 @@ sources:
             local_shotover_broker_id: 0
             first_contact_points: ["172.16.1.2:9092"]
             authorize_scram_over_mtls:
-              mtls_port_contact_points: ["172.16.1.2:9094", "172.16.1.3:9094", "172.16.1.4:9094"]
+              mtls_port_contact_points: ["172.16.1.2:9094"]
               tls:
                 certificate_authority_path: "tests/test-configs/kafka/tls/certs/localhost_CA.crt"
                 certificate_path: "tests/test-configs/kafka/tls/certs/localhost.crt"
diff --git a/shotover-proxy/tests/test-configs/kafka/cluster-sasl-scram-over-mtls/topology1.yaml b/shotover-proxy/tests/test-configs/kafka/cluster-sasl-scram-over-mtls/topology1.yaml
new file mode 100644
index 000000000..9114fa021
--- /dev/null
+++ b/shotover-proxy/tests/test-configs/kafka/cluster-sasl-scram-over-mtls/topology1.yaml
@@ -0,0 +1,33 @@
+---
+sources:
+  - Kafka:
+      name: "kafka"
+      listen_addr: "127.0.0.1:9191"
+      chain:
+        - KafkaSinkCluster:
+            shotover_nodes:
+              - address: "127.0.0.1:9191"
+                rack: "rack0"
+                broker_id: 0
+              - address: "127.0.0.1:9192"
+                rack: "rack0"
+                broker_id: 1
+              - address: "127.0.0.1:9193"
+                rack: "rack0"
+                broker_id: 2
+            local_shotover_broker_id: 0
+            first_contact_points: ["172.16.1.2:9092"]
["172.16.1.2:9092"] + authorize_scram_over_mtls: + # every shotover node purposefully tests a different number of contact points + mtls_port_contact_points: ["172.16.1.2:9094"] + tls: + certificate_authority_path: "tests/test-configs/kafka/tls/certs/localhost_CA.crt" + certificate_path: "tests/test-configs/kafka/tls/certs/localhost.crt" + private_key_path: "tests/test-configs/kafka/tls/certs/localhost.key" + verify_hostname: true + connect_timeout_ms: 3000 + tls: + certificate_authority_path: "tests/test-configs/kafka/tls/certs/localhost_CA.crt" + certificate_path: "tests/test-configs/kafka/tls/certs/localhost.crt" + private_key_path: "tests/test-configs/kafka/tls/certs/localhost.key" + verify_hostname: true diff --git a/shotover-proxy/tests/test-configs/kafka/cluster-sasl-scram-over-mtls/topology2.yaml b/shotover-proxy/tests/test-configs/kafka/cluster-sasl-scram-over-mtls/topology2.yaml new file mode 100644 index 000000000..e35917ce5 --- /dev/null +++ b/shotover-proxy/tests/test-configs/kafka/cluster-sasl-scram-over-mtls/topology2.yaml @@ -0,0 +1,33 @@ +--- +sources: + - Kafka: + name: "kafka" + listen_addr: "127.0.0.1:9192" + chain: + - KafkaSinkCluster: + shotover_nodes: + - address: "127.0.0.1:9191" + rack: "rack0" + broker_id: 0 + - address: "127.0.0.1:9192" + rack: "rack0" + broker_id: 1 + - address: "127.0.0.1:9193" + rack: "rack0" + broker_id: 2 + local_shotover_broker_id: 0 + first_contact_points: ["172.16.1.2:9092"] + authorize_scram_over_mtls: + # every shotover node purposefully tests a different number of contact points + mtls_port_contact_points: ["172.16.1.2:9094", "172.16.1.3:9094"] + tls: + certificate_authority_path: "tests/test-configs/kafka/tls/certs/localhost_CA.crt" + certificate_path: "tests/test-configs/kafka/tls/certs/localhost.crt" + private_key_path: "tests/test-configs/kafka/tls/certs/localhost.key" + verify_hostname: true + connect_timeout_ms: 3000 + tls: + certificate_authority_path: "tests/test-configs/kafka/tls/certs/localhost_CA.crt" + certificate_path: "tests/test-configs/kafka/tls/certs/localhost.crt" + private_key_path: "tests/test-configs/kafka/tls/certs/localhost.key" + verify_hostname: true diff --git a/shotover-proxy/tests/test-configs/kafka/cluster-sasl-scram-over-mtls/topology3.yaml b/shotover-proxy/tests/test-configs/kafka/cluster-sasl-scram-over-mtls/topology3.yaml new file mode 100644 index 000000000..ec0a04568 --- /dev/null +++ b/shotover-proxy/tests/test-configs/kafka/cluster-sasl-scram-over-mtls/topology3.yaml @@ -0,0 +1,33 @@ +--- +sources: + - Kafka: + name: "kafka" + listen_addr: "127.0.0.1:9193" + chain: + - KafkaSinkCluster: + shotover_nodes: + - address: "127.0.0.1:9191" + rack: "rack0" + broker_id: 0 + - address: "127.0.0.1:9192" + rack: "rack0" + broker_id: 1 + - address: "127.0.0.1:9193" + rack: "rack0" + broker_id: 2 + local_shotover_broker_id: 0 + first_contact_points: ["172.16.1.2:9092"] + authorize_scram_over_mtls: + # every shotover node purposefully tests a different number of contact points + mtls_port_contact_points: ["172.16.1.2:9094", "172.16.1.3:9094", "172.16.1.4:9094"] + tls: + certificate_authority_path: "tests/test-configs/kafka/tls/certs/localhost_CA.crt" + certificate_path: "tests/test-configs/kafka/tls/certs/localhost.crt" + private_key_path: "tests/test-configs/kafka/tls/certs/localhost.key" + verify_hostname: true + connect_timeout_ms: 3000 + tls: + certificate_authority_path: "tests/test-configs/kafka/tls/certs/localhost_CA.crt" + certificate_path: "tests/test-configs/kafka/tls/certs/localhost.crt" + 
+              private_key_path: "tests/test-configs/kafka/tls/certs/localhost.key"
+              verify_hostname: true
diff --git a/shotover/src/transforms/kafka/sink_cluster/scram_over_mtls.rs b/shotover/src/transforms/kafka/sink_cluster/scram_over_mtls.rs
index 6e8e823d8..284218c38 100644
--- a/shotover/src/transforms/kafka/sink_cluster/scram_over_mtls.rs
+++ b/shotover/src/transforms/kafka/sink_cluster/scram_over_mtls.rs
@@ -15,13 +15,13 @@ use kafka_protocol::{
     messages::{
         describe_delegation_token_request::DescribeDelegationTokenOwner, ApiKey,
         CreateDelegationTokenRequest, CreateDelegationTokenResponse,
-        DescribeDelegationTokenRequest, RequestHeader,
+        DescribeDelegationTokenRequest, MetadataRequest, RequestHeader,
     },
     protocol::{Builder, StrBytes},
     ResponseError,
 };
-use rand::rngs::SmallRng;
-use rand::{prelude::SliceRandom, SeedableRng};
+use rand::SeedableRng;
+use rand::{rngs::SmallRng, seq::IteratorRandom};
 use serde::{Deserialize, Serialize};
 use std::{
     collections::HashMap,
@@ -88,22 +88,34 @@ async fn task(
     let mut rng = SmallRng::from_rng(rand::thread_rng())?;
     let mut username_to_token = HashMap::new();
 
-    let mut connections = vec![];
+    let mut nodes = vec![];
     while let Some(request) = rx.recv().await {
         let instant = Instant::now();
 
-        // initialize connections if uninitialized
-        if connections.is_empty() {
+        // initialize nodes if uninitialized
+        if nodes.is_empty() {
             let mut futures = FuturesUnordered::new();
             for address in mtls_addresses {
-                futures.push(
-                    mtls_connection_factory
+                futures.push(async move {
+                    let connection = match mtls_connection_factory
                         // Must be unauthed since mTLS is its own auth.
-                        .create_connection_unauthed(address),
-                );
+                        .create_connection_unauthed(address)
+                        .await
+                    {
+                        Ok(connection) => Some(connection),
+                        Err(err) => {
+                            tracing::error!("Token Task: Failed to create connection for {address:?} during nodes list init {err}");
+                            None
+                        }
+                    };
+                    Node {
+                        connection,
+                        address: address.clone(),
+                    }
+                });
             }
-            while let Some(connection) = futures.next().await {
-                connections.push(connection.context("Failed to create connection")?);
+            while let Some(node) = futures.next().await {
+                nodes.push(node);
             }
         }
 
@@ -118,9 +130,10 @@
             let token = tokio::time::timeout(
                 Duration::from_secs(120),
                 create_delegation_token_for_user_with_wait(
-                    &mut connections,
+                    &mut nodes,
                     username.clone(),
                     &mut rng,
+                    mtls_connection_factory,
                 ),
             )
             .await
@@ -204,14 +217,25 @@ pub enum OriginalScramState {
     AuthSuccess,
 }
 
-pub async fn create_delegation_token_for_user_with_wait(
-    connections: &mut [SinkConnection],
+async fn create_delegation_token_for_user_with_wait(
+    nodes: &mut Vec<Node>,
     username: StrBytes,
     rng: &mut SmallRng,
+    mtls_connection_factory: &ConnectionFactory,
 ) -> Result<DelegationToken> {
-    let create_response = create_delegation_token_for_user(connections, &username, rng).await?;
-    wait_until_delegation_token_ready_on_all_brokers(connections, &create_response, username)
-        .await?;
+    let create_response = create_delegation_token_for_user(nodes, &username, rng).await?;
+    // we specifically run find_new_brokers:
+    // * after token creation since we are waiting for token propagation anyway.
+    // * before waiting on brokers because we need to wait on the entire cluster,
+    //   so we want our node list to be as up to date as possible.
+    find_new_brokers(nodes, rng).await?;
+    wait_until_delegation_token_ready_on_all_brokers(
+        nodes,
+        &create_response,
+        username,
+        mtls_connection_factory,
+    )
+    .await?;
 
     Ok(DelegationToken {
         token_id: create_response.token_id.as_str().to_owned(),
@@ -219,12 +243,81 @@ pub async fn create_delegation_token_for_user_with_wait(
     })
 }
 
-pub async fn create_delegation_token_for_user(
-    connections: &mut [SinkConnection],
+/// Find brokers that are not yet in the nodes list and add them to it.
+/// If no nodes have a connection open an error will be returned.
+async fn find_new_brokers(nodes: &mut Vec<Node>, rng: &mut SmallRng) -> Result<()> {
+    let Some(node) = nodes
+        .iter_mut()
+        .filter(|node| node.connection.is_some())
+        .choose(rng)
+    else {
+        return Err(anyhow!("No nodes have an open connection"));
+    };
+    let connection = node
+        .connection
+        .as_mut()
+        .expect("Guaranteed due to above filter");
+
+    let request = Message::from_frame(Frame::Kafka(KafkaFrame::Request {
+        header: RequestHeader::builder()
+            .request_api_key(ApiKey::MetadataKey as i16)
+            .request_api_version(4)
+            .correlation_id(0)
+            .build()
+            .unwrap(),
+        body: RequestBody::Metadata(MetadataRequest::builder().build().unwrap()),
+    }));
+    connection.send(vec![request])?;
+
+    let response = connection.recv().await?.remove(0);
+    match response.into_frame() {
+        Some(Frame::Kafka(KafkaFrame::Response {
+            body: ResponseBody::Metadata(metadata),
+            ..
+        })) => {
+            let new_nodes: Vec<Node> = metadata
+                .brokers
+                .into_values()
+                .filter_map(|broker| {
+                    let address = KafkaAddress::new(broker.host, broker.port);
+                    if nodes.iter().any(|node| node.address == address) {
+                        None
+                    } else {
+                        Some(Node {
+                            address,
+                            connection: None,
+                        })
+                    }
+                })
+                .collect();
+            nodes.extend(new_nodes);
+            Ok(())
+        }
+        other => Err(anyhow!(
+            "Unexpected message returned to metadata request {other:?}"
+        )),
+    }
+}
+
+/// Create a delegation token for the provided user.
+/// If no nodes have a connection open an error will be returned.
+async fn create_delegation_token_for_user(
+    nodes: &mut [Node],
     username: &StrBytes,
     rng: &mut SmallRng,
 ) -> Result<CreateDelegationTokenResponse> {
-    let connection = connections.choose_mut(rng).unwrap();
+    let Some(node) = nodes
+        .iter_mut()
+        .filter(|node| node.connection.is_some())
+        .choose(rng)
+    else {
+        return Err(anyhow!("No nodes have an open connection"));
+    };
+    let connection = node
+        .connection
+        .as_mut()
+        .expect("Guaranteed due to above filter");
+
     connection.send(vec![Message::from_frame(Frame::Kafka(
         KafkaFrame::Request {
             header: RequestHeader::builder()
@@ -262,22 +355,42 @@ pub async fn create_delegation_token_for_user(
     }
 }
 
+/// Wait until the delegation token is ready on all brokers.
+/// Will create connections for all nodes that don't have one yet.
+/// If a broker is inaccessible it will count as ready, to prevent a single node going down from blocking delegation token creation.
 async fn wait_until_delegation_token_ready_on_all_brokers(
-    connections: &mut [SinkConnection],
+    nodes: &mut [Node],
     create_response: &CreateDelegationTokenResponse,
     username: StrBytes,
+    mtls_connection_factory: &ConnectionFactory,
 ) -> Result<()> {
-    let connections_len = connections.len();
-    for (i, connection) in connections.iter_mut().enumerate() {
-        while !is_delegation_token_ready(connection, create_response, username.clone())
-            .await
-            .with_context(|| {
-                format!("Failed to check delegation token was ready. Succesful connections {i}/{connections_len}")
-            })?
-        {
-            tokio::time::sleep(Duration::from_millis(10)).await;
+    let nodes_len = nodes.len();
+    for (i, node) in nodes.iter_mut().enumerate() {
+        let address = &node.address;
+        if node.connection.is_none() {
+            node.connection = match mtls_connection_factory
+                // Must be unauthed since mTLS is its own auth.
+                .create_connection_unauthed(address)
+                .await
+            {
+                Ok(connection) => Some(connection),
+                Err(err) => {
+                    tracing::error!("Token Task: Failed to create connection for {address:?} during token wait {err}");
+                    None
+                }
+            };
+        }
+        if let Some(connection) = &mut node.connection {
+            while !is_delegation_token_ready(connection, create_response, username.clone())
+                .await
+                .with_context(|| {
+                    format!("Failed to check delegation token was ready on broker {address:?}. Successful connections {i}/{nodes_len}")
+                })?
+            {
+                tokio::time::sleep(Duration::from_millis(10)).await;
+            }
+            tracing::debug!("finished checking token is ready on broker {address:?}");
         }
-        tracing::debug!("finished checking token is ready on connection");
     }
     Ok(())
 }
@@ -340,6 +453,11 @@ async fn is_delegation_token_ready(
     }
 }
 
+struct Node {
+    address: KafkaAddress,
+    connection: Option<SinkConnection>,
+}
+
 #[derive(Clone)]
 pub struct DelegationToken {
     pub token_id: String,