From 00ef0aa0f46f327ca9a7dcfdf2e65c0f76ae2e7d Mon Sep 17 00:00:00 2001 From: Charles Cowart <42684307+charles-cowart@users.noreply.github.com> Date: Fri, 23 Feb 2024 11:19:38 -0800 Subject: [PATCH] log-parsing (#127) * WIP: log-parsing example log-parsing for ConvertJob, based on sample log files. sample logs for other errored-jobs and unit tests are needed. * Added support and tests for ConvertJob, NuQCJob FastQCJob still in progress * test fix for CI * debug CI * debug CI * debug CI: Exception not being caught * flake8 * Error-handling appears normal. Test re-written. Test rewritten to be perhaps more flexible. * test * fix error in new test 'Logs' -> 'logs' in pathing. Doesn't affect MacOSX systems since we default to case-insensitive fs. However on Ubuntu and CI, paths are case-sensitive, hence the bug appears. * FastQCJob test added * cleanup --------- Co-authored-by: charlie --- sequence_processing_pipeline/ConvertJob.py | 37 ++++++++-- sequence_processing_pipeline/FastQCJob.py | 34 +++++++++- sequence_processing_pipeline/Job.py | 42 +++++++++--- sequence_processing_pipeline/NuQCJob.py | 53 +++++++++++---- sequence_processing_pipeline/PipelineError.py | 28 +++++++- sequence_processing_pipeline/QCJob.py | 6 +- .../tests/test_ConvertJob.py | 35 +++++++++- .../tests/test_FastQCJob.py | 68 ++++++++++++++++++- .../tests/test_Job.py | 11 +-- .../tests/test_NuQCJob.py | 63 ++++++++++++++++- 10 files changed, 334 insertions(+), 43 deletions(-) diff --git a/sequence_processing_pipeline/ConvertJob.py b/sequence_processing_pipeline/ConvertJob.py index bfe9572e..6fe75887 100644 --- a/sequence_processing_pipeline/ConvertJob.py +++ b/sequence_processing_pipeline/ConvertJob.py @@ -1,6 +1,7 @@ -from os.path import join +from os.path import join, exists from sequence_processing_pipeline.Job import Job -from sequence_processing_pipeline.PipelineError import PipelineError +from sequence_processing_pipeline.PipelineError import (PipelineError, + JobFailedError) import logging import re @@ -141,7 +142,35 @@ def run(self, callback=None): changed. :return: """ - job_info = self.submit_job(self.job_script_path, None, None, - exec_from=self.log_path, callback=callback) + try: + job_info = self.submit_job(self.job_script_path, + exec_from=self.log_path, + callback=callback) + except JobFailedError as e: + # When a job has failed, parse the logs generated by this specific + # job to return a more descriptive message to the user. + info = self.parse_logs() + # prepend just the message component of the Error. + info.insert(0, str(e)) + raise JobFailedError('\n'.join(info)) logging.info(f'Successful job: {job_info}') + + def parse_logs(self): + log_path = join(self.output_path, 'Logs') + errors = join(log_path, 'Errors.log') + + msgs = [] + + if not exists(errors): + # we do not raise an Error in this case because it's expected that + # parse_logs() will be called in response to an exceptional + # condition. 
+ msgs.append(f"'{errors}' does not exist") + + with open(errors, 'r') as f: + lines = f.readlines() + for line in [x.strip() for x in lines]: + msgs.append(line) + + return msgs diff --git a/sequence_processing_pipeline/FastQCJob.py b/sequence_processing_pipeline/FastQCJob.py index 819f2e99..cdbcafa9 100644 --- a/sequence_processing_pipeline/FastQCJob.py +++ b/sequence_processing_pipeline/FastQCJob.py @@ -1,10 +1,12 @@ from os import listdir, makedirs from os.path import exists, join, basename from sequence_processing_pipeline.Job import Job -from sequence_processing_pipeline.PipelineError import PipelineError +from sequence_processing_pipeline.PipelineError import (PipelineError, + JobFailedError) from functools import partial from json import dumps import logging +import glob class FastQCJob(Job): @@ -205,8 +207,18 @@ def _get_failed_indexes(self, job_id): return failed_indexes def run(self, callback=None): - job_info = self.submit_job(self.job_script_path, None, None, - exec_from=self.log_path, callback=callback) + try: + job_info = self.submit_job(self.job_script_path, + exec_from=self.log_path, + callback=callback) + except JobFailedError as e: + # When a job has failed, parse the logs generated by this specific + # job to return a more descriptive message to the user. + info = self.parse_logs() + # prepend just the message component of the Error. + info.insert(0, str(e)) + raise JobFailedError('\n'.join(info)) + logging.debug(job_info) # If project-level reports were not needed, MultiQC could simply be @@ -296,3 +308,19 @@ def _generate_job_script(self): with open(sh_details_fp, 'w') as f: f.write('\n'.join(self.commands)) + + def parse_logs(self): + log_path = join(self.output_path, 'logs') + files = sorted(glob.glob(join(log_path, '*.out'))) + msgs = [] + + for some_file in files: + with open(some_file, 'r') as f: + msgs += [line for line in f.readlines() + # note 'error' is not the same + # requirement as found in QCJob + # ('error:'). This is a very + # generalized filter. + if 'error' in line.lower()] + + return [msg.strip() for msg in msgs] diff --git a/sequence_processing_pipeline/Job.py b/sequence_processing_pipeline/Job.py index e91e8beb..c1d703e6 100644 --- a/sequence_processing_pipeline/Job.py +++ b/sequence_processing_pipeline/Job.py @@ -1,7 +1,9 @@ from itertools import zip_longest from os import makedirs, walk from os.path import basename, exists, split, join -from sequence_processing_pipeline.PipelineError import PipelineError +from sequence_processing_pipeline.PipelineError import (PipelineError, + JobFailedError, + ExecFailedError) from subprocess import Popen, PIPE from time import sleep import logging @@ -22,6 +24,7 @@ def __init__(self, root_dir, output_path, job_name, executable_paths, self.job_name = job_name self.root_dir = root_dir self._directory_check(self.root_dir, create=False) + self.force_job_fail = False self.output_path = join(output_path, self.job_name) self._directory_check(self.output_path, create=True) @@ -77,6 +80,9 @@ def run(self): """ raise PipelineError("Base class run() method not implemented.") + def parse_logs(self): + raise PipelineError("Base class parse_logs() method not implemented.") + def _which(self, file_path, modules_to_load=None): """ Returns file_path if file_path exists and file_path is a full path.
@@ -177,7 +183,7 @@ def _system_call(self, cmd, allow_return_codes=[], callback=None): f'stdout: {stdout}\n' f'stderr: {stderr}\n') logging.error(msg) - raise PipelineError(message=msg) + raise ExecFailedError(message=msg) if callback is not None: callback(jid=proc.pid, status='COMPLETED') @@ -185,8 +191,8 @@ def _system_call(self, cmd, allow_return_codes=[], callback=None): return {'stdout': stdout, 'stderr': stderr, 'return_code': return_code} def submit_job(self, script_path, job_parameters=None, - script_parameters=None, wait=True, exec_from=None, - callback=None): + script_parameters=None, wait=True, + exec_from=None, callback=None): """ Submit a Torque job script and optionally wait for it to finish. :param script_path: The path to a Torque job (bash) script. @@ -211,6 +217,10 @@ def submit_job(self, script_path, job_parameters=None, cmd = f'cd {exec_from};' + cmd logging.debug("job scheduler call: %s" % cmd) + + if self.force_job_fail: + raise JobFailedError("This job died.") + # if system_call does not raise a PipelineError(), then the scheduler # successfully submitted the job. In this case, it should return # the id of the job in stdout. @@ -228,7 +238,8 @@ def submit_job(self, script_path, job_parameters=None, "JobID,JobName,State,Elapsed,ExitCode") if result['return_code'] != 0: - raise ValueError(result['stderr']) + # sacct did not successfully submit the job. + raise ExecFailedError(result['stderr']) # [-1] remove the extra \n jobs_data = result['stdout'].split('\n')[:-1] @@ -279,22 +290,23 @@ def submit_job(self, script_path, job_parameters=None, # job completed successfully return job_info else: - raise PipelineError(f"job {job_id} exited with exit_" - f"status {job_info['exit_status']}" - ) + exit_status = job_info['exit_status'] + raise JobFailedError(f"job {job_id} exited with exit_" + f"status {exit_status}") else: # with no other info, assume job completed successfully return job_info else: # job exited unsuccessfully - raise PipelineError(f"job {job_id} exited with status " - f"{job_info['job_state']}") + raise JobFailedError(f"job {job_id} exited with status " + f"{job_info['job_state']}") else: # job was never in the queue - return an error. if callback is not None: callback(jid=job_id, status='ERROR') - raise PipelineError("job %s never appeared in the queue." 
% job_id) + raise JobFailedError(f"job {job_id} never appeared in the " + "queue.") def _group_commands(self, cmds): # break list of commands into chunks of max_array_length (Typically @@ -356,3 +368,11 @@ def audit(self, sample_ids): break return sorted(list(set(found) ^ set(sample_ids))) + + def _toggle_force_job_fail(self): + if self.force_job_fail is True: + self.force_job_fail = False + else: + self.force_job_fail = True + + return self.force_job_fail diff --git a/sequence_processing_pipeline/NuQCJob.py b/sequence_processing_pipeline/NuQCJob.py index 7e567c93..616b0646 100644 --- a/sequence_processing_pipeline/NuQCJob.py +++ b/sequence_processing_pipeline/NuQCJob.py @@ -3,7 +3,8 @@ from os import stat, makedirs, rename from os.path import join, basename, dirname, exists, abspath, getmtime from sequence_processing_pipeline.Job import Job -from sequence_processing_pipeline.PipelineError import PipelineError +from sequence_processing_pipeline.PipelineError import (PipelineError, + JobFailedError) from sequence_processing_pipeline.Pipeline import Pipeline from shutil import move import logging @@ -229,9 +230,12 @@ def run(self, callback=None): job_script_path = self._generate_job_script() batch_location = join(self.temp_dir, self.batch_prefix) - batch_count = split_similar_size_bins(self.root_dir, - self.max_file_list_size_in_gb, - batch_location) + + batch_count = 0 if self.force_job_fail else \ + split_similar_size_bins(self.root_dir, + self.max_file_list_size_in_gb, + batch_location) + self.counts[self.batch_prefix] = batch_count export_params = [f"MMI={self.minimap_database_paths}", @@ -244,15 +248,20 @@ def run(self, callback=None): # job_script_path formerly known as: # process.multiprep.pangenome.adapter-filter.pe.sbatch - job_info = self.submit_job(job_script_path, - # job_parameters - was None - ' '.join(job_params), - # script_parameters - None, - # assume we want to exec from the log_path - # for now. - exec_from=self.log_path, - callback=callback) + + try: + job_info = self.submit_job(job_script_path, + job_parameters=' '.join(job_params), + exec_from=self.log_path, + callback=callback) + except JobFailedError as e: + # When a job has failed, parse the logs generated by this specific + # job to return a more descriptive message to the user. + info = self.parse_logs() + # prepend just the message component of the Error. + info.insert(0, str(e)) + raise JobFailedError('\n'.join(info)) + job_id = job_info['job_id'] logging.debug(f'NuQCJob {job_id} completed') @@ -383,6 +392,11 @@ def _process_sample_sheet(self): 'sample_ids': sample_ids} def _generate_job_script(self): + # bypass generating job script for a force-fail job, since it is + # not needed. 
+ if self.force_job_fail: + return None + job_script_path = join(self.output_path, 'process_all_fastq_files.sh') template = self.jinja_env.get_template("nuqc_job.sh") @@ -434,3 +448,16 @@ def _generate_job_script(self): length_limit=self.length_limit)) return job_script_path + + def parse_logs(self): + log_path = join(self.output_path, 'logs') + # sorted lists give predictable results + files = sorted(glob.glob(join(log_path, '*.out'))) + msgs = [] + + for some_file in files: + with open(some_file, 'r') as f: + msgs += [line for line in f.readlines() + if 'error:' in line.lower()] + + return [msg.strip() for msg in msgs] diff --git a/sequence_processing_pipeline/PipelineError.py b/sequence_processing_pipeline/PipelineError.py index 6248735f..d604546f 100644 --- a/sequence_processing_pipeline/PipelineError.py +++ b/sequence_processing_pipeline/PipelineError.py @@ -1,5 +1,29 @@ class PipelineError(Exception): - def __init__(self, message=None, mailing_list=None): + def __init__(self, message=None): + self.message = message + super().__init__(self.message) + + +class JobFailedError(PipelineError): + # Occurs when a successfully-submitted job has failed. + def __init__(self, message=None): + self.message = message + super().__init__(self.message) + + +class ExecFailedError(PipelineError): + # Occurs when an executed command returns w/an error, which is defined as + # the command returning a value not zero and not an acceptable non-zero + # value. + def __init__(self, message=None): + self.message = message + super().__init__(self.message) + + +class LogParsingError(PipelineError): + # Occurs when an error is encountered while parsing a job's log files. + # May or may not be useful. + def __init__(self, message=None): self.message = message - self.mailing_list = mailing_list super().__init__(self.message) diff --git a/sequence_processing_pipeline/QCJob.py b/sequence_processing_pipeline/QCJob.py index bd2901f4..2067150e 100644 --- a/sequence_processing_pipeline/QCJob.py +++ b/sequence_processing_pipeline/QCJob.py @@ -175,9 +175,9 @@ def run(self, callback=None): for project in self.project_data: project_name = project['Sample_Project'] needs_human_filtering = project['HumanFiltering'] - job_info = self.submit_job( - self.script_paths[project_name], None, None, - exec_from=self.log_path, callback=callback) + job_info = self.submit_job(self.script_paths[project_name], + exec_from=self.log_path, + callback=callback) job_id = job_info['job_id'] logging.debug(f'QCJob {job_id} completed') source_dir = join(self.output_path, project_name) diff --git a/sequence_processing_pipeline/tests/test_ConvertJob.py b/sequence_processing_pipeline/tests/test_ConvertJob.py index 009f5a00..79fa75a0 100644 --- a/sequence_processing_pipeline/tests/test_ConvertJob.py +++ b/sequence_processing_pipeline/tests/test_ConvertJob.py @@ -1,7 +1,8 @@ from os.path import join, abspath from os import makedirs from sequence_processing_pipeline.ConvertJob import ConvertJob -from sequence_processing_pipeline.PipelineError import PipelineError +from sequence_processing_pipeline.PipelineError import (PipelineError, + JobFailedError) from functools import partial import unittest from shutil import rmtree @@ -893,6 +894,17 @@ def setUp(self): with open(line, 'w') as f2: f2.write("This is a file.") + # fake the Logs directory and Errors.log file created by bcl-convert. # this is separate from the standard 'logs' directory created # by all jobs.
+ self.convert_log_path = join(faked_output_path, 'Logs') + makedirs(self.convert_log_path, exist_ok=True) + self.convert_log_path = join(self.convert_log_path, 'Errors.log') + with open(self.convert_log_path, 'w') as f: + f.write("2024-01-01T12:12:12Z thread 99999 ERROR: Sample Sheet " + "Error: in OverrideCycles: Read # 2 specified does not " + "add up to the 8 bases expected from RunInfo.xml\n") + def tearDown(self): rmtree(self.good_input_path) rmtree(self.good_output_path) @@ -930,6 +942,27 @@ def test_creation(self): gop=self.good_output_path, run_dir=run_dir)) + def test_error_msg_from_logs(self): + run_dir = self.base_path('211021_A00000_0000_SAMPLE') + qiita_id = 'abcdabcdabcdabcdabcdabcdabcdabcd' + + job = ConvertJob(run_dir, self.good_output_path, + self.sample_sheet_path, 'qiita', 1, 16, 1440, '10gb', + 'tests/bin/bcl-convert', [], qiita_id) + + # an internal method to force submit_job() to raise a JobFailedError + # instead of submitting the job w/sbatch and waiting for a failed + # job w/sacct. + self.assertTrue(job._toggle_force_job_fail()) + + error_msg = ("This job died.\n2024-01-01T12:12:12Z thread 99999 ERROR:" + " Sample Sheet Error: in OverrideCycles: Read # 2 " + "specified does not add up to the 8 bases expected from" + " RunInfo.xml") + + with self.assertRaisesRegex(JobFailedError, error_msg): + job.run() + def test_audit(self): # the faked output should be in self.good_output_path/ConvertJob. # ConvertJob already takes into account 'ConvertJob' and so the diff --git a/sequence_processing_pipeline/tests/test_FastQCJob.py b/sequence_processing_pipeline/tests/test_FastQCJob.py index 46bf4708..28fe52cb 100644 --- a/sequence_processing_pipeline/tests/test_FastQCJob.py +++ b/sequence_processing_pipeline/tests/test_FastQCJob.py @@ -2,7 +2,8 @@ from os.path import join, exists, isfile from functools import partial from sequence_processing_pipeline.FastQCJob import FastQCJob -from sequence_processing_pipeline.PipelineError import PipelineError +from sequence_processing_pipeline.PipelineError import (PipelineError, + JobFailedError) from os import makedirs, listdir, mkdir from shutil import rmtree, move from json import load @@ -15,6 +16,7 @@ def setUp(self): self.path = partial(join, package_root, 'tests', 'data') self.qiita_job_id = 'abcdabcdabcdabcdabcdabcdabcdabcd' self.output_path = self.path('output_dir2') + self.fastqc_log_path = join(self.output_path, 'logs') self.raw_fastq_files_path = ('sequence_processing_pipeline/tests/data' '/211021_A00000_0000_SAMPLE/Data/Fastq/p' 'roject1') @@ -527,6 +529,41 @@ def setUp(self): with open(file_name, 'w') as f2: f2.write("This is a file.") + # set up dummy logs + self.fastqc_log_path = join(self.output_path, "FastQCJob", "logs") + makedirs(self.fastqc_log_path, exist_ok=True) + + log_files = { + 'slurm-9999999_35.out': ["---------------", + "Run details:", + ("hds-fe848a9e-c0e9-49d9-978d-" + "27565a314e8b 1908305 b2-018"), + "---------------", + "+ this", + "+ that", + "+ blah", + ("something error: Generic Standin Error" + " (GSE).")], + 'slurm-9999999_17.out': ["---------------", + "Run details:", + ("hds-fe848a9e-c0e9-49d9-978d-" + "27565a314e8b 1908305 b2-018"), + "---------------", + "+ this", + "+ that", + "+ blah", + ("something error: Another Standin Error" + " (ASE).")] + } + + for log_file in log_files: + fp = join(self.fastqc_log_path, log_file) + + with open(fp, 'w') as f: + lines = log_files[log_file] + for line in lines: + f.write(f"{line}\n") + def tearDown(self): rmtree(self.output_path) @@ -1071,6 +1108,35 @@ 
def test_completed_file_generation_some_failures(self): "failed_indexes": [3, 4]} self.assertDictEqual(obs, exp) + def test_error_msg_from_logs(self): + job = FastQCJob(self.qc_root_path, self.output_path, + self.raw_fastq_files_path.replace('/project1', ''), + self.processed_fastq_files_path, + 16, 16, + 'sequence_processing_pipeline/tests/bin/fastqc', [], + self.qiita_job_id, 'queue_name', 4, 23, '8g', 30, + self.config_yml, 1000, False) + + self.assertFalse(job is None) + + # an internal method to force submit_job() to raise a JobFailedError + # instead of submitting the job w/sbatch and waiting for a failed + # job w/sacct. + self.assertTrue(job._toggle_force_job_fail()) + + try: + job.run() + except JobFailedError as e: + # assert that the text of the original error message was + # preserved, while including known strings from each of the + # sample log-files. + print(str(e)) + self.assertIn('This job died.', str(e)) + self.assertIn('something error: Generic Standin Error (GSE)', + str(e)) + self.assertIn('something error: Another Standin Error (ASE)', + str(e)) + if __name__ == '__main__': unittest.main() diff --git a/sequence_processing_pipeline/tests/test_Job.py b/sequence_processing_pipeline/tests/test_Job.py index b01e651c..0eccaf6a 100644 --- a/sequence_processing_pipeline/tests/test_Job.py +++ b/sequence_processing_pipeline/tests/test_Job.py @@ -45,10 +45,13 @@ def my_callback(jid=None, status=None): obs = job._system_call('ls ' + join(package_root, 'tests', 'bin'), callback=my_callback) - exp = {'stdout': 'bcl-convert\nbcl2fastq\nfastqc\n', - 'stderr': '', - 'return_code': 0} - self.assertDictEqual(obs, exp) + + exp = ['bcl2fastq\nbcl-convert\nfastqc\n', + 'bcl-convert\nbcl2fastq\nfastqc\n'] + + self.assertIn(obs['stdout'], exp) + self.assertEqual(obs['stderr'], '') + self.assertEqual(obs['return_code'], 0) for item in callback_results: self.assertTrue(isinstance(item[0], int)) diff --git a/sequence_processing_pipeline/tests/test_NuQCJob.py b/sequence_processing_pipeline/tests/test_NuQCJob.py index 0f4bcd9f..cb683f1d 100644 --- a/sequence_processing_pipeline/tests/test_NuQCJob.py +++ b/sequence_processing_pipeline/tests/test_NuQCJob.py @@ -3,7 +3,8 @@ from os.path import join, abspath, exists, dirname from functools import partial from sequence_processing_pipeline.NuQCJob import NuQCJob -from sequence_processing_pipeline.PipelineError import PipelineError +from sequence_processing_pipeline.PipelineError import (PipelineError, + JobFailedError) from os import makedirs, remove from metapool import load_sample_sheet import glob @@ -542,6 +543,40 @@ def setUp(self): self.sample_ids = self.feist_ids + self.gerwick_ids + self.nyu_ids + self.qc_log_path = join(self.output_path, "NuQCJob", "logs") + makedirs(self.qc_log_path, exist_ok=True) + + log_files = { + 'slurm-9999999_35.out': ["---------------", + "Run details:", + ("hds-fe848a9e-c0e9-49d9-978d-" + "27565a314e8b 1908305 b2-018"), + "---------------", + "+ this", + "+ that", + "+ blah", + ("something error: Generic Standin Error" + " (GSE).")], + 'slurm-9999999_17.out': ["---------------", + "Run details:", + ("hds-fe848a9e-c0e9-49d9-978d-" + "27565a314e8b 1908305 b2-018"), + "---------------", + "+ this", + "+ that", + "+ blah", + ("something error: Another Standin Error" + " (ASE).")] + } + + for log_file in log_files: + fp = join(self.qc_log_path, log_file) + + with open(fp, 'w') as f: + lines = log_files[log_file] + for line in lines: + f.write(f"{line}\n") + def tearDown(self): shutil.rmtree(self.output_path) if 
exists(self.tmp_file_path): @@ -601,6 +636,32 @@ def test_nuqcjob_creation(self): 'fastp', 'minimap2', 'samtools', [], self.qiita_job_id, 1000, '') + def test_error_msg_from_logs(self): + job = NuQCJob(self.fastq_root_path, self.output_path, + self.good_sample_sheet_path, self.mmi_db_paths, + 'queue_name', 1, 1440, '8gb', + 'fastp', 'minimap2', 'samtools', [], + self.qiita_job_id, 1000, '') + + self.assertFalse(job is None) + + # an internal method to force submit_job() to raise a JobFailedError + # instead of submitting the job w/sbatch and waiting for a failed + # job w/sacct. + self.assertTrue(job._toggle_force_job_fail()) + + try: + job.run() + except JobFailedError as e: + # assert that the text of the original error message was + # preserved, while including known strings from each of the + # sample log-files. + self.assertIn('This job died.', str(e)) + self.assertIn('something error: Generic Standin Error (GSE)', + str(e)) + self.assertIn('something error: Another Standin Error (ASE)', + str(e)) + def test_assay_value(self): with self.assertRaisesRegex(ValueError, "bad-sample-sheet-metagenomics" ".csv' does not appear to be a"
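Usage note (not part of the diff above): with this change ConvertJob, FastQCJob and NuQCJob all follow the same failure path — submit_job() raises JobFailedError, run() catches it, prepends the original message, and re-raises with whatever parse_logs() could recover from the job's log files. Below is a minimal sketch of how calling code might surface that combined message; the paths, scheduler parameters and qiita_job_id are placeholders modeled on the values used in tests/test_ConvertJob.py, not values defined by this patch.

import logging

from sequence_processing_pipeline.ConvertJob import ConvertJob
from sequence_processing_pipeline.PipelineError import JobFailedError

# placeholder inputs, modeled on the unit tests; adjust for a real deployment
run_dir = '/sequencing/211021_A00000_0000_SAMPLE'
output_path = '/working_dir/my_run'
sample_sheet_path = '/working_dir/good-sample-sheet.csv'
qiita_job_id = 'abcdabcdabcdabcdabcdabcdabcdabcd'

job = ConvertJob(run_dir, output_path, sample_sheet_path, 'qiita',
                 1, 16, 1440, '10gb', '/usr/local/bin/bcl-convert',
                 [], qiita_job_id)

try:
    job.run()
except JobFailedError as e:
    # str(e) begins with the scheduler-level message (e.g. "job 1234 exited
    # with status FAILED") followed by any lines parse_logs() pulled from
    # ConvertJob's Logs/Errors.log, so the root cause can be logged or shown
    # to the user directly.
    logging.error(str(e))
    raise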