Added periodic topology checks
barshaul committed Oct 2, 2023
1 parent 2d6e6e8 commit 48308b1
Showing 6 changed files with 254 additions and 39 deletions.
4 changes: 3 additions & 1 deletion redis/Cargo.toml
@@ -43,6 +43,8 @@ pin-project-lite = { version = "0.2", optional = true }
tokio-util = { version = "0.7", optional = true }
tokio = { version = "1", features = ["rt", "net", "time"], optional = true }
socket2 = { version = "0.4", default-features = false, optional = true }
fast-math = { version = "0.1.1", optional = true }
dispose = { version = "0.5.0", optional = true }

# Only needed for the connection manager
arc-swap = { version = "1.1.0", optional = true }
@@ -92,7 +94,7 @@ arcstr = "1.1.5"
[features]
default = ["acl", "streams", "geospatial", "script", "keep-alive"]
acl = []
aio = ["bytes", "pin-project-lite", "futures-util", "futures-util/alloc", "futures-util/sink", "tokio/io-util", "tokio-util", "tokio-util/codec", "tokio/sync", "combine/tokio", "async-trait", "futures-time"]
aio = ["bytes", "pin-project-lite", "futures-util", "futures-util/alloc", "futures-util/sink", "tokio/io-util", "tokio-util", "tokio-util/codec", "tokio/sync", "combine/tokio", "async-trait", "futures-time", "fast-math", "dispose"]
geospatial = []
json = ["serde", "serde/derive", "serde_json"]
cluster = ["crc16", "rand", "derivative"]
11 changes: 10 additions & 1 deletion redis/src/cluster_async/connections_container.rs
@@ -5,7 +5,7 @@ use arcstr::ArcStr;
use rand::seq::IteratorRandom;

use crate::cluster_routing::{MultipleNodeRoutingInfo, Route, SlotAddr};
use crate::cluster_topology::{ReadFromReplicaStrategy, SlotMap, SlotMapValue};
use crate::cluster_topology::{ReadFromReplicaStrategy, SlotMap, SlotMapValue, TopologyHash};

type IdentifierType = ArcStr;

@@ -32,6 +32,7 @@ pub(crate) struct ConnectionsContainer<Connection> {
connection_map: HashMap<Identifier, Option<ClusterNode<Connection>>>,
slot_map: SlotMap,
read_from_replica_strategy: ReadFromReplicaStrategy,
topology_hash: TopologyHash,
}

impl<Connection> Default for ConnectionsContainer<Connection> {
@@ -40,6 +41,7 @@ impl<Connection> Default for ConnectionsContainer<Connection> {
connection_map: Default::default(),
slot_map: Default::default(),
read_from_replica_strategy: ReadFromReplicaStrategy::AlwaysFromPrimary,
topology_hash: 0,
}
}
}
@@ -54,6 +56,7 @@ where
slot_map: SlotMap,
connection_map: HashMap<ArcStr, ClusterNode<Connection>>,
read_from_replica_strategy: ReadFromReplicaStrategy,
topology_hash: TopologyHash,
) -> Self {
Self {
connection_map: connection_map
Expand All @@ -62,6 +65,7 @@ where
.collect(),
slot_map,
read_from_replica_strategy,
topology_hash,
}
}

@@ -221,6 +225,10 @@
.filter(|(_, conn_option)| conn_option.is_some())
.count()
}

pub(crate) fn get_current_topology_hash(&self) -> TopologyHash {
self.topology_hash
}
}

#[cfg(test)]
@@ -310,6 +318,7 @@ mod tests {
slot_map,
connection_map,
read_from_replica_strategy: stragey,
topology_hash: 0,
}
}

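The `topology_hash` added here gives each connections container a fingerprint of the slot map it was built from, so a topology check can compare a freshly sampled view against the current one without diffing entire slot tables. The hash itself is produced by `calculate_topology`, whose internals are not part of this diff; as a rough sketch of the idea (illustrative only, not this crate's actual implementation), a stable fingerprint can be computed from a canonically ordered slot-range-to-node mapping:

use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Illustrative only: derive a stable fingerprint from a slot view, where
/// `slots` maps (start_slot, end_slot) ranges to a primary node address.
fn topology_hash(mut slots: Vec<((u16, u16), String)>) -> u64 {
    // Sort first so the result does not depend on the order in which
    // the queried node happened to report its slot ranges.
    slots.sort();
    let mut hasher = DefaultHasher::new();
    slots.hash(&mut hasher);
    hasher.finish()
}

fn main() {
    let view_a = vec![
        ((0, 8191), "node-1:6379".to_string()),
        ((8192, 16383), "node-2:6379".to_string()),
    ];
    let view_b = vec![
        ((8192, 16383), "node-2:6379".to_string()),
        ((0, 8191), "node-1:6379".to_string()),
    ];
    // The same topology reported in a different order hashes identically.
    assert_eq!(topology_hash(view_a), topology_hash(view_b));
}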
146 changes: 120 additions & 26 deletions redis/src/cluster_async/mod.rs
@@ -32,7 +32,7 @@ use std::{
net::{IpAddr, SocketAddr},
pin::Pin,
sync::{
atomic::{self, AtomicUsize},
atomic::{self, AtomicUsize, Ordering},
Arc, Mutex,
},
task::{self, Poll},
@@ -48,12 +48,13 @@ use crate::{
SingleNodeRoutingInfo,
},
cluster_topology::{
calculate_topology, DEFAULT_REFRESH_SLOTS_RETRY_INITIAL_INTERVAL,
DEFAULT_REFRESH_SLOTS_RETRY_TIMEOUT,
calculate_topology, DEFAULT_NUMBER_OF_REFRESH_SLOTS_RETRIES,
DEFAULT_REFRESH_SLOTS_RETRY_INITIAL_INTERVAL, DEFAULT_REFRESH_SLOTS_RETRY_TIMEOUT,
},
Cmd, ConnectionInfo, ErrorKind, IntoConnectionInfo, RedisError, RedisFuture, RedisResult,
Value,
};
use std::time::Duration;

#[cfg(all(not(feature = "tokio-comp"), feature = "async-std-comp"))]
use crate::aio::{async_std::AsyncStd, RedisRuntime};
@@ -68,13 +69,15 @@ use backoff_tokio::future::retry;
#[cfg(feature = "tokio-comp")]
use backoff_tokio::{Error, ExponentialBackoff};

use dispose::{Disposable, Dispose};
use futures::{
future::{self, BoxFuture},
prelude::*,
ready, stream,
};
use futures_time::future::FutureExt;
use pin_project_lite::pin_project;
use std::sync::atomic::AtomicBool;
use tokio::sync::{
mpsc,
oneshot::{self, Receiver},
@@ -198,6 +201,7 @@ struct InnerCore<C> {
conn_lock: RwLock<ConnectionsContainer<C>>,
cluster_params: ClusterParams,
pending_requests: Mutex<Vec<PendingRequest<Response, C>>>,
slot_refresh_in_progress: AtomicBool,
}

type Core<C> = Arc<InnerCore<C>>;
@@ -212,6 +216,14 @@ struct ClusterConnInner<C> {
>,
>,
refresh_error: Option<RedisError>,
// A flag indicating that the connection has been closed and that all related tasks should shut down.
shutdown_flag: Arc<AtomicBool>,
}

impl<C> Dispose for ClusterConnInner<C> {
fn dispose(self) {
self.shutdown_flag.store(true, Ordering::Relaxed);
}
}

#[derive(Clone)]
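The `Dispose` implementation above ties the new background task to the connection's lifetime: `new` returns a `Disposable<Self>`, and when that wrapper is dropped, `dispose` flips `shutdown_flag`, which the spawned loop checks on every iteration. A minimal sketch of the same shutdown pattern, using plain `Drop` in place of the `dispose` crate and illustrative names throughout (assumes tokio with the `rt` and `time` features):

use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;

struct Worker {
    shutdown_flag: Arc<AtomicBool>,
}

impl Drop for Worker {
    // Plays the role of `Dispose::dispose`: signal the background task
    // as soon as the owning handle goes away.
    fn drop(&mut self) {
        self.shutdown_flag.store(true, Ordering::Relaxed);
    }
}

#[tokio::main]
async fn main() {
    let flag = Arc::new(AtomicBool::new(false));
    let worker = Worker { shutdown_flag: flag.clone() };
    let task = tokio::spawn(async move {
        // Like periodic_topology_check: poll the flag before each round.
        while !flag.load(Ordering::Relaxed) {
            tokio::time::sleep(Duration::from_millis(10)).await;
        }
    });
    drop(worker); // dropping the handle requests shutdown
    task.await.unwrap(); // the loop observes the flag and exits
}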
@@ -478,25 +490,42 @@
async fn new(
initial_nodes: &[ConnectionInfo],
cluster_params: ClusterParams,
) -> RedisResult<Self> {
) -> RedisResult<Disposable<Self>> {
let connections = Self::create_initial_connections(initial_nodes, &cluster_params).await?;
let topology_checks_interval = cluster_params.topology_checks_interval;
let inner = Arc::new(InnerCore {
conn_lock: RwLock::new(ConnectionsContainer::new(
Default::default(),
connections,
cluster_params.read_from_replicas,
0,
)),
cluster_params,
pending_requests: Mutex::new(Vec::new()),
slot_refresh_in_progress: AtomicBool::new(false),
});
let mut connection = ClusterConnInner {
let shutdown_flag = Arc::new(AtomicBool::new(false));
let connection = ClusterConnInner {
inner,
in_flight_requests: Default::default(),
refresh_error: None,
state: ConnectionState::PollComplete,
shutdown_flag: shutdown_flag.clone(),
};
connection.refresh_slots_with_retries().await?;
Ok(connection)
Self::refresh_slots_with_retries(connection.inner.clone()).await?;
if let Some(duration) = topology_checks_interval {
let periodic_task = ClusterConnInner::periodic_topology_check(
connection.inner.clone(),
duration,
shutdown_flag,
);
#[cfg(feature = "tokio-comp")]
tokio::spawn(periodic_task);
#[cfg(all(not(feature = "tokio-comp"), feature = "async-std-comp"))]
AsyncStd::spawn(periodic_task);
}

Ok(Disposable::new(connection))
}

/// Go through each of the initial nodes and attempt to retrieve all IP entries from them.
@@ -700,28 +729,91 @@
}

// Query a node to discover slot -> master mappings, with retries
fn refresh_slots_with_retries(&mut self) -> impl Future<Output = RedisResult<()>> {
let inner = self.inner.clone();
async move {
async fn refresh_slots_with_retries(inner: Arc<InnerCore<C>>) -> RedisResult<()> {
if inner
.slot_refresh_in_progress
.compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed)
.is_err()
{
return Ok(());
}
let retry_strategy = ExponentialBackoff {
initial_interval: DEFAULT_REFRESH_SLOTS_RETRY_INITIAL_INTERVAL,
max_interval: DEFAULT_REFRESH_SLOTS_RETRY_TIMEOUT,
..Default::default()
};
let retries_counter = AtomicUsize::new(0);
let res = retry(retry_strategy, || {
let curr_retry = retries_counter.fetch_add(1, atomic::Ordering::Relaxed);
Self::refresh_slots(inner.clone(), curr_retry).map_err(Error::from)
})
.await;
inner
.slot_refresh_in_progress
.store(false, Ordering::Relaxed);
res
}
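The `compare_exchange` guard at the top of `refresh_slots_with_retries` coalesces overlapping refresh attempts: only the caller that wins the `false` -> `true` swap actually refreshes, while every other caller returns `Ok(())` immediately instead of queueing a redundant refresh. The guard in isolation (an illustrative sketch, not the crate's code):

use std::sync::atomic::{AtomicBool, Ordering};

static REFRESH_IN_PROGRESS: AtomicBool = AtomicBool::new(false);

fn try_begin_refresh() -> bool {
    // Succeeds for exactly one caller at a time; everyone else gets `Err`
    // because the current value is already `true`.
    REFRESH_IN_PROGRESS
        .compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed)
        .is_ok()
}

fn end_refresh() {
    // Must run on both the success and the error path, which is why the
    // code above stores `false` after awaiting `retry` instead of using `?`.
    REFRESH_IN_PROGRESS.store(false, Ordering::Relaxed);
}

fn main() {
    assert!(try_begin_refresh());  // first caller wins
    assert!(!try_begin_refresh()); // an overlapping caller backs off
    end_refresh();
    assert!(try_begin_refresh()); // the next refresh can start
}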

async fn periodic_topology_check(
inner: Arc<InnerCore<C>>,
interval_duration: Duration,
shutdown_flag: Arc<AtomicBool>,
) {
loop {
if shutdown_flag.load(Ordering::Relaxed) {
return;
}
#[cfg(feature = "tokio-comp")]
tokio::time::sleep(interval_duration).await;
#[cfg(all(not(feature = "tokio-comp"), feature = "async-std-comp"))]
async_std::task::sleep(interval_duration).await;
let retry_strategy = ExponentialBackoff {
initial_interval: DEFAULT_REFRESH_SLOTS_RETRY_INITIAL_INTERVAL,
max_interval: DEFAULT_REFRESH_SLOTS_RETRY_TIMEOUT,
..Default::default()
};
let retries_counter = AtomicUsize::new(0);
retry(retry_strategy, || {
retries_counter.fetch_add(1, atomic::Ordering::Relaxed);
Self::refresh_slots(
inner.clone(),
retries_counter.load(atomic::Ordering::Relaxed),
)
.map_err(Error::from)
let topology_check_res = retry(retry_strategy, || {
Self::check_for_topology_diff(inner.clone()).map_err(Error::from)
})
.await?;
Ok(())
.await;
if topology_check_res.is_ok() && topology_check_res.unwrap() {
let _ = Self::refresh_slots_with_retries(inner.clone()).await;
};
}
}
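`retry` here comes from the `backoff` crate, which Cargo.toml appears to pull in under the `backoff_tokio` alias: it re-invokes the closure with exponentially growing delays until the operation succeeds, fails permanently, or the strategy gives up. A self-contained sketch of that usage, assuming `backoff` 0.4 with its `tokio` feature (the `From<E>` conversion marks an error as transient, i.e. retryable, just like the `map_err(Error::from)` calls above):

use backoff::{future::retry, Error, ExponentialBackoff};
use std::sync::atomic::{AtomicU32, Ordering};
use std::time::Duration;

#[tokio::main]
async fn main() -> Result<(), &'static str> {
    let strategy = ExponentialBackoff {
        initial_interval: Duration::from_millis(10),
        max_interval: Duration::from_millis(100),
        ..Default::default()
    };
    let attempts = AtomicU32::new(0);
    // Mirror the pattern above: count the attempt in the closure body,
    // then hand `retry` a future for the actual (here, flaky) operation.
    let outcome = retry(strategy, || {
        let n = attempts.fetch_add(1, Ordering::Relaxed) + 1;
        async move {
            if n < 3 {
                // Transient errors make `retry` sleep and try again.
                Err(Error::from("cluster not ready"))
            } else {
                Ok("topology fetched")
            }
        }
    })
    .await?;
    assert_eq!(outcome, "topology fetched");
    Ok(())
}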

/// Queries log2(n) nodes (where n is the number of cluster nodes) to determine whether their
/// topology view differs from the one currently stored in the connection manager.
/// Returns true if a change was detected, otherwise false.
async fn check_for_topology_diff(inner: Arc<InnerCore<C>>) -> RedisResult<bool> {
let read_guard = inner.conn_lock.read().await;
let num_of_nodes: usize = read_guard.len();
// TODO: Rust 1.67 added logarithm support for integers (e.g. `ilog2`).
// Once we no longer need to support Rust versions < 1.67, remove fast_math and use `ilog2` instead.
let num_of_nodes_to_query =
std::cmp::max(fast_math::log2_raw(num_of_nodes as f32) as usize, 1);
let requested_nodes = read_guard.random_connections(num_of_nodes_to_query);
let topology_join_results =
futures::future::join_all(requested_nodes.map(|conn| async move {
let mut conn: C = conn.1.await;
conn.req_packed_command(&slot_cmd()).await
}))
.await;
let topology_values: Vec<_> = topology_join_results
.into_iter()
.filter_map(|r| r.ok())
.collect();
let (_, found_topology_hash) = calculate_topology(
topology_values,
DEFAULT_NUMBER_OF_REFRESH_SLOTS_RETRIES,
inner.cluster_params.tls,
num_of_nodes_to_query,
inner.cluster_params.read_from_replicas,
)?;
let change_found = read_guard.get_current_topology_hash() != found_topology_hash;
Ok(change_found)
}
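The sample size above implements the log2(n) rule from the doc comment, with `fast_math::log2_raw` standing in until the crate can rely on integer logarithms, per the TODO. The intended computation, expressed with the standard-library method the TODO says it will migrate to (`checked_ilog2`, stable since Rust 1.67):

/// Illustrative equivalent of the fast_math-based computation above:
/// query roughly log2(n) nodes, but always at least one.
fn nodes_to_query(num_of_nodes: usize) -> usize {
    std::cmp::max(num_of_nodes.checked_ilog2().unwrap_or(0) as usize, 1)
}

fn main() {
    assert_eq!(nodes_to_query(1), 1); // tiny cluster: still probe one node
    assert_eq!(nodes_to_query(3), 1);
    assert_eq!(nodes_to_query(8), 3);
    assert_eq!(nodes_to_query(100), 6); // 2^6 = 64 <= 100 < 2^7
}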

// Query a node to discover slot -> master mappings
async fn refresh_slots(inner: Arc<InnerCore<C>>, curr_retry: usize) -> RedisResult<()> {
let read_guard = inner.conn_lock.read().await;
@@ -739,7 +831,7 @@
.into_iter()
.filter_map(|r| r.ok())
.collect();
let new_slots = calculate_topology(
let (new_slots, topology_hash) = calculate_topology(
topology_values,
curr_retry,
inner.cluster_params.tls,
@@ -799,6 +891,7 @@
new_slots,
new_connections,
inner.cluster_params.read_from_replicas,
topology_hash,
);
Ok(())
}
@@ -1075,7 +1168,7 @@
}
Poll::Ready(Err(err)) => {
self.state = ConnectionState::Recover(RecoverFuture::RecoverSlots(Box::pin(
self.refresh_slots_with_retries(),
Self::refresh_slots_with_retries(self.inner.clone()),
)));
Poll::Ready(Err(err))
}
@@ -1250,7 +1343,7 @@
}
}

impl<C> Sink<Message<C>> for ClusterConnInner<C>
impl<C> Sink<Message<C>> for Disposable<ClusterConnInner<C>>
where
C: ConnectionLike + Connect + Clone + Send + Sync + Unpin + 'static,
{
@@ -1332,9 +1425,10 @@
ConnectionState::PollComplete => match ready!(self.poll_complete(cx)) {
PollFlushAction::None => return Poll::Ready(Ok(())),
PollFlushAction::RebuildSlots => {
self.state = ConnectionState::Recover(RecoverFuture::RecoverSlots(
Box::pin(self.refresh_slots_with_retries()),
));
self.state =
ConnectionState::Recover(RecoverFuture::RecoverSlots(Box::pin(
ClusterConnInner::refresh_slots_with_retries(self.inner.clone()),
)));
}
PollFlushAction::Reconnect(identifiers) => {
self.state = ConnectionState::Recover(RecoverFuture::Reconnect(Box::pin(
14 changes: 14 additions & 0 deletions redis/src/cluster_client.rs
@@ -21,6 +21,7 @@ struct BuilderParams {
tls: Option<TlsMode>,
retries_configuration: RetryParams,
connection_timeout: Option<Duration>,
topology_checks_interval: Option<Duration>,
}

#[derive(Clone)]
@@ -72,6 +73,7 @@ pub(crate) struct ClusterParams {
pub(crate) tls: Option<TlsMode>,
pub(crate) retry_params: RetryParams,
pub(crate) connection_timeout: Duration,
pub(crate) topology_checks_interval: Option<Duration>,
}

impl From<BuilderParams> for ClusterParams {
@@ -83,6 +85,7 @@ impl From<BuilderParams> for ClusterParams {
tls: value.tls,
retry_params: value.retries_configuration,
connection_timeout: value.connection_timeout.unwrap_or(Duration::MAX),
topology_checks_interval: value.topology_checks_interval,
}
}
}
@@ -250,6 +253,17 @@ impl ClusterClientBuilder {
self
}

/// Enables periodic topology checks for this client.
///
/// If enabled, periodic topology checks will run at the configured interval to examine whether there
/// have been any changes in the cluster's topology. If a change is detected, a slot refresh is triggered.
/// Unlike a full slot refresh, a periodic topology check queries only a limited number of nodes, which
/// keeps the check quick and efficient.
pub fn periodic_topology_checks(mut self, interval: Duration) -> ClusterClientBuilder {
self.builder_params.topology_checks_interval = Some(interval);
self
}
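For a consumer of this crate, opting in might look like the following sketch (node addresses and the interval are illustrative; `periodic_topology_checks` and `build` are the builder methods defined in this file and elsewhere on `ClusterClientBuilder`):

use redis::cluster::ClusterClientBuilder;
use std::time::Duration;

fn main() -> redis::RedisResult<()> {
    let nodes = vec!["redis://127.0.0.1:7000", "redis://127.0.0.1:7001"];
    let client = ClusterClientBuilder::new(nodes)
        // Examine a small sample of nodes for topology changes every 10s;
        // a detected change triggers a full slot refresh.
        .periodic_topology_checks(Duration::from_secs(10))
        .build()?;
    let _ = client; // obtain async connections from `client` as usual
    Ok(())
}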

/// Use `build()`.
#[deprecated(since = "0.22.0", note = "Use build()")]
pub fn open(self) -> RedisResult<ClusterClient> {
