From f4315ef43a8d3425bf0c22c726d85e786769c76f Mon Sep 17 00:00:00 2001 From: Quigley Malcolm Date: Fri, 6 Dec 2024 19:26:01 -0600 Subject: [PATCH] Update MicrobatchModelRunner to use new batch specific log events --- core/dbt/task/run.py | 41 ++++++++++++++--------------------------- 1 file changed, 14 insertions(+), 27 deletions(-) diff --git a/core/dbt/task/run.py b/core/dbt/task/run.py index d35b3fc8205..649c58a1468 100644 --- a/core/dbt/task/run.py +++ b/core/dbt/task/run.py @@ -30,12 +30,13 @@ from dbt.contracts.graph.nodes import BatchContext, HookNode, ModelNode, ResultNode from dbt.events.types import ( GenericExceptionOnRun, + LogBatchResult, LogHookEndLine, LogHookStartLine, LogModelResult, + LogStartBatch, LogStartLine, MicrobatchExecutionDebug, - SkippingDetails, ) from dbt.exceptions import CompilationError, DbtInternalError, DbtRuntimeError from dbt.graph import ResourceTypeSelector @@ -398,15 +399,18 @@ def print_batch_result_line( if result.status == NodeStatus.Error: status = result.status level = EventLevel.ERROR + elif result.status == NodeStatus.Skipped: + status = result.status + level = EventLevel.INFO else: status = result.message level = EventLevel.INFO fire_event( - LogModelResult( + LogBatchResult( description=description, status=status, - index=self.batch_idx + 1, - total=len(self.batches), + batch_index=self.batch_idx + 1, + total_batches=len(self.batches), execution_time=result.execution_time, node_info=self.node.node_info, group=group, @@ -424,28 +428,10 @@ def print_batch_start_line(self) -> None: batch_description = self.describe_batch(batch_start) fire_event( - LogStartLine( + LogStartBatch( description=batch_description, - index=self.batch_idx + 1, - total=len(self.batches), - node_info=self.node.node_info, - ) - ) - - def print_skip_batch_line(self) -> None: - if self.batch_idx is None: - return - - schema_name = getattr(self.node, "schema", "") - node_name = self.node.name - - fire_event( - SkippingDetails( - resource_type=self.node.resource_type, - schema=schema_name, - node_name=node_name, - index=self.batch_idx + 1, - total=len(self.batches), + batch_index=self.batch_idx + 1, + total_batches=len(self.batches), node_info=self.node.node_info, ) ) @@ -496,8 +482,7 @@ def on_skip(self): if self.batch_idx is None: return super().on_skip() else: - self.print_skip_batch_line() - return RunResult( + result = RunResult( node=self.node, status=RunStatus.Skipped, timing=[], @@ -508,6 +493,8 @@ def on_skip(self): failures=1, batch_results=BatchResults(failed=[self.batches[self.batch_idx]]), ) + self.print_batch_result_line(result=result) + return result def _build_succesful_run_batch_result( self,