From d68c6f5105dd62daeef6f45ee99942d9e944b8e6 Mon Sep 17 00:00:00 2001 From: Quigley Malcolm Date: Fri, 4 Oct 2024 16:38:20 -0500 Subject: [PATCH] Begin tracking the execution time of batches for microbatch models --- core/dbt/task/run.py | 34 +++++++++++++++++++++++++--------- 1 file changed, 25 insertions(+), 9 deletions(-) diff --git a/core/dbt/task/run.py b/core/dbt/task/run.py index 93a8e261b50..ade89c6ab05 100644 --- a/core/dbt/task/run.py +++ b/core/dbt/task/run.py @@ -1,6 +1,7 @@ import functools import os import threading +import time from copy import deepcopy from datetime import datetime from typing import AbstractSet, Any, Dict, Iterable, List, Optional, Set, Tuple, Type @@ -284,7 +285,7 @@ def after_execute(self, result) -> None: track_model_run(self.node_index, self.num_nodes, result) self.print_result_line(result) - def _build_run_model_result(self, model, context): + def _build_run_model_result(self, model, context, elapsed_time: float = 0.0): result = context["load_result"]("main") if not result: raise DbtRuntimeError("main is not being called during running model") @@ -296,7 +297,7 @@ def _build_run_model_result(self, model, context): status=RunStatus.Success, timing=[], thread_id=threading.current_thread().name, - execution_time=0, + execution_time=elapsed_time, message=str(result.response), adapter_response=adapter_response, failures=result.get("failures"), @@ -340,7 +341,8 @@ def _build_run_microbatch_model_result( status=status, timing=[], thread_id=threading.current_thread().name, - # TODO -- why isn't this getting propagated to logs? + # The execution_time here doesn't get propagated to logs because + # `safe_run_hooks` handles the elapsed time at the node level execution_time=0, message=msg, adapter_response={}, @@ -349,19 +351,28 @@ def _build_run_microbatch_model_result( ) def _build_succesful_run_batch_result( - self, model: ModelNode, context: Dict[str, Any], batch: BatchType + self, + model: ModelNode, + context: Dict[str, Any], + batch: BatchType, + elapsed_time: float = 0.0, ) -> RunResult: - run_result = self._build_run_model_result(model, context) + run_result = self._build_run_model_result(model, context, elapsed_time) run_result.batch_results = BatchResults(successful=[batch]) return run_result - def _build_failed_run_batch_result(self, model: ModelNode, batch: BatchType) -> RunResult: + def _build_failed_run_batch_result( + self, + model: ModelNode, + batch: BatchType, + elapsed_time: float = 0.0, + ) -> RunResult: return RunResult( node=model, status=RunStatus.Error, timing=[], thread_id=threading.current_thread().name, - execution_time=0, + execution_time=elapsed_time, message="ERROR", adapter_response={}, failures=1, @@ -504,6 +515,7 @@ def _execute_microbatch_materialization( self.print_batch_start_line(batch[0], batch_idx + 1, len(batches)) exception = None + start_time = time.perf_counter() try: # Set start/end in context prior to re-compiling model.config["__dbt_internal_microbatch_event_time_start"] = batch[0] @@ -530,13 +542,17 @@ def _execute_microbatch_materialization( self.adapter.cache_added(relation.incorporate(dbt_created=True)) # Build result of executed batch - batch_run_result = self._build_succesful_run_batch_result(model, context, batch) + batch_run_result = self._build_succesful_run_batch_result( + model, context, batch, time.perf_counter() - start_time + ) # Update context vars for future batches context["is_incremental"] = lambda: True context["should_full_refresh"] = lambda: False except Exception as e: exception = e - batch_run_result = self._build_failed_run_batch_result(model, batch) + batch_run_result = self._build_failed_run_batch_result( + model, batch, time.perf_counter() - start_time + ) self.print_batch_result_line( batch_run_result, batch[0], batch_idx + 1, len(batches), exception