Skip to content

Commit

Permalink
feat(pipeline) : Use mediation numerique API
Browse files Browse the repository at this point in the history
And get rid of the use of the multiple structures and sources.

To be noted:
- we have added a check to remove the entries form the API that are
  "merged" structures, since DI already stores a lot of duplicates.
  The merged IDs from the ANCT don't look stable and we don't want to
  depend on their algorithm for it.
  We only keep deduplicated structures and services for now.

- with the API we gained a few fields in the structures : we now have an
  INSEE code and accessibility options.

- it seems that switching from the open data to the API we "lost" a few
  sources but gained quite a few others (23875 -> 26592 records)
  We lost the sources : "cd28-appui-territorial" and "hub-antilles"
  We gained the sources : "Corse", "Epernay", "La Sarthe", "Mednum BFC".

Also, we filter-out the Dora structures and services.
  • Loading branch information
vperron committed Dec 18, 2023
1 parent dc6723e commit 09e8a08
Show file tree
Hide file tree
Showing 92 changed files with 90 additions and 938 deletions.
71 changes: 17 additions & 54 deletions pipeline/dags/dag_utils/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,60 +128,23 @@
},
],
},
*[
{
"id": f"mediation-numerique-{source_id}",
"schedule": "@daily",
"snapshot": False,
"streams": [
{
"id": "structures",
"filename": "structures.json",
"url": Variable.get(
f"MEDNUM_{source_id.upper().replace('-', '_')}_DATASET_URL",
None,
),
},
{
"id": "services",
"filename": "services.json",
"url": Variable.get(
f"MEDNUM_{source_id.upper().replace('-', '_')}_DATASET_URL",
None,
),
},
],
}
for source_id in [
"aidants-connect",
"angers",
"assembleurs",
"cd17",
"cd23",
"cd28-appui-territorial",
"cd33",
"cd40",
"cd44",
"cd49",
"cd85",
"cd87",
"conseiller-numerique",
"conumm",
"cr93",
"etapes-numerique",
"fibre-64",
"france-services",
"france-tiers-lieux",
"francilin",
"hinaura",
"hub-antilles",
"hub-lo",
"mulhouse",
"res-in",
"rhinocc",
"ultra-numerique",
]
],
{
"id": "mediation-numerique",
"schedule": "@daily",
"snapshot": False,
"streams": [
{
"id": "structures",
"filename": "structures.json",
"url": Variable.get("MEDNUM_API_URL", None),
},
{
"id": "services",
"filename": "services.json",
"url": Variable.get("MEDNUM_API_URL", None),
},
],
},
{
"id": "soliguide",
"schedule": "@daily",
Expand Down
11 changes: 4 additions & 7 deletions pipeline/dags/import_sources.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,12 +65,10 @@ def extract_from_source_to_datalake_bucket(
"formations": reseau_alpha.extract_formations,
},
"pole-emploi": dora.extract,
"mediation-numerique": mediation_numerique.extract,
}

# TODO(vperron): Replace by dict of objects
if source_id.startswith("mediation-numerique-"):
extract_fn = mediation_numerique.extract
elif isinstance(EXTRACT_FN_BY_SOURCE_ID[source_id], dict):
if isinstance(EXTRACT_FN_BY_SOURCE_ID[source_id], dict):
extract_fn = EXTRACT_FN_BY_SOURCE_ID[source_id][stream_config["id"]]
else:
extract_fn = EXTRACT_FN_BY_SOURCE_ID[source_id]
Expand Down Expand Up @@ -127,14 +125,13 @@ def load_from_s3_to_data_warehouse(
},
"agefiph": agefiph.read,
"pole-emploi": utils.read_json,
"mediation-numerique": utils.read_json,
}

source_id = source_config["id"]
stream_id = stream_config["id"]

if source_id.startswith("mediation-numerique-"):
read_fn = utils.read_json
elif isinstance(READ_FN_BY_SOURCE_ID[source_id], dict):
if isinstance(READ_FN_BY_SOURCE_ID[source_id], dict):
read_fn = READ_FN_BY_SOURCE_ID[source_id][stream_id]
else:
read_fn = READ_FN_BY_SOURCE_ID[source_id]
Expand Down
29 changes: 23 additions & 6 deletions pipeline/dags/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,26 +92,44 @@ def _geocode():
)

dbt_staging_tasks_list = []

for source_config in sorted(SOURCES_CONFIGS, key=lambda d: d["id"]):
dbt_source_id = source_config["id"].replace("-", "_")

stg_selector = f"path:models/staging/sources/**/stg_{dbt_source_id}__*.sql"
int_selector = f"path:models/intermediate/sources/**/int_{dbt_source_id}__*.sql"

with TaskGroup(group_id=source_config["id"]) as source_task_group:
dbt_run_staging = dbt_operator_factory(
task_id="dbt_run_staging",
command="run",
select=f"path:models/staging/sources/**/stg_{dbt_source_id}__*.sql",
select=stg_selector,
)

dbt_test_staging = dbt_operator_factory(
task_id="dbt_test_staging",
command="test",
select=f"path:models/staging/sources/**/stg_{dbt_source_id}__*.sql",
select=stg_selector,
)

dbt_run_intermediate = dbt_operator_factory(
task_id="dbt_run_intermediate",
command="run",
select=int_selector,
)

# TODO: add intermediate models here
# this would require to refactor mediation numerique models
dbt_test_intermediate = dbt_operator_factory(
task_id="dbt_test_intermediate",
command="test",
select=int_selector,
)

(dbt_run_staging >> dbt_test_staging)
(
dbt_run_staging
>> dbt_test_staging
>> dbt_run_intermediate
>> dbt_test_intermediate
)

dbt_staging_tasks_list += [source_task_group]

Expand All @@ -122,7 +140,6 @@ def _geocode():
[
# FIXME: handle odspep as other sources (add to dags/settings.py)
"path:models/staging/sources/odspep",
"path:models/intermediate/sources/**/*",
"path:models/intermediate/int__union_adresses.sql",
"path:models/intermediate/int__union_services.sql",
"path:models/intermediate/int__union_structures.sql",
Expand Down
160 changes: 2 additions & 158 deletions pipeline/dbt/models/_sources.yml
Original file line number Diff line number Diff line change
Expand Up @@ -84,164 +84,8 @@ sources:
- name: siaes
- name: organisations

- name: mediation_numerique_aidants_connect
schema: mediation_numerique_aidants_connect
tables:
- name: structures
- name: services

- name: mediation_numerique_angers
schema: mediation_numerique_angers
tables:
- name: structures
- name: services

- name: mediation_numerique_assembleurs
schema: mediation_numerique_assembleurs
tables:
- name: structures
- name: services

- name: mediation_numerique_cd17
schema: mediation_numerique_cd17
tables:
- name: structures
- name: services

- name: mediation_numerique_cd23
schema: mediation_numerique_cd23
tables:
- name: structures
- name: services

- name: mediation_numerique_cd28_appui_territorial
schema: mediation_numerique_cd28_appui_territorial
tables:
- name: structures
- name: services

- name: mediation_numerique_cd33
schema: mediation_numerique_cd33
tables:
- name: structures
- name: services

- name: mediation_numerique_cd40
schema: mediation_numerique_cd40
tables:
- name: structures
- name: services

- name: mediation_numerique_cd44
schema: mediation_numerique_cd44
tables:
- name: structures
- name: services

- name: mediation_numerique_cd49
schema: mediation_numerique_cd49
tables:
- name: structures
- name: services

- name: mediation_numerique_cd85
schema: mediation_numerique_cd85
tables:
- name: structures
- name: services

- name: mediation_numerique_cd87
schema: mediation_numerique_cd87
tables:
- name: structures
- name: services

- name: mediation_numerique_conseiller_numerique
schema: mediation_numerique_conseiller_numerique
tables:
- name: structures
- name: services

- name: mediation_numerique_conumm
schema: mediation_numerique_conumm
tables:
- name: structures
- name: services

- name: mediation_numerique_cr93
schema: mediation_numerique_cr93
tables:
- name: structures
- name: services

- name: mediation_numerique_etapes_numerique
schema: mediation_numerique_etapes_numerique
tables:
- name: structures
- name: services

- name: mediation_numerique_fibre_64
schema: mediation_numerique_fibre_64
tables:
- name: structures
- name: services

- name: mediation_numerique_france_services
schema: mediation_numerique_france_services
tables:
- name: structures
- name: services

- name: mediation_numerique_france_tiers_lieux
schema: mediation_numerique_france_tiers_lieux
tables:
- name: structures
- name: services

- name: mediation_numerique_francilin
schema: mediation_numerique_francilin
tables:
- name: structures
- name: services

- name: mediation_numerique_hinaura
schema: mediation_numerique_hinaura
tables:
- name: structures
- name: services

- name: mediation_numerique_hub_antilles
schema: mediation_numerique_hub_antilles
tables:
- name: structures
- name: services

- name: mediation_numerique_hub_lo
schema: mediation_numerique_hub_lo
tables:
- name: structures
- name: services

- name: mediation_numerique_mulhouse
schema: mediation_numerique_mulhouse
tables:
- name: structures
- name: services

- name: mediation_numerique_res_in
schema: mediation_numerique_res_in
tables:
- name: structures
- name: services

- name: mediation_numerique_rhinocc
schema: mediation_numerique_rhinocc
tables:
- name: structures
- name: services

- name: mediation_numerique_ultra_numerique
schema: mediation_numerique_ultra_numerique
- name: mediation_numerique
schema: mediation_numerique
tables:
- name: structures
- name: services
Expand Down
Original file line number Diff line number Diff line change
@@ -1,51 +1,13 @@
WITH structures AS (
{{
dbt_utils.union_relations(
relations=[
ref('stg_mediation_numerique_aidants_connect__structures'),
ref('stg_mediation_numerique_angers__structures'),
ref('stg_mediation_numerique_assembleurs__structures'),
ref('stg_mediation_numerique_cd17__structures'),
ref('stg_mediation_numerique_cd23__structures'),
ref('stg_mediation_numerique_cd28_appui_territorial__structures'),
ref('stg_mediation_numerique_cd33__structures'),
ref('stg_mediation_numerique_cd40__structures'),
ref('stg_mediation_numerique_cd44__structures'),
ref('stg_mediation_numerique_cd49__structures'),
ref('stg_mediation_numerique_cd85__structures'),
ref('stg_mediation_numerique_cd87__structures'),
ref('stg_mediation_numerique_conseiller_numerique__structures'),
ref('stg_mediation_numerique_conumm__structures'),
ref('stg_mediation_numerique_cr93__structures'),
ref('stg_mediation_numerique_etapes_numerique__structures'),
ref('stg_mediation_numerique_fibre_64__structures'),
ref('stg_mediation_numerique_france_services__structures'),
ref('stg_mediation_numerique_france_tiers_lieux__structures'),
ref('stg_mediation_numerique_francilin__structures'),
ref('stg_mediation_numerique_hinaura__structures'),
ref('stg_mediation_numerique_hub_antilles__structures'),
ref('stg_mediation_numerique_hub_lo__structures'),
ref('stg_mediation_numerique_mulhouse__structures'),
ref('stg_mediation_numerique_res_in__structures'),
ref('stg_mediation_numerique_rhinocc__structures'),
ref('stg_mediation_numerique_ultra_numerique__structures'),
],
column_override={
"thematiques": "TEXT[]",
"labels_nationaux": "TEXT[]",
"labels_autres": "TEXT[]",
},
source_column_name=None
)
}}
SELECT * FROM {{ ref('stg_mediation_numerique__structures') }}
),

final AS (
SELECT
id AS "id",
commune AS "commune",
code_postal AS "code_postal",
NULL AS "code_insee",
code_insee AS "code_insee",
adresse AS "adresse",
NULL AS "complement_adresse",
longitude AS "longitude",
Expand Down
Loading

0 comments on commit 09e8a08

Please sign in to comment.