add support for torque in IPMU #36

Open · wants to merge 4 commits into base: main
Changes from 1 commit
1 change: 1 addition & 0 deletions python/lsst/ctrl/bps/parsl/sites/__init__.py
@@ -28,3 +28,4 @@
from .local import *
from .slurm import *
from .work_queue import *
from .torque import *
331 changes: 331 additions & 0 deletions python/lsst/ctrl/bps/parsl/sites/torque.py
@@ -0,0 +1,331 @@
# This file is part of ctrl_bps_parsl.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (https://www.lsst.org) and the LSST DESC (https://www.lsstdesc.org/).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This software is dual licensed under the GNU General Public License and also
# under a 3-clause BSD license. Recipients may choose which of these licenses
# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
# respectively. If you choose the GPL option then the following text applies
# (but note that there is still no warranty even if you opt for BSD instead):
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.

import logging
from typing import TYPE_CHECKING, Any

from parsl.channels import LocalChannel
from parsl.executors import HighThroughputExecutor
from parsl.executors.base import ParslExecutor
from parsl.launchers import MpiRunLauncher
from parsl.providers import TorqueProvider

from ..configuration import get_bps_config_value, get_workflow_name
from ..site import SiteConfig

logger = logging.getLogger(__name__)

if TYPE_CHECKING:
from ..job import ParslJob

__all__ = ("Torque",)


Kwargs = dict[str, Any]


class Torque(SiteConfig):
"""Configuration for generic Torque cluster.

This can be used directly as the site configuration for a Torque cluster by
setting the BPS config, e.g.:

.. code-block:: yaml

computeSite: torque
site:
torque:
class: lsst.ctrl.bps.parsl.sites.Torque
nodes: 3
tasks_per_node: 20
walltime: "00:59:00" # Note: always quote walltime in YAML

Alternatively, it can be used as a base class for Torque cluster
configurations.

The following BPS configuration parameters are recognised (and required
unless there is a default mentioned here, or provided by a subclass):

- ``queue`` (`str`): queue for the Torque job.
- ``nodes`` (`int`): number of nodes for each Torque job.
- ``tasks_per_node`` (`int`): number of cores per node for each Torque job;
by default we use all cores on the node.
- ``walltime`` (`str`): time limit for each Torque job.
- ``scheduler_options`` (`str`): text to prepend to the Torque submission
script (each line usually starting with ``#PBS``).
"""

def make_executor(
self,
label: str,
*,
queue: str | None = None,
nodes: int | None = None,
tasks_per_node: int | None = None,
walltime: str | None = None,
mem_per_worker: float | None = None,
scheduler_options: str | None = None,
worker_init: str | None = None,
provider_options: Kwargs | None = None,
executor_options: Kwargs | None = None,
) -> ParslExecutor:
"""Return an executor for running on a Torque cluster.

Parameters
----------
label : `str`
Label for executor.
queue : `str`, optional
Queue for the Torque job.
nodes : `int`, optional
Default number of nodes for each Torque job.
tasks_per_node : `int`, optional
Default number of cores per node for each Torque job.
walltime : `str`, optional
Default time limit for each Torque job.
mem_per_worker : `float`, optional
Minimum memory per worker (GB), limited by the executor.
scheduler_options : `str`, optional
``#PBS`` directives to prepend to the Torque submission script.
worker_init : `str`, optional
Environment initialization commands to run before starting workers.
provider_options : `dict`, optional
Additional arguments for `TorqueProvider` constructor.
executor_options : `dict`, optional
Additional arguments for `HighThroughputExecutor` constructor.

Returns
-------
executor : `HighThroughputExecutor`
Executor for Torque jobs.
"""
nodes = get_bps_config_value(
self.site,
"nodes",
int,
nodes,
required=True,
)
queue = get_bps_config_value(
self.site,
"queue",
str,
queue,
)
walltime = get_bps_config_value(
self.site,
"walltime",
str,
walltime,
required=True,
)
tasks_per_node = get_bps_config_value(
self.site,
"tasks_per_node",
int,
tasks_per_node,
)
worker_init = get_bps_config_value(
self.site,
"worker_init",
str,
worker_init,
)
scheduler_options = get_bps_config_value(
self.site,
"scheduler_options",
str,
scheduler_options,
)

job_name = get_workflow_name(self.config)
if scheduler_options is None:
scheduler_options = ""
else:
scheduler_options += "\n"
scheduler_options += f"#PBS -N {job_name}\n"
if queue:
scheduler_options += f"#PBS -q {queue}\n"
if not isinstance(nodes, int) or nodes < 1:
nodes = 1
if not isinstance(tasks_per_node, int) or tasks_per_node < 1:
tasks_per_node = 1
if not isinstance(walltime, str) or len(walltime) < 1:
walltime = "48:00:00"
if worker_init is None:
worker_init = ""

return HighThroughputExecutor(
label,
provider=TorqueProviderI(
nodes_per_block=nodes,
tasks_per_node=tasks_per_node,
queue=queue,
walltime=walltime,
scheduler_options=scheduler_options,
worker_init=worker_init,
**(provider_options or {}),
),
mem_per_worker=mem_per_worker,
address=self.get_address(),
**(executor_options or {}),
)

def get_executors(self) -> list[ParslExecutor]:
"""Get a list of executors to be used in processing.

Each executor should have a unique ``label``.
"""
return [self.make_executor("torque")]

def select_executor(self, job: "ParslJob") -> str:
"""Get the ``label`` of the executor to use to execute a job.

Parameters
----------
job : `ParslJob`
Job to be executed.

Returns
-------
label : `str`
Label of executor to use to execute ``job``.
"""
return "torque"


class TorqueProviderI(TorqueProvider):


I'm interested in what this subclass is for - it looks like you're trying to add a tasks-per-node parameter which would usually end up launching multiple copies of the Parsl worker pool on one node (rather than having one process worker pool manage the whole node). Is this what you're intending / is this actually what happens?

Author:

I am pasting the submission script generated by parsl:

#!/bin/bash

#PBS -N shear.test
#PBS -q small

#PBS -S /bin/bash
#PBS -N parsl.parsl.torque.block-0.1726231023.145446
#PBS -m n
#PBS -l walltime=10:00:00
#PBS -l nodes=2:ppn=12
#PBS -o /work/xiangchong.li/superonionGW/code/image/xlens/tests/xlens/multiband/runinfo/000/submit_scripts/parsl.parsl.torque.block-0.1726231023.145446.submit.stdout
#PBS -e /work/xiangchong.li/superonionGW/code/image/xlens/tests/xlens/multiband/runinfo/000/submit_scripts/parsl.parsl.torque.block-0.1726231023.145446.submit.stderr

source /work/xiangchong.li/setupIm.sh

export JOBNAME="parsl.parsl.torque.block-0.1726231023.145446"

set -e
export CORES=$(getconf _NPROCESSORS_ONLN)
[[ "1" == "1" ]] && echo "Found cores : $CORES"
WORKERCOUNT=24

cat << MPIRUN_EOF > cmd_$JOBNAME.sh
process_worker_pool.py   -a gw2.local -p 0 -c 1.0 -m None --poll 10 --task_port=54319 --result_port=54758 --logdir=/work/xiangchong.li/superonionGW/code/image/xlens/tests/xlens/multiband/runinfo/000/torque --block_id=0 --hb_period=30  --hb_threshold=120 --cpu-affinity none --available-accelerators  --start-method spawn
MPIRUN_EOF
chmod u+x cmd_$JOBNAME.sh

mpirun -np $WORKERCOUNT  /bin/bash cmd_$JOBNAME.sh

[[ "1" == "1" ]] && echo "All workers done"

Author:

I added this subclass so that I can change the ppn parameter in the PBS system by setting tasks_per_node in the configuration file. The goal is to use 12 CPUs on each node, with one task per CPU.

I am not sure I am doing it in the best way, but the code runs on the server.


In the usual Parsl model, you'd run one copy of process_worker_pool.py on each node, and that worker pool would be in charge of running multiple tasks at once. The command line you specify has an option -c 1.0 which means 1 core per worker.

So the worker pool code should run as many workers (and so, as many simultaneous tasks) as you have cores on your worker node: that is the code that is in charge of running multiple workers, not mpirun.

Have a look in your run directory (deep inside runinfo/....) for a file called manager.log. You should see one per node (or with your configuration above, 24 per node) and inside those files you should see a log line like this:

2024-09-13 12:54:44.837 parsl:254 72 MainThread [INFO]  Manager will spawn 8 workers

How many workers do you see there?

Author (@mr-superonion, Sep 13, 2024):

You should see one per node (or with your configuration above, 24 per node)

I think in the submission script generated by parsl, I run 12 tasks per node, each with one cpu, and there are 24 tasks over 2 nodes.
#PBS -l nodes=2:ppn=12 says each node uses 12 cores (therefore 12 tasks running at the same time),
while WORKERCOUNT=24 says there are 24 workers across all the nodes.

Author (@mr-superonion, Sep 13, 2024):

Note that Slurm handles this more consistently, I guess:
https://github.com/Parsl/parsl/blob/dd9150d7ac26b04eb8ff15247b1c18ce9893f79c/parsl/providers/slurm/slurm.py#L266

It has the option to set cores_per_task in addition to tasks_per_node; PBS does not have this option.

Author:

In your setup, I think you should make each worker pool try to use only 1 core, so that when you run 12 worker pools per node, you get 1 x 12 = 12 workers on each node. Have a look at the max_workers Parsl configuration parameter - for example, see how it is configured at in2p3:

Yeah, got it now. Thanks.


There are lots of different ways to change things to get what you want, so it is quite confusing.

You could try this:

i) set the number of nodes in your job to 1 (so if you want to run on multiple nodes, you launch multiple blocks/multiple batch jobs)

ii) use the change you have made in this PR to set tasks_per_node to 12, so that 12 cores are requested in #PBS -l nodes=...

iii) use the SimpleLauncher instead of the MpiRunLauncher here:

https://github.com/lsst/ctrl_bps_parsl/pull/36/files#diff-e5ba88552b57b323bd184a741f622b7cc7b3a4090d5ac09456f7a8fe85fcc75cR287

so that only a single copy of the process worker pool is launched in each batch job - rather than using mpirun to launch many copies of it

iv) tell the process worker pool to use 12 workers per pool, using max_workers = 12 (a rough sketch follows after the summary below).

That should result in batch jobs where each batch job:

  • gets 12 cores from PBS
  • runs one copy of the Parsl process worker pool
  • the process worker pool runs 12 workers
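
Roughly, in terms of the TorqueProviderI subclass added in this PR, that might look something like the following untested sketch (the queue and walltime values are placeholders taken from your pasted script, and the max_workers keyword may be named max_workers_per_node in newer Parsl releases):

from parsl.executors import HighThroughputExecutor
from parsl.launchers import SimpleLauncher

executor = HighThroughputExecutor(
    "torque",
    max_workers=12,  # iv) 12 workers per worker pool
    provider=TorqueProviderI(
        nodes_per_block=1,  # i) one node per block / per batch job
        max_blocks=2,  # launch two blocks if you want two nodes
        tasks_per_node=12,  # ii) request ppn=12 from PBS
        queue="small",  # placeholder
        walltime="10:00:00",  # placeholder
        launcher=SimpleLauncher(),  # iii) a single worker pool per batch job
    ),
)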

Author:

Thanks. Does the SimpleLauncher support running on two nodes? I thought that if I use two nodes, I have to have two copies of the worker pool, one on each node, and that the copying should be done with MpiRunLauncher. Please correct me if this understanding is wrong.


SimpleLauncher does not support running on two nodes.

The model I wrote above has 1 node per block/per batch job - and if you want to use two nodes, set the max_blocks parameter to 2, so that you get two separate batch jobs that look like this.

(I opened a Parsl issue Parsl/parsl#3616 to request that the Parsl team try to make this interface nicer, some time in the future)

"""Torque Execution Provider

This provider uses qsub to submit, qstat for status, and qdel to cancel
jobs. The qsub script to be used is created from a template file in this
same module.

Parameters
----------
channel : Channel
Channel for accessing this provider. Possible channels include
:class:`~parsl.channels.LocalChannel` (the default),
:class:`~parsl.channels.SSHChannel`, or
:class:`~parsl.channels.SSHInteractiveLoginChannel`.
account : str
Account the job will be charged against.
queue : str
Torque queue to request blocks from.
nodes_per_block : int
Nodes to provision per block.
tasks_per_node : int
Number of tasks per node; used as ``ppn`` in the ``#PBS -l`` request.
init_blocks : int
Number of blocks to provision at the start of the run. Default is 1.
min_blocks : int
Minimum number of blocks to maintain. Default is 0.
max_blocks : int
Maximum number of blocks to maintain.
parallelism : float
Ratio of provisioned task slots to active tasks. A parallelism value of
1 represents aggressive scaling, where as many resources as possible are
used; parallelism close to 0 represents the opposite situation, in which
as few resources as possible (i.e., min_blocks) are used.
walltime : str
Walltime requested per block in HH:MM:SS.
scheduler_options : str
String to prepend to the #PBS blocks in the submit script to the
scheduler.
WARNING: scheduler_options should only be given #PBS strings, and
should not have trailing newlines.
worker_init : str
Command to be run before starting a worker, such as
'module load Anaconda; source activate env'.
launcher : Launcher
Launcher for this provider. Possible launchers include
:class:`~parsl.launchers.MpiRunLauncher` (the default here) or
:class:`~parsl.launchers.SingleNodeLauncher`.

"""

def __init__(
self,
channel=LocalChannel(),
account=None,
queue=None,
scheduler_options="",
worker_init="",
nodes_per_block=1,
tasks_per_node=1,
init_blocks=1,
min_blocks=0,
max_blocks=1,
parallelism=1,
launcher=MpiRunLauncher(),
walltime="00:20:00",
cmd_timeout=120,
):
super().__init__(
channel=channel,
account=account,
queue=queue,
scheduler_options=scheduler_options,
worker_init=worker_init,
nodes_per_block=nodes_per_block,
init_blocks=init_blocks,
min_blocks=min_blocks,
max_blocks=max_blocks,
parallelism=parallelism,
launcher=launcher,
walltime=walltime,
cmd_timeout=cmd_timeout,
)
self.tasks_per_node = tasks_per_node

def submit(self, command, tasks_per_node, job_name="parsl.torque"):
"""Submit the command onto an Local Resource Manager job.

This function returns an ID that corresponds to the task that was just
submitted.

Parameters
----------
command (string):
Commandline invocation to be made on the remote side.
job_name (String):
Name for job, must be unique

Returns
-------
None: At capacity, cannot provision more
job_id (string): Identifier for the job

"""
return super().submit(
command=command,
tasks_per_node=self.tasks_per_node,
job_name=job_name,
)
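
For illustration, a minimal sketch of how the configured value flows into the PBS resource request (this assumes a Torque/PBS cluster is available; the values are placeholders based on the submission script pasted in the review thread above):

provider = TorqueProviderI(
    queue="small",
    nodes_per_block=2,  # becomes "nodes=2" in the generated "#PBS -l" line
    tasks_per_node=12,  # becomes "ppn=12" in the generated "#PBS -l" line
    walltime="10:00:00",
)
# The overridden submit() ignores the tasks_per_node value that Parsl passes
# in and always uses the value configured above, so each batch job requests
# "#PBS -l nodes=2:ppn=12".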