Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Split monitoring router into two radio-specific receiver threads #3558

Merged
merged 6 commits into from
Aug 5, 2024
Merged
54 changes: 40 additions & 14 deletions parsl/monitoring/router.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import pickle
import queue
import socket
import threading
import time
from multiprocessing.synchronize import Event
from typing import Optional, Tuple, Union
Expand Down Expand Up @@ -108,7 +109,24 @@ def __init__(self,
self.resource_msgs = resource_msgs
self.exit_event = exit_event

@wrap_with_logs(target="monitoring_router")
def start(self) -> None:
self.logger.info("Starting UDP listener thread")
udp_radio_receiver_thread = threading.Thread(target=self.start_udp_listener, daemon=True)
udp_radio_receiver_thread.start()

self.logger.info("Starting ZMQ listener thread")
zmq_radio_receiver_thread = threading.Thread(target=self.start_zmq_listener, daemon=True)
zmq_radio_receiver_thread.start()

self.logger.info("Joining on ZMQ listener thread")
zmq_radio_receiver_thread.join()
self.logger.info("Joining on UDP listener thread")
udp_radio_receiver_thread.join()
self.logger.info("Joined on both ZMQ and UDP listener threads")

@wrap_with_logs(target="monitoring_router")
def start_udp_listener(self) -> None:
try:
while not self.exit_event.is_set():
try:
Expand All @@ -119,6 +137,26 @@ def start(self) -> None:
except socket.timeout:
pass

self.logger.info("UDP listener draining")
last_msg_received_time = time.time()
while time.time() - last_msg_received_time < self.atexit_timeout:
try:
data, addr = self.udp_sock.recvfrom(2048)
msg = pickle.loads(data)
self.logger.debug("Got UDP Message from {}: {}".format(addr, msg))
self.resource_msgs.put((msg, addr))
last_msg_received_time = time.time()
except socket.timeout:
pass

self.logger.info("UDP listener finishing normally")
finally:
self.logger.info("UDP listener finished")

@wrap_with_logs(target="monitoring_router")
def start_zmq_listener(self) -> None:
try:
while not self.exit_event.is_set():
try:
dfk_loop_start = time.time()
while time.time() - dfk_loop_start < 1.0: # TODO make configurable
Expand Down Expand Up @@ -161,21 +199,9 @@ def start(self) -> None:
# thing to do.
self.logger.warning("Failure processing a ZMQ message", exc_info=True)

self.logger.info("Monitoring router draining")
last_msg_received_time = time.time()
while time.time() - last_msg_received_time < self.atexit_timeout:
try:
data, addr = self.udp_sock.recvfrom(2048)
msg = pickle.loads(data)
self.logger.debug("Got UDP Message from {}: {}".format(addr, msg))
self.resource_msgs.put((msg, addr))
last_msg_received_time = time.time()
except socket.timeout:
pass

self.logger.info("Monitoring router finishing normally")
self.logger.info("ZMQ listener finishing normally")
finally:
self.logger.info("Monitoring router finished")
self.logger.info("ZMQ listener finished")


@wrap_with_logs
Expand Down
Loading