From 8eb806528818fad83ca01188ef81a742a829de21 Mon Sep 17 00:00:00 2001
From: Lucas Kent
Date: Wed, 13 Mar 2024 15:14:39 +1100
Subject: [PATCH] fix kafka cluster bug discovered by java driver

---
 shotover-proxy/tests/kafka_int_tests/mod.rs   |  22 ++--
 .../src/transforms/kafka/sink_cluster/mod.rs  | 114 +++++++++++++-----
 2 files changed, 94 insertions(+), 42 deletions(-)

diff --git a/shotover-proxy/tests/kafka_int_tests/mod.rs b/shotover-proxy/tests/kafka_int_tests/mod.rs
index 74db991c3..5d750ed51 100644
--- a/shotover-proxy/tests/kafka_int_tests/mod.rs
+++ b/shotover-proxy/tests/kafka_int_tests/mod.rs
@@ -52,10 +52,9 @@ async fn passthrough_tls(#[case] driver: KafkaDriver) {
         .expect("Shotover did not shutdown within 10s");
 }
 
-#[cfg(feature = "rdkafka-driver-tests")] // temporarily needed to avoid a warning
 #[rstest]
 #[cfg_attr(feature = "rdkafka-driver-tests", case::cpp(KafkaDriver::Cpp))]
-// #[case::java(KafkaDriver::Java)]
+#[case::java(KafkaDriver::Java)]
 #[tokio::test(flavor = "multi_thread")] // multi_thread is needed since java driver will block when consuming, causing shotover logs to not appear
 async fn cluster_tls(#[case] driver: KafkaDriver) {
     test_helpers::cert::generate_kafka_test_certs();
@@ -133,10 +132,9 @@ async fn passthrough_sasl_encode(#[case] driver: KafkaDriver) {
     shotover.shutdown_and_then_consume_events(&[]).await;
 }
 
-#[cfg(feature = "rdkafka-driver-tests")] // temporarily needed to avoid a warning
 #[rstest]
 #[cfg_attr(feature = "rdkafka-driver-tests", case::cpp(KafkaDriver::Cpp))]
-// #[case::java(KafkaDriver::Java)]
+#[case::java(KafkaDriver::Java)]
 #[tokio::test(flavor = "multi_thread")] // multi_thread is needed since java driver will block when consuming, causing shotover logs to not appear
 async fn cluster_1_rack_single_shotover(#[case] driver: KafkaDriver) {
     let _docker_compose =
@@ -156,11 +154,10 @@ async fn cluster_1_rack_single_shotover(#[case] driver: KafkaDriver) {
         .expect("Shotover did not shutdown within 10s");
 }
 
-#[cfg(feature = "rdkafka-driver-tests")] // temporarily needed to avoid a warning
 #[rstest]
 #[cfg_attr(feature = "rdkafka-driver-tests", case::cpp(KafkaDriver::Cpp))]
-// #[case::java(KafkaDriver::Java)]
+#[case::java(KafkaDriver::Java)]
-#[tokio::test(flavor = "multi_thread")]
+#[tokio::test(flavor = "multi_thread")] // multi_thread is needed since java driver will block when consuming, causing shotover logs to not appear
 async fn cluster_1_rack_multi_shotover(#[case] driver: KafkaDriver) {
     let _docker_compose =
         docker_compose("tests/test-configs/kafka/cluster-1-rack/docker-compose.yaml");
@@ -192,10 +189,9 @@ async fn cluster_1_rack_multi_shotover(#[case] driver: KafkaDriver) {
     }
 }
 
-#[cfg(feature = "rdkafka-driver-tests")] // temporarily needed to avoid a warning
 #[rstest]
 #[cfg_attr(feature = "rdkafka-driver-tests", case::cpp(KafkaDriver::Cpp))]
-//#[case::java(KafkaDriver::Java)]
+#[case::java(KafkaDriver::Java)]
 #[tokio::test(flavor = "multi_thread")] // multi_thread is needed since java driver will block when consuming, causing shotover logs to not appear
 async fn cluster_2_racks_multi_shotover(#[case] driver: KafkaDriver) {
     let _docker_compose =
@@ -233,8 +229,8 @@ async fn cluster_2_racks_multi_shotover(#[case] driver: KafkaDriver) {
 #[cfg(feature = "rdkafka-driver-tests")]
 #[rstest]
 #[cfg_attr(feature = "rdkafka-driver-tests", case::cpp(KafkaDriver::Cpp))]
-// #[case::java(KafkaDriver::Java)]
+#[case::java(KafkaDriver::Java)]
-#[tokio::test]
+#[tokio::test(flavor = "multi_thread")] // multi_thread is needed since java driver will block when consuming, causing shotover logs to not appear
 async fn cluster_sasl_single_shotover(#[case] driver: KafkaDriver) {
     let _docker_compose =
         docker_compose("tests/test-configs/kafka/cluster-sasl/docker-compose.yaml");
@@ -258,8 +254,8 @@ async fn cluster_sasl_single_shotover(#[case] driver: KafkaDriver) {
 #[cfg(feature = "rdkafka-driver-tests")]
 #[rstest]
 #[cfg_attr(feature = "rdkafka-driver-tests", case::cpp(KafkaDriver::Cpp))]
-// #[case::java(KafkaDriver::Java)]
+#[case::java(KafkaDriver::Java)]
-#[tokio::test]
+#[tokio::test(flavor = "multi_thread")] // multi_thread is needed since java driver will block when consuming, causing shotover logs to not appear
 async fn cluster_sasl_multi_shotover(#[case] driver: KafkaDriver) {
     let _docker_compose =
         docker_compose("tests/test-configs/kafka/cluster-sasl/docker-compose.yaml");
diff --git a/shotover/src/transforms/kafka/sink_cluster/mod.rs b/shotover/src/transforms/kafka/sink_cluster/mod.rs
index 80ee06f1e..59f6eae21 100644
--- a/shotover/src/transforms/kafka/sink_cluster/mod.rs
+++ b/shotover/src/transforms/kafka/sink_cluster/mod.rs
@@ -32,6 +32,14 @@ use tokio::sync::{mpsc, oneshot, RwLock};
 use tokio::time::timeout;
 use uuid::Uuid;
 
+#[derive(thiserror::Error, Debug)]
+enum FindCoordinatorError {
+    #[error("Coordinator not available")]
+    CoordinatorNotAvailable,
+    #[error("{0:?}")]
+    Unrecoverable(#[from] anyhow::Error),
+}
+
 mod node;
 
 #[derive(Serialize, Deserialize, Debug)]
@@ -355,10 +363,20 @@ impl KafkaSinkCluster {
         }
 
         for group in groups {
-            let node = self.find_coordinator_of_group(group.clone()).await?;
-            self.group_to_coordinator_broker
-                .insert(group, node.broker_id);
-            self.add_node_if_new(node).await;
+            match self.find_coordinator_of_group(group.clone()).await {
+                Ok(node) => {
+                    self.group_to_coordinator_broker
+                        .insert(group, node.broker_id);
+                    self.add_node_if_new(node).await;
+                }
+                Err(FindCoordinatorError::CoordinatorNotAvailable) => {
+                    // We can't find the coordinator, so do nothing so that the request will be routed to a random node:
+                    // * If it happens to be the coordinator all is well
+                    // * If it's not the coordinator then it will return a COORDINATOR_NOT_AVAILABLE message to
+                    //   the client prompting it to retry the whole process again.
+                }
+                Err(FindCoordinatorError::Unrecoverable(err)) => Err(err)?,
+            }
         }
 
         // request and process metadata if we are missing topics or the controller broker id
@@ -445,21 +463,32 @@ impl KafkaSinkCluster {
                         .first()
                         .ok_or_else(|| anyhow!("No topics in produce message"))?;
                 let connection = if let Some(topic_meta) = self.topics.get(&topic.topic.0) {
-                    // assume that all partitions in this topic have the same routing requirements
-                    let partition = &topic_meta.partitions[topic
+                    let partition_index = topic
                         .partitions
                         .first()
                         .ok_or_else(|| anyhow!("No partitions in topic"))?
-                        .partition
-                        as usize];
-                    self.nodes
-                        .iter_mut()
-                        .filter(|node| partition.replica_nodes.contains(&node.broker_id))
-                        .choose(&mut self.rng)
-                        .unwrap()
-                        .get_connection(&self.connection_factory)
-                        .await?
-                        .clone()
+                        .partition as usize;
+                    // assume that all partitions in this topic have the same routing requirements
+                    if let Some(partition) = topic_meta.partitions.get(partition_index) {
+                        self.nodes
+                            .iter_mut()
+                            .filter(|node| partition.replica_nodes.contains(&node.broker_id))
+                            .choose(&mut self.rng)
+                            .unwrap()
+                            .get_connection(&self.connection_factory)
+                            .await?
+                            .clone()
+                    } else {
+                        let topic = &topic.topic;
+                        let partition_len = topic_meta.partitions.len();
+                        tracing::warn!("no known partition replica for {topic:?} at index {partition_index}/{partition_len}, routing message to a random node so that a NOT_LEADER_OR_FOLLOWER or similar error is returned to the client");
+                        self.nodes
+                            .choose_mut(&mut self.rng)
+                            .unwrap()
+                            .get_connection(&self.connection_factory)
+                            .await?
+                            .clone()
+                    }
                 } else {
                     let topic = &topic.topic;
                     tracing::warn!("no known partition replica for {topic:?}, routing message to a random node so that a NOT_LEADER_OR_FOLLOWER or similar error is returned to the client");
@@ -498,9 +527,19 @@ impl KafkaSinkCluster {
                 }
                 Some(Frame::Kafka(KafkaFrame::Request {
                     body: RequestBody::OffsetFetch(offset_fetch),
-                    ..
+                    header,
                 })) => {
-                    let group_id = offset_fetch.group_id.clone();
+                    let group_id = if header.request_api_version <= 7 {
+                        offset_fetch.group_id.clone()
+                    } else {
+                        // This is possibly dangerous.
+                        // The client could construct a message which is valid for a specific shotover node, but not for any single kafka broker.
+                        // We may need to add some logic to split the request into multiple messages going to different destinations,
+                        // and then reconstruct the responses back into a single response.
+                        //
+                        // For now just pick the first group as that is sufficient for the simple cases.
+                        offset_fetch.groups.first().unwrap().group_id.clone()
+                    };
                     results.push(self.route_to_coordinator(message, group_id).await?);
                 }
                 Some(Frame::Kafka(KafkaFrame::Request {
@@ -602,7 +641,10 @@ impl KafkaSinkCluster {
         Ok(())
     }
 
-    async fn find_coordinator_of_group(&mut self, group: GroupId) -> Result<KafkaNode> {
+    async fn find_coordinator_of_group(
+        &mut self,
+        group: GroupId,
+    ) -> Result<KafkaNode, FindCoordinatorError> {
         let request = Message::from_frame(Frame::Kafka(KafkaFrame::Request {
             header: RequestHeader::builder()
                 .request_api_key(ApiKey::FindCoordinatorKey as i16)
@@ -637,14 +679,21 @@ impl KafkaSinkCluster {
            Some(Frame::Kafka(KafkaFrame::Response {
                body: ResponseBody::FindCoordinator(coordinator),
                ..
-            })) => Ok(KafkaNode::new(
-                coordinator.node_id,
-                KafkaAddress::new(coordinator.host.clone(), coordinator.port),
-                None,
-            )),
+            })) => {
+                if coordinator.error_code == 0 {
+                    tracing::warn!("generated {coordinator:?}");
+                    Ok(KafkaNode::new(
+                        coordinator.node_id,
+                        KafkaAddress::new(coordinator.host.clone(), coordinator.port),
+                        None,
+                    ))
+                } else {
+                    Err(FindCoordinatorError::CoordinatorNotAvailable)
+                }
+            }
            other => Err(anyhow!(
                "Unexpected message returned to findcoordinator request {other:?}"
-            )),
+            ))?,
        }
    }
 
@@ -712,6 +761,7 @@ impl KafkaSinkCluster {
                    version,
                    ..
                })) => {
+                    tracing::warn!("requested {find_coordinator:?}");
                    let request = find_coordinator_requests
                        .iter()
                        .find(|x| x.index == i)
@@ -785,7 +835,9 @@ impl KafkaSinkCluster {
        for node in &mut self.nodes {
            if let Some(broker_id) = self.group_to_coordinator_broker.get(&group_id) {
                if node.broker_id == *broker_id {
+                    tracing::warn!("Route to {:?}", *broker_id);
                    connection = Some(node.get_connection(&self.connection_factory).await?.clone());
+                    break;
                }
            }
        }
@@ -847,12 +899,16 @@ impl KafkaSinkCluster {
    ) {
        if request.key_type == 0 {
            if version <= 3 {
-                self.group_to_coordinator_broker
-                    .insert(GroupId(request.key.clone()), find_coordinator.node_id);
+                if find_coordinator.error_code == 0 {
+                    self.group_to_coordinator_broker
+                        .insert(GroupId(request.key.clone()), find_coordinator.node_id);
+                }
            } else {
                for coordinator in &find_coordinator.coordinators {
-                    self.group_to_coordinator_broker
-                        .insert(GroupId(coordinator.key.clone()), find_coordinator.node_id);
+                    if coordinator.error_code == 0 {
+                        self.group_to_coordinator_broker
+                            .insert(GroupId(coordinator.key.clone()), coordinator.node_id);
+                    }
                }
            }
        }
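
Note (illustration only, not part of the diff above): the OffsetFetch hunk routes on `group_id` for request versions <= 7 and on the first entry of `groups` for version 8 and above, where a single request may name several groups. Below is a minimal standalone sketch of that version split; `OffsetFetchGroup` and `OffsetFetchRequest` are simplified stand-in types, not the real protocol structs shotover uses, and unlike the patch it returns an Option instead of unwrapping.

// Sketch only: stand-in types, not the protocol structs used by shotover.
struct OffsetFetchGroup {
    group_id: String,
}

struct OffsetFetchRequest {
    group_id: String,              // filled in by clients sending request versions <= 7
    groups: Vec<OffsetFetchGroup>, // filled in by clients sending request versions >= 8
}

// Mirrors the routing decision in the patch: use the single group field for old
// versions, otherwise fall back to the first listed group (any others are ignored).
fn routing_group_id(request_api_version: i16, req: &OffsetFetchRequest) -> Option<&str> {
    if request_api_version <= 7 {
        Some(req.group_id.as_str())
    } else {
        req.groups.first().map(|g| g.group_id.as_str())
    }
}

fn main() {
    let req = OffsetFetchRequest {
        group_id: "example-group".into(),
        groups: vec![OffsetFetchGroup {
            group_id: "example-group".into(),
        }],
    };
    assert_eq!(routing_group_id(7, &req), Some("example-group"));
    assert_eq!(routing_group_id(8, &req), Some("example-group"));
}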