Skip to content

Commit

Permalink
Add JobSize.size_from_jobs_sims_blocks
Browse files Browse the repository at this point in the history
Added a class method to `gempyor.batch.JobSize` class that is slightly
more convenient to use from a CLI script. Provides a good home for
future code realted to GH-360. Corresponding tests and docs.
  • Loading branch information
TimothyWillard committed Oct 31, 2024
1 parent 243c792 commit 2e9b66a
Show file tree
Hide file tree
Showing 2 changed files with 162 additions and 4 deletions.
88 changes: 85 additions & 3 deletions flepimop/gempyor_pkg/src/gempyor/batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

from dataclasses import dataclass
import json
import math
from pathlib import Path
from shlex import quote
import subprocess
Expand Down Expand Up @@ -48,6 +49,89 @@ def __post_init__(self) -> None:
)
)

@classmethod
def size_from_jobs_sims_blocks(
cls,
jobs: int | None,
simulations: int | None,
blocks: int | None,
iterations_per_slot: int | None,
slots: int | None,
subpops: int | None,
batch_system: Literal["aws", "local", "slurm"],
) -> "JobSize":
"""
Infer a job size from several explicit and implicit parameters.
Args:
jobs: An explicit number of jobs.
simulations: An explicit number of simulations per a block.
blocks: An explicit number of blocks per a job.
iterations_per_slot: A total number of iterations per a job, which is
simulations times blocks. Required if `simulations` or `blocks` is
not given.
slots: An implicit number of slots to use for the job. Required if `jobs`
is not given.
subpops: The number of subpopulations being considered in this job. Affects
the inferred simulations per a job on AWS. Required if `simulations`
and `blocks` are not given.
batch_size: The system the job is being sized for. Affects the inferred
simulations per a job.
Returns:
A job size instance with either the explicit or inferred job sizing.
Examples:
>>> JobSize.size_from_jobs_sims_blocks(1, 2, 3, None, None, None, "local")
JobSize(jobs=1, simulations=2, blocks=3)
>>> JobSize.size_from_jobs_sims_blocks(
... None, None, None, 100, 10, 25, "local"
... )
JobSize(jobs=10, simulations=100, blocks=1)
>>> JobSize.size_from_jobs_sims_blocks(None, None, 4, 100, 10, 25, "local")
JobSize(jobs=10, simulations=25, blocks=4)
Raises:
ValueError: If `iterations_per_slot` is `None` and either `simulations` or
`blocks` is `None`.
ValueError: If `jobs` and `slots` are both `None`.
ValueError: If `simulations`, `blocks`, and `subpops` are all `None`.
"""
if iterations_per_slot is None and (simulations is None or blocks is None):
raise ValueError(
(
"If simulations and blocks are not all explicitly "
"provided then an iterations per slot must be given."
)
)

jobs = slots if jobs is None else jobs
if jobs is None:
raise ValueError(
"If jobs is not explicitly provided, it must be given via slots."
)

if simulations is None:
if blocks is None:
if subpops is None:
raise ValueError(
(
"If simulations and blocks are not explicitly "
"provided, then a subpops must be given."
)
)
if batch_system == "aws":
simulations = 5 * math.ceil(max(60 - math.sqrt(subpops), 10) / 5)
else:
simulations = iterations_per_slot
else:
simulations = math.ceil(iterations_per_slot / blocks)

if blocks is None:
blocks = math.ceil(iterations_per_slot / simulations)

return cls(jobs=jobs, simulations=simulations, blocks=blocks)


def write_manifest(
job_name: str,
Expand Down Expand Up @@ -110,9 +194,7 @@ def write_manifest(
if additional_meta:
manifest = {**additional_meta, **manifest}

destination = (
Path("manifest.json").absolute() if destination is None else destination
)
destination = Path("manifest.json").absolute() if destination is None else destination
with destination.open(mode="w", encoding="utf-8") as f:
json.dump(manifest, f, indent=4)

Expand Down
78 changes: 77 additions & 1 deletion flepimop/gempyor_pkg/tests/batch/test_job_size_class.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from typing import Literal
from itertools import product
from typing import Generator, Literal

import pytest

Expand Down Expand Up @@ -29,3 +30,78 @@ def test_less_than_one_value_error(
),
):
JobSize(**kwargs)


@pytest.mark.parametrize(("simulations", "blocks"), [(None, None), (1, None), (None, 1)])
def test_size_from_jobs_sims_blocks_iteration_value_error(
simulations: int | None, blocks: int | None
) -> None:
with pytest.raises(
ValueError,
match=(
"^If simulations and blocks are not all explicitly "
"provided then an iterations per slot must be given.$"
),
):
JobSize.size_from_jobs_sims_blocks(1, simulations, blocks, None, 1, 1, "aws")


def test_size_from_jobs_sims_blocks_slots_value_error() -> None:
with pytest.raises(
ValueError,
match="^If jobs is not explicitly provided, it must be given via slots.$",
):
JobSize.size_from_jobs_sims_blocks(None, 1, 1, 1, None, 1, "aws")


def test_size_from_jobs_sims_blocks_subpops_value_error() -> None:
with pytest.raises(
ValueError,
match=(
"^If simulations and blocks are not explicitly "
"provided, then a subpops must be given.$"
),
):
JobSize.size_from_jobs_sims_blocks(1, None, None, 1, 1, None, "aws")


def generate_size_from_jobs_sims_blocks(
*args: int | None,
) -> Generator[tuple[int | None, ...], None, None]:
for combo in product(args, repeat=6):
jobs, simulations, blocks, iterations_per_slot, slots, subpops = combo
if iterations_per_slot is None and (simulations is None or blocks is None):
continue
elif jobs is None and slots is None:
continue
elif simulations is None and blocks is None and subpops is None:
continue
yield combo


@pytest.mark.parametrize("combo", generate_size_from_jobs_sims_blocks(None, 1, 10))
def test_size_from_jobs_sims_blocks_output(combo: tuple[int | None, ...]) -> None:
jobs, simulations, blocks, iterations_per_slot, slots, subpops = combo
job_sizes_by_batch_system = {}
for batch_system in ("aws", "local", "slurm"):
job_size = JobSize.size_from_jobs_sims_blocks(
jobs, simulations, blocks, iterations_per_slot, slots, subpops, batch_system
)
assert (
job_size.jobs == jobs
if jobs is not None
else isinstance(job_size.jobs, int) and job_size.jobs > 0
)
assert (
job_size.simulations == simulations
if simulations is not None
else isinstance(job_size.simulations, int) and job_size.simulations > 0
)
assert (
job_size.blocks == blocks
if blocks is not None
else isinstance(job_size.blocks, int) and job_size.blocks > 0
)
job_sizes_by_batch_system[batch_system] = job_size
assert job_sizes_by_batch_system["local"] == job_sizes_by_batch_system["slurm"]
assert job_sizes_by_batch_system["local"].jobs == job_sizes_by_batch_system["aws"].jobs

0 comments on commit 2e9b66a

Please sign in to comment.