From 62b19d90700b4d10e728bc926b34f2f974f9bb9a Mon Sep 17 00:00:00 2001 From: stxue1 Date: Thu, 31 Aug 2023 16:16:36 -0700 Subject: [PATCH] Fix task inputs string coerce --- src/toil/wdl/wdltoil.py | 138 +++++++++++++++++++++++++--------------- 1 file changed, 87 insertions(+), 51 deletions(-) diff --git a/src/toil/wdl/wdltoil.py b/src/toil/wdl/wdltoil.py index e85739c850..905eed90e4 100755 --- a/src/toil/wdl/wdltoil.py +++ b/src/toil/wdl/wdltoil.py @@ -408,7 +408,7 @@ class ToilWDLStdLibBase(WDL.StdLib.Base): Standard library implementation for WDL as run on Toil. """ - def __init__(self, file_store: AbstractFileStore): + def __init__(self, file_store: AbstractFileStore, execution_dir: Optional[str] = None): """ Set up the standard library. """ @@ -426,6 +426,8 @@ def __init__(self, file_store: AbstractFileStore): # Keep the file store around so we can access files. self._file_store = file_store + self._execution_dir = execution_dir + def _is_url(self, filename: str, schemes: List[str] = ['http:', 'https:', 's3:', 'gs:', TOIL_URI_SCHEME]) -> bool: """ Decide if a filename is a known kind of URL @@ -465,7 +467,12 @@ def _devirtualize_filename(self, filename: str) -> str: result = self._file_store.readGlobalFile(imported) else: # This is a local file - result = filename + # To support relative paths, join the execution dir and filename + # if filename is already an abs path, join() will do nothing + if self._execution_dir is not None: + result = os.path.join(self._execution_dir, filename) + else: + result = filename logger.debug('Devirtualized %s as openable file %s', filename, result) assert os.path.exists(result), f"Virtualized file {filename} looks like a local file but isn't!" @@ -484,7 +491,13 @@ def _virtualize_filename(self, filename: str) -> str: return filename # Otherwise this is a local file and we want to fake it as a Toil file store file - file_id = self._file_store.writeGlobalFile(filename) + + # To support relative paths from execution directory, join the execution dir and filename + # If filename is already an abs path, join() will not do anything + if self._execution_dir is not None: + file_id = self._file_store.writeGlobalFile(os.path.join(self._execution_dir, filename)) + else: + file_id = self._file_store.writeGlobalFile(filename) result = pack_toil_uri(file_id, os.path.basename(filename)) logger.debug('Virtualized %s as WDL file %s', filename, result) return result @@ -716,15 +729,20 @@ def evaluate_decl(node: WDL.Tree.Decl, environment: WDLBindings, stdlib: WDL.Std return evaluate_named_expression(node, node.name, node.type, node.expr, environment, stdlib) -def evaluate_call_inputs(context: Union[WDL.Error.SourceNode, WDL.Error.SourcePosition], expressions: Dict[str, WDL.Expr.Base], environment: WDLBindings, stdlib: WDL.StdLib.Base) -> WDLBindings: +def evaluate_call_inputs(context: Union[WDL.Error.SourceNode, WDL.Error.SourcePosition], expressions: Dict[str, WDL.Expr.Base], environment: WDLBindings, stdlib: WDL.StdLib.Base, inputs_list: Optional[List[WDL.Tree.Decl]] = None) -> WDLBindings: """ Evaluate a bunch of expressions with names, and make them into a fresh set of bindings. 
""" new_bindings: WDLBindings = WDL.Env.Bindings() + inputs_dict = {e.name: e.type for e in inputs_list or []} for k, v in expressions.items(): # Add each binding in turn - new_bindings = new_bindings.bind(k, evaluate_named_expression(context, k, None, v, environment, stdlib)) + # If the expected type is optional, then don't type check the lhs and rhs as miniwdl will return a StaticTypeMismatch error, so pass in None + expected_type = None + if not v.type.optional: + expected_type = inputs_dict.get(k, None) + new_bindings = new_bindings.bind(k, evaluate_named_expression(context, k, expected_type, v, environment, stdlib)) return new_bindings def evaluate_defaultable_decl(node: WDL.Tree.Decl, environment: WDLBindings, stdlib: WDL.StdLib.Base) -> WDL.Value.Base: @@ -735,7 +753,10 @@ def evaluate_defaultable_decl(node: WDL.Tree.Decl, environment: WDLBindings, std try: if node.name in environment and not isinstance(environment[node.name], WDL.Value.Null): logger.debug('Name %s is already defined with a non-null value, not using default', node.name) - return environment[node.name] + if not isinstance(environment[node.name], type(node.type)): + return environment[node.name].coerce(node.type) + else: + return environment[node.name] else: if node.type is not None and not node.type.optional and node.expr is None: # We need a value for this but there isn't one. @@ -944,7 +965,7 @@ class WDLBaseJob(Job): as the job's run method calls postprocess(). """ - def __init__(self, **kwargs: Any) -> None: + def __init__(self, execution_dir: Optional[str] = None, **kwargs: Any) -> None: """ Make a WDL-related job. @@ -971,6 +992,8 @@ def __init__(self, **kwargs: Any) -> None: # jobs returning other jobs' promised RVs. self._postprocessing_steps: List[Tuple[str, Union[str, Promised[WDLBindings]]]] = [] + self._execution_dir = execution_dir + # TODO: We're not allowed by MyPy to override a method and widen the return # type, so this has to be Any. def run(self, file_store: AbstractFileStore) -> Any: @@ -1462,15 +1485,11 @@ class WDLWorkflowNodeJob(WDLBaseJob): Job that evaluates a WDL workflow node. """ - def __init__(self, node: WDL.Tree.WorkflowNode, prev_node_results: Sequence[Promised[WDLBindings]], namespace: str, **kwargs: Any) -> None: + def __init__(self, node: WDL.Tree.WorkflowNode, prev_node_results: Sequence[Promised[WDLBindings]], namespace: str, execution_dir: Optional[str] = None, **kwargs: Any) -> None: """ Make a new job to run a workflow node to completion. 
""" - super().__init__(unitName=node.workflow_node_id, displayName=node.workflow_node_id, **kwargs) - # Always run workflow nodes on the leader for issue #4554 - # https://github.com/DataBiosphere/toil/issues/4554 - if 'local' not in kwargs: - kwargs['local'] = True + super().__init__(unitName=node.workflow_node_id, displayName=node.workflow_node_id, execution_dir=execution_dir, **kwargs) self._node = node self._prev_node_results = prev_node_results @@ -1489,7 +1508,7 @@ def run(self, file_store: AbstractFileStore) -> Promised[WDLBindings]: # Combine the bindings we get from previous jobs incoming_bindings = combine_bindings(unwrap_all(self._prev_node_results)) # Set up the WDL standard library - standard_library = ToilWDLStdLibBase(file_store) + standard_library = ToilWDLStdLibBase(file_store, self._execution_dir) with monkeypatch_coerce(standard_library): if isinstance(self._node, WDL.Tree.Decl): # This is a variable assignment @@ -1502,7 +1521,12 @@ def run(self, file_store: AbstractFileStore) -> Promised[WDLBindings]: # Fetch all the inputs we are passing and bind them. # The call is only allowed to use these. logger.debug("Evaluating step inputs") - input_bindings = evaluate_call_inputs(self._node, self._node.inputs, incoming_bindings, standard_library) + if self._node.callee is None: + # This should never be None, but mypy gets unhappy and this is better than an assert + input_decls = None + else: + input_decls = self._node.callee.inputs + input_bindings = evaluate_call_inputs(self._node, self._node.inputs, incoming_bindings, standard_library, input_decls) # Bindings may also be added in from the enclosing workflow inputs # TODO: this is letting us also inject them from the workflow body. @@ -1512,7 +1536,7 @@ def run(self, file_store: AbstractFileStore) -> Promised[WDLBindings]: if isinstance(self._node.callee, WDL.Tree.Workflow): # This is a call of a workflow - subjob: WDLBaseJob = WDLWorkflowJob(self._node.callee, [input_bindings, passed_down_bindings], self._node.callee_id, f'{self._namespace}.{self._node.name}') + subjob: WDLBaseJob = WDLWorkflowJob(self._node.callee, [input_bindings, passed_down_bindings], self._node.callee_id, f'{self._namespace}.{self._node.name}', self._execution_dir) self.addChild(subjob) elif isinstance(self._node.callee, WDL.Tree.Task): # This is a call of a task @@ -1527,14 +1551,14 @@ def run(self, file_store: AbstractFileStore) -> Promised[WDLBindings]: self.defer_postprocessing(subjob) return subjob.rv() elif isinstance(self._node, WDL.Tree.Scatter): - subjob = WDLScatterJob(self._node, [incoming_bindings], self._namespace) + subjob = WDLScatterJob(self._node, [incoming_bindings], self._namespace, self._execution_dir) self.addChild(subjob) # Scatters don't really make a namespace, just kind of a scope? # TODO: Let stuff leave scope! self.defer_postprocessing(subjob) return subjob.rv() elif isinstance(self._node, WDL.Tree.Conditional): - subjob = WDLConditionalJob(self._node, [incoming_bindings], self._namespace) + subjob = WDLConditionalJob(self._node, [incoming_bindings], self._namespace, self._execution_dir) self.addChild(subjob) # Conditionals don't really make a namespace, just kind of a scope? # TODO: Let stuff leave scope! @@ -1550,15 +1574,11 @@ class WDLWorkflowNodeListJob(WDLBaseJob): workflows or tasks or sections. 
""" - def __init__(self, nodes: List[WDL.Tree.WorkflowNode], prev_node_results: Sequence[Promised[WDLBindings]], namespace: str, **kwargs: Any) -> None: + def __init__(self, nodes: List[WDL.Tree.WorkflowNode], prev_node_results: Sequence[Promised[WDLBindings]], namespace: str, execution_dir: Optional[str] = None, **kwargs: Any) -> None: """ Make a new job to run a list of workflow nodes to completion. """ - super().__init__(unitName=nodes[0].workflow_node_id + '+', displayName=nodes[0].workflow_node_id + '+', **kwargs) - # Always run workflow nodes on the leader for issue #4554 - # https://github.com/DataBiosphere/toil/issues/4554 - if 'local' not in kwargs: - kwargs['local'] = True + super().__init__(unitName=nodes[0].workflow_node_id + '+', displayName=nodes[0].workflow_node_id + '+', execution_dir=execution_dir, **kwargs) self._nodes = nodes self._prev_node_results = prev_node_results @@ -1577,7 +1597,7 @@ def run(self, file_store: AbstractFileStore) -> Promised[WDLBindings]: # Combine the bindings we get from previous jobs current_bindings = combine_bindings(unwrap_all(self._prev_node_results)) # Set up the WDL standard library - standard_library = ToilWDLStdLibBase(file_store) + standard_library = ToilWDLStdLibBase(file_store, self._execution_dir) with monkeypatch_coerce(standard_library): for node in self._nodes: @@ -1756,12 +1776,12 @@ class WDLSectionJob(WDLBaseJob): Job that can create more graph for a section of the wrokflow. """ - def __init__(self, namespace: str, **kwargs: Any) -> None: + def __init__(self, namespace: str, execution_dir: Optional[str] = None, **kwargs: Any) -> None: """ Make a WDLSectionJob where the interior runs in the given namespace, starting with the root workflow. """ - super().__init__(**kwargs) + super().__init__(execution_dir, **kwargs) self._namespace = namespace @staticmethod @@ -1907,10 +1927,10 @@ def get_job_set_any(wdl_ids: Set[str]) -> List[WDLBaseJob]: if len(node_ids) == 1: # Make a one-node job - job: WDLBaseJob = WDLWorkflowNodeJob(section_graph.get(node_ids[0]), rvs, self._namespace) + job: WDLBaseJob = WDLWorkflowNodeJob(section_graph.get(node_ids[0]), rvs, self._namespace, self._execution_dir) else: # Make a multi-node job - job = WDLWorkflowNodeListJob([section_graph.get(node_id) for node_id in node_ids], rvs, self._namespace) + job = WDLWorkflowNodeListJob([section_graph.get(node_id) for node_id in node_ids], rvs, self._namespace, self._execution_dir) for prev_job in prev_jobs: # Connect up the happens-after relationships to make sure the # return values are available. @@ -2007,11 +2027,11 @@ class WDLScatterJob(WDLSectionJob): instance of the body. If an instance of the body doesn't create a binding, it gets a null value in the corresponding array. """ - def __init__(self, scatter: WDL.Tree.Scatter, prev_node_results: Sequence[Promised[WDLBindings]], namespace: str, **kwargs: Any) -> None: + def __init__(self, scatter: WDL.Tree.Scatter, prev_node_results: Sequence[Promised[WDLBindings]], namespace: str, execution_dir: Optional[str] = None, **kwargs: Any) -> None: """ Create a subtree that will run a WDL scatter. The scatter itself and the contents live in the given namespace. 
""" - super().__init__(namespace, **kwargs, unitName=scatter.workflow_node_id, displayName=scatter.workflow_node_id) + super().__init__(namespace, **kwargs, unitName=scatter.workflow_node_id, displayName=scatter.workflow_node_id, execution_dir=execution_dir) # Because we need to return the return value of the workflow, we need # to return a Toil promise for the last/sink job in the workflow's @@ -2143,11 +2163,11 @@ class WDLConditionalJob(WDLSectionJob): """ Job that evaluates a conditional in a WDL workflow. """ - def __init__(self, conditional: WDL.Tree.Conditional, prev_node_results: Sequence[Promised[WDLBindings]], namespace: str, **kwargs: Any) -> None: + def __init__(self, conditional: WDL.Tree.Conditional, prev_node_results: Sequence[Promised[WDLBindings]], namespace: str, execution_dir: Optional[str] = None, **kwargs: Any) -> None: """ Create a subtree that will run a WDL conditional. The conditional itself and its contents live in the given namespace. """ - super().__init__(namespace, **kwargs, unitName=conditional.workflow_node_id, displayName=conditional.workflow_node_id) + super().__init__(namespace, **kwargs, unitName=conditional.workflow_node_id, displayName=conditional.workflow_node_id, execution_dir=execution_dir) # Once again we need to ship the whole body template to be instantiated # into Toil jobs only if it will actually run. @@ -2194,7 +2214,7 @@ class WDLWorkflowJob(WDLSectionJob): Job that evaluates an entire WDL workflow. """ - def __init__(self, workflow: WDL.Tree.Workflow, prev_node_results: Sequence[Promised[WDLBindings]], workflow_id: List[str], namespace: str, **kwargs: Any) -> None: + def __init__(self, workflow: WDL.Tree.Workflow, prev_node_results: Sequence[Promised[WDLBindings]], workflow_id: List[str], namespace: str, execution_dir: Optional[str] = None, **kwargs: Any) -> None: """ Create a subtree that will run a WDL workflow. The job returns the return value of the workflow. @@ -2202,7 +2222,7 @@ def __init__(self, workflow: WDL.Tree.Workflow, prev_node_results: Sequence[Prom :param namespace: the namespace that the workflow's *contents* will be in. Caller has already added the workflow's own name. """ - super().__init__(namespace, **kwargs) + super().__init__(namespace, execution_dir, **kwargs) # Because we need to return the return value of the workflow, we need # to return a Toil promise for the last/sink job in the workflow's @@ -2231,7 +2251,7 @@ def run(self, file_store: AbstractFileStore) -> Promised[WDLBindings]: # For a task we only see the insode-the-task namespace. bindings = combine_bindings(unwrap_all(self._prev_node_results)) # Set up the WDL standard library - standard_library = ToilWDLStdLibBase(file_store) + standard_library = ToilWDLStdLibBase(file_store, self._execution_dir) if self._workflow.inputs: with monkeypatch_coerce(standard_library): @@ -2244,7 +2264,7 @@ def run(self, file_store: AbstractFileStore) -> Promised[WDLBindings]: if self._workflow.outputs: # Add evaluating the outputs after the sink - outputs_job = WDLOutputsJob(self._workflow.outputs, sink.rv()) + outputs_job = WDLOutputsJob(self._workflow.outputs, sink.rv(), self._execution_dir) sink.addFollowOn(outputs_job) # Caller is responsible for making sure namespaces are applied self.defer_postprocessing(outputs_job) @@ -2260,11 +2280,11 @@ class WDLOutputsJob(WDLBaseJob): Returns an environment with just the outputs bound, in no namespace. 
""" - def __init__(self, outputs: List[WDL.Tree.Decl], bindings: Promised[WDLBindings], **kwargs: Any): + def __init__(self, outputs: List[WDL.Tree.Decl], bindings: Promised[WDLBindings], execution_dir: Optional[str] = None, **kwargs: Any): """ Make a new WDLWorkflowOutputsJob for the given workflow, with the given set of bindings after its body runs. """ - super().__init__(**kwargs) + super().__init__(execution_dir, **kwargs) self._outputs = outputs self._bindings = bindings @@ -2276,7 +2296,7 @@ def run(self, file_store: AbstractFileStore) -> WDLBindings: super().run(file_store) # Evaluate all the outputs in the normal, non-task-outputs library context - standard_library = ToilWDLStdLibBase(file_store) + standard_library = ToilWDLStdLibBase(file_store, self._execution_dir) output_bindings: WDL.Env.Bindings[WDL.Value.Base] = WDL.Env.Bindings() with monkeypatch_coerce(standard_library): for output_decl in self._outputs: @@ -2284,6 +2304,7 @@ def run(self, file_store: AbstractFileStore) -> WDLBindings: return self.postprocess(output_bindings) + class WDLRootJob(WDLSectionJob): """ Job that evaluates an entire WDL workflow, and returns the workflow outputs @@ -2291,13 +2312,13 @@ class WDLRootJob(WDLSectionJob): the workflow name; both forms are accepted. """ - def __init__(self, workflow: WDL.Tree.Workflow, inputs: WDLBindings, **kwargs: Any) -> None: + def __init__(self, workflow: WDL.Tree.Workflow, inputs: WDLBindings, execution_dir: Optional[str] = None, **kwargs: Any) -> None: """ Create a subtree to run the workflow and namespace the outputs. """ # The root workflow names the root namespace - super().__init__(workflow.name, **kwargs) + super().__init__(workflow.name, execution_dir, **kwargs) self._workflow = workflow self._inputs = inputs @@ -2310,31 +2331,43 @@ def run(self, file_store: AbstractFileStore) -> Promised[WDLBindings]: # Run the workflow. We rely in this to handle entering the input # namespace if needed, or handling free-floating inputs. 
-        workflow_job = WDLWorkflowJob(self._workflow, [self._inputs], [self._workflow.name], self._namespace)
+        workflow_job = WDLWorkflowJob(self._workflow, [self._inputs], [self._workflow.name], self._namespace, self._execution_dir)
         workflow_job.then_namespace(self._namespace)
         self.addChild(workflow_job)
         self.defer_postprocessing(workflow_job)
         return workflow_job.rv()
-# monkey patch miniwdl's WDL.Value.Base.coerce() function
+# Monkey patch miniwdl's WDL.Value.Base.coerce() function
 # miniwdl recognizes when a string needs to be converted into a file
 # However miniwdl's string to file conversions is to just store the filepath
 # Toil needs to virtualize the file into the jobstore
 # So monkey patch coerce to always virtualize whenever a file is expected
 # _virtualize_filename should detect if the value is already a file and return immediately if so
+# Sometimes string coerce is called instead, so monkeypatch string coerce
 @contextmanager
-def monkeypatch_coerce(standard_library: ToilWDLStdLibBase):
-    def coerce(self, desired_type: Optional[WDL.Type.Base] = None) -> WDL.Value.Base:
+def monkeypatch_coerce(standard_library: ToilWDLStdLibBase) -> Any:
+    def base_coerce(self: WDL.Value.Base, desired_type: Optional[WDL.Type.Base] = None) -> WDL.Value.Base:
         if isinstance(desired_type, WDL.Type.File):
             self.value = standard_library._virtualize_filename(self.value)
             return self
-        return old_coerce(self, desired_type) # old_coerce will recurse back into this monkey patched coerce
-    old_coerce = WDL.Value.Base.coerce
+        return old_base_coerce(self, desired_type) # old_base_coerce will recurse back into this monkey patched coerce
+    def string_coerce(self: WDL.Value.String, desired_type: Optional[WDL.Type.Base] = None) -> WDL.Value.Base:
+        if isinstance(desired_type, WDL.Type.File) and not isinstance(self, WDL.Value.File):
+            return WDL.Value.File(standard_library._virtualize_filename(self.value), self.expr)
+        return old_str_coerce(self, desired_type)
+
+    old_base_coerce = WDL.Value.Base.coerce
+    old_str_coerce = WDL.Value.String.coerce
     try:
-        WDL.Value.Base.coerce = coerce
+        # Mypy does not like monkeypatching:
+        # https://github.com/python/mypy/issues/2427#issuecomment-1419206807
+        WDL.Value.Base.coerce = base_coerce # type: ignore[method-assign]
+        WDL.Value.String.coerce = string_coerce # type: ignore[method-assign]
         yield
     finally:
-        WDL.Value.Base.coerce = old_coerce
+        WDL.Value.Base.coerce = old_base_coerce # type: ignore[method-assign]
+        WDL.Value.String.coerce = old_str_coerce # type: ignore[method-assign]
+
 def main() -> None:
     """
@@ -2428,8 +2461,11 @@ def main() -> None:
     # TODO: Automatically set a good MINIWDL__SINGULARITY__IMAGE_CACHE ?
+    # Get the execution directory
+    execution_dir = os.getcwd()
+
     # Run the workflow and get its outputs namespaced with the workflow name.
-    root_job = WDLRootJob(document.workflow, input_bindings)
+    root_job = WDLRootJob(document.workflow, input_bindings, execution_dir)
     output_bindings = toil.start(root_job)
     assert isinstance(output_bindings, WDL.Env.Bindings)
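
Note: the sketch below is a minimal, self-contained illustration of the swap-and-restore monkeypatching pattern that monkeypatch_coerce() in this patch uses. The Value and File classes and the virtualize callable here are hypothetical stand-ins, not miniwdl's real API or ToilWDLStdLibBase._virtualize_filename; only the shape of the context manager is shown, assuming those stand-ins.

from contextlib import contextmanager
from typing import Any, Callable, Iterator, Optional


class Value:
    """Stand-in for WDL.Value.Base (hypothetical, not miniwdl's API)."""
    def __init__(self, value: Any) -> None:
        self.value = value

    def coerce(self, desired_type: Optional[type] = None) -> "Value":
        # The real Base.coerce does per-type conversion; the stand-in just returns itself.
        return self


class File(Value):
    """Stand-in for WDL.Value.File (hypothetical)."""


@contextmanager
def monkeypatch_example(virtualize: Callable[[str], str]) -> Iterator[None]:
    """Temporarily replace Value.coerce so that coercing a non-File value to File
    virtualizes its path first, then restore the original method on exit."""
    old_coerce = Value.coerce

    def patched_coerce(self: Value, desired_type: Optional[type] = None) -> Value:
        if desired_type is File and not isinstance(self, File):
            return File(virtualize(self.value))
        return old_coerce(self, desired_type)

    try:
        Value.coerce = patched_coerce  # type: ignore[method-assign]
        yield
    finally:
        Value.coerce = old_coerce  # type: ignore[method-assign]


if __name__ == "__main__":
    # Example use: while the patch is active, string-to-File coercion virtualizes the path.
    with monkeypatch_example(lambda path: "toilfile://" + path):
        coerced = Value("inputs/sample.txt").coerce(File)
        assert isinstance(coerced, File)
        assert coerced.value == "toilfile://inputs/sample.txt"
    # Outside the context manager the original coerce is back in place.
    assert not isinstance(Value("inputs/sample.txt").coerce(File), File)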