Skip to content

Commit

Permalink
Custom SQL for get source maxLoadedAt (#11163)
Browse files Browse the repository at this point in the history
  • Loading branch information
ChenyuLInx authored Dec 19, 2024
1 parent 97ffc37 commit f2222d2
Show file tree
Hide file tree
Showing 14 changed files with 237 additions and 9 deletions.
6 changes: 6 additions & 0 deletions .changes/unreleased/Features-20241217-171631.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Features
body: Calculate source freshness via a SQL query
time: 2024-12-17T17:16:31.841076-08:00
custom:
Author: ChenyuLInx
Issue: "8797"
1 change: 1 addition & 0 deletions core/dbt/artifacts/resources/v1/source_definition.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ class ParsedSourceMandatory(GraphResource, HasRelationMetadata):
class SourceDefinition(ParsedSourceMandatory):
quoting: Quoting = field(default_factory=Quoting)
loaded_at_field: Optional[str] = None
loaded_at_query: Optional[str] = None
freshness: Optional[FreshnessThreshold] = None
external: Optional[ExternalTable] = None
description: str = ""
Expand Down
18 changes: 16 additions & 2 deletions core/dbt/context/providers.py
Original file line number Diff line number Diff line change
Expand Up @@ -880,7 +880,7 @@ class OperationProvider(RuntimeProvider):

# Base context collection, used for parsing configs.
class ProviderContext(ManifestContext):
# subclasses are MacroContext, ModelContext, TestContext
# subclasses are MacroContext, ModelContext, TestContext, SourceContext
def __init__(
self,
model,
Expand All @@ -893,7 +893,7 @@ def __init__(
raise DbtInternalError(f"Invalid provider given to context: {provider}")
# mypy appeasement - we know it'll be a RuntimeConfig
self.config: RuntimeConfig
self.model: Union[Macro, ManifestNode] = model
self.model: Union[Macro, ManifestNode, SourceDefinition] = model
super().__init__(config, manifest, model.package_name)
self.sql_results: Dict[str, Optional[AttrDict]] = {}
self.context_config: Optional[ContextConfig] = context_config
Expand Down Expand Up @@ -1558,6 +1558,20 @@ def __init__(
self._search_package = search_package


class SourceContext(ProviderContext):
# SourceContext is being used to render jinja SQL during execution of
# custom SQL in source freshness. It is not used for parsing.
model: SourceDefinition

@contextproperty()
def this(self) -> Optional[RelationProxy]:
return self.db_wrapper.Relation.create_from(self.config, self.model)

@contextproperty()
def source_node(self) -> SourceDefinition:
return self.model


class ModelContext(ProviderContext):
model: ManifestNode

Expand Down
4 changes: 4 additions & 0 deletions core/dbt/contracts/graph/unparsed.py
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,7 @@ class UnparsedSourceTableDefinition(HasColumnTests, HasColumnAndTestProps):
config: Dict[str, Any] = field(default_factory=dict)
loaded_at_field: Optional[str] = None
loaded_at_field_present: Optional[bool] = None
loaded_at_query: Optional[str] = None
identifier: Optional[str] = None
quoting: Quoting = field(default_factory=Quoting)
freshness: Optional[FreshnessThreshold] = field(default_factory=FreshnessThreshold)
Expand All @@ -345,6 +346,7 @@ class UnparsedSourceDefinition(dbtClassMixin):
freshness: Optional[FreshnessThreshold] = field(default_factory=FreshnessThreshold)
loaded_at_field: Optional[str] = None
loaded_at_field_present: Optional[bool] = None
loaded_at_query: Optional[str] = None
tables: List[UnparsedSourceTableDefinition] = field(default_factory=list)
tags: List[str] = field(default_factory=list)
config: Dict[str, Any] = field(default_factory=dict)
Expand Down Expand Up @@ -382,6 +384,7 @@ class SourceTablePatch(dbtClassMixin):
docs: Optional[Docs] = None
loaded_at_field: Optional[str] = None
loaded_at_field_present: Optional[bool] = None
loaded_at_query: Optional[str] = None
identifier: Optional[str] = None
quoting: Quoting = field(default_factory=Quoting)
freshness: Optional[FreshnessThreshold] = field(default_factory=FreshnessThreshold)
Expand Down Expand Up @@ -425,6 +428,7 @@ class SourcePatch(dbtClassMixin):
freshness: Optional[Optional[FreshnessThreshold]] = field(default_factory=FreshnessThreshold)
loaded_at_field: Optional[str] = None
loaded_at_field_present: Optional[bool] = None
loaded_at_query: Optional[str] = None
tables: Optional[List[SourceTablePatch]] = None
tags: Optional[List[str]] = None

Expand Down
17 changes: 13 additions & 4 deletions core/dbt/parser/schema_renderer.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,21 @@ def _is_norender_key(self, keypath: Keypath) -> bool:
"tests" and "data_tests" are both currently supported but "tests" has been deprecated
"""
# top level descriptions and data_tests
if len(keypath) >= 1 and keypath[0] in ("tests", "data_tests", "description"):
if len(keypath) >= 1 and keypath[0] in (
"tests",
"data_tests",
"description",
"loaded_at_query",
):
return True

# columns descriptions and data_tests
if len(keypath) == 2 and keypath[1] in ("tests", "data_tests", "description"):
if len(keypath) == 2 and keypath[1] in (
"tests",
"data_tests",
"description",
"loaded_at_query",
):
return True

# pre- and post-hooks
Expand Down Expand Up @@ -69,9 +79,8 @@ def _is_norender_key(self, keypath: Keypath) -> bool:
def should_render_keypath(self, keypath: Keypath) -> bool:
if len(keypath) < 1:
return True

if self.key == "sources":
if keypath[0] == "description":
if keypath[0] in ("description", "loaded_at_query"):
return False
if keypath[0] == "tables":
if self._is_norender_key(keypath[2:]):
Expand Down
19 changes: 19 additions & 0 deletions core/dbt/parser/sources.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
UnparsedSourceTableDefinition,
)
from dbt.events.types import FreshnessConfigProblem, UnusedTables
from dbt.exceptions import ParsingError
from dbt.node_types import NodeType
from dbt.parser.common import ParserRef
from dbt.parser.schema_generic_tests import SchemaGenericTestParser
Expand Down Expand Up @@ -131,11 +132,28 @@ def parse_source(self, target: UnpatchedSourceDefinition) -> SourceDefinition:
# We need to be able to tell the difference between explicitly setting the loaded_at_field to None/null
# and when it's simply not set. This allows a user to override the source level loaded_at_field so that
# specific table can default to metadata-based freshness.
if table.loaded_at_field_present and table.loaded_at_query:
raise ParsingError(
"Cannot specify both loaded_at_field and loaded_at_query at table level."
)
if source.loaded_at_field and source.loaded_at_query:
raise ParsingError(
"Cannot specify both loaded_at_field and loaded_at_query at source level."
)

if table.loaded_at_field_present or table.loaded_at_field is not None:
loaded_at_field = table.loaded_at_field
else:
loaded_at_field = source.loaded_at_field # may be None, that's okay

loaded_at_query: Optional[str]
if table.loaded_at_query is not None:
loaded_at_query = table.loaded_at_query
else:
if table.loaded_at_field_present:
loaded_at_query = None
else:
loaded_at_query = source.loaded_at_query
freshness = merge_freshness(source.freshness, table.freshness)
quoting = source.quoting.merged(table.quoting)
# path = block.path.original_file_path
Expand Down Expand Up @@ -185,6 +203,7 @@ def parse_source(self, target: UnpatchedSourceDefinition) -> SourceDefinition:
meta=meta,
loader=source.loader,
loaded_at_field=loaded_at_field,
loaded_at_query=loaded_at_query,
freshness=freshness,
quoting=quoting,
resource_type=NodeType.Source,
Expand Down
20 changes: 18 additions & 2 deletions core/dbt/task/freshness.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
PartialSourceFreshnessResult,
SourceFreshnessResult,
)
from dbt.clients import jinja
from dbt.context.providers import RuntimeProvider, SourceContext
from dbt.contracts.graph.manifest import Manifest
from dbt.contracts.graph.nodes import HookNode, SourceDefinition
from dbt.contracts.results import RunStatus
Expand Down Expand Up @@ -114,7 +116,22 @@ def execute(self, compiled_node, manifest):
adapter_response: Optional[AdapterResponse] = None
freshness: Optional[FreshnessResponse] = None

if compiled_node.loaded_at_field is not None:
if compiled_node.loaded_at_query is not None:
# within the context user can have access to `this`, `source_node`(`model` will point to the same thing), etc
compiled_code = jinja.get_rendered(
compiled_node.loaded_at_query,
SourceContext(
compiled_node, self.config, manifest, RuntimeProvider(), None
).to_dict(),
compiled_node,
)
adapter_response, freshness = self.adapter.calculate_freshness_from_custom_sql(
relation,
compiled_code,
macro_resolver=manifest,
)
status = compiled_node.freshness.status(freshness["age"])
elif compiled_node.loaded_at_field is not None:
adapter_response, freshness = self.adapter.calculate_freshness(
relation,
compiled_node.loaded_at_field,
Expand Down Expand Up @@ -146,7 +163,6 @@ def execute(self, compiled_node, manifest):
raise DbtRuntimeError(
f"Could not compute freshness for source {compiled_node.name}: no 'loaded_at_field' provided and {self.adapter.type()} adapter does not support metadata-based freshness checks."
)

# adapter_response was not returned in previous versions, so this will be None
# we cannot call to_dict() on NoneType
if adapter_response:
Expand Down
2 changes: 1 addition & 1 deletion core/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@
"dbt-semantic-interfaces>=0.8.3,<0.9",
# Minor versions for these are expected to be backwards-compatible
"dbt-common>=1.13.0,<2.0",
"dbt-adapters>=1.10.1,<2.0",
"dbt-adapters>=1.13.0,<2.0",
# ----
# Expect compatibility with all new versions of these packages, so lower bounds only.
"packaging>20.9",
Expand Down
22 changes: 22 additions & 0 deletions schemas/dbt/manifest/v12.json
Original file line number Diff line number Diff line change
Expand Up @@ -7752,6 +7752,17 @@
],
"default": null
},
"loaded_at_query": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null
},
"freshness": {
"anyOf": [
{
Expand Down Expand Up @@ -17540,6 +17551,17 @@
],
"default": null
},
"loaded_at_query": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null
},
"freshness": {
"anyOf": [
{
Expand Down
2 changes: 2 additions & 0 deletions tests/functional/artifacts/expected_manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -793,6 +793,7 @@ def expected_seeded_manifest(project, model_database=None, quote_model=False):
},
"identifier": "seed",
"loaded_at_field": None,
"loaded_at_query": None,
"loader": "a_loader",
"meta": {},
"name": "my_table",
Expand Down Expand Up @@ -1299,6 +1300,7 @@ def expected_references_manifest(project):
},
"identifier": "seed",
"loaded_at_field": None,
"loaded_at_query": None,
"loader": "a_loader",
"meta": {},
"name": "my_table",
Expand Down
23 changes: 23 additions & 0 deletions tests/functional/sources/fixtures.py
Original file line number Diff line number Diff line change
Expand Up @@ -472,3 +472,26 @@
- name: test_table
identifier: source
"""

freshness_via_custom_sql_schema_yml = """version: 2
sources:
- name: test_source
freshness:
warn_after: {count: 10, period: hour}
schema: "{{ var(env_var('DBT_TEST_SCHEMA_NAME_VARIABLE')) }}"
quoting:
identifier: True
tags:
- my_test_source_tag
tables:
- name: source_a
identifier: source
loaded_at_field: "{{ var('test_loaded_at') | as_text }}"
- name: source_b
identifier: source
loaded_at_query: "select max({{ var('test_loaded_at') | as_text }}) from {{this}}"
- name: source_c
identifier: source
loaded_at_query: "select {{current_timestamp()}}"
"""
16 changes: 16 additions & 0 deletions tests/functional/sources/test_source_freshness.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
error_models_model_sql,
error_models_schema_yml,
filtered_models_schema_yml,
freshness_via_custom_sql_schema_yml,
freshness_via_metadata_schema_yml,
override_freshness_models_schema_yml,
)
Expand Down Expand Up @@ -578,3 +579,18 @@ def test_hooks_do_not_run_for_source_freshness(
)
# default behaviour - no hooks run in source freshness
self._assert_project_hooks_not_called(log_output)


class TestSourceFreshnessCustomSQL(SuccessfulSourceFreshnessTest):
@pytest.fixture(scope="class")
def models(self):
return {"schema.yml": freshness_via_custom_sql_schema_yml}

def test_source_freshness_custom_sql(self, project):
result = self.run_dbt_with_vars(project, ["source", "freshness"], expect_pass=True)
# They are the same source but different queries were executed for each
assert {r.node.name: r.status for r in result} == {
"source_a": "warn",
"source_b": "warn",
"source_c": "pass",
}
Loading

0 comments on commit f2222d2

Please sign in to comment.