Skip to content

Commit

Permalink
KafkaSinkCluster race condition fix (#1528)
Browse files Browse the repository at this point in the history
  • Loading branch information
conorbros authored Mar 15, 2024
1 parent 97f027a commit 6449b8f
Showing 1 changed file with 21 additions and 11 deletions.
32 changes: 21 additions & 11 deletions shotover/src/transforms/kafka/sink_cluster/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ impl TransformBuilder for KafkaSinkClusterBuilder {
rng: SmallRng::from_rng(rand::thread_rng()).unwrap(),
sasl_status: SaslStatus::new(self.sasl_enabled),
connection_factory: ConnectionFactory::new(self.tls.clone(), self.connect_timeout),
first_contact_node: None,
})
}

Expand Down Expand Up @@ -231,6 +232,7 @@ pub struct KafkaSinkCluster {
rng: SmallRng,
sasl_status: SaslStatus,
connection_factory: ConnectionFactory,
first_contact_node: Option<BrokerId>,
}

#[async_trait]
Expand Down Expand Up @@ -529,7 +531,8 @@ impl KafkaSinkCluster {
..
})) => {
let (tx, rx) = oneshot::channel();
self.route_to_first_node(message.clone(), Some(tx)).await?;
self.route_to_first_contact_node(message.clone(), Some(tx))
.await?;

results.push(rx);
}
Expand All @@ -539,7 +542,8 @@ impl KafkaSinkCluster {
..
})) => {
let (tx, rx) = oneshot::channel();
self.route_to_first_node(message.clone(), Some(tx)).await?;
self.route_to_first_contact_node(message.clone(), Some(tx))
.await?;

self.connection_factory
.add_handshake_message(message.clone());
Expand All @@ -552,7 +556,8 @@ impl KafkaSinkCluster {
..
})) => {
let (tx, rx) = oneshot::channel();
self.route_to_first_node(message.clone(), Some(tx)).await?;
self.route_to_first_contact_node(message.clone(), Some(tx))
.await?;
self.connection_factory.add_auth_message(message.clone());
results.push(rx);
self.sasl_status.set_handshake_complete();
Expand Down Expand Up @@ -580,19 +585,24 @@ impl KafkaSinkCluster {
Ok(results)
}

async fn route_to_first_node(
async fn route_to_first_contact_node(
&mut self,
message: Message,
return_chan: Option<oneshot::Sender<Response>>,
) -> Result<()> {
let connection = self
.nodes
.get_mut(0)
.unwrap()
.get_connection(&self.connection_factory)
.await?;
let node = if let Some(first_contact_node) = self.first_contact_node {
self.nodes
.iter_mut()
.find(|node| node.broker_id == first_contact_node)
.unwrap()
} else {
let node = self.nodes.get_mut(0).unwrap();
self.first_contact_node = Some(node.broker_id);
node
};

connection
node.get_connection(&self.connection_factory)
.await?
.send(Request {
message,
return_chan,
Expand Down

0 comments on commit 6449b8f

Please sign in to comment.