From 42a9d78a492bd9868ad3c83ae899fc3a98271998 Mon Sep 17 00:00:00 2001 From: Timothy Willard <9395586+TimothyWillard@users.noreply.github.com> Date: Thu, 7 Nov 2024 17:28:16 -0500 Subject: [PATCH] Add `JobResources` class Added a class to represent job resources required/requested for a batch job. Includes documentation and some unit tests, but not enough. Incorporated into `_click_batch`. --- flepimop/gempyor_pkg/src/gempyor/batch.py | 103 ++++++++++++++++-- .../tests/batch/test_job_resources_class.py | 31 ++++++ 2 files changed, 122 insertions(+), 12 deletions(-) create mode 100644 flepimop/gempyor_pkg/tests/batch/test_job_resources_class.py diff --git a/flepimop/gempyor_pkg/src/gempyor/batch.py b/flepimop/gempyor_pkg/src/gempyor/batch.py index ae3137964..244e734cb 100644 --- a/flepimop/gempyor_pkg/src/gempyor/batch.py +++ b/flepimop/gempyor_pkg/src/gempyor/batch.py @@ -5,7 +5,7 @@ metadata and job size calculations for example. """ -__all__ = ["BatchSystem", "JobSize", "JobTimeLimit", "write_manifest"] +__all__ = ["BatchSystem", "JobSize", "JobResources", "JobTimeLimit", "write_manifest"] from dataclasses import dataclass @@ -150,14 +150,90 @@ def size_from_jobs_sims_blocks( Returns: A job size instance with either the explicit or inferred job sizing. """ - inference_method = ( - inference_method if inference_method is None else inference_method.lower() - ) if inference_method == "emcee": return cls(jobs=jobs, simulations=blocks * simulations, blocks=1) return cls(jobs=jobs, simulations=simulations, blocks=blocks) +@dataclass(frozen=True, slots=True) +class JobResources: + """ + A batch submission job resources request. + + Attributes: + nodes: The number of nodes to request. + cpus: The number of CPUs to request per a node. + memory: The amount of memory to request per a node in MB. + + Raises: + ValueError: If any of the attributes are less than 1. + """ + + nodes: int + cpus: int + memory: int + + def __post_init__(self) -> None: + for p in self.__slots__: + if (val := getattr(self, p)) < 1: + raise ValueError( + ( + f"The '{p}' attribute must be greater than 0, " + f"but instead was given '{val}'." + ) + ) + + @classmethod + def from_presets( + cls, job_size: JobSize, inference_method: Literal["emcee"] | None + ) -> "JobResources": + """ + Calculate suggested job resources from presets. + + Args: + job_size: The size of the job being ran. + inference_method: The inference method being used for this job. + + Returns: + A job resources instances scaled to the job size given. + """ + if inference_method == "emcee": + return cls( + nodes=1, cpus=2 * job_size.jobs, memory=2 * 1024 * job_size.simulations + ) + return cls(nodes=job_size.jobs, cpus=2, memory=2 * 1024) + + @property + def total_cpus(self) -> int: + """ + Calculate the total number of CPUs. + + Returns: + The total number of CPUs represented by this instance. + """ + return self.nodes * self.cpus + + @property + def total_memory(self) -> int: + """ + Calculate the total amount of memory. + + Returns: + The total amount of memory represented by this instance. + """ + return self.nodes * self.memory + + def total_resources(self) -> tuple[int, int, int]: + """ + Calculate the total resources. + + Returns: + A tuple of the nodes, total CPUs, and total memory represented by + this instance. + """ + return (self.nodes, self.total_cpus, self.total_memory) + + @dataclass(frozen=True, slots=True) class JobTimeLimit: """ @@ -261,8 +337,6 @@ def from_per_simulation_time( Raises: ValueError: If `time_per_simulation` is non-positive. ValueError: If `initial_time` is non-positive. - - Examples: """ if (total_seconds := time_per_simulation.total_seconds()) <= 0.0: raise ValueError( @@ -672,6 +746,9 @@ def _click_batch(ctx: click.Context = mock_context, **kwargs) -> None: "The `flepimop batch` CLI only supports EMCEE inference jobs." ) inference_method = cfg["inference"]["method"].as_str() + inference_method = ( + inference_method if inference_method is None else inference_method.lower() + ) # Job name/run id name = cfg["name"].get(str) if cfg["name"].exists() else None @@ -719,6 +796,9 @@ def _click_batch(ctx: click.Context = mock_context, **kwargs) -> None: ) logger.info("Setting a total job time limit of %s minutes", job_time_limit.format()) + # Job resources + job_resources = JobResources.from_presets(job_size, inference_method) + # Cluster info cluster: Cluster | None = None if batch_system == BatchSystem.SLURM: @@ -756,7 +836,7 @@ def _click_batch(ctx: click.Context = mock_context, **kwargs) -> None: "flepi_path": kwargs["flepi_path"].absolute(), "job_name": job_name, "jobs": job_size.jobs, - "nslots": kwargs["nslots"], + "nslots": job_size.simulations, "nthin": kwargs["nthin"], "prefix": kwargs["prefix"], "project_path": kwargs["project_path"].absolute(), @@ -766,12 +846,11 @@ def _click_batch(ctx: click.Context = mock_context, **kwargs) -> None: options = { "chdir": kwargs["project_path"].absolute(), "comment": f"Generated on {now:%c %Z} and submitted by {getuser()}.", - "cpus-per-task": job_size.jobs, + "cpus-per-task": job_resources.cpus, "job-name": job_name, - "mem": "100GB", - "nodes": 1, # EMCEE can only run on one node for now. - "ntasks": 1, - "partition": "general", + "mem": job_resources.memory, + "nodes": job_resources.nodes, + "ntasks-per-node": 1, "time": job_time_limit.format("slurm"), } if kwargs["partition"] is not None: diff --git a/flepimop/gempyor_pkg/tests/batch/test_job_resources_class.py b/flepimop/gempyor_pkg/tests/batch/test_job_resources_class.py new file mode 100644 index 000000000..6bb8b8551 --- /dev/null +++ b/flepimop/gempyor_pkg/tests/batch/test_job_resources_class.py @@ -0,0 +1,31 @@ +from typing import Literal + +import pytest + +from gempyor.batch import JobResources + + +@pytest.mark.parametrize( + "kwargs", + ( + {"nodes": 0, "cpus": 1, "memory": 1}, + {"nodes": 1, "cpus": 0, "memory": 1}, + {"nodes": 1, "cpus": 1, "memory": 0}, + {"nodes": 0, "cpus": 0, "memory": 1}, + {"nodes": 1, "cpus": 0, "memory": 0}, + {"nodes": 0, "cpus": 1, "memory": 0}, + {"nodes": 0, "cpus": 0, "memory": 0}, + ), +) +def test_less_than_one_value_error( + kwargs: dict[Literal["nodes", "cpus", "memory"], int] +) -> None: + param = next(k for k, v in kwargs.items() if v < 1) + with pytest.raises( + ValueError, + match=( + f"^The '{param}' attribute must be greater than 0, " + f"but instead was given '{kwargs.get(param)}'.$" + ), + ): + JobResources(**kwargs)