Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add model freshness for adaptive job #11170

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changes/unreleased/Features-20241218-170729.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Features
body: Add freshness definition on model for adaptive job
time: 2024-12-18T17:07:29.55754-08:00
custom:
Author: ChenyuLInx
Issue: "11123"
7 changes: 6 additions & 1 deletion core/dbt/artifacts/resources/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,12 @@
MetricTimeWindow,
MetricTypeParams,
)
from dbt.artifacts.resources.v1.model import Model, ModelConfig, TimeSpine
from dbt.artifacts.resources.v1.model import (
Model,
ModelConfig,
ModelFreshness,
TimeSpine,
)
from dbt.artifacts.resources.v1.owner import Owner
from dbt.artifacts.resources.v1.saved_query import (
Export,
Expand Down
22 changes: 21 additions & 1 deletion core/dbt/artifacts/resources/v1/model.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
import enum
from dataclasses import dataclass, field
from datetime import datetime
from typing import Dict, List, Literal, Optional

from dbt.artifacts.resources.types import AccessType, NodeType
from dbt.artifacts.resources.types import AccessType, NodeType, TimePeriod
from dbt.artifacts.resources.v1.components import (
CompiledResource,
DeferRelation,
NodeVersion,
Time,
)
from dbt.artifacts.resources.v1.config import NodeConfig
from dbt_common.contracts.config.base import MergeBehavior
Expand Down Expand Up @@ -34,6 +36,23 @@ class TimeSpine(dbtClassMixin):
custom_granularities: List[CustomGranularity] = field(default_factory=list)


class ModelFreshnessDependsOnOptions(enum.Enum):
all = "all"
any = "any"


@dataclass
class ModelBuildAfter(Time):
depends_on: ModelFreshnessDependsOnOptions = ModelFreshnessDependsOnOptions.any
count: int = 0
period: TimePeriod = TimePeriod.hour


@dataclass
class ModelFreshness(dbtClassMixin):
build_after: ModelBuildAfter = field(default_factory=ModelBuildAfter)


@dataclass
class Model(CompiledResource):
resource_type: Literal[NodeType.Model]
Expand All @@ -46,6 +65,7 @@ class Model(CompiledResource):
defer_relation: Optional[DeferRelation] = None
primary_key: List[str] = field(default_factory=list)
time_spine: Optional[TimeSpine] = None
freshness: Optional[ModelFreshness] = None

def __post_serialize__(self, dct: Dict, context: Optional[Dict] = None):
dct = super().__post_serialize__(dct, context)
Expand Down
2 changes: 2 additions & 0 deletions core/dbt/contracts/graph/nodes.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
from dbt.artifacts.resources import SqlOperation as SqlOperationResource
from dbt.artifacts.resources import TimeSpine
from dbt.artifacts.resources import UnitTestDefinition as UnitTestDefinitionResource
from dbt.artifacts.resources.v1.model import ModelFreshness
from dbt.artifacts.schemas.batch_results import BatchResults
from dbt.clients.jinja_static import statically_extract_has_name_this
from dbt.contracts.graph.model_config import UnitTestNodeConfig
Expand Down Expand Up @@ -1702,6 +1703,7 @@ class ParsedNodePatch(ParsedPatch):
constraints: List[Dict[str, Any]]
deprecation_date: Optional[datetime]
time_spine: Optional[TimeSpine] = None
freshness: Optional[ModelFreshness] = None


@dataclass
Expand Down
2 changes: 2 additions & 0 deletions core/dbt/contracts/graph/unparsed.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
MacroArgument,
MaturityType,
MeasureAggregationParameters,
ModelFreshness,
NodeVersion,
Owner,
Quoting,
Expand Down Expand Up @@ -224,6 +225,7 @@ class UnparsedModelUpdate(UnparsedNodeUpdate):
versions: Sequence[UnparsedVersion] = field(default_factory=list)
deprecation_date: Optional[datetime.datetime] = None
time_spine: Optional[TimeSpine] = None
freshness: Optional[ModelFreshness] = None

def __post_init__(self) -> None:
if self.latest_version:
Expand Down
21 changes: 20 additions & 1 deletion core/dbt/parser/schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,12 @@
from typing import Any, Callable, Dict, Generic, Iterable, List, Optional, Type, TypeVar

from dbt.artifacts.resources import RefArgs
from dbt.artifacts.resources.v1.model import CustomGranularity, TimeSpine
from dbt.artifacts.resources.v1.model import (
CustomGranularity,
ModelBuildAfter,
ModelFreshness,
TimeSpine,
)
from dbt.clients.jinja_static import statically_parse_ref_or_source
from dbt.clients.yaml_helper import load_yaml_text
from dbt.config import RuntimeConfig
Expand Down Expand Up @@ -722,6 +727,7 @@ def parse_patch(self, block: TargetBlock[NodeTarget], refs: ParserRef) -> None:
# code consistency.
deprecation_date: Optional[datetime.datetime] = None
time_spine: Optional[TimeSpine] = None
freshness: Optional[ModelFreshness] = None
if isinstance(block.target, UnparsedModelUpdate):
deprecation_date = block.target.deprecation_date
time_spine = (
Expand All @@ -738,6 +744,17 @@ def parse_patch(self, block: TargetBlock[NodeTarget], refs: ParserRef) -> None:
if block.target.time_spine
else None
)
freshness = (
ModelFreshness(
build_after=ModelBuildAfter(
count=block.target.freshness.build_after.count,
period=block.target.freshness.build_after.period,
depends_on=block.target.freshness.build_after.depends_on,
),
)
if block.target.freshness
else None
)
patch = ParsedNodePatch(
name=block.target.name,
original_file_path=block.target.original_file_path,
Expand All @@ -754,6 +771,7 @@ def parse_patch(self, block: TargetBlock[NodeTarget], refs: ParserRef) -> None:
constraints=block.target.constraints,
deprecation_date=deprecation_date,
time_spine=time_spine,
freshness=freshness,
)
assert isinstance(self.yaml.file, SchemaSourceFile)
source_file: SchemaSourceFile = self.yaml.file
Expand Down Expand Up @@ -1043,6 +1061,7 @@ def patch_node_properties(self, node, patch: "ParsedNodePatch") -> None:
# These two will have to be reapplied after config is built for versioned models
self.patch_constraints(node, patch.constraints)
self.patch_time_spine(node, patch.time_spine)
node.freshness = patch.freshness
node.build_contract_checksum()

def patch_constraints(self, node: ModelNode, constraints: List[Dict[str, Any]]) -> None:
Expand Down
82 changes: 82 additions & 0 deletions schemas/dbt/manifest/v12.json
Original file line number Diff line number Diff line change
Expand Up @@ -4777,6 +4777,47 @@
}
],
"default": null
},
"freshness": {
"anyOf": [
{
"type": "object",
"title": "ModelFreshness",
"properties": {
"build_after": {
"type": "object",
"title": "ModelBuildAfter",
"properties": {
"count": {
"type": "integer",
"default": 0
},
"period": {
"enum": [
"minute",
"hour",
"day"
],
"default": "hour"
},
"depends_on": {
"enum": [
"all",
"any"
],
"default": "any"
}
},
"additionalProperties": false
}
},
"additionalProperties": false
},
{
"type": "null"
}
],
"default": null
}
},
"additionalProperties": false,
Expand Down Expand Up @@ -14585,6 +14626,47 @@
}
],
"default": null
},
"freshness": {
"anyOf": [
{
"type": "object",
"title": "ModelFreshness",
"properties": {
"build_after": {
"type": "object",
"title": "ModelBuildAfter",
"properties": {
"count": {
"type": "integer",
"default": 0
},
"period": {
"enum": [
"minute",
"hour",
"day"
],
"default": "hour"
},
"depends_on": {
"enum": [
"all",
"any"
],
"default": "any"
}
},
"additionalProperties": false
}
},
"additionalProperties": false
},
{
"type": "null"
}
],
"default": null
}
},
"additionalProperties": false,
Expand Down
8 changes: 8 additions & 0 deletions tests/functional/artifacts/expected_manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,7 @@ def expected_seeded_manifest(project, model_database=None, quote_model=False):
"version": None,
"latest_version": None,
"time_spine": None,
"freshness": None,
},
"model.test.second_model": {
"compiled_path": os.path.join(compiled_model_path, "second_model.sql"),
Expand Down Expand Up @@ -471,6 +472,7 @@ def expected_seeded_manifest(project, model_database=None, quote_model=False):
"version": None,
"latest_version": None,
"time_spine": None,
"freshness": None,
},
"seed.test.seed": {
"build_path": None,
Expand Down Expand Up @@ -1003,6 +1005,7 @@ def expected_references_manifest(project):
"latest_version": None,
"constraints": [],
"time_spine": None,
"freshness": None,
},
"model.test.ephemeral_summary": {
"alias": "ephemeral_summary",
Expand Down Expand Up @@ -1075,6 +1078,7 @@ def expected_references_manifest(project):
"latest_version": None,
"constraints": [],
"time_spine": None,
"freshness": None,
},
"model.test.view_summary": {
"alias": "view_summary",
Expand Down Expand Up @@ -1143,6 +1147,7 @@ def expected_references_manifest(project):
"latest_version": None,
"constraints": [],
"time_spine": None,
"freshness": None,
},
"seed.test.seed": {
"alias": "seed",
Expand Down Expand Up @@ -1609,6 +1614,7 @@ def expected_versions_manifest(project):
"version": 1,
"latest_version": 2,
"time_spine": None,
"freshness": None,
},
"model.test.versioned_model.v2": {
"alias": "versioned_model_v2",
Expand Down Expand Up @@ -1680,6 +1686,7 @@ def expected_versions_manifest(project):
"version": 2,
"latest_version": 2,
"time_spine": None,
"freshness": None,
},
"model.test.ref_versioned_model": {
"alias": "ref_versioned_model",
Expand Down Expand Up @@ -1738,6 +1745,7 @@ def expected_versions_manifest(project):
"version": None,
"latest_version": None,
"time_spine": None,
"freshness": None,
},
"test.test.unique_versioned_model_v1_first_name.6138195dec": {
"alias": "unique_versioned_model_v1_first_name",
Expand Down
1 change: 1 addition & 0 deletions tests/unit/contracts/graph/test_manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@
"defer_relation",
"time_spine",
"batch",
"freshness",
}
)

Expand Down
46 changes: 46 additions & 0 deletions tests/unit/parser/test_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@

from dbt import tracking
from dbt.artifacts.resources import ModelConfig, RefArgs
from dbt.artifacts.resources.v1.model import (
ModelBuildAfter,
ModelFreshnessDependsOnOptions,
)
from dbt.context.context_config import ContextConfig
from dbt.contracts.files import FileHash, FilePath, SchemaSourceFile, SourceFile
from dbt.contracts.graph.manifest import Manifest
Expand Down Expand Up @@ -301,6 +305,23 @@ def assertEqualNodes(node_one, node_two):
arg: 100
"""

SINGLE_TALBE_MODEL_FRESHNESS = """
models:
- name: my_model
description: A description of my model
freshness:
build_after: {count: 1, period: day}
"""

SINGLE_TALBE_MODEL_FRESHNESS_ONLY_DEPEND_ON = """
models:
- name: my_model
description: A description of my model
freshness:
build_after:
depends_on: all
"""


MULTIPLE_TABLE_VERSIONED_MODEL_TESTS = """
models:
Expand Down Expand Up @@ -701,6 +722,31 @@ def test__read_basic_model_tests(self):
self.assertEqual(len(list(self.parser.manifest.sources)), 0)
self.assertEqual(len(list(self.parser.manifest.nodes)), 4)

def test__parse_model_freshness(self):
block = self.file_block_for(SINGLE_TALBE_MODEL_FRESHNESS, "test_one.yml")
self.parser.manifest.files[block.file.file_id] = block.file
dct = yaml_from_file(block.file)
self.parser.parse_file(block, dct)
self.assert_has_manifest_lengths(self.parser.manifest, nodes=1)

assert self.parser.manifest.nodes[
"model.root.my_model"
].freshness.build_after == ModelBuildAfter(
count=1, period="day", depends_on=ModelFreshnessDependsOnOptions.any
)

def test__parse_model_freshness_depend_on(self):
block = self.file_block_for(SINGLE_TALBE_MODEL_FRESHNESS_ONLY_DEPEND_ON, "test_one.yml")
self.parser.manifest.files[block.file.file_id] = block.file
dct = yaml_from_file(block.file)
self.parser.parse_file(block, dct)
self.assert_has_manifest_lengths(self.parser.manifest, nodes=1)
assert self.parser.manifest.nodes[
"model.root.my_model"
].freshness.build_after == ModelBuildAfter(
count=0, period="hour", depends_on=ModelFreshnessDependsOnOptions.all
)

def test__read_basic_model_tests_wrong_severity(self):
block = self.yaml_block_for(SINGLE_TABLE_MODEL_TESTS_WRONG_SEVERITY, "test_one.yml")
dct = yaml_from_file(block.file)
Expand Down
Loading