Skip to content

Commit

Permalink
feat(pipeline): extract fredo
Browse files Browse the repository at this point in the history
  • Loading branch information
vmttn committed Jun 3, 2024
1 parent 10c5529 commit b71be3d
Show file tree
Hide file tree
Showing 15 changed files with 274 additions and 1 deletion.
1 change: 1 addition & 0 deletions deployment/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ x-airflow-common:
AIRFLOW_VAR_DORA_API_TOKEN: ${AIRFLOW_VAR_DORA_API_TOKEN}
AIRFLOW_VAR_DORA_API_URL: ${AIRFLOW_VAR_DORA_API_URL}
AIRFLOW_VAR_EMPLOIS_API_TOKEN: ${AIRFLOW_VAR_EMPLOIS_API_TOKEN}
AIRFLOW_VAR_FREDO_API_TOKEN: ${AIRFLOW_VAR_FREDO_API_TOKEN}
AIRFLOW_VAR_FT_API_TOKEN: ${AIRFLOW_VAR_FT_API_TOKEN}
AIRFLOW_VAR_GRIST_API_TOKEN: ${AIRFLOW_VAR_GRIST_API_TOKEN}
AIRFLOW_VAR_MES_AIDES_AIRTABLE_KEY: ${AIRFLOW_VAR_MES_AIDES_AIRTABLE_KEY}
Expand Down
1 change: 1 addition & 0 deletions deployment/main.tf
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,7 @@ resource "null_resource" "up" {
AIRFLOW_VAR_DORA_API_TOKEN='${var.dora_api_token}'
AIRFLOW_VAR_DORA_API_URL='${var.dora_api_url}'
AIRFLOW_VAR_EMPLOIS_API_TOKEN='${var.emplois_api_token}'
AIRFLOW_VAR_FREDO_API_TOKEN='${var.fredo_api_token}'
AIRFLOW_VAR_FT_API_TOKEN='${var.ft_api_token}'
AIRFLOW_VAR_GRIST_API_TOKEN='${var.grist_api_token}'
AIRFLOW_VAR_MES_AIDES_AIRTABLE_KEY='${var.mes_aides_airtable_key}'
Expand Down
9 changes: 8 additions & 1 deletion deployment/variables.tf
Original file line number Diff line number Diff line change
Expand Up @@ -214,4 +214,11 @@ variable "twocaptcha_api_key" {
type = string
sensitive = true
default = ""
}
}

variable "fredo_api_token" {
description = "Used in extraction tasks orchestrated by airflow"
type = string
sensitive = true
default = ""
}
1 change: 1 addition & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ x-airflow-common:
AIRFLOW_VAR_BREVO_API_KEY: ${AIRFLOW_VAR_BREVO_API_KEY}
AIRFLOW_VAR_DATAGOUV_API_KEY: ${AIRFLOW_VAR_DATAGOUV_API_KEY}
AIRFLOW_VAR_DORA_API_TOKEN: ${AIRFLOW_VAR_DORA_API_TOKEN}
AIRFLOW_VAR_FREDO_API_TOKEN: ${AIRFLOW_VAR_FREDO_API_TOKEN}
AIRFLOW_VAR_FT_API_TOKEN: ${AIRFLOW_VAR_FT_API_TOKEN}
AIRFLOW_VAR_DORA_PREPROD_API_TOKEN: ${AIRFLOW_VAR_DORA_PREPROD_API_TOKEN}
AIRFLOW_VAR_EMPLOIS_API_TOKEN: ${AIRFLOW_VAR_EMPLOIS_API_TOKEN}
Expand Down
13 changes: 13 additions & 0 deletions pipeline/dags/dag_utils/sources/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
dora,
emplois_de_linclusion,
france_travail,
fredo,
grist,
mediation_numerique,
mes_aides,
Expand Down Expand Up @@ -78,6 +79,18 @@
},
},
},
"fredo": {
"schedule": "@daily",
"snapshot": True,
"extractor": fredo.extract,
"streams": {
"structures": {
"filename": "structures.json",
"url": Variable.get("FREDO_API_URL", None),
"token": Variable.get("FREDO_API_TOKEN", None),
},
},
},
"cd35": {
"schedule": "@daily",
"snapshot": True,
Expand Down
8 changes: 8 additions & 0 deletions pipeline/dags/dag_utils/sources/fredo.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
import httpx


def extract(id: str, url: str, token: str, **kwargs) -> bytes:
headers = {"Authorization": f"Bearer {token}"}
response = httpx.post(url, headers=headers)
response.raise_for_status()
return response.content
5 changes: 5 additions & 0 deletions pipeline/dbt/models/_sources.yml
Original file line number Diff line number Diff line change
Expand Up @@ -136,3 +136,8 @@ sources:
schema: brevo
tables:
- name: contacts

- name: fredo
schema: fredo
tables:
- name: structures
123 changes: 123 additions & 0 deletions pipeline/dbt/models/staging/sources/fredo/_fredo__models.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
version: 2

models:
- name: stg_fredo__structures
columns:
- name: hash
tests:
- unique
- not_null
- dbt_utils.not_empty_string

- name: adresse
tests:
- not_null:
config:
severity: warn
- dbt_utils.not_empty_string
- name: code_postal
tests:
- not_null:
config:
severity: warn
- dbt_utils.not_empty_string
- name: commune
tests:
- not_null:
config:
severity: warn
- dbt_utils.not_empty_string
- name: frais
tests:
- dbt_utils.not_empty_string
- name: horaires_ouverture
tests:
- dbt_utils.not_empty_string
- name: lien_source
tests:
- dbt_utils.not_empty_string
- name: nom
tests:
- not_null
- dbt_utils.not_empty_string
- name: presentation_resume
tests:
- dbt_utils.not_empty_string
- name: siret
tests:
- dbt_utils.not_empty_string
- name: telephone
tests:
- dbt_utils.not_empty_string
- name: latitude
- name: longitude
- name: categories
- name: publics
- name: quartiers
- name: services
- name: type_structure

- name: stg_fredo__categories
columns:
- name: hash
tests:
- not_null
- relationships:
to: ref('stg_fredo__structures')
field: hash
- name: value
tests:
- not_null
- dbt_utils.not_empty_string

- name: stg_fredo__publics
columns:
- name: hash
tests:
- not_null
- relationships:
to: ref('stg_fredo__structures')
field: hash
- name: value
tests:
- not_null
- dbt_utils.not_empty_string

- name: stg_fredo__quartiers
columns:
- name: hash
tests:
- not_null
- relationships:
to: ref('stg_fredo__structures')
field: hash
- name: value
tests:
- not_null
- dbt_utils.not_empty_string

- name: stg_fredo__services
columns:
- name: hash
tests:
- not_null
- relationships:
to: ref('stg_fredo__structures')
field: hash
- name: value
tests:
- not_null
- dbt_utils.not_empty_string

- name: stg_fredo__types
columns:
- name: hash
tests:
- not_null
- relationships:
to: ref('stg_fredo__structures')
field: hash
- name: value
tests:
- not_null
- dbt_utils.not_empty_string
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
WITH source AS (
{{ stg_source_header('fredo', 'structures') }}
),

final AS (
SELECT
MD5(data::TEXT) AS "hash",
JSONB_ARRAY_ELEMENTS_TEXT(data -> 'categories') AS "value"
FROM source
WHERE data ->> 'categories' IS NOT NULL
)

SELECT * FROM final
13 changes: 13 additions & 0 deletions pipeline/dbt/models/staging/sources/fredo/stg_fredo__publics.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
WITH source AS (
{{ stg_source_header('fredo', 'structures') }}
),

final AS (
SELECT
MD5(data::TEXT) AS "hash",
JSONB_ARRAY_ELEMENTS_TEXT(data -> 'publics') AS "value"
FROM source
WHERE data ->> 'publics' IS NOT NULL
)

SELECT * FROM final
13 changes: 13 additions & 0 deletions pipeline/dbt/models/staging/sources/fredo/stg_fredo__quartiers.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
WITH source AS (
{{ stg_source_header('fredo', 'structures') }}
),

final AS (
SELECT
MD5(data::TEXT) AS "hash",
JSONB_ARRAY_ELEMENTS_TEXT(data -> 'quartiers') AS "value"
FROM source
WHERE data ->> 'quartiers' IS NOT NULL
)

SELECT * FROM final
13 changes: 13 additions & 0 deletions pipeline/dbt/models/staging/sources/fredo/stg_fredo__services.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
WITH source AS (
{{ stg_source_header('fredo', 'structures') }}
),

final AS (
SELECT
MD5(data::TEXT) AS "hash",
JSONB_ARRAY_ELEMENTS_TEXT(data -> 'services') AS "value"
FROM source
WHERE data ->> 'services' IS NOT NULL
)

SELECT * FROM final
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
WITH source AS (
{{ stg_source_header('fredo', 'structures') }}
),

final AS (
SELECT
MD5(CAST(data AS TEXT)) AS "hash",
NULLIF(TRIM(data ->> 'adresse'), '') AS "adresse",
NULLIF(TRIM(data ->> 'code_postal'), '') AS "code_postal",
NULLIF(TRIM(data ->> 'commune'), '') AS "commune",
NULLIF(TRIM(data ->> 'frais'), '') AS "frais",
NULLIF(TRIM(data ->> 'horaires_ouverture'), '') AS "horaires_ouverture",
NULLIF(TRIM(data ->> 'lien_source'), '') AS "lien_source",
NULLIF(TRIM(data ->> 'nom'), '') AS "nom",
NULLIF(TRIM(data ->> 'presentation_resume'), '') AS "presentation_resume",
NULLIF(TRIM(data ->> 'siret'), '') AS "siret",
NULLIF(TRIM(data ->> 'telephone'), '') AS "telephone",
CAST(NULLIF(TRIM(data ->> 'latitude'), '') AS FLOAT) AS "latitude",
CAST(NULLIF(TRIM(data ->> 'longitude'), '') AS FLOAT) AS "longitude",
CASE
WHEN data ->> 'categories' IS NOT NULL
THEN CAST(ARRAY(SELECT * FROM JSONB_ARRAY_ELEMENTS_TEXT(data -> 'categories')) AS TEXT [])
ELSE CAST(NULL AS TEXT [])
END AS "categories",
CASE
WHEN data ->> 'publics' IS NOT NULL
THEN CAST(ARRAY(SELECT * FROM JSONB_ARRAY_ELEMENTS_TEXT(data -> 'publics')) AS TEXT [])
ELSE CAST(NULL AS TEXT [])
END AS "publics",
CASE
WHEN data ->> 'quartiers' IS NOT NULL
THEN CAST(ARRAY(SELECT * FROM JSONB_ARRAY_ELEMENTS_TEXT(data -> 'quartiers')) AS TEXT [])
ELSE CAST(NULL AS TEXT [])
END AS "quartiers",
CASE
WHEN data ->> 'services' IS NOT NULL
THEN CAST(ARRAY(SELECT * FROM JSONB_ARRAY_ELEMENTS_TEXT(data -> 'services')) AS TEXT [])
ELSE CAST(NULL AS TEXT [])
END AS "services",
CASE
WHEN data ->> 'type_structure' IS NOT NULL
THEN CAST(ARRAY(SELECT * FROM JSONB_ARRAY_ELEMENTS_TEXT(data -> 'type_structure')) AS TEXT [])
ELSE CAST(NULL AS TEXT [])
END AS "type_structure"
FROM source
)

SELECT * FROM final
13 changes: 13 additions & 0 deletions pipeline/dbt/models/staging/sources/fredo/stg_fredo__types.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
WITH source AS (
{{ stg_source_header('fredo', 'structures') }}
),

final AS (
SELECT
MD5(data::TEXT) AS "hash",
JSONB_ARRAY_ELEMENTS_TEXT(data -> 'type_structure') AS "value"
FROM source
WHERE data ->> 'type_structure' IS NOT NULL
)

SELECT * FROM final
1 change: 1 addition & 0 deletions pipeline/defaults.env
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ AIRFLOW_VAR_CD72_SERVICES_FILE_URL=https://grist.incubateur.net/o/datainclusion/
AIRFLOW_VAR_DI_EXTRA_SERVICES_FILE_URL=https://docs.google.com/spreadsheets/d/1vAuD_XTMOEmtH9li3hL1iydyp3UVLVb_DaR5nmE5kFY/export?format=csv&gid=0
AIRFLOW_VAR_DI_EXTRA_STRUCTURES_FILE_URL=https://docs.google.com/spreadsheets/d/1vAuD_XTMOEmtH9li3hL1iydyp3UVLVb_DaR5nmE5kFY/export?format=csv&gid=943548723
AIRFLOW_VAR_DORA_API_URL=https://api.dora.inclusion.beta.gouv.fr/api/v2/
AIRFLOW_VAR_FREDO_API_URL=https://fredo.re/app-fredo/api/v1
AIRFLOW_VAR_FT_API_URL=https://api.emploi-store.fr/
AIRFLOW_VAR_FT_SERVICES_TEMPLATE_URL=https://docs.google.com/spreadsheets/d/e/2PACX-1vTeZ1eyKN5DkCUh1U6DlOuBGHLVk4vQ_5ZBgJlNtkZqKnSQrSJK_sSJi4KhuhOM2yZyrscc7DQ27d5l/pub?gid=0&single=true&output=csv
AIRFLOW_VAR_DORA_PREPROD_API_URL=https://api.dora.incubateur.net/api/v2/
Expand Down

0 comments on commit b71be3d

Please sign in to comment.