Skip to content

Commit

Permalink
Improve retry logic and update unmaintained dependencies for Rust lin…
Browse files Browse the repository at this point in the history
…t CI (#2673)
  • Loading branch information
barshaul authored Nov 12, 2024
1 parent 0d4b94b commit 0e52ac9
Show file tree
Hide file tree
Showing 12 changed files with 104 additions and 77 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@
* Core: Add support for sending multi-slot JSON.MSET and JSON.MGET commands ([#2587](https://github.com/valkey-io/valkey-glide/pull/2587))
* Node: Add `JSON.DEBUG` command ([#2572](https://github.com/valkey-io/valkey-glide/pull/2572))
* Node: Add `JSON.NUMINCRBY` and `JSON.NUMMULTBY` command ([#2555](https://github.com/valkey-io/valkey-glide/pull/2555))
* Core: Improve retry logic and update unmaintained dependencies for Rust lint CI ([#2673](https://github.com/valkey-io/valkey-glide/pull/2673))

#### Breaking Changes

Expand Down
1 change: 0 additions & 1 deletion benchmarks/rust/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ redis = { path = "../../glide-core/redis-rs/redis", features = ["aio"] }
futures = "0.3.28"
rand = "0.8.5"
itoa = "1.0.6"
futures-time = "^3.0.0"
clap = { version = "4.3.8", features = ["derive"] }
chrono = "0.4.26"
serde_json = "1.0.99"
Expand Down
4 changes: 1 addition & 3 deletions deny.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@ yanked = "deny"
ignore = [
# Unmaintained dependency error that needs more attention due to nested dependencies
"RUSTSEC-2024-0370",
"RUSTSEC-2024-0384",
"RUSTSEC-2024-0388",
]
# Threshold for security vulnerabilities, any vulnerability with a CVSS score
# lower than the range specified will be ignored. Note that ignored advisories
Expand Down Expand Up @@ -59,7 +57,7 @@ allow = [
"Unicode-DFS-2016",
"ISC",
"OpenSSL",
"MPL-2.0",
"MPL-2.0"
]
# The confidence threshold for detecting a license from license text.
# The higher the value, the more closely the license text must be to the
Expand Down
3 changes: 2 additions & 1 deletion glide-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ logger_core = { path = "../logger_core" }
dispose = "0.5.0"
tokio-util = { version = "^0.7", features = ["rt"], optional = true }
num_cpus = { version = "^1.15", optional = true }
tokio-retry = "0.3.0"
tokio-retry2 = {version = "0.5", features = ["jitter"]}

protobuf = { version = "3", features = [
"bytes",
"with-bytes",
Expand Down
12 changes: 4 additions & 8 deletions glide-core/redis-rs/redis/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -53,25 +53,21 @@ dispose = { version = "0.5.0", optional = true }
# Only needed for the connection manager
arc-swap = { version = "1.7.1" }
futures = { version = "0.3.3", optional = true }
tokio-retry = { version = "0.3.0", optional = true }

# Only needed for the r2d2 feature
r2d2 = { version = "0.8.8", optional = true }

# Only needed for cluster
crc16 = { version = "0.4", optional = true }
rand = { version = "0.8", optional = true }
derivative = { version = "2.2.0", optional = true }

# Only needed for async cluster
dashmap = { version = "6.0", optional = true }

async-trait = { version = "0.1.24", optional = true }

# Only needed for tokio support
backoff-tokio = { package = "backoff", version = "0.4.0", optional = true, features = [
"tokio",
] }
tokio-retry2 = {version = "0.5", features = ["jitter"], optional = true}

# Only needed for native tls
native-tls = { version = "0.2", optional = true }
Expand Down Expand Up @@ -134,7 +130,7 @@ aio = [
]
geospatial = []
json = ["serde", "serde/derive", "serde_json"]
cluster = ["crc16", "rand", "derivative"]
cluster = ["crc16", "rand"]
script = ["sha1_smol"]
tls-native-tls = ["native-tls"]
tls-rustls = [
Expand All @@ -145,10 +141,10 @@ tls-rustls = [
]
tls-rustls-insecure = ["tls-rustls"]
tls-rustls-webpki-roots = ["tls-rustls", "webpki-roots"]
tokio-comp = ["aio", "tokio/net", "backoff-tokio"]
tokio-comp = ["aio", "tokio/net", "tokio-retry2"]
tokio-native-tls-comp = ["tokio-comp", "tls-native-tls", "tokio-native-tls"]
tokio-rustls-comp = ["tokio-comp", "tls-rustls", "tokio-rustls"]
connection-manager = ["futures", "aio", "tokio-retry"]
connection-manager = ["futures", "aio", "tokio-retry2"]
streams = []
cluster-async = ["cluster", "futures", "futures-util", "dashmap"]
keep-alive = ["socket2"]
Expand Down
19 changes: 11 additions & 8 deletions glide-core/redis-rs/redis/src/aio/connection_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ use futures::{
};
use futures_util::future::BoxFuture;
use std::sync::Arc;
use tokio_retry::strategy::{jitter, ExponentialBackoff};
use tokio_retry::Retry;
use tokio_retry2::strategy::{jitter, ExponentialBackoff};
use tokio_retry2::{Retry, RetryError};

/// A `ConnectionManager` is a proxy that wraps a [multiplexed
/// connection][multiplexed-connection] and automatically reconnects to the
Expand Down Expand Up @@ -191,12 +191,15 @@ impl ConnectionManager {
connection_timeout: std::time::Duration,
) -> RedisResult<MultiplexedConnection> {
let retry_strategy = exponential_backoff.map(jitter).take(number_of_retries);
Retry::spawn(retry_strategy, || {
client.get_multiplexed_async_connection_with_timeouts(
response_timeout,
connection_timeout,
GlideConnectionOptions::default(),
)
Retry::spawn(retry_strategy, || async {
client
.get_multiplexed_async_connection_with_timeouts(
response_timeout,
connection_timeout,
GlideConnectionOptions::default(),
)
.await
.map_err(RetryError::transient)
})
.await
}
Expand Down
54 changes: 25 additions & 29 deletions glide-core/redis-rs/redis/src/cluster_async/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,11 @@ use crate::{
client::GlideConnectionOptions,
cluster_routing::{Routable, RoutingInfo},
cluster_slotmap::SlotMap,
cluster_topology::SLOT_SIZE,
cluster_topology::{
calculate_topology, get_slot, SlotRefreshState, DEFAULT_NUMBER_OF_REFRESH_SLOTS_RETRIES,
DEFAULT_REFRESH_SLOTS_RETRY_BASE_DURATION_MILLIS, DEFAULT_REFRESH_SLOTS_RETRY_BASE_FACTOR,
SLOT_SIZE,
},
cmd,
commands::cluster_scan::{cluster_scan, ClusterScanArgs, ObjectType, ScanStateRC},
FromRedisValue, InfoDict, ToRedisArgs,
Expand Down Expand Up @@ -69,10 +73,6 @@ use crate::{
self, MultipleNodeRoutingInfo, Redirect, ResponsePolicy, Route, SingleNodeRoutingInfo,
SlotAddr,
},
cluster_topology::{
calculate_topology, get_slot, SlotRefreshState, DEFAULT_NUMBER_OF_REFRESH_SLOTS_RETRIES,
DEFAULT_REFRESH_SLOTS_RETRY_INITIAL_INTERVAL, DEFAULT_REFRESH_SLOTS_RETRY_MAX_INTERVAL,
},
connection::{PubSubSubscriptionInfo, PubSubSubscriptionKind},
push_manager::PushInfo,
Cmd, ConnectionInfo, ErrorKind, IntoConnectionInfo, RedisError, RedisFuture, RedisResult,
Expand All @@ -84,9 +84,10 @@ use std::time::Duration;
#[cfg(feature = "tokio-comp")]
use async_trait::async_trait;
#[cfg(feature = "tokio-comp")]
use backoff_tokio::future::retry;
use tokio_retry2::strategy::{jitter_range, ExponentialFactorBackoff};
#[cfg(feature = "tokio-comp")]
use backoff_tokio::{Error as BackoffError, ExponentialBackoff};
use tokio_retry2::{Retry, RetryError};

#[cfg(feature = "tokio-comp")]
use tokio::{sync::Notify, time::timeout};

Expand Down Expand Up @@ -1518,16 +1519,24 @@ where

let mut res = Ok(());
if !skip_slots_refresh {
let retry_strategy = ExponentialBackoff {
initial_interval: DEFAULT_REFRESH_SLOTS_RETRY_INITIAL_INTERVAL,
max_interval: DEFAULT_REFRESH_SLOTS_RETRY_MAX_INTERVAL,
max_elapsed_time: None,
..Default::default()
};
let retry_strategy = ExponentialFactorBackoff::from_millis(
DEFAULT_REFRESH_SLOTS_RETRY_BASE_DURATION_MILLIS,
DEFAULT_REFRESH_SLOTS_RETRY_BASE_FACTOR,
)
.map(jitter_range(0.8, 1.2))
.take(DEFAULT_NUMBER_OF_REFRESH_SLOTS_RETRIES);
let retries_counter = AtomicUsize::new(0);
res = retry(retry_strategy, || {
res = Retry::spawn(retry_strategy, || async {
let curr_retry = retries_counter.fetch_add(1, atomic::Ordering::Relaxed);
Self::refresh_slots(inner.clone(), curr_retry)
.await
.map_err(|err| {
if err.kind() == ErrorKind::AllConnectionsUnavailable {
RetryError::permanent(err)
} else {
RetryError::transient(err)
}
})
})
.await;
}
Expand Down Expand Up @@ -1706,26 +1715,13 @@ where
false
}

async fn refresh_slots(
inner: Arc<InnerCore<C>>,
curr_retry: usize,
) -> Result<(), BackoffError<RedisError>> {
async fn refresh_slots(inner: Arc<InnerCore<C>>, curr_retry: usize) -> RedisResult<()> {
// Update the slot refresh last run timestamp
let now = SystemTime::now();
let mut last_run_wlock = inner.slot_refresh_state.last_run.write().await;
*last_run_wlock = Some(now);
drop(last_run_wlock);
Self::refresh_slots_inner(inner, curr_retry)
.await
.map_err(|err| {
if curr_retry > DEFAULT_NUMBER_OF_REFRESH_SLOTS_RETRIES
|| err.kind() == ErrorKind::AllConnectionsUnavailable
{
BackoffError::Permanent(err)
} else {
BackoffError::from(err)
}
})
Self::refresh_slots_inner(inner, curr_retry).await
}

pub(crate) fn check_if_all_slots_covered(slot_map: &SlotMap) -> bool {
Expand Down
22 changes: 12 additions & 10 deletions glide-core/redis-rs/redis/src/cluster_topology.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ use crate::cluster_slotmap::{ReadFromReplicaStrategy, SlotMap};
use crate::{cluster::TlsMode, ErrorKind, RedisError, RedisResult, Value};
#[cfg(all(feature = "cluster-async", not(feature = "tokio-comp")))]
use async_std::sync::RwLock;
use derivative::Derivative;
use std::collections::{hash_map::DefaultHasher, HashMap};
use std::hash::{Hash, Hasher};
use std::sync::atomic::AtomicBool;
Expand All @@ -21,11 +20,10 @@ use tracing::info;
// Exponential backoff constants for retrying a slot refresh
/// The default number of refresh topology retries in the same call
pub const DEFAULT_NUMBER_OF_REFRESH_SLOTS_RETRIES: usize = 3;
/// The default maximum interval between two retries of the same call for topology refresh
pub const DEFAULT_REFRESH_SLOTS_RETRY_MAX_INTERVAL: Duration = Duration::from_secs(1);
/// The default initial interval for retrying topology refresh
pub const DEFAULT_REFRESH_SLOTS_RETRY_INITIAL_INTERVAL: Duration = Duration::from_millis(500);

/// The default base duration for retrying topology refresh
pub const DEFAULT_REFRESH_SLOTS_RETRY_BASE_DURATION_MILLIS: u64 = 500;
/// The default base factor for retrying topology refresh
pub const DEFAULT_REFRESH_SLOTS_RETRY_BASE_FACTOR: f64 = 1.5;
// Constants for the intervals between two independent consecutive refresh slots calls
/// The default wait duration between two consecutive refresh slots calls
#[cfg(feature = "cluster-async")]
Expand Down Expand Up @@ -58,17 +56,21 @@ impl SlotRefreshState {
}
}

#[derive(Derivative)]
#[derivative(PartialEq, Eq)]
#[derive(Debug)]
pub(crate) struct TopologyView {
pub(crate) hash_value: TopologyHash,
#[derivative(PartialEq = "ignore")]
pub(crate) nodes_count: u16,
#[derivative(PartialEq = "ignore")]
slots_and_count: (u16, Vec<Slot>),
}

// Two topology views are considered equal when their `hash_value`s match;
// `nodes_count` and `slots_and_count` are intentionally not compared.
impl PartialEq for TopologyView {
fn eq(&self, other: &Self) -> bool {
self.hash_value == other.hash_value
}
}

impl Eq for TopologyView {}

pub(crate) fn slot(key: &[u8]) -> u16 {
crc16::State::<crc16::XMODEM>::calculate(key) % SLOT_SIZE
}
Expand Down
8 changes: 6 additions & 2 deletions glide-core/src/client/reconnecting_connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use telemetrylib::Telemetry;
use tokio::sync::{mpsc, Notify};
use tokio::task;
use tokio::time::timeout;
use tokio_retry::Retry;
use tokio_retry2::{Retry, RetryError};

use super::{run_with_timeout, DEFAULT_CONNECTION_ATTEMPT_TIMEOUT};

Expand Down Expand Up @@ -121,7 +121,11 @@ async fn create_connection(
TokioDisconnectNotifier::new(),
)),
};
let action = || get_multiplexed_connection(client, &connection_options);
let action = || async {
get_multiplexed_connection(client, &connection_options)
.await
.map_err(RetryError::transient)
};

match Retry::spawn(retry_strategy.get_iterator(), action).await {
Ok(connection) => {
Expand Down
30 changes: 23 additions & 7 deletions glide-core/src/retry_strategies.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
*/
use crate::client::ConnectionRetryStrategy;
use std::time::Duration;
use tokio_retry::strategy::{jitter, ExponentialBackoff};
use tokio_retry2::strategy::{jitter_range, ExponentialBackoff};

#[derive(Clone, Debug)]
pub(super) struct RetryStrategy {
Expand All @@ -27,7 +27,7 @@ impl RetryStrategy {
pub(super) fn get_iterator(&self) -> impl Iterator<Item = Duration> {
ExponentialBackoff::from_millis(self.exponent_base as u64)
.factor(self.factor as u64)
.map(jitter)
.map(jitter_range(0.8, 1.2))
.take(self.number_of_retries as usize)
}
}
Expand Down Expand Up @@ -78,23 +78,39 @@ mod tests {
let mut counter = 0;
for duration in intervals {
counter += 1;
assert!(duration.as_millis() <= interval_duration as u128);
// The iterator applies `jitter_range(0.8, 1.2)`, so every duration must lie
// within 80%..=120% of the configured interval.
let upper_limit = (interval_duration as f32 * 1.2) as u128;
let lower_limit = (interval_duration as f32 * 0.8) as u128;
// Both bounds must hold at once: joined with `||` the condition is true for
// every possible duration and the assertion could never fail.
assert!(
lower_limit <= duration.as_millis() && duration.as_millis() <= upper_limit,
"{:?}ms <= {:?}ms <= {:?}ms",
lower_limit,
duration.as_millis(),
upper_limit
);
}
assert_eq!(counter, retries);
}

#[test]
fn test_exponential_backoff_with_jitter() {
let retries = 3;
let base = 10;
let factor = 5;
let retries = 5;
let base = 2;
let factor = 100;
let intervals = get_exponential_backoff(base, factor, retries).get_iterator();

let mut counter = 0;
for duration in intervals {
counter += 1;
let unjittered_duration = factor * (base.pow(counter));
assert!(duration.as_millis() <= unjittered_duration as u128);
// The iterator applies `jitter_range(0.8, 1.2)`, so every duration must lie
// within 80%..=120% of the unjittered exponential value.
let upper_limit = (unjittered_duration as f32 * 1.2) as u128;
let lower_limit = (unjittered_duration as f32 * 0.8) as u128;
// Both bounds must hold at once: joined with `||` the condition is true for
// every possible duration and the assertion could never fail.
assert!(
lower_limit <= duration.as_millis() && duration.as_millis() <= upper_limit,
"{:?}ms <= {:?}ms <= {:?}ms",
lower_limit,
duration.as_millis(),
upper_limit
);
}

assert_eq!(counter, retries);
Expand Down
1 change: 0 additions & 1 deletion go/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ redis = { path = "../glide-core/redis-rs/redis", features = ["aio", "tokio-comp"
glide-core = { path = "../glide-core", features = ["socket-layer"] }
tokio = { version = "^1", features = ["rt", "macros", "rt-multi-thread", "time"] }
protobuf = { version = "3.3.0", features = [] }
derivative = "2.2.0"

[profile.release]
lto = true
Expand Down
Loading

0 comments on commit 0e52ac9

Please sign in to comment.