Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create skip_nodes_if_on_run_start_fails behavior change flag #10699

Merged
merged 32 commits into main from fix_7387
Sep 30, 2024
Merged
Show file tree
Hide file tree
Changes from 25 commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
c54e228
wip: refactor safe_run_hooks and add more metadata in
aranke Sep 11, 2024
7ad6c39
Add back comment about log_contextvars
aranke Sep 11, 2024
7fcfb0c
make printer correct
aranke Sep 11, 2024
064f60e
fix existing tests
aranke Sep 12, 2024
59f558c
Merge branch 'main' into fix_7387
aranke Sep 12, 2024
526b31c
only execute nodes if pre-hooks succeeded
aranke Sep 12, 2024
12b7694
Add skip messages for nodes
aranke Sep 13, 2024
8694ea5
Merge branch 'main' into fix_7387
aranke Sep 16, 2024
75f64d0
One result per hook
aranke Sep 17, 2024
900d1cd
o1 refactor
aranke Sep 17, 2024
235aaec
Merge branch 'main' into fix_7387
aranke Sep 17, 2024
6970291
fix tests
aranke Sep 17, 2024
66997f2
Add flag
aranke Sep 17, 2024
498a015
Fix existing tests
aranke Sep 17, 2024
512b252
Write tests for on-run-start hooks
aranke Sep 18, 2024
37c62ab
Add test for on-run-end hooks
aranke Sep 18, 2024
ad1d952
do message test off run_results.json
aranke Sep 18, 2024
71ead6a
add no hooks ran test
aranke Sep 18, 2024
0dcc944
Add test for retry to always run hooks
aranke Sep 18, 2024
66fbae6
Add changie
aranke Sep 18, 2024
4234f84
Update Features-20240918-162959.yaml
aranke Sep 18, 2024
5abd9fb
Update Features-20240918-162959.yaml
aranke Sep 18, 2024
9c1ff8a
Change tests
aranke Sep 18, 2024
46a5f2f
Merge branch 'main' into fix_7387
aranke Sep 18, 2024
3a42c72
Merge branch 'main' into fix_7387
aranke Sep 19, 2024
53ee509
Change formatting order, rename variable
aranke Sep 20, 2024
5df400c
Merge branch 'main' into fix_7387
aranke Sep 26, 2024
2b46907
merge from main
aranke Sep 26, 2024
2ad3dd2
make imports absolute
aranke Sep 26, 2024
dbfaa22
ignore operations unless we're in run-operation
aranke Sep 26, 2024
d808204
Add test for fixing retry hook
aranke Sep 27, 2024
9accf08
merge from main
aranke Sep 27, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changes/unreleased/Features-20240918-162959.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
# Changie changelog entry: introduces the `skip_nodes_if_on_run_start_fails`
# behavior change flag (see linked issue #7387 for motivation).
kind: Features
body: Create 'skip_nodes_if_on_run_start_fails' behavior change flag
time: 2024-09-18T16:29:59.268422+01:00
custom:
  Author: aranke
  Issue: "7387"
1 change: 1 addition & 0 deletions core/dbt/contracts/graph/nodes.py
Original file line number Diff line number Diff line change
Expand Up @@ -1654,6 +1654,7 @@ class ParsedMacroPatch(ParsedPatch):
# Nodes that can produce an entry in run results.
# HookNode is included so on-run-start / on-run-end hooks get one result each.
ResultNode = Union[
    ManifestNode,
    SourceDefinition,
    HookNode,
]

# All nodes that can be in the DAG
Expand Down
2 changes: 2 additions & 0 deletions core/dbt/contracts/project.py
Original file line number Diff line number Diff line change
Expand Up @@ -341,13 +341,15 @@ class ProjectFlags(ExtensibleDbtClassMixin):
require_explicit_package_overrides_for_builtin_materializations: bool = True
require_resource_names_without_spaces: bool = False
source_freshness_run_project_hooks: bool = False
skip_nodes_if_on_run_start_fails: bool = False

@property
def project_only_flags(self) -> Dict[str, Any]:
    """Return the subset of flags that may only be set at the project level.

    Keys are the flag names; values are the current attribute values on this
    ProjectFlags instance.
    """
    project_level_flag_names = (
        "require_explicit_package_overrides_for_builtin_materializations",
        "require_resource_names_without_spaces",
        "source_freshness_run_project_hooks",
        "skip_nodes_if_on_run_start_fails",
    )
    return {name: getattr(self, name) for name in project_level_flag_names}


Expand Down
14 changes: 12 additions & 2 deletions core/dbt/events/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -1513,10 +1513,20 @@ def code(self) -> str:
return "Q033"

def message(self) -> str:
msg = f"OK hook: {self.statement}"
if self.status == "success":
info = "OK"
status = green(info)
elif self.status == "skipped":
info = "SKIP"
status = yellow(info)
else:
info = "ERROR"
status = red(info)
msg = f"{info} hook: {self.statement}"

return format_fancy_output_line(
msg=msg,
status=green(self.status),
status=status,
index=self.index,
total=self.total,
execution_time=self.execution_time,
Expand Down
5 changes: 3 additions & 2 deletions core/dbt/task/clone.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import threading
from typing import AbstractSet, Any, Collection, Iterable, List, Optional, Set, Type

from dbt.adapters.base import BaseRelation
from dbt.adapters.base import BaseAdapter, BaseRelation
from dbt.artifacts.resources.types import NodeType
from dbt.artifacts.schemas.run import RunResult, RunStatus
from dbt.clients.jinja import MacroGenerator
Expand Down Expand Up @@ -124,14 +124,15 @@ def get_model_schemas(self, adapter, selected_uids: Iterable[str]) -> Set[BaseRe

return result

def before_run(self, adapter, selected_uids: AbstractSet[str]):
def before_run(self, adapter: BaseAdapter, selected_uids: AbstractSet[str]) -> RunStatus:
    """Prepare the warehouse state before cloning the selected nodes.

    Creates the target schemas and warms the adapter's relation cache
    (including defer_relation schemas) on a single "master" connection.

    Always returns RunStatus.Success — clone setup runs no hooks, so there
    is no failure status to propagate to the caller.
    """
    with adapter.connection_named("master"):
        self.defer_to_manifest()
        # only create target schemas, but also cache defer_relation schemas
        schemas_to_create = super().get_model_schemas(adapter, selected_uids)
        self.create_schemas(adapter, schemas_to_create)
        schemas_to_cache = self.get_model_schemas(adapter, selected_uids)
        self.populate_adapter_cache(adapter, schemas_to_cache)
    return RunStatus.Success

@property
def resource_types(self) -> List[NodeType]:
Expand Down
30 changes: 25 additions & 5 deletions core/dbt/task/freshness.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from typing import AbstractSet, Dict, List, Optional, Type

from dbt import deprecations
from dbt.adapters.base import BaseAdapter
from dbt.adapters.base.impl import FreshnessResponse
from dbt.adapters.base.relation import BaseRelation
from dbt.adapters.capability import Capability
Expand Down Expand Up @@ -204,10 +205,25 @@
resource_types=[NodeType.Source],
)

def before_run(self, adapter, selected_uids: AbstractSet[str]) -> None:
super().before_run(adapter, selected_uids)
if adapter.supports(Capability.TableLastModifiedMetadataBatch):
self.populate_metadata_freshness_cache(adapter, selected_uids)
def before_run(self, adapter: BaseAdapter, selected_uids: AbstractSet[str]) -> RunStatus:
    """Run the parent setup, then (optionally) warm the metadata-freshness cache.

    The batch freshness cache is only populated when the parent setup
    succeeded and the adapter supports batch last-modified metadata.

    Returns RunStatus.Success only when both the parent setup and the cache
    population (when attempted) succeeded; otherwise RunStatus.Error.
    """
    setup_status = super().before_run(adapter, selected_uids)

    cache_status = RunStatus.Success
    if setup_status == RunStatus.Success and adapter.supports(
        Capability.TableLastModifiedMetadataBatch
    ):
        cache_status = self.populate_metadata_freshness_cache(adapter, selected_uids)

    if setup_status == RunStatus.Success and cache_status == RunStatus.Success:
        return RunStatus.Success
    return RunStatus.Error

def get_runner(self, node) -> BaseRunner:
freshness_runner = super().get_runner(node)
Expand Down Expand Up @@ -243,7 +259,9 @@
deprecations.warn("source-freshness-project-hooks")
return []

def populate_metadata_freshness_cache(self, adapter, selected_uids: AbstractSet[str]) -> None:
def populate_metadata_freshness_cache(
self, adapter, selected_uids: AbstractSet[str]
) -> RunStatus:
if self.manifest is None:
raise DbtInternalError("Manifest must be set to populate metadata freshness cache")

Expand All @@ -266,6 +284,7 @@
batch_metadata_sources
)
self._metadata_freshness_cache.update(metadata_freshness_results)
return RunStatus.Success
except Exception as e:
# This error handling is intentionally very coarse.
# If anything goes wrong during batch metadata calculation, we can safely
Expand All @@ -276,6 +295,7 @@
Note(msg=f"Metadata freshness could not be computed in batch: {e}"),
EventLevel.WARN,
)
return RunStatus.Error

def get_freshness_metadata_cache(self) -> Dict[BaseRelation, FreshnessResult]:
    """Return the cached batch metadata-freshness results, keyed by relation."""
    return self._metadata_freshness_cache
3 changes: 2 additions & 1 deletion core/dbt/task/printer.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ def get_counts(flat_nodes) -> str:

counts[t] = counts.get(t, 0) + 1

stat_line = ", ".join([pluralize(v, k).replace("_", " ") for k, v in counts.items()])
sorted_items = sorted(counts.items(), key=lambda x: x[0])
stat_line = ", ".join([pluralize(v, k).replace("_", " ") for k, v in sorted_items])

return stat_line

Expand Down
Loading
Loading