Skip to content

Commit

Permalink
[Tidy-First]: Fix timings object for hooks and macros, and make types of timings explicit
Browse files Browse the repository at this point in the history
  • Loading branch information
aranke committed Oct 18, 2024
1 parent 8be0635 commit e85826d
Show file tree
Hide file tree
Showing 5 changed files with 75 additions and 29 deletions.
14 changes: 11 additions & 3 deletions core/dbt/artifacts/schemas/results.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from dataclasses import dataclass
from datetime import datetime
from typing import Any, Callable, Dict, List, Optional, Sequence, Union
from typing import Any, Callable, Dict, List, Literal, Optional, Sequence, Union

from dbt.contracts.graph.nodes import ResultNode
from dbt_common.dataclass_schema import StrEnum, dbtClassMixin
Expand All @@ -10,7 +10,13 @@

@dataclass
class TimingInfo(dbtClassMixin):
name: str
"""
Represents a step in the execution of a node.
`name` should be one of: compile, execute, or other
Do not call directly, use `collect_timing_info` instead.
"""

name: Literal["compile", "execute", "other"]
started_at: Optional[datetime] = None
completed_at: Optional[datetime] = None

Expand All @@ -31,7 +37,9 @@ def to_msg_dict(self):

# This is a context manager
class collect_timing_info:
def __init__(self, name: str, callback: Callable[[TimingInfo], None]) -> None:
def __init__(
self, name: Literal["compile", "execute", "other"], callback: Callable[[TimingInfo], None]
) -> None:
self.timing_info = TimingInfo(name=name)
self.callback = callback

Expand Down
19 changes: 13 additions & 6 deletions core/dbt/task/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
RunningStatus,
RunStatus,
TimingInfo,
collect_timing_info,
)
from dbt.artifacts.schemas.run import RunResult
from dbt.cli.flags import Flags
Expand Down Expand Up @@ -633,7 +634,6 @@ def get_hooks_by_type(self, hook_type: RunHookType) -> List[HookNode]:
def safe_run_hooks(
self, adapter: BaseAdapter, hook_type: RunHookType, extra_context: Dict[str, Any]
) -> RunStatus:
started_at = datetime.utcnow()
ordered_hooks = self.get_hooks_by_type(hook_type)

if hook_type == RunHookType.End and ordered_hooks:
Expand All @@ -653,14 +653,20 @@ def safe_run_hooks(
hook.index = idx
hook_name = f"{hook.package_name}.{hook_type}.{hook.index - 1}"
execution_time = 0.0
timing = []
timing: List[TimingInfo] = []
failures = 1

if not failed:
with collect_timing_info("compile", timing.append):
sql = self.get_hook_sql(
adapter, hook, hook.index, num_hooks, extra_context
)

started_at = timing[0].started_at or datetime.utcnow()
hook.update_event_status(
started_at=started_at.isoformat(), node_status=RunningStatus.Started
)
sql = self.get_hook_sql(adapter, hook, hook.index, num_hooks, extra_context)

fire_event(
LogHookStartLine(
statement=hook_name,
Expand All @@ -670,11 +676,12 @@ def safe_run_hooks(
)
)

status, message = get_execution_status(sql, adapter)
finished_at = datetime.utcnow()
with collect_timing_info("execute", timing.append):
status, message = get_execution_status(sql, adapter)

finished_at = timing[1].completed_at or datetime.utcnow()
hook.update_event_status(finished_at=finished_at.isoformat())
execution_time = (finished_at - started_at).total_seconds()
timing = [TimingInfo(hook_name, started_at, finished_at)]
failures = 0 if status == RunStatus.Success else 1

if status == RunStatus.Success:
Expand Down
46 changes: 26 additions & 20 deletions core/dbt/task/run_operation.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@
import threading
import traceback
from datetime import datetime
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, List

import dbt_common.exceptions
from dbt.adapters.factory import get_adapter
from dbt.artifacts.schemas.results import RunStatus, TimingInfo
from dbt.artifacts.schemas.results import RunStatus, TimingInfo, collect_timing_info
from dbt.artifacts.schemas.run import RunResult, RunResultsArtifact
from dbt.contracts.files import FileHash
from dbt.contracts.graph.nodes import HookNode
Expand Down Expand Up @@ -51,25 +51,29 @@ def _run_unsafe(self, package_name, macro_name) -> "agate.Table":
return res

def run(self) -> RunResultsArtifact:
start = datetime.utcnow()
self.compile_manifest()
timing: List[TimingInfo] = []

success = True
with collect_timing_info("compile", timing.append):
self.compile_manifest()

start = timing[0].started_at

success = True
package_name, macro_name = self._get_macro_parts()

try:
self._run_unsafe(package_name, macro_name)
except dbt_common.exceptions.DbtBaseException as exc:
fire_event(RunningOperationCaughtError(exc=str(exc)))
fire_event(LogDebugStackTrace(exc_info=traceback.format_exc()))
success = False
except Exception as exc:
fire_event(RunningOperationUncaughtError(exc=str(exc)))
fire_event(LogDebugStackTrace(exc_info=traceback.format_exc()))
success = False
with collect_timing_info("execute", timing.append):
try:
self._run_unsafe(package_name, macro_name)
except dbt_common.exceptions.DbtBaseException as exc:
fire_event(RunningOperationCaughtError(exc=str(exc)))
fire_event(LogDebugStackTrace(exc_info=traceback.format_exc()))
success = False
except Exception as exc:
fire_event(RunningOperationUncaughtError(exc=str(exc)))
fire_event(LogDebugStackTrace(exc_info=traceback.format_exc()))
success = False

Check warning on line 74 in core/dbt/task/run_operation.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/task/run_operation.py#L67-L74

Added lines #L67 - L74 were not covered by tests

end = datetime.utcnow()
end = timing[1].completed_at

macro = (
self.manifest.find_macro_by_name(macro_name, self.config.project_name, package_name)
Expand All @@ -85,10 +89,12 @@ def run(self) -> RunResultsArtifact:
f"dbt could not find a macro with the name '{macro_name}' in any package"
)

execution_time = (end - start).total_seconds() if start and end else 0.0

run_result = RunResult(
adapter_response={},
status=RunStatus.Success if success else RunStatus.Error,
execution_time=(end - start).total_seconds(),
execution_time=execution_time,
failures=0 if success else 1,
message=None,
node=HookNode(
Expand All @@ -105,13 +111,13 @@ def run(self) -> RunResultsArtifact:
original_file_path="",
),
thread_id=threading.current_thread().name,
timing=[TimingInfo(name=macro_name, started_at=start, completed_at=end)],
timing=timing,
batch_results=None,
)

results = RunResultsArtifact.from_execution_results(
generated_at=end,
elapsed_time=(end - start).total_seconds(),
generated_at=end or datetime.utcnow(),
elapsed_time=execution_time,
args={
k: v
for k, v in self.args.__dict__.items()
Expand Down
8 changes: 8 additions & 0 deletions tests/functional/adapter/hooks/test_on_run_hooks.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,14 @@ def test_results(self, project, log_counts, my_model_run_status):
for result in results
if isinstance(result.node, HookNode)
] == [(id, str(status)) for id, status in expected_results if id.startswith("operation")]

for result in results:
if result.status == RunStatus.Skipped:
continue

timing_keys = [timing.name for timing in result.timing]
assert timing_keys == ["compile", "execute"]

assert log_counts in log_output
assert "4 project hooks, 1 view model" in log_output

Expand Down
17 changes: 17 additions & 0 deletions tests/functional/run_operations/test_run_operations.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import pytest
import yaml

from dbt.artifacts.schemas.results import RunStatus
from dbt.tests.util import (
check_table_does_exist,
mkdir,
Expand Down Expand Up @@ -135,9 +136,25 @@ def test_run_operation_local_macro(self, project):
run_dbt(["deps"])

results, log_output = run_dbt_and_capture(["run-operation", "something_cool"])

for result in results:
if result.status == RunStatus.Skipped:
continue

timing_keys = [timing.name for timing in result.timing]
assert timing_keys == ["compile", "execute"]

assert "something cool" in log_output

results, log_output = run_dbt_and_capture(["run-operation", "pkg.something_cool"])

for result in results:
if result.status == RunStatus.Skipped:
continue

timing_keys = [timing.name for timing in result.timing]
assert timing_keys == ["compile", "execute"]

assert "something cool" in log_output

rm_dir("pkg")
Expand Down

0 comments on commit e85826d

Please sign in to comment.