Skip to content

Commit

Permalink
feat: Add redis sentinel connection pool
Browse files Browse the repository at this point in the history
  • Loading branch information
smf8 committed Jul 16, 2024
1 parent 7b933c5 commit ed7b6b2
Show file tree
Hide file tree
Showing 5 changed files with 574 additions and 1 deletion.
4 changes: 3 additions & 1 deletion redis/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ rt_tokio_1 = ["deadpool/rt_tokio_1", "redis/tokio-comp"]
rt_async-std_1 = ["deadpool/rt_async-std_1", "redis/async-std-comp"]
serde = ["deadpool/serde", "dep:serde"]
cluster = ["redis/cluster-async"]
sentinel = ["redis/sentinel", "tokio/sync"]

[dependencies]
deadpool = { path = "../", version = "0.12.0", default-features = false, features = [
Expand All @@ -32,6 +33,7 @@ redis = { version = "0.25", default-features = false, features = ["aio"] }
serde = { package = "serde", version = "1.0", features = [
"derive",
], optional = true }
tokio = { version = "1.0", features = ["sync"] }

[dev-dependencies]
config = { version = "0.14", features = ["json"] }
Expand All @@ -40,4 +42,4 @@ futures = "0.3.15"
redis = { version = "0.25", default-features = false, features = [
"tokio-comp",
] }
tokio = { version = "1.0", features = ["macros", "rt-multi-thread"] }
tokio = { version = "1.0", features = ["macros", "rt-multi-thread", "sync"] }
3 changes: 3 additions & 0 deletions redis/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@
pub mod cluster;
mod config;

#[cfg(feature = "sentinel")]
pub mod sentinel;

use std::{
ops::{Deref, DerefMut},
sync::atomic::{AtomicUsize, Ordering},
Expand Down
194 changes: 194 additions & 0 deletions redis/src/sentinel/config.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,194 @@
use redis::sentinel::SentinelNodeConnectionInfo;
use redis::TlsMode;

pub use crate::config::ConfigError;
use crate::{ConnectionAddr, ConnectionInfo};

use super::{CreatePoolError, Pool, PoolBuilder, PoolConfig, Runtime};

/// Configuration object.
///
/// # Example (from environment)
///
/// By enabling the `serde` feature you can read the configuration using the
/// [`config`](https://crates.io/crates/config) crate as following:
/// ```env
/// REDIS_CLUSTER__URLS=redis://127.0.0.1:7000,redis://127.0.0.1:7001
/// REDIS_CLUSTER__POOL__MAX_SIZE=16
/// REDIS_CLUSTER__POOL__TIMEOUTS__WAIT__SECS=2
/// REDIS_CLUSTER__POOL__TIMEOUTS__WAIT__NANOS=0
/// ```
/// ```rust
/// #[derive(serde::Deserialize)]
/// struct Config {
/// redis_cluster: deadpool_redis::cluster::Config,
/// }
///
/// impl Config {
/// pub fn from_env() -> Result<Self, config::ConfigError> {
/// let mut cfg = config::Config::builder()
/// .add_source(
/// config::Environment::default()
/// .separator("__")
/// .try_parsing(true)
/// .list_separator(","),
/// )
/// .build()?;
/// cfg.try_deserialize()
/// }
/// }
/// ```
#[derive(Clone, Debug)]
#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize))]
pub struct Config {
/// Redis URLs.
///
/// See [Connection Parameters](redis#connection-parameters).
pub urls: Option<Vec<String>>,
/// ServerType
///
/// [`SentinelServerType`]
pub server_type: SentinelServerType,
/// Sentinel setup master name. default value is `mymaster`
pub master_name: String,
/// [`redis::ConnectionInfo`] structures.
pub connections: Option<Vec<ConnectionInfo>>,
// SentinelNodeConnectionInfo doesn't implement debug, so we can't
// use it as a field, also they have identical fields.
sentinel_connection_info: Option<ConnectionInfo>,
/// Pool configuration.
pub pool: Option<PoolConfig>,
}

impl Config {
/// Creates a new [`Pool`] using this [`Config`].
///
/// # Errors
///
/// See [`CreatePoolError`] for details.
pub fn create_pool(&self, runtime: Option<Runtime>) -> Result<Pool, CreatePoolError> {
let mut builder = self.builder().map_err(CreatePoolError::Config)?;
if let Some(runtime) = runtime {
builder = builder.runtime(runtime);
}
builder.build().map_err(CreatePoolError::Build)
}

/// Creates a new [`PoolBuilder`] using this [`Config`].
///
/// # Errors
///
/// See [`ConfigError`] for details.
pub fn builder(&self) -> Result<PoolBuilder, ConfigError> {
let sentinel_node_connection_info = self.sentinel_connection_info.clone().map(|c| {
let tls_mode = match c.addr {
ConnectionAddr::TcpTls { insecure: i, .. } => {
if i {
Some(TlsMode::Insecure)
} else {
Some(TlsMode::Secure)
}
}
ConnectionAddr::Unix(_) | ConnectionAddr::Tcp(_, _) => None,
};

SentinelNodeConnectionInfo {
tls_mode,
redis_connection_info: Some(c.redis.into()),
}
});

let manager = match (&self.urls, &self.connections) {
(Some(urls), None) => super::Manager::new(
urls.iter().map(|url| url.as_str()).collect(),
self.master_name.clone(),
sentinel_node_connection_info,
self.server_type.clone(),
)?,
(None, Some(connections)) => super::Manager::new(
connections.clone(),
self.master_name.clone(),
sentinel_node_connection_info,
self.server_type.clone(),
)?,
(None, None) => super::Manager::new(
vec![ConnectionInfo::default()],
self.master_name.clone(),
sentinel_node_connection_info,
self.server_type.clone(),
)?,
(Some(_), Some(_)) => return Err(ConfigError::UrlAndConnectionSpecified),
};
let pool_config = self.get_pool_config();
Ok(Pool::builder(manager).config(pool_config))
}

/// Returns [`deadpool::managed::PoolConfig`] which can be used to construct
/// a [`deadpool::managed::Pool`] instance.
#[must_use]
pub fn get_pool_config(&self) -> PoolConfig {
self.pool.unwrap_or_default()
}

/// Creates a new [`Config`] from the given Redis URL (like
/// `redis://127.0.0.1`).
#[must_use]
pub fn from_urls<T: Into<Vec<String>>>(
urls: T,
master_name: String,
server_type: SentinelServerType,
) -> Config {
Config {
urls: Some(urls.into()),
connections: None,
server_type,
master_name,
pool: None,
sentinel_connection_info: None,
}
}
}

impl Default for Config {
fn default() -> Self {
let mut default_connection_info = ConnectionInfo::default();
default_connection_info.addr = ConnectionAddr::Tcp("127.0.0.1".to_string(), 26379);

Self {
urls: None,
connections: Some(vec![default_connection_info.clone()]),
server_type: SentinelServerType::Master,
master_name: String::from("mymaster"),
pool: None,
sentinel_connection_info: Some(default_connection_info),
}
}
}

/// This type is a wrapper for [`redis::sentinel::SentinelServerType`] for serialize/deserialize.
#[derive(Debug, Clone, Copy)]
#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize))]
pub enum SentinelServerType {
/// Master connections only
Master,
/// Replica connections only
Replica,
}

impl From<redis::sentinel::SentinelServerType> for SentinelServerType {
fn from(value: redis::sentinel::SentinelServerType) -> Self {
match value {
redis::sentinel::SentinelServerType::Master => SentinelServerType::Master,
redis::sentinel::SentinelServerType::Replica => SentinelServerType::Replica,
}
}
}

impl From<SentinelServerType> for redis::sentinel::SentinelServerType {
fn from(value: SentinelServerType) -> Self {
match value {
SentinelServerType::Master => redis::sentinel::SentinelServerType::Master,
SentinelServerType::Replica => redis::sentinel::SentinelServerType::Replica,
}
}
}
168 changes: 168 additions & 0 deletions redis/src/sentinel/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
//! This module extends the library to support Redis Cluster.
use std::{
ops::{Deref, DerefMut},
sync::atomic::{AtomicUsize, Ordering},
};

use redis;
use redis::aio::MultiplexedConnection;
use redis::sentinel::{SentinelClient, SentinelNodeConnectionInfo};
use redis::{aio::ConnectionLike, IntoConnectionInfo, RedisError, RedisResult};
use tokio::sync::Mutex;

use deadpool::managed;
pub use deadpool::managed::reexports::*;

use crate::sentinel::config::SentinelServerType;

pub use self::config::{Config, ConfigError};

mod config;

deadpool::managed_reexports!(
"redis_sentinel",
Manager,
Connection,
RedisError,
ConfigError
);

type RecycleResult = managed::RecycleResult<RedisError>;

/// Wrapper around [`redis::cluster_async::ClusterConnection`].
///
/// This structure implements [`redis::aio::ConnectionLike`] and can therefore
/// be used just like a regular [`redis::cluster_async::ClusterConnection`].
#[allow(missing_debug_implementations)] // `redis::cluster_async::ClusterConnection: !Debug`
pub struct Connection {
conn: Object,
}

impl Connection {
/// Takes this [`Connection`] from its [`Pool`] permanently.
///
/// This reduces the size of the [`Pool`].
#[must_use]
pub fn take(this: Self) -> MultiplexedConnection {
Object::take(this.conn)
}
}

impl From<Object> for Connection {
fn from(conn: Object) -> Self {
Self { conn }
}
}

impl Deref for Connection {
type Target = MultiplexedConnection;

fn deref(&self) -> &MultiplexedConnection {
&self.conn
}
}

impl DerefMut for Connection {
fn deref_mut(&mut self) -> &mut MultiplexedConnection {
&mut self.conn
}
}

impl AsRef<MultiplexedConnection> for Connection {
fn as_ref(&self) -> &MultiplexedConnection {
&self.conn
}
}

impl AsMut<MultiplexedConnection> for Connection {
fn as_mut(&mut self) -> &mut MultiplexedConnection {
&mut self.conn
}
}

impl ConnectionLike for Connection {
fn req_packed_command<'a>(
&'a mut self,
cmd: &'a redis::Cmd,
) -> redis::RedisFuture<'a, redis::Value> {
self.conn.req_packed_command(cmd)
}

fn req_packed_commands<'a>(
&'a mut self,
cmd: &'a redis::Pipeline,
offset: usize,
count: usize,
) -> redis::RedisFuture<'a, Vec<redis::Value>> {
self.conn.req_packed_commands(cmd, offset, count)
}

fn get_db(&self) -> i64 {
self.conn.get_db()
}
}

/// [`Manager`] for creating and recycling [`redis::cluster_async`] connections.
///
/// [`Manager`]: managed::Manager
pub struct Manager {
client: Mutex<SentinelClient>,
ping_number: AtomicUsize,
}

impl std::fmt::Debug for Manager {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Manager")
.field("client", &format!("{:p}", &self.client))
.field("ping_number", &self.ping_number)
.finish()
}
}

impl Manager {
/// Creates a new [`Manager`] from the given `params`.
///
/// # Errors
///
/// If establishing a new [`ClusterClient`] fails.
pub fn new<T: IntoConnectionInfo>(
param: Vec<T>,
service_name: String,
sentinel_node_connection_info: Option<SentinelNodeConnectionInfo>,
server_type: SentinelServerType,
) -> RedisResult<Self> {
Ok(Self {
client: Mutex::new(SentinelClient::build(
param,
service_name,
sentinel_node_connection_info,
server_type.into(),
)?),
ping_number: AtomicUsize::new(0),
})
}
}

impl managed::Manager for Manager {
type Type = MultiplexedConnection;
type Error = RedisError;

async fn create(&self) -> Result<MultiplexedConnection, RedisError> {
let mut client = self.client.lock().await;
let conn = client.get_async_connection().await?;
Ok(conn)
}

async fn recycle(&self, conn: &mut MultiplexedConnection, _: &Metrics) -> RecycleResult {
let ping_number = self.ping_number.fetch_add(1, Ordering::Relaxed).to_string();
let n = redis::cmd("PING")
.arg(&ping_number)
.query_async::<_, String>(conn)
.await?;
if n == ping_number {
Ok(())
} else {
Err(managed::RecycleError::message("Invalid PING response"))
}
}
}
Loading

0 comments on commit ed7b6b2

Please sign in to comment.