Skip to content

Commit

Permalink
Update DagBatchPipelineOperator.py
Browse files Browse the repository at this point in the history
  • Loading branch information
CaptainOfHacks committed Sep 20, 2023
1 parent acf52b4 commit 0764261
Showing 1 changed file with 6 additions and 2 deletions.
8 changes: 6 additions & 2 deletions dags/operators/DagBatchPipelineOperator.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
DEFAULT_START_WITH_TASK_ID = "notice_normalisation_pipeline"
DEFAULT_PIPELINE_NAME_FOR_LOGS = "unknown_pipeline_name"
AIRFLOW_NUMBER_OF_WORKERS = config.AIRFLOW_NUMBER_OF_WORKERS
DEFAULT_BATCH_SIZE = 2000
MAX_BATCH_SIZE = 2000


class BatchPipelineCallable(Protocol):
Expand Down Expand Up @@ -142,7 +142,11 @@ def execute(self, context: Any):
self.execute_only_one_step = get_dag_param(key=EXECUTE_ONLY_ONE_STEP_KEY, default_value=False)
notice_ids = pull_dag_upstream(key=NOTICE_IDS_KEY)
if notice_ids:
batch_size = self.batch_size or DEFAULT_BATCH_SIZE
# Auto-size the batch only when no explicit batch_size was configured
# (None or 0); an explicitly configured batch_size is used as-is.
# The previous condition was inverted: it ignored a configured size and
# passed the falsy value through to chunks() when none was set.
if not self.batch_size:
    # Divide the notices across AIRFLOW_NUMBER_OF_WORKERS; the "1 +" keeps
    # the result at least 1 for small inputs. Cap at MAX_BATCH_SIZE.
    batch_size = min(1 + len(notice_ids) // AIRFLOW_NUMBER_OF_WORKERS, MAX_BATCH_SIZE)
else:
    batch_size = self.batch_size
for notice_batch in chunks(notice_ids, chunk_size=batch_size):
TriggerDagRunOperator(
task_id=f'trigger_worker_dag_{uuid4().hex}',
Expand Down

0 comments on commit 0764261

Please sign in to comment.