split AddPartitionsToTxn
rukai committed Oct 10, 2024
1 parent 91d8687 commit c2db86d
Showing 1 changed file with 219 additions and 60 deletions.
279 changes: 219 additions & 60 deletions shotover/src/transforms/kafka/sink_cluster/mod.rs
@@ -14,6 +14,9 @@ use connections::{Connections, Destination};
use dashmap::DashMap;
use kafka_node::{ConnectionFactory, KafkaAddress, KafkaNode, KafkaNodeState};
use kafka_protocol::indexmap::IndexMap;
use kafka_protocol::messages::add_partitions_to_txn_request::{
AddPartitionsToTxnTopic, AddPartitionsToTxnTransaction,
};
use kafka_protocol::messages::fetch_request::FetchTopic;
use kafka_protocol::messages::fetch_response::LeaderIdAndEpoch as FetchResponseLeaderIdAndEpoch;
use kafka_protocol::messages::list_offsets_request::ListOffsetsTopic;
@@ -22,11 +25,12 @@ use kafka_protocol::messages::metadata_response::MetadataResponseBroker;
use kafka_protocol::messages::produce_request::TopicProduceData;
use kafka_protocol::messages::produce_response::LeaderIdAndEpoch as ProduceResponseLeaderIdAndEpoch;
use kafka_protocol::messages::{
ApiKey, BrokerId, EndTxnRequest, FetchRequest, FetchResponse, FindCoordinatorRequest,
FindCoordinatorResponse, GroupId, HeartbeatRequest, InitProducerIdRequest, JoinGroupRequest,
LeaveGroupRequest, ListOffsetsRequest, ListOffsetsResponse, MetadataRequest, MetadataResponse,
ProduceRequest, ProduceResponse, RequestHeader, SaslAuthenticateRequest,
SaslAuthenticateResponse, SaslHandshakeRequest, SyncGroupRequest, TopicName, TransactionalId,
AddPartitionsToTxnRequest, AddPartitionsToTxnResponse, ApiKey, BrokerId, EndTxnRequest,
FetchRequest, FetchResponse, FindCoordinatorRequest, FindCoordinatorResponse, GroupId,
HeartbeatRequest, InitProducerIdRequest, JoinGroupRequest, LeaveGroupRequest,
ListOffsetsRequest, ListOffsetsResponse, MetadataRequest, MetadataResponse, ProduceRequest,
ProduceResponse, RequestHeader, SaslAuthenticateRequest, SaslAuthenticateResponse,
SaslHandshakeRequest, SyncGroupRequest, TopicName, TransactionalId,
};
use kafka_protocol::protocol::StrBytes;
use kafka_protocol::ResponseError;
@@ -676,36 +680,29 @@ impl KafkaSinkCluster {
}
Some(Frame::Kafka(KafkaFrame::Request {
body:
// TODO: only keep the ones we actually need to route
// RequestBody::TxnOffsetCommit(TxnOffsetCommitRequest {
// transactional_id, ..
// })|
RequestBody::InitProducerId(InitProducerIdRequest {
transactional_id: Some(transactional_id), ..
transactional_id: Some(transactional_id),
..
})
| RequestBody::EndTxn(EndTxnRequest {
transactional_id, ..
}),
// | RequestBody::AddOffsetsToTxn(AddOffsetsToTxnRequest {
// transactional_id, ..
// }),
..
})) => {
self.store_transaction(&mut transactions, transactional_id.clone());
}
Some(Frame::Kafka(KafkaFrame::Request {
body:
RequestBody::AddPartitionsToTxn(add_partitions_to_txn_request)
,
body: RequestBody::AddPartitionsToTxn(add_partitions_to_txn_request),
header,
})) => {
if header.request_api_version <= 3 {
self.store_transaction(
&mut transactions,
add_partitions_to_txn_request.v3_and_below_transactional_id.clone()
add_partitions_to_txn_request
.v3_and_below_transactional_id
.clone(),
);
}
else {
} else {
for transaction in add_partitions_to_txn_request.transactions.keys() {
self.store_transaction(&mut transactions, transaction.clone());
}
@@ -838,41 +835,6 @@ impl KafkaSinkCluster {
let group_id = heartbeat.group_id.clone();
self.route_to_group_coordinator(message, group_id);
}
Some(Frame::Kafka(KafkaFrame::Request {
body: RequestBody::AddPartitionsToTxn(add_partitions_to_txn),
header,
})) => {
if header.request_api_version <= 3 {
let transaction_id =
add_partitions_to_txn.v3_and_below_transactional_id.clone();
self.route_to_transaction_coordinator(message, transaction_id);
} else {
// TODO: split request
#[allow(clippy::never_loop)]
for transaction_id in add_partitions_to_txn.transactions.keys() {
let transaction_id = transaction_id.clone();
self.route_to_transaction_coordinator(message, transaction_id);
break;
}
}
}
Some(Frame::Kafka(KafkaFrame::Request {
body: RequestBody::EndTxn(end_txn),
..
})) => {
let transaction_id = end_txn.transactional_id.clone();
self.route_to_transaction_coordinator(message, transaction_id);
}
Some(Frame::Kafka(KafkaFrame::Request {
body: RequestBody::InitProducerId(init_producer_id),
..
})) => {
if let Some(transaction_id) = init_producer_id.transactional_id.clone() {
self.route_to_transaction_coordinator(message, transaction_id);
} else {
self.route_to_random_broker(message);
}
}
Some(Frame::Kafka(KafkaFrame::Request {
body: RequestBody::SyncGroup(sync_group),
..
@@ -926,6 +888,30 @@ impl KafkaSinkCluster {
let group_id = groups.groups_names.first().unwrap().clone();
self.route_to_group_coordinator(message, group_id);
}

// route to transaction coordinator
Some(Frame::Kafka(KafkaFrame::Request {
body: RequestBody::EndTxn(end_txn),
..
})) => {
let transaction_id = end_txn.transactional_id.clone();
self.route_to_transaction_coordinator(message, transaction_id);
}
Some(Frame::Kafka(KafkaFrame::Request {
body: RequestBody::InitProducerId(init_producer_id),
..
})) => {
if let Some(transaction_id) = init_producer_id.transactional_id.clone() {
self.route_to_transaction_coordinator(message, transaction_id);
} else {
self.route_to_random_broker(message);
}
}
Some(Frame::Kafka(KafkaFrame::Request {
body: RequestBody::AddPartitionsToTxn(_),
..
})) => self.route_add_partitions_to_txn(message)?,

Some(Frame::Kafka(KafkaFrame::Request {
body: RequestBody::FindCoordinator(_),
..
@@ -1393,6 +1379,122 @@ impl KafkaSinkCluster {
Ok(())
}

/// This method removes all transactions from the AddPartitionsToTxn request and returns them split up by their destination.
/// If any transactions are unroutable they will have their BrokerId set to -1.
fn split_add_partition_to_txn_request_by_destination(
&mut self,
body: &mut AddPartitionsToTxnRequest,
) -> HashMap<BrokerId, IndexMap<TransactionalId, AddPartitionsToTxnTransaction>> {
let mut result: HashMap<BrokerId, IndexMap<_, _>> = Default::default();

for (transaction_id, transaction) in body.transactions.drain(..) {
let destination = if let Some(destination) =
self.transaction_to_coordinator_broker.get(&transaction_id)
{
tracing::debug!(
"Routing AddPartitionsToTxn request portion of transaction id {transaction_id:?} to broker {}",
destination.0
);
*destination
} else {
tracing::warn!("no known transaction for {transaction_id:?}, routing request to a random broker so that a NOT_COORDINATOR or similar error is returned to the client");
BrokerId(-1)
};
let dest_transactions = result.entry(destination).or_default();
dest_transactions.insert(transaction_id, transaction);
}

result
}
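
For illustration, here is a rough, self-contained sketch of the grouping performed above, using plain standard-library stand-ins rather than the kafka_protocol types (BrokerId, group_by_coordinator and the String transaction ids below are hypothetical simplifications):

use std::collections::HashMap;

// Stand-in for kafka_protocol's BrokerId newtype (hypothetical simplification).
#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
struct BrokerId(i32);

/// Bucket transaction ids by their coordinator broker; unknown transactions map to BrokerId(-1).
fn group_by_coordinator(
    transactions: Vec<String>,
    coordinators: &HashMap<String, BrokerId>,
) -> HashMap<BrokerId, Vec<String>> {
    let mut result: HashMap<BrokerId, Vec<String>> = HashMap::new();
    for transaction_id in transactions {
        let destination = coordinators
            .get(&transaction_id)
            .copied()
            .unwrap_or(BrokerId(-1));
        result.entry(destination).or_default().push(transaction_id);
    }
    result
}

Transactions with a cached coordinator land in that broker's bucket; the rest share the BrokerId(-1) bucket, which the routing code below later swaps for a random broker.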

fn route_add_partitions_to_txn(&mut self, mut request: Message) -> Result<()> {
if let Some(Frame::Kafka(KafkaFrame::Request {
body: RequestBody::AddPartitionsToTxn(body),
header,
..
})) = request.frame()
{
if header.request_api_version <= 3 {
let transaction_id = body.v3_and_below_transactional_id.clone();
self.route_to_transaction_coordinator(request, transaction_id);
} else {
let routing = self.split_add_partition_to_txn_request_by_destination(body);

if routing.is_empty() {
// AddPartitionsToTxn contains no transactions, so we can just pick a random destination.
// The message is unchanged so we can just send as is.
let destination = random_broker_id(&self.nodes, &mut self.rng);

self.pending_requests.push_back(PendingRequest {
state: PendingRequestState::routed(destination, request),
// we don't need special handling for AddPartitionsToTxn, so just use Other
ty: PendingRequestTy::Other,
combine_responses: 1,
});
tracing::debug!(
"Routing AddPartitionsToTxn request to random broker {} due to being empty",
destination.0
);
} else if routing.len() == 1 {
// Only 1 destination,
// so we can just reconstruct the original message as is,
// act like this never happened 😎,
// we don't even need to invalidate the message's cache.
let (destination, transactions) = routing.into_iter().next().unwrap();
let destination = if destination == -1 {
random_broker_id(&self.nodes, &mut self.rng)
} else {
destination
};

body.transactions = transactions;
self.pending_requests.push_back(PendingRequest {
state: PendingRequestState::routed(destination, request),
// we don't need special handling for AddPartitionsToTxn, so just use Other
ty: PendingRequestTy::Other,
combine_responses: 1,
});
tracing::debug!(
"Routing AddPartitionsToTxn request to single broker {}",
destination.0
);
} else {
// The message has been split so it may be delivered to multiple destinations.
// We must generate a unique message for each destination.
let combine_responses = routing.len();
request.invalidate_cache();
for (i, (destination, transactions)) in routing.into_iter().enumerate() {
let destination = if destination == -1 {
random_broker_id(&self.nodes, &mut self.rng)
} else {
destination
};
let mut request = if i == 0 {
// First message acts as base and retains message id
request.clone()
} else {
request.clone_with_new_id()
};
if let Some(Frame::Kafka(KafkaFrame::Request {
body: RequestBody::AddPartitionsToTxn(body),
..
})) = request.frame()
{
body.transactions = transactions;
}
self.pending_requests.push_back(PendingRequest {
state: PendingRequestState::routed(destination, request),
ty: PendingRequestTy::Other,
combine_responses,
});
}
tracing::debug!("Routing AddPartitionsToTxn request to multiple brokers");
}
}
}
Ok(())
}
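
As a rough sketch of the bookkeeping the multi-destination branch relies on (PendingRequest and split_into_pending_requests below are simplified, hypothetical stand-ins, not shotover's actual types): every split of one logical request shares the same combine_responses count, so the response path knows how many broker replies to fold back into a single client response.

// Simplified stand-ins for shotover's pending-request bookkeeping (hypothetical types).
struct PendingRequest {
    destination: i32,
    transactions: Vec<String>,
    // How many sibling responses must later be merged back into a single response.
    combine_responses: usize,
}

// Turn one logical request, already grouped per destination, into one pending request per broker.
fn split_into_pending_requests(routing: Vec<(i32, Vec<String>)>) -> Vec<PendingRequest> {
    let combine_responses = routing.len();
    routing
        .into_iter()
        .map(|(destination, transactions)| PendingRequest {
            destination,
            transactions,
            combine_responses,
        })
        .collect()
}

With three coordinators involved, this yields three pending requests, each carrying combine_responses == 3.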

async fn find_coordinator(
&mut self,
key: CoordinatorKey,
@@ -1829,6 +1931,14 @@ impl KafkaSinkCluster {
body: ResponseBody::Produce(base),
..
})) => Self::combine_produce_responses(base, drain)?,
Some(Frame::Kafka(KafkaFrame::Response {
body: ResponseBody::AddPartitionsToTxn(base),
version,
..
})) => {
debug_assert!(*version > 3);
Self::combine_add_partitions_to_txn(base, drain)?
}
_ => {
return Err(anyhow!(
"Combining of this message type is currently unsupported"
@@ -1912,7 +2022,7 @@ impl KafkaSinkCluster {
}
} else {
return Err(anyhow!(
"Combining Fetch responses but received another message type"
"Combining ListOffests responses but received another message type"
));
}
}
@@ -1957,6 +2067,31 @@ impl KafkaSinkCluster {
Ok(())
}

fn combine_add_partitions_to_txn(
base_add_partitions_to_txn: &mut AddPartitionsToTxnResponse,
drain: impl Iterator<Item = Message>,
) -> Result<()> {
for mut next in drain {
if let Some(Frame::Kafka(KafkaFrame::Response {
body: ResponseBody::AddPartitionsToTxn(next_add_partitions_to_txn),
..
})) = next.frame()
{
base_add_partitions_to_txn
.results_by_transaction
.extend(std::mem::take(
&mut next_add_partitions_to_txn.results_by_transaction,
));
} else {
return Err(anyhow!(
"Combining AddPartitionsToTxn responses but received another message type"
));
}
}

Ok(())
}
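
The merge itself can be pictured with plain maps (stand-ins, not the kafka_protocol response types): each follow-up response contributes its per-transaction results to the base response, which is essentially what the extend call above does.

use std::collections::BTreeMap;

// Plain-map stand-in for the per-transaction results of an AddPartitionsToTxn response.
type ResultsByTransaction = BTreeMap<String, i16>;

/// Fold the results of follow-up responses into the base response, mirroring the extend above.
fn combine_results(base: &mut ResultsByTransaction, others: Vec<ResultsByTransaction>) {
    for next in others {
        // Transaction ids are disjoint across splits, so extend simply unions the maps.
        base.extend(next);
    }
}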

async fn process_response(
&mut self,
response: &mut Message,
Expand Down Expand Up @@ -2154,14 +2289,38 @@ impl KafkaSinkCluster {
self.handle_transaction_coordinator_routing_error(
&request_ty,
partition_result.partition_error_code,
)
);
}
}
} else {
self.handle_transaction_coordinator_routing_error(
&request_ty,
response.error_code,
)
'outer_loop: for (transaction_id, transaction) in
&response.results_by_transaction
{
for topic_results in transaction.topic_results.values() {
for partition_result in topic_results.results_by_partition.values() {
self.handle_transaction_coordinator_routing_error(
&request_ty,
partition_result.partition_error_code,
);
if let Some(ResponseError::NotCoordinator) =
ResponseError::try_from_code(
partition_result.partition_error_code,
)
{
let broker_id = self
.transaction_to_coordinator_broker
.remove(transaction_id)
.map(|x| x.1);
tracing::info!(
"Response was error NOT_COORDINATOR and so cleared transaction id {:?} coordinator mapping to broker {:?}",
transaction_id,
broker_id,
);
continue 'outer_loop;
}
}
}
}
}
}
Some(Frame::Kafka(KafkaFrame::Response {