Skip to content

Commit

Permalink
_wip : download all geo data from the same source
Browse files Browse the repository at this point in the history
  • Loading branch information
vperron committed Aug 29, 2024
1 parent 6554996 commit 5762288
Showing 1 changed file with 97 additions and 9 deletions.
106 changes: 97 additions & 9 deletions pipeline/dags/sync_cities.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,16 +35,14 @@ def to_json_city(data):
"nom": data["nom"],
"region": data["codeRegion"],
"departement": data["codeDepartement"],
"siren_epci": data[
"codeEpci"
], # FIXME(vperron) : if district, use the one from city
"siren_epci": data["codeEpci"],
"centre": data["centre"],
"codes_postaux": sorted(data["codesPostaux"]),
}

def format_district(raw_city):
raw_city["nom"] = raw_city["nom"].replace("Arrondissement", "").strip()
# FIXME(vperron) : this is a hack to get the SIREN of the EPCI
# FIXME(vperron) : this is a hack to get the SIREN of the EPCI for districts
# It might become invalid in the future but it's QUITE unlikely.
raw_city["codeEpci"] = {
"13": "200054807", # Marseille
Expand Down Expand Up @@ -72,8 +70,7 @@ def fetch_cities(districts_only=False):
if districts_only:
answer = [format_district(raw_city) for raw_city in answer]

# FIXME(vperron) : temporarily ignore any city without an EPCI.
return [c for c in answer if "codeEpci" in c]
return answer

cities = sorted(
[to_json_city(c) for c in fetch_cities() + fetch_cities(districts_only=True)],
Expand All @@ -85,7 +82,35 @@ def fetch_cities(districts_only=False):
)


def load_from_s3(run_id, logical_date):
@task.external_python(
    python=str(PYTHON_BIN_PATH),
    retries=2,
)
def extract_metadata(run_id, logical_date):
    # Download the reference datasets (departements, regions, epcis) from the
    # geo API and archive each raw JSON payload in S3 for this dag run.
    # Records are sorted by their code so successive dumps diff cleanly.
    import json

    import httpx

    from dag_utils import s3

    def fetch_store_resource(name):
        resp = httpx.get(f"https://geo.api.gouv.fr/{name}")
        resp.raise_for_status()
        records = sorted(resp.json(), key=lambda record: record["code"])
        destination = s3.source_file_path(
            "api_geo", f"{name}.json", run_id, logical_date
        )
        s3.store_content(destination, json.dumps(records).encode())

    for resource in ("departements", "regions", "epcis"):
        fetch_store_resource(resource)


def load_cities_from_s3(run_id, logical_date):
import logging

from geoalchemy2 import Geometry
Expand Down Expand Up @@ -148,6 +173,56 @@ def create_point(geom):
)


def load_metadata_from_s3(run_id, logical_date):
    """Load the geo API reference dumps from S3 into Postgres.

    One table per resource (``api_geo__departements``, ``api_geo__regions``,
    ``api_geo__epcis``). The three resources do NOT share a schema:

        regions:      code, nom
        departements: code, nom, codeRegion
        epcis:        code, nom, codesDepartements, codesRegions, population

    so the table columns are taken from the dataframe (via ``to_sql``)
    rather than hard-coded, and the freshly loaded temporary table is
    swapped in place of the previous one inside a single transaction.
    """
    import logging

    from dag_utils import pg, s3
    from dag_utils.sources import utils as source_utils

    logger = logging.getLogger(__name__)

    DB_SCHEMA = "public"

    def load_resource(name):
        # Fetch this run's raw JSON dump from S3 and load it through a
        # temporary table before swapping it in.
        table_name = f"api_geo__{name}"
        s3_file_path = s3.source_file_path(
            source_id="api_geo",
            filename=f"{name}.json",
            run_id=run_id,
            logical_date=logical_date,
        )
        tmp_file_path = s3.download_file(s3_file_path)
        logger.info(
            "Downloading file s3_path=%s tmp_path=%s", s3_file_path, tmp_file_path
        )
        df = source_utils.read_json(path=tmp_file_path)
        with pg.connect_begin() as conn:
            # pandas creates the tmp table with the columns of the current
            # resource -- a single fixed DDL cannot fit all three schemas.
            df.to_sql(
                f"{table_name}_tmp",
                con=conn,
                schema=DB_SCHEMA,
                if_exists="replace",
                index=False,
            )

            # Swap the new table in. `code` is the INSEE identifier and is
            # unique within each resource, so it can carry the primary key.
            conn.execute(
                f"""\
                DROP TABLE IF EXISTS {DB_SCHEMA}.{table_name};
                ALTER TABLE {DB_SCHEMA}.{table_name}_tmp RENAME TO {table_name};
                ALTER TABLE {DB_SCHEMA}.{table_name} ADD PRIMARY KEY (code);"""
            )

    # NOTE(review): in the previous revision `load_resource` was defined but
    # never invoked, making this task a silent no-op. Load every resource
    # that `extract_metadata` stored, in the same order.
    for resource_name in ("departements", "regions", "epcis"):
        load_resource(resource_name)


@dag(
start_date=pendulum.datetime(2022, 1, 1, tz=date.TIME_ZONE),
default_args=default_args,
Expand All @@ -159,13 +234,26 @@ def sync_cities():
start = empty.EmptyOperator(task_id="start")
end = empty.EmptyOperator(task_id="end")

load_metadata = python.ExternalPythonOperator(
task_id="load_metadata",
python=str(PYTHON_BIN_PATH),
python_callable=load_metadata_from_s3,
)

load_cities = python.ExternalPythonOperator(
task_id="load_cities",
python=str(PYTHON_BIN_PATH),
python_callable=load_from_s3,
python_callable=load_cities_from_s3,
)

(start >> extract_cities() >> load_cities >> end)
(
start
>> extract_metadata()
>> extract_cities()
>> load_metadata
>> load_cities
>> end
)


sync_cities()

0 comments on commit 5762288

Please sign in to comment.