Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Job builder #37

Merged
merged 33 commits into from
Jan 3, 2024
Merged
Show file tree
Hide file tree
Changes from 32 commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
1552f75
Use 'derive_builder' macro for 'Job'
rustworthy Dec 3, 2023
69e6836
Add initial test for 'JobBuilder'
rustworthy Dec 3, 2023
d389b19
Add client-side error group, add 'MalformedJob'
rustworthy Dec 3, 2023
3aa1599
Add custom build remapping to 'MalformedJob' err
rustworthy Dec 3, 2023
e888e39
Add 'single/utils' mod with 'gen_random_jid' utility
rustworthy Dec 4, 2023
21e6dbb
Rm duplicated import in Job::new
rustworthy Dec 4, 2023
88a3d77
Use 'gen_random_jid' as 'jid' default on 'Job' struct
rustworthy Dec 4, 2023
ede140b
Add builder default for 'gueue' field on 'Job' struct
rustworthy Dec 4, 2023
f1f3b34
Add test for required fields in 'Job'
rustworthy Dec 4, 2023
6885a3b
Add defaults for 'created_at', 'enqueued_at' and 'at' fields
rustworthy Dec 4, 2023
21179ac
Add defaults for 'reserved_for', 'retry' and 'priority' fields
rustworthy Dec 5, 2023
c41e4a0
Add custom validator, check priority is valid
rustworthy Dec 5, 2023
ca2bbdc
Add test for job created with 'JobBuilder'
rustworthy Dec 5, 2023
a824b5c
Use constants in Job::new
rustworthy Dec 5, 2023
414e73e
Align API of Job::new and JobBuilder
rustworthy Dec 5, 2023
7081990
Make JobBuilder publicly available
rustworthy Dec 5, 2023
ac361ae
Add demo usage of 'JobBuilder' to loadtest
rustworthy Dec 5, 2023
6700c11
Do clean up before PR submitting
rustworthy Dec 5, 2023
ecab0ee
Make 'try_build' private
rustworthy Dec 7, 2023
b2938c3
Opt out of setter for 'failure' on JobBuilder
rustworthy Dec 8, 2023
903ab25
Skip 'enqueued_at' setter on 'JobBuilder'
rustworthy Dec 12, 2023
7b48efe
Update 'build' signature to return 'Error'
rustworthy Dec 13, 2023
c7e329b
Checkout loadtest bin to main
rustworthy Dec 18, 2023
c92856c
Fmt enum variants in 'error' mod
rustworthy Dec 18, 2023
5cfa0fb
Remove PartialEq derive from 'Failure'
rustworthy Dec 18, 2023
ba49903
Do not validate priority
rustworthy Dec 18, 2023
f58e346
Add custom constructor instead of 'default' impl
rustworthy Dec 18, 2023
3325dcb
Make 'JobBuilder' infallible, rm 'Client' from Error enum
rustworthy Dec 18, 2023
d4070e5
Add example usage of JobBuidler::new and Job::builder
rustworthy Dec 18, 2023
0482aad
Use JobBuilder::new in Job::new
rustworthy Dec 18, 2023
053dad4
Checkout src/error to main
rustworthy Dec 18, 2023
26ced57
Add test to tests/real using all job creation variants
rustworthy Dec 18, 2023
9d5dc16
Apply suggestions from code review
jonhoo Jan 3, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 73 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ fnv = "1.0.5"
native-tls = { version = "0.2", optional = true }
clap = { version = "3.1.0", optional = true }
thiserror = "1.0.30"
derive_builder = "0.12.0"

[dev-dependencies]
mockstream = "0.0.3"
Expand Down
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,5 +73,5 @@ pub use tls::TlsStream;
pub use crate::consumer::{Consumer, ConsumerBuilder};
pub use crate::error::Error;
pub use crate::producer::Producer;
pub use crate::proto::Job;
pub use crate::proto::Reconnect;
pub use crate::proto::{Job, JobBuilder};
4 changes: 3 additions & 1 deletion src/proto/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,12 @@
mod single;

// commands that users can issue
pub use self::single::{Ack, Fail, Heartbeat, Info, Job, Push, QueueAction, QueueControl};
pub use self::single::{
Ack, Fail, Heartbeat, Info, Job, JobBuilder, Push, QueueAction, QueueControl,
};

// responses that users can see
pub use self::single::Hi;

Check warning on line 19 in src/proto/mod.rs

View workflow job for this annotation

GitHub Actions / nightly / doc

unused import: `self::single::Hi`

Check warning on line 19 in src/proto/mod.rs

View workflow job for this annotation

GitHub Actions / ubuntu / beta

unused import: `self::single::Hi`

Check warning on line 19 in src/proto/mod.rs

View workflow job for this annotation

GitHub Actions / ubuntu / nightly

unused import: `self::single::Hi`

Check warning on line 19 in src/proto/mod.rs

View workflow job for this annotation

GitHub Actions / ubuntu / beta / updated

unused import: `self::single::Hi`

pub(crate) fn get_env_url() -> String {
use std::env;
Expand Down
182 changes: 156 additions & 26 deletions src/proto/single/mod.rs
Original file line number Diff line number Diff line change
@@ -1,77 +1,134 @@
use chrono::{DateTime, Utc};
use derive_builder::Builder;
use std::collections::HashMap;
use std::io::prelude::*;

mod cmd;
mod resp;
mod utils;

use crate::error::Error;

pub use self::cmd::*;
pub use self::resp::*;

const JOB_DEFAULT_QUEUE: &str = "default";
const JOB_DEFAULT_RESERVED_FOR_SECS: usize = 600;
const JOB_DEFAULT_RETRY_COUNT: usize = 25;
const JOB_DEFAULT_PRIORITY: u8 = 5;
const JOB_DEFAULT_BACKTRACE: usize = 0;

/// A Faktory job.
///
/// To create a job, use 'Job::new' specifying 'kind' and 'args':
/// ```
/// use faktory::Job;
///
/// let _job = Job::new("order", vec!["ISBN-13:9781718501850"]);
/// ```
///
/// Alternatively, use 'JobBuilder' to construct a job:
jonhoo marked this conversation as resolved.
Show resolved Hide resolved
/// ```
/// use faktory::JobBuilder;
///
/// let _job = JobBuilder::new("order")
/// .args(vec!["ISBN-13:9781718501850"])
/// .build();
/// ```
///
/// Equivalently:
/// ```
/// use faktory::Job;
///
/// let _job = Job::builder("order")
/// .args(vec!["ISBN-13:9781718501850"])
/// .build();
/// ```
///
/// In case no arguments are expected 'on the other side', you can simply go with:
/// ```
/// use faktory::Job;
///
/// let _job = Job::builder("rebuild_index").build();
/// ```
///
/// See also the [Faktory wiki](https://github.com/contribsys/faktory/wiki/The-Job-Payload).
#[derive(Serialize, Deserialize, Debug)]
#[derive(Serialize, Deserialize, Debug, Builder)]
#[builder(
custom_constructor,
setter(into),
build_fn(name = "try_build", private)
)]
pub struct Job {
/// The job's unique identifier.
#[builder(default = "utils::gen_random_jid()")]
pub(crate) jid: String,

/// The queue this job belongs to. Usually `default`.
#[builder(default = "JOB_DEFAULT_QUEUE.into()")]
pub queue: String,

/// The job's type. Called `kind` because `type` is reserved.
#[serde(rename = "jobtype")]
#[builder(setter(custom))]
pub(crate) kind: String,
jonhoo marked this conversation as resolved.
Show resolved Hide resolved

/// The arguments provided for this job.
#[builder(setter(custom), default = "Vec::new()")]
pub(crate) args: Vec<serde_json::Value>,

/// When this job was created.
// note that serializing works correctly here since the default chrono serialization
// is RFC3339, which is also what Faktory expects.
#[serde(skip_serializing_if = "Option::is_none")]
#[builder(default = "Some(Utc::now())")]
pub created_at: Option<DateTime<Utc>>,

/// When this job was supplied to the Faktory server.
#[serde(skip_serializing_if = "Option::is_none")]
#[builder(setter(skip))]
pub enqueued_at: Option<DateTime<Utc>>,

/// When this job is scheduled for.
///
/// Defaults to immediately.
#[serde(skip_serializing_if = "Option::is_none")]
#[builder(default = "None")]
pub at: Option<DateTime<Utc>>,

/// How long to allow this job to run for.
///
/// Defaults to 600 seconds.
#[serde(skip_serializing_if = "Option::is_none")]
#[builder(default = "Some(JOB_DEFAULT_RESERVED_FOR_SECS)")]
pub reserve_for: Option<usize>,

/// Number of times to retry this job.
///
/// Defaults to 25.
#[serde(skip_serializing_if = "Option::is_none")]
pub retry: Option<isize>,
#[builder(default = "Some(JOB_DEFAULT_RETRY_COUNT)")]
pub retry: Option<usize>,

/// The priority of this job from 1-9 (9 is highest).
///
/// Pushing a job with priority 9 will effectively put it at the front of the queue.
/// Defaults to 5.
#[builder(default = "Some(JOB_DEFAULT_PRIORITY)")]
pub priority: Option<u8>,

/// Number of lines of backtrace to keep if this job fails.
///
/// Defaults to 0.
#[serde(skip_serializing_if = "Option::is_none")]
#[builder(default = "Some(JOB_DEFAULT_BACKTRACE)")]
pub backtrace: Option<usize>,

/// Data about this job's most recent failure.
///
/// This field is read-only.
#[serde(skip_serializing)]
#[builder(setter(skip))]
failure: Option<Failure>,

/// Extra context to include with the job.
Expand All @@ -83,10 +140,36 @@
/// across a complex distributed system, etc.
#[serde(skip_serializing_if = "HashMap::is_empty")]
#[serde(default = "HashMap::default")]
#[builder(default = "HashMap::default()")]
pub custom: HashMap<String, serde_json::Value>,
}

#[derive(Serialize, Deserialize, Debug)]
impl JobBuilder {
/// Create a new instance of 'JobBuilder'
jonhoo marked this conversation as resolved.
Show resolved Hide resolved
pub fn new(kind: impl Into<String>) -> JobBuilder {
JobBuilder {
kind: Some(kind.into()),
..JobBuilder::create_empty()
}
}

/// Setter for the arguments provided for this job.
pub fn args<A>(&mut self, args: Vec<A>) -> &mut Self
where
A: Into<serde_json::Value>,
{
self.args = Some(args.into_iter().map(|s| s.into()).collect());
self
}

/// Builds a new 'Job'
jonhoo marked this conversation as resolved.
Show resolved Hide resolved
pub fn build(&self) -> Job {
self.try_build()
.expect("All required fields have been set.")
}
}

#[derive(Serialize, Deserialize, Debug, Clone)]

Check warning on line 172 in src/proto/single/mod.rs

View check run for this annotation

Codecov / codecov/patch

src/proto/single/mod.rs#L172

Added line #L172 was not covered by tests
pub struct Failure {
retry_count: usize,
failed_at: String,
Expand All @@ -108,29 +191,14 @@
S: Into<String>,
A: Into<serde_json::Value>,
{
use rand::{thread_rng, Rng};
let random_jid = thread_rng()
.sample_iter(&rand::distributions::Alphanumeric)
.map(char::from)
.take(16)
.collect();
use chrono::prelude::*;
Job {
jid: random_jid,
queue: "default".into(),
kind: kind.into(),
args: args.into_iter().map(|s| s.into()).collect(),

created_at: Some(Utc::now()),
enqueued_at: None,
at: None,
reserve_for: Some(600),
retry: Some(25),
priority: Some(5),
backtrace: Some(0),
failure: None,
custom: Default::default(),
}
JobBuilder::new(kind).args(args).build()
}

/// Create an instance of JobBuilder with 'kind' already set.
///
/// Equivalent to 'JobBuilder::new'
jonhoo marked this conversation as resolved.
Show resolved Hide resolved
pub fn builder<S: Into<String>>(kind: S) -> JobBuilder {
JobBuilder::new(kind)
}

/// Place this job on the given `queue`.
Expand Down Expand Up @@ -175,3 +243,65 @@
write_command(x, command)?;
read_ok(x)
}

#[cfg(test)]
mod test {
use super::*;

#[test]
fn test_job_can_be_created_with_builder() {
let job_kind = "order";
let job_args = vec!["ISBN-13:9781718501850"];
let job = JobBuilder::new(job_kind).args(job_args.clone()).build();

assert!(job.jid != "".to_owned());
assert!(job.queue == JOB_DEFAULT_QUEUE.to_string());
assert_eq!(job.kind, job_kind);
assert_eq!(job.args, job_args);

assert!(job.created_at.is_some());
assert!(job.created_at < Some(Utc::now()));

assert!(job.enqueued_at.is_none());
assert!(job.at.is_none());
assert_eq!(job.reserve_for, Some(JOB_DEFAULT_RESERVED_FOR_SECS));
assert_eq!(job.retry, Some(JOB_DEFAULT_RETRY_COUNT));
assert_eq!(job.priority, Some(JOB_DEFAULT_PRIORITY));
assert_eq!(job.backtrace, Some(JOB_DEFAULT_BACKTRACE));
assert!(job.failure.is_none());
assert_eq!(job.custom, HashMap::default());

let job = JobBuilder::new(job_kind).build();
assert!(job.args.is_empty());
}

#[test]
fn test_all_job_creation_variants_align() {
let job1 = Job::new("order", vec!["ISBN-13:9781718501850"]);
let job2 = JobBuilder::new("order")
.args(vec!["ISBN-13:9781718501850"])
.build();
assert_eq!(job1.kind, job2.kind);
assert_eq!(job1.args, job2.args);
assert_eq!(job1.queue, job2.queue);
assert_eq!(job1.enqueued_at, job2.enqueued_at);
assert_eq!(job1.at, job2.at);
assert_eq!(job1.reserve_for, job2.reserve_for);
assert_eq!(job1.retry, job2.retry);
assert_eq!(job1.priority, job2.priority);
assert_eq!(job1.backtrace, job2.backtrace);
assert_eq!(job1.custom, job2.custom);

assert_ne!(job1.jid, job2.jid);
assert_ne!(job1.created_at, job2.created_at);

let job3 = Job::builder("order")
.args(vec!["ISBN-13:9781718501850"])
.build();
assert_eq!(job2.kind, job3.kind);
assert_eq!(job1.args, job2.args);

assert_ne!(job2.jid, job3.jid);
assert_ne!(job2.created_at, job3.created_at);
}
}
Loading
Loading