diff --git a/lib/galaxy/model/__init__.py b/lib/galaxy/model/__init__.py index 97c41e7ee2e1..5090fbefd9be 100644 --- a/lib/galaxy/model/__init__.py +++ b/lib/galaxy/model/__init__.py @@ -7035,6 +7035,9 @@ def contains_collection(self, collection_id): return len(results) > 0 +HistoryItem: TypeAlias = Union[HistoryDatasetAssociation, HistoryDatasetCollectionAssociation] + + class LibraryDatasetCollectionAssociation(Base, DatasetCollectionInstance, RepresentById): """Associates a DatasetCollection with a library folder.""" diff --git a/lib/galaxy/model/store/__init__.py b/lib/galaxy/model/store/__init__.py index 38564174ab34..dc8d3aa76da0 100644 --- a/lib/galaxy/model/store/__init__.py +++ b/lib/galaxy/model/store/__init__.py @@ -120,7 +120,10 @@ if TYPE_CHECKING: from galaxy.managers.workflows import WorkflowContentsManager - from galaxy.model import ImplicitCollectionJobs + from galaxy.model import ( + HistoryItem, + ImplicitCollectionJobs, + ) from galaxy.model.tags import GalaxyTagHandlerSession log = logging.getLogger(__name__) @@ -1339,9 +1342,6 @@ def _copied_from_object_key( return copied_from_object_key -HasHid = Union[model.HistoryDatasetAssociation, model.HistoryDatasetCollectionAssociation] - - class ObjectImportTracker: """Keep track of new and existing imported objects. @@ -1359,8 +1359,8 @@ class ObjectImportTracker: hda_copied_from_sinks: Dict[ObjectKeyType, ObjectKeyType] hdca_copied_from_sinks: Dict[ObjectKeyType, ObjectKeyType] jobs_by_key: Dict[ObjectKeyType, model.Job] - requires_hid: List[HasHid] - copy_hid_for: Dict[HasHid, HasHid] + requires_hid: List["HistoryItem"] + copy_hid_for: Dict["HistoryItem", "HistoryItem"] def __init__(self) -> None: self.libraries_by_key = {} diff --git a/lib/galaxy/tool_util/client/staging.py b/lib/galaxy/tool_util/client/staging.py index 0a93fb9ddd98..8156dd123f58 100644 --- a/lib/galaxy/tool_util/client/staging.py +++ b/lib/galaxy/tool_util/client/staging.py @@ -72,7 +72,7 @@ def _fetch_post(self, payload: Dict[str, Any]) -> Dict[str, Any]: return tool_response @abc.abstractmethod - def _handle_job(self, job_response): + def _handle_job(self, job_response: Dict[str, Any]): """Implementer can decide if to wait for job(s) individually or not here.""" def stage( @@ -288,7 +288,7 @@ def _post(self, api_path: str, payload: Dict[str, Any]) -> Dict[str, Any]: assert response.status_code == 200, response.text return response.json() - def _handle_job(self, job_response): + def _handle_job(self, job_response: Dict[str, Any]): self.galaxy_interactor.wait_for_job(job_response["id"]) @property diff --git a/lib/galaxy/tool_util/verify/interactor.py b/lib/galaxy/tool_util/verify/interactor.py index da4bf13baf3b..f92e5a507827 100644 --- a/lib/galaxy/tool_util/verify/interactor.py +++ b/lib/galaxy/tool_util/verify/interactor.py @@ -385,24 +385,23 @@ def compare(val, expected): except KeyError: raise Exception(f"Failed to verify dataset metadata, metadata key [{key}] was not found.") - def wait_for_job(self, job_id, history_id=None, maxseconds=DEFAULT_TOOL_TEST_WAIT): + def wait_for_job(self, job_id: str, history_id: Optional[str] = None, maxseconds=DEFAULT_TOOL_TEST_WAIT) -> None: self.wait_for(lambda: self.__job_ready(job_id, history_id), maxseconds=maxseconds) - def wait_for(self, func, what="tool test run", **kwd): + def wait_for(self, func: Callable, what: str = "tool test run", **kwd) -> None: walltime_exceeded = int(kwd.get("maxseconds", DEFAULT_TOOL_TEST_WAIT)) wait_on(func, what, walltime_exceeded) - def get_job_stdio(self, job_id): - job_stdio = self.__get_job_stdio(job_id).json() - return job_stdio + def get_job_stdio(self, job_id: str) -> Dict[str, Any]: + return self.__get_job_stdio(job_id).json() - def __get_job(self, job_id): + def __get_job(self, job_id: str) -> Response: return self._get(f"jobs/{job_id}") - def __get_job_stdio(self, job_id): + def __get_job_stdio(self, job_id: str) -> Response: return self._get(f"jobs/{job_id}?full=true") - def get_history(self, history_name="test_history"): + def get_history(self, history_name: str = "test_history") -> Optional[Dict[str, Any]]: # Return the most recent non-deleted history matching the provided name filters = urllib.parse.urlencode({"q": "name", "qv": history_name, "order": "update_time"}) response = self._get(f"histories?{filters}") @@ -430,7 +429,7 @@ def test_history( if cleanup and cleanup_callback is not None: cleanup_callback(history_id) - def new_history(self, history_name="test_history", publish_history=False): + def new_history(self, history_name: str = "test_history", publish_history: bool = False) -> str: create_response = self._post("histories", {"name": history_name}) try: create_response.raise_for_status() @@ -441,7 +440,7 @@ def new_history(self, history_name="test_history", publish_history=False): self.publish_history(history_id) return history_id - def publish_history(self, history_id): + def publish_history(self, history_id: str) -> None: response = self._put(f"histories/{history_id}", json.dumps({"published": True})) response.raise_for_status() @@ -710,10 +709,10 @@ def __dictify_outputs(self, datasets_object) -> OutputsDict: def output_hid(self, output_data): return output_data["id"] - def delete_history(self, history): + def delete_history(self, history: str) -> None: self._delete(f"histories/{history}") - def __job_ready(self, job_id, history_id=None): + def __job_ready(self, job_id: str, history_id: Optional[str] = None): if job_id is None: raise ValueError("__job_ready passed empty job_id") try: @@ -803,7 +802,7 @@ def __contents(self, history_id): history_contents_response.raise_for_status() return history_contents_response.json() - def _state_ready(self, job_id, error_msg): + def _state_ready(self, job_id: str, error_msg: str): state_str = self.__get_job(job_id).json()["state"] if state_str == "ok": return True diff --git a/lib/galaxy/tool_util/verify/script.py b/lib/galaxy/tool_util/verify/script.py index 3c0393c20668..f14f0f68df4f 100644 --- a/lib/galaxy/tool_util/verify/script.py +++ b/lib/galaxy/tool_util/verify/script.py @@ -179,7 +179,7 @@ def test_tools( verify_kwds = (verify_kwds or {}).copy() tool_test_start = dt.datetime.now() history_created = False - test_history = None + test_history: Optional[str] = None if not history_per_test_case: if not history_name: history_name = f"History for {results.suitename}" @@ -192,8 +192,8 @@ def test_tools( if log: log.info(f"Using existing history with id '{test_history}', last updated: {history['update_time']}") if not test_history: - history_created = True test_history = galaxy_interactor.new_history(history_name=history_name, publish_history=publish_history) + history_created = True if log: log.info(f"History created with id '{test_history}'") verify_kwds.update( @@ -231,7 +231,7 @@ def test_tools( log.info(f"Report written to '{destination}'") log.info(results.info_message()) log.info(f"Total tool test time: {dt.datetime.now() - tool_test_start}") - if history_created and not no_history_cleanup: + if test_history and history_created and not no_history_cleanup: galaxy_interactor.delete_history(test_history) diff --git a/lib/galaxy/tools/expressions/evaluation.py b/lib/galaxy/tools/expressions/evaluation.py index e6334111c84f..e5ec9636f2c3 100644 --- a/lib/galaxy/tools/expressions/evaluation.py +++ b/lib/galaxy/tools/expressions/evaluation.py @@ -1,24 +1,34 @@ import json import os import subprocess -from typing import MutableMapping +from typing import ( + Optional, + TYPE_CHECKING, +) from cwl_utils.expression import do_eval as _do_eval from .util import find_engine +if TYPE_CHECKING: + from cwl_utils.types import ( + CWLObjectType, + CWLOutputType, + ) + FILE_DIRECTORY = os.path.normpath(os.path.dirname(os.path.join(__file__))) NODE_ENGINE = os.path.join(FILE_DIRECTORY, "cwlNodeEngine.js") -def do_eval(expression: str, context: MutableMapping): +def do_eval(expression: str, jobinput: "CWLObjectType", context: Optional["CWLOutputType"] = None): return _do_eval( expression, - context, + jobinput, [{"class": "InlineJavascriptRequirement"}], None, None, {}, + context=context, cwlVersion="v1.2.1", ) diff --git a/lib/galaxy/tools/parameters/basic.py b/lib/galaxy/tools/parameters/basic.py index b5a8c30a8754..35ab932a3ec4 100644 --- a/lib/galaxy/tools/parameters/basic.py +++ b/lib/galaxy/tools/parameters/basic.py @@ -65,7 +65,12 @@ if TYPE_CHECKING: from sqlalchemy.orm import Session + from galaxy.model import ( + History, + HistoryItem, + ) from galaxy.security.idencoding import IdEncodingHelper + from galaxy.structured_app import MinimalApp log = logging.getLogger(__name__) @@ -2116,7 +2121,7 @@ def from_json(self, value, trans, other_values=None): raise ParameterValueError("specify a dataset of the required format / build for parameter", self.name) if value in [None, "None", ""]: if self.default_object: - return raw_to_galaxy(trans, self.default_object) + return raw_to_galaxy(trans.app, trans.history, self.default_object) return None if isinstance(value, MutableMapping) and "values" in value: value = self.to_python(value, trans.app) @@ -2456,7 +2461,7 @@ def from_json(self, value, trans, other_values=None): raise ParameterValueError("specify a dataset collection of the correct type", self.name) if value in [None, "None"]: if self.default_object: - return raw_to_galaxy(trans, self.default_object) + return raw_to_galaxy(trans.app, trans.history, self.default_object) return None if isinstance(value, MutableMapping) and "values" in value: value = self.to_python(value, trans.app) @@ -2672,10 +2677,7 @@ def to_text(self, value): # Code from CWL branch to massage in order to be shared across tools and workflows, # and for CWL artifacts as well as Galaxy ones. -def raw_to_galaxy(trans, as_dict_value): - app = trans.app - history = trans.history - +def raw_to_galaxy(app: "MinimalApp", history: "History", as_dict_value: Dict[str, Any]) -> "HistoryItem": object_class = as_dict_value["class"] if object_class == "File": # TODO: relative_to = "/" @@ -2714,15 +2716,15 @@ def raw_to_galaxy(trans, as_dict_value): dbkey="?", dataset=dataset, flush=False, - sa_session=trans.sa_session, + sa_session=app.model.session, ) primary_data.state = Dataset.states.DEFERRED permissions = app.security_agent.history_get_default_permissions(history) app.security_agent.set_all_dataset_permissions(primary_data.dataset, permissions, new=True, flush=False) - trans.sa_session.add(primary_data) + app.model.session.add(primary_data) history.stage_addition(primary_data) history.add_pending_items() - trans.sa_session.flush() + app.model.session.flush() return primary_data else: name = as_dict_value.get("name") @@ -2741,7 +2743,7 @@ def write_elements_to_collection(has_elements, collection_builder): element_class = element_dict["class"] identifier = element_dict["identifier"] if element_class == "File": - hda = raw_to_galaxy(trans, element_dict) + hda = raw_to_galaxy(app, history, element_dict) collection_builder.add_dataset(identifier, hda) else: subcollection_builder = collection_builder.get_level(identifier) @@ -2750,8 +2752,8 @@ def write_elements_to_collection(has_elements, collection_builder): collection_builder = builder.BoundCollectionBuilder(collection) write_elements_to_collection(as_dict_value, collection_builder) collection_builder.populate() - trans.sa_session.add(hdca) - trans.sa_session.flush() + app.model.session.add(hdca) + app.model.session.flush() return hdca diff --git a/lib/galaxy/webapps/galaxy/services/history_contents.py b/lib/galaxy/webapps/galaxy/services/history_contents.py index 8ce9db729e45..781ad34965fd 100644 --- a/lib/galaxy/webapps/galaxy/services/history_contents.py +++ b/lib/galaxy/webapps/galaxy/services/history_contents.py @@ -9,6 +9,7 @@ List, Optional, Set, + TYPE_CHECKING, Union, ) @@ -127,10 +128,12 @@ ServiceBase, ) +if TYPE_CHECKING: + from galaxy.model import HistoryItem + log = logging.getLogger(__name__) DatasetDetailsType = Union[Set[DecodedDatabaseIdField], Literal["all"]] -HistoryItemModel = Union[HistoryDatasetAssociation, HistoryDatasetCollectionAssociation] class HistoryContentsIndexParams(Model): @@ -690,7 +693,7 @@ def bulk_operation( history = self.history_manager.get_mutable(history_id, trans.user, current_history=trans.history) filters = self.history_contents_filters.parse_query_filters(filter_query_params) self._validate_bulk_operation_params(payload, trans.user, trans) - contents: List[HistoryItemModel] + contents: List["HistoryItem"] if payload.items: contents = self._get_contents_by_item_list( trans, @@ -1328,7 +1331,7 @@ def _validate_bulk_operation_params( def _apply_bulk_operation( self, - contents: Iterable[HistoryItemModel], + contents: Iterable["HistoryItem"], operation: HistoryContentItemOperation, params: Optional[AnyBulkOperationParams], trans: ProvidesHistoryContext, @@ -1343,7 +1346,7 @@ def _apply_bulk_operation( def _apply_operation_to_item( self, operation: HistoryContentItemOperation, - item: HistoryItemModel, + item: "HistoryItem", params: Optional[AnyBulkOperationParams], trans: ProvidesHistoryContext, ) -> Optional[BulkOperationItemError]: @@ -1358,8 +1361,8 @@ def _apply_operation_to_item( def _get_contents_by_item_list( self, trans, history: History, items: List[HistoryContentItem] - ) -> List[HistoryItemModel]: - contents: List[HistoryItemModel] = [] + ) -> List["HistoryItem"]: + contents: List["HistoryItem"] = [] dataset_items = filter(lambda item: item.history_content_type == HistoryContentType.dataset, items) datasets_ids = map(lambda dataset: dataset.id, dataset_items) @@ -1380,7 +1383,7 @@ def _get_contents_by_item_list( class ItemOperation(Protocol): def __call__( - self, item: HistoryItemModel, params: Optional[AnyBulkOperationParams], trans: ProvidesHistoryContext + self, item: "HistoryItem", params: Optional[AnyBulkOperationParams], trans: ProvidesHistoryContext ) -> None: ... @@ -1414,24 +1417,24 @@ def __init__( def apply( self, operation: HistoryContentItemOperation, - item: HistoryItemModel, + item: "HistoryItem", params: Optional[AnyBulkOperationParams], trans: ProvidesHistoryContext, ): self._operation_map[operation](item, params, trans) - def _get_item_manager(self, item: HistoryItemModel): + def _get_item_manager(self, item: "HistoryItem"): if isinstance(item, HistoryDatasetAssociation): return self.hda_manager return self.hdca_manager - def _hide(self, item: HistoryItemModel): + def _hide(self, item: "HistoryItem"): item.visible = False - def _unhide(self, item: HistoryItemModel): + def _unhide(self, item: "HistoryItem"): item.visible = True - def _delete(self, item: HistoryItemModel, trans: ProvidesHistoryContext): + def _delete(self, item: "HistoryItem", trans: ProvidesHistoryContext): if isinstance(item, HistoryDatasetCollectionAssociation): self.dataset_collection_manager.delete(trans, "history", item.id, recursive=True, purge=False) else: @@ -1440,13 +1443,13 @@ def _delete(self, item: HistoryItemModel, trans: ProvidesHistoryContext): # otherwise the history will wait indefinitely for the items to be deleted item.update() - def _undelete(self, item: HistoryItemModel): + def _undelete(self, item: "HistoryItem"): if getattr(item, "purged", False): raise exceptions.ItemDeletionException("This item has been permanently deleted and cannot be recovered.") manager = self._get_item_manager(item) manager.undelete(item, flush=self.flush) - def _purge(self, item: HistoryItemModel, trans: ProvidesHistoryContext): + def _purge(self, item: "HistoryItem", trans: ProvidesHistoryContext): if getattr(item, "purged", False): # TODO: remove this `update` when we can properly track the operation results to notify the history item.update() @@ -1456,7 +1459,7 @@ def _purge(self, item: HistoryItemModel, trans: ProvidesHistoryContext): self.hda_manager.purge(item, flush=True, user=trans.user) def _change_datatype( - self, item: HistoryItemModel, params: ChangeDatatypeOperationParams, trans: ProvidesHistoryContext + self, item: "HistoryItem", params: ChangeDatatypeOperationParams, trans: ProvidesHistoryContext ): if isinstance(item, HistoryDatasetAssociation): wrapped_task = self._change_item_datatype(item, params, trans) @@ -1501,15 +1504,15 @@ def _change_item_datatype( dataset_id=item.id, datatype=params.datatype, task_user_id=getattr(trans.user, "id", None) ) - def _change_dbkey(self, item: HistoryItemModel, params: ChangeDbkeyOperationParams): + def _change_dbkey(self, item: "HistoryItem", params: ChangeDbkeyOperationParams): if isinstance(item, HistoryDatasetAssociation): item.set_dbkey(params.dbkey) elif isinstance(item, HistoryDatasetCollectionAssociation): for dataset_instance in item.dataset_instances: dataset_instance.set_dbkey(params.dbkey) - def _add_tags(self, trans: ProvidesUserContext, item: HistoryItemModel, params: TagOperationParams): + def _add_tags(self, trans: ProvidesUserContext, item: "HistoryItem", params: TagOperationParams): trans.tag_handler.add_tags_from_list(trans.user, item, params.tags, flush=self.flush) - def _remove_tags(self, trans: ProvidesUserContext, item: HistoryItemModel, params: TagOperationParams): + def _remove_tags(self, trans: ProvidesUserContext, item: "HistoryItem", params: TagOperationParams): trans.tag_handler.remove_tags_from_list(trans.user, item, params.tags, flush=self.flush) diff --git a/lib/galaxy/workflow/modules.py b/lib/galaxy/workflow/modules.py index a6138d2588d2..7783cfb40f05 100644 --- a/lib/galaxy/workflow/modules.py +++ b/lib/galaxy/workflow/modules.py @@ -942,7 +942,7 @@ def execute( if input_value is None: default_value = step.get_input_default_value(NO_REPLACEMENT) if default_value is not NO_REPLACEMENT: - input_value = raw_to_galaxy(trans, default_value) + input_value = raw_to_galaxy(trans.app, trans.history, default_value) step_outputs = dict(output=input_value) diff --git a/lib/galaxy/workflow/run.py b/lib/galaxy/workflow/run.py index 4d1c024fbdd7..c008848a3ee3 100644 --- a/lib/galaxy/workflow/run.py +++ b/lib/galaxy/workflow/run.py @@ -45,6 +45,7 @@ if TYPE_CHECKING: from galaxy.model import ( + HistoryItem, Workflow, WorkflowOutput, WorkflowStep, @@ -422,6 +423,7 @@ def replacement_for_input(self, trans, step: "WorkflowStep", input_dict: Dict[st modules.NoReplacement, model.DatasetCollectionInstance, List[model.DatasetCollectionInstance], + "HistoryItem", ] = modules.NO_REPLACEMENT prefixed_name = input_dict["name"] multiple = input_dict["multiple"] @@ -446,7 +448,7 @@ def replacement_for_input(self, trans, step: "WorkflowStep", input_dict: Dict[st for step_input in step.inputs: if step_input.name == prefixed_name and step_input.default_value_set: if is_data: - replacement = raw_to_galaxy(trans, step_input.default_value) + replacement = raw_to_galaxy(trans.app, trans.history, step_input.default_value) return replacement def replacement_for_connection(self, connection: "WorkflowStepConnection", is_data: bool = True): @@ -713,7 +715,7 @@ def subworkflow_progress( ) def raw_to_galaxy(self, value: dict): - return raw_to_galaxy(self.module_injector.trans, value) + return raw_to_galaxy(self.module_injector.trans.app, self.module_injector.trans.history, value) def _recover_mapping(self, step_invocation: WorkflowInvocationStep) -> None: try: