Skip to content

Commit

Permalink
MicrobatchBuilder.build_batch_context + tests
Browse files Browse the repository at this point in the history
  • Loading branch information
MichelleArk committed Oct 28, 2024
1 parent e1aa98e commit 072fe44
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 17 deletions.
22 changes: 21 additions & 1 deletion core/dbt/materializations/incremental/microbatch.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from datetime import datetime, timedelta
from typing import List, Optional
from typing import Any, Dict, List, Optional

import pytz

Expand Down Expand Up @@ -99,6 +99,26 @@ def build_batches(self, start: datetime, end: datetime) -> List[BatchType]:

return batches

def build_batch_context(self, incremental_batch: bool) -> Dict[str, Any]:
    """
    Build the context entries describing the microbatch model and its incremental state.

    The returned dict always holds the model's dict form under "model" and its compiled
    code under both "sql" and "compiled_code". Assumes self.model has already been
    (re)-compiled with the necessary batch filters applied.

    When incremental_batch is true, "is_incremental" and "should_full_refresh" callables
    (returning True and False respectively) are included as well; otherwise those keys
    are left out of the result.
    """
    model = self.model

    # Entries that are present for every batch
    context_entries: Dict[str, Any] = {
        "model": model.to_dict(),
        "sql": model.compiled_code,
        "compiled_code": model.compiled_code,
    }

    # Non-incremental batches get no incremental overrides at all
    if not incremental_batch:
        return context_entries

    context_entries["is_incremental"] = lambda: True
    context_entries["should_full_refresh"] = lambda: False
    return context_entries

@staticmethod
def offset_timestamp(timestamp: datetime, batch_size: BatchSize, offset: int) -> datetime:
"""Truncates the passed in timestamp based on the batch_size and then applies the offset by the batch_size.
Expand Down
35 changes: 19 additions & 16 deletions core/dbt/task/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -489,28 +489,29 @@ def _execute_microbatch_materialization(
materialization_macro: MacroProtocol,
) -> List[RunResult]:
batch_results: List[RunResult] = []
microbatch_builder = MicrobatchBuilder(
model=model,
is_incremental=self._is_incremental(model),
event_time_start=getattr(self.config.args, "EVENT_TIME_START", None),
event_time_end=getattr(self.config.args, "EVENT_TIME_END", None),
default_end_time=self.config.invoked_at,
)
# Indicates whether current batch should be run incrementally
incremental_batch = False

# Note currently (9/30/2024) model.batch_info is only ever _not_ `None`
# IFF `dbt retry` is being run and the microbatch model had batches which
# failed on the run of the model (which is being retried)
if model.batch_info is None:
microbatch_builder = MicrobatchBuilder(
model=model,
is_incremental=self._is_incremental(model),
event_time_start=getattr(self.config.args, "EVENT_TIME_START", None),
event_time_end=getattr(self.config.args, "EVENT_TIME_END", None),
default_end_time=self.config.invoked_at,
)
end = microbatch_builder.build_end_time()
start = microbatch_builder.build_start_time(end)
batches = microbatch_builder.build_batches(start, end)
else:
batches = model.batch_info.failed
# if there is batch info, then don't run as full_refresh and do force is_incremental
# If there is batch info, then don't run as full_refresh and do force is_incremental
# not doing this risks blowing away the work that has already been done
if self._has_relation(model=model):
context["is_incremental"] = lambda: True
context["should_full_refresh"] = lambda: False
incremental_batch = True

# iterate over each batch, calling materialization_macro to get a batch-level run result
for batch_idx, batch in enumerate(batches):
Expand All @@ -532,9 +533,11 @@ def _execute_microbatch_materialization(
batch[0], model.config.batch_size
),
)
context["model"] = model.to_dict()
context["sql"] = model.compiled_code
context["compiled_code"] = model.compiled_code
# Update jinja context with batch context members
batch_context = microbatch_builder.build_batch_context(
incremental_batch=incremental_batch
)
context.update(batch_context)

# Materialize batch and cache any materialized relations
result = MacroGenerator(
Expand All @@ -547,9 +550,9 @@ def _execute_microbatch_materialization(
batch_run_result = self._build_succesful_run_batch_result(
model, context, batch, time.perf_counter() - start_time
)
# Update context vars for future batches
context["is_incremental"] = lambda: True
context["should_full_refresh"] = lambda: False
# At least one batch has been inserted successfully!
incremental_batch = True

except Exception as e:
exception = e
batch_run_result = self._build_failed_run_batch_result(
Expand Down
27 changes: 27 additions & 0 deletions tests/unit/materializations/incremental/test_microbatch.py
Original file line number Diff line number Diff line change
Expand Up @@ -488,6 +488,33 @@ def test_build_batches(self, microbatch_model, start, end, batch_size, expected_
assert len(actual_batches) == len(expected_batches)
assert actual_batches == expected_batches

def test_build_batch_context_incremental_batch(self, microbatch_model):
    """An incremental batch context has the model properties plus the incremental overrides."""
    builder = MicrobatchBuilder(
        model=microbatch_model, is_incremental=True, event_time_start=None, event_time_end=None
    )
    batch_context = builder.build_batch_context(incremental_batch=True)

    # Microbatch model properties
    assert batch_context["model"] == microbatch_model.to_dict()
    for code_key in ("sql", "compiled_code"):
        assert batch_context[code_key] == microbatch_model.compiled_code

    # Incremental overrides are callables, mirroring how they are stored in the context
    is_incremental = batch_context["is_incremental"]
    should_full_refresh = batch_context["should_full_refresh"]
    assert is_incremental() is True
    assert should_full_refresh() is False

def test_build_batch_context_incremental_batch_false(self, microbatch_model):
    """A non-incremental batch context has the model properties and no incremental overrides."""
    builder = MicrobatchBuilder(
        model=microbatch_model, is_incremental=True, event_time_start=None, event_time_end=None
    )
    batch_context = builder.build_batch_context(incremental_batch=False)

    # Microbatch model properties
    assert batch_context["model"] == microbatch_model.to_dict()
    for code_key in ("sql", "compiled_code"):
        assert batch_context[code_key] == microbatch_model.compiled_code

    # The incremental callables are only built when incremental_batch is true
    assert "is_incremental" not in batch_context
    assert "should_full_refresh" not in batch_context

@pytest.mark.parametrize(
"timestamp,batch_size,offset,expected_timestamp",
[
Expand Down

0 comments on commit 072fe44

Please sign in to comment.