diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index e5247c5d25..dd9fdb0d5e 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -5,6 +5,7 @@ on: types: - opened - synchronize + merge_group: jobs: main-test-suite: diff --git a/.wci.yml b/.wci.yml index c11a2e82b4..ccea1300a6 100644 --- a/.wci.yml +++ b/.wci.yml @@ -33,7 +33,6 @@ execution_environment: - Slurm - LSF - PBS - - Cobalt - Flux - GridEngine - HTCondor diff --git a/docs/faq.rst b/docs/faq.rst index ca4bd82bdb..f58d2639e7 100644 --- a/docs/faq.rst +++ b/docs/faq.rst @@ -257,15 +257,15 @@ There are a few common situations in which a Parsl script might hang: .. code-block:: python - from libsubmit.providers import Cobalt from parsl.config import Config + from parsl.providers import SlurmProvider from parsl.executors import HighThroughputExecutor config = Config( executors=[ HighThroughputExecutor( - label='ALCF_theta_local', - provider=Cobalt(), + label='htex', + provider=SlurmProvider(), worer_port_range=('50000,55000'), interchange_port_range=('50000,55000') ) diff --git a/docs/index.rst b/docs/index.rst index 65696ec048..c5057c0899 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -107,6 +107,7 @@ Table of Contents quickstart 1-parsl-introduction.ipynb userguide/index + userguide/glossary faq reference devguide/index diff --git a/docs/reference.rst b/docs/reference.rst index 3436635cad..1424e08106 100644 --- a/docs/reference.rst +++ b/docs/reference.rst @@ -115,7 +115,6 @@ Providers :nosignatures: parsl.providers.AWSProvider - parsl.providers.CobaltProvider parsl.providers.CondorProvider parsl.providers.GoogleCloudProvider parsl.providers.GridEngineProvider @@ -174,7 +173,6 @@ Exceptions parsl.channels.errors.BadHostKeyException parsl.channels.errors.BadScriptPath parsl.channels.errors.BadPermsScriptPath - parsl.channels.errors.FileExists parsl.channels.errors.AuthException parsl.channels.errors.SSHException parsl.channels.errors.FileCopyException diff --git a/docs/userguide/configuring.rst b/docs/userguide/configuring.rst index a57e815fe7..88d4456a26 100644 --- a/docs/userguide/configuring.rst +++ b/docs/userguide/configuring.rst @@ -9,7 +9,7 @@ environment. Configuration is described by a Python object (:class:`~parsl.confi so that developers can introspect permissible options, validate settings, and retrieve/edit configurations dynamically during execution. A configuration object specifies -details of the provider, executors, connection channel, allocation size, +details of the provider, executors, allocation size, queues, durations, and data management options. The following example shows a basic configuration object (:class:`~parsl.config.Config`) for the Frontera @@ -123,9 +123,6 @@ Stepping through the following question should help formulate a suitable configu | Torque/PBS based | * `parsl.executors.HighThroughputExecutor` | `parsl.providers.TorqueProvider` | | system | * `parsl.executors.WorkQueueExecutor` | | +---------------------+-----------------------------------------------+----------------------------------------+ -| Cobalt based system | * `parsl.executors.HighThroughputExecutor` | `parsl.providers.CobaltProvider` | -| | * `parsl.executors.WorkQueueExecutor` | | -+---------------------+-----------------------------------------------+----------------------------------------+ | GridEngine based | * `parsl.executors.HighThroughputExecutor` | `parsl.providers.GridEngineProvider` | | system | * `parsl.executors.WorkQueueExecutor` | | +---------------------+-----------------------------------------------+----------------------------------------+ @@ -185,8 +182,6 @@ Stepping through the following question should help formulate a suitable configu | `parsl.providers.TorqueProvider` | Any | * `parsl.launchers.AprunLauncher` | | | | * `parsl.launchers.MpiExecLauncher` | +-------------------------------------+--------------------------+----------------------------------------------------+ -| `parsl.providers.CobaltProvider` | Any | * `parsl.launchers.AprunLauncher` | -+-------------------------------------+--------------------------+----------------------------------------------------+ | `parsl.providers.SlurmProvider` | Any | * `parsl.launchers.SrunLauncher` if native slurm | | | | * `parsl.launchers.AprunLauncher`, otherwise | +-------------------------------------+--------------------------+----------------------------------------------------+ @@ -492,7 +487,7 @@ CC-IN2P3 .. image:: https://cc.in2p3.fr/wp-content/uploads/2017/03/bandeau_accueil.jpg The snippet below shows an example configuration for executing from a login node on IN2P3's Computing Centre. -The configuration uses the `parsl.providers.LocalProvider` to run on a login node primarily to avoid GSISSH, which Parsl does not support yet. +The configuration uses the `parsl.providers.LocalProvider` to run on a login node primarily to avoid GSISSH, which Parsl does not support. This system uses Grid Engine which Parsl interfaces with using the `parsl.providers.GridEngineProvider`. .. literalinclude:: ../../parsl/configs/cc_in2p3.py diff --git a/docs/userguide/execution.rst b/docs/userguide/execution.rst index df17dc458f..832985c164 100644 --- a/docs/userguide/execution.rst +++ b/docs/userguide/execution.rst @@ -27,8 +27,7 @@ retrieve the status of an allocation (e.g., squeue), and cancel a running job (e.g., scancel). Parsl implements providers for local execution (fork), for various cloud platforms using cloud-specific APIs, and for clusters and supercomputers that use a Local Resource Manager -(LRM) to manage access to resources, such as Slurm, HTCondor, -and Cobalt. +(LRM) to manage access to resources, such as Slurm and HTCondor. Each provider implementation may allow users to specify additional parameters for further configuration. Parameters are generally mapped to LRM submission script or cloud API options. Examples of LRM-specific options are partition, wall clock time, @@ -39,15 +38,14 @@ parameters include access keys, instance type, and spot bid price Parsl currently supports the following providers: 1. `parsl.providers.LocalProvider`: The provider allows you to run locally on your laptop or workstation. -2. `parsl.providers.CobaltProvider`: This provider allows you to schedule resources via the Cobalt scheduler. **This provider is deprecated and will be removed by 2024.04**. -3. `parsl.providers.SlurmProvider`: This provider allows you to schedule resources via the Slurm scheduler. -4. `parsl.providers.CondorProvider`: This provider allows you to schedule resources via the Condor scheduler. -5. `parsl.providers.GridEngineProvider`: This provider allows you to schedule resources via the GridEngine scheduler. -6. `parsl.providers.TorqueProvider`: This provider allows you to schedule resources via the Torque scheduler. -7. `parsl.providers.AWSProvider`: This provider allows you to provision and manage cloud nodes from Amazon Web Services. -8. `parsl.providers.GoogleCloudProvider`: This provider allows you to provision and manage cloud nodes from Google Cloud. -9. `parsl.providers.KubernetesProvider`: This provider allows you to provision and manage containers on a Kubernetes cluster. -10. `parsl.providers.LSFProvider`: This provider allows you to schedule resources via IBM's LSF scheduler. +2. `parsl.providers.SlurmProvider`: This provider allows you to schedule resources via the Slurm scheduler. +3. `parsl.providers.CondorProvider`: This provider allows you to schedule resources via the Condor scheduler. +4. `parsl.providers.GridEngineProvider`: This provider allows you to schedule resources via the GridEngine scheduler. +5. `parsl.providers.TorqueProvider`: This provider allows you to schedule resources via the Torque scheduler. +6. `parsl.providers.AWSProvider`: This provider allows you to provision and manage cloud nodes from Amazon Web Services. +7. `parsl.providers.GoogleCloudProvider`: This provider allows you to provision and manage cloud nodes from Google Cloud. +8. `parsl.providers.KubernetesProvider`: This provider allows you to provision and manage containers on a Kubernetes cluster. +9. `parsl.providers.LSFProvider`: This provider allows you to schedule resources via IBM's LSF scheduler. diff --git a/docs/userguide/glossary.rst b/docs/userguide/glossary.rst new file mode 100644 index 0000000000..a1a773b6a8 --- /dev/null +++ b/docs/userguide/glossary.rst @@ -0,0 +1,219 @@ +Glossary of Parsl Terms +======================= + +This glossary defines terms based on their usage within Parsl. By defining our terminology, we hope to create understanding across our community and reduce confusion. When asking for or providing support to fellow Parsl users, please use these terms as defined. + +Our glossary is organized alphabetically in English. Feel free to contribute terms and definitions to this list that will benefit Parsl users. + +.. _glossary: + +.. _appglossary: + +**App:** +---------- + +In Parsl, an app is a small, self-contained program that does a specific job. It's a piece of code, such as a Python function or a Bash script, that can run separately from your main program. Think of it as a mini-tool within your larger toolbox. + +.. _appfutureglossary: + +**AppFuture:** +----------------- + +An AppFuture is a placeholder for the result of an app that runs in the background. It's like a ticket you get when you order food at a restaurant – you get the ticket right away, but you have to wait for the food to be ready. Similarly, you get an AppFuture immediately when you start an app, but you have to wait for the app to finish before you can see the results. + +.. _bashappglossary: + +**Bash App:** +--------------- + +A Bash app is a special kind of app in Parsl that lets you run commands from your computer's terminal (like the ones you type in the command prompt or shell). It's a way to use Parsl to automate tasks that you would normally do manually in the terminal. + +.. _blockglossary: + +**Block:** +------------ + +A block is a group of resources, such as nodes or computational units, allocated for executing tasks. Parsl manages the distribution of work across these resources to expedite task completion. + +.. _checkpointingglossary: + +**Checkpointing:** +--------------------- + +Checkpointing is like saving your progress in a video game. If something goes wrong, you can restart from the last saved point instead of starting over. In Parsl, checkpointing saves the state of your work so you can resume it later if interrupted. + +.. _concurrencyglossary: + +**Concurrency:** +------------------- + +Concurrency means doing multiple things at the same time. In Parsl, it enables your apps to run in parallel across different resources, significantly speeding up program execution. It's like a chef preparing multiple dishes in a single kitchen, switching between all of them quickly. + +.. _configurationglossary: + +**Configuration:** +--------------------- + +Configuration sets up the rules for how Parsl should work. It's like adjusting the settings on your phone – you can choose how you want things to look and behave. In Parsl, you can configure things like how many resources to use, where to store data, and how to handle errors. + +.. _datafutureglossary: + +**DataFuture:** +------------------ + +A DataFuture is a placeholder for a file that an app is creating. It's like a receipt for a package you're expecting – you get the receipt right away, but you have to wait for the package to arrive. Similarly, you get a DataFuture immediately when an app starts creating a file, but you have to wait for the file to be finished before you can use it. + +.. _dfkglossary: + +**DataFlowKernel (DFK):** +------------------------------ + +The DataFlowKernel is like the brain of Parsl. It's the part that controls how your apps run and how they share information. It's like the conductor of an orchestra, making sure that all the musicians play together in harmony. + +.. _elasticityglossary: + +**Elasticity:** +----------------- + +Elasticity refers to the ability to scale resources up or down as needed. In Parsl, it allows you to add or remove blocks of computational resources based on workload demands. + +.. _executionproviderglossary: + +**Execution Provider:** +-------------------------- + +An execution provider acts as a bridge between Parsl and the resources you want to use, such as your laptop, a cluster, or a cloud service. It handles communication with these resources to execute tasks. + +.. _executorglossary: + +**Executor:** +---------------- + +An executor is a manager that determines which app runs on which resource and when. It directs the flow of apps to ensure efficient task execution. It's like a traffic controller, directing the flow of apps to make sure they all get where they need to go. + +.. _futureglossary: + +**Future:** +------------- + +A future is a placeholder for the result of a task that hasn't finished yet. Both AppFuture and DataFuture are types of Futures. You can use the ``.result()`` method to get the actual result when it's ready. + +.. _jobglossary: + +**Job:** +--------- + +A job in Parsl is a unit of work submitted to an execution environment (such as a cluster or cloud) for processing. It can consist of one or more apps executed on computational resources. + +.. _launcherglossary: + +**Launcher:** +---------------- + +A launcher in Parsl is responsible for placing the workers onto each computer, preparing them to run the apps. It’s like a bus driver who brings the players to the stadium, ensuring they are ready to start, but not directly involved in telling them what to do once they arrive. + +.. _managerglossary: + +**Manager:** +-------------- + +A manager in Parsl is responsible for overseeing the execution of tasks on specific compute resources. It's like a supervisor who ensures that all workers (or workers within a block) are carrying out their tasks correctly and efficiently. + +.. _memoizationglossary: + +**Memoization:** +------------------- + +Memoization is like remembering something so you don't have to do it again. In Parsl, if you are using memoization and you run an app with the same inputs multiple times, Parsl will remember the result from the first time and give it to you again instead of running the app again. This can save a lot of time. + +.. _mpiappglossary: + +**MPI App:** +--------------- + +An MPI app is a specialized app that uses the Message Passing Interface (MPI) for communication, which can occur both across nodes and within a single node. MPI enables different parts of the app to communicate and coordinate their activities, similar to how a walkie-talkie allows different teams to stay in sync. + +.. _nodeglossary: + +**Node:** +------------ + +A node in Parsl is like a workstation in a factory. It's a physical or virtual machine that provides the computational power needed to run tasks. Each node can host several workers that execute tasks. + +.. _parallelismglossary: + +**Parallelism:** +------------------- + +Parallelism means doing multiple things at the same time but not necessarily in the same location or using the same resources. In Parsl, it involves running apps simultaneously across different nodes or computational resources, accelerating program execution. Unlike concurrency which is like a chef preparing multiple dishes in a single kitchen, parallelism is like multiple chefs preparing different dishes in separate kitchens, at the same time. + +.. _parslscriptglossary: + +**Parsl Script:** +--------------------- + +A Parsl script is a Python program that uses the Parsl library to define and run apps in parallel. It's like a recipe that tells you what ingredients to use and how to combine them. + +.. _pluginglossary: + +**Plugin:** +--------------- + +A plugin is an add-on for Parsl. It's a piece of code that you can add to Parsl to give it new features or change how it works. It's like an extra tool that you can add to your toolbox. + +.. _pythonappglossary: + +**Python App:** +------------------ + +A Python app is a special kind of app in Parsl that's written as a Python function. It's a way to use Parsl to run your Python code in parallel. + +.. _resourceglossary: + +**Resource:** +--------------- + +A resource in Parsl refers to any computational asset that can be used to execute tasks, such as CPU cores, memory, or entire nodes. It's like the tools and materials you need to get a job done. Resources, often grouped in nodes or clusters, are essential for processing workloads. + +.. _serializationglossary: + +**Serialization:** +-------------------- + +Serialization is like packing your belongings into a suitcase so you can take them on a trip. In Parsl, it means converting your data into a format that can be sent over a network to another computer. + +.. _stagingglossary: + +**Staging:** +--------------- + +Staging in Parsl involves moving data to the appropriate location before an app starts running and can also include moving data back after the app finishes. This process ensures that all necessary data is available where it needs to be for the app to execute properly and that the output data is returned to a specified location once the execution is complete. + +.. _taskglossary: + +**Task:** +------------ + +A task in Parsl is the execution of an app, it is the smallest unit of work that can be executed. It's like a single step in a larger process, where each task is part of a broader workflow or job. + +.. _threadglossary: + +**Thread:** +------------- + +A thread is like a smaller part of a program that can run independently. It's like a worker in a factory who can do their job at the same time as other workers. Threads are commonly used for parallelism within a single node. + +.. _workerglossary: + +**Worker:** +------------- + +A worker in Parsl is an independent process that runs on a node to execute tasks. Unlike threads, which share resources within a single process, workers operate as separate entities, each potentially handling different tasks on the same or different nodes. + +.. _workflowglossary: + +**Workflow:** +---------------- + +A workflow is like a series of steps that you follow to complete a task. In Parsl, it's a way to describe how your apps should run and how they depend on each other, like a flowchart that shows you the order in which things need to happen. A workflow is typically expressed in a Parsl script, which is a Python program that leverages the Parsl library to orchestrate these tasks in a structured manner. + diff --git a/docs/userguide/index.rst b/docs/userguide/index.rst index 12254cd6e2..2a97a6104e 100644 --- a/docs/userguide/index.rst +++ b/docs/userguide/index.rst @@ -21,3 +21,4 @@ User guide usage_tracking plugins parsl_perf + glossary diff --git a/parsl/channels/base.py b/parsl/channels/base.py index 18a573d0e6..754e89c385 100644 --- a/parsl/channels/base.py +++ b/parsl/channels/base.py @@ -120,14 +120,3 @@ def isdir(self, path: str) -> bool: Path of directory to check. """ pass - - @abstractmethod - def abspath(self, path: str) -> str: - """Return the absolute path. - - Parameters - ---------- - path : str - Path for which the absolute path will be returned. - """ - pass diff --git a/parsl/channels/errors.py b/parsl/channels/errors.py index 0358dc3ece..1621c49a44 100644 --- a/parsl/channels/errors.py +++ b/parsl/channels/errors.py @@ -1,7 +1,5 @@ ''' Exceptions raise by Apps. ''' -from typing import Optional - from parsl.errors import ParslError @@ -60,21 +58,6 @@ def __init__(self, e: Exception, hostname: str) -> None: super().__init__("User does not have permissions to access the script_dir", e, hostname) -class FileExists(ChannelError): - ''' Push or pull of file over channel fails since a file of the name already - exists on the destination. - - Contains: - reason(string) - e (paramiko exception object) - hostname (string) - ''' - - def __init__(self, e: Exception, hostname: str, filename: Optional[str] = None) -> None: - super().__init__("File name collision in channel transport phase: {}".format(filename), - e, hostname) - - class AuthException(ChannelError): ''' An error raised during execution of an app. What this exception contains depends entirely on context diff --git a/parsl/channels/local/local.py b/parsl/channels/local/local.py index b94629095e..e6266713ef 100644 --- a/parsl/channels/local/local.py +++ b/parsl/channels/local/local.py @@ -37,19 +37,16 @@ def execute_wait(self, cmd, walltime=None, envs={}): Args: - cmd (string) : Commandline string to execute - - walltime (int) : walltime in seconds, this is not really used now. + - walltime (int) : walltime in seconds Kwargs: - envs (dict) : Dictionary of env variables. This will be used to override the envs set at channel initialization. Returns: - - retcode : Return code from the execution, -1 on fail + - retcode : Return code from the execution - stdout : stdout string - stderr : stderr string - - Raises: - None. ''' current_env = copy.deepcopy(self._envs) current_env.update(envs) @@ -145,16 +142,6 @@ def makedirs(self, path, mode=0o700, exist_ok=False): return os.makedirs(path, mode, exist_ok) - def abspath(self, path): - """Return the absolute path. - - Parameters - ---------- - path : str - Path for which the absolute path will be returned. - """ - return os.path.abspath(path) - @property def script_dir(self): return self._script_dir @@ -162,5 +149,5 @@ def script_dir(self): @script_dir.setter def script_dir(self, value): if value is not None: - value = self.abspath(value) + value = os.path.abspath(value) self._script_dir = value diff --git a/parsl/channels/ssh/ssh.py b/parsl/channels/ssh/ssh.py index c53a26b831..1f06748bae 100644 --- a/parsl/channels/ssh/ssh.py +++ b/parsl/channels/ssh/ssh.py @@ -214,7 +214,6 @@ def pull_file(self, remote_source, local_dir): - str: Local path to file Raises: - - FileExists : Name collision at local directory. - FileCopyException : FileCopy failed. ''' @@ -287,16 +286,6 @@ def makedirs(self, path, mode=0o700, exist_ok=False): self.execute_wait('mkdir -p {}'.format(path)) self._valid_sftp_client().chmod(path, mode) - def abspath(self, path): - """Return the absolute path on the remote side. - - Parameters - ---------- - path : str - Path for which the absolute path will be returned. - """ - return self._valid_sftp_client().normalize(path) - @property def script_dir(self): return self._script_dir diff --git a/parsl/dataflow/dflow.py b/parsl/dataflow/dflow.py index 344173c4b1..2cb2f4d660 100644 --- a/parsl/dataflow/dflow.py +++ b/parsl/dataflow/dflow.py @@ -987,7 +987,7 @@ def submit(self, - app_kwargs (dict) : Rest of the kwargs to the fn passed as dict. Returns: - (AppFuture) [DataFutures,] + AppFuture """ diff --git a/parsl/executors/high_throughput/executor.py b/parsl/executors/high_throughput/executor.py index ab1498efc4..a1def0466a 100644 --- a/parsl/executors/high_throughput/executor.py +++ b/parsl/executors/high_throughput/executor.py @@ -63,7 +63,6 @@ GENERAL_HTEX_PARAM_DOCS = """provider : :class:`~parsl.providers.base.ExecutionProvider` Provider to access computation resources. Can be one of :class:`~parsl.providers.aws.aws.EC2Provider`, - :class:`~parsl.providers.cobalt.cobalt.Cobalt`, :class:`~parsl.providers.condor.condor.Condor`, :class:`~parsl.providers.googlecloud.googlecloud.GoogleCloud`, :class:`~parsl.providers.gridEngine.gridEngine.GridEngine`, diff --git a/parsl/executors/high_throughput/interchange.py b/parsl/executors/high_throughput/interchange.py index b0228b52f0..be38ccf168 100644 --- a/parsl/executors/high_throughput/interchange.py +++ b/parsl/executors/high_throughput/interchange.py @@ -66,7 +66,7 @@ def __init__(self, If specified the interchange will only listen on this address for connections from workers else, it binds to all addresses. - client_ports : triple(int, int, int) + client_ports : tuple(int, int, int) The ports at which the client can be reached worker_ports : tuple(int, int) @@ -104,7 +104,6 @@ def __init__(self, os.makedirs(self.logdir, exist_ok=True) start_file_logger("{}/interchange.log".format(self.logdir), level=logging_level) - logger.propagate = False logger.debug("Initializing Interchange process") self.client_address = client_address @@ -437,9 +436,13 @@ def process_task_outgoing_incoming( logger.info(f"Manager {manager_id!r} has compatible Parsl version {msg['parsl_v']}") logger.info(f"Manager {manager_id!r} has compatible Python version {msg['python_v'].rsplit('.', 1)[0]}") elif msg['type'] == 'heartbeat': - self._ready_managers[manager_id]['last_heartbeat'] = time.time() - logger.debug("Manager %r sent heartbeat via tasks connection", manager_id) - self.task_outgoing.send_multipart([manager_id, b'', PKL_HEARTBEAT_CODE]) + manager = self._ready_managers.get(manager_id) + if manager: + manager['last_heartbeat'] = time.time() + logger.debug("Manager %r sent heartbeat via tasks connection", manager_id) + self.task_outgoing.send_multipart([manager_id, b'', PKL_HEARTBEAT_CODE]) + else: + logger.warning("Received heartbeat via tasks connection for not-registered manager %r", manager_id) elif msg['type'] == 'drain': self._ready_managers[manager_id]['draining'] = True logger.debug("Manager %r requested drain", manager_id) diff --git a/parsl/executors/high_throughput/mpi_resource_management.py b/parsl/executors/high_throughput/mpi_resource_management.py index 3dd3294648..3f3fc33ea4 100644 --- a/parsl/executors/high_throughput/mpi_resource_management.py +++ b/parsl/executors/high_throughput/mpi_resource_management.py @@ -17,7 +17,6 @@ class Scheduler(Enum): Unknown = 0 Slurm = 1 PBS = 2 - Cobalt = 3 def get_slurm_hosts_list() -> List[str]: @@ -37,13 +36,6 @@ def get_pbs_hosts_list() -> List[str]: return [line.strip() for line in f.readlines()] -def get_cobalt_hosts_list() -> List[str]: - """Get list of COBALT hosts from envvar: COBALT_NODEFILE""" - nodefile_name = os.environ["COBALT_NODEFILE"] - with open(nodefile_name) as f: - return [line.strip() for line in f.readlines()] - - def get_nodes_in_batchjob(scheduler: Scheduler) -> List[str]: """Get nodelist from all supported schedulers""" nodelist = [] @@ -51,8 +43,6 @@ def get_nodes_in_batchjob(scheduler: Scheduler) -> List[str]: nodelist = get_slurm_hosts_list() elif scheduler == Scheduler.PBS: nodelist = get_pbs_hosts_list() - elif scheduler == Scheduler.Cobalt: - nodelist = get_cobalt_hosts_list() else: raise RuntimeError(f"mpi_mode does not support scheduler:{scheduler}") return nodelist @@ -64,8 +54,6 @@ def identify_scheduler() -> Scheduler: return Scheduler.Slurm elif os.environ.get("PBS_NODEFILE"): return Scheduler.PBS - elif os.environ.get("COBALT_NODEFILE"): - return Scheduler.Cobalt else: return Scheduler.Unknown diff --git a/parsl/executors/high_throughput/process_worker_pool.py b/parsl/executors/high_throughput/process_worker_pool.py index 7e238bb61c..e75af86743 100755 --- a/parsl/executors/high_throughput/process_worker_pool.py +++ b/parsl/executors/high_throughput/process_worker_pool.py @@ -650,14 +650,6 @@ def worker( debug: bool, mpi_launcher: str, ): - """ - - Put request token into queue - Get task from task_queue - Pop request from queue - Put result into result_queue - """ - # override the global logger inherited from the __main__ process (which # usually logs to manager.log) with one specific to this worker. global logger diff --git a/parsl/monitoring/db_manager.py b/parsl/monitoring/db_manager.py index 4fcf5ec2e2..abdb038e79 100644 --- a/parsl/monitoring/db_manager.py +++ b/parsl/monitoring/db_manager.py @@ -556,7 +556,7 @@ def _migrate_logs_to_internal(self, logs_queue: queue.Queue, kill_event: threadi logger.debug("Checking STOP conditions: kill event: %s, queue has entries: %s", kill_event.is_set(), logs_queue.qsize() != 0) try: - x, addr = logs_queue.get(timeout=0.1) + x = logs_queue.get(timeout=0.1) except queue.Empty: continue else: diff --git a/parsl/monitoring/monitoring.py b/parsl/monitoring/monitoring.py index e1de80116c..f5079112ca 100644 --- a/parsl/monitoring/monitoring.py +++ b/parsl/monitoring/monitoring.py @@ -16,7 +16,7 @@ from parsl.monitoring.message_type import MessageType from parsl.monitoring.radios import MultiprocessingQueueRadioSender from parsl.monitoring.router import router_starter -from parsl.monitoring.types import AddressedMonitoringMessage +from parsl.monitoring.types import TaggedMonitoringMessage from parsl.multiprocessing import ForkProcess, SizedQueue from parsl.process_loggers import wrap_with_logs from parsl.serialize import deserialize @@ -138,7 +138,7 @@ def start(self, dfk_run_dir: str, config_run_dir: Union[str, os.PathLike]) -> No self.exception_q: Queue[Tuple[str, str]] self.exception_q = SizedQueue(maxsize=10) - self.resource_msgs: Queue[Union[AddressedMonitoringMessage, Tuple[Literal["STOP"], Literal[0]]]] + self.resource_msgs: Queue[Union[TaggedMonitoringMessage, Literal["STOP"]]] self.resource_msgs = SizedQueue() self.router_exit_event: ms.Event @@ -237,7 +237,7 @@ def close(self) -> None: logger.debug("Finished waiting for router termination") if len(exception_msgs) == 0: logger.debug("Sending STOP to DBM") - self.resource_msgs.put(("STOP", 0)) + self.resource_msgs.put("STOP") else: logger.debug("Not sending STOP to DBM, because there were DBM exceptions") logger.debug("Waiting for DB termination") @@ -261,7 +261,7 @@ def close(self) -> None: @wrap_with_logs -def filesystem_receiver(logdir: str, q: "queue.Queue[AddressedMonitoringMessage]", run_dir: str) -> None: +def filesystem_receiver(logdir: str, q: "queue.Queue[TaggedMonitoringMessage]", run_dir: str) -> None: logger = set_file_logger("{}/monitoring_filesystem_radio.log".format(logdir), name="monitoring_filesystem_radio", level=logging.INFO) @@ -288,7 +288,7 @@ def filesystem_receiver(logdir: str, q: "queue.Queue[AddressedMonitoringMessage] message = deserialize(f.read()) logger.debug(f"Message received is: {message}") assert isinstance(message, tuple) - q.put(cast(AddressedMonitoringMessage, message)) + q.put(cast(TaggedMonitoringMessage, message)) os.remove(full_path_filename) except Exception: logger.exception(f"Exception processing {filename} - probably will be retried next iteration") diff --git a/parsl/monitoring/radios.py b/parsl/monitoring/radios.py index 37bef0b06a..753b5e33ca 100644 --- a/parsl/monitoring/radios.py +++ b/parsl/monitoring/radios.py @@ -58,7 +58,7 @@ def send(self, message: object) -> None: tmp_filename = f"{self.tmp_path}/{unique_id}" new_filename = f"{self.new_path}/{unique_id}" - buffer = (message, "NA") + buffer = message # this will write the message out then atomically # move it into new/, so that a partially written @@ -187,7 +187,7 @@ def __init__(self, queue: Queue) -> None: self.queue = queue def send(self, message: object) -> None: - self.queue.put((message, 0)) + self.queue.put(message) class ZMQRadioSender(MonitoringRadioSender): diff --git a/parsl/monitoring/router.py b/parsl/monitoring/router.py index e92386c407..1d4b522e82 100644 --- a/parsl/monitoring/router.py +++ b/parsl/monitoring/router.py @@ -14,7 +14,7 @@ import zmq from parsl.log_utils import set_file_logger -from parsl.monitoring.types import AddressedMonitoringMessage, TaggedMonitoringMessage +from parsl.monitoring.types import TaggedMonitoringMessage from parsl.process_loggers import wrap_with_logs from parsl.utils import setproctitle @@ -125,7 +125,7 @@ def start_udp_listener(self) -> None: data, addr = self.udp_sock.recvfrom(2048) resource_msg = pickle.loads(data) self.logger.debug("Got UDP Message from {}: {}".format(addr, resource_msg)) - self.resource_msgs.put((resource_msg, addr)) + self.resource_msgs.put(resource_msg) except socket.timeout: pass @@ -136,7 +136,7 @@ def start_udp_listener(self) -> None: data, addr = self.udp_sock.recvfrom(2048) msg = pickle.loads(data) self.logger.debug("Got UDP Message from {}: {}".format(addr, msg)) - self.resource_msgs.put((msg, addr)) + self.resource_msgs.put(msg) last_msg_received_time = time.time() except socket.timeout: pass @@ -160,10 +160,7 @@ def start_zmq_listener(self) -> None: assert len(msg) >= 1, "ZMQ Receiver expects tuples of length at least 1, got {}".format(msg) assert len(msg) == 2, "ZMQ Receiver expects message tuples of exactly length 2, got {}".format(msg) - msg_0: AddressedMonitoringMessage - msg_0 = (msg, 0) - - self.resource_msgs.put(msg_0) + self.resource_msgs.put(msg) except zmq.Again: pass except Exception: diff --git a/parsl/monitoring/types.py b/parsl/monitoring/types.py index c64dd22e7d..662d1421fb 100644 --- a/parsl/monitoring/types.py +++ b/parsl/monitoring/types.py @@ -1,14 +1,11 @@ -from typing import Any, Dict, Tuple, Union +from typing import Any, Dict, Tuple from typing_extensions import TypeAlias from parsl.monitoring.message_type import MessageType -# A basic parsl monitoring message is wrapped by up to two wrappers: -# The basic monitoring message dictionary can first be tagged, giving -# a TaggedMonitoringMessage, and then that can be further tagged with -# an often unused sender address, giving an AddressedMonitoringMessage. +# A MonitoringMessage dictionary can be tagged, giving a +# TaggedMonitoringMessage. MonitoringMessage: TypeAlias = Dict[str, Any] TaggedMonitoringMessage: TypeAlias = Tuple[MessageType, MonitoringMessage] -AddressedMonitoringMessage: TypeAlias = Tuple[TaggedMonitoringMessage, Union[str, int]] diff --git a/parsl/providers/__init__.py b/parsl/providers/__init__.py index 150f425f3d..6855915ba7 100644 --- a/parsl/providers/__init__.py +++ b/parsl/providers/__init__.py @@ -1,7 +1,6 @@ # Cloud Providers from parsl.providers.aws.aws import AWSProvider from parsl.providers.azure.azure import AzureProvider -from parsl.providers.cobalt.cobalt import CobaltProvider from parsl.providers.condor.condor import CondorProvider from parsl.providers.googlecloud.googlecloud import GoogleCloudProvider from parsl.providers.grid_engine.grid_engine import GridEngineProvider @@ -15,7 +14,6 @@ from parsl.providers.torque.torque import TorqueProvider __all__ = ['LocalProvider', - 'CobaltProvider', 'CondorProvider', 'GridEngineProvider', 'SlurmProvider', diff --git a/parsl/providers/base.py b/parsl/providers/base.py index 5475f86d4c..d7d236cacb 100644 --- a/parsl/providers/base.py +++ b/parsl/providers/base.py @@ -2,7 +2,6 @@ from abc import ABCMeta, abstractmethod, abstractproperty from typing import Any, Dict, List, Optional -from parsl.channels.base import Channel from parsl.jobs.states import JobStatus logger = logging.getLogger(__name__) @@ -12,7 +11,7 @@ class ExecutionProvider(metaclass=ABCMeta): """Execution providers are responsible for managing execution resources that have a Local Resource Manager (LRM). For instance, campus clusters and supercomputers generally have LRMs (schedulers) such as Slurm, - Torque/PBS, Condor and Cobalt. Clouds, on the other hand, have API + Torque/PBS, and Condor. Clouds, on the other hand, have API interfaces that allow much more fine-grained composition of an execution environment. An execution provider abstracts these types of resources and provides a single uniform interface to them. @@ -154,18 +153,3 @@ def status_polling_interval(self) -> int: :return: the number of seconds to wait between calls to status() """ pass - - -class Channeled(): - """A marker type to indicate that parsl should manage a Channel for this provider""" - def __init__(self) -> None: - self.channel: Channel - pass - - -class MultiChanneled(): - """A marker type to indicate that parsl should manage multiple Channels for this provider""" - - def __init__(self) -> None: - self.channels: List[Channel] - pass diff --git a/parsl/providers/cobalt/__init__.py b/parsl/providers/cobalt/__init__.py deleted file mode 100644 index e69de29bb2..0000000000 diff --git a/parsl/providers/cobalt/cobalt.py b/parsl/providers/cobalt/cobalt.py deleted file mode 100644 index 4039dfcbea..0000000000 --- a/parsl/providers/cobalt/cobalt.py +++ /dev/null @@ -1,236 +0,0 @@ -import logging -import os -import time -import warnings - -from parsl.channels import LocalChannel -from parsl.jobs.states import JobState, JobStatus -from parsl.launchers import AprunLauncher -from parsl.providers.cluster_provider import ClusterProvider -from parsl.providers.cobalt.template import template_string -from parsl.providers.errors import ScaleOutFailed -from parsl.utils import RepresentationMixin, wtime_to_minutes - -logger = logging.getLogger(__name__) - -translate_table = { - 'QUEUED': JobState.PENDING, - 'STARTING': JobState.PENDING, - 'RUNNING': JobState.RUNNING, - 'EXITING': JobState.COMPLETED, - 'KILLING': JobState.COMPLETED -} - - -class CobaltProvider(ClusterProvider, RepresentationMixin): - """ Cobalt Execution Provider - - WARNING: CobaltProvider is deprecated and will be removed by 2024.04 - - This provider uses cobalt to submit (qsub), obtain the status of (qstat), and cancel (qdel) - jobs. Theo script to be used is created from a template file in this - same module. - - Parameters - ---------- - channel : Channel - Channel for accessing this provider. Possible channels include - :class:`~parsl.channels.LocalChannel` (the default), - :class:`~parsl.channels.SSHChannel`, or - :class:`~parsl.channels.SSHInteractiveLoginChannel`. - nodes_per_block : int - Nodes to provision per block. - min_blocks : int - Minimum number of blocks to maintain. - max_blocks : int - Maximum number of blocks to maintain. - walltime : str - Walltime requested per block in HH:MM:SS. - account : str - Account that the job will be charged against. - queue : str - Torque queue to request blocks from. - scheduler_options : str - String to prepend to the submit script to the scheduler. - worker_init : str - Command to be run before starting a worker, such as 'module load Anaconda; source activate env'. - launcher : Launcher - Launcher for this provider. Possible launchers include - :class:`~parsl.launchers.AprunLauncher` (the default) or, - :class:`~parsl.launchers.SingleNodeLauncher` - """ - def __init__(self, - channel=LocalChannel(), - nodes_per_block=1, - init_blocks=0, - min_blocks=0, - max_blocks=1, - parallelism=1, - walltime="00:10:00", - account=None, - queue=None, - scheduler_options='', - worker_init='', - launcher=AprunLauncher(), - cmd_timeout=10): - label = 'cobalt' - super().__init__(label, - channel=channel, - nodes_per_block=nodes_per_block, - init_blocks=init_blocks, - min_blocks=min_blocks, - max_blocks=max_blocks, - parallelism=parallelism, - walltime=walltime, - launcher=launcher, - cmd_timeout=cmd_timeout) - - self.account = account - self.queue = queue - self.scheduler_options = scheduler_options - self.worker_init = worker_init - warnings.warn("CobaltProvider is deprecated; This will be removed after 2024-04", - DeprecationWarning, - stacklevel=2) - - def _status(self): - """Returns the status list for a list of job_ids - - Args: - self - - Returns: - [status...] : Status list of all jobs - """ - - jobs_missing = list(self.resources.keys()) - - retcode, stdout, stderr = self.execute_wait("qstat -u $USER") - - # Execute_wait failed. Do no update. - if retcode != 0: - return - - for line in stdout.split('\n'): - if line.startswith('='): - continue - - parts = line.upper().split() - if parts and parts[0] != 'JOBID': - job_id = parts[0] - - if job_id not in self.resources: - continue - - status = translate_table.get(parts[4], JobState.UNKNOWN) - - self.resources[job_id]['status'] = JobStatus(status) - jobs_missing.remove(job_id) - - # squeue does not report on jobs that are not running. So we are filling in the - # blanks for missing jobs, we might lose some information about why the jobs failed. - for missing_job in jobs_missing: - self.resources[missing_job]['status'] = JobStatus(JobState.COMPLETED) - - def submit(self, command, tasks_per_node, job_name="parsl.cobalt"): - """ Submits the command onto an Local Resource Manager job of parallel elements. - Submit returns an ID that corresponds to the task that was just submitted. - - If tasks_per_node < 1 : ! This is illegal. tasks_per_node should be integer - - If tasks_per_node == 1: - A single node is provisioned - - If tasks_per_node > 1 : - tasks_per_node number of nodes are provisioned. - - Args: - - command :(String) Commandline invocation to be made on the remote side. - - tasks_per_node (int) : command invocations to be launched per node - - Kwargs: - - job_name (String): Name for job, must be unique - - Returns: - - None: At capacity, cannot provision more - - job_id: (string) Identifier for the job - - """ - - account_opt = '-A {}'.format(self.account) if self.account is not None else '' - - job_name = "parsl.{0}.{1}".format(job_name, time.time()) - - script_path = "{0}/{1}.submit".format(self.script_dir, job_name) - script_path = os.path.abspath(script_path) - - job_config = {} - job_config["scheduler_options"] = self.scheduler_options - job_config["worker_init"] = self.worker_init - - logger.debug("Requesting nodes_per_block:%s tasks_per_node:%s", - self.nodes_per_block, tasks_per_node) - - # Wrap the command - job_config["user_script"] = self.launcher(command, tasks_per_node, self.nodes_per_block) - - queue_opt = '-q {}'.format(self.queue) if self.queue is not None else '' - - logger.debug("Writing submit script") - self._write_submit_script(template_string, script_path, job_name, job_config) - - channel_script_path = self.channel.push_file(script_path, self.channel.script_dir) - - command = 'qsub -n {0} {1} -t {2} {3} {4}'.format( - self.nodes_per_block, queue_opt, wtime_to_minutes(self.walltime), account_opt, channel_script_path) - logger.debug("Executing {}".format(command)) - - retcode, stdout, stderr = self.execute_wait(command) - - # TODO : FIX this block - if retcode != 0: - logger.error("Failed command: {0}".format(command)) - logger.error("Launch failed stdout:\n{0} \nstderr:{1}\n".format(stdout, stderr)) - - logger.debug("Retcode:%s STDOUT:%s STDERR:%s", retcode, stdout.strip(), stderr.strip()) - - job_id = None - - if retcode == 0: - # We should be getting only one line back - job_id = stdout.strip() - self.resources[job_id] = {'job_id': job_id, 'status': JobStatus(JobState.PENDING)} - else: - logger.error("Submit command failed: {0}".format(stderr)) - raise ScaleOutFailed(self.__class__, "Request to submit job to local scheduler failed") - - logger.debug("Returning job id : {0}".format(job_id)) - return job_id - - def cancel(self, job_ids): - """ Cancels the jobs specified by a list of job ids - - Args: - job_ids : [ ...] - - Returns : - [True/False...] : If the cancel operation fails the entire list will be False. - """ - - job_id_list = ' '.join(job_ids) - retcode, stdout, stderr = self.execute_wait("qdel {0}".format(job_id_list)) - rets = None - if retcode == 0: - for jid in job_ids: - # ??? - # self.resources[jid]['status'] = translate_table['KILLING'] # Setting state to cancelled - self.resources[jid]['status'] = JobStatus(JobState.COMPLETED) - rets = [True for i in job_ids] - else: - rets = [False for i in job_ids] - - return rets - - @property - def status_polling_interval(self): - return 60 diff --git a/parsl/providers/cobalt/template.py b/parsl/providers/cobalt/template.py deleted file mode 100755 index 07141dd136..0000000000 --- a/parsl/providers/cobalt/template.py +++ /dev/null @@ -1,17 +0,0 @@ -template_string = '''#!/bin/bash -el -${scheduler_options} - -${worker_init} - -echo "Starting Cobalt job script" - -echo "----Cobalt Nodefile: -----" -cat $$COBALT_NODEFILE -echo "--------------------------" - -export JOBNAME="${jobname}" - -$user_script - -echo "End of Cobalt job" -''' diff --git a/parsl/tests/configs/cooley_htex.py b/parsl/tests/configs/cooley_htex.py deleted file mode 100644 index 202379d0af..0000000000 --- a/parsl/tests/configs/cooley_htex.py +++ /dev/null @@ -1,37 +0,0 @@ -# UNTESTED - -from parsl.config import Config -from parsl.executors import HighThroughputExecutor -from parsl.launchers import MpiRunLauncher -from parsl.providers import CobaltProvider - -# If you are a developer running tests, make sure to update parsl/tests/configs/user_opts.py -# If you are a user copying-and-pasting this as an example, make sure to either -# 1) create a local `user_opts.py`, or -# 2) delete the user_opts import below and replace all appearances of `user_opts` with the literal value -# (i.e., user_opts['swan']['username'] -> 'your_username') -from .user_opts import user_opts - -config = Config( - executors=[ - HighThroughputExecutor( - label="cooley_htex", - worker_debug=False, - cores_per_worker=1, - encrypted=True, - provider=CobaltProvider( - queue='debug', - account=user_opts['cooley']['account'], - launcher=MpiRunLauncher(), # UNTESTED COMPONENT - scheduler_options=user_opts['cooley']['scheduler_options'], - worker_init=user_opts['cooley']['worker_init'], - init_blocks=1, - max_blocks=1, - min_blocks=1, - nodes_per_block=4, - cmd_timeout=60, - walltime='00:10:00', - ), - ) - ] -) diff --git a/parsl/tests/configs/theta.py b/parsl/tests/configs/theta.py deleted file mode 100644 index 71ed25142b..0000000000 --- a/parsl/tests/configs/theta.py +++ /dev/null @@ -1,37 +0,0 @@ -from parsl.config import Config -from parsl.executors import HighThroughputExecutor -from parsl.launchers import AprunLauncher -from parsl.providers import CobaltProvider - -from .user_opts import user_opts - - -def fresh_config(): - return Config( - executors=[ - HighThroughputExecutor( - label='theta_local_htex_multinode', - max_workers_per_node=1, - encrypted=True, - provider=CobaltProvider( - queue=user_opts['theta']['queue'], - account=user_opts['theta']['account'], - launcher=AprunLauncher(overrides="-d 64"), - walltime='00:10:00', - nodes_per_block=2, - init_blocks=1, - max_blocks=1, - # string to prepend to #COBALT blocks in the submit - # script to the scheduler eg: '#COBALT -t 50' - scheduler_options='', - # Command to be run before starting a worker, such as: - # 'module load Anaconda; source activate parsl_env'. - worker_init=user_opts['theta']['worker_init'], - cmd_timeout=120, - ), - ) - ], - ) - - -config = fresh_config() diff --git a/parsl/tests/integration/test_channels/test_channels.py b/parsl/tests/integration/test_channels/test_channels.py deleted file mode 100644 index f5943940eb..0000000000 --- a/parsl/tests/integration/test_channels/test_channels.py +++ /dev/null @@ -1,17 +0,0 @@ -from parsl.channels.local.local import LocalChannel - - -def test_local(): - - channel = LocalChannel(None, None) - - ec, out, err = channel.execute_wait('echo "pwd: $PWD"', 2) - - assert ec == 0, "Channel execute failed" - print("Stdout: ", out) - print("Stderr: ", err) - - -if __name__ == "__main__": - - test_local() diff --git a/parsl/tests/manual_tests/test_fan_in_out_htex_remote.py b/parsl/tests/manual_tests/test_fan_in_out_htex_remote.py deleted file mode 100644 index b780fd47b4..0000000000 --- a/parsl/tests/manual_tests/test_fan_in_out_htex_remote.py +++ /dev/null @@ -1,88 +0,0 @@ -import logging - -import parsl -from parsl.app.app import python_app -from parsl.config import Config -from parsl.executors import HighThroughputExecutor -from parsl.launchers import AprunLauncher -from parsl.monitoring.monitoring import MonitoringHub -from parsl.providers import CobaltProvider - - -def local_setup(): - threads_config = Config( - executors=[ - HighThroughputExecutor( - label="theta_htex", - # worker_debug=True, - cores_per_worker=4, - encrypted=True, - provider=CobaltProvider( - queue='debug-flat-quad', - account='CSC249ADCD01', - launcher=AprunLauncher(overrides="-d 64"), - worker_init='source activate parsl-issues', - init_blocks=1, - max_blocks=1, - min_blocks=1, - nodes_per_block=4, - cmd_timeout=60, - walltime='00:10:00', - ), - ) - ], - monitoring=MonitoringHub( - hub_port=55055, - logging_level=logging.DEBUG, - resource_monitoring_interval=10), - strategy='none') - parsl.load(threads_config) - - -def local_teardown(): - parsl.clear() - - -@python_app -def inc(x): - import time - start = time.time() - sleep_duration = 30.0 - while True: - x += 1 - end = time.time() - if end - start >= sleep_duration: - break - return x - - -@python_app -def add_inc(inputs=[]): - import time - start = time.time() - sleep_duration = 30.0 - res = sum(inputs) - while True: - res += 1 - end = time.time() - if end - start >= sleep_duration: - break - return res - - -if __name__ == "__main__": - - total = 200 - half = int(total / 2) - one_third = int(total / 3) - two_third = int(total / 3 * 2) - futures_1 = [inc(i) for i in range(total)] - futures_2 = [add_inc(inputs=futures_1[0:half]), - add_inc(inputs=futures_1[half:total])] - futures_3 = [inc(futures_2[0]) for _ in range(half)] + [inc(futures_2[1]) for _ in range(half)] - futures_4 = [add_inc(inputs=futures_3[0:one_third]), - add_inc(inputs=futures_3[one_third:two_third]), - add_inc(inputs=futures_3[two_third:total])] - - print([f.result() for f in futures_4]) - print("Done") diff --git a/parsl/tests/site_tests/site_config_selector.py b/parsl/tests/site_tests/site_config_selector.py index 8e41a9103f..921df197b9 100644 --- a/parsl/tests/site_tests/site_config_selector.py +++ b/parsl/tests/site_tests/site_config_selector.py @@ -7,12 +7,7 @@ def fresh_config(): hostname = os.getenv('PARSL_HOSTNAME', platform.uname().node) print("Loading config for {}".format(hostname)) - if 'thetalogin' in hostname: - from parsl.tests.configs.theta import fresh_config - config = fresh_config() - print("Loading Theta config") - - elif 'frontera' in hostname: + if 'frontera' in hostname: print("Loading Frontera config") from parsl.tests.configs.frontera import fresh_config config = fresh_config() diff --git a/parsl/tests/sites/midway.rst b/parsl/tests/sites/midway.rst deleted file mode 100644 index 462e12d81c..0000000000 --- a/parsl/tests/sites/midway.rst +++ /dev/null @@ -1,52 +0,0 @@ -Running Parsl on Midway -======================= - -This is a brief guide to running Parsl on UChicago RCC's midway cluster. - -Requirements -============ - -Make sure you have python3.6 and Parsl installed with all it's dependencies. - -Running IPP -=========== - -In order to run Parsl apps on Midway nodes, we need to first start an IPython controller on the login node.:: - - ipcontroller --port=5XXXX --ip=* - -Once the ipcontroller is started in a separate terminal or in a screen session, we can now run parsl scripts. - -Parsl Config: -============= - -Here's a config for Midway that starts with a request for 2 nodes. - -.. code:: python3 - - USERNAME = - - { "site" : "Midway_Remote_Westmere", - "auth" : { - "channel" : "ssh", - "hostname" : "swift.rcc.uchicago.edu", - "username" : USERNAME, - "script_dir" : "/scratch/midway2/{0}/parsl_scripts".format(USERNAME) - }, - "execution" : { - "executor" : "ipp", - "provider" : "slurm", - "block" : { # Definition of a block - "nodes" : 1, # of nodes in that block - "task_blocks" : 1, # total tasks in a block - "init_blocks" : 1, - "max_blocks" : 1, - "options" : { - "partition" : "westmere", - "overrides" : """module load python/3.5.2+gcc-4.8; source /scratch/midway2/yadunand/parsl_env_3.5.2_gcc/bin/activate""" - } - } - } - } - - diff --git a/parsl/tests/integration/test_channels/test_local_channel.py b/parsl/tests/test_channels/test_local_channel.py similarity index 86% rename from parsl/tests/integration/test_channels/test_local_channel.py rename to parsl/tests/test_channels/test_local_channel.py index cff403a611..2ac13d4c13 100644 --- a/parsl/tests/integration/test_channels/test_local_channel.py +++ b/parsl/tests/test_channels/test_local_channel.py @@ -1,6 +1,9 @@ +import pytest + from parsl.channels.local.local import LocalChannel +@pytest.mark.local def test_env(): ''' Regression testing for issue #27 ''' @@ -15,9 +18,8 @@ def test_env(): x = [s for s in stdout if s.startswith("HOME=")] assert x, "HOME not found" - print("RC:{} \nSTDOUT:{} \nSTDERR:{}".format(rc, stdout, stderr)) - +@pytest.mark.local def test_env_mod(): ''' Testing for env update at execute time. ''' @@ -34,9 +36,3 @@ def test_env_mod(): x = [s for s in stdout if s.startswith("TEST_ENV=fooo")] assert x, "User set env missing" - - -if __name__ == "__main__": - - test_env() - test_env_mod() diff --git a/parsl/tests/test_monitoring/test_basic.py b/parsl/tests/test_monitoring/test_basic.py index 1c792a9d82..9ffa21df01 100644 --- a/parsl/tests/test_monitoring/test_basic.py +++ b/parsl/tests/test_monitoring/test_basic.py @@ -42,6 +42,18 @@ def htex_udp_config(): return c +def htex_filesystem_config(): + """This config will force filesystem radio""" + from parsl.tests.configs.htex_local_alternate import fresh_config + c = fresh_config() + assert len(c.executors) == 1 + + assert c.executors[0].radio_mode == "htex", "precondition: htex has a radio mode attribute, configured for htex radio" + c.executors[0].radio_mode = "filesystem" + + return c + + def workqueue_config(): from parsl.tests.configs.workqueue_ex import fresh_config c = fresh_config() @@ -61,7 +73,7 @@ def taskvine_config(): @pytest.mark.local -@pytest.mark.parametrize("fresh_config", [htex_config, htex_udp_config, workqueue_config, taskvine_config]) +@pytest.mark.parametrize("fresh_config", [htex_config, htex_filesystem_config, htex_udp_config, workqueue_config, taskvine_config]) def test_row_counts(tmpd_cwd, fresh_config): # this is imported here rather than at module level because # it isn't available in a plain parsl install, so this module diff --git a/parsl/tests/test_providers/test_cobalt_deprecation_warning.py b/parsl/tests/test_providers/test_cobalt_deprecation_warning.py deleted file mode 100644 index 249a4d90b9..0000000000 --- a/parsl/tests/test_providers/test_cobalt_deprecation_warning.py +++ /dev/null @@ -1,18 +0,0 @@ -import warnings - -import pytest - -from parsl.providers import CobaltProvider - - -@pytest.mark.local -def test_deprecation_warning(): - - with warnings.catch_warnings(record=True) as w: - warnings.simplefilter("always") - - CobaltProvider() - - assert len(w) == 1 - assert issubclass(w[-1].category, DeprecationWarning) - assert "CobaltProvider" in str(w[-1].message) diff --git a/parsl/tests/test_scaling/test_worker_interchange_bad_messages_3262.py b/parsl/tests/test_scaling/test_worker_interchange_bad_messages_3262.py new file mode 100644 index 0000000000..eee128634e --- /dev/null +++ b/parsl/tests/test_scaling/test_worker_interchange_bad_messages_3262.py @@ -0,0 +1,92 @@ +import os +import signal +import time + +import pytest +import zmq + +import parsl +from parsl.channels import LocalChannel +from parsl.config import Config +from parsl.executors import HighThroughputExecutor +from parsl.launchers import SimpleLauncher +from parsl.providers import LocalProvider + +T_s = 1 + + +def fresh_config(): + htex = HighThroughputExecutor( + heartbeat_period=1 * T_s, + heartbeat_threshold=3 * T_s, + label="htex_local", + worker_debug=True, + cores_per_worker=1, + encrypted=False, + provider=LocalProvider( + channel=LocalChannel(), + init_blocks=0, + min_blocks=0, + max_blocks=0, + launcher=SimpleLauncher(), + ), + ) + c = Config( + executors=[htex], + strategy='none', + strategy_period=0.5, + ) + return c, htex + + +@parsl.python_app +def app(): + return 7 + + +@pytest.mark.local +@pytest.mark.parametrize("msg", + (b'FuzzyByte\rSTREAM', # not JSON + b'{}', # missing fields + b'{"type":"heartbeat"}', # regression test #3262 + ) + ) +def test_bad_messages(try_assert, msg): + """This tests that the interchange is resilient to a few different bad + messages: malformed messages caused by implementation errors, and + heartbeat messages from managers that are not registered. + + The heartbeat test is a regression test for issues #3262, #3632 + """ + + c, htex = fresh_config() + + with parsl.load(c): + + # send a bad message into the interchange on the task_outgoing worker + # channel, and then check that the interchange is still alive enough + # that we can scale out a block and run a task. + + (task_port, result_port) = htex.command_client.run("WORKER_PORTS") + + context = zmq.Context() + channel_timeout = 10000 # in milliseconds + task_channel = context.socket(zmq.DEALER) + task_channel.setsockopt(zmq.LINGER, 0) + task_channel.setsockopt(zmq.IDENTITY, b'testid') + + task_channel.set_hwm(0) + task_channel.setsockopt(zmq.SNDTIMEO, channel_timeout) + task_channel.connect(f"tcp://localhost:{task_port}") + + task_channel.send(msg) + + # If the interchange exits, it's likely that this test will hang rather + # than raise an error, because the interchange interaction code + # assumes the interchange is always there. + # In the case of issue #3262, an exception message goes to stderr, and + # no error goes to the interchange log file. + htex.scale_out_facade(1) + try_assert(lambda: len(htex.connected_managers()) == 1, timeout_ms=10000) + + assert app().result() == 7 diff --git a/parsl/tests/test_serialization/test_3495_deserialize_managerlost.py b/parsl/tests/test_serialization/test_3495_deserialize_managerlost.py index 74c0923108..da912a6a10 100644 --- a/parsl/tests/test_serialization/test_3495_deserialize_managerlost.py +++ b/parsl/tests/test_serialization/test_3495_deserialize_managerlost.py @@ -32,7 +32,7 @@ def test_manager_lost_system_failure(tmpd_cwd): cores_per_worker=1, worker_logdir_root=str(tmpd_cwd), heartbeat_period=1, - heartbeat_threshold=1, + heartbeat_threshold=3, ) c = Config(executors=[hte], strategy='simple', strategy_period=0.1)