Skip to content

Commit

Permalink
fix comments
Browse files Browse the repository at this point in the history
  • Loading branch information
rukai committed Aug 6, 2024
1 parent 96c6abc commit 1b943b8
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 14 deletions.
18 changes: 13 additions & 5 deletions shotover/src/transforms/kafka/sink_cluster/connections.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@ use anyhow::{Context, Result};
use fnv::FnvBuildHasher;
use kafka_protocol::{messages::BrokerId, protocol::StrBytes};
use metrics::Counter;
use rand::{rngs::SmallRng, seq::SliceRandom};
use rand::{
rngs::SmallRng,
seq::{IteratorRandom, SliceRandom},
};
use std::{collections::HashMap, sync::atomic::Ordering};

use super::{
Expand Down Expand Up @@ -76,10 +79,15 @@ impl Connections {
let address = match &node {
Some(node) => &node.kafka_address,
None => {
// TODO: filter out down nodes
// if no up nodes, return error.
// do it in this PR.
self.control_connection_address = contact_points.choose(rng).cloned();
// If we have a node in the nodes list that is up use its address.
// Otherwise fall back to the first contact points
let address_from_node = nodes
.iter()
.filter(|x| matches!(x.state.load(Ordering::Relaxed), NodeState::Up))
.choose(rng)
.map(|x| x.kafka_address.clone());
self.control_connection_address =
address_from_node.or_else(|| contact_points.iter().choose(rng).cloned());
self.control_connection_address.as_ref().unwrap()
}
};
Expand Down
13 changes: 4 additions & 9 deletions shotover/src/transforms/kafka/sink_cluster/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1080,8 +1080,7 @@ routing message to a random node so that:
.await?
.send(requests.requests)
{
// Attempt to reopen connection for the side effect of setting node state to down.
// Dont actually use the connection though since we cant resend a failed a request.
// Dont retry the send on the new connection since we cant tell if the broker received the request or not.
self.connections
.handle_connection_error(
&mut self.rng,
Expand All @@ -1106,7 +1105,7 @@ routing message to a random node so that:
/// For response ordering reasons, some responses will remain in self.pending_requests until other responses are received.
async fn recv_responses(&mut self) -> Result<Vec<Message>> {
// Convert all received PendingRequestTy::Sent into PendingRequestTy::Received
let mut connections_to_reopen = vec![];
let mut connection_errors = vec![];
for (connection_destination, connection) in &mut self.connections.connections {
self.temp_responses_buffer.clear();
match connection.try_recv_into(&mut self.temp_responses_buffer) {
Expand All @@ -1133,15 +1132,11 @@ routing message to a random node so that:
}
}
}
Err(err) => connections_to_reopen.push((*connection_destination, err)),
Err(err) => connection_errors.push((*connection_destination, err)),
}
}

for (destination, err) in connections_to_reopen {
// Attempt to reopen connection for the side effects of:
// * setting node state to down
// * removing connections to down nodes so we dont continue attempting to receive from it.
// We do not attempt to receive from the node again, since if there actually were any pending responses we would need to give up.
for (destination, err) in connection_errors {
self.connections
.handle_connection_error(
&mut self.rng,
Expand Down

0 comments on commit 1b943b8

Please sign in to comment.