Skip to content

Commit

Permalink
Greatly simplify JobSize for first draft
Browse files Browse the repository at this point in the history
Mainly reduced the functionality of the `size_from_jobs_sims_blocks`
class method. Best to leave that to more feedback and GH-360.
  • Loading branch information
TimothyWillard committed Nov 7, 2024
1 parent 37ce2c1 commit c8c0723
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 147 deletions.
92 changes: 10 additions & 82 deletions flepimop/gempyor_pkg/src/gempyor/batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import sys
from tempfile import mkstemp
from typing import Any, Literal, Self
import warnings

import click

Expand Down Expand Up @@ -134,10 +135,7 @@ def size_from_jobs_sims_blocks(
jobs: int | None,
simulations: int | None,
blocks: int | None,
iterations_per_slot: int | None,
slots: int | None,
subpops: int | None,
batch_system: BatchSystem,
inference_method: Literal["emcee"] | None,
) -> "JobSize":
"""
Infer a job size from several explicit and implicit parameters.
Expand All @@ -146,69 +144,17 @@ def size_from_jobs_sims_blocks(
jobs: An explicit number of jobs.
simulations: An explicit number of simulations per a block.
blocks: An explicit number of blocks per a job.
iterations_per_slot: A total number of iterations per a job, which is
simulations times blocks. Required if `simulations` or `blocks` is
not given.
slots: An implicit number of slots to use for the job. Required if `jobs`
is not given.
subpops: The number of subpopulations being considered in this job. Affects
the inferred simulations per a job on AWS. Required if `simulations`
and `blocks` are not given.
batch_size: The system the job is being sized for. Affects the inferred
simulations per a job.
inference_method: The inference method being used as different methods have
different restrictions.
Returns:
A job size instance with either the explicit or inferred job sizing.
Examples:
>>> JobSize.size_from_jobs_sims_blocks(1, 2, 3, None, None, None, "local")
JobSize(jobs=1, simulations=2, blocks=3)
>>> JobSize.size_from_jobs_sims_blocks(
... None, None, None, 100, 10, 25, "local"
... )
JobSize(jobs=10, simulations=100, blocks=1)
>>> JobSize.size_from_jobs_sims_blocks(None, None, 4, 100, 10, 25, "local")
JobSize(jobs=10, simulations=25, blocks=4)
Raises:
ValueError: If `iterations_per_slot` is `None` and either `simulations` or
`blocks` is `None`.
ValueError: If `jobs` and `slots` are both `None`.
ValueError: If `simulations`, `blocks`, and `subpops` are all `None`.
"""
if iterations_per_slot is None and (simulations is None or blocks is None):
raise ValueError(
(
"If simulations and blocks are not all explicitly "
"provided then an iterations per slot must be given."
)
)

jobs = slots if jobs is None else jobs
if jobs is None:
raise ValueError(
"If jobs is not explicitly provided, it must be given via slots."
)

if simulations is None:
if blocks is None:
if subpops is None:
raise ValueError(
(
"If simulations and blocks are not explicitly "
"provided, then a subpops must be given."
)
)
if batch_system == BatchSystem.AWS:
simulations = 5 * math.ceil(max(60 - math.sqrt(subpops), 10) / 5)
else:
simulations = iterations_per_slot
else:
simulations = math.ceil(iterations_per_slot / blocks)

if blocks is None:
blocks = math.ceil(iterations_per_slot / simulations)

inference_method = (
inference_method if inference_method is None else inference_method.lower()
)
if inference_method == "emcee":
return cls(jobs=jobs, simulations=blocks * simulations, blocks=1)
return cls(jobs=jobs, simulations=simulations, blocks=blocks)


Expand Down Expand Up @@ -754,29 +700,11 @@ def _click_batch(ctx: click.Context = mock_context, **kwargs) -> None:
)

# Job size
iterations_per_slot = (
cfg["inference"]["iterations_per_slot"].get(int)
if cfg["inference"].exists() and cfg["inference"]["iterations_per_slot"].exists()
else None
)
nslots = cfg["nslots"].get(int) if cfg["nslots"].exists() else None
subpops: int | None = None
if (
cfg["subpop_setup"].exists()
and cfg["subpop_setup"]["geodata"].exists()
and (geodata := cfg["subpop_setup"]["geodata"].as_path()).exists()
):
with geodata.open() as f:
subpops = sum(1 for _ in f)
subpops -= 1
job_size = JobSize.size_from_jobs_sims_blocks(
kwargs["jobs"],
kwargs["simulations"],
kwargs["blocks"],
iterations_per_slot,
nslots,
subpops,
batch_system,
inference_method,
)
logger.info("Preparing a job with size %s", job_size)
if inference_method == "emcee" and job_size.blocks != 1:
Expand Down
74 changes: 9 additions & 65 deletions flepimop/gempyor_pkg/tests/batch/test_job_size_class.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,76 +32,20 @@ def test_less_than_one_value_error(
JobSize(**kwargs)


@pytest.mark.parametrize(("simulations", "blocks"), [(None, None), (1, None), (None, 1)])
def test_size_from_jobs_sims_blocks_iteration_value_error(
simulations: int | None, blocks: int | None
) -> None:
with pytest.raises(
ValueError,
match=(
"^If simulations and blocks are not all explicitly "
"provided then an iterations per slot must be given.$"
),
):
JobSize.size_from_jobs_sims_blocks(1, simulations, blocks, None, 1, 1, "aws")


def test_size_from_jobs_sims_blocks_slots_value_error() -> None:
with pytest.raises(
ValueError,
match="^If jobs is not explicitly provided, it must be given via slots.$",
):
JobSize.size_from_jobs_sims_blocks(None, 1, 1, 1, None, 1, "aws")


def test_size_from_jobs_sims_blocks_subpops_value_error() -> None:
with pytest.raises(
ValueError,
match=(
"^If simulations and blocks are not explicitly "
"provided, then a subpops must be given.$"
),
):
JobSize.size_from_jobs_sims_blocks(1, None, None, 1, 1, None, "aws")


def generate_size_from_jobs_sims_blocks(
*args: int | None,
) -> Generator[tuple[int | None, ...], None, None]:
for combo in product(args, repeat=6):
jobs, simulations, blocks, iterations_per_slot, slots, subpops = combo
if iterations_per_slot is None and (simulations is None or blocks is None):
continue
elif jobs is None and slots is None:
continue
elif simulations is None and blocks is None and subpops is None:
continue
for combo in product(args, repeat=3):
yield combo


@pytest.mark.parametrize("combo", generate_size_from_jobs_sims_blocks(None, 1, 10))
def test_size_from_jobs_sims_blocks_output(combo: tuple[int | None, ...]) -> None:
jobs, simulations, blocks, iterations_per_slot, slots, subpops = combo
job_sizes_by_batch_system = {}
for batch_system in ("aws", "local", "slurm"):
@pytest.mark.parametrize("combo", generate_size_from_jobs_sims_blocks(1, 5, 10))
def test_size_from_jobs_sims_blocks_output(combo: tuple[int, ...]) -> None:
jobs, simulations, blocks = combo
for inference_method in (None, ""):
job_size = JobSize.size_from_jobs_sims_blocks(
jobs, simulations, blocks, iterations_per_slot, slots, subpops, batch_system
)
assert (
job_size.jobs == jobs
if jobs is not None
else isinstance(job_size.jobs, int) and job_size.jobs > 0
)
assert (
job_size.simulations == simulations
if simulations is not None
else isinstance(job_size.simulations, int) and job_size.simulations > 0
)
assert (
job_size.blocks == blocks
if blocks is not None
else isinstance(job_size.blocks, int) and job_size.blocks > 0
jobs, simulations, blocks, inference_method
)
job_sizes_by_batch_system[batch_system] = job_size
assert job_sizes_by_batch_system["local"] == job_sizes_by_batch_system["slurm"]
assert job_sizes_by_batch_system["local"].jobs == job_sizes_by_batch_system["aws"].jobs
assert job_size.jobs == jobs
assert job_size.simulations >= simulations
assert job_size.blocks >= 1

0 comments on commit c8c0723

Please sign in to comment.