From ff9302cfd1c8672f0ef706c058f609cd00e5a6ed Mon Sep 17 00:00:00 2001 From: Adam Novak Date: Wed, 4 Oct 2023 01:35:49 -0400 Subject: [PATCH 1/4] Separate out integration tests to run on a schedule (#4612) * Reorganize tests and move integration tests to scheduled pipeline runs * Also handle tags --- .gitlab-ci.yml | 108 +++++++++++++++++++++++++++++++------------------ 1 file changed, 69 insertions(+), 39 deletions(-) diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index a3f4bef60f..3a738b74f2 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -63,6 +63,8 @@ stages: lint: + rules: + - if: $CI_PIPELINE_SOURCE != "schedule" stage: linting_and_dependencies script: - pwd @@ -73,6 +75,8 @@ lint: cwl_dependency_is_stand_alone: + rules: + - if: $CI_PIPELINE_SOURCE != "schedule" stage: linting_and_dependencies script: - pwd @@ -81,6 +85,8 @@ cwl_dependency_is_stand_alone: wdl_dependency_is_stand_alone: + rules: + - if: $CI_PIPELINE_SOURCE != "schedule" stage: linting_and_dependencies script: - pwd @@ -88,6 +94,8 @@ wdl_dependency_is_stand_alone: - make test threads="${TEST_THREADS}" marker="${MARKER}" tests=src/toil/test/wdl/toilwdlTest.py::ToilWdlTest::testMD5sum quick_test_offline: + rules: + - if: $CI_PIPELINE_SOURCE != "schedule" stage: basic_tests script: - ${MAIN_PYTHON_PKG} -m virtualenv venv @@ -98,6 +106,8 @@ quick_test_offline: - TOIL_TEST_QUICK=True make test_offline threads="${TEST_THREADS}" py37_appliance_build: + rules: + - if: $CI_PIPELINE_SOURCE == "schedule" || $CI_COMMIT_TAG || $CI_COMMIT_BRANCH == $CI_DEFAULT_BRANCH stage: basic_tests script: - pwd @@ -107,6 +117,8 @@ py37_appliance_build: - make push_docker py38_appliance_build: + rules: + - if: $CI_PIPELINE_SOURCE == "schedule" || $CI_COMMIT_TAG || $CI_COMMIT_BRANCH == $CI_DEFAULT_BRANCH stage: basic_tests script: - pwd @@ -125,6 +137,8 @@ py39_appliance_build: - make push_docker py310_appliance_build: + rules: + - if: $CI_PIPELINE_SOURCE == "schedule" || $CI_COMMIT_TAG || $CI_COMMIT_BRANCH == $CI_DEFAULT_BRANCH stage: basic_tests script: - pwd @@ -134,6 +148,8 @@ py310_appliance_build: - make push_docker py310_main: + rules: + - if: $CI_PIPELINE_SOURCE != "schedule" stage: basic_tests script: - pwd @@ -142,6 +158,8 @@ py310_main: - TOIL_SKIP_DOCKER=true make test threads="${TEST_THREADS}" tests=src/toil/test/lib batch_systems: + rules: + - if: $CI_PIPELINE_SOURCE != "schedule" stage: main_tests script: - pwd @@ -174,7 +192,9 @@ batch_systems: - kill $(jobs -p) || true slurm_test: - stage: main_tests + rules: + - if: $CI_PIPELINE_SOURCE == "schedule" || $CI_COMMIT_TAG + stage: integration script: - pwd - cd contrib/slurm-test/ @@ -182,7 +202,9 @@ slurm_test: - ./slurm_test.sh cwl_v1.2: - stage: main_tests + rules: + - if: $CI_PIPELINE_SOURCE == "schedule" || $CI_COMMIT_TAG + stage: integration script: - pwd - ${MAIN_PYTHON_PKG} -m virtualenv venv && . venv/bin/activate && pip install -U pip wheel && make prepare && make develop extras=[cwl,aws] @@ -190,7 +212,9 @@ cwl_v1.2: - make test threads="${TEST_THREADS}" tests=src/toil/test/cwl/cwlTest.py::CWLv12Test::test_run_conformance_with_in_place_update cwl_on_arm: - stage: main_tests + rules: + - if: $CI_PIPELINE_SOURCE == "schedule" || $CI_COMMIT_TAG + stage: integration script: - pwd - ${MAIN_PYTHON_PKG} -m virtualenv venv && . 
venv/bin/activate && pip install -U pip wheel && make prepare && make develop extras=[cwl,aws] @@ -201,6 +225,8 @@ cwl_on_arm: - make test threads="${TEST_THREADS}" tests=src/toil/test/cwl/cwlTest.py::CWLOnARMTest cwl_misc: + rules: + - if: $CI_PIPELINE_SOURCE != "schedule" || $CI_COMMIT_TAG stage: main_tests script: - pwd @@ -208,36 +234,6 @@ cwl_misc: - python setup_gitlab_docker.py # login to increase the docker.io rate limit - make test threads="${TEST_THREADS}" tests='src/toil/test/cwl/cwlTest.py -k "CWLWorkflowTest or cwl_small"' -cwl_v1.0_kubernetes: - stage: main_tests - only: [] - script: - - pwd - - ${MAIN_PYTHON_PKG} -m virtualenv venv && . venv/bin/activate && pip install -U pip wheel && make prepare && make develop extras=[cwl,aws,kubernetes] - - export TOIL_KUBERNETES_OWNER=toiltest - - export TOIL_AWS_SECRET_NAME=shared-s3-credentials - - export TOIL_KUBERNETES_HOST_PATH=/data/scratch - - export TOIL_WORKDIR=/var/lib/toil - - export SINGULARITY_CACHEDIR=/var/lib/toil/singularity-cache - - if [[ ! -z "${KUBERNETES_DOCKER_HUB_MIRROR}" ]] ; then export SINGULARITY_DOCKER_HUB_MIRROR="${KUBERNETES_DOCKER_HUB_MIRROR}" ; fi - - mkdir -p ${TOIL_WORKDIR} - - make test threads="${TEST_THREADS}" tests="src/toil/test/cwl/cwlTest.py::CWLv10Test::test_kubernetes_cwl_conformance src/toil/test/cwl/cwlTest.py::CWLv10Test::test_kubernetes_cwl_conformance_with_caching" - -cwl_v1.1_kubernetes: - stage: main_tests - only: [] - script: - - pwd - - ${MAIN_PYTHON_PKG} -m virtualenv venv && . venv/bin/activate && pip install -U pip wheel && make prepare && make develop extras=[cwl,aws,kubernetes] - - export TOIL_KUBERNETES_OWNER=toiltest - - export TOIL_AWS_SECRET_NAME=shared-s3-credentials - - export TOIL_KUBERNETES_HOST_PATH=/data/scratch - - export TOIL_WORKDIR=/var/lib/toil - - export SINGULARITY_CACHEDIR=/var/lib/toil/singularity-cache - - if [[ ! -z "${KUBERNETES_DOCKER_HUB_MIRROR}" ]] ; then export SINGULARITY_DOCKER_HUB_MIRROR="${KUBERNETES_DOCKER_HUB_MIRROR}" ; fi - - mkdir -p ${TOIL_WORKDIR} - - make test threads="${TEST_THREADS}" tests="src/toil/test/cwl/cwlTest.py::CWLv11Test::test_kubernetes_cwl_conformance src/toil/test/cwl/cwlTest.py::CWLv11Test::test_kubernetes_cwl_conformance_with_caching" - #cwl_v1.2_kubernetes: # stage: main_tests # script: @@ -260,6 +256,8 @@ cwl_v1.1_kubernetes: # expire_in: 14 days wdl: + rules: + - if: $CI_PIPELINE_SOURCE != "schedule" stage: main_tests script: - pwd @@ -269,16 +267,45 @@ wdl: - which java &> /dev/null || { echo >&2 "Java is not installed. Install java to run these tests."; exit 1; } - make test threads="${TEST_THREADS}" marker="${MARKER}" tests="src/toil/test/wdl/toilwdlTest.py src/toil/test/wdl/builtinTest.py" # needs java (default-jre) to run "GATK.jar" -jobstore_and_provisioning: +jobstore: + rules: + - if: $CI_PIPELINE_SOURCE != "schedule" stage: main_tests script: - pwd - - ${MAIN_PYTHON_PKG} -m virtualenv venv && . venv/bin/activate && pip install -U pip wheel && make prepare && make develop extras=[all] packages='htcondor==10.2.0.post1' - - make test threads="${TEST_THREADS}" marker="${MARKER}" tests="src/toil/test/lib/aws/ src/toil/test/jobStores/jobStoreTest.py src/toil/test/sort/sortTest.py src/toil/test/provisioners/aws/awsProvisionerTest.py src/toil/test/provisioners/clusterScalerTest.py" + - ${MAIN_PYTHON_PKG} -m virtualenv venv && . 
venv/bin/activate && pip install -U pip wheel && make prepare && make develop extras=[all] packages='htcondor==10.2.0.post1' + - make test threads="${TEST_THREADS}" marker="${MARKER}" tests="src/toil/test/jobStores/jobStoreTest.py src/toil/test/sort/sortTest.py" + +provisioner: + rules: + - if: $CI_PIPELINE_SOURCE != "schedule" + stage: main_tests + script: + - pwd + - ${MAIN_PYTHON_PKG} -m virtualenv venv && . venv/bin/activate && pip install -U pip wheel && make prepare && make develop extras=[all] packages='htcondor==10.2.0.post1' + - make test threads="${TEST_THREADS}" marker="${MARKER}" tests="src/toil/test/lib/aws/ src/toil/test/provisioners/aws/awsProvisionerTest.py src/toil/test/provisioners/clusterScalerTest.py" + # https://ucsc-ci.com/databiosphere/toil/-/jobs/38672 # guessing decorators are masking class as function? ^ also, abstract class is run as normal test? should hide. -integration: +jobstore_integration: + rules: + - if: $CI_PIPELINE_SOURCE == "schedule" || $CI_COMMIT_TAG + stage: integration + script: + - pwd + - ${MAIN_PYTHON_PKG} -m virtualenv venv && . venv/bin/activate && pip install -U pip wheel && make prepare && make develop extras=[all] packages='htcondor==10.2.0.post1' + - export TOIL_TEST_INTEGRATIVE=True + - export TOIL_AWS_KEYNAME=id_rsa + - export TOIL_AWS_ZONE=us-west-2a + # This reads GITLAB_SECRET_FILE_SSH_KEYS + - python setup_gitlab_ssh.py + - chmod 400 /root/.ssh/id_rsa + - make test threads="${TEST_THREADS}" tests="src/toil/test/jobStores/jobStoreTest.py" + +server_integration: + rules: + - if: $CI_PIPELINE_SOURCE == "schedule" || $CI_COMMIT_TAG stage: integration script: - pwd @@ -289,11 +316,12 @@ integration: # This reads GITLAB_SECRET_FILE_SSH_KEYS - python setup_gitlab_ssh.py - chmod 400 /root/.ssh/id_rsa - # Test integration with job stores # Test server and its integration with AWS - - make test threads="${TEST_THREADS}" tests="src/toil/test/jobStores/jobStoreTest.py src/toil/test/server" + - make test threads="${TEST_THREADS}" tests="src/toil/test/server" provisioner_integration: + rules: + - if: $CI_PIPELINE_SOURCE == "schedule" || $CI_COMMIT_TAG stage: integration script: - pwd @@ -309,6 +337,8 @@ provisioner_integration: # - make test tests=src/toil/test/provisioners/gceProvisionerTest.py # needs env vars set to run google_jobstore: + rules: + - if: $CI_PIPELINE_SOURCE == "schedule" || $CI_COMMIT_TAG stage: integration script: - pwd From f8f9b41a4dd6476b499bf1b6c1047f575b699f4d Mon Sep 17 00:00:00 2001 From: stxue1 <122345910+stxue1@users.noreply.github.com> Date: Thu, 5 Oct 2023 07:46:45 -0700 Subject: [PATCH 2/4] Add config file support (#4569) * Centralize defaults * Add requirements * Grab logLevel grabbed logLevel used to be the default in Config(), so grab effective logLevel that is set * Satisfy mypy mypy might still complain about missing stubs for configargparser though * Fix wrong default * add config tool * temp fix config sets defaults but so does argparse, runs twice in workflows but deals with tests * Fix create_config for tests instead * Fix setting of config defaults * Go back to previous method, create defaults at init * Fix default cli options set * Centralize, config util, and read properly * Fix type hinting to support 3.9 * mypy * Fix cwl edge case * Fix tests * fix typos, always generate config, fix some tests * Remove subprocess as maybe tests are flaky on CI with it? 
* just run quick_test_offline * make CI print stuff * Harden default config creation against races * Cleanup and argument renaming * Fix bad yaml and toil status bug * Fix mypy * Change behavior of --stats and --clean * Change test behavior as options namespace and config now have the same behavior * Put forgotten line ouch * Batchsystem, requirements, fixes for tests * Mypy conformance * Mypy conformance * Fix retryCount argument and kubernetesPodTimeout type * Only run batchsystem and slurm_test tests on CI * Whoops, this implementation never worked * Add pyyaml to requirements for slurm to pass * Add rest of gitlab CI back and run all tests * Update stub file to be compatible with updated mypy * Fix environment CLI option * Update provisioner test to use configargparse * Code cleanup and add jobstore_as_flag to DefaultArgumentParser etc * Fix toil config test * Add suggestions * Deprecate options, add underscore CLI options only for newly deprecated options * Update docs/argparse help and fix bug with deprecated options also make most generic arg as default for runLocalJobsOnWorkers * Add config file section to docs * Remove upper bound for ruamel requirements * Remove redundancies and improve disableCaching's destination name * Update src/toil/batchSystems/kubernetes.py Co-authored-by: Adam Novak * Remove redundant line in status util * Remove comments in configargparse stub * Workaround to get action=append instead of nargs and get proper backwards compatibility Fix wrong name for link_imports and move_exports, remove new unused functions * Import SYS_MAX_SIZE from common rather than duplicating it * Mypy and syntax errors * Move config options back to the old naming syntax * Change names for link_imports and move_exports to camelCase options * Fix formatting * Bring back old --restart and --clean functionality where they collide and raise an error * Make debug less spammy and remove unused types * Disable kubernetes temporarily * Revert changes to --restart and --clean collision * Typo in tests * Change some comments and add member fields to config * Fix pickling error when jobstate file doesnt exist and fix threading error when lock file exists then disappears (#4575) Co-authored-by: Brandon Walker Co-authored-by: Adam Novak * Reduce the number of assert statements (#4590) * Change all asserts to raising errors for central toil files Co-authored-by: Adam Novak * Fix mypy and update docs to match options in common * Update src/toil/common.py Co-authored-by: Adam Novak --------- Co-authored-by: Adam Novak Co-authored-by: Brandon Walker <43654521+misterbrandonwalker@users.noreply.github.com> Co-authored-by: Brandon Walker --- attic/toil-sort-example.py | 2 +- .../mypy-stubs/configargparse/__init__.pyi | 3 + .../configargparse/configargparse.pyi | 30 + docs/running/cliOptions.rst | 48 +- requirements.txt | 3 + src/toil/batchSystems/awsBatch.py | 12 +- src/toil/batchSystems/kubernetes.py | 56 +- src/toil/batchSystems/local_support.py | 4 +- src/toil/batchSystems/mesos/batchSystem.py | 12 +- src/toil/batchSystems/options.py | 86 +- src/toil/batchSystems/parasol.py | 13 +- src/toil/batchSystems/singleMachine.py | 10 +- src/toil/batchSystems/slurm.py | 2 +- src/toil/batchSystems/tes.py | 23 +- src/toil/common.py | 1143 +++++++++++------ src/toil/cwl/cwltoil.py | 5 +- src/toil/job.py | 37 +- src/toil/jobStores/fileJobStore.py | 4 +- src/toil/leader.py | 15 +- src/toil/lib/conversions.py | 1 + src/toil/server/app.py | 3 +- src/toil/server/cli/wes_cwl_runner.py | 4 +- 
src/toil/statsAndLogging.py | 2 + src/toil/test/cwl/mock_mpi/fake_mpi_run.py | 3 +- .../test/docs/scripts/example_alwaysfail.py | 4 +- .../docs/scripts/example_cachingbenchmark.py | 4 +- src/toil/test/jobStores/jobStoreTest.py | 3 +- src/toil/test/mesos/helloWorld.py | 5 +- src/toil/test/mesos/stress.py | 2 +- .../provisioners/aws/awsProvisionerTest.py | 4 +- src/toil/test/provisioners/restartScript.py | 4 +- src/toil/test/sort/restart_sort.py | 2 +- src/toil/test/sort/sort.py | 2 +- src/toil/test/src/fileStoreTest.py | 10 +- src/toil/test/src/jobDescriptionTest.py | 2 +- src/toil/test/src/resourceTest.py | 6 +- .../utils/ABCWorkflowDebug/debugWorkflow.py | 2 +- .../test/utils/ABCWorkflowDebug/mkFile.py | 4 +- src/toil/test/utils/utilsTest.py | 22 +- src/toil/utils/toilConfig.py | 35 + src/toil/utils/toilMain.py | 1 + src/toil/wdl/toilwdl.py | 4 +- src/toil/wdl/wdltoil.py | 3 +- src/toil/worker.py | 6 +- 44 files changed, 1055 insertions(+), 591 deletions(-) create mode 100644 contrib/mypy-stubs/configargparse/__init__.pyi create mode 100644 contrib/mypy-stubs/configargparse/configargparse.pyi create mode 100644 src/toil/utils/toilConfig.py diff --git a/attic/toil-sort-example.py b/attic/toil-sort-example.py index 4a40a6c202..808cc8adac 100644 --- a/attic/toil-sort-example.py +++ b/attic/toil-sort-example.py @@ -1,6 +1,6 @@ from __future__ import absolute_import from six.moves import xrange -from argparse import ArgumentParser +from configargparse import ArgumentParser import os import logging import random diff --git a/contrib/mypy-stubs/configargparse/__init__.pyi b/contrib/mypy-stubs/configargparse/__init__.pyi new file mode 100644 index 0000000000..373219c0d6 --- /dev/null +++ b/contrib/mypy-stubs/configargparse/__init__.pyi @@ -0,0 +1,3 @@ +from .configargparse import ArgParser as ArgParser +from .configargparse import YAMLConfigFileParser as YAMLConfigFileParser +from .configargparse import ArgumentParser as ArgumentParser diff --git a/contrib/mypy-stubs/configargparse/configargparse.pyi b/contrib/mypy-stubs/configargparse/configargparse.pyi new file mode 100644 index 0000000000..1094e9e30a --- /dev/null +++ b/contrib/mypy-stubs/configargparse/configargparse.pyi @@ -0,0 +1,30 @@ +import argparse +from typing import Sequence, Any, TypeVar, OrderedDict + +__all__ = [ + "ArgumentParser", + "YAMLConfigFileParser", + "ConfigFileParser" +] +_N = TypeVar("_N") + +class ConfigFileParser(object): + def get_syntax_description(self) -> Any: ... + def parse(self, stream: Any) -> Any: ... + def serialize(self, items: OrderedDict[Any, Any]) -> Any: ... + +class YAMLConfigFileParser(ConfigFileParser): + def get_syntax_description(self) -> str: ... + def parse(self, stream: Any) -> OrderedDict[Any, Any]: ... + def serialize(self, items: OrderedDict[Any, Any], default_flow_style: bool = ...) -> Any: ... + +class ArgumentParser(argparse.ArgumentParser): + @property + def _config_file_parser(self) -> Any: ... + + def __init__(self, *args: Any, **kwargs: Any) -> None: ... + # There may be a better way of type hinting this without a type: ignore, but mypy gets unhappy pretty much no matter what as the signatures for parse_args doesn't match with its superclass in argparse + def parse_args(self, args: Sequence[str] | None = None, namespace: Namespace | None = None, config_file_contents: str | None = None, env_vars: Any=None) -> Namespace: ... 
# type: ignore[override] + +Namespace = argparse.Namespace +ArgParser = ArgumentParser \ No newline at end of file diff --git a/docs/running/cliOptions.rst b/docs/running/cliOptions.rst index 4bf2602107..b9e42180a6 100644 --- a/docs/running/cliOptions.rst +++ b/docs/running/cliOptions.rst @@ -7,10 +7,29 @@ Commandline Options A quick way to see all of Toil's commandline options is by executing the following on a toil script:: - $ toil example.py --help + $ python example.py --help For a basic toil workflow, Toil has one mandatory argument, the job store. All other arguments are optional. +The Config File +------------- +Instead of changing the arguments on the CLI, Toil offers support for using a configuration file (Note: Support for the +configuration file and environmental variables require the use of ``configargparse``). + +To generate a default configuration file:: + + $ toil config [file].yaml + +After editing the config file, make Toil take in the new options:: + + $ python example.py --config=[file].yaml + +If CLI options are used in addition with the configuration file, the CLI options will overwrite the configuration file +options:: + + $ python example.py --config=[file].yaml --maxNodes 20 + # maxNodes=[20] even though default maxNodes=[10] + The Job Store ------------- @@ -228,22 +247,19 @@ levels in toil are based on priority from the logging module: **Data Storage Options** Allows configuring Toil's data storage. - --linkImports When using a filesystem based job store, CWL input files - are by default symlinked in. Specifying this option + --symlinkImports BOOL When using a filesystem based job store, CWL input files + are by default symlinked in. Setting this option to True instead copies the files into the job store, which may - protect them from being modified externally. When not - specified and as long as caching is enabled, Toil will + protect them from being modified externally. When set + to False and as long as caching is enabled, Toil will protect the file automatically by changing the permissions - to read-only. - --moveExports When using a filesystem based job store, output files + to read-only. (Default=True) + --moveOutputs BOOL When using a filesystem based job store, output files are by default moved to the output directory, and a symlink to the moved exported file is created at the - initial location. Specifying this option instead copies - the files into the output directory. Applies to - filesystem-based job stores only. - --disableCaching Disables caching in the file store. This flag must be - set to use a batch system that does not support - cleanup, such as Parasol. + initial location. Setting this option to True instead + copies the files into the output directory. Applies to + filesystem-based job stores only. (Default=False) --caching BOOL Set caching options. This must be set to "false" to use a batch system that does not support cleanup, such as Parasol. Set to "true" if caching @@ -280,7 +296,7 @@ autoscaled cluster, as well as parameters to control the level of provisioning. if using auto-scaling. This should be provided as a comma-separated list of the same length as the list of node types. default=0 - --maxNodes MAXNODES Maximum number of nodes of each type in the cluster, + --maxNodes MAXNODES Maximum number of nodes of each type in the cluster, Maximum number of nodes of each type in the cluster, if using autoscaling, provided as a comma-separated list. 
The first value is used as a default if the list length is less than the number of nodeTypes. @@ -363,7 +379,7 @@ from the batch system. Only applicable to jobs that do not specify an explicit value for this requirement. Standard suffixes like K, Ki, M, Mi, G or Gi are supported. Default is - 2.0G + 2.0Gi --defaultCores FLOAT The default number of CPU cores to dedicate a job. Only applicable to jobs that do not specify an explicit value for this requirement. Fractions of a @@ -374,7 +390,7 @@ from the batch system. Only applicable to jobs that do not specify an explicit value for this requirement. Standard suffixes like K, Ki, M, Mi, G or Gi are supported. Default is - 2.0G + 2.0Gi --defaultAccelerators ACCELERATOR The default amount of accelerators to request for a job. Only applicable to jobs that do not specify an diff --git a/requirements.txt b/requirements.txt index 5c58492dd2..a2c6d9e9ad 100644 --- a/requirements.txt +++ b/requirements.txt @@ -10,4 +10,7 @@ PyPubSub >=4.0.3, <5 addict>=2.2.1, <2.5 pytz>=2012 enlighten>=1.5.2, <2 +configargparse>=1.7,<2 +ruamel.yaml>=0.15 +pyyaml>=6,<7 typing-extensions>=4.6.2, <5 diff --git a/src/toil/batchSystems/awsBatch.py b/src/toil/batchSystems/awsBatch.py index cee9ea3a91..cd1a5bb5da 100644 --- a/src/toil/batchSystems/awsBatch.py +++ b/src/toil/batchSystems/awsBatch.py @@ -559,17 +559,17 @@ def killBatchJobs(self, job_ids: List[int]) -> None: @classmethod def add_options(cls, parser: Union[ArgumentParser, _ArgumentGroup]) -> None: - parser.add_argument("--awsBatchRegion", dest="aws_batch_region", default=None, + parser.add_argument("--awsBatchRegion", dest="aws_batch_region", default=None, env_var="TOIL_AWS_REGION", help="The AWS region containing the AWS Batch queue to submit to.") - parser.add_argument("--awsBatchQueue", dest="aws_batch_queue", default=None, + parser.add_argument("--awsBatchQueue", dest="aws_batch_queue", default=None, env_var="TOIL_AWS_BATCH_QUEUE", help="The name or ARN of the AWS Batch queue to submit to.") - parser.add_argument("--awsBatchJobRoleArn", dest="aws_batch_job_role_arn", default=None, + parser.add_argument("--awsBatchJobRoleArn", dest="aws_batch_job_role_arn", default=None, env_var="TOIL_AWS_BATCH_JOB_ROLE_ARN", help=("The ARN of an IAM role to run AWS Batch jobs as, so they " "can e.g. access a job store. Must be assumable by " "ecs-tasks.amazonaws.com.")) @classmethod def setOptions(cls, setOption: OptionSetter) -> None: - setOption("aws_batch_region", default=None) - setOption("aws_batch_queue", default=None, env=["TOIL_AWS_BATCH_QUEUE"]) - setOption("aws_batch_job_role_arn", default=None, env=["TOIL_AWS_BATCH_JOB_ROLE_ARN"]) + setOption("aws_batch_region") + setOption("aws_batch_queue") + setOption("aws_batch_job_role_arn") diff --git a/src/toil/batchSystems/kubernetes.py b/src/toil/batchSystems/kubernetes.py index 2910f0ea91..7f87db8bd2 100644 --- a/src/toil/batchSystems/kubernetes.py +++ b/src/toil/batchSystems/kubernetes.py @@ -152,6 +152,7 @@ def __init__(self, config: Config, maxCores: int, maxMemory: int, maxDisk: int) super().__init__(config, maxCores, maxMemory, maxDisk) # Re-type the config to make sure it has all the fields we need. + # This convinces MyPy we really do have this type. assert isinstance(config, KubernetesBatchSystem.KubernetesConfig) # Turn down log level for Kubernetes modules and dependencies. 
@@ -167,26 +168,26 @@ def __init__(self, config: Config, maxCores: int, maxMemory: int, maxDisk: int) self._apis: KubernetesBatchSystem._ApiStorageDict = {} # Get our namespace (and our Kubernetes credentials to make sure they exist) - self.namespace = self._api('namespace') + self.namespace: str = self._api('namespace') # Decide if we are going to mount a Kubernetes host path as the Toil # work dir in the workers, for shared caching. - self.host_path = config.kubernetes_host_path + self.host_path: Optional[str] = config.kubernetes_host_path # Get the service account name to use, if any. - self.service_account = config.kubernetes_service_account + self.service_account: Optional[str] = config.kubernetes_service_account # Get how long we should wait for a pod that lands on a node to # actually start. - self.pod_timeout = config.kubernetes_pod_timeout + self.pod_timeout: float = config.kubernetes_pod_timeout # Get the username to mark jobs with - username = config.kubernetes_owner + username = config.kubernetes_owner or self.get_default_kubernetes_owner() # And a unique ID for the run self.unique_id = uuid.uuid4() # Create a prefix for jobs, starting with our username - self.job_prefix = f'{username}-toil-{self.unique_id}-' + self.job_prefix: str = f'{username}-toil-{self.unique_id}-' # Instead of letting Kubernetes assign unique job names, we assign our # own based on a numerical job ID. This functionality is managed by the # BatchSystemLocalSupport. @@ -199,17 +200,17 @@ def __init__(self, config: Config, maxCores: int, maxMemory: int, maxDisk: int) # conformance tests. To work around this, we tag all our jobs with an # explicit TTL that is long enough that we're sure we can deal with all # the finished jobs before they expire. - self.finished_job_ttl = 3600 # seconds + self.finished_job_ttl: int = 3600 # seconds # Here is where we will store the user script resource object if we get one. self.user_script: Optional[Resource] = None # Ge the image to deploy from Toil's configuration - self.docker_image = applianceSelf() + self.docker_image: str = applianceSelf() # Try and guess what Toil work dir the workers will use. # We need to be able to provision (possibly shared) space there. - self.worker_work_dir = Toil.getToilWorkDir(config.workDir) + self.worker_work_dir: str = Toil.getToilWorkDir(config.workDir) if (config.workDir is None and os.getenv('TOIL_WORKDIR') is None and self.worker_work_dir == tempfile.gettempdir()): @@ -226,17 +227,17 @@ def __init__(self, config: Config, maxCores: int, maxMemory: int, maxDisk: int) self.environment['TMPDIR'] = '/var/tmp' # Get the name of the AWS secret, if any, to mount in containers. - self.aws_secret_name = os.environ.get("TOIL_AWS_SECRET_NAME", None) + self.aws_secret_name: Optional[str] = os.environ.get("TOIL_AWS_SECRET_NAME", None) # Set this to True to enable the experimental wait-for-job-update code - self.enable_watching = os.environ.get("KUBE_WATCH_ENABLED", False) + self.enable_watching: bool = os.environ.get("KUBE_WATCH_ENABLED", False) # This will be a label to select all our jobs. - self.run_id = f'toil-{self.unique_id}' + self.run_id: str = f'toil-{self.unique_id}' # Keep track of available resources. 
maxMillicores = int(SYS_MAX_SIZE if self.maxCores == SYS_MAX_SIZE else self.maxCores * 1000) - self.resource_sources = [ + self.resource_sources: List[ResourcePool] = [ # A pool representing available job slots ResourcePool(self.config.max_jobs, 'job slots'), # A pool representing available CPU in units of millicores (1 CPU @@ -261,16 +262,16 @@ def __init__(self, config: Config, maxCores: int, maxMemory: int, maxDisk: int) self._killed_queue_jobs: Set[int] = set() # We use this event to signal shutdown - self._shutting_down = Event() + self._shutting_down: Event = Event() # A lock to protect critical regions when working with queued jobs. - self._mutex = RLock() + self._mutex: RLock = RLock() # A condition set to true when there is more work to do. e.g.: new job # in the queue or any resource becomes available. - self._work_available = Condition(lock=self._mutex) + self._work_available: Condition = Condition(lock=self._mutex) - self.schedulingThread = Thread(target=self._scheduler, daemon=True) + self.schedulingThread: Thread = Thread(target=self._scheduler, daemon=True) self.schedulingThread.start() def _pretty_print(self, kubernetes_object: Any) -> str: @@ -1864,24 +1865,25 @@ class KubernetesConfig(Protocol): @classmethod def add_options(cls, parser: Union[ArgumentParser, _ArgumentGroup]) -> None: - parser.add_argument("--kubernetesHostPath", dest="kubernetes_host_path", default=None, + parser.add_argument("--kubernetesHostPath", dest="kubernetes_host_path", default=None, env_var="TOIL_KUBERNETES_HOST_PATH", help="Path on Kubernetes hosts to use as shared inter-pod temp directory. " "(default: %(default)s)") - parser.add_argument("--kubernetesOwner", dest="kubernetes_owner", default=cls.get_default_kubernetes_owner(), - help="Username to mark Kubernetes jobs with. " - "(default: %(default)s)") - parser.add_argument("--kubernetesServiceAccount", dest="kubernetes_service_account", default=None, + parser.add_argument("--kubernetesOwner", dest="kubernetes_owner", default=None, env_var="TOIL_KUBERNETES_OWNER", + help=f"Username to mark Kubernetes jobs with. If the provided value is None, the value will " + f"be generated at runtime. " + f"(Generated default: {cls.get_default_kubernetes_owner()})") + parser.add_argument("--kubernetesServiceAccount", dest="kubernetes_service_account", default=None, env_var="TOIL_KUBERNETES_SERVICE_ACCOUNT", help="Service account to run jobs as. " "(default: %(default)s)") - parser.add_argument("--kubernetesPodTimeout", dest="kubernetes_pod_timeout", default=120, + parser.add_argument("--kubernetesPodTimeout", dest="kubernetes_pod_timeout", default=120, env_var="TOIL_KUBERNETES_POD_TIMEOUT", type=float, help="Seconds to wait for a scheduled Kubernetes pod to start running. 
" "(default: %(default)s)") OptionType = TypeVar('OptionType') @classmethod def setOptions(cls, setOption: OptionSetter) -> None: - setOption("kubernetes_host_path", default=None, env=['TOIL_KUBERNETES_HOST_PATH']) - setOption("kubernetes_owner", default=cls.get_default_kubernetes_owner(), env=['TOIL_KUBERNETES_OWNER']) - setOption("kubernetes_service_account", default=None, env=['TOIL_KUBERNETES_SERVICE_ACCOUNT']) - setOption("kubernetes_pod_timeout", default=120, env=['TOIL_KUBERNETES_POD_TIMEOUT']) + setOption("kubernetes_host_path") + setOption("kubernetes_owner") + setOption("kubernetes_service_account",) + setOption("kubernetes_pod_timeout") diff --git a/src/toil/batchSystems/local_support.py b/src/toil/batchSystems/local_support.py index 5e3e0edb16..a7008cd30d 100644 --- a/src/toil/batchSystems/local_support.py +++ b/src/toil/batchSystems/local_support.py @@ -19,6 +19,7 @@ from toil.batchSystems.singleMachine import SingleMachineBatchSystem from toil.common import Config from toil.job import JobDescription +from toil.lib.threading import cpu_count logger = logging.getLogger(__name__) @@ -28,8 +29,9 @@ class BatchSystemLocalSupport(BatchSystemSupport): def __init__(self, config: Config, maxCores: float, maxMemory: int, maxDisk: int) -> None: super().__init__(config, maxCores, maxMemory, maxDisk) + max_local_jobs = config.max_local_jobs if config.max_local_jobs is not None else cpu_count() self.localBatch: SingleMachineBatchSystem = SingleMachineBatchSystem( - config, maxCores, maxMemory, maxDisk, max_jobs=config.max_local_jobs + config, maxCores, maxMemory, maxDisk, max_jobs=max_local_jobs ) def handleLocalJob(self, jobDesc: JobDescription) -> Optional[int]: diff --git a/src/toil/batchSystems/mesos/batchSystem.py b/src/toil/batchSystems/mesos/batchSystem.py index 30a1761fb6..09f0580fd7 100644 --- a/src/toil/batchSystems/mesos/batchSystem.py +++ b/src/toil/batchSystems/mesos/batchSystem.py @@ -93,7 +93,7 @@ def __init__(self, config, maxCores, maxMemory, maxDisk): self.jobQueues = JobQueue() # Address of the Mesos master in the form host:port where host can be an IP or a hostname - self.mesos_endpoint = config.mesos_endpoint + self.mesos_endpoint = config.mesos_endpoint or self.get_default_mesos_endpoint() if config.mesos_role is not None: self.mesos_role = config.mesos_role self.mesos_name = config.mesos_name @@ -846,8 +846,10 @@ def get_default_mesos_endpoint(cls) -> str: @classmethod def add_options(cls, parser: Union[ArgumentParser, _ArgumentGroup]) -> None: - parser.add_argument("--mesosEndpoint", "--mesosMaster", dest="mesos_endpoint", default=cls.get_default_mesos_endpoint(), - help="The host and port of the Mesos master separated by colon. (default: %(default)s)") + parser.add_argument("--mesosEndpoint", "--mesosMaster", dest="mesos_endpoint", default=None, + help=f"The host and port of the Mesos master separated by colon. If the provided value " + f"is None, the value will be generated at runtime. 
" + f"(Generated default: {cls.get_default_mesos_endpoint})") parser.add_argument("--mesosFrameworkId", dest="mesos_framework_id", help="Use a specific Mesos framework ID.") parser.add_argument("--mesosRole", dest="mesos_role", @@ -857,8 +859,8 @@ def add_options(cls, parser: Union[ArgumentParser, _ArgumentGroup]) -> None: @classmethod def setOptions(cls, setOption: OptionSetter): - setOption("mesos_endpoint", None, None, cls.get_default_mesos_endpoint(), old_names=["mesosMasterAddress"]) - setOption("mesos_name", None, None, "toil") + setOption("mesos_endpoint", old_names=["mesosMasterAddress"]) + setOption("mesos_name") setOption("mesos_role") setOption("mesos_framework_id") diff --git a/src/toil/batchSystems/options.py b/src/toil/batchSystems/options.py index ecf3734da1..f028a64da9 100644 --- a/src/toil/batchSystems/options.py +++ b/src/toil/batchSystems/options.py @@ -12,7 +12,6 @@ # See the License for the specific language governing permissions and import logging -import os import sys from argparse import ArgumentParser, _ArgumentGroup from typing import Any, Callable, List, Optional, TypeVar, Union, cast @@ -70,17 +69,18 @@ def set_batchsystem_options(batch_system: Optional[str], set_option: OptionSette # Ask the batch system to tell us all its options. batch_system_type.setOptions(set_option) # Options shared between multiple batch systems - set_option("disableAutoDeployment", bool, default=False) + set_option("disableAutoDeployment") # Make limits maximum if set to 0 - set_option("max_jobs", lambda x: int(x) or sys.maxsize) - set_option("max_local_jobs", lambda x: int(x) or sys.maxsize) + set_option("max_jobs") + set_option("max_local_jobs") set_option("manualMemArgs") - set_option("run_local_jobs_on_workers", bool, default=False) + set_option("run_local_jobs_on_workers") set_option("statePollingWait") - set_option("batch_logs_dir", env=["TOIL_BATCH_LOGS_DIR"]) + set_option("batch_logs_dir") def add_all_batchsystem_options(parser: Union[ArgumentParser, _ArgumentGroup]) -> None: + from toil.common import SYS_MAX_SIZE # Do the global cross-batch-system arguments parser.add_argument( "--batchSystem", @@ -93,23 +93,23 @@ def add_all_batchsystem_options(parser: Union[ArgumentParser, _ArgumentGroup]) - parser.add_argument( "--disableHotDeployment", dest="disableAutoDeployment", + default=False, action="store_true", - default=None, help="Hot-deployment was renamed to auto-deployment. Option now redirects to " "--disableAutoDeployment. Left in for backwards compatibility.", ) parser.add_argument( "--disableAutoDeployment", dest="disableAutoDeployment", - action="store_true", - default=None, + default=False, help="Should auto-deployment of the user script be deactivated? If True, the user " "script/package should be present at the same location on all workers. Default = False.", ) parser.add_argument( "--maxJobs", dest="max_jobs", - default=sys.maxsize, # This is *basically* unlimited and saves a lot of Optional[] + default=SYS_MAX_SIZE, # This is *basically* unlimited and saves a lot of Optional[] + type=lambda x: int(x) or SYS_MAX_SIZE, help="Specifies the maximum number of jobs to submit to the " "backing scheduler at once. Not supported on Mesos or " "AWS Batch. Use 0 for unlimited. 
Defaults to unlimited.", @@ -117,7 +117,8 @@ def add_all_batchsystem_options(parser: Union[ArgumentParser, _ArgumentGroup]) - parser.add_argument( "--maxLocalJobs", dest="max_local_jobs", - default=cpu_count(), + default=None, + type=lambda x: int(x) or 0, help=f"Specifies the maximum number of housekeeping jobs to " f"run sumultaneously on the local system. Use 0 for " f"unlimited. Defaults to the number of local cores ({cpu_count()}).", @@ -125,18 +126,18 @@ def add_all_batchsystem_options(parser: Union[ArgumentParser, _ArgumentGroup]) - parser.add_argument( "--manualMemArgs", default=False, - action="store_true", dest="manualMemArgs", + action="store_true", help="Do not add the default arguments: 'hv=MEMORY' & 'h_vmem=MEMORY' to the qsub " "call, and instead rely on TOIL_GRIDGENGINE_ARGS to supply alternative arguments. " "Requires that TOIL_GRIDGENGINE_ARGS be set.", ) parser.add_argument( - "--runLocalJobsOnWorkers" + "--runLocalJobsOnWorkers", "--runCwlInternalJobsOnWorkers", dest="run_local_jobs_on_workers", + default=False, action="store_true", - default=None, help="Whether to run jobs marked as local (e.g. CWLScatter) on the worker nodes " "instead of the leader node. If false (default), then all such jobs are run on " "the leader node. Setting this to true can speed up CWL pipelines for very large " @@ -166,6 +167,7 @@ def add_all_batchsystem_options(parser: Union[ArgumentParser, _ArgumentGroup]) - "--batchLogsDir", dest="batch_logs_dir", default=None, + env_var="TOIL_BATCH_LOGS_DIR", help="Directory to tell the backing batch system to log into. Should be available " "on both the leader and the workers, if the backing batch system writes logs " "to the worker machines' filesystems, as many HPC schedulers do. If unset, " @@ -183,58 +185,4 @@ def add_all_batchsystem_options(parser: Union[ArgumentParser, _ArgumentGroup]) - continue # Ask the batch system to create its options in the parser logger.debug('Add options for %s', batch_system_type) - batch_system_type.add_options(parser) - -def set_batchsystem_config_defaults(config) -> None: - """ - Set default and environment-based options for builtin batch systems. This - is required if a Config object is not constructed from an Options object. - """ - - # Do the global options across batch systems - config.batchSystem = "single_machine" - config.disableAutoDeployment = False - config.max_jobs = sys.maxsize - config.max_local_jobs = cpu_count() - config.manualMemArgs = False - config.statePollingWait: Optional[Union[float, int]] = None # Number of seconds to wait before querying job state - - OptionType = TypeVar('OptionType') - def set_option(option_name: str, - parsing_function: Optional[Callable[[Any], OptionType]] = None, - check_function: Optional[Callable[[OptionType], None]] = None, - default: Optional[OptionType] = None, - env: Optional[List[str]] = None, - old_names: Optional[List[str]] = None) -> None: - """ - Function to set a batch-system-defined option to its default value, or - one from the environment. - """ - - # TODO: deduplicate with Config - - option_value = default - - if env is not None: - for env_var in env: - # Try all the environment variables - if option_value != default: - break - option_value = os.environ.get(env_var, default) - - if option_value is not None or not hasattr(config, option_name): - if parsing_function is not None: - # Parse whatever it is (string, argparse-made list, etc.) 
- option_value = parsing_function(option_value) - if check_function is not None: - try: - check_function(option_value) - except AssertionError: - raise RuntimeError(f"The {option_name} option has an invalid value: {option_value}") - setattr(config, option_name, option_value) - - # Set up defaults from all the batch systems - set_batchsystem_options(None, cast(OptionSetter, set_option)) - - - + batch_system_type.add_options(parser) \ No newline at end of file diff --git a/src/toil/batchSystems/parasol.py b/src/toil/batchSystems/parasol.py index 5f7921907f..eff590b79c 100644 --- a/src/toil/batchSystems/parasol.py +++ b/src/toil/batchSystems/parasol.py @@ -22,12 +22,12 @@ from queue import Empty, Queue from shutil import which from threading import Thread -from typing import Dict, Optional, Union +from typing import Dict, Optional, Union, Type, Any from toil.batchSystems.abstractBatchSystem import (BatchSystemSupport, UpdatedBatchJobInfo) from toil.batchSystems.options import OptionSetter -from toil.common import SYS_MAX_SIZE, Toil +from toil.common import SYS_MAX_SIZE, Toil, make_open_interval_action from toil.lib.iterables import concat from toil.test import get_temp_file @@ -365,15 +365,14 @@ def shutdown(self) -> None: @classmethod def add_options(cls, parser: Union[ArgumentParser, _ArgumentGroup]) -> None: - parser.add_argument("--parasolCommand", dest="parasolCommand", default='parasol', + parser.add_argument("--parasol_command", "--parasolCommand", dest="parasolCommand", default='parasol', help="The name or path of the parasol program. Will be looked up on PATH " "unless it starts with a slash. (default: %(default)s).") - parser.add_argument("--parasolMaxBatches", dest="parasolMaxBatches", default=1000, + parser.add_argument("--parasol_max_batches", "--parasolMaxBatches", dest="parasolMaxBatches", default=1000, type=int, action=make_open_interval_action(1), help="Maximum number of job batches the Parasol batch is allowed to create. One batch is " "created for jobs with a a unique set of resource requirements. (default: %(default)s).") @classmethod def setOptions(cls, setOption: OptionSetter): - from toil.common import iC - setOption("parasolCommand", None, None, 'parasol') - setOption("parasolMaxBatches", int, iC(1), 10000) + setOption("parasolCommand") + setOption("parasolMaxBatches") diff --git a/src/toil/batchSystems/singleMachine.py b/src/toil/batchSystems/singleMachine.py index 6fa0fa6638..2fd613bc40 100644 --- a/src/toil/batchSystems/singleMachine.py +++ b/src/toil/batchSystems/singleMachine.py @@ -36,7 +36,7 @@ from toil.bus import ExternalBatchIdMessage from toil.batchSystems.options import OptionSetter -from toil.common import SYS_MAX_SIZE, Config, Toil, fC +from toil.common import SYS_MAX_SIZE, Config, Toil, make_open_interval_action from toil.job import JobDescription, AcceleratorRequirement, accelerator_satisfies, Requirer from toil.lib.accelerators import get_individual_local_accelerators, get_restrictive_environment_for_local_accelerators from toil.lib.threading import cpu_count @@ -104,7 +104,7 @@ def __init__( logger.warning('Not enough cores! User limited to %i but we only have %i.', maxCores, self.numCores) maxCores = self.numCores if maxMemory > self.physicalMemory: - if maxMemory != SYS_MAX_SIZE: + if maxMemory < SYS_MAX_SIZE: # todo: looks like humans2bytes converts SYS_MAX_SIZE to SYS_MAX_SIZE+1 # We have an actually specified limit and not the default logger.warning('Not enough memory! 
User limited to %i bytes but we only have %i bytes.', maxMemory, self.physicalMemory) maxMemory = self.physicalMemory @@ -112,7 +112,7 @@ def __init__( workdir = Toil.getLocalWorkflowDir(config.workflowID, config.workDir) # config.workDir may be None; this sets a real directory self.physicalDisk = toil.physicalDisk(workdir) if maxDisk > self.physicalDisk: - if maxDisk != SYS_MAX_SIZE: + if maxDisk < SYS_MAX_SIZE: # same as maxMemory logger.warning # We have an actually specified limit and not the default logger.warning('Not enough disk space! User limited to %i bytes but we only have %i bytes.', maxDisk, self.physicalDisk) maxDisk = self.physicalDisk @@ -843,7 +843,7 @@ def getUpdatedBatchJob(self, maxWait: int) -> Optional[UpdatedBatchJobInfo]: @classmethod def add_options(cls, parser: Union[ArgumentParser, _ArgumentGroup]) -> None: - parser.add_argument("--scale", dest="scale", default=1, + parser.add_argument("--scale", dest="scale", type=float, default=1, action=make_open_interval_action(0.0), help="A scaling factor to change the value of all submitted tasks's submitted cores. " "Used in the single_machine batch system. Useful for running workflows on " "smaller machines than they were designed for, by setting a value less than 1. " @@ -851,7 +851,7 @@ def add_options(cls, parser: Union[ArgumentParser, _ArgumentGroup]) -> None: @classmethod def setOptions(cls, setOption: OptionSetter): - setOption("scale", float, fC(0.0), default=1) + setOption("scale") class Info: diff --git a/src/toil/batchSystems/slurm.py b/src/toil/batchSystems/slurm.py index 3656d86119..7f9224050a 100644 --- a/src/toil/batchSystems/slurm.py +++ b/src/toil/batchSystems/slurm.py @@ -387,5 +387,5 @@ def add_options(cls, parser: Union[ArgumentParser, _ArgumentGroup]): OptionType = TypeVar('OptionType') @classmethod def setOptions(cls, setOption: OptionSetter) -> None: - setOption("allocate_mem", bool, default=False) + setOption("allocate_mem") diff --git a/src/toil/batchSystems/tes.py b/src/toil/batchSystems/tes.py index 9cfa75525e..90a155e017 100644 --- a/src/toil/batchSystems/tes.py +++ b/src/toil/batchSystems/tes.py @@ -74,7 +74,8 @@ def get_default_tes_endpoint(cls) -> str: def __init__(self, config: Config, maxCores: float, maxMemory: int, maxDisk: int) -> None: super().__init__(config, maxCores, maxMemory, maxDisk) # Connect to TES, using Funnel-compatible environment variables to fill in credentials if not specified. - self.tes = tes.HTTPClient(config.tes_endpoint, + tes_endpoint = config.tes_endpoint or self.get_default_tes_endpoint() + self.tes = tes.HTTPClient(tes_endpoint, user=config.tes_user, password=config.tes_password, token=config.tes_bearer_token) @@ -439,13 +440,15 @@ def killBatchJobs(self, job_ids: List[int]) -> None: @classmethod def add_options(cls, parser: Union[ArgumentParser, _ArgumentGroup]) -> None: - parser.add_argument("--tesEndpoint", dest="tes_endpoint", default=cls.get_default_tes_endpoint(), - help="The http(s) URL of the TES server. (default: %(default)s)") - parser.add_argument("--tesUser", dest="tes_user", default=None, + parser.add_argument("--tesEndpoint", dest="tes_endpoint", default=None, env_var="TOIL_TES_ENDPOINT", + help=f"The http(s) URL of the TES server. If the provided value is None, the value will be " + f"generated at runtime. 
" + f"(Generated default: {cls.get_default_tes_endpoint()})") + parser.add_argument("--tesUser", dest="tes_user", default=None, env_var="TOIL_TES_USER", help="User name to use for basic authentication to TES server.") - parser.add_argument("--tesPassword", dest="tes_password", default=None, + parser.add_argument("--tesPassword", dest="tes_password", default=None, env_var="TOIL_TES_PASSWORD", help="Password to use for basic authentication to TES server.") - parser.add_argument("--tesBearerToken", dest="tes_bearer_token", default=None, + parser.add_argument("--tesBearerToken", dest="tes_bearer_token", default=None, env_var="TOIL_TES_BEARER_TOKEN", help="Bearer token to use for authentication to TES server.") @classmethod @@ -453,7 +456,7 @@ def setOptions(cls, setOption: OptionSetter) -> None: # Because we use the keyword arguments, we can't specify a type for setOption without using Protocols. # TODO: start using Protocols, or just start returning objects to represent the options. # When actually parsing options, remember to check the environment variables - setOption("tes_endpoint", default=cls.get_default_tes_endpoint(), env=["TOIL_TES_ENDPOINT"]) - setOption("tes_user", default=None, env=["TOIL_TES_USER"]) - setOption("tes_password", default=None, env=["TOIL_TES_PASSWORD"]) - setOption("tes_bearer_token", default=None, env=["TOIL_TES_BEARER_TOKEN"]) + setOption("tes_endpoint") + setOption("tes_user") + setOption("tes_password") + setOption("tes_bearer_token") diff --git a/src/toil/common.py b/src/toil/common.py index bbd823a321..eb7053fe18 100644 --- a/src/toil/common.py +++ b/src/toil/common.py @@ -23,10 +23,15 @@ import time import uuid import warnings -from argparse import (ArgumentDefaultsHelpFormatter, + +from ruamel.yaml import YAML +from ruamel.yaml.comments import CommentedMap +from configargparse import ArgParser, YAMLConfigFileParser +from argparse import (SUPPRESS, + ArgumentDefaultsHelpFormatter, ArgumentParser, Namespace, - _ArgumentGroup) + _ArgumentGroup, Action, _StoreFalseAction, _StoreTrueAction, _AppendAction) from distutils.util import strtobool from functools import lru_cache from types import TracebackType @@ -37,7 +42,6 @@ ContextManager, Dict, List, - MutableMapping, Optional, Set, Tuple, @@ -57,7 +61,6 @@ from toil import logProcessContext, lookupEnvVar from toil.batchSystems.options import (add_all_batchsystem_options, - set_batchsystem_config_defaults, set_batchsystem_options) from toil.bus import (ClusterDesiredSizeMessage, ClusterSizeMessage, @@ -66,20 +69,18 @@ JobIssuedMessage, JobMissingMessage, MessageBus, - QueueSizeMessage, - gen_message_bus_path) + QueueSizeMessage) from toil.fileStores import FileID from toil.lib.aws import zone_to_region, build_tag_dict_from_env from toil.lib.compatibility import deprecated from toil.lib.conversions import bytes2human, human2bytes -from toil.lib.io import try_path +from toil.lib.io import try_path, AtomicFileCreate from toil.lib.retry import retry from toil.provisioners import (add_provisioner_options, cluster_factory, parse_node_types) from toil.realtimeLogger import RealtimeLogger from toil.statsAndLogging import (add_logging_options, - root_logger, set_logging_from_options) from toil.version import dockerRegistry, dockerTag, version @@ -102,6 +103,39 @@ UUID_LENGTH = 32 logger = logging.getLogger(__name__) +# TODO: should this use an XDG config directory or ~/.config to not clutter the +# base home directory? 
+TOIL_HOME_DIR: str = os.path.join(os.path.expanduser("~"), ".toil") +DEFAULT_CONFIG_FILE: str = os.path.join(TOIL_HOME_DIR, "default.yaml") + + +def parse_jobstore(jobstore_uri: str) -> str: + """ + Turn the jobstore string into it's corresponding URI + ex: + /path/to/jobstore -> file:/path/to/jobstore + + If the jobstore string already is a URI, return the jobstore: + aws:/path/to/jobstore -> aws:/path/to/jobstore + :param jobstore_uri: string of the jobstore + :return: URI of the jobstore + """ + name, rest = Toil.parseLocator(jobstore_uri) + if name == 'file': + # We need to resolve relative paths early, on the leader, because the worker process + # may have a different working directory than the leader, e.g. under Mesos. + return Toil.buildLocator(name, os.path.abspath(rest)) + else: + return jobstore_uri + + +def parse_str_list(s: str) -> List[str]: + return [str(x) for x in s.split(",")] + + +def parse_int_list(s: str) -> List[int]: + return [int(x) for x in s.split(",")] + class Config: """Class to represent configuration operations for a toil workflow run.""" @@ -110,118 +144,185 @@ class Config: cleanWorkDir: str max_jobs: int max_local_jobs: int + manualMemArgs: bool run_local_jobs_on_workers: bool + coalesceStatusCalls: bool + parasolCommand: str + parasolMaxBatches: int + mesos_endpoint: Optional[str] + mesos_framework_id: Optional[str] + mesos_role: Optional[str] + mesos_name: str + kubernetes_host_path: Optional[str] + kubernetes_owner: Optional[str] + kubernetes_service_account: Optional[str] + kubernetes_pod_timeout: float tes_endpoint: str tes_user: str tes_password: str tes_bearer_token: str - jobStore: str + aws_batch_region: Optional[str] + aws_batch_queue: Optional[str] + aws_batch_job_role_arn: Optional[str] + scale: float batchSystem: str - batch_logs_dir: Optional[str] = None + batch_logs_dir: Optional[str] """The backing scheduler will be instructed, if possible, to save logs to this directory, where the leader can read them.""" - workflowAttemptNumber: int + statePollingWait: int disableAutoDeployment: bool - def __init__(self) -> None: - # Core options - self.workflowID: Optional[str] = None - """This attribute uniquely identifies the job store and therefore the workflow. It is - necessary in order to distinguish between two consecutive workflows for which - self.jobStore is the same, e.g. when a job store name is reused after a previous run has - finished successfully and its job store has been clean up.""" - self.workflowAttemptNumber: int = 0 - self.jobStore: Optional[str] = None # type: ignore - self.logLevel: str = logging.getLevelName(root_logger.getEffectiveLevel()) - self.workDir: Optional[str] = None - self.coordination_dir: Optional[str] = None - self.noStdOutErr: bool = False - self.stats: bool = False - - # Because the stats option needs the jobStore to persist past the end of the run, - # the clean default value depends the specified stats option and is determined in setOptions - self.clean: Optional[str] = None - self.clusterStats = None - - # Restarting the workflow options - self.restart: bool = False + # Core options + workflowID: Optional[str] + """This attribute uniquely identifies the job store and therefore the workflow. It is + necessary in order to distinguish between two consecutive workflows for which + self.jobStore is the same, e.g. 
when a job store name is reused after a previous run has + finished successfully and its job store has been clean up.""" + workflowAttemptNumber: int + jobStore: str + logLevel: str + workDir: Optional[str] + coordination_dir: Optional[str] + noStdOutErr: bool + stats: bool - # Batch system options - set_batchsystem_config_defaults(self) + # Because the stats option needs the jobStore to persist past the end of the run, + # the clean default value depends the specified stats option and is determined in setOptions + clean: Optional[str] + clusterStats: str - # File store options - self.caching: Optional[bool] = None - self.linkImports: bool = True - self.moveExports: bool = False + # Restarting the workflow options + restart: bool - # Autoscaling options - self.provisioner: Optional[str] = None - self.nodeTypes: List[Tuple[Set[str], Optional[float]]] = [] - self.minNodes = None - self.maxNodes = [10] - self.targetTime: float = defaultTargetTime - self.betaInertia: float = 0.1 - self.scaleInterval: int = 60 - self.preemptibleCompensation: float = 0.0 - self.nodeStorage: int = 50 - self.nodeStorageOverrides: List[str] = [] - self.metrics: bool = False - self.assume_zero_overhead: bool = False - - # Parameters to limit service jobs, so preventing deadlock scheduling scenarios - self.maxPreemptibleServiceJobs: int = sys.maxsize - self.maxServiceJobs: int = sys.maxsize - self.deadlockWait: Union[float, int] = 60 # Number of seconds we must be stuck with all services before declaring a deadlock - self.deadlockCheckInterval: Union[float, int] = 30 # Minimum polling delay for deadlocks - - # Resource requirements - self.defaultMemory: int = 2147483648 - self.defaultCores: Union[float, int] = 1 - self.defaultDisk: int = 2147483648 - self.defaultPreemptible: bool = False - # TODO: These names are generated programmatically in - # Requirer._fetchRequirement so we can't use snake_case until we fix - # that (and add compatibility getters/setters?) - self.defaultAccelerators: List['AcceleratorRequirement'] = [] - self.maxCores: int = SYS_MAX_SIZE - self.maxMemory: int = SYS_MAX_SIZE - self.maxDisk: int = SYS_MAX_SIZE + # Batch system options - # Retrying/rescuing jobs - self.retryCount: int = 1 - self.enableUnlimitedPreemptibleRetries: bool = False - self.doubleMem: bool = False - self.maxJobDuration: int = sys.maxsize - self.rescueJobsFrequency: int = 60 + # File store options + caching: Optional[bool] + symlinkImports: bool + moveOutputs: bool + + # Autoscaling options + provisioner: Optional[str] + nodeTypes: List[Tuple[Set[str], Optional[float]]] + minNodes: List[int] + maxNodes: List[int] + targetTime: float + betaInertia: float + scaleInterval: int + preemptibleCompensation: float + nodeStorage: int + nodeStorageOverrides: List[str] + metrics: bool + assume_zero_overhead: bool + + # Parameters to limit service jobs, so preventing deadlock scheduling scenarios + maxPreemptibleServiceJobs: int + maxServiceJobs: int + deadlockWait: Union[ + float, int] + deadlockCheckInterval: Union[float, int] - # Log management - self.maxLogFileSize: int = 64000 - self.writeLogs = None - self.writeLogsGzip = None - self.writeLogsFromAllJobs: bool = False - self.write_messages: Optional[str] = None + # Resource requirements + defaultMemory: int + defaultCores: Union[float, int] + defaultDisk: int + defaultPreemptible: bool + # TODO: These names are generated programmatically in + # Requirer._fetchRequirement so we can't use snake_case until we fix + # that (and add compatibility getters/setters?) 
+ defaultAccelerators: List['AcceleratorRequirement'] + maxCores: int + maxMemory: int + maxDisk: int - # Misc - self.environment: Dict[str, str] = {} - self.disableChaining: bool = False - self.disableJobStoreChecksumVerification: bool = False - self.sseKey: Optional[str] = None - self.servicePollingInterval: int = 60 - self.useAsync: bool = True - self.forceDockerAppliance: bool = False - self.statusWait: int = 3600 - self.disableProgress: bool = False - self.readGlobalFileMutableByDefault: bool = False - self.kill_polling_interval: int = 5 + # Retrying/rescuing jobs + retryCount: int + enableUnlimitedPreemptibleRetries: bool + doubleMem: bool + maxJobDuration: int + rescueJobsFrequency: int + + # Log management + maxLogFileSize: int + writeLogs: str + writeLogsGzip: str + writeLogsFromAllJobs: bool + write_messages: Optional[str] + realTimeLogging: bool + + # Misc + environment: Dict[str, str] + disableChaining: bool + disableJobStoreChecksumVerification: bool + sseKey: Optional[str] + servicePollingInterval: int + useAsync: bool + forceDockerAppliance: bool + statusWait: int + disableProgress: bool + readGlobalFileMutableByDefault: bool - # Debug options - self.debugWorker: bool = False - self.disableWorkerOutputCapture: bool = False - self.badWorker = 0.0 - self.badWorkerFailInterval = 0.01 + # Debug options + debugWorker: bool + disableWorkerOutputCapture: bool + badWorker: float + badWorkerFailInterval: float + kill_polling_interval: int - # CWL - self.cwl: bool = False + # CWL + cwl: bool + + def __init__(self) -> None: + # only default options that are not CLI options defined here (thus CLI options are centralized) + self.cwl = False # will probably remove later + self.workflowID = None + self.kill_polling_interval = 5 + + self.set_from_default_config() + + def set_from_default_config(self) -> None: + # get defaults from a config file by simulating an argparse run + # as Config often expects defaults to already be instantiated + if not os.path.exists(DEFAULT_CONFIG_FILE): + # The default config file did not appear to exist when we checked. + # It might exist now, though. Try creating it. + self.generate_config_file() + # Check on the config file to make sure it is sensible + config_status = os.stat(DEFAULT_CONFIG_FILE) + if config_status.st_size == 0: + # If we have an empty config file, someone has to manually delete + # it before we will work again. + raise RuntimeError( + f"Config file {DEFAULT_CONFIG_FILE} exists but is empty. Delete it! Stat says: {config_status}") + logger.debug("Loading %s byte default configuration", config_status.st_size) + try: + with open(DEFAULT_CONFIG_FILE, "r") as f: + yaml = YAML(typ="safe") + s = yaml.load(f) + logger.debug("Loaded default configuration: %s", json.dumps(s)) + except: + # Something went wrong reading the default config, so dump its + # contents to the log. + logger.info("Configuration file contents: %s", open(DEFAULT_CONFIG_FILE, 'r').read()) + raise + parser = ArgParser() + addOptions(parser, jobstore_as_flag=True) + ns = parser.parse_args(f"--config={DEFAULT_CONFIG_FILE}") + self.setOptions(ns) + + def generate_config_file(self) -> None: + """ + If the default config file does not exist, create it. + + Raises an error if the default config file cannot be created. + Safe to run simultaneously in multiple processes. If this process runs + this function, it will always see the default config file existing with + parseable contents, even if other processes are racing to create it. 
+ + No process will see an empty or partially-written default config file. + """ + check_and_create_toil_home_dir() + generate_config(DEFAULT_CONFIG_FILE) def prepare_start(self) -> None: """ @@ -240,16 +341,10 @@ def prepare_restart(self) -> None: # exist and that can't safely be re-made. self.write_messages = None - def setOptions(self, options: Namespace) -> None: """Creates a config object from the options object.""" - OptionType = TypeVar("OptionType") def set_option(option_name: str, - parsing_function: Optional[Callable[[Any], OptionType]] = None, - check_function: Optional[Callable[[OptionType], Union[None, bool]]] = None, - default: Optional[OptionType] = None, - env: Optional[List[str]] = None, old_names: Optional[List[str]] = None) -> None: """ Determine the correct value for the given option. @@ -269,194 +364,132 @@ def set_option(option_name: str, If the option gets a non-None value, sets it as an attribute in this Config. """ - option_value = getattr(options, option_name, default) + option_value = getattr(options, option_name, None) if old_names is not None: for old_name in old_names: + # If the option is already set with the new name and not the old name + # prioritize the new name over the old name and break + if option_value is not None and option_value != [] and option_value != {}: + break # Try all the old names in case user code is setting them # in an options object. - if option_value != default: - break - if hasattr(options, old_name): + # This does assume that all deprecated options have a default value of None + if getattr(options, old_name, None) is not None: warnings.warn(f'Using deprecated option field {old_name} to ' f'provide value for config field {option_name}', DeprecationWarning) option_value = getattr(options, old_name) - - if env is not None: - for env_var in env: - # Try all the environment variables - if option_value != default: - break - option_value = os.environ.get(env_var, default) - if option_value is not None or not hasattr(self, option_name): - if parsing_function is not None: - # Parse whatever it is (string, argparse-made list, etc.) - option_value = parsing_function(option_value) - if check_function is not None: - try: - check_function(option_value) # type: ignore - except AssertionError: - raise RuntimeError(f"The {option_name} option has an invalid value: {option_value}") setattr(self, option_name, option_value) - # Function to parse integer from string expressed in different formats - h2b = lambda x: human2bytes(str(x)) - - def parse_jobstore(jobstore_uri: str) -> str: - name, rest = Toil.parseLocator(jobstore_uri) - if name == 'file': - # We need to resolve relative paths early, on the leader, because the worker process - # may have a different working directory than the leader, e.g. under Mesos. - return Toil.buildLocator(name, os.path.abspath(rest)) - else: - return jobstore_uri - - def parse_str_list(s: str) -> List[str]: - return [str(x) for x in s.split(",")] - - def parse_int_list(s: str) -> List[int]: - return [int(x) for x in s.split(",")] - # Core options - set_option("jobStore", parsing_function=parse_jobstore) + set_option("jobStore") # TODO: LOG LEVEL STRING set_option("workDir") - if self.workDir is not None: - self.workDir = os.path.abspath(self.workDir) - if not os.path.exists(self.workDir): - raise RuntimeError(f"The path provided to --workDir ({self.workDir}) does not exist.") - - if len(self.workDir) > 80: - logger.warning(f'Length of workDir path "{self.workDir}" is {len(self.workDir)} characters. 
' - f'Consider setting a shorter path with --workPath or setting TMPDIR to something ' - f'like "/tmp" to avoid overly long paths.') set_option("coordination_dir") - if self.coordination_dir is not None: - self.coordination_dir = os.path.abspath(self.coordination_dir) - if not os.path.exists(self.coordination_dir): - raise RuntimeError(f"The path provided to --coordinationDir ({self.coordination_dir}) does not exist.") set_option("noStdOutErr") set_option("stats") set_option("cleanWorkDir") set_option("clean") - if self.stats: - if self.clean != "never" and self.clean is not None: - raise RuntimeError("Contradicting options passed: Clean flag is set to %s " - "despite the stats flag requiring " - "the jobStore to be intact at the end of the run. " - "Set clean to \'never\'" % self.clean) - self.clean = "never" - elif self.clean is None: - self.clean = "onSuccess" set_option('clusterStats') set_option("restart") # Batch system options set_option("batchSystem") - set_batchsystem_options(self.batchSystem, cast("OptionSetter", set_option)) + set_batchsystem_options(None, cast("OptionSetter", + set_option)) # None as that will make set_batchsystem_options iterate through all batch systems and set their corresponding values # File store options - set_option("linkImports", bool, default=True) - set_option("moveExports", bool, default=False) - set_option("caching", bool, default=None) + set_option("symlinkImports", old_names=["linkImports"]) + set_option("moveOutputs", old_names=["moveExports"]) + set_option("caching", old_names=["enableCaching"]) # Autoscaling options set_option("provisioner") - set_option("nodeTypes", parse_node_types) - set_option("minNodes", parse_int_list) - set_option("maxNodes", parse_int_list) - set_option("targetTime", int) - if self.targetTime <= 0: - raise RuntimeError(f'targetTime ({self.targetTime}) must be a positive integer!') - set_option("betaInertia", float) - if not 0.0 <= self.betaInertia <= 0.9: - raise RuntimeError(f'betaInertia ({self.betaInertia}) must be between 0.0 and 0.9!') - set_option("scaleInterval", float) + set_option("nodeTypes") + set_option("minNodes") + set_option("maxNodes") + set_option("targetTime") + set_option("betaInertia") + set_option("scaleInterval") set_option("metrics") set_option("assume_zero_overhead") - set_option("preemptibleCompensation", float) - if not 0.0 <= self.preemptibleCompensation <= 1.0: - raise RuntimeError(f'preemptibleCompensation ({self.preemptibleCompensation}) must be between 0.0 and 1.0!') - set_option("nodeStorage", int) - - def check_nodestoreage_overrides(overrides: List[str]) -> bool: - for override in overrides: - tokens = override.split(":") - if len(tokens) != 2: - raise ValueError("Each component of --nodeStorageOverrides must be of the form :") - if not any(tokens[0] in n[0] for n in self.nodeTypes): - raise ValueError("Instance type in --nodeStorageOverrides must be in --nodeTypes") - if not tokens[1].isdigit(): - raise ValueError("storage must be an integer in --nodeStorageOverrides") - return True - set_option("nodeStorageOverrides", parse_str_list, check_function=check_nodestoreage_overrides) - - # Parameters to limit service jobs / detect deadlocks - set_option("maxServiceJobs", int) - set_option("maxPreemptibleServiceJobs", int) - set_option("deadlockWait", int) - set_option("deadlockCheckInterval", int) - - # Resource requirements - set_option("defaultMemory", h2b, iC(1)) - set_option("defaultCores", float, fC(1.0)) - set_option("defaultDisk", h2b, iC(1)) - set_option("defaultAccelerators", 
parse_accelerator_list) - set_option("readGlobalFileMutableByDefault") - set_option("maxCores", int, iC(1)) - set_option("maxMemory", h2b, iC(1)) - set_option("maxDisk", h2b, iC(1)) + set_option("preemptibleCompensation") + set_option("nodeStorage") + + set_option("nodeStorageOverrides") + + if self.cwl is False: + # Parameters to limit service jobs / detect deadlocks + set_option("maxServiceJobs") + set_option("maxPreemptibleServiceJobs") + set_option("deadlockWait") + set_option("deadlockCheckInterval") + + set_option("defaultMemory") + set_option("defaultCores") + set_option("defaultDisk") + set_option("defaultAccelerators") + set_option("maxCores") + set_option("maxMemory") + set_option("maxDisk") set_option("defaultPreemptible") # Retrying/rescuing jobs - set_option("retryCount", int, iC(1)) + set_option("retryCount") set_option("enableUnlimitedPreemptibleRetries") set_option("doubleMem") - set_option("maxJobDuration", int, iC(1)) - set_option("rescueJobsFrequency", int, iC(1)) + set_option("maxJobDuration") + set_option("rescueJobsFrequency") # Log management - set_option("maxLogFileSize", h2b, iC(1)) + set_option("maxLogFileSize") set_option("writeLogs") set_option("writeLogsGzip") set_option("writeLogsFromAllJobs") - set_option("write_messages", os.path.abspath) - - if not self.write_messages: - # The user hasn't specified a place for the message bus so we - # should make one. - self.write_messages = gen_message_bus_path() - - if self.writeLogs and self.writeLogsGzip: - raise RuntimeError("Cannot use both --writeLogs and --writeLogsGzip at the same time.") - - if self.writeLogsFromAllJobs and not self.writeLogs and not self.writeLogsGzip: - raise RuntimeError("To enable --writeLogsFromAllJobs, either --writeLogs or --writeLogsGzip must be set.") + set_option("write_messages") # Misc - set_option("environment", parseSetEnv) + set_option("environment") + set_option("disableChaining") set_option("disableJobStoreChecksumVerification") - set_option("statusWait", int) + set_option("statusWait") set_option("disableProgress") - def check_sse_key(sse_key: str) -> None: - with open(sse_key) as f: - if len(f.readline().rstrip()) != 32: - raise RuntimeError("SSE key appears to be invalid.") - - set_option("sseKey", check_function=check_sse_key) - set_option("servicePollingInterval", float, fC(0.0)) + set_option("sseKey") + set_option("servicePollingInterval") set_option("forceDockerAppliance") # Debug options set_option("debugWorker") set_option("disableWorkerOutputCapture") - set_option("badWorker", float, fC(0.0, 1.0)) - set_option("badWorkerFailInterval", float, fC(0.0)) + set_option("badWorker") + set_option("badWorkerFailInterval") + set_option("logLevel") + + self.check_configuration_consistency() + + def check_configuration_consistency(self) -> None: + """Old checks that cannot be fit into an action class for argparse""" + if self.writeLogs and self.writeLogsGzip: + raise ValueError("Cannot use both --writeLogs and --writeLogsGzip at the same time.") + if self.writeLogsFromAllJobs and not self.writeLogs and not self.writeLogsGzip: + raise ValueError("To enable --writeLogsFromAllJobs, either --writeLogs or --writeLogsGzip must be set.") + for override in self.nodeStorageOverrides: + tokens = override.split(":") + if not any(tokens[0] in n[0] for n in self.nodeTypes): + raise ValueError("Instance type in --nodeStorageOverrides must be in --nodeTypes") + + if self.stats: + if self.clean != "never" and self.clean is not None: + logger.warning("Contradicting options passed: Clean flag is set 
to %s " + "despite the stats flag requiring " + "the jobStore to be intact at the end of the run. " + "Setting clean to \'never\'." % self.clean) + self.clean = "never" def __eq__(self, other: object) -> bool: return self.__dict__ == other.__dict__ @@ -465,6 +498,92 @@ def __hash__(self) -> int: return self.__dict__.__hash__() # type: ignore +def check_and_create_toil_home_dir() -> None: + """ + Ensure that TOIL_HOME_DIR exists. + + Raises an error if it does not exist and cannot be created. Safe to run + simultaneously in multiple processes. + """ + + dir_path = try_path(TOIL_HOME_DIR) + if dir_path is None: + raise RuntimeError(f"Cannot create or access Toil configuration directory {TOIL_HOME_DIR}") + + +def generate_config(filepath: str) -> None: + """ + Write a Toil config file to the given path. + + Safe to run simultaneously in multiple processes. No process will see an + empty or partially-written file at the given path. + """ + # this is placed in common.py rather than toilConfig.py to prevent circular imports + + # configargparse's write_config function does not write options with a None value + # Thus, certain CLI options that use None as their default won't be written to the config file. + # it also does not support printing config elements in nonalphabetical order + + # Instead, mimic configargparser's write_config behavior and also make it output arguments with + # a default value of None + + # To do this, iterate through the options + # Skip --help and --config as they should not be included in the config file + # Skip deprecated/redundant options + # Various log options are skipped as they are store_const arguments that are redundant to --logLevel + # linkImports, moveExports, disableCaching, are deprecated in favor of --symlinkImports, --moveOutputs, + # and --caching respectively + # Skip StoreTrue and StoreFalse options that have opposite defaults as including it in the config would + # override those defaults + deprecated_or_redundant_options = ("help", "config", "logCritical", "logDebug", "logError", "logInfo", "logOff", + "logWarning", "linkImports", "noLinkImports", "moveExports", "noMoveExports", + "enableCaching", "disableCaching") + + parser = ArgParser(YAMLConfigFileParser()) + addOptions(parser, jobstore_as_flag=True) + + data = CommentedMap() # to preserve order + group_title_key: Dict[str, str] = dict() + for action in parser._actions: + if any(s.replace("-", "") in deprecated_or_redundant_options for s in action.option_strings): + continue + # if action is StoreFalse and default is True then don't include + if isinstance(action, _StoreFalseAction) and action.default is True: + continue + # if action is StoreTrue and default is False then don't include + if isinstance(action, _StoreTrueAction) and action.default is False: + continue + + option_string = action.option_strings[0] if action.option_strings[0].find("--") != -1 else \ + action.option_strings[1] + option = option_string[2:] + + default = action.default + + data[option] = default + + # store where each argparse group starts + group_title = action.container.title # type: ignore[attr-defined] + group_title_key.setdefault(group_title, option) + + # add comment for when each argparse group starts + for group_title, key in group_title_key.items(): + data.yaml_set_comment_before_after_key(key, group_title) + + # Now we need to put the config file in place at filepath. 
+ # But someone else may have already created a file at that path, or may be + # about to open the file at that path and read it before we can finish + # writing the contents. So we write the config file at a temporary path and + # atomically move it over. There's still a race to see which process's + # config file actually is left at the name in the end, but nobody will ever + # see an empty or partially-written file at that name (if there wasn't one + # there to begin with). + with AtomicFileCreate(filepath) as temp_path: + with open(temp_path, "w") as f: + yaml = YAML() + yaml.dump(data, f) + + JOBSTORE_HELP = ("The location of the job store for the workflow. " "A job store holds persistent information about the jobs, stats, and files in a " "workflow. If the workflow is run with a distributed batch system, the job " @@ -481,9 +600,9 @@ def __hash__(self) -> int: def parser_with_common_options( - provisioner_options: bool = False, jobstore_option: bool = True -) -> ArgumentParser: - parser = ArgumentParser(prog="Toil", formatter_class=ArgumentDefaultsHelpFormatter) + provisioner_options: bool = False, jobstore_option: bool = True +) -> ArgParser: + parser = ArgParser(prog="Toil", formatter_class=ArgumentDefaultsHelpFormatter) if provisioner_options: add_provisioner_options(parser) @@ -500,20 +619,67 @@ def parser_with_common_options( return parser -def addOptions(parser: ArgumentParser, config: Optional[Config] = None, jobstore_as_flag: bool = False) -> None: +# This is kept in the outer scope as multiple batchsystem files use this +def make_open_interval_action(min: Union[int, float], max: Optional[Union[int, float]] = None) -> Type[Action]: + """ + Returns an argparse action class to check if the input is within the given half-open interval. + ex: + Provided value to argparse must be within the interval [min, max) + Types of min and max must be the same (max may be None) + + :param min: float/int + :param max: optional float/int + :return: argparse action class + """ + + class IntOrFloatOpenAction(Action): + def __call__(self, parser: Any, namespace: Any, values: Any, option_string: Any = None) -> None: + if isinstance(min, int): + if max is not None: # for mypy + assert isinstance(max, int) + func = iC(min, max) + else: + func = fC(min, max) + try: + if not func(values): + raise parser.error( + f"{option_string} ({values}) must be within the range: [{min}, {'infinity' if max is None else max})") + except AssertionError: + raise RuntimeError(f"The {option_string} option has an invalid value: {values}") + setattr(namespace, self.dest, values) + + return IntOrFloatOpenAction + + +def addOptions(parser: ArgumentParser, jobstore_as_flag: bool = False, cwl: bool = False) -> None: """ Add Toil command line options to a parser. + Support for config files. + :param config: If specified, take defaults from the given Config. :param jobstore_as_flag: make the job store option a --jobStore flag instead of a required jobStore positional argument. """ - if config is None: - config = Config() if not (isinstance(parser, ArgumentParser) or isinstance(parser, _ArgumentGroup)): - raise ValueError(f"Unanticipated class: {parser.__class__}. Must be: argparse.ArgumentParser or ArgumentGroup.") + raise ValueError( + f"Unanticipated class: {parser.__class__}. 
Must be: argparse.ArgumentParser or ArgumentGroup.") + if isinstance(parser, ArgParser): + # in case the user passes in their own configargparse instance instead of calling getDefaultArgumentParser() + # this forces configargparser to process the config file in YAML rather than in it's own format + parser._config_file_parser = YAMLConfigFileParser() # type: ignore[misc] + else: + # configargparse advertises itself as a drag and drop replacement, and running the normal argparse ArgumentParser + # through this code still seems to work (with the exception of --config and environmental variables) + warnings.warn(f'Using deprecated library argparse for options parsing.' + f'This will not parse config files or use environment variables.' + f'Use configargparse instead or call Job.Runner.getDefaultArgumentParser()', + DeprecationWarning) + + opt_strtobool = lambda b: b if b is None else bool(strtobool(b)) + convert_bool = lambda b: bool(strtobool(b)) add_logging_options(parser) parser.register("type", "bool", parseBool) # Custom type for arg=True/False. @@ -524,10 +690,74 @@ def addOptions(parser: ArgumentParser, config: Optional[Config] = None, jobstore "turn on stats collation about the performance of jobs." ) if jobstore_as_flag: - core_options.add_argument('--jobStore', '--jobstore', dest='jobStore', type=str, default=None, help=JOBSTORE_HELP) + core_options.add_argument('--jobstore', '--jobStore', dest='jobStore', type=parse_jobstore, default=None, + help=JOBSTORE_HELP) else: - core_options.add_argument('jobStore', type=str, help=JOBSTORE_HELP) - core_options.add_argument("--workDir", dest="workDir", default=None, + core_options.add_argument('jobStore', type=parse_jobstore, help=JOBSTORE_HELP) + + class WorkDirAction(Action): + """ + Argparse action class to check that the provided --workDir exists + """ + + def __call__(self, parser: Any, namespace: Any, values: Any, option_string: Any = None) -> None: + workDir = values + if workDir is not None: + workDir = os.path.abspath(workDir) + if not os.path.exists(workDir): + raise RuntimeError(f"The path provided to --workDir ({workDir}) does not exist.") + + if len(workDir) > 80: + logger.warning(f'Length of workDir path "{workDir}" is {len(workDir)} characters. ' + f'Consider setting a shorter path with --workPath or setting TMPDIR to something ' + f'like "/tmp" to avoid overly long paths.') + setattr(namespace, self.dest, values) + + class CoordinationDirAction(Action): + """ + Argparse action class to check that the provided --coordinationDir exists + """ + + def __call__(self, parser: Any, namespace: Any, values: Any, option_string: Any = None) -> None: + coordination_dir = values + if coordination_dir is not None: + coordination_dir = os.path.abspath(coordination_dir) + if not os.path.exists(coordination_dir): + raise RuntimeError( + f"The path provided to --coordinationDir ({coordination_dir}) does not exist.") + setattr(namespace, self.dest, values) + + def make_closed_interval_action(min: Union[int, float], max: Optional[Union[int, float]] = None) -> Type[ + Action]: + """ + Returns an argparse action class to check if the input is within the given half-open interval. 
+ ex: + Provided value to argparse must be within the interval [min, max] + + :param min: int/float + :param max: optional int/float + :return: argparse action + """ + + class ClosedIntOrFloatAction(Action): + def __call__(self, parser: Any, namespace: Any, values: Any, option_string: Any = None) -> None: + def is_within(x: Union[int, float]) -> bool: + if max is None: + return min <= x + else: + return min <= x <= max + + try: + if not is_within(values): + raise parser.error( + f"{option_string} ({values}) must be within the range: [{min}, {'infinity' if max is None else max}]") + except AssertionError: + raise RuntimeError(f"The {option_string} option has an invalid value: {values}") + setattr(namespace, self.dest, values) + + return ClosedIntOrFloatAction + + core_options.add_argument("--workDir", dest="workDir", default=None, env_var="TOIL_WORKDIR", action=WorkDirAction, help="Absolute path to directory where temporary files generated during the Toil " "run should be placed. Standard output and error from batch system jobs " "(unless --noStdOutErr is set) will be placed in this directory. A cache directory " @@ -540,15 +770,16 @@ def addOptions(parser: ArgumentParser, config: Optional[Config] = None, jobstore "When sharing a cache between containers on a host, this directory must be " "shared between the containers.") core_options.add_argument("--coordinationDir", dest="coordination_dir", default=None, + env_var="TOIL_COORDINATION_DIR", action=CoordinationDirAction, help="Absolute path to directory where Toil will keep state and lock files." "When sharing a cache between containers on a host, this directory must be " "shared between the containers.") - core_options.add_argument("--noStdOutErr", dest="noStdOutErr", action="store_true", default=None, + core_options.add_argument("--noStdOutErr", dest="noStdOutErr", default=False, action="store_true", help="Do not capture standard output and error from batch system jobs.") - core_options.add_argument("--stats", dest="stats", action="store_true", default=None, + core_options.add_argument("--stats", dest="stats", default=False, action="store_true", help="Records statistics about the toil workflow to be used by 'toil stats'.") clean_choices = ['always', 'onError', 'never', 'onSuccess'] - core_options.add_argument("--clean", dest="clean", choices=clean_choices, default=None, + core_options.add_argument("--clean", dest="clean", choices=clean_choices, default="onSuccess", help=f"Determines the deletion of the jobStore upon completion of the program. " f"Choices: {clean_choices}. The --stats option requires information from the " f"jobStore upon completion so the jobStore will never be deleted with that flag. " @@ -571,7 +802,7 @@ def addOptions(parser: ArgumentParser, config: Optional[Config] = None, jobstore title="Toil options for restarting an existing workflow.", description="Allows the restart of an existing workflow" ) - restart_options.add_argument("--restart", dest="restart", default=None, action="store_true", + restart_options.add_argument("--restart", dest="restart", default=False, action="store_true", help="If --restart is specified then will attempt to restart existing workflow " "at the location pointed to by the --jobStore option. 
Will raise an exception " "if the workflow does not exist") @@ -590,27 +821,25 @@ def addOptions(parser: ArgumentParser, config: Optional[Config] = None, jobstore ) link_imports = file_store_options.add_mutually_exclusive_group() link_imports_help = ("When using a filesystem based job store, CWL input files are by default symlinked in. " - "Specifying this option instead copies the files into the job store, which may protect " - "them from being modified externally. When not specified and as long as caching is enabled, " - "Toil will protect the file automatically by changing the permissions to read-only.") - link_imports.add_argument("--linkImports", dest="linkImports", action='store_true', help=link_imports_help) - link_imports.add_argument("--noLinkImports", dest="linkImports", action='store_false', help=link_imports_help) - link_imports.set_defaults(linkImports=True) - + "Setting this option to True instead copies the files into the job store, which may protect " + "them from being modified externally. When set to False, as long as caching is enabled, " + "Toil will protect the file automatically by changing the permissions to read-only." + "default=%(default)s") + link_imports.add_argument("--symlinkImports", dest="symlinkImports", type=convert_bool, default=True, + help=link_imports_help) move_exports = file_store_options.add_mutually_exclusive_group() move_exports_help = ('When using a filesystem based job store, output files are by default moved to the ' 'output directory, and a symlink to the moved exported file is created at the initial ' - 'location. Specifying this option instead copies the files into the output directory. ' - 'Applies to filesystem-based job stores only.') - move_exports.add_argument("--moveExports", dest="moveExports", action='store_true', help=move_exports_help) - move_exports.add_argument("--noMoveExports", dest="moveExports", action='store_false', help=move_exports_help) - move_exports.set_defaults(moveExports=False) + 'location. Setting this option to True instead copies the files into the output directory. ' + 'Applies to filesystem-based job stores only.' + 'default=%(default)s') + move_exports.add_argument("--moveOutputs", dest="moveOutputs", type=convert_bool, default=False, + help=move_exports_help) caching = file_store_options.add_mutually_exclusive_group() caching_help = ("Enable or disable caching for your workflow, specifying this overrides default from job store") - caching.add_argument('--disableCaching', dest='caching', action='store_false', help=caching_help) - caching.add_argument('--caching', dest='caching', type=lambda val: bool(strtobool(val)), help=caching_help) - caching.set_defaults(caching=None) + caching.add_argument('--caching', dest='caching', type=opt_strtobool, default=None, help=caching_help) + # default is None according to PR 4299, seems to be generated at runtime # Auto scaling options autoscaling_options = parser.add_argument_group( @@ -619,13 +848,16 @@ def addOptions(parser: ArgumentParser, config: Optional[Config] = None, jobstore "as well as parameters to control the level of provisioning." ) provisioner_choices = ['aws', 'gce', None] + # TODO: Better consolidate this provisioner arg and the one in provisioners/__init__.py? autoscaling_options.add_argument('--provisioner', '-p', dest="provisioner", choices=provisioner_choices, + default=None, help=f"The provisioner for cluster auto-scaling. 
This is the main Toil " f"'--provisioner' option, and defaults to None for running on single " f"machine and non-auto-scaling batch systems. The currently supported " - f"choices are {provisioner_choices}. The default is {config.provisioner}.") - autoscaling_options.add_argument('--nodeTypes', default=None, + f"choices are {provisioner_choices}. The default is %(default)s.") + autoscaling_options.add_argument('--nodeTypes', default=[], dest="nodeTypes", type=parse_node_types, + action="extend", help="Specifies a list of comma-separated node types, each of which is " "composed of slash-separated instance types, and an optional spot " "bid set off by a colon, making the node type preemptible. Instance " @@ -640,80 +872,105 @@ def addOptions(parser: ArgumentParser, config: Optional[Config] = None, jobstore "Semantics:\n" "\tBid $0.42/hour for either c5.4xlarge or c5a.4xlarge instances,\n" "\ttreated interchangeably, while they are available at that price,\n" - "\tand buy t2.large instances at full price") - autoscaling_options.add_argument('--minNodes', default=None, - help="Mininum number of nodes of each type in the cluster, if using " - "auto-scaling. This should be provided as a comma-separated list of the " - "same length as the list of node types. default=0") - autoscaling_options.add_argument('--maxNodes', default=None, + "\tand buy t2.large instances at full price.\n" + "default=%(default)s") + class NodeExtendAction(_AppendAction): + """ + argparse Action class to remove the default value on first call, and act as an extend action after + """ + # with action=append/extend, the argparse default is always prepended to the option + # so make the CLI have priority by rewriting the option on the first run + def __init__(self, option_strings: Any, dest: Any, **kwargs: Any): + super().__init__(option_strings, dest, **kwargs) + self.is_default = True + def __call__(self, parser: Any, namespace: Any, values: Any, option_string: Any = None) -> None: + if self.is_default: + setattr(namespace, self.dest, values) + self.is_default = False + else: + super().__call__(parser, namespace, values, option_string) + + autoscaling_options.add_argument('--maxNodes', default=[10], dest="maxNodes", type=parse_int_list, action=NodeExtendAction, help=f"Maximum number of nodes of each type in the cluster, if using autoscaling, " f"provided as a comma-separated list. The first value is used as a default " f"if the list length is less than the number of nodeTypes. " - f"default={config.maxNodes[0]}") - autoscaling_options.add_argument("--targetTime", dest="targetTime", default=None, + f"default=%(default)s") + autoscaling_options.add_argument('--minNodes', default=[0], dest="minNodes", type=parse_int_list, action=NodeExtendAction, + help="Mininum number of nodes of each type in the cluster, if using " + "auto-scaling. This should be provided as a comma-separated list of the " + "same length as the list of node types. default=%(default)s") + autoscaling_options.add_argument("--targetTime", dest="targetTime", default=defaultTargetTime, type=int, + action=make_closed_interval_action(0), help=f"Sets how rapidly you aim to complete jobs in seconds. Shorter times mean " f"more aggressive parallelization. The autoscaler attempts to scale up/down " f"so that it expects all queued jobs will complete within targetTime " - f"seconds. default={config.targetTime}") - autoscaling_options.add_argument("--betaInertia", dest="betaInertia", default=None, + f"seconds. 
default=%(default)s") + autoscaling_options.add_argument("--betaInertia", dest="betaInertia", default=0.1, type=float, + action=make_closed_interval_action(0.0, 0.9), help=f"A smoothing parameter to prevent unnecessary oscillations in the number " f"of provisioned nodes. This controls an exponentially weighted moving " f"average of the estimated number of nodes. A value of 0.0 disables any " f"smoothing, and a value of 0.9 will smooth so much that few changes will " - f"ever be made. Must be between 0.0 and 0.9. default={config.betaInertia}") - autoscaling_options.add_argument("--scaleInterval", dest="scaleInterval", default=None, + f"ever be made. Must be between 0.0 and 0.9. default=%(default)s") + autoscaling_options.add_argument("--scaleInterval", dest="scaleInterval", default=60, type=int, help=f"The interval (seconds) between assessing if the scale of " - f"the cluster needs to change. default={config.scaleInterval}") - autoscaling_options.add_argument("--preemptibleCompensation", "--preemptableCompensation", dest="preemptibleCompensation", default=None, + f"the cluster needs to change. default=%(default)s") + autoscaling_options.add_argument("--preemptibleCompensation", "--preemptableCompensation", + dest="preemptibleCompensation", default=0.0, type=float, + action=make_closed_interval_action(0.0, 1.0), help=f"The preference of the autoscaler to replace preemptible nodes with " f"non-preemptible nodes, when preemptible nodes cannot be started for some " - f"reason. Defaults to {config.preemptibleCompensation}. This value must be " - f"between 0.0 and 1.0, inclusive. A value of 0.0 disables such " + f"reason. This value must be between 0.0 and 1.0, inclusive. " + f"A value of 0.0 disables such " f"compensation, a value of 0.5 compensates two missing preemptible nodes " f"with a non-preemptible one. A value of 1.0 replaces every missing " - f"pre-emptable node with a non-preemptible one.") - autoscaling_options.add_argument("--nodeStorage", dest="nodeStorage", default=50, + f"pre-emptable node with a non-preemptible one. default=%(default)s") + autoscaling_options.add_argument("--nodeStorage", dest="nodeStorage", default=50, type=int, help="Specify the size of the root volume of worker nodes when they are launched " "in gigabytes. You may want to set this if your jobs require a lot of disk " - "space. (default: %(default)s).") - autoscaling_options.add_argument('--nodeStorageOverrides', default=None, + f"space. (default=%(default)s).") + autoscaling_options.add_argument('--nodeStorageOverrides', dest="nodeStorageOverrides", default=[], + type=parse_str_list, action="extend", help="Comma-separated list of nodeType:nodeStorage that are used to override " "the default value from --nodeStorage for the specified nodeType(s). 
" "This is useful for heterogeneous jobs where some tasks require much more " "disk than others.") - autoscaling_options.add_argument("--metrics", dest="metrics", default=False, action="store_true", + + autoscaling_options.add_argument("--metrics", dest="metrics", default=False, type=convert_bool, help="Enable the prometheus/grafana dashboard for monitoring CPU/RAM usage, " "queue size, and issued jobs.") - autoscaling_options.add_argument("--assumeZeroOverhead", dest="assume_zero_overhead", default=False, action="store_true", + autoscaling_options.add_argument("--assumeZeroOverhead", dest="assume_zero_overhead", default=False, + type=convert_bool, help="Ignore scheduler and OS overhead and assume jobs can use every last byte " "of memory and disk on a node when autoscaling.") # Parameters to limit service jobs / detect service deadlocks - if not config.cwl: + if not cwl: service_options = parser.add_argument_group( title="Toil options for limiting the number of service jobs and detecting service deadlocks", description="Allows the specification of the maximum number of service jobs in a cluster. By keeping " "this limited we can avoid nodes occupied with services causing deadlocks." ) - service_options.add_argument("--maxServiceJobs", dest="maxServiceJobs", default=None, type=int, + service_options.add_argument("--maxServiceJobs", dest="maxServiceJobs", default=SYS_MAX_SIZE, type=int, help=f"The maximum number of service jobs that can be run concurrently, " f"excluding service jobs running on preemptible nodes. " - f"default={config.maxServiceJobs}") - service_options.add_argument("--maxPreemptibleServiceJobs", dest="maxPreemptibleServiceJobs", default=None, + f"default=%(default)s") + service_options.add_argument("--maxPreemptibleServiceJobs", dest="maxPreemptibleServiceJobs", + default=SYS_MAX_SIZE, type=int, help=f"The maximum number of service jobs that can run concurrently on " - f"preemptible nodes. default={config.maxPreemptibleServiceJobs}") - service_options.add_argument("--deadlockWait", dest="deadlockWait", default=None, type=int, + f"preemptible nodes. default=%(default)s") + service_options.add_argument("--deadlockWait", dest="deadlockWait", default=60, type=int, help=f"Time, in seconds, to tolerate the workflow running only the same service " f"jobs, with no jobs to use them, before declaring the workflow to be " - f"deadlocked and stopping. default={config.deadlockWait}") - service_options.add_argument("--deadlockCheckInterval", dest="deadlockCheckInterval", default=None, type=int, + f"deadlocked and stopping. default=%(default)s") + service_options.add_argument("--deadlockCheckInterval", dest="deadlockCheckInterval", default=30, type=int, help="Time, in seconds, to wait between checks to see if the workflow is stuck " "running only service jobs, with no jobs to use them. Should be shorter " "than --deadlockWait. May need to be increased if the batch system cannot " "enumerate running jobs quickly enough, or if polling for running jobs is " "placing an unacceptable load on a shared cluster. 
" - "default={config.deadlockCheckInterval}") + f"default=%(default)s") # Resource requirements resource_options = parser.add_argument_group( @@ -727,63 +984,84 @@ def addOptions(parser: ArgumentParser, config: Optional[Config] = None, jobstore 'Default is {}.') cpu_note = 'Fractions of a core (for example 0.1) are supported on some batch systems [mesos, single_machine]' disk_mem_note = 'Standard suffixes like K, Ki, M, Mi, G or Gi are supported' - accelerators_note = ('Each accelerator specification can have a type (gpu [default], nvidia, amd, cuda, rocm, opencl, ' - 'or a specific model like nvidia-tesla-k80), and a count [default: 1]. If both a type and a count ' - 'are used, they must be separated by a colon. If multiple types of accelerators are ' - 'used, the specifications are separated by commas') - resource_options.add_argument('--defaultMemory', dest='defaultMemory', default=None, metavar='INT', - help=resource_help_msg.format('default', 'memory', disk_mem_note, bytes2human(config.defaultMemory))) - resource_options.add_argument('--defaultCores', dest='defaultCores', default=None, metavar='FLOAT', - help=resource_help_msg.format('default', 'cpu', cpu_note, str(config.defaultCores))) - resource_options.add_argument('--defaultDisk', dest='defaultDisk', default=None, metavar='INT', - help=resource_help_msg.format('default', 'disk', disk_mem_note, bytes2human(config.defaultDisk))) - resource_options.add_argument('--defaultAccelerators', dest='defaultAccelerators', default=None, metavar='ACCELERATOR[,ACCELERATOR...]', - help=resource_help_msg.format('default', 'accelerators', accelerators_note, config.defaultAccelerators)) - resource_options.add_argument('--defaultPreemptible', '--defaultPreemptable', dest='defaultPreemptible', metavar='BOOL', - type=bool, nargs='?', const=True, default=False, + accelerators_note = ( + 'Each accelerator specification can have a type (gpu [default], nvidia, amd, cuda, rocm, opencl, ' + 'or a specific model like nvidia-tesla-k80), and a count [default: 1]. If both a type and a count ' + 'are used, they must be separated by a colon. 
If multiple types of accelerators are ' + 'used, the specifications are separated by commas') + + h2b = lambda x: human2bytes(str(x)) + + resource_options.add_argument('--defaultMemory', dest='defaultMemory', default="2.0 Gi", type=h2b, + action=make_open_interval_action(1), + help=resource_help_msg.format('default', 'memory', disk_mem_note, + bytes2human(2147483648))) + resource_options.add_argument('--defaultCores', dest='defaultCores', default=1, metavar='FLOAT', type=float, + action=make_open_interval_action(1.0), + help=resource_help_msg.format('default', 'cpu', cpu_note, str(1))) + resource_options.add_argument('--defaultDisk', dest='defaultDisk', default="2.0 Gi", metavar='INT', type=h2b, + action=make_open_interval_action(1), + help=resource_help_msg.format('default', 'disk', disk_mem_note, + bytes2human(2147483648))) + resource_options.add_argument('--defaultAccelerators', dest='defaultAccelerators', default=[], + metavar='ACCELERATOR[,ACCELERATOR...]', type=parse_accelerator_list, action="extend", + help=resource_help_msg.format('default', 'accelerators', accelerators_note, [])) + resource_options.add_argument('--defaultPreemptible', '--defaultPreemptable', dest='defaultPreemptible', + metavar='BOOL', + type=convert_bool, nargs='?', const=True, default=False, help='Make all jobs able to run on preemptible (spot) nodes by default.') - resource_options.add_argument('--maxCores', dest='maxCores', default=None, metavar='INT', - help=resource_help_msg.format('max', 'cpu', cpu_note, str(config.maxCores))) - resource_options.add_argument('--maxMemory', dest='maxMemory', default=None, metavar='INT', - help=resource_help_msg.format('max', 'memory', disk_mem_note, bytes2human(config.maxMemory))) - resource_options.add_argument('--maxDisk', dest='maxDisk', default=None, metavar='INT', - help=resource_help_msg.format('max', 'disk', disk_mem_note, bytes2human(config.maxDisk))) + resource_options.add_argument('--maxCores', dest='maxCores', default=SYS_MAX_SIZE, metavar='INT', type=int, + action=make_open_interval_action(1), + help=resource_help_msg.format('max', 'cpu', cpu_note, str(SYS_MAX_SIZE))) + resource_options.add_argument('--maxMemory', dest='maxMemory', default=SYS_MAX_SIZE, metavar='INT', type=h2b, + action=make_open_interval_action(1), + help=resource_help_msg.format('max', 'memory', disk_mem_note, + bytes2human(SYS_MAX_SIZE))) + resource_options.add_argument('--maxDisk', dest='maxDisk', default=SYS_MAX_SIZE, metavar='INT', type=h2b, + action=make_open_interval_action(1), + help=resource_help_msg.format('max', 'disk', disk_mem_note, + bytes2human(SYS_MAX_SIZE))) # Retrying/rescuing jobs job_options = parser.add_argument_group( title="Toil options for rescuing/killing/restarting jobs.", description="The options for jobs that either run too long/fail or get lost (some batch systems have issues!)." ) - job_options.add_argument("--retryCount", dest="retryCount", default=None, + job_options.add_argument("--retryCount", dest="retryCount", default=1, type=int, + action=make_open_interval_action(0), help=f"Number of times to retry a failing job before giving up and " - f"labeling job failed. default={config.retryCount}") - job_options.add_argument("--enableUnlimitedPreemptibleRetries", "--enableUnlimitedPreemptableRetries", dest="enableUnlimitedPreemptibleRetries", - action='store_true', default=False, + f"labeling job failed. 
default={1}") + job_options.add_argument("--enableUnlimitedPreemptibleRetries", "--enableUnlimitedPreemptableRetries", + dest="enableUnlimitedPreemptibleRetries", + type=convert_bool, default=False, help="If set, preemptible failures (or any failure due to an instance getting " "unexpectedly terminated) will not count towards job failures and --retryCount.") - job_options.add_argument("--doubleMem", dest="doubleMem", action='store_true', default=False, + job_options.add_argument("--doubleMem", dest="doubleMem", type=convert_bool, default=False, help="If set, batch jobs which die to reaching memory limit on batch schedulers " "will have their memory doubled and they will be retried. The remaining " "retry count will be reduced by 1. Currently supported by LSF.") - job_options.add_argument("--maxJobDuration", dest="maxJobDuration", default=None, + job_options.add_argument("--maxJobDuration", dest="maxJobDuration", default=SYS_MAX_SIZE, type=int, + action=make_open_interval_action(1), help=f"Maximum runtime of a job (in seconds) before we kill it (this is a lower bound, " f"and the actual time before killing the job may be longer). " - f"default={config.maxJobDuration}") - job_options.add_argument("--rescueJobsFrequency", dest="rescueJobsFrequency", default=None, + f"default=%(default)s") + job_options.add_argument("--rescueJobsFrequency", dest="rescueJobsFrequency", default=60, type=int, + action=make_open_interval_action(1), help=f"Period of time to wait (in seconds) between checking for missing/overlong jobs, " f"that is jobs which get lost by the batch system. Expert parameter. " - f"default={config.rescueJobsFrequency}") + f"default=%(default)s") # Log management options log_options = parser.add_argument_group( title="Toil log management options.", description="Options for how Toil should manage its logs." ) - log_options.add_argument("--maxLogFileSize", dest="maxLogFileSize", default=None, + log_options.add_argument("--maxLogFileSize", dest="maxLogFileSize", default=64000, type=h2b, + action=make_open_interval_action(1), help=f"The maximum size of a job log file to keep (in bytes), log files larger than " f"this will be truncated to the last X bytes. Setting this option to zero will " f"prevent any truncation. Setting this option to a negative value will truncate " - f"from the beginning. Default={bytes2human(config.maxLogFileSize)}") + f"from the beginning. Default={bytes2human(64000)}") log_options.add_argument("--writeLogs", dest="writeLogs", nargs='?', action='store', default=None, const=os.getcwd(), help="Write worker logs received by the leader into their own files at the specified " @@ -796,14 +1074,15 @@ def addOptions(parser: ArgumentParser, config: Optional[Config] = None, jobstore log_options.add_argument("--writeLogsGzip", dest="writeLogsGzip", nargs='?', action='store', default=None, const=os.getcwd(), help="Identical to --writeLogs except the logs files are gzipped on the leader.") - log_options.add_argument("--writeLogsFromAllJobs", dest="writeLogsFromAllJobs", action='store_true', + log_options.add_argument("--writeLogsFromAllJobs", dest="writeLogsFromAllJobs", type=convert_bool, default=False, help="Whether to write logs from all jobs (including the successful ones) without " "necessarily setting the log level to 'debug'. 
Ensure that either --writeLogs " "or --writeLogsGzip is set if enabling this option.") log_options.add_argument("--writeMessages", dest="write_messages", default=None, + type=lambda x: None if x is None else os.path.abspath(x), help="File to send messages from the leader's message bus to.") - log_options.add_argument("--realTimeLogging", dest="realTimeLogging", action="store_true", default=False, + log_options.add_argument("--realTimeLogging", dest="realTimeLogging", type=convert_bool, default=False, help="Enable real-time logging from workers to leader") # Misc options @@ -811,52 +1090,114 @@ def addOptions(parser: ArgumentParser, config: Optional[Config] = None, jobstore title="Toil miscellaneous options.", description="Everything else." ) - misc_options.add_argument('--disableChaining', dest='disableChaining', action='store_true', default=False, + misc_options.add_argument('--disableChaining', dest='disableChaining', type=convert_bool, default=False, help="Disables chaining of jobs (chaining uses one job's resource allocation " "for its successor job if possible).") misc_options.add_argument("--disableJobStoreChecksumVerification", dest="disableJobStoreChecksumVerification", - default=False, action="store_true", + default=False, type=convert_bool, help="Disables checksum verification for files transferred to/from the job store. " "Checksum verification is a safety check to ensure the data is not corrupted " "during transfer. Currently only supported for non-streaming AWS files.") - misc_options.add_argument("--sseKey", dest="sseKey", default=None, + + class SSEKeyAction(Action): + def __call__(self, parser: Any, namespace: Any, values: Any, option_string: Any = None) -> None: + if values is not None: + sse_key = values + if sse_key is None: + return + with open(sse_key) as f: + assert len(f.readline().rstrip()) == 32, 'SSE key appears to be invalid.' + setattr(namespace, self.dest, values) + + misc_options.add_argument("--sseKey", dest="sseKey", default=None, action=SSEKeyAction, help="Path to file containing 32 character key to be used for server-side encryption on " "awsJobStore or googleJobStore. SSE will not be used if this flag is not passed.") - misc_options.add_argument("--setEnv", '-e', metavar='NAME=VALUE or NAME', dest="environment", default=[], - action="append", - help="Set an environment variable early on in the worker. If VALUE is omitted, it will " + + # yaml.safe_load is being deprecated, this is the suggested workaround + def yaml_safe_load(stream: Any) -> Any: + yaml = YAML(typ='safe', pure=True) + d = yaml.load(stream) + if isinstance(d, dict): + # this means the argument was a dictionary and is valid yaml (for configargparse) + return d + else: + # this means the argument is likely in it's string format (for CLI) + return parseSetEnv(parse_str_list(stream)) + + class ExtendActionDict(Action): + """ + Argparse action class to implement the action="extend" functionality on dictionaries + """ + + def __call__(self, parser: Any, namespace: Any, values: Any, option_string: Any = None) -> None: + items = getattr(namespace, self.dest, None) + assert items is not None # for mypy. This should never be None, esp. if called in setEnv + # note: this will overwrite existing entries + items.update(values) + + misc_options.add_argument("--setEnv", '-e', metavar='NAME=VALUE or NAME', dest="environment", + default={}, type=yaml_safe_load, action=ExtendActionDict, + help="Set an environment variable early on in the worker. 
If VALUE is null, it will " "be looked up in the current environment. Independently of this option, the worker " "will try to emulate the leader's environment before running a job, except for " "some variables known to vary across systems. Using this option, a variable can " "be injected into the worker process itself before it is started.") - misc_options.add_argument("--servicePollingInterval", dest="servicePollingInterval", default=None, + misc_options.add_argument("--servicePollingInterval", dest="servicePollingInterval", default=60.0, type=float, + action=make_open_interval_action(0.0), help=f"Interval of time service jobs wait between polling for the existence of the " - f"keep-alive flag. Default: {config.servicePollingInterval}") - misc_options.add_argument('--forceDockerAppliance', dest='forceDockerAppliance', action='store_true', default=False, + f"keep-alive flag. Default: {60.0}") + misc_options.add_argument('--forceDockerAppliance', dest='forceDockerAppliance', type=convert_bool, default=False, help='Disables sanity checking the existence of the docker image specified by ' 'TOIL_APPLIANCE_SELF, which Toil uses to provision mesos for autoscaling.') misc_options.add_argument('--statusWait', dest='statusWait', type=int, default=3600, help="Seconds to wait between reports of running jobs.") - misc_options.add_argument('--disableProgress', dest='disableProgress', action='store_true', default=False, + misc_options.add_argument('--disableProgress', dest='disableProgress', type=convert_bool, default=False, help="Disables the progress bar shown when standard error is a terminal.") + # If using argparse instead of configargparse, this should just not parse when calling parse_args() + # default config value is set to none as defaults should already be populated at config init + misc_options.add_argument('--config', dest='config', is_config_file_arg=True, default=None, + help="Get options from a config file.") # Debug options debug_options = parser.add_argument_group( title="Toil debug options.", description="Debug options for finding problems or helping with testing." ) - debug_options.add_argument("--debugWorker", default=False, action="store_true", + debug_options.add_argument("--debugWorker", dest="debugWorker", default=False, action="store_true", help="Experimental no forking mode for local debugging. Specifically, workers " "are not forked and stderr/stdout are not redirected to the log.") - debug_options.add_argument("--disableWorkerOutputCapture", default=False, action="store_true", + debug_options.add_argument("--disableWorkerOutputCapture", dest="disableWorkerOutputCapture", default=False, + action="store_true", help="Let worker output go to worker's standard out/error instead of per-job logs.") - debug_options.add_argument("--badWorker", dest="badWorker", default=None, + debug_options.add_argument("--badWorker", dest="badWorker", default=0.0, type=float, + action=make_closed_interval_action(0.0, 1.0), help=f"For testing purposes randomly kill --badWorker proportion of jobs using " - f"SIGKILL. default={config.badWorker}") - debug_options.add_argument("--badWorkerFailInterval", dest="badWorkerFailInterval", default=None, + f"SIGKILL. default={0.0}") + debug_options.add_argument("--badWorkerFailInterval", dest="badWorkerFailInterval", default=0.01, type=float, + action=make_open_interval_action(0.0), help=f"When killing the job pick uniformly within the interval from 0.0 to " f"--badWorkerFailInterval seconds after the worker starts. 
" - f"default={config.badWorkerFailInterval}") + f"default={0.01}") + + # All deprecated options: + + # These are deprecated in favor of a simpler option + # ex: noLinkImports and linkImports can be simplified into a single link_imports argument + link_imports.add_argument("--noLinkImports", dest="linkImports", action="store_false", + help=SUPPRESS) + link_imports.add_argument("--linkImports", dest="linkImports", action="store_true", + help=SUPPRESS) + link_imports.set_defaults(linkImports=None) + + move_exports.add_argument("--moveExports", dest="moveExports", action="store_true", + help=SUPPRESS) + move_exports.add_argument("--noMoveExports", dest="moveExports", action="store_false", + help=SUPPRESS) + link_imports.set_defaults(moveExports=None) + + # dest is set to enableCaching to not conflict with the current --caching destination + caching.add_argument('--disableCaching', dest='enableCaching', action='store_false', help=SUPPRESS) + caching.set_defaults(disableCaching=None) def parseBool(val: str) -> bool: @@ -867,6 +1208,7 @@ def parseBool(val: str) -> bool: else: raise RuntimeError("Could not interpret \"%s\" as a boolean value" % val) + @lru_cache(maxsize=None) def getNodeID() -> str: """ @@ -962,10 +1304,12 @@ def __enter__(self) -> "Toil": set_logging_from_options(self.options) config = Config() config.setOptions(self.options) + if config.jobStore is None: + raise RuntimeError("No jobstore provided!") jobStore = self.getJobStore(config.jobStore) if config.caching is None: config.caching = jobStore.default_caching() - #Set the caching option because it wasn't set originally, resuming jobstore rebuilds config from CLI options + # Set the caching option because it wasn't set originally, resuming jobstore rebuilds config from CLI options self.options.caching = config.caching if not config.restart: @@ -988,10 +1332,10 @@ def __enter__(self) -> "Toil": return self def __exit__( - self, - exc_type: Optional[Type[BaseException]], - exc_val: Optional[BaseException], - exc_tb: Optional[TracebackType], + self, + exc_type: Optional[Type[BaseException]], + exc_val: Optional[BaseException], + exc_tb: Optional[TracebackType], ) -> Literal[False]: """ Clean up after a workflow invocation. @@ -1194,7 +1538,7 @@ def createBatchSystem(config: Config) -> "AbstractBatchSystem": return batch_system(**kwargs) def _setupAutoDeployment( - self, userScript: Optional["ModuleDescriptor"] = None + self, userScript: Optional["ModuleDescriptor"] = None ) -> None: """ Determine the user script, save it to the job store and inject a reference to the saved copy into the batch system. @@ -1249,13 +1593,15 @@ def _setupAutoDeployment( def importFile(self, srcUrl: str, sharedFileName: str, - symlink: bool = True) -> None: ... + symlink: bool = True) -> None: + ... @overload def importFile(self, srcUrl: str, sharedFileName: None = None, - symlink: bool = True) -> FileID: ... + symlink: bool = True) -> FileID: + ... @deprecated(new_function_name='import_file') def importFile(self, @@ -1269,14 +1615,16 @@ def import_file(self, src_uri: str, shared_file_name: str, symlink: bool = True, - check_existence: bool = True) -> None: ... + check_existence: bool = True) -> None: + ... @overload def import_file(self, src_uri: str, shared_file_name: None = None, symlink: bool = True, - check_existence: bool = True) -> FileID: ... + check_existence: bool = True) -> FileID: + ... 
def import_file(self, src_uri: str, @@ -1394,7 +1742,8 @@ def getToilWorkDir(configWorkDir: Optional[str] = None) -> str: :param configWorkDir: Value passed to the program using the --workDir flag :return: Path to the Toil work directory, constant across all machines """ - workDir = os.getenv('TOIL_WORKDIR_OVERRIDE') or configWorkDir or os.getenv('TOIL_WORKDIR') or tempfile.gettempdir() + workDir = os.getenv('TOIL_WORKDIR_OVERRIDE') or configWorkDir or os.getenv( + 'TOIL_WORKDIR') or tempfile.gettempdir() if not os.path.exists(workDir): raise RuntimeError(f'The directory specified by --workDir or TOIL_WORKDIR ({workDir}) does not exist.') return workDir @@ -1418,31 +1767,33 @@ def get_toil_coordination_dir(cls, config_work_dir: Optional[str], config_coordi if 'XDG_RUNTIME_DIR' in os.environ and not os.path.exists(os.environ['XDG_RUNTIME_DIR']): # Slurm has been observed providing this variable but not keeping # the directory live as long as we run for. - logger.warning('XDG_RUNTIME_DIR is set to nonexistent directory %s; your environment may be out of spec!', os.environ['XDG_RUNTIME_DIR']) + logger.warning('XDG_RUNTIME_DIR is set to nonexistent directory %s; your environment may be out of spec!', + os.environ['XDG_RUNTIME_DIR']) # Go get a coordination directory, using a lot of short-circuiting of # or and the fact that and returns its second argument when it # succeeds. coordination_dir: Optional[str] = ( # First try an override env var - os.getenv('TOIL_COORDINATION_DIR_OVERRIDE') or - # Then the value from the config - config_coordination_dir or - # Then a normal env var - # TODO: why/how would this propagate when not using single machine? - os.getenv('TOIL_COORDINATION_DIR') or - # Then try a `toil` subdirectory of the XDG runtime directory - # (often /var/run/users/). But only if we are actually in a - # session that has the env var set. Otherwise it might belong to a - # different set of sessions and get cleaned up out from under us - # when that session ends. - # We don't think Slurm XDG sessions are trustworthy, depending on - # the cluster's PAM configuration, so don't use them. - ('XDG_RUNTIME_DIR' in os.environ and 'SLURM_JOBID' not in os.environ and try_path(os.path.join(os.environ['XDG_RUNTIME_DIR'], 'toil'))) or - # Try under /run/lock. It might be a temp dir style sticky directory. - try_path('/run/lock') or - # Finally, fall back on the work dir and hope it's a legit filesystem. - cls.getToilWorkDir(config_work_dir) + os.getenv('TOIL_COORDINATION_DIR_OVERRIDE') or + # Then the value from the config + config_coordination_dir or + # Then a normal env var + # TODO: why/how would this propagate when not using single machine? + os.getenv('TOIL_COORDINATION_DIR') or + # Then try a `toil` subdirectory of the XDG runtime directory + # (often /var/run/users/). But only if we are actually in a + # session that has the env var set. Otherwise it might belong to a + # different set of sessions and get cleaned up out from under us + # when that session ends. + # We don't think Slurm XDG sessions are trustworthy, depending on + # the cluster's PAM configuration, so don't use them. + ('XDG_RUNTIME_DIR' in os.environ and 'SLURM_JOBID' not in os.environ and try_path( + os.path.join(os.environ['XDG_RUNTIME_DIR'], 'toil'))) or + # Try under /run/lock. It might be a temp dir style sticky directory. + try_path('/run/lock') or + # Finally, fall back on the work dir and hope it's a legit filesystem. 
+ cls.getToilWorkDir(config_work_dir) ) if coordination_dir is None: @@ -1464,7 +1815,7 @@ def _get_workflow_path_component(workflow_id: str) -> str: @classmethod def getLocalWorkflowDir( - cls, workflowID: str, configWorkDir: Optional[str] = None + cls, workflowID: str, configWorkDir: Optional[str] = None ) -> str: """ Return the directory where worker directories and the cache will be located for this workflow on this machine. @@ -1491,10 +1842,10 @@ def getLocalWorkflowDir( @classmethod def get_local_workflow_coordination_dir( - cls, - workflow_id: str, - config_work_dir: Optional[str], - config_coordination_dir: Optional[str] + cls, + workflow_id: str, + config_work_dir: Optional[str], + config_coordination_dir: Optional[str] ) -> str: """ Return the directory where coordination files should be located for @@ -1656,8 +2007,10 @@ def __init__(self, bus: MessageBus, provisioner: Optional["AbstractProvisioner"] # The only way to make this inteligible to MyPy is to wrap the dict in # a function that can cast. MessageType = TypeVar('MessageType') + def get_listener(message_type: Type[MessageType]) -> Callable[[MessageType], None]: return cast(Callable[[MessageType], None], TARGETS[message_type]) + # Then set up the listeners. self._listeners = [bus.subscribe(message_type, get_listener(message_type)) for message_type in TARGETS.keys()] @@ -1722,12 +2075,12 @@ def log(self, message: str) -> None: # remaining intact def logClusterSize( - self, m: ClusterSizeMessage + self, m: ClusterSizeMessage ) -> None: self.log("current_size '%s' %i" % (m.instance_type, m.current_size)) def logClusterDesiredSize( - self, m: ClusterDesiredSizeMessage + self, m: ClusterDesiredSizeMessage ) -> None: self.log("desired_size '%s' %i" % (m.instance_type, m.desired_size)) @@ -1800,10 +2153,13 @@ def parseSetEnv(l: List[str]) -> Dict[str, Optional[str]]: return d -def iC(minValue: int, maxValue: int = SYS_MAX_SIZE) -> Callable[[int], bool]: +def iC(minValue: int, maxValue: Optional[int] = None) -> Callable[[int], bool]: """Returns a function that checks if a given int is in the given half-open interval.""" - assert isinstance(minValue, int) and isinstance(maxValue, int) - return lambda x: minValue <= x < maxValue + assert isinstance(minValue, int) + if maxValue is None: + return lambda x: minValue <= x + assert isinstance(maxValue, int) + return lambda x: minValue <= x < maxValue # type: ignore def fC(minValue: float, maxValue: Optional[float] = None) -> Callable[[float], bool]: @@ -1814,6 +2170,7 @@ def fC(minValue: float, maxValue: Optional[float] = None) -> Callable[[float], b assert isinstance(maxValue, float) return lambda x: minValue <= x < maxValue # type: ignore + def parse_accelerator_list(specs: Optional[str]) -> List['AcceleratorRequirement']: """ Parse a string description of one or more accelerator requirements. 
diff --git a/src/toil/cwl/cwltoil.py b/src/toil/cwl/cwltoil.py index 11cf821c5b..651d27e305 100644 --- a/src/toil/cwl/cwltoil.py +++ b/src/toil/cwl/cwltoil.py @@ -66,6 +66,7 @@ import cwltool.main import cwltool.resolver import schema_salad.ref_resolver +from configargparse import ArgParser from cwltool.loghandler import _logger as cwllogger from cwltool.loghandler import defaultStreamHandler from cwltool.mpi import MpiConfig @@ -3262,8 +3263,8 @@ def main(args: Optional[List[str]] = None, stdout: TextIO = sys.stdout) -> int: config = Config() config.disableChaining = True config.cwl = True - parser = argparse.ArgumentParser() - addOptions(parser, config, jobstore_as_flag=True) + parser = ArgParser() + addOptions(parser, jobstore_as_flag=True, cwl=True) parser.add_argument("cwltool", type=str) parser.add_argument("cwljob", nargs=argparse.REMAINDER) diff --git a/src/toil/job.py b/src/toil/job.py index 89cf64fe8b..c50aec12f1 100644 --- a/src/toil/job.py +++ b/src/toil/job.py @@ -44,6 +44,8 @@ cast, overload) +from configargparse import ArgParser + from toil.lib.compatibility import deprecated if sys.version_info >= (3, 8): @@ -267,7 +269,8 @@ def parse_accelerator(spec: Union[int, str, Dict[str, Union[str, int]]]) -> Acce elif possible_description in APIS: parsed['api'] = possible_description else: - parsed['model'] = possible_description + if possible_description is not None: + parsed['model'] = possible_description elif isinstance(spec, dict): # It's a dict, so merge with the defaults. parsed.update(spec) @@ -424,6 +427,7 @@ def assignConfig(self, config: Config) -> None: raise RuntimeError(f"Config assigned multiple times to {self}") self._config = config + def __getstate__(self) -> Dict[str, Any]: """Return the dict to use as the instance's __dict__ when pickling.""" # We want to exclude the config from pickling. @@ -595,7 +599,8 @@ def _fetchRequirement(self, requirement: str) -> Optional[ParsedRequirement]: ) return value elif self._config is not None: - value = getattr(self._config, 'default' + requirement.capitalize()) + values = [getattr(self._config, 'default_' + requirement, None), getattr(self._config, 'default' + requirement.capitalize(), None)] + value = values[0] if values[0] is not None else values[1] if value is None: raise AttributeError( f"Encountered None for default '{requirement}' requirement " @@ -2143,37 +2148,49 @@ class Runner(): """Used to setup and run Toil workflow.""" @staticmethod - def getDefaultArgumentParser() -> ArgumentParser: + def getDefaultArgumentParser(jobstore_as_flag: bool = False) -> ArgumentParser: """ Get argument parser with added toil workflow options. + :param jobstore_as_flag: make the job store option a --jobStore flag instead of a required jobStore positional argument. :returns: The argument parser used by a toil workflow with added Toil options. """ - parser = ArgumentParser(formatter_class=ArgumentDefaultsHelpFormatter) - Job.Runner.addToilOptions(parser) + parser = ArgParser(formatter_class=ArgumentDefaultsHelpFormatter) + Job.Runner.addToilOptions(parser, jobstore_as_flag=jobstore_as_flag) return parser @staticmethod - def getDefaultOptions(jobStore: str) -> Namespace: + def getDefaultOptions(jobStore: Optional[str] = None, jobstore_as_flag: bool = False) -> Namespace: """ Get default options for a toil workflow. :param jobStore: A string describing the jobStore \ for the workflow. + :param jobstore_as_flag: make the job store option a --jobStore flag instead of a required jobStore positional argument. 
:returns: The options used by a toil workflow. """ - parser = Job.Runner.getDefaultArgumentParser() - return parser.parse_args(args=[jobStore]) + # setting jobstore_as_flag to True allows the user to declare the jobstore in the config file instead + if not jobstore_as_flag and jobStore is None: + raise RuntimeError("The jobstore argument cannot be missing if the jobstore_as_flag argument is set " + "to False!") + parser = Job.Runner.getDefaultArgumentParser(jobstore_as_flag=jobstore_as_flag) + arguments = [] + if jobstore_as_flag and jobStore is not None: + arguments = ["--jobstore", jobStore] + if not jobstore_as_flag and jobStore is not None: + arguments = [jobStore] + return parser.parse_args(args=arguments) @staticmethod - def addToilOptions(parser: Union["OptionParser", ArgumentParser]) -> None: + def addToilOptions(parser: Union["OptionParser", ArgumentParser], jobstore_as_flag: bool = False) -> None: """ Adds the default toil options to an :mod:`optparse` or :mod:`argparse` parser object. :param parser: Options object to add toil options to. + :param jobstore_as_flag: make the job store option a --jobStore flag instead of a required jobStore positional argument. """ - addOptions(parser) + addOptions(parser, jobstore_as_flag=jobstore_as_flag) @staticmethod def startToil(job: "Job", options) -> Any: diff --git a/src/toil/jobStores/fileJobStore.py b/src/toil/jobStores/fileJobStore.py index bdaa784551..b3b009bcf6 100644 --- a/src/toil/jobStores/fileJobStore.py +++ b/src/toil/jobStores/fileJobStore.py @@ -121,8 +121,8 @@ def initialize(self, config): os.makedirs(self.filesDir, exist_ok=True) os.makedirs(self.jobFilesDir, exist_ok=True) os.makedirs(self.sharedFilesDir, exist_ok=True) - self.linkImports = config.linkImports - self.moveExports = config.moveExports + self.linkImports = config.symlinkImports + self.moveExports = config.moveOutputs super().initialize(config) def resume(self): diff --git a/src/toil/leader.py b/src/toil/leader.py index d6436c6035..c6a81dc63f 100644 --- a/src/toil/leader.py +++ b/src/toil/leader.py @@ -35,7 +35,8 @@ JobIssuedMessage, JobMissingMessage, JobUpdatedMessage, - QueueSizeMessage) + QueueSizeMessage, + gen_message_bus_path) from toil.common import Config, Toil, ToilMetrics from toil.cwl.utils import CWL_UNSUPPORTED_REQUIREMENT_EXIT_CODE from toil.job import (CheckpointJobDescription, @@ -115,10 +116,14 @@ def __init__(self, # state change information about jobs. self.toilState = ToilState(self.jobStore) - if self.config.write_messages is not None: - # Message bus messages need to go to the given file. - # Keep a reference to the return value so the listener stays alive. - self._message_subscription = self.toilState.bus.connect_output_file(self.config.write_messages) + if self.config.write_messages is None: + # The user hasn't specified a place for the message bus so we + # should make one. + self.config.write_messages = gen_message_bus_path() + + # Message bus messages need to go to the given file. + # Keep a reference to the return value so the listener stays alive. + self._message_subscription = self.toilState.bus.connect_output_file(self.config.write_messages) # Connect to the message bus, so we will get all the messages of these # types in an inbox. diff --git a/src/toil/lib/conversions.py b/src/toil/lib/conversions.py index 5370ae55f7..2a8d5a848f 100644 --- a/src/toil/lib/conversions.py +++ b/src/toil/lib/conversions.py @@ -74,6 +74,7 @@ def human2bytes(string: str) -> int: integer number of bytes. 
""" value, unit = parse_memory_string(string) + return int(convert_units(value, src_unit=unit, dst_unit='b')) diff --git a/src/toil/server/app.py b/src/toil/server/app.py index 0673dbf269..ee4712eea5 100644 --- a/src/toil/server/app.py +++ b/src/toil/server/app.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. import argparse +from configargparse import ArgumentParser import logging import os from typing import Type @@ -28,7 +29,7 @@ def parser_with_server_options() -> argparse.ArgumentParser: - parser = argparse.ArgumentParser(description="Toil server mode.") + parser = ArgumentParser(description="Toil server mode.") parser.add_argument("--debug", action="store_true", default=False, help="Enable debug mode.") diff --git a/src/toil/server/cli/wes_cwl_runner.py b/src/toil/server/cli/wes_cwl_runner.py index c72bece960..7387f2e23d 100644 --- a/src/toil/server/cli/wes_cwl_runner.py +++ b/src/toil/server/cli/wes_cwl_runner.py @@ -1,4 +1,4 @@ -import argparse +from configargparse import ArgumentParser import json import logging import os @@ -425,7 +425,7 @@ def print_logs_and_exit(client: WESClientWithWorkflowEngineParameters, run_id: s def main() -> None: - parser = argparse.ArgumentParser(description="A CWL runner that runs workflows through WES.") + parser = ArgumentParser(description="A CWL runner that runs workflows through WES.") # the first two positional arguments are the CWL file and its input file parser.add_argument("cwl_file", type=str) diff --git a/src/toil/statsAndLogging.py b/src/toil/statsAndLogging.py index cb3122f10e..53fcaee37a 100644 --- a/src/toil/statsAndLogging.py +++ b/src/toil/statsAndLogging.py @@ -225,6 +225,8 @@ def add_logging_options(parser: ArgumentParser) -> None: levels += [l.lower() for l in levels] + [l.upper() for l in levels] group.add_argument("--logOff", dest="logLevel", default=default_loglevel, action="store_const", const="CRITICAL", help="Same as --logCRITICAL.") + # Maybe deprecate the above in favor of --logLevel? + group.add_argument("--logLevel", dest="logLevel", default=default_loglevel, choices=levels, help=f"Set the log level. Default: {default_loglevel}. 
Options: {levels}.") group.add_argument("--logFile", dest="logFile", help="File to log in.") diff --git a/src/toil/test/cwl/mock_mpi/fake_mpi_run.py b/src/toil/test/cwl/mock_mpi/fake_mpi_run.py index 1131c7204f..8dbf0d8a3f 100755 --- a/src/toil/test/cwl/mock_mpi/fake_mpi_run.py +++ b/src/toil/test/cwl/mock_mpi/fake_mpi_run.py @@ -1,12 +1,13 @@ #!/usr/bin/env python3 import argparse +from configargparse import ArgumentParser import subprocess import sys from typing import List def make_parser(): - p = argparse.ArgumentParser() + p = ArgumentParser() p.add_argument("progargs", nargs=argparse.REMAINDER, help="The program and its arguments") p.add_argument("--num", type=int, help="number of times to run the application") p.add_argument("--no-fail", help="add this flag to actually work", action="store_true") diff --git a/src/toil/test/docs/scripts/example_alwaysfail.py b/src/toil/test/docs/scripts/example_alwaysfail.py index 6a68cec73b..2b4c3a55ba 100644 --- a/src/toil/test/docs/scripts/example_alwaysfail.py +++ b/src/toil/test/docs/scripts/example_alwaysfail.py @@ -1,4 +1,4 @@ -import argparse +from configargparse import ArgumentParser import sys from toil.common import Toil @@ -18,7 +18,7 @@ def main(): toil status --printLogs ./jobstore """ - parser = argparse.ArgumentParser(description=main.__doc__) + parser = ArgumentParser(description=main.__doc__) Job.Runner.addToilOptions(parser) options = parser.parse_args(sys.argv[1:]) diff --git a/src/toil/test/docs/scripts/example_cachingbenchmark.py b/src/toil/test/docs/scripts/example_cachingbenchmark.py index 31f9b08248..04eb70d68d 100755 --- a/src/toil/test/docs/scripts/example_cachingbenchmark.py +++ b/src/toil/test/docs/scripts/example_cachingbenchmark.py @@ -17,6 +17,8 @@ """ import argparse +from configargparse import ArgumentParser + import collections import os import random @@ -30,7 +32,7 @@ def main(): - parser = argparse.ArgumentParser(description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter) + parser = ArgumentParser(description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter) parser.add_argument('--minSleep', type=int, default=1, help="Minimum seconds to sleep") diff --git a/src/toil/test/jobStores/jobStoreTest.py b/src/toil/test/jobStores/jobStoreTest.py index 618f6fcbcc..3e91822992 100644 --- a/src/toil/test/jobStores/jobStoreTest.py +++ b/src/toil/test/jobStores/jobStoreTest.py @@ -101,7 +101,8 @@ def __new__(cls, *args): return super().__new__(cls) def _createConfig(self): - return Config() + config = Config() + return config @abstractmethod def _createJobStore(self): diff --git a/src/toil/test/mesos/helloWorld.py b/src/toil/test/mesos/helloWorld.py index b0a0a95688..bba679c33d 100644 --- a/src/toil/test/mesos/helloWorld.py +++ b/src/toil/test/mesos/helloWorld.py @@ -15,7 +15,7 @@ A simple user script for Toil """ -import argparse +from configargparse import ArgumentParser from toil.common import Toil from toil.job import Job @@ -32,7 +32,6 @@ def hello_world(job): # Assign FileStoreID to a given file foo_bam = job.fileStore.writeGlobalFile('foo_bam.txt') - # Spawn child job.addChildJobFn(hello_world_child, foo_bam, memory=100, cores=0.5, disk="3G") @@ -56,7 +55,7 @@ def hello_world_child(job, hw): def main(): # Boilerplate -- startToil requires options - parser = argparse.ArgumentParser() + parser = ArgumentParser() Job.Runner.addToilOptions(parser) options = parser.parse_args() diff --git a/src/toil/test/mesos/stress.py b/src/toil/test/mesos/stress.py index 263188cfc8..a0e23d5f36 100644 --- 
a/src/toil/test/mesos/stress.py +++ b/src/toil/test/mesos/stress.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from argparse import ArgumentParser +from configargparse import ArgumentParser from toil.job import Job diff --git a/src/toil/test/provisioners/aws/awsProvisionerTest.py b/src/toil/test/provisioners/aws/awsProvisionerTest.py index afa5702c63..4c0ff705aa 100644 --- a/src/toil/test/provisioners/aws/awsProvisionerTest.py +++ b/src/toil/test/provisioners/aws/awsProvisionerTest.py @@ -412,7 +412,7 @@ def setUp(self): def _getScript(self): def restartScript(): - import argparse + from configargparse import ArgumentParser import os from toil.job import Job @@ -422,7 +422,7 @@ def f0(job): raise RuntimeError('failed on purpose') if __name__ == '__main__': - parser = argparse.ArgumentParser() + parser = ArgumentParser() Job.Runner.addToilOptions(parser) options = parser.parse_args() rootJob = Job.wrapJobFn(f0, cores=0.5, memory='50 M', disk='50 M') diff --git a/src/toil/test/provisioners/restartScript.py b/src/toil/test/provisioners/restartScript.py index 7b03338a58..ada46baa79 100644 --- a/src/toil/test/provisioners/restartScript.py +++ b/src/toil/test/provisioners/restartScript.py @@ -1,4 +1,4 @@ -import argparse +from configargparse import ArgumentParser import os from toil.job import Job @@ -9,7 +9,7 @@ def f0(job): raise RuntimeError('failed on purpose') if __name__ == '__main__': - parser = argparse.ArgumentParser() + parser = ArgumentParser() Job.Runner.addToilOptions(parser) options = parser.parse_args() rootJob = Job.wrapJobFn(f0, cores=0.5, memory='50 M', disk='50 M') diff --git a/src/toil/test/sort/restart_sort.py b/src/toil/test/sort/restart_sort.py index 255b56336a..c999e01683 100644 --- a/src/toil/test/sort/restart_sort.py +++ b/src/toil/test/sort/restart_sort.py @@ -20,7 +20,7 @@ import os import random import shutil -from argparse import ArgumentParser +from configargparse import ArgumentParser from toil.common import Toil from toil.job import Job diff --git a/src/toil/test/sort/sort.py b/src/toil/test/sort/sort.py index cfbbe6c222..4a0ea34bd6 100755 --- a/src/toil/test/sort/sort.py +++ b/src/toil/test/sort/sort.py @@ -18,7 +18,7 @@ import os import random import shutil -from argparse import ArgumentParser +from configargparse import ArgumentParser from toil.common import Toil from toil.job import Job diff --git a/src/toil/test/src/fileStoreTest.py b/src/toil/test/src/fileStoreTest.py index 2125a67e06..ee3b2456cc 100644 --- a/src/toil/test/src/fileStoreTest.py +++ b/src/toil/test/src/fileStoreTest.py @@ -473,7 +473,15 @@ def _testValidityOfCacheEvictTest(self): # the cache hence this test is redundant (caching will be free). 
if not self.options.jobStore.startswith(('aws', 'google')): workDirDev = os.stat(self.options.workDir).st_dev - jobStoreDev = os.stat(os.path.dirname(self.options.jobStore)).st_dev + if self.options.jobStore.startswith("file:"): + # Before #4538, options.jobStore would have the raw path while the Config object would prepend the + # filesystem to the path (/path/to/file vs file:/path/to/file) + # The options namespace and the Config object now have the exact same behavior + # which means parse_jobstore will be called with argparse rather than with the config object + # so remove the prepended file: scheme + jobStoreDev = os.stat(os.path.dirname(self.options.jobStore[5:])).st_dev + else: + jobStoreDev = os.stat(os.path.dirname(self.options.jobStore)).st_dev if workDirDev == jobStoreDev: self.skipTest('Job store and working directory are on the same filesystem.') diff --git a/src/toil/test/src/jobDescriptionTest.py b/src/toil/test/src/jobDescriptionTest.py index 9b0a5779b6..76783b4590 100644 --- a/src/toil/test/src/jobDescriptionTest.py +++ b/src/toil/test/src/jobDescriptionTest.py @@ -13,7 +13,7 @@ # limitations under the License. import os -from argparse import ArgumentParser +from configargparse import ArgumentParser from toil.common import Toil from toil.job import Job, JobDescription, TemporaryID diff --git a/src/toil/test/src/resourceTest.py b/src/toil/test/src/resourceTest.py index 498b4aec8b..8c614a6b9b 100644 --- a/src/toil/test/src/resourceTest.py +++ b/src/toil/test/src/resourceTest.py @@ -27,6 +27,8 @@ from toil.resource import ModuleDescriptor, Resource, ResourceException from toil.test import ToilTest from toil.version import exactPython +from configargparse import ArgumentParser + @contextmanager @@ -209,7 +211,7 @@ def testNonPyStandAlone(self): """ def script(): - import argparse + from configargparse import ArgumentParser from toil.common import Toil from toil.job import Job @@ -218,7 +220,7 @@ def fn(): pass if __name__ == '__main__': - parser = argparse.ArgumentParser() + parser = ArgumentParser() Job.Runner.addToilOptions(parser) options = parser.parse_args() job = Job.wrapFn(fn, memory='10M', cores=0.1, disk='10M') diff --git a/src/toil/test/utils/ABCWorkflowDebug/debugWorkflow.py b/src/toil/test/utils/ABCWorkflowDebug/debugWorkflow.py index 5fc7578bde..6278a2417d 100644 --- a/src/toil/test/utils/ABCWorkflowDebug/debugWorkflow.py +++ b/src/toil/test/utils/ABCWorkflowDebug/debugWorkflow.py @@ -139,7 +139,7 @@ def broken_job(job, num): if __name__=="__main__": jobStorePath = sys.argv[1] if len(sys.argv) > 1 else tempfile.mkdtemp("debugWorkflow") options = Job.Runner.getDefaultOptions(jobStorePath) - # options.clean = "always" + options.clean = "never" options.stats = True options.logLevel = "INFO" with Toil(options) as toil: diff --git a/src/toil/test/utils/ABCWorkflowDebug/mkFile.py b/src/toil/test/utils/ABCWorkflowDebug/mkFile.py index 4fbdb045f4..bf07f272d9 100644 --- a/src/toil/test/utils/ABCWorkflowDebug/mkFile.py +++ b/src/toil/test/utils/ABCWorkflowDebug/mkFile.py @@ -1,8 +1,8 @@ -import argparse +from configargparse import ArgumentParser def main(): - parser = argparse.ArgumentParser(description='Creates a file and writes into it.') + parser = ArgumentParser(description='Creates a file and writes into it.') parser.add_argument('file_name', help='File name to be written to.') parser.add_argument('contents', help='A string to be written into the file.') diff --git a/src/toil/test/utils/utilsTest.py b/src/toil/test/utils/utilsTest.py index c5ed08d75e..f639a9bddf 
100644 --- a/src/toil/test/utils/utilsTest.py +++ b/src/toil/test/utils/utilsTest.py @@ -27,7 +27,7 @@ import toil from toil import resolveEntryPoint -from toil.common import Config, Toil +from toil.common import Config, Toil, addOptions from toil.job import Job from toil.lib.bioio import system from toil.test import (ToilTest, @@ -115,6 +115,26 @@ def statusCommand(self, failIfNotComplete=False): commandTokens.append('--failIfNotComplete') return commandTokens + def test_config_functionality(self): + """Ensure that creating and reading back the config file works""" + config_file = os.path.abspath("config.yaml") + config_command = [self.toilMain, 'config', config_file] + # make sure the command `toil config file_path` works + try: + subprocess.check_call(config_command) + except subprocess.CalledProcessError: + self.fail("The toil config utility failed!") + + parser = Job.Runner.getDefaultArgumentParser() + # make sure that toil can read from the generated config file + try: + parser.parse_args(["random_jobstore", "--config", config_file]) + except SystemExit: + self.fail("Failed to parse the default generated config file!") + finally: + os.remove(config_file) + + @needs_rsync3 @pytest.mark.timeout(1200) @needs_aws_ec2 diff --git a/src/toil/utils/toilConfig.py b/src/toil/utils/toilConfig.py new file mode 100644 index 0000000000..2adaeef4ec --- /dev/null +++ b/src/toil/utils/toilConfig.py @@ -0,0 +1,35 @@ +# Copyright (C) 2015-2021 Regents of the University of California +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
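As exercised by test_config_functionality above, and implemented by the new toil config entry point that follows, a hedged end-to-end sketch; the config file name and jobstore locator here are illustrative:

    import subprocess
    from toil.job import Job

    # Write a YAML file holding every Toil option at its default value.
    subprocess.check_call(["toil", "config", "config.yaml"])

    # Any option not given on the command line can now come from that file.
    parser = Job.Runner.getDefaultArgumentParser()
    options = parser.parse_args(["file:my-jobstore", "--config", "config.yaml"])
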
+"""Create a config file with all default Toil options.""" +import logging +import os + +from configargparse import ArgParser + +from toil.common import generate_config +from toil.statsAndLogging import set_logging_from_options, add_logging_options + +logger = logging.getLogger(__name__) + + +def main() -> None: + parser = ArgParser() + + parser.add_argument("output", default="config.yaml") + add_logging_options(parser) + options = parser.parse_args() + set_logging_from_options(options) + logger.debug("Attempting to write a default config file to %s.", os.path.abspath(options.output)) + generate_config(os.path.abspath(options.output)) + logger.info("Successfully wrote a default config file to %s.", os.path.abspath(options.output)) diff --git a/src/toil/utils/toilMain.py b/src/toil/utils/toilMain.py index ee634f409a..9f0174b23f 100755 --- a/src/toil/utils/toilMain.py +++ b/src/toil/utils/toilMain.py @@ -57,6 +57,7 @@ def loadModules() -> Dict[str, types.ModuleType]: from toil.utils import toilSshCluster # noqa from toil.utils import toilStats # noqa from toil.utils import toilStatus # noqa + from toil.utils import toilConfig # noqa return {'-'.join([i.lower() for i in re.findall('[A-Z][^A-Z]*', name)]): module for name, module in locals().items()} diff --git a/src/toil/wdl/toilwdl.py b/src/toil/wdl/toilwdl.py index f322898179..b793ea2b78 100644 --- a/src/toil/wdl/toilwdl.py +++ b/src/toil/wdl/toilwdl.py @@ -11,7 +11,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -import argparse +from configargparse import ArgumentParser import logging import os import subprocess @@ -57,7 +57,7 @@ def main(): Additional support to be broadened to include more features soon. """ - parser = argparse.ArgumentParser(description='Runs WDL files with toil.') + parser = ArgumentParser(description='Runs WDL files with toil.') parser.add_argument('wdl_file', help='A WDL workflow file.') parser.add_argument('secondary_file', help='A secondary data file (json).') parser.add_argument("--jobStore", type=str, required=False, default=None) diff --git a/src/toil/wdl/wdltoil.py b/src/toil/wdl/wdltoil.py index 764b501c5b..f9e0baa256 100755 --- a/src/toil/wdl/wdltoil.py +++ b/src/toil/wdl/wdltoil.py @@ -38,6 +38,7 @@ from urllib.parse import urlsplit, urljoin, quote, unquote import WDL +from configargparse import ArgParser from WDL._util import byte_size_units from WDL.runtime.task_container import TaskContainer from WDL.runtime.backend.singularity import SingularityContainer @@ -2397,7 +2398,7 @@ def main() -> None: A Toil workflow to interpret WDL input files. 
""" - parser = argparse.ArgumentParser(description='Runs WDL files with toil.') + parser = ArgParser(description='Runs WDL files with toil.') addOptions(parser, jobstore_as_flag=True) parser.add_argument("wdl_uri", type=str, diff --git a/src/toil/worker.py b/src/toil/worker.py index abb62e9e56..19c01fda0c 100644 --- a/src/toil/worker.py +++ b/src/toil/worker.py @@ -29,6 +29,8 @@ from contextlib import contextmanager from typing import Any, Callable, Iterator, List, Optional +from configargparse import ArgParser + from toil import logProcessContext from toil.common import Config, Toil, safeUnpickleFromStream from toil.cwl.utils import (CWL_UNSUPPORTED_REQUIREMENT_EXCEPTION, @@ -640,7 +642,7 @@ def make_parent_writable(func: Callable[[str], Any], path: str, _: Any) -> None: else: return 0 -def parse_args(args: List[str]) -> argparse.Namespace: +def parse_args(args: List[str]) -> Any: """ Parse command-line arguments to the worker. """ @@ -649,7 +651,7 @@ def parse_args(args: List[str]) -> argparse.Namespace: args = args[1:] # Make the parser - parser = argparse.ArgumentParser() + parser = ArgParser() # Now add all the options to it From ef2b923cd400a2644f3bfd0953a76c3208e9ab81 Mon Sep 17 00:00:00 2001 From: Glenn Hickey Date: Thu, 5 Oct 2023 10:51:16 -0400 Subject: [PATCH 3/4] take any nvidia-smi exception as not having gpu (#4611) Co-authored-by: Adam Novak --- src/toil/lib/accelerators.py | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/src/toil/lib/accelerators.py b/src/toil/lib/accelerators.py index 2c7f50c9c2..0c0fbc00f1 100644 --- a/src/toil/lib/accelerators.py +++ b/src/toil/lib/accelerators.py @@ -100,13 +100,7 @@ def count_nvidia_gpus() -> int: .firstChild, ).data ) - except ( - FileNotFoundError, - subprocess.CalledProcessError, - IndexError, - ValueError, - PermissionError, - ): + except: return 0 # TODO: Parse each gpu > product_name > text content and convert to some From b73b9ef53eb914dbd7dbf86c8666c8a572e2eb28 Mon Sep 17 00:00:00 2001 From: stxue1 <122345910+stxue1@users.noreply.github.com> Date: Thu, 5 Oct 2023 10:11:24 -0700 Subject: [PATCH 4/4] Make WDLOutputJob collect all task outputs (#4602) Co-authored-by: Adam Novak --- src/toil/wdl/wdltoil.py | 38 ++++++++++++++++++++++++++------------ 1 file changed, 26 insertions(+), 12 deletions(-) diff --git a/src/toil/wdl/wdltoil.py b/src/toil/wdl/wdltoil.py index f9e0baa256..321c896081 100755 --- a/src/toil/wdl/wdltoil.py +++ b/src/toil/wdl/wdltoil.py @@ -2284,9 +2284,10 @@ def run(self, file_store: AbstractFileStore) -> Promised[WDLBindings]: # Make jobs to run all the parts of the workflow sink = self.create_subgraph(self._workflow.body, [], bindings) - if self._workflow.outputs: + if self._workflow.outputs != []: # Compare against empty list as None means there should be outputs + # Either the output section is declared and nonempty or it is not declared # Add evaluating the outputs after the sink - outputs_job = WDLOutputsJob(self._workflow.outputs, sink.rv(), self._execution_dir) + outputs_job = WDLOutputsJob(self._workflow, sink.rv(), self._execution_dir) sink.addFollowOn(outputs_job) # Caller is responsible for making sure namespaces are applied self.defer_postprocessing(outputs_job) @@ -2301,15 +2302,14 @@ class WDLOutputsJob(WDLBaseJob): Returns an environment with just the outputs bound, in no namespace. 
""" - - def __init__(self, outputs: List[WDL.Tree.Decl], bindings: Promised[WDLBindings], execution_dir: Optional[str] = None, **kwargs: Any): + def __init__(self, workflow: WDL.Tree.Workflow, bindings: Promised[WDLBindings], execution_dir: Optional[str] = None, **kwargs: Any): """ Make a new WDLWorkflowOutputsJob for the given workflow, with the given set of bindings after its body runs. """ super().__init__(execution_dir, **kwargs) - self._outputs = outputs self._bindings = bindings + self._workflow = workflow def run(self, file_store: AbstractFileStore) -> WDLBindings: """ @@ -2317,15 +2317,29 @@ def run(self, file_store: AbstractFileStore) -> WDLBindings: """ super().run(file_store) - # Evaluate all the outputs in the normal, non-task-outputs library context - standard_library = ToilWDLStdLibBase(file_store, self._execution_dir) - # Combine the bindings from the previous job - - output_bindings = evaluate_output_decls(self._outputs, unwrap(self._bindings), standard_library) - + if self._workflow.outputs is None: + # The output section is not declared + # So get all task outputs and return that + # First get all task output names + output_set = set() + for call in self._workflow.body: + if isinstance(call, WDL.Tree.Call): + for type_binding in call.effective_outputs: + output_set.add(type_binding.name) + # Collect all bindings that are task outputs + output_bindings: WDL.Env.Bindings[WDL.Value.Base] = WDL.Env.Bindings() + for binding in unwrap(self._bindings): + if binding.name in output_set: + # The bindings will already be namespaced with the task namespaces + output_bindings = output_bindings.bind(binding.name, binding.value) + else: + # Output section is declared and is nonempty, so evaluate normally + # Evaluate all the outputs in the normal, non-task-outputs library context + standard_library = ToilWDLStdLibBase(file_store, self._execution_dir) + # Combine the bindings from the previous job + output_bindings = evaluate_output_decls(self._workflow.outputs, unwrap(self._bindings), standard_library) return self.postprocess(output_bindings) - class WDLRootJob(WDLSectionJob): """ Job that evaluates an entire WDL workflow, and returns the workflow outputs