From c54e228ce847cd4926df52608301516035bcd5db Mon Sep 17 00:00:00 2001 From: Kshitij Aranke Date: Wed, 11 Sep 2024 19:56:32 +0100 Subject: [PATCH 01/24] wip: refactor safe_run_hooks and add more metadata in --- core/dbt/contracts/graph/nodes.py | 1 + core/dbt/events/types.py | 11 ++- core/dbt/task/run.py | 129 ++++++++++++++++-------------- 3 files changed, 77 insertions(+), 64 deletions(-) diff --git a/core/dbt/contracts/graph/nodes.py b/core/dbt/contracts/graph/nodes.py index b28910c0de3..e275c8761f5 100644 --- a/core/dbt/contracts/graph/nodes.py +++ b/core/dbt/contracts/graph/nodes.py @@ -1654,6 +1654,7 @@ class ParsedMacroPatch(ParsedPatch): ResultNode = Union[ ManifestNode, SourceDefinition, + HookNode, ] # All nodes that can be in the DAG diff --git a/core/dbt/events/types.py b/core/dbt/events/types.py index dfd95eec6bf..6b33cb01bfd 100644 --- a/core/dbt/events/types.py +++ b/core/dbt/events/types.py @@ -1513,10 +1513,17 @@ def code(self) -> str: return "Q033" def message(self) -> str: - msg = f"OK hook: {self.statement}" + if self.status == "success": + info = "OK" + status = green(info) + else: + info = "ERROR" + status = red(info) + msg = f"{info} hook: {self.statement}" + return format_fancy_output_line( msg=msg, - status=green(self.status), + status=status, index=self.index, total=self.total, execution_time=self.execution_time, diff --git a/core/dbt/task/run.py b/core/dbt/task/run.py index 73a2b26acce..f2625a628a7 100644 --- a/core/dbt/task/run.py +++ b/core/dbt/task/run.py @@ -1,11 +1,10 @@ import functools import threading -import time from datetime import datetime from typing import AbstractSet, Any, Dict, Iterable, List, Optional, Set, Tuple, Type from dbt import tracking, utils -from dbt.adapters.base import BaseRelation +from dbt.adapters.base import BaseAdapter, BaseRelation from dbt.adapters.events.types import ( DatabaseErrorRunningHook, FinishedRunningStats, @@ -14,10 +13,10 @@ from dbt.adapters.exceptions import MissingMaterializationError from dbt.artifacts.resources import Hook from dbt.artifacts.schemas.results import ( - BaseResult, NodeStatus, RunningStatus, RunStatus, + TimingInfo, ) from dbt.artifacts.schemas.run import RunResult from dbt.cli.flags import Flags @@ -48,25 +47,6 @@ from .printer import get_counts, print_run_end_messages -class Timer: - def __init__(self) -> None: - self.start = None - self.end = None - - @property - def elapsed(self): - if self.start is None or self.end is None: - return None - return self.end - self.start - - def __enter__(self): - self.start = time.time() - return self - - def __exit__(self, exc_type, exc_value, exc_tracebck): - self.end = time.time() - - @functools.total_ordering class BiggestName(str): def __lt__(self, other): @@ -101,6 +81,22 @@ def get_hook(source, index): return Hook.from_dict(hook_dict) +def get_execution_status(sql: str, adapter: BaseAdapter) -> Tuple[RunStatus, str]: + if not sql.strip(): + return RunStatus.Success, "OK" + + message = "" + status = RunStatus.Success + try: + response, _ = adapter.execute(sql, auto_begin=False, fetch=False) + message = response._message + except DbtRuntimeError as exc: + status = RunStatus.Error + message = exc.msg + finally: + return status, message + + def track_model_run(index, num_nodes, run_model_result): if tracking.active_user is None: raise DbtInternalError("cannot track model run with no active user") @@ -336,7 +332,10 @@ def get_hooks_by_type(self, hook_type: RunHookType) -> List[HookNode]: hooks.sort(key=self._hook_keyfunc) return hooks - def run_hooks(self, adapter, hook_type: RunHookType, extra_context) -> None: + def safe_run_hooks( + self, adapter: BaseAdapter, hook_type: RunHookType, extra_context: Dict[str, Any] + ) -> None: + started_at = datetime.utcnow() ordered_hooks = self.get_hooks_by_type(hook_type) # on-run-* hooks should run outside of a transaction. This happens @@ -350,71 +349,77 @@ def run_hooks(self, adapter, hook_type: RunHookType, extra_context) -> None: fire_event(Formatting("")) fire_event(HooksRunning(num_hooks=num_hooks, hook_type=hook_type)) - for idx, hook in enumerate(ordered_hooks, start=1): - # We want to include node_info in the appropriate log files, so use - # log_contextvars + message = None + idx = 0 + timings = [] + total_execution_time = 0.0 + + for idx in range(len(ordered_hooks)): + hook = ordered_hooks[idx] with log_contextvars(node_info=hook.node_info): hook.update_event_status( - started_at=datetime.utcnow().isoformat(), node_status=RunningStatus.Started + started_at=started_at.isoformat(), node_status=RunningStatus.Started ) - sql = self.get_hook_sql(adapter, hook, idx, num_hooks, extra_context) + sql = self.get_hook_sql(adapter, hook, idx + 1, num_hooks, extra_context) - hook_text = "{}.{}.{}".format(hook.package_name, hook_type, hook.index) + hook_name = f"{hook.package_name}.{hook_type}.{idx + 1}" fire_event( LogHookStartLine( - statement=hook_text, - index=idx, + statement=hook_name, + index=idx + 1, total=num_hooks, node_info=hook.node_info, ) ) - with Timer() as timer: - if len(sql.strip()) > 0: - response, _ = adapter.execute(sql, auto_begin=False, fetch=False) - status = response._message - else: - status = "OK" + status, message = get_execution_status(sql, adapter) + finished_at = datetime.utcnow() + timings.append(TimingInfo(hook_name, started_at, finished_at)) + hook.update_event_status(finished_at=finished_at.isoformat(), node_status=status) self.ran_hooks.append(hook) - hook.update_event_status(finished_at=datetime.utcnow().isoformat()) - hook.update_event_status(node_status=RunStatus.Success) + + execution_time = (finished_at - started_at).total_seconds() + total_execution_time += execution_time + fire_event( LogHookEndLine( - statement=hook_text, + statement=hook_name, status=status, - index=idx, + index=idx + 1, total=num_hooks, - execution_time=timer.elapsed, + execution_time=execution_time, node_info=hook.node_info, ) ) - # `_event_status` dict is only used for logging. Make sure - # it gets deleted when we're done with it hook.clear_event_status() - self._total_executed += len(ordered_hooks) + if status != RunStatus.Success: + break - fire_event(Formatting("")) + self._total_executed += idx + 1 - def safe_run_hooks( - self, adapter, hook_type: RunHookType, extra_context: Dict[str, Any] - ) -> None: - try: - self.run_hooks(adapter, hook_type, extra_context) - except DbtRuntimeError as exc: + if status == RunStatus.Success: + node_result_message = f"{idx + 1} {hook_type.value} hooks executed successfully" + num_failures = 0 + else: + fire_event(Formatting("")) fire_event(DatabaseErrorRunningHook(hook_type=hook_type.value)) - self.node_results.append( - BaseResult( - status=RunStatus.Error, - thread_id="main", - timing=[], - message=f"{hook_type.value} failed, error:\n {exc.msg}", - adapter_response={}, - execution_time=0, - failures=1, - ) + node_result_message = f"{hook_type.value} failed, error:\n {message}" + num_failures = 1 + + self.node_results.append( + RunResult( + status=status, + thread_id="main", + timing=timings, + message=node_result_message, + adapter_response={}, + execution_time=total_execution_time, + failures=num_failures, + node=hook, ) + ) def print_results_line(self, results, execution_time) -> None: nodes = [r.node for r in results if hasattr(r, "node")] + self.ran_hooks From 7ad6c39a901c1001a2c890a61788fefb7994362f Mon Sep 17 00:00:00 2001 From: Kshitij Aranke Date: Wed, 11 Sep 2024 19:59:02 +0100 Subject: [PATCH 02/24] Add back comment about log_contextvars --- core/dbt/task/run.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/core/dbt/task/run.py b/core/dbt/task/run.py index f2625a628a7..ee82b2dc998 100644 --- a/core/dbt/task/run.py +++ b/core/dbt/task/run.py @@ -356,6 +356,8 @@ def safe_run_hooks( for idx in range(len(ordered_hooks)): hook = ordered_hooks[idx] + # We want to include node_info in the appropriate log files, so use + # log_contextvars with log_contextvars(node_info=hook.node_info): hook.update_event_status( started_at=started_at.isoformat(), node_status=RunningStatus.Started From 7fcfb0caf745d72bd5fc6ef7db219d3bbe99b5f9 Mon Sep 17 00:00:00 2001 From: Kshitij Aranke Date: Wed, 11 Sep 2024 20:27:35 +0100 Subject: [PATCH 03/24] make printer correct --- core/dbt/task/printer.py | 19 +++++++++++++++---- core/dbt/task/run.py | 21 +++++++++++---------- 2 files changed, 26 insertions(+), 14 deletions(-) diff --git a/core/dbt/task/printer.py b/core/dbt/task/printer.py index a5995d50b40..2fcbddfef3a 100644 --- a/core/dbt/task/printer.py +++ b/core/dbt/task/printer.py @@ -1,7 +1,7 @@ from typing import Dict, Optional from dbt.artifacts.schemas.results import NodeStatus -from dbt.contracts.graph.nodes import Group +from dbt.contracts.graph.nodes import Group, HookNode from dbt.events.types import ( CheckNodeTestFailure, EndOfRunSummary, @@ -61,9 +61,20 @@ def print_run_status_line(results) -> None: } for r in results: - result_type = interpret_run_result(r) - stats[result_type] += 1 - stats["total"] += 1 + # All on-run-* nodes are returned as a single result, so we need to unpack it to get the correct counts + if isinstance(r.node, HookNode): + n = r.node.index or 0 + stats["total"] += n + # Hooks "fail fast", so there will be at most one error + if r.status == NodeStatus.Error: + stats["error"] += 1 + stats["pass"] += n - 1 + else: + stats["pass"] += n + else: + result_type = interpret_run_result(r) + stats[result_type] += 1 + stats["total"] += 1 fire_event(Formatting("")) fire_event(StatsLine(stats=stats)) diff --git a/core/dbt/task/run.py b/core/dbt/task/run.py index ee82b2dc998..12026d3ae30 100644 --- a/core/dbt/task/run.py +++ b/core/dbt/task/run.py @@ -85,10 +85,9 @@ def get_execution_status(sql: str, adapter: BaseAdapter) -> Tuple[RunStatus, str if not sql.strip(): return RunStatus.Success, "OK" - message = "" - status = RunStatus.Success try: response, _ = adapter.execute(sql, auto_begin=False, fetch=False) + status = RunStatus.Success message = response._message except DbtRuntimeError as exc: status = RunStatus.Error @@ -349,26 +348,26 @@ def safe_run_hooks( fire_event(Formatting("")) fire_event(HooksRunning(num_hooks=num_hooks, hook_type=hook_type)) - message = None idx = 0 timings = [] total_execution_time = 0.0 - for idx in range(len(ordered_hooks)): + while idx < num_hooks: hook = ordered_hooks[idx] # We want to include node_info in the appropriate log files, so use # log_contextvars with log_contextvars(node_info=hook.node_info): + hook.index = idx + 1 hook.update_event_status( started_at=started_at.isoformat(), node_status=RunningStatus.Started ) - sql = self.get_hook_sql(adapter, hook, idx + 1, num_hooks, extra_context) + sql = self.get_hook_sql(adapter, hook, hook.index, num_hooks, extra_context) - hook_name = f"{hook.package_name}.{hook_type}.{idx + 1}" + hook_name = f"{hook.package_name}.{hook_type}.{hook.index}" fire_event( LogHookStartLine( statement=hook_name, - index=idx + 1, + index=hook.index, total=num_hooks, node_info=hook.node_info, ) @@ -388,7 +387,7 @@ def safe_run_hooks( LogHookEndLine( statement=hook_name, status=status, - index=idx + 1, + index=hook.index, total=num_hooks, execution_time=execution_time, node_info=hook.node_info, @@ -399,10 +398,12 @@ def safe_run_hooks( if status != RunStatus.Success: break - self._total_executed += idx + 1 + idx += 1 + + self._total_executed += hook.index or 0 if status == RunStatus.Success: - node_result_message = f"{idx + 1} {hook_type.value} hooks executed successfully" + node_result_message = f"{hook.index} {hook_type.value} hooks executed successfully" num_failures = 0 else: fire_event(Formatting("")) From 064f60ec08010d487de4b29dcc0f0a61de18a866 Mon Sep 17 00:00:00 2001 From: Kshitij Aranke Date: Thu, 12 Sep 2024 15:49:10 +0100 Subject: [PATCH 04/24] fix existing tests --- tests/functional/adapter/hooks/test_run_hooks.py | 6 +++++- tests/functional/artifacts/test_run_results.py | 2 +- tests/functional/dependencies/test_local_dependency.py | 2 +- tests/functional/schema_tests/test_schema_v2_tests.py | 8 ++++---- 4 files changed, 11 insertions(+), 7 deletions(-) diff --git a/tests/functional/adapter/hooks/test_run_hooks.py b/tests/functional/adapter/hooks/test_run_hooks.py index f8bec5c6aeb..030988b7c5c 100644 --- a/tests/functional/adapter/hooks/test_run_hooks.py +++ b/tests/functional/adapter/hooks/test_run_hooks.py @@ -4,6 +4,7 @@ import pytest from dbt.tests.util import check_table_does_not_exist, run_dbt +from dbt_common.exceptions import DbtDatabaseError from tests.functional.adapter.hooks.fixtures import ( macros__before_and_after, macros__hook, @@ -158,4 +159,7 @@ def project_config_update(self): } def test_missing_column_pre_hook(self, project): - run_dbt(["run"], expect_pass=False) + with pytest.raises( + DbtDatabaseError, match=r'relation "(\w+)\.test_column" does not exist' + ): + run_dbt(["run"], expect_pass=False) diff --git a/tests/functional/artifacts/test_run_results.py b/tests/functional/artifacts/test_run_results.py index dea947f342b..a474f80c4c7 100644 --- a/tests/functional/artifacts/test_run_results.py +++ b/tests/functional/artifacts/test_run_results.py @@ -55,7 +55,7 @@ def project_config_update(self): def test_results_serializable(self, project): results = run_dbt(["run"]) - assert len(results.results) == 1 + assert len(results.results) == 2 # This test is failing due to the faulty assumptions that run_results.json would diff --git a/tests/functional/dependencies/test_local_dependency.py b/tests/functional/dependencies/test_local_dependency.py index d26345f2da6..f96d22bbbed 100644 --- a/tests/functional/dependencies/test_local_dependency.py +++ b/tests/functional/dependencies/test_local_dependency.py @@ -328,7 +328,7 @@ def test_hook_dependency(self, prepare_dependencies, project): run_dbt(["deps", "--vars", cli_vars]) results = run_dbt(["run", "--vars", cli_vars]) - assert len(results) == 2 + assert len(results) == 3 check_relations_equal(project.adapter, ["actual", "expected"]) diff --git a/tests/functional/schema_tests/test_schema_v2_tests.py b/tests/functional/schema_tests/test_schema_v2_tests.py index ea33e62bce3..519ff64e853 100644 --- a/tests/functional/schema_tests/test_schema_v2_tests.py +++ b/tests/functional/schema_tests/test_schema_v2_tests.py @@ -474,9 +474,9 @@ def test_hooks_do_run_for_tests( ): # This passes now that hooks run, a behavior we changed in v1.0 results = run_dbt(["test", "--model", "ephemeral"]) - assert len(results) == 1 + assert len(results) == 3 for result in results: - assert result.status == "pass" + assert result.status in ("pass", "success") assert not result.skipped assert result.failures == 0, "test {} failed".format(result.node.name) @@ -507,9 +507,9 @@ def test_these_hooks_dont_run_for_tests( ): # This would fail if the hooks ran results = run_dbt(["test", "--model", "ephemeral"]) - assert len(results) == 1 + assert len(results) == 3 for result in results: - assert result.status == "pass" + assert result.status in ("pass", "success") assert not result.skipped assert result.failures == 0, "test {} failed".format(result.node.name) From 526b31c6f39b1217efb9a702d061435b033a9030 Mon Sep 17 00:00:00 2001 From: Kshitij Aranke Date: Thu, 12 Sep 2024 17:24:53 +0100 Subject: [PATCH 05/24] only execute nodes if pre-hooks succeeded --- core/dbt/task/clone.py | 5 +++-- core/dbt/task/freshness.py | 30 +++++++++++++++++++++----- core/dbt/task/run.py | 11 ++++++---- core/dbt/task/runnable.py | 44 ++++++++++++++++++++++++++++---------- 4 files changed, 68 insertions(+), 22 deletions(-) diff --git a/core/dbt/task/clone.py b/core/dbt/task/clone.py index 09a7942aa31..2dc90d172f8 100644 --- a/core/dbt/task/clone.py +++ b/core/dbt/task/clone.py @@ -1,7 +1,7 @@ import threading from typing import AbstractSet, Any, Collection, Iterable, List, Optional, Set, Type -from dbt.adapters.base import BaseRelation +from dbt.adapters.base import BaseAdapter, BaseRelation from dbt.artifacts.resources.types import NodeType from dbt.artifacts.schemas.run import RunResult, RunStatus from dbt.clients.jinja import MacroGenerator @@ -124,7 +124,7 @@ def get_model_schemas(self, adapter, selected_uids: Iterable[str]) -> Set[BaseRe return result - def before_run(self, adapter, selected_uids: AbstractSet[str]): + def before_run(self, adapter: BaseAdapter, selected_uids: AbstractSet[str]) -> RunStatus: with adapter.connection_named("master"): self.defer_to_manifest() # only create target schemas, but also cache defer_relation schemas @@ -132,6 +132,7 @@ def before_run(self, adapter, selected_uids: AbstractSet[str]): self.create_schemas(adapter, schemas_to_create) schemas_to_cache = self.get_model_schemas(adapter, selected_uids) self.populate_adapter_cache(adapter, schemas_to_cache) + return RunStatus.Success @property def resource_types(self) -> List[NodeType]: diff --git a/core/dbt/task/freshness.py b/core/dbt/task/freshness.py index eb1508acb36..06e78b17c7b 100644 --- a/core/dbt/task/freshness.py +++ b/core/dbt/task/freshness.py @@ -4,6 +4,7 @@ from typing import AbstractSet, Dict, List, Optional, Type from dbt import deprecations +from dbt.adapters.base import BaseAdapter from dbt.adapters.base.impl import FreshnessResponse from dbt.adapters.base.relation import BaseRelation from dbt.adapters.capability import Capability @@ -204,10 +205,25 @@ def get_node_selector(self): resource_types=[NodeType.Source], ) - def before_run(self, adapter, selected_uids: AbstractSet[str]) -> None: - super().before_run(adapter, selected_uids) - if adapter.supports(Capability.TableLastModifiedMetadataBatch): - self.populate_metadata_freshness_cache(adapter, selected_uids) + def before_run(self, adapter: BaseAdapter, selected_uids: AbstractSet[str]) -> RunStatus: + populate_metadata_freshness_cache_status = RunStatus.Success + + before_run_status = super().before_run(adapter, selected_uids) + + if before_run_status == RunStatus.Success and adapter.supports( + Capability.TableLastModifiedMetadataBatch + ): + populate_metadata_freshness_cache_status = self.populate_metadata_freshness_cache( + adapter, selected_uids + ) + + if ( + before_run_status == RunStatus.Success + and populate_metadata_freshness_cache_status == RunStatus.Success + ): + return RunStatus.Success + else: + return RunStatus.Error def get_runner(self, node) -> BaseRunner: freshness_runner = super().get_runner(node) @@ -243,7 +259,9 @@ def get_hooks_by_type(self, hook_type: RunHookType) -> List[HookNode]: deprecations.warn("source-freshness-project-hooks") return [] - def populate_metadata_freshness_cache(self, adapter, selected_uids: AbstractSet[str]) -> None: + def populate_metadata_freshness_cache( + self, adapter, selected_uids: AbstractSet[str] + ) -> RunStatus: if self.manifest is None: raise DbtInternalError("Manifest must be set to populate metadata freshness cache") @@ -266,6 +284,7 @@ def populate_metadata_freshness_cache(self, adapter, selected_uids: AbstractSet[ batch_metadata_sources ) self._metadata_freshness_cache.update(metadata_freshness_results) + return RunStatus.Success except Exception as e: # This error handling is intentionally very coarse. # If anything goes wrong during batch metadata calculation, we can safely @@ -276,6 +295,7 @@ def populate_metadata_freshness_cache(self, adapter, selected_uids: AbstractSet[ Note(msg=f"Metadata freshness could not be computed in batch: {e}"), EventLevel.WARN, ) + return RunStatus.Error def get_freshness_metadata_cache(self) -> Dict[BaseRelation, FreshnessResult]: return self._metadata_freshness_cache diff --git a/core/dbt/task/run.py b/core/dbt/task/run.py index 12026d3ae30..d26c381acd8 100644 --- a/core/dbt/task/run.py +++ b/core/dbt/task/run.py @@ -333,7 +333,7 @@ def get_hooks_by_type(self, hook_type: RunHookType) -> List[HookNode]: def safe_run_hooks( self, adapter: BaseAdapter, hook_type: RunHookType, extra_context: Dict[str, Any] - ) -> None: + ) -> RunStatus: started_at = datetime.utcnow() ordered_hooks = self.get_hooks_by_type(hook_type) @@ -342,7 +342,7 @@ def safe_run_hooks( # is created. adapter.clear_transaction() if not ordered_hooks: - return + return RunStatus.Success num_hooks = len(ordered_hooks) fire_event(Formatting("")) @@ -424,6 +424,8 @@ def safe_run_hooks( ) ) + return status + def print_results_line(self, results, execution_time) -> None: nodes = [r.node for r in results if hasattr(r, "node")] + self.ran_hooks stat_line = get_counts(nodes) @@ -440,13 +442,14 @@ def print_results_line(self, results, execution_time) -> None: ) ) - def before_run(self, adapter, selected_uids: AbstractSet[str]) -> None: + def before_run(self, adapter: BaseAdapter, selected_uids: AbstractSet[str]) -> RunStatus: with adapter.connection_named("master"): self.defer_to_manifest() required_schemas = self.get_model_schemas(adapter, selected_uids) self.create_schemas(adapter, required_schemas) self.populate_adapter_cache(adapter, required_schemas) - self.safe_run_hooks(adapter, RunHookType.Start, {}) + run_hooks_status = self.safe_run_hooks(adapter, RunHookType.Start, {}) + return run_hooks_status def after_run(self, adapter, results) -> None: # in on-run-end hooks, provide the value 'database_schemas', which is a diff --git a/core/dbt/task/runnable.py b/core/dbt/task/runnable.py index 19a2a968df8..7027f891d82 100644 --- a/core/dbt/task/runnable.py +++ b/core/dbt/task/runnable.py @@ -11,7 +11,7 @@ import dbt.tracking import dbt.utils import dbt_common.utils.formatting -from dbt.adapters.base import BaseRelation +from dbt.adapters.base import BaseAdapter, BaseRelation from dbt.adapters.factory import get_adapter from dbt.artifacts.schemas.results import ( BaseResult, @@ -65,6 +65,18 @@ class GraphRunnableMode(StrEnum): Independent = "independent" +def mark_nodes_as_skipped( + executed_node_ids: Set[str], flattened_nodes: List[ResultNode], message: str +) -> List[RunResult]: + node_results = [] + + for r in flattened_nodes: + if r.unique_id not in executed_node_ids: + node_results.append(RunResult.from_node(r, RunStatus.Skipped, message)) + + return node_results + + class GraphRunnableTask(ConfiguredTask): MARK_DEPENDENT_ERRORS_STATUSES = [NodeStatus.Error] @@ -401,13 +413,12 @@ def execute_nodes(self): except FailFastError as failure: self._cancel_connections(pool) - executed_node_ids = [r.node.unique_id for r in self.node_results] - - for r in self._flattened_nodes: - if r.unique_id not in executed_node_ids: - self.node_results.append( - RunResult.from_node(r, RunStatus.Skipped, "Skipping due to fail_fast") - ) + skipped_results = mark_nodes_as_skipped( + {r.node.unique_id for r in self.node_results}, + self._flattened_nodes, + "Skipping due to fail_fast", + ) + self.node_results.extend(skipped_results) print_run_result_error(failure.result) # ensure information about all nodes is propagated to run results when failing fast @@ -478,10 +489,11 @@ def populate_adapter_cache( {"adapter_cache_construction_elapsed": cache_populate_time} ) - def before_run(self, adapter, selected_uids: AbstractSet[str]): + def before_run(self, adapter: BaseAdapter, selected_uids: AbstractSet[str]) -> RunStatus: with adapter.connection_named("master"): self.defer_to_manifest() self.populate_adapter_cache(adapter) + return RunStatus.Success def after_run(self, adapter, results) -> None: pass @@ -493,8 +505,18 @@ def execute_with_hooks(self, selected_uids: AbstractSet[str]): adapter = get_adapter(self.config) self.started_at = time.time() try: - self.before_run(adapter, selected_uids) - res = self.execute_nodes() + before_run_status = self.before_run(adapter, selected_uids) + + if before_run_status == RunStatus.Success: + res = self.execute_nodes() + else: + res = mark_nodes_as_skipped( + {r.node.unique_id for r in self.node_results if hasattr(r, "node")}, + self._flattened_nodes or [], + "Skipping due to on-run-start failure", + ) + self.node_results.extend(res) + self.after_run(adapter, res) finally: adapter.cleanup_connections() From 12b7694336f8cbf490e42444f321bba19fdefd96 Mon Sep 17 00:00:00 2001 From: Kshitij Aranke Date: Fri, 13 Sep 2024 16:50:00 +0100 Subject: [PATCH 06/24] Add skip messages for nodes --- core/dbt/task/run.py | 7 +- core/dbt/task/runnable.py | 58 +++--- .../adapter/hooks/test_on_run_hooks.py | 43 +++++ .../adapter/hooks/test_run_hooks.py | 165 ------------------ 4 files changed, 80 insertions(+), 193 deletions(-) create mode 100644 tests/functional/adapter/hooks/test_on_run_hooks.py delete mode 100644 tests/functional/adapter/hooks/test_run_hooks.py diff --git a/core/dbt/task/run.py b/core/dbt/task/run.py index d26c381acd8..7afa2050e47 100644 --- a/core/dbt/task/run.py +++ b/core/dbt/task/run.py @@ -5,11 +5,7 @@ from dbt import tracking, utils from dbt.adapters.base import BaseAdapter, BaseRelation -from dbt.adapters.events.types import ( - DatabaseErrorRunningHook, - FinishedRunningStats, - HooksRunning, -) +from dbt.adapters.events.types import FinishedRunningStats, HooksRunning from dbt.adapters.exceptions import MissingMaterializationError from dbt.artifacts.resources import Hook from dbt.artifacts.schemas.results import ( @@ -407,7 +403,6 @@ def safe_run_hooks( num_failures = 0 else: fire_event(Formatting("")) - fire_event(DatabaseErrorRunningHook(hook_type=hook_type.value)) node_result_message = f"{hook_type.value} failed, error:\n {message}" num_failures = 1 diff --git a/core/dbt/task/runnable.py b/core/dbt/task/runnable.py index 7027f891d82..99757ae42c2 100644 --- a/core/dbt/task/runnable.py +++ b/core/dbt/task/runnable.py @@ -36,6 +36,7 @@ NodeStart, NothingToDo, QueryCancelationUnsupported, + SkippingDetails, ) from dbt.exceptions import DbtInternalError, DbtRuntimeError, FailFastError from dbt.flags import get_flags @@ -65,16 +66,12 @@ class GraphRunnableMode(StrEnum): Independent = "independent" -def mark_nodes_as_skipped( - executed_node_ids: Set[str], flattened_nodes: List[ResultNode], message: str -) -> List[RunResult]: - node_results = [] - - for r in flattened_nodes: - if r.unique_id not in executed_node_ids: - node_results.append(RunResult.from_node(r, RunStatus.Skipped, message)) - - return node_results +def mark_node_as_skipped( + node: ResultNode, executed_node_ids: Set[str], message: Optional[str] +) -> Optional[RunResult]: + if node.unique_id not in executed_node_ids: + return RunResult.from_node(node, RunStatus.Skipped, message) + return None class GraphRunnableTask(ConfiguredTask): @@ -413,12 +410,14 @@ def execute_nodes(self): except FailFastError as failure: self._cancel_connections(pool) - skipped_results = mark_nodes_as_skipped( - {r.node.unique_id for r in self.node_results}, - self._flattened_nodes, - "Skipping due to fail_fast", - ) - self.node_results.extend(skipped_results) + executed_node_ids = {r.node.unique_id for r in self.node_results} + message = "Skipping due to fail_fast" + + for node in self._flattened_nodes: + if node.unique_id not in executed_node_ids: + self.node_results.append( + mark_node_as_skipped(node, executed_node_ids, message) + ) print_run_result_error(failure.result) # ensure information about all nodes is propagated to run results when failing fast @@ -510,12 +509,27 @@ def execute_with_hooks(self, selected_uids: AbstractSet[str]): if before_run_status == RunStatus.Success: res = self.execute_nodes() else: - res = mark_nodes_as_skipped( - {r.node.unique_id for r in self.node_results if hasattr(r, "node")}, - self._flattened_nodes or [], - "Skipping due to on-run-start failure", - ) - self.node_results.extend(res) + executed_node_ids = { + r.node.unique_id for r in self.node_results if hasattr(r, "node") + } + + res = [] + + for index, node in enumerate(self._flattened_nodes or []): + if node.unique_id not in executed_node_ids: + fire_event( + SkippingDetails( + resource_type=node.resource_type, + schema=node.schema, + node_name=node.name, + index=index + 1, + total=self.num_nodes, + node_info=node.node_info, + ) + ) + skipped_node = mark_node_as_skipped(node, executed_node_ids, None) + if skipped_node: + self.node_results.append(skipped_node) self.after_run(adapter, res) finally: diff --git a/tests/functional/adapter/hooks/test_on_run_hooks.py b/tests/functional/adapter/hooks/test_on_run_hooks.py new file mode 100644 index 00000000000..6701d98bf6a --- /dev/null +++ b/tests/functional/adapter/hooks/test_on_run_hooks.py @@ -0,0 +1,43 @@ +import pytest + +from dbt.artifacts.schemas.results import RunStatus +from dbt.tests.util import run_dbt_and_capture + + +class Test__StartHookFail__SelectedNodesSkip__EndHookFail: + @pytest.fixture(scope="class") + def project_config_update(self): + return { + "on-run-start": [ + "create table if not exists {{ target.schema }}.my_start_table ( id int )", # success + "drop table if exists {{ target.schema }}.my_start_table", # success + "insert into {{ target.schema }}.my_start_table (id) values (1, 2, 3)", # fail + "create table if not exists {{ target.schema }}.my_start_table ( id int )", # skip + ], + "on-run-end": [ + "create table if not exists {{ target.schema }}.my_end_table ( id int )", # success + "drop table if exists {{ target.schema }}.my_end_table", # success + "insert into {{ target.schema }}.my_end_table (id) values (1, 2, 3)", # fail + "create table if not exists {{ target.schema }}.my_end_table ( id int )", # skip + ], + } + + @pytest.fixture(scope="class") + def models(self): + return { + "my_model.sql": "select * from {{ target.schema }}.my_start_table" + " union all " + "select * from {{ target.schema }}.my_end_table" + } + + def test_results(self, project): + results, log_output = run_dbt_and_capture(["run"], expect_pass=False) + assert [result.status for result in results] == [ + RunStatus.Error, + RunStatus.Skipped, + RunStatus.Error, + ] + + +class StartHookFail__SelectedNodesSkip__EndHookPass: + pass diff --git a/tests/functional/adapter/hooks/test_run_hooks.py b/tests/functional/adapter/hooks/test_run_hooks.py deleted file mode 100644 index 030988b7c5c..00000000000 --- a/tests/functional/adapter/hooks/test_run_hooks.py +++ /dev/null @@ -1,165 +0,0 @@ -import os -from pathlib import Path - -import pytest - -from dbt.tests.util import check_table_does_not_exist, run_dbt -from dbt_common.exceptions import DbtDatabaseError -from tests.functional.adapter.hooks.fixtures import ( - macros__before_and_after, - macros__hook, - macros_missing_column, - models__hooks, - models__missing_column, - seeds__example_seed_csv, -) - - -class TestPrePostRunHooks(object): - @pytest.fixture(scope="function") - def setUp(self, project): - project.run_sql_file(project.test_data_dir / Path("seed_run.sql")) - project.run_sql(f"drop table if exists { project.test_schema }.schemas") - project.run_sql(f"drop table if exists { project.test_schema }.db_schemas") - os.environ["TERM_TEST"] = "TESTING" - - @pytest.fixture(scope="class") - def macros(self): - return {"hook.sql": macros__hook, "before-and-after.sql": macros__before_and_after} - - @pytest.fixture(scope="class") - def models(self): - return {"hooks.sql": models__hooks} - - @pytest.fixture(scope="class") - def seeds(self): - return {"example_seed.csv": seeds__example_seed_csv} - - @pytest.fixture(scope="class") - def project_config_update(self): - return { - # The create and drop table statements here validate that these hooks run - # in the same order that they are defined. Drop before create is an error. - # Also check that the table does not exist below. - "on-run-start": [ - "{{ custom_run_hook('start', target, run_started_at, invocation_id) }}", - "create table {{ target.schema }}.start_hook_order_test ( id int )", - "drop table {{ target.schema }}.start_hook_order_test", - "{{ log(env_var('TERM_TEST'), info=True) }}", - ], - "on-run-end": [ - "{{ custom_run_hook('end', target, run_started_at, invocation_id) }}", - "create table {{ target.schema }}.end_hook_order_test ( id int )", - "drop table {{ target.schema }}.end_hook_order_test", - "create table {{ target.schema }}.schemas ( schema text )", - "insert into {{ target.schema }}.schemas (schema) values {% for schema in schemas %}( '{{ schema }}' ){% if not loop.last %},{% endif %}{% endfor %}", - "create table {{ target.schema }}.db_schemas ( db text, schema text )", - "insert into {{ target.schema }}.db_schemas (db, schema) values {% for db, schema in database_schemas %}('{{ db }}', '{{ schema }}' ){% if not loop.last %},{% endif %}{% endfor %}", - ], - "seeds": { - "quote_columns": False, - }, - } - - def get_ctx_vars(self, state, project): - fields = [ - "test_state", - "target_dbname", - "target_host", - "target_name", - "target_schema", - "target_threads", - "target_type", - "target_user", - "target_pass", - "run_started_at", - "invocation_id", - "thread_id", - ] - field_list = ", ".join(['"{}"'.format(f) for f in fields]) - query = f"select {field_list} from {project.test_schema}.on_run_hook where test_state = '{state}'" - - vals = project.run_sql(query, fetch="all") - assert len(vals) != 0, "nothing inserted into on_run_hook table" - assert len(vals) == 1, "too many rows in hooks table" - ctx = dict([(k, v) for (k, v) in zip(fields, vals[0])]) - - return ctx - - def assert_used_schemas(self, project): - schemas_query = "select * from {}.schemas".format(project.test_schema) - results = project.run_sql(schemas_query, fetch="all") - assert len(results) == 1 - assert results[0][0] == project.test_schema - - db_schemas_query = "select * from {}.db_schemas".format(project.test_schema) - results = project.run_sql(db_schemas_query, fetch="all") - assert len(results) == 1 - assert results[0][0] == project.database - assert results[0][1] == project.test_schema - - def check_hooks(self, state, project, host): - ctx = self.get_ctx_vars(state, project) - - assert ctx["test_state"] == state - assert ctx["target_dbname"] == "dbt" - assert ctx["target_host"] == host - assert ctx["target_name"] == "default" - assert ctx["target_schema"] == project.test_schema - assert ctx["target_threads"] == 4 - assert ctx["target_type"] == "postgres" - assert ctx["target_user"] == "root" - assert ctx["target_pass"] == "" - - assert ( - ctx["run_started_at"] is not None and len(ctx["run_started_at"]) > 0 - ), "run_started_at was not set" - assert ( - ctx["invocation_id"] is not None and len(ctx["invocation_id"]) > 0 - ), "invocation_id was not set" - assert ctx["thread_id"].startswith("Thread-") or ctx["thread_id"] == "MainThread" - - def test_pre_and_post_run_hooks(self, setUp, project, dbt_profile_target): - run_dbt(["run"]) - - self.check_hooks("start", project, dbt_profile_target.get("host", None)) - self.check_hooks("end", project, dbt_profile_target.get("host", None)) - - check_table_does_not_exist(project.adapter, "start_hook_order_test") - check_table_does_not_exist(project.adapter, "end_hook_order_test") - self.assert_used_schemas(project) - - def test_pre_and_post_seed_hooks(self, setUp, project, dbt_profile_target): - run_dbt(["seed"]) - - self.check_hooks("start", project, dbt_profile_target.get("host", None)) - self.check_hooks("end", project, dbt_profile_target.get("host", None)) - - check_table_does_not_exist(project.adapter, "start_hook_order_test") - check_table_does_not_exist(project.adapter, "end_hook_order_test") - self.assert_used_schemas(project) - - -class TestAfterRunHooks(object): - @pytest.fixture(scope="class") - def macros(self): - return {"temp_macro.sql": macros_missing_column} - - @pytest.fixture(scope="class") - def models(self): - return {"test_column.sql": models__missing_column} - - @pytest.fixture(scope="class") - def project_config_update(self): - return { - # The create and drop table statements here validate that these hooks run - # in the same order that they are defined. Drop before create is an error. - # Also check that the table does not exist below. - "on-run-start": "- {{ export_table_check() }}" - } - - def test_missing_column_pre_hook(self, project): - with pytest.raises( - DbtDatabaseError, match=r'relation "(\w+)\.test_column" does not exist' - ): - run_dbt(["run"], expect_pass=False) From 75f64d0ce2181e80a842a16ddca90500963c1bf7 Mon Sep 17 00:00:00 2001 From: Kshitij Aranke Date: Tue, 17 Sep 2024 14:28:09 +0100 Subject: [PATCH 07/24] One result per hook --- core/dbt/events/types.py | 3 ++ core/dbt/task/printer.py | 19 ++------ core/dbt/task/run.py | 98 ++++++++++++++++++++------------------- core/dbt/task/runnable.py | 19 ++++---- 4 files changed, 68 insertions(+), 71 deletions(-) diff --git a/core/dbt/events/types.py b/core/dbt/events/types.py index 6b33cb01bfd..4dffee56ca3 100644 --- a/core/dbt/events/types.py +++ b/core/dbt/events/types.py @@ -1516,6 +1516,9 @@ def message(self) -> str: if self.status == "success": info = "OK" status = green(info) + elif self.status == "skipped": + info = "SKIP" + status = yellow(info) else: info = "ERROR" status = red(info) diff --git a/core/dbt/task/printer.py b/core/dbt/task/printer.py index 2fcbddfef3a..a5995d50b40 100644 --- a/core/dbt/task/printer.py +++ b/core/dbt/task/printer.py @@ -1,7 +1,7 @@ from typing import Dict, Optional from dbt.artifacts.schemas.results import NodeStatus -from dbt.contracts.graph.nodes import Group, HookNode +from dbt.contracts.graph.nodes import Group from dbt.events.types import ( CheckNodeTestFailure, EndOfRunSummary, @@ -61,20 +61,9 @@ def print_run_status_line(results) -> None: } for r in results: - # All on-run-* nodes are returned as a single result, so we need to unpack it to get the correct counts - if isinstance(r.node, HookNode): - n = r.node.index or 0 - stats["total"] += n - # Hooks "fail fast", so there will be at most one error - if r.status == NodeStatus.Error: - stats["error"] += 1 - stats["pass"] += n - 1 - else: - stats["pass"] += n - else: - result_type = interpret_run_result(r) - stats[result_type] += 1 - stats["total"] += 1 + result_type = interpret_run_result(r) + stats[result_type] += 1 + stats["total"] += 1 fire_event(Formatting("")) fire_event(StatsLine(stats=stats)) diff --git a/core/dbt/task/run.py b/core/dbt/task/run.py index 7afa2050e47..927e4e78174 100644 --- a/core/dbt/task/run.py +++ b/core/dbt/task/run.py @@ -5,7 +5,7 @@ from dbt import tracking, utils from dbt.adapters.base import BaseAdapter, BaseRelation -from dbt.adapters.events.types import FinishedRunningStats, HooksRunning +from dbt.adapters.events.types import FinishedRunningStats from dbt.adapters.exceptions import MissingMaterializationError from dbt.artifacts.resources import Hook from dbt.artifacts.schemas.results import ( @@ -15,11 +15,8 @@ TimingInfo, ) from dbt.artifacts.schemas.run import RunResult -from dbt.cli.flags import Flags from dbt.clients.jinja import MacroGenerator -from dbt.config.runtime import RuntimeConfig from dbt.context.providers import generate_runtime_model_context -from dbt.contracts.graph.manifest import Manifest from dbt.contracts.graph.nodes import HookNode, ResultNode from dbt.events.types import ( LogHookEndLine, @@ -289,14 +286,6 @@ def execute(self, model, manifest): class RunTask(CompileTask): - def __init__(self, args: Flags, config: RuntimeConfig, manifest: Manifest) -> None: - super().__init__(args, config, manifest) - self.ran_hooks: List[HookNode] = [] - self._total_executed = 0 - - def index_offset(self, value: int) -> int: - return self._total_executed + value - def raise_on_first_error(self) -> bool: return False @@ -341,12 +330,7 @@ def safe_run_hooks( return RunStatus.Success num_hooks = len(ordered_hooks) - fire_event(Formatting("")) - fire_event(HooksRunning(num_hooks=num_hooks, hook_type=hook_type)) - idx = 0 - timings = [] - total_execution_time = 0.0 while idx < num_hooks: hook = ordered_hooks[idx] @@ -371,14 +355,27 @@ def safe_run_hooks( status, message = get_execution_status(sql, adapter) finished_at = datetime.utcnow() - timings.append(TimingInfo(hook_name, started_at, finished_at)) - - hook.update_event_status(finished_at=finished_at.isoformat(), node_status=status) - self.ran_hooks.append(hook) + hook.update_event_status(finished_at=finished_at.isoformat()) execution_time = (finished_at - started_at).total_seconds() - total_execution_time += execution_time + if status == RunStatus.Success: + message = f"{hook_name} passed" + else: + message = f"{hook_name} failed, error:\n {message}" + + self.node_results.append( + RunResult( + status=status, + thread_id="main", + timing=[TimingInfo(hook_name, started_at, finished_at)], + message=message, + adapter_response={}, + execution_time=execution_time, + failures=0 if status == RunStatus.Success else 1, + node=hook, + ) + ) fire_event( LogHookEndLine( statement=hook_name, @@ -389,40 +386,47 @@ def safe_run_hooks( node_info=hook.node_info, ) ) - hook.clear_event_status() + idx += 1 if status != RunStatus.Success: break - idx += 1 - - self._total_executed += hook.index or 0 + while idx < num_hooks: + hook = ordered_hooks[idx] - if status == RunStatus.Success: - node_result_message = f"{hook.index} {hook_type.value} hooks executed successfully" - num_failures = 0 - else: - fire_event(Formatting("")) - node_result_message = f"{hook_type.value} failed, error:\n {message}" - num_failures = 1 + with log_contextvars(node_info=hook.node_info): + hook.index = idx + 1 + hook_name = f"{hook.package_name}.{hook_type}.{hook.index}" + status = RunStatus.Skipped - self.node_results.append( - RunResult( - status=status, - thread_id="main", - timing=timings, - message=node_result_message, - adapter_response={}, - execution_time=total_execution_time, - failures=num_failures, - node=hook, - ) - ) + self.node_results.append( + RunResult( + status=status, + thread_id="main", + timing=[], + message=f"{status} hook: '{hook_name}'", + adapter_response={}, + execution_time=0, + failures=1, + node=hook, + ) + ) + fire_event( + LogHookEndLine( + statement=hook_name, + status=RunStatus.Skipped, + index=hook.index, + total=num_hooks, + execution_time=0, + node_info=hook.node_info, + ) + ) + idx += 1 return status def print_results_line(self, results, execution_time) -> None: - nodes = [r.node for r in results if hasattr(r, "node")] + self.ran_hooks + nodes = [r.node for r in results if hasattr(r, "node")] stat_line = get_counts(nodes) execution = "" @@ -459,8 +463,6 @@ def after_run(self, adapter, results) -> None: and r.status not in (NodeStatus.Error, NodeStatus.Fail, NodeStatus.Skipped) } - self._total_executed += len(results) - extras = { "schemas": list({s for _, s in database_schema_set}), "results": results, diff --git a/core/dbt/task/runnable.py b/core/dbt/task/runnable.py index 99757ae42c2..e0f7a3df83f 100644 --- a/core/dbt/task/runnable.py +++ b/core/dbt/task/runnable.py @@ -395,14 +395,6 @@ def _cancel_connections(self, pool): def execute_nodes(self): num_threads = self.config.threads - target_name = self.config.target_name - - fire_event( - ConcurrencyLine( - num_threads=num_threads, target_name=target_name, node_count=self.num_nodes - ) - ) - fire_event(Formatting("")) pool = ThreadPool(num_threads, self._pool_thread_initializer, [get_invocation_context()]) try: @@ -502,9 +494,20 @@ def print_results_line(self, node_results, elapsed): def execute_with_hooks(self, selected_uids: AbstractSet[str]): adapter = get_adapter(self.config) + + fire_event( + ConcurrencyLine( + num_threads=self.config.threads, + target_name=self.config.target_name, + node_count=self.num_nodes, + ) + ) + fire_event(Formatting("")) + self.started_at = time.time() try: before_run_status = self.before_run(adapter, selected_uids) + fire_event(Formatting("")) if before_run_status == RunStatus.Success: res = self.execute_nodes() From 900d1cd8fd5acd57d84e1f5f8c71f945149140f0 Mon Sep 17 00:00:00 2001 From: Kshitij Aranke Date: Tue, 17 Sep 2024 14:46:57 +0100 Subject: [PATCH 08/24] o1 refactor --- core/dbt/task/run.py | 108 ++++++++++++++++--------------------------- 1 file changed, 39 insertions(+), 69 deletions(-) diff --git a/core/dbt/task/run.py b/core/dbt/task/run.py index 927e4e78174..d5d6417f0f0 100644 --- a/core/dbt/task/run.py +++ b/core/dbt/task/run.py @@ -322,60 +322,66 @@ def safe_run_hooks( started_at = datetime.utcnow() ordered_hooks = self.get_hooks_by_type(hook_type) - # on-run-* hooks should run outside of a transaction. This happens - # b/c psycopg2 automatically begins a transaction when a connection - # is created. + # on-run-* hooks should run outside a transaction. This happens because psycopg2 automatically begins a transaction when a connection is created. adapter.clear_transaction() if not ordered_hooks: return RunStatus.Success - num_hooks = len(ordered_hooks) - idx = 0 + status = RunStatus.Success + failed = False + num_hooks = len(ordered_hooks) - while idx < num_hooks: - hook = ordered_hooks[idx] - # We want to include node_info in the appropriate log files, so use - # log_contextvars + for idx, hook in enumerate(ordered_hooks, 1): with log_contextvars(node_info=hook.node_info): - hook.index = idx + 1 - hook.update_event_status( - started_at=started_at.isoformat(), node_status=RunningStatus.Started - ) - sql = self.get_hook_sql(adapter, hook, hook.index, num_hooks, extra_context) - + hook.index = idx hook_name = f"{hook.package_name}.{hook_type}.{hook.index}" - fire_event( - LogHookStartLine( - statement=hook_name, - index=hook.index, - total=num_hooks, - node_info=hook.node_info, - ) - ) - - status, message = get_execution_status(sql, adapter) - finished_at = datetime.utcnow() - hook.update_event_status(finished_at=finished_at.isoformat()) + execution_time = 0.0 + timing = [] + failures = 1 - execution_time = (finished_at - started_at).total_seconds() + if not failed: + hook.update_event_status( + started_at=started_at.isoformat(), node_status=RunningStatus.Started + ) + sql = self.get_hook_sql(adapter, hook, hook.index, num_hooks, extra_context) + fire_event( + LogHookStartLine( + statement=hook_name, + index=hook.index, + total=num_hooks, + node_info=hook.node_info, + ) + ) - if status == RunStatus.Success: - message = f"{hook_name} passed" + status, message = get_execution_status(sql, adapter) + finished_at = datetime.utcnow() + hook.update_event_status(finished_at=finished_at.isoformat()) + execution_time = (finished_at - started_at).total_seconds() + timing = [TimingInfo(hook_name, started_at, finished_at)] + failures = 0 if status == RunStatus.Success else 1 + + if status == RunStatus.Success: + message = f"{hook_name} passed" + else: + message = f"{hook_name} failed, error:\n {message}" + failed = True else: - message = f"{hook_name} failed, error:\n {message}" + status = RunStatus.Skipped + message = f"{hook_name} skipped" self.node_results.append( RunResult( status=status, thread_id="main", - timing=[TimingInfo(hook_name, started_at, finished_at)], + timing=timing, message=message, adapter_response={}, execution_time=execution_time, - failures=0 if status == RunStatus.Success else 1, + failures=failures, node=hook, ) ) + fire_event( LogHookEndLine( statement=hook_name, @@ -386,42 +392,6 @@ def safe_run_hooks( node_info=hook.node_info, ) ) - idx += 1 - - if status != RunStatus.Success: - break - - while idx < num_hooks: - hook = ordered_hooks[idx] - - with log_contextvars(node_info=hook.node_info): - hook.index = idx + 1 - hook_name = f"{hook.package_name}.{hook_type}.{hook.index}" - status = RunStatus.Skipped - - self.node_results.append( - RunResult( - status=status, - thread_id="main", - timing=[], - message=f"{status} hook: '{hook_name}'", - adapter_response={}, - execution_time=0, - failures=1, - node=hook, - ) - ) - fire_event( - LogHookEndLine( - statement=hook_name, - status=RunStatus.Skipped, - index=hook.index, - total=num_hooks, - execution_time=0, - node_info=hook.node_info, - ) - ) - idx += 1 return status From 69702915bdb50b0e4358ed3b4aff7c9102828569 Mon Sep 17 00:00:00 2001 From: Kshitij Aranke Date: Tue, 17 Sep 2024 16:01:33 +0100 Subject: [PATCH 09/24] fix tests --- core/dbt/task/run.py | 2 +- .../adapter/hooks/test_on_run_hooks.py | 72 +++++++++++++++---- 2 files changed, 61 insertions(+), 13 deletions(-) diff --git a/core/dbt/task/run.py b/core/dbt/task/run.py index d5d6417f0f0..833303774c3 100644 --- a/core/dbt/task/run.py +++ b/core/dbt/task/run.py @@ -334,7 +334,7 @@ def safe_run_hooks( for idx, hook in enumerate(ordered_hooks, 1): with log_contextvars(node_info=hook.node_info): hook.index = idx - hook_name = f"{hook.package_name}.{hook_type}.{hook.index}" + hook_name = f"{hook.package_name}.{hook_type}.{hook.index - 1}" execution_time = 0.0 timing = [] failures = 1 diff --git a/tests/functional/adapter/hooks/test_on_run_hooks.py b/tests/functional/adapter/hooks/test_on_run_hooks.py index 6701d98bf6a..c3201eabdf6 100644 --- a/tests/functional/adapter/hooks/test_on_run_hooks.py +++ b/tests/functional/adapter/hooks/test_on_run_hooks.py @@ -9,16 +9,16 @@ class Test__StartHookFail__SelectedNodesSkip__EndHookFail: def project_config_update(self): return { "on-run-start": [ - "create table if not exists {{ target.schema }}.my_start_table ( id int )", # success - "drop table if exists {{ target.schema }}.my_start_table", # success + "create table {{ target.schema }}.my_start_table ( id int )", # success + "drop table {{ target.schema }}.my_start_table", # success "insert into {{ target.schema }}.my_start_table (id) values (1, 2, 3)", # fail - "create table if not exists {{ target.schema }}.my_start_table ( id int )", # skip + "create table {{ target.schema }}.my_start_table ( id int )", # skip ], "on-run-end": [ - "create table if not exists {{ target.schema }}.my_end_table ( id int )", # success - "drop table if exists {{ target.schema }}.my_end_table", # success + "create table {{ target.schema }}.my_end_table ( id int )", # success + "drop table {{ target.schema }}.my_end_table", # success "insert into {{ target.schema }}.my_end_table (id) values (1, 2, 3)", # fail - "create table if not exists {{ target.schema }}.my_end_table ( id int )", # skip + "create table {{ target.schema }}.my_end_table ( id int )", # skip ], } @@ -32,12 +32,60 @@ def models(self): def test_results(self, project): results, log_output = run_dbt_and_capture(["run"], expect_pass=False) - assert [result.status for result in results] == [ - RunStatus.Error, - RunStatus.Skipped, - RunStatus.Error, + assert [(result.node.alias, result.status) for result in results] == [ + ("test-on-run-start-0", RunStatus.Success), + ("test-on-run-start-1", RunStatus.Success), + ("test-on-run-start-2", RunStatus.Error), + ("test-on-run-start-3", RunStatus.Skipped), + ("my_model", RunStatus.Skipped), + ("test-on-run-end-0", RunStatus.Success), + ("test-on-run-end-1", RunStatus.Success), + ("test-on-run-end-2", RunStatus.Error), + ("test-on-run-end-3", RunStatus.Skipped), ] + assert f'relation "{project.test_schema}.my_start_table" does not exist' in log_output + assert "PASS=4 WARN=0 ERROR=2 SKIP=3 TOTAL=9" in log_output + assert "8 project hooks, 1 view model" in log_output -class StartHookFail__SelectedNodesSkip__EndHookPass: - pass + +class Test__StartHookFail__SelectedNodesSkip__EndHookPass: + @pytest.fixture(scope="class") + def project_config_update(self): + return { + "on-run-start": [ + "create table {{ target.schema }}.my_start_table ( id int )", # success + "drop table {{ target.schema }}.my_start_table", # success + "insert into {{ target.schema }}.my_start_table (id) values (1, 2, 3)", # fail + "create table {{ target.schema }}.my_start_table ( id int )", # skip + ], + "on-run-end": [ + "create table {{ target.schema }}.my_end_table ( id int )", # success + "drop table {{ target.schema }}.my_end_table", # success + ], + } + + @pytest.fixture(scope="class") + def models(self): + return { + "my_model.sql": "select * from {{ target.schema }}.my_start_table" + " union all " + "select * from {{ target.schema }}.my_end_table" + } + + def test_results(self, project): + results, log_output = run_dbt_and_capture(["run"], expect_pass=False) + + assert [(result.node.alias, result.status) for result in results] == [ + ("test-on-run-start-0", RunStatus.Success), + ("test-on-run-start-1", RunStatus.Success), + ("test-on-run-start-2", RunStatus.Error), + ("test-on-run-start-3", RunStatus.Skipped), + ("my_model", RunStatus.Skipped), + ("test-on-run-end-0", RunStatus.Success), + ("test-on-run-end-1", RunStatus.Success), + ] + + assert f'relation "{project.test_schema}.my_start_table" does not exist' in log_output + assert "PASS=4 WARN=0 ERROR=1 SKIP=2 TOTAL=7" in log_output + assert "6 project hooks, 1 view model" in log_output From 66997f26e22e87b3d2abec7addd8e3f74a7fdd9e Mon Sep 17 00:00:00 2001 From: Kshitij Aranke Date: Tue, 17 Sep 2024 17:55:15 +0100 Subject: [PATCH 10/24] Add flag --- core/dbt/contracts/project.py | 2 ++ core/dbt/task/runnable.py | 4 +++- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/core/dbt/contracts/project.py b/core/dbt/contracts/project.py index 041d310cc5e..170f18965c8 100644 --- a/core/dbt/contracts/project.py +++ b/core/dbt/contracts/project.py @@ -341,6 +341,7 @@ class ProjectFlags(ExtensibleDbtClassMixin): require_explicit_package_overrides_for_builtin_materializations: bool = True require_resource_names_without_spaces: bool = False source_freshness_run_project_hooks: bool = False + skip_nodes_if_on_run_start_fails: bool = False @property def project_only_flags(self) -> Dict[str, Any]: @@ -348,6 +349,7 @@ def project_only_flags(self) -> Dict[str, Any]: "require_explicit_package_overrides_for_builtin_materializations": self.require_explicit_package_overrides_for_builtin_materializations, "require_resource_names_without_spaces": self.require_resource_names_without_spaces, "source_freshness_run_project_hooks": self.source_freshness_run_project_hooks, + "skip_nodes_if_on_run_start_fails": self.skip_nodes_if_on_run_start_fails, } diff --git a/core/dbt/task/runnable.py b/core/dbt/task/runnable.py index e0f7a3df83f..a588e93d1e5 100644 --- a/core/dbt/task/runnable.py +++ b/core/dbt/task/runnable.py @@ -509,7 +509,9 @@ def execute_with_hooks(self, selected_uids: AbstractSet[str]): before_run_status = self.before_run(adapter, selected_uids) fire_event(Formatting("")) - if before_run_status == RunStatus.Success: + if before_run_status == RunStatus.Success or ( + not get_flags().skip_nodes_if_on_run_start_fails + ): res = self.execute_nodes() else: executed_node_ids = { From 498a0159921ae58b7a3195168f950894f5b5581e Mon Sep 17 00:00:00 2001 From: Kshitij Aranke Date: Tue, 17 Sep 2024 17:59:41 +0100 Subject: [PATCH 11/24] Fix existing tests --- tests/functional/dependencies/test_local_dependency.py | 2 +- tests/functional/sources/test_source_freshness.py | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/tests/functional/dependencies/test_local_dependency.py b/tests/functional/dependencies/test_local_dependency.py index f96d22bbbed..e5bf1de5323 100644 --- a/tests/functional/dependencies/test_local_dependency.py +++ b/tests/functional/dependencies/test_local_dependency.py @@ -328,7 +328,7 @@ def test_hook_dependency(self, prepare_dependencies, project): run_dbt(["deps", "--vars", cli_vars]) results = run_dbt(["run", "--vars", cli_vars]) - assert len(results) == 3 + assert len(results) == 8 check_relations_equal(project.adapter, ["actual", "expected"]) diff --git a/tests/functional/sources/test_source_freshness.py b/tests/functional/sources/test_source_freshness.py index 2f42a3aaa56..70c8866869e 100644 --- a/tests/functional/sources/test_source_freshness.py +++ b/tests/functional/sources/test_source_freshness.py @@ -129,12 +129,12 @@ def _assert_freshness_results(self, path, state): ] def _assert_project_hooks_called(self, logs: str): - assert "Running 1 on-run-start hook" in logs - assert "Running 1 on-run-end hook" in logs + assert "test.on-run-start.0" in logs + assert "test.on-run-start.0" in logs def _assert_project_hooks_not_called(self, logs: str): - assert "Running 1 on-run-start hook" not in logs - assert "Running 1 on-run-end hook" not in logs + assert "test.on-run-end.0" not in logs + assert "test.on-run-end.0" not in logs class TestSourceFreshness(SuccessfulSourceFreshnessTest): From 512b25233e21ac856a04ffcc8edf8fb7b3ff96e8 Mon Sep 17 00:00:00 2001 From: Kshitij Aranke Date: Wed, 18 Sep 2024 14:10:24 +0100 Subject: [PATCH 12/24] Write tests for on-run-start hooks --- .../adapter/hooks/test_on_run_hooks.py | 106 ++++++++---------- tests/functional/logging/test_logging.py | 2 +- 2 files changed, 50 insertions(+), 58 deletions(-) diff --git a/tests/functional/adapter/hooks/test_on_run_hooks.py b/tests/functional/adapter/hooks/test_on_run_hooks.py index c3201eabdf6..16944558d0c 100644 --- a/tests/functional/adapter/hooks/test_on_run_hooks.py +++ b/tests/functional/adapter/hooks/test_on_run_hooks.py @@ -1,12 +1,16 @@ import pytest from dbt.artifacts.schemas.results import RunStatus -from dbt.tests.util import run_dbt_and_capture +from dbt.tests.util import get_artifact, run_dbt_and_capture -class Test__StartHookFail__SelectedNodesSkip__EndHookFail: +class Test__StartHookFail__FlagIsNone__SelectedNodesRun: @pytest.fixture(scope="class") - def project_config_update(self): + def flags(self): + return {} + + @pytest.fixture(scope="class") + def project_config_update(self, flags): return { "on-run-start": [ "create table {{ target.schema }}.my_start_table ( id int )", # success @@ -14,12 +18,7 @@ def project_config_update(self): "insert into {{ target.schema }}.my_start_table (id) values (1, 2, 3)", # fail "create table {{ target.schema }}.my_start_table ( id int )", # skip ], - "on-run-end": [ - "create table {{ target.schema }}.my_end_table ( id int )", # success - "drop table {{ target.schema }}.my_end_table", # success - "insert into {{ target.schema }}.my_end_table (id) values (1, 2, 3)", # fail - "create table {{ target.schema }}.my_end_table ( id int )", # skip - ], + "flags": flags, } @pytest.fixture(scope="class") @@ -30,62 +29,55 @@ def models(self): "select * from {{ target.schema }}.my_end_table" } - def test_results(self, project): + @pytest.fixture(scope="class") + def log_counts(self): + return "PASS=2 WARN=0 ERROR=2 SKIP=1 TOTAL=5" + + @pytest.fixture(scope="class") + def my_model_run_status(self): + return RunStatus.Error + + def test_results(self, project, log_counts, my_model_run_status): results, log_output = run_dbt_and_capture(["run"], expect_pass=False) - assert [(result.node.alias, result.status) for result in results] == [ - ("test-on-run-start-0", RunStatus.Success), - ("test-on-run-start-1", RunStatus.Success), - ("test-on-run-start-2", RunStatus.Error), - ("test-on-run-start-3", RunStatus.Skipped), - ("my_model", RunStatus.Skipped), - ("test-on-run-end-0", RunStatus.Success), - ("test-on-run-end-1", RunStatus.Success), - ("test-on-run-end-2", RunStatus.Error), - ("test-on-run-end-3", RunStatus.Skipped), + + expected_results = [ + ("operation.test.test-on-run-start-0", RunStatus.Success), + ("operation.test.test-on-run-start-1", RunStatus.Success), + ("operation.test.test-on-run-start-2", RunStatus.Error), + ("operation.test.test-on-run-start-3", RunStatus.Skipped), + ("model.test.my_model", my_model_run_status), ] + assert [(result.node.unique_id, result.status) for result in results] == expected_results assert f'relation "{project.test_schema}.my_start_table" does not exist' in log_output - assert "PASS=4 WARN=0 ERROR=2 SKIP=3 TOTAL=9" in log_output - assert "8 project hooks, 1 view model" in log_output + assert log_counts in log_output + assert "4 project hooks, 1 view model" in log_output + run_results = get_artifact(project.project_root, "target", "run_results.json") + assert [ + (result["unique_id"], result["status"]) for result in run_results["results"] + ] == expected_results -class Test__StartHookFail__SelectedNodesSkip__EndHookPass: - @pytest.fixture(scope="class") - def project_config_update(self): - return { - "on-run-start": [ - "create table {{ target.schema }}.my_start_table ( id int )", # success - "drop table {{ target.schema }}.my_start_table", # success - "insert into {{ target.schema }}.my_start_table (id) values (1, 2, 3)", # fail - "create table {{ target.schema }}.my_start_table ( id int )", # skip - ], - "on-run-end": [ - "create table {{ target.schema }}.my_end_table ( id int )", # success - "drop table {{ target.schema }}.my_end_table", # success - ], - } +class Test__StartHookFail__FlagIsFalse__SelectedNodesRun( + Test__StartHookFail__FlagIsNone__SelectedNodesRun +): @pytest.fixture(scope="class") - def models(self): - return { - "my_model.sql": "select * from {{ target.schema }}.my_start_table" - " union all " - "select * from {{ target.schema }}.my_end_table" - } + def flags(self): + return {"skip_nodes_if_on_run_start_fails": False} - def test_results(self, project): - results, log_output = run_dbt_and_capture(["run"], expect_pass=False) - assert [(result.node.alias, result.status) for result in results] == [ - ("test-on-run-start-0", RunStatus.Success), - ("test-on-run-start-1", RunStatus.Success), - ("test-on-run-start-2", RunStatus.Error), - ("test-on-run-start-3", RunStatus.Skipped), - ("my_model", RunStatus.Skipped), - ("test-on-run-end-0", RunStatus.Success), - ("test-on-run-end-1", RunStatus.Success), - ] +class Test__StartHookFail__FlagIsTrue__SelectedNodesSkipped( + Test__StartHookFail__FlagIsNone__SelectedNodesRun +): + @pytest.fixture(scope="class") + def flags(self): + return {"skip_nodes_if_on_run_start_fails": True} - assert f'relation "{project.test_schema}.my_start_table" does not exist' in log_output - assert "PASS=4 WARN=0 ERROR=1 SKIP=2 TOTAL=7" in log_output - assert "6 project hooks, 1 view model" in log_output + @pytest.fixture(scope="class") + def log_counts(self): + return "PASS=2 WARN=0 ERROR=1 SKIP=2 TOTAL=5" + + @pytest.fixture(scope="class") + def my_model_run_status(self): + return RunStatus.Skipped diff --git a/tests/functional/logging/test_logging.py b/tests/functional/logging/test_logging.py index e29bd9c7cac..bc769466a4b 100644 --- a/tests/functional/logging/test_logging.py +++ b/tests/functional/logging/test_logging.py @@ -87,7 +87,7 @@ def test_formatted_logs(project, logs_dir): if log_event == "Formatting": formatted_json_lines += 1 - assert formatted_json_lines == 5 + assert formatted_json_lines == 6 def test_invalid_event_value(project, logs_dir): From 37c62ab6cff276fa44695a4a99821cfe3b505782 Mon Sep 17 00:00:00 2001 From: Kshitij Aranke Date: Wed, 18 Sep 2024 15:31:21 +0100 Subject: [PATCH 13/24] Add test for on-run-end hooks --- core/dbt/task/printer.py | 3 +- core/dbt/task/run.py | 2 + core/dbt/task/runnable.py | 1 - .../adapter/hooks/test_on_run_hooks.py | 48 ++++++++++++++++--- 4 files changed, 45 insertions(+), 9 deletions(-) diff --git a/core/dbt/task/printer.py b/core/dbt/task/printer.py index a5995d50b40..5b37789e6ae 100644 --- a/core/dbt/task/printer.py +++ b/core/dbt/task/printer.py @@ -33,7 +33,8 @@ def get_counts(flat_nodes) -> str: counts[t] = counts.get(t, 0) + 1 - stat_line = ", ".join([pluralize(v, k).replace("_", " ") for k, v in counts.items()]) + sorted_items = sorted(counts.items(), key=lambda x: x[0]) + stat_line = ", ".join([pluralize(v, k).replace("_", " ") for k, v in sorted_items]) return stat_line diff --git a/core/dbt/task/run.py b/core/dbt/task/run.py index 833303774c3..63dc42d7620 100644 --- a/core/dbt/task/run.py +++ b/core/dbt/task/run.py @@ -331,6 +331,8 @@ def safe_run_hooks( failed = False num_hooks = len(ordered_hooks) + fire_event(Formatting("")) + for idx, hook in enumerate(ordered_hooks, 1): with log_contextvars(node_info=hook.node_info): hook.index = idx diff --git a/core/dbt/task/runnable.py b/core/dbt/task/runnable.py index a588e93d1e5..cc562c490f2 100644 --- a/core/dbt/task/runnable.py +++ b/core/dbt/task/runnable.py @@ -502,7 +502,6 @@ def execute_with_hooks(self, selected_uids: AbstractSet[str]): node_count=self.num_nodes, ) ) - fire_event(Formatting("")) self.started_at = time.time() try: diff --git a/tests/functional/adapter/hooks/test_on_run_hooks.py b/tests/functional/adapter/hooks/test_on_run_hooks.py index 16944558d0c..d11e017ed74 100644 --- a/tests/functional/adapter/hooks/test_on_run_hooks.py +++ b/tests/functional/adapter/hooks/test_on_run_hooks.py @@ -4,7 +4,7 @@ from dbt.tests.util import get_artifact, run_dbt_and_capture -class Test__StartHookFail__FlagIsNone__SelectedNodesRun: +class Test__StartHookFail__FlagIsNone__ModelFail: @pytest.fixture(scope="class") def flags(self): return {} @@ -59,17 +59,13 @@ def test_results(self, project, log_counts, my_model_run_status): ] == expected_results -class Test__StartHookFail__FlagIsFalse__SelectedNodesRun( - Test__StartHookFail__FlagIsNone__SelectedNodesRun -): +class Test__StartHookFail__FlagIsFalse__ModelFail(Test__StartHookFail__FlagIsNone__ModelFail): @pytest.fixture(scope="class") def flags(self): return {"skip_nodes_if_on_run_start_fails": False} -class Test__StartHookFail__FlagIsTrue__SelectedNodesSkipped( - Test__StartHookFail__FlagIsNone__SelectedNodesRun -): +class Test__StartHookFail__FlagIsTrue__ModelSkipped(Test__StartHookFail__FlagIsNone__ModelFail): @pytest.fixture(scope="class") def flags(self): return {"skip_nodes_if_on_run_start_fails": True} @@ -81,3 +77,41 @@ def log_counts(self): @pytest.fixture(scope="class") def my_model_run_status(self): return RunStatus.Skipped + + +class Test__ModelPass__EndHookFail: + @pytest.fixture(scope="class") + def project_config_update(self, on_run_end): + return { + "on-run-end": [ + "create table {{ target.schema }}.my_start_table ( id int )", # success + "drop table {{ target.schema }}.my_start_table", # success + "insert into {{ target.schema }}.my_start_table (id) values (1, 2, 3)", # fail + "create table {{ target.schema }}.my_start_table ( id int )", # skip + ], + } + + @pytest.fixture(scope="class") + def models(self): + return {"my_model.sql": "select 1"} + + def test_results(self, project): + results, log_output = run_dbt_and_capture(["run"], expect_pass=False) + + expected_results = [ + ("model.test.my_model", RunStatus.Success), + ("operation.test.test-on-run-end-0", RunStatus.Success), + ("operation.test.test-on-run-end-1", RunStatus.Success), + ("operation.test.test-on-run-end-2", RunStatus.Error), + ("operation.test.test-on-run-end-3", RunStatus.Skipped), + ] + + assert [(result.node.unique_id, result.status) for result in results] == expected_results + assert f'relation "{project.test_schema}.my_start_table" does not exist' in log_output + assert "PASS=3 WARN=0 ERROR=1 SKIP=1 TOTAL=5" in log_output + assert "4 project hooks, 1 view model" in log_output + + run_results = get_artifact(project.project_root, "target", "run_results.json") + assert [ + (result["unique_id"], result["status"]) for result in run_results["results"] + ] == expected_results From ad1d952c58b43562f2db7eabb1179dc6b37f47b6 Mon Sep 17 00:00:00 2001 From: Kshitij Aranke Date: Wed, 18 Sep 2024 16:08:13 +0100 Subject: [PATCH 14/24] do message test off run_results.json --- .../adapter/hooks/test_on_run_hooks.py | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/tests/functional/adapter/hooks/test_on_run_hooks.py b/tests/functional/adapter/hooks/test_on_run_hooks.py index d11e017ed74..1a423dd55e3 100644 --- a/tests/functional/adapter/hooks/test_on_run_hooks.py +++ b/tests/functional/adapter/hooks/test_on_run_hooks.py @@ -49,7 +49,6 @@ def test_results(self, project, log_counts, my_model_run_status): ] assert [(result.node.unique_id, result.status) for result in results] == expected_results - assert f'relation "{project.test_schema}.my_start_table" does not exist' in log_output assert log_counts in log_output assert "4 project hooks, 1 view model" in log_output @@ -57,6 +56,10 @@ def test_results(self, project, log_counts, my_model_run_status): assert [ (result["unique_id"], result["status"]) for result in run_results["results"] ] == expected_results + assert ( + f'relation "{project.test_schema}.my_start_table" does not exist' + in run_results["results"][2]["message"] + ) class Test__StartHookFail__FlagIsFalse__ModelFail(Test__StartHookFail__FlagIsNone__ModelFail): @@ -81,7 +84,7 @@ def my_model_run_status(self): class Test__ModelPass__EndHookFail: @pytest.fixture(scope="class") - def project_config_update(self, on_run_end): + def project_config_update(self): return { "on-run-end": [ "create table {{ target.schema }}.my_start_table ( id int )", # success @@ -96,7 +99,9 @@ def models(self): return {"my_model.sql": "select 1"} def test_results(self, project): - results, log_output = run_dbt_and_capture(["run"], expect_pass=False) + results, log_output = run_dbt_and_capture( + ["--debug", "run", "--log-format", "json"], expect_pass=False + ) expected_results = [ ("model.test.my_model", RunStatus.Success), @@ -107,7 +112,6 @@ def test_results(self, project): ] assert [(result.node.unique_id, result.status) for result in results] == expected_results - assert f'relation "{project.test_schema}.my_start_table" does not exist' in log_output assert "PASS=3 WARN=0 ERROR=1 SKIP=1 TOTAL=5" in log_output assert "4 project hooks, 1 view model" in log_output @@ -115,3 +119,7 @@ def test_results(self, project): assert [ (result["unique_id"], result["status"]) for result in run_results["results"] ] == expected_results + assert ( + f'relation "{project.test_schema}.my_start_table" does not exist' + in run_results["results"][3]["message"] + ) From 71ead6a24abfac70d15588265f05af77e669393a Mon Sep 17 00:00:00 2001 From: Kshitij Aranke Date: Wed, 18 Sep 2024 16:21:12 +0100 Subject: [PATCH 15/24] add no hooks ran test --- .../adapter/hooks/test_on_run_hooks.py | 59 ++++++++++++++----- 1 file changed, 45 insertions(+), 14 deletions(-) diff --git a/tests/functional/adapter/hooks/test_on_run_hooks.py b/tests/functional/adapter/hooks/test_on_run_hooks.py index 1a423dd55e3..738873c27eb 100644 --- a/tests/functional/adapter/hooks/test_on_run_hooks.py +++ b/tests/functional/adapter/hooks/test_on_run_hooks.py @@ -13,10 +13,10 @@ def flags(self): def project_config_update(self, flags): return { "on-run-start": [ - "create table {{ target.schema }}.my_start_table ( id int )", # success - "drop table {{ target.schema }}.my_start_table", # success - "insert into {{ target.schema }}.my_start_table (id) values (1, 2, 3)", # fail - "create table {{ target.schema }}.my_start_table ( id int )", # skip + "create table {{ target.schema }}.my_hook_table ( id int )", # success + "drop table {{ target.schema }}.my_hook_table", # success + "insert into {{ target.schema }}.my_hook_table (id) values (1, 2, 3)", # fail + "create table {{ target.schema }}.my_hook_table ( id int )", # skip ], "flags": flags, } @@ -24,7 +24,7 @@ def project_config_update(self, flags): @pytest.fixture(scope="class") def models(self): return { - "my_model.sql": "select * from {{ target.schema }}.my_start_table" + "my_model.sql": "select * from {{ target.schema }}.my_hook_table" " union all " "select * from {{ target.schema }}.my_end_table" } @@ -57,7 +57,7 @@ def test_results(self, project, log_counts, my_model_run_status): (result["unique_id"], result["status"]) for result in run_results["results"] ] == expected_results assert ( - f'relation "{project.test_schema}.my_start_table" does not exist' + f'relation "{project.test_schema}.my_hook_table" does not exist' in run_results["results"][2]["message"] ) @@ -87,10 +87,10 @@ class Test__ModelPass__EndHookFail: def project_config_update(self): return { "on-run-end": [ - "create table {{ target.schema }}.my_start_table ( id int )", # success - "drop table {{ target.schema }}.my_start_table", # success - "insert into {{ target.schema }}.my_start_table (id) values (1, 2, 3)", # fail - "create table {{ target.schema }}.my_start_table ( id int )", # skip + "create table {{ target.schema }}.my_hook_table ( id int )", # success + "drop table {{ target.schema }}.my_hook_table", # success + "insert into {{ target.schema }}.my_hook_table (id) values (1, 2, 3)", # fail + "create table {{ target.schema }}.my_hook_table ( id int )", # skip ], } @@ -99,9 +99,7 @@ def models(self): return {"my_model.sql": "select 1"} def test_results(self, project): - results, log_output = run_dbt_and_capture( - ["--debug", "run", "--log-format", "json"], expect_pass=False - ) + results, log_output = run_dbt_and_capture(["run"], expect_pass=False) expected_results = [ ("model.test.my_model", RunStatus.Success), @@ -120,6 +118,39 @@ def test_results(self, project): (result["unique_id"], result["status"]) for result in run_results["results"] ] == expected_results assert ( - f'relation "{project.test_schema}.my_start_table" does not exist' + f'relation "{project.test_schema}.my_hook_table" does not exist' in run_results["results"][3]["message"] ) + + +class Test__SelectorEmpty__NoHooksRan: + @pytest.fixture(scope="class") + def project_config_update(self): + return { + "on-run-start": [ + "create table {{ target.schema }}.my_hook_table ( id int )", # success + "drop table {{ target.schema }}.my_hook_table", # success + ], + "on-run-end": [ + "create table {{ target.schema }}.my_hook_table ( id int )", # success + "drop table {{ target.schema }}.my_hook_table", # success + ], + } + + @pytest.fixture(scope="class") + def models(self): + return {"my_model.sql": "select 1"} + + def test_results(self, project): + results, log_output = run_dbt_and_capture( + ["--debug", "run", "--select", "tag:no_such_tag", "--log-format", "json"] + ) + + assert results.results == [] + assert ( + "The selection criterion 'tag:no_such_tag' does not match any enabled nodes" + in log_output + ) + + run_results = get_artifact(project.project_root, "target", "run_results.json") + assert [run_results["results"]] == [] From 0dcc944525e6016cb2be54733df79dae326aaa92 Mon Sep 17 00:00:00 2001 From: Kshitij Aranke Date: Wed, 18 Sep 2024 16:28:26 +0100 Subject: [PATCH 16/24] Add test for retry to always run hooks --- tests/functional/retry/test_retry.py | 23 +++++++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/tests/functional/retry/test_retry.py b/tests/functional/retry/test_retry.py index 012db25e42f..3a002a8e361 100644 --- a/tests/functional/retry/test_retry.py +++ b/tests/functional/retry/test_retry.py @@ -365,3 +365,26 @@ def test_retry_target_path_flag(self, project): results = run_dbt(["retry", "--state", "artifacts", "--target-path", "my_target_path"]) assert len(results) == 1 assert Path("my_target_path").is_dir() + + +class TestRetryHooksAlwaysRun: + @pytest.fixture(scope="class") + def project_config_update(self): + return { + "on-run-start": ["select 1;"], + "on-run-end": ["select 2;"], + } + + @pytest.fixture(scope="class") + def models(self): + return { + "sample_model.sql": models__sample_model, + } + + def test_retry_hooks_always_run(self, project): + res = run_dbt(["run", "--target-path", "target"], expect_pass=False) + assert len(res) == 3 + + write_file(models__second_model, "models", "sample_model.sql") + res = run_dbt(["retry", "--state", "target"]) + assert len(res) == 3 From 66fbae62cea1355525b87fbbffa6552eb352ad64 Mon Sep 17 00:00:00 2001 From: Kshitij Aranke Date: Wed, 18 Sep 2024 16:30:06 +0100 Subject: [PATCH 17/24] Add changie --- .changes/unreleased/Features-20240918-162959.yaml | 6 ++++++ 1 file changed, 6 insertions(+) create mode 100644 .changes/unreleased/Features-20240918-162959.yaml diff --git a/.changes/unreleased/Features-20240918-162959.yaml b/.changes/unreleased/Features-20240918-162959.yaml new file mode 100644 index 00000000000..0213f4f0a8f --- /dev/null +++ b/.changes/unreleased/Features-20240918-162959.yaml @@ -0,0 +1,6 @@ +kind: Features +body: Add new 'skip_nodes_if_on_run_start_fails' to allow skipping nodes if on-run-start hooks fail +time: 2024-09-18T16:29:59.268422+01:00 +custom: + Author: aranke + Issue: "7387" From 4234f84315ee7b817b366f88504570edf7f4af6c Mon Sep 17 00:00:00 2001 From: Kshitij Aranke Date: Wed, 18 Sep 2024 16:31:54 +0100 Subject: [PATCH 18/24] Update Features-20240918-162959.yaml --- .changes/unreleased/Features-20240918-162959.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.changes/unreleased/Features-20240918-162959.yaml b/.changes/unreleased/Features-20240918-162959.yaml index 0213f4f0a8f..0fe132f5e11 100644 --- a/.changes/unreleased/Features-20240918-162959.yaml +++ b/.changes/unreleased/Features-20240918-162959.yaml @@ -1,5 +1,5 @@ kind: Features -body: Add new 'skip_nodes_if_on_run_start_fails' to allow skipping nodes if on-run-start hooks fail +body: Add new 'skip_nodes_if_on_run_start_fails' behavior change flag time: 2024-09-18T16:29:59.268422+01:00 custom: Author: aranke From 5abd9fb117aedd3628df4f0570c100f0ac2e5c23 Mon Sep 17 00:00:00 2001 From: Kshitij Aranke Date: Wed, 18 Sep 2024 16:32:30 +0100 Subject: [PATCH 19/24] Update Features-20240918-162959.yaml --- .changes/unreleased/Features-20240918-162959.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.changes/unreleased/Features-20240918-162959.yaml b/.changes/unreleased/Features-20240918-162959.yaml index 0fe132f5e11..62c037540fb 100644 --- a/.changes/unreleased/Features-20240918-162959.yaml +++ b/.changes/unreleased/Features-20240918-162959.yaml @@ -1,5 +1,5 @@ kind: Features -body: Add new 'skip_nodes_if_on_run_start_fails' behavior change flag +body: Create 'skip_nodes_if_on_run_start_fails' behavior change flag time: 2024-09-18T16:29:59.268422+01:00 custom: Author: aranke From 9c1ff8a8947618d8adbc71ac09e523659458ad1f Mon Sep 17 00:00:00 2001 From: Kshitij Aranke Date: Wed, 18 Sep 2024 17:11:47 +0100 Subject: [PATCH 20/24] Change tests --- tests/functional/adapter/hooks/test_on_run_hooks.py | 2 +- tests/functional/logging/test_logging.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/functional/adapter/hooks/test_on_run_hooks.py b/tests/functional/adapter/hooks/test_on_run_hooks.py index 738873c27eb..ffb34c3f23a 100644 --- a/tests/functional/adapter/hooks/test_on_run_hooks.py +++ b/tests/functional/adapter/hooks/test_on_run_hooks.py @@ -153,4 +153,4 @@ def test_results(self, project): ) run_results = get_artifact(project.project_root, "target", "run_results.json") - assert [run_results["results"]] == [] + assert run_results["results"] == [] diff --git a/tests/functional/logging/test_logging.py b/tests/functional/logging/test_logging.py index bc769466a4b..e29bd9c7cac 100644 --- a/tests/functional/logging/test_logging.py +++ b/tests/functional/logging/test_logging.py @@ -87,7 +87,7 @@ def test_formatted_logs(project, logs_dir): if log_event == "Formatting": formatted_json_lines += 1 - assert formatted_json_lines == 6 + assert formatted_json_lines == 5 def test_invalid_event_value(project, logs_dir): From 53ee509c9d658cbb092237e4c7a4d731527ab2ef Mon Sep 17 00:00:00 2001 From: Kshitij Aranke Date: Fri, 20 Sep 2024 14:07:47 +0100 Subject: [PATCH 21/24] Change formatting order, rename variable --- core/dbt/task/run.py | 8 ++++++-- core/dbt/task/runnable.py | 11 +++++------ 2 files changed, 11 insertions(+), 8 deletions(-) diff --git a/core/dbt/task/run.py b/core/dbt/task/run.py index 4c7e706780d..a10abd565f7 100644 --- a/core/dbt/task/run.py +++ b/core/dbt/task/run.py @@ -532,6 +532,9 @@ def safe_run_hooks( started_at = datetime.utcnow() ordered_hooks = self.get_hooks_by_type(hook_type) + if hook_type == RunHookType.End and ordered_hooks: + fire_event(Formatting("")) + # on-run-* hooks should run outside a transaction. This happens because psycopg2 automatically begins a transaction when a connection is created. adapter.clear_transaction() if not ordered_hooks: @@ -541,8 +544,6 @@ def safe_run_hooks( failed = False num_hooks = len(ordered_hooks) - fire_event(Formatting("")) - for idx, hook in enumerate(ordered_hooks, 1): with log_contextvars(node_info=hook.node_info): hook.index = idx @@ -605,6 +606,9 @@ def safe_run_hooks( ) ) + if hook_type == RunHookType.Start and ordered_hooks: + fire_event(Formatting("")) + return status def print_results_line(self, results, execution_time) -> None: diff --git a/core/dbt/task/runnable.py b/core/dbt/task/runnable.py index cc562c490f2..be926e7275c 100644 --- a/core/dbt/task/runnable.py +++ b/core/dbt/task/runnable.py @@ -495,6 +495,7 @@ def print_results_line(self, node_results, elapsed): def execute_with_hooks(self, selected_uids: AbstractSet[str]): adapter = get_adapter(self.config) + fire_event(Formatting("")) fire_event( ConcurrencyLine( num_threads=self.config.threads, @@ -502,11 +503,11 @@ def execute_with_hooks(self, selected_uids: AbstractSet[str]): node_count=self.num_nodes, ) ) + fire_event(Formatting("")) self.started_at = time.time() try: before_run_status = self.before_run(adapter, selected_uids) - fire_event(Formatting("")) if before_run_status == RunStatus.Success or ( not get_flags().skip_nodes_if_on_run_start_fails @@ -531,9 +532,9 @@ def execute_with_hooks(self, selected_uids: AbstractSet[str]): node_info=node.node_info, ) ) - skipped_node = mark_node_as_skipped(node, executed_node_ids, None) - if skipped_node: - self.node_results.append(skipped_node) + skipped_node_result = mark_node_as_skipped(node, executed_node_ids, None) + if skipped_node_result: + self.node_results.append(skipped_node_result) self.after_run(adapter, res) finally: @@ -561,7 +562,6 @@ def run(self): ) if len(self._flattened_nodes) == 0: - fire_event(Formatting("")) warn_or_error(NothingToDo()) result = self.get_result( results=[], @@ -569,7 +569,6 @@ def run(self): elapsed_time=0.0, ) else: - fire_event(Formatting("")) selected_uids = frozenset(n.unique_id for n in self._flattened_nodes) result = self.execute_with_hooks(selected_uids) From 2ad3dd25166b1207f4daea33878737ec2f2b64d6 Mon Sep 17 00:00:00 2001 From: Kshitij Aranke Date: Thu, 26 Sep 2024 16:18:04 +0100 Subject: [PATCH 22/24] make imports absolute --- core/dbt/task/run.py | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/core/dbt/task/run.py b/core/dbt/task/run.py index b72c730f406..05b3287a0d1 100644 --- a/core/dbt/task/run.py +++ b/core/dbt/task/run.py @@ -17,7 +17,9 @@ TimingInfo, ) from dbt.artifacts.schemas.run import RunResult +from dbt.cli.flags import Flags from dbt.clients.jinja import MacroGenerator +from dbt.config import RuntimeConfig from dbt.context.providers import generate_runtime_model_context from dbt.contracts.graph.manifest import Manifest from dbt.contracts.graph.nodes import HookNode, ModelNode, ResultNode @@ -34,6 +36,8 @@ from dbt.materializations.incremental.microbatch import MicrobatchBuilder from dbt.node_types import NodeType, RunHookType from dbt.task.base import BaseRunner +from dbt.task.compile import CompileRunner, CompileTask +from dbt.task.printer import get_counts, print_run_end_messages from dbt_common.clients.jinja import MacroProtocol from dbt_common.dataclass_schema import dbtClassMixin from dbt_common.events.base_types import EventLevel @@ -42,11 +46,6 @@ from dbt_common.events.types import Formatting from dbt_common.exceptions import DbtValidationError -from ..cli.flags import Flags -from ..config import RuntimeConfig -from .compile import CompileRunner, CompileTask -from .printer import get_counts, print_run_end_messages - @functools.total_ordering class BiggestName(str): From dbfaa226649e534b5ca8b7050701a18b3e8e1d71 Mon Sep 17 00:00:00 2001 From: Kshitij Aranke Date: Thu, 26 Sep 2024 16:52:15 +0100 Subject: [PATCH 23/24] ignore operations unless we're in run-operation --- core/dbt/task/retry.py | 20 +++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) diff --git a/core/dbt/task/retry.py b/core/dbt/task/retry.py index eb7b22325ad..83b29695f66 100644 --- a/core/dbt/task/retry.py +++ b/core/dbt/task/retry.py @@ -121,13 +121,15 @@ def __init__(self, args: Flags, config: RuntimeConfig) -> None: self.task_class = TASK_DICT.get(self.previous_command_name) # type: ignore def run(self): - unique_ids = set( - [ - result.unique_id - for result in self.previous_results.results - if result.status in RETRYABLE_STATUSES - ] - ) + unique_ids = { + result.unique_id + for result in self.previous_results.results + if result.status in RETRYABLE_STATUSES + and not ( + self.previous_command_name != "run-operation" + and result.unique_id.startswith("operation") + ) + } batch_map = { result.unique_id: result.batch_results.failed @@ -135,6 +137,10 @@ def run(self): if result.status == NodeStatus.PartialSuccess and result.batch_results is not None and len(result.batch_results.failed) > 0 + and not ( + self.previous_command_name != "run-operation" + and result.unique_id.startswith("operation") + ) } class TaskWrapper(self.task_class): From d80820428f3c6466ba8e23c30c02b47d685d4d1f Mon Sep 17 00:00:00 2001 From: Kshitij Aranke Date: Fri, 27 Sep 2024 15:07:48 +0100 Subject: [PATCH 24/24] Add test for fixing retry hook --- core/dbt/task/retry.py | 4 +- tests/functional/retry/test_retry.py | 68 +++++++++++++++++++++++++++- 2 files changed, 69 insertions(+), 3 deletions(-) diff --git a/core/dbt/task/retry.py b/core/dbt/task/retry.py index 83b29695f66..4f08804d191 100644 --- a/core/dbt/task/retry.py +++ b/core/dbt/task/retry.py @@ -127,7 +127,7 @@ def run(self): if result.status in RETRYABLE_STATUSES and not ( self.previous_command_name != "run-operation" - and result.unique_id.startswith("operation") + and result.unique_id.startswith("operation.") ) } @@ -139,7 +139,7 @@ def run(self): and len(result.batch_results.failed) > 0 and not ( self.previous_command_name != "run-operation" - and result.unique_id.startswith("operation") + and result.unique_id.startswith("operation.") ) } diff --git a/tests/functional/retry/test_retry.py b/tests/functional/retry/test_retry.py index 3a002a8e361..0a9e94f2e06 100644 --- a/tests/functional/retry/test_retry.py +++ b/tests/functional/retry/test_retry.py @@ -5,7 +5,7 @@ from dbt.contracts.results import RunStatus, TestStatus from dbt.exceptions import DbtRuntimeError, TargetNotFoundError -from dbt.tests.util import rm_file, run_dbt, write_file +from dbt.tests.util import rm_file, run_dbt, update_config_file, write_file from tests.functional.retry.fixtures import ( macros__alter_timezone_sql, models__sample_model, @@ -388,3 +388,69 @@ def test_retry_hooks_always_run(self, project): write_file(models__second_model, "models", "sample_model.sql") res = run_dbt(["retry", "--state", "target"]) assert len(res) == 3 + + +class TestFixRetryHook: + @pytest.fixture(scope="class") + def project_config_update(self): + return { + "flags": { + "skip_nodes_if_on_run_start_fails": True, + }, + "on-run-start": [ + "select 1 as id", + "select column_does_not_exist", + "select 2 as id", + ], + } + + @pytest.fixture(scope="class") + def models(self): + return { + "sample_model.sql": "select 1 as id, 1 as foo", + "second_model.sql": models__second_model, + "union_model.sql": models__union_model, + } + + def test_fix_retry_hook(self, project): + res = run_dbt(["run"], expect_pass=False) + assert {r.node.unique_id: r.status for r in res.results} == { + "operation.test.test-on-run-start-0": RunStatus.Success, + "operation.test.test-on-run-start-1": RunStatus.Error, + "operation.test.test-on-run-start-2": RunStatus.Skipped, + "model.test.sample_model": RunStatus.Skipped, + "model.test.second_model": RunStatus.Skipped, + "model.test.union_model": RunStatus.Skipped, + } + + res = run_dbt(["retry"], expect_pass=False) + assert {r.node.unique_id: r.status for r in res.results} == { + "operation.test.test-on-run-start-0": RunStatus.Success, + "operation.test.test-on-run-start-1": RunStatus.Error, + "operation.test.test-on-run-start-2": RunStatus.Skipped, + "model.test.sample_model": RunStatus.Skipped, + "model.test.second_model": RunStatus.Skipped, + "model.test.union_model": RunStatus.Skipped, + } + + new_dbt_project_yml = { + "flags": { + "skip_nodes_if_on_run_start_fails": True, + }, + "on-run-start": [ + "select 1 as id", + "select 3 as id", + "select 2 as id", + ], + } + + update_config_file(new_dbt_project_yml, project.project_root, "dbt_project.yml") + res = run_dbt(["retry"]) + assert {r.node.unique_id: r.status for r in res.results} == { + "operation.test.test-on-run-start-0": RunStatus.Success, + "operation.test.test-on-run-start-1": RunStatus.Success, + "operation.test.test-on-run-start-2": RunStatus.Success, + "model.test.sample_model": RunStatus.Success, + "model.test.second_model": RunStatus.Success, + "model.test.union_model": RunStatus.Success, + }