Merge branch 'main' into more_transactions_integration_tests
rukai authored Oct 17, 2024
2 parents 29ac91f + 899ee23 commit 97313ab
Showing 9 changed files with 652 additions and 126 deletions.
6 changes: 3 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
@@ -53,7 +53,7 @@ clap = { version = "4.0.4", features = ["cargo", "derive"] }
async-trait = "0.1.30"
typetag = "0.2.5"
aws-throwaway = { version = "0.6.0", default-features = false }
tokio-bin-process = "0.5.0"
tokio-bin-process = "0.6.0"
ordered-float = { version = "4.0.0", features = ["serde"] }
shell-quote = { default-features = false, features = ["bash"], version = "0.7.0" }
pretty_assertions = "1.4.0"
258 changes: 228 additions & 30 deletions shotover-proxy/tests/kafka_int_tests/mod.rs
@@ -6,6 +6,7 @@ use rstest::rstest;
use std::time::Duration;
use std::time::Instant;
use test_cases::produce_consume_partitions1;
use test_cases::produce_consume_partitions3;
use test_cases::{assert_topic_creation_is_denied_due_to_acl, setup_basic_user_acls};
use test_helpers::connection::kafka::node::run_node_smoke_test_scram;
use test_helpers::connection::kafka::{KafkaConnectionBuilder, KafkaDriver};
@@ -365,19 +366,138 @@ async fn cluster_1_rack_multi_shotover(#[case] driver: KafkaDriver) {
}

#[rstest]
#[cfg_attr(feature = "kafka-cpp-driver-tests", case::cpp(KafkaDriver::Cpp))]
// #[cfg_attr(feature = "kafka-cpp-driver-tests", case::cpp(KafkaDriver::Cpp))] //CPP driver may cause flaky tests.
#[case::java(KafkaDriver::Java)]
#[tokio::test(flavor = "multi_thread")] // multi_thread is needed since java driver will block when consuming, causing shotover logs to not appear
async fn cluster_2_racks_multi_shotover(#[case] driver: KafkaDriver) {
async fn cluster_1_rack_multi_shotover_with_1_shotover_down(#[case] driver: KafkaDriver) {
let _docker_compose =
docker_compose("tests/test-configs/kafka/cluster-2-racks/docker-compose.yaml");
docker_compose("tests/test-configs/kafka/cluster-1-rack/docker-compose.yaml");
let mut shotovers = vec![];
for i in 1..4 {
shotovers.push(
shotover_process(&format!(
"tests/test-configs/kafka/cluster-1-rack/topology{i}.yaml"
))
.with_config(&format!(
"tests/test-configs/shotover-config/config{i}.yaml"
))
.with_log_name(&format!("shotover{i}"))
.start()
.await,
);
}

// One shotover instance per rack
// Wait for check_shotover_peers to start
tokio::time::sleep(Duration::from_secs(15)).await;

// produce and consume messages, kill 1 shotover node and produce and consume more messages
let connection_builder = KafkaConnectionBuilder::new(driver, "localhost:9192");
let shotover_nodes_to_kill: Vec<_> = shotovers.drain(0..1).collect();
test_cases::produce_consume_partitions1_shotover_nodes_go_down(
shotover_nodes_to_kill,
&connection_builder,
"shotover_node_goes_down_test",
)
.await;

// create a new connection and produce and consume messages
let new_connection_builder = KafkaConnectionBuilder::new(driver, "localhost:9193");
test_cases::cluster_test_suite(&new_connection_builder).await;

let mut expected_events = multi_shotover_events();
// Other shotover nodes should detect the killed node at least once
expected_events.push(
EventMatcher::new()
.with_level(Level::Warn)
.with_target("shotover::transforms::kafka::sink_cluster::shotover_node")
.with_message(r#"Shotover peer 127.0.0.1:9191 is down"#)
.with_count(Count::GreaterThanOrEqual(1)),
);

for shotover in shotovers {
tokio::time::timeout(
Duration::from_secs(10),
shotover.shutdown_and_then_consume_events(&expected_events),
)
.await
.expect("Shotover did not shutdown within 10s");
}
}

#[rstest]
// #[cfg_attr(feature = "kafka-cpp-driver-tests", case::cpp(KafkaDriver::Cpp))] //CPP driver may cause flaky tests.
#[case::java(KafkaDriver::Java)]
#[tokio::test(flavor = "multi_thread")] // multi_thread is needed since java driver will block when consuming, causing shotover logs to not appear
async fn cluster_3_racks_multi_shotover_with_2_shotover_down(#[case] driver: KafkaDriver) {
let _docker_compose =
docker_compose("tests/test-configs/kafka/cluster-3-racks/docker-compose.yaml");
let mut shotovers = vec![];
for i in 1..4 {
shotovers.push(
shotover_process(&format!(
"tests/test-configs/kafka/cluster-3-racks/topology-rack{i}.yaml"
))
.with_config(&format!(
"tests/test-configs/shotover-config/config{i}.yaml"
))
.with_log_name(&format!("shotover{i}"))
.start()
.await,
);
}

// Wait for check_shotover_peers to start
tokio::time::sleep(Duration::from_secs(15)).await;

// produce and consume messages, kill 2 shotover nodes and produce and consume more messages
let connection_builder = KafkaConnectionBuilder::new(driver, "localhost:9193");
let shotover_nodes_to_kill: Vec<_> = shotovers.drain(0..2).collect();
test_cases::produce_consume_partitions1_shotover_nodes_go_down(
shotover_nodes_to_kill,
&connection_builder,
"shotover_nodes_go_down_test",
)
.await;

// create a new connection and produce and consume messages
let new_connection_builder = KafkaConnectionBuilder::new(driver, "localhost:9193");
test_cases::cluster_test_suite(&new_connection_builder).await;

let mut expected_events = multi_shotover_events();
// The UP shotover node should detect the killed nodes at least once
for i in 1..3 {
expected_events.push(
EventMatcher::new()
.with_level(Level::Warn)
.with_target("shotover::transforms::kafka::sink_cluster::shotover_node")
.with_message(&format!(r#"Shotover peer localhost:919{i} is down"#))
.with_count(Count::GreaterThanOrEqual(1)),
);
}

for shotover in shotovers {
tokio::time::timeout(
Duration::from_secs(10),
shotover.shutdown_and_then_consume_events(&expected_events),
)
.await
.expect("Shotover did not shutdown within 10s");
}
}

#[rstest]
#[cfg_attr(feature = "kafka-cpp-driver-tests", case::cpp(KafkaDriver::Cpp))]
#[case::java(KafkaDriver::Java)]
#[tokio::test(flavor = "multi_thread")] // multi_thread is needed since java driver will block when consuming, causing shotover logs to not appear
async fn cluster_3_racks_multi_shotover_with_1_shotover_missing(#[case] driver: KafkaDriver) {
let _docker_compose =
docker_compose("tests/test-configs/kafka/cluster-3-racks/docker-compose.yaml");
let mut shotovers = vec![];
// Only start shotover1/2 and leave shotover3 missing
for i in 1..3 {
shotovers.push(
shotover_process(&format!(
"tests/test-configs/kafka/cluster-2-racks/topology-rack{i}.yaml"
"tests/test-configs/kafka/cluster-3-racks/topology-rack{i}.yaml"
))
.with_config(&format!(
"tests/test-configs/shotover-config/config{i}.yaml"
@@ -388,23 +508,38 @@ async fn cluster_2_racks_multi_shotover(#[case] driver: KafkaDriver) {
);
}

let connection_builder = KafkaConnectionBuilder::new(driver, "127.0.0.1:9192");
// Wait for check_shotover_peers to start
tokio::time::sleep(Duration::from_secs(15)).await;

// Send some produce and consume requests
let connection_builder = KafkaConnectionBuilder::new(driver, "localhost:9192");
test_cases::cluster_test_suite(&connection_builder).await;

let mut expected_events = multi_shotover_events();
// Other shotover nodes should detect the missing node at least once
expected_events.push(
EventMatcher::new()
.with_level(Level::Warn)
.with_target("shotover::transforms::kafka::sink_cluster::shotover_node")
.with_message(r#"Shotover peer localhost:9193 is down"#)
.with_count(Count::GreaterThanOrEqual(1)),
);

for shotover in shotovers {
tokio::time::timeout(
Duration::from_secs(10),
shotover.shutdown_and_then_consume_events(&multi_shotover_events()),
shotover.shutdown_and_then_consume_events(&expected_events),
)
.await
.expect("Shotover did not shutdown within 10s");
}
}

#[rstest]
#[cfg_attr(feature = "kafka-cpp-driver-tests", case::cpp(KafkaDriver::Cpp))]
#[case::java(KafkaDriver::Java)]
#[tokio::test(flavor = "multi_thread")] // multi_thread is needed since java driver will block when consuming, causing shotover logs to not appear
async fn cluster_2_racks_multi_shotover_with_one_shotover_down(#[case] _driver: KafkaDriver) {
async fn cluster_2_racks_multi_shotover(#[case] driver: KafkaDriver) {
let _docker_compose =
docker_compose("tests/test-configs/kafka/cluster-2-racks/docker-compose.yaml");

@@ -424,35 +559,16 @@ async fn cluster_2_racks_multi_shotover_with_one_shotover_down(#[case] _driver:
);
}

// Wait for check_shotover_peers to start
tokio::time::sleep(Duration::from_secs(15)).await;

// Kill one shotover node
tokio::time::timeout(
Duration::from_secs(10),
shotovers.remove(0).shutdown_and_then_consume_events(&[]),
)
.await
.expect("Shotover did not shutdown within 10s");

// Wait for the other shotover node to detect the down node
tokio::time::sleep(Duration::from_secs(5)).await;
let connection_builder = KafkaConnectionBuilder::new(driver, "127.0.0.1:9192");
test_cases::cluster_test_suite(&connection_builder).await;

for shotover in shotovers {
let events = tokio::time::timeout(
tokio::time::timeout(
Duration::from_secs(10),
shotover.shutdown_and_then_consume_events(&multi_shotover_events()),
)
.await
.expect("Shotover did not shutdown within 10s");

// Check if the other shotover node detected the killed node
events.assert_contains(
&EventMatcher::new()
.with_level(Level::Warn)
.with_target("shotover::transforms::kafka::sink_cluster::shotover_node")
.with_message(r#"Shotover peer localhost:9191 is down"#),
);
}
}

@@ -680,6 +796,88 @@ async fn cluster_sasl_scram_over_mtls_multi_shotover(#[case] driver: KafkaDriver
}
}

#[rstest]
//#[cfg_attr(feature = "kafka-cpp-driver-tests", case::cpp(KafkaDriver::Cpp))] // CPP driver does not support scram
#[case::java(KafkaDriver::Java)]
#[tokio::test(flavor = "multi_thread")] // multi_thread is needed since java driver will block when consuming, causing shotover logs to not appear
async fn cluster_sasl_scram_over_mtls_multi_shotover_with_2_shotover_down(
#[case] driver: KafkaDriver,
) {
test_helpers::cert::generate_kafka_test_certs();

let _docker_compose =
docker_compose("tests/test-configs/kafka/cluster-sasl-scram-over-mtls/docker-compose.yaml");

let mut shotovers = vec![];
for i in 1..4 {
shotovers.push(
shotover_process(&format!(
"tests/test-configs/kafka/cluster-sasl-scram-over-mtls/topology{i}.yaml"
))
.with_config(&format!(
"tests/test-configs/shotover-config/config{i}.yaml"
))
.with_log_name(&format!("shotover{i}"))
.start()
.await,
);
}

// Wait for check_shotover_peers to start
tokio::time::sleep(Duration::from_secs(15)).await;

let instant = Instant::now();
let connection_builder = KafkaConnectionBuilder::new(driver, "127.0.0.1:9193")
.use_sasl_scram("super_user", "super_password");
produce_consume_partitions3(&connection_builder, "partitions3_rf3", 1, 500).await;

// Wait 20s since we started the initial run to ensure that we hit the 15s token lifetime limit
tokio::time::sleep_until((instant + Duration::from_secs(20)).into()).await;
produce_consume_partitions3(&connection_builder, "partitions3_rf3", 1, 500).await;

// Kill 2 shotover nodes
for _ in 0..2 {
tokio::time::timeout(
Duration::from_secs(10),
shotovers.remove(0).shutdown_and_then_consume_events(&[]),
)
.await
.expect("Shotover did not shutdown within 10s");
}

// Wait for the other shotover node to detect the down nodes
tokio::time::sleep(Duration::from_secs(10)).await;

// Send more produce and consume requests
produce_consume_partitions3(&connection_builder, "partitions3_rf3", 1, 500).await;

let mut expected_events = multi_shotover_events();
// The up shotover node should detect the killed nodes at least once
expected_events.push(
EventMatcher::new()
.with_level(Level::Warn)
.with_target("shotover::transforms::kafka::sink_cluster::shotover_node")
.with_message(r#"Shotover peer 127.0.0.1:9191 is down"#)
.with_count(Count::GreaterThanOrEqual(1)),
);
expected_events.push(
EventMatcher::new()
.with_level(Level::Warn)
.with_target("shotover::transforms::kafka::sink_cluster::shotover_node")
.with_message(r#"Shotover peer 127.0.0.1:9192 is down"#)
.with_count(Count::GreaterThanOrEqual(1)),
);

for shotover in shotovers {
tokio::time::timeout(
Duration::from_secs(10),
shotover.shutdown_and_then_consume_events(&expected_events),
)
.await
.expect("Shotover did not shutdown within 10s");
}
}

#[rstest]
#[cfg_attr(feature = "kafka-cpp-driver-tests", case::cpp(KafkaDriver::Cpp))]
#[case::java(KafkaDriver::Java)]