put each radio type in its own module
in preparation for moving radio-specific code into these modules too
benclifford committed Oct 31, 2024
1 parent 2552add commit 4d358d3
Showing 13 changed files with 221 additions and 201 deletions.
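For orientation, the import changes in the hunks below amount to replacing the single parsl.monitoring.radios module with one submodule per radio type. A rough before/after sketch, with paths taken from the diffs shown on this page:

# before: everything lived in one module
from parsl.monitoring.radios import MonitoringRadioSender, ZMQRadioSender

# after: one module per radio type, as introduced by this commit
from parsl.monitoring.radios.base import MonitoringRadioSender
from parsl.monitoring.radios.zmq import ZMQRadioSender
from parsl.monitoring.radios.multiprocessing import MultiprocessingQueueRadioSender
from parsl.monitoring.radios.filesystem import FilesystemRadioSender
from parsl.monitoring.radios.htex import HTEXRadioSender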
2 changes: 1 addition & 1 deletion parsl/executors/base.py
@@ -5,7 +5,7 @@
 
 from typing_extensions import Literal, Self
 
-from parsl.monitoring.radios import MonitoringRadioSender
+from parsl.monitoring.radios.base import MonitoringRadioSender
 
 
 class ParslExecutor(metaclass=ABCMeta):
3 changes: 2 additions & 1 deletion parsl/executors/high_throughput/interchange.py
@@ -19,7 +19,8 @@
 from parsl.executors.high_throughput.manager_record import ManagerRecord
 from parsl.executors.high_throughput.manager_selector import ManagerSelector
 from parsl.monitoring.message_type import MessageType
-from parsl.monitoring.radios import MonitoringRadioSender, ZMQRadioSender
+from parsl.monitoring.radios.base import MonitoringRadioSender
+from parsl.monitoring.radios.zmq import ZMQRadioSender
 from parsl.process_loggers import wrap_with_logs
 from parsl.serialize import serialize as serialize_object
 from parsl.utils import setproctitle
2 changes: 1 addition & 1 deletion parsl/monitoring/monitoring.py
@@ -14,7 +14,7 @@
 
 from parsl.log_utils import set_file_logger
 from parsl.monitoring.errors import MonitoringHubStartError
-from parsl.monitoring.radios import MultiprocessingQueueRadioSender
+from parsl.monitoring.radios.multiprocessing import MultiprocessingQueueRadioSender
 from parsl.monitoring.router import router_starter
 from parsl.monitoring.types import TaggedMonitoringMessage
 from parsl.multiprocessing import ForkProcess, SizedQueue
191 changes: 0 additions & 191 deletions parsl/monitoring/radios.py

This file was deleted.

Empty file.
13 changes: 13 additions & 0 deletions parsl/monitoring/radios/base.py
@@ -0,0 +1,13 @@
import logging
from abc import ABCMeta, abstractmethod
from typing import Optional

_db_manager_excepts: Optional[Exception]

logger = logging.getLogger(__name__)


class MonitoringRadioSender(metaclass=ABCMeta):
    @abstractmethod
    def send(self, message: object) -> None:
        pass
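As a minimal illustration (not part of this commit), any concrete radio can be used interchangeably through this abstract interface; the helper below is hypothetical:

# hypothetical illustration: any sender that subclasses
# MonitoringRadioSender can be passed around polymorphically
from parsl.monitoring.radios.base import MonitoringRadioSender


def report(sender: MonitoringRadioSender, message: object) -> None:
    # send() is the only method the abstract base class requires
    sender.send(message)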
52 changes: 52 additions & 0 deletions parsl/monitoring/radios/filesystem.py
@@ -0,0 +1,52 @@
import logging
import os
import pickle
import uuid

from parsl.monitoring.radios.base import MonitoringRadioSender

logger = logging.getLogger(__name__)


class FilesystemRadioSender(MonitoringRadioSender):
    """A MonitoringRadioSender that sends messages over a shared filesystem.

    The message directory structure is based on maildir,
    https://en.wikipedia.org/wiki/Maildir

    The writer creates a message in tmp/ and then, when it is fully
    written, moves it atomically into new/

    The reader ignores tmp/ and only reads and deletes messages from
    new/

    This avoids a race condition of reading partially written messages.

    This radio is likely to give higher shared filesystem load compared to
    the UDP radio, but should be much more reliable.
    """

    def __init__(self, *, monitoring_url: str, timeout: int = 10, run_dir: str):
        logger.info("filesystem based monitoring channel initializing")
        self.base_path = f"{run_dir}/monitor-fs-radio/"
        self.tmp_path = f"{self.base_path}/tmp"
        self.new_path = f"{self.base_path}/new"

        os.makedirs(self.tmp_path, exist_ok=True)
        os.makedirs(self.new_path, exist_ok=True)

    def send(self, message: object) -> None:
        logger.info("Sending a monitoring message via filesystem")

        unique_id = str(uuid.uuid4())

        tmp_filename = f"{self.tmp_path}/{unique_id}"
        new_filename = f"{self.new_path}/{unique_id}"
        buffer = message

        # this will write the message out then atomically
        # move it into new/, so that a partially written
        # file will never be observed in new/
        with open(tmp_filename, "wb") as f:
            pickle.dump(buffer, f)
        os.rename(tmp_filename, new_filename)
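To make the maildir-style handover concrete, here is a hypothetical reader-side sketch (not the actual Parsl router code, which is not shown on this page) that consumes only fully written messages from new/, as described in the docstring above:

# hypothetical reader-side sketch: consume only fully written
# messages, which appear atomically in new/
import os
import pickle


def drain_new_messages(base_path: str) -> list:
    new_path = os.path.join(base_path, "new")
    messages = []
    for name in os.listdir(new_path):          # tmp/ is never inspected
        full = os.path.join(new_path, name)
        with open(full, "rb") as f:
            messages.append(pickle.load(f))    # file is complete by construction
        os.remove(full)                        # delete after reading, maildir-style
    return messages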
57 changes: 57 additions & 0 deletions parsl/monitoring/radios/htex.py
@@ -0,0 +1,57 @@
import logging
import pickle

from parsl.monitoring.radios.base import MonitoringRadioSender

logger = logging.getLogger(__name__)


class HTEXRadioSender(MonitoringRadioSender):

    def __init__(self, monitoring_url: str, timeout: int = 10):
        """
        Parameters
        ----------
        monitoring_url : str
            URL of the form <scheme>://<IP>:<PORT>
        timeout : int
            timeout, default=10s
        """
        logger.info("htex-based monitoring channel initialising")

    def send(self, message: object) -> None:
        """Sends a message via the High Throughput Executor result queue.

        Parameters
        ----------
        message : object
            Arbitrary pickle-able object that is to be sent

        Returns:
            None
        """

        import parsl.executors.high_throughput.monitoring_info

        result_queue = parsl.executors.high_throughput.monitoring_info.result_queue

        # this message needs to go into the result queue tagged so that it is
        # treated (i) as a monitoring message by the interchange, and then
        # (ii) as a RESOURCE_INFO message when received by monitoring (rather
        # than a NODE_INFO message, which is the implicit default for messages
        # from the interchange)

        # for the interchange, the outer wrapper, this needs to be a dict:

        interchange_msg = {
            'type': 'monitoring',
            'payload': message
        }

        if result_queue:
            result_queue.put(pickle.dumps(interchange_msg))
        else:
            logger.error("result_queue is uninitialized - cannot put monitoring message")

        return
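For clarity, a hypothetical sketch of how the receiving side of this queue might unwrap the outer dict; the real interchange logic is not shown in this excerpt:

# hypothetical receiving-side sketch, not the actual interchange code:
# unpickle the queue item and recover the monitoring payload
import pickle


def handle_result_queue_item(raw: bytes) -> object:
    msg = pickle.loads(raw)
    if isinstance(msg, dict) and msg.get('type') == 'monitoring':
        return msg['payload']      # forwarded on as a monitoring message
    return msg                     # anything else is left untouched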
17 changes: 17 additions & 0 deletions parsl/monitoring/radios/multiprocessing.py
@@ -0,0 +1,17 @@
from multiprocessing.queues import Queue

from parsl.monitoring.radios.base import MonitoringRadioSender


class MultiprocessingQueueRadioSender(MonitoringRadioSender):
    """A monitoring radio which connects over a multiprocessing Queue.
    This radio is intended to be used on the submit side, where components
    in the submit process, or processes launched by multiprocessing, will have
    access to a Queue shared with the monitoring database code (bypassing the
    monitoring router).
    """
    def __init__(self, queue: Queue) -> None:
        self.queue = queue

    def send(self, message: object) -> None:
        self.queue.put(message)
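A minimal usage sketch, assuming a plain multiprocessing queue on the submit side; this example is not taken from the commit itself:

# minimal usage sketch with a plain multiprocessing queue
import multiprocessing

from parsl.monitoring.radios.multiprocessing import MultiprocessingQueueRadioSender

queue = multiprocessing.Queue()
sender = MultiprocessingQueueRadioSender(queue)
sender.send({"task_id": 1, "state": "running"})   # arbitrary picklable payload
print(queue.get())                                # the consumer sees the same object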