Skip to content

Commit

Permalink
feature(session): add deadpool-redis compatibility (#381)
Browse files Browse the repository at this point in the history
* Add compatibility of `deadpool-redis` for the storage `redis_rs`.

* Keep up-to-date the `actix-redis` version.

* Format the project issued by command `cargo +nightly fmt`.

* Add `deadpool-redis` into the documentation and tests.

* Update CHANGES.md.

* Update the documentation of `Deadpool Redis` section on `redis_rs`.

* Replace `no_run` with `ignore` attribute on "Deadpool Redis" example to skip the doc tests failure.

* Rollback the renaming `redis::cmd` to `cmd` for better reading and avoid shadowing, fix the wrong return type on builder function comment.

* Format the project issued by command `cargo +nightly fmt`.

* Format.

* Fix feature naming from the last merge.

* Fix feature missing from the last merge.

* Format the project issued by command `cargo +nightly fmt`.

* Re-import `cookie-session` feature. (Maybe was removed accidentally from the last merge?)

* tmp

* chore: bump deadpool-redis to 0.16

* chore: fixup rest of redis code for pool

* fix: add missing cfg guard

* docs: fix pool docs

---------

Co-authored-by: Rob Ede <[email protected]>
  • Loading branch information
0rangeFox and robjtede authored Aug 6, 2024
1 parent 31b1dc5 commit 504e894
Show file tree
Hide file tree
Showing 4 changed files with 197 additions and 57 deletions.
1 change: 1 addition & 0 deletions actix-session/CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
## Unreleased

- Add `redis-session-rustls` crate feature that enables `rustls`-secured Redis sessions.
- Add `redis-pool` crate feature (off-by-default) which enables `RedisSessionStore::{new, builder}_pooled()` constructors.
- Rename `redis-rs-session` crate feature to `redis-session`.
- Rename `redis-rs-tls-session` crate feature to `redis-session-native-tls`.
- Remove `redis-actor-session` crate feature (and, therefore, the `actix-redis` based storage backend).
Expand Down
2 changes: 2 additions & 0 deletions actix-session/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ cookie-session = []
redis-session = ["dep:redis", "dep:rand"]
redis-session-native-tls = ["redis-session", "redis/tokio-native-tls-comp"]
redis-session-rustls = ["redis-session", "redis/tokio-rustls-comp"]
redis-pool = ["dep:deadpool-redis"]

[dependencies]
actix-service = "2"
Expand All @@ -38,6 +39,7 @@ tracing = { version = "0.1.30", default-features = false, features = ["log"] }

# redis-session
redis = { version = "0.26", default-features = false, features = ["tokio-comp", "connection-manager"], optional = true }
deadpool-redis = { version = "0.16", optional = true }

[dev-dependencies]
actix-session = { path = ".", features = ["cookie-session", "redis-session"] }
Expand Down
9 changes: 5 additions & 4 deletions actix-session/src/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,12 @@ mod utils;

#[cfg(feature = "cookie-session")]
pub use self::cookie::CookieSessionStore;
#[cfg(feature = "redis-session")]
pub use self::redis_rs::{RedisSessionStore, RedisSessionStoreBuilder};
#[cfg(feature = "redis-session")]
pub use self::utils::generate_session_key;
pub use self::{
interface::{LoadError, SaveError, SessionStore, UpdateError},
session_key::SessionKey,
};
#[cfg(feature = "redis-session")]
pub use self::{
redis_rs::{RedisSessionStore, RedisSessionStoreBuilder},
utils::generate_session_key,
};
242 changes: 189 additions & 53 deletions actix-session/src/storage/redis_rs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::sync::Arc;

use actix_web::cookie::time::Duration;
use anyhow::Error;
use redis::{aio::ConnectionManager, AsyncCommands, Cmd, FromRedisValue, RedisResult, Value};
use redis::{aio::ConnectionManager, AsyncCommands, Client, Cmd, FromRedisValue, Value};

use super::SessionKey;
use crate::storage::{
Expand Down Expand Up @@ -56,14 +56,38 @@ use crate::storage::{
/// # })
/// ```
///
/// # Pooled Redis Connections
///
/// When the `redis-pool` crate feature is enabled, a pre-existing pool from [`deadpool_redis`] can
/// be provided.
///
/// ```no_run
/// use actix_session::storage::RedisSessionStore;
/// use deadpool_redis::{Config, Runtime};
///
/// let redis_cfg = Config::from_url("redis://127.0.0.1:6379");
/// let redis_pool = redis_cfg.create_pool(Some(Runtime::Tokio1)).unwrap();
///
/// let store = RedisSessionStore::new_pooled(redis_pool);
/// ```
///
/// # Implementation notes
/// `RedisSessionStore` leverages [`redis-rs`] as Redis client.
///
/// [`redis-rs`]: https://github.com/mitsuhiko/redis-rs
/// `RedisSessionStore` leverages the [`redis`] crate as the underlying Redis client.
#[derive(Clone)]
pub struct RedisSessionStore {
configuration: CacheConfiguration,
client: ConnectionManager,
client: RedisSessionConn,
}

#[derive(Clone)]
enum RedisSessionConn {
/// Single connection.
Single(ConnectionManager),

/// Connection pool.
#[cfg(feature = "redis-pool")]
Pool(deadpool_redis::Pool),
}

#[derive(Clone)]
Expand All @@ -80,34 +104,77 @@ impl Default for CacheConfiguration {
}

impl RedisSessionStore {
/// A fluent API to configure [`RedisSessionStore`].
/// It takes as input the only required input to create a new instance of [`RedisSessionStore`] - a
/// connection string for Redis.
pub fn builder<S: Into<String>>(connection_string: S) -> RedisSessionStoreBuilder {
/// Returns a fluent API builder to configure [`RedisSessionStore`].
///
/// It takes as input the only required input to create a new instance of [`RedisSessionStore`]
/// - a connection string for Redis.
pub fn builder(connection_string: impl Into<String>) -> RedisSessionStoreBuilder {
RedisSessionStoreBuilder {
configuration: CacheConfiguration::default(),
connection_string: connection_string.into(),
conn_builder: RedisSessionConnBuilder::Single(connection_string.into()),
}
}

/// Create a new instance of [`RedisSessionStore`] using the default configuration.
/// It takes as input the only required input to create a new instance of [`RedisSessionStore`] - a
/// connection string for Redis.
pub async fn new<S: Into<String>>(
connection_string: S,
) -> Result<RedisSessionStore, anyhow::Error> {
/// Returns a fluent API builder to configure [`RedisSessionStore`].
///
/// It takes as input the only required input to create a new instance of [`RedisSessionStore`]
/// - a pool object for Redis.
#[cfg(feature = "redis-pool")]
pub fn builder_pooled(pool: impl Into<deadpool_redis::Pool>) -> RedisSessionStoreBuilder {
RedisSessionStoreBuilder {
configuration: CacheConfiguration::default(),
conn_builder: RedisSessionConnBuilder::Pool(pool.into()),
}
}

/// Creates a new instance of [`RedisSessionStore`] using the default configuration.
///
/// It takes as input the only required input to create a new instance of [`RedisSessionStore`]
/// - a connection string for Redis.
pub async fn new(connection_string: impl Into<String>) -> Result<RedisSessionStore, Error> {
Self::builder(connection_string).build().await
}

/// Creates a new instance of [`RedisSessionStore`] using the default configuration.
///
/// It takes as input the only required input to create a new instance of [`RedisSessionStore`]
/// - a pool object for Redis.
#[cfg(feature = "redis-pool")]
pub async fn new_pooled(
pool: impl Into<deadpool_redis::Pool>,
) -> anyhow::Result<RedisSessionStore> {
Self::builder_pooled(pool).build().await
}
}

/// A fluent builder to construct a [`RedisSessionStore`] instance with custom configuration
/// parameters.
///
/// [`RedisSessionStore`]: crate::storage::RedisSessionStore
#[must_use]
pub struct RedisSessionStoreBuilder {
connection_string: String,
configuration: CacheConfiguration,
conn_builder: RedisSessionConnBuilder,
}

enum RedisSessionConnBuilder {
/// Single connection string.
Single(String),

/// Pre-built connection pool.
#[cfg(feature = "redis-pool")]
Pool(deadpool_redis::Pool),
}

impl RedisSessionConnBuilder {
async fn into_client(self) -> anyhow::Result<RedisSessionConn> {
Ok(match self {
RedisSessionConnBuilder::Single(conn_string) => {
RedisSessionConn::Single(ConnectionManager::new(Client::open(conn_string)?).await?)
}

#[cfg(feature = "redis-pool")]
RedisSessionConnBuilder::Pool(pool) => RedisSessionConn::Pool(pool),
})
}
}

impl RedisSessionStoreBuilder {
Expand All @@ -120,9 +187,10 @@ impl RedisSessionStoreBuilder {
self
}

/// Finalise the builder and return a [`RedisSessionStore`] instance.
pub async fn build(self) -> Result<RedisSessionStore, anyhow::Error> {
let client = ConnectionManager::new(redis::Client::open(self.connection_string)?).await?;
/// Finalises builder and returns a [`RedisSessionStore`] instance.
pub async fn build(self) -> anyhow::Result<RedisSessionStore> {
let client = self.conn_builder.into_client().await?;

Ok(RedisSessionStore {
configuration: self.configuration,
client,
Expand Down Expand Up @@ -190,7 +258,7 @@ impl SessionStore for RedisSessionStore {

let cache_key = (self.configuration.cache_keygen)(session_key.as_ref());

let v: redis::Value = self
let v: Value = self
.execute_command(redis::cmd("SET").arg(&[
&cache_key,
&body,
Expand Down Expand Up @@ -223,18 +291,29 @@ impl SessionStore for RedisSessionStore {
}
}

async fn update_ttl(&self, session_key: &SessionKey, ttl: &Duration) -> Result<(), Error> {
async fn update_ttl(&self, session_key: &SessionKey, ttl: &Duration) -> anyhow::Result<()> {
let cache_key = (self.configuration.cache_keygen)(session_key.as_ref());

self.client
.clone()
.expire::<_, ()>(&cache_key, ttl.whole_seconds())
.await?;
match self.client {
RedisSessionConn::Single(ref conn) => {
conn.clone()
.expire::<_, ()>(&cache_key, ttl.whole_seconds())
.await?;
}

#[cfg(feature = "redis-pool")]
RedisSessionConn::Pool(ref pool) => {
pool.get()
.await?
.expire::<_, ()>(&cache_key, ttl.whole_seconds())
.await?;
}
}

Ok(())
}

async fn delete(&self, session_key: &SessionKey) -> Result<(), anyhow::Error> {
async fn delete(&self, session_key: &SessionKey) -> Result<(), Error> {
let cache_key = (self.configuration.cache_keygen)(session_key.as_ref());

self.execute_command::<()>(redis::cmd("DEL").arg(&[&cache_key]))
Expand All @@ -261,24 +340,55 @@ impl RedisSessionStore {
/// retry will be executed on a fresh connection, therefore it is likely to succeed (or fail for
/// a different more meaningful reason).
#[allow(clippy::needless_pass_by_ref_mut)]
async fn execute_command<T: FromRedisValue>(&self, cmd: &mut Cmd) -> RedisResult<T> {
async fn execute_command<T: FromRedisValue>(&self, cmd: &mut Cmd) -> anyhow::Result<T> {
let mut can_retry = true;

loop {
match cmd.query_async(&mut self.client.clone()).await {
Ok(value) => return Ok(value),
Err(err) => {
if can_retry && err.is_connection_dropped() {
tracing::debug!(
"Connection dropped while trying to talk to Redis. Retrying."
);

// Retry at most once
can_retry = false;

continue;
} else {
return Err(err);
match self.client {
RedisSessionConn::Single(ref conn) => {
let mut conn = conn.clone();

loop {
match cmd.query_async(&mut conn).await {
Ok(value) => return Ok(value),
Err(err) => {
if can_retry && err.is_connection_dropped() {
tracing::debug!(
"Connection dropped while trying to talk to Redis. Retrying."
);

// Retry at most once
can_retry = false;

continue;
} else {
return Err(err.into());
}
}
}
}
}

#[cfg(feature = "redis-pool")]
RedisSessionConn::Pool(ref pool) => {
let mut conn = pool.get().await?;

loop {
match cmd.query_async(&mut conn).await {
Ok(value) => return Ok(value),
Err(err) => {
if can_retry && err.is_connection_dropped() {
tracing::debug!(
"Connection dropped while trying to talk to Redis. Retrying."
);

// Retry at most once
can_retry = false;

continue;
} else {
return Err(err.into());
}
}
}
}
}
Expand All @@ -291,14 +401,27 @@ mod tests {
use std::collections::HashMap;

use actix_web::cookie::time;
#[cfg(not(feature = "redis-session"))]
use deadpool_redis::{Config, Runtime};

use super::*;
use crate::test_helpers::acceptance_test_suite;

async fn redis_store() -> RedisSessionStore {
RedisSessionStore::new("redis://127.0.0.1:6379")
.await
.unwrap()
#[cfg(feature = "redis-session")]
{
RedisSessionStore::new("redis://127.0.0.1:6379")
.await
.unwrap()
}

#[cfg(not(feature = "redis-session"))]
{
let redis_pool = Config::from_url("redis://127.0.0.1:6379")
.create_pool(Some(Runtime::Tokio1))
.unwrap();
RedisSessionStore::new(redis_pool.clone())
}
}

#[actix_web::test]
Expand All @@ -318,12 +441,25 @@ mod tests {
async fn loading_an_invalid_session_state_returns_deserialization_error() {
let store = redis_store().await;
let session_key = generate_session_key();
store
.client
.clone()
.set::<_, _, ()>(session_key.as_ref(), "random-thing-which-is-not-json")
.await
.unwrap();

match store.client {
RedisSessionConn::Single(ref conn) => conn
.clone()
.set::<_, _, ()>(session_key.as_ref(), "random-thing-which-is-not-json")
.await
.unwrap(),

#[cfg(feature = "redis-pool")]
RedisSessionConn::Pool(ref pool) => {
pool.get()
.await
.unwrap()
.set::<_, _, ()>(session_key.as_ref(), "random-thing-which-is-not-json")
.await
.unwrap();
}
}

assert!(matches!(
store.load(&session_key).await.unwrap_err(),
LoadError::Deserialization(_),
Expand Down

0 comments on commit 504e894

Please sign in to comment.