From 77985d54cd6702244582a7a79b329682f762c812 Mon Sep 17 00:00:00 2001 From: Thomas Roeblitz Date: Tue, 15 Aug 2023 16:35:50 +0200 Subject: [PATCH] eessi_bot_job_manager.py: improved and completed docstrings --- eessi_bot_job_manager.py | 133 ++++++++++++++++++++++++++++++++++----- 1 file changed, 119 insertions(+), 14 deletions(-) diff --git a/eessi_bot_job_manager.py b/eessi_bot_job_manager.py index bded09fd..7a1d86c6 100644 --- a/eessi_bot_job_manager.py +++ b/eessi_bot_job_manager.py @@ -73,14 +73,35 @@ class EESSIBotSoftwareLayerJobManager: - "main class for (Slurm) job manager of EESSI bot (separate process)" + """ + Class for representing the job manager of the build-and-deploy bot. It + monitors the job queue and processes job state changes. + """ def __init__(self): + """ + EESSIBotSoftwareLayerJobManager constructor. Just reads the + configuration to set the path to the logfile. + """ cfg = config.read_config() job_manager_cfg = cfg['job_manager'] self.logfile = job_manager_cfg.get('log_path') def get_current_jobs(self): + """ + Obtains a list of jobs currently managed by the batch system. + Retains key information about each job such as its id and its state. + + Args: + No arguments + + Returns: + (dict): maps a job id to a dictionary containing key information + about a job (currently: 'jobid', 'state' and 'reason') + + Raises: + Exception: if the environment variable USER is not set + """ # who am i username = os.getenv('USER', None) if username is None: @@ -123,13 +144,14 @@ def get_current_jobs(self): return current_jobs def determine_running_jobs(self, current_jobs): - """ determine which jobs are in running state + """ + Determine currently running jobs. Args: current_jobs (dict): dictionary containing data of current jobs Returns: - running_jobs (list): list containing ids of running jobs + (list): list of ids of currently running jobs """ running_jobs = [] for job in current_jobs.values(): @@ -139,6 +161,21 @@ def determine_running_jobs(self, current_jobs): # known_jobs = job_manager.get_known_jobs() def get_known_jobs(self): + """ + Obtain information about jobs that should be known to the job manager + (e.g., before it stopped or when it is resumed after a crash). This + method obtains the information from a local store (database or + filesystem). When comparing its results to the list of jobs currently + registered with the job management system (see method + get_current_jobs()), new jobs and finished jobs can be derived. + + Args: + No arguments + + Returns: + (dict): maps a job id to a dictionary containing key information + about a job (currently: 'jobid') + """ # find all symlinks resembling job ids (digits only) in jobdir known_jobs = {} if os.path.isdir(self.submitted_jobs_dir): @@ -174,6 +211,18 @@ def get_known_jobs(self): # new_jobs = job.manager.determine_new_jobs(known_jobs, current_jobs) def determine_new_jobs(self, known_jobs, current_jobs): + """ + Determine which jobs are new. + + Args: + known_jobs (dict): dictionary with information about jobs that are + already known/seen from before + current_jobs (dict): dictionary with information about jobs that are + currently registered with the job management system + + Returns: + (list): list of ids of new jobs + """ # known_jobs is a dictionary: jobid -> {'jobid':jobid} # current_jobs is a dictionary: jobid -> {'jobid':jobid, # 'state':val,'reason':val} @@ -187,6 +236,18 @@ def determine_new_jobs(self, known_jobs, current_jobs): # finished_jobs = job.manager.determine_finished_jobs(known_jobs, # current_jobs) def determine_finished_jobs(self, known_jobs, current_jobs): + """ + Determine which jobs have finished. + + Args: + known_jobs (dict): dictionary with information about jobs that are + already known/seen from before + current_jobs (dict): dictionary with information about jobs that are + currently registered with the job management system + + Returns: + (list): list of ids of finished jobs + """ # known_jobs is a dictionary: jobid -> {'jobid':jobid} # current_jobs is a dictionary: jobid -> {'jobid':jobid, # 'state':val,'reason':val} @@ -199,7 +260,14 @@ def determine_finished_jobs(self, known_jobs, current_jobs): def read_job_pr_metadata(self, job_metadata_path): """ - Check if metadata file exists, read it and return 'PR' section if so, return None if not. + Read job metadata file and return the contents of the 'PR' section. + + Args: + job_metadata_path (string): path to job metadata file + + Returns: + (ConfigParser): instance of ConfigParser corresponding to the 'PR' + section or None """ # just use a function provided by module tools.job_metadata metadata = read_metadata_file(job_metadata_path, self.logfile) @@ -210,7 +278,14 @@ def read_job_pr_metadata(self, job_metadata_path): def read_job_result(self, job_result_file_path): """ - Check if result file exists, read it and return 'RESULT section if so, return None if not. + Read job result file and return the contents of the 'RESULT' section. + + Args: + job_result_file_path (string): path to job result file + + Returns: + (ConfigParser): instance of ConfigParser corresponding to the + 'RESULT' section or None """ # just use a function provided by module tools.job_metadata result = read_metadata_file(job_result_file_path, self.logfile) @@ -221,11 +296,20 @@ def read_job_result(self, job_result_file_path): # job_manager.process_new_job(current_jobs[nj]) def process_new_job(self, new_job): - # create symlink in submitted_jobs_dir (destination is the working - # dir of the job derived via scontrol) - # release job - # update PR comment with new status (released) + """ + Process a new job by verifying that it is a bot job and if so + - create symlink in submitted_jobs_dir (destination is the working + dir of the job derived via scontrol) + - release the job (so it may be started by the scheduler) + - update the PR comment by adding its new status (released) + Args: + new_job (dict): dictionary storing key information about the job + + Returns: + (bool): True if method completed the tasks described, False if job + is not a bot job + """ job_id = new_job["jobid"] scontrol_cmd = "%s --oneliner show jobid %s" % ( @@ -343,13 +427,19 @@ def process_new_job(self, new_job): return True def process_running_jobs(self, running_job): - """process the jobs in running state and print comment + """ + Process a running job by verifying that it is a bot job and if so + - determines the PR comment body and id corresponding to the job, + - updates the PR comment (if found) Args: running_job (dict): dictionary containing data of the running jobs + Returns: + None (implicitly) + Raises: - Exception: raise exception if there is no metadata file + Exception: if there is no metadata file or reading it failed """ gh = github.get_instance() @@ -407,10 +497,19 @@ def process_running_jobs(self, running_job): ) def process_finished_job(self, finished_job): - """Process a finished job (move symlink, log and update PR comment). + """ + Process a finished job by + - moving the symlink to the directory storing finished jobs, + - updating the PR comment with information from '*.result' file Args: - finished_job (dict): dictionary with information about job + finished_job (dict): dictionary with information about the job + + Returns: + None (implicitly) + + Raises: + Exception: if there is no metadata file or reading it failed """ fn = sys._getframe().f_code.co_name @@ -498,7 +597,13 @@ def process_finished_job(self, finished_job): def main(): - """Main function.""" + """ + Main function which parses command line arguments, verifies if required + configuration settings are defined, creates an instance of + EESSIBotSoftwareLayerJobManager, reads the configuration to initialize + core attributes, determines known jobs and starts the main loop that + monitors jobs. + """ opts = job_manager_parse()