Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ADAP-835: Optimize manual refresh on auto-refreshed materialized views #8847

Draft
wants to merge 18 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 14 commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
7544382
changelog
mikealfare Oct 12, 2023
ea1b44d
add test to ensure no changes are made to the database when no change…
mikealfare Oct 12, 2023
4cc726f
add a stock exception for non-implemented test utilities
mikealfare Oct 12, 2023
f67f6be
add a stock exception for non-implemented test utilities
mikealfare Oct 12, 2023
99296fe
add test framework for interaction between auto refresh and running d…
mikealfare Oct 12, 2023
e3b986e
implement autorefresh test for dbt-postgres
mikealfare Oct 13, 2023
6499a68
revert unintended whitespace change
mikealfare Oct 13, 2023
aba3c03
add comments explaining how postgres last refreshed date works for ma…
mikealfare Oct 13, 2023
6e4a960
add method for getting the relation config and condition on auto_refr…
mikealfare Oct 13, 2023
44b3c70
add global F401 ignore for __init__.py files
mikealfare Oct 14, 2023
a6f8e42
add relation config factory for encapsulating relation config methods
mikealfare Oct 14, 2023
34130bb
make submodule private to push import to the subpackage level
mikealfare Oct 14, 2023
1e3f403
update types and names to be more accurate, move some parsing methods…
mikealfare Oct 14, 2023
1d41b50
update types and names to be more accurate
mikealfare Oct 14, 2023
60338bf
return a conservative default instead of raising an exception
mikealfare Oct 14, 2023
be6337b
update warning for no refresh to match existing warning in other adap…
mikealfare Oct 14, 2023
10c90b2
forward from_model_node to from_node for backwards compatibility
mikealfare Oct 14, 2023
c006503
fix exception type
mikealfare Oct 19, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changes/unreleased/Features-20231012-122917.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Features
body: Optimize refreshing materialized views when autorefreshed
time: 2023-10-12T12:29:17.705373-04:00
custom:
Author: mikealfare
Issue: "6911"
2 changes: 2 additions & 0 deletions .flake8
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,5 @@ ignore =
E741
E501 # long line checking is done in black
exclude = test/
per-file-ignores =
*/__init__.py: F401
13 changes: 13 additions & 0 deletions core/dbt/adapters/base/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
)

from dbt.adapters.capability import Capability, CapabilityDict
from dbt.adapters.relation_configs import RelationConfigFactory
from dbt.contracts.graph.nodes import ColumnLevelConstraint, ConstraintType, ModelLevelConstraint

import agate
Expand Down Expand Up @@ -246,6 +247,18 @@ def __init__(self, config) -> None:
self.cache = RelationsCache()
self.connections = self.ConnectionManager(config)
self._macro_manifest_lazy: Optional[MacroManifest] = None
self.relation_config_factory = self._relation_config_factory()

def _relation_config_factory(self) -> RelationConfigFactory:
"""
This sets the default relation config factory in the init.
If you need to adjust the default settings, override this
returning an instance with the settings specific to your adapter.

See `dbt.adapters.relation_configs.factory.RelationConfigFactory`
for more information regarding these settings.
"""
return RelationConfigFactory()

###
# Methods that pass through to the connection manager
Expand Down
10 changes: 7 additions & 3 deletions core/dbt/adapters/relation_configs/__init__.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
from dbt.adapters.relation_configs.config_base import ( # noqa: F401
from dbt.adapters.relation_configs._factory import RelationConfigFactory
from dbt.adapters.relation_configs._materialized_view import (
MaterializedViewRelationConfig,
)
from dbt.adapters.relation_configs.config_base import (
RelationConfigBase,
RelationResults,
)
from dbt.adapters.relation_configs.config_change import ( # noqa: F401
from dbt.adapters.relation_configs.config_change import (
RelationConfigChangeAction,
RelationConfigChange,
)
from dbt.adapters.relation_configs.config_validation import ( # noqa: F401
from dbt.adapters.relation_configs.config_validation import (
RelationConfigValidationMixin,
RelationConfigValidationRule,
)
58 changes: 58 additions & 0 deletions core/dbt/adapters/relation_configs/_factory.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
from typing import Dict, Type

from dbt.contracts.graph.nodes import ParsedNode
from dbt.contracts.relation import RelationType
from dbt.dataclass_schema import StrEnum
from dbt.exceptions import DbtRuntimeError

from dbt.adapters.relation_configs.config_base import RelationConfigBase
from dbt.adapters.relation_configs._materialized_view import MaterializedViewRelationConfig


class RelationConfigFactory:
"""
This provides a way to work with relation configs both in the adapter and in the jinja context.

This factory comes with a default set of settings which can be overridden in BaseAdapter.

Args:
relation_types: an enum that contains all possible relation types for this adapter
this is generally `RelationType`, but there are cases where an adapter may override
`RelationType` to include more options or exclude options
relation_configs: a map from a relation_type to a relation_config
this is generally only overridden if `relation_types` is also overridden
"""

def __init__(self, **kwargs):
# the `StrEnum` class will generally be `RelationType`, however this allows for extending that Enum
self.relation_types: Type[StrEnum] = kwargs.get("relation_types", RelationType)
self.relation_configs: Dict[StrEnum, Type[RelationConfigBase]] = kwargs.get(
"relation_configs",
{
RelationType.MaterializedView: MaterializedViewRelationConfig,
},
)

try:
for relation_type in self.relation_configs.keys():
self.relation_types(relation_type)
except AssertionError:
raise DbtRuntimeError(

Check warning on line 40 in core/dbt/adapters/relation_configs/_factory.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/adapters/relation_configs/_factory.py#L39-L40

Added lines #L39 - L40 were not covered by tests
f"Received relation configs for {relation_type} " # noqa
f"but these relation types are not registered on this factory.\n"
f" registered relation types: {', '.join(self.relation_types)}\n"
)

def make_from_node(self, node: ParsedNode) -> RelationConfigBase:
relation_type = self.relation_types(node.config.materialized)
relation_config = self._relation_config(relation_type)
return relation_config.from_node(node)

Check warning on line 49 in core/dbt/adapters/relation_configs/_factory.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/adapters/relation_configs/_factory.py#L47-L49

Added lines #L47 - L49 were not covered by tests

def _relation_config(self, relation_type: StrEnum) -> Type[RelationConfigBase]:
if relation := self.relation_configs.get(relation_type):
return relation
raise DbtRuntimeError(

Check warning on line 54 in core/dbt/adapters/relation_configs/_factory.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/adapters/relation_configs/_factory.py#L52-L54

Added lines #L52 - L54 were not covered by tests
f"This factory does not have a relation config for this type.\n"
f" received: {relation_type}\n"
f" options: {', '.join(t for t in self.relation_configs.keys())}\n"
)
7 changes: 7 additions & 0 deletions core/dbt/adapters/relation_configs/_materialized_view.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
from dbt.adapters.relation_configs.config_base import RelationConfigBase


class MaterializedViewRelationConfig(RelationConfigBase):
@property
def auto_refresh(self) -> bool:
return False

Check warning on line 7 in core/dbt/adapters/relation_configs/_materialized_view.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/adapters/relation_configs/_materialized_view.py#L7

Added line #L7 was not covered by tests
52 changes: 47 additions & 5 deletions core/dbt/adapters/relation_configs/config_base.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
from dataclasses import dataclass
from typing import Union, Dict
from typing import Any, Dict, Union
from typing_extensions import Self

import agate
from dbt.contracts.graph.nodes import ParsedNode
from dbt.utils import filter_null_values


Expand All @@ -24,7 +26,7 @@
@dataclass(frozen=True)
class RelationConfigBase:
@classmethod
def from_dict(cls, kwargs_dict) -> "RelationConfigBase":
def from_dict(cls, kwargs_dict: Dict[str, Any]) -> Self:
"""
This assumes the subclass of `RelationConfigBase` is flat, in the sense that no attribute is
itself another subclass of `RelationConfigBase`. If that's not the case, this should be overriden
Expand All @@ -37,8 +39,48 @@
"""
return cls(**filter_null_values(kwargs_dict)) # type: ignore

###
# Parser for internal nodes, from dbt
###

@classmethod
def from_node(cls, node: ParsedNode) -> Self:
config_dict = cls.parse_node(node)
return cls.from_dict(config_dict)

Check warning on line 49 in core/dbt/adapters/relation_configs/config_base.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/adapters/relation_configs/config_base.py#L48-L49

Added lines #L48 - L49 were not covered by tests

@classmethod
def parse_node(cls, node: ParsedNode) -> Dict[str, Any]:
# this method was originally implemented as `parse_model_node`
if hasattr(cls, "parse_model_node"):
return cls.parse_model_node(node)
return {}

Check warning on line 56 in core/dbt/adapters/relation_configs/config_base.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/adapters/relation_configs/config_base.py#L54-L56

Added lines #L54 - L56 were not covered by tests

###
# Parser for database results, generally used with `SQLAdapter`
###

@classmethod
def from_relation_results(cls, relation_results: RelationResults) -> Self:
config_dict = cls.parse_relation_results(relation_results)
return cls.from_dict(config_dict)

Check warning on line 65 in core/dbt/adapters/relation_configs/config_base.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/adapters/relation_configs/config_base.py#L64-L65

Added lines #L64 - L65 were not covered by tests

@classmethod
def parse_relation_results(cls, relation_results: RelationResults) -> Dict[str, Any]:
raise NotImplementedError(

Check warning on line 69 in core/dbt/adapters/relation_configs/config_base.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/adapters/relation_configs/config_base.py#L69

Added line #L69 was not covered by tests
"`parse_relation_results` has not been implemented for this relation_type."
)

###
# Parser for api results, generally used with `BaseAdapter`
###

@classmethod
def from_api_results(cls, api_results: Any) -> Self:
config_dict = cls.parse_api_results(api_results)
return cls.from_dict(config_dict)

Check warning on line 80 in core/dbt/adapters/relation_configs/config_base.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/adapters/relation_configs/config_base.py#L79-L80

Added lines #L79 - L80 were not covered by tests

@classmethod
def _not_implemented_error(cls) -> NotImplementedError:
return NotImplementedError(
"This relation type has not been fully configured for this adapter."
def parse_api_results(cls, api_results: Any) -> Dict[str, Any]:
raise NotImplementedError(

Check warning on line 84 in core/dbt/adapters/relation_configs/config_base.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/adapters/relation_configs/config_base.py#L84

Added line #L84 was not covered by tests
"`parse_api_results` has not been implemented for this relation_type."
)
7 changes: 6 additions & 1 deletion core/dbt/context/providers.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
)
from typing_extensions import Protocol

from dbt.adapters.base import BaseAdapter
from dbt.adapters.base.column import Column
from dbt.adapters.factory import get_adapter, get_adapter_package_names, get_adapter_type_names
from dbt.clients import agate_helper
Expand Down Expand Up @@ -107,7 +108,7 @@
via a relation proxy.
"""

def __init__(self, adapter, namespace: MacroNamespace):
def __init__(self, adapter: BaseAdapter, namespace: MacroNamespace):
self._adapter = adapter
self.Relation = RelationProxy(adapter)
self._namespace = namespace
Expand All @@ -125,6 +126,10 @@
def commit(self):
return self._adapter.commit_if_has_connection()

@property
def relation_config_factory(self):
return self._adapter.relation_config_factory

Check warning on line 131 in core/dbt/context/providers.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/context/providers.py#L131

Added line #L131 was not covered by tests

def _get_adapter_macro_prefixes(self) -> List[str]:
# order matters for dispatch:
# 1. current adapter
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,15 @@
{% set configuration_changes = get_materialized_view_configuration_changes(existing_relation, config) %}

{% if configuration_changes is none %}
{% set build_sql = refresh_materialized_view(target_relation) %}
{% set relation_config = adapter.relation_config_factory.make_from_node(config.model) %}
{% if relation_config.auto_refresh %}
{% set build_sql = '' %}
{{ exceptions.warn(
"No configuration changes were identified and `" ~ target_relation ~ "` is set to auto refresh. No action taken."
) }}
{% else %}
{% set build_sql = refresh_materialized_view(target_relation) %}
{% endif %}

{% elif on_configuration_change == 'apply' %}
{% set build_sql = get_alter_materialized_view_as_sql(target_relation, configuration_changes, sql, existing_relation, backup_relation, intermediate_relation) %}
Expand Down
8 changes: 8 additions & 0 deletions core/dbt/tests/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -631,3 +631,11 @@

def set_model_file(project, relation: BaseRelation, model_sql: str):
write_file(model_sql, project.project_root, "models", f"{relation.name}.sql")


class UtilityMethodNotImplementedError(NotImplementedError):
def __int__(self, class_name: str, method_name: str, additional_message: Optional[str] = None):
message = f"To use this test, please implement `{class_name}`.`{method_name}`."
if additional_message:
message += f" {additional_message}"
super().__init__(message)

Check warning on line 641 in core/dbt/tests/util.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/tests/util.py#L638-L641

Added lines #L638 - L641 were not covered by tests
4 changes: 1 addition & 3 deletions plugins/postgres/dbt/adapters/postgres/relation.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,7 @@ def get_materialized_view_config_change_collection(
existing_materialized_view = PostgresMaterializedViewConfig.from_relation_results(
relation_results
)
new_materialized_view = PostgresMaterializedViewConfig.from_model_node(
runtime_config.model
)
new_materialized_view = PostgresMaterializedViewConfig.from_node(runtime_config.model)

config_change_collection.indexes = self._get_index_config_changes(
existing_materialized_view.indexes, new_materialized_view.indexes
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from dataclasses import dataclass, field
from typing import Set, FrozenSet
from typing import Any, Dict, FrozenSet, Set

import agate
from dbt.dataclass_schema import StrEnum
Expand Down Expand Up @@ -60,7 +60,7 @@ def validation_rules(self) -> Set[RelationConfigValidationRule]:
}

@classmethod
def from_dict(cls, config_dict) -> "PostgresIndexConfig":
def from_dict(cls, config_dict: Dict[str, Any]) -> "PostgresIndexConfig":
# TODO: include the QuotePolicy instead of defaulting to lower()
kwargs_dict = {
"name": config_dict.get("name"),
Expand All @@ -74,7 +74,7 @@ def from_dict(cls, config_dict) -> "PostgresIndexConfig":
return index

@classmethod
def parse_model_node(cls, model_node_entry: dict) -> dict:
def parse_node(cls, model_node_entry: Dict[str, Any]) -> Dict[str, Any]:
config_dict = {
"column_names": set(model_node_entry.get("columns", set())),
"unique": model_node_entry.get("unique"),
Expand All @@ -83,7 +83,7 @@ def parse_model_node(cls, model_node_entry: dict) -> dict:
return config_dict

@classmethod
def parse_relation_results(cls, relation_results_entry: agate.Row) -> dict:
def parse_relation_results(cls, relation_results_entry: agate.Row) -> Dict[str, Any]:
config_dict = {
"name": relation_results_entry.get("name"),
"column_names": set(relation_results_entry.get("column_names", "").split(",")),
Expand All @@ -93,7 +93,7 @@ def parse_relation_results(cls, relation_results_entry: agate.Row) -> dict:
return config_dict

@property
def as_node_config(self) -> dict:
def as_node_config(self) -> Dict[str, Any]:
"""
Returns: a dictionary that can be passed into `get_create_index_sql()`
"""
Expand Down
Loading
Loading