diff --git a/lib/galaxy/jobs/__init__.py b/lib/galaxy/jobs/__init__.py
index c930229df5e0..34d3550dc1ae 100644
--- a/lib/galaxy/jobs/__init__.py
+++ b/lib/galaxy/jobs/__init__.py
@@ -1379,7 +1379,15 @@ def _fix_output_permissions(self):
                 util.umask_fix_perms(path, self.app.config.umask, 0o666, self.app.config.gid)
 
     def fail(
-        self, message, exception=False, tool_stdout="", tool_stderr="", exit_code=None, job_stdout=None, job_stderr=None
+        self,
+        message,
+        exception=False,
+        tool_stdout="",
+        tool_stderr="",
+        exit_code=None,
+        job_stdout=None,
+        job_stderr=None,
+        job_metrics_directory=None,
     ):
         """
         Indicate job failure by setting state and message on all output
@@ -1406,6 +1414,10 @@ def fail(
         message = str(message)
         working_directory_exists = self.working_directory_exists()
 
+        if not job.tasks:
+            # If job was composed of tasks, don't attempt to recollect statistics
+            self._collect_metrics(job, job_metrics_directory)
+
         # if the job was deleted, don't fail it
         if not job.state == job.states.DELETED:
             # Check if the failure is due to an exception
@@ -1481,6 +1493,7 @@ def fail(
             pjaa.post_job_action for pjaa in job.post_job_actions if pjaa.post_job_action.action_type == "EmailAction"
         ]:
             ActionBox.execute(self.app, self.sa_session, pja, job)
+
         # If the job was deleted, call tool specific fail actions (used for e.g. external metadata) and clean up
         if self.tool:
             try:
@@ -1841,6 +1854,7 @@ def fail(message=job.info, exception=None):
                 job_stdout=job_stdout,
                 job_stderr=job_stderr,
                 exception=exception,
+                job_metrics_directory=job_metrics_directory,
             )
 
         # TODO: After failing here, consider returning from the function.
@@ -2143,7 +2157,13 @@ def cleanup(self, delete_files=True):
 
     def _collect_metrics(self, has_metrics, job_metrics_directory=None):
         job = has_metrics.get_job()
-        job_metrics_directory = job_metrics_directory or self.working_directory
+        if job_metrics_directory is None:
+            try:
+                # working directory might have been purged already
+                job_metrics_directory = self.working_directory
+            except Exception:
+                log.exception("Could not recover job metrics")
+                return
         per_plugin_properties = self.app.job_metrics.collect_properties(
             job.destination_id, self.job_id, job_metrics_directory
         )
diff --git a/lib/galaxy/jobs/runners/local.py b/lib/galaxy/jobs/runners/local.py
index 98438a155ef6..eb1853eb01bf 100644
--- a/lib/galaxy/jobs/runners/local.py
+++ b/lib/galaxy/jobs/runners/local.py
@@ -57,7 +57,7 @@ def __init__(self, app, nworkers):
         super().__init__(app, nworkers)
 
-    def __command_line(self, job_wrapper: "MinimalJobWrapper") -> Tuple[str, str]:
+    def _command_line(self, job_wrapper: "MinimalJobWrapper") -> Tuple[str, str]:
         """ """
         command_line = job_wrapper.runner_command_line
@@ -90,7 +90,7 @@ def queue_job(self, job_wrapper):
         stderr = stdout = ""
 
         # command line has been added to the wrapper by prepare_job()
-        job_file, exit_code_path = self.__command_line(job_wrapper)
+        job_file, exit_code_path = self._command_line(job_wrapper)
         job_id = job_wrapper.get_id_tag()
 
         try:
diff --git a/lib/galaxy/jobs/runners/pulsar.py b/lib/galaxy/jobs/runners/pulsar.py
index 58cee82f3f29..4bfb47ae296d 100644
--- a/lib/galaxy/jobs/runners/pulsar.py
+++ b/lib/galaxy/jobs/runners/pulsar.py
@@ -689,16 +689,9 @@ def finish_job(self, job_state: JobState):
             ) as exit_code_file:
                 exit_code_file.write(str(exit_code))
             self._handle_metadata_externally(job_wrapper, resolve_requirements=True)
+        job_metrics_directory = os.path.join(job_wrapper.working_directory, "metadata")
         # Finish the job
         try:
-            job_metrics_directory = os.path.join(job_wrapper.working_directory, "metadata")
-            # Following check is a hack for jobs started during 19.01 or earlier release
-            # and finishing with a 19.05 code base. Eliminate the hack in 19.09 or later
-            # along with hacks for legacy metadata compute strategy.
-            if not os.path.exists(job_metrics_directory) or not any(
-                "__instrument" in f for f in os.listdir(job_metrics_directory)
-            ):
-                job_metrics_directory = job_wrapper.working_directory
             job_wrapper.finish(
                 tool_stdout,
                 tool_stderr,
@@ -710,7 +703,7 @@ def finish_job(self, job_state: JobState):
             )
         except Exception:
             log.exception("Job wrapper finish method failed")
-            job_wrapper.fail("Unable to finish job", exception=True)
+            job_wrapper.fail("Unable to finish job", exception=True, job_metrics_directory=job_metrics_directory)
 
     def check_pid(self, pid):
         try:
diff --git a/test/integration/resubmission_runners.py b/test/integration/resubmission_runners.py
index 73da3c2e1e7f..e516ca243b27 100644
--- a/test/integration/resubmission_runners.py
+++ b/test/integration/resubmission_runners.py
@@ -1,3 +1,4 @@
+import subprocess
 import time
 from typing import List
 
@@ -13,7 +14,8 @@ class FailsJobRunner(LocalJobRunner):
     def queue_job(self, job_wrapper):
         if not self._prepare_job_local(job_wrapper):
             return
-
+        command_line, _ = self._command_line(job_wrapper)
+        subprocess.run([command_line])
         resource_parameters = job_wrapper.get_resource_parameters()
         failure_state = resource_parameters.get("failure_state", None)
diff --git a/test/integration/test_job_resubmission.py b/test/integration/test_job_resubmission.py
index dd6da5d423a3..a812978d96e5 100644
--- a/test/integration/test_job_resubmission.py
+++ b/test/integration/test_job_resubmission.py
@@ -2,6 +2,7 @@
 
 import os
 
+from galaxy_test.base.populators import DatasetPopulator
 from galaxy_test.driver import integration_util
 
 SCRIPT_DIRECTORY = os.path.abspath(os.path.dirname(__file__))
@@ -27,15 +28,15 @@ class _BaseResubmissionIntegrationTestCase(integration_util.IntegrationTestCase):
     framework_tool_and_types = True
 
-    def _assert_job_passes(self, tool_id="exit_code_oom", resource_parameters=None):
+    def _assert_job_passes(self, tool_id="exit_code_oom", resource_parameters=None, history_id=None):
         resource_parameters = resource_parameters or {}
-        self._run_tool_test(tool_id, resource_parameters=resource_parameters)
+        self._run_tool_test(tool_id, resource_parameters=resource_parameters, test_history=history_id)
 
-    def _assert_job_fails(self, tool_id="exit_code_oom", resource_parameters=None):
+    def _assert_job_fails(self, tool_id="exit_code_oom", resource_parameters=None, history_id=None):
         resource_parameters = resource_parameters or {}
         exception_thrown = False
         try:
-            self._run_tool_test(tool_id, resource_parameters=resource_parameters)
+            self._run_tool_test(tool_id, resource_parameters=resource_parameters, test_history=history_id)
         except Exception:
             exception_thrown = True
 
@@ -44,6 +45,11 @@ def _assert_job_fails(self, tool_id="exit_code_oom", resource_parameters=None):
 
 class TestJobResubmissionIntegration(_BaseResubmissionIntegrationTestCase):
     framework_tool_and_types = True
+    dataset_populator: DatasetPopulator
+
+    def setUp(self):
+        super().setUp()
+        self.dataset_populator = DatasetPopulator(self.galaxy_interactor)
 
     @classmethod
     def handle_galaxy_config_kwds(cls, config):
@@ -52,6 +58,9 @@ def handle_galaxy_config_kwds(cls, config):
         config["job_resource_params_file"] = JOB_RESUBMISSION_JOB_RESOURCES_CONFIG_FILE
         config["job_runner_monitor_sleep"] = 1
         config["job_handler_monitor_sleep"] = 1
+        config["job_metrics"] = [{"type": "core"}]
+        # Can't set job_metrics_config_file to None as default location will be used otherwise
+        config["job_metrics_config_file"] = "xxx.xml"
 
     def test_retry_tools_have_resource_params(self):
         tool_show = self._get("tools/simple_constructs", data=dict(io_details=True)).json()
@@ -74,6 +83,20 @@ def test_failure_runner(self):
             }
         )
 
+    def test_failure_runner_job_metrics_collected(self):
+        with self.dataset_populator.test_history() as history_id:
+            self._assert_job_fails(
+                resource_parameters={
+                    "test_name": "test_failure_runner",
+                    "initial_target_environment": "fails_without_resubmission",
+                },
+                history_id=history_id,
+            )
+            jobs = self.dataset_populator.history_jobs(history_id=history_id)
+            assert len(jobs) == 1
+            job_metrics = self.dataset_populator._get(f"/api/jobs/{jobs[0]['id']}/metrics").json()
+            assert job_metrics
+
     def test_walltime_resubmission(self):
         self._assert_job_passes(
             resource_parameters={"test_name": "test_walltime_resubmission", "failure_state": "walltime_reached"}