diff --git a/arq/connections.py b/arq/connections.py index e843ac47..c1058890 100644 --- a/arq/connections.py +++ b/arq/connections.py @@ -167,9 +167,7 @@ async def enqueue_job( else: score = enqueue_time_ms - expires_ms = expires_ms or ( - 1 if score - enqueue_time_ms < 1 else score - enqueue_time_ms + self.expires_extra_ms - ) + expires_ms = expires_ms or score - enqueue_time_ms + self.expires_extra_ms job = serialize_job(function, args, kwargs, _job_try, enqueue_time_ms, serializer=self.job_serializer) pipe.multi() diff --git a/arq/worker.py b/arq/worker.py index 4c33b677..545ccf1c 100644 --- a/arq/worker.py +++ b/arq/worker.py @@ -751,7 +751,8 @@ async def run_cron(self, n: datetime, delay: float, num_windows: int = 2) -> Non job_id = f'{cron_job.name}:{to_unix_ms(cron_job.next_run)}' if cron_job.unique else None job_futures.add( self.pool.enqueue_job( - cron_job.name, _job_id=job_id, _queue_name=self.queue_name, _defer_until=cron_job.next_run + cron_job.name, _job_id=job_id, _queue_name=self.queue_name, + _defer_until=(cron_job.next_run if cron_job.next_run > datetime.now(tz=self.timezone) else None), ) ) cron_job.calculate_next(cron_job.next_run)