Skip to content

Commit

Permalink
feat(pipeline) : Simplify notify_rgpd_contacts
Browse files Browse the repository at this point in the history
After an entire year of operation, we noticed that our sources never
change their contact emails. Ever.

Even if someday this happens, we can probably suppose that it's till
going to be quite rare.

In those exceptional cases, it means that the structure would get a new
RGPD notification.

But it does make the code way easier to maintain and more robust:
- now we will never miss an email that would have been "forgotten" by
  Brevo (due to rate limits, payment issues, ...)
- it can also be seen as a feature to send a new notification if the
  email changes, for instance when the previous one was incorrect.

Moreover, we don't need to maintain the `contact_uid` list that was sent
to brevo.
  • Loading branch information
vperron committed Oct 24, 2024
1 parent 30bb4a1 commit 5902e9f
Show file tree
Hide file tree
Showing 15 changed files with 80 additions and 153 deletions.
56 changes: 17 additions & 39 deletions pipeline/RGPD.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,57 +7,35 @@ data⋅inclusion.
En l'absence de réponse de leur part, comme vu dans les CGAUs de data⋅inclusion, nous publierons le courriel en
question dans le jeu de données disponible dans l'API.

A date (fin 2023), nous ne proposons l'affichage que des courriels renseignés dans les structures et services DORA.
L'intérêt secondaire pour data⋅inclusion de ces notification est de permettre une validation poussée des e-mails de contact
ainsi notifiés; nous savons donc pour chaque courriel, s'il est digne de confiance ou non.

## Déroulement technique

### Imports Brevo
Chaque jour le DAG `import_brevo` récupère la liste à jour des personnes contactées lors des notifications RGPD.
Pour ce faire il réalise un extract + load (datalake puis datawarehouse) des données depuis l'API Brevo.

A noter que les contacts enregistrés dans Brevo sont identifiés par:
A noter que les contacts enregistrés dans Brevo contiennent a minima:
- leur email (qui est la clé unique de renseignement d'un contact Brevo)
- un ensemble d'attributs, dont:
* la date d'opposition RGPD (remplie par Osiris)
* la liste des structures dont l'adresse email est membre
- le fait que cet email ait donné lieu à un envoi ou non, avec ou sans 'hardbounce'
- un ensemble d'attributs, dont la date d'opposition RGPD (remplie par Osiris)

### Traitements de données
Chaque jour le DAG `main` se chargera de créer les tables (staging & intermediate) liées d'un côté
aux courriels venant des nos sources (structures et services Dora uniquement pour l'instant) et
par ailleurs aux données de contact Brevo.
Chaque jour le DAG `main` se chargera de créer la table `int__union_contacts__enhanced` qui relie courriels venant
de nos sources de données, et courriels connus de Brevo (avec succès de l'envoi ou non, date d'opposition...)

### Notification
Chaque mois, le DAG `notify_rgpd_contacts` tire une liste de contacts depuis les sources qui nous
intéressent (aujourd'hui, Dora uniquement) et en tire une liste de "nouveaux contacts".
Chaque mois, le DAG `notify_rgpd_contacts` tire une liste de courriels connus depuis nos sources.

Pour cela, il compare les `contact_uid` (combinaison de source, stream et UID) d'un contact enregistrés
dans Brevo et ceux provenant de Dora;
- si le `contact_uid` n'existe pas dans Brevo, ET que l'email n'existe pas non plus, c'est un nouveau contact.
- si le `contact_uid` n'existe pas dans Brevo, mais que l'email est connu, on va mettre à jour ce contact.
Ceux n'ayant jamais fait l'objet d'une notification seront déposés dans une liste qui servira à l'envoi
de la prochaine notification; tous les contacts sont par ailleurs ajoutés à la liste de "tous les contacts
connus", qui est celle importée régulièrement par `import_brevo`.

A noter que la mise à jour d'un contact va écraser les `contact_uids` par exemple, mais ne touchera pas à la
date d'opposition RGPD.
Puis nous déclenchons l'envoi d'une nouvelle campagne de notification RGPD pour le mois courant, envoyée
à la liste des "nouveaux emails" connus.

Dans tous les cas, si on a déjà connu ce `contact_uid` dans Brevo, quelle que soit l'adresse mail
associée, nous ne recontacterons pas.

Ainsi, une fois la liste des "nouveaux contacts" établie, le DAG se charge de:

1. Enregistrer dans Brevo la liste de tous les contacts ayant été contactés
2. Remettre à zéro et remplir dans Brevo la liste des "contacts à contacter ce mois-ci"
3. Déclencher une nouvelle campagne de notification dans Brevo et l'envoyer

L'avantage d'utiliser des campagnes indépendantes mensuelles est que ces dernières permettent l'utilisation
des dashboards analytiques poussées de Brevo.

## Reste à faire

### Intégration dans l'API
Il faut intégrer les emails des sources Dora (et Emplois !) dans l'API.

Cela va concerner:

- toutes les adresses connues (notification envoyée à ce jour ou non)
- non "opposés" via le DPO dans Brevo
- non "unsubscribed"
- ipour le confort des utilisateurs, non hardbouncés bar Brevo : le mail est forcément invalide.
Par le passé nous maintenions une liste des structures et services ayant été liés à un email, grâce à un attribut
spécial sous forme de liste, attaché à chaque contact et envoyé à Brevo. Nous avons remarqué au bout d'un an,
que aucune source ne modifiait ses courriels; nous avonc donc cessé de maintenir ce système complexe, mais il en
reste des traces dans certains contacts Brevo. Ils ne sont plus mis à jour.
50 changes: 12 additions & 38 deletions pipeline/dags/notify_rgpd_contacts.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,53 +12,27 @@


def _sync_new_contacts_to_brevo():
from collections import defaultdict

from airflow.models import Variable

from dag_utils import constants, pg
from dag_utils.sources import brevo

potential_contacts = pg.hook().get_records(
sql=(
"SELECT courriel, ARRAY_AGG(contact_uid) as contact_uids "
"FROM public_intermediate.int__union_contacts "
"GROUP BY courriel"
)
)

brevo_contacts = pg.hook().get_records(
contacts = pg.hook().get_records(
sql=(
"SELECT courriel, contact_uids "
"FROM public_intermediate.int_brevo__contacts"
"""
SELECT
json_build_object('email', _courriel_original),
rgpd_notice_has_hardbounced
FROM public_intermediate.int__union_contacts__enhanced
GROUP BY _courriel_original, rgpd_notice_has_hardbounced
ORDER BY _courriel_original
"""
)
)

brevo_contacts_uid_set = {
uid for _, contact_uids in brevo_contacts for uid in contact_uids
}
brevo_contacts_map = {email: contact_uids for email, contact_uids in brevo_contacts}

# We only consider a contact as "new" if BOTH its contact uid AND its email are new.
# If the email is not new but linked to a new contact UID, update the associated
# list accordingly.
new_contacts_map = defaultdict(list)
for email, contact_uids in potential_contacts:
for contact_uid in contact_uids:
if contact_uid not in brevo_contacts_uid_set:
if email in brevo_contacts_map:
brevo_contacts_map[email].append(contact_uid)
else:
new_contacts_map[email].append(contact_uid)

new_contacts = [
{"email": email, "attributes": {"contact_uids": ",".join(contact_uids)}}
for email, contact_uids in new_contacts_map.items()
]
all_contacts = new_contacts + [
{"email": email, "attributes": {"contact_uids": ",".join(contact_uids)}}
for email, contact_uids in brevo_contacts_map.items()
]
# new contacts : all those for which we don't even know if they bounced or not
new_contacts = [c[0] for c in contacts if c[1] is None]
all_contacts = [c[0] for c in contacts]

brevo_client = brevo.BrevoClient(token=Variable.get("BREVO_API_KEY"))

Expand Down
29 changes: 12 additions & 17 deletions pipeline/dbt/models/intermediate/_models.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,6 @@ models:

- name: int__union_contacts
columns:
- name: contact_uid
data_tests:
- unique
- not_null
- name: courriel
data_tests:
- not_null
Expand All @@ -43,6 +39,7 @@ models:
- name: int__union_contacts__enhanced
columns:
- name: _di_surrogate_id
- name: _courriel_original
- name: id
- name: source
- name: contact_nom_prenom
Expand Down Expand Up @@ -241,23 +238,21 @@ unit_tests:
given:
- input: ref('int__union_contacts')
rows:
- {"source": "dora", "id": "foo", "courriel": "[email protected]", "contact_nom_prenom": "Jean Valjean", "telephone": "0123456789"}
- {"source": "mednum", "id": "bar", "courriel": "[email protected]", "contact_nom_prenom": "Jean Valjean", "telephone": "0123456789"}
- {"source": "lezemplois", "id": "baz", "courriel": "[email protected]", "contact_nom_prenom": "Jean Valjean", "telephone": "0123456789"}
- {"source": "dora", "id": "foo", "courriel": "[email protected]", "contact_nom_prenom": "Jean Valjean", "telephone": "0123456789"}
- {"source": "other", "id": "foo", "courriel": "[email protected]", "contact_nom_prenom": "Jean Other", "telephone": "9876543210"}
- {"source": "mednum", "id": "bar", "courriel": "[email protected]", "contact_nom_prenom": "Jean Valjean", "telephone": "0123456789"}
- {"source": "lezemplois", "id": "baz", "courriel": "[email protected]", "contact_nom_prenom": "Jean Valjean", "telephone": "0123456789"}
- {"source": "autre", "id": "solo", "courriel": "[email protected]", "contact_nom_prenom": "Autre Personne", "telephone": "06666666"}
- input: ref('int_brevo__contacts')
format: sql
rows:
select ARRAY['dora:services:foo'] as contact_uids, TRUE as est_interdit, '2021-01-01' as date_di_rgpd_opposition
UNION ALL
select ARRAY['mednum:services:bar'] as contact_uids, FALSE as est_interdit, '2021-01-01' as date_di_rgpd_opposition
UNION ALL
select ARRAY['lezemplois:services:baz'] as contact_uids, FALSE as est_interdit, NULL as date_di_rgpd_opposition
UNION ALL
select ARRAY['sans:services:rapport'] as contact_uids, TRUE as est_interdit, NULL as date_di_rgpd_opposition
- {"courriel": "[email protected]", "has_hardbounced": true, "was_objected_to": true}
- {"courriel": "[email protected]", "has_hardbounced": false, "was_objected_to": true}
- {"courriel": "[email protected]", "has_hardbounced": false, "was_objected_to": null}
- {"courriel": "[email protected]", "has_hardbounced": true, "was_objected_to": null}
expect:
rows:
- {source: dora, id: foo, courriel: NULL, contact_nom_prenom: NULL, telephone: NULL} # objected to, hardbounce
- {source: other, id: foo, courriel: NULL, contact_nom_prenom: NULL, telephone: NULL} # objected to, hardbounce
- {source: mednum, id: bar, courriel: NULL, contact_nom_prenom: NULL, telephone: NULL} # objected to, no hardbounce
- {source: lezemplois, id: baz, courriel: truc@toto.at, contact_nom_prenom: Jean Valjean, telephone: '0123456789'} # not objected to, no hardbounce
- {source: autre, id: solo, courriel: [email protected], contact_nom_prenom: Autre Personne, telephone: '06666666'} # hardbounce : only email is hidden
- {source: lezemplois, id: baz, courriel: truc3@toto.at, contact_nom_prenom: Jean Valjean, telephone: '0123456789'} # not objected to, no hardbounce
- {source: autre, id: solo, courriel: NULL, contact_nom_prenom: Autre Personne, telephone: '06666666'} # hardbounce : only email is hidden
11 changes: 7 additions & 4 deletions pipeline/dbt/models/intermediate/brevo/_brevo__models.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,20 @@ models:
- not_null
- dbt_utils.not_constant
- dbt_utils.not_empty_string
- name: est_interdit
- name: has_hardbounced
description: |
Signifie que cet e-mail est considéré par Brevo comme:
- un "hard bounce" (n'existe pas)
- a fait l'objet d'une demande de désincription explicite.
- a classé le mail en spam
- a été classé "bloqué" par un administrateur Brevo
data_tests:
- not_null
- accepted_values:
values: [true, false]

- name: date_di_rgpd_opposition
description: Date d'opposition au RGPD telle qu'entrée par notre DPO dans Brevo lorsque signalé.
- name: was_objected_to
description: |
Permet de savoir si le contact a fait l'objet d'une demande de retrait RGPD.
data_tests:
- accepted_values:
values: [true, false]
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,8 @@ final AS (
SELECT
id AS "id",
email AS "courriel",
email_blacklisted AS "est_interdit",
date_di_rgpd_opposition AS "date_di_rgpd_opposition",
STRING_TO_ARRAY(contact_uids, ',') AS "contact_uids"
email_blacklisted AS "has_hardbounced",
date_di_rgpd_opposition IS NOT NULL AS "was_objected_to"
FROM contacts
)

Expand Down
3 changes: 1 addition & 2 deletions pipeline/dbt/models/intermediate/int__union_contacts.sql
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,7 @@ final AS (
source AS "source",
courriel AS "courriel",
telephone AS "telephone",
contact_nom_prenom AS "contact_nom_prenom",
contact_uid AS "contact_uid"
contact_nom_prenom AS "contact_nom_prenom"
FROM contacts_union
)

Expand Down
34 changes: 15 additions & 19 deletions pipeline/dbt/models/intermediate/int__union_contacts__enhanced.sql
Original file line number Diff line number Diff line change
Expand Up @@ -2,50 +2,46 @@ WITH contacts AS (
SELECT * FROM {{ ref('int__union_contacts') }}
),

brevo_contacts AS (
SELECT * FROM {{ ref('int_brevo__contacts') }}
),

rgpd_notices AS (
SELECT DISTINCT
SPLIT_PART(contact_uid, ':', 1) AS "source",
SPLIT_PART(contact_uid, ':', 3) AS "id",
date_di_rgpd_opposition IS NOT NULL AS "was_objected_to",
est_interdit AS "has_hardbounced"
FROM brevo_contacts,
UNNEST(contact_uids) AS contact_uid
SELECT
courriel,
has_hardbounced,
was_objected_to,
was_objected_to IS TRUE AS was_objected_to_truthy,
has_hardbounced IS TRUE AS has_hardbounced_truthy
FROM {{ ref('int_brevo__contacts') }}
),

final AS (
SELECT
contacts.source || '-' || contacts.id AS "_di_surrogate_id",
contacts.id AS "id",
contacts.source AS "source",
contacts.courriel AS "_courriel_original",
CASE
WHEN
rgpd_notices.id IS NULL
OR NOT rgpd_notices.was_objected_to
rgpd_notices.courriel IS NULL
OR NOT rgpd_notices.was_objected_to_truthy
THEN contacts.contact_nom_prenom
END AS "contact_nom_prenom",
CASE
WHEN
rgpd_notices.id IS NULL
OR (NOT rgpd_notices.was_objected_to AND NOT rgpd_notices.has_hardbounced)
rgpd_notices.courriel IS NULL
OR (NOT rgpd_notices.was_objected_to_truthy AND NOT rgpd_notices.has_hardbounced_truthy)
THEN contacts.courriel
END AS "courriel",
CASE
WHEN
rgpd_notices.id IS NULL
OR NOT rgpd_notices.was_objected_to
rgpd_notices.courriel IS NULL
OR NOT rgpd_notices.was_objected_to_truthy
THEN contacts.telephone
END AS "telephone",
rgpd_notices.was_objected_to AS "rgpd_notice_was_objected_to",
rgpd_notices.has_hardbounced AS "rgpd_notice_has_hardbounced"
FROM contacts
LEFT JOIN rgpd_notices
ON
contacts.source = rgpd_notices.source
AND contacts.id = rgpd_notices.id
contacts.courriel = rgpd_notices.courriel
)

SELECT * FROM final
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,6 @@ models:

- name: int_dora__contacts
columns:
- name: contact_uid
data_tests:
- unique
- not_null
- name: courriel
data_tests:
- not_null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,7 @@ WITH structure_contacts AS (
_di_source_id AS "source",
courriel AS "courriel",
telephone AS "telephone",
NULL AS "contact_nom_prenom",
'dora:structures:' || id AS "contact_uid"
NULL AS "contact_nom_prenom"
FROM {{ ref('stg_dora__structures') }}
WHERE courriel IS NOT NULL
),
Expand All @@ -16,8 +15,7 @@ service_contacts AS (
_di_source_id AS "source",
courriel AS "courriel",
telephone AS "telephone",
contact_nom_prenom AS "contact_nom_prenom",
'dora:services:' || id AS "contact_uid"
contact_nom_prenom AS "contact_nom_prenom"
FROM {{ ref('stg_dora__services') }}
WHERE courriel IS NOT NULL
),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,6 @@ models:

- name: int_mediation_numerique__contacts
columns:
- name: contact_uid
data_tests:
- unique
- not_null
- name: courriel
data_tests:
- not_null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,7 @@ WITH final AS (
_di_source_id AS "source",
courriel AS "courriel",
telephone AS "telephone",
NULL AS "contact_nom_prenom",
-- services in mediation numerique have no contact information
'mediation-numerique:structures:' || id AS "contact_uid"
NULL AS "contact_nom_prenom"
FROM {{ ref('stg_mediation_numerique__structures') }}
WHERE courriel IS NOT NULL
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ models:
- unique
- not_null
- dbt_utils.not_empty_string

- name: int_mes_aides__structures
data_tests:
- check_structure:
Expand Down Expand Up @@ -56,11 +55,7 @@ models:
field: id
- name: int_mes_aides__contacts
columns:
- name: contact_uid
data_tests:
- unique
- not_null
- name: courriel
data_tests:
- not_null
- dbt_utils.not_empty_string
- dbt_utils.not_empty_string
Loading

0 comments on commit 5902e9f

Please sign in to comment.