Skip to content

Commit

Permalink
Merge branch 'main' into Introduce-TransformContextConfig
Browse files Browse the repository at this point in the history
  • Loading branch information
rukai authored Feb 22, 2024
2 parents ece3e46 + 9f7fca4 commit 5b3baf2
Show file tree
Hide file tree
Showing 6 changed files with 124 additions and 1 deletion.
29 changes: 29 additions & 0 deletions shotover-proxy/tests/kafka_int_tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,35 @@ async fn passthrough_encode() {
shotover.shutdown_and_then_consume_events(&[]).await;
}

#[cfg(feature = "rdkafka-driver-tests")]
#[tokio::test]
async fn passthrough_sasl() {
    // Binding kept alive for the whole test so the compose environment
    // is not torn down until the test body finishes.
    let _docker_compose =
        docker_compose("tests/test-configs/kafka/passthrough-sasl/docker-compose.yaml");

    // Plain passthrough topology: shotover forwards kafka bytes untouched.
    let topology = "tests/test-configs/kafka/passthrough-sasl/topology.yaml";
    let shotover = shotover_process(topology).start().await;

    test_cases::basic_sasl("127.0.0.1:9192").await;

    // An empty expected-events slice asserts shotover shut down without
    // emitting any warnings or errors.
    shotover.shutdown_and_then_consume_events(&[]).await;
}

#[cfg(feature = "rdkafka-driver-tests")]
#[tokio::test]
async fn passthrough_sasl_encode() {
    // Binding kept alive for the whole test so the compose environment
    // is not torn down until the test body finishes.
    let _docker_compose =
        docker_compose("tests/test-configs/kafka/passthrough-sasl/docker-compose.yaml");

    // The encode topology forces shotover to fully decode and re-encode
    // every message rather than passing raw bytes through.
    let topology = "tests/test-configs/kafka/passthrough-sasl/topology-encode.yaml";
    let shotover = shotover_process(topology).start().await;

    test_cases::basic_sasl("127.0.0.1:9192").await;

    // An empty expected-events slice asserts shotover shut down without
    // emitting any warnings or errors.
    shotover.shutdown_and_then_consume_events(&[]).await;
}

#[cfg(feature = "rdkafka-driver-tests")]
#[tokio::test]
async fn cluster_single_shotover() {
Expand Down
19 changes: 19 additions & 0 deletions shotover-proxy/tests/kafka_int_tests/test_cases.rs
Original file line number Diff line number Diff line change
Expand Up @@ -308,3 +308,22 @@ pub async fn basic(address: &str) {
}
admin_cleanup(client.clone()).await;
}

/// Runs the standard admin + produce/consume test suite against a broker
/// that requires SASL/PLAIN authentication, reachable at `address`.
pub async fn basic_sasl(address: &str) {
    let mut config = ClientConfig::new();
    config.set("bootstrap.servers", address);
    config.set("security.protocol", "SASL_PLAINTEXT");
    config.set("sasl.mechanisms", "PLAIN");
    config.set("sasl.username", "user");
    config.set("sasl.password", "password");
    // internal driver debug logs are emitted to tokio tracing, assuming the appropriate filter is used by the tracing subscriber
    config.set("debug", "all");

    admin(config.clone()).await;
    for round in 0..2 {
        produce_consume(config.clone(), "partitions1", round).await;
        produce_consume(config.clone(), "partitions3", round).await;
        produce_consume_acks0(config.clone()).await;
    }
    admin_cleanup(config.clone()).await;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# Single-node Kafka in KRaft mode with SASL/PLAIN required on every
# listener; used by the passthrough-sasl integration tests.
version: '2'

services:
  kafka:
    image: 'bitnami/kafka:3.6.1-debian-11-r24'
    ports:
      - '9092:9092'
      - '9093:9093'
    environment:
      # KRaft: this single node acts as both controller and broker.
      - KAFKA_CFG_NODE_ID=0
      - KAFKA_CFG_PROCESS_ROLES=controller,broker
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093
      # Client traffic on 9092, controller traffic on 9093 — both SASL_PLAINTEXT.
      - KAFKA_CFG_LISTENERS=SASL_PLAINTEXT://:9092,CONTROLLER://:9093
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:SASL_PLAINTEXT,SASL_PLAINTEXT:SASL_PLAINTEXT
      - KAFKA_CFG_ADVERTISED_LISTENERS=SASL_PLAINTEXT://127.0.0.1:9092
      # Credentials the test client (and shotover) authenticate with.
      - KAFKA_CLIENT_USERS=user
      - KAFKA_CLIENT_PASSWORDS=password
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
      - KAFKA_CFG_SASL_MECHANISM_CONTROLLER_PROTOCOL=PLAIN
      - KAFKA_CONTROLLER_USER=controller_user
      - KAFKA_CONTROLLER_PASSWORD=controller_password
      - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=SASL_PLAINTEXT
      - KAFKA_CFG_SASL_MECHANISM_INTER_BROKER_PROTOCOL=PLAIN
      # NOTE(review): inter-broker auth reuses the controller credentials —
      # presumably intentional for a single-node test setup; confirm.
      - KAFKA_INTER_BROKER_USER=controller_user
      - KAFKA_INTER_BROKER_PASSWORD=controller_password

Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
---
# Shotover topology for the SASL passthrough test with forced re-encoding:
# DebugForceEncode makes shotover fully decode and re-encode every kafka
# request and response instead of passing raw bytes through, exercising
# the codec (including the SASL handshake/authenticate messages).
sources:
  - Kafka:
      name: "kafka"
      # Shotover listens here; tests connect to 9192 instead of kafka's 9092.
      listen_addr: "127.0.0.1:9192"
      chain:
        - DebugForceEncode:
            encode_requests: true
            encode_responses: true
        - KafkaSinkSingle:
            # Forward to the kafka broker from docker-compose.
            destination_port: 9092
            connect_timeout_ms: 3000
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
---
# Shotover topology for the SASL passthrough test: kafka traffic is
# forwarded to a single broker without modification.
sources:
  - Kafka:
      name: "kafka"
      # Shotover listens here; tests connect to 9192 instead of kafka's 9092.
      listen_addr: "127.0.0.1:9192"
      chain:
        - KafkaSinkSingle:
            # Forward to the kafka broker from docker-compose.
            destination_port: 9092
            connect_timeout_ms: 3000
30 changes: 29 additions & 1 deletion shotover/src/frame/kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ use kafka_protocol::messages::{
FindCoordinatorResponse, HeartbeatRequest, HeartbeatResponse, JoinGroupRequest,
JoinGroupResponse, LeaderAndIsrRequest, ListOffsetsRequest, ListOffsetsResponse,
MetadataRequest, MetadataResponse, OffsetFetchRequest, OffsetFetchResponse, ProduceRequest,
ProduceResponse, RequestHeader, ResponseHeader, SyncGroupRequest, SyncGroupResponse,
ProduceResponse, RequestHeader, ResponseHeader, SaslAuthenticateRequest,
SaslAuthenticateResponse, SaslHandshakeRequest, SaslHandshakeResponse, SyncGroupRequest,
SyncGroupResponse,
};
use kafka_protocol::protocol::{Decodable, Encodable, HeaderVersion, StrBytes};
use std::fmt::{Display, Formatter, Result as FmtResult};
Expand Down Expand Up @@ -86,6 +88,8 @@ pub enum RequestBody {
DeleteTopics(DeleteTopicsRequest),
DeleteGroups(DeleteGroupsRequest),
DescribeConfigs(DescribeConfigsRequest),
SaslAuthenticateRequest(SaslAuthenticateRequest),
SaslHandshakeRequest(SaslHandshakeRequest),
Unknown { api_key: ApiKey, message: Bytes },
}

Expand All @@ -101,6 +105,8 @@ pub enum ResponseBody {
Metadata(MetadataResponse),
DescribeCluster(DescribeClusterResponse),
Heartbeat(HeartbeatResponse),
SaslAuthenticateResponse(SaslAuthenticateResponse),
SaslHandshakeResponse(SaslHandshakeResponse),
Unknown { api_key: ApiKey, message: Bytes },
}

Expand All @@ -117,6 +123,12 @@ impl ResponseBody {
ResponseBody::Metadata(_) => MetadataResponse::header_version(version),
ResponseBody::DescribeCluster(_) => DescribeClusterResponse::header_version(version),
ResponseBody::Heartbeat(_) => HeartbeatResponse::header_version(version),
ResponseBody::SaslAuthenticateResponse(_) => {
SaslAuthenticateResponse::header_version(version)
}
ResponseBody::SaslHandshakeResponse(_) => {
SaslHandshakeResponse::header_version(version)
}
ResponseBody::Unknown { api_key, .. } => api_key.response_header_version(version),
}
}
Expand Down Expand Up @@ -166,6 +178,12 @@ impl KafkaFrame {
ApiKey::DescribeConfigsKey => {
RequestBody::DescribeConfigs(decode(&mut bytes, version)?)
}
ApiKey::SaslAuthenticateKey => {
RequestBody::SaslAuthenticateRequest(decode(&mut bytes, version)?)
}
ApiKey::SaslHandshakeKey => {
RequestBody::SaslHandshakeRequest(decode(&mut bytes, version)?)
}
api_key => RequestBody::Unknown {
api_key,
message: bytes,
Expand Down Expand Up @@ -200,6 +218,12 @@ impl KafkaFrame {
ResponseBody::DescribeCluster(decode(&mut bytes, version)?)
}
ApiKey::HeartbeatKey => ResponseBody::Heartbeat(decode(&mut bytes, version)?),
ApiKey::SaslAuthenticateKey => {
ResponseBody::SaslAuthenticateResponse(decode(&mut bytes, version)?)
}
ApiKey::SaslHandshakeKey => {
ResponseBody::SaslHandshakeResponse(decode(&mut bytes, version)?)
}
api_key => ResponseBody::Unknown {
api_key,
message: bytes,
Expand Down Expand Up @@ -243,6 +267,8 @@ impl KafkaFrame {
RequestBody::DeleteTopics(x) => encode(x, bytes, version)?,
RequestBody::DeleteGroups(x) => encode(x, bytes, version)?,
RequestBody::DescribeConfigs(x) => encode(x, bytes, version)?,
RequestBody::SaslAuthenticateRequest(x) => encode(x, bytes, version)?,
RequestBody::SaslHandshakeRequest(x) => encode(x, bytes, version)?,
RequestBody::Unknown { message, .. } => bytes.extend_from_slice(&message),
}
}
Expand All @@ -263,6 +289,8 @@ impl KafkaFrame {
ResponseBody::Metadata(x) => encode(x, bytes, version)?,
ResponseBody::DescribeCluster(x) => encode(x, bytes, version)?,
ResponseBody::Heartbeat(x) => encode(x, bytes, version)?,
ResponseBody::SaslAuthenticateResponse(x) => encode(x, bytes, version)?,
ResponseBody::SaslHandshakeResponse(x) => encode(x, bytes, version)?,
ResponseBody::Unknown { message, .. } => bytes.extend_from_slice(&message),
}
}
Expand Down

0 comments on commit 5b3baf2

Please sign in to comment.