From 39fc0de5b188f2f78972b56af4c91535878fec77 Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Mon, 28 Oct 2024 18:07:11 +0000 Subject: [PATCH] zmq router and udp router are *forwarders* : they receive a message and forward it on use the radio interface to make it clear that that is what they are doing the intention there is to make it cleaner to swap out *how* they perform forwarding - clarifying that these two threads are forwarders, not doing anything else to the message flow. after this patch, my intention is that: i) the ZMQ listener moves into the db manager and there is no multiprocessing queue forwarding from outside of the db manager ii) the UDP listener, all that is left, means this can be renamed the UDP radio receiver, sitting as a processes alongside the filesystem radio receiver (and eventually any other radio receivers) compare to how the interchange sends htexradio messages onwards via some arbitrary other radio (which right now is always the ZMQRadioSender) -- rather than having its own radio code. the previous multiprocessing queue code should be replaced with multiprocessing queue sender --- parsl/monitoring/monitoring.py | 4 +++- parsl/monitoring/router.py | 12 ++++++++---- 2 files changed, 11 insertions(+), 5 deletions(-) diff --git a/parsl/monitoring/monitoring.py b/parsl/monitoring/monitoring.py index 134e1ba50f..8a895ee447 100644 --- a/parsl/monitoring/monitoring.py +++ b/parsl/monitoring/monitoring.py @@ -264,6 +264,8 @@ def filesystem_receiver(q: Queue[TaggedMonitoringMessage], run_dir: str) -> None new_dir = f"{base_path}/new/" logger.debug(f"Creating new and tmp paths under {base_path}") + target_radio = MultiprocessingQueueRadioSender(q) + os.makedirs(tmp_dir, exist_ok=True) os.makedirs(new_dir, exist_ok=True) @@ -279,7 +281,7 @@ def filesystem_receiver(q: Queue[TaggedMonitoringMessage], run_dir: str) -> None message = deserialize(f.read()) logger.debug(f"Message received is: {message}") assert isinstance(message, tuple) - q.put(cast(TaggedMonitoringMessage, message)) + target_radio.send(cast(TaggedMonitoringMessage, message)) os.remove(full_path_filename) except Exception: logger.exception(f"Exception processing {filename} - probably will be retried next iteration") diff --git a/parsl/monitoring/router.py b/parsl/monitoring/router.py index 790582d823..0d420b3da6 100644 --- a/parsl/monitoring/router.py +++ b/parsl/monitoring/router.py @@ -14,6 +14,7 @@ import zmq from parsl.log_utils import set_file_logger +from parsl.monitoring.radios import MultiprocessingQueueRadioSender from parsl.monitoring.types import TaggedMonitoringMessage from parsl.process_loggers import wrap_with_logs from parsl.utils import setproctitle @@ -55,6 +56,9 @@ def __init__(self, The amount of time in seconds to terminate the hub without receiving any messages, after the last dfk workflow message is received. resource_msgs : multiprocessing.Queue A multiprocessing queue to receive messages to be routed onwards to the database process + In later PRs, this will be replaced by a monitoring radio configuration to allow the UDP radio receiver to + forward wherever without code changes in these modules. (and the ZMQ radio will disappear from this file, + moving into the DB manager) exit_event : Event An event that the main Parsl process will set to signal that the monitoring router should shut down. @@ -98,7 +102,7 @@ def __init__(self, min_port=zmq_port_range[0], max_port=zmq_port_range[1]) - self.resource_msgs = resource_msgs + self.target_radio = MultiprocessingQueueRadioSender(resource_msgs) self.exit_event = exit_event @wrap_with_logs(target="monitoring_router") @@ -125,7 +129,7 @@ def start_udp_listener(self) -> None: data, addr = self.udp_sock.recvfrom(2048) resource_msg = pickle.loads(data) self.logger.debug("Got UDP Message from {}: {}".format(addr, resource_msg)) - self.resource_msgs.put(resource_msg) + self.target_radio.send(resource_msg) except socket.timeout: pass @@ -136,7 +140,7 @@ def start_udp_listener(self) -> None: data, addr = self.udp_sock.recvfrom(2048) msg = pickle.loads(data) self.logger.debug("Got UDP Message from {}: {}".format(addr, msg)) - self.resource_msgs.put(msg) + self.target_radio.send(msg) last_msg_received_time = time.time() except socket.timeout: pass @@ -160,7 +164,7 @@ def start_zmq_listener(self) -> None: assert len(msg) >= 1, "ZMQ Receiver expects tuples of length at least 1, got {}".format(msg) assert len(msg) == 2, "ZMQ Receiver expects message tuples of exactly length 2, got {}".format(msg) - self.resource_msgs.put(msg) + self.target_radio.send(msg) except zmq.Again: pass except Exception: