Commit 84826d4

chore(pipeline): geocode as a dbt model
The trick is to leverage plpython to do the geocoding
inside the database. This way, geocoding can be
modelled as a dbt model and orchestrated as such.

The geocoding implementation has been moved to a
dedicated package maintained next to the datawarehouse
image. The plpython UDF simply wraps the call.

Is it worth it?

* it heavily simplifies the flow and sets clearer boundaries
between airflow and dbt: dbt does the transformation, airflow
orchestrates it.
* less error prone, since we no longer have to pull data from the db
and map it to python ourselves.
* we can even leverage dbt to do the geocoding incrementally, only on
inputs that have not been seen before (see the sketch after this
list). This will drastically reduce our carbon footprint...
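As a rough illustration of the incremental idea, a dbt model along
these lines could geocode only rows it has not seen before (model,
source and column names here are hypothetical):

-- Hypothetical incremental dbt model; all names are illustrative.
{{ config(materialized="incremental", unique_key="id") }}

WITH inputs AS (
    SELECT id, adresse, code_insee, code_postal, commune
    FROM {{ ref("stg_structures") }}
    {% if is_incremental() %}
    -- only send rows that this model has never geocoded before
    WHERE id NOT IN (SELECT id FROM {{ this }})
    {% endif %}
)

SELECT results.*
FROM (
    SELECT processings.geocode(jsonb_agg(to_jsonb(inputs))) AS data
    FROM inputs
) AS batch,
LATERAL jsonb_to_recordset(batch.data) AS results(
    id text,
    result_label text,
    result_score text,
    result_citycode text,
    latitude text,
    longitude text
)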

There are a few enhancements we would probably want:

* obviously, clean up
* define a macro with the geocoding model that can be used for all
sources (sketched after this list)
* re-do geocodings for relatively old inputs
* do the geocoding per source?
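For the macro enhancement, a minimal sketch of the shape it could take
(the macro name and argument are assumptions):

-- Hypothetical reusable macro wrapping the geocoding call.
{% macro geocode_source(source_relation) %}
    SELECT processings.geocode(jsonb_agg(to_jsonb(src))) AS data
    FROM {{ source_relation }} AS src
{% endmacro %}
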
vmttn committed Sep 13, 2024
1 parent 3367584 commit 84826d4
Showing 18 changed files with 582 additions and 102 deletions.
13 changes: 12 additions & 1 deletion datawarehouse/.dockerignore
@@ -1,4 +1,15 @@
*

!/docker-entrypoint-initdb.d
!/requirements
!/processings

# Python
**/*.py[cod]
**/__pycache__/
**/.venv/
**/build/
**/dist/
**/.tox/
**/.pytest_cache/
**/*.egg-info/
**/db.sqlite3
11 changes: 7 additions & 4 deletions datawarehouse/Dockerfile
@@ -17,10 +17,13 @@ RUN apt-get update \
&& apt-get clean -y \
&& rm -rf /var/lib/apt/lists/*

RUN python3.11 -m venv ${VIRTUAL_ENV}
RUN pip install --no-cache-dir --upgrade pip setuptools wheel

COPY ./docker-entrypoint-initdb.d /docker-entrypoint-initdb.d

RUN python3.11 -m venv ${VIRTUAL_ENV}
COPY processings/requirements processings/requirements
RUN pip install --no-cache-dir -r processings/requirements/requirements.txt

COPY requirements requirements
RUN pip install --no-cache-dir --upgrade pip setuptools wheel
RUN pip install --no-cache-dir -r requirements/requirements.txt
COPY processings processings
RUN pip install --no-cache-dir -e processings
18 changes: 18 additions & 0 deletions datawarehouse/processings/Makefile
@@ -0,0 +1,18 @@
PIP_COMPILE := pipx run uv pip compile pyproject.toml --quiet

ifeq ($(filter upgrade,$(MAKECMDGOALS)),upgrade)
PIP_COMPILE += --upgrade
endif

.PHONY: all dev base test uv upgrade

all: base dev test

base:
	$(PIP_COMPILE) --output-file=requirements/requirements.txt

dev:
	$(PIP_COMPILE) --extra=dev --output-file=requirements/dev-requirements.txt

test:
	$(PIP_COMPILE) --extra=test --output-file=requirements/test-requirements.txt
46 changes: 46 additions & 0 deletions datawarehouse/processings/pyproject.toml
@@ -0,0 +1,46 @@
[build-system]
build-backend = "setuptools.build_meta"
requires = ["setuptools", "wheel"]

[project]
name = "data-inclusion-processings"
version = "0.1.0"
dependencies = [
"numpy~=2.0",
"pandas~=2.2",
"requests~=2.31",
"tenacity",
]

[project.optional-dependencies]
dev = [
"pre-commit",
"ruff",
]
test = [
"pytest",
]

[tool.setuptools]
package-dir = {"" = "src"}

[tool.ruff.lint]
# see prefixes in https://beta.ruff.rs/docs/rules/
select = [
"F", # pyflakes
"E", # pycodestyle errors
"W", # pycodestyle warnings
"I", # isort
"UP", # pyupgrade
"S", # bandit
]

[tool.ruff.lint.isort]
combine-as-imports = true
known-first-party = ["data_inclusion"]

[tool.pytest.ini_options]
testpaths = "tests"
markers = '''
ban_api: mark test as requiring the Base Adresse Nationale API
'''
48 changes: 48 additions & 0 deletions datawarehouse/processings/requirements/dev-requirements.txt
@@ -0,0 +1,48 @@
# This file was autogenerated by uv via the following command:
# uv pip compile pyproject.toml --extra=dev --output-file=requirements/dev-requirements.txt
certifi==2024.8.30
# via requests
cfgv==3.4.0
# via pre-commit
charset-normalizer==3.3.2
# via requests
distlib==0.3.8
# via virtualenv
filelock==3.16.0
# via virtualenv
identify==2.6.0
# via pre-commit
idna==3.8
# via requests
nodeenv==1.9.1
# via pre-commit
numpy==2.1.1
# via
# data-inclusion-processings (pyproject.toml)
# pandas
pandas==2.2.2
# via data-inclusion-processings (pyproject.toml)
platformdirs==4.3.2
# via virtualenv
pre-commit==3.8.0
# via data-inclusion-processings (pyproject.toml)
python-dateutil==2.9.0.post0
# via pandas
pytz==2024.2
# via pandas
pyyaml==6.0.2
# via pre-commit
requests==2.32.3
# via data-inclusion-processings (pyproject.toml)
ruff==0.6.4
# via data-inclusion-processings (pyproject.toml)
six==1.16.0
# via python-dateutil
tenacity==9.0.0
# via data-inclusion-processings (pyproject.toml)
tzdata==2024.1
# via pandas
urllib3==2.2.2
# via requests
virtualenv==20.26.4
# via pre-commit
28 changes: 28 additions & 0 deletions datawarehouse/processings/requirements/requirements.txt
@@ -0,0 +1,28 @@
# This file was autogenerated by uv via the following command:
# uv pip compile pyproject.toml --output-file=requirements/requirements.txt
certifi==2024.8.30
# via requests
charset-normalizer==3.3.2
# via requests
idna==3.8
# via requests
numpy==2.1.1
# via
# data-inclusion-processings (pyproject.toml)
# pandas
pandas==2.2.2
# via data-inclusion-processings (pyproject.toml)
python-dateutil==2.9.0.post0
# via pandas
pytz==2024.2
# via pandas
requests==2.32.3
# via data-inclusion-processings (pyproject.toml)
six==1.16.0
# via python-dateutil
tenacity==9.0.0
# via data-inclusion-processings (pyproject.toml)
tzdata==2024.1
# via pandas
urllib3==2.2.2
# via requests
36 changes: 36 additions & 0 deletions datawarehouse/processings/requirements/test-requirements.txt
@@ -0,0 +1,36 @@
# This file was autogenerated by uv via the following command:
# uv pip compile pyproject.toml --extra=test --output-file=requirements/test-requirements.txt
certifi==2024.8.30
# via requests
charset-normalizer==3.3.2
# via requests
idna==3.8
# via requests
iniconfig==2.0.0
# via pytest
numpy==2.1.1
# via
# data-inclusion-processings (pyproject.toml)
# pandas
packaging==24.1
# via pytest
pandas==2.2.2
# via data-inclusion-processings (pyproject.toml)
pluggy==1.5.0
# via pytest
pytest==8.3.3
# via data-inclusion-processings (pyproject.toml)
python-dateutil==2.9.0.post0
# via pandas
pytz==2024.2
# via pandas
requests==2.32.3
# via data-inclusion-processings (pyproject.toml)
six==1.16.0
# via python-dateutil
tenacity==9.0.0
# via data-inclusion-processings (pyproject.toml)
tzdata==2024.1
# via pandas
urllib3==2.2.2
# via requests
5 changes: 5 additions & 0 deletions datawarehouse/processings/src/data_inclusion/processings/__init__.py
@@ -0,0 +1,5 @@
from data_inclusion.processings.geocode import geocode

__all__ = [
    "geocode",
]
102 changes: 102 additions & 0 deletions datawarehouse/processings/src/data_inclusion/processings/geocode.py
@@ -0,0 +1,102 @@
import csv
import io
import logging
from dataclasses import dataclass

import numpy as np
import pandas as pd
import requests

logger = logging.getLogger(__name__)


def _geocode(df: pd.DataFrame) -> pd.DataFrame:
    logger.info("Will send address batch, dimensions=%s", df.shape)
    with io.BytesIO() as buf:
        df.to_csv(buf, index=False, quoting=csv.QUOTE_ALL, sep="|")

        try:
            response = requests.post(
                "https://api-adresse.data.gouv.fr/search/csv/",
                files={"data": ("data.csv", buf.getvalue(), "text/csv")},
                data={
                    "columns": ["adresse", "code_postal", "commune"],
                    # Post-filter on the INSEE code and not the zipcode.
                    # Explanations from the BAN API creators:
                    # The postcode is problematic for cities with multiple
                    # zipcodes, if the supplied zipcode is wrong, or the one
                    # in the BAN is. The INSEE code is more stable, unique
                    # and reliable. Also this post-filter does not return
                    # "possible" results, it blindly filters out.
                    "postcode": "code_postal",
                },
                timeout=180,  # we upload 2MB of data, so we need a high timeout
            )
            response.raise_for_status()
        except requests.RequestException as e:
            logger.info("Error while fetching `%s`: %s", e.request.url, e)
            return pd.DataFrame()

    with io.StringIO() as f:
        f.write(response.text)
        f.seek(0)
        results_df = pd.read_csv(
            f,
            encoding_errors="replace",
            on_bad_lines="warn",
            dtype=str,
            sep="|",
        )
        results_df = results_df.replace({np.nan: None})

    logger.info("Got result for address batch, dimensions=%s", results_df.shape)
    return results_df


@dataclass
class GeocodeInput:
    id: str
    adresse: str
    code_insee: str
    code_postal: str
    commune: str


def geocode(
    data: GeocodeInput | list[GeocodeInput],
    batch_size: int = 20_000,
) -> list[dict]:
    # The BAN api limits batch geocoding to 50MB of data.
    # In our tests, 10_000 rows is about 1MB; but we'll be conservative,
    # since we also want to avoid upload timeouts.

    data = data if isinstance(data, list) else [data]
    df = pd.DataFrame.from_records(data)

    # drop rows that have fewer than two of commune, code_insee and
    # code_postal, as the result can't make sense.
    # Note that we keep the rows without an address, as they can be used
    # to at least resolve the city.
    df = df.dropna(subset=["code_postal", "code_insee", "commune"], thresh=2)

    # Clean up the values a bit to help the BAN's scoring. After experimentation,
    # looking for "Ville-Nouvelle" returns worse results than "Ville Nouvelle",
    # probably due to a tokenization in the BAN that favors spaces.
    # In the same fashion, looking for "U.R.S.S." returns worse results than
    # using "URSS" for "Avenue de l'U.R.S.S.". With the dots, it does not find
    # the street at all ¯\_(ツ)_/¯
    df = df.assign(
        adresse=df.adresse.str.strip()
        .str.replace("-", " ", regex=False)
        .str.replace(".", "", regex=False),
        commune=df.commune.str.strip(),
    )

    logger.info(f"Only {len(df)} rows can be geocoded.")

    return (
        df.groupby(
            np.arange(len(df)) // batch_size,
            group_keys=False,
        )
        .apply(_geocode)
        .to_dict(orient="records")
    )
Empty file.
62 changes: 62 additions & 0 deletions datawarehouse/processings/tests/integration/test_geocoding.py
@@ -0,0 +1,62 @@
from unittest.mock import ANY

import pytest

from data_inclusion.processings import geocode

pytestmark = pytest.mark.ban_api


@pytest.mark.parametrize(
    ("adresse", "expected"),
    [
        (
            {
                "id": "1",
                "adresse": "17 rue Malus",
                "code_postal": "59000",
                "code_insee": "59350",
                "commune": "Lille",
            },
            {
                "id": "1",
                "adresse": "17 rue Malus",
                "code_postal": "59000",
                "code_insee": "59350",
                "commune": "Lille",
                "latitude": "50.627078",
                "longitude": "3.067372",
                "result_label": "17 Rue Malus 59000 Lille",
                "result_score": ANY,
                "result_score_next": None,
                "result_type": "housenumber",
                "result_id": "59350_5835_00017",
                "result_housenumber": "17",
                "result_name": "17 Rue Malus",
                "result_street": "Rue Malus",
                "result_postcode": "59000",
                "result_city": "Lille",
                "result_context": "59, Nord, Hauts-de-France",
                "result_citycode": "59350",
                "result_oldcitycode": "59350",
                "result_oldcity": "Lille",
                "result_district": None,
                "result_status": "ok",
            },
        ),
        (
            {
                "id": "2",
                "adresse": None,
                "code_postal": None,
                "code_insee": None,
                "commune": None,
            },
            None,
        ),
    ],
)
def test_ban_geocode(adresse: dict, expected: dict | None):
    result = geocode(data=adresse)

    assert result == ([expected] if expected is not None else [])
2 changes: 0 additions & 2 deletions datawarehouse/requirements/requirements.in

This file was deleted.

