KafkaSinkCluster route by NodeState
rukai committed Aug 28, 2024
1 parent e275b2d commit c0a6cd6
Showing 4 changed files with 196 additions and 18 deletions.
37 changes: 35 additions & 2 deletions shotover-proxy/tests/kafka_int_tests/mod.rs
@@ -247,7 +247,7 @@ async fn single_sasl_scram_plaintext_source_tls_sink(#[case] driver: KafkaDriver
 #[case::java(KafkaDriver::Java)]
 #[tokio::test(flavor = "multi_thread")] // multi_thread is needed since java driver will block when consuming, causing shotover logs to not appear
 async fn cluster_1_rack_single_shotover(#[case] driver: KafkaDriver) {
-    let _docker_compose =
+    let docker_compose =
         docker_compose("tests/test-configs/kafka/cluster-1-rack/docker-compose.yaml");
     let shotover = shotover_process("tests/test-configs/kafka/cluster-1-rack/topology-single.yaml")
         .start()
@@ -256,6 +256,13 @@ async fn cluster_1_rack_single_shotover(#[case] driver: KafkaDriver) {
     let connection_builder = KafkaConnectionBuilder::new(driver, "127.0.0.1:9192");
     test_cases::standard_test_suite(&connection_builder).await;

+    test_cases::produce_consume_partitions1_kafka_node_goes_down(
+        &docker_compose,
+        &connection_builder,
+        "a176a97a-b98f-4de7-a54a-aee32a874ed2",
+    )
+    .await;
+
     tokio::time::timeout(
         Duration::from_secs(10),
         shotover.shutdown_and_then_consume_events(&[]),
@@ -376,7 +383,7 @@ async fn cluster_sasl_scram_single_shotover(#[case] driver: KafkaDriver) {
 async fn cluster_sasl_scram_over_mtls_single_shotover(#[case] driver: KafkaDriver) {
     test_helpers::cert::generate_kafka_test_certs();

-    let _docker_compose =
+    let docker_compose =
         docker_compose("tests/test-configs/kafka/cluster-sasl-scram-over-mtls/docker-compose.yaml");

     // test concurrent connections with different access levels to ensure that:
@@ -443,6 +450,32 @@ async fn cluster_sasl_scram_over_mtls_single_shotover(#[case] driver: KafkaDrive
         .await
         .expect("Shotover did not shutdown within 10s");
     }
+
+    // Test handling of down kafka nodes.
+    {
+        let shotover = shotover_process(
+            "tests/test-configs/kafka/cluster-sasl-scram-over-mtls/topology-single.yaml",
+        )
+        .start()
+        .await;
+
+        let connection = KafkaConnectionBuilder::new(driver, "127.0.0.1:9192")
+            .use_sasl_scram("super_user", "super_password");
+
+        test_cases::produce_consume_partitions1_kafka_node_goes_down(
+            &docker_compose,
+            &connection,
+            "a176a97a-b98f-4de7-a54a-aee32a874ed2",
+        )
+        .await;
+
+        tokio::time::timeout(
+            Duration::from_secs(10),
+            shotover.shutdown_and_then_consume_events(&[]),
+        )
+        .await
+        .expect("Shotover did not shutdown within 10s");
+    }
 }

 async fn assert_connection_fails_with_incorrect_password(driver: KafkaDriver, username: &str) {
120 changes: 116 additions & 4 deletions shotover-proxy/tests/kafka_int_tests/test_cases.rs
@@ -1,8 +1,11 @@
 use std::collections::HashMap;
-use test_helpers::connection::kafka::{
-    Acl, AclOperation, AclPermissionType, AlterConfig, ConfigEntry, ExpectedResponse,
-    KafkaConnectionBuilder, NewPartition, NewTopic, Record, ResourcePatternType, ResourceSpecifier,
-    ResourceType, TopicPartition,
+use test_helpers::{
+    connection::kafka::{
+        Acl, AclOperation, AclPermissionType, AlterConfig, ConfigEntry, ExpectedResponse,
+        KafkaConnectionBuilder, NewPartition, NewTopic, Record, ResourcePatternType,
+        ResourceSpecifier, ResourceType, TopicPartition,
+    },
+    docker_compose::DockerCompose,
 };

 async fn admin_setup(connection_builder: &KafkaConnectionBuilder) {
@@ -169,6 +172,115 @@ pub async fn produce_consume_partitions1(
     }
 }

+pub async fn produce_consume_partitions1_kafka_node_goes_down(
+    docker_compose: &DockerCompose,
+    connection_builder: &KafkaConnectionBuilder,
+    topic_name: &str,
+) {
+    {
+        let producer = connection_builder.connect_producer(1).await;
+        // create an initial record to force kafka to create the topic if it doesnt yet exist
+        producer
+            .assert_produce(
+                Record {
+                    payload: "initial",
+                    topic_name,
+                    key: Some("Key"),
+                },
+                Some(0),
+            )
+            .await;
+
+        let mut consumer = connection_builder
+            .connect_consumer(topic_name, "some_group")
+            .await;
+        consumer
+            .assert_consume(ExpectedResponse {
+                message: "initial".to_owned(),
+                key: Some("Key".to_owned()),
+                topic_name: topic_name.to_owned(),
+                offset: Some(0),
+            })
+            .await;
+
+        docker_compose.kill_service("kafka1");
+
+        // create and consume records
+        for i in 0..5 {
+            producer
+                .assert_produce(
+                    Record {
+                        payload: "Message1",
+                        topic_name,
+                        key: Some("Key"),
+                    },
+                    Some(i * 2 + 1),
+                )
+                .await;
+            producer
+                .assert_produce(
+                    Record {
+                        payload: "Message2",
+                        topic_name,
+                        key: None,
+                    },
+                    Some(i * 2 + 2),
+                )
+                .await;
+
+            consumer
+                .assert_consume(ExpectedResponse {
+                    message: "Message1".to_owned(),
+                    key: Some("Key".to_owned()),
+                    topic_name: topic_name.to_owned(),
+                    offset: Some(i * 2 + 1),
+                })
+                .await;
+
+            consumer
+                .assert_consume(ExpectedResponse {
+                    message: "Message2".to_owned(),
+                    key: None,
+                    topic_name: topic_name.to_owned(),
+                    offset: Some(i * 2 + 2),
+                })
+                .await;
+        }
+    }
+
+    // if we create a new consumer it will start from the beginning since auto.offset.reset = earliest and enable.auto.commit false
+    // so we test that we can access all records ever created on this topic
+    let mut consumer = connection_builder
+        .connect_consumer(topic_name, "some_group")
+        .await;
+    consumer
+        .assert_consume(ExpectedResponse {
+            message: "initial".to_owned(),
+            key: Some("Key".to_owned()),
+            topic_name: topic_name.to_owned(),
+            offset: Some(0),
+        })
+        .await;
+    for i in 0..5 {
+        consumer
+            .assert_consume(ExpectedResponse {
+                message: "Message1".to_owned(),
+                key: Some("Key".to_owned()),
+                topic_name: topic_name.to_owned(),
+                offset: Some(i * 2 + 1),
+            })
+            .await;
+        consumer
+            .assert_consume(ExpectedResponse {
+                message: "Message2".to_owned(),
+                key: None,
+                topic_name: topic_name.to_owned(),
+                offset: Some(i * 2 + 2),
+            })
+            .await;
+    }
+}
+
 pub async fn produce_consume_commit_offsets_partitions1(
     connection_builder: &KafkaConnectionBuilder,
     topic_name: &str,
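The closing assertions of produce_consume_partitions1_kafka_node_goes_down only hold because, as the comment in the diff notes, consumers are created with auto.offset.reset = earliest and enable.auto.commit = false, so a brand new consumer group re-reads every record ever written to the topic. The KafkaConnectionBuilder that applies those settings lives in test_helpers and is not part of this commit; the sketch below only illustrates the two settings using the rdkafka crate, and the function name and setup are assumptions rather than the actual test_helpers code.

    use rdkafka::config::ClientConfig;
    use rdkafka::consumer::StreamConsumer;

    // Sketch: a consumer that always starts from the earliest available offset and never
    // commits, so each fresh consumer group re-reads the full history of the topic.
    fn build_rewinding_consumer(bootstrap_servers: &str, group: &str) -> StreamConsumer {
        ClientConfig::new()
            .set("bootstrap.servers", bootstrap_servers)
            .set("group.id", group)
            .set("auto.offset.reset", "earliest")
            .set("enable.auto.commit", "false")
            .create()
            .expect("consumer creation failed")
    }
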
56 changes: 44 additions & 12 deletions shotover/src/transforms/kafka/sink_cluster/mod.rs
@@ -827,29 +827,61 @@ routing message to a random node so that:
                 {
                     if let Some(node) = self
                         .nodes
-                        .iter_mut()
+                        .iter()
                         .filter(|node| {
                             partition
                                 .shotover_rack_replica_nodes
                                 .contains(&node.broker_id)
+                                && node.state.load(Ordering::Relaxed) == NodeState::Up
                         })
                         .choose(&mut self.rng)
                     {
                         node.broker_id
-                    } else {
+                    } else if let Some(node) = self
+                        .nodes
+                        .iter()
+                        .filter(|node| {
+                            partition
+                                .external_rack_replica_nodes
+                                .contains(&node.broker_id)
+                                && node.state.load(Ordering::Relaxed) == NodeState::Up
+                        })
+                        .choose(&mut self.rng)
+                    {
                         tracing::debug!(
                             "Routing fetch request to replica outside of shotover's rack"
                         );
-                        self.nodes
-                            .iter_mut()
-                            .filter(|node| {
-                                partition
-                                    .external_rack_replica_nodes
-                                    .contains(&node.broker_id)
-                            })
-                            .choose(&mut self.rng)
-                            .unwrap()
-                            .broker_id
+                        node.broker_id
+                    } else if let Some(node) = self
+                        .nodes
+                        .iter()
+                        .filter(|node| {
+                            partition
+                                .shotover_rack_replica_nodes
+                                .contains(&node.broker_id)
+                        })
+                        .choose(&mut self.rng)
+                    {
+                        tracing::debug!(
+                            "Routing fetch request to down node (maybe its come back up by now)"
+                        );
+                        node.broker_id
+                    } else if let Some(node) = self
+                        .nodes
+                        .iter()
+                        .filter(|node| {
+                            partition
+                                .external_rack_replica_nodes
+                                .contains(&node.broker_id)
+                        })
+                        .choose(&mut self.rng)
+                    {
+                        tracing::debug!(
+                            "Routing fetch request to down replica outside of shotover's rack (maybe its come back up by now)"
+                        );
+                        node.broker_id
+                    } else {
+                        panic!("partition metadata is invalid, contains no nodes")
+                    }
                 } else {
                     let partition_len = topic_meta.partitions.len();
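Taken together, the new fetch routing walks four tiers in order: an Up replica in shotover's own rack, an Up replica in another rack, any replica in shotover's rack (in case it has come back up), and finally any replica at all, panicking only if the partition metadata lists no replicas. The condensed sketch below shows the same preference order; the Replica struct, the route_fetch function, and the rack/state fields are illustrative stand-ins, not shotover's internal types.

    use rand::seq::IteratorRandom;

    #[derive(Clone, Copy, PartialEq)]
    enum NodeState {
        Up,
        Down,
    }

    struct Replica {
        broker_id: i32,
        in_shotover_rack: bool,
        state: NodeState,
    }

    // Walk the tiers in preference order: (local, Up), (remote, Up), (local, any), (remote, any).
    fn route_fetch(replicas: &[Replica]) -> i32 {
        let mut rng = rand::thread_rng();
        for (local, require_up) in [(true, true), (false, true), (true, false), (false, false)] {
            let picked = replicas
                .iter()
                .filter(|r| {
                    r.in_shotover_rack == local && (!require_up || r.state == NodeState::Up)
                })
                .choose(&mut rng)
                .map(|r| r.broker_id);
            if let Some(broker_id) = picked {
                return broker_id;
            }
        }
        panic!("partition metadata is invalid, contains no nodes")
    }

    fn main() {
        let replicas = [
            Replica { broker_id: 0, in_shotover_rack: true, state: NodeState::Down },
            Replica { broker_id: 1, in_shotover_rack: false, state: NodeState::Up },
        ];
        // The only local replica is Down, so the fetch goes to the Up replica in the other rack.
        assert_eq!(route_fetch(&replicas), 1);
    }
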
1 change: 1 addition & 0 deletions shotover/src/transforms/kafka/sink_cluster/node.rs
@@ -297,6 +297,7 @@ impl KafkaNode {
 }

 #[atomic_enum]
+#[derive(PartialEq)]
 pub enum NodeState {
     Up,
     Down,
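The NodeState enum uses the atomic_enum crate, whose #[atomic_enum] attribute generates an atomic wrapper type (by convention AtomicNodeState) with load/store methods. Deriving PartialEq on the plain enum is what lets the routing code above compare the loaded value with ==, as in node.state.load(Ordering::Relaxed) == NodeState::Up. A minimal sketch of that pattern, following the crate's documented usage rather than shotover's actual node.rs:

    use atomic_enum::atomic_enum;
    use std::sync::atomic::Ordering;

    // The attribute macro generates AtomicNodeState; PartialEq on the plain enum
    // allows direct == / != comparisons against a loaded value.
    #[atomic_enum]
    #[derive(PartialEq)]
    pub enum NodeState {
        Up,
        Down,
    }

    fn main() {
        let state = AtomicNodeState::new(NodeState::Up);
        assert!(state.load(Ordering::Relaxed) == NodeState::Up);

        state.store(NodeState::Down, Ordering::Relaxed);
        assert!(state.load(Ordering::Relaxed) != NodeState::Up);
    }
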
