From 2b8a1bfd6ecad8af9b2647bf0015dd12c059bf5e Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Thu, 11 Apr 2024 15:25:56 +0000 Subject: [PATCH] Split monitoring router into two radio-specific receiver threads This is part of two ongoing strands of development: * Preparation for arbitrary monitoring radio receivers, where UDP is not special compared to other methods. This code separates out the UDP specific code making it easier to move around later (see development in PR #3315) * Avoiding poll-one-thing, poll-another-thing tight polling loops in favour of multiple blocking loops. The two router receiver sections used to run in a single tight non-blocking loop, each component waiting loop_freq (= 10 ms) for something to happen. After this PR, the separated loops are more amenable to longer blocking times - they only need to discover when exit event is set which probably can be more on the order of 1 second. --- parsl/monitoring/router.py | 58 +++++++++++++++++++++++++++++--------- 1 file changed, 44 insertions(+), 14 deletions(-) diff --git a/parsl/monitoring/router.py b/parsl/monitoring/router.py index 9a422027c1..396e3c653d 100644 --- a/parsl/monitoring/router.py +++ b/parsl/monitoring/router.py @@ -5,6 +5,7 @@ import pickle import queue import socket +import threading import time from multiprocessing.synchronize import Event from typing import Optional, Tuple, Union @@ -108,7 +109,28 @@ def __init__(self, self.resource_msgs = resource_msgs self.exit_event = exit_event + @wrap_with_logs(target="monitoring_router") def start(self) -> None: + self.logger.info("Starting UDP listener thread") + udp_radio_receiver_thread = threading.Thread(target=self.start_udp_listener) + udp_radio_receiver_thread.start() + + self.logger.info("Starting ZMQ listener thread") + zmq_radio_receiver_thread = threading.Thread(target=self.start_zmq_listener) + zmq_radio_receiver_thread.start() + + # exit when both of those have exiting + # TODO: this is to preserve the existing behaviour of start(), but it + # isn't necessarily the *right* thing to do... + + self.logger.info("Joining on ZMQ listener thread") + zmq_radio_receiver_thread.join() + self.logger.info("Joining on UDP listener thread") + udp_radio_receiver_thread.join() + self.logger.info("Joined on both ZMQ and UDP listener threads") + + @wrap_with_logs(target="monitoring_router") + def start_udp_listener(self) -> None: try: while not self.exit_event.is_set(): try: @@ -119,6 +141,26 @@ def start(self) -> None: except socket.timeout: pass + self.logger.info("UDP listener draining") + last_msg_received_time = time.time() + while time.time() - last_msg_received_time < self.atexit_timeout: + try: + data, addr = self.udp_sock.recvfrom(2048) + msg = pickle.loads(data) + self.logger.debug("Got UDP Message from {}: {}".format(addr, msg)) + self.resource_msgs.put((msg, addr)) + last_msg_received_time = time.time() + except socket.timeout: + pass + + self.logger.info("UDP listener finishing normally") + finally: + self.logger.info("UDP listener finished") + + @wrap_with_logs(target="monitoring_router") + def start_zmq_listener(self) -> None: + try: + while not self.exit_event.is_set(): try: dfk_loop_start = time.time() while time.time() - dfk_loop_start < 1.0: # TODO make configurable @@ -161,21 +203,9 @@ def start(self) -> None: # thing to do. self.logger.warning("Failure processing a ZMQ message", exc_info=True) - self.logger.info("Monitoring router draining") - last_msg_received_time = time.time() - while time.time() - last_msg_received_time < self.atexit_timeout: - try: - data, addr = self.udp_sock.recvfrom(2048) - msg = pickle.loads(data) - self.logger.debug("Got UDP Message from {}: {}".format(addr, msg)) - self.resource_msgs.put((msg, addr)) - last_msg_received_time = time.time() - except socket.timeout: - pass - - self.logger.info("Monitoring router finishing normally") + self.logger.info("ZMQ listener finishing normally") finally: - self.logger.info("Monitoring router finished") + self.logger.info("ZMQ listener finished") @wrap_with_logs