Skip to content

Commit

Permalink
[dv, dvsim] Add job queue/lic wait timeout in addition to run timeout
Browse files Browse the repository at this point in the history
The timeout_mins specified in the test list json file or on dvsim command
 line is intended to be a timeout on the wall time elapsed after the job
has started running.It is not intended to count in the time spent in
waiting for a compute node and/or a license to become available.
Despite best efforts to specify license resource strings, the
availability of a compute node doesn't always guarantee the availability
of a license as this scheme only works when all jobs from all projects
running on the cluster adhere to this license requesting scheme which
sadly can't be enforced in a fool proof manner.
In the proposed scheme, the file size as a proxy to determine if the job
has really started running. While this is not one hundred percent
accurate, it prevents false timeouts on jobs/tests that haven't even had
a chance to start running because of unavailability of compute or license
 resources. The file size threshold to determine whether the job has
started running has been chosen after carefully analyzing log files of
jobs executed successfuly and jobs that were killed while waiting

Signed-off-by: Venkat Krishnan <[email protected]>
  • Loading branch information
venkatk-ot committed Oct 22, 2024
1 parent 45a92de commit 9ee2375
Showing 1 changed file with 52 additions and 7 deletions.
59 changes: 52 additions & 7 deletions util/dvsim/NcLauncher.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,12 @@ def get_submit_cmd(self):
license_args +
['--', f'{odir}/run.sh'])

def _pre_launch(self):
# store the start_time (correspoinding to job wait time counter)
super()._pre_launch()
# set the nc_job_state to the initial state - waiting for resource
self.nc_job_state = 'waiting'

def _do_launch(self):
# Compute the environment for the subprocess by overriding environment
# variables of this process with matching ones from self.deploy.exports
Expand All @@ -88,14 +94,18 @@ def _do_launch(self):

self._dump_env_vars(exports)

# For reruns, delete the log file of the past run to avoid any race
# condition between the log file getting updated for the new run
# versus the logic that distinguishes the job wait versus run times.
rm_path(self.deploy.get_log_path())
# using os.open instead of fopen as this allows
# sharing of file descriptors across processes
fd = os.open(self.deploy.get_log_path(), os.O_WRONLY | os.O_CREAT)
fobj = os.fdopen(fd, 'w', encoding='UTF-8')
os.set_inheritable(fd, True)
message = '[Executing]:\n{}\n\n'.format(self.deploy.cmd)
fobj.write(message)

fobj.flush()
if self.deploy.sim_cfg.interactive:
# Interactive: Set RUN_INTERACTIVE to 1
exports['RUN_INTERACTIVE'] = '1'
Expand Down Expand Up @@ -150,14 +160,49 @@ def poll(self):
"""

assert self.process is not None
elapsed_time = datetime.datetime.now() - self.start_time
job_runtime_secs = elapsed_time.total_seconds()
if self.process.poll() is None:
timeout_mins = self.deploy.get_timeout_mins()
if timeout_mins is not None and not self.deploy.gui:
if job_runtime_secs > (timeout_mins * 60):
# get the run_timeout value from the test hjson or dvsim switch
# define a wait_timeout to 3 hours (hardcoded in this file)
# define a file size threshold on the run log for the algo to
# predict the state of the job (running or waiting for resource)
run_timeout_mins = self.deploy.get_timeout_mins()
wait_timeout_mins = 180 # max wait time in job / license queue
file_size_thresh_bytes = 5120 # log file size threshold
if run_timeout_mins is not None and not self.deploy.gui:
# query the log file size
f_size = os.path.getsize(self.deploy.get_log_path())

if f_size >= file_size_thresh_bytes:
if (self.nc_job_state == 'waiting'):
# if the job log size is more than the threshold, start the
# the job run time counter if has not already been started.
self.start_time = datetime.datetime.now()
self.nc_job_state = 'running'

if self.nc_job_state == 'waiting':
# if the job log size is less than the threshold, increment
# the job wait time counter.
elapsed_wait_time = (datetime.datetime.now() -
self.start_time)
job_waittime_mins = elapsed_wait_time.total_seconds() / 60
if (job_waittime_mins > wait_timeout_mins):
self.nc_job_state = 'wait_timeout'

if self.nc_job_state == 'running':
elapsed_run_time = (datetime.datetime.now() -
self.start_time)
job_runtime_mins = elapsed_run_time.total_seconds() / 60
if (job_runtime_mins > run_timeout_mins):
self.nc_job_state = 'run_timeout'

if self.nc_job_state in ('wait_timeout', 'run_timeout'):
self._kill()
timeout_message = f'Job timed out after {timeout_mins} mins'
if self.nc_job_state == 'run_timeout':
timeout_message = f'Job timed out after running ' \
f'{run_timeout_mins} mins'
if self.nc_job_state == 'wait_timeout':
timeout_message = f'Job timed out after waiting ' \
f'{wait_timeout_mins} mins'
self._post_finish('K',
ErrorMessage(line_number=None,
message=timeout_message,
Expand Down

0 comments on commit 9ee2375

Please sign in to comment.