CassandraSinkCluster fix lost messages #1845

Merged 2 commits on Nov 27, 2024
28 changes: 20 additions & 8 deletions shotover/src/transforms/cassandra/sink_cluster/mod.rs
@@ -476,15 +476,27 @@ impl CassandraSinkCluster {
     // Close all other connections as they are now invalidated.
     // They will be recreated as needed with the correct use statement used automatically after the handshake
     for node in self.pool.nodes_mut() {
-        node.outbound = None;
+        if let Some(outbound) = &mut node.outbound {
+            // Flush all pending responses first, otherwise they would be lost.
+            // This is bad for performance but USE statements are bad practice anyway.
+            outbound
+                .recv_all_pending(responses, self.version.unwrap())
+                .await
+                .map_err(|_| anyhow!("Failed to recv_all_pending"))?;
+
+            // close the connection
+            node.outbound = None;
+
+            // TODO:
+            // Sending the use statement to these connections to keep them alive instead is possible but tricky.
+            // 1. The destinations need to be calculated here, at sending time, to ensure no new connections have been created in the meantime.
+            // 2. We need a way to filter out these extra responses from reaching the client.
+            // 3. But we cant use the TableRewrite abstraction since that occurs too early. See 1.
+            //
+            // I think the best way forward to achieve this would be to first perform the refactors listed in
+            // https://github.com/shotover/shotover-proxy/issues/1844
+        }
     }
-
-    // Sending the use statement to these connections to keep them alive instead is possible but tricky.
-    // 1. The destinations need to be calculated here, at sending time, to ensure no new connections have been created in the meantime.
-    // 2. We need a way to filter out these extra responses from reaching the client.
-    // 3. But we cant use the TableRewrite abstraction since that occurs too early. See 1.
-    //
-    // It might be worth doing in the future.
 } else if is_prepare_message(&mut message) {
     let next_host_id = self.message_rewriter.get_destination_for_prepare(&message);
     match self
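
The idea behind the change (collect whatever responses a connection still holds before dropping it) can be shown in isolation. The following is a minimal, synchronous sketch, not shotover's implementation: `Connection`, `Node`, `close_all` and `demo` are hypothetical names, a `std::sync::mpsc` channel stands in for the socket, and this `recv_all_pending` only collects responses that have already arrived, whereas the real method is async and also takes the protocol version.

```rust
use std::sync::mpsc::{channel, Receiver, TryRecvError};

/// Hypothetical stand-in for a node's outbound connection.
/// The channel plays the role of the socket's receive side.
struct Connection {
    pending: Receiver<String>,
}

impl Connection {
    /// Move every response that has already arrived into `responses`.
    /// Assumption: only buffered responses are collected; nothing is awaited.
    fn recv_all_pending(&mut self, responses: &mut Vec<String>) -> Result<(), String> {
        loop {
            match self.pending.try_recv() {
                Ok(response) => responses.push(response),
                // Nothing left to read. A disconnected peer is treated the same way
                // because the connection is about to be dropped anyway.
                Err(TryRecvError::Empty) | Err(TryRecvError::Disconnected) => return Ok(()),
            }
        }
    }
}

/// Hypothetical stand-in for a pool entry.
struct Node {
    outbound: Option<Connection>,
}

/// Flush each open connection into `responses`, then close it.
/// Setting `outbound` to `None` without the flush would discard its responses.
fn close_all(nodes: &mut [Node], responses: &mut Vec<String>) -> Result<(), String> {
    for node in nodes.iter_mut() {
        if let Some(outbound) = &mut node.outbound {
            outbound.recv_all_pending(responses)?;
            node.outbound = None;
        }
    }
    Ok(())
}

/// Usage example: two responses are waiting on the first node when it is closed.
fn demo() -> Vec<String> {
    let (tx, rx) = channel();
    tx.send("response 1".to_string()).unwrap();
    tx.send("response 2".to_string()).unwrap();

    let mut nodes = vec![
        Node { outbound: Some(Connection { pending: rx }) },
        Node { outbound: None },
    ];
    let mut responses = Vec::new();
    close_all(&mut nodes, &mut responses).unwrap();

    // Every connection is closed, and the waiting responses were kept.
    assert!(nodes.iter().all(|node| node.outbound.is_none()));
    responses
}
```

In this sketch, swapping the two statements inside the `if let` (or dropping the flush entirely, as the old code did) leaves `responses` empty, which is the lost-message behaviour this PR addresses.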