diff --git a/.github/workflows/ent.yaml b/.github/workflows/ent.yaml new file mode 100644 index 00000000..bebd6e11 --- /dev/null +++ b/.github/workflows/ent.yaml @@ -0,0 +1,39 @@ +# This is a CI workflow that runs the test against Enterprise Edition of Faktory. +# The binary (for macos only) is avalable for download for testing purposes with each Faktory release. +permissions: + contents: read +on: + push: + branches: + - main + pull_request: +concurrency: + group: ${{ github.workflow }}-${{ github.head_ref || github.run_id }} + cancel-in-progress: true +name: enterprise +jobs: + test: + runs-on: macos-latest + env: + FAKTORY_VERSION: 1.8.0 + steps: + - uses: actions/checkout@v4 + - name: Install redis + run: brew install redis + - name: Download Faktory binary + run: | + wget -O faktory.tbz https://github.com/contribsys/faktory/releases/download/v${{ env.FAKTORY_VERSION }}/faktory-ent_${{ env.FAKTORY_VERSION }}.macos.amd64.tbz + tar xfv faktory.tbz + cp ./faktory /usr/local/bin + - name: Launch Faktory in background + run: faktory & + - name: Install stable + uses: dtolnay/rust-toolchain@stable + - name: cargo generate-lockfile + if: hashFiles('Cargo.lock') == '' + run: cargo generate-lockfile + - name: Run tests + env: + FAKTORY_URL: tcp://127.0.0.1:7419 + FAKTORY_ENT: true + run: cargo test --locked --features ent --all-targets diff --git a/Cargo.toml b/Cargo.toml index c0eebd9e..7dc85b6e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,6 +15,7 @@ categories = ["api-bindings", "asynchronous", "network-programming"] default = [] tls = ["native-tls"] binaries = ["clap"] +ent = [] [dependencies] serde_json = "1.0" diff --git a/src/error.rs b/src/error.rs index 6c3d4dd9..38e14ec8 100644 --- a/src/error.rs +++ b/src/error.rs @@ -93,6 +93,14 @@ pub enum Protocol { desc: String, }, + /// The server reported a unique constraint violation. 
+ #[cfg(feature = "ent")] + #[error("server reported unique constraint violation: {msg}")] + UniqueConstraintViolation { + /// The error message given by the server. + msg: String, + }, + /// The server responded with an error. #[error("an internal server error occurred: {msg}")] Internal { @@ -139,6 +147,8 @@ impl Protocol { match code { Some("ERR") => Protocol::Internal { msg: error }, Some("MALFORMED") => Protocol::Malformed { desc: error }, + #[cfg(feature = "ent")] + Some("NOTUNIQUE") => Protocol::UniqueConstraintViolation { msg: error }, Some(c) => Protocol::Internal { msg: format!("{} {}", c, error), }, diff --git a/src/proto/single/ent.rs b/src/proto/single/ent.rs new file mode 100644 index 00000000..1a866788 --- /dev/null +++ b/src/proto/single/ent.rs @@ -0,0 +1,141 @@ +use chrono::{DateTime, Utc}; + +use crate::JobBuilder; + +impl JobBuilder { + /// When Faktory should expire this job. + /// + /// Faktory Enterprise allows for expiring jobs. This is a setter for `expires_at` + /// field in the job's custom data. + /// ``` + /// # use faktory::JobBuilder; + /// # use chrono::{Duration, Utc}; + /// let _job = JobBuilder::new("order") + /// .args(vec!["ISBN-13:9781718501850"]) + /// .expires_at(Utc::now() + Duration::hours(1)) + /// .build(); + /// ``` + pub fn expires_at(&mut self, dt: DateTime) -> &mut Self { + self.add_to_custom_data( + "expires_at", + dt.to_rfc3339_opts(chrono::SecondsFormat::Nanos, true), + ) + } + + /// In what period of time from now (UTC) the Faktory should expire this job. + /// + /// Under the hood, the method will call `Utc::now` and add the provided `ttl` duration. + /// You can use this setter when you have a duration rather than some exact date and time, + /// expected by [`expires_at`](struct.JobBuilder.html#method.expires_at) setter. 
+ /// Example usage: + /// ``` + /// # use faktory::JobBuilder; + /// # use chrono::Duration; + /// let _job = JobBuilder::new("order") + /// .args(vec!["ISBN-13:9781718501850"]) + /// .expires_in(Duration::weeks(1)) + /// .build(); + /// ``` + pub fn expires_in(&mut self, ttl: chrono::Duration) -> &mut Self { + self.expires_at(Utc::now() + ttl) + } + + /// How long the Faktory will not accept duplicates of this job. + /// + /// The job will be considered unique for the kind-args-queue combination. The uniqueness is best-effort, + /// rather than a guarantee. Check out the Enterprise Faktory [docs](https://github.com/contribsys/faktory/wiki/Ent-Unique-Jobs) + /// for details on how scheduling, retries, and other features live together with `unique_for`. + /// + /// If you've already created and pushed a unique job (job "A") to the Faktory server and now have got another one + /// of same kind, with the same args and destined for the same queue (job "B") and you would like - for some reason - to + /// bypass the unique constraint, simply leave `unique_for` field on the job's custom hash empty, i.e. do not use this setter. + /// In this case, the Faktory server will accept job "B", though technically this job "B" is a duplicate. + pub fn unique_for(&mut self, secs: usize) -> &mut Self { + self.add_to_custom_data("unique_for", secs) + } + + /// Remove unique lock for this job right before the job starts executing. + /// + /// Another job with the same kind-args-queue combination will be accepted by the Faktory server + /// after the period specified in [`unique_for`](struct.JobBuilder.html#method.unique_for) has finished + /// _or_ after this job has been consumed (i.e. its execution has ***started***). + pub fn unique_until_start(&mut self) -> &mut Self { + self.add_to_custom_data("unique_until", "start") + } + + /// Do not remove unique lock for this job until it successfully finishes. 
+ /// + /// Sets `unique_until` on the Job's custom hash to `success`, which is Faktory's default. + /// Another job with the same kind-args-queue combination will be accepted by the Faktory server + /// after the period specified in [`unique_for`](struct.JobBuilder.html#method.unique_for) has finished + /// _or_ after this job has been ***successfully*** processed. + pub fn unique_until_success(&mut self) -> &mut Self { + self.add_to_custom_data("unique_until", "success") + } +} + +#[cfg(test)] +mod test { + use chrono::{DateTime, Utc}; + + use crate::JobBuilder; + + fn half_stuff() -> JobBuilder { + let mut job = JobBuilder::new("order"); + job.args(vec!["ISBN-13:9781718501850"]); + job + } + + // Returns date and time string in the format expected by Faktory. + // Serializes date and time into a string as per RFC 3339 and ISO 8601 + // with nanoseconds precision and 'Z' literal for the timezone column. + fn to_iso_string(dt: DateTime) -> String { + dt.to_rfc3339_opts(chrono::SecondsFormat::Nanos, true) + } + + #[test] + fn test_expiration_feature_for_enterprise_faktory() { + let five_min = chrono::Duration::seconds(300); + let exp_at = Utc::now() + five_min; + let job1 = half_stuff().expires_at(exp_at).build(); + let stored = job1.custom.get("expires_at").unwrap(); + assert_eq!(stored, &serde_json::Value::from(to_iso_string(exp_at))); + + let job2 = half_stuff().expires_in(five_min).build(); + assert!(job2.custom.get("expires_at").is_some()); + } + + #[test] + fn test_uniqueness_faeture_for_enterprise_faktory() { + let job = half_stuff().unique_for(60).unique_until_start().build(); + let stored_unique_for = job.custom.get("unique_for").unwrap(); + let stored_unique_until = job.custom.get("unique_until").unwrap(); + assert_eq!(stored_unique_for, &serde_json::Value::from(60)); + assert_eq!(stored_unique_until, &serde_json::Value::from("start")); + + let job = half_stuff().unique_for(60).unique_until_success().build(); + + let stored_unique_until = 
job.custom.get("unique_until").unwrap(); + assert_eq!(stored_unique_until, &serde_json::Value::from("success")); + } + + #[test] + fn test_same_purpose_setters_applied_simultaneously() { + let expires_at1 = Utc::now() + chrono::Duration::seconds(300); + let expires_at2 = Utc::now() + chrono::Duration::seconds(300); + let job = half_stuff() + .unique_for(60) + .add_to_custom_data("unique_for", 600) + .unique_for(40) + .add_to_custom_data("expires_at", to_iso_string(expires_at1)) + .expires_at(expires_at2) + .build(); + let stored_unique_for = job.custom.get("unique_for").unwrap(); + assert_eq!(stored_unique_for, &serde_json::Value::from(40)); + let stored_expires_at = job.custom.get("expires_at").unwrap(); + assert_eq!( + stored_expires_at, + &serde_json::Value::from(to_iso_string(expires_at2)) + ) + } +} diff --git a/src/proto/single/mod.rs b/src/proto/single/mod.rs index 011544ed..a02df87b 100644 --- a/src/proto/single/mod.rs +++ b/src/proto/single/mod.rs @@ -7,6 +7,9 @@ mod cmd; mod resp; mod utils; +#[cfg(feature = "ent")] +mod ent; + use crate::error::Error; pub use self::cmd::*; @@ -145,7 +148,7 @@ pub struct Job { } impl JobBuilder { - /// Create a new builder for a [`Job`] + /// Creates a new builder for a [`Job`] pub fn new(kind: impl Into) -> JobBuilder { JobBuilder { kind: Some(kind.into()), @@ -162,6 +165,17 @@ impl JobBuilder { self } + /// Sets arbitrary key-value pairs to this job's custom data hash. + pub fn add_to_custom_data( + &mut self, + k: impl Into, + v: impl Into, + ) -> &mut Self { + let custom = self.custom.get_or_insert_with(HashMap::new); + custom.insert(k.into(), v.into()); + self + } + /// Builds a new [`Job`] from the parameters of this builder. 
pub fn build(&self) -> Job { self.try_build() @@ -304,4 +318,17 @@ mod test { assert_ne!(job2.jid, job3.jid); assert_ne!(job2.created_at, job3.created_at); } + + #[test] + fn test_arbitrary_custom_data_setter() { + let job = JobBuilder::new("order") + .args(vec!["ISBN-13:9781718501850"]) + .add_to_custom_data("arbitrary_key", "arbitrary_value") + .build(); + + assert_eq!( + job.custom.get("arbitrary_key").unwrap(), + &serde_json::Value::from("arbitrary_value") + ); + } } diff --git a/tests/real.rs b/tests/real/community.rs similarity index 100% rename from tests/real.rs rename to tests/real/community.rs diff --git a/tests/real/enterprise.rs b/tests/real/enterprise.rs new file mode 100644 index 00000000..d1f651f1 --- /dev/null +++ b/tests/real/enterprise.rs @@ -0,0 +1,363 @@ +extern crate faktory; +extern crate serde_json; +extern crate url; + +use faktory::*; +use std::io; + +macro_rules! skip_if_not_enterprise { + () => { + if std::env::var_os("FAKTORY_ENT").is_none() { + return; + } + }; +} + +fn learn_faktory_url() -> String { + let url = std::env::var_os("FAKTORY_URL").expect( + "Enterprise Faktory should be running for this test, and 'FAKTORY_URL' environment variable should be provided", + ); + url.to_str().expect("Is a utf-8 string").to_owned() +} + +#[test] +fn ent_expiring_job() { + use std::{thread, time}; + + skip_if_not_enterprise!(); + + let url = learn_faktory_url(); + + // prepare a producer ("client" in Faktory terms) and consumer ("worker"): + let mut producer = Producer::connect(Some(&url)).unwrap(); + let mut consumer = ConsumerBuilder::default(); + consumer.register("AnExpiringJob", move |job| -> io::Result<_> { + Ok(eprintln!("{:?}", job)) + }); + let mut consumer = consumer.connect(Some(&url)).unwrap(); + + // prepare an expiring job: + let job_ttl_secs: u64 = 3; + + let ttl = chrono::Duration::seconds(job_ttl_secs as i64); + let job1 = JobBuilder::new("AnExpiringJob") + .args(vec!["ISBN-13:9781718501850"]) + .expires_at(chrono::Utc::now() + 
ttl) + .build(); + + // enqueue and fetch immediately job1: + producer.enqueue(job1).unwrap(); + let had_job = consumer.run_one(0, &["default"]).unwrap(); + assert!(had_job); + + // check that the queue is drained: + let had_job = consumer.run_one(0, &["default"]).unwrap(); + assert!(!had_job); + + // prepare another one: + let job2 = JobBuilder::new("AnExpiringJob") + .args(vec!["ISBN-13:9781718501850"]) + .expires_at(chrono::Utc::now() + ttl) + .build(); + + // enqueue and then fetch job2, but after ttl: + producer.enqueue(job2).unwrap(); + thread::sleep(time::Duration::from_secs(job_ttl_secs * 2)); + let had_job = consumer.run_one(0, &["default"]).unwrap(); + + // For the non-enterprise edition of Faktory, this assertion will + // fail, which should be taken into account when running the test suite on CI. + assert!(!had_job); +} + +#[test] +fn ent_unique_job() { + use faktory::error; + use serde_json::Value; + + skip_if_not_enterprise!(); + + let url = learn_faktory_url(); + + let job_type = "order"; + + // prepare producer and consumer: + let mut producer = Producer::connect(Some(&url)).unwrap(); + let mut consumer = ConsumerBuilder::default(); + consumer.register(job_type, |job| -> io::Result<_> { + Ok(eprintln!("{:?}", job)) + }); + let mut consumer = consumer.connect(Some(&url)).unwrap(); + + // Reminder. Jobs are considered unique for kind + args + queue. 
+ // So the following two jobs, will be accepted by Faktory, since we + // are not setting 'unique_for' when creating those jobs: + let queue_name = "ent_unique_job"; + let args = vec![Value::from("ISBN-13:9781718501850"), Value::from(100)]; + let job1 = JobBuilder::new(job_type) + .args(args.clone()) + .queue(queue_name) + .build(); + producer.enqueue(job1).unwrap(); + let job2 = JobBuilder::new(job_type) + .args(args.clone()) + .queue(queue_name) + .build(); + producer.enqueue(job2).unwrap(); + + let had_job = consumer.run_one(0, &[queue_name]).unwrap(); + assert!(had_job); + let had_another_one = consumer.run_one(0, &[queue_name]).unwrap(); + assert!(had_another_one); + let and_that_is_it_for_now = !consumer.run_one(0, &[queue_name]).unwrap(); + assert!(and_that_is_it_for_now); + + // let's now create a unique job and followed by a job with + // the same args and kind (jobtype in Faktory terms) and pushed + // to the same queue: + let unique_for_secs = 3; + let job1 = Job::builder(job_type) + .args(args.clone()) + .queue(queue_name) + .unique_for(unique_for_secs) + .build(); + producer.enqueue(job1).unwrap(); + // this one is a 'duplicate' ... + let job2 = Job::builder(job_type) + .args(args.clone()) + .queue(queue_name) + .unique_for(unique_for_secs) + .build(); + // ... 
so the server will respond accordingly: + let res = producer.enqueue(job2).unwrap_err(); + if let error::Error::Protocol(error::Protocol::UniqueConstraintViolation { msg }) = res { + assert_eq!(msg, "Job not unique"); + } else { + panic!("Expected protocol error.") + } + + // Let's now consume the job which is 'holding' a unique lock: + let had_job = consumer.run_one(0, &[queue_name]).unwrap(); + assert!(had_job); + + // And check that the queue is really empty (`job2` from above + // has not been queued indeed): + let queue_is_empty = !consumer.run_one(0, &[queue_name]).unwrap(); + assert!(queue_is_empty); + + // Now let's repeat the latter case, but providing different args to job2: + let job1 = JobBuilder::new(job_type) + .args(args.clone()) + .queue(queue_name) + .unique_for(unique_for_secs) + .build(); + producer.enqueue(job1).unwrap(); + // this one is *NOT* a 'duplicate' ... + let job2 = JobBuilder::new(job_type) + .args(vec![Value::from("ISBN-13:9781718501850"), Value::from(101)]) + .queue(queue_name) + .unique_for(unique_for_secs) + .build(); + // ... 
so the server will accept it: + producer.enqueue(job2).unwrap(); + + let had_job = consumer.run_one(0, &[queue_name]).unwrap(); + assert!(had_job); + let had_another_one = consumer.run_one(0, &[queue_name]).unwrap(); + assert!(had_another_one); + + // and the queue is empty again: + let had_job = consumer.run_one(0, &[queue_name]).unwrap(); + assert!(!had_job); +} + +#[test] +fn ent_unique_job_until_success() { + use faktory::error; + use std::thread; + use std::time; + + skip_if_not_enterprise!(); + + let url = learn_faktory_url(); + + let queue_name = "ent_unique_job_until_success"; + let job_type = "order"; + + // the job will be being executed for at least 3 seconds, + // but is unique for 4 seconds; + let difficulty_level = 3; + let unique_for = 4; + + let url1 = url.clone(); + let handle = thread::spawn(move || { + // prepare producer and consumer, where the former can + // send a job difficulty level as a job's args and the lattter + // will sleep for a corresponding period of time, pretending + // to work hard: + let mut producer_a = Producer::connect(Some(&url1)).unwrap(); + let mut consumer_a = ConsumerBuilder::default(); + consumer_a.register(job_type, |job| -> io::Result<_> { + let args = job.args().to_owned(); + let mut args = args.iter(); + let diffuculty_level = args + .next() + .expect("job difficulty level is there") + .to_owned(); + let sleep_secs = + serde_json::from_value::(diffuculty_level).expect("a valid number"); + thread::sleep(time::Duration::from_secs(sleep_secs as u64)); + Ok(eprintln!("{:?}", job)) + }); + let mut consumer_a = consumer_a.connect(Some(&url1)).unwrap(); + let job = JobBuilder::new(job_type) + .args(vec![difficulty_level]) + .queue(queue_name) + .unique_for(unique_for) + .unique_until_success() // Faktory's default + .build(); + producer_a.enqueue(job).unwrap(); + let had_job = consumer_a.run_one(0, &[queue_name]).unwrap(); + assert!(had_job); + }); + + // let spawned thread gain momentum: + 
thread::sleep(time::Duration::from_secs(1)); + + // continue + let mut producer_b = Producer::connect(Some(&url)).unwrap(); + + // this one is a 'duplicate' because the job is still + // being executed in the spawned thread: + let job = JobBuilder::new(job_type) + .args(vec![difficulty_level]) + .queue(queue_name) + .unique_for(unique_for) + .build(); + + // as a result: + let res = producer_b.enqueue(job).unwrap_err(); + if let error::Error::Protocol(error::Protocol::UniqueConstraintViolation { msg }) = res { + assert_eq!(msg, "Job not unique"); + } else { + panic!("Expected protocol error.") + } + + handle.join().expect("should join successfully"); + + // Now that the job submitted in a spawned thread has been successfully executed + // (with ACK sent to server), the producer 'B' can push another one: + producer_b + .enqueue( + JobBuilder::new(job_type) + .args(vec![difficulty_level]) + .queue(queue_name) + .unique_for(unique_for) + .build(), + ) + .unwrap(); +} + +#[test] +fn ent_unique_job_until_start() { + use std::thread; + use std::time; + + skip_if_not_enterprise!(); + + let url = learn_faktory_url(); + + let queue_name = "ent_unique_job_until_start"; + let job_type = "order"; + let difficulty_level = 3; + let unique_for = 4; + + let url1 = url.clone(); + let handle = thread::spawn(move || { + let mut producer_a = Producer::connect(Some(&url1)).unwrap(); + let mut consumer_a = ConsumerBuilder::default(); + consumer_a.register(job_type, |job| -> io::Result<_> { + let args = job.args().to_owned(); + let mut args = args.iter(); + let diffuculty_level = args + .next() + .expect("job difficulty level is there") + .to_owned(); + let sleep_secs = + serde_json::from_value::(diffuculty_level).expect("a valid number"); + thread::sleep(time::Duration::from_secs(sleep_secs as u64)); + Ok(eprintln!("{:?}", job)) + }); + let mut consumer_a = consumer_a.connect(Some(&url1)).unwrap(); + producer_a + .enqueue( + JobBuilder::new(job_type) + .args(vec![difficulty_level]) + 
.queue(queue_name) + .unique_for(unique_for) + .unique_until_start() // NB! + .build(), + ) + .unwrap(); + // as soon as the job is fetched, the unique lock gets released + let had_job = consumer_a.run_one(0, &[queue_name]).unwrap(); + assert!(had_job); + }); + + // let spawned thread gain momentum: + thread::sleep(time::Duration::from_secs(1)); + + // the unique lock has been released by this time, so the job is enqueued successfully: + let mut producer_b = Producer::connect(Some(&url)).unwrap(); + producer_b + .enqueue( + JobBuilder::new(job_type) + .args(vec![difficulty_level]) + .queue(queue_name) + .unique_for(unique_for) + .build(), + ) + .unwrap(); + + handle.join().expect("should join successfully"); +} + +#[test] +fn ent_unique_job_bypass_unique_lock() { + use faktory::error; + use serde_json::Value; + + skip_if_not_enterprise!(); + + let url = learn_faktory_url(); + let mut producer = Producer::connect(Some(&url)).unwrap(); + + let job1 = Job::builder("order") + .queue("ent_unique_job_bypass_unique_lock") + .unique_for(60) + .build(); + + // Now the following job is _technically_ a 'duplicate', BUT if the `unique_for` value is not set, + // the uniqueness lock will be bypassed on the server. This special case is mentioned in the docs: + // https://github.com/contribsys/faktory/wiki/Ent-Unique-Jobs#bypassing-uniqueness + let job2 = Job::builder("order") // same jobtype and args (args are just not set) + .queue("ent_unique_job_bypass_unique_lock") // same queue + .build(); // NB: `unique_for` not set + + producer.enqueue(job1).unwrap(); + producer.enqueue(job2).unwrap(); // bypassing the lock! + + // This _is_ a 'duplicate'. + let job3 = Job::builder("order") + .queue("ent_unique_job_bypass_unique_lock") + .unique_for(60) // NB + .build(); + + let res = producer.enqueue(job3).unwrap_err(); // NOT bypassing the lock! 
+ + if let error::Error::Protocol(error::Protocol::UniqueConstraintViolation { msg }) = res { + assert_eq!(msg, "Job not unique"); + } else { + panic!("Expected protocol error.") + } +} diff --git a/tests/real/main.rs b/tests/real/main.rs new file mode 100644 index 00000000..b8b8f3dd --- /dev/null +++ b/tests/real/main.rs @@ -0,0 +1,4 @@ +mod community; + +#[cfg(feature = "ent")] +mod enterprise;