Merge pull request #18858 from mvdbeek/rerun_messages
[24.1] Raise MessageException instead of assertions on rerun problems
mvdbeek authored Sep 23, 2024
2 parents 1a02832 + b70a763 commit 6c85d45
Showing 3 changed files with 70 additions and 25 deletions.
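Why this change matters: an uncaught `AssertionError` from request validation surfaces to the user as an opaque HTTP 500, while Galaxy's `MessageException` subclasses (such as `RequestParameterInvalidException` and `AuthenticationRequired`, both used in the diff below) carry a status code and a user-facing `err_msg`. A minimal sketch of that dispatch pattern; the class names mimic `galaxy.exceptions`, but the `dispatch` helper is illustrative, not Galaxy's actual error middleware:

```python
class MessageException(Exception):
    """Base class for errors meant to reach the user as a structured message."""
    status_code = 400

class RequestParameterInvalidException(MessageException):
    status_code = 400

class AuthenticationRequired(MessageException):
    status_code = 401

def dispatch(handler):
    """Turn exceptions raised by a request handler into an HTTP-style response."""
    try:
        return {"status": 200, "data": handler()}
    except MessageException as e:
        # Typed exceptions map to a 4xx response with a user-facing message.
        return {"status": e.status_code, "err_msg": str(e)}
    except AssertionError:
        # A failed `assert` has no such mapping: the user sees an opaque 500.
        return {"status": 500, "err_msg": "Internal Server Error"}

def bad_handler():
    raise RequestParameterInvalidException("rerun_remap_job_id parameter is invalid")

print(dispatch(bad_handler))  # {'status': 400, 'err_msg': 'rerun_remap_job_id parameter is invalid'}
```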
1 change: 1 addition & 0 deletions lib/galaxy/model/__init__.py
@@ -4545,6 +4545,7 @@ class DatasetInstance(RepresentById, UsesCreateAndUpdateTime, _HasTable):
     creating_job_associations: List[Union[JobToOutputDatasetCollectionAssociation, JobToOutputDatasetAssociation]]
     copied_from_history_dataset_association: Optional["HistoryDatasetAssociation"]
     copied_from_library_dataset_dataset_association: Optional["LibraryDatasetDatasetAssociation"]
+    dependent_jobs: List[JobToInputLibraryDatasetAssociation]
     implicitly_converted_datasets: List["ImplicitlyConvertedDatasetAssociation"]

     validated_states = DatasetValidatedState
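For context on the new `dependent_jobs` annotation: the remap loop in the next file walks exactly this relationship. Roughly, as a paraphrase of the diff below rather than additional Galaxy code:

```python
def dependent_jobs_of(old_job):
    """Paraphrase of the remap loop below: each output dataset of `old_job`
    records which jobs consumed it as an input."""
    for jtod in old_job.output_datasets:  # JobToOutputDatasetAssociation
        for jtid in jtod.dataset.dependent_jobs:  # the annotated relationship
            yield jtid.job  # a downstream job that may need re-wiring
```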
68 changes: 43 additions & 25 deletions lib/galaxy/tools/actions/__init__.py
@@ -9,6 +9,7 @@
     cast,
     Dict,
     List,
+    Optional,
     Set,
     TYPE_CHECKING,
     Union,
@@ -18,6 +19,7 @@

 from galaxy import model
 from galaxy.exceptions import (
+    AuthenticationRequired,
     ItemAccessibilityException,
     RequestParameterInvalidException,
 )
@@ -699,14 +701,6 @@ def handle_output(name, output, hidden=None):
         # Remap any outputs if this is a rerun and the user chose to continue dependent jobs
         # This functionality requires tracking jobs in the database.
         if app.config.track_jobs_in_database and rerun_remap_job_id is not None:
-            # Need to flush here so that referencing outputs by id works
-            session = trans.sa_session()
-            try:
-                session.expire_on_commit = False
-                with transaction(session):
-                    session.commit()
-            finally:
-                session.expire_on_commit = True
             self._remap_job_on_rerun(
                 trans=trans,
                 galaxy_session=galaxy_session,
@@ -747,30 +741,54 @@ def handle_output(name, output, hidden=None):

         return job, out_data, history

-    def _remap_job_on_rerun(self, trans, galaxy_session, rerun_remap_job_id, current_job, out_data):
+    def _remap_job_on_rerun(
+        self,
+        trans: ProvidesHistoryContext,
+        galaxy_session: Optional[model.GalaxySession],
+        rerun_remap_job_id: int,
+        current_job: Job,
+        out_data,
+    ):
"""
Re-connect dependent datasets for a job that is being rerun (because it failed initially).
If a job fails, the user has the option to try the job again with changed parameters.
To be able to resume jobs that depend on this jobs output datasets we change the dependent's job
input datasets to be those of the job that is being rerun.
"""
+        old_job = trans.sa_session.get(Job, rerun_remap_job_id)
+        if not old_job:
+            # I don't think that can really happen
+            raise RequestParameterInvalidException("rerun_remap_job_id parameter is invalid")
+        old_tool = trans.app.toolbox.get_tool(old_job.tool_id, exact=False)
+        new_tool = trans.app.toolbox.get_tool(current_job.tool_id, exact=False)
+        if old_tool and new_tool and old_tool.old_id != new_tool.old_id:
+            # If we currently only have the old or new tool installed we'll find the other tool anyway with `exact=False`.
+            # If we don't have the tool at all we'll fail anyway, no need to worry here.
+            raise RequestParameterInvalidException(
+                f"Old tool id ({old_job.tool_id}) does not match rerun tool id ({current_job.tool_id})"
+            )
+        if trans.user is not None:
+            if old_job.user_id != trans.user.id:
+                raise RequestParameterInvalidException(
+                    "Cannot remap job dependencies for job not created by current user."
+                )
+        elif trans.user is None and galaxy_session:
+            if old_job.session_id != galaxy_session.id:
+                raise RequestParameterInvalidException(
+                    "Cannot remap job dependencies for job not created by current user."
+                )
+        else:
+            raise AuthenticationRequired("Authentication required to remap job dependencies")
+        # Need to flush here so that referencing outputs by id works
+        session = trans.sa_session()
+        try:
+            session.expire_on_commit = False
+            with transaction(session):
+                session.commit()
+        finally:
+            session.expire_on_commit = True
         try:
-            old_job = trans.sa_session.get(Job, rerun_remap_job_id)
-            assert old_job is not None, f"({rerun_remap_job_id}/{current_job.id}): Old job id is invalid"
-            assert (
-                old_job.tool_id == current_job.tool_id
-            ), f"({old_job.id}/{current_job.id}): Old tool id ({old_job.tool_id}) does not match rerun tool id ({current_job.tool_id})"
-            if trans.user is not None:
-                assert (
-                    old_job.user_id == trans.user.id
-                ), f"({old_job.id}/{current_job.id}): Old user id ({old_job.user_id}) does not match rerun user id ({trans.user.id})"
-            elif trans.user is None and isinstance(galaxy_session, trans.model.GalaxySession):
-                assert (
-                    old_job.session_id == galaxy_session.id
-                ), f"({old_job.id}/{current_job.id}): Old session id ({old_job.session_id}) does not match rerun session id ({galaxy_session.id})"
-            else:
-                raise Exception(f"({old_job.id}/{current_job.id}): Remapping via the API is not (yet) supported")
             # Start by hiding current job outputs before taking over the old job's (implicit) outputs.
             current_job.hide_outputs(flush=False)
             # Duplicate PJAs before remap.
@@ -792,7 +810,7 @@ def _remap_job_on_rerun(self, trans, galaxy_session, rerun_remap_job_id, current_job, out_data):
             for jtod in old_job.output_datasets:
                 for job_to_remap, jtid in [(jtid.job, jtid) for jtid in jtod.dataset.dependent_jobs]:
                     if (trans.user is not None and job_to_remap.user_id == trans.user.id) or (
-                        trans.user is None and job_to_remap.session_id == galaxy_session.id
+                        trans.user is None and galaxy_session and job_to_remap.session_id == galaxy_session.id
                     ):
                         self.__remap_parameters(job_to_remap, jtid, jtod, out_data)
                         trans.sa_session.add(job_to_remap)
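After this change, a bad `rerun_remap_job_id` reaches API clients as a structured error rather than a 500. A hedged sketch of how a client might hit this path; the endpoint and payload shape follow Galaxy's `/api/tools` route, while the URL, API key, ids, and tool are placeholders:

```python
import requests

GALAXY_URL = "https://galaxy.example.org"  # placeholder instance
API_KEY = "0123456789abcdef"  # placeholder key

payload = {
    "tool_id": "cat1",  # hypothetical tool, for illustration only
    "history_id": "abc123",  # placeholder encoded history id
    "inputs": {
        "input1": {"src": "hda", "id": "def456"},  # placeholder dataset id
        "rerun_remap_job_id": "789xyz",  # encoded id of the failed job to remap
    },
}
response = requests.post(
    f"{GALAXY_URL}/api/tools",
    json=payload,
    headers={"x-api-key": API_KEY},
)
# With this commit, a mismatched job id yields a 4xx with a structured
# "err_msg" (e.g. "... does not match rerun tool id ...") instead of a 500.
print(response.status_code, response.json().get("err_msg"))
```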
26 changes: 26 additions & 0 deletions lib/galaxy_test/api/test_jobs.py
@@ -464,6 +464,32 @@ def test_no_hide_on_rerun(self):
             assert hdca["visible"]
             assert isoparse(hdca["update_time"]) > (isoparse(first_update_time))

+    def test_rerun_exception_handling(self):
+        with self.dataset_populator.test_history() as history_id:
+            other_run_response = self.dataset_populator.run_tool(
+                tool_id="job_properties",
+                inputs={},
+                history_id=history_id,
+            )
+            unrelated_job_id = other_run_response["jobs"][0]["id"]
+            run_response = self._run_map_over_error(history_id)
+            job_id = run_response["jobs"][0]["id"]
+            self.dataset_populator.wait_for_job(job_id)
+            failed_hdca = self.dataset_populator.get_history_collection_details(
+                history_id=history_id,
+                content_id=run_response["implicit_collections"][0]["id"],
+                assert_ok=False,
+            )
+            assert failed_hdca["visible"]
+            rerun_params = self._get(f"jobs/{job_id}/build_for_rerun").json()
+            inputs = rerun_params["state_inputs"]
+            inputs["rerun_remap_job_id"] = unrelated_job_id
+            before_rerun_items = self.dataset_populator.get_history_contents(history_id)
+            rerun_response = self._run_detect_errors(history_id=history_id, inputs=inputs)
+            assert "does not match rerun tool id" in rerun_response["err_msg"]
+            after_rerun_items = self.dataset_populator.get_history_contents(history_id)
+            assert len(before_rerun_items) == len(after_rerun_items)
+
     @skip_without_tool("empty_output")
     def test_common_problems(self):
         with self.dataset_populator.test_history() as history_id:
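For reference, the new test can be run from a Galaxy checkout with the standard API test runner, e.g. `./run_tests.sh -api lib/galaxy_test/api/test_jobs.py::TestJobsApi::test_rerun_exception_handling` (the `TestJobsApi` class name is inferred and may differ by release).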
