Skip to content

Commit

Permalink
key freshness metadata cache by relation, defer information_schema-ba…
Browse files Browse the repository at this point in the history
…sed querying to adapter batch method
  • Loading branch information
MichelleArk committed Mar 25, 2024
1 parent 58dfb59 commit bfa1d4d
Showing 1 changed file with 29 additions and 49 deletions.
78 changes: 29 additions & 49 deletions core/dbt/task/freshness.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import os
import threading
import time
from typing import Optional, List, AbstractSet, Dict, Tuple
from typing import Optional, List, AbstractSet, Dict

from .base import BaseRunner
from .printer import (
Expand All @@ -28,7 +28,8 @@

from dbt.adapters.capability import Capability
from dbt.adapters.contracts.connection import AdapterResponse
from dbt.adapters.base.relation import InformationSchema, BaseRelation
from dbt.adapters.base.relation import BaseRelation
from dbt.adapters.base.impl import FreshnessResponse
from dbt.contracts.graph.nodes import SourceDefinition, HookNode
from dbt_common.events.base_types import EventLevel
from dbt.graph import ResourceTypeSelector
Expand All @@ -39,10 +40,10 @@
class FreshnessRunner(BaseRunner):
def __init__(self, config, adapter, node, node_index, num_nodes) -> None:
super().__init__(config, adapter, node, node_index, num_nodes)
self._metadata_freshness_cache: Dict[Tuple[str, str], FreshnessResult] = {}
self._metadata_freshness_cache: Dict[BaseRelation, FreshnessResult] = {}

def set_metadata_freshness_cache(
self, metadata_freshness_cache: Dict[Tuple[str, str], FreshnessResult]
self, metadata_freshness_cache: Dict[BaseRelation, FreshnessResult]
) -> None:
self._metadata_freshness_cache = metadata_freshness_cache

Expand Down Expand Up @@ -115,7 +116,7 @@ def execute(self, compiled_node, manifest):
with self.adapter.connection_named(compiled_node.unique_id, compiled_node):
self.adapter.clear_transaction()
adapter_response: Optional[AdapterResponse] = None
freshness = None
freshness: Optional[FreshnessResponse] = None

if compiled_node.loaded_at_field is not None:
adapter_response, freshness = self.adapter.calculate_freshness(
Expand All @@ -130,17 +131,14 @@ def execute(self, compiled_node, manifest):
if compiled_node.freshness.filter is not None:
fire_event(
Note(
f"A filter cannot be applied to a metadata freshness check on source '{compiled_node.name}'.",
EventLevel.WARN,
)
msg=f"A filter cannot be applied to a metadata freshness check on source '{compiled_node.name}'."
),
EventLevel.WARN,
)
if (
compiled_node.schema.lower(),
compiled_node.identifier.lower(),
) in self._metadata_freshness_cache:
freshness = self._metadata_freshness_cache[
(compiled_node.schema.lower(), compiled_node.identifier.lower())
]

metadata_source = self.adapter.Relation.create_from(self.config, compiled_node)
if metadata_source in self._metadata_freshness_cache:
freshness = self._metadata_freshness_cache[metadata_source]

Check warning on line 141 in core/dbt/task/freshness.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/task/freshness.py#L139-L141

Added lines #L139 - L141 were not covered by tests
else:
adapter_response, freshness = self.adapter.calculate_freshness_from_metadata(

Check warning on line 143 in core/dbt/task/freshness.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/task/freshness.py#L143

Added line #L143 was not covered by tests
relation,
Expand All @@ -149,9 +147,8 @@ def execute(self, compiled_node, manifest):

status = compiled_node.freshness.status(freshness["age"])
else:
status = FreshnessStatus.Warn
fire_event(
Note(f"Skipping freshness for source {compiled_node.name}."),
raise DbtRuntimeError(

Check warning on line 150 in core/dbt/task/freshness.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/task/freshness.py#L150

Added line #L150 was not covered by tests
f"Could not compute freshness for source {compiled_node.name}: no 'loaded_at_field' provided and {self.adapter.type()} adapter does not support metadata-based freshness checks."
)

# adapter_response was not returned in previous versions, so this will be None
Expand Down Expand Up @@ -191,7 +188,7 @@ def node_is_match(self, node):
class FreshnessTask(RunTask):
def __init__(self, args, config, manifest) -> None:
super().__init__(args, config, manifest)
self._metadata_freshness_cache: Dict[Tuple[str, str], FreshnessResult] = {}
self._metadata_freshness_cache: Dict[BaseRelation, FreshnessResult] = {}

def result_path(self):
if self.args.output:
Expand Down Expand Up @@ -252,37 +249,20 @@ def _populate_metadata_freshness_cache(self, adapter, selected_uids: AbstractSet
if self.manifest is None or self.graph is None:
raise DbtInternalError("manifest must be set to get populate metadata freshness cache")

Check warning on line 250 in core/dbt/task/freshness.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/task/freshness.py#L249-L250

Added lines #L249 - L250 were not covered by tests

# Group metadata sources by information schema -- one query per information schema will be necessary
information_schema_to_metadata_sources: Dict[InformationSchema, List[BaseRelation]] = {}
batch_metadata_sources: List[BaseRelation] = []
for selected_source_uid in list(selected_uids):
source = self.manifest.sources.get(selected_source_uid)
if source and source.loaded_at_field is None:
metadata_source_relation = adapter.Relation.create_from(self.config, source)
information_schema = metadata_source_relation.information_schema_only()
metadata_source = adapter.Relation.create_from(self.config, source)
batch_metadata_sources.append(metadata_source)

Check warning on line 257 in core/dbt/task/freshness.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/task/freshness.py#L252-L257

Added lines #L252 - L257 were not covered by tests

if information_schema not in information_schema_to_metadata_sources:
information_schema_to_metadata_sources[information_schema] = [
metadata_source_relation
]
else:
information_schema_to_metadata_sources[information_schema].append(
metadata_source_relation
)

# Get freshness metadata results per information schema
for (
information_schema,
sources_for_information_schema,
) in information_schema_to_metadata_sources.items():
fire_event(
Note(msg=f"Generating metadata freshness for sources in {information_schema}"),
EventLevel.INFO,
)
_, metadata_freshness_results = adapter.calculate_freshness_from_metadata_batch(
sources_for_information_schema, information_schema
)
# Update cache based on result for information schema
for (schema, identifier), freshness_result in metadata_freshness_results.items():
self._metadata_freshness_cache[
(schema.lower(), identifier.lower())
] = freshness_result
fire_event(

Check warning on line 259 in core/dbt/task/freshness.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/task/freshness.py#L259

Added line #L259 was not covered by tests
Note(
msg=f"Pulling freshness from warehouse metadata tables for {len(batch_metadata_sources)} sources"
),
EventLevel.INFO,
)
_, metadata_freshness_results = adapter.calculate_freshness_from_metadata_batch(

Check warning on line 265 in core/dbt/task/freshness.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/task/freshness.py#L265

Added line #L265 was not covered by tests
batch_metadata_sources
)
self._metadata_freshness_cache.update(metadata_freshness_results)

Check warning on line 268 in core/dbt/task/freshness.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/task/freshness.py#L268

Added line #L268 was not covered by tests

0 comments on commit bfa1d4d

Please sign in to comment.