Skip to content

Commit

Permalink
Make ZMQ, UDP and filesystem monitoring routers send via radios
Browse files Browse the repository at this point in the history
This PR is intended to consolidate monitoring message sending in the
monitoring radio code.

This is a step towards removing Python multiprocessing from the
monitoring code base (see issue #2343) by making it clearer how to change
to a different message send implementation (by swapping out the radio
implementation and configuration)

Compare to how the interchange forwards HTEXRadio messages onwards via
some other radio (which right now is always the ZMQRadioSender) -- rather
than having its own ZMQ code.
  • Loading branch information
benclifford committed Nov 14, 2024
1 parent 5cb58d1 commit 090322f
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 6 deletions.
4 changes: 3 additions & 1 deletion parsl/monitoring/monitoring.py
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,8 @@ def filesystem_receiver(logdir: str, q: Queue[TaggedMonitoringMessage], run_dir:
new_dir = f"{base_path}/new/"
logger.debug("Creating new and tmp paths under %s", base_path)

target_radio = MultiprocessingQueueRadioSender(q)

os.makedirs(tmp_dir, exist_ok=True)
os.makedirs(new_dir, exist_ok=True)

Expand All @@ -285,7 +287,7 @@ def filesystem_receiver(logdir: str, q: Queue[TaggedMonitoringMessage], run_dir:
message = pickle.load(f)
logger.debug("Message received is: %s", message)
assert isinstance(message, tuple)
q.put(cast(TaggedMonitoringMessage, message))
target_radio.send(cast(TaggedMonitoringMessage, message))
os.remove(full_path_filename)
except Exception:
logger.exception("Exception processing %s - probably will be retried next iteration", filename)
Expand Down
10 changes: 5 additions & 5 deletions parsl/monitoring/router.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import zmq

from parsl.log_utils import set_file_logger
from parsl.monitoring.radios import MultiprocessingQueueRadioSender
from parsl.monitoring.types import TaggedMonitoringMessage
from parsl.process_loggers import wrap_with_logs
from parsl.utils import setproctitle
Expand Down Expand Up @@ -55,7 +56,6 @@ def __init__(self,
The amount of time in seconds to terminate the hub without receiving any messages, after the last dfk workflow message is received.
resource_msgs : multiprocessing.Queue
A multiprocessing queue to receive messages to be routed onwards to the database process
exit_event : Event
An event that the main Parsl process will set to signal that the monitoring router should shut down.
"""
Expand Down Expand Up @@ -98,7 +98,7 @@ def __init__(self,
min_port=zmq_port_range[0],
max_port=zmq_port_range[1])

self.resource_msgs = resource_msgs
self.target_radio = MultiprocessingQueueRadioSender(resource_msgs)
self.exit_event = exit_event

@wrap_with_logs(target="monitoring_router")
Expand All @@ -125,7 +125,7 @@ def start_udp_listener(self) -> None:
data, addr = self.udp_sock.recvfrom(2048)
resource_msg = pickle.loads(data)
self.logger.debug("Got UDP Message from {}: {}".format(addr, resource_msg))
self.resource_msgs.put(resource_msg)
self.target_radio.send(resource_msg)
except socket.timeout:
pass

Expand All @@ -136,7 +136,7 @@ def start_udp_listener(self) -> None:
data, addr = self.udp_sock.recvfrom(2048)
msg = pickle.loads(data)
self.logger.debug("Got UDP Message from {}: {}".format(addr, msg))
self.resource_msgs.put(msg)
self.target_radio.send(msg)
last_msg_received_time = time.time()
except socket.timeout:
pass
Expand All @@ -160,7 +160,7 @@ def start_zmq_listener(self) -> None:
assert len(msg) >= 1, "ZMQ Receiver expects tuples of length at least 1, got {}".format(msg)
assert len(msg) == 2, "ZMQ Receiver expects message tuples of exactly length 2, got {}".format(msg)

self.resource_msgs.put(msg)
self.target_radio.send(msg)
except zmq.Again:
pass
except Exception:
Expand Down

0 comments on commit 090322f

Please sign in to comment.