From 79194c2a6da557c8bd35e366ee6e28643fe0a0ac Mon Sep 17 00:00:00 2001 From: Pavel Mikhalkevich Date: Fri, 20 Sep 2024 00:06:50 +0400 Subject: [PATCH] Use tokio::time::interval in heartbeat thread' --- src/proto/single/utils.rs | 4 +--- src/worker/health.rs | 5 +++-- src/worker/mod.rs | 4 ++-- 3 files changed, 6 insertions(+), 7 deletions(-) diff --git a/src/proto/single/utils.rs b/src/proto/single/utils.rs index 0fc96214..897c9036 100644 --- a/src/proto/single/utils.rs +++ b/src/proto/single/utils.rs @@ -55,9 +55,7 @@ pub(crate) fn deser_as_optional_duration<'de, D>(value: D) -> Result, { - Ok(u64::deserialize(value) - .ok() - .map(|value| (Duration::from_secs(value)))) + Ok(u64::deserialize(value).ok().map(Duration::from_secs)) } pub(crate) fn ser_server_time(value: &NaiveTime, serializer: S) -> Result diff --git a/src/worker/health.rs b/src/worker/health.rs index e7984258..6522ff20 100644 --- a/src/worker/health.rs +++ b/src/worker/health.rs @@ -5,7 +5,6 @@ use std::{ sync::{atomic, Arc}, time::{self, Duration}, }; -use tokio::time::sleep as tokio_sleep; const CHECK_STATE_INTERVAL: Duration = Duration::from_millis(100); const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5); @@ -34,9 +33,11 @@ where let mut target = STATUS_RUNNING; let mut last = time::Instant::now(); + let mut check_state_interval = tokio::time::interval(CHECK_STATE_INTERVAL); + check_state_interval.tick().await; loop { - tokio_sleep(CHECK_STATE_INTERVAL).await; + check_state_interval.tick().await; // has a worker failed? let worker_failure = target == STATUS_RUNNING diff --git a/src/worker/mod.rs b/src/worker/mod.rs index bbba6178..2b6aab89 100644 --- a/src/worker/mod.rs +++ b/src/worker/mod.rs @@ -331,7 +331,7 @@ impl Worker { /// discontinued due to a signal from the Faktory server or a graceful shutdown signal, /// calling this method will mean you are trying to run a _terminated_ worker which will /// cause a panic. You will need to build and run a new worker instead. - /// + /// /// You can check if the worker has been terminated with [`Worker::is_terminated`]. pub async fn run_one(&mut self, worker: usize, queues: &[Q]) -> Result where @@ -440,7 +440,7 @@ impl Worker { /// Note that if you provided a shutdown signal when building this worker (see [`WorkerBuilder::with_graceful_shutdown`]), /// and this signal resolved, the worker will be marked as terminated and calling this method will cause a panic. /// You will need to build and run a new worker instead. - /// + /// /// You can check if the worker has been terminated with [`Worker::is_terminated`]. pub async fn run(&mut self, queues: &[Q]) -> Result where