Skip to content

Commit

Permalink
KafkaSinkCluster: increment out_of_rack_requests metric for all out o…
Browse files Browse the repository at this point in the history
…f rack requests (#1639)
  • Loading branch information
rukai authored May 31, 2024
1 parent bb9f2b6 commit 60d7caa
Showing 1 changed file with 24 additions and 10 deletions.
34 changes: 24 additions & 10 deletions shotover/src/transforms/kafka/sink_cluster/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -753,7 +753,9 @@ impl KafkaSinkCluster {
{
node.broker_id
} else {
self.out_of_rack_requests.increment(1);
tracing::debug!(
"Routing fetch request to replica outside of shotover's rack"
);
self.nodes
.iter_mut()
.filter(|node| {
Expand Down Expand Up @@ -1008,17 +1010,28 @@ impl KafkaSinkCluster {
}

for (destination, requests) in broker_to_routed_requests {
self.nodes
let node = self
.nodes
.iter_mut()
.find(|x| x.broker_id == destination)
.unwrap()
.get_connection(
&self.connection_factory,
&self.authorize_scram_over_mtls,
&self.sasl_mechanism,
)
.await?
.send(requests.requests)?;
.unwrap();

if node
.rack
.as_ref()
.map(|rack| rack != &self.rack)
.unwrap_or(false)
{
self.out_of_rack_requests.increment(1);
}

node.get_connection(
&self.connection_factory,
&self.authorize_scram_over_mtls,
&self.sasl_mechanism,
)
.await?
.send(requests.requests)?;
}

Ok(())
Expand Down Expand Up @@ -1311,6 +1324,7 @@ impl KafkaSinkCluster {
self.nodes.choose(&mut self.rng).unwrap().broker_id
}
};

self.pending_requests.push_back(PendingRequest {
ty: PendingRequestTy::Routed {
destination,
Expand Down

0 comments on commit 60d7caa

Please sign in to comment.