From 37ec12010d18f05af204c1615c85772e0332707c Mon Sep 17 00:00:00 2001 From: bw2 Date: Wed, 26 Jun 2024 06:35:47 -0400 Subject: [PATCH] fixed handling of preemptibles --- step_pipeline/batch.py | 37 +++++++++++++++++++------------------ 1 file changed, 19 insertions(+), 18 deletions(-) diff --git a/step_pipeline/batch.py b/step_pipeline/batch.py index 414b4ba..5232128 100644 --- a/step_pipeline/batch.py +++ b/step_pipeline/batch.py @@ -67,7 +67,7 @@ def __init__(self, name=None, config_arg_parser=None, backend=Backend.HAIL_BATCH self._default_storage = None self._default_timeout = None self._default_custom_machine_type = None - self._default_custom_machine_is_preemptible = None + self._default_preemptible = None self._backend_obj = None @property @@ -88,7 +88,7 @@ def new_step( always_run=False, timeout=None, custom_machine_type=None, - custom_machine_is_preemptible=None, + preemptible=None, output_dir=None, reuse_job_from_previous_step=None, localize_by=Localize.COPY, @@ -127,7 +127,7 @@ def new_step( always_run (bool): Set the Step to always run, even if dependencies fail. timeout (float, int): Set the maximum amount of time this job can run for before being killed. custom_machine_type (str): Use a custom Cloud machine type, eg. 'n1-highmem-32' - custom_machine_is_preemptible (bool): Whether to use a preemptible machine type. + preemptible (bool): Whether to use a preemptible machine type. output_dir (str): Optional default output directory for Step outputs. reuse_job_from_previous_step (Step): Optionally, reuse the batch.Job object from this other upstream Step. localize_by (Localize): If specified, this will be the default Localize approach used by Step inputs. @@ -158,7 +158,7 @@ def new_step( always_run=always_run, timeout=timeout, custom_machine_type=custom_machine_type, - custom_machine_is_preemptible=custom_machine_is_preemptible, + preemptible=preemptible, output_dir=self._default_output_dir or output_dir, reuse_job_from_previous_step=reuse_job_from_previous_step, localize_by=localize_by, @@ -273,13 +273,13 @@ def default_custom_machine_type(self, default_custom_machine_type): self._default_custom_machine_type = default_custom_machine_type return self - def default_custom_machine_is_preemptible(self, default_custom_machine_is_preemptible): + def default_preemptible(self, default_preemptible): """Set whether the custom machine should be preemptible. Args: - default_custom_machine_is_preemptible (bool): Whether the custom machine is preemptible. + default_preemptible (bool): Whether the custom machine is preemptible. """ - self._default_custom_machine_is_preemptible = default_custom_machine_is_preemptible + self._default_preemptible = default_preemptible return self def run(self): @@ -423,7 +423,7 @@ def __init__( always_run=False, timeout=None, custom_machine_type=None, - custom_machine_is_preemptible=None, + preemptible=None, output_dir=None, reuse_job_from_previous_step=None, localize_by=Localize.COPY, @@ -462,7 +462,7 @@ def __init__( always_run (bool): Set the Step to always run, even if dependencies fail. timeout (float, int): Set the maximum amount of time this job can run for before being killed. custom_machine_type (str): - custom_machine_is_preemptible (bool): + preemptible (bool): output_dir (str): Optional default output directory for Step outputs. reuse_job_from_previous_step (Step): Optionally, reuse the batch.Job object from this other upstream Step. localize_by (Localize): If specified, this will be the default Localize approach used by Step inputs. @@ -501,7 +501,7 @@ def __init__( self._always_run = always_run self._timeout = timeout self._custom_machine_type = custom_machine_type - self._custom_machine_is_preemptible = custom_machine_is_preemptible + self._preemptible = preemptible self._reuse_job_from_previous_step = reuse_job_from_previous_step self._job = None @@ -610,13 +610,13 @@ def custom_machine_type(self, custom_machine_type): self._custom_machine_type = custom_machine_type return self - def custom_machine_is_preemptible(self, custom_machine_is_preemptible): + def preemptible(self, preemptible): """Set whether the custom machine should be preemptible. Args: - custom_machine_is_preemptible (bool): Whether the custom machine is preemptible. + preemptible (bool): Whether the custom machine is preemptible. """ - self._custom_machine_is_preemptible = custom_machine_is_preemptible + self._preemptible = preemptible return self def _transfer_step(self): @@ -674,16 +674,17 @@ def _transfer_step(self): raise ValueError(f"Unexpected memory arg type: {type(self._memory)}") custom_machine_type_requested = any(p is not None for p in [ - self._custom_machine_type, self._custom_machine_is_preemptible, - self._pipeline._default_custom_machine_type, self._pipeline._default_custom_machine_is_preemptible, + self._custom_machine_type, self._pipeline._default_custom_machine_type, ]) if custom_machine_type_requested: if self._cpu or self._memory: - raise ValueError("Both a custom_machine_type or custom_machine_is_preemptible as well as cpu or memory " + raise ValueError("Both a custom_machine_type or preemptible as well as cpu or memory " "arguments were specified. Only one or the other should be provided.") self._job._machine_type = self._custom_machine_type or self._pipeline._default_custom_machine_type - #if self._custom_machine_is_preemptible is not None or self._default_custom_machine_is_preemptible is not None: - self._job._preemptible = self._custom_machine_is_preemptible or self._pipeline._default_custom_machine_is_preemptible + + if self._preemptible is not None or self._pipeline._default_preemptible is not None: + print("Setting preemptible to", self._preemptible if self._preemptible is not None else self._pipeline._default_preemptible) + self._job._preemptible = self._preemptible if self._preemptible is not None else self._pipeline._default_preemptible if self._storage: self._job.storage(self._storage)