Skip to content

Commit

Permalink
Merge pull request galaxyproject#16647 from jdavcs/23.1_detached_sess…
Browse files Browse the repository at this point in the history
…ion_error

[23.1] Ensure Job belongs to current SA session
  • Loading branch information
mvdbeek authored Sep 7, 2023
2 parents 02461d9 + a155fa4 commit 3db5f56
Showing 1 changed file with 18 additions and 0 deletions.
18 changes: 18 additions & 0 deletions lib/galaxy/jobs/runners/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
Queue,
)

from sqlalchemy.orm import object_session

import galaxy.jobs
from galaxy import model
from galaxy.exceptions import ConfigurationError
Expand Down Expand Up @@ -154,6 +156,10 @@ def run_next(self):
name = method.__name__
except Exception:
name = UNKNOWN

# Ensure a Job object belongs to a session
self._ensure_db_session(arg)

try:
action_str = f"galaxy.jobs.runners.{self.__class__.__name__.lower()}.{name}"
action_timer = self.app.execution_timer_factory.get_timer(
Expand All @@ -171,6 +177,18 @@ def run_next(self):
# Prevent fail_job cycle in the work_queue
self.work_queue.put((self.fail_job, job_state))

def _ensure_db_session(self, arg: typing.Union["JobWrapper", "JobState"]) -> None:
"""Ensure Job object belongs to current session."""
try:
job_wrapper = arg.job_wrapper # type: ignore[union-attr]
except AttributeError:
job_wrapper = arg

if job_wrapper._job_io:
job = job_wrapper._job_io.job
if object_session(job) is None:
self.app.model.session().add(job)

# Causes a runner's `queue_job` method to be called from a worker thread
def put(self, job_wrapper: "MinimalJobWrapper"):
"""Add a job to the queue (by job identifier), indicate that the job is ready to run."""
Expand Down

0 comments on commit 3db5f56

Please sign in to comment.