Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Factor interchange monitoring code into a ZMQRadioSender #3556

Merged
merged 4 commits into from
Aug 2, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 30 additions & 34 deletions parsl/executors/high_throughput/interchange.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from parsl.executors.high_throughput.manager_record import ManagerRecord
from parsl.executors.high_throughput.manager_selector import ManagerSelector
from parsl.monitoring.message_type import MessageType
from parsl.monitoring.radios import MonitoringRadioSender, ZMQRadioSender
from parsl.process_loggers import wrap_with_logs
from parsl.serialize import serialize as serialize_object
from parsl.utils import setproctitle
Expand Down Expand Up @@ -219,36 +220,26 @@ def task_puller(self) -> NoReturn:
task_counter += 1
logger.debug(f"Fetched {task_counter} tasks so far")

def _create_monitoring_channel(self) -> Optional[zmq.Socket]:
if self.hub_address and self.hub_zmq_port:
logger.info("Connecting to MonitoringHub")
# This is a one-off because monitoring is unencrypted
hub_channel = zmq.Context().socket(zmq.DEALER)
hub_channel.set_hwm(0)
hub_channel.connect("tcp://{}:{}".format(self.hub_address, self.hub_zmq_port))
logger.info("Connected to MonitoringHub")
return hub_channel
else:
return None

def _send_monitoring_info(self, hub_channel: Optional[zmq.Socket], manager: ManagerRecord) -> None:
if hub_channel:
def _send_monitoring_info(self, monitoring_radio: Optional[MonitoringRadioSender], manager: ManagerRecord) -> None:
if monitoring_radio:
logger.info("Sending message {} to MonitoringHub".format(manager))

d: Dict = cast(Dict, manager.copy())
d['timestamp'] = datetime.datetime.now()
d['last_heartbeat'] = datetime.datetime.fromtimestamp(d['last_heartbeat'])

hub_channel.send_pyobj((MessageType.NODE_INFO, d))
monitoring_radio.send((MessageType.NODE_INFO, d))

@wrap_with_logs(target="interchange")
def _command_server(self) -> NoReturn:
""" Command server to run async command to the interchange
"""
logger.debug("Command Server Starting")

# Need to create a new ZMQ socket for command server thread
hub_channel = self._create_monitoring_channel()
if self.hub_address is not None and self.hub_zmq_port is not None:
monitoring_radio = ZMQRadioSender(self.hub_address, self.hub_zmq_port)
else:
monitoring_radio = None

reply: Any # the type of reply depends on the command_req received (aka this needs dependent types...)

Expand Down Expand Up @@ -298,7 +289,7 @@ def _command_server(self) -> NoReturn:
if manager_id in self._ready_managers:
m = self._ready_managers[manager_id]
m['active'] = False
self._send_monitoring_info(hub_channel, m)
self._send_monitoring_info(monitoring_radio, m)
else:
logger.warning("Worker to hold was not in ready managers list")

Expand Down Expand Up @@ -333,9 +324,14 @@ def start(self) -> None:
# parent-process-inheritance problems.
signal.signal(signal.SIGTERM, signal.SIG_DFL)

logger.info("Incoming ports bound")
logger.info("Starting main interchange method")

hub_channel = self._create_monitoring_channel()
if self.hub_address is not None and self.hub_zmq_port is not None:
logger.debug("Creating monitoring radio")
monitoring_radio = ZMQRadioSender(self.hub_address, self.hub_zmq_port)
logger.debug("Created monitoring radio")
benclifford marked this conversation as resolved.
Show resolved Hide resolved
else:
monitoring_radio = None

poll_period = self.poll_period

Expand Down Expand Up @@ -366,10 +362,10 @@ def start(self) -> None:
while not kill_event.is_set():
self.socks = dict(poller.poll(timeout=poll_period))

self.process_task_outgoing_incoming(interesting_managers, hub_channel, kill_event)
self.process_results_incoming(interesting_managers, hub_channel)
self.expire_bad_managers(interesting_managers, hub_channel)
self.expire_drained_managers(interesting_managers, hub_channel)
self.process_task_outgoing_incoming(interesting_managers, monitoring_radio, kill_event)
self.process_results_incoming(interesting_managers, monitoring_radio)
self.expire_bad_managers(interesting_managers, monitoring_radio)
self.expire_drained_managers(interesting_managers, monitoring_radio)
self.process_tasks_to_send(interesting_managers)

self.zmq_context.destroy()
Expand All @@ -380,7 +376,7 @@ def start(self) -> None:
def process_task_outgoing_incoming(
self,
interesting_managers: Set[bytes],
hub_channel: Optional[zmq.Socket],
monitoring_radio: Optional[MonitoringRadioSender],
kill_event: threading.Event
) -> None:
"""Process one message from manager on the task_outgoing channel.
Expand Down Expand Up @@ -434,7 +430,7 @@ def process_task_outgoing_incoming(
m.update(msg) # type: ignore[typeddict-item]

logger.info("Registration info for manager {!r}: {}".format(manager_id, msg))
self._send_monitoring_info(hub_channel, m)
self._send_monitoring_info(monitoring_radio, m)

if (msg['python_v'].rsplit(".", 1)[0] != self.current_platform['python_v'].rsplit(".", 1)[0] or
msg['parsl_v'] != self.current_platform['parsl_v']):
Expand Down Expand Up @@ -465,7 +461,7 @@ def process_task_outgoing_incoming(
logger.error(f"Unexpected message type received from manager: {msg['type']}")
logger.debug("leaving task_outgoing section")

def expire_drained_managers(self, interesting_managers: Set[bytes], hub_channel: Optional[zmq.Socket]) -> None:
def expire_drained_managers(self, interesting_managers: Set[bytes], monitoring_radio: Optional[MonitoringRadioSender]) -> None:

for manager_id in list(interesting_managers):
# is it always true that a draining manager will be in interesting managers?
Expand All @@ -478,7 +474,7 @@ def expire_drained_managers(self, interesting_managers: Set[bytes], hub_channel:
self._ready_managers.pop(manager_id)

m['active'] = False
self._send_monitoring_info(hub_channel, m)
self._send_monitoring_info(monitoring_radio, m)

def process_tasks_to_send(self, interesting_managers: Set[bytes]) -> None:
# Check if there are tasks that could be sent to managers
Expand Down Expand Up @@ -521,7 +517,7 @@ def process_tasks_to_send(self, interesting_managers: Set[bytes]) -> None:
else:
logger.debug("either no interesting managers or no tasks, so skipping manager pass")

def process_results_incoming(self, interesting_managers: Set[bytes], hub_channel: Optional[zmq.Socket]) -> None:
def process_results_incoming(self, interesting_managers: Set[bytes], monitoring_radio: Optional[MonitoringRadioSender]) -> None:
# Receive any results and forward to client
if self.results_incoming in self.socks and self.socks[self.results_incoming] == zmq.POLLIN:
logger.debug("entering results_incoming section")
Expand All @@ -541,11 +537,11 @@ def process_results_incoming(self, interesting_managers: Set[bytes], hub_channel
elif r['type'] == 'monitoring':
# the monitoring code makes the assumption that no
# monitoring messages will be received if monitoring
# is not configured, and that hub_channel will only
# is not configured, and that monitoring_radio will only
# be None when monitoring is not configurated.
assert hub_channel is not None
assert monitoring_radio is not None

hub_channel.send_pyobj(r['payload'])
monitoring_radio.send(r['payload'])
elif r['type'] == 'heartbeat':
logger.debug(f"Manager {manager_id!r} sent heartbeat via results connection")
b_messages.append((p_message, r))
Expand Down Expand Up @@ -589,15 +585,15 @@ def process_results_incoming(self, interesting_managers: Set[bytes], hub_channel
interesting_managers.add(manager_id)
logger.debug("leaving results_incoming section")

def expire_bad_managers(self, interesting_managers: Set[bytes], hub_channel: Optional[zmq.Socket]) -> None:
def expire_bad_managers(self, interesting_managers: Set[bytes], monitoring_radio: Optional[MonitoringRadioSender]) -> None:
bad_managers = [(manager_id, m) for (manager_id, m) in self._ready_managers.items() if
time.time() - m['last_heartbeat'] > self.heartbeat_threshold]
for (manager_id, m) in bad_managers:
logger.debug("Last: {} Current: {}".format(m['last_heartbeat'], time.time()))
logger.warning(f"Too many heartbeats missed for manager {manager_id!r} - removing manager")
if m['active']:
m['active'] = False
self._send_monitoring_info(hub_channel, m)
self._send_monitoring_info(monitoring_radio, m)

logger.warning(f"Cancelling htex tasks {m['tasks']} on removed manager")
for tid in m['tasks']:
Expand Down
16 changes: 16 additions & 0 deletions parsl/monitoring/radios.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
from multiprocessing.queues import Queue
from typing import Optional

import zmq

from parsl.serialize import serialize

_db_manager_excepts: Optional[Exception]
Expand Down Expand Up @@ -186,3 +188,17 @@ def __init__(self, queue: Queue) -> None:

def send(self, message: object) -> None:
self.queue.put((message, 0))


class ZMQRadioSender(MonitoringRadioSender):
"""A monitoring radio which connects over ZMQ. This radio is not
thread-safe, because its use of ZMQ is not thread-safe.
"""

def __init__(self, hub_address: str, hub_zmq_port: int) -> None:
self._hub_channel = zmq.Context().socket(zmq.DEALER)
self._hub_channel.set_hwm(0)
self._hub_channel.connect(f"tcp://{hub_address}:{hub_zmq_port}")

def send(self, message: object) -> None:
self._hub_channel.send_pyobj(message)
Loading