Skip to content

Commit

Permalink
Do not error if batch does not exist when re-opening
Browse files Browse the repository at this point in the history
  • Loading branch information
rustworthy committed Jan 30, 2024
1 parent 143319c commit e2c0a5c
Show file tree
Hide file tree
Showing 7 changed files with 62 additions and 47 deletions.
7 changes: 2 additions & 5 deletions src/consumer/mod.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,14 @@
use crate::error::Error;

use crate::proto::{
self, parse_provided_or_from_env, Client, ClientOptions, HeartbeatStatus, Reconnect,
self, parse_provided_or_from_env, Ack, Client, ClientOptions, Fail, HeartbeatStatus, Job,
Reconnect,
};

use fnv::FnvHashMap;
use std::error::Error as StdError;
use std::io::prelude::*;
use std::net::TcpStream;
use std::sync::{atomic, Arc, Mutex};

use crate::proto::{Ack, Fail, Job};

const STATUS_RUNNING: usize = 0;
const STATUS_QUIET: usize = 1;
const STATUS_TERMINATING: usize = 2;
Expand Down
4 changes: 1 addition & 3 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,7 @@ pub use tls::TlsStream;
pub use crate::consumer::{Consumer, ConsumerBuilder};
pub use crate::error::Error;
pub use crate::producer::Producer;
pub use crate::proto::Reconnect;

pub use crate::proto::{Job, JobBuilder};
pub use crate::proto::{Job, JobBuilder, Reconnect};

#[cfg(feature = "ent")]
#[cfg_attr(docsrs, doc(cfg(feature = "ent")))]
Expand Down
18 changes: 10 additions & 8 deletions src/producer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,8 @@ use crate::error::Error;
use crate::proto::{
self, parse_provided_or_from_env, Client, Info, Job, Push, QueueAction, QueueControl,
};

#[cfg(feature = "ent")]
use crate::proto::{BatchHandle, CommitBatch, OpenBatch};
#[cfg(feature = "ent")]
use crate::Batch;

use crate::proto::{Batch, BatchHandle, CommitBatch, OpenBatch};
use std::io::prelude::*;
use std::net::TcpStream;

Expand Down Expand Up @@ -143,10 +139,16 @@ impl<S: Read + Write> Producer<S> {
}

Check warning on line 139 in src/producer/mod.rs

View check run for this annotation

Codecov / codecov/patch

src/producer/mod.rs#L136-L139

Added lines #L136 - L139 were not covered by tests

/// Open an already existing batch of jobs.
///
/// This will not error if a batch with the provided `bid` does not exist,
/// rather `Ok(None)` will be returned.
#[cfg(feature = "ent")]
pub fn open_batch(&mut self, bid: String) -> Result<BatchHandle<'_, S>, Error> {
let bid = self.c.issue(&OpenBatch::from(bid))?.read_bid()?;
Ok(BatchHandle::new(bid, self))
pub fn open_batch(&mut self, bid: String) -> Result<Option<BatchHandle<'_, S>>, Error> {
let bid = self.c.issue(&OpenBatch::from(bid))?.maybe_bid()?;
match bid {
Some(bid) => Ok(Some(BatchHandle::new(bid, self))),
None => Ok(None),

Check warning on line 150 in src/producer/mod.rs

View check run for this annotation

Codecov / codecov/patch

src/producer/mod.rs#L146-L150

Added lines #L146 - L150 were not covered by tests
}
}

Check warning on line 152 in src/producer/mod.rs

View check run for this annotation

Codecov / codecov/patch

src/producer/mod.rs#L152

Added line #L152 was not covered by tests

#[cfg(feature = "ent")]
Expand Down
8 changes: 3 additions & 5 deletions src/proto/batch/mod.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
use std::io::{Read, Write};

use crate::{Error, Job, Producer};
use chrono::{DateTime, Utc};
use derive_builder::Builder;

use crate::{Error, Job, Producer};
use std::io::{Read, Write};

mod cmd;

Expand Down Expand Up @@ -76,7 +74,7 @@ pub use cmd::{CommitBatch, GetBatchStatus, OpenBatch};
/// effectively building a pipeline this way, since the Faktory guarantees that callback jobs will not be queued unless
/// the batch gets committed.
///
/// You can retieve the batch status using a [`tracker`](struct.Tracker.html):
/// You can retieve the batch status using a [`Tracker`]:
/// ```no_run
/// # use faktory::Error;
/// # use faktory::{Producer, Job, Batch, Tracker};
Expand Down
17 changes: 17 additions & 0 deletions src/proto/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,23 @@ impl<'a, S: Read + Write> ReadToken<'a, S> {
pub(crate) fn read_bid(self) -> Result<String, Error> {
single::read_bid(&mut self.0.stream)
}

Check warning on line 308 in src/proto/mod.rs

View check run for this annotation

Codecov / codecov/patch

src/proto/mod.rs#L306-L308

Added lines #L306 - L308 were not covered by tests

#[cfg(feature = "ent")]
pub(crate) fn maybe_bid(self) -> Result<Option<String>, Error> {
let bid_read_res = single::read_bid(&mut self.0.stream);
if bid_read_res.is_ok() {
return Ok(Some(bid_read_res.unwrap()));
}
match bid_read_res.unwrap_err() {
Error::Protocol(error::Protocol::Internal { msg }) => {
if msg.starts_with("No such batch") {
return Ok(None);
}
return Err(error::Protocol::Internal { msg }.into());

Check warning on line 321 in src/proto/mod.rs

View check run for this annotation

Codecov / codecov/patch

src/proto/mod.rs#L311-L321

Added lines #L311 - L321 were not covered by tests
}
another => Err(another),

Check warning on line 323 in src/proto/mod.rs

View check run for this annotation

Codecov / codecov/patch

src/proto/mod.rs#L323

Added line #L323 was not covered by tests
}
}

Check warning on line 325 in src/proto/mod.rs

View check run for this annotation

Codecov / codecov/patch

src/proto/mod.rs#L325

Added line #L325 was not covered by tests
}

#[cfg(test)]
Expand Down
6 changes: 0 additions & 6 deletions src/proto/single/resp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,12 +62,6 @@ pub fn read_json<R: BufRead, T: serde::de::DeserializeOwned>(r: R) -> Result<Opt
#[cfg(feature = "ent")]
pub fn read_bid<R: BufRead>(r: R) -> Result<String, Error> {
match read(r)? {
RawResponse::String(ref s) if s.is_empty() => Err(error::Protocol::BadType {
expected: "non-empty string representation of batch id",
received: "empty string".into(),
}
.into()),
RawResponse::String(ref s) => Ok(s.to_string()),
RawResponse::Blob(ref b) if b.is_empty() => Err(error::Protocol::BadType {
expected: "non-empty blob representation of batch id",
received: "empty blob".into(),
Expand Down
49 changes: 29 additions & 20 deletions tests/real/enterprise.rs
Original file line number Diff line number Diff line change
Expand Up @@ -812,15 +812,15 @@ fn test_callback_will_not_be_queued_unless_batch_gets_committed() {
}

#[test]
fn test_callback_will_be_queue_upon_commit_even_if_batch_is_empty() {
fn test_callback_will_be_queued_upon_commit_even_if_batch_is_empty() {
use std::{thread, time};

skip_if_not_enterprise!();
let url = learn_faktory_url();
let mut p = Producer::connect(Some(&url)).unwrap();
let mut t = Tracker::connect(Some(&url)).unwrap();
let jobtype = "callback_jobtype";
let q_name = "test_callback_will_be_queue_upon_commit_even_if_batch_is_empty";
let q_name = "test_callback_will_be_queued_upon_commit_even_if_batch_is_empty";
let mut callbacks = some_jobs(jobtype, q_name, 2);
let b = p
.start_batch(
Expand Down Expand Up @@ -893,9 +893,14 @@ fn test_batch_can_be_reopened_add_extra_jobs_and_batches_added() {
assert_eq!(status.pending, 2);

// ############################## SUBTEST 0 ##########################################
// Let's try to open/reopen a batch we have never declared:
let b = p.open_batch(String::from("non-existent-batch-id")).unwrap();
assert!(b.is_none());
// ########################## END OF SUBTEST 0 #######################################

// ############################## SUBTEST 1 ##########################################
// Let's fist of all try to open the batch we have not committed yet:
let mut b = p.open_batch(bid.clone()).unwrap();
assert_eq!(b.id(), bid);
let mut b = p.open_batch(bid.clone()).unwrap().unwrap();
b.add(jobs.next().unwrap()).unwrap(); // 3 jobs

b.commit().unwrap(); // committig the batch
Expand All @@ -904,36 +909,38 @@ fn test_batch_can_be_reopened_add_extra_jobs_and_batches_added() {
assert_eq!(status.total, 3);
assert_eq!(status.pending, 3);

// Subtest 0 result:
// Subtest 1 result:
// The Faktory server let's us open the uncommitted batch. This is something not mention
// in the docs, but still worth checking.
// ########################### END OF SUBTEST 1 ######################################

// ############################## SUBTEST 1 ##########################################
// ############################## SUBTEST 2 ##########################################
// From the docs:
// """Note that, once committed, only a job within the batch may reopen it.
// Faktory will return an error if you dynamically add jobs from "outside" the batch;
// this is to prevent a race condition between callbacks firing and an outsider adding more jobs."""
// Ref: https://github.com/contribsys/faktory/wiki/Ent-Batches#batch-open-bid (Jan 10, 2024)

// Let's try to open an already committed batch:
let mut b = p.open_batch(bid.clone()).unwrap();
assert_eq!(b.id(), bid);
let mut b = p
.open_batch(bid.clone())
.expect("no error")
.expect("is some");
b.add(jobs.next().unwrap()).unwrap(); // 4 jobs
b.commit().unwrap(); // committing the batch again!

let s = t.get_batch_status(bid.clone()).unwrap().unwrap();
assert_eq!(s.total, 4);
assert_eq!(s.pending, 4);

// Subtest 1 result:
// Subtest 2 result:
// We managed to open a batch "from outside" and the server accepted the job INSTEAD OF ERRORING BACK.
// ############################ END OF SUBTEST 1 #######################################
// ############################ END OF SUBTEST 2 #######################################

// ############################## SUBTEST 2 ############################################
// ############################## SUBTEST 3 ############################################
// Let's see if we will be able to - again - open the committed batch "from outside" and
// add a nested batch to it.
let mut b = p.open_batch(bid.clone()).unwrap();
assert_eq!(b.id(), bid); // this is to make sure this is the same batch INDEED
let mut b = p.open_batch(bid.clone()).unwrap().expect("is some");
let mut nested_callbacks = some_jobs(
"order_clean_up__NESTED",
"test_batch_can_be_reopned_add_extra_jobs_added__CALLBACKs__NESTED",
Expand All @@ -955,19 +962,21 @@ fn test_batch_can_be_reopened_add_extra_jobs_and_batches_added() {
assert_eq!(s.parent_bid, Some(bid)); // this is really our child batch
assert_eq!(s.complete_callback_state, "1"); // has been enqueud

// Subtest 2 result:
// Subtest 3 result:
// We managed to open an already committed batch "from outside" and the server accepted
// a nested batch INSTEAD OF ERRORING BACK.
// ############################ END OF SUBTEST 2 #######################################
// ############################ END OF SUBTEST 3 #######################################

// ############################## SUBTEST 3 ############################################
// ############################## SUBTEST 4 ############################################
// From the docs:
// """Once a callback has enqueued for a batch, you may not add anything to the batch."""
// ref: https://github.com/contribsys/faktory/wiki/Ent-Batches#guarantees (Jan 10, 2024)

// Let's try to re-open the nested batch that we have already committed and add some jobs to it.
let mut b = p.open_batch(nested_bid.clone()).unwrap();
assert_eq!(b.id(), nested_bid); // this is to make sure this is the same batch INDEED
let mut b = p
.open_batch(nested_bid.clone())
.expect("no error")
.expect("is some");
let mut more_jobs = some_jobs(
"order_clean_up__NESTED",
"test_batch_can_be_reopned_add_extra_jobs_added__NESTED",
Expand All @@ -982,10 +991,10 @@ fn test_batch_can_be_reopened_add_extra_jobs_and_batches_added() {
assert_eq!(s.pending, 2); // ... though there are pending jobs
assert_eq!(s.total, 2);

// Subtest 3 result:
// Subtest 4 result:
// We were able to add more jobs to the batch for which the Faktory server had already
// queued the callback.
// ############################## END OF SUBTEST 3 #####################################
// ############################## END OF SUBTEST 4 #####################################

// ############################## OVERALL RESULTS ######################################
// The guarantees that definitely hold:
Expand Down

0 comments on commit e2c0a5c

Please sign in to comment.