From 2890bc42895c3069b9eec89f9397de502a2bd700 Mon Sep 17 00:00:00 2001 From: Michelle Ark Date: Mon, 11 Mar 2024 16:36:17 -0400 Subject: [PATCH 01/10] first pass: source freshness batches metadata results --- core/dbt/task/freshness.py | 54 +++++++++++++++++++++++++++++++++----- 1 file changed, 48 insertions(+), 6 deletions(-) diff --git a/core/dbt/task/freshness.py b/core/dbt/task/freshness.py index ff1159ab6e6..d2f4ede038d 100644 --- a/core/dbt/task/freshness.py +++ b/core/dbt/task/freshness.py @@ -1,7 +1,7 @@ import os import threading import time -from typing import Optional, List +from typing import Optional, List, AbstractSet from .base import BaseRunner from .printer import ( @@ -36,6 +36,13 @@ class FreshnessRunner(BaseRunner): + def __init__(self, config, adapter, node, node_index, num_nodes) -> None: + super().__init__(config, adapter, node, node_index, num_nodes) + self._metadata_freshness_results = None + + def set_metadata_freshness_results(self, metadata_freshness_results) -> None: + self._metadata_freshness_results = metadata_freshness_results + def on_skip(self): raise DbtRuntimeError("Freshness: nodes cannot be skipped!") @@ -124,11 +131,13 @@ def execute(self, compiled_node, manifest): EventLevel.WARN, ) ) - - adapter_response, freshness = self.adapter.calculate_freshness_from_metadata( - relation, - macro_resolver=manifest, - ) + if compiled_node.unique_id in self._metadata_freshness_results: + freshness = self._metadata_freshness_results[compiled_node.unique_id] + else: + adapter_response, freshness = self.adapter.calculate_freshness_from_metadata( + relation, + macro_resolver=manifest, + ) status = compiled_node.freshness.status(freshness["age"]) else: @@ -172,6 +181,10 @@ def node_is_match(self, node): class FreshnessTask(RunTask): + def __init__(self, args, config, manifest) -> None: + super().__init__(args, config, manifest) + self._metadata_freshness_results = None + def result_path(self): if self.args.output: return os.path.realpath(self.args.output) @@ -191,6 +204,35 @@ def get_node_selector(self): resource_types=[NodeType.Source], ) + def before_run(self, adapter, selected_uids: AbstractSet[str]) -> None: + super().before_run(adapter, selected_uids) + # TODO: new TableLastModifiedMetadataBatch ? + if adapter.supports(Capability.TableLastModifiedMetadata): + metadata_source_unique_ids = [] + metadata_source_relations = [] + for selected_source_uid in list(selected_uids): + source = self.manifest.expect(selected_source_uid) + if source.loaded_at_field is None: + source_relation = adapter.Relation.create_from(self.config, source) + metadata_source_unique_ids.append(selected_source_uid) + metadata_source_relations.append(source_relation) + + _, metadata_fresnhess_results = adapter.calculate_freshness_from_metadata_batch( + metadata_source_relations + ) + + metadata_fresnhess_results_dict = { + metadata_source_unique_ids[i]: metadata_fresnhess_results[i] + for i in range(len(metadata_fresnhess_results)) + } + self._metadata_freshness_results = metadata_fresnhess_results_dict + + def get_runner(self, node) -> BaseRunner: + freshness_runner = super().get_runner(node) + assert isinstance(freshness_runner, FreshnessRunner) + freshness_runner.set_metadata_freshness_results(self._metadata_freshness_results) + return freshness_runner + def get_runner_type(self, _): return FreshnessRunner From a6be2acd2c9294480dc073a154fb8a47f8276346 Mon Sep 17 00:00:00 2001 From: Michelle Ark Date: Mon, 11 Mar 2024 18:44:39 -0400 Subject: [PATCH 02/10] TableLastModifiedMetadataBatch + populate freshness metadata cache based on information schema queries --- core/dbt/task/freshness.py | 72 +++++++++++++++++++++++--------------- dev-requirements.txt | 2 +- 2 files changed, 45 insertions(+), 29 deletions(-) diff --git a/core/dbt/task/freshness.py b/core/dbt/task/freshness.py index d2f4ede038d..05a23c26cd5 100644 --- a/core/dbt/task/freshness.py +++ b/core/dbt/task/freshness.py @@ -1,7 +1,7 @@ import os import threading import time -from typing import Optional, List, AbstractSet +from typing import Optional, List, AbstractSet, Dict from .base import BaseRunner from .printer import ( @@ -28,6 +28,7 @@ from dbt.adapters.capability import Capability from dbt.adapters.contracts.connection import AdapterResponse +from dbt.adapters.base.relation import InformationSchema, BaseRelation from dbt.contracts.graph.nodes import SourceDefinition, HookNode from dbt_common.events.base_types import EventLevel from dbt.graph import ResourceTypeSelector @@ -38,10 +39,10 @@ class FreshnessRunner(BaseRunner): def __init__(self, config, adapter, node, node_index, num_nodes) -> None: super().__init__(config, adapter, node, node_index, num_nodes) - self._metadata_freshness_results = None + self._metadata_freshness_cache: Dict[str, FreshnessResult] = {} - def set_metadata_freshness_results(self, metadata_freshness_results) -> None: - self._metadata_freshness_results = metadata_freshness_results + def set_metadata_freshness_cache(self, metadata_freshness_cache: Dict[str, FreshnessResult]) -> None: + self._metadata_freshness_cache = metadata_freshness_cache def on_skip(self): raise DbtRuntimeError("Freshness: nodes cannot be skipped!") @@ -131,8 +132,8 @@ def execute(self, compiled_node, manifest): EventLevel.WARN, ) ) - if compiled_node.unique_id in self._metadata_freshness_results: - freshness = self._metadata_freshness_results[compiled_node.unique_id] + if compiled_node.unique_id in self._metadata_freshness_cache: + freshness = self._metadata_freshness_cache[compiled_node.unique_id] else: adapter_response, freshness = self.adapter.calculate_freshness_from_metadata( relation, @@ -183,7 +184,7 @@ def node_is_match(self, node): class FreshnessTask(RunTask): def __init__(self, args, config, manifest) -> None: super().__init__(args, config, manifest) - self._metadata_freshness_results = None + self._metadata_freshness_cache: Dict[str, FreshnessResult] = {} def result_path(self): if self.args.output: @@ -206,31 +207,13 @@ def get_node_selector(self): def before_run(self, adapter, selected_uids: AbstractSet[str]) -> None: super().before_run(adapter, selected_uids) - # TODO: new TableLastModifiedMetadataBatch ? - if adapter.supports(Capability.TableLastModifiedMetadata): - metadata_source_unique_ids = [] - metadata_source_relations = [] - for selected_source_uid in list(selected_uids): - source = self.manifest.expect(selected_source_uid) - if source.loaded_at_field is None: - source_relation = adapter.Relation.create_from(self.config, source) - metadata_source_unique_ids.append(selected_source_uid) - metadata_source_relations.append(source_relation) - - _, metadata_fresnhess_results = adapter.calculate_freshness_from_metadata_batch( - metadata_source_relations - ) - - metadata_fresnhess_results_dict = { - metadata_source_unique_ids[i]: metadata_fresnhess_results[i] - for i in range(len(metadata_fresnhess_results)) - } - self._metadata_freshness_results = metadata_fresnhess_results_dict + if adapter.supports(Capability.TableLastModifiedMetadataBatch): + self._populate_metadata_freshness_cache(adapter, selected_uids) def get_runner(self, node) -> BaseRunner: freshness_runner = super().get_runner(node) assert isinstance(freshness_runner, FreshnessRunner) - freshness_runner.set_metadata_freshness_results(self._metadata_freshness_results) + freshness_runner.set_metadata_freshness_cache(self._metadata_freshness_cache) return freshness_runner def get_runner_type(self, _): @@ -257,3 +240,36 @@ def get_hooks_by_type(self, hook_type: RunHookType) -> List[HookNode]: return super().get_hooks_by_type(hook_type) else: return [] + + def _populate_metadata_freshness_cache(self, adapter, selected_uids: AbstractSet[str]) -> None: + # Group metadata sources by information schema -- one query per information schema will be necessary + information_schema_to_metadata_sources: Dict[InformationSchema, List[BaseRelation]] = {} + # Track unique ids of sources for use as cache key + information_schema_to_source_unique_ids: Dict[InformationSchema, List[str]] = {} + for selected_source_uid in list(selected_uids): + source = self.manifest.expect(selected_source_uid) + if source.loaded_at_field is None: + metadata_source_relation = adapter.Relation.create_from(self.config, source) + information_schema = metadata_source_relation.information_schema_only() + + if information_schema not in information_schema_to_metadata_sources: + information_schema_to_metadata_sources[information_schema] = [metadata_source_relation] + information_schema_to_source_unique_ids[information_schema] = [source.unique_id] + else: + information_schema_to_metadata_sources[information_schema].append(metadata_source_relation) + information_schema_to_source_unique_ids[information_schema].append(source.unique_id) + + # Get freshness metadata results per information schema + for information_schema, sources_for_information_schema in information_schema_to_metadata_sources.items(): + fire_event( + Note(msg=f"Generating metadata freshness for sources in {information_schema}"), + EventLevel.INFO, + ) + _, metadata_fresnhess_results = adapter.calculate_freshness_from_metadata_batch( + sources_for_information_schema, information_schema + ) + # Update cache based on result for information schema + for idx in range(len(information_schema_to_source_unique_ids[information_schema])): + source_unique_id = information_schema_to_source_unique_ids[information_schema][idx] + source_metadata_freshness_result = metadata_fresnhess_results[idx] + self._metadata_freshness_cache[source_unique_id] = source_metadata_freshness_result diff --git a/dev-requirements.txt b/dev-requirements.txt index 0c706dddbbe..f6b0cdf1a9d 100644 --- a/dev-requirements.txt +++ b/dev-requirements.txt @@ -1,4 +1,4 @@ -git+https://github.com/dbt-labs/dbt-adapters.git@main +git+https://github.com/dbt-labs/dbt-adapters.git@batch-metadata-freshness git+https://github.com/dbt-labs/dbt-adapters.git@main#subdirectory=dbt-tests-adapter git+https://github.com/dbt-labs/dbt-common.git@main git+https://github.com/dbt-labs/dbt-postgres.git@main From fb7ae2b4fd93de3e73462a3145f4acde0a57cdbb Mon Sep 17 00:00:00 2001 From: Michelle Ark Date: Mon, 11 Mar 2024 18:51:01 -0400 Subject: [PATCH 03/10] linting --- core/dbt/task/freshness.py | 38 +++++++++++++++++++++++++++----------- 1 file changed, 27 insertions(+), 11 deletions(-) diff --git a/core/dbt/task/freshness.py b/core/dbt/task/freshness.py index 05a23c26cd5..d132f1d0697 100644 --- a/core/dbt/task/freshness.py +++ b/core/dbt/task/freshness.py @@ -41,7 +41,9 @@ def __init__(self, config, adapter, node, node_index, num_nodes) -> None: super().__init__(config, adapter, node, node_index, num_nodes) self._metadata_freshness_cache: Dict[str, FreshnessResult] = {} - def set_metadata_freshness_cache(self, metadata_freshness_cache: Dict[str, FreshnessResult]) -> None: + def set_metadata_freshness_cache( + self, metadata_freshness_cache: Dict[str, FreshnessResult] + ) -> None: self._metadata_freshness_cache = metadata_freshness_cache def on_skip(self): @@ -242,29 +244,43 @@ def get_hooks_by_type(self, hook_type: RunHookType) -> List[HookNode]: return [] def _populate_metadata_freshness_cache(self, adapter, selected_uids: AbstractSet[str]) -> None: + if self.manifest is None or self.graph is None: + raise DbtInternalError("manifest must be set to get populate metadata freshness cache") + # Group metadata sources by information schema -- one query per information schema will be necessary information_schema_to_metadata_sources: Dict[InformationSchema, List[BaseRelation]] = {} # Track unique ids of sources for use as cache key information_schema_to_source_unique_ids: Dict[InformationSchema, List[str]] = {} for selected_source_uid in list(selected_uids): - source = self.manifest.expect(selected_source_uid) - if source.loaded_at_field is None: + source = self.manifest.sources.get(selected_source_uid) + if source and source.loaded_at_field is None: metadata_source_relation = adapter.Relation.create_from(self.config, source) information_schema = metadata_source_relation.information_schema_only() if information_schema not in information_schema_to_metadata_sources: - information_schema_to_metadata_sources[information_schema] = [metadata_source_relation] - information_schema_to_source_unique_ids[information_schema] = [source.unique_id] + information_schema_to_metadata_sources[information_schema] = [ + metadata_source_relation + ] + information_schema_to_source_unique_ids[information_schema] = [ + source.unique_id + ] else: - information_schema_to_metadata_sources[information_schema].append(metadata_source_relation) - information_schema_to_source_unique_ids[information_schema].append(source.unique_id) + information_schema_to_metadata_sources[information_schema].append( + metadata_source_relation + ) + information_schema_to_source_unique_ids[information_schema].append( + source.unique_id + ) # Get freshness metadata results per information schema - for information_schema, sources_for_information_schema in information_schema_to_metadata_sources.items(): + for ( + information_schema, + sources_for_information_schema, + ) in information_schema_to_metadata_sources.items(): fire_event( - Note(msg=f"Generating metadata freshness for sources in {information_schema}"), - EventLevel.INFO, - ) + Note(msg=f"Generating metadata freshness for sources in {information_schema}"), + EventLevel.INFO, + ) _, metadata_fresnhess_results = adapter.calculate_freshness_from_metadata_batch( sources_for_information_schema, information_schema ) From ee3a471b5bf05516a2c6f03f3438da1da3d5210a Mon Sep 17 00:00:00 2001 From: Michelle Ark Date: Wed, 13 Mar 2024 19:18:51 -0400 Subject: [PATCH 04/10] use schema,identifier as key in freshness cache --- core/dbt/task/freshness.py | 35 ++++++++++++++++------------------- 1 file changed, 16 insertions(+), 19 deletions(-) diff --git a/core/dbt/task/freshness.py b/core/dbt/task/freshness.py index d132f1d0697..055a763629b 100644 --- a/core/dbt/task/freshness.py +++ b/core/dbt/task/freshness.py @@ -1,7 +1,7 @@ import os import threading import time -from typing import Optional, List, AbstractSet, Dict +from typing import Optional, List, AbstractSet, Dict, Tuple from .base import BaseRunner from .printer import ( @@ -39,10 +39,10 @@ class FreshnessRunner(BaseRunner): def __init__(self, config, adapter, node, node_index, num_nodes) -> None: super().__init__(config, adapter, node, node_index, num_nodes) - self._metadata_freshness_cache: Dict[str, FreshnessResult] = {} + self._metadata_freshness_cache: Dict[Tuple[str, str], FreshnessResult] = {} def set_metadata_freshness_cache( - self, metadata_freshness_cache: Dict[str, FreshnessResult] + self, metadata_freshness_cache: Dict[Tuple[str, str], FreshnessResult] ) -> None: self._metadata_freshness_cache = metadata_freshness_cache @@ -134,8 +134,13 @@ def execute(self, compiled_node, manifest): EventLevel.WARN, ) ) - if compiled_node.unique_id in self._metadata_freshness_cache: - freshness = self._metadata_freshness_cache[compiled_node.unique_id] + if ( + compiled_node.schema.lower(), + compiled_node.identifier.lower(), + ) in self._metadata_freshness_cache: + freshness = self._metadata_freshness_cache[ + (compiled_node.schema.lower(), compiled_node.identifier.lower()) + ] else: adapter_response, freshness = self.adapter.calculate_freshness_from_metadata( relation, @@ -186,7 +191,7 @@ def node_is_match(self, node): class FreshnessTask(RunTask): def __init__(self, args, config, manifest) -> None: super().__init__(args, config, manifest) - self._metadata_freshness_cache: Dict[str, FreshnessResult] = {} + self._metadata_freshness_cache: Dict[Tuple[str, str], FreshnessResult] = {} def result_path(self): if self.args.output: @@ -249,8 +254,6 @@ def _populate_metadata_freshness_cache(self, adapter, selected_uids: AbstractSet # Group metadata sources by information schema -- one query per information schema will be necessary information_schema_to_metadata_sources: Dict[InformationSchema, List[BaseRelation]] = {} - # Track unique ids of sources for use as cache key - information_schema_to_source_unique_ids: Dict[InformationSchema, List[str]] = {} for selected_source_uid in list(selected_uids): source = self.manifest.sources.get(selected_source_uid) if source and source.loaded_at_field is None: @@ -261,16 +264,10 @@ def _populate_metadata_freshness_cache(self, adapter, selected_uids: AbstractSet information_schema_to_metadata_sources[information_schema] = [ metadata_source_relation ] - information_schema_to_source_unique_ids[information_schema] = [ - source.unique_id - ] else: information_schema_to_metadata_sources[information_schema].append( metadata_source_relation ) - information_schema_to_source_unique_ids[information_schema].append( - source.unique_id - ) # Get freshness metadata results per information schema for ( @@ -281,11 +278,11 @@ def _populate_metadata_freshness_cache(self, adapter, selected_uids: AbstractSet Note(msg=f"Generating metadata freshness for sources in {information_schema}"), EventLevel.INFO, ) - _, metadata_fresnhess_results = adapter.calculate_freshness_from_metadata_batch( + _, metadata_freshness_results = adapter.calculate_freshness_from_metadata_batch( sources_for_information_schema, information_schema ) # Update cache based on result for information schema - for idx in range(len(information_schema_to_source_unique_ids[information_schema])): - source_unique_id = information_schema_to_source_unique_ids[information_schema][idx] - source_metadata_freshness_result = metadata_fresnhess_results[idx] - self._metadata_freshness_cache[source_unique_id] = source_metadata_freshness_result + for (schema, identifier), freshness_result in metadata_freshness_results.items(): + self._metadata_freshness_cache[ + (schema.lower(), identifier.lower()) + ] = freshness_result From bfa1d4dcb2bd1511d1f852e15274e26c308a53c2 Mon Sep 17 00:00:00 2001 From: Michelle Ark Date: Mon, 25 Mar 2024 16:45:45 -0400 Subject: [PATCH 05/10] key freshness metadata cache by relation, defer information_schema-based querying to adapter batch method --- core/dbt/task/freshness.py | 78 ++++++++++++++------------------------ 1 file changed, 29 insertions(+), 49 deletions(-) diff --git a/core/dbt/task/freshness.py b/core/dbt/task/freshness.py index 055a763629b..9630883614d 100644 --- a/core/dbt/task/freshness.py +++ b/core/dbt/task/freshness.py @@ -1,7 +1,7 @@ import os import threading import time -from typing import Optional, List, AbstractSet, Dict, Tuple +from typing import Optional, List, AbstractSet, Dict from .base import BaseRunner from .printer import ( @@ -28,7 +28,8 @@ from dbt.adapters.capability import Capability from dbt.adapters.contracts.connection import AdapterResponse -from dbt.adapters.base.relation import InformationSchema, BaseRelation +from dbt.adapters.base.relation import BaseRelation +from dbt.adapters.base.impl import FreshnessResponse from dbt.contracts.graph.nodes import SourceDefinition, HookNode from dbt_common.events.base_types import EventLevel from dbt.graph import ResourceTypeSelector @@ -39,10 +40,10 @@ class FreshnessRunner(BaseRunner): def __init__(self, config, adapter, node, node_index, num_nodes) -> None: super().__init__(config, adapter, node, node_index, num_nodes) - self._metadata_freshness_cache: Dict[Tuple[str, str], FreshnessResult] = {} + self._metadata_freshness_cache: Dict[BaseRelation, FreshnessResult] = {} def set_metadata_freshness_cache( - self, metadata_freshness_cache: Dict[Tuple[str, str], FreshnessResult] + self, metadata_freshness_cache: Dict[BaseRelation, FreshnessResult] ) -> None: self._metadata_freshness_cache = metadata_freshness_cache @@ -115,7 +116,7 @@ def execute(self, compiled_node, manifest): with self.adapter.connection_named(compiled_node.unique_id, compiled_node): self.adapter.clear_transaction() adapter_response: Optional[AdapterResponse] = None - freshness = None + freshness: Optional[FreshnessResponse] = None if compiled_node.loaded_at_field is not None: adapter_response, freshness = self.adapter.calculate_freshness( @@ -130,17 +131,14 @@ def execute(self, compiled_node, manifest): if compiled_node.freshness.filter is not None: fire_event( Note( - f"A filter cannot be applied to a metadata freshness check on source '{compiled_node.name}'.", - EventLevel.WARN, - ) + msg=f"A filter cannot be applied to a metadata freshness check on source '{compiled_node.name}'." + ), + EventLevel.WARN, ) - if ( - compiled_node.schema.lower(), - compiled_node.identifier.lower(), - ) in self._metadata_freshness_cache: - freshness = self._metadata_freshness_cache[ - (compiled_node.schema.lower(), compiled_node.identifier.lower()) - ] + + metadata_source = self.adapter.Relation.create_from(self.config, compiled_node) + if metadata_source in self._metadata_freshness_cache: + freshness = self._metadata_freshness_cache[metadata_source] else: adapter_response, freshness = self.adapter.calculate_freshness_from_metadata( relation, @@ -149,9 +147,8 @@ def execute(self, compiled_node, manifest): status = compiled_node.freshness.status(freshness["age"]) else: - status = FreshnessStatus.Warn - fire_event( - Note(f"Skipping freshness for source {compiled_node.name}."), + raise DbtRuntimeError( + f"Could not compute freshness for source {compiled_node.name}: no 'loaded_at_field' provided and {self.adapter.type()} adapter does not support metadata-based freshness checks." ) # adapter_response was not returned in previous versions, so this will be None @@ -191,7 +188,7 @@ def node_is_match(self, node): class FreshnessTask(RunTask): def __init__(self, args, config, manifest) -> None: super().__init__(args, config, manifest) - self._metadata_freshness_cache: Dict[Tuple[str, str], FreshnessResult] = {} + self._metadata_freshness_cache: Dict[BaseRelation, FreshnessResult] = {} def result_path(self): if self.args.output: @@ -252,37 +249,20 @@ def _populate_metadata_freshness_cache(self, adapter, selected_uids: AbstractSet if self.manifest is None or self.graph is None: raise DbtInternalError("manifest must be set to get populate metadata freshness cache") - # Group metadata sources by information schema -- one query per information schema will be necessary - information_schema_to_metadata_sources: Dict[InformationSchema, List[BaseRelation]] = {} + batch_metadata_sources: List[BaseRelation] = [] for selected_source_uid in list(selected_uids): source = self.manifest.sources.get(selected_source_uid) if source and source.loaded_at_field is None: - metadata_source_relation = adapter.Relation.create_from(self.config, source) - information_schema = metadata_source_relation.information_schema_only() + metadata_source = adapter.Relation.create_from(self.config, source) + batch_metadata_sources.append(metadata_source) - if information_schema not in information_schema_to_metadata_sources: - information_schema_to_metadata_sources[information_schema] = [ - metadata_source_relation - ] - else: - information_schema_to_metadata_sources[information_schema].append( - metadata_source_relation - ) - - # Get freshness metadata results per information schema - for ( - information_schema, - sources_for_information_schema, - ) in information_schema_to_metadata_sources.items(): - fire_event( - Note(msg=f"Generating metadata freshness for sources in {information_schema}"), - EventLevel.INFO, - ) - _, metadata_freshness_results = adapter.calculate_freshness_from_metadata_batch( - sources_for_information_schema, information_schema - ) - # Update cache based on result for information schema - for (schema, identifier), freshness_result in metadata_freshness_results.items(): - self._metadata_freshness_cache[ - (schema.lower(), identifier.lower()) - ] = freshness_result + fire_event( + Note( + msg=f"Pulling freshness from warehouse metadata tables for {len(batch_metadata_sources)} sources" + ), + EventLevel.INFO, + ) + _, metadata_freshness_results = adapter.calculate_freshness_from_metadata_batch( + batch_metadata_sources + ) + self._metadata_freshness_cache.update(metadata_freshness_results) From 23df45b83872cd3724a37981fb49a2f0f9ce9d02 Mon Sep 17 00:00:00 2001 From: Michelle Ark Date: Thu, 4 Apr 2024 16:18:00 -0700 Subject: [PATCH 06/10] gracefully handle calculate_freshness_from_metadata_batch failures --- core/dbt/task/freshness.py | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/core/dbt/task/freshness.py b/core/dbt/task/freshness.py index 9630883614d..f2ce34cc1cc 100644 --- a/core/dbt/task/freshness.py +++ b/core/dbt/task/freshness.py @@ -262,7 +262,16 @@ def _populate_metadata_freshness_cache(self, adapter, selected_uids: AbstractSet ), EventLevel.INFO, ) - _, metadata_freshness_results = adapter.calculate_freshness_from_metadata_batch( - batch_metadata_sources - ) - self._metadata_freshness_cache.update(metadata_freshness_results) + + try: + _, metadata_freshness_results = adapter.calculate_freshness_from_metadata_batch( + batch_metadata_sources + ) + self._metadata_freshness_cache.update(metadata_freshness_results) + except Exception: + # This error handling is intentionally very coarse. + # If anything goes wrong during batch metadata calculation, we can safely + # leave _metadata_freshness_cache unpopulated. + # Downstream, this will be gracefully handled as a cache miss and non-batch + # metadata-based freshness will still be performed on a source-by-source basis. + pass From 8f32a7910000e1b2d4942d1875c59cb9faf67c5f Mon Sep 17 00:00:00 2001 From: Michelle Ark Date: Thu, 4 Apr 2024 17:06:52 -0700 Subject: [PATCH 07/10] add unit tests TestFreshnessTaskMetadataCache --- core/dbt/task/freshness.py | 9 +- tests/unit/test_freshness_task.py | 154 ++++++++++++++++++++++++++++++ 2 files changed, 160 insertions(+), 3 deletions(-) create mode 100644 tests/unit/test_freshness_task.py diff --git a/core/dbt/task/freshness.py b/core/dbt/task/freshness.py index f2ce34cc1cc..615d7314384 100644 --- a/core/dbt/task/freshness.py +++ b/core/dbt/task/freshness.py @@ -212,7 +212,7 @@ def get_node_selector(self): def before_run(self, adapter, selected_uids: AbstractSet[str]) -> None: super().before_run(adapter, selected_uids) if adapter.supports(Capability.TableLastModifiedMetadataBatch): - self._populate_metadata_freshness_cache(adapter, selected_uids) + self.populate_metadata_freshness_cache(adapter, selected_uids) def get_runner(self, node) -> BaseRunner: freshness_runner = super().get_runner(node) @@ -245,8 +245,8 @@ def get_hooks_by_type(self, hook_type: RunHookType) -> List[HookNode]: else: return [] - def _populate_metadata_freshness_cache(self, adapter, selected_uids: AbstractSet[str]) -> None: - if self.manifest is None or self.graph is None: + def populate_metadata_freshness_cache(self, adapter, selected_uids: AbstractSet[str]) -> None: + if self.manifest is None: raise DbtInternalError("manifest must be set to get populate metadata freshness cache") batch_metadata_sources: List[BaseRelation] = [] @@ -275,3 +275,6 @@ def _populate_metadata_freshness_cache(self, adapter, selected_uids: AbstractSet # Downstream, this will be gracefully handled as a cache miss and non-batch # metadata-based freshness will still be performed on a source-by-source basis. pass + + def get_freshness_metadata_cache(self) -> Dict[BaseRelation, FreshnessResult]: + return self._metadata_freshness_cache diff --git a/tests/unit/test_freshness_task.py b/tests/unit/test_freshness_task.py new file mode 100644 index 00000000000..05c00df75da --- /dev/null +++ b/tests/unit/test_freshness_task.py @@ -0,0 +1,154 @@ +import datetime +import pytest +from unittest import mock + +from dbt.task.freshness import FreshnessTask, FreshnessResponse + + +class TestFreshnessTaskMetadataCache: + @pytest.fixture(scope="class") + def args(self): + mock_args = mock.Mock() + mock_args.state = None + mock_args.defer_state = None + mock_args.write_json = None + + return mock_args + + @pytest.fixture(scope="class") + def config(self): + mock_config = mock.Mock() + mock_config.threads = 1 + mock_config.target_name = "mock_config_target_name" + + @pytest.fixture(scope="class") + def manifest(self): + return mock.Mock() + + @pytest.fixture(scope="class") + def source_with_loaded_at_field(self): + mock_source = mock.Mock() + mock_source.unique_id = "source_with_loaded_at_field" + mock_source.loaded_at_field = "loaded_at_field" + return mock_source + + @pytest.fixture(scope="class") + def source_no_loaded_at_field(self): + mock_source = mock.Mock() + mock_source.unique_id = "source_no_loaded_at_field" + return mock_source + + @pytest.fixture(scope="class") + def source_no_loaded_at_field2(self): + mock_source = mock.Mock() + mock_source.unique_id = "source_no_loaded_at_field2" + return mock_source + + @pytest.fixture(scope="class") + def adapter(self): + return mock.Mock() + + @pytest.fixture(scope="class") + def freshness_response(self): + return FreshnessResponse( + max_loaded_at=datetime.datetime(2020, 5, 2), + snapshotted_at=datetime.datetime(2020, 5, 4), + age=2, + ) + + def test_populate_metadata_freshness_cache( + self, args, config, manifest, adapter, source_no_loaded_at_field, freshness_response + ): + manifest.sources = {source_no_loaded_at_field.unique_id: source_no_loaded_at_field} + adapter.Relation.create_from.return_value = "source_relation" + adapter.calculate_freshness_from_metadata_batch.return_value = ( + [], + {"source_relation": freshness_response}, + ) + task = FreshnessTask(args=args, config=config, manifest=manifest) + + task.populate_metadata_freshness_cache(adapter, {source_no_loaded_at_field.unique_id}) + + assert task.get_freshness_metadata_cache() == {"source_relation": freshness_response} + + def test_populate_metadata_freshness_cache_multiple_sources( + self, + args, + config, + manifest, + adapter, + source_no_loaded_at_field, + source_no_loaded_at_field2, + freshness_response, + ): + manifest.sources = { + source_no_loaded_at_field.unique_id: source_no_loaded_at_field, + source_no_loaded_at_field2.unique_id: source_no_loaded_at_field2, + } + adapter.Relation.create_from.side_effect = ["source_relation1", "source_relation2"] + adapter.calculate_freshness_from_metadata_batch.return_value = ( + [], + {"source_relation1": freshness_response, "source_relation2": freshness_response}, + ) + task = FreshnessTask(args=args, config=config, manifest=manifest) + + task.populate_metadata_freshness_cache(adapter, {source_no_loaded_at_field.unique_id}) + + assert task.get_freshness_metadata_cache() == { + "source_relation1": freshness_response, + "source_relation2": freshness_response, + } + + def test_populate_metadata_freshness_cache_with_loaded_at_field( + self, args, config, manifest, adapter, source_with_loaded_at_field, freshness_response + ): + manifest.sources = { + source_with_loaded_at_field.unique_id: source_with_loaded_at_field, + } + adapter.Relation.create_from.return_value = "source_relation" + adapter.calculate_freshness_from_metadata_batch.return_value = ( + [], + {"source_relation": freshness_response}, + ) + task = FreshnessTask(args=args, config=config, manifest=manifest) + + task.populate_metadata_freshness_cache(adapter, {source_with_loaded_at_field.unique_id}) + + assert task.get_freshness_metadata_cache() == {"source_relation": freshness_response} + + def test_populate_metadata_freshness_cache_multiple_sources_mixed( + self, + args, + config, + manifest, + adapter, + source_no_loaded_at_field, + source_with_loaded_at_field, + freshness_response, + ): + manifest.sources = { + source_no_loaded_at_field.unique_id: source_no_loaded_at_field, + source_with_loaded_at_field.unique_id: source_with_loaded_at_field, + } + adapter.Relation.create_from.return_value = "source_relation" + adapter.calculate_freshness_from_metadata_batch.return_value = ( + [], + {"source_relation": freshness_response}, + ) + task = FreshnessTask(args=args, config=config, manifest=manifest) + + task.populate_metadata_freshness_cache(adapter, {source_no_loaded_at_field.unique_id}) + + assert task.get_freshness_metadata_cache() == {"source_relation": freshness_response} + + def test_populate_metadata_freshness_cache_adapter_exception( + self, args, config, manifest, adapter, source_no_loaded_at_field, freshness_response + ): + manifest.sources = {source_no_loaded_at_field.unique_id: source_no_loaded_at_field} + adapter.Relation.create_from.return_value = "source_relation" + adapter.calculate_freshness_from_metadata_batch.side_effect = Exception() + task = FreshnessTask(args=args, config=config, manifest=manifest) + + task.populate_metadata_freshness_cache(adapter, {source_no_loaded_at_field.unique_id}) + + assert task.get_freshness_metadata_cache() == {} From 5c9403893cf48d495f09338f8209b4ff21dfba83 Mon Sep 17 00:00:00 2001 From: Michelle Ark Date: Thu, 4 Apr 2024 17:07:42 -0700 Subject: [PATCH 08/10] changelog entry --- .changes/unreleased/Features-20240404-170728.yaml | 6 ++++++ 1 file changed, 6 insertions(+) create mode 100644 .changes/unreleased/Features-20240404-170728.yaml diff --git a/.changes/unreleased/Features-20240404-170728.yaml b/.changes/unreleased/Features-20240404-170728.yaml new file mode 100644 index 00000000000..6db7735acbc --- /dev/null +++ b/.changes/unreleased/Features-20240404-170728.yaml @@ -0,0 +1,6 @@ +kind: Features +body: 'source freshness precomputes metadata-based freshness in batch, if possible ' +time: 2024-04-04T17:07:28.717868-07:00 +custom: + Author: michelleark + Issue: "8705" From 2608baa409da06f63369d8f6ba4898ad509eccf1 Mon Sep 17 00:00:00 2001 From: Michelle Ark Date: Fri, 12 Apr 2024 10:07:19 -0700 Subject: [PATCH 09/10] improve error/warn messaging --- core/dbt/task/freshness.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/core/dbt/task/freshness.py b/core/dbt/task/freshness.py index 615d7314384..b1fe7581c30 100644 --- a/core/dbt/task/freshness.py +++ b/core/dbt/task/freshness.py @@ -247,7 +247,7 @@ def get_hooks_by_type(self, hook_type: RunHookType) -> List[HookNode]: def populate_metadata_freshness_cache(self, adapter, selected_uids: AbstractSet[str]) -> None: if self.manifest is None: - raise DbtInternalError("manifest must be set to get populate metadata freshness cache") + raise DbtInternalError("Manifest must be set to populate metadata freshness cache") batch_metadata_sources: List[BaseRelation] = [] for selected_source_uid in list(selected_uids): @@ -268,13 +268,16 @@ def populate_metadata_freshness_cache(self, adapter, selected_uids: AbstractSet[ batch_metadata_sources ) self._metadata_freshness_cache.update(metadata_freshness_results) - except Exception: + except Exception as e: # This error handling is intentionally very coarse. # If anything goes wrong during batch metadata calculation, we can safely # leave _metadata_freshness_cache unpopulated. # Downstream, this will be gracefully handled as a cache miss and non-batch # metadata-based freshness will still be performed on a source-by-source basis. - pass + fire_event( + Note(msg=f"Metadata freshness could not be computed in batch: {e}"), + EventLevel.WARN, + ) def get_freshness_metadata_cache(self) -> Dict[BaseRelation, FreshnessResult]: return self._metadata_freshness_cache From f95f86c7dae51d2fd194b763b01372c90227b32e Mon Sep 17 00:00:00 2001 From: Michelle Ark Date: Fri, 12 Apr 2024 13:00:59 -0700 Subject: [PATCH 10/10] restore dev-requirements.txt --- dev-requirements.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dev-requirements.txt b/dev-requirements.txt index f6b0cdf1a9d..a1decf2ae62 100644 --- a/dev-requirements.txt +++ b/dev-requirements.txt @@ -1,4 +1,4 @@ -git+https://github.com/dbt-labs/dbt-adapters.git@batch-metadata-freshness +git+https://github.com/dbt-labs/dbt-adapters.git git+https://github.com/dbt-labs/dbt-adapters.git@main#subdirectory=dbt-tests-adapter git+https://github.com/dbt-labs/dbt-common.git@main git+https://github.com/dbt-labs/dbt-postgres.git@main