From ed7b6b2caaceb9effd4c815f0cb5ba2a701a1790 Mon Sep 17 00:00:00 2001
From: Mohammad Fatemi
Date: Tue, 16 Jul 2024 22:30:53 +0330
Subject: [PATCH] feat: Add redis sentinel connection pool

---
 redis/Cargo.toml              |   4 +-
 redis/src/lib.rs              |   3 +
 redis/src/sentinel/config.rs  | 194 +++++++++++++++++++++++++++++++++++
 redis/src/sentinel/mod.rs     | 168 ++++++++++++++++++++++++++++
 redis/tests/redis_sentinel.rs | 206 ++++++++++++++++++++++++++++++++++++
 5 files changed, 574 insertions(+), 1 deletion(-)
 create mode 100644 redis/src/sentinel/config.rs
 create mode 100644 redis/src/sentinel/mod.rs
 create mode 100644 redis/tests/redis_sentinel.rs

diff --git a/redis/Cargo.toml b/redis/Cargo.toml
index 7f6ea4f..b58fb80 100644
--- a/redis/Cargo.toml
+++ b/redis/Cargo.toml
@@ -23,6 +23,7 @@ rt_tokio_1 = ["deadpool/rt_tokio_1", "redis/tokio-comp"]
 rt_async-std_1 = ["deadpool/rt_async-std_1", "redis/async-std-comp"]
 serde = ["deadpool/serde", "dep:serde"]
 cluster = ["redis/cluster-async"]
+sentinel = ["redis/sentinel", "tokio/sync"]
 
 [dependencies]
 deadpool = { path = "../", version = "0.12.0", default-features = false, features = [
@@ -32,6 +33,7 @@ redis = { version = "0.25", default-features = false, features = ["aio"] }
 serde = { package = "serde", version = "1.0", features = [
     "derive",
 ], optional = true }
+tokio = { version = "1.0", features = ["sync"] }
 
 [dev-dependencies]
 config = { version = "0.14", features = ["json"] }
@@ -40,4 +42,4 @@ futures = "0.3.15"
 redis = { version = "0.25", default-features = false, features = [
     "tokio-comp",
 ] }
-tokio = { version = "1.0", features = ["macros", "rt-multi-thread"] }
+tokio = { version = "1.0", features = ["macros", "rt-multi-thread", "sync"] }
diff --git a/redis/src/lib.rs b/redis/src/lib.rs
index 84ed295..f103cdc 100644
--- a/redis/src/lib.rs
+++ b/redis/src/lib.rs
@@ -25,6 +25,9 @@ pub mod cluster;
 
 mod config;
 
+#[cfg(feature = "sentinel")]
+pub mod sentinel;
+
 use std::{
     ops::{Deref, DerefMut},
     sync::atomic::{AtomicUsize, Ordering},
diff --git a/redis/src/sentinel/config.rs b/redis/src/sentinel/config.rs
new file mode 100644
index 0000000..3167cfc
--- /dev/null
+++ b/redis/src/sentinel/config.rs
@@ -0,0 +1,194 @@
+use redis::sentinel::SentinelNodeConnectionInfo;
+use redis::TlsMode;
+
+pub use crate::config::ConfigError;
+use crate::{ConnectionAddr, ConnectionInfo};
+
+use super::{CreatePoolError, Pool, PoolBuilder, PoolConfig, Runtime};
+
+/// Configuration object.
+///
+/// # Example (from environment)
+///
+/// By enabling the `serde` feature you can read the configuration using the
+/// [`config`](https://crates.io/crates/config) crate as follows:
+/// ```env
+/// REDIS_SENTINEL__URLS=redis://127.0.0.1:26379,redis://127.0.0.1:26380
+/// REDIS_SENTINEL__POOL__MAX_SIZE=16
+/// REDIS_SENTINEL__POOL__TIMEOUTS__WAIT__SECS=2
+/// REDIS_SENTINEL__POOL__TIMEOUTS__WAIT__NANOS=0
+/// ```
+/// ```rust
+/// #[derive(serde::Deserialize)]
+/// struct Config {
+///     redis_sentinel: deadpool_redis::sentinel::Config,
+/// }
+///
+/// impl Config {
+///     pub fn from_env() -> Result<Self, config::ConfigError> {
+///         let mut cfg = config::Config::builder()
+///             .add_source(
+///                 config::Environment::default()
+///                     .separator("__")
+///                     .try_parsing(true)
+///                     .list_separator(","),
+///             )
+///             .build()?;
+///         cfg.try_deserialize()
+///     }
+/// }
+/// ```
+#[derive(Clone, Debug)]
+#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize))]
+pub struct Config {
+    /// Redis URLs.
+    ///
+    /// See [Connection Parameters](redis#connection-parameters).
+    pub urls: Option<Vec<String>>,
+    /// Which type of server to return connections for.
+    ///
+    /// See [`SentinelServerType`].
+    pub server_type: SentinelServerType,
+    /// Sentinel master name. Defaults to `mymaster`.
+    pub master_name: String,
+    /// [`redis::ConnectionInfo`] structures.
+    pub connections: Option<Vec<ConnectionInfo>>,
+    // `SentinelNodeConnectionInfo` doesn't implement `Debug`, so we can't use it
+    // as a field directly; we store a `ConnectionInfo` and convert it in `builder()`.
+    sentinel_connection_info: Option<ConnectionInfo>,
+    /// Pool configuration.
+    pub pool: Option<PoolConfig>,
+}
+
+impl Config {
+    /// Creates a new [`Pool`] using this [`Config`].
+    ///
+    /// # Errors
+    ///
+    /// See [`CreatePoolError`] for details.
+    pub fn create_pool(&self, runtime: Option<Runtime>) -> Result<Pool, CreatePoolError> {
+        let mut builder = self.builder().map_err(CreatePoolError::Config)?;
+        if let Some(runtime) = runtime {
+            builder = builder.runtime(runtime);
+        }
+        builder.build().map_err(CreatePoolError::Build)
+    }
+
+    /// Creates a new [`PoolBuilder`] using this [`Config`].
+    ///
+    /// # Errors
+    ///
+    /// See [`ConfigError`] for details.
+    pub fn builder(&self) -> Result<PoolBuilder, ConfigError> {
+        let sentinel_node_connection_info = self.sentinel_connection_info.clone().map(|c| {
+            let tls_mode = match c.addr {
+                ConnectionAddr::TcpTls { insecure: i, .. } => {
+                    if i {
+                        Some(TlsMode::Insecure)
+                    } else {
+                        Some(TlsMode::Secure)
+                    }
+                }
+                ConnectionAddr::Unix(_) | ConnectionAddr::Tcp(_, _) => None,
+            };
+
+            SentinelNodeConnectionInfo {
+                tls_mode,
+                redis_connection_info: Some(c.redis.into()),
+            }
+        });
+
+        let manager = match (&self.urls, &self.connections) {
+            (Some(urls), None) => super::Manager::new(
+                urls.iter().map(|url| url.as_str()).collect(),
+                self.master_name.clone(),
+                sentinel_node_connection_info,
+                self.server_type.clone(),
+            )?,
+            (None, Some(connections)) => super::Manager::new(
+                connections.clone(),
+                self.master_name.clone(),
+                sentinel_node_connection_info,
+                self.server_type.clone(),
+            )?,
+            (None, None) => super::Manager::new(
+                vec![ConnectionInfo::default()],
+                self.master_name.clone(),
+                sentinel_node_connection_info,
+                self.server_type.clone(),
+            )?,
+            (Some(_), Some(_)) => return Err(ConfigError::UrlAndConnectionSpecified),
+        };
+        let pool_config = self.get_pool_config();
+        Ok(Pool::builder(manager).config(pool_config))
+    }
+
+    /// Returns [`deadpool::managed::PoolConfig`] which can be used to construct
+    /// a [`deadpool::managed::Pool`] instance.
+    #[must_use]
+    pub fn get_pool_config(&self) -> PoolConfig {
+        self.pool.unwrap_or_default()
+    }
+
+    /// Creates a new [`Config`] from the given Redis URLs (like
+    /// `redis://127.0.0.1:26379`).
+    #[must_use]
+    pub fn from_urls<T: Into<Vec<String>>>(
+        urls: T,
+        master_name: String,
+        server_type: SentinelServerType,
+    ) -> Config {
+        Config {
+            urls: Some(urls.into()),
+            connections: None,
+            server_type,
+            master_name,
+            pool: None,
+            sentinel_connection_info: None,
+        }
+    }
+}
+
+impl Default for Config {
+    fn default() -> Self {
+        let mut default_connection_info = ConnectionInfo::default();
+        default_connection_info.addr = ConnectionAddr::Tcp("127.0.0.1".to_string(), 26379);
+
+        Self {
+            urls: None,
+            connections: Some(vec![default_connection_info.clone()]),
+            server_type: SentinelServerType::Master,
+            master_name: String::from("mymaster"),
+            pool: None,
+            sentinel_connection_info: Some(default_connection_info),
+        }
+    }
+}
+
+/// This type is a wrapper for [`redis::sentinel::SentinelServerType`] for serialize/deserialize.
+#[derive(Debug, Clone, Copy)]
+#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize))]
+pub enum SentinelServerType {
+    /// Master connections only
+    Master,
+    /// Replica connections only
+    Replica,
+}
+
+impl From<redis::sentinel::SentinelServerType> for SentinelServerType {
+    fn from(value: redis::sentinel::SentinelServerType) -> Self {
+        match value {
+            redis::sentinel::SentinelServerType::Master => SentinelServerType::Master,
+            redis::sentinel::SentinelServerType::Replica => SentinelServerType::Replica,
+        }
+    }
+}
+
+impl From<SentinelServerType> for redis::sentinel::SentinelServerType {
+    fn from(value: SentinelServerType) -> Self {
+        match value {
+            SentinelServerType::Master => redis::sentinel::SentinelServerType::Master,
+            SentinelServerType::Replica => redis::sentinel::SentinelServerType::Replica,
+        }
+    }
+}
diff --git a/redis/src/sentinel/mod.rs b/redis/src/sentinel/mod.rs
new file mode 100644
index 0000000..9638641
--- /dev/null
+++ b/redis/src/sentinel/mod.rs
@@ -0,0 +1,168 @@
+//! This module extends the library to support Redis Sentinel.
+use std::{
+    ops::{Deref, DerefMut},
+    sync::atomic::{AtomicUsize, Ordering},
+};
+
+use redis;
+use redis::aio::MultiplexedConnection;
+use redis::sentinel::{SentinelClient, SentinelNodeConnectionInfo};
+use redis::{aio::ConnectionLike, IntoConnectionInfo, RedisError, RedisResult};
+use tokio::sync::Mutex;
+
+use deadpool::managed;
+pub use deadpool::managed::reexports::*;
+
+pub use self::config::SentinelServerType;
+
+pub use self::config::{Config, ConfigError};
+
+mod config;
+
+deadpool::managed_reexports!(
+    "redis_sentinel",
+    Manager,
+    Connection,
+    RedisError,
+    ConfigError
+);
+
+type RecycleResult = managed::RecycleResult<RedisError>;
+
+/// Wrapper around [`redis::aio::MultiplexedConnection`].
+///
+/// This structure implements [`redis::aio::ConnectionLike`] and can therefore
+/// be used just like a regular [`redis::aio::MultiplexedConnection`].
+#[allow(missing_debug_implementations)] // `redis::aio::MultiplexedConnection: !Debug`
+pub struct Connection {
+    conn: Object,
+}
+
+impl Connection {
+    /// Takes this [`Connection`] from its [`Pool`] permanently.
+    ///
+    /// This reduces the size of the [`Pool`].
+    #[must_use]
+    pub fn take(this: Self) -> MultiplexedConnection {
+        Object::take(this.conn)
+    }
+}
+
+impl From<Object> for Connection {
+    fn from(conn: Object) -> Self {
+        Self { conn }
+    }
+}
+
+impl Deref for Connection {
+    type Target = MultiplexedConnection;
+
+    fn deref(&self) -> &MultiplexedConnection {
+        &self.conn
+    }
+}
+
+impl DerefMut for Connection {
+    fn deref_mut(&mut self) -> &mut MultiplexedConnection {
+        &mut self.conn
+    }
+}
+
+impl AsRef<MultiplexedConnection> for Connection {
+    fn as_ref(&self) -> &MultiplexedConnection {
+        &self.conn
+    }
+}
+
+impl AsMut<MultiplexedConnection> for Connection {
+    fn as_mut(&mut self) -> &mut MultiplexedConnection {
+        &mut self.conn
+    }
+}
+
+impl ConnectionLike for Connection {
+    fn req_packed_command<'a>(
+        &'a mut self,
+        cmd: &'a redis::Cmd,
+    ) -> redis::RedisFuture<'a, redis::Value> {
+        self.conn.req_packed_command(cmd)
+    }
+
+    fn req_packed_commands<'a>(
+        &'a mut self,
+        cmd: &'a redis::Pipeline,
+        offset: usize,
+        count: usize,
+    ) -> redis::RedisFuture<'a, Vec<redis::Value>> {
+        self.conn.req_packed_commands(cmd, offset, count)
+    }
+
+    fn get_db(&self) -> i64 {
+        self.conn.get_db()
+    }
+}
+
+/// [`Manager`] for creating and recycling [`MultiplexedConnection`]s obtained via Redis Sentinel.
+///
+/// [`Manager`]: managed::Manager
+pub struct Manager {
+    client: Mutex<SentinelClient>,
+    ping_number: AtomicUsize,
+}
+
+impl std::fmt::Debug for Manager {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("Manager")
+            .field("client", &format!("{:p}", &self.client))
+            .field("ping_number", &self.ping_number)
+            .finish()
+    }
+}
+
+impl Manager {
+    /// Creates a new [`Manager`] from the given `params`.
+    ///
+    /// # Errors
+    ///
+    /// If establishing a new [`SentinelClient`] fails.
+    pub fn new<T: IntoConnectionInfo>(
+        param: Vec<T>,
+        service_name: String,
+        sentinel_node_connection_info: Option<SentinelNodeConnectionInfo>,
+        server_type: SentinelServerType,
+    ) -> RedisResult<Self> {
+        Ok(Self {
+            client: Mutex::new(SentinelClient::build(
+                param,
+                service_name,
+                sentinel_node_connection_info,
+                server_type.into(),
+            )?),
+            ping_number: AtomicUsize::new(0),
+        })
+    }
+}
+
+impl managed::Manager for Manager {
+    type Type = MultiplexedConnection;
+    type Error = RedisError;
+
+    async fn create(&self) -> Result<MultiplexedConnection, RedisError> {
+        let mut client = self.client.lock().await;
+        let conn = client.get_async_connection().await?;
+        Ok(conn)
+    }
+
+    async fn recycle(&self, conn: &mut MultiplexedConnection, _: &Metrics) -> RecycleResult {
+        let ping_number = self.ping_number.fetch_add(1, Ordering::Relaxed).to_string();
+        let n = redis::cmd("PING")
+            .arg(&ping_number)
+            .query_async::<_, String>(conn)
+            .await?;
+        if n == ping_number {
+            Ok(())
+        } else {
+            Err(managed::RecycleError::message("Invalid PING response"))
+        }
+    }
+}
diff --git a/redis/tests/redis_sentinel.rs b/redis/tests/redis_sentinel.rs
new file mode 100644
index 0000000..0cda961
--- /dev/null
+++ b/redis/tests/redis_sentinel.rs
@@ -0,0 +1,206 @@
+#![cfg(all(feature = "serde", feature = "sentinel"))]
+
+use futures::FutureExt;
+use redis::cmd;
+use serde::{Deserialize, Serialize};
+
+use deadpool_redis::Runtime;
+
+#[derive(Debug, Default, Deserialize, Serialize)]
+struct Config {
+    #[serde(default)]
+    redis: deadpool_redis::sentinel::Config,
+}
+
+impl Config {
+    pub fn from_env() -> Self {
+        config::Config::builder()
+            .add_source(config::Environment::default().separator("__"))
+            .build()
+            .unwrap()
+            .try_deserialize()
+            .unwrap()
+    }
+}
+
+fn create_pool() -> deadpool_redis::sentinel::Pool {
+    let cfg = Config::from_env();
+    cfg.redis.create_pool(Some(Runtime::Tokio1)).unwrap()
+}
+
+#[tokio::test]
+async fn test_pipeline() {
+    use deadpool_redis::redis::pipe;
+    let pool = create_pool();
+    let mut conn = pool.get().await.unwrap();
+    let (value,): (String,) = pipe()
+        .cmd("SET")
+        .arg("deadpool/pipeline_test_key")
+        .arg("42")
+        .ignore()
+        .cmd("GET")
+        .arg("deadpool/pipeline_test_key")
+        .query_async(&mut conn)
+        .await
+        .unwrap();
+    assert_eq!(value, "42".to_string());
+}
+
+#[tokio::test]
+async fn test_high_level_commands() {
+    use deadpool_redis::redis::AsyncCommands;
+    let pool = create_pool();
+    let mut conn = pool.get().await.unwrap();
+    conn.set::<_, _, ()>("deadpool/hlc_test_key", 42)
+        .await
+        .unwrap();
+    let value: isize = conn.get("deadpool/hlc_test_key").await.unwrap();
+    assert_eq!(value, 42);
+}
+
+#[tokio::test]
+async fn test_aborted_command() {
+    let pool = create_pool();
+
+    {
+        let mut conn = pool.get().await.unwrap();
+        // Poll the future once. This does execute the query but does not
+        // wait for the response. The connection now has a request queued
+        // and the response to it will be returned when using the connection
+        // the next time:
+        // https://github.com/bikeshedder/deadpool/issues/97
+        // https://github.com/mitsuhiko/redis-rs/issues/489
+        cmd("PING")
+            .arg("wrong")
+            .query_async::<_, String>(&mut conn)
+            .now_or_never();
+    }
+    {
+        let mut conn = pool.get().await.unwrap();
+        let value: String = cmd("PING")
+            .arg("right")
+            .query_async(&mut conn)
+            .await
+            .unwrap();
+        assert_eq!(value, "right");
+    }
+}
+
+#[tokio::test]
+async fn test_recycled() {
+    let pool = create_pool();
+
+    let client_id = {
+        let mut conn = pool.get().await.unwrap();
+
+        cmd("CLIENT")
+            .arg("ID")
+            .query_async::<_, i64>(&mut conn)
+            .await
+            .unwrap()
+    };
+
+    {
+        let mut conn = pool.get().await.unwrap();
+
+        let new_client_id = cmd("CLIENT")
+            .arg("ID")
+            .query_async::<_, i64>(&mut conn)
+            .await
+            .unwrap();
+
+        assert_eq!(
+            client_id, new_client_id,
+            "the redis connection was not recycled"
+        );
+    }
+}
+
+#[tokio::test]
+async fn test_recycled_with_watch() {
+    use deadpool_redis::redis::{pipe, Value};
+
+    let pool = create_pool();
+
+    const WATCHED_KEY: &str = "deadpool/watched_test_key";
+    const TXN_KEY: &str = "deadpool/txn_test_key";
+
+    // Start transaction on one key and return connection to pool
+    let client_with_watch_id = {
+        let mut conn = pool.get().await.unwrap();
+
+        let client_id = cmd("CLIENT")
+            .arg("ID")
+            .query_async::<_, i64>(&mut conn)
+            .await
+            .unwrap();
+
+        cmd("WATCH")
+            .arg(WATCHED_KEY)
+            .query_async::<_, ()>(&mut conn)
+            .await
+            .unwrap();
+
+        client_id
+    };
+
+    {
+        let mut txn_conn = pool.get().await.unwrap();
+
+        let new_client_id = cmd("CLIENT")
+            .arg("ID")
+            .query_async::<_, i64>(&mut txn_conn)
+            .await
+            .unwrap();
+
+        // Ensure that's the same connection as the one in first transaction
+        assert_eq!(
+            client_with_watch_id, new_client_id,
+            "the redis connection with transaction was not recycled"
+        );
+
+        // Start transaction on another key
+        cmd("WATCH")
+            .arg(TXN_KEY)
+            .query_async::<_, ()>(&mut txn_conn)
+            .await
+            .unwrap();
+
+        {
+            let mut writer_conn = pool.get().await.unwrap();
+
+            // Overwrite key from first transaction from another connection
+            cmd("SET")
+                .arg(WATCHED_KEY)
+                .arg("v")
+                .query_async::<_, ()>(&mut writer_conn)
+                .await
+                .unwrap();
+        }
+
+        let get_pipe = pipe()
+            .atomic()
+            .get("key2")
+            .query_async::<_, Value>(&mut txn_conn)
+            .await
+            .unwrap();
+        let get = cmd("GET")
+            .arg("key2")
+            .query_async::<_, Value>(&mut txn_conn)
+            .await
+            .unwrap();
+
+        // Expect that new transaction is not aborted by irrelevant key
+        let txn_result = pipe()
+            .atomic()
+            .set(TXN_KEY, "foo")
+            .query_async::<_, Value>(&mut txn_conn)
+            .await
+            .unwrap();
+        assert_eq!(
+            txn_result,
+            Value::Bulk(vec![Value::Okay]),
+            "redis transaction in recycled connection aborted",
+        );
+    }
}
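
A minimal usage sketch of the new `sentinel` module (assumptions: the `sentinel` and `rt_tokio_1` features are enabled, a Sentinel node is reachable at `redis://127.0.0.1:26379`, and it monitors a master named `mymaster`; key names are illustrative):

```rust
use deadpool_redis::redis::cmd;
use deadpool_redis::sentinel::{Config, SentinelServerType};
use deadpool_redis::Runtime;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Point the manager at one or more Sentinel nodes; Sentinel is asked for the
    // current master (or a replica) whenever a new pooled connection is created.
    let cfg = Config::from_urls(
        vec!["redis://127.0.0.1:26379".to_string()], // assumed local Sentinel node
        "mymaster".to_string(),                      // assumed monitored master name
        SentinelServerType::Master,
    );
    let pool = cfg.create_pool(Some(Runtime::Tokio1))?;

    // Pooled connections implement `redis::aio::ConnectionLike`,
    // so regular commands work unchanged.
    let mut conn = pool.get().await?;
    cmd("SET")
        .arg("deadpool/sentinel_example") // illustrative key name
        .arg(42)
        .query_async::<_, ()>(&mut conn)
        .await?;
    let value: i64 = cmd("GET")
        .arg("deadpool/sentinel_example")
        .query_async(&mut conn)
        .await?;
    assert_eq!(value, 42);
    Ok(())
}
```

Passing `SentinelServerType::Replica` instead selects replica nodes when connections are created.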