diff --git a/cosmos/airflow/graph.py b/cosmos/airflow/graph.py index f507b03ac..6605bf20d 100644 --- a/cosmos/airflow/graph.py +++ b/cosmos/airflow/graph.py @@ -10,6 +10,7 @@ from cosmos.constants import ( DBT_COMPILE_TASK_ID, DEFAULT_DBT_RESOURCES, + SUPPORTED_BUILD_RESOURCES, TESTABLE_DBT_RESOURCES, DbtResourceType, ExecutionMode, @@ -128,6 +129,31 @@ def create_test_task_metadata( ) +def create_dbt_resource_to_class(test_behavior: TestBehavior) -> dict[str, str]: + """ + Return the map from dbt node type to Cosmos class prefix that should be used + to handle them. + """ + + if test_behavior == TestBehavior.BUILD: + dbt_resource_to_class = { + DbtResourceType.MODEL: "DbtBuild", + DbtResourceType.SNAPSHOT: "DbtBuild", + DbtResourceType.SEED: "DbtBuild", + DbtResourceType.TEST: "DbtTest", + DbtResourceType.SOURCE: "DbtSource", + } + else: + dbt_resource_to_class = { + DbtResourceType.MODEL: "DbtRun", + DbtResourceType.SNAPSHOT: "DbtSnapshot", + DbtResourceType.SEED: "DbtSeed", + DbtResourceType.TEST: "DbtTest", + DbtResourceType.SOURCE: "DbtSource", + } + return dbt_resource_to_class + + def create_task_metadata( node: DbtNode, execution_mode: ExecutionMode, @@ -135,6 +161,7 @@ def create_task_metadata( dbt_dag_task_group_identifier: str, use_task_group: bool = False, source_rendering_behavior: SourceRenderingBehavior = SourceRenderingBehavior.NONE, + test_behavior: TestBehavior = TestBehavior.AFTER_ALL, ) -> TaskMetadata | None: """ Create the metadata that will be used to instantiate the Airflow Task used to run the Dbt node. @@ -148,13 +175,8 @@ def create_task_metadata( If it is False, then use the name as a prefix for the task id, otherwise do not. :returns: The metadata necessary to instantiate the source dbt node as an Airflow task. 
""" - dbt_resource_to_class = { - DbtResourceType.MODEL: "DbtRun", - DbtResourceType.SNAPSHOT: "DbtSnapshot", - DbtResourceType.SEED: "DbtSeed", - DbtResourceType.TEST: "DbtTest", - DbtResourceType.SOURCE: "DbtSource", - } + dbt_resource_to_class = create_dbt_resource_to_class(test_behavior) + args = {**args, **{"models": node.resource_name}} if DbtResourceType(node.resource_type) in DEFAULT_DBT_RESOURCES and node.resource_type in dbt_resource_to_class: @@ -162,10 +184,13 @@ def create_task_metadata( "dbt_node_config": node.context_dict, "dbt_dag_task_group_identifier": dbt_dag_task_group_identifier, } - if node.resource_type == DbtResourceType.MODEL: - task_id = f"{node.name}_run" - if use_task_group is True: + if test_behavior == TestBehavior.BUILD and node.resource_type in SUPPORTED_BUILD_RESOURCES: + task_id = f"{node.name}_{node.resource_type.value}_build" + elif node.resource_type == DbtResourceType.MODEL: + if use_task_group: task_id = "run" + else: + task_id = f"{node.name}_run" elif node.resource_type == DbtResourceType.SOURCE: if (source_rendering_behavior == SourceRenderingBehavior.NONE) or ( source_rendering_behavior == SourceRenderingBehavior.WITH_TESTS_OR_FRESHNESS @@ -173,8 +198,6 @@ def create_task_metadata( and node.has_test is False ): return None - # TODO: https://github.com/astronomer/astronomer-cosmos - # pragma: no cover task_id = f"{node.name}_source" args["select"] = f"source:{node.resource_name}" args.pop("models") @@ -234,6 +257,7 @@ def generate_task_or_group( dbt_dag_task_group_identifier=_get_dbt_dag_task_group_identifier(dag, task_group), use_task_group=use_task_group, source_rendering_behavior=source_rendering_behavior, + test_behavior=test_behavior, ) # In most cases, we'll map one DBT node to one Airflow task diff --git a/cosmos/constants.py b/cosmos/constants.py index f42cfc4fc..b45170445 100644 --- a/cosmos/constants.py +++ b/cosmos/constants.py @@ -75,6 +75,7 @@ class TestBehavior(Enum): Behavior of the tests. 
""" + BUILD = "build" NONE = "none" AFTER_EACH = "after_each" AFTER_ALL = "after_all" @@ -144,6 +145,14 @@ def _missing_value_(cls, value): # type: ignore DEFAULT_DBT_RESOURCES = DbtResourceType.__members__.values() +# According to the dbt documentation (https://docs.getdbt.com/reference/commands/build), build also supports test nodes. +# However, in the context of Cosmos, we will run test nodes together with the respective models/seeds/snapshots nodes +SUPPORTED_BUILD_RESOURCES = [ + DbtResourceType.MODEL, + DbtResourceType.SNAPSHOT, + DbtResourceType.SEED, +] + # dbt test runs tests defined on models, sources, snapshots, and seeds. # It expects that you have already created those resources through the appropriate commands. # https://docs.getdbt.com/reference/commands/test diff --git a/dev/dags/example_cosmos_dbt_build.py b/dev/dags/example_cosmos_dbt_build.py new file mode 100644 index 000000000..163e66f03 --- /dev/null +++ b/dev/dags/example_cosmos_dbt_build.py @@ -0,0 +1,47 @@ +""" +An example Airflow DAG that illustrates using dbt build to run models/seeds/snapshots together with their respective tests. 
+""" + +import os +from datetime import datetime +from pathlib import Path + +from cosmos import DbtDag, ProfileConfig, ProjectConfig, RenderConfig +from cosmos.constants import TestBehavior +from cosmos.profiles import PostgresUserPasswordProfileMapping + +DEFAULT_DBT_ROOT_PATH = Path(__file__).parent / "dbt" +DBT_ROOT_PATH = Path(os.getenv("DBT_ROOT_PATH", DEFAULT_DBT_ROOT_PATH)) + +profile_config = ProfileConfig( + profile_name="default", + target_name="dev", + profile_mapping=PostgresUserPasswordProfileMapping( + conn_id="example_conn", + profile_args={"schema": "public"}, + disable_event_tracking=True, + ), +) + +# [START build_example] +example_cosmos_dbt_build = DbtDag( + # dbt/cosmos-specific parameters + project_config=ProjectConfig( + DBT_ROOT_PATH / "jaffle_shop", + ), + render_config=RenderConfig( + test_behavior=TestBehavior.BUILD, + ), + profile_config=profile_config, + operator_args={ + "install_deps": True, # install any necessary dependencies before running any dbt command + "full_refresh": True, # used only in dbt commands that support this flag + }, + # normal dag parameters + schedule_interval="@daily", + start_date=datetime(2023, 1, 1), + catchup=False, + dag_id="example_cosmos_dbt_build", + default_args={"retries": 2}, +) +# [END build_example] diff --git a/docs/_static/test_behavior_after_all.png b/docs/_static/test_behavior_after_all.png new file mode 100644 index 000000000..daee080ad Binary files /dev/null and b/docs/_static/test_behavior_after_all.png differ diff --git a/docs/_static/test_behavior_after_each.png b/docs/_static/test_behavior_after_each.png new file mode 100644 index 000000000..bdd770f0d Binary files /dev/null and b/docs/_static/test_behavior_after_each.png differ diff --git a/docs/_static/test_behavior_build.png b/docs/_static/test_behavior_build.png new file mode 100644 index 000000000..2d873ed5d Binary files /dev/null and b/docs/_static/test_behavior_build.png differ diff --git a/docs/configuration/testing-behavior.rst 
b/docs/configuration/testing-behavior.rst index 7af9ceedd..ea9806565 100644 --- a/docs/configuration/testing-behavior.rst +++ b/docs/configuration/testing-behavior.rst @@ -15,10 +15,16 @@ default behavior, which runs all models and tests, and then reports all failures Cosmos supports the following test behaviors: - ``after_each`` (default): turns each model into a task group with two steps: run the model, and run the tests +- ``build``: runs each dbt resource together with its tests in a single task, using the ``dbt build`` command. This applies to dbt models, seeds and snapshots. - ``after_all``: each model becomes a single task, and the tests only run if all models are run successfully - ``none``: don't include tests -Example: +Example of the standard behavior of ``TestBehavior.AFTER_EACH``, +when using the example DAG available in ``dev/dags/basic_cosmos_dag.py``: + +.. image:: ../_static/test_behavior_after_each.png + +Example when changing the behavior to use ``TestBehavior.AFTER_ALL``: .. code-block:: python @@ -31,6 +37,17 @@ Example: ) ) +.. image:: ../_static/test_behavior_after_all.png + + +Finally, an example DAG and how it is rendered in the Airflow UI when using ``TestBehavior.BUILD``: + +.. literalinclude:: ../../dev/dags/example_cosmos_dbt_build.py + :language: python + :start-after: [START build_example] + :end-before: [END build_example] + +.. 
image:: ../_static/test_behavior_build.png Warning Behavior ---------------- diff --git a/tests/airflow/test_graph.py b/tests/airflow/test_graph.py index 1bd8cab35..d2f943bab 100644 --- a/tests/airflow/test_graph.py +++ b/tests/airflow/test_graph.py @@ -227,6 +227,49 @@ def test_build_airflow_graph_with_after_all(): assert dag.leaves[0].select == ["tag:some"] +@pytest.mark.skipif( + version.parse(airflow_version) < version.parse("2.4"), + reason="Airflow DAG did not have task_group_dict until the 2.4 release", +) +@pytest.mark.integration +def test_build_airflow_graph_with_build(): + with DAG("test-id", start_date=datetime(2022, 1, 1)) as dag: + task_args = { + "project_dir": SAMPLE_PROJ_PATH, + "conn_id": "fake_conn", + "profile_config": ProfileConfig( + profile_name="default", + target_name="default", + profile_mapping=PostgresUserPasswordProfileMapping( + conn_id="fake_conn", + profile_args={"schema": "public"}, + ), + ), + } + render_config = RenderConfig( + test_behavior=TestBehavior.BUILD, + ) + build_airflow_graph( + nodes=sample_nodes, + dag=dag, + execution_mode=ExecutionMode.LOCAL, + test_indirect_selection=TestIndirectSelection.EAGER, + task_args=task_args, + dbt_project_name="astro_shop", + render_config=render_config, + ) + topological_sort = [task.task_id for task in dag.topological_sort()] + expected_sort = ["seed_parent_seed_build", "parent_model_build", "child_model_build", "child2_v2_model_build"] + assert topological_sort == expected_sort + + task_groups = dag.task_group_dict + assert len(task_groups) == 0 + + assert len(dag.leaves) == 2 + assert dag.leaves[0].task_id in ("child_model_build", "child2_v2_model_build") + assert dag.leaves[1].task_id in ("child_model_build", "child2_v2_model_build") + + @pytest.mark.integration @patch("airflow.hooks.base.BaseHook.get_connection", new=MagicMock()) def test_build_airflow_graph_with_dbt_compile_task():