Skip to content

Commit

Permalink
Adjust tests to use JobBuidler::new
Browse files Browse the repository at this point in the history
  • Loading branch information
rustworthy committed Dec 20, 2023
1 parent 03202b9 commit 08c3dbb
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 57 deletions.
13 changes: 5 additions & 8 deletions src/proto/single/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -411,11 +411,11 @@ mod test {
fn test_expiration_feature_for_enterprise_faktory() {
let five_min = chrono::Duration::seconds(300);
let exp_at = Utc::now() + five_min;
let job1 = half_stuff().expires_at(exp_at).build().unwrap();
let job1 = half_stuff().expires_at(exp_at).build();
let stored = job1.custom.get("expires_at").unwrap();
assert_eq!(stored, &serde_json::Value::from(to_iso_string(exp_at)));

let job2 = half_stuff().expires_in(five_min).build().unwrap();
let job2 = half_stuff().expires_in(five_min).build();
assert!(job2.custom.get("expires_at").is_some());
}

Expand All @@ -425,8 +425,7 @@ mod test {
let job = half_stuff()
.unique_for(60)
.unique_until_start()
.build()
.unwrap();
.build();
let stored_unique_for = job.custom.get("unique_for").unwrap();
let stored_unique_until = job.custom.get("unique_until").unwrap();
assert_eq!(stored_unique_for, &serde_json::Value::from(60));
Expand All @@ -435,8 +434,7 @@ mod test {
let job = half_stuff()
.unique_for(60)
.unique_until_success()
.build()
.unwrap();
.build();

let stored_unique_until = job.custom.get("unique_until").unwrap();
assert_eq!(stored_unique_until, &serde_json::Value::from("success"));
Expand All @@ -453,8 +451,7 @@ mod test {
.unique_for(40)
.add_to_custom_data("expires_at".into(), to_iso_string(expires_at1))
.expires_at(expires_at2)
.build()
.unwrap();
.build();
let stored_unique_for = job.custom.get("unique_for").unwrap();
assert_eq!(stored_unique_for, &serde_json::Value::from(40));
let stored_expires_at = job.custom.get("expires_at").unwrap();
Expand Down
73 changes: 24 additions & 49 deletions tests/real.rs
Original file line number Diff line number Diff line change
Expand Up @@ -255,12 +255,10 @@ fn ent_expiring_job() {
let job_ttl_secs: u64 = 3;

let ttl = chrono::Duration::seconds(job_ttl_secs as i64);
let job1 = JobBuilder::default()
.kind("AnExpiringJob")
let job1 = JobBuilder::new("AnExpiringJob")
.args(vec!["ISBN-13:9781718501850"])
.expires_at(chrono::Utc::now() + ttl)
.build()
.unwrap();
.build();

// enqueue and fetch immediately job1:
producer.enqueue(job1).unwrap();
Expand All @@ -272,12 +270,10 @@ fn ent_expiring_job() {
assert!(!had_job);

// prepare another one:
let job2 = JobBuilder::default()
.kind("AnExpiringJob")
let job2 = JobBuilder::new("AnExpiringJob")
.args(vec!["ISBN-13:9781718501850"])
.expires_at(chrono::Utc::now() + ttl)
.build()
.unwrap();
.build();

// enquere and then fetch job2, but after ttl:
producer.enqueue(job2).unwrap();
Expand Down Expand Up @@ -314,19 +310,15 @@ fn ent_unique_job() {
// are not setting 'unique_for' when creating those jobs:
let queue_name = "ent_unique_job";
let args = vec![Value::from("ISBN-13:9781718501850"), Value::from(100)];
let job1 = JobBuilder::default()
let job1 = JobBuilder::new(job_type)
.args(args.clone())
.kind(job_type)
.queue(queue_name)
.build()
.unwrap();
.build();
producer.enqueue(job1).unwrap();
let job2 = JobBuilder::default()
let job2 = JobBuilder::new(job_type)
.args(args.clone())
.kind(job_type)
.queue(queue_name)
.build()
.unwrap();
.build();
producer.enqueue(job2).unwrap();

let had_job = consumer.run_one(0, &[queue_name]).unwrap();
Expand All @@ -340,22 +332,18 @@ fn ent_unique_job() {
// the same args and kind (jobtype in Faktory terms) and pushed
// to the same queue:
let unique_for_secs = 3;
let job1 = JobBuilder::default()
let job1 = Job::builder(job_type)
.args(args.clone())
.kind(job_type)
.queue(queue_name)
.unique_for(unique_for_secs)
.build()
.unwrap();
.build();
producer.enqueue(job1).unwrap();
// this one is a 'duplicate' ...
let job2 = JobBuilder::default()
let job2 = Job::builder(job_type)
.args(args.clone())
.kind(job_type)
.queue(queue_name)
.unique_for(unique_for_secs)
.build()
.unwrap();
.build();
// ... so the server will respond accordingly:
let res = producer.enqueue(job2).unwrap_err();
if let error::Error::Protocol(error::Protocol::Internal { msg }) = res {
Expand All @@ -372,22 +360,18 @@ fn ent_unique_job() {
assert!(!had_another_one);

// Now let's repeat the latter case, but providing different args to job2:
let job1 = JobBuilder::default()
let job1 = JobBuilder::new(job_type)
.args(args.clone())
.kind(job_type)
.queue(queue_name)
.unique_for(unique_for_secs)
.build()
.unwrap();
.build();
producer.enqueue(job1).unwrap();
// this one is *NOT* a 'duplicate' ...
let job2 = JobBuilder::default()
let job2 = JobBuilder::new(job_type)
.args(vec![Value::from("ISBN-13:9781718501850"), Value::from(101)])
.kind(job_type)
.queue(queue_name)
.unique_for(unique_for_secs)
.build()
.unwrap();
.build();
// ... so the server will accept it:
producer.enqueue(job2).unwrap();

Expand Down Expand Up @@ -441,14 +425,12 @@ fn ent_unique_job_until_success() {
Ok(eprintln!("{:?}", job))
});
let mut consumer_a = consumer_a.connect(Some(&url1)).unwrap();
let job = JobBuilder::default()
let job = JobBuilder::new(job_type)
.args(vec![difficulty_level])
.kind(job_type)
.queue(queue_name)
.unique_for(unique_for)
.unique_until_success() // Faktory's default
.build()
.unwrap();
.build();
producer_a.enqueue(job).unwrap();
let had_job = consumer_a.run_one(0, &[queue_name]).unwrap();
assert!(had_job);
Expand All @@ -462,13 +444,12 @@ fn ent_unique_job_until_success() {

// this one is a 'duplicate' because the job is still
// being executed in the spawned thread:
let job = JobBuilder::default()
let job = JobBuilder::new(job_type)
.args(vec![difficulty_level])
.kind(job_type)
.queue(queue_name)
.unique_for(unique_for)
.build()
.unwrap();
.build();

// as a result:
let res = producer_b.enqueue(job).unwrap_err();
if let error::Error::Protocol(error::Protocol::Internal { msg }) = res {
Expand All @@ -483,13 +464,11 @@ fn ent_unique_job_until_success() {
// (with ACK sent to server), the producer 'B' can push another one:
assert!(producer_b
.enqueue(
JobBuilder::default()
JobBuilder::new(job_type)
.args(vec![difficulty_level])
.kind(job_type)
.queue(queue_name)
.unique_for(unique_for)
.build()
.unwrap()
)
.is_ok());
}
Expand Down Expand Up @@ -528,14 +507,12 @@ fn ent_unique_job_until_start() {
let mut consumer_a = consumer_a.connect(Some(&url1)).unwrap();
producer_a
.enqueue(
JobBuilder::default()
JobBuilder::new(job_type)
.args(vec![difficulty_level])
.kind(job_type)
.queue(queue_name)
.unique_for(unique_for)
.unique_until_start() // NB!
.build()
.unwrap(),
)
.unwrap();
// as soon as the job is fetched, the unique lock gets released
Expand All @@ -550,13 +527,11 @@ fn ent_unique_job_until_start() {
let mut producer_b = Producer::connect(Some(&url)).unwrap();
assert!(producer_b
.enqueue(
JobBuilder::default()
JobBuilder::new(job_type)
.args(vec![difficulty_level])
.kind(job_type)
.queue(queue_name)
.unique_for(unique_for)
.build()
.unwrap()
)
.is_ok());

Expand Down

0 comments on commit 08c3dbb

Please sign in to comment.