Skip to content

Commit

Permalink
Move monitoring radios to own modules
Browse files Browse the repository at this point in the history
In subsequent PRs, these modules will get more radio-specific code, as
part of PR #3315 monitoring radio plugin work: for example, the receiving
code for each radio should move here too.

I used `git show --color-moved` to check that the moved RadioSender
definitions were not changed.
  • Loading branch information
benclifford committed Nov 27, 2024
1 parent 7c2646e commit 048b6ed
Show file tree
Hide file tree
Showing 13 changed files with 221 additions and 201 deletions.
2 changes: 1 addition & 1 deletion parsl/executors/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

from typing_extensions import Literal, Self

from parsl.monitoring.radios import MonitoringRadioSender
from parsl.monitoring.radios.base import MonitoringRadioSender


class ParslExecutor(metaclass=ABCMeta):
Expand Down
3 changes: 2 additions & 1 deletion parsl/executors/high_throughput/interchange.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@
from parsl.executors.high_throughput.manager_record import ManagerRecord
from parsl.executors.high_throughput.manager_selector import ManagerSelector
from parsl.monitoring.message_type import MessageType
from parsl.monitoring.radios import MonitoringRadioSender, ZMQRadioSender
from parsl.monitoring.radios.base import MonitoringRadioSender
from parsl.monitoring.radios.zmq import ZMQRadioSender
from parsl.process_loggers import wrap_with_logs
from parsl.serialize import serialize as serialize_object
from parsl.utils import setproctitle
Expand Down
2 changes: 1 addition & 1 deletion parsl/monitoring/monitoring.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

from parsl.log_utils import set_file_logger
from parsl.monitoring.errors import MonitoringHubStartError
from parsl.monitoring.radios import MultiprocessingQueueRadioSender
from parsl.monitoring.radios.multiprocessing import MultiprocessingQueueRadioSender
from parsl.monitoring.router import router_starter
from parsl.monitoring.types import TaggedMonitoringMessage
from parsl.multiprocessing import ForkProcess, SizedQueue
Expand Down
191 changes: 0 additions & 191 deletions parsl/monitoring/radios.py

This file was deleted.

Empty file.
13 changes: 13 additions & 0 deletions parsl/monitoring/radios/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
import logging
from abc import ABCMeta, abstractmethod
from typing import Optional

_db_manager_excepts: Optional[Exception]

logger = logging.getLogger(__name__)


class MonitoringRadioSender(metaclass=ABCMeta):
@abstractmethod
def send(self, message: object) -> None:
pass
52 changes: 52 additions & 0 deletions parsl/monitoring/radios/filesystem.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
import logging
import os
import pickle
import uuid

from parsl.monitoring.radios.base import MonitoringRadioSender

logger = logging.getLogger(__name__)


class FilesystemRadioSender(MonitoringRadioSender):
"""A MonitoringRadioSender that sends messages over a shared filesystem.
The messsage directory structure is based on maildir,
https://en.wikipedia.org/wiki/Maildir
The writer creates a message in tmp/ and then when it is fully
written, moves it atomically into new/
The reader ignores tmp/ and only reads and deletes messages from
new/
This avoids a race condition of reading partially written messages.
This radio is likely to give higher shared filesystem load compared to
the UDP radio, but should be much more reliable.
"""

def __init__(self, *, monitoring_url: str, timeout: int = 10, run_dir: str):
logger.info("filesystem based monitoring channel initializing")
self.base_path = f"{run_dir}/monitor-fs-radio/"
self.tmp_path = f"{self.base_path}/tmp"
self.new_path = f"{self.base_path}/new"

os.makedirs(self.tmp_path, exist_ok=True)
os.makedirs(self.new_path, exist_ok=True)

def send(self, message: object) -> None:
logger.info("Sending a monitoring message via filesystem")

unique_id = str(uuid.uuid4())

tmp_filename = f"{self.tmp_path}/{unique_id}"
new_filename = f"{self.new_path}/{unique_id}"
buffer = message

# this will write the message out then atomically
# move it into new/, so that a partially written
# file will never be observed in new/
with open(tmp_filename, "wb") as f:
pickle.dump(buffer, f)
os.rename(tmp_filename, new_filename)
57 changes: 57 additions & 0 deletions parsl/monitoring/radios/htex.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
import logging
import pickle

from parsl.monitoring.radios.base import MonitoringRadioSender

logger = logging.getLogger(__name__)


class HTEXRadioSender(MonitoringRadioSender):

def __init__(self, monitoring_url: str, timeout: int = 10):
"""
Parameters
----------
monitoring_url : str
URL of the form <scheme>://<IP>:<PORT>
timeout : int
timeout, default=10s
"""
logger.info("htex-based monitoring channel initialising")

def send(self, message: object) -> None:
""" Sends a message to the UDP receiver
Parameter
---------
message: object
Arbitrary pickle-able object that is to be sent
Returns:
None
"""

import parsl.executors.high_throughput.monitoring_info

result_queue = parsl.executors.high_throughput.monitoring_info.result_queue

# this message needs to go in the result queue tagged so that it is treated
# i) as a monitoring message by the interchange, and then further more treated
# as a RESOURCE_INFO message when received by monitoring (rather than a NODE_INFO
# which is the implicit default for messages from the interchange)

# for the interchange, the outer wrapper, this needs to be a dict:

interchange_msg = {
'type': 'monitoring',
'payload': message
}

if result_queue:
result_queue.put(pickle.dumps(interchange_msg))
else:
logger.error("result_queue is uninitialized - cannot put monitoring message")

return
17 changes: 17 additions & 0 deletions parsl/monitoring/radios/multiprocessing.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
from multiprocessing.queues import Queue

from parsl.monitoring.radios.base import MonitoringRadioSender


class MultiprocessingQueueRadioSender(MonitoringRadioSender):
"""A monitoring radio which connects over a multiprocessing Queue.
This radio is intended to be used on the submit side, where components
in the submit process, or processes launched by multiprocessing, will have
access to a Queue shared with the monitoring database code (bypassing the
monitoring router).
"""
def __init__(self, queue: Queue) -> None:
self.queue = queue

def send(self, message: object) -> None:
self.queue.put(message)
Loading

0 comments on commit 048b6ed

Please sign in to comment.