Skip to content

Commit

Permalink
Record NO_REPLACEMENT as step output for unspecified value
Browse files Browse the repository at this point in the history
  • Loading branch information
mvdbeek committed Dec 18, 2024
1 parent 20d904e commit ef27369
Show file tree
Hide file tree
Showing 5 changed files with 49 additions and 31 deletions.
19 changes: 17 additions & 2 deletions lib/galaxy/tools/parameters/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
)
from .workflow_utils import (
is_runtime_value,
NO_REPLACEMENT,
runtime_to_json,
)
from .wrapped import flat_to_nested_state
Expand Down Expand Up @@ -180,8 +181,22 @@ def callback_helper(input, input_values, name_prefix, label_prefix, parent_prefi
replace = new_value != no_replacement_value
if replace:
input_values[input.name] = new_value
elif replace_optional_connections and is_runtime_value(value) and hasattr(input, "value"):
input_values[input.name] = input.value
elif replace_optional_connections:
# Only used in workflow context
has_default = hasattr(input, "value")
if new_value is NO_REPLACEMENT:
# NO_REPLACEMENT means value was connected but left unspecified
if has_default:
# Use default if we have one
input_values[input.name] = input.value
else:
# Should fail if input is not optional and does not have default value
# Effectively however depends on parameter implementation.
# We might want to raise an exception here, instead of depending on a tool parameter value error.
input_values[input.name] = None

elif is_runtime_value(value) and has_default:
input_values[input.name] = input.value

def get_current_case(input, input_values):
test_parameter = input.test_param
Expand Down
9 changes: 9 additions & 0 deletions lib/galaxy/tools/parameters/workflow_utils.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,15 @@
from collections.abc import MutableMapping


class NoReplacement:

def __str__(self):
return "NO_REPLACEMENT singleton"


NO_REPLACEMENT = NoReplacement()


class workflow_building_modes:
DISABLED = False
ENABLED = True
Expand Down
16 changes: 7 additions & 9 deletions lib/galaxy/workflow/modules.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,8 @@
from galaxy.tools.parameters.workflow_utils import (
ConnectedValue,
is_runtime_value,
NO_REPLACEMENT,
NoReplacement,
runtime_to_json,
workflow_building_modes,
)
Expand Down Expand Up @@ -129,14 +131,6 @@
POSSIBLE_PARAMETER_TYPES: Tuple[INPUT_PARAMETER_TYPES] = get_args(INPUT_PARAMETER_TYPES)


class NoReplacement:
def __str__(self):
return "NO_REPLACEMENT singleton"


NO_REPLACEMENT = NoReplacement()


class ConditionalStepWhen(BooleanToolParameter):
pass

Expand Down Expand Up @@ -2266,7 +2260,11 @@ def decode_runtime_state(self, step, runtime_state):
)

def execute(
self, trans, progress: "WorkflowProgress", invocation_step, use_cached_job: bool = False
self,
trans,
progress: "WorkflowProgress",
invocation_step: "WorkflowInvocationStep",
use_cached_job: bool = False,
) -> Optional[bool]:
invocation = invocation_step.workflow_invocation
step = invocation_step.workflow_step
Expand Down
6 changes: 2 additions & 4 deletions lib/galaxy/workflow/refactor/execute.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from galaxy.tools.parameters.basic import contains_workflow_parameter
from galaxy.tools.parameters.workflow_utils import (
ConnectedValue,
NO_REPLACEMENT,
runtime_to_json,
)
from .schema import (
Expand Down Expand Up @@ -41,10 +42,7 @@
UpgradeSubworkflowAction,
UpgradeToolAction,
)
from ..modules import (
InputParameterModule,
NO_REPLACEMENT,
)
from ..modules import InputParameterModule

log = logging.getLogger(__name__)

Expand Down
30 changes: 14 additions & 16 deletions lib/galaxy/workflow/run.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import logging
import uuid
from collections.abc import MutableMapping
from typing import (
Any,
Dict,
Expand Down Expand Up @@ -37,6 +38,10 @@
WarningReason,
)
from galaxy.tools.parameters.basic import raw_to_galaxy
from galaxy.tools.parameters.workflow_utils import (
NO_REPLACEMENT,
NoReplacement,
)
from galaxy.tools.parameters.wrapped import nested_key_to_path
from galaxy.util import ExecutionTimer
from galaxy.workflow import modules
Expand Down Expand Up @@ -432,11 +437,11 @@ def remaining_steps(

def replacement_for_input(self, trans, step: "WorkflowStep", input_dict: Dict[str, Any]):
replacement: Union[
modules.NoReplacement,
NoReplacement,
model.DatasetCollectionInstance,
List[model.DatasetCollectionInstance],
HistoryItem,
] = modules.NO_REPLACEMENT
] = NO_REPLACEMENT
prefixed_name = input_dict["name"]
multiple = input_dict["multiple"]
is_data = input_dict["input_type"] in ["dataset", "dataset_collection"]
Expand Down Expand Up @@ -494,6 +499,8 @@ def replacement_for_connection(self, connection: "WorkflowStepConnection", is_da
dependent_workflow_step_id=output_step_id,
)
)
if isinstance(replacement, MutableMapping) and replacement.get("__class__") == "NoReplacement":
return NO_REPLACEMENT
if isinstance(replacement, model.HistoryDatasetCollectionAssociation):
if not replacement.collection.populated:
if not replacement.waiting_for_elements:
Expand Down Expand Up @@ -574,19 +581,8 @@ def set_outputs_for_input(
if self.inputs_by_step_id:
step_id = step.id
if step_id not in self.inputs_by_step_id and "output" not in outputs:
default_value = step.get_input_default_value(modules.NO_REPLACEMENT)
if default_value is not modules.NO_REPLACEMENT:
outputs["output"] = default_value
else:
log.error(f"{step.log_str()} not found in inputs_step_id {self.inputs_by_step_id}")
raise modules.FailWorkflowEvaluation(
why=InvocationFailureOutputNotFound(
reason=FailureReason.output_not_found,
workflow_step_id=invocation_step.workflow_step_id,
output_name="output",
dependent_workflow_step_id=invocation_step.workflow_step_id,
)
)
default_value = step.get_input_default_value(NO_REPLACEMENT)
outputs["output"] = default_value
elif step_id in self.inputs_by_step_id:
if self.inputs_by_step_id[step_id] is not None or "output" not in outputs:
outputs["output"] = self.inputs_by_step_id[step_id]
Expand Down Expand Up @@ -620,7 +616,7 @@ def set_step_outputs(
# Add this non-data, non workflow-output output to the workflow outputs.
# This is required for recovering the output in the next scheduling iteration,
# and should be replaced with a WorkflowInvocationStepOutputValue ASAP.
if not workflow_outputs_by_name.get(output_name) and not output_object == modules.NO_REPLACEMENT:
if not workflow_outputs_by_name.get(output_name):
workflow_output = model.WorkflowOutput(step, output_name=output_name)
step.workflow_outputs.append(workflow_output)
for workflow_output in step.workflow_outputs:
Expand All @@ -645,6 +641,8 @@ def set_step_outputs(
)

def _record_workflow_output(self, step: "WorkflowStep", workflow_output: "WorkflowOutput", output: Any) -> None:
if output is NO_REPLACEMENT:
output = {"__class__": "NoReplacement"}
self.workflow_invocation.add_output(workflow_output, step, output)

def mark_step_outputs_delayed(self, step: "WorkflowStep", why: Optional[str] = None) -> None:
Expand Down

0 comments on commit ef27369

Please sign in to comment.