Begin tracking the execution time of batches for microbatch models
QMalcolm committed Oct 4, 2024
1 parent 5153c6b commit d68c6f5
Showing 1 changed file with 25 additions and 9 deletions.
34 changes: 25 additions & 9 deletions core/dbt/task/run.py
@@ -1,6 +1,7 @@
 import functools
 import os
 import threading
+import time
 from copy import deepcopy
 from datetime import datetime
 from typing import AbstractSet, Any, Dict, Iterable, List, Optional, Set, Tuple, Type
@@ -284,7 +285,7 @@ def after_execute(self, result) -> None:
         track_model_run(self.node_index, self.num_nodes, result)
         self.print_result_line(result)

-    def _build_run_model_result(self, model, context):
+    def _build_run_model_result(self, model, context, elapsed_time: float = 0.0):
         result = context["load_result"]("main")
         if not result:
             raise DbtRuntimeError("main is not being called during running model")
@@ -296,7 +297,7 @@ def _build_run_model_result(self, model, context):
             status=RunStatus.Success,
             timing=[],
             thread_id=threading.current_thread().name,
-            execution_time=0,
+            execution_time=elapsed_time,
             message=str(result.response),
             adapter_response=adapter_response,
             failures=result.get("failures"),
@@ -340,7 +341,8 @@ def _build_run_microbatch_model_result(
             status=status,
             timing=[],
             thread_id=threading.current_thread().name,
-            # TODO -- why isn't this getting propagated to logs?
+            # The execution_time here doesn't get propagated to logs because
+            # `safe_run_hooks` handles the elapsed time at the node level
             execution_time=0,
             message=msg,
             adapter_response={},
@@ -349,19 +351,28 @@
         )

     def _build_succesful_run_batch_result(
-        self, model: ModelNode, context: Dict[str, Any], batch: BatchType
+        self,
+        model: ModelNode,
+        context: Dict[str, Any],
+        batch: BatchType,
+        elapsed_time: float = 0.0,
     ) -> RunResult:
-        run_result = self._build_run_model_result(model, context)
+        run_result = self._build_run_model_result(model, context, elapsed_time)
         run_result.batch_results = BatchResults(successful=[batch])
         return run_result

-    def _build_failed_run_batch_result(self, model: ModelNode, batch: BatchType) -> RunResult:
+    def _build_failed_run_batch_result(
+        self,
+        model: ModelNode,
+        batch: BatchType,
+        elapsed_time: float = 0.0,
+    ) -> RunResult:
         return RunResult(
             node=model,
             status=RunStatus.Error,
             timing=[],
             thread_id=threading.current_thread().name,
-            execution_time=0,
+            execution_time=elapsed_time,
             message="ERROR",
             adapter_response={},
             failures=1,
@@ -504,6 +515,7 @@ def _execute_microbatch_materialization(
             self.print_batch_start_line(batch[0], batch_idx + 1, len(batches))

             exception = None
+            start_time = time.perf_counter()
             try:
                 # Set start/end in context prior to re-compiling
                 model.config["__dbt_internal_microbatch_event_time_start"] = batch[0]
@@ -530,13 +542,17 @@
                     self.adapter.cache_added(relation.incorporate(dbt_created=True))

                 # Build result of executed batch
-                batch_run_result = self._build_succesful_run_batch_result(model, context, batch)
+                batch_run_result = self._build_succesful_run_batch_result(
+                    model, context, batch, time.perf_counter() - start_time
+                )
                 # Update context vars for future batches
                 context["is_incremental"] = lambda: True
                 context["should_full_refresh"] = lambda: False
             except Exception as e:
                 exception = e
-                batch_run_result = self._build_failed_run_batch_result(model, batch)
+                batch_run_result = self._build_failed_run_batch_result(
+                    model, batch, time.perf_counter() - start_time
+                )

             self.print_batch_result_line(
                 batch_run_result, batch[0], batch_idx + 1, len(batches), exception
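
Note: the change above is ordinary `time.perf_counter()` elapsed-time measurement, with the start timestamp captured before the try block so that both the success path and the failure path report a duration for the batch. A minimal standalone sketch of that pattern follows; `run_batch` and `BatchResult` below are illustrative stand-ins, not dbt APIs.

import time
from dataclasses import dataclass


@dataclass
class BatchResult:
    # Illustrative stand-in for a per-batch run result; not a dbt class.
    succeeded: bool
    execution_time: float  # elapsed wall-clock seconds
    message: str


def run_batch(work) -> BatchResult:
    # Start the clock before entering the try block so both the success
    # and the failure branch can report how long the batch took.
    start_time = time.perf_counter()
    try:
        work()
        return BatchResult(True, time.perf_counter() - start_time, "SUCCESS")
    except Exception as e:
        return BatchResult(False, time.perf_counter() - start_time, f"ERROR: {e}")


if __name__ == "__main__":
    print(run_batch(lambda: time.sleep(0.1)))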
