From 04a25f432738c10c9aa1cddcc78e135ef8df63d3 Mon Sep 17 00:00:00 2001
From: thiagogds
Date: Thu, 17 Oct 2024 19:30:02 +0200
Subject: [PATCH] Fix RQ wrongly moving jobs to FailedJobRegistry (#7186)

Something changed in python-rq, and with the old code any job that ran
for longer than 2 minutes was automatically marked as failed even
though it kept running. This caused a problem in the UI: the job looked
as if it had stopped, but it actually had not.
---
 redash/tasks/worker.py | 55 +++++++++++++++++++++++++++---------------
 1 file changed, 36 insertions(+), 19 deletions(-)

diff --git a/redash/tasks/worker.py b/redash/tasks/worker.py
index 1983168d3d..004a30be8b 100644
--- a/redash/tasks/worker.py
+++ b/redash/tasks/worker.py
@@ -6,7 +6,7 @@
 from rq import Queue as BaseQueue
 from rq.job import Job as BaseJob
 from rq.job import JobStatus
-from rq.timeouts import HorseMonitorTimeoutException, UnixSignalDeathPenalty
+from rq.timeouts import HorseMonitorTimeoutException
 from rq.utils import utcnow
 from rq.worker import (
     HerokuWorker,  # HerokuWorker implements graceful shutdown on SIGTERM
@@ -113,30 +113,44 @@ def enforce_hard_limit(self, job):
         )
         self.kill_horse()
 
-    def monitor_work_horse(self, job, queue):
+    def monitor_work_horse(self, job: "Job", queue: "Queue"):
         """The worker will monitor the work horse and make sure that it
         either executes successfully or the status of the job is set to
         failed
+
+        Args:
+            job (Job): _description_
+            queue (Queue): _description_
         """
         self.monitor_started = utcnow()
+        retpid = ret_val = rusage = None
         job.started_at = utcnow()
         while True:
             try:
-                with UnixSignalDeathPenalty(self.job_monitoring_interval, HorseMonitorTimeoutException):
-                    retpid, ret_val = os.waitpid(self._horse_pid, 0)
+                with self.death_penalty_class(self.job_monitoring_interval, HorseMonitorTimeoutException):
+                    retpid, ret_val, rusage = self.wait_for_horse()
                 break
             except HorseMonitorTimeoutException:
                 # Horse has not exited yet and is still running.
                 # Send a heartbeat to keep the worker alive.
-                self.heartbeat(self.job_monitoring_interval + 5)
+                self.set_current_job_working_time((utcnow() - job.started_at).total_seconds())
 
                 job.refresh()
 
+                # Kill the job from this side if something is really wrong (interpreter lock/etc).
+                if job.timeout != -1 and self.current_job_working_time > (job.timeout + 60):  # type: ignore
+                    self.heartbeat(self.job_monitoring_interval + 60)
+                    self.kill_horse()
+                    self.wait_for_horse()
+                    break
+
+                self.maintain_heartbeats(job)
                 if job.is_cancelled:
                     self.stop_executing_job(job)
 
                 if self.soft_limit_exceeded(job):
                     self.enforce_hard_limit(job)
+
             except OSError as e:
                 # In case we encountered an OSError due to EINTR (which is
                 # caused by a SIGINT or SIGTERM signal during
@@ -149,29 +163,32 @@ def monitor_work_horse(self, job, queue):
                 # Send a heartbeat to keep the worker alive.
                 self.heartbeat()
 
+        self.set_current_job_working_time(0)
+        self._horse_pid = 0  # Set horse PID to 0, horse has finished working
         if ret_val == os.EX_OK:  # The process exited normally.
             return
+
         job_status = job.get_status()
+
         if job_status is None:  # Job completed and its ttl has expired
             return
-        if job_status not in [JobStatus.FINISHED, JobStatus.FAILED]:
+        elif self._stopped_job_id == job.id:
+            # Work-horse killed deliberately
+            self.log.warning("Job stopped by user, moving job to FailedJobRegistry")
+            if job.stopped_callback:
+                job.execute_stopped_callback(self.death_penalty_class)
+            self.handle_job_failure(job, queue=queue, exc_string="Job stopped by user, work-horse terminated.")
+        elif job_status not in [JobStatus.FINISHED, JobStatus.FAILED]:
             if not job.ended_at:
                 job.ended_at = utcnow()
 
             # Unhandled failure: move the job to the failed queue
-            self.log.warning(
-                (
-                    "Moving job to FailedJobRegistry "
-                    "(work-horse terminated unexpectedly; waitpid returned {})"  # fmt: skip
-                ).format(ret_val)
-            )
-
-            self.handle_job_failure(
-                job,
-                queue=queue,
-                exc_string="Work-horse process was terminated unexpectedly "
-                "(waitpid returned %s)" % ret_val,  # fmt: skip
-            )
+            signal_msg = f" (signal {os.WTERMSIG(ret_val)})" if ret_val and os.WIFSIGNALED(ret_val) else ""
+            exc_string = f"Work-horse terminated unexpectedly; waitpid returned {ret_val}{signal_msg}; "
+            self.log.warning("Moving job to FailedJobRegistry (%s)", exc_string)
+
+            self.handle_work_horse_killed(job, retpid, ret_val, rusage)
+            self.handle_job_failure(job, queue=queue, exc_string=exc_string)
 
 
 class RedashWorker(StatsdRecordingWorker, HardLimitingWorker):
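
Note for reviewers, outside the patch itself: below is a minimal standalone sketch of the kill condition the new monitoring loop applies, namely that the work horse is only killed from the worker side once a job has run past its own timeout plus a 60-second grace period (a timeout of -1 meaning "no timeout"), rather than being failed after a fixed ~2-minute window as described in the commit message. The helper name should_kill_horse and the checks under __main__ are hypothetical, introduced only for this illustration; they are not Redash or RQ APIs.

# Illustrative sketch of the condition
# `job.timeout != -1 and self.current_job_working_time > (job.timeout + 60)`
# added to monitor_work_horse above. Hypothetical helper, not Redash/RQ code.
def should_kill_horse(job_timeout: int, working_time: float, grace: int = 60) -> bool:
    """Return True only once the job has outlived its own timeout plus a grace period.

    A job_timeout of -1 mirrors the sentinel used in the diff above and is
    treated as "no timeout", so the monitor never kills the horse for it.
    """
    if job_timeout == -1:
        return False
    return working_time > job_timeout + grace


if __name__ == "__main__":
    assert not should_kill_horse(300, 180)     # still within its 300s timeout: left alone
    assert should_kill_horse(300, 365)         # past timeout + 60s grace: horse gets killed
    assert not should_kill_horse(-1, 100_000)  # no timeout: never killed by the monitor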