KafkaSinkCluster route by NodeState
rukai committed Aug 30, 2024
1 parent 149dc70 commit acf2b6c
Showing 3 changed files with 251 additions and 41 deletions.
84 changes: 72 additions & 12 deletions shotover-proxy/tests/kafka_int_tests/mod.rs
@@ -247,21 +247,52 @@ async fn single_sasl_scram_plaintext_source_tls_sink(#[case] driver: KafkaDriver
#[case::java(KafkaDriver::Java)]
#[tokio::test(flavor = "multi_thread")] // multi_thread is needed since java driver will block when consuming, causing shotover logs to not appear
async fn cluster_1_rack_single_shotover(#[case] driver: KafkaDriver) {
let _docker_compose =
let docker_compose =
docker_compose("tests/test-configs/kafka/cluster-1-rack/docker-compose.yaml");
let shotover = shotover_process("tests/test-configs/kafka/cluster-1-rack/topology-single.yaml")
.start()

{
let shotover =
shotover_process("tests/test-configs/kafka/cluster-1-rack/topology-single.yaml")
.start()
.await;

let connection_builder = KafkaConnectionBuilder::new(driver, "127.0.0.1:9192");
test_cases::standard_test_suite(&connection_builder).await;
tokio::time::timeout(
Duration::from_secs(10),
shotover.shutdown_and_then_consume_events(&[]),
)
.await
.expect("Shotover did not shutdown within 10s");
}

{
let shotover =
shotover_process("tests/test-configs/kafka/cluster-1-rack/topology-single.yaml")
.start()
.await;
let connection_builder = KafkaConnectionBuilder::new(driver, "127.0.0.1:9192");

test_cases::produce_consume_partitions1_kafka_node_goes_down(
&docker_compose,
&connection_builder,
"kafka_node_goes_down_test",
)
.await;

let connection_builder = KafkaConnectionBuilder::new(driver, "127.0.0.1:9192");
test_cases::cluster_test_suite(&connection_builder).await;
let connection_builder = KafkaConnectionBuilder::new(driver, "127.0.0.1:9192");
test_cases::cluster_test_suite(&connection_builder).await;

tokio::time::timeout(
Duration::from_secs(10),
shotover.shutdown_and_then_consume_events(&[]),
)
.await
.expect("Shotover did not shutdown within 10s");
// Shotover can reasonably hit many kinds of errors when a kafka node goes down, so ignore all of them.
tokio::time::timeout(
Duration::from_secs(10),
shotover.shutdown_and_then_consume_events(&[EventMatcher::new()
.with_level(Level::Error)
.with_count(Count::Any)]),
)
.await
.expect("Shotover did not shutdown within 10s");
}
}

#[rstest]
@@ -376,7 +407,7 @@ async fn cluster_sasl_scram_single_shotover(#[case] driver: KafkaDriver) {
async fn cluster_sasl_scram_over_mtls_single_shotover(#[case] driver: KafkaDriver) {
test_helpers::cert::generate_kafka_test_certs();

let _docker_compose =
let docker_compose =
docker_compose("tests/test-configs/kafka/cluster-sasl-scram-over-mtls/docker-compose.yaml");

// test concurrent connections with different access levels to ensure that:
Expand Down Expand Up @@ -443,6 +474,35 @@ async fn cluster_sasl_scram_over_mtls_single_shotover(#[case] driver: KafkaDrive
.await
.expect("Shotover did not shutdown within 10s");
}

// Test handling of down kafka nodes.
{
let shotover = shotover_process(
"tests/test-configs/kafka/cluster-sasl-scram-over-mtls/topology-single.yaml",
)
.start()
.await;

let connection = KafkaConnectionBuilder::new(driver, "127.0.0.1:9192")
.use_sasl_scram("super_user", "super_password");

test_cases::produce_consume_partitions1_kafka_node_goes_down(
&docker_compose,
&connection,
"kafka_node_goes_down_test",
)
.await;

// Shotover can reasonably hit many kinds of errors when a kafka node goes down, so ignore all of them.
tokio::time::timeout(
Duration::from_secs(10),
shotover.shutdown_and_then_consume_events(&[EventMatcher::new()
.with_level(Level::Error)
.with_count(Count::Any)]),
)
.await
.expect("Shotover did not shutdown within 10s");
}
}

async fn assert_connection_fails_with_incorrect_password(driver: KafkaDriver, username: &str) {
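The shutdown calls in the tests above come in two forms: the healthy-cluster blocks pass an empty slice, which presumably asserts that no unexpected warnings or errors were logged, while the node-down blocks pass an EventMatcher that tolerates any number of error-level events. A minimal sketch of the tolerant form as a reusable helper; the ShotoverHandle name and the surrounding imports are assumptions rather than the real test-helper types:

use std::time::Duration;

// Sketch only: shut shotover down within 10s while tolerating error-level logs,
// which are expected once a kafka broker has been killed out from under it.
// ShotoverHandle stands in for whatever shotover_process(...).start().await returns.
async fn shutdown_ignoring_errors(shotover: ShotoverHandle) {
    tokio::time::timeout(
        Duration::from_secs(10),
        shotover.shutdown_and_then_consume_events(&[EventMatcher::new()
            .with_level(Level::Error)
            .with_count(Count::Any)]),
    )
    .await
    .expect("Shotover did not shutdown within 10s");
}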
129 changes: 125 additions & 4 deletions shotover-proxy/tests/kafka_int_tests/test_cases.rs
@@ -1,8 +1,11 @@
use std::collections::HashMap;
use test_helpers::connection::kafka::{
Acl, AclOperation, AclPermissionType, AlterConfig, ConfigEntry, ExpectedResponse,
KafkaConnectionBuilder, NewPartition, NewTopic, Record, ResourcePatternType, ResourceSpecifier,
ResourceType, TopicPartition,
use test_helpers::{
connection::kafka::{
Acl, AclOperation, AclPermissionType, AlterConfig, ConfigEntry, ExpectedResponse,
KafkaConnectionBuilder, NewPartition, NewTopic, Record, ResourcePatternType,
ResourceSpecifier, ResourceType, TopicPartition,
},
docker_compose::DockerCompose,
};

async fn admin_setup(connection_builder: &KafkaConnectionBuilder) {
@@ -169,6 +172,124 @@ pub async fn produce_consume_partitions1(
}
}

pub async fn produce_consume_partitions1_kafka_node_goes_down(
docker_compose: &DockerCompose,
connection_builder: &KafkaConnectionBuilder,
topic_name: &str,
) {
let admin = connection_builder.connect_admin().await;
admin
.create_topics(&[NewTopic {
name: topic_name,
num_partitions: 1,
replication_factor: 3,
}])
.await;

{
let producer = connection_builder.connect_producer("all").await;
// create an initial record to force kafka to create the topic if it doesn't yet exist
producer
.assert_produce(
Record {
payload: "initial",
topic_name,
key: Some("Key"),
},
Some(0),
)
.await;

let mut consumer = connection_builder
.connect_consumer(topic_name, "some_group")
.await;
consumer
.assert_consume(ExpectedResponse {
message: "initial".to_owned(),
key: Some("Key".to_owned()),
topic_name: topic_name.to_owned(),
offset: Some(0),
})
.await;

docker_compose.kill_service("kafka1");

// create and consume records
for i in 0..5 {
producer
.assert_produce(
Record {
payload: "Message1",
topic_name,
key: Some("Key"),
},
Some(i * 2 + 1),
)
.await;
producer
.assert_produce(
Record {
payload: "Message2",
topic_name,
key: None,
},
Some(i * 2 + 2),
)
.await;

consumer
.assert_consume(ExpectedResponse {
message: "Message1".to_owned(),
key: Some("Key".to_owned()),
topic_name: topic_name.to_owned(),
offset: Some(i * 2 + 1),
})
.await;

consumer
.assert_consume(ExpectedResponse {
message: "Message2".to_owned(),
key: None,
topic_name: topic_name.to_owned(),
offset: Some(i * 2 + 2),
})
.await;
}
}

// If we create a new consumer it will start from the beginning, since auto.offset.reset = earliest and enable.auto.commit = false,
// so we test that we can access all records ever created on this topic.
let mut consumer = connection_builder
.connect_consumer(topic_name, "some_group")
.await;
consumer
.assert_consume(ExpectedResponse {
message: "initial".to_owned(),
key: Some("Key".to_owned()),
topic_name: topic_name.to_owned(),
offset: Some(0),
})
.await;
for i in 0..5 {
consumer
.assert_consume(ExpectedResponse {
message: "Message1".to_owned(),
key: Some("Key".to_owned()),
topic_name: topic_name.to_owned(),
offset: Some(i * 2 + 1),
})
.await;
consumer
.assert_consume(ExpectedResponse {
message: "Message2".to_owned(),
key: None,
topic_name: topic_name.to_owned(),
offset: Some(i * 2 + 2),
})
.await;
}
}

pub async fn produce_consume_commit_offsets_partitions1(
connection_builder: &KafkaConnectionBuilder,
topic_name: &str,
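As an aside on the offsets asserted in produce_consume_partitions1_kafka_node_goes_down: the topic has a single partition, the "initial" record sits at offset 0, and each loop iteration appends a Message1/Message2 pair, so the expected offsets are exactly 0 through 10. A tiny standalone illustration of that arithmetic (not part of the test code):

// Offsets on the single partition as produced by the test above:
// "initial" at 0, then Message1 at i*2 + 1 and Message2 at i*2 + 2 for i in 0..5.
fn expected_offsets() -> Vec<i64> {
    let mut offsets = vec![0];
    for i in 0i64..5 {
        offsets.push(i * 2 + 1); // Message1 of iteration i
        offsets.push(i * 2 + 2); // Message2 of iteration i
    }
    offsets // [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
}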
79 changes: 54 additions & 25 deletions shotover/src/transforms/kafka/sink_cluster/mod.rs
@@ -457,7 +457,8 @@ impl KafkaSinkCluster {
Instant::now(),
Destination::ControlConnection,
)
.await?;
.await
.context("Failed to get control connection")?;
connection.send(vec![request])?;
Ok(connection.recv().await?.remove(0))
}
@@ -474,10 +475,18 @@ impl KafkaSinkCluster {
topics.push(topic);
}
}

fn store_group(&self, groups: &mut Vec<GroupId>, group_id: GroupId) {
if self.group_to_coordinator_broker.get(&group_id).is_none() && !groups.contains(&group_id)
{
let up_node_exists = match self.group_to_coordinator_broker.get(&group_id) {
Some(broker_id) => self
.nodes
.iter()
.find(|node| node.broker_id == *broker_id)
.map(|node| node.is_up())
.unwrap_or(false),
None => false,
};

if !up_node_exists && !groups.contains(&group_id) {
groups.push(group_id);
}
}
@@ -1055,7 +1064,7 @@ routing message to a random node so that:
}

let recent_instant = Instant::now();
for (destination, requests) in broker_to_routed_requests {
for (destination, mut requests) in broker_to_routed_requests {
if let Err(err) = self
.connections
.get_or_open_connection(
@@ -1069,7 +1078,20 @@
recent_instant,
destination,
)
.await?
.await
.with_context(|| {
let request_types: Vec<String> = requests
.requests
.iter_mut()
.map(|x| match x.frame() {
Some(Frame::Kafka(KafkaFrame::Request { header, .. })) => {
format!("{:?}", ApiKey::try_from(header.request_api_key).unwrap())
}
_ => "Unknown".to_owned(),
})
.collect();
format!("Failed to get connection to send requests {request_types:?}")
})?
.send(requests.requests)
{
// Don't retry the send on the new connection since we can't tell whether the broker received the request or not.
@@ -1760,25 +1782,32 @@ routing message to a random node so that:
for (_, topic) in &mut metadata.topics {
for partition in &mut topic.partitions {
// Deterministically choose a single shotover node in the rack as leader based on topic + partition id
let leader_rack = self
.nodes
.iter()
.find(|x| x.broker_id == *partition.leader_id)
.map(|x| x.rack.clone())
.unwrap();
let shotover_nodes_in_rack: Vec<_> = self
.shotover_nodes
.iter()
.filter(|shotover_node| {
leader_rack
.as_ref()
.map(|rack| rack == &shotover_node.rack)
.unwrap_or(true)
})
.collect();
let hash = hash_partition(topic.topic_id, partition.partition_index);
let shotover_node = &shotover_nodes_in_rack[hash % shotover_nodes_in_rack.len()];
partition.leader_id = shotover_node.broker_id;
if partition.leader_id == -1 {
// -1 indicates that the leader is offline, leave it as is so the client can tell the leader is offline.
} else {
let leader_rack = self
.nodes
.iter()
.find(|x| x.broker_id == *partition.leader_id)
.map(|x| x.rack.clone())
.unwrap_or_else(|| {
panic!("Unable to find leader_id {:?}", partition.leader_id)
});
let shotover_nodes_in_rack: Vec<_> = self
.shotover_nodes
.iter()
.filter(|shotover_node| {
leader_rack
.as_ref()
.map(|rack| rack == &shotover_node.rack)
.unwrap_or(true)
})
.collect();
let hash = hash_partition(topic.topic_id, partition.partition_index);
let shotover_node =
&shotover_nodes_in_rack[hash % shotover_nodes_in_rack.len()];
partition.leader_id = shotover_node.broker_id;
}

// Every replica node has its entire corresponding shotover rack included.
// Since we can set as many replica nodes as we like, we take this all out approach.
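The partition-leader rewrite above boils down to a small pure rule: a partition whose real leader is offline (leader_id == -1) is passed through unchanged, otherwise it is pinned to one shotover node in the real leader's rack, chosen deterministically from the partition hash. A condensed sketch of that rule using plain tuples rather than the actual shotover node types:

// Condensed sketch, not the real shotover code: shotover_nodes is (broker_id, rack)
// and hash stands in for hash_partition(topic.topic_id, partition.partition_index).
fn rewrite_leader_id(
    leader_id: i32,
    leader_rack: Option<&str>,
    shotover_nodes: &[(i32, String)],
    hash: usize,
) -> i32 {
    if leader_id == -1 {
        // Leader is offline: keep -1 so the client can tell.
        return -1;
    }
    // Keep only the shotover nodes in the same rack as the real leader
    // (or all of them if the leader has no rack set).
    let in_rack: Vec<&(i32, String)> = shotover_nodes
        .iter()
        .filter(|(_, rack)| leader_rack.map(|r| r == rack.as_str()).unwrap_or(true))
        .collect();
    // Deterministic choice based on the partition hash.
    in_rack[hash % in_rack.len()].0
}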
