From 66c9496baecd40c75d4e53f6ebafe377d70045db Mon Sep 17 00:00:00 2001
From: Victor Perron
Date: Mon, 5 Aug 2024 16:46:45 +0200
Subject: [PATCH 1/5] chore(pipeline) : Move the snapshots to a `sources` folder

To mimic the behavior found with all the models.
---
 pipeline/dags/import_sources.py                                 |  2 +-
 .../snps_annuaire_du_service_public__etablissements.sql         |  0
 .../snapshots/{ => sources}/cd35/snps_cd35__organisations.sql   |  0
 .../dbt/snapshots/{ => sources}/dora/snps_dora__services.sql    |  0
 .../dbt/snapshots/{ => sources}/dora/snps_dora__structures.sql  |  0
 .../snps_emplois_de_linclusion__organisations.sql               |  0
 .../emplois_de_linclusion/snps_emplois_de_linclusion__siaes.sql |  0
 .../{ => sources}/finess/snps_finess__etablissements.sql        |  0
 .../snapshots/{ => sources}/mes_aides/snps_mes_aides__aides.sql |  0
 .../{ => sources}/mes_aides/snps_mes_aides__garages.sql         |  0
 .../snapshots/{ => sources}/soliguide/snps_soliguide__lieux.sql |  0
 .../snps_un_jeune_une_solution__benefits.sql                    |  0
 .../snps_un_jeune_une_solution__institutions.sql                |  0
 13 files changed, 1 insertion(+), 1 deletion(-)
 rename pipeline/dbt/snapshots/{ => sources}/annuaire_du_service_public/snps_annuaire_du_service_public__etablissements.sql (100%)
 rename pipeline/dbt/snapshots/{ => sources}/cd35/snps_cd35__organisations.sql (100%)
 rename pipeline/dbt/snapshots/{ => sources}/dora/snps_dora__services.sql (100%)
 rename pipeline/dbt/snapshots/{ => sources}/dora/snps_dora__structures.sql (100%)
 rename pipeline/dbt/snapshots/{ => sources}/emplois_de_linclusion/snps_emplois_de_linclusion__organisations.sql (100%)
 rename pipeline/dbt/snapshots/{ => sources}/emplois_de_linclusion/snps_emplois_de_linclusion__siaes.sql (100%)
 rename pipeline/dbt/snapshots/{ => sources}/finess/snps_finess__etablissements.sql (100%)
 rename pipeline/dbt/snapshots/{ => sources}/mes_aides/snps_mes_aides__aides.sql (100%)
 rename pipeline/dbt/snapshots/{ => sources}/mes_aides/snps_mes_aides__garages.sql (100%)
 rename pipeline/dbt/snapshots/{ => sources}/soliguide/snps_soliguide__lieux.sql (100%)
 rename pipeline/dbt/snapshots/{ => sources}/un_jeune_une_solution/snps_un_jeune_une_solution__benefits.sql (100%)
 rename pipeline/dbt/snapshots/{ => sources}/un_jeune_une_solution/snps_un_jeune_une_solution__institutions.sql (100%)

diff --git a/pipeline/dags/import_sources.py b/pipeline/dags/import_sources.py
index c604aa19..222b32cd 100644
--- a/pipeline/dags/import_sources.py
+++ b/pipeline/dags/import_sources.py
@@ -139,7 +139,7 @@ def load_from_s3_to_data_warehouse(source_id, stream_id, run_id, logical_date):
     dbt_snapshot_source = dbt_operator_factory(
         task_id="dbt_snapshot_source",
         command="snapshot",
-        select=model_name,
+        select=f"sources.{model_name}",
     )

     for stream_id in source_config["streams"]:
diff --git a/pipeline/dbt/snapshots/annuaire_du_service_public/snps_annuaire_du_service_public__etablissements.sql b/pipeline/dbt/snapshots/sources/annuaire_du_service_public/snps_annuaire_du_service_public__etablissements.sql
similarity index 100%
rename from pipeline/dbt/snapshots/annuaire_du_service_public/snps_annuaire_du_service_public__etablissements.sql
rename to pipeline/dbt/snapshots/sources/annuaire_du_service_public/snps_annuaire_du_service_public__etablissements.sql
diff --git a/pipeline/dbt/snapshots/cd35/snps_cd35__organisations.sql b/pipeline/dbt/snapshots/sources/cd35/snps_cd35__organisations.sql
similarity index 100%
rename from pipeline/dbt/snapshots/cd35/snps_cd35__organisations.sql
rename to pipeline/dbt/snapshots/sources/cd35/snps_cd35__organisations.sql
diff --git a/pipeline/dbt/snapshots/dora/snps_dora__services.sql b/pipeline/dbt/snapshots/sources/dora/snps_dora__services.sql
similarity index 100%
rename from pipeline/dbt/snapshots/dora/snps_dora__services.sql
rename to pipeline/dbt/snapshots/sources/dora/snps_dora__services.sql
diff --git a/pipeline/dbt/snapshots/dora/snps_dora__structures.sql b/pipeline/dbt/snapshots/sources/dora/snps_dora__structures.sql
similarity index 100%
rename from pipeline/dbt/snapshots/dora/snps_dora__structures.sql
rename to pipeline/dbt/snapshots/sources/dora/snps_dora__structures.sql
diff --git a/pipeline/dbt/snapshots/emplois_de_linclusion/snps_emplois_de_linclusion__organisations.sql b/pipeline/dbt/snapshots/sources/emplois_de_linclusion/snps_emplois_de_linclusion__organisations.sql
similarity index 100%
rename from pipeline/dbt/snapshots/emplois_de_linclusion/snps_emplois_de_linclusion__organisations.sql
rename to pipeline/dbt/snapshots/sources/emplois_de_linclusion/snps_emplois_de_linclusion__organisations.sql
diff --git a/pipeline/dbt/snapshots/emplois_de_linclusion/snps_emplois_de_linclusion__siaes.sql b/pipeline/dbt/snapshots/sources/emplois_de_linclusion/snps_emplois_de_linclusion__siaes.sql
similarity index 100%
rename from pipeline/dbt/snapshots/emplois_de_linclusion/snps_emplois_de_linclusion__siaes.sql
rename to pipeline/dbt/snapshots/sources/emplois_de_linclusion/snps_emplois_de_linclusion__siaes.sql
diff --git a/pipeline/dbt/snapshots/finess/snps_finess__etablissements.sql b/pipeline/dbt/snapshots/sources/finess/snps_finess__etablissements.sql
similarity index 100%
rename from pipeline/dbt/snapshots/finess/snps_finess__etablissements.sql
rename to pipeline/dbt/snapshots/sources/finess/snps_finess__etablissements.sql
diff --git a/pipeline/dbt/snapshots/mes_aides/snps_mes_aides__aides.sql b/pipeline/dbt/snapshots/sources/mes_aides/snps_mes_aides__aides.sql
similarity index 100%
rename from pipeline/dbt/snapshots/mes_aides/snps_mes_aides__aides.sql
rename to pipeline/dbt/snapshots/sources/mes_aides/snps_mes_aides__aides.sql
diff --git a/pipeline/dbt/snapshots/mes_aides/snps_mes_aides__garages.sql b/pipeline/dbt/snapshots/sources/mes_aides/snps_mes_aides__garages.sql
similarity index 100%
rename from pipeline/dbt/snapshots/mes_aides/snps_mes_aides__garages.sql
rename to pipeline/dbt/snapshots/sources/mes_aides/snps_mes_aides__garages.sql
diff --git a/pipeline/dbt/snapshots/soliguide/snps_soliguide__lieux.sql b/pipeline/dbt/snapshots/sources/soliguide/snps_soliguide__lieux.sql
similarity index 100%
rename from pipeline/dbt/snapshots/soliguide/snps_soliguide__lieux.sql
rename to pipeline/dbt/snapshots/sources/soliguide/snps_soliguide__lieux.sql
diff --git a/pipeline/dbt/snapshots/un_jeune_une_solution/snps_un_jeune_une_solution__benefits.sql b/pipeline/dbt/snapshots/sources/un_jeune_une_solution/snps_un_jeune_une_solution__benefits.sql
similarity index 100%
rename from pipeline/dbt/snapshots/un_jeune_une_solution/snps_un_jeune_une_solution__benefits.sql
rename to pipeline/dbt/snapshots/sources/un_jeune_une_solution/snps_un_jeune_une_solution__benefits.sql
diff --git a/pipeline/dbt/snapshots/un_jeune_une_solution/snps_un_jeune_une_solution__institutions.sql b/pipeline/dbt/snapshots/sources/un_jeune_une_solution/snps_un_jeune_une_solution__institutions.sql
similarity index 100%
rename from pipeline/dbt/snapshots/un_jeune_une_solution/snps_un_jeune_une_solution__institutions.sql
rename to pipeline/dbt/snapshots/sources/un_jeune_une_solution/snps_un_jeune_une_solution__institutions.sql

From bc611b99578a20dd551a97192fdec172cb0d1b17 Mon Sep 17 00:00:00 2001
From: Victor Perron
Date: Mon, 5 Aug 2024 17:06:35 +0200
Subject: [PATCH 2/5] feat(monitoring) : Add a snapshot of the quality stats

---
 .../quality/int_quality__stats.sql            | 20 +++++++++----------
 .../snapshots/quality/snps_quality__stats.sql | 18 +++++++++++++++++
 2 files changed, 28 insertions(+), 10 deletions(-)
 create mode 100644 pipeline/dbt/snapshots/quality/snps_quality__stats.sql

diff --git a/pipeline/dbt/models/intermediate/quality/int_quality__stats.sql b/pipeline/dbt/models/intermediate/quality/int_quality__stats.sql
index 8e050661..9b94d042 100644
--- a/pipeline/dbt/models/intermediate/quality/int_quality__stats.sql
+++ b/pipeline/dbt/models/intermediate/quality/int_quality__stats.sql
@@ -81,16 +81,16 @@ WITH
     */

     -- noqa: disable=layout.spacing
     SELECT
-        '{{ run_started_at.strftime("%Y-%m-%d") }}' AS date_day,
-        '{{ source_name }}' AS source,
-        '{{ stream_name }}' AS stream, -- noqa: references.keywords
-        (SELECT COUNT(*) FROM {{ source(source_name, stream_name) }}) AS count_raw,
-        (SELECT COUNT(*) FROM {{ ref('stg_' ~ source_name ~ '__' ~ staging_name) }}) AS count_stg,
-        (SELECT COUNT(*) FROM {{ ref('int_' ~ source_name ~ '__' ~ stream_kind) }}) AS count_int,
-        (SELECT COUNT(*) FROM {{ source_name }}__{{ stream_name }}__tmp_marts) AS count_marts,
-        (SELECT COUNT(*) FROM {{ source_name }}__{{ stream_name }}__tmp_api) AS count_api,
-        (SELECT COUNT(*) FROM {{ source_name }}__{{ stream_name }}__tmp_api_contacts) AS count_contacts,
-        (SELECT COUNT(*) FROM {{ source_name }}__{{ stream_name }}__tmp_api_adresse) AS count_addresses
+        CAST('{{ run_started_at.strftime("%Y-%m-%d") }}' AS DATE) AS date_day,
+        '{{ source_name }}' AS source,
+        '{{ stream_name }}' AS stream, -- noqa: references.keywords
+        (SELECT COUNT(*) FROM {{ source(source_name, stream_name) }}) AS count_raw,
+        (SELECT COUNT(*) FROM {{ ref('stg_' ~ source_name ~ '__' ~ staging_name) }}) AS count_stg,
+        (SELECT COUNT(*) FROM {{ ref('int_' ~ source_name ~ '__' ~ stream_kind) }}) AS count_int,
+        (SELECT COUNT(*) FROM {{ source_name }}__{{ stream_name }}__tmp_marts) AS count_marts,
+        (SELECT COUNT(*) FROM {{ source_name }}__{{ stream_name }}__tmp_api) AS count_api,
+        (SELECT COUNT(*) FROM {{ source_name }}__{{ stream_name }}__tmp_api_contacts) AS count_contacts,
+        (SELECT COUNT(*) FROM {{ source_name }}__{{ stream_name }}__tmp_api_adresse) AS count_addresses
     -- noqa: enable=layout.spacing
 ),
diff --git a/pipeline/dbt/snapshots/quality/snps_quality__stats.sql b/pipeline/dbt/snapshots/quality/snps_quality__stats.sql
new file mode 100644
index 00000000..e6bcf41a
--- /dev/null
+++ b/pipeline/dbt/snapshots/quality/snps_quality__stats.sql
@@ -0,0 +1,18 @@
+{% snapshot snps_quality__stats %}
+
+{{
+    config(
+        target_schema='snapshots',
+        unique_key="source||'-'||stream",
+        strategy='timestamp',
+        updated_at='date_day',
+        invalidate_hard_deletes=True,
+    )
+}}
+
+    SELECT
+        source || '-' || stream AS id,
+        *
+    FROM {{ ref('int_quality__stats') }}
+
+{% endsnapshot %}

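For context on what this snapshot records: dbt's timestamp strategy adds its
standard metadata columns (dbt_scd_id, dbt_updated_at, dbt_valid_from,
dbt_valid_to) to every snapshotted row. A query along these lines (a sketch,
assuming the `snapshots` target schema configured above) lists the currently
valid version of each row:

    SELECT id, date_day, count_raw, count_api, dbt_valid_from, dbt_valid_to
    FROM snapshots.snps_quality__stats
    WHERE dbt_valid_to IS NULL  -- NULL marks the still-current version
    ORDER BY id;
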
From 64562622951f7ec3f8c48f59c643e2987ae52311 Mon Sep 17 00:00:00 2001
From: Victor Perron
Date: Tue, 6 Aug 2024 11:17:20 +0200
Subject: [PATCH 3/5] feat(pipeline) : Run and snapshot the monitoring data

The table and its snapshots are run after every API import. It's
"overkill" but very simple, and it will keep the data up to date if we
ever have more hourly changes in the future.

Example output:

data-inclusion=# SELECT id,date_day,source,stream,count_raw,count_stg,count_int,count_api,dbt_updated_at FROM snapshots.snps_quality__stats ORDER BY id, date_day;
 id                         | date_day   | source          | stream        | count_raw | count_stg | count_int | count_api | dbt_updated_at
----------------------------+------------+-----------------+---------------+-----------+-----------+-----------+-----------+----------------
 action_logement-services   | 2024-08-06 | action_logement | services      |        26 |        23 |      2760 |      2760 | 2024-08-06
 action_logement-services   | 2024-08-07 | action_logement | services      |        26 |        23 |      2760 |      2760 | 2024-08-07
 action_logement-structures | 2024-08-06 | action_logement | structures    |       123 |       120 |       120 |       120 | 2024-08-06
 action_logement-structures | 2024-08-07 | action_logement | structures    |       123 |       120 |       120 |       120 | 2024-08-07
 agefiph-services           | 2024-08-06 | agefiph         | services      |        31 |        31 |        27 |        27 | 2024-08-06
 agefiph-services           | 2024-08-07 | agefiph         | services      |        31 |        31 |        27 |        27 | 2024-08-07
 cd35-organisations         | 2024-08-06 | cd35            | organisations |      3545 |      3545 |      3545 |      3540 | 2024-08-06
 cd35-organisations         | 2024-08-07 | cd35            | organisations |      3545 |      3545 |      3545 |      3540 | 2024-08-07
 cd72-services              | 2024-08-06 | cd72            | services      |       474 |       463 |       463 |         0 | 2024-08-06
 cd72-services              | 2024-08-07 | cd72            | services      |       474 |       463 |       463 |         0 | 2024-08-07
 cd72-structures            | 2024-08-06 | cd72            | structures    |       217 |       217 |       217 |       457 | 2024-08-06
 cd72-structures            | 2024-08-07 | cd72            | structures    |       217 |       217 |       217 |       457 | 2024-08-07
 data_inclusion-services    | 2024-08-06 | data_inclusion  | services      |        47 |        44 |        44 |        44 | 2024-08-06
 data_inclusion-services    | 2024-08-07 | data_inclusion  | services      |        47 |        44 |        44 |        44 | 2024-08-07
 data_inclusion-structures  | 2024-08-06 | data_inclusion  | structures    |        22 |        19 |        19 |        19 | 2024-08-06
 data_inclusion-structures  | 2024-08-07 | data_inclusion  | structures    |        22 |        19 |        19 |        19 | 2024-08-07
 dora-services              | 2024-08-06 | dora            | services      |     17717 |     11707 |     11707 |     11034 | 2024-08-06
 dora-services              | 2024-08-07 | dora            | services      |     17717 |     11707 |     11707 |     11034 | 2024-08-07
 dora-structures            | 2024-08-06 | dora            | structures    |      8554 |      8554 |      8554 |      8538 | 2024-08-06
 dora-structures            | 2024-08-07 | dora            | structures    |      8554 |      8554 |      8554 |      8538 | 2024-08-07

No day-over-day change here, as I'm using the same database between the
two runs.
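
Once the counts do start moving, a self-join along these lines would surface
the day-over-day deltas (a hypothetical follow-up query, not part of this
patch; it assumes standard Postgres date arithmetic on `date_day`):

    SELECT
        curr.id,
        curr.date_day,
        curr.count_raw - prev.count_raw AS raw_delta,
        curr.count_api - prev.count_api AS api_delta
    FROM snapshots.snps_quality__stats AS curr
    LEFT JOIN snapshots.snps_quality__stats AS prev
        ON prev.id = curr.id
        AND prev.date_day = curr.date_day - 1  -- the previous day's row
    ORDER BY curr.id, curr.date_day;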
---
 pipeline/dags/dag_utils/dbt.py                |  3 ++
 pipeline/dags/import_data_inclusion_api.py    | 34 ++++++++++++++++++-
 .../snapshots/quality/snps_quality__stats.sql |  4 +--
 3 files changed, 38 insertions(+), 3 deletions(-)

diff --git a/pipeline/dags/dag_utils/dbt.py b/pipeline/dags/dag_utils/dbt.py
index 59fc67a8..b221c364 100644
--- a/pipeline/dags/dag_utils/dbt.py
+++ b/pipeline/dags/dag_utils/dbt.py
@@ -3,6 +3,7 @@
 from airflow.models import Variable
 from airflow.operators import bash
 from airflow.utils.task_group import TaskGroup
+from airflow.utils.trigger_rule import TriggerRule

 from dag_utils.sources import SOURCES_CONFIGS
 from dag_utils.virtualenvs import DBT_PYTHON_BIN_PATH
@@ -13,6 +14,7 @@ def dbt_operator_factory(
     command: str,
     select: Optional[str] = None,
     exclude: Optional[str] = None,
+    trigger_rule: TriggerRule = TriggerRule.ALL_SUCCESS,
 ) -> bash.BashOperator:
     """A basic factory for bash operators operating dbt commands."""

@@ -37,6 +39,7 @@
             "POSTGRES_DB": "{{ conn.pg.schema }}",
         },
         cwd=Variable.get("DBT_PROJECT_DIR"),
+        trigger_rule=trigger_rule,
     )

diff --git a/pipeline/dags/import_data_inclusion_api.py b/pipeline/dags/import_data_inclusion_api.py
index 610a9e45..0eddec96 100644
--- a/pipeline/dags/import_data_inclusion_api.py
+++ b/pipeline/dags/import_data_inclusion_api.py
@@ -3,8 +3,10 @@
 import airflow
 from airflow.decorators import task
 from airflow.operators import empty
+from airflow.utils.trigger_rule import TriggerRule

 from dag_utils import date
+from dag_utils.dbt import dbt_operator_factory
 from dag_utils.virtualenvs import PYTHON_BIN_PATH

 default_args = {}
@@ -87,4 +89,34 @@ def import_data_inclusion_api():
     start = empty.EmptyOperator(task_id="start")
     end = empty.EmptyOperator(task_id="end")

-    start >> import_data_inclusion_api() >> end
+    build_source_stats = dbt_operator_factory(
+        task_id="generate_source_stats",
+        command="build",
+        select="path:models/intermediate/quality",
+    )
+
+    snapshot_source_stats = dbt_operator_factory(
+        task_id="snapshot_source_stats",
+        command="snapshot",
+        select="quality",
+        # Let's snapshot the stats, regardless of the result of their data tests.
+        # The data stats tests may "fail" (for instance if some source data is
+        # 100% missing) and we will be notified, but the snapshot will still be
+        # generated and recorded.
+        # Don't snapshot though if the initial API import failed, which
+        # would result in `build_source_stats` being skipped.
+        # In that case there is nothing to be snapshotted.
+        trigger_rule=TriggerRule.NONE_SKIPPED,
+    )
+
+    (
+        start
+        >> import_data_inclusion_api()
+        # Will generate the daily stats 24 times a day. The same table will
+        # be regenerated each time, but the snapshot is a no-op except on
+        # day boundaries, and it's fast. The alternative would be more
+        # complicated code.
+        >> build_source_stats
+        >> snapshot_source_stats
+        >> end
+    )
diff --git a/pipeline/dbt/snapshots/quality/snps_quality__stats.sql b/pipeline/dbt/snapshots/quality/snps_quality__stats.sql
index e6bcf41a..4321ef6e 100644
--- a/pipeline/dbt/snapshots/quality/snps_quality__stats.sql
+++ b/pipeline/dbt/snapshots/quality/snps_quality__stats.sql
@@ -3,7 +3,7 @@
 {{
     config(
         target_schema='snapshots',
-        unique_key="source||'-'||stream",
+        unique_key="source||'--'||stream",
         strategy='timestamp',
         updated_at='date_day',
         invalidate_hard_deletes=True,
@@ -11,7 +11,7 @@
 }}

     SELECT
-        source || '-' || stream AS id,
+        source || '--' || stream AS id,
         *
     FROM {{ ref('int_quality__stats') }}

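An aside on the "no-op except on day boundaries" comment above: with dbt's
timestamp strategy, a new row version is only recorded when the `updated_at`
column (here `date_day`) is newer than the value stored for the currently
valid snapshot row, so intra-day reruns change nothing. Conceptually, it
boils down to something like this (a simplified sketch of the comparison,
not dbt's actual generated SQL; table names are left unqualified for brevity):

    -- Rows that would get a new snapshot version on this run:
    SELECT src.*
    FROM int_quality__stats AS src
    JOIN snps_quality__stats AS snap
        ON snap.id = src.source || '--' || src.stream
        AND snap.dbt_valid_to IS NULL     -- the currently valid version
    WHERE src.date_day > snap.date_day;  -- i.e. updated_at moved forward
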
From 5e7e2ba7cc82c3057d3daeae7d85e81870e6e90b Mon Sep 17 00:00:00 2001
From: Victor Perron
Date: Mon, 12 Aug 2024 17:06:24 +0200
Subject: [PATCH 4/5] feat(pipeline) : Run stats & snapshots all the time

We'll test and get notified about the data quality another time.
---
 pipeline/dags/import_data_inclusion_api.py    | 10 ++----
 .../intermediate/quality/_quality_models.yml  | 42 ------------------
 2 files changed, 2 insertions(+), 50 deletions(-)

diff --git a/pipeline/dags/import_data_inclusion_api.py b/pipeline/dags/import_data_inclusion_api.py
index 0eddec96..1c7ca7df 100644
--- a/pipeline/dags/import_data_inclusion_api.py
+++ b/pipeline/dags/import_data_inclusion_api.py
@@ -93,20 +93,14 @@ def import_data_inclusion_api():
         task_id="generate_source_stats",
         command="build",
         select="path:models/intermediate/quality",
+        trigger_rule=TriggerRule.ALL_DONE,
     )

     snapshot_source_stats = dbt_operator_factory(
         task_id="snapshot_source_stats",
         command="snapshot",
         select="quality",
-        # Let's snapshot the stats, regardless of the result of their data tests.
-        # The data stats tests may "fail" (for instance if some source data is
-        # 100% missing) and we will be notified, but the snapshot will still be
-        # generated and recorded.
-        # Don't snapshot though if the initial API import failed, which
-        # would result in `build_source_stats` being skipped.
-        # In that case there is nothing to be snapshotted.
-        trigger_rule=TriggerRule.NONE_SKIPPED,
+        trigger_rule=TriggerRule.ALL_DONE,
     )

     (
diff --git a/pipeline/dbt/models/intermediate/quality/_quality_models.yml b/pipeline/dbt/models/intermediate/quality/_quality_models.yml
index 3ee13f20..15a5ced6 100644
--- a/pipeline/dbt/models/intermediate/quality/_quality_models.yml
+++ b/pipeline/dbt/models/intermediate/quality/_quality_models.yml
@@ -46,45 +46,3 @@ models:
               - services
               - siaes
               - structures
-
-      - name: count_raw
-        data_tests:
-          - dbt_utils.accepted_range:
-              min_value: 0
-              inclusive: false
-
-      - name: count_stg
-        data_tests:
-          - dbt_utils.accepted_range:
-              min_value: 0
-              inclusive: false
-
-      - name: count_int
-        data_tests:
-          - dbt_utils.accepted_range:
-              min_value: 0
-              inclusive: false
-
-      - name: count_marts
-        data_tests:
-          - dbt_utils.accepted_range:
-              min_value: 0
-              inclusive: false
-
-      - name: count_api
-        data_tests:
-          - dbt_utils.accepted_range:
-              min_value: 0
-              inclusive: false
-
-      - name: count_contacts
-        data_tests:
-          - dbt_utils.accepted_range:
-              min_value: 0
-              inclusive: true
-
-      - name: count_addresses
-        data_tests:
-          - dbt_utils.accepted_range:
-              min_value: 0
-              inclusive: true

From a315cbf0af4ea7e6219ae8973795156db43256e7 Mon Sep 17 00:00:00 2001
From: Victor Perron
Date: Mon, 12 Aug 2024 18:18:54 +0200
Subject: [PATCH 5/5] chore(pipeline) : Ensure quality models are not run after geocoding

It's all going to change after Valentin's geocoding revamp anyway.
Still, I don't get why they were run there in the first place.
---
 pipeline/dags/dag_utils/dbt.py | 1 +
 1 file changed, 1 insertion(+)

diff --git a/pipeline/dags/dag_utils/dbt.py b/pipeline/dags/dag_utils/dbt.py
index b221c364..60bd168a 100644
--- a/pipeline/dags/dag_utils/dbt.py
+++ b/pipeline/dags/dag_utils/dbt.py
@@ -127,4 +127,5 @@ def get_after_geocoding_tasks():
                 "marts",
             ]
         ),
+        exclude="path:models/intermediate/quality",
     )