Skip to content

Commit

Permalink
Make WorkerBuilder return Self
Browse files Browse the repository at this point in the history
  • Loading branch information
rustworthy committed Apr 28, 2024
1 parent fd231d1 commit 15d1711
Show file tree
Hide file tree
Showing 10 changed files with 335 additions and 277 deletions.
27 changes: 14 additions & 13 deletions src/bin/loadtest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,19 +58,20 @@ async fn main() {
set.spawn(async move {
// make producer and consumer
let mut p = Client::connect(None).await.unwrap();
let mut c = WorkerBuilder::default();
c.register_fn("SomeJob", |_| {
Box::pin(async move {
let mut rng = rand::thread_rng();
if rng.gen_bool(0.01) {
Err(io::Error::new(io::ErrorKind::Other, "worker closed"))
} else {
Ok(())
}
let mut worker = WorkerBuilder::default()
.register_fn("SomeJob", |_| {
Box::pin(async move {
let mut rng = rand::thread_rng();
if rng.gen_bool(0.01) {
Err(io::Error::new(io::ErrorKind::Other, "worker closed"))
} else {
Ok(())

Check warning on line 68 in src/bin/loadtest.rs

View check run for this annotation

Codecov / codecov/patch

src/bin/loadtest.rs#L54-L68

Added lines #L54 - L68 were not covered by tests
}
})
})
});
let mut c = c.connect(None).await.unwrap();

.connect(None)
.await
.unwrap();
let mut rng = rand::rngs::OsRng;
let mut random_queues = Vec::from(QUEUES);
random_queues.shuffle(&mut rng);
Expand All @@ -89,7 +90,7 @@ async fn main() {
}
} else {
// pop
c.run_one(0, &random_queues[..]).await?;
worker.run_one(0, &random_queues[..]).await?;
if popped.fetch_add(1, atomic::Ordering::SeqCst) >= jobs {
return Ok(idx);

Check warning on line 95 in src/bin/loadtest.rs

View check run for this annotation

Codecov / codecov/patch

src/bin/loadtest.rs#L70-L95

Added lines #L70 - L95 were not covered by tests
}
Expand Down
13 changes: 7 additions & 6 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,13 @@
//! # tokio_test::block_on(async {
//! use faktory::WorkerBuilder;
//! use std::io;
//! let mut w = WorkerBuilder::default();
//! w.register_fn("foobar", |job| async move {
//! println!("{:?}", job);
//! Ok::<(), io::Error>(())
//! });
//! let mut w = w.connect(None).await.unwrap();
//! let mut w = WorkerBuilder::default()
//! .register_fn("foobar", |job| async move {
//! println!("{:?}", job);
//! Ok::<(), io::Error>(())
//! })
//! .connect(None).await.unwrap();
//!
//! if let Err(e) = w.run(&["default"]).await {
//! println!("worker failed: {}", e);
//! }
Expand Down
17 changes: 8 additions & 9 deletions src/worker/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,15 +40,15 @@ impl<E: 'static> WorkerBuilder<E> {
/// Set the hostname to use for this worker.
///
/// Defaults to the machine's hostname as reported by the operating system.
pub fn hostname(&mut self, hn: String) -> &mut Self {
pub fn hostname(mut self, hn: String) -> Self {
self.opts.hostname = Some(hn);
self
}

/// Set a unique identifier for this worker.
///
/// Defaults to a randomly generated 32-char ASCII string.
pub fn wid(&mut self, wid: WorkerId) -> &mut Self {
pub fn wid(mut self, wid: WorkerId) -> Self {
self.opts.wid = Some(wid);
self
}
Expand All @@ -60,7 +60,7 @@ impl<E: 'static> WorkerBuilder<E> {
/// Note that calling this overrides the labels set previously.
///
/// If you need to extend the labels already set, use [`WorkerBuilder::add_to_labels`] instead.
pub fn labels<I>(&mut self, labels: I) -> &mut Self
pub fn labels<I>(mut self, labels: I) -> Self
where
I: IntoIterator<Item = String>,
{
Expand All @@ -74,7 +74,7 @@ impl<E: 'static> WorkerBuilder<E> {
/// if no labels have been explicitly set before - to the default `"rust"` label.
///
/// If you need to override the labels set previously, use [`WorkerBuilder::labels`] instead.
pub fn add_to_labels<I>(&mut self, labels: I) -> &mut Self
pub fn add_to_labels<I>(mut self, labels: I) -> Self
where
I: IntoIterator<Item = String>,
{
Expand All @@ -85,7 +85,7 @@ impl<E: 'static> WorkerBuilder<E> {
/// Set the number of workers to use `run` and `run_to_completion`.
///
/// Defaults to 1.
pub fn workers(&mut self, w: usize) -> &mut Self {
pub fn workers(mut self, w: usize) -> Self {
self.workers_count = w;
self
}
Expand All @@ -97,14 +97,13 @@ impl<E: 'static> WorkerBuilder<E> {
///
/// Note that only one single handler per job kind is supported. Registering another handler
/// for the same job kind will silently override the handler registered previously.
pub fn register_fn<K, H, Fut>(&mut self, kind: K, handler: H) -> &mut Self
pub fn register_fn<K, H, Fut>(self, kind: K, handler: H) -> Self
where
K: Into<String>,
H: Fn(Job) -> Fut + Send + Sync + 'static,
Fut: Future<Output = Result<(), E>> + Send,
{
self.register(kind, Closure(handler));
self
self.register(kind, Closure(handler))
}

/// Register a handler for the given job type (`kind`).
Expand All @@ -114,7 +113,7 @@ impl<E: 'static> WorkerBuilder<E> {
///
/// Note that only one single handler per job kind is supported. Registering another handler
/// for the same job kind will silently override the handler registered previously.
pub fn register<K, H>(&mut self, kind: K, runner: H) -> &mut Self
pub fn register<K, H>(mut self, kind: K, runner: H) -> Self
where
K: Into<String>,
H: JobRunner<Error = E> + 'static,
Expand Down
22 changes: 14 additions & 8 deletions src/worker/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,11 +110,12 @@ type CallbacksRegistry<E> = FnvHashMap<String, runner::BoxedJobRunner<E>>;
/// Ok(())
/// }
///
/// let mut w = WorkerBuilder::default();
/// let mut w = WorkerBuilder::default()
/// .register_fn("foo", process_job)
/// .connect(None)
/// .await
/// .unwrap();
///
/// w.register_fn("foo", process_job);
///
/// let mut w = w.connect(None).await.unwrap();
/// if let Err(e) = w.run(&["default"]).await {
/// println!("worker failed: {}", e);
/// }
Expand All @@ -124,12 +125,17 @@ type CallbacksRegistry<E> = FnvHashMap<String, runner::BoxedJobRunner<E>>;
/// Handler can be inlined.
///
/// ```no_run
/// # tokio_test::block_on(async {
/// # use faktory::WorkerBuilder;
/// # use std::io;
/// let mut w = WorkerBuilder::default();
/// w.register_fn("bar", |job| async move {
/// println!("{:?}", job);
/// Ok::<(), io::Error>(())
/// let _w = WorkerBuilder::default()
/// .register_fn("bar", |job| async move {
/// println!("{:?}", job);
/// Ok::<(), io::Error>(())
/// })
/// .connect(None)
/// .await
/// .unwrap();
/// });
/// ```
///
Expand Down
12 changes: 8 additions & 4 deletions src/worker/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,17 @@ use std::future::Future;
/// Ok(())
/// }
/// }
///
/// let mut w = WorkerBuilder::default();
///
/// let handler = MyHandler {
/// config: "bar".to_string(),
/// };
/// w.register("foo", handler);
/// let mut w = w.connect(None).await.unwrap();
///
/// let mut w = WorkerBuilder::default()
/// .register("foo", handler)
/// .connect(None)
/// .await
/// .unwrap();
///
/// if let Err(e) = w.run(&["default"]).await {
/// println!("worker failed: {}", e);
/// }
Expand Down
Loading

0 comments on commit 15d1711

Please sign in to comment.