Skip to content

Commit

Permalink
Use round-robin read from replica in clusters.
Browse files Browse the repository at this point in the history
  • Loading branch information
nihohit committed Aug 16, 2023
1 parent fef4c6a commit 4307936
Show file tree
Hide file tree
Showing 6 changed files with 329 additions and 93 deletions.
12 changes: 7 additions & 5 deletions redis/src/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,6 @@ pub struct ClusterConnection<C = Connection> {
connections: RefCell<HashMap<String, C>>,
slots: RefCell<SlotMap>,
auto_reconnect: RefCell<bool>,
read_from_replicas: bool,
read_timeout: RefCell<Option<Duration>>,
write_timeout: RefCell<Option<Duration>>,
cluster_params: ClusterParams,
Expand All @@ -143,9 +142,8 @@ where
) -> RedisResult<Self> {
let connection = Self {
connections: RefCell::new(HashMap::new()),
slots: RefCell::new(SlotMap::new(vec![])),
slots: RefCell::new(SlotMap::new(vec![], cluster_params.read_from_replicas)),
auto_reconnect: RefCell::new(true),
read_from_replicas: cluster_params.read_from_replicas,
cluster_params,
read_timeout: RefCell::new(None),
write_timeout: RefCell::new(None),
Expand Down Expand Up @@ -297,7 +295,9 @@ where
)));
for conn in samples.iter_mut() {
let value = conn.req_command(&slot_cmd())?;
match parse_slots(&value, self.cluster_params.tls).map(SlotMap::new) {
match parse_slots(&value, self.cluster_params.tls)
.map(|slots_data| SlotMap::new(slots_data, self.cluster_params.read_from_replicas))
{
Ok(new_slots) => {
result = Ok(new_slots);
break;
Expand All @@ -312,7 +312,9 @@ where
let info = get_connection_info(node, self.cluster_params.clone())?;

let mut conn = C::connect(info, Some(self.cluster_params.connection_timeout))?;
if self.read_from_replicas {
if self.cluster_params.read_from_replicas
!= crate::cluster_topology::ReadFromReplicaStrategy::AlwaysFromPrimary
{
// If READONLY is sent to primary nodes, it will have no effect
cmd("READONLY").query(&mut conn)?;
}
Expand Down
9 changes: 7 additions & 2 deletions redis/src/cluster_async/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -464,7 +464,10 @@ where
) -> RedisResult<Self> {
let connections = Self::create_initial_connections(initial_nodes, &cluster_params).await?;
let inner = Arc::new(InnerCore {
conn_lock: RwLock::new((connections, Default::default())),
conn_lock: RwLock::new((
connections,
SlotMap::new(vec![], cluster_params.read_from_replicas),
)),
cluster_params,
pending_requests: Mutex::new(Vec::new()),
});
Expand Down Expand Up @@ -713,6 +716,7 @@ where
curr_retry,
inner.cluster_params.tls,
num_of_nodes_to_query,
inner.cluster_params.read_from_replicas,
)?;
// Create a new connection vector of the found nodes
let connections: &ConnectionMap<C> = &read_guard.0;
Expand Down Expand Up @@ -1313,7 +1317,8 @@ async fn connect_and_check<C>(node: &str, params: ClusterParams) -> RedisResult<
where
C: ConnectionLike + Connect + Send + 'static,
{
let read_from_replicas = params.read_from_replicas;
let read_from_replicas = params.read_from_replicas
!= crate::cluster_topology::ReadFromReplicaStrategy::AlwaysFromPrimary;
let connection_timeout = params.connection_timeout.into();
let info = get_connection_info(node, params)?;
let mut conn: C = C::connect(info).timeout(connection_timeout).await??;
Expand Down
13 changes: 9 additions & 4 deletions redis/src/cluster_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::time::Duration;

use rand::Rng;

use crate::cluster_topology::ReadFromReplicaStrategy;
use crate::connection::{ConnectionAddr, ConnectionInfo, IntoConnectionInfo};
use crate::types::{ErrorKind, RedisError, RedisResult};
use crate::{cluster, cluster::TlsMode};
Expand All @@ -16,7 +17,7 @@ use crate::cluster_async;
struct BuilderParams {
password: Option<String>,
username: Option<String>,
read_from_replicas: bool,
read_from_replicas: ReadFromReplicaStrategy,
tls: Option<TlsMode>,
retries_configuration: RetryParams,
connection_timeout: Option<Duration>,
Expand Down Expand Up @@ -64,7 +65,7 @@ impl RetryParams {
pub(crate) struct ClusterParams {
pub(crate) password: Option<String>,
pub(crate) username: Option<String>,
pub(crate) read_from_replicas: bool,
pub(crate) read_from_replicas: ReadFromReplicaStrategy,
/// tls indicates tls behavior of connections.
/// When Some(TlsMode), connections use tls and verify certification depends on TlsMode.
/// When None, connections do not use tls.
Expand Down Expand Up @@ -237,7 +238,7 @@ impl ClusterClientBuilder {
/// If enabled, then read queries will go to the replica nodes & write queries will go to the
/// primary nodes. If there are no replica nodes, then all queries will go to the primary nodes.
pub fn read_from_replicas(mut self) -> ClusterClientBuilder {
self.builder_params.read_from_replicas = true;
self.builder_params.read_from_replicas = ReadFromReplicaStrategy::RoundRobin;
self
}

Expand All @@ -258,7 +259,11 @@ impl ClusterClientBuilder {
/// Use `read_from_replicas()`.
#[deprecated(since = "0.22.0", note = "Use read_from_replicas()")]
pub fn readonly(mut self, read_from_replicas: bool) -> ClusterClientBuilder {
self.builder_params.read_from_replicas = read_from_replicas;
self.builder_params.read_from_replicas = if read_from_replicas {
ReadFromReplicaStrategy::RoundRobin
} else {
ReadFromReplicaStrategy::AlwaysFromPrimary
};
self
}
}
Expand Down
12 changes: 2 additions & 10 deletions redis/src/cluster_routing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -456,23 +456,15 @@ pub enum SlotAddr {
/// a command is executed
#[derive(Debug, Eq, PartialEq)]
pub(crate) struct SlotAddrs {
primary: String,
replicas: Vec<String>,
pub(crate) primary: String,
pub(crate) replicas: Vec<String>,
}

impl SlotAddrs {
pub(crate) fn new(primary: String, replicas: Vec<String>) -> Self {
Self { primary, replicas }
}

pub(crate) fn slot_addr(&self, slot_addr: SlotAddr) -> &str {
if slot_addr == SlotAddr::Master || self.replicas.is_empty() {
self.primary.as_str()
} else {
self.replicas[0].as_str()
}
}

pub(crate) fn from_slot(slot: Slot) -> Self {
SlotAddrs::new(slot.master, slot.replicas)
}
Expand Down
Loading

0 comments on commit 4307936

Please sign in to comment.