Skip to content

Commit

Permalink
Update MicrobatchModelRunner to use new batch specific log events
Browse files Browse the repository at this point in the history
  • Loading branch information
QMalcolm committed Dec 7, 2024
1 parent 3fa7bbc commit f4315ef
Showing 1 changed file with 14 additions and 27 deletions.
41 changes: 14 additions & 27 deletions core/dbt/task/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,13 @@
from dbt.contracts.graph.nodes import BatchContext, HookNode, ModelNode, ResultNode
from dbt.events.types import (
GenericExceptionOnRun,
LogBatchResult,
LogHookEndLine,
LogHookStartLine,
LogModelResult,
LogStartBatch,
LogStartLine,
MicrobatchExecutionDebug,
SkippingDetails,
)
from dbt.exceptions import CompilationError, DbtInternalError, DbtRuntimeError
from dbt.graph import ResourceTypeSelector
Expand Down Expand Up @@ -398,15 +399,18 @@ def print_batch_result_line(
if result.status == NodeStatus.Error:
status = result.status
level = EventLevel.ERROR
elif result.status == NodeStatus.Skipped:
status = result.status
level = EventLevel.INFO
else:
status = result.message
level = EventLevel.INFO
fire_event(
LogModelResult(
LogBatchResult(
description=description,
status=status,
index=self.batch_idx + 1,
total=len(self.batches),
batch_index=self.batch_idx + 1,
total_batches=len(self.batches),
execution_time=result.execution_time,
node_info=self.node.node_info,
group=group,
Expand All @@ -424,28 +428,10 @@ def print_batch_start_line(self) -> None:

batch_description = self.describe_batch(batch_start)
fire_event(
LogStartLine(
LogStartBatch(
description=batch_description,
index=self.batch_idx + 1,
total=len(self.batches),
node_info=self.node.node_info,
)
)

def print_skip_batch_line(self) -> None:
if self.batch_idx is None:
return

schema_name = getattr(self.node, "schema", "")
node_name = self.node.name

fire_event(
SkippingDetails(
resource_type=self.node.resource_type,
schema=schema_name,
node_name=node_name,
index=self.batch_idx + 1,
total=len(self.batches),
batch_index=self.batch_idx + 1,
total_batches=len(self.batches),
node_info=self.node.node_info,
)
)
Expand Down Expand Up @@ -496,8 +482,7 @@ def on_skip(self):
if self.batch_idx is None:
return super().on_skip()
else:
self.print_skip_batch_line()
return RunResult(
result = RunResult(
node=self.node,
status=RunStatus.Skipped,
timing=[],
Expand All @@ -508,6 +493,8 @@ def on_skip(self):
failures=1,
batch_results=BatchResults(failed=[self.batches[self.batch_idx]]),
)
self.print_batch_result_line(result=result)
return result

def _build_succesful_run_batch_result(
self,
Expand Down

0 comments on commit f4315ef

Please sign in to comment.