diff --git a/.changes/unreleased/Features-20240926-153210.yaml b/.changes/unreleased/Features-20240926-153210.yaml new file mode 100644 index 00000000000..8f8919b918f --- /dev/null +++ b/.changes/unreleased/Features-20240926-153210.yaml @@ -0,0 +1,6 @@ +kind: Features +body: Ensure microbatch models respect `full_refresh` model config +time: 2024-09-26T15:32:10.202789-05:00 +custom: + Author: QMalcolm + Issue: "10785" diff --git a/core/dbt/task/run.py b/core/dbt/task/run.py index 80c06dffcce..ac8298cc9d9 100644 --- a/core/dbt/task/run.py +++ b/core/dbt/task/run.py @@ -555,12 +555,17 @@ def _is_incremental(self, model) -> bool: relation = self.adapter.get_relation( relation_info.database, relation_info.schema, relation_info.name ) - return ( + if ( relation is not None and relation.type == "table" and model.config.materialized == "incremental" - and not (getattr(self.config.args, "FULL_REFRESH", False) or model.config.full_refresh) - ) + ): + if model.config.full_refresh is not None: + return not model.config.full_refresh + else: + return not getattr(self.config.args, "FULL_REFRESH", False) + else: + return False class RunTask(CompileTask): diff --git a/tests/functional/microbatch/test_microbatch.py b/tests/functional/microbatch/test_microbatch.py index 3962fab02d6..c233657f180 100644 --- a/tests/functional/microbatch/test_microbatch.py +++ b/tests/functional/microbatch/test_microbatch.py @@ -93,6 +93,11 @@ SELECT * FROM {{ ref('microbatch_model') }} """ +microbatch_model_full_refresh_false_sql = """ +{{ config(materialized='incremental', incremental_strategy='microbatch', unique_key='id', event_time='event_time', batch_size='day', begin=modules.datetime.datetime(2020, 1, 1, 0, 0, 0), full_refresh=False) }} +select * from {{ ref('input_model') }} +""" + class BaseMicrobatchCustomUserStrategy: @pytest.fixture(scope="class") @@ -578,3 +583,39 @@ def test_run_with_event_time(self, project): "microbatch_model", "microbatch_model_2020-01-03.sql", ) + + +class TestMicrobatchFullRefreshConfigFalse(BaseMicrobatchTest): + @pytest.fixture(scope="class") + def models(self): + return { + "input_model.sql": input_model_sql, + "microbatch_model.sql": microbatch_model_full_refresh_false_sql, + "downstream_model.sql": downstream_model_of_microbatch_sql, + } + + @mock.patch.dict(os.environ, {"DBT_EXPERIMENTAL_MICROBATCH": "True"}) + def test_run_with_event_time(self, project): + # run all partitions from 2020-01-02 to spoofed "now" - 2 expected rows in output + with patch_microbatch_end_time("2020-01-03 13:57:00"): + run_dbt(["run", "--event-time-start", "2020-01-02"]) + self.assert_row_count(project, "microbatch_model", 2) + + # re-running shouldn't change what it's in the data set because there is nothing new + with patch_microbatch_end_time("2020-01-03 13:57:00"): + run_dbt(["run"]) + self.assert_row_count(project, "microbatch_model", 2) + + # running with --full-refresh shouldn't pick up 2020-01-01 BECAUSE the model has + # full_refresh = false + with patch_microbatch_end_time("2020-01-03 13:57:00"): + run_dbt(["run", "--full-refresh"]) + self.assert_row_count(project, "microbatch_model", 2) + + # update the microbatch model to no longer have full_refresh=False config + write_file(microbatch_model_sql, project.project_root, "models", "microbatch_model.sql") + + # running with full refresh should now pick up the 2020-01-01 data + with patch_microbatch_end_time("2020-01-03 13:57:00"): + run_dbt(["run", "--full-refresh"]) + self.assert_row_count(project, "microbatch_model", 3) diff --git a/tests/unit/task/test_run.py b/tests/unit/task/test_run.py index f0d768bcd9e..70de7aa76ca 100644 --- a/tests/unit/task/test_run.py +++ b/tests/unit/task/test_run.py @@ -1,11 +1,15 @@ import threading from argparse import Namespace +from dataclasses import dataclass from datetime import datetime, timedelta +from typing import Optional from unittest.mock import MagicMock, patch import pytest +from pytest_mock import MockerFixture from dbt.adapters.postgres import PostgresAdapter +from dbt.artifacts.resources.v1.model import ModelConfig from dbt.artifacts.schemas.batch_results import BatchResults from dbt.artifacts.schemas.results import RunStatus from dbt.artifacts.schemas.run import RunResult @@ -174,3 +178,64 @@ def test__build_run_microbatch_model_result( assert expect_success.status == RunStatus.Success assert expect_error.status == RunStatus.Error assert expect_partial_success.status == RunStatus.PartialSuccess + + @pytest.mark.parametrize( + "has_relation,relation_type,materialized,full_refresh_config,full_refresh_flag,expectation", + [ + (False, "table", "incremental", None, False, False), + (True, "other", "incremental", None, False, False), + (True, "table", "other", None, False, False), + # model config takes precendence + (True, "table", "incremental", True, False, False), + # model config takes precendence + (True, "table", "incremental", True, True, False), + # model config takes precendence + (True, "table", "incremental", False, False, True), + # model config takes precendence + (True, "table", "incremental", False, True, True), + # model config is none, so opposite flag value + (True, "table", "incremental", None, True, False), + # model config is none, so opposite flag value + (True, "table", "incremental", None, False, True), + ], + ) + def test__is_incremental( + self, + mocker: MockerFixture, + model_runner: ModelRunner, + has_relation: bool, + relation_type: str, + materialized: str, + full_refresh_config: Optional[bool], + full_refresh_flag: bool, + expectation: bool, + ) -> None: + + # Setup adapter relation getting + @dataclass + class RelationInfo: + database: str = "database" + schema: str = "schema" + name: str = "name" + + @dataclass + class Relation: + type: str + + model_runner.adapter = mocker.Mock() + model_runner.adapter.Relation.create_from.return_value = RelationInfo() + + if has_relation: + model_runner.adapter.get_relation.return_value = Relation(type=relation_type) + else: + model_runner.adapter.get_relation.return_value = None + + # Set ModelRunner configs + model_runner.config.args = Namespace(FULL_REFRESH=full_refresh_flag) + + # Create model with configs + model = model_runner.node + model.config = ModelConfig(materialized=materialized, full_refresh=full_refresh_config) + + # Assert result of _is_incremental + assert model_runner._is_incremental(model) == expectation