diff --git a/docs/src/sources.md b/docs/src/sources.md index 1a77d4557..29a3794d1 100644 --- a/docs/src/sources.md +++ b/docs/src/sources.md @@ -105,6 +105,17 @@ Kafka: # If not provided defaults to false hard_connection_limit: false + # When this field is provided TLS is used when the client connects to Shotover. + # Removing this field will disable TLS. + #tls: + # # Path to the certificate file, typically named with a .crt extension. + # certificate_path: "tls/localhost.crt" + # # Path to the private key file, typically named with a .key extension. + # private_key_path: "tls/localhost.key" + # # Path to the certificate authority file, typically named with a .crt extension. + # # When this field is provided client authentication will be enabled. + # #certificate_authority_path: "tls/localhost_CA.crt" + # Timeout in seconds after which to terminate an idle connection. This field is optional, if not provided, idle connections will never be terminated. # timeout: 60 diff --git a/docs/src/transforms.md b/docs/src/transforms.md index 801a96ba6..33bad3387 100644 --- a/docs/src/transforms.md +++ b/docs/src/transforms.md @@ -308,6 +308,18 @@ This is achieved by rewriting the FindCoordinator, Metadata and DescribeCluster # This field is optional, if not provided, timeout will never occur. # When a timeout occurs the connection to the client is immediately closed. # read_timeout: 60 + + # When this field is provided TLS is used when connecting to the remote address. + # Removing this field will disable TLS. + #tls: + # # Path to the certificate authority file, typically named with a .crt extension. + # certificate_authority_path: "tls/localhost_CA.crt" + # # Path to the certificate file, typically named with a .crt extension. + # certificate_path: "tls/localhost.crt" + # # Path to the private key file, typically named with a .key extension. + # private_key_path: "tls/localhost.key" + # # Enable/disable verifying the hostname of the certificate provided by the destination. + # #verify_hostname: true ``` ### KafkaSinkSingle @@ -332,6 +344,18 @@ In order to force clients to connect through shotover the FindCoordinator, Metad # This field is optional, if not provided, timeout will never occur. # When a timeout occurs the connection to the client is immediately closed. # read_timeout: 60 + + # When this field is provided TLS is used when connecting to the remote address. + # Removing this field will disable TLS. + #tls: + # # Path to the certificate authority file, typically named with a .crt extension. + # certificate_authority_path: "tls/localhost_CA.crt" + # # Path to the certificate file, typically named with a .crt extension. + # certificate_path: "tls/localhost.crt" + # # Path to the private key file, typically named with a .key extension. + # private_key_path: "tls/localhost.key" + # # Enable/disable verifying the hostname of the certificate provided by the destination. + # #verify_hostname: true ``` This transfrom emits a metrics [counter](user-guide/observability.md#counter) named `failed_requests` and the labels `transform` defined as `CassandraSinkSingle` and `chain` as the name of the chain that this transform is in. diff --git a/shotover-proxy/benches/windsock/kafka/bench.rs b/shotover-proxy/benches/windsock/kafka/bench.rs index dbf396a65..1e34cc7fd 100644 --- a/shotover-proxy/benches/windsock/kafka/bench.rs +++ b/shotover-proxy/benches/windsock/kafka/bench.rs @@ -93,6 +93,7 @@ impl KafkaBench { read_timeout: None, first_contact_points: vec![kafka_address], shotover_nodes: vec![host_address.clone()], + tls: None, }), }); common::generate_topology(SourceConfig::Kafka(shotover::sources::kafka::KafkaConfig { diff --git a/shotover-proxy/tests/kafka_int_tests/mod.rs b/shotover-proxy/tests/kafka_int_tests/mod.rs index e6ff541f2..ae8f619ad 100644 --- a/shotover-proxy/tests/kafka_int_tests/mod.rs +++ b/shotover-proxy/tests/kafka_int_tests/mod.rs @@ -53,6 +53,28 @@ async fn passthrough_tls() { .expect("Shotover did not shutdown within 10s"); } +#[cfg(feature = "rdkafka-driver-tests")] +#[tokio::test] +async fn cluster_tls() { + test_helpers::cert::generate_kafka_test_certs(); + + let _docker_compose = + docker_compose("tests/test-configs/kafka/cluster-tls/docker-compose.yaml"); + let shotover = shotover_process("tests/test-configs/kafka/cluster-tls/topology.yaml") + .start() + .await; + + let connection_builder = KafkaConnectionBuilder::new("127.0.0.1:9192"); + test_cases::basic(connection_builder).await; + + tokio::time::timeout( + Duration::from_secs(10), + shotover.shutdown_and_then_consume_events(&[]), + ) + .await + .expect("Shotover did not shutdown within 10s"); +} + #[cfg(feature = "rdkafka-driver-tests")] #[tokio::test] async fn passthrough_encode() { diff --git a/shotover-proxy/tests/test-configs/kafka/cluster-tls/docker-compose.yaml b/shotover-proxy/tests/test-configs/kafka/cluster-tls/docker-compose.yaml new file mode 100644 index 000000000..7e019385c --- /dev/null +++ b/shotover-proxy/tests/test-configs/kafka/cluster-tls/docker-compose.yaml @@ -0,0 +1,54 @@ +version: "3" +networks: + cluster_subnet: + name: cluster_subnet + driver: bridge + ipam: + driver: default + config: + - subnet: 172.16.1.0/24 + gateway: 172.16.1.1 +services: + kafka0: + image: &image 'bitnami/kafka:3.6.1-debian-11-r24' + networks: + cluster_subnet: + ipv4_address: 172.16.1.2 + environment: &environment + KAFKA_CFG_LISTENERS: "BROKER://:9092,CONTROLLER://:9093" + KAFKA_CFG_ADVERTISED_LISTENERS: "BROKER://172.16.1.2:9092" + KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: "CONTROLLER:PLAINTEXT,BROKER:SSL" + KAFKA_CFG_INTER_BROKER_LISTENER_NAME: "BROKER" + KAFKA_CFG_CONTROLLER_LISTENER_NAMES: "CONTROLLER" + KAFKA_CFG_PROCESS_ROLES: "controller,broker" + KAFKA_KRAFT_CLUSTER_ID: "abcdefghijklmnopqrstuv" + KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: "0@kafka0:9093,1@kafka1:9093,2@kafka2:9093" + KAFKA_CFG_NODE_ID: 0 + KAFKA_CERTIFICATE_PASSWORD: password + KAFKA_TLS_CLIENT_AUTH: none + volumes: &volumes + - type: tmpfs + target: /bitnami/kafka + - type: bind + source: "../tls/certs" + target: "/opt/bitnami/kafka/config/certs" + kafka1: + image: *image + networks: + cluster_subnet: + ipv4_address: 172.16.1.3 + environment: + <<: *environment + KAFKA_CFG_ADVERTISED_LISTENERS: "BROKER://172.16.1.3:9092" + KAFKA_CFG_NODE_ID: 1 + volumes: *volumes + kafka2: + image: *image + networks: + cluster_subnet: + ipv4_address: 172.16.1.4 + environment: + <<: *environment + KAFKA_CFG_ADVERTISED_LISTENERS: "BROKER://172.16.1.4:9092" + KAFKA_CFG_NODE_ID: 2 + volumes: *volumes diff --git a/shotover-proxy/tests/test-configs/kafka/cluster-tls/topology.yaml b/shotover-proxy/tests/test-configs/kafka/cluster-tls/topology.yaml new file mode 100644 index 000000000..1d4a8b514 --- /dev/null +++ b/shotover-proxy/tests/test-configs/kafka/cluster-tls/topology.yaml @@ -0,0 +1,13 @@ +--- +sources: + - Kafka: + name: "kafka" + listen_addr: "127.0.0.1:9192" + chain: + - KafkaSinkCluster: + shotover_nodes: ["127.0.0.1:9192"] + first_contact_points: ["172.16.1.2:9092"] + connect_timeout_ms: 3000 + tls: + certificate_authority_path: "tests/test-configs/kafka/tls/certs/localhost_CA.crt" + verify_hostname: true diff --git a/shotover-proxy/tests/test-configs/kafka/passthrough-tls/docker-compose.yaml b/shotover-proxy/tests/test-configs/kafka/passthrough-tls/docker-compose.yaml index d4c38e582..cfb2f4f93 100644 --- a/shotover-proxy/tests/test-configs/kafka/passthrough-tls/docker-compose.yaml +++ b/shotover-proxy/tests/test-configs/kafka/passthrough-tls/docker-compose.yaml @@ -13,7 +13,6 @@ services: - KAFKA_CFG_PROCESS_ROLES=controller,broker - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka0:9093 - KAFKA_CFG_NODE_ID=0 - - ALLOW_PLAINTEXT_LISTENER=yes - KAFKA_CERTIFICATE_PASSWORD=password - KAFKA_TLS_CLIENT_AUTH=none volumes: diff --git a/shotover/src/transforms/kafka/sink_cluster.rs b/shotover/src/transforms/kafka/sink_cluster.rs index 276eeb330..1ac0f8fc9 100644 --- a/shotover/src/transforms/kafka/sink_cluster.rs +++ b/shotover/src/transforms/kafka/sink_cluster.rs @@ -4,6 +4,7 @@ use crate::frame::kafka::{strbytes, KafkaFrame, RequestBody, ResponseBody}; use crate::frame::Frame; use crate::message::{Message, Messages}; use crate::tcp; +use crate::tls::{TlsConnector, TlsConnectorConfig}; use crate::transforms::util::cluster_connection_pool::{spawn_read_write_tasks, Connection}; use crate::transforms::util::{Request, Response}; use crate::transforms::{Transform, TransformBuilder, Wrapper}; @@ -29,6 +30,7 @@ use std::net::SocketAddr; use std::sync::atomic::AtomicI64; use std::sync::Arc; use std::time::Duration; +use tokio::io::split; use tokio::sync::{mpsc, oneshot, RwLock}; use tokio::time::timeout; @@ -39,6 +41,7 @@ pub struct KafkaSinkClusterConfig { pub shotover_nodes: Vec, pub connect_timeout_ms: u64, pub read_timeout: Option, + pub tls: Option, } const NAME: &str = "KafkaSinkCluster"; @@ -49,12 +52,14 @@ impl TransformConfig for KafkaSinkClusterConfig { &self, transform_context: TransformContextConfig, ) -> Result> { + let tls = self.tls.clone().map(TlsConnector::new).transpose()?; Ok(Box::new(KafkaSinkClusterBuilder::new( self.first_contact_points.clone(), self.shotover_nodes.clone(), transform_context.chain_name, self.connect_timeout_ms, self.read_timeout, + tls, ))) } } @@ -69,6 +74,7 @@ pub struct KafkaSinkClusterBuilder { group_to_coordinator_broker: Arc>, topics: Arc>, nodes_shared: Arc>>, + tls: Option, } impl KafkaSinkClusterBuilder { @@ -78,6 +84,7 @@ impl KafkaSinkClusterBuilder { _chain_name: String, connect_timeout_ms: u64, timeout: Option, + tls: Option, ) -> KafkaSinkClusterBuilder { let receive_timeout = timeout.map(Duration::from_secs); @@ -101,6 +108,7 @@ impl KafkaSinkClusterBuilder { group_to_coordinator_broker: Arc::new(DashMap::new()), topics: Arc::new(DashMap::new()), nodes_shared: Arc::new(RwLock::new(vec![])), + tls, } } } @@ -119,6 +127,7 @@ impl TransformBuilder for KafkaSinkClusterBuilder { group_to_coordinator_broker: self.group_to_coordinator_broker.clone(), topics: self.topics.clone(), rng: SmallRng::from_rng(rand::thread_rng()).unwrap(), + tls: self.tls.clone(), }) } @@ -165,6 +174,7 @@ pub struct KafkaSinkCluster { group_to_coordinator_broker: Arc>, topics: Arc>, rng: SmallRng, + tls: Option, } #[async_trait] @@ -336,8 +346,11 @@ impl KafkaSinkCluster { as usize]; for node in &mut self.nodes { if node.broker_id == partition.leader_id { - connection = - Some(node.get_connection(self.connect_timeout).await?.clone()); + connection = Some( + node.get_connection(self.connect_timeout, &self.tls) + .await? + .clone(), + ); } } } @@ -348,7 +361,7 @@ impl KafkaSinkCluster { self.nodes .choose_mut(&mut self.rng) .unwrap() - .get_connection(self.connect_timeout) + .get_connection(self.connect_timeout, &self.tls) .await? .clone() } @@ -388,7 +401,7 @@ impl KafkaSinkCluster { .filter(|node| partition.replica_nodes.contains(&node.broker_id)) .choose(&mut self.rng) .unwrap() - .get_connection(self.connect_timeout) + .get_connection(self.connect_timeout, &self.tls) .await? .clone() } else { @@ -397,7 +410,7 @@ impl KafkaSinkCluster { self.nodes .choose_mut(&mut self.rng) .unwrap() - .get_connection(self.connect_timeout) + .get_connection(self.connect_timeout, &self.tls) .await? .clone() }; @@ -461,7 +474,7 @@ impl KafkaSinkCluster { .nodes .choose_mut(&mut self.rng) .unwrap() - .get_connection(self.connect_timeout) + .get_connection(self.connect_timeout, &self.tls) .await?; let (tx, rx) = oneshot::channel(); connection @@ -498,7 +511,7 @@ impl KafkaSinkCluster { .nodes .choose_mut(&mut self.rng) .unwrap() - .get_connection(self.connect_timeout) + .get_connection(self.connect_timeout, &self.tls) .await?; let (tx, rx) = oneshot::channel(); connection @@ -556,7 +569,7 @@ impl KafkaSinkCluster { .nodes .choose_mut(&mut self.rng) .unwrap() - .get_connection(self.connect_timeout) + .get_connection(self.connect_timeout, &self.tls) .await?; let (tx, rx) = oneshot::channel(); connection @@ -665,13 +678,15 @@ impl KafkaSinkCluster { let connection = if let Some(node) = self.nodes.iter_mut().find(|x| x.broker_id == *broker_id) { - node.get_connection(self.connect_timeout).await?.clone() + node.get_connection(self.connect_timeout, &self.tls) + .await? + .clone() } else { tracing::warn!("no known broker with id {broker_id:?}, routing message to a random node so that a NOT_CONTROLLER or similar error is returned to the client"); self.nodes .choose_mut(&mut self.rng) .unwrap() - .get_connection(self.connect_timeout) + .get_connection(self.connect_timeout, &self.tls) .await? .clone() }; @@ -695,7 +710,11 @@ impl KafkaSinkCluster { for node in &mut self.nodes { if let Some(broker_id) = self.group_to_coordinator_broker.get(&group_id) { if node.broker_id == *broker_id { - connection = Some(node.get_connection(self.connect_timeout).await?.clone()); + connection = Some( + node.get_connection(self.connect_timeout, &self.tls) + .await? + .clone(), + ); } } } @@ -706,7 +725,7 @@ impl KafkaSinkCluster { self.nodes .choose_mut(&mut self.rng) .unwrap() - .get_connection(self.connect_timeout) + .get_connection(self.connect_timeout, &self.tls) .await? .clone() } @@ -873,19 +892,26 @@ struct KafkaNode { } impl KafkaNode { - async fn get_connection(&mut self, connect_timeout: Duration) -> Result<&Connection> { + async fn get_connection( + &mut self, + connect_timeout: Duration, + tls: &Option, + ) -> Result<&Connection> { if self.connection.is_none() { let codec = KafkaCodecBuilder::new(Direction::Sink, "KafkaSinkCluster".to_owned()); - let tcp_stream = tcp::tcp_stream( - connect_timeout, - ( - self.kafka_address.host.to_string(), - self.kafka_address.port as u16, - ), - ) - .await?; - let (rx, tx) = tcp_stream.into_split(); - self.connection = Some(spawn_read_write_tasks(&codec, rx, tx)); + let address = ( + self.kafka_address.host.to_string(), + self.kafka_address.port as u16, + ); + if let Some(tls) = tls.as_ref() { + let tls_stream = tls.connect(connect_timeout, address).await?; + let (rx, tx) = split(tls_stream); + self.connection = Some(spawn_read_write_tasks(&codec, rx, tx)); + } else { + let tcp_stream = tcp::tcp_stream(connect_timeout, address).await?; + let (rx, tx) = tcp_stream.into_split(); + self.connection = Some(spawn_read_write_tasks(&codec, rx, tx)); + } } Ok(self.connection.as_ref().unwrap()) } diff --git a/shotover/src/transforms/kafka/sink_single.rs b/shotover/src/transforms/kafka/sink_single.rs index b78e873bc..a3c3be49d 100644 --- a/shotover/src/transforms/kafka/sink_single.rs +++ b/shotover/src/transforms/kafka/sink_single.rs @@ -114,21 +114,13 @@ impl Transform for KafkaSinkSingle { async fn transform<'a>(&'a mut self, mut requests_wrapper: Wrapper<'a>) -> Result { if self.outbound.is_none() { let codec = KafkaCodecBuilder::new(Direction::Sink, "KafkaSinkSingle".to_owned()); + let address = (requests_wrapper.local_addr.ip(), self.address_port); if let Some(tls) = self.tls.as_mut() { - let tls_stream = tls - .connect( - self.connect_timeout, - (requests_wrapper.local_addr.ip(), self.address_port), - ) - .await?; + let tls_stream = tls.connect(self.connect_timeout, address).await?; let (rx, tx) = split(tls_stream); self.outbound = Some(spawn_read_write_tasks(&codec, rx, tx)); } else { - let tcp_stream = tcp::tcp_stream( - self.connect_timeout, - (requests_wrapper.local_addr.ip(), self.address_port), - ) - .await?; + let tcp_stream = tcp::tcp_stream(self.connect_timeout, address).await?; let (rx, tx) = tcp_stream.into_split(); self.outbound = Some(spawn_read_write_tasks(&codec, rx, tx)); }