diff --git a/core/dbt/materializations/incremental/microbatch.py b/core/dbt/materializations/incremental/microbatch.py index da8930acb89..690083f3c3e 100644 --- a/core/dbt/materializations/incremental/microbatch.py +++ b/core/dbt/materializations/incremental/microbatch.py @@ -1,5 +1,5 @@ from datetime import datetime, timedelta -from typing import List, Optional +from typing import Any, Dict, List, Optional import pytz @@ -99,6 +99,26 @@ def build_batches(self, start: datetime, end: datetime) -> List[BatchType]: return batches + def build_batch_context(self, incremental_batch: bool) -> Dict[str, Any]: + """ + Create context with entries that reflect microbatch model + incremental execution state + + Assumes self.model has been (re)-compiled with necessary batch filters applied. + """ + batch_context: Dict[str, Any] = {} + + # Microbatch model properties + batch_context["model"] = self.model.to_dict() + batch_context["sql"] = self.model.compiled_code + batch_context["compiled_code"] = self.model.compiled_code + + # Add incremental context variables for batches running incrementally + if incremental_batch: + batch_context["is_incremental"] = lambda: True + batch_context["should_full_refresh"] = lambda: False + + return batch_context + @staticmethod def offset_timestamp(timestamp: datetime, batch_size: BatchSize, offset: int) -> datetime: """Truncates the passed in timestamp based on the batch_size and then applies the offset by the batch_size. diff --git a/core/dbt/task/run.py b/core/dbt/task/run.py index 416a3daf854..0700880c617 100644 --- a/core/dbt/task/run.py +++ b/core/dbt/task/run.py @@ -489,28 +489,29 @@ def _execute_microbatch_materialization( materialization_macro: MacroProtocol, ) -> List[RunResult]: batch_results: List[RunResult] = [] + microbatch_builder = MicrobatchBuilder( + model=model, + is_incremental=self._is_incremental(model), + event_time_start=getattr(self.config.args, "EVENT_TIME_START", None), + event_time_end=getattr(self.config.args, "EVENT_TIME_END", None), + default_end_time=self.config.invoked_at, + ) + # Indicates whether current batch should be run incrementally + incremental_batch = False # Note currently (9/30/2024) model.batch_info is only ever _not_ `None` # IFF `dbt retry` is being run and the microbatch model had batches which # failed on the run of the model (which is being retried) if model.batch_info is None: - microbatch_builder = MicrobatchBuilder( - model=model, - is_incremental=self._is_incremental(model), - event_time_start=getattr(self.config.args, "EVENT_TIME_START", None), - event_time_end=getattr(self.config.args, "EVENT_TIME_END", None), - default_end_time=self.config.invoked_at, - ) end = microbatch_builder.build_end_time() start = microbatch_builder.build_start_time(end) batches = microbatch_builder.build_batches(start, end) else: batches = model.batch_info.failed - # if there is batch info, then don't run as full_refresh and do force is_incremental + # If there is batch info, then don't run as full_refresh and do force is_incremental # not doing this risks blowing away the work that has already been done if self._has_relation(model=model): - context["is_incremental"] = lambda: True - context["should_full_refresh"] = lambda: False + incremental_batch = True # iterate over each batch, calling materialization_macro to get a batch-level run result for batch_idx, batch in enumerate(batches): @@ -532,9 +533,11 @@ def _execute_microbatch_materialization( batch[0], model.config.batch_size ), ) - context["model"] = model.to_dict() - context["sql"] = model.compiled_code - context["compiled_code"] = model.compiled_code + # Update jinja context with batch context members + batch_context = microbatch_builder.build_batch_context( + incremental_batch=incremental_batch + ) + context.update(batch_context) # Materialize batch and cache any materialized relations result = MacroGenerator( @@ -547,9 +550,9 @@ def _execute_microbatch_materialization( batch_run_result = self._build_succesful_run_batch_result( model, context, batch, time.perf_counter() - start_time ) - # Update context vars for future batches - context["is_incremental"] = lambda: True - context["should_full_refresh"] = lambda: False + # At least one batch has been inserted successfully! + incremental_batch = True + except Exception as e: exception = e batch_run_result = self._build_failed_run_batch_result( diff --git a/tests/unit/materializations/incremental/test_microbatch.py b/tests/unit/materializations/incremental/test_microbatch.py index 5c368f535a9..8581e074ee7 100644 --- a/tests/unit/materializations/incremental/test_microbatch.py +++ b/tests/unit/materializations/incremental/test_microbatch.py @@ -488,6 +488,33 @@ def test_build_batches(self, microbatch_model, start, end, batch_size, expected_ assert len(actual_batches) == len(expected_batches) assert actual_batches == expected_batches + def test_build_batch_context_incremental_batch(self, microbatch_model): + microbatch_builder = MicrobatchBuilder( + model=microbatch_model, is_incremental=True, event_time_start=None, event_time_end=None + ) + context = microbatch_builder.build_batch_context(incremental_batch=True) + + assert context["model"] == microbatch_model.to_dict() + assert context["sql"] == microbatch_model.compiled_code + assert context["compiled_code"] == microbatch_model.compiled_code + + assert context["is_incremental"]() is True + assert context["should_full_refresh"]() is False + + def test_build_batch_context_incremental_batch_false(self, microbatch_model): + microbatch_builder = MicrobatchBuilder( + model=microbatch_model, is_incremental=True, event_time_start=None, event_time_end=None + ) + context = microbatch_builder.build_batch_context(incremental_batch=False) + + assert context["model"] == microbatch_model.to_dict() + assert context["sql"] == microbatch_model.compiled_code + assert context["compiled_code"] == microbatch_model.compiled_code + + # Only build is_incremental callables when not first batch + assert "is_incremental" not in context + assert "should_full_refresh" not in context + @pytest.mark.parametrize( "timestamp,batch_size,offset,expected_timestamp", [