From 9d1ff3fbd9a5bff80d6d12b6bafa105119d627c2 Mon Sep 17 00:00:00 2001 From: Valentin Matton Date: Mon, 12 Aug 2024 09:23:27 +0200 Subject: [PATCH 1/2] chore(pipeline): geocode as a dbt model The trick is to leverage plpython to do the geocoding inside the database. Doing so, geocoding can now be modelled as a dbt model and orchestrated as such. The geocoding implementation has been moved to an actual package maintained next to the datawarehouse image. The plpython udf simply wraps the call. Is it worth it? * it heavily simplifies the flow and draws a clearer separation of concerns between airflow and dbt: dbt does the transformation, airflow orchestrates it. * it is less error-prone, since we no longer have to pull data from the db and map it to python ourselves. * we can even leverage dbt to do the geocoding incrementally on inputs that have not been seen before. This will drastically reduce our carbon footprint... There are a few enhancements we would probably want: * obviously clean up * define a macro with the geocoding model that can be used for all sources * re-do geocodings for relatively old inputs * do the geocoding per source? --- datawarehouse/.dockerignore | 13 ++- datawarehouse/Dockerfile | 11 +- datawarehouse/processings/Makefile | 18 +++ datawarehouse/processings/pyproject.toml | 46 ++++++++ .../requirements/dev-requirements.txt | 48 ++++++++ .../processings/requirements/requirements.txt | 28 +++++ .../requirements/test-requirements.txt | 36 ++++++ .../data_inclusion/processings/__init__.py | 5 + .../src/data_inclusion/processings/geocode.py | 102 +++++++++++++++++ .../processings/tests/integration/__init__.py | 0 .../tests/integration/test_geocoding.py | 62 ++++++++++ datawarehouse/requirements/requirements.in | 2 - datawarehouse/requirements/requirements.txt | 56 --------- pipeline/dbt/macros/create_udfs.sql | 4 + pipeline/dbt/macros/udfs/udf__geocode.sql | 33 ++++++ pipeline/dbt/models/intermediate/_models.yml | 106 ++++++++++++++++++ .../models/intermediate/int__geocodages.sql | 63 +++++++++++ .../int__union_adresses__enhanced.sql | 59 ++++------ 18 files changed, 590 insertions(+), 102 deletions(-) create mode 100644 datawarehouse/processings/Makefile create mode 100644 datawarehouse/processings/pyproject.toml create mode 100644 datawarehouse/processings/requirements/dev-requirements.txt create mode 100644 datawarehouse/processings/requirements/requirements.txt create mode 100644 datawarehouse/processings/requirements/test-requirements.txt create mode 100644 datawarehouse/processings/src/data_inclusion/processings/__init__.py create mode 100644 datawarehouse/processings/src/data_inclusion/processings/geocode.py create mode 100644 datawarehouse/processings/tests/integration/__init__.py create mode 100644 datawarehouse/processings/tests/integration/test_geocoding.py delete mode 100644 datawarehouse/requirements/requirements.in delete mode 100644 datawarehouse/requirements/requirements.txt create mode 100644 pipeline/dbt/macros/udfs/udf__geocode.sql create mode 100644 pipeline/dbt/models/intermediate/int__geocodages.sql diff --git a/datawarehouse/.dockerignore b/datawarehouse/.dockerignore index 7a4893146..4dee7d3a5 100644 --- a/datawarehouse/.dockerignore +++ b/datawarehouse/.dockerignore @@ -1,4 +1,15 @@ * !/docker-entrypoint-initdb.d -!/requirements \ No newline at end of file +!/processings + +# Python +**/*.py[cod] +**/__pycache__/ +**/.venv/ +**/build/ +**/dist/ +**/.tox/ +**/.pytest_cache/ +**/*.egg-info/ +**/db.sqlite3 diff --git a/datawarehouse/Dockerfile b/datawarehouse/Dockerfile index 
348d60818..2a53b5ad9 100644 --- a/datawarehouse/Dockerfile +++ b/datawarehouse/Dockerfile @@ -17,10 +17,13 @@ RUN apt-get update \ && apt-get clean -y \ && rm -rf /var/lib/apt/lists/* +RUN python3.11 -m venv ${VIRTUAL_ENV} +RUN pip install --no-cache-dir --upgrade pip setuptools wheel + COPY ./docker-entrypoint-initdb.d /docker-entrypoint-initdb.d -RUN python3.11 -m venv ${VIRTUAL_ENV} +COPY processings/requirements processings/requirements +RUN pip install --no-cache-dir -r processings/requirements/requirements.txt -COPY requirements requirements -RUN pip install --no-cache-dir --upgrade pip setuptools wheel -RUN pip install --no-cache-dir -r requirements/requirements.txt +COPY processings processings +RUN pip install --no-cache-dir -e processings diff --git a/datawarehouse/processings/Makefile b/datawarehouse/processings/Makefile new file mode 100644 index 000000000..e12d4bdc3 --- /dev/null +++ b/datawarehouse/processings/Makefile @@ -0,0 +1,18 @@ +PIP_COMPILE := pipx run uv pip compile pyproject.toml --quiet + +ifeq ($(filter upgrade,$(MAKECMDGOALS)),upgrade) +PIP_COMPILE += --upgrade +endif + +.PHONY: all dev base test uv upgrade + +all: base dev test + +base: + $(PIP_COMPILE) --output-file=requirements/requirements.txt + +dev: + $(PIP_COMPILE) --extra=dev --output-file=requirements/dev-requirements.txt + +test: + $(PIP_COMPILE) --extra=test --output-file=requirements/test-requirements.txt diff --git a/datawarehouse/processings/pyproject.toml b/datawarehouse/processings/pyproject.toml new file mode 100644 index 000000000..98c5ca38f --- /dev/null +++ b/datawarehouse/processings/pyproject.toml @@ -0,0 +1,46 @@ +[build-system] +build-backend = "setuptools.build_meta" +requires = ["setuptools", "wheel"] + +[project] +name = "data-inclusion-processings" +version = "0.1.0" +dependencies = [ + "numpy~=2.0", + "pandas~=2.2", + "requests~=2.31", + "tenacity", +] + +[project.optional-dependencies] +dev = [ + "pre-commit", + "ruff", +] +test = [ + "pytest", +] + +[tool.setuptools] +package-dir = {"" = "src"} + +[tool.ruff.lint] +# see prefixes in https://beta.ruff.rs/docs/rules/ +select = [ + "F", # pyflakes + "E", # pycodestyle errors + "W", # pycodestyle warnings + "I", # isort + "UP", # pyupgrade + "S", # bandit +] + +[tool.ruff.lint.isort] +combine-as-imports = true +known-first-party = ["data_inclusion"] + +[tool.pytest.ini_options] +testpaths = "tests" +markers = ''' + ban_api: mark test as requiring the base base adresse nationale api +''' diff --git a/datawarehouse/processings/requirements/dev-requirements.txt b/datawarehouse/processings/requirements/dev-requirements.txt new file mode 100644 index 000000000..45d049a96 --- /dev/null +++ b/datawarehouse/processings/requirements/dev-requirements.txt @@ -0,0 +1,48 @@ +# This file was autogenerated by uv via the following command: +# uv pip compile pyproject.toml --extra=dev --output-file=requirements/dev-requirements.txt +certifi==2024.8.30 + # via requests +cfgv==3.4.0 + # via pre-commit +charset-normalizer==3.3.2 + # via requests +distlib==0.3.8 + # via virtualenv +filelock==3.16.0 + # via virtualenv +identify==2.6.0 + # via pre-commit +idna==3.8 + # via requests +nodeenv==1.9.1 + # via pre-commit +numpy==2.1.1 + # via + # data-inclusion-processings (pyproject.toml) + # pandas +pandas==2.2.2 + # via data-inclusion-processings (pyproject.toml) +platformdirs==4.3.2 + # via virtualenv +pre-commit==3.8.0 + # via data-inclusion-processings (pyproject.toml) +python-dateutil==2.9.0.post0 + # via pandas +pytz==2024.2 + # via pandas +pyyaml==6.0.2 + 
# via pre-commit +requests==2.32.3 + # via data-inclusion-processings (pyproject.toml) +ruff==0.6.4 + # via data-inclusion-processings (pyproject.toml) +six==1.16.0 + # via python-dateutil +tenacity==9.0.0 + # via data-inclusion-processings (pyproject.toml) +tzdata==2024.1 + # via pandas +urllib3==2.2.2 + # via requests +virtualenv==20.26.4 + # via pre-commit diff --git a/datawarehouse/processings/requirements/requirements.txt b/datawarehouse/processings/requirements/requirements.txt new file mode 100644 index 000000000..c4e17e6b9 --- /dev/null +++ b/datawarehouse/processings/requirements/requirements.txt @@ -0,0 +1,28 @@ +# This file was autogenerated by uv via the following command: +# uv pip compile pyproject.toml --output-file=requirements/requirements.txt +certifi==2024.8.30 + # via requests +charset-normalizer==3.3.2 + # via requests +idna==3.8 + # via requests +numpy==2.1.1 + # via + # data-inclusion-processings (pyproject.toml) + # pandas +pandas==2.2.2 + # via data-inclusion-processings (pyproject.toml) +python-dateutil==2.9.0.post0 + # via pandas +pytz==2024.2 + # via pandas +requests==2.32.3 + # via data-inclusion-processings (pyproject.toml) +six==1.16.0 + # via python-dateutil +tenacity==9.0.0 + # via data-inclusion-processings (pyproject.toml) +tzdata==2024.1 + # via pandas +urllib3==2.2.2 + # via requests diff --git a/datawarehouse/processings/requirements/test-requirements.txt b/datawarehouse/processings/requirements/test-requirements.txt new file mode 100644 index 000000000..24517c072 --- /dev/null +++ b/datawarehouse/processings/requirements/test-requirements.txt @@ -0,0 +1,36 @@ +# This file was autogenerated by uv via the following command: +# uv pip compile pyproject.toml --extra=test --output-file=requirements/test-requirements.txt +certifi==2024.8.30 + # via requests +charset-normalizer==3.3.2 + # via requests +idna==3.8 + # via requests +iniconfig==2.0.0 + # via pytest +numpy==2.1.1 + # via + # data-inclusion-processings (pyproject.toml) + # pandas +packaging==24.1 + # via pytest +pandas==2.2.2 + # via data-inclusion-processings (pyproject.toml) +pluggy==1.5.0 + # via pytest +pytest==8.3.3 + # via data-inclusion-processings (pyproject.toml) +python-dateutil==2.9.0.post0 + # via pandas +pytz==2024.2 + # via pandas +requests==2.32.3 + # via data-inclusion-processings (pyproject.toml) +six==1.16.0 + # via python-dateutil +tenacity==9.0.0 + # via data-inclusion-processings (pyproject.toml) +tzdata==2024.1 + # via pandas +urllib3==2.2.2 + # via requests diff --git a/datawarehouse/processings/src/data_inclusion/processings/__init__.py b/datawarehouse/processings/src/data_inclusion/processings/__init__.py new file mode 100644 index 000000000..92b402191 --- /dev/null +++ b/datawarehouse/processings/src/data_inclusion/processings/__init__.py @@ -0,0 +1,5 @@ +from data_inclusion.processings.geocode import geocode + +__all__ = [ + "geocode", +] diff --git a/datawarehouse/processings/src/data_inclusion/processings/geocode.py b/datawarehouse/processings/src/data_inclusion/processings/geocode.py new file mode 100644 index 000000000..4faa31e99 --- /dev/null +++ b/datawarehouse/processings/src/data_inclusion/processings/geocode.py @@ -0,0 +1,102 @@ +import csv +import io +import logging +from dataclasses import dataclass + +import numpy as np +import pandas as pd +import requests + +logger = logging.getLogger(__name__) + + +def _geocode(df: pd.DataFrame) -> pd.DataFrame: + logger.info("Will send address batch, dimensions=%s", df.shape) + with io.BytesIO() as buf: + df.to_csv(buf, 
index=False, quoting=csv.QUOTE_ALL, sep="|") + + try: + response = requests.post( + "https://api-adresse.data.gouv.fr/search/csv/", + files={"data": ("data.csv", buf.getvalue(), "text/csv")}, + data={ + "columns": ["adresse", "code_postal", "commune"], + # Post-filter on the INSEE code and not the zipcode. + # Explanations from the BAN API creators: + # The postcode is problematic for cities with multiple zipcodes + # if the supplied zipcode is wrong, or the one in the BAN is. + # The INSEE code is more stable, unique and reliable. + # Also this post-filter does not return "possible" results, + # it blindly filters-out. + "citycode": "code_insee", + }, + timeout=180, # we upload 2MB of data, so we need a high timeout + ) + response.raise_for_status() + except requests.RequestException as e: + logger.info("Error while fetching `%s`: %s", e.request.url, e) + return pd.DataFrame() + + with io.StringIO() as f: + f.write(response.text) + f.seek(0) + results_df = pd.read_csv( + f, + encoding_errors="replace", + on_bad_lines="warn", + dtype=str, + sep="|", + ) + results_df = results_df.replace({np.nan: None}) + + logger.info("Got result for address batch, dimensions=%s", results_df.shape) + return results_df + + +@dataclass +class GeocodeInput: + id: str + adresse: str + code_insee: str + code_postal: str + commune: str + + +def geocode( + data: GeocodeInput | list[GeocodeInput], + batch_size: int = 20_000, +) -> pd.DataFrame: + # BAN api limits the batch geocoding to 50MB of data + # In our tests, 10_000 rows is about 1MB; but we'll be conservative + # since we also want to avoid upload timeouts. + + data = data if isinstance(data, list) else [data] + df = pd.DataFrame.from_records(data) + + # drop rows that have not at least one commune, code_insee or code_postal + # as the result can't make sense. + # Note that we keep the rows without an address, as they can be used to + # at least resolve the city. + df = df.dropna(subset=["code_postal", "code_insee", "commune"], thresh=2) + + # Cleanup the values a bit to help the BAN's scoring. After experimentation, + # looking for "Ville-Nouvelle" returns worse results than "Ville Nouvelle", + # probably due to a tokenization in the BAN that favors spaces. + # In the same fashion, looking for "U.R.S.S." returns worse results than using + # "URSS" for "Avenue de l'U.R.S.S.". 
With the dots, it does not find the + # street at all ¯\_(ツ)_/¯ + df = df.assign( + adresse=df.adresse.str.strip().replace("-", " ").replace(".", ""), + commune=df.commune.str.strip(), + ) + + logger.info(f"Only {len(df)} rows can be geocoded.") + + return ( + df.groupby( + np.arange(len(df)) // batch_size, + group_keys=False, + ) + .apply(_geocode) + .to_dict(orient="records") + ) diff --git a/datawarehouse/processings/tests/integration/__init__.py b/datawarehouse/processings/tests/integration/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/datawarehouse/processings/tests/integration/test_geocoding.py b/datawarehouse/processings/tests/integration/test_geocoding.py new file mode 100644 index 000000000..da17ac140 --- /dev/null +++ b/datawarehouse/processings/tests/integration/test_geocoding.py @@ -0,0 +1,62 @@ +from unittest.mock import ANY + +import pytest + +from data_inclusion.processings import geocode + +pytestmark = pytest.mark.ban_api + + +@pytest.mark.parametrize( + ("adresse", "expected"), + [ + ( + { + "id": "1", + "adresse": "17 rue Malus", + "code_postal": "59000", + "code_insee": "59350", + "commune": "Lille", + }, + { + "id": "1", + "adresse": "17 rue Malus", + "code_postal": "59000", + "code_insee": "59350", + "commune": "Lille", + "latitude": "50.627078", + "longitude": "3.067372", + "result_label": "17 Rue Malus 59000 Lille", + "result_score": ANY, + "result_score_next": None, + "result_type": "housenumber", + "result_id": "59350_5835_00017", + "result_housenumber": "17", + "result_name": "17 Rue Malus", + "result_street": "Rue Malus", + "result_postcode": "59000", + "result_city": "Lille", + "result_context": "59, Nord, Hauts-de-France", + "result_citycode": "59350", + "result_oldcitycode": "59350", + "result_oldcity": "Lille", + "result_district": None, + "result_status": "ok", + }, + ), + ( + { + "id": "2", + "adresse": None, + "code_postal": None, + "code_insee": None, + "commune": None, + }, + None, + ), + ], +) +def test_ban_geocode(adresse: dict, expected: dict): + result_df = geocode(data=adresse) + + assert result_df == ([expected] if expected is not None else []) diff --git a/datawarehouse/requirements/requirements.in b/datawarehouse/requirements/requirements.in deleted file mode 100644 index 3d535537b..000000000 --- a/datawarehouse/requirements/requirements.in +++ /dev/null @@ -1,2 +0,0 @@ -requests~=2.31 -trafilatura~=1.6 \ No newline at end of file diff --git a/datawarehouse/requirements/requirements.txt b/datawarehouse/requirements/requirements.txt deleted file mode 100644 index f8a7bf25f..000000000 --- a/datawarehouse/requirements/requirements.txt +++ /dev/null @@ -1,56 +0,0 @@ -# -# This file is autogenerated by pip-compile with Python 3.10 -# by the following command: -# -# pip-compile requirements/requirements.in -# -certifi==2023.7.22 - # via - # requests - # trafilatura -charset-normalizer==3.2.0 - # via - # htmldate - # requests - # trafilatura -courlan==0.9.4 - # via trafilatura -dateparser==1.1.8 - # via htmldate -htmldate==1.5.1 - # via trafilatura -idna==3.4 - # via requests -justext==3.0.0 - # via trafilatura -langcodes==3.3.0 - # via courlan -lxml==4.9.3 - # via - # htmldate - # justext - # trafilatura -python-dateutil==2.8.2 - # via - # dateparser - # htmldate -pytz==2023.3.post1 - # via dateparser -regex==2023.8.8 - # via dateparser -requests==2.31.0 - # via -r requirements/requirements.in -six==1.16.0 - # via python-dateutil -tld==0.13 - # via courlan -trafilatura==1.6.2 - # via -r requirements/requirements.in 
-tzlocal==5.0.1 - # via dateparser -urllib3==2.0.4 - # via - # courlan - # htmldate - # requests - # trafilatura diff --git a/pipeline/dbt/macros/create_udfs.sql b/pipeline/dbt/macros/create_udfs.sql index 4e875d1ce..e8263217e 100644 --- a/pipeline/dbt/macros/create_udfs.sql +++ b/pipeline/dbt/macros/create_udfs.sql @@ -9,6 +9,10 @@ Another way would be to use the `on-run-start` hook, but it does not play nicely {% set sql %} +CREATE SCHEMA IF NOT EXISTS processings; + +{{ udf__geocode() }} + {{ create_udf_soliguide__new_hours_to_osm_opening_hours() }} {{ create_udf__common_checks() }} {{ create_udf__service_checks() }} diff --git a/pipeline/dbt/macros/udfs/udf__geocode.sql b/pipeline/dbt/macros/udfs/udf__geocode.sql new file mode 100644 index 000000000..ee78abaf9 --- /dev/null +++ b/pipeline/dbt/macros/udfs/udf__geocode.sql @@ -0,0 +1,33 @@ +{% macro udf__geocode() %} + +DROP FUNCTION IF EXISTS processings.geocode; + +CREATE OR REPLACE FUNCTION processings.geocode(data JSONB) +RETURNS + TABLE( + id TEXT, + result_score FLOAT, + result_label TEXT, + result_city TEXT, + result_type TEXT, + result_citycode TEXT, + result_postcode TEXT, + result_name TEXT, + longitude FLOAT, + latitude FLOAT + ) +AS $$ + +import json + +from data_inclusion import processings + +return ( + processings.geocode(data=json.loads(data)) + if data is not None + else [] +) + +$$ LANGUAGE plpython3u; + +{% endmacro %} diff --git a/pipeline/dbt/models/intermediate/_models.yml b/pipeline/dbt/models/intermediate/_models.yml index 6675631fd..4356f352d 100644 --- a/pipeline/dbt/models/intermediate/_models.yml +++ b/pipeline/dbt/models/intermediate/_models.yml @@ -69,3 +69,109 @@ models: * geocoded addresses * email with pii flag + + - name: int__geocodages + description: | + Geocoding results for all sources. + + This model is incremental, it will only geocode new or changed addresses. + It stores raw geocoding results, without filtering. + + Geocoding is done by calling the BAN api in PL/Python. + columns: + - name: geocoded_at + data_tests: + - not_null + - name: adresse_id + data_tests: + - unique + - not_null + - relationships: + to: ref('int__union_adresses') + field: _di_surrogate_id + - name: input_adresse + - name: input_code_insee + - name: input_code_postal + - name: input_commune + - name: commune + data_tests: + - dbt_utils.not_empty_string + - name: adresse + data_tests: + - dbt_utils.not_empty_string + - name: code_postal + data_tests: + - dbt_utils.not_empty_string + - name: code_insee + data_tests: + - dbt_utils.not_empty_string + - name: latitude + - name: longitude + - name: score + data_tests: + - not_null: + config: + severity: warn + - name: type + data_tests: + - not_null: + config: + severity: warn + - accepted_values: + values: + - housenumber + - street + - locality + - municipality + +unit_tests: + - name: test_geocodages_full_refresh_mode + model: int__geocodages + overrides: + macros: + is_incremental: false + given: + - input: ref('int__union_adresses') + rows: + - {_di_surrogate_id: foo, adresse: 17 rue Malus, commune: Lille, code_postal: 59000, code_insee: 59350} + expect: + rows: + - {adresse_id: foo} + + - name: test_geocodages_incremental_mode + # - row `unchanged` was previously geocoded and has not changed. It must not be re-geocoded. + # - row `failed` was previously geocoded but failed. It must be re-geocoded. + # - rows `changed-` were previously geocoded, but INPUT have changed. They must be re-geocoded. + # - row `new` was not previously geocoded. It must be geocoded. 
+ model: int__geocodages + description: | + Test that geocoding is incremental + overrides: + macros: + is_incremental: true + given: + - input: ref('int__union_adresses') + rows: + - {_di_surrogate_id: unchanged, adresse: 17 rue Malus, code_postal: 59000, code_insee: 59350, commune: Lille} + - {_di_surrogate_id: failed, adresse: 17 rue Malus, code_postal: 59000, code_insee: 59350, commune: Lille} + - {_di_surrogate_id: changed-adresse, adresse: changed, code_postal: 59000, code_insee: 59350, commune: Lille} + - {_di_surrogate_id: changed-code-postal, adresse: 17 rue Malus, code_postal: changed, code_insee: 59350, commune: Lille} + - {_di_surrogate_id: changed-code-insee, adresse: 17 rue Malus, code_postal: 59000, code_insee: changed, commune: Lille} + - {_di_surrogate_id: changed-commune, adresse: 17 rue Malus, code_postal: 59000, code_insee: 59350, commune: changed} + - {_di_surrogate_id: new, adresse: 17 rue Malus, code_postal: 59000, code_insee: 59350, commune: Lille} + - input: this + rows: + - {adresse_id: unchanged, input_adresse: 17 rue Malus, input_code_postal: 59000, input_code_insee: 59350, input_commune: Lille, score: 1} + - {adresse_id: failed, input_adresse: 17 rue Malus, input_code_postal: 59000, input_code_insee: 59350, input_commune: Lille, score: null} + - {adresse_id: changed-adresse, input_adresse: 17 rue Malus, input_code_postal: 59000, input_code_insee: 59350, input_commune: Lille, score: 1} + - {adresse_id: changed-code-postal, input_adresse: 17 rue Malus, input_code_postal: 59000, input_code_insee: 59350, input_commune: Lille, score: 1} + - {adresse_id: changed-code-insee, input_adresse: 17 rue Malus, input_code_postal: 59000, input_code_insee: 59350, input_commune: Lille, score: 1} + - {adresse_id: changed-commune, input_adresse: 17 rue Malus, input_code_postal: 59000, input_code_insee: 59350, input_commune: Lille, score: 1} + expect: + rows: + - {adresse_id: failed} + - {adresse_id: changed-adresse} + - {adresse_id: changed-code-postal} + - {adresse_id: changed-code-insee} + - {adresse_id: changed-commune} + - {adresse_id: new} diff --git a/pipeline/dbt/models/intermediate/int__geocodages.sql b/pipeline/dbt/models/intermediate/int__geocodages.sql new file mode 100644 index 000000000..dadd4f78d --- /dev/null +++ b/pipeline/dbt/models/intermediate/int__geocodages.sql @@ -0,0 +1,63 @@ +{{ + config( + materialized="incremental", + unique_key="adresse_id", + ) +}} + +WITH adresses AS ( + SELECT * FROM {{ ref('int__union_adresses') }} +), + +final AS ( + SELECT + CAST('{{ run_started_at }}' AS TIMESTAMP) AS "geocoded_at", + adresses._di_surrogate_id AS "adresse_id", + adresses.adresse AS "input_adresse", + adresses.code_postal AS "input_code_postal", + adresses.code_insee AS "input_code_insee", + adresses.commune AS "input_commune", + geocodings.result_city AS "commune", + geocodings.result_name AS "adresse", + geocodings.result_postcode AS "code_postal", + geocodings.result_citycode AS "code_insee", + geocodings.result_score AS "score", + geocodings.result_type AS "type", + geocodings.longitude AS "longitude", + geocodings.latitude AS "latitude" + FROM + adresses + INNER JOIN processings.geocode( + ( + SELECT + JSONB_AGG( + JSONB_OBJECT( + ARRAY[ + 'id', adresses._di_surrogate_id, + 'adresse', adresses.adresse, + 'code_postal', adresses.code_postal, + 'code_insee', adresses.code_insee, + 'commune', adresses.commune + ] + ) + ) + FROM adresses + {% if is_incremental() %} + -- then only geocode new or changed rows + LEFT JOIN {{ this }} ON adresses._di_surrogate_id = {{ 
this }}.adresse_id + WHERE + -- new rows + {{ this }}.adresse_id IS NULL + -- previously failed rows + OR {{ this }}.score IS NULL + -- changed rows + OR {{ this }}.input_adresse != adresses.adresse + OR {{ this }}.input_code_postal != adresses.code_postal + OR {{ this }}.input_code_insee != adresses.code_insee + OR {{ this }}.input_commune != adresses.commune + {% endif %} + ) + ) AS geocodings ON adresses._di_surrogate_id = geocodings.id +) + +SELECT * FROM final diff --git a/pipeline/dbt/models/intermediate/int__union_adresses__enhanced.sql b/pipeline/dbt/models/intermediate/int__union_adresses__enhanced.sql index cb534f270..11a14d77f 100644 --- a/pipeline/dbt/models/intermediate/int__union_adresses__enhanced.sql +++ b/pipeline/dbt/models/intermediate/int__union_adresses__enhanced.sql @@ -2,51 +2,32 @@ WITH adresses AS ( SELECT * FROM {{ ref('int__union_adresses') }} ), -valid_adresses AS ( - SELECT adresses.* - FROM adresses - LEFT JOIN - LATERAL - LIST_ADRESSE_ERRORS( - adresses.adresse, - adresses.code_insee, - adresses.code_postal, - adresses.commune, - adresses.complement_adresse, - adresses.id, - adresses.latitude, - adresses.longitude, - adresses.source - ) AS errors ON TRUE - WHERE errors.field IS NULL -), - -geocoded_results AS ( - SELECT * FROM {{ ref('int_extra__geocoded_results') }} +geocodages AS ( + SELECT * FROM {{ ref('int__geocodages') }} ), final AS ( SELECT - valid_adresses._di_surrogate_id AS "_di_surrogate_id", - valid_adresses.id AS "id", - valid_adresses.source AS "source", - valid_adresses.complement_adresse AS "complement_adresse", + adresses._di_surrogate_id AS "_di_surrogate_id", + adresses.id AS "id", + adresses.source AS "source", + adresses.complement_adresse AS "complement_adresse", CASE - WHEN geocoded_results.result_type = 'municipality' - THEN valid_adresses.adresse - ELSE COALESCE(geocoded_results.result_name, valid_adresses.adresse) - END AS "adresse", - COALESCE(geocoded_results.longitude, valid_adresses.longitude) AS "longitude", - COALESCE(geocoded_results.latitude, valid_adresses.latitude) AS "latitude", - COALESCE(geocoded_results.result_city, valid_adresses.commune) AS "commune", - COALESCE(geocoded_results.result_postcode, valid_adresses.code_postal) AS "code_postal", - COALESCE(geocoded_results.result_citycode, valid_adresses.code_insee) AS "code_insee", - geocoded_results.result_score AS "result_score" - FROM valid_adresses - LEFT JOIN geocoded_results + WHEN geocodages.type = 'municipality' + THEN adresses.adresse + ELSE COALESCE(geocodages.adresse, adresses.adresse) + END AS "adresse", + COALESCE(geocodages.longitude, adresses.longitude) AS "longitude", + COALESCE(geocodages.latitude, adresses.latitude) AS "latitude", + COALESCE(geocodages.commune, adresses.commune) AS "commune", + COALESCE(geocodages.code_postal, adresses.code_postal) AS "code_postal", + COALESCE(geocodages.code_insee, adresses.code_insee) AS "code_insee", + geocodages.score AS "score_geocodage" + FROM adresses + LEFT JOIN geocodages ON - valid_adresses._di_surrogate_id = geocoded_results._di_surrogate_id - AND geocoded_results.result_score >= 0.8 + adresses._di_surrogate_id = geocodages.adresse_id + AND geocodages.score >= 0.8 ) SELECT * FROM final From d8b3f5a58b31c3757ec8081b122117a61373c430 Mon Sep 17 00:00:00 2001 From: Valentin Matton Date: Fri, 13 Sep 2024 18:29:34 +0200 Subject: [PATCH 2/2] chore(pipeline): clean up old geocoding task --- .../api/inclusion_data/commands.py | 3 - pipeline/dags/compute_hourly.py | 6 +- pipeline/dags/dag_utils/dbt.py | 18 +-- 
pipeline/dags/dag_utils/geocoding.py | 125 ------------------ pipeline/dags/main.py | 75 +---------- pipeline/dbt/macros/create_udfs.sql | 2 +- .../intermediate/extra/_extra__models.yml | 3 - .../extra/int_extra__geocoded_results.sql | 48 ------- .../int__union_adresses__enhanced.sql | 21 ++- .../int__union_services__enhanced.sql | 3 +- .../int__union_structures__enhanced.sql | 1 - .../marts/inclusion/_inclusion_models.yml | 4 - pipeline/tests/integration/__init__.py | 0 pipeline/tests/integration/test_geocoding.py | 73 ---------- 14 files changed, 31 insertions(+), 351 deletions(-) delete mode 100644 pipeline/dags/dag_utils/geocoding.py delete mode 100644 pipeline/dbt/models/intermediate/extra/int_extra__geocoded_results.sql delete mode 100644 pipeline/tests/integration/__init__.py delete mode 100644 pipeline/tests/integration/test_geocoding.py diff --git a/api/src/data_inclusion/api/inclusion_data/commands.py b/api/src/data_inclusion/api/inclusion_data/commands.py index b0c047b00..608e8b5fc 100644 --- a/api/src/data_inclusion/api/inclusion_data/commands.py +++ b/api/src/data_inclusion/api/inclusion_data/commands.py @@ -123,9 +123,6 @@ def load_inclusion_data(): structures_df = structures_df.replace({np.nan: None}) services_df = services_df.replace({np.nan: None}) - structures_df = structures_df.drop(columns=["_di_geocodage_score"]) - services_df = services_df.drop(columns=["_di_geocodage_score"]) - structure_errors_df = validate_df(structures_df, model_schema=schema.Structure) service_errors_df = validate_df(services_df, model_schema=schema.Service) diff --git a/pipeline/dags/compute_hourly.py b/pipeline/dags/compute_hourly.py index 5d3ca3e70..34e93d149 100644 --- a/pipeline/dags/compute_hourly.py +++ b/pipeline/dags/compute_hourly.py @@ -5,8 +5,7 @@ from dag_utils import date, marts, notifications from dag_utils.dbt import ( - get_after_geocoding_tasks, - get_before_geocoding_tasks, + get_intermediate_tasks, get_staging_tasks, ) @@ -24,8 +23,7 @@ ( start >> get_staging_tasks(schedule="@hourly") - >> get_before_geocoding_tasks() - >> get_after_geocoding_tasks() + >> get_intermediate_tasks() >> marts.export_di_dataset_to_s3() >> end ) diff --git a/pipeline/dags/dag_utils/dbt.py b/pipeline/dags/dag_utils/dbt.py index a5b3a21c6..61c0787fd 100644 --- a/pipeline/dags/dag_utils/dbt.py +++ b/pipeline/dags/dag_utils/dbt.py @@ -98,9 +98,9 @@ def get_staging_tasks(schedule=None): return task_list -def get_before_geocoding_tasks(): +def get_intermediate_tasks(): return dbt_operator_factory( - task_id="dbt_build_before_geocoding", + task_id="dbt_build_intermediate", command="build", select=" ".join( [ @@ -111,22 +111,11 @@ def get_before_geocoding_tasks(): # into a single DAG. Another way to see it is that it depended on # main since the beginning as it required intermediate data to be # present ? 
+ "path:models/intermediate/int__geocodages.sql", "path:models/intermediate/int__union_contacts.sql", "path:models/intermediate/int__union_adresses.sql", "path:models/intermediate/int__union_services.sql", "path:models/intermediate/int__union_structures.sql", - ] - ), - trigger_rule=TriggerRule.ALL_DONE, - ) - - -def get_after_geocoding_tasks(): - return dbt_operator_factory( - task_id="dbt_build_after_geocoding", - command="build", - select=" ".join( - [ "path:models/intermediate/extra", "path:models/intermediate/int__plausible_personal_emails.sql", "path:models/intermediate/int__union_adresses__enhanced.sql+", @@ -140,4 +129,5 @@ def get_after_geocoding_tasks(): "path:models/intermediate/quality/int_quality__stats.sql+", ] ), + trigger_rule=TriggerRule.ALL_DONE, ) diff --git a/pipeline/dags/dag_utils/geocoding.py b/pipeline/dags/dag_utils/geocoding.py deleted file mode 100644 index 9ced0c175..000000000 --- a/pipeline/dags/dag_utils/geocoding.py +++ /dev/null @@ -1,125 +0,0 @@ -import csv -import io -import logging - -import numpy as np -import pandas as pd -import requests -import tenacity -from tenacity import before, stop, wait - -logger = logging.getLogger(__name__) - - -class GeocodingBackend: - def geocode(self, df: pd.DataFrame) -> pd.DataFrame: - raise NotImplementedError - - -class BaseAdresseNationaleBackend(GeocodingBackend): - def __init__(self, base_url: str): - self.base_url = base_url.strip("/") - - def _geocode(self, df: pd.DataFrame) -> pd.DataFrame: - logger.info("Will send address batch, dimensions=%s", df.shape) - with io.BytesIO() as buf: - df.to_csv(buf, index=False, quoting=csv.QUOTE_ALL, sep="|") - - try: - response = requests.post( - self.base_url + "/search/csv/", - files={"data": ("data.csv", buf.getvalue(), "text/csv")}, - data={ - "columns": ["adresse", "code_postal", "commune"], - # Post-filter on the INSEE code and not the zipcode. - # Explanations from the BAN API creators: - # The postcode is problematic for cities with multiple zipcodes - # if the supplied zipcode is wrong, or the one in the BAN is. - # The INSEE code is more stable, unique and reliable. - # Also this post-filter does not return "possible" results, - # it blindly filters-out. - "citycode": "code_insee", - }, - timeout=180, # we upload 2MB of data, so we need a high timeout - ) - response.raise_for_status() - except requests.RequestException as e: - logger.info("Error while fetching `%s`: %s", e.request.url, e) - return pd.DataFrame() - - with io.StringIO() as f: - f.write(response.text) - f.seek(0) - results_df = pd.read_csv( - f, - encoding_errors="replace", - on_bad_lines="warn", - dtype=str, - sep="|", - ) - results_df = results_df.replace({np.nan: None}) - # In some cases (ex: address='20' and city='Paris'), the BAN API will return - # a municipality as a result with a very high score. This is be discarded - # as this will not be valuable information to localize a structure. - results_df = results_df[results_df.result_type != "municipality"] - - logger.info("Got result for address batch, dimensions=%s", results_df.shape) - return results_df - - def geocode(self, df: pd.DataFrame) -> pd.DataFrame: - # BAN api limits the batch geocoding to 50MB of data - # In our tests, 10_000 rows is about 1MB; but we'll be conservative - # since we also want to avoid upload timeouts. - BATCH_SIZE = 20_000 - - # drop rows that have not at least one commune, code_insee or code_postal - # as the result can't make sense. 
- # Note that we keep the rows without an address, as they can be used to - # at least resolve the city. - df = df.dropna(subset=["code_postal", "code_insee", "commune"], thresh=2) - df = df.sort_values(by="_di_surrogate_id") - # Cleanup the values a bit to help the BAN's scoring. After experimentation, - # looking for "Ville-Nouvelle" returns worse results than "Ville Nouvelle", - # probably due to a tokenization in the BAN that favors spaces. - # In the same fashion, looking for "U.R.S.S." returns worse results than using - # "URSS" for "Avenue de l'U.R.S.S.". With the dots, it does not find the - # street at all ¯\_(ツ)_/¯ - df = df.assign( - adresse=(df.adresse.str.strip().replace("-", " ").replace(".", "")), - commune=df.commune.str.strip(), - ) - - logger.info(f"Only {len(df)} rows can be geocoded.") - - def _geocode_with_retry(df: pd.DataFrame) -> pd.DataFrame: - try: - for attempt in tenacity.Retrying( - stop=stop.stop_after_attempt(6), - wait=wait.wait_random_exponential(multiplier=10), - before=before.before_log(logger, logging.INFO), - ): - with attempt: - results_df = self._geocode(df) - - if ( - len(df) > 0 - and len(results_df.dropna(subset=["result_citycode"])) - / len(df) - # arbitrary threshold, if less than this percentage of - # the rows have been geocoded, retry. - < 0.3 - ): - # the BAN api often fails to properly complete a - # geocoding batch. If that happens, raise for retry. - raise tenacity.TryAgain - - return results_df - - except tenacity.RetryError: - logger.error("Failed to geocode batch") - return results_df - - return df.groupby( - np.arange(len(df)) // BATCH_SIZE, - group_keys=False, - ).apply(_geocode_with_retry) diff --git a/pipeline/dags/main.py b/pipeline/dags/main.py index 7b39b58e7..adde7e646 100644 --- a/pipeline/dags/main.py +++ b/pipeline/dags/main.py @@ -1,76 +1,15 @@ import pendulum import airflow -from airflow.operators import empty, python +from airflow.operators import empty from dag_utils import date, marts from dag_utils.dbt import ( dbt_operator_factory, - get_after_geocoding_tasks, - get_before_geocoding_tasks, + get_intermediate_tasks, get_staging_tasks, ) from dag_utils.notifications import notify_failure_args -from dag_utils.virtualenvs import PYTHON_BIN_PATH - - -def _geocode(): - import logging - - import sqlalchemy as sqla - - from airflow.models import Variable - from airflow.providers.postgres.hooks.postgres import PostgresHook - - from dag_utils import geocoding - from dag_utils.sources import utils - - logger = logging.getLogger(__name__) - - pg_hook = PostgresHook(postgres_conn_id="pg") - - # 1. Retrieve input data - input_df = pg_hook.get_pandas_df( - sql=""" - SELECT - _di_surrogate_id, - adresse, - code_postal, - code_insee, - commune - FROM public_intermediate.int__union_adresses; - """ - ) - - utils.log_df_info(input_df, logger=logger) - - geocoding_backend = geocoding.BaseAdresseNationaleBackend( - base_url=Variable.get("BAN_API_URL") - ) - - # 2. Geocode - output_df = geocoding_backend.geocode(input_df) - - utils.log_df_info(output_df, logger=logger) - - # 3. 
Write result back - engine = pg_hook.get_sqlalchemy_engine() - - with engine.connect() as conn: - with conn.begin(): - output_df.to_sql( - "extra__geocoded_results", - schema="public", - con=conn, - if_exists="replace", - index=False, - dtype={ - "latitude": sqla.Float, - "longitude": sqla.Float, - "result_score": sqla.Float, - }, - ) - with airflow.DAG( dag_id="main", @@ -93,20 +32,12 @@ def _geocode(): command="run-operation create_udfs", ) - python_geocode = python.ExternalPythonOperator( - task_id="python_geocode", - python=str(PYTHON_BIN_PATH), - python_callable=_geocode, - ) - ( start >> dbt_seed >> dbt_create_udfs >> get_staging_tasks() - >> get_before_geocoding_tasks() - >> python_geocode - >> get_after_geocoding_tasks() + >> get_intermediate_tasks() >> marts.export_di_dataset_to_s3() >> end ) diff --git a/pipeline/dbt/macros/create_udfs.sql b/pipeline/dbt/macros/create_udfs.sql index e8263217e..4276c0f9b 100644 --- a/pipeline/dbt/macros/create_udfs.sql +++ b/pipeline/dbt/macros/create_udfs.sql @@ -23,4 +23,4 @@ CREATE SCHEMA IF NOT EXISTS processings; {% do run_query(sql) %} -{% endmacro %}s \ No newline at end of file +{% endmacro %} diff --git a/pipeline/dbt/models/intermediate/extra/_extra__models.yml b/pipeline/dbt/models/intermediate/extra/_extra__models.yml index 20020c1ff..26af30df3 100644 --- a/pipeline/dbt/models/intermediate/extra/_extra__models.yml +++ b/pipeline/dbt/models/intermediate/extra/_extra__models.yml @@ -1,7 +1,4 @@ version: 2 models: - # TODO: cleanup these models, add staging models - - name: int_extra__insee_prenoms_filtered - - name: int_extra__geocoded_results diff --git a/pipeline/dbt/models/intermediate/extra/int_extra__geocoded_results.sql b/pipeline/dbt/models/intermediate/extra/int_extra__geocoded_results.sql deleted file mode 100644 index d03e189b5..000000000 --- a/pipeline/dbt/models/intermediate/extra/int_extra__geocoded_results.sql +++ /dev/null @@ -1,48 +0,0 @@ -{% set source_model = source('internal', 'extra__geocoded_results') %} - -{% set table_exists = adapter.get_relation(database=source_model.database, schema=source_model.schema, identifier=source_model.name) is not none %} - -{% if table_exists %} - - WITH source AS ( - SELECT * FROM {{ source_model }} - ), - -{% else %} - -WITH source AS ( - SELECT - NULL AS "_di_surrogate_id", - NULL AS "adresse", - NULL AS "code_postal", - NULL AS "commune", - CAST(NULL AS FLOAT) AS "latitude", - CAST(NULL AS FLOAT) AS "longitude", - NULL AS "result_label", - CAST(NULL AS FLOAT) AS "result_score", - NULL AS "result_score_next", - NULL AS "result_type", - NULL AS "result_id", - NULL AS "result_housenumber", - NULL AS "result_name", - NULL AS "result_street", - NULL AS "result_postcode", - NULL AS "result_city", - NULL AS "result_context", - NULL AS "result_citycode", - NULL AS "result_oldcitycode", - NULL AS "result_oldcity", - NULL AS "result_district", - NULL AS "result_status" - WHERE FALSE -), - -{% endif %} - -final AS ( - SELECT * - FROM source - WHERE result_id IS NOT NULL -) - -SELECT * FROM final diff --git a/pipeline/dbt/models/intermediate/int__union_adresses__enhanced.sql b/pipeline/dbt/models/intermediate/int__union_adresses__enhanced.sql index 11a14d77f..ec824feb3 100644 --- a/pipeline/dbt/models/intermediate/int__union_adresses__enhanced.sql +++ b/pipeline/dbt/models/intermediate/int__union_adresses__enhanced.sql @@ -6,7 +6,7 @@ geocodages AS ( SELECT * FROM {{ ref('int__geocodages') }} ), -final AS ( +overriden_adresses AS ( SELECT adresses._di_surrogate_id AS "_di_surrogate_id", 
adresses.id AS "id", @@ -28,6 +28,25 @@ final AS ( ON adresses._di_surrogate_id = geocodages.adresse_id AND geocodages.score >= 0.8 +), + +final AS ( + SELECT overriden_adresses.* + FROM overriden_adresses + LEFT JOIN + LATERAL + LIST_ADRESSE_ERRORS( + overriden_adresses.adresse, + overriden_adresses.code_insee, + overriden_adresses.code_postal, + overriden_adresses.commune, + overriden_adresses.complement_adresse, + overriden_adresses.id, + overriden_adresses.latitude, + overriden_adresses.longitude, + overriden_adresses.source + ) AS errors ON TRUE + WHERE errors.field IS NULL ) SELECT * FROM final diff --git a/pipeline/dbt/models/intermediate/int__union_services__enhanced.sql b/pipeline/dbt/models/intermediate/int__union_services__enhanced.sql index a02857e24..461c1a6cb 100644 --- a/pipeline/dbt/models/intermediate/int__union_services__enhanced.sql +++ b/pipeline/dbt/models/intermediate/int__union_services__enhanced.sql @@ -92,8 +92,7 @@ final AS ( adresses.commune AS "commune", adresses.adresse AS "adresse", adresses.code_postal AS "code_postal", - adresses.code_insee AS "code_insee", - adresses.result_score AS "_di_geocodage_score" + adresses.code_insee AS "code_insee" FROM valid_services LEFT JOIN adresses ON valid_services._di_adresse_surrogate_id = adresses._di_surrogate_id diff --git a/pipeline/dbt/models/intermediate/int__union_structures__enhanced.sql b/pipeline/dbt/models/intermediate/int__union_structures__enhanced.sql index ddc703a89..d8d629133 100644 --- a/pipeline/dbt/models/intermediate/int__union_structures__enhanced.sql +++ b/pipeline/dbt/models/intermediate/int__union_structures__enhanced.sql @@ -49,7 +49,6 @@ final AS ( adresses.adresse AS "adresse", adresses.code_postal AS "code_postal", adresses.code_insee AS "code_insee", - adresses.result_score AS "_di_geocodage_score", COALESCE(plausible_personal_emails._di_surrogate_id IS NOT NULL, FALSE) AS "_di_email_is_pii" FROM valid_structures diff --git a/pipeline/dbt/models/marts/inclusion/_inclusion_models.yml b/pipeline/dbt/models/marts/inclusion/_inclusion_models.yml index 389411472..dd1be2e0d 100644 --- a/pipeline/dbt/models/marts/inclusion/_inclusion_models.yml +++ b/pipeline/dbt/models/marts/inclusion/_inclusion_models.yml @@ -10,8 +10,6 @@ models: data_type: text constraints: - type: primary_key - - name: _di_geocodage_score - data_type: float - name: id data_type: text constraints: @@ -135,8 +133,6 @@ models: - type: not_null - type: foreign_key expression: "public_marts.marts_inclusion__structures (_di_surrogate_id)" - - name: _di_geocodage_score - data_type: float - name: id data_type: text constraints: diff --git a/pipeline/tests/integration/__init__.py b/pipeline/tests/integration/__init__.py deleted file mode 100644 index e69de29bb..000000000 diff --git a/pipeline/tests/integration/test_geocoding.py b/pipeline/tests/integration/test_geocoding.py deleted file mode 100644 index b705c2f44..000000000 --- a/pipeline/tests/integration/test_geocoding.py +++ /dev/null @@ -1,73 +0,0 @@ -from unittest.mock import ANY - -import pandas as pd -import pytest - -from dags.dag_utils import geocoding - -pytestmark = pytest.mark.ban_api - - -@pytest.fixture -def ban_backend(): - return geocoding.BaseAdresseNationaleBackend( - base_url="https://api-adresse.data.gouv.fr" - ) - - -@pytest.fixture -def sample_df() -> pd.DataFrame: - return pd.DataFrame.from_records( - data=[ - { - "source": "dora", - "_di_surrogate_id": "1", - "adresse": "17 rue Malus", - "code_postal": "59000", - "code_insee": "59350", - "commune": "Lille", - }, - 
{ - "source": "dora", - "_di_surrogate_id": "2", - "adresse": None, - "code_postal": None, - "code_insee": None, - "commune": None, - }, - ] - ) - - -def test_ban_geocode( - ban_backend: geocoding.BaseAdresseNationaleBackend, - sample_df: pd.DataFrame, -): - assert ban_backend.geocode(sample_df).to_dict(orient="records") == [ - { - "_di_surrogate_id": "1", - "source": "dora", - "adresse": "17 rue Malus", - "code_insee": "59350", - "code_postal": "59000", - "commune": "Lille", - "latitude": "50.627078", - "longitude": "3.067372", - "result_label": "17 Rue Malus 59000 Lille", - "result_score": ANY, - "result_score_next": None, - "result_type": "housenumber", - "result_id": "59350_5835_00017", - "result_housenumber": "17", - "result_name": "17 Rue Malus", - "result_street": "Rue Malus", - "result_postcode": "59000", - "result_city": "Lille", - "result_context": "59, Nord, Hauts-de-France", - "result_citycode": "59350", - "result_oldcitycode": "59350", - "result_oldcity": "Lille", - "result_district": None, - "result_status": "ok", - } - ]
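-- 
For reference, a minimal sketch of how the new UDF can be exercised by hand once the datawarehouse image is built with plpython3u, the data-inclusion-processings package is installed, and `dbt run-operation create_udfs` has created the function. The payload mirrors the JSONB aggregated in int__geocodages.sql and reuses the sample address from the integration test; values are illustrative only.

SELECT *
FROM processings.geocode(
    '[{"id": "1", "adresse": "17 rue Malus", "code_postal": "59000", "code_insee": "59350", "commune": "Lille"}]'::JSONB
);
-- returns one row per geocoded input with the columns declared in udf__geocode.sql:
-- id, result_score, result_label, result_city, result_type, result_citycode,
-- result_postcode, result_name, longitude, latitude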