diff --git a/docs/reference.rst b/docs/reference.rst
index 45f83ad36f..d4c70dd7bd 100644
--- a/docs/reference.rst
+++ b/docs/reference.rst
@@ -169,8 +169,6 @@ Exceptions
     parsl.providers.errors.ScaleOutFailed
     parsl.providers.errors.SchedulerMissingArgs
     parsl.providers.errors.ScriptPathError
-    parsl.channels.errors.ChannelError
-    parsl.channels.errors.FileCopyException
     parsl.executors.high_throughput.errors.WorkerLost
     parsl.executors.high_throughput.interchange.ManagerLost
     parsl.serialize.errors.DeserializationError
diff --git a/docs/userguide/monitoring.rst b/docs/userguide/monitoring.rst
index caf63a44ce..3df03fb205 100644
--- a/docs/userguide/monitoring.rst
+++ b/docs/userguide/monitoring.rst
@@ -42,7 +42,6 @@ configuration. Here the `parsl.monitoring.MonitoringHub` is specified to use por
         ],
         monitoring=MonitoringHub(
             hub_address=address_by_hostname(),
-            hub_port=55055,
             monitoring_debug=False,
             resource_monitoring_interval=10,
         ),
diff --git a/parsl/channels/base.py b/parsl/channels/base.py
index 05241b878d..4ba3b4e02c 100644
--- a/parsl/channels/base.py
+++ b/parsl/channels/base.py
@@ -1,54 +1,5 @@
-from abc import ABCMeta, abstractmethod, abstractproperty
-from typing import Tuple
+from abc import ABCMeta


 class Channel(metaclass=ABCMeta):
-    """Channels are abstractions that enable ExecutionProviders to talk to
-    resource managers of remote compute facilities.
-
-    For certain resources such as campus clusters or supercomputers at
-    research laboratories, resource requirements may require authentication.
-
-    The only remaining Channel, *LocalChannel*, executes commands locally in a
-    shell.
-
-    Channels provide the ability to execute commands remotely, using the
-    execute_wait method, and manipulate the remote file system using methods
-    such as push_file, pull_file and makedirs.
-
-    Channels should ensure that each launched command runs in a new process
-    group, so that providers (such as LocalProvider) which terminate long
-    running commands using process groups can do so.
-    """
-
-    @abstractmethod
-    def execute_wait(self, cmd: str, walltime: int = 0) -> Tuple[int, str, str]:
-        ''' Executes the cmd, with a defined walltime.
-
-        Args:
-            - cmd (string): Command string to execute over the channel
-            - walltime (int) : Timeout in seconds
-
-        Returns:
-            - (exit_code, stdout, stderr) (int, string, string)
-        '''
-        pass
-
-    @abstractproperty
-    def script_dir(self) -> str:
-        ''' This is a property. Returns the directory assigned for storing all internal scripts such as
-        scheduler submit scripts. This is usually where error logs from the scheduler would reside on the
-        channel destination side.
-
-        Args:
-            - None
-
-        Returns:
-            - Channel script dir
-        '''
-        pass
-
-    # DFK expects to be able to modify this, so it needs to be in the abstract class
-    @script_dir.setter
-    def script_dir(self, value: str) -> None:
-        pass
+    pass
diff --git a/parsl/channels/errors.py b/parsl/channels/errors.py
deleted file mode 100644
index effaea9548..0000000000
--- a/parsl/channels/errors.py
+++ /dev/null
@@ -1,30 +0,0 @@
-''' Exceptions raise by Apps.
-'''
-from parsl.errors import ParslError
-
-
-class ChannelError(ParslError):
-    """ Base class for all exceptions
-
-    Only to be invoked when only a more specific error is not available.
- """ - def __init__(self, reason: str, e: Exception, hostname: str) -> None: - self.reason = reason - self.e = e - self.hostname = hostname - - def __str__(self) -> str: - return "Hostname:{0}, Reason:{1}".format(self.hostname, self.reason) - - -class FileCopyException(ChannelError): - ''' File copy operation failed - - Contains: - reason(string) - e (paramiko exception object) - hostname (string) - ''' - - def __init__(self, e: Exception, hostname: str) -> None: - super().__init__("File copy failed due to {0}".format(e), e, hostname) diff --git a/parsl/channels/local/local.py b/parsl/channels/local/local.py index 6ef014ac19..61d8dea17a 100644 --- a/parsl/channels/local/local.py +++ b/parsl/channels/local/local.py @@ -1,6 +1,4 @@ import logging -import os -import subprocess from parsl.channels.base import Channel from parsl.utils import RepresentationMixin @@ -9,58 +7,4 @@ class LocalChannel(Channel, RepresentationMixin): - ''' This is not even really a channel, since opening a local shell is not heavy - and done so infrequently that they do not need a persistent channel - ''' - - def __init__(self): - ''' Initialize the local channel. script_dir is required by set to a default. - - KwArgs: - - script_dir (string): Directory to place scripts - ''' - self.script_dir = None - - def execute_wait(self, cmd, walltime=None): - ''' Synchronously execute a commandline string on the shell. - - Args: - - cmd (string) : Commandline string to execute - - walltime (int) : walltime in seconds - - Returns: - - retcode : Return code from the execution - - stdout : stdout string - - stderr : stderr string - ''' - try: - logger.debug("Creating process with command '%s'", cmd) - proc = subprocess.Popen( - cmd, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - shell=True, - preexec_fn=os.setpgrp - ) - logger.debug("Created process with pid %s. 
Performing communicate", proc.pid) - (stdout, stderr) = proc.communicate(timeout=walltime) - retcode = proc.returncode - logger.debug("Process %s returned %s", proc.pid, proc.returncode) - - except Exception: - logger.exception(f"Execution of command failed:\n{cmd}") - raise - else: - logger.debug("Execution of command in process %s completed normally", proc.pid) - - return (retcode, stdout.decode("utf-8"), stderr.decode("utf-8")) - - @property - def script_dir(self): - return self._script_dir - - @script_dir.setter - def script_dir(self, value): - if value is not None: - value = os.path.abspath(value) - self._script_dir = value + pass diff --git a/parsl/configs/ASPIRE1.py b/parsl/configs/ASPIRE1.py index 7792f15dba..017e1061d7 100644 --- a/parsl/configs/ASPIRE1.py +++ b/parsl/configs/ASPIRE1.py @@ -34,7 +34,6 @@ ], monitoring=MonitoringHub( hub_address=address_by_interface('ib0'), - hub_port=55055, resource_monitoring_interval=10, ), strategy='simple', diff --git a/parsl/dataflow/dflow.py b/parsl/dataflow/dflow.py index 19a2b11263..fff183062e 100644 --- a/parsl/dataflow/dflow.py +++ b/parsl/dataflow/dflow.py @@ -1280,8 +1280,6 @@ def add_executors(self, executors: Sequence[ParslExecutor]) -> None: executor.provider.script_dir = os.path.join(self.run_dir, 'submit_scripts') os.makedirs(executor.provider.script_dir, exist_ok=True) - executor.provider.channel.script_dir = executor.provider.script_dir - self.executors[executor.label] = executor executor.start() block_executors = [e for e in executors if isinstance(e, BlockProviderExecutor)] diff --git a/parsl/executors/base.py b/parsl/executors/base.py index 751502ef09..b0d7995f44 100644 --- a/parsl/executors/base.py +++ b/parsl/executors/base.py @@ -6,7 +6,7 @@ from typing_extensions import Literal, Self -from parsl.monitoring.radios import MonitoringRadioSender +from parsl.monitoring.radios.base import MonitoringRadioSender class ParslExecutor(metaclass=ABCMeta): diff --git a/parsl/executors/high_throughput/executor.py b/parsl/executors/high_throughput/executor.py index 882776cf5c..d3fa522619 100644 --- a/parsl/executors/high_throughput/executor.py +++ b/parsl/executors/high_throughput/executor.py @@ -331,6 +331,9 @@ def __init__(self, interchange_launch_cmd = DEFAULT_INTERCHANGE_LAUNCH_CMD self.interchange_launch_cmd = interchange_launch_cmd + self._result_queue_thread_exit = threading.Event() + self._result_queue_thread: Optional[threading.Thread] = None + radio_mode = "htex" enable_mpi_mode: bool = False mpi_launcher: str = "mpiexec" @@ -455,9 +458,11 @@ def _result_queue_worker(self): """ logger.debug("Result queue worker starting") - while not self.bad_state_is_set: + while not self.bad_state_is_set and not self._result_queue_thread_exit.is_set(): try: - msgs = self.incoming_q.get() + msgs = self.incoming_q.get(timeout_ms=self.poll_period) + if msgs is None: # timeout + continue except IOError as e: logger.exception("Caught broken queue with exception code {}: {}".format(e.errno, e)) @@ -515,6 +520,8 @@ def _result_queue_worker(self): else: raise BadMessage("Message received with unknown type {}".format(msg['type'])) + logger.info("Closing result ZMQ pipe") + self.incoming_q.close() logger.info("Result queue worker finished") def _start_local_interchange_process(self) -> None: @@ -817,6 +824,8 @@ def shutdown(self, timeout: float = 10.0): logger.info("Attempting HighThroughputExecutor shutdown") + logger.info("Terminating interchange and result queue thread") + self._result_queue_thread_exit.set() self.interchange_proc.terminate() try: 
            self.interchange_proc.wait(timeout=timeout)
@@ -841,6 +850,10 @@ def shutdown(self, timeout: float = 10.0):
         logger.info("Closing command client")
         self.command_client.close()

+        logger.info("Waiting for result queue thread exit")
+        if self._result_queue_thread:
+            self._result_queue_thread.join()
+
         logger.info("Finished HighThroughputExecutor shutdown attempt")

     def get_usage_information(self):
diff --git a/parsl/executors/high_throughput/interchange.py b/parsl/executors/high_throughput/interchange.py
index 88bb6c7156..12d3e07f31 100644
--- a/parsl/executors/high_throughput/interchange.py
+++ b/parsl/executors/high_throughput/interchange.py
@@ -20,7 +20,8 @@
 from parsl.executors.high_throughput.manager_record import ManagerRecord
 from parsl.executors.high_throughput.manager_selector import ManagerSelector
 from parsl.monitoring.message_type import MessageType
-from parsl.monitoring.radios import MonitoringRadioSender, ZMQRadioSender
+from parsl.monitoring.radios.base import MonitoringRadioSender
+from parsl.monitoring.radios.zmq import ZMQRadioSender
 from parsl.process_loggers import wrap_with_logs
 from parsl.serialize import serialize as serialize_object
 from parsl.utils import setproctitle
diff --git a/parsl/executors/high_throughput/zmq_pipes.py b/parsl/executors/high_throughput/zmq_pipes.py
index 54ed8c1da9..a7278cf067 100644
--- a/parsl/executors/high_throughput/zmq_pipes.py
+++ b/parsl/executors/high_throughput/zmq_pipes.py
@@ -206,12 +206,21 @@ def __init__(self, ip_address, port_range, cert_dir: Optional[str] = None):
         self.port = self.results_receiver.bind_to_random_port(tcp_url(ip_address),
                                                               min_port=port_range[0],
                                                               max_port=port_range[1])
+        self.poller = zmq.Poller()
+        self.poller.register(self.results_receiver, zmq.POLLIN)

-    def get(self):
+    def get(self, timeout_ms=None):
+        """Get a message from the queue, returning None if timeout_ms
+        (measured in milliseconds) expires without a message arriving.
+        """
         logger.debug("Waiting for ResultsIncoming message")
-        m = self.results_receiver.recv_multipart()
-        logger.debug("Received ResultsIncoming message")
-        return m
+        socks = dict(self.poller.poll(timeout=timeout_ms))
+        if self.results_receiver in socks and socks[self.results_receiver] == zmq.POLLIN:
+            m = self.results_receiver.recv_multipart()
+            logger.debug("Received ResultsIncoming message")
+            return m
+        else:
+            return None

     def close(self):
         self.results_receiver.close()
diff --git a/parsl/monitoring/monitoring.py b/parsl/monitoring/monitoring.py
index 90dd00e9c7..4e03de688a 100644
--- a/parsl/monitoring/monitoring.py
+++ b/parsl/monitoring/monitoring.py
@@ -14,7 +14,7 @@

 from parsl.log_utils import set_file_logger
 from parsl.monitoring.errors import MonitoringHubStartError
-from parsl.monitoring.radios import MultiprocessingQueueRadioSender
+from parsl.monitoring.radios.multiprocessing import MultiprocessingQueueRadioSender
 from parsl.monitoring.router import router_starter
 from parsl.monitoring.types import TaggedMonitoringMessage
 from parsl.multiprocessing import ForkProcess, SizedQueue
diff --git a/parsl/monitoring/radios.py b/parsl/monitoring/radios.py
deleted file mode 100644
index 14dc046557..0000000000
--- a/parsl/monitoring/radios.py
+++ /dev/null
@@ -1,191 +0,0 @@
-import logging
-import os
-import pickle
-import socket
-import uuid
-from abc import ABCMeta, abstractmethod
-from multiprocessing.queues import Queue
-
-import zmq
-
-logger = logging.getLogger(__name__)
-
-
-class MonitoringRadioSender(metaclass=ABCMeta):
-    @abstractmethod
-    def send(self, message: object) -> None:
-        pass
-
-
-class FilesystemRadioSender(MonitoringRadioSender):
-    """A MonitoringRadioSender that sends messages over a shared filesystem.
-
-    The messsage directory structure is based on maildir,
-    https://en.wikipedia.org/wiki/Maildir
-
-    The writer creates a message in tmp/ and then when it is fully
-    written, moves it atomically into new/
-
-    The reader ignores tmp/ and only reads and deletes messages from
-    new/
-
-    This avoids a race condition of reading partially written messages.
-
-    This radio is likely to give higher shared filesystem load compared to
-    the UDP radio, but should be much more reliable.
- """ - - def __init__(self, *, monitoring_url: str, timeout: int = 10, run_dir: str): - logger.info("filesystem based monitoring channel initializing") - self.base_path = f"{run_dir}/monitor-fs-radio/" - self.tmp_path = f"{self.base_path}/tmp" - self.new_path = f"{self.base_path}/new" - - os.makedirs(self.tmp_path, exist_ok=True) - os.makedirs(self.new_path, exist_ok=True) - - def send(self, message: object) -> None: - logger.info("Sending a monitoring message via filesystem") - - unique_id = str(uuid.uuid4()) - - tmp_filename = f"{self.tmp_path}/{unique_id}" - new_filename = f"{self.new_path}/{unique_id}" - buffer = message - - # this will write the message out then atomically - # move it into new/, so that a partially written - # file will never be observed in new/ - with open(tmp_filename, "wb") as f: - pickle.dump(buffer, f) - os.rename(tmp_filename, new_filename) - - -class HTEXRadioSender(MonitoringRadioSender): - - def __init__(self, monitoring_url: str, timeout: int = 10): - """ - Parameters - ---------- - - monitoring_url : str - URL of the form ://: - timeout : int - timeout, default=10s - """ - logger.info("htex-based monitoring channel initialising") - - def send(self, message: object) -> None: - """ Sends a message to the UDP receiver - - Parameter - --------- - - message: object - Arbitrary pickle-able object that is to be sent - - Returns: - None - """ - - import parsl.executors.high_throughput.monitoring_info - - result_queue = parsl.executors.high_throughput.monitoring_info.result_queue - - # this message needs to go in the result queue tagged so that it is treated - # i) as a monitoring message by the interchange, and then further more treated - # as a RESOURCE_INFO message when received by monitoring (rather than a NODE_INFO - # which is the implicit default for messages from the interchange) - - # for the interchange, the outer wrapper, this needs to be a dict: - - interchange_msg = { - 'type': 'monitoring', - 'payload': message - } - - if result_queue: - result_queue.put(pickle.dumps(interchange_msg)) - else: - logger.error("result_queue is uninitialized - cannot put monitoring message") - - return - - -class UDPRadioSender(MonitoringRadioSender): - - def __init__(self, monitoring_url: str, timeout: int = 10): - """ - Parameters - ---------- - - monitoring_url : str - URL of the form ://: - timeout : int - timeout, default=10s - """ - self.monitoring_url = monitoring_url - self.sock_timeout = timeout - try: - self.scheme, self.ip, port = (x.strip('/') for x in monitoring_url.split(':')) - self.port = int(port) - except Exception: - raise Exception("Failed to parse monitoring url: {}".format(monitoring_url)) - - self.sock = socket.socket(socket.AF_INET, - socket.SOCK_DGRAM, - socket.IPPROTO_UDP) # UDP - self.sock.settimeout(self.sock_timeout) - - def send(self, message: object) -> None: - """ Sends a message to the UDP receiver - - Parameter - --------- - - message: object - Arbitrary pickle-able object that is to be sent - - Returns: - None - """ - try: - buffer = pickle.dumps(message) - except Exception: - logging.exception("Exception during pickling", exc_info=True) - return - - try: - self.sock.sendto(buffer, (self.ip, self.port)) - except socket.timeout: - logging.error("Could not send message within timeout limit") - return - return - - -class MultiprocessingQueueRadioSender(MonitoringRadioSender): - """A monitoring radio which connects over a multiprocessing Queue. 
- This radio is intended to be used on the submit side, where components - in the submit process, or processes launched by multiprocessing, will have - access to a Queue shared with the monitoring database code (bypassing the - monitoring router). - """ - def __init__(self, queue: Queue) -> None: - self.queue = queue - - def send(self, message: object) -> None: - self.queue.put(message) - - -class ZMQRadioSender(MonitoringRadioSender): - """A monitoring radio which connects over ZMQ. This radio is not - thread-safe, because its use of ZMQ is not thread-safe. - """ - - def __init__(self, hub_address: str, hub_zmq_port: int) -> None: - self._hub_channel = zmq.Context().socket(zmq.DEALER) - self._hub_channel.set_hwm(0) - self._hub_channel.connect(f"tcp://{hub_address}:{hub_zmq_port}") - - def send(self, message: object) -> None: - self._hub_channel.send_pyobj(message) diff --git a/parsl/tests/test_channels/__init__.py b/parsl/monitoring/radios/__init__.py similarity index 100% rename from parsl/tests/test_channels/__init__.py rename to parsl/monitoring/radios/__init__.py diff --git a/parsl/monitoring/radios/base.py b/parsl/monitoring/radios/base.py new file mode 100644 index 0000000000..2bb799f256 --- /dev/null +++ b/parsl/monitoring/radios/base.py @@ -0,0 +1,13 @@ +import logging +from abc import ABCMeta, abstractmethod +from typing import Optional + +_db_manager_excepts: Optional[Exception] + +logger = logging.getLogger(__name__) + + +class MonitoringRadioSender(metaclass=ABCMeta): + @abstractmethod + def send(self, message: object) -> None: + pass diff --git a/parsl/monitoring/radios/filesystem.py b/parsl/monitoring/radios/filesystem.py new file mode 100644 index 0000000000..accff87d36 --- /dev/null +++ b/parsl/monitoring/radios/filesystem.py @@ -0,0 +1,52 @@ +import logging +import os +import pickle +import uuid + +from parsl.monitoring.radios.base import MonitoringRadioSender + +logger = logging.getLogger(__name__) + + +class FilesystemRadioSender(MonitoringRadioSender): + """A MonitoringRadioSender that sends messages over a shared filesystem. + + The messsage directory structure is based on maildir, + https://en.wikipedia.org/wiki/Maildir + + The writer creates a message in tmp/ and then when it is fully + written, moves it atomically into new/ + + The reader ignores tmp/ and only reads and deletes messages from + new/ + + This avoids a race condition of reading partially written messages. + + This radio is likely to give higher shared filesystem load compared to + the UDP radio, but should be much more reliable. 
+ """ + + def __init__(self, *, monitoring_url: str, timeout: int = 10, run_dir: str): + logger.info("filesystem based monitoring channel initializing") + self.base_path = f"{run_dir}/monitor-fs-radio/" + self.tmp_path = f"{self.base_path}/tmp" + self.new_path = f"{self.base_path}/new" + + os.makedirs(self.tmp_path, exist_ok=True) + os.makedirs(self.new_path, exist_ok=True) + + def send(self, message: object) -> None: + logger.info("Sending a monitoring message via filesystem") + + unique_id = str(uuid.uuid4()) + + tmp_filename = f"{self.tmp_path}/{unique_id}" + new_filename = f"{self.new_path}/{unique_id}" + buffer = message + + # this will write the message out then atomically + # move it into new/, so that a partially written + # file will never be observed in new/ + with open(tmp_filename, "wb") as f: + pickle.dump(buffer, f) + os.rename(tmp_filename, new_filename) diff --git a/parsl/monitoring/radios/htex.py b/parsl/monitoring/radios/htex.py new file mode 100644 index 0000000000..bdb893b303 --- /dev/null +++ b/parsl/monitoring/radios/htex.py @@ -0,0 +1,57 @@ +import logging +import pickle + +from parsl.monitoring.radios.base import MonitoringRadioSender + +logger = logging.getLogger(__name__) + + +class HTEXRadioSender(MonitoringRadioSender): + + def __init__(self, monitoring_url: str, timeout: int = 10): + """ + Parameters + ---------- + + monitoring_url : str + URL of the form ://: + timeout : int + timeout, default=10s + """ + logger.info("htex-based monitoring channel initialising") + + def send(self, message: object) -> None: + """ Sends a message to the UDP receiver + + Parameter + --------- + + message: object + Arbitrary pickle-able object that is to be sent + + Returns: + None + """ + + import parsl.executors.high_throughput.monitoring_info + + result_queue = parsl.executors.high_throughput.monitoring_info.result_queue + + # this message needs to go in the result queue tagged so that it is treated + # i) as a monitoring message by the interchange, and then further more treated + # as a RESOURCE_INFO message when received by monitoring (rather than a NODE_INFO + # which is the implicit default for messages from the interchange) + + # for the interchange, the outer wrapper, this needs to be a dict: + + interchange_msg = { + 'type': 'monitoring', + 'payload': message + } + + if result_queue: + result_queue.put(pickle.dumps(interchange_msg)) + else: + logger.error("result_queue is uninitialized - cannot put monitoring message") + + return diff --git a/parsl/monitoring/radios/multiprocessing.py b/parsl/monitoring/radios/multiprocessing.py new file mode 100644 index 0000000000..6274bbfca8 --- /dev/null +++ b/parsl/monitoring/radios/multiprocessing.py @@ -0,0 +1,17 @@ +from multiprocessing.queues import Queue + +from parsl.monitoring.radios.base import MonitoringRadioSender + + +class MultiprocessingQueueRadioSender(MonitoringRadioSender): + """A monitoring radio which connects over a multiprocessing Queue. + This radio is intended to be used on the submit side, where components + in the submit process, or processes launched by multiprocessing, will have + access to a Queue shared with the monitoring database code (bypassing the + monitoring router). 
+ """ + def __init__(self, queue: Queue) -> None: + self.queue = queue + + def send(self, message: object) -> None: + self.queue.put(message) diff --git a/parsl/monitoring/radios/udp.py b/parsl/monitoring/radios/udp.py new file mode 100644 index 0000000000..f2a652e9ac --- /dev/null +++ b/parsl/monitoring/radios/udp.py @@ -0,0 +1,56 @@ +import logging +import pickle +import socket + +from parsl.monitoring.radios.base import MonitoringRadioSender + + +class UDPRadioSender(MonitoringRadioSender): + + def __init__(self, monitoring_url: str, timeout: int = 10): + """ + Parameters + ---------- + + monitoring_url : str + URL of the form ://: + timeout : int + timeout, default=10s + """ + self.monitoring_url = monitoring_url + self.sock_timeout = timeout + try: + self.scheme, self.ip, port = (x.strip('/') for x in monitoring_url.split(':')) + self.port = int(port) + except Exception: + raise Exception("Failed to parse monitoring url: {}".format(monitoring_url)) + + self.sock = socket.socket(socket.AF_INET, + socket.SOCK_DGRAM, + socket.IPPROTO_UDP) # UDP + self.sock.settimeout(self.sock_timeout) + + def send(self, message: object) -> None: + """ Sends a message to the UDP receiver + + Parameter + --------- + + message: object + Arbitrary pickle-able object that is to be sent + + Returns: + None + """ + try: + buffer = pickle.dumps(message) + except Exception: + logging.exception("Exception during pickling", exc_info=True) + return + + try: + self.sock.sendto(buffer, (self.ip, self.port)) + except socket.timeout: + logging.error("Could not send message within timeout limit") + return + return diff --git a/parsl/monitoring/radios/zmq.py b/parsl/monitoring/radios/zmq.py new file mode 100644 index 0000000000..397c943568 --- /dev/null +++ b/parsl/monitoring/radios/zmq.py @@ -0,0 +1,17 @@ +import zmq + +from parsl.monitoring.radios.base import MonitoringRadioSender + + +class ZMQRadioSender(MonitoringRadioSender): + """A monitoring radio which connects over ZMQ. This radio is not + thread-safe, because its use of ZMQ is not thread-safe. 
+ """ + + def __init__(self, hub_address: str, hub_zmq_port: int) -> None: + self._hub_channel = zmq.Context().socket(zmq.DEALER) + self._hub_channel.set_hwm(0) + self._hub_channel.connect(f"tcp://{hub_address}:{hub_zmq_port}") + + def send(self, message: object) -> None: + self._hub_channel.send_pyobj(message) diff --git a/parsl/monitoring/remote.py b/parsl/monitoring/remote.py index d72b54dc3c..530b39f935 100644 --- a/parsl/monitoring/remote.py +++ b/parsl/monitoring/remote.py @@ -7,12 +7,10 @@ from typing import Any, Callable, Dict, List, Sequence, Tuple from parsl.monitoring.message_type import MessageType -from parsl.monitoring.radios import ( - FilesystemRadioSender, - HTEXRadioSender, - MonitoringRadioSender, - UDPRadioSender, -) +from parsl.monitoring.radios.base import MonitoringRadioSender +from parsl.monitoring.radios.filesystem import FilesystemRadioSender +from parsl.monitoring.radios.htex import HTEXRadioSender +from parsl.monitoring.radios.udp import UDPRadioSender from parsl.multiprocessing import ForkProcess from parsl.process_loggers import wrap_with_logs diff --git a/parsl/monitoring/router.py b/parsl/monitoring/router.py index 04e7480a7a..0926712c36 100644 --- a/parsl/monitoring/router.py +++ b/parsl/monitoring/router.py @@ -14,7 +14,7 @@ import zmq from parsl.log_utils import set_file_logger -from parsl.monitoring.radios import MultiprocessingQueueRadioSender +from parsl.monitoring.radios.multiprocessing import MultiprocessingQueueRadioSender from parsl.monitoring.types import TaggedMonitoringMessage from parsl.process_loggers import wrap_with_logs from parsl.utils import setproctitle diff --git a/parsl/providers/cluster_provider.py b/parsl/providers/cluster_provider.py index 6bc76bdf22..f2feb2ddf3 100644 --- a/parsl/providers/cluster_provider.py +++ b/parsl/providers/cluster_provider.py @@ -6,6 +6,7 @@ from parsl.launchers.errors import BadLauncher from parsl.providers.base import ExecutionProvider from parsl.providers.errors import SchedulerMissingArgs, ScriptPathError +from parsl.utils import execute_wait logger = logging.getLogger(__name__) @@ -76,7 +77,7 @@ def execute_wait(self, cmd, timeout=None): t = self.cmd_timeout if timeout is not None: t = timeout - return self.channel.execute_wait(cmd, t) + return execute_wait(cmd, t) def _write_submit_script(self, template, script_filename, job_name, configs): """Generate submit script and write it to a file. 
diff --git a/parsl/providers/condor/condor.py b/parsl/providers/condor/condor.py
index c8142c4026..72fd5aa243 100644
--- a/parsl/providers/condor/condor.py
+++ b/parsl/providers/condor/condor.py
@@ -226,7 +226,7 @@ def submit(self, command, tasks_per_node, job_name="parsl.condor"):

         job_config = {}
         job_config["job_name"] = job_name
-        job_config["submit_script_dir"] = self.channel.script_dir
+        job_config["submit_script_dir"] = self.script_dir
         job_config["project"] = self.project
         job_config["nodes"] = self.nodes_per_block
         job_config["scheduler_options"] = scheduler_options
diff --git a/parsl/providers/grid_engine/grid_engine.py b/parsl/providers/grid_engine/grid_engine.py
index ddedcaa3e8..795f1946b4 100644
--- a/parsl/providers/grid_engine/grid_engine.py
+++ b/parsl/providers/grid_engine/grid_engine.py
@@ -100,7 +100,7 @@ def get_configs(self, command, tasks_per_node):
                          self.nodes_per_block, tasks_per_node))

         job_config = {}
-        job_config["submit_script_dir"] = self.channel.script_dir
+        job_config["submit_script_dir"] = self.script_dir
         job_config["nodes"] = self.nodes_per_block
         job_config["walltime"] = self.walltime
         job_config["scheduler_options"] = self.scheduler_options
diff --git a/parsl/providers/local/local.py b/parsl/providers/local/local.py
index 5ecf174df2..6357c85cba 100644
--- a/parsl/providers/local/local.py
+++ b/parsl/providers/local/local.py
@@ -11,7 +11,7 @@
     ScriptPathError,
     SubmitException,
 )
-from parsl.utils import RepresentationMixin
+from parsl.utils import RepresentationMixin, execute_wait

 logger = logging.getLogger(__name__)

@@ -118,7 +118,7 @@ def status(self, job_ids):
         return [self.resources[jid]['status'] for jid in job_ids]

     def _is_alive(self, job_dict):
-        retcode, stdout, stderr = self.channel.execute_wait(
+        retcode, stdout, stderr = execute_wait(
             'ps -p {} > /dev/null 2> /dev/null; echo "STATUS:$?" '.format(
                 job_dict['remote_pid']), self.cmd_timeout)
         for line in stdout.split('\n'):
@@ -223,11 +223,11 @@ def submit(self, command, tasks_per_node, job_name="parsl.localprovider"):
         # cancel the task later.
         #
         # We need to do the >/dev/null 2>&1 so that bash closes stdout, otherwise
-        # channel.execute_wait hangs reading the process stdout until all the
+        # execute_wait hangs reading the process stdout until all the
         # background commands complete.
         cmd = '/bin/bash -c \'echo - >{0}.ec && {{ {{ bash {0} 1>{0}.out 2>{0}.err ; ' \
               'echo $? > {0}.ec ; }} >/dev/null 2>&1 & echo "PID:$!" ; }}\''.format(script_path)
-        retcode, stdout, stderr = self.channel.execute_wait(cmd, self.cmd_timeout)
+        retcode, stdout, stderr = execute_wait(cmd, self.cmd_timeout)
         if retcode != 0:
             raise SubmitException(job_name, "Launch command exited with code {0}".format(retcode),
                                   stdout, stderr)
@@ -258,7 +258,7 @@ def cancel(self, job_ids):
             job_dict['cancelled'] = True
             logger.debug("Terminating job/process ID: {0}".format(job))
             cmd = "kill -- -$(ps -o pgid= {} | grep -o '[0-9]*')".format(job_dict['remote_pid'])
-            retcode, stdout, stderr = self.channel.execute_wait(cmd, self.cmd_timeout)
+            retcode, stdout, stderr = execute_wait(cmd, self.cmd_timeout)
             if retcode != 0:
                 logger.warning("Failed to kill PID: {} and child processes on {}".format(job_dict['remote_pid'],
                                                                                          self.label))
diff --git a/parsl/providers/lsf/lsf.py b/parsl/providers/lsf/lsf.py
index b446b063a4..dced93831b 100644
--- a/parsl/providers/lsf/lsf.py
+++ b/parsl/providers/lsf/lsf.py
@@ -211,7 +211,7 @@ def submit(self, command, tasks_per_node, job_name="parsl.lsf"):
         logger.debug("Requesting one block with {} nodes".format(self.nodes_per_block))

         job_config = {}
-        job_config["submit_script_dir"] = self.channel.script_dir
+        job_config["submit_script_dir"] = self.script_dir
         job_config["nodes"] = self.nodes_per_block
         job_config["tasks_per_node"] = tasks_per_node
         job_config["walltime"] = wtime_to_minutes(self.walltime)
diff --git a/parsl/providers/pbspro/pbspro.py b/parsl/providers/pbspro/pbspro.py
index 71c958f000..b02237a226 100644
--- a/parsl/providers/pbspro/pbspro.py
+++ b/parsl/providers/pbspro/pbspro.py
@@ -159,7 +159,7 @@ def submit(self, command, tasks_per_node, job_name="parsl"):
         )

         job_config = {}
-        job_config["submit_script_dir"] = self.channel.script_dir
+        job_config["submit_script_dir"] = self.script_dir
         job_config["nodes_per_block"] = self.nodes_per_block
         job_config["ncpus"] = self.cpus_per_node
         job_config["walltime"] = self.walltime
diff --git a/parsl/providers/slurm/slurm.py b/parsl/providers/slurm/slurm.py
index 9b6f38b9d9..92f1a31ad6 100644
--- a/parsl/providers/slurm/slurm.py
+++ b/parsl/providers/slurm/slurm.py
@@ -3,7 +3,7 @@
 import os
 import re
 import time
-from typing import Optional
+from typing import Any, Dict, Optional

 import typeguard

@@ -286,8 +286,8 @@ def submit(self, command: str, tasks_per_node: int, job_name="parsl.slurm") -> s

         logger.debug("Requesting one block with {} nodes".format(self.nodes_per_block))

-        job_config = {}
-        job_config["submit_script_dir"] = self.channel.script_dir
+        job_config: Dict[str, Any] = {}
+        job_config["submit_script_dir"] = self.script_dir
         job_config["nodes"] = self.nodes_per_block
         job_config["tasks_per_node"] = tasks_per_node
         job_config["walltime"] = wtime_to_minutes(self.walltime)
diff --git a/parsl/providers/torque/torque.py b/parsl/providers/torque/torque.py
index 7992893abb..c1c778a42a 100644
--- a/parsl/providers/torque/torque.py
+++ b/parsl/providers/torque/torque.py
@@ -171,7 +171,7 @@ def submit(self, command, tasks_per_node, job_name="parsl.torque"):

         job_config = {}
         # TODO : script_path might need to change to accommodate script dir set via channels
-        job_config["submit_script_dir"] = self.channel.script_dir
+        job_config["submit_script_dir"] = self.script_dir
         job_config["nodes"] = self.nodes_per_block
         job_config["task_blocks"] = self.nodes_per_block * tasks_per_node
         job_config["nodes_per_block"] = self.nodes_per_block
diff --git a/parsl/tests/configs/htex_local_alternate.py b/parsl/tests/configs/htex_local_alternate.py
index d84a07ad84..5667f3ff8c 100644
--- a/parsl/tests/configs/htex_local_alternate.py
+++ b/parsl/tests/configs/htex_local_alternate.py
@@ -62,7 +62,6 @@ def fresh_config():
         retries=2,
         monitoring=MonitoringHub(
             hub_address="localhost",
-            hub_port=55055,
             monitoring_debug=False,
             resource_monitoring_interval=1,
         ),
diff --git a/parsl/tests/configs/local_threads_monitoring.py b/parsl/tests/configs/local_threads_monitoring.py
index 81b9095285..9f105af25d 100644
--- a/parsl/tests/configs/local_threads_monitoring.py
+++ b/parsl/tests/configs/local_threads_monitoring.py
@@ -5,7 +5,6 @@
 config = Config(executors=[ThreadPoolExecutor(label='threads', max_threads=4)],
                 monitoring=MonitoringHub(
                     hub_address="localhost",
-                    hub_port=55055,
                     resource_monitoring_interval=3,
                 )
                 )
diff --git a/parsl/tests/manual_tests/test_udp_simple.py b/parsl/tests/manual_tests/test_udp_simple.py
index 8de257d8fa..847f8803bf 100644
--- a/parsl/tests/manual_tests/test_udp_simple.py
+++ b/parsl/tests/manual_tests/test_udp_simple.py
@@ -15,7 +15,6 @@ def local_setup():
         ],
         monitoring=MonitoringHub(
             hub_address="127.0.0.1",
-            hub_port=55055,
             logging_level=logging.INFO,
             resource_monitoring_interval=10))

diff --git a/parsl/tests/test_channels/test_large_output.py b/parsl/tests/test_channels/test_large_output.py
deleted file mode 100644
index bfc96f38bc..0000000000
--- a/parsl/tests/test_channels/test_large_output.py
+++ /dev/null
@@ -1,22 +0,0 @@
-import pytest
-
-from parsl.channels.local.local import LocalChannel
-
-
-@pytest.mark.local
-def test_local_large_output_2210():
-    """Regression test for #2210.
-    The local channel was hanging if the specified command gave too
-    much output, due to a race condition between process exiting and
-    pipes filling up.
-    """
-
-    c = LocalChannel()
-
-    # this will output 128kb of stdout
-    c.execute_wait("yes | dd count=128 bs=1024", walltime=60)
-
-    # if this test fails, execute_wait should raise a timeout
-    # exception.
-
-    # The contents out the output is not verified by this test
diff --git a/parsl/tests/test_channels/test_local_channel.py b/parsl/tests/test_channels/test_local_channel.py
deleted file mode 100644
index a3f55d096d..0000000000
--- a/parsl/tests/test_channels/test_local_channel.py
+++ /dev/null
@@ -1,19 +0,0 @@
-import pytest
-
-from parsl.channels.local.local import LocalChannel
-
-
-@pytest.mark.local
-def test_env():
-    ''' Regression testing for issue #27
-    '''
-
-    lc = LocalChannel()
-    rc, stdout, stderr = lc.execute_wait("env", 1)
-
-    stdout = stdout.split('\n')
-    x = [s for s in stdout if s.startswith("PATH=")]
-    assert x, "PATH not found"
-
-    x = [s for s in stdout if s.startswith("HOME=")]
-    assert x, "HOME not found"
diff --git a/parsl/tests/test_monitoring/test_htex_init_blocks_vs_monitoring.py b/parsl/tests/test_monitoring/test_htex_init_blocks_vs_monitoring.py
index ada972e747..c54486f011 100644
--- a/parsl/tests/test_monitoring/test_htex_init_blocks_vs_monitoring.py
+++ b/parsl/tests/test_monitoring/test_htex_init_blocks_vs_monitoring.py
@@ -37,7 +37,6 @@ def fresh_config(run_dir, strategy, db_url):
         strategy_period=0.1,
         monitoring=MonitoringHub(
             hub_address="localhost",
-            hub_port=55055,
             logging_endpoint=db_url
         )
     )
diff --git a/parsl/tests/test_monitoring/test_stdouterr.py b/parsl/tests/test_monitoring/test_stdouterr.py
index d1817164c0..8e1935045f 100644
--- a/parsl/tests/test_monitoring/test_stdouterr.py
+++ b/parsl/tests/test_monitoring/test_stdouterr.py
@@ -37,7 +37,6 @@ def fresh_config(run_dir):
         strategy_period=0.1,
         monitoring=MonitoringHub(
             hub_address="localhost",
-            hub_port=55055,
         )
     )

diff --git a/parsl/tests/test_providers/test_pbspro_template.py b/parsl/tests/test_providers/test_pbspro_template.py
index dec987ccbb..ef9a642541 100644
--- a/parsl/tests/test_providers/test_pbspro_template.py
+++ b/parsl/tests/test_providers/test_pbspro_template.py
@@ -15,7 +15,6 @@ def test_submit_script_basic(tmp_path):
         queue="debug", channel=LocalChannel()
     )
     provider.script_dir = tmp_path
-    provider.channel.script_dir = tmp_path
     job_id = str(random.randint(55000, 59000))
     provider.execute_wait = mock.Mock(spec=PBSProProvider.execute_wait)
     provider.execute_wait.return_value = (0, job_id, "")
diff --git a/parsl/tests/test_providers/test_slurm_template.py b/parsl/tests/test_providers/test_slurm_template.py
index 57fd8e4d0b..ceedfa5ee8 100644
--- a/parsl/tests/test_providers/test_slurm_template.py
+++ b/parsl/tests/test_providers/test_slurm_template.py
@@ -16,7 +16,6 @@ def test_submit_script_basic(tmp_path):
         partition="debug", channel=LocalChannel()
     )
     provider.script_dir = tmp_path
-    provider.channel.script_dir = tmp_path
     job_id = str(random.randint(55000, 59000))
     provider.execute_wait = mock.MagicMock(spec=SlurmProvider.execute_wait)
     provider.execute_wait.return_value = (0, f"Submitted batch job {job_id}", "")
diff --git a/parsl/tests/test_utils/test_execute_wait.py b/parsl/tests/test_utils/test_execute_wait.py
new file mode 100644
index 0000000000..44488c239c
--- /dev/null
+++ b/parsl/tests/test_utils/test_execute_wait.py
@@ -0,0 +1,35 @@
+import pytest
+
+from parsl.utils import execute_wait
+
+
+@pytest.mark.local
+def test_env():
+    ''' Regression testing for issue #27
+    '''
+
+    rc, stdout, stderr = execute_wait("env", 1)
+
+    stdout = stdout.split('\n')
+    x = [s for s in stdout if s.startswith("PATH=")]
+    assert x, "PATH not found"
+
+    x = [s for s in stdout if s.startswith("HOME=")]
+    assert x, "HOME not found"
+
+
+@pytest.mark.local
+def test_large_output_2210():
+    """Regression test for #2210.
+    execute_wait was hanging if the specified command gave too
+    much output, due to a race condition between process exiting and
+    pipes filling up.
+    """
+
+    # this will output 128kb of stdout
+    execute_wait("yes | dd count=128 bs=1024", walltime=60)
+
+    # if the hang regresses, execute_wait should raise a timeout
+    # exception, failing this test rather than blocking forever.
+
+    # The contents of the output are not verified by this test
diff --git a/parsl/utils.py b/parsl/utils.py
index 0ea5d7d9eb..b6544d63d2 100644
--- a/parsl/utils.py
+++ b/parsl/utils.py
@@ -458,3 +458,38 @@ def sanitize_dns_subdomain_rfc1123(raw_string: str) -> str:
         raise ValueError(f"Sanitized DNS subdomain is empty for input '{raw_string}'")

     return sanitized
+
+
+def execute_wait(cmd: str, walltime: Optional[int] = None) -> Tuple[int, str, str]:
+    ''' Synchronously execute a commandline string on the shell.
+
+    Args:
+        - cmd (string) : Commandline string to execute
+        - walltime (int) : walltime in seconds
+
+    Returns:
+        - retcode : Return code from the execution
+        - stdout  : stdout string
+        - stderr  : stderr string
+    '''
+    try:
+        logger.debug("Creating process with command '%s'", cmd)
+        proc = subprocess.Popen(
+            cmd,
+            stdout=subprocess.PIPE,
+            stderr=subprocess.PIPE,
+            shell=True,
+            preexec_fn=os.setpgrp
+        )
+        logger.debug("Created process with pid %s. Performing communicate", proc.pid)
+        (stdout, stderr) = proc.communicate(timeout=walltime)
+        retcode = proc.returncode
+        logger.debug("Process %s returned %s", proc.pid, proc.returncode)
+
+    except Exception:
+        logger.exception(f"Execution of command failed:\n{cmd}")
+        raise
+    else:
+        logger.debug("Execution of command in process %s completed normally", proc.pid)
+
+    return (retcode, stdout.decode("utf-8"), stderr.decode("utf-8"))
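
Reviewer sketch for the new helper: execute_wait() replaces LocalChannel.execute_wait() as a plain module-level function in parsl/utils.py, so providers call it directly instead of going through a channel object. Illustrative usage only, based on the signature and behaviour added above:

    from parsl.utils import execute_wait

    # shell=True runs the string under a shell; a command that outlives
    # the walltime makes proc.communicate raise subprocess.TimeoutExpired,
    # which propagates out of execute_wait.
    retcode, stdout, stderr = execute_wait("echo hello", walltime=10)
    assert retcode == 0
    assert stdout.strip() == "hello"

Because the subprocess is started with preexec_fn=os.setpgrp, the launched command runs in its own process group, preserving the property that providers such as LocalProvider can terminate long-running commands by killing the whole group.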