diff --git a/pipeline/dags/import_sources.py b/pipeline/dags/import_sources.py index 6dd2be1b..c604aa19 100644 --- a/pipeline/dags/import_sources.py +++ b/pipeline/dags/import_sources.py @@ -136,6 +136,12 @@ def load_from_s3_to_data_warehouse(source_id, stream_id, run_id, logical_date): start = empty.EmptyOperator(task_id="start") end = empty.EmptyOperator(task_id="end") + dbt_snapshot_source = dbt_operator_factory( + task_id="dbt_snapshot_source", + command="snapshot", + select=model_name, + ) + for stream_id in source_config["streams"]: with TaskGroup(group_id=stream_id) as stream_task_group: extract = python.ExternalPythonOperator( @@ -160,15 +166,10 @@ def load_from_s3_to_data_warehouse(source_id, stream_id, run_id, logical_date): start >> extract >> load - # FIXME(vperron) : didn't Valentin say that snapshots aren't actually used ? - if source_config["snapshot"]: - dbt_snapshot_source = dbt_operator_factory( - task_id="dbt_snapshot_source", - command="snapshot", - select=model_name, - ) - stream_task_group >> dbt_snapshot_source >> end - else: - stream_task_group >> end + # FIXME(vperron) : didn't Valentin say that snapshots aren't actually used ? + if source_config["snapshot"]: + stream_task_group >> dbt_snapshot_source >> end + else: + stream_task_group >> end globals()[dag_id] = dag diff --git a/pipeline/tests/integration/test_geocoding.py b/pipeline/tests/integration/test_geocoding.py index a6c4d3b6..fb8e4535 100644 --- a/pipeline/tests/integration/test_geocoding.py +++ b/pipeline/tests/integration/test_geocoding.py @@ -1,3 +1,5 @@ +from unittest.mock import ANY + import pandas as pd import pytest @@ -49,7 +51,7 @@ def test_ban_geocode( "latitude": "50.627078", "longitude": "3.067372", "result_label": "17 Rue Malus 59000 Lille", - "result_score": "0.9747818181818181", + "result_score": ANY, "result_score_next": None, "result_type": "housenumber", "result_id": "59350_5835_00017",