diff --git a/dags/notice_processing_pipeline.py b/dags/notice_processing_pipeline.py
index 65deb7be..db365590 100644
--- a/dags/notice_processing_pipeline.py
+++ b/dags/notice_processing_pipeline.py
@@ -7,7 +7,7 @@
 from dags import DEFAULT_DAG_ARGUMENTS
 from dags.dags_utils import get_dag_param, smart_xcom_push, smart_xcom_forward, smart_xcom_pull
 from dags.operators.DagBatchPipelineOperator import NoticeBatchPipelineOperator, NOTICE_IDS_KEY, \
-    EXECUTE_ONLY_ONE_STEP_KEY, START_WITH_STEP_NAME_KEY, NUMBER_OF_AIRFLOW_WORKERS
+    EXECUTE_ONLY_ONE_STEP_KEY, START_WITH_STEP_NAME_KEY, AIRFLOW_NUMBER_OF_WORKERS
 from dags.pipelines.notice_batch_processor_pipelines import notices_batch_distillation_pipeline, \
     notice_batch_transformer_pipeline
 from dags.pipelines.notice_processor_pipelines import notice_normalisation_pipeline, notice_validation_pipeline, \
@@ -47,7 +47,7 @@ def branch_selector(result_branch: str, xcom_forward_keys: List[str] = [NOTICE_I
 
 @dag(default_args=DEFAULT_DAG_ARGUMENTS,
      schedule_interval=None,
-     max_active_runs=NUMBER_OF_AIRFLOW_WORKERS,
+     max_active_runs=AIRFLOW_NUMBER_OF_WORKERS,
      max_active_tasks=1,
      tags=['worker', 'pipeline'])
 def notice_processing_pipeline():
diff --git a/dags/operators/DagBatchPipelineOperator.py b/dags/operators/DagBatchPipelineOperator.py
index fa565301..64b4e5db 100644
--- a/dags/operators/DagBatchPipelineOperator.py
+++ b/dags/operators/DagBatchPipelineOperator.py
@@ -22,7 +22,7 @@
 NOTICE_PROCESSING_PIPELINE_DAG_NAME = "notice_processing_pipeline"
 DEFAULT_START_WITH_TASK_ID = "notice_normalisation_pipeline"
 DEFAULT_PIPELINE_NAME_FOR_LOGS = "unknown_pipeline_name"
-NUMBER_OF_AIRFLOW_WORKERS = config.NUMBER_OF_AIRFLOW_WORKERS
+AIRFLOW_NUMBER_OF_WORKERS = config.AIRFLOW_NUMBER_OF_WORKERS
 
 MAX_BATCH_SIZE = 2000
 
diff --git a/infra/airflow-cluster/docker-compose.yaml b/infra/airflow-cluster/docker-compose.yaml
index 2c8d5ad4..be7a40f6 100644
--- a/infra/airflow-cluster/docker-compose.yaml
+++ b/infra/airflow-cluster/docker-compose.yaml
@@ -60,6 +60,7 @@ x-airflow-common:
     AIRFLOW__CELERY__WORKER_CONCURRENCY: 1
     AIRFLOW__CORE__SQL_ALCHEMY_POOL_SIZE: 512
     AIRFLOW__CORE__SQL_ALCHEMY_MAX_OVERFLOW: 1024
+    AIRFLOW__WEBSERVER__WARN_DEPLOYMENT_EXPOSURE: 'false'
     AIRFLOW__CORE__SQL_ALCHEMY_CONN: "postgresql+psycopg2://${AIRFLOW_POSTGRES_USER}:${AIRFLOW_POSTGRES_PASSWORD}@postgres/${AIRFLOW_POSTGRES_DB_NAME}"
     AIRFLOW__CELERY__RESULT_BACKEND: "db+postgresql://${AIRFLOW_POSTGRES_USER}:${AIRFLOW_POSTGRES_PASSWORD}@postgres/${AIRFLOW_POSTGRES_DB_NAME}"
     AIRFLOW__CELERY__BROKER_URL: "redis://:${REDIS_PASSWORD}@redis:6379/0"
diff --git a/infra/airflow/docker-compose.yaml b/infra/airflow/docker-compose.yaml
index 396582f2..10d6ad93 100644
--- a/infra/airflow/docker-compose.yaml
+++ b/infra/airflow/docker-compose.yaml
@@ -61,6 +61,7 @@ x-airflow-common:
     AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
     AIRFLOW__CORE__ENABLE_XCOM_PICKLING: "true"
     AIRFLOW__CORE__LOAD_EXAMPLES: 'false'
+    AIRFLOW__WEBSERVER__WARN_DEPLOYMENT_EXPOSURE: 'false'
     AIRFLOW__API__AUTH_BACKEND: 'airflow.api.auth.backend.basic_auth'
     _PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:-}
     VAULT_TOKEN: ${VAULT_TOKEN}