Skip to content

Commit

Permalink
Only hadnle cancellation future for non-forever runs
Browse files Browse the repository at this point in the history
  • Loading branch information
rustworthy committed May 24, 2024
1 parent 8340767 commit 6093201
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 6 deletions.
4 changes: 1 addition & 3 deletions src/worker/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,9 +99,7 @@ impl<E: 'static> WorkerBuilder<E> {
/// Set a graceful shutdown signal.
///
/// As soon as the provided future resolves, the graceful shutdown will step in
/// making the [`Worker::run`] operation return control to the calling code.
/// In case of the [`Worker::run_to_completion`] operation, the process will be exited
/// upon gracefull shutdown.
/// making the long-running operation (see [`Worker::run`]) return control to the calling code.
///
/// The graceful shutdown itself is a race between the clean up needed to be performed
/// (e.g. report on the currently processed to the Faktory server) and a shutdown deadline.
Expand Down
13 changes: 10 additions & 3 deletions src/worker/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -394,7 +394,14 @@ impl<
.await?;
}

let signal = self.shutdown_signal.take();
// for "forever" operations (currently only `Worker::run_to_completion`) we support SIGTERM handling,
// whereas for long-running operations (see `Worker::run`), we are polling the cancellation future which
// serves as a signal to start graceful shutdowna and return control to the calling site
let cancel_signal = if self.forever {
None

Check warning on line 401 in src/worker/mod.rs

View check run for this annotation

Codecov / codecov/patch

src/worker/mod.rs#L401

Added line #L401 was not covered by tests
} else {
self.shutdown_signal.take()
};

let report = tokio::select! {
// A signal SIGTERM from the OS received.
Expand All @@ -412,7 +419,7 @@ impl<
}
},
// A signal from the user space received.
_ = async { let signal = signal.unwrap(); signal.await }, if signal.is_some() => {
_ = async { let signal = cancel_signal.unwrap(); signal.await }, if cancel_signal.is_some() => {
let nrunning = tokio::select! {
_ = tokio_sleep(TokioDuration::from_millis(self.shutdown_timeout)) => {
0
Expand Down Expand Up @@ -457,7 +464,7 @@ impl<

/// Run this worker until the server tells us to exit or a connection cannot be re-established.
///
/// This function never returns. When the worker decides to exit or SIGTERM is received,
/// This function never returns. When the worker decides to exit or `SIGTERM` is received,
/// the process is terminated within the [shutdown period](WorkerBuilder::graceful_shutdown_period).
pub async fn run_to_completion<Q>(mut self, queues: &[Q]) -> !
where
Expand Down

0 comments on commit 6093201

Please sign in to comment.