From 10faf777132c0f2460db1632b440968337c6d8a8 Mon Sep 17 00:00:00 2001 From: Timothy Willard <9395586+TimothyWillard@users.noreply.github.com> Date: Thu, 31 Oct 2024 15:18:52 -0400 Subject: [PATCH] Add `JobSize.size_from_jobs_sims_blocks` Added a class method to `gempyor.batch.JobSize` class that is slightly more convenient to use from a CLI script. Provides a good home for future code realted to GH-360. Corresponding tests and docs. --- flepimop/gempyor_pkg/src/gempyor/batch.py | 88 ++++++++++++++++++- .../tests/batch/test_job_size_class.py | 78 +++++++++++++++- 2 files changed, 162 insertions(+), 4 deletions(-) diff --git a/flepimop/gempyor_pkg/src/gempyor/batch.py b/flepimop/gempyor_pkg/src/gempyor/batch.py index 9b0fc95ee..8fd22d23c 100644 --- a/flepimop/gempyor_pkg/src/gempyor/batch.py +++ b/flepimop/gempyor_pkg/src/gempyor/batch.py @@ -10,6 +10,7 @@ from dataclasses import dataclass import json +import math from pathlib import Path from shlex import quote import subprocess @@ -48,6 +49,89 @@ def __post_init__(self) -> None: ) ) + @classmethod + def size_from_jobs_sims_blocks( + cls, + jobs: int | None, + simulations: int | None, + blocks: int | None, + iterations_per_slot: int | None, + slots: int | None, + subpops: int | None, + batch_system: Literal["aws", "local", "slurm"], + ) -> "JobSize": + """ + Infer a job size from several explicit and implicit parameters. + + Args: + jobs: An explicit number of jobs. + simulations: An explicit number of simulations per a block. + blocks: An explicit number of blocks per a job. + iterations_per_slot: A total number of iterations per a job, which is + simulations times blocks. Required if `simulations` or `blocks` is + not given. + slots: An implicit number of slots to use for the job. Required if `jobs` + is not given. + subpops: The number of subpopulations being considered in this job. Affects + the inferred simulations per a job on AWS. Required if `simulations` + and `blocks` are not given. + batch_size: The system the job is being sized for. Affects the inferred + simulations per a job. + + Returns: + A job size instance with either the explicit or inferred job sizing. + + Examples: + >>> JobSize.size_from_jobs_sims_blocks(1, 2, 3, None, None, None, "local") + JobSize(jobs=1, simulations=2, blocks=3) + >>> JobSize.size_from_jobs_sims_blocks( + ... None, None, None, 100, 10, 25, "local" + ... ) + JobSize(jobs=10, simulations=100, blocks=1) + >>> JobSize.size_from_jobs_sims_blocks(None, None, 4, 100, 10, 25, "local") + JobSize(jobs=10, simulations=25, blocks=4) + + Raises: + ValueError: If `iterations_per_slot` is `None` and either `simulations` or + `blocks` is `None`. + ValueError: If `jobs` and `slots` are both `None`. + ValueError: If `simulations`, `blocks`, and `subpops` are all `None`. + """ + if iterations_per_slot is None and (simulations is None or blocks is None): + raise ValueError( + ( + "If simulations and blocks are not all explicitly " + "provided then an iterations per slot must be given." + ) + ) + + jobs = slots if jobs is None else jobs + if jobs is None: + raise ValueError( + "If jobs is not explicitly provided, it must be given via slots." + ) + + if simulations is None: + if blocks is None: + if subpops is None: + raise ValueError( + ( + "If simulations and blocks are not explicitly " + "provided, then a subpops must be given." + ) + ) + if batch_system == "aws": + simulations = 5 * math.ceil(max(60 - math.sqrt(subpops), 10) / 5) + else: + simulations = iterations_per_slot + else: + simulations = math.ceil(iterations_per_slot / blocks) + + if blocks is None: + blocks = math.ceil(iterations_per_slot / simulations) + + return cls(jobs=jobs, simulations=simulations, blocks=blocks) + def write_manifest( job_name: str, @@ -110,9 +194,7 @@ def write_manifest( if additional_meta: manifest = {**additional_meta, **manifest} - destination = ( - Path("manifest.json").absolute() if destination is None else destination - ) + destination = Path("manifest.json").absolute() if destination is None else destination with destination.open(mode="w", encoding="utf-8") as f: json.dump(manifest, f, indent=4) diff --git a/flepimop/gempyor_pkg/tests/batch/test_job_size_class.py b/flepimop/gempyor_pkg/tests/batch/test_job_size_class.py index cdee832d4..7ffe4d7ac 100644 --- a/flepimop/gempyor_pkg/tests/batch/test_job_size_class.py +++ b/flepimop/gempyor_pkg/tests/batch/test_job_size_class.py @@ -1,4 +1,5 @@ -from typing import Literal +from itertools import product +from typing import Generator, Literal import pytest @@ -29,3 +30,78 @@ def test_less_than_one_value_error( ), ): JobSize(**kwargs) + + +@pytest.mark.parametrize(("simulations", "blocks"), [(None, None), (1, None), (None, 1)]) +def test_size_from_jobs_sims_blocks_iteration_value_error( + simulations: int | None, blocks: int | None +) -> None: + with pytest.raises( + ValueError, + match=( + "^If simulations and blocks are not all explicitly " + "provided then an iterations per slot must be given.$" + ), + ): + JobSize.size_from_jobs_sims_blocks(1, simulations, blocks, None, 1, 1, "aws") + + +def test_size_from_jobs_sims_blocks_slots_value_error() -> None: + with pytest.raises( + ValueError, + match="^If jobs is not explicitly provided, it must be given via slots.$", + ): + JobSize.size_from_jobs_sims_blocks(None, 1, 1, 1, None, 1, "aws") + + +def test_size_from_jobs_sims_blocks_subpops_value_error() -> None: + with pytest.raises( + ValueError, + match=( + "^If simulations and blocks are not explicitly " + "provided, then a subpops must be given.$" + ), + ): + JobSize.size_from_jobs_sims_blocks(1, None, None, 1, 1, None, "aws") + + +def generate_size_from_jobs_sims_blocks( + *args: int | None, +) -> Generator[tuple[int | None, ...], None, None]: + for combo in product(args, repeat=6): + jobs, simulations, blocks, iterations_per_slot, slots, subpops = combo + if iterations_per_slot is None and (simulations is None or blocks is None): + continue + elif jobs is None and slots is None: + continue + elif simulations is None and blocks is None and subpops is None: + continue + yield combo + + +@pytest.mark.parametrize("combo", generate_size_from_jobs_sims_blocks(None, 1, 10)) +def test_size_from_jobs_sims_blocks_output(combo: tuple[int | None, ...]) -> None: + jobs, simulations, blocks, iterations_per_slot, slots, subpops = combo + job_sizes_by_batch_system = {} + for batch_system in ("aws", "local", "slurm"): + job_size = JobSize.size_from_jobs_sims_blocks( + jobs, simulations, blocks, iterations_per_slot, slots, subpops, batch_system + ) + assert ( + job_size.jobs == jobs + if jobs is not None + else isinstance(job_size.jobs, int) and job_size.jobs > 0 + ) + assert ( + job_size.simulations == simulations + if simulations is not None + else isinstance(job_size.simulations, int) and job_size.simulations > 0 + ) + assert ( + job_size.blocks == blocks + if blocks is not None + else isinstance(job_size.blocks, int) and job_size.blocks > 0 + ) + job_sizes_by_batch_system[batch_system] = job_size + assert job_sizes_by_batch_system["local"] == job_sizes_by_batch_system["slurm"] + assert job_sizes_by_batch_system["local"].jobs == job_sizes_by_batch_system["aws"].jobs