From 38817c5922cff73e9e48867cba06992f48d2c2ee Mon Sep 17 00:00:00 2001 From: Lucas Kent Date: Tue, 26 Nov 2024 13:48:15 +1100 Subject: [PATCH] CassandraSinkCluster fix lost messages --- .../transforms/cassandra/sink_cluster/mod.rs | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/shotover/src/transforms/cassandra/sink_cluster/mod.rs b/shotover/src/transforms/cassandra/sink_cluster/mod.rs index 1d3695fa5..c401f5082 100644 --- a/shotover/src/transforms/cassandra/sink_cluster/mod.rs +++ b/shotover/src/transforms/cassandra/sink_cluster/mod.rs @@ -476,15 +476,18 @@ impl CassandraSinkCluster { // Close all other connections as they are now invalidated. // They will be recreated as needed with the correct use statement used automatically after the handshake for node in self.pool.nodes_mut() { - node.outbound = None; - } + if let Some(outbound) = &mut node.outbound { + // Flush all pending responses first, otherwise they would be lost. + // This is bad for performance but USE statements are bad practice anyway. + outbound + .recv_all_pending(responses, self.version.unwrap()) + .await + .map_err(|_| anyhow!("Failed to recv_all_pending"))?; - // Sending the use statement to these connections to keep them alive instead is possible but tricky. - // 1. The destinations need to be calculated here, at sending time, to ensure no new connections have been created in the meantime. - // 2. We need a way to filter out these extra responses from reaching the client. - // 3. But we cant use the TableRewrite abstraction since that occurs too early. See 1. - // - // It might be worth doing in the future. + // close the connection + node.outbound = None; + } + } } else if is_prepare_message(&mut message) { let next_host_id = self.message_rewriter.get_destination_for_prepare(&message); match self