Skip to content

Commit

Permalink
KafkaSinkSingle SASL authentication tests (#1478)
Browse files Browse the repository at this point in the history
Co-authored-by: Lucas Kent <[email protected]>
  • Loading branch information
conorbros and rukai authored Feb 22, 2024
1 parent 795e391 commit 9f7fca4
Show file tree
Hide file tree
Showing 5 changed files with 95 additions and 0 deletions.
29 changes: 29 additions & 0 deletions shotover-proxy/tests/kafka_int_tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,35 @@ async fn passthrough_encode() {
shotover.shutdown_and_then_consume_events(&[]).await;
}

#[cfg(feature = "rdkafka-driver-tests")]
#[tokio::test]
async fn passthrough_sasl() {
let _docker_compose =
docker_compose("tests/test-configs/kafka/passthrough-sasl/docker-compose.yaml");
let shotover = shotover_process("tests/test-configs/kafka/passthrough-sasl/topology.yaml")
.start()
.await;

test_cases::basic_sasl("127.0.0.1:9192").await;

shotover.shutdown_and_then_consume_events(&[]).await;
}

#[cfg(feature = "rdkafka-driver-tests")]
#[tokio::test]
async fn passthrough_sasl_encode() {
let _docker_compose =
docker_compose("tests/test-configs/kafka/passthrough-sasl/docker-compose.yaml");
let shotover =
shotover_process("tests/test-configs/kafka/passthrough-sasl/topology-encode.yaml")
.start()
.await;

test_cases::basic_sasl("127.0.0.1:9192").await;

shotover.shutdown_and_then_consume_events(&[]).await;
}

#[cfg(feature = "rdkafka-driver-tests")]
#[tokio::test]
async fn cluster_single_shotover() {
Expand Down
19 changes: 19 additions & 0 deletions shotover-proxy/tests/kafka_int_tests/test_cases.rs
Original file line number Diff line number Diff line change
Expand Up @@ -308,3 +308,22 @@ pub async fn basic(address: &str) {
}
admin_cleanup(client.clone()).await;
}

pub async fn basic_sasl(address: &str) {
let mut client = ClientConfig::new();
client
.set("bootstrap.servers", address)
.set("sasl.mechanisms", "PLAIN")
.set("sasl.username", "user")
.set("sasl.password", "password")
.set("security.protocol", "SASL_PLAINTEXT")
// internal driver debug logs are emitted to tokio tracing, assuming the appropriate filter is used by the tracing subscriber
.set("debug", "all");
admin(client.clone()).await;
for i in 0..2 {
produce_consume(client.clone(), "partitions1", i).await;
produce_consume(client.clone(), "partitions3", i).await;
produce_consume_acks0(client.clone()).await;
}
admin_cleanup(client.clone()).await;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
version: '2'

services:
kafka:
image: 'bitnami/kafka:3.6.1-debian-11-r24'
ports:
- '9092:9092'
- '9093:9093'
environment:
- KAFKA_CFG_NODE_ID=0
- KAFKA_CFG_PROCESS_ROLES=controller,broker
- KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093
- KAFKA_CFG_LISTENERS=SASL_PLAINTEXT://:9092,CONTROLLER://:9093
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:SASL_PLAINTEXT,SASL_PLAINTEXT:SASL_PLAINTEXT
- KAFKA_CFG_ADVERTISED_LISTENERS=SASL_PLAINTEXT://127.0.0.1:9092
- KAFKA_CLIENT_USERS=user
- KAFKA_CLIENT_PASSWORDS=password
- KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
- KAFKA_CFG_SASL_MECHANISM_CONTROLLER_PROTOCOL=PLAIN
- KAFKA_CONTROLLER_USER=controller_user
- KAFKA_CONTROLLER_PASSWORD=controller_password
- KAFKA_CFG_INTER_BROKER_LISTENER_NAME=SASL_PLAINTEXT
- KAFKA_CFG_SASL_MECHANISM_INTER_BROKER_PROTOCOL=PLAIN
- KAFKA_INTER_BROKER_USER=controller_user
- KAFKA_INTER_BROKER_PASSWORD=controller_password

Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
---
sources:
- Kafka:
name: "kafka"
listen_addr: "127.0.0.1:9192"
chain:
- DebugForceEncode:
encode_requests: true
encode_responses: true
- KafkaSinkSingle:
destination_port: 9092
connect_timeout_ms: 3000
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
---
sources:
- Kafka:
name: "kafka"
listen_addr: "127.0.0.1:9192"
chain:
- KafkaSinkSingle:
destination_port: 9092
connect_timeout_ms: 3000

0 comments on commit 9f7fca4

Please sign in to comment.