Skip to content

Commit

Permalink
rename PendingRequestTy -> PendingRequestState
Browse files Browse the repository at this point in the history
  • Loading branch information
rukai committed Sep 24, 2024
1 parent fca032a commit 4a626bc
Showing 1 changed file with 32 additions and 31 deletions.
63 changes: 32 additions & 31 deletions shotover/src/transforms/kafka/sink_cluster/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ impl TransformBuilder for KafkaSinkClusterBuilder {
pending_requests: Default::default(),
// TODO: this approach with `find_coordinator_requests` and `routed_to_coordinator_for_group`
// is prone to memory leaks and logic errors.
// We should replace these fields with extra state within `PendingRequestTy::Received/Sent`.
// We should replace these fields with extra state within `PendingRequestState::Received/Sent`.
find_coordinator_requests: Default::default(),
routed_to_coordinator_for_group: Default::default(),
temp_responses_buffer: Default::default(),
Expand Down Expand Up @@ -300,7 +300,7 @@ struct KafkaSinkCluster {
/// State of a Request/Response is maintained by this enum.
/// The state progresses from Routed -> Sent -> Received
#[derive(Debug)]
enum PendingRequestTy {
enum PendingRequestState {
/// A route has been determined for this request but it has not yet been sent.
Routed {
destination: Destination,
Expand All @@ -319,7 +319,7 @@ enum PendingRequestTy {
Received { response: Message },
}

impl PendingRequestTy {
impl PendingRequestState {
fn routed(broker_id: BrokerId, request: Message) -> Self {
Self::Routed {
destination: Destination::Id(broker_id),
Expand All @@ -329,7 +329,7 @@ impl PendingRequestTy {
}

struct PendingRequest {
ty: PendingRequestTy,
state: PendingRequestState,
/// Combine the next N responses into a single response
/// This message should be considered the base message and will retain the shotover Message::id and kafka correlation_id
combine_responses: usize,
Expand Down Expand Up @@ -804,7 +804,7 @@ impl KafkaSinkCluster {
let destination = random_broker_id(&self.nodes, &mut self.rng);
tracing::debug!("Routing request to random broker {}", destination.0);
self.pending_requests.push_back(PendingRequest {
ty: PendingRequestTy::routed(destination, message),
state: PendingRequestState::routed(destination, message),
combine_responses: 1,
});
}
Expand All @@ -827,7 +827,7 @@ impl KafkaSinkCluster {
let destination = random_broker_id(&self.nodes, &mut self.rng);

self.pending_requests.push_back(PendingRequest {
ty: PendingRequestTy::routed(destination, message),
state: PendingRequestState::routed(destination, message),
combine_responses: 1,
});
tracing::debug!(
Expand All @@ -848,7 +848,7 @@ impl KafkaSinkCluster {

produce.topic_data = topic_data;
self.pending_requests.push_back(PendingRequest {
ty: PendingRequestTy::routed(destination, message),
state: PendingRequestState::routed(destination, message),
combine_responses: 1,
});
tracing::debug!(
Expand Down Expand Up @@ -880,7 +880,7 @@ impl KafkaSinkCluster {
produce.topic_data = topic_data;
}
self.pending_requests.push_back(PendingRequest {
ty: PendingRequestTy::routed(destination, request),
state: PendingRequestState::routed(destination, request),
combine_responses,
});
}
Expand Down Expand Up @@ -1037,7 +1037,7 @@ impl KafkaSinkCluster {
let destination = random_broker_id(&self.nodes, &mut self.rng);

self.pending_requests.push_back(PendingRequest {
ty: PendingRequestTy::routed(destination, message),
state: PendingRequestState::routed(destination, message),
combine_responses: 1,
});
tracing::debug!(
Expand All @@ -1058,7 +1058,7 @@ impl KafkaSinkCluster {

fetch.topics = topics;
self.pending_requests.push_back(PendingRequest {
ty: PendingRequestTy::routed(destination, message),
state: PendingRequestState::routed(destination, message),
combine_responses: 1,
});
tracing::debug!("Routing fetch request to single broker {}", destination.0);
Expand Down Expand Up @@ -1087,7 +1087,7 @@ impl KafkaSinkCluster {
fetch.topics = topics;
}
self.pending_requests.push_back(PendingRequest {
ty: PendingRequestTy::routed(destination, request),
state: PendingRequestState::routed(destination, request),
combine_responses,
});
}
Expand Down Expand Up @@ -1172,7 +1172,7 @@ impl KafkaSinkCluster {
.context("Failed to query metadata of topics")
}

/// Convert all PendingRequestTy::Routed into PendingRequestTy::Sent
/// Convert all PendingRequestState::Routed into PendingRequestState::Sent
async fn send_requests(&mut self) -> Result<()> {
struct RoutedRequests {
requests: Vec<Message>,
Expand All @@ -1181,7 +1181,8 @@ impl KafkaSinkCluster {

let mut broker_to_routed_requests: HashMap<Destination, RoutedRequests> = HashMap::new();
for i in 0..self.pending_requests.len() {
if let PendingRequestTy::Routed { destination, .. } = &self.pending_requests[i].ty {
if let PendingRequestState::Routed { destination, .. } = &self.pending_requests[i].state
{
let routed_requests = broker_to_routed_requests
.entry(*destination)
.or_insert_with(|| RoutedRequests {
Expand All @@ -1190,10 +1191,10 @@ impl KafkaSinkCluster {
.pending_requests
.iter()
.filter(|pending_request| {
if let PendingRequestTy::Sent {
if let PendingRequestState::Sent {
destination: check_destination,
..
} = &pending_request.ty
} = &pending_request.state
{
check_destination == destination
} else {
Expand All @@ -1202,12 +1203,12 @@ impl KafkaSinkCluster {
})
.count(),
});
let mut value = PendingRequestTy::Sent {
let mut value = PendingRequestState::Sent {
destination: *destination,
index: routed_requests.requests.len() + routed_requests.already_pending,
};
std::mem::swap(&mut self.pending_requests[i].ty, &mut value);
if let PendingRequestTy::Routed { request, .. } = value {
std::mem::swap(&mut self.pending_requests[i].state, &mut value);
if let PendingRequestState::Routed { request, .. } = value {
routed_requests.requests.push(request);
}
}
Expand Down Expand Up @@ -1296,7 +1297,7 @@ impl KafkaSinkCluster {
// To work around borrow checker issues, store connection errors in this temporary list before handling them.
let mut connection_errors = vec![];

// Convert all received PendingRequestTy::Sent into PendingRequestTy::Received
// Convert all received PendingRequestState::Sent into PendingRequestState::Received
for (connection_destination, connection) in &mut self.connections.connections {
// skip recv when no pending requests to avoid timeouts on old connections
if connection.pending_requests_count() != 0 {
Expand All @@ -1309,15 +1310,15 @@ impl KafkaSinkCluster {
for response in self.temp_responses_buffer.drain(..) {
let mut response = Some(response);
for pending_request in &mut self.pending_requests {
if let PendingRequestTy::Sent { destination, index } =
&mut pending_request.ty
if let PendingRequestState::Sent { destination, index } =
&mut pending_request.state
{
if destination == connection_destination {
// Store the PendingRequestTy::Received at the location of the next PendingRequestTy::Sent
// All other PendingRequestTy::Sent need to be decremented, in order to determine the PendingRequestTy::Sent
// Store the PendingRequestState::Received at the location of the next PendingRequestState::Sent
// All other PendingRequestState::Sent need to be decremented, in order to determine the PendingRequestState::Sent
// to be used next time, and the time after that, and ...
if *index == 0 {
pending_request.ty = PendingRequestTy::Received {
pending_request.state = PendingRequestState::Received {
response: response.take().unwrap(),
};
} else {
Expand Down Expand Up @@ -1347,14 +1348,14 @@ impl KafkaSinkCluster {
.await?;
}

// Remove and return all PendingRequestTy::Received that are ready to be received.
// Remove and return all PendingRequestState::Received that are ready to be received.
let mut responses = vec![];
while let Some(pending_request) = self.pending_requests.front() {
let all_combined_received = (0..pending_request.combine_responses).all(|i| {
matches!(
self.pending_requests.get(i),
Some(PendingRequest {
ty: PendingRequestTy::Received { .. },
state: PendingRequestState::Received { .. },
..
})
)
Expand All @@ -1363,7 +1364,7 @@ impl KafkaSinkCluster {
// The next response we are waiting on has been received, add it to responses
if pending_request.combine_responses == 1 {
if let Some(PendingRequest {
ty: PendingRequestTy::Received { response },
state: PendingRequestState::Received { response },
..
}) = self.pending_requests.pop_front()
{
Expand All @@ -1375,7 +1376,7 @@ impl KafkaSinkCluster {
.drain(..pending_request.combine_responses)
.map(|x| {
if let PendingRequest {
ty: PendingRequestTy::Received { response },
state: PendingRequestState::Received { response },
..
} = x
{
Expand Down Expand Up @@ -1770,7 +1771,7 @@ impl KafkaSinkCluster {
"route_to_control_connection cannot be called after auth is complete. Otherwise it would collide with control_send_receive"
);
self.pending_requests.push_back(PendingRequest {
ty: PendingRequestTy::Routed {
state: PendingRequestState::Routed {
destination: Destination::ControlConnection,
request,
},
Expand All @@ -1794,7 +1795,7 @@ impl KafkaSinkCluster {
};

self.pending_requests.push_back(PendingRequest {
ty: PendingRequestTy::routed(destination, request),
state: PendingRequestState::routed(destination, request),
combine_responses: 1,
});
tracing::debug!(
Expand All @@ -1816,7 +1817,7 @@ impl KafkaSinkCluster {
};

self.pending_requests.push_back(PendingRequest {
ty: PendingRequestTy::routed(destination, request),
state: PendingRequestState::routed(destination, request),
combine_responses: 1,
});
tracing::debug!(
Expand Down

0 comments on commit 4a626bc

Please sign in to comment.