Commit: refactor splitting

rukai committed Oct 15, 2024
1 parent 5a41409 commit baf3be6

Showing 2 changed files with 145 additions and 189 deletions.
221 changes: 32 additions & 189 deletions shotover/src/transforms/kafka/sink_cluster/mod.rs
@@ -42,6 +42,10 @@ use scram_over_mtls::{
 };
 use serde::{Deserialize, Serialize};
 use shotover_node::{ShotoverNode, ShotoverNodeConfig};
+use split::{
+    AddPartitionsToTxnRequestSplitAndRouter, ListOffsetsRequestSplitAndRouter,
+    ProduceRequestSplitAndRouter, RequestSplitAndRouter,
+};
 use std::collections::{HashMap, VecDeque};
 use std::hash::Hasher;
 use std::sync::atomic::AtomicI64;
@@ -54,6 +58,7 @@ mod connections;
 mod kafka_node;
 mod scram_over_mtls;
 pub mod shotover_node;
+pub(crate) mod split;
 
 const SASL_SCRAM_MECHANISMS: [&str; 2] = ["SCRAM-SHA-256", "SCRAM-SHA-512"];

@@ -269,7 +274,7 @@ impl AtomicBrokerId {
     }
 }
 
-struct KafkaSinkCluster {
+pub(crate) struct KafkaSinkCluster {
     first_contact_points: Vec<KafkaAddress>,
     shotover_nodes: Vec<ShotoverNode>,
     rack: StrBytes,
@@ -815,15 +820,15 @@ impl KafkaSinkCluster {
             Some(Frame::Kafka(KafkaFrame::Request {
                 body: RequestBody::Produce(_),
                 ..
-            })) => self.route_produce_request(message)?,
+            })) => self.split_and_route_request::<ProduceRequestSplitAndRouter>(message)?,
             Some(Frame::Kafka(KafkaFrame::Request {
                 body: RequestBody::Fetch(_),
                 ..
             })) => self.route_fetch_request(message)?,
             Some(Frame::Kafka(KafkaFrame::Request {
                 body: RequestBody::ListOffsets(_),
                 ..
-            })) => self.route_list_offsets(message)?,
+            })) => self.split_and_route_request::<ListOffsetsRequestSplitAndRouter>(message)?,
 
             // route to group coordinator
             Some(Frame::Kafka(KafkaFrame::Request {
@@ -938,84 +943,75 @@ impl KafkaSinkCluster {
         });
     }
 
-    fn route_produce_request(&mut self, mut message: Message) -> Result<()> {
-        if let Some(Frame::Kafka(KafkaFrame::Request {
-            body: RequestBody::Produce(produce),
-            ..
-        })) = message.frame()
-        {
-            let routing = self.split_produce_request_by_destination(produce);
+    fn split_and_route_request<T: RequestSplitAndRouter>(
+        &mut self,
+        mut request: Message,
+    ) -> Result<()> {
+        if let Some(request_frame) = T::get_request_frame(&mut request) {
+            let routing = T::split_by_destination(self, request_frame);
 
             if routing.is_empty() {
                 // Produce contains no topics, so we can just pick a random destination.
-                // The message is unchanged so we can just send as is.
+                // The request is unchanged so we can just send as is.
                 let destination = random_broker_id(&self.nodes, &mut self.rng);
 
                 self.pending_requests.push_back(PendingRequest {
-                    state: PendingRequestState::routed(destination, message),
+                    state: PendingRequestState::routed(destination, request),
                     ty: PendingRequestTy::Other,
                     combine_responses: 1,
                 });
                 tracing::debug!(
-                    "Routing produce request to random broker {} due to being empty",
+                    "Routing request to random broker {} due to being empty",
                     destination.0
                 );
             } else if routing.len() == 1 {
                 // Only 1 destination,
-                // so we can just reconstruct the original message as is,
+                // so we can just reconstruct the original request as is,
                 // act like this never happened 😎,
-                // we dont even need to invalidate the message's cache.
+                // we dont even need to invalidate the request's cache.
                 let (destination, topic_data) = routing.into_iter().next().unwrap();
                 let destination = if destination == -1 {
                     random_broker_id(&self.nodes, &mut self.rng)
                 } else {
                     destination
                 };
 
-                produce.topic_data = topic_data;
+                T::reassemble(request_frame, topic_data);
                 self.pending_requests.push_back(PendingRequest {
-                    state: PendingRequestState::routed(destination, message),
+                    state: PendingRequestState::routed(destination, request),
                     ty: PendingRequestTy::Other,
                     combine_responses: 1,
                 });
-                tracing::debug!(
-                    "Routing produce request to single broker {:?}",
-                    destination.0
-                );
+                tracing::debug!("Routing request to single broker {:?}", destination.0);
             } else {
-                // The message has been split so it may be delivered to multiple destinations.
-                // We must generate a unique message for each destination.
+                // The request has been split so it may be delivered to multiple destinations.
+                // We must generate a unique request for each destination.
                 let combine_responses = routing.len();
-                message.invalidate_cache();
+                request.invalidate_cache();
                 for (i, (destination, topic_data)) in routing.into_iter().enumerate() {
                     let destination = if destination == -1 {
                         random_broker_id(&self.nodes, &mut self.rng)
                     } else {
                         destination
                     };
                     let mut request = if i == 0 {
-                        // First message acts as base and retains message id
-                        message.clone()
+                        // First request acts as base and retains message id
+                        request.clone()
                     } else {
-                        message.clone_with_new_id()
+                        request.clone_with_new_id()
                     };
-                    if let Some(Frame::Kafka(KafkaFrame::Request {
-                        body: RequestBody::Produce(produce),
-                        ..
-                    })) = request.frame()
-                    {
-                        produce.topic_data = topic_data;
+                    if let Some(request_frame) = T::get_request_frame(&mut request) {
+                        T::reassemble(request_frame, topic_data)
                     }
                     self.pending_requests.push_back(PendingRequest {
                         state: PendingRequestState::routed(destination, request),
                         ty: PendingRequestTy::Other,
                         combine_responses,
                     });
                 }
-                tracing::debug!("Routing produce request to multiple brokers");
+                tracing::debug!("Routing request to multiple brokers");
             }
         }
 
         Ok(())
     }

@@ -1295,88 +1291,6 @@ impl KafkaSinkCluster {
         result
     }
 
-    fn route_list_offsets(&mut self, mut request: Message) -> Result<()> {
-        if let Some(Frame::Kafka(KafkaFrame::Request {
-            body: RequestBody::ListOffsets(list_offsets),
-            ..
-        })) = request.frame()
-        {
-            let routing = self.split_list_offsets_request_by_destination(list_offsets);
-
-            if routing.is_empty() {
-                // ListOffsets contains no topics, so we can just pick a random destination.
-                // The message is unchanged so we can just send as is.
-                let destination = random_broker_id(&self.nodes, &mut self.rng);
-
-                self.pending_requests.push_back(PendingRequest {
-                    state: PendingRequestState::routed(destination, request),
-                    // we dont need special handling for list_offsets, so just use Other
-                    ty: PendingRequestTy::Other,
-                    combine_responses: 1,
-                });
-                tracing::debug!(
-                    "Routing ListOffsets request to random broker {} due to being empty",
-                    destination.0
-                );
-            } else if routing.len() == 1 {
-                // Only 1 destination,
-                // so we can just reconstruct the original message as is,
-                // act like this never happened 😎,
-                // we dont even need to invalidate the message's cache.
-                let (destination, topics) = routing.into_iter().next().unwrap();
-                let destination = if destination == -1 {
-                    random_broker_id(&self.nodes, &mut self.rng)
-                } else {
-                    destination
-                };
-
-                list_offsets.topics = topics;
-                self.pending_requests.push_back(PendingRequest {
-                    state: PendingRequestState::routed(destination, request),
-                    // we dont need special handling for ListOffsets, so just use Other
-                    ty: PendingRequestTy::Other,
-                    combine_responses: 1,
-                });
-                tracing::debug!(
-                    "Routing ListOffsets request to single broker {}",
-                    destination.0
-                );
-            } else {
-                // The message has been split so it may be delivered to multiple destinations.
-                // We must generate a unique message for each destination.
-                let combine_responses = routing.len();
-                request.invalidate_cache();
-                for (i, (destination, topics)) in routing.into_iter().enumerate() {
-                    let destination = if destination == -1 {
-                        random_broker_id(&self.nodes, &mut self.rng)
-                    } else {
-                        destination
-                    };
-                    let mut request = if i == 0 {
-                        // First message acts as base and retains message id
-                        request.clone()
-                    } else {
-                        request.clone_with_new_id()
-                    };
-                    if let Some(Frame::Kafka(KafkaFrame::Request {
-                        body: RequestBody::ListOffsets(list_offsets),
-                        ..
-                    })) = request.frame()
-                    {
-                        list_offsets.topics = topics;
-                    }
-                    self.pending_requests.push_back(PendingRequest {
-                        state: PendingRequestState::routed(destination, request),
-                        ty: PendingRequestTy::Other,
-                        combine_responses,
-                    });
-                }
-                tracing::debug!("Routing ListOffsets request to multiple brokers");
-            }
-        }
-        Ok(())
-    }
-
     /// This method removes all transactions from the AddPartitionsToTxn request and returns them split up by their destination
     /// If any topics are unroutable they will have their BrokerId set to -1
     fn split_add_partition_to_txn_request_by_destination(
Expand Down Expand Up @@ -1416,78 +1330,7 @@ impl KafkaSinkCluster {
let transaction_id = body.v3_and_below_transactional_id.clone();
self.route_to_transaction_coordinator(request, transaction_id);
} else {
let routing = self.split_add_partition_to_txn_request_by_destination(body);

if routing.is_empty() {
// ListOffsets contains no topics, so we can just pick a random destination.
// The message is unchanged so we can just send as is.
let destination = random_broker_id(&self.nodes, &mut self.rng);

self.pending_requests.push_back(PendingRequest {
state: PendingRequestState::routed(destination, request),
// we dont need special handling for list_offsets, so just use Other
ty: PendingRequestTy::Other,
combine_responses: 1,
});
tracing::debug!(
"Routing AddPartitionsToTxn request to random broker {} due to being empty",
destination.0
);
} else if routing.len() == 1 {
// Only 1 destination,
// so we can just reconstruct the original message as is,
// act like this never happened 😎,
// we dont even need to invalidate the message's cache.
let (destination, transactions) = routing.into_iter().next().unwrap();
let destination = if destination == -1 {
random_broker_id(&self.nodes, &mut self.rng)
} else {
destination
};

body.transactions = transactions;
self.pending_requests.push_back(PendingRequest {
state: PendingRequestState::routed(destination, request),
// we dont need special handling for ListOffsets, so just use Other
ty: PendingRequestTy::Other,
combine_responses: 1,
});
tracing::debug!(
"Routing AddPartitionsToTxn request to single broker {}",
destination.0
);
} else {
// The message has been split so it may be delivered to multiple destinations.
// We must generate a unique message for each destination.
let combine_responses = routing.len();
request.invalidate_cache();
for (i, (destination, transactions)) in routing.into_iter().enumerate() {
let destination = if destination == -1 {
random_broker_id(&self.nodes, &mut self.rng)
} else {
destination
};
let mut request = if i == 0 {
// First message acts as base and retains message id
request.clone()
} else {
request.clone_with_new_id()
};
if let Some(Frame::Kafka(KafkaFrame::Request {
body: RequestBody::AddPartitionsToTxn(body),
..
})) = request.frame()
{
body.transactions = transactions;
}
self.pending_requests.push_back(PendingRequest {
state: PendingRequestState::routed(destination, request),
ty: PendingRequestTy::Other,
combine_responses,
});
}
tracing::debug!("Routing AddPartitionsToTxn request to multiple brokers");
}
self.split_and_route_request::<AddPartitionsToTxnRequestSplitAndRouter>(request)?;
}
}
Ok(())
Expand Down
[The diff for the second changed file — by the totals above, 113 additions & 0 deletions in shotover/src/transforms/kafka/sink_cluster/split.rs, a new file — did not load on this page.]
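Since the split.rs diff is not shown, here is a rough sketch of what the new trait and one of its implementations plausibly look like, inferred only from the call sites in the mod.rs diff above (`T::get_request_frame`, `T::split_by_destination`, `T::reassemble`, and the `routing` map keyed by `BrokerId`). The associated-type names, import paths, and the `Vec<TopicProduceData>` element type are assumptions, not the committed code:

```rust
// Hypothetical reconstruction of split.rs, inferred from the call sites in
// mod.rs; names, import paths, and exact types are approximate.
use crate::frame::kafka::{KafkaFrame, RequestBody};
use crate::frame::Frame;
use crate::message::Message;
use kafka_protocol::messages::produce_request::TopicProduceData;
use kafka_protocol::messages::{BrokerId, ProduceRequest};
use std::collections::HashMap;

use super::KafkaSinkCluster;

pub(crate) trait RequestSplitAndRouter {
    /// The concrete request body this splitter understands.
    type Request;
    /// The portion of the request destined for a single broker.
    type SubRequests;

    /// Borrow the typed request body out of a generic `Message`, if it matches.
    fn get_request_frame(request: &mut Message) -> Option<&mut Self::Request>;

    /// Split the request's contents by destination broker.
    /// Unroutable entries are keyed under BrokerId(-1), which the caller
    /// replaces with a random broker.
    fn split_by_destination(
        transform: &mut KafkaSinkCluster,
        request: &mut Self::Request,
    ) -> HashMap<BrokerId, Self::SubRequests>;

    /// Write one broker's portion back into the request body before sending.
    fn reassemble(request: &mut Self::Request, item: Self::SubRequests);
}

pub(crate) struct ProduceRequestSplitAndRouter;

impl RequestSplitAndRouter for ProduceRequestSplitAndRouter {
    type Request = ProduceRequest;
    // Assumed element type; depends on how the kafka-protocol version in use
    // represents ProduceRequest::topic_data.
    type SubRequests = Vec<TopicProduceData>;

    fn get_request_frame(request: &mut Message) -> Option<&mut Self::Request> {
        match request.frame() {
            Some(Frame::Kafka(KafkaFrame::Request {
                body: RequestBody::Produce(produce),
                ..
            })) => Some(produce),
            _ => None,
        }
    }

    fn split_by_destination(
        transform: &mut KafkaSinkCluster,
        request: &mut Self::Request,
    ) -> HashMap<BrokerId, Self::SubRequests> {
        // Delegates to the existing per-request splitting logic on the transform.
        transform.split_produce_request_by_destination(request)
    }

    fn reassemble(request: &mut Self::Request, item: Self::SubRequests) {
        request.topic_data = item;
    }
}
```

With a trait of this shape, the previously duplicated `route_produce_request`, `route_list_offsets`, and inline AddPartitionsToTxn routing bodies all collapse into the single generic `split_and_route_request` seen in the mod.rs diff, with the ListOffsets and AddPartitionsToTxn implementations differing only in the body type they extract and the field they reassemble. The `pub(crate)` visibility added to `KafkaSinkCluster` in this commit is also consistent with the struct appearing in the signatures of a `pub(crate)` trait.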
