Skip to content

Commit

Permalink
review feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
conorbros committed Mar 12, 2024
1 parent bed781a commit 8136647
Showing 1 changed file with 1 addition and 19 deletions.
20 changes: 1 addition & 19 deletions shotover/src/transforms/kafka/sink_cluster/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,6 @@ impl KafkaSinkClusterBuilder {
sasl_enabled: bool,
) -> KafkaSinkClusterBuilder {
let receive_timeout = timeout.map(Duration::from_secs);
tracing::info!("{:?}", sasl_enabled);

let shotover_nodes = shotover_nodes
.into_iter()
Expand Down Expand Up @@ -503,20 +502,8 @@ impl KafkaSinkCluster {
body: RequestBody::ApiVersions(_),
..
})) => {
let connection = self
.nodes
.get_mut(0)
.unwrap()
.get_connection(&self.connection_factory)
.await?;

let (tx, rx) = oneshot::channel();
connection
.send(Request {
message: message.clone(),
return_chan: Some(tx),
})
.map_err(|_| anyhow!("Failed to send"))?;
self.route_to_first_node(message.clone(), Some(tx)).await?;

results.push(rx);
}
Expand All @@ -543,11 +530,6 @@ impl KafkaSinkCluster {
self.connection_factory.add_auth_message(message.clone());
results.push(rx);
self.sasl_status.set_handshake_complete();

// now that we have the full handshake, open a connection to all nodes
for node in &mut self.nodes {
node.get_connection(&self.connection_factory).await?;
}
}

// route to random node
Expand Down

0 comments on commit 8136647

Please sign in to comment.