diff --git a/api/src/data_inclusion/api/entrypoints/fastapi.py b/api/src/data_inclusion/api/entrypoints/fastapi.py index 7b8f2f56..5d974f13 100644 --- a/api/src/data_inclusion/api/entrypoints/fastapi.py +++ b/api/src/data_inclusion/api/entrypoints/fastapi.py @@ -118,6 +118,7 @@ def list_structures( query = query.filter(models.Structure.source != "agefiph") if not request.user.is_authenticated or "dora" not in request.user.username: query = query.filter(models.Structure.source != "soliguide") + query = query.filter(models.Structure.source != "data-inclusion") if id_ is not None: query = query.filter_by(id=id_) @@ -303,6 +304,7 @@ def list_services( query = query.filter(models.Structure.source != "agefiph") if not request.user.is_authenticated or "dora" not in request.user.username: query = query.filter(models.Structure.source != "soliguide") + query = query.filter(models.Structure.source != "data-inclusion") if departement is not None: query = query.filter( @@ -438,6 +440,7 @@ def search_services( query = query.filter(models.Structure.source != "agefiph") if not request.user.is_authenticated or "dora" not in request.user.username: query = query.filter(models.Structure.source != "soliguide") + query = query.filter(models.Structure.source != "data-inclusion") if commune_instance is not None: # filter by zone de diffusion diff --git a/pipeline/dags/dag_utils/settings.py b/pipeline/dags/dag_utils/settings.py index a568f643..a551eb66 100644 --- a/pipeline/dags/dag_utils/settings.py +++ b/pipeline/dags/dag_utils/settings.py @@ -243,12 +243,12 @@ "streams": [ { "id": "services", - "filename": "services.json", + "filename": "services.csv", "url": Variable.get("DI_EXTRA_SERVICES_FILE_URL", None), }, { "id": "structures", - "filename": "structures.json", + "filename": "structures.csv", "url": Variable.get("DI_EXTRA_STRUCTURES_FILE_URL", None), }, ], diff --git a/pipeline/dags/import_sources.py b/pipeline/dags/import_sources.py index 1650e2a0..b02985a0 100644 --- a/pipeline/dags/import_sources.py +++ b/pipeline/dags/import_sources.py @@ -112,7 +112,7 @@ def load_from_s3_to_data_warehouse( "annuaire-du-service-public": annuaire_du_service_public.read, "cd35": lambda path: utils.read_csv(path, sep=";"), "cd72": lambda path: utils.read_csv(path, sep=","), - "data-inclusion": utils.read_json, + "data-inclusion": lambda path: utils.read_csv(path, sep=","), "dora": utils.read_json, "emplois-de-linclusion": utils.read_json, "finess": lambda path: utils.read_csv(path, sep=","), diff --git a/pipeline/dbt/models/intermediate/sources/data_inclusion/int_data_inclusion__services.sql b/pipeline/dbt/models/intermediate/sources/data_inclusion/int_data_inclusion__services.sql index 0785071a..5893bb3c 100644 --- a/pipeline/dbt/models/intermediate/sources/data_inclusion/int_data_inclusion__services.sql +++ b/pipeline/dbt/models/intermediate/sources/data_inclusion/int_data_inclusion__services.sql @@ -16,44 +16,44 @@ di_profil_by_dora_profil AS ( final AS ( SELECT - id AS "adresse_id", - TRUE AS "contact_public", - contact_nom AS "contact_nom_prenom", - courriel AS "courriel", - cumulable AS "cumulable", - date_creation::DATE AS "date_creation", - date_maj::DATE AS "date_maj", - date_suspension::DATE AS "date_suspension", - formulaire_en_ligne AS "formulaire_en_ligne", - frais_autres AS "frais_autres", - id AS "id", - justificatifs AS "justificatifs", - NULL AS "lien_source", -- ignored - modes_accueil AS "modes_accueil", - NULL::TEXT [] AS "modes_orientation_accompagnateur", - NULL AS "modes_orientation_accompagnateur_autres", - NULL::TEXT [] AS "modes_orientation_beneficiaire", - NULL AS "modes_orientation_beneficiaire_autres", - nom AS "nom", - presentation_resume AS "presentation_resume", - presentation_detail AS "presentation_detail", - prise_rdv AS "prise_rdv", + id AS "adresse_id", + contact_public AS "contact_public", + contact_nom AS "contact_nom_prenom", + courriel AS "courriel", + cumulable AS "cumulable", + date_creation::DATE AS "date_creation", + date_maj::DATE AS "date_maj", + date_suspension::DATE AS "date_suspension", + formulaire_en_ligne AS "formulaire_en_ligne", + frais_autres AS "frais_autres", + id AS "id", + justificatifs AS "justificatifs", + NULL AS "lien_source", -- ignored + modes_accueil AS "modes_accueil", + modes_orientation_accompagnateur AS "modes_orientation_accompagnateur", + modes_orientation_accompagnateur_autres AS "modes_orientation_accompagnateur_autres", + modes_orientation_beneficiaire AS "modes_orientation_beneficiaire", + modes_orientation_beneficiaire_autres AS "modes_orientation_beneficiaire_autres", + nom AS "nom", + presentation_resume AS "presentation_resume", + presentation_detail AS "presentation_detail", + prise_rdv AS "prise_rdv", ARRAY( SELECT di_profil_by_dora_profil.di_profil FROM di_profil_by_dora_profil WHERE di_profil_by_dora_profil.dora_profil = ANY(services.profils) - )::TEXT [] AS "profils", - recurrence AS "recurrence", - _di_source_id AS "source", - structure_id AS "structure_id", - NULL AS "telephone", - thematiques AS "thematiques", - types AS "types", - zone_diffusion_code AS "zone_diffusion_code", - zone_diffusion_nom AS "zone_diffusion_nom", - zone_diffusion_type AS "zone_diffusion_type", - pre_requis AS "pre_requis", - ARRAY[frais] AS "frais" + )::TEXT [] AS "profils", + recurrence AS "recurrence", + _di_source_id AS "source", + structure_id AS "structure_id", + telephone AS "telephone", + thematiques AS "thematiques", + types AS "types", + zone_diffusion_code AS "zone_diffusion_code", + zone_diffusion_nom AS "zone_diffusion_nom", + zone_diffusion_type AS "zone_diffusion_type", + pre_requis AS "pre_requis", + frais AS "frais" FROM services ) diff --git a/pipeline/dbt/models/marts/opendata/opendata_services.sql b/pipeline/dbt/models/marts/opendata/opendata_services.sql index f6b7b577..933673a4 100644 --- a/pipeline/dbt/models/marts/opendata/opendata_services.sql +++ b/pipeline/dbt/models/marts/opendata/opendata_services.sql @@ -13,7 +13,7 @@ final AS ( {{ obfuscate('courriel') }} AS "courriel", {{ obfuscate('telephone') }} AS "telephone" FROM services - WHERE services.source NOT IN ('soliguide', 'agefiph') + WHERE services.source NOT IN ('soliguide', 'agefiph', 'data-inclusion') ) SELECT * FROM final diff --git a/pipeline/dbt/models/marts/opendata/opendata_structures.sql b/pipeline/dbt/models/marts/opendata/opendata_structures.sql index 194b6266..f5c8a1b3 100644 --- a/pipeline/dbt/models/marts/opendata/opendata_structures.sql +++ b/pipeline/dbt/models/marts/opendata/opendata_structures.sql @@ -20,7 +20,7 @@ final AS ( ELSE structures.telephone END AS "telephone" FROM structures - WHERE structures.source NOT IN ('soliguide', 'siao', 'finess', 'agefiph') + WHERE structures.source NOT IN ('soliguide', 'siao', 'finess', 'agefiph', 'data-inclusion') ) SELECT * FROM final diff --git a/pipeline/dbt/models/staging/sources/data_inclusion/_data_inclusion__models.yml b/pipeline/dbt/models/staging/sources/data_inclusion/_data_inclusion__models.yml index 0455ea2b..fe4401fe 100644 --- a/pipeline/dbt/models/staging/sources/data_inclusion/_data_inclusion__models.yml +++ b/pipeline/dbt/models/staging/sources/data_inclusion/_data_inclusion__models.yml @@ -8,6 +8,12 @@ models: - unique - not_null - dbt_utils.not_empty_string + - name: structure_id + tests: + - not_null + - relationships: + to: ref('stg_data_inclusion__structures') + field: id - name: stg_data_inclusion__structures columns: diff --git a/pipeline/dbt/models/staging/sources/data_inclusion/stg_data_inclusion__services.sql b/pipeline/dbt/models/staging/sources/data_inclusion/stg_data_inclusion__services.sql index 71ab3ca1..92aaeccd 100644 --- a/pipeline/dbt/models/staging/sources/data_inclusion/stg_data_inclusion__services.sql +++ b/pipeline/dbt/models/staging/sources/data_inclusion/stg_data_inclusion__services.sql @@ -4,45 +4,51 @@ WITH source AS ( final AS ( SELECT - _di_source_id AS "_di_source_id", - (data ->> 'contact_public')::BOOLEAN AS "contact_public", - (data ->> 'cumulable')::BOOLEAN AS "cumulable", - (data ->> 'date_creation')::TIMESTAMP WITH TIME ZONE AS "date_creation", - (data ->> 'date_maj')::TIMESTAMP WITH TIME ZONE AS "date_maj", - (data ->> 'date_suspension')::TIMESTAMP WITH TIME ZONE AS "date_suspension", - (data ->> 'latitude')::FLOAT AS "latitude", - (data ->> 'longitude')::FLOAT AS "longitude", - ARRAY(SELECT * FROM JSONB_ARRAY_ELEMENTS_TEXT(data -> 'modes_accueil'))::TEXT [] AS "modes_accueil", - ARRAY(SELECT * FROM JSONB_ARRAY_ELEMENTS_TEXT(data -> 'profils'))::TEXT [] AS "profils", - ARRAY(SELECT * FROM JSONB_ARRAY_ELEMENTS_TEXT(data -> 'thematiques'))::TEXT [] AS "thematiques", - ARRAY(SELECT * FROM JSONB_ARRAY_ELEMENTS_TEXT(data -> 'types'))::TEXT [] AS "types", - STRING_TO_ARRAY(NULLIF(TRIM(data ->> 'justificatifs'), ''), ',') AS "justificatifs", - STRING_TO_ARRAY(NULLIF(TRIM(data ->> 'pre_requis'), ''), ',') AS "pre_requis", - data ->> 'adresse' AS "adresse", - data ->> 'code_insee' AS "code_insee", - data ->> 'code_postal' AS "code_postal", - data ->> 'commune' AS "commune", - data ->> 'complement_adresse' AS "complement_adresse", - NULLIF(TRIM(data ->> 'contact_nom'), '') AS "contact_nom", - NULLIF(TRIM(data ->> 'contact_prenom'), '') AS "contact_prenom", - NULLIF(TRIM(data ->> 'courriel'), '') AS "courriel", - data ->> 'formulaire_en_ligne' AS "formulaire_en_ligne", - data ->> 'frais_autres' AS "frais_autres", - data ->> 'frais' AS "frais", - data ->> 'id' AS "id", - data ->> 'lien_source' AS "lien_source", - data ->> 'nom' AS "nom", - data ->> 'presentation_resume' AS "presentation_resume", - data ->> 'presentation_detail' AS "presentation_detail", - data ->> 'prise_rdv' AS "prise_rdv", - data ->> 'recurrence' AS "recurrence", - data ->> 'source' AS "source", - data ->> 'structure_id' AS "structure_id", - NULLIF(TRIM(data ->> 'telephone'), '') AS "telephone", - NULLIF(TRIM(data ->> 'zone_diffusion_code'), '') AS "zone_diffusion_code", - NULLIF(TRIM(data ->> 'zone_diffusion_nom'), '') AS "zone_diffusion_nom", - data ->> 'zone_diffusion_type' AS "zone_diffusion_type" + _di_source_id AS "_di_source_id", + 'data-inclusion' AS "source", + CAST(data ->> 'contact_public' AS BOOLEAN) AS "contact_public", + CAST(data ->> 'cumulable' AS BOOLEAN) AS "cumulable", + TO_DATE(data ->> 'date_creation', 'DD/MM/YYYY') AS "date_creation", + TO_DATE(data ->> 'date_maj', 'DD/MM/YYYY') AS "date_maj", + TO_DATE(data ->> 'date_suspension', 'DD/MM/YYYY') AS "date_suspension", + CAST(data ->> 'latitude' AS FLOAT) AS "latitude", + CAST(data ->> 'longitude' AS FLOAT) AS "longitude", + ARRAY_REMOVE(ARRAY(SELECT value FROM JSONB_EACH_TEXT(data) WHERE key ~* 'modes_accueil.\d+'), NULL) AS "modes_accueil", + ARRAY_REMOVE(ARRAY(SELECT value FROM JSONB_EACH_TEXT(data) WHERE key ~* 'profils.\d+'), NULL) AS "profils", + ARRAY_REMOVE(ARRAY(SELECT value FROM JSONB_EACH_TEXT(data) WHERE key ~* 'thematiques.\d+'), NULL) AS "thematiques", + ARRAY_REMOVE(ARRAY(SELECT value FROM JSONB_EACH_TEXT(data) WHERE key ~* 'types.\d+'), NULL) AS "types", + ARRAY_REMOVE(ARRAY(SELECT value FROM JSONB_EACH_TEXT(data) WHERE key ~* 'justificatifs.\d+'), NULL) AS "justificatifs", + ARRAY_REMOVE(ARRAY(SELECT value FROM JSONB_EACH_TEXT(data) WHERE key ~* 'pre_requis.\d+'), NULL) AS "pre_requis", + data ->> 'adresse' AS "adresse", + data ->> 'code_insee' AS "code_insee", + data ->> 'code_postal' AS "code_postal", + data ->> 'commune' AS "commune", + data ->> 'complement_adresse' AS "complement_adresse", + data ->> 'contact_nom' AS "contact_nom", + data ->> 'contact_prenom' AS "contact_prenom", + data ->> 'courriel' AS "courriel", + data ->> 'formulaire_en_ligne' AS "formulaire_en_ligne", + data ->> 'frais_autres' AS "frais_autres", + ARRAY_REMOVE(ARRAY(SELECT value FROM JSONB_EACH_TEXT(data) WHERE key ~* 'frais.\d+'), NULL) AS "frais", + data ->> 'id' AS "id", + data ->> 'lien_source' AS "lien_source", + data ->> 'nom' AS "nom", + data ->> 'presentation_resume' AS "presentation_resume", + data ->> 'presentation_detail' AS "presentation_detail", + data ->> 'prise_rdv' AS "prise_rdv", + data ->> 'recurrence' AS "recurrence", + data ->> 'structure_id' AS "structure_id", + data ->> 'telephone' AS "telephone", + data ->> 'zone_diffusion_code' AS "zone_diffusion_code", + data ->> 'zone_diffusion_nom' AS "zone_diffusion_nom", + data ->> 'zone_diffusion_type' AS "zone_diffusion_type", + ARRAY_REMOVE(ARRAY(SELECT value FROM JSONB_EACH_TEXT(data) WHERE key ~* 'modes_orientation_beneficiaire.\d+'), NULL) AS "modes_orientation_beneficiaire", + data ->> 'modes_orientation_beneficiaire_autres' AS "modes_orientation_beneficiaire_autres", + ARRAY_REMOVE(ARRAY(SELECT value FROM JSONB_EACH_TEXT(data) WHERE key ~* 'modes_orientation_accompagnateur.\d+'), NULL) AS "modes_orientation_accompagnateur", + data ->> 'modes_orientation_accompagnateur_autres' AS "modes_orientation_accompagnateur_autres" FROM source + WHERE + NOT COALESCE(CAST(data ->> '__ignore__' AS BOOLEAN), FALSE) ) SELECT * FROM final diff --git a/pipeline/dbt/models/staging/sources/data_inclusion/stg_data_inclusion__structures.sql b/pipeline/dbt/models/staging/sources/data_inclusion/stg_data_inclusion__structures.sql index ddbfdee0..189ff38d 100644 --- a/pipeline/dbt/models/staging/sources/data_inclusion/stg_data_inclusion__structures.sql +++ b/pipeline/dbt/models/staging/sources/data_inclusion/stg_data_inclusion__structures.sql @@ -4,34 +4,36 @@ WITH source AS ( final AS ( SELECT - _di_source_id AS "_di_source_id", - (data ->> 'antenne')::BOOLEAN AS "antenne", - (data ->> 'date_maj')::TIMESTAMP WITH TIME ZONE AS "date_maj", - ARRAY(SELECT * FROM JSONB_ARRAY_ELEMENTS_TEXT(data -> 'labels_autres'))::TEXT [] AS "labels_autres", - ARRAY(SELECT * FROM JSONB_ARRAY_ELEMENTS_TEXT(data -> 'labels_nationaux'))::TEXT [] AS "labels_nationaux", - (data ->> 'latitude')::FLOAT AS "latitude", - (data ->> 'longitude')::FLOAT AS "longitude", - (data ->> 'thematiques')::TEXT [] AS "thematiques", - data ->> 'accessibilite' AS "accessibilite", - data ->> 'adresse' AS "adresse", - data ->> 'code_insee' AS "code_insee", - data ->> 'code_postal' AS "code_postal", - data ->> 'commune' AS "commune", - NULLIF(TRIM(data ->> 'complement_adresse'), '') AS "complement_adresse", - data ->> 'courriel' AS "courriel", - data ->> 'horaires_ouverture' AS "horaires_ouverture", - data ->> 'id' AS "id", - data ->> 'lien_source' AS "lien_source", - NULLIF(TRIM(data ->> 'nom'), '') AS "nom", - data ->> 'presentation_detail' AS "presentation_detail", - data ->> 'presentation_resume' AS "presentation_resume", - data ->> 'rna' AS "rna", - data ->> 'siret' AS "siret", - data ->> 'site_web' AS "site_web", - data ->> 'source' AS "source", - data ->> 'telephone' AS "telephone", - data ->> 'typologie' AS "typologie" + _di_source_id AS "_di_source_id", + CAST(data ->> 'antenne' AS BOOLEAN) AS "antenne", + TO_DATE(data ->> 'date_maj', 'DD/MM/YYYY') AS "date_maj", + ARRAY_REMOVE(ARRAY(SELECT value FROM JSONB_EACH_TEXT(data) WHERE key ~* 'labels_autres.\d+'), NULL) AS "labels_autres", + ARRAY_REMOVE(ARRAY(SELECT value FROM JSONB_EACH_TEXT(data) WHERE key ~* 'labels_nationaux.\d+'), NULL) AS "labels_nationaux", + CAST(data ->> 'latitude' AS FLOAT) AS "latitude", + CAST(data ->> 'longitude' AS FLOAT) AS "longitude", + ARRAY_REMOVE(ARRAY(SELECT value FROM JSONB_EACH_TEXT(data) WHERE key ~* 'thematiques.\d+'), NULL) AS "thematiques", + data ->> 'accessibilite' AS "accessibilite", + data ->> 'adresse' AS "adresse", + data ->> 'code_insee' AS "code_insee", + data ->> 'code_postal' AS "code_postal", + data ->> 'commune' AS "commune", + data ->> 'complement_adresse' AS "complement_adresse", + data ->> 'courriel' AS "courriel", + data ->> 'horaires_ouverture' AS "horaires_ouverture", + data ->> 'id' AS "id", + data ->> 'lien_source' AS "lien_source", + data ->> 'nom' AS "nom", + data ->> 'presentation_detail' AS "presentation_detail", + data ->> 'presentation_resume' AS "presentation_resume", + data ->> 'rna' AS "rna", + data ->> 'siret' AS "siret", + data ->> 'site_web' AS "site_web", + data ->> 'source' AS "source", + data ->> 'telephone' AS "telephone", + data ->> 'typologie' AS "typologie" FROM source + WHERE + NOT COALESCE(CAST(data ->> '__ignore__' AS BOOLEAN), FALSE) ) SELECT * FROM final diff --git a/pipeline/defaults.env b/pipeline/defaults.env index 9d13bfe1..1ddf4a4f 100644 --- a/pipeline/defaults.env +++ b/pipeline/defaults.env @@ -6,8 +6,8 @@ AIRFLOW_VAR_BAN_API_URL=https://api-adresse.data.gouv.fr AIRFLOW_VAR_CD35_FILE_URL=https://data.ille-et-vilaine.fr/dataset/8d5ec0f0-ebe1-442d-9d99-655b37d5ad07/resource/8b781e9d-e11d-486c-98cf-0f63abfae8ed/download/annuaire_sociale.csv AIRFLOW_VAR_CD72_STRUCTURES_FILE_URL=https://grist.incubateur.net/o/datainclusion/api/docs/dFpXXzs2fug9Kb7zZhyWyn/download/csv?tableId=Structures AIRFLOW_VAR_CD72_SERVICES_FILE_URL=https://grist.incubateur.net/o/datainclusion/api/docs/dFpXXzs2fug9Kb7zZhyWyn/download/csv?tableId=Services -AIRFLOW_VAR_DI_EXTRA_SERVICES_FILE_URL=https://data-inclusion-lake.s3.fr-par.scw.cloud/sources/data-inclusion/2023-08-16/services.json -AIRFLOW_VAR_DI_EXTRA_STRUCTURES_FILE_URL=https://data-inclusion-lake.s3.fr-par.scw.cloud/sources/data-inclusion/2023-08-16/structures.json +AIRFLOW_VAR_DI_EXTRA_SERVICES_FILE_URL=https://docs.google.com/spreadsheets/d/1vAuD_XTMOEmtH9li3hL1iydyp3UVLVb_DaR5nmE5kFY/export?format=csv&gid=0 +AIRFLOW_VAR_DI_EXTRA_STRUCTURES_FILE_URL=https://docs.google.com/spreadsheets/d/1vAuD_XTMOEmtH9li3hL1iydyp3UVLVb_DaR5nmE5kFY/export?format=csv&gid=943548723 AIRFLOW_VAR_DORA_API_URL=https://api.dora.inclusion.beta.gouv.fr/api/v2/ AIRFLOW_VAR_DORA_PREPROD_API_URL=https://api.dora.incubateur.net/api/v2/ AIRFLOW_VAR_EMPLOIS_API_URL=https://emplois.inclusion.beta.gouv.fr/api/v1/structures/