From b73b9ef53eb914dbd7dbf86c8666c8a572e2eb28 Mon Sep 17 00:00:00 2001 From: stxue1 <122345910+stxue1@users.noreply.github.com> Date: Thu, 5 Oct 2023 10:11:24 -0700 Subject: [PATCH] Make WDLOutputJob collect all task outputs (#4602) Co-authored-by: Adam Novak --- src/toil/wdl/wdltoil.py | 38 ++++++++++++++++++++++++++------------ 1 file changed, 26 insertions(+), 12 deletions(-) diff --git a/src/toil/wdl/wdltoil.py b/src/toil/wdl/wdltoil.py index f9e0baa256..321c896081 100755 --- a/src/toil/wdl/wdltoil.py +++ b/src/toil/wdl/wdltoil.py @@ -2284,9 +2284,10 @@ def run(self, file_store: AbstractFileStore) -> Promised[WDLBindings]: # Make jobs to run all the parts of the workflow sink = self.create_subgraph(self._workflow.body, [], bindings) - if self._workflow.outputs: + if self._workflow.outputs != []: # Compare against empty list as None means there should be outputs + # Either the output section is declared and nonempty or it is not declared # Add evaluating the outputs after the sink - outputs_job = WDLOutputsJob(self._workflow.outputs, sink.rv(), self._execution_dir) + outputs_job = WDLOutputsJob(self._workflow, sink.rv(), self._execution_dir) sink.addFollowOn(outputs_job) # Caller is responsible for making sure namespaces are applied self.defer_postprocessing(outputs_job) @@ -2301,15 +2302,14 @@ class WDLOutputsJob(WDLBaseJob): Returns an environment with just the outputs bound, in no namespace. """ - - def __init__(self, outputs: List[WDL.Tree.Decl], bindings: Promised[WDLBindings], execution_dir: Optional[str] = None, **kwargs: Any): + def __init__(self, workflow: WDL.Tree.Workflow, bindings: Promised[WDLBindings], execution_dir: Optional[str] = None, **kwargs: Any): """ Make a new WDLWorkflowOutputsJob for the given workflow, with the given set of bindings after its body runs. """ super().__init__(execution_dir, **kwargs) - self._outputs = outputs self._bindings = bindings + self._workflow = workflow def run(self, file_store: AbstractFileStore) -> WDLBindings: """ @@ -2317,15 +2317,29 @@ def run(self, file_store: AbstractFileStore) -> WDLBindings: """ super().run(file_store) - # Evaluate all the outputs in the normal, non-task-outputs library context - standard_library = ToilWDLStdLibBase(file_store, self._execution_dir) - # Combine the bindings from the previous job - - output_bindings = evaluate_output_decls(self._outputs, unwrap(self._bindings), standard_library) - + if self._workflow.outputs is None: + # The output section is not declared + # So get all task outputs and return that + # First get all task output names + output_set = set() + for call in self._workflow.body: + if isinstance(call, WDL.Tree.Call): + for type_binding in call.effective_outputs: + output_set.add(type_binding.name) + # Collect all bindings that are task outputs + output_bindings: WDL.Env.Bindings[WDL.Value.Base] = WDL.Env.Bindings() + for binding in unwrap(self._bindings): + if binding.name in output_set: + # The bindings will already be namespaced with the task namespaces + output_bindings = output_bindings.bind(binding.name, binding.value) + else: + # Output section is declared and is nonempty, so evaluate normally + # Evaluate all the outputs in the normal, non-task-outputs library context + standard_library = ToilWDLStdLibBase(file_store, self._execution_dir) + # Combine the bindings from the previous job + output_bindings = evaluate_output_decls(self._workflow.outputs, unwrap(self._bindings), standard_library) return self.postprocess(output_bindings) - class WDLRootJob(WDLSectionJob): """ Job that evaluates an entire WDL workflow, and returns the workflow outputs