Skip to content

Commit

Permalink
Add test for 'unique_until_start' option
Browse files Browse the repository at this point in the history
  • Loading branch information
rustworthy committed Dec 9, 2023
1 parent f437f06 commit fa86dbd
Showing 1 changed file with 67 additions and 0 deletions.
67 changes: 67 additions & 0 deletions tests/real.rs
Original file line number Diff line number Diff line change
Expand Up @@ -440,3 +440,70 @@ fn ent_unique_job_until_success() {
)
.is_ok());
}

#[cfg(feature = "ent")]
#[test]
fn ent_unique_job_until_start() {
use std::thread;
use std::time;

skip_if_not_enterprise!();

let url = learn_faktory_url();

let queue_name = "ent_unique_job_until_start";
let job_type = "order";
let difficulty_level = 3;
let unique_for = 4;

let url1 = url.clone();
let handle = thread::spawn(move || {

Check warning on line 460 in tests/real.rs

View workflow job for this annotation

GitHub Actions / ubuntu / stable / features

unused variable: `handle`

Check warning on line 460 in tests/real.rs

View workflow job for this annotation

GitHub Actions / ubuntu / stable / features

unused variable: `handle`

Check warning on line 460 in tests/real.rs

View workflow job for this annotation

GitHub Actions / ubuntu / stable / features

unused variable: `handle`

Check warning on line 460 in tests/real.rs

View workflow job for this annotation

GitHub Actions / ubuntu / stable / features

unused variable: `handle`

Check warning on line 460 in tests/real.rs

View workflow job for this annotation

GitHub Actions / ubuntu / stable / features

unused variable: `handle`

Check warning on line 460 in tests/real.rs

View workflow job for this annotation

GitHub Actions / ubuntu / stable

unused variable: `handle`

Check warning on line 460 in tests/real.rs

View workflow job for this annotation

GitHub Actions / macos-latest / stable

unused variable: `handle`

Check warning on line 460 in tests/real.rs

View workflow job for this annotation

GitHub Actions / windows-latest / stable

unused variable: `handle`
let mut producer_a = Producer::connect(Some(&url1)).unwrap();
let mut consumer_a = ConsumerBuilder::default();
consumer_a.register(job_type, |job| -> io::Result<_> {
let args = job.args().to_owned();
let mut args = args.iter();
let diffuculty_level = args
.next()
.expect("job difficulty level is there")
.to_owned();
let sleep_secs =
serde_json::from_value::<i64>(diffuculty_level).expect("a valid number");
thread::sleep(time::Duration::from_secs(sleep_secs as u64));
Ok(eprintln!("{:?}", job))
});
let mut consumer_a = consumer_a.connect(Some(&url1)).unwrap();
producer_a
.enqueue(
JobBuilder::default()
.args(vec![difficulty_level])
.kind(job_type)
.queue(queue_name)
.unique_for(unique_for)
.unique_until_start() // NB!
.build()
.unwrap(),
)
.unwrap();
// as soon as the job is fetched, the unique lock gets released
let had_job = consumer_a.run_one(0, &[queue_name]).unwrap();
assert!(had_job);
});

// let spawned thread gain momentum:
thread::sleep(time::Duration::from_secs(1));

// the unique lock has been released by this time, so the job is enqueued successfully:
let mut producer_b = Producer::connect(Some(&url)).unwrap();
assert!(producer_b
.enqueue(
JobBuilder::default()
.args(vec![difficulty_level])
.kind(job_type)
.queue(queue_name)
.unique_for(unique_for)
.build()
.unwrap()
)
.is_ok());
}

0 comments on commit fa86dbd

Please sign in to comment.