From 95c920f853a1191f6725c565b1baf08882426c0e Mon Sep 17 00:00:00 2001 From: Lucas Kent Date: Wed, 27 Nov 2024 11:24:10 +1100 Subject: [PATCH] CassandraSinkCluster fix lost messages (#1845) --- .../transforms/cassandra/sink_cluster/mod.rs | 28 +++++++++++++------ 1 file changed, 20 insertions(+), 8 deletions(-) diff --git a/shotover/src/transforms/cassandra/sink_cluster/mod.rs b/shotover/src/transforms/cassandra/sink_cluster/mod.rs index 1d3695fa5..772794848 100644 --- a/shotover/src/transforms/cassandra/sink_cluster/mod.rs +++ b/shotover/src/transforms/cassandra/sink_cluster/mod.rs @@ -476,15 +476,27 @@ impl CassandraSinkCluster { // Close all other connections as they are now invalidated. // They will be recreated as needed with the correct use statement used automatically after the handshake for node in self.pool.nodes_mut() { - node.outbound = None; + if let Some(outbound) = &mut node.outbound { + // Flush all pending responses first, otherwise they would be lost. + // This is bad for performance but USE statements are bad practice anyway. + outbound + .recv_all_pending(responses, self.version.unwrap()) + .await + .map_err(|_| anyhow!("Failed to recv_all_pending"))?; + + // close the connection + node.outbound = None; + + // TODO: + // Sending the use statement to these connections to keep them alive instead is possible but tricky. + // 1. The destinations need to be calculated here, at sending time, to ensure no new connections have been created in the meantime. + // 2. We need a way to filter out these extra responses from reaching the client. + // 3. But we cant use the TableRewrite abstraction since that occurs too early. See 1. + // + // I think the best way forward to achieve this would be to first perform the refactors listed in + // https://github.com/shotover/shotover-proxy/issues/1844 + } } - - // Sending the use statement to these connections to keep them alive instead is possible but tricky. - // 1. The destinations need to be calculated here, at sending time, to ensure no new connections have been created in the meantime. - // 2. We need a way to filter out these extra responses from reaching the client. - // 3. But we cant use the TableRewrite abstraction since that occurs too early. See 1. - // - // It might be worth doing in the future. } else if is_prepare_message(&mut message) { let next_host_id = self.message_rewriter.get_destination_for_prepare(&message); match self