Skip to content

Commit

Permalink
Merge pull request galaxyproject#18809 from mvdbeek/collect_job_metri…
Browse files Browse the repository at this point in the history
…cs_also_when_job_failed

[24.1] Collect job metrics also when job failed
  • Loading branch information
mvdbeek authored Sep 13, 2024
2 parents 00f6482 + 4a5b3dd commit 310cbd9
Show file tree
Hide file tree
Showing 5 changed files with 56 additions and 18 deletions.
24 changes: 22 additions & 2 deletions lib/galaxy/jobs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -1379,7 +1379,15 @@ def _fix_output_permissions(self):
util.umask_fix_perms(path, self.app.config.umask, 0o666, self.app.config.gid)

def fail(
self, message, exception=False, tool_stdout="", tool_stderr="", exit_code=None, job_stdout=None, job_stderr=None
self,
message,
exception=False,
tool_stdout="",
tool_stderr="",
exit_code=None,
job_stdout=None,
job_stderr=None,
job_metrics_directory=None,
):
"""
Indicate job failure by setting state and message on all output
Expand All @@ -1406,6 +1414,10 @@ def fail(
message = str(message)
working_directory_exists = self.working_directory_exists()

if not job.tasks:
# If job was composed of tasks, don't attempt to recollect statistics
self._collect_metrics(job, job_metrics_directory)

# if the job was deleted, don't fail it
if not job.state == job.states.DELETED:
# Check if the failure is due to an exception
Expand Down Expand Up @@ -1481,6 +1493,7 @@ def fail(
pjaa.post_job_action for pjaa in job.post_job_actions if pjaa.post_job_action.action_type == "EmailAction"
]:
ActionBox.execute(self.app, self.sa_session, pja, job)

# If the job was deleted, call tool specific fail actions (used for e.g. external metadata) and clean up
if self.tool:
try:
Expand Down Expand Up @@ -1841,6 +1854,7 @@ def fail(message=job.info, exception=None):
job_stdout=job_stdout,
job_stderr=job_stderr,
exception=exception,
job_metrics_directory=job_metrics_directory,
)

# TODO: After failing here, consider returning from the function.
Expand Down Expand Up @@ -2143,7 +2157,13 @@ def cleanup(self, delete_files=True):

def _collect_metrics(self, has_metrics, job_metrics_directory=None):
job = has_metrics.get_job()
job_metrics_directory = job_metrics_directory or self.working_directory
if job_metrics_directory is None:
try:
# working directory might have been purged already
job_metrics_directory = self.working_directory
except Exception:
log.exception("Could not recover job metrics")
return
per_plugin_properties = self.app.job_metrics.collect_properties(
job.destination_id, self.job_id, job_metrics_directory
)
Expand Down
4 changes: 2 additions & 2 deletions lib/galaxy/jobs/runners/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ def __init__(self, app, nworkers):

super().__init__(app, nworkers)

def __command_line(self, job_wrapper: "MinimalJobWrapper") -> Tuple[str, str]:
def _command_line(self, job_wrapper: "MinimalJobWrapper") -> Tuple[str, str]:
""" """
command_line = job_wrapper.runner_command_line

Expand Down Expand Up @@ -90,7 +90,7 @@ def queue_job(self, job_wrapper):
stderr = stdout = ""

# command line has been added to the wrapper by prepare_job()
job_file, exit_code_path = self.__command_line(job_wrapper)
job_file, exit_code_path = self._command_line(job_wrapper)
job_id = job_wrapper.get_id_tag()

try:
Expand Down
11 changes: 2 additions & 9 deletions lib/galaxy/jobs/runners/pulsar.py
Original file line number Diff line number Diff line change
Expand Up @@ -689,16 +689,9 @@ def finish_job(self, job_state: JobState):
) as exit_code_file:
exit_code_file.write(str(exit_code))
self._handle_metadata_externally(job_wrapper, resolve_requirements=True)
job_metrics_directory = os.path.join(job_wrapper.working_directory, "metadata")
# Finish the job
try:
job_metrics_directory = os.path.join(job_wrapper.working_directory, "metadata")
# Following check is a hack for jobs started during 19.01 or earlier release
# and finishing with a 19.05 code base. Eliminate the hack in 19.09 or later
# along with hacks for legacy metadata compute strategy.
if not os.path.exists(job_metrics_directory) or not any(
"__instrument" in f for f in os.listdir(job_metrics_directory)
):
job_metrics_directory = job_wrapper.working_directory
job_wrapper.finish(
tool_stdout,
tool_stderr,
Expand All @@ -710,7 +703,7 @@ def finish_job(self, job_state: JobState):
)
except Exception:
log.exception("Job wrapper finish method failed")
job_wrapper.fail("Unable to finish job", exception=True)
job_wrapper.fail("Unable to finish job", exception=True, job_metrics_directory=job_metrics_directory)

def check_pid(self, pid):
try:
Expand Down
4 changes: 3 additions & 1 deletion test/integration/resubmission_runners.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import subprocess
import time
from typing import List

Expand All @@ -13,7 +14,8 @@ class FailsJobRunner(LocalJobRunner):
def queue_job(self, job_wrapper):
if not self._prepare_job_local(job_wrapper):
return

command_line, _ = self._command_line(job_wrapper)
subprocess.run([command_line])
resource_parameters = job_wrapper.get_resource_parameters()
failure_state = resource_parameters.get("failure_state", None)

Expand Down
31 changes: 27 additions & 4 deletions test/integration/test_job_resubmission.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import os

from galaxy_test.base.populators import DatasetPopulator
from galaxy_test.driver import integration_util

SCRIPT_DIRECTORY = os.path.abspath(os.path.dirname(__file__))
Expand All @@ -27,15 +28,15 @@
class _BaseResubmissionIntegrationTestCase(integration_util.IntegrationTestCase):
framework_tool_and_types = True

def _assert_job_passes(self, tool_id="exit_code_oom", resource_parameters=None):
def _assert_job_passes(self, tool_id="exit_code_oom", resource_parameters=None, history_id=None):
resource_parameters = resource_parameters or {}
self._run_tool_test(tool_id, resource_parameters=resource_parameters)
self._run_tool_test(tool_id, resource_parameters=resource_parameters, test_history=history_id)

def _assert_job_fails(self, tool_id="exit_code_oom", resource_parameters=None):
def _assert_job_fails(self, tool_id="exit_code_oom", resource_parameters=None, history_id=None):
resource_parameters = resource_parameters or {}
exception_thrown = False
try:
self._run_tool_test(tool_id, resource_parameters=resource_parameters)
self._run_tool_test(tool_id, resource_parameters=resource_parameters, test_history=history_id)
except Exception:
exception_thrown = True

Expand All @@ -44,6 +45,11 @@ def _assert_job_fails(self, tool_id="exit_code_oom", resource_parameters=None):

class TestJobResubmissionIntegration(_BaseResubmissionIntegrationTestCase):
framework_tool_and_types = True
dataset_populator: DatasetPopulator

def setUp(self):
super().setUp()
self.dataset_populator = DatasetPopulator(self.galaxy_interactor)

@classmethod
def handle_galaxy_config_kwds(cls, config):
Expand All @@ -52,6 +58,9 @@ def handle_galaxy_config_kwds(cls, config):
config["job_resource_params_file"] = JOB_RESUBMISSION_JOB_RESOURCES_CONFIG_FILE
config["job_runner_monitor_sleep"] = 1
config["job_handler_monitor_sleep"] = 1
config["job_metrics"] = [{"type": "core"}]
# Can't set job_metrics_config_file to None as default location will be used otherwise
config["job_metrics_config_file"] = "xxx.xml"

def test_retry_tools_have_resource_params(self):
tool_show = self._get("tools/simple_constructs", data=dict(io_details=True)).json()
Expand All @@ -74,6 +83,20 @@ def test_failure_runner(self):
}
)

def test_failure_runner_job_metrics_collected(self):
with self.dataset_populator.test_history() as history_id:
self._assert_job_fails(
resource_parameters={
"test_name": "test_failure_runner",
"initial_target_environment": "fails_without_resubmission",
},
history_id=history_id,
)
jobs = self.dataset_populator.history_jobs(history_id=history_id)
assert len(jobs) == 1
job_metrics = self.dataset_populator._get(f"/api/jobs/{jobs[0]['id']}/metrics").json()
assert job_metrics

def test_walltime_resubmission(self):
self._assert_job_passes(
resource_parameters={"test_name": "test_walltime_resubmission", "failure_state": "walltime_reached"}
Expand Down

0 comments on commit 310cbd9

Please sign in to comment.