Skip to content

Commit

Permalink
KafkaSinkCluster: improve message naming (#1787)
Browse files Browse the repository at this point in the history
  • Loading branch information
rukai authored Oct 29, 2024
1 parent 487ffec commit 9b845ef
Showing 1 changed file with 43 additions and 43 deletions.
86 changes: 43 additions & 43 deletions shotover/src/transforms/kafka/sink_cluster/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -852,33 +852,33 @@ impl KafkaSinkCluster {
}
other => {
return Err(anyhow!(
"Unexpected message returned to metadata request {other:?}"
"Unexpected response returned to metadata request {other:?}"
))
}
}
}

for mut message in requests {
for mut request in requests {
// This routing is documented in transforms.md so make sure to update that when making changes here.
match message.frame() {
match request.frame() {
// split and route to partition leader
Some(Frame::Kafka(KafkaFrame::Request {
body: RequestBody::Produce(_),
..
})) => self.split_and_route_request::<ProduceRequestSplitAndRouter>(message)?,
})) => self.split_and_route_request::<ProduceRequestSplitAndRouter>(request)?,
Some(Frame::Kafka(KafkaFrame::Request {
body: RequestBody::Fetch(_),
..
})) => self.route_fetch_request(message)?,
})) => self.route_fetch_request(request)?,
Some(Frame::Kafka(KafkaFrame::Request {
body: RequestBody::ListOffsets(_),
..
})) => self.split_and_route_request::<ListOffsetsRequestSplitAndRouter>(message)?,
})) => self.split_and_route_request::<ListOffsetsRequestSplitAndRouter>(request)?,
Some(Frame::Kafka(KafkaFrame::Request {
body: RequestBody::OffsetForLeaderEpoch(_),
..
})) => self.split_and_route_request::<OffsetForLeaderEpochRequestSplitAndRouter>(
message,
request,
)?,

// route to group coordinator
Expand All @@ -887,60 +887,60 @@ impl KafkaSinkCluster {
..
})) => {
let group_id = heartbeat.group_id.clone();
self.route_to_group_coordinator(message, group_id);
self.route_to_group_coordinator(request, group_id);
}
Some(Frame::Kafka(KafkaFrame::Request {
body: RequestBody::SyncGroup(sync_group),
..
})) => {
let group_id = sync_group.group_id.clone();
self.route_to_group_coordinator(message, group_id);
self.route_to_group_coordinator(request, group_id);
}
Some(Frame::Kafka(KafkaFrame::Request {
body: RequestBody::OffsetFetch(offset_fetch),
header,
})) => {
if header.request_api_version <= 7 {
let group_id = offset_fetch.group_id.clone();
self.route_to_group_coordinator(message, group_id);
self.route_to_group_coordinator(request, group_id);
} else {
self.split_and_route_request::<OffsetFetchSplitAndRouter>(message)?;
self.split_and_route_request::<OffsetFetchSplitAndRouter>(request)?;
};
}
Some(Frame::Kafka(KafkaFrame::Request {
body: RequestBody::OffsetCommit(offset_commit),
..
})) => {
let group_id = offset_commit.group_id.clone();
self.route_to_group_coordinator(message, group_id);
self.route_to_group_coordinator(request, group_id);
}
Some(Frame::Kafka(KafkaFrame::Request {
body: RequestBody::JoinGroup(join_group),
..
})) => {
let group_id = join_group.group_id.clone();
self.route_to_group_coordinator(message, group_id);
self.route_to_group_coordinator(request, group_id);
}
Some(Frame::Kafka(KafkaFrame::Request {
body: RequestBody::LeaveGroup(leave_group),
..
})) => {
let group_id = leave_group.group_id.clone();
self.route_to_group_coordinator(message, group_id);
self.route_to_group_coordinator(request, group_id);
}
Some(Frame::Kafka(KafkaFrame::Request {
body: RequestBody::DeleteGroups(_),
..
})) => {
self.split_and_route_request::<DeleteGroupsSplitAndRouter>(message)?;
self.split_and_route_request::<DeleteGroupsSplitAndRouter>(request)?;
}
Some(Frame::Kafka(KafkaFrame::Request {
body: RequestBody::TxnOffsetCommit(txn_offset_commit),
..
})) => {
let group_id = txn_offset_commit.group_id.clone();
// Despite being a transaction request this request is routed by group_id
self.route_to_group_coordinator(message, group_id);
self.route_to_group_coordinator(request, group_id);
}

// route to transaction coordinator
Expand All @@ -949,42 +949,42 @@ impl KafkaSinkCluster {
..
})) => {
let transaction_id = end_txn.transactional_id.clone();
self.route_to_transaction_coordinator(message, transaction_id);
self.route_to_transaction_coordinator(request, transaction_id);
}
Some(Frame::Kafka(KafkaFrame::Request {
body: RequestBody::InitProducerId(init_producer_id),
..
})) => {
if let Some(transaction_id) = init_producer_id.transactional_id.clone() {
self.route_to_transaction_coordinator(message, transaction_id);
self.route_to_transaction_coordinator(request, transaction_id);
} else {
self.route_to_random_broker(message);
self.route_to_random_broker(request);
}
}
Some(Frame::Kafka(KafkaFrame::Request {
body: RequestBody::AddPartitionsToTxn(_),
..
})) => self.route_add_partitions_to_txn(message)?,
})) => self.route_add_partitions_to_txn(request)?,
Some(Frame::Kafka(KafkaFrame::Request {
body: RequestBody::AddOffsetsToTxn(add_offsets_to_txn),
..
})) => {
let transaction_id = add_offsets_to_txn.transactional_id.clone();
self.route_to_transaction_coordinator(message, transaction_id);
self.route_to_transaction_coordinator(request, transaction_id);
}

Some(Frame::Kafka(KafkaFrame::Request {
body: RequestBody::FindCoordinator(_),
..
})) => {
self.route_find_coordinator(message);
self.route_find_coordinator(request);
}

// route to controller broker
Some(Frame::Kafka(KafkaFrame::Request {
body: RequestBody::CreateTopics(_),
..
})) => self.route_to_controller(message),
})) => self.route_to_controller(request),

// route to random broker
Some(Frame::Kafka(KafkaFrame::Request {
Expand All @@ -997,7 +997,7 @@ impl KafkaSinkCluster {
| RequestBody::CreateAcls(_)
| RequestBody::ApiVersions(_),
..
})) => self.route_to_random_broker(message),
})) => self.route_to_random_broker(request),

// error handling
Some(Frame::Kafka(KafkaFrame::Request { header, .. })) => {
Expand All @@ -1006,12 +1006,12 @@ impl KafkaSinkCluster {
// remove Key postfix, since its not part of the actual message name which is confusing.
let request_type = request_type.trim_end_matches("Key");
tracing::warn!("Routing for request of type {request_type:?} has not been implemented yet.");
self.route_to_random_broker(message)
self.route_to_random_broker(request)
}
Some(_) => unreachable!("Must be a kafka request"),
None => {
tracing::warn!("Unable to parse request, routing to a random node");
self.route_to_random_broker(message)
self.route_to_random_broker(request)
}
}
}
Expand Down Expand Up @@ -1203,7 +1203,7 @@ impl KafkaSinkCluster {
partition.leader_id
} else {
let partition_len = topic_meta.partitions.len();
tracing::warn!("no known partition replica for {format_topic_name} at partition index {partition_index} out of {partition_len} partitions, routing message to a random broker so that a NOT_LEADER_OR_FOLLOWER or similar error is returned to the client");
tracing::warn!("no known partition replica for {format_topic_name} at partition index {partition_index} out of {partition_len} partitions, routing request to a random broker so that a NOT_LEADER_OR_FOLLOWER or similar error is returned to the client");
BrokerId(-1)
};
tracing::debug!(
Expand All @@ -1223,7 +1223,7 @@ impl KafkaSinkCluster {
}
}
} else {
tracing::warn!("no known partition replica for {format_topic_name}, routing message to a random broker so that a NOT_LEADER_OR_FOLLOWER or similar error is returned to the client");
tracing::warn!("no known partition replica for {format_topic_name}, routing request to a random broker so that a NOT_LEADER_OR_FOLLOWER or similar error is returned to the client");
let destination = BrokerId(-1);
let dest_topics = result.entry(destination).or_default();
dest_topics.push(topic);
Expand All @@ -1233,11 +1233,11 @@ impl KafkaSinkCluster {
result
}

fn route_fetch_request(&mut self, mut message: Message) -> Result<()> {
fn route_fetch_request(&mut self, mut request: Message) -> Result<()> {
if let Some(Frame::Kafka(KafkaFrame::Request {
body: RequestBody::Fetch(fetch),
..
})) = message.frame()
})) = request.frame()
{
let routing = self.split_fetch_request_by_destination(fetch);

Expand All @@ -1247,7 +1247,7 @@ impl KafkaSinkCluster {
let destination = random_broker_id(&self.nodes, &mut self.rng);

self.pending_requests.push_back(PendingRequest {
state: PendingRequestState::routed(destination, message),
state: PendingRequestState::routed(destination, request),
// we dont need special handling for fetch, so just use Other
ty: PendingRequestTy::Other,
combine_responses: 1,
Expand All @@ -1270,7 +1270,7 @@ impl KafkaSinkCluster {

fetch.topics = topics;
self.pending_requests.push_back(PendingRequest {
state: PendingRequestState::routed(destination, message),
state: PendingRequestState::routed(destination, request),
// we dont need special handling for fetch, so just use Other
ty: PendingRequestTy::Other,
combine_responses: 1,
Expand All @@ -1288,7 +1288,7 @@ impl KafkaSinkCluster {
// The message has been split so it may be delivered to multiple destinations.
// We must generate a unique message for each destination.
let combine_responses = routing.len();
message.invalidate_cache();
request.invalidate_cache();
for (i, (destination, topics)) in routing.into_iter().enumerate() {
let destination = if destination == -1 {
random_broker_id(&self.nodes, &mut self.rng)
Expand All @@ -1297,9 +1297,9 @@ impl KafkaSinkCluster {
};
let mut request = if i == 0 {
// First message acts as base and retains message id
message.clone()
request.clone()
} else {
message.clone_with_new_id()
request.clone_with_new_id()
};
if let Some(Frame::Kafka(KafkaFrame::Request {
body: RequestBody::Fetch(fetch),
Expand Down Expand Up @@ -1349,7 +1349,7 @@ impl KafkaSinkCluster {
partition.leader_id
} else {
let partition_len = topic_meta.partitions.len();
tracing::warn!("no known partition for {topic_name:?} at partition index {partition_index} out of {partition_len} partitions, routing message to a random broker so that a NOT_LEADER_OR_FOLLOWER or similar error is returned to the client");
tracing::warn!("no known partition for {topic_name:?} at partition index {partition_index} out of {partition_len} partitions, routing request to a random broker so that a NOT_LEADER_OR_FOLLOWER or similar error is returned to the client");
BrokerId(-1)
};
tracing::debug!(
Expand All @@ -1367,7 +1367,7 @@ impl KafkaSinkCluster {
}
}
} else {
tracing::warn!("no known partition replica for {topic_name:?}, routing message to a random broker so that a NOT_LEADER_OR_FOLLOWER or similar error is returned to the client");
tracing::warn!("no known partition replica for {topic_name:?}, routing request to a random broker so that a NOT_LEADER_OR_FOLLOWER or similar error is returned to the client");
let destination = BrokerId(-1);
let dest_topics = result.entry(destination).or_default();
dest_topics.push(topic);
Expand All @@ -1390,7 +1390,7 @@ impl KafkaSinkCluster {
let dest_groups = result.entry(*destination).or_default();
dest_groups.push(group_id);
} else {
tracing::warn!("no known coordinator for group {group_id:?}, routing message to a random broker so that a NOT_COORDINATOR or similar error is returned to the client");
tracing::warn!("no known coordinator for group {group_id:?}, routing request to a random broker so that a NOT_COORDINATOR or similar error is returned to the client");
let destination = BrokerId(-1);
let dest_groups = result.entry(destination).or_default();
dest_groups.push(group_id);
Expand Down Expand Up @@ -1469,7 +1469,7 @@ impl KafkaSinkCluster {
}
}
} else {
tracing::warn!("no known partition replica for {topic_name:?}, routing message to a random broker so that a NOT_LEADER_OR_FOLLOWER or similar error is returned to the client");
tracing::warn!("no known partition replica for {topic_name:?}, routing request to a random broker so that a NOT_LEADER_OR_FOLLOWER or similar error is returned to the client");
let destination = BrokerId(-1);
let dest_topics = result.entry(destination).or_default();
dest_topics.push(topic);
Expand Down Expand Up @@ -1569,7 +1569,7 @@ impl KafkaSinkCluster {
))),
},
other => Err(anyhow!(
"Unexpected message returned to findcoordinator request {other:?}"
"Unexpected response returned to findcoordinator request {other:?}"
))?,
}
}
Expand Down Expand Up @@ -2670,7 +2670,7 @@ impl KafkaSinkCluster {
{
node.broker_id
} else {
tracing::warn!("no known broker with id {broker_id:?} that is 'up', routing message to a random broker so that a NOT_CONTROLLER or similar error is returned to the client");
tracing::warn!("no known broker with id {broker_id:?} that is 'up', routing request to a random broker so that a NOT_CONTROLLER or similar error is returned to the client");
random_broker_id(&self.nodes, &mut self.rng)
};

Expand All @@ -2690,7 +2690,7 @@ impl KafkaSinkCluster {
let destination = match destination {
Some(destination) => *destination,
None => {
tracing::info!("no known coordinator for {group_id:?}, routing message to a random broker so that a NOT_COORDINATOR or similar error is returned to the client");
tracing::info!("no known coordinator for {group_id:?}, routing request to a random broker so that a NOT_COORDINATOR or similar error is returned to the client");
random_broker_id(&self.nodes, &mut self.rng)
}
};
Expand All @@ -2717,7 +2717,7 @@ impl KafkaSinkCluster {
let destination = match destination {
Some(destination) => *destination,
None => {
tracing::info!("no known coordinator for {transaction_id:?}, routing message to a random broker so that a NOT_COORDINATOR or similar error is returned to the client");
tracing::info!("no known coordinator for {transaction_id:?}, routing request to a random broker so that a NOT_COORDINATOR or similar error is returned to the client");
random_broker_id(&self.nodes, &mut self.rng)
}
};
Expand Down

0 comments on commit 9b845ef

Please sign in to comment.