Skip to content

Commit

Permalink
feat(pipeline) : Handle Brevo & Dora contact information
Browse files Browse the repository at this point in the history
This commit introduces:

- An "all DORA emails" table that combines unique emails from
  structures and services.

- An "import from Brevo" DAG that retrieves the contacted emails

- A "process_dora_brevo_contacts" DAG that:

  * gets the list of all the known Brevo users
  * finds out who the "new users" are and imports them to Brevo
  * sends a marketing campaign to those users

That's it.

Optionally/later, we could:

- make use of an "excluded contacts" table that is imported from Grist
  (contacts who have refused any marketing communications)
- handle the soft bounces/hard bounces after a timeout.
  https://help.brevo.com/hc/en-us/articles/209435165-What-is-a-soft-bounce-and-a-hard-bounce-in-email-
  • Loading branch information
vperron committed Nov 24, 2023
1 parent 89cba8e commit 9cbcae0
Show file tree
Hide file tree
Showing 13 changed files with 321 additions and 7 deletions.
1 change: 1 addition & 0 deletions .template.env
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ AIRFLOW_UID=

AIRFLOW_CONN_S3_SOURCES=
AIRFLOW_VAR_DATAGOUV_API_KEY=
AIRFLOW_VAR_BREVO_API_KEY=
AIRFLOW_VAR_DORA_API_TOKEN=
AIRFLOW_VAR_EMPLOIS_API_TOKEN=
AIRFLOW_VAR_GRIST_API_TOKEN=
Expand Down
1 change: 1 addition & 0 deletions deployment/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ x-airflow-common:
AIRFLOW_CONN_S3_SOURCES: ${AIRFLOW_CONN_S3_SOURCES}

# Variables
AIRFLOW_VAR_BREVO_API_KEY: ${AIRFLOW_VAR_BREVO_API_KEY}
AIRFLOW_VAR_DATAGOUV_API_KEY: ${AIRFLOW_VAR_DATAGOUV_API_KEY}
AIRFLOW_VAR_DORA_API_TOKEN: ${AIRFLOW_VAR_DORA_API_TOKEN}
AIRFLOW_VAR_EMPLOIS_API_TOKEN: ${AIRFLOW_VAR_EMPLOIS_API_TOKEN}
Expand Down
1 change: 1 addition & 0 deletions deployment/main.tf
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ resource "null_resource" "up" {
# pipeline secrets
AIRFLOW_CONN_S3_SOURCES=${var.airflow_conn_s3_sources}
AIRFLOW_VAR_BREVO_API_KEY=${var.brevo_api_key}
AIRFLOW_VAR_DATAGOUV_API_KEY=${var.datagouv_api_key}
AIRFLOW_VAR_DORA_API_TOKEN=${var.dora_api_token}
AIRFLOW_VAR_EMPLOIS_API_TOKEN=${var.emplois_api_token}
Expand Down
1 change: 1 addition & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ x-airflow-common:
AIRFLOW__LOGGING__DELETE_LOCAL_LOGS: 'True'

# Variables
AIRFLOW_VAR_BREVO_API_KEY: ${AIRFLOW_VAR_BREVO_API_KEY}
AIRFLOW_VAR_DATAGOUV_API_KEY: ${AIRFLOW_VAR_DATAGOUV_API_KEY}
AIRFLOW_VAR_DORA_API_TOKEN: ${AIRFLOW_VAR_DORA_API_TOKEN}
AIRFLOW_VAR_EMPLOIS_API_TOKEN: ${AIRFLOW_VAR_EMPLOIS_API_TOKEN}
Expand Down
4 changes: 4 additions & 0 deletions pipeline/dags/dag_utils/constants.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# Brevo contact list ids. No need to add more airflow variables here, there is
# very little chance that those ever change and this is not sensitive
# information.
# List that is emptied and refilled on each run with only the newly-found
# contacts (the ones the next campaign is sent to).
BREVO_CURRENT_CONTACTS_LIST_ID = 5
# Cumulative list of every contact ever imported from DORA.
BREVO_ALL_CONTACTS_LIST_ID = 6
28 changes: 28 additions & 0 deletions pipeline/dags/dag_utils/pg.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
import functools
from contextlib import contextmanager

from airflow.providers.postgres.hooks.postgres import PostgresHook


@functools.lru_cache(maxsize=None)
def hook():
    """Return the (memoized) Postgres hook for the "pg" Airflow connection."""
    return PostgresHook(postgres_conn_id="pg")


@contextmanager
def connect_begin():
    """Yield a SQLAlchemy connection wrapped in a transaction.

    The transaction is committed when the `with` block exits normally and
    rolled back on error; the connection is closed either way.
    """
    with hook().get_sqlalchemy_engine().connect() as conn, conn.begin():
        yield conn


def create_schema(schema_name: str) -> None:
    """Create the given schema (if needed) and make its tables world-readable.

    ``schema_name`` is interpolated directly into the SQL statement, so it
    must only ever come from trusted, hard-coded callers — never user input.
    """
    # BUGFIX: the original did `with connect_begin as conn:`, i.e. it used the
    # context-manager *function* object instead of calling it, which raises
    # AttributeError ("function has no attribute __enter__") at runtime.
    with connect_begin() as conn:
        conn.execute(
            f"""\
            CREATE SCHEMA IF NOT EXISTS {schema_name};
            GRANT USAGE ON SCHEMA {schema_name} TO PUBLIC;
            ALTER DEFAULT PRIVILEGES IN SCHEMA {schema_name}
            GRANT SELECT ON TABLES TO PUBLIC;"""
        )
63 changes: 63 additions & 0 deletions pipeline/dags/import_brevo.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
import logging

import airflow
import pendulum
from airflow.operators import empty, python
from dag_utils.virtualenvs import PYTHON_BIN_PATH

logger = logging.getLogger(__name__)

default_args = {}


def _import_rgpd_contacts():
    """Snapshot the Brevo "all contacts" list into postgres.

    The contacts are written to the ``brevo.all_rgpd_dora_contacts`` table,
    replacing any previous content.
    """
    import pandas as pd
    from airflow.models import Variable

    # CONSISTENCY: the original imported `pg` via `from dags.dag_utils import
    # pg` while importing `constants` via `from dag_utils import ...` (as the
    # module header also does). Importing the same package under two module
    # paths risks two distinct module objects; unified on `dag_utils`.
    from dag_utils import constants, pg

    from data_inclusion.scripts.tasks import brevo

    brevo_client = brevo.BrevoClient(token=Variable.get("BREVO_API_KEY"))

    pg.create_schema("brevo")

    # Pin the column set so the destination table schema stays stable even if
    # the Brevo API payload gains fields.
    contacts_df = pd.DataFrame.from_records(
        brevo_client.list_contacts(constants.BREVO_ALL_CONTACTS_LIST_ID),
        columns=[
            "email",
            "id",
            "emailBlacklisted",
            "smsBlacklisted",
            "createdAt",
            "modifiedAt",
        ],
    )
    with pg.connect_begin() as conn:
        contacts_df.to_sql(
            "all_rgpd_dora_contacts",
            con=conn,
            schema="brevo",
            if_exists="replace",
            index=False,
        )


with airflow.DAG(
    dag_id="import_brevo",
    start_date=pendulum.datetime(2023, 11, 1, tz="Europe/Paris"),
    default_args=default_args,
    schedule_interval="@daily",
    catchup=False,
    concurrency=1,
) as dag:
    # Explicit start/end markers keep the DAG shape stable if tasks are added.
    start = empty.EmptyOperator(task_id="start")

    # Runs in a dedicated virtualenv so heavy deps (pandas, brevo sdk) stay
    # out of the scheduler environment.
    import_rgpd_contacts = python.ExternalPythonOperator(
        task_id="import_rgpd_contacts",
        python=str(PYTHON_BIN_PATH),
        python_callable=_import_rgpd_contacts,
    )

    end = empty.EmptyOperator(task_id="end")

    start >> import_rgpd_contacts >> end
84 changes: 84 additions & 0 deletions pipeline/dags/notify_dora_rgpd_contacts.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
import logging

import airflow
import pendulum
from airflow.operators import empty, python
from dag_utils.virtualenvs import PYTHON_BIN_PATH

default_args = {}

logger = logging.getLogger(__name__)


def _sync_new_contacts_to_brevo():
    """Push DORA contacts not yet known to Brevo into the Brevo lists.

    New emails (present in ``dora.int_dora__contacts`` but absent from the
    last Brevo snapshot) are appended to the cumulative "all contacts" list,
    and the "current contacts" list is emptied then refilled with only those
    new emails so the next campaign targets them exclusively.
    """
    from airflow.models import Variable

    # CONSISTENCY: unified on `from dag_utils import ...` (the path used by
    # the module header and the sibling DAGs) instead of `dags.dag_utils`.
    from dag_utils import constants, pg

    from data_inclusion.scripts.tasks import brevo

    def _first_column(sql: str) -> list:
        # Fetch the first column of every row returned by the query.
        return [row[0] for row in pg.hook().get_records(sql=sql)]

    all_emails = _first_column("SELECT courriel FROM dora.int_dora__contacts")
    brevo_contacts = _first_column(
        "SELECT email FROM brevo.all_rgpd_dora_contacts"
    )

    brevo_client = brevo.BrevoClient(token=Variable.get("BREVO_API_KEY"))

    # Sorted for deterministic import order across runs.
    new_emails = sorted(set(all_emails) - set(brevo_contacts))

    brevo_client.import_to_list(constants.BREVO_ALL_CONTACTS_LIST_ID, new_emails)
    brevo_client.empty_list(constants.BREVO_CURRENT_CONTACTS_LIST_ID)
    brevo_client.import_to_list(constants.BREVO_CURRENT_CONTACTS_LIST_ID, new_emails)


def _send_rgpd_notice():
    """Send the RGPD notification campaign to the "current contacts" list."""
    from airflow.models import Variable

    # CONSISTENCY: unified on `from dag_utils import ...` (the path used by
    # the module header and the other task callables) instead of
    # `dags.dag_utils`.
    from dag_utils import constants

    from data_inclusion.scripts.tasks import brevo

    brevo_client = brevo.BrevoClient(token=Variable.get("BREVO_API_KEY"))
    brevo_client.create_and_send_email_campaign(
        subject="[data·inclusion] Notification RGPD",
        template_id=2,
        to_list_id=constants.BREVO_CURRENT_CONTACTS_LIST_ID,
        tag="di-rgpd-notice",
        from_email="[email protected]",
        from_name="L'équipe data inclusion",
        reply_to="[email protected]",
    )


with airflow.DAG(
    dag_id="dora_rgpd_notifications",
    description="Sends RGPD notifications to DORA users",
    # NOTE(review): unlike import_brevo, this start_date has no tz (defaults
    # to UTC) — confirm whether Europe/Paris was intended.
    start_date=pendulum.datetime(2023, 11, 1),
    default_args=default_args,
    schedule_interval="@monthly",
    catchup=False,
) as dag:
    start = empty.EmptyOperator(task_id="start")

    sync_new_contacts_to_brevo = python.ExternalPythonOperator(
        task_id="sync_new_contacts_to_brevo",
        python=str(PYTHON_BIN_PATH),
        python_callable=_sync_new_contacts_to_brevo,
    )

    send_rgpd_notice = python.ExternalPythonOperator(
        task_id="send_rgpd_notice",
        python=str(PYTHON_BIN_PATH),
        python_callable=_send_rgpd_notice,
    )

    end = empty.EmptyOperator(task_id="end")

    # The campaign must only go out after the new contacts have been synced.
    start >> sync_new_contacts_to_brevo >> send_rgpd_notice >> end
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,13 @@ models:
tests:
- unique
- not_null

- name: int_dora__contacts
columns:
- name: courriel
tests:
- unique
- not_null
- dbt_utils.not_empty_string

- name: int_dora__services
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
-- All unique DORA contact emails, combining structure and service contacts.
-- UNION (as opposed to UNION ALL) deduplicates across the two sources.
SELECT courriel
FROM {{ ref('stg_dora__structures') }}
WHERE courriel IS NOT NULL

UNION

SELECT courriel
FROM {{ ref('stg_dora__services') }}
WHERE courriel IS NOT NULL
3 changes: 2 additions & 1 deletion pipeline/requirements/tasks/python/requirements.in
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,10 @@ psycopg2==2.*
pyairtable
pyogrio
requests==2.*
sib-api-v3-sdk
SQLAlchemy
tenacity
tqdm
trafilatura
urllib3
xlsxwriter
xlsxwriter
14 changes: 8 additions & 6 deletions pipeline/requirements/tasks/python/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,7 @@ apache-airflow-providers-postgres==5.8.0
apache-airflow-providers-sqlite==3.5.0
# via apache-airflow
apispec[yaml]==6.3.0
# via
# apispec
# flask-appbuilder
# via flask-appbuilder
argcomplete==3.1.6
# via apache-airflow
asgiref==3.7.2
Expand Down Expand Up @@ -104,6 +102,7 @@ certifi==2023.7.22
# pyogrio
# pyproj
# requests
# sib-api-v3-sdk
# trafilatura
cffi==1.16.0
# via cryptography
Expand Down Expand Up @@ -134,9 +133,7 @@ colorlog==4.8.0
configupdater==3.1.1
# via apache-airflow
connexion[flask]==2.14.2
# via
# apache-airflow
# connexion
# via apache-airflow
courlan==0.9.4
# via trafilatura
cron-descriptor==1.4.0
Expand Down Expand Up @@ -433,6 +430,7 @@ python-dateutil==2.8.2
# htmldate
# pandas
# pendulum
# sib-api-v3-sdk
python-nvd3==0.15.0
# via apache-airflow
python-slugify==8.0.1
Expand Down Expand Up @@ -493,12 +491,15 @@ setproctitle==1.3.3
# via apache-airflow
shapely==2.0.2
# via geopandas
sib-api-v3-sdk==7.6.0
# via -r requirements/tasks/python/requirements.in
six==1.16.0
# via
# fiona
# prison
# python-dateutil
# rfc3339-validator
# sib-api-v3-sdk
sniffio==1.3.0
# via
# anyio
Expand Down Expand Up @@ -567,6 +568,7 @@ urllib3==2.0.7
# htmldate
# pyairtable
# requests
# sib-api-v3-sdk
# trafilatura
watchtower==3.0.1
# via apache-airflow-providers-amazon
Expand Down
Loading

0 comments on commit 9cbcae0

Please sign in to comment.