Skip to content

Commit

Permalink
Support time spine configs for sub-daily granularity (#10483)
Browse files Browse the repository at this point in the history
  • Loading branch information
courtneyholcomb authored Jul 29, 2024
1 parent c598741 commit 0a160fc
Show file tree
Hide file tree
Showing 22 changed files with 1,357 additions and 31 deletions.
6 changes: 6 additions & 0 deletions .changes/unreleased/Features-20240722-202238.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Features
body: Support new semantic layer time spine configs to enable sub-daily granularity.
time: 2024-07-22T20:22:38.258249-07:00
custom:
Author: courtneyholcomb
Issue: "10475"
4 changes: 4 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -144,3 +144,7 @@ help: ## Show this help message.
@echo
@echo 'options:'
@echo 'use USE_DOCKER=true to run target in a docker container'

.PHONY: json_schema
json_schema: ## Update generated JSON schema using code changes.
scripts/collect-artifact-schema.py --path schemas
2 changes: 1 addition & 1 deletion core/dbt/artifacts/resources/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
MetricTimeWindow,
MetricTypeParams,
)
from dbt.artifacts.resources.v1.model import Model, ModelConfig
from dbt.artifacts.resources.v1.model import Model, ModelConfig, TimeSpine
from dbt.artifacts.resources.v1.owner import Owner
from dbt.artifacts.resources.v1.saved_query import (
Export,
Expand Down
2 changes: 2 additions & 0 deletions core/dbt/artifacts/resources/v1/components.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from dbt_common.contracts.constraints import ColumnLevelConstraint
from dbt_common.contracts.util import Mergeable
from dbt_common.dataclass_schema import ExtensibleDbtClassMixin, dbtClassMixin
from dbt_semantic_interfaces.type_enums import TimeGranularity

NodeVersion = Union[str, float]

Expand Down Expand Up @@ -66,6 +67,7 @@ class ColumnInfo(AdditionalPropertiesMixin, ExtensibleDbtClassMixin):
quote: Optional[bool] = None
tags: List[str] = field(default_factory=list)
_extra: Dict[str, Any] = field(default_factory=dict)
granularity: Optional[TimeGranularity] = None


@dataclass
Expand Down
7 changes: 7 additions & 0 deletions core/dbt/artifacts/resources/v1/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from dbt.artifacts.resources.v1.config import NodeConfig
from dbt_common.contracts.config.base import MergeBehavior
from dbt_common.contracts.constraints import ModelLevelConstraint
from dbt_common.dataclass_schema import dbtClassMixin


@dataclass
Expand All @@ -21,6 +22,11 @@ class ModelConfig(NodeConfig):
)


@dataclass
class TimeSpine(dbtClassMixin):
standard_granularity_column: str


@dataclass
class Model(CompiledResource):
resource_type: Literal[NodeType.Model]
Expand All @@ -32,6 +38,7 @@ class Model(CompiledResource):
deprecation_date: Optional[datetime] = None
defer_relation: Optional[DeferRelation] = None
primary_key: List[str] = field(default_factory=list)
time_spine: Optional[TimeSpine] = None

def __post_serialize__(self, dct: Dict, context: Optional[Dict] = None):
dct = super().__post_serialize__(dct, context)
Expand Down
6 changes: 5 additions & 1 deletion core/dbt/constants.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from dbt_semantic_interfaces.type_enums import TimeGranularity

DEFAULT_ENV_PLACEHOLDER = "DBT_DEFAULT_PLACEHOLDER"

SECRET_PLACEHOLDER = "$$$DBT_SECRET_START$$${}$$$DBT_SECRET_END$$$"
Expand All @@ -15,6 +17,8 @@
PACKAGE_LOCK_FILE_NAME = "package-lock.yml"
MANIFEST_FILE_NAME = "manifest.json"
SEMANTIC_MANIFEST_FILE_NAME = "semantic_manifest.json"
TIME_SPINE_MODEL_NAME = "metricflow_time_spine"
LEGACY_TIME_SPINE_MODEL_NAME = "metricflow_time_spine"
LEGACY_TIME_SPINE_GRANULARITY = TimeGranularity.DAY
MINIMUM_REQUIRED_TIME_SPINE_GRANULARITY = TimeGranularity.DAY
PARTIAL_PARSE_FILE_NAME = "partial_parse.msgpack"
PACKAGE_LOCK_HASH_KEY = "sha1_hash"
2 changes: 2 additions & 0 deletions core/dbt/contracts/graph/nodes.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
from dbt.artifacts.resources import Snapshot as SnapshotResource
from dbt.artifacts.resources import SourceDefinition as SourceDefinitionResource
from dbt.artifacts.resources import SqlOperation as SqlOperationResource
from dbt.artifacts.resources import TimeSpine
from dbt.artifacts.resources import UnitTestDefinition as UnitTestDefinitionResource
from dbt.contracts.graph.model_config import UnitTestNodeConfig
from dbt.contracts.graph.node_args import ModelNodeArgs
Expand Down Expand Up @@ -1625,6 +1626,7 @@ class ParsedNodePatch(ParsedPatch):
latest_version: Optional[NodeVersion]
constraints: List[Dict[str, Any]]
deprecation_date: Optional[datetime]
time_spine: Optional[TimeSpine] = None


@dataclass
Expand Down
110 changes: 90 additions & 20 deletions core/dbt/contracts/graph/semantic_manifest.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,19 @@
from dbt.constants import TIME_SPINE_MODEL_NAME
from typing import List, Optional

from dbt.constants import (
LEGACY_TIME_SPINE_GRANULARITY,
LEGACY_TIME_SPINE_MODEL_NAME,
MINIMUM_REQUIRED_TIME_SPINE_GRANULARITY,
)
from dbt.contracts.graph.manifest import Manifest
from dbt.contracts.graph.nodes import ModelNode
from dbt.events.types import SemanticValidationFailure
from dbt.exceptions import ParsingError
from dbt_common.clients.system import write_file
from dbt_common.events.base_types import EventLevel
from dbt_common.events.functions import fire_event
from dbt_semantic_interfaces.implementations.metric import PydanticMetric
from dbt_semantic_interfaces.implementations.node_relation import PydanticNodeRelation
from dbt_semantic_interfaces.implementations.project_configuration import (
PydanticProjectConfiguration,
)
Expand All @@ -13,8 +22,12 @@
PydanticSemanticManifest,
)
from dbt_semantic_interfaces.implementations.semantic_model import PydanticSemanticModel
from dbt_semantic_interfaces.implementations.time_spine import (
PydanticTimeSpine,
PydanticTimeSpinePrimaryColumn,
)
from dbt_semantic_interfaces.implementations.time_spine_table_configuration import (
PydanticTimeSpineTableConfiguration,
PydanticTimeSpineTableConfiguration as LegacyTimeSpine,
)
from dbt_semantic_interfaces.type_enums import TimeGranularity
from dbt_semantic_interfaces.validations.semantic_manifest_validator import (
Expand All @@ -23,7 +36,7 @@


class SemanticManifest:
def __init__(self, manifest) -> None:
def __init__(self, manifest: Manifest) -> None:
self.manifest = manifest

def validate(self) -> bool:
Expand Down Expand Up @@ -59,8 +72,50 @@ def write_json_to_file(self, file_path: str):
write_file(file_path, json)

def _get_pydantic_semantic_manifest(self) -> PydanticSemanticManifest:
pydantic_time_spines: List[PydanticTimeSpine] = []
minimum_time_spine_granularity: Optional[TimeGranularity] = None
for node in self.manifest.nodes.values():
if not (isinstance(node, ModelNode) and node.time_spine):
continue
time_spine = node.time_spine
standard_granularity_column = None
for column in node.columns.values():
if column.name == time_spine.standard_granularity_column:
standard_granularity_column = column
break
# Assertions needed for type checking
if not standard_granularity_column:
raise ParsingError(
"Expected to find time spine standard granularity column in model columns, but did not. "
"This should have been caught in YAML parsing."
)
if not standard_granularity_column.granularity:
raise ParsingError(
"Expected to find granularity set for time spine standard granularity column, but did not. "
"This should have been caught in YAML parsing."
)
pydantic_time_spine = PydanticTimeSpine(
node_relation=PydanticNodeRelation(
alias=node.alias,
schema_name=node.schema,
database=node.database,
relation_name=node.relation_name,
),
primary_column=PydanticTimeSpinePrimaryColumn(
name=time_spine.standard_granularity_column,
time_granularity=standard_granularity_column.granularity,
),
)
pydantic_time_spines.append(pydantic_time_spine)
if (
not minimum_time_spine_granularity
or standard_granularity_column.granularity.to_int()
< minimum_time_spine_granularity.to_int()
):
minimum_time_spine_granularity = standard_granularity_column.granularity

project_config = PydanticProjectConfiguration(
time_spine_table_configurations=[],
time_spine_table_configurations=[], time_spines=pydantic_time_spines
)
pydantic_semantic_manifest = PydanticSemanticManifest(
metrics=[], semantic_models=[], project_configuration=project_config
Expand All @@ -79,24 +134,39 @@ def _get_pydantic_semantic_manifest(self) -> PydanticSemanticManifest:
PydanticSavedQuery.parse_obj(saved_query.to_dict())
)

# Look for time-spine table model and create time spine table configuration
if self.manifest.semantic_models:
# Get model for time_spine_table
model = self.manifest.ref_lookup.find(TIME_SPINE_MODEL_NAME, None, None, self.manifest)
if not model:
legacy_time_spine_model = self.manifest.ref_lookup.find(
LEGACY_TIME_SPINE_MODEL_NAME, None, None, self.manifest
)
if legacy_time_spine_model:
if (
not minimum_time_spine_granularity
or LEGACY_TIME_SPINE_GRANULARITY.to_int()
< minimum_time_spine_granularity.to_int()
):
minimum_time_spine_granularity = LEGACY_TIME_SPINE_GRANULARITY

# If no time spines have been configured at DAY or smaller AND legacy time spine model does not exist, error.
if (
not minimum_time_spine_granularity
or minimum_time_spine_granularity.to_int()
> MINIMUM_REQUIRED_TIME_SPINE_GRANULARITY.to_int()
):
raise ParsingError(
"The semantic layer requires a 'metricflow_time_spine' model in the project, but none was found. "
"Guidance on creating this model can be found on our docs site ("
"https://docs.getdbt.com/docs/build/metricflow-time-spine) "
"The semantic layer requires a time spine model with granularity DAY or smaller in the project, "
"but none was found. Guidance on creating this model can be found on our docs site "
"(https://docs.getdbt.com/docs/build/metricflow-time-spine)." # TODO: update docs link when available!
)
# Create time_spine_table_config, set it in project_config, and add to semantic manifest
time_spine_table_config = PydanticTimeSpineTableConfiguration(
location=model.relation_name,
column_name="date_day",
grain=TimeGranularity.DAY,
)
pydantic_semantic_manifest.project_configuration.time_spine_table_configurations = [
time_spine_table_config
]

# For backward compatibility: if legacy time spine exists, include it in the manifest.
if legacy_time_spine_model:
legacy_time_spine = LegacyTimeSpine(
location=legacy_time_spine_model.relation_name,
column_name="date_day",
grain=LEGACY_TIME_SPINE_GRANULARITY,
)
pydantic_semantic_manifest.project_configuration.time_spine_table_configurations = [
legacy_time_spine
]

return pydantic_semantic_manifest
27 changes: 27 additions & 0 deletions core/dbt/contracts/graph/unparsed.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ class HasColumnAndTestProps(HasColumnProps):
class UnparsedColumn(HasColumnAndTestProps):
quote: Optional[bool] = None
tags: List[str] = field(default_factory=list)
granularity: Optional[str] = None # str is really a TimeGranularity Enum


@dataclass
Expand Down Expand Up @@ -206,13 +207,19 @@ class UnparsedNodeUpdate(HasConfig, HasColumnTests, HasColumnAndTestProps, HasYa
access: Optional[str] = None


@dataclass
class UnparsedTimeSpine(dbtClassMixin):
standard_granularity_column: str


@dataclass
class UnparsedModelUpdate(UnparsedNodeUpdate):
quote_columns: Optional[bool] = None
access: Optional[str] = None
latest_version: Optional[NodeVersion] = None
versions: Sequence[UnparsedVersion] = field(default_factory=list)
deprecation_date: Optional[datetime.datetime] = None
time_spine: Optional[UnparsedTimeSpine] = None

def __post_init__(self) -> None:
if self.latest_version:
Expand All @@ -234,6 +241,26 @@ def __post_init__(self) -> None:

self.deprecation_date = normalize_date(self.deprecation_date)

if self.time_spine:
columns = (
self.get_columns_for_version(self.latest_version)
if self.latest_version
else self.columns
)
column_names_to_columns = {column.name: column for column in columns}
if self.time_spine.standard_granularity_column not in column_names_to_columns:
raise ParsingError(
f"Time spine standard granularity column must be defined on the model. Got invalid "
f"column name '{self.time_spine.standard_granularity_column}' for model '{self.name}'. Valid names"
f"{' for latest version' if self.latest_version else ''}: {list(column_names_to_columns.keys())}."
)
column = column_names_to_columns[self.time_spine.standard_granularity_column]
if not column.granularity:
raise ParsingError(
f"Time spine standard granularity column must have a granularity defined. Please add one for "
f"column '{self.time_spine.standard_granularity_column}' in model '{self.name}'."
)

def get_columns_for_version(self, version: NodeVersion) -> List[UnparsedColumn]:
if version not in self._version_map:
raise DbtInternalError(
Expand Down
11 changes: 6 additions & 5 deletions core/dbt/parser/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from dbt.parser.search import FileBlock
from dbt_common.contracts.constraints import ColumnLevelConstraint, ConstraintType
from dbt_common.exceptions import DbtInternalError
from dbt_semantic_interfaces.type_enums import TimeGranularity


def trimmed(inp: str) -> str:
Expand Down Expand Up @@ -185,13 +186,12 @@ def __init__(self) -> None:
self.column_info: Dict[str, ColumnInfo] = {}

def _add(self, column: HasColumnProps) -> None:
tags: List[str] = []
tags.extend(getattr(column, "tags", ()))
quote: Optional[bool]
tags: List[str] = getattr(column, "tags", [])
quote: Optional[bool] = None
granularity: Optional[TimeGranularity] = None
if isinstance(column, UnparsedColumn):
quote = column.quote
else:
quote = None
granularity = TimeGranularity(column.granularity) if column.granularity else None

if any(
c
Expand All @@ -209,6 +209,7 @@ def _add(self, column: HasColumnProps) -> None:
tags=tags,
quote=quote,
_extra=column.extra,
granularity=granularity,
)

@classmethod
Expand Down
15 changes: 14 additions & 1 deletion core/dbt/parser/schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

from dbt import deprecations
from dbt.artifacts.resources import RefArgs
from dbt.artifacts.resources.v1.model import TimeSpine
from dbt.clients.jinja_static import statically_parse_ref_or_source
from dbt.clients.yaml_helper import load_yaml_text
from dbt.config import RuntimeConfig
Expand Down Expand Up @@ -619,9 +620,16 @@ def parse_patch(self, block: TargetBlock[NodeTarget], refs: ParserRef) -> None:
# could possibly skip creating one. Leaving here for now for
# code consistency.
deprecation_date: Optional[datetime.datetime] = None
time_spine: Optional[TimeSpine] = None
if isinstance(block.target, UnparsedModelUpdate):
deprecation_date = block.target.deprecation_date

time_spine = (
TimeSpine(
standard_granularity_column=block.target.time_spine.standard_granularity_column
)
if block.target.time_spine
else None
)
patch = ParsedNodePatch(
name=block.target.name,
original_file_path=block.target.original_file_path,
Expand All @@ -637,6 +645,7 @@ def parse_patch(self, block: TargetBlock[NodeTarget], refs: ParserRef) -> None:
latest_version=None,
constraints=block.target.constraints,
deprecation_date=deprecation_date,
time_spine=time_spine,
)
assert isinstance(self.yaml.file, SchemaSourceFile)
source_file: SchemaSourceFile = self.yaml.file
Expand Down Expand Up @@ -915,6 +924,7 @@ def patch_node_properties(self, node, patch: "ParsedNodePatch") -> None:
)
# These two will have to be reapplied after config is built for versioned models
self.patch_constraints(node, patch.constraints)
self.patch_time_spine(node, patch.time_spine)
node.build_contract_checksum()

def patch_constraints(self, node, constraints: List[Dict[str, Any]]) -> None:
Expand Down Expand Up @@ -953,6 +963,9 @@ def _process_constraints_refs_and_sources(self, model_node: ModelNode) -> None:
else:
model_node.sources.append(ref_or_source)

def patch_time_spine(self, node, time_spine: Optional[TimeSpine]) -> None:
node.time_spine = time_spine

def _validate_pk_constraints(
self, model_node: ModelNode, constraints: List[Dict[str, Any]]
) -> None:
Expand Down
Loading

0 comments on commit 0a160fc

Please sign in to comment.