Skip to content

Commit

Permalink
Merge branch 'Parsl:master' into trackingFileProvenance
Browse files Browse the repository at this point in the history
  • Loading branch information
astro-friedel authored Dec 3, 2024
2 parents 0316cf9 + 76e5328 commit af51f0e
Show file tree
Hide file tree
Showing 41 changed files with 337 additions and 413 deletions.
2 changes: 0 additions & 2 deletions docs/reference.rst
Original file line number Diff line number Diff line change
Expand Up @@ -169,8 +169,6 @@ Exceptions
parsl.providers.errors.ScaleOutFailed
parsl.providers.errors.SchedulerMissingArgs
parsl.providers.errors.ScriptPathError
parsl.channels.errors.ChannelError
parsl.channels.errors.FileCopyException
parsl.executors.high_throughput.errors.WorkerLost
parsl.executors.high_throughput.interchange.ManagerLost
parsl.serialize.errors.DeserializationError
Expand Down
1 change: 0 additions & 1 deletion docs/userguide/monitoring.rst
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ configuration. Here the `parsl.monitoring.MonitoringHub` is specified to use por
],
monitoring=MonitoringHub(
hub_address=address_by_hostname(),
hub_port=55055,
monitoring_debug=False,
resource_monitoring_interval=10,
),
Expand Down
53 changes: 2 additions & 51 deletions parsl/channels/base.py
Original file line number Diff line number Diff line change
@@ -1,54 +1,5 @@
from abc import ABCMeta, abstractmethod, abstractproperty
from typing import Tuple
from abc import ABCMeta


class Channel(metaclass=ABCMeta):
"""Channels are abstractions that enable ExecutionProviders to talk to
resource managers of remote compute facilities.
For certain resources such as campus clusters or supercomputers at
research laboratories, resource requirements may require authentication.
The only remaining Channel, *LocalChannel*, executes commands locally in a
shell.
Channels provide the ability to execute commands remotely, using the
execute_wait method, and manipulate the remote file system using methods
such as push_file, pull_file and makedirs.
Channels should ensure that each launched command runs in a new process
group, so that providers (such as LocalProvider) which terminate long
running commands using process groups can do so.
"""

@abstractmethod
def execute_wait(self, cmd: str, walltime: int = 0) -> Tuple[int, str, str]:
''' Executes the cmd, with a defined walltime.
Args:
- cmd (string): Command string to execute over the channel
- walltime (int) : Timeout in seconds
Returns:
- (exit_code, stdout, stderr) (int, string, string)
'''
pass

@abstractproperty
def script_dir(self) -> str:
''' This is a property. Returns the directory assigned for storing all internal scripts such as
scheduler submit scripts. This is usually where error logs from the scheduler would reside on the
channel destination side.
Args:
- None
Returns:
- Channel script dir
'''
pass

# DFK expects to be able to modify this, so it needs to be in the abstract class
@script_dir.setter
def script_dir(self, value: str) -> None:
pass
pass
30 changes: 0 additions & 30 deletions parsl/channels/errors.py

This file was deleted.

58 changes: 1 addition & 57 deletions parsl/channels/local/local.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
import logging
import os
import subprocess

from parsl.channels.base import Channel
from parsl.utils import RepresentationMixin
Expand All @@ -9,58 +7,4 @@


class LocalChannel(Channel, RepresentationMixin):
''' This is not even really a channel, since opening a local shell is not heavy
and done so infrequently that they do not need a persistent channel
'''

def __init__(self):
''' Initialize the local channel. script_dir is required by set to a default.
KwArgs:
- script_dir (string): Directory to place scripts
'''
self.script_dir = None

def execute_wait(self, cmd, walltime=None):
''' Synchronously execute a commandline string on the shell.
Args:
- cmd (string) : Commandline string to execute
- walltime (int) : walltime in seconds
Returns:
- retcode : Return code from the execution
- stdout : stdout string
- stderr : stderr string
'''
try:
logger.debug("Creating process with command '%s'", cmd)
proc = subprocess.Popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
shell=True,
preexec_fn=os.setpgrp
)
logger.debug("Created process with pid %s. Performing communicate", proc.pid)
(stdout, stderr) = proc.communicate(timeout=walltime)
retcode = proc.returncode
logger.debug("Process %s returned %s", proc.pid, proc.returncode)

except Exception:
logger.exception(f"Execution of command failed:\n{cmd}")
raise
else:
logger.debug("Execution of command in process %s completed normally", proc.pid)

return (retcode, stdout.decode("utf-8"), stderr.decode("utf-8"))

@property
def script_dir(self):
return self._script_dir

@script_dir.setter
def script_dir(self, value):
if value is not None:
value = os.path.abspath(value)
self._script_dir = value
pass
1 change: 0 additions & 1 deletion parsl/configs/ASPIRE1.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
],
monitoring=MonitoringHub(
hub_address=address_by_interface('ib0'),
hub_port=55055,
resource_monitoring_interval=10,
),
strategy='simple',
Expand Down
2 changes: 0 additions & 2 deletions parsl/dataflow/dflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -1280,8 +1280,6 @@ def add_executors(self, executors: Sequence[ParslExecutor]) -> None:
executor.provider.script_dir = os.path.join(self.run_dir, 'submit_scripts')
os.makedirs(executor.provider.script_dir, exist_ok=True)

executor.provider.channel.script_dir = executor.provider.script_dir

self.executors[executor.label] = executor
executor.start()
block_executors = [e for e in executors if isinstance(e, BlockProviderExecutor)]
Expand Down
2 changes: 1 addition & 1 deletion parsl/executors/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

from typing_extensions import Literal, Self

from parsl.monitoring.radios import MonitoringRadioSender
from parsl.monitoring.radios.base import MonitoringRadioSender


class ParslExecutor(metaclass=ABCMeta):
Expand Down
17 changes: 15 additions & 2 deletions parsl/executors/high_throughput/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,9 @@ def __init__(self,
interchange_launch_cmd = DEFAULT_INTERCHANGE_LAUNCH_CMD
self.interchange_launch_cmd = interchange_launch_cmd

self._result_queue_thread_exit = threading.Event()
self._result_queue_thread: Optional[threading.Thread] = None

radio_mode = "htex"
enable_mpi_mode: bool = False
mpi_launcher: str = "mpiexec"
Expand Down Expand Up @@ -455,9 +458,11 @@ def _result_queue_worker(self):
"""
logger.debug("Result queue worker starting")

while not self.bad_state_is_set:
while not self.bad_state_is_set and not self._result_queue_thread_exit.is_set():
try:
msgs = self.incoming_q.get()
msgs = self.incoming_q.get(timeout_ms=self.poll_period)
if msgs is None: # timeout
continue

except IOError as e:
logger.exception("Caught broken queue with exception code {}: {}".format(e.errno, e))
Expand Down Expand Up @@ -515,6 +520,8 @@ def _result_queue_worker(self):
else:
raise BadMessage("Message received with unknown type {}".format(msg['type']))

logger.info("Closing result ZMQ pipe")
self.incoming_q.close()
logger.info("Result queue worker finished")

def _start_local_interchange_process(self) -> None:
Expand Down Expand Up @@ -817,6 +824,8 @@ def shutdown(self, timeout: float = 10.0):

logger.info("Attempting HighThroughputExecutor shutdown")

logger.info("Terminating interchange and result queue thread")
self._result_queue_thread_exit.set()
self.interchange_proc.terminate()
try:
self.interchange_proc.wait(timeout=timeout)
Expand All @@ -841,6 +850,10 @@ def shutdown(self, timeout: float = 10.0):
logger.info("Closing command client")
self.command_client.close()

logger.info("Waiting for result queue thread exit")
if self._result_queue_thread:
self._result_queue_thread.join()

logger.info("Finished HighThroughputExecutor shutdown attempt")

def get_usage_information(self):
Expand Down
3 changes: 2 additions & 1 deletion parsl/executors/high_throughput/interchange.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@
from parsl.executors.high_throughput.manager_record import ManagerRecord
from parsl.executors.high_throughput.manager_selector import ManagerSelector
from parsl.monitoring.message_type import MessageType
from parsl.monitoring.radios import MonitoringRadioSender, ZMQRadioSender
from parsl.monitoring.radios.base import MonitoringRadioSender
from parsl.monitoring.radios.zmq import ZMQRadioSender
from parsl.process_loggers import wrap_with_logs
from parsl.serialize import serialize as serialize_object
from parsl.utils import setproctitle
Expand Down
17 changes: 13 additions & 4 deletions parsl/executors/high_throughput/zmq_pipes.py
Original file line number Diff line number Diff line change
Expand Up @@ -206,12 +206,21 @@ def __init__(self, ip_address, port_range, cert_dir: Optional[str] = None):
self.port = self.results_receiver.bind_to_random_port(tcp_url(ip_address),
min_port=port_range[0],
max_port=port_range[1])
self.poller = zmq.Poller()
self.poller.register(self.results_receiver, zmq.POLLIN)

def get(self):
def get(self, timeout_ms=None):
"""Get a message from the queue, returning None if timeout expires
without a message. timeout is measured in milliseconds.
"""
logger.debug("Waiting for ResultsIncoming message")
m = self.results_receiver.recv_multipart()
logger.debug("Received ResultsIncoming message")
return m
socks = dict(self.poller.poll(timeout=timeout_ms))
if self.results_receiver in socks and socks[self.results_receiver] == zmq.POLLIN:
m = self.results_receiver.recv_multipart()
logger.debug("Received ResultsIncoming message")
return m
else:
return None

def close(self):
self.results_receiver.close()
Expand Down
2 changes: 1 addition & 1 deletion parsl/monitoring/monitoring.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

from parsl.log_utils import set_file_logger
from parsl.monitoring.errors import MonitoringHubStartError
from parsl.monitoring.radios import MultiprocessingQueueRadioSender
from parsl.monitoring.radios.multiprocessing import MultiprocessingQueueRadioSender
from parsl.monitoring.router import router_starter
from parsl.monitoring.types import TaggedMonitoringMessage
from parsl.multiprocessing import ForkProcess, SizedQueue
Expand Down
Loading

0 comments on commit af51f0e

Please sign in to comment.