Skip to content

Commit

Permalink
Merge pull request #491 from OP-TED/feature/TED6-23
Browse files Browse the repository at this point in the history
Feature/ted6 23
  • Loading branch information
CaptainOfHacks authored Sep 12, 2023
2 parents c968cc7 + 6f81d83 commit 3274fd0
Show file tree
Hide file tree
Showing 4 changed files with 5 additions and 3 deletions.
4 changes: 2 additions & 2 deletions dags/notice_processing_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from dags import DEFAULT_DAG_ARGUMENTS
from dags.dags_utils import get_dag_param, smart_xcom_push, smart_xcom_forward, smart_xcom_pull
from dags.operators.DagBatchPipelineOperator import NoticeBatchPipelineOperator, NOTICE_IDS_KEY, \
EXECUTE_ONLY_ONE_STEP_KEY, START_WITH_STEP_NAME_KEY, NUMBER_OF_AIRFLOW_WORKERS
EXECUTE_ONLY_ONE_STEP_KEY, START_WITH_STEP_NAME_KEY, AIRFLOW_NUMBER_OF_WORKERS
from dags.pipelines.notice_batch_processor_pipelines import notices_batch_distillation_pipeline, \
notice_batch_transformer_pipeline
from dags.pipelines.notice_processor_pipelines import notice_normalisation_pipeline, notice_validation_pipeline, \
Expand Down Expand Up @@ -47,7 +47,7 @@ def branch_selector(result_branch: str, xcom_forward_keys: List[str] = [NOTICE_I

@dag(default_args=DEFAULT_DAG_ARGUMENTS,
schedule_interval=None,
max_active_runs=NUMBER_OF_AIRFLOW_WORKERS,
max_active_runs=AIRFLOW_NUMBER_OF_WORKERS,
max_active_tasks=1,
tags=['worker', 'pipeline'])
def notice_processing_pipeline():
Expand Down
2 changes: 1 addition & 1 deletion dags/operators/DagBatchPipelineOperator.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
NOTICE_PROCESSING_PIPELINE_DAG_NAME = "notice_processing_pipeline"
DEFAULT_START_WITH_TASK_ID = "notice_normalisation_pipeline"
DEFAULT_PIPELINE_NAME_FOR_LOGS = "unknown_pipeline_name"
NUMBER_OF_AIRFLOW_WORKERS = config.NUMBER_OF_AIRFLOW_WORKERS
AIRFLOW_NUMBER_OF_WORKERS = config.AIRFLOW_NUMBER_OF_WORKERS
MAX_BATCH_SIZE = 2000


Expand Down
1 change: 1 addition & 0 deletions infra/airflow-cluster/docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ x-airflow-common:
AIRFLOW__CELERY__WORKER_CONCURRENCY: 1
AIRFLOW__CORE__SQL_ALCHEMY_POOL_SIZE: 512
AIRFLOW__CORE__SQL_ALCHEMY_MAX_OVERFLOW: 1024
AIRFLOW__WEBSERVER__WARN_DEPLOYMENT_EXPOSURE: 'false'
AIRFLOW__CORE__SQL_ALCHEMY_CONN: "postgresql+psycopg2://${AIRFLOW_POSTGRES_USER}:${AIRFLOW_POSTGRES_PASSWORD}@postgres/${AIRFLOW_POSTGRES_DB_NAME}"
AIRFLOW__CELERY__RESULT_BACKEND: "db+postgresql://${AIRFLOW_POSTGRES_USER}:${AIRFLOW_POSTGRES_PASSWORD}@postgres/${AIRFLOW_POSTGRES_DB_NAME}"
AIRFLOW__CELERY__BROKER_URL: "redis://:${REDIS_PASSWORD}@redis:6379/0"
Expand Down
1 change: 1 addition & 0 deletions infra/airflow/docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ x-airflow-common:
AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
AIRFLOW__CORE__ENABLE_XCOM_PICKLING: "true"
AIRFLOW__CORE__LOAD_EXAMPLES: 'false'
AIRFLOW__WEBSERVER__WARN_DEPLOYMENT_EXPOSURE: 'false'
AIRFLOW__API__AUTH_BACKEND: 'airflow.api.auth.backend.basic_auth'
_PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:-}
VAULT_TOKEN: ${VAULT_TOKEN}
Expand Down

0 comments on commit 3274fd0

Please sign in to comment.