From a155fa47141553e437ece0b2b2f28bb007b14b1f Mon Sep 17 00:00:00 2001 From: John Davis Date: Tue, 5 Sep 2023 19:50:49 -0400 Subject: [PATCH] Ensure Job belongs to current SA session --- lib/galaxy/jobs/runners/__init__.py | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/lib/galaxy/jobs/runners/__init__.py b/lib/galaxy/jobs/runners/__init__.py index c79d1926d0e9..6c95de6312b3 100644 --- a/lib/galaxy/jobs/runners/__init__.py +++ b/lib/galaxy/jobs/runners/__init__.py @@ -15,6 +15,8 @@ Queue, ) +from sqlalchemy.orm import object_session + import galaxy.jobs from galaxy import model from galaxy.exceptions import ConfigurationError @@ -154,6 +156,10 @@ def run_next(self): name = method.__name__ except Exception: name = UNKNOWN + + # Ensure a Job object belongs to a session + self._ensure_db_session(arg) + try: action_str = f"galaxy.jobs.runners.{self.__class__.__name__.lower()}.{name}" action_timer = self.app.execution_timer_factory.get_timer( @@ -171,6 +177,18 @@ def run_next(self): # Prevent fail_job cycle in the work_queue self.work_queue.put((self.fail_job, job_state)) + def _ensure_db_session(self, arg: typing.Union["JobWrapper", "JobState"]) -> None: + """Ensure Job object belongs to current session.""" + try: + job_wrapper = arg.job_wrapper # type: ignore[union-attr] + except AttributeError: + job_wrapper = arg + + if job_wrapper._job_io: + job = job_wrapper._job_io.job + if object_session(job) is None: + self.app.model.session().add(job) + # Causes a runner's `queue_job` method to be called from a worker thread def put(self, job_wrapper: "MinimalJobWrapper"): """Add a job to the queue (by job identifier), indicate that the job is ready to run."""