diff --git a/README.rst b/README.rst
index 72048d39f4..da7f8245a5 100644
--- a/README.rst
+++ b/README.rst
@@ -1,6 +1,6 @@
 Parsl - Parallel Scripting Library
 ==================================
-|licence| |build-status| |docs| |NSF-1550588| |NSF-1550476| |NSF-1550562| |NSF-1550528|
+|licence| |build-status| |docs| |NSF-1550588| |NSF-1550476| |NSF-1550562| |NSF-1550528| |CZI-EOSS|
 
 Parsl extends parallelism in Python beyond a single computer.
 
@@ -64,6 +64,9 @@ then explore the `parallel computing patterns `_
 * ``parsl.channels.SSHChannel`` now supports a ``key_filename`` kwarg `issue#1639 <https://github.com/Parsl/parsl/issues/1639>`_
 * Newly added Makefile wraps several frequent developer operations such as:
@@ -442,7 +442,7 @@ New Functionality
   module, parsl.data_provider.globus
 * `parsl.executors.WorkQueueExecutor`: a new executor that integrates functionality from `Work Queue <http://ccl.cse.nd.edu/software/workqueue/>`_ is now available.
-* New provider to support for Ad-Hoc clusters `parsl.providers.AdHocProvider`
+* New provider to support Ad-Hoc clusters ``parsl.providers.AdHocProvider``
 * New provider added to support LSF on Summit `parsl.providers.LSFProvider`
 * Support for CPU and Memory resource hints to providers `(github) `_.
 * The ``logging_level=logging.INFO`` in `parsl.monitoring.MonitoringHub` is replaced with ``monitoring_debug=False``:
@@ -468,7 +468,7 @@ New Functionality
 * Several test-suite improvements that have dramatically reduced test duration.
 * Several improvements to the Monitoring interface.
-* Configurable port on `parsl.channels.SSHChannel`.
+* Configurable port on ``parsl.channels.SSHChannel``.
 * ``suppress_failure`` now defaults to True.
 * `parsl.executors.HighThroughputExecutor` is the recommended executor, and ``IPyParallelExecutor`` is deprecated.
 * `parsl.executors.HighThroughputExecutor` will expose worker information via environment variables: ``PARSL_WORKER_RANK`` and ``PARSL_WORKER_COUNT``
@@ -532,7 +532,7 @@ New Functionality
 * Cleaner user app file log management.
 * Updated configurations using `parsl.executors.HighThroughputExecutor` in the configuration section of the userguide.
-* Support for OAuth based SSH with `parsl.channels.OAuthSSHChannel`.
+* Support for OAuth based SSH with ``parsl.channels.OAuthSSHChannel``.
 
 Bug Fixes
 ^^^^^^^^^
diff --git a/docs/reference.rst b/docs/reference.rst
index 1af850792c..d8e18bd244 100644
--- a/docs/reference.rst
+++ b/docs/reference.rst
@@ -38,15 +38,9 @@ Configuration
 Channels
 ========
 
-.. autosummary::
-    :toctree: stubs
-    :nosignatures:
-
-    parsl.channels.base.Channel
-    parsl.channels.LocalChannel
-    parsl.channels.SSHChannel
-    parsl.channels.OAuthSSHChannel
-    parsl.channels.SSHInteractiveLoginChannel
+Channels are deprecated in Parsl. See
+`issue 3515 <https://github.com/Parsl/parsl/issues/3515>`_
+for further discussion.
 
 Data management
 ===============
@@ -109,7 +103,6 @@ Providers
     :toctree: stubs
     :nosignatures:
 
-    parsl.providers.AdHocProvider
     parsl.providers.AWSProvider
     parsl.providers.CobaltProvider
     parsl.providers.CondorProvider
diff --git a/docs/userguide/configuring.rst b/docs/userguide/configuring.rst
index 24ce0ca938..f3fe5cc407 100644
--- a/docs/userguide/configuring.rst
+++ b/docs/userguide/configuring.rst
@@ -15,7 +15,7 @@ queues, durations, and data management options.
 The following example shows a basic configuration object (:class:`~parsl.config.Config`) for the Frontera
 supercomputer at TACC.
 This config uses the `parsl.executors.HighThroughputExecutor` to submit
-tasks from a login node (`parsl.channels.LocalChannel`). It requests an allocation of
+tasks from a login node. It requests an allocation of
 128 nodes, deploying 1 worker for each of the 56 cores per node, from the normal partition.
 To limit network connections to just the internal network the config specifies the address
 used by the infiniband interface with ``address_by_interface('ib0')``
@@ -23,7 +23,6 @@ used by the infiniband interface with ``address_by_interface('ib0')``
 .. code-block:: python
 
     from parsl.config import Config
-    from parsl.channels import LocalChannel
     from parsl.providers import SlurmProvider
     from parsl.executors import HighThroughputExecutor
     from parsl.launchers import SrunLauncher
@@ -36,7 +35,6 @@ used by the infiniband interface with ``address_by_interface('ib0')``
                 address=address_by_interface('ib0'),
                 max_workers_per_node=56,
                 provider=SlurmProvider(
-                    channel=LocalChannel(),
                     nodes_per_block=128,
                     init_blocks=1,
                     partition='normal',
@@ -197,22 +195,6 @@ Stepping through the following question should help formulate a suitable configu
     are on a **native Slurm** system like :ref:`configuring_nersc_cori`
 
 
-4) Where will the main Parsl program run and how will it communicate with the apps?
-
-+------------------------+--------------------------+---------------------------------------------------+
-| Parsl program location | App execution target     | Suitable channel                                  |
-+========================+==========================+===================================================+
-| Laptop/Workstation     | Laptop/Workstation       | `parsl.channels.LocalChannel`                     |
-+------------------------+--------------------------+---------------------------------------------------+
-| Laptop/Workstation     | Cloud Resources          | No channel is needed                              |
-+------------------------+--------------------------+---------------------------------------------------+
-| Laptop/Workstation     | Clusters with no 2FA     | `parsl.channels.SSHChannel`                       |
-+------------------------+--------------------------+---------------------------------------------------+
-| Laptop/Workstation     | Clusters with 2FA        | `parsl.channels.SSHInteractiveLoginChannel`       |
-+------------------------+--------------------------+---------------------------------------------------+
-| Login node             | Cluster/Supercomputer    | `parsl.channels.LocalChannel`                     |
-+------------------------+--------------------------+---------------------------------------------------+
-
 Heterogeneous Resources
 -----------------------
@@ -324,9 +306,13 @@ and Work Queue does not require Python to run.
 Accelerators
 ------------
 
-Many modern clusters provide multiple accelerators per compute note, yet many applications are best suited to using a single accelerator per task.
-Parsl supports pinning each worker to difference accelerators using ``available_accelerators`` option of the :class:`~parsl.executors.HighThroughputExecutor`.
-Provide either the number of executors (Parsl will assume they are named in integers starting from zero) or a list of the names of the accelerators available on the node.
+Many modern clusters provide multiple accelerators per compute node, yet many applications are best suited to using a
+single accelerator per task. Parsl supports pinning each worker to different accelerators using the
+``available_accelerators`` option of the :class:`~parsl.executors.HighThroughputExecutor`. Provide either the number of
+executors (Parsl will assume they are named in integers starting from zero) or a list of the names of the accelerators
+available on the node. Parsl will limit the number of workers it launches to the number of accelerators specified;
+in other words, you cannot have more workers per node than there are accelerators. By default, Parsl will launch
+as many workers as there are accelerators specified via ``available_accelerators``.
 
 .. code-block:: python
 
@@ -337,7 +323,6 @@ Provide either the number of executors (Parsl will assume they are named in inte
             worker_debug=True,
             available_accelerators=2,
             provider=LocalProvider(
-                channel=LocalChannel(),
                 init_blocks=1,
                 max_blocks=1,
             ),
@@ -346,7 +331,58 @@ Provide either the number of executors (Parsl will assume they are named in inte
         strategy='none',
     )
 
-For hardware that uses Nvidia devices, Parsl allows for the oversubscription of workers to GPUS. This is intended to make use of Nvidia's `Multi-Process Service (MPS) <https://docs.nvidia.com/deploy/mps/>`_ available on many of their GPUs that allows users to run multiple concurrent processes on a single GPU. The user needs to set in the ``worker_init`` commands to start MPS on every node in the block (this is machine dependent). The ``available_accelerators`` option should then be set to the total number of GPU partitions run on a single node in the block. For example, for a node with 4 Nvidia GPUs, to create 8 workers per GPU, set ``available_accelerators=32``. GPUs will be assigned to workers in ascending order in contiguous blocks. In the example, workers 0-7 will be placed on GPU 0, workers 8-15 on GPU 1, workers 16-23 on GPU 2, and workers 24-31 on GPU 3.
+It is possible to bind multiple or specific accelerators to each worker by specifying a list of comma-separated
+strings, each of which names the accelerators to bind to one worker. In the context of binding to NVIDIA GPUs, this
+works by setting ``CUDA_VISIBLE_DEVICES`` on each worker to a specific string in the list supplied to ``available_accelerators``.
+
+Here's an example:
+
+.. code-block:: python
+
+    # The following config is trimmed for clarity
+    local_config = Config(
+        executors=[
+            HighThroughputExecutor(
+                # Starts 2 workers per node, each bound to 2 GPUs
+                available_accelerators=["0,1", "2,3"],
+
+                # Start a single worker bound to all 4 GPUs
+                # available_accelerators=["0,1,2,3"]
+            )
+        ],
+    )
+
+GPU Oversubscription
+""""""""""""""""""""
+
+For hardware that uses Nvidia devices, Parsl allows for the oversubscription of workers to GPUs. This is intended to
+make use of Nvidia's `Multi-Process Service (MPS) <https://docs.nvidia.com/deploy/mps/>`_ available on many of their
+GPUs, which allows users to run multiple concurrent processes on a single GPU. The user needs to include in
+``worker_init`` the commands to start MPS on every node in the block (this is machine dependent). The
+``available_accelerators`` option should then be set to the total number of GPU partitions run on a single node in the
+block. For example, for a node with 4 Nvidia GPUs, to create 8 workers per GPU, set ``available_accelerators=32``.
+GPUs will be assigned to workers in ascending order in contiguous blocks. In the example, workers 0-7 will be placed
+on GPU 0, workers 8-15 on GPU 1, workers 16-23 on GPU 2, and workers 24-31 on GPU 3.
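+
+Below is a minimal sketch of this MPS pattern. The ``worker_init`` command shown is
+illustrative and machine dependent; consult your site's documentation for the correct
+way to start MPS on each node.
+
+.. code-block:: python
+
+    # The following config is trimmed for clarity
+    local_config = Config(
+        executors=[
+            HighThroughputExecutor(
+                # 4 GPUs x 8 workers per GPU: workers 0-7 share GPU 0,
+                # workers 8-15 share GPU 1, and so on
+                available_accelerators=32,
+
+                # Machine dependent: start the MPS control daemon on each node
+                worker_init="nvidia-cuda-mps-control -d",
+            )
+        ],
+    )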
 
 Multi-Threaded Applications
 ---------------------------
@@ -372,7 +388,6 @@ Select the best blocking strategy for processor's cache hierarchy (choose ``alte
             worker_debug=True,
             cpu_affinity='alternating',
             provider=LocalProvider(
-                channel=LocalChannel(),
                 init_blocks=1,
                 max_blocks=1,
             ),
@@ -412,18 +427,12 @@ These include ``OMP_NUM_THREADS``, ``GOMP_COMP_AFFINITY``, and ``KMP_THREAD_AFFI
 Ad-Hoc Clusters
 ---------------
 
-Any collection of compute nodes without a scheduler can be considered an
-ad-hoc cluster. Often these machines have a shared file system such as NFS or Lustre.
-In order to use these resources with Parsl, they need to set-up for password-less SSH access.
-
-To use these ssh-accessible collection of nodes as an ad-hoc cluster, we use
-the `parsl.providers.AdHocProvider` with an `parsl.channels.SSHChannel` to each node. An example
-configuration follows.
+Parsl's support of ad-hoc clusters of compute nodes without a scheduler
+is deprecated.
 
-.. literalinclude:: ../../parsl/configs/ad_hoc.py
-
-.. note::
-   Multiple blocks should not be assigned to each node when using the `parsl.executors.HighThroughputExecutor`
+See
+`issue #3515 <https://github.com/Parsl/parsl/issues/3515>`_
+for further discussion.
 
 Amazon Web Services
 -------------------
diff --git a/docs/userguide/examples/config.py b/docs/userguide/examples/config.py
index 166faaf4ac..68057d2b01 100644
--- a/docs/userguide/examples/config.py
+++ b/docs/userguide/examples/config.py
@@ -1,4 +1,3 @@
-from parsl.channels import LocalChannel
 from parsl.config import Config
 from parsl.executors import HighThroughputExecutor
 from parsl.providers import LocalProvider
@@ -8,9 +7,7 @@
         HighThroughputExecutor(
             label="htex_local",
             cores_per_worker=1,
-            provider=LocalProvider(
-                channel=LocalChannel(),
-            ),
+            provider=LocalProvider(),
         )
     ],
 )
diff --git a/docs/userguide/execution.rst b/docs/userguide/execution.rst
index 4168367f9d..df17dc458f 100644
--- a/docs/userguide/execution.rst
+++ b/docs/userguide/execution.rst
@@ -47,8 +47,7 @@ Parsl currently supports the following providers:
 7. `parsl.providers.AWSProvider`: This provider allows you to provision and manage cloud nodes from Amazon Web Services.
 8. `parsl.providers.GoogleCloudProvider`: This provider allows you to provision and manage cloud nodes from Google Cloud.
 9. `parsl.providers.KubernetesProvider`: This provider allows you to provision and manage containers on a Kubernetes cluster.
-10. `parsl.providers.AdHocProvider`: This provider allows you manage execution over a collection of nodes to form an ad-hoc cluster.
-11. `parsl.providers.LSFProvider`: This provider allows you to schedule resources via IBM's LSF scheduler.
+10. `parsl.providers.LSFProvider`: This provider allows you to schedule resources via IBM's LSF scheduler.
 
 
 
diff --git a/docs/userguide/plugins.rst b/docs/userguide/plugins.rst
index 4ecff86cfe..c3c38dea63 100644
--- a/docs/userguide/plugins.rst
+++ b/docs/userguide/plugins.rst
@@ -16,8 +16,8 @@ executor to run code on the local submitting host, while another
 executor can run the same code on a large supercomputer.
 
 
-Providers, Launchers and Channels
----------------------------------
+Providers and Launchers
+-----------------------
 Some executors are based on blocks of workers (for example the
 `parsl.executors.HighThroughputExecutor`: the submit side requires a
 batch system (eg slurm, kubernetes) to start worker processes, which then
@@ -34,10 +34,9 @@ add on any wrappers that are needed to launch the command (eg srun inside
 slurm).
 Providers and launchers are usually paired together for a particular system type.
 
-A `Channel` allows the commands used to interact with an `ExecutionProvider` to be
-executed on a remote system. The default channel executes commands on the
-local system, but a few variants of an `parsl.channels.SSHChannel` are provided.
-
+Parsl also has a deprecated ``Channel`` abstraction. See
+`issue 3515 <https://github.com/Parsl/parsl/issues/3515>`_
+for further discussion.
 
 File staging
 ------------
diff --git a/parsl/channels/__init__.py b/parsl/channels/__init__.py
index 5a45d15278..c81f6a8bf1 100644
--- a/parsl/channels/__init__.py
+++ b/parsl/channels/__init__.py
@@ -1,7 +1,4 @@
 from parsl.channels.base import Channel
 from parsl.channels.local.local import LocalChannel
-from parsl.channels.oauth_ssh.oauth_ssh import OAuthSSHChannel
-from parsl.channels.ssh.ssh import SSHChannel
-from parsl.channels.ssh_il.ssh_il import SSHInteractiveLoginChannel
 
-__all__ = ['Channel', 'SSHChannel', 'LocalChannel', 'SSHInteractiveLoginChannel', 'OAuthSSHChannel']
+__all__ = ['Channel', 'LocalChannel']
diff --git a/parsl/channels/oauth_ssh/oauth_ssh.py b/parsl/channels/oauth_ssh/oauth_ssh.py
index c9efa27767..1b690a4e3c 100644
--- a/parsl/channels/oauth_ssh/oauth_ssh.py
+++ b/parsl/channels/oauth_ssh/oauth_ssh.py
@@ -1,11 +1,15 @@
 import logging
 import socket
 
-import paramiko
-
-from parsl.channels.ssh.ssh import SSHChannel
+from parsl.channels.ssh.ssh import DeprecatedSSHChannel
 from parsl.errors import OptionalModuleMissing
 
+try:
+    import paramiko
+    _ssh_enabled = True
+except (ImportError, NameError, FileNotFoundError):
+    _ssh_enabled = False
+
 try:
     from oauth_ssh.oauth_ssh_token import find_access_token
     from oauth_ssh.ssh_service import SSHService
@@ -17,7 +21,7 @@
 logger = logging.getLogger(__name__)
 
 
-class OAuthSSHChannel(SSHChannel):
+class DeprecatedOAuthSSHChannel(DeprecatedSSHChannel):
     """SSH persistent channel. This enables remote execution on sites
     accessible via ssh. This channel uses Globus based OAuth tokens for authentication.
     """
@@ -38,6 +42,10 @@ def __init__(self, hostname, username=None, script_dir=None, envs=None, port=22)
 
         Raises:
         '''
+        if not _ssh_enabled:
+            raise OptionalModuleMissing(['ssh'],
+                                        "OauthSSHChannel requires the ssh module and config.")
+
         if not _oauth_ssh_enabled:
             raise OptionalModuleMissing(['oauth_ssh'],
                                         "OauthSSHChannel requires oauth_ssh module and config.")
diff --git a/parsl/channels/ssh/ssh.py b/parsl/channels/ssh/ssh.py
index bf33727e63..c53a26b831 100644
--- a/parsl/channels/ssh/ssh.py
+++ b/parsl/channels/ssh/ssh.py
@@ -2,8 +2,6 @@
 import logging
 import os
 
-import paramiko
-
 from parsl.channels.base import Channel
 from parsl.channels.errors import (
     AuthException,
@@ -13,18 +11,27 @@
     FileCopyException,
     SSHException,
 )
+from parsl.errors import OptionalModuleMissing
 from parsl.utils import RepresentationMixin
 
+try:
+    import paramiko
+    _ssh_enabled = True
+except (ImportError, NameError, FileNotFoundError):
+    _ssh_enabled = False
+
 
 logger = logging.getLogger(__name__)
 
 
-class NoAuthSSHClient(paramiko.SSHClient):
-    def _auth(self, username, *args):
-        self._transport.auth_none(username)
-        return
+if _ssh_enabled:
+    class NoAuthSSHClient(paramiko.SSHClient):
+        def _auth(self, username, *args):
+            self._transport.auth_none(username)
+            return
 
 
-class SSHChannel(Channel, RepresentationMixin):
+class DeprecatedSSHChannel(Channel, RepresentationMixin):
     ''' SSH persistent channel. This enables remote execution on sites
     accessible via ssh. It is assumed that the user has setup host keys
     so as to ssh to the remote host.
Which goes to say that the following @@ -53,6 +60,9 @@ def __init__(self, hostname, username=None, password=None, script_dir=None, envs Raises: ''' + if not _ssh_enabled: + raise OptionalModuleMissing(['ssh'], + "SSHChannel requires the ssh module and config.") self.hostname = hostname self.username = username diff --git a/parsl/channels/ssh_il/ssh_il.py b/parsl/channels/ssh_il/ssh_il.py index 02e7a58cd4..67e5501a43 100644 --- a/parsl/channels/ssh_il/ssh_il.py +++ b/parsl/channels/ssh_il/ssh_il.py @@ -1,14 +1,20 @@ import getpass import logging -import paramiko +from parsl.channels.ssh.ssh import DeprecatedSSHChannel +from parsl.errors import OptionalModuleMissing + +try: + import paramiko + _ssh_enabled = True +except (ImportError, NameError, FileNotFoundError): + _ssh_enabled = False -from parsl.channels.ssh.ssh import SSHChannel logger = logging.getLogger(__name__) -class SSHInteractiveLoginChannel(SSHChannel): +class DeprecatedSSHInteractiveLoginChannel(DeprecatedSSHChannel): """SSH persistent channel. This enables remote execution on sites accessible via ssh. This channel supports interactive login and is appropriate when keys are not set up. @@ -30,6 +36,10 @@ def __init__(self, hostname, username=None, password=None, script_dir=None, envs Raises: ''' + if not _ssh_enabled: + raise OptionalModuleMissing(['ssh'], + "SSHInteractiveLoginChannel requires the ssh module and config.") + self.hostname = hostname self.username = username self.password = password diff --git a/parsl/configs/ad_hoc.py b/parsl/configs/ad_hoc.py deleted file mode 100644 index 05b0e8190d..0000000000 --- a/parsl/configs/ad_hoc.py +++ /dev/null @@ -1,38 +0,0 @@ -from typing import Any, Dict - -from parsl.channels import SSHChannel -from parsl.config import Config -from parsl.executors import HighThroughputExecutor -from parsl.providers import AdHocProvider -from parsl.usage_tracking.levels import LEVEL_1 - -user_opts: Dict[str, Dict[str, Any]] -user_opts = {'adhoc': - {'username': 'YOUR_USERNAME', - 'script_dir': 'YOUR_SCRIPT_DIR', - 'remote_hostnames': ['REMOTE_HOST_URL_1', 'REMOTE_HOST_URL_2'] - } - } - - -config = Config( - executors=[ - HighThroughputExecutor( - label='remote_htex', - max_workers_per_node=2, - worker_logdir_root=user_opts['adhoc']['script_dir'], - provider=AdHocProvider( - # Command to be run before starting a worker, such as: - # 'module load Anaconda; source activate parsl_env'. - worker_init='', - channels=[SSHChannel(hostname=m, - username=user_opts['adhoc']['username'], - script_dir=user_opts['adhoc']['script_dir'], - ) for m in user_opts['adhoc']['remote_hostnames']] - ) - ) - ], - # AdHoc Clusters should not be setup with scaling strategy. 
- strategy='none', - usage_tracking=LEVEL_1, -) diff --git a/parsl/dataflow/dflow.py b/parsl/dataflow/dflow.py index 88ef063230..344173c4b1 100644 --- a/parsl/dataflow/dflow.py +++ b/parsl/dataflow/dflow.py @@ -116,7 +116,7 @@ def __init__(self, config: Config) -> None: if self.monitoring: if self.monitoring.logdir is None: self.monitoring.logdir = self.run_dir - self.monitoring.start(self.run_id, self.run_dir, self.config.run_dir) + self.monitoring.start(self.run_dir, self.config.run_dir) self.time_began = datetime.datetime.now() self.time_completed: Optional[datetime.datetime] = None diff --git a/parsl/executors/high_throughput/executor.py b/parsl/executors/high_throughput/executor.py index 6c181cdee7..c4097500f1 100644 --- a/parsl/executors/high_throughput/executor.py +++ b/parsl/executors/high_throughput/executor.py @@ -12,7 +12,6 @@ import typeguard -import parsl.launchers from parsl import curvezmq from parsl.addresses import get_all_addresses from parsl.app.errors import RemoteExceptionWrapper @@ -25,8 +24,7 @@ RandomManagerSelector, ) from parsl.executors.high_throughput.mpi_prefix_composer import ( - VALID_LAUNCHERS, - validate_resource_spec, + InvalidResourceSpecification, ) from parsl.executors.status_handling import BlockProviderExecutor from parsl.jobs.states import TERMINAL_STATES, JobState, JobStatus @@ -224,17 +222,6 @@ class HighThroughputExecutor(BlockProviderExecutor, RepresentationMixin, UsageIn Parsl will create names as integers starting with 0. default: empty list - - enable_mpi_mode: bool - If enabled, MPI launch prefixes will be composed for the batch scheduler based on - the nodes available in each batch job and the resource_specification dict passed - from the app. This is an experimental feature, please refer to the following doc section - before use: https://parsl.readthedocs.io/en/stable/userguide/mpi_apps.html - - mpi_launcher: str - This field is only used if enable_mpi_mode is set. Select one from the - list of supported MPI launchers = ("srun", "aprun", "mpiexec"). 
-        default: "mpiexec"
     """
 
     @typeguard.typechecked
@@ -263,8 +250,6 @@ def __init__(self,
                  poll_period: int = 10,
                  address_probe_timeout: Optional[int] = None,
                  worker_logdir_root: Optional[str] = None,
-                 enable_mpi_mode: bool = False,
-                 mpi_launcher: str = "mpiexec",
                  manager_selector: ManagerSelector = RandomManagerSelector(),
                  block_error_handler: Union[bool, Callable[[BlockProviderExecutor, Dict[str, JobStatus]], None]] = True,
                  encrypted: bool = False):
@@ -330,15 +315,6 @@ def __init__(self,
         self.encrypted = encrypted
         self.cert_dir = None
 
-        self.enable_mpi_mode = enable_mpi_mode
-        assert mpi_launcher in VALID_LAUNCHERS, \
-            f"mpi_launcher must be set to one of {VALID_LAUNCHERS}"
-        if self.enable_mpi_mode:
-            assert isinstance(self.provider.launcher, parsl.launchers.SimpleLauncher), \
-                "mpi_mode requires the provider to be configured to use a SimpleLauncher"
-
-        self.mpi_launcher = mpi_launcher
-
         if not launch_cmd:
             launch_cmd = DEFAULT_LAUNCH_CMD
         self.launch_cmd = launch_cmd
@@ -348,6 +324,8 @@ def __init__(self,
         self.interchange_launch_cmd = interchange_launch_cmd
 
     radio_mode = "htex"
+    enable_mpi_mode: bool = False
+    mpi_launcher: str = "mpiexec"
 
     def _warn_deprecated(self, old: str, new: str):
         warnings.warn(
@@ -377,6 +355,18 @@ def worker_logdir(self):
             return "{}/{}".format(self.worker_logdir_root, self.label)
         return self.logdir
 
+    def validate_resource_spec(self, resource_specification: dict):
+        """HTEX does not support *any* resource_specification options and
+        will raise InvalidResourceSpecification if any are passed to it"""
+        if resource_specification:
+            raise InvalidResourceSpecification(
+                set(resource_specification.keys()),
+                ("HTEX does not support the supplied resource_specifications. "
+                 "For MPI applications consider using the MPIExecutor. "
+                 "For specifications for core count/memory/walltime, consider using WorkQueueExecutor. ")
+            )
+        return
+
     def initialize_scaling(self):
         """Compose the launch command and scale out the initial blocks.
""" @@ -551,6 +541,7 @@ def _start_local_interchange_process(self) -> None: "logging_level": logging.DEBUG if self.worker_debug else logging.INFO, "cert_dir": self.cert_dir, "manager_selector": self.manager_selector, + "run_id": self.run_id, } config_pickle = pickle.dumps(interchange_config) @@ -659,7 +650,7 @@ def submit(self, func, resource_specification, *args, **kwargs): Future """ - validate_resource_spec(resource_specification, self.enable_mpi_mode) + self.validate_resource_spec(resource_specification) if self.bad_state_is_set: raise self.executor_exception @@ -831,7 +822,7 @@ def shutdown(self, timeout: float = 10.0): try: self.interchange_proc.wait(timeout=timeout) except subprocess.TimeoutExpired: - logger.info("Unable to terminate Interchange process; sending SIGKILL") + logger.warning("Unable to terminate Interchange process; sending SIGKILL") self.interchange_proc.kill() logger.info("Closing ZMQ pipes") diff --git a/parsl/executors/high_throughput/interchange.py b/parsl/executors/high_throughput/interchange.py index 5da83ae3ca..cd7d0596a9 100644 --- a/parsl/executors/high_throughput/interchange.py +++ b/parsl/executors/high_throughput/interchange.py @@ -55,6 +55,7 @@ def __init__(self, poll_period: int, cert_dir: Optional[str], manager_selector: ManagerSelector, + run_id: str, ) -> None: """ Parameters @@ -125,6 +126,8 @@ def __init__(self, self.command_channel.connect("tcp://{}:{}".format(client_address, client_ports[2])) logger.info("Connected to client") + self.run_id = run_id + self.hub_address = hub_address self.hub_zmq_port = hub_zmq_port @@ -227,6 +230,7 @@ def _send_monitoring_info(self, monitoring_radio: Optional[MonitoringRadioSender d: Dict = cast(Dict, manager.copy()) d['timestamp'] = datetime.datetime.now() d['last_heartbeat'] = datetime.datetime.fromtimestamp(d['last_heartbeat']) + d['run_id'] = self.run_id monitoring_radio.send((MessageType.NODE_INFO, d)) @@ -371,7 +375,7 @@ def start(self) -> None: self.zmq_context.destroy() delta = time.time() - start - logger.info("Processed {} tasks in {} seconds".format(self.count, delta)) + logger.info(f"Processed {self.count} tasks in {delta} seconds") logger.warning("Exiting") def process_task_outgoing_incoming( @@ -392,9 +396,8 @@ def process_task_outgoing_incoming( try: msg = json.loads(message[1].decode('utf-8')) except Exception: - logger.warning("Got Exception reading message from manager: {!r}".format( - manager_id), exc_info=True) - logger.debug("Message: \n{!r}\n".format(message[1])) + logger.warning(f"Got Exception reading message from manager: {manager_id!r}", exc_info=True) + logger.debug("Message:\n %r\n", message[1]) return # perform a bit of validation on the structure of the deserialized @@ -402,7 +405,7 @@ def process_task_outgoing_incoming( # in obviously malformed cases if not isinstance(msg, dict) or 'type' not in msg: logger.error(f"JSON message was not correctly formatted from manager: {manager_id!r}") - logger.debug("Message: \n{!r}\n".format(message[1])) + logger.debug("Message:\n %r\n", message[1]) return if msg['type'] == 'registration': @@ -421,7 +424,7 @@ def process_task_outgoing_incoming( self.connected_block_history.append(msg['block_id']) interesting_managers.add(manager_id) - logger.info("Adding manager: {!r} to ready queue".format(manager_id)) + logger.info(f"Adding manager: {manager_id!r} to ready queue") m = self._ready_managers[manager_id] # m is a ManagerRecord, but msg is a dict[Any,Any] and so can @@ -430,12 +433,12 @@ def process_task_outgoing_incoming( # later. 
                 m.update(msg)  # type: ignore[typeddict-item]
 
-                logger.info("Registration info for manager {!r}: {}".format(manager_id, msg))
+                logger.info(f"Registration info for manager {manager_id!r}: {msg}")
                 self._send_monitoring_info(monitoring_radio, m)
 
                 if (msg['python_v'].rsplit(".", 1)[0] != self.current_platform['python_v'].rsplit(".", 1)[0] or
                     msg['parsl_v'] != self.current_platform['parsl_v']):
-                    logger.error("Manager {!r} has incompatible version info with the interchange".format(manager_id))
+                    logger.error(f"Manager {manager_id!r} has incompatible version info with the interchange")
                     logger.debug("Setting kill event")
                     kill_event.set()
                     e = VersionMismatch("py.v={} parsl.v={}".format(self.current_platform['python_v'].rsplit(".", 1)[0],
@@ -448,16 +451,15 @@ def process_task_outgoing_incoming(
                     self.results_outgoing.send(pkl_package)
                     logger.error("Sent failure reports, shutting down interchange")
                 else:
-                    logger.info("Manager {!r} has compatible Parsl version {}".format(manager_id, msg['parsl_v']))
-                    logger.info("Manager {!r} has compatible Python version {}".format(manager_id,
-                                                                                       msg['python_v'].rsplit(".", 1)[0]))
+                    logger.info(f"Manager {manager_id!r} has compatible Parsl version {msg['parsl_v']}")
+                    logger.info(f"Manager {manager_id!r} has compatible Python version {msg['python_v'].rsplit('.', 1)[0]}")
             elif msg['type'] == 'heartbeat':
                 self._ready_managers[manager_id]['last_heartbeat'] = time.time()
-                logger.debug("Manager {!r} sent heartbeat via tasks connection".format(manager_id))
+                logger.debug("Manager %r sent heartbeat via tasks connection", manager_id)
                 self.task_outgoing.send_multipart([manager_id, b'', PKL_HEARTBEAT_CODE])
             elif msg['type'] == 'drain':
                 self._ready_managers[manager_id]['draining'] = True
-                logger.debug(f"Manager {manager_id!r} requested drain")
+                logger.debug("Manager %r requested drain", manager_id)
             else:
                 logger.error(f"Unexpected message type received from manager: {msg['type']}")
             logger.debug("leaving task_outgoing section")
@@ -480,9 +482,11 @@ def expire_drained_managers(self, interesting_managers: Set[bytes], monitoring_r
 
     def process_tasks_to_send(self, interesting_managers: Set[bytes]) -> None:
         # Check if there are tasks that could be sent to managers
-        logger.debug("Managers count (interesting/total): {interesting}/{total}".format(
-            total=len(self._ready_managers),
-            interesting=len(interesting_managers)))
+        logger.debug(
+            "Managers count (interesting/total): %s/%s",
+            len(interesting_managers),
+            len(self._ready_managers)
+        )
 
         if interesting_managers and not self.pending_task_queue.empty():
             shuffled_managers = self.manager_selector.sort_managers(self._ready_managers, interesting_managers)
@@ -493,7 +497,7 @@ def process_tasks_to_send(self, interesting_managers: Set[bytes]) -> None:
                 tasks_inflight = len(m['tasks'])
                 real_capacity = m['max_capacity'] - tasks_inflight
 
-                if (real_capacity and m['active'] and not m['draining']):
+                if real_capacity and m["active"] and not m["draining"]:
                     tasks = self.get_tasks(real_capacity)
                     if tasks:
                         self.task_outgoing.send_multipart([manager_id, b'', pickle.dumps(tasks)])
@@ -502,19 +506,19 @@ def process_tasks_to_send(self, interesting_managers: Set[bytes]) -> None:
                         tids = [t['task_id'] for t in tasks]
                         m['tasks'].extend(tids)
                         m['idle_since'] = None
-                        logger.debug("Sent tasks: {} to manager {!r}".format(tids, manager_id))
+                        logger.debug("Sent tasks: %s to manager %r", tids, manager_id)
                         # recompute real_capacity after sending tasks
                         real_capacity = m['max_capacity'] - tasks_inflight
                         if real_capacity > 0:
-                            logger.debug("Manager {!r} has free capacity {}".format(manager_id, real_capacity))
+                            logger.debug("Manager %r has free capacity %s", manager_id, real_capacity)
                             # ... so keep it in the interesting_managers list
                         else:
-                            logger.debug("Manager {!r} is now saturated".format(manager_id))
+                            logger.debug("Manager %r is now saturated", manager_id)
                             interesting_managers.remove(manager_id)
                 else:
                     interesting_managers.remove(manager_id)
                     # logger.debug("Nothing to send to manager {}".format(manager_id))
-            logger.debug("leaving _ready_managers section, with {} managers still interesting".format(len(interesting_managers)))
+            logger.debug("leaving _ready_managers section, with %s managers still interesting", len(interesting_managers))
         else:
             logger.debug("either no interesting managers or no tasks, so skipping manager pass")
@@ -524,9 +528,9 @@ def process_results_incoming(self, interesting_managers: Set[bytes], monitoring_
             logger.debug("entering results_incoming section")
             manager_id, *all_messages = self.results_incoming.recv_multipart()
             if manager_id not in self._ready_managers:
-                logger.warning("Received a result from a un-registered manager: {!r}".format(manager_id))
+                logger.warning(f"Received a result from an unregistered manager: {manager_id!r}")
             else:
-                logger.debug(f"Got {len(all_messages)} result items in batch from manager {manager_id!r}")
+                logger.debug("Got %s result items in batch from manager %r", len(all_messages), manager_id)
 
                 b_messages = []
 
@@ -544,10 +548,10 @@ def process_results_incoming(self, interesting_managers: Set[bytes], monitoring_
                         monitoring_radio.send(r['payload'])
                     elif r['type'] == 'heartbeat':
-                        logger.debug(f"Manager {manager_id!r} sent heartbeat via results connection")
+                        logger.debug("Manager %r sent heartbeat via results connection", manager_id)
                         b_messages.append((p_message, r))
                     else:
-                        logger.error("Interchange discarding result_queue message of unknown type: {}".format(r['type']))
+                        logger.error("Interchange discarding result_queue message of unknown type: %s", r["type"])
 
                 got_result = False
                 m = self._ready_managers[manager_id]
@@ -556,14 +560,16 @@ def process_results_incoming(self, interesting_managers: Set[bytes], monitoring_
                     if r['type'] == 'result':
                         got_result = True
                         try:
-                            logger.debug(f"Removing task {r['task_id']} from manager record {manager_id!r}")
+                            logger.debug("Removing task %s from manager record %r", r["task_id"], manager_id)
                             m['tasks'].remove(r['task_id'])
                         except Exception:
                             # If we reach here, there's something very wrong.
-                            logger.exception("Ignoring exception removing task_id {} for manager {!r} with task list {}".format(
+                            logger.exception(
+                                "Ignoring exception removing task_id %s for manager %r with task list %s",
                                 r['task_id'],
                                 manager_id,
-                                m['tasks']))
+                                m["tasks"]
+                            )
 
                 b_messages_to_send = []
                 for (b_message, _) in b_messages:
@@ -574,7 +580,7 @@ def process_results_incoming(self, interesting_managers: Set[bytes], monitoring_
                     self.results_outgoing.send_multipart(b_messages_to_send)
                     logger.debug("Sent messages on results_outgoing")
 
-            logger.debug(f"Current tasks on manager {manager_id!r}: {m['tasks']}")
+            logger.debug("Current tasks on manager %r: %s", manager_id, m["tasks"])
             if len(m['tasks']) == 0 and m['idle_since'] is None:
                 m['idle_since'] = time.time()
 
diff --git a/parsl/executors/high_throughput/mpi_executor.py b/parsl/executors/high_throughput/mpi_executor.py
index b8045d38b3..04b8cf5197 100644
--- a/parsl/executors/high_throughput/mpi_executor.py
+++ b/parsl/executors/high_throughput/mpi_executor.py
@@ -8,8 +8,13 @@
     GENERAL_HTEX_PARAM_DOCS,
     HighThroughputExecutor,
 )
+from parsl.executors.high_throughput.mpi_prefix_composer import (
+    VALID_LAUNCHERS,
+    validate_resource_spec,
+)
 from parsl.executors.status_handling import BlockProviderExecutor
 from parsl.jobs.states import JobStatus
+from parsl.launchers import SimpleLauncher
 from parsl.providers import LocalProvider
 from parsl.providers.base import ExecutionProvider
 
@@ -30,6 +35,11 @@ class MPIExecutor(HighThroughputExecutor):
     max_workers_per_block: int
         Maximum number of MPI applications to run at once per block
 
+    mpi_launcher: str
+        Select one from the list of supported MPI launchers:
+        ("srun", "aprun", "mpiexec").
+        default: "mpiexec"
+
     {GENERAL_HTEX_PARAM_DOCS}
     """
 
@@ -60,7 +70,6 @@ def __init__(self,
         super().__init__(
             # Hard-coded settings
             cores_per_worker=1e-9,  # Ensures there will be at least an absurd number of workers
-            enable_mpi_mode=True,
             max_workers_per_node=max_workers_per_block,
 
             # Everything else
@@ -82,9 +91,19 @@ def __init__(self,
             poll_period=poll_period,
             address_probe_timeout=address_probe_timeout,
             worker_logdir_root=worker_logdir_root,
-            mpi_launcher=mpi_launcher,
             block_error_handler=block_error_handler,
             encrypted=encrypted
         )
+        self.enable_mpi_mode = True
+        self.mpi_launcher = mpi_launcher
 
         self.max_workers_per_block = max_workers_per_block
+
+        if not isinstance(self.provider.launcher, SimpleLauncher):
+            raise TypeError("mpi_mode requires the provider to be configured to use a SimpleLauncher")
+
+        if mpi_launcher not in VALID_LAUNCHERS:
+            raise ValueError(f"mpi_launcher set to {mpi_launcher} must be one of {VALID_LAUNCHERS}")
+
+    def validate_resource_spec(self, resource_specification: dict):
+        return validate_resource_spec(resource_specification)
diff --git a/parsl/executors/high_throughput/mpi_prefix_composer.py b/parsl/executors/high_throughput/mpi_prefix_composer.py
index 78c5d8b867..0125d9a532 100644
--- a/parsl/executors/high_throughput/mpi_prefix_composer.py
+++ b/parsl/executors/high_throughput/mpi_prefix_composer.py
@@ -21,14 +21,15 @@ def __str__(self):
 class InvalidResourceSpecification(Exception):
     """Exception raised when Invalid input is supplied via resource specification"""
 
-    def __init__(self, invalid_keys: Set[str]):
+    def __init__(self, invalid_keys: Set[str], message: str = ''):
         self.invalid_keys = invalid_keys
+        self.message = message
 
     def __str__(self):
-        return f"Invalid resource specification options supplied: {self.invalid_keys}"
+        return f"Invalid resource 
specification options supplied: {self.invalid_keys} {self.message}" -def validate_resource_spec(resource_spec: Dict[str, str], is_mpi_enabled: bool): +def validate_resource_spec(resource_spec: Dict[str, str]): """Basic validation of keys in the resource_spec Raises: InvalidResourceSpecification if the resource_spec @@ -38,7 +39,7 @@ def validate_resource_spec(resource_spec: Dict[str, str], is_mpi_enabled: bool): # empty resource_spec when mpi_mode is set causes parsl to hang # ref issue #3427 - if is_mpi_enabled and len(user_keys) == 0: + if len(user_keys) == 0: raise MissingResourceSpecification('MPI mode requires optional parsl_resource_specification keyword argument to be configured') legal_keys = set(("ranks_per_node", diff --git a/parsl/executors/taskvine/executor.py b/parsl/executors/taskvine/executor.py index bebed1a51b..2e1efb211f 100644 --- a/parsl/executors/taskvine/executor.py +++ b/parsl/executors/taskvine/executor.py @@ -589,11 +589,13 @@ def shutdown(self, *args, **kwargs): # Join all processes before exiting logger.debug("Joining on submit process") self._submit_process.join() + self._submit_process.close() logger.debug("Joining on collector thread") self._collector_thread.join() if self.worker_launch_method == 'factory': logger.debug("Joining on factory process") self._factory_process.join() + self._factory_process.close() # Shutdown multiprocessing queues self._ready_task_queue.close() diff --git a/parsl/executors/workqueue/executor.py b/parsl/executors/workqueue/executor.py index a1ad49bca9..ae39f8c118 100644 --- a/parsl/executors/workqueue/executor.py +++ b/parsl/executors/workqueue/executor.py @@ -704,6 +704,8 @@ def shutdown(self, *args, **kwargs): logger.debug("Joining on submit process") self.submit_process.join() + self.submit_process.close() + logger.debug("Joining on collector thread") self.collector_thread.join() diff --git a/parsl/monitoring/db_manager.py b/parsl/monitoring/db_manager.py index 9f19cd9f4d..4fcf5ec2e2 100644 --- a/parsl/monitoring/db_manager.py +++ b/parsl/monitoring/db_manager.py @@ -1,11 +1,14 @@ import datetime import logging +import multiprocessing.queues as mpq import os import queue import threading import time from typing import Any, Dict, List, Optional, Set, Tuple, TypeVar, cast +import typeguard + from parsl.dataflow.states import States from parsl.errors import OptionalModuleMissing from parsl.log_utils import set_file_logger @@ -283,7 +286,7 @@ def __init__(self, ): self.workflow_end = False - self.workflow_start_message = None # type: Optional[MonitoringMessage] + self.workflow_start_message: Optional[MonitoringMessage] = None self.logdir = logdir os.makedirs(self.logdir, exist_ok=True) @@ -299,45 +302,19 @@ def __init__(self, self.batching_interval = batching_interval self.batching_threshold = batching_threshold - self.pending_priority_queue = queue.Queue() # type: queue.Queue[TaggedMonitoringMessage] - self.pending_node_queue = queue.Queue() # type: queue.Queue[MonitoringMessage] - self.pending_block_queue = queue.Queue() # type: queue.Queue[MonitoringMessage] - self.pending_resource_queue = queue.Queue() # type: queue.Queue[MonitoringMessage] + self.pending_priority_queue: queue.Queue[TaggedMonitoringMessage] = queue.Queue() + self.pending_node_queue: queue.Queue[MonitoringMessage] = queue.Queue() + self.pending_block_queue: queue.Queue[MonitoringMessage] = queue.Queue() + self.pending_resource_queue: queue.Queue[MonitoringMessage] = queue.Queue() def start(self, - priority_queue: "queue.Queue[TaggedMonitoringMessage]", - 
node_queue: "queue.Queue[MonitoringMessage]", - block_queue: "queue.Queue[MonitoringMessage]", - resource_queue: "queue.Queue[MonitoringMessage]") -> None: + resource_queue: mpq.Queue) -> None: self._kill_event = threading.Event() - self._priority_queue_pull_thread = threading.Thread(target=self._migrate_logs_to_internal, - args=( - priority_queue, 'priority', self._kill_event,), - name="Monitoring-migrate-priority", - daemon=True, - ) - self._priority_queue_pull_thread.start() - - self._node_queue_pull_thread = threading.Thread(target=self._migrate_logs_to_internal, - args=( - node_queue, 'node', self._kill_event,), - name="Monitoring-migrate-node", - daemon=True, - ) - self._node_queue_pull_thread.start() - - self._block_queue_pull_thread = threading.Thread(target=self._migrate_logs_to_internal, - args=( - block_queue, 'block', self._kill_event,), - name="Monitoring-migrate-block", - daemon=True, - ) - self._block_queue_pull_thread.start() self._resource_queue_pull_thread = threading.Thread(target=self._migrate_logs_to_internal, args=( - resource_queue, 'resource', self._kill_event,), + resource_queue, self._kill_event,), name="Monitoring-migrate-resource", daemon=True, ) @@ -351,38 +328,36 @@ def start(self, If that happens, the message will be added to deferred_resource_messages and processed later. """ - inserted_tasks = set() # type: Set[object] + inserted_tasks: Set[object] = set() """ like inserted_tasks but for task,try tuples """ - inserted_tries = set() # type: Set[Any] + inserted_tries: Set[Any] = set() # for any task ID, we can defer exactly one message, which is the # assumed-to-be-unique first message (with first message flag set). # The code prior to this patch will discard previous message in # the case of multiple messages to defer. - deferred_resource_messages = {} # type: MonitoringMessage + deferred_resource_messages: MonitoringMessage = {} exception_happened = False while (not self._kill_event.is_set() or self.pending_priority_queue.qsize() != 0 or self.pending_resource_queue.qsize() != 0 or self.pending_node_queue.qsize() != 0 or self.pending_block_queue.qsize() != 0 or - priority_queue.qsize() != 0 or resource_queue.qsize() != 0 or - node_queue.qsize() != 0 or block_queue.qsize() != 0): + resource_queue.qsize() != 0): """ WORKFLOW_INFO and TASK_INFO messages (i.e. 
priority messages) """ try: - logger.debug("""Checking STOP conditions: {}, {}, {}, {}, {}, {}, {}, {}, {}""".format( + logger.debug("""Checking STOP conditions: {}, {}, {}, {}, {}, {}""".format( self._kill_event.is_set(), self.pending_priority_queue.qsize() != 0, self.pending_resource_queue.qsize() != 0, self.pending_node_queue.qsize() != 0, self.pending_block_queue.qsize() != 0, - priority_queue.qsize() != 0, resource_queue.qsize() != 0, - node_queue.qsize() != 0, block_queue.qsize() != 0)) + resource_queue.qsize() != 0)) # This is the list of resource messages which can be reprocessed as if they # had just arrived because the corresponding first task message has been @@ -505,7 +480,7 @@ def start(self, "Got {} messages from block queue".format(len(block_info_messages))) # block_info_messages is possibly a nested list of dict (at different polling times) # Each dict refers to the info of a job/block at one polling time - block_messages_to_insert = [] # type: List[Any] + block_messages_to_insert: List[Any] = [] for block_msg in block_info_messages: block_messages_to_insert.extend(block_msg) self._insert(table=BLOCK, messages=block_messages_to_insert) @@ -574,43 +549,26 @@ def start(self, raise RuntimeError("An exception happened sometime during database processing and should have been logged in database_manager.log") @wrap_with_logs(target="database_manager") - def _migrate_logs_to_internal(self, logs_queue: queue.Queue, queue_tag: str, kill_event: threading.Event) -> None: - logger.info("Starting processing for queue {}".format(queue_tag)) + def _migrate_logs_to_internal(self, logs_queue: queue.Queue, kill_event: threading.Event) -> None: + logger.info("Starting _migrate_logs_to_internal") while not kill_event.is_set() or logs_queue.qsize() != 0: - logger.debug("""Checking STOP conditions for {} threads: {}, {}""" - .format(queue_tag, kill_event.is_set(), logs_queue.qsize() != 0)) + logger.debug("Checking STOP conditions: kill event: %s, queue has entries: %s", + kill_event.is_set(), logs_queue.qsize() != 0) try: x, addr = logs_queue.get(timeout=0.1) except queue.Empty: continue else: - if queue_tag == 'priority' and x == 'STOP': + if x == 'STOP': self.close() - elif queue_tag == 'priority': # implicitly not 'STOP' - assert isinstance(x, tuple) - assert len(x) == 2 - assert x[0] in [MessageType.WORKFLOW_INFO, MessageType.TASK_INFO], \ - "_migrate_logs_to_internal can only migrate WORKFLOW_,TASK_INFO message from priority queue, got x[0] == {}".format(x[0]) - self._dispatch_to_internal(x) - elif queue_tag == 'resource': - assert isinstance(x, tuple), "_migrate_logs_to_internal was expecting a tuple, got {}".format(x) - assert x[0] == MessageType.RESOURCE_INFO, ( - "_migrate_logs_to_internal can only migrate RESOURCE_INFO message from resource queue, " - "got tag {}, message {}".format(x[0], x) - ) - self._dispatch_to_internal(x) - elif queue_tag == 'node': - assert len(x) == 2, "expected message tuple to have exactly two elements" - assert x[0] == MessageType.NODE_INFO, "_migrate_logs_to_internal can only migrate NODE_INFO messages from node queue" - - self._dispatch_to_internal(x) - elif queue_tag == "block": - self._dispatch_to_internal(x) else: - logger.error(f"Discarding because unknown queue tag '{queue_tag}', message: {x}") + self._dispatch_to_internal(x) def _dispatch_to_internal(self, x: Tuple) -> None: + assert isinstance(x, tuple) + assert len(x) == 2, "expected message tuple to have exactly two elements" + if x[0] in [MessageType.WORKFLOW_INFO, MessageType.TASK_INFO]: 
self.pending_priority_queue.put(cast(Any, x)) elif x[0] == MessageType.RESOURCE_INFO: @@ -686,7 +644,7 @@ def _insert(self, table: str, messages: List[MonitoringMessage]) -> None: logger.exception("Rollback failed") def _get_messages_in_batch(self, msg_queue: "queue.Queue[X]") -> List[X]: - messages = [] # type: List[X] + messages: List[X] = [] start = time.time() while True: if time.time() - start >= self.batching_interval or len(messages) >= self.batching_threshold: @@ -719,11 +677,9 @@ def close(self) -> None: @wrap_with_logs(target="database_manager") -def dbm_starter(exception_q: "queue.Queue[Tuple[str, str]]", - priority_msgs: "queue.Queue[TaggedMonitoringMessage]", - node_msgs: "queue.Queue[MonitoringMessage]", - block_msgs: "queue.Queue[MonitoringMessage]", - resource_msgs: "queue.Queue[MonitoringMessage]", +@typeguard.typechecked +def dbm_starter(exception_q: mpq.Queue, + resource_msgs: mpq.Queue, db_url: str, logdir: str, logging_level: int) -> None: @@ -739,7 +695,7 @@ def dbm_starter(exception_q: "queue.Queue[Tuple[str, str]]", logdir=logdir, logging_level=logging_level) logger.info("Starting dbm in dbm starter") - dbm.start(priority_msgs, node_msgs, block_msgs, resource_msgs) + dbm.start(resource_msgs) except KeyboardInterrupt: logger.exception("KeyboardInterrupt signal caught") dbm.close() diff --git a/parsl/monitoring/monitoring.py b/parsl/monitoring/monitoring.py index c9a2dc9ed7..e1de80116c 100644 --- a/parsl/monitoring/monitoring.py +++ b/parsl/monitoring/monitoring.py @@ -7,7 +7,7 @@ import time from multiprocessing import Event, Process from multiprocessing.queues import Queue -from typing import TYPE_CHECKING, Any, Optional, Tuple, Union, cast +from typing import TYPE_CHECKING, Any, Literal, Optional, Tuple, Union, cast import typeguard @@ -106,7 +106,7 @@ def __init__(self, self.resource_monitoring_enabled = resource_monitoring_enabled self.resource_monitoring_interval = resource_monitoring_interval - def start(self, run_id: str, dfk_run_dir: str, config_run_dir: Union[str, os.PathLike]) -> None: + def start(self, dfk_run_dir: str, config_run_dir: Union[str, os.PathLike]) -> None: logger.debug("Starting MonitoringHub") @@ -138,30 +138,22 @@ def start(self, run_id: str, dfk_run_dir: str, config_run_dir: Union[str, os.Pat self.exception_q: Queue[Tuple[str, str]] self.exception_q = SizedQueue(maxsize=10) - self.priority_msgs: Queue[Tuple[Any, int]] - self.priority_msgs = SizedQueue() - - self.resource_msgs: Queue[AddressedMonitoringMessage] + self.resource_msgs: Queue[Union[AddressedMonitoringMessage, Tuple[Literal["STOP"], Literal[0]]]] self.resource_msgs = SizedQueue() - self.node_msgs: Queue[AddressedMonitoringMessage] - self.node_msgs = SizedQueue() - - self.block_msgs: Queue[AddressedMonitoringMessage] - self.block_msgs = SizedQueue() - self.router_exit_event: ms.Event self.router_exit_event = Event() self.router_proc = ForkProcess(target=router_starter, - args=(comm_q, self.exception_q, self.priority_msgs, self.node_msgs, - self.block_msgs, self.resource_msgs, self.router_exit_event), - kwargs={"hub_address": self.hub_address, + kwargs={"comm_q": comm_q, + "exception_q": self.exception_q, + "resource_msgs": self.resource_msgs, + "exit_event": self.router_exit_event, + "hub_address": self.hub_address, "udp_port": self.hub_port, "zmq_port_range": self.hub_port_range, "logdir": self.logdir, "logging_level": logging.DEBUG if self.monitoring_debug else logging.INFO, - "run_id": run_id }, name="Monitoring-Router-Process", daemon=True, @@ -169,7 +161,7 @@ def start(self, 
run_id: str, dfk_run_dir: str, config_run_dir: Union[str, os.Pat self.router_proc.start() self.dbm_proc = ForkProcess(target=dbm_starter, - args=(self.exception_q, self.priority_msgs, self.node_msgs, self.block_msgs, self.resource_msgs,), + args=(self.exception_q, self.resource_msgs,), kwargs={"logdir": self.logdir, "logging_level": logging.DEBUG if self.monitoring_debug else logging.INFO, "db_url": self.logging_endpoint, @@ -188,7 +180,7 @@ def start(self, run_id: str, dfk_run_dir: str, config_run_dir: Union[str, os.Pat self.filesystem_proc.start() logger.info(f"Started filesystem radio receiver process {self.filesystem_proc.pid}") - self.radio = MultiprocessingQueueRadioSender(self.block_msgs) + self.radio = MultiprocessingQueueRadioSender(self.resource_msgs) try: comm_q_result = comm_q.get(block=True, timeout=120) @@ -245,7 +237,7 @@ def close(self) -> None: logger.debug("Finished waiting for router termination") if len(exception_msgs) == 0: logger.debug("Sending STOP to DBM") - self.priority_msgs.put(("STOP", 0)) + self.resource_msgs.put(("STOP", 0)) else: logger.debug("Not sending STOP to DBM, because there were DBM exceptions") logger.debug("Waiting for DB termination") @@ -263,14 +255,8 @@ def close(self) -> None: logger.info("Closing monitoring multiprocessing queues") self.exception_q.close() self.exception_q.join_thread() - self.priority_msgs.close() - self.priority_msgs.join_thread() self.resource_msgs.close() self.resource_msgs.join_thread() - self.node_msgs.close() - self.node_msgs.join_thread() - self.block_msgs.close() - self.block_msgs.join_thread() logger.info("Closed monitoring multiprocessing queues") diff --git a/parsl/monitoring/remote.py b/parsl/monitoring/remote.py index 055a013627..d374338dee 100644 --- a/parsl/monitoring/remote.py +++ b/parsl/monitoring/remote.py @@ -199,10 +199,10 @@ def monitor(pid: int, pm = psutil.Process(pid) - children_user_time = {} # type: Dict[int, float] - children_system_time = {} # type: Dict[int, float] - children_num_ctx_switches_voluntary = {} # type: Dict[int, float] - children_num_ctx_switches_involuntary = {} # type: Dict[int, float] + children_user_time: Dict[int, float] = {} + children_system_time: Dict[int, float] = {} + children_num_ctx_switches_voluntary: Dict[int, float] = {} + children_num_ctx_switches_involuntary: Dict[int, float] = {} def accumulate_and_prepare() -> Dict[str, Any]: d = {"psutil_process_" + str(k): v for k, v in pm.as_dict().items() if k in simple} diff --git a/parsl/monitoring/router.py b/parsl/monitoring/router.py index bf395e3662..e92386c407 100644 --- a/parsl/monitoring/router.py +++ b/parsl/monitoring/router.py @@ -1,19 +1,19 @@ from __future__ import annotations import logging +import multiprocessing.queues as mpq import os import pickle -import queue import socket import threading import time from multiprocessing.synchronize import Event -from typing import Optional, Tuple, Union +from typing import Optional, Tuple +import typeguard import zmq from parsl.log_utils import set_file_logger -from parsl.monitoring.message_type import MessageType from parsl.monitoring.types import AddressedMonitoringMessage, TaggedMonitoringMessage from parsl.process_loggers import wrap_with_logs from parsl.utils import setproctitle @@ -31,13 +31,9 @@ def __init__(self, monitoring_hub_address: str = "127.0.0.1", logdir: str = ".", - run_id: str, logging_level: int = logging.INFO, atexit_timeout: int = 3, # in seconds - priority_msgs: "queue.Queue[AddressedMonitoringMessage]", - node_msgs: 
"queue.Queue[AddressedMonitoringMessage]", - block_msgs: "queue.Queue[AddressedMonitoringMessage]", - resource_msgs: "queue.Queue[AddressedMonitoringMessage]", + resource_msgs: mpq.Queue, exit_event: Event, ): """ Initializes a monitoring configuration class. @@ -57,8 +53,8 @@ def __init__(self, Logging level as defined in the logging module. Default: logging.INFO atexit_timeout : float, optional The amount of time in seconds to terminate the hub without receiving any messages, after the last dfk workflow message is received. - *_msgs : Queue - Four multiprocessing queues to receive messages, routed by type tag, and sometimes modified according to type tag. + resource_msgs : multiprocessing.Queue + A multiprocessing queue to receive messages to be routed onwards to the database process exit_event : Event An event that the main Parsl process will set to signal that the monitoring router should shut down. @@ -71,7 +67,6 @@ def __init__(self, self.hub_address = hub_address self.atexit_timeout = atexit_timeout - self.run_id = run_id self.loop_freq = 10.0 # milliseconds @@ -103,9 +98,6 @@ def __init__(self, min_port=zmq_port_range[0], max_port=zmq_port_range[1]) - self.priority_msgs = priority_msgs - self.node_msgs = node_msgs - self.block_msgs = block_msgs self.resource_msgs = resource_msgs self.exit_event = exit_event @@ -171,25 +163,7 @@ def start_zmq_listener(self) -> None: msg_0: AddressedMonitoringMessage msg_0 = (msg, 0) - if msg[0] == MessageType.NODE_INFO: - msg[1]['run_id'] = self.run_id - self.node_msgs.put(msg_0) - elif msg[0] == MessageType.RESOURCE_INFO: - self.resource_msgs.put(msg_0) - elif msg[0] == MessageType.BLOCK_INFO: - self.block_msgs.put(msg_0) - elif msg[0] == MessageType.TASK_INFO: - self.priority_msgs.put(msg_0) - elif msg[0] == MessageType.WORKFLOW_INFO: - self.priority_msgs.put(msg_0) - else: - # There is a type: ignore here because if msg[0] - # is of the correct type, this code is unreachable, - # but there is no verification that the message - # received from zmq_receiver_channel.recv_pyobj() is actually - # of that type. 
- self.logger.error("Discarding message " # type: ignore[unreachable] - f"from interchange with unknown type {msg[0].value}") + self.resource_msgs.put(msg_0) except zmq.Again: pass except Exception: @@ -205,12 +179,11 @@ def start_zmq_listener(self) -> None: @wrap_with_logs -def router_starter(comm_q: "queue.Queue[Union[Tuple[int, int], str]]", - exception_q: "queue.Queue[Tuple[str, str]]", - priority_msgs: "queue.Queue[AddressedMonitoringMessage]", - node_msgs: "queue.Queue[AddressedMonitoringMessage]", - block_msgs: "queue.Queue[AddressedMonitoringMessage]", - resource_msgs: "queue.Queue[AddressedMonitoringMessage]", +@typeguard.typechecked +def router_starter(*, + comm_q: mpq.Queue, + exception_q: mpq.Queue, + resource_msgs: mpq.Queue, exit_event: Event, hub_address: str, @@ -218,8 +191,7 @@ def router_starter(comm_q: "queue.Queue[Union[Tuple[int, int], str]]", zmq_port_range: Tuple[int, int], logdir: str, - logging_level: int, - run_id: str) -> None: + logging_level: int) -> None: setproctitle("parsl: monitoring router") try: router = MonitoringRouter(hub_address=hub_address, @@ -227,10 +199,6 @@ def router_starter(comm_q: "queue.Queue[Union[Tuple[int, int], str]]", zmq_port_range=zmq_port_range, logdir=logdir, logging_level=logging_level, - run_id=run_id, - priority_msgs=priority_msgs, - node_msgs=node_msgs, - block_msgs=block_msgs, resource_msgs=resource_msgs, exit_event=exit_event) except Exception as e: diff --git a/parsl/providers/__init__.py b/parsl/providers/__init__.py index 475737f1f9..150f425f3d 100644 --- a/parsl/providers/__init__.py +++ b/parsl/providers/__init__.py @@ -1,6 +1,3 @@ -# Workstation Provider -from parsl.providers.ad_hoc.ad_hoc import AdHocProvider - # Cloud Providers from parsl.providers.aws.aws import AWSProvider from parsl.providers.azure.azure import AzureProvider @@ -24,7 +21,6 @@ 'SlurmProvider', 'TorqueProvider', 'LSFProvider', - 'AdHocProvider', 'PBSProProvider', 'AWSProvider', 'GoogleCloudProvider', diff --git a/parsl/providers/ad_hoc/ad_hoc.py b/parsl/providers/ad_hoc/ad_hoc.py index 207dd55738..9059648101 100644 --- a/parsl/providers/ad_hoc/ad_hoc.py +++ b/parsl/providers/ad_hoc/ad_hoc.py @@ -12,8 +12,12 @@ logger = logging.getLogger(__name__) -class AdHocProvider(ExecutionProvider, RepresentationMixin): - """ Ad-hoc execution provider +class DeprecatedAdHocProvider(ExecutionProvider, RepresentationMixin): + """ Deprecated ad-hoc execution provider + + The (former) AdHocProvider is deprecated. See + `issue #3515 `_ + for further discussion. This provider is used to provision execution resources over one or more ad hoc nodes that are each accessible over a Channel (say, ssh) but otherwise lack a cluster scheduler. 
diff --git a/parsl/providers/slurm/slurm.py b/parsl/providers/slurm/slurm.py
index ec6abeff56..54b4053fed 100644
--- a/parsl/providers/slurm/slurm.py
+++ b/parsl/providers/slurm/slurm.py
@@ -20,7 +20,7 @@
 logger = logging.getLogger(__name__)

 # From https://slurm.schedmd.com/sacct.html#SECTION_JOB-STATE-CODES
-translate_table = {
+sacct_translate_table = {
     'PENDING': JobState.PENDING,
     'RUNNING': JobState.RUNNING,
     'CANCELLED': JobState.CANCELLED,
@@ -37,6 +37,20 @@
     'REQUEUED': JobState.PENDING
 }

+squeue_translate_table = {
+    'PD': JobState.PENDING,
+    'R': JobState.RUNNING,
+    'CA': JobState.CANCELLED,
+    'CF': JobState.PENDING,  # (configuring)
+    'CG': JobState.RUNNING,  # (completing)
+    'CD': JobState.COMPLETED,
+    'F': JobState.FAILED,  # (failed)
+    'TO': JobState.TIMEOUT,  # (timeout)
+    'NF': JobState.FAILED,  # (node failure)
+    'RV': JobState.FAILED,  # (revoked)
+    'SE': JobState.FAILED  # (special exit state)
+}
+

 class SlurmProvider(ClusterProvider, RepresentationMixin):
     """Slurm Execution Provider
@@ -155,6 +169,23 @@ def __init__(self,
         self.regex_job_id = regex_job_id
         self.worker_init = worker_init + '\n'
+        # Check if sacct works and if not fall back to squeue
+        cmd = "sacct -X"
+        logger.debug("Executing %s", cmd)
+        retcode, stdout, stderr = self.execute_wait(cmd)
+        # If sacct fails it should return retcode=1 stderr="Slurm accounting storage is disabled"
+        logger.debug(f"sacct returned retcode={retcode} stderr={stderr}")
+        if retcode == 0:
+            logger.debug("using sacct to get job status")
+            # Using state%20 to get enough characters to not truncate the state
+            # output; without this, the output can look like " CANCELLED+"
+            self._cmd = "sacct -X --noheader --format=jobid,state%20 --job '{0}'"
+            self._translate_table = sacct_translate_table
+        else:
+            logger.debug(f"sacct failed with retcode={retcode}")
+            logger.debug("falling back to using squeue to get job status")
+            self._cmd = "squeue --noheader --format='%i %t' --job '{0}'"
+            self._translate_table = squeue_translate_table

     def _status(self):
         '''Returns the status list for a list of job_ids
@@ -172,16 +203,14 @@ def _status(self):
             logger.debug('No active jobs, skipping status update')
             return

-        # Using state%20 to get enough characters to not truncate output
-        # of the state. Without output can look like " CANCELLED+"
-        cmd = "sacct -X --noheader --format=jobid,state%20 --job '{0}'".format(job_id_list)
+        cmd = self._cmd.format(job_id_list)
         logger.debug("Executing %s", cmd)
         retcode, stdout, stderr = self.execute_wait(cmd)
-        logger.debug("sacct returned %s %s", stdout, stderr)
+        logger.debug("sacct/squeue returned %s %s", stdout, stderr)

         # Execute_wait failed. Do not update
         if retcode != 0:
-            logger.warning("sacct failed with non-zero exit code {}".format(retcode))
+            logger.warning("sacct/squeue failed with non-zero exit code {}".format(retcode))
             return

         jobs_missing = set(self.resources.keys())
@@ -193,9 +222,9 @@ def _status(self):
             # For example " CANCELLED by "
             # This splits and ignores anything past the first two unpacked values
             job_id, slurm_state, *ignore = line.split()
-            if slurm_state not in translate_table:
+            if slurm_state not in self._translate_table:
                 logger.warning(f"Slurm status {slurm_state} is not recognized")
-            status = translate_table.get(slurm_state, JobState.UNKNOWN)
+            status = self._translate_table.get(slurm_state, JobState.UNKNOWN)
             logger.debug("Updating job {} with slurm status {} to parsl state {!s}".format(job_id, slurm_state, status))
             self.resources[job_id]['status'] = JobStatus(status,
                                                          stdout_path=self.resources[job_id]['job_stdout_path'],
@@ -203,9 +232,10 @@ def _status(self):
             jobs_missing.remove(job_id)

-        # sacct can get job info after jobs have completed so this path shouldn't be hit
-        # log a warning if there are missing jobs for some reason
+        # squeue does not report on jobs that are not running, so we fill in the
+        # blanks for missing jobs; we may lose some information about why they failed.
         for missing_job in jobs_missing:
-            logger.warning("Updating missing job {} to completed status".format(missing_job))
+            logger.debug("Updating missing job {} to completed status".format(missing_job))
             self.resources[missing_job]['status'] = JobStatus(
                 JobState.COMPLETED, stdout_path=self.resources[missing_job]['job_stdout_path'],
                 stderr_path=self.resources[missing_job]['job_stderr_path'])
diff --git a/parsl/tests/configs/ad_hoc_cluster_htex.py b/parsl/tests/configs/ad_hoc_cluster_htex.py
deleted file mode 100644
index 0949b82392..0000000000
--- a/parsl/tests/configs/ad_hoc_cluster_htex.py
+++ /dev/null
@@ -1,35 +0,0 @@
-from typing import Any, Dict
-
-from parsl.channels import SSHChannel
-from parsl.config import Config
-from parsl.executors import HighThroughputExecutor
-from parsl.providers import AdHocProvider
-
-user_opts = {'adhoc':
-             {'username': 'YOUR_USERNAME',
-              'script_dir': 'YOUR_SCRIPT_DIR',
-              'remote_hostnames': ['REMOTE_HOST_URL_1', 'REMOTE_HOST_URL_2']
-              }
-             }  # type: Dict[str, Dict[str, Any]]
-
-config = Config(
-    executors=[
-        HighThroughputExecutor(
-            label='remote_htex',
-            max_workers_per_node=2,
-            worker_logdir_root=user_opts['adhoc']['script_dir'],
-            encrypted=True,
-            provider=AdHocProvider(
-                # Command to be run before starting a worker, such as:
-                # 'module load Anaconda; source activate parsl_env'.
-                worker_init='',
-                channels=[SSHChannel(hostname=m,
-                                     username=user_opts['adhoc']['username'],
-                                     script_dir=user_opts['adhoc']['script_dir'],
-                                     ) for m in user_opts['adhoc']['remote_hostnames']]
-            )
-        )
-    ],
-    # AdHoc Clusters should not be setup with scaling strategy.
-    strategy='none',
-)
diff --git a/parsl/tests/configs/htex_ad_hoc_cluster.py b/parsl/tests/configs/htex_ad_hoc_cluster.py
deleted file mode 100644
index db24b42ab2..0000000000
--- a/parsl/tests/configs/htex_ad_hoc_cluster.py
+++ /dev/null
@@ -1,26 +0,0 @@
-from parsl.channels import SSHChannel
-from parsl.config import Config
-from parsl.executors import HighThroughputExecutor
-from parsl.providers import AdHocProvider
-from parsl.tests.configs.user_opts import user_opts
-
-config = Config(
-    executors=[
-        HighThroughputExecutor(
-            label='remote_htex',
-            cores_per_worker=1,
-            worker_debug=False,
-            address=user_opts['public_ip'],
-            encrypted=True,
-            provider=AdHocProvider(
-                move_files=False,
-                parallelism=1,
-                worker_init=user_opts['adhoc']['worker_init'],
-                channels=[SSHChannel(hostname=m,
-                                     username=user_opts['adhoc']['username'],
-                                     script_dir=user_opts['adhoc']['script_dir'],
-                                     ) for m in user_opts['adhoc']['remote_hostnames']]
-            )
-        )
-    ],
-)
diff --git a/parsl/tests/configs/local_adhoc.py b/parsl/tests/configs/local_adhoc.py
index 25b1f38d61..9b1f951842 100644
--- a/parsl/tests/configs/local_adhoc.py
+++ b/parsl/tests/configs/local_adhoc.py
@@ -1,7 +1,7 @@
 from parsl.channels import LocalChannel
 from parsl.config import Config
 from parsl.executors import HighThroughputExecutor
-from parsl.providers import AdHocProvider
+from parsl.providers.ad_hoc.ad_hoc import DeprecatedAdHocProvider


 def fresh_config():
@@ -10,7 +10,7 @@ def fresh_config():
         HighThroughputExecutor(
             label='AdHoc',
             encrypted=True,
-            provider=AdHocProvider(
+            provider=DeprecatedAdHocProvider(
                 channels=[LocalChannel(), LocalChannel()]
             )
         )
diff --git a/parsl/tests/configs/swan_htex.py b/parsl/tests/configs/swan_htex.py
deleted file mode 100644
index 3b1b6785ab..0000000000
--- a/parsl/tests/configs/swan_htex.py
+++ /dev/null
@@ -1,43 +0,0 @@
-"""
-==================   Block
-| ++++++++++++++ |   Node
-| |            | |
-| |    Task    | |   . . .
-| |            | |
-| ++++++++++++++ |
-==================
-"""
-from parsl.channels import SSHChannel
-from parsl.config import Config
-from parsl.executors import HighThroughputExecutor
-from parsl.launchers import AprunLauncher
-from parsl.providers import TorqueProvider
-
-# If you are a developer running tests, make sure to update parsl/tests/configs/user_opts.py
-# If you are a user copying-and-pasting this as an example, make sure to either
-# 1) create a local `user_opts.py`, or
-# 2) delete the user_opts import below and replace all appearances of `user_opts` with the literal value
-#    (i.e., user_opts['swan']['username'] -> 'your_username')
-from .user_opts import user_opts
-
-config = Config(
-    executors=[
-        HighThroughputExecutor(
-            label='swan_htex',
-            encrypted=True,
-            provider=TorqueProvider(
-                channel=SSHChannel(
-                    hostname='swan.cray.com',
-                    username=user_opts['swan']['username'],
-                    script_dir=user_opts['swan']['script_dir'],
-                ),
-                nodes_per_block=1,
-                init_blocks=1,
-                max_blocks=1,
-                launcher=AprunLauncher(),
-                scheduler_options=user_opts['swan']['scheduler_options'],
-                worker_init=user_opts['swan']['worker_init'],
-            ),
-        )
-    ]
-)
diff --git a/parsl/tests/integration/test_channels/test_scp_1.py b/parsl/tests/integration/test_channels/test_scp_1.py
deleted file mode 100644
index c11df3c663..0000000000
--- a/parsl/tests/integration/test_channels/test_scp_1.py
+++ /dev/null
@@ -1,45 +0,0 @@
-import os
-
-from parsl.channels.ssh.ssh import SSHChannel as SSH
-
-
-def connect_and_list(hostname, username):
-    out = ''
-    conn = SSH(hostname, username=username)
-    conn.push_file(os.path.abspath('remote_run.sh'), '/home/davidk/')
-    # ec, out, err = conn.execute_wait("ls /tmp/remote_run.sh; bash /tmp/remote_run.sh")
-    conn.close()
-    return out
-
-
-script = '''#!/bin/bash
-echo "Hostname: $HOSTNAME"
-echo "Cpu info -----"
-cat /proc/cpuinfo
-echo "Done----------"
-'''
-
-
-def test_connect_1():
-    with open('remote_run.sh', 'w') as f:
-        f.write(script)
-
-    sites = {
-        'midway': {
-            'url': 'midway.rcc.uchicago.edu',
-            'uname': 'yadunand'
-        },
-        'swift': {
-            'url': 'swift.rcc.uchicago.edu',
-            'uname': 'yadunand'
-        }
-    }
-
-    for site in sites.values():
-        out = connect_and_list(site['url'], site['uname'])
-        print("Sitename :{0} hostname:{1}".format(site['url'], out))
-
-
-if __name__ == "__main__":
-
-    test_connect_1()
diff --git a/parsl/tests/integration/test_channels/test_ssh_1.py b/parsl/tests/integration/test_channels/test_ssh_1.py
deleted file mode 100644
index 61ab3f2705..0000000000
--- a/parsl/tests/integration/test_channels/test_ssh_1.py
+++ /dev/null
@@ -1,40 +0,0 @@
-from parsl.channels.ssh.ssh import SSHChannel as SSH
-
-
-def connect_and_list(hostname, username):
-    conn = SSH(hostname, username=username)
-    ec, out, err = conn.execute_wait("echo $HOSTNAME")
-    conn.close()
-    return out
-
-
-def test_midway():
-    ''' Test ssh channels to midway
-    '''
-    url = 'midway.rcc.uchicago.edu'
-    uname = 'yadunand'
-    out = connect_and_list(url, uname)
-    print("Sitename :{0} hostname:{1}".format(url, out))
-
-
-def test_beagle():
-    ''' Test ssh channels to beagle
-    '''
-    url = 'login04.beagle.ci.uchicago.edu'
-    uname = 'yadunandb'
-    out = connect_and_list(url, uname)
-    print("Sitename :{0} hostname:{1}".format(url, out))
-
-
-def test_osg():
-    ''' Test ssh connectivity to osg
-    '''
-    url = 'login.osgconnect.net'
-    uname = 'yadunand'
-    out = connect_and_list(url, uname)
-    print("Sitename :{0} hostname:{1}".format(url, out))
-
-
-if __name__ == "__main__":
-
-    pass
diff --git a/parsl/tests/integration/test_channels/test_ssh_errors.py b/parsl/tests/integration/test_channels/test_ssh_errors.py
deleted file mode 100644
index 7483e30a5c..0000000000
--- a/parsl/tests/integration/test_channels/test_ssh_errors.py
+++ /dev/null
@@ -1,46 +0,0 @@
-from parsl.channels.errors import BadHostKeyException, SSHException
-from parsl.channels.ssh.ssh import SSHChannel as SSH
-
-
-def connect_and_list(hostname, username):
-    conn = SSH(hostname, username=username)
-    ec, out, err = conn.execute_wait("echo $HOSTNAME")
-    conn.close()
-    return out
-
-
-def test_error_1():
-    try:
-        connect_and_list("bad.url.gov", "ubuntu")
-    except Exception as e:
-        assert type(e) is SSHException, "Expected SSException, got: {0}".format(e)
-
-
-def test_error_2():
-    try:
-        connect_and_list("swift.rcc.uchicago.edu", "mango")
-    except SSHException:
-        print("Caught the right exception")
-    else:
-        raise Exception("Expected SSException, got: {0}".format(e))
-
-
-def test_error_3():
-    ''' This should work
-    '''
-    try:
-        connect_and_list("edison.nersc.gov", "yadunand")
-    except BadHostKeyException as e:
-        print("Caught exception BadHostKeyException: ", e)
-    else:
-        assert False, "Expected SSException, got: {0}".format(e)
-
-
-if __name__ == "__main__":
-
-    tests = [test_error_1, test_error_2, test_error_3]
-
-    for test in tests:
-        print("---------Running : {0}---------------".format(test))
-        test()
-        print("----------------------DONE--------------------------")
diff --git a/parsl/tests/integration/test_channels/test_ssh_file_transport.py b/parsl/tests/integration/test_channels/test_ssh_file_transport.py
deleted file mode 100644
index 61672c3ff5..0000000000
--- a/parsl/tests/integration/test_channels/test_ssh_file_transport.py
+++ /dev/null
@@ -1,41 +0,0 @@
-import parsl
-from parsl.channels.ssh.ssh import SSHChannel as SSH
-
-
-def connect_and_list(hostname, username):
-    conn = SSH(hostname, username=username)
-    ec, out, err = conn.execute_wait("echo $HOSTNAME")
-    conn.close()
-    return out
-
-
-def test_push(conn, fname="test001.txt"):
-
-    with open(fname, 'w') as f:
-        f.write("Hello from parsl.ssh testing\n")
-
-    conn.push_file(fname, "/tmp")
-    ec, out, err = conn.execute_wait("ls /tmp/{0}".format(fname))
-    print(ec, out, err)
-
-
-def test_pull(conn, fname="test001.txt"):
-
-    local = "foo"
-    conn.pull_file("/tmp/{0}".format(fname), local)
-
-    with open("{0}/{1}".format(local, fname), 'r') as f:
-        print(f.readlines())
-
-
-if __name__ == "__main__":
-
-    parsl.set_stream_logger()
-
-    # This is for testing
-    conn = SSH("midway.rcc.uchicago.edu", username="yadunand")
-
-    test_push(conn)
-    test_pull(conn)
-
-    conn.close()
diff --git a/parsl/tests/integration/test_channels/test_ssh_interactive.py b/parsl/tests/integration/test_channels/test_ssh_interactive.py
deleted file mode 100644
index c6f9b9dea9..0000000000
--- a/parsl/tests/integration/test_channels/test_ssh_interactive.py
+++ /dev/null
@@ -1,24 +0,0 @@
-import parsl
-from parsl.channels.ssh_il.ssh_il import SSHInteractiveLoginChannel as SSH
-
-
-def connect_and_list(hostname, username):
-    conn = SSH(hostname, username=username)
-    ec, out, err = conn.execute_wait("echo $HOSTNAME")
-    conn.close()
-    return out
-
-
-def test_cooley():
-    ''' Test ssh channels to midway
-    '''
-    url = 'cooley.alcf.anl.gov'
-    uname = 'yadunand'
-    out = connect_and_list(url, uname)
-    print("Sitename :{0} hostname:{1}".format(url, out))
-    return
-
-
-if __name__ == "__main__":
-    parsl.set_stream_logger()
-    test_cooley()
diff --git a/parsl/tests/manual_tests/test_ad_hoc_htex.py b/parsl/tests/manual_tests/test_ad_hoc_htex.py
deleted file mode 100644
index dfa34ec0d1..0000000000
--- a/parsl/tests/manual_tests/test_ad_hoc_htex.py
+++ /dev/null
@@ -1,49 +0,0 @@
-import parsl
-from parsl import python_app
-
-parsl.set_stream_logger()
-
-from parsl.channels import SSHChannel
-from parsl.config import Config
-from parsl.executors import HighThroughputExecutor
-from parsl.providers import AdHocProvider
-
-remotes = ['midway2-login2.rcc.uchicago.edu', 'midway2-login1.rcc.uchicago.edu']
-
-config = Config(
-    executors=[
-        HighThroughputExecutor(
-            label='AdHoc',
-            max_workers_per_node=2,
-            worker_logdir_root="/scratch/midway2/yadunand/parsl_scripts",
-            encrypted=True,
-            provider=AdHocProvider(
-                worker_init="source /scratch/midway2/yadunand/parsl_env_setup.sh",
-                channels=[SSHChannel(hostname=m,
-                                     username="yadunand",
-                                     script_dir="/scratch/midway2/yadunand/parsl_cluster")
-                          for m in remotes]
-            )
-        )
-    ]
-)
-
-
-@python_app
-def platform(sleep=2, stdout=None):
-    import platform
-    import time
-    time.sleep(sleep)
-    return platform.uname()
-
-
-def test_raw_provider():
-
-    parsl.load(config)
-
-    x = [platform() for i in range(10)]
-    print([i.result() for i in x])
-
-
-if __name__ == "__main__":
-    test_raw_provider()
diff --git a/parsl/tests/manual_tests/test_oauth_ssh.py b/parsl/tests/manual_tests/test_oauth_ssh.py
deleted file mode 100644
index 3d464bcc0e..0000000000
--- a/parsl/tests/manual_tests/test_oauth_ssh.py
+++ /dev/null
@@ -1,13 +0,0 @@
-from parsl.channels import OAuthSSHChannel
-
-
-def test_channel():
-    channel = OAuthSSHChannel(hostname='ssh.demo.globus.org', username='yadunand')
-    x, stdout, stderr = channel.execute_wait('ls')
-    print(x, stdout, stderr)
-    assert x == 0, "Expected exit code 0, got {}".format(x)
-
-
-if __name__ == '__main__':
-
-    test_channel()
diff --git a/parsl/tests/test_htex/test_resource_spec_validation.py b/parsl/tests/test_htex/test_resource_spec_validation.py
new file mode 100644
index 0000000000..ac0c580c20
--- /dev/null
+++ b/parsl/tests/test_htex/test_resource_spec_validation.py
@@ -0,0 +1,40 @@
+import queue
+from unittest import mock
+
+import pytest
+
+from parsl.executors import HighThroughputExecutor
+from parsl.executors.high_throughput.mpi_prefix_composer import (
+    InvalidResourceSpecification,
+)
+
+
+def double(x):
+    return x * 2
+
+
+@pytest.mark.local
+def test_submit_calls_validate():
+
+    htex = HighThroughputExecutor()
+    htex.outgoing_q = mock.Mock(spec=queue.Queue)
+    htex.validate_resource_spec = mock.Mock(spec=htex.validate_resource_spec)
+
+    res_spec = {}
+    htex.submit(double, res_spec, (5,), {})
+    htex.validate_resource_spec.assert_called()
+
+
+@pytest.mark.local
+def test_resource_spec_validation():
+    htex = HighThroughputExecutor()
+    ret_val = htex.validate_resource_spec({})
+    assert ret_val is None
+
+
+@pytest.mark.local
+def test_resource_spec_validation_bad_keys():
+    htex = HighThroughputExecutor()
+
+    with pytest.raises(InvalidResourceSpecification):
+        htex.validate_resource_spec({"num_nodes": 2})
diff --git a/parsl/tests/test_htex/test_zmq_binding.py b/parsl/tests/test_htex/test_zmq_binding.py
index 2273443b99..e21c065d0d 100644
--- a/parsl/tests/test_htex/test_zmq_binding.py
+++ b/parsl/tests/test_htex/test_zmq_binding.py
@@ -25,7 +25,8 @@ def make_interchange(*, interchange_address: Optional[str], cert_dir: Optional[str]) -> Interchange:
                        logdir=".",
                        logging_level=logging.INFO,
                        manager_selector=RandomManagerSelector(),
-                       poll_period=10)
+                       poll_period=10,
+                       run_id="test_run_id")


 @pytest.fixture
diff --git a/parsl/tests/test_mpi_apps/test_bad_mpi_config.py b/parsl/tests/test_mpi_apps/test_bad_mpi_config.py
index 336bf87703..ebeb64622d 100644
--- a/parsl/tests/test_mpi_apps/test_bad_mpi_config.py
+++ b/parsl/tests/test_mpi_apps/test_bad_mpi_config.py
@@ -1,33 +1,48 @@
 import pytest

 from parsl import Config
-from parsl.executors import HighThroughputExecutor
+from parsl.executors import MPIExecutor
 from parsl.launchers import AprunLauncher, SimpleLauncher, SrunLauncher
 from parsl.providers import SlurmProvider


 @pytest.mark.local
-def test_bad_launcher_with_mpi_mode():
-    """AssertionError if a launcher other than SimpleLauncher is supplied"""
+def test_bad_launcher():
+    """TypeError if a launcher other than SimpleLauncher is supplied"""

     for launcher in [SrunLauncher(), AprunLauncher()]:
-        with pytest.raises(AssertionError):
+        with pytest.raises(TypeError):
             Config(executors=[
-                HighThroughputExecutor(
-                    enable_mpi_mode=True,
+                MPIExecutor(
                     provider=SlurmProvider(launcher=launcher),
                 )
             ])


 @pytest.mark.local
-def test_correct_launcher_with_mpi_mode():
+def test_bad_mpi_launcher():
+    """ValueError if an unsupported mpi_launcher is specified"""
+
+    with pytest.raises(ValueError):
+        Config(executors=[
+            MPIExecutor(
+                mpi_launcher="bad_launcher",
+                provider=SlurmProvider(launcher=SimpleLauncher()),
+            )
+        ])
+
+
+@pytest.mark.local
+@pytest.mark.parametrize(
+    "mpi_launcher",
+    ["srun", "aprun", "mpiexec"]
+)
+def test_correct_launcher_with_mpi_mode(mpi_launcher: str):
     """Confirm that SimpleLauncher works with mpi_mode"""

-    config = Config(executors=[
-        HighThroughputExecutor(
-            enable_mpi_mode=True,
-            provider=SlurmProvider(launcher=SimpleLauncher()),
-        )
-    ])
-    assert isinstance(config.executors[0].provider.launcher, SimpleLauncher)
+    executor = MPIExecutor(
+        mpi_launcher=mpi_launcher,
+        provider=SlurmProvider(launcher=SimpleLauncher()),
+    )
+
+    assert isinstance(executor.provider.launcher, SimpleLauncher)
diff --git a/parsl/tests/test_mpi_apps/test_mpi_mode_disabled.py b/parsl/tests/test_mpi_apps/test_mpi_mode_disabled.py
deleted file mode 100644
index e1e5c70883..0000000000
--- a/parsl/tests/test_mpi_apps/test_mpi_mode_disabled.py
+++ /dev/null
@@ -1,47 +0,0 @@
-from typing import Dict
-
-import pytest
-
-import parsl
-from parsl import python_app
-from parsl.tests.configs.htex_local import fresh_config
-
-EXECUTOR_LABEL = "MPI_TEST"
-
-
-def local_config():
-    config = fresh_config()
-    config.executors[0].label = EXECUTOR_LABEL
-    config.executors[0].max_workers_per_node = 1
-    config.executors[0].enable_mpi_mode = False
-    return config
-
-
-@python_app
-def get_env_vars(parsl_resource_specification: Dict = {}) -> Dict:
-    import os
-
-    parsl_vars = {}
-    for key in os.environ:
-        if key.startswith("PARSL_"):
-            parsl_vars[key] = os.environ[key]
-    return parsl_vars
-
-
-@pytest.mark.local
-def test_only_resource_specs_set():
-    """Confirm that resource_spec env vars are set while launch prefixes are not
-    when enable_mpi_mode = False"""
-    resource_spec = {
-        "num_nodes": 4,
-        "ranks_per_node": 2,
-    }
-
-    future = get_env_vars(parsl_resource_specification=resource_spec)
-
-    result = future.result()
-    assert isinstance(result, Dict)
-    assert "PARSL_DEFAULT_PREFIX" not in result
-    assert "PARSL_SRUN_PREFIX" not in result
-    assert result["PARSL_NUM_NODES"] == str(resource_spec["num_nodes"])
-    assert result["PARSL_RANKS_PER_NODE"] == str(resource_spec["ranks_per_node"])
diff --git a/parsl/tests/test_mpi_apps/test_mpi_mode_enabled.py b/parsl/tests/test_mpi_apps/test_mpi_mode_enabled.py
index 6743d40eba..aff2501674 100644
--- a/parsl/tests/test_mpi_apps/test_mpi_mode_enabled.py
+++ b/parsl/tests/test_mpi_apps/test_mpi_mode_enabled.py
@@ -6,26 +6,34 @@
 import pytest

 import parsl
-from parsl import bash_app, python_app
+from parsl import Config, bash_app, python_app
+from parsl.executors import MPIExecutor
 from parsl.executors.high_throughput.mpi_prefix_composer import (
     MissingResourceSpecification,
 )
-from parsl.tests.configs.htex_local import fresh_config
+from parsl.launchers import SimpleLauncher
+from parsl.providers import LocalProvider

 EXECUTOR_LABEL = "MPI_TEST"


 def local_setup():
-    config = fresh_config()
-    config.executors[0].label = EXECUTOR_LABEL
-    config.executors[0].max_workers_per_node = 2
-    config.executors[0].enable_mpi_mode = True
-    config.executors[0].mpi_launcher = "mpiexec"

     cwd = os.path.abspath(os.path.dirname(__file__))
     pbs_nodefile = os.path.join(cwd, "mocks", "pbs_nodefile")

-    config.executors[0].provider.worker_init = f"export PBS_NODEFILE={pbs_nodefile}"
+    config = Config(
+        executors=[
+            MPIExecutor(
+                label=EXECUTOR_LABEL,
+                max_workers_per_block=2,
+                mpi_launcher="mpiexec",
+                provider=LocalProvider(
+                    worker_init=f"export PBS_NODEFILE={pbs_nodefile}",
+                    launcher=SimpleLauncher()
+                )
+            )
+        ])

     parsl.load(config)
diff --git a/parsl/tests/test_mpi_apps/test_mpiex.py b/parsl/tests/test_mpi_apps/test_mpiex.py
index a85547abea..2e8a38bc68 100644
--- a/parsl/tests/test_mpi_apps/test_mpiex.py
+++ b/parsl/tests/test_mpi_apps/test_mpiex.py
@@ -4,7 +4,6 @@
 import pytest

-import parsl
 from parsl import Config, HighThroughputExecutor
 from parsl.executors.high_throughput.mpi_executor import MPIExecutor
 from parsl.launchers import SimpleLauncher
@@ -42,8 +41,8 @@ def test_docstring():
 def test_init():
     """Ensure all relevant kwargs are copied over from HTEx"""

-    new_kwargs = {'max_workers_per_block'}
-    excluded_kwargs = {'available_accelerators', 'enable_mpi_mode', 'cores_per_worker', 'max_workers_per_node',
+    new_kwargs = {'max_workers_per_block', 'mpi_launcher'}
+    excluded_kwargs = {'available_accelerators', 'cores_per_worker', 'max_workers_per_node',
                        'mem_per_worker', 'cpu_affinity', 'max_workers', 'manager_selector'}

     # Get the kwargs from both HTEx and MPIEx
diff --git a/parsl/tests/test_mpi_apps/test_resource_spec.py b/parsl/tests/test_mpi_apps/test_resource_spec.py
index 99d0187ccd..f180c67d52 100644
--- a/parsl/tests/test_mpi_apps/test_resource_spec.py
+++ b/parsl/tests/test_mpi_apps/test_resource_spec.py
@@ -1,18 +1,20 @@
 import contextlib
 import logging
 import os
+import queue
 import typing
 import unittest
 from typing import Dict
+from unittest import mock

 import pytest

-import parsl
 from parsl.app.app import python_app
+from parsl.executors.high_throughput.executor import HighThroughputExecutor
+from parsl.executors.high_throughput.mpi_executor import MPIExecutor
 from parsl.executors.high_throughput.mpi_prefix_composer import (
     InvalidResourceSpecification,
     MissingResourceSpecification,
-    validate_resource_spec,
 )
 from parsl.executors.high_throughput.mpi_resource_management import (
     get_nodes_in_batchjob,
@@ -20,6 +22,8 @@
     get_slurm_hosts_list,
     identify_scheduler,
 )
+from parsl.launchers import SimpleLauncher
+from parsl.providers import LocalProvider
 from parsl.tests.configs.htex_local import fresh_config

 EXECUTOR_LABEL = "MPI_TEST"
@@ -48,23 +52,6 @@ def get_env_vars(parsl_resource_specification: Dict = {}) -> Dict:
     return parsl_vars


-@pytest.mark.local
-def test_resource_spec_env_vars():
-    resource_spec = {
-        "num_nodes": 4,
-        "ranks_per_node": 2,
-    }
-
-    assert double(5).result() == 10
-
-    future = get_env_vars(parsl_resource_specification=resource_spec)
-
-    result = future.result()
-    assert isinstance(result, Dict)
-    assert result["PARSL_NUM_NODES"] == str(resource_spec["num_nodes"])
-    assert result["PARSL_RANKS_PER_NODE"] == str(resource_spec["ranks_per_node"])
-
-
 @pytest.mark.local
 @unittest.mock.patch("subprocess.check_output", return_value=b"c203-031\nc203-032\n")
 def test_slurm_mocked_mpi_fetch(subprocess_check):
@@ -83,16 +70,6 @@ def add_to_path(path: os.PathLike) -> typing.Generator[None, None, None]:
         os.environ["PATH"] = old_path


-@pytest.mark.local
-@pytest.mark.skip
-def test_slurm_mpi_fetch():
-    logging.warning(f"Current pwd : {os.path.dirname(__file__)}")
-    with add_to_path(os.path.dirname(__file__)):
-        logging.warning(f"PATH: {os.environ['PATH']}")
-        nodeinfo = get_slurm_hosts_list()
-        logging.warning(f"Got : {nodeinfo}")
-
-
 @contextlib.contextmanager
 def mock_pbs_nodefile(nodefile: str = "pbs_nodefile") -> typing.Generator[None, None, None]:
     cwd = os.path.abspath(os.path.dirname(__file__))
@@ -122,22 +99,43 @@ def test_top_level():

 @pytest.mark.local
 @pytest.mark.parametrize(
-    "resource_spec, is_mpi_enabled, exception",
+    "resource_spec, exception",
     (
-        ({"num_nodes": 2, "ranks_per_node": 1}, False, None),
-        ({"launcher_options": "--debug_foo"}, False, None),
-        ({"num_nodes": 2, "BAD_OPT": 1}, False, InvalidResourceSpecification),
-        ({}, False, None),
-        ({"num_nodes": 2, "ranks_per_node": 1}, True, None),
-        ({"launcher_options": "--debug_foo"}, True, None),
-        ({"num_nodes": 2, "BAD_OPT": 1}, True, InvalidResourceSpecification),
-        ({}, True, MissingResourceSpecification),
+
+        ({"num_nodes": 2, "ranks_per_node": 1}, None),
+        ({"launcher_options": "--debug_foo"}, None),
+        ({"num_nodes": 2, "BAD_OPT": 1}, InvalidResourceSpecification),
+        ({}, MissingResourceSpecification),
     )
 )
-def test_resource_spec(resource_spec: Dict, is_mpi_enabled: bool, exception):
+def test_mpi_resource_spec(resource_spec: Dict, exception):
+    """Test validation of resource_specification in MPIExecutor"""
+
+    mpi_ex = MPIExecutor(provider=LocalProvider(launcher=SimpleLauncher()))
+    mpi_ex.outgoing_q = mock.Mock(spec=queue.Queue)
+
     if exception:
         with pytest.raises(exception):
-            validate_resource_spec(resource_spec, is_mpi_enabled)
+            mpi_ex.validate_resource_spec(resource_spec)
     else:
-        result = validate_resource_spec(resource_spec, is_mpi_enabled)
+        result = mpi_ex.validate_resource_spec(resource_spec)
         assert result is None
+
+
+@pytest.mark.local
+@pytest.mark.parametrize(
+    "resource_spec",
+    (
+        {"num_nodes": 2, "ranks_per_node": 1},
+        {"launcher_options": "--debug_foo"},
+        {"BAD_OPT": 1},
+    )
+)
+def test_mpi_resource_spec_passed_to_htex(resource_spec: dict):
+    """HTEX should reject every resource_spec"""
+
+    htex = HighThroughputExecutor()
+    htex.outgoing_q = mock.Mock(spec=queue.Queue)
+
+    with pytest.raises(InvalidResourceSpecification):
+        htex.validate_resource_spec(resource_spec)
diff --git a/parsl/tests/test_providers/test_local_provider.py b/parsl/tests/test_providers/test_local_provider.py
index c6844b00c0..497c13370d 100644
--- a/parsl/tests/test_providers/test_local_provider.py
+++ b/parsl/tests/test_providers/test_local_provider.py
@@ -11,7 +11,8 @@
 import pytest

-from parsl.channels import LocalChannel, SSHChannel
+from parsl.channels import LocalChannel
+from parsl.channels.ssh.ssh import DeprecatedSSHChannel
 from parsl.jobs.states import JobState
 from parsl.launchers import SingleNodeLauncher
 from parsl.providers import LocalProvider
@@ -92,10 +93,10 @@ def test_ssh_channel():
     # already exist, so create it here.
     pathlib.Path('{}/known.hosts'.format(config_dir)).touch(mode=0o600)
     script_dir = tempfile.mkdtemp()
-    channel = SSHChannel('127.0.0.1', port=server_port,
-                         script_dir=remote_script_dir,
-                         host_keys_filename='{}/known.hosts'.format(config_dir),
-                         key_filename=priv_key)
+    channel = DeprecatedSSHChannel('127.0.0.1', port=server_port,
+                                   script_dir=remote_script_dir,
+                                   host_keys_filename='{}/known.hosts'.format(config_dir),
+                                   key_filename=priv_key)
     try:
         p = LocalProvider(channel=channel,
                           launcher=SingleNodeLauncher(debug=False))
diff --git a/requirements.txt b/requirements.txt
index e89202942e..c60517655f 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -12,7 +12,6 @@
 globus-sdk
 dill
 tblib
 requests
-paramiko
 psutil>=5.5.1
 setproctitle
 filelock>=3.13,<4
diff --git a/setup.py b/setup.py
index 85e014dc18..4934d01e5d 100755
--- a/setup.py
+++ b/setup.py
@@ -35,6 +35,7 @@
         'flux': ['pyyaml', 'cffi', 'jsonschema'],
         'proxystore': ['proxystore'],
         'radical-pilot': ['radical.pilot==1.60', 'radical.utils==1.60'],
+        'ssh': ['paramiko'],
         # Disabling psi-j since github direct links are not allowed by pypi
         # 'psij': ['psi-j-parsl@git+https://github.com/ExaWorks/psi-j-parsl']
     }
diff --git a/test-requirements.txt b/test-requirements.txt
index c735de8d5c..acd670b5e9 100644
--- a/test-requirements.txt
+++ b/test-requirements.txt
@@ -1,12 +1,14 @@
 flake8==6.1.0
 ipyparallel
 pandas
+paramiko
 pytest>=7.4.0,<8
 pytest-cov
 pytest-random-order
 nbsphinx
 sphinx_rtd_theme
 mypy==1.5.1
+types-mock
 types-python-dateutil
 types-requests
 types-paramiko
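The packaging changes above drop paramiko from the core ``requirements.txt`` and publish it instead through a new optional ``ssh`` extra in ``setup.py`` (installed with ``pip install 'parsl[ssh]'``), so code that touches the deprecated SSH channels can no longer assume paramiko is importable. A hedged sketch of a defensive import under that assumption; the helper name below is hypothetical, not part of this patch:

.. code-block:: python

    # paramiko now comes from the optional 'ssh' extra: pip install 'parsl[ssh]'
    try:
        import paramiko
    except ImportError:  # the extra is not installed
        paramiko = None


    def ssh_channel_support_available() -> bool:
        # Hypothetical helper: report whether the optional dependency resolved.
        return paramiko is not None


    if __name__ == "__main__":
        print("paramiko available:", ssh_channel_support_available())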