Skip to content

Commit

Permalink
feat(core): decoralate intermediate states
Browse files Browse the repository at this point in the history
  • Loading branch information
hlecuyer committed Aug 28, 2024
1 parent 9f1a37c commit 78a7747
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 16 deletions.
31 changes: 16 additions & 15 deletions pipeline/dags/dag_utils/dbt.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import json
from typing import Optional

from airflow.models import Variable
Expand All @@ -12,6 +13,7 @@
def dbt_operator_factory(
task_id: str,
command: str,
dbt_vars: Optional[dict] = None,
select: Optional[str] = None,
exclude: Optional[str] = None,
trigger_rule: TriggerRule = TriggerRule.ALL_SUCCESS,
Expand All @@ -23,11 +25,15 @@ def dbt_operator_factory(
dbt_args += f" --select {select}"
if exclude is not None:
dbt_args += f" --exclude {exclude}"
if dbt_vars is not None:
dbt_vars = json.dumps(dbt_vars)
dbt_args += f" --vars '{dbt_vars}'"

return bash.BashOperator(
task_id=task_id,
bash_command=f"{DBT_PYTHON_BIN_PATH.parent / 'dbt'} {dbt_args}",
append_env=True,
trigger_rule=trigger_rule,
env={
"DBT_PROFILES_DIR": Variable.get("DBT_PROJECT_DIR"),
"DBT_TARGET_PATH": Variable.get("DBT_TARGET_PATH", "target"),
Expand All @@ -39,15 +45,14 @@ def dbt_operator_factory(
"POSTGRES_DB": "{{ conn.pg.schema }}",
},
cwd=Variable.get("DBT_PROJECT_DIR"),
trigger_rule=trigger_rule,
)


def get_staging_tasks(schedule=None):
task_list = []

for source_id, src_meta in sorted(SOURCES_CONFIGS.items()):
if schedule and src_meta["schedule"] != schedule:
if schedule and "schedule" in src_meta and src_meta["schedule"] != schedule:
continue

dbt_source_id = source_id.replace("-", "_")
Expand All @@ -68,23 +73,24 @@ def get_staging_tasks(schedule=None):
select=stg_selector,
)

dbt_run_intermediate = dbt_operator_factory(
task_id="dbt_run_intermediate",
command="run",
dbt_build_intermediate_tmp = dbt_operator_factory(
task_id="dbt_build_intermediate_tmp",
command="build",
select=int_selector,
dbt_vars={"build_intermediate_tmp": True},
)

dbt_test_intermediate = dbt_operator_factory(
task_id="dbt_test_intermediate",
command="test",
dbt_run_intermediate = dbt_operator_factory(
task_id="dbt_run_intermediate",
command="run",
select=int_selector,
)

(
dbt_run_staging
>> dbt_test_staging
>> dbt_build_intermediate_tmp
>> dbt_run_intermediate
>> dbt_test_intermediate
)

task_list += [source_task_group]
Expand All @@ -98,17 +104,12 @@ def get_before_geocoding_tasks():
command="build",
select=" ".join(
[
# FIXME: handle odspep as other sources (add to dags/settings.py)
"path:models/staging/sources/odspep",
"path:models/intermediate/sources/odspep",
# FIXME: handle monenfant as other sources (add to dags/settings.py)
"path:models/staging/sources/monenfant",
"path:models/intermediate/sources/monenfant",
"path:models/intermediate/int__union_adresses.sql",
"path:models/intermediate/int__union_services.sql",
"path:models/intermediate/int__union_structures.sql",
]
),
trigger_rule=TriggerRule.ALL_DONE,
)


Expand Down
2 changes: 2 additions & 0 deletions pipeline/dags/dag_utils/sources/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,8 @@
},
},
},
"odspep": {},
"monenfant": {},
}


Expand Down
2 changes: 2 additions & 0 deletions pipeline/dags/import_sources.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,8 @@ def load_from_s3_to_data_warehouse(source_id, stream_id, run_id, logical_date):


for source_id, source_config in sources.SOURCES_CONFIGS.items():
if "streams" not in source_config:
continue
model_name = source_id.replace("-", "_")
dag_id = f"import_{model_name}"

Expand Down
2 changes: 1 addition & 1 deletion pipeline/dbt/dbt_project.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ models:
+materialized: view

intermediate:
+schema: intermediate
+schema: "{{ 'intermediate_tmp' if var('build_intermediate_tmp', false) else 'intermediate' }}"
+materialized: table

marts:
Expand Down

0 comments on commit 78a7747

Please sign in to comment.