Skip to content

Commit

Permalink
Merge pull request #24 from nihohit/round-robin
Browse files Browse the repository at this point in the history
Implement round robin read from replica strategy for async cluster.
  • Loading branch information
nihohit authored Sep 21, 2023
2 parents bfdf588 + 350dfc5 commit ca6b9cb
Show file tree
Hide file tree
Showing 7 changed files with 508 additions and 255 deletions.
16 changes: 9 additions & 7 deletions redis/src/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,6 @@ pub struct ClusterConnection<C = Connection> {
connections: RefCell<HashMap<String, C>>,
slots: RefCell<SlotMap>,
auto_reconnect: RefCell<bool>,
read_from_replicas: bool,
read_timeout: RefCell<Option<Duration>>,
write_timeout: RefCell<Option<Duration>>,
cluster_params: ClusterParams,
Expand All @@ -144,9 +143,8 @@ where
) -> RedisResult<Self> {
let connection = Self {
connections: RefCell::new(HashMap::new()),
slots: RefCell::new(SlotMap::new(vec![])),
slots: RefCell::new(SlotMap::new(vec![], cluster_params.read_from_replicas)),
auto_reconnect: RefCell::new(true),
read_from_replicas: cluster_params.read_from_replicas,
cluster_params,
read_timeout: RefCell::new(None),
write_timeout: RefCell::new(None),
Expand Down Expand Up @@ -298,7 +296,9 @@ where
)));
for conn in samples.iter_mut() {
let value = conn.req_command(&slot_cmd())?;
match parse_slots(&value, self.cluster_params.tls).map(SlotMap::new) {
match parse_slots(&value, self.cluster_params.tls)
.map(|slots_data| SlotMap::new(slots_data, self.cluster_params.read_from_replicas))
{
Ok(new_slots) => {
result = Ok(new_slots);
break;
Expand All @@ -313,7 +313,9 @@ where
let info = get_connection_info(node, self.cluster_params.clone())?;

let mut conn = C::connect(info, Some(self.cluster_params.connection_timeout))?;
if self.read_from_replicas {
if self.cluster_params.read_from_replicas
!= crate::cluster_topology::ReadFromReplicaStrategy::AlwaysFromPrimary
{
// If READONLY is sent to primary nodes, it will have no effect
cmd("READONLY").query(&mut conn)?;
}
Expand Down Expand Up @@ -365,7 +367,7 @@ where
Ok(slot_addr.to_string())
};

match RoutingInfo::for_routable(cmd, self.read_from_replicas) {
match RoutingInfo::for_routable(cmd) {
Some(RoutingInfo::SingleNode(SingleNodeRoutingInfo::Random)) => {
let mut rng = thread_rng();
Ok(addr_for_slot(Route::new(
Expand Down Expand Up @@ -431,7 +433,7 @@ where
T: MergeResults + std::fmt::Debug,
F: FnMut(&mut C) -> RedisResult<T>,
{
let route = match RoutingInfo::for_routable(cmd, self.read_from_replicas) {
let route = match RoutingInfo::for_routable(cmd) {
Some(RoutingInfo::SingleNode(SingleNodeRoutingInfo::Random)) => None,
Some(RoutingInfo::SingleNode(SingleNodeRoutingInfo::SpecificNode(route))) => {
Some(route)
Expand Down
155 changes: 112 additions & 43 deletions redis/src/cluster_async/connections_container.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::net::IpAddr;
use rand::seq::IteratorRandom;

use crate::cluster_routing::{MultipleNodeRoutingInfo, Route, SlotAddr};
use crate::cluster_topology::SlotMap;
use crate::cluster_topology::{ReadFromReplicaStrategy, SlotMap, SlotMapValue};

type IdentifierType = String;

Expand All @@ -30,13 +30,15 @@ pub(crate) struct Identifier(IdentifierType);
pub(crate) struct ConnectionsContainer<Connection> {
connection_map: HashMap<Identifier, Option<ClusterNode<Connection>>>,
slot_map: SlotMap,
read_from_replica_strategy: ReadFromReplicaStrategy,
}

impl<Connection> Default for ConnectionsContainer<Connection> {
fn default() -> Self {
Self {
connection_map: Default::default(),
slot_map: Default::default(),
read_from_replica_strategy: ReadFromReplicaStrategy::AlwaysFromPrimary,
}
}
}
Expand All @@ -50,27 +52,74 @@ where
pub(crate) fn new(
slot_map: SlotMap,
connection_map: HashMap<String, ClusterNode<Connection>>,
read_from_replica_strategy: ReadFromReplicaStrategy,
) -> Self {
Self {
connection_map: connection_map
.into_iter()
.map(|(address, node)| (Identifier(address), Some(node)))
.collect(),
slot_map,
read_from_replica_strategy,
}
}

fn round_robin_read_from_replica(
&self,
slot_map_value: &SlotMapValue,
) -> Option<ConnectionAndIdentifier<Connection>> {
let addrs = &slot_map_value.addrs;
let initial_index = slot_map_value
.latest_used_replica
.load(std::sync::atomic::Ordering::Relaxed);
let mut check_count = 0;
loop {
check_count += 1;

// Looped through all replicas, no connected replica was found.
if check_count > addrs.replicas.len() {
return self.connection_for_address(addrs.primary.as_str());
}
let index = (initial_index + check_count) % addrs.replicas.len();
if let Some(connection) = self.connection_for_address(addrs.replicas[index].as_str()) {
let _ = slot_map_value.latest_used_replica.compare_exchange_weak(
initial_index,
index,
std::sync::atomic::Ordering::Relaxed,
std::sync::atomic::Ordering::Relaxed,
);
return Some(connection);
}
}
}

fn lookup_route(&self, route: &Route) -> Option<ConnectionAndIdentifier<Connection>> {
let address = self.slot_map.slot_addr_for_route(route)?;
self.connection_for_address(address)
let slot_map_value = self.slot_map.slot_value_for_route(route)?;
let addrs = &slot_map_value.addrs;
if addrs.replicas.is_empty() {
return self.connection_for_address(addrs.primary.as_str());
}

match route.slot_addr() {
SlotAddr::Master => self.connection_for_address(addrs.primary.as_str()),
SlotAddr::ReplicaOptional => match self.read_from_replica_strategy {
ReadFromReplicaStrategy::AlwaysFromPrimary => {
self.connection_for_address(addrs.primary.as_str())
}
ReadFromReplicaStrategy::RoundRobin => {
self.round_robin_read_from_replica(slot_map_value)
}
},
SlotAddr::ReplicaRequired => self.round_robin_read_from_replica(slot_map_value),
}
}

pub(crate) fn connection_for_route(
&self,
route: &Route,
) -> Option<ConnectionAndIdentifier<Connection>> {
self.lookup_route(route).or_else(|| {
if route.slot_addr() == SlotAddr::Replica {
if route.slot_addr() != SlotAddr::Master {
self.lookup_route(&Route::new(route.slot(), SlotAddr::Master))
} else {
None
Expand Down Expand Up @@ -209,22 +258,27 @@ mod tests {
expected_connections.contains(&found)
}

fn create_container() -> ConnectionsContainer<usize> {
let slot_map = SlotMap::new(vec![
Slot::new(1, 1000, "primary1".to_owned(), Vec::new()),
Slot::new(
1002,
2000,
"primary2".to_owned(),
vec!["replica2-1".to_owned()],
),
Slot::new(
2001,
3000,
"primary3".to_owned(),
vec!["replica3-1".to_owned(), "replica3-2".to_owned()],
),
]);
fn create_container_with_strategy(
stragey: ReadFromReplicaStrategy,
) -> ConnectionsContainer<usize> {
let slot_map = SlotMap::new(
vec![
Slot::new(1, 1000, "primary1".to_owned(), Vec::new()),
Slot::new(
1002,
2000,
"primary2".to_owned(),
vec!["replica2-1".to_owned()],
),
Slot::new(
2001,
3000,
"primary3".to_owned(),
vec!["replica3-1".to_owned(), "replica3-2".to_owned()],
),
],
ReadFromReplicaStrategy::AlwaysFromPrimary, // this argument shouldn't matter, since we overload the RFR strategy.
);
let mut connection_map = HashMap::new();
connection_map.insert(
Identifier("primary1".into()),
Expand Down Expand Up @@ -254,9 +308,14 @@ mod tests {
ConnectionsContainer {
slot_map,
connection_map,
read_from_replica_strategy: stragey,
}
}

fn create_container() -> ConnectionsContainer<usize> {
create_container_with_strategy(ReadFromReplicaStrategy::RoundRobin)
}

#[test]
fn get_connection_for_primary_route() {
let container = create_container();
Expand Down Expand Up @@ -315,27 +374,27 @@ mod tests {
let container = create_container();

assert!(container
.connection_for_route(&Route::new(1001, SlotAddr::Replica))
.connection_for_route(&Route::new(1001, SlotAddr::ReplicaOptional))
.is_none());

assert_eq!(
21,
container
.connection_for_route(&Route::new(1002, SlotAddr::Replica))
.connection_for_route(&Route::new(1002, SlotAddr::ReplicaOptional))
.unwrap()
.1
);

assert_eq!(
21,
container
.connection_for_route(&Route::new(1500, SlotAddr::Replica))
.connection_for_route(&Route::new(1500, SlotAddr::ReplicaOptional))
.unwrap()
.1
);

assert!(one_of(
container.connection_for_route(&Route::new(2001, SlotAddr::Replica)),
container.connection_for_route(&Route::new(2001, SlotAddr::ReplicaOptional)),
&[31, 32],
));
}
Expand All @@ -345,40 +404,50 @@ mod tests {
let container = create_container();

assert!(container
.connection_for_route(&Route::new(0, SlotAddr::Replica))
.connection_for_route(&Route::new(0, SlotAddr::ReplicaOptional))
.is_none());

assert_eq!(
1,
container
.connection_for_route(&Route::new(500, SlotAddr::Replica))
.connection_for_route(&Route::new(500, SlotAddr::ReplicaOptional))
.unwrap()
.1
);

assert_eq!(
1,
container
.connection_for_route(&Route::new(1000, SlotAddr::Replica))
.connection_for_route(&Route::new(1000, SlotAddr::ReplicaOptional))
.unwrap()
.1
);
}

// TODO - this is waiting for slot_map to support this.
// #[test]
// fn get_replica_connection_for_replica_route_if_some_but_not_all_replicas_were_removed() {
// let mut container = create_container();
// container.remove_connection(&Identifier("replica3-2".into()));
#[test]
fn get_replica_connection_for_replica_route_if_some_but_not_all_replicas_were_removed() {
let mut container = create_container();
container.remove_connection(&Identifier("replica3-2".into()));

assert_eq!(
31,
container
.connection_for_route(&Route::new(2001, SlotAddr::ReplicaRequired))
.unwrap()
.1
);
}

#[test]
fn get_replica_connection_for_replica_route_if_replica_is_required_even_if_strategy_is_always_from_primary(
) {
let container = create_container_with_strategy(ReadFromReplicaStrategy::AlwaysFromPrimary);

// assert_eq!(
// 31,
// container
// .connection_for_route(&Route::new(2001, SlotAddr::Replica))
// .unwrap()
// .1
// );
// }
assert!(one_of(
container.connection_for_route(&Route::new(2001, SlotAddr::ReplicaRequired)),
&[31, 32],
));
}

#[test]
fn get_primary_connection_for_replica_route_if_all_replicas_were_removed() {
Expand All @@ -388,23 +457,23 @@ mod tests {
assert_eq!(
2,
container
.connection_for_route(&Route::new(1002, SlotAddr::Replica))
.connection_for_route(&Route::new(1002, SlotAddr::ReplicaOptional))
.unwrap()
.1
);

assert_eq!(
2,
container
.connection_for_route(&Route::new(1500, SlotAddr::Replica))
.connection_for_route(&Route::new(1500, SlotAddr::ReplicaOptional))
.unwrap()
.1
);

assert_eq!(
3,
container
.connection_for_route(&Route::new(2001, SlotAddr::Replica))
.connection_for_route(&Route::new(2001, SlotAddr::ReplicaOptional))
.unwrap()
.1
);
Expand Down
Loading

0 comments on commit ca6b9cb

Please sign in to comment.