From 2cec2fbd00f89d54dfdce53ed2685babaf5e2309 Mon Sep 17 00:00:00 2001 From: Valentin Matton Date: Mon, 9 Oct 2023 18:37:30 +0200 Subject: [PATCH] chore(pipeline): chunck admin express communes --- pipeline/dags/import_admin_express.py | 55 +++++++++++++++++---------- 1 file changed, 35 insertions(+), 20 deletions(-) diff --git a/pipeline/dags/import_admin_express.py b/pipeline/dags/import_admin_express.py index f262bdf1..4838e55a 100644 --- a/pipeline/dags/import_admin_express.py +++ b/pipeline/dags/import_admin_express.py @@ -29,9 +29,12 @@ def _load_communes(): import textwrap import geopandas + import tqdm from airflow.providers.postgres.hooks import postgres pg_hook = postgres.PostgresHook(postgres_conn_id="pg") + engine = pg_hook.get_sqlalchemy_engine() + tmp_dir_path = ( pathlib.Path("/tmp") / "ADMIN-EXPRESS-COG_3-0__SHP__FRA_2021-05-19" @@ -41,31 +44,43 @@ def _load_communes(): ) tmp_file_path = tmp_dir_path / "COMMUNE.shp" - df = geopandas.read_file(tmp_file_path) - df = df.rename( - columns={ - "INSEE_COM": "code", - "NOM": "nom", - "INSEE_DEP": "departement", - "INSEE_REG": "region", - "SIREN_EPCI": "siren_epci", - } - ) - df = df.rename_geometry("geom") - df = df[["code", "nom", "departement", "region", "siren_epci", "geom"]] - df = df.sort_values(by="code") # optimize future lookups - engine = pg_hook.get_sqlalchemy_engine() target_table = "admin_express_communes" with engine.connect() as conn: with conn.begin(): - df.to_postgis( - target_table, - con=conn, - if_exists="replace", - index=False, - ) + for i in tqdm.tqdm(range(100)): + chunck_df = geopandas.read_file( + tmp_file_path, rows=slice(1000 * i, 1000 * (i + 1)) + ) + + if len(chunck_df) == 0: + break + + chunck_df = chunck_df.rename( + columns={ + "INSEE_COM": "code", + "NOM": "nom", + "INSEE_DEP": "departement", + "INSEE_REG": "region", + "SIREN_EPCI": "siren_epci", + } + ) + chunck_df = chunck_df.rename_geometry("geom") + chunck_df = chunck_df[ + ["code", "nom", "departement", "region", "siren_epci", "geom"] + ] + + # optimize future lookups + chunck_df = chunck_df.sort_values(by="code") + + chunck_df.to_postgis( + target_table, + con=conn, + if_exists="replace" if i == 0 else "append", + index=False, + ) + conn.execute(f"ALTER TABLE {target_table} ADD PRIMARY KEY (code);") # create an index on simplified geography elements, for fast radius search conn.execute(