Skip to content

Commit

Permalink
chore(pipeline): cleanup pii detection
Browse files Browse the repository at this point in the history
* Simplify personal email detection, accepting more false positives
in exchange for simplicity and speed.
* Add unit tests
  • Loading branch information
vmttn committed Nov 4, 2024
1 parent b768303 commit 48a6c95
Show file tree
Hide file tree
Showing 16 changed files with 169 additions and 155 deletions.
5 changes: 2 additions & 3 deletions pipeline/dags/dag_utils/dbt.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,14 +108,13 @@ def get_intermediate_tasks():
# into a single DAG. Another way to see it is that it depended on
# main since the beginning as it required intermediate data to be
# present ?
"path:models/intermediate/int__geocodages.sql",
"path:models/intermediate/int__courriels_personnels.sql",
"path:models/intermediate/int__criteres_qualite.sql",
"path:models/intermediate/int__geocodages.sql",
"path:models/intermediate/int__union_contacts.sql",
"path:models/intermediate/int__union_adresses.sql",
"path:models/intermediate/int__union_services.sql",
"path:models/intermediate/int__union_structures.sql",
"path:models/intermediate/extra",
"path:models/intermediate/int__plausible_personal_emails.sql",
"path:models/intermediate/int__union_contacts__enhanced.sql+",
"path:models/intermediate/int__union_adresses__enhanced.sql+",
"path:models/intermediate/int__union_services__enhanced.sql+",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,22 @@

import pendulum

import airflow
from airflow.operators import empty, python
from airflow.decorators import dag, task
from airflow.operators import empty

from dag_utils import date
from dag_utils import date, dbt
from dag_utils.virtualenvs import PYTHON_BIN_PATH

logger = logging.getLogger(__name__)

default_args = {}


def _import_dataset():
@task.external_python(
python=str(PYTHON_BIN_PATH),
retries=2,
)
def extract_and_load():
import pandas as pd

from airflow.models import Variable
Expand All @@ -22,31 +26,35 @@ def _import_dataset():

df = pd.read_csv(Variable.get("INSEE_FIRSTNAME_FILE_URL"), sep=";")

schema = "insee"
pg.create_schema(schema)

with pg.connect_begin() as conn:
df.to_sql(
schema="insee",
schema=schema,
name="etat_civil_prenoms",
# FIXME(vperron): the schema should be "insee" instead of "public"
con=conn,
if_exists="replace",
index=False,
)


with airflow.DAG(
dag_id="import_insee_firstnames",
@dag(
start_date=pendulum.datetime(2022, 1, 1, tz=date.TIME_ZONE),
default_args=default_args,
schedule="@once",
schedule="@yearly",
catchup=False,
) as dag:
)
def import_insee_prenoms():
start = empty.EmptyOperator(task_id="start")
end = empty.EmptyOperator(task_id="end")

import_dataset = python.ExternalPythonOperator(
task_id="import",
python=str(PYTHON_BIN_PATH),
python_callable=_import_dataset,
dbt_build_staging = dbt.dbt_operator_factory(
task_id="dbt_build_staging",
command="build",
select="path:models/staging/insee",
)

start >> import_dataset >> end
start >> extract_and_load() >> dbt_build_staging >> end


import_insee_prenoms()
3 changes: 0 additions & 3 deletions pipeline/dbt/macros/obfuscate.sql

This file was deleted.

7 changes: 0 additions & 7 deletions pipeline/dbt/models/_sources.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,6 @@ sources:
meta:
kind: service

- name: insee
schema: insee
tables:
- name: etat_civil_prenoms
- name: sirene_etablissement_historique
- name: sirene_etablissement_succession

- name: france_travail
schema: france_travail
meta:
Expand Down
23 changes: 21 additions & 2 deletions pipeline/dbt/models/intermediate/_models.yml
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
version: 2

models:
- name: int__plausible_personal_emails

- name: int__union_adresses
data_tests:
- check_adresse:
Expand Down Expand Up @@ -183,8 +181,29 @@ models:
min_value: 0
max_value: 1

- name: int__courriels_personnels
columns:
- name: courriel
data_tests:
- not_null
- dbt_utils.not_empty_string

unit_tests:
- name: test_courriels_personnels
  model: int__courriels_personnels
  # NOTE(review): the fixture addresses below are representative stand-ins.
  # The addresses originally committed were masked ("[email protected]") in
  # the copy this was reviewed from; confirm against the repository.
  given:
    - input: ref('int__union_contacts')
      rows:
        # first name + '.' prefix, mainstream provider, no institutional
        # keyword in the local part -> flagged as personal
        - {courriel: jean.dupont@gmail.com}
        # first name prefix, but the domain is not a mainstream provider
        - {courriel: jean.dupont@mairie-exemple.fr}
        # first name prefix and mainstream provider, but the local part
        # contains an institutional keyword ("ccas")
        - {courriel: jean.ccas@orange.fr}
    - input: ref('stg_insee__prenoms')
      rows:
        - {prenom: jean}
  expect:
    rows:
      - {courriel: jean.dupont@gmail.com}

- name: test_geocodages_full_refresh_mode
model: int__geocodages
overrides:
Expand Down
4 changes: 0 additions & 4 deletions pipeline/dbt/models/intermediate/extra/_extra__models.yml

This file was deleted.

This file was deleted.

34 changes: 34 additions & 0 deletions pipeline/dbt/models/intermediate/int__courriels_personnels.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
-- Distinct contact emails that are plausibly personal: the address starts
-- with a known first name followed by a dot, is hosted by a mainstream
-- provider, and its local part carries no typical institutional keyword.
WITH prefixed_by_prenom AS (
    SELECT
        contacts.courriel,
        -- part before the '@'
        SPLIT_PART(contacts.courriel, '@', 1) AS local_part,
        -- second-to-last dot-separated label of the domain (e.g. 'gmail')
        SPLIT_PART(SPLIT_PART(contacts.courriel, '@', 2), '.', -2) AS provider
    FROM {{ ref('int__union_contacts') }} AS contacts
    INNER JOIN {{ ref('stg_insee__prenoms') }} AS insee_prenoms
        ON STARTS_WITH(contacts.courriel, insee_prenoms.prenom || '.')
)

-- DISTINCT: one address can match several first names and several contacts
SELECT DISTINCT prefixed_by_prenom.courriel
FROM prefixed_by_prenom
WHERE
    -- keep mainstream email providers only; the list was compiled by
    -- looking at the most common domains in the dataset
    prefixed_by_prenom.provider IN (
        'free',
        'gmail',
        'hotmail',
        'laposte',
        'nordnet',
        'orange',
        'outlook',
        'sfr',
        'wanadoo',
        'yahoo'
    )
    -- drop common patterns of non personal mailboxes
    AND prefixed_by_prenom.local_part !~ 'mairie|commune|adil|services|ccas'

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -18,29 +18,29 @@ rgpd_notices AS (

final AS (
SELECT
contacts.source || '-' || contacts.id AS "_di_surrogate_id",
contacts.id AS "id",
contacts.source AS "source",
contacts._di_surrogate_id AS "_di_surrogate_id",
contacts.id AS "id",
contacts.source AS "source",
CASE
WHEN
rgpd_notices.id IS NULL
OR NOT rgpd_notices.was_objected_to
THEN contacts.contact_nom_prenom
END AS "contact_nom_prenom",
END AS "contact_nom_prenom",
CASE
WHEN
rgpd_notices.id IS NULL
OR (NOT rgpd_notices.was_objected_to AND NOT rgpd_notices.has_hardbounced)
THEN contacts.courriel
END AS "courriel",
END AS "courriel",
CASE
WHEN
rgpd_notices.id IS NULL
OR NOT rgpd_notices.was_objected_to
THEN contacts.telephone
END AS "telephone",
rgpd_notices.was_objected_to AS "rgpd_notice_was_objected_to",
rgpd_notices.has_hardbounced AS "rgpd_notice_has_hardbounced"
END AS "telephone",
rgpd_notices.was_objected_to AS "rgpd_notice_was_objected_to",
rgpd_notices.has_hardbounced AS "rgpd_notice_has_hardbounced"
FROM contacts
LEFT JOIN rgpd_notices
ON
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,6 @@ WITH structures AS (
SELECT * FROM {{ ref('int__union_structures') }}
),

plausible_personal_emails AS (
SELECT * FROM {{ ref('int__plausible_personal_emails') }}
),

adresses AS (
SELECT * FROM {{ ref('int__union_adresses__enhanced') }}
),
Expand Down Expand Up @@ -42,17 +38,15 @@ valid_structures AS (
final AS (
SELECT
valid_structures.*,
adresses.longitude AS "longitude",
adresses.latitude AS "latitude",
adresses.complement_adresse AS "complement_adresse",
adresses.commune AS "commune",
adresses.adresse AS "adresse",
adresses.code_postal AS "code_postal",
adresses.code_insee AS "code_insee",
COALESCE(plausible_personal_emails._di_surrogate_id IS NOT NULL, FALSE) AS "_di_email_is_pii"
adresses.longitude AS "longitude",
adresses.latitude AS "latitude",
adresses.complement_adresse AS "complement_adresse",
adresses.commune AS "commune",
adresses.adresse AS "adresse",
adresses.code_postal AS "code_postal",
adresses.code_insee AS "code_insee"
FROM
valid_structures
LEFT JOIN plausible_personal_emails ON valid_structures._di_surrogate_id = plausible_personal_emails._di_surrogate_id
LEFT JOIN adresses ON valid_structures._di_adresse_surrogate_id = adresses._di_surrogate_id
)

Expand Down
15 changes: 13 additions & 2 deletions pipeline/dbt/models/marts/opendata/opendata_services.sql
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@ WITH services AS (
SELECT * FROM {{ ref('int__union_services__enhanced') }}
),

courriels_personnels AS (
SELECT * FROM {{ ref('int__courriels_personnels') }}
),

final AS (
SELECT
{{
Expand All @@ -10,9 +14,16 @@ final AS (
from=ref('int__union_services__enhanced'),
except=['courriel', 'telephone'])
}},
{{ obfuscate('courriel') }} AS "courriel",
{{ obfuscate('telephone') }} AS "telephone"
-- Obfuscate the email (and the telephone) when the email is potentially
-- personal. courriels_personnels is LEFT JOINed on the email, so its
-- courriel column is NULL for services whose email is not flagged.
NULLIF(services.courriel, courriels_personnels.courriel) AS "courriel",
CASE
    -- Test the join miss with IS NULL. Comparing with `!=` yields FALSE on
    -- a match and NULL on a miss (never TRUE), which blanked the telephone
    -- of every service.
    WHEN courriels_personnels.courriel IS NULL
        THEN services.telephone
END AS "telephone"
FROM services
LEFT JOIN courriels_personnels
ON services.courriel = courriels_personnels.courriel
-- exclude specific sources from open data
WHERE services.source NOT IN ('soliguide', 'agefiph', 'data-inclusion')
)

Expand Down
Loading

0 comments on commit 48a6c95

Please sign in to comment.