From d48057fcd887010fd6d345fe7ccd8c706a64450d Mon Sep 17 00:00:00 2001 From: Isaac Donaldson Date: Tue, 17 Sep 2024 17:45:20 -0700 Subject: [PATCH 1/4] add trait to allow specifying a queue for a worker --- src/prelude.rs | 2 +- src/worker.rs | 2 ++ 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/src/prelude.rs b/src/prelude.rs index 5c6e23384..67f9200a6 100644 --- a/src/prelude.rs +++ b/src/prelude.rs @@ -39,7 +39,7 @@ pub use crate::{ task::{self, Task, TaskInfo}, validation::{self, Validatable}, validator::Validate, - worker::{self, AppWorker}, + worker::{self, AppWorker, AppWorkerOpts}, Result, }; diff --git a/src/worker.rs b/src/worker.rs index 14629a963..a3291a8c2 100644 --- a/src/worker.rs +++ b/src/worker.rs @@ -24,6 +24,8 @@ pub fn get_queues(config_queues: &Option>) -> Vec { queues } +pub type AppWorkerOpts = sidekiq::WorkerOpts; + #[async_trait] #[allow(clippy::module_name_repetitions)] pub trait AppWorker: Worker From 4c59fc1e740f0d67660067e8dd8d86bbca4e3485 Mon Sep 17 00:00:00 2001 From: Isaac Donaldson Date: Tue, 17 Sep 2024 17:45:53 -0700 Subject: [PATCH 2/4] add ability to perform a task after a specified duration --- src/worker.rs | 44 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 44 insertions(+) diff --git a/src/worker.rs b/src/worker.rs index a3291a8c2..050f790ef 100644 --- a/src/worker.rs +++ b/src/worker.rs @@ -34,6 +34,50 @@ where T: Send + Sync + serde::Serialize + 'static, { fn build(ctx: &AppContext) -> Self; + + async fn perform_in(ctx: &AppContext, duration: std::time::Duration, args: T) -> Result<()> { + match &ctx.config.workers.mode { + WorkerMode::BackgroundQueue => { + if let Some(queue) = &ctx.queue { + Self::opts() + .perform_in(queue, duration, args) + .await + .unwrap(); + } else { + error!( + error.msg = + "worker mode requested but no queue connection supplied, skipping job", + "worker_error" + ); + } + } + WorkerMode::ForegroundBlocking => { + std::thread::sleep(duration); + Self::build(ctx).perform(args).await.unwrap(); + } + WorkerMode::BackgroundAsync => { + let dx = ctx.clone(); + tokio::spawn(async move { + // If wait time is larger than a minute (+ 4 seconds), wait in + // intervals of 1 minute to avoid long running tasks in the + // event loop, as well as computer sleep mode or clock changing. + if duration > std::time::Duration::from_secs(64) { + // Give 4 seconds buffer for waking up + let num_sleeps = duration.as_secs() / 60; + for _ in 0..num_sleeps { + tokio::time::sleep(std::time::Duration::from_secs(60)).await; + } + } else { + tokio::time::sleep(duration).await; + } + + Self::build(&dx).perform(args).await + }); + } + } + Ok(()) + } + async fn perform_later(ctx: &AppContext, args: T) -> Result<()> { match &ctx.config.workers.mode { WorkerMode::BackgroundQueue => { From cab0830ee5a549be0973a3ce67c51172240e77eb Mon Sep 17 00:00:00 2001 From: Isaac Donaldson Date: Tue, 17 Sep 2024 17:46:16 -0700 Subject: [PATCH 3/4] update docs to reflect worker ability additions --- docs-site/content/docs/processing/workers.md | 37 +++++++++++++++++++- 1 file changed, 36 insertions(+), 1 deletion(-) diff --git a/docs-site/content/docs/processing/workers.md b/docs-site/content/docs/processing/workers.md index 5ffc45327..8ce8bd397 100644 --- a/docs-site/content/docs/processing/workers.md +++ b/docs-site/content/docs/processing/workers.md @@ -107,6 +107,20 @@ To use a worker, we mainly think about adding a job to the queue, so you `use` t .await ``` +If you want to add the job be run after a specified time, you can use the `perform_in` method and specify a `std::time::Duration`, for example: + +```rust + // .. in your controller .. + DownloadWorker::perform_in( + &ctx, + std::time::Duration::from_secs(60), // Start job after 60 seconds has passed + DownloadWorkerArgs { + user_guid: "foo".to_string(), + }, + ) + .await +``` + Unlike Rails and Ruby, with Rust you can enjoy _strongly typed_ job arguments which gets serialized and pushed into the queue. ## Creating a new worker @@ -169,9 +183,30 @@ workers: mode: BackgroundQueue ``` +By default, `loco` has 2 queues: `default` and `mailer`. If you want to specify other queues for your workers to use, you have to specify them. Adding a `custom` queue would look like this: + +```yaml + mode: BackgroundQueue + queues: + - custom +``` +And then you can specify which queue to use for each worker by implementing the `opts` function on the `Worker` trait. + +```rust +#[async_trait] +impl Worker for DownloadWorker { + //.. + fn opts() -> worker::AppWorkerOpts { + // this won't run if the queue you supply is not in the config + worker::AppWorkerOpts::new().queue("custom") + } + //.. +} +``` + ## Testing a Worker -You can easily test your worker background jobs using `Loco`. Ensure that your worker is set to the `ForegroundBlocking` mode, which blocks the job, ensuring it runs synchronously. When testing the worker, the test will wait until your worker is completed, allowing you to verify if the worker accomplished its intended tasks. +You can easily test your worker background jobs using `loco`. Ensure that your worker is set to the `ForegroundBlocking` mode, which blocks the job, ensuring it runs synchronously. When testing the worker, the test will wait until your worker is completed, allowing you to verify if the worker accomplished its intended tasks. It's recommended to implement tests in the `tests/workers` directory to consolidate all your worker tests in one place. From e49a21dc792c0814de463a838582ea72677a22f3 Mon Sep 17 00:00:00 2001 From: Isaac Date: Mon, 23 Sep 2024 20:12:32 -0700 Subject: [PATCH 4/4] adjust async sleep to make sense --- src/worker.rs | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/src/worker.rs b/src/worker.rs index 050f790ef..3683b64db 100644 --- a/src/worker.rs +++ b/src/worker.rs @@ -58,15 +58,20 @@ where WorkerMode::BackgroundAsync => { let dx = ctx.clone(); tokio::spawn(async move { - // If wait time is larger than a minute (+ 4 seconds), wait in - // intervals of 1 minute to avoid long running tasks in the - // event loop, as well as computer sleep mode or clock changing. - if duration > std::time::Duration::from_secs(64) { - // Give 4 seconds buffer for waking up + // If wait time is larger than a minute, wait in + // intervals of 1 minute to avoid long running tasks + if duration > std::time::Duration::from_secs(60) { let num_sleeps = duration.as_secs() / 60; + let remaining_time = duration.as_secs() % 60; + for _ in 0..num_sleeps { tokio::time::sleep(std::time::Duration::from_secs(60)).await; } + + if remaining_time > 0 { + tokio::time::sleep(std::time::Duration::from_secs(remaining_time)) + .await; + } } else { tokio::time::sleep(duration).await; }