From 198b5f6a58caabd0ed3e4b2ca0c760e805321fad Mon Sep 17 00:00:00 2001
From: stxue1 <122345910+stxue1@users.noreply.github.com>
Date: Thu, 21 Sep 2023 21:56:24 -0700
Subject: [PATCH 1/4] Remove old buckets from AWS (#4588)

* Detect buckets with old naming schemes by accounting for different UUIDs

---------

Co-authored-by: Adam Novak
---
 contrib/admin/cleanup_aws_resources.py | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)

diff --git a/contrib/admin/cleanup_aws_resources.py b/contrib/admin/cleanup_aws_resources.py
index 709e78670a..e626d0e4f4 100755
--- a/contrib/admin/cleanup_aws_resources.py
+++ b/contrib/admin/cleanup_aws_resources.py
@@ -45,7 +45,7 @@ def contains_uuid(string):
     Determines if a string contains a pattern like: '28064c76-a491-43e7-9b50-da424f920354',
     which toil uses in its test generated bucket names.
     """
-    return bool(re.compile('[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}').findall(string))
+    return bool(re.compile('[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{8,12}').findall(string))
 
 
 def contains_uuid_with_underscores(string):
@@ -61,7 +61,7 @@ def contains_num_only_uuid(string):
     Determines if a string contains a pattern like: '13614-31311-31347',
     which toil uses in its test generated sdb domain names.
     """
-    return bool(re.compile('[0-9]{5}-[0-9]{5}-[0-9]{5}').findall(string))
+    return bool(re.compile('[0-9]{4,5}-[0-9]{4,5}-[0-9]{4,5}').findall(string))
 
 
 def contains_toil_test_patterns(string):
@@ -69,7 +69,8 @@ def contains_toil_test_patterns(string):
 
 
 def matches(resource_name):
-    if resource_name.endswith('--files') or resource_name.endswith('--jobs') or resource_name.endswith('_toil'):
+    if (resource_name.endswith('--files') or resource_name.endswith('--jobs') or resource_name.endswith('_toil')
+            or resource_name.endswith('--internal') or resource_name.startswith('toil-s3test-')):
         if contains_toil_test_patterns(resource_name):
             return resource_name
 

From 765059714517ce0f3ee407b6863e75d57dce6184 Mon Sep 17 00:00:00 2001
From: William Gao
Date: Wed, 27 Sep 2023 08:48:35 -0700
Subject: [PATCH 2/4] Update docs to hide Mesos (#4413)

* Update docs to hide Mesos
* address review comments
* remove invisible characters?
* replace mesos in more places
* Document Kubernetes-managed autoscaling, with in-workflow Mesos autoscaling as deprecated
* Reword some documentation and messages
* Chase out more Mesoses
* Don't insist on processes actually running promptly in parallel
* Ask for a compatible set of Sphinx packages
* Keep back astroid

We can't use astroid 3 until sphinx-autoapi releases a fix for
https://github.com/readthedocs/sphinx-autoapi/issues/392

---------

Co-authored-by: Adam Novak
---
 docs/appendices/deploy.rst                    |  45 +--
 .../toilAPIBatchsystem.rst                    |   8 +-
 docs/gettingStarted/quickStart.rst            |  40 +--
 docs/running/cloud/amazon.rst                 | 292 ++++++++++--------
 docs/running/cloud/cloud.rst                  |  12 +-
 docs/running/introduction.rst                 |  13 +-
 requirements-dev.txt                          |  10 +-
 src/toil/test/src/promisedRequirementTest.py  |   4 +-
 src/toil/utils/toilLaunchCluster.py           |  13 +-
 9 files changed, 235 insertions(+), 202 deletions(-)

diff --git a/docs/appendices/deploy.rst b/docs/appendices/deploy.rst
index 738eefb232..0502354c90 100644
--- a/docs/appendices/deploy.rst
+++ b/docs/appendices/deploy.rst
@@ -31,27 +31,27 @@ From here, you can install a project and its dependencies::
 
     $ tree
     .
├── util - │   ├── __init__.py - │   └── sort - │   ├── __init__.py - │   └── quick.py + │ ├── __init__.py + │ └── sort + │ ├── __init__.py + │ └── quick.py └── workflow ├── __init__.py └── main.py 3 directories, 5 files $ pip install matplotlib - $ cp -R workflow util venv/lib/python2.7/site-packages + $ cp -R workflow util venv/lib/python3.9/site-packages Ideally, your project would have a ``setup.py`` file (see `setuptools`_) which streamlines the installation process:: $ tree . ├── util - │   ├── __init__.py - │   └── sort - │   ├── __init__.py - │   └── quick.py + │ ├── __init__.py + │ └── sort + │ ├── __init__.py + │ └── quick.py ├── workflow │ ├── __init__.py │ └── main.py @@ -70,7 +70,7 @@ both Python and Toil are assumed to be present on the leader and all worker node We can now run our workflow:: - $ python main.py --batchSystem=mesos … + $ python main.py --batchSystem=kubernetes … .. important:: @@ -101,13 +101,13 @@ This scenario applies if the user script imports modules that are its siblings:: $ cd my_project $ ls userScript.py utilities.py - $ ./userScript.py --batchSystem=mesos … + $ ./userScript.py --batchSystem=kubernetes … Here ``userScript.py`` imports additional functionality from ``utilities.py``. Toil detects that ``userScript.py`` has sibling modules and copies them to the workers, alongside the user script. Note that sibling modules will be auto-deployed regardless of whether they are actually imported by the user -script–all .py files residing in the same directory as the user script will +script: all .py files residing in the same directory as the user script will automatically be auto-deployed. Sibling modules are a suitable method of organizing the source code of @@ -134,16 +134,16 @@ The following shell session illustrates this:: $ tree . ├── utils - │   ├── __init__.py - │   └── sort - │   ├── __init__.py - │   └── quick.py + │ ├── __init__.py + │ └── sort + │ ├── __init__.py + │ └── quick.py └── workflow ├── __init__.py └── main.py 3 directories, 5 files - $ python -m workflow.main --batchSystem=mesos … + $ python -m workflow.main --batchSystem=kubernetes … .. _package: https://docs.python.org/2/tutorial/modules.html#packages @@ -168,7 +168,7 @@ could do this:: $ cd my_project $ export PYTHONPATH="$PWD" $ cd /some/other/dir - $ python -m workflow.main --batchSystem=mesos … + $ python -m workflow.main --batchSystem=kubernetes … Also note that the root directory itself must not be package, i.e. must not contain an ``__init__.py``. @@ -193,7 +193,8 @@ replicates ``PYTHONPATH`` from the leader to every worker. Toil Appliance -------------- -The term Toil Appliance refers to the Mesos Docker image that Toil uses to simulate the machines in the virtual mesos -cluster. It's easily deployed, only needs Docker, and allows for workflows to be run in single-machine mode and for -clusters of VMs to be provisioned. To specify a different image, see the Toil :ref:`envars` section. For more -information on the Toil Appliance, see the :ref:`runningAWS` section. +The term Toil Appliance refers to the Ubuntu-based Docker image that Toil uses +for the machines in the cluster. It's easily deployed, only needs Docker, and +allows a consistent environment on all Toil clusters. To specify a different +image, see the Toil :ref:`envars` section. For more information on the Toil +Appliance, see the :ref:`runningAWS` section. 
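
A note on the ``setup.py`` recommended in the deploy.rst hunk above: it can be
quite small. A minimal sketch for the ``util``/``workflow`` layout shown there
(the project name, version, and dependency list are illustrative, not taken
from the patch)::

    # setup.py (illustrative sketch; adjust names and dependencies)
    from setuptools import setup, find_packages

    setup(
        name='my-toil-project',
        version='0.1.0',
        packages=find_packages(),  # picks up util, util.sort, and workflow
        install_requires=['matplotlib'],
    )

With this file in place, ``pip install .`` into the virtualenv replaces the
manual ``cp -R workflow util venv/lib/python3.9/site-packages`` step shown
above.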
diff --git a/docs/developingWorkflows/toilAPIBatchsystem.rst b/docs/developingWorkflows/toilAPIBatchsystem.rst index 000f569383..f105d2f8b5 100644 --- a/docs/developingWorkflows/toilAPIBatchsystem.rst +++ b/docs/developingWorkflows/toilAPIBatchsystem.rst @@ -6,12 +6,12 @@ Batch System API ================ The batch system interface is used by Toil to abstract over different ways of running -batches of jobs, for example Slurm, GridEngine, Mesos, Parasol and a single node. The +batches of jobs, for example on Slurm clusters, Kubernetes clusters, or a single node. The :class:`toil.batchSystems.abstractBatchSystem.AbstractBatchSystem` API is implemented to -run jobs using a given job management system, e.g. Mesos. +run jobs using a given job management system. -Batch System Enivronmental Variables ------------------------------------- +Batch System Environment Variables +---------------------------------- Environmental variables allow passing of scheduler specific parameters. diff --git a/docs/gettingStarted/quickStart.rst b/docs/gettingStarted/quickStart.rst index e56805b29d..9174a8ed2e 100644 --- a/docs/gettingStarted/quickStart.rst +++ b/docs/gettingStarted/quickStart.rst @@ -32,14 +32,14 @@ Toil uses batch systems to manage the jobs it creates. The ``singleMachine`` batch system is primarily used to prepare and debug workflows on a local machine. Once validated, try running them on a full-fledged batch system (see :ref:`batchsysteminterface`). -Toil supports many different batch systems such as `Apache Mesos`_ and Grid Engine; its versatility makes it +Toil supports many different batch systems such as `Kubernetes`_ and Grid Engine; its versatility makes it easy to run your workflow in all kinds of places. Toil is totally customizable! Run ``python helloWorld.py --help`` to see a complete list of available options. For something beyond a "Hello, world!" example, refer to :ref:`runningDetail`. -.. _Apache Mesos: https://mesos.apache.org/getting-started/ +.. _Kubernetes: https://kubernetes.io/ .. _cwlquickstart: @@ -279,7 +279,7 @@ workflow there is always one leader process, and potentially many worker process When using the single-machine batch system (the default), the worker processes will be running on the same machine as the leader process. With full-fledged batch systems like -Mesos the worker processes will typically be started on separate machines. The +Kubernetes the worker processes will typically be started on separate machines. The boilerplate ensures that the pipeline is only started once---on the leader---but not when its job functions are imported and executed on the individual workers. @@ -394,8 +394,10 @@ Also! Remember to use the :ref:`destroyCluster` command when finished to destro #. Launch a cluster in AWS using the :ref:`launchCluster` command:: (venv) $ toil launch-cluster \ + --clusterType kubernetes \ --keyPairName \ --leaderNodeType t2.medium \ + --nodeTypes t2.medium -w 1 \ --zone us-west-2a The arguments ``keyPairName``, ``leaderNodeType``, and ``zone`` are required to launch a cluster. @@ -448,8 +450,10 @@ Also! Remember to use the :ref:`destroyCluster` command when finished to destro #. First launch a node in AWS using the :ref:`launchCluster` command:: (venv) $ toil launch-cluster \ + --clusterType kubernetes \ --keyPairName \ --leaderNodeType t2.medium \ + --nodeTypes t2.medium -w 1 \ --zone us-west-2a #. Copy ``example.cwl`` and ``example-job.yaml`` from the :ref:`CWL example ` to the node using @@ -462,24 +466,25 @@ Also! 
Remember to use the :ref:`destroyCluster` command when finished to destro (venv) $ toil ssh-cluster --zone us-west-2a -#. Once on the leader node, it's a good idea to update and install the following:: +#. Once on the leader node, command line tools such as ``kubectl`` will be available to you. It's also a good idea to + update and install the following:: sudo apt-get update sudo apt-get -y upgrade sudo apt-get -y dist-upgrade sudo apt-get -y install git - sudo pip install mesos.cli #. Now create a new ``virtualenv`` with the ``--system-site-packages`` option and activate:: virtualenv --system-site-packages venv source venv/bin/activate -#. Now run the CWL workflow:: +#. Now run the CWL workflow with the Kubernetes batch system:: (venv) $ toil-cwl-runner \ --provisioner aws \ - --jobStore aws:us-west-2a:any-name \ + --batchSystem kubernetes \ + --jobStore aws:us-west-2:any-name \ /tmp/example.cwl /tmp/example-job.yaml .. tip:: @@ -528,12 +533,14 @@ Also! Remember to use the :ref:`destroyCluster` command when finished to destro #. Download :download:`pestis.tar.gz <../../src/toil/test/cactus/pestis.tar.gz>` -#. Launch a leader node using the :ref:`launchCluster` command:: +#. Launch a cluster using the :ref:`launchCluster` command:: (venv) $ toil launch-cluster \ --provisioner \ --keyPairName \ --leaderNodeType \ + --nodeType \ + -w 1-2 \ --zone @@ -579,13 +586,9 @@ Also! Remember to use the :ref:`destroyCluster` command when finished to destro #. Run `Cactus `__ as an autoscaling workflow:: - (cact_venv) $ TOIL_APPLIANCE_SELF=quay.io/ucsc_cgl/toil:3.14.0 cactus \ - --provisioner \ - --nodeType \ - --maxNodes 2 \ - --minNodes 0 \ + (cact_venv) $ cactus \ --retry 10 \ - --batchSystem mesos \ + --batchSystem kubernetes \ --logDebug \ --logFile /logFile_pestis3 \ --configFile \ @@ -597,15 +600,6 @@ Also! Remember to use the :ref:`destroyCluster` command when finished to destro **Pieces of the Puzzle**: - ``TOIL_APPLIANCE_SELF=quay.io/ucsc_cgl/toil:3.14.0`` --- specifies the version of Toil being used, 3.14.0; - if the latest one is desired, please eliminate. - - ``--nodeType`` --- determines the instance type used for worker nodes. The instance type specified here must be on - the same cloud provider as the one specified with ``--leaderNodeType`` - - ``--maxNodes 2`` --- creates up to two instances of the type specified with ``--nodeType`` and - launches Mesos worker containers inside them. - ``--logDebug`` --- equivalent to ``--logLevel DEBUG``. ``--logFile /logFile_pestis3`` --- writes logs in a file named `logFile_pestis3` under ``/`` folder. diff --git a/docs/running/cloud/amazon.rst b/docs/running/cloud/amazon.rst index 8f975df197..f3361a191d 100644 --- a/docs/running/cloud/amazon.rst +++ b/docs/running/cloud/amazon.rst @@ -88,32 +88,35 @@ during the computation of a workflow, first set up and configure an account with This will create the files `~/.aws/config` and `~/.aws/credentials`. -#. If not done already, install toil (example uses version 5.3.0, but we recommend the latest release): :: +#. If not done already, install toil (example uses version 5.12.0, but we recommend the latest release): :: $ virtualenv venv $ source venv/bin/activate - $ pip install toil[all]==5.3.0 + $ pip install toil[all]==5.12.0 #. 
Now that toil is installed and you are running a virtualenv, an example of launching a toil leader node would be the following: ::

-    $ TOIL_APPLIANCE_SELF=quay.io/ucsc_cgl/toil:5.3.0 \
-        toil launch-cluster clustername \
+    $ toil launch-cluster <cluster-name> \
+        --clusterType kubernetes \
         --leaderNodeType t2.medium \
+        --nodeTypes t2.medium -w 1 \
         --zone us-west-1a \
         --keyPairName id_rsa
 
 To further break down each of these commands:
 
-    **TOIL_APPLIANCE_SELF=quay.io/ucsc_cgl/toil:latest** --- This is optional. It specifies a mesos docker image that we maintain with the latest version of toil installed on it. If you want to use a different version of toil, please specify the image tag you need from https://quay.io/repository/ucsc_cgl/toil?tag=latest&tab=tags.
-
     **toil launch-cluster** --- Base command in toil to launch a cluster.
 
-    **clustername** --- Just choose a name for your cluster.
+    **<cluster-name>** --- Just choose a name for your cluster.
+
+    **--clusterType kubernetes** --- Specify the type of cluster to coordinate and execute your workflow. Kubernetes is the recommended option.
 
     **--leaderNodeType t2.medium** --- Specify the leader node type. Make a t2.medium (2CPU; 4Gb RAM; $0.0464/Hour). List of available AWS instances: https://aws.amazon.com/ec2/pricing/on-demand/
 
+    **--nodeTypes t2.medium -w 1** --- Specify the worker node type and the number of worker nodes to launch. The Kubernetes cluster requires at least 1 worker node.
+
     **--zone us-west-1a** --- Specify the AWS zone you want to launch the instance in. Must have the same prefix as the zone in your awscli credentials (which, in the example of this tutorial is: "us-west-1").
 
     **--keyPairName id_rsa** --- The name of your key pair, which should be "id_rsa" if you've followed this tutorial.
 
@@ -124,12 +127,15 @@ To further break down each of these commands:
     For example, if you ``export TOIL_AWS_TAGS='{"project-name": "variant-calling"}'`` in your shell before using Toil,
     AWS resources created by Toil will be tagged with a ``project-name`` tag with the value ``variant-calling``.
 
+    You can also set the ``TOIL_APPLIANCE_SELF`` environment variable to one of the `Toil project's Docker images`_, if you would like to launch a cluster using a different version of Toil than the one you have installed.
+
 .. _AWS account: https://aws.amazon.com/premiumsupport/knowledge-center/create-and-activate-aws-account/
 .. _key pair: http://docs.aws.amazon.com/AWSEC2/latest/UserGuide/ec2-key-pairs.html
 .. _Amazon's instructions : http://docs.aws.amazon.com/AWSEC2/latest/UserGuide/ec2-key-pairs.html#how-to-generate-your-own-key-and-import-it-to-aws
 .. _install: http://docs.aws.amazon.com/cli/latest/userguide/installing.html
 .. _configure: http://docs.aws.amazon.com/cli/latest/userguide/cli-chap-getting-started.html
 .. _blog instructions: https://toilpipelines.wordpress.com/2018/01/18/running-toil-autoscaling-with-aws/
+.. _Toil project's Docker images: https://quay.io/repository/ucsc_cgl/toil?tag=latest&tab=tags
 
 .. _awsJobStore:
 
@@ -148,8 +154,9 @@ To run the sort example :ref:`sort example ` with the AWS job store
 Toil Provisioner
 ----------------
 
-The Toil provisioner is included in Toil alongside the ``[aws]`` extra and
-allows us to spin up a cluster.
+The Toil provisioner is the component responsible for creating resources in
+Amazon's cloud.
It is included in Toil alongside the ``[aws]`` extra and allows +us to spin up a cluster. Getting started with the provisioner is simple: @@ -161,8 +168,8 @@ Getting started with the provisioner is simple: setting up your AWS credentials follow instructions `here `__. -The Toil provisioner is built around the Toil Appliance, a Docker image that bundles -Toil and all its requirements (e.g. Mesos). This makes deployment simple across +The Toil provisioner makes heavy use of the Toil Appliance, a Docker image that bundles +Toil and all its requirements (e.g. Kubernetes). This makes deployment simple across platforms, and you can even simulate a cluster locally (see :ref:`appliance_dev` for details). .. admonition:: Choosing Toil Appliance Image @@ -182,12 +189,14 @@ Details about Launching a Cluster in AWS ---------------------------------------- Using the provisioner to launch a Toil leader instance is simple using the ``launch-cluster`` command. For example, -to launch a cluster named "my-cluster" with a t2.medium leader in the us-west-2a zone, run :: +to launch a Kubernetes cluster named "my-cluster" with a t2.medium leader in the us-west-2a zone, run :: (venv) $ toil launch-cluster my-cluster \ + --clusterType kubernetes \ --leaderNodeType t2.medium \ + --nodeTypes t2.medium -w 1 \ --zone us-west-2a \ - --keyPairName + --keyPairName The cluster name is used to uniquely identify your cluster and will be used to populate the instance's ``Name`` tag. Also, the Toil provisioner will @@ -234,9 +243,12 @@ change. This is in contrast with :ref:`Autoscaling`. To launch worker nodes alongside the leader we use the ``-w`` option:: (venv) $ toil launch-cluster my-cluster \ + --clusterType kubernetes \ --leaderNodeType t2.small -z us-west-2a \ - --keyPairName your-AWS-key-pair-name \ - --nodeTypes m3.large,t2.micro -w 1,4 + --keyPairName \ + --nodeTypes m3.large,t2.micro -w 1,4 \ + --zone us-west-2a + This will spin up a leader node of type t2.small with five additional workers --- one m3.large instance and four t2.micro. @@ -260,129 +272,24 @@ look like :: section for a detailed explanation on how to include them. .. _Autoscaling: +.. _ProvisioningWithKubernetes: Running a Workflow with Autoscaling ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ -Autoscaling is a feature of running Toil in a cloud whereby additional cloud instances are launched to run the workflow. -Autoscaling leverages Mesos containers to provide an execution environment for these workflows. +Toil can create an autoscaling Kubernetes cluster for you using the AWS +provisioner. Autoscaling is a feature of running Toil in a cloud whereby +additional cloud instances are launched as needed to run the workflow. .. note:: Make sure you've done the AWS setup in :ref:`prepareAWS`. -#. Download :download:`sort.py <../../../src/toil/test/sort/sort.py>` - -#. Launch the leader node in AWS using the :ref:`launchCluster` command: :: - - (venv) $ toil launch-cluster \ - --keyPairName \ - --leaderNodeType t2.medium \ - --zone us-west-2a - -#. Copy the ``sort.py`` script up to the leader node: :: - - (venv) $ toil rsync-cluster -z us-west-2a sort.py :/root - -#. Login to the leader node: :: - - (venv) $ toil ssh-cluster -z us-west-2a - -#. Run the script as an autoscaling workflow: :: - - $ python /root/sort.py aws:us-west-2: \ - --provisioner aws \ - --nodeTypes c3.large \ - --maxNodes 2 \ - --batchSystem mesos - -.. 
note:: - - In this example, the autoscaling Toil code creates up to two instances of type `c3.large` and launches Mesos - slave containers inside them. The containers are then available to run jobs defined by the `sort.py` script. - Toil also creates a bucket in S3 called `aws:us-west-2:autoscaling-sort-jobstore` to store intermediate job - results. The Toil autoscaler can also provision multiple different node types, which is useful for workflows - that have jobs with varying resource requirements. For example, one could execute the script with - ``--nodeTypes c3.large,r3.xlarge --maxNodes 5,1``, which would allow the provisioner to create up to five - c3.large nodes and one r3.xlarge node for memory-intensive jobs. In this situation, the autoscaler would avoid - creating the more expensive r3.xlarge node until needed, running most jobs on the c3.large nodes. - -#. View the generated file to sort:: - - $ head fileToSort.txt - -#. View the sorted file:: - - $ head sortedFile.txt - -For more information on other autoscaling (and other) options have a look at :ref:`workflowOptions` and/or run :: - - $ python my-toil-script.py --help - -.. important:: - - Some important caveats about starting a toil run through an ssh session are - explained in the :ref:`sshCluster` section. - -Preemptibility -^^^^^^^^^^^^^^ - -Toil can run on a heterogeneous cluster of both preemptible and non-preemptible nodes. Being a preemptible node simply -means that the node may be shut down at any time, while jobs are running. These jobs can then be restarted later -somewhere else. - -A node type can be specified as preemptible by adding a `spot bid`_ to its entry in the list of node types provided with -the ``--nodeTypes`` flag. If spot instance prices rise above your bid, the preemptible node whill be shut down. - -Individual jobs can explicitly specify whether they should be run on preemptible nodes via the boolean ``preemptible`` -resource requirement in Toil's Python API. In CWL, this is `exposed as a hint`__ ``UsePreemptible`` in the -``http://arvados.org/cwl#`` namespace (usually imported as ``arv``). In WDL, this is `exposed as a runtime attribute`___ -``preemptible`` as recognized by Cromwell. - -If a job is not specified to be preemptible, the job will not run on preemptible nodes even if preemptible nodes -are available, unless the workflow is run with the ``--defaultPreemptible`` flag. The ``--defaultPreemptible`` flag will allow -jobs without a ``preemptible`` requirement to run on preemptible machines. For example:: - - $ python /root/sort.py aws:us-west-2: \ - --provisioner aws \ - --nodeTypes c3.4xlarge:2.00 \ - --maxNodes 2 \ - --batchSystem mesos \ - --defaultPreemptible - -.. admonition:: Specify Preemptibility Carefully - - Ensure that your choices for ``--nodeTypes`` and ``--maxNodes <>`` make - sense for your workflow and won't cause it to hang. You should make sure the - provisioner is able to create nodes large enough to run the largest job - in the workflow, and that non-preemptible node types are allowed if there are - non-preemptible jobs in the workflow. - -Finally, the ``--preemptibleCompensation`` flag can be used to handle cases where preemptible nodes may not be -available but are required for your workflow. With this flag enabled, the autoscaler will attempt to compensate -for a shortage of preemptible nodes of a certain type by creating non-preemptible nodes of that type, if -non-preemptible nodes of that type were specified in ``--nodeTypes``. - -.. 
_spot bid: https://aws.amazon.com/ec2/spot/pricing/ - -.. __exposed as a hint: https://doc.arvados.org/user/cwl/cwl-extensions.html - -.. ___exposed as a runtime attribute: https://cromwell.readthedocs.io/en/stable/RuntimeAttributes/#preemptible - - -.. _ProvisioningWithKubernetes: - -Provisioning with a Kubernetes cluster -^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ - -If you don't have an existing Kubernetes cluster but still want to use -Kubernetes to orchestrate jobs, Toil can create a Kubernetes cluster for you -using the AWS provisioner. - -By default, the ``toil launch-cluster`` command uses a Mesos cluster as the -jobs scheduler. Toil can also create a Kubernetes cluster to schedule Toil -jobs. To set up a Kubernetes cluster, simply add the ``--clusterType=kubernetes`` -command line option to ``toil launch-cluster``. +To set up a Kubernetes cluster, simply use the ``--clusterType=kubernetes`` +command line option to ``toil launch-cluster``. To make it autoscale, specify a +range of possible node counts for a node type (such as ``-w 1-4``). The cluster +will automatically add and remove nodes, within that range, depending on how +many seem to be needed to run the jobs submitted to the cluster. For example, to launch a Toil cluster with a Kubernetes scheduler, run: :: @@ -390,14 +297,14 @@ For example, to launch a Toil cluster with a Kubernetes scheduler, run: :: --provisioner=aws \ --clusterType kubernetes \ --zone us-west-2a \ - --keyPairName wlgao@ucsc.edu \ + --keyPairName \ --leaderNodeType t2.medium \ --leaderStorage 50 \ --nodeTypes t2.medium -w 1-4 \ --nodeStorage 20 \ --logDebug -Behind the scenes, Toil installs kubeadm and configures kubelet on the Toil +Behind the scenes, Toil installs kubeadm and configures the kubelet on the Toil leader and all worker nodes. This Toil cluster can then schedule jobs using Kubernetes. @@ -405,9 +312,7 @@ Kubernetes. You should set at least one worker node, otherwise Kubernetes would not be able to schedule any jobs. It is also normal for this step to take a while. - -Below is a tutorial on how to launch a Toil job on this newly created cluster. -As a demostration, we will use :download:`sort.py <../../../src/toil/test/sort/sort.py>` +As a demonstration, we will use :download:`sort.py <../../../src/toil/test/sort/sort.py>` again, but run it on a Toil cluster with Kubernetes. First, download this file and put it to the current working directory. @@ -422,6 +327,11 @@ free to use your own cluster configuration and/or workflow files. For more information on this step, see the corresponding section of the :ref:`StaticProvisioning` tutorial. +.. important:: + + Some important caveats about starting a toil run through an ssh session are + explained in the :ref:`sshCluster` section. + Now that we are inside the cluster, a Kubernetes environment should already be configured and running. To verify this, simply run: :: @@ -451,7 +361,6 @@ are good to start running workflows. :: Now we can run the workflow: :: $ python sort.py \ - --provisioner aws --batchSystem kubernetes \ aws:: @@ -487,6 +396,57 @@ If everything is successful, you should be able to see an output file from the s You can now run your own workflows! +Preemptibility +^^^^^^^^^^^^^^ + +Toil can run on a heterogeneous cluster of both preemptible and non-preemptible nodes. Being a preemptible node simply +means that the node may be shut down at any time, while jobs are running. These jobs can then be restarted later +somewhere else. 
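+
+At the job level, tolerating preemption is expressed like any other resource
+requirement. A minimal sketch using Toil's Python API (the job function and
+its inputs are placeholders, not taken from these docs)::
+
+    from toil.job import Job
+
+    def count_lines(job, path):
+        # Idempotent work like this is safe to rerun if the node is reclaimed.
+        with open(path) as f:
+            return sum(1 for _ in f)
+
+    root = Job.wrapJobFn(count_lines, '/tmp/input.txt',
+                         cores=1, memory='512M', disk='1G',
+                         preemptible=True)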
+ +A node type can be specified as preemptible by adding a `spot bid`_ in dollars, after a colon, to its entry in the list of node types provided with +the ``--nodeTypes`` flag. If spot instance prices rise above your bid, the preemptible nodes will be shut down. + +For example, this cluster will have both preemptible and non-preemptible nodes: :: + + (venv) $ toil launch-cluster \ + --provisioner=aws \ + --clusterType kubernetes \ + --zone us-west-2a \ + --keyPairName \ + --leaderNodeType t2.medium \ + --leaderStorage 50 \ + --nodeTypes t2.medium -w 1-4 \ + --nodeTypes t2.large:0.20 -w 1-4 \ + --nodeStorage 20 \ + --logDebug + +Individual jobs can explicitly specify whether they should be run on preemptible nodes via the boolean ``preemptible`` +resource requirement in Toil's Python API. In CWL, this is `exposed as a hint`__ ``UsePreemptible`` in the +``http://arvados.org/cwl#`` namespace (usually imported as ``arv``). In WDL, this is `exposed as a runtime attribute`___ +``preemptible`` as recognized by Cromwell. Toil's Kubernetes batch system will prefer to schedule preemptible jobs +on preemptible nodes. + +If a job is not specified to be preemptible, the job will not run on preemptible nodes even if preemptible nodes +are available, unless the workflow is run with the ``--defaultPreemptible`` flag. The ``--defaultPreemptible`` flag will allow +jobs without an explicit ``preemptible`` requirement to run on preemptible machines. For example:: + + $ python /root/sort.py aws:us-west-2: \ + --batchSystem kubernetes \ + --defaultPreemptible + +.. admonition:: Specify Preemptibility Carefully + + Ensure that your choices for ``--nodeTypes`` and ``--maxNodes <>`` make + sense for your workflow and won't cause it to hang. You should make sure the + provisioner is able to create nodes large enough to run the largest job + in the workflow, and that non-preemptible node types are allowed if there are + non-preemptible jobs in the workflow. + +.. _spot bid: https://aws.amazon.com/ec2/spot/pricing/ + +.. __exposed as a hint: https://doc.arvados.org/user/cwl/cwl-extensions.html + +.. ___exposed as a runtime attribute: https://cromwell.readthedocs.io/en/stable/RuntimeAttributes/#preemptible Using MinIO and S3-Compatible object stores ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ @@ -509,6 +469,68 @@ Examples:: .. _S3-compatible object store: https://en.wikipedia.org/wiki/Amazon_S3#S3_API_and_competing_services .. _MinIO: https://min.io/ +In-Workflow Autoscaling with Mesos +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +Instead of the normal Kubernetes-based autoscaling, you can also use Toil's old +Mesos-based autoscaling method, where the scaling logic runs inside the Toil +workflow. With this approach, a Toil cluster can only run one workflow at a +time. This method also does not work on the ARM architecture. + +In this mode, the ``--preemptibleCompensation`` flag can be used to handle cases where preemptible nodes may not be +available but are required for your workflow. With this flag enabled, the autoscaler will attempt to compensate +for a shortage of preemptible nodes of a certain type by creating non-preemptible nodes of that type, if +non-preemptible nodes of that type were specified in ``--nodeTypes``. + +.. note:: + + This approach is deprecated, because the Mesos project is no longer publishing up-to-date builds. + +#. Download :download:`sort.py <../../../src/toil/test/sort/sort.py>` + +#. 
Launch a Mesos leader node in AWS using the :ref:`launchCluster` command, without using any ranges of node counts: :: + + (venv) $ toil launch-cluster \ + --clusterType mesos \ + --keyPairName \ + --leaderNodeType t2.medium \ + --zone us-west-2a + +#. Copy the ``sort.py`` script up to the leader node: :: + + (venv) $ toil rsync-cluster -z us-west-2a sort.py :/root + +#. Login to the leader node: :: + + (venv) $ toil ssh-cluster -z us-west-2a + +#. Run the script as an autoscaling workflow, specifying a provisioner and node types and counts as workflow arguments: :: + + $ python /root/sort.py aws:us-west-2: \ + --provisioner aws \ + --nodeTypes c3.large \ + --maxNodes 2 \ + --batchSystem mesos + +.. note:: + + In this example, the autoscaling Toil code creates up to two instances of type `c3.large` and launches Mesos + agent containers inside them. The containers are then available to run jobs defined by the `sort.py` script. + Toil also creates a bucket in S3 called `aws:us-west-2:autoscaling-sort-jobstore` to store intermediate job + results. The Toil autoscaler can also provision multiple different node types, which is useful for workflows + that have jobs with varying resource requirements. For example, one could execute the script with + ``--nodeTypes c3.large,r3.xlarge --maxNodes 5,1``, which would allow the provisioner to create up to five + c3.large nodes and one r3.xlarge node for memory-intensive jobs. In this situation, the autoscaler would avoid + creating the more expensive r3.xlarge node until needed, running most jobs on the c3.large nodes. + +#. View the generated file to sort:: + + $ head fileToSort.txt + +#. View the sorted file:: + + $ head sortedFile.txt + Dashboard --------- diff --git a/docs/running/cloud/cloud.rst b/docs/running/cloud/cloud.rst index 65cd6cc617..a567d2a079 100644 --- a/docs/running/cloud/cloud.rst +++ b/docs/running/cloud/cloud.rst @@ -7,11 +7,12 @@ Running in the Cloud Toil supports Amazon Web Services (AWS) and Google Compute Engine (GCE) in the cloud and has autoscaling capabilities that can adapt to the size of your workflow, whether your workflow requires 10 instances or 20,000. -Toil does this by creating a virtual cluster with `Apache Mesos`_. `Apache Mesos`_ requires a leader node to coordinate -the workflow, and worker nodes to execute the various tasks within the workflow. As the workflow runs, Toil will -"autoscale", creating and terminating workers as needed to meet the demands of the workflow. +Toil does this by creating a virtual cluster running `Kubernetes`_. Kubernetes requires a leader node to coordinate +the workflow, and worker nodes to execute the various tasks within the workflow. As the workflow runs, Kubernetes will +"autoscale", creating and terminating workers as needed to meet the demands of the workflow. Historically, Toil has +spun up clusters with `Apache Mesos`_, but it is no longer recommended. -Once a user is familiar with the basics of running toil locally (specifying a :ref:`jobStore `, and +Once a user is familiar with the basics of running Toil locally (specifying a :ref:`jobStore `, and how to write a toil script), they can move on to the guides below to learn how to translate these workflows into cloud ready workflows. @@ -25,12 +26,13 @@ distributed over several nodes. The provisioner also has the ability to automati the cluster to handle dynamic changes in computational demand (autoscaling). Currently we have working provisioners with AWS and GCE (Azure support has been deprecated). 

-Toil uses `Apache Mesos`_ as the :ref:`batchSystemOverview`.
+Toil uses `Kubernetes`_ as the :ref:`batchSystemOverview`.
 
 See here for instructions for :ref:`runningAWS`.
 
 See here for instructions for :ref:`runningGCE`.
 
+.. _Kubernetes: https://kubernetes.io/
 .. _Apache Mesos: https://mesos.apache.org/gettingstarted/
 
 .. _cloudJobStore:
 
diff --git a/docs/running/introduction.rst b/docs/running/introduction.rst
index 43b4bb619c..fa781a59e3 100644
--- a/docs/running/introduction.rst
+++ b/docs/running/introduction.rst
@@ -12,7 +12,7 @@ Toil is built in a modular way so that it can be used on lots of different syste
 The three configurable pieces are the
   - :ref:`jobStoreInterface`: A filepath or url that can host and centralize all files for a workflow (e.g. a local folder, or an AWS s3 bucket url).
-  - :ref:`batchSystemInterface`: Specifies either a local single-machine or a currently supported HPC environment (lsf, parasol, mesos, slurm, torque, htcondor, kubernetes, or grid_engine). Mesos is a special case, and is launched for cloud environments.
+  - :ref:`batchSystemInterface`: Specifies either a local single-machine or a currently supported HPC environment (lsf, parasol, mesos, slurm, torque, htcondor, kubernetes, or grid_engine).
   - :ref:`provisionerOverview`: For running in the cloud only. This specifies which cloud provider provides instances to do the "work" of your workflow.
 
 .. _jobStoreOverview:
 
@@ -52,11 +52,12 @@ worker machines all running jobs that need to access the job store.
 Batch System
 ------------
 
-A Toil batch system is either a local single-machine (one computer) or a currently supported
-HPC cluster of computers (lsf, parasol, mesos, slurm, torque, htcondor, or grid_engine). Mesos
-is a special case, and is launched for cloud environments. These environments manage individual
-worker nodes under a leader node to process the work required in a workflow. The leader and its
-workers all coordinate their tasks and files through a centralized job store location.
+A Toil batch system is either a local single-machine (one computer) or a
+currently supported cluster of computers (lsf, parasol, mesos, slurm, torque,
+htcondor, or grid_engine). These environments manage individual worker nodes
+under a leader node to process the work required in a workflow. The leader and
+its workers all coordinate their tasks and files through a centralized job
+store location.
 
 See :ref:`batchSystemInterface` for a more detailed description of different batch systems.
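
A note on the batch system choices described above: the same Toil script runs
on any of them, and only the job store argument and the ``--batchSystem``
option change between runs. A minimal sketch built on the standard Toil
boilerplate (the job function and names are hypothetical)::

    from toil.common import Toil
    from toil.job import Job

    def hello(job, name):
        # Runs on a worker chosen by whichever batch system was selected.
        return 'Hello, %s!' % name

    if __name__ == '__main__':
        parser = Job.Runner.getDefaultArgumentParser()
        options = parser.parse_args()  # job store, plus e.g. --batchSystem kubernetes
        with Toil(options) as workflow:
            print(workflow.start(Job.wrapJobFn(hello, 'world')))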
diff --git a/requirements-dev.txt b/requirements-dev.txt index e053074df2..11ab916305 100644 --- a/requirements-dev.txt +++ b/requirements-dev.txt @@ -4,10 +4,12 @@ pytest-cov>=2.12.1,<5 pytest-timeout>=1.4.2,<3 stubserver>=1.1,<2 setuptools>=65.5.1,<69 -sphinx>=4,<6 -sphinx-autoapi -sphinx-autodoc-typehints -sphinxcontrib-autoprogram +sphinx>=7,<8 +sphinx-autoapi>=2.1.1,<3 +# astroid 3 won't work until some sphinx-autoapi release after 2.1.1 +astroid>=2.15,<3 +sphinx-autodoc-typehints>=1.24.0,<2 +sphinxcontrib-autoprogram==0.1.8 cwltest>=2.2.20211116163652 mypy==1.5.1 types-requests diff --git a/src/toil/test/src/promisedRequirementTest.py b/src/toil/test/src/promisedRequirementTest.py index f29b76adf8..de1b1e8dbd 100644 --- a/src/toil/test/src/promisedRequirementTest.py +++ b/src/toil/test/src/promisedRequirementTest.py @@ -48,7 +48,7 @@ def testConcurrencyDynamic(self): cores=1, memory='1M', disk='1M') values = Job.Runner.startToil(root, self.getOptions(tempDir)) maxValue = max(values) - self.assertEqual(maxValue, self.cpuCount // coresPerJob) + self.assertLessEqual(maxValue, self.cpuCount // coresPerJob) @slow @retry_flaky_test(prepare=[batchSystemTest.hidden.AbstractBatchSystemJobTest.tearDown, @@ -74,7 +74,7 @@ def testConcurrencyStatic(self): disk='1M')) Job.Runner.startToil(root, self.getOptions(tempDir)) _, maxValue = batchSystemTest.getCounters(counterPath) - self.assertEqual(maxValue, self.cpuCount // coresPerJob) + self.assertLessEqual(maxValue, self.cpuCount // coresPerJob) def getOptions(self, tempDir, caching=True): options = super().getOptions(tempDir) diff --git a/src/toil/utils/toilLaunchCluster.py b/src/toil/utils/toilLaunchCluster.py index 27bea8ad77..0c367d2c88 100644 --- a/src/toil/utils/toilLaunchCluster.py +++ b/src/toil/utils/toilLaunchCluster.py @@ -39,7 +39,8 @@ def create_tags_dict(tags: List[str]) -> Dict[str, str]: def main() -> None: parser = parser_with_common_options(provisioner_options=True, jobstore_option=False) parser.add_argument("-T", "--clusterType", dest="clusterType", - choices=['mesos', 'kubernetes'], default='mesos', + choices=['mesos', 'kubernetes'], + default=None, # TODO: change default to "kubernetes" when we are ready. help="Cluster scheduler to use.") parser.add_argument("--leaderNodeType", dest="leaderNodeType", required=True, help="Non-preemptible node type to use for the cluster leader.") @@ -160,6 +161,16 @@ def main() -> None: raise RuntimeError(f'Please provide a value for --zone or set a default in the ' f'TOIL_{options.provisioner.upper()}_ZONE environment variable.') + if options.clusterType == "mesos": + logger.warning('You are using a Mesos cluster, which is no longer recommended as Toil is ' + 'transitioning to Kubernetes-based clusters. Consider switching to ' + '--clusterType=kubernetes instead.') + + if options.clusterType is None: + logger.warning('Argument --clusterType is not set... using "mesos". ' + 'In future versions of Toil, the default cluster scheduler will be ' + 'set to "kubernetes" if the cluster type is not specified.') + options.clusterType = "mesos" logger.info('Creating cluster %s...', options.clusterName) From f1055a332c2320a58103e6d8da057be0cfc9562b Mon Sep 17 00:00:00 2001 From: Adam Novak Date: Fri, 29 Sep 2023 12:42:53 -0400 Subject: [PATCH 3/4] Avoid concurrent modification in cluster scaler tests (#4600) This will fix #4599 by making the mock leader thread safe. 
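
For context: iterating over a dict's values while another thread inserts into
it can raise "RuntimeError: dictionary changed size during iteration".
Iterating over a copy, as this change does, sidesteps that (illustrative
sketch, not code from this change):

    issued = {}  # shared with a mutating thread
    for job in dict(issued).values():  # iterate over a snapshot
        pass  # safe even if `issued` gains entries meanwhile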
--- src/toil/test/provisioners/clusterScalerTest.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/toil/test/provisioners/clusterScalerTest.py b/src/toil/test/provisioners/clusterScalerTest.py index 7b19c75848..bea8670c1c 100644 --- a/src/toil/test/provisioners/clusterScalerTest.py +++ b/src/toil/test/provisioners/clusterScalerTest.py @@ -873,7 +873,9 @@ def getNumberOfJobsIssued(self, preemptible=None): return self.jobQueue.qsize() def getJobs(self): - return self.jobBatchSystemIDToIssuedJob.values() + # jobBatchSystemIDToIssuedJob may be modified while we are working. + # So copy it. + return dict(self.jobBatchSystemIDToIssuedJob).values() # AbstractScalableBatchSystem functionality def getNodes(self, preemptible: Optional[bool] = False, timeout: int = 600): From 9b09d62197d5ecf7251b893576921d397555b89b Mon Sep 17 00:00:00 2001 From: stxue1 <122345910+stxue1@users.noreply.github.com> Date: Fri, 29 Sep 2023 13:52:37 -0700 Subject: [PATCH 4/4] Add String to File functionality into toil-wdl-runner (#4589) * monkeypatch coerce for workflow related nodes * Fix task inputs string coerce * Disable kubernetes * Comment out cwl kubernetes * Maybe markers are wrong and comment out cactus-on-kubernetes * Add docstrings to changed functions + change input list to dict * Deal with nonetype --------- Co-authored-by: Adam Novak --- src/toil/wdl/wdltoil.py | 272 +++++++++++++++++++++++++--------------- 1 file changed, 172 insertions(+), 100 deletions(-) diff --git a/src/toil/wdl/wdltoil.py b/src/toil/wdl/wdltoil.py index b898988691..764b501c5b 100755 --- a/src/toil/wdl/wdltoil.py +++ b/src/toil/wdl/wdltoil.py @@ -31,9 +31,10 @@ import tempfile import uuid -from contextlib import ExitStack +from contextlib import ExitStack, contextmanager from graphlib import TopologicalSorter -from typing import cast, Any, Callable, Union, Dict, List, Optional, Set, Sequence, Tuple, Type, TypeVar, Iterator +from typing import cast, Any, Callable, Union, Dict, List, Optional, Set, Sequence, Tuple, Type, TypeVar, Iterator, \ + Generator from urllib.parse import urlsplit, urljoin, quote, unquote import WDL @@ -428,7 +429,7 @@ class ToilWDLStdLibBase(WDL.StdLib.Base): Standard library implementation for WDL as run on Toil. """ - def __init__(self, file_store: AbstractFileStore): + def __init__(self, file_store: AbstractFileStore, execution_dir: Optional[str] = None): """ Set up the standard library. """ @@ -446,6 +447,8 @@ def __init__(self, file_store: AbstractFileStore): # Keep the file store around so we can access files. 
self._file_store = file_store + self._execution_dir = execution_dir + def _is_url(self, filename: str, schemes: List[str] = ['http:', 'https:', 's3:', 'gs:', TOIL_URI_SCHEME]) -> bool: """ Decide if a filename is a known kind of URL @@ -485,7 +488,12 @@ def _devirtualize_filename(self, filename: str) -> str: result = self._file_store.readGlobalFile(imported) else: # This is a local file - result = filename + # To support relative paths, join the execution dir and filename + # if filename is already an abs path, join() will do nothing + if self._execution_dir is not None: + result = os.path.join(self._execution_dir, filename) + else: + result = filename logger.debug('Devirtualized %s as openable file %s', filename, result) if not os.path.exists(result): @@ -501,11 +509,17 @@ def _virtualize_filename(self, filename: str) -> str: if self._is_url(filename): # Already virtual - logger.debug('Virtualized %s as WDL file %s', filename, filename) + logger.debug('Already virtualized %s as WDL file %s', filename, filename) return filename # Otherwise this is a local file and we want to fake it as a Toil file store file - file_id = self._file_store.writeGlobalFile(filename) + + # To support relative paths from execution directory, join the execution dir and filename + # If filename is already an abs path, join() will not do anything + if self._execution_dir is not None: + file_id = self._file_store.writeGlobalFile(os.path.join(self._execution_dir, filename)) + else: + file_id = self._file_store.writeGlobalFile(filename) result = pack_toil_uri(file_id, os.path.basename(filename)) logger.debug('Virtualized %s as WDL file %s', filename, result) return result @@ -737,15 +751,20 @@ def evaluate_decl(node: WDL.Tree.Decl, environment: WDLBindings, stdlib: WDL.Std return evaluate_named_expression(node, node.name, node.type, node.expr, environment, stdlib) -def evaluate_call_inputs(context: Union[WDL.Error.SourceNode, WDL.Error.SourcePosition], expressions: Dict[str, WDL.Expr.Base], environment: WDLBindings, stdlib: WDL.StdLib.Base) -> WDLBindings: +def evaluate_call_inputs(context: Union[WDL.Error.SourceNode, WDL.Error.SourcePosition], expressions: Dict[str, WDL.Expr.Base], environment: WDLBindings, stdlib: WDL.StdLib.Base, inputs_dict: Optional[Dict[str, WDL.Type.Base]] = None) -> WDLBindings: """ - Evaluate a bunch of expressions with names, and make them into a fresh set of bindings. + Evaluate a bunch of expressions with names, and make them into a fresh set of bindings. `inputs_dict` is a mapping of + variable names to their expected type for the input decls in a task. 
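+    Passing the expected type allows a value that evaluates to a WDL String to
+    be coerced into a File-typed input instead of failing the type check.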
""" - new_bindings: WDLBindings = WDL.Env.Bindings() for k, v in expressions.items(): # Add each binding in turn - new_bindings = new_bindings.bind(k, evaluate_named_expression(context, k, None, v, environment, stdlib)) + # If the expected type is optional, then don't type check the lhs and rhs as miniwdl will return a StaticTypeMismatch error, so pass in None + expected_type = None + if not v.type.optional and inputs_dict is not None: + # This is done to enable passing in a string into a task input of file type + expected_type = inputs_dict.get(k, None) + new_bindings = new_bindings.bind(k, evaluate_named_expression(context, k, expected_type, v, environment, stdlib)) return new_bindings def evaluate_defaultable_decl(node: WDL.Tree.Decl, environment: WDLBindings, stdlib: WDL.StdLib.Base) -> WDL.Value.Base: @@ -756,7 +775,10 @@ def evaluate_defaultable_decl(node: WDL.Tree.Decl, environment: WDLBindings, std try: if node.name in environment and not isinstance(environment[node.name], WDL.Value.Null): logger.debug('Name %s is already defined with a non-null value, not using default', node.name) - return environment[node.name] + if not isinstance(environment[node.name], type(node.type)): + return environment[node.name].coerce(node.type) + else: + return environment[node.name] else: if node.type is not None and not node.type.optional and node.expr is None: # We need a value for this but there isn't one. @@ -965,7 +987,7 @@ class WDLBaseJob(Job): as the job's run method calls postprocess(). """ - def __init__(self, **kwargs: Any) -> None: + def __init__(self, execution_dir: Optional[str] = None, **kwargs: Any) -> None: """ Make a WDL-related job. @@ -992,6 +1014,8 @@ def __init__(self, **kwargs: Any) -> None: # jobs returning other jobs' promised RVs. self._postprocessing_steps: List[Tuple[str, Union[str, Promised[WDLBindings]]]] = [] + self._execution_dir = execution_dir + # TODO: We're not allowed by MyPy to override a method and widen the return # type, so this has to be Any. def run(self, file_store: AbstractFileStore) -> Any: @@ -1481,11 +1505,11 @@ class WDLWorkflowNodeJob(WDLBaseJob): Job that evaluates a WDL workflow node. """ - def __init__(self, node: WDL.Tree.WorkflowNode, prev_node_results: Sequence[Promised[WDLBindings]], namespace: str, **kwargs: Any) -> None: + def __init__(self, node: WDL.Tree.WorkflowNode, prev_node_results: Sequence[Promised[WDLBindings]], namespace: str, execution_dir: Optional[str] = None, **kwargs: Any) -> None: """ Make a new job to run a workflow node to completion. 
""" - super().__init__(unitName=node.workflow_node_id, displayName=node.workflow_node_id, **kwargs) + super().__init__(unitName=node.workflow_node_id, displayName=node.workflow_node_id, execution_dir=execution_dir, **kwargs) self._node = node self._prev_node_results = prev_node_results @@ -1504,59 +1528,64 @@ def run(self, file_store: AbstractFileStore) -> Promised[WDLBindings]: # Combine the bindings we get from previous jobs incoming_bindings = combine_bindings(unwrap_all(self._prev_node_results)) # Set up the WDL standard library - standard_library = ToilWDLStdLibBase(file_store) - - if isinstance(self._node, WDL.Tree.Decl): - # This is a variable assignment - logger.info('Setting %s to %s', self._node.name, self._node.expr) - value = evaluate_decl(self._node, incoming_bindings, standard_library) - return self.postprocess(incoming_bindings.bind(self._node.name, value)) - elif isinstance(self._node, WDL.Tree.Call): - # This is a call of a task or workflow - - # Fetch all the inputs we are passing and bind them. - # The call is only allowed to use these. - logger.debug("Evaluating step inputs") - input_bindings = evaluate_call_inputs(self._node, self._node.inputs, incoming_bindings, standard_library) - - # Bindings may also be added in from the enclosing workflow inputs - # TODO: this is letting us also inject them from the workflow body. - # TODO: Can this result in picking up non-namespaced values that - # aren't meant to be inputs, by not changing their names? - passed_down_bindings = incoming_bindings.enter_namespace(self._node.name) - - if isinstance(self._node.callee, WDL.Tree.Workflow): - # This is a call of a workflow - subjob: WDLBaseJob = WDLWorkflowJob(self._node.callee, [input_bindings, passed_down_bindings], self._node.callee_id, f'{self._namespace}.{self._node.name}') + standard_library = ToilWDLStdLibBase(file_store, self._execution_dir) + with monkeypatch_coerce(standard_library): + if isinstance(self._node, WDL.Tree.Decl): + # This is a variable assignment + logger.info('Setting %s to %s', self._node.name, self._node.expr) + value = evaluate_decl(self._node, incoming_bindings, standard_library) + return self.postprocess(incoming_bindings.bind(self._node.name, value)) + elif isinstance(self._node, WDL.Tree.Call): + # This is a call of a task or workflow + + # Fetch all the inputs we are passing and bind them. + # The call is only allowed to use these. + logger.debug("Evaluating step inputs") + if self._node.callee is None: + # This should never be None, but mypy gets unhappy and this is better than an assert + inputs_mapping = None + else: + inputs_mapping = {e.name: e.type for e in self._node.callee.inputs or []} + input_bindings = evaluate_call_inputs(self._node, self._node.inputs, incoming_bindings, standard_library, inputs_mapping) + + # Bindings may also be added in from the enclosing workflow inputs + # TODO: this is letting us also inject them from the workflow body. + # TODO: Can this result in picking up non-namespaced values that + # aren't meant to be inputs, by not changing their names? 
+ passed_down_bindings = incoming_bindings.enter_namespace(self._node.name) + + if isinstance(self._node.callee, WDL.Tree.Workflow): + # This is a call of a workflow + subjob: WDLBaseJob = WDLWorkflowJob(self._node.callee, [input_bindings, passed_down_bindings], self._node.callee_id, f'{self._namespace}.{self._node.name}', self._execution_dir) + self.addChild(subjob) + elif isinstance(self._node.callee, WDL.Tree.Task): + # This is a call of a task + subjob = WDLTaskJob(self._node.callee, [input_bindings, passed_down_bindings], self._node.callee_id, f'{self._namespace}.{self._node.name}') + self.addChild(subjob) + else: + raise WDL.Error.InvalidType(self._node, "Cannot call a " + str(type(self._node.callee))) + + # We need to agregate outputs namespaced with our node name, and existing bindings + subjob.then_namespace(self._node.name) + subjob.then_overlay(incoming_bindings) + self.defer_postprocessing(subjob) + return subjob.rv() + elif isinstance(self._node, WDL.Tree.Scatter): + subjob = WDLScatterJob(self._node, [incoming_bindings], self._namespace, self._execution_dir) self.addChild(subjob) - elif isinstance(self._node.callee, WDL.Tree.Task): - # This is a call of a task - subjob = WDLTaskJob(self._node.callee, [input_bindings, passed_down_bindings], self._node.callee_id, f'{self._namespace}.{self._node.name}') + # Scatters don't really make a namespace, just kind of a scope? + # TODO: Let stuff leave scope! + self.defer_postprocessing(subjob) + return subjob.rv() + elif isinstance(self._node, WDL.Tree.Conditional): + subjob = WDLConditionalJob(self._node, [incoming_bindings], self._namespace, self._execution_dir) self.addChild(subjob) + # Conditionals don't really make a namespace, just kind of a scope? + # TODO: Let stuff leave scope! + self.defer_postprocessing(subjob) + return subjob.rv() else: - raise WDL.Error.InvalidType(self._node, "Cannot call a " + str(type(self._node.callee))) - - # We need to agregate outputs namespaced with our node name, and existing bindings - subjob.then_namespace(self._node.name) - subjob.then_overlay(incoming_bindings) - self.defer_postprocessing(subjob) - return subjob.rv() - elif isinstance(self._node, WDL.Tree.Scatter): - subjob = WDLScatterJob(self._node, [incoming_bindings], self._namespace) - self.addChild(subjob) - # Scatters don't really make a namespace, just kind of a scope? - # TODO: Let stuff leave scope! - self.defer_postprocessing(subjob) - return subjob.rv() - elif isinstance(self._node, WDL.Tree.Conditional): - subjob = WDLConditionalJob(self._node, [incoming_bindings], self._namespace) - self.addChild(subjob) - # Conditionals don't really make a namespace, just kind of a scope? - # TODO: Let stuff leave scope! - self.defer_postprocessing(subjob) - return subjob.rv() - else: - raise WDL.Error.InvalidType(self._node, "Unimplemented WorkflowNode: " + str(type(self._node))) + raise WDL.Error.InvalidType(self._node, "Unimplemented WorkflowNode: " + str(type(self._node))) class WDLWorkflowNodeListJob(WDLBaseJob): """ @@ -1565,11 +1594,11 @@ class WDLWorkflowNodeListJob(WDLBaseJob): workflows or tasks or sections. """ - def __init__(self, nodes: List[WDL.Tree.WorkflowNode], prev_node_results: Sequence[Promised[WDLBindings]], namespace: str, **kwargs: Any) -> None: + def __init__(self, nodes: List[WDL.Tree.WorkflowNode], prev_node_results: Sequence[Promised[WDLBindings]], namespace: str, execution_dir: Optional[str] = None, **kwargs: Any) -> None: """ Make a new job to run a list of workflow nodes to completion. 
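+
+        :param execution_dir: Directory used to resolve relative WDL file
+            paths, when set.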
""" - super().__init__(unitName=nodes[0].workflow_node_id + '+', displayName=nodes[0].workflow_node_id + '+', **kwargs) + super().__init__(unitName=nodes[0].workflow_node_id + '+', displayName=nodes[0].workflow_node_id + '+', execution_dir=execution_dir, **kwargs) self._nodes = nodes self._prev_node_results = prev_node_results @@ -1588,16 +1617,17 @@ def run(self, file_store: AbstractFileStore) -> Promised[WDLBindings]: # Combine the bindings we get from previous jobs current_bindings = combine_bindings(unwrap_all(self._prev_node_results)) # Set up the WDL standard library - standard_library = ToilWDLStdLibBase(file_store) - - for node in self._nodes: - if isinstance(node, WDL.Tree.Decl): - # This is a variable assignment - logger.info('Setting %s to %s', node.name, node.expr) - value = evaluate_decl(node, current_bindings, standard_library) - current_bindings = current_bindings.bind(node.name, value) - else: - raise WDL.Error.InvalidType(node, "Unimplemented WorkflowNode: " + str(type(node))) + standard_library = ToilWDLStdLibBase(file_store, self._execution_dir) + + with monkeypatch_coerce(standard_library): + for node in self._nodes: + if isinstance(node, WDL.Tree.Decl): + # This is a variable assignment + logger.info('Setting %s to %s', node.name, node.expr) + value = evaluate_decl(node, current_bindings, standard_library) + current_bindings = current_bindings.bind(node.name, value) + else: + raise WDL.Error.InvalidType(node, "Unimplemented WorkflowNode: " + str(type(node))) return self.postprocess(current_bindings) @@ -1766,12 +1796,12 @@ class WDLSectionJob(WDLBaseJob): Job that can create more graph for a section of the wrokflow. """ - def __init__(self, namespace: str, **kwargs: Any) -> None: + def __init__(self, namespace: str, execution_dir: Optional[str] = None, **kwargs: Any) -> None: """ Make a WDLSectionJob where the interior runs in the given namespace, starting with the root workflow. """ - super().__init__(**kwargs) + super().__init__(execution_dir, **kwargs) self._namespace = namespace @staticmethod @@ -1917,10 +1947,10 @@ def get_job_set_any(wdl_ids: Set[str]) -> List[WDLBaseJob]: if len(node_ids) == 1: # Make a one-node job - job: WDLBaseJob = WDLWorkflowNodeJob(section_graph.get(node_ids[0]), rvs, self._namespace) + job: WDLBaseJob = WDLWorkflowNodeJob(section_graph.get(node_ids[0]), rvs, self._namespace, self._execution_dir) else: # Make a multi-node job - job = WDLWorkflowNodeListJob([section_graph.get(node_id) for node_id in node_ids], rvs, self._namespace) + job = WDLWorkflowNodeListJob([section_graph.get(node_id) for node_id in node_ids], rvs, self._namespace, self._execution_dir) for prev_job in prev_jobs: # Connect up the happens-after relationships to make sure the # return values are available. @@ -2017,11 +2047,11 @@ class WDLScatterJob(WDLSectionJob): instance of the body. If an instance of the body doesn't create a binding, it gets a null value in the corresponding array. """ - def __init__(self, scatter: WDL.Tree.Scatter, prev_node_results: Sequence[Promised[WDLBindings]], namespace: str, **kwargs: Any) -> None: + def __init__(self, scatter: WDL.Tree.Scatter, prev_node_results: Sequence[Promised[WDLBindings]], namespace: str, execution_dir: Optional[str] = None, **kwargs: Any) -> None: """ Create a subtree that will run a WDL scatter. The scatter itself and the contents live in the given namespace. 
""" - super().__init__(namespace, **kwargs, unitName=scatter.workflow_node_id, displayName=scatter.workflow_node_id) + super().__init__(namespace, **kwargs, unitName=scatter.workflow_node_id, displayName=scatter.workflow_node_id, execution_dir=execution_dir) # Because we need to return the return value of the workflow, we need # to return a Toil promise for the last/sink job in the workflow's @@ -2050,7 +2080,8 @@ def run(self, file_store: AbstractFileStore) -> Promised[WDLBindings]: standard_library = ToilWDLStdLibBase(file_store) # Get what to scatter over - scatter_value = evaluate_named_expression(self._scatter, self._scatter.variable, None, self._scatter.expr, bindings, standard_library) + with monkeypatch_coerce(standard_library): + scatter_value = evaluate_named_expression(self._scatter, self._scatter.variable, None, self._scatter.expr, bindings, standard_library) if not isinstance(scatter_value, WDL.Value.Array): raise RuntimeError("The returned value from a scatter is not an Array type.") @@ -2153,11 +2184,11 @@ class WDLConditionalJob(WDLSectionJob): """ Job that evaluates a conditional in a WDL workflow. """ - def __init__(self, conditional: WDL.Tree.Conditional, prev_node_results: Sequence[Promised[WDLBindings]], namespace: str, **kwargs: Any) -> None: + def __init__(self, conditional: WDL.Tree.Conditional, prev_node_results: Sequence[Promised[WDLBindings]], namespace: str, execution_dir: Optional[str] = None, **kwargs: Any) -> None: """ Create a subtree that will run a WDL conditional. The conditional itself and its contents live in the given namespace. """ - super().__init__(namespace, **kwargs, unitName=conditional.workflow_node_id, displayName=conditional.workflow_node_id) + super().__init__(namespace, **kwargs, unitName=conditional.workflow_node_id, displayName=conditional.workflow_node_id, execution_dir=execution_dir) # Once again we need to ship the whole body template to be instantiated # into Toil jobs only if it will actually run. @@ -2182,7 +2213,8 @@ def run(self, file_store: AbstractFileStore) -> Promised[WDLBindings]: standard_library = ToilWDLStdLibBase(file_store) # Get the expression value. Fake a name. - expr_value = evaluate_named_expression(self._conditional, "", WDL.Type.Boolean(), self._conditional.expr, bindings, standard_library) + with monkeypatch_coerce(standard_library): + expr_value = evaluate_named_expression(self._conditional, "", WDL.Type.Boolean(), self._conditional.expr, bindings, standard_library) if expr_value.value: # Evaluated to true! @@ -2203,7 +2235,7 @@ class WDLWorkflowJob(WDLSectionJob): Job that evaluates an entire WDL workflow. """ - def __init__(self, workflow: WDL.Tree.Workflow, prev_node_results: Sequence[Promised[WDLBindings]], workflow_id: List[str], namespace: str, **kwargs: Any) -> None: + def __init__(self, workflow: WDL.Tree.Workflow, prev_node_results: Sequence[Promised[WDLBindings]], workflow_id: List[str], namespace: str, execution_dir: Optional[str] = None, **kwargs: Any) -> None: """ Create a subtree that will run a WDL workflow. The job returns the return value of the workflow. @@ -2211,7 +2243,7 @@ def __init__(self, workflow: WDL.Tree.Workflow, prev_node_results: Sequence[Prom :param namespace: the namespace that the workflow's *contents* will be in. Caller has already added the workflow's own name. 
""" - super().__init__(namespace, **kwargs) + super().__init__(namespace, execution_dir, **kwargs) # Because we need to return the return value of the workflow, we need # to return a Toil promise for the last/sink job in the workflow's @@ -2240,19 +2272,20 @@ def run(self, file_store: AbstractFileStore) -> Promised[WDLBindings]: # For a task we only see the insode-the-task namespace. bindings = combine_bindings(unwrap_all(self._prev_node_results)) # Set up the WDL standard library - standard_library = ToilWDLStdLibBase(file_store) + standard_library = ToilWDLStdLibBase(file_store, self._execution_dir) if self._workflow.inputs: - for input_decl in self._workflow.inputs: - # Evaluate all the inputs that aren't pre-set - bindings = bindings.bind(input_decl.name, evaluate_defaultable_decl(input_decl, bindings, standard_library)) + with monkeypatch_coerce(standard_library): + for input_decl in self._workflow.inputs: + # Evaluate all the inputs that aren't pre-set + bindings = bindings.bind(input_decl.name, evaluate_defaultable_decl(input_decl, bindings, standard_library)) # Make jobs to run all the parts of the workflow sink = self.create_subgraph(self._workflow.body, [], bindings) if self._workflow.outputs: # Add evaluating the outputs after the sink - outputs_job = WDLOutputsJob(self._workflow.outputs, sink.rv()) + outputs_job = WDLOutputsJob(self._workflow.outputs, sink.rv(), self._execution_dir) sink.addFollowOn(outputs_job) # Caller is responsible for making sure namespaces are applied self.defer_postprocessing(outputs_job) @@ -2268,11 +2301,11 @@ class WDLOutputsJob(WDLBaseJob): Returns an environment with just the outputs bound, in no namespace. """ - def __init__(self, outputs: List[WDL.Tree.Decl], bindings: Promised[WDLBindings], **kwargs: Any): + def __init__(self, outputs: List[WDL.Tree.Decl], bindings: Promised[WDLBindings], execution_dir: Optional[str] = None, **kwargs: Any): """ Make a new WDLWorkflowOutputsJob for the given workflow, with the given set of bindings after its body runs. """ - super().__init__(**kwargs) + super().__init__(execution_dir, **kwargs) self._outputs = outputs self._bindings = bindings @@ -2284,13 +2317,14 @@ def run(self, file_store: AbstractFileStore) -> WDLBindings: super().run(file_store) # Evaluate all the outputs in the normal, non-task-outputs library context - standard_library = ToilWDLStdLibBase(file_store) + standard_library = ToilWDLStdLibBase(file_store, self._execution_dir) # Combine the bindings from the previous job output_bindings = evaluate_output_decls(self._outputs, unwrap(self._bindings), standard_library) return self.postprocess(output_bindings) + class WDLRootJob(WDLSectionJob): """ Job that evaluates an entire WDL workflow, and returns the workflow outputs @@ -2298,13 +2332,13 @@ class WDLRootJob(WDLSectionJob): the workflow name; both forms are accepted. """ - def __init__(self, workflow: WDL.Tree.Workflow, inputs: WDLBindings, **kwargs: Any) -> None: + def __init__(self, workflow: WDL.Tree.Workflow, inputs: WDLBindings, execution_dir: Optional[str] = None, **kwargs: Any) -> None: """ Create a subtree to run the workflow and namespace the outputs. """ # The root workflow names the root namespace - super().__init__(workflow.name, **kwargs) + super().__init__(workflow.name, execution_dir, **kwargs) self._workflow = workflow self._inputs = inputs @@ -2317,12 +2351,47 @@ def run(self, file_store: AbstractFileStore) -> Promised[WDLBindings]: # Run the workflow. 
We rely on this to handle entering the input
         # namespace if needed, or handling free-floating inputs.
-        workflow_job = WDLWorkflowJob(self._workflow, [self._inputs], [self._workflow.name], self._namespace)
+        workflow_job = WDLWorkflowJob(self._workflow, [self._inputs], [self._workflow.name], self._namespace, self._execution_dir)
         workflow_job.then_namespace(self._namespace)
         self.addChild(workflow_job)
         self.defer_postprocessing(workflow_job)
         return workflow_job.rv()

+@contextmanager
+def monkeypatch_coerce(standard_library: ToilWDLStdLibBase) -> Generator[None, None, None]:
+    """
+    Monkeypatch miniwdl's WDL.Value.Base.coerce() and WDL.Value.String.coerce() methods to virtualize files when they are represented as Strings.
+    Calls _virtualize_filename from a given standard library object.
+    :param standard_library: a standard library object
+    :return: None; the monkeypatch is only active within the context
+    """
+    # We're doing this because while miniwdl recognizes when a string needs to be converted into a file, its method of
+    # conversion is just to store the local filepath. Toil needs to virtualize the file into the job store, so until
+    # there is an internal entrypoint, monkeypatch it.
+    def base_coerce(self: WDL.Value.Base, desired_type: Optional[WDL.Type.Base] = None) -> WDL.Value.Base:
+        if isinstance(desired_type, WDL.Type.File):
+            self.value = standard_library._virtualize_filename(self.value)
+            return self
+        return old_base_coerce(self, desired_type)  # old_base_coerce will recurse back into this monkeypatched coerce
+    def string_coerce(self: WDL.Value.String, desired_type: Optional[WDL.Type.Base] = None) -> WDL.Value.Base:
+        # Sometimes string coerce is called instead, so monkeypatch this one as well
+        if isinstance(desired_type, WDL.Type.File) and not isinstance(self, WDL.Value.File):
+            return WDL.Value.File(standard_library._virtualize_filename(self.value), self.expr)
+        return old_str_coerce(self, desired_type)
+
+    old_base_coerce = WDL.Value.Base.coerce
+    old_str_coerce = WDL.Value.String.coerce
+    try:
+        # Mypy does not like monkeypatching:
+        # https://github.com/python/mypy/issues/2427#issuecomment-1419206807
+        WDL.Value.Base.coerce = base_coerce  # type: ignore[method-assign]
+        WDL.Value.String.coerce = string_coerce  # type: ignore[method-assign]
+        yield
+    finally:
+        WDL.Value.Base.coerce = old_base_coerce  # type: ignore[method-assign]
+        WDL.Value.String.coerce = old_str_coerce  # type: ignore[method-assign]
+
+
 def main() -> None:
     """
     A Toil workflow to interpret WDL input files.
@@ -2411,8 +2480,11 @@ def main() -> None:

     # TODO: Automatically set a good MINIWDL__SINGULARITY__IMAGE_CACHE ?

+    # Get the execution directory
+    execution_dir = os.getcwd()
+
     # Run the workflow and get its outputs namespaced with the workflow name.
-    root_job = WDLRootJob(document.workflow, input_bindings)
+    root_job = WDLRootJob(document.workflow, input_bindings, execution_dir)
     output_bindings = toil.start(root_job)
     if not isinstance(output_bindings, WDL.Env.Bindings):
         raise RuntimeError("The output of the WDL job is not a binding.")
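The swap-and-restore pattern used by monkeypatch_coerce() above is worth
seeing in isolation. Below is a minimal, self-contained sketch of the same
technique; the names (Resolver, monkeypatch_resolve) are hypothetical and not
part of Toil or miniwdl. The idea is to save the original method, install a
wrapper that delegates to it, and restore the original in a ``finally`` block
so the patch cannot leak out of the context, even on error::

    from contextlib import contextmanager
    from typing import Generator

    class Resolver:
        """Stand-in for a library class whose method we need to intercept."""
        def resolve(self, path: str) -> str:
            return path

    @contextmanager
    def monkeypatch_resolve(prefix: str) -> Generator[None, None, None]:
        """Temporarily replace Resolver.resolve, restoring it on exit."""
        old_resolve = Resolver.resolve

        def new_resolve(self: Resolver, path: str) -> str:
            # Delegate to the saved original, so we wrap it rather than clobber it.
            return old_resolve(self, prefix + path)

        try:
            Resolver.resolve = new_resolve  # type: ignore[method-assign]
            yield
        finally:
            Resolver.resolve = old_resolve  # type: ignore[method-assign]

    with monkeypatch_resolve("virtual://"):
        assert Resolver().resolve("a.txt") == "virtual://a.txt"
    assert Resolver().resolve("a.txt") == "a.txt"  # original behavior restored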
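The execution_dir changes in this patch all follow one plumbing pattern: each
job constructor grows an optional ``execution_dir`` parameter, forwards it up
the ``super().__init__()`` chain to the base job, and passes
``self._execution_dir`` along when constructing child jobs, so the directory
chosen once in main() reaches every ToilWDLStdLibBase instance. A minimal
sketch of that pattern, using stand-in classes rather than the real Toil job
types::

    from typing import Any, Optional

    class BaseJob:
        """Stand-in for WDLBaseJob: holds the setting every job needs at run time."""
        def __init__(self, execution_dir: Optional[str] = None, **kwargs: Any) -> None:
            self._execution_dir = execution_dir

    class SectionJob(BaseJob):
        """Stand-in for WDLSectionJob: adds its own argument, forwards the rest."""
        def __init__(self, namespace: str, execution_dir: Optional[str] = None, **kwargs: Any) -> None:
            super().__init__(execution_dir, **kwargs)
            self._namespace = namespace

        def make_child(self) -> BaseJob:
            # Child jobs inherit the same execution directory.
            return BaseJob(self._execution_dir)

    root = SectionJob("wf", execution_dir="/tmp/run")
    assert root.make_child()._execution_dir == "/tmp/run"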