From 795e391565b9a90e98e458f90b758763d4420221 Mon Sep 17 00:00:00 2001 From: Conor Date: Thu, 22 Feb 2024 21:55:56 +1100 Subject: [PATCH 1/2] Kafka SASL messages (#1494) --- shotover/src/frame/kafka.rs | 30 +++++++++++++++++++++++++++++- 1 file changed, 29 insertions(+), 1 deletion(-) diff --git a/shotover/src/frame/kafka.rs b/shotover/src/frame/kafka.rs index 7f77421f5..9ccde6958 100644 --- a/shotover/src/frame/kafka.rs +++ b/shotover/src/frame/kafka.rs @@ -7,7 +7,9 @@ use kafka_protocol::messages::{ FindCoordinatorResponse, HeartbeatRequest, HeartbeatResponse, JoinGroupRequest, JoinGroupResponse, LeaderAndIsrRequest, ListOffsetsRequest, ListOffsetsResponse, MetadataRequest, MetadataResponse, OffsetFetchRequest, OffsetFetchResponse, ProduceRequest, - ProduceResponse, RequestHeader, ResponseHeader, SyncGroupRequest, SyncGroupResponse, + ProduceResponse, RequestHeader, ResponseHeader, SaslAuthenticateRequest, + SaslAuthenticateResponse, SaslHandshakeRequest, SaslHandshakeResponse, SyncGroupRequest, + SyncGroupResponse, }; use kafka_protocol::protocol::{Decodable, Encodable, HeaderVersion, StrBytes}; use std::fmt::{Display, Formatter, Result as FmtResult}; @@ -86,6 +88,8 @@ pub enum RequestBody { DeleteTopics(DeleteTopicsRequest), DeleteGroups(DeleteGroupsRequest), DescribeConfigs(DescribeConfigsRequest), + SaslAuthenticateRequest(SaslAuthenticateRequest), + SaslHandshakeRequest(SaslHandshakeRequest), Unknown { api_key: ApiKey, message: Bytes }, } @@ -101,6 +105,8 @@ pub enum ResponseBody { Metadata(MetadataResponse), DescribeCluster(DescribeClusterResponse), Heartbeat(HeartbeatResponse), + SaslAuthenticateResponse(SaslAuthenticateResponse), + SaslHandshakeResponse(SaslHandshakeResponse), Unknown { api_key: ApiKey, message: Bytes }, } @@ -117,6 +123,12 @@ impl ResponseBody { ResponseBody::Metadata(_) => MetadataResponse::header_version(version), ResponseBody::DescribeCluster(_) => DescribeClusterResponse::header_version(version), ResponseBody::Heartbeat(_) => HeartbeatResponse::header_version(version), + ResponseBody::SaslAuthenticateResponse(_) => { + SaslAuthenticateResponse::header_version(version) + } + ResponseBody::SaslHandshakeResponse(_) => { + SaslHandshakeResponse::header_version(version) + } ResponseBody::Unknown { api_key, .. } => api_key.response_header_version(version), } } @@ -166,6 +178,12 @@ impl KafkaFrame { ApiKey::DescribeConfigsKey => { RequestBody::DescribeConfigs(decode(&mut bytes, version)?) } + ApiKey::SaslAuthenticateKey => { + RequestBody::SaslAuthenticateRequest(decode(&mut bytes, version)?) + } + ApiKey::SaslHandshakeKey => { + RequestBody::SaslHandshakeRequest(decode(&mut bytes, version)?) + } api_key => RequestBody::Unknown { api_key, message: bytes, @@ -200,6 +218,12 @@ impl KafkaFrame { ResponseBody::DescribeCluster(decode(&mut bytes, version)?) } ApiKey::HeartbeatKey => ResponseBody::Heartbeat(decode(&mut bytes, version)?), + ApiKey::SaslAuthenticateKey => { + ResponseBody::SaslAuthenticateResponse(decode(&mut bytes, version)?) + } + ApiKey::SaslHandshakeKey => { + ResponseBody::SaslHandshakeResponse(decode(&mut bytes, version)?) + } api_key => ResponseBody::Unknown { api_key, message: bytes, @@ -243,6 +267,8 @@ impl KafkaFrame { RequestBody::DeleteTopics(x) => encode(x, bytes, version)?, RequestBody::DeleteGroups(x) => encode(x, bytes, version)?, RequestBody::DescribeConfigs(x) => encode(x, bytes, version)?, + RequestBody::SaslAuthenticateRequest(x) => encode(x, bytes, version)?, + RequestBody::SaslHandshakeRequest(x) => encode(x, bytes, version)?, RequestBody::Unknown { message, .. } => bytes.extend_from_slice(&message), } } @@ -263,6 +289,8 @@ impl KafkaFrame { ResponseBody::Metadata(x) => encode(x, bytes, version)?, ResponseBody::DescribeCluster(x) => encode(x, bytes, version)?, ResponseBody::Heartbeat(x) => encode(x, bytes, version)?, + ResponseBody::SaslAuthenticateResponse(x) => encode(x, bytes, version)?, + ResponseBody::SaslHandshakeResponse(x) => encode(x, bytes, version)?, ResponseBody::Unknown { message, .. } => bytes.extend_from_slice(&message), } } From 9f7fca4f31afc0d8b914a71957f36a9a19cf4ed7 Mon Sep 17 00:00:00 2001 From: Conor Date: Thu, 22 Feb 2024 22:23:27 +1100 Subject: [PATCH 2/2] KafkaSinkSingle SASL authentication tests (#1478) Co-authored-by: Lucas Kent --- shotover-proxy/tests/kafka_int_tests/mod.rs | 29 +++++++++++++++++++ .../tests/kafka_int_tests/test_cases.rs | 19 ++++++++++++ .../passthrough-sasl/docker-compose.yaml | 26 +++++++++++++++++ .../passthrough-sasl/topology-encode.yaml | 12 ++++++++ .../kafka/passthrough-sasl/topology.yaml | 9 ++++++ 5 files changed, 95 insertions(+) create mode 100644 shotover-proxy/tests/test-configs/kafka/passthrough-sasl/docker-compose.yaml create mode 100644 shotover-proxy/tests/test-configs/kafka/passthrough-sasl/topology-encode.yaml create mode 100644 shotover-proxy/tests/test-configs/kafka/passthrough-sasl/topology.yaml diff --git a/shotover-proxy/tests/kafka_int_tests/mod.rs b/shotover-proxy/tests/kafka_int_tests/mod.rs index 0195e449e..75e19c997 100644 --- a/shotover-proxy/tests/kafka_int_tests/mod.rs +++ b/shotover-proxy/tests/kafka_int_tests/mod.rs @@ -62,6 +62,35 @@ async fn passthrough_encode() { shotover.shutdown_and_then_consume_events(&[]).await; } +#[cfg(feature = "rdkafka-driver-tests")] +#[tokio::test] +async fn passthrough_sasl() { + let _docker_compose = + docker_compose("tests/test-configs/kafka/passthrough-sasl/docker-compose.yaml"); + let shotover = shotover_process("tests/test-configs/kafka/passthrough-sasl/topology.yaml") + .start() + .await; + + test_cases::basic_sasl("127.0.0.1:9192").await; + + shotover.shutdown_and_then_consume_events(&[]).await; +} + +#[cfg(feature = "rdkafka-driver-tests")] +#[tokio::test] +async fn passthrough_sasl_encode() { + let _docker_compose = + docker_compose("tests/test-configs/kafka/passthrough-sasl/docker-compose.yaml"); + let shotover = + shotover_process("tests/test-configs/kafka/passthrough-sasl/topology-encode.yaml") + .start() + .await; + + test_cases::basic_sasl("127.0.0.1:9192").await; + + shotover.shutdown_and_then_consume_events(&[]).await; +} + #[cfg(feature = "rdkafka-driver-tests")] #[tokio::test] async fn cluster_single_shotover() { diff --git a/shotover-proxy/tests/kafka_int_tests/test_cases.rs b/shotover-proxy/tests/kafka_int_tests/test_cases.rs index 77a428455..a7d6be1cf 100644 --- a/shotover-proxy/tests/kafka_int_tests/test_cases.rs +++ b/shotover-proxy/tests/kafka_int_tests/test_cases.rs @@ -308,3 +308,22 @@ pub async fn basic(address: &str) { } admin_cleanup(client.clone()).await; } + +pub async fn basic_sasl(address: &str) { + let mut client = ClientConfig::new(); + client + .set("bootstrap.servers", address) + .set("sasl.mechanisms", "PLAIN") + .set("sasl.username", "user") + .set("sasl.password", "password") + .set("security.protocol", "SASL_PLAINTEXT") + // internal driver debug logs are emitted to tokio tracing, assuming the appropriate filter is used by the tracing subscriber + .set("debug", "all"); + admin(client.clone()).await; + for i in 0..2 { + produce_consume(client.clone(), "partitions1", i).await; + produce_consume(client.clone(), "partitions3", i).await; + produce_consume_acks0(client.clone()).await; + } + admin_cleanup(client.clone()).await; +} diff --git a/shotover-proxy/tests/test-configs/kafka/passthrough-sasl/docker-compose.yaml b/shotover-proxy/tests/test-configs/kafka/passthrough-sasl/docker-compose.yaml new file mode 100644 index 000000000..63c802715 --- /dev/null +++ b/shotover-proxy/tests/test-configs/kafka/passthrough-sasl/docker-compose.yaml @@ -0,0 +1,26 @@ +version: '2' + +services: + kafka: + image: 'bitnami/kafka:3.6.1-debian-11-r24' + ports: + - '9092:9092' + - '9093:9093' + environment: + - KAFKA_CFG_NODE_ID=0 + - KAFKA_CFG_PROCESS_ROLES=controller,broker + - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093 + - KAFKA_CFG_LISTENERS=SASL_PLAINTEXT://:9092,CONTROLLER://:9093 + - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:SASL_PLAINTEXT,SASL_PLAINTEXT:SASL_PLAINTEXT + - KAFKA_CFG_ADVERTISED_LISTENERS=SASL_PLAINTEXT://127.0.0.1:9092 + - KAFKA_CLIENT_USERS=user + - KAFKA_CLIENT_PASSWORDS=password + - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER + - KAFKA_CFG_SASL_MECHANISM_CONTROLLER_PROTOCOL=PLAIN + - KAFKA_CONTROLLER_USER=controller_user + - KAFKA_CONTROLLER_PASSWORD=controller_password + - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=SASL_PLAINTEXT + - KAFKA_CFG_SASL_MECHANISM_INTER_BROKER_PROTOCOL=PLAIN + - KAFKA_INTER_BROKER_USER=controller_user + - KAFKA_INTER_BROKER_PASSWORD=controller_password + diff --git a/shotover-proxy/tests/test-configs/kafka/passthrough-sasl/topology-encode.yaml b/shotover-proxy/tests/test-configs/kafka/passthrough-sasl/topology-encode.yaml new file mode 100644 index 000000000..d2a16eefc --- /dev/null +++ b/shotover-proxy/tests/test-configs/kafka/passthrough-sasl/topology-encode.yaml @@ -0,0 +1,12 @@ +--- +sources: + - Kafka: + name: "kafka" + listen_addr: "127.0.0.1:9192" + chain: + - DebugForceEncode: + encode_requests: true + encode_responses: true + - KafkaSinkSingle: + destination_port: 9092 + connect_timeout_ms: 3000 diff --git a/shotover-proxy/tests/test-configs/kafka/passthrough-sasl/topology.yaml b/shotover-proxy/tests/test-configs/kafka/passthrough-sasl/topology.yaml new file mode 100644 index 000000000..3d6131156 --- /dev/null +++ b/shotover-proxy/tests/test-configs/kafka/passthrough-sasl/topology.yaml @@ -0,0 +1,9 @@ +--- +sources: + - Kafka: + name: "kafka" + listen_addr: "127.0.0.1:9192" + chain: + - KafkaSinkSingle: + destination_port: 9092 + connect_timeout_ms: 3000