From 090322fe3ddececc10b3c57ff89c84c3ea94166f Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Mon, 28 Oct 2024 18:07:11 +0000 Subject: [PATCH] Make ZMQ, UDP and filesystem monitoring routers send via radios This PR is intended to consolidate monitoring message sending in the monitoring radio code. This is a step towards removing Python multiprocessing from the monitoring code base (see issue #2343) by making it clearer how to change to a different message send implementation (by swapping out the radio implementation and configuration). Compare to how the interchange forwards HTEXRadio messages onwards via some other radio (which right now is always the ZMQRadioSender) -- rather than having its own ZMQ code. --- parsl/monitoring/monitoring.py | 4 +++- parsl/monitoring/router.py | 10 +++++----- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/parsl/monitoring/monitoring.py b/parsl/monitoring/monitoring.py index a1b20f2705..08d771036a 100644 --- a/parsl/monitoring/monitoring.py +++ b/parsl/monitoring/monitoring.py @@ -270,6 +270,8 @@ def filesystem_receiver(logdir: str, q: Queue[TaggedMonitoringMessage], run_dir: new_dir = f"{base_path}/new/" logger.debug("Creating new and tmp paths under %s", base_path) + target_radio = MultiprocessingQueueRadioSender(q) + os.makedirs(tmp_dir, exist_ok=True) os.makedirs(new_dir, exist_ok=True) @@ -285,7 +287,7 @@ def filesystem_receiver(logdir: str, q: Queue[TaggedMonitoringMessage], run_dir: message = pickle.load(f) logger.debug("Message received is: %s", message) assert isinstance(message, tuple) - q.put(cast(TaggedMonitoringMessage, message)) + target_radio.send(cast(TaggedMonitoringMessage, message)) os.remove(full_path_filename) except Exception: logger.exception("Exception processing %s - probably will be retried next iteration", filename) diff --git a/parsl/monitoring/router.py b/parsl/monitoring/router.py index 1d4b522e82..a45500fc23 100644 --- a/parsl/monitoring/router.py +++ b/parsl/monitoring/router.py @@ 
-14,6 +14,7 @@ import zmq from parsl.log_utils import set_file_logger +from parsl.monitoring.radios import MultiprocessingQueueRadioSender from parsl.monitoring.types import TaggedMonitoringMessage from parsl.process_loggers import wrap_with_logs from parsl.utils import setproctitle @@ -55,7 +56,6 @@ def __init__(self, The amount of time in seconds to terminate the hub without receiving any messages, after the last dfk workflow message is received. resource_msgs : multiprocessing.Queue A multiprocessing queue to receive messages to be routed onwards to the database process - exit_event : Event An event that the main Parsl process will set to signal that the monitoring router should shut down. """ @@ -98,7 +98,7 @@ def __init__(self, min_port=zmq_port_range[0], max_port=zmq_port_range[1]) - self.resource_msgs = resource_msgs + self.target_radio = MultiprocessingQueueRadioSender(resource_msgs) self.exit_event = exit_event @wrap_with_logs(target="monitoring_router") @@ -125,7 +125,7 @@ def start_udp_listener(self) -> None: data, addr = self.udp_sock.recvfrom(2048) resource_msg = pickle.loads(data) self.logger.debug("Got UDP Message from {}: {}".format(addr, resource_msg)) - self.resource_msgs.put(resource_msg) + self.target_radio.send(resource_msg) except socket.timeout: pass @@ -136,7 +136,7 @@ def start_udp_listener(self) -> None: data, addr = self.udp_sock.recvfrom(2048) msg = pickle.loads(data) self.logger.debug("Got UDP Message from {}: {}".format(addr, msg)) - self.resource_msgs.put(msg) + self.target_radio.send(msg) last_msg_received_time = time.time() except socket.timeout: pass @@ -160,7 +160,7 @@ def start_zmq_listener(self) -> None: assert len(msg) >= 1, "ZMQ Receiver expects tuples of length at least 1, got {}".format(msg) assert len(msg) == 2, "ZMQ Receiver expects message tuples of exactly length 2, got {}".format(msg) - self.resource_msgs.put(msg) + self.target_radio.send(msg) except zmq.Again: pass except Exception: