
eessi_bot_job_manager.py: improved and cleaned up comments
truib committed Aug 15, 2023
1 parent 77985d5 commit fc200f6
Showing 1 changed file with 38 additions and 80 deletions.
118 changes: 38 additions & 80 deletions eessi_bot_job_manager.py
@@ -3,15 +3,15 @@
# (Slurm) job manager for the GitHub App for the EESSI project
#
# This tool monitors EESSI build jobs and acts on state changes of
# these jobs. It releases jobs initially held, it processes finished
# jobs and for both reports status changes/results back to the
# corresponding GitHub pull request to a software-layer repo (origin
# or fork).
# these jobs. It releases jobs initially held, it processes running and
# finished jobs and reports status changes/results back to the
# corresponding GitHub pull request to a target software-layer repository.
#
# EESSI build jobs are recognised by
# - being submitted in JobUserHeld status (sbatch parameter --hold)
# - job ids listed in a specific directory (ids being symlinks to job
# directories created by EESSI bot)
# - a metadata file in the job's working directory
#
# This file is part of the EESSI build-and-deploy bot,
# see https://github.com/EESSI/eessi-bot-software-layer
@@ -102,7 +102,6 @@ def get_current_jobs(self):
Raises:
Exception: if the environment variable USER is not set
"""
# who am i
username = os.getenv('USER', None)
if username is None:
raise Exception("Unable to find username")
@@ -114,10 +113,9 @@ def get_current_jobs(self):
log_file=self.logfile,
)

# create dictionary of jobs
# if any with the following information per job:
# jobid, state, nodelist_reason
# skip first two lines of output ("range(2,...)")
# create dictionary of jobs from output of 'squeue_cmd'
# with the following information per job: jobid, state,
# nodelist_reason
current_jobs = {}
lines = str(squeue_output).rstrip().split("\n")
bad_state_messages = {
@@ -127,8 +125,9 @@
}

# get job info, logging any Slurm issues
# Note: the first two lines of the output are skipped ("range(2,...)")
# because they contain header information.
for i in range(2, len(lines)):
# assume lines 2 to len(lines) contain jobs
job = lines[i].rstrip().split()
if len(job) >= 9:
job_id = job[0]
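
For context, the loop here (partly hidden by the diff view) builds that dictionary roughly as in the sketch below; the state and reason column positions are assumptions, since the squeue format string is not shown in this hunk:

    # sketch: turn squeue output into {jobid: {jobid, state, reason}}
    def parse_squeue_output(squeue_output):
        current_jobs = {}
        lines = str(squeue_output).rstrip().split("\n")
        # the first two lines contain header information, hence range(2, ...)
        for i in range(2, len(lines)):
            fields = lines[i].rstrip().split()
            if len(fields) >= 9:
                current_jobs[fields[0]] = {
                    "jobid": fields[0],
                    "state": fields[1],   # assumed column position
                    "reason": fields[2],  # assumed column position
                }
        return current_jobs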
@@ -159,7 +158,6 @@ def determine_running_jobs(self, current_jobs):
running_jobs.append(job["jobid"])
return running_jobs

# known_jobs = job_manager.get_known_jobs()
def get_known_jobs(self):
"""
Obtain information about jobs that should be known to the job manager
@@ -176,7 +174,9 @@ def get_known_jobs(self):
(dict): maps a job id to a dictionary containing key information
about a job (currently: 'jobid')
"""
# find all symlinks resembling job ids (digits only) in jobdir
# find all symlinks resembling job ids (digits only) in
# self.submitted_jobs_dir (the symlink is created by method
# process_new_job)
known_jobs = {}
if os.path.isdir(self.submitted_jobs_dir):
regex = re.compile(r"(\d)+")
@@ -209,7 +209,6 @@ def get_known_jobs(self):

return known_jobs
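
The symlink scan described above condenses to a sketch like the following (using a strict digits-only match for the symlink names):

    import os
    import re

    def scan_known_jobs(submitted_jobs_dir):
        # job ids appear as digit-named symlinks to the job directories
        known_jobs = {}
        if os.path.isdir(submitted_jobs_dir):
            for name in os.listdir(submitted_jobs_dir):
                path = os.path.join(submitted_jobs_dir, name)
                if re.fullmatch(r"\d+", name) and os.path.islink(path):
                    known_jobs[name] = {"jobid": name}
        return known_jobs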

# new_jobs = job.manager.determine_new_jobs(known_jobs, current_jobs)
def determine_new_jobs(self, known_jobs, current_jobs):
"""
Determine which jobs are new.
@@ -223,18 +222,13 @@ def determine_new_jobs(self, known_jobs, current_jobs):
Returns:
(list): list of ids of new jobs
"""
# known_jobs is a dictionary: jobid -> {'jobid':jobid}
# current_jobs is a dictionary: jobid -> {'jobid':jobid,
# 'state':val,'reason':val}
new_jobs = []
for ckey in current_jobs:
if ckey not in known_jobs:
new_jobs.append(ckey)

return new_jobs
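
Both this method and determine_finished_jobs below are plain key-set comparisons between the two dictionaries; equivalent one-liners, as a sketch:

    # new: reported by squeue but not yet known to the job manager
    new_jobs = [job_id for job_id in current_jobs if job_id not in known_jobs]
    # finished: known to the job manager but no longer reported by squeue
    finished_jobs = [job_id for job_id in known_jobs if job_id not in current_jobs]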

# finished_jobs = job.manager.determine_finished_jobs(known_jobs,
# current_jobs)
def determine_finished_jobs(self, known_jobs, current_jobs):
"""
Determine which jobs have finished.
@@ -248,9 +242,6 @@
Returns:
(list): list of ids of finished jobs
"""
# known_jobs is a dictionary: jobid -> {'jobid':jobid}
# current_jobs is a dictionary: jobid -> {'jobid':jobid,
# 'state':val,'reason':val}
finished_jobs = []
for kkey in known_jobs:
if kkey not in current_jobs:
@@ -269,7 +260,7 @@ def read_job_pr_metadata(self, job_metadata_path):
(ConfigParser): instance of ConfigParser corresponding to the 'PR'
section or None
"""
# just use a function provided by module tools.job_metadata
# reuse function from module tools.job_metadata to read metadata file
metadata = read_metadata_file(job_metadata_path, self.logfile)
if metadata and "PR" in metadata:
return metadata["PR"]
@@ -287,14 +278,13 @@ def read_job_result(self, job_result_file_path):
(ConfigParser): instance of ConfigParser corresponding to the
'RESULT' section or None
"""
# just use a function provided by module tools.job_metadata
# reuse function from module tools.job_metadata to read metadata file
result = read_metadata_file(job_result_file_path, self.logfile)
if result and "RESULT" in result:
return result["RESULT"]
else:
return None
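
Both readers delegate to read_metadata_file from tools.job_metadata; a minimal stand-in built on configparser (the real helper additionally handles logging) could look like this:

    import configparser
    import os

    def read_section(metadata_path, section):
        # return the named section of an INI-style metadata file, or None
        if not os.path.isfile(metadata_path):
            return None
        metadata = configparser.ConfigParser()
        metadata.read(metadata_path)
        return metadata[section] if section in metadata else None

    # usage mirroring the two methods above (hypothetical file names)
    # pr_info = read_section("_bot_job4242.metadata", "PR")
    # result = read_section("_bot_job4242.result", "RESULT")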

# job_manager.process_new_job(current_jobs[nj])
def process_new_job(self, new_job):
"""
Process a new job by verifying that it is a bot job and if so
@@ -322,8 +312,8 @@ def process_new_job(self, new_job):
log_file=self.logfile,
)

# parse output,
# look for WorkDir=dir
# parse output of 'scontrol_cmd' to determine the job's working
# directory
match = re.search(r".* WorkDir=(\S+) .*",
str(scontrol_output))
if match:
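
The WorkDir extraction can be tried in isolation; with a hard-coded sample line (assumed content) it behaves like this:

    import re

    # shortened sample of 'scontrol show job <id>' output (assumed content)
    scontrol_output = "JobId=4242 JobName=bot ... WorkDir=/scratch/bot/jobs/4242 StdErr=/dev/null"
    match = re.search(r".* WorkDir=(\S+) .*", str(scontrol_output))
    if match:
        print(match.group(1))  # prints /scratch/bot/jobs/4242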
@@ -338,7 +328,8 @@
job_id,
)

# check if metadata file exist
# assuming that a bot job's working directory contains a metadata
# file, its existence is used to check if the job belongs to the bot
metadata_pr = self.read_job_pr_metadata(job_metadata_path)

if metadata_pr is None:
@@ -365,28 +356,17 @@
log_file=self.logfile,
)

# update PR
# (a) get repo name and PR number
# from file _bot_job<JOBID>.metadata
# (b) find & get comment for this job
# (c) add a row to the table

# (a) get repo name and PR number from
# file _bot_job<JOBID>.metadata
# the file should be written by the event handler
# to the working dir of the job

# get repo name
# update PR defined by repo and pr_number stored in the job's
# metadata file
repo_name = metadata_pr.get("repo", "")
# get pr number
pr_number = metadata_pr.get("pr_number", None)

gh = github.get_instance()

repo = gh.get_repo(repo_name)
pr = repo.get_pull(int(pr_number))

# (b) find & get comment for this job
# find & get comment for this job
# only get comment if we don't know its id yet
if "comment_id" not in new_job:
new_job_cmnt = get_submitted_job_comment(pr, new_job['jobid'])
@@ -399,8 +379,7 @@
)
new_job["comment_id"] = new_job_cmnt.id

# (c) add a row to the table
# add row to status table if we found a comment
# update status table if we found a comment
if "comment_id" in new_job:
new_job_comments_cfg = config.read_config()[NEW_JOB_COMMENTS]
dt = datetime.now(timezone.utc)
@@ -413,11 +392,7 @@
" for job '%s'" % job_id,
self.logfile,
)
# TODO just create one?
else:
# TODO can we run this tool on a job directory? the path to
# the directory might be obtained from
# a comment to the PR
log(
"process_new_job(): did not find work dir for job '%s'"
% job_id,
Expand All @@ -444,12 +419,9 @@ def process_running_jobs(self, running_job):

gh = github.get_instance()

# set some variables for accessing work dir of job
# set variable for accessing the working directory of the job
job_dir = os.path.join(self.submitted_jobs_dir, running_job["jobid"])

# TODO create function for obtaining values from metadata file
# might be based on allowing multiple configuration files
# in tools/config.py
metadata_file = "_bot_job%s.metadata" % running_job["jobid"]
job_metadata_path = os.path.join(job_dir, metadata_file)

@@ -458,9 +430,7 @@
if metadata_pr is None:
raise Exception("Unable to find metadata file")

# get repo name
repo_name = metadata_pr.get("repo", "")
# get pr number
pr_number = metadata_pr.get("pr_number", None)

repo = gh.get_repo(repo_name)
@@ -513,13 +483,9 @@ def process_finished_job(self, finished_job):
"""
fn = sys._getframe().f_code.co_name

# PROCEDURE
# - MOVE symlink to finished dir
# - REPORT status always to log, if accessible also to PR comment

job_id = finished_job['jobid']

# MOVE symlink from job_ids_dir/submitted to jobs_ids_dir/finished
# move symlink from job_ids_dir/submitted to jobs_ids_dir/finished
old_symlink = os.path.join(self.submitted_jobs_dir, job_id)

finished_jobs_dir = os.path.join(self.job_ids_dir, "finished")
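
Moving the id symlink between the two state directories is a single rename; a sketch under the directory names used here:

    import os

    def move_job_symlink(job_ids_dir, job_id):
        old_symlink = os.path.join(job_ids_dir, "submitted", job_id)
        finished_jobs_dir = os.path.join(job_ids_dir, "finished")
        os.makedirs(finished_jobs_dir, exist_ok=True)
        new_symlink = os.path.join(finished_jobs_dir, job_id)
        # os.rename moves the symlink itself, not the job directory it points to
        os.rename(old_symlink, new_symlink)
        return new_symlink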
@@ -548,10 +514,9 @@ def process_finished_job(self, finished_job):
job_result_file_path = os.path.join(new_symlink, job_result_file)
job_results = self.read_job_result(job_result_file_path)

# set comment_description in case no results were found (self.read_job_result
# returned None), it's also used (reused actually) in case the job
# results do not have a preformatted comment
job_result_unknown_fmt = finished_job_comments_cfg[JOB_RESULT_UNKNOWN_FMT]
# set fallback comment_description in case no result file was found
# (self.read_job_result returned None)
comment_description = job_result_unknown_fmt.format(filename=job_result_file)
if job_results:
# get preformatted comment_description or use previously set default for unknown
@@ -568,24 +533,18 @@
comment_update = f"\n|{dt.strftime('%b %d %X %Z %Y')}|finished|"
comment_update += f"{comment_description}|"
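
For reference, the strftime pattern used here renders a status-table row like the following (quick check, with an arbitrary timestamp):

    from datetime import datetime, timezone

    dt = datetime.now(timezone.utc)
    row = f"\n|{dt.strftime('%b %d %X %Z %Y')}|finished|<description>|"
    # e.g. '\n|Aug 15 14:03:11 UTC 2023|finished|<description>|'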

# obtain id of PR comment to be updated (from _bot_jobID.metadata)
# obtain id of PR comment to be updated (from file '_bot_jobID.metadata')
metadata_file = f"_bot_job{job_id}.metadata"
job_metadata_path = os.path.join(new_symlink, metadata_file)
metadata_pr = self.read_job_pr_metadata(job_metadata_path)
if metadata_pr is None:
# TODO should we raise the Exception here? maybe first process
# the finished job and raise an exception at the end?
raise Exception("Unable to find metadata file ... skip updating PR comment")

# get repo name
repo_name = metadata_pr.get("repo", None)
# get pr number
pr_number = metadata_pr.get("pr_number", -1)
# get pr comment id
pr_comment_id = metadata_pr.get("pr_comment_id", -1)
log(f"{fn}(): pr comment id {pr_comment_id}", self.logfile)

# establish contact to pull request on github
gh = github.get_instance()

repo = gh.get_repo(repo_name)
Expand All @@ -607,7 +566,8 @@ def main():

opts = job_manager_parse()

# config is read and checked for settings to raise an exception early when the job_manager runs.
# config is read and checked for settings to raise an exception early when
# the job_manager runs
config.check_required_cfg_settings(REQUIRED_CONFIG)
github.connect()

@@ -629,13 +589,16 @@

# before main loop, get list of known jobs (stored on disk)
# main loop
# ---------
# get current jobs of the bot user (job id, state, reason)
# (assume all are jobs building software)
# determine new jobs (comparing known and current jobs)
# process new jobs (filtered by optional command line option)
# process new jobs (filtered by optional command line option)
# determine running jobs (comparing known and current jobs)
# process running jobs (filtered by optional command line option)
# determine finished jobs (comparing known and current jobs)
# process finished jobs (filtered by optional command line option)
# process finished jobs (filtered by optional command line option)
# set known jobs to list of current jobs
# wait configurable period before next iteration begins
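
Taken together, the steps listed above form a loop of roughly this shape; the sketch omits the job id filtering and the handling of non-bot jobs shown further down, and the two plain variables are assumed names for values main() reads from the options and configuration:

    import time

    max_iter = 10        # from opts.max_manager_iterations in main()
    poll_interval = 60   # assumed name for the configured wait (seconds)

    known_jobs = job_manager.get_known_jobs()
    for _ in range(max_iter):
        current_jobs = job_manager.get_current_jobs()
        for job_id in job_manager.determine_new_jobs(known_jobs, current_jobs):
            job_manager.process_new_job(current_jobs[job_id])
        for job_id in job_manager.determine_running_jobs(current_jobs):
            job_manager.process_running_jobs(current_jobs[job_id])
        for job_id in job_manager.determine_finished_jobs(known_jobs, current_jobs):
            job_manager.process_finished_job(known_jobs[job_id])
        known_jobs = current_jobs
        time.sleep(poll_interval)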

max_iter = int(opts.max_manager_iterations)
# retrieve some settings from app.cfg
@@ -691,15 +654,12 @@ def main():
for nj in new_jobs:
# assume it is not a bot job
is_bot_job = False
# apply filtering of job ids
if not job_manager.job_filter or nj in job_manager.job_filter:
is_bot_job = job_manager.process_new_job(current_jobs[nj])
if not is_bot_job:
# add job id to non_bot_jobs list
non_bot_jobs.append(nj)
# else:
# log("job manager main loop: skipping new job"
# " %s due to parameter '--jobs %s'" % (
# nj,opts.jobs), job_manager.logfile)

# remove non bot jobs from current_jobs
for job in non_bot_jobs:
@@ -713,6 +673,7 @@
)

for rj in running_jobs:
# apply filtering of job ids
if not job_manager.job_filter or rj in job_manager.job_filter:
job_manager.process_running_jobs(current_jobs[rj])

@@ -725,12 +686,9 @@
)
# process finished jobs
for fj in finished_jobs:
# apply filtering of job ids
if not job_manager.job_filter or fj in job_manager.job_filter:
job_manager.process_finished_job(known_jobs[fj])
# else:
# log("job manager main loop: skipping finished "
# "job %s due"" to parameter '--jobs %s'" % (fj,opts.jobs),
# " job_manager.logfile)"

known_jobs = current_jobs
