Skip to content

Commit

Permalink
Add config file support (#4569)
Browse files Browse the repository at this point in the history
* Centralize defaults

* Add requirements

* Grab logLevel

grabbed logLevel used to be the default in Config(), so grab effective
logLevel that is set

* Satisfy mypy

mypy might still complain about missing stubs for configargparser
though

* Fix wrong default

* add config tool

* temp fix

config sets defaults but so does argparse, runs twice in workflows but
deals with tests

* Fix create_config for tests instead

* Fix setting of config defaults

* Go back to previous method, create defaults at init

* Fix default cli options set

* Centralize, config util, and read properly

* Fix type hinting to support 3.9

* mypy

* Fix cwl edge case

* Fix tests

* fix typos, always generate config, fix some tests

* Remove subprocess as maybe tests are flaky on CI with it?

* just run quick_test_offline

* make CI print stuff

* Harden default config creation against races

* Cleanup and argument renaming

* Fix bad yaml and toil status bug

* Fix mypy

* Change behavior of --stats and --clean

* Change test behavior as options namespace and config now have the same
behavior

* Put forgotten line

ouch

* Batchsystem, requirements, fixes for tests

* Mypy conformance

* Mypy conformance

* Fix retryCount argument and kubernetesPodTimeout type

* Only run batchsystem and slurm_test tests on CI

* Whoops, this implementation never worked

* Add pyyaml to requirements for slurm to pass

* Add rest of gitlab CI back and run all tests

* Update stub file to be compatible with updated mypy

* Fix environment CLI option

* Update provisioner test to use configargparse

* Code cleanup and add jobstore_as_flag to DefaultArgumentParser etc

* Fix toil config test

* Add suggestions

* Deprecate options, add underscore CLI options only for newly deprecated options

* Update docs/argparse help and fix bug with deprecated options
also make most generic arg as default for runLocalJobsOnWorkers

* Add config file section to docs

* Remove upper bound for ruamel requirements

* Remove redundancies and improve disableCaching's destination name

* Update src/toil/batchSystems/kubernetes.py

Co-authored-by: Adam Novak <[email protected]>

* Remove redundant line in status util

* Remove comments in configargparse stub

* Workaround to get action=append instead of nargs and get proper backwards compatibility
Fix wrong name for link_imports and move_exports, remove new unused functions

* Import SYS_MAX_SIZE from common rather than duplicating it

* Mypy and syntax errors

* Move config options back to the old naming syntax

* Change names for link_imports and move_exports to camelCase options

* Fix formatting

* Bring back old --restart and --clean functionality where they collide and raise an error

* Make debug less spammy and remove unused types

* Disable kubernetes temporarily

* Revert changes to --restart and --clean collision

* Typo in tests

* Change some comments and add member fields to config

* Fix pickling error when jobstate file doesnt exist and fix threading error when lock file exists then disappears (#4575)

Co-authored-by: Brandon Walker <[email protected]>
Co-authored-by: Adam Novak <[email protected]>

* Reduce the number of assert statements (#4590)

* Change all asserts to raising errors for central toil files

Co-authored-by: Adam Novak <[email protected]>

* Fix mypy and update docs to match options in common

* Update src/toil/common.py

Co-authored-by: Adam Novak <[email protected]>

---------

Co-authored-by: Adam Novak <[email protected]>
Co-authored-by: Brandon Walker <[email protected]>
Co-authored-by: Brandon Walker <[email protected]>
  • Loading branch information
4 people authored Oct 5, 2023
1 parent ff9302c commit f8f9b41
Show file tree
Hide file tree
Showing 44 changed files with 1,055 additions and 591 deletions.
2 changes: 1 addition & 1 deletion attic/toil-sort-example.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from __future__ import absolute_import
from six.moves import xrange
from argparse import ArgumentParser
from configargparse import ArgumentParser
import os
import logging
import random
Expand Down
3 changes: 3 additions & 0 deletions contrib/mypy-stubs/configargparse/__init__.pyi
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from .configargparse import ArgParser as ArgParser
from .configargparse import YAMLConfigFileParser as YAMLConfigFileParser
from .configargparse import ArgumentParser as ArgumentParser
30 changes: 30 additions & 0 deletions contrib/mypy-stubs/configargparse/configargparse.pyi
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
import argparse
from typing import Sequence, Any, TypeVar, OrderedDict

__all__ = [
"ArgumentParser",
"YAMLConfigFileParser",
"ConfigFileParser"
]
_N = TypeVar("_N")

class ConfigFileParser(object):
def get_syntax_description(self) -> Any: ...
def parse(self, stream: Any) -> Any: ...
def serialize(self, items: OrderedDict[Any, Any]) -> Any: ...

class YAMLConfigFileParser(ConfigFileParser):
def get_syntax_description(self) -> str: ...
def parse(self, stream: Any) -> OrderedDict[Any, Any]: ...
def serialize(self, items: OrderedDict[Any, Any], default_flow_style: bool = ...) -> Any: ...

class ArgumentParser(argparse.ArgumentParser):
@property
def _config_file_parser(self) -> Any: ...

def __init__(self, *args: Any, **kwargs: Any) -> None: ...
# There may be a better way of type hinting this without a type: ignore, but mypy gets unhappy pretty much no matter what as the signatures for parse_args doesn't match with its superclass in argparse
def parse_args(self, args: Sequence[str] | None = None, namespace: Namespace | None = None, config_file_contents: str | None = None, env_vars: Any=None) -> Namespace: ... # type: ignore[override]

Namespace = argparse.Namespace
ArgParser = ArgumentParser
48 changes: 32 additions & 16 deletions docs/running/cliOptions.rst
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,29 @@ Commandline Options

A quick way to see all of Toil's commandline options is by executing the following on a toil script::

$ toil example.py --help
$ python example.py --help

For a basic toil workflow, Toil has one mandatory argument, the job store. All other arguments are optional.

The Config File
-------------
Instead of changing the arguments on the CLI, Toil offers support for using a configuration file (Note: Support for the
configuration file and environmental variables require the use of ``configargparse``).

To generate a default configuration file::

$ toil config [file].yaml

After editing the config file, make Toil take in the new options::

$ python example.py --config=[file].yaml

If CLI options are used in addition with the configuration file, the CLI options will overwrite the configuration file
options::

$ python example.py --config=[file].yaml --maxNodes 20
# maxNodes=[20] even though default maxNodes=[10]

The Job Store
-------------

Expand Down Expand Up @@ -228,22 +247,19 @@ levels in toil are based on priority from the logging module:
**Data Storage Options**
Allows configuring Toil's data storage.

--linkImports When using a filesystem based job store, CWL input files
are by default symlinked in. Specifying this option
--symlinkImports BOOL When using a filesystem based job store, CWL input files
are by default symlinked in. Setting this option to True
instead copies the files into the job store, which may
protect them from being modified externally. When not
specified and as long as caching is enabled, Toil will
protect them from being modified externally. When set
to False and as long as caching is enabled, Toil will
protect the file automatically by changing the permissions
to read-only.
--moveExports When using a filesystem based job store, output files
to read-only. (Default=True)
--moveOutputs BOOL When using a filesystem based job store, output files
are by default moved to the output directory, and a
symlink to the moved exported file is created at the
initial location. Specifying this option instead copies
the files into the output directory. Applies to
filesystem-based job stores only.
--disableCaching Disables caching in the file store. This flag must be
set to use a batch system that does not support
cleanup, such as Parasol.
initial location. Setting this option to True instead
copies the files into the output directory. Applies to
filesystem-based job stores only. (Default=False)
--caching BOOL Set caching options. This must be set to "false"
to use a batch system that does not support
cleanup, such as Parasol. Set to "true" if caching
Expand Down Expand Up @@ -280,7 +296,7 @@ autoscaled cluster, as well as parameters to control the level of provisioning.
if using auto-scaling. This should be provided as a
comma-separated list of the same length as the list of
node types. default=0
--maxNodes MAXNODES Maximum number of nodes of each type in the cluster,
--maxNodes MAXNODES Maximum number of nodes of each type in the cluster, Maximum number of nodes of each type in the cluster,
if using autoscaling, provided as a comma-separated
list. The first value is used as a default if the list
length is less than the number of nodeTypes.
Expand Down Expand Up @@ -363,7 +379,7 @@ from the batch system.
Only applicable to jobs that do not specify an
explicit value for this requirement. Standard suffixes
like K, Ki, M, Mi, G or Gi are supported. Default is
2.0G
2.0Gi
--defaultCores FLOAT The default number of CPU cores to dedicate a job.
Only applicable to jobs that do not specify an
explicit value for this requirement. Fractions of a
Expand All @@ -374,7 +390,7 @@ from the batch system.
Only applicable to jobs that do not specify an
explicit value for this requirement. Standard suffixes
like K, Ki, M, Mi, G or Gi are supported. Default is
2.0G
2.0Gi
--defaultAccelerators ACCELERATOR
The default amount of accelerators to request for a
job. Only applicable to jobs that do not specify an
Expand Down
3 changes: 3 additions & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,7 @@ PyPubSub >=4.0.3, <5
addict>=2.2.1, <2.5
pytz>=2012
enlighten>=1.5.2, <2
configargparse>=1.7,<2
ruamel.yaml>=0.15
pyyaml>=6,<7
typing-extensions>=4.6.2, <5
12 changes: 6 additions & 6 deletions src/toil/batchSystems/awsBatch.py
Original file line number Diff line number Diff line change
Expand Up @@ -559,17 +559,17 @@ def killBatchJobs(self, job_ids: List[int]) -> None:

@classmethod
def add_options(cls, parser: Union[ArgumentParser, _ArgumentGroup]) -> None:
parser.add_argument("--awsBatchRegion", dest="aws_batch_region", default=None,
parser.add_argument("--awsBatchRegion", dest="aws_batch_region", default=None, env_var="TOIL_AWS_REGION",
help="The AWS region containing the AWS Batch queue to submit to.")
parser.add_argument("--awsBatchQueue", dest="aws_batch_queue", default=None,
parser.add_argument("--awsBatchQueue", dest="aws_batch_queue", default=None, env_var="TOIL_AWS_BATCH_QUEUE",
help="The name or ARN of the AWS Batch queue to submit to.")
parser.add_argument("--awsBatchJobRoleArn", dest="aws_batch_job_role_arn", default=None,
parser.add_argument("--awsBatchJobRoleArn", dest="aws_batch_job_role_arn", default=None, env_var="TOIL_AWS_BATCH_JOB_ROLE_ARN",
help=("The ARN of an IAM role to run AWS Batch jobs as, so they "
"can e.g. access a job store. Must be assumable by "
"ecs-tasks.amazonaws.com."))

@classmethod
def setOptions(cls, setOption: OptionSetter) -> None:
setOption("aws_batch_region", default=None)
setOption("aws_batch_queue", default=None, env=["TOIL_AWS_BATCH_QUEUE"])
setOption("aws_batch_job_role_arn", default=None, env=["TOIL_AWS_BATCH_JOB_ROLE_ARN"])
setOption("aws_batch_region")
setOption("aws_batch_queue")
setOption("aws_batch_job_role_arn")
56 changes: 29 additions & 27 deletions src/toil/batchSystems/kubernetes.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ def __init__(self, config: Config, maxCores: int, maxMemory: int, maxDisk: int)
super().__init__(config, maxCores, maxMemory, maxDisk)

# Re-type the config to make sure it has all the fields we need.
# This convinces MyPy we really do have this type.
assert isinstance(config, KubernetesBatchSystem.KubernetesConfig)

# Turn down log level for Kubernetes modules and dependencies.
Expand All @@ -167,26 +168,26 @@ def __init__(self, config: Config, maxCores: int, maxMemory: int, maxDisk: int)
self._apis: KubernetesBatchSystem._ApiStorageDict = {}

# Get our namespace (and our Kubernetes credentials to make sure they exist)
self.namespace = self._api('namespace')
self.namespace: str = self._api('namespace')

# Decide if we are going to mount a Kubernetes host path as the Toil
# work dir in the workers, for shared caching.
self.host_path = config.kubernetes_host_path
self.host_path: Optional[str] = config.kubernetes_host_path

# Get the service account name to use, if any.
self.service_account = config.kubernetes_service_account
self.service_account: Optional[str] = config.kubernetes_service_account

# Get how long we should wait for a pod that lands on a node to
# actually start.
self.pod_timeout = config.kubernetes_pod_timeout
self.pod_timeout: float = config.kubernetes_pod_timeout

# Get the username to mark jobs with
username = config.kubernetes_owner
username = config.kubernetes_owner or self.get_default_kubernetes_owner()
# And a unique ID for the run
self.unique_id = uuid.uuid4()

# Create a prefix for jobs, starting with our username
self.job_prefix = f'{username}-toil-{self.unique_id}-'
self.job_prefix: str = f'{username}-toil-{self.unique_id}-'
# Instead of letting Kubernetes assign unique job names, we assign our
# own based on a numerical job ID. This functionality is managed by the
# BatchSystemLocalSupport.
Expand All @@ -199,17 +200,17 @@ def __init__(self, config: Config, maxCores: int, maxMemory: int, maxDisk: int)
# conformance tests. To work around this, we tag all our jobs with an
# explicit TTL that is long enough that we're sure we can deal with all
# the finished jobs before they expire.
self.finished_job_ttl = 3600 # seconds
self.finished_job_ttl: int = 3600 # seconds

# Here is where we will store the user script resource object if we get one.
self.user_script: Optional[Resource] = None

# Ge the image to deploy from Toil's configuration
self.docker_image = applianceSelf()
self.docker_image: str = applianceSelf()

# Try and guess what Toil work dir the workers will use.
# We need to be able to provision (possibly shared) space there.
self.worker_work_dir = Toil.getToilWorkDir(config.workDir)
self.worker_work_dir: str = Toil.getToilWorkDir(config.workDir)
if (config.workDir is None and
os.getenv('TOIL_WORKDIR') is None and
self.worker_work_dir == tempfile.gettempdir()):
Expand All @@ -226,17 +227,17 @@ def __init__(self, config: Config, maxCores: int, maxMemory: int, maxDisk: int)
self.environment['TMPDIR'] = '/var/tmp'

# Get the name of the AWS secret, if any, to mount in containers.
self.aws_secret_name = os.environ.get("TOIL_AWS_SECRET_NAME", None)
self.aws_secret_name: Optional[str] = os.environ.get("TOIL_AWS_SECRET_NAME", None)

# Set this to True to enable the experimental wait-for-job-update code
self.enable_watching = os.environ.get("KUBE_WATCH_ENABLED", False)
self.enable_watching: bool = os.environ.get("KUBE_WATCH_ENABLED", False)

# This will be a label to select all our jobs.
self.run_id = f'toil-{self.unique_id}'
self.run_id: str = f'toil-{self.unique_id}'

# Keep track of available resources.
maxMillicores = int(SYS_MAX_SIZE if self.maxCores == SYS_MAX_SIZE else self.maxCores * 1000)
self.resource_sources = [
self.resource_sources: List[ResourcePool] = [
# A pool representing available job slots
ResourcePool(self.config.max_jobs, 'job slots'),
# A pool representing available CPU in units of millicores (1 CPU
Expand All @@ -261,16 +262,16 @@ def __init__(self, config: Config, maxCores: int, maxMemory: int, maxDisk: int)
self._killed_queue_jobs: Set[int] = set()

# We use this event to signal shutdown
self._shutting_down = Event()
self._shutting_down: Event = Event()

# A lock to protect critical regions when working with queued jobs.
self._mutex = RLock()
self._mutex: RLock = RLock()

# A condition set to true when there is more work to do. e.g.: new job
# in the queue or any resource becomes available.
self._work_available = Condition(lock=self._mutex)
self._work_available: Condition = Condition(lock=self._mutex)

self.schedulingThread = Thread(target=self._scheduler, daemon=True)
self.schedulingThread: Thread = Thread(target=self._scheduler, daemon=True)
self.schedulingThread.start()

def _pretty_print(self, kubernetes_object: Any) -> str:
Expand Down Expand Up @@ -1864,24 +1865,25 @@ class KubernetesConfig(Protocol):

@classmethod
def add_options(cls, parser: Union[ArgumentParser, _ArgumentGroup]) -> None:
parser.add_argument("--kubernetesHostPath", dest="kubernetes_host_path", default=None,
parser.add_argument("--kubernetesHostPath", dest="kubernetes_host_path", default=None, env_var="TOIL_KUBERNETES_HOST_PATH",
help="Path on Kubernetes hosts to use as shared inter-pod temp directory. "
"(default: %(default)s)")
parser.add_argument("--kubernetesOwner", dest="kubernetes_owner", default=cls.get_default_kubernetes_owner(),
help="Username to mark Kubernetes jobs with. "
"(default: %(default)s)")
parser.add_argument("--kubernetesServiceAccount", dest="kubernetes_service_account", default=None,
parser.add_argument("--kubernetesOwner", dest="kubernetes_owner", default=None, env_var="TOIL_KUBERNETES_OWNER",
help=f"Username to mark Kubernetes jobs with. If the provided value is None, the value will "
f"be generated at runtime. "
f"(Generated default: {cls.get_default_kubernetes_owner()})")
parser.add_argument("--kubernetesServiceAccount", dest="kubernetes_service_account", default=None, env_var="TOIL_KUBERNETES_SERVICE_ACCOUNT",
help="Service account to run jobs as. "
"(default: %(default)s)")
parser.add_argument("--kubernetesPodTimeout", dest="kubernetes_pod_timeout", default=120,
parser.add_argument("--kubernetesPodTimeout", dest="kubernetes_pod_timeout", default=120, env_var="TOIL_KUBERNETES_POD_TIMEOUT", type=float,
help="Seconds to wait for a scheduled Kubernetes pod to start running. "
"(default: %(default)s)")

OptionType = TypeVar('OptionType')
@classmethod
def setOptions(cls, setOption: OptionSetter) -> None:
setOption("kubernetes_host_path", default=None, env=['TOIL_KUBERNETES_HOST_PATH'])
setOption("kubernetes_owner", default=cls.get_default_kubernetes_owner(), env=['TOIL_KUBERNETES_OWNER'])
setOption("kubernetes_service_account", default=None, env=['TOIL_KUBERNETES_SERVICE_ACCOUNT'])
setOption("kubernetes_pod_timeout", default=120, env=['TOIL_KUBERNETES_POD_TIMEOUT'])
setOption("kubernetes_host_path")
setOption("kubernetes_owner")
setOption("kubernetes_service_account",)
setOption("kubernetes_pod_timeout")

4 changes: 3 additions & 1 deletion src/toil/batchSystems/local_support.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from toil.batchSystems.singleMachine import SingleMachineBatchSystem
from toil.common import Config
from toil.job import JobDescription
from toil.lib.threading import cpu_count

logger = logging.getLogger(__name__)

Expand All @@ -28,8 +29,9 @@ class BatchSystemLocalSupport(BatchSystemSupport):

def __init__(self, config: Config, maxCores: float, maxMemory: int, maxDisk: int) -> None:
super().__init__(config, maxCores, maxMemory, maxDisk)
max_local_jobs = config.max_local_jobs if config.max_local_jobs is not None else cpu_count()
self.localBatch: SingleMachineBatchSystem = SingleMachineBatchSystem(
config, maxCores, maxMemory, maxDisk, max_jobs=config.max_local_jobs
config, maxCores, maxMemory, maxDisk, max_jobs=max_local_jobs
)

def handleLocalJob(self, jobDesc: JobDescription) -> Optional[int]:
Expand Down
12 changes: 7 additions & 5 deletions src/toil/batchSystems/mesos/batchSystem.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ def __init__(self, config, maxCores, maxMemory, maxDisk):
self.jobQueues = JobQueue()

# Address of the Mesos master in the form host:port where host can be an IP or a hostname
self.mesos_endpoint = config.mesos_endpoint
self.mesos_endpoint = config.mesos_endpoint or self.get_default_mesos_endpoint()
if config.mesos_role is not None:
self.mesos_role = config.mesos_role
self.mesos_name = config.mesos_name
Expand Down Expand Up @@ -846,8 +846,10 @@ def get_default_mesos_endpoint(cls) -> str:

@classmethod
def add_options(cls, parser: Union[ArgumentParser, _ArgumentGroup]) -> None:
parser.add_argument("--mesosEndpoint", "--mesosMaster", dest="mesos_endpoint", default=cls.get_default_mesos_endpoint(),
help="The host and port of the Mesos master separated by colon. (default: %(default)s)")
parser.add_argument("--mesosEndpoint", "--mesosMaster", dest="mesos_endpoint", default=None,
help=f"The host and port of the Mesos master separated by colon. If the provided value "
f"is None, the value will be generated at runtime. "
f"(Generated default: {cls.get_default_mesos_endpoint})")
parser.add_argument("--mesosFrameworkId", dest="mesos_framework_id",
help="Use a specific Mesos framework ID.")
parser.add_argument("--mesosRole", dest="mesos_role",
Expand All @@ -857,8 +859,8 @@ def add_options(cls, parser: Union[ArgumentParser, _ArgumentGroup]) -> None:

@classmethod
def setOptions(cls, setOption: OptionSetter):
setOption("mesos_endpoint", None, None, cls.get_default_mesos_endpoint(), old_names=["mesosMasterAddress"])
setOption("mesos_name", None, None, "toil")
setOption("mesos_endpoint", old_names=["mesosMasterAddress"])
setOption("mesos_name")
setOption("mesos_role")
setOption("mesos_framework_id")

Loading

0 comments on commit f8f9b41

Please sign in to comment.