Skip to content

Commit

Permalink
add num workers config for sidekiq queue (#823)
Browse files Browse the repository at this point in the history
Co-authored-by: Elad Kaplan <[email protected]>
  • Loading branch information
jacovdbergh and kaplanelad authored Oct 13, 2024
1 parent 9a84692 commit 387d094
Show file tree
Hide file tree
Showing 4 changed files with 14 additions and 8 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ sqlx = { version = "0.7", default-features = false, features = [
ulid = { version = "1", optional = true }

# bg_redis: redis workers
rusty-sidekiq = { version = "0.8.2", default-features = false, optional = true }
rusty-sidekiq = { version = "0.11.0", default-features = false, optional = true }
bb8 = { version = "0.8.1", optional = true }

[workspace.dependencies]
Expand Down
1 change: 1 addition & 0 deletions src/bgworker/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,7 @@ pub async fn converge(queue: &Queue, config: &QueueConfig) -> Result<()> {
dangerously_flush,
uri: _,
queues: _,
num_workers: _,
}) => {
if *dangerously_flush {
queue.clear().await?;
Expand Down
12 changes: 7 additions & 5 deletions src/bgworker/skq.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
use std::{marker::PhantomData, sync::Arc};

use async_trait::async_trait;
use bb8::Pool;
use sidekiq::{Processor, RedisConnectionManager};

use super::{BackgroundWorker, Queue};
use crate::{config::RedisQueueConfig, Result};
use async_trait::async_trait;
use bb8::Pool;
use sidekiq::{Processor, ProcessorConfig, RedisConnectionManager};
pub type RedisPool = Pool<RedisConnectionManager>;

pub struct SidekiqBackgroundWorker<W, A> {
Expand Down Expand Up @@ -119,7 +118,10 @@ pub async fn create_provider(qcfg: &RedisQueueConfig) -> Result<Queue> {
let queues = get_queues(&qcfg.queues);
Ok(Queue::Redis(
redis.clone(),
Arc::new(tokio::sync::Mutex::new(Processor::new(redis, queues))),
Arc::new(tokio::sync::Mutex::new(
Processor::new(redis, queues)
.with_config(ProcessorConfig::default().num_workers(qcfg.num_workers as usize)),
)),
))
}

Expand Down
7 changes: 5 additions & 2 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,9 @@ pub struct RedisQueueConfig {
/// Custom queue names declaration. Useful to model priority queues.
/// First queue in list is more important.
pub queues: Option<Vec<String>>,

#[serde(default = "num_workers")]
pub num_workers: u32,
}

#[derive(Debug, Clone, Deserialize, Serialize)]
Expand Down Expand Up @@ -264,7 +267,7 @@ pub struct PostgresQueueConfig {
#[serde(default = "pgq_poll_interval")]
pub poll_interval_sec: u32,

#[serde(default = "pgq_num_workers")]
#[serde(default = "num_workers")]
pub num_workers: u32,
}

Expand All @@ -288,7 +291,7 @@ fn pgq_poll_interval() -> u32 {
1
}

fn pgq_num_workers() -> u32 {
fn num_workers() -> u32 {
2
}

Expand Down

0 comments on commit 387d094

Please sign in to comment.