Skip to content

Commit

Permalink
chore(pipeline): minor clean ups
Browse files Browse the repository at this point in the history
  • Loading branch information
vmttn committed Sep 13, 2024
1 parent 6af47ea commit 3367584
Show file tree
Hide file tree
Showing 12 changed files with 174 additions and 144 deletions.
109 changes: 109 additions & 0 deletions pipeline/dags/import_decoupage_administratif.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
import pendulum

from airflow.decorators import dag, task
from airflow.operators import empty

from dag_utils import date, dbt, notifications
from dag_utils.virtualenvs import PYTHON_BIN_PATH


@task.external_python(
python=str(PYTHON_BIN_PATH),
retries=2,
)
def extract_and_load():
import pandas as pd
import sqlalchemy as sqla
from furl import furl

from dag_utils import pg

base_url = furl("https://geo.api.gouv.fr")
# the default zone parameter is inconsistent between resources
# so we explicitely set it for all resources
base_url.set({"zone": ",".join(["metro", "drom", "com"])})
URL_BY_RESOURCE = {
"regions": base_url / "regions",
"departements": base_url / "departements",
"epcis": base_url / "epcis",
"communes": (base_url / "communes").set(
{
# explicitely list retrieve fields
# to include the "center" field
"fields": ",".join(
[
"nom",
"code",
"centre",
"codesPostaux",
"codeEpci",
"codeDepartement",
"codeRegion",
]
)
}
),
}

# arrondissements do not have a dedicated endpoint
# they are retrieved using an opt-in parameter
# on the communes endpoint
URL_BY_RESOURCE["arrondissements"] = (
URL_BY_RESOURCE["communes"].copy().add({"type": "arrondissement-municipal"})
)

schema = "decoupage_administratif"
pg.create_schema(schema)

for resource, url in URL_BY_RESOURCE.items():
print(f"Fetching resource={resource} from url={url}")
df = pd.read_json(str(url), dtype=False)

fq_table_name = f"{schema}.{resource}"
print(f"Loading to {fq_table_name}")
with pg.connect_begin() as conn:
df.to_sql(
f"{resource}_tmp",
con=conn,
schema=schema,
if_exists="replace",
index=False,
dtype={
"centre": sqla.JSON,
"codesPostaux": sqla.ARRAY(sqla.TEXT),
}
if resource in ["communes", "arrondissements"]
else None,
)
conn.execute(
f"""\
CREATE TABLE IF NOT EXISTS {fq_table_name}
(LIKE {fq_table_name}_tmp);
TRUNCATE {fq_table_name};
INSERT INTO {fq_table_name}
(SELECT * FROM {fq_table_name}_tmp);
DROP TABLE {fq_table_name}_tmp;
"""
)


@dag(
start_date=pendulum.datetime(2022, 1, 1, tz=date.TIME_ZONE),
default_args=notifications.notify_failure_args(),
schedule="@monthly",
catchup=False,
)
def import_decoupage_administratif():
start = empty.EmptyOperator(task_id="start")
end = empty.EmptyOperator(task_id="end")

dbt_build_staging = dbt.dbt_operator_factory(
task_id="dbt_build_staging",
command="build",
select="path:models/staging/decoupage_administratif",
)

start >> extract_and_load() >> dbt_build_staging >> end


import_decoupage_administratif()
90 changes: 0 additions & 90 deletions pipeline/dags/sync_cities.py

This file was deleted.

7 changes: 3 additions & 4 deletions pipeline/dbt/models/_sources.yml
Original file line number Diff line number Diff line change
Expand Up @@ -211,9 +211,8 @@ sources:
- name: decoupage_administratif
schema: decoupage_administratif
tables:
- name: communes
- name: districts
- name: regions
- name: departements
- name: epcis
- name: regions

- name: communes
- name: arrondissements
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ adresses AS (
),

departements AS (
SELECT * FROM {{ source('decoupage_administratif', 'departements') }}
SELECT * FROM {{ ref('stg_decoupage_administratif__departements') }}
),

-- TODO: Refactoring needed to be able to do geocoding per source and then use the result in the mapping
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,24 +5,28 @@ models:
columns:
- name: code
data_tests:
- not_null
- dbt_utils.not_constant
- dbt_utils.not_empty_string
- name: nom
data_tests:
- not_null
- dbt_utils.not_constant
- dbt_utils.not_empty_string

- name: stg_decoupage_administratif__departements
columns:
- name: code
data_tests:
- not_null
- dbt_utils.not_constant
- dbt_utils.not_empty_string
- name: nom
data_tests:
- not_null
- dbt_utils.not_constant
- dbt_utils.not_empty_string
- name: region
- name: code_region
data_tests:
- not_null
- dbt_utils.not_constant
Expand All @@ -39,23 +43,23 @@ models:
- not_null
- dbt_utils.not_constant
- dbt_utils.not_empty_string
- name: departement
- name: code_departement
data_tests:
- not_null
- dbt_utils.not_constant
- dbt_utils.not_empty_string
- relationships:
to: ref('stg_decoupage_administratif__departements')
field: code
- name: region
- name: code_region
data_tests:
- not_null
- dbt_utils.not_constant
- dbt_utils.not_empty_string
- relationships:
to: ref('stg_decoupage_administratif__regions')
field: code
- name: siren_epci
- name: code_epci
data_tests:
# NOTE(vperron): the test is against a source, since the EPCIs are
# very rarely used yet and it does not seem worth it to create a
Expand Down
Original file line number Diff line number Diff line change
@@ -1,39 +1,40 @@
WITH communes AS (
WITH source_communes AS (
{{ stg_source_header('decoupage_administratif', 'communes') }}
),

districts AS (
{{ stg_source_header('decoupage_administratif', 'districts') }}
source_arrondissements AS (
{{ stg_source_header('decoupage_administratif', 'arrondissements') }}
),

clean_communes AS (
communes AS (
SELECT
communes.code AS "code",
communes.nom AS "nom",
communes."codeRegion" AS "region",
communes."codeDepartement" AS "departement",
communes."codeEpci" AS "siren_epci",
ST_GEOMFROMGEOJSON(communes.centre) AS "centre",
communes."codesPostaux" AS "codes_postaux"
FROM communes
source_communes.code AS "code",
source_communes.nom AS "nom",
source_communes."codeRegion" AS "code_region",
source_communes."codeDepartement" AS "code_departement",
source_communes."codeEpci" AS "code_epci",
ST_GEOMFROMGEOJSON(source_communes.centre) AS "centre",
source_communes."codesPostaux" AS "codes_postaux"
FROM source_communes
),

clean_districts AS (
arrondissements AS (
SELECT
districts.code AS "code",
districts.nom AS "nom",
districts."codeRegion" AS "region",
districts."codeDepartement" AS "departement",
NULL AS "siren_epci",
ST_GEOMFROMGEOJSON(districts.centre) AS "centre",
districts."codesPostaux" AS "codes_postaux"
FROM districts
source_arrondissements.code AS "code",
source_arrondissements.nom AS "nom",
source_arrondissements."codeRegion" AS "code_region",
source_arrondissements."codeDepartement" AS "code_departement",
NULL AS "code_epci",
ST_GEOMFROMGEOJSON(source_arrondissements.centre) AS "centre",
source_arrondissements."codesPostaux" AS "codes_postaux"
FROM source_arrondissements
),

final AS (
SELECT * FROM clean_communes
SELECT * FROM communes
UNION ALL
SELECT * FROM clean_districts
SELECT * FROM arrondissements
ORDER BY code
)

SELECT * FROM final
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
WITH source AS (
{{ stg_source_header('decoupage_administratif', 'departements') }}
),

final AS (
SELECT
code AS "code",
nom AS "nom",
"codeRegion" AS "code_region"
FROM source
ORDER BY code
)

SELECT * FROM final

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
WITH regions AS (
WITH source AS (
{{ stg_source_header('decoupage_administratif', 'regions') }}
),

final AS (
SELECT * FROM regions ORDER BY nom
SELECT
code AS "code",
nom AS "nom"
FROM source
ORDER BY code
)

SELECT * FROM final
Loading

0 comments on commit 3367584

Please sign in to comment.