Skip to content

Commit

Permalink
log-parsing (#127)
Browse files Browse the repository at this point in the history
* WIP: log-parsing

Example log-parsing for ConvertJob, based on sample log files.
Sample logs for the other errored jobs, and unit tests, are still needed.

* Added support and tests for ConvertJob, NuQCJob

FastQCJob still in progress

* test fix for CI

* debug CI

* debug CI

* debug CI: Exception not being caught

* flake8

* Error-handling appears normal. Test re-written.

Test rewritten to be perhaps more flexible.

* test

* fix error in new test

'Logs' -> 'logs' in pathing. This doesn't affect macOS systems, since their filesystems default to case-insensitive.
However, on Ubuntu and in CI, paths are case-sensitive, so the bug appears there.

* FastQCJob test added

* cleanup

---------

Co-authored-by: charlie <[email protected]>
  • Loading branch information
charles-cowart and charlie authored Feb 23, 2024
1 parent 2b15b07 commit 00ef0aa
Show file tree
Hide file tree
Showing 10 changed files with 334 additions and 43 deletions.
37 changes: 33 additions & 4 deletions sequence_processing_pipeline/ConvertJob.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from os.path import join
from os.path import join, exists
from sequence_processing_pipeline.Job import Job
from sequence_processing_pipeline.PipelineError import PipelineError
from sequence_processing_pipeline.PipelineError import (PipelineError,
JobFailedError)
import logging
import re

Expand Down Expand Up @@ -141,7 +142,35 @@ def run(self, callback=None):
changed.
:return:
"""
job_info = self.submit_job(self.job_script_path, None, None,
exec_from=self.log_path, callback=callback)
try:
job_info = self.submit_job(self.job_script_path,
exec_from=self.log_path,
callback=callback)
except JobFailedError as e:
# When a job has failed, parse the logs generated by this specific
# job to return a more descriptive message to the user.
info = self.parse_logs()
# prepend just the message component of the Error.
info.insert(0, str(e))
raise JobFailedError('\n'.join(info))

logging.info(f'Successful job: {job_info}')

def parse_logs(self):
    """Collect error messages logged by a failed ConvertJob run.

    Reads <output_path>/Logs/Errors.log and returns its lines, stripped.
    If the file does not exist, returns a single explanatory message
    instead.

    :return: A list of strings (possibly empty).
    """
    log_path = join(self.output_path, 'Logs')
    errors = join(log_path, 'Errors.log')

    if not exists(errors):
        # we do not raise an Error in this case because it's expected that
        # parse_logs() will be called in response to an exceptional
        # condition. Return early: the original fell through and tried to
        # open the missing file, raising FileNotFoundError.
        return [f"'{errors}' does not exist"]

    with open(errors, 'r') as f:
        return [line.strip() for line in f.readlines()]
34 changes: 31 additions & 3 deletions sequence_processing_pipeline/FastQCJob.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
from os import listdir, makedirs
from os.path import exists, join, basename
from sequence_processing_pipeline.Job import Job
from sequence_processing_pipeline.PipelineError import PipelineError
from sequence_processing_pipeline.PipelineError import (PipelineError,
JobFailedError)
from functools import partial
from json import dumps
import logging
import glob


class FastQCJob(Job):
Expand Down Expand Up @@ -205,8 +207,18 @@ def _get_failed_indexes(self, job_id):
return failed_indexes

def run(self, callback=None):
job_info = self.submit_job(self.job_script_path, None, None,
exec_from=self.log_path, callback=callback)
try:
job_info = self.submit_job(self.job_script_path,
exec_from=self.log_path,
callback=callback)
except JobFailedError as e:
# When a job has failed, parse the logs generated by this specific
# job to return a more descriptive message to the user.
info = self.parse_logs()
# prepend just the message component of the Error.
info.insert(0, str(e))
raise JobFailedError('\n'.join(info))

logging.debug(job_info)

# If project-level reports were not needed, MultiQC could simply be
Expand Down Expand Up @@ -296,3 +308,19 @@ def _generate_job_script(self):

with open(sh_details_fp, 'w') as f:
f.write('\n'.join(self.commands))

def parse_logs(self):
    """Collect error-related lines from this job's logs/*.out files.

    Files are processed in sorted order for predictable results.

    :return: A list of stripped lines containing 'error'
             (case-insensitive), possibly empty.
    """
    # note 'error' is not the same requirement as found in QCJob
    # ('error:'). This is a very generalized filter.
    out_files = sorted(glob.glob(join(self.output_path, 'logs', '*.out')))

    hits = []
    for out_fp in out_files:
        with open(out_fp, 'r') as handle:
            for raw_line in handle.readlines():
                if 'error' in raw_line.lower():
                    hits.append(raw_line.strip())

    return hits
42 changes: 31 additions & 11 deletions sequence_processing_pipeline/Job.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
from itertools import zip_longest
from os import makedirs, walk
from os.path import basename, exists, split, join
from sequence_processing_pipeline.PipelineError import PipelineError
from sequence_processing_pipeline.PipelineError import (PipelineError,
JobFailedError,
ExecFailedError)
from subprocess import Popen, PIPE
from time import sleep
import logging
Expand All @@ -22,6 +24,7 @@ def __init__(self, root_dir, output_path, job_name, executable_paths,
self.job_name = job_name
self.root_dir = root_dir
self._directory_check(self.root_dir, create=False)
self.force_job_fail = False

self.output_path = join(output_path, self.job_name)
self._directory_check(self.output_path, create=True)
Expand Down Expand Up @@ -77,6 +80,9 @@ def run(self):
"""
raise PipelineError("Base class run() method not implemented.")

def parse_logs(self):
    """Parse this job's log files for diagnostic messages.

    Subclasses (e.g. ConvertJob, FastQCJob, NuQCJob) override this to
    extract job-specific error details after a failed run.

    :raises PipelineError: always; the base class has no logs to parse.
    """
    raise PipelineError("Base class parse_logs() method not implemented.")

def _which(self, file_path, modules_to_load=None):
"""
Returns file_path if file_path exists and file_path is a full path.
Expand Down Expand Up @@ -177,16 +183,16 @@ def _system_call(self, cmd, allow_return_codes=[], callback=None):
f'stdout: {stdout}\n'
f'stderr: {stderr}\n')
logging.error(msg)
raise PipelineError(message=msg)
raise ExecFailedError(message=msg)

if callback is not None:
callback(jid=proc.pid, status='COMPLETED')

return {'stdout': stdout, 'stderr': stderr, 'return_code': return_code}

def submit_job(self, script_path, job_parameters=None,
script_parameters=None, wait=True, exec_from=None,
callback=None):
script_parameters=None, wait=True,
exec_from=None, callback=None):
"""
Submit a Torque job script and optionally wait for it to finish.
:param script_path: The path to a Torque job (bash) script.
Expand All @@ -211,6 +217,10 @@ def submit_job(self, script_path, job_parameters=None,
cmd = f'cd {exec_from};' + cmd

logging.debug("job scheduler call: %s" % cmd)

if self.force_job_fail:
raise JobFailedError("This job died.")

# if system_call does not raise a PipelineError(), then the scheduler
# successfully submitted the job. In this case, it should return
# the id of the job in stdout.
Expand All @@ -228,7 +238,8 @@ def submit_job(self, script_path, job_parameters=None,
"JobID,JobName,State,Elapsed,ExitCode")

if result['return_code'] != 0:
raise ValueError(result['stderr'])
# sacct did not successfully submit the job.
raise ExecFailedError(result['stderr'])

# [-1] remove the extra \n
jobs_data = result['stdout'].split('\n')[:-1]
Expand Down Expand Up @@ -279,22 +290,23 @@ def submit_job(self, script_path, job_parameters=None,
# job completed successfully
return job_info
else:
raise PipelineError(f"job {job_id} exited with exit_"
f"status {job_info['exit_status']}"
)
exit_status = job_info['exit_status']
raise JobFailedError(f"job {job_id} exited with exit_"
f"status {exit_status}")
else:
# with no other info, assume job completed successfully
return job_info
else:
# job exited unsuccessfully
raise PipelineError(f"job {job_id} exited with status "
f"{job_info['job_state']}")
raise JobFailedError(f"job {job_id} exited with status "
f"{job_info['job_state']}")
else:
# job was never in the queue - return an error.
if callback is not None:
callback(jid=job_id, status='ERROR')

raise PipelineError("job %s never appeared in the queue." % job_id)
raise JobFailedError(f"job {job_id} never appeared in the "
"queue.")

def _group_commands(self, cmds):
# break list of commands into chunks of max_array_length (Typically
Expand Down Expand Up @@ -356,3 +368,11 @@ def audit(self, sample_ids):
break

return sorted(list(set(found) ^ set(sample_ids)))

def _toggle_force_job_fail(self):
if self.force_job_fail is True:
self.force_job_fail = False
else:
self.force_job_fail = True

return self.force_job_fail
53 changes: 40 additions & 13 deletions sequence_processing_pipeline/NuQCJob.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
from os import stat, makedirs, rename
from os.path import join, basename, dirname, exists, abspath, getmtime
from sequence_processing_pipeline.Job import Job
from sequence_processing_pipeline.PipelineError import PipelineError
from sequence_processing_pipeline.PipelineError import (PipelineError,
JobFailedError)
from sequence_processing_pipeline.Pipeline import Pipeline
from shutil import move
import logging
Expand Down Expand Up @@ -229,9 +230,12 @@ def run(self, callback=None):
job_script_path = self._generate_job_script()

batch_location = join(self.temp_dir, self.batch_prefix)
batch_count = split_similar_size_bins(self.root_dir,
self.max_file_list_size_in_gb,
batch_location)

batch_count = 0 if self.force_job_fail else \
split_similar_size_bins(self.root_dir,
self.max_file_list_size_in_gb,
batch_location)

self.counts[self.batch_prefix] = batch_count

export_params = [f"MMI={self.minimap_database_paths}",
Expand All @@ -244,15 +248,20 @@ def run(self, callback=None):

# job_script_path formerly known as:
# process.multiprep.pangenome.adapter-filter.pe.sbatch
job_info = self.submit_job(job_script_path,
# job_parameters - was None
' '.join(job_params),
# script_parameters
None,
# assume we want to exec from the log_path
# for now.
exec_from=self.log_path,
callback=callback)

try:
job_info = self.submit_job(job_script_path,
job_parameters=' '.join(job_params),
exec_from=self.log_path,
callback=callback)
except JobFailedError as e:
# When a job has failed, parse the logs generated by this specific
# job to return a more descriptive message to the user.
info = self.parse_logs()
# prepend just the message component of the Error.
info.insert(0, str(e))
raise JobFailedError('\n'.join(info))

job_id = job_info['job_id']
logging.debug(f'NuQCJob {job_id} completed')

Expand Down Expand Up @@ -383,6 +392,11 @@ def _process_sample_sheet(self):
'sample_ids': sample_ids}

def _generate_job_script(self):
# bypass generating job script for a force-fail job, since it is
# not needed.
if self.force_job_fail:
return None

job_script_path = join(self.output_path, 'process_all_fastq_files.sh')
template = self.jinja_env.get_template("nuqc_job.sh")

Expand Down Expand Up @@ -434,3 +448,16 @@ def _generate_job_script(self):
length_limit=self.length_limit))

return job_script_path

def parse_logs(self):
    """Collect 'error:'-tagged lines from this job's logs/*.out files.

    :return: A list of stripped lines containing 'error:'
             (case-insensitive), possibly empty.
    """
    pattern = join(self.output_path, 'logs', '*.out')
    matches = []

    # sorted lists give predictable results
    for log_file in sorted(glob.glob(pattern)):
        with open(log_file, 'r') as handle:
            for entry in handle.readlines():
                if 'error:' in entry.lower():
                    matches.append(entry.strip())

    return matches
28 changes: 26 additions & 2 deletions sequence_processing_pipeline/PipelineError.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,29 @@
class PipelineError(Exception):
    # Base error type for the sequence-processing pipeline; all other
    # pipeline errors derive from it so callers can catch one type.
    def __init__(self, message=None):
        self.message = message
        super().__init__(self.message)


class JobFailedError(PipelineError):
    # Occurs when a successfully-submitted job has failed.
    def __init__(self, message=None):
        self.message = message
        super().__init__(self.message)


class ExecFailedError(PipelineError):
    # Occurs when an executed command returns w/an error, which is defined as
    # the command returning a value not zero and not an acceptable non-zero
    # value.
    def __init__(self, message=None):
        self.message = message
        super().__init__(self.message)


class LogParsingError(PipelineError):
    # Occurs when the log files of a failed job cannot be parsed.
    # May or may not be useful.
    def __init__(self, message=None):
        # mirror the sibling error types: message only. The stray
        # 'self.mailing_list = mailing_list' assignment referenced an
        # undefined name and made this class unraisable (NameError).
        self.message = message
        super().__init__(self.message)
6 changes: 3 additions & 3 deletions sequence_processing_pipeline/QCJob.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,9 +175,9 @@ def run(self, callback=None):
for project in self.project_data:
project_name = project['Sample_Project']
needs_human_filtering = project['HumanFiltering']
job_info = self.submit_job(
self.script_paths[project_name], None, None,
exec_from=self.log_path, callback=callback)
job_info = self.submit_job(self.script_paths[project_name],
exec_from=self.log_path,
callback=callback)
job_id = job_info['job_id']
logging.debug(f'QCJob {job_id} completed')
source_dir = join(self.output_path, project_name)
Expand Down
Loading

0 comments on commit 00ef0aa

Please sign in to comment.