Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(pipeline) : Simplify notify_rgpd_contacts #326

Merged
merged 1 commit into from
Nov 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 17 additions & 39 deletions pipeline/RGPD.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,57 +7,35 @@ data⋅inclusion.
En l'absence de réponse de leur part, comme vu dans les CGAUs de data⋅inclusion, nous publierons le courriel en
question dans le jeu de données disponible dans l'API.

A date (fin 2023), nous ne proposons l'affichage que des courriels renseignés dans les structures et services DORA.
L'intérêt secondaire pour data⋅inclusion de ces notification est de permettre une validation poussée des e-mails de contact
ainsi notifiés; nous savons donc pour chaque courriel, s'il est digne de confiance ou non.

## Déroulement technique

### Imports Brevo
Chaque jour le DAG `import_brevo` récupère la liste à jour des personnes contactées lors des notifications RGPD.
Pour ce faire il réalise un extract + load (datalake puis datawarehouse) des données depuis l'API Brevo.

A noter que les contacts enregistrés dans Brevo sont identifiés par:
A noter que les contacts enregistrés dans Brevo contiennent a minima:
- leur email (qui est la clé unique de renseignement d'un contact Brevo)
- un ensemble d'attributs, dont:
* la date d'opposition RGPD (remplie par Osiris)
* la liste des structures dont l'adresse email est membre
- le fait que cet email ait donné lieu à un envoi ou non, avec ou sans 'hardbounce'
- un ensemble d'attributs, dont la date d'opposition RGPD (remplie par Osiris)

### Traitements de données
Chaque jour le DAG `main` se chargera de créer les tables (staging & intermediate) liées d'un côté
aux courriels venant des nos sources (structures et services Dora uniquement pour l'instant) et
par ailleurs aux données de contact Brevo.
Chaque jour le DAG `main` se chargera de créer la table `int__union_contacts__enhanced` qui relie courriels venant
de nos sources de données, et courriels connus de Brevo (avec succès de l'envoi ou non, date d'opposition...)

### Notification
Chaque mois, le DAG `notify_rgpd_contacts` tire une liste de contacts depuis les sources qui nous
intéressent (aujourd'hui, Dora uniquement) et en tire une liste de "nouveaux contacts".
Chaque mois, le DAG `notify_rgpd_contacts` tire une liste de courriels connus depuis nos sources.

Pour cela, il compare les `contact_uid` (combinaison de source, stream et UID) d'un contact enregistrés
dans Brevo et ceux provenant de Dora;
- si le `contact_uid` n'existe pas dans Brevo, ET que l'email n'existe pas non plus, c'est un nouveau contact.
- si le `contact_uid` n'existe pas dans Brevo, mais que l'email est connu, on va mettre à jour ce contact.
Ceux n'ayant jamais fait l'objet d'une notification seront déposés dans une liste qui servira à l'envoi
de la prochaine notification; tous les contacts sont par ailleurs ajoutés à la liste de "tous les contacts
connus", qui est celle importée régulièrement par `import_brevo`.

A noter que la mise à jour d'un contact va écraser les `contact_uids` par exemple, mais ne touchera pas à la
date d'opposition RGPD.
Puis nous déclenchons l'envoi d'une nouvelle campagne de notification RGPD pour le mois courant, envoyée
à la liste des "nouveaux emails" connus.

Dans tous les cas, si on a déjà connu ce `contact_uid` dans Brevo, quelle que soit l'adresse mail
associée, nous ne recontacterons pas.

Ainsi, une fois la liste des "nouveaux contacts" établie, le DAG se charge de:

1. Enregistrer dans Brevo la liste de tous les contacts ayant été contactés
2. Remettre à zéro et remplir dans Brevo la liste des "contacts à contacter ce mois-ci"
3. Déclencher une nouvelle campagne de notification dans Brevo et l'envoyer

L'avantage d'utiliser des campagnes indépendantes mensuelles est que ces dernières permettent l'utilisation
des dashboards analytiques poussées de Brevo.

## Reste à faire

### Intégration dans l'API
Il faut intégrer les emails des sources Dora (et Emplois !) dans l'API.

Cela va concerner:

- toutes les adresses connues (notification envoyée à ce jour ou non)
- non "opposés" via le DPO dans Brevo
- non "unsubscribed"
- ipour le confort des utilisateurs, non hardbouncés bar Brevo : le mail est forcément invalide.
Par le passé nous maintenions une liste des structures et services ayant été liés à un email, grâce à un attribut
spécial sous forme de liste, attaché à chaque contact et envoyé à Brevo. Nous avons remarqué au bout d'un an,
que aucune source ne modifiait ses courriels; nous avonc donc cessé de maintenir ce système complexe, mais il en
vperron marked this conversation as resolved.
Show resolved Hide resolved
reste des traces dans certains contacts Brevo. Ils ne sont plus mis à jour.
52 changes: 14 additions & 38 deletions pipeline/dags/notify_rgpd_contacts.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,53 +12,29 @@


def _sync_new_contacts_to_brevo():
from collections import defaultdict

from airflow.models import Variable

from dag_utils import constants, pg
from dag_utils.sources import brevo

potential_contacts = pg.hook().get_records(
sql=(
"SELECT courriel, ARRAY_AGG(contact_uid) as contact_uids "
"FROM public_intermediate.int__union_contacts "
"GROUP BY courriel"
)
)

brevo_contacts = pg.hook().get_records(
contacts = pg.hook().get_records(
sql=(
"SELECT courriel, contact_uids "
"FROM public_intermediate.int_brevo__contacts"
"""
SELECT
json_build_object('email', our_contacts.courriel),
brevo_contacts.has_hardbounced
FROM public_intermediate.int__union_contacts AS our_contacts
LEFT JOIN public_intermediate.int_brevo__contacts AS brevo_contacts
ON our_contacts.courriel = brevo_contacts.courriel
GROUP BY our_contacts.courriel, brevo_contacts.has_hardbounced
ORDER BY our_contacts.courriel
"""
)
)

brevo_contacts_uid_set = {
uid for _, contact_uids in brevo_contacts for uid in contact_uids
}
brevo_contacts_map = {email: contact_uids for email, contact_uids in brevo_contacts}

# We only consider a contact as "new" if BOTH its contact uid AND its email are new.
# If the email is not new but linked to a new contact UID, update the associated
# list accordingly.
new_contacts_map = defaultdict(list)
for email, contact_uids in potential_contacts:
for contact_uid in contact_uids:
if contact_uid not in brevo_contacts_uid_set:
if email in brevo_contacts_map:
brevo_contacts_map[email].append(contact_uid)
else:
new_contacts_map[email].append(contact_uid)

new_contacts = [
{"email": email, "attributes": {"contact_uids": ",".join(contact_uids)}}
for email, contact_uids in new_contacts_map.items()
]
all_contacts = new_contacts + [
{"email": email, "attributes": {"contact_uids": ",".join(contact_uids)}}
for email, contact_uids in brevo_contacts_map.items()
]
# new contacts : all those for which we don't even know if they bounced or not
new_contacts = [c[0] for c in contacts if c[1] is None]
all_contacts = [c[0] for c in contacts]
vperron marked this conversation as resolved.
Show resolved Hide resolved

brevo_client = brevo.BrevoClient(token=Variable.get("BREVO_API_KEY"))

Expand Down
29 changes: 12 additions & 17 deletions pipeline/dbt/models/intermediate/_models.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,6 @@ models:

- name: int__union_contacts
columns:
- name: contact_uid
data_tests:
- unique
- not_null
- name: courriel
data_tests:
- not_null
Expand Down Expand Up @@ -260,23 +256,22 @@ unit_tests:
given:
- input: ref('int__union_contacts')
rows:
- {"source": "dora", "id": "foo", "courriel": "[email protected]", "contact_nom_prenom": "Jean Valjean", "telephone": "0123456789"}
- {"source": "mednum", "id": "bar", "courriel": "[email protected]", "contact_nom_prenom": "Jean Valjean", "telephone": "0123456789"}
- {"source": "lezemplois", "id": "baz", "courriel": "[email protected]", "contact_nom_prenom": "Jean Valjean", "telephone": "0123456789"}
- {"source": "dora", "id": "foo", "courriel": "[email protected]", "contact_nom_prenom": "Jean Valjean", "telephone": "0123456789"}
- {"source": "other", "id": "foo", "courriel": "[email protected]", "contact_nom_prenom": "Jean Other", "telephone": "9876543210"}
- {"source": "mednum", "id": "bar", "courriel": "[email protected]", "contact_nom_prenom": "Jean Valjean", "telephone": "0123456789"}
- {"source": "lezemplois", "id": "baz", "courriel": "[email protected]", "contact_nom_prenom": "Jean Valjean", "telephone": "0123456789"}
- {"source": "autre", "id": "solo", "courriel": "[email protected]", "contact_nom_prenom": "Autre Personne", "telephone": "06666666"}
- input: ref('int_brevo__contacts')
format: sql
rows:
select ARRAY['dora:services:foo'] as contact_uids, TRUE as est_interdit, '2021-01-01' as date_di_rgpd_opposition
UNION ALL
select ARRAY['mednum:services:bar'] as contact_uids, FALSE as est_interdit, '2021-01-01' as date_di_rgpd_opposition
UNION ALL
select ARRAY['lezemplois:services:baz'] as contact_uids, FALSE as est_interdit, NULL as date_di_rgpd_opposition
UNION ALL
select ARRAY['sans:services:rapport'] as contact_uids, TRUE as est_interdit, NULL as date_di_rgpd_opposition
# note: values can't be NULL. has_hardbounced is set to true/false by brevo, and was_objected to is set to true/false by us.
- {"courriel": "[email protected]", "has_hardbounced": true, "was_objected_to": true}
- {"courriel": "[email protected]", "has_hardbounced": false, "was_objected_to": true}
- {"courriel": "[email protected]", "has_hardbounced": false, "was_objected_to": false}
- {"courriel": "[email protected]", "has_hardbounced": true, "was_objected_to": false}
expect:
rows:
- {source: dora, id: foo, courriel: NULL, contact_nom_prenom: NULL, telephone: NULL} # objected to, hardbounce
- {source: other, id: foo, courriel: NULL, contact_nom_prenom: NULL, telephone: NULL} # objected to, hardbounce
- {source: mednum, id: bar, courriel: NULL, contact_nom_prenom: NULL, telephone: NULL} # objected to, no hardbounce
- {source: lezemplois, id: baz, courriel: truc@toto.at, contact_nom_prenom: Jean Valjean, telephone: '0123456789'} # not objected to, no hardbounce
- {source: autre, id: solo, courriel: [email protected], contact_nom_prenom: Autre Personne, telephone: '06666666'} # hardbounce : only email is hidden
vmttn marked this conversation as resolved.
Show resolved Hide resolved
- {source: lezemplois, id: baz, courriel: truc3@toto.at, contact_nom_prenom: Jean Valjean, telephone: '0123456789'} # not objected to, no hardbounce
- {source: autre, id: solo, courriel: NULL, contact_nom_prenom: Autre Personne, telephone: '06666666'} # hardbounce : only email is hidden
11 changes: 7 additions & 4 deletions pipeline/dbt/models/intermediate/brevo/_brevo__models.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,20 @@ models:
- not_null
- dbt_utils.not_constant
- dbt_utils.not_empty_string
- name: est_interdit
- name: has_hardbounced
description: |
Signifie que cet e-mail est considéré par Brevo comme:
- un "hard bounce" (n'existe pas)
- a fait l'objet d'une demande de désincription explicite.
- a classé le mail en spam
- a été classé "bloqué" par un administrateur Brevo
data_tests:
- not_null
- accepted_values:
values: [true, false]

- name: date_di_rgpd_opposition
description: Date d'opposition au RGPD telle qu'entrée par notre DPO dans Brevo lorsque signalé.
- name: was_objected_to
description: |
Permet de savoir si le contact a fait l'objet d'une demande de retrait RGPD.
data_tests:
- accepted_values:
values: [true, false]
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,10 @@ WITH contacts AS (

final AS (
SELECT
id AS "id",
email AS "courriel",
email_blacklisted AS "est_interdit",
date_di_rgpd_opposition AS "date_di_rgpd_opposition",
STRING_TO_ARRAY(contact_uids, ',') AS "contact_uids"
id AS "id",
email AS "courriel",
email_blacklisted AS "has_hardbounced",
date_di_rgpd_opposition IS NOT NULL AS "was_objected_to"
FROM contacts
)

Expand Down
3 changes: 1 addition & 2 deletions pipeline/dbt/models/intermediate/int__union_contacts.sql
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,7 @@ final AS (
source AS "source",
courriel AS "courriel",
telephone AS "telephone",
contact_nom_prenom AS "contact_nom_prenom",
contact_uid AS "contact_uid"
contact_nom_prenom AS "contact_nom_prenom"
FROM contacts_union
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,12 @@ WITH contacts AS (
SELECT * FROM {{ ref('int__union_contacts') }}
),

brevo_contacts AS (
SELECT * FROM {{ ref('int_brevo__contacts') }}
),

rgpd_notices AS (
SELECT DISTINCT
SPLIT_PART(contact_uid, ':', 1) AS "source",
SPLIT_PART(contact_uid, ':', 3) AS "id",
date_di_rgpd_opposition IS NOT NULL AS "was_objected_to",
est_interdit AS "has_hardbounced"
FROM brevo_contacts,
UNNEST(contact_uids) AS contact_uid
SELECT
courriel,
has_hardbounced,
was_objected_to
FROM {{ ref('int_brevo__contacts') }}
),

final AS (
Expand All @@ -23,19 +17,19 @@ final AS (
contacts.source AS "source",
CASE
WHEN
rgpd_notices.id IS NULL
rgpd_notices.courriel IS NULL
OR NOT rgpd_notices.was_objected_to
THEN contacts.contact_nom_prenom
END AS "contact_nom_prenom",
CASE
WHEN
rgpd_notices.id IS NULL
rgpd_notices.courriel IS NULL
OR (NOT rgpd_notices.was_objected_to AND NOT rgpd_notices.has_hardbounced)
THEN contacts.courriel
END AS "courriel",
CASE
WHEN
rgpd_notices.id IS NULL
rgpd_notices.courriel IS NULL
OR NOT rgpd_notices.was_objected_to
THEN contacts.telephone
END AS "telephone",
Expand All @@ -44,8 +38,7 @@ final AS (
FROM contacts
LEFT JOIN rgpd_notices
ON
contacts.source = rgpd_notices.source
AND contacts.id = rgpd_notices.id
contacts.courriel = rgpd_notices.courriel
)

SELECT * FROM final
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,6 @@ models:

- name: int_dora__contacts
columns:
- name: contact_uid
data_tests:
- unique
- not_null
- name: courriel
data_tests:
- not_null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,7 @@ WITH structure_contacts AS (
_di_source_id AS "source",
courriel AS "courriel",
telephone AS "telephone",
NULL AS "contact_nom_prenom",
'dora:structures:' || id AS "contact_uid"
NULL AS "contact_nom_prenom"
FROM {{ ref('stg_dora__structures') }}
WHERE courriel IS NOT NULL
),
Expand All @@ -16,8 +15,7 @@ service_contacts AS (
_di_source_id AS "source",
courriel AS "courriel",
telephone AS "telephone",
contact_nom_prenom AS "contact_nom_prenom",
'dora:services:' || id AS "contact_uid"
contact_nom_prenom AS "contact_nom_prenom"
FROM {{ ref('stg_dora__services') }}
WHERE courriel IS NOT NULL
),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,6 @@ models:

- name: int_mediation_numerique__contacts
columns:
- name: contact_uid
data_tests:
- unique
- not_null
- name: courriel
data_tests:
- not_null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,7 @@ WITH final AS (
_di_source_id AS "source",
courriel AS "courriel",
telephone AS "telephone",
NULL AS "contact_nom_prenom",
-- services in mediation numerique have no contact information
'mediation-numerique:structures:' || id AS "contact_uid"
NULL AS "contact_nom_prenom"
FROM {{ ref('stg_mediation_numerique__structures') }}
WHERE courriel IS NOT NULL
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ models:
- unique
- not_null
- dbt_utils.not_empty_string

- name: int_mes_aides__structures
data_tests:
- check_structure:
Expand Down Expand Up @@ -56,11 +55,7 @@ models:
field: id
- name: int_mes_aides__contacts
columns:
- name: contact_uid
data_tests:
- unique
- not_null
- name: courriel
data_tests:
- not_null
- dbt_utils.not_empty_string
- dbt_utils.not_empty_string
Original file line number Diff line number Diff line change
@@ -1,15 +1,21 @@
WITH structure_contacts AS (
SELECT
email AS "courriel",
'mes-aides:garages:' || id AS contact_uid
id AS "id",
_di_source_id AS "source",
email AS "courriel",
telephone AS "telephone",
NULL AS "contact_nom_prenom"
FROM {{ ref('stg_mes_aides__garages') }}
WHERE email IS NOT NULL
),

service_contacts AS (
SELECT
contact_email AS "courriel",
'mes-aides:aides:' || id AS contact_uid
id AS "id",
_di_source_id AS "source",
contact_email AS "courriel",
NULL AS "telephone",
NULL AS "contact_nom_prenom"
FROM {{ ref('stg_mes_aides__permis_velo') }}
WHERE contact_email IS NOT NULL
),
Expand Down
Loading