Skip to content

Commit

Permalink
rename age to age_days
Browse files Browse the repository at this point in the history
  • Loading branch information
kaplanelad committed Dec 12, 2024
1 parent 48dd28c commit fa4aadc
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 21 deletions.
34 changes: 23 additions & 11 deletions src/bgworker/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -310,18 +310,22 @@ impl Queue {
async fn get_jobs(
&self,
status: Option<&Vec<JobStatus>>,
age: Option<i64>,
age_days: Option<i64>,
) -> Result<serde_json::Value> {
tracing::debug!(status = ?status, age = ?age, "getting jobs");
tracing::debug!(status = ?status, age_days = ?age_days, "getting jobs");
let jobs = match self {
#[cfg(feature = "bg_pg")]
Self::Postgres(pool, _, _) => {
let jobs = pg::get_jobs(pool, status, age).await.map_err(Box::from)?;
let jobs = pg::get_jobs(pool, status, age_days)
.await
.map_err(Box::from)?;
serde_json::to_value(jobs)?
}
#[cfg(feature = "bg_sqlt")]
Self::Sqlite(pool, _, _) => {
let jobs = sqlt::get_jobs(pool, status, age).await.map_err(Box::from)?;
let jobs = sqlt::get_jobs(pool, status, age_days)
.await
.map_err(Box::from)?;

serde_json::to_value(jobs)?
}
Expand Down Expand Up @@ -383,14 +387,22 @@ impl Queue {
/// - If the Redis provider is selected, it will return an error stating that clearing jobs is not supported.
/// - Any error in the underlying provider's job clearing logic will propagate from the respective function.
///
pub async fn clear_jobs_older_than(&self, age: i64, status: &Vec<JobStatus>) -> Result<()> {
tracing::debug!(age = age, status = ?status, "cancel jobs with age");
pub async fn clear_jobs_older_than(
&self,
age_days: i64,
status: &Vec<JobStatus>,
) -> Result<()> {
tracing::debug!(age_days = age_days, status = ?status, "cancel jobs with age");

match self {
#[cfg(feature = "bg_pg")]
Self::Postgres(pool, _, _) => pg::clear_jobs_older_than(pool, age, Some(status)).await,
Self::Postgres(pool, _, _) => {
pg::clear_jobs_older_than(pool, age_days, Some(status)).await
}
#[cfg(feature = "bg_sqlt")]
Self::Sqlite(pool, _, _) => sqlt::clear_jobs_older_than(pool, age, Some(status)).await,
Self::Sqlite(pool, _, _) => {
sqlt::clear_jobs_older_than(pool, age_days, Some(status)).await
}
Self::Redis(_, _, _) => {
tracing::error!("clear jobs for redis provider not implemented");
Err(Error::string("clear jobs not supported for redis provider"))
Expand Down Expand Up @@ -446,9 +458,9 @@ impl Queue {
&self,
path: &Path,
status: Option<&Vec<JobStatus>>,
age: Option<i64>,
age_days: Option<i64>,
) -> Result<PathBuf> {
tracing::debug!(path = %path.display(), status = ?status, age = ?age, "dumping jobs");
tracing::debug!(path = %path.display(), status = ?status, age_days = ?age_days, "dumping jobs");

if !path.exists() {
tracing::debug!(path = %path.display(), "folder not exists, creating...");
Expand All @@ -460,7 +472,7 @@ impl Queue {
chrono::Utc::now().format("%Y-%m-%d-%H-%M-%S")
));

let jobs = self.get_jobs(status, age).await?;
let jobs = self.get_jobs(status, age_days).await?;

let data = serde_yaml::to_string(&jobs)?;
let mut file = File::create(&dump_file)?;
Expand Down
10 changes: 5 additions & 5 deletions src/bgworker/pg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -365,14 +365,14 @@ pub async fn clear_by_status(pool: &PgPool, status: Vec<JobStatus>) -> Result<()
/// This function will return an error if it fails
pub async fn clear_jobs_older_than(
pool: &PgPool,
age: i64,
age_days: i64,
status: Option<&Vec<JobStatus>>,
) -> Result<()> {
let mut query_builder = sqlx::query_builder::QueryBuilder::<sqlx::Postgres>::new(
"DELETE FROM pg_loco_queue WHERE created_at < NOW() - INTERVAL '1 day' * ",
);

query_builder.push_bind(age);
query_builder.push_bind(age_days);

if let Some(status_list) = status {
if !status_list.is_empty() {
Expand Down Expand Up @@ -416,7 +416,7 @@ pub async fn ping(pool: &PgPool) -> Result<()> {
pub async fn get_jobs(
pool: &PgPool,
status: Option<&Vec<JobStatus>>,
age: Option<i64>,
age_days: Option<i64>,
) -> Result<Vec<Job>, sqlx::Error> {
let mut query = String::from("SELECT * FROM pg_loco_queue where true");

Expand All @@ -429,9 +429,9 @@ pub async fn get_jobs(
query.push_str(&format!(" AND status in ({status_in})"));
}

if let Some(age) = age {
if let Some(age_days) = age_days {
query.push_str(&format!(
"AND created_at <= NOW() - INTERVAL '1 day' * {age}"
"AND created_at <= NOW() - INTERVAL '1 day' * {age_days}"
));
}

Expand Down
10 changes: 5 additions & 5 deletions src/bgworker/sqlt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -429,10 +429,10 @@ pub async fn clear_by_status(pool: &SqlitePool, status: Vec<JobStatus>) -> Resul
/// This function will return an error if it fails
pub async fn clear_jobs_older_than(
pool: &SqlitePool,
age: i64,
age_days: i64,
status: Option<&Vec<JobStatus>>,
) -> Result<()> {
let cutoff_date = Utc::now() - chrono::Duration::days(age);
let cutoff_date = Utc::now() - chrono::Duration::days(age_days);
let threshold_date = cutoff_date.format("%+").to_string();

let mut query_builder =
Expand Down Expand Up @@ -504,7 +504,7 @@ pub async fn create_provider(qcfg: &SqliteQueueConfig) -> Result<Queue> {
pub async fn get_jobs(
pool: &SqlitePool,
status: Option<&Vec<JobStatus>>,
age: Option<i64>,
age_days: Option<i64>,
) -> Result<Vec<Job>> {
let mut query = String::from("SELECT * FROM sqlt_loco_queue WHERE 1 = 1 ");

Expand All @@ -517,8 +517,8 @@ pub async fn get_jobs(
query.push_str(&format!("AND status IN ({status_in}) "));
}

if let Some(age) = age {
let cutoff_date = Utc::now() - chrono::Duration::days(age);
if let Some(age_days) = age_days {
let cutoff_date = Utc::now() - chrono::Duration::days(age_days);
let threshold_date = cutoff_date.format("%+").to_string();
query.push_str(&format!("AND created_at <= '{threshold_date}' "));
}
Expand Down

0 comments on commit fa4aadc

Please sign in to comment.