Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Collect all task outputs when the workflow output section is undeclared #4602

Merged
merged 3 commits into from
Oct 5, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 26 additions & 12 deletions src/toil/wdl/wdltoil.py
Original file line number Diff line number Diff line change
Expand Up @@ -2284,9 +2284,10 @@ def run(self, file_store: AbstractFileStore) -> Promised[WDLBindings]:
# Make jobs to run all the parts of the workflow
sink = self.create_subgraph(self._workflow.body, [], bindings)

if self._workflow.outputs:
if self._workflow.outputs != []: # Compare against empty list as None means there should be outputs
# Either the output section is declared and nonempty or it is not declared
# Add evaluating the outputs after the sink
outputs_job = WDLOutputsJob(self._workflow.outputs, sink.rv(), self._execution_dir)
outputs_job = WDLOutputsJob(self._workflow, sink.rv(), self._execution_dir)
sink.addFollowOn(outputs_job)
# Caller is responsible for making sure namespaces are applied
self.defer_postprocessing(outputs_job)
Expand All @@ -2301,31 +2302,44 @@ class WDLOutputsJob(WDLBaseJob):

Returns an environment with just the outputs bound, in no namespace.
"""

def __init__(self, outputs: List[WDL.Tree.Decl], bindings: Promised[WDLBindings], execution_dir: Optional[str] = None, **kwargs: Any):
def __init__(self, workflow: WDL.Tree.Workflow, bindings: Promised[WDLBindings], execution_dir: Optional[str] = None, **kwargs: Any):
"""
Make a new WDLWorkflowOutputsJob for the given workflow, with the given set of bindings after its body runs.
"""
super().__init__(execution_dir, **kwargs)

self._outputs = outputs
self._bindings = bindings
self._workflow = workflow

def run(self, file_store: AbstractFileStore) -> WDLBindings:
"""
Make bindings for the outputs.
"""
super().run(file_store)

# Evaluate all the outputs in the normal, non-task-outputs library context
standard_library = ToilWDLStdLibBase(file_store, self._execution_dir)
# Combine the bindings from the previous job

output_bindings = evaluate_output_decls(self._outputs, unwrap(self._bindings), standard_library)

if self._workflow.outputs is None:
# The output section is not declared
# So get all task outputs and return that
# First get all task output names
output_set = set()
for call in self._workflow.body:
if isinstance(call, WDL.Tree.Call):
for type_binding in call.effective_outputs:
output_set.add(type_binding.name)
# Collect all bindings that are task outputs
output_bindings: WDL.Env.Bindings[WDL.Value.Base] = WDL.Env.Bindings()
for binding in unwrap(self._bindings):
if binding.name in output_set:
# The bindings will already be namespaced with the task namespaces
output_bindings = output_bindings.bind(binding.name, binding.value)
else:
# Output section is declared and is nonempty, so evaluate normally
# Evaluate all the outputs in the normal, non-task-outputs library context
standard_library = ToilWDLStdLibBase(file_store, self._execution_dir)
# Combine the bindings from the previous job
output_bindings = evaluate_output_decls(self._workflow.outputs, unwrap(self._bindings), standard_library)
return self.postprocess(output_bindings)


class WDLRootJob(WDLSectionJob):
"""
Job that evaluates an entire WDL workflow, and returns the workflow outputs
Expand Down