Skip to content

Commit

Permalink
Refactor send_requests and complete PendingRequestState refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
joshuabvarghese authored Nov 13, 2024
1 parent 455b4f7 commit c153971
Showing 1 changed file with 110 additions and 107 deletions.
217 changes: 110 additions & 107 deletions shotover/src/transforms/kafka/sink_cluster/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1894,7 +1894,7 @@ async fn send_requests(&mut self) -> Result<()> {
Ok(())
}

/// Receive all responses from the outgoing connections, returns all responses that are ready to be returned.
/// Receive all responses from the outgoing connections, returns all responses that are ready to be returned.
/// For response ordering reasons, some responses will remain in self.pending_requests until other responses are received.
async fn recv_responses(&mut self, close_client_connection: &mut bool) -> Result<Vec<Message>> {
// To work around borrow checker issues, store connection errors in this temporary list before handling them.
Expand All @@ -1914,18 +1914,16 @@ async fn send_requests(&mut self) -> Result<()> {
let mut response = Some(response);
for pending_request in &mut self.pending_requests {
if let PendingRequestState::Sent {
destination,
index,
request,
} = &mut pending_request.state
{
if destination == connection_destination {
if &pending_request.destination == connection_destination {
// Store the PendingRequestState::Received at the location of the next PendingRequestState::Sent
// All other PendingRequestState::Sent need to be decremented, in order to determine the PendingRequestState::Sent
// to be used next time, and the time after that, and ...
if *index == 0 {
pending_request.state = PendingRequestState::Received {
destination: *destination,
response: response.take().unwrap(),
request: request.take(),
};
Expand Down Expand Up @@ -1993,13 +1991,11 @@ async fn send_requests(&mut self) -> Result<()> {
for i in 0..combine_responses {
let pending_request = &mut self.pending_requests[i];
if let PendingRequestState::Received {
destination,
request,
..
} = &mut pending_request.state
{
pending_request.state = PendingRequestState::Routed {
destination: *destination,
request: request.take().unwrap(),
}
} else {
Expand Down Expand Up @@ -2069,8 +2065,7 @@ async fn send_requests(&mut self) -> Result<()> {
if let Some(Frame::Kafka(KafkaFrame::Response {
body: ResponseBody::Fetch(fetch),
..
})) = response.frame()
{
})) = response.frame() {
result += fetch
.responses
.iter()
Expand All @@ -2087,10 +2082,10 @@ async fn send_requests(&mut self) -> Result<()> {
})
.sum::<i64>();
} else {
panic!("must be called on fetch responses")
panic!("must be called on fetch responses");
}
} else {
panic!("must be called on received responses")
panic!("must be called on received responses");
}
}
result
Expand Down Expand Up @@ -2893,116 +2888,124 @@ async fn send_requests(&mut self) -> Result<()> {
}

fn route_to_control_connection(&mut self, request: Message) {
assert!(
!self.auth_complete,
"route_to_control_connection cannot be called after auth is complete. Otherwise it would collide with control_send_receive"
);
self.pending_requests.push_back(PendingRequest {
state: PendingRequestState::Routed {
destination: Destination::ControlConnection,
request,
},
ty: PendingRequestTy::Other,
combine_responses: 1,
});
tracing::debug!("Routing request to control connection");
}
assert!(
!self.auth_complete,
"route_to_control_connection cannot be called after auth is complete. Otherwise it would collide with control_send_receive"
);

let pending_request = PendingRequest {
state: PendingRequestState::Routed {
request,
},
destination: Destination::ControlConnection,
ty: PendingRequestTy::Other,
combine_responses: 1,
};

fn route_to_controller(&mut self, request: Message) {
let broker_id = self.controller_broker.get().unwrap();
self.pending_requests.push_back(pending_request);
tracing::debug!("Routing request to control connection");
}

let destination = if let Some(node) = self
.nodes
.iter_mut()
.find(|x| x.broker_id == *broker_id && x.is_up())
{
node.broker_id
} else {
tracing::warn!("no known broker with id {broker_id:?} that is 'up', routing request to a random broker so that a NOT_CONTROLLER or similar error is returned to the client");
random_broker_id(&self.nodes, &mut self.rng)
};
fn route_to_controller(&mut self, request: Message) {
let broker_id = self.controller_broker.get().unwrap();
let destination = if let Some(node) = self.nodes.iter_mut().find(|x| x.broker_id == *broker_id && x.is_up()) {
node.broker_id
} else {
tracing::warn!("no known broker with id {broker_id:?} that is 'up', routing request to a random broker so that a NOT_CONTROLLER or similar error is returned to the client");
random_broker_id(&self.nodes, &mut self.rng)
};

self.pending_requests.push_back(PendingRequest {
state: PendingRequestState::routed(destination, request),
ty: PendingRequestTy::Other,
combine_responses: 1,
});
tracing::debug!(
"Routing request relating to controller to broker {}",
destination.0
);
}
let pending_request = PendingRequest {
state: PendingRequestState::Routed {
request,
},
destination,
ty: PendingRequestTy::Other,
combine_responses: 1,
};

fn route_to_group_coordinator(&mut self, request: Message, group_id: GroupId) {
let destination = self.group_to_coordinator_broker.get(&group_id);
let destination = match destination {
Some(destination) => *destination,
None => {
tracing::info!("no known coordinator for {group_id:?}, routing request to a random broker so that a NOT_COORDINATOR or similar error is returned to the client");
random_broker_id(&self.nodes, &mut self.rng)
}
};
self.pending_requests.push_back(pending_request);
tracing::debug!("Routing request relating to controller to broker {}", destination.0);
}

tracing::debug!(
"Routing request relating to group id {:?} to broker {}",
group_id.0,
destination.0
);
fn route_to_group_coordinator(&mut self, request: Message, group_id: GroupId) {
let destination = self.group_to_coordinator_broker.get(&group_id).cloned().unwrap_or_else(|| {
tracing::info!("no known coordinator for {group_id:?}, routing request to a random broker so that a NOT_COORDINATOR or similar error is returned to the client");
random_broker_id(&self.nodes, &mut self.rng)
});

tracing::debug!(
"Routing request relating to group id {:?} to broker {}",
group_id.0,
destination.0
);

let pending_request = PendingRequest {
state: PendingRequestState::Routed {
request,
},
destination,
ty: PendingRequestTy::RoutedToGroup(group_id),
combine_responses: 1,
};

self.pending_requests.push_back(PendingRequest {
state: PendingRequestState::routed(destination, request),
ty: PendingRequestTy::RoutedToGroup(group_id),
combine_responses: 1,
});
}
self.pending_requests.push_back(pending_request);
}

fn route_to_transaction_coordinator(
&mut self,
request: Message,
transaction_id: TransactionalId,
) {
let destination = self.transaction_to_coordinator_broker.get(&transaction_id);
let destination = match destination {
Some(destination) => *destination,
None => {
tracing::info!("no known coordinator for {transaction_id:?}, routing request to a random broker so that a NOT_COORDINATOR or similar error is returned to the client");
random_broker_id(&self.nodes, &mut self.rng)
}
};
fn route_to_transaction_coordinator(
&mut self,
request: Message,
transaction_id: TransactionalId,
) {
let destination = self.transaction_to_coordinator_broker.get(&transaction_id).cloned().unwrap_or_else(|| {
tracing::info!("no known coordinator for {transaction_id:?}, routing request to a random broker so that a NOT_COORDINATOR or similar error is returned to the client");
random_broker_id(&self.nodes, &mut self.rng)
});

tracing::debug!(
"Routing request relating to transaction id {:?} to broker {}",
transaction_id.0,
destination.0
);

let pending_request = PendingRequest {
state: PendingRequestState::Routed {
request,
},
destination,
ty: PendingRequestTy::RoutedToTransaction(transaction_id),
combine_responses: 1,
};

tracing::debug!(
"Routing request relating to transaction id {:?} to broker {}",
transaction_id.0,
destination.0
);
self.pending_requests.push_back(pending_request);
}

self.pending_requests.push_back(PendingRequest {
state: PendingRequestState::routed(destination, request),
ty: PendingRequestTy::RoutedToTransaction(transaction_id),
combine_responses: 1,

fn route_find_coordinator(&mut self, mut request: Message) {
if let Some(Frame::Kafka(KafkaFrame::Request {
body: RequestBody::FindCoordinator(find_coordinator),
..
})) = request.frame() {
let destination = random_broker_id(&self.nodes, &mut self.rng);
let ty = PendingRequestTy::FindCoordinator(FindCoordinator {
key: find_coordinator.key.clone(),
key_type: find_coordinator.key_type,
});
}

fn route_find_coordinator(&mut self, mut request: Message) {
if let Some(Frame::Kafka(KafkaFrame::Request {
body: RequestBody::FindCoordinator(find_coordinator),
..
})) = request.frame()
{
let destination = random_broker_id(&self.nodes, &mut self.rng);
let ty = PendingRequestTy::FindCoordinator(FindCoordinator {
key: find_coordinator.key.clone(),
key_type: find_coordinator.key_type,
});
tracing::debug!("Routing FindCoordinator to random broker {}", destination.0);
tracing::debug!("Routing FindCoordinator to random broker {}", destination.0);

self.pending_requests.push_back(PendingRequest {
state: PendingRequestState::routed(destination, request),
ty,
combine_responses: 1,
});
}
let pending_request = PendingRequest {
state: PendingRequestState::Routed {
request,
},
destination,
ty,
combine_responses: 1,
};

self.pending_requests.push_back(pending_request);
}
}

async fn process_metadata_response(&mut self, metadata: &MetadataResponse) {
for broker in &metadata.brokers {
Expand Down

0 comments on commit c153971

Please sign in to comment.