Skip to content

Commit

Permalink
[CT-10785] Microbatch models should respect full_refresh model conf…
Browse files Browse the repository at this point in the history
…ig (#10788)

* Add tests to check how microbatch models respect `full_refresh` model configs

* Fix `_is_incremental` to properly respect `full_refresh` model config

In dbt-core, it is generally expected that values passed via CLI flags take
precedence over model level configs. However, `full_refresh` on a model is an
exception to this rule, where in the model config takes precedence. This
config exists specifically to _prevent_ accidental full refreshes of large
incremental models, as doing so can be costly. **_It is actually best
practice_** to set `full_refresh=False` on incremental models.

Prior to this commit, for microbatch models, the above was not happening. The
CLI flag `--full-refresh` was taking precedence over the model config
`full_refresh`. That meant that if `--full-refresh` was supplied, then the
microbatch model **_would full refresh_** even if `full_refresh=False` was
set on the model. This commit solves that problem.

* Add changie doc for microbatch `full_refresh` config handling
  • Loading branch information
QMalcolm authored Sep 26, 2024
1 parent 1076352 commit d8b1bf5
Show file tree
Hide file tree
Showing 4 changed files with 120 additions and 3 deletions.
6 changes: 6 additions & 0 deletions .changes/unreleased/Features-20240926-153210.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Features
body: Ensure microbatch models respect `full_refresh` model config
time: 2024-09-26T15:32:10.202789-05:00
custom:
Author: QMalcolm
Issue: "10785"
11 changes: 8 additions & 3 deletions core/dbt/task/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -555,12 +555,17 @@ def _is_incremental(self, model) -> bool:
relation = self.adapter.get_relation(
relation_info.database, relation_info.schema, relation_info.name
)
return (
if (
relation is not None
and relation.type == "table"
and model.config.materialized == "incremental"
and not (getattr(self.config.args, "FULL_REFRESH", False) or model.config.full_refresh)
)
):
if model.config.full_refresh is not None:
return not model.config.full_refresh
else:
return not getattr(self.config.args, "FULL_REFRESH", False)
else:
return False


class RunTask(CompileTask):
Expand Down
41 changes: 41 additions & 0 deletions tests/functional/microbatch/test_microbatch.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,11 @@
SELECT * FROM {{ ref('microbatch_model') }}
"""

microbatch_model_full_refresh_false_sql = """
{{ config(materialized='incremental', incremental_strategy='microbatch', unique_key='id', event_time='event_time', batch_size='day', begin=modules.datetime.datetime(2020, 1, 1, 0, 0, 0), full_refresh=False) }}
select * from {{ ref('input_model') }}
"""


class BaseMicrobatchCustomUserStrategy:
@pytest.fixture(scope="class")
Expand Down Expand Up @@ -578,3 +583,39 @@ def test_run_with_event_time(self, project):
"microbatch_model",
"microbatch_model_2020-01-03.sql",
)


class TestMicrobatchFullRefreshConfigFalse(BaseMicrobatchTest):
@pytest.fixture(scope="class")
def models(self):
return {
"input_model.sql": input_model_sql,
"microbatch_model.sql": microbatch_model_full_refresh_false_sql,
"downstream_model.sql": downstream_model_of_microbatch_sql,
}

@mock.patch.dict(os.environ, {"DBT_EXPERIMENTAL_MICROBATCH": "True"})
def test_run_with_event_time(self, project):
# run all partitions from 2020-01-02 to spoofed "now" - 2 expected rows in output
with patch_microbatch_end_time("2020-01-03 13:57:00"):
run_dbt(["run", "--event-time-start", "2020-01-02"])
self.assert_row_count(project, "microbatch_model", 2)

# re-running shouldn't change what it's in the data set because there is nothing new
with patch_microbatch_end_time("2020-01-03 13:57:00"):
run_dbt(["run"])
self.assert_row_count(project, "microbatch_model", 2)

# running with --full-refresh shouldn't pick up 2020-01-01 BECAUSE the model has
# full_refresh = false
with patch_microbatch_end_time("2020-01-03 13:57:00"):
run_dbt(["run", "--full-refresh"])
self.assert_row_count(project, "microbatch_model", 2)

# update the microbatch model to no longer have full_refresh=False config
write_file(microbatch_model_sql, project.project_root, "models", "microbatch_model.sql")

# running with full refresh should now pick up the 2020-01-01 data
with patch_microbatch_end_time("2020-01-03 13:57:00"):
run_dbt(["run", "--full-refresh"])
self.assert_row_count(project, "microbatch_model", 3)
65 changes: 65 additions & 0 deletions tests/unit/task/test_run.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
import threading
from argparse import Namespace
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Optional
from unittest.mock import MagicMock, patch

import pytest
from pytest_mock import MockerFixture

from dbt.adapters.postgres import PostgresAdapter
from dbt.artifacts.resources.v1.model import ModelConfig
from dbt.artifacts.schemas.batch_results import BatchResults
from dbt.artifacts.schemas.results import RunStatus
from dbt.artifacts.schemas.run import RunResult
Expand Down Expand Up @@ -174,3 +178,64 @@ def test__build_run_microbatch_model_result(
assert expect_success.status == RunStatus.Success
assert expect_error.status == RunStatus.Error
assert expect_partial_success.status == RunStatus.PartialSuccess

@pytest.mark.parametrize(
"has_relation,relation_type,materialized,full_refresh_config,full_refresh_flag,expectation",
[
(False, "table", "incremental", None, False, False),
(True, "other", "incremental", None, False, False),
(True, "table", "other", None, False, False),
# model config takes precendence
(True, "table", "incremental", True, False, False),
# model config takes precendence
(True, "table", "incremental", True, True, False),
# model config takes precendence
(True, "table", "incremental", False, False, True),
# model config takes precendence
(True, "table", "incremental", False, True, True),
# model config is none, so opposite flag value
(True, "table", "incremental", None, True, False),
# model config is none, so opposite flag value
(True, "table", "incremental", None, False, True),
],
)
def test__is_incremental(
self,
mocker: MockerFixture,
model_runner: ModelRunner,
has_relation: bool,
relation_type: str,
materialized: str,
full_refresh_config: Optional[bool],
full_refresh_flag: bool,
expectation: bool,
) -> None:

# Setup adapter relation getting
@dataclass
class RelationInfo:
database: str = "database"
schema: str = "schema"
name: str = "name"

@dataclass
class Relation:
type: str

model_runner.adapter = mocker.Mock()
model_runner.adapter.Relation.create_from.return_value = RelationInfo()

if has_relation:
model_runner.adapter.get_relation.return_value = Relation(type=relation_type)
else:
model_runner.adapter.get_relation.return_value = None

# Set ModelRunner configs
model_runner.config.args = Namespace(FULL_REFRESH=full_refresh_flag)

# Create model with configs
model = model_runner.node
model.config = ModelConfig(materialized=materialized, full_refresh=full_refresh_config)

# Assert result of _is_incremental
assert model_runner._is_incremental(model) == expectation

0 comments on commit d8b1bf5

Please sign in to comment.