Skip to content

Commit

Permalink
zmq router and udp router are *forwarders* : they receive a message a…
Browse files Browse the repository at this point in the history
…nd forward it on

use the radio interface to make it clear that that is what they are doing

the intention there is to make it cleaner to swap out *how* they perform forwarding - clarifying that these two threads are forwarders, not doing anything else to the message flow.

after this patch, my intention is that:
 i) the ZMQ listener moves into the db manager and there is no multiprocessing queue forwarding from outside of the db manager
 ii) the UDP listener, all that is left, means this can be renamed the UDP radio receiver, sitting as a processes alongside the filesystem radio receiver (and eventually any other radio receivers)

compare to how the interchange sends htexradio messages onwards via some arbitrary other radio
(which right now is always the ZMQRadioSender) -- rather than having its own radio code.

the previous multiprocessing queue code should be replaced with multiprocessing queue sender
  • Loading branch information
benclifford committed Oct 31, 2024
1 parent 22e62ae commit 39fc0de
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 5 deletions.
4 changes: 3 additions & 1 deletion parsl/monitoring/monitoring.py
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,8 @@ def filesystem_receiver(q: Queue[TaggedMonitoringMessage], run_dir: str) -> None
new_dir = f"{base_path}/new/"
logger.debug(f"Creating new and tmp paths under {base_path}")

target_radio = MultiprocessingQueueRadioSender(q)

os.makedirs(tmp_dir, exist_ok=True)
os.makedirs(new_dir, exist_ok=True)

Expand All @@ -279,7 +281,7 @@ def filesystem_receiver(q: Queue[TaggedMonitoringMessage], run_dir: str) -> None
message = deserialize(f.read())
logger.debug(f"Message received is: {message}")
assert isinstance(message, tuple)
q.put(cast(TaggedMonitoringMessage, message))
target_radio.send(cast(TaggedMonitoringMessage, message))
os.remove(full_path_filename)
except Exception:
logger.exception(f"Exception processing {filename} - probably will be retried next iteration")
Expand Down
12 changes: 8 additions & 4 deletions parsl/monitoring/router.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import zmq

from parsl.log_utils import set_file_logger
from parsl.monitoring.radios import MultiprocessingQueueRadioSender
from parsl.monitoring.types import TaggedMonitoringMessage
from parsl.process_loggers import wrap_with_logs
from parsl.utils import setproctitle
Expand Down Expand Up @@ -55,6 +56,9 @@ def __init__(self,
The amount of time in seconds to terminate the hub without receiving any messages, after the last dfk workflow message is received.
resource_msgs : multiprocessing.Queue
A multiprocessing queue to receive messages to be routed onwards to the database process
In later PRs, this will be replaced by a monitoring radio configuration to allow the UDP radio receiver to
forward wherever without code changes in these modules. (and the ZMQ radio will disappear from this file,
moving into the DB manager)
exit_event : Event
An event that the main Parsl process will set to signal that the monitoring router should shut down.
Expand Down Expand Up @@ -98,7 +102,7 @@ def __init__(self,
min_port=zmq_port_range[0],
max_port=zmq_port_range[1])

self.resource_msgs = resource_msgs
self.target_radio = MultiprocessingQueueRadioSender(resource_msgs)
self.exit_event = exit_event

@wrap_with_logs(target="monitoring_router")
Expand All @@ -125,7 +129,7 @@ def start_udp_listener(self) -> None:
data, addr = self.udp_sock.recvfrom(2048)
resource_msg = pickle.loads(data)
self.logger.debug("Got UDP Message from {}: {}".format(addr, resource_msg))
self.resource_msgs.put(resource_msg)
self.target_radio.send(resource_msg)
except socket.timeout:
pass

Expand All @@ -136,7 +140,7 @@ def start_udp_listener(self) -> None:
data, addr = self.udp_sock.recvfrom(2048)
msg = pickle.loads(data)
self.logger.debug("Got UDP Message from {}: {}".format(addr, msg))
self.resource_msgs.put(msg)
self.target_radio.send(msg)
last_msg_received_time = time.time()
except socket.timeout:
pass
Expand All @@ -160,7 +164,7 @@ def start_zmq_listener(self) -> None:
assert len(msg) >= 1, "ZMQ Receiver expects tuples of length at least 1, got {}".format(msg)
assert len(msg) == 2, "ZMQ Receiver expects message tuples of exactly length 2, got {}".format(msg)

self.resource_msgs.put(msg)
self.target_radio.send(msg)
except zmq.Again:
pass
except Exception:
Expand Down

0 comments on commit 39fc0de

Please sign in to comment.