
Commit

Merge branch 'main' into dummy_requests
rukai authored Feb 21, 2024
2 parents f3fbc13 + 1e3b1d2 commit 0573428
Showing 11 changed files with 189 additions and 25 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -4,6 +4,7 @@
/.vscode
/shotover-proxy/tests/test-configs/redis/tls/certs
/shotover-proxy/tests/test-configs/cassandra/tls/certs
/shotover-proxy/tests/test-configs/kafka/tls/certs
.workspace.code-workspace
**/.DS_Store
/.project
23 changes: 20 additions & 3 deletions shotover-proxy/benches/windsock/kafka/bench.rs
@@ -84,6 +84,7 @@ impl KafkaBench {
destination_port: 9192,
connect_timeout_ms: 3000,
read_timeout: None,
tls: None,
}),
KafkaTopology::Cluster1 | KafkaTopology::Cluster3 => Box::new(KafkaSinkClusterConfig {
connect_timeout_ms: 3000,
@@ -126,16 +127,32 @@ impl KafkaBench {

tasks.push(tokio::spawn(async move {
node.run_container(
"bitnami/kafka:3.4.0-debian-11-r22",
"bitnami/kafka:3.6.1-debian-11-r24",
&[
("ALLOW_PLAINTEXT_LISTENER".to_owned(), "yes".to_owned()),
(
"KAFKA_CFG_ADVERTISED_LISTENERS".to_owned(),
format!("PLAINTEXT://{ip}:{port}"),
format!("BROKER://{ip}:{port}"),
),
(
"KAFKA_CFG_LISTENERS".to_owned(),
format!("PLAINTEXT://:{port},CONTROLLER://:9093"),
format!("BROKER://:{port},CONTROLLER://:9093"),
),
(
"KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP".to_owned(),
"CONTROLLER:PLAINTEXT,BROKER:PLAINTEXT".to_owned(),
),
(
"KAFKA_CFG_INTER_BROKER_LISTENER_NAME".to_owned(),
"BROKER".to_owned(),
),
(
"KAFKA_CFG_CONTROLLER_LISTENER_NAMES".to_owned(),
"CONTROLLER".to_owned(),
),
(
"KAFKA_CFG_PROCESS_ROLES".to_owned(),
"controller,broker".to_owned(),
),
(
"KAFKA_HEAP_OPTS".to_owned(),
21 changes: 21 additions & 0 deletions shotover-proxy/tests/kafka_int_tests/mod.rs
@@ -27,6 +27,27 @@ async fn passthrough_standard() {
.expect("Shotover did not shutdown within 10s");
}

#[cfg(feature = "rdkafka-driver-tests")]
#[tokio::test]
async fn passthrough_tls() {
test_helpers::cert::generate_kafka_test_certs();

let _docker_compose =
docker_compose("tests/test-configs/kafka/passthrough-tls/docker-compose.yaml");
let shotover = shotover_process("tests/test-configs/kafka/passthrough-tls/topology.yaml")
.start()
.await;

test_cases::basic("127.0.0.1:9192").await;

tokio::time::timeout(
Duration::from_secs(10),
shotover.shutdown_and_then_consume_events(&[]),
)
.await
.expect("Shotover did not shutdown within 10s");
}

#[cfg(feature = "rdkafka-driver-tests")]
#[tokio::test]
async fn passthrough_encode() {
14 changes: 10 additions & 4 deletions shotover-proxy/tests/test-configs/kafka/bench/docker-compose.yaml
@@ -1,10 +1,16 @@
version: "3"
services:
kafka:
image: 'bitnami/kafka:3.4.0-debian-11-r22'
kafka0:
image: 'bitnami/kafka:3.6.1-debian-11-r24'
ports:
- '9192:9192'
environment:
- KAFKA_CFG_LISTENERS=PLAINTEXT://:9192,CONTROLLER://:9093
- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9192
- KAFKA_CFG_LISTENERS=BROKER://:9192,CONTROLLER://:9093
- KAFKA_CFG_ADVERTISED_LISTENERS=BROKER://127.0.0.1:9192
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,BROKER:PLAINTEXT
- KAFKA_CFG_INTER_BROKER_LISTENER_NAME=BROKER
- KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
- KAFKA_CFG_PROCESS_ROLES=controller,broker
- KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka0:9093
- KAFKA_CFG_NODE_ID=0
- ALLOW_PLAINTEXT_LISTENER=yes
@@ -10,17 +10,21 @@ networks:
gateway: 172.16.1.1
services:
kafka0:
image: &image 'bitnami/kafka:3.4.0-debian-11-r22'
image: &image 'bitnami/kafka:3.6.1-debian-11-r24'
networks:
cluster_subnet:
ipv4_address: 172.16.1.2
environment: &environment
KAFKA_CFG_LISTENERS: "PLAINTEXT://:9092,CONTROLLER://:9093"
KAFKA_CFG_ADVERTISED_LISTENERS: "PLAINTEXT://172.16.1.2:9092"
ALLOW_PLAINTEXT_LISTENER: "yes"
KAFKA_CFG_LISTENERS: "BROKER://:9092,CONTROLLER://:9093"
KAFKA_CFG_ADVERTISED_LISTENERS: "BROKER://172.16.1.2:9092"
KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: "CONTROLLER:PLAINTEXT,BROKER:PLAINTEXT"
KAFKA_CFG_INTER_BROKER_LISTENER_NAME: "BROKER"
KAFKA_CFG_CONTROLLER_LISTENER_NAMES: "CONTROLLER"
KAFKA_CFG_PROCESS_ROLES: "controller,broker"
KAFKA_KRAFT_CLUSTER_ID: "abcdefghijklmnopqrstuv"
KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: "0@kafka0:9093,1@kafka1:9093,2@kafka2:9093"
KAFKA_CFG_NODE_ID: 0
ALLOW_PLAINTEXT_LISTENER: "yes"
volumes: &volumes
- type: tmpfs
target: /bitnami/kafka
@@ -31,7 +35,7 @@ services:
ipv4_address: 172.16.1.3
environment:
<<: *environment
KAFKA_CFG_ADVERTISED_LISTENERS: "PLAINTEXT://172.16.1.3:9092"
KAFKA_CFG_ADVERTISED_LISTENERS: "BROKER://172.16.1.3:9092"
KAFKA_CFG_NODE_ID: 1
volumes: *volumes
kafka2:
@@ -41,6 +45,6 @@ services:
ipv4_address: 172.16.1.4
environment:
<<: *environment
KAFKA_CFG_ADVERTISED_LISTENERS: "PLAINTEXT://172.16.1.4:9092"
KAFKA_CFG_ADVERTISED_LISTENERS: "BROKER://172.16.1.4:9092"
KAFKA_CFG_NODE_ID: 2
volumes: *volumes
@@ -0,0 +1,24 @@
version: "3"
services:
kafka0:
image: 'bitnami/kafka:3.6.1-debian-11-r24'
ports:
- '9092:9092'
environment:
- KAFKA_CFG_LISTENERS=BROKER://:9092,CONTROLLER://:9093
- KAFKA_CFG_ADVERTISED_LISTENERS=BROKER://127.0.0.1:9092
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,BROKER:SSL
- KAFKA_CFG_INTER_BROKER_LISTENER_NAME=BROKER
- KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
- KAFKA_CFG_PROCESS_ROLES=controller,broker
- KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka0:9093
- KAFKA_CFG_NODE_ID=0
- ALLOW_PLAINTEXT_LISTENER=yes
- KAFKA_CERTIFICATE_PASSWORD=password
- KAFKA_TLS_CLIENT_AUTH=none
volumes:
- type: tmpfs
target: /bitnami/kafka
- type: bind
source: "../tls/certs"
target: "/opt/bitnami/kafka/config/certs"
@@ -0,0 +1,12 @@
---
sources:
- Kafka:
name: "kafka"
listen_addr: "127.0.0.1:9192"
chain:
- KafkaSinkSingle:
destination_port: 9092
connect_timeout_ms: 3000
tls:
certificate_authority_path: "tests/test-configs/kafka/tls/certs/localhost_CA.crt"
verify_hostname: true
@@ -1,12 +1,18 @@
version: "3"
services:
kafka:
image: 'bitnami/kafka:3.4.0-debian-11-r22'
kafka0:
image: 'bitnami/kafka:3.6.1-debian-11-r24'
ports:
- '9092:9092'
environment:
- KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093
- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092
- KAFKA_CFG_LISTENERS=BROKER://:9092,CONTROLLER://:9093
- KAFKA_CFG_ADVERTISED_LISTENERS=BROKER://127.0.0.1:9092
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,BROKER:PLAINTEXT
- KAFKA_CFG_INTER_BROKER_LISTENER_NAME=BROKER
- KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
- KAFKA_CFG_PROCESS_ROLES=controller,broker
- KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka0:9093
- KAFKA_CFG_NODE_ID=0
- ALLOW_PLAINTEXT_LISTENER=yes
volumes:
- type: tmpfs
35 changes: 28 additions & 7 deletions shotover/src/transforms/kafka/sink_single.rs
@@ -3,6 +3,7 @@ use crate::frame::kafka::{KafkaFrame, RequestBody, ResponseBody};
use crate::frame::Frame;
use crate::message::{Message, Messages};
use crate::tcp;
use crate::tls::{TlsConnector, TlsConnectorConfig};
use crate::transforms::kafka::common::produce_channel;
use crate::transforms::util::cluster_connection_pool::{spawn_read_write_tasks, Connection};
use crate::transforms::util::{Request, Response};
@@ -11,6 +12,7 @@ use anyhow::{anyhow, Result};
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use std::time::Duration;
use tokio::io::split;
use tokio::sync::{mpsc, oneshot};
use tokio::time::timeout;

@@ -22,6 +24,7 @@ pub struct KafkaSinkSingleConfig {
pub destination_port: u16,
pub connect_timeout_ms: u64,
pub read_timeout: Option<u64>,
pub tls: Option<TlsConnectorConfig>,
}

use crate::transforms::TransformConfig;
@@ -31,11 +34,13 @@ const NAME: &str = "KafkaSinkSingle";
#[async_trait(?Send)]
impl TransformConfig for KafkaSinkSingleConfig {
async fn get_builder(&self, chain_name: String) -> Result<Box<dyn TransformBuilder>> {
let tls = self.tls.clone().map(TlsConnector::new).transpose()?;
Ok(Box::new(KafkaSinkSingleBuilder::new(
self.destination_port,
chain_name,
self.connect_timeout_ms,
self.read_timeout,
tls,
)))
}
}
@@ -45,6 +50,7 @@ pub struct KafkaSinkSingleBuilder {
address_port: u16,
connect_timeout: Duration,
read_timeout: Option<Duration>,
tls: Option<TlsConnector>,
}

impl KafkaSinkSingleBuilder {
@@ -53,13 +59,15 @@ impl KafkaSinkSingleBuilder {
_chain_name: String,
connect_timeout_ms: u64,
timeout: Option<u64>,
tls: Option<TlsConnector>,
) -> KafkaSinkSingleBuilder {
let receive_timeout = timeout.map(Duration::from_secs);

KafkaSinkSingleBuilder {
address_port,
connect_timeout: Duration::from_millis(connect_timeout_ms),
read_timeout: receive_timeout,
tls,
}
}
}
@@ -71,6 +79,7 @@ impl TransformBuilder for KafkaSinkSingleBuilder {
address_port: self.address_port,
pushed_messages_tx: None,
connect_timeout: self.connect_timeout,
tls: self.tls.clone(),
read_timeout: self.read_timeout,
})
}
@@ -90,6 +99,7 @@ pub struct KafkaSinkSingle {
pushed_messages_tx: Option<mpsc::UnboundedSender<Messages>>,
connect_timeout: Duration,
read_timeout: Option<Duration>,
tls: Option<TlsConnector>,
}

#[async_trait]
@@ -101,13 +111,24 @@ impl Transform for KafkaSinkSingle {
async fn transform<'a>(&'a mut self, mut requests_wrapper: Wrapper<'a>) -> Result<Messages> {
if self.outbound.is_none() {
let codec = KafkaCodecBuilder::new(Direction::Sink, "KafkaSinkSingle".to_owned());
let tcp_stream = tcp::tcp_stream(
self.connect_timeout,
(requests_wrapper.local_addr.ip(), self.address_port),
)
.await?;
let (rx, tx) = tcp_stream.into_split();
self.outbound = Some(spawn_read_write_tasks(&codec, rx, tx));
if let Some(tls) = self.tls.as_mut() {
let tls_stream = tls
.connect(
self.connect_timeout,
(requests_wrapper.local_addr.ip(), self.address_port),
)
.await?;
let (rx, tx) = split(tls_stream);
self.outbound = Some(spawn_read_write_tasks(&codec, rx, tx));
} else {
let tcp_stream = tcp::tcp_stream(
self.connect_timeout,
(requests_wrapper.local_addr.ip(), self.address_port),
)
.await?;
let (rx, tx) = tcp_stream.into_split();
self.outbound = Some(spawn_read_write_tasks(&codec, rx, tx));
}
}

// Rewrite requests to use Kafka's port instead of Shotover's port
52 changes: 52 additions & 0 deletions test-helpers/src/cert.rs
@@ -92,3 +92,55 @@ pub fn generate_cassandra_test_certs() {
pub fn generate_redis_test_certs() {
generate_test_certs(Path::new("tests/test-configs/redis/tls/certs"));
}

pub fn generate_kafka_test_certs() {
let path = Path::new("tests/test-configs/kafka/tls/certs");
generate_test_certs(path);
std::fs::remove_file(path.join("kafka.keystore.p12")).ok();
std::fs::remove_file(path.join("kafka.keystore.jks")).ok();
std::fs::remove_file(path.join("kafka.truststore.jks")).ok();
run_command(
"openssl",
&[
"pkcs12",
"-export",
"-out",
path.join("kafka.keystore.p12").to_str().unwrap(),
"-inkey",
path.join("localhost.key").to_str().unwrap(),
"-in",
path.join("localhost.crt").to_str().unwrap(),
"-passout",
"pass:password",
],
)
.unwrap();

run_command(
"keytool",
&[
"-importkeystore",
"-srckeystore",
path.join("kafka.keystore.p12").to_str().unwrap(),
"-srcstoretype",
"pkcs12",
"-destkeystore",
path.join("kafka.keystore.jks").to_str().unwrap(),
"-deststoretype",
"JKS",
"-storepass",
"password",
"-srcstorepass",
"password",
],
)
.unwrap();

// Bitnami or Kafka itself insists on having a truststore, but I don't think it actually uses it at all since client auth is disabled.
// So instead let's just give it a truststore-shaped file.
std::fs::copy(
path.join("kafka.keystore.jks").to_str().unwrap(),
path.join("kafka.truststore.jks").to_str().unwrap(),
)
.unwrap();
}
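
A quick sanity check, not part of this commit: the generated keystore can be inspected with keytool to confirm the localhost key was imported. This sketch assumes the same `path` and `run_command` helper used in `generate_kafka_test_certs` above, and that `keytool` is on the PATH.

// Hypothetical verification step (not in the commit): list the entries in the
// generated keystore using the same password passed to the openssl/keytool calls above.
run_command(
    "keytool",
    &[
        "-list",
        "-keystore",
        path.join("kafka.keystore.jks").to_str().unwrap(),
        "-storepass",
        "password",
    ],
)
.unwrap();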
2 changes: 1 addition & 1 deletion test-helpers/src/docker_compose.rs
@@ -71,7 +71,7 @@ pub static IMAGE_WAITERS: [Image; 11] = [
timeout: Duration::from_secs(120),
},
Image {
name: "bitnami/kafka:3.4.0-debian-11-r22",
name: "bitnami/kafka:3.6.1-debian-11-r24",
log_regex_to_wait_for: r"Kafka Server started",
timeout: Duration::from_secs(120),
},
