Skip to content

Commit

Permalink
Improved fix
Browse files Browse the repository at this point in the history
  • Loading branch information
rukai committed Oct 31, 2024
1 parent a55d04e commit 8e1f7bf
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 30 deletions.
45 changes: 22 additions & 23 deletions shotover/src/transforms/cassandra/sink_cluster/node_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,13 @@ pub struct NodePool {
keyspace_metadata: HashMap<String, KeyspaceMetadata>,
token_map: TokenRing,
nodes: Vec<CassandraNode>,
#[allow(
dead_code,
reason = r#"The code path that previously incremented this has been removed,
so from the perspective of the old logic this is no longer possible.
However I have kept the metric so that we can replace it with a system similar to KafkaSinkCluster where we
run logic just before sending out the message to detect out-of-rack requests regardless of the routing logic"#
)]
out_of_rack_requests: Counter,
}

Expand Down Expand Up @@ -170,8 +177,7 @@ impl NodePool {
}

/// Get a token routed replica node for the supplied execute message (if exists)
/// Will attempt to get a replica in the supplied rack if exists, otherwise get one in
/// the same data center
/// Will only return replcias in the supplied rack.
pub async fn get_replica_node_in_dc(
&mut self,
execute: &BodyReqExecuteOwned,
Expand Down Expand Up @@ -212,34 +218,22 @@ impl NodePool {
let mut nodes: Vec<&mut CassandraNode> = self
.nodes
.iter_mut()
.filter(|node| replica_host_ids.contains(&node.host_id) && node.is_up)
.filter(|node| {
replica_host_ids.contains(&node.host_id) && node.is_up && node.rack == *rack
})
.collect();
nodes.shuffle(rng);

// Move all nodes that are in the rack to the front of the list.
// This way they will be preferred over all other nodes
let mut nodes_found_in_rack = 0;
for i in 0..nodes.len() {
if nodes[i].rack == rack {
nodes.swap(i, nodes_found_in_rack);
nodes_found_in_rack += 1;
}
}
if nodes_found_in_rack == 0 {
// An execute message is being delivered outside of CassandraSinkCluster's designated rack. The only cases this can occur is when:
// The client correctly routes to the shotover node that reports it has the token in its rack, however the destination cassandra node has since gone down and is now inaccessible.
// or
// ReplicationStrategy::SimpleStrategy is used with a replication factor > 1
// or
// The clients token aware routing is broken.
#[cfg(debug_assertions)]
tracing::warn!("No suitable nodes to route to found within rack. This error only occurs in debug builds as it should never occur in an ideal integration test situation.");
self.out_of_rack_requests.increment(1);
}
tracing::debug!(
"Shotover with designated rack {rack:?} found replica nodes {replica_host_ids:?}"
);

if nodes.is_empty() {
return Err(GetReplicaErr::NoNodeAvailable(anyhow!(
"No nodes within rack {rack:?} could be routed to"
)));
}

Ok(nodes)
}

Expand Down Expand Up @@ -285,6 +279,11 @@ pub async fn get_accessible_node<'a>(
nodes: Vec<&'a mut CassandraNode>,
) -> Result<&'a mut CassandraNode> {
let mut errors = vec![];

if nodes.is_empty() {
return Err(anyhow!("No suitable nodes to route to"));
}

for node in nodes {
match node.get_connection(connection_factory).await {
Ok(_) => {
Expand Down
9 changes: 8 additions & 1 deletion shotover/src/transforms/cassandra/sink_cluster/rewrite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,9 +117,16 @@ impl MessageRewriter {
batch_mode = BatchMode::Isolated;
}
RewriteTableTy::Prepare { clone_index } => {
if pool.nodes().is_empty() {
// sending no requests would result in no response being sent for the prepare request,
// this would violate transform invariants, so return an error in this case.
return Err(anyhow!(
"Cannot route prepare when all nodes in rack are down"
));
}
let mut first = true;
for node in pool.nodes().iter() {
if node.is_up {
if node.is_up && node.rack == self.local_shotover_node.rack {
if first {
let message_id = messages[*clone_index].id();
self.prepare_requests_to_destination_nodes
Expand Down
10 changes: 4 additions & 6 deletions shotover/src/transforms/cassandra/sink_cluster/test_router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ mod test_token_aware_router {
.add_prepared_result(id.clone(), prepared_metadata())
.await;

for (pk, test_token, rack_replicas, dc_replicas) in test_data() {
for (pk, test_token, rack_replicas, _dc_replicas) in test_data() {
let query_parameters = QueryParams {
consistency: One,
with_names: false,
Expand All @@ -83,14 +83,12 @@ mod test_token_aware_router {
"rack1",
&mut rng,
)
.await
.unwrap()
.remove(0);
.await;

if !rack_replicas.is_empty() {
assert!(rack_replicas.contains(&node.address));
assert!(rack_replicas.contains(&node.unwrap()[0].address));
} else {
assert!(dc_replicas.contains(&node.address));
node.unwrap_err();
}
}
}
Expand Down

0 comments on commit 8e1f7bf

Please sign in to comment.