From 91f8a0bae333db248fc7371493cd916cce7380ba Mon Sep 17 00:00:00 2001 From: Shachar Langbeheim Date: Thu, 31 Aug 2023 12:00:57 +0000 Subject: [PATCH 1/3] Remove `allow_replica` from `for_routable`. --- redis/src/cluster.rs | 4 +-- redis/src/cluster_async/mod.rs | 34 ++++++-------------- redis/src/cluster_routing.rs | 59 ++++++++++++++++------------------ 3 files changed, 40 insertions(+), 57 deletions(-) diff --git a/redis/src/cluster.rs b/redis/src/cluster.rs index a3329c78c..33efc3321 100644 --- a/redis/src/cluster.rs +++ b/redis/src/cluster.rs @@ -365,7 +365,7 @@ where Ok(slot_addr.to_string()) }; - match RoutingInfo::for_routable(cmd, self.read_from_replicas) { + match RoutingInfo::for_routable(cmd) { Some(RoutingInfo::SingleNode(SingleNodeRoutingInfo::Random)) => { let mut rng = thread_rng(); Ok(addr_for_slot(Route::new( @@ -431,7 +431,7 @@ where T: MergeResults + std::fmt::Debug, F: FnMut(&mut C) -> RedisResult, { - let route = match RoutingInfo::for_routable(cmd, self.read_from_replicas) { + let route = match RoutingInfo::for_routable(cmd) { Some(RoutingInfo::SingleNode(SingleNodeRoutingInfo::Random)) => None, Some(RoutingInfo::SingleNode(SingleNodeRoutingInfo::SpecificNode(route))) => { Some(route) diff --git a/redis/src/cluster_async/mod.rs b/redis/src/cluster_async/mod.rs index 95bba9c81..8d94e0e1f 100644 --- a/redis/src/cluster_async/mod.rs +++ b/redis/src/cluster_async/mod.rs @@ -87,10 +87,7 @@ use self::connections_container::{ConnectionAndIdentifier, Identifier as Connect /// underlying connections maintained for each node in the cluster, as well /// as common parameters for connecting to nodes and executing commands. #[derive(Clone)] -pub struct ClusterConnection { - sender: mpsc::Sender>, - read_from_replicas: bool, -} +pub struct ClusterConnection(mpsc::Sender>); impl ClusterConnection where @@ -100,7 +97,6 @@ where initial_nodes: &[ConnectionInfo], cluster_params: ClusterParams, ) -> RedisResult> { - let read_from_replicas = cluster_params.read_from_replicas; ClusterConnInner::new(initial_nodes, cluster_params) .await .map(|inner| { @@ -116,10 +112,7 @@ where #[cfg(all(not(feature = "tokio-comp"), feature = "async-std-comp"))] AsyncStd::spawn(stream); - ClusterConnection { - sender: tx, - read_from_replicas, - } + ClusterConnection(tx) }) } @@ -129,15 +122,14 @@ where cmd: &Cmd, routing: Option, ) -> RedisResult { - let allow_replicas = self.read_from_replicas; trace!("send_packed_command"); let (sender, receiver) = oneshot::channel(); - self.sender + self.0 .send(Message { cmd: CmdArg::Cmd { cmd: Arc::new(cmd.clone()), // TODO Remove this clone? routing: CommandRouting::Route( - routing.or_else(|| RoutingInfo::for_routable(cmd, allow_replicas)), + routing.or_else(|| RoutingInfo::for_routable(cmd)), ), }, sender, @@ -171,15 +163,14 @@ where count: usize, route: Option, ) -> RedisResult> { - let allow_replicas = self.read_from_replicas; let (sender, receiver) = oneshot::channel(); - self.sender + self.0 .send(Message { cmd: CmdArg::Pipeline { pipeline: Arc::new(pipeline.clone()), // TODO Remove this clone? offset, count, - route: route.or_else(|| route_pipeline(pipeline, allow_replicas)), + route: route.or_else(|| route_pipeline(pipeline)), }, sender, }) @@ -245,9 +236,9 @@ enum CmdArg { }, } -fn route_pipeline(pipeline: &crate::Pipeline, allow_replica: bool) -> Option { +fn route_pipeline(pipeline: &crate::Pipeline) -> Option { let route_for_command = |cmd| -> Option { - match RoutingInfo::for_routable(cmd, allow_replica) { + match RoutingInfo::for_routable(cmd) { Some(RoutingInfo::SingleNode(SingleNodeRoutingInfo::Random)) => None, Some(RoutingInfo::SingleNode(SingleNodeRoutingInfo::SpecificNode(route))) => { Some(route) @@ -1492,14 +1483,9 @@ mod pipeline_routing_tests { .add_command(cmd("EVAL")); // route randomly assert_eq!( - route_pipeline(&pipeline, true), + route_pipeline(&pipeline), Some(Route::new(12182, SlotAddr::Replica)) ); - - assert_eq!( - route_pipeline(&pipeline, false), - Some(Route::new(12182, SlotAddr::Master)) - ); } #[test] @@ -1512,7 +1498,7 @@ mod pipeline_routing_tests { .get("foo"); // route to slot 12182 assert_eq!( - route_pipeline(&pipeline, false), + route_pipeline(&pipeline), Some(Route::new(4813, SlotAddr::Master)) ); } diff --git a/redis/src/cluster_routing.rs b/redis/src/cluster_routing.rs index 62f46a1e7..f16736740 100644 --- a/redis/src/cluster_routing.rs +++ b/redis/src/cluster_routing.rs @@ -285,7 +285,7 @@ impl ResponsePolicy { impl RoutingInfo { /// Returns the routing info for `r`. - pub fn for_routable(r: &R, allow_replica: bool) -> Option + pub fn for_routable(r: &R) -> Option where R: Routable + ?Sized, { @@ -338,8 +338,7 @@ impl RoutingInfo { if key_count == 0 { Some(RoutingInfo::SingleNode(SingleNodeRoutingInfo::Random)) } else { - r.arg_idx(3) - .map(|key| RoutingInfo::for_key(cmd, key, allow_replica)) + r.arg_idx(3).map(|key| RoutingInfo::for_key(cmd, key)) } } b"XGROUP CREATE" @@ -349,24 +348,22 @@ impl RoutingInfo { | b"XGROUP SETID" | b"XINFO CONSUMERS" | b"XINFO GROUPS" - | b"XINFO STREAM" => r - .arg_idx(2) - .map(|key| RoutingInfo::for_key(cmd, key, allow_replica)), + | b"XINFO STREAM" => r.arg_idx(2).map(|key| RoutingInfo::for_key(cmd, key)), b"XREAD" | b"XREADGROUP" => { let streams_position = r.position(b"STREAMS")?; r.arg_idx(streams_position + 1) - .map(|key| RoutingInfo::for_key(cmd, key, allow_replica)) + .map(|key| RoutingInfo::for_key(cmd, key)) } _ => match r.arg_idx(1) { - Some(key) => Some(RoutingInfo::for_key(cmd, key, allow_replica)), + Some(key) => Some(RoutingInfo::for_key(cmd, key)), None => Some(RoutingInfo::SingleNode(SingleNodeRoutingInfo::Random)), }, } } - fn for_key(cmd: &[u8], key: &[u8], allow_replica: bool) -> RoutingInfo { + fn for_key(cmd: &[u8], key: &[u8]) -> RoutingInfo { RoutingInfo::SingleNode(SingleNodeRoutingInfo::SpecificNode(get_route( - allow_replica && is_readonly_cmd(cmd), + is_readonly_cmd(cmd), key, ))) } @@ -658,16 +655,16 @@ mod tests { lower.arg("streams").arg("foo").arg(0); assert_eq!( - RoutingInfo::for_routable(&upper, false).unwrap(), - RoutingInfo::for_routable(&lower, false).unwrap() + RoutingInfo::for_routable(&upper).unwrap(), + RoutingInfo::for_routable(&lower).unwrap() ); let mut mixed = cmd("xReAd"); mixed.arg("StReAmS").arg("foo").arg(0); assert_eq!( - RoutingInfo::for_routable(&lower, false).unwrap(), - RoutingInfo::for_routable(&mixed, false).unwrap() + RoutingInfo::for_routable(&lower).unwrap(), + RoutingInfo::for_routable(&mixed).unwrap() ); } @@ -718,8 +715,8 @@ mod tests { for cmd in test_cmds { let value = parse_redis_value(&cmd.get_packed_command()).unwrap(); assert_eq!( - RoutingInfo::for_routable(&value, false).unwrap(), - RoutingInfo::for_routable(&cmd, false).unwrap(), + RoutingInfo::for_routable(&value).unwrap(), + RoutingInfo::for_routable(&cmd).unwrap(), ); } @@ -727,7 +724,7 @@ mod tests { for cmd in [cmd("FLUSHALL"), cmd("FLUSHDB"), cmd("PING")] { assert_eq!( - RoutingInfo::for_routable(&cmd, false), + RoutingInfo::for_routable(&cmd), Some(RoutingInfo::MultiNode(( MultipleNodeRoutingInfo::AllMasters, Some(ResponsePolicy::AllSucceeded) @@ -736,7 +733,7 @@ mod tests { } assert_eq!( - RoutingInfo::for_routable(&cmd("DBSIZE"), false), + RoutingInfo::for_routable(&cmd("DBSIZE")), Some(RoutingInfo::MultiNode(( MultipleNodeRoutingInfo::AllMasters, Some(ResponsePolicy::Aggregate(AggregateOp::Sum)) @@ -744,7 +741,7 @@ mod tests { ); assert_eq!( - RoutingInfo::for_routable(&cmd("SCRIPT KILL"), false), + RoutingInfo::for_routable(&cmd("SCRIPT KILL")), Some(RoutingInfo::MultiNode(( MultipleNodeRoutingInfo::AllMasters, Some(ResponsePolicy::OneSucceeded) @@ -752,7 +749,7 @@ mod tests { ); assert_eq!( - RoutingInfo::for_routable(&cmd("INFO"), false), + RoutingInfo::for_routable(&cmd("INFO")), Some(RoutingInfo::MultiNode(( MultipleNodeRoutingInfo::AllMasters, Some(ResponsePolicy::Special) @@ -760,7 +757,7 @@ mod tests { ); assert_eq!( - RoutingInfo::for_routable(&cmd("KEYS"), false), + RoutingInfo::for_routable(&cmd("KEYS")), Some(RoutingInfo::MultiNode(( MultipleNodeRoutingInfo::AllMasters, Some(ResponsePolicy::CombineArrays) @@ -776,7 +773,7 @@ mod tests { cmd("BITOP"), ] { assert_eq!( - RoutingInfo::for_routable(&cmd, false), + RoutingInfo::for_routable(&cmd), None, "{}", std::str::from_utf8(cmd.arg_idx(0).unwrap()).unwrap() @@ -788,7 +785,7 @@ mod tests { cmd("EVALSHA").arg(r#"redis.call("PING");"#).arg(0), ] { assert_eq!( - RoutingInfo::for_routable(cmd, false), + RoutingInfo::for_routable(cmd), Some(RoutingInfo::SingleNode(SingleNodeRoutingInfo::Random)) ); } @@ -858,7 +855,7 @@ mod tests { ), ] { assert_eq!( - RoutingInfo::for_routable(cmd, true), + RoutingInfo::for_routable(cmd), expected, "{}", std::str::from_utf8(cmd.arg_idx(0).unwrap()).unwrap() @@ -893,7 +890,7 @@ mod tests { ), ] { assert_eq!( - RoutingInfo::for_routable(cmd, false), + RoutingInfo::for_routable(cmd), expected, "{}", std::str::from_utf8(cmd.arg_idx(0).unwrap()).unwrap() @@ -906,28 +903,28 @@ mod tests { assert!(matches!(RoutingInfo::for_routable(&parse_redis_value(&[ 42, 50, 13, 10, 36, 54, 13, 10, 69, 88, 73, 83, 84, 83, 13, 10, 36, 49, 54, 13, 10, 244, 93, 23, 40, 126, 127, 253, 33, 89, 47, 185, 204, 171, 249, 96, 139, 13, 10 - ]).unwrap(), true), Some(RoutingInfo::SingleNode(SingleNodeRoutingInfo::SpecificNode(Route(slot, SlotAddr::Replica)))) if slot == 964)); + ]).unwrap()), Some(RoutingInfo::SingleNode(SingleNodeRoutingInfo::SpecificNode(Route(slot, SlotAddr::Replica)))) if slot == 964)); assert!(matches!(RoutingInfo::for_routable(&parse_redis_value(&[ 42, 54, 13, 10, 36, 51, 13, 10, 83, 69, 84, 13, 10, 36, 49, 54, 13, 10, 36, 241, 197, 111, 180, 254, 5, 175, 143, 146, 171, 39, 172, 23, 164, 145, 13, 10, 36, 52, 13, 10, 116, 114, 117, 101, 13, 10, 36, 50, 13, 10, 78, 88, 13, 10, 36, 50, 13, 10, 80, 88, 13, 10, 36, 55, 13, 10, 49, 56, 48, 48, 48, 48, 48, 13, 10 - ]).unwrap(), true), Some(RoutingInfo::SingleNode(SingleNodeRoutingInfo::SpecificNode(Route(slot, SlotAddr::Master)))) if slot == 8352)); + ]).unwrap()), Some(RoutingInfo::SingleNode(SingleNodeRoutingInfo::SpecificNode(Route(slot, SlotAddr::Master)))) if slot == 8352)); assert!(matches!(RoutingInfo::for_routable(&parse_redis_value(&[ 42, 54, 13, 10, 36, 51, 13, 10, 83, 69, 84, 13, 10, 36, 49, 54, 13, 10, 169, 233, 247, 59, 50, 247, 100, 232, 123, 140, 2, 101, 125, 221, 66, 170, 13, 10, 36, 52, 13, 10, 116, 114, 117, 101, 13, 10, 36, 50, 13, 10, 78, 88, 13, 10, 36, 50, 13, 10, 80, 88, 13, 10, 36, 55, 13, 10, 49, 56, 48, 48, 48, 48, 48, 13, 10 - ]).unwrap(), true), Some(RoutingInfo::SingleNode(SingleNodeRoutingInfo::SpecificNode(Route(slot, SlotAddr::Master)))) if slot == 5210)); + ]).unwrap()), Some(RoutingInfo::SingleNode(SingleNodeRoutingInfo::SpecificNode(Route(slot, SlotAddr::Master)))) if slot == 5210)); } #[test] fn test_multi_shard() { let mut cmd = cmd("DEL"); cmd.arg("foo").arg("bar").arg("baz").arg("{bar}vaz"); - let routing = RoutingInfo::for_routable(&cmd, true); + let routing = RoutingInfo::for_routable(&cmd); let mut expected = std::collections::HashMap::new(); expected.insert(Route(4813, SlotAddr::Master), vec![3]); expected.insert(Route(5061, SlotAddr::Master), vec![2, 4]); @@ -943,7 +940,7 @@ mod tests { let mut cmd = crate::cmd("MGET"); cmd.arg("foo").arg("bar").arg("baz").arg("{bar}vaz"); - let routing = RoutingInfo::for_routable(&cmd, true); + let routing = RoutingInfo::for_routable(&cmd); let mut expected = std::collections::HashMap::new(); expected.insert(Route(4813, SlotAddr::Replica), vec![3]); expected.insert(Route(5061, SlotAddr::Replica), vec![2, 4]); @@ -962,7 +959,7 @@ mod tests { fn test_combine_multi_shard_to_single_node_when_all_keys_are_in_same_slot() { let mut cmd = cmd("DEL"); cmd.arg("foo").arg("{foo}bar").arg("{foo}baz"); - let routing = RoutingInfo::for_routable(&cmd, true); + let routing = RoutingInfo::for_routable(&cmd); assert!( matches!( From 89846b19126fea5bf67b3c427278c33f606b4913 Mon Sep 17 00:00:00 2001 From: Shachar Langbeheim Date: Thu, 14 Sep 2023 14:23:26 +0000 Subject: [PATCH 2/3] Use round-robin read from replica in clusters. --- redis/src/cluster.rs | 12 +- .../cluster_async/connections_container.rs | 115 +++++--- redis/src/cluster_async/mod.rs | 16 +- redis/src/cluster_client.rs | 13 +- redis/src/cluster_routing.rs | 54 +--- redis/src/cluster_topology.rs | 266 ++++++++++++------ redis/tests/test_cluster_async.rs | 126 +++++++++ 7 files changed, 425 insertions(+), 177 deletions(-) diff --git a/redis/src/cluster.rs b/redis/src/cluster.rs index 33efc3321..4fc243023 100644 --- a/redis/src/cluster.rs +++ b/redis/src/cluster.rs @@ -128,7 +128,6 @@ pub struct ClusterConnection { connections: RefCell>, slots: RefCell, auto_reconnect: RefCell, - read_from_replicas: bool, read_timeout: RefCell>, write_timeout: RefCell>, cluster_params: ClusterParams, @@ -144,9 +143,8 @@ where ) -> RedisResult { let connection = Self { connections: RefCell::new(HashMap::new()), - slots: RefCell::new(SlotMap::new(vec![])), + slots: RefCell::new(SlotMap::new(vec![], cluster_params.read_from_replicas)), auto_reconnect: RefCell::new(true), - read_from_replicas: cluster_params.read_from_replicas, cluster_params, read_timeout: RefCell::new(None), write_timeout: RefCell::new(None), @@ -298,7 +296,9 @@ where ))); for conn in samples.iter_mut() { let value = conn.req_command(&slot_cmd())?; - match parse_slots(&value, self.cluster_params.tls).map(SlotMap::new) { + match parse_slots(&value, self.cluster_params.tls) + .map(|slots_data| SlotMap::new(slots_data, self.cluster_params.read_from_replicas)) + { Ok(new_slots) => { result = Ok(new_slots); break; @@ -313,7 +313,9 @@ where let info = get_connection_info(node, self.cluster_params.clone())?; let mut conn = C::connect(info, Some(self.cluster_params.connection_timeout))?; - if self.read_from_replicas { + if self.cluster_params.read_from_replicas + != crate::cluster_topology::ReadFromReplicaStrategy::AlwaysFromPrimary + { // If READONLY is sent to primary nodes, it will have no effect cmd("READONLY").query(&mut conn)?; } diff --git a/redis/src/cluster_async/connections_container.rs b/redis/src/cluster_async/connections_container.rs index d35f5c3e9..f2b635cfc 100644 --- a/redis/src/cluster_async/connections_container.rs +++ b/redis/src/cluster_async/connections_container.rs @@ -4,7 +4,7 @@ use std::net::IpAddr; use rand::seq::IteratorRandom; use crate::cluster_routing::{MultipleNodeRoutingInfo, Route, SlotAddr}; -use crate::cluster_topology::SlotMap; +use crate::cluster_topology::{ReadFromReplicaStrategy, SlotMap, SlotMapValue}; type IdentifierType = String; @@ -30,6 +30,7 @@ pub(crate) struct Identifier(IdentifierType); pub(crate) struct ConnectionsContainer { connection_map: HashMap>>, slot_map: SlotMap, + read_from_replica_strategy: ReadFromReplicaStrategy, } impl Default for ConnectionsContainer { @@ -37,6 +38,7 @@ impl Default for ConnectionsContainer { Self { connection_map: Default::default(), slot_map: Default::default(), + read_from_replica_strategy: ReadFromReplicaStrategy::AlwaysFromPrimary, } } } @@ -50,6 +52,7 @@ where pub(crate) fn new( slot_map: SlotMap, connection_map: HashMap>, + read_from_replica_strategy: ReadFromReplicaStrategy, ) -> Self { Self { connection_map: connection_map @@ -57,12 +60,57 @@ where .map(|(address, node)| (Identifier(address), Some(node))) .collect(), slot_map, + read_from_replica_strategy, + } + } + + fn round_robin_read_from_replica( + &self, + slot_map_value: &SlotMapValue, + ) -> Option> { + let addrs = &slot_map_value.addrs; + let initial_index = slot_map_value + .latest_used_replica + .load(std::sync::atomic::Ordering::Relaxed); + let mut check_count = 0; + loop { + check_count += 1; + + // Looped through all replicas, no connected replica was found. + if check_count > addrs.replicas.len() { + return self.connection_for_address(addrs.primary.as_str()); + } + let index = (initial_index + check_count) % addrs.replicas.len(); + if let Some(connection) = self.connection_for_address(addrs.replicas[index].as_str()) { + let _ = slot_map_value.latest_used_replica.compare_exchange_weak( + initial_index, + index, + std::sync::atomic::Ordering::Relaxed, + std::sync::atomic::Ordering::Relaxed, + ); + return Some(connection); + } } } fn lookup_route(&self, route: &Route) -> Option> { - let address = self.slot_map.slot_addr_for_route(route)?; - self.connection_for_address(address) + let slot_map_value = self.slot_map.slot_value_for_route(route)?; + let addrs = &slot_map_value.addrs; + if addrs.replicas.is_empty() { + return self.connection_for_address(addrs.primary.as_str()); + } + + match route.slot_addr() { + SlotAddr::Master => self.connection_for_address(addrs.primary.as_str()), + SlotAddr::Replica => match self.read_from_replica_strategy { + ReadFromReplicaStrategy::AlwaysFromPrimary => { + self.connection_for_address(addrs.primary.as_str()) + } + ReadFromReplicaStrategy::RoundRobin => { + self.round_robin_read_from_replica(slot_map_value) + } + }, + } } pub(crate) fn connection_for_route( @@ -210,21 +258,24 @@ mod tests { } fn create_container() -> ConnectionsContainer { - let slot_map = SlotMap::new(vec![ - Slot::new(1, 1000, "primary1".to_owned(), Vec::new()), - Slot::new( - 1002, - 2000, - "primary2".to_owned(), - vec!["replica2-1".to_owned()], - ), - Slot::new( - 2001, - 3000, - "primary3".to_owned(), - vec!["replica3-1".to_owned(), "replica3-2".to_owned()], - ), - ]); + let slot_map = SlotMap::new( + vec![ + Slot::new(1, 1000, "primary1".to_owned(), Vec::new()), + Slot::new( + 1002, + 2000, + "primary2".to_owned(), + vec!["replica2-1".to_owned()], + ), + Slot::new( + 2001, + 3000, + "primary3".to_owned(), + vec!["replica3-1".to_owned(), "replica3-2".to_owned()], + ), + ], + ReadFromReplicaStrategy::AlwaysFromPrimary, // this argument shouldn't matter, since we overload the RFR strategy. + ); let mut connection_map = HashMap::new(); connection_map.insert( Identifier("primary1".into()), @@ -254,6 +305,7 @@ mod tests { ConnectionsContainer { slot_map, connection_map, + read_from_replica_strategy: ReadFromReplicaStrategy::RoundRobin, } } @@ -365,20 +417,19 @@ mod tests { ); } - // TODO - this is waiting for slot_map to support this. - // #[test] - // fn get_replica_connection_for_replica_route_if_some_but_not_all_replicas_were_removed() { - // let mut container = create_container(); - // container.remove_connection(&Identifier("replica3-2".into())); - - // assert_eq!( - // 31, - // container - // .connection_for_route(&Route::new(2001, SlotAddr::Replica)) - // .unwrap() - // .1 - // ); - // } + #[test] + fn get_replica_connection_for_replica_route_if_some_but_not_all_replicas_were_removed() { + let mut container = create_container(); + container.remove_connection(&Identifier("replica3-2".into())); + + assert_eq!( + 31, + container + .connection_for_route(&Route::new(2001, SlotAddr::Replica)) + .unwrap() + .1 + ); + } #[test] fn get_primary_connection_for_replica_route_if_all_replicas_were_removed() { diff --git a/redis/src/cluster_async/mod.rs b/redis/src/cluster_async/mod.rs index 8d94e0e1f..36f64cd5b 100644 --- a/redis/src/cluster_async/mod.rs +++ b/redis/src/cluster_async/mod.rs @@ -480,7 +480,11 @@ where ) -> RedisResult { let connections = Self::create_initial_connections(initial_nodes, &cluster_params).await?; let inner = Arc::new(InnerCore { - conn_lock: RwLock::new(ConnectionsContainer::new(Default::default(), connections)), + conn_lock: RwLock::new(ConnectionsContainer::new( + Default::default(), + connections, + cluster_params.read_from_replicas, + )), cluster_params, pending_requests: Mutex::new(Vec::new()), }); @@ -736,6 +740,7 @@ where curr_retry, inner.cluster_params.tls, num_of_nodes_to_query, + inner.cluster_params.read_from_replicas, )?; let connections = &*read_guard; // Create a new connection vector of the found nodes @@ -786,7 +791,11 @@ where drop(read_guard); // Replace the current slot map and connection vector with the new ones let mut write_guard = inner.conn_lock.write().await; - *write_guard = ConnectionsContainer::new(new_slots, new_connections); + *write_guard = ConnectionsContainer::new( + new_slots, + new_connections, + inner.cluster_params.read_from_replicas, + ); Ok(()) } @@ -1427,7 +1436,8 @@ async fn connect_and_check( where C: ConnectionLike + Connect + Send + 'static, { - let read_from_replicas = params.read_from_replicas; + let read_from_replicas = params.read_from_replicas + != crate::cluster_topology::ReadFromReplicaStrategy::AlwaysFromPrimary; let connection_timeout = params.connection_timeout.into(); let info = get_connection_info(node, params)?; let (mut conn, ip) = C::connect(info, socket_addr) diff --git a/redis/src/cluster_client.rs b/redis/src/cluster_client.rs index 4941b9142..78685a70a 100644 --- a/redis/src/cluster_client.rs +++ b/redis/src/cluster_client.rs @@ -2,6 +2,7 @@ use std::time::Duration; use rand::Rng; +use crate::cluster_topology::ReadFromReplicaStrategy; use crate::connection::{ConnectionAddr, ConnectionInfo, IntoConnectionInfo}; use crate::types::{ErrorKind, RedisError, RedisResult}; use crate::{cluster, TlsMode}; @@ -16,7 +17,7 @@ use crate::cluster_async; struct BuilderParams { password: Option, username: Option, - read_from_replicas: bool, + read_from_replicas: ReadFromReplicaStrategy, tls: Option, retries_configuration: RetryParams, connection_timeout: Option, @@ -64,7 +65,7 @@ impl RetryParams { pub(crate) struct ClusterParams { pub(crate) password: Option, pub(crate) username: Option, - pub(crate) read_from_replicas: bool, + pub(crate) read_from_replicas: ReadFromReplicaStrategy, /// tls indicates tls behavior of connections. /// When Some(TlsMode), connections use tls and verify certification depends on TlsMode. /// When None, connections do not use tls. @@ -237,7 +238,7 @@ impl ClusterClientBuilder { /// If enabled, then read queries will go to the replica nodes & write queries will go to the /// primary nodes. If there are no replica nodes, then all queries will go to the primary nodes. pub fn read_from_replicas(mut self) -> ClusterClientBuilder { - self.builder_params.read_from_replicas = true; + self.builder_params.read_from_replicas = ReadFromReplicaStrategy::RoundRobin; self } @@ -258,7 +259,11 @@ impl ClusterClientBuilder { /// Use `read_from_replicas()`. #[deprecated(since = "0.22.0", note = "Use read_from_replicas()")] pub fn readonly(mut self, read_from_replicas: bool) -> ClusterClientBuilder { - self.builder_params.read_from_replicas = read_from_replicas; + self.builder_params.read_from_replicas = if read_from_replicas { + ReadFromReplicaStrategy::RoundRobin + } else { + ReadFromReplicaStrategy::AlwaysFromPrimary + }; self } } diff --git a/redis/src/cluster_routing.rs b/redis/src/cluster_routing.rs index f16736740..0ee347044 100644 --- a/redis/src/cluster_routing.rs +++ b/redis/src/cluster_routing.rs @@ -2,8 +2,6 @@ use crate::cluster_topology::get_slot; use crate::cmd::{Arg, Cmd}; use crate::types::Value; use crate::{ErrorKind, RedisResult}; -use rand::seq::SliceRandom; -use rand::thread_rng; use std::cmp::min; use std::collections::HashMap; use std::iter::{Iterator, Once}; @@ -569,7 +567,7 @@ impl Slot { } } -/// What type of node should a request be routed to. +/// What type of node should a request be routed to, assuming read from replica is enabled. #[derive(Eq, PartialEq, Clone, Copy, Debug, Hash)] pub enum SlotAddr { /// Primary node @@ -584,8 +582,8 @@ pub enum SlotAddr { /// a command is executed #[derive(Debug, Eq, PartialEq)] pub(crate) struct SlotAddrs { - primary: String, - replicas: Vec, + pub(crate) primary: String, + pub(crate) replicas: Vec, } impl SlotAddrs { @@ -593,17 +591,6 @@ impl SlotAddrs { Self { primary, replicas } } - pub(crate) fn slot_addr(&self, slot_addr: SlotAddr) -> &str { - match slot_addr { - SlotAddr::Master => &self.primary, - SlotAddr::Replica => self - .replicas - .choose(&mut thread_rng()) - .unwrap_or(&self.primary), - } - .as_str() - } - pub(crate) fn from_slot(slot: Slot) -> Self { SlotAddrs::new(slot.master, slot.replicas) } @@ -863,41 +850,6 @@ mod tests { } } - #[test] - fn test_routing_info_without_allowing_replicas() { - for (cmd, expected) in [ - ( - cmd("XINFO").arg("GROUPS").arg("foo"), - Some(RoutingInfo::SingleNode( - SingleNodeRoutingInfo::SpecificNode(Route::new(slot(b"foo"), SlotAddr::Master)), - )), - ), - ( - cmd("XREAD") - .arg("COUNT") - .arg("2") - .arg("STREAMS") - .arg("mystream") - .arg("writers") - .arg("0-0") - .arg("0-0"), - Some(RoutingInfo::SingleNode( - SingleNodeRoutingInfo::SpecificNode(Route::new( - slot(b"mystream"), - SlotAddr::Master, - )), - )), - ), - ] { - assert_eq!( - RoutingInfo::for_routable(cmd), - expected, - "{}", - std::str::from_utf8(cmd.arg_idx(0).unwrap()).unwrap() - ); - } - } - #[test] fn test_slot_for_packed_cmd() { assert!(matches!(RoutingInfo::for_routable(&parse_redis_value(&[ diff --git a/redis/src/cluster_topology.rs b/redis/src/cluster_topology.rs index ecb94b648..1862e982e 100644 --- a/redis/src/cluster_topology.rs +++ b/redis/src/cluster_topology.rs @@ -1,17 +1,12 @@ //! This module provides the functionality to refresh and calculate the cluster topology for Redis Cluster. use crate::cluster::get_connection_addr; -use crate::cluster_routing::MultipleNodeRoutingInfo; -use crate::cluster_routing::Route; -use crate::cluster_routing::SlotAddr; -use crate::cluster_routing::SlotAddrs; -use crate::{cluster::TlsMode, cluster_routing::Slot, ErrorKind, RedisError, RedisResult, Value}; +use crate::cluster_routing::{MultipleNodeRoutingInfo, Route, Slot, SlotAddr, SlotAddrs}; +use crate::{cluster::TlsMode, ErrorKind, RedisError, RedisResult, Value}; use derivative::Derivative; -use std::collections::hash_map::DefaultHasher; -use std::collections::BTreeMap; -use std::collections::HashMap; -use std::collections::HashSet; +use std::collections::{hash_map::DefaultHasher, BTreeMap, HashMap, HashSet}; use std::hash::{Hash, Hasher}; +use std::sync::atomic::AtomicUsize; use std::time::Duration; use tracing::trace; @@ -58,9 +53,10 @@ impl TopologyView { } #[derive(Debug)] -struct SlotMapValue { +pub(crate) struct SlotMapValue { start: u16, - addrs: SlotAddrs, + pub(crate) addrs: SlotAddrs, + pub(crate) latest_used_replica: AtomicUsize, } impl SlotMapValue { @@ -68,17 +64,51 @@ impl SlotMapValue { Self { start: slot.start(), addrs: SlotAddrs::from_slot(slot), + latest_used_replica: AtomicUsize::new(0), } } } +#[derive(Debug, Default, Clone, PartialEq, Copy)] +pub(crate) enum ReadFromReplicaStrategy { + #[default] + AlwaysFromPrimary, + RoundRobin, +} + #[derive(Debug, Default)] -pub(crate) struct SlotMap(BTreeMap); +pub(crate) struct SlotMap { + slots: BTreeMap, + read_from_replica: ReadFromReplicaStrategy, +} + +fn get_address_from_slot( + slot: &SlotMapValue, + read_from_replica: ReadFromReplicaStrategy, + slot_addr: SlotAddr, +) -> &str { + if slot_addr == SlotAddr::Master || slot.addrs.replicas.is_empty() { + return slot.addrs.primary.as_str(); + } + match read_from_replica { + ReadFromReplicaStrategy::AlwaysFromPrimary => slot.addrs.primary.as_str(), + ReadFromReplicaStrategy::RoundRobin => { + let index = slot + .latest_used_replica + .fetch_add(1, std::sync::atomic::Ordering::Relaxed) + % slot.addrs.replicas.len(); + slot.addrs.replicas[index].as_str() + } + } +} impl SlotMap { - pub fn new(slots: Vec) -> Self { - let mut this = Self(BTreeMap::new()); - this.0.extend( + pub(crate) fn new(slots: Vec, read_from_replica: ReadFromReplicaStrategy) -> Self { + let mut this = Self { + slots: BTreeMap::new(), + read_from_replica, + }; + this.slots.extend( slots .into_iter() .map(|slot| (slot.end(), SlotMapValue::from_slot(slot))), @@ -87,28 +117,36 @@ impl SlotMap { this } - pub fn slot_addr_for_route(&self, route: &Route) -> Option<&str> { + pub fn slot_value_for_route(&self, route: &Route) -> Option<&SlotMapValue> { let slot = route.slot(); - self.0.range(slot..).next().and_then(|(end, slot_value)| { - if slot <= *end && slot_value.start <= slot { - Some(slot_value.addrs.slot_addr(route.slot_addr())) - } else { - None - } + self.slots + .range(slot..) + .next() + .and_then(|(end, slot_value)| { + if slot <= *end && slot_value.start <= slot { + Some(slot_value) + } else { + None + } + }) + } + + pub fn slot_addr_for_route(&self, route: &Route) -> Option<&str> { + self.slot_value_for_route(route).map(|slot_value| { + get_address_from_slot(slot_value, self.read_from_replica, route.slot_addr()) }) } pub fn values(&self) -> impl Iterator { - self.0.values().map(|slot_value| &slot_value.addrs) + self.slots.values().map(|slot_value| &slot_value.addrs) } fn all_unique_addresses(&self, only_primaries: bool) -> HashSet<&str> { let mut addresses = HashSet::new(); for slot in self.values() { - if only_primaries { - addresses.insert(slot.slot_addr(SlotAddr::Master)); - } else { - addresses.extend(slot.into_iter().map(|str| str.as_str())); + addresses.insert(slot.primary.as_str()); + if !only_primaries { + addresses.extend(slot.replicas.iter().map(|str| str.as_str())); } } @@ -243,6 +281,7 @@ pub(crate) fn calculate_topology( curr_retry: usize, tls_mode: Option, num_of_queried_nodes: usize, + read_from_replica: ReadFromReplicaStrategy, ) -> Result { if topology_views.is_empty() { return Err(RedisError::from(( @@ -316,7 +355,7 @@ pub(crate) fn calculate_topology( "Failed to parse the slots on the majority view", )))?; - Ok(SlotMap::new(slots_data)) + Ok(SlotMap::new(slots_data, read_from_replica)) }; if non_unique_max_node_count { @@ -432,7 +471,14 @@ mod tests { get_view(&ViewType::SingleNodeViewFullCoverage), get_view(&ViewType::TwoNodesViewFullCoverage), ]; - let topology_view = calculate_topology(topology_results, 1, None, queried_nodes).unwrap(); + let topology_view = calculate_topology( + topology_results, + 1, + None, + queried_nodes, + ReadFromReplicaStrategy::AlwaysFromPrimary, + ) + .unwrap(); let res: Vec<_> = topology_view.values().collect(); let node_1 = get_node_addr("node1", 6379); let expected: Vec<&SlotAddrs> = vec![&node_1]; @@ -448,7 +494,13 @@ mod tests { get_view(&ViewType::TwoNodesViewFullCoverage), get_view(&ViewType::TwoNodesViewMissingSlots), ]; - let topology_view = calculate_topology(topology_results, 1, None, queried_nodes); + let topology_view = calculate_topology( + topology_results, + 1, + None, + queried_nodes, + ReadFromReplicaStrategy::AlwaysFromPrimary, + ); assert!(topology_view.is_err()); } @@ -461,7 +513,14 @@ mod tests { get_view(&ViewType::TwoNodesViewFullCoverage), get_view(&ViewType::TwoNodesViewMissingSlots), ]; - let topology_view = calculate_topology(topology_results, 3, None, queried_nodes).unwrap(); + let topology_view = calculate_topology( + topology_results, + 3, + None, + queried_nodes, + ReadFromReplicaStrategy::AlwaysFromPrimary, + ) + .unwrap(); let res: Vec<_> = topology_view.values().collect(); let node_1 = get_node_addr("node1", 6379); let node_2 = get_node_addr("node2", 6380); @@ -477,7 +536,14 @@ mod tests { get_view(&ViewType::TwoNodesViewFullCoverage), get_view(&ViewType::TwoNodesViewMissingSlots), ]; - let topology_view = calculate_topology(topology_results, 1, None, queried_nodes).unwrap(); + let topology_view = calculate_topology( + topology_results, + 1, + None, + queried_nodes, + ReadFromReplicaStrategy::AlwaysFromPrimary, + ) + .unwrap(); let res: Vec<_> = topology_view.values().collect(); let node_1 = get_node_addr("node1", 6379); let node_2 = get_node_addr("node2", 6380); @@ -494,7 +560,14 @@ mod tests { get_view(&ViewType::SingleNodeViewMissingSlots), get_view(&ViewType::TwoNodesViewMissingSlots), ]; - let topology_view = calculate_topology(topology_results, 1, None, queried_nodes).unwrap(); + let topology_view = calculate_topology( + topology_results, + 1, + None, + queried_nodes, + ReadFromReplicaStrategy::AlwaysFromPrimary, + ) + .unwrap(); let res: Vec<_> = topology_view.values().collect(); let node_1 = get_node_addr("node3", 6381); let node_2 = get_node_addr("node4", 6382); @@ -511,7 +584,14 @@ mod tests { get_view(&ViewType::TwoNodesViewMissingSlots), get_view(&ViewType::SingleNodeViewMissingSlots), ]; - let topology_view = calculate_topology(topology_results, 1, None, queried_nodes).unwrap(); + let topology_view = calculate_topology( + topology_results, + 1, + None, + queried_nodes, + ReadFromReplicaStrategy::AlwaysFromPrimary, + ) + .unwrap(); let res: Vec<_> = topology_view.values().collect(); let node_1 = get_node_addr("node1", 6379); let expected: Vec<&SlotAddrs> = vec![&node_1]; @@ -520,20 +600,23 @@ mod tests { #[test] fn test_slot_map_retrieve_routes() { - let slot_map = SlotMap::new(vec![ - Slot::new( - 1, - 1000, - "node1:6379".to_owned(), - vec!["replica1:6379".to_owned()], - ), - Slot::new( - 1002, - 2000, - "node2:6379".to_owned(), - vec!["replica2:6379".to_owned()], - ), - ]); + let slot_map = SlotMap::new( + vec![ + Slot::new( + 1, + 1000, + "node1:6379".to_owned(), + vec!["replica1:6379".to_owned()], + ), + Slot::new( + 1002, + 2000, + "node2:6379".to_owned(), + vec!["replica2:6379".to_owned()], + ), + ], + ReadFromReplicaStrategy::AlwaysFromPrimary, + ); assert!(slot_map .slot_addr_for_route(&Route::new(0, SlotAddr::Master)) @@ -583,42 +666,45 @@ mod tests { .is_none()); } - fn get_slot_map() -> SlotMap { - SlotMap::new(vec![ - Slot::new( - 1, - 1000, - "node1:6379".to_owned(), - vec!["replica1:6379".to_owned()], - ), - Slot::new( - 1002, - 2000, - "node2:6379".to_owned(), - vec!["replica2:6379".to_owned(), "replica3:6379".to_owned()], - ), - Slot::new( - 2001, - 3000, - "node3:6379".to_owned(), - vec![ - "replica4:6379".to_owned(), - "replica5:6379".to_owned(), - "replica6:6379".to_owned(), - ], - ), - Slot::new( - 3001, - 4000, - "node2:6379".to_owned(), - vec!["replica2:6379".to_owned(), "replica3:6379".to_owned()], - ), - ]) + fn get_slot_map(read_from_replica: ReadFromReplicaStrategy) -> SlotMap { + SlotMap::new( + vec![ + Slot::new( + 1, + 1000, + "node1:6379".to_owned(), + vec!["replica1:6379".to_owned()], + ), + Slot::new( + 1002, + 2000, + "node2:6379".to_owned(), + vec!["replica2:6379".to_owned(), "replica3:6379".to_owned()], + ), + Slot::new( + 2001, + 3000, + "node3:6379".to_owned(), + vec![ + "replica4:6379".to_owned(), + "replica5:6379".to_owned(), + "replica6:6379".to_owned(), + ], + ), + Slot::new( + 3001, + 4000, + "node2:6379".to_owned(), + vec!["replica2:6379".to_owned(), "replica3:6379".to_owned()], + ), + ], + read_from_replica, + ) } #[test] fn test_slot_map_get_all_primaries() { - let slot_map = get_slot_map(); + let slot_map = get_slot_map(ReadFromReplicaStrategy::AlwaysFromPrimary); let mut addresses = slot_map.addresses_for_multi_routing(&MultipleNodeRoutingInfo::AllMasters); addresses.sort(); @@ -627,7 +713,7 @@ mod tests { #[test] fn test_slot_map_get_all_nodes() { - let slot_map = get_slot_map(); + let slot_map = get_slot_map(ReadFromReplicaStrategy::AlwaysFromPrimary); let mut addresses = slot_map.addresses_for_multi_routing(&MultipleNodeRoutingInfo::AllNodes); addresses.sort(); @@ -649,7 +735,7 @@ mod tests { #[test] fn test_slot_map_get_multi_node() { - let slot_map = get_slot_map(); + let slot_map = get_slot_map(ReadFromReplicaStrategy::RoundRobin); let mut addresses = slot_map.addresses_for_multi_routing(&MultipleNodeRoutingInfo::MultiSlot(vec![ (Route::new(1, SlotAddr::Master), vec![]), @@ -663,4 +749,20 @@ mod tests { || addresses.contains(&"replica6:6379") ); } + + #[test] + fn test_slot_map_rotate_read_replicas() { + let slot_map = get_slot_map(ReadFromReplicaStrategy::RoundRobin); + let route = Route::new(2001, SlotAddr::Replica); + let mut addresses = vec![ + slot_map.slot_addr_for_route(&route).unwrap(), + slot_map.slot_addr_for_route(&route).unwrap(), + slot_map.slot_addr_for_route(&route).unwrap(), + ]; + addresses.sort(); + assert_eq!( + addresses, + vec!["replica4:6379", "replica5:6379", "replica6:6379"] + ); + } } diff --git a/redis/tests/test_cluster_async.rs b/redis/tests/test_cluster_async.rs index b4090861c..6581737cd 100644 --- a/redis/tests/test_cluster_async.rs +++ b/redis/tests/test_cluster_async.rs @@ -1583,3 +1583,129 @@ fn test_async_cluster_non_retryable_error_should_not_retry() { } assert_eq!(completed.load(Ordering::SeqCst), 1); } + +#[test] +fn test_async_cluster_read_from_primary() { + let name = "node"; + let found_ports = Arc::new(std::sync::Mutex::new(Vec::new())); + let ports_clone = found_ports.clone(); + let MockEnv { + runtime, + async_connection: mut connection, + handler: _handler, + .. + } = MockEnv::with_client_builder( + ClusterClient::builder(vec![&*format!("redis://{name}")]).retries(0), + name, + move |received_cmd: &[u8], port| { + respond_startup_with_replica_using_config( + name, + received_cmd, + Some(vec![ + MockSlotRange { + primary_port: 6379, + replica_ports: vec![6380, 6381], + slot_range: (0..8191), + }, + MockSlotRange { + primary_port: 6382, + replica_ports: vec![6383, 6384], + slot_range: (8192..16383), + }, + ]), + )?; + ports_clone.lock().unwrap().push(port); + Err(Ok(Value::Nil)) + }, + ); + + runtime.block_on(async { + cmd("GET") + .arg("foo") + .query_async::<_, ()>(&mut connection) + .await + .unwrap(); + cmd("GET") + .arg("bar") + .query_async::<_, ()>(&mut connection) + .await + .unwrap(); + cmd("GET") + .arg("foo") + .query_async::<_, ()>(&mut connection) + .await + .unwrap(); + cmd("GET") + .arg("bar") + .query_async::<_, ()>(&mut connection) + .await + .unwrap(); + }); + + found_ports.lock().unwrap().sort(); + assert_eq!(*found_ports.lock().unwrap(), vec![6379, 6379, 6382, 6382]); +} + +#[test] +fn test_async_cluster_round_robin_read_from_replica() { + let name = "node"; + let found_ports = Arc::new(std::sync::Mutex::new(Vec::new())); + let ports_clone = found_ports.clone(); + let MockEnv { + runtime, + async_connection: mut connection, + handler: _handler, + .. + } = MockEnv::with_client_builder( + ClusterClient::builder(vec![&*format!("redis://{name}")]) + .retries(0) + .read_from_replicas(), + name, + move |received_cmd: &[u8], port| { + respond_startup_with_replica_using_config( + name, + received_cmd, + Some(vec![ + MockSlotRange { + primary_port: 6379, + replica_ports: vec![6380, 6381], + slot_range: (0..8191), + }, + MockSlotRange { + primary_port: 6382, + replica_ports: vec![6383, 6384], + slot_range: (8192..16383), + }, + ]), + )?; + ports_clone.lock().unwrap().push(port); + Err(Ok(Value::Nil)) + }, + ); + + runtime.block_on(async { + cmd("GET") + .arg("foo") + .query_async::<_, ()>(&mut connection) + .await + .unwrap(); + cmd("GET") + .arg("bar") + .query_async::<_, ()>(&mut connection) + .await + .unwrap(); + cmd("GET") + .arg("foo") + .query_async::<_, ()>(&mut connection) + .await + .unwrap(); + cmd("GET") + .arg("bar") + .query_async::<_, ()>(&mut connection) + .await + .unwrap(); + }); + + found_ports.lock().unwrap().sort(); + assert_eq!(*found_ports.lock().unwrap(), vec![6380, 6381, 6383, 6384]); +} From 350dfc5083494be81df2a47ec008402b0563cff2 Mon Sep 17 00:00:00 2001 From: Shachar Langbeheim Date: Thu, 21 Sep 2023 10:24:26 +0000 Subject: [PATCH 3/3] Allow user to force send to replica. --- .../cluster_async/connections_container.rs | 48 +++++++++++++------ redis/src/cluster_async/mod.rs | 2 +- redis/src/cluster_routing.rs | 24 ++++++---- redis/src/cluster_topology.rs | 4 +- 4 files changed, 50 insertions(+), 28 deletions(-) diff --git a/redis/src/cluster_async/connections_container.rs b/redis/src/cluster_async/connections_container.rs index f2b635cfc..3da9a9d14 100644 --- a/redis/src/cluster_async/connections_container.rs +++ b/redis/src/cluster_async/connections_container.rs @@ -102,7 +102,7 @@ where match route.slot_addr() { SlotAddr::Master => self.connection_for_address(addrs.primary.as_str()), - SlotAddr::Replica => match self.read_from_replica_strategy { + SlotAddr::ReplicaOptional => match self.read_from_replica_strategy { ReadFromReplicaStrategy::AlwaysFromPrimary => { self.connection_for_address(addrs.primary.as_str()) } @@ -110,6 +110,7 @@ where self.round_robin_read_from_replica(slot_map_value) } }, + SlotAddr::ReplicaRequired => self.round_robin_read_from_replica(slot_map_value), } } @@ -118,7 +119,7 @@ where route: &Route, ) -> Option> { self.lookup_route(route).or_else(|| { - if route.slot_addr() == SlotAddr::Replica { + if route.slot_addr() != SlotAddr::Master { self.lookup_route(&Route::new(route.slot(), SlotAddr::Master)) } else { None @@ -257,7 +258,9 @@ mod tests { expected_connections.contains(&found) } - fn create_container() -> ConnectionsContainer { + fn create_container_with_strategy( + stragey: ReadFromReplicaStrategy, + ) -> ConnectionsContainer { let slot_map = SlotMap::new( vec![ Slot::new(1, 1000, "primary1".to_owned(), Vec::new()), @@ -305,10 +308,14 @@ mod tests { ConnectionsContainer { slot_map, connection_map, - read_from_replica_strategy: ReadFromReplicaStrategy::RoundRobin, + read_from_replica_strategy: stragey, } } + fn create_container() -> ConnectionsContainer { + create_container_with_strategy(ReadFromReplicaStrategy::RoundRobin) + } + #[test] fn get_connection_for_primary_route() { let container = create_container(); @@ -367,13 +374,13 @@ mod tests { let container = create_container(); assert!(container - .connection_for_route(&Route::new(1001, SlotAddr::Replica)) + .connection_for_route(&Route::new(1001, SlotAddr::ReplicaOptional)) .is_none()); assert_eq!( 21, container - .connection_for_route(&Route::new(1002, SlotAddr::Replica)) + .connection_for_route(&Route::new(1002, SlotAddr::ReplicaOptional)) .unwrap() .1 ); @@ -381,13 +388,13 @@ mod tests { assert_eq!( 21, container - .connection_for_route(&Route::new(1500, SlotAddr::Replica)) + .connection_for_route(&Route::new(1500, SlotAddr::ReplicaOptional)) .unwrap() .1 ); assert!(one_of( - container.connection_for_route(&Route::new(2001, SlotAddr::Replica)), + container.connection_for_route(&Route::new(2001, SlotAddr::ReplicaOptional)), &[31, 32], )); } @@ -397,13 +404,13 @@ mod tests { let container = create_container(); assert!(container - .connection_for_route(&Route::new(0, SlotAddr::Replica)) + .connection_for_route(&Route::new(0, SlotAddr::ReplicaOptional)) .is_none()); assert_eq!( 1, container - .connection_for_route(&Route::new(500, SlotAddr::Replica)) + .connection_for_route(&Route::new(500, SlotAddr::ReplicaOptional)) .unwrap() .1 ); @@ -411,7 +418,7 @@ mod tests { assert_eq!( 1, container - .connection_for_route(&Route::new(1000, SlotAddr::Replica)) + .connection_for_route(&Route::new(1000, SlotAddr::ReplicaOptional)) .unwrap() .1 ); @@ -425,12 +432,23 @@ mod tests { assert_eq!( 31, container - .connection_for_route(&Route::new(2001, SlotAddr::Replica)) + .connection_for_route(&Route::new(2001, SlotAddr::ReplicaRequired)) .unwrap() .1 ); } + #[test] + fn get_replica_connection_for_replica_route_if_replica_is_required_even_if_strategy_is_always_from_primary( + ) { + let container = create_container_with_strategy(ReadFromReplicaStrategy::AlwaysFromPrimary); + + assert!(one_of( + container.connection_for_route(&Route::new(2001, SlotAddr::ReplicaRequired)), + &[31, 32], + )); + } + #[test] fn get_primary_connection_for_replica_route_if_all_replicas_were_removed() { let mut container = create_container(); @@ -439,7 +457,7 @@ mod tests { assert_eq!( 2, container - .connection_for_route(&Route::new(1002, SlotAddr::Replica)) + .connection_for_route(&Route::new(1002, SlotAddr::ReplicaOptional)) .unwrap() .1 ); @@ -447,7 +465,7 @@ mod tests { assert_eq!( 2, container - .connection_for_route(&Route::new(1500, SlotAddr::Replica)) + .connection_for_route(&Route::new(1500, SlotAddr::ReplicaOptional)) .unwrap() .1 ); @@ -455,7 +473,7 @@ mod tests { assert_eq!( 3, container - .connection_for_route(&Route::new(2001, SlotAddr::Replica)) + .connection_for_route(&Route::new(2001, SlotAddr::ReplicaOptional)) .unwrap() .1 ); diff --git a/redis/src/cluster_async/mod.rs b/redis/src/cluster_async/mod.rs index 36f64cd5b..cef0ed0b5 100644 --- a/redis/src/cluster_async/mod.rs +++ b/redis/src/cluster_async/mod.rs @@ -1494,7 +1494,7 @@ mod pipeline_routing_tests { assert_eq!( route_pipeline(&pipeline), - Some(Route::new(12182, SlotAddr::Replica)) + Some(Route::new(12182, SlotAddr::ReplicaOptional)) ); } diff --git a/redis/src/cluster_routing.rs b/redis/src/cluster_routing.rs index 0ee347044..fe78bfc39 100644 --- a/redis/src/cluster_routing.rs +++ b/redis/src/cluster_routing.rs @@ -195,7 +195,7 @@ pub(crate) fn combine_and_sort_array_results<'a>( fn get_route(is_readonly: bool, key: &[u8]) -> Route { let slot = get_slot(key); if is_readonly { - Route::new(slot, SlotAddr::Replica) + Route::new(slot, SlotAddr::ReplicaOptional) } else { Route::new(slot, SlotAddr::Master) } @@ -570,10 +570,14 @@ impl Slot { /// What type of node should a request be routed to, assuming read from replica is enabled. #[derive(Eq, PartialEq, Clone, Copy, Debug, Hash)] pub enum SlotAddr { - /// Primary node + /// The request must be routed to primary node Master, - /// Replica node - Replica, + /// The request may be routed to a replica node. + /// For example, a GET command can be routed either to replica or primary. + ReplicaOptional, + /// The request must be routed to replica node, if one exists. + /// For example, by user requested routing. + ReplicaRequired, } /// This is just a simplified version of [`Slot`], @@ -806,7 +810,7 @@ mod tests { Some(RoutingInfo::SingleNode( SingleNodeRoutingInfo::SpecificNode(Route::new( slot(b"foo"), - SlotAddr::Replica, + SlotAddr::ReplicaOptional, )), )), ), @@ -836,7 +840,7 @@ mod tests { Some(RoutingInfo::SingleNode( SingleNodeRoutingInfo::SpecificNode(Route::new( slot(b"mystream"), - SlotAddr::Replica, + SlotAddr::ReplicaOptional, )), )), ), @@ -855,7 +859,7 @@ mod tests { assert!(matches!(RoutingInfo::for_routable(&parse_redis_value(&[ 42, 50, 13, 10, 36, 54, 13, 10, 69, 88, 73, 83, 84, 83, 13, 10, 36, 49, 54, 13, 10, 244, 93, 23, 40, 126, 127, 253, 33, 89, 47, 185, 204, 171, 249, 96, 139, 13, 10 - ]).unwrap()), Some(RoutingInfo::SingleNode(SingleNodeRoutingInfo::SpecificNode(Route(slot, SlotAddr::Replica)))) if slot == 964)); + ]).unwrap()), Some(RoutingInfo::SingleNode(SingleNodeRoutingInfo::SpecificNode(Route(slot, SlotAddr::ReplicaOptional)))) if slot == 964)); assert!(matches!(RoutingInfo::for_routable(&parse_redis_value(&[ 42, 54, 13, 10, 36, 51, 13, 10, 83, 69, 84, 13, 10, 36, 49, 54, 13, 10, 36, 241, @@ -894,9 +898,9 @@ mod tests { cmd.arg("foo").arg("bar").arg("baz").arg("{bar}vaz"); let routing = RoutingInfo::for_routable(&cmd); let mut expected = std::collections::HashMap::new(); - expected.insert(Route(4813, SlotAddr::Replica), vec![3]); - expected.insert(Route(5061, SlotAddr::Replica), vec![2, 4]); - expected.insert(Route(12182, SlotAddr::Replica), vec![1]); + expected.insert(Route(4813, SlotAddr::ReplicaOptional), vec![3]); + expected.insert(Route(5061, SlotAddr::ReplicaOptional), vec![2, 4]); + expected.insert(Route(12182, SlotAddr::ReplicaOptional), vec![1]); assert!( matches!(routing.clone(), Some(RoutingInfo::MultiNode((MultipleNodeRoutingInfo::MultiSlot(vec), Some(ResponsePolicy::CombineArrays)))) if { diff --git a/redis/src/cluster_topology.rs b/redis/src/cluster_topology.rs index 1862e982e..aa138e5b2 100644 --- a/redis/src/cluster_topology.rs +++ b/redis/src/cluster_topology.rs @@ -739,7 +739,7 @@ mod tests { let mut addresses = slot_map.addresses_for_multi_routing(&MultipleNodeRoutingInfo::MultiSlot(vec![ (Route::new(1, SlotAddr::Master), vec![]), - (Route::new(2001, SlotAddr::Replica), vec![]), + (Route::new(2001, SlotAddr::ReplicaOptional), vec![]), ])); addresses.sort(); assert!(addresses.contains(&"node1:6379")); @@ -753,7 +753,7 @@ mod tests { #[test] fn test_slot_map_rotate_read_replicas() { let slot_map = get_slot_map(ReadFromReplicaStrategy::RoundRobin); - let route = Route::new(2001, SlotAddr::Replica); + let route = Route::new(2001, SlotAddr::ReplicaOptional); let mut addresses = vec![ slot_map.slot_addr_for_route(&route).unwrap(), slot_map.slot_addr_for_route(&route).unwrap(),