Skip to content

Commit

Permalink
Job builder (#37)
Browse files Browse the repository at this point in the history
  • Loading branch information
rustworthy authored Jan 3, 2024
1 parent 481b152 commit 0fbcb22
Show file tree
Hide file tree
Showing 7 changed files with 332 additions and 28 deletions.
73 changes: 73 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ fnv = "1.0.5"
native-tls = { version = "0.2", optional = true }
clap = { version = "3.1.0", optional = true }
thiserror = "1.0.30"
derive_builder = "0.12.0"

[dev-dependencies]
mockstream = "0.0.3"
Expand Down
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,5 +73,5 @@ pub use tls::TlsStream;
pub use crate::consumer::{Consumer, ConsumerBuilder};
pub use crate::error::Error;
pub use crate::producer::Producer;
pub use crate::proto::Job;
pub use crate::proto::Reconnect;
pub use crate::proto::{Job, JobBuilder};
4 changes: 3 additions & 1 deletion src/proto/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@ pub(crate) const EXPECTED_PROTOCOL_VERSION: usize = 2;
mod single;

// commands that users can issue
pub use self::single::{Ack, Fail, Heartbeat, Info, Job, Push, QueueAction, QueueControl};
pub use self::single::{
Ack, Fail, Heartbeat, Info, Job, JobBuilder, Push, QueueAction, QueueControl,
};

// responses that users can see
pub use self::single::Hi;

Check warning on line 19 in src/proto/mod.rs

View workflow job for this annotation

GitHub Actions / clippy

unused import: `self::single::Hi`

warning: unused import: `self::single::Hi` --> src/proto/mod.rs:19:9 | 19 | pub use self::single::Hi; | ^^^^^^^^^^^^^^^^ | = note: `#[warn(unused_imports)]` on by default

Check warning on line 19 in src/proto/mod.rs

View workflow job for this annotation

GitHub Actions / nightly / doc

unused import: `self::single::Hi`

Check warning on line 19 in src/proto/mod.rs

View workflow job for this annotation

GitHub Actions / ubuntu / stable / features

unused import: `self::single::Hi`

Check warning on line 19 in src/proto/mod.rs

View workflow job for this annotation

GitHub Actions / ubuntu / stable / features

unused import: `self::single::Hi`

Check warning on line 19 in src/proto/mod.rs

View workflow job for this annotation

GitHub Actions / ubuntu / stable / features

unused import: `self::single::Hi`

Check warning on line 19 in src/proto/mod.rs

View workflow job for this annotation

GitHub Actions / ubuntu / stable / features

unused import: `self::single::Hi`

Check warning on line 19 in src/proto/mod.rs

View workflow job for this annotation

GitHub Actions / ubuntu / stable / features

unused import: `self::single::Hi`

Check warning on line 19 in src/proto/mod.rs

View workflow job for this annotation

GitHub Actions / ubuntu / stable

unused import: `self::single::Hi`

Check warning on line 19 in src/proto/mod.rs

View workflow job for this annotation

GitHub Actions / ubuntu / nightly

unused import: `self::single::Hi`

Check warning on line 19 in src/proto/mod.rs

View workflow job for this annotation

GitHub Actions / ubuntu / beta / updated

unused import: `self::single::Hi`

Check warning on line 19 in src/proto/mod.rs

View workflow job for this annotation

GitHub Actions / ubuntu / beta

unused import: `self::single::Hi`

Check warning on line 19 in src/proto/mod.rs

View workflow job for this annotation

GitHub Actions / windows-latest / stable

unused import: `self::single::Hi`

Check warning on line 19 in src/proto/mod.rs

View workflow job for this annotation

GitHub Actions / macos-latest / stable

unused import: `self::single::Hi`
Expand Down
182 changes: 156 additions & 26 deletions src/proto/single/mod.rs
Original file line number Diff line number Diff line change
@@ -1,77 +1,134 @@
use chrono::{DateTime, Utc};
use derive_builder::Builder;
use std::collections::HashMap;
use std::io::prelude::*;

mod cmd;
mod resp;
mod utils;

use crate::error::Error;

pub use self::cmd::*;
pub use self::resp::*;

const JOB_DEFAULT_QUEUE: &str = "default";
const JOB_DEFAULT_RESERVED_FOR_SECS: usize = 600;
const JOB_DEFAULT_RETRY_COUNT: usize = 25;
const JOB_DEFAULT_PRIORITY: u8 = 5;
const JOB_DEFAULT_BACKTRACE: usize = 0;

/// A Faktory job.
///
/// To create a job, use 'Job::new' specifying 'kind' and 'args':
/// ```
/// use faktory::Job;
///
/// let _job = Job::new("order", vec!["ISBN-13:9781718501850"]);
/// ```
///
/// Alternatively, use [`JobBuilder`] to configure more aspects of a job:
/// ```
/// use faktory::JobBuilder;
///
/// let _job = JobBuilder::new("order")
/// .args(vec!["ISBN-13:9781718501850"])
/// .build();
/// ```
///
/// Equivalently:
/// ```
/// use faktory::Job;
///
/// let _job = Job::builder("order")
/// .args(vec!["ISBN-13:9781718501850"])
/// .build();
/// ```
///
/// In case no arguments are expected 'on the other side', you can simply go with:
/// ```
/// use faktory::Job;
///
/// let _job = Job::builder("rebuild_index").build();
/// ```
///
/// See also the [Faktory wiki](https://github.com/contribsys/faktory/wiki/The-Job-Payload).
#[derive(Serialize, Deserialize, Debug)]
#[derive(Serialize, Deserialize, Debug, Builder)]
#[builder(
custom_constructor,
setter(into),
build_fn(name = "try_build", private)
)]
pub struct Job {
/// The job's unique identifier.
#[builder(default = "utils::gen_random_jid()")]
pub(crate) jid: String,

/// The queue this job belongs to. Usually `default`.
#[builder(default = "JOB_DEFAULT_QUEUE.into()")]
pub queue: String,

/// The job's type. Called `kind` because `type` is reserved.
#[serde(rename = "jobtype")]
#[builder(setter(custom))]
pub(crate) kind: String,

/// The arguments provided for this job.
#[builder(setter(custom), default = "Vec::new()")]
pub(crate) args: Vec<serde_json::Value>,

/// When this job was created.
// note that serializing works correctly here since the default chrono serialization
// is RFC3339, which is also what Faktory expects.
#[serde(skip_serializing_if = "Option::is_none")]
#[builder(default = "Some(Utc::now())")]
pub created_at: Option<DateTime<Utc>>,

/// When this job was supplied to the Faktory server.
#[serde(skip_serializing_if = "Option::is_none")]
#[builder(setter(skip))]
pub enqueued_at: Option<DateTime<Utc>>,

/// When this job is scheduled for.
///
/// Defaults to immediately.
#[serde(skip_serializing_if = "Option::is_none")]
#[builder(default = "None")]
pub at: Option<DateTime<Utc>>,

/// How long to allow this job to run for.
///
/// Defaults to 600 seconds.
#[serde(skip_serializing_if = "Option::is_none")]
#[builder(default = "Some(JOB_DEFAULT_RESERVED_FOR_SECS)")]
pub reserve_for: Option<usize>,

/// Number of times to retry this job.
///
/// Defaults to 25.
#[serde(skip_serializing_if = "Option::is_none")]
pub retry: Option<isize>,
#[builder(default = "Some(JOB_DEFAULT_RETRY_COUNT)")]
pub retry: Option<usize>,

/// The priority of this job from 1-9 (9 is highest).
///
/// Pushing a job with priority 9 will effectively put it at the front of the queue.
/// Defaults to 5.
#[builder(default = "Some(JOB_DEFAULT_PRIORITY)")]
pub priority: Option<u8>,

/// Number of lines of backtrace to keep if this job fails.
///
/// Defaults to 0.
#[serde(skip_serializing_if = "Option::is_none")]
#[builder(default = "Some(JOB_DEFAULT_BACKTRACE)")]
pub backtrace: Option<usize>,

/// Data about this job's most recent failure.
///
/// This field is read-only.
#[serde(skip_serializing)]
#[builder(setter(skip))]
failure: Option<Failure>,

/// Extra context to include with the job.
Expand All @@ -83,10 +140,36 @@ pub struct Job {
/// across a complex distributed system, etc.
#[serde(skip_serializing_if = "HashMap::is_empty")]
#[serde(default = "HashMap::default")]
#[builder(default = "HashMap::default()")]
pub custom: HashMap<String, serde_json::Value>,
}

#[derive(Serialize, Deserialize, Debug)]
impl JobBuilder {
/// Create a new builder for a [`Job`]
pub fn new(kind: impl Into<String>) -> JobBuilder {
JobBuilder {
kind: Some(kind.into()),
..JobBuilder::create_empty()
}
}

/// Setter for the arguments provided for this job.
pub fn args<A>(&mut self, args: Vec<A>) -> &mut Self
where
A: Into<serde_json::Value>,
{
self.args = Some(args.into_iter().map(|s| s.into()).collect());
self
}

/// Builds a new [`Job`] from the parameters of this builder.
pub fn build(&self) -> Job {
self.try_build()
.expect("All required fields have been set.")
}
}

#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct Failure {
retry_count: usize,
failed_at: String,
Expand All @@ -108,29 +191,14 @@ impl Job {
S: Into<String>,
A: Into<serde_json::Value>,
{
use rand::{thread_rng, Rng};
let random_jid = thread_rng()
.sample_iter(&rand::distributions::Alphanumeric)
.map(char::from)
.take(16)
.collect();
use chrono::prelude::*;
Job {
jid: random_jid,
queue: "default".into(),
kind: kind.into(),
args: args.into_iter().map(|s| s.into()).collect(),

created_at: Some(Utc::now()),
enqueued_at: None,
at: None,
reserve_for: Some(600),
retry: Some(25),
priority: Some(5),
backtrace: Some(0),
failure: None,
custom: Default::default(),
}
JobBuilder::new(kind).args(args).build()
}

/// Creates an ergonomic constructor for a new [`Job`].
///
/// Also equivalent to [`JobBuilder::new`].
pub fn builder<S: Into<String>>(kind: S) -> JobBuilder {
JobBuilder::new(kind)
}

/// Place this job on the given `queue`.
Expand Down Expand Up @@ -175,3 +243,65 @@ pub fn write_command_and_await_ok<X: BufRead + Write, C: FaktoryCommand>(
write_command(x, command)?;
read_ok(x)
}

#[cfg(test)]
mod test {
use super::*;

#[test]
fn test_job_can_be_created_with_builder() {
let job_kind = "order";
let job_args = vec!["ISBN-13:9781718501850"];
let job = JobBuilder::new(job_kind).args(job_args.clone()).build();

assert!(job.jid != "".to_owned());
assert!(job.queue == JOB_DEFAULT_QUEUE.to_string());
assert_eq!(job.kind, job_kind);
assert_eq!(job.args, job_args);

assert!(job.created_at.is_some());
assert!(job.created_at < Some(Utc::now()));

assert!(job.enqueued_at.is_none());
assert!(job.at.is_none());
assert_eq!(job.reserve_for, Some(JOB_DEFAULT_RESERVED_FOR_SECS));
assert_eq!(job.retry, Some(JOB_DEFAULT_RETRY_COUNT));
assert_eq!(job.priority, Some(JOB_DEFAULT_PRIORITY));
assert_eq!(job.backtrace, Some(JOB_DEFAULT_BACKTRACE));
assert!(job.failure.is_none());
assert_eq!(job.custom, HashMap::default());

let job = JobBuilder::new(job_kind).build();
assert!(job.args.is_empty());
}

#[test]
fn test_all_job_creation_variants_align() {
let job1 = Job::new("order", vec!["ISBN-13:9781718501850"]);
let job2 = JobBuilder::new("order")
.args(vec!["ISBN-13:9781718501850"])
.build();
assert_eq!(job1.kind, job2.kind);
assert_eq!(job1.args, job2.args);
assert_eq!(job1.queue, job2.queue);
assert_eq!(job1.enqueued_at, job2.enqueued_at);
assert_eq!(job1.at, job2.at);
assert_eq!(job1.reserve_for, job2.reserve_for);
assert_eq!(job1.retry, job2.retry);
assert_eq!(job1.priority, job2.priority);
assert_eq!(job1.backtrace, job2.backtrace);
assert_eq!(job1.custom, job2.custom);

assert_ne!(job1.jid, job2.jid);
assert_ne!(job1.created_at, job2.created_at);

let job3 = Job::builder("order")
.args(vec!["ISBN-13:9781718501850"])
.build();
assert_eq!(job2.kind, job3.kind);
assert_eq!(job1.args, job2.args);

assert_ne!(job2.jid, job3.jid);
assert_ne!(job2.created_at, job3.created_at);
}
}
Loading

0 comments on commit 0fbcb22

Please sign in to comment.