Skip to content

Commit

Permalink
Add tests for update_undocumented_columns_with_prior_knowledge
Browse files Browse the repository at this point in the history
  • Loading branch information
syou6162 committed Sep 21, 2023
1 parent 417b74b commit dbe9c24
Show file tree
Hide file tree
Showing 7 changed files with 334 additions and 136 deletions.
39 changes: 39 additions & 0 deletions src/dbt_osmosis/core/column_level_knowledge.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
import re
from typing import Any, Dict

ColumnLevelKnowledge = Dict[str, Any]
Knowledge = Dict[str, ColumnLevelKnowledge]


def get_prior_knowledge(
knowledge: Knowledge,
column: str,
) -> ColumnLevelKnowledge:
camel_column = re.sub("_(.)", lambda m: m.group(1).upper(), column)
prior_knowledge_candidates = list(
filter(
lambda k: k,
[
knowledge.get(column),
knowledge.get(column.lower()),
knowledge.get(camel_column),
],
)
)
sorted_prior_knowledge_candidates_sources = sorted(
[k for k in prior_knowledge_candidates if k["progenitor"].startswith("source")],
key=lambda k: k["generation"],
reverse=True,
)
sorted_prior_knowledge_candidates_models = sorted(
[k for k in prior_knowledge_candidates if k["progenitor"].startswith("model")],
key=lambda k: k["generation"],
reverse=True,
)
sorted_prior_knowledge_candidates = (
sorted_prior_knowledge_candidates_sources + sorted_prior_knowledge_candidates_models
)
prior_knowledge = (
sorted_prior_knowledge_candidates[0] if sorted_prior_knowledge_candidates else {}
)
return prior_knowledge
61 changes: 56 additions & 5 deletions src/dbt_osmosis/core/column_level_knowledge_propagator.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
from typing import Any, Dict, List, Optional
from typing import Any, Dict, Iterable, List, Optional

from dbt_osmosis.vendored.dbt_core_interface.project import ManifestNode

ColumnLevelKnowledge = Dict[str, Any]
Knowledge = Dict[str, ColumnLevelKnowledge]
from dbt_osmosis.core.column_level_knowledge import (
ColumnLevelKnowledge,
Knowledge,
get_prior_knowledge,
)
from dbt_osmosis.core.log_controller import logger
from dbt_osmosis.vendored.dbt_core_interface.project import ColumnInfo, ManifestNode


def _build_node_ancestor_tree(
Expand Down Expand Up @@ -84,3 +87,51 @@ def get_node_columns_with_inherited_knowledge(
family_tree = _build_node_ancestor_tree(manifest, node)
knowledge = _inherit_column_level_knowledge(manifest, family_tree, placeholders)
return knowledge

@staticmethod
def update_undocumented_columns_with_prior_knowledge(
undocumented_columns: Iterable[str],
node: ManifestNode,
yaml_file_model_section: Dict[str, Any],
knowledge: Knowledge,
skip_add_tags: bool,
skip_merge_meta: bool,
add_progenitor_to_meta: bool,
) -> int:
"""Update undocumented columns with prior knowledge in node and model simultaneously
THIS MUTATES THE NODE AND MODEL OBJECTS so that state is always accurate"""
inheritables = ["description"]
if not skip_add_tags:
inheritables.append("tags")
if not skip_merge_meta:
inheritables.append("meta")

changes_committed = 0
for column in undocumented_columns:
prior_knowledge: ColumnLevelKnowledge = get_prior_knowledge(knowledge, column)
progenitor = prior_knowledge.pop("progenitor", None)
prior_knowledge = {k: v for k, v in prior_knowledge.items() if k in inheritables}
if add_progenitor_to_meta and progenitor:
prior_knowledge.setdefault("meta", {})
prior_knowledge["meta"]["osmosis_progenitor"] = progenitor
if not prior_knowledge:
continue
if column not in node.columns:
node.columns[column] = ColumnInfo.from_dict({"name": column, **prior_knowledge})
else:
node.columns[column] = ColumnInfo.from_dict(
dict(node.columns[column].to_dict(), **prior_knowledge)
)
for model_column in yaml_file_model_section["columns"]:
if model_column["name"] == column:
model_column.update(prior_knowledge)
changes_committed += 1
logger().info(
":light_bulb: Column %s is inheriting knowledge from the lineage of progenitor"
" (%s) for model %s",
column,
progenitor,
node.unique_id,
)
logger().info(prior_knowledge)
return changes_committed
104 changes: 15 additions & 89 deletions src/dbt_osmosis/core/osmosis.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,7 @@
from dbt.contracts.results import ColumnMetadata
from pydantic import BaseModel

from dbt_osmosis.core.column_level_knowledge_propagator import (
ColumnLevelKnowledge,
ColumnLevelKnowledgePropagator,
Knowledge,
)
from dbt_osmosis.core.column_level_knowledge_propagator import ColumnLevelKnowledgePropagator
from dbt_osmosis.core.exceptions import InvalidOsmosisConfig, MissingOsmosisConfig
from dbt_osmosis.core.log_controller import logger
from dbt_osmosis.vendored.dbt_core_interface.project import (
Expand Down Expand Up @@ -966,88 +962,6 @@ def remove_columns_not_in_database(
)
return changes_committed

@staticmethod
def get_prior_knowledge(
knowledge: Knowledge,
column: str,
) -> ColumnLevelKnowledge:
camel_column = re.sub("_(.)", lambda m: m.group(1).upper(), column)
prior_knowledge_candidates = list(
filter(
lambda k: k,
[
knowledge.get(column),
knowledge.get(column.lower()),
knowledge.get(camel_column),
],
)
)
sorted_prior_knowledge_candidates_sources = sorted(
[k for k in prior_knowledge_candidates if k["progenitor"].startswith("source")],
key=lambda k: k["generation"],
reverse=True,
)
sorted_prior_knowledge_candidates_models = sorted(
[k for k in prior_knowledge_candidates if k["progenitor"].startswith("model")],
key=lambda k: k["generation"],
reverse=True,
)
sorted_prior_knowledge_candidates = (
sorted_prior_knowledge_candidates_sources + sorted_prior_knowledge_candidates_models
)
prior_knowledge = (
sorted_prior_knowledge_candidates[0] if sorted_prior_knowledge_candidates else {}
)
return prior_knowledge

def update_undocumented_columns_with_prior_knowledge(
self,
undocumented_columns: Iterable[str],
node: ManifestNode,
yaml_file_model_section: Dict[str, Any],
) -> int:
"""Update undocumented columns with prior knowledge in node and model simultaneously
THIS MUTATES THE NODE AND MODEL OBJECTS so that state is always accurate"""
knowledge: Knowledge = (
ColumnLevelKnowledgePropagator.get_node_columns_with_inherited_knowledge(
self.manifest, node, self.placeholders
)
)

inheritables = ["description"]
if not self.skip_add_tags:
inheritables.append("tags")
if not self.skip_merge_meta:
inheritables.append("meta")

changes_committed = 0
for column in undocumented_columns:
prior_knowledge: ColumnLevelKnowledge = self.get_prior_knowledge(knowledge, column)
progenitor = prior_knowledge.pop("progenitor", "Unknown")
prior_knowledge = {k: v for k, v in prior_knowledge.items() if k in inheritables}
if not prior_knowledge:
continue
if column not in node.columns:
node.columns[column] = ColumnInfo.from_dict({"name": column, **prior_knowledge})
else:
node.columns[column].replace(kwargs={"name": column, **prior_knowledge})
for model_column in yaml_file_model_section["columns"]:
if model_column["name"] == column:
if self.add_progenitor_to_meta:
prior_knowledge.setdefault("meta", {})
prior_knowledge["meta"]["osmosis_progenitor"] = progenitor
model_column.update(prior_knowledge)
changes_committed += 1
logger().info(
":light_bulb: Column %s is inheriting knowledge from the lineage of progenitor"
" (%s) for model %s",
column,
progenitor,
node.unique_id,
)
logger().info(prior_knowledge)
return changes_committed

def update_columns_data_type(
self,
node: ManifestNode,
Expand Down Expand Up @@ -1107,8 +1021,20 @@ def update_schema_file_and_node(
n_cols_added = self.add_missing_cols_to_node_and_model(
missing_columns, node, section, columns_db_meta
)
n_cols_doc_inherited = self.update_undocumented_columns_with_prior_knowledge(
undocumented_columns, node, section

knowledge = ColumnLevelKnowledgePropagator.get_node_columns_with_inherited_knowledge(
self.manifest, node, self.placeholders
)
n_cols_doc_inherited = (
ColumnLevelKnowledgePropagator.update_undocumented_columns_with_prior_knowledge(
undocumented_columns,
node,
section,
knowledge,
self.skip_add_tags,
self.skip_merge_meta,
self.add_progenitor_to_meta,
)
)
n_cols_data_type_updated = self.update_columns_data_type(node, section, columns_db_meta)
n_cols_removed = self.remove_columns_not_in_database(extra_columns, node, section)
Expand Down
4 changes: 2 additions & 2 deletions src/dbt_osmosis/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ def organize(
skip_add_columns: bool = False,
skip_add_tags: bool = False,
skip_merge_meta: bool = False,
add_progenitor_to_meta : bool = False,
add_progenitor_to_meta: bool = False,
profile: Optional[str] = None,
vars: Optional[str] = None,
):
Expand Down Expand Up @@ -431,7 +431,7 @@ def document(
skip_add_columns: bool = False,
skip_add_tags: bool = False,
skip_merge_meta: bool = False,
add_progenitor_to_meta : bool = False,
add_progenitor_to_meta: bool = False,
profile: Optional[str] = None,
vars: Optional[str] = None,
):
Expand Down
39 changes: 13 additions & 26 deletions src/dbt_osmosis/vendored/dbt_core_interface/project.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,19 +86,19 @@
from dbt.adapters.factory import get_adapter_class_by_name
from dbt.clients.system import make_directory
from dbt.config.runtime import RuntimeConfig
from dbt.flags import set_from_args
from dbt.node_types import NodeType
from dbt.parser.manifest import PARTIAL_PARSE_FILE_NAME, ManifestLoader, process_node
from dbt.parser.sql import SqlBlockParser, SqlMacroParser
from dbt.task.sql import SqlCompileRunner
from dbt.tracking import disable_tracking
from dbt.flags import set_from_args

# brute force import for dbt 1.3 back-compat
# these are here for consumers of dbt-core-interface
try:
# dbt <= 1.3
from dbt.contracts.graph.parsed import ColumnInfo # type: ignore
from dbt.contracts.graph.compiled import ManifestNode # type: ignore
from dbt.contracts.graph.parsed import ColumnInfo # type: ignore
except Exception:
# dbt > 1.3
from dbt.contracts.graph.nodes import ColumnInfo, ManifestNode # type: ignore
Expand Down Expand Up @@ -601,9 +601,7 @@ def get_ref_node(self, target_model_name: str) -> "ManifestNode":
),
)

def get_source_node(
self, target_source_name: str, target_table_name: str
) -> "ManifestNode":
def get_source_node(self, target_source_name: str, target_table_name: str) -> "ManifestNode":
"""Get a `ManifestNode` from a dbt project source name and table name.
This is the same as one would in a {{ source(...) }} macro call.
Expand Down Expand Up @@ -1663,10 +1661,8 @@ def get_config(self, key, default=None):
0,
13,
"Route.get_config() is deprecated.",
(
"The Route.config property already includes values from the"
" application config for missing keys. Access it directly."
),
"The Route.config property already includes values from the"
" application config for missing keys. Access it directly.",
)
return self.config.get(key, default)

Expand Down Expand Up @@ -1702,21 +1698,17 @@ def __init__(self, **kwargs):
0,
13,
"Bottle(catchall) keyword argument.",
(
"The 'catchall' setting is now part of the app "
"configuration. Fix: `app.config['catchall'] = False`"
),
"The 'catchall' setting is now part of the app "
"configuration. Fix: `app.config['catchall'] = False`",
)
self.config["catchall"] = False
if kwargs.get("autojson") is False:
depr(
0,
13,
"Bottle(autojson) keyword argument.",
(
"The 'autojson' setting is now part of the app "
"configuration. Fix: `app.config['json.enable'] = False`"
),
"The 'autojson' setting is now part of the app "
"configuration. Fix: `app.config['json.enable'] = False`",
)
self.config["json.disable"] = True

Expand Down Expand Up @@ -1846,10 +1838,8 @@ def _mount_app(self, prefix, app, **options):
0,
13,
"Prefix must end in '/'. Falling back to WSGI mount.",
(
"Consider adding an explicit redirect from '/prefix' to '/prefix/' in the"
" parent application."
),
"Consider adding an explicit redirect from '/prefix' to '/prefix/' in the"
" parent application.",
)
return self._mount_wsgi(prefix, app, **options)

Expand Down Expand Up @@ -5517,8 +5507,7 @@ def wrapper(*args, **kwargs):
_HTTP_STATUS_LINES = {k: "%d %s" % (k, v) for (k, v) in HTTP_CODES.items()}

#: The default template used for error pages. Override with @error()
ERROR_PAGE_TEMPLATE = (
"""
ERROR_PAGE_TEMPLATE = """
%%try:
%%from %s import DEBUG, request
<!DOCTYPE HTML PUBLIC "-//IETF//DTD HTML 2.0//EN">
Expand Down Expand Up @@ -5556,9 +5545,7 @@ def wrapper(*args, **kwargs):
<b>ImportError:</b> Could not generate the error page. Please add bottle to
the import path.
%%end
"""
% __name__
)
""" % __name__

#: A thread-safe instance of :class:`LocalRequest`. If accessed from within a
#: request callback, this instance always refers to the *current* request
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
from dbt.contracts.graph.manifest import Manifest
from dbt_osmosis.core.osmosis import DbtYamlManager
from dbt_osmosis.core.column_level_knowledge import get_prior_knowledge


class TestDbtYamlManager:
Expand All @@ -17,7 +16,7 @@ def test_get_prior_knowledge(test):
},
}
assert (
DbtYamlManager.get_prior_knowledge(knowledge, "my_column")["progenitor"]
get_prior_knowledge(knowledge, "my_column")["progenitor"]
== "source.my_model.source.Order"
)

Expand All @@ -35,6 +34,5 @@ def test_get_prior_knowledge_with_camel_case(test):
},
}
assert (
DbtYamlManager.get_prior_knowledge(knowledge, "my_column")["progenitor"]
== "model.my_model.dwh.Order"
get_prior_knowledge(knowledge, "my_column")["progenitor"] == "model.my_model.dwh.Order"
)
Loading

0 comments on commit dbe9c24

Please sign in to comment.