Skip to content

Commit

Permalink
make interchange use the radio interface for forwarding monitoring me…
Browse files Browse the repository at this point in the history
…ssages

This is intended to clarify that what an interchange is sending is nothing special, beyond a normal monitoring message.

This is intended to loosen up future opportunities in two ways:

this is hard-coded / not configurable in the interchange - it might be later desirable to make
the interchange configurable if wanting to do other stuff with this status information
- eg christines prototype use case (see PR #3359) of wanting to run specific commands when
a worker connects (where a radio message is treated as a more general event hook),
or sending monitoring info to other places not the monitoring db

it might be desirable to deliver monitoring information from a worker node using ZMQ,
for example when using work queue co-processes or the task vine equivalent - where
the cost of starting a ZMQ connection might be amortised across many tasks.
  • Loading branch information
benclifford committed Jul 26, 2024
1 parent 8c3078b commit 609bd00
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 34 deletions.
66 changes: 32 additions & 34 deletions parsl/executors/high_throughput/interchange.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from parsl.executors.high_throughput.errors import ManagerLost, VersionMismatch
from parsl.executors.high_throughput.manager_record import ManagerRecord
from parsl.monitoring.message_type import MessageType
from parsl.monitoring.radios import MonitoringRadioSender, ZMQRadioSender
from parsl.process_loggers import wrap_with_logs
from parsl.serialize import serialize as serialize_object
from parsl.utils import setproctitle
Expand Down Expand Up @@ -216,36 +217,26 @@ def task_puller(self) -> NoReturn:
task_counter += 1
logger.debug(f"Fetched {task_counter} tasks so far")

def _create_monitoring_channel(self) -> Optional[zmq.Socket]:
if self.hub_address and self.hub_zmq_port:
logger.info("Connecting to MonitoringHub")
# This is a one-off because monitoring is unencrypted
hub_channel = zmq.Context().socket(zmq.DEALER)
hub_channel.set_hwm(0)
hub_channel.connect("tcp://{}:{}".format(self.hub_address, self.hub_zmq_port))
logger.info("Connected to MonitoringHub")
return hub_channel
else:
return None

def _send_monitoring_info(self, hub_channel: Optional[zmq.Socket], manager: ManagerRecord) -> None:
if hub_channel:
def _send_monitoring_info(self, monitoring_radio: Optional[MonitoringRadioSender], manager: ManagerRecord) -> None:
if monitoring_radio:
logger.info("Sending message {} to MonitoringHub".format(manager))

d: Dict = cast(Dict, manager.copy())
d['timestamp'] = datetime.datetime.now()
d['last_heartbeat'] = datetime.datetime.fromtimestamp(d['last_heartbeat'])

hub_channel.send_pyobj((MessageType.NODE_INFO, d))
monitoring_radio.send((MessageType.NODE_INFO, d))

@wrap_with_logs(target="interchange")
def _command_server(self) -> NoReturn:
""" Command server to run async command to the interchange
"""
logger.debug("Command Server Starting")

# Need to create a new ZMQ socket for command server thread
hub_channel = self._create_monitoring_channel()
if self.hub_address is not None and self.hub_zmq_port is not None:
monitoring_radio = ZMQRadioSender(self.hub_address, self.hub_zmq_port)
else:
monitoring_radio = None

reply: Any # the type of reply depends on the command_req received (aka this needs dependent types...)

Expand Down Expand Up @@ -295,7 +286,7 @@ def _command_server(self) -> NoReturn:
if manager_id in self._ready_managers:
m = self._ready_managers[manager_id]
m['active'] = False
self._send_monitoring_info(hub_channel, m)
self._send_monitoring_info(monitoring_radio, m)
else:
logger.warning("Worker to hold was not in ready managers list")

Expand Down Expand Up @@ -330,9 +321,16 @@ def start(self) -> None:
# parent-process-inheritance problems.
signal.signal(signal.SIGTERM, signal.SIG_DFL)

logger.info("Incoming ports bound")
logger.info("Starting main interchange method")

hub_channel = self._create_monitoring_channel()
if self.hub_address is not None and self.hub_zmq_port is not None:
print("logging inside if")
logger.info("BENC: inside if")
print("constructing radio sender")
monitoring_radio = ZMQRadioSender(self.hub_address, self.hub_zmq_port)
logger.info("Created monitoring radio")
else:
monitoring_radio = None

poll_period = self.poll_period

Expand Down Expand Up @@ -363,10 +361,10 @@ def start(self) -> None:
while not kill_event.is_set():
self.socks = dict(poller.poll(timeout=poll_period))

self.process_task_outgoing_incoming(interesting_managers, hub_channel, kill_event)
self.process_results_incoming(interesting_managers, hub_channel)
self.expire_bad_managers(interesting_managers, hub_channel)
self.expire_drained_managers(interesting_managers, hub_channel)
self.process_task_outgoing_incoming(interesting_managers, monitoring_radio, kill_event)
self.process_results_incoming(interesting_managers, monitoring_radio)
self.expire_bad_managers(interesting_managers, monitoring_radio)
self.expire_drained_managers(interesting_managers, monitoring_radio)
self.process_tasks_to_send(interesting_managers)

self.zmq_context.destroy()
Expand All @@ -377,7 +375,7 @@ def start(self) -> None:
def process_task_outgoing_incoming(
self,
interesting_managers: Set[bytes],
hub_channel: Optional[zmq.Socket],
monitoring_radio: Optional[MonitoringRadioSender],
kill_event: threading.Event
) -> None:
"""Process one message from manager on the task_outgoing channel.
Expand Down Expand Up @@ -431,7 +429,7 @@ def process_task_outgoing_incoming(
m.update(msg) # type: ignore[typeddict-item]

logger.info("Registration info for manager {!r}: {}".format(manager_id, msg))
self._send_monitoring_info(hub_channel, m)
self._send_monitoring_info(monitoring_radio, m)

if (msg['python_v'].rsplit(".", 1)[0] != self.current_platform['python_v'].rsplit(".", 1)[0] or
msg['parsl_v'] != self.current_platform['parsl_v']):
Expand Down Expand Up @@ -462,7 +460,7 @@ def process_task_outgoing_incoming(
logger.error(f"Unexpected message type received from manager: {msg['type']}")
logger.debug("leaving task_outgoing section")

def expire_drained_managers(self, interesting_managers: Set[bytes], hub_channel: Optional[zmq.Socket]) -> None:
def expire_drained_managers(self, interesting_managers: Set[bytes], monitoring_radio: Optional[MonitoringRadioSender]) -> None:

for manager_id in list(interesting_managers):
# is it always true that a draining manager will be in interesting managers?
Expand All @@ -475,7 +473,7 @@ def expire_drained_managers(self, interesting_managers: Set[bytes], hub_channel:
self._ready_managers.pop(manager_id)

m['active'] = False
self._send_monitoring_info(hub_channel, m)
self._send_monitoring_info(monitoring_radio, m)

def process_tasks_to_send(self, interesting_managers: Set[bytes]) -> None:
# Check if there are tasks that could be sent to managers
Expand Down Expand Up @@ -519,7 +517,7 @@ def process_tasks_to_send(self, interesting_managers: Set[bytes]) -> None:
else:
logger.debug("either no interesting managers or no tasks, so skipping manager pass")

def process_results_incoming(self, interesting_managers: Set[bytes], hub_channel: Optional[zmq.Socket]) -> None:
def process_results_incoming(self, interesting_managers: Set[bytes], monitoring_radio: Optional[MonitoringRadioSender]) -> None:
# Receive any results and forward to client
if self.results_incoming in self.socks and self.socks[self.results_incoming] == zmq.POLLIN:
logger.debug("entering results_incoming section")
Expand All @@ -539,11 +537,11 @@ def process_results_incoming(self, interesting_managers: Set[bytes], hub_channel
elif r['type'] == 'monitoring':
# the monitoring code makes the assumption that no
# monitoring messages will be received if monitoring
# is not configured, and that hub_channel will only
# is not configured, and that monitoring_radio will only
# be None when monitoring is not configurated.
assert hub_channel is not None
assert monitoring_radio is not None

hub_channel.send_pyobj(r['payload'])
monitoring_radio.send(r['payload'])
elif r['type'] == 'heartbeat':
logger.debug(f"Manager {manager_id!r} sent heartbeat via results connection")
b_messages.append((p_message, r))
Expand Down Expand Up @@ -587,15 +585,15 @@ def process_results_incoming(self, interesting_managers: Set[bytes], hub_channel
interesting_managers.add(manager_id)
logger.debug("leaving results_incoming section")

def expire_bad_managers(self, interesting_managers: Set[bytes], hub_channel: Optional[zmq.Socket]) -> None:
def expire_bad_managers(self, interesting_managers: Set[bytes], monitoring_radio: Optional[MonitoringRadioSender]) -> None:
bad_managers = [(manager_id, m) for (manager_id, m) in self._ready_managers.items() if
time.time() - m['last_heartbeat'] > self.heartbeat_threshold]
for (manager_id, m) in bad_managers:
logger.debug("Last: {} Current: {}".format(m['last_heartbeat'], time.time()))
logger.warning(f"Too many heartbeats missed for manager {manager_id!r} - removing manager")
if m['active']:
m['active'] = False
self._send_monitoring_info(hub_channel, m)
self._send_monitoring_info(monitoring_radio, m)

logger.warning(f"Cancelling htex tasks {m['tasks']} on removed manager")
for tid in m['tasks']:
Expand Down
21 changes: 21 additions & 0 deletions parsl/monitoring/radios.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
from multiprocessing.queues import Queue
from typing import Optional

import zmq

from parsl.serialize import serialize

_db_manager_excepts: Optional[Exception]
Expand Down Expand Up @@ -186,3 +188,22 @@ def __init__(self, queue: Queue) -> None:

def send(self, message: object) -> None:
self.queue.put((message, 0))


class ZMQRadioSender(MonitoringRadioSender):
"""A monitoring radio which connects over ZMQ. This radio is not
thread-safe, because its use of ZMQ is not thread-safe.
"""

def __init__(self, hub_address: str, hub_zmq_port: int) -> None:
print("in zmq radio init. about to log.")
logger.debug("Creating ZMQ socket")
print("in zmq radio init. logged first log. about to create context.")
self._hub_channel = zmq.Context().socket(zmq.DEALER)
print("in zmq radio init. created context.")
self._hub_channel.set_hwm(0)
self._hub_channel.connect(f"tcp://{hub_address}:{hub_zmq_port}")
logger.debug("Created ZMQ socket")

def send(self, message: object) -> None:
self._hub_channel.send_pyobj(message)

0 comments on commit 609bd00

Please sign in to comment.