KafkaSinkCluster route by NodeState
rukai committed Aug 30, 2024
1 parent 149dc70 commit 2855606
Showing 4 changed files with 263 additions and 53 deletions.
84 changes: 72 additions & 12 deletions shotover-proxy/tests/kafka_int_tests/mod.rs
@@ -247,21 +247,52 @@ async fn single_sasl_scram_plaintext_source_tls_sink(#[case] driver: KafkaDriver
#[case::java(KafkaDriver::Java)]
#[tokio::test(flavor = "multi_thread")] // multi_thread is needed since java driver will block when consuming, causing shotover logs to not appear
async fn cluster_1_rack_single_shotover(#[case] driver: KafkaDriver) {
let _docker_compose =
let docker_compose =
docker_compose("tests/test-configs/kafka/cluster-1-rack/docker-compose.yaml");
let shotover = shotover_process("tests/test-configs/kafka/cluster-1-rack/topology-single.yaml")
.start()

{
let shotover =
shotover_process("tests/test-configs/kafka/cluster-1-rack/topology-single.yaml")
.start()
.await;

let connection_builder = KafkaConnectionBuilder::new(driver, "127.0.0.1:9192");
test_cases::standard_test_suite(&connection_builder).await;
tokio::time::timeout(
Duration::from_secs(10),
shotover.shutdown_and_then_consume_events(&[]),
)
.await
.expect("Shotover did not shutdown within 10s");
}

{
let shotover =
shotover_process("tests/test-configs/kafka/cluster-1-rack/topology-single.yaml")
.start()
.await;
let connection_builder = KafkaConnectionBuilder::new(driver, "127.0.0.1:9192");

test_cases::produce_consume_partitions1_kafka_node_goes_down(
&docker_compose,
&connection_builder,
"kafka_node_goes_down_test",
)
.await;

let connection_builder = KafkaConnectionBuilder::new(driver, "127.0.0.1:9192");
test_cases::cluster_test_suite(&connection_builder).await;

tokio::time::timeout(
Duration::from_secs(10),
shotover.shutdown_and_then_consume_events(&[]),
)
.await
.expect("Shotover did not shutdown within 10s");
// Shotover can reasonably hit many kinds of errors due to a kafka node being down, so ignore all of them.
tokio::time::timeout(
Duration::from_secs(10),
shotover.shutdown_and_then_consume_events(&[EventMatcher::new()
.with_level(Level::Error)
.with_count(Count::Any)]),
)
.await
.expect("Shotover did not shutdown within 10s");
}
}

#[rstest]
@@ -376,7 +407,7 @@ async fn cluster_sasl_scram_single_shotover(#[case] driver: KafkaDriver) {
async fn cluster_sasl_scram_over_mtls_single_shotover(#[case] driver: KafkaDriver) {
test_helpers::cert::generate_kafka_test_certs();

let _docker_compose =
let docker_compose =
docker_compose("tests/test-configs/kafka/cluster-sasl-scram-over-mtls/docker-compose.yaml");

// test concurrent connections with different access levels to ensure that:
Expand Down Expand Up @@ -443,6 +474,35 @@ async fn cluster_sasl_scram_over_mtls_single_shotover(#[case] driver: KafkaDrive
.await
.expect("Shotover did not shutdown within 10s");
}

// Test handling of down kafka nodes.
{
let shotover = shotover_process(
"tests/test-configs/kafka/cluster-sasl-scram-over-mtls/topology-single.yaml",
)
.start()
.await;

let connection = KafkaConnectionBuilder::new(driver, "127.0.0.1:9192")
.use_sasl_scram("super_user", "super_password");

test_cases::produce_consume_partitions1_kafka_node_goes_down(
&docker_compose,
&connection,
"kafka_node_goes_down_test",
)
.await;

// Shotover can reasonably hit many kinds of errors due to a kafka node being down, so ignore all of them.
tokio::time::timeout(
Duration::from_secs(10),
shotover.shutdown_and_then_consume_events(&[EventMatcher::new()
.with_level(Level::Error)
.with_count(Count::Any)]),
)
.await
.expect("Shotover did not shutdown within 10s");
}
}

async fn assert_connection_fails_with_incorrect_password(driver: KafkaDriver, username: &str) {
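Both down-node tests above repeat one shutdown pattern: a clean run whose shutdown tolerates no events, then a run that kills a broker and tolerates any number of error-level logs. The repeated 10-second timeout could be factored out as below; `within_10s` is an illustrative helper, not something in the repo.

```rust
use std::time::Duration;

/// Illustrative only: wraps any shutdown future in the 10 second timeout
/// that every test above repeats inline.
async fn within_10s<T>(fut: impl std::future::Future<Output = T>) -> T {
    tokio::time::timeout(Duration::from_secs(10), fut)
        .await
        .expect("Shotover did not shutdown within 10s")
}
```

The clean block would then call `within_10s(shotover.shutdown_and_then_consume_events(&[])).await`, while the down-node block passes the `EventMatcher::new().with_level(Level::Error).with_count(Count::Any)` allow-list shown in the diff.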
129 changes: 125 additions & 4 deletions shotover-proxy/tests/kafka_int_tests/test_cases.rs
@@ -1,8 +1,11 @@
use std::collections::HashMap;
use test_helpers::connection::kafka::{
Acl, AclOperation, AclPermissionType, AlterConfig, ConfigEntry, ExpectedResponse,
KafkaConnectionBuilder, NewPartition, NewTopic, Record, ResourcePatternType, ResourceSpecifier,
ResourceType, TopicPartition,
use test_helpers::{
connection::kafka::{
Acl, AclOperation, AclPermissionType, AlterConfig, ConfigEntry, ExpectedResponse,
KafkaConnectionBuilder, NewPartition, NewTopic, Record, ResourcePatternType,
ResourceSpecifier, ResourceType, TopicPartition,
},
docker_compose::DockerCompose,
};

async fn admin_setup(connection_builder: &KafkaConnectionBuilder) {
@@ -169,6 +172,124 @@ pub async fn produce_consume_partitions1(
}
}

pub async fn produce_consume_partitions1_kafka_node_goes_down(
docker_compose: &DockerCompose,
connection_builder: &KafkaConnectionBuilder,
topic_name: &str,
) {
let admin = connection_builder.connect_admin().await;
admin
.create_topics(&[NewTopic {
name: topic_name,
num_partitions: 1,
replication_factor: 3,
}])
.await;

{
let producer = connection_builder.connect_producer("all").await;
// create an initial record to force kafka to create the topic if it doesn't yet exist
producer
.assert_produce(
Record {
payload: "initial",
topic_name,
key: Some("Key"),
},
Some(0),
)
.await;

let mut consumer = connection_builder
.connect_consumer(topic_name, "some_group")
.await;
consumer
.assert_consume(ExpectedResponse {
message: "initial".to_owned(),
key: Some("Key".to_owned()),
topic_name: topic_name.to_owned(),
offset: Some(0),
})
.await;

docker_compose.kill_service("kafka1");

// create and consume records
for i in 0..5 {
producer
.assert_produce(
Record {
payload: "Message1",
topic_name,
key: Some("Key"),
},
Some(i * 2 + 1),
)
.await;
producer
.assert_produce(
Record {
payload: "Message2",
topic_name,
key: None,
},
Some(i * 2 + 2),
)
.await;

consumer
.assert_consume(ExpectedResponse {
message: "Message1".to_owned(),
key: Some("Key".to_owned()),
topic_name: topic_name.to_owned(),
offset: Some(i * 2 + 1),
})
.await;

consumer
.assert_consume(ExpectedResponse {
message: "Message2".to_owned(),
key: None,
topic_name: topic_name.to_owned(),
offset: Some(i * 2 + 2),
})
.await;
}
}

// If we create a new consumer it will start from the beginning, since auto.offset.reset = earliest and enable.auto.commit = false,
// so we test that we can access all records ever created on this topic.
let mut consumer = connection_builder
.connect_consumer(topic_name, "some_group")
.await;
consumer
.assert_consume(ExpectedResponse {
message: "initial".to_owned(),
key: Some("Key".to_owned()),
topic_name: topic_name.to_owned(),
offset: Some(0),
})
.await;
for i in 0..5 {
consumer
.assert_consume(ExpectedResponse {
message: "Message1".to_owned(),
key: Some("Key".to_owned()),
topic_name: topic_name.to_owned(),
offset: Some(i * 2 + 1),
})
.await;
consumer
.assert_consume(ExpectedResponse {
message: "Message2".to_owned(),
key: None,
topic_name: topic_name.to_owned(),
offset: Some(i * 2 + 2),
})
.await;
}
}

pub async fn produce_consume_commit_offsets_partitions1(
connection_builder: &KafkaConnectionBuilder,
topic_name: &str,
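The offset arithmetic in `produce_consume_partitions1_kafka_node_goes_down` is worth spelling out: the initial record sits at offset 0, and loop iteration `i` appends records at offsets `i * 2 + 1` and `i * 2 + 2`, so five iterations leave eleven records at offsets 0 through 10. A minimal standalone check of that arithmetic (illustrative, not code from the commit):

```rust
/// Offsets the helper expects on the single-partition topic.
fn expected_offsets() -> Vec<i32> {
    let mut offsets = vec![0]; // the "initial" record
    for i in 0..5 {
        offsets.push(i * 2 + 1); // "Message1" in iteration i
        offsets.push(i * 2 + 2); // "Message2" in iteration i
    }
    offsets
}

#[test]
fn fresh_consumer_replays_every_record() {
    // With auto.offset.reset = earliest and auto commit disabled, a new
    // consumer in the same group re-reads all eleven records from offset 0.
    assert_eq!(expected_offsets(), (0..=10).collect::<Vec<i32>>());
}
```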
24 changes: 12 additions & 12 deletions shotover/src/transforms/kafka/sink_cluster/connections.rs
@@ -207,8 +207,7 @@ impl Connections {

let connection = connection_factory
.create_connection(address, authorize_scram_over_mtls, sasl_mechanism)
.await
.context("Failed to create a new connection");
.await;

// Update the node state according to whether we can currently open a connection.
let node_state = if connection.is_err() {
@@ -227,23 +226,24 @@
.unwrap()
.state
.store(node_state, Ordering::Relaxed);
let connection = connection?;

// Recreating the node succeeded.
// So store it as the new connection, as long as we weren't waiting on any responses in the old connection
let connection =
KafkaConnection::new(authorize_scram_over_mtls, sasl_mechanism, connection, None)?;

if old_connection
.map(|old| old.pending_requests_count())
.unwrap_or(0)
> 0
{
Err(error.context("Succesfully reopened outgoing connection but previous outgoing connection had pending requests."))
} else {
self.connections.insert(destination, connection);
Ok(())
return Err(error.context("Outgoing connection had pending requests, those requests/responses are lost so connection recovery cannot be attempted."));
}

let connection = connection.context("Failed to create a new connection")?;

// Recreating the node succeeded.
// So store it as the new connection, as long as we weren't waiting on any responses in the old connection
let connection =
KafkaConnection::new(authorize_scram_over_mtls, sasl_mechanism, connection, None)?;

self.connections.insert(destination, connection);
Ok(())
}
}

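The reordering in this hunk is the behavioural fix: record the node's reachability first, then refuse to recover a connection that still had requests in flight (their responses are unrecoverable), and only then adopt the new connection. A condensed sketch of that control flow, with simplified stand-in types (`NodeState` here models shotover's atomic node state, and `recover_connection` is not a real function in the repo):

```rust
use anyhow::{Context, Result};

#[derive(Debug)]
enum NodeState {
    Up,
    Down,
}

/// Condensed, simplified sketch of the reconnect flow in the diff above.
fn recover_connection<C>(
    reconnect: Result<C>,        // result of create_connection
    old_pending_requests: usize, // requests in flight on the dead connection
    original_error: anyhow::Error,
    node_state: &mut NodeState,
) -> Result<C> {
    // 1. Record reachability first, so routing can skip Down nodes even if
    //    recovery bails out below.
    *node_state = if reconnect.is_err() {
        NodeState::Down
    } else {
        NodeState::Up
    };

    // 2. If the dead connection still had requests in flight, their responses
    //    are lost; replacing it would silently drop them, so fail instead.
    if old_pending_requests > 0 {
        return Err(original_error.context(
            "Outgoing connection had pending requests, those requests/responses are lost so connection recovery cannot be attempted.",
        ));
    }

    // 3. Otherwise adopt the freshly opened connection (or surface its error).
    reconnect.context("Failed to create a new connection")
}
```

Failing fast when requests were pending mirrors the new early `return Err(...)` above: the node's state is still updated, but the caller must treat the lost requests as failed rather than silently retrying them on a fresh connection.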
