diff --git a/pipeline/dags/dag_utils/dbt.py b/pipeline/dags/dag_utils/dbt.py index 59fc67a8..60bd168a 100644 --- a/pipeline/dags/dag_utils/dbt.py +++ b/pipeline/dags/dag_utils/dbt.py @@ -3,6 +3,7 @@ from airflow.models import Variable from airflow.operators import bash from airflow.utils.task_group import TaskGroup +from airflow.utils.trigger_rule import TriggerRule from dag_utils.sources import SOURCES_CONFIGS from dag_utils.virtualenvs import DBT_PYTHON_BIN_PATH @@ -13,6 +14,7 @@ def dbt_operator_factory( command: str, select: Optional[str] = None, exclude: Optional[str] = None, + trigger_rule: TriggerRule = TriggerRule.ALL_SUCCESS, ) -> bash.BashOperator: """A basic factory for bash operators operating dbt commands.""" @@ -37,6 +39,7 @@ def dbt_operator_factory( "POSTGRES_DB": "{{ conn.pg.schema }}", }, cwd=Variable.get("DBT_PROJECT_DIR"), + trigger_rule=trigger_rule, ) @@ -124,4 +127,5 @@ def get_after_geocoding_tasks(): "marts", ] ), + exclude="path:modes/intermediate/quality", ) diff --git a/pipeline/dags/import_data_inclusion_api.py b/pipeline/dags/import_data_inclusion_api.py index 610a9e45..1c7ca7df 100644 --- a/pipeline/dags/import_data_inclusion_api.py +++ b/pipeline/dags/import_data_inclusion_api.py @@ -3,8 +3,10 @@ import airflow from airflow.decorators import task from airflow.operators import empty +from airflow.utils.trigger_rule import TriggerRule from dag_utils import date +from dag_utils.dbt import dbt_operator_factory from dag_utils.virtualenvs import PYTHON_BIN_PATH default_args = {} @@ -87,4 +89,28 @@ def import_data_inclusion_api(): start = empty.EmptyOperator(task_id="start") end = empty.EmptyOperator(task_id="end") - start >> import_data_inclusion_api() >> end + build_source_stats = dbt_operator_factory( + task_id="generate_source_stats", + command="build", + select="path:models/intermediate/quality", + trigger_rule=TriggerRule.ALL_DONE, + ) + + snapshot_source_stats = dbt_operator_factory( + task_id="snapshot_source_stats", + command="snapshot", + select="quality", + trigger_rule=TriggerRule.ALL_DONE, + ) + + ( + start + >> import_data_inclusion_api() + # Will generate the daily stats 24 times a day. + # The same table will be generated, the snapshot won't + # be triggered except on day boundaries and it's fast. + # The alternative would be more complicated code. + >> build_source_stats + >> snapshot_source_stats + >> end + ) diff --git a/pipeline/dags/import_sources.py b/pipeline/dags/import_sources.py index c604aa19..222b32cd 100644 --- a/pipeline/dags/import_sources.py +++ b/pipeline/dags/import_sources.py @@ -139,7 +139,7 @@ def load_from_s3_to_data_warehouse(source_id, stream_id, run_id, logical_date): dbt_snapshot_source = dbt_operator_factory( task_id="dbt_snapshot_source", command="snapshot", - select=model_name, + select=f"sources.{model_name}", ) for stream_id in source_config["streams"]: diff --git a/pipeline/dbt/models/intermediate/quality/_quality_models.yml b/pipeline/dbt/models/intermediate/quality/_quality_models.yml index 3ee13f20..15a5ced6 100644 --- a/pipeline/dbt/models/intermediate/quality/_quality_models.yml +++ b/pipeline/dbt/models/intermediate/quality/_quality_models.yml @@ -46,45 +46,3 @@ models: - services - siaes - structures - - - name: count_raw - data_tests: - - dbt_utils.accepted_range: - min_value: 0 - inclusive: false - - - name: count_stg - data_tests: - - dbt_utils.accepted_range: - min_value: 0 - inclusive: false - - - name: count_int - data_tests: - - dbt_utils.accepted_range: - min_value: 0 - inclusive: false - - - name: count_marts - data_tests: - - dbt_utils.accepted_range: - min_value: 0 - inclusive: false - - - name: count_api - data_tests: - - dbt_utils.accepted_range: - min_value: 0 - inclusive: false - - - name: count_contacts - data_tests: - - dbt_utils.accepted_range: - min_value: 0 - inclusive: true - - - name: count_addresses - data_tests: - - dbt_utils.accepted_range: - min_value: 0 - inclusive: true diff --git a/pipeline/dbt/models/intermediate/quality/int_quality__stats.sql b/pipeline/dbt/models/intermediate/quality/int_quality__stats.sql index 8e050661..9b94d042 100644 --- a/pipeline/dbt/models/intermediate/quality/int_quality__stats.sql +++ b/pipeline/dbt/models/intermediate/quality/int_quality__stats.sql @@ -81,16 +81,16 @@ WITH */ -- noqa: disable=layout.spacing SELECT - '{{ run_started_at.strftime("%Y-%m-%d") }}' AS date_day, - '{{ source_name }}' AS source, - '{{ stream_name }}' AS stream, -- noqa: references.keywords - (SELECT COUNT(*) FROM {{ source(source_name, stream_name) }}) AS count_raw, - (SELECT COUNT(*) FROM {{ ref('stg_' ~ source_name ~ '__' ~ staging_name) }}) AS count_stg, - (SELECT COUNT(*) FROM {{ ref('int_' ~ source_name ~ '__' ~ stream_kind) }}) AS count_int, - (SELECT COUNT(*) FROM {{ source_name }}__{{ stream_name }}__tmp_marts) AS count_marts, - (SELECT COUNT(*) FROM {{ source_name }}__{{ stream_name }}__tmp_api) AS count_api, - (SELECT COUNT(*) FROM {{ source_name }}__{{ stream_name }}__tmp_api_contacts) AS count_contacts, - (SELECT COUNT(*) FROM {{ source_name }}__{{ stream_name }}__tmp_api_adresse) AS count_addresses + CAST('{{ run_started_at.strftime("%Y-%m-%d") }}' AS DATE) AS date_day, + '{{ source_name }}' AS source, + '{{ stream_name }}' AS stream, -- noqa: references.keywords + (SELECT COUNT(*) FROM {{ source(source_name, stream_name) }}) AS count_raw, + (SELECT COUNT(*) FROM {{ ref('stg_' ~ source_name ~ '__' ~ staging_name) }}) AS count_stg, + (SELECT COUNT(*) FROM {{ ref('int_' ~ source_name ~ '__' ~ stream_kind) }}) AS count_int, + (SELECT COUNT(*) FROM {{ source_name }}__{{ stream_name }}__tmp_marts) AS count_marts, + (SELECT COUNT(*) FROM {{ source_name }}__{{ stream_name }}__tmp_api) AS count_api, + (SELECT COUNT(*) FROM {{ source_name }}__{{ stream_name }}__tmp_api_contacts)AS count_contacts, + (SELECT COUNT(*) FROM {{ source_name }}__{{ stream_name }}__tmp_api_adresse) AS count_addresses -- noqa: enable=layout.spacing ), diff --git a/pipeline/dbt/snapshots/quality/snps_quality__stats.sql b/pipeline/dbt/snapshots/quality/snps_quality__stats.sql new file mode 100644 index 00000000..4321ef6e --- /dev/null +++ b/pipeline/dbt/snapshots/quality/snps_quality__stats.sql @@ -0,0 +1,18 @@ +{% snapshot snps_quality__stats %} + +{{ + config( + target_schema='snapshots', + unique_key="source||'--'||stream", + strategy='timestamp', + updated_at='date_day', + invalidate_hard_deletes=True, + ) +}} + + SELECT + source || '--' || stream AS id, + * + FROM {{ ref('int_quality__stats') }} + +{% endsnapshot %} diff --git a/pipeline/dbt/snapshots/annuaire_du_service_public/snps_annuaire_du_service_public__etablissements.sql b/pipeline/dbt/snapshots/sources/annuaire_du_service_public/snps_annuaire_du_service_public__etablissements.sql similarity index 100% rename from pipeline/dbt/snapshots/annuaire_du_service_public/snps_annuaire_du_service_public__etablissements.sql rename to pipeline/dbt/snapshots/sources/annuaire_du_service_public/snps_annuaire_du_service_public__etablissements.sql diff --git a/pipeline/dbt/snapshots/cd35/snps_cd35__organisations.sql b/pipeline/dbt/snapshots/sources/cd35/snps_cd35__organisations.sql similarity index 100% rename from pipeline/dbt/snapshots/cd35/snps_cd35__organisations.sql rename to pipeline/dbt/snapshots/sources/cd35/snps_cd35__organisations.sql diff --git a/pipeline/dbt/snapshots/dora/snps_dora__services.sql b/pipeline/dbt/snapshots/sources/dora/snps_dora__services.sql similarity index 100% rename from pipeline/dbt/snapshots/dora/snps_dora__services.sql rename to pipeline/dbt/snapshots/sources/dora/snps_dora__services.sql diff --git a/pipeline/dbt/snapshots/dora/snps_dora__structures.sql b/pipeline/dbt/snapshots/sources/dora/snps_dora__structures.sql similarity index 100% rename from pipeline/dbt/snapshots/dora/snps_dora__structures.sql rename to pipeline/dbt/snapshots/sources/dora/snps_dora__structures.sql diff --git a/pipeline/dbt/snapshots/emplois_de_linclusion/snps_emplois_de_linclusion__organisations.sql b/pipeline/dbt/snapshots/sources/emplois_de_linclusion/snps_emplois_de_linclusion__organisations.sql similarity index 100% rename from pipeline/dbt/snapshots/emplois_de_linclusion/snps_emplois_de_linclusion__organisations.sql rename to pipeline/dbt/snapshots/sources/emplois_de_linclusion/snps_emplois_de_linclusion__organisations.sql diff --git a/pipeline/dbt/snapshots/emplois_de_linclusion/snps_emplois_de_linclusion__siaes.sql b/pipeline/dbt/snapshots/sources/emplois_de_linclusion/snps_emplois_de_linclusion__siaes.sql similarity index 100% rename from pipeline/dbt/snapshots/emplois_de_linclusion/snps_emplois_de_linclusion__siaes.sql rename to pipeline/dbt/snapshots/sources/emplois_de_linclusion/snps_emplois_de_linclusion__siaes.sql diff --git a/pipeline/dbt/snapshots/finess/snps_finess__etablissements.sql b/pipeline/dbt/snapshots/sources/finess/snps_finess__etablissements.sql similarity index 100% rename from pipeline/dbt/snapshots/finess/snps_finess__etablissements.sql rename to pipeline/dbt/snapshots/sources/finess/snps_finess__etablissements.sql diff --git a/pipeline/dbt/snapshots/mes_aides/snps_mes_aides__aides.sql b/pipeline/dbt/snapshots/sources/mes_aides/snps_mes_aides__aides.sql similarity index 100% rename from pipeline/dbt/snapshots/mes_aides/snps_mes_aides__aides.sql rename to pipeline/dbt/snapshots/sources/mes_aides/snps_mes_aides__aides.sql diff --git a/pipeline/dbt/snapshots/mes_aides/snps_mes_aides__garages.sql b/pipeline/dbt/snapshots/sources/mes_aides/snps_mes_aides__garages.sql similarity index 100% rename from pipeline/dbt/snapshots/mes_aides/snps_mes_aides__garages.sql rename to pipeline/dbt/snapshots/sources/mes_aides/snps_mes_aides__garages.sql diff --git a/pipeline/dbt/snapshots/soliguide/snps_soliguide__lieux.sql b/pipeline/dbt/snapshots/sources/soliguide/snps_soliguide__lieux.sql similarity index 100% rename from pipeline/dbt/snapshots/soliguide/snps_soliguide__lieux.sql rename to pipeline/dbt/snapshots/sources/soliguide/snps_soliguide__lieux.sql diff --git a/pipeline/dbt/snapshots/un_jeune_une_solution/snps_un_jeune_une_solution__benefits.sql b/pipeline/dbt/snapshots/sources/un_jeune_une_solution/snps_un_jeune_une_solution__benefits.sql similarity index 100% rename from pipeline/dbt/snapshots/un_jeune_une_solution/snps_un_jeune_une_solution__benefits.sql rename to pipeline/dbt/snapshots/sources/un_jeune_une_solution/snps_un_jeune_une_solution__benefits.sql diff --git a/pipeline/dbt/snapshots/un_jeune_une_solution/snps_un_jeune_une_solution__institutions.sql b/pipeline/dbt/snapshots/sources/un_jeune_une_solution/snps_un_jeune_une_solution__institutions.sql similarity index 100% rename from pipeline/dbt/snapshots/un_jeune_une_solution/snps_un_jeune_une_solution__institutions.sql rename to pipeline/dbt/snapshots/sources/un_jeune_une_solution/snps_un_jeune_une_solution__institutions.sql