From 5db0b81da1cf9d2b06aa58669dfe6f0ba5719b43 Mon Sep 17 00:00:00 2001 From: Quigley Malcolm Date: Tue, 8 Oct 2024 14:32:58 -0500 Subject: [PATCH] Track batch execution time for microbatch models (#10828) * Begin testing that microbatch execution times are being tracked and set * Begin tracking the execution time of batches for microbatch models * Add changie doc * Additional assertions in microbatch testing --- .../unreleased/Fixes-20241004-163908.yaml | 6 ++++ core/dbt/task/run.py | 34 ++++++++++++++----- core/dbt/tests/util.py | 7 ++-- .../functional/microbatch/test_microbatch.py | 16 ++++++++- 4 files changed, 50 insertions(+), 13 deletions(-) create mode 100644 .changes/unreleased/Fixes-20241004-163908.yaml diff --git a/.changes/unreleased/Fixes-20241004-163908.yaml b/.changes/unreleased/Fixes-20241004-163908.yaml new file mode 100644 index 00000000000..12aec93d0ed --- /dev/null +++ b/.changes/unreleased/Fixes-20241004-163908.yaml @@ -0,0 +1,6 @@ +kind: Fixes +body: Begin tracking execution time of microbatch model batches +time: 2024-10-04T16:39:08.464064-05:00 +custom: + Author: QMalcolm + Issue: "10825" diff --git a/core/dbt/task/run.py b/core/dbt/task/run.py index 93a8e261b50..ade89c6ab05 100644 --- a/core/dbt/task/run.py +++ b/core/dbt/task/run.py @@ -1,6 +1,7 @@ import functools import os import threading +import time from copy import deepcopy from datetime import datetime from typing import AbstractSet, Any, Dict, Iterable, List, Optional, Set, Tuple, Type @@ -284,7 +285,7 @@ def after_execute(self, result) -> None: track_model_run(self.node_index, self.num_nodes, result) self.print_result_line(result) - def _build_run_model_result(self, model, context): + def _build_run_model_result(self, model, context, elapsed_time: float = 0.0): result = context["load_result"]("main") if not result: raise DbtRuntimeError("main is not being called during running model") @@ -296,7 +297,7 @@ def _build_run_model_result(self, model, context): status=RunStatus.Success, timing=[], thread_id=threading.current_thread().name, - execution_time=0, + execution_time=elapsed_time, message=str(result.response), adapter_response=adapter_response, failures=result.get("failures"), @@ -340,7 +341,8 @@ def _build_run_microbatch_model_result( status=status, timing=[], thread_id=threading.current_thread().name, - # TODO -- why isn't this getting propagated to logs? + # The execution_time here doesn't get propagated to logs because + # `safe_run_hooks` handles the elapsed time at the node level execution_time=0, message=msg, adapter_response={}, @@ -349,19 +351,28 @@ def _build_run_microbatch_model_result( ) def _build_succesful_run_batch_result( - self, model: ModelNode, context: Dict[str, Any], batch: BatchType + self, + model: ModelNode, + context: Dict[str, Any], + batch: BatchType, + elapsed_time: float = 0.0, ) -> RunResult: - run_result = self._build_run_model_result(model, context) + run_result = self._build_run_model_result(model, context, elapsed_time) run_result.batch_results = BatchResults(successful=[batch]) return run_result - def _build_failed_run_batch_result(self, model: ModelNode, batch: BatchType) -> RunResult: + def _build_failed_run_batch_result( + self, + model: ModelNode, + batch: BatchType, + elapsed_time: float = 0.0, + ) -> RunResult: return RunResult( node=model, status=RunStatus.Error, timing=[], thread_id=threading.current_thread().name, - execution_time=0, + execution_time=elapsed_time, message="ERROR", adapter_response={}, failures=1, @@ -504,6 +515,7 @@ def _execute_microbatch_materialization( self.print_batch_start_line(batch[0], batch_idx + 1, len(batches)) exception = None + start_time = time.perf_counter() try: # Set start/end in context prior to re-compiling model.config["__dbt_internal_microbatch_event_time_start"] = batch[0] @@ -530,13 +542,17 @@ def _execute_microbatch_materialization( self.adapter.cache_added(relation.incorporate(dbt_created=True)) # Build result of executed batch - batch_run_result = self._build_succesful_run_batch_result(model, context, batch) + batch_run_result = self._build_succesful_run_batch_result( + model, context, batch, time.perf_counter() - start_time + ) # Update context vars for future batches context["is_incremental"] = lambda: True context["should_full_refresh"] = lambda: False except Exception as e: exception = e - batch_run_result = self._build_failed_run_batch_result(model, batch) + batch_run_result = self._build_failed_run_batch_result( + model, batch, time.perf_counter() - start_time + ) self.print_batch_result_line( batch_run_result, batch[0], batch_idx + 1, len(batches), exception diff --git a/core/dbt/tests/util.py b/core/dbt/tests/util.py index 19eeecd470c..d745560d2ed 100644 --- a/core/dbt/tests/util.py +++ b/core/dbt/tests/util.py @@ -5,7 +5,7 @@ from contextvars import ContextVar, copy_context from datetime import datetime from io import StringIO -from typing import Any, Dict, List, Optional +from typing import Any, Callable, Dict, List, Optional from unittest import mock import pytz @@ -17,7 +17,7 @@ from dbt.contracts.graph.manifest import Manifest from dbt.materializations.incremental.microbatch import MicrobatchBuilder from dbt_common.context import _INVOCATION_CONTEXT_VAR, InvocationContext -from dbt_common.events.base_types import EventLevel +from dbt_common.events.base_types import EventLevel, EventMsg from dbt_common.events.functions import ( capture_stdout_logs, fire_event, @@ -76,6 +76,7 @@ def run_dbt( args: Optional[List[str]] = None, expect_pass: bool = True, + callbacks: Optional[List[Callable[[EventMsg], None]]] = None, ): # reset global vars reset_metadata_vars() @@ -93,7 +94,7 @@ def run_dbt( args.extend(["--project-dir", project_dir]) if profiles_dir and "--profiles-dir" not in args: args.extend(["--profiles-dir", profiles_dir]) - dbt = dbtRunner() + dbt = dbtRunner(callbacks=callbacks) res = dbt.invoke(args) diff --git a/tests/functional/microbatch/test_microbatch.py b/tests/functional/microbatch/test_microbatch.py index c867acfac66..f6b49e1405a 100644 --- a/tests/functional/microbatch/test_microbatch.py +++ b/tests/functional/microbatch/test_microbatch.py @@ -3,6 +3,7 @@ import pytest +from dbt.events.types import LogModelResult from dbt.tests.util import ( get_artifact, patch_microbatch_end_time, @@ -12,6 +13,7 @@ run_dbt_and_capture, write_file, ) +from tests.utils import EventCatcher input_model_sql = """ {{ config(materialized='table', event_time='event_time') }} @@ -186,10 +188,22 @@ class TestMicrobatchCLI(BaseMicrobatchTest): @mock.patch.dict(os.environ, {"DBT_EXPERIMENTAL_MICROBATCH": "True"}) def test_run_with_event_time(self, project): # run without --event-time-start or --event-time-end - 3 expected rows in output + catcher = EventCatcher(event_to_catch=LogModelResult) + with patch_microbatch_end_time("2020-01-03 13:57:00"): - run_dbt(["run"]) + run_dbt(["run"], callbacks=[catcher.catch]) self.assert_row_count(project, "microbatch_model", 3) + assert len(catcher.caught_events) == 5 + batch_creation_events = 0 + for caught_event in catcher.caught_events: + if "batch 2020" in caught_event.data.description: + batch_creation_events += 1 + assert caught_event.data.execution_time > 0 + # 3 batches should have been run, so there should be 3 batch + # creation events + assert batch_creation_events == 3 + # build model >= 2020-01-02 with patch_microbatch_end_time("2020-01-03 13:57:00"): run_dbt(["run", "--event-time-start", "2020-01-02", "--full-refresh"])