Skip to content

Commit

Permalink
feat(pipeline) : Handle Brevo & Dora contact information
Browse files Browse the repository at this point in the history
This commit introduces:

- An "all DORA contacts" table that combines unique emails from
  structures and services.

- An daily "import from Brevo" DAG that retrieves the contacted people

- A monthly "notify_rgpd_contacts" DAG that that:

  * gets the list of all the known Brevo contacts
  * gets the list of unique Dora contacts
  * finds out who the "new users" are and imports them to Brevo
  * sends a marketing campaign to those users
  • Loading branch information
vperron committed Dec 5, 2023
1 parent 30e377a commit 1cd658c
Show file tree
Hide file tree
Showing 23 changed files with 634 additions and 7 deletions.
1 change: 1 addition & 0 deletions .template.env
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ AIRFLOW_UID=

AIRFLOW_CONN_S3_SOURCES=
AIRFLOW_VAR_DATAGOUV_API_KEY=
AIRFLOW_VAR_BREVO_API_KEY=
AIRFLOW_VAR_DORA_API_TOKEN=
AIRFLOW_VAR_DORA_PREPROD_API_TOKEN=
AIRFLOW_VAR_EMPLOIS_API_TOKEN=
Expand Down
1 change: 1 addition & 0 deletions deployment/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ x-airflow-common:
AIRFLOW_CONN_S3_SOURCES: ${AIRFLOW_CONN_S3_SOURCES}

# Variables
AIRFLOW_VAR_BREVO_API_KEY: ${AIRFLOW_VAR_BREVO_API_KEY}
AIRFLOW_VAR_DATAGOUV_API_KEY: ${AIRFLOW_VAR_DATAGOUV_API_KEY}
AIRFLOW_VAR_DORA_API_TOKEN: ${AIRFLOW_VAR_DORA_API_TOKEN}
AIRFLOW_VAR_DORA_PREPROD_API_TOKEN: ${AIRFLOW_VAR_DORA_PREPROD_API_TOKEN}
Expand Down
1 change: 1 addition & 0 deletions deployment/main.tf
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ resource "null_resource" "up" {
# pipeline secrets
AIRFLOW_CONN_S3_SOURCES=${var.airflow_conn_s3_sources}
AIRFLOW_VAR_BREVO_API_KEY=${var.brevo_api_key}
AIRFLOW_VAR_DATAGOUV_API_KEY=${var.datagouv_api_key}
AIRFLOW_VAR_DORA_API_TOKEN=${var.dora_api_token}
AIRFLOW_VAR_DORA_PREPROD_API_TOKEN=${var.dora_preprod_api_token}
Expand Down
1 change: 1 addition & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ x-airflow-common:
AIRFLOW__LOGGING__DELETE_LOCAL_LOGS: 'True'

# Variables
AIRFLOW_VAR_BREVO_API_KEY: ${AIRFLOW_VAR_BREVO_API_KEY}
AIRFLOW_VAR_DATAGOUV_API_KEY: ${AIRFLOW_VAR_DATAGOUV_API_KEY}
AIRFLOW_VAR_DORA_API_TOKEN: ${AIRFLOW_VAR_DORA_API_TOKEN}
AIRFLOW_VAR_DORA_PREPROD_API_TOKEN: ${AIRFLOW_VAR_DORA_PREPROD_API_TOKEN}
Expand Down
63 changes: 63 additions & 0 deletions pipeline/RGPD.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
# Notification RGPD

Nous envoyons des notifications par courriel, chaque mois, aux utilisateurs dont nous avons eu connaissance dudit courriel
via l'une ou l'autre de nos sources, afin de leur proposer de s'opposer à la réutilisation de cette donnée dans
data⋅inclusion.

En l'absence de réponse de leur part, comme vu dans les CGAUs de data⋅inclusion, nous publierons le courriel en
question dans le jeu de données disponible dans l'API.

A date (fin 2023), nous ne proposons l'affichage que des courriels renseignés dans les structures et services DORA.

## Déroulement technique

### Imports Brevo
Chaque jour le DAG `import_brevo` récupère la liste à jour des personnes contactées lors des notifications RGPD.
Pour ce faire il réalise un extract + load (datalake puis datawarehouse) des données depuis l'API Brevo.

A noter que les contacts enregistrés dans Brevo sont identifiés par:
- leur email (qui est la clé unique de renseignement d'un contact Brevo)
- un ensemble d'attributs, dont:
* la date d'opposition RGPD (remplie par Osiris)
* la liste des structures dont l'adresse email est membre

### Traitements de données
Chaque jour le DAG `main` se chargera de créer les tables (staging & intermediate) liées d'un côté
aux courriels venant des nos sources (structures et services Dora uniquement pour l'instant) et
par ailleurs aux données de contact Brevo.

### Notification
Chaque mois, le DAG `notify_rgpd_contacts` tire une liste de contacts depuis les sources qui nous
intéressent (aujourd'hui, Dora uniquement) et en tire une liste de "nouveaux contacts".

Pour cela, il compare les `contact_uid` (combinaison de source, stream et UID) d'un contact enregistrés
dans Brevo et ceux provenant de Dora;
- si le `contact_uid` n'existe pas dans Brevo, ET que l'email n'existe pas non plus, c'est un nouveau contact.
- si le `contact_uid` n'existe pas dans Brevo, mais que l'email est connu, on va mettre à jour ce contact.

A noter que la mise à jour d'un contact va écraser les `contact_uids` par exemple, mais ne touchera pas à la
date d'opposition RGPD.

Dans tous les cas, si on a déjà connu ce `contact_uid` dans Brevo, quelle que soit l'adresse mail
associée, nous ne recontacterons pas.

Ainsi, une fois la liste des "nouveaux contacts" établie, le DAG se charge de:

1. Enregistrer dans Brevo la liste de tous les contacts ayant été contactés
2. Remettre à zéro et remplir dans Brevo la liste des "contacts à contacter ce mois-ci"
3. Déclencher une nouvelle campagne de notification dans Brevo et l'envoyer

L'avantage d'utiliser des campagnes indépendantes mensuelles est que ces dernières permettent l'utilisation
des dashboards analytiques poussées de Brevo.

## Reste à faire

### Intégration dans l'API
Il faut intégrer les emails des sources Dora (et Emplois !) dans l'API.

Cela va concerner:

- toutes les adresses connues (notification envoyée à ce jour ou non)
- non "opposés" via le DPO dans Brevo
- non "unsubscribed"
- ipour le confort des utilisateurs, non hardbouncés bar Brevo : le mail est forcément invalide.
4 changes: 4 additions & 0 deletions pipeline/dags/dag_utils/constants.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# No need to add more airflow variables here, there is very little chance that those
# ever change and this is not sensitive information.
BREVO_CURRENT_CONTACTS_LIST_ID = 5
BREVO_ALL_CONTACTS_LIST_ID = 6
11 changes: 11 additions & 0 deletions pipeline/dags/dag_utils/date.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
import pendulum


def local_date_str(dt: pendulum.DateTime) -> str:
import pendulum

return (
pendulum.instance(dt.astimezone(pendulum.timezone("Europe/Paris")))
.date()
.to_date_string()
)
67 changes: 67 additions & 0 deletions pipeline/dags/dag_utils/pg.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
import functools
from contextlib import contextmanager

import pandas as pd
from airflow.providers.postgres.hooks.postgres import PostgresHook


@functools.cache
def hook():
return PostgresHook(postgres_conn_id="pg")


@contextmanager
def connect_begin():
engine = hook().get_sqlalchemy_engine()
with engine.connect() as conn:
with conn.begin():
yield conn


def create_schema(schema_name: str) -> None:
with connect_begin() as conn:
conn.execute(
f"""\
CREATE SCHEMA IF NOT EXISTS {schema_name};
GRANT USAGE ON SCHEMA {schema_name} TO PUBLIC;
ALTER DEFAULT PRIVILEGES IN SCHEMA {schema_name}
GRANT SELECT ON TABLES TO PUBLIC;"""
)


def load_source_df(source_id: str, stream_id: str, df: pd.DataFrame) -> None:
import sqlalchemy as sqla
from sqlalchemy.dialects.postgresql import JSONB

with connect_begin() as conn:
schema_name = source_id.replace("-", "_")
table_name = stream_id.replace("-", "_")

df.to_sql(
f"{table_name}_tmp",
con=conn,
schema=schema_name,
if_exists="replace",
index=False,
dtype={
"data": JSONB,
"_di_logical_date": sqla.Date,
},
)

conn.execute(
f"""\
CREATE TABLE IF NOT EXISTS {schema_name}.{table_name} (
data JSONB,
_di_batch_id TEXT,
_di_source_id TEXT,
_di_stream_id TEXT,
_di_source_url TEXT,
_di_stream_s3_key TEXT,
_di_logical_date DATE
);
TRUNCATE {schema_name}.{table_name};
INSERT INTO {schema_name}.{table_name}
SELECT * FROM {schema_name}.{table_name}_tmp;
DROP TABLE {schema_name}.{table_name}_tmp;"""
)
38 changes: 38 additions & 0 deletions pipeline/dags/dag_utils/s3.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import io

from airflow.providers.amazon.aws.hooks import s3

from . import date


def source_file_path(
source_id: str, filename: str, run_id: str, logical_date: str
) -> str:
return "/".join(
[
"data",
"raw",
date.local_date_str(logical_date),
source_id,
run_id,
filename,
]
)


def store_content(
path: str,
content: bytes,
):
s3_hook = s3.S3Hook(aws_conn_id="s3")
with io.BytesIO(content) as buf:
s3_hook.load_file_obj(
key=path,
file_obj=buf,
replace=True,
)


def download_file(path: str) -> str:
s3_hook = s3.S3Hook(aws_conn_id="s3")
return s3_hook.download_file(key=path)
77 changes: 77 additions & 0 deletions pipeline/dags/import_brevo.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
import logging

import airflow
import pendulum
from airflow.operators import empty, python

from dag_utils.virtualenvs import PYTHON_BIN_PATH

logger = logging.getLogger(__name__)

default_args = {}

SOURCE_ID = "brevo"
STREAM_ID = "contacts"


def _extract_rgpd_contacts(run_id: str, stream_id: str, source_id: str, logical_date):
import json

from airflow.models import Variable

from data_inclusion.scripts.tasks import brevo

from dag_utils import constants, s3

brevo_client = brevo.BrevoClient(token=Variable.get("BREVO_API_KEY"))
contacts = list(brevo_client.list_contacts(constants.BREVO_ALL_CONTACTS_LIST_ID))
s3.store_content(
s3.source_file_path(source_id, f"{stream_id}.json", run_id, logical_date),
json.dumps(contacts).encode(),
)


def _load_rgpd_contacts(run_id: str, stream_id: str, source_id: str, logical_date):
import pandas as pd

from dags.dag_utils import date, pg, s3
from data_inclusion.scripts.tasks import utils

s3_path = s3.source_file_path(source_id, f"{stream_id}.json", run_id, logical_date)
tmp_filename = s3.download_file(s3_path)
df = utils.read_json(tmp_filename)

df = pd.DataFrame().assign(data=df.apply(lambda row: row.to_dict(), axis="columns"))
df = df.assign(_di_batch_id=run_id)
df = df.assign(_di_logical_date=date.local_date_str(logical_date))

pg.create_schema(source_id)
pg.load_source_df(source_id, stream_id, df)


with airflow.DAG(
dag_id="import_brevo",
start_date=pendulum.datetime(2023, 11, 1, tz="Europe/Paris"),
default_args=default_args,
schedule_interval="@daily",
catchup=False,
concurrency=1,
) as dag:
start = empty.EmptyOperator(task_id="start")
end = empty.EmptyOperator(task_id="end")

extract_rgpd_contacts = python.ExternalPythonOperator(
task_id="extract_rgpd_contacts",
python=str(PYTHON_BIN_PATH),
python_callable=_extract_rgpd_contacts,
op_kwargs={"stream_id": STREAM_ID, "source_id": SOURCE_ID},
)

load_rgpd_contacts = python.ExternalPythonOperator(
task_id="load_rgpd_contacts",
python=str(PYTHON_BIN_PATH),
python_callable=_load_rgpd_contacts,
op_kwargs={"stream_id": STREAM_ID, "source_id": SOURCE_ID},
)

(start >> extract_rgpd_contacts >> load_rgpd_contacts >> end)
Loading

0 comments on commit 1cd658c

Please sign in to comment.