Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Unique and expiring jobs (Enterprise Faktory) #45

Merged
merged 19 commits into from
Jan 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 39 additions & 0 deletions .github/workflows/ent.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
# This is a CI workflow that runs the test against Enterprise Edition of Faktory.
# The binary (for macos only) is avalable for download for testing purposes with each Faktory release.
jonhoo marked this conversation as resolved.
Show resolved Hide resolved
permissions:
contents: read
on:
push:
branches:
- main
pull_request:
concurrency:
group: ${{ github.workflow }}-${{ github.head_ref || github.run_id }}
cancel-in-progress: true
name: enterprise
jobs:
test:
runs-on: macos-latest
env:
FAKTORY_VERSION: 1.8.0
steps:
- uses: actions/checkout@v4
- name: Install redis
run: brew install redis
- name: Download Faktory binary
run: |
wget -O faktory.tbz https://github.com/contribsys/faktory/releases/download/v${{ env.FAKTORY_VERSION }}/faktory-ent_${{ env.FAKTORY_VERSION }}.macos.amd64.tbz
tar xfv faktory.tbz
cp ./faktory /usr/local/bin
- name: Launch Faktory in background
run: faktory &
- name: Install stable
uses: dtolnay/rust-toolchain@stable
- name: cargo generate-lockfile
if: hashFiles('Cargo.lock') == ''
run: cargo generate-lockfile
- name: Run tests
env:
FAKTORY_URL: tcp://127.0.0.1:7419
FAKTORY_ENT: true
run: cargo test --locked --features ent --all-targets
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ categories = ["api-bindings", "asynchronous", "network-programming"]
default = []
tls = ["native-tls"]
binaries = ["clap"]
ent = []

[dependencies]
serde_json = "1.0"
Expand Down
10 changes: 10 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,14 @@
desc: String,
},

/// The server reported a unique constraint violation.
#[cfg(feature = "ent")]
#[error("server reported unique constraint violation: {msg}")]
UniqueConstraintViolation {
/// The error message given by the server.
msg: String,
},

/// The server responded with an error.
#[error("an internal server error occurred: {msg}")]
Internal {
Expand Down Expand Up @@ -139,6 +147,8 @@
match code {
Some("ERR") => Protocol::Internal { msg: error },
Some("MALFORMED") => Protocol::Malformed { desc: error },
#[cfg(feature = "ent")]
Some("NOTUNIQUE") => Protocol::UniqueConstraintViolation { msg: error },

Check warning on line 151 in src/error.rs

View check run for this annotation

Codecov / codecov/patch

src/error.rs#L151

Added line #L151 was not covered by tests
Some(c) => Protocol::Internal {
msg: format!("{} {}", c, error),
},
Expand Down
141 changes: 141 additions & 0 deletions src/proto/single/ent.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
use chrono::{DateTime, Utc};

use crate::JobBuilder;

impl JobBuilder {
/// When Faktory should expire this job.
///
/// Faktory Enterprise allows for expiring jobs. This is setter for `expires_at`
/// field in the job's custom data.
/// ```
/// # use faktory::JobBuilder;
/// # use chrono::{Duration, Utc};
/// let _job = JobBuilder::new("order")
/// .args(vec!["ISBN-13:9781718501850"])
/// .expires_at(Utc::now() + Duration::hours(1))
/// .build();
/// ```
pub fn expires_at(&mut self, dt: DateTime<Utc>) -> &mut Self {
self.add_to_custom_data(
"expires_at",
dt.to_rfc3339_opts(chrono::SecondsFormat::Nanos, true),
)
}

/// In what period of time from now (UTC) the Faktory should expire this job.
///
/// Under the hood, the method will call `Utc::now` and add the provided `ttl` duration.
/// You can use this setter when you have a duration rather than some exact date and time,
/// expected by [`expires_at`](struct.JobBuilder.html#method.expires_at) setter.
/// Example usage:
/// ```
/// # use faktory::JobBuilder;
/// # use chrono::Duration;
/// let _job = JobBuilder::new("order")
/// .args(vec!["ISBN-13:9781718501850"])
/// .expires_in(Duration::weeks(1))
/// .build();
/// ```
pub fn expires_in(&mut self, ttl: chrono::Duration) -> &mut Self {
self.expires_at(Utc::now() + ttl)
}

/// How long the Faktory will not accept duplicates of this job.
///
/// The job will be considered unique for the kind-args-queue combination. The uniqueness is best-effort,
/// rather than a guarantee. Check out the Enterprise Faktory [docs](https://github.com/contribsys/faktory/wiki/Ent-Unique-Jobs)
/// for details on how scheduling, retries, and other features live together with `unique_for`.
///
/// If you've already created and pushed a unique job (job "A") to the Faktory server and now have got another one
/// of same kind, with the same args and destined for the same queue (job "B") and you would like - for some reason - to
/// bypass the unique constraint, simply leave `unique_for` field on the job's custom hash empty, i.e. do not use this setter.
/// In this case, the Faktory server will accept job "B", though technically this job "B" is a duplicate.
pub fn unique_for(&mut self, secs: usize) -> &mut Self {
jonhoo marked this conversation as resolved.
Show resolved Hide resolved
self.add_to_custom_data("unique_for", secs)
}

/// Remove unique lock for this job right before the job starts executing.
///
/// Another job with the same kind-args-queue combination will be accepted by the Faktory server
/// after the period specified in [`unique_for`](struct.JobBuilder.html#method.unique_for) has finished
/// _or_ after this job has been been consumed (i.e. its execution has ***started***).
pub fn unique_until_start(&mut self) -> &mut Self {
self.add_to_custom_data("unique_until", "start")
}

/// Do not remove unique lock for this job until it successfully finishes.
///
/// Sets `unique_until` on the Job's custom hash to `success`, which is Faktory's default.
/// Another job with the same kind-args-queue combination will be accepted by the Faktory server
/// after the period specified in [`unique_for`](struct.JobBuilder.html#method.unique_for) has finished
/// _or_ after this job has been been ***successfully*** processed.
pub fn unique_until_success(&mut self) -> &mut Self {
self.add_to_custom_data("unique_until", "success")
}
}

#[cfg(test)]
mod test {
use chrono::{DateTime, Utc};

use crate::JobBuilder;

fn half_stuff() -> JobBuilder {
let mut job = JobBuilder::new("order");
job.args(vec!["ISBN-13:9781718501850"]);
job
}

// Returns date and time string in the format expected by Faktory.
// Serializes date and time into a string as per RFC 3338 and ISO 8601
// with nanoseconds precision and 'Z' literal for the timzone column.
fn to_iso_string(dt: DateTime<Utc>) -> String {
dt.to_rfc3339_opts(chrono::SecondsFormat::Nanos, true)
}

#[test]
fn test_expiration_feature_for_enterprise_faktory() {
let five_min = chrono::Duration::seconds(300);
let exp_at = Utc::now() + five_min;
let job1 = half_stuff().expires_at(exp_at).build();
let stored = job1.custom.get("expires_at").unwrap();
assert_eq!(stored, &serde_json::Value::from(to_iso_string(exp_at)));

let job2 = half_stuff().expires_in(five_min).build();
assert!(job2.custom.get("expires_at").is_some());
}

#[test]
fn test_uniqueness_faeture_for_enterprise_faktory() {
let job = half_stuff().unique_for(60).unique_until_start().build();
let stored_unique_for = job.custom.get("unique_for").unwrap();
let stored_unique_until = job.custom.get("unique_until").unwrap();
assert_eq!(stored_unique_for, &serde_json::Value::from(60));
assert_eq!(stored_unique_until, &serde_json::Value::from("start"));

let job = half_stuff().unique_for(60).unique_until_success().build();

let stored_unique_until = job.custom.get("unique_until").unwrap();
assert_eq!(stored_unique_until, &serde_json::Value::from("success"));
}

#[test]
fn test_same_purpose_setters_applied_simultaneously() {
let expires_at1 = Utc::now() + chrono::Duration::seconds(300);
let expires_at2 = Utc::now() + chrono::Duration::seconds(300);
let job = half_stuff()
.unique_for(60)
.add_to_custom_data("unique_for", 600)
.unique_for(40)
.add_to_custom_data("expires_at", to_iso_string(expires_at1))
.expires_at(expires_at2)
.build();
let stored_unique_for = job.custom.get("unique_for").unwrap();
assert_eq!(stored_unique_for, &serde_json::Value::from(40));
let stored_expires_at = job.custom.get("expires_at").unwrap();
assert_eq!(
stored_expires_at,
&serde_json::Value::from(to_iso_string(expires_at2))
)
}
}
29 changes: 28 additions & 1 deletion src/proto/single/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ mod cmd;
mod resp;
mod utils;

#[cfg(feature = "ent")]
mod ent;

use crate::error::Error;

pub use self::cmd::*;
Expand Down Expand Up @@ -145,7 +148,7 @@ pub struct Job {
}

impl JobBuilder {
/// Create a new builder for a [`Job`]
/// Creates a new builder for a [`Job`]
pub fn new(kind: impl Into<String>) -> JobBuilder {
JobBuilder {
kind: Some(kind.into()),
Expand All @@ -162,6 +165,17 @@ impl JobBuilder {
self
}

/// Sets arbitrary key-value pairs to this job's custom data hash.
pub fn add_to_custom_data(
&mut self,
k: impl Into<String>,
v: impl Into<serde_json::Value>,
) -> &mut Self {
let custom = self.custom.get_or_insert_with(HashMap::new);
custom.insert(k.into(), v.into());
self
}

/// Builds a new [`Job`] from the parameters of this builder.
pub fn build(&self) -> Job {
self.try_build()
Expand Down Expand Up @@ -304,4 +318,17 @@ mod test {
assert_ne!(job2.jid, job3.jid);
assert_ne!(job2.created_at, job3.created_at);
}

#[test]
fn test_arbitrary_custom_data_setter() {
let job = JobBuilder::new("order")
.args(vec!["ISBN-13:9781718501850"])
.add_to_custom_data("arbitrary_key", "arbitrary_value")
.build();

assert_eq!(
job.custom.get("arbitrary_key").unwrap(),
&serde_json::Value::from("arbitrary_value")
);
}
}
File renamed without changes.
Loading
Loading