From 83869555737f7d04d854a10a8fc85e342c64decc Mon Sep 17 00:00:00 2001 From: Adam Novak Date: Thu, 2 Nov 2023 17:22:18 -0400 Subject: [PATCH 1/4] Turn batch system tests back on (#4649) This should fix #4648 by turning on the batch system tests again. The Mesos-specific ones are already moved elsewhere. Co-authored-by: Lon Blauvelt --- .gitlab-ci.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index 4aff012b58..4f04798c40 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -190,7 +190,7 @@ batch_systems: - export TOIL_TES_ENDPOINT="http://127.0.0.1:8000" - export TOIL_TES_USER="${FUNNEL_SERVER_USER}" - export TOIL_TES_PASSWORD="${FUNNEL_SERVER_PASSWORD}" -# - make test threads="${TEST_THREADS}" marker="${MARKER}" tests="src/toil/test/batchSystems/batchSystemTest.py src/toil/test/mesos/MesosDataStructuresTest.py" + - make test threads="${TEST_THREADS}" marker="${MARKER}" tests="src/toil/test/batchSystems/batchSystemTest.py" - kill $(jobs -p) || true slurm_test: From 2068c760618c8835cee42461f904cd0f9b640cc6 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Mon, 6 Nov 2023 10:19:07 -0800 Subject: [PATCH 2/4] Bump miniwdl from 1.10.0 to 1.11.1 (#4669) Bumps [miniwdl](https://github.com/chanzuckerberg/miniwdl) from 1.10.0 to 1.11.1. - [Release notes](https://github.com/chanzuckerberg/miniwdl/releases) - [Commits](https://github.com/chanzuckerberg/miniwdl/compare/v1.10.0...v1.11.1) --- updated-dependencies: - dependency-name: miniwdl dependency-type: direct:production update-type: version-update:semver-minor ... Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> --- requirements-wdl.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/requirements-wdl.txt b/requirements-wdl.txt index 3ca204aaf6..314ee398b4 100644 --- a/requirements-wdl.txt +++ b/requirements-wdl.txt @@ -1,3 +1,3 @@ -miniwdl==1.10.0 +miniwdl==1.11.1 wdlparse==0.1.0 graphlib-backport==1.0 ; python_version < '3.9' From 2ecbb21937f3cf7917dafa4e9c6cd51efcd70e7a Mon Sep 17 00:00:00 2001 From: Adam Novak Date: Wed, 8 Nov 2023 15:00:42 -0500 Subject: [PATCH 3/4] Move TES batch system to a plugin (#4650) * Implement new batch system finding API and plugin scan * Satisfy MyPy * Implement deprecation for the old constants * Get plugin loader to actually load, and drop TES * Remove TES Kubernetes setup we don't use * Stop asking for needs_tes --------- Co-authored-by: Lon Blauvelt --- .gitlab-ci.yml | 26 - contrib/tes/funnel/kubernetes/setup.yml | 232 --------- docs/appendices/environment_vars.rst | 11 - docs/running/cliOptions.rst | 8 - requirements.txt | 1 - setup.cfg | 5 - src/toil/batchSystems/contained_executor.py | 2 +- src/toil/batchSystems/options.py | 22 +- src/toil/batchSystems/registry.py | 121 ++++- src/toil/batchSystems/tes.py | 462 ------------------ src/toil/common.py | 6 +- src/toil/test/__init__.py | 26 - src/toil/test/batchSystems/batchSystemTest.py | 32 +- 13 files changed, 126 insertions(+), 828 deletions(-) delete mode 100644 contrib/tes/funnel/kubernetes/setup.yml delete mode 100644 src/toil/batchSystems/tes.py diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index 4f04798c40..3e0133d27e 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -166,32 +166,6 @@ batch_systems: script: - pwd - ${MAIN_PYTHON_PKG} -m virtualenv venv && . venv/bin/activate && pip install -U pip wheel && make prepare && make develop extras=[all] packages='htcondor==10.2.3' - - wget https://github.com/ohsu-comp-bio/funnel/releases/download/0.10.1/funnel-linux-amd64-0.10.1.tar.gz - - tar -xvf funnel-linux-amd64-0.10.1.tar.gz funnel - - export FUNNEL_SERVER_USER=toil - - export FUNNEL_SERVER_PASSWORD=$(openssl rand -hex 256) - - | - cat >funnel.conf <:8000) - --tesUser TES_USER User name to use for basic authentication to TES server. - --tesPassword TES_PASSWORD - Password to use for basic authentication to TES server. - --tesBearerToken TES_BEARER_TOKEN - Bearer token to use for authentication to TES server. --awsBatchRegion AWS_BATCH_REGION The AWS region containing the AWS Batch queue to submit to. diff --git a/requirements.txt b/requirements.txt index e79fe7ebcc..568f1ce1e4 100644 --- a/requirements.txt +++ b/requirements.txt @@ -5,7 +5,6 @@ docker>=3.7.2, <7 urllib3>=1.26.0,<3 python-dateutil psutil >= 3.0.1, <6 -py-tes>=0.4.2,<1 PyPubSub >=4.0.3, <5 addict>=2.2.1, <2.5 pytz>=2012 diff --git a/setup.cfg b/setup.cfg index 8b4ba740ca..f603a662d0 100644 --- a/setup.cfg +++ b/setup.cfg @@ -34,7 +34,6 @@ markers = slow slurm singularity - tes torque wes_server cwl_small_log_dir @@ -68,7 +67,3 @@ no_warn_no_return = True [mypy-toil.cwl.*] strict = True - -[mypy-tes] -ignore_errors = True -follow_imports=skip diff --git a/src/toil/batchSystems/contained_executor.py b/src/toil/batchSystems/contained_executor.py index b46ecefabf..699f78e332 100644 --- a/src/toil/batchSystems/contained_executor.py +++ b/src/toil/batchSystems/contained_executor.py @@ -14,7 +14,7 @@ """ Executor for running inside a container. -Useful for Kubernetes and TES batch systems. +Useful for Kubernetes batch system and TES batch system plugin. """ import base64 import logging diff --git a/src/toil/batchSystems/options.py b/src/toil/batchSystems/options.py index f028a64da9..db9c07c097 100644 --- a/src/toil/batchSystems/options.py +++ b/src/toil/batchSystems/options.py @@ -21,8 +21,8 @@ else: from typing_extensions import Protocol -from toil.batchSystems.registry import (BATCH_SYSTEM_FACTORY_REGISTRY, - BATCH_SYSTEMS, +from toil.batchSystems.registry import (get_batch_system, + get_batch_systems, DEFAULT_BATCH_SYSTEM) from toil.lib.threading import cpu_count @@ -55,14 +55,14 @@ def set_batchsystem_options(batch_system: Optional[str], set_option: OptionSette """ if batch_system is not None: # Use this batch system - batch_system_type = BATCH_SYSTEM_FACTORY_REGISTRY[batch_system]() + batch_system_type = get_batch_system(batch_system) batch_system_type.setOptions(set_option) else: - for factory in BATCH_SYSTEM_FACTORY_REGISTRY.values(): + for name in get_batch_systems(): # All the batch systems are responsible for setting their own options # with their setOptions() class methods. try: - batch_system_type = factory() + batch_system_type = get_batch_system(name) except ImportError: # Skip anything we can't import continue @@ -86,9 +86,9 @@ def add_all_batchsystem_options(parser: Union[ArgumentParser, _ArgumentGroup]) - "--batchSystem", dest="batchSystem", default=DEFAULT_BATCH_SYSTEM, - choices=BATCH_SYSTEMS, + choices=get_batch_systems(), help=f"The type of batch system to run the job(s) with, currently can be one " - f"of {', '.join(BATCH_SYSTEMS)}. default={DEFAULT_BATCH_SYSTEM}", + f"of {', '.join(get_batch_systems())}. default={DEFAULT_BATCH_SYSTEM}", ) parser.add_argument( "--disableHotDeployment", @@ -175,14 +175,14 @@ def add_all_batchsystem_options(parser: Union[ArgumentParser, _ArgumentGroup]) - "systems such as gridengine, htcondor, torque, slurm, and lsf." ) - for factory in BATCH_SYSTEM_FACTORY_REGISTRY.values(): + for name in get_batch_systems(): # All the batch systems are responsible for adding their own options # with the add_options class method. try: - batch_system_type = factory() + batch_system_type = get_batch_system(name) except ImportError: # Skip anything we can't import continue # Ask the batch system to create its options in the parser - logger.debug('Add options for %s', batch_system_type) - batch_system_type.add_options(parser) \ No newline at end of file + logger.debug('Add options for %s batch system', name) + batch_system_type.add_options(parser) diff --git a/src/toil/batchSystems/registry.py b/src/toil/batchSystems/registry.py index 848c6d6166..ce4f1f36a9 100644 --- a/src/toil/batchSystems/registry.py +++ b/src/toil/batchSystems/registry.py @@ -12,14 +12,58 @@ # See the License for the specific language governing permissions and # limitations under the License. +import importlib +import pkgutil import logging -from typing import TYPE_CHECKING, Callable, Dict, List, Tuple, Type +import warnings +from typing import TYPE_CHECKING, Callable, Dict, List, Sequence, Tuple, Type + +from toil.lib.compatibility import deprecated +from toil.lib.memoize import memoize if TYPE_CHECKING: from toil.batchSystems.abstractBatchSystem import AbstractBatchSystem logger = logging.getLogger(__name__) +##### +# Plugin system/API +##### + +def add_batch_system_factory(key: str, class_factory: Callable[[], Type['AbstractBatchSystem']]): + """ + Adds a batch system to the registry for workflow or plugin-supplied batch systems. + + :param class_factory: A function that returns a batch system class (NOT an instance), which implements :class:`toil.batchSystems.abstractBatchSystem.AbstractBatchSystem`. + """ + _registry_keys.append(key) + _registry[key] = class_factory + +def get_batch_systems() -> Sequence[str]: + """ + Get the names of all the availsble batch systems. + """ + _load_all_plugins() + + return _registry_keys + +def get_batch_system(key: str) -> Type['AbstractBatchSystem']: + """ + Get a batch system class by name. + + :raises: KeyError if the key is not the name of a batch system, and + ImportError if the batch system's class cannot be loaded. + """ + + return _registry[key]() + + +DEFAULT_BATCH_SYSTEM = 'single_machine' + +##### +# Built-in batch systems +##### + def aws_batch_batch_system_factory(): from toil.batchSystems.awsBatch import AWSBatchBatchSystem return AWSBatchBatchSystem @@ -53,10 +97,6 @@ def slurm_batch_system_factory(): from toil.batchSystems.slurm import SlurmBatchSystem return SlurmBatchSystem -def tes_batch_system_factory(): - from toil.batchSystems.tes import TESBatchSystem - return TESBatchSystem - def torque_batch_system_factory(): from toil.batchSystems.torque import TorqueBatchSystem return TorqueBatchSystem @@ -71,8 +111,11 @@ def kubernetes_batch_system_factory(): from toil.batchSystems.kubernetes import KubernetesBatchSystem return KubernetesBatchSystem +##### +# Registry implementation +##### -BATCH_SYSTEM_FACTORY_REGISTRY: Dict[str, Callable[[], Type["AbstractBatchSystem"]]] = { +_registry: Dict[str, Callable[[], Type["AbstractBatchSystem"]]] = { 'aws_batch' : aws_batch_batch_system_factory, 'parasol' : parasol_batch_system_factory, 'single_machine' : single_machine_batch_system_factory, @@ -80,20 +123,64 @@ def kubernetes_batch_system_factory(): 'lsf' : lsf_batch_system_factory, 'mesos' : mesos_batch_system_factory, 'slurm' : slurm_batch_system_factory, - 'tes' : tes_batch_system_factory, 'torque' : torque_batch_system_factory, 'htcondor' : htcondor_batch_system_factory, 'kubernetes' : kubernetes_batch_system_factory } -BATCH_SYSTEMS = list(BATCH_SYSTEM_FACTORY_REGISTRY.keys()) -DEFAULT_BATCH_SYSTEM = 'single_machine' +_registry_keys = list(_registry.keys()) + +# We will load any packages starting with this prefix and let them call +# add_batch_system_factory() +_PLUGIN_NAME_PREFIX = "toil_batch_system_" + +@memoize +def _load_all_plugins() -> None: + """ + Load all the batch system plugins that are installed. + """ + + for finder, name, is_pkg in pkgutil.iter_modules(): + # For all installed packages + if name.startswith(_PLUGIN_NAME_PREFIX): + # If it is a Toil batch system plugin, import it + importlib.import_module(name) + +##### +# Deprecated API +##### +# We used to directly access these constants, but now the Right Way to use this +# module is add_batch_system_factory() to register and get_batch_systems() to +# get the list/get_batch_system() to get a class by name. + + +def __getattr__(name): + """ + Implement a fallback attribute getter to handle deprecated constants. + + See . + """ + if name == "BATCH_SYSTEM_FACTORY_REGISTRY": + warnings.warn("BATCH_SYSTEM_FACTORY_REGISTRY is deprecated; use get_batch_system() or add_batch_system_factory()", DeprecationWarning) + return _registry + elif name == "BATCH_SYSTEMS": + warnings.warn("BATCH_SYSTEMS is deprecated; use get_batch_systems()", DeprecationWarning) + return _registry_keys + else: + raise AttributeError(f"Module {__name__} ahs no attribute {name}") + + +@deprecated(new_function_name="add_batch_system_factory") def addBatchSystemFactory(key: str, batchSystemFactory: Callable[[], Type['AbstractBatchSystem']]): """ - Adds a batch system to the registry for workflow-supplied batch systems. + Deprecated method to add a batch system. """ - BATCH_SYSTEMS.append(key) - BATCH_SYSTEM_FACTORY_REGISTRY[key] = batchSystemFactory + return add_batch_system_factory(key, batchSystemFactory) + + +##### +# Testing utilities +##### # We need a snapshot save/restore system for testing. We can't just tamper with # the globals because module-level globals are their own references, so we @@ -106,7 +193,7 @@ def save_batch_system_plugin_state() -> Tuple[List[str], Dict[str, Callable[[], tests. """ - snapshot = (list(BATCH_SYSTEMS), dict(BATCH_SYSTEM_FACTORY_REGISTRY)) + snapshot = (list(_registry_keys), dict(_registry)) return snapshot def restore_batch_system_plugin_state(snapshot: Tuple[List[str], Dict[str, Callable[[], Type['AbstractBatchSystem']]]]): @@ -118,7 +205,7 @@ def restore_batch_system_plugin_state(snapshot: Tuple[List[str], Dict[str, Calla # We need to apply the snapshot without rebinding the names, because that # won't affect modules that imported the names. wanted_batch_systems, wanted_registry = snapshot - BATCH_SYSTEMS.clear() - BATCH_SYSTEMS.extend(wanted_batch_systems) - BATCH_SYSTEM_FACTORY_REGISTRY.clear() - BATCH_SYSTEM_FACTORY_REGISTRY.update(wanted_registry) + _registry_keys.clear() + _registry_keys.extend(wanted_batch_systems) + _registry.clear() + _registry.update(wanted_registry) diff --git a/src/toil/batchSystems/tes.py b/src/toil/batchSystems/tes.py deleted file mode 100644 index 90a155e017..0000000000 --- a/src/toil/batchSystems/tes.py +++ /dev/null @@ -1,462 +0,0 @@ -# Copyright (C) 2015-2021 Regents of the University of California -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -""" -Batch system for running Toil workflows on GA4GH TES. - -Useful with network-based job stores when the TES server provides tasks with -credentials, and filesystem-based job stores when the TES server lets tasks -mount the job store. - -Additional containers should be launched with Singularity, not Docker. -""" -import datetime -import logging -import math -import os -import pickle -import time -from argparse import ArgumentParser, _ArgumentGroup -from typing import Any, Callable, Dict, List, Optional, Union - -import tes -from requests.exceptions import HTTPError - -from toil import applianceSelf -from toil.batchSystems.abstractBatchSystem import (EXIT_STATUS_UNAVAILABLE_VALUE, - BatchJobExitReason, - UpdatedBatchJobInfo) -from toil.batchSystems.cleanup_support import BatchSystemCleanupSupport -from toil.batchSystems.contained_executor import pack_job -from toil.batchSystems.options import OptionSetter -from toil.common import Config, Toil -from toil.job import JobDescription -from toil.lib.misc import get_public_ip, slow_down, utc_now -from toil.resource import Resource - -logger = logging.getLogger(__name__) - - -# Map from TES terminal states to Toil batch job exit reasons -STATE_TO_EXIT_REASON: Dict[str, BatchJobExitReason] = { - 'COMPLETE': BatchJobExitReason.FINISHED, - 'CANCELED': BatchJobExitReason.KILLED, - 'EXECUTOR_ERROR': BatchJobExitReason.FAILED, - 'SYSTEM_ERROR': BatchJobExitReason.ERROR, - 'UNKNOWN': BatchJobExitReason.ERROR -} - - -class TESBatchSystem(BatchSystemCleanupSupport): - @classmethod - def supportsAutoDeployment(cls) -> bool: - return True - - @classmethod - def get_default_tes_endpoint(cls) -> str: - """ - Get the default TES endpoint URL to use. - - (unless overridden by an option or environment variable) - """ - return f'http://{get_public_ip()}:8000' - - def __init__(self, config: Config, maxCores: float, maxMemory: int, maxDisk: int) -> None: - super().__init__(config, maxCores, maxMemory, maxDisk) - # Connect to TES, using Funnel-compatible environment variables to fill in credentials if not specified. - tes_endpoint = config.tes_endpoint or self.get_default_tes_endpoint() - self.tes = tes.HTTPClient(tes_endpoint, - user=config.tes_user, - password=config.tes_password, - token=config.tes_bearer_token) - - # Get service info from the TES server and pull out supported storages. - # We need this so we can tell if the server is likely to be able to - # mount any of our local files. These are URL bases that the server - # supports. - server_info = self.tes.get_service_info() - logger.debug("Detected TES server info: %s", server_info) - self.server_storages = server_info.storage or [] - - # Define directories to mount for each task, as py-tes Input objects - self.mounts: List[tes.Input] = [] - - if config.jobStore: - job_store_type, job_store_path = Toil.parseLocator(config.jobStore) - if job_store_type == 'file': - # If we have a file job store, we want to mount it at the same path, if we can - self._mount_local_path_if_possible(job_store_path, job_store_path) - - # If we have AWS credentials, we want to mount them in our home directory if we can. - aws_credentials_path = os.path.join(os.path.expanduser("~"), '.aws') - if os.path.isdir(aws_credentials_path): - self._mount_local_path_if_possible(aws_credentials_path, '/root/.aws') - - # We assign job names based on a numerical job ID. This functionality - # is managed by the BatchSystemLocalSupport. - - # Here is where we will store the user script resource object if we get one. - self.user_script: Optional[Resource] = None - - # Ge the image to deploy from Toil's configuration - self.docker_image = applianceSelf() - - # We need a way to map between our batch system ID numbers, and TES task IDs from the server. - self.bs_id_to_tes_id: Dict[int, str] = {} - self.tes_id_to_bs_id: Dict[str, int] = {} - - def _server_can_mount(self, url: str) -> bool: - """ - Internal function. Should not be called outside this class. - - Return true if the given URL is under a supported storage location for - the TES server, and false otherwise. - """ - # TODO: build some kind of fast matcher in case there are a lot of - # storages supported. - - for base_url in self.server_storages: - if url.startswith(base_url): - return True - return False - - def _mount_local_path_if_possible(self, local_path: str, container_path: str) -> None: - """ - Internal function. Should not be called outside this class. - - If a local path is somewhere the server thinks it can access, mount it - into all the tasks. - """ - # TODO: We aren't going to work well with linked imports if we're mounting the job store into the container... - - path_url = 'file://' + os.path.abspath(local_path) - if os.path.exists(local_path) and self._server_can_mount(path_url): - # We can access this file from the server. Probably. - self.mounts.append(tes.Input(url=path_url, - path=container_path, - type="DIRECTORY" if os.path.isdir(local_path) else "FILE")) - - def setUserScript(self, user_script: Resource) -> None: - logger.debug(f'Setting user script for deployment: {user_script}') - self.user_script = user_script - - # setEnv is provided by BatchSystemSupport, updates self.environment - - def issueBatchJob(self, job_desc: JobDescription, job_environment: Optional[Dict[str, str]] = None) -> int: - # TODO: get a sensible self.maxCores, etc. so we can check_resource_request. - # How do we know if the cluster will autoscale? - - # Try the job as local - local_id = self.handleLocalJob(job_desc) - if local_id is not None: - # It is a local job - return local_id - else: - # We actually want to send to the cluster - - # Check resource requirements (managed by BatchSystemSupport) - self.check_resource_request(job_desc) - - # Make a batch system scope job ID - bs_id = self.getNextJobID() - # Make a vaguely human-readable name. - # TES does not require it to be unique. - # We could add a per-workflow prefix to use with ListTasks, but - # ListTasks doesn't let us filter for newly done tasks, so it's not - # actually useful for us over polling each task. - job_name = str(job_desc) - - # Launch the job on TES - - # Determine job environment - environment = self.environment.copy() - if job_environment: - environment.update(job_environment) - if 'TOIL_WORKDIR' not in environment: - # The appliance container defaults TOIL_WORKDIR to - # /var/lib/toil, but TES doesn't (always?) give us a writable - # /, so we need to use the writable space in /tmp by default - # instead when running on TES. - environment['TOIL_WORKDIR'] = '/tmp' - - # Make a command to run it in the executor - command_list = pack_job(job_desc, self.user_script) - - # Make the sequence of TES containers ("executors") to run. - # We just run one which is the Toil executor to grab the user - # script and do the job. - task_executors = [tes.Executor(image=self.docker_image, - command=command_list, - env=environment - )] - - # Prepare inputs. - task_inputs = list(self.mounts) - # If we had any per-job input files they would come in here. - - # Prepare resource requirements - task_resources = tes.Resources(cpu_cores=math.ceil(job_desc.cores), - ram_gb=job_desc.memory / (1024**3), - disk_gb=job_desc.disk / (1024**3), - # TODO: py-tes spells this differently than Toil - preemptible=job_desc.preemptible) - - # Package into a TES Task - task = tes.Task(name=job_name, - executors=task_executors, - inputs=task_inputs, - resources=task_resources) - - # Launch it and get back the TES ID that we can use to poll the task - tes_id = self.tes.create_task(task) - - # Tie it to the numeric ID - self.bs_id_to_tes_id[bs_id] = tes_id - self.tes_id_to_bs_id[tes_id] = bs_id - - logger.debug('Launched job: %s', job_name) - - return bs_id - - def _get_runtime(self, task: tes.Task) -> Optional[float]: - """ - Internal function. Should not be called outside this class. - - Get the time that the given job ran/has been running for, in seconds, - or None if that time is not available. Never returns 0. - """ - start_time = None - end_time = utc_now() - for log in (task.logs or []): - if log.start_time: - # Find the first start time that is set - start_time = log.start_time - break - - if not start_time: - # It hasn't been running for a measurable amount of time. - return None - - for log in reversed(task.logs or []): - if log.end_time: - # Find the last end time that is set, and override now - end_time = log.end_time - break - # We have a set start time, so it is/was running. Return the time - # it has been running for. - return slow_down((end_time - start_time).total_seconds()) - - def _get_exit_code(self, task: tes.Task) -> int: - """ - Internal function. Should not be called outside this class. - - Get the exit code of the last executor with a log in the task, or - EXIT_STATUS_UNAVAILABLE_VALUE if no executor has a log. - """ - for task_log in reversed(task.logs or []): - for executor_log in reversed(task_log.logs or []): - if isinstance(executor_log.exit_code, int): - # Find the last executor exit code that is a number and return it - return executor_log.exit_code - - if task.state == 'COMPLETE': - # If the task completes without error but has no code logged, the - # code must be 0. - return 0 - - # If we get here we couldn't find an exit code. - return EXIT_STATUS_UNAVAILABLE_VALUE - - def __get_log_text(self, task: tes.Task) -> Optional[str]: - """ - Get the log text (standard error) of the last executor with a log in - the task, or None. - """ - - for task_log in reversed(task.logs or []): - for executor_log in reversed(task_log.logs or []): - if isinstance(executor_log.stderr, str): - # Find the last executor log code that is a string and return it - return executor_log.stderr - - # If we get here we couldn't find a log. - return None - - def getUpdatedBatchJob(self, maxWait: int) -> Optional[UpdatedBatchJobInfo]: - # Remember when we started, for respecting the timeout - entry = datetime.datetime.now() - # This is the updated job we have found, if any - result = None - while result is None and ((datetime.datetime.now() - entry).total_seconds() < maxWait or not maxWait): - result = self.getUpdatedLocalJob(0) - - if result: - return result - - # Collect together the list of TES and batch system IDs for tasks we - # are acknowledging and don't care about anymore. - acknowledged = [] - - for tes_id, bs_id in self.tes_id_to_bs_id.items(): - # Immediately poll all the jobs we issued. - # TODO: There's no way to acknowledge a finished job, so there's no - # faster way to find the newly finished jobs than polling - task = self.tes.get_task(tes_id, view="MINIMAL") - if task.state in ["COMPLETE", "CANCELED", "EXECUTOR_ERROR", "SYSTEM_ERROR"]: - # This task is done! - logger.debug("Found stopped task: %s", task) - - # Acknowledge it - acknowledged.append((tes_id, bs_id)) - - if task.state == "CANCELED": - # Killed jobs aren't allowed to appear as updated. - continue - - # Otherwise, it stopped running and it wasn't our fault. - - # Fetch the task's full info, including logs. - task = self.tes.get_task(tes_id, view="FULL") - - # Record runtime - runtime = self._get_runtime(task) - - # Determine if it succeeded - exit_reason = STATE_TO_EXIT_REASON[task.state] - - # Get its exit code - exit_code = self._get_exit_code(task) - - if task.state == "EXECUTOR_ERROR": - # The task failed, so report executor logs. - logger.warning('Log from failed executor: %s', self.__get_log_text(task)) - - # Compose a result - result = UpdatedBatchJobInfo(jobID=bs_id, exitStatus=exit_code, wallTime=runtime, exitReason=exit_reason) - - # No more iteration needed, we found a result. - break - - # After the iteration, drop all the records for tasks we acknowledged - for (tes_id, bs_id) in acknowledged: - del self.tes_id_to_bs_id[tes_id] - del self.bs_id_to_tes_id[bs_id] - - if not maxWait: - # Don't wait at all - break - elif result is None: - # Wait a bit and poll again - time.sleep(min(maxWait/2, 1.0)) - - # When we get here we have all the result we can get - return result - - def shutdown(self) -> None: - - # Shutdown local processes first - self.shutdownLocal() - - for tes_id in self.tes_id_to_bs_id.keys(): - # Shut down all the TES jobs we issued. - self._try_cancel(tes_id) - - def _try_cancel(self, tes_id: str) -> None: - """ - Internal function. Should not be called outside this class. - - Try to cancel a TES job. - - Succeed if it can't be canceled because it has stopped, - but fail if it can't be canceled for some other reason. - """ - try: - # Kill each of our tasks in TES - self.tes.cancel_task(tes_id) - except HTTPError as e: - if e.response is not None and e.response.status_code in [409, 500]: - # TODO: This is what we probably get when trying to cancel - # something that is actually done. But can we rely on that? - pass - elif '500' in str(e) or '409' in str(e): - # TODO: drop this after merges. - # py-tes might be hiding the actual code and just putting it in a string - pass - else: - raise - - def getIssuedBatchJobIDs(self) -> List[int]: - return self.getIssuedLocalJobIDs() + list(self.bs_id_to_tes_id.keys()) - - def getRunningBatchJobIDs(self) -> Dict[int, float]: - # We need a dict from job_id (integer) to seconds it has been running - bs_id_to_runtime = {} - - for tes_id, bs_id in self.tes_id_to_bs_id.items(): - # Poll every issued task, and get the runtime info right away in - # the default BASIC view. - # TODO: use list_tasks filtering by name prefix and running state! - task = self.tes.get_task(tes_id) - logger.debug("Observed task: %s", task) - if task.state in ["INITIALIZING", "RUNNING"]: - # We count INITIALIZING tasks because they may be e.g. pulling - # Docker containers, and we don't want to time out on them in - # the tests. But they may not have any runtimes, so it might - # not really help. - runtime = self._get_runtime(task) - if runtime: - # We can measure a runtime - bs_id_to_runtime[bs_id] = runtime - # If we can't find a runtime, we can't say it's running - # because we can't say how long it has been running for. - - # Give back the times all our running jobs have been running for. - return bs_id_to_runtime - - def killBatchJobs(self, job_ids: List[int]) -> None: - # Kill all the ones that are local - self.killLocalJobs(job_ids) - - for bs_id in job_ids: - if bs_id in self.bs_id_to_tes_id: - # We sent this to TES. So try to cancel it. - self._try_cancel(self.bs_id_to_tes_id[bs_id]) - # But don't forget the mapping until we actually get the finish - # notification for the job. - - # TODO: If the kill races the collection of a finished update, do we - # have to censor the finished update even if the kill never took - # effect??? That's not implemented. - - @classmethod - def add_options(cls, parser: Union[ArgumentParser, _ArgumentGroup]) -> None: - parser.add_argument("--tesEndpoint", dest="tes_endpoint", default=None, env_var="TOIL_TES_ENDPOINT", - help=f"The http(s) URL of the TES server. If the provided value is None, the value will be " - f"generated at runtime. " - f"(Generated default: {cls.get_default_tes_endpoint()})") - parser.add_argument("--tesUser", dest="tes_user", default=None, env_var="TOIL_TES_USER", - help="User name to use for basic authentication to TES server.") - parser.add_argument("--tesPassword", dest="tes_password", default=None, env_var="TOIL_TES_PASSWORD", - help="Password to use for basic authentication to TES server.") - parser.add_argument("--tesBearerToken", dest="tes_bearer_token", default=None, env_var="TOIL_TES_BEARER_TOKEN", - help="Bearer token to use for authentication to TES server.") - - @classmethod - def setOptions(cls, setOption: OptionSetter) -> None: - # Because we use the keyword arguments, we can't specify a type for setOption without using Protocols. - # TODO: start using Protocols, or just start returning objects to represent the options. - # When actually parsing options, remember to check the environment variables - setOption("tes_endpoint") - setOption("tes_user") - setOption("tes_password") - setOption("tes_bearer_token") diff --git a/src/toil/common.py b/src/toil/common.py index d1c9245198..833cf93181 100644 --- a/src/toil/common.py +++ b/src/toil/common.py @@ -1519,13 +1519,13 @@ def createBatchSystem(config: Config) -> "AbstractBatchSystem": maxMemory=config.maxMemory, maxDisk=config.maxDisk) - from toil.batchSystems.registry import BATCH_SYSTEM_FACTORY_REGISTRY + from toil.batchSystems.registry import get_batch_system, get_batch_systems try: - batch_system = BATCH_SYSTEM_FACTORY_REGISTRY[config.batchSystem]() + batch_system = get_batch_system(config.batchSystem) except KeyError: raise RuntimeError(f'Unrecognized batch system: {config.batchSystem} ' - f'(choose from: {BATCH_SYSTEM_FACTORY_REGISTRY.keys()})') + f'(choose from: {", ".join(get_batch_systems())})') if config.caching and not batch_system.supportsWorkerCleanup(): raise RuntimeError(f'{config.batchSystem} currently does not support shared caching, because it ' diff --git a/src/toil/test/__init__.py b/src/toil/test/__init__.py index 7a10e68e84..3fdc18e838 100644 --- a/src/toil/test/__init__.py +++ b/src/toil/test/__init__.py @@ -447,32 +447,6 @@ def needs_torque(test_item: MT) -> MT: return test_item return unittest.skip("Install PBS/Torque to include this test.")(test_item) - -def needs_tes(test_item: MT) -> MT: - """Use as a decorator before test classes or methods to run only if TES is available.""" - test_item = _mark_test('tes', test_item) - - try: - from toil.batchSystems.tes import TESBatchSystem - except ImportError: - return unittest.skip("Install py-tes to include this test")(test_item) - - tes_url = os.environ.get('TOIL_TES_ENDPOINT', TESBatchSystem.get_default_tes_endpoint()) - try: - urlopen(tes_url) - except HTTPError: - # Funnel happens to 404 if TES is working. But any HTTPError means we - # dialed somebody who picked up. - pass - except URLError: - # Will give connection refused if we can't connect because the server's - # not there. We can also get a "cannot assign requested address" if - # we're on Kubernetes dialing localhost and !!creative things!! have - # been done to the network stack. - return unittest.skip(f"Run a TES server on {tes_url} to include this test")(test_item) - return test_item - - def needs_kubernetes_installed(test_item: MT) -> MT: """Use as a decorator before test classes or methods to run only if Kubernetes is installed.""" test_item = _mark_test('kubernetes', test_item) diff --git a/src/toil/test/batchSystems/batchSystemTest.py b/src/toil/test/batchSystems/batchSystemTest.py index deae0bc4f8..6f8f095881 100644 --- a/src/toil/test/batchSystems/batchSystemTest.py +++ b/src/toil/test/batchSystems/batchSystemTest.py @@ -32,9 +32,9 @@ # protected by annotations. from toil.batchSystems.mesos.test import MesosTestSupport from toil.batchSystems.parasol import ParasolBatchSystem -from toil.batchSystems.registry import (BATCH_SYSTEM_FACTORY_REGISTRY, - BATCH_SYSTEMS, - addBatchSystemFactory, +from toil.batchSystems.registry import (get_batch_system, + get_batch_systems, + add_batch_system_factory, restore_batch_system_plugin_state, save_batch_system_plugin_state) from toil.batchSystems.singleMachine import SingleMachineBatchSystem @@ -54,7 +54,6 @@ needs_mesos, needs_parasol, needs_slurm, - needs_tes, needs_torque, slow) from toil.test.batchSystems.parasolTestSupport import ParasolTestSupport @@ -88,16 +87,16 @@ def tearDown(self): restore_batch_system_plugin_state(self.__state) super().tearDown() - def testAddBatchSystemFactory(self): + def test_add_batch_system_factory(self): def test_batch_system_factory(): # TODO: Adding the same batch system under multiple names means we # can't actually create Toil options, because each version tries to # add its arguments. return SingleMachineBatchSystem - addBatchSystemFactory('testBatchSystem', test_batch_system_factory) - assert ('testBatchSystem', test_batch_system_factory) in BATCH_SYSTEM_FACTORY_REGISTRY.items() - assert 'testBatchSystem' in BATCH_SYSTEMS + add_batch_system_factory('testBatchSystem', test_batch_system_factory) + assert 'testBatchSystem' in get_batch_systems() + assert get_batch_system('testBatchSystem') == SingleMachineBatchSystem class hidden: """ @@ -575,23 +574,6 @@ def test_label_constraints(self): self.assertEqual(str(spec.tolerations), "None") -@needs_tes -@needs_fetchable_appliance -class TESBatchSystemTest(hidden.AbstractBatchSystemTest): - """ - Tests against the TES batch system - """ - - def supportsWallTime(self): - return True - - def createBatchSystem(self): - # Import the batch system when we know we have it. - # Doesn't really matter for TES right now, but someday it might. - from toil.batchSystems.tes import TESBatchSystem - return TESBatchSystem(config=self.config, - maxCores=numCores, maxMemory=1e9, maxDisk=2001) - @needs_aws_batch @needs_fetchable_appliance class AWSBatchBatchSystemTest(hidden.AbstractBatchSystemTest): From bdb02868abf058a05c6c98e495c84f4dd46f83fa Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Thu, 9 Nov 2023 10:31:28 -0500 Subject: [PATCH 4/4] skip unwanted networkx version (#4450) * skip unwanted networkx version * Limit to released major versions of networkx --------- Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> Co-authored-by: Adam Novak --- requirements-cwl.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/requirements-cwl.txt b/requirements-cwl.txt index d978e83b16..0068351039 100644 --- a/requirements-cwl.txt +++ b/requirements-cwl.txt @@ -4,5 +4,5 @@ galaxy-tool-util<23 galaxy-util<23 ruamel.yaml>=0.15,<=0.18.3 ruamel.yaml.clib>=0.2.6 -networkx>=2,<2.8.9 +networkx!=2.8.1,<4 CacheControl[filecache]