From 5398a79058c75afeb4fb5162c4c45dcaddd67591 Mon Sep 17 00:00:00 2001 From: Victor Perron Date: Wed, 23 Oct 2024 16:05:19 +0200 Subject: [PATCH] feat(pipeline) : Simplify notify_rgpd_contacts After an entire year of operation, we noticed that our sources never change their contact emails. Ever. Even if someday this happens, we can probably suppose that it's till going to be quite rare. In those exceptional cases, it means that the structure would get a new RGPD notification. But it does make the code way easier to maintain and more robust: - now we will never miss an email that would have been "forgotten" by Brevo (due to rate limits, payment issues, ...) - it can also be seen as a feature to send a new notification if the email changes, for instance when the previous one was incorrect. Moreover, we don't need to maintain the `contact_uid` list that was sent to brevo. --- pipeline/RGPD.md | 56 ++++++------------- pipeline/dags/notify_rgpd_contacts.py | 50 ++++------------- pipeline/dbt/models/intermediate/_models.yml | 29 ++++------ .../intermediate/brevo/_brevo__models.yml | 11 ++-- .../brevo/int_brevo__contacts.sql | 5 +- .../intermediate/int__union_contacts.sql | 3 +- .../int__union_contacts__enhanced.sql | 34 +++++------ .../sources/dora/_dora__models.yml | 4 -- .../sources/dora/int_dora__contacts.sql | 6 +- .../_mediation_numerique_models.yml | 4 -- .../int_mediation_numerique__contacts.sql | 4 +- .../sources/mes_aides/_mes_aides__models.yml | 7 +-- .../mes_aides/int_mes_aides__contacts.sql | 14 +++-- .../models/staging/brevo/_brevo__models.yml | 5 -- .../staging/brevo/stg_brevo__contacts.sql | 1 - 15 files changed, 80 insertions(+), 153 deletions(-) diff --git a/pipeline/RGPD.md b/pipeline/RGPD.md index 14c41cbca..952fad0e3 100644 --- a/pipeline/RGPD.md +++ b/pipeline/RGPD.md @@ -7,7 +7,8 @@ data⋅inclusion. En l'absence de réponse de leur part, comme vu dans les CGAUs de data⋅inclusion, nous publierons le courriel en question dans le jeu de données disponible dans l'API. -A date (fin 2023), nous ne proposons l'affichage que des courriels renseignés dans les structures et services DORA. +L'intérêt secondaire pour data⋅inclusion de ces notification est de permettre une validation poussée des e-mails de contact +ainsi notifiés; nous savons donc pour chaque courriel, s'il est digne de confiance ou non. ## Déroulement technique @@ -15,49 +16,26 @@ A date (fin 2023), nous ne proposons l'affichage que des courriels renseignés d Chaque jour le DAG `import_brevo` récupère la liste à jour des personnes contactées lors des notifications RGPD. Pour ce faire il réalise un extract + load (datalake puis datawarehouse) des données depuis l'API Brevo. -A noter que les contacts enregistrés dans Brevo sont identifiés par: +A noter que les contacts enregistrés dans Brevo contiennent a minima: - leur email (qui est la clé unique de renseignement d'un contact Brevo) -- un ensemble d'attributs, dont: - * la date d'opposition RGPD (remplie par Osiris) - * la liste des structures dont l'adresse email est membre +- le fait que cet email ait donné lieu à un envoi ou non, avec ou sans 'hardbounce' +- un ensemble d'attributs, dont la date d'opposition RGPD (remplie par Osiris) ### Traitements de données -Chaque jour le DAG `main` se chargera de créer les tables (staging & intermediate) liées d'un côté -aux courriels venant des nos sources (structures et services Dora uniquement pour l'instant) et -par ailleurs aux données de contact Brevo. +Chaque jour le DAG `main` se chargera de créer la table `int__union_contacts__enhanced` qui relie courriels venant +de nos sources de données, et courriels connus de Brevo (avec succès de l'envoi ou non, date d'opposition...) ### Notification -Chaque mois, le DAG `notify_rgpd_contacts` tire une liste de contacts depuis les sources qui nous -intéressent (aujourd'hui, Dora uniquement) et en tire une liste de "nouveaux contacts". +Chaque mois, le DAG `notify_rgpd_contacts` tire une liste de courriels connus depuis nos sources. -Pour cela, il compare les `contact_uid` (combinaison de source, stream et UID) d'un contact enregistrés -dans Brevo et ceux provenant de Dora; -- si le `contact_uid` n'existe pas dans Brevo, ET que l'email n'existe pas non plus, c'est un nouveau contact. -- si le `contact_uid` n'existe pas dans Brevo, mais que l'email est connu, on va mettre à jour ce contact. +Ceux n'ayant jamais fait l'objet d'une notification seront déposés dans une liste qui servira à l'envoi +de la prochaine notification; tous les contacts sont par ailleurs ajoutés à la liste de "tous les contacts +connus", qui est celle importée régulièrement par `import_brevo`. -A noter que la mise à jour d'un contact va écraser les `contact_uids` par exemple, mais ne touchera pas à la -date d'opposition RGPD. +Puis nous déclenchons l'envoi d'une nouvelle campagne de notification RGPD pour le mois courant, envoyée +à la liste des "nouveaux emails" connus. -Dans tous les cas, si on a déjà connu ce `contact_uid` dans Brevo, quelle que soit l'adresse mail -associée, nous ne recontacterons pas. - -Ainsi, une fois la liste des "nouveaux contacts" établie, le DAG se charge de: - -1. Enregistrer dans Brevo la liste de tous les contacts ayant été contactés -2. Remettre à zéro et remplir dans Brevo la liste des "contacts à contacter ce mois-ci" -3. Déclencher une nouvelle campagne de notification dans Brevo et l'envoyer - -L'avantage d'utiliser des campagnes indépendantes mensuelles est que ces dernières permettent l'utilisation -des dashboards analytiques poussées de Brevo. - -## Reste à faire - -### Intégration dans l'API -Il faut intégrer les emails des sources Dora (et Emplois !) dans l'API. - -Cela va concerner: - -- toutes les adresses connues (notification envoyée à ce jour ou non) -- non "opposés" via le DPO dans Brevo -- non "unsubscribed" -- ipour le confort des utilisateurs, non hardbouncés bar Brevo : le mail est forcément invalide. +Par le passé nous maintenions une liste des structures et services ayant été liés à un email, grâce à un attribut +spécial sous forme de liste, attaché à chaque contact et envoyé à Brevo. Nous avons remarqué au bout d'un an, +que aucune source ne modifiait ses courriels; nous avonc donc cessé de maintenir ce système complexe, mais il en +reste des traces dans certains contacts Brevo. Ils ne sont plus mis à jour. diff --git a/pipeline/dags/notify_rgpd_contacts.py b/pipeline/dags/notify_rgpd_contacts.py index 6f0fdae6a..521493aa6 100644 --- a/pipeline/dags/notify_rgpd_contacts.py +++ b/pipeline/dags/notify_rgpd_contacts.py @@ -12,53 +12,27 @@ def _sync_new_contacts_to_brevo(): - from collections import defaultdict - from airflow.models import Variable from dag_utils import constants, pg from dag_utils.sources import brevo - potential_contacts = pg.hook().get_records( - sql=( - "SELECT courriel, ARRAY_AGG(contact_uid) as contact_uids " - "FROM public_intermediate.int__union_contacts " - "GROUP BY courriel" - ) - ) - - brevo_contacts = pg.hook().get_records( + contacts = pg.hook().get_records( sql=( - "SELECT courriel, contact_uids " - "FROM public_intermediate.int_brevo__contacts" + """ + SELECT + json_build_object('email', _courriel_original), + rgpd_notice_has_hardbounced + FROM public_intermediate.int__union_contacts__enhanced + GROUP BY _courriel_original, rgpd_notice_has_hardbounced + ORDER BY _courriel_original + """ ) ) - brevo_contacts_uid_set = { - uid for _, contact_uids in brevo_contacts for uid in contact_uids - } - brevo_contacts_map = {email: contact_uids for email, contact_uids in brevo_contacts} - - # We only consider a contact as "new" if BOTH its contact uid AND its email are new. - # If the email is not new but linked to a new contact UID, update the associated - # list accordingly. - new_contacts_map = defaultdict(list) - for email, contact_uids in potential_contacts: - for contact_uid in contact_uids: - if contact_uid not in brevo_contacts_uid_set: - if email in brevo_contacts_map: - brevo_contacts_map[email].append(contact_uid) - else: - new_contacts_map[email].append(contact_uid) - - new_contacts = [ - {"email": email, "attributes": {"contact_uids": ",".join(contact_uids)}} - for email, contact_uids in new_contacts_map.items() - ] - all_contacts = new_contacts + [ - {"email": email, "attributes": {"contact_uids": ",".join(contact_uids)}} - for email, contact_uids in brevo_contacts_map.items() - ] + # new contacts : all those for which we don't even know if they bounced or not + new_contacts = [c[0] for c in contacts if c[1] is None] + all_contacts = [c[0] for c in contacts] brevo_client = brevo.BrevoClient(token=Variable.get("BREVO_API_KEY")) diff --git a/pipeline/dbt/models/intermediate/_models.yml b/pipeline/dbt/models/intermediate/_models.yml index e170bdd75..00227452a 100644 --- a/pipeline/dbt/models/intermediate/_models.yml +++ b/pipeline/dbt/models/intermediate/_models.yml @@ -11,10 +11,6 @@ models: - name: int__union_contacts columns: - - name: contact_uid - data_tests: - - unique - - not_null - name: courriel data_tests: - not_null @@ -41,6 +37,7 @@ models: - name: int__union_contacts__enhanced columns: - name: _di_surrogate_id + - name: _courriel_original - name: id - name: source - name: contact_nom_prenom @@ -260,23 +257,21 @@ unit_tests: given: - input: ref('int__union_contacts') rows: - - {"source": "dora", "id": "foo", "courriel": "truc@toto.at", "contact_nom_prenom": "Jean Valjean", "telephone": "0123456789"} - - {"source": "mednum", "id": "bar", "courriel": "truc@toto.at", "contact_nom_prenom": "Jean Valjean", "telephone": "0123456789"} - - {"source": "lezemplois", "id": "baz", "courriel": "truc@toto.at", "contact_nom_prenom": "Jean Valjean", "telephone": "0123456789"} + - {"source": "dora", "id": "foo", "courriel": "truc1@toto.at", "contact_nom_prenom": "Jean Valjean", "telephone": "0123456789"} + - {"source": "other", "id": "foo", "courriel": "truc1@toto.at", "contact_nom_prenom": "Jean Other", "telephone": "9876543210"} + - {"source": "mednum", "id": "bar", "courriel": "truc2@toto.at", "contact_nom_prenom": "Jean Valjean", "telephone": "0123456789"} + - {"source": "lezemplois", "id": "baz", "courriel": "truc3@toto.at", "contact_nom_prenom": "Jean Valjean", "telephone": "0123456789"} - {"source": "autre", "id": "solo", "courriel": "solo@autre.net", "contact_nom_prenom": "Autre Personne", "telephone": "06666666"} - input: ref('int_brevo__contacts') - format: sql rows: - select ARRAY['dora:services:foo'] as contact_uids, TRUE as est_interdit, '2021-01-01' as date_di_rgpd_opposition - UNION ALL - select ARRAY['mednum:services:bar'] as contact_uids, FALSE as est_interdit, '2021-01-01' as date_di_rgpd_opposition - UNION ALL - select ARRAY['lezemplois:services:baz'] as contact_uids, FALSE as est_interdit, NULL as date_di_rgpd_opposition - UNION ALL - select ARRAY['sans:services:rapport'] as contact_uids, TRUE as est_interdit, NULL as date_di_rgpd_opposition + - {"courriel": "truc1@toto.at", "has_hardbounced": true, "was_objected_to": true} + - {"courriel": "truc2@toto.at", "has_hardbounced": false, "was_objected_to": true} + - {"courriel": "truc3@toto.at", "has_hardbounced": false, "was_objected_to": null} + - {"courriel": "solo@autre.net", "has_hardbounced": true, "was_objected_to": null} expect: rows: - {source: dora, id: foo, courriel: NULL, contact_nom_prenom: NULL, telephone: NULL} # objected to, hardbounce + - {source: other, id: foo, courriel: NULL, contact_nom_prenom: NULL, telephone: NULL} # objected to, hardbounce - {source: mednum, id: bar, courriel: NULL, contact_nom_prenom: NULL, telephone: NULL} # objected to, no hardbounce - - {source: lezemplois, id: baz, courriel: truc@toto.at, contact_nom_prenom: Jean Valjean, telephone: '0123456789'} # not objected to, no hardbounce - - {source: autre, id: solo, courriel: solo@autre.net, contact_nom_prenom: Autre Personne, telephone: '06666666'} # hardbounce : only email is hidden + - {source: lezemplois, id: baz, courriel: truc3@toto.at, contact_nom_prenom: Jean Valjean, telephone: '0123456789'} # not objected to, no hardbounce + - {source: autre, id: solo, courriel: NULL, contact_nom_prenom: Autre Personne, telephone: '06666666'} # hardbounce : only email is hidden diff --git a/pipeline/dbt/models/intermediate/brevo/_brevo__models.yml b/pipeline/dbt/models/intermediate/brevo/_brevo__models.yml index 67d356d5d..bbc57def2 100644 --- a/pipeline/dbt/models/intermediate/brevo/_brevo__models.yml +++ b/pipeline/dbt/models/intermediate/brevo/_brevo__models.yml @@ -14,7 +14,7 @@ models: - not_null - dbt_utils.not_constant - dbt_utils.not_empty_string - - name: est_interdit + - name: has_hardbounced description: | Signifie que cet e-mail est considéré par Brevo comme: - un "hard bounce" (n'existe pas) @@ -22,9 +22,12 @@ models: - a classé le mail en spam - a été classé "bloqué" par un administrateur Brevo data_tests: - - not_null - accepted_values: values: [true, false] - - name: date_di_rgpd_opposition - description: Date d'opposition au RGPD telle qu'entrée par notre DPO dans Brevo lorsque signalé. + - name: was_objected_to + description: | + Permet de savoir si le contact a fait l'objet d'une demande de retrait RGPD. + data_tests: + - accepted_values: + values: [true, false] diff --git a/pipeline/dbt/models/intermediate/brevo/int_brevo__contacts.sql b/pipeline/dbt/models/intermediate/brevo/int_brevo__contacts.sql index a3f0d8884..be209d80f 100644 --- a/pipeline/dbt/models/intermediate/brevo/int_brevo__contacts.sql +++ b/pipeline/dbt/models/intermediate/brevo/int_brevo__contacts.sql @@ -6,9 +6,8 @@ final AS ( SELECT id AS "id", email AS "courriel", - email_blacklisted AS "est_interdit", - date_di_rgpd_opposition AS "date_di_rgpd_opposition", - STRING_TO_ARRAY(contact_uids, ',') AS "contact_uids" + email_blacklisted AS "has_hardbounced", + date_di_rgpd_opposition IS NOT NULL AS "was_objected_to" FROM contacts ) diff --git a/pipeline/dbt/models/intermediate/int__union_contacts.sql b/pipeline/dbt/models/intermediate/int__union_contacts.sql index 538e26c54..b09f01ee4 100644 --- a/pipeline/dbt/models/intermediate/int__union_contacts.sql +++ b/pipeline/dbt/models/intermediate/int__union_contacts.sql @@ -18,8 +18,7 @@ final AS ( source AS "source", courriel AS "courriel", telephone AS "telephone", - contact_nom_prenom AS "contact_nom_prenom", - contact_uid AS "contact_uid" + contact_nom_prenom AS "contact_nom_prenom" FROM contacts_union ) diff --git a/pipeline/dbt/models/intermediate/int__union_contacts__enhanced.sql b/pipeline/dbt/models/intermediate/int__union_contacts__enhanced.sql index 5a6e79669..2ca69e76d 100644 --- a/pipeline/dbt/models/intermediate/int__union_contacts__enhanced.sql +++ b/pipeline/dbt/models/intermediate/int__union_contacts__enhanced.sql @@ -2,18 +2,14 @@ WITH contacts AS ( SELECT * FROM {{ ref('int__union_contacts') }} ), -brevo_contacts AS ( - SELECT * FROM {{ ref('int_brevo__contacts') }} -), - rgpd_notices AS ( - SELECT DISTINCT - SPLIT_PART(contact_uid, ':', 1) AS "source", - SPLIT_PART(contact_uid, ':', 3) AS "id", - date_di_rgpd_opposition IS NOT NULL AS "was_objected_to", - est_interdit AS "has_hardbounced" - FROM brevo_contacts, - UNNEST(contact_uids) AS contact_uid + SELECT + courriel, + has_hardbounced, + was_objected_to, + was_objected_to IS TRUE AS was_objected_to_truthy, + has_hardbounced IS TRUE AS has_hardbounced_truthy + FROM {{ ref('int_brevo__contacts') }} ), final AS ( @@ -21,22 +17,23 @@ final AS ( contacts._di_surrogate_id AS "_di_surrogate_id", contacts.id AS "id", contacts.source AS "source", + contacts.courriel AS "_courriel_original", CASE WHEN - rgpd_notices.id IS NULL - OR NOT rgpd_notices.was_objected_to + rgpd_notices.courriel IS NULL + OR NOT rgpd_notices.was_objected_to_truthy THEN contacts.contact_nom_prenom END AS "contact_nom_prenom", CASE WHEN - rgpd_notices.id IS NULL - OR (NOT rgpd_notices.was_objected_to AND NOT rgpd_notices.has_hardbounced) + rgpd_notices.courriel IS NULL + OR (NOT rgpd_notices.was_objected_to_truthy AND NOT rgpd_notices.has_hardbounced_truthy) THEN contacts.courriel END AS "courriel", CASE WHEN - rgpd_notices.id IS NULL - OR NOT rgpd_notices.was_objected_to + rgpd_notices.courriel IS NULL + OR NOT rgpd_notices.was_objected_to_truthy THEN contacts.telephone END AS "telephone", rgpd_notices.was_objected_to AS "rgpd_notice_was_objected_to", @@ -44,8 +41,7 @@ final AS ( FROM contacts LEFT JOIN rgpd_notices ON - contacts.source = rgpd_notices.source - AND contacts.id = rgpd_notices.id + contacts.courriel = rgpd_notices.courriel ) SELECT * FROM final diff --git a/pipeline/dbt/models/intermediate/sources/dora/_dora__models.yml b/pipeline/dbt/models/intermediate/sources/dora/_dora__models.yml index df17232ed..190fd9512 100644 --- a/pipeline/dbt/models/intermediate/sources/dora/_dora__models.yml +++ b/pipeline/dbt/models/intermediate/sources/dora/_dora__models.yml @@ -14,10 +14,6 @@ models: - name: int_dora__contacts columns: - - name: contact_uid - data_tests: - - unique - - not_null - name: courriel data_tests: - not_null diff --git a/pipeline/dbt/models/intermediate/sources/dora/int_dora__contacts.sql b/pipeline/dbt/models/intermediate/sources/dora/int_dora__contacts.sql index 8ae76fb09..c742904a4 100644 --- a/pipeline/dbt/models/intermediate/sources/dora/int_dora__contacts.sql +++ b/pipeline/dbt/models/intermediate/sources/dora/int_dora__contacts.sql @@ -4,8 +4,7 @@ WITH structure_contacts AS ( _di_source_id AS "source", courriel AS "courriel", telephone AS "telephone", - NULL AS "contact_nom_prenom", - 'dora:structures:' || id AS "contact_uid" + NULL AS "contact_nom_prenom" FROM {{ ref('stg_dora__structures') }} WHERE courriel IS NOT NULL ), @@ -16,8 +15,7 @@ service_contacts AS ( _di_source_id AS "source", courriel AS "courriel", telephone AS "telephone", - contact_nom_prenom AS "contact_nom_prenom", - 'dora:services:' || id AS "contact_uid" + contact_nom_prenom AS "contact_nom_prenom" FROM {{ ref('stg_dora__services') }} WHERE courriel IS NOT NULL ), diff --git a/pipeline/dbt/models/intermediate/sources/mediation_numerique/_mediation_numerique_models.yml b/pipeline/dbt/models/intermediate/sources/mediation_numerique/_mediation_numerique_models.yml index 60ffbf5af..9349be97f 100644 --- a/pipeline/dbt/models/intermediate/sources/mediation_numerique/_mediation_numerique_models.yml +++ b/pipeline/dbt/models/intermediate/sources/mediation_numerique/_mediation_numerique_models.yml @@ -18,10 +18,6 @@ models: - name: int_mediation_numerique__contacts columns: - - name: contact_uid - data_tests: - - unique - - not_null - name: courriel data_tests: - not_null diff --git a/pipeline/dbt/models/intermediate/sources/mediation_numerique/int_mediation_numerique__contacts.sql b/pipeline/dbt/models/intermediate/sources/mediation_numerique/int_mediation_numerique__contacts.sql index 628295fc9..282b7be3f 100644 --- a/pipeline/dbt/models/intermediate/sources/mediation_numerique/int_mediation_numerique__contacts.sql +++ b/pipeline/dbt/models/intermediate/sources/mediation_numerique/int_mediation_numerique__contacts.sql @@ -4,9 +4,7 @@ WITH final AS ( _di_source_id AS "source", courriel AS "courriel", telephone AS "telephone", - NULL AS "contact_nom_prenom", - -- services in mediation numerique have no contact information - 'mediation-numerique:structures:' || id AS "contact_uid" + NULL AS "contact_nom_prenom" FROM {{ ref('stg_mediation_numerique__structures') }} WHERE courriel IS NOT NULL ) diff --git a/pipeline/dbt/models/intermediate/sources/mes_aides/_mes_aides__models.yml b/pipeline/dbt/models/intermediate/sources/mes_aides/_mes_aides__models.yml index d71d05380..966a1f554 100644 --- a/pipeline/dbt/models/intermediate/sources/mes_aides/_mes_aides__models.yml +++ b/pipeline/dbt/models/intermediate/sources/mes_aides/_mes_aides__models.yml @@ -12,7 +12,6 @@ models: - unique - not_null - dbt_utils.not_empty_string - - name: int_mes_aides__structures data_tests: - check_structure: @@ -56,11 +55,7 @@ models: field: id - name: int_mes_aides__contacts columns: - - name: contact_uid - data_tests: - - unique - - not_null - name: courriel data_tests: - not_null - - dbt_utils.not_empty_string \ No newline at end of file + - dbt_utils.not_empty_string diff --git a/pipeline/dbt/models/intermediate/sources/mes_aides/int_mes_aides__contacts.sql b/pipeline/dbt/models/intermediate/sources/mes_aides/int_mes_aides__contacts.sql index 5e202d3ca..78aa04797 100644 --- a/pipeline/dbt/models/intermediate/sources/mes_aides/int_mes_aides__contacts.sql +++ b/pipeline/dbt/models/intermediate/sources/mes_aides/int_mes_aides__contacts.sql @@ -1,15 +1,21 @@ WITH structure_contacts AS ( SELECT - email AS "courriel", - 'mes-aides:garages:' || id AS contact_uid + id AS "id", + _di_source_id AS "source", + email AS "courriel", + telephone AS "telephone", + NULL AS "contact_nom_prenom" FROM {{ ref('stg_mes_aides__garages') }} WHERE email IS NOT NULL ), service_contacts AS ( SELECT - contact_email AS "courriel", - 'mes-aides:aides:' || id AS contact_uid + id AS "id", + _di_source_id AS "source", + contact_email AS "courriel", + NULL AS "telephone", + NULL AS "contact_nom_prenom" FROM {{ ref('stg_mes_aides__permis_velo') }} WHERE contact_email IS NOT NULL ), diff --git a/pipeline/dbt/models/staging/brevo/_brevo__models.yml b/pipeline/dbt/models/staging/brevo/_brevo__models.yml index 3cb099c34..ef24e5f38 100644 --- a/pipeline/dbt/models/staging/brevo/_brevo__models.yml +++ b/pipeline/dbt/models/staging/brevo/_brevo__models.yml @@ -31,8 +31,3 @@ models: data_tests: - not_null - dbt_utils.not_constant - - name: contact_uids - data_tests: - - not_null - - dbt_utils.not_empty_string - - dbt_utils.not_constant diff --git a/pipeline/dbt/models/staging/brevo/stg_brevo__contacts.sql b/pipeline/dbt/models/staging/brevo/stg_brevo__contacts.sql index e72cfd1ff..4fd4027fb 100644 --- a/pipeline/dbt/models/staging/brevo/stg_brevo__contacts.sql +++ b/pipeline/dbt/models/staging/brevo/stg_brevo__contacts.sql @@ -11,7 +11,6 @@ final AS ( CAST(ARRAY(SELECT * FROM JSONB_ARRAY_ELEMENTS(data -> 'listIds')) AS INT []) AS "list_ids", data ->> 'id' AS "id", TO_DATE(data -> 'attributes' ->> 'DATE_DI_RGPD_OPPOSITION', 'YYYY-MM-DD') AS "date_di_rgpd_opposition", - data -> 'attributes' ->> 'CONTACT_UIDS' AS "contact_uids", NULLIF(TRIM(data ->> 'email'), '') AS "email" FROM source )