From 5b88ab9767dc831309f631c5905e02b32ce3a864 Mon Sep 17 00:00:00 2001 From: Nicola Soranzo Date: Wed, 6 Nov 2024 14:46:56 +0000 Subject: [PATCH] Add ``version`` and ``use_cached_job`` parameters to ``WorkflowClient.invoke_workflow()`` Fix https://github.com/galaxyproject/bioblend/issues/491 . --- CHANGELOG.md | 4 ++ bioblend/_tests/TestGalaxyWorkflows.py | 86 +++++++++++++++++--------- bioblend/galaxy/workflows/__init__.py | 20 ++++-- 3 files changed, 76 insertions(+), 34 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index d120b8fe9..d4b84da3e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,10 @@ * Added support for Python 3.13. Added support for Galaxy release 24.1. +* Added ``version`` and ``use_cached_job`` parameters to + ``WorkflowClient.invoke_workflow()`` method (reported by + [Bérénice Batut](https://github.com/bebatut)). + ### BioBlend v1.3.0 - 2024-05-12 * Dropped support for Python 3.7. Added support for Python 3.12. Added support diff --git a/bioblend/_tests/TestGalaxyWorkflows.py b/bioblend/_tests/TestGalaxyWorkflows.py index 000506efc..4b942095f 100644 --- a/bioblend/_tests/TestGalaxyWorkflows.py +++ b/bioblend/_tests/TestGalaxyWorkflows.py @@ -188,6 +188,7 @@ def test_show_workflow(self): assert wf_data["id"] == wf["id"] assert wf_data["name"] == wf["name"] assert wf_data["url"] == wf["url"] + assert wf_data["version"] == 0 assert len(wf_data["steps"]) == 3 assert wf_data["inputs"] is not None @@ -208,26 +209,6 @@ def test_update_workflow_published(self): updated_wf = self.gi.workflows.update_workflow(wf["id"], published=False) assert not updated_wf["published"] - @test_util.skip_unless_galaxy( - "release_19.09" - ) # due to Galaxy bug fixed in https://github.com/galaxyproject/galaxy/pull/9014 - def test_show_workflow_versions(self): - path = test_util.get_abspath(os.path.join("data", "paste_columns.ga")) - wf = self.gi.workflows.import_workflow_from_local_path(path) - wf_data = self.gi.workflows.show_workflow(wf["id"]) - assert wf_data["version"] == 0 - new_name = "new name" - self.gi.workflows.update_workflow(wf["id"], name=new_name) - updated_wf = self.gi.workflows.show_workflow(wf["id"]) - assert updated_wf["name"] == new_name - assert updated_wf["version"] == 1 - updated_wf = self.gi.workflows.show_workflow(wf["id"], version=0) - assert updated_wf["name"] == "paste_columns" - assert updated_wf["version"] == 0 - updated_wf = self.gi.workflows.show_workflow(wf["id"], version=1) - assert updated_wf["name"] == new_name - assert updated_wf["version"] == 1 - @test_util.skip_unless_galaxy("release_19.09") def test_extract_workflow_from_history(self): path = test_util.get_abspath(os.path.join("data", "paste_columns.ga")) @@ -264,16 +245,6 @@ def test_extract_workflow_from_history(self): assert wf1["steps"][str(i)]["type"] == wf2["steps"][str(i)]["type"] assert wf1["steps"][str(i)]["tool_id"] == wf2["steps"][str(i)]["tool_id"] - def test_show_versions(self): - path = test_util.get_abspath(os.path.join("data", "paste_columns.ga")) - wf = self.gi.workflows.import_workflow_from_local_path(path) - versions = self.gi.workflows.show_versions(wf["id"]) - assert len(versions) == 1 - version = versions[0] - assert version["version"] == 0 - assert "update_time" in version - assert "steps" in version - @test_util.skip_unless_galaxy("release_21.01") def test_refactor_workflow(self): actions: List[Dict[str, Any]] = [ @@ -288,3 +259,58 @@ def test_refactor_workflow(self): updated_steps = response["workflow"]["steps"] assert len(updated_steps) == 4 assert {step["label"] for step in updated_steps.values()} == {"bar", None, "Input 1", "Input 2"} + + +class TestGalaxyWorkflowVersions(GalaxyTestBase.GalaxyTestBase): + workflow_id: str + + @classmethod + def setUpClass(cls) -> None: + super().setUpClass() + path = test_util.get_abspath(os.path.join("data", "paste_columns.ga")) + cls.workflow_id = cls.gi.workflows.import_workflow_from_local_path(path)["id"] + cls.new_name = "new name" + cls.gi.workflows.update_workflow(cls.workflow_id, name=cls.new_name) + + @test_util.skip_unless_galaxy( + "release_19.09" + ) # due to Galaxy bug fixed in https://github.com/galaxyproject/galaxy/pull/9014 + def test_show_workflow_versions(self): + updated_wf = self.gi.workflows.show_workflow(self.workflow_id) + assert updated_wf["name"] == self.new_name + assert updated_wf["version"] == 1 + wf_v0 = self.gi.workflows.show_workflow(self.workflow_id, version=0) + assert wf_v0["name"] == "paste_columns" + assert wf_v0["version"] == 0 + wf_v1 = self.gi.workflows.show_workflow(self.workflow_id, version=1) + assert updated_wf == wf_v1 + + def test_show_versions(self): + versions = self.gi.workflows.show_versions(self.workflow_id) + assert len(versions) == 2 + for i, version in enumerate(versions): + assert version["version"] == i + assert "update_time" in version + assert "steps" in version + + def test_invoke_previous_version(self): + history_id = self.gi.histories.create_history(name="test_wf_invocation")["id"] + dataset1_id = self._test_dataset(history_id) + dataset = {"src": "hda", "id": dataset1_id} + invocation_id = self.gi.workflows.invoke_workflow( + self.workflow_id, + inputs={"Input 1": dataset, "Input 2": dataset}, + history_id=history_id, + inputs_by="name", + version=0, + )["id"] + self.gi.invocations.wait_for_invocation(invocation_id) + # Try invalid invocation (wrong version) + with pytest.raises(ConnectionError): + self.gi.workflows.invoke_workflow( + self.workflow_id, + inputs={"Input 1": dataset, "Input 2": dataset}, + history_id=history_id, + inputs_by="name", + version=2, + ) diff --git a/bioblend/galaxy/workflows/__init__.py b/bioblend/galaxy/workflows/__init__.py index 89e31e347..32d0c82ce 100644 --- a/bioblend/galaxy/workflows/__init__.py +++ b/bioblend/galaxy/workflows/__init__.py @@ -289,6 +289,8 @@ def invoke_workflow( inputs_by: Optional[InputsBy] = None, parameters_normalized: bool = False, require_exact_tool_versions: bool = True, + version: Optional[int] = None, + use_cached_job: bool = False, ) -> Dict[str, Any]: """ Invoke the workflow identified by ``workflow_id``. This will @@ -359,6 +361,14 @@ def invoke_workflow( Galaxy does not have the exact tool versions. Default is ``True``. Parameter does not any effect for Galaxy versions < 22.05. + :type version: int + :param version: The version of the workflow to invoke. If omitted or + None, the latest workflow version will be invoked. + + :type use_cached_job: bool + :param use_cached_job: Whether to use cached jobs for the workflow + invocation. + :rtype: dict :return: A dict containing the workflow invocation describing the scheduling of the workflow. For example:: @@ -467,7 +477,12 @@ def invoke_workflow( (which is stable across workflow imports) or the step UUID which is also stable. """ - payload: Dict[str, Any] = {} + payload: Dict[str, Any] = { + "allow_tool_state_corrections": allow_tool_state_corrections, + "require_exact_tool_versions": require_exact_tool_versions, + "version": version, + "use_cached_job": use_cached_job, + } if inputs: payload["inputs"] = inputs @@ -483,11 +498,8 @@ def invoke_workflow( payload["history"] = history_name if not import_inputs_to_history: payload["no_add_to_history"] = True - if allow_tool_state_corrections: - payload["allow_tool_state_corrections"] = allow_tool_state_corrections if inputs_by is not None: payload["inputs_by"] = inputs_by - payload["require_exact_tool_versions"] = require_exact_tool_versions if parameters_normalized: payload["parameters_normalized"] = parameters_normalized url = self._invocations_url(workflow_id)