diff --git a/tests/real/async/enterprise.rs b/tests/real/async/enterprise.rs index 85f7f019..123b22b1 100644 --- a/tests/real/async/enterprise.rs +++ b/tests/real/async/enterprise.rs @@ -12,7 +12,7 @@ extern crate faktory; use std::io; -use faktory::{AsyncConsumerBuilder, AsyncProducer, Job, JobBuilder}; +use faktory::{AsyncConsumerBuilder, AsyncProducer, ConsumerBuilder, Job, JobBuilder}; use tokio::time; use crate::skip_if_not_enterprise; @@ -169,3 +169,211 @@ async fn async_ent_unique_job() { let had_job = consumer.run_one(0, &[queue_name]).await.unwrap(); assert!(!had_job); } + +#[tokio::test(flavor = "multi_thread")] +async fn async_ent_unique_job_until_success() { + use faktory::error; + use std::io; + use tokio::time; + + skip_if_not_enterprise!(); + + let url = learn_faktory_url(); + + let queue_name = "ent_unique_job_until_success"; + let job_type = "order"; + + // the job will be being executed for at least 3 seconds, + // but is unique for 4 seconds; + let difficulty_level = 3; + let unique_for = 4; + + let url1 = url.clone(); + let handle = tokio::spawn(async move { + // prepare producer and consumer, where the former can + // send a job difficulty level as a job's args and the lattter + // will sleep for a corresponding period of time, pretending + // to work hard: + let mut producer_a = AsyncProducer::connect(Some(&url1)).await.unwrap(); + let mut consumer_a = ConsumerBuilder::default_async(); + consumer_a.register(job_type, |job| { + Box::pin(async move { + let args = job.args().to_owned(); + let mut args = args.iter(); + let diffuculty_level = args + .next() + .expect("job difficulty level is there") + .to_owned(); + let sleep_secs = + serde_json::from_value::(diffuculty_level).expect("a valid number"); + time::sleep(time::Duration::from_secs(sleep_secs as u64)).await; + eprintln!("{:?}", job); + Ok::<(), io::Error>(()) + }) + }); + let mut consumer_a = consumer_a.connect(Some(&url1)).await.unwrap(); + let job = JobBuilder::new(job_type) + .args(vec![difficulty_level]) + .queue(queue_name) + .unique_for(unique_for) + .unique_until_success() // Faktory's default + .build(); + producer_a.enqueue(job).await.unwrap(); + let had_job = consumer_a.run_one(0, &[queue_name]).await.unwrap(); + assert!(had_job); + }); + + // let spawned thread gain momentum: + time::sleep(time::Duration::from_secs(1)).await; + + // continue + let mut producer_b = AsyncProducer::connect(Some(&url)).await.unwrap(); + + // this one is a 'duplicate' because the job is still + // being executed in the spawned thread: + let job = JobBuilder::new(job_type) + .args(vec![difficulty_level]) + .queue(queue_name) + .unique_for(unique_for) + .build(); + + // as a result: + let res = producer_b.enqueue(job).await.unwrap_err(); + if let error::Error::Protocol(error::Protocol::UniqueConstraintViolation { msg }) = res { + assert_eq!(msg, "Job not unique"); + } else { + panic!("Expected protocol error.") + } + + handle.await.expect("should join successfully"); + + // Now that the job submitted in a spawned thread has been successfully executed + // (with ACK sent to server), the producer 'B' can push another one: + producer_b + .enqueue( + JobBuilder::new(job_type) + .args(vec![difficulty_level]) + .queue(queue_name) + .unique_for(unique_for) + .build(), + ) + .await + .unwrap(); +} + +#[tokio::test(flavor = "multi_thread")] +async fn async_ent_unique_job_until_start() { + use tokio::time; + + skip_if_not_enterprise!(); + + let url = learn_faktory_url(); + + let queue_name = "ent_unique_job_until_start"; + let job_type = "order"; + let difficulty_level = 3; + let unique_for = 4; + + let url1 = url.clone(); + let handle = tokio::spawn(async move { + let mut producer_a = AsyncProducer::connect(Some(&url1)).await.unwrap(); + let mut consumer_a = ConsumerBuilder::default_async(); + consumer_a.register(job_type, |job| { + Box::pin(async move { + let args = job.args().to_owned(); + let mut args = args.iter(); + let diffuculty_level = args + .next() + .expect("job difficulty level is there") + .to_owned(); + let sleep_secs = + serde_json::from_value::(diffuculty_level).expect("a valid number"); + time::sleep(time::Duration::from_secs(sleep_secs as u64)).await; + eprintln!("{:?}", job); + Ok::<(), io::Error>(()) + }) + }); + let mut consumer_a = consumer_a.connect(Some(&url1)).await.unwrap(); + producer_a + .enqueue( + JobBuilder::new(job_type) + .args(vec![difficulty_level]) + .queue(queue_name) + .unique_for(unique_for) + .unique_until_start() // NB! + .build(), + ) + .await + .unwrap(); + // as soon as the job is fetched, the unique lock gets released + let had_job = consumer_a.run_one(0, &[queue_name]).await.unwrap(); + assert!(had_job); + }); + + // let spawned thread gain momentum: + time::sleep(time::Duration::from_secs(1)).await; + + // the unique lock has been released by this time, so the job is enqueued successfully: + let mut producer_b = AsyncProducer::connect(Some(&url)).await.unwrap(); + producer_b + .enqueue( + JobBuilder::new(job_type) + .args(vec![difficulty_level]) + .queue(queue_name) + .unique_for(unique_for) + .build(), + ) + .await + .unwrap(); + + handle.await.expect("should join successfully"); +} + +#[tokio::test(flavor = "multi_thread")] +async fn async_ent_unique_job_bypass_unique_lock() { + use faktory::error; + + skip_if_not_enterprise!(); + + let url = learn_faktory_url(); + let mut producer = AsyncProducer::connect(Some(&url)).await.unwrap(); + let queue_name = "ent_unique_job_bypass_unique_lock"; + let job1 = Job::builder("order") + .queue(queue_name) + .unique_for(60) + .build(); + + // Now the following job is _technically_ a 'duplicate', BUT if the `unique_for` value is not set, + // the uniqueness lock will be bypassed on the server. This special case is mentioned in the docs: + // https://github.com/contribsys/faktory/wiki/Ent-Unique-Jobs#bypassing-uniqueness + let job2 = Job::builder("order") // same jobtype and args (args are just not set) + .queue(queue_name) // same queue + .build(); // NB: `unique_for` not set + + producer.enqueue(job1).await.unwrap(); + producer.enqueue(job2).await.unwrap(); // bypassing the lock! + + // This _is_ a 'duplicate'. + let job3 = Job::builder("order") + .queue(queue_name) + .unique_for(60) // NB + .build(); + + let res = producer.enqueue(job3).await.unwrap_err(); // NOT bypassing the lock! + + if let error::Error::Protocol(error::Protocol::UniqueConstraintViolation { msg }) = res { + assert_eq!(msg, "Job not unique"); + } else { + panic!("Expected protocol error.") + } + + // let's consume three times from the queue to verify that the first two jobs + // have been enqueued for real, while the last one has not. + let mut c = ConsumerBuilder::default_async(); + c.register("order", |j| Box::pin(print_job(j))); + let mut c = c.connect(Some(&url)).await.unwrap(); + + assert!(c.run_one(0, &[queue_name]).await.unwrap()); + assert!(c.run_one(0, &[queue_name]).await.unwrap()); + assert!(!c.run_one(0, &[queue_name]).await.unwrap()); // empty; +}