From 506eb626f9b6cda0bd0c878eb0b4c3f47a207503 Mon Sep 17 00:00:00 2001 From: Lucas Kent Date: Tue, 23 Jan 2024 08:02:09 +1100 Subject: [PATCH] Cassandra optimize ring walking (#1428) --- .../transforms/cassandra/sink_cluster/mod.rs | 2 +- .../cassandra/sink_cluster/node_pool.rs | 8 +-- .../{token_map.rs => token_ring.rs} | 54 +++++++++++++------ 3 files changed, 42 insertions(+), 22 deletions(-) rename shotover/src/transforms/cassandra/sink_cluster/{token_map.rs => token_ring.rs} (75%) diff --git a/shotover/src/transforms/cassandra/sink_cluster/mod.rs b/shotover/src/transforms/cassandra/sink_cluster/mod.rs index d21b46277..79cae182d 100644 --- a/shotover/src/transforms/cassandra/sink_cluster/mod.rs +++ b/shotover/src/transforms/cassandra/sink_cluster/mod.rs @@ -36,7 +36,7 @@ mod rewrite; mod routing_key; #[cfg(test)] mod test_router; -mod token_map; +mod token_ring; pub mod topology; pub type KeyspaceChanTx = watch::Sender>; diff --git a/shotover/src/transforms/cassandra/sink_cluster/node_pool.rs b/shotover/src/transforms/cassandra/sink_cluster/node_pool.rs index f4be6281b..f269ec0a7 100644 --- a/shotover/src/transforms/cassandra/sink_cluster/node_pool.rs +++ b/shotover/src/transforms/cassandra/sink_cluster/node_pool.rs @@ -1,6 +1,6 @@ use super::node::{CassandraNode, ConnectionFactory}; use super::routing_key::calculate_routing_key; -use super::token_map::TokenMap; +use super::token_ring::TokenRing; use super::KeyspaceChanRx; use crate::transforms::cassandra::connection::CassandraConnection; use anyhow::{anyhow, Context, Error, Result}; @@ -60,7 +60,7 @@ impl NodePoolBuilder { NodePool { prepared_metadata: self.prepared_metadata.clone(), keyspace_metadata: HashMap::new(), - token_map: TokenMap::new(&[]), + token_map: TokenRing::new(&[]), nodes: vec![], out_of_rack_requests: self.out_of_rack_requests.clone(), } @@ -70,7 +70,7 @@ impl NodePoolBuilder { pub struct NodePool { prepared_metadata: Arc>>>, keyspace_metadata: HashMap, - token_map: TokenMap, + token_map: 
TokenRing, nodes: Vec, out_of_rack_requests: Counter, } @@ -99,7 +99,7 @@ impl NodePool { } } self.nodes = new_nodes; - self.token_map = TokenMap::new(self.nodes.as_slice()); + self.token_map = TokenRing::new(self.nodes.as_slice()); tracing::debug!( "nodes updated, nodes={:#?}\ntokens={:#?}", self.nodes, diff --git a/shotover/src/transforms/cassandra/sink_cluster/token_map.rs b/shotover/src/transforms/cassandra/sink_cluster/token_ring.rs similarity index 75% rename from shotover/src/transforms/cassandra/sink_cluster/token_map.rs rename to shotover/src/transforms/cassandra/sink_cluster/token_ring.rs index bfd4f57fd..72243f5dc 100644 --- a/shotover/src/transforms/cassandra/sink_cluster/token_map.rs +++ b/shotover/src/transforms/cassandra/sink_cluster/token_ring.rs @@ -1,23 +1,46 @@ use crate::transforms::cassandra::sink_cluster::CassandraNode; use cassandra_protocol::token::Murmur3Token; -use std::collections::BTreeMap; use uuid::Uuid; use super::node_pool::{KeyspaceMetadata, ReplicationStrategy}; #[derive(Debug, Clone)] -pub struct TokenMap { - token_ring: BTreeMap, +pub struct TokenRing { + ring_in: Vec, + ring_out: Vec, } -impl TokenMap { +impl TokenRing { pub fn new(nodes: &[CassandraNode]) -> Self { - TokenMap { - token_ring: nodes - .iter() - .flat_map(|node| node.tokens.iter().map(|token| (*token, node.host_id))) - .collect(), - } + let mut ring: Vec<_> = nodes + .iter() + .flat_map(|node| node.tokens.iter().map(|token| (*token, node.host_id))) + .collect(); + ring.sort_by(|a, b| a.0.cmp(&b.0)); + + // Split ring into ring_in and ring_out as it's faster to search and retrieve separately + let ring_in: Vec<_> = ring.iter().map(|node| node.0).collect(); + let ring_out: Vec<_> = ring.iter().map(|node| node.1).collect(); + + TokenRing { ring_in, ring_out } + } + + /// Provides an iterator over the ring members starting at the given token. + /// The iterator traverses the whole ring in the direction of increasing tokens. 
+ /// After reaching the maximum token it wraps around and continues from the lowest one. + /// The iterator visits each member once, it doesn't have infinite length. + pub fn ring_range(&self, token: Murmur3Token) -> impl Iterator + '_ { + let binary_search_index: usize = match self.ring_in.binary_search_by(|e| e.cmp(&token)) { + Ok(exact_match_index) => exact_match_index, + Err(first_greater_index) => first_greater_index, + }; + + self.ring_out + .iter() + .skip(binary_search_index) + .chain(self.ring_out.iter()) + .copied() + .take(self.ring_out.len()) } /// Walk the token ring to figure out which nodes are acting as replicas for the given query. @@ -36,13 +59,11 @@ impl TokenMap { keyspace: &'a KeyspaceMetadata, ) -> impl Iterator + '_ { let mut racks_used = vec![]; - self.token_ring - .range(token_from_key..) - .chain(self.token_ring.iter()) - .filter(move |(_, host_id)| { + self.ring_range(token_from_key) + .filter(move |host_id| { if let ReplicationStrategy::NetworkTopologyStrategy = keyspace.replication_strategy { - let rack = &nodes.iter().find(|x| x.host_id == **host_id).unwrap().rack; + let rack = &nodes.iter().find(|x| x.host_id == *host_id).unwrap().rack; if racks_used.contains(&rack) { false } else { @@ -54,7 +75,6 @@ impl TokenMap { } }) .take(keyspace.replication_factor) - .map(|(_, node)| *node) } } @@ -122,7 +142,7 @@ mod test_token_map { } fn verify_tokens(node_host_ids: &[Uuid], token: Murmur3Token) { - let token_map = TokenMap::new(prepare_nodes().as_slice()); + let token_map = TokenRing::new(prepare_nodes().as_slice()); let nodes = token_map .iter_replica_nodes( &[