From 42329f11bb789d73d8b0e742d7547464ffdfb67f Mon Sep 17 00:00:00 2001 From: Quigley Malcolm Date: Wed, 20 Nov 2024 23:27:48 -0600 Subject: [PATCH] Add `batch_id` to jinja context of microbatch batches --- core/dbt/materializations/incremental/microbatch.py | 7 ++++++- core/dbt/task/run.py | 3 ++- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/core/dbt/materializations/incremental/microbatch.py b/core/dbt/materializations/incremental/microbatch.py index b89c834d4a2..2cdaf71c138 100644 --- a/core/dbt/materializations/incremental/microbatch.py +++ b/core/dbt/materializations/incremental/microbatch.py @@ -100,7 +100,7 @@ def build_batches(self, start: datetime, end: datetime) -> List[BatchType]: return batches - def build_batch_context(self, incremental_batch: bool) -> Dict[str, Any]: + def build_batch_context(self, incremental_batch: bool, start_time: datetime) -> Dict[str, Any]: """ Create context with entries that reflect microbatch model + incremental execution state @@ -112,6 +112,7 @@ def build_batch_context(self, incremental_batch: bool) -> Dict[str, Any]: batch_context["model"] = self.model.to_dict() batch_context["sql"] = self.model.compiled_code batch_context["compiled_code"] = self.model.compiled_code + batch_context["batch_id"] = self.batch_id(start_time=start_time) # Add incremental context variables for batches running incrementally if incremental_batch: @@ -192,6 +193,10 @@ def truncate_timestamp(timestamp: datetime, batch_size: BatchSize) -> datetime: return truncated + @staticmethod + def batch_id(start_time: datetime) -> str: + return start_time.strftime("%Y%M%d%H") + @staticmethod def format_batch_start( batch_start: Optional[datetime], batch_size: BatchSize diff --git a/core/dbt/task/run.py b/core/dbt/task/run.py index 0c2888bb325..df8bea0289f 100644 --- a/core/dbt/task/run.py +++ b/core/dbt/task/run.py @@ -545,7 +545,8 @@ def _execute_microbatch_materialization( ) # Update jinja context with batch context members batch_context = microbatch_builder.build_batch_context( - incremental_batch=self.relation_exists + incremental_batch=self.relation_exists, + start_time=batch[0], ) context.update(batch_context)