From 1257b56dbb2ca84e4c177cab1ed2f8685d94b240 Mon Sep 17 00:00:00 2001 From: Pavel Mikhalkevich Date: Wed, 17 Jan 2024 12:43:04 +0500 Subject: [PATCH] Port Ent Faktory test: async expiring job --- src/async/proto/single/cmd.rs | 2 +- tests/real/async/community.rs | 4 +- tests/real/async/enterprise.rs | 67 +++++++++++++++++++++++++++++----- 3 files changed, 60 insertions(+), 13 deletions(-) diff --git a/src/async/proto/single/cmd.rs b/src/async/proto/single/cmd.rs index 647d369f..6d07848e 100644 --- a/src/async/proto/single/cmd.rs +++ b/src/async/proto/single/cmd.rs @@ -1,7 +1,7 @@ use tokio::io::AsyncWriteExt; use crate::{ - proto::{Ack, Fail, Fetch, Heartbeat, Hello, Push, QueueAction, QueueControl, Info}, + proto::{Ack, Fail, Fetch, Heartbeat, Hello, Info, Push, QueueAction, QueueControl}, Error, }; diff --git a/tests/real/async/community.rs b/tests/real/async/community.rs index aa300388..d5c5dcb6 100644 --- a/tests/real/async/community.rs +++ b/tests/real/async/community.rs @@ -1,7 +1,7 @@ //! This modle contains a port of the test suite from //! the `tests/real/community.rs` module. -//! -//! Diff: +//! +//! Main diff: //! - tests are marked this `async_` prefix; //! - AsyncConsumerBuilder used instead of ConsumerBuilder; //! - AsyncProducer used instead of Producer; diff --git a/tests/real/async/enterprise.rs b/tests/real/async/enterprise.rs index 05959730..9b908962 100644 --- a/tests/real/async/enterprise.rs +++ b/tests/real/async/enterprise.rs @@ -1,20 +1,67 @@ +//! This modle contains a port of the test suite from +//! the `tests/real/enterprise.rs` module. +//! +//! Main diff: +//! - tests are marked this `async_` prefix; +//! - AsyncConsumerBuilder used instead of ConsumerBuilder; +//! - AsyncProducer used instead of Producer; +//! - await used where needed; + extern crate faktory; -use faktory::{AsyncProducer, JobBuilder}; +use std::io; + +use faktory::{AsyncConsumerBuilder, AsyncProducer, Job, JobBuilder}; +use tokio::time; use crate::skip_if_not_enterprise; use crate::utils::learn_faktory_url; +async fn print_job(j: Job) -> io::Result<()> { + Ok(eprintln!("{:?}", j)) +} + #[tokio::test] -async fn async_enqueue_expiring_job() { +async fn async_ent_expiring_job() { skip_if_not_enterprise!(); let url = learn_faktory_url(); - let mut p = AsyncProducer::connect(Some(&url)).await.unwrap(); - p.enqueue( - JobBuilder::new("order") - .expires_at(chrono::Utc::now() + chrono::Duration::seconds(3)) - .build(), - ) - .await - .unwrap(); + + // prepare a producer ("client" in Faktory terms) and consumer ("worker"): + let mut producer = AsyncProducer::connect(Some(&url)).await.unwrap(); + let mut consumer = AsyncConsumerBuilder::default(); + consumer.register("AnExpiringJob", |j| Box::pin(print_job(j))); + let mut consumer = consumer.connect(Some(&url)).await.unwrap(); + + // prepare an expiring job: + let job_ttl_secs: u64 = 3; + + let ttl = chrono::Duration::seconds(job_ttl_secs as i64); + let job1 = JobBuilder::new("AnExpiringJob") + .args(vec!["ISBN-13:9781718501850"]) + .expires_at(chrono::Utc::now() + ttl) + .build(); + + // enqueue and fetch immediately job1: + producer.enqueue(job1).await.unwrap(); + let had_job = consumer.run_one(0, &["default"]).await.unwrap(); + assert!(had_job); + + // check that the queue is drained: + let had_job = consumer.run_one(0, &["default"]).await.unwrap(); + assert!(!had_job); + + // prepare another one: + let job2 = JobBuilder::new("AnExpiringJob") + .args(vec!["ISBN-13:9781718501850"]) + .expires_at(chrono::Utc::now() + ttl) + .build(); + + // enqueue and then fetch job2, but after ttl: + producer.enqueue(job2).await.unwrap(); + tokio::time::sleep(time::Duration::from_secs(job_ttl_secs * 2)).await; + let had_job = consumer.run_one(0, &["default"]).await.unwrap(); + + // For the non-enterprise edition of Faktory, this assertion will + // fail, which should be taken into account when running the test suite on CI. + assert!(!had_job); }