From 7a8be3409063412df57609bbdb5484e222da5bc3 Mon Sep 17 00:00:00 2001 From: Chenyu Li Date: Thu, 19 Dec 2024 12:39:39 -0800 Subject: [PATCH] add model freshness for adaptive job --- .../unreleased/Features-20241218-170729.yaml | 6 ++ core/dbt/artifacts/resources/__init__.py | 7 +- core/dbt/artifacts/resources/v1/model.py | 24 +++++- core/dbt/contracts/graph/nodes.py | 2 + core/dbt/contracts/graph/unparsed.py | 2 + core/dbt/parser/schemas.py | 21 ++++- schemas/dbt/manifest/v12.json | 82 +++++++++++++++++++ tests/unit/parser/test_parser.py | 39 +++++++++ 8 files changed, 180 insertions(+), 3 deletions(-) create mode 100644 .changes/unreleased/Features-20241218-170729.yaml diff --git a/.changes/unreleased/Features-20241218-170729.yaml b/.changes/unreleased/Features-20241218-170729.yaml new file mode 100644 index 00000000000..61149cbc1fd --- /dev/null +++ b/.changes/unreleased/Features-20241218-170729.yaml @@ -0,0 +1,6 @@ +kind: Features +body: Add freshness definition on model for adaptive job +time: 2024-12-18T17:07:29.55754-08:00 +custom: + Author: ChenyuLInx + Issue: "11123" diff --git a/core/dbt/artifacts/resources/__init__.py b/core/dbt/artifacts/resources/__init__.py index 3435c386daf..891d6cbba2f 100644 --- a/core/dbt/artifacts/resources/__init__.py +++ b/core/dbt/artifacts/resources/__init__.py @@ -46,7 +46,12 @@ MetricTimeWindow, MetricTypeParams, ) -from dbt.artifacts.resources.v1.model import Model, ModelConfig, TimeSpine +from dbt.artifacts.resources.v1.model import ( + Model, + ModelConfig, + ModelFreshness, + TimeSpine, +) from dbt.artifacts.resources.v1.owner import Owner from dbt.artifacts.resources.v1.saved_query import ( Export, diff --git a/core/dbt/artifacts/resources/v1/model.py b/core/dbt/artifacts/resources/v1/model.py index 9c43970f488..dbc7c4d0de3 100644 --- a/core/dbt/artifacts/resources/v1/model.py +++ b/core/dbt/artifacts/resources/v1/model.py @@ -1,12 +1,14 @@ +import enum from dataclasses import dataclass, field from datetime import datetime from typing import Dict, List, Literal, Optional -from dbt.artifacts.resources.types import AccessType, NodeType +from dbt.artifacts.resources.types import AccessType, NodeType, TimePeriod from dbt.artifacts.resources.v1.components import ( CompiledResource, DeferRelation, NodeVersion, + Time, ) from dbt.artifacts.resources.v1.config import NodeConfig from dbt_common.contracts.config.base import MergeBehavior @@ -34,6 +36,25 @@ class TimeSpine(dbtClassMixin): custom_granularities: List[CustomGranularity] = field(default_factory=list) +class ModelFreshnessDependsOnOptions(enum.StrEnum): + all = "all" + any = "any" + + +@dataclass +class ModelBuildAfter(Time): + depends_on: ModelFreshnessDependsOnOptions = ModelFreshnessDependsOnOptions.any + count: int = 0 + period: TimePeriod = TimePeriod.hour + + +@dataclass +class ModelFreshness(dbtClassMixin): + build_after: ModelBuildAfter = field( + default_factory=lambda: ModelBuildAfter(period=TimePeriod.hour, count=0) + ) + + @dataclass class Model(CompiledResource): resource_type: Literal[NodeType.Model] @@ -46,6 +67,7 @@ class Model(CompiledResource): defer_relation: Optional[DeferRelation] = None primary_key: List[str] = field(default_factory=list) time_spine: Optional[TimeSpine] = None + freshness: Optional[ModelFreshness] = None def __post_serialize__(self, dct: Dict, context: Optional[Dict] = None): dct = super().__post_serialize__(dct, context) diff --git a/core/dbt/contracts/graph/nodes.py b/core/dbt/contracts/graph/nodes.py index f753a6afff1..13b98f65e03 100644 --- a/core/dbt/contracts/graph/nodes.py +++ b/core/dbt/contracts/graph/nodes.py @@ -60,6 +60,7 @@ from dbt.artifacts.resources import SqlOperation as SqlOperationResource from dbt.artifacts.resources import TimeSpine from dbt.artifacts.resources import UnitTestDefinition as UnitTestDefinitionResource +from dbt.artifacts.resources.v1.model import ModelFreshness from dbt.artifacts.schemas.batch_results import BatchResults from dbt.clients.jinja_static import statically_extract_has_name_this from dbt.contracts.graph.model_config import UnitTestNodeConfig @@ -1702,6 +1703,7 @@ class ParsedNodePatch(ParsedPatch): constraints: List[Dict[str, Any]] deprecation_date: Optional[datetime] time_spine: Optional[TimeSpine] = None + freshness: Optional[ModelFreshness] = None @dataclass diff --git a/core/dbt/contracts/graph/unparsed.py b/core/dbt/contracts/graph/unparsed.py index ab17ced5db9..d6b66753a34 100644 --- a/core/dbt/contracts/graph/unparsed.py +++ b/core/dbt/contracts/graph/unparsed.py @@ -18,6 +18,7 @@ MacroArgument, MaturityType, MeasureAggregationParameters, + ModelFreshness, NodeVersion, Owner, Quoting, @@ -224,6 +225,7 @@ class UnparsedModelUpdate(UnparsedNodeUpdate): versions: Sequence[UnparsedVersion] = field(default_factory=list) deprecation_date: Optional[datetime.datetime] = None time_spine: Optional[TimeSpine] = None + freshness: Optional[ModelFreshness] = None def __post_init__(self) -> None: if self.latest_version: diff --git a/core/dbt/parser/schemas.py b/core/dbt/parser/schemas.py index e9c66e184e4..cdcba1303c3 100644 --- a/core/dbt/parser/schemas.py +++ b/core/dbt/parser/schemas.py @@ -6,7 +6,12 @@ from typing import Any, Callable, Dict, Generic, Iterable, List, Optional, Type, TypeVar from dbt.artifacts.resources import RefArgs -from dbt.artifacts.resources.v1.model import CustomGranularity, TimeSpine +from dbt.artifacts.resources.v1.model import ( + CustomGranularity, + ModelBuildAfter, + ModelFreshness, + TimeSpine, +) from dbt.clients.jinja_static import statically_parse_ref_or_source from dbt.clients.yaml_helper import load_yaml_text from dbt.config import RuntimeConfig @@ -722,6 +727,7 @@ def parse_patch(self, block: TargetBlock[NodeTarget], refs: ParserRef) -> None: # code consistency. deprecation_date: Optional[datetime.datetime] = None time_spine: Optional[TimeSpine] = None + freshness: Optional[ModelFreshness] = None if isinstance(block.target, UnparsedModelUpdate): deprecation_date = block.target.deprecation_date time_spine = ( @@ -738,6 +744,17 @@ def parse_patch(self, block: TargetBlock[NodeTarget], refs: ParserRef) -> None: if block.target.time_spine else None ) + freshness = ( + ModelFreshness( + build_after=ModelBuildAfter( + count=block.target.freshness.build_after.count, + period=block.target.freshness.build_after.period, + depends_on=block.target.freshness.build_after.depends_on, + ), + ) + if block.target.freshness + else None + ) patch = ParsedNodePatch( name=block.target.name, original_file_path=block.target.original_file_path, @@ -754,6 +771,7 @@ def parse_patch(self, block: TargetBlock[NodeTarget], refs: ParserRef) -> None: constraints=block.target.constraints, deprecation_date=deprecation_date, time_spine=time_spine, + freshness=freshness, ) assert isinstance(self.yaml.file, SchemaSourceFile) source_file: SchemaSourceFile = self.yaml.file @@ -1043,6 +1061,7 @@ def patch_node_properties(self, node, patch: "ParsedNodePatch") -> None: # These two will have to be reapplied after config is built for versioned models self.patch_constraints(node, patch.constraints) self.patch_time_spine(node, patch.time_spine) + node.freshness = patch.freshness node.build_contract_checksum() def patch_constraints(self, node: ModelNode, constraints: List[Dict[str, Any]]) -> None: diff --git a/schemas/dbt/manifest/v12.json b/schemas/dbt/manifest/v12.json index 1056bb12bd3..9c6413db316 100644 --- a/schemas/dbt/manifest/v12.json +++ b/schemas/dbt/manifest/v12.json @@ -4777,6 +4777,47 @@ } ], "default": null + }, + "freshness": { + "anyOf": [ + { + "type": "object", + "title": "ModelFreshness", + "properties": { + "build_after": { + "type": "object", + "title": "ModelBuildAfter", + "properties": { + "count": { + "type": "integer", + "default": 0 + }, + "period": { + "enum": [ + "minute", + "hour", + "day" + ], + "default": "hour" + }, + "depends_on": { + "enum": [ + "all", + "any" + ], + "default": "any" + } + }, + "additionalProperties": false + } + }, + "additionalProperties": false + }, + { + "type": "null" + } + ], + "default": null } }, "additionalProperties": false, @@ -14585,6 +14626,47 @@ } ], "default": null + }, + "freshness": { + "anyOf": [ + { + "type": "object", + "title": "ModelFreshness", + "properties": { + "build_after": { + "type": "object", + "title": "ModelBuildAfter", + "properties": { + "count": { + "type": "integer", + "default": 0 + }, + "period": { + "enum": [ + "minute", + "hour", + "day" + ], + "default": "hour" + }, + "depends_on": { + "enum": [ + "all", + "any" + ], + "default": "any" + } + }, + "additionalProperties": false + } + }, + "additionalProperties": false + }, + { + "type": "null" + } + ], + "default": null } }, "additionalProperties": false, diff --git a/tests/unit/parser/test_parser.py b/tests/unit/parser/test_parser.py index 59d64f679fc..2921d19af0b 100644 --- a/tests/unit/parser/test_parser.py +++ b/tests/unit/parser/test_parser.py @@ -8,6 +8,7 @@ from dbt import tracking from dbt.artifacts.resources import ModelConfig, RefArgs +from dbt.artifacts.resources.v1.model import ModelBuildAfter from dbt.context.context_config import ContextConfig from dbt.contracts.files import FileHash, FilePath, SchemaSourceFile, SourceFile from dbt.contracts.graph.manifest import Manifest @@ -301,6 +302,23 @@ def assertEqualNodes(node_one, node_two): arg: 100 """ +SINGLE_TALBE_MODEL_FRESHNESS = """ +models: + - name: my_model + description: A description of my model + freshness: + build_after: {count: 1, period: day} +""" + +SINGLE_TALBE_MODEL_FRESHNESS_ONLY_DEPEND_ON = """ +models: + - name: my_model + description: A description of my model + freshness: + build_after: + depends_on: all +""" + MULTIPLE_TABLE_VERSIONED_MODEL_TESTS = """ models: @@ -701,6 +719,27 @@ def test__read_basic_model_tests(self): self.assertEqual(len(list(self.parser.manifest.sources)), 0) self.assertEqual(len(list(self.parser.manifest.nodes)), 4) + def test__parse_model_freshness(self): + block = self.file_block_for(SINGLE_TALBE_MODEL_FRESHNESS, "test_one.yml") + self.parser.manifest.files[block.file.file_id] = block.file + dct = yaml_from_file(block.file) + self.parser.parse_file(block, dct) + self.assert_has_manifest_lengths(self.parser.manifest, nodes=1) + + assert self.parser.manifest.nodes[ + "model.root.my_model" + ].freshness.build_after == ModelBuildAfter(count=1, period="day", depends_on="any") + + def test__parse_model_freshness_depend_on(self): + block = self.file_block_for(SINGLE_TALBE_MODEL_FRESHNESS_ONLY_DEPEND_ON, "test_one.yml") + self.parser.manifest.files[block.file.file_id] = block.file + dct = yaml_from_file(block.file) + self.parser.parse_file(block, dct) + self.assert_has_manifest_lengths(self.parser.manifest, nodes=1) + assert self.parser.manifest.nodes[ + "model.root.my_model" + ].freshness.build_after == ModelBuildAfter(count=0, period="hour", depends_on="all") + def test__read_basic_model_tests_wrong_severity(self): block = self.yaml_block_for(SINGLE_TABLE_MODEL_TESTS_WRONG_SEVERITY, "test_one.yml") dct = yaml_from_file(block.file)