Skip to content

Commit

Permalink
Add tracker, add support for batch jobs
Browse files Browse the repository at this point in the history
  • Loading branch information
rustworthy committed Jan 17, 2024
1 parent 12af04a commit 129fa3e
Show file tree
Hide file tree
Showing 13 changed files with 1,514 additions and 84 deletions.
18 changes: 9 additions & 9 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 6 additions & 5 deletions src/consumer/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
use crate::error::Error;
use crate::proto::{self, Client, ClientOptions, HeartbeatStatus, Reconnect};

use crate::proto::{
self, parse_provided_or_from_env, Client, ClientOptions, HeartbeatStatus, Reconnect,
};

use fnv::FnvHashMap;
use std::error::Error as StdError;
use std::io::prelude::*;
Expand Down Expand Up @@ -213,10 +217,7 @@ impl<E> ConsumerBuilder<E> {
///
/// If `url` is given, but does not specify a port, it defaults to 7419.
pub fn connect(self, url: Option<&str>) -> Result<Consumer<TcpStream, E>, Error> {
let url = match url {
Some(url) => proto::url_parse(url),
None => proto::url_parse(&proto::get_env_url()),
}?;
let url = parse_provided_or_from_env(url)?;
let stream = TcpStream::connect(proto::host_from_url(&url))?;
Self::connect_with(self, stream, url.password().map(|p| p.to_string()))
}
Expand Down
10 changes: 10 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,4 +74,14 @@ pub use crate::consumer::{Consumer, ConsumerBuilder};
pub use crate::error::Error;
pub use crate::producer::Producer;
pub use crate::proto::Reconnect;

pub use crate::proto::{Job, JobBuilder};

pub use crate::proto::{Batch, BatchBuilder, BatchStatus};

#[cfg(feature = "ent")]
mod tracker;
#[cfg(feature = "ent")]
pub use crate::proto::{Progress, ProgressUpdate, ProgressUpdateBuilder};
#[cfg(feature = "ent")]
pub use crate::tracker::Tracker;
29 changes: 24 additions & 5 deletions src/producer/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
use crate::error::Error;
use crate::proto::{self, Client, Info, Job, Push, QueueAction, QueueControl};
use crate::proto::{
self, parse_provided_or_from_env, BatchHandle, Client, CommitBatch, Info, Job, OpenBatch, Push,
QueueAction, QueueControl,
};

use crate::Batch;

use std::io::prelude::*;
use std::net::TcpStream;

Expand Down Expand Up @@ -82,10 +88,7 @@ impl Producer<TcpStream> {
///
/// If `url` is given, but does not specify a port, it defaults to 7419.
pub fn connect(url: Option<&str>) -> Result<Self, Error> {
let url = match url {
Some(url) => proto::url_parse(url),
None => proto::url_parse(&proto::get_env_url()),
}?;
let url = parse_provided_or_from_env(url)?;
let stream = TcpStream::connect(proto::host_from_url(&url))?;
Self::connect_with(stream, url.password().map(|p| p.to_string()))
}
Expand Down Expand Up @@ -129,6 +132,22 @@ impl<S: Read + Write> Producer<S> {
.issue(&QueueControl::new(QueueAction::Resume, queues))?
.await_ok()
}

/// Initiate a new batch of jobs.
pub fn start_batch(&mut self, batch: Batch) -> Result<BatchHandle<'_, S>, Error> {
let bid = self.c.issue(&batch)?.read_bid()?;
Ok(BatchHandle::new(bid, self))
}

/// Open an already existing batch of jobs.
pub fn open_batch(&mut self, bid: String) -> Result<BatchHandle<'_, S>, Error> {
let bid = self.c.issue(&OpenBatch::from(bid))?.read_bid()?;
Ok(BatchHandle::new(bid, self))
}

pub(crate) fn commit_batch(&mut self, bid: String) -> Result<(), Error> {
self.c.issue(&CommitBatch::from(bid))?.await_ok()
}
}

#[cfg(test)]
Expand Down
65 changes: 65 additions & 0 deletions src/proto/batch/cmd.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
use crate::proto::single::FaktoryCommand;
use crate::{Batch, Error};
use std::io::Write;

impl FaktoryCommand for Batch {
fn issue<W: Write>(&self, w: &mut W) -> Result<(), Error> {
w.write_all(b"BATCH NEW ")?;
serde_json::to_writer(&mut *w, self).map_err(Error::Serialization)?;
Ok(w.write_all(b"\r\n")?)
}
}

// ----------------------------------------------

pub struct CommitBatch(String);

impl From<String> for CommitBatch {
fn from(value: String) -> Self {
CommitBatch(value)
}
}

impl FaktoryCommand for CommitBatch {
fn issue<W: Write>(&self, w: &mut W) -> Result<(), Error> {
w.write_all(b"BATCH COMMIT ")?;
w.write_all(self.0.as_bytes())?;
Ok(w.write_all(b"\r\n")?)
}
}

// ----------------------------------------------

pub struct GetBatchStatus(String);

impl From<String> for GetBatchStatus {
fn from(value: String) -> Self {
GetBatchStatus(value)
}
}

impl FaktoryCommand for GetBatchStatus {
fn issue<W: Write>(&self, w: &mut W) -> Result<(), Error> {
w.write_all(b"BATCH STATUS ")?;
w.write_all(self.0.as_bytes())?;
Ok(w.write_all(b"\r\n")?)
}
}

// ----------------------------------------------

pub struct OpenBatch(String);

impl From<String> for OpenBatch {
fn from(value: String) -> Self {
OpenBatch(value)
}
}

impl FaktoryCommand for OpenBatch {
fn issue<W: Write>(&self, w: &mut W) -> Result<(), Error> {
w.write_all(b"BATCH OPEN ")?;
w.write_all(self.0.as_bytes())?;
Ok(w.write_all(b"\r\n")?)
}
}
Loading

0 comments on commit 129fa3e

Please sign in to comment.