Skip to content

Commit

Permalink
Add version and use_cached_job parameters to ``WorkflowClient…
Browse files Browse the repository at this point in the history
….invoke_workflow()``

Fix galaxyproject#491 .
  • Loading branch information
nsoranzo committed Nov 6, 2024
1 parent e4fc020 commit 2882cc4
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 34 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

* Added support for Python 3.13. Added support for Galaxy release 24.1.

* Added ``version`` and ``use_cached_job`` parameters to
``WorkflowClient.invoke_workflow()`` method (reported by
[Bérénice Batut](https://github.com/bebatut)).

### BioBlend v1.3.0 - 2024-05-12

* Dropped support for Python 3.7. Added support for Python 3.12. Added support
Expand Down
90 changes: 60 additions & 30 deletions bioblend/_tests/TestGalaxyWorkflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,7 @@ def test_show_workflow(self):
assert wf_data["id"] == wf["id"]
assert wf_data["name"] == wf["name"]
assert wf_data["url"] == wf["url"]
assert wf_data["version"] == 0
assert len(wf_data["steps"]) == 3
assert wf_data["inputs"] is not None

Expand All @@ -208,26 +209,6 @@ def test_update_workflow_published(self):
updated_wf = self.gi.workflows.update_workflow(wf["id"], published=False)
assert not updated_wf["published"]

@test_util.skip_unless_galaxy(
"release_19.09"
) # due to Galaxy bug fixed in https://github.com/galaxyproject/galaxy/pull/9014
def test_show_workflow_versions(self):
path = test_util.get_abspath(os.path.join("data", "paste_columns.ga"))
wf = self.gi.workflows.import_workflow_from_local_path(path)
wf_data = self.gi.workflows.show_workflow(wf["id"])
assert wf_data["version"] == 0
new_name = "new name"
self.gi.workflows.update_workflow(wf["id"], name=new_name)
updated_wf = self.gi.workflows.show_workflow(wf["id"])
assert updated_wf["name"] == new_name
assert updated_wf["version"] == 1
updated_wf = self.gi.workflows.show_workflow(wf["id"], version=0)
assert updated_wf["name"] == "paste_columns"
assert updated_wf["version"] == 0
updated_wf = self.gi.workflows.show_workflow(wf["id"], version=1)
assert updated_wf["name"] == new_name
assert updated_wf["version"] == 1

@test_util.skip_unless_galaxy("release_19.09")
def test_extract_workflow_from_history(self):
path = test_util.get_abspath(os.path.join("data", "paste_columns.ga"))
Expand Down Expand Up @@ -264,16 +245,6 @@ def test_extract_workflow_from_history(self):
assert wf1["steps"][str(i)]["type"] == wf2["steps"][str(i)]["type"]
assert wf1["steps"][str(i)]["tool_id"] == wf2["steps"][str(i)]["tool_id"]

def test_show_versions(self):
path = test_util.get_abspath(os.path.join("data", "paste_columns.ga"))
wf = self.gi.workflows.import_workflow_from_local_path(path)
versions = self.gi.workflows.show_versions(wf["id"])
assert len(versions) == 1
version = versions[0]
assert version["version"] == 0
assert "update_time" in version
assert "steps" in version

@test_util.skip_unless_galaxy("release_21.01")
def test_refactor_workflow(self):
actions: List[Dict[str, Any]] = [
Expand All @@ -288,3 +259,62 @@ def test_refactor_workflow(self):
updated_steps = response["workflow"]["steps"]
assert len(updated_steps) == 4
assert {step["label"] for step in updated_steps.values()} == {"bar", None, "Input 1", "Input 2"}


class TestGalaxyWorkflowVersions(GalaxyTestBase.GalaxyTestBase):
new_name: str
workflow_id: str

@classmethod
def setUpClass(cls) -> None:
super().setUpClass()
path = test_util.get_abspath(os.path.join("data", "paste_columns.ga"))
cls.workflow_id = cls.gi.workflows.import_workflow_from_local_path(path)["id"]
cls.new_name = "new name"
cls.gi.workflows.update_workflow(cls.workflow_id, name=cls.new_name)

@test_util.skip_unless_galaxy(
"release_19.09"
) # due to Galaxy bug fixed in https://github.com/galaxyproject/galaxy/pull/9014
def test_show_workflow_versions(self):
updated_wf = self.gi.workflows.show_workflow(self.workflow_id)
assert updated_wf["name"] == self.new_name
assert updated_wf["version"] == 1
wf_v0 = self.gi.workflows.show_workflow(self.workflow_id, version=0)
assert wf_v0["name"] == "paste_columns"
assert wf_v0["version"] == 0
wf_v1 = self.gi.workflows.show_workflow(self.workflow_id, version=1)
assert updated_wf == wf_v1

def test_show_versions(self):
versions = self.gi.workflows.show_versions(self.workflow_id)
assert len(versions) == 2
for i, version in enumerate(versions):
assert version["version"] == i
assert "update_time" in version
assert "steps" in version

@test_util.skip_unless_galaxy(
"release_24.01"
) # due to Galaxy bug fixed in https://github.com/galaxyproject/galaxy/pull/18378
def test_invoke_previous_version(self):
history_id = self.gi.histories.create_history(name="test_wf_invocation")["id"]
dataset1_id = self._test_dataset(history_id)
dataset = {"src": "hda", "id": dataset1_id}
invocation_id = self.gi.workflows.invoke_workflow(
self.workflow_id,
inputs={"Input 1": dataset, "Input 2": dataset},
history_id=history_id,
inputs_by="name",
version=0,
)["id"]
self.gi.invocations.wait_for_invocation(invocation_id)
# Try invalid invocation (wrong version)
with pytest.raises(ConnectionError):
self.gi.workflows.invoke_workflow(
self.workflow_id,
inputs={"Input 1": dataset, "Input 2": dataset},
history_id=history_id,
inputs_by="name",
version=2,
)
20 changes: 16 additions & 4 deletions bioblend/galaxy/workflows/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,8 @@ def invoke_workflow(
inputs_by: Optional[InputsBy] = None,
parameters_normalized: bool = False,
require_exact_tool_versions: bool = True,
version: Optional[int] = None,
use_cached_job: bool = False,
) -> Dict[str, Any]:
"""
Invoke the workflow identified by ``workflow_id``. This will
Expand Down Expand Up @@ -359,6 +361,14 @@ def invoke_workflow(
Galaxy does not have the exact tool versions. Default is ``True``.
Parameter does not any effect for Galaxy versions < 22.05.
:type version: int
:param version: The version of the workflow to invoke. If omitted or
None, the latest workflow version will be invoked.
:type use_cached_job: bool
:param use_cached_job: Whether to use cached jobs for the workflow
invocation.
:rtype: dict
:return: A dict containing the workflow invocation describing the
scheduling of the workflow. For example::
Expand Down Expand Up @@ -467,7 +477,12 @@ def invoke_workflow(
(which is stable across workflow imports) or the step UUID which is
also stable.
"""
payload: Dict[str, Any] = {}
payload: Dict[str, Any] = {
"allow_tool_state_corrections": allow_tool_state_corrections,
"require_exact_tool_versions": require_exact_tool_versions,
"version": version,
"use_cached_job": use_cached_job,
}
if inputs:
payload["inputs"] = inputs

Expand All @@ -483,11 +498,8 @@ def invoke_workflow(
payload["history"] = history_name
if not import_inputs_to_history:
payload["no_add_to_history"] = True
if allow_tool_state_corrections:
payload["allow_tool_state_corrections"] = allow_tool_state_corrections
if inputs_by is not None:
payload["inputs_by"] = inputs_by
payload["require_exact_tool_versions"] = require_exact_tool_versions
if parameters_normalized:
payload["parameters_normalized"] = parameters_normalized
url = self._invocations_url(workflow_id)
Expand Down

0 comments on commit 2882cc4

Please sign in to comment.