Skip to content

Commit

Permalink
CassandraSinkCluster fix lost messages
Browse files Browse the repository at this point in the history
  • Loading branch information
rukai committed Nov 26, 2024
1 parent 13d8842 commit 38817c5
Showing 1 changed file with 11 additions and 8 deletions.
19 changes: 11 additions & 8 deletions shotover/src/transforms/cassandra/sink_cluster/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -476,15 +476,18 @@ impl CassandraSinkCluster {
// Close all other connections as they are now invalidated.
// They will be recreated as needed with the correct use statement used automatically after the handshake
for node in self.pool.nodes_mut() {
node.outbound = None;
}
if let Some(outbound) = &mut node.outbound {
// Flush all pending responses first, otherwise they would be lost.
// This is bad for performance but USE statements are bad practice anyway.
outbound
.recv_all_pending(responses, self.version.unwrap())
.await
.map_err(|_| anyhow!("Failed to recv_all_pending"))?;

// Sending the use statement to these connections to keep them alive instead is possible but tricky.
// 1. The destinations need to be calculated here, at sending time, to ensure no new connections have been created in the meantime.
// 2. We need a way to filter out these extra responses from reaching the client.
// 3. But we cant use the TableRewrite abstraction since that occurs too early. See 1.
//
// It might be worth doing in the future.
// close the connection
node.outbound = None;
}
}
} else if is_prepare_message(&mut message) {
let next_host_id = self.message_rewriter.get_destination_for_prepare(&message);
match self
Expand Down

0 comments on commit 38817c5

Please sign in to comment.