fix kafka cluster bug discovered by java driver
rukai committed Mar 15, 2024
1 parent 97f027a commit 8eb8065
Showing 2 changed files with 94 additions and 42 deletions.
22 changes: 9 additions & 13 deletions shotover-proxy/tests/kafka_int_tests/mod.rs
@@ -52,10 +52,9 @@ async fn passthrough_tls(#[case] driver: KafkaDriver) {
         .expect("Shotover did not shutdown within 10s");
 }
 
-#[cfg(feature = "rdkafka-driver-tests")] // temporarily needed to avoid a warning
 #[rstest]
 #[cfg_attr(feature = "rdkafka-driver-tests", case::cpp(KafkaDriver::Cpp))]
-// #[case::java(KafkaDriver::Java)]
+#[case::java(KafkaDriver::Java)]
 #[tokio::test(flavor = "multi_thread")] // multi_thread is needed since java driver will block when consuming, causing shotover logs to not appear
 async fn cluster_tls(#[case] driver: KafkaDriver) {
     test_helpers::cert::generate_kafka_test_certs();
@@ -133,10 +132,9 @@ async fn passthrough_sasl_encode(#[case] driver: KafkaDriver) {
     shotover.shutdown_and_then_consume_events(&[]).await;
 }
 
-#[cfg(feature = "rdkafka-driver-tests")] // temporarily needed to avoid a warning
 #[rstest]
 #[cfg_attr(feature = "rdkafka-driver-tests", case::cpp(KafkaDriver::Cpp))]
-// #[case::java(KafkaDriver::Java)]
+#[case::java(KafkaDriver::Java)]
 #[tokio::test(flavor = "multi_thread")] // multi_thread is needed since java driver will block when consuming, causing shotover logs to not appear
 async fn cluster_1_rack_single_shotover(#[case] driver: KafkaDriver) {
     let _docker_compose =
@@ -156,11 +154,10 @@ async fn cluster_1_rack_single_shotover(#[case] driver: KafkaDriver) {
         .expect("Shotover did not shutdown within 10s");
 }
 
-#[cfg(feature = "rdkafka-driver-tests")] // temporarily needed to avoid a warning
 #[rstest]
 #[cfg_attr(feature = "rdkafka-driver-tests", case::cpp(KafkaDriver::Cpp))]
-// #[case::java(KafkaDriver::Java)]
-#[tokio::test(flavor = "multi_thread")]
+#[case::java(KafkaDriver::Java)]
+#[tokio::test(flavor = "multi_thread")] // multi_thread is needed since java driver will block when consuming, causing shotover logs to not appear
 async fn cluster_1_rack_multi_shotover(#[case] driver: KafkaDriver) {
     let _docker_compose =
         docker_compose("tests/test-configs/kafka/cluster-1-rack/docker-compose.yaml");
@@ -192,10 +189,9 @@ async fn cluster_1_rack_multi_shotover(#[case] driver: KafkaDriver) {
     }
 }
 
-#[cfg(feature = "rdkafka-driver-tests")] // temporarily needed to avoid a warning
 #[rstest]
 #[cfg_attr(feature = "rdkafka-driver-tests", case::cpp(KafkaDriver::Cpp))]
-//#[case::java(KafkaDriver::Java)]
+#[case::java(KafkaDriver::Java)]
 #[tokio::test(flavor = "multi_thread")] // multi_thread is needed since java driver will block when consuming, causing shotover logs to not appear
 async fn cluster_2_racks_multi_shotover(#[case] driver: KafkaDriver) {
     let _docker_compose =
@@ -233,8 +229,8 @@ async fn cluster_2_racks_multi_shotover(#[case] driver: KafkaDriver) {
 #[cfg(feature = "rdkafka-driver-tests")]
 #[rstest]
 #[cfg_attr(feature = "rdkafka-driver-tests", case::cpp(KafkaDriver::Cpp))]
-// #[case::java(KafkaDriver::Java)]
-#[tokio::test]
+#[case::java(KafkaDriver::Java)]
+#[tokio::test(flavor = "multi_thread")] // multi_thread is needed since java driver will block when consuming, causing shotover logs to not appear
 async fn cluster_sasl_single_shotover(#[case] driver: KafkaDriver) {
     let _docker_compose =
         docker_compose("tests/test-configs/kafka/cluster-sasl/docker-compose.yaml");
@@ -258,8 +254,8 @@ async fn cluster_sasl_single_shotover(#[case] driver: KafkaDriver) {
 #[cfg(feature = "rdkafka-driver-tests")]
 #[rstest]
 #[cfg_attr(feature = "rdkafka-driver-tests", case::cpp(KafkaDriver::Cpp))]
-// #[case::java(KafkaDriver::Java)]
-#[tokio::test]
+#[case::java(KafkaDriver::Java)]
+#[tokio::test(flavor = "multi_thread")] // multi_thread is needed since java driver will block when consuming, causing shotover logs to not appear
 async fn cluster_sasl_multi_shotover(#[case] driver: KafkaDriver) {
     let _docker_compose =
         docker_compose("tests/test-configs/kafka/cluster-sasl/docker-compose.yaml");
114 changes: 85 additions & 29 deletions shotover/src/transforms/kafka/sink_cluster/mod.rs
@@ -32,6 +32,14 @@ use tokio::sync::{mpsc, oneshot, RwLock};
 use tokio::time::timeout;
 use uuid::Uuid;
 
+#[derive(thiserror::Error, Debug)]
+enum FindCoordinatorError {
+    #[error("Coordinator not available")]
+    CoordinatorNotAvailable,
+    #[error("{0:?}")]
+    Unrecoverable(#[from] anyhow::Error),
+}
+
 mod node;
 
 #[derive(Serialize, Deserialize, Debug)]
@@ -355,10 +363,20 @@ impl KafkaSinkCluster {
         }
 
         for group in groups {
-            let node = self.find_coordinator_of_group(group.clone()).await?;
-            self.group_to_coordinator_broker
-                .insert(group, node.broker_id);
-            self.add_node_if_new(node).await;
+            match self.find_coordinator_of_group(group.clone()).await {
+                Ok(node) => {
+                    self.group_to_coordinator_broker
+                        .insert(group, node.broker_id);
+                    self.add_node_if_new(node).await;
+                }
+                Err(FindCoordinatorError::CoordinatorNotAvailable) => {
+                    // We can't find the coordinator, so do nothing so that the request is routed to a random node:
+                    // * If it happens to be the coordinator, all is well.
+                    // * If it's not the coordinator, it will return a COORDINATOR_NOT_AVAILABLE message to
+                    //   the client, prompting it to retry the whole process again.
+                }
+                Err(FindCoordinatorError::Unrecoverable(err)) => Err(err)?,
+            }
         }
 
         // request and process metadata if we are missing topics or the controller broker id
@@ -445,21 +463,32 @@ impl KafkaSinkCluster {
             .first()
             .ok_or_else(|| anyhow!("No topics in produce message"))?;
         let connection = if let Some(topic_meta) = self.topics.get(&topic.topic.0) {
-            // assume that all partitions in this topic have the same routing requirements
-            let partition = &topic_meta.partitions[topic
+            let partition_index = topic
                 .partitions
                 .first()
                 .ok_or_else(|| anyhow!("No partitions in topic"))?
-                .partition
-                as usize];
-            self.nodes
-                .iter_mut()
-                .filter(|node| partition.replica_nodes.contains(&node.broker_id))
-                .choose(&mut self.rng)
-                .unwrap()
-                .get_connection(&self.connection_factory)
-                .await?
-                .clone()
+                .partition as usize;
+            // assume that all partitions in this topic have the same routing requirements
+            if let Some(partition) = topic_meta.partitions.get(partition_index) {
+                self.nodes
+                    .iter_mut()
+                    .filter(|node| partition.replica_nodes.contains(&node.broker_id))
+                    .choose(&mut self.rng)
+                    .unwrap()
+                    .get_connection(&self.connection_factory)
+                    .await?
+                    .clone()
+            } else {
+                let topic = &topic.topic;
+                let partition_len = topic_meta.partitions.len();
+                tracing::warn!("no known partition replica for {topic:?} at index {partition_index}/{partition_len}, routing message to a random node so that a NOT_LEADER_OR_FOLLOWER or similar error is returned to the client");
+                self.nodes
+                    .choose_mut(&mut self.rng)
+                    .unwrap()
+                    .get_connection(&self.connection_factory)
+                    .await?
+                    .clone()
+            }
         } else {
             let topic = &topic.topic;
             tracing::warn!("no known partition replica for {topic:?}, routing message to a random node so that a NOT_LEADER_OR_FOLLOWER or similar error is returned to the client");
@@ -498,9 +527,19 @@ impl KafkaSinkCluster {
                 }
                 Some(Frame::Kafka(KafkaFrame::Request {
                     body: RequestBody::OffsetFetch(offset_fetch),
-                    ..
+                    header,
                 })) => {
-                    let group_id = offset_fetch.group_id.clone();
+                    let group_id = if header.request_api_version <= 7 {
+                        offset_fetch.group_id.clone()
+                    } else {
+                        // This is possibly dangerous.
+                        // The client could construct a message which is valid for a specific shotover node, but not for any single kafka broker.
+                        // We may need to add some logic to split the request into multiple messages going to different destinations,
+                        // and then reconstruct the response back into a single response
+                        //
+                        // For now just pick the first group as that is sufficient for the simple cases.
+                        offset_fetch.groups.first().unwrap().group_id.clone()
+                    };
                     results.push(self.route_to_coordinator(message, group_id).await?);
                 }
                 Some(Frame::Kafka(KafkaFrame::Request {
@@ -602,7 +641,10 @@ impl KafkaSinkCluster {
         Ok(())
     }
 
-    async fn find_coordinator_of_group(&mut self, group: GroupId) -> Result<KafkaNode> {
+    async fn find_coordinator_of_group(
+        &mut self,
+        group: GroupId,
+    ) -> Result<KafkaNode, FindCoordinatorError> {
         let request = Message::from_frame(Frame::Kafka(KafkaFrame::Request {
             header: RequestHeader::builder()
                 .request_api_key(ApiKey::FindCoordinatorKey as i16)
@@ -637,14 +679,21 @@ impl KafkaSinkCluster {
             Some(Frame::Kafka(KafkaFrame::Response {
                 body: ResponseBody::FindCoordinator(coordinator),
                 ..
-            })) => Ok(KafkaNode::new(
-                coordinator.node_id,
-                KafkaAddress::new(coordinator.host.clone(), coordinator.port),
-                None,
-            )),
+            })) => {
+                if coordinator.error_code == 0 {
+                    tracing::warn!("generated {coordinator:?}");
+                    Ok(KafkaNode::new(
+                        coordinator.node_id,
+                        KafkaAddress::new(coordinator.host.clone(), coordinator.port),
+                        None,
+                    ))
+                } else {
+                    Err(FindCoordinatorError::CoordinatorNotAvailable)
+                }
+            }
             other => Err(anyhow!(
                 "Unexpected message returned to findcoordinator request {other:?}"
-            )),
+            ))?,
         }
     }
 
@@ -712,6 +761,7 @@ impl KafkaSinkCluster {
                     version,
                     ..
                 })) => {
+                    tracing::warn!("requested {find_coordinator:?}");
                     let request = find_coordinator_requests
                         .iter()
                         .find(|x| x.index == i)
@@ -785,7 +835,9 @@ impl KafkaSinkCluster {
         for node in &mut self.nodes {
             if let Some(broker_id) = self.group_to_coordinator_broker.get(&group_id) {
                 if node.broker_id == *broker_id {
+                    tracing::warn!("Route to {:?}", *broker_id);
                     connection = Some(node.get_connection(&self.connection_factory).await?.clone());
+                    break;
                 }
             }
         }
@@ -847,12 +899,16 @@ impl KafkaSinkCluster {
     ) {
         if request.key_type == 0 {
             if version <= 3 {
-                self.group_to_coordinator_broker
-                    .insert(GroupId(request.key.clone()), find_coordinator.node_id);
+                if find_coordinator.error_code == 0 {
+                    self.group_to_coordinator_broker
+                        .insert(GroupId(request.key.clone()), find_coordinator.node_id);
+                }
             } else {
                 for coordinator in &find_coordinator.coordinators {
-                    self.group_to_coordinator_broker
-                        .insert(GroupId(coordinator.key.clone()), find_coordinator.node_id);
+                    if coordinator.error_code == 0 {
+                        self.group_to_coordinator_broker
+                            .insert(GroupId(coordinator.key.clone()), coordinator.node_id);
+                    }
                 }
             }
         }
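One detail worth spelling out from the changes above: `Unrecoverable(#[from] anyhow::Error)` makes thiserror generate a From<anyhow::Error> impl, which is what lets the final match arm in find_coordinator_of_group change from `)),` to `))?,` — the `?` converts the anyhow! error into the new error type and returns it early. A minimal, self-contained sketch of that pattern, assuming only the anyhow and thiserror crates (the LookupError and classify names are illustrative, not shotover's):

use anyhow::anyhow;

#[derive(thiserror::Error, Debug)]
enum LookupError {
    // Recoverable: the caller can fall back to routing the request to a random node.
    #[error("coordinator not available")]
    NotAvailable,
    // Unrecoverable: #[from] generates `impl From<anyhow::Error> for LookupError`.
    #[error("{0:?}")]
    Unrecoverable(#[from] anyhow::Error),
}

// Error code 0 is success and 15 is COORDINATOR_NOT_AVAILABLE; anything else
// is converted into Unrecoverable by the `?` operator via the generated From impl.
fn classify(error_code: i16) -> Result<(), LookupError> {
    match error_code {
        0 => Ok(()),
        15 => Err(LookupError::NotAvailable),
        other => Err(anyhow!("unexpected error code {other}"))?,
    }
}

fn main() {
    assert!(classify(0).is_ok());
    assert!(matches!(classify(15), Err(LookupError::NotAvailable)));
    assert!(matches!(classify(42), Err(LookupError::Unrecoverable(_))));
}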