Skip to content

Commit

Permalink
fixed handling of preemptibles
Browse files Browse the repository at this point in the history
  • Loading branch information
bw2 committed Jun 26, 2024
1 parent 5cd1142 commit 37ec120
Showing 1 changed file with 19 additions and 18 deletions.
37 changes: 19 additions & 18 deletions step_pipeline/batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ def __init__(self, name=None, config_arg_parser=None, backend=Backend.HAIL_BATCH
self._default_storage = None
self._default_timeout = None
self._default_custom_machine_type = None
self._default_custom_machine_is_preemptible = None
self._default_preemptible = None
self._backend_obj = None

@property
Expand All @@ -88,7 +88,7 @@ def new_step(
always_run=False,
timeout=None,
custom_machine_type=None,
custom_machine_is_preemptible=None,
preemptible=None,
output_dir=None,
reuse_job_from_previous_step=None,
localize_by=Localize.COPY,
Expand Down Expand Up @@ -127,7 +127,7 @@ def new_step(
always_run (bool): Set the Step to always run, even if dependencies fail.
timeout (float, int): Set the maximum amount of time this job can run for before being killed.
custom_machine_type (str): Use a custom Cloud machine type, eg. 'n1-highmem-32'
custom_machine_is_preemptible (bool): Whether to use a preemptible machine type.
preemptible (bool): Whether to use a preemptible machine type.
output_dir (str): Optional default output directory for Step outputs.
reuse_job_from_previous_step (Step): Optionally, reuse the batch.Job object from this other upstream Step.
localize_by (Localize): If specified, this will be the default Localize approach used by Step inputs.
Expand Down Expand Up @@ -158,7 +158,7 @@ def new_step(
always_run=always_run,
timeout=timeout,
custom_machine_type=custom_machine_type,
custom_machine_is_preemptible=custom_machine_is_preemptible,
preemptible=preemptible,
output_dir=self._default_output_dir or output_dir,
reuse_job_from_previous_step=reuse_job_from_previous_step,
localize_by=localize_by,
Expand Down Expand Up @@ -273,13 +273,13 @@ def default_custom_machine_type(self, default_custom_machine_type):
self._default_custom_machine_type = default_custom_machine_type
return self

def default_custom_machine_is_preemptible(self, default_custom_machine_is_preemptible):
def default_preemptible(self, default_preemptible):
"""Set whether the custom machine should be preemptible.
Args:
default_custom_machine_is_preemptible (bool): Whether the custom machine is preemptible.
default_preemptible (bool): Whether the custom machine is preemptible.
"""
self._default_custom_machine_is_preemptible = default_custom_machine_is_preemptible
self._default_preemptible = default_preemptible
return self

def run(self):
Expand Down Expand Up @@ -423,7 +423,7 @@ def __init__(
always_run=False,
timeout=None,
custom_machine_type=None,
custom_machine_is_preemptible=None,
preemptible=None,
output_dir=None,
reuse_job_from_previous_step=None,
localize_by=Localize.COPY,
Expand Down Expand Up @@ -462,7 +462,7 @@ def __init__(
always_run (bool): Set the Step to always run, even if dependencies fail.
timeout (float, int): Set the maximum amount of time this job can run for before being killed.
custom_machine_type (str):
custom_machine_is_preemptible (bool):
preemptible (bool):
output_dir (str): Optional default output directory for Step outputs.
reuse_job_from_previous_step (Step): Optionally, reuse the batch.Job object from this other upstream Step.
localize_by (Localize): If specified, this will be the default Localize approach used by Step inputs.
Expand Down Expand Up @@ -501,7 +501,7 @@ def __init__(
self._always_run = always_run
self._timeout = timeout
self._custom_machine_type = custom_machine_type
self._custom_machine_is_preemptible = custom_machine_is_preemptible
self._preemptible = preemptible
self._reuse_job_from_previous_step = reuse_job_from_previous_step

self._job = None
Expand Down Expand Up @@ -610,13 +610,13 @@ def custom_machine_type(self, custom_machine_type):
self._custom_machine_type = custom_machine_type
return self

def custom_machine_is_preemptible(self, custom_machine_is_preemptible):
def preemptible(self, preemptible):
"""Set whether the custom machine should be preemptible.
Args:
custom_machine_is_preemptible (bool): Whether the custom machine is preemptible.
preemptible (bool): Whether the custom machine is preemptible.
"""
self._custom_machine_is_preemptible = custom_machine_is_preemptible
self._preemptible = preemptible
return self

def _transfer_step(self):
Expand Down Expand Up @@ -674,16 +674,17 @@ def _transfer_step(self):
raise ValueError(f"Unexpected memory arg type: {type(self._memory)}")

custom_machine_type_requested = any(p is not None for p in [
self._custom_machine_type, self._custom_machine_is_preemptible,
self._pipeline._default_custom_machine_type, self._pipeline._default_custom_machine_is_preemptible,
self._custom_machine_type, self._pipeline._default_custom_machine_type,
])
if custom_machine_type_requested:
if self._cpu or self._memory:
raise ValueError("Both a custom_machine_type or custom_machine_is_preemptible as well as cpu or memory "
raise ValueError("Both a custom_machine_type or preemptible as well as cpu or memory "
"arguments were specified. Only one or the other should be provided.")
self._job._machine_type = self._custom_machine_type or self._pipeline._default_custom_machine_type
#if self._custom_machine_is_preemptible is not None or self._default_custom_machine_is_preemptible is not None:
self._job._preemptible = self._custom_machine_is_preemptible or self._pipeline._default_custom_machine_is_preemptible

if self._preemptible is not None or self._pipeline._default_preemptible is not None:
print("Setting preemptible to", self._preemptible if self._preemptible is not None else self._pipeline._default_preemptible)
self._job._preemptible = self._preemptible if self._preemptible is not None else self._pipeline._default_preemptible

if self._storage:
self._job.storage(self._storage)
Expand Down

0 comments on commit 37ec120

Please sign in to comment.