Skip to content

Commit

Permalink
Merge pull request #1711 from scpwiki/rsmq-async-version
Browse files Browse the repository at this point in the history
Upgrade rsmq-async to version 8
  • Loading branch information
emmiegit authored Nov 20, 2023
2 parents 0f38b2a + a6fc802 commit 64e9f7e
Show file tree
Hide file tree
Showing 7 changed files with 25 additions and 21 deletions.
4 changes: 2 additions & 2 deletions deepwell/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion deepwell/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ redis = { version = "0.23", features = ["aio", "connection-manager", "keep-alive
ref-map = "0.1"
regex = "1"
reqwest = { version = "0.11", features = ["json", "rustls-tls"], default-features = false }
rsmq_async = "7"
rsmq_async = "8"
rust-s3 = { version = "0.32", features = ["with-tokio"], default-features = false }
sea-orm = { version = "0.12", features = ["sqlx-postgres", "runtime-tokio-rustls", "postgres-array", "macros", "with-json", "with-time"], default-features = false }
sea-query = "0.30"
Expand Down
10 changes: 6 additions & 4 deletions deepwell/src/config/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -370,10 +370,12 @@ impl ConfigFile {
job_work_delay: StdDuration::from_millis(job_work_delay_ms),
job_min_poll_delay: StdDuration::from_secs(job_min_poll_delay_secs),
job_max_poll_delay: StdDuration::from_secs(job_max_poll_delay_secs),
job_prune_session_secs,
job_prune_text_secs,
job_name_change_refill_secs,
job_lift_expired_punishments_secs,
job_prune_session: StdDuration::from_secs(job_prune_session_secs),
job_prune_text: StdDuration::from_secs(job_prune_text_secs),
job_name_change_refill: StdDuration::from_secs(job_name_change_refill_secs),
job_lift_expired_punishments: StdDuration::from_secs(
job_lift_expired_punishments_secs,
),
render_timeout: StdDuration::from_millis(render_timeout_ms),
rerender_skip: rerender_skip
.iter()
Expand Down
8 changes: 4 additions & 4 deletions deepwell/src/config/object.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,16 +134,16 @@ pub struct Config {
pub job_max_poll_delay: StdDuration,

/// How often to run the "prune expired sessions" recurring job.
pub job_prune_session_secs: u64,
pub job_prune_session: StdDuration,

/// How often to run the "prune unused text" recurring job.
pub job_prune_text_secs: u64,
pub job_prune_text: StdDuration,

/// How often to run the "refill name change tokens" recurring job.
pub job_name_change_refill_secs: u64,
pub job_name_change_refill: StdDuration,

/// How often to run the "lift expired punishments" recurring job.
pub job_lift_expired_punishments_secs: u64,
pub job_lift_expired_punishments: StdDuration,

/// Maximum run time for a render request.
pub render_timeout: StdDuration,
Expand Down
4 changes: 2 additions & 2 deletions deepwell/src/redis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ pub async fn connect(redis_uri: &str) -> Result<(ConnectionManager, MultiplexedR
// Set up queue if it doesn't already exist
if !job_queue_exists(&mut rsmq).await? {
info!("Creating Redis job queue '{JOB_QUEUE_NAME}'");
info!("* Process time: {JOB_QUEUE_PROCESS_TIME:?} seconds");
info!("* Delay time: {JOB_QUEUE_DELAY:?} seconds");
info!("* Process time: {JOB_QUEUE_PROCESS_TIME:?}");
info!("* Delay time: {JOB_QUEUE_DELAY:?}");
info!("* Maximum body: {JOB_QUEUE_MAXIMUM_SIZE:?} bytes");

rsmq.create_queue(
Expand Down
7 changes: 4 additions & 3 deletions deepwell/src/services/job/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

use super::prelude::*;
use rsmq_async::RsmqConnection;
use std::time::Duration;

pub const JOB_QUEUE_NAME: &str = "job";

Expand All @@ -33,10 +34,10 @@ pub const JOB_QUEUE_NAME: &str = "job";
/// period a job is allowed to run. If a job takes longer than that, then we assume it failed or
/// died. This risks a false positive of still-running jobs, but as long as this time is well
/// above what a job should take to run this risk is minimal.
pub const JOB_QUEUE_PROCESS_TIME: Option<u32> = Some(30);
pub const JOB_QUEUE_PROCESS_TIME: Option<Duration> = Some(Duration::from_secs(30));

/// How long to wait before messages are delivered to consumers.
pub const JOB_QUEUE_DELAY: Option<u32> = None;
pub const JOB_QUEUE_DELAY: Option<Duration> = None;

/// The maximum size, in bytes, that a job payload is allowed to be
///
Expand All @@ -55,7 +56,7 @@ impl JobService {
pub async fn queue_job(
ctx: &ServiceContext<'_>,
job: &Job,
delay: Option<u64>,
delay: Option<Duration>,
) -> Result<()> {
info!("Queuing job {job:?} (delay {delay:?})");
let payload = serde_json::to_vec(job)?;
Expand Down
11 changes: 6 additions & 5 deletions deepwell/src/services/job/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use rsmq_async::{MultiplexedRsmq, RsmqConnection, RsmqMessage};
use sea_orm::TransactionTrait;
use std::convert::Infallible;
use std::sync::Arc;
use std::time::Duration;
use tokio::time;

/// Tells the main loop of the worker whether the queue had an item or not.
Expand All @@ -39,7 +40,7 @@ enum JobProcessStatus {
/// Used to queue a follow-up job, if needed.
#[derive(Debug)]
enum NextJob {
Next { job: Job, delay: Option<u64> },
Next { job: Job, delay: Option<Duration> },
Done,
}

Expand Down Expand Up @@ -188,15 +189,15 @@ impl JobWorker {
SessionService::prune(ctx).await?;
NextJob::Next {
job: Job::PruneSessions,
delay: Some(self.state.config.job_prune_session_secs),
delay: Some(self.state.config.job_prune_session),
}
}
Job::PruneText => {
debug!("Pruning all unused text items from database");
TextService::prune(ctx).await?;
NextJob::Next {
job: Job::PruneText,
delay: Some(self.state.config.job_prune_text_secs),
delay: Some(self.state.config.job_prune_text),
}
}
Job::NameChangeRefill => {
Expand All @@ -208,7 +209,7 @@ impl JobWorker {
// add user credits to each where they are above that time
NextJob::Next {
job: Job::NameChangeRefill,
delay: Some(self.state.config.job_name_change_refill_secs),
delay: Some(self.state.config.job_name_change_refill),
}
}
Job::LiftExpiredPunishments => {
Expand All @@ -222,7 +223,7 @@ impl JobWorker {
// currently only bans are the temporary, but others can be added here
NextJob::Next {
job: Job::LiftExpiredPunishments,
delay: Some(self.state.config.job_lift_expired_punishments_secs),
delay: Some(self.state.config.job_lift_expired_punishments),
}
}
};
Expand Down

0 comments on commit 64e9f7e

Please sign in to comment.