
support [one|multiple]-tasks-per-sharing of gres-flags
tazend committed Jan 27, 2024
1 parent 334c602 commit 36ca0e4
Showing 4 changed files with 70 additions and 33 deletions.
9 changes: 9 additions & 0 deletions pyslurm/core/job/job.pyx
@@ -1153,6 +1153,15 @@ cdef class Job:
else:
return None

@property
def gres_tasks_per_sharing(self):
if self.ptr.bitflags & slurm.GRES_MULT_TASKS_PER_SHARING:
return "multiple"
elif self.ptr.bitflags & slurm.GRES_ONE_TASK_PER_SHARING:
return "one"
else:
return None

@property
def kill_on_invalid_dependency(self):
return u64_parse_bool_flag(self.ptr.bitflags, slurm.KILL_INV_DEP)
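For illustration only (not part of the diff): a minimal sketch of reading the new property through the Job API, assuming pyslurm's Job.load() and a job id that actually exists on the cluster.

from pyslurm import Job

job = Job.load(1234)
# "multiple", "one", or None if neither gres-flags variant was set for the job.
print(job.gres_tasks_per_sharing)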
30 changes: 20 additions & 10 deletions pyslurm/core/job/submission.pxd
@@ -55,17 +55,17 @@ cdef class JobSubmitDescription:
name (str):
Name of the Job, same as -J/--job-name from sbatch.
account (str):
Account of the job, same as -A/--account from sbatch.
user_id (Union[str, int]):
Run the job as a different User, same as --uid from sbatch.
This requires root privileges.
You can specify either the name or the numeric uid of the User.
group_id (Union[str, int]):
Run the job as a different Group, same as --gid from sbatch.
This requires root privileges.
You can specify either the name or the numeric gid of the Group.
priority (int):
Specific priority the Job will receive.
Same as --priority from sbatch.
You can achieve the behaviour of sbatch's --hold option by
specifying a priority of 0.
@@ -183,7 +183,7 @@ cdef class JobSubmitDescription:
An MCS Label for the Job.
This is the same as --mcs-label from sbatch.
memory_per_cpu (Union[str, int]):
Memory required per allocated CPU.
The default unit is in Mebibytes. You are also able to specify
unit suffixes like K|M|G|T.
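A quick, hypothetical illustration of the unit handling described above (the script path and value are placeholders):

from pyslurm import JobSubmitDescription

desc = JobSubmitDescription(script="/path/to/script.sh")
desc.memory_per_cpu = "2G"  # dehumanized to 2048 MiB internally, same as --mem-per-cpu=2G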
@@ -237,7 +237,7 @@ cdef class JobSubmitDescription:
Adjusted scheduling priority for the Job.
This is the same as --nice from sbatch.
log_files_open_mode (str):
Mode in which standard_output and standard_error log files should be opened.
This is the same as --open-mode from sbatch.
@@ -353,7 +353,7 @@ cdef class JobSubmitDescription:
gpus (Union[dict, str, int]):
GPUs for the Job to be allocated in total.
This is the same as -G/--gpus from sbatch.
Specifying the type of the GPU is optional.
For example, specifying the GPU counts as a dict:
@@ -422,7 +422,7 @@ cdef class JobSubmitDescription:
This is the same as --gres from sbatch. You should also use this
option if you want to specify GPUs per node (--gpus-per-node).
Specifying the type (by separating GRES name and type with a
semicolon) is optional.
For example, specifying it as a dict:
@@ -463,7 +463,7 @@ cdef class JobSubmitDescription:
switches.
This is the same as --switches from sbatch.
For example, specifying it as a dict:
switches = { "count": 5, "max_wait_time": "00:10:00" }
@@ -512,13 +512,22 @@ cdef class JobSubmitDescription:
This is the same as --use-min-nodes from sbatch.
gres_binding (str):
Generic resource task binding options.
This is contained in the --gres-flags option from sbatch.
Possible values are:
* `enforce-binding`
* `disable-binding`
gres_tasks_per_sharing (str):
Controls whether one or multiple tasks may use the same shared GRES.
This is contained in the --gres-flags option from sbatch.
Possible values are:
* `multiple` or `multiple-tasks-per-sharing`
* `one` or `one-task-per-sharing`
temporary_disk_per_node (Union[str, int]):
Amount of temporary disk space needed per node.
@@ -630,6 +639,7 @@ cdef class JobSubmitDescription:
spreads_over_nodes
use_min_nodes
gres_binding
gres_tasks_per_sharing
temporary_disk_per_node
get_user_environment
min_cpus_per_node
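A hedged sketch of how the two --gres-flags related attributes documented above might be combined when building a submission; the GRES string and job settings are only examples:

from pyslurm import JobSubmitDescription

desc = JobSubmitDescription(
    name="gres-flags-demo",
    ntasks=2,
    gres_per_node="gpu:1",
    script="/path/to/script.sh",
)
# Roughly equivalent to: sbatch --gres-flags=enforce-binding,one-task-per-sharing
desc.gres_binding = "enforce-binding"
desc.gres_tasks_per_sharing = "one"  # the long form "one-task-per-sharing" is also accepted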
55 changes: 34 additions & 21 deletions pyslurm/core/job/submission.pyx
@@ -43,7 +43,7 @@ from pyslurm.utils.ctime import (
)
from pyslurm.utils.helpers import (
humanize,
dehumanize,
signal_to_num,
user_to_uid,
group_to_gid,
@@ -92,7 +92,7 @@ cdef class JobSubmitDescription:
... cpus_per_task=1,
... time_limit="10-00:00:00",
... script="/path/to/your/submit_script.sh")
>>>
>>> job_id = desc.submit()
>>> print(job_id)
99
@@ -117,7 +117,7 @@
attributes.
Args:
overwrite (bool):
If set to `True`, the value from an option found in the
environment will override the current value of the attribute
in this instance. Default is `False`
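For context, a sketch of how the overwrite behaviour described above and the PYSLURM_JOBDESC_* lookup shown in the next hunk are typically driven; the method name load_environment() is assumed from the surrounding class, and the variable values are made up:

import os
from pyslurm import JobSubmitDescription

os.environ["PYSLURM_JOBDESC_ACCOUNT"] = "dev-account"
os.environ["PYSLURM_JOBDESC_NTASKS"] = "4"

desc = JobSubmitDescription(script="/path/to/script.sh")
desc.load_environment()                # fills attributes that are still unset
desc.load_environment(overwrite=True)  # would also replace attributes already set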
@@ -166,9 +166,9 @@ cdef class JobSubmitDescription:
# Arguments directly specified upon object creation will
# always have precedence.
continue
spec = attr.upper()
val = pyenviron.get(f"PYSLURM_JOBDESC_{spec}")
if (val is not None
and (getattr(self, attr) is None or overwrite)):

@@ -225,7 +225,7 @@ cdef class JobSubmitDescription:
cstr.from_gres_dict(self.gpus_per_task, "gpu"))
cstr.fmalloc(&ptr.tres_per_node,
cstr.from_gres_dict(self.gres_per_node))
cstr.fmalloc(&ptr.cpus_per_tres,
cstr.from_gres_dict(self.cpus_per_gpu, "gpu"))
cstr.fmalloc(&ptr.admin_comment, self.admin_comment)
cstr.fmalloc(&self.ptr.dependency,
@@ -256,7 +256,7 @@ cdef class JobSubmitDescription:
u64_set_bool_flag(&ptr.bitflags, self.spreads_over_nodes,
slurm.SPREAD_JOB)
u64_set_bool_flag(&ptr.bitflags, self.kill_on_invalid_dependency,
slurm.KILL_INV_DEP)
u64_set_bool_flag(&ptr.bitflags, self.use_min_nodes,
slurm.USE_MIN_NODES)
ptr.contiguous = u16_bool(self.requires_contiguous_nodes)
@@ -283,6 +283,7 @@
self._set_cpu_frequency()
self._set_gpu_binding()
self._set_gres_binding()
self._set_gres_tasks_per_sharing()
self._set_min_cpus()

# TODO
@@ -330,7 +331,7 @@
and self.threads_reserved_for_system):
raise ValueError("cores_reserved_for_system is mutually "
" exclusive with threads_reserved_for_system.")

def _set_core_spec(self):
if self.cores_reserved_for_system:
self.ptr.core_spec = u16(self.cores_reserved_for_system)
@@ -351,13 +352,13 @@
self.ptr.cpu_freq_min = freq_min
self.ptr.cpu_freq_max = freq_max
self.ptr.cpu_freq_gov = freq_gov

def _set_memory(self):
if self.memory_per_cpu:
self.ptr.pn_min_memory = u64(dehumanize(self.memory_per_cpu))
self.ptr.pn_min_memory |= slurm.MEM_PER_CPU
elif self.memory_per_node:
self.ptr.pn_min_memory = u64(dehumanize(self.memory_per_node))
elif self.memory_per_gpu:
mem_gpu = u64(dehumanize(self.memory_per_gpu))
cstr.fmalloc(&self.ptr.mem_per_tres, f"gres:gpu:{mem_gpu}")
@@ -433,7 +434,7 @@
if not "=" in item:
continue

var, val = item.split("=", 1)
slurm_env_array_overwrite(&self.ptr.environment,
var, str(val))
get_user_env = True
@@ -446,7 +447,7 @@
var, str(val))

# Setup all User selected env vars.
for var, val in vals.items():
slurm_env_array_overwrite(&self.ptr.environment,
var, str(val))

@@ -467,7 +468,7 @@

if isinstance(self.distribution, int):
# Assume the user meant to specify the plane size only.
plane = u16(self.distribution)
elif isinstance(self.distribution, str):
# Support sbatch style string input
dist = TaskDistribution.from_str(self.distribution)
@@ -492,7 +493,7 @@
if "verbose" in self.gpu_binding:
binding = f"verbose,gpu:{binding}"

cstr.fmalloc(&self.ptr.tres_bind, binding)

def _set_min_cpus(self):
if self.min_cpus_per_node:
@@ -534,11 +535,23 @@
def _set_gres_binding(self):
if not self.gres_binding:
return None

binding = self.gres_binding.casefold()
if binding == "enforce-binding":
self.ptr.bitflags |= slurm.GRES_ENFORCE_BIND
elif binding == "disable-binding":
self.ptr.bitflags |= slurm.GRES_DISABLE_BIND

def _set_gres_tasks_per_sharing(self):
if not self.gres_tasks_per_sharing:
return None

sharing = self.gres_tasks_per_sharing.casefold()
if sharing == "multiple" or sharing == "multiple-tasks-per-sharing":
self.ptr.bitflags |= slurm.GRES_MULT_TASKS_PER_SHARING
elif sharing == "one" or sharing == "one-task-per-sharing":
self.ptr.bitflags |= slurm.GRES_ONE_TASK_PER_SHARING


def _parse_dependencies(val):
final = None
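To make the new mapping easier to follow, here is a standalone sketch of the string-to-flag translation performed by _set_gres_tasks_per_sharing above. The numeric values are placeholders, not the real GRES_MULT_TASKS_PER_SHARING / GRES_ONE_TASK_PER_SHARING constants from slurm.h:

# Placeholder bits, NOT the real slurm.h values.
GRES_MULT_TASKS_PER_SHARING = 0x1
GRES_ONE_TASK_PER_SHARING = 0x2

def tasks_per_sharing_flag(value):
    # Mirrors the accepted spellings documented in submission.pxd.
    s = value.casefold()
    if s in ("multiple", "multiple-tasks-per-sharing"):
        return GRES_MULT_TASKS_PER_SHARING
    if s in ("one", "one-task-per-sharing"):
        return GRES_ONE_TASK_PER_SHARING
    return 0  # unknown values are silently ignored, like in the Cython code above

assert tasks_per_sharing_flag("One-Task-Per-Sharing") == GRES_ONE_TASK_PER_SHARING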
@@ -565,7 +578,7 @@ def _parse_dependencies(val):
if not isinstance(vals, list):
vals = str(vals).split(",")

vals = [str(s) for s in vals]
final.append(f"{condition}:{':'.join(vals)}")

final = delim.join(final)
@@ -627,7 +640,7 @@ def _parse_switches_str_to_dict(switches_str):
vals = str(switches_str).split("@")
if len(vals) > 1:
out["max_wait_time"] = timestr_to_secs(vals[1])

out["count"] = u32(vals[0])

return out
@@ -691,7 +704,7 @@ def _validate_cpu_freq(freq):
def _validate_batch_script(script, args=None):
if Path(script).is_file():
# First assume the caller is passing a path to a script and we try
# to load it.
script = Path(script).read_text()
else:
if args:
9 changes: 7 additions & 2 deletions tests/unit/test_job_submit.py
@@ -350,6 +350,7 @@ def test_parsing_sbatch_options_from_script():
#SBATCH --exclusive
#SBATCH --ntasks = 2
#SBATCH -c=3 # inline-comments should be ignored
#SBATCH --gres-flags=one-task-per-sharing,enforce-binding
sleep 1000
"""
@@ -364,8 +365,10 @@ def test_parsing_sbatch_options_from_script():
assert job.resource_sharing == "no"
assert job.ntasks == 5
assert job.cpus_per_task == "3"
assert job.gres_tasks_per_sharing == "one-task-per-sharing"
assert job.gres_binding == "enforce-binding"

job = job_desc(ntasks=5, gres_binding="disable-binding")
job.script = path
job.load_sbatch_options(overwrite=True)
assert job.time_limit == "20"
@@ -374,6 +377,8 @@
assert job.resource_sharing == "no"
assert job.ntasks == "2"
assert job.cpus_per_task == "3"
assert job.gres_tasks_per_sharing == "one-task-per-sharing"
assert job.gres_binding == "enforce-binding"
finally:
os.remove(path)
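Putting the pieces together, a hedged end-to-end sketch (requires a reachable slurmctld; GRES names and job settings are illustrative):

from pyslurm import Job, JobSubmitDescription

desc = JobSubmitDescription(
    ntasks=2,
    gres_per_node="gpu:1",
    gres_tasks_per_sharing="multiple",
    script="#!/bin/bash\nsrun sleep 60",
)
job_id = desc.submit()

# After submission, the flag is visible through the new Job property.
print(Job.load(job_id).gres_tasks_per_sharing)  # -> "multiple"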
