Skip to content

Commit

Permalink
CassandraSinkCluster fix lost messages (#1845)
Browse files Browse the repository at this point in the history
  • Loading branch information
rukai authored Nov 27, 2024
1 parent 5ed7a70 commit 95c920f
Showing 1 changed file with 20 additions and 8 deletions.
28 changes: 20 additions & 8 deletions shotover/src/transforms/cassandra/sink_cluster/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -476,15 +476,27 @@ impl CassandraSinkCluster {
// Close all other connections as they are now invalidated.
// They will be recreated as needed with the correct use statement used automatically after the handshake
for node in self.pool.nodes_mut() {
node.outbound = None;
if let Some(outbound) = &mut node.outbound {
// Flush all pending responses first, otherwise they would be lost.
// This is bad for performance but USE statements are bad practice anyway.
outbound
.recv_all_pending(responses, self.version.unwrap())
.await
.map_err(|_| anyhow!("Failed to recv_all_pending"))?;

// close the connection
node.outbound = None;

// TODO:
// Sending the use statement to these connections to keep them alive instead is possible but tricky.
// 1. The destinations need to be calculated here, at sending time, to ensure no new connections have been created in the meantime.
// 2. We need a way to filter out these extra responses from reaching the client.
// 3. But we cant use the TableRewrite abstraction since that occurs too early. See 1.
//
// I think the best way forward to achieve this would be to first perform the refactors listed in
// https://github.com/shotover/shotover-proxy/issues/1844
}
}

// Sending the use statement to these connections to keep them alive instead is possible but tricky.
// 1. The destinations need to be calculated here, at sending time, to ensure no new connections have been created in the meantime.
// 2. We need a way to filter out these extra responses from reaching the client.
// 3. But we cant use the TableRewrite abstraction since that occurs too early. See 1.
//
// It might be worth doing in the future.
} else if is_prepare_message(&mut message) {
let next_host_id = self.message_rewriter.get_destination_for_prepare(&message);
match self
Expand Down

0 comments on commit 95c920f

Please sign in to comment.