diff --git a/docs-site/content/docs/processing/workers.md b/docs-site/content/docs/processing/workers.md index 5ffc45327..8ce8bd397 100644 --- a/docs-site/content/docs/processing/workers.md +++ b/docs-site/content/docs/processing/workers.md @@ -107,6 +107,20 @@ To use a worker, we mainly think about adding a job to the queue, so you `use` t .await ``` +If you want to add the job be run after a specified time, you can use the `perform_in` method and specify a `std::time::Duration`, for example: + +```rust + // .. in your controller .. + DownloadWorker::perform_in( + &ctx, + std::time::Duration::from_secs(60), // Start job after 60 seconds has passed + DownloadWorkerArgs { + user_guid: "foo".to_string(), + }, + ) + .await +``` + Unlike Rails and Ruby, with Rust you can enjoy _strongly typed_ job arguments which gets serialized and pushed into the queue. ## Creating a new worker @@ -169,9 +183,30 @@ workers: mode: BackgroundQueue ``` +By default, `loco` has 2 queues: `default` and `mailer`. If you want to specify other queues for your workers to use, you have to specify them. Adding a `custom` queue would look like this: + +```yaml + mode: BackgroundQueue + queues: + - custom +``` +And then you can specify which queue to use for each worker by implementing the `opts` function on the `Worker` trait. + +```rust +#[async_trait] +impl Worker for DownloadWorker { + //.. + fn opts() -> worker::AppWorkerOpts { + // this won't run if the queue you supply is not in the config + worker::AppWorkerOpts::new().queue("custom") + } + //.. +} +``` + ## Testing a Worker -You can easily test your worker background jobs using `Loco`. Ensure that your worker is set to the `ForegroundBlocking` mode, which blocks the job, ensuring it runs synchronously. When testing the worker, the test will wait until your worker is completed, allowing you to verify if the worker accomplished its intended tasks. +You can easily test your worker background jobs using `loco`. Ensure that your worker is set to the `ForegroundBlocking` mode, which blocks the job, ensuring it runs synchronously. When testing the worker, the test will wait until your worker is completed, allowing you to verify if the worker accomplished its intended tasks. It's recommended to implement tests in the `tests/workers` directory to consolidate all your worker tests in one place. diff --git a/src/prelude.rs b/src/prelude.rs index ea92be2f8..e1f739c2e 100644 --- a/src/prelude.rs +++ b/src/prelude.rs @@ -39,7 +39,7 @@ pub use crate::{ task::{self, Task, TaskInfo}, validation::{self, Validatable}, validator::Validate, - worker::{self, AppWorker}, + worker::{self, AppWorker, AppWorkerOpts}, Result, }; diff --git a/src/worker.rs b/src/worker.rs index 14629a963..3683b64db 100644 --- a/src/worker.rs +++ b/src/worker.rs @@ -24,6 +24,8 @@ pub fn get_queues(config_queues: &Option>) -> Vec { queues } +pub type AppWorkerOpts = sidekiq::WorkerOpts; + #[async_trait] #[allow(clippy::module_name_repetitions)] pub trait AppWorker: Worker @@ -32,6 +34,55 @@ where T: Send + Sync + serde::Serialize + 'static, { fn build(ctx: &AppContext) -> Self; + + async fn perform_in(ctx: &AppContext, duration: std::time::Duration, args: T) -> Result<()> { + match &ctx.config.workers.mode { + WorkerMode::BackgroundQueue => { + if let Some(queue) = &ctx.queue { + Self::opts() + .perform_in(queue, duration, args) + .await + .unwrap(); + } else { + error!( + error.msg = + "worker mode requested but no queue connection supplied, skipping job", + "worker_error" + ); + } + } + WorkerMode::ForegroundBlocking => { + std::thread::sleep(duration); + Self::build(ctx).perform(args).await.unwrap(); + } + WorkerMode::BackgroundAsync => { + let dx = ctx.clone(); + tokio::spawn(async move { + // If wait time is larger than a minute, wait in + // intervals of 1 minute to avoid long running tasks + if duration > std::time::Duration::from_secs(60) { + let num_sleeps = duration.as_secs() / 60; + let remaining_time = duration.as_secs() % 60; + + for _ in 0..num_sleeps { + tokio::time::sleep(std::time::Duration::from_secs(60)).await; + } + + if remaining_time > 0 { + tokio::time::sleep(std::time::Duration::from_secs(remaining_time)) + .await; + } + } else { + tokio::time::sleep(duration).await; + } + + Self::build(&dx).perform(args).await + }); + } + } + Ok(()) + } + async fn perform_later(ctx: &AppContext, args: T) -> Result<()> { match &ctx.config.workers.mode { WorkerMode::BackgroundQueue => {