Skip to content

Commit

Permalink
__wip : Attempt a version of the stats without magic
Browse files Browse the repository at this point in the history
  • Loading branch information
vperron committed Jul 26, 2024
1 parent f43a451 commit e81d717
Show file tree
Hide file tree
Showing 2 changed files with 450 additions and 117 deletions.
73 changes: 10 additions & 63 deletions pipeline/dbt/macros/quality.sql
Original file line number Diff line number Diff line change
@@ -1,72 +1,19 @@
{% macro table_exists(model, schema, identifier) %}
{% set relation = adapter.get_relation(
database=model.database,
schema=schema or model.schema,
identifier=identifier or model.name) -%}
{% do return(relation is not none) %}
{#
    Render a SQL expression that evaluates to today's calendar date in the
    Europe/Paris time zone.

    The current instant is converted to Paris local time first and only then
    truncated to a date. Applying AT TIME ZONE directly to CURRENT_DATE does
    not do that: CURRENT_DATE is already resolved in the session time zone, so
    the conversion merely shifts that day's midnight and yields a timestamp
    whose date part can be one day off around midnight.

    NOTE(review): assumes PostgreSQL semantics for AT TIME ZONE; confirm
    against the adapter actually used by this project.

    Takes no arguments. The rendered expression is self-contained and can be
    embedded in a select list (e.g. `{{ current_paris_date() }} AS date`).
#}
{% macro current_paris_date() %}
CAST(CURRENT_TIMESTAMP AT TIME ZONE 'Europe/Paris' AS DATE)
{% endmacro %}

{% macro stat_name(model) %}
{% set stat_name = model.schema ~ "_" ~ model.name ~ "_stats" -%}
{% do return(stat_name) %}
{#
    Render a query counting the rows of one marts table that belong to a
    given source.

    Args:
        source_slug: value compared with the `source` column. It is
            interpolated verbatim inside a single-quoted SQL literal, so it
            must come from trusted project configuration.
        stream_kind: suffix appended to `public_marts.marts_inclusion__` to
            form the table name. It is interpolated verbatim.

    Renders a single SELECT COUNT(*) statement without a trailing semicolon.
#}
{% macro count_marts(source_slug, stream_kind) %}
{%- set marts_table = "public_marts.marts_inclusion__" ~ stream_kind -%}
SELECT COUNT(*)
FROM {{ marts_table }}
WHERE source = '{{ source_slug }}'
{% endmacro %}

{% macro source_or_service(stream) %}
{% set mappings = ({
"agences":"structure",
"aides":"service",
"benefits":"service",
"creches": "structure",
"DD009_RES_PARTENARIALE": "structure",
"etablissements": "structure",
"formations": "service",
"garages":"structure",
"institutions":"structure",
"lieux":"structure",
"organisations":"structure",
"siaes":"structure",
"services":"service",
"structures":"structure"
})
-%}
{% set mapped_name = mappings.get(stream) -%}
{% do return(mapped_name) %}
{#
    Render a query counting the rows of one API table that belong to a given
    source.

    Args:
        source_slug: value compared with the `source` column. It is
            interpolated verbatim inside a single-quoted SQL literal, so it
            must come from trusted project configuration.
        stream_kind: suffix appended to `public.api__` to form the table
            name. It is interpolated verbatim.

    Renders a single SELECT COUNT(*) statement without a trailing semicolon.
#}
{% macro count_api(source_slug, stream_kind) %}
{%- set api_table = "public.api__" ~ stream_kind -%}
SELECT COUNT(*)
FROM {{ api_table }}
WHERE source = '{{ source_slug }}'
{% endmacro %}

{% macro intermediate_name(model) %}
{% set int_name = "int_" ~ model.schema ~ "__" ~ source_or_service(model.name) ~ "s" %}
{% do return(int_name) %}
{#
    Render a query counting the rows of one API table, for a given source,
    that carry contact information.

    A row counts as having contact information when both `courriel` and
    `telephone` are non-null. Without this predicate the macro rendered
    exactly the same query as `count_api`, i.e. the total row count.

    NOTE(review): the predicate requires BOTH columns to be set; confirm that
    this (rather than either one) is the intended definition.

    Args:
        source_slug: value compared with the `source` column. It is
            interpolated verbatim inside a single-quoted SQL literal, so it
            must come from trusted project configuration.
        stream_kind: suffix appended to `public.api__` to form the table
            name. It is interpolated verbatim.

    Renders a single SELECT COUNT(*) statement without a trailing semicolon.
#}
{% macro count_contacts(source_slug, stream_kind) %}
SELECT COUNT(*)
FROM public.api__{{ stream_kind }}
WHERE
    source = '{{ source_slug }}'
    AND courriel IS NOT NULL
    AND telephone IS NOT NULL
{% endmacro %}

{% macro source_stats(source) %}
{%- if not execute -%}
{{ return('') }}
{% endif -%}
{% set staging_name = "stg_" ~ source.schema ~ "__" ~ source.name %}
{% set intermediate_name = intermediate_name(source) %}
{% set stream_kind = source_or_service(source.name) %}
{% set source_slug = source.schema | replace("_", "-") %}
{{ stat_name(source) }} as (
SELECT
CURRENT_DATE AT TIME ZONE 'Europe/Paris' as date,
'{{source.schema}}' as source,
'{{source.name}}' as stream,
COUNT(*) as count_raw,
{% if table_exists(source, "public_staging", staging_name) %}
(SELECT COUNT(*) FROM public_staging.{{ staging_name }}) as count_stg,
{% else %}
-1 as count_stg,
{% endif %}
{% if table_exists(source, "public_intermediate", intermediate_name) %}
(SELECT COUNT(*) FROM public_intermediate.{{ intermediate_name }}) as count_int,
{% else %}
-1 as count_int,
{% endif %}
(SELECT COUNT(*) FROM public_marts.marts_inclusion__{{ stream_kind }}s WHERE source = '{{ source_slug }}') as count_marts,
(SELECT COUNT(*) FROM public.api__{{ stream_kind }}s WHERE source = '{{ source_slug }}') as count_api,
(SELECT COUNT(*) FROM public.api__{{ stream_kind }}s WHERE source = '{{ source_slug }}' AND courriel IS NOT NULL AND telephone IS NOT NULL) as nb_contacts,
(SELECT COUNT(*) FROM public.api__{{ stream_kind }}s WHERE source = '{{ source_slug }}' AND (adresse IS NOT NULL OR commune IS NOT NULL OR code_postal IS NOT NULL or code_insee IS NOT NULL)) as nb_addresses
from
{{ source }}
)
{#
    Render a query counting the rows of one API table, for a given source,
    that carry address information.

    A row counts as having address information when at least one of
    `adresse`, `commune`, `code_postal` or `code_insee` is non-null. Without
    this predicate the macro rendered exactly the same query as `count_api`,
    i.e. the total row count.

    Args:
        source_slug: value compared with the `source` column. It is
            interpolated verbatim inside a single-quoted SQL literal, so it
            must come from trusted project configuration.
        stream_kind: suffix appended to `public.api__` to form the table
            name. It is interpolated verbatim.

    Renders a single SELECT COUNT(*) statement without a trailing semicolon.
#}
{% macro count_addresses(source_slug, stream_kind) %}
SELECT COUNT(*)
FROM public.api__{{ stream_kind }}
WHERE
    source = '{{ source_slug }}'
    -- Parenthesised so the OR group is ANDed with the source filter as a whole.
    AND (
        adresse IS NOT NULL
        OR commune IS NOT NULL
        OR code_postal IS NOT NULL
        OR code_insee IS NOT NULL
    )
{% endmacro %}
Loading

0 comments on commit e81d717

Please sign in to comment.