diff --git a/shotover-proxy/tests/kafka_int_tests/mod.rs b/shotover-proxy/tests/kafka_int_tests/mod.rs index 0195e449e..75e19c997 100644 --- a/shotover-proxy/tests/kafka_int_tests/mod.rs +++ b/shotover-proxy/tests/kafka_int_tests/mod.rs @@ -62,6 +62,35 @@ async fn passthrough_encode() { shotover.shutdown_and_then_consume_events(&[]).await; } +#[cfg(feature = "rdkafka-driver-tests")] +#[tokio::test] +async fn passthrough_sasl() { + let _docker_compose = + docker_compose("tests/test-configs/kafka/passthrough-sasl/docker-compose.yaml"); + let shotover = shotover_process("tests/test-configs/kafka/passthrough-sasl/topology.yaml") + .start() + .await; + + test_cases::basic_sasl("127.0.0.1:9192").await; + + shotover.shutdown_and_then_consume_events(&[]).await; +} + +#[cfg(feature = "rdkafka-driver-tests")] +#[tokio::test] +async fn passthrough_sasl_encode() { + let _docker_compose = + docker_compose("tests/test-configs/kafka/passthrough-sasl/docker-compose.yaml"); + let shotover = + shotover_process("tests/test-configs/kafka/passthrough-sasl/topology-encode.yaml") + .start() + .await; + + test_cases::basic_sasl("127.0.0.1:9192").await; + + shotover.shutdown_and_then_consume_events(&[]).await; +} + #[cfg(feature = "rdkafka-driver-tests")] #[tokio::test] async fn cluster_single_shotover() { diff --git a/shotover-proxy/tests/kafka_int_tests/test_cases.rs b/shotover-proxy/tests/kafka_int_tests/test_cases.rs index 77a428455..a7d6be1cf 100644 --- a/shotover-proxy/tests/kafka_int_tests/test_cases.rs +++ b/shotover-proxy/tests/kafka_int_tests/test_cases.rs @@ -308,3 +308,22 @@ pub async fn basic(address: &str) { } admin_cleanup(client.clone()).await; } + +pub async fn basic_sasl(address: &str) { + let mut client = ClientConfig::new(); + client + .set("bootstrap.servers", address) + .set("sasl.mechanisms", "PLAIN") + .set("sasl.username", "user") + .set("sasl.password", "password") + .set("security.protocol", "SASL_PLAINTEXT") + // internal driver debug logs are emitted to tokio tracing, assuming the appropriate filter is used by the tracing subscriber + .set("debug", "all"); + admin(client.clone()).await; + for i in 0..2 { + produce_consume(client.clone(), "partitions1", i).await; + produce_consume(client.clone(), "partitions3", i).await; + produce_consume_acks0(client.clone()).await; + } + admin_cleanup(client.clone()).await; +} diff --git a/shotover-proxy/tests/test-configs/kafka/passthrough-sasl/docker-compose.yaml b/shotover-proxy/tests/test-configs/kafka/passthrough-sasl/docker-compose.yaml new file mode 100644 index 000000000..63c802715 --- /dev/null +++ b/shotover-proxy/tests/test-configs/kafka/passthrough-sasl/docker-compose.yaml @@ -0,0 +1,26 @@ +version: '2' + +services: + kafka: + image: 'bitnami/kafka:3.6.1-debian-11-r24' + ports: + - '9092:9092' + - '9093:9093' + environment: + - KAFKA_CFG_NODE_ID=0 + - KAFKA_CFG_PROCESS_ROLES=controller,broker + - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093 + - KAFKA_CFG_LISTENERS=SASL_PLAINTEXT://:9092,CONTROLLER://:9093 + - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:SASL_PLAINTEXT,SASL_PLAINTEXT:SASL_PLAINTEXT + - KAFKA_CFG_ADVERTISED_LISTENERS=SASL_PLAINTEXT://127.0.0.1:9092 + - KAFKA_CLIENT_USERS=user + - KAFKA_CLIENT_PASSWORDS=password + - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER + - KAFKA_CFG_SASL_MECHANISM_CONTROLLER_PROTOCOL=PLAIN + - KAFKA_CONTROLLER_USER=controller_user + - KAFKA_CONTROLLER_PASSWORD=controller_password + - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=SASL_PLAINTEXT + - KAFKA_CFG_SASL_MECHANISM_INTER_BROKER_PROTOCOL=PLAIN + - KAFKA_INTER_BROKER_USER=controller_user + - KAFKA_INTER_BROKER_PASSWORD=controller_password + diff --git a/shotover-proxy/tests/test-configs/kafka/passthrough-sasl/topology-encode.yaml b/shotover-proxy/tests/test-configs/kafka/passthrough-sasl/topology-encode.yaml new file mode 100644 index 000000000..d2a16eefc --- /dev/null +++ b/shotover-proxy/tests/test-configs/kafka/passthrough-sasl/topology-encode.yaml @@ -0,0 +1,12 @@ +--- +sources: + - Kafka: + name: "kafka" + listen_addr: "127.0.0.1:9192" + chain: + - DebugForceEncode: + encode_requests: true + encode_responses: true + - KafkaSinkSingle: + destination_port: 9092 + connect_timeout_ms: 3000 diff --git a/shotover-proxy/tests/test-configs/kafka/passthrough-sasl/topology.yaml b/shotover-proxy/tests/test-configs/kafka/passthrough-sasl/topology.yaml new file mode 100644 index 000000000..3d6131156 --- /dev/null +++ b/shotover-proxy/tests/test-configs/kafka/passthrough-sasl/topology.yaml @@ -0,0 +1,9 @@ +--- +sources: + - Kafka: + name: "kafka" + listen_addr: "127.0.0.1:9192" + chain: + - KafkaSinkSingle: + destination_port: 9092 + connect_timeout_ms: 3000