diff --git a/src/consumer/mod.rs b/src/consumer/mod.rs index 70102863..7445e47f 100644 --- a/src/consumer/mod.rs +++ b/src/consumer/mod.rs @@ -1,17 +1,14 @@ use crate::error::Error; - use crate::proto::{ - self, parse_provided_or_from_env, Client, ClientOptions, HeartbeatStatus, Reconnect, + self, parse_provided_or_from_env, Ack, Client, ClientOptions, Fail, HeartbeatStatus, Job, + Reconnect, }; - use fnv::FnvHashMap; use std::error::Error as StdError; use std::io::prelude::*; use std::net::TcpStream; use std::sync::{atomic, Arc, Mutex}; -use crate::proto::{Ack, Fail, Job}; - const STATUS_RUNNING: usize = 0; const STATUS_QUIET: usize = 1; const STATUS_TERMINATING: usize = 2; diff --git a/src/lib.rs b/src/lib.rs index 6c48135b..de0aac1f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -76,9 +76,7 @@ pub use tls::TlsStream; pub use crate::consumer::{Consumer, ConsumerBuilder}; pub use crate::error::Error; pub use crate::producer::Producer; -pub use crate::proto::Reconnect; - -pub use crate::proto::{Job, JobBuilder}; +pub use crate::proto::{Job, JobBuilder, Reconnect}; #[cfg(feature = "ent")] #[cfg_attr(docsrs, doc(cfg(feature = "ent")))] diff --git a/src/producer/mod.rs b/src/producer/mod.rs index ebee2c0a..624c69c5 100644 --- a/src/producer/mod.rs +++ b/src/producer/mod.rs @@ -2,12 +2,8 @@ use crate::error::Error; use crate::proto::{ self, parse_provided_or_from_env, Client, Info, Job, Push, QueueAction, QueueControl, }; - -#[cfg(feature = "ent")] -use crate::proto::{BatchHandle, CommitBatch, OpenBatch}; #[cfg(feature = "ent")] -use crate::Batch; - +use crate::proto::{Batch, BatchHandle, CommitBatch, OpenBatch}; use std::io::prelude::*; use std::net::TcpStream; @@ -143,10 +139,16 @@ impl Producer { } /// Open an already existing batch of jobs. + /// + /// This will not error if a batch with the provided `bid` does not exist, + /// rather `Ok(None)` will be returned. #[cfg(feature = "ent")] - pub fn open_batch(&mut self, bid: String) -> Result, Error> { - let bid = self.c.issue(&OpenBatch::from(bid))?.read_bid()?; - Ok(BatchHandle::new(bid, self)) + pub fn open_batch(&mut self, bid: String) -> Result>, Error> { + let bid = self.c.issue(&OpenBatch::from(bid))?.maybe_bid()?; + match bid { + Some(bid) => Ok(Some(BatchHandle::new(bid, self))), + None => Ok(None), + } } #[cfg(feature = "ent")] diff --git a/src/proto/batch/mod.rs b/src/proto/batch/mod.rs index 904c6035..588e541b 100644 --- a/src/proto/batch/mod.rs +++ b/src/proto/batch/mod.rs @@ -1,9 +1,7 @@ -use std::io::{Read, Write}; - +use crate::{Error, Job, Producer}; use chrono::{DateTime, Utc}; use derive_builder::Builder; - -use crate::{Error, Job, Producer}; +use std::io::{Read, Write}; mod cmd; @@ -76,7 +74,7 @@ pub use cmd::{CommitBatch, GetBatchStatus, OpenBatch}; /// effectively building a pipeline this way, since the Faktory guarantees that callback jobs will not be queued unless /// the batch gets committed. /// -/// You can retieve the batch status using a [`tracker`](struct.Tracker.html): +/// You can retieve the batch status using a [`Tracker`](struct.Tracker.html): /// ```no_run /// # use faktory::Error; /// # use faktory::{Producer, Job, Batch, Tracker}; diff --git a/src/proto/mod.rs b/src/proto/mod.rs index 9dc9e988..47fd97e9 100644 --- a/src/proto/mod.rs +++ b/src/proto/mod.rs @@ -306,6 +306,23 @@ impl<'a, S: Read + Write> ReadToken<'a, S> { pub(crate) fn read_bid(self) -> Result { single::read_bid(&mut self.0.stream) } + + #[cfg(feature = "ent")] + pub(crate) fn maybe_bid(self) -> Result, Error> { + let bid_read_res = single::read_bid(&mut self.0.stream); + if bid_read_res.is_ok() { + return Ok(Some(bid_read_res.unwrap())); + } + match bid_read_res.unwrap_err() { + Error::Protocol(error::Protocol::Internal { msg }) => { + if msg.starts_with("No such batch") { + return Ok(None); + } + return Err(error::Protocol::Internal { msg }.into()); + } + another => Err(another), + } + } } #[cfg(test)] diff --git a/src/proto/single/resp.rs b/src/proto/single/resp.rs index 29b732e4..9a1d3e19 100644 --- a/src/proto/single/resp.rs +++ b/src/proto/single/resp.rs @@ -62,12 +62,6 @@ pub fn read_json(r: R) -> Result(r: R) -> Result { match read(r)? { - RawResponse::String(ref s) if s.is_empty() => Err(error::Protocol::BadType { - expected: "non-empty string representation of batch id", - received: "empty string".into(), - } - .into()), - RawResponse::String(ref s) => Ok(s.to_string()), RawResponse::Blob(ref b) if b.is_empty() => Err(error::Protocol::BadType { expected: "non-empty blob representation of batch id", received: "empty blob".into(), diff --git a/tests/real/enterprise.rs b/tests/real/enterprise.rs index 8a270f94..bd73c6bb 100644 --- a/tests/real/enterprise.rs +++ b/tests/real/enterprise.rs @@ -812,7 +812,7 @@ fn test_callback_will_not_be_queued_unless_batch_gets_committed() { } #[test] -fn test_callback_will_be_queue_upon_commit_even_if_batch_is_empty() { +fn test_callback_will_be_queued_upon_commit_even_if_batch_is_empty() { use std::{thread, time}; skip_if_not_enterprise!(); @@ -820,7 +820,7 @@ fn test_callback_will_be_queue_upon_commit_even_if_batch_is_empty() { let mut p = Producer::connect(Some(&url)).unwrap(); let mut t = Tracker::connect(Some(&url)).unwrap(); let jobtype = "callback_jobtype"; - let q_name = "test_callback_will_be_queue_upon_commit_even_if_batch_is_empty"; + let q_name = "test_callback_will_be_queued_upon_commit_even_if_batch_is_empty"; let mut callbacks = some_jobs(jobtype, q_name, 2); let b = p .start_batch( @@ -893,9 +893,14 @@ fn test_batch_can_be_reopened_add_extra_jobs_and_batches_added() { assert_eq!(status.pending, 2); // ############################## SUBTEST 0 ########################################## + // Let's try to open/reopen a batch we have never declared: + let b = p.open_batch(String::from("non-existent-batch-id")).unwrap(); + assert!(b.is_none()); + // ########################## END OF SUBTEST 0 ####################################### + + // ############################## SUBTEST 1 ########################################## // Let's fist of all try to open the batch we have not committed yet: - let mut b = p.open_batch(bid.clone()).unwrap(); - assert_eq!(b.id(), bid); + let mut b = p.open_batch(bid.clone()).unwrap().unwrap(); b.add(jobs.next().unwrap()).unwrap(); // 3 jobs b.commit().unwrap(); // committig the batch @@ -904,11 +909,12 @@ fn test_batch_can_be_reopened_add_extra_jobs_and_batches_added() { assert_eq!(status.total, 3); assert_eq!(status.pending, 3); - // Subtest 0 result: + // Subtest 1 result: // The Faktory server let's us open the uncommitted batch. This is something not mention // in the docs, but still worth checking. + // ########################### END OF SUBTEST 1 ###################################### - // ############################## SUBTEST 1 ########################################## + // ############################## SUBTEST 2 ########################################## // From the docs: // """Note that, once committed, only a job within the batch may reopen it. // Faktory will return an error if you dynamically add jobs from "outside" the batch; @@ -916,8 +922,10 @@ fn test_batch_can_be_reopened_add_extra_jobs_and_batches_added() { // Ref: https://github.com/contribsys/faktory/wiki/Ent-Batches#batch-open-bid (Jan 10, 2024) // Let's try to open an already committed batch: - let mut b = p.open_batch(bid.clone()).unwrap(); - assert_eq!(b.id(), bid); + let mut b = p + .open_batch(bid.clone()) + .expect("no error") + .expect("is some"); b.add(jobs.next().unwrap()).unwrap(); // 4 jobs b.commit().unwrap(); // committing the batch again! @@ -925,15 +933,14 @@ fn test_batch_can_be_reopened_add_extra_jobs_and_batches_added() { assert_eq!(s.total, 4); assert_eq!(s.pending, 4); - // Subtest 1 result: + // Subtest 2 result: // We managed to open a batch "from outside" and the server accepted the job INSTEAD OF ERRORING BACK. - // ############################ END OF SUBTEST 1 ####################################### + // ############################ END OF SUBTEST 2 ####################################### - // ############################## SUBTEST 2 ############################################ + // ############################## SUBTEST 3 ############################################ // Let's see if we will be able to - again - open the committed batch "from outside" and // add a nested batch to it. - let mut b = p.open_batch(bid.clone()).unwrap(); - assert_eq!(b.id(), bid); // this is to make sure this is the same batch INDEED + let mut b = p.open_batch(bid.clone()).unwrap().expect("is some"); let mut nested_callbacks = some_jobs( "order_clean_up__NESTED", "test_batch_can_be_reopned_add_extra_jobs_added__CALLBACKs__NESTED", @@ -955,19 +962,21 @@ fn test_batch_can_be_reopened_add_extra_jobs_and_batches_added() { assert_eq!(s.parent_bid, Some(bid)); // this is really our child batch assert_eq!(s.complete_callback_state, "1"); // has been enqueud - // Subtest 2 result: + // Subtest 3 result: // We managed to open an already committed batch "from outside" and the server accepted // a nested batch INSTEAD OF ERRORING BACK. - // ############################ END OF SUBTEST 2 ####################################### + // ############################ END OF SUBTEST 3 ####################################### - // ############################## SUBTEST 3 ############################################ + // ############################## SUBTEST 4 ############################################ // From the docs: // """Once a callback has enqueued for a batch, you may not add anything to the batch.""" // ref: https://github.com/contribsys/faktory/wiki/Ent-Batches#guarantees (Jan 10, 2024) // Let's try to re-open the nested batch that we have already committed and add some jobs to it. - let mut b = p.open_batch(nested_bid.clone()).unwrap(); - assert_eq!(b.id(), nested_bid); // this is to make sure this is the same batch INDEED + let mut b = p + .open_batch(nested_bid.clone()) + .expect("no error") + .expect("is some"); let mut more_jobs = some_jobs( "order_clean_up__NESTED", "test_batch_can_be_reopned_add_extra_jobs_added__NESTED", @@ -982,10 +991,10 @@ fn test_batch_can_be_reopened_add_extra_jobs_and_batches_added() { assert_eq!(s.pending, 2); // ... though there are pending jobs assert_eq!(s.total, 2); - // Subtest 3 result: + // Subtest 4 result: // We were able to add more jobs to the batch for which the Faktory server had already // queued the callback. - // ############################## END OF SUBTEST 3 ##################################### + // ############################## END OF SUBTEST 4 ##################################### // ############################## OVERALL RESULTS ###################################### // The guarantees that definitely hold: