Skip to content

Commit

Permalink
Port Ent Faktory test: async expiring job
Browse files Browse the repository at this point in the history
  • Loading branch information
rustworthy committed Jan 17, 2024
1 parent c7f8c0b commit 1257b56
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 13 deletions.
2 changes: 1 addition & 1 deletion src/async/proto/single/cmd.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use tokio::io::AsyncWriteExt;

use crate::{
proto::{Ack, Fail, Fetch, Heartbeat, Hello, Push, QueueAction, QueueControl, Info},
proto::{Ack, Fail, Fetch, Heartbeat, Hello, Info, Push, QueueAction, QueueControl},
Error,
};

Expand Down
4 changes: 2 additions & 2 deletions tests/real/async/community.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//! This modle contains a port of the test suite from
//! the `tests/real/community.rs` module.
//!
//! Diff:
//!
//! Main diff:
//! - tests are marked this `async_` prefix;
//! - AsyncConsumerBuilder used instead of ConsumerBuilder;
//! - AsyncProducer used instead of Producer;
Expand Down
67 changes: 57 additions & 10 deletions tests/real/async/enterprise.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,67 @@
//! This modle contains a port of the test suite from
//! the `tests/real/enterprise.rs` module.
//!
//! Main diff:
//! - tests are marked this `async_` prefix;
//! - AsyncConsumerBuilder used instead of ConsumerBuilder;
//! - AsyncProducer used instead of Producer;
//! - await used where needed;
extern crate faktory;

use faktory::{AsyncProducer, JobBuilder};
use std::io;

use faktory::{AsyncConsumerBuilder, AsyncProducer, Job, JobBuilder};
use tokio::time;

use crate::skip_if_not_enterprise;
use crate::utils::learn_faktory_url;

async fn print_job(j: Job) -> io::Result<()> {
Ok(eprintln!("{:?}", j))
}

#[tokio::test]
async fn async_enqueue_expiring_job() {
async fn async_ent_expiring_job() {
skip_if_not_enterprise!();
let url = learn_faktory_url();
let mut p = AsyncProducer::connect(Some(&url)).await.unwrap();
p.enqueue(
JobBuilder::new("order")
.expires_at(chrono::Utc::now() + chrono::Duration::seconds(3))
.build(),
)
.await
.unwrap();

// prepare a producer ("client" in Faktory terms) and consumer ("worker"):
let mut producer = AsyncProducer::connect(Some(&url)).await.unwrap();
let mut consumer = AsyncConsumerBuilder::default();
consumer.register("AnExpiringJob", |j| Box::pin(print_job(j)));
let mut consumer = consumer.connect(Some(&url)).await.unwrap();

// prepare an expiring job:
let job_ttl_secs: u64 = 3;

let ttl = chrono::Duration::seconds(job_ttl_secs as i64);
let job1 = JobBuilder::new("AnExpiringJob")
.args(vec!["ISBN-13:9781718501850"])
.expires_at(chrono::Utc::now() + ttl)
.build();

// enqueue and fetch immediately job1:
producer.enqueue(job1).await.unwrap();
let had_job = consumer.run_one(0, &["default"]).await.unwrap();
assert!(had_job);

// check that the queue is drained:
let had_job = consumer.run_one(0, &["default"]).await.unwrap();
assert!(!had_job);

// prepare another one:
let job2 = JobBuilder::new("AnExpiringJob")
.args(vec!["ISBN-13:9781718501850"])
.expires_at(chrono::Utc::now() + ttl)
.build();

// enqueue and then fetch job2, but after ttl:
producer.enqueue(job2).await.unwrap();
tokio::time::sleep(time::Duration::from_secs(job_ttl_secs * 2)).await;
let had_job = consumer.run_one(0, &["default"]).await.unwrap();

// For the non-enterprise edition of Faktory, this assertion will
// fail, which should be taken into account when running the test suite on CI.
assert!(!had_job);
}

0 comments on commit 1257b56

Please sign in to comment.