Skip to content

Commit

Permalink
Add JobResources class
Browse files Browse the repository at this point in the history
Added a class to represent job resources required/requested for a batch
job. Includes documentation and some unit tests, but not enough.
Incorporated into `_click_batch`.
  • Loading branch information
TimothyWillard committed Nov 7, 2024
1 parent c8c0723 commit 42a9d78
Show file tree
Hide file tree
Showing 2 changed files with 122 additions and 12 deletions.
103 changes: 91 additions & 12 deletions flepimop/gempyor_pkg/src/gempyor/batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
metadata and job size calculations for example.
"""

__all__ = ["BatchSystem", "JobSize", "JobTimeLimit", "write_manifest"]
__all__ = ["BatchSystem", "JobSize", "JobResources", "JobTimeLimit", "write_manifest"]


from dataclasses import dataclass
Expand Down Expand Up @@ -150,14 +150,90 @@ def size_from_jobs_sims_blocks(
Returns:
A job size instance with either the explicit or inferred job sizing.
"""
inference_method = (
inference_method if inference_method is None else inference_method.lower()
)
if inference_method == "emcee":
return cls(jobs=jobs, simulations=blocks * simulations, blocks=1)
return cls(jobs=jobs, simulations=simulations, blocks=blocks)


@dataclass(frozen=True, slots=True)
class JobResources:
"""
A batch submission job resources request.
Attributes:
nodes: The number of nodes to request.
cpus: The number of CPUs to request per a node.
memory: The amount of memory to request per a node in MB.
Raises:
ValueError: If any of the attributes are less than 1.
"""

nodes: int
cpus: int
memory: int

def __post_init__(self) -> None:
for p in self.__slots__:
if (val := getattr(self, p)) < 1:
raise ValueError(
(
f"The '{p}' attribute must be greater than 0, "
f"but instead was given '{val}'."
)
)

@classmethod
def from_presets(
cls, job_size: JobSize, inference_method: Literal["emcee"] | None
) -> "JobResources":
"""
Calculate suggested job resources from presets.
Args:
job_size: The size of the job being ran.
inference_method: The inference method being used for this job.
Returns:
A job resources instances scaled to the job size given.
"""
if inference_method == "emcee":
return cls(
nodes=1, cpus=2 * job_size.jobs, memory=2 * 1024 * job_size.simulations
)
return cls(nodes=job_size.jobs, cpus=2, memory=2 * 1024)

@property
def total_cpus(self) -> int:
"""
Calculate the total number of CPUs.
Returns:
The total number of CPUs represented by this instance.
"""
return self.nodes * self.cpus

@property
def total_memory(self) -> int:
"""
Calculate the total amount of memory.
Returns:
The total amount of memory represented by this instance.
"""
return self.nodes * self.memory

def total_resources(self) -> tuple[int, int, int]:
"""
Calculate the total resources.
Returns:
A tuple of the nodes, total CPUs, and total memory represented by
this instance.
"""
return (self.nodes, self.total_cpus, self.total_memory)


@dataclass(frozen=True, slots=True)
class JobTimeLimit:
"""
Expand Down Expand Up @@ -261,8 +337,6 @@ def from_per_simulation_time(
Raises:
ValueError: If `time_per_simulation` is non-positive.
ValueError: If `initial_time` is non-positive.
Examples:
"""
if (total_seconds := time_per_simulation.total_seconds()) <= 0.0:
raise ValueError(
Expand Down Expand Up @@ -672,6 +746,9 @@ def _click_batch(ctx: click.Context = mock_context, **kwargs) -> None:
"The `flepimop batch` CLI only supports EMCEE inference jobs."
)
inference_method = cfg["inference"]["method"].as_str()
inference_method = (
inference_method if inference_method is None else inference_method.lower()
)

# Job name/run id
name = cfg["name"].get(str) if cfg["name"].exists() else None
Expand Down Expand Up @@ -719,6 +796,9 @@ def _click_batch(ctx: click.Context = mock_context, **kwargs) -> None:
)
logger.info("Setting a total job time limit of %s minutes", job_time_limit.format())

# Job resources
job_resources = JobResources.from_presets(job_size, inference_method)

# Cluster info
cluster: Cluster | None = None
if batch_system == BatchSystem.SLURM:
Expand Down Expand Up @@ -756,7 +836,7 @@ def _click_batch(ctx: click.Context = mock_context, **kwargs) -> None:
"flepi_path": kwargs["flepi_path"].absolute(),
"job_name": job_name,
"jobs": job_size.jobs,
"nslots": kwargs["nslots"],
"nslots": job_size.simulations,
"nthin": kwargs["nthin"],
"prefix": kwargs["prefix"],
"project_path": kwargs["project_path"].absolute(),
Expand All @@ -766,12 +846,11 @@ def _click_batch(ctx: click.Context = mock_context, **kwargs) -> None:
options = {
"chdir": kwargs["project_path"].absolute(),
"comment": f"Generated on {now:%c %Z} and submitted by {getuser()}.",
"cpus-per-task": job_size.jobs,
"cpus-per-task": job_resources.cpus,
"job-name": job_name,
"mem": "100GB",
"nodes": 1, # EMCEE can only run on one node for now.
"ntasks": 1,
"partition": "general",
"mem": job_resources.memory,
"nodes": job_resources.nodes,
"ntasks-per-node": 1,
"time": job_time_limit.format("slurm"),
}
if kwargs["partition"] is not None:
Expand Down
31 changes: 31 additions & 0 deletions flepimop/gempyor_pkg/tests/batch/test_job_resources_class.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
import re
from typing import Literal

import pytest

from gempyor.batch import JobResources


@pytest.mark.parametrize(
    "kwargs",
    (
        {"nodes": 0, "cpus": 1, "memory": 1},
        {"nodes": 1, "cpus": 0, "memory": 1},
        {"nodes": 1, "cpus": 1, "memory": 0},
        {"nodes": 0, "cpus": 0, "memory": 1},
        {"nodes": 1, "cpus": 0, "memory": 0},
        {"nodes": 0, "cpus": 1, "memory": 0},
        {"nodes": 0, "cpus": 0, "memory": 0},
    ),
)
def test_less_than_one_value_error(
    kwargs: dict[Literal["nodes", "cpus", "memory"], int]
) -> None:
    """Constructing `JobResources` with any value below 1 raises `ValueError`."""
    # The error names the first offending attribute; each `kwargs` dict lists
    # its keys in the same nodes/cpus/memory order used by the test cases.
    param = next(k for k, v in kwargs.items() if v < 1)
    expected = (
        f"The '{param}' attribute must be greater than 0, "
        f"but instead was given '{kwargs[param]}'."
    )
    # `match` is a regular expression, so escape the literal message to keep
    # the trailing '.' from matching an arbitrary character.
    with pytest.raises(ValueError, match=f"^{re.escape(expected)}$"):
        JobResources(**kwargs)

0 comments on commit 42a9d78

Please sign in to comment.