Skip to content

Commit

Permalink
Split monitoring router into two radio-specific receiver threads
Browse files Browse the repository at this point in the history
This is part of two ongoing strands of development:

* Preparation for arbitrary monitoring radio receivers, where UDP is not
special compared to other methods. This code separates out the UDP specific
code making it easier to move around later (see development in PR #3315)

* Avoiding poll-one-thing, poll-another-thing tight polling loops in
favour of multiple blocking loops. The two router receiver sections used
to run in a single tight non-blocking loop, each component waiting
loop_freq (= 10 ms) for something to happen. After this PR, the separated
loops are more amenable to longer blocking times - they only need to
discover when exit event is set which probably can be more on the order
of 1 second.
  • Loading branch information
benclifford committed Jul 31, 2024
1 parent 9c982c5 commit 2b8a1bf
Showing 1 changed file with 44 additions and 14 deletions.
58 changes: 44 additions & 14 deletions parsl/monitoring/router.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import pickle
import queue
import socket
import threading
import time
from multiprocessing.synchronize import Event
from typing import Optional, Tuple, Union
Expand Down Expand Up @@ -108,7 +109,28 @@ def __init__(self,
self.resource_msgs = resource_msgs
self.exit_event = exit_event

@wrap_with_logs(target="monitoring_router")
def start(self) -> None:
self.logger.info("Starting UDP listener thread")
udp_radio_receiver_thread = threading.Thread(target=self.start_udp_listener)
udp_radio_receiver_thread.start()

self.logger.info("Starting ZMQ listener thread")
zmq_radio_receiver_thread = threading.Thread(target=self.start_zmq_listener)
zmq_radio_receiver_thread.start()

# exit when both of those have exiting
# TODO: this is to preserve the existing behaviour of start(), but it
# isn't necessarily the *right* thing to do...

self.logger.info("Joining on ZMQ listener thread")
zmq_radio_receiver_thread.join()
self.logger.info("Joining on UDP listener thread")
udp_radio_receiver_thread.join()
self.logger.info("Joined on both ZMQ and UDP listener threads")

@wrap_with_logs(target="monitoring_router")
def start_udp_listener(self) -> None:
try:
while not self.exit_event.is_set():
try:
Expand All @@ -119,6 +141,26 @@ def start(self) -> None:
except socket.timeout:
pass

self.logger.info("UDP listener draining")
last_msg_received_time = time.time()
while time.time() - last_msg_received_time < self.atexit_timeout:
try:
data, addr = self.udp_sock.recvfrom(2048)
msg = pickle.loads(data)
self.logger.debug("Got UDP Message from {}: {}".format(addr, msg))
self.resource_msgs.put((msg, addr))
last_msg_received_time = time.time()
except socket.timeout:
pass

self.logger.info("UDP listener finishing normally")
finally:
self.logger.info("UDP listener finished")

@wrap_with_logs(target="monitoring_router")
def start_zmq_listener(self) -> None:
try:
while not self.exit_event.is_set():
try:
dfk_loop_start = time.time()
while time.time() - dfk_loop_start < 1.0: # TODO make configurable
Expand Down Expand Up @@ -161,21 +203,9 @@ def start(self) -> None:
# thing to do.
self.logger.warning("Failure processing a ZMQ message", exc_info=True)

self.logger.info("Monitoring router draining")
last_msg_received_time = time.time()
while time.time() - last_msg_received_time < self.atexit_timeout:
try:
data, addr = self.udp_sock.recvfrom(2048)
msg = pickle.loads(data)
self.logger.debug("Got UDP Message from {}: {}".format(addr, msg))
self.resource_msgs.put((msg, addr))
last_msg_received_time = time.time()
except socket.timeout:
pass

self.logger.info("Monitoring router finishing normally")
self.logger.info("ZMQ listener finishing normally")
finally:
self.logger.info("Monitoring router finished")
self.logger.info("ZMQ listener finished")


@wrap_with_logs
Expand Down

0 comments on commit 2b8a1bf

Please sign in to comment.