Skip to content

Commit

Permalink
Merge branch 'main' into feat/async-support
Browse files Browse the repository at this point in the history
  • Loading branch information
rustworthy committed Jan 26, 2024
2 parents c13b8ec + b5ddd21 commit 53d4097
Show file tree
Hide file tree
Showing 7 changed files with 28 additions and 12 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 5 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "faktory"
version = "0.12.3"
version = "0.12.4"
authors = ["Jon Gjengset <[email protected]>"]
edition = "2021"
license = "MIT OR Apache-2.0"
Expand Down Expand Up @@ -63,3 +63,7 @@ required-features = ["binaries"]
name = "async_loadtest"
path = "src/bin/async_loadtest.rs"
required-features = ["binaries"]

[package.metadata.docs.rs]
all-features = true
rustdoc-args = ["--cfg", "docsrs"]
6 changes: 4 additions & 2 deletions src/error.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
//! Enumerates all errors that this crate may return.
//!
//! [`Error`] is the top level error enum.
//! [`enum@Error`] is the top level error enum.
//! Most consumers should only need to interact with this type.
//! This is also where more generic errors such as I/O errors are placed,
//! whereas the more specific errors ([`Connection`] and [`Protocol`]) are
//! whereas the more specific errors ([`Connect`] and [`Protocol`]) are
//! related to logic.
//!
//! [`Connect`] describes errors specific to the connection logic, for example
Expand Down Expand Up @@ -44,6 +44,7 @@ pub enum Error {

/// Indicates an error in the underlying TLS stream.
#[cfg(feature = "tls")]
#[cfg_attr(docsrs, doc(cfg(feature = "tls")))]
#[error("underlying tls stream")]
TlsStream(#[source] native_tls::Error),
}
Expand Down Expand Up @@ -95,6 +96,7 @@ pub enum Protocol {

/// The server reported a unique constraint violation.
#[cfg(feature = "ent")]
#[cfg_attr(docsrs, doc(cfg(feature = "ent")))]
#[error("server reported unique constraint violation: {msg}")]
UniqueConstraintViolation {
/// The error message given by the server.
Expand Down
3 changes: 3 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
//! }
//! ```
#![deny(missing_docs)]
#![cfg_attr(docsrs, feature(doc_cfg))]
#![warn(rust_2018_idioms)]

#[macro_use]
Expand All @@ -67,8 +68,10 @@ mod producer;
mod proto;

#[cfg(feature = "tls")]
#[cfg_attr(docsrs, doc(cfg(feature = "tls")))]
mod tls;
#[cfg(feature = "tls")]
#[cfg_attr(docsrs, doc(cfg(feature = "tls")))]
pub use tls::TlsStream;

pub use crate::consumer::{Consumer, ConsumerBuilder};
Expand Down
3 changes: 0 additions & 3 deletions src/proto/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,6 @@ pub use self::single::{
QueueControl, RawResponse,
};

// responses that users can see
pub use self::single::Hi;

pub(crate) fn get_env_url() -> String {
use std::env;
let var = env::var("FAKTORY_PROVIDER").unwrap_or_else(|_| "FAKTORY_URL".to_string());
Expand Down
1 change: 1 addition & 0 deletions src/proto/single/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ mod resp;
mod utils;

#[cfg(feature = "ent")]
#[cfg_attr(docsrs, doc(cfg(feature = "ent")))]
mod ent;

use crate::error::Error;
Expand Down
19 changes: 14 additions & 5 deletions tests/real/enterprise.rs
Original file line number Diff line number Diff line change
Expand Up @@ -316,31 +316,30 @@ fn ent_unique_job_until_start() {
#[test]
fn ent_unique_job_bypass_unique_lock() {
use faktory::error;
use serde_json::Value;

skip_if_not_enterprise!();

let url = learn_faktory_url();
let mut producer = Producer::connect(Some(&url)).unwrap();

let queue_name = "ent_unique_job_bypass_unique_lock";
let job1 = Job::builder("order")
.queue("ent_unique_job_bypass_unique_lock")
.queue(queue_name)
.unique_for(60)
.build();

// Now the following job is _technically_ a 'duplicate', BUT if the `unique_for` value is not set,
// the uniqueness lock will be bypassed on the server. This special case is mentioned in the docs:
// https://github.com/contribsys/faktory/wiki/Ent-Unique-Jobs#bypassing-uniqueness
let job2 = Job::builder("order") // same jobtype and args (args are just not set)
.queue("ent_unique_job_bypass_unique_lock") // same queue
.queue(queue_name) // same queue
.build(); // NB: `unique_for` not set

producer.enqueue(job1).unwrap();
producer.enqueue(job2).unwrap(); // bypassing the lock!

// This _is_ a 'duplicate'.
let job3 = Job::builder("order")
.queue("ent_unique_job_bypass_unique_lock")
.queue(queue_name)
.unique_for(60) // NB
.build();

Expand All @@ -351,4 +350,14 @@ fn ent_unique_job_bypass_unique_lock() {
} else {
panic!("Expected protocol error.")
}

// let's consume three times from the queue to verify that the first two jobs
// have been enqueued for real, while the last one has not.
let mut c = ConsumerBuilder::default();
c.register("order", |j| -> io::Result<_> { Ok(eprintln!("{:?}", j)) });
let mut c = c.connect(Some(&url)).unwrap();

assert!(c.run_one(0, &[queue_name]).unwrap());
assert!(c.run_one(0, &[queue_name]).unwrap());
assert!(!c.run_one(0, &[queue_name]).unwrap()); // empty;
}

0 comments on commit 53d4097

Please sign in to comment.