From 3158dc43494fc6f1d8c8c52c17b3241830883a10 Mon Sep 17 00:00:00 2001 From: Quigley Malcolm Date: Wed, 20 Nov 2024 23:27:48 -0600 Subject: [PATCH 01/17] Add `batch_id` to jinja context of microbatch batches --- core/dbt/materializations/incremental/microbatch.py | 7 ++++++- core/dbt/task/run.py | 3 ++- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/core/dbt/materializations/incremental/microbatch.py b/core/dbt/materializations/incremental/microbatch.py index b89c834d4a2..2cdaf71c138 100644 --- a/core/dbt/materializations/incremental/microbatch.py +++ b/core/dbt/materializations/incremental/microbatch.py @@ -100,7 +100,7 @@ def build_batches(self, start: datetime, end: datetime) -> List[BatchType]: return batches - def build_batch_context(self, incremental_batch: bool) -> Dict[str, Any]: + def build_batch_context(self, incremental_batch: bool, start_time: datetime) -> Dict[str, Any]: """ Create context with entries that reflect microbatch model + incremental execution state @@ -112,6 +112,7 @@ def build_batch_context(self, incremental_batch: bool) -> Dict[str, Any]: batch_context["model"] = self.model.to_dict() batch_context["sql"] = self.model.compiled_code batch_context["compiled_code"] = self.model.compiled_code + batch_context["batch_id"] = self.batch_id(start_time=start_time) # Add incremental context variables for batches running incrementally if incremental_batch: @@ -192,6 +193,10 @@ def truncate_timestamp(timestamp: datetime, batch_size: BatchSize) -> datetime: return truncated + @staticmethod + def batch_id(start_time: datetime) -> str: + return start_time.strftime("%Y%M%d%H") + @staticmethod def format_batch_start( batch_start: Optional[datetime], batch_size: BatchSize diff --git a/core/dbt/task/run.py b/core/dbt/task/run.py index e52dd8d0abd..de36cddacb2 100644 --- a/core/dbt/task/run.py +++ b/core/dbt/task/run.py @@ -545,7 +545,8 @@ def _execute_microbatch_materialization( ) # Update jinja context with batch context members batch_context = microbatch_builder.build_batch_context( - incremental_batch=self.relation_exists + incremental_batch=self.relation_exists, + start_time=batch[0], ) context.update(batch_context) From 749ce26c5f6bde4021e103669b58e7c5e1f8132b Mon Sep 17 00:00:00 2001 From: Quigley Malcolm Date: Thu, 21 Nov 2024 12:57:00 -0600 Subject: [PATCH 02/17] Add changie doc --- .changes/unreleased/Features-20241121-125630.yaml | 6 ++++++ 1 file changed, 6 insertions(+) create mode 100644 .changes/unreleased/Features-20241121-125630.yaml diff --git a/.changes/unreleased/Features-20241121-125630.yaml b/.changes/unreleased/Features-20241121-125630.yaml new file mode 100644 index 00000000000..563c58baeef --- /dev/null +++ b/.changes/unreleased/Features-20241121-125630.yaml @@ -0,0 +1,6 @@ +kind: Features +body: Add `batch_id` to model context +time: 2024-11-21T12:56:30.715473-06:00 +custom: + Author: QMalcolm + Issue: "11025" From 4ad005df83c2b9c1940daddfdef25e7963f319c3 Mon Sep 17 00:00:00 2001 From: Quigley Malcolm Date: Tue, 26 Nov 2024 15:58:56 -0600 Subject: [PATCH 03/17] Update `format_batch_start` to assume `batch_start` is always provided --- core/dbt/materializations/incremental/microbatch.py | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/core/dbt/materializations/incremental/microbatch.py b/core/dbt/materializations/incremental/microbatch.py index 2cdaf71c138..c596ae945e6 100644 --- a/core/dbt/materializations/incremental/microbatch.py +++ b/core/dbt/materializations/incremental/microbatch.py @@ -198,12 +198,7 @@ def batch_id(start_time: datetime) -> str: return start_time.strftime("%Y%M%d%H") @staticmethod - def format_batch_start( - batch_start: Optional[datetime], batch_size: BatchSize - ) -> Optional[str]: - if batch_start is None: - return batch_start - + def format_batch_start(batch_start: datetime, batch_size: BatchSize) -> str: return str( batch_start.date() if (batch_start and batch_size != BatchSize.hour) else batch_start ) From 30026f8ef60dc179ffdab90ece0e773f4af73670 Mon Sep 17 00:00:00 2001 From: Quigley Malcolm Date: Tue, 26 Nov 2024 16:05:12 -0600 Subject: [PATCH 04/17] Add "runtime only" property `batch_context` to `ModelNode` By it being "runtime only" we mean that it doesn't exist on the artifact and thus won't be written out to the manifest artifact. --- core/dbt/contracts/graph/nodes.py | 2 ++ core/dbt/materializations/incremental/microbatch.py | 8 ++++++++ 2 files changed, 10 insertions(+) diff --git a/core/dbt/contracts/graph/nodes.py b/core/dbt/contracts/graph/nodes.py index 0eaf758ae5a..9ec298eef18 100644 --- a/core/dbt/contracts/graph/nodes.py +++ b/core/dbt/contracts/graph/nodes.py @@ -81,6 +81,7 @@ ) from dbt.exceptions import ContractBreakingChangeError, ParsingError, ValidationError from dbt.flags import get_flags +from dbt.materializations.incremental.microbatch import BatchContext from dbt.node_types import ( REFABLE_NODE_TYPES, VERSIONED_NODE_TYPES, @@ -445,6 +446,7 @@ def resource_class(cls) -> Type[HookNodeResource]: @dataclass class ModelNode(ModelResource, CompiledNode): previous_batch_results: Optional[BatchResults] = None + batch_context: Optional[BatchContext] = None _has_this: Optional[bool] = None def __post_serialize__(self, dct: Dict, context: Optional[Dict] = None): diff --git a/core/dbt/materializations/incremental/microbatch.py b/core/dbt/materializations/incremental/microbatch.py index c596ae945e6..3043a6e233d 100644 --- a/core/dbt/materializations/incremental/microbatch.py +++ b/core/dbt/materializations/incremental/microbatch.py @@ -2,6 +2,7 @@ from typing import Any, Dict, List, Optional import pytz +from attr import dataclass from dbt.artifacts.resources.types import BatchSize from dbt.artifacts.schemas.batch_results import BatchType @@ -9,6 +10,13 @@ from dbt.exceptions import DbtInternalError, DbtRuntimeError +@dataclass +class BatchContext: + id: str + event_time_start: datetime + event_time_end: datetime + + class MicrobatchBuilder: """A utility class for building microbatch definitions associated with a specific model""" From e4d04a0b784d906cc19a6820e043c1a35695415e Mon Sep 17 00:00:00 2001 From: Quigley Malcolm Date: Tue, 26 Nov 2024 16:06:27 -0600 Subject: [PATCH 05/17] Begin populating `batch_context` during materialization execution for microbatch batches --- .../dbt/materializations/incremental/microbatch.py | 5 ++--- core/dbt/task/run.py | 14 ++++++++++---- 2 files changed, 12 insertions(+), 7 deletions(-) diff --git a/core/dbt/materializations/incremental/microbatch.py b/core/dbt/materializations/incremental/microbatch.py index 3043a6e233d..774a5ee71c3 100644 --- a/core/dbt/materializations/incremental/microbatch.py +++ b/core/dbt/materializations/incremental/microbatch.py @@ -120,7 +120,6 @@ def build_batch_context(self, incremental_batch: bool, start_time: datetime) -> batch_context["model"] = self.model.to_dict() batch_context["sql"] = self.model.compiled_code batch_context["compiled_code"] = self.model.compiled_code - batch_context["batch_id"] = self.batch_id(start_time=start_time) # Add incremental context variables for batches running incrementally if incremental_batch: @@ -202,8 +201,8 @@ def truncate_timestamp(timestamp: datetime, batch_size: BatchSize) -> datetime: return truncated @staticmethod - def batch_id(start_time: datetime) -> str: - return start_time.strftime("%Y%M%d%H") + def batch_id(start_time: datetime, batch_size: BatchSize) -> str: + return MicrobatchBuilder.format_batch_start(start_time, batch_size).replace("_", "") @staticmethod def format_batch_start(batch_start: datetime, batch_size: BatchSize) -> str: diff --git a/core/dbt/task/run.py b/core/dbt/task/run.py index de36cddacb2..34badf91b40 100644 --- a/core/dbt/task/run.py +++ b/core/dbt/task/run.py @@ -39,7 +39,7 @@ from dbt.exceptions import CompilationError, DbtInternalError, DbtRuntimeError from dbt.graph import ResourceTypeSelector from dbt.hooks import get_hook_dict -from dbt.materializations.incremental.microbatch import MicrobatchBuilder +from dbt.materializations.incremental.microbatch import BatchContext, MicrobatchBuilder from dbt.node_types import NodeType, RunHookType from dbt.task import group_lookup from dbt.task.base import BaseRunner @@ -353,7 +353,7 @@ def set_batches(self, batches: Dict[int, BatchType]) -> None: def describe_node(self) -> str: return f"{self.node.language} microbatch model {self.get_node_representation()}" - def describe_batch(self, batch_start: Optional[datetime]) -> str: + def describe_batch(self, batch_start: datetime) -> str: # Only visualize date if batch_start year/month/day formatted_batch_start = MicrobatchBuilder.format_batch_start( batch_start, self.node.config.batch_size @@ -530,10 +530,16 @@ def _execute_microbatch_materialization( # call materialization_macro to get a batch-level run result start_time = time.perf_counter() try: - # Set start/end in context prior to re-compiling + # LEGACY: Set start/end in context prior to re-compiling (Will be removed for 1.10+) + # TODO: REMOVE before 1.10 GA model.config["__dbt_internal_microbatch_event_time_start"] = batch[0] model.config["__dbt_internal_microbatch_event_time_end"] = batch[1] - + # Create batch context on model node prior to re-compiling + model.batch_context = BatchContext( + id=MicrobatchBuilder.batch_id(batch[0], model.config.batch_size), + event_time_start=batch[0], + event_time_end=batch[1], + ) # Recompile node to re-resolve refs with event time filters rendered, update context self.compiler.compile_node( model, From ca351eb96fe298cd17d12840fdbdc49f88a27b7a Mon Sep 17 00:00:00 2001 From: Quigley Malcolm Date: Tue, 26 Nov 2024 16:17:10 -0600 Subject: [PATCH 06/17] Fix circular import --- core/dbt/contracts/graph/nodes.py | 8 +++++++- core/dbt/materializations/incremental/microbatch.py | 10 +--------- core/dbt/task/run.py | 7 +++---- 3 files changed, 11 insertions(+), 14 deletions(-) diff --git a/core/dbt/contracts/graph/nodes.py b/core/dbt/contracts/graph/nodes.py index 9ec298eef18..1bfd25c695a 100644 --- a/core/dbt/contracts/graph/nodes.py +++ b/core/dbt/contracts/graph/nodes.py @@ -81,7 +81,6 @@ ) from dbt.exceptions import ContractBreakingChangeError, ParsingError, ValidationError from dbt.flags import get_flags -from dbt.materializations.incremental.microbatch import BatchContext from dbt.node_types import ( REFABLE_NODE_TYPES, VERSIONED_NODE_TYPES, @@ -443,6 +442,13 @@ def resource_class(cls) -> Type[HookNodeResource]: return HookNodeResource +@dataclass +class BatchContext: + id: str + event_time_start: datetime + event_time_end: datetime + + @dataclass class ModelNode(ModelResource, CompiledNode): previous_batch_results: Optional[BatchResults] = None diff --git a/core/dbt/materializations/incremental/microbatch.py b/core/dbt/materializations/incremental/microbatch.py index 774a5ee71c3..eb14f151699 100644 --- a/core/dbt/materializations/incremental/microbatch.py +++ b/core/dbt/materializations/incremental/microbatch.py @@ -2,7 +2,6 @@ from typing import Any, Dict, List, Optional import pytz -from attr import dataclass from dbt.artifacts.resources.types import BatchSize from dbt.artifacts.schemas.batch_results import BatchType @@ -10,13 +9,6 @@ from dbt.exceptions import DbtInternalError, DbtRuntimeError -@dataclass -class BatchContext: - id: str - event_time_start: datetime - event_time_end: datetime - - class MicrobatchBuilder: """A utility class for building microbatch definitions associated with a specific model""" @@ -108,7 +100,7 @@ def build_batches(self, start: datetime, end: datetime) -> List[BatchType]: return batches - def build_batch_context(self, incremental_batch: bool, start_time: datetime) -> Dict[str, Any]: + def build_batch_context(self, incremental_batch: bool) -> Dict[str, Any]: """ Create context with entries that reflect microbatch model + incremental execution state diff --git a/core/dbt/task/run.py b/core/dbt/task/run.py index 34badf91b40..d6b198d668d 100644 --- a/core/dbt/task/run.py +++ b/core/dbt/task/run.py @@ -27,7 +27,7 @@ from dbt.config import RuntimeConfig from dbt.context.providers import generate_runtime_model_context from dbt.contracts.graph.manifest import Manifest -from dbt.contracts.graph.nodes import HookNode, ModelNode, ResultNode +from dbt.contracts.graph.nodes import BatchContext, HookNode, ModelNode, ResultNode from dbt.events.types import ( GenericExceptionOnRun, LogHookEndLine, @@ -39,7 +39,7 @@ from dbt.exceptions import CompilationError, DbtInternalError, DbtRuntimeError from dbt.graph import ResourceTypeSelector from dbt.hooks import get_hook_dict -from dbt.materializations.incremental.microbatch import BatchContext, MicrobatchBuilder +from dbt.materializations.incremental.microbatch import MicrobatchBuilder from dbt.node_types import NodeType, RunHookType from dbt.task import group_lookup from dbt.task.base import BaseRunner @@ -551,8 +551,7 @@ def _execute_microbatch_materialization( ) # Update jinja context with batch context members batch_context = microbatch_builder.build_batch_context( - incremental_batch=self.relation_exists, - start_time=batch[0], + incremental_batch=self.relation_exists ) context.update(batch_context) From 585fb040582e41689db1ef5e8088815c90c00a87 Mon Sep 17 00:00:00 2001 From: Quigley Malcolm Date: Wed, 27 Nov 2024 09:41:15 -0600 Subject: [PATCH 07/17] Fixup MicrobatchBuilder.batch_id property method --- core/dbt/materializations/incremental/microbatch.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/dbt/materializations/incremental/microbatch.py b/core/dbt/materializations/incremental/microbatch.py index eb14f151699..2f2d31e08f5 100644 --- a/core/dbt/materializations/incremental/microbatch.py +++ b/core/dbt/materializations/incremental/microbatch.py @@ -194,7 +194,7 @@ def truncate_timestamp(timestamp: datetime, batch_size: BatchSize) -> datetime: @staticmethod def batch_id(start_time: datetime, batch_size: BatchSize) -> str: - return MicrobatchBuilder.format_batch_start(start_time, batch_size).replace("_", "") + return MicrobatchBuilder.format_batch_start(start_time, batch_size).replace("-", "") @staticmethod def format_batch_start(batch_start: datetime, batch_size: BatchSize) -> str: From 45daec72f4953be4c157e6e2ab5671455a569396 Mon Sep 17 00:00:00 2001 From: Quigley Malcolm Date: Wed, 27 Nov 2024 09:41:57 -0600 Subject: [PATCH 08/17] Ensure MicrobatchModelRunner doesn't double compile batches We were compiling the node for each batch _twice_. Besides making microbatch models more expensive than they needed to be, double compiling wasn't causing any issue. However the first compilation was happening _before_ we had added the batch context information to the model node for the batch. This was leading to models which try to access the `batch_context` information on the model to blow up, which was undesirable. As such, we've now gone and skipped the first compilation. We've done this similar to how SavedQuery nodes skip compilation. --- core/dbt/task/run.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/core/dbt/task/run.py b/core/dbt/task/run.py index d6b198d668d..7c72f17bfd3 100644 --- a/core/dbt/task/run.py +++ b/core/dbt/task/run.py @@ -341,6 +341,13 @@ def __init__(self, config, adapter, node, node_index: int, num_nodes: int): self.batches: Dict[int, BatchType] = {} self.relation_exists: bool = False + def compile(self, manifest: Manifest): + # The default compile function is _always_ called. However, we do our + # compilation _later_ in `_execute_microbatch_materialization`. This + # meant the node was being compiled _twice_ for each batch. To get around + # this, we've overriden the default compile method to do nothing + return self.node + def set_batch_idx(self, batch_idx: int) -> None: self.batch_idx = batch_idx From c925282b0fe7701e806e4b7a163705b17c429a7e Mon Sep 17 00:00:00 2001 From: Quigley Malcolm Date: Wed, 27 Nov 2024 10:00:42 -0600 Subject: [PATCH 09/17] Add `__post_serialize__` method to `BatchContext` to ensure correct dict shape This is weird, but necessary, I apologize. Mashumaro handles the dictification of this class via a compile time generated `to_dict` method based off of the _typing_ of th class. By default `datetime` types are converted to strings. We don't want that, we want them to stay datetimes. --- core/dbt/contracts/graph/nodes.py | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/core/dbt/contracts/graph/nodes.py b/core/dbt/contracts/graph/nodes.py index 1bfd25c695a..741cdfcadf4 100644 --- a/core/dbt/contracts/graph/nodes.py +++ b/core/dbt/contracts/graph/nodes.py @@ -93,6 +93,7 @@ ConstraintType, ModelLevelConstraint, ) +from dbt_common.dataclass_schema import dbtClassMixin from dbt_common.events.contextvars import set_log_contextvars from dbt_common.events.functions import warn_or_error @@ -443,11 +444,24 @@ def resource_class(cls) -> Type[HookNodeResource]: @dataclass -class BatchContext: +class BatchContext(dbtClassMixin): id: str event_time_start: datetime event_time_end: datetime + def __post_serialize__(self, data, context): + # This is insane, but necessary, I apologize. Mashumaro handles the + # dictification of this class via a compile time generated `to_dict` + # method based off of the _typing_ of th class. By default `datetime` + # types are converted to strings. We don't want that, we want them to + # stay datetimes. + # Note: This is safe because the `BatchContext` isn't part of the artifact + # and thus doesn't get written out. + new_data = super().__post_serialize__(data, context) + new_data["event_time_start"] = self.event_time_start + new_data["event_time_end"] = self.event_time_end + return new_data + @dataclass class ModelNode(ModelResource, CompiledNode): From 345e474db0d6f6b4c001dffb667743baa064f3b7 Mon Sep 17 00:00:00 2001 From: Quigley Malcolm Date: Wed, 27 Nov 2024 10:28:30 -0600 Subject: [PATCH 10/17] Update tests to check for `batch_context` --- tests/functional/microbatch/test_microbatch.py | 18 ++++++++++++++++++ tests/unit/contracts/graph/test_manifest.py | 1 + .../incremental/test_microbatch.py | 1 - 3 files changed, 19 insertions(+), 1 deletion(-) diff --git a/tests/functional/microbatch/test_microbatch.py b/tests/functional/microbatch/test_microbatch.py index f0f4097b9c9..01d06a0af77 100644 --- a/tests/functional/microbatch/test_microbatch.py +++ b/tests/functional/microbatch/test_microbatch.py @@ -498,6 +498,13 @@ def test_run_with_event_time(self, project): {{ config(materialized='incremental', incremental_strategy='microbatch', unique_key='id', event_time='event_time', batch_size='day', begin=modules.datetime.datetime(2020, 1, 1, 0, 0, 0)) }} {{ log("start: "~ model.config.__dbt_internal_microbatch_event_time_start, info=True)}} {{ log("end: "~ model.config.__dbt_internal_microbatch_event_time_end, info=True)}} +{% if model.batch_context %} +{{ log("batch_context.event_time_start: "~ model.batch_context.event_time_start, info=True)}} +{{ log("batch_context.event_time_end: "~ model.batch_context.event_time_end, info=True)}} +{{ log("batch_context.id: "~ model.batch_context.id, info=True)}} +{{ log("start timezone: "~ model.batch_context.event_time_start.tzinfo, info=True)}} +{{ log("end timezone: "~ model.batch_context.event_time_end.tzinfo, info=True)}} +{% endif %} select * from {{ ref('input_model') }} """ @@ -516,12 +523,23 @@ def test_run_with_event_time_logs(self, project): assert "start: 2020-01-01 00:00:00+00:00" in logs assert "end: 2020-01-02 00:00:00+00:00" in logs + assert "batch_context.event_time_start: 2020-01-01 00:00:00+00:00" in logs + assert "batch_context.event_time_end: 2020-01-02 00:00:00+00:00" in logs + assert "batch_context.id: 20200101" in logs + assert "start timezone: UTC" in logs + assert "end timezone: UTC" in logs assert "start: 2020-01-02 00:00:00+00:00" in logs assert "end: 2020-01-03 00:00:00+00:00" in logs + assert "batch_context.event_time_start: 2020-01-02 00:00:00+00:00" in logs + assert "batch_context.event_time_end: 2020-01-03 00:00:00+00:00" in logs + assert "batch_context.id: 20200102" in logs assert "start: 2020-01-03 00:00:00+00:00" in logs assert "end: 2020-01-03 13:57:00+00:00" in logs + assert "batch_context.event_time_start: 2020-01-03 00:00:00+00:00" in logs + assert "batch_context.event_time_end: 2020-01-03 13:57:00+00:00" in logs + assert "batch_context.id: 20200103" in logs microbatch_model_failing_incremental_partition_sql = """ diff --git a/tests/unit/contracts/graph/test_manifest.py b/tests/unit/contracts/graph/test_manifest.py index 0f3a80e5039..93e0fb3940c 100644 --- a/tests/unit/contracts/graph/test_manifest.py +++ b/tests/unit/contracts/graph/test_manifest.py @@ -96,6 +96,7 @@ "deprecation_date", "defer_relation", "time_spine", + "batch_context", } ) diff --git a/tests/unit/materializations/incremental/test_microbatch.py b/tests/unit/materializations/incremental/test_microbatch.py index f114d8649c3..01474be7b40 100644 --- a/tests/unit/materializations/incremental/test_microbatch.py +++ b/tests/unit/materializations/incremental/test_microbatch.py @@ -605,7 +605,6 @@ def test_truncate_timestamp(self, timestamp, batch_size, expected_timestamp): @pytest.mark.parametrize( "batch_size,batch_start,expected_formatted_batch_start", [ - (None, None, None), (BatchSize.year, datetime(2020, 1, 1, 1), "2020-01-01"), (BatchSize.month, datetime(2020, 1, 1, 1), "2020-01-01"), (BatchSize.day, datetime(2020, 1, 1, 1), "2020-01-01"), From c27c494cfbec6ace1ffe3ac6cce175c5f2ddbad8 Mon Sep 17 00:00:00 2001 From: Quigley Malcolm Date: Wed, 27 Nov 2024 10:29:45 -0600 Subject: [PATCH 11/17] Update `resolve_event_time_filter` to use new `batch_context` --- core/dbt/context/providers.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/core/dbt/context/providers.py b/core/dbt/context/providers.py index 188ad5480b0..ca6a46c4f7a 100644 --- a/core/dbt/context/providers.py +++ b/core/dbt/context/providers.py @@ -244,9 +244,10 @@ def resolve_event_time_filter(self, target: ManifestNode) -> Optional[EventTimeF and self.model.config.materialized == "incremental" and self.model.config.incremental_strategy == "microbatch" and self.manifest.use_microbatch_batches(project_name=self.config.project_name) + and self.model.batch_context is not None ): - start = self.model.config.get("__dbt_internal_microbatch_event_time_start") - end = self.model.config.get("__dbt_internal_microbatch_event_time_end") + start = self.model.batch_context.event_time_start + end = self.model.batch_context.event_time_end if start is not None or end is not None: event_time_filter = EventTimeFilter( From 4588e2324a274e098a4d63d68c15798072e796fb Mon Sep 17 00:00:00 2001 From: Quigley Malcolm Date: Wed, 27 Nov 2024 11:27:58 -0600 Subject: [PATCH 12/17] Stop testing for batchless compiled code for microbatch models In 45daec72f4953be4c157e6e2ab5671455a569396 we stopped an extra compilation that was happening per batch prior to the batch_context being loaded. Stopping this extra compilation means that compiled sql for the microbatch model without the event time filter / batch context is no longer produced. We have discussed this and _believe_ it is okay given that this is a new node type that has not hit GA yet. --- tests/functional/microbatch/test_microbatch.py | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/tests/functional/microbatch/test_microbatch.py b/tests/functional/microbatch/test_microbatch.py index 01d06a0af77..3b299fc6910 100644 --- a/tests/functional/microbatch/test_microbatch.py +++ b/tests/functional/microbatch/test_microbatch.py @@ -693,16 +693,6 @@ def test_run_with_event_time(self, project): with patch_microbatch_end_time("2020-01-03 13:57:00"): run_dbt(["run"]) - # Compiled paths - compiled model without filter only - assert read_file( - project.project_root, - "target", - "compiled", - "test", - "models", - "microbatch_model.sql", - ) - # Compiled paths - batch compilations assert read_file( project.project_root, From 8e7511178e872dd3707582617d92f98eeca22618 Mon Sep 17 00:00:00 2001 From: Quigley Malcolm Date: Wed, 27 Nov 2024 11:52:00 -0600 Subject: [PATCH 13/17] Rename `ModelNode.batch_context` to `ModelNode.batch` --- core/dbt/context/providers.py | 6 ++-- core/dbt/contracts/graph/nodes.py | 2 +- core/dbt/task/run.py | 2 +- .../functional/microbatch/test_microbatch.py | 30 +++++++++---------- tests/unit/contracts/graph/test_manifest.py | 2 +- 5 files changed, 21 insertions(+), 21 deletions(-) diff --git a/core/dbt/context/providers.py b/core/dbt/context/providers.py index ca6a46c4f7a..f9d436a7840 100644 --- a/core/dbt/context/providers.py +++ b/core/dbt/context/providers.py @@ -244,10 +244,10 @@ def resolve_event_time_filter(self, target: ManifestNode) -> Optional[EventTimeF and self.model.config.materialized == "incremental" and self.model.config.incremental_strategy == "microbatch" and self.manifest.use_microbatch_batches(project_name=self.config.project_name) - and self.model.batch_context is not None + and self.model.batch is not None ): - start = self.model.batch_context.event_time_start - end = self.model.batch_context.event_time_end + start = self.model.batch.event_time_start + end = self.model.batch.event_time_end if start is not None or end is not None: event_time_filter = EventTimeFilter( diff --git a/core/dbt/contracts/graph/nodes.py b/core/dbt/contracts/graph/nodes.py index 741cdfcadf4..8fc39c7621b 100644 --- a/core/dbt/contracts/graph/nodes.py +++ b/core/dbt/contracts/graph/nodes.py @@ -466,7 +466,7 @@ def __post_serialize__(self, data, context): @dataclass class ModelNode(ModelResource, CompiledNode): previous_batch_results: Optional[BatchResults] = None - batch_context: Optional[BatchContext] = None + batch: Optional[BatchContext] = None _has_this: Optional[bool] = None def __post_serialize__(self, dct: Dict, context: Optional[Dict] = None): diff --git a/core/dbt/task/run.py b/core/dbt/task/run.py index 7c72f17bfd3..8c95a7583f7 100644 --- a/core/dbt/task/run.py +++ b/core/dbt/task/run.py @@ -542,7 +542,7 @@ def _execute_microbatch_materialization( model.config["__dbt_internal_microbatch_event_time_start"] = batch[0] model.config["__dbt_internal_microbatch_event_time_end"] = batch[1] # Create batch context on model node prior to re-compiling - model.batch_context = BatchContext( + model.batch = BatchContext( id=MicrobatchBuilder.batch_id(batch[0], model.config.batch_size), event_time_start=batch[0], event_time_end=batch[1], diff --git a/tests/functional/microbatch/test_microbatch.py b/tests/functional/microbatch/test_microbatch.py index 3b299fc6910..055460f3a6d 100644 --- a/tests/functional/microbatch/test_microbatch.py +++ b/tests/functional/microbatch/test_microbatch.py @@ -498,12 +498,12 @@ def test_run_with_event_time(self, project): {{ config(materialized='incremental', incremental_strategy='microbatch', unique_key='id', event_time='event_time', batch_size='day', begin=modules.datetime.datetime(2020, 1, 1, 0, 0, 0)) }} {{ log("start: "~ model.config.__dbt_internal_microbatch_event_time_start, info=True)}} {{ log("end: "~ model.config.__dbt_internal_microbatch_event_time_end, info=True)}} -{% if model.batch_context %} -{{ log("batch_context.event_time_start: "~ model.batch_context.event_time_start, info=True)}} -{{ log("batch_context.event_time_end: "~ model.batch_context.event_time_end, info=True)}} -{{ log("batch_context.id: "~ model.batch_context.id, info=True)}} -{{ log("start timezone: "~ model.batch_context.event_time_start.tzinfo, info=True)}} -{{ log("end timezone: "~ model.batch_context.event_time_end.tzinfo, info=True)}} +{% if model.batch %} +{{ log("batch.event_time_start: "~ model.batch.event_time_start, info=True)}} +{{ log("batch.event_time_end: "~ model.batch.event_time_end, info=True)}} +{{ log("batch.id: "~ model.batch.id, info=True)}} +{{ log("start timezone: "~ model.batch.event_time_start.tzinfo, info=True)}} +{{ log("end timezone: "~ model.batch.event_time_end.tzinfo, info=True)}} {% endif %} select * from {{ ref('input_model') }} """ @@ -523,23 +523,23 @@ def test_run_with_event_time_logs(self, project): assert "start: 2020-01-01 00:00:00+00:00" in logs assert "end: 2020-01-02 00:00:00+00:00" in logs - assert "batch_context.event_time_start: 2020-01-01 00:00:00+00:00" in logs - assert "batch_context.event_time_end: 2020-01-02 00:00:00+00:00" in logs - assert "batch_context.id: 20200101" in logs + assert "batch.event_time_start: 2020-01-01 00:00:00+00:00" in logs + assert "batch.event_time_end: 2020-01-02 00:00:00+00:00" in logs + assert "batch.id: 20200101" in logs assert "start timezone: UTC" in logs assert "end timezone: UTC" in logs assert "start: 2020-01-02 00:00:00+00:00" in logs assert "end: 2020-01-03 00:00:00+00:00" in logs - assert "batch_context.event_time_start: 2020-01-02 00:00:00+00:00" in logs - assert "batch_context.event_time_end: 2020-01-03 00:00:00+00:00" in logs - assert "batch_context.id: 20200102" in logs + assert "batch.event_time_start: 2020-01-02 00:00:00+00:00" in logs + assert "batch.event_time_end: 2020-01-03 00:00:00+00:00" in logs + assert "batch.id: 20200102" in logs assert "start: 2020-01-03 00:00:00+00:00" in logs assert "end: 2020-01-03 13:57:00+00:00" in logs - assert "batch_context.event_time_start: 2020-01-03 00:00:00+00:00" in logs - assert "batch_context.event_time_end: 2020-01-03 13:57:00+00:00" in logs - assert "batch_context.id: 20200103" in logs + assert "batch.event_time_start: 2020-01-03 00:00:00+00:00" in logs + assert "batch.event_time_end: 2020-01-03 13:57:00+00:00" in logs + assert "batch.id: 20200103" in logs microbatch_model_failing_incremental_partition_sql = """ diff --git a/tests/unit/contracts/graph/test_manifest.py b/tests/unit/contracts/graph/test_manifest.py index 93e0fb3940c..3505ee80037 100644 --- a/tests/unit/contracts/graph/test_manifest.py +++ b/tests/unit/contracts/graph/test_manifest.py @@ -96,7 +96,7 @@ "deprecation_date", "defer_relation", "time_spine", - "batch_context", + "batch", } ) From 1779949dfbaccbe518e86b6710b83fb769398e03 Mon Sep 17 00:00:00 2001 From: Quigley Malcolm Date: Wed, 27 Nov 2024 12:04:52 -0600 Subject: [PATCH 14/17] Rename `build_batch_context` to `build_jinja_context_for_batch` The name `build_batch_context` was confusing as 1) We have a `BatchContext` object, which the method was not building 2) The method builds the jinja context for the batch As such it felt appropriate to rename the method to more accurately communicate what it does. --- .../materializations/incremental/microbatch.py | 16 ++++++++-------- core/dbt/task/run.py | 4 ++-- .../incremental/test_microbatch.py | 8 ++++---- 3 files changed, 14 insertions(+), 14 deletions(-) diff --git a/core/dbt/materializations/incremental/microbatch.py b/core/dbt/materializations/incremental/microbatch.py index 2f2d31e08f5..6de6945704c 100644 --- a/core/dbt/materializations/incremental/microbatch.py +++ b/core/dbt/materializations/incremental/microbatch.py @@ -100,25 +100,25 @@ def build_batches(self, start: datetime, end: datetime) -> List[BatchType]: return batches - def build_batch_context(self, incremental_batch: bool) -> Dict[str, Any]: + def build_jinja_context_for_batch(self, incremental_batch: bool) -> Dict[str, Any]: """ Create context with entries that reflect microbatch model + incremental execution state Assumes self.model has been (re)-compiled with necessary batch filters applied. """ - batch_context: Dict[str, Any] = {} + jinja_context: Dict[str, Any] = {} # Microbatch model properties - batch_context["model"] = self.model.to_dict() - batch_context["sql"] = self.model.compiled_code - batch_context["compiled_code"] = self.model.compiled_code + jinja_context["model"] = self.model.to_dict() + jinja_context["sql"] = self.model.compiled_code + jinja_context["compiled_code"] = self.model.compiled_code # Add incremental context variables for batches running incrementally if incremental_batch: - batch_context["is_incremental"] = lambda: True - batch_context["should_full_refresh"] = lambda: False + jinja_context["is_incremental"] = lambda: True + jinja_context["should_full_refresh"] = lambda: False - return batch_context + return jinja_context @staticmethod def offset_timestamp(timestamp: datetime, batch_size: BatchSize, offset: int) -> datetime: diff --git a/core/dbt/task/run.py b/core/dbt/task/run.py index 8c95a7583f7..55b73be3d80 100644 --- a/core/dbt/task/run.py +++ b/core/dbt/task/run.py @@ -557,10 +557,10 @@ def _execute_microbatch_materialization( ), ) # Update jinja context with batch context members - batch_context = microbatch_builder.build_batch_context( + jinja_context = microbatch_builder.build_jinja_context_for_batch( incremental_batch=self.relation_exists ) - context.update(batch_context) + context.update(jinja_context) # Materialize batch and cache any materialized relations result = MacroGenerator( diff --git a/tests/unit/materializations/incremental/test_microbatch.py b/tests/unit/materializations/incremental/test_microbatch.py index 01474be7b40..3d827a79975 100644 --- a/tests/unit/materializations/incremental/test_microbatch.py +++ b/tests/unit/materializations/incremental/test_microbatch.py @@ -489,11 +489,11 @@ def test_build_batches(self, microbatch_model, start, end, batch_size, expected_ assert len(actual_batches) == len(expected_batches) assert actual_batches == expected_batches - def test_build_batch_context_incremental_batch(self, microbatch_model): + def test_build_jinja_context_for_incremental_batch(self, microbatch_model): microbatch_builder = MicrobatchBuilder( model=microbatch_model, is_incremental=True, event_time_start=None, event_time_end=None ) - context = microbatch_builder.build_batch_context(incremental_batch=True) + context = microbatch_builder.build_jinja_context_for_batch(incremental_batch=True) assert context["model"] == microbatch_model.to_dict() assert context["sql"] == microbatch_model.compiled_code @@ -502,11 +502,11 @@ def test_build_batch_context_incremental_batch(self, microbatch_model): assert context["is_incremental"]() is True assert context["should_full_refresh"]() is False - def test_build_batch_context_incremental_batch_false(self, microbatch_model): + def test_build_jinja_context_for_incremental_batch_false(self, microbatch_model): microbatch_builder = MicrobatchBuilder( model=microbatch_model, is_incremental=True, event_time_start=None, event_time_end=None ) - context = microbatch_builder.build_batch_context(incremental_batch=False) + context = microbatch_builder.build_jinja_context_for_batch(incremental_batch=False) assert context["model"] == microbatch_model.to_dict() assert context["sql"] == microbatch_model.compiled_code From d54f0ebd4f962cb013330fd38c6f70a60dcfa628 Mon Sep 17 00:00:00 2001 From: Quigley Malcolm Date: Wed, 27 Nov 2024 12:13:16 -0600 Subject: [PATCH 15/17] Rename test macro `invalid_batch_context_macro_sql` to `invalid_batch_jinja_context_macro_sql` This rename was to make it more clear that the jinja context for a batch was being checked, as a batch_context has a slightly different connotation. --- tests/functional/microbatch/test_microbatch.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/tests/functional/microbatch/test_microbatch.py b/tests/functional/microbatch/test_microbatch.py index 055460f3a6d..e3acc415273 100644 --- a/tests/functional/microbatch/test_microbatch.py +++ b/tests/functional/microbatch/test_microbatch.py @@ -64,8 +64,8 @@ select * from {{ ref('microbatch_model') }} """ -invalid_batch_context_macro_sql = """ -{% macro check_invalid_batch_context() %} +invalid_batch_jinja_context_macro_sql = """ +{% macro check_invalid_batch_jinja_context() %} {% if model is not mapping %} {{ exceptions.raise_compiler_error("`model` is invalid: expected mapping type") }} @@ -83,9 +83,9 @@ """ microbatch_model_with_context_checks_sql = """ -{{ config(pre_hook="{{ check_invalid_batch_context() }}", materialized='incremental', incremental_strategy='microbatch', unique_key='id', event_time='event_time', batch_size='day', begin=modules.datetime.datetime(2020, 1, 1, 0, 0, 0)) }} +{{ config(pre_hook="{{ check_invalid_batch_jinja_context() }}", materialized='incremental', incremental_strategy='microbatch', unique_key='id', event_time='event_time', batch_size='day', begin=modules.datetime.datetime(2020, 1, 1, 0, 0, 0)) }} -{{ check_invalid_batch_context() }} +{{ check_invalid_batch_jinja_context() }} select * from {{ ref('input_model') }} """ @@ -404,7 +404,7 @@ class TestMicrobatchJinjaContext(BaseMicrobatchTest): @pytest.fixture(scope="class") def macros(self): - return {"check_batch_context.sql": invalid_batch_context_macro_sql} + return {"check_batch_jinja_context.sql": invalid_batch_jinja_context_macro_sql} @pytest.fixture(scope="class") def models(self): From ebe46f1bd2f4698377bfad8568da38dfe0fd281c Mon Sep 17 00:00:00 2001 From: Quigley Malcolm Date: Wed, 27 Nov 2024 12:17:30 -0600 Subject: [PATCH 16/17] Update changie doc --- .changes/unreleased/Features-20241121-125630.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.changes/unreleased/Features-20241121-125630.yaml b/.changes/unreleased/Features-20241121-125630.yaml index 563c58baeef..befd9fac790 100644 --- a/.changes/unreleased/Features-20241121-125630.yaml +++ b/.changes/unreleased/Features-20241121-125630.yaml @@ -1,5 +1,5 @@ kind: Features -body: Add `batch_id` to model context +body: Add `batch` context object to model jinja context time: 2024-11-21T12:56:30.715473-06:00 custom: Author: QMalcolm From b3e270351bd4848a4cad1e1f5dc3009074779f8a Mon Sep 17 00:00:00 2001 From: Michelle Ark Date: Wed, 27 Nov 2024 15:26:24 -0500 Subject: [PATCH 17/17] move microbatch compilation to .compile method --- core/dbt/task/run.py | 47 ++++++++++++++++++++++---------------------- 1 file changed, 24 insertions(+), 23 deletions(-) diff --git a/core/dbt/task/run.py b/core/dbt/task/run.py index 55b73be3d80..fb4ee145429 100644 --- a/core/dbt/task/run.py +++ b/core/dbt/task/run.py @@ -342,10 +342,30 @@ def __init__(self, config, adapter, node, node_index: int, num_nodes: int): self.relation_exists: bool = False def compile(self, manifest: Manifest): - # The default compile function is _always_ called. However, we do our - # compilation _later_ in `_execute_microbatch_materialization`. This - # meant the node was being compiled _twice_ for each batch. To get around - # this, we've overriden the default compile method to do nothing + if self.batch_idx is not None: + batch = self.batches[self.batch_idx] + + # LEGACY: Set start/end in context prior to re-compiling (Will be removed for 1.10+) + # TODO: REMOVE before 1.10 GA + self.node.config["__dbt_internal_microbatch_event_time_start"] = batch[0] + self.node.config["__dbt_internal_microbatch_event_time_end"] = batch[1] + # Create batch context on model node prior to re-compiling + self.node.batch = BatchContext( + id=MicrobatchBuilder.batch_id(batch[0], self.node.config.batch_size), + event_time_start=batch[0], + event_time_end=batch[1], + ) + # Recompile node to re-resolve refs with event time filters rendered, update context + self.compiler.compile_node( + self.node, + manifest, + {}, + split_suffix=MicrobatchBuilder.format_batch_start( + batch[0], self.node.config.batch_size + ), + ) + + # Skips compilation for non-batch runs return self.node def set_batch_idx(self, batch_idx: int) -> None: @@ -537,25 +557,6 @@ def _execute_microbatch_materialization( # call materialization_macro to get a batch-level run result start_time = time.perf_counter() try: - # LEGACY: Set start/end in context prior to re-compiling (Will be removed for 1.10+) - # TODO: REMOVE before 1.10 GA - model.config["__dbt_internal_microbatch_event_time_start"] = batch[0] - model.config["__dbt_internal_microbatch_event_time_end"] = batch[1] - # Create batch context on model node prior to re-compiling - model.batch = BatchContext( - id=MicrobatchBuilder.batch_id(batch[0], model.config.batch_size), - event_time_start=batch[0], - event_time_end=batch[1], - ) - # Recompile node to re-resolve refs with event time filters rendered, update context - self.compiler.compile_node( - model, - manifest, - {}, - split_suffix=MicrobatchBuilder.format_batch_start( - batch[0], model.config.batch_size - ), - ) # Update jinja context with batch context members jinja_context = microbatch_builder.build_jinja_context_for_batch( incremental_batch=self.relation_exists