Merge pull request #16965 from mvdbeek/dont_hold_on_to_job_object
[23.1] Don't store job in JobIO instance attributes
mvdbeek authored Nov 9, 2023
2 parents 0290219 + 5566be0 commit f450dee
Showing 1 changed file with 47 additions and 18 deletions.
lib/galaxy/job_execution/setup.py: 47 additions & 18 deletions
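
The thrust of the change: JobIO previously pinned a SQLAlchemy-mapped Job object in self.job; after this commit it keeps only the integer job_id, re-queries the Job on demand through a property, and moves its cached output paths into a thread-local container. A standalone sketch of the keep-the-id pattern (not Galaxy code; the Job model and session setup here are illustrative):

    from sqlalchemy import Column, Integer, String, create_engine
    from sqlalchemy.orm import declarative_base, sessionmaker

    Base = declarative_base()

    class Job(Base):  # stand-in for Galaxy's Job model
        __tablename__ = "job"
        id = Column(Integer, primary_key=True)
        state = Column(String)

    class JobRef:
        def __init__(self, sa_session, job):
            self.sa_session = sa_session
            self.job_id = job.id  # a plain int, safe to hold onto

        @property
        def job(self):
            # Same lookup the commit adds to JobIO; the session's
            # identity map keeps repeated access cheap.
            return self.sa_session.query(Job).get(self.job_id)

    engine = create_engine("sqlite://")
    Base.metadata.create_all(engine)
    session = sessionmaker(bind=engine)()
    session.add(Job(state="new"))
    session.commit()

    ref = JobRef(session, session.query(Job).first())
    print(ref.job.state)  # "new" -- fetched on access, not stored at construction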
@@ -1,11 +1,13 @@
"""Utilities to help job and tool code setup jobs."""
import json
import os
import threading
from typing import (
Any,
cast,
Dict,
List,
NamedTuple,
Optional,
Tuple,
Union,
@@ -38,6 +40,27 @@
 OutputPaths = List[DatasetPath]
 
 
+class JobOutput(NamedTuple):
+    output_name: str
+    dataset: DatasetInstance
+    dataset_path: DatasetPath
+
+
+class JobOutputs(threading.local):
+    def __init__(self) -> None:
+        super().__init__()
+        self.output_hdas_and_paths: Optional[OutputHdasAndType] = None
+        self.output_paths: Optional[OutputPaths] = None
+
+    @property
+    def populated(self) -> bool:
+        return self.output_hdas_and_paths is not None
+
+    def set_job_outputs(self, job_outputs: List[JobOutput]) -> None:
+        self.output_paths = [t[2] for t in job_outputs]
+        self.output_hdas_and_paths = {t.output_name: (t.dataset, t.dataset_path) for t in job_outputs}
+
+
 class JobIO(Dictifiable):
     dict_collection_visible_keys = (
         "job_id",
@@ -99,7 +122,6 @@ def __init__(
             user_context_instance = user_context
         self.user_context = user_context_instance
         self.sa_session = sa_session
-        self.job = job
         self.job_id = job.id
         self.working_directory = working_directory
         self.outputs_directory = outputs_directory
@@ -121,25 +143,33 @@ def __init__(
         self.is_task = is_task
         self.tool_source = tool_source
         self.tool_source_class = tool_source_class
-        self._output_paths: Optional[OutputPaths] = None
-        self._output_hdas_and_paths: Optional[OutputHdasAndType] = None
+        self.job_outputs = JobOutputs()
         self._dataset_path_rewriter: Optional[DatasetPathRewriter] = None
 
+    @property
+    def job(self):
+        return self.sa_session.query(Job).get(self.job_id)
+
     @classmethod
     def from_json(cls, path, sa_session):
         with open(path) as job_io_serialized:
             io_dict = json.load(job_io_serialized)
         return cls.from_dict(io_dict=io_dict, sa_session=sa_session)
 
     @classmethod
     def from_dict(cls, io_dict, sa_session):
-        io_dict.pop("model_class")
+        # Drop in 24.0
+        io_dict.pop("model_class", None)
         job_id = io_dict.pop("job_id")
         job = sa_session.query(Job).get(job_id)
         return cls(sa_session=sa_session, job=job, **io_dict)
 
     def to_dict(self):
         io_dict = super().to_dict()
+        # dict_for will always add `model_class`, we don't need or want it
+        io_dict.pop("model_class")
         io_dict["user_context"] = self.user_context.to_dict()
         return io_dict
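
As the # Drop in 24.0 comment suggests, to_dict() now strips model_class at serialization time, while from_dict() must keep accepting dicts written by older releases that still carry the key; popping with a default handles both shapes. A minimal illustration (the payloads are hypothetical):

    old_payload = {"model_class": "JobIO", "job_id": 42}  # written by an older release
    new_payload = {"job_id": 42}                          # written after this commit

    for io_dict in (dict(old_payload), dict(new_payload)):
        io_dict.pop("model_class", None)  # tolerates presence or absence
        assert io_dict == {"job_id": 42}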

Expand All @@ -165,15 +195,15 @@ def dataset_path_rewriter(self) -> DatasetPathRewriter:

@property
def output_paths(self) -> OutputPaths:
if self._output_paths is None:
if not self.job_outputs.populated:
self.compute_outputs()
return cast(OutputPaths, self._output_paths)
return cast(OutputPaths, self.job_outputs.output_paths)

@property
def output_hdas_and_paths(self) -> OutputHdasAndType:
if self._output_hdas_and_paths is None:
if not self.job_outputs.populated:
self.compute_outputs()
return cast(OutputHdasAndType, self._output_hdas_and_paths)
return cast(OutputHdasAndType, self.job_outputs.output_hdas_and_paths)

def get_input_dataset_fnames(self, ds: DatasetInstance) -> List[str]:
filenames = [ds.file_name]
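
These properties are a lazy-compute pattern: populate on first access, then serve from the (now thread-local) cache. The shape of it, reduced to plain Python:

    class Lazy:
        def __init__(self):
            self._value = None  # plays the role of JobOutputs

        def _compute(self):  # plays the role of compute_outputs()
            print("computing once")
            self._value = 42

        @property
        def value(self):
            if self._value is None:  # JobIO checks job_outputs.populated
                self._compute()
            return self._value

    lazy = Lazy()
    print(lazy.value)  # triggers the computation
    print(lazy.value)  # served from the cache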
@@ -241,22 +271,21 @@ def compute_outputs(self) -> None:
         special = self.sa_session.query(JobExportHistoryArchive).filter_by(job=job).first()
         false_path = None
 
-        results = []
+        job_outputs = []
         for da in job.output_datasets + job.output_library_datasets:
             da_false_path = dataset_path_rewriter.rewrite_dataset_path(da.dataset, "output")
             mutable = da.dataset.dataset.external_filename is None
             dataset_path = DatasetPath(
                 da.dataset.dataset.id, da.dataset.file_name, false_path=da_false_path, mutable=mutable
             )
-            results.append((da.name, da.dataset, dataset_path))
+            job_outputs.append(JobOutput(da.name, da.dataset, dataset_path))
 
-        self._output_paths = [t[2] for t in results]
-        self._output_hdas_and_paths = {t[0]: t[1:] for t in results}
         if special:
             false_path = dataset_path_rewriter.rewrite_dataset_path(special, "output")
             dsp = DatasetPath(special.dataset.id, special.dataset.file_name, false_path)
-            self._output_paths.append(dsp)
-            self._output_hdas_and_paths["output_file"] = (special.fda, dsp)
+            job_outputs.append(JobOutput("output_file", special.fda, dsp))
+
+        self.job_outputs.set_job_outputs(job_outputs)
 
     def get_output_file_id(self, file: str) -> Optional[int]:
         for dp in self.output_paths:
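
set_job_outputs reads the same JobOutput tuples both by position (t[2]) and by field name (t.output_name); a NamedTuple supports both. A quick sketch, with strings standing in for Galaxy's DatasetInstance and DatasetPath objects:

    from typing import NamedTuple

    class JobOutput(NamedTuple):
        output_name: str
        dataset: str       # DatasetInstance in Galaxy
        dataset_path: str  # DatasetPath in Galaxy

    outputs = [
        JobOutput("out_file1", "hda-1", "/data/dataset_1.dat"),
        JobOutput("out_file2", "hda-2", "/data/dataset_2.dat"),
    ]

    output_paths = [t[2] for t in outputs]  # positional: index 2 is dataset_path
    hdas_and_paths = {t.output_name: (t.dataset, t.dataset_path) for t in outputs}
    print(output_paths)
    print(hdas_and_paths["out_file1"])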
