From 78a774737ace6dc4f4beba64d0a45ea9a499cad6 Mon Sep 17 00:00:00 2001 From: Hugo Lecuyer Date: Wed, 21 Aug 2024 15:11:54 +0200 Subject: [PATCH] feat(core): decorrelate intermediate states --- pipeline/dags/dag_utils/dbt.py | 31 +++++++++++---------- pipeline/dags/dag_utils/sources/__init__.py | 2 ++ pipeline/dags/import_sources.py | 2 ++ pipeline/dbt/dbt_project.yml | 2 +- 4 files changed, 21 insertions(+), 16 deletions(-) diff --git a/pipeline/dags/dag_utils/dbt.py b/pipeline/dags/dag_utils/dbt.py index e229f60e..a202df6e 100644 --- a/pipeline/dags/dag_utils/dbt.py +++ b/pipeline/dags/dag_utils/dbt.py @@ -1,3 +1,4 @@ +import json from typing import Optional from airflow.models import Variable @@ -12,6 +13,7 @@ def dbt_operator_factory( task_id: str, command: str, + dbt_vars: Optional[dict] = None, select: Optional[str] = None, exclude: Optional[str] = None, trigger_rule: TriggerRule = TriggerRule.ALL_SUCCESS, @@ -23,11 +25,15 @@ def dbt_operator_factory( dbt_args += f" --select {select}" if exclude is not None: dbt_args += f" --exclude {exclude}" + if dbt_vars is not None: + dbt_vars = json.dumps(dbt_vars) + dbt_args += f" --vars '{dbt_vars}'" return bash.BashOperator( task_id=task_id, bash_command=f"{DBT_PYTHON_BIN_PATH.parent / 'dbt'} {dbt_args}", append_env=True, + trigger_rule=trigger_rule, env={ "DBT_PROFILES_DIR": Variable.get("DBT_PROJECT_DIR"), "DBT_TARGET_PATH": Variable.get("DBT_TARGET_PATH", "target"), @@ -39,7 +45,6 @@ def dbt_operator_factory( "POSTGRES_DB": "{{ conn.pg.schema }}", }, cwd=Variable.get("DBT_PROJECT_DIR"), - trigger_rule=trigger_rule, ) @@ -47,7 +52,7 @@ def get_staging_tasks(schedule=None): task_list = [] for source_id, src_meta in sorted(SOURCES_CONFIGS.items()): - if schedule and src_meta["schedule"] != schedule: + if schedule and "schedule" in src_meta and src_meta["schedule"] != schedule: continue dbt_source_id = source_id.replace("-", "_") @@ -68,23 +73,24 @@ def get_staging_tasks(schedule=None): select=stg_selector, ) - 
dbt_run_intermediate = dbt_operator_factory( - task_id="dbt_run_intermediate", - command="run", + dbt_build_intermediate_tmp = dbt_operator_factory( + task_id="dbt_build_intermediate_tmp", + command="build", select=int_selector, + dbt_vars={"build_intermediate_tmp": True}, ) - dbt_test_intermediate = dbt_operator_factory( - task_id="dbt_test_intermediate", - command="test", + dbt_run_intermediate = dbt_operator_factory( + task_id="dbt_run_intermediate", + command="run", select=int_selector, ) ( dbt_run_staging >> dbt_test_staging + >> dbt_build_intermediate_tmp >> dbt_run_intermediate - >> dbt_test_intermediate ) task_list += [source_task_group] @@ -98,17 +104,12 @@ def get_before_geocoding_tasks(): command="build", select=" ".join( [ - # FIXME: handle odspep as other sources (add to dags/settings.py) - "path:models/staging/sources/odspep", - "path:models/intermediate/sources/odspep", - # FIXME: handle monenfant as other sources (add to dags/settings.py) - "path:models/staging/sources/monenfant", - "path:models/intermediate/sources/monenfant", "path:models/intermediate/int__union_adresses.sql", "path:models/intermediate/int__union_services.sql", "path:models/intermediate/int__union_structures.sql", ] ), + trigger_rule=TriggerRule.ALL_DONE, ) diff --git a/pipeline/dags/dag_utils/sources/__init__.py b/pipeline/dags/dag_utils/sources/__init__.py index 97083bf9..40b42c16 100644 --- a/pipeline/dags/dag_utils/sources/__init__.py +++ b/pipeline/dags/dag_utils/sources/__init__.py @@ -250,6 +250,8 @@ }, }, }, + "odspep": {}, + "monenfant": {}, } diff --git a/pipeline/dags/import_sources.py b/pipeline/dags/import_sources.py index 222b32cd..4aef1712 100644 --- a/pipeline/dags/import_sources.py +++ b/pipeline/dags/import_sources.py @@ -121,6 +121,8 @@ def load_from_s3_to_data_warehouse(source_id, stream_id, run_id, logical_date): for source_id, source_config in sources.SOURCES_CONFIGS.items(): + if "streams" not in source_config: + continue model_name = source_id.replace("-", 
"_") dag_id = f"import_{model_name}" diff --git a/pipeline/dbt/dbt_project.yml b/pipeline/dbt/dbt_project.yml index c90ff5b0..77f6a86a 100644 --- a/pipeline/dbt/dbt_project.yml +++ b/pipeline/dbt/dbt_project.yml @@ -24,7 +24,7 @@ models: +materialized: view intermediate: - +schema: intermediate + +schema: "{{ 'intermediate_tmp' if var('build_intermediate_tmp', false) else 'intermediate' }}" +materialized: table marts: