diff --git a/shotover-proxy/tests/kafka_int_tests/test_cases.rs b/shotover-proxy/tests/kafka_int_tests/test_cases.rs
index 1578da805..611c692d4 100644
--- a/shotover-proxy/tests/kafka_int_tests/test_cases.rs
+++ b/shotover-proxy/tests/kafka_int_tests/test_cases.rs
@@ -241,6 +241,24 @@ pub async fn standard_test_suite(connection_builder: &KafkaConnectionBuilder) {
     produce_consume_partitions1(connection_builder, "partitions1").await;
     produce_consume_partitions1(connection_builder, "unknown_topic").await;
     produce_consume_partitions3(connection_builder).await;
+
+    // Only run this test case on the java driver,
+    // since even without going through shotover the cpp driver fails this test.
+    #[allow(irrefutable_let_patterns)]
+    if let KafkaConnectionBuilder::Java(_) = connection_builder {
+        // delete and recreate topic to force shotover to adjust its existing routing metadata
+        let admin = connection_builder.connect_admin().await;
+        admin.delete_topics(&["partitions1"]).await;
+        admin
+            .create_topics(&[NewTopic {
+                name: "partitions1",
+                num_partitions: 1,
+                replication_factor: 1,
+            }])
+            .await;
+        produce_consume_partitions1(connection_builder, "partitions1").await;
+    }
+
     produce_consume_acks0(connection_builder).await;
     connection_builder.admin_cleanup().await;
 }
diff --git a/shotover/src/transforms/kafka/sink_cluster/mod.rs b/shotover/src/transforms/kafka/sink_cluster/mod.rs
index b8e4fd773..d79989a29 100644
--- a/shotover/src/transforms/kafka/sink_cluster/mod.rs
+++ b/shotover/src/transforms/kafka/sink_cluster/mod.rs
@@ -216,7 +216,11 @@ impl TransformBuilder for KafkaSinkClusterBuilder {
             first_contact_node: None,
             control_connection: None,
             pending_requests: Default::default(),
+            // TODO: this approach with `find_coordinator_requests` and `routed_to_coordinator_for_group`
+            // is prone to memory leaks and logic errors.
+            // We should replace these fields with extra state within `PendingRequestTy::Received/Sent`.
             find_coordinator_requests: Default::default(),
+            routed_to_coordinator_for_group: Default::default(),
             temp_responses_buffer: Default::default(),
             sasl_mechanism: None,
             authorize_scram_over_mtls: self.authorize_scram_over_mtls.as_ref().map(|x| x.build()),
@@ -245,6 +249,10 @@ impl AtomicBrokerId {
             .store(value.0.into(), std::sync::atomic::Ordering::Relaxed)
     }
 
+    fn clear(&self) {
+        self.0.store(i64::MAX, std::sync::atomic::Ordering::Relaxed)
+    }
+
     /// Returns `None` when set has never been called.
     /// Otherwise returns `Some` containing the latest set value.
     fn get(&self) -> Option<BrokerId> {
@@ -274,6 +282,7 @@ pub struct KafkaSinkCluster {
     /// Ordering must be maintained to ensure responses match up with their request.
     pending_requests: VecDeque<PendingRequest>,
     find_coordinator_requests: MessageIdMap<FindCoordinator>,
+    routed_to_coordinator_for_group: MessageIdMap<GroupId>,
     /// A temporary buffer used when receiving responses, only held onto in order to avoid reallocating.
    temp_responses_buffer: Vec<Message>,
     sasl_mechanism: Option<String>,
@@ -661,6 +670,7 @@ impl KafkaSinkCluster {
                 body: RequestBody::DeleteGroups(groups),
                 ..
             })) => {
+                // TODO: we need to split this up into multiple requests so it can be correctly routed to all possible nodes
                 let group_id = groups.groups_names.first().unwrap().clone();
                 self.route_to_coordinator(message, group_id);
             }
@@ -1186,7 +1196,6 @@ routing message to a random node so that:
     }
 
     async fn process_responses(&mut self, responses: &mut [Message]) -> Result<()> {
-        // TODO: Handle errors like NOT_COORDINATOR by removing element from self.topics and self.coordinator_broker_id
         for response in responses.iter_mut() {
             let request_id = response.request_id().unwrap();
             match response.frame() {
@@ -1234,6 +1243,128 @@ routing message to a random node so that:
                 })) => {
                     self.process_sasl_authenticate(authenticate).await?;
                 }
+                Some(Frame::Kafka(KafkaFrame::Response {
+                    body: ResponseBody::Produce(produce),
+                    ..
+                })) => {
+                    for (topic_name, response_topic) in &produce.responses {
+                        for response_partition in &response_topic.partition_responses {
+                            if let Some(ResponseError::NotLeaderOrFollower) =
+                                ResponseError::try_from_code(response_partition.error_code)
+                            {
+                                if response_partition.current_leader.leader_id != -1 {
+                                    // The broker has informed us who the new leader is, so we can directly update the leader
+                                    if let Some(mut stored_topic) =
+                                        self.topic_by_name.get_mut(topic_name)
+                                    {
+                                        if let Some(stored_partition) = stored_topic
+                                            .partitions
+                                            .get_mut(response_partition.index as usize)
+                                        {
+                                            if response_partition.current_leader.leader_epoch
+                                                > stored_partition.leader_epoch
+                                            {
+                                                stored_partition.leader_id =
+                                                    response_partition.current_leader.leader_id;
+                                                stored_partition.leader_epoch =
+                                                    response_partition.current_leader.leader_epoch;
+                                            }
+                                            tracing::info!(
+                                                "Produce response included error NOT_LEADER_OR_FOLLOWER and so updated leader in topic {:?} partition {}",
+                                                topic_name,
+                                                response_partition.index
+                                            );
+                                        }
+                                    }
+                                } else {
+                                    // The broker doesn't know who the new leader is, so clear the entire topic.
+                                    self.topic_by_name.remove(topic_name);
+                                    tracing::info!(
+                                        "Produce response included error NOT_LEADER_OR_FOLLOWER and so cleared topic {:?}",
+                                        topic_name,
+                                    );
+                                    break;
+                                }
+                            }
+                        }
+                    }
+                }
+                Some(Frame::Kafka(KafkaFrame::Response {
+                    body: ResponseBody::Fetch(fetch),
+                    ..
+                })) => {
+                    for response in &fetch.responses {
+                        for partition in &response.partitions {
+                            if let Some(ResponseError::NotLeaderOrFollower) =
+                                ResponseError::try_from_code(partition.error_code)
+                            {
+                                // The fetch response includes the leader_id that a client could use to route a fetch request to,
+                                // but we can't use it to fix our list of replicas, so our only option is to clear the whole thing.
+                                self.topic_by_name.remove(&response.topic);
+                                self.topic_by_id.remove(&response.topic_id);
+                                tracing::info!(
+                                    "Fetch response included error NOT_LEADER_OR_FOLLOWER and so cleared topic {:?} {:?}",
+                                    response.topic,
+                                    response.topic_id
+                                );
+                                break;
+                            }
+                        }
+                    }
+                }
+                Some(Frame::Kafka(KafkaFrame::Response {
+                    body: ResponseBody::Heartbeat(heartbeat),
+                    ..
+                })) => self.handle_coordinator_routing_error(request_id, heartbeat.error_code),
+                Some(Frame::Kafka(KafkaFrame::Response {
+                    body: ResponseBody::SyncGroup(sync_group),
+                    ..
+                })) => self.handle_coordinator_routing_error(request_id, sync_group.error_code),
+                Some(Frame::Kafka(KafkaFrame::Response {
+                    body: ResponseBody::OffsetFetch(offset_fetch),
+                    ..
+                })) => self.handle_coordinator_routing_error(request_id, offset_fetch.error_code),
+                Some(Frame::Kafka(KafkaFrame::Response {
+                    body: ResponseBody::JoinGroup(join_group),
+                    ..
+                })) => self.handle_coordinator_routing_error(request_id, join_group.error_code),
+                Some(Frame::Kafka(KafkaFrame::Response {
+                    body: ResponseBody::LeaveGroup(leave_group),
+                    ..
+                })) => self.handle_coordinator_routing_error(request_id, leave_group.error_code),
+                Some(Frame::Kafka(KafkaFrame::Response {
+                    body: ResponseBody::DeleteGroups(delete_groups),
+                    ..
+                })) => {
+                    for (group_id, result) in &delete_groups.results {
+                        if let Some(ResponseError::NotCoordinator) =
+                            ResponseError::try_from_code(result.error_code)
+                        {
+                            // Need to run this to avoid memory leaks, since route_to_coordinator is called for DeleteGroups requests
+                            self.routed_to_coordinator_for_group.remove(&request_id);
+
+                            // Need to run this to ensure we remove the mapping for all groups
+                            self.group_to_coordinator_broker.remove(group_id);
+                        }
+                    }
+                }
+                Some(Frame::Kafka(KafkaFrame::Response {
+                    body: ResponseBody::CreateTopics(create_topics),
+                    ..
+                })) => {
+                    for topic in create_topics.topics.values() {
+                        if let Some(ResponseError::NotController) =
+                            ResponseError::try_from_code(topic.error_code)
+                        {
+                            tracing::info!(
+                                "Response to CreateTopics included error NOT_CONTROLLER and so reset controller broker, previously was {:?}",
+                                self.controller_broker.get()
+                            );
+                            self.controller_broker.clear();
+                            break;
+                        }
+                    }
+                }
                 Some(Frame::Kafka(KafkaFrame::Response {
                     body: ResponseBody::Metadata(metadata),
                     ..
@@ -1259,6 +1390,20 @@ routing message to a random node so that:
         Ok(())
     }
 
+    /// This method must be called for every response to a request that was routed via `route_to_coordinator`
+    fn handle_coordinator_routing_error(&mut self, request_id: u128, error_code: i16) {
+        if let Some(ResponseError::NotCoordinator) = ResponseError::try_from_code(error_code) {
+            if let Some(group_id) = self.routed_to_coordinator_for_group.remove(&request_id) {
+                let broker_id = self.group_to_coordinator_broker.remove(&group_id);
+                tracing::info!(
+                    "Response was error NOT_COORDINATOR and so cleared group id {:?} coordinator mapping to broker {:?}",
+                    group_id,
+                    broker_id,
+                );
+            }
+        }
+    }
+
     async fn process_sasl_authenticate(
         &mut self,
         authenticate: &mut SaslAuthenticateResponse,
@@ -1338,6 +1483,8 @@ routing message to a random node so that:
 
     fn route_to_coordinator(&mut self, request: Message, group_id: GroupId) {
         let destination = self.group_to_coordinator_broker.get(&group_id);
+        self.routed_to_coordinator_for_group
+            .insert(request.id(), group_id.clone());
         let destination = match destination {
             Some(destination) => *destination,
             None => {
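
For reference, a minimal sketch of the sentinel pattern that the new `clear` method relies on, assuming (as the `AtomicBrokerId` hunks above suggest) that the type wraps a single `AtomicI64` and treats `i64::MAX` as "never set"; the broker id is simplified to a plain `i64` here rather than shotover's `BrokerId` newtype:

use std::sync::atomic::{AtomicI64, Ordering};

// Simplified stand-in for shotover's `AtomicBrokerId`: `i64::MAX` acts as the
// "never set / cleared" sentinel, so `get` can report `None` without any lock.
struct AtomicBrokerId(AtomicI64);

impl AtomicBrokerId {
    fn new() -> Self {
        AtomicBrokerId(AtomicI64::new(i64::MAX))
    }

    fn set(&self, broker_id: i64) {
        self.0.store(broker_id, Ordering::Relaxed)
    }

    // Forget the stored broker, e.g. after a NOT_CONTROLLER response.
    fn clear(&self) {
        self.0.store(i64::MAX, Ordering::Relaxed)
    }

    // `None` until `set` has been called, or after `clear`.
    fn get(&self) -> Option<i64> {
        match self.0.load(Ordering::Relaxed) {
            i64::MAX => None,
            value => Some(value),
        }
    }
}

fn main() {
    let controller = AtomicBrokerId::new();
    assert_eq!(controller.get(), None);

    controller.set(3);
    assert_eq!(controller.get(), Some(3));

    // e.g. after CreateTopics returns NOT_CONTROLLER
    controller.clear();
    assert_eq!(controller.get(), None);
}

With this layout, calling `controller_broker.clear()` on a NOT_CONTROLLER error makes the next `get()` return `None`, so the proxy has to look the controller up again instead of continuing to route to a stale broker.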