Skip to content

Commit

Permalink
more fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
rukai committed Nov 14, 2024
1 parent 21143af commit a2da189
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 79 deletions.
100 changes: 49 additions & 51 deletions shotover/src/transforms/kafka/sink_cluster/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1125,70 +1125,68 @@ The connection to the client has been closed."
&mut self,
mut request: Message,
) -> Result<()> {
if let Some(request_frame) = T::get_request_frame(&mut request) {
let routing = T::split_by_destination(self, request_frame);
let request_frame = T::get_request_frame(&mut request);
let routing = T::split_by_destination(self, request_frame);

if routing.is_empty() {
// Produce contains no topics, so we can just pick a random destination.
// The request is unchanged so we can just send as is.
let destination = random_broker_id(&self.nodes, &mut self.rng);
if routing.is_empty() {
// Produce contains no topics, so we can just pick a random destination.
// The request is unchanged so we can just send as is.
let destination = random_broker_id(&self.nodes, &mut self.rng);

self.pending_requests.push_back(PendingRequest {
state: PendingRequestState::routed(destination, request),
ty: PendingRequestTy::Other,
combine_responses: 1,
});
tracing::debug!(
"Routing request to random broker {} due to being empty",
destination.0
);
} else if routing.len() == 1 {
// Only 1 destination,
// so we can just reconstruct the original request as is,
// act like this never happened 😎,
// we dont even need to invalidate the request's cache.
let (destination, topic_data) = routing.into_iter().next().unwrap();
self.pending_requests.push_back(PendingRequest {
state: PendingRequestState::routed(destination, request),
ty: PendingRequestTy::Other,
combine_responses: 1,
});
tracing::debug!(
"Routing request to random broker {} due to being empty",
destination.0
);
} else if routing.len() == 1 {
// Only 1 destination,
// so we can just reconstruct the original request as is,
// act like this never happened 😎,
// we dont even need to invalidate the request's cache.
let (destination, topic_data) = routing.into_iter().next().unwrap();
let destination = if destination == -1 {
random_broker_id(&self.nodes, &mut self.rng)
} else {
destination
};

T::reassemble(request_frame, topic_data);
self.pending_requests.push_back(PendingRequest {
state: PendingRequestState::routed(destination, request),
ty: PendingRequestTy::Other,
combine_responses: 1,
});
tracing::debug!("Routing request to single broker {:?}", destination.0);
} else {
// The request has been split so it may be delivered to multiple destinations.
// We must generate a unique request for each destination.
let combine_responses = routing.len();
request.invalidate_cache();
for (i, (destination, topic_data)) in routing.into_iter().enumerate() {
let destination = if destination == -1 {
random_broker_id(&self.nodes, &mut self.rng)
} else {
destination
};

let mut request = if i == 0 {
// First request acts as base and retains message id
request.clone()
} else {
request.clone_with_new_id()
};
let request_frame = T::get_request_frame(&mut request);
T::reassemble(request_frame, topic_data);
self.pending_requests.push_back(PendingRequest {
state: PendingRequestState::routed(destination, request),
ty: PendingRequestTy::Other,
combine_responses: 1,
combine_responses,
});
tracing::debug!("Routing request to single broker {:?}", destination.0);
} else {
// The request has been split so it may be delivered to multiple destinations.
// We must generate a unique request for each destination.
let combine_responses = routing.len();
request.invalidate_cache();
for (i, (destination, topic_data)) in routing.into_iter().enumerate() {
let destination = if destination == -1 {
random_broker_id(&self.nodes, &mut self.rng)
} else {
destination
};
let mut request = if i == 0 {
// First request acts as base and retains message id
request.clone()
} else {
request.clone_with_new_id()
};
if let Some(request_frame) = T::get_request_frame(&mut request) {
T::reassemble(request_frame, topic_data)
}
self.pending_requests.push_back(PendingRequest {
state: PendingRequestState::routed(destination, request),
ty: PendingRequestTy::Other,
combine_responses,
});
}
tracing::debug!("Routing request to multiple brokers");
}
tracing::debug!("Routing request to multiple brokers");
}
Ok(())
}
Expand Down
56 changes: 28 additions & 28 deletions shotover/src/transforms/kafka/sink_cluster/split.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use std::collections::HashMap;
pub trait RequestSplitAndRouter {
type SubRequests;
type Request;
fn get_request_frame(request: &mut Message) -> Option<&mut Self::Request>;
fn get_request_frame(request: &mut Message) -> &mut Self::Request;
fn split_by_destination(
transform: &mut KafkaSinkCluster,
request: &mut Self::Request,
Expand All @@ -41,13 +41,13 @@ impl RequestSplitAndRouter for ProduceRequestSplitAndRouter {
transform.split_produce_request_by_destination(request)
}

fn get_request_frame(request: &mut Message) -> Option<&mut Self::Request> {
fn get_request_frame(request: &mut Message) -> &mut Self::Request {
match request.frame() {
Some(Frame::Kafka(KafkaFrame::Request {
body: RequestBody::Produce(request),
..
})) => Some(request),
_ => None,
})) => request,
_ => unreachable!(),
}
}

Expand All @@ -69,13 +69,13 @@ impl RequestSplitAndRouter for AddPartitionsToTxnRequestSplitAndRouter {
transform.split_add_partition_to_txn_request_by_destination(request)
}

fn get_request_frame(request: &mut Message) -> Option<&mut Self::Request> {
fn get_request_frame(request: &mut Message) -> &mut Self::Request {
match request.frame() {
Some(Frame::Kafka(KafkaFrame::Request {
body: RequestBody::AddPartitionsToTxn(request),
..
})) => Some(request),
_ => None,
})) => request,
_ => unreachable!(),
}
}

Expand All @@ -97,13 +97,13 @@ impl RequestSplitAndRouter for ListOffsetsRequestSplitAndRouter {
transform.split_list_offsets_request_by_destination(request)
}

fn get_request_frame(request: &mut Message) -> Option<&mut Self::Request> {
fn get_request_frame(request: &mut Message) -> &mut Self::Request {
match request.frame() {
Some(Frame::Kafka(KafkaFrame::Request {
body: RequestBody::ListOffsets(request),
..
})) => Some(request),
_ => None,
})) => request,
_ => unreachable!(),
}
}

Expand All @@ -125,13 +125,13 @@ impl RequestSplitAndRouter for OffsetForLeaderEpochRequestSplitAndRouter {
transform.split_offset_for_leader_epoch_request_by_destination(request)
}

fn get_request_frame(request: &mut Message) -> Option<&mut Self::Request> {
fn get_request_frame(request: &mut Message) -> &mut Self::Request {
match request.frame() {
Some(Frame::Kafka(KafkaFrame::Request {
body: RequestBody::OffsetForLeaderEpoch(request),
..
})) => Some(request),
_ => None,
})) => request,
_ => unreachable!(),
}
}

Expand All @@ -153,13 +153,13 @@ impl RequestSplitAndRouter for DeleteRecordsRequestSplitAndRouter {
transform.split_delete_records_request_by_destination(request)
}

fn get_request_frame(request: &mut Message) -> Option<&mut Self::Request> {
fn get_request_frame(request: &mut Message) -> &mut Self::Request {
match request.frame() {
Some(Frame::Kafka(KafkaFrame::Request {
body: RequestBody::DeleteRecords(request),
..
})) => Some(request),
_ => None,
})) => request,
_ => unreachable!(),
}
}

Expand All @@ -181,13 +181,13 @@ impl RequestSplitAndRouter for DeleteGroupsSplitAndRouter {
transform.split_delete_groups_request_by_destination(request)
}

fn get_request_frame(request: &mut Message) -> Option<&mut Self::Request> {
fn get_request_frame(request: &mut Message) -> &mut Self::Request {
match request.frame() {
Some(Frame::Kafka(KafkaFrame::Request {
body: RequestBody::DeleteGroups(request),
..
})) => Some(request),
_ => None,
})) => request,
_ => unreachable!(),
}
}

Expand All @@ -209,13 +209,13 @@ impl RequestSplitAndRouter for ListGroupsSplitAndRouter {
transform.split_request_by_routing_to_all_brokers()
}

fn get_request_frame(request: &mut Message) -> Option<&mut Self::Request> {
fn get_request_frame(request: &mut Message) -> &mut Self::Request {
match request.frame() {
Some(Frame::Kafka(KafkaFrame::Request {
body: RequestBody::ListGroups(request),
..
})) => Some(request),
_ => None,
})) => request,
_ => unreachable!(),
}
}

Expand All @@ -237,13 +237,13 @@ impl RequestSplitAndRouter for ListTransactionsSplitAndRouter {
transform.split_request_by_routing_to_all_brokers()
}

fn get_request_frame(request: &mut Message) -> Option<&mut Self::Request> {
fn get_request_frame(request: &mut Message) -> &mut Self::Request {
match request.frame() {
Some(Frame::Kafka(KafkaFrame::Request {
body: RequestBody::ListTransactions(request),
..
})) => Some(request),
_ => None,
})) => request,
_ => unreachable!(),
}
}

Expand All @@ -265,13 +265,13 @@ impl RequestSplitAndRouter for OffsetFetchSplitAndRouter {
transform.split_offset_fetch_request_by_destination(request)
}

fn get_request_frame(request: &mut Message) -> Option<&mut Self::Request> {
fn get_request_frame(request: &mut Message) -> &mut Self::Request {
match request.frame() {
Some(Frame::Kafka(KafkaFrame::Request {
body: RequestBody::OffsetFetch(request),
..
})) => Some(request),
_ => None,
})) => request,
_ => unreachable!(),
}
}

Expand Down

0 comments on commit a2da189

Please sign in to comment.