From 8e1f7bf69f67d153a740470a0a05c5c0ea5f975b Mon Sep 17 00:00:00 2001
From: Lucas Kent
Date: Fri, 1 Nov 2024 09:40:52 +1100
Subject: [PATCH] Improved fix

---
 .../cassandra/sink_cluster/node_pool.rs   | 45 +++++++++----------
 .../cassandra/sink_cluster/rewrite.rs     |  9 +++-
 .../cassandra/sink_cluster/test_router.rs | 10 ++---
 3 files changed, 34 insertions(+), 30 deletions(-)

diff --git a/shotover/src/transforms/cassandra/sink_cluster/node_pool.rs b/shotover/src/transforms/cassandra/sink_cluster/node_pool.rs
index 9852fd7af..8ccc0bc9d 100644
--- a/shotover/src/transforms/cassandra/sink_cluster/node_pool.rs
+++ b/shotover/src/transforms/cassandra/sink_cluster/node_pool.rs
@@ -72,6 +72,13 @@ pub struct NodePool {
     keyspace_metadata: HashMap<String, KeyspaceMetadata>,
     token_map: TokenRing,
     nodes: Vec<CassandraNode>,
+    #[allow(
+        dead_code,
+        reason = r#"The code path that previously incremented this has been removed,
+so from the perspective of the old logic this is no longer possible.
+However, I have kept the metric so that we can replace it with a system similar to KafkaSinkCluster where we
+run logic just before sending out the message to detect out-of-rack requests regardless of the routing logic"#
+    )]
     out_of_rack_requests: Counter,
 }
 
@@ -170,8 +177,7 @@ impl NodePool {
     }
 
     /// Get a token routed replica node for the supplied execute message (if exists)
-    /// Will attempt to get a replica in the supplied rack if exists, otherwise get one in
-    /// the same data center
+    /// Will only return replicas in the supplied rack.
     pub async fn get_replica_node_in_dc(
         &mut self,
         execute: &BodyReqExecuteOwned,
@@ -212,34 +218,22 @@
         let mut nodes: Vec<&mut CassandraNode> = self
             .nodes
             .iter_mut()
-            .filter(|node| replica_host_ids.contains(&node.host_id) && node.is_up)
+            .filter(|node| {
+                replica_host_ids.contains(&node.host_id) && node.is_up && node.rack == *rack
+            })
             .collect();
         nodes.shuffle(rng);
 
-        // Move all nodes that are in the rack to the front of the list.
-        // This way they will be preferred over all other nodes
-        let mut nodes_found_in_rack = 0;
-        for i in 0..nodes.len() {
-            if nodes[i].rack == rack {
-                nodes.swap(i, nodes_found_in_rack);
-                nodes_found_in_rack += 1;
-            }
-        }
-        if nodes_found_in_rack == 0 {
-            // An execute message is being delivered outside of CassandraSinkCluster's designated rack. The only cases this can occur is when:
-            // The client correctly routes to the shotover node that reports it has the token in its rack, however the destination cassandra node has since gone down and is now inaccessible.
-            // or
-            // ReplicationStrategy::SimpleStrategy is used with a replication factor > 1
-            // or
-            // The clients token aware routing is broken.
-            #[cfg(debug_assertions)]
-            tracing::warn!("No suitable nodes to route to found within rack. This error only occurs in debug builds as it should never occur in an ideal integration test situation.");
-            self.out_of_rack_requests.increment(1);
-        }
 
         tracing::debug!(
             "Shotover with designated rack {rack:?} found replica nodes {replica_host_ids:?}"
         );
 
+        if nodes.is_empty() {
+            return Err(GetReplicaErr::NoNodeAvailable(anyhow!(
+                "No nodes within rack {rack:?} could be routed to"
+            )));
+        }
+
         Ok(nodes)
     }
@@ -285,6 +279,11 @@ pub async fn get_accessible_node<'a>(
     nodes: Vec<&'a mut CassandraNode>,
 ) -> Result<&'a mut CassandraNode> {
     let mut errors = vec![];
+
+    if nodes.is_empty() {
+        return Err(anyhow!("No suitable nodes to route to"));
+    }
+
     for node in nodes {
         match node.get_connection(connection_factory).await {
             Ok(_) => {
diff --git a/shotover/src/transforms/cassandra/sink_cluster/rewrite.rs b/shotover/src/transforms/cassandra/sink_cluster/rewrite.rs
index e181deff6..46a5447a8 100644
--- a/shotover/src/transforms/cassandra/sink_cluster/rewrite.rs
+++ b/shotover/src/transforms/cassandra/sink_cluster/rewrite.rs
@@ -117,9 +117,16 @@ impl MessageRewriter {
                 batch_mode = BatchMode::Isolated;
             }
             RewriteTableTy::Prepare { clone_index } => {
+                if pool.nodes().is_empty() {
+                    // Sending no requests would result in no response being sent for the prepare request,
+                    // which would violate transform invariants, so return an error in this case.
+                    return Err(anyhow!(
+                        "Cannot route prepare when all nodes in rack are down"
+                    ));
+                }
                 let mut first = true;
                 for node in pool.nodes().iter() {
-                    if node.is_up {
+                    if node.is_up && node.rack == self.local_shotover_node.rack {
                         if first {
                             let message_id = messages[*clone_index].id();
                             self.prepare_requests_to_destination_nodes
diff --git a/shotover/src/transforms/cassandra/sink_cluster/test_router.rs b/shotover/src/transforms/cassandra/sink_cluster/test_router.rs
index 25f17dec7..cb5aea2f6 100644
--- a/shotover/src/transforms/cassandra/sink_cluster/test_router.rs
+++ b/shotover/src/transforms/cassandra/sink_cluster/test_router.rs
@@ -56,7 +56,7 @@ mod test_token_aware_router {
             .add_prepared_result(id.clone(), prepared_metadata())
             .await;
 
-        for (pk, test_token, rack_replicas, dc_replicas) in test_data() {
+        for (pk, test_token, rack_replicas, _dc_replicas) in test_data() {
             let query_parameters = QueryParams {
                 consistency: One,
                 with_names: false,
@@ -83,14 +83,12 @@
                    "rack1",
                    &mut rng,
                )
-                .await
-                .unwrap()
-                .remove(0);
+                .await;
 
            if !rack_replicas.is_empty() {
-                assert!(rack_replicas.contains(&node.address));
+                assert!(rack_replicas.contains(&node.unwrap()[0].address));
            } else {
-                assert!(dc_replicas.contains(&node.address));
+                node.unwrap_err();
            }
        }
    }
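Note on the routing change above (illustration only, not part of the patch to apply): get_replica_node_in_dc now keeps only live replicas in Shotover's local rack and returns GetReplicaErr::NoNodeAvailable when none remain, instead of preferring in-rack replicas but silently falling back to other nodes in the data center. A minimal standalone Rust sketch of that rule, using hypothetical names (Node, replicas_in_rack) rather than shotover's actual types:

// Standalone sketch, assuming a simplified Node type; not shotover's actual API.
struct Node {
    address: String,
    rack: String,
    is_up: bool,
}

// Keep only live replicas in the local rack; error out instead of falling back to other racks.
fn replicas_in_rack<'a>(replicas: &'a [Node], local_rack: &str) -> Result<Vec<&'a Node>, String> {
    let nodes: Vec<&Node> = replicas
        .iter()
        .filter(|node| node.is_up && node.rack == local_rack)
        .collect();
    if nodes.is_empty() {
        // Corresponds to the GetReplicaErr::NoNodeAvailable case in the patch.
        Err(format!("No nodes within rack {local_rack:?} could be routed to"))
    } else {
        Ok(nodes)
    }
}

fn main() {
    let replicas = vec![
        Node { address: "10.0.0.1:9042".into(), rack: "rack1".into(), is_up: true },
        Node { address: "10.0.0.2:9042".into(), rack: "rack2".into(), is_up: true },
    ];
    // rack1 has a live replica, so routing succeeds and the rack2 node is never considered.
    println!("routed to {}", replicas_in_rack(&replicas, "rack1").unwrap()[0].address);
    // rack3 has no live replica, so the caller now gets an error rather than a cross-rack node.
    assert!(replicas_in_rack(&replicas, "rack3").is_err());
}

The same "fail loudly when the filtered set is empty" rule is what the rewrite.rs hunk applies to Prepare routing, so the transform always produces a response (here an error) for the client.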