Skip to content

Commit

Permalink
Adds clusters option for slurm (#3694)
Browse files Browse the repository at this point in the history
# Description

Adds the `clusters` option from slurm for sites with [multi-cluster
slurm](https://slurm.schedmd.com/multi_cluster.html) setups.

# Changed Behaviour

Users on federated Slurm clusters will be able to schedule tasks between
different clusters within the same slurm instance.

# Fixes

Fixes #3675 

## Type of change

- New feature

---------

Co-authored-by: Nicholas Tyler <[email protected]>
  • Loading branch information
tylern4 and Nicholas Tyler authored Nov 14, 2024
1 parent 92ab47f commit 5cb58d1
Showing 1 changed file with 25 additions and 3 deletions.
28 changes: 25 additions & 3 deletions parsl/providers/slurm/slurm.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,9 @@ class SlurmProvider(ClusterProvider, RepresentationMixin):
Slurm queue to place job in. If unspecified or ``None``, no queue slurm directive will be specified.
constraint : str
Slurm job constraint, often used to choose cpu or gpu type. If unspecified or ``None``, no constraint slurm directive will be added.
clusters : str
Slurm cluster name, or comma seperated cluster list, used to choose between different clusters in a federated Slurm instance.
If unspecified or ``None``, no slurm directive for clusters will be added.
channel : Channel
Channel for accessing this provider.
nodes_per_block : int
Expand Down Expand Up @@ -116,6 +119,7 @@ def __init__(self,
account: Optional[str] = None,
qos: Optional[str] = None,
constraint: Optional[str] = None,
clusters: Optional[str] = None,
channel: Channel = LocalChannel(),
nodes_per_block: int = 1,
cores_per_node: Optional[int] = None,
Expand Down Expand Up @@ -152,6 +156,7 @@ def __init__(self,
self.account = account
self.qos = qos
self.constraint = constraint
self.clusters = clusters
self.scheduler_options = scheduler_options + '\n'
if exclusive:
self.scheduler_options += "#SBATCH --exclusive\n"
Expand All @@ -163,6 +168,8 @@ def __init__(self,
self.scheduler_options += "#SBATCH --qos={}\n".format(qos)
if constraint:
self.scheduler_options += "#SBATCH --constraint={}\n".format(constraint)
if clusters:
self.scheduler_options += "#SBATCH --clusters={}\n".format(clusters)

self.regex_job_id = regex_job_id
self.worker_init = worker_init + '\n'
Expand All @@ -174,14 +181,22 @@ def __init__(self,
logger.debug(f"sacct returned retcode={retcode} stderr={stderr}")
if retcode == 0:
logger.debug("using sacct to get job status")
_cmd = "sacct"
# Add clusters option to sacct if provided
if self.clusters:
_cmd += f" --clusters={self.clusters}"
# Using state%20 to get enough characters to not truncate output
# of the state. Without output can look like "<job_id> CANCELLED+"
self._cmd = "sacct -X --noheader --format=jobid,state%20 --job '{0}'"
self._cmd = _cmd + " -X --noheader --format=jobid,state%20 --job '{0}'"
self._translate_table = sacct_translate_table
else:
logger.debug(f"sacct failed with retcode={retcode}")
logger.debug("falling back to using squeue to get job status")
self._cmd = "squeue --noheader --format='%i %t' --job '{0}'"
_cmd = "squeue"
# Add clusters option to squeue if provided
if self.clusters:
_cmd += f" --clusters={self.clusters}"
self._cmd = _cmd + " --noheader --format='%i %t' --job '{0}'"
self._translate_table = squeue_translate_table

def _status(self):
Expand Down Expand Up @@ -344,7 +359,14 @@ def cancel(self, job_ids):
'''

job_id_list = ' '.join(job_ids)
retcode, stdout, stderr = self.execute_wait("scancel {0}".format(job_id_list))

# Make the command to cancel jobs
_cmd = "scancel"
if self.clusters:
_cmd += f" --clusters={self.clusters}"
_cmd += " {0}"

retcode, stdout, stderr = self.execute_wait(_cmd.format(job_id_list))
rets = None
if retcode == 0:
for jid in job_ids:
Expand Down

0 comments on commit 5cb58d1

Please sign in to comment.