diff --git a/shotover/src/transforms/kafka/sink_cluster/mod.rs b/shotover/src/transforms/kafka/sink_cluster/mod.rs
index 9a7fbb87f..016567fa1 100644
--- a/shotover/src/transforms/kafka/sink_cluster/mod.rs
+++ b/shotover/src/transforms/kafka/sink_cluster/mod.rs
@@ -14,6 +14,7 @@ use connections::{Connections, Destination};
 use dashmap::DashMap;
 use kafka_node::{ConnectionFactory, KafkaAddress, KafkaNode, KafkaNodeState};
 use kafka_protocol::indexmap::IndexMap;
+use kafka_protocol::messages::add_partitions_to_txn_request::AddPartitionsToTxnTransaction;
 use kafka_protocol::messages::fetch_request::FetchTopic;
 use kafka_protocol::messages::fetch_response::LeaderIdAndEpoch as FetchResponseLeaderIdAndEpoch;
 use kafka_protocol::messages::list_offsets_request::ListOffsetsTopic;
@@ -22,11 +23,12 @@ use kafka_protocol::messages::metadata_response::MetadataResponseBroker;
 use kafka_protocol::messages::produce_request::TopicProduceData;
 use kafka_protocol::messages::produce_response::LeaderIdAndEpoch as ProduceResponseLeaderIdAndEpoch;
 use kafka_protocol::messages::{
-    ApiKey, BrokerId, EndTxnRequest, FetchRequest, FetchResponse, FindCoordinatorRequest,
-    FindCoordinatorResponse, GroupId, HeartbeatRequest, InitProducerIdRequest, JoinGroupRequest,
-    LeaveGroupRequest, ListOffsetsRequest, ListOffsetsResponse, MetadataRequest, MetadataResponse,
-    ProduceRequest, ProduceResponse, RequestHeader, SaslAuthenticateRequest,
-    SaslAuthenticateResponse, SaslHandshakeRequest, SyncGroupRequest, TopicName, TransactionalId,
+    AddPartitionsToTxnRequest, AddPartitionsToTxnResponse, ApiKey, BrokerId, EndTxnRequest,
+    FetchRequest, FetchResponse, FindCoordinatorRequest, FindCoordinatorResponse, GroupId,
+    HeartbeatRequest, InitProducerIdRequest, JoinGroupRequest, LeaveGroupRequest,
+    ListOffsetsRequest, ListOffsetsResponse, MetadataRequest, MetadataResponse, ProduceRequest,
+    ProduceResponse, RequestHeader, SaslAuthenticateRequest, SaslAuthenticateResponse,
+    SaslHandshakeRequest, SyncGroupRequest, TopicName, TransactionalId,
 };
 use kafka_protocol::protocol::StrBytes;
 use kafka_protocol::ResponseError;
@@ -676,36 +678,29 @@ impl KafkaSinkCluster {
                 }
                 Some(Frame::Kafka(KafkaFrame::Request {
                     body:
-                    // TODO: only keep the ones we actually to route for
-                    // RequestBody::TxnOffsetCommit(TxnOffsetCommitRequest {
-                    //     transactional_id, ..
-                    // })|
                         RequestBody::InitProducerId(InitProducerIdRequest {
-                            transactional_id: Some(transactional_id), ..
+                            transactional_id: Some(transactional_id),
+                            ..
                         })
                         | RequestBody::EndTxn(EndTxnRequest {
                             transactional_id, ..
                         }),
-                    // | RequestBody::AddOffsetsToTxn(AddOffsetsToTxnRequest {
-                    //     transactional_id, ..
-                    // }),
                     ..
                 })) => {
                     self.store_transaction(&mut transactions, transactional_id.clone());
                 }
                 Some(Frame::Kafka(KafkaFrame::Request {
-                    body:
-                        RequestBody::AddPartitionsToTxn(add_partitions_to_txn_request)
-                    ,
+                    body: RequestBody::AddPartitionsToTxn(add_partitions_to_txn_request),
                     header,
                 })) => {
                     if header.request_api_version <= 3 {
                         self.store_transaction(
                             &mut transactions,
-                            add_partitions_to_txn_request.v3_and_below_transactional_id.clone()
+                            add_partitions_to_txn_request
+                                .v3_and_below_transactional_id
+                                .clone(),
                         );
-                    }
-                    else {
+                    } else {
                         for transaction in add_partitions_to_txn_request.transactions.keys() {
                             self.store_transaction(&mut transactions, transaction.clone());
                         }
@@ -838,41 +833,6 @@ impl KafkaSinkCluster {
                     let group_id = heartbeat.group_id.clone();
                     self.route_to_group_coordinator(message, group_id);
                 }
-                Some(Frame::Kafka(KafkaFrame::Request {
-                    body: RequestBody::AddPartitionsToTxn(add_partitions_to_txn),
-                    header,
-                })) => {
-                    if header.request_api_version <= 3 {
-                        let transaction_id =
-                            add_partitions_to_txn.v3_and_below_transactional_id.clone();
-                        self.route_to_transaction_coordinator(message, transaction_id);
-                    } else {
-                        // TODO: split request
-                        #[allow(clippy::never_loop)]
-                        for transaction_id in add_partitions_to_txn.transactions.keys() {
-                            let transaction_id = transaction_id.clone();
-                            self.route_to_transaction_coordinator(message, transaction_id);
-                            break;
-                        }
-                    }
-                }
-                Some(Frame::Kafka(KafkaFrame::Request {
-                    body: RequestBody::EndTxn(end_txn),
-                    ..
-                })) => {
-                    let transaction_id = end_txn.transactional_id.clone();
-                    self.route_to_transaction_coordinator(message, transaction_id);
-                }
-                Some(Frame::Kafka(KafkaFrame::Request {
-                    body: RequestBody::InitProducerId(init_producer_id),
-                    ..
-                })) => {
-                    if let Some(transaction_id) = init_producer_id.transactional_id.clone() {
-                        self.route_to_transaction_coordinator(message, transaction_id);
-                    } else {
-                        self.route_to_random_broker(message);
-                    }
-                }
                 Some(Frame::Kafka(KafkaFrame::Request {
                     body: RequestBody::SyncGroup(sync_group),
                     ..
@@ -926,6 +886,30 @@ impl KafkaSinkCluster {
                     let group_id = groups.groups_names.first().unwrap().clone();
                     self.route_to_group_coordinator(message, group_id);
                 }
+
+                // route to transaction coordinator
+                Some(Frame::Kafka(KafkaFrame::Request {
+                    body: RequestBody::EndTxn(end_txn),
+                    ..
+                })) => {
+                    let transaction_id = end_txn.transactional_id.clone();
+                    self.route_to_transaction_coordinator(message, transaction_id);
+                }
+                Some(Frame::Kafka(KafkaFrame::Request {
+                    body: RequestBody::InitProducerId(init_producer_id),
+                    ..
+                })) => {
+                    if let Some(transaction_id) = init_producer_id.transactional_id.clone() {
+                        self.route_to_transaction_coordinator(message, transaction_id);
+                    } else {
+                        self.route_to_random_broker(message);
+                    }
+                }
+                Some(Frame::Kafka(KafkaFrame::Request {
+                    body: RequestBody::AddPartitionsToTxn(_),
+                    ..
+                })) => self.route_add_partitions_to_txn(message)?,
+
                 Some(Frame::Kafka(KafkaFrame::Request {
                     body: RequestBody::FindCoordinator(_),
                     ..
@@ -1393,6 +1377,122 @@ impl KafkaSinkCluster {
         Ok(())
     }
 
+    /// This method removes all transactions from the AddPartitionsToTxn request and returns them split up by their destination
+    /// If any transactions are unroutable they will have their BrokerId set to -1
+    fn split_add_partition_to_txn_request_by_destination(
+        &mut self,
+        body: &mut AddPartitionsToTxnRequest,
+    ) -> HashMap<BrokerId, IndexMap<TransactionalId, AddPartitionsToTxnTransaction>> {
+        let mut result: HashMap<BrokerId, IndexMap<_, _>> = Default::default();
+
+        for (transaction_id, transaction) in body.transactions.drain(..) {
+            let destination = if let Some(destination) =
+                self.transaction_to_coordinator_broker.get(&transaction_id)
+            {
+                tracing::debug!(
+                    "Routing AddPartitionsToTxn request portion of transaction id {transaction_id:?} to broker {}",
+                    destination.0
+                );
+                *destination
+            } else {
+                tracing::warn!("no known transaction for {transaction_id:?}, routing request to a random broker so that a NOT_COORDINATOR or similar error is returned to the client");
+                BrokerId(-1)
+            };
+            let dest_transactions = result.entry(destination).or_default();
+            dest_transactions.insert(transaction_id, transaction);
+        }
+
+        result
+    }
+
+    fn route_add_partitions_to_txn(&mut self, mut request: Message) -> Result<()> {
+        if let Some(Frame::Kafka(KafkaFrame::Request {
+            body: RequestBody::AddPartitionsToTxn(body),
+            header,
+            ..
+        })) = request.frame()
+        {
+            if header.request_api_version <= 3 {
+                let transaction_id = body.v3_and_below_transactional_id.clone();
+                self.route_to_transaction_coordinator(request, transaction_id);
+            } else {
+                let routing = self.split_add_partition_to_txn_request_by_destination(body);
+
+                if routing.is_empty() {
+                    // AddPartitionsToTxn contains no transactions, so we can just pick a random destination.
+                    // The message is unchanged so we can just send as is.
+                    let destination = random_broker_id(&self.nodes, &mut self.rng);
+
+                    self.pending_requests.push_back(PendingRequest {
+                        state: PendingRequestState::routed(destination, request),
+                        // we don't need special handling for AddPartitionsToTxn, so just use Other
+                        ty: PendingRequestTy::Other,
+                        combine_responses: 1,
+                    });
+                    tracing::debug!(
+                        "Routing AddPartitionsToTxn request to random broker {} due to being empty",
+                        destination.0
+                    );
+                } else if routing.len() == 1 {
+                    // Only 1 destination,
+                    // so we can just reconstruct the original message as is,
+                    // act like this never happened 😎,
+                    // we don't even need to invalidate the message's cache.
+                    let (destination, transactions) = routing.into_iter().next().unwrap();
+                    let destination = if destination == -1 {
+                        random_broker_id(&self.nodes, &mut self.rng)
+                    } else {
+                        destination
+                    };
+
+                    body.transactions = transactions;
+                    self.pending_requests.push_back(PendingRequest {
+                        state: PendingRequestState::routed(destination, request),
+                        // we don't need special handling for AddPartitionsToTxn, so just use Other
+                        ty: PendingRequestTy::Other,
+                        combine_responses: 1,
+                    });
+                    tracing::debug!(
+                        "Routing AddPartitionsToTxn request to single broker {}",
+                        destination.0
+                    );
+                } else {
+                    // The message has been split so it may be delivered to multiple destinations.
+                    // We must generate a unique message for each destination.
+                    let combine_responses = routing.len();
+                    request.invalidate_cache();
+                    for (i, (destination, transactions)) in routing.into_iter().enumerate() {
+                        let destination = if destination == -1 {
+                            random_broker_id(&self.nodes, &mut self.rng)
+                        } else {
+                            destination
+                        };
+                        let mut request = if i == 0 {
+                            // First message acts as base and retains message id
+                            request.clone()
+                        } else {
+                            request.clone_with_new_id()
+                        };
+                        if let Some(Frame::Kafka(KafkaFrame::Request {
+                            body: RequestBody::AddPartitionsToTxn(body),
+                            ..
+                        })) = request.frame()
+                        {
+                            body.transactions = transactions;
+                        }
+                        self.pending_requests.push_back(PendingRequest {
+                            state: PendingRequestState::routed(destination, request),
+                            ty: PendingRequestTy::Other,
+                            combine_responses,
+                        });
+                    }
+                    tracing::debug!("Routing AddPartitionsToTxn request to multiple brokers");
+                }
+            }
+        }
+        Ok(())
+    }
+
     async fn find_coordinator(
         &mut self,
         key: CoordinatorKey,
@@ -1829,6 +1929,14 @@
                 body: ResponseBody::Produce(base),
                 ..
             })) => Self::combine_produce_responses(base, drain)?,
+            Some(Frame::Kafka(KafkaFrame::Response {
+                body: ResponseBody::AddPartitionsToTxn(base),
+                version,
+                ..
+            })) => {
+                debug_assert!(*version > 3);
+                Self::combine_add_partitions_to_txn(base, drain)?
+            }
             _ => {
                 return Err(anyhow!(
                     "Combining of this message type is currently unsupported"
@@ -1912,7 +2020,7 @@
                 }
             } else {
                 return Err(anyhow!(
-                    "Combining Fetch responses but received another message type"
+                    "Combining ListOffsets responses but received another message type"
                 ));
             }
         }
@@ -1957,6 +2065,31 @@
         Ok(())
     }
 
+    fn combine_add_partitions_to_txn(
+        base_add_partitions_to_txn: &mut AddPartitionsToTxnResponse,
+        drain: impl Iterator<Item = Message>,
+    ) -> Result<()> {
+        for mut next in drain {
+            if let Some(Frame::Kafka(KafkaFrame::Response {
+                body: ResponseBody::AddPartitionsToTxn(next_add_partitions_to_txn),
+                ..
+            })) = next.frame()
+            {
+                base_add_partitions_to_txn
+                    .results_by_transaction
+                    .extend(std::mem::take(
+                        &mut next_add_partitions_to_txn.results_by_transaction,
+                    ));
+            } else {
+                return Err(anyhow!(
+                    "Combining AddPartitionsToTxn responses but received another message type"
+                ));
+            }
+        }
+
+        Ok(())
+    }
+
     async fn process_response(
         &mut self,
         response: &mut Message,
@@ -2154,14 +2287,34 @@
                             self.handle_transaction_coordinator_routing_error(
                                 &request_ty,
                                 partition_result.partition_error_code,
-                            )
+                            );
                         }
                     }
                 } else {
-                    self.handle_transaction_coordinator_routing_error(
-                        &request_ty,
-                        response.error_code,
-                    )
+                    'outer_loop: for (transaction_id, transaction) in
+                        &response.results_by_transaction
+                    {
+                        for topic_results in transaction.topic_results.values() {
+                            for partition_result in topic_results.results_by_partition.values() {
+                                if let Some(ResponseError::NotCoordinator) =
+                                    ResponseError::try_from_code(
+                                        partition_result.partition_error_code,
+                                    )
+                                {
+                                    let broker_id = self
+                                        .transaction_to_coordinator_broker
+                                        .remove(transaction_id)
+                                        .map(|x| x.1);
+                                    tracing::info!(
+                                        "Response was error NOT_COORDINATOR and so cleared transaction id {:?} coordinator mapping to broker {:?}",
+                                        transaction_id,
+                                        broker_id,
+                                    );
+                                    continue 'outer_loop;
+                                }
+                            }
+                        }
+                    }
                 }
             }
             Some(Frame::Kafka(KafkaFrame::Response {
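
Note (not part of the patch above): the change both splits a v4+ AddPartitionsToTxn request per transaction coordinator and later merges the per-broker responses back into one message keyed by transaction id. The following standalone sketch illustrates that split/recombine pattern with plain std collections only; BrokerId, TransactionId, Partitions, and the error-code maps are simplified stand-ins, not the kafka_protocol or shotover types used in the real code.

use std::collections::HashMap;

type BrokerId = i32;
type TransactionId = String;
type Partitions = Vec<i32>;

/// Split one request's transactions by the broker coordinating each transaction.
/// Unknown transactions map to -1 so the caller can substitute a random broker.
fn split_by_destination(
    coordinators: &HashMap<TransactionId, BrokerId>,
    transactions: HashMap<TransactionId, Partitions>,
) -> HashMap<BrokerId, HashMap<TransactionId, Partitions>> {
    let mut result: HashMap<BrokerId, HashMap<TransactionId, Partitions>> = HashMap::new();
    for (transaction_id, partitions) in transactions {
        let destination = coordinators.get(&transaction_id).copied().unwrap_or(-1);
        result
            .entry(destination)
            .or_default()
            .insert(transaction_id, partitions);
    }
    result
}

/// Recombine per-broker responses into a single response keyed by transaction id,
/// analogous to extending results_by_transaction when combining split responses.
fn combine_responses(
    responses: impl IntoIterator<Item = HashMap<TransactionId, i16>>,
) -> HashMap<TransactionId, i16> {
    let mut base = HashMap::new();
    for response in responses {
        base.extend(response);
    }
    base
}

fn main() {
    // Two known coordinators; "tx-unknown" has no mapping and lands in the -1 bucket.
    let coordinators = HashMap::from([("tx-a".to_string(), 0), ("tx-b".to_string(), 1)]);
    let request = HashMap::from([
        ("tx-a".to_string(), vec![0, 1]),
        ("tx-b".to_string(), vec![2]),
        ("tx-unknown".to_string(), vec![3]),
    ]);

    let routed = split_by_destination(&coordinators, request);
    assert_eq!(routed.len(), 3); // brokers 0, 1 and the -1 "unroutable" bucket

    // Pretend each broker answered with error code 0 (success) for its transactions.
    let per_broker_responses = routed
        .into_values()
        .map(|txns| txns.into_keys().map(|id| (id, 0i16)).collect::<HashMap<_, _>>());
    let combined = combine_responses(per_broker_responses);
    assert_eq!(combined.len(), 3);
}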