Skip to content

Commit

Permalink
KafkaSinkCluster TLS support (#1498)
Browse files Browse the repository at this point in the history
  • Loading branch information
rukai authored Feb 26, 2024
1 parent 0661bec commit 9a4fb8e
Show file tree
Hide file tree
Showing 9 changed files with 177 additions and 35 deletions.
11 changes: 11 additions & 0 deletions docs/src/sources.md
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,17 @@ Kafka:
# If not provided defaults to false
hard_connection_limit: false

# When this field is provided TLS is used when the client connects to Shotover.
# Removing this field will disable TLS.
#tls:
# # Path to the certificate file, typically named with a .crt extension.
# certificate_path: "tls/localhost.crt"
# # Path to the private key file, typically named with a .key extension.
# private_key_path: "tls/localhost.key"
# # Path to the certificate authority file, typically named with a .crt extension.
# # When this field is provided client authentication will be enabled.
# #certificate_authority_path: "tls/localhost_CA.crt"

# Timeout in seconds after which to terminate an idle connection. This field is optional, if not provided, idle connections will never be terminated.
# timeout: 60

Expand Down
24 changes: 24 additions & 0 deletions docs/src/transforms.md
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,18 @@ This is achieved by rewriting the FindCoordinator, Metadata and DescribeCluster
# This field is optional, if not provided, timeout will never occur.
# When a timeout occurs the connection to the client is immediately closed.
# read_timeout: 60
# When this field is provided TLS is used when connecting to the remote address.
# Removing this field will disable TLS.
#tls:
# # Path to the certificate authority file, typically named with a .crt extension.
# certificate_authority_path: "tls/localhost_CA.crt"
# # Path to the certificate file, typically named with a .crt extension.
# certificate_path: "tls/localhost.crt"
# # Path to the private key file, typically named with a .key extension.
# private_key_path: "tls/localhost.key"
# # Enable/disable verifying the hostname of the certificate provided by the destination.
# #verify_hostname: true
```

### KafkaSinkSingle
Expand All @@ -332,6 +344,18 @@ In order to force clients to connect through shotover the FindCoordinator, Metad
# This field is optional, if not provided, timeout will never occur.
# When a timeout occurs the connection to the client is immediately closed.
# read_timeout: 60
# When this field is provided TLS is used when connecting to the remote address.
# Removing this field will disable TLS.
#tls:
# # Path to the certificate authority file, typically named with a .crt extension.
# certificate_authority_path: "tls/localhost_CA.crt"
# # Path to the certificate file, typically named with a .crt extension.
# certificate_path: "tls/localhost.crt"
# # Path to the private key file, typically named with a .key extension.
# private_key_path: "tls/localhost.key"
# # Enable/disable verifying the hostname of the certificate provided by the destination.
# #verify_hostname: true
```

This transfrom emits a metrics [counter](user-guide/observability.md#counter) named `failed_requests` and the labels `transform` defined as `CassandraSinkSingle` and `chain` as the name of the chain that this transform is in.
Expand Down
1 change: 1 addition & 0 deletions shotover-proxy/benches/windsock/kafka/bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ impl KafkaBench {
read_timeout: None,
first_contact_points: vec![kafka_address],
shotover_nodes: vec![host_address.clone()],
tls: None,
}),
});
common::generate_topology(SourceConfig::Kafka(shotover::sources::kafka::KafkaConfig {
Expand Down
22 changes: 22 additions & 0 deletions shotover-proxy/tests/kafka_int_tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,28 @@ async fn passthrough_tls() {
.expect("Shotover did not shutdown within 10s");
}

#[cfg(feature = "rdkafka-driver-tests")]
#[tokio::test]
async fn cluster_tls() {
test_helpers::cert::generate_kafka_test_certs();

let _docker_compose =
docker_compose("tests/test-configs/kafka/cluster-tls/docker-compose.yaml");
let shotover = shotover_process("tests/test-configs/kafka/cluster-tls/topology.yaml")
.start()
.await;

let connection_builder = KafkaConnectionBuilder::new("127.0.0.1:9192");
test_cases::basic(connection_builder).await;

tokio::time::timeout(
Duration::from_secs(10),
shotover.shutdown_and_then_consume_events(&[]),
)
.await
.expect("Shotover did not shutdown within 10s");
}

#[cfg(feature = "rdkafka-driver-tests")]
#[tokio::test]
async fn passthrough_encode() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
version: "3"
networks:
cluster_subnet:
name: cluster_subnet
driver: bridge
ipam:
driver: default
config:
- subnet: 172.16.1.0/24
gateway: 172.16.1.1
services:
kafka0:
image: &image 'bitnami/kafka:3.6.1-debian-11-r24'
networks:
cluster_subnet:
ipv4_address: 172.16.1.2
environment: &environment
KAFKA_CFG_LISTENERS: "BROKER://:9092,CONTROLLER://:9093"
KAFKA_CFG_ADVERTISED_LISTENERS: "BROKER://172.16.1.2:9092"
KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: "CONTROLLER:PLAINTEXT,BROKER:SSL"
KAFKA_CFG_INTER_BROKER_LISTENER_NAME: "BROKER"
KAFKA_CFG_CONTROLLER_LISTENER_NAMES: "CONTROLLER"
KAFKA_CFG_PROCESS_ROLES: "controller,broker"
KAFKA_KRAFT_CLUSTER_ID: "abcdefghijklmnopqrstuv"
KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: "0@kafka0:9093,1@kafka1:9093,2@kafka2:9093"
KAFKA_CFG_NODE_ID: 0
KAFKA_CERTIFICATE_PASSWORD: password
KAFKA_TLS_CLIENT_AUTH: none
volumes: &volumes
- type: tmpfs
target: /bitnami/kafka
- type: bind
source: "../tls/certs"
target: "/opt/bitnami/kafka/config/certs"
kafka1:
image: *image
networks:
cluster_subnet:
ipv4_address: 172.16.1.3
environment:
<<: *environment
KAFKA_CFG_ADVERTISED_LISTENERS: "BROKER://172.16.1.3:9092"
KAFKA_CFG_NODE_ID: 1
volumes: *volumes
kafka2:
image: *image
networks:
cluster_subnet:
ipv4_address: 172.16.1.4
environment:
<<: *environment
KAFKA_CFG_ADVERTISED_LISTENERS: "BROKER://172.16.1.4:9092"
KAFKA_CFG_NODE_ID: 2
volumes: *volumes
13 changes: 13 additions & 0 deletions shotover-proxy/tests/test-configs/kafka/cluster-tls/topology.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
---
sources:
- Kafka:
name: "kafka"
listen_addr: "127.0.0.1:9192"
chain:
- KafkaSinkCluster:
shotover_nodes: ["127.0.0.1:9192"]
first_contact_points: ["172.16.1.2:9092"]
connect_timeout_ms: 3000
tls:
certificate_authority_path: "tests/test-configs/kafka/tls/certs/localhost_CA.crt"
verify_hostname: true
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ services:
- KAFKA_CFG_PROCESS_ROLES=controller,broker
- KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka0:9093
- KAFKA_CFG_NODE_ID=0
- ALLOW_PLAINTEXT_LISTENER=yes
- KAFKA_CERTIFICATE_PASSWORD=password
- KAFKA_TLS_CLIENT_AUTH=none
volumes:
Expand Down
72 changes: 49 additions & 23 deletions shotover/src/transforms/kafka/sink_cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use crate::frame::kafka::{strbytes, KafkaFrame, RequestBody, ResponseBody};
use crate::frame::Frame;
use crate::message::{Message, Messages};
use crate::tcp;
use crate::tls::{TlsConnector, TlsConnectorConfig};
use crate::transforms::util::cluster_connection_pool::{spawn_read_write_tasks, Connection};
use crate::transforms::util::{Request, Response};
use crate::transforms::{Transform, TransformBuilder, Wrapper};
Expand All @@ -29,6 +30,7 @@ use std::net::SocketAddr;
use std::sync::atomic::AtomicI64;
use std::sync::Arc;
use std::time::Duration;
use tokio::io::split;
use tokio::sync::{mpsc, oneshot, RwLock};
use tokio::time::timeout;

Expand All @@ -39,6 +41,7 @@ pub struct KafkaSinkClusterConfig {
pub shotover_nodes: Vec<String>,
pub connect_timeout_ms: u64,
pub read_timeout: Option<u64>,
pub tls: Option<TlsConnectorConfig>,
}

const NAME: &str = "KafkaSinkCluster";
Expand All @@ -49,12 +52,14 @@ impl TransformConfig for KafkaSinkClusterConfig {
&self,
transform_context: TransformContextConfig,
) -> Result<Box<dyn TransformBuilder>> {
let tls = self.tls.clone().map(TlsConnector::new).transpose()?;
Ok(Box::new(KafkaSinkClusterBuilder::new(
self.first_contact_points.clone(),
self.shotover_nodes.clone(),
transform_context.chain_name,
self.connect_timeout_ms,
self.read_timeout,
tls,
)))
}
}
Expand All @@ -69,6 +74,7 @@ pub struct KafkaSinkClusterBuilder {
group_to_coordinator_broker: Arc<DashMap<GroupId, BrokerId>>,
topics: Arc<DashMap<TopicName, Topic>>,
nodes_shared: Arc<RwLock<Vec<KafkaNode>>>,
tls: Option<TlsConnector>,
}

impl KafkaSinkClusterBuilder {
Expand All @@ -78,6 +84,7 @@ impl KafkaSinkClusterBuilder {
_chain_name: String,
connect_timeout_ms: u64,
timeout: Option<u64>,
tls: Option<TlsConnector>,
) -> KafkaSinkClusterBuilder {
let receive_timeout = timeout.map(Duration::from_secs);

Expand All @@ -101,6 +108,7 @@ impl KafkaSinkClusterBuilder {
group_to_coordinator_broker: Arc::new(DashMap::new()),
topics: Arc::new(DashMap::new()),
nodes_shared: Arc::new(RwLock::new(vec![])),
tls,
}
}
}
Expand All @@ -119,6 +127,7 @@ impl TransformBuilder for KafkaSinkClusterBuilder {
group_to_coordinator_broker: self.group_to_coordinator_broker.clone(),
topics: self.topics.clone(),
rng: SmallRng::from_rng(rand::thread_rng()).unwrap(),
tls: self.tls.clone(),
})
}

Expand Down Expand Up @@ -165,6 +174,7 @@ pub struct KafkaSinkCluster {
group_to_coordinator_broker: Arc<DashMap<GroupId, BrokerId>>,
topics: Arc<DashMap<TopicName, Topic>>,
rng: SmallRng,
tls: Option<TlsConnector>,
}

#[async_trait]
Expand Down Expand Up @@ -336,8 +346,11 @@ impl KafkaSinkCluster {
as usize];
for node in &mut self.nodes {
if node.broker_id == partition.leader_id {
connection =
Some(node.get_connection(self.connect_timeout).await?.clone());
connection = Some(
node.get_connection(self.connect_timeout, &self.tls)
.await?
.clone(),
);
}
}
}
Expand All @@ -348,7 +361,7 @@ impl KafkaSinkCluster {
self.nodes
.choose_mut(&mut self.rng)
.unwrap()
.get_connection(self.connect_timeout)
.get_connection(self.connect_timeout, &self.tls)
.await?
.clone()
}
Expand Down Expand Up @@ -388,7 +401,7 @@ impl KafkaSinkCluster {
.filter(|node| partition.replica_nodes.contains(&node.broker_id))
.choose(&mut self.rng)
.unwrap()
.get_connection(self.connect_timeout)
.get_connection(self.connect_timeout, &self.tls)
.await?
.clone()
} else {
Expand All @@ -397,7 +410,7 @@ impl KafkaSinkCluster {
self.nodes
.choose_mut(&mut self.rng)
.unwrap()
.get_connection(self.connect_timeout)
.get_connection(self.connect_timeout, &self.tls)
.await?
.clone()
};
Expand Down Expand Up @@ -461,7 +474,7 @@ impl KafkaSinkCluster {
.nodes
.choose_mut(&mut self.rng)
.unwrap()
.get_connection(self.connect_timeout)
.get_connection(self.connect_timeout, &self.tls)
.await?;
let (tx, rx) = oneshot::channel();
connection
Expand Down Expand Up @@ -498,7 +511,7 @@ impl KafkaSinkCluster {
.nodes
.choose_mut(&mut self.rng)
.unwrap()
.get_connection(self.connect_timeout)
.get_connection(self.connect_timeout, &self.tls)
.await?;
let (tx, rx) = oneshot::channel();
connection
Expand Down Expand Up @@ -556,7 +569,7 @@ impl KafkaSinkCluster {
.nodes
.choose_mut(&mut self.rng)
.unwrap()
.get_connection(self.connect_timeout)
.get_connection(self.connect_timeout, &self.tls)
.await?;
let (tx, rx) = oneshot::channel();
connection
Expand Down Expand Up @@ -665,13 +678,15 @@ impl KafkaSinkCluster {
let connection = if let Some(node) =
self.nodes.iter_mut().find(|x| x.broker_id == *broker_id)
{
node.get_connection(self.connect_timeout).await?.clone()
node.get_connection(self.connect_timeout, &self.tls)
.await?
.clone()
} else {
tracing::warn!("no known broker with id {broker_id:?}, routing message to a random node so that a NOT_CONTROLLER or similar error is returned to the client");
self.nodes
.choose_mut(&mut self.rng)
.unwrap()
.get_connection(self.connect_timeout)
.get_connection(self.connect_timeout, &self.tls)
.await?
.clone()
};
Expand All @@ -695,7 +710,11 @@ impl KafkaSinkCluster {
for node in &mut self.nodes {
if let Some(broker_id) = self.group_to_coordinator_broker.get(&group_id) {
if node.broker_id == *broker_id {
connection = Some(node.get_connection(self.connect_timeout).await?.clone());
connection = Some(
node.get_connection(self.connect_timeout, &self.tls)
.await?
.clone(),
);
}
}
}
Expand All @@ -706,7 +725,7 @@ impl KafkaSinkCluster {
self.nodes
.choose_mut(&mut self.rng)
.unwrap()
.get_connection(self.connect_timeout)
.get_connection(self.connect_timeout, &self.tls)
.await?
.clone()
}
Expand Down Expand Up @@ -873,19 +892,26 @@ struct KafkaNode {
}

impl KafkaNode {
async fn get_connection(&mut self, connect_timeout: Duration) -> Result<&Connection> {
async fn get_connection(
&mut self,
connect_timeout: Duration,
tls: &Option<TlsConnector>,
) -> Result<&Connection> {
if self.connection.is_none() {
let codec = KafkaCodecBuilder::new(Direction::Sink, "KafkaSinkCluster".to_owned());
let tcp_stream = tcp::tcp_stream(
connect_timeout,
(
self.kafka_address.host.to_string(),
self.kafka_address.port as u16,
),
)
.await?;
let (rx, tx) = tcp_stream.into_split();
self.connection = Some(spawn_read_write_tasks(&codec, rx, tx));
let address = (
self.kafka_address.host.to_string(),
self.kafka_address.port as u16,
);
if let Some(tls) = tls.as_ref() {
let tls_stream = tls.connect(connect_timeout, address).await?;
let (rx, tx) = split(tls_stream);
self.connection = Some(spawn_read_write_tasks(&codec, rx, tx));
} else {
let tcp_stream = tcp::tcp_stream(connect_timeout, address).await?;
let (rx, tx) = tcp_stream.into_split();
self.connection = Some(spawn_read_write_tasks(&codec, rx, tx));
}
}
Ok(self.connection.as_ref().unwrap())
}
Expand Down
Loading

0 comments on commit 9a4fb8e

Please sign in to comment.