Skip to content

Commit

Permalink
Merge branch 'main' into axum
Browse files Browse the repository at this point in the history
  • Loading branch information
rukai authored Mar 14, 2024
2 parents 95b1d9e + 5523020 commit 600a415
Show file tree
Hide file tree
Showing 35 changed files with 644 additions and 3,153 deletions.
3 changes: 0 additions & 3 deletions .github/workflows/windsock_benches.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,6 @@ jobs:
cargo windsock local-run --bench-length-seconds 5 --operations-per-second 100 --profilers samply name=cassandra,compression=none,connection_count=1,driver=scylla,operation=read_i64,protocol=v4,shotover=standard,topology=single
cargo windsock local-run --bench-length-seconds 5 --operations-per-second 100 --profilers sys_monitor name=kafka,shotover=standard,size=1B,topology=single
cargo windsock local-run --bench-length-seconds 5 --operations-per-second 100 --profilers shotover_metrics name=redis,encryption=none,operation=get,shotover=standard,topology=single
# windsock/examples/cassandra.rs - this can stay here until windsock is moved to its own repo
cargo run --release --example cassandra -- local-run --bench-length-seconds 5 --operations-per-second 100
      - name: Ensure that tests did not create or modify any files that aren't .gitignore'd
run: |
if [ -n "$(git status --porcelain)" ]; then
Expand Down
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
[workspace]
members = [
"windsock",
"shotover",
"shotover-proxy",
"test-helpers",
Expand Down
5 changes: 4 additions & 1 deletion docs/src/transforms.md
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ Instead Shotover will pretend to be either a single Kafka node or part of a clus
This is achieved by rewriting the FindCoordinator, Metadata and DescribeCluster messages to contain the nodes in the Shotover cluster instead of the Kafka cluster.

```yaml
- CassandraSinkCluster:
- KafkaSinkCluster:
# Addresses of the initial kafka brokers to connect to.
    first_contact_points: ["172.16.1.2:9092", "172.16.1.3:9092"]
Expand All @@ -309,6 +309,9 @@ This is achieved by rewriting the FindCoordinator, Metadata and DescribeCluster
# When a timeout occurs the connection to the client is immediately closed.
# read_timeout: 60
# When this field is enabled it allows the use of SASL authentication. If you intend to use SASL this field must be enabled, it is false by default.
sasl_enabled: false
# When this field is provided TLS is used when connecting to the remote address.
# Removing this field will disable TLS.
#tls:
Expand Down
2 changes: 1 addition & 1 deletion shotover-proxy/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ tokio-bin-process.workspace = true
rustls-pemfile = "2.0.0"
rustls-pki-types = "1.1.0"
aws-throwaway.workspace = true
windsock = { path = "../windsock" }
windsock = "0.1.0"
regex = "1.7.0"
cql-ws = { git = "https://github.com/shotover/cql-ws" }
opensearch = "2.1.0"
Expand Down
9 changes: 7 additions & 2 deletions shotover-proxy/benches/windsock/kafka/bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use itertools::Itertools;
use shotover::config::chain::TransformChainConfig;
use shotover::sources::SourceConfig;
use shotover::transforms::debug::force_parse::DebugForceEncodeConfig;
use shotover::transforms::kafka::sink_cluster::KafkaSinkClusterConfig;
use shotover::transforms::kafka::sink_cluster::{KafkaSinkClusterConfig, ShotoverNodeConfig};
use shotover::transforms::kafka::sink_single::KafkaSinkSingleConfig;
use shotover::transforms::TransformConfig;
use std::sync::Arc;
Expand Down Expand Up @@ -92,8 +92,13 @@ impl KafkaBench {
connect_timeout_ms: 3000,
read_timeout: None,
first_contact_points: vec![kafka_address],
shotover_nodes: vec![host_address.clone()],
shotover_nodes: vec![ShotoverNodeConfig {
address: host_address.parse().unwrap(),
rack: "rack1".into(),
broker_id: 0,
}],
tls: None,
sasl_enabled: Some(false),
}),
});
common::generate_topology(SourceConfig::Kafka(shotover::sources::kafka::KafkaConfig {
Expand Down
68 changes: 53 additions & 15 deletions shotover-proxy/tests/kafka_int_tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,19 +195,58 @@ async fn cluster_1_rack_multi_shotover(#[case] driver: KafkaDriver) {
#[cfg(feature = "rdkafka-driver-tests")] // temporarily needed to avoid a warning
#[rstest]
#[cfg_attr(feature = "rdkafka-driver-tests", case::cpp(KafkaDriver::Cpp))]
// #[case::java(KafkaDriver::Java)]
//#[case::java(KafkaDriver::Java)]
#[tokio::test(flavor = "multi_thread")] // multi_thread is needed since java driver will block when consuming, causing shotover logs to not appear
async fn cluster_2_racks_single_shotover(#[case] driver: KafkaDriver) {
async fn cluster_2_racks_multi_shotover(#[case] driver: KafkaDriver) {
let _docker_compose =
docker_compose("tests/test-configs/kafka/cluster-2-racks/docker-compose.yaml");
let shotover =
shotover_process("tests/test-configs/kafka/cluster-2-racks/topology-single.yaml")

// One shotover instance per rack
let mut shotovers = vec![];
for i in 1..3 {
shotovers.push(
shotover_process(&format!(
"tests/test-configs/kafka/cluster-2-racks/topology-rack{i}.yaml"
))
.with_config(&format!(
"tests/test-configs/shotover-config/config{i}.yaml"
))
.with_log_name(&format!("shotover{i}"))
.start()
.await;
.await,
);
}

let connection_builder = KafkaConnectionBuilder::new(driver, "127.0.0.1:9192");
test_cases::basic(connection_builder).await;

for shotover in shotovers {
tokio::time::timeout(
Duration::from_secs(10),
shotover.shutdown_and_then_consume_events(&[]),
)
.await
.expect("Shotover did not shutdown within 10s");
}
}

#[cfg(feature = "rdkafka-driver-tests")]
#[rstest]
#[cfg_attr(feature = "rdkafka-driver-tests", case::cpp(KafkaDriver::Cpp))]
// #[case::java(KafkaDriver::Java)]
#[tokio::test]
async fn cluster_sasl_single_shotover(#[case] driver: KafkaDriver) {
let _docker_compose =
docker_compose("tests/test-configs/kafka/cluster-sasl/docker-compose.yaml");

let shotover = shotover_process("tests/test-configs/kafka/cluster-sasl/topology-single.yaml")
.start()
.await;

let connection_builder =
KafkaConnectionBuilder::new(driver, "127.0.0.1:9192").use_sasl("user", "password");
test_cases::basic(connection_builder).await;

tokio::time::timeout(
Duration::from_secs(10),
shotover.shutdown_and_then_consume_events(&[]),
Expand All @@ -216,21 +255,19 @@ async fn cluster_2_racks_single_shotover(#[case] driver: KafkaDriver) {
.expect("Shotover did not shutdown within 10s");
}

#[cfg(feature = "rdkafka-driver-tests")] // temporarily needed to avoid a warning
#[cfg(feature = "rdkafka-driver-tests")]
#[rstest]
#[cfg_attr(feature = "rdkafka-driver-tests", case::cpp(KafkaDriver::Cpp))]
//#[case::java(KafkaDriver::Java)]
#[tokio::test(flavor = "multi_thread")] // multi_thread is needed since java driver will block when consuming, causing shotover logs to not appear
async fn cluster_2_racks_multi_shotover(#[case] driver: KafkaDriver) {
// #[case::java(KafkaDriver::Java)]
#[tokio::test]
async fn cluster_sasl_multi_shotover(#[case] driver: KafkaDriver) {
let _docker_compose =
docker_compose("tests/test-configs/kafka/cluster-2-racks/docker-compose.yaml");

// One shotover instance per rack
docker_compose("tests/test-configs/kafka/cluster-sasl/docker-compose.yaml");
let mut shotovers = vec![];
for i in 1..3 {
for i in 1..4 {
shotovers.push(
shotover_process(&format!(
"tests/test-configs/kafka/cluster-2-racks/topology-rack{i}.yaml"
"tests/test-configs/kafka/cluster-sasl/topology{i}.yaml"
))
.with_config(&format!(
"tests/test-configs/shotover-config/config{i}.yaml"
Expand All @@ -241,7 +278,8 @@ async fn cluster_2_racks_multi_shotover(#[case] driver: KafkaDriver) {
);
}

let connection_builder = KafkaConnectionBuilder::new(driver, "127.0.0.1:9192");
let connection_builder =
KafkaConnectionBuilder::new(driver, "127.0.0.1:9192").use_sasl("user", "password");
test_cases::basic(connection_builder).await;

for shotover in shotovers {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ sources:
listen_addr: "127.0.0.1:9192"
chain:
- KafkaSinkCluster:
shotover_nodes: ["127.0.0.1:9192"]
shotover_nodes:
- address: "127.0.0.1:9192"
rack: "rack0"
broker_id: 0
first_contact_points: ["172.16.1.2:9092"]
connect_timeout_ms: 3000
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,15 @@ sources:
listen_addr: "127.0.0.1:9191"
chain:
- KafkaSinkCluster:
shotover_nodes:
- "127.0.0.1:9191"
- "127.0.0.1:9192"
- "127.0.0.1:9193"
shotover_nodes:
- address: "127.0.0.1:9191"
rack: "rack0"
broker_id: 0
- address: "127.0.0.1:9192"
rack: "rack0"
broker_id: 1
- address: "127.0.0.1:9193"
rack: "rack0"
broker_id: 2
first_contact_points: ["172.16.1.2:9092"]
connect_timeout_ms: 3000
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,15 @@ sources:
listen_addr: "127.0.0.1:9192"
chain:
- KafkaSinkCluster:
shotover_nodes:
- "127.0.0.1:9191"
- "127.0.0.1:9192"
- "127.0.0.1:9193"
shotover_nodes:
- address: "127.0.0.1:9191"
rack: "rack0"
broker_id: 0
- address: "127.0.0.1:9192"
rack: "rack0"
broker_id: 1
- address: "127.0.0.1:9193"
rack: "rack0"
broker_id: 2
first_contact_points: ["172.16.1.2:9092"]
connect_timeout_ms: 3000
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,15 @@ sources:
listen_addr: "127.0.0.1:9193"
chain:
- KafkaSinkCluster:
shotover_nodes:
- "127.0.0.1:9191"
- "127.0.0.1:9192"
- "127.0.0.1:9193"
shotover_nodes:
- address: "127.0.0.1:9191"
rack: "rack0"
broker_id: 0
- address: "127.0.0.1:9192"
rack: "rack0"
broker_id: 1
- address: "127.0.0.1:9193"
rack: "rack0"
broker_id: 2
first_contact_points: ["172.16.1.2:9092"]
connect_timeout_ms: 3000
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,12 @@ sources:
listen_addr: "127.0.0.1:9191"
chain:
- KafkaSinkCluster:
shotover_nodes:
- "127.0.0.1:9191"
- "127.0.0.1:9192"
shotover_nodes:
- address: "127.0.0.1:9191"
rack: "rack1"
broker_id: 0
- address: "127.0.0.1:9192"
rack: "rack2"
broker_id: 1
first_contact_points: ["172.16.1.2:9092"]
connect_timeout_ms: 3000
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,12 @@ sources:
listen_addr: "127.0.0.1:9192"
chain:
- KafkaSinkCluster:
shotover_nodes:
- "127.0.0.1:9191"
- "127.0.0.1:9192"
shotover_nodes:
- address: "127.0.0.1:9191"
rack: "rack1"
broker_id: 0
- address: "127.0.0.1:9192"
rack: "rack2"
broker_id: 1
first_contact_points: ["172.16.1.5:9092"]
connect_timeout_ms: 3000
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
version: '3'

networks:
cluster_subnet:
name: cluster_subnet
driver: bridge
ipam:
driver: default
config:
- subnet: 172.16.1.0/24
gateway: 172.16.1.1

services:
kafka0:
image: &image 'bitnami/kafka:3.6.1-debian-11-r24'
networks:
cluster_subnet:
ipv4_address: 172.16.1.2
environment: &environment
KAFKA_CFG_NODE_ID: 0
KAFKA_CFG_PROCESS_ROLES: "controller,broker"
KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: "0@kafka0:9093,1@kafka1:9093,2@kafka2:9093"
KAFKA_CFG_LISTENERS: "SASL_PLAINTEXT://:9092,CONTROLLER://:9093"
KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: "CONTROLLER:SASL_PLAINTEXT,SASL_PLAINTEXT:SASL_PLAINTEXT"
KAFKA_CFG_ADVERTISED_LISTENERS: "SASL_PLAINTEXT://172.16.1.2:9092"
KAFKA_CLIENT_USERS: "user"
KAFKA_CLIENT_PASSWORDS: "password"
KAFKA_CFG_CONTROLLER_LISTENER_NAMES: "CONTROLLER"
KAFKA_CFG_SASL_MECHANISM_CONTROLLER_PROTOCOL: "PLAIN"
KAFKA_CONTROLLER_USER: "controller_user"
KAFKA_CONTROLLER_PASSWORD: "controller_password"
KAFKA_CFG_INTER_BROKER_LISTENER_NAME: "SASL_PLAINTEXT"
KAFKA_CFG_SASL_MECHANISM_INTER_BROKER_PROTOCOL: "PLAIN"
KAFKA_INTER_BROKER_USER: "controller_user"
KAFKA_INTER_BROKER_PASSWORD: "controller_password"
KAFKA_CERTIFICATE_PASSWORD: "123456"
KAFKA_KRAFT_CLUSTER_ID: "abcdefghijklmnopqrstuv"
volumes: &volumes
- type: tmpfs
target: /bitnami/kafka
kafka1:
image: *image
networks:
cluster_subnet:
ipv4_address: 172.16.1.3
environment:
<<: *environment
KAFKA_CFG_ADVERTISED_LISTENERS: "SASL_PLAINTEXT://172.16.1.3:9092"
KAFKA_CFG_NODE_ID: 1
volumes: *volumes
kafka2:
image: *image
networks:
cluster_subnet:
ipv4_address: 172.16.1.4
environment:
<<: *environment
KAFKA_CFG_ADVERTISED_LISTENERS: "SASL_PLAINTEXT://172.16.1.4:9092"
KAFKA_CFG_NODE_ID: 2
volumes: *volumes
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@ sources:
listen_addr: "127.0.0.1:9192"
chain:
- KafkaSinkCluster:
shotover_nodes: ["127.0.0.1:9192"]
shotover_nodes:
- address: "127.0.0.1:9192"
rack: "rack0"
broker_id: 0
sasl_enabled: true
first_contact_points: ["172.16.1.2:9092"]
connect_timeout_ms: 3000
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
---
sources:
- Kafka:
name: "kafka"
listen_addr: "127.0.0.1:9191"
chain:
- KafkaSinkCluster:
sasl_enabled: true
shotover_nodes:
- address: "127.0.0.1:9191"
rack: "rack0"
broker_id: 0
- address: "127.0.0.1:9192"
rack: "rack0"
broker_id: 1
- address: "127.0.0.1:9193"
rack: "rack0"
broker_id: 2
first_contact_points: ["172.16.1.2:9092"]
connect_timeout_ms: 3000
Loading

0 comments on commit 600a415

Please sign in to comment.