Skip to content

Commit

Permalink
eessi_bot_job_manager.py: improved and completed docstrings
Browse files Browse the repository at this point in the history
  • Loading branch information
truib committed Aug 15, 2023
1 parent 079c71d commit 77985d5
Showing 1 changed file with 119 additions and 14 deletions.
133 changes: 119 additions & 14 deletions eessi_bot_job_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,14 +73,35 @@


class EESSIBotSoftwareLayerJobManager:
"main class for (Slurm) job manager of EESSI bot (separate process)"
"""
Class for representing the job manager of the build-and-deploy bot. It
monitors the job queue and processes job state changes.
"""

def __init__(self):
"""
EESSIBotSoftwareLayerJobManager constructor. Just reads the
configuration to set the path to the logfile.
"""
cfg = config.read_config()
job_manager_cfg = cfg['job_manager']
self.logfile = job_manager_cfg.get('log_path')

def get_current_jobs(self):
"""
Obtains a list of jobs currently managed by the batch system.
Retains key information about each job such as its id and its state.
Args:
No arguments
Returns:
(dict): maps a job id to a dictionary containing key information
about a job (currently: 'jobid', 'state' and 'reason')
Raises:
Exception: if the environment variable USER is not set
"""
# who am i
username = os.getenv('USER', None)
if username is None:
Expand Down Expand Up @@ -123,13 +144,14 @@ def get_current_jobs(self):
return current_jobs

def determine_running_jobs(self, current_jobs):
""" determine which jobs are in running state
"""
Determine currently running jobs.
Args:
current_jobs (dict): dictionary containing data of current jobs
Returns:
running_jobs (list): list containing ids of running jobs
(list): list of ids of currently running jobs
"""
running_jobs = []
for job in current_jobs.values():
Expand All @@ -139,6 +161,21 @@ def determine_running_jobs(self, current_jobs):

# known_jobs = job_manager.get_known_jobs()
def get_known_jobs(self):
"""
Obtain information about jobs that should be known to the job manager
(e.g., before it stopped or when it is resumed after a crash). This
method obtains the information from a local store (database or
filesystem). When comparing its results to the list of jobs currently
registered with the job management system (see method
get_current_jobs()), new jobs and finished jobs can be derived.
Args:
No arguments
Returns:
(dict): maps a job id to a dictionary containing key information
about a job (currently: 'jobid')
"""
# find all symlinks resembling job ids (digits only) in jobdir
known_jobs = {}
if os.path.isdir(self.submitted_jobs_dir):
Expand Down Expand Up @@ -174,6 +211,18 @@ def get_known_jobs(self):

# new_jobs = job.manager.determine_new_jobs(known_jobs, current_jobs)
def determine_new_jobs(self, known_jobs, current_jobs):
"""
Determine which jobs are new.
Args:
known_jobs (dict): dictionary with information about jobs that are
already known/seen from before
current_jobs (dict): dictionary with information about jobs that are
currently registered with the job management system
Returns:
(list): list of ids of new jobs
"""
# known_jobs is a dictionary: jobid -> {'jobid':jobid}
# current_jobs is a dictionary: jobid -> {'jobid':jobid,
# 'state':val,'reason':val}
Expand All @@ -187,6 +236,18 @@ def determine_new_jobs(self, known_jobs, current_jobs):
# finished_jobs = job.manager.determine_finished_jobs(known_jobs,
# current_jobs)
def determine_finished_jobs(self, known_jobs, current_jobs):
"""
Determine which jobs have finished.
Args:
known_jobs (dict): dictionary with information about jobs that are
already known/seen from before
current_jobs (dict): dictionary with information about jobs that are
currently registered with the job management system
Returns:
(list): list of ids of finished jobs
"""
# known_jobs is a dictionary: jobid -> {'jobid':jobid}
# current_jobs is a dictionary: jobid -> {'jobid':jobid,
# 'state':val,'reason':val}
Expand All @@ -199,7 +260,14 @@ def determine_finished_jobs(self, known_jobs, current_jobs):

def read_job_pr_metadata(self, job_metadata_path):
"""
Check if metadata file exists, read it and return 'PR' section if so, return None if not.
Read job metadata file and return the contents of the 'PR' section.
Args:
job_metadata_path (string): path to job metadata file
Returns:
(ConfigParser): instance of ConfigParser corresponding to the 'PR'
section or None
"""
# just use a function provided by module tools.job_metadata
metadata = read_metadata_file(job_metadata_path, self.logfile)
Expand All @@ -210,7 +278,14 @@ def read_job_pr_metadata(self, job_metadata_path):

def read_job_result(self, job_result_file_path):
"""
Check if result file exists, read it and return 'RESULT section if so, return None if not.
Read job result file and return the contents of the 'RESULT' section.
Args:
job_result_file_path (string): path to job result file
Returns:
(ConfigParser): instance of ConfigParser corresponding to the
'RESULT' section or None
"""
# just use a function provided by module tools.job_metadata
result = read_metadata_file(job_result_file_path, self.logfile)
Expand All @@ -221,11 +296,20 @@ def read_job_result(self, job_result_file_path):

# job_manager.process_new_job(current_jobs[nj])
def process_new_job(self, new_job):
# create symlink in submitted_jobs_dir (destination is the working
# dir of the job derived via scontrol)
# release job
# update PR comment with new status (released)
"""
Process a new job by verifying that it is a bot job and if so
- create symlink in submitted_jobs_dir (destination is the working
dir of the job derived via scontrol)
- release the job (so it may be started by the scheduler)
- update the PR comment by adding its new status (released)
Args:
new_job (dict): dictionary storing key information about the job
Returns:
(bool): True if method completed the tasks described, False if job
is not a bot job
"""
job_id = new_job["jobid"]

scontrol_cmd = "%s --oneliner show jobid %s" % (
Expand Down Expand Up @@ -343,13 +427,19 @@ def process_new_job(self, new_job):
return True

def process_running_jobs(self, running_job):
"""process the jobs in running state and print comment
"""
Process a running job by verifying that it is a bot job and if so
- determines the PR comment body and id corresponding to the job,
- updates the PR comment (if found)
Args:
running_job (dict): dictionary containing data of the running jobs
Returns:
None (implicitly)
Raises:
Exception: raise exception if there is no metadata file
Exception: if there is no metadata file or reading it failed
"""

gh = github.get_instance()
Expand Down Expand Up @@ -407,10 +497,19 @@ def process_running_jobs(self, running_job):
)

def process_finished_job(self, finished_job):
"""Process a finished job (move symlink, log and update PR comment).
"""
Process a finished job by
- moving the symlink to the directory storing finished jobs,
- updating the PR comment with information from '*.result' file
Args:
finished_job (dict): dictionary with information about job
finished_job (dict): dictionary with information about the job
Returns:
None (implicitly)
Raises:
Exception: if there is no metadata file or reading it failed
"""
fn = sys._getframe().f_code.co_name

Expand Down Expand Up @@ -498,7 +597,13 @@ def process_finished_job(self, finished_job):


def main():
"""Main function."""
"""
Main function which parses command line arguments, verifies if required
configuration settings are defined, creates an instance of
EESSIBotSoftwareLayerJobManager, reads the configuration to initialize
core attributes, determines known jobs and starts the main loop that
monitors jobs.
"""

opts = job_manager_parse()

Expand Down

0 comments on commit 77985d5

Please sign in to comment.