Implement routing for ListOffsets (#1767)
rukai authored Oct 9, 2024
1 parent 9801ed4 commit 902273d
Showing 4 changed files with 373 additions and 32 deletions.
69 changes: 67 additions & 2 deletions shotover-proxy/tests/kafka_int_tests/test_cases.rs
@@ -4,8 +4,8 @@ use test_helpers::{
connection::kafka::{
Acl, AclOperation, AclPermissionType, AlterConfig, ConfigEntry, ConsumerConfig,
ExpectedResponse, KafkaConnectionBuilder, KafkaConsumer, KafkaDriver, KafkaProducer,
- NewPartition, NewTopic, Record, ResourcePatternType, ResourceSpecifier, ResourceType,
- TopicPartition,
+ ListOffsetsResultInfo, NewPartition, NewTopic, OffsetSpec, Record, ResourcePatternType,
+ ResourceSpecifier, ResourceType, TopicPartition,
},
docker_compose::DockerCompose,
};
@@ -859,6 +859,71 @@ pub async fn standard_test_suite(connection_builder: &KafkaConnectionBuilder) {
}])
.await;
produce_consume_partitions1(connection_builder, "partitions1").await;

let results = admin
.list_offsets(HashMap::from([
(
TopicPartition {
topic_name: "partitions3_case3".to_owned(),
partition: 0,
},
OffsetSpec::Earliest,
),
(
TopicPartition {
topic_name: "partitions3_case3".to_owned(),
partition: 1,
},
OffsetSpec::Earliest,
),
(
TopicPartition {
topic_name: "partitions3_case3".to_owned(),
partition: 2,
},
OffsetSpec::Earliest,
),
(
TopicPartition {
topic_name: "partitions1".to_owned(),
partition: 0,
},
OffsetSpec::Latest,
),
]))
.await;

let expected = HashMap::from([
(
TopicPartition {
topic_name: "partitions3_case3".to_owned(),
partition: 0,
},
ListOffsetsResultInfo { offset: 0 },
),
(
TopicPartition {
topic_name: "partitions3_case3".to_owned(),
partition: 1,
},
ListOffsetsResultInfo { offset: 0 },
),
(
TopicPartition {
topic_name: "partitions3_case3".to_owned(),
partition: 2,
},
ListOffsetsResultInfo { offset: 0 },
),
(
TopicPartition {
topic_name: "partitions1".to_owned(),
partition: 0,
},
ListOffsetsResultInfo { offset: 11 },
),
]);
assert_eq!(results, expected);
}

produce_consume_acks0(connection_builder).await;
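The new assertions exercise both offset specs: Earliest resolves to the first available offset of a partition (0 for these freshly created topics), while Latest resolves to the next offset that will be written (11 for partitions1, reflecting the records produced to it earlier in the suite). On the wire, ListOffsets encodes these specs as sentinel timestamps; a minimal sketch, using a simplified stand-in for the test_helpers OffsetSpec type (the real type's definition may differ):

enum OffsetSpec {
    Earliest,
    Latest,
}

impl OffsetSpec {
    // The ListOffsets protocol encodes the requested offset as a timestamp:
    // -2 requests the earliest available offset, -1 the latest
    // (i.e. the next offset to be written).
    fn to_timestamp(&self) -> i64 {
        match self {
            OffsetSpec::Earliest => -2,
            OffsetSpec::Latest => -1,
        }
    }
}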
235 changes: 222 additions & 13 deletions shotover/src/transforms/kafka/sink_cluster/mod.rs
@@ -16,15 +16,17 @@ use kafka_node::{ConnectionFactory, KafkaAddress, KafkaNode, KafkaNodeState};
use kafka_protocol::indexmap::IndexMap;
use kafka_protocol::messages::fetch_request::FetchTopic;
use kafka_protocol::messages::fetch_response::LeaderIdAndEpoch as FetchResponseLeaderIdAndEpoch;
use kafka_protocol::messages::list_offsets_request::ListOffsetsTopic;
use kafka_protocol::messages::metadata_request::MetadataRequestTopic;
use kafka_protocol::messages::metadata_response::MetadataResponseBroker;
use kafka_protocol::messages::produce_request::TopicProduceData;
use kafka_protocol::messages::produce_response::LeaderIdAndEpoch as ProduceResponseLeaderIdAndEpoch;
use kafka_protocol::messages::{
ApiKey, BrokerId, FetchRequest, FetchResponse, FindCoordinatorRequest, FindCoordinatorResponse,
- GroupId, HeartbeatRequest, JoinGroupRequest, LeaveGroupRequest, MetadataRequest,
- MetadataResponse, ProduceRequest, ProduceResponse, RequestHeader, SaslAuthenticateRequest,
- SaslAuthenticateResponse, SaslHandshakeRequest, SyncGroupRequest, TopicName,
+ GroupId, HeartbeatRequest, JoinGroupRequest, LeaveGroupRequest, ListOffsetsRequest,
+ ListOffsetsResponse, MetadataRequest, MetadataResponse, ProduceRequest, ProduceResponse,
+ RequestHeader, SaslAuthenticateRequest, SaslAuthenticateResponse, SaslHandshakeRequest,
+ SyncGroupRequest, TopicName,
};
use kafka_protocol::protocol::StrBytes;
use kafka_protocol::ResponseError;
@@ -613,6 +615,14 @@ impl KafkaSinkCluster {
self.store_topic_names(&mut topic_names, name.clone());
}
}
Some(Frame::Kafka(KafkaFrame::Request {
body: RequestBody::ListOffsets(list_offsets),
..
})) => {
for topic in &list_offsets.topics {
self.store_topic_names(&mut topic_names, topic.name.clone());
}
}
Some(Frame::Kafka(KafkaFrame::Request {
body: RequestBody::Fetch(fetch),
..
@@ -712,17 +722,19 @@ impl KafkaSinkCluster {
for mut message in requests {
// This routing is documented in transforms.md so make sure to update that when making changes here.
match message.frame() {
- // route to partition leader
+ // split and route to partition leader
Some(Frame::Kafka(KafkaFrame::Request {
body: RequestBody::Produce(_),
..
})) => self.route_produce_request(message)?,

// route to random partition replica
Some(Frame::Kafka(KafkaFrame::Request {
body: RequestBody::Fetch(_),
..
})) => self.route_fetch_request(message)?,
Some(Frame::Kafka(KafkaFrame::Request {
body: RequestBody::ListOffsets(_),
..
})) => self.route_list_offsets(message)?,

// route to group coordinator
Some(Frame::Kafka(KafkaFrame::Request {
@@ -1117,6 +1129,140 @@ impl KafkaSinkCluster {
Ok(())
}

/// This method removes all topics from the list offsets request and returns them split up by their destination.
/// If any topics are unroutable, they will have their BrokerId set to -1.
fn split_list_offsets_request_by_destination(
&mut self,
list_offsets: &mut ListOffsetsRequest,
) -> HashMap<BrokerId, Vec<ListOffsetsTopic>> {
let mut result: HashMap<BrokerId, Vec<ListOffsetsTopic>> = Default::default();

for mut topic in list_offsets.topics.drain(..) {
let topic_name = &topic.name;
if let Some(topic_meta) = self.topic_by_name.get(topic_name) {
for partition in std::mem::take(&mut topic.partitions) {
let partition_index = partition.partition_index as usize;
let destination = if let Some(partition) =
topic_meta.partitions.get(partition_index)
{
if partition.leader_id == -1 {
tracing::warn!(
"leader_id is unknown for {topic_name:?} at partition index {partition_index}",
);
}
partition.leader_id
} else {
let partition_len = topic_meta.partitions.len();
tracing::warn!("no known partition for {topic_name:?} at partition index {partition_index} out of {partition_len} partitions, routing message to a random broker so that a NOT_LEADER_OR_FOLLOWER or similar error is returned to the client");
BrokerId(-1)
};
tracing::debug!(
"Routing list_offsets request portion of partition {partition_index} in {topic_name:?} to broker {}",
destination.0
);
let dest_topics = result.entry(destination).or_default();
if let Some(dest_topic) = dest_topics.iter_mut().find(|x| x.name == topic.name)
{
dest_topic.partitions.push(partition);
} else {
let mut topic = topic.clone();
topic.partitions.push(partition);
dest_topics.push(topic);
}
}
} else {
tracing::warn!("no known partition replica for {topic_name:?}, routing message to a random broker so that a NOT_LEADER_OR_FOLLOWER or similar error is returned to the client");
let destination = BrokerId(-1);
let dest_topics = result.entry(destination).or_default();
dest_topics.push(topic);
}
}

result
}
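A standalone sketch of the grouping performed above, using hypothetical simplified types in place of the kafka_protocol ones (leader_of stands in for the topic_by_name lookup; -1 marks an unroutable partition, matching the BrokerId(-1) sentinel):

use std::collections::HashMap;

#[derive(Clone)]
struct Topic {
    name: String,
    partitions: Vec<i32>, // partition indexes only, for brevity
}

fn split_by_destination(
    topics: Vec<Topic>,
    leader_of: impl Fn(&str, i32) -> i32,
) -> HashMap<i32, Vec<Topic>> {
    let mut result: HashMap<i32, Vec<Topic>> = HashMap::new();
    for topic in topics {
        for partition in topic.partitions.iter().copied() {
            let destination = leader_of(&topic.name, partition);
            let dest_topics = result.entry(destination).or_default();
            // Each destination holds at most one entry per topic name, so
            // partitions sharing a leader are batched into the same entry.
            if let Some(dest) = dest_topics.iter_mut().find(|t| t.name == topic.name) {
                dest.partitions.push(partition);
            } else {
                dest_topics.push(Topic {
                    name: topic.name.clone(),
                    partitions: vec![partition],
                });
            }
        }
    }
    result
}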

fn route_list_offsets(&mut self, mut request: Message) -> Result<()> {
if let Some(Frame::Kafka(KafkaFrame::Request {
body: RequestBody::ListOffsets(list_offsets),
..
})) = request.frame()
{
let routing = self.split_list_offsets_request_by_destination(list_offsets);

if routing.is_empty() {
// ListOffsets contains no topics, so we can just pick a random destination.
// The message is unchanged so we can just send as is.
let destination = random_broker_id(&self.nodes, &mut self.rng);

self.pending_requests.push_back(PendingRequest {
state: PendingRequestState::routed(destination, request),
// we don't need special handling for list_offsets, so just use Other
ty: PendingRequestTy::Other,
combine_responses: 1,
});
tracing::debug!(
"Routing ListOffsets request to random broker {} due to being empty",
destination.0
);
} else if routing.len() == 1 {
// Only 1 destination,
// so we can just reconstruct the original message as is,
// act like this never happened 😎,
// we don't even need to invalidate the message's cache.
let (destination, topics) = routing.into_iter().next().unwrap();
let destination = if destination == -1 {
random_broker_id(&self.nodes, &mut self.rng)
} else {
destination
};

list_offsets.topics = topics;
self.pending_requests.push_back(PendingRequest {
state: PendingRequestState::routed(destination, request),
// we don't need special handling for ListOffsets, so just use Other
ty: PendingRequestTy::Other,
combine_responses: 1,
});
tracing::debug!(
"Routing ListOffsets request to single broker {}",
destination.0
);
} else {
// The message has been split so it may be delivered to multiple destinations.
// We must generate a unique message for each destination.
let combine_responses = routing.len();
request.invalidate_cache();
for (i, (destination, topics)) in routing.into_iter().enumerate() {
let destination = if destination == -1 {
random_broker_id(&self.nodes, &mut self.rng)
} else {
destination
};
let mut request = if i == 0 {
// First message acts as base and retains message id
request.clone()
} else {
request.clone_with_new_id()
};
if let Some(Frame::Kafka(KafkaFrame::Request {
body: RequestBody::ListOffsets(list_offsets),
..
})) = request.frame()
{
list_offsets.topics = topics;
}
self.pending_requests.push_back(PendingRequest {
state: PendingRequestState::routed(destination, request),
ty: PendingRequestTy::Other,
combine_responses,
});
}
tracing::debug!("Routing ListOffsets request to multiple brokers");
}
}
Ok(())
}
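The multi-destination branch is the subtle one: the first sub-request keeps the original message id so the recombined response can be matched back to the client, later clones get fresh ids, and each sub-request records how many responses must be folded together. A hypothetical sketch of that bookkeeping, with simplified stand-ins for shotover's message and pending-request types:

use std::collections::HashMap;

type BrokerId = i32;

#[derive(Clone)]
struct Request {
    id: u64,
    topics: Vec<String>,
}

struct PendingRequest {
    destination: BrokerId,
    request: Request,
    combine_responses: usize,
}

fn fan_out(
    base: &Request,
    routing: HashMap<BrokerId, Vec<String>>,
    mut fresh_id: impl FnMut() -> u64,
) -> Vec<PendingRequest> {
    // Every sub-request carries the total count so the response layer knows
    // when it has gathered a complete set.
    let combine_responses = routing.len();
    routing
        .into_iter()
        .enumerate()
        .map(|(i, (destination, topics))| {
            let mut request = base.clone();
            if i != 0 {
                // Only the first clone retains the original message id.
                request.id = fresh_id();
            }
            request.topics = topics;
            PendingRequest {
                destination,
                request,
                combine_responses,
            }
        })
        .collect()
}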

async fn find_coordinator_of_group(
&mut self,
group: GroupId,
Expand Down Expand Up @@ -1538,6 +1684,10 @@ impl KafkaSinkCluster {
body: ResponseBody::Fetch(base),
..
})) => Self::combine_fetch_responses(base, drain)?,
Some(Frame::Kafka(KafkaFrame::Response {
body: ResponseBody::ListOffsets(base),
..
})) => Self::combine_list_offsets_responses(base, drain)?,
Some(Frame::Kafka(KafkaFrame::Response {
body: ResponseBody::Produce(base),
..
Expand Down Expand Up @@ -1593,6 +1743,46 @@ impl KafkaSinkCluster {
Ok(())
}

fn combine_list_offsets_responses(
base_list_offsets: &mut ListOffsetsResponse,
drain: impl Iterator<Item = Message>,
) -> Result<()> {
for mut next in drain {
if let Some(Frame::Kafka(KafkaFrame::Response {
body: ResponseBody::ListOffsets(next_list_offsets),
..
})) = next.frame()
{
for next_topic in std::mem::take(&mut next_list_offsets.topics) {
if let Some(base_topic) = base_list_offsets
.topics
.iter_mut()
.find(|topic| topic.name == next_topic.name)
{
for next_partition in &next_topic.partitions {
for base_partition in &base_topic.partitions {
if next_partition.partition_index == base_partition.partition_index
{
tracing::warn!("Duplicate partition indexes in combined list offsets response, if this ever occurs we should investigate the repercussions")
}
}
}
// A partition can only be contained in one response so there is no risk of duplicating partitions
base_topic.partitions.extend(next_topic.partitions)
} else {
base_list_offsets.topics.push(next_topic);
}
}
} else {
return Err(anyhow!(
"Combining Fetch responses but received another message type"
));
}
}

Ok(())
}
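The merge is append-only: partitions from each sub-response slot in under the matching topic of the base response, and topics the base has not yet seen are pushed whole. Because the splitter sent each partition to exactly one broker, no partition index should appear twice. A sketch using the same simplified Topic stand-in as the splitter sketch above:

struct Topic {
    name: String,
    partitions: Vec<i32>,
}

fn combine_topics(base: &mut Vec<Topic>, next: Vec<Topic>) {
    for next_topic in next {
        if let Some(base_topic) = base.iter_mut().find(|t| t.name == next_topic.name) {
            // Partitions are disjoint across sub-responses, so extend is safe.
            base_topic.partitions.extend(next_topic.partitions);
        } else {
            base.push(next_topic);
        }
    }
}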

fn combine_produce_responses(
base_produce: &mut ProduceResponse,
drain: impl Iterator<Item = Message>,
@@ -1708,19 +1898,19 @@ impl KafkaSinkCluster {
response_partition.current_leader.leader_epoch;
}
tracing::info!(
"Produce response included error NOT_LEADER_OR_FOLLOWER and so updated leader in topic {:?} partition {}",
topic_name,
response_partition.index
);
"Produce response included error NOT_LEADER_OR_FOLLOWER and so updated leader in topic {:?} partition {}",
topic_name,
response_partition.index
);
}
}
} else {
// The broker doesn't know who the new leader is, clear the entire topic.
self.topic_by_name.remove(topic_name);
tracing::info!(
"Produce response included error NOT_LEADER_OR_FOLLOWER and so cleared topic {:?}",
topic_name,
);
"Produce response included error NOT_LEADER_OR_FOLLOWER and so cleared topic {:?}",
topic_name,
);
break;
}
}
@@ -1762,6 +1952,25 @@ impl KafkaSinkCluster {
}
response.invalidate_cache();
}
Some(Frame::Kafka(KafkaFrame::Response {
body: ResponseBody::ListOffsets(list_offsets),
..
})) => {
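// Like the produce handling above, dropping the topic_by_name entry forces
// the topic's metadata to be refetched on the next request, so routing
// converges on the new leader instead of repeatedly hitting the stale one.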
for topic in &mut list_offsets.topics {
for partition in &mut topic.partitions {
if let Some(ResponseError::NotLeaderOrFollower) =
ResponseError::try_from_code(partition.error_code)
{
self.topic_by_name.remove(&topic.name);
tracing::info!(
"ListOffsets response included error NOT_LEADER_OR_FOLLOWER and so cleared metadata for topic {:?}",
topic.name,
);
break;
}
}
}
}
Some(Frame::Kafka(KafkaFrame::Response {
body: ResponseBody::Heartbeat(heartbeat),
..
