From 9c652e6daa91f35c6ddadda4c343715282612749 Mon Sep 17 00:00:00 2001 From: Michelle Ark Date: Tue, 10 Dec 2024 10:59:46 -0500 Subject: [PATCH] fix MicrobatchExecutionDebug message (#11071) * fix MicrobatchExecutionDebug message * Fix typing in `describe_batch` to convince mypy `batch_start` exists when needed --------- Co-authored-by: Quigley Malcolm (cherry picked from commit fc6167a2ee5291cf6c562eadcd975b48e6a34d65) --- .../unreleased/Fixes-20241209-113806.yaml | 6 +++++ core/dbt/task/run.py | 22 ++++++++++++++----- 2 files changed, 22 insertions(+), 6 deletions(-) create mode 100644 .changes/unreleased/Fixes-20241209-113806.yaml diff --git a/.changes/unreleased/Fixes-20241209-113806.yaml b/.changes/unreleased/Fixes-20241209-113806.yaml new file mode 100644 index 00000000000..37a40ebb5e8 --- /dev/null +++ b/.changes/unreleased/Fixes-20241209-113806.yaml @@ -0,0 +1,6 @@ +kind: Fixes +body: Fix debug log messages for microbatch batch execution information +time: 2024-12-09T11:38:06.972743-06:00 +custom: + Author: MichelleArk QMalcolm + Issue: "11111" diff --git a/core/dbt/task/run.py b/core/dbt/task/run.py index 33da3dbf4e7..44d45272cfb 100644 --- a/core/dbt/task/run.py +++ b/core/dbt/task/run.py @@ -376,10 +376,21 @@ def set_relation_exists(self, relation_exists: bool) -> None: def set_batches(self, batches: Dict[int, BatchType]) -> None: self.batches = batches + @property + def batch_start(self) -> Optional[datetime]: + if self.batch_idx is None: + return None + else: + return self.batches[self.batch_idx][0] + def describe_node(self) -> str: return f"{self.node.language} microbatch model {self.get_node_representation()}" - def describe_batch(self, batch_start: datetime) -> str: + def describe_batch(self) -> str: + batch_start = self.batch_start + if batch_start is None: + return "" + # Only visualize date if batch_start year/month/day formatted_batch_start = MicrobatchBuilder.format_batch_start( batch_start, self.node.config.batch_size @@ -393,8 +404,7 @@ def print_batch_result_line( if self.batch_idx is None: return - batch_start = self.batches[self.batch_idx][0] - description = self.describe_batch(batch_start) + description = self.describe_batch() group = group_lookup.get(self.node.unique_id) if result.status == NodeStatus.Error: status = result.status @@ -426,7 +436,7 @@ def print_batch_start_line(self) -> None: if batch_start is None: return - batch_description = self.describe_batch(batch_start) + batch_description = self.describe_batch() fire_event( LogStartBatch( description=batch_description, @@ -828,14 +838,14 @@ def _submit_batch( if not force_sequential_run and batch_runner.should_run_in_parallel(): fire_event( MicrobatchExecutionDebug( - msg=f"{batch_runner.describe_batch} is being run concurrently" + msg=f"{batch_runner.describe_batch()} is being run concurrently" ) ) self._submit(pool, [batch_runner], batch_results.append) else: fire_event( MicrobatchExecutionDebug( - msg=f"{batch_runner.describe_batch} is being run sequentially" + msg=f"{batch_runner.describe_batch()} is being run sequentially" ) ) batch_results.append(self.call_runner(batch_runner))