diff --git a/.changes/unreleased/Features-20240729-173203.yaml b/.changes/unreleased/Features-20240729-173203.yaml new file mode 100644 index 00000000000..e788c8e9cc9 --- /dev/null +++ b/.changes/unreleased/Features-20240729-173203.yaml @@ -0,0 +1,7 @@ +kind: Features +body: Include models that depend on changed vars in state:modified, add state:modified.vars + selection method +time: 2024-07-29T17:32:03.368508-04:00 +custom: + Author: michelleark + Issue: "4304" diff --git a/.changes/unreleased/Features-20240918-162959.yaml b/.changes/unreleased/Features-20240918-162959.yaml new file mode 100644 index 00000000000..62c037540fb --- /dev/null +++ b/.changes/unreleased/Features-20240918-162959.yaml @@ -0,0 +1,6 @@ +kind: Features +body: Create 'skip_nodes_if_on_run_start_fails' behavior change flag +time: 2024-09-18T16:29:59.268422+01:00 +custom: + Author: aranke + Issue: "7387" diff --git a/core/dbt/artifacts/resources/v1/components.py b/core/dbt/artifacts/resources/v1/components.py index 8eb43f35d8e..02bfa5d875d 100644 --- a/core/dbt/artifacts/resources/v1/components.py +++ b/core/dbt/artifacts/resources/v1/components.py @@ -197,6 +197,7 @@ class ParsedResource(ParsedResourceMandatory): unrendered_config_call_dict: Dict[str, Any] = field(default_factory=dict) relation_name: Optional[str] = None raw_code: str = "" + vars: Dict[str, Any] = field(default_factory=dict) def __post_serialize__(self, dct: Dict, context: Optional[Dict] = None): dct = super().__post_serialize__(dct, context) diff --git a/core/dbt/artifacts/resources/v1/exposure.py b/core/dbt/artifacts/resources/v1/exposure.py index 00f3c8b89e1..7d8c291381b 100644 --- a/core/dbt/artifacts/resources/v1/exposure.py +++ b/core/dbt/artifacts/resources/v1/exposure.py @@ -41,6 +41,7 @@ class Exposure(GraphResource): tags: List[str] = field(default_factory=list) config: ExposureConfig = field(default_factory=ExposureConfig) unrendered_config: Dict[str, Any] = field(default_factory=dict) + vars: Dict[str, Any] = field(default_factory=dict) url: Optional[str] = None depends_on: DependsOn = field(default_factory=DependsOn) refs: List[RefArgs] = field(default_factory=list) diff --git a/core/dbt/artifacts/resources/v1/source_definition.py b/core/dbt/artifacts/resources/v1/source_definition.py index 9044307563e..454de4d7506 100644 --- a/core/dbt/artifacts/resources/v1/source_definition.py +++ b/core/dbt/artifacts/resources/v1/source_definition.py @@ -69,6 +69,7 @@ class SourceDefinition(ParsedSourceMandatory): config: SourceConfig = field(default_factory=SourceConfig) patch_path: Optional[str] = None unrendered_config: Dict[str, Any] = field(default_factory=dict) + vars: Dict[str, Any] = field(default_factory=dict) relation_name: Optional[str] = None created_at: float = field(default_factory=lambda: time.time()) unrendered_database: Optional[str] = None diff --git a/core/dbt/context/configured.py b/core/dbt/context/configured.py index 240d9afb843..9ee7ebc79c3 100644 --- a/core/dbt/context/configured.py +++ b/core/dbt/context/configured.py @@ -31,23 +31,35 @@ def __init__(self, package_name: str): self.resource_type = NodeType.Model +class SchemaYamlVars: + def __init__(self): + self.env_vars = {} + self.vars = {} + + class ConfiguredVar(Var): def __init__( self, context: Dict[str, Any], config: AdapterRequiredConfig, project_name: str, + schema_yaml_vars: Optional[SchemaYamlVars] = None, ): super().__init__(context, config.cli_vars) self._config = config self._project_name = project_name + self.schema_yaml_vars = schema_yaml_vars def 
__call__(self, var_name, default=Var._VAR_NOTSET): my_config = self._config.load_dependencies()[self._project_name] + var_found = False + var_value = None + # cli vars > active project > local project if var_name in self._config.cli_vars: - return self._config.cli_vars[var_name] + var_found = True + var_value = self._config.cli_vars[var_name] adapter_type = self._config.credentials.type lookup = FQNLookup(self._project_name) @@ -58,19 +70,21 @@ def __call__(self, var_name, default=Var._VAR_NOTSET): all_vars.add(my_config.vars.vars_for(lookup, adapter_type)) all_vars.add(active_vars) - if var_name in all_vars: - return all_vars[var_name] + if not var_found and var_name in all_vars: + var_found = True + var_value = all_vars[var_name] - if default is not Var._VAR_NOTSET: - return default - - return self.get_missing_var(var_name) + if not var_found and default is not Var._VAR_NOTSET: + var_found = True + var_value = default + if not var_found: + return self.get_missing_var(var_name) + else: + if self.schema_yaml_vars: + self.schema_yaml_vars.vars[var_name] = var_value -class SchemaYamlVars: - def __init__(self): - self.env_vars = {} - self.vars = {} + return var_value class SchemaYamlContext(ConfiguredContext): @@ -82,7 +96,7 @@ def __init__(self, config, project_name: str, schema_yaml_vars: Optional[SchemaY @contextproperty() def var(self) -> ConfiguredVar: - return ConfiguredVar(self._ctx, self.config, self._project_name) + return ConfiguredVar(self._ctx, self.config, self._project_name, self.schema_yaml_vars) @contextmember() def env_var(self, var: str, default: Optional[str] = None) -> str: diff --git a/core/dbt/context/providers.py b/core/dbt/context/providers.py index dfc8c9bb40b..a0e3751587a 100644 --- a/core/dbt/context/providers.py +++ b/core/dbt/context/providers.py @@ -790,6 +790,14 @@ def get_missing_var(self, var_name): # in the parser, just always return None. 
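        # (Even when a var is missing at parse time, the __call__ override
        # added below still records the lookup on the node's `vars` dict, so
        # state:modified.vars can compare it against the previous state.)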
return None + def __call__(self, var_name: str, default: Any = ModelConfiguredVar._VAR_NOTSET) -> Any: + var_value = super().__call__(var_name, default) + + if self._node and hasattr(self._node, "vars"): + self._node.vars[var_name] = var_value + + return var_value + class RuntimeVar(ModelConfiguredVar): pass diff --git a/core/dbt/contracts/files.py b/core/dbt/contracts/files.py index d5c1dba5366..a1cc75abcc4 100644 --- a/core/dbt/contracts/files.py +++ b/core/dbt/contracts/files.py @@ -215,6 +215,7 @@ class SchemaSourceFile(BaseSourceFile): unrendered_configs: Dict[str, Any] = field(default_factory=dict) unrendered_databases: Dict[str, Any] = field(default_factory=dict) unrendered_schemas: Dict[str, Any] = field(default_factory=dict) + vars: Dict[str, Any] = field(default_factory=dict) pp_dict: Optional[Dict[str, Any]] = None pp_test_index: Optional[Dict[str, Any]] = None @@ -355,6 +356,22 @@ def delete_from_unrendered_configs(self, yaml_key, name): if not self.unrendered_configs[yaml_key]: del self.unrendered_configs[yaml_key] + def add_vars(self, vars: Dict[str, Any], yaml_key: str, name: str) -> None: + if yaml_key not in self.vars: + self.vars[yaml_key] = {} + + if name not in self.vars[yaml_key]: + self.vars[yaml_key][name] = vars + + def get_vars(self, yaml_key: str, name: str) -> Dict[str, Any]: + if yaml_key not in self.vars: + return {} + + if name not in self.vars[yaml_key]: + return {} + + return self.vars[yaml_key][name] + def add_env_var(self, var, yaml_key, name): if yaml_key not in self.env_vars: self.env_vars[yaml_key] = {} diff --git a/core/dbt/contracts/graph/nodes.py b/core/dbt/contracts/graph/nodes.py index 13560ef742c..2cdd25506a6 100644 --- a/core/dbt/contracts/graph/nodes.py +++ b/core/dbt/contracts/graph/nodes.py @@ -369,6 +369,9 @@ def same_contract(self, old, adapter_type=None) -> bool: # This would only apply to seeds return True + def same_vars(self, old) -> bool: + return self.vars == old.vars + def same_contents(self, old, adapter_type) -> bool: if old is None: return False @@ -376,12 +379,20 @@ def same_contents(self, old, adapter_type) -> bool: # Need to ensure that same_contract is called because it # could throw an error same_contract = self.same_contract(old, adapter_type) + + # Legacy behaviour + if not get_flags().state_modified_compare_vars: + same_vars = True + else: + same_vars = self.same_vars(old) + return ( self.same_body(old) and self.same_config(old) and self.same_persisted_description(old) and self.same_fqn(old) and self.same_database_representation(old) + and same_vars and same_contract and True ) @@ -1255,6 +1266,9 @@ def same_config(self, old: "SourceDefinition") -> bool: old.unrendered_config, ) + def same_vars(self, other: "SourceDefinition") -> bool: + return self.vars == other.vars + def same_contents(self, old: Optional["SourceDefinition"]) -> bool: # existing when it didn't before is a change! 
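        # As with ModelNode and Exposure, the vars comparison below is gated
        # behind the state_modified_compare_vars behavior flag, so existing
        # projects keep their current state:modified results until they opt in.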
if old is None: @@ -1268,6 +1282,12 @@ def same_contents(self, old: Optional["SourceDefinition"]) -> bool: # freshness changes are changes, I guess # metadata/tags changes are not "changes" # patching/description changes are not "changes" + # Legacy behaviour + if not get_flags().state_modified_compare_vars: + same_vars = True + else: + same_vars = self.same_vars(old) + return ( self.same_database_representation(old) and self.same_fqn(old) @@ -1275,6 +1295,7 @@ def same_contents(self, old: Optional["SourceDefinition"]) -> bool: and self.same_quoting(old) and self.same_freshness(old) and self.same_external(old) + and same_vars and True ) @@ -1371,12 +1392,21 @@ def same_config(self, old: "Exposure") -> bool: old.unrendered_config, ) + def same_vars(self, old: "Exposure") -> bool: + return self.vars == old.vars + def same_contents(self, old: Optional["Exposure"]) -> bool: # existing when it didn't before is a change! # metadata/tags changes are not "changes" if old is None: return True + # Legacy behaviour + if not get_flags().state_modified_compare_vars: + same_vars = True + else: + same_vars = self.same_vars(old) + return ( self.same_fqn(old) and self.same_exposure_type(old) @@ -1387,6 +1417,7 @@ def same_contents(self, old: Optional["Exposure"]) -> bool: and self.same_label(old) and self.same_depends_on(old) and self.same_config(old) + and same_vars and True ) @@ -1638,6 +1669,7 @@ class ParsedNodePatch(ParsedPatch): latest_version: Optional[NodeVersion] constraints: List[Dict[str, Any]] deprecation_date: Optional[datetime] + vars: Dict[str, Any] time_spine: Optional[TimeSpine] = None @@ -1673,6 +1705,7 @@ class ParsedMacroPatch(ParsedPatch): ResultNode = Union[ ManifestNode, SourceDefinition, + HookNode, ] # All nodes that can be in the DAG diff --git a/core/dbt/contracts/project.py b/core/dbt/contracts/project.py index e08131ecd8f..f5a4ec605ec 100644 --- a/core/dbt/contracts/project.py +++ b/core/dbt/contracts/project.py @@ -341,7 +341,9 @@ class ProjectFlags(ExtensibleDbtClassMixin): require_explicit_package_overrides_for_builtin_materializations: bool = True require_resource_names_without_spaces: bool = False source_freshness_run_project_hooks: bool = False + skip_nodes_if_on_run_start_fails: bool = False state_modified_compare_more_unrendered_values: bool = False + state_modified_compare_vars: bool = False @property def project_only_flags(self) -> Dict[str, Any]: @@ -349,7 +351,9 @@ def project_only_flags(self) -> Dict[str, Any]: "require_explicit_package_overrides_for_builtin_materializations": self.require_explicit_package_overrides_for_builtin_materializations, "require_resource_names_without_spaces": self.require_resource_names_without_spaces, "source_freshness_run_project_hooks": self.source_freshness_run_project_hooks, + "skip_nodes_if_on_run_start_fails": self.skip_nodes_if_on_run_start_fails, "state_modified_compare_more_unrendered_values": self.state_modified_compare_more_unrendered_values, + "state_modified_compare_vars": self.state_modified_compare_vars, } diff --git a/core/dbt/events/types.py b/core/dbt/events/types.py index e55e1cdd252..d5b6256a4f6 100644 --- a/core/dbt/events/types.py +++ b/core/dbt/events/types.py @@ -1516,10 +1516,20 @@ def code(self) -> str: return "Q033" def message(self) -> str: - msg = f"OK hook: {self.statement}" + if self.status == "success": + info = "OK" + status = green(info) + elif self.status == "skipped": + info = "SKIP" + status = yellow(info) + else: + info = "ERROR" + status = red(info) + msg = f"{info} hook: {self.statement}" + return 
format_fancy_output_line( msg=msg, - status=green(self.status), + status=status, index=self.index, total=self.total, execution_time=self.execution_time, diff --git a/core/dbt/graph/selector_methods.py b/core/dbt/graph/selector_methods.py index dbeaf7ed4c3..c7a2490760b 100644 --- a/core/dbt/graph/selector_methods.py +++ b/core/dbt/graph/selector_methods.py @@ -752,6 +752,7 @@ def search(self, included_nodes: Set[UniqueId], selector: str) -> Iterator[Uniqu "modified.relation": self.check_modified_factory("same_database_representation"), "modified.macros": self.check_modified_macros, "modified.contract": self.check_modified_contract("same_contract", adapter_type), + "modified.vars": self.check_modified_factory("same_vars"), } if selector in state_checks: checker = state_checks[selector] diff --git a/core/dbt/parser/schema_yaml_readers.py b/core/dbt/parser/schema_yaml_readers.py index dc99e87a218..4fe28a74866 100644 --- a/core/dbt/parser/schema_yaml_readers.py +++ b/core/dbt/parser/schema_yaml_readers.py @@ -91,6 +91,9 @@ def parse_exposure(self, unparsed: UnparsedExposure) -> None: unique_id = f"{NodeType.Exposure}.{package_name}.{unparsed.name}" path = self.yaml.path.relative_path + assert isinstance(self.yaml.file, SchemaSourceFile) + exposure_vars = self.yaml.file.get_vars(self.key, unparsed.name) + fqn = self.schema_parser.get_fqn_prefix(path) fqn.append(unparsed.name) @@ -133,6 +136,7 @@ def parse_exposure(self, unparsed: UnparsedExposure) -> None: maturity=unparsed.maturity, config=config, unrendered_config=unrendered_config, + vars=exposure_vars, ) ctx = generate_parse_exposure( parsed, @@ -144,7 +148,6 @@ def parse_exposure(self, unparsed: UnparsedExposure) -> None: get_rendered(depends_on_jinja, ctx, parsed, capture_macros=True) # parsed now has a populated refs/sources/metrics - assert isinstance(self.yaml.file, SchemaSourceFile) if parsed.config.enabled: self.manifest.add_exposure(self.yaml.file, parsed) else: diff --git a/core/dbt/parser/schemas.py b/core/dbt/parser/schemas.py index 9ac70e85219..ae6aca476a3 100644 --- a/core/dbt/parser/schemas.py +++ b/core/dbt/parser/schemas.py @@ -412,10 +412,14 @@ def get_key_dicts(self) -> Iterable[Dict[str, Any]]: if self.schema_yaml_vars.env_vars: self.schema_parser.manifest.env_vars.update(self.schema_yaml_vars.env_vars) - for var in self.schema_yaml_vars.env_vars.keys(): - schema_file.add_env_var(var, self.key, entry["name"]) + for env_var in self.schema_yaml_vars.env_vars.keys(): + schema_file.add_env_var(env_var, self.key, entry["name"]) self.schema_yaml_vars.env_vars = {} + if self.schema_yaml_vars.vars: + schema_file.add_vars(self.schema_yaml_vars.vars, self.key, entry["name"]) + self.schema_yaml_vars.vars = {} + yield entry def render_entry(self, dct): @@ -695,6 +699,9 @@ def parse_patch(self, block: TargetBlock[NodeTarget], refs: ParserRef) -> None: # code consistency. 
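        # Vars rendered while reading this YAML entry were captured on the
        # SchemaSourceFile via add_vars() during get_key_dicts(); the
        # get_vars() call below reads them back so they can be stored on the
        # ParsedNodePatch and compared by state:modified.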
deprecation_date: Optional[datetime.datetime] = None time_spine: Optional[TimeSpine] = None + assert isinstance(self.yaml.file, SchemaSourceFile) + source_file: SchemaSourceFile = self.yaml.file + if isinstance(block.target, UnparsedModelUpdate): deprecation_date = block.target.deprecation_date time_spine = ( @@ -727,9 +734,9 @@ def parse_patch(self, block: TargetBlock[NodeTarget], refs: ParserRef) -> None: constraints=block.target.constraints, deprecation_date=deprecation_date, time_spine=time_spine, + vars=source_file.get_vars(block.target.yaml_key, block.target.name), ) - assert isinstance(self.yaml.file, SchemaSourceFile) - source_file: SchemaSourceFile = self.yaml.file + if patch.yaml_key in ["models", "seeds", "snapshots"]: unique_id = self.manifest.ref_lookup.get_unique_id( patch.name, self.project.project_name, None @@ -823,6 +830,8 @@ def patch_node_properties(self, node, patch: "ParsedNodePatch") -> None: node.description = patch.description node.columns = patch.columns node.name = patch.name + # Prefer node-level vars to vars from patch + node.vars = {**patch.vars, **node.vars} if not isinstance(node, ModelNode): for attr in ["latest_version", "access", "version", "constraints"]: @@ -972,6 +981,7 @@ def parse_patch(self, block: TargetBlock[UnparsedModelUpdate], refs: ParserRef) latest_version=latest_version, constraints=unparsed_version.constraints or target.constraints, deprecation_date=unparsed_version.deprecation_date, + vars=source_file.get_vars(block.target.yaml_key, block.target.name), ) # Node patched before config because config patching depends on model name, # which may have been updated in the version patch diff --git a/core/dbt/parser/sources.py b/core/dbt/parser/sources.py index 0fe882750ae..e98b3103691 100644 --- a/core/dbt/parser/sources.py +++ b/core/dbt/parser/sources.py @@ -12,6 +12,7 @@ ContextConfigGenerator, UnrenderedConfigGenerator, ) +from dbt.contracts.files import SchemaSourceFile from dbt.contracts.graph.manifest import Manifest, SourceKey from dbt.contracts.graph.nodes import ( GenericTestNode, @@ -158,6 +159,10 @@ def parse_source(self, target: UnpatchedSourceDefinition) -> SourceDefinition: rendered=False, ) + schema_file = self.manifest.files[target.file_id] + assert isinstance(schema_file, SchemaSourceFile) + source_vars = schema_file.get_vars("sources", source.name) + if not isinstance(config, SourceConfig): raise DbtInternalError( f"Calculated a {type(config)} for a source, but expected a SourceConfig" @@ -192,6 +197,7 @@ def parse_source(self, target: UnpatchedSourceDefinition) -> SourceDefinition: tags=tags, config=config, unrendered_config=unrendered_config, + vars=source_vars, ) if ( diff --git a/core/dbt/task/clone.py b/core/dbt/task/clone.py index 68012861907..0e5b797ed64 100644 --- a/core/dbt/task/clone.py +++ b/core/dbt/task/clone.py @@ -1,7 +1,7 @@ import threading from typing import AbstractSet, Any, Collection, Iterable, List, Optional, Set, Type -from dbt.adapters.base import BaseRelation +from dbt.adapters.base import BaseAdapter, BaseRelation from dbt.artifacts.resources.types import NodeType from dbt.artifacts.schemas.run import RunResult, RunStatus from dbt.clients.jinja import MacroGenerator @@ -125,7 +125,7 @@ def get_model_schemas(self, adapter, selected_uids: Iterable[str]) -> Set[BaseRe return result - def before_run(self, adapter, selected_uids: AbstractSet[str]): + def before_run(self, adapter: BaseAdapter, selected_uids: AbstractSet[str]) -> RunStatus: with adapter.connection_named("master"): self.defer_to_manifest() # 
only create target schemas, but also cache defer_relation schemas @@ -133,6 +133,7 @@ def before_run(self, adapter, selected_uids: AbstractSet[str]): self.create_schemas(adapter, schemas_to_create) schemas_to_cache = self.get_model_schemas(adapter, selected_uids) self.populate_adapter_cache(adapter, schemas_to_cache) + return RunStatus.Success @property def resource_types(self) -> List[NodeType]: diff --git a/core/dbt/task/freshness.py b/core/dbt/task/freshness.py index eb1508acb36..06e78b17c7b 100644 --- a/core/dbt/task/freshness.py +++ b/core/dbt/task/freshness.py @@ -4,6 +4,7 @@ from typing import AbstractSet, Dict, List, Optional, Type from dbt import deprecations +from dbt.adapters.base import BaseAdapter from dbt.adapters.base.impl import FreshnessResponse from dbt.adapters.base.relation import BaseRelation from dbt.adapters.capability import Capability @@ -204,10 +205,25 @@ def get_node_selector(self): resource_types=[NodeType.Source], ) - def before_run(self, adapter, selected_uids: AbstractSet[str]) -> None: - super().before_run(adapter, selected_uids) - if adapter.supports(Capability.TableLastModifiedMetadataBatch): - self.populate_metadata_freshness_cache(adapter, selected_uids) + def before_run(self, adapter: BaseAdapter, selected_uids: AbstractSet[str]) -> RunStatus: + populate_metadata_freshness_cache_status = RunStatus.Success + + before_run_status = super().before_run(adapter, selected_uids) + + if before_run_status == RunStatus.Success and adapter.supports( + Capability.TableLastModifiedMetadataBatch + ): + populate_metadata_freshness_cache_status = self.populate_metadata_freshness_cache( + adapter, selected_uids + ) + + if ( + before_run_status == RunStatus.Success + and populate_metadata_freshness_cache_status == RunStatus.Success + ): + return RunStatus.Success + else: + return RunStatus.Error def get_runner(self, node) -> BaseRunner: freshness_runner = super().get_runner(node) @@ -243,7 +259,9 @@ def get_hooks_by_type(self, hook_type: RunHookType) -> List[HookNode]: deprecations.warn("source-freshness-project-hooks") return [] - def populate_metadata_freshness_cache(self, adapter, selected_uids: AbstractSet[str]) -> None: + def populate_metadata_freshness_cache( + self, adapter, selected_uids: AbstractSet[str] + ) -> RunStatus: if self.manifest is None: raise DbtInternalError("Manifest must be set to populate metadata freshness cache") @@ -266,6 +284,7 @@ def populate_metadata_freshness_cache(self, adapter, selected_uids: AbstractSet[ batch_metadata_sources ) self._metadata_freshness_cache.update(metadata_freshness_results) + return RunStatus.Success except Exception as e: # This error handling is intentionally very coarse. 
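            # (populate_metadata_freshness_cache now returns a RunStatus so
            # that the freshness before_run override can fold a failure here
            # into an overall RunStatus.Error.)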
# If anything goes wrong during batch metadata calculation, we can safely @@ -276,6 +295,7 @@ def populate_metadata_freshness_cache(self, adapter, selected_uids: AbstractSet[ Note(msg=f"Metadata freshness could not be computed in batch: {e}"), EventLevel.WARN, ) + return RunStatus.Error def get_freshness_metadata_cache(self) -> Dict[BaseRelation, FreshnessResult]: return self._metadata_freshness_cache diff --git a/core/dbt/task/printer.py b/core/dbt/task/printer.py index a88690e1046..58a39552450 100644 --- a/core/dbt/task/printer.py +++ b/core/dbt/task/printer.py @@ -33,7 +33,8 @@ def get_counts(flat_nodes) -> str: counts[t] = counts.get(t, 0) + 1 - stat_line = ", ".join([pluralize(v, k).replace("_", " ") for k, v in counts.items()]) + sorted_items = sorted(counts.items(), key=lambda x: x[0]) + stat_line = ", ".join([pluralize(v, k).replace("_", " ") for k, v in sorted_items]) return stat_line diff --git a/core/dbt/task/retry.py b/core/dbt/task/retry.py index eb7b22325ad..4f08804d191 100644 --- a/core/dbt/task/retry.py +++ b/core/dbt/task/retry.py @@ -121,13 +121,15 @@ def __init__(self, args: Flags, config: RuntimeConfig) -> None: self.task_class = TASK_DICT.get(self.previous_command_name) # type: ignore def run(self): - unique_ids = set( - [ - result.unique_id - for result in self.previous_results.results - if result.status in RETRYABLE_STATUSES - ] - ) + unique_ids = { + result.unique_id + for result in self.previous_results.results + if result.status in RETRYABLE_STATUSES + and not ( + self.previous_command_name != "run-operation" + and result.unique_id.startswith("operation.") + ) + } batch_map = { result.unique_id: result.batch_results.failed @@ -135,6 +137,10 @@ def run(self): if result.status == NodeStatus.PartialSuccess and result.batch_results is not None and len(result.batch_results.failed) > 0 + and not ( + self.previous_command_name != "run-operation" + and result.unique_id.startswith("operation.") + ) } class TaskWrapper(self.task_class): diff --git a/core/dbt/task/run.py b/core/dbt/task/run.py index ac8298cc9d9..12e17de02c9 100644 --- a/core/dbt/task/run.py +++ b/core/dbt/task/run.py @@ -1,30 +1,25 @@ import functools import os import threading -import time from datetime import datetime from typing import AbstractSet, Any, Dict, Iterable, List, Optional, Set, Tuple, Type from dbt import tracking, utils -from dbt.adapters.base import BaseRelation -from dbt.adapters.events.types import ( - DatabaseErrorRunningHook, - FinishedRunningStats, - HooksRunning, -) +from dbt.adapters.base import BaseAdapter, BaseRelation +from dbt.adapters.events.types import FinishedRunningStats from dbt.adapters.exceptions import MissingMaterializationError from dbt.artifacts.resources import Hook from dbt.artifacts.schemas.batch_results import BatchResults, BatchType from dbt.artifacts.schemas.results import ( - BaseResult, NodeStatus, RunningStatus, RunStatus, + TimingInfo, ) from dbt.artifacts.schemas.run import RunResult from dbt.cli.flags import Flags from dbt.clients.jinja import MacroGenerator -from dbt.config.runtime import RuntimeConfig +from dbt.config import RuntimeConfig from dbt.context.providers import generate_runtime_model_context from dbt.contracts.graph.manifest import Manifest from dbt.contracts.graph.nodes import HookNode, ModelNode, ResultNode @@ -40,7 +35,10 @@ from dbt.hooks import get_hook_dict from dbt.materializations.incremental.microbatch import MicrobatchBuilder from dbt.node_types import NodeType, RunHookType +from dbt.task import group_lookup from dbt.task.base import 
BaseRunner +from dbt.task.compile import CompileRunner, CompileTask +from dbt.task.printer import get_counts, print_run_end_messages from dbt_common.clients.jinja import MacroProtocol from dbt_common.dataclass_schema import dbtClassMixin from dbt_common.events.base_types import EventLevel @@ -49,29 +47,6 @@ from dbt_common.events.types import Formatting from dbt_common.exceptions import DbtValidationError -from . import group_lookup -from .compile import CompileRunner, CompileTask -from .printer import get_counts, print_run_end_messages - - -class Timer: - def __init__(self) -> None: - self.start = None - self.end = None - - @property - def elapsed(self): - if self.start is None or self.end is None: - return None - return self.end - self.start - - def __enter__(self): - self.start = time.time() - return self - - def __exit__(self, exc_type, exc_value, exc_tracebck): - self.end = time.time() - @functools.total_ordering class BiggestName(str): @@ -107,6 +82,21 @@ def get_hook(source, index): return Hook.from_dict(hook_dict) +def get_execution_status(sql: str, adapter: BaseAdapter) -> Tuple[RunStatus, str]: + if not sql.strip(): + return RunStatus.Success, "OK" + + try: + response, _ = adapter.execute(sql, auto_begin=False, fetch=False) + status = RunStatus.Success + message = response._message + except DbtRuntimeError as exc: + status = RunStatus.Error + message = exc.msg + finally: + return status, message + + def track_model_run(index, num_nodes, run_model_result): if tracking.active_user is None: raise DbtInternalError("cannot track model run with no active user") @@ -577,13 +567,8 @@ def __init__( batch_map: Optional[Dict[str, List[BatchType]]] = None, ) -> None: super().__init__(args, config, manifest) - self.ran_hooks: List[HookNode] = [] - self._total_executed = 0 self.batch_map = batch_map - def index_offset(self, value: int) -> int: - return self._total_executed + value - def raise_on_first_error(self) -> bool: return False @@ -614,88 +599,93 @@ def get_hooks_by_type(self, hook_type: RunHookType) -> List[HookNode]: hooks.sort(key=self._hook_keyfunc) return hooks - def run_hooks(self, adapter, hook_type: RunHookType, extra_context) -> None: + def safe_run_hooks( + self, adapter: BaseAdapter, hook_type: RunHookType, extra_context: Dict[str, Any] + ) -> RunStatus: + started_at = datetime.utcnow() ordered_hooks = self.get_hooks_by_type(hook_type) - # on-run-* hooks should run outside of a transaction. This happens - # b/c psycopg2 automatically begins a transaction when a connection - # is created. + if hook_type == RunHookType.End and ordered_hooks: + fire_event(Formatting("")) + + # on-run-* hooks should run outside a transaction. This happens because psycopg2 automatically begins a transaction when a connection is created. 
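        # get_execution_status() above wraps adapter.execute() and converts a
        # DbtRuntimeError into (RunStatus.Error, message) instead of raising,
        # which lets the loop below record the failure and mark the remaining
        # hooks as Skipped rather than aborting the invocation.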
adapter.clear_transaction() if not ordered_hooks: - return - num_hooks = len(ordered_hooks) + return RunStatus.Success - fire_event(Formatting("")) - fire_event(HooksRunning(num_hooks=num_hooks, hook_type=hook_type)) + status = RunStatus.Success + failed = False + num_hooks = len(ordered_hooks) - for idx, hook in enumerate(ordered_hooks, start=1): - # We want to include node_info in the appropriate log files, so use - # log_contextvars + for idx, hook in enumerate(ordered_hooks, 1): with log_contextvars(node_info=hook.node_info): - hook.update_event_status( - started_at=datetime.utcnow().isoformat(), node_status=RunningStatus.Started - ) - sql = self.get_hook_sql(adapter, hook, idx, num_hooks, extra_context) - - hook_text = "{}.{}.{}".format(hook.package_name, hook_type, hook.index) - fire_event( - LogHookStartLine( - statement=hook_text, - index=idx, - total=num_hooks, - node_info=hook.node_info, + hook.index = idx + hook_name = f"{hook.package_name}.{hook_type}.{hook.index - 1}" + execution_time = 0.0 + timing = [] + failures = 1 + + if not failed: + hook.update_event_status( + started_at=started_at.isoformat(), node_status=RunningStatus.Started ) - ) + sql = self.get_hook_sql(adapter, hook, hook.index, num_hooks, extra_context) + fire_event( + LogHookStartLine( + statement=hook_name, + index=hook.index, + total=num_hooks, + node_info=hook.node_info, + ) + ) + + status, message = get_execution_status(sql, adapter) + finished_at = datetime.utcnow() + hook.update_event_status(finished_at=finished_at.isoformat()) + execution_time = (finished_at - started_at).total_seconds() + timing = [TimingInfo(hook_name, started_at, finished_at)] + failures = 0 if status == RunStatus.Success else 1 - with Timer() as timer: - if len(sql.strip()) > 0: - response, _ = adapter.execute(sql, auto_begin=False, fetch=False) - status = response._message + if status == RunStatus.Success: + message = f"{hook_name} passed" else: - status = "OK" + message = f"{hook_name} failed, error:\n {message}" + failed = True + else: + status = RunStatus.Skipped + message = f"{hook_name} skipped" + + self.node_results.append( + RunResult( + status=status, + thread_id="main", + timing=timing, + message=message, + adapter_response={}, + execution_time=execution_time, + failures=failures, + node=hook, + ) + ) - self.ran_hooks.append(hook) - hook.update_event_status(finished_at=datetime.utcnow().isoformat()) - hook.update_event_status(node_status=RunStatus.Success) fire_event( LogHookEndLine( - statement=hook_text, + statement=hook_name, status=status, - index=idx, + index=hook.index, total=num_hooks, - execution_time=timer.elapsed, + execution_time=execution_time, node_info=hook.node_info, ) ) - # `_event_status` dict is only used for logging. 
Make sure - # it gets deleted when we're done with it - hook.clear_event_status() - self._total_executed += len(ordered_hooks) + if hook_type == RunHookType.Start and ordered_hooks: + fire_event(Formatting("")) - fire_event(Formatting("")) - - def safe_run_hooks( - self, adapter, hook_type: RunHookType, extra_context: Dict[str, Any] - ) -> None: - try: - self.run_hooks(adapter, hook_type, extra_context) - except DbtRuntimeError as exc: - fire_event(DatabaseErrorRunningHook(hook_type=hook_type.value)) - self.node_results.append( - BaseResult( - status=RunStatus.Error, - thread_id="main", - timing=[], - message=f"{hook_type.value} failed, error:\n {exc.msg}", - adapter_response={}, - execution_time=0, - failures=1, - ) - ) + return status def print_results_line(self, results, execution_time) -> None: - nodes = [r.node for r in results if hasattr(r, "node")] + self.ran_hooks + nodes = [r.node for r in results if hasattr(r, "node")] stat_line = get_counts(nodes) execution = "" @@ -718,15 +708,16 @@ def populate_microbatch_batches(self, selected_uids: AbstractSet[str]): if isinstance(node, ModelNode): node.batches = self.batch_map[uid] - def before_run(self, adapter, selected_uids: AbstractSet[str]) -> None: + def before_run(self, adapter: BaseAdapter, selected_uids: AbstractSet[str]) -> RunStatus: with adapter.connection_named("master"): self.defer_to_manifest() required_schemas = self.get_model_schemas(adapter, selected_uids) self.create_schemas(adapter, required_schemas) self.populate_adapter_cache(adapter, required_schemas) self.populate_microbatch_batches(selected_uids) - self.safe_run_hooks(adapter, RunHookType.Start, {}) group_lookup.init(self.manifest, selected_uids) + run_hooks_status = self.safe_run_hooks(adapter, RunHookType.Start, {}) + return run_hooks_status def after_run(self, adapter, results) -> None: # in on-run-end hooks, provide the value 'database_schemas', which is a @@ -741,8 +732,6 @@ def after_run(self, adapter, results) -> None: and r.status not in (NodeStatus.Error, NodeStatus.Fail, NodeStatus.Skipped) } - self._total_executed += len(results) - extras = { "schemas": list({s for _, s in database_schema_set}), "results": results, diff --git a/core/dbt/task/runnable.py b/core/dbt/task/runnable.py index e7e66ce2f60..a37308f81ea 100644 --- a/core/dbt/task/runnable.py +++ b/core/dbt/task/runnable.py @@ -11,7 +11,7 @@ import dbt.tracking import dbt.utils import dbt_common.utils.formatting -from dbt.adapters.base import BaseRelation +from dbt.adapters.base import BaseAdapter, BaseRelation from dbt.adapters.factory import get_adapter from dbt.artifacts.schemas.results import ( BaseResult, @@ -36,6 +36,7 @@ NodeStart, NothingToDo, QueryCancelationUnsupported, + SkippingDetails, ) from dbt.exceptions import DbtInternalError, DbtRuntimeError, FailFastError from dbt.flags import get_flags @@ -65,6 +66,14 @@ class GraphRunnableMode(StrEnum): Independent = "independent" +def mark_node_as_skipped( + node: ResultNode, executed_node_ids: Set[str], message: Optional[str] +) -> Optional[RunResult]: + if node.unique_id not in executed_node_ids: + return RunResult.from_node(node, RunStatus.Skipped, message) + return None + + class GraphRunnableTask(ConfiguredTask): MARK_DEPENDENT_ERRORS_STATUSES = [NodeStatus.Error, NodeStatus.PartialSuccess] @@ -390,14 +399,6 @@ def _cancel_connections(self, pool): def execute_nodes(self): num_threads = self.config.threads - target_name = self.config.target_name - - fire_event( - ConcurrencyLine( - num_threads=num_threads, target_name=target_name, 
node_count=self.num_nodes - ) - ) - fire_event(Formatting("")) pool = ThreadPool(num_threads, self._pool_thread_initializer, [get_invocation_context()]) try: @@ -405,12 +406,13 @@ def execute_nodes(self): except FailFastError as failure: self._cancel_connections(pool) - executed_node_ids = [r.node.unique_id for r in self.node_results] + executed_node_ids = {r.node.unique_id for r in self.node_results} + message = "Skipping due to fail_fast" - for r in self._flattened_nodes: - if r.unique_id not in executed_node_ids: + for node in self._flattened_nodes: + if node.unique_id not in executed_node_ids: self.node_results.append( - RunResult.from_node(r, RunStatus.Skipped, "Skipping due to fail_fast") + mark_node_as_skipped(node, executed_node_ids, message) ) print_run_result_error(failure.result) @@ -482,10 +484,11 @@ def populate_adapter_cache( {"adapter_cache_construction_elapsed": cache_populate_time} ) - def before_run(self, adapter, selected_uids: AbstractSet[str]): + def before_run(self, adapter: BaseAdapter, selected_uids: AbstractSet[str]) -> RunStatus: with adapter.connection_named("master"): self.defer_to_manifest() self.populate_adapter_cache(adapter) + return RunStatus.Success def after_run(self, adapter, results) -> None: pass @@ -495,10 +498,48 @@ def print_results_line(self, node_results, elapsed): def execute_with_hooks(self, selected_uids: AbstractSet[str]): adapter = get_adapter(self.config) + + fire_event(Formatting("")) + fire_event( + ConcurrencyLine( + num_threads=self.config.threads, + target_name=self.config.target_name, + node_count=self.num_nodes, + ) + ) + fire_event(Formatting("")) + self.started_at = time.time() try: - self.before_run(adapter, selected_uids) - res = self.execute_nodes() + before_run_status = self.before_run(adapter, selected_uids) + + if before_run_status == RunStatus.Success or ( + not get_flags().skip_nodes_if_on_run_start_fails + ): + res = self.execute_nodes() + else: + executed_node_ids = { + r.node.unique_id for r in self.node_results if hasattr(r, "node") + } + + res = [] + + for index, node in enumerate(self._flattened_nodes or []): + if node.unique_id not in executed_node_ids: + fire_event( + SkippingDetails( + resource_type=node.resource_type, + schema=node.schema, + node_name=node.name, + index=index + 1, + total=self.num_nodes, + node_info=node.node_info, + ) + ) + skipped_node_result = mark_node_as_skipped(node, executed_node_ids, None) + if skipped_node_result: + self.node_results.append(skipped_node_result) + self.after_run(adapter, res) finally: adapter.cleanup_connections() @@ -525,7 +566,6 @@ def run(self): ) if len(self._flattened_nodes) == 0: - fire_event(Formatting("")) warn_or_error(NothingToDo()) result = self.get_result( results=[], @@ -533,7 +573,6 @@ def run(self): elapsed_time=0.0, ) else: - fire_event(Formatting("")) selected_uids = frozenset(n.unique_id for n in self._flattened_nodes) result = self.execute_with_hooks(selected_uids) diff --git a/schemas/dbt/manifest/v12.json b/schemas/dbt/manifest/v12.json index 5b6c338d614..c58275fa35b 100644 --- a/schemas/dbt/manifest/v12.json +++ b/schemas/dbt/manifest/v12.json @@ -727,6 +727,12 @@ "type": "string", "default": "" }, + "vars": { + "type": "object", + "propertyNames": { + "type": "string" + } + }, "root_path": { "anyOf": [ { @@ -1766,6 +1772,12 @@ "type": "string", "default": "" }, + "vars": { + "type": "object", + "propertyNames": { + "type": "string" + } + }, "language": { "type": "string", "default": "sql" @@ -2420,6 +2432,12 @@ "type": "string", "default": "" }, + 
"vars": { + "type": "object", + "propertyNames": { + "type": "string" + } + }, "language": { "type": "string", "default": "sql" @@ -3211,6 +3229,12 @@ "type": "string", "default": "" }, + "vars": { + "type": "object", + "propertyNames": { + "type": "string" + } + }, "language": { "type": "string", "default": "sql" @@ -4021,6 +4045,12 @@ "type": "string", "default": "" }, + "vars": { + "type": "object", + "propertyNames": { + "type": "string" + } + }, "language": { "type": "string", "default": "sql" @@ -5382,6 +5412,12 @@ "type": "string", "default": "" }, + "vars": { + "type": "object", + "propertyNames": { + "type": "string" + } + }, "language": { "type": "string", "default": "sql" @@ -6036,6 +6072,12 @@ "type": "string", "default": "" }, + "vars": { + "type": "object", + "propertyNames": { + "type": "string" + } + }, "language": { "type": "string", "default": "sql" @@ -6994,6 +7036,12 @@ "type": "string", "default": "" }, + "vars": { + "type": "object", + "propertyNames": { + "type": "string" + } + }, "language": { "type": "string", "default": "sql" @@ -8131,6 +8179,12 @@ "type": "string" } }, + "vars": { + "type": "object", + "propertyNames": { + "type": "string" + } + }, "relation_name": { "anyOf": [ { @@ -8524,6 +8578,12 @@ "type": "string" } }, + "vars": { + "type": "object", + "propertyNames": { + "type": "string" + } + }, "url": { "anyOf": [ { @@ -10601,6 +10661,12 @@ "type": "string", "default": "" }, + "vars": { + "type": "object", + "propertyNames": { + "type": "string" + } + }, "root_path": { "anyOf": [ { @@ -11640,6 +11706,12 @@ "type": "string", "default": "" }, + "vars": { + "type": "object", + "propertyNames": { + "type": "string" + } + }, "language": { "type": "string", "default": "sql" @@ -12294,6 +12366,12 @@ "type": "string", "default": "" }, + "vars": { + "type": "object", + "propertyNames": { + "type": "string" + } + }, "language": { "type": "string", "default": "sql" @@ -13085,6 +13163,12 @@ "type": "string", "default": "" }, + "vars": { + "type": "object", + "propertyNames": { + "type": "string" + } + }, "language": { "type": "string", "default": "sql" @@ -13895,6 +13979,12 @@ "type": "string", "default": "" }, + "vars": { + "type": "object", + "propertyNames": { + "type": "string" + } + }, "language": { "type": "string", "default": "sql" @@ -15256,6 +15346,12 @@ "type": "string", "default": "" }, + "vars": { + "type": "object", + "propertyNames": { + "type": "string" + } + }, "language": { "type": "string", "default": "sql" @@ -15910,6 +16006,12 @@ "type": "string", "default": "" }, + "vars": { + "type": "object", + "propertyNames": { + "type": "string" + } + }, "language": { "type": "string", "default": "sql" @@ -16868,6 +16970,12 @@ "type": "string", "default": "" }, + "vars": { + "type": "object", + "propertyNames": { + "type": "string" + } + }, "language": { "type": "string", "default": "sql" @@ -17996,6 +18104,12 @@ "type": "string" } }, + "vars": { + "type": "object", + "propertyNames": { + "type": "string" + } + }, "relation_name": { "anyOf": [ { @@ -18187,6 +18301,12 @@ "type": "string" } }, + "vars": { + "type": "object", + "propertyNames": { + "type": "string" + } + }, "url": { "anyOf": [ { diff --git a/tests/functional/adapter/hooks/test_on_run_hooks.py b/tests/functional/adapter/hooks/test_on_run_hooks.py new file mode 100644 index 00000000000..ffb34c3f23a --- /dev/null +++ b/tests/functional/adapter/hooks/test_on_run_hooks.py @@ -0,0 +1,156 @@ +import pytest + +from dbt.artifacts.schemas.results import RunStatus +from dbt.tests.util import 
get_artifact, run_dbt_and_capture + + +class Test__StartHookFail__FlagIsNone__ModelFail: + @pytest.fixture(scope="class") + def flags(self): + return {} + + @pytest.fixture(scope="class") + def project_config_update(self, flags): + return { + "on-run-start": [ + "create table {{ target.schema }}.my_hook_table ( id int )", # success + "drop table {{ target.schema }}.my_hook_table", # success + "insert into {{ target.schema }}.my_hook_table (id) values (1, 2, 3)", # fail + "create table {{ target.schema }}.my_hook_table ( id int )", # skip + ], + "flags": flags, + } + + @pytest.fixture(scope="class") + def models(self): + return { + "my_model.sql": "select * from {{ target.schema }}.my_hook_table" + " union all " + "select * from {{ target.schema }}.my_end_table" + } + + @pytest.fixture(scope="class") + def log_counts(self): + return "PASS=2 WARN=0 ERROR=2 SKIP=1 TOTAL=5" + + @pytest.fixture(scope="class") + def my_model_run_status(self): + return RunStatus.Error + + def test_results(self, project, log_counts, my_model_run_status): + results, log_output = run_dbt_and_capture(["run"], expect_pass=False) + + expected_results = [ + ("operation.test.test-on-run-start-0", RunStatus.Success), + ("operation.test.test-on-run-start-1", RunStatus.Success), + ("operation.test.test-on-run-start-2", RunStatus.Error), + ("operation.test.test-on-run-start-3", RunStatus.Skipped), + ("model.test.my_model", my_model_run_status), + ] + + assert [(result.node.unique_id, result.status) for result in results] == expected_results + assert log_counts in log_output + assert "4 project hooks, 1 view model" in log_output + + run_results = get_artifact(project.project_root, "target", "run_results.json") + assert [ + (result["unique_id"], result["status"]) for result in run_results["results"] + ] == expected_results + assert ( + f'relation "{project.test_schema}.my_hook_table" does not exist' + in run_results["results"][2]["message"] + ) + + +class Test__StartHookFail__FlagIsFalse__ModelFail(Test__StartHookFail__FlagIsNone__ModelFail): + @pytest.fixture(scope="class") + def flags(self): + return {"skip_nodes_if_on_run_start_fails": False} + + +class Test__StartHookFail__FlagIsTrue__ModelSkipped(Test__StartHookFail__FlagIsNone__ModelFail): + @pytest.fixture(scope="class") + def flags(self): + return {"skip_nodes_if_on_run_start_fails": True} + + @pytest.fixture(scope="class") + def log_counts(self): + return "PASS=2 WARN=0 ERROR=1 SKIP=2 TOTAL=5" + + @pytest.fixture(scope="class") + def my_model_run_status(self): + return RunStatus.Skipped + + +class Test__ModelPass__EndHookFail: + @pytest.fixture(scope="class") + def project_config_update(self): + return { + "on-run-end": [ + "create table {{ target.schema }}.my_hook_table ( id int )", # success + "drop table {{ target.schema }}.my_hook_table", # success + "insert into {{ target.schema }}.my_hook_table (id) values (1, 2, 3)", # fail + "create table {{ target.schema }}.my_hook_table ( id int )", # skip + ], + } + + @pytest.fixture(scope="class") + def models(self): + return {"my_model.sql": "select 1"} + + def test_results(self, project): + results, log_output = run_dbt_and_capture(["run"], expect_pass=False) + + expected_results = [ + ("model.test.my_model", RunStatus.Success), + ("operation.test.test-on-run-end-0", RunStatus.Success), + ("operation.test.test-on-run-end-1", RunStatus.Success), + ("operation.test.test-on-run-end-2", RunStatus.Error), + ("operation.test.test-on-run-end-3", RunStatus.Skipped), + ] + + assert [(result.node.unique_id, result.status) for 
result in results] == expected_results + assert "PASS=3 WARN=0 ERROR=1 SKIP=1 TOTAL=5" in log_output + assert "4 project hooks, 1 view model" in log_output + + run_results = get_artifact(project.project_root, "target", "run_results.json") + assert [ + (result["unique_id"], result["status"]) for result in run_results["results"] + ] == expected_results + assert ( + f'relation "{project.test_schema}.my_hook_table" does not exist' + in run_results["results"][3]["message"] + ) + + +class Test__SelectorEmpty__NoHooksRan: + @pytest.fixture(scope="class") + def project_config_update(self): + return { + "on-run-start": [ + "create table {{ target.schema }}.my_hook_table ( id int )", # success + "drop table {{ target.schema }}.my_hook_table", # success + ], + "on-run-end": [ + "create table {{ target.schema }}.my_hook_table ( id int )", # success + "drop table {{ target.schema }}.my_hook_table", # success + ], + } + + @pytest.fixture(scope="class") + def models(self): + return {"my_model.sql": "select 1"} + + def test_results(self, project): + results, log_output = run_dbt_and_capture( + ["--debug", "run", "--select", "tag:no_such_tag", "--log-format", "json"] + ) + + assert results.results == [] + assert ( + "The selection criterion 'tag:no_such_tag' does not match any enabled nodes" + in log_output + ) + + run_results = get_artifact(project.project_root, "target", "run_results.json") + assert run_results["results"] == [] diff --git a/tests/functional/adapter/hooks/test_run_hooks.py b/tests/functional/adapter/hooks/test_run_hooks.py deleted file mode 100644 index f8bec5c6aeb..00000000000 --- a/tests/functional/adapter/hooks/test_run_hooks.py +++ /dev/null @@ -1,161 +0,0 @@ -import os -from pathlib import Path - -import pytest - -from dbt.tests.util import check_table_does_not_exist, run_dbt -from tests.functional.adapter.hooks.fixtures import ( - macros__before_and_after, - macros__hook, - macros_missing_column, - models__hooks, - models__missing_column, - seeds__example_seed_csv, -) - - -class TestPrePostRunHooks(object): - @pytest.fixture(scope="function") - def setUp(self, project): - project.run_sql_file(project.test_data_dir / Path("seed_run.sql")) - project.run_sql(f"drop table if exists { project.test_schema }.schemas") - project.run_sql(f"drop table if exists { project.test_schema }.db_schemas") - os.environ["TERM_TEST"] = "TESTING" - - @pytest.fixture(scope="class") - def macros(self): - return {"hook.sql": macros__hook, "before-and-after.sql": macros__before_and_after} - - @pytest.fixture(scope="class") - def models(self): - return {"hooks.sql": models__hooks} - - @pytest.fixture(scope="class") - def seeds(self): - return {"example_seed.csv": seeds__example_seed_csv} - - @pytest.fixture(scope="class") - def project_config_update(self): - return { - # The create and drop table statements here validate that these hooks run - # in the same order that they are defined. Drop before create is an error. - # Also check that the table does not exist below. 
- "on-run-start": [ - "{{ custom_run_hook('start', target, run_started_at, invocation_id) }}", - "create table {{ target.schema }}.start_hook_order_test ( id int )", - "drop table {{ target.schema }}.start_hook_order_test", - "{{ log(env_var('TERM_TEST'), info=True) }}", - ], - "on-run-end": [ - "{{ custom_run_hook('end', target, run_started_at, invocation_id) }}", - "create table {{ target.schema }}.end_hook_order_test ( id int )", - "drop table {{ target.schema }}.end_hook_order_test", - "create table {{ target.schema }}.schemas ( schema text )", - "insert into {{ target.schema }}.schemas (schema) values {% for schema in schemas %}( '{{ schema }}' ){% if not loop.last %},{% endif %}{% endfor %}", - "create table {{ target.schema }}.db_schemas ( db text, schema text )", - "insert into {{ target.schema }}.db_schemas (db, schema) values {% for db, schema in database_schemas %}('{{ db }}', '{{ schema }}' ){% if not loop.last %},{% endif %}{% endfor %}", - ], - "seeds": { - "quote_columns": False, - }, - } - - def get_ctx_vars(self, state, project): - fields = [ - "test_state", - "target_dbname", - "target_host", - "target_name", - "target_schema", - "target_threads", - "target_type", - "target_user", - "target_pass", - "run_started_at", - "invocation_id", - "thread_id", - ] - field_list = ", ".join(['"{}"'.format(f) for f in fields]) - query = f"select {field_list} from {project.test_schema}.on_run_hook where test_state = '{state}'" - - vals = project.run_sql(query, fetch="all") - assert len(vals) != 0, "nothing inserted into on_run_hook table" - assert len(vals) == 1, "too many rows in hooks table" - ctx = dict([(k, v) for (k, v) in zip(fields, vals[0])]) - - return ctx - - def assert_used_schemas(self, project): - schemas_query = "select * from {}.schemas".format(project.test_schema) - results = project.run_sql(schemas_query, fetch="all") - assert len(results) == 1 - assert results[0][0] == project.test_schema - - db_schemas_query = "select * from {}.db_schemas".format(project.test_schema) - results = project.run_sql(db_schemas_query, fetch="all") - assert len(results) == 1 - assert results[0][0] == project.database - assert results[0][1] == project.test_schema - - def check_hooks(self, state, project, host): - ctx = self.get_ctx_vars(state, project) - - assert ctx["test_state"] == state - assert ctx["target_dbname"] == "dbt" - assert ctx["target_host"] == host - assert ctx["target_name"] == "default" - assert ctx["target_schema"] == project.test_schema - assert ctx["target_threads"] == 4 - assert ctx["target_type"] == "postgres" - assert ctx["target_user"] == "root" - assert ctx["target_pass"] == "" - - assert ( - ctx["run_started_at"] is not None and len(ctx["run_started_at"]) > 0 - ), "run_started_at was not set" - assert ( - ctx["invocation_id"] is not None and len(ctx["invocation_id"]) > 0 - ), "invocation_id was not set" - assert ctx["thread_id"].startswith("Thread-") or ctx["thread_id"] == "MainThread" - - def test_pre_and_post_run_hooks(self, setUp, project, dbt_profile_target): - run_dbt(["run"]) - - self.check_hooks("start", project, dbt_profile_target.get("host", None)) - self.check_hooks("end", project, dbt_profile_target.get("host", None)) - - check_table_does_not_exist(project.adapter, "start_hook_order_test") - check_table_does_not_exist(project.adapter, "end_hook_order_test") - self.assert_used_schemas(project) - - def test_pre_and_post_seed_hooks(self, setUp, project, dbt_profile_target): - run_dbt(["seed"]) - - self.check_hooks("start", project, 
dbt_profile_target.get("host", None)) - self.check_hooks("end", project, dbt_profile_target.get("host", None)) - - check_table_does_not_exist(project.adapter, "start_hook_order_test") - check_table_does_not_exist(project.adapter, "end_hook_order_test") - self.assert_used_schemas(project) - - -class TestAfterRunHooks(object): - @pytest.fixture(scope="class") - def macros(self): - return {"temp_macro.sql": macros_missing_column} - - @pytest.fixture(scope="class") - def models(self): - return {"test_column.sql": models__missing_column} - - @pytest.fixture(scope="class") - def project_config_update(self): - return { - # The create and drop table statements here validate that these hooks run - # in the same order that they are defined. Drop before create is an error. - # Also check that the table does not exist below. - "on-run-start": "- {{ export_table_check() }}" - } - - def test_missing_column_pre_hook(self, project): - run_dbt(["run"], expect_pass=False) diff --git a/tests/functional/artifacts/expected_manifest.py b/tests/functional/artifacts/expected_manifest.py index 501efef85a2..05317fa0df9 100644 --- a/tests/functional/artifacts/expected_manifest.py +++ b/tests/functional/artifacts/expected_manifest.py @@ -363,6 +363,7 @@ def expected_seeded_manifest(project, model_database=None, quote_model=False): "extra_ctes": [], "checksum": checksum_file(model_sql_path), "unrendered_config": unrendered_model_config, + "vars": {}, "access": "protected", "version": None, "latest_version": None, @@ -462,6 +463,7 @@ def expected_seeded_manifest(project, model_database=None, quote_model=False): "extra_ctes": [], "checksum": checksum_file(second_model_sql_path), "unrendered_config": unrendered_second_config, + "vars": {}, "access": "protected", "version": None, "latest_version": None, @@ -544,6 +546,7 @@ def expected_seeded_manifest(project, model_database=None, quote_model=False): "docs": {"node_color": None, "show": True}, "checksum": checksum_file(seed_path), "unrendered_config": unrendered_seed_config, + "vars": {}, "relation_name": relation_name_node_format.format( project.database, my_schema_name, "seed" ), @@ -599,6 +602,7 @@ def expected_seeded_manifest(project, model_database=None, quote_model=False): }, "checksum": {"name": "none", "checksum": ""}, "unrendered_config": unrendered_test_config, + "vars": {}, "contract": {"checksum": None, "enforced": False, "alias_types": True}, }, "snapshot.test.snapshot_seed": { @@ -646,6 +650,7 @@ def expected_seeded_manifest(project, model_database=None, quote_model=False): "tags": [], "unique_id": "snapshot.test.snapshot_seed", "unrendered_config": unrendered_snapshot_config, + "vars": {"alternate_schema": alternate_schema}, }, "test.test.test_nothing_model_.5d38568946": { "alias": "test_nothing_model_", @@ -698,6 +703,7 @@ def expected_seeded_manifest(project, model_database=None, quote_model=False): }, "checksum": {"name": "none", "checksum": ""}, "unrendered_config": unrendered_test_config, + "vars": {}, }, "test.test.unique_model_id.67b76558ff": { "alias": "unique_model_id", @@ -751,6 +757,7 @@ def expected_seeded_manifest(project, model_database=None, quote_model=False): }, "checksum": {"name": "none", "checksum": ""}, "unrendered_config": unrendered_test_config, + "vars": {}, }, }, "sources": { @@ -809,6 +816,7 @@ def expected_seeded_manifest(project, model_database=None, quote_model=False): "unrendered_config": {}, "unrendered_database": None, "unrendered_schema": "{{ var('test_schema') }}", + "vars": {"test_schema": ANY}, }, }, "exposures": { @@ 
-843,6 +851,7 @@ def expected_seeded_manifest(project, model_database=None, quote_model=False): "unique_id": "exposure.test.notebook_exposure", "url": "http://example.com/notebook/1", "unrendered_config": {}, + "vars": {}, }, "exposure.test.simple_exposure": { "created_at": ANY, @@ -875,6 +884,7 @@ def expected_seeded_manifest(project, model_database=None, quote_model=False): "meta": {}, "tags": [], "unrendered_config": {}, + "vars": {}, }, }, "metrics": {}, @@ -992,6 +1002,7 @@ def expected_references_manifest(project): "extra_ctes": [], "checksum": checksum_file(ephemeral_copy_path), "unrendered_config": get_unrendered_model_config(materialized="ephemeral"), + "vars": {}, "access": "protected", "version": None, "latest_version": None, @@ -1064,6 +1075,7 @@ def expected_references_manifest(project): "unrendered_config": get_unrendered_model_config( materialized="table", group="test_group" ), + "vars": {}, "access": "protected", "version": None, "latest_version": None, @@ -1132,6 +1144,7 @@ def expected_references_manifest(project): "extra_ctes": [], "checksum": checksum_file(view_summary_path), "unrendered_config": get_unrendered_model_config(materialized="view"), + "vars": {}, "access": "protected", "version": None, "latest_version": None, @@ -1215,6 +1228,7 @@ def expected_references_manifest(project): "unique_id": "seed.test.seed", "checksum": checksum_file(seed_path), "unrendered_config": get_unrendered_seed_config(), + "vars": {}, "relation_name": '"{0}"."{1}".seed'.format(project.database, my_schema_name), }, "snapshot.test.snapshot_seed": { @@ -1229,7 +1243,10 @@ def expected_references_manifest(project): "config": get_rendered_snapshot_config(target_schema=alternate_schema), "contract": {"checksum": None, "enforced": False, "alias_types": True}, "database": model_database, - "depends_on": {"macros": [], "nodes": ["seed.test.seed"]}, + "depends_on": { + "macros": [], + "nodes": ["seed.test.seed"], + }, "description": "", "docs": {"node_color": None, "show": True}, "extra_ctes": [], @@ -1257,6 +1274,7 @@ def expected_references_manifest(project): "unrendered_config": get_unrendered_snapshot_config( target_schema=alternate_schema ), + "vars": {"alternate_schema": alternate_schema}, }, }, "sources": { @@ -1313,6 +1331,7 @@ def expected_references_manifest(project): "unrendered_config": {}, "unrendered_database": None, "unrendered_schema": "{{ var('test_schema') }}", + "vars": {"test_schema": ANY}, }, }, "exposures": { @@ -1338,6 +1357,7 @@ def expected_references_manifest(project): "package_name": "test", "path": "schema.yml", "refs": [{"name": "view_summary", "package": None, "version": None}], + "vars": {}, "resource_type": "exposure", "sources": [], "type": "notebook", @@ -1598,6 +1618,7 @@ def expected_versions_manifest(project): group="test_group", meta={"size": "large", "color": "blue"}, ), + "vars": {}, "access": "protected", "version": 1, "latest_version": 2, @@ -1669,6 +1690,7 @@ def expected_versions_manifest(project): "unrendered_config": get_unrendered_model_config( materialized="view", group="test_group", meta={"size": "large", "color": "red"} ), + "vars": {}, "access": "protected", "version": 2, "latest_version": 2, @@ -1727,6 +1749,7 @@ def expected_versions_manifest(project): "extra_ctes": [], "checksum": checksum_file(ref_versioned_model_path), "unrendered_config": get_unrendered_model_config(), + "vars": {}, "access": "protected", "version": None, "latest_version": None, @@ -1784,6 +1807,7 @@ def expected_versions_manifest(project): }, "checksum": {"name": "none", 
"checksum": ""}, "unrendered_config": unrendered_test_config, + "vars": {}, }, "test.test.unique_versioned_model_v1_count.0b4c0b688a": { "alias": "unique_versioned_model_v1_count", @@ -1837,6 +1861,7 @@ def expected_versions_manifest(project): }, "checksum": {"name": "none", "checksum": ""}, "unrendered_config": unrendered_test_config, + "vars": {}, }, "test.test.unique_versioned_model_v2_first_name.998430d28e": { "alias": "unique_versioned_model_v2_first_name", @@ -1890,6 +1915,7 @@ def expected_versions_manifest(project): }, "checksum": {"name": "none", "checksum": ""}, "unrendered_config": unrendered_test_config, + "vars": {}, }, }, "exposures": { @@ -1921,6 +1947,7 @@ def expected_versions_manifest(project): "unique_id": "exposure.test.notebook_exposure", "url": None, "unrendered_config": {}, + "vars": {}, }, }, "metrics": {}, diff --git a/tests/functional/artifacts/test_run_results.py b/tests/functional/artifacts/test_run_results.py index dea947f342b..a474f80c4c7 100644 --- a/tests/functional/artifacts/test_run_results.py +++ b/tests/functional/artifacts/test_run_results.py @@ -55,7 +55,7 @@ def project_config_update(self): def test_results_serializable(self, project): results = run_dbt(["run"]) - assert len(results.results) == 1 + assert len(results.results) == 2 # This test is failing due to the faulty assumptions that run_results.json would diff --git a/tests/functional/defer_state/test_modified_state.py b/tests/functional/defer_state/test_modified_state.py index 2ded38e742b..3d0f0955c93 100644 --- a/tests/functional/defer_state/test_modified_state.py +++ b/tests/functional/defer_state/test_modified_state.py @@ -1144,3 +1144,216 @@ def test_changed_semantic_model_contents(self, project): write_file(modified_semantic_model_schema_yml, "models", "schema.yml") results = run_dbt(["list", "-s", "state:modified", "--state", "./state"]) assert len(results) == 1 + + +class TestModifiedVarsLegacy(BaseModifiedState): + @pytest.fixture(scope="class") + def project_config_update(self): + return { + "flags": { + "state_modified_compare_vars": False, + }, + "vars": {"my_var": 1}, + } + + @pytest.fixture(scope="class") + def models(self): + return { + "model_with_var.sql": "select {{ var('my_var') }} as id", + } + + def test_changed_vars(self, project): + self.run_and_save_state() + + # No var change + assert not run_dbt(["list", "-s", "state:modified", "--state", "./state"]) + assert not run_dbt(["list", "-s", "state:modified.vars", "--state", "./state"]) + + # Modify var (my_var: 1 -> 2) + update_config_file({"vars": {"my_var": 2}}, "dbt_project.yml") + + # By default, do not detect vars in state:modified to preserve legacy behaviour + assert run_dbt(["list", "-s", "state:modified", "--state", "./state"]) == [] + + # state:modified.vars is a new selector, opt-in method -> returns results + assert run_dbt(["list", "-s", "state:modified.vars", "--state", "./state"]) == [ + "test.model_with_var" + ] + + # Reset dbt_project.yml + update_config_file({"vars": {"my_var": 1}}, "dbt_project.yml") + + # Modify var via --var CLI flag + assert not run_dbt( + ["list", "--vars", '{"my_var": 1}', "-s", "state:modified", "--state", "./state"] + ) + assert ( + run_dbt( + ["list", "--vars", '{"my_var": 2}', "-s", "state:modified", "--state", "./state"] + ) + == [] + ) + # state:modified.vars is a new selector, opt-in method -> returns results + assert run_dbt( + ["list", "--vars", '{"my_var": 2}', "-s", "state:modified.vars", "--state", "./state"] + ) == ["test.model_with_var"] + + +class 
TestModifiedVars(BaseModifiedState): + @pytest.fixture(scope="class") + def project_config_update(self): + return { + "flags": { + "state_modified_compare_vars": True, + }, + "vars": {"my_var": 1}, + } + + @pytest.fixture(scope="class") + def models(self): + return { + "model_with_var.sql": "select {{ var('my_var') }} as id", + } + + def test_changed_vars(self, project): + self.run_and_save_state() + + # No var change + assert not run_dbt(["list", "-s", "state:modified", "--state", "./state"]) + assert not run_dbt(["list", "-s", "state:modified.vars", "--state", "./state"]) + + # Modify var (my_var: 1 -> 2) + update_config_file({"vars": {"my_var": 2}}, "dbt_project.yml") + assert run_dbt(["list", "-s", "state:modified", "--state", "./state"]) == [ + "test.model_with_var" + ] + assert run_dbt(["list", "-s", "state:modified.vars", "--state", "./state"]) == [ + "test.model_with_var" + ] + + # Reset dbt_project.yml + update_config_file({"vars": {"my_var": 1}}, "dbt_project.yml") + + # Modify var via the --vars CLI flag + assert not run_dbt( + ["list", "--vars", '{"my_var": 1}', "-s", "state:modified", "--state", "./state"] + ) + assert run_dbt( + ["list", "--vars", '{"my_var": 2}', "-s", "state:modified", "--state", "./state"] + ) == ["test.model_with_var"] + + +macro_with_var_sql = """ +{% macro macro_with_var() %} + {{ var('my_var') }} +{% endmacro %} +""" + + +class TestModifiedMacroVars(BaseModifiedState): + @pytest.fixture(scope="class") + def project_config_update(self): + return { + "flags": { + "state_modified_compare_vars": True, + }, + "vars": {"my_var": 1}, + } + + @pytest.fixture(scope="class") + def models(self): + return { + "model_with_macro.sql": "select {{ macro_with_var() }} as id", + } + + @pytest.fixture(scope="class") + def macros(self): + return { + "macro_with_var.sql": macro_with_var_sql, + } + + def test_changed_vars(self, project): + self.run_and_save_state() + + # No var change + assert not run_dbt(["list", "-s", "state:modified", "--state", "./state"]) + assert not run_dbt(["list", "-s", "state:modified.vars", "--state", "./state"]) + + # Modify var (my_var: 1 -> 2) + update_config_file({"vars": {"my_var": 2}}, "dbt_project.yml") + assert run_dbt(["list", "-s", "state:modified", "--state", "./state"]) == [ + "test.model_with_macro" + ] + assert run_dbt(["list", "-s", "state:modified.vars", "--state", "./state"]) == [ + "test.model_with_macro" + ] + + # Macros themselves are not captured as modified because the var value depends on a node's context + assert not run_dbt(["list", "-s", "state:modified.macros", "--state", "./state"]) + + +# TODO: test versioned models, tests +model_with_var_schema_yml = """ +version: 2 +models: + - name: model_with_var + config: + materialized: "{{ var('my_var') }}" + +exposures: + - name: exposure_name + type: dashboard + owner: + name: "{{ var('my_var') }}" + +sources: + - name: jaffle_shop + database: "{{ var('my_var') }}" + schema: jaffle_shop + tables: + - name: orders + - name: customers +""" + + +class TestModifiedVarsSchemaYml(BaseModifiedState): + @pytest.fixture(scope="class") + def project_config_update(self): + return { + "flags": { + "state_modified_compare_vars": True, + }, + "vars": {"my_var": "table"}, + } + + @pytest.fixture(scope="class") + def models(self): + return {"model_with_var.sql": "select 1 as id", "schema.yml": model_with_var_schema_yml} + + def test_changed_vars(self, project): + self.run_and_save_state() + + # No var change + assert not run_dbt(["list", "-s", "state:modified", "--state", "./state"]) + assert not 
run_dbt(["list", "-s", "state:modified.vars", "--state", "./state"]) + + # Modify var (my_var: table -> view) + update_config_file({"vars": {"my_var": "view"}}, "dbt_project.yml") + assert sorted(run_dbt(["list", "-s", "state:modified", "--state", "./state"])) == sorted( + [ + "test.model_with_var", + "exposure:test.exposure_name", + "source:test.jaffle_shop.customers", + "source:test.jaffle_shop.orders", + ] + ) + assert sorted( + run_dbt(["list", "-s", "state:modified.vars", "--state", "./state"]) + ) == sorted( + [ + "test.model_with_var", + "exposure:test.exposure_name", + "source:test.jaffle_shop.customers", + "source:test.jaffle_shop.orders", + ] + ) diff --git a/tests/functional/dependencies/test_local_dependency.py b/tests/functional/dependencies/test_local_dependency.py index d26345f2da6..e5bf1de5323 100644 --- a/tests/functional/dependencies/test_local_dependency.py +++ b/tests/functional/dependencies/test_local_dependency.py @@ -328,7 +328,7 @@ def test_hook_dependency(self, prepare_dependencies, project): run_dbt(["deps", "--vars", cli_vars]) results = run_dbt(["run", "--vars", cli_vars]) - assert len(results) == 2 + assert len(results) == 8 check_relations_equal(project.adapter, ["actual", "expected"]) diff --git a/tests/functional/list/test_list.py b/tests/functional/list/test_list.py index 6894ec96bb4..7b98e5c8251 100644 --- a/tests/functional/list/test_list.py +++ b/tests/functional/list/test_list.py @@ -49,7 +49,10 @@ def expect_snapshot_output(self, happy_path_project): # noqa: F811 "json": { "name": "my_snapshot", "package_name": "test", - "depends_on": {"nodes": [], "macros": []}, + "depends_on": { + "nodes": [], + "macros": [], + }, "tags": [], "config": { "enabled": True, diff --git a/tests/functional/retry/test_retry.py b/tests/functional/retry/test_retry.py index 012db25e42f..0a9e94f2e06 100644 --- a/tests/functional/retry/test_retry.py +++ b/tests/functional/retry/test_retry.py @@ -5,7 +5,7 @@ from dbt.contracts.results import RunStatus, TestStatus from dbt.exceptions import DbtRuntimeError, TargetNotFoundError -from dbt.tests.util import rm_file, run_dbt, write_file +from dbt.tests.util import rm_file, run_dbt, update_config_file, write_file from tests.functional.retry.fixtures import ( macros__alter_timezone_sql, models__sample_model, @@ -365,3 +365,92 @@ def test_retry_target_path_flag(self, project): results = run_dbt(["retry", "--state", "artifacts", "--target-path", "my_target_path"]) assert len(results) == 1 assert Path("my_target_path").is_dir() + + +class TestRetryHooksAlwaysRun: + @pytest.fixture(scope="class") + def project_config_update(self): + return { + "on-run-start": ["select 1;"], + "on-run-end": ["select 2;"], + } + + @pytest.fixture(scope="class") + def models(self): + return { + "sample_model.sql": models__sample_model, + } + + def test_retry_hooks_always_run(self, project): + res = run_dbt(["run", "--target-path", "target"], expect_pass=False) + assert len(res) == 3 + + write_file(models__second_model, "models", "sample_model.sql") + res = run_dbt(["retry", "--state", "target"]) + assert len(res) == 3 + + +class TestFixRetryHook: + @pytest.fixture(scope="class") + def project_config_update(self): + return { + "flags": { + "skip_nodes_if_on_run_start_fails": True, + }, + "on-run-start": [ + "select 1 as id", + "select column_does_not_exist", + "select 2 as id", + ], + } + + @pytest.fixture(scope="class") + def models(self): + return { + "sample_model.sql": "select 1 as id, 1 as foo", + "second_model.sql": models__second_model, + 
"union_model.sql": models__union_model, + } + + def test_fix_retry_hook(self, project): + res = run_dbt(["run"], expect_pass=False) + assert {r.node.unique_id: r.status for r in res.results} == { + "operation.test.test-on-run-start-0": RunStatus.Success, + "operation.test.test-on-run-start-1": RunStatus.Error, + "operation.test.test-on-run-start-2": RunStatus.Skipped, + "model.test.sample_model": RunStatus.Skipped, + "model.test.second_model": RunStatus.Skipped, + "model.test.union_model": RunStatus.Skipped, + } + + res = run_dbt(["retry"], expect_pass=False) + assert {r.node.unique_id: r.status for r in res.results} == { + "operation.test.test-on-run-start-0": RunStatus.Success, + "operation.test.test-on-run-start-1": RunStatus.Error, + "operation.test.test-on-run-start-2": RunStatus.Skipped, + "model.test.sample_model": RunStatus.Skipped, + "model.test.second_model": RunStatus.Skipped, + "model.test.union_model": RunStatus.Skipped, + } + + new_dbt_project_yml = { + "flags": { + "skip_nodes_if_on_run_start_fails": True, + }, + "on-run-start": [ + "select 1 as id", + "select 3 as id", + "select 2 as id", + ], + } + + update_config_file(new_dbt_project_yml, project.project_root, "dbt_project.yml") + res = run_dbt(["retry"]) + assert {r.node.unique_id: r.status for r in res.results} == { + "operation.test.test-on-run-start-0": RunStatus.Success, + "operation.test.test-on-run-start-1": RunStatus.Success, + "operation.test.test-on-run-start-2": RunStatus.Success, + "model.test.sample_model": RunStatus.Success, + "model.test.second_model": RunStatus.Success, + "model.test.union_model": RunStatus.Success, + } diff --git a/tests/functional/schema_tests/test_schema_v2_tests.py b/tests/functional/schema_tests/test_schema_v2_tests.py index ea33e62bce3..519ff64e853 100644 --- a/tests/functional/schema_tests/test_schema_v2_tests.py +++ b/tests/functional/schema_tests/test_schema_v2_tests.py @@ -474,9 +474,9 @@ def test_hooks_do_run_for_tests( ): # This passes now that hooks run, a behavior we changed in v1.0 results = run_dbt(["test", "--model", "ephemeral"]) - assert len(results) == 1 + assert len(results) == 3 for result in results: - assert result.status == "pass" + assert result.status in ("pass", "success") assert not result.skipped assert result.failures == 0, "test {} failed".format(result.node.name) @@ -507,9 +507,9 @@ def test_these_hooks_dont_run_for_tests( ): # This would fail if the hooks ran results = run_dbt(["test", "--model", "ephemeral"]) - assert len(results) == 1 + assert len(results) == 3 for result in results: - assert result.status == "pass" + assert result.status in ("pass", "success") assert not result.skipped assert result.failures == 0, "test {} failed".format(result.node.name) diff --git a/tests/functional/sources/test_source_freshness.py b/tests/functional/sources/test_source_freshness.py index 2f42a3aaa56..70c8866869e 100644 --- a/tests/functional/sources/test_source_freshness.py +++ b/tests/functional/sources/test_source_freshness.py @@ -129,12 +129,12 @@ def _assert_freshness_results(self, path, state): ] def _assert_project_hooks_called(self, logs: str): - assert "Running 1 on-run-start hook" in logs - assert "Running 1 on-run-end hook" in logs + assert "test.on-run-start.0" in logs + assert "test.on-run-start.0" in logs def _assert_project_hooks_not_called(self, logs: str): - assert "Running 1 on-run-start hook" not in logs - assert "Running 1 on-run-end hook" not in logs + assert "test.on-run-end.0" not in logs + assert "test.on-run-end.0" not in logs class 
TestSourceFreshness(SuccessfulSourceFreshnessTest): diff --git a/tests/unit/contracts/graph/test_manifest.py b/tests/unit/contracts/graph/test_manifest.py index 36f14537b09..eaff7f6c57e 100644 --- a/tests/unit/contracts/graph/test_manifest.py +++ b/tests/unit/contracts/graph/test_manifest.py @@ -97,6 +97,7 @@ "defer_relation", "time_spine", "batches", + "vars", } ) diff --git a/tests/unit/contracts/graph/test_nodes.py b/tests/unit/contracts/graph/test_nodes.py index 4e3a2353105..1271cb7108f 100644 --- a/tests/unit/contracts/graph/test_nodes.py +++ b/tests/unit/contracts/graph/test_nodes.py @@ -1,5 +1,6 @@ import pickle import re +from argparse import Namespace from dataclasses import replace import pytest @@ -24,6 +25,11 @@ ) +@pytest.fixture +def args_for_flags() -> Namespace: + return Namespace(state_modified_compare_vars=False) + + def norm_whitespace(string): _RE_COMBINE_WHITESPACE = re.compile(r"\s+") string = _RE_COMBINE_WHITESPACE.sub(" ", string).strip() @@ -200,6 +206,7 @@ def basic_compiled_dict(): }, "unrendered_config": {}, "unrendered_config_call_dict": {}, + "vars": {}, "config_call_dict": {}, "access": "protected", "constraints": [], @@ -520,6 +527,7 @@ def basic_compiled_schema_test_dict(): "severity": "warn", }, "unrendered_config_call_dict": {}, + "vars": {}, "config_call_dict": {}, } diff --git a/tests/unit/contracts/graph/test_nodes_parsed.py b/tests/unit/contracts/graph/test_nodes_parsed.py index 1b4f4b696a4..db0b6fae653 100644 --- a/tests/unit/contracts/graph/test_nodes_parsed.py +++ b/tests/unit/contracts/graph/test_nodes_parsed.py @@ -67,7 +67,9 @@ @pytest.fixture def args_for_flags() -> Namespace: return Namespace( - send_anonymous_usage_stats=False, state_modified_compare_more_unrendered_values=False + send_anonymous_usage_stats=False, + state_modified_compare_more_unrendered_values=False, + state_modified_compare_vars=False, ) @@ -202,6 +204,7 @@ def base_parsed_model_dict(): }, "unrendered_config": {}, "unrendered_config_call_dict": {}, + "vars": {}, "config_call_dict": {}, "access": AccessType.Protected.value, "constraints": [], @@ -258,6 +261,7 @@ def minimal_parsed_model_dict(): "checksum": "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855", }, "unrendered_config": {}, + "vars": {}, } @@ -325,6 +329,7 @@ def complex_parsed_model_dict(): "post_hook": ['insert into blah(a, b) select "1", 1'], }, "unrendered_config_call_dict": {}, + "vars": {}, "config_call_dict": {}, "access": AccessType.Protected.value, "constraints": [], @@ -535,6 +540,7 @@ def basic_parsed_seed_dict(): "checksum": {"name": "path", "checksum": "seeds/seed.csv"}, "unrendered_config": {}, "unrendered_config_call_dict": {}, + "vars": {}, "config_call_dict": {}, } @@ -641,6 +647,7 @@ def complex_parsed_seed_dict(): "persist_docs": {"relation": True, "columns": True}, }, "unrendered_config_call_dict": {}, + "vars": {}, "config_call_dict": {}, } @@ -841,6 +848,7 @@ def base_parsed_hook_dict(): }, "unrendered_config": {}, "unrendered_config_call_dict": {}, + "vars": {}, "config_call_dict": {}, } @@ -935,6 +943,7 @@ def complex_parsed_hook_dict(): "materialized": "table", }, "unrendered_config_call_dict": {}, + "vars": {}, "config_call_dict": {}, } @@ -1080,6 +1089,7 @@ def basic_parsed_schema_test_dict(): }, "unrendered_config": {}, "unrendered_config_call_dict": {}, + "vars": {}, "config_call_dict": {}, } @@ -1169,6 +1179,7 @@ def complex_parsed_schema_test_dict(): }, "unrendered_config": {"materialized": "table", "severity": "WARN"}, "unrendered_config_call_dict": {}, + "vars": {}, 
"config_call_dict": {}, } @@ -1564,6 +1575,7 @@ def basic_timestamp_snapshot_dict(): "target_schema": "some_snapshot_schema", }, "unrendered_config_call_dict": {}, + "vars": {}, "config_call_dict": {}, } @@ -1670,6 +1682,7 @@ def basic_check_snapshot_dict(): }, "unrendered_config_call_dict": {}, "config_call_dict": {}, + "vars": {}, } @@ -1879,6 +1892,7 @@ def basic_parsed_source_definition_dict(): "enabled": True, }, "unrendered_config": {}, + "vars": {}, } @@ -1911,6 +1925,7 @@ def complex_parsed_source_definition_dict(): "freshness": {"warn_after": {"period": "hour", "count": 1}, "error_after": {}}, "loaded_at_field": "loaded_at", "unrendered_config": {}, + "vars": {}, } @@ -2082,6 +2097,7 @@ def basic_parsed_exposure_dict(): "enabled": True, }, "unrendered_config": {}, + "vars": {}, } @@ -2137,6 +2153,7 @@ def complex_parsed_exposure_dict(): "enabled": True, }, "unrendered_config": {}, + "vars": {}, } diff --git a/tests/unit/graph/test_selector_methods.py b/tests/unit/graph/test_selector_methods.py index 5fa96f7c158..d500c631a1b 100644 --- a/tests/unit/graph/test_selector_methods.py +++ b/tests/unit/graph/test_selector_methods.py @@ -646,7 +646,9 @@ def previous_state(manifest): @pytest.fixture def args_for_flags(): - return Namespace(state_modified_compare_more_unrendered_values=False) + return Namespace( + state_modified_compare_more_unrendered_values=False, state_modified_compare_vars=False + ) def add_node(manifest, node): diff --git a/tests/unit/parser/test_parser.py b/tests/unit/parser/test_parser.py index b9b414ebac2..0d95a495cdf 100644 --- a/tests/unit/parser/test_parser.py +++ b/tests/unit/parser/test_parser.py @@ -463,6 +463,7 @@ def test__parse_basic_source(self): @mock.patch("dbt.parser.sources.get_adapter") def test__parse_basic_source_meta(self, mock_get_adapter): block = self.file_block_for(MULTIPLE_TABLE_SOURCE_META, "test_one.yml") + self.parser.manifest.files[block.file.file_id] = block.file dct = yaml_from_file(block.file) self.parser.parse_file(block, dct) self.assert_has_manifest_lengths(self.parser.manifest, sources=2)