Skip to content

Commit

Permalink
Add support for getting freshness from DBMS metadata
Browse files Browse the repository at this point in the history
  • Loading branch information
peterallenwebb committed Oct 8, 2023
1 parent 46ee3f3 commit 4c9a492
Show file tree
Hide file tree
Showing 9 changed files with 206 additions and 45 deletions.
121 changes: 107 additions & 14 deletions core/dbt/adapters/base/impl.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import abc
import dataclasses
from concurrent.futures import as_completed, Future
from contextlib import contextmanager
from datetime import datetime
Expand All @@ -17,6 +18,7 @@
Set,
Tuple,
Type,
TypedDict,
Union,
)

Expand Down Expand Up @@ -76,6 +78,7 @@
GET_CATALOG_MACRO_NAME = "get_catalog"
GET_CATALOG_RELATIONS_MACRO_NAME = "get_catalog_relations"
FRESHNESS_MACRO_NAME = "collect_freshness"
GET_RELATION_LAST_MODIFIED_MACRO_NAME = "get_relation_last_modified"


class ConstraintSupport(str, Enum):
Expand All @@ -92,6 +95,14 @@ def _expect_row_value(key: str, row: agate.Row):
return row[key]


def _get_column_value_uncased(key: str, row: agate.Row) -> Any:
for col_name, value in row.items():
if col_name.casefold() == key.casefold():
return value

raise KeyError


def _catalog_filter_schemas(manifest: Manifest) -> Callable[[agate.Row], bool]:
"""Return a function that takes a row and decides if the row should be
included in the catalog output.
Expand All @@ -110,7 +121,7 @@ def test(row: agate.Row) -> bool:
return test


def _utc(dt: Optional[datetime], source: BaseRelation, field_name: str) -> datetime:
def _utc(dt: Optional[datetime], source: Optional[BaseRelation], field_name: str) -> datetime:
"""If dt has a timezone, return a new datetime that's in UTC. Otherwise,
assume the datetime is already for UTC and add the timezone.
"""
Expand Down Expand Up @@ -162,12 +173,50 @@ def submit(self, compiled_code: str) -> Any:
raise NotImplementedError("PythonJobHelper submit function is not implemented yet")


class AdapterFeature(str, Enum):
class Capability(str, Enum):
"""Enumeration of optional adapter features which can be probed using BaseAdapter.has_feature()"""

CatalogByRelations = "CatalogByRelations"
"""Flags support for retrieving catalog information using a list of relations, rather than always retrieving all
the relations in a schema """
SchemaMetadataByRelations = "CatalogByRelations"
"""Indicates efficient support for retrieving schema metadata for a list of relations, rather than always retrieving
all the relations in a schema."""

TableLastModifiedMetadata = "TableLastModifiedMetadata"
"""Indicates support for determining the time of the last table modification by querying database metadata."""


class Support(str, Enum):
Unknown = "Unknown"
"""The adapter has not declared whether this capability is a feature of the underlying DBMS."""

Unsupported = "Unsupported"
"""This capability is not possible with the underlying DBMS, so the adapter does not implement related macros."""

NotImplemented = "NotImplemented"
"""This capability is available in the underlying DBMS, but support has not yet been implemented in the adapter."""

Versioned = "Versioned"
"""Some versions of the DBMS supported by the adapter support this capability and the adapter has implemented any
macros needed to use it."""

Full = "Full"
"""All versions of the DBMS supported by the adapter support this capability and the adapter has implemented any
macros needed to use it."""


@dataclasses.dataclass
class CapabilitySupport:
capability: Capability
support: Support
first_version: Optional[str] = None

def __bool__(self):
return self.support == Support.Versioned or self.support == Support.Full


class FreshnessResponse(TypedDict):
max_loaded_at: datetime
snapshotted_at: datetime
age: float # age in seconds


class BaseAdapter(metaclass=AdapterMeta):
Expand Down Expand Up @@ -1147,7 +1196,9 @@ def get_catalog(self, manifest: Manifest) -> Tuple[agate.Table, List[Exception]]
with executor(self.config) as tpe:
futures: List[Future[agate.Table]] = []
relation_count = len(self._get_catalog_relations(manifest))
if relation_count <= 100 and self.has_feature(AdapterFeature.CatalogByRelations):
if relation_count <= 100 and self.capability_support(
Capability.SchemaMetadataByRelations
):
relations_by_schema = self._get_catalog_relations_by_info_schema(manifest)
for info_schema in relations_by_schema:
name = ".".join([str(info_schema.database), "information_schema"])
Expand Down Expand Up @@ -1186,7 +1237,7 @@ def calculate_freshness(
loaded_at_field: str,
filter: Optional[str],
manifest: Optional[Manifest] = None,
) -> Tuple[Optional[AdapterResponse], Dict[str, Any]]:
) -> Tuple[Optional[AdapterResponse], FreshnessResponse]:
"""Calculate the freshness of sources in dbt, and return it"""
kwargs: Dict[str, Any] = {
"source": source,
Expand Down Expand Up @@ -1221,13 +1272,53 @@ def calculate_freshness(

snapshotted_at = _utc(table[0][1], source, loaded_at_field)
age = (snapshotted_at - max_loaded_at).total_seconds()
freshness = {
freshness: FreshnessResponse = {
"max_loaded_at": max_loaded_at,
"snapshotted_at": snapshotted_at,
"age": age,
}
return adapter_response, freshness

def calculate_freshness_from_metadata(
self,
source: BaseRelation,
manifest: Optional[Manifest] = None,
) -> Tuple[Optional[AdapterResponse], FreshnessResponse]:
kwargs: Dict[str, Any] = {
"information_schema": source.information_schema_only(),
"relations": [source],
}
result = self.execute_macro(
GET_RELATION_LAST_MODIFIED_MACRO_NAME, kwargs=kwargs, manifest=manifest
)
adapter_response, table = result.response, result.table # type: ignore[attr-defined]

try:
assert len(table) == 1 # There should be one row since we requested one relation
row = table[0]
last_modified_val = _get_column_value_uncased("last_modified", row)
snapshotted_at_val = _get_column_value_uncased("snapshotted_at", row)

if last_modified_val is None:
# no records in the table, so really the max_loaded_at was
# infinitely long ago. Just call it 0:00 January 1 year UTC
max_loaded_at = datetime(1, 1, 1, 0, 0, 0, tzinfo=pytz.UTC)
else:
max_loaded_at = _utc(last_modified_val, None, "last_modified")

snapshotted_at = _utc(snapshotted_at_val, None, "snapshotted_at")
age = (snapshotted_at - max_loaded_at).total_seconds()

freshness: FreshnessResponse = {
"max_loaded_at": max_loaded_at,
"snapshotted_at": snapshotted_at,
"age": age,
}

return adapter_response, freshness
except Exception:
raise MacroResultError(GET_RELATION_LAST_MODIFIED_MACRO_NAME, table)

def pre_model_hook(self, config: Mapping[str, Any]) -> Any:
"""A hook for running some operation before the model materialization
runs. The hook can assume it has a connection available.
Expand Down Expand Up @@ -1501,12 +1592,14 @@ def render_model_constraint(cls, constraint: ModelLevelConstraint) -> Optional[s
else:
return None

@classmethod
def has_feature(cls, feature: AdapterFeature) -> bool:
# The base adapter implementation does not implement any optional
# features, so always return false. Adapters which wish to provide
# optional features will have to override this function.
return False
def capabilities(self) -> List[CapabilitySupport]:
return []

def capability_support(self, capability: Capability) -> CapabilitySupport:
return CapabilitySupport(
capability=capability,
support=Support.Unknown,
)


COLUMNS_EQUAL_SQL = """
Expand Down
4 changes: 2 additions & 2 deletions core/dbt/contracts/graph/nodes.py
Original file line number Diff line number Diff line change
Expand Up @@ -1301,8 +1301,8 @@ def sources(self):
return []

@property
def has_freshness(self):
return bool(self.freshness) and self.loaded_at_field is not None
def has_freshness(self) -> bool:
return bool(self.freshness)

@property
def search_name(self):
Expand Down
9 changes: 9 additions & 0 deletions core/dbt/include/global_project/macros/adapters/metadata.sql
Original file line number Diff line number Diff line change
Expand Up @@ -85,3 +85,12 @@
{{ exceptions.raise_not_implemented(
'get_relations macro not implemented for adapter '+adapter.type()) }}
{% endmacro %}

{% macro get_relation_last_modified(information_schema, relations) %}
{{ return(adapter.dispatch('get_relation_last_modified', 'dbt')(information_schema, relations)) }}
{% endmacro %}

{% macro default__get_relation_last_modified(information_schema, relations) %}
{{ exceptions.raise_not_implemented(
'get_relation_last_modified macro not implemented for adapter ' + adapter.type()) }}
{% endmacro %}
15 changes: 14 additions & 1 deletion core/dbt/parser/sources.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import itertools
from pathlib import Path
from typing import Iterable, Dict, Optional, Set, Any, List

from dbt.adapters.base.impl import Capability
from dbt.adapters.factory import get_adapter
from dbt.config import RuntimeConfig
from dbt.context.context_config import (
Expand All @@ -26,7 +28,7 @@
)
from dbt.events.functions import warn_or_error
from dbt.events.types import UnusedTables
from dbt.exceptions import DbtInternalError
from dbt.exceptions import DbtInternalError, ParsingError
from dbt.node_types import NodeType

from dbt.parser.common import ParserRef
Expand Down Expand Up @@ -184,6 +186,17 @@ def parse_source(self, target: UnpatchedSourceDefinition) -> SourceDefinition:
unrendered_config=unrendered_config,
)

if (
parsed_source.freshness
and not parsed_source.loaded_at_field
and not get_adapter(self.root_project).capability_support(
Capability.TableLastModifiedMetadata
)
):
raise ParsingError(
"Adapter does not support metadata-based freshness. A loaded_at_field must be specified for source freshness."
)

# relation name is added after instantiation because the adapter does
# not provide the relation name for a UnpatchedSourceDefinition object
parsed_source.relation_name = self._get_relation_name(parsed_source)
Expand Down
37 changes: 22 additions & 15 deletions core/dbt/task/freshness.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import os
import threading
import time
from typing import Optional

from .base import BaseRunner
from .printer import (
Expand All @@ -25,7 +26,7 @@

from dbt.graph import ResourceTypeSelector
from dbt.contracts.graph.nodes import SourceDefinition

from ..contracts.connection import AdapterResponse

RESULT_FILE_NAME = "sources.json"

Expand Down Expand Up @@ -95,24 +96,30 @@ def from_run_result(self, result, start_time, timing_info):
return result

def execute(self, compiled_node, manifest):
# we should only be here if we compiled_node.has_freshness, and
# therefore loaded_at_field should be a str. If this invariant is
# broken, raise!
if compiled_node.loaded_at_field is None:
raise DbtInternalError(
"Got to execute for source freshness of a source that has no loaded_at_field!"
)

relation = self.adapter.Relation.create_from_source(compiled_node)
# given a Source, calculate its freshness.
with self.adapter.connection_for(compiled_node):
self.adapter.clear_transaction()
adapter_response, freshness = self.adapter.calculate_freshness(
relation,
compiled_node.loaded_at_field,
compiled_node.freshness.filter,
manifest=manifest,
)
adapter_response: Optional[AdapterResponse] = None
freshness = None

if compiled_node.loaded_at_field is not None:
adapter_response, freshness = self.adapter.calculate_freshness(
relation,
compiled_node.loaded_at_field,
compiled_node.freshness.filter,
manifest=manifest,
)
else:
if compiled_node.freshness.filter is not None:
raise DbtRuntimeError(
"A filter cannot be applied to a metadata freshness check."
)

adapter_response, freshness = self.adapter.calculate_freshness_from_metadata(
relation,
manifest=manifest,
)

status = compiled_node.freshness.status(freshness["age"])

Expand Down
30 changes: 25 additions & 5 deletions plugins/postgres/dbt/adapters/postgres/impl.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,15 @@
from datetime import datetime
from dataclasses import dataclass
from typing import Optional, Set, List, Any
from typing import Any, Dict, Optional, Set, List

from dbt.adapters.base.meta import available
from dbt.adapters.base.impl import AdapterConfig, AdapterFeature, ConstraintSupport
from dbt.adapters.base.impl import (
AdapterConfig,
Capability,
ConstraintSupport,
CapabilitySupport,
Support,
)
from dbt.adapters.sql import SQLAdapter
from dbt.adapters.postgres import PostgresConnectionManager
from dbt.adapters.postgres.column import PostgresColumn
Expand Down Expand Up @@ -75,7 +81,12 @@ class PostgresAdapter(SQLAdapter):

CATALOG_BY_RELATION_SUPPORT = True

SUPPORTED_FEATURES: Set[AdapterFeature] = frozenset([AdapterFeature.CatalogByRelations])
_capabilities: Dict[Capability, CapabilitySupport] = {
Capability.SchemaMetadataByRelations: CapabilitySupport(
capability=Capability.SchemaMetadataByRelations,
support=Support.Full,
)
}

@classmethod
def date_function(cls):
Expand Down Expand Up @@ -149,5 +160,14 @@ def debug_query(self):
self.execute("select 1 as id")

@classmethod
def has_feature(cls, feature: AdapterFeature) -> bool:
return feature in cls.SUPPORTED_FEATURES
def capabilities(cls) -> List[CapabilitySupport]:
return cls._capabilities.values()

def capability_support(self, capability: Capability) -> CapabilitySupport:
if capability in self._capabilities:
return self._capabilities[capability]
else:
return CapabilitySupport(
capability=capability,
support=Support.Unknown,
)
Loading

0 comments on commit 4c9a492

Please sign in to comment.