route OffsetForLeaderEpoch
rukai committed Oct 18, 2024
1 parent 1e4b1a7 commit bb266ea
Showing 2 changed files with 170 additions and 8 deletions.
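
For context on the change itself: OffsetForLeaderEpoch is the request a Kafka client sends after a leader change to ask each partition's leader for the end offset of a given leader epoch, so it can detect log truncation. Since only the leader can answer, shotover has to split one incoming request into per-broker pieces, the same pattern it already applies to Produce, Fetch and ListOffsets. A rough sketch of the request shape, using simplified stand-in structs rather than the real kafka_protocol types:

```rust
// Simplified stand-ins for the kafka_protocol types; field names mirror
// the ones this diff touches (topics, topic, partitions, partition).
#[derive(Debug, Clone)]
struct OffsetForLeaderPartition {
    partition: i32,    // partition index within the topic
    leader_epoch: i32, // epoch the client wants the end offset for
}

#[derive(Debug, Clone)]
struct OffsetForLeaderTopic {
    topic: String, // TopicName in the real crate
    partitions: Vec<OffsetForLeaderPartition>,
}

#[derive(Debug, Clone)]
struct OffsetForLeaderEpochRequest {
    topics: Vec<OffsetForLeaderTopic>,
}

fn main() {
    // A single request can span partitions led by different brokers,
    // which is why this commit has to split it up before routing.
    let request = OffsetForLeaderEpochRequest {
        topics: vec![OffsetForLeaderTopic {
            topic: "events".into(),
            partitions: vec![
                OffsetForLeaderPartition { partition: 0, leader_epoch: 5 },
                OffsetForLeaderPartition { partition: 1, leader_epoch: 5 },
            ],
        }],
    };
    println!("{request:?}");
}
```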
145 changes: 139 additions & 6 deletions shotover/src/transforms/kafka/sink_cluster/mod.rs
@@ -19,6 +19,7 @@ use kafka_protocol::messages::fetch_response::LeaderIdAndEpoch as FetchResponseL
use kafka_protocol::messages::list_offsets_request::ListOffsetsTopic;
use kafka_protocol::messages::metadata_request::MetadataRequestTopic;
use kafka_protocol::messages::metadata_response::MetadataResponseBroker;
use kafka_protocol::messages::offset_for_leader_epoch_request::OffsetForLeaderTopic;
use kafka_protocol::messages::produce_request::TopicProduceData;
use kafka_protocol::messages::produce_response::{
LeaderIdAndEpoch as ProduceResponseLeaderIdAndEpoch, TopicProduceResponse,
@@ -28,9 +29,9 @@ use kafka_protocol::messages::{
BrokerId, EndTxnRequest, FetchRequest, FetchResponse, FindCoordinatorRequest,
FindCoordinatorResponse, GroupId, HeartbeatRequest, InitProducerIdRequest, JoinGroupRequest,
LeaveGroupRequest, ListOffsetsRequest, ListOffsetsResponse, MetadataRequest, MetadataResponse,
ProduceRequest, ProduceResponse, RequestHeader, SaslAuthenticateRequest,
SaslAuthenticateResponse, SaslHandshakeRequest, SyncGroupRequest, TopicName, TransactionalId,
TxnOffsetCommitRequest,
OffsetForLeaderEpochRequest, OffsetForLeaderEpochResponse, ProduceRequest, ProduceResponse,
RequestHeader, SaslAuthenticateRequest, SaslAuthenticateResponse, SaslHandshakeRequest,
SyncGroupRequest, TopicName, TransactionalId, TxnOffsetCommitRequest,
};
use kafka_protocol::protocol::StrBytes;
use kafka_protocol::ResponseError;
@@ -46,7 +47,7 @@ use serde::{Deserialize, Serialize};
use shotover_node::{ShotoverNode, ShotoverNodeConfig};
use split::{
AddPartitionsToTxnRequestSplitAndRouter, ListOffsetsRequestSplitAndRouter,
ProduceRequestSplitAndRouter, RequestSplitAndRouter,
OffsetForLeaderEpochRequestSplitAndRouter, ProduceRequestSplitAndRouter, RequestSplitAndRouter,
};
use std::collections::{HashMap, HashSet, VecDeque};
use std::hash::Hasher;
@@ -675,6 +676,14 @@ impl KafkaSinkCluster {
self.store_topic_names(&mut topic_names, topic.name.clone());
}
}
Some(Frame::Kafka(KafkaFrame::Request {
body: RequestBody::OffsetForLeaderEpoch(body),
..
})) => {
for topic in &body.topics {
self.store_topic_names(&mut topic_names, topic.topic.clone());
}
}
Some(Frame::Kafka(KafkaFrame::Request {
body: RequestBody::Fetch(fetch),
..
@@ -852,6 +861,12 @@ impl KafkaSinkCluster {
body: RequestBody::ListOffsets(_),
..
})) => self.split_and_route_request::<ListOffsetsRequestSplitAndRouter>(message)?,
Some(Frame::Kafka(KafkaFrame::Request {
body: RequestBody::OffsetForLeaderEpoch(_),
..
})) => self.split_and_route_request::<OffsetForLeaderEpochRequestSplitAndRouter>(
message,
)?,

// route to group coordinator
Some(Frame::Kafka(KafkaFrame::Request {
@@ -980,7 +995,10 @@ impl KafkaSinkCluster {

// error handling
Some(Frame::Kafka(KafkaFrame::Request { header, .. })) => {
let request_type = ApiKey::try_from(header.request_api_key).unwrap();
let request_type =
format!("{:?}", ApiKey::try_from(header.request_api_key).unwrap());
// remove the Key postfix, since it's not part of the actual message name, which is confusing.
let request_type = request_type.trim_end_matches("Key");
tracing::warn!("Routing for request of type {request_type:?} has not been implemented yet.");
self.route_to_random_broker(message)
}
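
A side note on the error-path tweak above: kafka_protocol's ApiKey enum variants carry a Key suffix (as the new comment notes, it is not part of the actual message name), so the code renders the variant with Debug and trims the suffix before logging. A small self-contained illustration of that trick (local enum, not the real ApiKey):

```rust
// Local stand-in for kafka_protocol's ApiKey enum, whose variants
// end in a "Key" suffix (e.g. ProduceKey).
#[derive(Debug)]
enum ApiKey {
    ProduceKey,
    OffsetForLeaderEpochKey,
}

fn main() {
    let name = format!("{:?}", ApiKey::OffsetForLeaderEpochKey);
    // trim_end_matches removes every trailing repetition of "Key";
    // that is safe here because no variant name ends in a doubled "Key".
    assert_eq!(name.trim_end_matches("Key"), "OffsetForLeaderEpoch");
    let name = format!("{:?}", ApiKey::ProduceKey);
    assert_eq!(name.trim_end_matches("Key"), "Produce");
}
```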
@@ -1353,6 +1371,59 @@ impl KafkaSinkCluster {
result
}

/// This method removes all topics from the OffsetForLeaderEpoch request and returns them split up by their destination
/// If any topics are unroutable they will have their BrokerId set to -1
fn split_offset_for_leader_epoch_request_by_destination(
&mut self,
body: &mut OffsetForLeaderEpochRequest,
) -> HashMap<BrokerId, Vec<OffsetForLeaderTopic>> {
let mut result: HashMap<BrokerId, Vec<OffsetForLeaderTopic>> = Default::default();

for mut topic in body.topics.drain(..) {
let topic_name = &topic.topic;
if let Some(topic_meta) = self.topic_by_name.get(topic_name) {
for partition in std::mem::take(&mut topic.partitions) {
let partition_index = partition.partition as usize;
let destination = if let Some(partition) =
topic_meta.partitions.get(partition_index)
{
if partition.leader_id == -1 {
tracing::warn!(
"leader_id is unknown for {topic_name:?} at partition index {partition_index}",
);
}
partition.leader_id
} else {
let partition_len = topic_meta.partitions.len();
tracing::warn!("no known partition for {topic_name:?} at partition index {partition_index} out of {partition_len} partitions, routing request to a random broker so that a NOT_LEADER_OR_FOLLOWER or similar error is returned to the client");
BrokerId(-1)
};
tracing::debug!(
"Routing OffsetForLeaderEpoch request portion of partition {partition_index} in {topic_name:?} to broker {}",
destination.0
);
let dest_topics = result.entry(destination).or_default();
if let Some(dest_topic) =
dest_topics.iter_mut().find(|x| x.topic == topic.topic)
{
dest_topic.partitions.push(partition);
} else {
let mut topic = topic.clone();
topic.partitions.push(partition);
dest_topics.push(topic);
}
}
} else {
tracing::warn!("no known partition replica for {topic_name:?}, routing message to a random broker so that a NOT_LEADER_OR_FOLLOWER or similar error is returned to the client");
let destination = BrokerId(-1);
let dest_topics = result.entry(destination).or_default();
dest_topics.push(topic);
}
}

result
}

/// This method removes all transactions from the AddPartitionsToTxn request and returns them split up by their destination
/// If any topics are unroutable they will have their BrokerId set to -1
fn split_add_partition_to_txn_request_by_destination(
@@ -1831,6 +1902,10 @@ impl KafkaSinkCluster {
body: ResponseBody::ListOffsets(base),
..
})) => Self::combine_list_offsets_responses(base, drain)?,
Some(Frame::Kafka(KafkaFrame::Response {
body: ResponseBody::OffsetForLeaderEpoch(base),
..
})) => Self::combine_offset_for_leader_epoch_responses(base, drain)?,
Some(Frame::Kafka(KafkaFrame::Response {
body: ResponseBody::Produce(base),
..
@@ -1926,7 +2001,46 @@ impl KafkaSinkCluster {
}
} else {
return Err(anyhow!(
"Combining ListOffests responses but received another message type"
"Combining ListOffsets responses but received another message type"
));
}
}

Ok(())
}

fn combine_offset_for_leader_epoch_responses(
base: &mut OffsetForLeaderEpochResponse,
drain: impl Iterator<Item = Message>,
) -> Result<()> {
for mut next in drain {
if let Some(Frame::Kafka(KafkaFrame::Response {
body: ResponseBody::OffsetForLeaderEpoch(next_body),
..
})) = next.frame()
{
for next_topic in std::mem::take(&mut next_body.topics) {
if let Some(base_topic) = base
.topics
.iter_mut()
.find(|topic| topic.topic == next_topic.topic)
{
for next_partition in &next_topic.partitions {
for base_partition in &base_topic.partitions {
if next_partition.partition == base_partition.partition {
tracing::warn!("Duplicate partition indexes in combined OffsetForLeaderEpoch response, if this ever occurs we should investigate the repercussions")
}
}
}
// A partition can only be contained in one response so there is no risk of duplicating partitions
base_topic.partitions.extend(next_topic.partitions)
} else {
base.topics.push(next_topic);
}
}
} else {
return Err(anyhow!(
"Combining OffsetForLeaderEpoch responses but received another message type"
));
}
}
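
To make the merge step above easier to follow in isolation: the first per-broker response is kept as the base, and every other response's topics are folded into it, appending partition entries when the topic already exists. A condensed sketch of that fold over simplified stand-in types (not the shotover internals):

```rust
#[derive(Debug, PartialEq)]
struct Topic {
    name: String,
    partitions: Vec<i32>, // partition indexes only, for brevity
}

// Fold the topics of each subsequent response into the base response,
// mirroring combine_offset_for_leader_epoch_responses above.
fn merge_into_base(base: &mut Vec<Topic>, next: Vec<Topic>) {
    for next_topic in next {
        if let Some(base_topic) = base.iter_mut().find(|t| t.name == next_topic.name) {
            // Each partition was routed to exactly one broker, so the
            // partition lists are disjoint and can simply be appended.
            base_topic.partitions.extend(next_topic.partitions);
        } else {
            base.push(next_topic);
        }
    }
}

fn main() {
    let mut base = vec![Topic { name: "events".into(), partitions: vec![0] }];
    merge_into_base(
        &mut base,
        vec![
            Topic { name: "events".into(), partitions: vec![1] },
            Topic { name: "logs".into(), partitions: vec![0] },
        ],
    );
    assert_eq!(base[0].partitions, vec![0, 1]);
    assert_eq!(base[1].name, "logs");
}
```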
@@ -2155,6 +2269,25 @@ impl KafkaSinkCluster {
}
}
}
Some(Frame::Kafka(KafkaFrame::Response {
body: ResponseBody::OffsetForLeaderEpoch(body),
..
})) => {
for topic in &mut body.topics {
for partition in &mut topic.partitions {
if let Some(ResponseError::NotLeaderOrFollower) =
ResponseError::try_from_code(partition.error_code)
{
self.topic_by_name.remove(&topic.topic);
tracing::info!(
"OffsetForLeaderEpoch response included error NOT_LEADER_OR_FOLLOWER and so cleared metadata for topic {:?}",
topic.topic,
);
break;
}
}
}
}
Some(Frame::Kafka(KafkaFrame::Response {
body: ResponseBody::Heartbeat(heartbeat),
..
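Stepping back from mod.rs: the heart of the new routing is the split function above, which looks each partition up in the cached topic metadata and groups it under its leader's BrokerId, using -1 as the sentinel for "leader unknown, route to a random broker and let the cluster return NOT_LEADER_OR_FOLLOWER or similar". A condensed, self-contained sketch of that grouping (hypothetical types, not the shotover internals):

```rust
use std::collections::HashMap;

#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
struct BrokerId(i32);

/// Cached metadata: leader broker per partition index.
struct TopicMeta {
    partition_leaders: Vec<BrokerId>,
}

/// Group (topic, partition) pairs by the partition leader's broker,
/// falling back to BrokerId(-1) when the topic or partition is unknown,
/// as split_offset_for_leader_epoch_request_by_destination does.
fn split_by_destination(
    metadata: &HashMap<String, TopicMeta>,
    requests: Vec<(String, i32)>,
) -> HashMap<BrokerId, Vec<(String, i32)>> {
    let mut result: HashMap<BrokerId, Vec<(String, i32)>> = HashMap::new();
    for (topic, partition) in requests {
        let destination = metadata
            .get(&topic)
            .and_then(|meta| meta.partition_leaders.get(partition as usize))
            .copied()
            .unwrap_or(BrokerId(-1)); // unroutable: picked up by random-broker fallback
        result.entry(destination).or_default().push((topic, partition));
    }
    result
}

fn main() {
    let mut metadata = HashMap::new();
    metadata.insert(
        "events".to_string(),
        TopicMeta { partition_leaders: vec![BrokerId(0), BrokerId(1)] },
    );
    let routed = split_by_destination(
        &metadata,
        vec![("events".into(), 0), ("events".into(), 1), ("missing".into(), 0)],
    );
    assert_eq!(routed[&BrokerId(0)], vec![("events".to_string(), 0)]);
    assert_eq!(routed[&BrokerId(-1)], vec![("missing".to_string(), 0)]);
}
```

The error-handling hunk at the end of mod.rs closes the loop: when a broker answers a partition with NOT_LEADER_OR_FOLLOWER, the cached metadata for that topic is dropped so the next request refetches it and routes to the new leader.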
33 changes: 31 additions & 2 deletions shotover/src/transforms/kafka/sink_cluster/split.rs
@@ -8,8 +8,9 @@ use crate::{
};
use kafka_protocol::messages::{
add_partitions_to_txn_request::AddPartitionsToTxnTransaction,
list_offsets_request::ListOffsetsTopic, produce_request::TopicProduceData,
AddPartitionsToTxnRequest, BrokerId, ListOffsetsRequest, ProduceRequest, TopicName,
list_offsets_request::ListOffsetsTopic, offset_for_leader_epoch_request::OffsetForLeaderTopic,
produce_request::TopicProduceData, AddPartitionsToTxnRequest, BrokerId, ListOffsetsRequest,
OffsetForLeaderEpochRequest, ProduceRequest, TopicName,
};
use std::collections::HashMap;

@@ -107,3 +108,31 @@ impl RequestSplitAndRouter for ListOffsetsRequestSplitAndRouter {
request.topics = item;
}
}

pub struct OffsetForLeaderEpochRequestSplitAndRouter;

impl RequestSplitAndRouter for OffsetForLeaderEpochRequestSplitAndRouter {
type Request = OffsetForLeaderEpochRequest;
type SubRequests = Vec<OffsetForLeaderTopic>;

fn split_by_destination(
transform: &mut KafkaSinkCluster,
request: &mut Self::Request,
) -> HashMap<BrokerId, Self::SubRequests> {
transform.split_offset_for_leader_epoch_request_by_destination(request)
}

fn get_request_frame(request: &mut Message) -> Option<&mut Self::Request> {
match request.frame() {
Some(Frame::Kafka(KafkaFrame::Request {
body: RequestBody::OffsetForLeaderEpoch(request),
..
})) => Some(request),
_ => None,
}
}

fn reassemble(request: &mut Self::Request, item: Self::SubRequests) {
request.topics = item;
}
}
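
To show how the new OffsetForLeaderEpochRequestSplitAndRouter slots in: the RequestSplitAndRouter trait lets the generic split_and_route_request path stay message-type agnostic, with each marker struct supplying the request type, the per-broker sub-request payload, and hooks for splitting and reassembly. A minimal stand-alone sketch of the pattern (simplified trait and types; the real trait in split.rs also has a get_request_frame hook and threads through the KafkaSinkCluster transform):

```rust
use std::collections::HashMap;

#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
struct BrokerId(i32);

/// Simplified version of the split/route plumbing: each implementor
/// knows how to break one request type into per-broker pieces and how
/// to rebuild a request from one piece.
trait RequestSplitAndRouter {
    type Request;
    type SubRequests;

    fn split_by_destination(request: &mut Self::Request)
        -> HashMap<BrokerId, Self::SubRequests>;
    fn reassemble(request: &mut Self::Request, item: Self::SubRequests);
}

#[derive(Clone)]
struct OffsetForLeaderEpochRequest {
    topics: Vec<String>, // stand-in for Vec<OffsetForLeaderTopic>
}

struct OffsetForLeaderEpochRequestSplitAndRouter;

impl RequestSplitAndRouter for OffsetForLeaderEpochRequestSplitAndRouter {
    type Request = OffsetForLeaderEpochRequest;
    type SubRequests = Vec<String>;

    fn split_by_destination(
        request: &mut Self::Request,
    ) -> HashMap<BrokerId, Self::SubRequests> {
        // The real code consults cached partition metadata; here everything
        // is routed to one broker just to show the data flow.
        let mut result = HashMap::new();
        result.insert(BrokerId(0), std::mem::take(&mut request.topics));
        result
    }

    fn reassemble(request: &mut Self::Request, item: Self::SubRequests) {
        request.topics = item;
    }
}

fn main() {
    let mut request = OffsetForLeaderEpochRequest { topics: vec!["events".into()] };
    let split =
        OffsetForLeaderEpochRequestSplitAndRouter::split_by_destination(&mut request);
    for (broker, topics) in split {
        // Each per-broker copy is rebuilt from the emptied request shell.
        let mut per_broker = request.clone();
        OffsetForLeaderEpochRequestSplitAndRouter::reassemble(&mut per_broker, topics);
        println!("broker {broker:?} gets {:?}", per_broker.topics);
    }
}
```

Keeping the trait this narrow means adding a new routable message type, as this commit does, is mostly a matter of writing the split function and a small marker impl.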
