From 902fea585dc6eeea4a37eb161ace6539137e3cae Mon Sep 17 00:00:00 2001 From: Michelle Ark Date: Thu, 28 Nov 2024 16:52:33 -0500 Subject: [PATCH] fix MicrobatchExecutionDebug message --- core/dbt/task/run.py | 23 ++++++++++++++++------- 1 file changed, 16 insertions(+), 7 deletions(-) diff --git a/core/dbt/task/run.py b/core/dbt/task/run.py index b9cf7e22a08..514cbe9ad91 100644 --- a/core/dbt/task/run.py +++ b/core/dbt/task/run.py @@ -374,13 +374,23 @@ def set_relation_exists(self, relation_exists: bool) -> None: def set_batches(self, batches: Dict[int, BatchType]) -> None: self.batches = batches + @property + def batch_start(self) -> Optional[datetime]: + if self.batch_idx is None: + return None + else: + return self.batches[self.batch_idx][0] + def describe_node(self) -> str: return f"{self.node.language} microbatch model {self.get_node_representation()}" - def describe_batch(self, batch_start: datetime) -> str: + def describe_batch(self) -> str: + if self.batch_idx is None: + return "" + # Only visualize date if batch_start year/month/day formatted_batch_start = MicrobatchBuilder.format_batch_start( - batch_start, self.node.config.batch_size + self.batch_start, self.node.config.batch_size ) return f"batch {formatted_batch_start} of {self.get_node_representation()}" @@ -391,8 +401,7 @@ def print_batch_result_line( if self.batch_idx is None: return - batch_start = self.batches[self.batch_idx][0] - description = self.describe_batch(batch_start) + description = self.describe_batch() group = group_lookup.get(self.node.unique_id) if result.status == NodeStatus.Error: status = result.status @@ -421,7 +430,7 @@ def print_batch_start_line(self) -> None: if batch_start is None: return - batch_description = self.describe_batch(batch_start) + batch_description = self.describe_batch() fire_event( LogStartLine( description=batch_description, @@ -724,14 +733,14 @@ def handle_microbatch_model( if runner._should_run_in_parallel(relation_exists): fire_event( MicrobatchExecutionDebug( - msg=f"{batch_runner.describe_batch} is being run concurrently" + msg=f"{batch_runner.describe_batch()} is being run concurrently" ) ) self._submit(pool, [batch_runner], batch_results.append) else: fire_event( MicrobatchExecutionDebug( - msg=f"{batch_runner.describe_batch} is being run sequentially" + msg=f"{batch_runner.describe_batch()} is being run sequentially" ) ) batch_results.append(self.call_runner(batch_runner))