import json
from pathlib import Path
from typing import Any, Dict, List, Optional

# France Travail day numbers mapped to OpenStreetMap day abbreviations.
# NOTE(review): 6 and 7 assume the numbering continues ISO-style (Saturday,
# Sunday) — confirm against the "referentiel agences" API documentation.
_OSM_DAYS = {1: "Mo", 2: "Tu", 3: "We", 4: "Th", 5: "Fr", 6: "Sa", 7: "Su"}
_WEEKDAYS = frozenset({"Mo", "Tu", "We", "Th", "Fr"})


def same_values_every_day(formated_horaires: Dict[str, str]) -> bool:
    """Return True when every day carries the same opening-hours string.

    An empty mapping is considered uniform: there is no value that differs.
    """
    values = iter(formated_horaires.values())
    first_value = next(values, None)
    return all(value == first_value for value in values)


def _format_day(horaire: Dict[str, Any]) -> str:
    """Return the time range(s) of one open day, terminated by ``;``.

    Raises:
        KeyError: if a field required by the detected shape of the day
            (continuous, morning only, afternoon only, both) is missing.
    """
    if horaire["horaireEnContinu"] == "O":
        # No midday break: a single range from morning opening to afternoon
        # closing.
        return f"{horaire['ouvertureMatin']}-{horaire['fermetureApresMidi']};"
    if "ouvertureApresMidi" not in horaire:
        return f"{horaire['ouvertureMatin']}-{horaire['fermetureMatin']};"
    if "ouvertureMatin" not in horaire:
        return f"{horaire['ouvertureApresMidi']}-{horaire['fermetureApresMidi']};"
    return (
        f"{horaire['ouvertureMatin']}-{horaire['fermetureMatin']},"
        f"{horaire['ouvertureApresMidi']}-{horaire['fermetureApresMidi']};"
    )


def format_date_ft_to_open_street_map(
    horaires: Optional[List[Dict[str, Any]]],
) -> Optional[str]:
    """Convert France Travail opening hours to an OpenStreetMap string.

    cf: https://francetravail.io/data/api/referentiel-agences?tabgroup-api=documentation&doc-section=api-doc-section-caracteristiques
    cf: https://wiki.openstreetmap.org/wiki/Key:opening_hours

    The ``horaires`` object also contains opening hours for appointment-only
    slots. They could be added in a form such as:
    'Mo 12:00-14:00 open "Sans RDV", Mo 14:00-16:00 open "Sur RDV seulement"; PH off'
    Because of the complexity of that format, it is not implemented for now.

    Args:
        horaires: one dict per day, each with at least ``jour`` and
            ``horaireFerme``; open days also need ``horaireEnContinu`` and
            the matching opening/closing fields. ``None`` or an empty list
            is accepted.

    Returns:
        The opening hours followed by ``PH off``, or ``None`` when there is
        no input or every listed day is closed. Days are emitted in
        ascending ``jour`` order; the input list is not modified.

    Raises:
        KeyError: if an open day has a ``jour`` outside 1-7 or lacks a field
            required to build its time range.
    """
    if not horaires:
        return None

    formated_horaires: Dict[str, str] = {}
    for horaire in sorted(horaires, key=lambda x: x["jour"]):
        if horaire["horaireFerme"] == "O":
            continue
        formated_horaires[_OSM_DAYS[horaire["jour"]]] = _format_day(horaire)

    if not formated_horaires:
        return None

    # Compact form for the most common case. The days themselves are checked
    # (not just their count) so that e.g. Tu-Sa is never reported as Mo-Fr.
    if formated_horaires.keys() == _WEEKDAYS and same_values_every_day(
        formated_horaires
    ):
        return f"Mo-Fr {formated_horaires['Mo']} PH off"

    open_street_map_horaires = "".join(
        f"{day} {formated_horaire}"
        for day, formated_horaire in formated_horaires.items()
    )
    return f"{open_street_map_horaires} PH off"


def read(path: Path):
    """Load the agencies JSON file and add ``horaires_open_street_map``.

    Each record gets a ``horaires_open_street_map`` entry computed from its
    ``horaires`` field (``None`` when the field is absent or yields no
    opening hours), then the records are turned into a DataFrame.

    cf: https://francetravail.io/data/api/referentiel-agences?tabgroup-api=documentation&doc-section=api-doc-section-caracteristiques
    cf: https://wiki.openstreetmap.org/wiki/Key:opening_hours

    Args:
        path: location of the JSON file; it is expected to contain a list of
            agency objects.

    Returns:
        The result of ``utils.df_clear_nan`` applied to the DataFrame built
        from the records.
    """
    import pandas as pd

    from . import utils

    # Binary mode lets ``json`` detect the encoding itself instead of
    # depending on the platform's locale default.
    with path.open("rb") as file:
        data = json.load(file)

    for agence in data:
        agence["horaires_open_street_map"] = format_date_ft_to_open_street_map(
            agence.get("horaires")
        )

    df = pd.DataFrame.from_records(data)
    return utils.df_clear_nan(df)
from dags.dag_utils.sources.france_travail import format_date_ft_to_open_street_map


def _continu(jour, ouverture, fermeture):
    """Build the entry of a day that is open without a midday break."""
    return {
        "jour": jour,
        "horaireFerme": "N",
        "horaireEnContinu": "O",
        "ouvertureMatin": ouverture,
        "fermetureApresMidi": fermeture,
    }


def _discontinu(jour, **plages):
    """Build the entry of an open, non-continuous day with the given fields."""
    return {"jour": jour, "horaireFerme": "N", "horaireEnContinu": "N", **plages}


def _ferme(jour):
    """Build the entry of a closed day."""
    return {"jour": jour, "horaireFerme": "O"}


def test_horaires_continus():
    """A continuous day gives a single range."""
    resultat = format_date_ft_to_open_street_map([_continu(1, "08:00", "18:00")])
    assert resultat == "Mo 08:00-18:00; PH off"


def test_horaires_non_continus_sans_apres_midi():
    """A day with morning fields only gives the morning range."""
    mardi = _discontinu(2, ouvertureMatin="09:00", fermetureMatin="12:00")
    resultat = format_date_ft_to_open_street_map([mardi])
    assert resultat == "Tu 09:00-12:00; PH off"


def test_deux_jours():
    """Two open days are emitted one after the other."""
    lundi = _continu(1, "08:00", "18:00")
    mardi = _discontinu(2, ouvertureMatin="09:00", fermetureMatin="12:00")
    resultat = format_date_ft_to_open_street_map([lundi, mardi])
    assert resultat == "Mo 08:00-18:00;Tu 09:00-12:00; PH off"


def test_horaires_non_continus_sans_matin():
    """A day with afternoon fields only gives the afternoon range."""
    mercredi = _discontinu(
        3, ouvertureApresMidi="14:00", fermetureApresMidi="17:00"
    )
    resultat = format_date_ft_to_open_street_map([mercredi])
    assert resultat == "We 14:00-17:00; PH off"


def test_horaires_non_continus_complets():
    """A day with both halves gives two comma-separated ranges."""
    jeudi = _discontinu(
        4,
        ouvertureMatin="09:00",
        fermetureMatin="12:00",
        ouvertureApresMidi="14:00",
        fermetureApresMidi="17:00",
    )
    resultat = format_date_ft_to_open_street_map([jeudi])
    assert resultat == "Th 09:00-12:00,14:00-17:00; PH off"


def test_jour_ferme_seul():
    """Only closed days: there is nothing to report."""
    assert format_date_ft_to_open_street_map([_ferme(4)]) is None


def test_jour_ferme():
    """Closed days are left out of the result."""
    vendredi = _discontinu(5, ouvertureMatin="09:00", fermetureMatin="12:00")
    resultat = format_date_ft_to_open_street_map([_ferme(4), vendredi])
    assert resultat == "Fr 09:00-12:00; PH off"


def test_dictionnaire_vide():
    """An empty list gives None."""
    assert format_date_ft_to_open_street_map([]) is None


def test_format_uniforme_tous_les_jours():
    """Identical hours from Monday to Friday collapse into one Mo-Fr rule."""
    semaine = [_continu(jour, "08:00", "18:00") for jour in range(1, 6)]
    resultat = format_date_ft_to_open_street_map(semaine)
    assert resultat == "Mo-Fr 08:00-18:00; PH off"


def test_horaires_vide():
    """An empty list gives None."""
    assert format_date_ft_to_open_street_map([]) is None


def test_horaires_none():
    """None gives None."""
    assert format_date_ft_to_open_street_map(None) is None