From 157f199466fbc6f778714bf4568f9939b8dc9afb Mon Sep 17 00:00:00 2001
From: Lucas Kent
Date: Mon, 11 Nov 2024 17:42:15 +1100
Subject: [PATCH] KafkaSinkCluster: Fix DeleteRecords routing (#1803)

---
 .../tests/kafka_int_tests/test_cases.rs      |  66 ++++++++-
 .../src/transforms/kafka/sink_cluster/mod.rs | 129 ++++++++++++++++--
 .../transforms/kafka/sink_cluster/split.rs   |  37 ++++-
 test-helpers/src/connection/kafka/mod.rs     |   2 +-
 4 files changed, 216 insertions(+), 18 deletions(-)

diff --git a/shotover-proxy/tests/kafka_int_tests/test_cases.rs b/shotover-proxy/tests/kafka_int_tests/test_cases.rs
index 7a6b0576f..c8c32a6bf 100644
--- a/shotover-proxy/tests/kafka_int_tests/test_cases.rs
+++ b/shotover-proxy/tests/kafka_int_tests/test_cases.rs
@@ -134,7 +134,8 @@ async fn admin_cleanup(connection_builder: &KafkaConnectionBuilder) {
     let admin = connection_builder.connect_admin().await;
     admin.delete_groups(&["some_group", "some_group1"]).await;
 
-    delete_records(&admin, connection_builder).await;
+    delete_records_partitions1(&admin, connection_builder).await;
+    delete_records_partitions3(&admin, connection_builder).await;
 }
 
 async fn delete_offsets(connection_builder: &KafkaConnectionBuilder) {
@@ -181,7 +182,10 @@ async fn delete_offsets(connection_builder: &KafkaConnectionBuilder) {
     }
 }
 
-async fn delete_records(admin: &KafkaAdmin, connection_builder: &KafkaConnectionBuilder) {
+async fn delete_records_partitions1(
+    admin: &KafkaAdmin,
+    connection_builder: &KafkaConnectionBuilder,
+) {
     // Only supported by java driver
     #[allow(irrefutable_let_patterns)]
     if let KafkaConnectionBuilder::Java(_) = connection_builder {
@@ -225,6 +229,64 @@ async fn delete_records(admin: &KafkaAdmin, connection_builder: &KafkaConnection
     }
 }
 
+async fn delete_records_partitions3(
+    admin: &KafkaAdmin,
+    connection_builder: &KafkaConnectionBuilder,
+) {
+    // Only supported by java driver
+    #[allow(irrefutable_let_patterns)]
+    if let KafkaConnectionBuilder::Java(_) = connection_builder {
+        // assert the topic contains a record
+        let mut consumer = connection_builder
+            .connect_consumer(
+                ConsumerConfig::consume_from_topics(vec!["partitions3_case1".to_owned()])
+                    .with_group("test_delete_records"),
+            )
+            .await;
+
+        // assert that a record exists; due to cross-partition ordering we don't know which record it is, just that one exists.
+        consumer.consume(Duration::from_secs(30)).await;
+
+        // delete all records in every partition
+        admin
+            .delete_records(&[
+                RecordsToDelete {
+                    topic_partition: TopicPartition {
+                        topic_name: "partitions3_case1".to_owned(),
+                        partition: 0,
+                    },
+                    delete_before_offset: -1,
+                },
+                RecordsToDelete {
+                    topic_partition: TopicPartition {
+                        topic_name: "partitions3_case1".to_owned(),
+                        partition: 1,
+                    },
+                    delete_before_offset: -1,
+                },
+                RecordsToDelete {
+                    topic_partition: TopicPartition {
+                        topic_name: "partitions3_case1".to_owned(),
+                        partition: 2,
+                    },
+                    delete_before_offset: -1,
+                },
+            ])
+            .await;
+
+        // assert the topic no longer contains any records
+        let mut consumer = connection_builder
+            .connect_consumer(
+                ConsumerConfig::consume_from_topics(vec!["partitions3_case1".to_owned()])
+                    .with_group("test_delete_records2"),
+            )
+            .await;
+        consumer
+            .assert_no_consume_within_timeout(Duration::from_secs(2))
+            .await;
+    }
+}
+
 /// Attempt to make the driver batch produce requests for different topics into the same request
 /// This is important to test since shotover has complex logic for splitting these batch requests into individual requests.
 pub async fn produce_consume_multi_topic_batch(connection_builder: &KafkaConnectionBuilder) {
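The new test drives DeleteRecords across all three partitions of partitions3_case1 by listing each partition explicitly, with delete_before_offset: -1 asking the broker to delete every record currently in the partition. For larger partition counts the same list could be generated rather than hand-written; a minimal sketch using the RecordsToDelete and TopicPartition test-helper types shown above (the helper function name is hypothetical, and it assumes delete_records accepts any &[RecordsToDelete] slice, as the literal-array call above suggests):

// Hypothetical helper: build one RecordsToDelete entry per partition.
// delete_before_offset: -1 requests deletion of every record currently
// in the partition.
fn delete_all_records(topic: &str, partition_count: i32) -> Vec<RecordsToDelete> {
    (0..partition_count)
        .map(|partition| RecordsToDelete {
            topic_partition: TopicPartition {
                topic_name: topic.to_owned(),
                partition,
            },
            delete_before_offset: -1,
        })
        .collect()
}

With that helper the test body would reduce to admin.delete_records(&delete_all_records("partitions3_case1", 3)).await;
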
diff --git a/shotover/src/transforms/kafka/sink_cluster/mod.rs b/shotover/src/transforms/kafka/sink_cluster/mod.rs
index 21d729b83..fe9c440e5 100644
--- a/shotover/src/transforms/kafka/sink_cluster/mod.rs
+++ b/shotover/src/transforms/kafka/sink_cluster/mod.rs
@@ -14,6 +14,8 @@ use connections::{Connections, Destination};
 use dashmap::DashMap;
 use kafka_node::{ConnectionFactory, KafkaAddress, KafkaNode, KafkaNodeState};
 use kafka_protocol::messages::add_partitions_to_txn_request::AddPartitionsToTxnTransaction;
+use kafka_protocol::messages::delete_records_request::DeleteRecordsTopic;
+use kafka_protocol::messages::delete_records_response::DeleteRecordsTopicResult;
 use kafka_protocol::messages::fetch_request::FetchTopic;
 use kafka_protocol::messages::fetch_response::LeaderIdAndEpoch as FetchResponseLeaderIdAndEpoch;
 use kafka_protocol::messages::list_offsets_request::ListOffsetsTopic;
@@ -27,14 +29,14 @@ use kafka_protocol::messages::produce_response::{
 };
 use kafka_protocol::messages::{
     AddOffsetsToTxnRequest, AddPartitionsToTxnRequest, AddPartitionsToTxnResponse, ApiKey,
-    BrokerId, DeleteGroupsRequest, DeleteGroupsResponse, EndTxnRequest, FetchRequest,
-    FetchResponse, FindCoordinatorRequest, FindCoordinatorResponse, GroupId, HeartbeatRequest,
-    InitProducerIdRequest, JoinGroupRequest, LeaveGroupRequest, ListGroupsResponse,
-    ListOffsetsRequest, ListOffsetsResponse, ListTransactionsResponse, MetadataRequest,
-    MetadataResponse, OffsetFetchRequest, OffsetFetchResponse, OffsetForLeaderEpochRequest,
-    OffsetForLeaderEpochResponse, ProduceRequest, ProduceResponse, RequestHeader,
-    SaslAuthenticateRequest, SaslAuthenticateResponse, SaslHandshakeRequest, SyncGroupRequest,
-    TopicName, TransactionalId, TxnOffsetCommitRequest,
+    BrokerId, DeleteGroupsRequest, DeleteGroupsResponse, DeleteRecordsRequest,
+    DeleteRecordsResponse, EndTxnRequest, FetchRequest, FetchResponse, FindCoordinatorRequest,
+    FindCoordinatorResponse, GroupId, HeartbeatRequest, InitProducerIdRequest, JoinGroupRequest,
+    LeaveGroupRequest, ListGroupsResponse, ListOffsetsRequest, ListOffsetsResponse,
+    ListTransactionsResponse, MetadataRequest, MetadataResponse, OffsetFetchRequest,
+    OffsetFetchResponse, OffsetForLeaderEpochRequest, OffsetForLeaderEpochResponse, ProduceRequest,
+    ProduceResponse, RequestHeader, SaslAuthenticateRequest, SaslAuthenticateResponse,
+    SaslHandshakeRequest, SyncGroupRequest, TopicName, TransactionalId, TxnOffsetCommitRequest,
 };
 use kafka_protocol::protocol::StrBytes;
 use kafka_protocol::ResponseError;
@@ -49,8 +51,9 @@ use scram_over_mtls::{
 use serde::{Deserialize, Serialize};
 use shotover_node::{ShotoverNode, ShotoverNodeConfig};
 use split::{
-    AddPartitionsToTxnRequestSplitAndRouter, DeleteGroupsSplitAndRouter, ListGroupsSplitAndRouter,
-    ListOffsetsRequestSplitAndRouter, ListTransactionsSplitAndRouter, OffsetFetchSplitAndRouter,
+    AddPartitionsToTxnRequestSplitAndRouter, DeleteGroupsSplitAndRouter,
+    DeleteRecordsRequestSplitAndRouter, ListGroupsSplitAndRouter, ListOffsetsRequestSplitAndRouter,
+    ListTransactionsSplitAndRouter, OffsetFetchSplitAndRouter,
     OffsetForLeaderEpochRequestSplitAndRouter, ProduceRequestSplitAndRouter, RequestSplitAndRouter,
 };
 use std::collections::{HashMap, HashSet, VecDeque};
@@ -885,6 +888,12 @@ impl KafkaSinkCluster {
             })) => self.split_and_route_request::<OffsetForLeaderEpochRequestSplitAndRouter>(
                 request,
             )?,
+            Some(Frame::Kafka(KafkaFrame::Request {
+                body: RequestBody::DeleteRecords(_),
+                ..
+            })) => {
+                self.split_and_route_request::<DeleteRecordsRequestSplitAndRouter>(request)?
+            }
 
             // route to group coordinator
             Some(Frame::Kafka(KafkaFrame::Request {
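The match arm above routes DeleteRecords through the same generic splitting machinery already used by Produce, ListOffsets, and OffsetForLeaderEpoch. As a rough sketch of the shape of that machinery (simplified stand-ins, not Shotover's exact trait in split.rs, which operates on framed Messages):

use std::collections::HashMap;

#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
struct BrokerId(i32);

// Each routable request type describes how to drain itself into per-broker
// sub-requests and how to rebuild a request from one broker's share.
trait SplitAndRoute {
    type Request;
    type SubRequests;

    fn split_by_destination(request: &mut Self::Request) -> HashMap<BrokerId, Self::SubRequests>;
    fn reassemble(request: &mut Self::Request, sub_requests: Self::SubRequests);
}

// The dispatcher is then written once and reused by every splittable request.
fn split_and_route<T>(request: &mut T::Request) -> Vec<(BrokerId, T::Request)>
where
    T: SplitAndRoute,
    T::Request: Clone,
{
    // After the drain, `request` is an empty shell that still carries its
    // header and other non-routable fields.
    let split = T::split_by_destination(request);
    split
        .into_iter()
        .map(|(broker, sub_requests)| {
            let mut shell = request.clone();
            T::reassemble(&mut shell, sub_requests);
            (broker, shell)
        })
        .collect()
}
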
@@ -1055,7 +1064,6 @@ The connection to the client has been closed."
                     | RequestBody::AlterConfigs(_)
                     | RequestBody::CreatePartitions(_)
                     | RequestBody::DeleteTopics(_)
-                    | RequestBody::DeleteRecords(_)
                     | RequestBody::CreateAcls(_)
                     | RequestBody::ApiVersions(_),
                 ..
@@ -1587,6 +1595,58 @@ The connection to the client has been closed."
         result
     }
 
+    /// This method removes all topics from the DeleteRecords request and returns them split up by their destination.
+    /// If any topics are unroutable, they will have their BrokerId set to -1.
+    fn split_delete_records_request_by_destination(
+        &mut self,
+        body: &mut DeleteRecordsRequest,
+    ) -> HashMap<BrokerId, Vec<DeleteRecordsTopic>> {
+        let mut result: HashMap<BrokerId, Vec<DeleteRecordsTopic>> = Default::default();
+
+        for mut topic in body.topics.drain(..) {
+            let topic_name = &topic.name;
+            if let Some(topic_meta) = self.topic_by_name.get(topic_name) {
+                for partition in std::mem::take(&mut topic.partitions) {
+                    let partition_index = partition.partition_index as usize;
+                    let destination = if let Some(partition) =
+                        topic_meta.partitions.get(partition_index)
+                    {
+                        if partition.leader_id == -1 {
+                            tracing::warn!(
+                                "leader_id is unknown for {topic_name:?} at partition index {partition_index}",
+                            );
+                        }
+                        partition.leader_id
+                    } else {
+                        let partition_len = topic_meta.partitions.len();
+                        tracing::warn!("no known partition for {topic_name:?} at partition index {partition_index} out of {partition_len} partitions, routing request to a random broker so that a NOT_LEADER_OR_FOLLOWER or similar error is returned to the client");
+                        BrokerId(-1)
+                    };
+                    tracing::debug!(
+                        "Routing DeleteRecords request portion of partition {partition_index} in {topic_name:?} to broker {}",
+                        destination.0
+                    );
+                    let dest_topics = result.entry(destination).or_default();
+                    if let Some(dest_topic) = dest_topics.iter_mut().find(|x| x.name == topic.name)
+                    {
+                        dest_topic.partitions.push(partition);
+                    } else {
+                        let mut topic = topic.clone();
+                        topic.partitions.push(partition);
+                        dest_topics.push(topic);
+                    }
+                }
+            } else {
+                tracing::warn!("no known partition replica for {topic_name:?}, routing request to a random broker so that a NOT_LEADER_OR_FOLLOWER or similar error is returned to the client");
+                let destination = BrokerId(-1);
+                let dest_topics = result.entry(destination).or_default();
+                dest_topics.push(topic);
+            }
+        }
+
+        result
+    }
+
     /// This method removes all transactions from the AddPartitionsToTxn request and returns them split up by their destination
     /// If any topics are unroutable they will have their BrokerId set to -1
     fn split_add_partition_to_txn_request_by_destination(
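split_delete_records_request_by_destination is the heart of the fix: each partition is routed to its leader broker, and anything unroutable is grouped under BrokerId(-1) so a later stage can pick a random broker and let the client receive a retryable error. For example, with a 3-partition topic whose leaders are broker 1 (partitions 0 and 2) and broker 2 (partition 1), one client request splits into two sub-requests. A self-contained sketch of just the grouping step (plain structs stand in for the generated kafka_protocol types):

use std::collections::HashMap;

#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
struct BrokerId(i32);

#[derive(Clone, Debug)]
struct DeleteRecordsPartition {
    partition_index: i32,
}

// Group one topic's partitions by the broker that leads each partition.
// `leader_of` stands in for the metadata lookup in topic_by_name.
fn group_by_leader(
    partitions: Vec<DeleteRecordsPartition>,
    leader_of: impl Fn(i32) -> Option<BrokerId>,
) -> HashMap<BrokerId, Vec<DeleteRecordsPartition>> {
    let mut result: HashMap<BrokerId, Vec<DeleteRecordsPartition>> = HashMap::new();
    for partition in partitions {
        // Partitions with no known leader fall back to BrokerId(-1); the
        // caller later swaps that for a random broker so the client gets a
        // NOT_LEADER_OR_FOLLOWER style error it knows how to retry.
        let destination = leader_of(partition.partition_index).unwrap_or(BrokerId(-1));
        result.entry(destination).or_default().push(partition);
    }
    result
}
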
@@ -2077,6 +2137,10 @@ The connection to the client has been closed."
                 body: ResponseBody::DeleteGroups(base),
                 ..
             })) => Self::combine_delete_groups_responses(base, drain)?,
+            Some(Frame::Kafka(KafkaFrame::Response {
+                body: ResponseBody::DeleteRecords(base),
+                ..
+            })) => Self::combine_delete_records(base, drain)?,
             Some(Frame::Kafka(KafkaFrame::Response {
                 body: ResponseBody::OffsetFetch(base),
                 ..
@@ -2271,6 +2335,49 @@ The connection to the client has been closed."
         Ok(())
     }
 
+    fn combine_delete_records(
+        base_body: &mut DeleteRecordsResponse,
+        drain: impl Iterator<Item = Message>,
+    ) -> Result<()> {
+        let mut base_topics: HashMap<TopicName, DeleteRecordsTopicResult> =
+            std::mem::take(&mut base_body.topics)
+                .into_iter()
+                .map(|response| (response.name.clone(), response))
+                .collect();
+        for mut next in drain {
+            if let Some(Frame::Kafka(KafkaFrame::Response {
+                body: ResponseBody::DeleteRecords(next_body),
+                ..
+            })) = next.frame()
+            {
+                for next_response in std::mem::take(&mut next_body.topics) {
+                    if let Some(base_response) = base_topics.get_mut(&next_response.name) {
+                        for next_partition in &next_response.partitions {
+                            for base_partition in &base_response.partitions {
+                                if next_partition.partition_index == base_partition.partition_index
+                                {
+                                    tracing::warn!("Duplicate partition indexes in combined DeleteRecords response; if this ever occurs we should investigate the repercussions")
+                                }
+                            }
+                        }
+                        // A partition can only be contained in one response, so there is no risk of duplicating partitions
+                        base_response.partitions.extend(next_response.partitions)
+                    } else {
+                        base_topics.insert(next_response.name.clone(), next_response);
+                    }
+                }
+            } else {
+                return Err(anyhow!(
+                    "Combining DeleteRecords responses but received another message type"
+                ));
+            }
+        }
+
+        base_body.topics.extend(base_topics.into_values());
+
+        Ok(())
+    }
+
     fn combine_delete_groups_responses(
         base_delete_groups: &mut DeleteGroupsResponse,
         drain: impl Iterator<Item = Message>,
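Because one client request fans out to several brokers, the per-broker responses have to be merged back into a single DeleteRecords response before returning to the client. combine_delete_records keys topics by name and concatenates their partition results, since a given partition appears in exactly one sub-response. A simplified model of that merge (plain structs, not the kafka_protocol types):

use std::collections::HashMap;

#[derive(Clone, Debug)]
struct TopicResult {
    name: String,
    partition_results: Vec<i32>, // stand-in for per-partition results
}

// Merge per-broker topic results into one list, concatenating the partition
// results of any topic that appears in more than one sub-response.
fn combine(responses: Vec<Vec<TopicResult>>) -> Vec<TopicResult> {
    let mut by_name: HashMap<String, TopicResult> = HashMap::new();
    for response in responses {
        for topic in response {
            let base = by_name.entry(topic.name.clone()).or_insert_with(|| TopicResult {
                name: topic.name.clone(),
                partition_results: Vec::new(),
            });
            base.partition_results.extend(topic.partition_results);
        }
    }
    by_name.into_values().collect()
}
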
diff --git a/shotover/src/transforms/kafka/sink_cluster/split.rs b/shotover/src/transforms/kafka/sink_cluster/split.rs
index 7ada3283b..7b34bc0c3 100644
--- a/shotover/src/transforms/kafka/sink_cluster/split.rs
+++ b/shotover/src/transforms/kafka/sink_cluster/split.rs
@@ -8,11 +8,12 @@ use crate::{
 };
 use kafka_protocol::messages::{
     add_partitions_to_txn_request::AddPartitionsToTxnTransaction,
-    list_offsets_request::ListOffsetsTopic, offset_fetch_request::OffsetFetchRequestGroup,
+    delete_records_request::DeleteRecordsTopic, list_offsets_request::ListOffsetsTopic,
+    offset_fetch_request::OffsetFetchRequestGroup,
     offset_for_leader_epoch_request::OffsetForLeaderTopic, produce_request::TopicProduceData,
-    AddPartitionsToTxnRequest, BrokerId, DeleteGroupsRequest, GroupId, ListGroupsRequest,
-    ListOffsetsRequest, ListTransactionsRequest, OffsetFetchRequest, OffsetForLeaderEpochRequest,
-    ProduceRequest, TopicName,
+    AddPartitionsToTxnRequest, BrokerId, DeleteGroupsRequest, DeleteRecordsRequest, GroupId,
+    ListGroupsRequest, ListOffsetsRequest, ListTransactionsRequest, OffsetFetchRequest,
+    OffsetForLeaderEpochRequest, ProduceRequest, TopicName,
 };
 use std::collections::HashMap;
 
@@ -139,6 +140,34 @@ impl RequestSplitAndRouter for OffsetForLeaderEpochRequestSplitAndRouter {
     }
 }
 
+pub struct DeleteRecordsRequestSplitAndRouter;
+
+impl RequestSplitAndRouter for DeleteRecordsRequestSplitAndRouter {
+    type Request = DeleteRecordsRequest;
+    type SubRequests = Vec<DeleteRecordsTopic>;
+
+    fn split_by_destination(
+        transform: &mut KafkaSinkCluster,
+        request: &mut Self::Request,
+    ) -> HashMap<BrokerId, Self::SubRequests> {
+        transform.split_delete_records_request_by_destination(request)
+    }
+
+    fn get_request_frame(request: &mut Message) -> Option<&mut Self::Request> {
+        match request.frame() {
+            Some(Frame::Kafka(KafkaFrame::Request {
+                body: RequestBody::DeleteRecords(request),
+                ..
+            })) => Some(request),
+            _ => None,
+        }
+    }
+
+    fn reassemble(request: &mut Self::Request, item: Self::SubRequests) {
+        request.topics = item;
+    }
+}
+
 pub struct DeleteGroupsSplitAndRouter;
 
 impl RequestSplitAndRouter for DeleteGroupsSplitAndRouter {
diff --git a/test-helpers/src/connection/kafka/mod.rs b/test-helpers/src/connection/kafka/mod.rs
index 6d899abd1..7fe493b75 100644
--- a/test-helpers/src/connection/kafka/mod.rs
+++ b/test-helpers/src/connection/kafka/mod.rs
@@ -268,7 +268,7 @@ impl KafkaConsumer {
         }
     }
 
-    async fn consume(&mut self, timeout: Duration) -> ExpectedResponse {
+    pub async fn consume(&mut self, timeout: Duration) -> ExpectedResponse {
         match self {
             #[cfg(feature = "kafka-cpp-driver-tests")]
             Self::Cpp(cpp) => cpp.consume(timeout).await,
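With both halves in place, the invariant worth keeping in mind is that splitting and recombining is lossless: every (topic, partition) entry of the original request is answered exactly once. A tiny self-contained check of that round trip, again with plain stand-in types rather than Shotover's:

use std::collections::HashMap;

#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
struct BrokerId(i32);

#[test]
fn split_then_combine_preserves_partitions() {
    // partition index -> leader broker: a 3-partition topic led by two brokers
    let leaders = HashMap::from([(0, BrokerId(1)), (1, BrokerId(2)), (2, BrokerId(1))]);

    // Split: group partition indexes by their leader.
    let mut split: HashMap<BrokerId, Vec<i32>> = HashMap::new();
    for (&partition, &leader) in &leaders {
        split.entry(leader).or_default().push(partition);
    }

    // Combine: flatten the per-broker results back into one response.
    let mut combined: Vec<i32> = split.into_values().flatten().collect();
    combined.sort();

    assert_eq!(combined, vec![0, 1, 2]);
}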