From b58930058d15cf6b8bc62b4a8a51ab51ba7a5df6 Mon Sep 17 00:00:00 2001
From: Venkat Krishnan
Date: Fri, 18 Oct 2024 11:55:05 -0700
Subject: [PATCH] [dv, dvsim] Add job queue/lic wait timeout in addition to
 run timeout

The timeout_mins specified in the test list json file or on the dvsim
command line is intended to be a timeout on the wall time elapsed after
the job has started running. It is not intended to include the time
spent waiting for a compute node and/or a license to become available.

Despite best efforts to specify license resource strings, the
availability of a compute node doesn't always guarantee the
availability of a license: resource strings only work when every job
from every project running on the cluster requests licenses the same
way, which sadly can't be enforced in a foolproof manner.

In the proposed scheme, the log file size is used as a proxy to
determine whether the job has really started running. While this is not
one hundred percent accurate, it prevents false timeouts on jobs/tests
that never had a chance to start running because compute or license
resources were unavailable.

The file size threshold used to decide whether the job has started
running was chosen after carefully analyzing the log files of jobs that
executed successfully and of jobs that were killed while waiting.

Signed-off-by: Venkat Krishnan
---
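Note for reviewers: below is a minimal, standalone sketch of the
two-clock scheme this patch implements, for readers who want to see it
outside of dvsim. The helper names (job_has_started, check_timeouts),
the time.monotonic() timestamps and the None sentinel are illustrative
assumptions, not the NcLauncher API; the patch itself compares datetime
values and holds self.start_run_time at 0 until the log crosses the
threshold. The 180-minute wait cap and 5000-byte log threshold mirror
the values used in the patch.

    import os
    import time

    WAIT_TIMEOUT_MINS = 180       # same cap as wait_timeout_mins below
    LOG_SIZE_THRESH_BYTES = 5000  # same as file_size_thresh_bytes below

    def job_has_started(log_path, thresh=LOG_SIZE_THRESH_BYTES):
        # A log larger than the threshold is taken as evidence that the
        # job is really running; the short banner written at submission
        # time stays below it.
        try:
            return os.path.getsize(log_path) >= thresh
        except OSError:
            return False  # log not created yet: still queued

    def check_timeouts(log_path, submit_time, start_run_time,
                       run_timeout_mins):
        # Returns (start_run_time, verdict). verdict is None while the
        # job is healthy, 'wait' if it never started running in time,
        # and 'run' if it has been running for too long.
        now = time.monotonic()
        if not job_has_started(log_path):
            # Still waiting on a compute node or a license: only the
            # wait clock advances, the run clock stays unset.
            if (now - submit_time) / 60 > WAIT_TIMEOUT_MINS:
                return start_run_time, 'wait'
            return start_run_time, None
        if start_run_time is None:
            start_run_time = now  # first poll after the job started
        if (now - start_run_time) / 60 > run_timeout_mins:
            return start_run_time, 'run'
        return start_run_time, None

The caller persists start_run_time across polls, exactly as the patch
persists self.start_run_time on the launcher object.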
 util/dvsim/NcLauncher.py | 58 +++++++++++++++++++++++++++++++++++-----
 1 file changed, 51 insertions(+), 7 deletions(-)

diff --git a/util/dvsim/NcLauncher.py b/util/dvsim/NcLauncher.py
index 18ab66db605f43..77d22a75093585 100755
--- a/util/dvsim/NcLauncher.py
+++ b/util/dvsim/NcLauncher.py
@@ -71,6 +71,12 @@ def get_submit_cmd(self):
                 license_args +
                 ['--', f'{odir}/run.sh'])
 
+    def _pre_launch(self):
+        # let the base class start start_time (the job wait time counter)
+        super()._pre_launch()
+        # hold start_run_time at 0 (the job run time counter)
+        self.start_run_time = 0
+
     def _do_launch(self):
         # Compute the environment for the subprocess by overriding environment
         # variables of this process with matching ones from self.deploy.exports
@@ -88,6 +94,7 @@ def _do_launch(self):
 
         self._dump_env_vars(exports)
 
+        rm_path(self.deploy.get_log_path())
         # using os.open instead of fopen as this allows
         # sharing of file descriptors across processes
         fd = os.open(self.deploy.get_log_path(), os.O_WRONLY | os.O_CREAT)
@@ -95,7 +102,7 @@
         os.set_inheritable(fd, True)
         message = '[Executing]:\n{}\n\n'.format(self.deploy.cmd)
         fobj.write(message)
-
+        fobj.flush()
         if self.deploy.sim_cfg.interactive:
             # Interactive: Set RUN_INTERACTIVE to 1
             exports['RUN_INTERACTIVE'] = '1'
@@ -150,14 +157,51 @@ def poll(self):
         """
         assert self.process is not None
 
-        elapsed_time = datetime.datetime.now() - self.start_time
-        job_runtime_secs = elapsed_time.total_seconds()
         if self.process.poll() is None:
-            timeout_mins = self.deploy.get_timeout_mins()
-            if timeout_mins is not None and not self.deploy.gui:
-                if job_runtime_secs > (timeout_mins * 60):
+            run_timeout_mins = self.deploy.get_timeout_mins()
+            wait_timeout_mins = 180  # max wait time in job queue / license
+            file_size_thresh_bytes = 5000  # log file size threshold
+            if run_timeout_mins is not None and not self.deploy.gui:
+                job_waittime = 0
+                job_runtime = 0
+                # query the log file size
+                f_size = os.path.getsize(self.deploy.get_log_path())
+                if f_size < file_size_thresh_bytes:
+                    # if the job log size is less than the threshold, increment
+                    # the job wait time counter. Hold the run time counter at 0
+                    self.start_run_time = 0
+                    elapsed_wait_time = (datetime.datetime.now() -
+                                         self.start_time)
+                    job_waittime = elapsed_wait_time.total_seconds() / 60
+                else:
+                    # if the job log size is more than the threshold, start
+                    # the job run time counter if it has not already started.
+                    # Hold the wait time counter at the last updated value
+                    if self.start_run_time == 0:
+                        self.start_run_time = datetime.datetime.now()
+                        elapsed_wait_time = (self.start_run_time -
+                                             self.start_time)
+                        job_waittime = elapsed_wait_time.total_seconds() / 60
+                    # if the job run time counter has already started,
+                    # increment the job run time counter
+                    if self.start_run_time != 0:
+                        elapsed_run_time = (datetime.datetime.now() -
+                                            self.start_run_time)
+                        job_runtime = elapsed_run_time.total_seconds() / 60
+                # if the job has not started running within the max wait time
+                # set in this file, or if it has been running for longer than
+                # the time set in the block json or via dvsim switches,
+                # terminate it and report the appropriate error message
+                wait_timeout = (job_waittime > wait_timeout_mins)
+                run_timeout = (job_runtime > run_timeout_mins)
+                if run_timeout or wait_timeout:
                     self._kill()
-                    timeout_message = f'Job timed out after {timeout_mins} mins'
+                    if run_timeout:
+                        timeout_message = f'Job timed out after running ' \
+                                          f'{run_timeout_mins} mins'
+                    if wait_timeout:
+                        timeout_message = f'Job timed out after waiting ' \
+                                          f'{wait_timeout_mins} mins'
                     self._post_finish('K',
                                       ErrorMessage(line_number=None,
                                                    message=timeout_message,