From a3ba87a2a8f0be263ba26a4b06137242d8c3de4f Mon Sep 17 00:00:00 2001 From: Nick Tyler Date: Fri, 6 Sep 2024 04:51:41 -0700 Subject: [PATCH 01/21] Test parsl+slurm in CI (#3606) Fixes #3579 --- .github/workflows/parsl+slurm.yaml | 35 ++++++++++++++++++++++++++++++ parsl/tests/configs/slurm_local.py | 26 ++++++++++++++++++++++ parsl/tests/slurm-entrypoint.sh | 16 ++++++++++++++ 3 files changed, 77 insertions(+) create mode 100644 .github/workflows/parsl+slurm.yaml create mode 100644 parsl/tests/configs/slurm_local.py create mode 100755 parsl/tests/slurm-entrypoint.sh diff --git a/.github/workflows/parsl+slurm.yaml b/.github/workflows/parsl+slurm.yaml new file mode 100644 index 0000000000..0828a6cabb --- /dev/null +++ b/.github/workflows/parsl+slurm.yaml @@ -0,0 +1,35 @@ +name: Test Slurm Scheduler +on: + pull_request: + +jobs: + build: + runs-on: ubuntu-22.04 + permissions: + packages: read + strategy: + fail-fast: false + matrix: + container: ["ghcr.io/tylern4/slurm-standalone:slurm-24-05-0-1"] + timeout-minutes: 30 + + container: + image: ${{ matrix.container }} + options: "--platform=linux/amd64 --rm -h node01" + + name: ${{ matrix.container }} + steps: + - name: Checkout + uses: actions/checkout@v3 + + - name: Install Dependencies and Parsl + run: | + CC=/usr/lib64/openmpi/bin/mpicc pip3 install . -r test-requirements.txt + + - name: Verify Parsl Installation + run: | + pytest parsl/tests/ -k "not cleannet and not unix_filesystem_permissions_required" --config parsl/tests/configs/local_threads.py --random-order --durations 10 + + - name: Test Parsl with Slurm Config + run: | + ./parsl/tests/slurm-entrypoint.sh pytest parsl/tests/ -k "not cleannet and not unix_filesystem_permissions_required" --config parsl/tests/configs/slurm_local.py --random-order --durations 10 diff --git a/parsl/tests/configs/slurm_local.py b/parsl/tests/configs/slurm_local.py new file mode 100644 index 0000000000..2a63f68e51 --- /dev/null +++ b/parsl/tests/configs/slurm_local.py @@ -0,0 +1,26 @@ +from parsl.channels import LocalChannel +from parsl.config import Config +from parsl.executors import HighThroughputExecutor +from parsl.launchers import SrunLauncher +from parsl.providers import SlurmProvider + + +def fresh_config(): + return Config( + executors=[ + HighThroughputExecutor( + label="docker_slurm", + encrypted=True, + provider=SlurmProvider( + cmd_timeout=60, # Add extra time for slow scheduler responses + channel=LocalChannel(), + nodes_per_block=1, + init_blocks=1, + min_blocks=1, + max_blocks=1, + walltime='00:10:00', + launcher=SrunLauncher(), + ), + ) + ], + ) diff --git a/parsl/tests/slurm-entrypoint.sh b/parsl/tests/slurm-entrypoint.sh new file mode 100755 index 0000000000..418db8419e --- /dev/null +++ b/parsl/tests/slurm-entrypoint.sh @@ -0,0 +1,16 @@ +#!/bin/bash +set -e + +echo "---> Starting the MUNGE Authentication service (munged) ..." +gosu munge /usr/sbin/munged + +echo "---> Starting the slurmctld ..." +exec gosu slurm /usr/sbin/slurmctld -i -Dvvv & + +echo "---> Waiting for slurmctld to become active before starting slurmd..." + +echo "---> Starting the Slurm Node Daemon (slurmd) ..." +exec /usr/sbin/slurmd -Dvvv & + +echo "---> Running user command '${@}'" +exec "$@" From dc7911fd58f1f8d0e2c56dc42103adad22c6e97d Mon Sep 17 00:00:00 2001 From: Colin Thomas <33940547+colinthomas-z80@users.noreply.github.com> Date: Fri, 6 Sep 2024 09:13:00 -0400 Subject: [PATCH 02/21] TaskVineExecutor: write function data to temp directory (#3592) This moves the working directory for TaskVine executor function data to a directory in /tmp These files were previously written adjacent to the logging in the working directory. They may be written in excess when running a large number of tasks. --- parsl/executors/taskvine/executor.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/parsl/executors/taskvine/executor.py b/parsl/executors/taskvine/executor.py index 2e1efb211f..a15a444d2c 100644 --- a/parsl/executors/taskvine/executor.py +++ b/parsl/executors/taskvine/executor.py @@ -3,6 +3,7 @@ high-throughput system for delegating Parsl tasks to thousands of remote machines """ +import getpass import hashlib import inspect import itertools @@ -18,6 +19,7 @@ import threading import uuid from concurrent.futures import Future +from datetime import datetime from typing import List, Literal, Optional, Union # Import other libraries @@ -215,9 +217,9 @@ def __create_data_and_logging_dirs(self): # Create directories for data and results log_dir = os.path.join(run_dir, self.label) - self._function_data_dir = os.path.join(run_dir, self.label, "function_data") os.makedirs(log_dir) - os.makedirs(self._function_data_dir) + tmp_prefix = f'{self.label}-{getpass.getuser()}-{datetime.now().strftime("%Y%m%d%H%M%S%f")}-' + self._function_data_dir = tempfile.TemporaryDirectory(prefix=tmp_prefix) # put TaskVine logs outside of a Parsl run as TaskVine caches between runs while # Parsl does not. @@ -227,7 +229,7 @@ def __create_data_and_logging_dirs(self): # factory logs go with manager logs regardless self.factory_config.scratch_dir = self.manager_config.vine_log_dir - logger.debug(f"Function data directory: {self._function_data_dir}, log directory: {log_dir}") + logger.debug(f"Function data directory: {self._function_data_dir.name}, log directory: {log_dir}") logger.debug( f"TaskVine manager log directory: {self.manager_config.vine_log_dir}, " f"factory log directory: {self.factory_config.scratch_dir}") @@ -293,7 +295,7 @@ def _path_in_task(self, executor_task_id, *path_components): 'map': Pickled file with a dict between local parsl names, and remote taskvine names. """ task_dir = "{:04d}".format(executor_task_id) - return os.path.join(self._function_data_dir, task_dir, *path_components) + return os.path.join(self._function_data_dir.name, task_dir, *path_components) def submit(self, func, resource_specification, *args, **kwargs): """Processes the Parsl app by its arguments and submits the function From 9491fbb6685e81fbb6b4e7b291868f1c96d2a3f8 Mon Sep 17 00:00:00 2001 From: Harichandra Prasath <129069095+Harichandra-Prasath@users.noreply.github.com> Date: Thu, 12 Sep 2024 21:41:34 +0530 Subject: [PATCH 03/21] Removal of 'task_block' parameter from several launcher's docstring (#3613) In current version, Some of the launchers have parameter called 'task_block' mentioned in the docstring of the __call__ method but it is redundant and have no reflection on the code. This fix removes the mentioned parameter from the docstrings. --- parsl/launchers/launchers.py | 6 ------ 1 file changed, 6 deletions(-) diff --git a/parsl/launchers/launchers.py b/parsl/launchers/launchers.py index 021357b56c..e4ac7ec717 100644 --- a/parsl/launchers/launchers.py +++ b/parsl/launchers/launchers.py @@ -61,7 +61,6 @@ def __call__(self, command: str, tasks_per_node: int, nodes_per_block: int) -> s """ Args: - command (string): The command string to be launched - - task_block (string) : bash evaluated string. - fail_on_any: If True, return a nonzero exit code if any worker failed, otherwise zero; if False, return a nonzero exit code if all workers failed, otherwise zero. @@ -131,7 +130,6 @@ def __call__(self, command: str, tasks_per_node: int, nodes_per_block: int) -> s """ Args: - command (string): The command string to be launched - - task_block (string) : bash evaluated string. """ task_blocks = tasks_per_node * nodes_per_block @@ -208,7 +206,6 @@ def __call__(self, command: str, tasks_per_node: int, nodes_per_block: int) -> s """ Args: - command (string): The command string to be launched - - task_block (string) : bash evaluated string. """ task_blocks = tasks_per_node * nodes_per_block @@ -263,7 +260,6 @@ def __call__(self, command: str, tasks_per_node: int, nodes_per_block: int) -> s """ Args: - command (string): The command string to be launched - - task_block (string) : bash evaluated string. """ task_blocks = tasks_per_node * nodes_per_block @@ -311,7 +307,6 @@ def __call__(self, command: str, tasks_per_node: int, nodes_per_block: int) -> s """ Args: - command (string): The command string to be launched - - task_block (string) : bash evaluated string. """ task_blocks = tasks_per_node * nodes_per_block @@ -363,7 +358,6 @@ def __call__(self, command: str, tasks_per_node: int, nodes_per_block: int) -> s """ Args: - command (string): The command string to be launched - - task_block (string) : bash evaluated string. """ task_blocks = tasks_per_node * nodes_per_block From dd9150d7ac26b04eb8ff15247b1c18ce9893f79c Mon Sep 17 00:00:00 2001 From: Harichandra Prasath <129069095+Harichandra-Prasath@users.noreply.github.com> Date: Fri, 13 Sep 2024 15:52:06 +0530 Subject: [PATCH 04/21] Remove broken CI status badge from README (#3614) --- README.rst | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/README.rst b/README.rst index da7f8245a5..b09c8c3ff6 100644 --- a/README.rst +++ b/README.rst @@ -1,6 +1,6 @@ Parsl - Parallel Scripting Library ================================== -|licence| |build-status| |docs| |NSF-1550588| |NSF-1550476| |NSF-1550562| |NSF-1550528| |CZI-EOSS| +|licence| |docs| |NSF-1550588| |NSF-1550476| |NSF-1550562| |NSF-1550528| |CZI-EOSS| Parsl extends parallelism in Python beyond a single computer. @@ -43,9 +43,6 @@ then explore the `parallel computing patterns Date: Mon, 30 Sep 2024 16:25:01 -0500 Subject: [PATCH 05/21] Add usage tracking project name (#3624) Add option to deanonymize usage tracking data by tagging it with a name using parsl.Config.project_name. --- parsl/config.py | 5 +++++ parsl/tests/unit/test_usage_tracking.py | 21 +++++++++++++++++++++ parsl/usage_tracking/usage.py | 8 ++++++++ 3 files changed, 34 insertions(+) diff --git a/parsl/config.py b/parsl/config.py index c3725eccf8..1358e99d28 100644 --- a/parsl/config.py +++ b/parsl/config.py @@ -83,6 +83,9 @@ class Config(RepresentationMixin, UsageInformation): Setting this field to 0 will disable usage tracking. Default (this field is not set): usage tracking is not enabled. Parsl only collects minimal, non personally-identifiable, information used for reporting to our funding agencies. + project_name: str, optional + Option to deanonymize usage tracking data. + If set, this value will be used as the project name in the usage tracking data and placed on the leaderboard. initialize_logging : bool, optional Make DFK optionally not initialize any logging. Log messages will still be passed into the python logging system under the @@ -118,6 +121,7 @@ def __init__(self, max_idletime: float = 120.0, monitoring: Optional[MonitoringHub] = None, usage_tracking: int = 0, + project_name: Optional[str] = None, initialize_logging: bool = True) -> None: executors = tuple(executors or []) @@ -154,6 +158,7 @@ def __init__(self, self.max_idletime = max_idletime self.validate_usage_tracking(usage_tracking) self.usage_tracking = usage_tracking + self.project_name = project_name self.initialize_logging = initialize_logging self.monitoring = monitoring self.std_autopath: Optional[Callable] = std_autopath diff --git a/parsl/tests/unit/test_usage_tracking.py b/parsl/tests/unit/test_usage_tracking.py index 351355811c..1581249cd2 100644 --- a/parsl/tests/unit/test_usage_tracking.py +++ b/parsl/tests/unit/test_usage_tracking.py @@ -43,3 +43,24 @@ def test_invalid_types(level): # we can't instantiate TypeCheckError if we're in typeguard 2.x environment # because it does not exist... so check name using strings. assert ex.type.__name__ in ["TypeCheckError", "TypeError"] + + +@pytest.mark.local +def test_valid_project_name(): + """Test valid project_name.""" + assert ( + Config( + usage_tracking=3, + project_name="unit-test", + ).project_name == "unit-test" + ) + + +@pytest.mark.local +@pytest.mark.parametrize("name", (1, 1.0, True, object())) +def test_invalid_project_name(name): + """Test invalid project_name.""" + with pytest.raises(Exception) as ex: + Config(usage_tracking=3, project_name=name) + + assert ex.type.__name__ in ["TypeCheckError", "TypeError"] diff --git a/parsl/usage_tracking/usage.py b/parsl/usage_tracking/usage.py index 3730fcc464..c22eb529fe 100644 --- a/parsl/usage_tracking/usage.py +++ b/parsl/usage_tracking/usage.py @@ -114,6 +114,7 @@ def __init__(self, dfk, port=50077, sys.version_info.minor, sys.version_info.micro) self.tracking_level = self.check_tracking_level() + self.project_name = self.config.project_name self.start_time = None logger.debug("Tracking level: {}".format(self.tracking_level)) @@ -153,6 +154,9 @@ def construct_start_message(self) -> bytes: 'platform.system': platform.system(), 'tracking_level': int(self.tracking_level)} + if self.project_name: + message['project_name'] = self.project_name + if self.tracking_level >= 2: message['components'] = get_parsl_usage(self.dfk._config) @@ -188,6 +192,10 @@ def construct_end_message(self) -> bytes: 'end': end_time, 'execution_time': end_time - self.start_time, 'components': [dfk_component] + get_parsl_usage(self.dfk._config)} + + if self.project_name: + message['project_name'] = self.project_name + logger.debug(f"Usage tracking end message (unencoded): {message}") return self.encode_message(message) From 2758763632df084b38400ddb6914c9b772608f4f Mon Sep 17 00:00:00 2001 From: Nishchay Karle <45297081+NishchayKarle@users.noreply.github.com> Date: Wed, 2 Oct 2024 03:31:16 -0500 Subject: [PATCH 06/21] Update docs with new usage tracking info (#3626) Co-authored-by: Daniel S. Katz --- docs/quickstart.rst | 2 +- docs/userguide/usage_tracking.rst | 167 +++++++++++++++++++++++------- 2 files changed, 129 insertions(+), 40 deletions(-) diff --git a/docs/quickstart.rst b/docs/quickstart.rst index f1de3d1f94..a9b1ccf469 100644 --- a/docs/quickstart.rst +++ b/docs/quickstart.rst @@ -236,7 +236,7 @@ for reporting purposes. As an NSF-funded project, our ability to track usage metrics is important for continued funding. -You can opt-in by setting ``usage_tracking=True`` in the configuration object (`parsl.config.Config`). +You can opt-in by setting ``usage_tracking=3`` in the configuration object (`parsl.config.Config`). To read more about what information is collected and how it is used see :ref:`label-usage-tracking`. diff --git a/docs/userguide/usage_tracking.rst b/docs/userguide/usage_tracking.rst index 777e8003b2..da8ac9b79d 100644 --- a/docs/userguide/usage_tracking.rst +++ b/docs/userguide/usage_tracking.rst @@ -1,82 +1,171 @@ .. _label-usage-tracking: -Usage statistics collection +Usage Statistics Collection =========================== -Parsl uses an **Opt-in** model to send usage statistics back to the Parsl development team to -measure worldwide usage and improve reliability and usability. The usage statistics are used only for -improvements and reporting. They are not shared in raw form outside of the Parsl team. - +Parsl uses an **Opt-in** model for usage tracking, allowing users to decide if they wish to participate. Usage statistics are crucial for improving software reliability and help focus development and maintenance efforts on the most used components of Parsl. The collected data is used solely for enhancements and reporting and is not shared in its raw form outside of the Parsl team. Why are we doing this? ---------------------- -The Parsl development team receives support from government funding agencies. For the team to continue to -receive such funding, and for the agencies themselves to argue for funding, both the team and the agencies -must be able to demonstrate that the scientific community is benefiting from these investments. To this end, -it is important that we provide aggregate usage data about such things as the following: +The Parsl development team relies on funding from government agencies. To sustain this funding and advocate for continued support, it is essential to show that the research community benefits from these investments. + +By opting in to share usage data, you actively support the ongoing development and maintenance of Parsl. (See:ref:`What is sent? ` below). + +Opt-In Model +------------ + +We use an **opt-in model** for usage tracking to respect user privacy and provide full control over shared information. We hope that developers and researchers will choose to send us this information. The reason is that we need this data - it is a requirement for funding. -* How many people use Parsl -* Average job length -* Parsl exit codes +Choose the data you share with Usage Tracking Levels. -By participating in this project, you help justify continuing support for the software on which you rely. -(see :ref:`What is sent? ` below). +**Usage Tracking Levels:** -Opt-In ------- +* **Level 1:** Only basic information such as Python version, Parsl version, and platform name (Linux, MacOS, etc.) +* **Level 2:** Level 1 information and configuration information including provider, executor, and launcher names. +* **Level 3:** Level 2 information and workflow execution details, including the number of applications run, failures, and execution time. -We have chosen opt-in collection rather than opt-out with the hope that developers and researchers -will choose to send us this information. The reason is that we need this data - it is a requirement for funding. +By enabling usage tracking, you support Parsl's development. -By opting-in, and allowing these statistics to be reported back, you are explicitly supporting the -further development of Parsl. +**To opt-in, set** ``usage_tracking`` **to the desired level (1, 2, or 3) in the configuration object** (``parsl.config.Config``) **.** -If you wish to opt in to usage reporting, set ``usage_tracking=True`` in the configuration object (`parsl.config.Config`). +Example: +.. code-block:: python3 + + config = Config( + executors=[ + HighThroughputExecutor( + ... + ) + ], + usage_tracking=3 + ) .. _what-is-sent: What is sent? ------------- -* IP address -* Run UUID -* Start and end times -* Number of executors used -* Number of failures -* Parsl and Python version -* OS and OS version - +The data collected depends on the tracking level selected: + +* **Level 1:** Only basic information such as Python version, Parsl version, and platform name (Linux, MacOS, etc.) +* **Level 2:** Level 1 information and configuration information including provider, executor, and launcher names. +* **Level 3:** Level 2 information and workflow execution details, including the number of applications run, failures, and execution time. + +**Example Messages:** + +- At launch: + + .. code-block:: json + + { + "correlator":"6bc7484e-5693-48b2-b6c0-5889a73f7f4e", + "parsl_v":"1.3.0-dev", + "python_v":"3.12.2", + "platform.system":"Darwin", + "tracking_level":3, + "components":[ + { + "c":"parsl.config.Config", + "executors_len":1, + "dependency_resolver":false + }, + "parsl.executors.threads.ThreadPoolExecutor" + ], + "start":1727156153 + } + +- On closure (Tracking Level 3 only): + + .. code-block:: json + + { + "correlator":"6bc7484e-5693-48b2-b6c0-5889a73f7f4e", + "execution_time":31, + "components":[ + { + "c":"parsl.dataflow.dflow.DataFlowKernel", + "app_count":3, + "app_fails":0 + }, + { + "c":"parsl.config.Config", + "executors_len":1, + "dependency_resolver":false + }, + "parsl.executors.threads.ThreadPoolExecutor" + ], + "end":1727156156 + } + +**All messages sent are logged in the** ``parsl.log`` **file, ensuring complete transparency.** How is the data sent? --------------------- -The data is sent via UDP. While this may cause us to lose some data, it drastically reduces the possibility -that the usage statistics reporting will adversely affect the operation of the software. +Data is sent using **UDP** to minimize the impact on workflow performance. While this may result in some data loss, it significantly reduces the chances of usage tracking affecting the software's operation. +The data is processed through AWS CloudWatch to generate a monitoring dashboard, providing valuable insights into usage patterns. When is the data sent? ---------------------- -The data is sent twice per run, once when Parsl starts a script, and once when the script is completed. +Data is sent twice per run: +1. At the start of the script. +2. Upon script completion (for Tracking Level 3). What will the data be used for? ------------------------------- -The data will be used for reporting purposes to answer questions such as: +The data will help the Parsl team understand Parsl usage and make development and maintenance decisions, including: + +* Focus development and maintenance on the most-used components of Parsl. +* Determine which Python versions to continue supporting. +* Track the age of Parsl installations. +* Assess how long it takes for most users to adopt new changes. +* Track usage statistics to report to funders. + +Usage Statistics Dashboard +-------------------------- -* How many unique users are using Parsl? -* To determine patterns of usage - is activity increasing or decreasing? +The collected data is aggregated and displayed on a publicly accessible dashboard. This dashboard provides an overview of how Parsl is being used across different environments and includes metrics such as: -We will also use this information to improve Parsl by identifying software faults. +* Total workflows executed over time +* Most-used Python and Parsl versions +* Most common platforms and executors and more -* What percentage of tasks complete successfully? -* Of the tasks that fail, what is the most common fault code returned? +`Find the dashboard here `_ +Leaderboard +----------- + +**Opting in to usage tracking also allows you to participate in the Parsl Leaderboard. +To participate in the leaderboard, you can deanonymize yourself using the** ``project_name`` **parameter in the parsl configuration object** (``parsl.config.Config``) **.** + +`Find the Parsl Leaderboard here `_ + +Example: + +.. code-block:: python3 + + config = Config( + executors=[ + HighThroughputExecutor( + ... + ) + ], + usage_tracking=3, + project_name="my-test-project" + ) + +Every run of parsl with usage tracking **Level 1** or **Level 2** earns you **1 point**. And every run with usage tracking **Level 3**, earns you **2 points**. + Feedback -------- -Please send us your feedback at parsl@googlegroups.com. Feedback from our user communities will be +Please send us your feedback at parsl@googlegroups.com. Feedback from our user communities will be useful in determining our path forward with usage tracking in the future. + +**Please consider turning on usage tracking to support the continued development of Parsl.** From 94e34d5fe663e2d8d7706d54e31acc63e5aaa375 Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Wed, 2 Oct 2024 15:11:43 +0000 Subject: [PATCH 07/21] Remove unused ipyparallel dependency (#3622) Parsl does not need to install ipyparallel as a test dependency any more. This is in context of https://bugs.debian.org/cgi-bin/bugreport.cgi?bug=1082524 which reports ipyparallel packaging issues in the Debian packaging of Parsl. --- docs/faq.rst | 2 +- mypy.ini | 6 ------ test-requirements.txt | 1 - 3 files changed, 1 insertion(+), 8 deletions(-) diff --git a/docs/faq.rst b/docs/faq.rst index a03287c378..ca4bd82bdb 100644 --- a/docs/faq.rst +++ b/docs/faq.rst @@ -209,7 +209,7 @@ For instance, with conda, follow this `cheatsheet # Install packages: - conda install + conda install How do I run code that uses Python2.X? diff --git a/mypy.ini b/mypy.ini index 604fa4d07a..4b64a12de2 100644 --- a/mypy.ini +++ b/mypy.ini @@ -137,12 +137,6 @@ ignore_missing_imports = True [mypy-copy_reg.*] ignore_missing_imports = True -[mypy-ipyparallel.*] -ignore_missing_imports = True - -[mypy-ipython_genutils.*] -ignore_missing_imports = True - [mypy-cmreslogging.handlers.*] ignore_missing_imports = True diff --git a/test-requirements.txt b/test-requirements.txt index acd670b5e9..6abf727ccd 100644 --- a/test-requirements.txt +++ b/test-requirements.txt @@ -1,5 +1,4 @@ flake8==6.1.0 -ipyparallel pandas paramiko pytest>=7.4.0,<8 From aab398aafb3d73ca12a39182022525a3ea696f83 Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Thu, 3 Oct 2024 10:36:45 +0000 Subject: [PATCH 08/21] Add more usage tracking to htex_local_alternate test (#3625) --- parsl/tests/configs/htex_local_alternate.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/parsl/tests/configs/htex_local_alternate.py b/parsl/tests/configs/htex_local_alternate.py index 52124211bc..d84a07ad84 100644 --- a/parsl/tests/configs/htex_local_alternate.py +++ b/parsl/tests/configs/htex_local_alternate.py @@ -66,7 +66,8 @@ def fresh_config(): monitoring_debug=False, resource_monitoring_interval=1, ), - usage_tracking=True + usage_tracking=3, + project_name="parsl htex_local_alternate test configuration" ) From 530373537a234db5d30b7e407276e6701e9b7151 Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Tue, 8 Oct 2024 13:25:56 +0000 Subject: [PATCH 09/21] Drop support for end-of-life Python 3.8 (#3629) Python 3.8 entered the end-of-life phase on 2024-10-07. See https://peps.python.org/pep-0569/ This PR removes CI testing for Python 3.8 and relabels several occurences of 3.8 to 3.9 in the documentation. --- .github/workflows/ci.yaml | 2 +- README.rst | 2 +- codemeta.json | 4 ++-- docs/quickstart.rst | 4 ++-- setup.py | 3 +-- 5 files changed, 7 insertions(+), 8 deletions(-) diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 953c17405e..7a686b8649 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -10,7 +10,7 @@ jobs: main-test-suite: strategy: matrix: - python-version: ["3.8", "3.9", "3.10", "3.11", "3.12"] + python-version: ["3.9", "3.10", "3.11", "3.12"] runs-on: ubuntu-20.04 timeout-minutes: 60 diff --git a/README.rst b/README.rst index b09c8c3ff6..a8254e2e40 100644 --- a/README.rst +++ b/README.rst @@ -117,7 +117,7 @@ For Developers Requirements ============ -Parsl is supported in Python 3.8+. Requirements can be found `here `_. Requirements for running tests can be found `here `_. +Parsl is supported in Python 3.9+. Requirements can be found `here `_. Requirements for running tests can be found `here `_. Code of Conduct =============== diff --git a/codemeta.json b/codemeta.json index 917abb5afd..9754c55358 100644 --- a/codemeta.json +++ b/codemeta.json @@ -191,8 +191,8 @@ "name": "The Python Package Index", "url": "https://pypi.org" }, - "runtimePlatform": "Python 3.8", + "runtimePlatform": "Python 3.9", "url": "https://github.com/Parsl/parsl", "developmentStatus": "active", - "programmingLanguage": "Python :: 3.8" + "programmingLanguage": "Python :: 3.9" } diff --git a/docs/quickstart.rst b/docs/quickstart.rst index a9b1ccf469..b511d2397d 100644 --- a/docs/quickstart.rst +++ b/docs/quickstart.rst @@ -10,7 +10,7 @@ Installation Parsl is available on `PyPI `_ and `conda-forge `_. -Parsl requires Python3.8+ and has been tested on Linux and macOS. +Parsl requires Python3.9+ and has been tested on Linux and macOS. Installation using Pip @@ -31,7 +31,7 @@ Installation using Conda 1. Create and activate a new conda environment:: - $ conda create --name parsl_py38 python=3.8 + $ conda create --name parsl_py38 python=3.9 $ source activate parsl_py38 2. Install Parsl:: diff --git a/setup.py b/setup.py index 4934d01e5d..b381ecfc2b 100755 --- a/setup.py +++ b/setup.py @@ -54,7 +54,7 @@ include_package_data=True, package_data={'parsl': ['py.typed']}, packages=find_packages(), - python_requires=">=3.8.0", + python_requires=">=3.9.0", install_requires=install_requires, scripts = ['parsl/executors/high_throughput/process_worker_pool.py', 'parsl/executors/high_throughput/interchange.py', @@ -71,7 +71,6 @@ # Licence, must match with licence above 'License :: OSI Approved :: Apache Software License', # Python versions supported - 'Programming Language :: Python :: 3.8', 'Programming Language :: Python :: 3.9', 'Programming Language :: Python :: 3.10', 'Programming Language :: Python :: 3.11', From fd73930e95f4d62a6f0026a2b5df1725b1646786 Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Tue, 8 Oct 2024 14:02:57 +0000 Subject: [PATCH 10/21] Remove claim that Parsl is tested on macOS (#3630) --- docs/quickstart.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/quickstart.rst b/docs/quickstart.rst index b511d2397d..e1c3821466 100644 --- a/docs/quickstart.rst +++ b/docs/quickstart.rst @@ -10,7 +10,7 @@ Installation Parsl is available on `PyPI `_ and `conda-forge `_. -Parsl requires Python3.9+ and has been tested on Linux and macOS. +Parsl requires Python3.9+ and has been tested on Linux. Installation using Pip From 8356233ba48ff6c58616ba20c1c26f3202423790 Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Thu, 10 Oct 2024 06:58:54 +0000 Subject: [PATCH 11/21] Unhide pytest output by removing dotfile directory (#3631) Prior to this PR, output from pytest was generally hidden by placing it in the .pytest directory. After this PR, that .pytest directory is now pytest-parsl. Most immediately, a breaking change in the GitHub actions artifact upload has started respecting that dotfile hiddenness, and so with that version of the artifact uploader, test runs prior to this PR are missing a bunch of output. https://github.com/actions/upload-artifact/issues/602 The uploader now has an option to enable hidden files... but ... .. before this most immediate motivation, I've run into a couple of developer experience cases that look like: Q: I cannot find the output of pytest A: We have deliberately made it hard for you to find which is enough to make me rename the directory rather than turn on the artifact uploader hidden files option. --- .github/workflows/ci.yaml | 10 +++++----- .gitignore | 1 + parsl/tests/conftest.py | 2 +- 3 files changed, 7 insertions(+), 6 deletions(-) diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 7a686b8649..e5247c5d25 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -60,7 +60,7 @@ jobs: export PARSL_TEST_PRESERVE_NUM_RUNS=7 make test - ln -s .pytest/parsltest-current test_runinfo + ln -s pytest-parsl/parsltest-current test_runinfo - name: Documentation checks run: | @@ -80,11 +80,11 @@ jobs: # database manager log file or monitoring router log file. It would be better if # the tests themselves failed immediately when there was a monitoring error, but # in the absence of that, this is a dirty way to check. - bash -c '! grep ERROR .pytest/parsltest-current/runinfo*/*/database_manager.log' - bash -c '! grep ERROR .pytest/parsltest-current/runinfo*/*/monitoring_router.log' + bash -c '! grep ERROR pytest-parsl/parsltest-current/runinfo*/*/database_manager.log' + bash -c '! grep ERROR pytest-parsl/parsltest-current/runinfo*/*/monitoring_router.log' # temporary; until test-matrixification - rm -f .pytest/parsltest-current test_runinfo + rm -f pytest-parsl/parsltest-current test_runinfo - name: Checking parsl-visualize run: | @@ -105,6 +105,6 @@ jobs: name: runinfo-${{ matrix.python-version }}-${{ steps.job-info.outputs.as-ascii }}-${{ github.sha }} path: | runinfo/ - .pytest/ + pytest-parsl/ ci_job_info.txt compression-level: 9 diff --git a/.gitignore b/.gitignore index acdb6254d1..8811016b83 100644 --- a/.gitignore +++ b/.gitignore @@ -63,6 +63,7 @@ coverage.xml *.cover .hypothesis/ /.pytest/ +/pytest-parsl/ # Translations *.mo diff --git a/parsl/tests/conftest.py b/parsl/tests/conftest.py index 638088c44c..b8af73e4bf 100644 --- a/parsl/tests/conftest.py +++ b/parsl/tests/conftest.py @@ -58,7 +58,7 @@ def _chmod(path: pathlib.Path, mode: int): config = re.sub(r"[^A-z0-9_-]+", "_", pytestconfig.getoption('config')[0]) cwd = pathlib.Path(os.getcwd()) - pytest_dir = cwd / ".pytest" + pytest_dir = cwd / "pytest-parsl" pytest_dir.mkdir(mode=0o700, parents=True, exist_ok=True) test_dir_prefix = "parsltest-" From 264d1288a06c4d43acc9ba9d39b82b55030284e9 Mon Sep 17 00:00:00 2001 From: Harichandra Prasath <129069095+Harichandra-Prasath@users.noreply.github.com> Date: Mon, 14 Oct 2024 15:18:07 +0530 Subject: [PATCH 12/21] Use a single exception class for all resource specification errors (#3627) There are three places where the executors raise the errors regarding resource specification. * when passing the resource_specification parameter to the htex, it raises the mpi_prefix_composer InvalidResourceSpecification from validate_resource_spec method. https://github.com/Parsl/parsl/blob/master/parsl/executors/high_throughput/executor.py#L342 * ThreadPoolExecutor also raises the UnsupportedFeatureError when passing resource specification to it, https://github.com/Parsl/parsl/blob/master/parsl/executors/threads.py#L54 * WorkQueueExecutor raises two errors similar to this but as a Base ExecutorError, one when there is invalid resource_specification ; https://github.com/Parsl/parsl/blob/master/parsl/executors/workqueue/executor.py#L418 and another one when the autolabel is false and resource_specification is not right, https://github.com/Parsl/parsl/blob/master/parsl/executors/workqueue/executor.py#L429 This PR rationalize these errors by moving InvalidResourceSpecification to parsl.executors.error and using it across the executors. --- parsl/executors/errors.py | 13 +++++++++ parsl/executors/high_throughput/executor.py | 9 +++--- .../high_throughput/mpi_prefix_composer.py | 28 ++++--------------- parsl/executors/threads.py | 5 ++-- parsl/executors/workqueue/executor.py | 11 ++++---- .../test_error_handling/test_resource_spec.py | 24 +++++++--------- .../test_resource_spec_validation.py | 4 +-- .../test_mpi_apps/test_mpi_mode_enabled.py | 6 ++-- .../tests/test_mpi_apps/test_resource_spec.py | 7 ++--- 9 files changed, 47 insertions(+), 60 deletions(-) diff --git a/parsl/executors/errors.py b/parsl/executors/errors.py index 9725105dd9..fed43228be 100644 --- a/parsl/executors/errors.py +++ b/parsl/executors/errors.py @@ -1,4 +1,6 @@ """Exceptions raise by Executors.""" +from typing import Set + from parsl.errors import ParslError from parsl.executors.base import ParslExecutor @@ -44,6 +46,17 @@ def __str__(self): self.current_executor) +class InvalidResourceSpecification(ExecutorError): + """Error raised when Invalid input is supplied via resource Specification""" + + def __init__(self, invalid_keys: Set[str], message: str = ''): + self.invalid_keys = invalid_keys + self.message = message + + def __str__(self): + return f"Invalid Resource Specification Supplied: {self.invalid_keys}. {self.message}" + + class ScalingFailed(ExecutorError): """Scaling failed due to error in Execution provider.""" diff --git a/parsl/executors/high_throughput/executor.py b/parsl/executors/high_throughput/executor.py index e589975fb5..338bc57a4e 100644 --- a/parsl/executors/high_throughput/executor.py +++ b/parsl/executors/high_throughput/executor.py @@ -16,16 +16,17 @@ from parsl.addresses import get_all_addresses from parsl.app.errors import RemoteExceptionWrapper from parsl.data_provider.staging import Staging -from parsl.executors.errors import BadMessage, ScalingFailed +from parsl.executors.errors import ( + BadMessage, + InvalidResourceSpecification, + ScalingFailed, +) from parsl.executors.high_throughput import zmq_pipes from parsl.executors.high_throughput.errors import CommandClientTimeoutError from parsl.executors.high_throughput.manager_selector import ( ManagerSelector, RandomManagerSelector, ) -from parsl.executors.high_throughput.mpi_prefix_composer import ( - InvalidResourceSpecification, -) from parsl.executors.status_handling import BlockProviderExecutor from parsl.jobs.states import TERMINAL_STATES, JobState, JobStatus from parsl.process_loggers import wrap_with_logs diff --git a/parsl/executors/high_throughput/mpi_prefix_composer.py b/parsl/executors/high_throughput/mpi_prefix_composer.py index 0125d9a532..689fbb771f 100644 --- a/parsl/executors/high_throughput/mpi_prefix_composer.py +++ b/parsl/executors/high_throughput/mpi_prefix_composer.py @@ -1,5 +1,7 @@ import logging -from typing import Dict, List, Set, Tuple +from typing import Dict, List, Tuple + +from parsl.executors.errors import InvalidResourceSpecification logger = logging.getLogger(__name__) @@ -8,27 +10,6 @@ 'mpiexec') -class MissingResourceSpecification(Exception): - """Exception raised when input is not supplied a resource specification""" - - def __init__(self, reason: str): - self.reason = reason - - def __str__(self): - return f"Missing resource specification: {self.reason}" - - -class InvalidResourceSpecification(Exception): - """Exception raised when Invalid input is supplied via resource specification""" - - def __init__(self, invalid_keys: Set[str], message: str = ''): - self.invalid_keys = invalid_keys - self.message = message - - def __str__(self): - return f"Invalid resource specification options supplied: {self.invalid_keys} {self.message}" - - def validate_resource_spec(resource_spec: Dict[str, str]): """Basic validation of keys in the resource_spec @@ -40,7 +21,8 @@ def validate_resource_spec(resource_spec: Dict[str, str]): # empty resource_spec when mpi_mode is set causes parsl to hang # ref issue #3427 if len(user_keys) == 0: - raise MissingResourceSpecification('MPI mode requires optional parsl_resource_specification keyword argument to be configured') + raise InvalidResourceSpecification(user_keys, + 'MPI mode requires optional parsl_resource_specification keyword argument to be configured') legal_keys = set(("ranks_per_node", "num_nodes", diff --git a/parsl/executors/threads.py b/parsl/executors/threads.py index cbc813d9d4..9b3b0df5ce 100644 --- a/parsl/executors/threads.py +++ b/parsl/executors/threads.py @@ -6,7 +6,7 @@ from parsl.data_provider.staging import Staging from parsl.executors.base import ParslExecutor -from parsl.executors.errors import UnsupportedFeatureError +from parsl.executors.errors import InvalidResourceSpecification from parsl.utils import RepresentationMixin logger = logging.getLogger(__name__) @@ -54,7 +54,8 @@ def submit(self, func, resource_specification, *args, **kwargs): if resource_specification: logger.error("Ignoring the resource specification. " "Parsl resource specification is not supported in ThreadPool Executor.") - raise UnsupportedFeatureError('resource specification', 'ThreadPool Executor', None) + raise InvalidResourceSpecification(set(resource_specification.keys()), + "Parsl resource specification is not supported in ThreadPool Executor.") return self.executor.submit(func, *args, **kwargs) diff --git a/parsl/executors/workqueue/executor.py b/parsl/executors/workqueue/executor.py index ae39f8c118..155c990ab5 100644 --- a/parsl/executors/workqueue/executor.py +++ b/parsl/executors/workqueue/executor.py @@ -28,7 +28,7 @@ from parsl.data_provider.files import File from parsl.data_provider.staging import Staging from parsl.errors import OptionalModuleMissing -from parsl.executors.errors import ExecutorError +from parsl.executors.errors import ExecutorError, InvalidResourceSpecification from parsl.executors.status_handling import BlockProviderExecutor from parsl.executors.workqueue import exec_parsl_function from parsl.process_loggers import wrap_with_logs @@ -419,7 +419,7 @@ def submit(self, func, resource_specification, *args, **kwargs): message = "Task resource specification only accepts these types of resources: {}".format( ', '.join(acceptable_fields)) logger.error(message) - raise ExecutorError(self, message) + raise InvalidResourceSpecification(keys, message) # this checks that either all of the required resource types are specified, or # that none of them are: the `required_resource_types` are not actually required, @@ -430,9 +430,10 @@ def submit(self, func, resource_specification, *args, **kwargs): logger.error("Running with `autolabel=False`. In this mode, " "task resource specification requires " "three resources to be specified simultaneously: cores, memory, and disk") - raise ExecutorError(self, "Task resource specification requires " - "three resources to be specified simultaneously: cores, memory, and disk. " - "Try setting autolabel=True if you are unsure of the resource usage") + raise InvalidResourceSpecification(keys, + "Task resource specification requires " + "three resources to be specified simultaneously: cores, memory, and disk. " + "Try setting autolabel=True if you are unsure of the resource usage") for k in keys: if k == 'cores': diff --git a/parsl/tests/test_error_handling/test_resource_spec.py b/parsl/tests/test_error_handling/test_resource_spec.py index 871df68512..4616219be2 100644 --- a/parsl/tests/test_error_handling/test_resource_spec.py +++ b/parsl/tests/test_error_handling/test_resource_spec.py @@ -1,11 +1,9 @@ import parsl from parsl.app.app import python_app from parsl.executors import WorkQueueExecutor -from parsl.executors.errors import ExecutorError, UnsupportedFeatureError +from parsl.executors.errors import InvalidResourceSpecification from parsl.executors.high_throughput.executor import HighThroughputExecutor -from parsl.executors.high_throughput.mpi_prefix_composer import ( - InvalidResourceSpecification, -) +from parsl.executors.threads import ThreadPoolExecutor @python_app @@ -27,11 +25,10 @@ def test_resource(n=2): try: fut.result() except InvalidResourceSpecification: - assert isinstance(executor, HighThroughputExecutor) - except UnsupportedFeatureError: - assert not isinstance(executor, WorkQueueExecutor) - except Exception as e: - assert isinstance(e, ExecutorError) + assert ( + isinstance(executor, HighThroughputExecutor) or + isinstance(executor, WorkQueueExecutor) or + isinstance(executor, ThreadPoolExecutor)) # Specify resources with wrong types # 'cpus' is incorrect, should be 'cores' @@ -40,8 +37,7 @@ def test_resource(n=2): try: fut.result() except InvalidResourceSpecification: - assert isinstance(executor, HighThroughputExecutor) - except UnsupportedFeatureError: - assert not isinstance(executor, WorkQueueExecutor) - except Exception as e: - assert isinstance(e, ExecutorError) + assert ( + isinstance(executor, HighThroughputExecutor) or + isinstance(executor, WorkQueueExecutor) or + isinstance(executor, ThreadPoolExecutor)) diff --git a/parsl/tests/test_htex/test_resource_spec_validation.py b/parsl/tests/test_htex/test_resource_spec_validation.py index ac0c580c20..5587891650 100644 --- a/parsl/tests/test_htex/test_resource_spec_validation.py +++ b/parsl/tests/test_htex/test_resource_spec_validation.py @@ -4,9 +4,7 @@ import pytest from parsl.executors import HighThroughputExecutor -from parsl.executors.high_throughput.mpi_prefix_composer import ( - InvalidResourceSpecification, -) +from parsl.executors.errors import InvalidResourceSpecification def double(x): diff --git a/parsl/tests/test_mpi_apps/test_mpi_mode_enabled.py b/parsl/tests/test_mpi_apps/test_mpi_mode_enabled.py index aff2501674..63005a8df7 100644 --- a/parsl/tests/test_mpi_apps/test_mpi_mode_enabled.py +++ b/parsl/tests/test_mpi_apps/test_mpi_mode_enabled.py @@ -8,9 +8,7 @@ import parsl from parsl import Config, bash_app, python_app from parsl.executors import MPIExecutor -from parsl.executors.high_throughput.mpi_prefix_composer import ( - MissingResourceSpecification, -) +from parsl.executors.errors import InvalidResourceSpecification from parsl.launchers import SimpleLauncher from parsl.providers import LocalProvider @@ -185,6 +183,6 @@ def test_simulated_load(rounds: int = 100): @pytest.mark.local def test_missing_resource_spec(): - with pytest.raises(MissingResourceSpecification): + with pytest.raises(InvalidResourceSpecification): future = mock_app(sleep_dur=0.4) future.result(timeout=10) diff --git a/parsl/tests/test_mpi_apps/test_resource_spec.py b/parsl/tests/test_mpi_apps/test_resource_spec.py index f180c67d52..058a7ef425 100644 --- a/parsl/tests/test_mpi_apps/test_resource_spec.py +++ b/parsl/tests/test_mpi_apps/test_resource_spec.py @@ -10,12 +10,9 @@ import pytest from parsl.app.app import python_app +from parsl.executors.errors import InvalidResourceSpecification from parsl.executors.high_throughput.executor import HighThroughputExecutor from parsl.executors.high_throughput.mpi_executor import MPIExecutor -from parsl.executors.high_throughput.mpi_prefix_composer import ( - InvalidResourceSpecification, - MissingResourceSpecification, -) from parsl.executors.high_throughput.mpi_resource_management import ( get_nodes_in_batchjob, get_pbs_hosts_list, @@ -105,7 +102,7 @@ def test_top_level(): ({"num_nodes": 2, "ranks_per_node": 1}, None), ({"launcher_options": "--debug_foo"}, None), ({"num_nodes": 2, "BAD_OPT": 1}, InvalidResourceSpecification), - ({}, MissingResourceSpecification), + ({}, InvalidResourceSpecification), ) ) def test_mpi_resource_spec(resource_spec: Dict, exception): From 658619dfff45d041f5ac2b87f1b49c78205c63b3 Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Mon, 14 Oct 2024 14:14:35 +0000 Subject: [PATCH 13/21] Count outstanding tasks on submit side rather than in interchange (#3637) The count of outstanding tasks may now be higher: A task will be counted when it enters the task submission ZMQ channel to the interchange, rather than when it reaches the end of the ZMQ channel (which could be a queue under load), and a task will be counted as complete when it arrives back on the submit side, rather than when the interchange places it into the results channel. It's probably more desirable to count the tasks earlier like this and less desirable to count the results later like this, for scaling purposes - scaling load will be observed before it reaches the interchange, but reduction in load due to completion won't be observed until the result has got all the way through the result queue. I don't think this will affect users in real life - but I don't have any numbers for what delivery times look like across the two queues under heavy load. This removes an RPC that could be computed locally, and removes a use of the command channel which is generally thread-unsafe. This removes part of shutdown hangs: when pressing ctrl-c at the right point, shutdown will wait for the strategy thread to shut down, but the strategy thread can get blocked on OUTSTANDING_C, and OUTSTANDING_C will never return because the ctrl-C also made the interchange go away. This shutdown block is hard to demonstrate in a CI test because there are other blocking actions (such as a MANAGERS rpc) involved in shutdown, and it seems to need the strategy thread to be in just the right place. Implements #3365 --- parsl/executors/high_throughput/executor.py | 2 +- parsl/executors/high_throughput/interchange.py | 8 +------- 2 files changed, 2 insertions(+), 8 deletions(-) diff --git a/parsl/executors/high_throughput/executor.py b/parsl/executors/high_throughput/executor.py index 338bc57a4e..f2bd26bf5b 100644 --- a/parsl/executors/high_throughput/executor.py +++ b/parsl/executors/high_throughput/executor.py @@ -582,7 +582,7 @@ def hold_worker(self, worker_id: str) -> None: def outstanding(self) -> int: """Returns the count of tasks outstanding across the interchange and managers""" - return self.command_client.run("OUTSTANDING_C") + return len(self.tasks) @property def connected_workers(self) -> int: diff --git a/parsl/executors/high_throughput/interchange.py b/parsl/executors/high_throughput/interchange.py index d61c76fed2..17e5f1da9e 100644 --- a/parsl/executors/high_throughput/interchange.py +++ b/parsl/executors/high_throughput/interchange.py @@ -252,13 +252,7 @@ def _command_server(self) -> NoReturn: try: command_req = self.command_channel.recv_pyobj() logger.debug("Received command request: {}".format(command_req)) - if command_req == "OUTSTANDING_C": - outstanding = self.pending_task_queue.qsize() - for manager in self._ready_managers.values(): - outstanding += len(manager['tasks']) - reply = outstanding - - elif command_req == "CONNECTED_BLOCKS": + if command_req == "CONNECTED_BLOCKS": reply = self.connected_block_history elif command_req == "WORKERS": From 2eb05cdfd39a13dac3e0ae3a650f9d69145d8eec Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Mon, 14 Oct 2024 14:52:56 +0000 Subject: [PATCH 14/21] Don't forward worker heartbeats to the submit side (#3634) Prior to this PR, heartbeats sent from worker pools to the interchange on the results channel would be forwarded on to the submit side along the interchange to submit side results channel. These heartbeat messages would then be discarded on the submit side. I think these forwarded messages don't do anything, so this PR removes the forwarding. The network path between interchange and submit side is on localhost and unlikely to need any TCP level keepalives. There is no handling for missing heartbeats, and indeed no expectation of heartbeats arriving: an interchange might exist for many hours with no connected workers, and so no forwarded heartbeats. This PR should reduce message load on the submit side results channel by one message per work pool per heartbeat period. See issue #3464 for further context. --- parsl/executors/high_throughput/executor.py | 4 +--- parsl/executors/high_throughput/interchange.py | 1 - 2 files changed, 1 insertion(+), 4 deletions(-) diff --git a/parsl/executors/high_throughput/executor.py b/parsl/executors/high_throughput/executor.py index f2bd26bf5b..06ced1d1f4 100644 --- a/parsl/executors/high_throughput/executor.py +++ b/parsl/executors/high_throughput/executor.py @@ -460,9 +460,7 @@ def _result_queue_worker(self): except pickle.UnpicklingError: raise BadMessage("Message received could not be unpickled") - if msg['type'] == 'heartbeat': - continue - elif msg['type'] == 'result': + if msg['type'] == 'result': try: tid = msg['task_id'] except Exception: diff --git a/parsl/executors/high_throughput/interchange.py b/parsl/executors/high_throughput/interchange.py index 17e5f1da9e..036aefa2cd 100644 --- a/parsl/executors/high_throughput/interchange.py +++ b/parsl/executors/high_throughput/interchange.py @@ -543,7 +543,6 @@ def process_results_incoming(self, interesting_managers: Set[bytes], monitoring_ monitoring_radio.send(r['payload']) elif r['type'] == 'heartbeat': logger.debug("Manager %r sent heartbeat via results connection", manager_id) - b_messages.append((p_message, r)) else: logger.error("Interchange discarding result_queue message of unknown type: %s", r["type"]) From ae5e5f49fe74c89d365fec34f75a76ec9c1c1810 Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Mon, 14 Oct 2024 15:20:14 +0000 Subject: [PATCH 15/21] Remove unneeded interchange SIGTERM handler (#3635) This was introduced in PR #2629 to guard against the submit process installing a SIGTERM handler and then that handler being unexpectedly inherited by the interchange via multiprocesssing fork PR #3463 changed the interchange to run as a fresh Python process, which will not inherit SIGTERM handlers, so since then this line has been vestigial. Fixes issue #3588 --- parsl/executors/high_throughput/interchange.py | 11 ----------- 1 file changed, 11 deletions(-) diff --git a/parsl/executors/high_throughput/interchange.py b/parsl/executors/high_throughput/interchange.py index 036aefa2cd..b0228b52f0 100644 --- a/parsl/executors/high_throughput/interchange.py +++ b/parsl/executors/high_throughput/interchange.py @@ -6,7 +6,6 @@ import pickle import platform import queue -import signal import sys import threading import time @@ -313,16 +312,6 @@ def start(self) -> None: """ Start the interchange """ - # If a user workflow has set its own signal handler for sigterm, that - # handler will be inherited by the interchange process because it is - # launched as a multiprocessing fork process. - # That can interfere with the interchange shutdown mechanism, which is - # to receive a SIGTERM and exit immediately. - # See Parsl issue #2343 (Threads and multiprocessing cannot be - # intermingled without deadlocks) which talks about other fork-related - # parent-process-inheritance problems. - signal.signal(signal.SIGTERM, signal.SIG_DFL) - logger.info("Starting main interchange method") if self.hub_address is not None and self.hub_zmq_port is not None: From 2a6bd18f1ab6b4e580fee81781b209669dab8873 Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Tue, 15 Oct 2024 09:13:49 +0000 Subject: [PATCH 16/21] Correct whitespace in InvalidResourceSpecification message (#3633) --- parsl/executors/high_throughput/executor.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/parsl/executors/high_throughput/executor.py b/parsl/executors/high_throughput/executor.py index 06ced1d1f4..7f8ea42d7e 100644 --- a/parsl/executors/high_throughput/executor.py +++ b/parsl/executors/high_throughput/executor.py @@ -346,9 +346,9 @@ def validate_resource_spec(self, resource_specification: dict): if resource_specification: raise InvalidResourceSpecification( set(resource_specification.keys()), - ("HTEX does not support the supplied resource_specifications." + ("HTEX does not support the supplied resource_specifications. " "For MPI applications consider using the MPIExecutor. " - "For specifications for core count/memory/walltime, consider using WorkQueueExecutor. ") + "For specifications for core count/memory/walltime, consider using WorkQueueExecutor.") ) return From ac62e048087c88c1eaeaec1fee04ceabbad8ae0f Mon Sep 17 00:00:00 2001 From: matthewc2003 Date: Tue, 15 Oct 2024 07:16:34 -0700 Subject: [PATCH 17/21] Add BlockIdManagerSelector (#3560) Adds the BlockIdManagerSelector to the list of available manager selectors. This selector returns a sorted list of managers by their block id, from greatest (newest) to least (oldest). --- docs/reference.rst | 10 ++++ parsl/executors/high_throughput/executor.py | 5 ++ .../high_throughput/manager_selector.py | 30 +++++++++++ .../test_block_manager_selector_unit.py | 20 +++++++ .../test_manager_selector_by_block.py | 53 +++++++++++++++++++ 5 files changed, 118 insertions(+) create mode 100644 parsl/tests/test_htex/test_block_manager_selector_unit.py create mode 100644 parsl/tests/test_htex/test_manager_selector_by_block.py diff --git a/docs/reference.rst b/docs/reference.rst index f2d89afaf8..3436635cad 100644 --- a/docs/reference.rst +++ b/docs/reference.rst @@ -78,6 +78,16 @@ Executors parsl.executors.FluxExecutor parsl.executors.radical.RadicalPilotExecutor +Manager Selectors +================= + +.. autosummary:: + :toctree: stubs + :nosignatures: + + parsl.executors.high_throughput.manager_selector.RandomManagerSelector + parsl.executors.high_throughput.manager_selector.BlockIdManagerSelector + Launchers ========= diff --git a/parsl/executors/high_throughput/executor.py b/parsl/executors/high_throughput/executor.py index 7f8ea42d7e..fb38c0121e 100644 --- a/parsl/executors/high_throughput/executor.py +++ b/parsl/executors/high_throughput/executor.py @@ -146,6 +146,11 @@ encrypted : bool Flag to enable/disable encryption (CurveZMQ). Default is False. + + manager_selector: ManagerSelector + Determines what strategy the interchange uses to select managers during task distribution. + See API reference under "Manager Selectors" regarding the various manager selectors. + Default: 'RandomManagerSelector' """ # Documentation for params used by both HTEx and MPIEx diff --git a/parsl/executors/high_throughput/manager_selector.py b/parsl/executors/high_throughput/manager_selector.py index 0ede28ee7d..25a9c49ebc 100644 --- a/parsl/executors/high_throughput/manager_selector.py +++ b/parsl/executors/high_throughput/manager_selector.py @@ -19,7 +19,37 @@ def sort_managers(self, ready_managers: Dict[bytes, ManagerRecord], manager_list class RandomManagerSelector(ManagerSelector): + """Returns a shuffled list of interesting_managers + + By default this strategy is used by the interchange. Works well + in distributing workloads equally across all availble compute + resources. The random workload strategy is not effective in + conjunction with elastic scaling behavior as the even task + distribution does not allow the scaling down of blocks, leading + to wasted resource consumption. + """ + def sort_managers(self, ready_managers: Dict[bytes, ManagerRecord], manager_list: Set[bytes]) -> List[bytes]: c_manager_list = list(manager_list) random.shuffle(c_manager_list) return c_manager_list + + +class BlockIdManagerSelector(ManagerSelector): + + """Returns an interesting_managers list sorted by block ID + + Observations: + 1. BlockID manager selector helps with workloads that see a varying + amount of tasks over time. New blocks are prioritized with the + blockID manager selector, when used with 'htex_auto_scaling', results + in compute cost savings. + + 2. Doesn't really work with bag-of-tasks workloads. When all the tasks + are put into the queue upfront, all blocks operate at near full + utilization for the majority of the workload, which task goes where + doesn't really matter. + """ + + def sort_managers(self, ready_managers: Dict[bytes, ManagerRecord], manager_list: Set[bytes]) -> List[bytes]: + return sorted(manager_list, key=lambda x: (ready_managers[x]['block_id'] is not None, ready_managers[x]['block_id'])) diff --git a/parsl/tests/test_htex/test_block_manager_selector_unit.py b/parsl/tests/test_htex/test_block_manager_selector_unit.py new file mode 100644 index 0000000000..46b2bb1e5d --- /dev/null +++ b/parsl/tests/test_htex/test_block_manager_selector_unit.py @@ -0,0 +1,20 @@ +import pytest + +from parsl.executors.high_throughput.manager_record import ManagerRecord +from parsl.executors.high_throughput.manager_selector import BlockIdManagerSelector + + +@pytest.mark.local +def test_sort_managers(): + ready_managers = { + b'manager1': {'block_id': 1}, + b'manager2': {'block_id': None}, + b'manager3': {'block_id': 3}, + b'manager4': {'block_id': 2} + } + + manager_list = {b'manager1', b'manager2', b'manager3', b'manager4'} + expected_sorted_list = [b'manager2', b'manager1', b'manager4', b'manager3'] + manager_selector = BlockIdManagerSelector() + sorted_managers = manager_selector.sort_managers(ready_managers, manager_list) + assert sorted_managers == expected_sorted_list diff --git a/parsl/tests/test_htex/test_manager_selector_by_block.py b/parsl/tests/test_htex/test_manager_selector_by_block.py new file mode 100644 index 0000000000..0933b581ff --- /dev/null +++ b/parsl/tests/test_htex/test_manager_selector_by_block.py @@ -0,0 +1,53 @@ +import time + +import pytest + +import parsl +from parsl.app.app import bash_app, python_app +from parsl.channels import LocalChannel +from parsl.config import Config +from parsl.executors import HighThroughputExecutor +from parsl.executors.high_throughput.manager_selector import ( + BlockIdManagerSelector, + ManagerSelector, +) +from parsl.launchers import WrappedLauncher +from parsl.providers import LocalProvider +from parsl.usage_tracking.levels import LEVEL_1 + +BLOCK_COUNT = 2 + + +@parsl.python_app +def get_worker_pid(): + import os + return os.environ.get('PARSL_WORKER_BLOCK_ID') + + +@pytest.mark.local +def test_block_id_selection(try_assert): + htex = HighThroughputExecutor( + label="htex_local", + max_workers_per_node=1, + manager_selector=BlockIdManagerSelector(), + provider=LocalProvider( + channel=LocalChannel(), + init_blocks=BLOCK_COUNT, + max_blocks=BLOCK_COUNT, + min_blocks=BLOCK_COUNT, + ), + ) + + config = Config( + executors=[htex], + usage_tracking=LEVEL_1, + ) + + with parsl.load(config): + blockids = [] + try_assert(lambda: len(htex.connected_managers()) == BLOCK_COUNT, timeout_ms=20000) + for i in range(10): + future = get_worker_pid() + blockids.append(future.result()) + + assert all(blockid == "1" for blockid in blockids) From af4241489ed26d2396834da8597915a5e24cd295 Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Tue, 15 Oct 2024 20:26:19 +0000 Subject: [PATCH 18/21] Fix typo in "fully drained" log message (#3636) --- parsl/executors/high_throughput/process_worker_pool.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/parsl/executors/high_throughput/process_worker_pool.py b/parsl/executors/high_throughput/process_worker_pool.py index 59efe501f1..7e238bb61c 100755 --- a/parsl/executors/high_throughput/process_worker_pool.py +++ b/parsl/executors/high_throughput/process_worker_pool.py @@ -362,7 +362,7 @@ def pull_tasks(self, kill_event): if tasks == HEARTBEAT_CODE: logger.debug("Got heartbeat from interchange") elif tasks == DRAINED_CODE: - logger.info("Got fulled drained message from interchange - setting kill flag") + logger.info("Got fully drained message from interchange - setting kill flag") kill_event.set() else: task_recv_counter += len(tasks) From 29f960f264c2bb959f65c07faafb07d20402ad5a Mon Sep 17 00:00:00 2001 From: matthewc2003 Date: Thu, 17 Oct 2024 13:29:49 -0700 Subject: [PATCH 19/21] Add resource specification fields to HTEX (#3638) Adds the parameter 'priority' as a valid entry in the resource spec dict. Necessary for changing the pending_task_queue to a different structure than queue.Queue. Also passes resource spec to the interchange. --- parsl/executors/high_throughput/executor.py | 20 ++++++++++--------- .../test_resource_spec_validation.py | 7 +++++++ 2 files changed, 18 insertions(+), 9 deletions(-) diff --git a/parsl/executors/high_throughput/executor.py b/parsl/executors/high_throughput/executor.py index fb38c0121e..ab1498efc4 100644 --- a/parsl/executors/high_throughput/executor.py +++ b/parsl/executors/high_throughput/executor.py @@ -346,15 +346,17 @@ def worker_logdir(self): return self.logdir def validate_resource_spec(self, resource_specification: dict): - """HTEX does not support *any* resource_specification options and - will raise InvalidResourceSpecification is any are passed to it""" + """HTEX supports the following *Optional* resource specifications: + priority: lower value is higher priority""" if resource_specification: - raise InvalidResourceSpecification( - set(resource_specification.keys()), - ("HTEX does not support the supplied resource_specifications. " - "For MPI applications consider using the MPIExecutor. " - "For specifications for core count/memory/walltime, consider using WorkQueueExecutor.") - ) + acceptable_fields = {'priority'} + keys = set(resource_specification.keys()) + invalid_keys = keys - acceptable_fields + if invalid_keys: + message = "Task resource specification only accepts these types of resources: {}".format( + ', '.join(acceptable_fields)) + logger.error(message) + raise InvalidResourceSpecification(set(invalid_keys), message) return def initialize_scaling(self): @@ -662,7 +664,7 @@ def submit(self, func, resource_specification, *args, **kwargs): except TypeError: raise SerializationError(func.__name__) - msg = {"task_id": task_id, "buffer": fn_buf} + msg = {"task_id": task_id, "resource_spec": resource_specification, "buffer": fn_buf} # Post task to the outgoing queue self.outgoing_q.put(msg) diff --git a/parsl/tests/test_htex/test_resource_spec_validation.py b/parsl/tests/test_htex/test_resource_spec_validation.py index 5587891650..2a6d577416 100644 --- a/parsl/tests/test_htex/test_resource_spec_validation.py +++ b/parsl/tests/test_htex/test_resource_spec_validation.py @@ -30,6 +30,13 @@ def test_resource_spec_validation(): assert ret_val is None +@pytest.mark.local +def test_resource_spec_validation_one_key(): + htex = HighThroughputExecutor() + ret_val = htex.validate_resource_spec({"priority": 2}) + assert ret_val is None + + @pytest.mark.local def test_resource_spec_validation_bad_keys(): htex = HighThroughputExecutor() From 973ee4840814e11e538c6b405119f7136502a9db Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Sat, 19 Oct 2024 16:29:34 +0000 Subject: [PATCH 20/21] Reduce load on drain integration test to reduce race-condition failures (#3640) This PR reduces the load places on the interchange and on the whole test environment caused by repeatedly querying the interchange for connected managers. It does this by increasing the period between such requests, from the default, every 20ms, to every 100ms. In the last few days, test_drain.py began failing often. I have seen it occasionally fail before. This was initially a problem in PR #3639 which is unrelated, but I recreated the problem in CI against master as of #3627. I investigated and found this behaviour causing the failure: * test_drain configures the drain period to be 1 second * startup of a worker pool was taking more than 1 second * the worker pool enters drain state, drains and terminates immediately on being fully started up. * test_py fails its assumption that there is a worker pool to inspect after waiting for there to be worker pool to inspect. This is the race condition: the assertion at line 57 is true but line 59 returns an empty managers list. ``` 57 try_assert(lambda: len(htex.connected_managers()) == 1, check_period_ms=CONNECTED_MANAGERS_POLL_MS) 58 59 managers = htex.connected_managers() 60 assert managers[0]['active'], "The manager should be active" ``` Looking at the CI logs for a failing case, I saw direct evidence that the worker pool takes more than 1 second to start up in `manager.log`: ``` 2024-10-18 10:31:16.007 parsl:914 29414 MainThread [INFO] Python version: 3.12.7 (main, Oct 1 2024, 15:17:50) [GCC 9.4.0] [...] 2024-10-18 10:31:16.008 parsl:151 29414 MainThread [INFO] Manager initializing [this is where the worker start time for drain purposes is measured] [...] 2024-10-18 10:31:16.011 parsl:183 29414 MainThread [INFO] Manager connected to interchange 2024-10-18 10:31:17.058 parsl:243 29414 MainThread [INFO] Will request drain at 1729247477.0087547 [...] 2024-10-18 10:31:17.073 parsl:336 29414 Task-Puller [DEBUG] ready workers: 0, pending tasks: 0 ``` There's more than a second delay between "... connected to interchange" and the subsequent message "Will request drain". There's not a huge amount of stuff happening between these lines but there are things like multiprocessing initialization which starts a new process. It looks like this bit of code is slow even in the successful case: rerunning until success, I see this timing in CI: ``` 2024-10-18 12:11:17.475 parsl:183 23062 MainThread [INFO] Manager connected to interchange 2024-10-18 12:11:18.181 parsl:243 23062 MainThread [INFO] Will request drain at 1729253478.4731379 ``` which is still a large fraction of a second (but sufficiently less than a second for the test to pass). I haven't investigated what is taking that time. I haven't investigated if I also see that on my laptop. I hypothesised that a lot of these test failures come from the test environment being quite loaded. I'm especially suspicious of using `try_assert` with its default timings which are very tight (20ms) - the connected managers RPC here would be expected to run much less frequently, more like every 5 seconds in regular Parsl use. So I lengthed the period of the try_asserts in this test, to try to reduce load caused there. That makes the test pass repeatedly again. Things not investigated/debugged: * why this is taking >0.5 second even in the successful case - it's possible that this is a reasonable startup time and so the test might be lengthened by a few seconds * how to do a test without being timing reliant - draining is, by its very nature, reliant on the passage of "real time". For example, you might mock (at the libc level if not at the Python level) system time. * what other loads are present on the system - one of the points of slowly-ongoing PR #3397 shutdown tidyup is to reduce thread load left behind by earlier tests --- parsl/tests/test_htex/test_drain.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/parsl/tests/test_htex/test_drain.py b/parsl/tests/test_htex/test_drain.py index da115b42f0..5f8e86e4b7 100644 --- a/parsl/tests/test_htex/test_drain.py +++ b/parsl/tests/test_htex/test_drain.py @@ -15,6 +15,8 @@ # last twice that many number of seconds. TIME_CONST = 1 +CONNECTED_MANAGERS_POLL_MS = 100 + def local_config(): return Config( @@ -52,7 +54,7 @@ def test_drain(try_assert): # wait till we have a block running... - try_assert(lambda: len(htex.connected_managers()) == 1) + try_assert(lambda: len(htex.connected_managers()) == 1, check_period_ms=CONNECTED_MANAGERS_POLL_MS) managers = htex.connected_managers() assert managers[0]['active'], "The manager should be active" @@ -63,7 +65,7 @@ def test_drain(try_assert): time.sleep(TIME_CONST) # this assert should happen *very fast* after the above delay... - try_assert(lambda: htex.connected_managers()[0]['draining'], timeout_ms=500) + try_assert(lambda: htex.connected_managers()[0]['draining'], timeout_ms=500, check_period_ms=CONNECTED_MANAGERS_POLL_MS) # and the test task should still be running... assert not fut.done(), "The test task should still be running" @@ -76,4 +78,4 @@ def test_drain(try_assert): # connected managers. # As with the above draining assert, this should happen very fast after # the task ends. - try_assert(lambda: len(htex.connected_managers()) == 0, timeout_ms=500) + try_assert(lambda: len(htex.connected_managers()) == 0, timeout_ms=500, check_period_ms=CONNECTED_MANAGERS_POLL_MS) From fec4e407652d3beb7bd6578fff2d2883fdc188ce Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Sat, 19 Oct 2024 21:09:58 +0000 Subject: [PATCH 21/21] Increase time scale for drain test outside of the scale of #3640 problems (#3641) --- parsl/tests/test_htex/test_drain.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/parsl/tests/test_htex/test_drain.py b/parsl/tests/test_htex/test_drain.py index 5f8e86e4b7..efd0405863 100644 --- a/parsl/tests/test_htex/test_drain.py +++ b/parsl/tests/test_htex/test_drain.py @@ -13,7 +13,7 @@ # based around the expected drain period: the drain period # is TIME_CONST seconds, and the single executed task will # last twice that many number of seconds. -TIME_CONST = 1 +TIME_CONST = 4 CONNECTED_MANAGERS_POLL_MS = 100