From fe51ae00de97e3ed07e5f780d7ee7a9dd8853a5c Mon Sep 17 00:00:00 2001 From: Victor Perron Date: Fri, 30 Aug 2024 16:36:16 +0200 Subject: [PATCH] feat(api) : Use a single source of truth for the communes We now get all of our data from the DWH. The services, structures and communes are all downloaded from the same place and deemed coherent. --- api/requirements/requirements.txt | 20 ---- api/setup.py | 1 - api/src/data_inclusion/api/cli.py | 7 -- .../api/code_officiel_geo/commands.py | 99 ------------------- .../api/code_officiel_geo/constants.py | 67 ------------- .../api/code_officiel_geo/models.py | 85 ---------------- .../api/inclusion_data/commands.py | 30 +++--- .../api/inclusion_data/models.py | 17 +++- .../api/inclusion_data/routes.py | 4 - pipeline/dags/dag_utils/marts.py | 12 ++- 10 files changed, 42 insertions(+), 300 deletions(-) delete mode 100644 api/src/data_inclusion/api/code_officiel_geo/commands.py delete mode 100644 api/src/data_inclusion/api/code_officiel_geo/models.py diff --git a/api/requirements/requirements.txt b/api/requirements/requirements.txt index 5795201c8..c633d279b 100644 --- a/api/requirements/requirements.txt +++ b/api/requirements/requirements.txt @@ -15,8 +15,6 @@ argon2-cffi-bindings==21.2.0 # via argon2-cffi attrs==23.2.0 # via fiona -brotli==1.1.0 - # via py7zr certifi==2024.2.2 # via # data-inclusion-api (setup.py) @@ -93,8 +91,6 @@ idna==3.7 # email-validator # httpx # requests -inflate64==1.0.0 - # via py7zr jinja2==3.1.4 # via data-inclusion-api (setup.py) mako==1.3.3 @@ -105,8 +101,6 @@ markupsafe==2.1.5 # mako minio==7.2.5 # via data-inclusion-api (setup.py) -multivolumefile==0.2.3 - # via py7zr numpy==1.26.4 # via # data-inclusion-api (setup.py) @@ -124,26 +118,18 @@ pandas==2.2.2 # via # data-inclusion-api (setup.py) # geopandas -psutil==5.9.8 - # via py7zr psycopg2==2.9.9 # via data-inclusion-api (setup.py) -py7zr==0.21.0 - # via data-inclusion-api (setup.py) pyarrow==16.0.0 # via data-inclusion-api (setup.py) 
pyasn1==0.6.0 # via # python-jose # rsa -pybcj==1.0.2 - # via py7zr pycparser==2.22 # via cffi pycryptodome==3.20.0 # via minio -pycryptodomex==3.20.0 - # via py7zr pydantic==2.7.0 # via # data-inclusion-api (setup.py) @@ -155,8 +141,6 @@ pydantic-core==2.18.1 # via pydantic pydantic-settings==2.2.1 # via data-inclusion-api (setup.py) -pyppmd==1.1.0 - # via py7zr pyproj==3.6.1 # via geopandas python-dateutil==2.9.0.post0 @@ -176,8 +160,6 @@ pytz==2024.1 # pandas pyyaml==6.0.1 # via uvicorn -pyzstd==0.15.10 - # via py7zr requests==2.31.0 # via data-inclusion-api (setup.py) rsa==4.9 @@ -204,8 +186,6 @@ sqlalchemy==2.0.29 # geoalchemy2 starlette==0.37.2 # via fastapi -texttable==1.7.0 - # via py7zr tqdm==4.66.2 # via data-inclusion-api (setup.py) typing-extensions==4.11.0 diff --git a/api/setup.py b/api/setup.py index b821ff63a..48726ef00 100644 --- a/api/setup.py +++ b/api/setup.py @@ -27,7 +27,6 @@ "numpy", "pandas", "psycopg2", - "py7zr", "pyarrow", "pydantic[email]>=2.5.0", "pydantic-settings", diff --git a/api/src/data_inclusion/api/cli.py b/api/src/data_inclusion/api/cli.py index da48e4013..db09eb6fd 100644 --- a/api/src/data_inclusion/api/cli.py +++ b/api/src/data_inclusion/api/cli.py @@ -3,7 +3,6 @@ import click from data_inclusion.api import auth -from data_inclusion.api.code_officiel_geo.commands import import_admin_express from data_inclusion.api.inclusion_data.commands import load_inclusion_data logger = logging.getLogger(__name__) @@ -34,12 +33,6 @@ def _generate_token_for_user( click.echo(auth.create_access_token(subject=email, admin=admin)) -@cli.command(name="import_admin_express") -def _import_admin_express(): - """Import the latest Admin Express COG database""" - click.echo(import_admin_express()) - - @cli.command(name="load_inclusion_data") def _load_inclusion_data(): """Load the latest inclusion data""" diff --git a/api/src/data_inclusion/api/code_officiel_geo/commands.py b/api/src/data_inclusion/api/code_officiel_geo/commands.py deleted file mode 
100644 index 09eb9f5cc..000000000 --- a/api/src/data_inclusion/api/code_officiel_geo/commands.py +++ /dev/null @@ -1,99 +0,0 @@ -import logging -import pathlib -import tempfile - -import geopandas -import httpx -import py7zr -from sqlalchemy.dialects.postgresql import insert -from tqdm import tqdm - -from data_inclusion.api.code_officiel_geo import models -from data_inclusion.api.core import db - -logger = logging.getLogger(__name__) - -IGN_ADMIN_EXPRESS_FILE_URL = "http://files.opendatarchives.fr/professionnels.ign.fr/adminexpress/ADMIN-EXPRESS_3-1__SHP__FRA_WM_2022-09-20.7z" - - -# TODO(vmttn): use https://geo.api.gouv.fr/ - - -def download(url: str, output_path: pathlib.Path): - with httpx.stream("GET", url) as response: - total = int(response.headers["Content-Length"]) - - response.raise_for_status() - - with output_path.open("wb") as fp: - with tqdm( - total=total, unit_scale=True, unit_divisor=1024, unit="B" - ) as progress: - num_bytes_downloaded = response.num_bytes_downloaded - for chunck in response.iter_bytes(chunk_size=32768): - fp.write(chunck) - progress.update( - response.num_bytes_downloaded - num_bytes_downloaded - ) - num_bytes_downloaded = response.num_bytes_downloaded - - -def load_communes(file_path: pathlib.Path): - for i in tqdm(range(100)): - chunck_df = geopandas.read_file(file_path, rows=slice(1000 * i, 1000 * (i + 1))) - - if len(chunck_df) == 0: - break - - chunck_df = chunck_df.rename( - columns={ - "INSEE_COM": "code", - "NOM": "nom", - "INSEE_DEP": "departement", - "INSEE_REG": "region", - "SIREN_EPCI": "siren_epci", - } - ) - - chunck_df = chunck_df.rename_geometry("geom") - - column_names = ["code", "nom", "departement", "region", "siren_epci", "geom"] - chunck_df = chunck_df[column_names] - - chunck_df = chunck_df.to_wkt() - - # optimize future lookups - chunck_df = chunck_df.sort_values(by="code") - - commune_data_list = chunck_df.to_dict(orient="records") - - stmt = insert(models.Commune).values(commune_data_list) - - stmt = 
stmt.on_conflict_do_update( - index_elements=[models.Commune.code], - set_={col: getattr(stmt.excluded, col) for col in column_names}, - ) - - with db.SessionLocal() as session: - session.execute(stmt) - session.commit() - - -def import_admin_express(): - with tempfile.TemporaryDirectory() as tmp_dir_name: - tmp_dir_path = pathlib.Path(tmp_dir_name) - archive_path = tmp_dir_path / "ign_admin_express.7z" - extract_dir_path = tmp_dir_path - logger.info(f"Downloading to {archive_path}") - - download(IGN_ADMIN_EXPRESS_FILE_URL, archive_path) - - logger.info(f"Extracting to {extract_dir_path}") - - with py7zr.SevenZipFile(archive_path, "r") as archive: - archive.extractall(extract_dir_path) - - logger.info("Loading communes") - - communes_file_path = next(tmp_dir_path.glob("**/COMMUNE.shp")) - load_communes(communes_file_path) diff --git a/api/src/data_inclusion/api/code_officiel_geo/constants.py b/api/src/data_inclusion/api/code_officiel_geo/constants.py index c50f20785..8bdbe8a1b 100644 --- a/api/src/data_inclusion/api/code_officiel_geo/constants.py +++ b/api/src/data_inclusion/api/code_officiel_geo/constants.py @@ -162,70 +162,3 @@ class RegionEnum(Enum): "RegionCodeEnum", {member.name: member.value.code for member in RegionEnum}, ) - - -# based on -# https://github.com/gip-inclusion/dora-back/blob/main/dora/admin_express/utils.py - -_DISTRICTS_BY_CITY = { - # Paris - "75056": [ - "75101", - "75102", - "75103", - "75104", - "75105", - "75106", - "75107", - "75108", - "75109", - "75110", - "75111", - "75112", - "75113", - "75114", - "75115", - "75116", - "75117", - "75118", - "75119", - "75120", - ], - # Lyon - "69123": [ - "69381", - "69382", - "69383", - "69384", - "69385", - "69386", - "69387", - "69388", - "69389", - ], - # Marseille - "13055": [ - "13201", - "13202", - "13203", - "13204", - "13205", - "13206", - "13207", - "13208", - "13209", - "13210", - "13211", - "13212", - "13213", - "13214", - "13215", - "13216", - ], -} - 
-CODE_COMMUNE_BY_CODE_ARRONDISSEMENT = { - code_arrondissement: code_commune - for code_commune, codes_arrondissements in _DISTRICTS_BY_CITY.items() - for code_arrondissement in codes_arrondissements -} diff --git a/api/src/data_inclusion/api/code_officiel_geo/models.py b/api/src/data_inclusion/api/code_officiel_geo/models.py deleted file mode 100644 index 58154e466..000000000 --- a/api/src/data_inclusion/api/code_officiel_geo/models.py +++ /dev/null @@ -1,85 +0,0 @@ -import geoalchemy2 -import sqlalchemy as sqla -from sqlalchemy.orm import Mapped, mapped_column - -from data_inclusion.api.core.db import Base - -# all fields are nullable or have a default value. These models will only be used to -# query valid data coming from the data pipeline. - - -class Commune(Base): - code: Mapped[str] = mapped_column(primary_key=True) - nom: Mapped[str] - departement: Mapped[str] - region: Mapped[str] - siren_epci: Mapped[str] - geom = mapped_column( - geoalchemy2.Geometry("Geometry", srid=4326, spatial_index=False) - ) - - __table_args__ = ( - sqla.Index( - "ix_api__communes__geography", - sqla.text("CAST(ST_Simplify(geom, 0.01) AS geography(geometry, 4326))"), - ), - ) - - def __repr__(self) -> str: - return f"" - - -class EPCI(Base): - code: Mapped[str] = mapped_column(primary_key=True) - nom: Mapped[str] - nature: Mapped[str] - geom = mapped_column( - geoalchemy2.Geometry("Geometry", srid=4326, spatial_index=False) - ) - - __table_args__ = ( - sqla.Index( - "ix_api__epcis__geography", - sqla.text("CAST(ST_Simplify(geom, 0.01) AS geography(geometry, 4326))"), - ), - ) - - def __repr__(self) -> str: - return f"" - - -class Departement(Base): - code: Mapped[str] = mapped_column(primary_key=True) - nom: Mapped[str] - insee_reg: Mapped[str] - geom = mapped_column( - geoalchemy2.Geometry("Geometry", srid=4326, spatial_index=False) - ) - - __table_args__ = ( - sqla.Index( - "ix_api__departements__geography", - sqla.text("CAST(ST_Simplify(geom, 0.01) AS geography(geometry, 
4326))"), - ), - ) - - def __repr__(self) -> str: - return f"" - - -class Region(Base): - code: Mapped[str] = mapped_column(primary_key=True) - nom: Mapped[str] - geom = mapped_column( - geoalchemy2.Geometry("Geometry", srid=4326, spatial_index=False) - ) - - __table_args__ = ( - sqla.Index( - "ix_api__regions__geography", - sqla.text("CAST(ST_Simplify(geom, 0.01) AS geography(geometry, 4326))"), - ), - ) - - def __repr__(self) -> str: - return f"" diff --git a/api/src/data_inclusion/api/inclusion_data/commands.py b/api/src/data_inclusion/api/inclusion_data/commands.py index 0e89635ad..1cd0b5079 100644 --- a/api/src/data_inclusion/api/inclusion_data/commands.py +++ b/api/src/data_inclusion/api/inclusion_data/commands.py @@ -11,7 +11,6 @@ from tqdm import tqdm from data_inclusion import schema -from data_inclusion.api.code_officiel_geo import constants from data_inclusion.api.config import settings from data_inclusion.api.core import db from data_inclusion.api.inclusion_data import models @@ -120,17 +119,11 @@ def load_inclusion_data(): structures_df = df_by_ressource["structures"] services_df = df_by_ressource["services"] + communes_df = df_by_ressource["communes"] structures_df = structures_df.replace({np.nan: None}) services_df = services_df.replace({np.nan: None}) - - # TODO(vperron) : To remove when we handle the city districts - structures_df = structures_df.assign( - code_insee=structures_df.code_insee.apply(clean_up_code_insee), - ) - services_df = services_df.assign( - code_insee=services_df.code_insee.apply(clean_up_code_insee), - ) + communes_df = communes_df.replace({np.nan: None}) structures_df = structures_df.drop(columns=["_di_geocodage_score"]) services_df = services_df.drop(columns=["_di_geocodage_score"]) @@ -156,6 +149,7 @@ def load_inclusion_data(): ) ] + commune_data_list = communes_df.to_dict(orient="records") structure_data_list = structures_df.to_dict(orient="records") service_data_list = services_df.to_dict(orient="records") @@ -163,6 
+157,19 @@ def load_inclusion_data(): with db.SessionLocal() as session: session.execute(sqla.delete(models.Service)) session.execute(sqla.delete(models.Structure)) + session.execute(sqla.delete(models.Commune)) + + for commune_data in tqdm(commune_data_list): + commune_instance = models.commune(**commune_data) + try: + with session.begin_nested(): + session.add(commune_instance) + except sqla.exc.IntegrityError as exc: + logger.error( + f"commune source={commune_data['source']} " + f"code={commune_data['code']}" + ) + logger.info(exc.orig) for structure_data in tqdm(structure_data_list): structure_instance = models.Structure(**structure_data) @@ -192,14 +199,11 @@ def load_inclusion_data(): with db.default_db_engine.connect().execution_options( isolation_level="AUTOCOMMIT" ) as connection: + connection.execute(sqla.text("VACUUM ANALYZE api__communes")) connection.execute(sqla.text("VACUUM ANALYZE api__structures")) connection.execute(sqla.text("VACUUM ANALYZE api__services")) -def clean_up_code_insee(v) -> str | None: - return constants.CODE_COMMUNE_BY_CODE_ARRONDISSEMENT.get(v, v) - - def validate_data(model_schema, data): try: model_schema(**data) diff --git a/api/src/data_inclusion/api/inclusion_data/models.py b/api/src/data_inclusion/api/inclusion_data/models.py index 9d49fa362..ee8f1133a 100644 --- a/api/src/data_inclusion/api/inclusion_data/models.py +++ b/api/src/data_inclusion/api/inclusion_data/models.py @@ -1,15 +1,30 @@ from datetime import date +import geoalchemy2 import sqlalchemy as sqla from sqlalchemy.orm import Mapped, mapped_column, relationship -from data_inclusion.api.code_officiel_geo.models import Commune from data_inclusion.api.core.db import Base # all fields are nullable or have a default value. These models will only be used to # query valid data coming from the data pipeline. 
+class Commune(Base): + code: Mapped[str] = mapped_column(primary_key=True) + nom: Mapped[str] + departement: Mapped[str] + region: Mapped[str] + siren_epci: Mapped[str] + centre = mapped_column( + geoalchemy2.Geometry("Geometry", srid=4326, spatial_index=True) + ) + codes_postaux: Mapped[list[str]] + + def __repr__(self) -> str: + return f"<Commune(code={self.code})>" + + class Structure(Base): # internal metadata _di_surrogate_id: Mapped[str] = mapped_column(primary_key=True) diff --git a/api/src/data_inclusion/api/inclusion_data/routes.py b/api/src/data_inclusion/api/inclusion_data/routes.py index d833944ae..fd0e0d4df 100644 --- a/api/src/data_inclusion/api/inclusion_data/routes.py +++ b/api/src/data_inclusion/api/inclusion_data/routes.py @@ -7,7 +7,6 @@ from data_inclusion import schema as di_schema from data_inclusion.api import auth from data_inclusion.api.code_officiel_geo.constants import ( - CODE_COMMUNE_BY_CODE_ARRONDISSEMENT, DepartementCodeEnum, DepartementSlugEnum, RegionCodeEnum, ) @@ -412,9 +411,6 @@ def search_services_endpoint( commune_instance = None search_point = None if code_commune is not None: - code_commune = CODE_COMMUNE_BY_CODE_ARRONDISSEMENT.get( - code_commune, code_commune - ) commune_instance = db_session.get(Commune, code_commune) if commune_instance is None: raise fastapi.HTTPException( diff --git a/pipeline/dags/dag_utils/marts.py b/pipeline/dags/dag_utils/marts.py index f93f61922..5dd4df0ab 100644 --- a/pipeline/dags/dag_utils/marts.py +++ b/pipeline/dags/dag_utils/marts.py @@ -19,9 +19,15 @@ def _export_di_dataset_to_s3(logical_date, run_id): prefix = f"data/marts/{date.local_date_str(logical_date)}/{run_id}/" - for ressource in ["structures", "services"]: - key = f"{prefix}{ressource}.parquet" - query = f"SELECT * FROM public_marts.marts_inclusion__{ressource}" + RESOURCES = { + "structures": "public_marts.marts_inclusion__structures", + "services": "public_marts.marts_inclusion__services", + "communes": "decoupage_administratif.communes", + } + + for
resource_name, sql_table in RESOURCES.items(): + key = f"{prefix}{resource_name}.parquet" + query = f"SELECT * FROM {sql_table}" logger.info("downloading data from query='%s'", query) df = pg_hook.get_pandas_df(sql=query)