From b992bc8458f396de4faa423dc22ad9d1533f9aee Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Thu, 11 Apr 2024 15:25:56 +0000 Subject: [PATCH] split monitoring router into radio-specific receivers: zmq and UDP this is in preparation for pluggable/configurable radios: see PR #3315 in that future work, these receivers wont be launched unless configured... or at least, the UDP one won't (and similarly the filesystem one) and in pluggable mode we would expect arbitrary new receivers to exist so then theres no reason for these ones to be special this opens up, in a future PR, the ability to not need to poll at such a high frequency - or at all... the poll-timeout is to allow the other poll to happen, but this PR makes those two polls happen in two threads. --- parsl/monitoring/router.py | 58 +++++++++++++++++++++++++++++--------- 1 file changed, 44 insertions(+), 14 deletions(-) diff --git a/parsl/monitoring/router.py b/parsl/monitoring/router.py index b787545c17..5f2efddc22 100644 --- a/parsl/monitoring/router.py +++ b/parsl/monitoring/router.py @@ -5,6 +5,7 @@ import pickle import queue import socket +import threading import time from multiprocessing.synchronize import Event from typing import Optional, Tuple, Union @@ -105,7 +106,28 @@ def __init__(self, self.resource_msgs = resource_msgs self.exit_event = exit_event + @wrap_with_logs(target="monitoring_router") def start(self) -> None: + self.logger.info("Starting UDP listener thread") + udp_radio_receiver_thread = threading.Thread(target=self.start_udp_listener) + udp_radio_receiver_thread.start() + + self.logger.info("Starting ZMQ listener thread") + zmq_radio_receiver_thread = threading.Thread(target=self.start_zmq_listener) + zmq_radio_receiver_thread.start() + + # exit when both of those have exiting + # TODO: this is to preserve the existing behaviour of start(), but it + # isn't necessarily the *right* thing to do... + + self.logger.info("Joining on ZMQ listener thread") + zmq_radio_receiver_thread.join() + self.logger.info("Joining on UDP listener thread") + udp_radio_receiver_thread.join() + self.logger.info("Joined on both ZMQ and UDP listener threads") + + @wrap_with_logs(target="monitoring_router") + def start_udp_listener(self) -> None: try: while not self.exit_event.is_set(): try: @@ -116,6 +138,26 @@ def start(self) -> None: except socket.timeout: pass + self.logger.info("UDP listener draining") + last_msg_received_time = time.time() + while time.time() - last_msg_received_time < self.atexit_timeout: + try: + data, addr = self.udp_sock.recvfrom(2048) + msg = pickle.loads(data) + self.logger.debug("Got UDP Message from {}: {}".format(addr, msg)) + self.resource_msgs.put((msg, addr)) + last_msg_received_time = time.time() + except socket.timeout: + pass + + self.logger.info("UDP listener finishing normally") + finally: + self.logger.info("UDP listener finished") + + @wrap_with_logs(target="monitoring_router") + def start_zmq_listener(self) -> None: + try: + while not self.exit_event.is_set(): try: dfk_loop_start = time.time() while time.time() - dfk_loop_start < 1.0: # TODO make configurable @@ -158,21 +200,9 @@ def start(self) -> None: # thing to do. self.logger.warning("Failure processing a ZMQ message", exc_info=True) - self.logger.info("Monitoring router draining") - last_msg_received_time = time.time() - while time.time() - last_msg_received_time < self.atexit_timeout: - try: - data, addr = self.udp_sock.recvfrom(2048) - msg = pickle.loads(data) - self.logger.debug("Got UDP Message from {}: {}".format(addr, msg)) - self.resource_msgs.put((msg, addr)) - last_msg_received_time = time.time() - except socket.timeout: - pass - - self.logger.info("Monitoring router finishing normally") + self.logger.info("ZMQ listener finishing normally") finally: - self.logger.info("Monitoring router finished") + self.logger.info("ZMQ listener finished") @wrap_with_logs