diff --git a/shotover-proxy/tests/kafka_int_tests/test_cases.rs b/shotover-proxy/tests/kafka_int_tests/test_cases.rs index 42414fda4..3b7d194cc 100644 --- a/shotover-proxy/tests/kafka_int_tests/test_cases.rs +++ b/shotover-proxy/tests/kafka_int_tests/test_cases.rs @@ -4,8 +4,8 @@ use test_helpers::{ connection::kafka::{ Acl, AclOperation, AclPermissionType, AlterConfig, ConfigEntry, ConsumerConfig, ExpectedResponse, KafkaConnectionBuilder, KafkaConsumer, KafkaDriver, KafkaProducer, - NewPartition, NewTopic, Record, ResourcePatternType, ResourceSpecifier, ResourceType, - TopicPartition, + ListOffsetsResultInfo, NewPartition, NewTopic, OffsetSpec, Record, ResourcePatternType, + ResourceSpecifier, ResourceType, TopicPartition, }, docker_compose::DockerCompose, }; @@ -859,6 +859,71 @@ pub async fn standard_test_suite(connection_builder: &KafkaConnectionBuilder) { }]) .await; produce_consume_partitions1(connection_builder, "partitions1").await; + + let results = admin + .list_offsets(HashMap::from([ + ( + TopicPartition { + topic_name: "partitions3_case3".to_owned(), + partition: 0, + }, + OffsetSpec::Earliest, + ), + ( + TopicPartition { + topic_name: "partitions3_case3".to_owned(), + partition: 1, + }, + OffsetSpec::Earliest, + ), + ( + TopicPartition { + topic_name: "partitions3_case3".to_owned(), + partition: 2, + }, + OffsetSpec::Earliest, + ), + ( + TopicPartition { + topic_name: "partitions1".to_owned(), + partition: 0, + }, + OffsetSpec::Latest, + ), + ])) + .await; + + let expected = HashMap::from([ + ( + TopicPartition { + topic_name: "partitions3_case3".to_owned(), + partition: 0, + }, + ListOffsetsResultInfo { offset: 0 }, + ), + ( + TopicPartition { + topic_name: "partitions3_case3".to_owned(), + partition: 1, + }, + ListOffsetsResultInfo { offset: 0 }, + ), + ( + TopicPartition { + topic_name: "partitions3_case3".to_owned(), + partition: 2, + }, + ListOffsetsResultInfo { offset: 0 }, + ), + ( + TopicPartition { + topic_name: "partitions1".to_owned(), + partition: 0, + }, + ListOffsetsResultInfo { offset: 11 }, + ), + ]); + assert_eq!(results, expected); } produce_consume_acks0(connection_builder).await; diff --git a/shotover/src/transforms/kafka/sink_cluster/mod.rs b/shotover/src/transforms/kafka/sink_cluster/mod.rs index 4ba41a4ab..1c2554098 100644 --- a/shotover/src/transforms/kafka/sink_cluster/mod.rs +++ b/shotover/src/transforms/kafka/sink_cluster/mod.rs @@ -15,15 +15,17 @@ use kafka_node::{ConnectionFactory, KafkaAddress, KafkaNode, KafkaNodeState}; use kafka_protocol::indexmap::IndexMap; use kafka_protocol::messages::fetch_request::FetchTopic; use kafka_protocol::messages::fetch_response::LeaderIdAndEpoch as FetchResponseLeaderIdAndEpoch; +use kafka_protocol::messages::list_offsets_request::ListOffsetsTopic; use kafka_protocol::messages::metadata_request::MetadataRequestTopic; use kafka_protocol::messages::metadata_response::MetadataResponseBroker; use kafka_protocol::messages::produce_request::TopicProduceData; use kafka_protocol::messages::produce_response::LeaderIdAndEpoch as ProduceResponseLeaderIdAndEpoch; use kafka_protocol::messages::{ ApiKey, BrokerId, FetchRequest, FetchResponse, FindCoordinatorRequest, FindCoordinatorResponse, - GroupId, HeartbeatRequest, JoinGroupRequest, LeaveGroupRequest, MetadataRequest, - MetadataResponse, ProduceRequest, ProduceResponse, RequestHeader, SaslAuthenticateRequest, - SaslAuthenticateResponse, SaslHandshakeRequest, SyncGroupRequest, TopicName, + GroupId, HeartbeatRequest, JoinGroupRequest, LeaveGroupRequest, ListOffsetsRequest, + ListOffsetsResponse, MetadataRequest, MetadataResponse, ProduceRequest, ProduceResponse, + RequestHeader, SaslAuthenticateRequest, SaslAuthenticateResponse, SaslHandshakeRequest, + SyncGroupRequest, TopicName, }; use kafka_protocol::protocol::StrBytes; use kafka_protocol::ResponseError; @@ -696,17 +698,19 @@ impl KafkaSinkCluster { for mut message in requests { // This routing is documented in transforms.md so make sure to update that when making changes here. match message.frame() { - // route to partition leader + // split and route to partition leader Some(Frame::Kafka(KafkaFrame::Request { body: RequestBody::Produce(_), .. })) => self.route_produce_request(message)?, - - // route to random partition replica Some(Frame::Kafka(KafkaFrame::Request { body: RequestBody::Fetch(_), .. })) => self.route_fetch_request(message)?, + Some(Frame::Kafka(KafkaFrame::Request { + body: RequestBody::ListOffsets(_), + .. + })) => self.route_list_offsets(message)?, // route to group coordinator Some(Frame::Kafka(KafkaFrame::Request { @@ -1101,6 +1105,140 @@ impl KafkaSinkCluster { Ok(()) } + /// This method removes all topics from the list offsets request and returns them split up by their destination + /// If any topics are unroutable they will have their BrokerId set to -1 + fn split_list_offsets_request_by_destination( + &mut self, + list_offsets: &mut ListOffsetsRequest, + ) -> HashMap> { + let mut result: HashMap> = Default::default(); + + for mut topic in list_offsets.topics.drain(..) { + let topic_name = &topic.name; + if let Some(topic_meta) = self.topic_by_name.get(topic_name) { + for partition in std::mem::take(&mut topic.partitions) { + let partition_index = partition.partition_index as usize; + let destination = if let Some(partition) = + topic_meta.partitions.get(partition_index) + { + if partition.leader_id == -1 { + tracing::warn!( + "leader_id is unknown for {topic_name:?} at partition index {partition_index}", + ); + } + partition.leader_id + } else { + let partition_len = topic_meta.partitions.len(); + tracing::warn!("no known partition for {topic_name:?} at partition index {partition_index} out of {partition_len} partitions, routing message to a random broker so that a NOT_LEADER_OR_FOLLOWER or similar error is returned to the client"); + BrokerId(-1) + }; + tracing::debug!( + "Routing list_offsets request portion of partition {partition_index} in {topic_name:?} to broker {}", + destination.0 + ); + let dest_topics = result.entry(destination).or_default(); + if let Some(dest_topic) = dest_topics.iter_mut().find(|x| x.name == topic.name) + { + dest_topic.partitions.push(partition); + } else { + let mut topic = topic.clone(); + topic.partitions.push(partition); + dest_topics.push(topic); + } + } + } else { + tracing::warn!("no known partition replica for {topic_name:?}, routing message to a random broker so that a NOT_LEADER_OR_FOLLOWER or similar error is returned to the client"); + let destination = BrokerId(-1); + let dest_topics = result.entry(destination).or_default(); + dest_topics.push(topic); + } + } + + result + } + + fn route_list_offsets(&mut self, mut request: Message) -> Result<()> { + if let Some(Frame::Kafka(KafkaFrame::Request { + body: RequestBody::ListOffsets(list_offsets), + .. + })) = request.frame() + { + let routing = self.split_list_offsets_request_by_destination(list_offsets); + + if routing.is_empty() { + // Fetch contains no topics, so we can just pick a random destination. + // The message is unchanged so we can just send as is. + let destination = random_broker_id(&self.nodes, &mut self.rng); + + self.pending_requests.push_back(PendingRequest { + state: PendingRequestState::routed(destination, request), + // we dont need special handling for list_offsets, so just use Other + ty: PendingRequestTy::Other, + combine_responses: 1, + }); + tracing::debug!( + "Routing ListOffsets request to random broker {} due to being empty", + destination.0 + ); + } else if routing.len() == 1 { + // Only 1 destination, + // so we can just reconstruct the original message as is, + // act like this never happened 😎, + // we dont even need to invalidate the message's cache. + let (destination, topics) = routing.into_iter().next().unwrap(); + let destination = if destination == -1 { + random_broker_id(&self.nodes, &mut self.rng) + } else { + destination + }; + + list_offsets.topics = topics; + self.pending_requests.push_back(PendingRequest { + state: PendingRequestState::routed(destination, request), + // we dont need special handling for list_offsets, so just use Other + ty: PendingRequestTy::Other, + combine_responses: 1, + }); + tracing::debug!( + "Routing ListOffsets request to single broker {}", + destination.0 + ); + } else { + // The message has been split so it may be delivered to multiple destinations. + // We must generate a unique message for each destination. + let combine_responses = routing.len(); + request.invalidate_cache(); + for (i, (destination, topics)) in routing.into_iter().enumerate() { + let destination = if destination == -1 { + random_broker_id(&self.nodes, &mut self.rng) + } else { + destination + }; + let mut request = if i == 0 { + // First message acts as base and retains message id + request.clone() + } else { + request.clone_with_new_id() + }; + if let Some(Frame::Kafka(KafkaFrame::Request { + body: RequestBody::ListOffsets(list_offsets), + .. + })) = request.frame() + { + list_offsets.topics = topics; + } + self.pending_requests.push_back(PendingRequest { + state: PendingRequestState::routed(destination, request), + ty: PendingRequestTy::Other, + combine_responses, + }); + } + tracing::debug!("Routing ListOffsets request to multiple brokers"); + } + } + Ok(()) + } + async fn find_coordinator_of_group( &mut self, group: GroupId, @@ -1523,6 +1661,10 @@ impl KafkaSinkCluster { body: ResponseBody::Fetch(base), .. })) => Self::combine_fetch_responses(base, drain)?, + Some(Frame::Kafka(KafkaFrame::Response { + body: ResponseBody::ListOffsets(base), + .. + })) => Self::combine_list_offsets_responses(base, drain)?, Some(Frame::Kafka(KafkaFrame::Response { body: ResponseBody::Produce(base), .. @@ -1576,6 +1718,46 @@ impl KafkaSinkCluster { Ok(()) } + fn combine_list_offsets_responses( + base_fetch: &mut ListOffsetsResponse, + drain: impl Iterator, + ) -> Result<()> { + for mut next in drain { + if let Some(Frame::Kafka(KafkaFrame::Response { + body: ResponseBody::ListOffsets(next_list_offsets), + .. + })) = next.frame() + { + for next_topic in std::mem::take(&mut next_list_offsets.topics) { + if let Some(base_response) = base_fetch + .topics + .iter_mut() + .find(|topic| topic.name == next_topic.name) + { + for next_partition in &next_topic.partitions { + for base_partition in &base_response.partitions { + if next_partition.partition_index == base_partition.partition_index + { + tracing::warn!("Duplicate partition indexes in combined fetch response, if this ever occurs we should investigate the repercussions") + } + } + } + // A partition can only be contained in one response so there is no risk of duplicating partitions + base_response.partitions.extend(next_topic.partitions) + } else { + base_fetch.topics.push(next_topic); + } + } + } else { + return Err(anyhow!( + "Combining Fetch responses but received another message type" + )); + } + } + + Ok(()) + } + fn combine_produce_responses( base_produce: &mut ProduceResponse, drain: impl Iterator, @@ -1745,6 +1927,25 @@ impl KafkaSinkCluster { } response.invalidate_cache(); } + Some(Frame::Kafka(KafkaFrame::Response { + body: ResponseBody::ListOffsets(list_offsets), + .. + })) => { + for topic in &mut list_offsets.topics { + for partition in &mut topic.partitions { + if let Some(ResponseError::NotLeaderOrFollower) = + ResponseError::try_from_code(partition.error_code) + { + self.topic_by_name.remove(&topic.name); + tracing::info!( + "ListOffets response included error NOT_LEADER_OR_FOLLOWER and so cleared metadata for topic {:?}", + topic.name, + ); + break; + } + } + } + } Some(Frame::Kafka(KafkaFrame::Response { body: ResponseBody::Heartbeat(heartbeat), .. diff --git a/test-helpers/src/connection/kafka/java.rs b/test-helpers/src/connection/kafka/java.rs index 21ae4692b..659a7f38c 100644 --- a/test-helpers/src/connection/kafka/java.rs +++ b/test-helpers/src/connection/kafka/java.rs @@ -1,7 +1,7 @@ use super::{ Acl, AclOperation, AclPermissionType, AlterConfig, ConsumerConfig, ExpectedResponse, - NewPartition, NewTopic, Record, ResourcePatternType, ResourceSpecifier, ResourceType, - TopicDescription, TopicPartition, + ListOffsetsResultInfo, NewPartition, NewTopic, OffsetSpec, Record, ResourcePatternType, + ResourceSpecifier, ResourceType, TopicDescription, TopicPartition, }; use crate::connection::java::{Jvm, Value}; use anyhow::Result; @@ -253,13 +253,7 @@ impl KafkaConsumerJava { .iter() .map(|(tp, offset)| { ( - self.jvm.construct( - "org.apache.kafka.common.TopicPartition", - vec![ - self.jvm.new_string(&tp.topic_name), - self.jvm.new_int(tp.partition), - ], - ), + create_topic_partition(&self.jvm, tp), self.jvm.construct( "org.apache.kafka.clients.consumer.OffsetAndMetadata", vec![self.jvm.new_long(*offset)], @@ -279,13 +273,7 @@ impl KafkaConsumerJava { let mut offsets = HashMap::new(); for tp in partitions { - let topic_partition = self.jvm.construct( - "org.apache.kafka.common.TopicPartition", - vec![ - self.jvm.new_string(&tp.topic_name), - self.jvm.new_int(tp.partition), - ], - ); + let topic_partition = create_topic_partition(&self.jvm, &tp); let timeout = self.jvm.call_static( "java.time.Duration", @@ -471,6 +459,49 @@ impl KafkaAdminJava { .await; } + pub async fn list_offsets( + &self, + topic_partitions: HashMap, + ) -> HashMap { + let offset_spec_class = "org.apache.kafka.clients.admin.OffsetSpec"; + let topic_partitions_java: Vec<_> = topic_partitions + .iter() + .map(|(topic_partition, offset_spec)| { + ( + create_topic_partition(&self.jvm, topic_partition), + match offset_spec { + OffsetSpec::Earliest => { + self.jvm.call_static(offset_spec_class, "earliest", vec![]) + } + OffsetSpec::Latest => { + self.jvm.call_static(offset_spec_class, "latest", vec![]) + } + }, + ) + }) + .collect(); + let topic_partitions_java = self.jvm.new_map(topic_partitions_java); + + let java_results = self + .admin + .call("listOffsets", vec![topic_partitions_java]) + .call_async("all", vec![]) + .await; + + let mut results = HashMap::new(); + for topic_partition in topic_partitions.into_keys() { + let result = java_results + .call( + "get", + vec![create_topic_partition(&self.jvm, &topic_partition)], + ) + .cast("org.apache.kafka.clients.admin.ListOffsetsResult$ListOffsetsResultInfo"); + let offset: i32 = result.call("offset", vec![]).into_rust(); + results.insert(topic_partition, ListOffsetsResultInfo { offset }); + } + results + } + pub async fn create_acls(&self, acls: Vec) { let resource_type = self .jvm @@ -552,3 +583,10 @@ impl KafkaAdminJava { .await; } } + +fn create_topic_partition(jvm: &Jvm, tp: &TopicPartition) -> Value { + jvm.construct( + "org.apache.kafka.common.TopicPartition", + vec![jvm.new_string(&tp.topic_name), jvm.new_int(tp.partition)], + ) +} diff --git a/test-helpers/src/connection/kafka/mod.rs b/test-helpers/src/connection/kafka/mod.rs index aaecac4c8..2aab65aad 100644 --- a/test-helpers/src/connection/kafka/mod.rs +++ b/test-helpers/src/connection/kafka/mod.rs @@ -280,6 +280,17 @@ impl KafkaAdmin { } } + pub async fn list_offsets( + &self, + topic_partitions: HashMap, + ) -> HashMap { + match self { + #[cfg(feature = "kafka-cpp-driver-tests")] + Self::Cpp(_) => panic!("rdkafka-rs driver does not support list_offsets"), + Self::Java(java) => java.list_offsets(topic_partitions).await, + } + } + pub async fn create_partitions(&self, partitions: &[NewPartition<'_>]) { match self { #[cfg(feature = "kafka-cpp-driver-tests")] @@ -313,6 +324,11 @@ impl KafkaAdmin { } } +#[derive(Eq, PartialEq, Debug)] +pub struct ListOffsetsResultInfo { + pub offset: i32, +} + pub struct NewTopic<'a> { pub name: &'a str, pub num_partitions: i32, @@ -324,7 +340,7 @@ pub struct NewPartition<'a> { pub new_partition_count: i32, } -#[derive(Clone, Eq, PartialEq, Hash)] +#[derive(Clone, Eq, PartialEq, Hash, Debug)] pub struct TopicPartition { pub topic_name: String, pub partition: i32, @@ -399,6 +415,11 @@ pub struct TopicDescription { // so this is intentionally left empty for now } +pub enum OffsetSpec { + Earliest, + Latest, +} + #[derive(Default)] pub struct ConsumerConfig { topic_name: String,