Skip to content

Commit

Permalink
chore(pipeline): clean up old geocoding task
Browse files Browse the repository at this point in the history
  • Loading branch information
vmttn committed Sep 16, 2024
1 parent 84826d4 commit b17d307
Show file tree
Hide file tree
Showing 10 changed files with 10 additions and 342 deletions.
6 changes: 2 additions & 4 deletions pipeline/dags/compute_hourly.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,7 @@

from dag_utils import date, marts, notifications
from dag_utils.dbt import (
get_after_geocoding_tasks,
get_before_geocoding_tasks,
get_intermediate_tasks,
get_staging_tasks,
)

Expand All @@ -24,8 +23,7 @@
(
start
>> get_staging_tasks(schedule="@hourly")
>> get_before_geocoding_tasks()
>> get_after_geocoding_tasks()
>> get_intermediate_tasks()
>> marts.export_di_dataset_to_s3()
>> end
)
18 changes: 4 additions & 14 deletions pipeline/dags/dag_utils/dbt.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,9 +98,9 @@ def get_staging_tasks(schedule=None):
return task_list


def get_before_geocoding_tasks():
def get_intermediate_tasks():
return dbt_operator_factory(
task_id="dbt_build_before_geocoding",
task_id="dbt_build_intermediate",
command="build",
select=" ".join(
[
Expand All @@ -111,22 +111,11 @@ def get_before_geocoding_tasks():
# into a single DAG. Another way to see it is that it depended on
# main since the beginning as it required intermediate data to be
# present ?
"path:models/intermediate/int__geocodages.sql",
"path:models/intermediate/int__union_contacts.sql",
"path:models/intermediate/int__union_adresses.sql",
"path:models/intermediate/int__union_services.sql",
"path:models/intermediate/int__union_structures.sql",
]
),
trigger_rule=TriggerRule.ALL_DONE,
)


def get_after_geocoding_tasks():
return dbt_operator_factory(
task_id="dbt_build_after_geocoding",
command="build",
select=" ".join(
[
"path:models/intermediate/extra",
"path:models/intermediate/int__plausible_personal_emails.sql",
"path:models/intermediate/int__union_adresses__enhanced.sql+",
Expand All @@ -140,4 +129,5 @@ def get_after_geocoding_tasks():
"path:models/intermediate/quality/int_quality__stats.sql+",
]
),
trigger_rule=TriggerRule.ALL_DONE,
)
125 changes: 0 additions & 125 deletions pipeline/dags/dag_utils/geocoding.py

This file was deleted.

75 changes: 3 additions & 72 deletions pipeline/dags/main.py
Original file line number Diff line number Diff line change
@@ -1,76 +1,15 @@
import pendulum

import airflow
from airflow.operators import empty, python
from airflow.operators import empty

from dag_utils import date, marts
from dag_utils.dbt import (
dbt_operator_factory,
get_after_geocoding_tasks,
get_before_geocoding_tasks,
get_intermediate_tasks,
get_staging_tasks,
)
from dag_utils.notifications import notify_failure_args
from dag_utils.virtualenvs import PYTHON_BIN_PATH


def _geocode():
    """Geocode every unioned address through the BAN API and persist the result.

    Reads address rows from ``public_intermediate.int__union_adresses``,
    sends them to the Base Adresse Nationale geocoding backend (base URL
    taken from the ``BAN_API_URL`` Airflow Variable), then replaces the
    ``public.extra__geocoded_results`` table with the geocoded output.

    Runs inside an ExternalPythonOperator, hence the function-local imports.
    """
    import logging

    import sqlalchemy as sqla

    from airflow.models import Variable
    from airflow.providers.postgres.hooks.postgres import PostgresHook

    from dag_utils import geocoding
    from dag_utils.sources import utils

    logger = logging.getLogger(__name__)

    pg_hook = PostgresHook(postgres_conn_id="pg")

    # 1. Retrieve input data
    addresses_df = pg_hook.get_pandas_df(
        sql="""
            SELECT
                _di_surrogate_id,
                adresse,
                code_postal,
                code_insee,
                commune
            FROM public_intermediate.int__union_adresses;
        """
    )
    utils.log_df_info(addresses_df, logger=logger)

    # 2. Geocode through the BAN backend
    backend = geocoding.BaseAdresseNationaleBackend(
        base_url=Variable.get("BAN_API_URL")
    )
    geocoded_df = backend.geocode(addresses_df)
    utils.log_df_info(geocoded_df, logger=logger)

    # 3. Write the result back, replacing the previous run's table within
    #    a single transaction so readers never see a half-written table.
    engine = pg_hook.get_sqlalchemy_engine()
    with engine.connect() as conn, conn.begin():
        geocoded_df.to_sql(
            "extra__geocoded_results",
            schema="public",
            con=conn,
            if_exists="replace",
            index=False,
            dtype={
                "latitude": sqla.Float,
                "longitude": sqla.Float,
                "result_score": sqla.Float,
            },
        )


with airflow.DAG(
dag_id="main",
Expand All @@ -93,20 +32,12 @@ def _geocode():
command="run-operation create_udfs",
)

python_geocode = python.ExternalPythonOperator(
task_id="python_geocode",
python=str(PYTHON_BIN_PATH),
python_callable=_geocode,
)

(
start
>> dbt_seed
>> dbt_create_udfs
>> get_staging_tasks()
>> get_before_geocoding_tasks()
>> python_geocode
>> get_after_geocoding_tasks()
>> get_intermediate_tasks()
>> marts.export_di_dataset_to_s3()
>> end
)
3 changes: 0 additions & 3 deletions pipeline/dbt/models/intermediate/extra/_extra__models.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,4 @@
version: 2

models:
# TODO: cleanup these models, add staging models

- name: int_extra__insee_prenoms_filtered
- name: int_extra__geocoded_results

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,7 @@ final AS (
adresses.commune AS "commune",
adresses.adresse AS "adresse",
adresses.code_postal AS "code_postal",
adresses.code_insee AS "code_insee",
adresses.result_score AS "_di_geocodage_score"
adresses.code_insee AS "code_insee"
FROM
valid_services
LEFT JOIN adresses ON valid_services._di_adresse_surrogate_id = adresses._di_surrogate_id
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ final AS (
adresses.adresse AS "adresse",
adresses.code_postal AS "code_postal",
adresses.code_insee AS "code_insee",
adresses.result_score AS "_di_geocodage_score",
COALESCE(plausible_personal_emails._di_surrogate_id IS NOT NULL, FALSE) AS "_di_email_is_pii"
FROM
valid_structures
Expand Down
Empty file.
Loading

0 comments on commit b17d307

Please sign in to comment.