Skip to content

Commit

Permalink
Port rest of 'Ent' e2e tests
Browse files Browse the repository at this point in the history
  • Loading branch information
rustworthy committed Jan 27, 2024
1 parent 0694f30 commit a12e38a
Showing 1 changed file with 209 additions and 1 deletion.
210 changes: 209 additions & 1 deletion tests/real/async/enterprise.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ extern crate faktory;

use std::io;

use faktory::{AsyncConsumerBuilder, AsyncProducer, Job, JobBuilder};
use faktory::{AsyncConsumerBuilder, AsyncProducer, ConsumerBuilder, Job, JobBuilder};
use tokio::time;

use crate::skip_if_not_enterprise;
Expand Down Expand Up @@ -169,3 +169,211 @@ async fn async_ent_unique_job() {
let had_job = consumer.run_one(0, &[queue_name]).await.unwrap();
assert!(!had_job);
}

#[tokio::test(flavor = "multi_thread")]
async fn async_ent_unique_job_until_success() {
use faktory::error;
use std::io;
use tokio::time;

skip_if_not_enterprise!();

let url = learn_faktory_url();

let queue_name = "ent_unique_job_until_success";
let job_type = "order";

// the job will be being executed for at least 3 seconds,
// but is unique for 4 seconds;
let difficulty_level = 3;
let unique_for = 4;

let url1 = url.clone();
let handle = tokio::spawn(async move {
// prepare producer and consumer, where the former can
// send a job difficulty level as a job's args and the lattter
// will sleep for a corresponding period of time, pretending
// to work hard:
let mut producer_a = AsyncProducer::connect(Some(&url1)).await.unwrap();
let mut consumer_a = ConsumerBuilder::default_async();
consumer_a.register(job_type, |job| {
Box::pin(async move {
let args = job.args().to_owned();
let mut args = args.iter();
let diffuculty_level = args
.next()
.expect("job difficulty level is there")
.to_owned();
let sleep_secs =
serde_json::from_value::<i64>(diffuculty_level).expect("a valid number");
time::sleep(time::Duration::from_secs(sleep_secs as u64)).await;
eprintln!("{:?}", job);
Ok::<(), io::Error>(())
})
});
let mut consumer_a = consumer_a.connect(Some(&url1)).await.unwrap();
let job = JobBuilder::new(job_type)
.args(vec![difficulty_level])
.queue(queue_name)
.unique_for(unique_for)
.unique_until_success() // Faktory's default
.build();
producer_a.enqueue(job).await.unwrap();
let had_job = consumer_a.run_one(0, &[queue_name]).await.unwrap();
assert!(had_job);
});

// let spawned thread gain momentum:
time::sleep(time::Duration::from_secs(1)).await;

// continue
let mut producer_b = AsyncProducer::connect(Some(&url)).await.unwrap();

// this one is a 'duplicate' because the job is still
// being executed in the spawned thread:
let job = JobBuilder::new(job_type)
.args(vec![difficulty_level])
.queue(queue_name)
.unique_for(unique_for)
.build();

// as a result:
let res = producer_b.enqueue(job).await.unwrap_err();
if let error::Error::Protocol(error::Protocol::UniqueConstraintViolation { msg }) = res {
assert_eq!(msg, "Job not unique");
} else {
panic!("Expected protocol error.")
}

handle.await.expect("should join successfully");

// Now that the job submitted in a spawned thread has been successfully executed
// (with ACK sent to server), the producer 'B' can push another one:
producer_b
.enqueue(
JobBuilder::new(job_type)
.args(vec![difficulty_level])
.queue(queue_name)
.unique_for(unique_for)
.build(),
)
.await
.unwrap();
}

#[tokio::test(flavor = "multi_thread")]
async fn async_ent_unique_job_until_start() {
use tokio::time;

skip_if_not_enterprise!();

let url = learn_faktory_url();

let queue_name = "ent_unique_job_until_start";
let job_type = "order";
let difficulty_level = 3;
let unique_for = 4;

let url1 = url.clone();
let handle = tokio::spawn(async move {
let mut producer_a = AsyncProducer::connect(Some(&url1)).await.unwrap();
let mut consumer_a = ConsumerBuilder::default_async();
consumer_a.register(job_type, |job| {
Box::pin(async move {
let args = job.args().to_owned();
let mut args = args.iter();
let diffuculty_level = args
.next()
.expect("job difficulty level is there")
.to_owned();
let sleep_secs =
serde_json::from_value::<i64>(diffuculty_level).expect("a valid number");
time::sleep(time::Duration::from_secs(sleep_secs as u64)).await;
eprintln!("{:?}", job);
Ok::<(), io::Error>(())
})
});
let mut consumer_a = consumer_a.connect(Some(&url1)).await.unwrap();
producer_a
.enqueue(
JobBuilder::new(job_type)
.args(vec![difficulty_level])
.queue(queue_name)
.unique_for(unique_for)
.unique_until_start() // NB!
.build(),
)
.await
.unwrap();
// as soon as the job is fetched, the unique lock gets released
let had_job = consumer_a.run_one(0, &[queue_name]).await.unwrap();
assert!(had_job);
});

// let spawned thread gain momentum:
time::sleep(time::Duration::from_secs(1)).await;

// the unique lock has been released by this time, so the job is enqueued successfully:
let mut producer_b = AsyncProducer::connect(Some(&url)).await.unwrap();
producer_b
.enqueue(
JobBuilder::new(job_type)
.args(vec![difficulty_level])
.queue(queue_name)
.unique_for(unique_for)
.build(),
)
.await
.unwrap();

handle.await.expect("should join successfully");
}

#[tokio::test(flavor = "multi_thread")]
async fn async_ent_unique_job_bypass_unique_lock() {
use faktory::error;

skip_if_not_enterprise!();

let url = learn_faktory_url();
let mut producer = AsyncProducer::connect(Some(&url)).await.unwrap();
let queue_name = "ent_unique_job_bypass_unique_lock";
let job1 = Job::builder("order")
.queue(queue_name)
.unique_for(60)
.build();

// Now the following job is _technically_ a 'duplicate', BUT if the `unique_for` value is not set,
// the uniqueness lock will be bypassed on the server. This special case is mentioned in the docs:
// https://github.com/contribsys/faktory/wiki/Ent-Unique-Jobs#bypassing-uniqueness
let job2 = Job::builder("order") // same jobtype and args (args are just not set)
.queue(queue_name) // same queue
.build(); // NB: `unique_for` not set

producer.enqueue(job1).await.unwrap();
producer.enqueue(job2).await.unwrap(); // bypassing the lock!

// This _is_ a 'duplicate'.
let job3 = Job::builder("order")
.queue(queue_name)
.unique_for(60) // NB
.build();

let res = producer.enqueue(job3).await.unwrap_err(); // NOT bypassing the lock!

if let error::Error::Protocol(error::Protocol::UniqueConstraintViolation { msg }) = res {
assert_eq!(msg, "Job not unique");
} else {
panic!("Expected protocol error.")
}

// let's consume three times from the queue to verify that the first two jobs
// have been enqueued for real, while the last one has not.
let mut c = ConsumerBuilder::default_async();
c.register("order", |j| Box::pin(print_job(j)));
let mut c = c.connect(Some(&url)).await.unwrap();

assert!(c.run_one(0, &[queue_name]).await.unwrap());
assert!(c.run_one(0, &[queue_name]).await.unwrap());
assert!(!c.run_one(0, &[queue_name]).await.unwrap()); // empty;
}

0 comments on commit a12e38a

Please sign in to comment.