[dv, dvsim] Add job queue/lic wait timeout in addition to run timeout
The timeout_mins specified in the test list json file or on the dvsim
command line is intended to be a timeout on the wall time elapsed after
the job has started running. It is not intended to count the time spent
waiting for a compute node and/or a license to become available.
Despite best efforts to specify license resource strings, the
availability of a compute node does not always guarantee the
availability of a license, because this scheme only works when all jobs
from all projects running on the cluster adhere to the same
license-requesting scheme, which sadly cannot be enforced in a foolproof
manner.
In the proposed scheme, the log file size is used as a proxy to
determine whether the job has really started running. While this is not
one hundred percent accurate, it prevents false timeouts on jobs/tests
that have not even had a chance to start running because of
unavailability of compute or license resources. The file size threshold
used to decide whether the job has started running was chosen after
carefully analyzing log files of jobs that executed successfully and
jobs that were killed while waiting.

Signed-off-by: Venkat Krishnan <[email protected]>
venkatk-ot committed Oct 18, 2024
1 parent 45a92de commit b589300
Showing 1 changed file with 51 additions and 7 deletions.
util/dvsim/NcLauncher.py: 51 additions & 7 deletions
@@ -71,6 +71,12 @@ def get_submit_cmd(self):
license_args +
['--', f'{odir}/run.sh'])

def _pre_launch(self):
# set start_time (corresponding to the job wait time counter)
super()._pre_launch()
# hold the start_run_time at 0 (corresponding to job run time counter)
self.start_run_time = 0

def _do_launch(self):
# Compute the environment for the subprocess by overriding environment
# variables of this process with matching ones from self.deploy.exports
@@ -88,14 +94,15 @@ def _do_launch(self):

self._dump_env_vars(exports)

rm_path(self.deploy.get_log_path())
# using os.open instead of the built-in open() as this allows
# sharing of file descriptors across processes
fd = os.open(self.deploy.get_log_path(), os.O_WRONLY | os.O_CREAT)
fobj = os.fdopen(fd, 'w', encoding='UTF-8')
os.set_inheritable(fd, True)
message = '[Executing]:\n{}\n\n'.format(self.deploy.cmd)
fobj.write(message)

fobj.flush()
if self.deploy.sim_cfg.interactive:
# Interactive: Set RUN_INTERACTIVE to 1
exports['RUN_INTERACTIVE'] = '1'
@@ -150,14 +157,51 @@ def poll(self):
"""

assert self.process is not None
elapsed_time = datetime.datetime.now() - self.start_time
job_runtime_secs = elapsed_time.total_seconds()
if self.process.poll() is None:
timeout_mins = self.deploy.get_timeout_mins()
if timeout_mins is not None and not self.deploy.gui:
if job_runtime_secs > (timeout_mins * 60):
run_timeout_mins = self.deploy.get_timeout_mins()
wait_timeout_mins = 180 # max wait time in job queue / license
file_size_thresh_bytes = 5000 # log file size threshold
if run_timeout_mins is not None and not self.deploy.gui:
job_waittime = 0
job_runtime = 0
# query the log file size
f_size = os.path.getsize(self.deploy.get_log_path())
if f_size < file_size_thresh_bytes:
# if the job log size is less than the threshold, increment
# the job wait time counter. Hold the run time counter at 0
self.start_run_time = 0
elapsed_wait_time = (datetime.datetime.now() -
self.start_time)
job_waittime = elapsed_wait_time.total_seconds() / 60
else:
# if the job log size is more than the threshold, start the
# job run time counter if it has not already started.
# Hold the wait time counter at the last updated value
if (self.start_run_time == 0):
self.start_run_time = datetime.datetime.now()
elapsed_wait_time = (self.start_run_time -
self.start_time)
job_waittime = elapsed_wait_time.total_seconds() / 60
# if the job run time counter has already started, increment
# the job run time counter
if (self.start_run_time != 0):
elapsed_run_time = (datetime.datetime.now() -
self.start_run_time)
job_runtime = elapsed_run_time.total_seconds() / 60
# if the job has not started running within the max wait time
# specified in this file, or if it has been running for longer than
# the time specified in the block json or via dvsim switches,
# terminate it and send the appropriate error message
wait_timeout = (job_waittime > wait_timeout_mins)
run_timeout = (job_runtime > run_timeout_mins)
if run_timeout or wait_timeout:
self._kill()
timeout_message = f'Job timed out after {timeout_mins} mins'
if run_timeout:
timeout_message = f'Job timed out after running ' \
f'{run_timeout_mins} mins'
if wait_timeout:
timeout_message = f'Job timed out after waiting ' \
f'{wait_timeout_mins} mins'
self._post_finish('K',
ErrorMessage(line_number=None,
message=timeout_message,
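For reference, below is a minimal standalone sketch of the two-counter scheme described in the commit message: the log file size serves as the proxy for "the job has started running", the wait counter runs until the size threshold is crossed, and the run counter only starts afterwards. The function name check_timeouts, the constants, and the return convention are illustrative assumptions, not part of NcLauncher.py or the dvsim API.

import datetime
import os

WAIT_TIMEOUT_MINS = 180        # max time allowed waiting in the job queue / for a license
RUN_TIMEOUT_MINS = 60          # max time allowed once the job is actually running
FILE_SIZE_THRESH_BYTES = 5000  # log smaller than this => job presumed still waiting


def check_timeouts(log_path, start_time, start_run_time):
    """Return (timed_out, reason, start_run_time).

    start_time is when the job was submitted. start_run_time stays None
    until the log file grows past the threshold, which is taken as the
    moment the job really started running.
    """
    now = datetime.datetime.now()
    log_size = os.path.getsize(log_path) if os.path.exists(log_path) else 0

    if log_size < FILE_SIZE_THRESH_BYTES:
        # Still waiting for a compute node / license: only the wait counter runs.
        wait_mins = (now - start_time).total_seconds() / 60
        if wait_mins > WAIT_TIMEOUT_MINS:
            return True, f'timed out after waiting {WAIT_TIMEOUT_MINS} mins', None
        return False, '', None

    # The log has grown past the threshold: latch the run start time once
    # and measure the run time from there.
    if start_run_time is None:
        start_run_time = now
    run_mins = (now - start_run_time).total_seconds() / 60
    if run_mins > RUN_TIMEOUT_MINS:
        return True, f'timed out after running {RUN_TIMEOUT_MINS} mins', start_run_time
    return False, '', start_run_time

A polling loop would call check_timeouts() on each iteration, threading start_run_time through successive calls, and kill the job once timed_out comes back True, analogous to what poll() above does with self.start_run_time.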
