From 10a6a00144bbbcf12923e95b8f940370fcf76e9a Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Tue, 6 Aug 2024 23:39:05 +0200 Subject: [PATCH 01/15] Remove monitoring router modification of node message (#3567) Prior to this PR, the monitoring router would add a run_id field to every NODE_INFO message that it received. These are messages from the interchange describing worker pools. The monitoring router does not modify any other messages. This PR sets the run_id at the point of message origination inside the interchange (in _send_monitoring_info), and makes the router leave NODE_INFO messages unmodified (like the other message types). This is part of work to make the router less aware of message types by removing a bunch of message-type specific handling. This PR brings in a bunch of rewiring to get the run id into the interchange rather than into the monitoring router. * Changed Behaviour This should not change any workflow-user-facing behaviour. Globus Compute (or anyone else building a fake Parsl environment) will maybe have to change how they fake their Parsl implementation to pass in a run id (the executor.run_id part of dfk.add_executors). --- parsl/dataflow/dflow.py | 2 +- parsl/executors/high_throughput/executor.py | 1 + parsl/executors/high_throughput/interchange.py | 4 ++++ parsl/monitoring/monitoring.py | 3 +-- parsl/monitoring/router.py | 7 +------ parsl/tests/test_htex/test_zmq_binding.py | 3 ++- 6 files changed, 10 insertions(+), 10 deletions(-) diff --git a/parsl/dataflow/dflow.py b/parsl/dataflow/dflow.py index 88ef063230..344173c4b1 100644 --- a/parsl/dataflow/dflow.py +++ b/parsl/dataflow/dflow.py @@ -116,7 +116,7 @@ def __init__(self, config: Config) -> None: if self.monitoring: if self.monitoring.logdir is None: self.monitoring.logdir = self.run_dir - self.monitoring.start(self.run_id, self.run_dir, self.config.run_dir) + self.monitoring.start(self.run_dir, self.config.run_dir) self.time_began = datetime.datetime.now() self.time_completed: Optional[datetime.datetime] = None diff --git a/parsl/executors/high_throughput/executor.py b/parsl/executors/high_throughput/executor.py index 6c181cdee7..1a56195c07 100644 --- a/parsl/executors/high_throughput/executor.py +++ b/parsl/executors/high_throughput/executor.py @@ -551,6 +551,7 @@ def _start_local_interchange_process(self) -> None: "logging_level": logging.DEBUG if self.worker_debug else logging.INFO, "cert_dir": self.cert_dir, "manager_selector": self.manager_selector, + "run_id": self.run_id, } config_pickle = pickle.dumps(interchange_config) diff --git a/parsl/executors/high_throughput/interchange.py b/parsl/executors/high_throughput/interchange.py index 5da83ae3ca..fa0969d398 100644 --- a/parsl/executors/high_throughput/interchange.py +++ b/parsl/executors/high_throughput/interchange.py @@ -55,6 +55,7 @@ def __init__(self, poll_period: int, cert_dir: Optional[str], manager_selector: ManagerSelector, + run_id: str, ) -> None: """ Parameters @@ -125,6 +126,8 @@ def __init__(self, self.command_channel.connect("tcp://{}:{}".format(client_address, client_ports[2])) logger.info("Connected to client") + self.run_id = run_id + self.hub_address = hub_address self.hub_zmq_port = hub_zmq_port @@ -227,6 +230,7 @@ def _send_monitoring_info(self, monitoring_radio: Optional[MonitoringRadioSender d: Dict = cast(Dict, manager.copy()) d['timestamp'] = datetime.datetime.now() d['last_heartbeat'] = datetime.datetime.fromtimestamp(d['last_heartbeat']) + d['run_id'] = self.run_id monitoring_radio.send((MessageType.NODE_INFO, d)) diff 
--git a/parsl/monitoring/monitoring.py b/parsl/monitoring/monitoring.py index c9a2dc9ed7..9dccbecd35 100644 --- a/parsl/monitoring/monitoring.py +++ b/parsl/monitoring/monitoring.py @@ -106,7 +106,7 @@ def __init__(self, self.resource_monitoring_enabled = resource_monitoring_enabled self.resource_monitoring_interval = resource_monitoring_interval - def start(self, run_id: str, dfk_run_dir: str, config_run_dir: Union[str, os.PathLike]) -> None: + def start(self, dfk_run_dir: str, config_run_dir: Union[str, os.PathLike]) -> None: logger.debug("Starting MonitoringHub") @@ -161,7 +161,6 @@ def start(self, run_id: str, dfk_run_dir: str, config_run_dir: Union[str, os.Pat "zmq_port_range": self.hub_port_range, "logdir": self.logdir, "logging_level": logging.DEBUG if self.monitoring_debug else logging.INFO, - "run_id": run_id }, name="Monitoring-Router-Process", daemon=True, diff --git a/parsl/monitoring/router.py b/parsl/monitoring/router.py index bf395e3662..4be454b797 100644 --- a/parsl/monitoring/router.py +++ b/parsl/monitoring/router.py @@ -31,7 +31,6 @@ def __init__(self, monitoring_hub_address: str = "127.0.0.1", logdir: str = ".", - run_id: str, logging_level: int = logging.INFO, atexit_timeout: int = 3, # in seconds priority_msgs: "queue.Queue[AddressedMonitoringMessage]", @@ -71,7 +70,6 @@ def __init__(self, self.hub_address = hub_address self.atexit_timeout = atexit_timeout - self.run_id = run_id self.loop_freq = 10.0 # milliseconds @@ -172,7 +170,6 @@ def start_zmq_listener(self) -> None: msg_0 = (msg, 0) if msg[0] == MessageType.NODE_INFO: - msg[1]['run_id'] = self.run_id self.node_msgs.put(msg_0) elif msg[0] == MessageType.RESOURCE_INFO: self.resource_msgs.put(msg_0) @@ -218,8 +215,7 @@ def router_starter(comm_q: "queue.Queue[Union[Tuple[int, int], str]]", zmq_port_range: Tuple[int, int], logdir: str, - logging_level: int, - run_id: str) -> None: + logging_level: int) -> None: setproctitle("parsl: monitoring router") try: router = MonitoringRouter(hub_address=hub_address, @@ -227,7 +223,6 @@ def router_starter(comm_q: "queue.Queue[Union[Tuple[int, int], str]]", zmq_port_range=zmq_port_range, logdir=logdir, logging_level=logging_level, - run_id=run_id, priority_msgs=priority_msgs, node_msgs=node_msgs, block_msgs=block_msgs, diff --git a/parsl/tests/test_htex/test_zmq_binding.py b/parsl/tests/test_htex/test_zmq_binding.py index 2273443b99..e21c065d0d 100644 --- a/parsl/tests/test_htex/test_zmq_binding.py +++ b/parsl/tests/test_htex/test_zmq_binding.py @@ -25,7 +25,8 @@ def make_interchange(*, interchange_address: Optional[str], cert_dir: Optional[s logdir=".", logging_level=logging.INFO, manager_selector=RandomManagerSelector(), - poll_period=10) + poll_period=10, + run_id="test_run_id") @pytest.fixture From 1c7a0e40ed37b4ffe6c31633d4de4e1d9360e9f9 Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Thu, 8 Aug 2024 10:40:22 +0200 Subject: [PATCH 02/15] Aggressively deprecate Channels and AdHocProvider (#3569) This is described in issue #3515. Issue #3515 has not received any comments arguing in favour of retaining channels and the AdHocProvider, and several in support of removing them, and so this PR takes a heavy-handed approach that is well on the way to the end goal of #3515 of deleting channels and the AdHocProvider entirely: Channels except LocalChannel are renamed, so that any users of other channels will have to make a change to their code and actively observe the word "Deprecated" in the name. The AdHocProvider is renamed in the same way.
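As an illustrative sketch (the new module paths appear in the test-suite diffs below), existing imports change along these lines:

    # Before this PR (top-level re-exports, removed by this patch):
    #   from parsl.channels import SSHChannel
    #   from parsl.providers import AdHocProvider
    # After this PR, the deprecated modules must be imported directly,
    # and the class names carry the word "Deprecated":
    from parsl.channels.ssh.ssh import DeprecatedSSHChannel
    from parsl.providers.ad_hoc.ad_hoc import DeprecatedAdHocProvider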
Most documentation (but not docstrings) about channels and the ad-hoc provider is removed or replaced with a link to issue #3515. Tests which must be manually run, and so in effect are never run and shouldn't be expected to work now - parsl/tests/manual_tests and parsl/tests/integration/ - are deleted rather than fixed to follow the above naming change. The tests for SSH channels and the AdHocProvider that run in CI are modified to continue passing. Exposure of the deprecated components via top level parsl.providers and parsl.channels re-export is removed. To use these components, the deprecated modules must be imported directly. --- docs/historical/changelog.rst | 8 +-- docs/reference.rst | 13 ++--- docs/userguide/configuring.rst | 38 +++----------- docs/userguide/examples/config.py | 5 +- docs/userguide/execution.rst | 3 +- docs/userguide/plugins.rst | 11 ++--- parsl/channels/__init__.py | 5 +- parsl/channels/oauth_ssh/oauth_ssh.py | 4 +- parsl/channels/ssh/ssh.py | 2 +- parsl/channels/ssh_il/ssh_il.py | 4 +- parsl/configs/ad_hoc.py | 38 -------------- parsl/providers/__init__.py | 4 -- parsl/providers/ad_hoc/ad_hoc.py | 8 ++- parsl/tests/configs/ad_hoc_cluster_htex.py | 35 ------------- parsl/tests/configs/htex_ad_hoc_cluster.py | 26 ---------- parsl/tests/configs/local_adhoc.py | 4 +- parsl/tests/configs/swan_htex.py | 43 ---------------- .../integration/test_channels/test_scp_1.py | 45 ----------------- .../integration/test_channels/test_ssh_1.py | 40 --------------- .../test_channels/test_ssh_errors.py | 46 ----------------- .../test_channels/test_ssh_file_transport.py | 41 ---------------- .../test_channels/test_ssh_interactive.py | 24 --------- parsl/tests/manual_tests/test_ad_hoc_htex.py | 49 ------------------- parsl/tests/manual_tests/test_oauth_ssh.py | 13 ----- .../test_providers/test_local_provider.py | 11 +++-- 25 files changed, 40 insertions(+), 480 deletions(-) delete mode 100644 parsl/configs/ad_hoc.py delete mode 100644 parsl/tests/configs/ad_hoc_cluster_htex.py delete mode 100644 parsl/tests/configs/htex_ad_hoc_cluster.py delete mode 100644 parsl/tests/configs/swan_htex.py delete mode 100644 parsl/tests/integration/test_channels/test_scp_1.py delete mode 100644 parsl/tests/integration/test_channels/test_ssh_1.py delete mode 100644 parsl/tests/integration/test_channels/test_ssh_errors.py delete mode 100644 parsl/tests/integration/test_channels/test_ssh_file_transport.py delete mode 100644 parsl/tests/integration/test_channels/test_ssh_interactive.py delete mode 100644 parsl/tests/manual_tests/test_ad_hoc_htex.py delete mode 100644 parsl/tests/manual_tests/test_oauth_ssh.py diff --git a/docs/historical/changelog.rst b/docs/historical/changelog.rst index 18fe6ca5b1..931998f93d 100644 --- a/docs/historical/changelog.rst +++ b/docs/historical/changelog.rst @@ -334,7 +334,7 @@ New Functionality * New launcher: `parsl.launchers.WrappedLauncher` for launching tasks inside containers. -* `parsl.channels.SSHChannel` now supports a ``key_filename`` kwarg `issue#1639 `_ +* ``parsl.channels.SSHChannel`` now supports a ``key_filename`` kwarg `issue#1639 `_ * Newly added Makefile wraps several frequent developer operations such as: @@ -442,7 +442,7 @@ New Functionality module, parsl.data_provider.globus * `parsl.executors.WorkQueueExecutor`: a new executor that integrates functionality from `Work Queue `_ is now available.
-* New provider to support for Ad-Hoc clusters `parsl.providers.AdHocProvider` +* New provider to support for Ad-Hoc clusters ``parsl.providers.AdHocProvider`` * New provider added to support LSF on Summit `parsl.providers.LSFProvider` * Support for CPU and Memory resource hints to providers `(github) `_. * The ``logging_level=logging.INFO`` in `parsl.monitoring.MonitoringHub` is replaced with ``monitoring_debug=False``: @@ -468,7 +468,7 @@ New Functionality * Several test-suite improvements that have dramatically reduced test duration. * Several improvements to the Monitoring interface. -* Configurable port on `parsl.channels.SSHChannel`. +* Configurable port on ``parsl.channels.SSHChannel``. * ``suppress_failure`` now defaults to True. * `parsl.executors.HighThroughputExecutor` is the recommended executor, and ``IPyParallelExecutor`` is deprecated. * `parsl.executors.HighThroughputExecutor` will expose worker information via environment variables: ``PARSL_WORKER_RANK`` and ``PARSL_WORKER_COUNT`` @@ -532,7 +532,7 @@ New Functionality * Cleaner user app file log management. * Updated configurations using `parsl.executors.HighThroughputExecutor` in the configuration section of the userguide. -* Support for OAuth based SSH with `parsl.channels.OAuthSSHChannel`. +* Support for OAuth based SSH with ``parsl.channels.OAuthSSHChannel``. Bug Fixes ^^^^^^^^^ diff --git a/docs/reference.rst b/docs/reference.rst index 1af850792c..d8e18bd244 100644 --- a/docs/reference.rst +++ b/docs/reference.rst @@ -38,15 +38,9 @@ Configuration Channels ======== -.. autosummary:: - :toctree: stubs - :nosignatures: - - parsl.channels.base.Channel - parsl.channels.LocalChannel - parsl.channels.SSHChannel - parsl.channels.OAuthSSHChannel - parsl.channels.SSHInteractiveLoginChannel +Channels are deprecated in Parsl. See +`issue 3515 `_ +for further discussion. Data management =============== @@ -109,7 +103,6 @@ Providers :toctree: stubs :nosignatures: - parsl.providers.AdHocProvider parsl.providers.AWSProvider parsl.providers.CobaltProvider parsl.providers.CondorProvider diff --git a/docs/userguide/configuring.rst b/docs/userguide/configuring.rst index 24ce0ca938..bb3a3949e3 100644 --- a/docs/userguide/configuring.rst +++ b/docs/userguide/configuring.rst @@ -15,7 +15,7 @@ queues, durations, and data management options. The following example shows a basic configuration object (:class:`~parsl.config.Config`) for the Frontera supercomputer at TACC. This config uses the `parsl.executors.HighThroughputExecutor` to submit -tasks from a login node (`parsl.channels.LocalChannel`). It requests an allocation of +tasks from a login node. It requests an allocation of 128 nodes, deploying 1 worker for each of the 56 cores per node, from the normal partition. To limit network connections to just the internal network the config specifies the address used by the infiniband interface with ``address_by_interface('ib0')`` @@ -23,7 +23,6 @@ used by the infiniband interface with ``address_by_interface('ib0')`` .. 
code-block:: python from parsl.config import Config - from parsl.channels import LocalChannel from parsl.providers import SlurmProvider from parsl.executors import HighThroughputExecutor from parsl.launchers import SrunLauncher @@ -36,7 +35,6 @@ used by the infiniband interface with ``address_by_interface('ib0')`` address=address_by_interface('ib0'), max_workers_per_node=56, provider=SlurmProvider( - channel=LocalChannel(), nodes_per_block=128, init_blocks=1, partition='normal', @@ -197,22 +195,6 @@ Stepping through the following question should help formulate a suitable configu are on a **native Slurm** system like :ref:`configuring_nersc_cori` -4) Where will the main Parsl program run and how will it communicate with the apps? - -+------------------------+--------------------------+---------------------------------------------------+ -| Parsl program location | App execution target | Suitable channel | -+========================+==========================+===================================================+ -| Laptop/Workstation | Laptop/Workstation | `parsl.channels.LocalChannel` | -+------------------------+--------------------------+---------------------------------------------------+ -| Laptop/Workstation | Cloud Resources | No channel is needed | -+------------------------+--------------------------+---------------------------------------------------+ -| Laptop/Workstation | Clusters with no 2FA | `parsl.channels.SSHChannel` | -+------------------------+--------------------------+---------------------------------------------------+ -| Laptop/Workstation | Clusters with 2FA | `parsl.channels.SSHInteractiveLoginChannel` | -+------------------------+--------------------------+---------------------------------------------------+ -| Login node | Cluster/Supercomputer | `parsl.channels.LocalChannel` | -+------------------------+--------------------------+---------------------------------------------------+ - Heterogeneous Resources ----------------------- @@ -337,7 +319,6 @@ Provide either the number of executors (Parsl will assume they are named in inte worker_debug=True, available_accelerators=2, provider=LocalProvider( - channel=LocalChannel(), init_blocks=1, max_blocks=1, ), @@ -372,7 +353,6 @@ Select the best blocking strategy for processor's cache hierarchy (choose ``alte worker_debug=True, cpu_affinity='alternating', provider=LocalProvider( - channel=LocalChannel(), init_blocks=1, max_blocks=1, ), @@ -412,18 +392,12 @@ These include ``OMP_NUM_THREADS``, ``GOMP_COMP_AFFINITY``, and ``KMP_THREAD_AFFI Ad-Hoc Clusters --------------- -Any collection of compute nodes without a scheduler can be considered an -ad-hoc cluster. Often these machines have a shared file system such as NFS or Lustre. -In order to use these resources with Parsl, they need to set-up for password-less SSH access. - -To use these ssh-accessible collection of nodes as an ad-hoc cluster, we use -the `parsl.providers.AdHocProvider` with an `parsl.channels.SSHChannel` to each node. An example -configuration follows. +Parsl's support of ad-hoc clusters of compute nodes without a scheduler +is deprecated. -.. literalinclude:: ../../parsl/configs/ad_hoc.py - -.. note:: - Multiple blocks should not be assigned to each node when using the `parsl.executors.HighThroughputExecutor` +See +`issue #3515 `_ +for further discussion. 
Amazon Web Services ------------------- diff --git a/docs/userguide/examples/config.py b/docs/userguide/examples/config.py index 166faaf4ac..68057d2b01 100644 --- a/docs/userguide/examples/config.py +++ b/docs/userguide/examples/config.py @@ -1,4 +1,3 @@ -from parsl.channels import LocalChannel from parsl.config import Config from parsl.executors import HighThroughputExecutor from parsl.providers import LocalProvider @@ -8,9 +7,7 @@ HighThroughputExecutor( label="htex_local", cores_per_worker=1, - provider=LocalProvider( - channel=LocalChannel(), - ), + provider=LocalProvider(), ) ], ) diff --git a/docs/userguide/execution.rst b/docs/userguide/execution.rst index 4168367f9d..df17dc458f 100644 --- a/docs/userguide/execution.rst +++ b/docs/userguide/execution.rst @@ -47,8 +47,7 @@ Parsl currently supports the following providers: 7. `parsl.providers.AWSProvider`: This provider allows you to provision and manage cloud nodes from Amazon Web Services. 8. `parsl.providers.GoogleCloudProvider`: This provider allows you to provision and manage cloud nodes from Google Cloud. 9. `parsl.providers.KubernetesProvider`: This provider allows you to provision and manage containers on a Kubernetes cluster. -10. `parsl.providers.AdHocProvider`: This provider allows you manage execution over a collection of nodes to form an ad-hoc cluster. -11. `parsl.providers.LSFProvider`: This provider allows you to schedule resources via IBM's LSF scheduler. +10. `parsl.providers.LSFProvider`: This provider allows you to schedule resources via IBM's LSF scheduler. diff --git a/docs/userguide/plugins.rst b/docs/userguide/plugins.rst index 4ecff86cfe..c3c38dea63 100644 --- a/docs/userguide/plugins.rst +++ b/docs/userguide/plugins.rst @@ -16,8 +16,8 @@ executor to run code on the local submitting host, while another executor can run the same code on a large supercomputer. -Providers, Launchers and Channels ---------------------------------- +Providers and Launchers +----------------------- Some executors are based on blocks of workers (for example the `parsl.executors.HighThroughputExecutor`: the submit side requires a batch system (eg slurm, kubernetes) to start worker processes, which then @@ -34,10 +34,9 @@ add on any wrappers that are needed to launch the command (eg srun inside slurm). Providers and launchers are usually paired together for a particular system type. -A `Channel` allows the commands used to interact with an `ExecutionProvider` to be -executed on a remote system. The default channel executes commands on the -local system, but a few variants of an `parsl.channels.SSHChannel` are provided. - +Parsl also has a deprecated ``Channel`` abstraction. See +`issue 3515 `_ +for further discussion. 
File staging ------------ diff --git a/parsl/channels/__init__.py b/parsl/channels/__init__.py index 5a45d15278..c81f6a8bf1 100644 --- a/parsl/channels/__init__.py +++ b/parsl/channels/__init__.py @@ -1,7 +1,4 @@ from parsl.channels.base import Channel from parsl.channels.local.local import LocalChannel -from parsl.channels.oauth_ssh.oauth_ssh import OAuthSSHChannel -from parsl.channels.ssh.ssh import SSHChannel -from parsl.channels.ssh_il.ssh_il import SSHInteractiveLoginChannel -__all__ = ['Channel', 'SSHChannel', 'LocalChannel', 'SSHInteractiveLoginChannel', 'OAuthSSHChannel'] +__all__ = ['Channel', 'LocalChannel'] diff --git a/parsl/channels/oauth_ssh/oauth_ssh.py b/parsl/channels/oauth_ssh/oauth_ssh.py index c9efa27767..3173b163a8 100644 --- a/parsl/channels/oauth_ssh/oauth_ssh.py +++ b/parsl/channels/oauth_ssh/oauth_ssh.py @@ -3,7 +3,7 @@ import paramiko -from parsl.channels.ssh.ssh import SSHChannel +from parsl.channels.ssh.ssh import DeprecatedSSHChannel from parsl.errors import OptionalModuleMissing try: @@ -17,7 +17,7 @@ logger = logging.getLogger(__name__) -class OAuthSSHChannel(SSHChannel): +class DeprecatedOAuthSSHChannel(DeprecatedSSHChannel): """SSH persistent channel. This enables remote execution on sites accessible via ssh. This channel uses Globus based OAuth tokens for authentication. """ diff --git a/parsl/channels/ssh/ssh.py b/parsl/channels/ssh/ssh.py index bf33727e63..38b8afe47b 100644 --- a/parsl/channels/ssh/ssh.py +++ b/parsl/channels/ssh/ssh.py @@ -24,7 +24,7 @@ def _auth(self, username, *args): return -class SSHChannel(Channel, RepresentationMixin): +class DeprecatedSSHChannel(Channel, RepresentationMixin): ''' SSH persistent channel. This enables remote execution on sites accessible via ssh. It is assumed that the user has setup host keys so as to ssh to the remote host. Which goes to say that the following diff --git a/parsl/channels/ssh_il/ssh_il.py b/parsl/channels/ssh_il/ssh_il.py index 02e7a58cd4..3a5e0c5096 100644 --- a/parsl/channels/ssh_il/ssh_il.py +++ b/parsl/channels/ssh_il/ssh_il.py @@ -3,12 +3,12 @@ import paramiko -from parsl.channels.ssh.ssh import SSHChannel +from parsl.channels.ssh.ssh import DeprecatedSSHChannel logger = logging.getLogger(__name__) -class SSHInteractiveLoginChannel(SSHChannel): +class DeprecatedSSHInteractiveLoginChannel(DeprecatedSSHChannel): """SSH persistent channel. This enables remote execution on sites accessible via ssh. This channel supports interactive login and is appropriate when keys are not set up. diff --git a/parsl/configs/ad_hoc.py b/parsl/configs/ad_hoc.py deleted file mode 100644 index 05b0e8190d..0000000000 --- a/parsl/configs/ad_hoc.py +++ /dev/null @@ -1,38 +0,0 @@ -from typing import Any, Dict - -from parsl.channels import SSHChannel -from parsl.config import Config -from parsl.executors import HighThroughputExecutor -from parsl.providers import AdHocProvider -from parsl.usage_tracking.levels import LEVEL_1 - -user_opts: Dict[str, Dict[str, Any]] -user_opts = {'adhoc': - {'username': 'YOUR_USERNAME', - 'script_dir': 'YOUR_SCRIPT_DIR', - 'remote_hostnames': ['REMOTE_HOST_URL_1', 'REMOTE_HOST_URL_2'] - } - } - - -config = Config( - executors=[ - HighThroughputExecutor( - label='remote_htex', - max_workers_per_node=2, - worker_logdir_root=user_opts['adhoc']['script_dir'], - provider=AdHocProvider( - # Command to be run before starting a worker, such as: - # 'module load Anaconda; source activate parsl_env'. 
- worker_init='', - channels=[SSHChannel(hostname=m, - username=user_opts['adhoc']['username'], - script_dir=user_opts['adhoc']['script_dir'], - ) for m in user_opts['adhoc']['remote_hostnames']] - ) - ) - ], - # AdHoc Clusters should not be setup with scaling strategy. - strategy='none', - usage_tracking=LEVEL_1, -) diff --git a/parsl/providers/__init__.py b/parsl/providers/__init__.py index 475737f1f9..150f425f3d 100644 --- a/parsl/providers/__init__.py +++ b/parsl/providers/__init__.py @@ -1,6 +1,3 @@ -# Workstation Provider -from parsl.providers.ad_hoc.ad_hoc import AdHocProvider - # Cloud Providers from parsl.providers.aws.aws import AWSProvider from parsl.providers.azure.azure import AzureProvider @@ -24,7 +21,6 @@ 'SlurmProvider', 'TorqueProvider', 'LSFProvider', - 'AdHocProvider', 'PBSProProvider', 'AWSProvider', 'GoogleCloudProvider', diff --git a/parsl/providers/ad_hoc/ad_hoc.py b/parsl/providers/ad_hoc/ad_hoc.py index 207dd55738..9059648101 100644 --- a/parsl/providers/ad_hoc/ad_hoc.py +++ b/parsl/providers/ad_hoc/ad_hoc.py @@ -12,8 +12,12 @@ logger = logging.getLogger(__name__) -class AdHocProvider(ExecutionProvider, RepresentationMixin): - """ Ad-hoc execution provider +class DeprecatedAdHocProvider(ExecutionProvider, RepresentationMixin): + """ Deprecated ad-hoc execution provider + + The (former) AdHocProvider is deprecated. See + `issue #3515 `_ + for further discussion. This provider is used to provision execution resources over one or more ad hoc nodes that are each accessible over a Channel (say, ssh) but otherwise lack a cluster scheduler. diff --git a/parsl/tests/configs/ad_hoc_cluster_htex.py b/parsl/tests/configs/ad_hoc_cluster_htex.py deleted file mode 100644 index 0949b82392..0000000000 --- a/parsl/tests/configs/ad_hoc_cluster_htex.py +++ /dev/null @@ -1,35 +0,0 @@ -from typing import Any, Dict - -from parsl.channels import SSHChannel -from parsl.config import Config -from parsl.executors import HighThroughputExecutor -from parsl.providers import AdHocProvider - -user_opts = {'adhoc': - {'username': 'YOUR_USERNAME', - 'script_dir': 'YOUR_SCRIPT_DIR', - 'remote_hostnames': ['REMOTE_HOST_URL_1', 'REMOTE_HOST_URL_2'] - } - } # type: Dict[str, Dict[str, Any]] - -config = Config( - executors=[ - HighThroughputExecutor( - label='remote_htex', - max_workers_per_node=2, - worker_logdir_root=user_opts['adhoc']['script_dir'], - encrypted=True, - provider=AdHocProvider( - # Command to be run before starting a worker, such as: - # 'module load Anaconda; source activate parsl_env'. - worker_init='', - channels=[SSHChannel(hostname=m, - username=user_opts['adhoc']['username'], - script_dir=user_opts['adhoc']['script_dir'], - ) for m in user_opts['adhoc']['remote_hostnames']] - ) - ) - ], - # AdHoc Clusters should not be setup with scaling strategy. 
- strategy='none', -) diff --git a/parsl/tests/configs/htex_ad_hoc_cluster.py b/parsl/tests/configs/htex_ad_hoc_cluster.py deleted file mode 100644 index db24b42ab2..0000000000 --- a/parsl/tests/configs/htex_ad_hoc_cluster.py +++ /dev/null @@ -1,26 +0,0 @@ -from parsl.channels import SSHChannel -from parsl.config import Config -from parsl.executors import HighThroughputExecutor -from parsl.providers import AdHocProvider -from parsl.tests.configs.user_opts import user_opts - -config = Config( - executors=[ - HighThroughputExecutor( - label='remote_htex', - cores_per_worker=1, - worker_debug=False, - address=user_opts['public_ip'], - encrypted=True, - provider=AdHocProvider( - move_files=False, - parallelism=1, - worker_init=user_opts['adhoc']['worker_init'], - channels=[SSHChannel(hostname=m, - username=user_opts['adhoc']['username'], - script_dir=user_opts['adhoc']['script_dir'], - ) for m in user_opts['adhoc']['remote_hostnames']] - ) - ) - ], -) diff --git a/parsl/tests/configs/local_adhoc.py b/parsl/tests/configs/local_adhoc.py index 25b1f38d61..9b1f951842 100644 --- a/parsl/tests/configs/local_adhoc.py +++ b/parsl/tests/configs/local_adhoc.py @@ -1,7 +1,7 @@ from parsl.channels import LocalChannel from parsl.config import Config from parsl.executors import HighThroughputExecutor -from parsl.providers import AdHocProvider +from parsl.providers.ad_hoc.ad_hoc import DeprecatedAdHocProvider def fresh_config(): @@ -10,7 +10,7 @@ def fresh_config(): HighThroughputExecutor( label='AdHoc', encrypted=True, - provider=AdHocProvider( + provider=DeprecatedAdHocProvider( channels=[LocalChannel(), LocalChannel()] ) ) diff --git a/parsl/tests/configs/swan_htex.py b/parsl/tests/configs/swan_htex.py deleted file mode 100644 index 3b1b6785ab..0000000000 --- a/parsl/tests/configs/swan_htex.py +++ /dev/null @@ -1,43 +0,0 @@ -""" -================== Block -| ++++++++++++++ | Node -| | | | -| | Task | | . . . 
-| | | | -| ++++++++++++++ | -================== -""" -from parsl.channels import SSHChannel -from parsl.config import Config -from parsl.executors import HighThroughputExecutor -from parsl.launchers import AprunLauncher -from parsl.providers import TorqueProvider - -# If you are a developer running tests, make sure to update parsl/tests/configs/user_opts.py -# If you are a user copying-and-pasting this as an example, make sure to either -# 1) create a local `user_opts.py`, or -# 2) delete the user_opts import below and replace all appearances of `user_opts` with the literal value -# (i.e., user_opts['swan']['username'] -> 'your_username') -from .user_opts import user_opts - -config = Config( - executors=[ - HighThroughputExecutor( - label='swan_htex', - encrypted=True, - provider=TorqueProvider( - channel=SSHChannel( - hostname='swan.cray.com', - username=user_opts['swan']['username'], - script_dir=user_opts['swan']['script_dir'], - ), - nodes_per_block=1, - init_blocks=1, - max_blocks=1, - launcher=AprunLauncher(), - scheduler_options=user_opts['swan']['scheduler_options'], - worker_init=user_opts['swan']['worker_init'], - ), - ) - ] -) diff --git a/parsl/tests/integration/test_channels/test_scp_1.py b/parsl/tests/integration/test_channels/test_scp_1.py deleted file mode 100644 index c11df3c663..0000000000 --- a/parsl/tests/integration/test_channels/test_scp_1.py +++ /dev/null @@ -1,45 +0,0 @@ -import os - -from parsl.channels.ssh.ssh import SSHChannel as SSH - - -def connect_and_list(hostname, username): - out = '' - conn = SSH(hostname, username=username) - conn.push_file(os.path.abspath('remote_run.sh'), '/home/davidk/') - # ec, out, err = conn.execute_wait("ls /tmp/remote_run.sh; bash /tmp/remote_run.sh") - conn.close() - return out - - -script = '''#!/bin/bash -echo "Hostname: $HOSTNAME" -echo "Cpu info -----" -cat /proc/cpuinfo -echo "Done----------" -''' - - -def test_connect_1(): - with open('remote_run.sh', 'w') as f: - f.write(script) - - sites = { - 'midway': { - 'url': 'midway.rcc.uchicago.edu', - 'uname': 'yadunand' - }, - 'swift': { - 'url': 'swift.rcc.uchicago.edu', - 'uname': 'yadunand' - } - } - - for site in sites.values(): - out = connect_and_list(site['url'], site['uname']) - print("Sitename :{0} hostname:{1}".format(site['url'], out)) - - -if __name__ == "__main__": - - test_connect_1() diff --git a/parsl/tests/integration/test_channels/test_ssh_1.py b/parsl/tests/integration/test_channels/test_ssh_1.py deleted file mode 100644 index 61ab3f2705..0000000000 --- a/parsl/tests/integration/test_channels/test_ssh_1.py +++ /dev/null @@ -1,40 +0,0 @@ -from parsl.channels.ssh.ssh import SSHChannel as SSH - - -def connect_and_list(hostname, username): - conn = SSH(hostname, username=username) - ec, out, err = conn.execute_wait("echo $HOSTNAME") - conn.close() - return out - - -def test_midway(): - ''' Test ssh channels to midway - ''' - url = 'midway.rcc.uchicago.edu' - uname = 'yadunand' - out = connect_and_list(url, uname) - print("Sitename :{0} hostname:{1}".format(url, out)) - - -def test_beagle(): - ''' Test ssh channels to beagle - ''' - url = 'login04.beagle.ci.uchicago.edu' - uname = 'yadunandb' - out = connect_and_list(url, uname) - print("Sitename :{0} hostname:{1}".format(url, out)) - - -def test_osg(): - ''' Test ssh connectivity to osg - ''' - url = 'login.osgconnect.net' - uname = 'yadunand' - out = connect_and_list(url, uname) - print("Sitename :{0} hostname:{1}".format(url, out)) - - -if __name__ == "__main__": - - pass diff --git 
a/parsl/tests/integration/test_channels/test_ssh_errors.py b/parsl/tests/integration/test_channels/test_ssh_errors.py deleted file mode 100644 index 7483e30a5c..0000000000 --- a/parsl/tests/integration/test_channels/test_ssh_errors.py +++ /dev/null @@ -1,46 +0,0 @@ -from parsl.channels.errors import BadHostKeyException, SSHException -from parsl.channels.ssh.ssh import SSHChannel as SSH - - -def connect_and_list(hostname, username): - conn = SSH(hostname, username=username) - ec, out, err = conn.execute_wait("echo $HOSTNAME") - conn.close() - return out - - -def test_error_1(): - try: - connect_and_list("bad.url.gov", "ubuntu") - except Exception as e: - assert type(e) is SSHException, "Expected SSException, got: {0}".format(e) - - -def test_error_2(): - try: - connect_and_list("swift.rcc.uchicago.edu", "mango") - except SSHException: - print("Caught the right exception") - else: - raise Exception("Expected SSException, got: {0}".format(e)) - - -def test_error_3(): - ''' This should work - ''' - try: - connect_and_list("edison.nersc.gov", "yadunand") - except BadHostKeyException as e: - print("Caught exception BadHostKeyException: ", e) - else: - assert False, "Expected SSException, got: {0}".format(e) - - -if __name__ == "__main__": - - tests = [test_error_1, test_error_2, test_error_3] - - for test in tests: - print("---------Running : {0}---------------".format(test)) - test() - print("----------------------DONE--------------------------") diff --git a/parsl/tests/integration/test_channels/test_ssh_file_transport.py b/parsl/tests/integration/test_channels/test_ssh_file_transport.py deleted file mode 100644 index 61672c3ff5..0000000000 --- a/parsl/tests/integration/test_channels/test_ssh_file_transport.py +++ /dev/null @@ -1,41 +0,0 @@ -import parsl -from parsl.channels.ssh.ssh import SSHChannel as SSH - - -def connect_and_list(hostname, username): - conn = SSH(hostname, username=username) - ec, out, err = conn.execute_wait("echo $HOSTNAME") - conn.close() - return out - - -def test_push(conn, fname="test001.txt"): - - with open(fname, 'w') as f: - f.write("Hello from parsl.ssh testing\n") - - conn.push_file(fname, "/tmp") - ec, out, err = conn.execute_wait("ls /tmp/{0}".format(fname)) - print(ec, out, err) - - -def test_pull(conn, fname="test001.txt"): - - local = "foo" - conn.pull_file("/tmp/{0}".format(fname), local) - - with open("{0}/{1}".format(local, fname), 'r') as f: - print(f.readlines()) - - -if __name__ == "__main__": - - parsl.set_stream_logger() - - # This is for testing - conn = SSH("midway.rcc.uchicago.edu", username="yadunand") - - test_push(conn) - test_pull(conn) - - conn.close() diff --git a/parsl/tests/integration/test_channels/test_ssh_interactive.py b/parsl/tests/integration/test_channels/test_ssh_interactive.py deleted file mode 100644 index c6f9b9dea9..0000000000 --- a/parsl/tests/integration/test_channels/test_ssh_interactive.py +++ /dev/null @@ -1,24 +0,0 @@ -import parsl -from parsl.channels.ssh_il.ssh_il import SSHInteractiveLoginChannel as SSH - - -def connect_and_list(hostname, username): - conn = SSH(hostname, username=username) - ec, out, err = conn.execute_wait("echo $HOSTNAME") - conn.close() - return out - - -def test_cooley(): - ''' Test ssh channels to midway - ''' - url = 'cooley.alcf.anl.gov' - uname = 'yadunand' - out = connect_and_list(url, uname) - print("Sitename :{0} hostname:{1}".format(url, out)) - return - - -if __name__ == "__main__": - parsl.set_stream_logger() - test_cooley() diff --git a/parsl/tests/manual_tests/test_ad_hoc_htex.py 
b/parsl/tests/manual_tests/test_ad_hoc_htex.py deleted file mode 100644 index dfa34ec0d1..0000000000 --- a/parsl/tests/manual_tests/test_ad_hoc_htex.py +++ /dev/null @@ -1,49 +0,0 @@ -import parsl -from parsl import python_app - -parsl.set_stream_logger() - -from parsl.channels import SSHChannel -from parsl.config import Config -from parsl.executors import HighThroughputExecutor -from parsl.providers import AdHocProvider - -remotes = ['midway2-login2.rcc.uchicago.edu', 'midway2-login1.rcc.uchicago.edu'] - -config = Config( - executors=[ - HighThroughputExecutor( - label='AdHoc', - max_workers_per_node=2, - worker_logdir_root="/scratch/midway2/yadunand/parsl_scripts", - encrypted=True, - provider=AdHocProvider( - worker_init="source /scratch/midway2/yadunand/parsl_env_setup.sh", - channels=[SSHChannel(hostname=m, - username="yadunand", - script_dir="/scratch/midway2/yadunand/parsl_cluster") - for m in remotes] - ) - ) - ] -) - - -@python_app -def platform(sleep=2, stdout=None): - import platform - import time - time.sleep(sleep) - return platform.uname() - - -def test_raw_provider(): - - parsl.load(config) - - x = [platform() for i in range(10)] - print([i.result() for i in x]) - - -if __name__ == "__main__": - test_raw_provider() diff --git a/parsl/tests/manual_tests/test_oauth_ssh.py b/parsl/tests/manual_tests/test_oauth_ssh.py deleted file mode 100644 index 3d464bcc0e..0000000000 --- a/parsl/tests/manual_tests/test_oauth_ssh.py +++ /dev/null @@ -1,13 +0,0 @@ -from parsl.channels import OAuthSSHChannel - - -def test_channel(): - channel = OAuthSSHChannel(hostname='ssh.demo.globus.org', username='yadunand') - x, stdout, stderr = channel.execute_wait('ls') - print(x, stdout, stderr) - assert x == 0, "Expected exit code 0, got {}".format(x) - - -if __name__ == '__main__': - - test_channel() diff --git a/parsl/tests/test_providers/test_local_provider.py b/parsl/tests/test_providers/test_local_provider.py index c6844b00c0..497c13370d 100644 --- a/parsl/tests/test_providers/test_local_provider.py +++ b/parsl/tests/test_providers/test_local_provider.py @@ -11,7 +11,8 @@ import pytest -from parsl.channels import LocalChannel, SSHChannel +from parsl.channels import LocalChannel +from parsl.channels.ssh.ssh import DeprecatedSSHChannel from parsl.jobs.states import JobState from parsl.launchers import SingleNodeLauncher from parsl.providers import LocalProvider @@ -92,10 +93,10 @@ def test_ssh_channel(): # already exist, so create it here. pathlib.Path('{}/known.hosts'.format(config_dir)).touch(mode=0o600) script_dir = tempfile.mkdtemp() - channel = SSHChannel('127.0.0.1', port=server_port, - script_dir=remote_script_dir, - host_keys_filename='{}/known.hosts'.format(config_dir), - key_filename=priv_key) + channel = DeprecatedSSHChannel('127.0.0.1', port=server_port, + script_dir=remote_script_dir, + host_keys_filename='{}/known.hosts'.format(config_dir), + key_filename=priv_key) try: p = LocalProvider(channel=channel, launcher=SingleNodeLauncher(debug=False)) From 114e701b81f1abbc71a4fd438896fece16784f4d Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Thu, 8 Aug 2024 16:08:47 +0200 Subject: [PATCH 03/15] Close processes in Work Queue and Task Vine shutdown (#3576) This releases 2 file descriptors with work queue (from 21 to 19 at the end of CI Work Queue test) and 4 file descriptors with Task Vine (from 19 to 15 at the end of CI Task Vine test) This is part of work being merged from draft PR #3397 to shut down components more cleanly, rather than relying on process exit. 
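For illustration, a minimal self-contained sketch of the shutdown pattern this patch applies (not the executor code itself): joining a multiprocessing.Process waits for it to exit, and the explicit close() then releases the Process object's resources, including file descriptors, immediately rather than leaving them to finalizers at interpreter exit.

    import multiprocessing

    def work() -> None:
        pass  # stand-in for a submit/factory process body

    if __name__ == "__main__":
        p = multiprocessing.Process(target=work)
        p.start()
        p.join()   # wait for the process to finish...
        p.close()  # ...then release its resources right away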
--- parsl/executors/taskvine/executor.py | 2 ++ parsl/executors/workqueue/executor.py | 2 ++ 2 files changed, 4 insertions(+) diff --git a/parsl/executors/taskvine/executor.py b/parsl/executors/taskvine/executor.py index bebed1a51b..2e1efb211f 100644 --- a/parsl/executors/taskvine/executor.py +++ b/parsl/executors/taskvine/executor.py @@ -589,11 +589,13 @@ def shutdown(self, *args, **kwargs): # Join all processes before exiting logger.debug("Joining on submit process") self._submit_process.join() + self._submit_process.close() logger.debug("Joining on collector thread") self._collector_thread.join() if self.worker_launch_method == 'factory': logger.debug("Joining on factory process") self._factory_process.join() + self._factory_process.close() # Shutdown multiprocessing queues self._ready_task_queue.close() diff --git a/parsl/executors/workqueue/executor.py b/parsl/executors/workqueue/executor.py index a1ad49bca9..ae39f8c118 100644 --- a/parsl/executors/workqueue/executor.py +++ b/parsl/executors/workqueue/executor.py @@ -704,6 +704,8 @@ def shutdown(self, *args, **kwargs): logger.debug("Joining on submit process") self.submit_process.join() + self.submit_process.close() + logger.debug("Joining on collector thread") self.collector_thread.join() From ec9bbf63807c2d55fea6a8fccbdfb9bec7077950 Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Thu, 8 Aug 2024 17:00:10 +0200 Subject: [PATCH 04/15] Promote unable to terminate warning to logger.WARNING (#3574) Even if the subsequent SIGKILL works, this is an exceptional circumstance that should be logged. --- parsl/executors/high_throughput/executor.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/parsl/executors/high_throughput/executor.py b/parsl/executors/high_throughput/executor.py index 1a56195c07..301052c4c5 100644 --- a/parsl/executors/high_throughput/executor.py +++ b/parsl/executors/high_throughput/executor.py @@ -832,7 +832,7 @@ def shutdown(self, timeout: float = 10.0): try: self.interchange_proc.wait(timeout=timeout) except subprocess.TimeoutExpired: - logger.info("Unable to terminate Interchange process; sending SIGKILL") + logger.warning("Unable to terminate Interchange process; sending SIGKILL") self.interchange_proc.kill() logger.info("Closing ZMQ pipes") From 03e94c3619943db468feb25051f7b7e2c9933f09 Mon Sep 17 00:00:00 2001 From: Yadu Nand Babuji Date: Fri, 9 Aug 2024 15:22:30 -0500 Subject: [PATCH 05/15] Adding notes on `available_accelerators` (#3577) * Adding notes on how to specify list of strings to available_accelerators * Clarify how to bind multiple GPUs to workers --- docs/userguide/configuring.rst | 43 ++++++++++++++++++++++++++++++---- 1 file changed, 39 insertions(+), 4 deletions(-) diff --git a/docs/userguide/configuring.rst b/docs/userguide/configuring.rst index bb3a3949e3..f3fe5cc407 100644 --- a/docs/userguide/configuring.rst +++ b/docs/userguide/configuring.rst @@ -306,9 +306,13 @@ and Work Queue does not require Python to run. Accelerators ------------ -Many modern clusters provide multiple accelerators per compute note, yet many applications are best suited to using a single accelerator per task. -Parsl supports pinning each worker to difference accelerators using ``available_accelerators`` option of the :class:`~parsl.executors.HighThroughputExecutor`. -Provide either the number of executors (Parsl will assume they are named in integers starting from zero) or a list of the names of the accelerators available on the node. 
+Many modern clusters provide multiple accelerators per compute node, yet many applications are best suited to using a +single accelerator per task. Parsl supports pinning each worker to different accelerators using +``available_accelerators`` option of the :class:`~parsl.executors.HighThroughputExecutor`. Provide either the number of +accelerators (Parsl will assume they are named in integers starting from zero) or a list of the names of the accelerators +available on the node. Parsl will limit the number of workers it launches to the number of accelerators specified, +in other words, you cannot have more workers per node than there are accelerators. By default, Parsl will launch +as many workers as the accelerators specified via ``available_accelerators``. .. code-block:: python @@ -327,7 +331,38 @@ Provide either the number of executors (Parsl will assume they are named in inte strategy='none', ) -For hardware that uses Nvidia devices, Parsl allows for the oversubscription of workers to GPUS. This is intended to make use of Nvidia's `Multi-Process Service (MPS) `_ available on many of their GPUs that allows users to run multiple concurrent processes on a single GPU. The user needs to set in the ``worker_init`` commands to start MPS on every node in the block (this is machine dependent). The ``available_accelerators`` option should then be set to the total number of GPU partitions run on a single node in the block. For example, for a node with 4 Nvidia GPUs, to create 8 workers per GPU, set ``available_accelerators=32``. GPUs will be assigned to workers in ascending order in contiguous blocks. In the example, workers 0-7 will be placed on GPU 0, workers 8-15 on GPU 1, workers 16-23 on GPU 2, and workers 24-31 on GPU 3. +It is possible to bind multiple/specific accelerators to each worker by specifying a list of comma-separated strings, +each specifying accelerators. In the context of binding to NVIDIA GPUs, this works by setting ``CUDA_VISIBLE_DEVICES`` +on each worker to a specific string in the list supplied to ``available_accelerators``. + +Here's an example: + +.. code-block:: python + + # The following config is trimmed for clarity + local_config = Config( + executors=[ + HighThroughputExecutor( + # Starts 2 workers per node, each bound to 2 GPUs + available_accelerators=["0,1", "2,3"], + + # Start a single worker bound to all 4 GPUs + # available_accelerators=["0,1,2,3"] + ) + ], + ) + +GPU Oversubscription +"""""""""""""""""""" + +For hardware that uses Nvidia devices, Parsl allows for the oversubscription of workers to GPUs. This is intended to +make use of Nvidia's `Multi-Process Service (MPS) `_ available on many of their +GPUs that allows users to run multiple concurrent processes on a single GPU. The user needs to use the +``worker_init`` commands to start MPS on every node in the block (this is machine dependent). The +``available_accelerators`` option should then be set to the total number of GPU partitions run on a single node in the +block. For example, for a node with 4 Nvidia GPUs, to create 8 workers per GPU, set ``available_accelerators=32``. +GPUs will be assigned to workers in ascending order in contiguous blocks. In the example, workers 0-7 will be placed +on GPU 0, workers 8-15 on GPU 1, workers 16-23 on GPU 2, and workers 24-31 on GPU 3.
Multi-Threaded Applications --------------------------- From 2067b407bbf6e0d9d9ab66ab5b2393642907a1ae Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Sat, 10 Aug 2024 09:34:44 +0200 Subject: [PATCH 06/15] Convert monitoring type annotations to PEP-526 from comments (#3573) This is in preparation for future type work in the monitoring codebase (for example, see PR #3572). This PR does not claim that the types it is moving around are correct (and PR #3572 contains some instances where the types are incorrect). It is a purely syntactic PR. After this PR, $ git grep '# type:' parsl/monitoring/ returns two remaining comment style annotations, which are 'type: ignore' exclusions not specific types. --- parsl/monitoring/db_manager.py | 20 ++++++++++---------- parsl/monitoring/remote.py | 8 ++++---- 2 files changed, 14 insertions(+), 14 deletions(-) diff --git a/parsl/monitoring/db_manager.py b/parsl/monitoring/db_manager.py index 9f19cd9f4d..8f9f302640 100644 --- a/parsl/monitoring/db_manager.py +++ b/parsl/monitoring/db_manager.py @@ -283,7 +283,7 @@ def __init__(self, ): self.workflow_end = False - self.workflow_start_message = None # type: Optional[MonitoringMessage] + self.workflow_start_message: Optional[MonitoringMessage] = None self.logdir = logdir os.makedirs(self.logdir, exist_ok=True) @@ -299,10 +299,10 @@ def __init__(self, self.batching_interval = batching_interval self.batching_threshold = batching_threshold - self.pending_priority_queue = queue.Queue() # type: queue.Queue[TaggedMonitoringMessage] - self.pending_node_queue = queue.Queue() # type: queue.Queue[MonitoringMessage] - self.pending_block_queue = queue.Queue() # type: queue.Queue[MonitoringMessage] - self.pending_resource_queue = queue.Queue() # type: queue.Queue[MonitoringMessage] + self.pending_priority_queue: queue.Queue[TaggedMonitoringMessage] = queue.Queue() + self.pending_node_queue: queue.Queue[MonitoringMessage] = queue.Queue() + self.pending_block_queue: queue.Queue[MonitoringMessage] = queue.Queue() + self.pending_resource_queue: queue.Queue[MonitoringMessage] = queue.Queue() def start(self, priority_queue: "queue.Queue[TaggedMonitoringMessage]", @@ -351,18 +351,18 @@ def start(self, If that happens, the message will be added to deferred_resource_messages and processed later. """ - inserted_tasks = set() # type: Set[object] + inserted_tasks: Set[object] = set() """ like inserted_tasks but for task,try tuples """ - inserted_tries = set() # type: Set[Any] + inserted_tries: Set[Any] = set() # for any task ID, we can defer exactly one message, which is the # assumed-to-be-unique first message (with first message flag set). # The code prior to this patch will discard previous message in # the case of multiple messages to defer. 
- deferred_resource_messages = {} # type: MonitoringMessage + deferred_resource_messages: MonitoringMessage = {} exception_happened = False @@ -505,7 +505,7 @@ def start(self, "Got {} messages from block queue".format(len(block_info_messages))) # block_info_messages is possibly a nested list of dict (at different polling times) # Each dict refers to the info of a job/block at one polling time - block_messages_to_insert = [] # type: List[Any] + block_messages_to_insert: List[Any] = [] for block_msg in block_info_messages: block_messages_to_insert.extend(block_msg) self._insert(table=BLOCK, messages=block_messages_to_insert) @@ -686,7 +686,7 @@ def _insert(self, table: str, messages: List[MonitoringMessage]) -> None: logger.exception("Rollback failed") def _get_messages_in_batch(self, msg_queue: "queue.Queue[X]") -> List[X]: - messages = [] # type: List[X] + messages: List[X] = [] start = time.time() while True: if time.time() - start >= self.batching_interval or len(messages) >= self.batching_threshold: diff --git a/parsl/monitoring/remote.py b/parsl/monitoring/remote.py index 055a013627..d374338dee 100644 --- a/parsl/monitoring/remote.py +++ b/parsl/monitoring/remote.py @@ -199,10 +199,10 @@ def monitor(pid: int, pm = psutil.Process(pid) - children_user_time = {} # type: Dict[int, float] - children_system_time = {} # type: Dict[int, float] - children_num_ctx_switches_voluntary = {} # type: Dict[int, float] - children_num_ctx_switches_involuntary = {} # type: Dict[int, float] + children_user_time: Dict[int, float] = {} + children_system_time: Dict[int, float] = {} + children_num_ctx_switches_voluntary: Dict[int, float] = {} + children_num_ctx_switches_involuntary: Dict[int, float] = {} def accumulate_and_prepare() -> Dict[str, Any]: d = {"psutil_process_" + str(k): v for k, v in pm.as_dict().items() if k in simple} From ffb364450c943f827fdc815d05ade40ebaf2724f Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Wed, 14 Aug 2024 22:57:12 +0200 Subject: [PATCH 07/15] Correct and check types on monitoring router and database processes (#3572) Prior to this PR, the startup code for the monitoring router and database processes had type annotations on queues; but these types were not checked, and were incorrect - they were labelled process-local Queue instead of multiprocessing queues. This did not cause much trouble execution- and mypy-wise, as the interfaces of those two classes are similar enough, but it is confusing to read in a part of the codebase that is already confusing (that confusion is probably what led to the incorrect annotations in the first place...) They were not checked because the informal policy of "internal stuff is checked with mypy, external interfaces are checked with typeguard" works badly here: The startup methods are launched using multiprocessing.Process, and function invocations are not type-checked by mypy across a multiprocessing Process constructor. Changed Behaviour This PR introduces typeguard decorators onto the router and database start methods so that this internal checking happens at runtime. This consequently reveals that the type annotations of these methods are incorrect, and so this PR makes those consequential changes. Further, generic types (Queue[MessageType]) are not supported on multiprocessing.Queues before Python 3.12 - so those generic indices are removed from the type annotations.
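As an illustrative sketch of that annotation change (names mirror those in the diffs below; the generic form shown in the comment is what gets dropped):

    import multiprocessing.queues as mpq

    # Before: annotated as a process-local queue, with a generic index that
    # plain multiprocessing queues reject at runtime before Python 3.12:
    #   node_msgs: "queue.Queue[AddressedMonitoringMessage]"
    # After: the correct multiprocessing queue type, with no generic index,
    # checkable at runtime by typeguard:
    node_msgs: mpq.Queue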
That is unfortunate and weakens in-process static verification - but they could be re-introduced after Parsl drops Python 3.11 support (around 2027 in the present informal support policy) --- parsl/monitoring/db_manager.py | 22 +++++++++++++--------- parsl/monitoring/router.py | 26 ++++++++++++++------------ 2 files changed, 27 insertions(+), 21 deletions(-) diff --git a/parsl/monitoring/db_manager.py b/parsl/monitoring/db_manager.py index 8f9f302640..853bc4c3c7 100644 --- a/parsl/monitoring/db_manager.py +++ b/parsl/monitoring/db_manager.py @@ -1,11 +1,14 @@ import datetime import logging +import multiprocessing.queues as mpq import os import queue import threading import time from typing import Any, Dict, List, Optional, Set, Tuple, TypeVar, cast +import typeguard + from parsl.dataflow.states import States from parsl.errors import OptionalModuleMissing from parsl.log_utils import set_file_logger @@ -305,10 +308,10 @@ def __init__(self, self.pending_resource_queue: queue.Queue[MonitoringMessage] = queue.Queue() def start(self, - priority_queue: "queue.Queue[TaggedMonitoringMessage]", - node_queue: "queue.Queue[MonitoringMessage]", - block_queue: "queue.Queue[MonitoringMessage]", - resource_queue: "queue.Queue[MonitoringMessage]") -> None: + priority_queue: mpq.Queue, + node_queue: mpq.Queue, + block_queue: mpq.Queue, + resource_queue: mpq.Queue) -> None: self._kill_event = threading.Event() self._priority_queue_pull_thread = threading.Thread(target=self._migrate_logs_to_internal, @@ -719,11 +722,12 @@ def close(self) -> None: @wrap_with_logs(target="database_manager") -def dbm_starter(exception_q: "queue.Queue[Tuple[str, str]]", - priority_msgs: "queue.Queue[TaggedMonitoringMessage]", - node_msgs: "queue.Queue[MonitoringMessage]", - block_msgs: "queue.Queue[MonitoringMessage]", - resource_msgs: "queue.Queue[MonitoringMessage]", +@typeguard.typechecked +def dbm_starter(exception_q: mpq.Queue, + priority_msgs: mpq.Queue, + node_msgs: mpq.Queue, + block_msgs: mpq.Queue, + resource_msgs: mpq.Queue, db_url: str, logdir: str, logging_level: int) -> None: diff --git a/parsl/monitoring/router.py b/parsl/monitoring/router.py index 4be454b797..7cce223048 100644 --- a/parsl/monitoring/router.py +++ b/parsl/monitoring/router.py @@ -1,15 +1,16 @@ from __future__ import annotations import logging +import multiprocessing.queues as mpq import os import pickle -import queue import socket import threading import time from multiprocessing.synchronize import Event -from typing import Optional, Tuple, Union +from typing import Optional, Tuple +import typeguard import zmq from parsl.log_utils import set_file_logger @@ -33,10 +34,10 @@ def __init__(self, logdir: str = ".", logging_level: int = logging.INFO, atexit_timeout: int = 3, # in seconds - priority_msgs: "queue.Queue[AddressedMonitoringMessage]", - node_msgs: "queue.Queue[AddressedMonitoringMessage]", - block_msgs: "queue.Queue[AddressedMonitoringMessage]", - resource_msgs: "queue.Queue[AddressedMonitoringMessage]", + priority_msgs: mpq.Queue, + node_msgs: mpq.Queue, + block_msgs: mpq.Queue, + resource_msgs: mpq.Queue, exit_event: Event, ): """ Initializes a monitoring configuration class. 
@@ -202,12 +203,13 @@ def start_zmq_listener(self) -> None: @wrap_with_logs -def router_starter(comm_q: "queue.Queue[Union[Tuple[int, int], str]]", - exception_q: "queue.Queue[Tuple[str, str]]", - priority_msgs: "queue.Queue[AddressedMonitoringMessage]", - node_msgs: "queue.Queue[AddressedMonitoringMessage]", - block_msgs: "queue.Queue[AddressedMonitoringMessage]", - resource_msgs: "queue.Queue[AddressedMonitoringMessage]", +@typeguard.typechecked +def router_starter(comm_q: mpq.Queue, + exception_q: mpq.Queue, + priority_msgs: mpq.Queue, + node_msgs: mpq.Queue, + block_msgs: mpq.Queue, + resource_msgs: mpq.Queue, exit_event: Event, hub_address: str, From e34a70a23090e6f97b6090b3e3d567651e81e3d5 Mon Sep 17 00:00:00 2001 From: arhag23 <35051569+arhag23@users.noreply.github.com> Date: Fri, 16 Aug 2024 07:18:08 -0400 Subject: [PATCH 08/15] Make paramiko an optional dependency (#3584) Removed paramiko from requirements.txt and added it as an optional module in setup.py. Added OptionalModuleMissing errors for the ssh channel files for when usage is attempted without the required paramiko module being installed. Changed Behaviour: If users have code that depends on the ssh channels, they may need to opt in to that module. Prepares for #3515 --- parsl/channels/oauth_ssh/oauth_ssh.py | 12 ++++++++++-- parsl/channels/ssh/ssh.py | 22 ++++++++++++++++------ parsl/channels/ssh_il/ssh_il.py | 14 ++++++++++++-- requirements.txt | 1 - setup.py | 1 + test-requirements.txt | 1 + 6 files changed, 40 insertions(+), 11 deletions(-) diff --git a/parsl/channels/oauth_ssh/oauth_ssh.py b/parsl/channels/oauth_ssh/oauth_ssh.py index 3173b163a8..1b690a4e3c 100644 --- a/parsl/channels/oauth_ssh/oauth_ssh.py +++ b/parsl/channels/oauth_ssh/oauth_ssh.py @@ -1,11 +1,15 @@ import logging import socket -import paramiko - from parsl.channels.ssh.ssh import DeprecatedSSHChannel from parsl.errors import OptionalModuleMissing +try: + import paramiko + _ssh_enabled = True +except (ImportError, NameError, FileNotFoundError): + _ssh_enabled = False + try: from oauth_ssh.oauth_ssh_token import find_access_token from oauth_ssh.ssh_service import SSHService @@ -38,6 +42,10 @@ def __init__(self, hostname, username=None, script_dir=None, envs=None, port=22) Raises: ''' + if not _ssh_enabled: + raise OptionalModuleMissing(['ssh'], + "OauthSSHChannel requires the ssh module and config.") + if not _oauth_ssh_enabled: raise OptionalModuleMissing(['oauth_ssh'], "OauthSSHChannel requires oauth_ssh module and config.") diff --git a/parsl/channels/ssh/ssh.py b/parsl/channels/ssh/ssh.py index 38b8afe47b..c53a26b831 100644 --- a/parsl/channels/ssh/ssh.py +++ b/parsl/channels/ssh/ssh.py @@ -2,8 +2,6 @@ import logging import os -import paramiko - from parsl.channels.base import Channel from parsl.channels.errors import ( AuthException, @@ -13,15 +11,24 @@ FileCopyException, SSHException, ) +from parsl.errors import OptionalModuleMissing from parsl.utils import RepresentationMixin +try: + import paramiko + _ssh_enabled = True +except (ImportError, NameError, FileNotFoundError): + _ssh_enabled = False + + logger = logging.getLogger(__name__) -class NoAuthSSHClient(paramiko.SSHClient): - def _auth(self, username, *args): - self._transport.auth_none(username) - return +if _ssh_enabled: + class NoAuthSSHClient(paramiko.SSHClient): + def _auth(self, username, *args): + self._transport.auth_none(username) + return class DeprecatedSSHChannel(Channel, RepresentationMixin): @@ -53,6 +60,9 @@ def __init__(self, hostname, username=None, password=None, 
script_dir=None, envs Raises: ''' + if not _ssh_enabled: + raise OptionalModuleMissing(['ssh'], + "SSHChannel requires the ssh module and config.") self.hostname = hostname self.username = username diff --git a/parsl/channels/ssh_il/ssh_il.py b/parsl/channels/ssh_il/ssh_il.py index 3a5e0c5096..67e5501a43 100644 --- a/parsl/channels/ssh_il/ssh_il.py +++ b/parsl/channels/ssh_il/ssh_il.py @@ -1,9 +1,15 @@ import getpass import logging -import paramiko - from parsl.channels.ssh.ssh import DeprecatedSSHChannel +from parsl.errors import OptionalModuleMissing + +try: + import paramiko + _ssh_enabled = True +except (ImportError, NameError, FileNotFoundError): + _ssh_enabled = False + logger = logging.getLogger(__name__) @@ -30,6 +36,10 @@ def __init__(self, hostname, username=None, password=None, script_dir=None, envs Raises: ''' + if not _ssh_enabled: + raise OptionalModuleMissing(['ssh'], + "SSHInteractiveLoginChannel requires the ssh module and config.") + self.hostname = hostname self.username = username self.password = password diff --git a/requirements.txt b/requirements.txt index e89202942e..c60517655f 100644 --- a/requirements.txt +++ b/requirements.txt @@ -12,7 +12,6 @@ globus-sdk dill tblib requests -paramiko psutil>=5.5.1 setproctitle filelock>=3.13,<4 diff --git a/setup.py b/setup.py index 85e014dc18..4934d01e5d 100755 --- a/setup.py +++ b/setup.py @@ -35,6 +35,7 @@ 'flux': ['pyyaml', 'cffi', 'jsonschema'], 'proxystore': ['proxystore'], 'radical-pilot': ['radical.pilot==1.60', 'radical.utils==1.60'], + 'ssh': ['paramiko'], # Disabling psi-j since github direct links are not allowed by pypi # 'psij': ['psi-j-parsl@git+https://github.com/ExaWorks/psi-j-parsl'] } diff --git a/test-requirements.txt b/test-requirements.txt index c735de8d5c..415e995c1b 100644 --- a/test-requirements.txt +++ b/test-requirements.txt @@ -1,6 +1,7 @@ flake8==6.1.0 ipyparallel pandas +paramiko pytest>=7.4.0,<8 pytest-cov pytest-random-order From 357547ff2b67a60d8935ae5b63d2ee029ca0cada Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Fri, 16 Aug 2024 14:06:12 +0200 Subject: [PATCH 09/15] Make router_starter parameters mandatory kwargs (#3583) See PR #2973 for justification of mandatory keyword args. 
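For readers unfamiliar with the mechanism: a bare * in a Python signature makes every parameter after it keyword-only. A minimal sketch of why that matters here (hypothetical names, not the actual Parsl signatures), since several of these parameters share the same queue type and could otherwise be swapped silently at a call site:

    import multiprocessing
    import multiprocessing.queues as mpq

    def starter(*, comm_q: mpq.Queue, exception_q: mpq.Queue) -> None:
        # With keyword-only parameters, a caller cannot accidentally swap
        # two same-typed queues by passing them positionally.
        comm_q.put("ready")

    q1 = multiprocessing.Queue()
    q2 = multiprocessing.Queue()
    starter(comm_q=q1, exception_q=q2)  # accepted: names are explicit
    # starter(q1, q2)  # TypeError: starter() takes 0 positional arguments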
--- parsl/monitoring/monitoring.py | 11 ++++++++--- parsl/monitoring/router.py | 3 ++- 2 files changed, 10 insertions(+), 4 deletions(-) diff --git a/parsl/monitoring/monitoring.py b/parsl/monitoring/monitoring.py index 9dccbecd35..a76e2cf487 100644 --- a/parsl/monitoring/monitoring.py +++ b/parsl/monitoring/monitoring.py @@ -154,9 +154,14 @@ def start(self, dfk_run_dir: str, config_run_dir: Union[str, os.PathLike]) -> No self.router_exit_event = Event() self.router_proc = ForkProcess(target=router_starter, - args=(comm_q, self.exception_q, self.priority_msgs, self.node_msgs, - self.block_msgs, self.resource_msgs, self.router_exit_event), - kwargs={"hub_address": self.hub_address, + kwargs={"comm_q": comm_q, + "exception_q": self.exception_q, + "priority_msgs": self.priority_msgs, + "node_msgs": self.node_msgs, + "block_msgs": self.block_msgs, + "resource_msgs": self.resource_msgs, + "exit_event": self.router_exit_event, + "hub_address": self.hub_address, "udp_port": self.hub_port, "zmq_port_range": self.hub_port_range, "logdir": self.logdir, diff --git a/parsl/monitoring/router.py b/parsl/monitoring/router.py index 7cce223048..343410e3a4 100644 --- a/parsl/monitoring/router.py +++ b/parsl/monitoring/router.py @@ -204,7 +204,8 @@ def start_zmq_listener(self) -> None: @wrap_with_logs @typeguard.typechecked -def router_starter(comm_q: mpq.Queue, +def router_starter(*, + comm_q: mpq.Queue, exception_q: mpq.Queue, priority_msgs: mpq.Queue, node_msgs: mpq.Queue, From f1359199e4f9e16f3ad15c3b5e9d53f8471820d0 Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Fri, 16 Aug 2024 19:12:10 +0200 Subject: [PATCH 10/15] Remove monitoring queue tag switch monitoring db pre-router (#3587) The main goal of this PR is to make _migrate_logs_to_internal much more clearly a message forwarder, rather than a message interpreter. This follows on from PR #2168, which introduces _dispatch_to_internal to dispatch messages based on their tag rather than on the queue the message was received on, and is part of an ongoing series to simplify the queue and routing structure inside the monitoring router and database code. Further PRs in preparation (in draft PR #3315) contain further simplifications building on this PR. After this PR: * the database manager will respond to a STOP message on any incoming queue, vs previously only on the priority queue. This is a consequence of treating the queues all the same now. * the database manager will not perform such strong validation of message structure based on message tag at this point. That's part of expecting the code to forward messages, not inspect them, with later inspecting code being the place to care about structure. This only affects behaviour when invalid messages are sent. Related PRs and context: #3567 changes the monitoring router to be more of a router and to not inspect and modify certain in-transit messages. There is a long slow project to regularise queues: PR #2117 makes resource info messages look like other messages so they can be dispatched alongside other message types. The priority queue was initially (as I understand it) introduced to attempt to address a race condition of message arrival order vs SQL database key constraints. The priority queue is an attempt to force certain messages to be processed before others (not in the priority queue). However, a subsequent commit in 2019, 0a4b68555ce1946e46b96a13f9003e0733252ec6, introduces a more robust approach because this priority queue approach does not work, and so it is not needed.
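The forward-without-inspecting pattern this moves towards can be sketched as follows (illustrative only, with invented names; the real code lives in _migrate_logs_to_internal and _dispatch_to_internal):

    import queue
    import threading

    def migrate(logs_queue: queue.Queue, kill_event: threading.Event, dispatch) -> None:
        # Drain until asked to stop AND the queue is empty. Every queue is
        # treated identically: the only message interpreted here is STOP.
        while not kill_event.is_set() or logs_queue.qsize() != 0:
            try:
                msg = logs_queue.get(timeout=0.1)
            except queue.Empty:
                continue
            if msg == 'STOP':
                kill_event.set()
            else:
                dispatch(msg)  # tag-based validation happens at the receiver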
--- parsl/monitoring/db_manager.py | 43 ++++++++++------------------------ 1 file changed, 13 insertions(+), 30 deletions(-) diff --git a/parsl/monitoring/db_manager.py b/parsl/monitoring/db_manager.py index 853bc4c3c7..053c98d598 100644 --- a/parsl/monitoring/db_manager.py +++ b/parsl/monitoring/db_manager.py @@ -316,7 +316,7 @@ def start(self, self._kill_event = threading.Event() self._priority_queue_pull_thread = threading.Thread(target=self._migrate_logs_to_internal, args=( - priority_queue, 'priority', self._kill_event,), + priority_queue, self._kill_event,), name="Monitoring-migrate-priority", daemon=True, ) @@ -324,7 +324,7 @@ def start(self, self._node_queue_pull_thread = threading.Thread(target=self._migrate_logs_to_internal, args=( - node_queue, 'node', self._kill_event,), + node_queue, self._kill_event,), name="Monitoring-migrate-node", daemon=True, ) @@ -332,7 +332,7 @@ def start(self, self._block_queue_pull_thread = threading.Thread(target=self._migrate_logs_to_internal, args=( - block_queue, 'block', self._kill_event,), + block_queue, self._kill_event,), name="Monitoring-migrate-block", daemon=True, ) @@ -340,7 +340,7 @@ def start(self, self._resource_queue_pull_thread = threading.Thread(target=self._migrate_logs_to_internal, args=( - resource_queue, 'resource', self._kill_event,), + resource_queue, self._kill_event,), name="Monitoring-migrate-resource", daemon=True, ) @@ -577,43 +577,26 @@ def start(self, raise RuntimeError("An exception happened sometime during database processing and should have been logged in database_manager.log") @wrap_with_logs(target="database_manager") - def _migrate_logs_to_internal(self, logs_queue: queue.Queue, queue_tag: str, kill_event: threading.Event) -> None: - logger.info("Starting processing for queue {}".format(queue_tag)) + def _migrate_logs_to_internal(self, logs_queue: queue.Queue, kill_event: threading.Event) -> None: + logger.info("Starting _migrate_logs_to_internal") while not kill_event.is_set() or logs_queue.qsize() != 0: - logger.debug("""Checking STOP conditions for {} threads: {}, {}""" - .format(queue_tag, kill_event.is_set(), logs_queue.qsize() != 0)) + logger.debug("Checking STOP conditions: kill event: %s, queue has entries: %s", + kill_event.is_set(), logs_queue.qsize() != 0) try: x, addr = logs_queue.get(timeout=0.1) except queue.Empty: continue else: - if queue_tag == 'priority' and x == 'STOP': + if x == 'STOP': self.close() - elif queue_tag == 'priority': # implicitly not 'STOP' - assert isinstance(x, tuple) - assert len(x) == 2 - assert x[0] in [MessageType.WORKFLOW_INFO, MessageType.TASK_INFO], \ - "_migrate_logs_to_internal can only migrate WORKFLOW_,TASK_INFO message from priority queue, got x[0] == {}".format(x[0]) - self._dispatch_to_internal(x) - elif queue_tag == 'resource': - assert isinstance(x, tuple), "_migrate_logs_to_internal was expecting a tuple, got {}".format(x) - assert x[0] == MessageType.RESOURCE_INFO, ( - "_migrate_logs_to_internal can only migrate RESOURCE_INFO message from resource queue, " - "got tag {}, message {}".format(x[0], x) - ) - self._dispatch_to_internal(x) - elif queue_tag == 'node': - assert len(x) == 2, "expected message tuple to have exactly two elements" - assert x[0] == MessageType.NODE_INFO, "_migrate_logs_to_internal can only migrate NODE_INFO messages from node queue" - - self._dispatch_to_internal(x) - elif queue_tag == "block": - self._dispatch_to_internal(x) else: - logger.error(f"Discarding because unknown queue tag '{queue_tag}', message: {x}") + 
self._dispatch_to_internal(x) def _dispatch_to_internal(self, x: Tuple) -> None: + assert isinstance(x, tuple) + assert len(x) == 2, "expected message tuple to have exactly two elements" + if x[0] in [MessageType.WORKFLOW_INFO, MessageType.TASK_INFO]: self.pending_priority_queue.put(cast(Any, x)) elif x[0] == MessageType.RESOURCE_INFO: From 123df5151f71c3f1be76e97f06c0ccf5e8be79d3 Mon Sep 17 00:00:00 2001 From: Yadu Nand Babuji Date: Fri, 16 Aug 2024 14:04:54 -0500 Subject: [PATCH 11/15] Move MPI behavior from HTEX to MPIExecutor (#3582) This PR moves the following MPI-related functionality and options from HTEX to MPIExecutor: the kwarg options enable_mpi_mode and mpi_launcher are now removed from HTEX; the checks for the launcher being set to SimpleLauncher and for a valid mpi_launcher are now in MPIExecutor; a new validate_resource_spec method is added to HTEX that currently asserts that no resource_specification is passed to it, since HTEX does not support any such options; MPIExecutor overrides validate_resource_spec to check for a valid MPI resource specification. These changes should make it easier to have executor-specific resource validation. Changed Behaviour: The HTEX kwargs enable_mpi_mode and mpi_launcher are no longer supported. Expect to use MPI functionality only through the MPIExecutor. --- parsl/executors/high_throughput/executor.py | 42 ++++------ .../executors/high_throughput/mpi_executor.py | 25 +++++- .../high_throughput/mpi_prefix_composer.py | 9 ++- .../test_resource_spec_validation.py | 40 ++++++++++ .../test_mpi_apps/test_bad_mpi_config.py | 43 ++++++---- .../test_mpi_apps/test_mpi_mode_disabled.py | 47 ----------- .../test_mpi_apps/test_mpi_mode_enabled.py | 24 ++++-- parsl/tests/test_mpi_apps/test_mpiex.py | 5 +- .../tests/test_mpi_apps/test_resource_spec.py | 80 +++++++++---------- test-requirements.txt | 1 + 10 files changed, 171 insertions(+), 145 deletions(-) create mode 100644 parsl/tests/test_htex/test_resource_spec_validation.py delete mode 100644 parsl/tests/test_mpi_apps/test_mpi_mode_disabled.py diff --git a/parsl/executors/high_throughput/executor.py b/parsl/executors/high_throughput/executor.py index 301052c4c5..c4097500f1 100644 --- a/parsl/executors/high_throughput/executor.py +++ b/parsl/executors/high_throughput/executor.py @@ -12,7 +12,6 @@ import typeguard -import parsl.launchers from parsl import curvezmq from parsl.addresses import get_all_addresses from parsl.app.errors import RemoteExceptionWrapper @@ -25,8 +24,7 @@ RandomManagerSelector, ) from parsl.executors.high_throughput.mpi_prefix_composer import ( - VALID_LAUNCHERS, - validate_resource_spec, + InvalidResourceSpecification, ) from parsl.executors.status_handling import BlockProviderExecutor from parsl.jobs.states import TERMINAL_STATES, JobState, JobStatus @@ -224,17 +222,6 @@ class HighThroughputExecutor(BlockProviderExecutor, RepresentationMixin, UsageIn Parsl will create names as integers starting with 0. default: empty list - - enable_mpi_mode: bool - If enabled, MPI launch prefixes will be composed for the batch scheduler based on - the nodes available in each batch job and the resource_specification dict passed - from the app. This is an experimental feature, please refer to the following doc section - before use: https://parsl.readthedocs.io/en/stable/userguide/mpi_apps.html - - mpi_launcher: str - This field is only used if enable_mpi_mode is set. Select one from the - list of supported MPI launchers = ("srun", "aprun", "mpiexec").
- default: "mpiexec" """ @typeguard.typechecked @@ -263,8 +250,6 @@ def __init__(self, poll_period: int = 10, address_probe_timeout: Optional[int] = None, worker_logdir_root: Optional[str] = None, - enable_mpi_mode: bool = False, - mpi_launcher: str = "mpiexec", manager_selector: ManagerSelector = RandomManagerSelector(), block_error_handler: Union[bool, Callable[[BlockProviderExecutor, Dict[str, JobStatus]], None]] = True, encrypted: bool = False): @@ -330,15 +315,6 @@ def __init__(self, self.encrypted = encrypted self.cert_dir = None - self.enable_mpi_mode = enable_mpi_mode - assert mpi_launcher in VALID_LAUNCHERS, \ - f"mpi_launcher must be set to one of {VALID_LAUNCHERS}" - if self.enable_mpi_mode: - assert isinstance(self.provider.launcher, parsl.launchers.SimpleLauncher), \ - "mpi_mode requires the provider to be configured to use a SimpleLauncher" - - self.mpi_launcher = mpi_launcher - if not launch_cmd: launch_cmd = DEFAULT_LAUNCH_CMD self.launch_cmd = launch_cmd @@ -348,6 +324,8 @@ def __init__(self, self.interchange_launch_cmd = interchange_launch_cmd radio_mode = "htex" + enable_mpi_mode: bool = False + mpi_launcher: str = "mpiexec" def _warn_deprecated(self, old: str, new: str): warnings.warn( @@ -377,6 +355,18 @@ def worker_logdir(self): return "{}/{}".format(self.worker_logdir_root, self.label) return self.logdir + def validate_resource_spec(self, resource_specification: dict): + """HTEX does not support *any* resource_specification options and + will raise InvalidResourceSpecification is any are passed to it""" + if resource_specification: + raise InvalidResourceSpecification( + set(resource_specification.keys()), + ("HTEX does not support the supplied resource_specifications." + "For MPI applications consider using the MPIExecutor. " + "For specifications for core count/memory/walltime, consider using WorkQueueExecutor. ") + ) + return + def initialize_scaling(self): """Compose the launch command and scale out the initial blocks. """ @@ -660,7 +650,7 @@ def submit(self, func, resource_specification, *args, **kwargs): Future """ - validate_resource_spec(resource_specification, self.enable_mpi_mode) + self.validate_resource_spec(resource_specification) if self.bad_state_is_set: raise self.executor_exception diff --git a/parsl/executors/high_throughput/mpi_executor.py b/parsl/executors/high_throughput/mpi_executor.py index b8045d38b3..04b8cf5197 100644 --- a/parsl/executors/high_throughput/mpi_executor.py +++ b/parsl/executors/high_throughput/mpi_executor.py @@ -8,8 +8,13 @@ GENERAL_HTEX_PARAM_DOCS, HighThroughputExecutor, ) +from parsl.executors.high_throughput.mpi_prefix_composer import ( + VALID_LAUNCHERS, + validate_resource_spec, +) from parsl.executors.status_handling import BlockProviderExecutor from parsl.jobs.states import JobStatus +from parsl.launchers import SimpleLauncher from parsl.providers import LocalProvider from parsl.providers.base import ExecutionProvider @@ -30,6 +35,11 @@ class MPIExecutor(HighThroughputExecutor): max_workers_per_block: int Maximum number of MPI applications to run at once per block + mpi_launcher: str + Select one from the list of supported MPI launchers: + ("srun", "aprun", "mpiexec"). 
+ default: "mpiexec" + {GENERAL_HTEX_PARAM_DOCS} """ @@ -60,7 +70,6 @@ def __init__(self, super().__init__( # Hard-coded settings cores_per_worker=1e-9, # Ensures there will be at least an absurd number of workers - enable_mpi_mode=True, max_workers_per_node=max_workers_per_block, # Everything else @@ -82,9 +91,21 @@ def __init__(self, poll_period=poll_period, address_probe_timeout=address_probe_timeout, worker_logdir_root=worker_logdir_root, - mpi_launcher=mpi_launcher, block_error_handler=block_error_handler, encrypted=encrypted ) + self.enable_mpi_mode = True + self.mpi_launcher = mpi_launcher self.max_workers_per_block = max_workers_per_block + + if not isinstance(self.provider.launcher, SimpleLauncher): + raise TypeError("mpi_mode requires the provider to be configured to use a SimpleLauncher") + + if mpi_launcher not in VALID_LAUNCHERS: + raise ValueError(f"mpi_launcher set to:{mpi_launcher} must be set to one of {VALID_LAUNCHERS}") + + self.mpi_launcher = mpi_launcher + + def validate_resource_spec(self, resource_specification: dict): + return validate_resource_spec(resource_specification) diff --git a/parsl/executors/high_throughput/mpi_prefix_composer.py b/parsl/executors/high_throughput/mpi_prefix_composer.py index 78c5d8b867..0125d9a532 100644 --- a/parsl/executors/high_throughput/mpi_prefix_composer.py +++ b/parsl/executors/high_throughput/mpi_prefix_composer.py @@ -21,14 +21,15 @@ def __str__(self): class InvalidResourceSpecification(Exception): """Exception raised when Invalid input is supplied via resource specification""" - def __init__(self, invalid_keys: Set[str]): + def __init__(self, invalid_keys: Set[str], message: str = ''): self.invalid_keys = invalid_keys + self.message = message def __str__(self): - return f"Invalid resource specification options supplied: {self.invalid_keys}" + return f"Invalid resource specification options supplied: {self.invalid_keys} {self.message}" -def validate_resource_spec(resource_spec: Dict[str, str], is_mpi_enabled: bool): +def validate_resource_spec(resource_spec: Dict[str, str]): """Basic validation of keys in the resource_spec Raises: InvalidResourceSpecification if the resource_spec @@ -38,7 +39,7 @@ def validate_resource_spec(resource_spec: Dict[str, str], is_mpi_enabled: bool): # empty resource_spec when mpi_mode is set causes parsl to hang # ref issue #3427 - if is_mpi_enabled and len(user_keys) == 0: + if len(user_keys) == 0: raise MissingResourceSpecification('MPI mode requires optional parsl_resource_specification keyword argument to be configured') legal_keys = set(("ranks_per_node", diff --git a/parsl/tests/test_htex/test_resource_spec_validation.py b/parsl/tests/test_htex/test_resource_spec_validation.py new file mode 100644 index 0000000000..ac0c580c20 --- /dev/null +++ b/parsl/tests/test_htex/test_resource_spec_validation.py @@ -0,0 +1,40 @@ +import queue +from unittest import mock + +import pytest + +from parsl.executors import HighThroughputExecutor +from parsl.executors.high_throughput.mpi_prefix_composer import ( + InvalidResourceSpecification, +) + + +def double(x): + return x * 2 + + +@pytest.mark.local +def test_submit_calls_validate(): + + htex = HighThroughputExecutor() + htex.outgoing_q = mock.Mock(spec=queue.Queue) + htex.validate_resource_spec = mock.Mock(spec=htex.validate_resource_spec) + + res_spec = {} + htex.submit(double, res_spec, (5,), {}) + htex.validate_resource_spec.assert_called() + + +@pytest.mark.local +def test_resource_spec_validation(): + htex = HighThroughputExecutor() + ret_val = 
htex.validate_resource_spec({}) + assert ret_val is None + + +@pytest.mark.local +def test_resource_spec_validation_bad_keys(): + htex = HighThroughputExecutor() + + with pytest.raises(InvalidResourceSpecification): + htex.validate_resource_spec({"num_nodes": 2}) diff --git a/parsl/tests/test_mpi_apps/test_bad_mpi_config.py b/parsl/tests/test_mpi_apps/test_bad_mpi_config.py index 336bf87703..ebeb64622d 100644 --- a/parsl/tests/test_mpi_apps/test_bad_mpi_config.py +++ b/parsl/tests/test_mpi_apps/test_bad_mpi_config.py @@ -1,33 +1,48 @@ import pytest from parsl import Config -from parsl.executors import HighThroughputExecutor +from parsl.executors import MPIExecutor from parsl.launchers import AprunLauncher, SimpleLauncher, SrunLauncher from parsl.providers import SlurmProvider @pytest.mark.local -def test_bad_launcher_with_mpi_mode(): - """AssertionError if a launcher other than SimpleLauncher is supplied""" +def test_bad_launcher(): + """TypeError if a launcher other than SimpleLauncher is supplied""" for launcher in [SrunLauncher(), AprunLauncher()]: - with pytest.raises(AssertionError): + with pytest.raises(TypeError): Config(executors=[ - HighThroughputExecutor( - enable_mpi_mode=True, + MPIExecutor( provider=SlurmProvider(launcher=launcher), ) ]) @pytest.mark.local -def test_correct_launcher_with_mpi_mode(): +def test_bad_mpi_launcher(): + """ValueError if an unsupported mpi_launcher is specified""" + + with pytest.raises(ValueError): + Config(executors=[ + MPIExecutor( + mpi_launcher="bad_launcher", + provider=SlurmProvider(launcher=SimpleLauncher()), + ) + ]) + + +@pytest.mark.local +@pytest.mark.parametrize( + "mpi_launcher", + ["srun", "aprun", "mpiexec"] +) +def test_correct_launcher_with_mpi_mode(mpi_launcher: str): """Confirm that SimpleLauncher works with mpi_mode""" - config = Config(executors=[ - HighThroughputExecutor( - enable_mpi_mode=True, - provider=SlurmProvider(launcher=SimpleLauncher()), - ) - ]) - assert isinstance(config.executors[0].provider.launcher, SimpleLauncher) + executor = MPIExecutor( + mpi_launcher=mpi_launcher, + provider=SlurmProvider(launcher=SimpleLauncher()), + ) + + assert isinstance(executor.provider.launcher, SimpleLauncher) diff --git a/parsl/tests/test_mpi_apps/test_mpi_mode_disabled.py b/parsl/tests/test_mpi_apps/test_mpi_mode_disabled.py deleted file mode 100644 index e1e5c70883..0000000000 --- a/parsl/tests/test_mpi_apps/test_mpi_mode_disabled.py +++ /dev/null @@ -1,47 +0,0 @@ -from typing import Dict - -import pytest - -import parsl -from parsl import python_app -from parsl.tests.configs.htex_local import fresh_config - -EXECUTOR_LABEL = "MPI_TEST" - - -def local_config(): - config = fresh_config() - config.executors[0].label = EXECUTOR_LABEL - config.executors[0].max_workers_per_node = 1 - config.executors[0].enable_mpi_mode = False - return config - - -@python_app -def get_env_vars(parsl_resource_specification: Dict = {}) -> Dict: - import os - - parsl_vars = {} - for key in os.environ: - if key.startswith("PARSL_"): - parsl_vars[key] = os.environ[key] - return parsl_vars - - -@pytest.mark.local -def test_only_resource_specs_set(): - """Confirm that resource_spec env vars are set while launch prefixes are not - when enable_mpi_mode = False""" - resource_spec = { - "num_nodes": 4, - "ranks_per_node": 2, - } - - future = get_env_vars(parsl_resource_specification=resource_spec) - - result = future.result() - assert isinstance(result, Dict) - assert "PARSL_DEFAULT_PREFIX" not in result - assert "PARSL_SRUN_PREFIX" not in result - assert 
result["PARSL_NUM_NODES"] == str(resource_spec["num_nodes"]) - assert result["PARSL_RANKS_PER_NODE"] == str(resource_spec["ranks_per_node"]) diff --git a/parsl/tests/test_mpi_apps/test_mpi_mode_enabled.py b/parsl/tests/test_mpi_apps/test_mpi_mode_enabled.py index 6743d40eba..aff2501674 100644 --- a/parsl/tests/test_mpi_apps/test_mpi_mode_enabled.py +++ b/parsl/tests/test_mpi_apps/test_mpi_mode_enabled.py @@ -6,26 +6,34 @@ import pytest import parsl -from parsl import bash_app, python_app +from parsl import Config, bash_app, python_app +from parsl.executors import MPIExecutor from parsl.executors.high_throughput.mpi_prefix_composer import ( MissingResourceSpecification, ) -from parsl.tests.configs.htex_local import fresh_config +from parsl.launchers import SimpleLauncher +from parsl.providers import LocalProvider EXECUTOR_LABEL = "MPI_TEST" def local_setup(): - config = fresh_config() - config.executors[0].label = EXECUTOR_LABEL - config.executors[0].max_workers_per_node = 2 - config.executors[0].enable_mpi_mode = True - config.executors[0].mpi_launcher = "mpiexec" cwd = os.path.abspath(os.path.dirname(__file__)) pbs_nodefile = os.path.join(cwd, "mocks", "pbs_nodefile") - config.executors[0].provider.worker_init = f"export PBS_NODEFILE={pbs_nodefile}" + config = Config( + executors=[ + MPIExecutor( + label=EXECUTOR_LABEL, + max_workers_per_block=2, + mpi_launcher="mpiexec", + provider=LocalProvider( + worker_init=f"export PBS_NODEFILE={pbs_nodefile}", + launcher=SimpleLauncher() + ) + ) + ]) parsl.load(config) diff --git a/parsl/tests/test_mpi_apps/test_mpiex.py b/parsl/tests/test_mpi_apps/test_mpiex.py index a85547abea..2e8a38bc68 100644 --- a/parsl/tests/test_mpi_apps/test_mpiex.py +++ b/parsl/tests/test_mpi_apps/test_mpiex.py @@ -4,7 +4,6 @@ import pytest -import parsl from parsl import Config, HighThroughputExecutor from parsl.executors.high_throughput.mpi_executor import MPIExecutor from parsl.launchers import SimpleLauncher @@ -42,8 +41,8 @@ def test_docstring(): def test_init(): """Ensure all relevant kwargs are copied over from HTEx""" - new_kwargs = {'max_workers_per_block'} - excluded_kwargs = {'available_accelerators', 'enable_mpi_mode', 'cores_per_worker', 'max_workers_per_node', + new_kwargs = {'max_workers_per_block', 'mpi_launcher'} + excluded_kwargs = {'available_accelerators', 'cores_per_worker', 'max_workers_per_node', 'mem_per_worker', 'cpu_affinity', 'max_workers', 'manager_selector'} # Get the kwargs from both HTEx and MPIEx diff --git a/parsl/tests/test_mpi_apps/test_resource_spec.py b/parsl/tests/test_mpi_apps/test_resource_spec.py index 99d0187ccd..f180c67d52 100644 --- a/parsl/tests/test_mpi_apps/test_resource_spec.py +++ b/parsl/tests/test_mpi_apps/test_resource_spec.py @@ -1,18 +1,20 @@ import contextlib import logging import os +import queue import typing import unittest from typing import Dict +from unittest import mock import pytest -import parsl from parsl.app.app import python_app +from parsl.executors.high_throughput.executor import HighThroughputExecutor +from parsl.executors.high_throughput.mpi_executor import MPIExecutor from parsl.executors.high_throughput.mpi_prefix_composer import ( InvalidResourceSpecification, MissingResourceSpecification, - validate_resource_spec, ) from parsl.executors.high_throughput.mpi_resource_management import ( get_nodes_in_batchjob, @@ -20,6 +22,8 @@ get_slurm_hosts_list, identify_scheduler, ) +from parsl.launchers import SimpleLauncher +from parsl.providers import LocalProvider from parsl.tests.configs.htex_local import 
fresh_config EXECUTOR_LABEL = "MPI_TEST" @@ -48,23 +52,6 @@ def get_env_vars(parsl_resource_specification: Dict = {}) -> Dict: return parsl_vars -@pytest.mark.local -def test_resource_spec_env_vars(): - resource_spec = { - "num_nodes": 4, - "ranks_per_node": 2, - } - - assert double(5).result() == 10 - - future = get_env_vars(parsl_resource_specification=resource_spec) - - result = future.result() - assert isinstance(result, Dict) - assert result["PARSL_NUM_NODES"] == str(resource_spec["num_nodes"]) - assert result["PARSL_RANKS_PER_NODE"] == str(resource_spec["ranks_per_node"]) - - @pytest.mark.local @unittest.mock.patch("subprocess.check_output", return_value=b"c203-031\nc203-032\n") def test_slurm_mocked_mpi_fetch(subprocess_check): @@ -83,16 +70,6 @@ def add_to_path(path: os.PathLike) -> typing.Generator[None, None, None]: os.environ["PATH"] = old_path -@pytest.mark.local -@pytest.mark.skip -def test_slurm_mpi_fetch(): - logging.warning(f"Current pwd : {os.path.dirname(__file__)}") - with add_to_path(os.path.dirname(__file__)): - logging.warning(f"PATH: {os.environ['PATH']}") - nodeinfo = get_slurm_hosts_list() - logging.warning(f"Got : {nodeinfo}") - - @contextlib.contextmanager def mock_pbs_nodefile(nodefile: str = "pbs_nodefile") -> typing.Generator[None, None, None]: cwd = os.path.abspath(os.path.dirname(__file__)) @@ -122,22 +99,43 @@ def test_top_level(): @pytest.mark.local @pytest.mark.parametrize( - "resource_spec, is_mpi_enabled, exception", + "resource_spec, exception", ( - ({"num_nodes": 2, "ranks_per_node": 1}, False, None), - ({"launcher_options": "--debug_foo"}, False, None), - ({"num_nodes": 2, "BAD_OPT": 1}, False, InvalidResourceSpecification), - ({}, False, None), - ({"num_nodes": 2, "ranks_per_node": 1}, True, None), - ({"launcher_options": "--debug_foo"}, True, None), - ({"num_nodes": 2, "BAD_OPT": 1}, True, InvalidResourceSpecification), - ({}, True, MissingResourceSpecification), + + ({"num_nodes": 2, "ranks_per_node": 1}, None), + ({"launcher_options": "--debug_foo"}, None), + ({"num_nodes": 2, "BAD_OPT": 1}, InvalidResourceSpecification), + ({}, MissingResourceSpecification), ) ) -def test_resource_spec(resource_spec: Dict, is_mpi_enabled: bool, exception): +def test_mpi_resource_spec(resource_spec: Dict, exception): + """Test validation of resource_specification in MPIExecutor""" + + mpi_ex = MPIExecutor(provider=LocalProvider(launcher=SimpleLauncher())) + mpi_ex.outgoing_q = mock.Mock(spec=queue.Queue) + if exception: with pytest.raises(exception): - validate_resource_spec(resource_spec, is_mpi_enabled) + mpi_ex.validate_resource_spec(resource_spec) else: - result = validate_resource_spec(resource_spec, is_mpi_enabled) + result = mpi_ex.validate_resource_spec(resource_spec) assert result is None + + +@pytest.mark.local +@pytest.mark.parametrize( + "resource_spec", + ( + {"num_nodes": 2, "ranks_per_node": 1}, + {"launcher_options": "--debug_foo"}, + {"BAD_OPT": 1}, + ) +) +def test_mpi_resource_spec_passed_to_htex(resource_spec: dict): + """HTEX should reject every resource_spec""" + + htex = HighThroughputExecutor() + htex.outgoing_q = mock.Mock(spec=queue.Queue) + + with pytest.raises(InvalidResourceSpecification): + htex.validate_resource_spec(resource_spec) diff --git a/test-requirements.txt b/test-requirements.txt index 415e995c1b..acd670b5e9 100644 --- a/test-requirements.txt +++ b/test-requirements.txt @@ -8,6 +8,7 @@ pytest-random-order nbsphinx sphinx_rtd_theme mypy==1.5.1 +types-mock types-python-dateutil types-requests types-paramiko From 
73f6f657233aa393f829f0361ba89c819218ff79 Mon Sep 17 00:00:00 2001 From: "Daniel S. Katz" Date: Tue, 20 Aug 2024 00:37:37 -0500 Subject: [PATCH 12/15] Add CZI badge to README.rst (#3596) --- README.rst | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/README.rst b/README.rst index 72048d39f4..da7f8245a5 100644 --- a/README.rst +++ b/README.rst @@ -1,6 +1,6 @@ Parsl - Parallel Scripting Library ================================== -|licence| |build-status| |docs| |NSF-1550588| |NSF-1550476| |NSF-1550562| |NSF-1550528| +|licence| |build-status| |docs| |NSF-1550588| |NSF-1550476| |NSF-1550562| |NSF-1550528| |CZI-EOSS| Parsl extends parallelism in Python beyond a single computer. @@ -64,6 +64,9 @@ then explore the `parallel computing patterns Date: Tue, 20 Aug 2024 04:46:13 -0700 Subject: [PATCH 13/15] Fallback to squeue when sacct is missing in SlurmProvider (#3591) Adds an internal check to test whether the slurm provider should use the sacct or squeue command. Some slurm clusters might not use the accounting database sacct uses. This allows slurm clusters that use the database to use the sacct command, which can be easier on the slurm scheduler, or, if the database is not present, switch to the squeue command, which should work on all clusters. Fixes #3590 --- parsl/providers/slurm/slurm.py | 50 +++++++++++++++++++++++++++------- 1 file changed, 40 insertions(+), 10 deletions(-) diff --git a/parsl/providers/slurm/slurm.py b/parsl/providers/slurm/slurm.py index ec6abeff56..54b4053fed 100644 --- a/parsl/providers/slurm/slurm.py +++ b/parsl/providers/slurm/slurm.py @@ -20,7 +20,7 @@ logger = logging.getLogger(__name__) # From https://slurm.schedmd.com/sacct.html#SECTION_JOB-STATE-CODES -translate_table = { +sacct_translate_table = { 'PENDING': JobState.PENDING, 'RUNNING': JobState.RUNNING, 'CANCELLED': JobState.CANCELLED, @@ -37,6 +37,20 @@ 'REQUEUED': JobState.PENDING } +squeue_translate_table = { + 'PD': JobState.PENDING, + 'R': JobState.RUNNING, + 'CA': JobState.CANCELLED, + 'CF': JobState.PENDING, # (configuring), + 'CG': JobState.RUNNING, # (completing), + 'CD': JobState.COMPLETED, + 'F': JobState.FAILED, # (failed), + 'TO': JobState.TIMEOUT, # (timeout), + 'NF': JobState.FAILED, # (node failure), + 'RV': JobState.FAILED, # (revoked) and + 'SE': JobState.FAILED # (special exit state) +} + class SlurmProvider(ClusterProvider, RepresentationMixin): """Slurm Execution Provider @@ -155,6 +169,23 @@ def __init__(self, self.regex_job_id = regex_job_id self.worker_init = worker_init + '\n' + # Check if sacct works and if not fall back to squeue + cmd = "sacct -X" + logger.debug("Executing %s", cmd) + retcode, stdout, stderr = self.execute_wait(cmd) + # If sacct fails it should return retcode=1 stderr="Slurm accounting storage is disabled" + logger.debug(f"sacct returned retcode={retcode} stderr={stderr}") + if retcode == 0: + logger.debug("using sacct to get job status") + # Using state%20 to get enough characters to not truncate output + # of the state.
Without output can look like " CANCELLED+" + self._cmd = "sacct -X --noheader --format=jobid,state%20 --job '{0}'" + self._translate_table = sacct_translate_table + else: + logger.debug(f"sacct failed with retcode={retcode}") + logger.debug("falling back to using squeue to get job status") + self._cmd = "squeue --noheader --format='%i %t' --job '{0}'" + self._translate_table = squeue_translate_table def _status(self): '''Returns the status list for a list of job_ids @@ -172,16 +203,14 @@ def _status(self): logger.debug('No active jobs, skipping status update') return - # Using state%20 to get enough characters to not truncate output - # of the state. Without output can look like " CANCELLED+" - cmd = "sacct -X --noheader --format=jobid,state%20 --job '{0}'".format(job_id_list) + cmd = self._cmd.format(job_id_list) logger.debug("Executing %s", cmd) retcode, stdout, stderr = self.execute_wait(cmd) - logger.debug("sacct returned %s %s", stdout, stderr) + logger.debug("sacct/squeue returned %s %s", stdout, stderr) # Execute_wait failed. Do no update if retcode != 0: - logger.warning("sacct failed with non-zero exit code {}".format(retcode)) + logger.warning("sacct/squeue failed with non-zero exit code {}".format(retcode)) return jobs_missing = set(self.resources.keys()) @@ -193,9 +222,9 @@ def _status(self): # For example " CANCELLED by " # This splits and ignores anything past the first two unpacked values job_id, slurm_state, *ignore = line.split() - if slurm_state not in translate_table: + if slurm_state not in self._translate_table: logger.warning(f"Slurm status {slurm_state} is not recognized") - status = translate_table.get(slurm_state, JobState.UNKNOWN) + status = self._translate_table.get(slurm_state, JobState.UNKNOWN) logger.debug("Updating job {} with slurm status {} to parsl state {!s}".format(job_id, slurm_state, status)) self.resources[job_id]['status'] = JobStatus(status, stdout_path=self.resources[job_id]['job_stdout_path'], @@ -203,9 +232,10 @@ def _status(self): jobs_missing.remove(job_id) # sacct can get job info after jobs have completed so this path shouldn't be hit - # log a warning if there are missing jobs for some reason + # squeue does not report on jobs that are not running. So we are filling in the + # blanks for missing jobs, we might lose some information about why the jobs failed. for missing_job in jobs_missing: - logger.warning("Updating missing job {} to completed status".format(missing_job)) + logger.debug("Updating missing job {} to completed status".format(missing_job)) self.resources[missing_job]['status'] = JobStatus( JobState.COMPLETED, stdout_path=self.resources[missing_job]['job_stdout_path'], stderr_path=self.resources[missing_job]['job_stderr_path']) From 0fc966f2a284839df6c6662fd369d3530ae31a20 Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Tue, 20 Aug 2024 14:16:18 +0200 Subject: [PATCH 14/15] Collapse 4 monitoring router to db queues into 1 queue (#3593) Prior to this PR, there are four multiprocessing queues from the monitoring router process to the database manager process. (also used by the submit process via MultiprocessingQueueRadioSender but that is not so relevant for this PR) Each message arriving at the router goes into MonitoringRouter.start_zmq_listener where it is dispatched based on tag type into one of these four queues towards the monitoring database. In the monitoring database code, no matter which queue the messages arrive on, they are all passed into DatabaseManager._dispatch_to_internal. 
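In sketch form (illustrative only, not the Parsl code): once every message carries its own tag, one shared queue is enough, and the consumer can dispatch on the tag rather than on which queue delivered the message:

    import multiprocessing

    q = multiprocessing.Queue()  # single fan-in queue for all message types
    q.put(("NODE_INFO", {"hostname": "worker-1"}))      # stand-ins for the
    q.put(("RESOURCE_INFO", {"task_id": 7}))            # MessageType enum tags

    for _ in range(2):
        tag, body = q.get()  # arrival order across producers is not guaranteed
        print(tag, body)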
The four queues then don't provide much functionality - their effect is maybe some non-deterministic message order shuffling. This PR collapses those four queues into a single queue. # Changed Behaviour Messages will arrive at the database manager in possibly different orders. This might flush out more race conditions. The monitoring router would previously validate that a message tag was one of 5 known message tags (as part of choosing which queue to dispatch to). This PR removes that validation. That validation now happens at the receiving end of the (now single) queue, in DatabaseManager._dispatch_to_internal. Error messages related to invalid tags (which should only be coming from development of new message types) will now appear in the database manager process, rather than the router process. --- parsl/monitoring/db_manager.py | 39 ++++------------------------ parsl/monitoring/monitoring.py | 28 +++++------------------- parsl/monitoring/router.py | 36 +++---------------------------- 3 files changed, 12 insertions(+), 91 deletions(-) diff --git a/parsl/monitoring/db_manager.py b/parsl/monitoring/db_manager.py index 053c98d598..4fcf5ec2e2 100644 --- a/parsl/monitoring/db_manager.py +++ b/parsl/monitoring/db_manager.py @@ -308,35 +308,9 @@ def __init__(self, self.pending_resource_queue: queue.Queue[MonitoringMessage] = queue.Queue() def start(self, - priority_queue: mpq.Queue, - node_queue: mpq.Queue, - block_queue: mpq.Queue, resource_queue: mpq.Queue) -> None: self._kill_event = threading.Event() - self._priority_queue_pull_thread = threading.Thread(target=self._migrate_logs_to_internal, args=( - priority_queue, self._kill_event,), - name="Monitoring-migrate-priority", - daemon=True, - ) - self._priority_queue_pull_thread.start() - - self._node_queue_pull_thread = threading.Thread(target=self._migrate_logs_to_internal, args=( - node_queue, self._kill_event,), - name="Monitoring-migrate-node", - daemon=True, - ) - self._node_queue_pull_thread.start() - - self._block_queue_pull_thread = threading.Thread(target=self._migrate_logs_to_internal, args=( - block_queue, self._kill_event,), - name="Monitoring-migrate-block", - daemon=True, - ) - self._block_queue_pull_thread.start() self._resource_queue_pull_thread = threading.Thread(target=self._migrate_logs_to_internal, args=( @@ -372,20 +346,18 @@ def start, while (not self._kill_event.is_set() or self.pending_priority_queue.qsize() != 0 or self.pending_resource_queue.qsize() != 0 or self.pending_node_queue.qsize() != 0 or self.pending_block_queue.qsize() != 0 or - priority_queue.qsize() != 0 or resource_queue.qsize() != 0 or - node_queue.qsize() != 0 or block_queue.qsize() != 0): + resource_queue.qsize() != 0): """ WORKFLOW_INFO and TASK_INFO messages (i.e.
priority messages) """ try: - logger.debug("""Checking STOP conditions: {}, {}, {}, {}, {}, {}, {}, {}, {}""".format( + logger.debug("""Checking STOP conditions: {}, {}, {}, {}, {}, {}""".format( self._kill_event.is_set(), self.pending_priority_queue.qsize() != 0, self.pending_resource_queue.qsize() != 0, self.pending_node_queue.qsize() != 0, self.pending_block_queue.qsize() != 0, - priority_queue.qsize() != 0, resource_queue.qsize() != 0, - node_queue.qsize() != 0, block_queue.qsize() != 0)) + resource_queue.qsize() != 0)) # This is the list of resource messages which can be reprocessed as if they # had just arrived because the corresponding first task message has been @@ -707,9 +679,6 @@ def close(self) -> None: @wrap_with_logs(target="database_manager") @typeguard.typechecked def dbm_starter(exception_q: mpq.Queue, - priority_msgs: mpq.Queue, - node_msgs: mpq.Queue, - block_msgs: mpq.Queue, resource_msgs: mpq.Queue, db_url: str, logdir: str, @@ -726,7 +695,7 @@ def dbm_starter(exception_q: mpq.Queue, logdir=logdir, logging_level=logging_level) logger.info("Starting dbm in dbm starter") - dbm.start(priority_msgs, node_msgs, block_msgs, resource_msgs) + dbm.start(resource_msgs) except KeyboardInterrupt: logger.exception("KeyboardInterrupt signal caught") dbm.close() diff --git a/parsl/monitoring/monitoring.py b/parsl/monitoring/monitoring.py index a76e2cf487..e1de80116c 100644 --- a/parsl/monitoring/monitoring.py +++ b/parsl/monitoring/monitoring.py @@ -7,7 +7,7 @@ import time from multiprocessing import Event, Process from multiprocessing.queues import Queue -from typing import TYPE_CHECKING, Any, Optional, Tuple, Union, cast +from typing import TYPE_CHECKING, Any, Literal, Optional, Tuple, Union, cast import typeguard @@ -138,27 +138,15 @@ def start(self, dfk_run_dir: str, config_run_dir: Union[str, os.PathLike]) -> No self.exception_q: Queue[Tuple[str, str]] self.exception_q = SizedQueue(maxsize=10) - self.priority_msgs: Queue[Tuple[Any, int]] - self.priority_msgs = SizedQueue() - - self.resource_msgs: Queue[AddressedMonitoringMessage] + self.resource_msgs: Queue[Union[AddressedMonitoringMessage, Tuple[Literal["STOP"], Literal[0]]]] self.resource_msgs = SizedQueue() - self.node_msgs: Queue[AddressedMonitoringMessage] - self.node_msgs = SizedQueue() - - self.block_msgs: Queue[AddressedMonitoringMessage] - self.block_msgs = SizedQueue() - self.router_exit_event: ms.Event self.router_exit_event = Event() self.router_proc = ForkProcess(target=router_starter, kwargs={"comm_q": comm_q, "exception_q": self.exception_q, - "priority_msgs": self.priority_msgs, - "node_msgs": self.node_msgs, - "block_msgs": self.block_msgs, "resource_msgs": self.resource_msgs, "exit_event": self.router_exit_event, "hub_address": self.hub_address, @@ -173,7 +161,7 @@ def start(self, dfk_run_dir: str, config_run_dir: Union[str, os.PathLike]) -> No self.router_proc.start() self.dbm_proc = ForkProcess(target=dbm_starter, - args=(self.exception_q, self.priority_msgs, self.node_msgs, self.block_msgs, self.resource_msgs,), + args=(self.exception_q, self.resource_msgs,), kwargs={"logdir": self.logdir, "logging_level": logging.DEBUG if self.monitoring_debug else logging.INFO, "db_url": self.logging_endpoint, @@ -192,7 +180,7 @@ def start(self, dfk_run_dir: str, config_run_dir: Union[str, os.PathLike]) -> No self.filesystem_proc.start() logger.info(f"Started filesystem radio receiver process {self.filesystem_proc.pid}") - self.radio = MultiprocessingQueueRadioSender(self.block_msgs) + self.radio = 
MultiprocessingQueueRadioSender(self.resource_msgs) try: comm_q_result = comm_q.get(block=True, timeout=120) @@ -249,7 +237,7 @@ def close(self) -> None: logger.debug("Finished waiting for router termination") if len(exception_msgs) == 0: logger.debug("Sending STOP to DBM") - self.priority_msgs.put(("STOP", 0)) + self.resource_msgs.put(("STOP", 0)) else: logger.debug("Not sending STOP to DBM, because there were DBM exceptions") logger.debug("Waiting for DB termination") @@ -267,14 +255,8 @@ def close(self) -> None: logger.info("Closing monitoring multiprocessing queues") self.exception_q.close() self.exception_q.join_thread() - self.priority_msgs.close() - self.priority_msgs.join_thread() self.resource_msgs.close() self.resource_msgs.join_thread() - self.node_msgs.close() - self.node_msgs.join_thread() - self.block_msgs.close() - self.block_msgs.join_thread() logger.info("Closed monitoring multiprocessing queues") diff --git a/parsl/monitoring/router.py b/parsl/monitoring/router.py index 343410e3a4..e92386c407 100644 --- a/parsl/monitoring/router.py +++ b/parsl/monitoring/router.py @@ -14,7 +14,6 @@ import zmq from parsl.log_utils import set_file_logger -from parsl.monitoring.message_type import MessageType from parsl.monitoring.types import AddressedMonitoringMessage, TaggedMonitoringMessage from parsl.process_loggers import wrap_with_logs from parsl.utils import setproctitle @@ -34,9 +33,6 @@ def __init__(self, logdir: str = ".", logging_level: int = logging.INFO, atexit_timeout: int = 3, # in seconds - priority_msgs: mpq.Queue, - node_msgs: mpq.Queue, - block_msgs: mpq.Queue, resource_msgs: mpq.Queue, exit_event: Event, ): @@ -57,8 +53,8 @@ def __init__(self, Logging level as defined in the logging module. Default: logging.INFO atexit_timeout : float, optional The amount of time in seconds to terminate the hub without receiving any messages, after the last dfk workflow message is received. - *_msgs : Queue - Four multiprocessing queues to receive messages, routed by type tag, and sometimes modified according to type tag. + resource_msgs : multiprocessing.Queue + A multiprocessing queue to receive messages to be routed onwards to the database process exit_event : Event An event that the main Parsl process will set to signal that the monitoring router should shut down. @@ -102,9 +98,6 @@ def __init__(self, min_port=zmq_port_range[0], max_port=zmq_port_range[1]) - self.priority_msgs = priority_msgs - self.node_msgs = node_msgs - self.block_msgs = block_msgs self.resource_msgs = resource_msgs self.exit_event = exit_event @@ -170,24 +163,7 @@ def start_zmq_listener(self) -> None: msg_0: AddressedMonitoringMessage msg_0 = (msg, 0) - if msg[0] == MessageType.NODE_INFO: - self.node_msgs.put(msg_0) - elif msg[0] == MessageType.RESOURCE_INFO: - self.resource_msgs.put(msg_0) - elif msg[0] == MessageType.BLOCK_INFO: - self.block_msgs.put(msg_0) - elif msg[0] == MessageType.TASK_INFO: - self.priority_msgs.put(msg_0) - elif msg[0] == MessageType.WORKFLOW_INFO: - self.priority_msgs.put(msg_0) - else: - # There is a type: ignore here because if msg[0] - # is of the correct type, this code is unreachable, - # but there is no verification that the message - # received from zmq_receiver_channel.recv_pyobj() is actually - # of that type. 
- self.logger.error("Discarding message " # type: ignore[unreachable] - f"from interchange with unknown type {msg[0].value}") + self.resource_msgs.put(msg_0) except zmq.Again: pass except Exception: @@ -207,9 +183,6 @@ def start_zmq_listener(self) -> None: def router_starter(*, comm_q: mpq.Queue, exception_q: mpq.Queue, - priority_msgs: mpq.Queue, - node_msgs: mpq.Queue, - block_msgs: mpq.Queue, resource_msgs: mpq.Queue, exit_event: Event, @@ -226,9 +199,6 @@ def router_starter(*, zmq_port_range=zmq_port_range, logdir=logdir, logging_level=logging_level, - priority_msgs=priority_msgs, - node_msgs=node_msgs, - block_msgs=block_msgs, resource_msgs=resource_msgs, exit_event=exit_event) except Exception as e: From b284dc1b068e397ad87fe4154e640af60d364c6d Mon Sep 17 00:00:00 2001 From: Kevin Hunter Kesling Date: Wed, 21 Aug 2024 12:32:32 -0400 Subject: [PATCH 15/15] Non-functional change: minor log call-site updates (#3597) Massage log statements to use argument style consistent with recent practice --- .../executors/high_throughput/interchange.py | 60 ++++++++++--------- 1 file changed, 31 insertions(+), 29 deletions(-) diff --git a/parsl/executors/high_throughput/interchange.py b/parsl/executors/high_throughput/interchange.py index fa0969d398..cd7d0596a9 100644 --- a/parsl/executors/high_throughput/interchange.py +++ b/parsl/executors/high_throughput/interchange.py @@ -375,7 +375,7 @@ def start(self) -> None: self.zmq_context.destroy() delta = time.time() - start - logger.info("Processed {} tasks in {} seconds".format(self.count, delta)) + logger.info(f"Processed {self.count} tasks in {delta} seconds") logger.warning("Exiting") def process_task_outgoing_incoming( @@ -396,9 +396,8 @@ def process_task_outgoing_incoming( try: msg = json.loads(message[1].decode('utf-8')) except Exception: - logger.warning("Got Exception reading message from manager: {!r}".format( - manager_id), exc_info=True) - logger.debug("Message: \n{!r}\n".format(message[1])) + logger.warning(f"Got Exception reading message from manager: {manager_id!r}", exc_info=True) + logger.debug("Message:\n %r\n", message[1]) return # perform a bit of validation on the structure of the deserialized @@ -406,7 +405,7 @@ def process_task_outgoing_incoming( # in obviously malformed cases if not isinstance(msg, dict) or 'type' not in msg: logger.error(f"JSON message was not correctly formatted from manager: {manager_id!r}") - logger.debug("Message: \n{!r}\n".format(message[1])) + logger.debug("Message:\n %r\n", message[1]) return if msg['type'] == 'registration': @@ -425,7 +424,7 @@ def process_task_outgoing_incoming( self.connected_block_history.append(msg['block_id']) interesting_managers.add(manager_id) - logger.info("Adding manager: {!r} to ready queue".format(manager_id)) + logger.info(f"Adding manager: {manager_id!r} to ready queue") m = self._ready_managers[manager_id] # m is a ManagerRecord, but msg is a dict[Any,Any] and so can @@ -434,12 +433,12 @@ def process_task_outgoing_incoming( # later. 
m.update(msg) # type: ignore[typeddict-item] - logger.info("Registration info for manager {!r}: {}".format(manager_id, msg)) + logger.info(f"Registration info for manager {manager_id!r}: {msg}") self._send_monitoring_info(monitoring_radio, m) if (msg['python_v'].rsplit(".", 1)[0] != self.current_platform['python_v'].rsplit(".", 1)[0] or msg['parsl_v'] != self.current_platform['parsl_v']): - logger.error("Manager {!r} has incompatible version info with the interchange".format(manager_id)) + logger.error(f"Manager {manager_id!r} has incompatible version info with the interchange") logger.debug("Setting kill event") kill_event.set() e = VersionMismatch("py.v={} parsl.v={}".format(self.current_platform['python_v'].rsplit(".", 1)[0], @@ -452,16 +451,15 @@ def process_task_outgoing_incoming( self.results_outgoing.send(pkl_package) logger.error("Sent failure reports, shutting down interchange") else: - logger.info("Manager {!r} has compatible Parsl version {}".format(manager_id, msg['parsl_v'])) - logger.info("Manager {!r} has compatible Python version {}".format(manager_id, - msg['python_v'].rsplit(".", 1)[0])) + logger.info(f"Manager {manager_id!r} has compatible Parsl version {msg['parsl_v']}") + logger.info(f"Manager {manager_id!r} has compatible Python version {msg['python_v'].rsplit('.', 1)[0]}") elif msg['type'] == 'heartbeat': self._ready_managers[manager_id]['last_heartbeat'] = time.time() - logger.debug("Manager {!r} sent heartbeat via tasks connection".format(manager_id)) + logger.debug("Manager %r sent heartbeat via tasks connection", manager_id) self.task_outgoing.send_multipart([manager_id, b'', PKL_HEARTBEAT_CODE]) elif msg['type'] == 'drain': self._ready_managers[manager_id]['draining'] = True - logger.debug(f"Manager {manager_id!r} requested drain") + logger.debug("Manager %r requested drain", manager_id) else: logger.error(f"Unexpected message type received from manager: {msg['type']}") logger.debug("leaving task_outgoing section") @@ -484,9 +482,11 @@ def expire_drained_managers(self, interesting_managers: Set[bytes], monitoring_r def process_tasks_to_send(self, interesting_managers: Set[bytes]) -> None: # Check if there are tasks that could be sent to managers - logger.debug("Managers count (interesting/total): {interesting}/{total}".format( - total=len(self._ready_managers), - interesting=len(interesting_managers))) + logger.debug( + "Managers count (interesting/total): %s/%s", + len(interesting_managers), + len(self._ready_managers) + ) if interesting_managers and not self.pending_task_queue.empty(): shuffled_managers = self.manager_selector.sort_managers(self._ready_managers, interesting_managers) @@ -497,7 +497,7 @@ def process_tasks_to_send(self, interesting_managers: Set[bytes]) -> None: tasks_inflight = len(m['tasks']) real_capacity = m['max_capacity'] - tasks_inflight - if (real_capacity and m['active'] and not m['draining']): + if real_capacity and m["active"] and not m["draining"]: tasks = self.get_tasks(real_capacity) if tasks: self.task_outgoing.send_multipart([manager_id, b'', pickle.dumps(tasks)]) tids = [t['task_id'] for t in tasks] m['tasks'].extend(tids) m['idle_since'] = None - logger.debug("Sent tasks: {} to manager {!r}".format(tids, manager_id)) + logger.debug("Sent tasks: %s to manager %r", tids, manager_id) # recompute real_capacity after sending tasks real_capacity = m['max_capacity'] - tasks_inflight if real_capacity > 0: - logger.debug("Manager {!r} has
free capacity {}".format(manager_id, real_capacity)) + logger.debug("Manager %r has free capacity %s", manager_id, real_capacity) # ... so keep it in the interesting_managers list else: - logger.debug("Manager {!r} is now saturated".format(manager_id)) + logger.debug("Manager %r is now saturated", manager_id) interesting_managers.remove(manager_id) else: interesting_managers.remove(manager_id) # logger.debug("Nothing to send to manager {}".format(manager_id)) - logger.debug("leaving _ready_managers section, with {} managers still interesting".format(len(interesting_managers))) + logger.debug("leaving _ready_managers section, with %s managers still interesting", len(interesting_managers)) else: logger.debug("either no interesting managers or no tasks, so skipping manager pass") @@ -528,9 +528,9 @@ def process_results_incoming(self, interesting_managers: Set[bytes], monitoring_ logger.debug("entering results_incoming section") manager_id, *all_messages = self.results_incoming.recv_multipart() if manager_id not in self._ready_managers: - logger.warning("Received a result from a un-registered manager: {!r}".format(manager_id)) + logger.warning(f"Received a result from a un-registered manager: {manager_id!r}") else: - logger.debug(f"Got {len(all_messages)} result items in batch from manager {manager_id!r}") + logger.debug("Got %s result items in batch from manager %r", len(all_messages), manager_id) b_messages = [] @@ -548,10 +548,10 @@ def process_results_incoming(self, interesting_managers: Set[bytes], monitoring_ monitoring_radio.send(r['payload']) elif r['type'] == 'heartbeat': - logger.debug(f"Manager {manager_id!r} sent heartbeat via results connection") + logger.debug("Manager %r sent heartbeat via results connection", manager_id) b_messages.append((p_message, r)) else: - logger.error("Interchange discarding result_queue message of unknown type: {}".format(r['type'])) + logger.error("Interchange discarding result_queue message of unknown type: %s", r["type"]) got_result = False m = self._ready_managers[manager_id] @@ -560,14 +560,16 @@ def process_results_incoming(self, interesting_managers: Set[bytes], monitoring_ if r['type'] == 'result': got_result = True try: - logger.debug(f"Removing task {r['task_id']} from manager record {manager_id!r}") + logger.debug("Removing task %s from manager record %r", r["task_id"], manager_id) m['tasks'].remove(r['task_id']) except Exception: # If we reach here, there's something very wrong. - logger.exception("Ignoring exception removing task_id {} for manager {!r} with task list {}".format( + logger.exception( + "Ignoring exception removing task_id %s for manager %r with task list %s", r['task_id'], manager_id, - m['tasks'])) + m["tasks"] + ) b_messages_to_send = [] for (b_message, _) in b_messages: @@ -578,7 +580,7 @@ def process_results_incoming(self, interesting_managers: Set[bytes], monitoring_ self.results_outgoing.send_multipart(b_messages_to_send) logger.debug("Sent messages on results_outgoing") - logger.debug(f"Current tasks on manager {manager_id!r}: {m['tasks']}") + logger.debug("Current tasks on manager %r: %s", manager_id, m["tasks"]) if len(m['tasks']) == 0 and m['idle_since'] is None: m['idle_since'] = time.time()
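The distinction being regularised here, in sketch form (illustrative, not Parsl code): an f-string is rendered before logging ever sees it, while %-style arguments are only interpolated if the record is actually emitted, so the lazy form avoids formatting work when the level is disabled:

    import logging

    logging.basicConfig(level=logging.DEBUG)
    logger = logging.getLogger("example")
    manager_id = b"\x01"

    # eager: the message string is built even when DEBUG is disabled
    logger.debug(f"Manager {manager_id!r} requested drain")

    # lazy: formatting is deferred until the log level says the record is wanted
    logger.debug("Manager %r requested drain", manager_id)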