diff --git a/pipeline/RGPD.md b/pipeline/RGPD.md index 14c41cbca..952fad0e3 100644 --- a/pipeline/RGPD.md +++ b/pipeline/RGPD.md @@ -7,7 +7,8 @@ data⋅inclusion. En l'absence de réponse de leur part, comme vu dans les CGAUs de data⋅inclusion, nous publierons le courriel en question dans le jeu de données disponible dans l'API. -A date (fin 2023), nous ne proposons l'affichage que des courriels renseignés dans les structures et services DORA. +L'intérêt secondaire pour data⋅inclusion de ces notification est de permettre une validation poussée des e-mails de contact +ainsi notifiés; nous savons donc pour chaque courriel, s'il est digne de confiance ou non. ## Déroulement technique @@ -15,49 +16,26 @@ A date (fin 2023), nous ne proposons l'affichage que des courriels renseignés d Chaque jour le DAG `import_brevo` récupère la liste à jour des personnes contactées lors des notifications RGPD. Pour ce faire il réalise un extract + load (datalake puis datawarehouse) des données depuis l'API Brevo. -A noter que les contacts enregistrés dans Brevo sont identifiés par: +A noter que les contacts enregistrés dans Brevo contiennent a minima: - leur email (qui est la clé unique de renseignement d'un contact Brevo) -- un ensemble d'attributs, dont: - * la date d'opposition RGPD (remplie par Osiris) - * la liste des structures dont l'adresse email est membre +- le fait que cet email ait donné lieu à un envoi ou non, avec ou sans 'hardbounce' +- un ensemble d'attributs, dont la date d'opposition RGPD (remplie par Osiris) ### Traitements de données -Chaque jour le DAG `main` se chargera de créer les tables (staging & intermediate) liées d'un côté -aux courriels venant des nos sources (structures et services Dora uniquement pour l'instant) et -par ailleurs aux données de contact Brevo. +Chaque jour le DAG `main` se chargera de créer la table `int__union_contacts__enhanced` qui relie courriels venant +de nos sources de données, et courriels connus de Brevo (avec succès de l'envoi ou non, date d'opposition...) ### Notification -Chaque mois, le DAG `notify_rgpd_contacts` tire une liste de contacts depuis les sources qui nous -intéressent (aujourd'hui, Dora uniquement) et en tire une liste de "nouveaux contacts". +Chaque mois, le DAG `notify_rgpd_contacts` tire une liste de courriels connus depuis nos sources. -Pour cela, il compare les `contact_uid` (combinaison de source, stream et UID) d'un contact enregistrés -dans Brevo et ceux provenant de Dora; -- si le `contact_uid` n'existe pas dans Brevo, ET que l'email n'existe pas non plus, c'est un nouveau contact. -- si le `contact_uid` n'existe pas dans Brevo, mais que l'email est connu, on va mettre à jour ce contact. +Ceux n'ayant jamais fait l'objet d'une notification seront déposés dans une liste qui servira à l'envoi +de la prochaine notification; tous les contacts sont par ailleurs ajoutés à la liste de "tous les contacts +connus", qui est celle importée régulièrement par `import_brevo`. -A noter que la mise à jour d'un contact va écraser les `contact_uids` par exemple, mais ne touchera pas à la -date d'opposition RGPD. +Puis nous déclenchons l'envoi d'une nouvelle campagne de notification RGPD pour le mois courant, envoyée +à la liste des "nouveaux emails" connus. -Dans tous les cas, si on a déjà connu ce `contact_uid` dans Brevo, quelle que soit l'adresse mail -associée, nous ne recontacterons pas. - -Ainsi, une fois la liste des "nouveaux contacts" établie, le DAG se charge de: - -1. Enregistrer dans Brevo la liste de tous les contacts ayant été contactés -2. Remettre à zéro et remplir dans Brevo la liste des "contacts à contacter ce mois-ci" -3. Déclencher une nouvelle campagne de notification dans Brevo et l'envoyer - -L'avantage d'utiliser des campagnes indépendantes mensuelles est que ces dernières permettent l'utilisation -des dashboards analytiques poussées de Brevo. - -## Reste à faire - -### Intégration dans l'API -Il faut intégrer les emails des sources Dora (et Emplois !) dans l'API. - -Cela va concerner: - -- toutes les adresses connues (notification envoyée à ce jour ou non) -- non "opposés" via le DPO dans Brevo -- non "unsubscribed" -- ipour le confort des utilisateurs, non hardbouncés bar Brevo : le mail est forcément invalide. +Par le passé nous maintenions une liste des structures et services ayant été liés à un email, grâce à un attribut +spécial sous forme de liste, attaché à chaque contact et envoyé à Brevo. Nous avons remarqué au bout d'un an, +que aucune source ne modifiait ses courriels; nous avonc donc cessé de maintenir ce système complexe, mais il en +reste des traces dans certains contacts Brevo. Ils ne sont plus mis à jour. diff --git a/pipeline/dags/notify_rgpd_contacts.py b/pipeline/dags/notify_rgpd_contacts.py index 9d9f5c183..0b6c743be 100644 --- a/pipeline/dags/notify_rgpd_contacts.py +++ b/pipeline/dags/notify_rgpd_contacts.py @@ -12,53 +12,27 @@ def _sync_new_contacts_to_brevo(): - from collections import defaultdict - from airflow.models import Variable from dag_utils import constants, pg from dag_utils.sources import brevo - potential_contacts = pg.hook().get_records( - sql=( - "SELECT courriel, ARRAY_AGG(contact_uid) as contact_uids " - "FROM public_intermediate.int__union_contacts " - "GROUP BY courriel" - ) - ) - - brevo_contacts = pg.hook().get_records( + contacts = pg.hook().get_records( sql=( - "SELECT courriel, contact_uids " - "FROM public_intermediate.int_brevo__contacts" + """ + SELECT + json_build_object('email', _courriel_original), + rgpd_notice_has_hardbounced + FROM public_intermediate.int__union_contacts__enhanced + GROUP BY _courriel_original, rgpd_notice_has_hardbounced + ORDER BY _courriel_original + """ ) ) - brevo_contacts_uid_set = { - uid for _, contact_uids in brevo_contacts for uid in contact_uids - } - brevo_contacts_map = {email: contact_uids for email, contact_uids in brevo_contacts} - - # We only consider a contact as "new" if BOTH its contact uid AND its email are new. - # If the email is not new but linked to a new contact UID, update the associated - # list accordingly. - new_contacts_map = defaultdict(list) - for email, contact_uids in potential_contacts: - for contact_uid in contact_uids: - if contact_uid not in brevo_contacts_uid_set: - if email in brevo_contacts_map: - brevo_contacts_map[email].append(contact_uid) - else: - new_contacts_map[email].append(contact_uid) - - new_contacts = [ - {"email": email, "attributes": {"contact_uids": ",".join(contact_uids)}} - for email, contact_uids in new_contacts_map.items() - ] - all_contacts = new_contacts + [ - {"email": email, "attributes": {"contact_uids": ",".join(contact_uids)}} - for email, contact_uids in brevo_contacts_map.items() - ] + # new contacts : all those for which we don't even know if they bounced or not + new_contacts = [c[0] for c in contacts if c[1] is None] + all_contacts = [c[0] for c in contacts] brevo_client = brevo.BrevoClient(token=Variable.get("BREVO_API_KEY")) diff --git a/pipeline/dbt/models/intermediate/_models.yml b/pipeline/dbt/models/intermediate/_models.yml index af34ee3b3..e839547e8 100644 --- a/pipeline/dbt/models/intermediate/_models.yml +++ b/pipeline/dbt/models/intermediate/_models.yml @@ -13,10 +13,6 @@ models: - name: int__union_contacts columns: - - name: contact_uid - data_tests: - - unique - - not_null - name: courriel data_tests: - not_null @@ -43,6 +39,7 @@ models: - name: int__union_contacts__enhanced columns: - name: _di_surrogate_id + - name: _courriel_original - name: id - name: source - name: contact_nom_prenom @@ -241,23 +238,21 @@ unit_tests: given: - input: ref('int__union_contacts') rows: - - {"source": "dora", "id": "foo", "courriel": "truc@toto.at", "contact_nom_prenom": "Jean Valjean", "telephone": "0123456789"} - - {"source": "mednum", "id": "bar", "courriel": "truc@toto.at", "contact_nom_prenom": "Jean Valjean", "telephone": "0123456789"} - - {"source": "lezemplois", "id": "baz", "courriel": "truc@toto.at", "contact_nom_prenom": "Jean Valjean", "telephone": "0123456789"} + - {"source": "dora", "id": "foo", "courriel": "truc1@toto.at", "contact_nom_prenom": "Jean Valjean", "telephone": "0123456789"} + - {"source": "other", "id": "foo", "courriel": "truc1@toto.at", "contact_nom_prenom": "Jean Other", "telephone": "9876543210"} + - {"source": "mednum", "id": "bar", "courriel": "truc2@toto.at", "contact_nom_prenom": "Jean Valjean", "telephone": "0123456789"} + - {"source": "lezemplois", "id": "baz", "courriel": "truc3@toto.at", "contact_nom_prenom": "Jean Valjean", "telephone": "0123456789"} - {"source": "autre", "id": "solo", "courriel": "solo@autre.net", "contact_nom_prenom": "Autre Personne", "telephone": "06666666"} - input: ref('int_brevo__contacts') - format: sql rows: - select ARRAY['dora:services:foo'] as contact_uids, TRUE as est_interdit, '2021-01-01' as date_di_rgpd_opposition - UNION ALL - select ARRAY['mednum:services:bar'] as contact_uids, FALSE as est_interdit, '2021-01-01' as date_di_rgpd_opposition - UNION ALL - select ARRAY['lezemplois:services:baz'] as contact_uids, FALSE as est_interdit, NULL as date_di_rgpd_opposition - UNION ALL - select ARRAY['sans:services:rapport'] as contact_uids, TRUE as est_interdit, NULL as date_di_rgpd_opposition + - {"courriel": "truc1@toto.at", "has_hardbounced": true, "was_objected_to": true} + - {"courriel": "truc2@toto.at", "has_hardbounced": false, "was_objected_to": true} + - {"courriel": "truc3@toto.at", "has_hardbounced": false, "was_objected_to": null} + - {"courriel": "solo@autre.net", "has_hardbounced": true, "was_objected_to": null} expect: rows: - {source: dora, id: foo, courriel: NULL, contact_nom_prenom: NULL, telephone: NULL} # objected to, hardbounce + - {source: other, id: foo, courriel: NULL, contact_nom_prenom: NULL, telephone: NULL} # objected to, hardbounce - {source: mednum, id: bar, courriel: NULL, contact_nom_prenom: NULL, telephone: NULL} # objected to, no hardbounce - - {source: lezemplois, id: baz, courriel: truc@toto.at, contact_nom_prenom: Jean Valjean, telephone: '0123456789'} # not objected to, no hardbounce - - {source: autre, id: solo, courriel: solo@autre.net, contact_nom_prenom: Autre Personne, telephone: '06666666'} # hardbounce : only email is hidden + - {source: lezemplois, id: baz, courriel: truc3@toto.at, contact_nom_prenom: Jean Valjean, telephone: '0123456789'} # not objected to, no hardbounce + - {source: autre, id: solo, courriel: NULL, contact_nom_prenom: Autre Personne, telephone: '06666666'} # hardbounce : only email is hidden diff --git a/pipeline/dbt/models/intermediate/brevo/_brevo__models.yml b/pipeline/dbt/models/intermediate/brevo/_brevo__models.yml index 67d356d5d..bbc57def2 100644 --- a/pipeline/dbt/models/intermediate/brevo/_brevo__models.yml +++ b/pipeline/dbt/models/intermediate/brevo/_brevo__models.yml @@ -14,7 +14,7 @@ models: - not_null - dbt_utils.not_constant - dbt_utils.not_empty_string - - name: est_interdit + - name: has_hardbounced description: | Signifie que cet e-mail est considéré par Brevo comme: - un "hard bounce" (n'existe pas) @@ -22,9 +22,12 @@ models: - a classé le mail en spam - a été classé "bloqué" par un administrateur Brevo data_tests: - - not_null - accepted_values: values: [true, false] - - name: date_di_rgpd_opposition - description: Date d'opposition au RGPD telle qu'entrée par notre DPO dans Brevo lorsque signalé. + - name: was_objected_to + description: | + Permet de savoir si le contact a fait l'objet d'une demande de retrait RGPD. + data_tests: + - accepted_values: + values: [true, false] diff --git a/pipeline/dbt/models/intermediate/brevo/int_brevo__contacts.sql b/pipeline/dbt/models/intermediate/brevo/int_brevo__contacts.sql index a3f0d8884..be209d80f 100644 --- a/pipeline/dbt/models/intermediate/brevo/int_brevo__contacts.sql +++ b/pipeline/dbt/models/intermediate/brevo/int_brevo__contacts.sql @@ -6,9 +6,8 @@ final AS ( SELECT id AS "id", email AS "courriel", - email_blacklisted AS "est_interdit", - date_di_rgpd_opposition AS "date_di_rgpd_opposition", - STRING_TO_ARRAY(contact_uids, ',') AS "contact_uids" + email_blacklisted AS "has_hardbounced", + date_di_rgpd_opposition IS NOT NULL AS "was_objected_to" FROM contacts ) diff --git a/pipeline/dbt/models/intermediate/int__union_contacts.sql b/pipeline/dbt/models/intermediate/int__union_contacts.sql index 538e26c54..b09f01ee4 100644 --- a/pipeline/dbt/models/intermediate/int__union_contacts.sql +++ b/pipeline/dbt/models/intermediate/int__union_contacts.sql @@ -18,8 +18,7 @@ final AS ( source AS "source", courriel AS "courriel", telephone AS "telephone", - contact_nom_prenom AS "contact_nom_prenom", - contact_uid AS "contact_uid" + contact_nom_prenom AS "contact_nom_prenom" FROM contacts_union ) diff --git a/pipeline/dbt/models/intermediate/int__union_contacts__enhanced.sql b/pipeline/dbt/models/intermediate/int__union_contacts__enhanced.sql index 589f77849..eba32fe00 100644 --- a/pipeline/dbt/models/intermediate/int__union_contacts__enhanced.sql +++ b/pipeline/dbt/models/intermediate/int__union_contacts__enhanced.sql @@ -2,18 +2,14 @@ WITH contacts AS ( SELECT * FROM {{ ref('int__union_contacts') }} ), -brevo_contacts AS ( - SELECT * FROM {{ ref('int_brevo__contacts') }} -), - rgpd_notices AS ( - SELECT DISTINCT - SPLIT_PART(contact_uid, ':', 1) AS "source", - SPLIT_PART(contact_uid, ':', 3) AS "id", - date_di_rgpd_opposition IS NOT NULL AS "was_objected_to", - est_interdit AS "has_hardbounced" - FROM brevo_contacts, - UNNEST(contact_uids) AS contact_uid + SELECT + courriel, + has_hardbounced, + was_objected_to, + was_objected_to IS TRUE AS was_objected_to_truthy, + has_hardbounced IS TRUE AS has_hardbounced_truthy + FROM {{ ref('int_brevo__contacts') }} ), final AS ( @@ -21,22 +17,23 @@ final AS ( contacts.source || '-' || contacts.id AS "_di_surrogate_id", contacts.id AS "id", contacts.source AS "source", + contacts.courriel AS "_courriel_original", CASE WHEN - rgpd_notices.id IS NULL - OR NOT rgpd_notices.was_objected_to + rgpd_notices.courriel IS NULL + OR NOT rgpd_notices.was_objected_to_truthy THEN contacts.contact_nom_prenom END AS "contact_nom_prenom", CASE WHEN - rgpd_notices.id IS NULL - OR (NOT rgpd_notices.was_objected_to AND NOT rgpd_notices.has_hardbounced) + rgpd_notices.courriel IS NULL + OR (NOT rgpd_notices.was_objected_to_truthy AND NOT rgpd_notices.has_hardbounced_truthy) THEN contacts.courriel END AS "courriel", CASE WHEN - rgpd_notices.id IS NULL - OR NOT rgpd_notices.was_objected_to + rgpd_notices.courriel IS NULL + OR NOT rgpd_notices.was_objected_to_truthy THEN contacts.telephone END AS "telephone", rgpd_notices.was_objected_to AS "rgpd_notice_was_objected_to", @@ -44,8 +41,7 @@ final AS ( FROM contacts LEFT JOIN rgpd_notices ON - contacts.source = rgpd_notices.source - AND contacts.id = rgpd_notices.id + contacts.courriel = rgpd_notices.courriel ) SELECT * FROM final diff --git a/pipeline/dbt/models/intermediate/sources/dora/_dora__models.yml b/pipeline/dbt/models/intermediate/sources/dora/_dora__models.yml index df17232ed..190fd9512 100644 --- a/pipeline/dbt/models/intermediate/sources/dora/_dora__models.yml +++ b/pipeline/dbt/models/intermediate/sources/dora/_dora__models.yml @@ -14,10 +14,6 @@ models: - name: int_dora__contacts columns: - - name: contact_uid - data_tests: - - unique - - not_null - name: courriel data_tests: - not_null diff --git a/pipeline/dbt/models/intermediate/sources/dora/int_dora__contacts.sql b/pipeline/dbt/models/intermediate/sources/dora/int_dora__contacts.sql index 8ae76fb09..c742904a4 100644 --- a/pipeline/dbt/models/intermediate/sources/dora/int_dora__contacts.sql +++ b/pipeline/dbt/models/intermediate/sources/dora/int_dora__contacts.sql @@ -4,8 +4,7 @@ WITH structure_contacts AS ( _di_source_id AS "source", courriel AS "courriel", telephone AS "telephone", - NULL AS "contact_nom_prenom", - 'dora:structures:' || id AS "contact_uid" + NULL AS "contact_nom_prenom" FROM {{ ref('stg_dora__structures') }} WHERE courriel IS NOT NULL ), @@ -16,8 +15,7 @@ service_contacts AS ( _di_source_id AS "source", courriel AS "courriel", telephone AS "telephone", - contact_nom_prenom AS "contact_nom_prenom", - 'dora:services:' || id AS "contact_uid" + contact_nom_prenom AS "contact_nom_prenom" FROM {{ ref('stg_dora__services') }} WHERE courriel IS NOT NULL ), diff --git a/pipeline/dbt/models/intermediate/sources/mediation_numerique/_mediation_numerique_models.yml b/pipeline/dbt/models/intermediate/sources/mediation_numerique/_mediation_numerique_models.yml index 60ffbf5af..9349be97f 100644 --- a/pipeline/dbt/models/intermediate/sources/mediation_numerique/_mediation_numerique_models.yml +++ b/pipeline/dbt/models/intermediate/sources/mediation_numerique/_mediation_numerique_models.yml @@ -18,10 +18,6 @@ models: - name: int_mediation_numerique__contacts columns: - - name: contact_uid - data_tests: - - unique - - not_null - name: courriel data_tests: - not_null diff --git a/pipeline/dbt/models/intermediate/sources/mediation_numerique/int_mediation_numerique__contacts.sql b/pipeline/dbt/models/intermediate/sources/mediation_numerique/int_mediation_numerique__contacts.sql index 628295fc9..282b7be3f 100644 --- a/pipeline/dbt/models/intermediate/sources/mediation_numerique/int_mediation_numerique__contacts.sql +++ b/pipeline/dbt/models/intermediate/sources/mediation_numerique/int_mediation_numerique__contacts.sql @@ -4,9 +4,7 @@ WITH final AS ( _di_source_id AS "source", courriel AS "courriel", telephone AS "telephone", - NULL AS "contact_nom_prenom", - -- services in mediation numerique have no contact information - 'mediation-numerique:structures:' || id AS "contact_uid" + NULL AS "contact_nom_prenom" FROM {{ ref('stg_mediation_numerique__structures') }} WHERE courriel IS NOT NULL ) diff --git a/pipeline/dbt/models/intermediate/sources/mes_aides/_mes_aides__models.yml b/pipeline/dbt/models/intermediate/sources/mes_aides/_mes_aides__models.yml index d71d05380..966a1f554 100644 --- a/pipeline/dbt/models/intermediate/sources/mes_aides/_mes_aides__models.yml +++ b/pipeline/dbt/models/intermediate/sources/mes_aides/_mes_aides__models.yml @@ -12,7 +12,6 @@ models: - unique - not_null - dbt_utils.not_empty_string - - name: int_mes_aides__structures data_tests: - check_structure: @@ -56,11 +55,7 @@ models: field: id - name: int_mes_aides__contacts columns: - - name: contact_uid - data_tests: - - unique - - not_null - name: courriel data_tests: - not_null - - dbt_utils.not_empty_string \ No newline at end of file + - dbt_utils.not_empty_string diff --git a/pipeline/dbt/models/intermediate/sources/mes_aides/int_mes_aides__contacts.sql b/pipeline/dbt/models/intermediate/sources/mes_aides/int_mes_aides__contacts.sql index 5e202d3ca..78aa04797 100644 --- a/pipeline/dbt/models/intermediate/sources/mes_aides/int_mes_aides__contacts.sql +++ b/pipeline/dbt/models/intermediate/sources/mes_aides/int_mes_aides__contacts.sql @@ -1,15 +1,21 @@ WITH structure_contacts AS ( SELECT - email AS "courriel", - 'mes-aides:garages:' || id AS contact_uid + id AS "id", + _di_source_id AS "source", + email AS "courriel", + telephone AS "telephone", + NULL AS "contact_nom_prenom" FROM {{ ref('stg_mes_aides__garages') }} WHERE email IS NOT NULL ), service_contacts AS ( SELECT - contact_email AS "courriel", - 'mes-aides:aides:' || id AS contact_uid + id AS "id", + _di_source_id AS "source", + contact_email AS "courriel", + NULL AS "telephone", + NULL AS "contact_nom_prenom" FROM {{ ref('stg_mes_aides__permis_velo') }} WHERE contact_email IS NOT NULL ), diff --git a/pipeline/dbt/models/staging/brevo/_brevo__models.yml b/pipeline/dbt/models/staging/brevo/_brevo__models.yml index 3cb099c34..ef24e5f38 100644 --- a/pipeline/dbt/models/staging/brevo/_brevo__models.yml +++ b/pipeline/dbt/models/staging/brevo/_brevo__models.yml @@ -31,8 +31,3 @@ models: data_tests: - not_null - dbt_utils.not_constant - - name: contact_uids - data_tests: - - not_null - - dbt_utils.not_empty_string - - dbt_utils.not_constant diff --git a/pipeline/dbt/models/staging/brevo/stg_brevo__contacts.sql b/pipeline/dbt/models/staging/brevo/stg_brevo__contacts.sql index e72cfd1ff..4fd4027fb 100644 --- a/pipeline/dbt/models/staging/brevo/stg_brevo__contacts.sql +++ b/pipeline/dbt/models/staging/brevo/stg_brevo__contacts.sql @@ -11,7 +11,6 @@ final AS ( CAST(ARRAY(SELECT * FROM JSONB_ARRAY_ELEMENTS(data -> 'listIds')) AS INT []) AS "list_ids", data ->> 'id' AS "id", TO_DATE(data -> 'attributes' ->> 'DATE_DI_RGPD_OPPOSITION', 'YYYY-MM-DD') AS "date_di_rgpd_opposition", - data -> 'attributes' ->> 'CONTACT_UIDS' AS "contact_uids", NULLIF(TRIM(data ->> 'email'), '') AS "email" FROM source )