From b6b56a9ada46e4d5a26eaa97fb5762b07c920bcf Mon Sep 17 00:00:00 2001 From: Chenyu Li Date: Wed, 18 Dec 2024 17:06:16 -0800 Subject: [PATCH] add model freshness for adaptive job --- core/dbt/artifacts/resources/__init__.py | 7 +- core/dbt/artifacts/resources/v1/model.py | 16 +++- core/dbt/contracts/graph/nodes.py | 2 + core/dbt/contracts/graph/unparsed.py | 2 + core/dbt/parser/schemas.py | 21 ++++- schemas/dbt/manifest/v12.json | 110 +++++++++++++++++++++++ tests/unit/parser/test_parser.py | 39 ++++++++ 7 files changed, 194 insertions(+), 3 deletions(-) diff --git a/core/dbt/artifacts/resources/__init__.py b/core/dbt/artifacts/resources/__init__.py index 3435c386daf..891d6cbba2f 100644 --- a/core/dbt/artifacts/resources/__init__.py +++ b/core/dbt/artifacts/resources/__init__.py @@ -46,7 +46,12 @@ MetricTimeWindow, MetricTypeParams, ) -from dbt.artifacts.resources.v1.model import Model, ModelConfig, TimeSpine +from dbt.artifacts.resources.v1.model import ( + Model, + ModelConfig, + ModelFreshness, + TimeSpine, +) from dbt.artifacts.resources.v1.owner import Owner from dbt.artifacts.resources.v1.saved_query import ( Export, diff --git a/core/dbt/artifacts/resources/v1/model.py b/core/dbt/artifacts/resources/v1/model.py index 9c43970f488..4e7db0d7b60 100644 --- a/core/dbt/artifacts/resources/v1/model.py +++ b/core/dbt/artifacts/resources/v1/model.py @@ -1,12 +1,14 @@ +import enum from dataclasses import dataclass, field from datetime import datetime from typing import Dict, List, Literal, Optional -from dbt.artifacts.resources.types import AccessType, NodeType +from dbt.artifacts.resources.types import AccessType, NodeType, TimePeriod from dbt.artifacts.resources.v1.components import ( CompiledResource, DeferRelation, NodeVersion, + Time, ) from dbt.artifacts.resources.v1.config import NodeConfig from dbt_common.contracts.config.base import MergeBehavior @@ -34,6 +36,17 @@ class TimeSpine(dbtClassMixin): custom_granularities: List[CustomGranularity] = field(default_factory=list) +class ModelFreshnessDependsOnOptions(enum.StrEnum): + all = "all" + any = "any" + + +@dataclass +class ModelFreshness(dbtClassMixin): + depends_on: ModelFreshnessDependsOnOptions = ModelFreshnessDependsOnOptions.any + build_after: Time = field(default_factory=lambda: Time(period=TimePeriod.hour, count=0)) + + @dataclass class Model(CompiledResource): resource_type: Literal[NodeType.Model] @@ -46,6 +59,7 @@ class Model(CompiledResource): defer_relation: Optional[DeferRelation] = None primary_key: List[str] = field(default_factory=list) time_spine: Optional[TimeSpine] = None + freshness: Optional[ModelFreshness] = None def __post_serialize__(self, dct: Dict, context: Optional[Dict] = None): dct = super().__post_serialize__(dct, context) diff --git a/core/dbt/contracts/graph/nodes.py b/core/dbt/contracts/graph/nodes.py index 4bb70db5d9c..e39a3ab2fc3 100644 --- a/core/dbt/contracts/graph/nodes.py +++ b/core/dbt/contracts/graph/nodes.py @@ -60,6 +60,7 @@ from dbt.artifacts.resources import SqlOperation as SqlOperationResource from dbt.artifacts.resources import TimeSpine from dbt.artifacts.resources import UnitTestDefinition as UnitTestDefinitionResource +from dbt.artifacts.resources.v1.model import ModelFreshness from dbt.artifacts.schemas.batch_results import BatchResults from dbt.clients.jinja_static import statically_extract_has_name_this from dbt.contracts.graph.model_config import UnitTestNodeConfig @@ -1698,6 +1699,7 @@ class ParsedNodePatch(ParsedPatch): constraints: List[Dict[str, Any]] deprecation_date: Optional[datetime] time_spine: Optional[TimeSpine] = None + freshness: Optional[ModelFreshness] = None @dataclass diff --git a/core/dbt/contracts/graph/unparsed.py b/core/dbt/contracts/graph/unparsed.py index f78ba15a50f..92e00710c57 100644 --- a/core/dbt/contracts/graph/unparsed.py +++ b/core/dbt/contracts/graph/unparsed.py @@ -18,6 +18,7 @@ MacroArgument, MaturityType, MeasureAggregationParameters, + ModelFreshness, NodeVersion, Owner, Quoting, @@ -221,6 +222,7 @@ class UnparsedModelUpdate(UnparsedNodeUpdate): versions: Sequence[UnparsedVersion] = field(default_factory=list) deprecation_date: Optional[datetime.datetime] = None time_spine: Optional[TimeSpine] = None + freshness: Optional[ModelFreshness] = None def __post_init__(self) -> None: if self.latest_version: diff --git a/core/dbt/parser/schemas.py b/core/dbt/parser/schemas.py index e9c66e184e4..710994ab944 100644 --- a/core/dbt/parser/schemas.py +++ b/core/dbt/parser/schemas.py @@ -6,7 +6,12 @@ from typing import Any, Callable, Dict, Generic, Iterable, List, Optional, Type, TypeVar from dbt.artifacts.resources import RefArgs -from dbt.artifacts.resources.v1.model import CustomGranularity, TimeSpine +from dbt.artifacts.resources.v1.components import Time +from dbt.artifacts.resources.v1.model import ( + CustomGranularity, + ModelFreshness, + TimeSpine, +) from dbt.clients.jinja_static import statically_parse_ref_or_source from dbt.clients.yaml_helper import load_yaml_text from dbt.config import RuntimeConfig @@ -722,6 +727,7 @@ def parse_patch(self, block: TargetBlock[NodeTarget], refs: ParserRef) -> None: # code consistency. deprecation_date: Optional[datetime.datetime] = None time_spine: Optional[TimeSpine] = None + freshness: Optional[ModelFreshness] = None if isinstance(block.target, UnparsedModelUpdate): deprecation_date = block.target.deprecation_date time_spine = ( @@ -738,6 +744,17 @@ def parse_patch(self, block: TargetBlock[NodeTarget], refs: ParserRef) -> None: if block.target.time_spine else None ) + freshness = ( + ModelFreshness( + depends_on=block.target.freshness.depends_on, + build_after=Time( + count=block.target.freshness.build_after.count, + period=block.target.freshness.build_after.period, + ), + ) + if block.target.freshness + else None + ) patch = ParsedNodePatch( name=block.target.name, original_file_path=block.target.original_file_path, @@ -754,6 +771,7 @@ def parse_patch(self, block: TargetBlock[NodeTarget], refs: ParserRef) -> None: constraints=block.target.constraints, deprecation_date=deprecation_date, time_spine=time_spine, + freshness=freshness, ) assert isinstance(self.yaml.file, SchemaSourceFile) source_file: SchemaSourceFile = self.yaml.file @@ -1043,6 +1061,7 @@ def patch_node_properties(self, node, patch: "ParsedNodePatch") -> None: # These two will have to be reapplied after config is built for versioned models self.patch_constraints(node, patch.constraints) self.patch_time_spine(node, patch.time_spine) + node.freshness = patch.freshness node.build_contract_checksum() def patch_constraints(self, node: ModelNode, constraints: List[Dict[str, Any]]) -> None: diff --git a/schemas/dbt/manifest/v12.json b/schemas/dbt/manifest/v12.json index d0466e4852c..ec2a4c0cc22 100644 --- a/schemas/dbt/manifest/v12.json +++ b/schemas/dbt/manifest/v12.json @@ -4777,6 +4777,61 @@ } ], "default": null + }, + "freshness": { + "anyOf": [ + { + "type": "object", + "title": "ModelFreshness", + "properties": { + "depends_on": { + "enum": [ + "all", + "any" + ], + "default": "any" + }, + "build_after": { + "type": "object", + "title": "Time", + "properties": { + "count": { + "anyOf": [ + { + "type": "integer" + }, + { + "type": "null" + } + ], + "default": null + }, + "period": { + "anyOf": [ + { + "enum": [ + "minute", + "hour", + "day" + ] + }, + { + "type": "null" + } + ], + "default": null + } + }, + "additionalProperties": false + } + }, + "additionalProperties": false + }, + { + "type": "null" + } + ], + "default": null } }, "additionalProperties": false, @@ -14574,6 +14629,61 @@ } ], "default": null + }, + "freshness": { + "anyOf": [ + { + "type": "object", + "title": "ModelFreshness", + "properties": { + "depends_on": { + "enum": [ + "all", + "any" + ], + "default": "any" + }, + "build_after": { + "type": "object", + "title": "Time", + "properties": { + "count": { + "anyOf": [ + { + "type": "integer" + }, + { + "type": "null" + } + ], + "default": null + }, + "period": { + "anyOf": [ + { + "enum": [ + "minute", + "hour", + "day" + ] + }, + { + "type": "null" + } + ], + "default": null + } + }, + "additionalProperties": false + } + }, + "additionalProperties": false + }, + { + "type": "null" + } + ], + "default": null } }, "additionalProperties": false, diff --git a/tests/unit/parser/test_parser.py b/tests/unit/parser/test_parser.py index 8894e47ce84..c969b4c7e06 100644 --- a/tests/unit/parser/test_parser.py +++ b/tests/unit/parser/test_parser.py @@ -8,6 +8,7 @@ from dbt import tracking from dbt.artifacts.resources import ModelConfig, RefArgs +from dbt.artifacts.resources.v1.components import Time from dbt.context.context_config import ContextConfig from dbt.contracts.files import FileHash, FilePath, SchemaSourceFile, SourceFile from dbt.contracts.graph.manifest import Manifest @@ -301,6 +302,22 @@ def assertEqualNodes(node_one, node_two): arg: 100 """ +SINGLE_TALBE_MODEL_FRESHNESS = """ +models: + - name: my_model + description: A description of my model + freshness: + build_after: {count: 1, period: day} +""" + +SINGLE_TALBE_MODEL_FRESHNESS_ONLY_DEPEND_ON = """ +models: + - name: my_model + description: A description of my model + freshness: + depends_on: all +""" + MULTIPLE_TABLE_VERSIONED_MODEL_TESTS = """ models: @@ -607,6 +624,28 @@ def test__read_basic_model_tests(self): self.assertEqual(len(list(self.parser.manifest.sources)), 0) self.assertEqual(len(list(self.parser.manifest.nodes)), 4) + def test__parse_model_freshness(self): + block = self.file_block_for(SINGLE_TALBE_MODEL_FRESHNESS, "test_one.yml") + self.parser.manifest.files[block.file.file_id] = block.file + dct = yaml_from_file(block.file) + self.parser.parse_file(block, dct) + self.assert_has_manifest_lengths(self.parser.manifest, nodes=1) + assert self.parser.manifest.nodes["model.root.my_model"].freshness.depends_on == "any" + assert self.parser.manifest.nodes["model.root.my_model"].freshness.build_after == Time( + count=1, period="day" + ) + + def test__parse_model_freshness_depend_on(self): + block = self.file_block_for(SINGLE_TALBE_MODEL_FRESHNESS_ONLY_DEPEND_ON, "test_one.yml") + self.parser.manifest.files[block.file.file_id] = block.file + dct = yaml_from_file(block.file) + self.parser.parse_file(block, dct) + self.assert_has_manifest_lengths(self.parser.manifest, nodes=1) + assert self.parser.manifest.nodes["model.root.my_model"].freshness.depends_on == "all" + assert self.parser.manifest.nodes["model.root.my_model"].freshness.build_after == Time( + count=0, period="hour" + ) + def test__read_basic_model_tests_wrong_severity(self): block = self.yaml_block_for(SINGLE_TABLE_MODEL_TESTS_WRONG_SEVERITY, "test_one.yml") dct = yaml_from_file(block.file)