From 10b04e6a036b532cdfa7d61b0b4c0a6932717a87 Mon Sep 17 00:00:00 2001
From: Lucas Kent
Date: Wed, 21 Feb 2024 13:27:12 +1100
Subject: [PATCH 1/2] Add TLS support for KafkaSinkSingle (#1488)

---
 .gitignore                                    |  1 +
 .../benches/windsock/kafka/bench.rs           |  1 +
 shotover-proxy/tests/kafka_int_tests/mod.rs   | 21 ++++++++
 .../kafka/passthrough-tls/docker-compose.yaml | 24 +++++++++
 .../kafka/passthrough-tls/topology.yaml       | 12 +++++
 shotover/src/transforms/kafka/sink_single.rs  | 35 ++++++++++---
 test-helpers/src/cert.rs                      | 52 +++++++++++++++++++
 test-helpers/src/docker_compose.rs            |  7 ++-
 8 files changed, 145 insertions(+), 8 deletions(-)
 create mode 100644 shotover-proxy/tests/test-configs/kafka/passthrough-tls/docker-compose.yaml
 create mode 100644 shotover-proxy/tests/test-configs/kafka/passthrough-tls/topology.yaml

diff --git a/.gitignore b/.gitignore
index 281f0628b..77e93dac0 100644
--- a/.gitignore
+++ b/.gitignore
@@ -4,6 +4,7 @@
 /.vscode
 /shotover-proxy/tests/test-configs/redis/tls/certs
 /shotover-proxy/tests/test-configs/cassandra/tls/certs
+/shotover-proxy/tests/test-configs/kafka/tls/certs
 .workspace.code-workspace
 **/.DS_Store
 /.project
diff --git a/shotover-proxy/benches/windsock/kafka/bench.rs b/shotover-proxy/benches/windsock/kafka/bench.rs
index 62d563cc9..770b5a807 100644
--- a/shotover-proxy/benches/windsock/kafka/bench.rs
+++ b/shotover-proxy/benches/windsock/kafka/bench.rs
@@ -84,6 +84,7 @@ impl KafkaBench {
                 destination_port: 9192,
                 connect_timeout_ms: 3000,
                 read_timeout: None,
+                tls: None,
             }),
             KafkaTopology::Cluster1 | KafkaTopology::Cluster3 => Box::new(KafkaSinkClusterConfig {
                 connect_timeout_ms: 3000,
diff --git a/shotover-proxy/tests/kafka_int_tests/mod.rs b/shotover-proxy/tests/kafka_int_tests/mod.rs
index 91b6b7a32..0195e449e 100644
--- a/shotover-proxy/tests/kafka_int_tests/mod.rs
+++ b/shotover-proxy/tests/kafka_int_tests/mod.rs
@@ -27,6 +27,27 @@ async fn passthrough_standard() {
         .expect("Shotover did not shutdown within 10s");
 }
 
+#[cfg(feature = "rdkafka-driver-tests")]
+#[tokio::test]
+async fn passthrough_tls() {
+    test_helpers::cert::generate_kafka_test_certs();
+
+    let _docker_compose =
+        docker_compose("tests/test-configs/kafka/passthrough-tls/docker-compose.yaml");
+    let shotover = shotover_process("tests/test-configs/kafka/passthrough-tls/topology.yaml")
+        .start()
+        .await;
+
+    test_cases::basic("127.0.0.1:9192").await;
+
+    tokio::time::timeout(
+        Duration::from_secs(10),
+        shotover.shutdown_and_then_consume_events(&[]),
+    )
+    .await
+    .expect("Shotover did not shutdown within 10s");
+}
+
 #[cfg(feature = "rdkafka-driver-tests")]
 #[tokio::test]
 async fn passthrough_encode() {
diff --git a/shotover-proxy/tests/test-configs/kafka/passthrough-tls/docker-compose.yaml b/shotover-proxy/tests/test-configs/kafka/passthrough-tls/docker-compose.yaml
new file mode 100644
index 000000000..d4c38e582
--- /dev/null
+++ b/shotover-proxy/tests/test-configs/kafka/passthrough-tls/docker-compose.yaml
@@ -0,0 +1,24 @@
+version: "3"
+services:
+  kafka0:
+    image: 'bitnami/kafka:3.6.1-debian-11-r24'
+    ports:
+      - '9092:9092'
+    environment:
+      - KAFKA_CFG_LISTENERS=BROKER://:9092,CONTROLLER://:9093
+      - KAFKA_CFG_ADVERTISED_LISTENERS=BROKER://127.0.0.1:9092
+      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,BROKER:SSL
+      - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=BROKER
+      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
+      - KAFKA_CFG_PROCESS_ROLES=controller,broker
+      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka0:9093
+      - KAFKA_CFG_NODE_ID=0
+      - ALLOW_PLAINTEXT_LISTENER=yes
+      - KAFKA_CERTIFICATE_PASSWORD=password
+      - KAFKA_TLS_CLIENT_AUTH=none
+    volumes:
+      - type: tmpfs
+        target: /bitnami/kafka
+      - type: bind
+        source: "../tls/certs"
+        target: "/opt/bitnami/kafka/config/certs"
diff --git a/shotover-proxy/tests/test-configs/kafka/passthrough-tls/topology.yaml b/shotover-proxy/tests/test-configs/kafka/passthrough-tls/topology.yaml
new file mode 100644
index 000000000..3e66ee1f2
--- /dev/null
+++ b/shotover-proxy/tests/test-configs/kafka/passthrough-tls/topology.yaml
@@ -0,0 +1,12 @@
+---
+sources:
+  - Kafka:
+      name: "kafka"
+      listen_addr: "127.0.0.1:9192"
+      chain:
+        - KafkaSinkSingle:
+            destination_port: 9092
+            connect_timeout_ms: 3000
+            tls:
+              certificate_authority_path: "tests/test-configs/kafka/tls/certs/localhost_CA.crt"
+              verify_hostname: true
diff --git a/shotover/src/transforms/kafka/sink_single.rs b/shotover/src/transforms/kafka/sink_single.rs
index 6a8740cef..3d985f303 100644
--- a/shotover/src/transforms/kafka/sink_single.rs
+++ b/shotover/src/transforms/kafka/sink_single.rs
@@ -3,6 +3,7 @@ use crate::frame::kafka::{KafkaFrame, RequestBody, ResponseBody};
 use crate::frame::Frame;
 use crate::message::{Message, Messages};
 use crate::tcp;
+use crate::tls::{TlsConnector, TlsConnectorConfig};
 use crate::transforms::kafka::common::produce_channel;
 use crate::transforms::util::cluster_connection_pool::{spawn_read_write_tasks, Connection};
 use crate::transforms::util::{Request, Response};
@@ -11,6 +12,7 @@ use anyhow::{anyhow, Result};
 use async_trait::async_trait;
 use serde::{Deserialize, Serialize};
 use std::time::Duration;
+use tokio::io::split;
 use tokio::sync::{mpsc, oneshot};
 use tokio::time::timeout;
 
@@ -22,6 +24,7 @@ pub struct KafkaSinkSingleConfig {
     pub destination_port: u16,
     pub connect_timeout_ms: u64,
     pub read_timeout: Option<u64>,
+    pub tls: Option<TlsConnectorConfig>,
 }
 
 use crate::transforms::TransformConfig;
@@ -31,11 +34,13 @@ const NAME: &str = "KafkaSinkSingle";
 #[async_trait(?Send)]
 impl TransformConfig for KafkaSinkSingleConfig {
     async fn get_builder(&self, chain_name: String) -> Result<Box<dyn TransformBuilder>> {
+        let tls = self.tls.clone().map(TlsConnector::new).transpose()?;
         Ok(Box::new(KafkaSinkSingleBuilder::new(
             self.destination_port,
             chain_name,
             self.connect_timeout_ms,
             self.read_timeout,
+            tls,
         )))
     }
 }
@@ -45,6 +50,7 @@ pub struct KafkaSinkSingleBuilder {
     address_port: u16,
     connect_timeout: Duration,
     read_timeout: Option<Duration>,
+    tls: Option<TlsConnector>,
 }
 
 impl KafkaSinkSingleBuilder {
@@ -53,6 +59,7 @@ impl KafkaSinkSingleBuilder {
         _chain_name: String,
         connect_timeout_ms: u64,
         timeout: Option<u64>,
+        tls: Option<TlsConnector>,
     ) -> KafkaSinkSingleBuilder {
         let receive_timeout = timeout.map(Duration::from_secs);
 
@@ -60,6 +67,7 @@ impl KafkaSinkSingleBuilder {
             address_port,
             connect_timeout: Duration::from_millis(connect_timeout_ms),
             read_timeout: receive_timeout,
+            tls,
         }
     }
 }
@@ -71,6 +79,7 @@ impl TransformBuilder for KafkaSinkSingleBuilder {
             address_port: self.address_port,
             pushed_messages_tx: None,
             connect_timeout: self.connect_timeout,
+            tls: self.tls.clone(),
             read_timeout: self.read_timeout,
         })
     }
@@ -90,6 +99,7 @@ pub struct KafkaSinkSingle {
     pushed_messages_tx: Option<mpsc::UnboundedSender<Messages>>,
     connect_timeout: Duration,
     read_timeout: Option<Duration>,
+    tls: Option<TlsConnector>,
 }
 
 #[async_trait]
@@ -101,13 +111,24 @@ impl Transform for KafkaSinkSingle {
     async fn transform<'a>(&'a mut self, mut requests_wrapper: Wrapper<'a>) -> Result<Messages> {
         if self.outbound.is_none() {
            let codec = KafkaCodecBuilder::new(Direction::Sink, "KafkaSinkSingle".to_owned());
-            let tcp_stream = tcp::tcp_stream(
-                self.connect_timeout,
-                (requests_wrapper.local_addr.ip(), self.address_port),
-            )
-            .await?;
-            let (rx, tx) = tcp_stream.into_split();
-            self.outbound = Some(spawn_read_write_tasks(&codec, rx, tx));
+            if let Some(tls) = self.tls.as_mut() {
+                let tls_stream = tls
+                    .connect(
+                        self.connect_timeout,
+                        (requests_wrapper.local_addr.ip(), self.address_port),
+                    )
+                    .await?;
+                let (rx, tx) = split(tls_stream);
+                self.outbound = Some(spawn_read_write_tasks(&codec, rx, tx));
+            } else {
+                let tcp_stream = tcp::tcp_stream(
+                    self.connect_timeout,
+                    (requests_wrapper.local_addr.ip(), self.address_port),
+                )
+                .await?;
+                let (rx, tx) = tcp_stream.into_split();
+                self.outbound = Some(spawn_read_write_tasks(&codec, rx, tx));
+            }
         }
 
         // Rewrite requests to use kafkas port instead of shotovers port
diff --git a/test-helpers/src/cert.rs b/test-helpers/src/cert.rs
index d189767e6..0b275065a 100644
--- a/test-helpers/src/cert.rs
+++ b/test-helpers/src/cert.rs
@@ -92,3 +92,55 @@ pub fn generate_cassandra_test_certs() {
 pub fn generate_redis_test_certs() {
     generate_test_certs(Path::new("tests/test-configs/redis/tls/certs"));
 }
+
+pub fn generate_kafka_test_certs() {
+    let path = Path::new("tests/test-configs/kafka/tls/certs");
+    generate_test_certs(path);
+    std::fs::remove_file(path.join("kafka.keystore.p12")).ok();
+    std::fs::remove_file(path.join("kafka.keystore.jks")).ok();
+    std::fs::remove_file(path.join("kafka.truststore.jks")).ok();
+    run_command(
+        "openssl",
+        &[
+            "pkcs12",
+            "-export",
+            "-out",
+            path.join("kafka.keystore.p12").to_str().unwrap(),
+            "-inkey",
+            path.join("localhost.key").to_str().unwrap(),
+            "-in",
+            path.join("localhost.crt").to_str().unwrap(),
+            "-passout",
+            "pass:password",
+        ],
+    )
+    .unwrap();
+
+    run_command(
+        "keytool",
+        &[
+            "-importkeystore",
+            "-srckeystore",
+            path.join("kafka.keystore.p12").to_str().unwrap(),
+            "-srcstoretype",
+            "pkcs12",
+            "-destkeystore",
+            path.join("kafka.keystore.jks").to_str().unwrap(),
+            "-deststoretype",
+            "JKS",
+            "-storepass",
+            "password",
+            "-srcstorepass",
+            "password",
+        ],
+    )
+    .unwrap();
+
+    // Bitnami or Kafka insists on having a truststore, but I don't think it actually uses it at all since client auth is disabled.
+    // So instead let's just give it a truststore-shaped file.
+    std::fs::copy(
+        path.join("kafka.keystore.jks").to_str().unwrap(),
+        path.join("kafka.truststore.jks").to_str().unwrap(),
+    )
+    .unwrap();
+}
diff --git a/test-helpers/src/docker_compose.rs b/test-helpers/src/docker_compose.rs
index ca8354cdf..613f02c6d 100644
--- a/test-helpers/src/docker_compose.rs
+++ b/test-helpers/src/docker_compose.rs
@@ -27,7 +27,7 @@ pub fn new_moto() -> DockerCompose {
     docker_compose("tests/transforms/docker-compose-moto.yaml")
 }
 
-pub static IMAGE_WAITERS: [Image; 11] = [
+pub static IMAGE_WAITERS: [Image; 12] = [
     Image {
         name: "motoserver/moto",
         log_regex_to_wait_for: r"Press CTRL\+C to quit",
@@ -70,6 +70,11 @@ pub static IMAGE_WAITERS: [Image; 11] = [
         log_regex_to_wait_for: r"Startup complete",
         timeout: Duration::from_secs(120),
     },
+    Image {
+        name: "bitnami/kafka:3.6.1-debian-11-r24",
+        log_regex_to_wait_for: r"Kafka Server started",
+        timeout: Duration::from_secs(120),
+    },
     Image {
         name: "bitnami/kafka:3.4.0-debian-11-r22",
         log_regex_to_wait_for: r"Kafka Server started",

From 1e3b1d244a59b1d02524104876321fe0177643cd Mon Sep 17 00:00:00 2001
From: Lucas Kent
Date: Wed, 21 Feb 2024 14:18:31 +1100
Subject: [PATCH 2/2] Update kafka image to 3.6.1 (#1487)

---
 .../benches/windsock/kafka/bench.rs           | 22 ++++++++++++++++---
 .../kafka/bench/docker-compose.yaml           | 14 ++++++++----
 .../kafka/cluster/docker-compose.yaml         | 16 +++++++++-----
 .../kafka/passthrough/docker-compose.yaml     | 14 ++++++++----
 test-helpers/src/docker_compose.rs            |  7 +----
 5 files changed, 50 insertions(+), 23 deletions(-)

diff --git a/shotover-proxy/benches/windsock/kafka/bench.rs b/shotover-proxy/benches/windsock/kafka/bench.rs
index 770b5a807..537eb5267 100644
--- a/shotover-proxy/benches/windsock/kafka/bench.rs
+++ b/shotover-proxy/benches/windsock/kafka/bench.rs
@@ -127,16 +127,32 @@ impl KafkaBench {
 
         tasks.push(tokio::spawn(async move {
             node.run_container(
-                "bitnami/kafka:3.4.0-debian-11-r22",
+                "bitnami/kafka:3.6.1-debian-11-r24",
                 &[
                     ("ALLOW_PLAINTEXT_LISTENER".to_owned(), "yes".to_owned()),
                     (
                         "KAFKA_CFG_ADVERTISED_LISTENERS".to_owned(),
-                        format!("PLAINTEXT://{ip}:{port}"),
+                        format!("BROKER://{ip}:{port}"),
                     ),
                     (
                         "KAFKA_CFG_LISTENERS".to_owned(),
-                        format!("PLAINTEXT://:{port},CONTROLLER://:9093"),
+                        format!("BROKER://:{port},CONTROLLER://:9093"),
+                    ),
+                    (
+                        "KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP".to_owned(),
+                        "CONTROLLER:PLAINTEXT,BROKER:PLAINTEXT".to_owned(),
+                    ),
+                    (
+                        "KAFKA_CFG_INTER_BROKER_LISTENER_NAME".to_owned(),
+                        "BROKER".to_owned(),
+                    ),
+                    (
+                        "KAFKA_CFG_CONTROLLER_LISTENER_NAMES".to_owned(),
+                        "CONTROLLER".to_owned(),
+                    ),
+                    (
+                        "KAFKA_CFG_PROCESS_ROLES".to_owned(),
+                        "controller,broker".to_owned(),
                     ),
                     (
                         "KAFKA_HEAP_OPTS".to_owned(),
diff --git a/shotover-proxy/tests/test-configs/kafka/bench/docker-compose.yaml b/shotover-proxy/tests/test-configs/kafka/bench/docker-compose.yaml
index c33b3a8e7..ced9c05ab 100644
--- a/shotover-proxy/tests/test-configs/kafka/bench/docker-compose.yaml
+++ b/shotover-proxy/tests/test-configs/kafka/bench/docker-compose.yaml
@@ -1,10 +1,16 @@
 version: "3"
 services:
-  kafka:
-    image: 'bitnami/kafka:3.4.0-debian-11-r22'
+  kafka0:
+    image: 'bitnami/kafka:3.6.1-debian-11-r24'
     ports:
       - '9192:9192'
     environment:
-      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9192,CONTROLLER://:9093
-      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9192
+      - KAFKA_CFG_LISTENERS=BROKER://:9192,CONTROLLER://:9093
+      - KAFKA_CFG_ADVERTISED_LISTENERS=BROKER://127.0.0.1:9192
+      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,BROKER:PLAINTEXT
+      - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=BROKER
+      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
+      - KAFKA_CFG_PROCESS_ROLES=controller,broker
+      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka0:9093
+      - KAFKA_CFG_NODE_ID=0
       - ALLOW_PLAINTEXT_LISTENER=yes
diff --git a/shotover-proxy/tests/test-configs/kafka/cluster/docker-compose.yaml b/shotover-proxy/tests/test-configs/kafka/cluster/docker-compose.yaml
index b7a3fe4a7..f0bf6f799 100644
--- a/shotover-proxy/tests/test-configs/kafka/cluster/docker-compose.yaml
+++ b/shotover-proxy/tests/test-configs/kafka/cluster/docker-compose.yaml
@@ -10,17 +10,21 @@ networks:
         gateway: 172.16.1.1
 services:
   kafka0:
-    image: &image 'bitnami/kafka:3.4.0-debian-11-r22'
+    image: &image 'bitnami/kafka:3.6.1-debian-11-r24'
     networks:
       cluster_subnet:
         ipv4_address: 172.16.1.2
     environment: &environment
-      KAFKA_CFG_LISTENERS: "PLAINTEXT://:9092,CONTROLLER://:9093"
-      KAFKA_CFG_ADVERTISED_LISTENERS: "PLAINTEXT://172.16.1.2:9092"
-      ALLOW_PLAINTEXT_LISTENER: "yes"
+      KAFKA_CFG_LISTENERS: "BROKER://:9092,CONTROLLER://:9093"
+      KAFKA_CFG_ADVERTISED_LISTENERS: "BROKER://172.16.1.2:9092"
+      KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: "CONTROLLER:PLAINTEXT,BROKER:PLAINTEXT"
+      KAFKA_CFG_INTER_BROKER_LISTENER_NAME: "BROKER"
+      KAFKA_CFG_CONTROLLER_LISTENER_NAMES: "CONTROLLER"
+      KAFKA_CFG_PROCESS_ROLES: "controller,broker"
       KAFKA_KRAFT_CLUSTER_ID: "abcdefghijklmnopqrstuv"
       KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: "0@kafka0:9093,1@kafka1:9093,2@kafka2:9093"
       KAFKA_CFG_NODE_ID: 0
+      ALLOW_PLAINTEXT_LISTENER: "yes"
     volumes: &volumes
       - type: tmpfs
         target: /bitnami/kafka
@@ -31,7 +35,7 @@ services:
         ipv4_address: 172.16.1.3
     environment:
       <<: *environment
-      KAFKA_CFG_ADVERTISED_LISTENERS: "PLAINTEXT://172.16.1.3:9092"
+      KAFKA_CFG_ADVERTISED_LISTENERS: "BROKER://172.16.1.3:9092"
       KAFKA_CFG_NODE_ID: 1
     volumes: *volumes
   kafka2:
@@ -41,6 +45,6 @@ services:
         ipv4_address: 172.16.1.4
     environment:
       <<: *environment
-      KAFKA_CFG_ADVERTISED_LISTENERS: "PLAINTEXT://172.16.1.4:9092"
+      KAFKA_CFG_ADVERTISED_LISTENERS: "BROKER://172.16.1.4:9092"
       KAFKA_CFG_NODE_ID: 2
     volumes: *volumes
diff --git a/shotover-proxy/tests/test-configs/kafka/passthrough/docker-compose.yaml b/shotover-proxy/tests/test-configs/kafka/passthrough/docker-compose.yaml
index 6330ef5e9..62707d6c9 100644
--- a/shotover-proxy/tests/test-configs/kafka/passthrough/docker-compose.yaml
+++ b/shotover-proxy/tests/test-configs/kafka/passthrough/docker-compose.yaml
@@ -1,12 +1,18 @@
 version: "3"
 services:
-  kafka:
-    image: 'bitnami/kafka:3.4.0-debian-11-r22'
+  kafka0:
+    image: 'bitnami/kafka:3.6.1-debian-11-r24'
     ports:
       - '9092:9092'
     environment:
-      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093
-      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092
+      - KAFKA_CFG_LISTENERS=BROKER://:9092,CONTROLLER://:9093
+      - KAFKA_CFG_ADVERTISED_LISTENERS=BROKER://127.0.0.1:9092
+      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,BROKER:PLAINTEXT
+      - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=BROKER
+      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
+      - KAFKA_CFG_PROCESS_ROLES=controller,broker
+      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka0:9093
+      - KAFKA_CFG_NODE_ID=0
       - ALLOW_PLAINTEXT_LISTENER=yes
     volumes:
       - type: tmpfs
diff --git a/test-helpers/src/docker_compose.rs b/test-helpers/src/docker_compose.rs
index 613f02c6d..fb6a0f747 100644
--- a/test-helpers/src/docker_compose.rs
+++ b/test-helpers/src/docker_compose.rs
@@ -27,7 +27,7 @@ pub fn new_moto() -> DockerCompose {
     docker_compose("tests/transforms/docker-compose-moto.yaml")
 }
 
-pub static IMAGE_WAITERS: [Image; 12] = [
+pub static IMAGE_WAITERS: [Image; 11] = [
     Image {
         name: "motoserver/moto",
         log_regex_to_wait_for: r"Press CTRL\+C to quit",
@@ -75,11 +75,6 @@ pub static IMAGE_WAITERS: [Image; 12] = [
         log_regex_to_wait_for: r"Kafka Server started",
         timeout: Duration::from_secs(120),
     },
-    Image {
-        name: "bitnami/kafka:3.4.0-debian-11-r22",
-        log_regex_to_wait_for: r"Kafka Server started",
-        timeout: Duration::from_secs(120),
-    },
     Image {
         name: "opensearchproject/opensearch:2.9.0",
         log_regex_to_wait_for: r"Node started",