From aea08e69a8a0a3e00b693a6e3a485c06624d1f91 Mon Sep 17 00:00:00 2001
From: Lucas Kent
Date: Fri, 1 Nov 2024 09:40:52 +1100
Subject: [PATCH] Improved fix

---
 .../tests/cassandra_int_tests/mod.rs          | 12 +----
 .../cassandra/sink_cluster/node_pool.rs       | 45 +++++++++----------
 .../cassandra/sink_cluster/rewrite.rs         |  9 +++-
 .../cassandra/sink_cluster/test_router.rs     | 10 ++---
 4 files changed, 35 insertions(+), 41 deletions(-)

diff --git a/shotover-proxy/tests/cassandra_int_tests/mod.rs b/shotover-proxy/tests/cassandra_int_tests/mod.rs
index 5fe7a66ba..f9f043c68 100644
--- a/shotover-proxy/tests/cassandra_int_tests/mod.rs
+++ b/shotover-proxy/tests/cassandra_int_tests/mod.rs
@@ -497,17 +497,7 @@ async fn cluster_multi_rack_2_per_rack_go_smoke_test() {
 
     test_helpers::connection::cassandra::go::run_go_smoke_test().await;
 
-    // gocql driver will route execute requests to its control connection during initialization which results in out of rack requests.
-    // This warning is correctly triggered in that case.
-    // The warning occurs only in rack1, gocql driver always picks rack 1 for its control connection
-    shotover_rack1
-        .shutdown_and_then_consume_events(&[EventMatcher::new()
-            .with_level(Level::Warn)
-            .with_target("shotover::transforms::cassandra::sink_cluster::node_pool")
-            .with_message("No suitable nodes to route to found within rack. This error only occurs in debug builds as it should never occur in an ideal integration test situation.")
-            .with_count(Count::Any)
-        ])
-        .await;
+    shotover_rack1.shutdown_and_then_consume_events(&[]).await;
     shotover_rack2.shutdown_and_then_consume_events(&[]).await;
     shotover_rack3.shutdown_and_then_consume_events(&[]).await;
 }
diff --git a/shotover/src/transforms/cassandra/sink_cluster/node_pool.rs b/shotover/src/transforms/cassandra/sink_cluster/node_pool.rs
index 9852fd7af..41ee5c2ac 100644
--- a/shotover/src/transforms/cassandra/sink_cluster/node_pool.rs
+++ b/shotover/src/transforms/cassandra/sink_cluster/node_pool.rs
@@ -72,6 +72,13 @@ pub struct NodePool {
     keyspace_metadata: HashMap<String, KeyspaceMetadata>,
     token_map: TokenRing,
     nodes: Vec<CassandraNode>,
+    #[allow(
+        dead_code,
+        reason = r#"The code path that previously incremented this has been removed,
+so from the perspective of the old logic this is no longer possible.
+However I have kept the metric so that we can replace it with a system similar to KafkaSinkCluster where we
+run logic just before sending out the message to detect out-of-rack requests regardless of the routing logic"#
+    )]
     out_of_rack_requests: Counter,
 }
 
@@ -170,8 +177,7 @@ impl NodePool {
     }
 
     /// Get a token routed replica node for the supplied execute message (if exists)
-    /// Will attempt to get a replica in the supplied rack if exists, otherwise get one in
-    /// the same data center
+    /// Will only return replicas in the supplied rack.
     pub async fn get_replica_node_in_dc(
         &mut self,
         execute: &BodyReqExecuteOwned,
@@ -212,34 +218,22 @@
         let mut nodes: Vec<&mut CassandraNode> = self
             .nodes
             .iter_mut()
-            .filter(|node| replica_host_ids.contains(&node.host_id) && node.is_up)
+            .filter(|node| {
+                replica_host_ids.contains(&node.host_id) && node.is_up && node.rack == *rack
+            })
             .collect();
         nodes.shuffle(rng);
 
-        // Move all nodes that are in the rack to the front of the list.
-        // This way they will be preferred over all other nodes
-        let mut nodes_found_in_rack = 0;
-        for i in 0..nodes.len() {
-            if nodes[i].rack == rack {
-                nodes.swap(i, nodes_found_in_rack);
-                nodes_found_in_rack += 1;
-            }
-        }
-        if nodes_found_in_rack == 0 {
-            // An execute message is being delivered outside of CassandraSinkCluster's designated rack. The only cases this can occur is when:
-            // The client correctly routes to the shotover node that reports it has the token in its rack, however the destination cassandra node has since gone down and is now inaccessible.
-            // or
-            // ReplicationStrategy::SimpleStrategy is used with a replication factor > 1
-            // or
-            // The clients token aware routing is broken.
-            #[cfg(debug_assertions)]
-            tracing::warn!("No suitable nodes to route to found within rack. This error only occurs in debug builds as it should never occur in an ideal integration test situation.");
-            self.out_of_rack_requests.increment(1);
-        }
         tracing::debug!(
             "Shotover with designated rack {rack:?} found replica nodes {replica_host_ids:?}"
         );
 
+        if nodes.is_empty() {
+            return Err(GetReplicaErr::NoNodeAvailable(anyhow!(
+                "No nodes within rack {rack:?} could be routed to"
+            )));
+        }
+
         Ok(nodes)
     }
 
@@ -285,6 +279,11 @@ pub async fn get_accessible_node<'a>(
     nodes: Vec<&'a mut CassandraNode>,
 ) -> Result<&'a mut CassandraNode> {
     let mut errors = vec![];
+
+    if nodes.is_empty() {
+        return Err(anyhow!("No suitable nodes to route to"));
+    }
+
     for node in nodes {
         match node.get_connection(connection_factory).await {
             Ok(_) => {
diff --git a/shotover/src/transforms/cassandra/sink_cluster/rewrite.rs b/shotover/src/transforms/cassandra/sink_cluster/rewrite.rs
index e181deff6..46a5447a8 100644
--- a/shotover/src/transforms/cassandra/sink_cluster/rewrite.rs
+++ b/shotover/src/transforms/cassandra/sink_cluster/rewrite.rs
@@ -117,9 +117,16 @@ impl MessageRewriter {
                 batch_mode = BatchMode::Isolated;
             }
             RewriteTableTy::Prepare { clone_index } => {
+                if pool.nodes().is_empty() {
+                    // sending no requests would result in no response being sent for the prepare request,
+                    // this would violate transform invariants, so return an error in this case.
+                    return Err(anyhow!(
+                        "Cannot route prepare when all nodes in rack are down"
+                    ));
+                }
                 let mut first = true;
                 for node in pool.nodes().iter() {
-                    if node.is_up {
+                    if node.is_up && node.rack == self.local_shotover_node.rack {
                         if first {
                             let message_id = messages[*clone_index].id();
                             self.prepare_requests_to_destination_nodes
diff --git a/shotover/src/transforms/cassandra/sink_cluster/test_router.rs b/shotover/src/transforms/cassandra/sink_cluster/test_router.rs
index 25f17dec7..cb5aea2f6 100644
--- a/shotover/src/transforms/cassandra/sink_cluster/test_router.rs
+++ b/shotover/src/transforms/cassandra/sink_cluster/test_router.rs
@@ -56,7 +56,7 @@ mod test_token_aware_router {
             .add_prepared_result(id.clone(), prepared_metadata())
             .await;
 
-        for (pk, test_token, rack_replicas, dc_replicas) in test_data() {
+        for (pk, test_token, rack_replicas, _dc_replicas) in test_data() {
             let query_parameters = QueryParams {
                 consistency: One,
                 with_names: false,
@@ -83,14 +83,12 @@
                     "rack1",
                     &mut rng,
                 )
-                .await
-                .unwrap()
-                .remove(0);
+                .await;
 
             if !rack_replicas.is_empty() {
-                assert!(rack_replicas.contains(&node.address));
+                assert!(rack_replicas.contains(&node.unwrap()[0].address));
             } else {
-                assert!(dc_replicas.contains(&node.address));
+                node.unwrap_err();
             }
         }
     }
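Note on the retained metric: the reason string kept on out_of_rack_requests describes replacing the removed warning with a KafkaSinkCluster-style check that runs just before a message is sent out. A minimal sketch of what such a pre-send check could look like, reusing only types that already appear in node_pool.rs; the function name record_if_out_of_rack and its parameter list are illustrative assumptions, not part of this patch or the existing shotover API:

    use metrics::Counter;

    // Hypothetical pre-send hook: increment the retained metric whenever the chosen
    // destination node lies outside this shotover instance's designated rack,
    // regardless of which routing path selected it.
    // Assumes the CassandraNode type from node_pool.rs is in scope.
    fn record_if_out_of_rack(
        out_of_rack_requests: &Counter,
        local_rack: &str,
        destination: &CassandraNode,
    ) {
        if destination.rack != local_rack {
            out_of_rack_requests.increment(1);
        }
    }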