From 7203fcc78537cb604f2fecf9e2c9ca2404333696 Mon Sep 17 00:00:00 2001 From: James Lucas Date: Tue, 10 Sep 2024 12:42:44 -0500 Subject: [PATCH] Add redis sentinel support --- omniqueue/Cargo.toml | 2 + omniqueue/src/backends/redis/mod.rs | 78 +++++++++-- omniqueue/src/backends/redis/sentinel.rs | 56 ++++++++ omniqueue/tests/it/main.rs | 2 +- omniqueue/tests/it/redis.rs | 161 ++++++++++++++++++----- omniqueue/tests/it/redis_cluster.rs | 4 + omniqueue/tests/it/redis_fallback.rs | 4 + testing-docker-compose.yml | 36 +++++ 8 files changed, 302 insertions(+), 41 deletions(-) create mode 100644 omniqueue/src/backends/redis/sentinel.rs diff --git a/omniqueue/Cargo.toml b/omniqueue/Cargo.toml index cb74a41..70bd821 100644 --- a/omniqueue/Cargo.toml +++ b/omniqueue/Cargo.toml @@ -39,6 +39,7 @@ serde = { version = "1.0.196", features = ["derive"] } tokio = { version = "1", features = ["macros"] } tokio-executor-trait = "2.1" tokio-reactor-trait = "1.1" +rstest = "0.23.0" [features] default = ["in_memory", "gcp_pubsub", "rabbitmq", "redis", "redis_cluster", "sqs"] @@ -49,6 +50,7 @@ rabbitmq = ["dep:futures-util", "dep:lapin"] rabbitmq-with-message-ids = ["rabbitmq", "dep:svix-ksuid"] redis = ["dep:bb8", "dep:bb8-redis", "dep:redis", "dep:svix-ksuid"] redis_cluster = ["dep:async-trait", "redis", "redis/cluster-async"] +redis_sentinel = ["dep:async-trait", "redis", "redis/sentinel"] sqs = ["dep:aws-config", "dep:aws-sdk-sqs"] azure_queue_storage = ["dep:azure_storage", "dep:azure_storage_queues"] beta = [] diff --git a/omniqueue/src/backends/redis/mod.rs b/omniqueue/src/backends/redis/mod.rs index d6b9ffe..5ec1d0c 100644 --- a/omniqueue/src/backends/redis/mod.rs +++ b/omniqueue/src/backends/redis/mod.rs @@ -41,7 +41,10 @@ use std::{ use bb8::ManageConnection; pub use bb8_redis::RedisConnectionManager; -use redis::{AsyncCommands, ExistenceCheck, SetExpiry, SetOptions}; +use redis::{ + sentinel::SentinelNodeConnectionInfo, AsyncCommands, ExistenceCheck, ProtocolVersion, + RedisConnectionInfo, SetExpiry, SetOptions, TlsMode, +}; use serde::Serialize; use svix_ksuid::KsuidLike; use thiserror::Error; @@ -58,10 +61,14 @@ use crate::{ #[cfg(feature = "redis_cluster")] mod cluster; mod fallback; +#[cfg(feature = "redis_sentinel")] +mod sentinel; mod streams; #[cfg(feature = "redis_cluster")] pub use cluster::RedisClusterConnectionManager; +#[cfg(feature = "redis_sentinel")] +pub use sentinel::RedisSentinelConnectionManager; pub trait RedisConnection: ManageConnection< @@ -69,19 +76,50 @@ pub trait RedisConnection: Error: std::error::Error + Send + Sync + 'static, > { - fn from_dsn(dsn: &str) -> Result; + fn from_config(config: &RedisConfig) -> Result; } impl RedisConnection for RedisConnectionManager { - fn from_dsn(dsn: &str) -> Result { - Self::new(dsn).map_err(QueueError::generic) + fn from_config(config: &RedisConfig) -> Result { + Self::new(config.dsn.as_str()).map_err(QueueError::generic) } } #[cfg(feature = "redis_cluster")] impl RedisConnection for RedisClusterConnectionManager { - fn from_dsn(dsn: &str) -> Result { - Self::new(dsn).map_err(QueueError::generic) + fn from_config(config: &RedisConfig) -> Result { + Self::new(config.dsn.as_str()).map_err(QueueError::generic) + } +} + +#[cfg(feature = "redis_sentinel")] +impl RedisConnection for RedisSentinelConnectionManager { + fn from_config(config: &RedisConfig) -> Result { + let cfg = config + .sentinel_config + .clone() + .ok_or(QueueError::Unsupported("Missing sentinel configuration"))?; + + let tls_mode = cfg.redis_tls_mode_secure.then_some(TlsMode::Secure); + let protocol = if cfg.redis_use_resp3 { + ProtocolVersion::RESP3 + } else { + ProtocolVersion::default() + }; + RedisSentinelConnectionManager::new( + vec![config.dsn.as_str()], + cfg.service_name.clone(), + Some(SentinelNodeConnectionInfo { + tls_mode, + redis_connection_info: Some(RedisConnectionInfo { + db: cfg.redis_db.unwrap_or(0), + username: cfg.redis_username.clone(), + password: cfg.redis_password.clone(), + protocol, + }), + }), + ) + .map_err(QueueError::generic) } } @@ -233,6 +271,17 @@ pub struct RedisConfig { pub payload_key: String, pub ack_deadline_ms: i64, pub dlq_config: Option, + pub sentinel_config: Option, +} + +#[derive(Clone)] +pub struct SentinelConfig { + pub service_name: String, + pub redis_tls_mode_secure: bool, + pub redis_db: Option, + pub redis_username: Option, + pub redis_password: Option, + pub redis_use_resp3: bool, } #[derive(Clone)] @@ -269,6 +318,12 @@ impl RedisBackend { pub fn cluster_builder(config: RedisConfig) -> RedisClusterBackendBuilder { RedisBackendBuilder::new(config) } + + #[cfg(feature = "redis_sentinel")] + /// Creates a new redis sentinel queue builder with the given configuration. + pub fn sentinel_builder(config: RedisConfig) -> RedisSentinelBackendBuilder { + RedisBackendBuilder::new(config) + } } #[allow(deprecated)] @@ -305,8 +360,11 @@ pub struct RedisBackendBuilder { #[cfg(feature = "redis_cluster")] pub type RedisClusterBackendBuilder = RedisBackendBuilder; +#[cfg(feature = "redis_sentinel")] +pub type RedisSentinelBackendBuilder = RedisBackendBuilder; + impl RedisBackendBuilder { - fn new(config: RedisConfig) -> Self { + pub fn new(config: RedisConfig) -> Self { Self { config, use_redis_streams: true, @@ -372,7 +430,7 @@ impl RedisBackendBuilder { } pub async fn build_pair(self) -> Result<(RedisProducer, RedisConsumer)> { - let redis = R::from_dsn(&self.config.dsn)?; + let redis = R::from_config(&self.config)?; let redis = bb8::Pool::builder() .max_size(self.config.max_connections.into()) .build(redis) @@ -407,7 +465,7 @@ impl RedisBackendBuilder { } pub async fn build_producer(self) -> Result> { - let redis = R::from_dsn(&self.config.dsn)?; + let redis = R::from_config(&self.config)?; let redis = bb8::Pool::builder() .max_size(self.config.max_connections.into()) .build(redis) @@ -427,7 +485,7 @@ impl RedisBackendBuilder { } pub async fn build_consumer(self) -> Result> { - let redis = R::from_dsn(&self.config.dsn)?; + let redis = R::from_config(&self.config)?; let redis = bb8::Pool::builder() .max_size(self.config.max_connections.into()) .build(redis) diff --git a/omniqueue/src/backends/redis/sentinel.rs b/omniqueue/src/backends/redis/sentinel.rs new file mode 100644 index 0000000..6346ea1 --- /dev/null +++ b/omniqueue/src/backends/redis/sentinel.rs @@ -0,0 +1,56 @@ +use async_trait::async_trait; +use redis::{ + sentinel::{SentinelClient, SentinelNodeConnectionInfo, SentinelServerType}, + ErrorKind, IntoConnectionInfo, RedisError, +}; +use tokio::sync::Mutex; + +// The mutex here is needed b/c there's currently +// no way to get connections in the redis sentinel client +// without a mutable reference to the underlying client. +struct LockedSentinelClient(pub(crate) Mutex); + +/// ConnectionManager that implements `bb8::ManageConnection` and supports +/// asynchronous Sentinel connections via `redis::sentinel::SentinelClient` +pub struct RedisSentinelConnectionManager { + client: LockedSentinelClient, +} + +impl RedisSentinelConnectionManager { + pub fn new( + info: Vec, + service_name: String, + node_connection_info: Option, + ) -> Result { + Ok(RedisSentinelConnectionManager { + client: LockedSentinelClient(Mutex::new(SentinelClient::build( + info, + service_name, + node_connection_info, + SentinelServerType::Master, + )?)), + }) + } +} + +#[async_trait] +impl bb8::ManageConnection for RedisSentinelConnectionManager { + type Connection = redis::aio::MultiplexedConnection; + type Error = RedisError; + + async fn connect(&self) -> Result { + self.client.0.lock().await.get_async_connection().await + } + + async fn is_valid(&self, conn: &mut Self::Connection) -> Result<(), Self::Error> { + let pong: String = redis::cmd("PING").query_async(conn).await?; + match pong.as_str() { + "PONG" => Ok(()), + _ => Err((ErrorKind::ResponseError, "ping request").into()), + } + } + + fn has_broken(&self, _: &mut Self::Connection) -> bool { + false + } +} diff --git a/omniqueue/tests/it/main.rs b/omniqueue/tests/it/main.rs index 34eba23..8187c40 100644 --- a/omniqueue/tests/it/main.rs +++ b/omniqueue/tests/it/main.rs @@ -4,7 +4,7 @@ mod azure_queue_storage; mod gcp_pubsub; #[cfg(feature = "rabbitmq")] mod rabbitmq; -#[cfg(feature = "redis")] +#[cfg(any(feature = "redis", feature = "redis_sentinel"))] mod redis; #[cfg(feature = "redis_cluster")] mod redis_cluster; diff --git a/omniqueue/tests/it/redis.rs b/omniqueue/tests/it/redis.rs index bbad302..51640a4 100644 --- a/omniqueue/tests/it/redis.rs +++ b/omniqueue/tests/it/redis.rs @@ -1,16 +1,22 @@ use std::time::{Duration, Instant}; +use bb8_redis::RedisConnectionManager; use omniqueue::{ backends::{ - redis::{DeadLetterQueueConfig, RedisBackendBuilder}, + redis::{ + DeadLetterQueueConfig, RedisBackendBuilder, RedisConnection, + RedisSentinelConnectionManager, SentinelConfig, + }, RedisBackend, RedisConfig, }, Delivery, }; use redis::{AsyncCommands, Client, Commands}; +use rstest::rstest; use serde::{Deserialize, Serialize}; const ROOT_URL: &str = "redis://localhost"; +const SENTINEL_ROOT_URL: &str = "redis://localhost:26379"; pub struct RedisStreamDrop(String); impl Drop for RedisStreamDrop { @@ -30,21 +36,40 @@ impl Drop for RedisStreamDrop { /// /// This will also return a [`RedisStreamDrop`] to clean up the stream after the /// test ends. -async fn make_test_queue() -> (RedisBackendBuilder, RedisStreamDrop) { +async fn make_test_queue( + dsn: String, +) -> (RedisBackendBuilder, RedisStreamDrop) { let stream_name: String = std::iter::repeat_with(fastrand::alphanumeric) .take(8) .collect(); - let client = Client::open(ROOT_URL).unwrap(); - let mut conn = client.get_multiplexed_async_connection().await.unwrap(); + #[cfg(feature = "redis")] + { + let client = Client::open(ROOT_URL).unwrap(); + let mut conn = client.get_multiplexed_async_connection().await.unwrap(); + let _: () = conn + .xgroup_create_mkstream(&stream_name, "test_cg", 0i8) + .await + .unwrap(); + } - let _: () = conn - .xgroup_create_mkstream(&stream_name, "test_cg", 0i8) - .await + #[cfg(feature = "redis_sentinel")] + { + let mut client = redis::sentinel::SentinelClient::build( + vec![SENTINEL_ROOT_URL], + "master0".to_string(), + None, + redis::sentinel::SentinelServerType::Master, + ) .unwrap(); - + let mut conn = client.get_async_connection().await.unwrap(); + let _: () = conn + .xgroup_create_mkstream(&stream_name, "test_cg", 0i8) + .await + .unwrap(); + } let config = RedisConfig { - dsn: ROOT_URL.to_owned(), + dsn, max_connections: 8, reinsert_on_nack: false, queue_key: stream_name.clone(), @@ -55,14 +80,32 @@ async fn make_test_queue() -> (RedisBackendBuilder, RedisStreamDrop) { payload_key: "payload".to_owned(), ack_deadline_ms: 5_000, dlq_config: None, + sentinel_config: Some(SentinelConfig { + service_name: "master0".to_owned(), + redis_tls_mode_secure: false, + redis_db: None, + redis_username: None, + redis_password: None, + redis_use_resp3: true, + }), }; - (RedisBackend::builder(config), RedisStreamDrop(stream_name)) + ( + RedisBackendBuilder::new(config), + RedisStreamDrop(stream_name), + ) } +#[rstest] +#[cfg_attr(feature = "redis", case(async { make_test_queue::(ROOT_URL.to_owned()).await }))] +#[cfg_attr(feature = "redis_sentinel", case(async { make_test_queue::(SENTINEL_ROOT_URL.to_owned()).await }))] #[tokio::test] -async fn test_raw_send_recv() { - let (builder, _drop) = make_test_queue().await; +async fn test_raw_send_recv( + #[future] + #[case] + get_builder: (RedisBackendBuilder, RedisStreamDrop), +) { + let (builder, _drop1) = get_builder.await; let payload = b"{\"test\": \"data\"}"; let (p, mut c) = builder.build_pair().await.unwrap(); @@ -72,11 +115,18 @@ async fn test_raw_send_recv() { assert_eq!(d.borrow_payload().unwrap(), payload); } +#[rstest] +#[cfg_attr(feature = "redis", case(async { make_test_queue::(ROOT_URL.to_owned()).await }))] +#[cfg_attr(feature = "redis_sentinel", case(async { make_test_queue::(SENTINEL_ROOT_URL.to_owned()).await }))] #[tokio::test] -async fn test_bytes_send_recv() { +async fn test_bytes_send_recv( + #[future] + #[case] + get_builder: (RedisBackendBuilder, RedisStreamDrop), +) { use omniqueue::QueueProducer as _; - let (builder, _drop) = make_test_queue().await; + let (builder, _drop) = get_builder.await; let payload = b"hello"; let (p, mut c) = builder.build_pair().await.unwrap(); @@ -91,10 +141,16 @@ async fn test_bytes_send_recv() { pub struct ExType { a: u8, } - +#[rstest] +#[cfg_attr(feature = "redis", case(async { make_test_queue::(ROOT_URL.to_owned()).await }))] +#[cfg_attr(feature = "redis_sentinel", case(async { make_test_queue::(SENTINEL_ROOT_URL.to_owned()).await }))] #[tokio::test] -async fn test_serde_send_recv() { - let (builder, _drop) = make_test_queue().await; +async fn test_serde_send_recv( + #[future] + #[case] + get_builder: (RedisBackendBuilder, RedisStreamDrop), +) { + let (builder, _drop) = get_builder.await; let payload = ExType { a: 2 }; let (p, mut c) = builder.build_pair().await.unwrap(); @@ -107,9 +163,16 @@ async fn test_serde_send_recv() { /// Consumer will return immediately if there are fewer than max messages to /// start with. +#[rstest] +#[cfg_attr(feature = "redis", case(async { make_test_queue::(ROOT_URL.to_owned()).await }))] +#[cfg_attr(feature = "redis_sentinel", case(async { make_test_queue::(SENTINEL_ROOT_URL.to_owned()).await }))] #[tokio::test] -async fn test_send_recv_all_partial() { - let (builder, _drop) = make_test_queue().await; +async fn test_send_recv_all_partial( + #[future] + #[case] + get_builder: (RedisBackendBuilder, RedisStreamDrop), +) { + let (builder, _drop1) = get_builder.await; let payload = ExType { a: 2 }; let (p, mut c) = builder.build_pair().await.unwrap(); @@ -128,12 +191,19 @@ async fn test_send_recv_all_partial() { /// Consumer should yield items immediately if there's a full batch ready on the /// first poll. +#[rstest] +#[cfg_attr(feature = "redis", case(async { make_test_queue::(ROOT_URL.to_owned()).await }))] +#[cfg_attr(feature = "redis_sentinel", case(async { make_test_queue::(SENTINEL_ROOT_URL.to_owned()).await }))] #[tokio::test] -async fn test_send_recv_all_full() { +async fn test_send_recv_all_full( + #[future] + #[case] + get_builder: (RedisBackendBuilder, RedisStreamDrop), +) { let payload1 = ExType { a: 1 }; let payload2 = ExType { a: 2 }; - let (builder, _drop) = make_test_queue().await; + let (builder, _drop1) = get_builder.await; let (p, mut c) = builder.build_pair().await.unwrap(); @@ -164,14 +234,21 @@ async fn test_send_recv_all_full() { /// Consumer will return the full batch immediately, but also return immediately /// if a partial batch is ready. +#[rstest] +#[cfg_attr(feature = "redis", case(async { make_test_queue::(ROOT_URL.to_owned()).await }))] +#[cfg_attr(feature = "redis_sentinel", case(async { make_test_queue::(SENTINEL_ROOT_URL.to_owned()).await }))] #[tokio::test] -async fn test_send_recv_all_full_then_partial() { +async fn test_send_recv_all_full_then_partial( + #[future] + #[case] + get_builder: (RedisBackendBuilder, RedisStreamDrop), +) { + let (builder, _drop) = get_builder.await; + let payload1 = ExType { a: 1 }; let payload2 = ExType { a: 2 }; let payload3 = ExType { a: 3 }; - let (builder, _drop) = make_test_queue().await; - let (p, mut c) = builder.build_pair().await.unwrap(); p.send_serde_json(&payload1).await.unwrap(); @@ -211,9 +288,16 @@ async fn test_send_recv_all_full_then_partial() { } /// Consumer will NOT wait indefinitely for at least one item. +#[rstest] +#[cfg_attr(feature = "redis", case(async { make_test_queue::(ROOT_URL.to_owned()).await }))] +#[cfg_attr(feature = "redis_sentinel", case(async { make_test_queue::(SENTINEL_ROOT_URL.to_owned()).await }))] #[tokio::test] -async fn test_send_recv_all_late_arriving_items() { - let (builder, _drop) = make_test_queue().await; +async fn test_send_recv_all_late_arriving_items( + #[future] + #[case] + get_builder: (RedisBackendBuilder, RedisStreamDrop), +) { + let (builder, _drop) = get_builder.await; let (_p, mut c) = builder.build_pair().await.unwrap(); @@ -228,10 +312,17 @@ async fn test_send_recv_all_late_arriving_items() { assert!(elapsed <= deadline + Duration::from_millis(200)); } +#[rstest] +#[cfg_attr(feature = "redis", case(async { make_test_queue::(ROOT_URL.to_owned()).await }))] +#[cfg_attr(feature = "redis_sentinel", case(async { make_test_queue::(SENTINEL_ROOT_URL.to_owned()).await }))] #[tokio::test] -async fn test_scheduled() { +async fn test_scheduled( + #[future] + #[case] + get_builder: (RedisBackendBuilder, RedisStreamDrop), +) { let payload1 = ExType { a: 1 }; - let (builder, _drop) = make_test_queue().await; + let (builder, _drop) = get_builder.await; let (p, mut c) = builder.build_pair().await.unwrap(); @@ -250,11 +341,18 @@ async fn test_scheduled() { assert_eq!(Some(payload1), delivery.payload_serde_json().unwrap()); } +#[rstest] +#[cfg_attr(feature = "redis", case(async { make_test_queue::(ROOT_URL.to_owned()).await }))] +#[cfg_attr(feature = "redis_sentinel", case(async { make_test_queue::(SENTINEL_ROOT_URL.to_owned()).await }))] #[tokio::test] -async fn test_pending() { +async fn test_pending( + #[future] + #[case] + get_builder: (RedisBackendBuilder, RedisStreamDrop), +) { let payload1 = ExType { a: 1 }; let payload2 = ExType { a: 2 }; - let (builder, _drop) = make_test_queue().await; + let (builder, _drop) = get_builder.await; let (p, mut c) = builder.build_pair().await.unwrap(); @@ -336,6 +434,7 @@ async fn test_deadletter_config() { queue_key: dlq_key.to_owned(), max_receives, }), + sentinel_config: None, }; let check_dlq = |asserted_len: usize| { @@ -463,6 +562,7 @@ async fn test_deadletter_config_order() { queue_key: dlq_key.to_owned(), max_receives, }), + sentinel_config: None, }; let check_dlq = |asserted_len: usize| { @@ -553,6 +653,7 @@ async fn test_backward_compatible() { queue_key: dlq_key.to_owned(), max_receives, }), + sentinel_config: None, }; let (builder, _drop) = ( diff --git a/omniqueue/tests/it/redis_cluster.rs b/omniqueue/tests/it/redis_cluster.rs index 8cfb94a..0245cc9 100644 --- a/omniqueue/tests/it/redis_cluster.rs +++ b/omniqueue/tests/it/redis_cluster.rs @@ -54,6 +54,7 @@ async fn make_test_queue() -> (RedisClusterBackendBuilder, RedisStreamDrop) { payload_key: "payload".to_owned(), ack_deadline_ms: 5_000, dlq_config: None, + sentinel_config: None, }; ( @@ -338,6 +339,7 @@ async fn test_deadletter_config() { queue_key: dlq_key.to_owned(), max_receives, }), + sentinel_config: None, }; let check_dlq = |asserted_len: usize| { @@ -462,6 +464,7 @@ async fn test_deadletter_config_order() { queue_key: dlq_key.to_owned(), max_receives, }), + sentinel_config: None, }; let check_dlq = |asserted_len: usize| { @@ -551,6 +554,7 @@ async fn test_backward_compatible() { queue_key: dlq_key.to_owned(), max_receives, }), + sentinel_config: None, }; let (builder, _drop) = ( diff --git a/omniqueue/tests/it/redis_fallback.rs b/omniqueue/tests/it/redis_fallback.rs index 347d189..627d38d 100644 --- a/omniqueue/tests/it/redis_fallback.rs +++ b/omniqueue/tests/it/redis_fallback.rs @@ -49,6 +49,7 @@ async fn make_test_queue() -> (RedisBackendBuilder, RedisKeyDrop) { payload_key: "payload".to_owned(), ack_deadline_ms: 5_000, dlq_config: None, + sentinel_config: None, }; ( @@ -328,6 +329,7 @@ async fn test_deadletter_config() { queue_key: dlq_key.to_owned(), max_receives, }), + sentinel_config: None, }; let check_dlq = |asserted_len: usize| { @@ -443,6 +445,7 @@ async fn test_deadletter_config_order() { queue_key: dlq_key.to_owned(), max_receives, }), + sentinel_config: None, }; let check_dlq = |asserted_len: usize| { @@ -525,6 +528,7 @@ async fn test_backward_compatible() { queue_key: dlq_key.to_owned(), max_receives, }), + sentinel_config: None, }; let (builder, _drop) = ( diff --git a/testing-docker-compose.yml b/testing-docker-compose.yml index ed3ccfe..a9b89fe 100644 --- a/testing-docker-compose.yml +++ b/testing-docker-compose.yml @@ -78,6 +78,42 @@ services: ports: - "6385:6379" + redis-sentinel: + image: docker.io/redis:7 + ports: + - "26379:26379" + command: > + sh -c 'echo "bind 0.0.0.0" > /etc/sentinel.conf && + echo "sentinel monitor master0 redis-master-0 6379 2" >> /etc/sentinel.conf && + echo "sentinel resolve-hostnames yes" >> /etc/sentinel.conf && + echo "sentinel down-after-milliseconds master0 10000" >> /etc/sentinel.conf && + echo "sentinel failover-timeout master0 10000" >> /etc/sentinel.conf && + echo "sentinel parallel-syncs master0 1" >> /etc/sentinel.conf && + redis-sentinel /etc/sentinel.conf' + + redis-master-0: + image: docker.io/redis:7 + ports: + - "6387:6379" + + redis-replica-0: + image: docker.io/redis:7 + ports: + - "6388:6379" + command: + [ + "redis-server", + "--appendonly", + "yes", + "--replicaof", + "redis-master-0", + "6379", + "--repl-diskless-load", + "on-empty-db", + "--protected-mode", + "no" + ] + gcp-pubsub: image: gcr.io/google.com/cloudsdktool/google-cloud-cli:emulators ports: