Skip to content

Commit

Permalink
Merge branch 'main' into feat/ent-batch-jobs
Browse files Browse the repository at this point in the history
  • Loading branch information
rustworthy committed Jan 20, 2024
2 parents b143490 + b5ddd21 commit 030484f
Show file tree
Hide file tree
Showing 7 changed files with 29 additions and 14 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 5 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "faktory"
version = "0.12.3"
version = "0.12.4"
authors = ["Jon Gjengset <[email protected]>"]
edition = "2021"
license = "MIT OR Apache-2.0"
Expand Down Expand Up @@ -45,3 +45,7 @@ openssl = { version = "0.10.60", optional = true }
name = "loadtest"
path = "src/bin/loadtest.rs"
required-features = ["binaries"]

[package.metadata.docs.rs]
all-features = true
rustdoc-args = ["--cfg", "docsrs"]
6 changes: 4 additions & 2 deletions src/error.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
//! Enumerates all errors that this crate may return.
//!
//! [`Error`] is the top level error enum.
//! [`enum@Error`] is the top level error enum.
//! Most consumers should only need to interact with this type.
//! This is also where more generic errors such as I/O errors are placed,
//! whereas the more specific errors ([`Connection`] and [`Protocol`]) are
//! whereas the more specific errors ([`Connect`] and [`Protocol`]) are
//! related to logic.
//!
//! [`Connect`] describes errors specific to the connection logic, for example
Expand Down Expand Up @@ -44,6 +44,7 @@ pub enum Error {

/// Indicates an error in the underlying TLS stream.
#[cfg(feature = "tls")]
#[cfg_attr(docsrs, doc(cfg(feature = "tls")))]
#[error("underlying tls stream")]
TlsStream(#[source] native_tls::Error),
}
Expand Down Expand Up @@ -95,6 +96,7 @@ pub enum Protocol {

/// The server reported a unique constraint violation.
#[cfg(feature = "ent")]
#[cfg_attr(docsrs, doc(cfg(feature = "ent")))]
#[error("server reported unique constraint violation: {msg}")]
UniqueConstraintViolation {
/// The error message given by the server.
Expand Down
3 changes: 3 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
//! }
//! ```
#![deny(missing_docs)]
#![cfg_attr(docsrs, feature(doc_cfg))]
#![warn(rust_2018_idioms)]

#[macro_use]
Expand All @@ -66,8 +67,10 @@ mod producer;
mod proto;

#[cfg(feature = "tls")]
#[cfg_attr(docsrs, doc(cfg(feature = "tls")))]
mod tls;
#[cfg(feature = "tls")]
#[cfg_attr(docsrs, doc(cfg(feature = "tls")))]
pub use tls::TlsStream;

pub use crate::consumer::{Consumer, ConsumerBuilder};
Expand Down
5 changes: 0 additions & 5 deletions src/proto/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,6 @@ pub use self::single::{
gen_random_wid, Ack, Fail, Heartbeat, Info, Job, JobBuilder, Push, QueueAction, QueueControl,
};

// responses that users can see
pub use self::single::Hi;

pub use self::single::gen_random_jid;

#[cfg(feature = "ent")]
mod batch;
#[cfg(feature = "ent")]
Expand Down
3 changes: 2 additions & 1 deletion src/proto/single/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ mod resp;
mod utils;

#[cfg(feature = "ent")]
pub mod ent;
#[cfg_attr(docsrs, doc(cfg(feature = "ent")))]
mod ent;

use crate::error::Error;

Expand Down
18 changes: 14 additions & 4 deletions tests/real/enterprise.rs
Original file line number Diff line number Diff line change
Expand Up @@ -351,25 +351,25 @@ fn ent_unique_job_bypass_unique_lock() {
let url = learn_faktory_url();

let mut producer = Producer::connect(Some(&url)).unwrap();

let queue_name = "ent_unique_job_bypass_unique_lock";
let job1 = Job::builder("order")
.queue("ent_unique_job_bypass_unique_lock")
.queue(queue_name)
.unique_for(60)
.build();

// Now the following job is _technically_ a 'duplicate', BUT if the `unique_for` value is not set,
// the uniqueness lock will be bypassed on the server. This special case is mentioned in the docs:
// https://github.com/contribsys/faktory/wiki/Ent-Unique-Jobs#bypassing-uniqueness
let job2 = Job::builder("order") // same jobtype and args (args are just not set)
.queue("ent_unique_job_bypass_unique_lock") // same queue
.queue(queue_name) // same queue
.build(); // NB: `unique_for` not set

producer.enqueue(job1).unwrap();
producer.enqueue(job2).unwrap(); // bypassing the lock!

// This _is_ a 'duplicate'.
let job3 = Job::builder("order")
.queue("ent_unique_job_bypass_unique_lock")
.queue(queue_name)
.unique_for(60) // NB
.build();

Expand All @@ -380,6 +380,16 @@ fn ent_unique_job_bypass_unique_lock() {
} else {
panic!("Expected protocol error.")
}

// let's consume three times from the queue to verify that the first two jobs
// have been enqueued for real, while the last one has not.
let mut c = ConsumerBuilder::default();
c.register("order", |j| -> io::Result<_> { Ok(eprintln!("{:?}", j)) });
let mut c = c.connect(Some(&url)).unwrap();

assert!(c.run_one(0, &[queue_name]).unwrap());
assert!(c.run_one(0, &[queue_name]).unwrap());
assert!(!c.run_one(0, &[queue_name]).unwrap()); // empty;
}

#[test]
Expand Down

0 comments on commit 030484f

Please sign in to comment.