Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Track batch execution time for microbatch models #10828

Merged
merged 4 commits into from
Oct 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changes/unreleased/Fixes-20241004-163908.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Fixes
body: Begin tracking execution time of microbatch model batches
time: 2024-10-04T16:39:08.464064-05:00
custom:
Author: QMalcolm
Issue: "10825"
34 changes: 25 additions & 9 deletions core/dbt/task/run.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import functools
import os
import threading
import time
from copy import deepcopy
from datetime import datetime
from typing import AbstractSet, Any, Dict, Iterable, List, Optional, Set, Tuple, Type
Expand Down Expand Up @@ -284,7 +285,7 @@
track_model_run(self.node_index, self.num_nodes, result)
self.print_result_line(result)

def _build_run_model_result(self, model, context):
def _build_run_model_result(self, model, context, elapsed_time: float = 0.0):
result = context["load_result"]("main")
if not result:
raise DbtRuntimeError("main is not being called during running model")
Expand All @@ -296,7 +297,7 @@
status=RunStatus.Success,
timing=[],
thread_id=threading.current_thread().name,
execution_time=0,
execution_time=elapsed_time,
message=str(result.response),
adapter_response=adapter_response,
failures=result.get("failures"),
Expand Down Expand Up @@ -340,7 +341,8 @@
status=status,
timing=[],
thread_id=threading.current_thread().name,
# TODO -- why isn't this getting propagated to logs?
# The execution_time here doesn't get propagated to logs because
# `safe_run_hooks` handles the elapsed time at the node level
execution_time=0,
message=msg,
adapter_response={},
Expand All @@ -349,19 +351,28 @@
)

def _build_succesful_run_batch_result(
self, model: ModelNode, context: Dict[str, Any], batch: BatchType
self,
model: ModelNode,
context: Dict[str, Any],
batch: BatchType,
elapsed_time: float = 0.0,
) -> RunResult:
run_result = self._build_run_model_result(model, context)
run_result = self._build_run_model_result(model, context, elapsed_time)

Check warning on line 360 in core/dbt/task/run.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/task/run.py#L360

Added line #L360 was not covered by tests
run_result.batch_results = BatchResults(successful=[batch])
return run_result

def _build_failed_run_batch_result(self, model: ModelNode, batch: BatchType) -> RunResult:
def _build_failed_run_batch_result(
self,
model: ModelNode,
batch: BatchType,
elapsed_time: float = 0.0,
) -> RunResult:
return RunResult(
node=model,
status=RunStatus.Error,
timing=[],
thread_id=threading.current_thread().name,
execution_time=0,
execution_time=elapsed_time,
message="ERROR",
adapter_response={},
failures=1,
Expand Down Expand Up @@ -504,6 +515,7 @@
self.print_batch_start_line(batch[0], batch_idx + 1, len(batches))

exception = None
start_time = time.perf_counter()

Check warning on line 518 in core/dbt/task/run.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/task/run.py#L518

Added line #L518 was not covered by tests
try:
# Set start/end in context prior to re-compiling
model.config["__dbt_internal_microbatch_event_time_start"] = batch[0]
Expand All @@ -530,13 +542,17 @@
self.adapter.cache_added(relation.incorporate(dbt_created=True))

# Build result of executed batch
batch_run_result = self._build_succesful_run_batch_result(model, context, batch)
batch_run_result = self._build_succesful_run_batch_result(

Check warning on line 545 in core/dbt/task/run.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/task/run.py#L545

Added line #L545 was not covered by tests
model, context, batch, time.perf_counter() - start_time
)
# Update context vars for future batches
context["is_incremental"] = lambda: True
context["should_full_refresh"] = lambda: False
except Exception as e:
exception = e
batch_run_result = self._build_failed_run_batch_result(model, batch)
batch_run_result = self._build_failed_run_batch_result(

Check warning on line 553 in core/dbt/task/run.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/task/run.py#L553

Added line #L553 was not covered by tests
model, batch, time.perf_counter() - start_time
)

self.print_batch_result_line(
batch_run_result, batch[0], batch_idx + 1, len(batches), exception
Expand Down
7 changes: 4 additions & 3 deletions core/dbt/tests/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from contextvars import ContextVar, copy_context
from datetime import datetime
from io import StringIO
from typing import Any, Dict, List, Optional
from typing import Any, Callable, Dict, List, Optional
from unittest import mock

import pytz
Expand All @@ -17,7 +17,7 @@
from dbt.contracts.graph.manifest import Manifest
from dbt.materializations.incremental.microbatch import MicrobatchBuilder
from dbt_common.context import _INVOCATION_CONTEXT_VAR, InvocationContext
from dbt_common.events.base_types import EventLevel
from dbt_common.events.base_types import EventLevel, EventMsg
from dbt_common.events.functions import (
capture_stdout_logs,
fire_event,
Expand Down Expand Up @@ -76,6 +76,7 @@
def run_dbt(
args: Optional[List[str]] = None,
expect_pass: bool = True,
callbacks: Optional[List[Callable[[EventMsg], None]]] = None,
):
# reset global vars
reset_metadata_vars()
Expand All @@ -93,7 +94,7 @@ def run_dbt(
args.extend(["--project-dir", project_dir])
if profiles_dir and "--profiles-dir" not in args:
args.extend(["--profiles-dir", profiles_dir])
dbt = dbtRunner()
dbt = dbtRunner(callbacks=callbacks)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like this pattern is used in many places, let's refactor them all at once?

For now, this should be fine?

@pytest.fixture(scope="function")
def runner(catcher: EventCatcher) -> dbtRunner:
    return dbtRunner(callbacks=[catcher.catch])

Copy link
Contributor Author

@QMalcolm QMalcolm Oct 8, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a good idea. However, we wouldn't be able to use that in this util. I think that fixture would likely make the most sense in tests/, not core/dbt/tests/. This utility function (run_dbt) wouldn't have access to it. That doesn't mean we shouldn't do that work, but it's out of scope for this PR and should be its own segment of work.


res = dbt.invoke(args)

Expand Down
16 changes: 15 additions & 1 deletion tests/functional/microbatch/test_microbatch.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

import pytest

from dbt.events.types import LogModelResult
from dbt.tests.util import (
get_artifact,
patch_microbatch_end_time,
Expand All @@ -12,6 +13,7 @@
run_dbt_and_capture,
write_file,
)
from tests.utils import EventCatcher

input_model_sql = """
{{ config(materialized='table', event_time='event_time') }}
Expand Down Expand Up @@ -186,10 +188,22 @@ class TestMicrobatchCLI(BaseMicrobatchTest):
@mock.patch.dict(os.environ, {"DBT_EXPERIMENTAL_MICROBATCH": "True"})
def test_run_with_event_time(self, project):
# run without --event-time-start or --event-time-end - 3 expected rows in output
catcher = EventCatcher(event_to_catch=LogModelResult)

with patch_microbatch_end_time("2020-01-03 13:57:00"):
run_dbt(["run"])
run_dbt(["run"], callbacks=[catcher.catch])
self.assert_row_count(project, "microbatch_model", 3)

assert len(catcher.caught_events) == 5
batch_creation_events = 0
for caught_event in catcher.caught_events:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe assert the number of events here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can do!

if "batch 2020" in caught_event.data.description:
batch_creation_events += 1
assert caught_event.data.execution_time > 0
# 3 batches should have been run, so there should be 3 batch
# creation events
assert batch_creation_events == 3

# build model >= 2020-01-02
with patch_microbatch_end_time("2020-01-03 13:57:00"):
run_dbt(["run", "--event-time-start", "2020-01-02", "--full-refresh"])
Expand Down
Loading