Skip to content

Commit

Permalink
Cassandra optimize ring walking (#1428)
Browse files Browse the repository at this point in the history
  • Loading branch information
rukai authored Jan 22, 2024
1 parent 71c024e commit 506eb62
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 22 deletions.
2 changes: 1 addition & 1 deletion shotover/src/transforms/cassandra/sink_cluster/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ mod rewrite;
mod routing_key;
#[cfg(test)]
mod test_router;
mod token_map;
mod token_ring;
pub mod topology;

pub type KeyspaceChanTx = watch::Sender<HashMap<String, KeyspaceMetadata>>;
Expand Down
8 changes: 4 additions & 4 deletions shotover/src/transforms/cassandra/sink_cluster/node_pool.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use super::node::{CassandraNode, ConnectionFactory};
use super::routing_key::calculate_routing_key;
use super::token_map::TokenMap;
use super::token_ring::TokenRing;
use super::KeyspaceChanRx;
use crate::transforms::cassandra::connection::CassandraConnection;
use anyhow::{anyhow, Context, Error, Result};
Expand Down Expand Up @@ -60,7 +60,7 @@ impl NodePoolBuilder {
NodePool {
prepared_metadata: self.prepared_metadata.clone(),
keyspace_metadata: HashMap::new(),
token_map: TokenMap::new(&[]),
token_map: TokenRing::new(&[]),
nodes: vec![],
out_of_rack_requests: self.out_of_rack_requests.clone(),
}
Expand All @@ -70,7 +70,7 @@ impl NodePoolBuilder {
pub struct NodePool {
prepared_metadata: Arc<RwLock<HashMap<CBytesShort, Arc<PreparedMetadata>>>>,
keyspace_metadata: HashMap<String, KeyspaceMetadata>,
token_map: TokenMap,
token_map: TokenRing,
nodes: Vec<CassandraNode>,
out_of_rack_requests: Counter,
}
Expand Down Expand Up @@ -99,7 +99,7 @@ impl NodePool {
}
}
self.nodes = new_nodes;
self.token_map = TokenMap::new(self.nodes.as_slice());
self.token_map = TokenRing::new(self.nodes.as_slice());
tracing::debug!(
"nodes updated, nodes={:#?}\ntokens={:#?}",
self.nodes,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,23 +1,46 @@
use crate::transforms::cassandra::sink_cluster::CassandraNode;
use cassandra_protocol::token::Murmur3Token;
use std::collections::BTreeMap;
use uuid::Uuid;

use super::node_pool::{KeyspaceMetadata, ReplicationStrategy};

#[derive(Debug, Clone)]
pub struct TokenMap {
token_ring: BTreeMap<Murmur3Token, Uuid>,
pub struct TokenRing {
ring_in: Vec<Murmur3Token>,
ring_out: Vec<Uuid>,
}

impl TokenMap {
impl TokenRing {
pub fn new(nodes: &[CassandraNode]) -> Self {
TokenMap {
token_ring: nodes
.iter()
.flat_map(|node| node.tokens.iter().map(|token| (*token, node.host_id)))
.collect(),
}
let mut ring: Vec<_> = nodes
.iter()
.flat_map(|node| node.tokens.iter().map(|token| (*token, node.host_id)))
.collect();
ring.sort_by(|a, b| a.0.cmp(&b.0));

// Split ring into ring_in and ring_out as its faster to search and retrive seperately
let ring_in: Vec<_> = ring.iter().map(|node| node.0).collect();
let ring_out: Vec<_> = ring.iter().map(|node| node.1).collect();

TokenRing { ring_in, ring_out }
}

/// Provides an iterator over the ring members starting at the given token.
/// The iterator traverses the whole ring in the direction of increasing tokens.
/// After reaching the maximum token it wraps around and continues from the lowest one.
/// The iterator visits each member once, it doesn't have infinite length.
pub fn ring_range(&self, token: Murmur3Token) -> impl Iterator<Item = Uuid> + '_ {
let binary_search_index: usize = match self.ring_in.binary_search_by(|e| e.cmp(&token)) {
Ok(exact_match_index) => exact_match_index,
Err(first_greater_index) => first_greater_index,
};

self.ring_out
.iter()
.skip(binary_search_index)
.chain(self.ring_out.iter())
.copied()
.take(self.ring_out.len())
}

/// Walk the token ring to figure out which nodes are acting as replicas for the given query.
Expand All @@ -36,13 +59,11 @@ impl TokenMap {
keyspace: &'a KeyspaceMetadata,
) -> impl Iterator<Item = Uuid> + '_ {
let mut racks_used = vec![];
self.token_ring
.range(token_from_key..)
.chain(self.token_ring.iter())
.filter(move |(_, host_id)| {
self.ring_range(token_from_key)
.filter(move |host_id| {
if let ReplicationStrategy::NetworkTopologyStrategy = keyspace.replication_strategy
{
let rack = &nodes.iter().find(|x| x.host_id == **host_id).unwrap().rack;
let rack = &nodes.iter().find(|x| x.host_id == *host_id).unwrap().rack;
if racks_used.contains(&rack) {
false
} else {
Expand All @@ -54,7 +75,6 @@ impl TokenMap {
}
})
.take(keyspace.replication_factor)
.map(|(_, node)| *node)
}
}

Expand Down Expand Up @@ -122,7 +142,7 @@ mod test_token_map {
}

fn verify_tokens(node_host_ids: &[Uuid], token: Murmur3Token) {
let token_map = TokenMap::new(prepare_nodes().as_slice());
let token_map = TokenRing::new(prepare_nodes().as_slice());
let nodes = token_map
.iter_replica_nodes(
&[
Expand Down

0 comments on commit 506eb62

Please sign in to comment.