From 0f2c69f8249aa0c53ebcf10afa2394da506a953f Mon Sep 17 00:00:00 2001 From: sungjun lee Date: Tue, 28 May 2024 23:58:50 +0900 Subject: [PATCH] Update the randomize_start argument to randomize_start_duration to accept an integer specifying the maximum number of seconds to delay the start of each task. (#199) --- README.md | 2 +- src/datatrove/executor/base.py | 10 +++++----- src/datatrove/executor/local.py | 6 +++--- src/datatrove/executor/slurm.py | 8 ++++---- 4 files changed, 13 insertions(+), 13 deletions(-) diff --git a/README.md b/README.md index dd08a8eb..41d576b3 100644 --- a/README.md +++ b/README.md @@ -96,7 +96,7 @@ Some options common to all executors: - `pipeline` a list consisting of the pipeline steps that should be run - `logging_dir` a datafolder where log files, statistics and more should be saved. Do not reuse folders for different pipelines/jobs as this will overwrite your stats, logs and completions. - `skip_completed` (_bool_, `True` by default) datatrove keeps track of completed tasks so that when you relaunch a job they can be skipped. Set this to `False` to disable this behaviour -- `randomize_start` (_bool_, `False` by default) randomizes the start time of each task within a job by approximately 3 minutes to prevent all tasks from starting simultaneously and potentially overloading the system. +- `randomize_start_duration` (_int_, `0` by default) the maximum number of seconds to delay the start of each task to prevent all tasks from starting simultaneously and potentially overloading the system. Call an executor's `run` method to execute its pipeline. diff --git a/src/datatrove/executor/base.py b/src/datatrove/executor/base.py index 7acc4bb5..2ebdb647 100644 --- a/src/datatrove/executor/base.py +++ b/src/datatrove/executor/base.py @@ -29,7 +29,7 @@ class PipelineExecutor(ABC): logging_dir: where to save logs, stats, etc. Should be parsable into a datatrove.io.DataFolder skip_completed: whether to skip tasks that were completed in previous runs. default: True - randomize_start: randomize the start of each task in a job in a ~3 min window + randomize_start_duration: the maximum number of seconds to delay the start of each task. """ @abstractmethod @@ -38,12 +38,12 @@ def __init__( pipeline: list[PipelineStep | Callable], logging_dir: DataFolderLike = None, skip_completed: bool = True, - randomize_start: bool = False, + randomize_start_duration: int = 0, ): self.pipeline: list[PipelineStep | Callable] = pipeline self.logging_dir = get_datafolder(logging_dir if logging_dir else f"logs/{get_timestamp()}_{get_random_str()}") self.skip_completed = skip_completed - self.randomize_start = randomize_start + self.randomize_start_duration = randomize_start_duration @abstractmethod def run(self): @@ -80,8 +80,8 @@ def _run_for_rank(self, rank: int, local_rank: int = 0) -> PipelineStats: logfile = add_task_logger(self.logging_dir, rank, local_rank) log_pipeline(self.pipeline) - if self.randomize_start: - time.sleep(random.randint(0, 60 * 3)) + if self.randomize_start_duration > 0: + time.sleep(random.randint(0, self.randomize_start_duration)) try: # pipe data from one step to the next pipelined_data = None diff --git a/src/datatrove/executor/local.py b/src/datatrove/executor/local.py index 4b0e1380..24d76a18 100644 --- a/src/datatrove/executor/local.py +++ b/src/datatrove/executor/local.py @@ -30,7 +30,7 @@ class LocalPipelineExecutor(PipelineExecutor): Tasks [local_rank_offset, local_rank_offset + local_tasks] will be run. depends: another LocalPipelineExecutor that should run before this one - randomize_start: randomize the start of each task in a job in a ~3 min window + randomize_start_duration: the maximum number of seconds to delay the start of each task. """ def __init__( @@ -44,9 +44,9 @@ def __init__( start_method: str = "forkserver", local_tasks: int = -1, local_rank_offset: int = 0, - randomize_start: bool = False, + randomize_start_duration: int = 0, ): - super().__init__(pipeline, logging_dir, skip_completed, randomize_start) + super().__init__(pipeline, logging_dir, skip_completed, randomize_start_duration) self.tasks = tasks self.workers = workers if workers != -1 else tasks self.start_method = start_method diff --git a/src/datatrove/executor/slurm.py b/src/datatrove/executor/slurm.py index 2f21aeb5..b0cc9077 100644 --- a/src/datatrove/executor/slurm.py +++ b/src/datatrove/executor/slurm.py @@ -72,7 +72,7 @@ class SlurmPipelineExecutor(PipelineExecutor): stagger_max_array_jobs: when max_array_launch_parallel is True, this determines how many seconds to wait between launching each of the parallel jobs run_on_dependency_fail: start executing when a job we depend on finishes even if it has failed - randomize_start: randomize the start of each task in a job in a ~3 min window + randomize_start_duration: the maximum number of seconds to delay the start of each task. requeue_signals: requeue the job and exit when one of these signals is received. Useful for when an instance is being reclaimed and jobs must be stopped for example. Set to None to disable mail_type: see https://slurm.schedmd.com/sbatch.html. Common values are (NONE, BEGIN, END, FAIL, REQUEUE, ALL) @@ -105,7 +105,7 @@ def __init__( max_array_launch_parallel: bool = False, stagger_max_array_jobs: int = 0, run_on_dependency_fail: bool = False, - randomize_start: bool = False, + randomize_start_duration: int = 0, requeue_signals: tuple[str] | None = ("SIGUSR1",), mail_type: str = "ALL", mail_user: str = None, @@ -113,7 +113,7 @@ def __init__( srun_args: dict = None, tasks_per_job: int = 1, ): - super().__init__(pipeline, logging_dir, skip_completed, randomize_start) + super().__init__(pipeline, logging_dir, skip_completed, randomize_start_duration) self.tasks = tasks self.workers = workers self.partition = partition @@ -133,7 +133,7 @@ def __init__( self.max_array_launch_parallel = max_array_launch_parallel self.stagger_max_array_jobs = stagger_max_array_jobs self.run_on_dependency_fail = run_on_dependency_fail - self.randomize_start = randomize_start + self.randomize_start_duration = randomize_start_duration self.job_id = None self.requeue_signals = requeue_signals self.mail_type = mail_type