diff --git a/.changes/unreleased/Features-20240918-162959.yaml b/.changes/unreleased/Features-20240918-162959.yaml new file mode 100644 index 00000000000..62c037540fb --- /dev/null +++ b/.changes/unreleased/Features-20240918-162959.yaml @@ -0,0 +1,6 @@ +kind: Features +body: Create 'skip_nodes_if_on_run_start_fails' behavior change flag +time: 2024-09-18T16:29:59.268422+01:00 +custom: + Author: aranke + Issue: "7387" diff --git a/core/dbt/contracts/graph/nodes.py b/core/dbt/contracts/graph/nodes.py index f4b907a9a3c..d04d03d0f81 100644 --- a/core/dbt/contracts/graph/nodes.py +++ b/core/dbt/contracts/graph/nodes.py @@ -1669,6 +1669,7 @@ class ParsedMacroPatch(ParsedPatch): ResultNode = Union[ ManifestNode, SourceDefinition, + HookNode, ] # All nodes that can be in the DAG diff --git a/core/dbt/contracts/project.py b/core/dbt/contracts/project.py index e08131ecd8f..5c3b80e34f0 100644 --- a/core/dbt/contracts/project.py +++ b/core/dbt/contracts/project.py @@ -341,6 +341,7 @@ class ProjectFlags(ExtensibleDbtClassMixin): require_explicit_package_overrides_for_builtin_materializations: bool = True require_resource_names_without_spaces: bool = False source_freshness_run_project_hooks: bool = False + skip_nodes_if_on_run_start_fails: bool = False state_modified_compare_more_unrendered_values: bool = False @property @@ -349,6 +350,7 @@ def project_only_flags(self) -> Dict[str, Any]: "require_explicit_package_overrides_for_builtin_materializations": self.require_explicit_package_overrides_for_builtin_materializations, "require_resource_names_without_spaces": self.require_resource_names_without_spaces, "source_freshness_run_project_hooks": self.source_freshness_run_project_hooks, + "skip_nodes_if_on_run_start_fails": self.skip_nodes_if_on_run_start_fails, "state_modified_compare_more_unrendered_values": self.state_modified_compare_more_unrendered_values, } diff --git a/core/dbt/events/types.py b/core/dbt/events/types.py index e55e1cdd252..d5b6256a4f6 100644 --- a/core/dbt/events/types.py +++ b/core/dbt/events/types.py @@ -1516,10 +1516,20 @@ def code(self) -> str: return "Q033" def message(self) -> str: - msg = f"OK hook: {self.statement}" + if self.status == "success": + info = "OK" + status = green(info) + elif self.status == "skipped": + info = "SKIP" + status = yellow(info) + else: + info = "ERROR" + status = red(info) + msg = f"{info} hook: {self.statement}" + return format_fancy_output_line( msg=msg, - status=green(self.status), + status=status, index=self.index, total=self.total, execution_time=self.execution_time, diff --git a/core/dbt/task/clone.py b/core/dbt/task/clone.py index 68012861907..0e5b797ed64 100644 --- a/core/dbt/task/clone.py +++ b/core/dbt/task/clone.py @@ -1,7 +1,7 @@ import threading from typing import AbstractSet, Any, Collection, Iterable, List, Optional, Set, Type -from dbt.adapters.base import BaseRelation +from dbt.adapters.base import BaseAdapter, BaseRelation from dbt.artifacts.resources.types import NodeType from dbt.artifacts.schemas.run import RunResult, RunStatus from dbt.clients.jinja import MacroGenerator @@ -125,7 +125,7 @@ def get_model_schemas(self, adapter, selected_uids: Iterable[str]) -> Set[BaseRe return result - def before_run(self, adapter, selected_uids: AbstractSet[str]): + def before_run(self, adapter: BaseAdapter, selected_uids: AbstractSet[str]) -> RunStatus: with adapter.connection_named("master"): self.defer_to_manifest() # only create target schemas, but also cache defer_relation schemas @@ -133,6 +133,7 @@ def before_run(self, adapter, 
selected_uids: AbstractSet[str]): self.create_schemas(adapter, schemas_to_create) schemas_to_cache = self.get_model_schemas(adapter, selected_uids) self.populate_adapter_cache(adapter, schemas_to_cache) + return RunStatus.Success @property def resource_types(self) -> List[NodeType]: diff --git a/core/dbt/task/freshness.py b/core/dbt/task/freshness.py index eb1508acb36..06e78b17c7b 100644 --- a/core/dbt/task/freshness.py +++ b/core/dbt/task/freshness.py @@ -4,6 +4,7 @@ from typing import AbstractSet, Dict, List, Optional, Type from dbt import deprecations +from dbt.adapters.base import BaseAdapter from dbt.adapters.base.impl import FreshnessResponse from dbt.adapters.base.relation import BaseRelation from dbt.adapters.capability import Capability @@ -204,10 +205,25 @@ def get_node_selector(self): resource_types=[NodeType.Source], ) - def before_run(self, adapter, selected_uids: AbstractSet[str]) -> None: - super().before_run(adapter, selected_uids) - if adapter.supports(Capability.TableLastModifiedMetadataBatch): - self.populate_metadata_freshness_cache(adapter, selected_uids) + def before_run(self, adapter: BaseAdapter, selected_uids: AbstractSet[str]) -> RunStatus: + populate_metadata_freshness_cache_status = RunStatus.Success + + before_run_status = super().before_run(adapter, selected_uids) + + if before_run_status == RunStatus.Success and adapter.supports( + Capability.TableLastModifiedMetadataBatch + ): + populate_metadata_freshness_cache_status = self.populate_metadata_freshness_cache( + adapter, selected_uids + ) + + if ( + before_run_status == RunStatus.Success + and populate_metadata_freshness_cache_status == RunStatus.Success + ): + return RunStatus.Success + else: + return RunStatus.Error def get_runner(self, node) -> BaseRunner: freshness_runner = super().get_runner(node) @@ -243,7 +259,9 @@ def get_hooks_by_type(self, hook_type: RunHookType) -> List[HookNode]: deprecations.warn("source-freshness-project-hooks") return [] - def populate_metadata_freshness_cache(self, adapter, selected_uids: AbstractSet[str]) -> None: + def populate_metadata_freshness_cache( + self, adapter, selected_uids: AbstractSet[str] + ) -> RunStatus: if self.manifest is None: raise DbtInternalError("Manifest must be set to populate metadata freshness cache") @@ -266,6 +284,7 @@ def populate_metadata_freshness_cache(self, adapter, selected_uids: AbstractSet[ batch_metadata_sources ) self._metadata_freshness_cache.update(metadata_freshness_results) + return RunStatus.Success except Exception as e: # This error handling is intentionally very coarse. 
# If anything goes wrong during batch metadata calculation, we can safely @@ -276,6 +295,7 @@ def populate_metadata_freshness_cache(self, adapter, selected_uids: AbstractSet[ Note(msg=f"Metadata freshness could not be computed in batch: {e}"), EventLevel.WARN, ) + return RunStatus.Error def get_freshness_metadata_cache(self) -> Dict[BaseRelation, FreshnessResult]: return self._metadata_freshness_cache diff --git a/core/dbt/task/printer.py b/core/dbt/task/printer.py index a88690e1046..58a39552450 100644 --- a/core/dbt/task/printer.py +++ b/core/dbt/task/printer.py @@ -33,7 +33,8 @@ def get_counts(flat_nodes) -> str: counts[t] = counts.get(t, 0) + 1 - stat_line = ", ".join([pluralize(v, k).replace("_", " ") for k, v in counts.items()]) + sorted_items = sorted(counts.items(), key=lambda x: x[0]) + stat_line = ", ".join([pluralize(v, k).replace("_", " ") for k, v in sorted_items]) return stat_line diff --git a/core/dbt/task/retry.py b/core/dbt/task/retry.py index eb7b22325ad..4f08804d191 100644 --- a/core/dbt/task/retry.py +++ b/core/dbt/task/retry.py @@ -121,13 +121,15 @@ def __init__(self, args: Flags, config: RuntimeConfig) -> None: self.task_class = TASK_DICT.get(self.previous_command_name) # type: ignore def run(self): - unique_ids = set( - [ - result.unique_id - for result in self.previous_results.results - if result.status in RETRYABLE_STATUSES - ] - ) + unique_ids = { + result.unique_id + for result in self.previous_results.results + if result.status in RETRYABLE_STATUSES + and not ( + self.previous_command_name != "run-operation" + and result.unique_id.startswith("operation.") + ) + } batch_map = { result.unique_id: result.batch_results.failed @@ -135,6 +137,10 @@ def run(self): if result.status == NodeStatus.PartialSuccess and result.batch_results is not None and len(result.batch_results.failed) > 0 + and not ( + self.previous_command_name != "run-operation" + and result.unique_id.startswith("operation.") + ) } class TaskWrapper(self.task_class): diff --git a/core/dbt/task/run.py b/core/dbt/task/run.py index ac8298cc9d9..12e17de02c9 100644 --- a/core/dbt/task/run.py +++ b/core/dbt/task/run.py @@ -1,30 +1,25 @@ import functools import os import threading -import time from datetime import datetime from typing import AbstractSet, Any, Dict, Iterable, List, Optional, Set, Tuple, Type from dbt import tracking, utils -from dbt.adapters.base import BaseRelation -from dbt.adapters.events.types import ( - DatabaseErrorRunningHook, - FinishedRunningStats, - HooksRunning, -) +from dbt.adapters.base import BaseAdapter, BaseRelation +from dbt.adapters.events.types import FinishedRunningStats from dbt.adapters.exceptions import MissingMaterializationError from dbt.artifacts.resources import Hook from dbt.artifacts.schemas.batch_results import BatchResults, BatchType from dbt.artifacts.schemas.results import ( - BaseResult, NodeStatus, RunningStatus, RunStatus, + TimingInfo, ) from dbt.artifacts.schemas.run import RunResult from dbt.cli.flags import Flags from dbt.clients.jinja import MacroGenerator -from dbt.config.runtime import RuntimeConfig +from dbt.config import RuntimeConfig from dbt.context.providers import generate_runtime_model_context from dbt.contracts.graph.manifest import Manifest from dbt.contracts.graph.nodes import HookNode, ModelNode, ResultNode @@ -40,7 +35,10 @@ from dbt.hooks import get_hook_dict from dbt.materializations.incremental.microbatch import MicrobatchBuilder from dbt.node_types import NodeType, RunHookType +from dbt.task import group_lookup from dbt.task.base import 
BaseRunner
+from dbt.task.compile import CompileRunner, CompileTask
+from dbt.task.printer import get_counts, print_run_end_messages
 from dbt_common.clients.jinja import MacroProtocol
 from dbt_common.dataclass_schema import dbtClassMixin
 from dbt_common.events.base_types import EventLevel
@@ -49,29 +47,6 @@ from dbt_common.events.types import Formatting
 from dbt_common.exceptions import DbtValidationError
 
-from . import group_lookup
-from .compile import CompileRunner, CompileTask
-from .printer import get_counts, print_run_end_messages
-
-
-class Timer:
-    def __init__(self) -> None:
-        self.start = None
-        self.end = None
-
-    @property
-    def elapsed(self):
-        if self.start is None or self.end is None:
-            return None
-        return self.end - self.start
-
-    def __enter__(self):
-        self.start = time.time()
-        return self
-
-    def __exit__(self, exc_type, exc_value, exc_tracebck):
-        self.end = time.time()
-
 
 @functools.total_ordering
 class BiggestName(str):
@@ -107,6 +82,21 @@ def get_hook(source, index):
     return Hook.from_dict(hook_dict)
 
 
+def get_execution_status(sql: str, adapter: BaseAdapter) -> Tuple[RunStatus, str]:
+    if not sql.strip():
+        return RunStatus.Success, "OK"
+
+    try:
+        response, _ = adapter.execute(sql, auto_begin=False, fetch=False)
+        status = RunStatus.Success
+        message = response._message
+    except DbtRuntimeError as exc:
+        status = RunStatus.Error
+        message = exc.msg
+
+    return status, message
+
+
 def track_model_run(index, num_nodes, run_model_result):
     if tracking.active_user is None:
         raise DbtInternalError("cannot track model run with no active user")
@@ -577,13 +567,8 @@ def __init__(
         batch_map: Optional[Dict[str, List[BatchType]]] = None,
     ) -> None:
         super().__init__(args, config, manifest)
-        self.ran_hooks: List[HookNode] = []
-        self._total_executed = 0
         self.batch_map = batch_map
 
-    def index_offset(self, value: int) -> int:
-        return self._total_executed + value
-
     def raise_on_first_error(self) -> bool:
         return False
 
@@ -614,88 +599,93 @@ def get_hooks_by_type(self, hook_type: RunHookType) -> List[HookNode]:
         hooks.sort(key=self._hook_keyfunc)
         return hooks
 
-    def run_hooks(self, adapter, hook_type: RunHookType, extra_context) -> None:
+    def safe_run_hooks(
+        self, adapter: BaseAdapter, hook_type: RunHookType, extra_context: Dict[str, Any]
+    ) -> RunStatus:
+        started_at = datetime.utcnow()
         ordered_hooks = self.get_hooks_by_type(hook_type)
 
-        # on-run-* hooks should run outside of a transaction. This happens
-        # b/c psycopg2 automatically begins a transaction when a connection
-        # is created.
+        if hook_type == RunHookType.End and ordered_hooks:
+            fire_event(Formatting(""))
+
+        # on-run-* hooks should run outside a transaction. This happens because psycopg2 automatically begins a transaction when a connection is created. 
adapter.clear_transaction() if not ordered_hooks: - return - num_hooks = len(ordered_hooks) + return RunStatus.Success - fire_event(Formatting("")) - fire_event(HooksRunning(num_hooks=num_hooks, hook_type=hook_type)) + status = RunStatus.Success + failed = False + num_hooks = len(ordered_hooks) - for idx, hook in enumerate(ordered_hooks, start=1): - # We want to include node_info in the appropriate log files, so use - # log_contextvars + for idx, hook in enumerate(ordered_hooks, 1): with log_contextvars(node_info=hook.node_info): - hook.update_event_status( - started_at=datetime.utcnow().isoformat(), node_status=RunningStatus.Started - ) - sql = self.get_hook_sql(adapter, hook, idx, num_hooks, extra_context) - - hook_text = "{}.{}.{}".format(hook.package_name, hook_type, hook.index) - fire_event( - LogHookStartLine( - statement=hook_text, - index=idx, - total=num_hooks, - node_info=hook.node_info, + hook.index = idx + hook_name = f"{hook.package_name}.{hook_type}.{hook.index - 1}" + execution_time = 0.0 + timing = [] + failures = 1 + + if not failed: + hook.update_event_status( + started_at=started_at.isoformat(), node_status=RunningStatus.Started ) - ) + sql = self.get_hook_sql(adapter, hook, hook.index, num_hooks, extra_context) + fire_event( + LogHookStartLine( + statement=hook_name, + index=hook.index, + total=num_hooks, + node_info=hook.node_info, + ) + ) + + status, message = get_execution_status(sql, adapter) + finished_at = datetime.utcnow() + hook.update_event_status(finished_at=finished_at.isoformat()) + execution_time = (finished_at - started_at).total_seconds() + timing = [TimingInfo(hook_name, started_at, finished_at)] + failures = 0 if status == RunStatus.Success else 1 - with Timer() as timer: - if len(sql.strip()) > 0: - response, _ = adapter.execute(sql, auto_begin=False, fetch=False) - status = response._message + if status == RunStatus.Success: + message = f"{hook_name} passed" else: - status = "OK" + message = f"{hook_name} failed, error:\n {message}" + failed = True + else: + status = RunStatus.Skipped + message = f"{hook_name} skipped" + + self.node_results.append( + RunResult( + status=status, + thread_id="main", + timing=timing, + message=message, + adapter_response={}, + execution_time=execution_time, + failures=failures, + node=hook, + ) + ) - self.ran_hooks.append(hook) - hook.update_event_status(finished_at=datetime.utcnow().isoformat()) - hook.update_event_status(node_status=RunStatus.Success) fire_event( LogHookEndLine( - statement=hook_text, + statement=hook_name, status=status, - index=idx, + index=hook.index, total=num_hooks, - execution_time=timer.elapsed, + execution_time=execution_time, node_info=hook.node_info, ) ) - # `_event_status` dict is only used for logging. 
Make sure - # it gets deleted when we're done with it - hook.clear_event_status() - self._total_executed += len(ordered_hooks) + if hook_type == RunHookType.Start and ordered_hooks: + fire_event(Formatting("")) - fire_event(Formatting("")) - - def safe_run_hooks( - self, adapter, hook_type: RunHookType, extra_context: Dict[str, Any] - ) -> None: - try: - self.run_hooks(adapter, hook_type, extra_context) - except DbtRuntimeError as exc: - fire_event(DatabaseErrorRunningHook(hook_type=hook_type.value)) - self.node_results.append( - BaseResult( - status=RunStatus.Error, - thread_id="main", - timing=[], - message=f"{hook_type.value} failed, error:\n {exc.msg}", - adapter_response={}, - execution_time=0, - failures=1, - ) - ) + return status def print_results_line(self, results, execution_time) -> None: - nodes = [r.node for r in results if hasattr(r, "node")] + self.ran_hooks + nodes = [r.node for r in results if hasattr(r, "node")] stat_line = get_counts(nodes) execution = "" @@ -718,15 +708,16 @@ def populate_microbatch_batches(self, selected_uids: AbstractSet[str]): if isinstance(node, ModelNode): node.batches = self.batch_map[uid] - def before_run(self, adapter, selected_uids: AbstractSet[str]) -> None: + def before_run(self, adapter: BaseAdapter, selected_uids: AbstractSet[str]) -> RunStatus: with adapter.connection_named("master"): self.defer_to_manifest() required_schemas = self.get_model_schemas(adapter, selected_uids) self.create_schemas(adapter, required_schemas) self.populate_adapter_cache(adapter, required_schemas) self.populate_microbatch_batches(selected_uids) - self.safe_run_hooks(adapter, RunHookType.Start, {}) group_lookup.init(self.manifest, selected_uids) + run_hooks_status = self.safe_run_hooks(adapter, RunHookType.Start, {}) + return run_hooks_status def after_run(self, adapter, results) -> None: # in on-run-end hooks, provide the value 'database_schemas', which is a @@ -741,8 +732,6 @@ def after_run(self, adapter, results) -> None: and r.status not in (NodeStatus.Error, NodeStatus.Fail, NodeStatus.Skipped) } - self._total_executed += len(results) - extras = { "schemas": list({s for _, s in database_schema_set}), "results": results, diff --git a/core/dbt/task/runnable.py b/core/dbt/task/runnable.py index e7e66ce2f60..a37308f81ea 100644 --- a/core/dbt/task/runnable.py +++ b/core/dbt/task/runnable.py @@ -11,7 +11,7 @@ import dbt.tracking import dbt.utils import dbt_common.utils.formatting -from dbt.adapters.base import BaseRelation +from dbt.adapters.base import BaseAdapter, BaseRelation from dbt.adapters.factory import get_adapter from dbt.artifacts.schemas.results import ( BaseResult, @@ -36,6 +36,7 @@ NodeStart, NothingToDo, QueryCancelationUnsupported, + SkippingDetails, ) from dbt.exceptions import DbtInternalError, DbtRuntimeError, FailFastError from dbt.flags import get_flags @@ -65,6 +66,14 @@ class GraphRunnableMode(StrEnum): Independent = "independent" +def mark_node_as_skipped( + node: ResultNode, executed_node_ids: Set[str], message: Optional[str] +) -> Optional[RunResult]: + if node.unique_id not in executed_node_ids: + return RunResult.from_node(node, RunStatus.Skipped, message) + return None + + class GraphRunnableTask(ConfiguredTask): MARK_DEPENDENT_ERRORS_STATUSES = [NodeStatus.Error, NodeStatus.PartialSuccess] @@ -390,14 +399,6 @@ def _cancel_connections(self, pool): def execute_nodes(self): num_threads = self.config.threads - target_name = self.config.target_name - - fire_event( - ConcurrencyLine( - num_threads=num_threads, target_name=target_name, 
node_count=self.num_nodes - ) - ) - fire_event(Formatting("")) pool = ThreadPool(num_threads, self._pool_thread_initializer, [get_invocation_context()]) try: @@ -405,12 +406,13 @@ def execute_nodes(self): except FailFastError as failure: self._cancel_connections(pool) - executed_node_ids = [r.node.unique_id for r in self.node_results] + executed_node_ids = {r.node.unique_id for r in self.node_results} + message = "Skipping due to fail_fast" - for r in self._flattened_nodes: - if r.unique_id not in executed_node_ids: + for node in self._flattened_nodes: + if node.unique_id not in executed_node_ids: self.node_results.append( - RunResult.from_node(r, RunStatus.Skipped, "Skipping due to fail_fast") + mark_node_as_skipped(node, executed_node_ids, message) ) print_run_result_error(failure.result) @@ -482,10 +484,11 @@ def populate_adapter_cache( {"adapter_cache_construction_elapsed": cache_populate_time} ) - def before_run(self, adapter, selected_uids: AbstractSet[str]): + def before_run(self, adapter: BaseAdapter, selected_uids: AbstractSet[str]) -> RunStatus: with adapter.connection_named("master"): self.defer_to_manifest() self.populate_adapter_cache(adapter) + return RunStatus.Success def after_run(self, adapter, results) -> None: pass @@ -495,10 +498,48 @@ def print_results_line(self, node_results, elapsed): def execute_with_hooks(self, selected_uids: AbstractSet[str]): adapter = get_adapter(self.config) + + fire_event(Formatting("")) + fire_event( + ConcurrencyLine( + num_threads=self.config.threads, + target_name=self.config.target_name, + node_count=self.num_nodes, + ) + ) + fire_event(Formatting("")) + self.started_at = time.time() try: - self.before_run(adapter, selected_uids) - res = self.execute_nodes() + before_run_status = self.before_run(adapter, selected_uids) + + if before_run_status == RunStatus.Success or ( + not get_flags().skip_nodes_if_on_run_start_fails + ): + res = self.execute_nodes() + else: + executed_node_ids = { + r.node.unique_id for r in self.node_results if hasattr(r, "node") + } + + res = [] + + for index, node in enumerate(self._flattened_nodes or []): + if node.unique_id not in executed_node_ids: + fire_event( + SkippingDetails( + resource_type=node.resource_type, + schema=node.schema, + node_name=node.name, + index=index + 1, + total=self.num_nodes, + node_info=node.node_info, + ) + ) + skipped_node_result = mark_node_as_skipped(node, executed_node_ids, None) + if skipped_node_result: + self.node_results.append(skipped_node_result) + self.after_run(adapter, res) finally: adapter.cleanup_connections() @@ -525,7 +566,6 @@ def run(self): ) if len(self._flattened_nodes) == 0: - fire_event(Formatting("")) warn_or_error(NothingToDo()) result = self.get_result( results=[], @@ -533,7 +573,6 @@ def run(self): elapsed_time=0.0, ) else: - fire_event(Formatting("")) selected_uids = frozenset(n.unique_id for n in self._flattened_nodes) result = self.execute_with_hooks(selected_uids) diff --git a/tests/functional/adapter/hooks/test_on_run_hooks.py b/tests/functional/adapter/hooks/test_on_run_hooks.py new file mode 100644 index 00000000000..ffb34c3f23a --- /dev/null +++ b/tests/functional/adapter/hooks/test_on_run_hooks.py @@ -0,0 +1,156 @@ +import pytest + +from dbt.artifacts.schemas.results import RunStatus +from dbt.tests.util import get_artifact, run_dbt_and_capture + + +class Test__StartHookFail__FlagIsNone__ModelFail: + @pytest.fixture(scope="class") + def flags(self): + return {} + + @pytest.fixture(scope="class") + def project_config_update(self, flags): + return { + 
"on-run-start": [ + "create table {{ target.schema }}.my_hook_table ( id int )", # success + "drop table {{ target.schema }}.my_hook_table", # success + "insert into {{ target.schema }}.my_hook_table (id) values (1, 2, 3)", # fail + "create table {{ target.schema }}.my_hook_table ( id int )", # skip + ], + "flags": flags, + } + + @pytest.fixture(scope="class") + def models(self): + return { + "my_model.sql": "select * from {{ target.schema }}.my_hook_table" + " union all " + "select * from {{ target.schema }}.my_end_table" + } + + @pytest.fixture(scope="class") + def log_counts(self): + return "PASS=2 WARN=0 ERROR=2 SKIP=1 TOTAL=5" + + @pytest.fixture(scope="class") + def my_model_run_status(self): + return RunStatus.Error + + def test_results(self, project, log_counts, my_model_run_status): + results, log_output = run_dbt_and_capture(["run"], expect_pass=False) + + expected_results = [ + ("operation.test.test-on-run-start-0", RunStatus.Success), + ("operation.test.test-on-run-start-1", RunStatus.Success), + ("operation.test.test-on-run-start-2", RunStatus.Error), + ("operation.test.test-on-run-start-3", RunStatus.Skipped), + ("model.test.my_model", my_model_run_status), + ] + + assert [(result.node.unique_id, result.status) for result in results] == expected_results + assert log_counts in log_output + assert "4 project hooks, 1 view model" in log_output + + run_results = get_artifact(project.project_root, "target", "run_results.json") + assert [ + (result["unique_id"], result["status"]) for result in run_results["results"] + ] == expected_results + assert ( + f'relation "{project.test_schema}.my_hook_table" does not exist' + in run_results["results"][2]["message"] + ) + + +class Test__StartHookFail__FlagIsFalse__ModelFail(Test__StartHookFail__FlagIsNone__ModelFail): + @pytest.fixture(scope="class") + def flags(self): + return {"skip_nodes_if_on_run_start_fails": False} + + +class Test__StartHookFail__FlagIsTrue__ModelSkipped(Test__StartHookFail__FlagIsNone__ModelFail): + @pytest.fixture(scope="class") + def flags(self): + return {"skip_nodes_if_on_run_start_fails": True} + + @pytest.fixture(scope="class") + def log_counts(self): + return "PASS=2 WARN=0 ERROR=1 SKIP=2 TOTAL=5" + + @pytest.fixture(scope="class") + def my_model_run_status(self): + return RunStatus.Skipped + + +class Test__ModelPass__EndHookFail: + @pytest.fixture(scope="class") + def project_config_update(self): + return { + "on-run-end": [ + "create table {{ target.schema }}.my_hook_table ( id int )", # success + "drop table {{ target.schema }}.my_hook_table", # success + "insert into {{ target.schema }}.my_hook_table (id) values (1, 2, 3)", # fail + "create table {{ target.schema }}.my_hook_table ( id int )", # skip + ], + } + + @pytest.fixture(scope="class") + def models(self): + return {"my_model.sql": "select 1"} + + def test_results(self, project): + results, log_output = run_dbt_and_capture(["run"], expect_pass=False) + + expected_results = [ + ("model.test.my_model", RunStatus.Success), + ("operation.test.test-on-run-end-0", RunStatus.Success), + ("operation.test.test-on-run-end-1", RunStatus.Success), + ("operation.test.test-on-run-end-2", RunStatus.Error), + ("operation.test.test-on-run-end-3", RunStatus.Skipped), + ] + + assert [(result.node.unique_id, result.status) for result in results] == expected_results + assert "PASS=3 WARN=0 ERROR=1 SKIP=1 TOTAL=5" in log_output + assert "4 project hooks, 1 view model" in log_output + + run_results = get_artifact(project.project_root, "target", "run_results.json") + assert 
[ + (result["unique_id"], result["status"]) for result in run_results["results"] + ] == expected_results + assert ( + f'relation "{project.test_schema}.my_hook_table" does not exist' + in run_results["results"][3]["message"] + ) + + +class Test__SelectorEmpty__NoHooksRan: + @pytest.fixture(scope="class") + def project_config_update(self): + return { + "on-run-start": [ + "create table {{ target.schema }}.my_hook_table ( id int )", # success + "drop table {{ target.schema }}.my_hook_table", # success + ], + "on-run-end": [ + "create table {{ target.schema }}.my_hook_table ( id int )", # success + "drop table {{ target.schema }}.my_hook_table", # success + ], + } + + @pytest.fixture(scope="class") + def models(self): + return {"my_model.sql": "select 1"} + + def test_results(self, project): + results, log_output = run_dbt_and_capture( + ["--debug", "run", "--select", "tag:no_such_tag", "--log-format", "json"] + ) + + assert results.results == [] + assert ( + "The selection criterion 'tag:no_such_tag' does not match any enabled nodes" + in log_output + ) + + run_results = get_artifact(project.project_root, "target", "run_results.json") + assert run_results["results"] == [] diff --git a/tests/functional/adapter/hooks/test_run_hooks.py b/tests/functional/adapter/hooks/test_run_hooks.py deleted file mode 100644 index f8bec5c6aeb..00000000000 --- a/tests/functional/adapter/hooks/test_run_hooks.py +++ /dev/null @@ -1,161 +0,0 @@ -import os -from pathlib import Path - -import pytest - -from dbt.tests.util import check_table_does_not_exist, run_dbt -from tests.functional.adapter.hooks.fixtures import ( - macros__before_and_after, - macros__hook, - macros_missing_column, - models__hooks, - models__missing_column, - seeds__example_seed_csv, -) - - -class TestPrePostRunHooks(object): - @pytest.fixture(scope="function") - def setUp(self, project): - project.run_sql_file(project.test_data_dir / Path("seed_run.sql")) - project.run_sql(f"drop table if exists { project.test_schema }.schemas") - project.run_sql(f"drop table if exists { project.test_schema }.db_schemas") - os.environ["TERM_TEST"] = "TESTING" - - @pytest.fixture(scope="class") - def macros(self): - return {"hook.sql": macros__hook, "before-and-after.sql": macros__before_and_after} - - @pytest.fixture(scope="class") - def models(self): - return {"hooks.sql": models__hooks} - - @pytest.fixture(scope="class") - def seeds(self): - return {"example_seed.csv": seeds__example_seed_csv} - - @pytest.fixture(scope="class") - def project_config_update(self): - return { - # The create and drop table statements here validate that these hooks run - # in the same order that they are defined. Drop before create is an error. - # Also check that the table does not exist below. 
- "on-run-start": [ - "{{ custom_run_hook('start', target, run_started_at, invocation_id) }}", - "create table {{ target.schema }}.start_hook_order_test ( id int )", - "drop table {{ target.schema }}.start_hook_order_test", - "{{ log(env_var('TERM_TEST'), info=True) }}", - ], - "on-run-end": [ - "{{ custom_run_hook('end', target, run_started_at, invocation_id) }}", - "create table {{ target.schema }}.end_hook_order_test ( id int )", - "drop table {{ target.schema }}.end_hook_order_test", - "create table {{ target.schema }}.schemas ( schema text )", - "insert into {{ target.schema }}.schemas (schema) values {% for schema in schemas %}( '{{ schema }}' ){% if not loop.last %},{% endif %}{% endfor %}", - "create table {{ target.schema }}.db_schemas ( db text, schema text )", - "insert into {{ target.schema }}.db_schemas (db, schema) values {% for db, schema in database_schemas %}('{{ db }}', '{{ schema }}' ){% if not loop.last %},{% endif %}{% endfor %}", - ], - "seeds": { - "quote_columns": False, - }, - } - - def get_ctx_vars(self, state, project): - fields = [ - "test_state", - "target_dbname", - "target_host", - "target_name", - "target_schema", - "target_threads", - "target_type", - "target_user", - "target_pass", - "run_started_at", - "invocation_id", - "thread_id", - ] - field_list = ", ".join(['"{}"'.format(f) for f in fields]) - query = f"select {field_list} from {project.test_schema}.on_run_hook where test_state = '{state}'" - - vals = project.run_sql(query, fetch="all") - assert len(vals) != 0, "nothing inserted into on_run_hook table" - assert len(vals) == 1, "too many rows in hooks table" - ctx = dict([(k, v) for (k, v) in zip(fields, vals[0])]) - - return ctx - - def assert_used_schemas(self, project): - schemas_query = "select * from {}.schemas".format(project.test_schema) - results = project.run_sql(schemas_query, fetch="all") - assert len(results) == 1 - assert results[0][0] == project.test_schema - - db_schemas_query = "select * from {}.db_schemas".format(project.test_schema) - results = project.run_sql(db_schemas_query, fetch="all") - assert len(results) == 1 - assert results[0][0] == project.database - assert results[0][1] == project.test_schema - - def check_hooks(self, state, project, host): - ctx = self.get_ctx_vars(state, project) - - assert ctx["test_state"] == state - assert ctx["target_dbname"] == "dbt" - assert ctx["target_host"] == host - assert ctx["target_name"] == "default" - assert ctx["target_schema"] == project.test_schema - assert ctx["target_threads"] == 4 - assert ctx["target_type"] == "postgres" - assert ctx["target_user"] == "root" - assert ctx["target_pass"] == "" - - assert ( - ctx["run_started_at"] is not None and len(ctx["run_started_at"]) > 0 - ), "run_started_at was not set" - assert ( - ctx["invocation_id"] is not None and len(ctx["invocation_id"]) > 0 - ), "invocation_id was not set" - assert ctx["thread_id"].startswith("Thread-") or ctx["thread_id"] == "MainThread" - - def test_pre_and_post_run_hooks(self, setUp, project, dbt_profile_target): - run_dbt(["run"]) - - self.check_hooks("start", project, dbt_profile_target.get("host", None)) - self.check_hooks("end", project, dbt_profile_target.get("host", None)) - - check_table_does_not_exist(project.adapter, "start_hook_order_test") - check_table_does_not_exist(project.adapter, "end_hook_order_test") - self.assert_used_schemas(project) - - def test_pre_and_post_seed_hooks(self, setUp, project, dbt_profile_target): - run_dbt(["seed"]) - - self.check_hooks("start", project, 
dbt_profile_target.get("host", None)) - self.check_hooks("end", project, dbt_profile_target.get("host", None)) - - check_table_does_not_exist(project.adapter, "start_hook_order_test") - check_table_does_not_exist(project.adapter, "end_hook_order_test") - self.assert_used_schemas(project) - - -class TestAfterRunHooks(object): - @pytest.fixture(scope="class") - def macros(self): - return {"temp_macro.sql": macros_missing_column} - - @pytest.fixture(scope="class") - def models(self): - return {"test_column.sql": models__missing_column} - - @pytest.fixture(scope="class") - def project_config_update(self): - return { - # The create and drop table statements here validate that these hooks run - # in the same order that they are defined. Drop before create is an error. - # Also check that the table does not exist below. - "on-run-start": "- {{ export_table_check() }}" - } - - def test_missing_column_pre_hook(self, project): - run_dbt(["run"], expect_pass=False) diff --git a/tests/functional/artifacts/test_run_results.py b/tests/functional/artifacts/test_run_results.py index dea947f342b..a474f80c4c7 100644 --- a/tests/functional/artifacts/test_run_results.py +++ b/tests/functional/artifacts/test_run_results.py @@ -55,7 +55,7 @@ def project_config_update(self): def test_results_serializable(self, project): results = run_dbt(["run"]) - assert len(results.results) == 1 + assert len(results.results) == 2 # This test is failing due to the faulty assumptions that run_results.json would diff --git a/tests/functional/dependencies/test_local_dependency.py b/tests/functional/dependencies/test_local_dependency.py index d26345f2da6..e5bf1de5323 100644 --- a/tests/functional/dependencies/test_local_dependency.py +++ b/tests/functional/dependencies/test_local_dependency.py @@ -328,7 +328,7 @@ def test_hook_dependency(self, prepare_dependencies, project): run_dbt(["deps", "--vars", cli_vars]) results = run_dbt(["run", "--vars", cli_vars]) - assert len(results) == 2 + assert len(results) == 8 check_relations_equal(project.adapter, ["actual", "expected"]) diff --git a/tests/functional/retry/test_retry.py b/tests/functional/retry/test_retry.py index 012db25e42f..0a9e94f2e06 100644 --- a/tests/functional/retry/test_retry.py +++ b/tests/functional/retry/test_retry.py @@ -5,7 +5,7 @@ from dbt.contracts.results import RunStatus, TestStatus from dbt.exceptions import DbtRuntimeError, TargetNotFoundError -from dbt.tests.util import rm_file, run_dbt, write_file +from dbt.tests.util import rm_file, run_dbt, update_config_file, write_file from tests.functional.retry.fixtures import ( macros__alter_timezone_sql, models__sample_model, @@ -365,3 +365,92 @@ def test_retry_target_path_flag(self, project): results = run_dbt(["retry", "--state", "artifacts", "--target-path", "my_target_path"]) assert len(results) == 1 assert Path("my_target_path").is_dir() + + +class TestRetryHooksAlwaysRun: + @pytest.fixture(scope="class") + def project_config_update(self): + return { + "on-run-start": ["select 1;"], + "on-run-end": ["select 2;"], + } + + @pytest.fixture(scope="class") + def models(self): + return { + "sample_model.sql": models__sample_model, + } + + def test_retry_hooks_always_run(self, project): + res = run_dbt(["run", "--target-path", "target"], expect_pass=False) + assert len(res) == 3 + + write_file(models__second_model, "models", "sample_model.sql") + res = run_dbt(["retry", "--state", "target"]) + assert len(res) == 3 + + +class TestFixRetryHook: + @pytest.fixture(scope="class") + def project_config_update(self): + return 
{ + "flags": { + "skip_nodes_if_on_run_start_fails": True, + }, + "on-run-start": [ + "select 1 as id", + "select column_does_not_exist", + "select 2 as id", + ], + } + + @pytest.fixture(scope="class") + def models(self): + return { + "sample_model.sql": "select 1 as id, 1 as foo", + "second_model.sql": models__second_model, + "union_model.sql": models__union_model, + } + + def test_fix_retry_hook(self, project): + res = run_dbt(["run"], expect_pass=False) + assert {r.node.unique_id: r.status for r in res.results} == { + "operation.test.test-on-run-start-0": RunStatus.Success, + "operation.test.test-on-run-start-1": RunStatus.Error, + "operation.test.test-on-run-start-2": RunStatus.Skipped, + "model.test.sample_model": RunStatus.Skipped, + "model.test.second_model": RunStatus.Skipped, + "model.test.union_model": RunStatus.Skipped, + } + + res = run_dbt(["retry"], expect_pass=False) + assert {r.node.unique_id: r.status for r in res.results} == { + "operation.test.test-on-run-start-0": RunStatus.Success, + "operation.test.test-on-run-start-1": RunStatus.Error, + "operation.test.test-on-run-start-2": RunStatus.Skipped, + "model.test.sample_model": RunStatus.Skipped, + "model.test.second_model": RunStatus.Skipped, + "model.test.union_model": RunStatus.Skipped, + } + + new_dbt_project_yml = { + "flags": { + "skip_nodes_if_on_run_start_fails": True, + }, + "on-run-start": [ + "select 1 as id", + "select 3 as id", + "select 2 as id", + ], + } + + update_config_file(new_dbt_project_yml, project.project_root, "dbt_project.yml") + res = run_dbt(["retry"]) + assert {r.node.unique_id: r.status for r in res.results} == { + "operation.test.test-on-run-start-0": RunStatus.Success, + "operation.test.test-on-run-start-1": RunStatus.Success, + "operation.test.test-on-run-start-2": RunStatus.Success, + "model.test.sample_model": RunStatus.Success, + "model.test.second_model": RunStatus.Success, + "model.test.union_model": RunStatus.Success, + } diff --git a/tests/functional/schema_tests/test_schema_v2_tests.py b/tests/functional/schema_tests/test_schema_v2_tests.py index ea33e62bce3..519ff64e853 100644 --- a/tests/functional/schema_tests/test_schema_v2_tests.py +++ b/tests/functional/schema_tests/test_schema_v2_tests.py @@ -474,9 +474,9 @@ def test_hooks_do_run_for_tests( ): # This passes now that hooks run, a behavior we changed in v1.0 results = run_dbt(["test", "--model", "ephemeral"]) - assert len(results) == 1 + assert len(results) == 3 for result in results: - assert result.status == "pass" + assert result.status in ("pass", "success") assert not result.skipped assert result.failures == 0, "test {} failed".format(result.node.name) @@ -507,9 +507,9 @@ def test_these_hooks_dont_run_for_tests( ): # This would fail if the hooks ran results = run_dbt(["test", "--model", "ephemeral"]) - assert len(results) == 1 + assert len(results) == 3 for result in results: - assert result.status == "pass" + assert result.status in ("pass", "success") assert not result.skipped assert result.failures == 0, "test {} failed".format(result.node.name) diff --git a/tests/functional/sources/test_source_freshness.py b/tests/functional/sources/test_source_freshness.py index 2f42a3aaa56..70c8866869e 100644 --- a/tests/functional/sources/test_source_freshness.py +++ b/tests/functional/sources/test_source_freshness.py @@ -129,12 +129,12 @@ def _assert_freshness_results(self, path, state): ] def _assert_project_hooks_called(self, logs: str): - assert "Running 1 on-run-start hook" in logs - assert "Running 1 on-run-end hook" in logs + 
assert "test.on-run-start.0" in logs + assert "test.on-run-start.0" in logs def _assert_project_hooks_not_called(self, logs: str): - assert "Running 1 on-run-start hook" not in logs - assert "Running 1 on-run-end hook" not in logs + assert "test.on-run-end.0" not in logs + assert "test.on-run-end.0" not in logs class TestSourceFreshness(SuccessfulSourceFreshnessTest):