From 7b04ec16bd5500b51e4c1397231784f4cc59630e Mon Sep 17 00:00:00 2001 From: Valentin Matton Date: Fri, 13 Sep 2024 17:09:28 +0200 Subject: [PATCH] chore(pipeline): minor clean ups --- .../dags/import_decoupage_administratif.py | 109 ++++++++++++++++++ pipeline/dags/sync_cities.py | 90 --------------- pipeline/dbt/models/_sources.yml | 7 +- .../int__union_services__enhanced.sql | 2 +- .../_decoupage_administratif__models.yml | 12 +- .../stg_decoupage_administratif__communes.sql | 47 ++++---- ..._decoupage_administratif__departements.sql | 14 +++ ...g_decoupage_administratif__departments.sql | 9 -- .../stg_decoupage_administratif__regions.sql | 8 +- .../sources/odspep/stg_odspep__communes.sql | 10 +- .../odspep/stg_odspep__departements.sql | 4 +- .../sources/odspep/stg_odspep__regions.sql | 6 +- 12 files changed, 174 insertions(+), 144 deletions(-) create mode 100755 pipeline/dags/import_decoupage_administratif.py delete mode 100755 pipeline/dags/sync_cities.py create mode 100644 pipeline/dbt/models/staging/decoupage_administratif/stg_decoupage_administratif__departements.sql delete mode 100644 pipeline/dbt/models/staging/decoupage_administratif/stg_decoupage_administratif__departments.sql diff --git a/pipeline/dags/import_decoupage_administratif.py b/pipeline/dags/import_decoupage_administratif.py new file mode 100755 index 00000000..6c1797b8 --- /dev/null +++ b/pipeline/dags/import_decoupage_administratif.py @@ -0,0 +1,109 @@ +import pendulum + +from airflow.decorators import dag, task +from airflow.operators import empty + +from dag_utils import date, dbt, notifications +from dag_utils.virtualenvs import PYTHON_BIN_PATH + + +@task.external_python( + python=str(PYTHON_BIN_PATH), + retries=2, +) +def extract_and_load(): + import pandas as pd + import sqlalchemy as sqla + from furl import furl + + from dag_utils import pg + + base_url = furl("https://geo.api.gouv.fr") + # the default zone parameter is inconsistent between resources + # so we explicitely set it for all resources + base_url.set({"zone": ",".join(["metro", "drom", "com"])}) + URL_BY_RESOURCE = { + "regions": base_url / "regions", + "departements": base_url / "departements", + "epcis": base_url / "epcis", + "communes": (base_url / "communes").set( + { + # explicitely list retrieve fields + # to include the "center" field + "fields": ",".join( + [ + "nom", + "code", + "centre", + "codesPostaux", + "codeEpci", + "codeDepartement", + "codeRegion", + ] + ) + } + ), + } + + # arrondissements do not have a dedicated endpoint + # they are retrieved using an opt-in parameter + # on the communes endpoint + URL_BY_RESOURCE["arrondissements"] = ( + URL_BY_RESOURCE["communes"].copy().add({"type": "arrondissement-municipal"}) + ) + + schema = "decoupage_administratif" + pg.create_schema(schema) + + for resource, url in URL_BY_RESOURCE.items(): + print(f"Fetching resource={resource} from url={url}") + df = pd.read_json(str(url), dtype=False) + + fq_table_name = f"{schema}.{resource}" + print(f"Loading to {fq_table_name}") + with pg.connect_begin() as conn: + df.to_sql( + f"{resource}_tmp", + con=conn, + schema=schema, + if_exists="replace", + index=False, + dtype={ + "centre": sqla.JSON, + "codesPostaux": sqla.ARRAY(sqla.TEXT), + } + if resource in ["communes", "arrondissements"] + else None, + ) + conn.execute( + f"""\ + CREATE TABLE IF NOT EXISTS {fq_table_name} + (LIKE {fq_table_name}_tmp); + TRUNCATE {fq_table_name}; + INSERT INTO {fq_table_name} + (SELECT * FROM {fq_table_name}_tmp); + DROP TABLE {fq_table_name}_tmp; + """ + ) + + +@dag( + start_date=pendulum.datetime(2022, 1, 1, tz=date.TIME_ZONE), + default_args=notifications.notify_failure_args(), + schedule="@monthly", + catchup=False, +) +def import_decoupage_administratif(): + start = empty.EmptyOperator(task_id="start") + end = empty.EmptyOperator(task_id="end") + + dbt_build_staging = dbt.dbt_operator_factory( + task_id="dbt_build_staging", + command="build", + select="path:models/staging/decoupage_administratif", + ) + + start >> extract_and_load() >> dbt_build_staging >> end + + +import_decoupage_administratif() diff --git a/pipeline/dags/sync_cities.py b/pipeline/dags/sync_cities.py deleted file mode 100755 index 1eab0182..00000000 --- a/pipeline/dags/sync_cities.py +++ /dev/null @@ -1,90 +0,0 @@ -import pendulum - -from airflow.decorators import dag, task -from airflow.operators import empty - -from dag_utils import date, notifications -from dag_utils.virtualenvs import PYTHON_BIN_PATH - - -@task.external_python( - python=str(PYTHON_BIN_PATH), - retries=2, -) -def extract_load_from_api(run_id, logical_date): - import logging - - import pandas as pd - from furl import furl - from sqlalchemy import types - - from dag_utils import pg - - logger = logging.getLogger(__name__) - - city_params = { - "fields": ("nom,code,codesPostaux,codeDepartement,codeRegion,codeEpci,centre"), - "format": "json", - } - city_dtypes = { - "centre": types.JSON, - "codesPostaux": types.ARRAY(types.TEXT), - } - - for resource, query_params, table_name, dtypes in [ - ("departements", {"zone": "metro,drom,com"}, None, None), - ("regions", {"zone": "metro,drom,com"}, None, None), - ("epcis", None, None, None), - ("communes", city_params, "communes", city_dtypes), - ( - "communes", - city_params | {"type": "arrondissement-municipal"}, - "districts", - city_dtypes, - ), - ]: - url = ( - (furl("https://geo.api.gouv.fr") / resource) - .set(query_params=query_params) - .url - ) - logger.info(f"> fetching resource={resource} from url={url}") - df = pd.read_json(url, dtype=False) - with pg.connect_begin() as conn: - schema = "decoupage_administratif" - table_name = table_name or resource - df.to_sql( - f"{table_name}_tmp", - con=conn, - schema=schema, - if_exists="replace", - index=False, - dtype=dtypes, - ) - conn.execute( - f"""\ - CREATE TABLE IF NOT EXISTS {schema}.{table_name} - (LIKE {schema}.{table_name}_tmp); - TRUNCATE {schema}.{table_name}; - INSERT INTO {schema}.{table_name} - (SELECT * FROM {schema}.{table_name}_tmp); - DROP TABLE {schema}.{table_name}_tmp; - """ - ) - - -@dag( - start_date=pendulum.datetime(2022, 1, 1, tz=date.TIME_ZONE), - default_args=notifications.notify_failure_args(), - schedule="@monthly", - catchup=False, - tags=["source"], -) -def sync_cities(): - start = empty.EmptyOperator(task_id="start") - end = empty.EmptyOperator(task_id="end") - - (start >> extract_load_from_api() >> end) - - -sync_cities() diff --git a/pipeline/dbt/models/_sources.yml b/pipeline/dbt/models/_sources.yml index 6c8ff4ae..1ffa198e 100644 --- a/pipeline/dbt/models/_sources.yml +++ b/pipeline/dbt/models/_sources.yml @@ -211,9 +211,8 @@ sources: - name: decoupage_administratif schema: decoupage_administratif tables: - - name: communes - - name: districts + - name: regions - name: departements - name: epcis - - name: regions - + - name: communes + - name: arrondissements diff --git a/pipeline/dbt/models/intermediate/int__union_services__enhanced.sql b/pipeline/dbt/models/intermediate/int__union_services__enhanced.sql index 4521a971..a02857e2 100644 --- a/pipeline/dbt/models/intermediate/int__union_services__enhanced.sql +++ b/pipeline/dbt/models/intermediate/int__union_services__enhanced.sql @@ -11,7 +11,7 @@ adresses AS ( ), departements AS ( - SELECT * FROM {{ source('decoupage_administratif', 'departements') }} + SELECT * FROM {{ ref('stg_decoupage_administratif__departements') }} ), -- TODO: Refactoring needed to be able to do geocoding per source and then use the result in the mapping diff --git a/pipeline/dbt/models/staging/decoupage_administratif/_decoupage_administratif__models.yml b/pipeline/dbt/models/staging/decoupage_administratif/_decoupage_administratif__models.yml index 6ba0881d..664acfcf 100644 --- a/pipeline/dbt/models/staging/decoupage_administratif/_decoupage_administratif__models.yml +++ b/pipeline/dbt/models/staging/decoupage_administratif/_decoupage_administratif__models.yml @@ -5,10 +5,12 @@ models: columns: - name: code data_tests: + - not_null - dbt_utils.not_constant - dbt_utils.not_empty_string - name: nom data_tests: + - not_null - dbt_utils.not_constant - dbt_utils.not_empty_string @@ -16,13 +18,15 @@ models: columns: - name: code data_tests: + - not_null - dbt_utils.not_constant - dbt_utils.not_empty_string - name: nom data_tests: + - not_null - dbt_utils.not_constant - dbt_utils.not_empty_string - - name: region + - name: code_region data_tests: - not_null - dbt_utils.not_constant @@ -39,7 +43,7 @@ models: - not_null - dbt_utils.not_constant - dbt_utils.not_empty_string - - name: departement + - name: code_departement data_tests: - not_null - dbt_utils.not_constant @@ -47,7 +51,7 @@ models: - relationships: to: ref('stg_decoupage_administratif__departements') field: code - - name: region + - name: code_region data_tests: - not_null - dbt_utils.not_constant @@ -55,7 +59,7 @@ models: - relationships: to: ref('stg_decoupage_administratif__regions') field: code - - name: siren_epci + - name: code_epci data_tests: # NOTE(vperron): the test is against a source, since the EPCIs are # very rarely used yet and it does not seem worth it to create a diff --git a/pipeline/dbt/models/staging/decoupage_administratif/stg_decoupage_administratif__communes.sql b/pipeline/dbt/models/staging/decoupage_administratif/stg_decoupage_administratif__communes.sql index 4dd6c7e2..fa385dac 100644 --- a/pipeline/dbt/models/staging/decoupage_administratif/stg_decoupage_administratif__communes.sql +++ b/pipeline/dbt/models/staging/decoupage_administratif/stg_decoupage_administratif__communes.sql @@ -1,39 +1,40 @@ -WITH communes AS ( +WITH source_communes AS ( {{ stg_source_header('decoupage_administratif', 'communes') }} ), -districts AS ( - {{ stg_source_header('decoupage_administratif', 'districts') }} +source_arrondissements AS ( + {{ stg_source_header('decoupage_administratif', 'arrondissements') }} ), -clean_communes AS ( +communes AS ( SELECT - communes.code AS "code", - communes.nom AS "nom", - communes."codeRegion" AS "region", - communes."codeDepartement" AS "departement", - communes."codeEpci" AS "siren_epci", - ST_GEOMFROMGEOJSON(communes.centre) AS "centre", - communes."codesPostaux" AS "codes_postaux" - FROM communes + source_communes.code AS "code", + source_communes.nom AS "nom", + source_communes."codeRegion" AS "code_region", + source_communes."codeDepartement" AS "code_departement", + source_communes."codeEpci" AS "code_epci", + ST_GEOMFROMGEOJSON(source_communes.centre) AS "centre", + source_communes."codesPostaux" AS "codes_postaux" + FROM source_communes ), -clean_districts AS ( +arrondissements AS ( SELECT - districts.code AS "code", - districts.nom AS "nom", - districts."codeRegion" AS "region", - districts."codeDepartement" AS "departement", - NULL AS "siren_epci", - ST_GEOMFROMGEOJSON(districts.centre) AS "centre", - districts."codesPostaux" AS "codes_postaux" - FROM districts + source_arrondissements.code AS "code", + source_arrondissements.nom AS "nom", + source_arrondissements."codeRegion" AS "code_region", + source_arrondissements."codeDepartement" AS "code_departement", + NULL AS "code_epci", + ST_GEOMFROMGEOJSON(source_arrondissements.centre) AS "centre", + source_arrondissements."codesPostaux" AS "codes_postaux" + FROM source_arrondissements ), final AS ( - SELECT * FROM clean_communes + SELECT * FROM communes UNION ALL - SELECT * FROM clean_districts + SELECT * FROM arrondissements + ORDER BY code ) SELECT * FROM final diff --git a/pipeline/dbt/models/staging/decoupage_administratif/stg_decoupage_administratif__departements.sql b/pipeline/dbt/models/staging/decoupage_administratif/stg_decoupage_administratif__departements.sql new file mode 100644 index 00000000..0c59e169 --- /dev/null +++ b/pipeline/dbt/models/staging/decoupage_administratif/stg_decoupage_administratif__departements.sql @@ -0,0 +1,14 @@ +WITH source AS ( + {{ stg_source_header('decoupage_administratif', 'departements') }} +), + +final AS ( + SELECT + code AS "code", + nom AS "nom", + "codeRegion" AS "code_region" + FROM source + ORDER BY code +) + +SELECT * FROM final diff --git a/pipeline/dbt/models/staging/decoupage_administratif/stg_decoupage_administratif__departments.sql b/pipeline/dbt/models/staging/decoupage_administratif/stg_decoupage_administratif__departments.sql deleted file mode 100644 index de906880..00000000 --- a/pipeline/dbt/models/staging/decoupage_administratif/stg_decoupage_administratif__departments.sql +++ /dev/null @@ -1,9 +0,0 @@ -WITH departements AS ( - {{ stg_source_header('decoupage_administratif', 'departements') }} -), - -final AS ( - SELECT * FROM departements ORDER BY code -) - -SELECT * FROM final diff --git a/pipeline/dbt/models/staging/decoupage_administratif/stg_decoupage_administratif__regions.sql b/pipeline/dbt/models/staging/decoupage_administratif/stg_decoupage_administratif__regions.sql index 7b6530ef..95c6ae3d 100644 --- a/pipeline/dbt/models/staging/decoupage_administratif/stg_decoupage_administratif__regions.sql +++ b/pipeline/dbt/models/staging/decoupage_administratif/stg_decoupage_administratif__regions.sql @@ -1,9 +1,13 @@ -WITH regions AS ( +WITH source AS ( {{ stg_source_header('decoupage_administratif', 'regions') }} ), final AS ( - SELECT * FROM regions ORDER BY nom + SELECT + code AS "code", + nom AS "nom" + FROM source + ORDER BY code ) SELECT * FROM final diff --git a/pipeline/dbt/models/staging/sources/odspep/stg_odspep__communes.sql b/pipeline/dbt/models/staging/sources/odspep/stg_odspep__communes.sql index 778d3631..08502540 100644 --- a/pipeline/dbt/models/staging/sources/odspep/stg_odspep__communes.sql +++ b/pipeline/dbt/models/staging/sources/odspep/stg_odspep__communes.sql @@ -16,11 +16,11 @@ final AS ( SELECT - source."ID_COM" AS "id", - source."ID_COM", - source."ID_RES", - source."CODE_COMMUNE_COM", - communes.nom AS "libelle" + source."ID_COM" AS "id", + source."ID_COM" AS "id_com", + source."ID_RES" AS "id_res", + source."CODE_COMMUNE_COM" AS "code_commune_com", + communes.nom AS "libelle" FROM source LEFT JOIN communes ON source."CODE_COMMUNE_COM" = communes.code ) diff --git a/pipeline/dbt/models/staging/sources/odspep/stg_odspep__departements.sql b/pipeline/dbt/models/staging/sources/odspep/stg_odspep__departements.sql index eafba6fe..a239b15d 100644 --- a/pipeline/dbt/models/staging/sources/odspep/stg_odspep__departements.sql +++ b/pipeline/dbt/models/staging/sources/odspep/stg_odspep__departements.sql @@ -2,7 +2,7 @@ {% set table_exists = adapter.get_relation(database=source_model.database, schema=source_model.schema, identifier=source_model.name) is not none %} --- depends_on: {{ source('decoupage_administratif', 'departements') }} +-- depends_on: {{ ref('stg_decoupage_administratif__departements') }} {% if table_exists %} @@ -11,7 +11,7 @@ ), departements AS ( - SELECT * FROM {{ source('decoupage_administratif', 'departements') }} + SELECT * FROM {{ ref('stg_decoupage_administratif__departements') }} ), final AS ( diff --git a/pipeline/dbt/models/staging/sources/odspep/stg_odspep__regions.sql b/pipeline/dbt/models/staging/sources/odspep/stg_odspep__regions.sql index f3143a6d..4ac0bda1 100644 --- a/pipeline/dbt/models/staging/sources/odspep/stg_odspep__regions.sql +++ b/pipeline/dbt/models/staging/sources/odspep/stg_odspep__regions.sql @@ -10,7 +10,7 @@ ) is not none) -%} --- depends_on: {{ source('decoupage_administratif', 'regions') }} +-- depends_on: {{ ref('stg_decoupage_administratif__regions') }} {% if tables_exist %} @@ -23,7 +23,7 @@ ), regions AS ( - SELECT * FROM {{ source('decoupage_administratif', 'regions') }} + SELECT * FROM {{ ref('stg_decoupage_administratif__regions') }} ), final AS ( @@ -34,10 +34,8 @@ source."CODE_REGION_REG" AS "code_region_reg", 'RĂ©gion' AS "zone_diffusion_type", regions.nom AS "libelle" - FROM source LEFT JOIN regions ON source."CODE_REGION_REG" = regions.code - ) SELECT * FROM final