Skip to content

Commit

Permalink
chore(pipeline): chunk admin express communes
Browse files — browse the repository at this point in the history
  • Loading branch information
vmttn committed Oct 9, 2023
1 parent 6038d90 commit 2cec2fb
Showing 1 changed file with 35 additions and 20 deletions.
55 changes: 35 additions & 20 deletions pipeline/dags/import_admin_express.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,12 @@ def _load_communes():
import textwrap

import geopandas
import tqdm
from airflow.providers.postgres.hooks import postgres

pg_hook = postgres.PostgresHook(postgres_conn_id="pg")
engine = pg_hook.get_sqlalchemy_engine()

tmp_dir_path = (
pathlib.Path("/tmp")
/ "ADMIN-EXPRESS-COG_3-0__SHP__FRA_2021-05-19"
Expand All @@ -41,31 +44,43 @@ def _load_communes():
)

tmp_file_path = tmp_dir_path / "COMMUNE.shp"
df = geopandas.read_file(tmp_file_path)
df = df.rename(
columns={
"INSEE_COM": "code",
"NOM": "nom",
"INSEE_DEP": "departement",
"INSEE_REG": "region",
"SIREN_EPCI": "siren_epci",
}
)
df = df.rename_geometry("geom")
df = df[["code", "nom", "departement", "region", "siren_epci", "geom"]]
df = df.sort_values(by="code") # optimize future lookups

engine = pg_hook.get_sqlalchemy_engine()
target_table = "admin_express_communes"

with engine.connect() as conn:
with conn.begin():
df.to_postgis(
target_table,
con=conn,
if_exists="replace",
index=False,
)
for i in tqdm.tqdm(range(100)):
chunck_df = geopandas.read_file(
tmp_file_path, rows=slice(1000 * i, 1000 * (i + 1))
)

if len(chunck_df) == 0:
break

chunck_df = chunck_df.rename(
columns={
"INSEE_COM": "code",
"NOM": "nom",
"INSEE_DEP": "departement",
"INSEE_REG": "region",
"SIREN_EPCI": "siren_epci",
}
)
chunck_df = chunck_df.rename_geometry("geom")
chunck_df = chunck_df[
["code", "nom", "departement", "region", "siren_epci", "geom"]
]

# optimize future lookups
chunck_df = chunck_df.sort_values(by="code")

chunck_df.to_postgis(
target_table,
con=conn,
if_exists="replace" if i == 0 else "append",
index=False,
)

conn.execute(f"ALTER TABLE {target_table} ADD PRIMARY KEY (code);")
# create an index on simplified geography elements, for fast radius search
conn.execute(
Expand Down

0 comments on commit 2cec2fb

Please sign in to comment.