From bfa1d4dcb2bd1511d1f852e15274e26c308a53c2 Mon Sep 17 00:00:00 2001 From: Michelle Ark Date: Mon, 25 Mar 2024 16:45:45 -0400 Subject: [PATCH] key freshness metadata cache by relation, defer information_schema-based querying to adapter batch method --- core/dbt/task/freshness.py | 78 ++++++++++++++------------------------ 1 file changed, 29 insertions(+), 49 deletions(-) diff --git a/core/dbt/task/freshness.py b/core/dbt/task/freshness.py index 055a763629b..9630883614d 100644 --- a/core/dbt/task/freshness.py +++ b/core/dbt/task/freshness.py @@ -1,7 +1,7 @@ import os import threading import time -from typing import Optional, List, AbstractSet, Dict, Tuple +from typing import Optional, List, AbstractSet, Dict from .base import BaseRunner from .printer import ( @@ -28,7 +28,8 @@ from dbt.adapters.capability import Capability from dbt.adapters.contracts.connection import AdapterResponse -from dbt.adapters.base.relation import InformationSchema, BaseRelation +from dbt.adapters.base.relation import BaseRelation +from dbt.adapters.base.impl import FreshnessResponse from dbt.contracts.graph.nodes import SourceDefinition, HookNode from dbt_common.events.base_types import EventLevel from dbt.graph import ResourceTypeSelector @@ -39,10 +40,10 @@ class FreshnessRunner(BaseRunner): def __init__(self, config, adapter, node, node_index, num_nodes) -> None: super().__init__(config, adapter, node, node_index, num_nodes) - self._metadata_freshness_cache: Dict[Tuple[str, str], FreshnessResult] = {} + self._metadata_freshness_cache: Dict[BaseRelation, FreshnessResult] = {} def set_metadata_freshness_cache( - self, metadata_freshness_cache: Dict[Tuple[str, str], FreshnessResult] + self, metadata_freshness_cache: Dict[BaseRelation, FreshnessResult] ) -> None: self._metadata_freshness_cache = metadata_freshness_cache @@ -115,7 +116,7 @@ def execute(self, compiled_node, manifest): with self.adapter.connection_named(compiled_node.unique_id, compiled_node): self.adapter.clear_transaction() adapter_response: Optional[AdapterResponse] = None - freshness = None + freshness: Optional[FreshnessResponse] = None if compiled_node.loaded_at_field is not None: adapter_response, freshness = self.adapter.calculate_freshness( @@ -130,17 +131,14 @@ def execute(self, compiled_node, manifest): if compiled_node.freshness.filter is not None: fire_event( Note( - f"A filter cannot be applied to a metadata freshness check on source '{compiled_node.name}'.", - EventLevel.WARN, - ) + msg=f"A filter cannot be applied to a metadata freshness check on source '{compiled_node.name}'." + ), + EventLevel.WARN, ) - if ( - compiled_node.schema.lower(), - compiled_node.identifier.lower(), - ) in self._metadata_freshness_cache: - freshness = self._metadata_freshness_cache[ - (compiled_node.schema.lower(), compiled_node.identifier.lower()) - ] + + metadata_source = self.adapter.Relation.create_from(self.config, compiled_node) + if metadata_source in self._metadata_freshness_cache: + freshness = self._metadata_freshness_cache[metadata_source] else: adapter_response, freshness = self.adapter.calculate_freshness_from_metadata( relation, @@ -149,9 +147,8 @@ def execute(self, compiled_node, manifest): status = compiled_node.freshness.status(freshness["age"]) else: - status = FreshnessStatus.Warn - fire_event( - Note(f"Skipping freshness for source {compiled_node.name}."), + raise DbtRuntimeError( + f"Could not compute freshness for source {compiled_node.name}: no 'loaded_at_field' provided and {self.adapter.type()} adapter does not support metadata-based freshness checks." ) # adapter_response was not returned in previous versions, so this will be None @@ -191,7 +188,7 @@ def node_is_match(self, node): class FreshnessTask(RunTask): def __init__(self, args, config, manifest) -> None: super().__init__(args, config, manifest) - self._metadata_freshness_cache: Dict[Tuple[str, str], FreshnessResult] = {} + self._metadata_freshness_cache: Dict[BaseRelation, FreshnessResult] = {} def result_path(self): if self.args.output: @@ -252,37 +249,20 @@ def _populate_metadata_freshness_cache(self, adapter, selected_uids: AbstractSet if self.manifest is None or self.graph is None: raise DbtInternalError("manifest must be set to get populate metadata freshness cache") - # Group metadata sources by information schema -- one query per information schema will be necessary - information_schema_to_metadata_sources: Dict[InformationSchema, List[BaseRelation]] = {} + batch_metadata_sources: List[BaseRelation] = [] for selected_source_uid in list(selected_uids): source = self.manifest.sources.get(selected_source_uid) if source and source.loaded_at_field is None: - metadata_source_relation = adapter.Relation.create_from(self.config, source) - information_schema = metadata_source_relation.information_schema_only() + metadata_source = adapter.Relation.create_from(self.config, source) + batch_metadata_sources.append(metadata_source) - if information_schema not in information_schema_to_metadata_sources: - information_schema_to_metadata_sources[information_schema] = [ - metadata_source_relation - ] - else: - information_schema_to_metadata_sources[information_schema].append( - metadata_source_relation - ) - - # Get freshness metadata results per information schema - for ( - information_schema, - sources_for_information_schema, - ) in information_schema_to_metadata_sources.items(): - fire_event( - Note(msg=f"Generating metadata freshness for sources in {information_schema}"), - EventLevel.INFO, - ) - _, metadata_freshness_results = adapter.calculate_freshness_from_metadata_batch( - sources_for_information_schema, information_schema - ) - # Update cache based on result for information schema - for (schema, identifier), freshness_result in metadata_freshness_results.items(): - self._metadata_freshness_cache[ - (schema.lower(), identifier.lower()) - ] = freshness_result + fire_event( + Note( + msg=f"Pulling freshness from warehouse metadata tables for {len(batch_metadata_sources)} sources" + ), + EventLevel.INFO, + ) + _, metadata_freshness_results = adapter.calculate_freshness_from_metadata_batch( + batch_metadata_sources + ) + self._metadata_freshness_cache.update(metadata_freshness_results)