diff --git a/parsl/dataflow/dflow.py b/parsl/dataflow/dflow.py index c7ac48db44..12945d83c9 100644 --- a/parsl/dataflow/dflow.py +++ b/parsl/dataflow/dflow.py @@ -744,16 +744,16 @@ def launch_task(self, task_record: TaskRecord) -> Future: if self.monitoring is not None and self.monitoring.resource_monitoring_enabled: wrapper_logging_level = logging.DEBUG if self.monitoring.monitoring_debug else logging.INFO + (function, args, kwargs) = monitor_wrapper(f=function, args=args, kwargs=kwargs, x_try_id=try_id, x_task_id=task_id, - monitoring_hub_url=self.monitoring.monitoring_hub_url, + radio_config=executor.remote_monitoring_radio_config, run_id=self.run_id, logging_level=wrapper_logging_level, sleep_dur=self.monitoring.resource_monitoring_interval, - radio_mode=executor.radio_mode, monitor_resources=executor.monitor_resources(), run_dir=self.run_dir) @@ -1181,6 +1181,18 @@ def add_executors(self, executors: Sequence[ParslExecutor]) -> None: executor.hub_address = self.monitoring.hub_address executor.hub_zmq_port = self.monitoring.hub_zmq_port executor.submit_monitoring_radio = self.monitoring.radio + # this will modify the radio config object: it will add relevant parameters needed + # for the particular remote radio sender to communicate back + logger.info("starting monitoring receiver " + f"for executor {executor} " + f"with remote monitoring radio config {executor.remote_monitoring_radio_config}") + executor.monitoring_receiver = self.monitoring.start_receiver(executor.remote_monitoring_radio_config, + ip=self.monitoring.hub_address) + # TODO: this is a weird way to start the receiver. + # Rather than in executor.start, but there's a tangle here + # trying to make the executors usable in a non-pure-parsl + # context where there is no DFK to grab config out of? + # (and no monitoring...) 
if hasattr(executor, 'provider'): if hasattr(executor.provider, 'script_dir'): executor.provider.script_dir = os.path.join(self.run_dir, 'submit_scripts') diff --git a/parsl/executors/base.py b/parsl/executors/base.py index a112b9eb00..94c485d4be 100644 --- a/parsl/executors/base.py +++ b/parsl/executors/base.py @@ -1,3 +1,4 @@ +import logging import os from abc import ABCMeta, abstractmethod from concurrent.futures import Future @@ -5,7 +6,14 @@ from typing_extensions import Literal, Self -from parsl.monitoring.radios import MonitoringRadioSender +from parsl.monitoring.radios.base import ( + MonitoringRadioReceiver, + MonitoringRadioSender, + RadioConfig, +) +from parsl.monitoring.radios.udp import UDPRadio + +logger = logging.getLogger(__name__) class ParslExecutor(metaclass=ABCMeta): @@ -19,15 +27,13 @@ class ParslExecutor(metaclass=ABCMeta): no arguments and re-raises any thrown exception. In addition to the listed methods, a ParslExecutor instance must always - have a member field: + have these member fields: label: str - a human readable label for the executor, unique with respect to other executors. - Per-executor monitoring behaviour can be influenced by exposing: - - radio_mode: str - a string describing which radio mode should be used to - send task resource data back to the submit side. + remote_monitoring_radio_config: RadioConfig describing how tasks on this executor + should report task resource status An executor may optionally expose: @@ -45,11 +51,16 @@ class ParslExecutor(metaclass=ABCMeta): """ label: str = "undefined" - radio_mode: str = "udp" def __init__( self, *, + + # TODO: I'd like these two to go away but they're needed right now + # to configure the interchange monitoring radio, that is + # in addition to the submit and worker monitoring radios (!). They + # are effectively a third monitoring radio config, though, so what + # should that look like for the interchange?
hub_address: Optional[str] = None, hub_zmq_port: Optional[int] = None, submit_monitoring_radio: Optional[MonitoringRadioSender] = None, @@ -58,10 +69,19 @@ def __init__( ): self.hub_address = hub_address self.hub_zmq_port = hub_zmq_port + + # these are parameters for the monitoring radio to be used on the remote side + # eg. in workers - to send results back, and they should end up encapsulated + # inside a RadioConfig. self.submit_monitoring_radio = submit_monitoring_radio + self.remote_monitoring_radio_config: RadioConfig = UDPRadio() + self.run_dir = os.path.abspath(run_dir) self.run_id = run_id + # will be set externally later, which is pretty ugly + self.monitoring_receiver: Optional[MonitoringRadioReceiver] = None + def __enter__(self) -> Self: return self @@ -94,7 +114,13 @@ def shutdown(self) -> None: This includes all attached resources such as workers and controllers. """ - pass + logger.debug("Starting base executor shutdown") + # logger.error(f"BENC: monitoring receiver on {self} is {self.monitoring_receiver}") + if self.monitoring_receiver is not None: + logger.debug("Starting monitoring receiver shutdown") + self.monitoring_receiver.shutdown() + logger.debug("Done with monitoring receiver shutdown") + logger.debug("Done with base executor shutdown") def monitor_resources(self) -> bool: """Should resource monitoring happen for tasks on running on this executor? 
diff --git a/parsl/executors/high_throughput/executor.py b/parsl/executors/high_throughput/executor.py index d616627a61..8bb6bb292e 100644 --- a/parsl/executors/high_throughput/executor.py +++ b/parsl/executors/high_throughput/executor.py @@ -26,6 +26,7 @@ ) from parsl.executors.status_handling import BlockProviderExecutor from parsl.jobs.states import TERMINAL_STATES, JobState, JobStatus +from parsl.monitoring.radios.base import HTEXRadio, RadioConfig from parsl.process_loggers import wrap_with_logs from parsl.providers import LocalProvider from parsl.providers.base import ExecutionProvider @@ -262,11 +263,13 @@ def __init__(self, enable_mpi_mode: bool = False, mpi_launcher: str = "mpiexec", block_error_handler: Union[bool, Callable[[BlockProviderExecutor, Dict[str, JobStatus]], None]] = True, - encrypted: bool = False): + encrypted: bool = False, + remote_monitoring_radio_config: Optional[RadioConfig] = None): logger.debug("Initializing HighThroughputExecutor") BlockProviderExecutor.__init__(self, provider=provider, block_error_handler=block_error_handler) + self.label = label self.worker_debug = worker_debug self.storage_access = storage_access @@ -310,6 +313,12 @@ def __init__(self, self._workers_per_node = 1 # our best guess-- we do not have any provider hints self._task_counter = 0 + + if remote_monitoring_radio_config is not None: + self.remote_monitoring_radio_config = remote_monitoring_radio_config + else: + self.remote_monitoring_radio_config = HTEXRadio() + self.worker_ports = worker_ports self.worker_port_range = worker_port_range self.interchange_proc: Optional[subprocess.Popen] = None @@ -341,8 +350,6 @@ def __init__(self, interchange_launch_cmd = DEFAULT_INTERCHANGE_LAUNCH_CMD self.interchange_launch_cmd = interchange_launch_cmd - radio_mode = "htex" - def _warn_deprecated(self, old: str, new: str): warnings.warn( f"{old} is deprecated and will be removed in a future release. 
" @@ -852,6 +859,9 @@ def shutdown(self, timeout: float = 10.0): logger.info("Closing command client") self.command_client.close() + # TODO: implement this across all executors + super().shutdown() + logger.info("Finished HighThroughputExecutor shutdown attempt") def get_usage_information(self): diff --git a/parsl/executors/high_throughput/interchange.py b/parsl/executors/high_throughput/interchange.py index 0e2abbaf5c..65a6ebc09c 100644 --- a/parsl/executors/high_throughput/interchange.py +++ b/parsl/executors/high_throughput/interchange.py @@ -20,7 +20,7 @@ from parsl.executors.high_throughput.errors import ManagerLost, VersionMismatch from parsl.executors.high_throughput.manager_record import ManagerRecord from parsl.monitoring.message_type import MessageType -from parsl.monitoring.radios import MonitoringRadioSender, ZMQRadioSender +from parsl.monitoring.radios.base import MonitoringRadioSender, ZMQRadioSender from parsl.process_loggers import wrap_with_logs from parsl.serialize import serialize as serialize_object from parsl.utils import setproctitle diff --git a/parsl/executors/high_throughput/mpi_executor.py b/parsl/executors/high_throughput/mpi_executor.py index b8045d38b3..26487c5dc1 100644 --- a/parsl/executors/high_throughput/mpi_executor.py +++ b/parsl/executors/high_throughput/mpi_executor.py @@ -10,6 +10,7 @@ ) from parsl.executors.status_handling import BlockProviderExecutor from parsl.jobs.states import JobStatus +from parsl.monitoring.radios.base import RadioConfig from parsl.providers import LocalProvider from parsl.providers.base import ExecutionProvider @@ -56,7 +57,8 @@ def __init__(self, worker_logdir_root: Optional[str] = None, mpi_launcher: str = "mpiexec", block_error_handler: Union[bool, Callable[[BlockProviderExecutor, Dict[str, JobStatus]], None]] = True, - encrypted: bool = False): + encrypted: bool = False, + remote_monitoring_radio_config: Optional[RadioConfig] = None): super().__init__( # Hard-coded settings cores_per_worker=1e-9, # 
Ensures there will be at least an absurd number of workers @@ -84,7 +86,15 @@ def __init__(self, worker_logdir_root=worker_logdir_root, mpi_launcher=mpi_launcher, block_error_handler=block_error_handler, - encrypted=encrypted + encrypted=encrypted, + + # TODO: + # worker-side monitoring in MPI-style code is probably going to be + # broken - resource monitoring won't see any worker processes + # most likely, as so perhaps it should have worker resource + # monitoring disabled like the thread pool executor has? + # (for related but different reasons...) + remote_monitoring_radio_config=remote_monitoring_radio_config ) self.max_workers_per_block = max_workers_per_block diff --git a/parsl/executors/high_throughput/process_worker_pool.py b/parsl/executors/high_throughput/process_worker_pool.py index 5c766123d7..31942f1c55 100755 --- a/parsl/executors/high_throughput/process_worker_pool.py +++ b/parsl/executors/high_throughput/process_worker_pool.py @@ -834,12 +834,13 @@ def start_file_logger(filename, rank, name='parsl', level=logging.DEBUG, format_ "[%(levelname)s] %(message)s" logger = logging.getLogger(name) + logger2 = logging.getLogger("") logger.setLevel(logging.DEBUG) handler = logging.FileHandler(filename) handler.setLevel(level) formatter = logging.Formatter(format_string, datefmt='%Y-%m-%d %H:%M:%S') handler.setFormatter(formatter) - logger.addHandler(handler) + logger2.addHandler(handler) return logger diff --git a/parsl/executors/taskvine/executor.py b/parsl/executors/taskvine/executor.py index 6cfedf92bb..fd590200f8 100644 --- a/parsl/executors/taskvine/executor.py +++ b/parsl/executors/taskvine/executor.py @@ -619,6 +619,8 @@ def shutdown(self, *args, **kwargs): self._finished_task_queue.close() self._finished_task_queue.join_thread() + super().shutdown() + logger.debug("TaskVine shutdown completed") @wrap_with_logs diff --git a/parsl/executors/threads.py b/parsl/executors/threads.py index cbc813d9d4..d36072f163 100644 --- a/parsl/executors/threads.py +++ 
b/parsl/executors/threads.py @@ -72,6 +72,7 @@ def shutdown(self, block=True): """ logger.debug("Shutting down executor, which involves waiting for running tasks to complete") self.executor.shutdown(wait=block) + super().shutdown() logger.debug("Done with executor shutdown") def monitor_resources(self): diff --git a/parsl/executors/workqueue/executor.py b/parsl/executors/workqueue/executor.py index e715c23891..1a3371858e 100644 --- a/parsl/executors/workqueue/executor.py +++ b/parsl/executors/workqueue/executor.py @@ -731,6 +731,8 @@ def shutdown(self, *args, **kwargs): self.collector_queue.close() self.collector_queue.join_thread() + super().shutdown() + logger.debug("Work Queue shutdown completed") @wrap_with_logs diff --git a/parsl/monitoring/monitoring.py b/parsl/monitoring/monitoring.py index e1de80116c..4dceb8f971 100644 --- a/parsl/monitoring/monitoring.py +++ b/parsl/monitoring/monitoring.py @@ -5,7 +5,7 @@ import os import queue import time -from multiprocessing import Event, Process +from multiprocessing import Event from multiprocessing.queues import Queue from typing import TYPE_CHECKING, Any, Literal, Optional, Tuple, Union, cast @@ -14,7 +14,7 @@ from parsl.log_utils import set_file_logger from parsl.monitoring.errors import MonitoringHubStartError from parsl.monitoring.message_type import MessageType -from parsl.monitoring.radios import MultiprocessingQueueRadioSender +from parsl.monitoring.radios.base import MultiprocessingQueueRadioSender, RadioConfig from parsl.monitoring.router import router_starter from parsl.monitoring.types import AddressedMonitoringMessage from parsl.multiprocessing import ForkProcess, SizedQueue @@ -129,7 +129,7 @@ def start(self, dfk_run_dir: str, config_run_dir: Union[str, os.PathLike]) -> No # in the future, Queue will allow runtime subscripts. 
if TYPE_CHECKING: - comm_q: Queue[Union[Tuple[int, int], str]] + comm_q: Queue[Union[int, str]] else: comm_q: Queue @@ -150,7 +150,6 @@ def start(self, dfk_run_dir: str, config_run_dir: Union[str, os.PathLike]) -> No "resource_msgs": self.resource_msgs, "exit_event": self.router_exit_event, "hub_address": self.hub_address, - "udp_port": self.hub_port, "zmq_port_range": self.hub_port_range, "logdir": self.logdir, "logging_level": logging.DEBUG if self.monitoring_debug else logging.INFO, @@ -172,13 +171,13 @@ def start(self, dfk_run_dir: str, config_run_dir: Union[str, os.PathLike]) -> No self.dbm_proc.start() logger.info("Started the router process {} and DBM process {}".format(self.router_proc.pid, self.dbm_proc.pid)) - self.filesystem_proc = Process(target=filesystem_receiver, - args=(self.logdir, self.resource_msgs, dfk_run_dir), - name="Monitoring-Filesystem-Process", - daemon=True - ) - self.filesystem_proc.start() - logger.info(f"Started filesystem radio receiver process {self.filesystem_proc.pid}") + # self.filesystem_proc = Process(target=filesystem_receiver, + # args=(self.logdir, self.resource_msgs, dfk_run_dir), + # name="Monitoring-Filesystem-Process", + # daemon=True + # ) + # self.filesystem_proc.start() + # logger.info(f"Started filesystem radio receiver process {self.filesystem_proc.pid}") self.radio = MultiprocessingQueueRadioSender(self.resource_msgs) @@ -194,9 +193,23 @@ def start(self, dfk_run_dir: str, config_run_dir: Union[str, os.PathLike]) -> No logger.error(f"MonitoringRouter sent an error message: {comm_q_result}") raise RuntimeError(f"MonitoringRouter failed to start: {comm_q_result}") - udp_port, zmq_port = comm_q_result + zmq_port = comm_q_result + + self.zmq_port = zmq_port - self.monitoring_hub_url = "udp://{}:{}".format(self.hub_address, udp_port) + # need to initialize radio configs, perhaps first time a radio config is used + # in each executor? 
(can't do that at startup because executor list is dynamic, + # don't know all the executors till later) + # self.radio_config.monitoring_hub_url = "udp://{}:{}".format(self.hub_address, udp_port) + # How can this config be populated properly? + # There's a UDP port chosen right now by the monitoring router and + # sent back a line above... + # What does that look like for other radios? htexradio has no specific config at all, + # filesystem radio has a path (that should have been created?) for config, and a loop + # that needs to be running, started in this start method. + # so something like... radio_config.receive() generates the appropriate receiver object? + # which has a shutdown method on it for later. and also updates radio_config itself so + # it has the right info to send across the wire? or some state driving like that? logger.info("Monitoring Hub initialized") @@ -228,7 +241,7 @@ def close(self) -> None: ) self.router_proc.terminate() self.dbm_proc.terminate() - self.filesystem_proc.terminate() + # self.filesystem_proc.terminate() logger.info("Setting router termination event") self.router_exit_event.set() logger.info("Waiting for router to terminate") @@ -248,9 +261,9 @@ def close(self) -> None: # should this be message based? 
it probably doesn't need to be if # we believe we've received all messages logger.info("Terminating filesystem radio receiver process") - self.filesystem_proc.terminate() - self.filesystem_proc.join() - self.filesystem_proc.close() + # self.filesystem_proc.terminate() + # self.filesystem_proc.join() + # self.filesystem_proc.close() logger.info("Closing monitoring multiprocessing queues") self.exception_q.close() @@ -259,6 +272,17 @@ def close(self) -> None: self.resource_msgs.join_thread() logger.info("Closed monitoring multiprocessing queues") + def start_receiver(self, radio_config: RadioConfig, ip: str) -> Any: + """somehow start a radio receiver here and update radioconfig to be sent over the wire, without + losing the info we need to shut down that receiver later... + """ + r = radio_config.create_receiver(ip=ip, resource_msgs=self.resource_msgs) # TODO: return a shutdownable... + logger.info(f"BENC: created receiver {r}") + # assert r is not None + return r + # ... that is, a thing we need to do a shutdown call on at shutdown, a "shutdownable"? without + # expecting any more structure on it? 
+ @wrap_with_logs def filesystem_receiver(logdir: str, q: "queue.Queue[AddressedMonitoringMessage]", run_dir: str) -> None: diff --git a/parsl/monitoring/radios.py b/parsl/monitoring/radios.py deleted file mode 100644 index fadf21f57f..0000000000 --- a/parsl/monitoring/radios.py +++ /dev/null @@ -1,209 +0,0 @@ -import logging -import os -import pickle -import socket -import uuid -from abc import ABCMeta, abstractmethod -from multiprocessing.queues import Queue -from typing import Optional - -import zmq - -from parsl.serialize import serialize - -_db_manager_excepts: Optional[Exception] - - -logger = logging.getLogger(__name__) - - -class MonitoringRadioSender(metaclass=ABCMeta): - @abstractmethod - def send(self, message: object) -> None: - pass - - -class FilesystemRadioSender(MonitoringRadioSender): - """A MonitoringRadioSender that sends messages over a shared filesystem. - - The messsage directory structure is based on maildir, - https://en.wikipedia.org/wiki/Maildir - - The writer creates a message in tmp/ and then when it is fully - written, moves it atomically into new/ - - The reader ignores tmp/ and only reads and deletes messages from - new/ - - This avoids a race condition of reading partially written messages. - - This radio is likely to give higher shared filesystem load compared to - the UDP radio, but should be much more reliable. 
- """ - - def __init__(self, *, monitoring_url: str, source_id: int, timeout: int = 10, run_dir: str): - logger.info("filesystem based monitoring channel initializing") - self.source_id = source_id - self.base_path = f"{run_dir}/monitor-fs-radio/" - self.tmp_path = f"{self.base_path}/tmp" - self.new_path = f"{self.base_path}/new" - - os.makedirs(self.tmp_path, exist_ok=True) - os.makedirs(self.new_path, exist_ok=True) - - def send(self, message: object) -> None: - logger.info("Sending a monitoring message via filesystem") - - unique_id = str(uuid.uuid4()) - - tmp_filename = f"{self.tmp_path}/{unique_id}" - new_filename = f"{self.new_path}/{unique_id}" - buffer = (message, "NA") - - # this will write the message out then atomically - # move it into new/, so that a partially written - # file will never be observed in new/ - with open(tmp_filename, "wb") as f: - f.write(serialize(buffer)) - os.rename(tmp_filename, new_filename) - - -class HTEXRadioSender(MonitoringRadioSender): - - def __init__(self, monitoring_url: str, source_id: int, timeout: int = 10): - """ - Parameters - ---------- - - monitoring_url : str - URL of the form ://: - source_id : str - String identifier of the source - timeout : int - timeout, default=10s - """ - self.source_id = source_id - logger.info("htex-based monitoring channel initialising") - - def send(self, message: object) -> None: - """ Sends a message to the UDP receiver - - Parameter - --------- - - message: object - Arbitrary pickle-able object that is to be sent - - Returns: - None - """ - - import parsl.executors.high_throughput.monitoring_info - - result_queue = parsl.executors.high_throughput.monitoring_info.result_queue - - # this message needs to go in the result queue tagged so that it is treated - # i) as a monitoring message by the interchange, and then further more treated - # as a RESOURCE_INFO message when received by monitoring (rather than a NODE_INFO - # which is the implicit default for messages from the interchange) - 
- # for the interchange, the outer wrapper, this needs to be a dict: - - interchange_msg = { - 'type': 'monitoring', - 'payload': message - } - - if result_queue: - result_queue.put(pickle.dumps(interchange_msg)) - else: - logger.error("result_queue is uninitialized - cannot put monitoring message") - - return - - -class UDPRadioSender(MonitoringRadioSender): - - def __init__(self, monitoring_url: str, source_id: int, timeout: int = 10): - """ - Parameters - ---------- - - monitoring_url : str - URL of the form ://: - source_id : str - String identifier of the source - timeout : int - timeout, default=10s - """ - self.monitoring_url = monitoring_url - self.sock_timeout = timeout - self.source_id = source_id - try: - self.scheme, self.ip, port = (x.strip('/') for x in monitoring_url.split(':')) - self.port = int(port) - except Exception: - raise Exception("Failed to parse monitoring url: {}".format(monitoring_url)) - - self.sock = socket.socket(socket.AF_INET, - socket.SOCK_DGRAM, - socket.IPPROTO_UDP) # UDP - self.sock.settimeout(self.sock_timeout) - - def send(self, message: object) -> None: - """ Sends a message to the UDP receiver - - Parameter - --------- - - message: object - Arbitrary pickle-able object that is to be sent - - Returns: - None - """ - try: - buffer = pickle.dumps(message) - except Exception: - logging.exception("Exception during pickling", exc_info=True) - return - - try: - self.sock.sendto(buffer, (self.ip, self.port)) - except socket.timeout: - logging.error("Could not send message within timeout limit") - return - return - - -class MultiprocessingQueueRadioSender(MonitoringRadioSender): - """A monitoring radio which connects over a multiprocessing Queue. - This radio is intended to be used on the submit side, where components - in the submit process, or processes launched by multiprocessing, will have - access to a Queue shared with the monitoring database code (bypassing the - monitoring router). 
- """ - def __init__(self, queue: Queue) -> None: - self.queue = queue - - def send(self, message: object) -> None: - self.queue.put((message, 0)) - - -class ZMQRadioSender(MonitoringRadioSender): - """A monitoring radio which connects over ZMQ. This radio is not - thread-safe, because its use of ZMQ is not thread-safe. - """ - - def __init__(self, hub_address: str, hub_zmq_port: int) -> None: - print("in zmq radio init. about to log.") - logger.debug("Creating ZMQ socket") - print("in zmq radio init. logged first log. about to create context.") - self._hub_channel = zmq.Context().socket(zmq.DEALER) - print("in zmq radio init. created context.") - self._hub_channel.set_hwm(0) - self._hub_channel.connect(f"tcp://{hub_address}:{hub_zmq_port}") - logger.debug("Created ZMQ socket") - - def send(self, message: object) -> None: - self._hub_channel.send_pyobj(message) diff --git a/parsl/monitoring/radios/__init__.py b/parsl/monitoring/radios/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/parsl/monitoring/radios/base.py b/parsl/monitoring/radios/base.py new file mode 100644 index 0000000000..57cde86d51 --- /dev/null +++ b/parsl/monitoring/radios/base.py @@ -0,0 +1,126 @@ +import logging +import pickle +from abc import ABCMeta, abstractmethod +from multiprocessing.queues import Queue +from typing import Any, Optional + +import zmq + +_db_manager_excepts: Optional[Exception] + +logger = logging.getLogger(__name__) + + +class MonitoringRadioReceiver(metaclass=ABCMeta): + @abstractmethod + def shutdown(self) -> None: + pass + + +class MonitoringRadioSender(metaclass=ABCMeta): + @abstractmethod + def send(self, message: object) -> None: + pass + + +class RadioConfig(metaclass=ABCMeta): + """Base class for radio plugin configuration. 
+ """ + @abstractmethod + def create_sender(self, *, source_id: int) -> MonitoringRadioSender: + pass + + @abstractmethod + def create_receiver(self, *, ip: str, resource_msgs: Queue) -> Any: + # TODO: return a shutdownable, and probably take some context to help in + # creation of the radio config? esp. the ZMQ endpoint to send messages to + # from the receiving process that might be created? + """create a receiver for this config, and update this config as + appropriate so that create_sender will be able to connect back to that + receiver in whichever way is relevant. create_sender can assume + that create_receiver has been called.""" + pass + + + class HTEXRadio(RadioConfig): + def create_sender(self, *, source_id: int) -> MonitoringRadioSender: + return HTEXRadioSender(source_id=source_id) + + def create_receiver(self, *, ip: str, resource_msgs: Queue) -> None: + pass + + + class HTEXRadioSender(MonitoringRadioSender): + + def __init__(self, source_id: int): + self.source_id = source_id + logger.info("htex-based monitoring channel initialising") + + def send(self, message: object) -> None: + """ Sends a message to the UDP receiver + + Parameter + --------- + + message: object + Arbitrary pickle-able object that is to be sent + + Returns: + None + """ + + import parsl.executors.high_throughput.monitoring_info + + result_queue = parsl.executors.high_throughput.monitoring_info.result_queue + + # this message needs to go in the result queue tagged so that it is treated + # i) as a monitoring message by the interchange, and then further more treated + # as a RESOURCE_INFO message when received by monitoring (rather than a NODE_INFO + # which is the implicit default for messages from the interchange) + + # for the interchange, the outer wrapper, this needs to be a dict: + + interchange_msg = { + 'type': 'monitoring', + 'payload': message + } + + if result_queue: +
result_queue.put(pickle.dumps(interchange_msg)) + else: + logger.error("result_queue is uninitialized - cannot put monitoring message") + + return + + +class MultiprocessingQueueRadioSender(MonitoringRadioSender): + """A monitoring radio which connects over a multiprocessing Queue. + This radio is intended to be used on the submit side, where components + in the submit process, or processes launched by multiprocessing, will have + access to a Queue shared with the monitoring database code (bypassing the + monitoring router). + """ + def __init__(self, queue: Queue) -> None: + self.queue = queue + + def send(self, message: object) -> None: + self.queue.put((message, 0)) + + +class ZMQRadioSender(MonitoringRadioSender): + """A monitoring radio which connects over ZMQ. This radio is not + thread-safe, because its use of ZMQ is not thread-safe. + """ + + def __init__(self, hub_address: str, hub_zmq_port: int) -> None: + print("in zmq radio init. about to log.") + logger.debug("Creating ZMQ socket") + print("in zmq radio init. logged first log. about to create context.") + self._hub_channel = zmq.Context().socket(zmq.DEALER) + print("in zmq radio init. created context.") + self._hub_channel.set_hwm(0) + self._hub_channel.connect(f"tcp://{hub_address}:{hub_zmq_port}") + logger.debug("Created ZMQ socket") + + def send(self, message: object) -> None: + self._hub_channel.send_pyobj(message) diff --git a/parsl/monitoring/radios/filesystem.py b/parsl/monitoring/radios/filesystem.py new file mode 100644 index 0000000000..44b96860c0 --- /dev/null +++ b/parsl/monitoring/radios/filesystem.py @@ -0,0 +1,53 @@ +import logging +import os +import uuid + +from parsl.monitoring.radios.base import MonitoringRadioSender +from parsl.serialize import serialize + +logger = logging.getLogger(__name__) + + +class FilesystemRadioSender(MonitoringRadioSender): + """A MonitoringRadioSender that sends messages over a shared filesystem. 
+ + The messsage directory structure is based on maildir, + https://en.wikipedia.org/wiki/Maildir + + The writer creates a message in tmp/ and then when it is fully + written, moves it atomically into new/ + + The reader ignores tmp/ and only reads and deletes messages from + new/ + + This avoids a race condition of reading partially written messages. + + This radio is likely to give higher shared filesystem load compared to + the UDP radio, but should be much more reliable. + """ + + def __init__(self, *, monitoring_url: str, source_id: int, timeout: int = 10, run_dir: str): + logger.info("filesystem based monitoring channel initializing") + self.source_id = source_id + self.base_path = f"{run_dir}/monitor-fs-radio/" + self.tmp_path = f"{self.base_path}/tmp" + self.new_path = f"{self.base_path}/new" + + os.makedirs(self.tmp_path, exist_ok=True) + os.makedirs(self.new_path, exist_ok=True) + + def send(self, message: object) -> None: + logger.info("Sending a monitoring message via filesystem") + + unique_id = str(uuid.uuid4()) + + tmp_filename = f"{self.tmp_path}/{unique_id}" + new_filename = f"{self.new_path}/{unique_id}" + buffer = (message, "NA") + + # this will write the message out then atomically + # move it into new/, so that a partially written + # file will never be observed in new/ + with open(tmp_filename, "wb") as f: + f.write(serialize(buffer)) + os.rename(tmp_filename, new_filename) diff --git a/parsl/monitoring/radios/udp.py b/parsl/monitoring/radios/udp.py new file mode 100644 index 0000000000..4ed857f6f9 --- /dev/null +++ b/parsl/monitoring/radios/udp.py @@ -0,0 +1,179 @@ +import logging +import multiprocessing as mp +import pickle +import socket +import threading +import time +from multiprocessing.queues import Queue +from typing import Any, Optional, Union + +from parsl.monitoring.radios.base import ( + MonitoringRadioReceiver, + MonitoringRadioSender, + RadioConfig, +) +from parsl.process_loggers import wrap_with_logs + +logger = 
logging.getLogger(__name__) + + + class UDPRadio(RadioConfig): + ip: str + + # these two values need to be initialized by a create_receiver step... + # which is why an earlier patch needs to turn the UDP and zmq receivers + # into separate threads that can exist separately + + # TODO: this atexit_timeout is now user exposed - in the prior UDP-in-router impl, I think maybe it wasn't (but i should check) + def __init__(self, *, port: Optional[int] = None, atexit_timeout: Union[int, float] = 3): + # TODO awkward: when a user creates this it can be none, + # but after create_receiver initialization it is always an int. + # perhaps leads to motivation of serializable config being its + # own class distinct from the user-specified RadioConfig object? + # Right now, there would be a type-error in create_sender except + # for an assert that asserts this reasoning to mypy. + self.port = port + self.atexit_timeout = atexit_timeout + + def create_sender(self, *, source_id: int) -> MonitoringRadioSender: + assert self.port is not None, "self.port should have been initialized by create_receiver" + return UDPRadioSender(self.ip, self.port, source_id) + + def create_receiver(self, ip: str, resource_msgs: Queue) -> Any: + """TODO: backwards compatibility would require a single one of these to + exist for all executors that want one, shut down when the last of its + users asks for shut down... in the case that udp_port is specified. + + But maybe the right thing to do here is lose that configuration parameter + in that form? especially as I'd like UDPRadio to go away entirely because + UDP isn't reliable or secure and other code requires reliability of messaging? + """ + + # we could bind to this instead of 0.0.0.0 but that would change behaviour, + # possibly breaking if the IP address isn't bindable (for example, if its + # a port forward). Otherwise, it isn't needed for creation of the listening + # port - only for creation of the sender.
+ self.ip = ip + + udp_sock = socket.socket(socket.AF_INET, + socket.SOCK_DGRAM, + socket.IPPROTO_UDP) + + # We are trying to bind to all interfaces with 0.0.0.0 + if self.port is None: + udp_sock.bind(('0.0.0.0', 0)) + self.port = udp_sock.getsockname()[1] + else: + try: + udp_sock.bind(('0.0.0.0', self.port)) + except Exception as e: + # TODO: this can be its own patch to use 'from' notation? + raise RuntimeError(f"Could not bind to UDP port {self.port}") from e + udp_sock.settimeout(0.001) # TODO: configurable loop_freq? it's hard-coded though... + logger.info(f"Initialized the UDP socket on port {self.port}") + + # this is now in the submitting process, not the router process. + # I don't think this matters for UDP so much because it's on the + # way out - but how should this work for other things? compare with + # filesystem radio impl? + logger.info("Starting UDP listener thread") + udp_radio_receiver_thread = UDPRadioReceiverThread(udp_sock=udp_sock, resource_msgs=resource_msgs, atexit_timeout=self.atexit_timeout) + udp_radio_receiver_thread.start() + + return udp_radio_receiver_thread + # TODO: wrap this with proper shutdown logic involving events etc? 
class UDPRadioSender(MonitoringRadioSender):
    """Sends monitoring messages to a UDP receiver, one pickled message per datagram."""

    def __init__(self, ip: str, port: int, source_id: int, timeout: int = 10) -> None:
        """
        Parameters
        ----------

        ip : str
            Address of the UDP receiver.
        port : int
            UDP port of the receiver.
        source_id : int
            Identifier of the source; stored on the sender.
        timeout : int
            socket timeout in seconds, default=10s
        """
        self.sock_timeout = timeout
        self.source_id = source_id
        self.ip = ip
        self.port = port

        self.sock = socket.socket(socket.AF_INET,
                                  socket.SOCK_DGRAM,
                                  socket.IPPROTO_UDP)  # UDP
        self.sock.settimeout(self.sock_timeout)

    def send(self, message: object) -> None:
        """ Sends a message to the UDP receiver

        Pickling failures and send timeouts are logged and swallowed, so a
        monitoring problem does not propagate to the caller.

        Parameter
        ---------

        message: object
            Arbitrary pickle-able object that is to be sent

        Returns:
            None
        """
        # Use the module logger (as the rest of this file does) rather than
        # the logging.* module-level functions, which log via the root logger
        # and may implicitly configure it.
        try:
            buffer = pickle.dumps(message)
        except Exception:
            logger.exception("Exception during pickling")
            return

        try:
            self.sock.sendto(buffer, (self.ip, self.port))
        except socket.timeout:
            logger.error("Could not send message within timeout limit")


class UDPRadioReceiverThread(threading.Thread, MonitoringRadioReceiver):
    """Thread that reads pickled monitoring messages from a UDP socket and
    puts ``(message, sender_address)`` tuples onto a queue.

    The thread takes ownership of ``udp_sock`` and closes it when ``run``
    ends. The socket is expected to have a receive timeout set, because the
    loops below rely on ``socket.timeout`` to poll for shutdown.
    """

    def __init__(self, udp_sock: socket.socket, resource_msgs: Queue, atexit_timeout: Union[int, float]):
        """
        Parameters
        ----------
        udp_sock : socket.socket
            Bound UDP socket to receive from.
        resource_msgs : Queue
            Destination queue for received messages.
        atexit_timeout : int or float
            After shutdown is requested, keep receiving until this many
            seconds pass without a message.
        """
        # Only set() and is_set() are used on this event within this class.
        self.exit_event = mp.Event()
        self.udp_sock = udp_sock
        self.resource_msgs = resource_msgs
        self.atexit_timeout = atexit_timeout
        super().__init__()

    def _receive_one(self) -> bool:
        """Receive, decode and enqueue at most one datagram.

        Returns True if a message was put on the queue, False if the socket
        timed out or the datagram could not be unpickled.
        """
        try:
            data, addr = self.udp_sock.recvfrom(2048)
        except socket.timeout:
            return False

        try:
            # NOTE(security): this unpickles data received from the network.
            # pickle is not safe against malicious input; anyone able to send
            # a datagram to this port can trigger arbitrary code execution.
            msg = pickle.loads(data)
        except Exception:
            # Unpickling can raise many exception types. A single bad or
            # truncated datagram should not terminate the listener.
            logger.exception("Discarding undecodable UDP message from {}".format(addr))
            return False

        logger.debug("Got UDP Message from {}: {}".format(addr, msg))
        self.resource_msgs.put((msg, addr))
        return True

    @wrap_with_logs
    def run(self) -> None:
        """Receive messages until shutdown is requested, then drain.

        Draining continues until ``atexit_timeout`` seconds elapse without a
        successfully received message.
        """
        try:
            while not self.exit_event.is_set():
                self._receive_one()

            logger.info("UDP listener draining")
            last_msg_received_time = time.time()
            while time.time() - last_msg_received_time < self.atexit_timeout:
                if self._receive_one():
                    last_msg_received_time = time.time()

            logger.info("UDP listener finishing normally")
        finally:
            # Nothing reads from the socket once this thread ends, so
            # release it here rather than leaking the descriptor.
            self.udp_sock.close()
            logger.info("UDP listener finished")

    def shutdown(self) -> None:
        """Request the receive loop to stop and wait for the thread to finish draining."""
        logger.debug("Set exit event")
        self.exit_event.set()
        logger.debug("Joining")
        self.join()
        logger.debug("done")
@@ -43,9 +37,8 @@ def wrapped(*args: List[Any], **kwargs: Dict[str, Any]) -> Any: # Send first message to monitoring router send_first_message(try_id, task_id, - monitoring_hub_url, + radio_config, run_id, - radio_mode, run_dir) if monitor_resources and sleep_dur > 0: @@ -54,9 +47,8 @@ def wrapped(*args: List[Any], **kwargs: Dict[str, Any]) -> Any: args=(os.getpid(), try_id, task_id, - monitoring_hub_url, + radio_config, run_id, - radio_mode, logging_level, sleep_dur, run_dir, @@ -89,9 +81,9 @@ def wrapped(*args: List[Any], **kwargs: Dict[str, Any]) -> Any: send_last_message(try_id, task_id, - monitoring_hub_url, + radio_config, run_id, - radio_mode, run_dir) + run_dir) new_kwargs = kwargs.copy() new_kwargs['_parsl_monitoring_task_id'] = x_task_id @@ -100,49 +92,43 @@ def wrapped(*args: List[Any], **kwargs: Dict[str, Any]) -> Any: return (wrapped, args, new_kwargs) -def get_radio(radio_mode: str, monitoring_hub_url: str, task_id: int, run_dir: str) -> MonitoringRadioSender: +def get_radio(radio_config: RadioConfig, task_id: int) -> MonitoringRadioSender: + + # TODO: maybe this function will end up simple enough to eliminate + radio: MonitoringRadioSender - if radio_mode == "udp": - radio = UDPRadioSender(monitoring_hub_url, - source_id=task_id) - elif radio_mode == "htex": - radio = HTEXRadioSender(monitoring_hub_url, - source_id=task_id) - elif radio_mode == "filesystem": - radio = FilesystemRadioSender(monitoring_url=monitoring_hub_url, - source_id=task_id, run_dir=run_dir) - else: - raise RuntimeError(f"Unknown radio mode: {radio_mode}") + radio = radio_config.create_sender(source_id=task_id) + return radio @wrap_with_logs def send_first_message(try_id: int, task_id: int, - monitoring_hub_url: str, - run_id: str, radio_mode: str, run_dir: str) -> None: - send_first_last_message(try_id, task_id, monitoring_hub_url, run_id, - radio_mode, run_dir, False) + radio_config: RadioConfig, + run_id: str, run_dir: str) -> None: + send_first_last_message(try_id, task_id, 
radio_config, run_id, + run_dir, False) @wrap_with_logs def send_last_message(try_id: int, task_id: int, - monitoring_hub_url: str, - run_id: str, radio_mode: str, run_dir: str) -> None: - send_first_last_message(try_id, task_id, monitoring_hub_url, run_id, - radio_mode, run_dir, True) + radio_config: RadioConfig, + run_id: str, run_dir: str) -> None: + send_first_last_message(try_id, task_id, radio_config, run_id, + run_dir, True) def send_first_last_message(try_id: int, task_id: int, - monitoring_hub_url: str, - run_id: str, radio_mode: str, run_dir: str, + radio_config: RadioConfig, + run_id: str, run_dir: str, is_last: bool) -> None: import os import platform - radio = get_radio(radio_mode, monitoring_hub_url, task_id, run_dir) + radio = get_radio(radio_config, task_id) msg = (MessageType.RESOURCE_INFO, {'run_id': run_id, @@ -162,9 +148,8 @@ def send_first_last_message(try_id: int, def monitor(pid: int, try_id: int, task_id: int, - monitoring_hub_url: str, + radio_config: RadioConfig, run_id: str, - radio_mode: str, logging_level: int, sleep_dur: float, run_dir: str, @@ -188,7 +173,7 @@ def monitor(pid: int, setproctitle("parsl: task resource monitor") - radio = get_radio(radio_mode, monitoring_hub_url, task_id, run_dir) + radio = get_radio(radio_config, task_id) logging.debug("start of monitor") diff --git a/parsl/monitoring/router.py b/parsl/monitoring/router.py index 9a8d4460e1..6e6e267383 100644 --- a/parsl/monitoring/router.py +++ b/parsl/monitoring/router.py @@ -2,13 +2,11 @@ import logging import os -import pickle -import socket import threading import time from multiprocessing.queues import Queue from multiprocessing.synchronize import Event -from typing import Optional, Tuple, Union +from typing import Tuple, Union import typeguard import zmq @@ -26,7 +24,6 @@ class MonitoringRouter: def __init__(self, *, hub_address: str, - udp_port: Optional[int] = None, zmq_port_range: Tuple[int, int] = (55050, 56000), monitoring_hub_address: str = "127.0.0.1", @@ 
-42,8 +39,6 @@ def __init__(self, ---------- hub_address : str The ip address at which the workers will be able to reach the Hub. - udp_port : int - The specific port at which workers will be able to reach the Hub via UDP. Default: None zmq_port_range : tuple(int, int) The MonitoringHub picks ports at random from the range which will be used by Hub. Default: (55050, 56000) @@ -70,24 +65,6 @@ def __init__(self, self.loop_freq = 10.0 # milliseconds - # Initialize the UDP socket - self.udp_sock = socket.socket(socket.AF_INET, - socket.SOCK_DGRAM, - socket.IPPROTO_UDP) - - # We are trying to bind to all interfaces with 0.0.0.0 - if not udp_port: - self.udp_sock.bind(('0.0.0.0', 0)) - self.udp_port = self.udp_sock.getsockname()[1] - else: - self.udp_port = udp_port - try: - self.udp_sock.bind(('0.0.0.0', self.udp_port)) - except Exception as e: - raise RuntimeError(f"Could not bind to udp_port {udp_port} because: {e}") - self.udp_sock.settimeout(self.loop_freq / 1000) - self.logger.info("Initialized the UDP socket on 0.0.0.0:{}".format(self.udp_port)) - self._context = zmq.Context() self.zmq_receiver_channel = self._context.socket(zmq.DEALER) self.zmq_receiver_channel.setsockopt(zmq.LINGER, 0) @@ -103,10 +80,6 @@ def __init__(self, @wrap_with_logs(target="monitoring_router") def start(self) -> None: - self.logger.info("Starting UDP listener thread") - udp_radio_receiver_thread = threading.Thread(target=self.start_udp_listener) - udp_radio_receiver_thread.start() - self.logger.info("Starting ZMQ listener thread") zmq_radio_receiver_thread = threading.Thread(target=self.start_zmq_listener) zmq_radio_receiver_thread.start() @@ -117,37 +90,7 @@ def start(self) -> None: self.logger.info("Joining on ZMQ listener thread") zmq_radio_receiver_thread.join() - self.logger.info("Joining on UDP listener thread") - udp_radio_receiver_thread.join() - self.logger.info("Joined on both ZMQ and UDP listener threads") - - @wrap_with_logs(target="monitoring_router") - def 
start_udp_listener(self) -> None: - try: - while not self.exit_event.is_set(): - try: - data, addr = self.udp_sock.recvfrom(2048) - resource_msg = pickle.loads(data) - self.logger.debug("Got UDP Message from {}: {}".format(addr, resource_msg)) - self.resource_msgs.put((resource_msg, addr)) - except socket.timeout: - pass - - self.logger.info("UDP listener draining") - last_msg_received_time = time.time() - while time.time() - last_msg_received_time < self.atexit_timeout: - try: - data, addr = self.udp_sock.recvfrom(2048) - msg = pickle.loads(data) - self.logger.debug("Got UDP Message from {}: {}".format(addr, msg)) - self.resource_msgs.put((msg, addr)) - last_msg_received_time = time.time() - except socket.timeout: - pass - - self.logger.info("UDP listener finishing normally") - finally: - self.logger.info("UDP listener finished") + self.logger.info("Joined on ZMQ listener thread") @wrap_with_logs(target="monitoring_router") def start_zmq_listener(self) -> None: @@ -185,13 +128,12 @@ def start_zmq_listener(self) -> None: @wrap_with_logs @typeguard.typechecked def router_starter(*, - comm_q: "Queue[Union[Tuple[int, int], str]]", + comm_q: "Queue[Union[int, str]]", exception_q: "Queue[Tuple[str, str]]", resource_msgs: "Queue[AddressedMonitoringMessage]", exit_event: Event, hub_address: str, - udp_port: Optional[int], zmq_port_range: Tuple[int, int], logdir: str, @@ -199,7 +141,6 @@ def router_starter(*, setproctitle("parsl: monitoring router") try: router = MonitoringRouter(hub_address=hub_address, - udp_port=udp_port, zmq_port_range=zmq_port_range, logdir=logdir, logging_level=logging_level, @@ -209,7 +150,7 @@ def router_starter(*, logger.error("MonitoringRouter construction failed.", exc_info=True) comm_q.put(f"Monitoring router construction failed: {e}") else: - comm_q.put((router.udp_port, router.zmq_receiver_port)) + comm_q.put(router.zmq_receiver_port) router.logger.info("Starting MonitoringRouter in router_starter") try: diff --git a/parsl/tests/conftest.py 
b/parsl/tests/conftest.py index 638088c44c..c22ff24c59 100644 --- a/parsl/tests/conftest.py +++ b/parsl/tests/conftest.py @@ -259,6 +259,8 @@ def load_dfk_local_module(request, pytestconfig, tmpd_cwd_session): dfk.cleanup() assert DataFlowKernelLoader._dfk is None + # assert [t for t in threading.enumerate() if "MainThread" not in t.name and "HTEX-Queue-Management-Thread" not in t.name] == [] + else: yield