From d8a0badc2aff21c930d3212fb51b40a24c97946a Mon Sep 17 00:00:00 2001 From: z3z1ma Date: Fri, 27 Dec 2024 14:06:58 -0700 Subject: [PATCH] chore!: start refactor --- src/dbt_osmosis/core/audit_macros.jinja2 | 452 ----------------------- src/dbt_osmosis/core/diff.py | 188 ---------- src/dbt_osmosis/core/macros.py | 14 - src/dbt_osmosis/core/osmosis.py | 140 ++++--- 4 files changed, 66 insertions(+), 728 deletions(-) delete mode 100644 src/dbt_osmosis/core/audit_macros.jinja2 delete mode 100644 src/dbt_osmosis/core/diff.py delete mode 100644 src/dbt_osmosis/core/macros.py diff --git a/src/dbt_osmosis/core/audit_macros.jinja2 b/src/dbt_osmosis/core/audit_macros.jinja2 deleted file mode 100644 index 3137bf9..0000000 --- a/src/dbt_osmosis/core/audit_macros.jinja2 +++ /dev/null @@ -1,452 +0,0 @@ -{% macro _dbt_osmosis_compare_queries(a_query, b_query, primary_key=None) -%} -{{ - return( - adapter.dispatch("_dbt_osmosis_compare_queries")( - a_query, b_query, primary_key - ) - ) -}} -{%- endmacro %} - -{% macro default___dbt_osmosis_compare_queries(a_query, b_query, primary_key=None) %} - -with - - a as ({{ a_query }}), - - b as ({{ b_query }}), - - a_intersect_b as (select * from a {{ dbt_utils.intersect() }} select * from b), - - a_except_b as (select * from a {{ dbt_utils.except() }} select * from b), - - b_except_a as (select * from b {{ dbt_utils.except() }} select * from a), - - all_records as ( - - select *, true as in_a, true as in_b - from a_intersect_b - - union all - - select *, true as in_a, false as in_b - from a_except_b - - union all - - select *, false as in_a, true as in_b - from b_except_a - - ) - -select * -from all_records -where not (in_a and in_b) -order by {{ primary_key ~ ", " if primary_key is not none }} in_a desc, in_b desc - -{% endmacro %} - - -- - - {% macro _dbt_osmosis_compare_queries_agg(a_query, b_query, primary_key=None) -%} - {{ - return( - adapter.dispatch("_dbt_osmosis_compare_queries_agg")( - a_query, b_query, primary_key - ) - ) - }} - {%- endmacro %} - -{% macro default___dbt_osmosis_compare_queries_agg( - a_query, b_query, primary_key=None -) %} - -with - - a as ({{ a_query }}), - - b as ({{ b_query }}), - - a_intersect_b as (select * from a {{ dbt_utils.intersect() }} select * from b), - - a_except_b as (select * from a {{ dbt_utils.except() }} select * from b), - - b_except_a as (select * from b {{ dbt_utils.except() }} select * from a), - - all_records as ( - - select *, true as in_a, true as in_b - from a_intersect_b - - union all - - select *, true as in_a, false as in_b - from a_except_b - - union all - - select *, false as in_a, true as in_b - from b_except_a - - ), - - summary_stats as ( - select in_a, in_b, count(*) as count from all_records group by 1, 2 - ) - -select *, round(100.0 * count / sum(count) over (), 2) as percent_of_total - -from summary_stats -order by in_a desc, in_b desc - -{% endmacro %} - - -- - - {% macro _dbt_osmosis_pop_columns(columns, columns_to_pop) %} - {% set popped_columns = [] %} - - {% for column in columns %} - {% if column.name | lower not in columns_to_pop | lower %} - {% do popped_columns.append(column) %} - {% endif %} - {% endfor %} - - {{ return(popped_columns) }} - {% endmacro %} - - -- - - {% macro _dbt_osmosis_compare_relations( - a_relation, b_relation, exclude_columns=[], primary_key=none -) %} - - {%- set a_columns = adapter.get_columns_in_relation(a_relation) -%} - - {% set check_columns = _dbt_osmosis_pop_columns(a_columns, exclude_columns) %} - - {% set check_cols_csv = check_columns | 
map(attribute="quoted") | join(", ") %} - -{% set a_query %} -select - {{ check_cols_csv }} - {% if primary_key is none %}, {{ hash(check_cols_csv) }} as _pk{% endif %} - -from {{ a_relation }} -{% endset %} - -{% set b_query %} -select - {{ check_cols_csv }} - {% if primary_key is none %}, {{ hash(check_cols_csv) }} as _pk{% endif %} - -from {{ b_relation }} -{% endset %} - - {{ _dbt_osmosis_compare_queries(a_query, b_query, primary_key or "_pk") }} - - {% endmacro %} - - -- - - {% macro _dbt_osmosis_compare_relations_agg( - a_relation, b_relation, exclude_columns=[], primary_key=none -) %} - - {%- set a_columns = adapter.get_columns_in_relation(a_relation) -%} - - {% set check_columns = _dbt_osmosis_pop_columns(a_columns, exclude_columns) %} - - {% set check_cols_csv = check_columns | map(attribute="quoted") | join(", ") %} - -{% set a_query %} -select - {{ check_cols_csv }} - {% if primary_key is none %}, {{ hash(check_cols_csv) }} as _pk{% endif %} - -from {{ a_relation }} -{% endset %} - -{% set b_query %} -select - {{ check_cols_csv }} - {% if primary_key is none %}, {{ hash(check_cols_csv) }} as _pk{% endif %} - -from {{ b_relation }} -{% endset %} - - {{ _dbt_osmosis_compare_queries_agg(a_query, b_query, primary_key or "_pk") }} - - {% endmacro %} - - -- - - {% macro _dbt_osmosis_compare_relation_columns(a_relation, b_relation) %} - {{ - return( - adapter.dispatch("_dbt_osmosis_compare_relation_columns")( - a_relation, b_relation - ) - ) - }} - {% endmacro %} - -{% macro default___dbt_osmosis_compare_relation_columns(a_relation, b_relation) %} - -with - - a_cols as ({{ get_columns_in_relation_sql_dosmo(a_relation) }}), - - b_cols as ({{ get_columns_in_relation_sql_dosmo(b_relation) }}) - -select - column_name, - a_cols.ordinal_position as a_ordinal_position, - b_cols.ordinal_position as b_ordinal_position, - a_cols.data_type as a_data_type, - b_cols.data_type as b_data_type, - coalesce( - a_cols.ordinal_position = b_cols.ordinal_position, false - ) as has_ordinal_position_match, - coalesce(a_cols.data_type = b_cols.data_type, false) as has_data_type_match -from a_cols -full outer join b_cols using (column_name) -order by coalesce(a_cols.ordinal_position, b_cols.ordinal_position) - -{% endmacro %} - - -- - - {% macro get_columns_in_relation_sql_dosmo(relation) %} - - {{ adapter.dispatch("get_columns_in_relation_sql_dosmo")(relation) }} - - {% endmacro %} - -{% macro redshift__get_columns_in_relation_sql_dosmo(relation) %} -{#- -See https://github.com/dbt-labs/dbt/blob/23484b18b71010f701b5312f920f04529ceaa6b2/plugins/redshift/dbt/include/redshift/macros/adapters.sql#L71 -Edited to include ordinal_position --#} -with - - bound_views as ( - select - ordinal_position, - table_schema, - column_name, - data_type, - character_maximum_length, - numeric_precision, - numeric_scale - - from information_schema."columns" - where table_name = '{{ relation.identifier }}' - ), - - unbound_views as ( - select - ordinal_position, - view_schema, - col_name, - case - when col_type ilike 'character varying%' - then 'character varying' - when col_type ilike 'numeric%' - then 'numeric' - else col_type - end as col_type, - case - when col_type like 'character%' - then nullif(regexp_substr(col_type, '[0-9]+'), '')::int - else null - end as character_maximum_length, - case - when col_type like 'numeric%' - then - nullif( - split_part(regexp_substr(col_type, '[0-9,]+'), ',', 1), '' - )::int - else null - end as numeric_precision, - case - when col_type like 'numeric%' - then - nullif( - 
split_part(regexp_substr(col_type, '[0-9,]+'), ',', 2), '' - )::int - else null - end as numeric_scale - - from - pg_get_late_binding_view_cols() - cols( - view_schema name, - view_name name, - col_name name, - col_type varchar, - ordinal_position int - ) - where view_name = '{{ relation.identifier }}' - ), - - unioned as ( - select * - from bound_views - union all - select * - from unbound_views - ) - -select * - -from unioned -{% if relation.schema %} where table_schema = '{{ relation.schema }}' {% endif %} -order by ordinal_position - -{% endmacro %} - -{% macro snowflake__get_columns_in_relation_sql_dosmo(relation) %} -{#- -From: https://github.com/dbt-labs/dbt/blob/dev/louisa-may-alcott/plugins/snowflake/dbt/include/snowflake/macros/adapters.sql#L48 -Edited to include ordinal_position --#} -select - ordinal_position, - column_name, - data_type, - character_maximum_length, - numeric_precision, - numeric_scale - -from {{ relation.information_schema("columns") }} - -where - table_name ilike '{{ relation.identifier }}' - {% if relation.schema %} and table_schema ilike '{{ relation.schema }}' {% endif %} - {% if relation.database %} - and table_catalog ilike '{{ relation.database }}' - {% endif %} -order by ordinal_position -{% endmacro %} - -{% macro postgres__get_columns_in_relation_sql_dosmo(relation) %} -{#- -From: https://github.com/dbt-labs/dbt/blob/23484b18b71010f701b5312f920f04529ceaa6b2/plugins/postgres/dbt/include/postgres/macros/adapters.sql#L32 -Edited to include ordinal_position --#} -select - ordinal_position, - column_name, - data_type, - character_maximum_length, - numeric_precision, - numeric_scale - -from {{ relation.information_schema("columns") }} -where - table_name = '{{ relation.identifier }}' - {% if relation.schema %} and table_schema = '{{ relation.schema }}' {% endif %} -order by ordinal_position -{% endmacro %} - - -{% macro bigquery__get_columns_in_relation_sql_dosmo(relation) %} - -select ordinal_position, column_name, data_type - -from `{{ relation.database }}`.`{{ relation.schema }}`.information_schema.columns -where table_name = '{{ relation.identifier }}' - -{% endmacro %} - - -- - - {% macro _dbt_osmosis_compare_column_values( - a_query, b_query, primary_key, column_to_compare -) -%} - {{ - return( - adapter.dispatch("_dbt_osmosis_compare_column_values")( - a_query, b_query, primary_key, column_to_compare - ) - ) - }} - {%- endmacro %} - -{% macro default___dbt_osmosis_compare_column_values( - a_query, b_query, primary_key, column_to_compare -) -%} -with - - a_query as ({{ a_query }}), - - b_query as ({{ b_query }}), - - joined as ( - select - coalesce( - a_query.{{ primary_key }}, b_query.{{ primary_key }} - ) as {{ primary_key }}, - a_query.{{ column_to_compare }} as a_query_value, - b_query.{{ column_to_compare }} as b_query_value, - case - when a_query.{{ column_to_compare }} = b_query.{{ column_to_compare }} - then '✅: perfect match' - when - a_query.{{ column_to_compare }} is null - and b_query.{{ column_to_compare }} is null - then '✅: both are null' - when a_query.{{ primary_key }} is null - then '🤷: ‍missing from a' - when b_query.{{ primary_key }} is null - then '🤷: missing from b' - when a_query.{{ column_to_compare }} is null - then '🤷: value is null in a only' - when b_query.{{ column_to_compare }} is null - then '🤷: value is null in b only' - when a_query.{{ column_to_compare }} != b_query.{{ column_to_compare }} - then '🙅: ‍values do not match' - else 'unknown' -- this should never happen - end as match_status, - case - when a_query.{{ 
column_to_compare }} = b_query.{{ column_to_compare }} - then 0 - when - a_query.{{ column_to_compare }} is null - and b_query.{{ column_to_compare }} is null - then 1 - when a_query.{{ primary_key }} is null - then 2 - when b_query.{{ primary_key }} is null - then 3 - when a_query.{{ column_to_compare }} is null - then 4 - when b_query.{{ column_to_compare }} is null - then 5 - when a_query.{{ column_to_compare }} != b_query.{{ column_to_compare }} - then 6 - else 7 -- this should never happen - end as match_order - - from a_query - - full outer join b_query on a_query.{{ primary_key }} = b_query.{{ primary_key }} - ), - - aggregated as ( - select - '{{ column_to_compare }}' as column_name, - match_status, - match_order, - count(*) as count_records - from joined - - group by column_name, match_status, match_order - ) - -select - column_name, - match_status, - count_records, - round(100.0 * count_records / sum(count_records) over (), 2) as percent_of_total - -from aggregated - -order by match_order - -{% endmacro %} diff --git a/src/dbt_osmosis/core/diff.py b/src/dbt_osmosis/core/diff.py deleted file mode 100644 index 7ce4f0f..0000000 --- a/src/dbt_osmosis/core/diff.py +++ /dev/null @@ -1,188 +0,0 @@ -import hashlib -from pathlib import Path -from typing import Tuple - -import agate -from dbt.adapters.base.relation import BaseRelation -from git import Repo - -from dbt_osmosis.core.log_controller import logger -from dbt_osmosis.vendored.dbt_core_interface.project import DbtProject - - -def build_diff_queries(model: str, runner: DbtProject) -> Tuple[str, str]: - """Leverage git to build two temporary tables for diffing the results of a query - throughout a change - """ - # Resolve git node - node = runner.get_ref_node(model) - dbt_path = Path(node.root_path) - repo = Repo(dbt_path, search_parent_directories=True) - t = next(Path(repo.working_dir).rglob(node.original_file_path)).relative_to(repo.working_dir) - sha = repo.head.object.hexsha - target = repo.head.object.tree[str(t)] - - # Create original node - git_node_name = "z_" + sha[-7:] - original_node = runner.get_server_node(target.data_stream.read().decode("utf-8"), git_node_name) - - # Alias changed node - changed_node = node - - # Compile models - original_node = runner.compile_node(original_node) - changed_node = runner.compile_node(changed_node) - - return original_node.compiled_sql, changed_node.compiled_sql - - -def build_diff_tables(model: str, runner: DbtProject) -> Tuple[BaseRelation, BaseRelation]: - """Leverage git to build two temporary tables for diffing the results of a query throughout a change""" - # Resolve git node - node = runner.get_ref_node(model) - dbt_path = Path(node.root_path) - repo = Repo(dbt_path, search_parent_directories=True) - t = next(Path(repo.working_dir).rglob(node.original_file_path)).relative_to(repo.working_dir) - sha = repo.head.object.hexsha - target = repo.head.object.tree[str(t)] - - # Create original node - git_node_name = "z_" + sha[-7:] - original_node = runner.get_server_node(target.data_stream.read().decode("utf-8"), git_node_name) - - # Alias changed node - changed_node = node - - # Compile models - original_node = runner.compile_node(original_node).node - changed_node = runner.compile_node(changed_node).node - - # Lookup and resolve original ref based on git sha - git_node_parts = original_node.database, "dbt_diff", git_node_name - ref_A, did_exist = runner.get_or_create_relation(*git_node_parts) - if not did_exist: - logger().info("Creating new relation for %s", ref_A) - with 
runner.adapter.connection_named("dbt-osmosis"): - runner.execute_macro( - "create_schema", - kwargs={"relation": ref_A}, - ) - runner.execute_macro( - "create_table_as", - kwargs={ - "sql": original_node.compiled_sql, - "relation": ref_A, - "temporary": True, - }, - run_compiled_sql=True, - ) - - # Resolve modified fake ref based on hash of it compiled SQL - temp_node_name = "z_" + hashlib.md5(changed_node.compiled_sql.encode("utf-8")).hexdigest()[-7:] - git_node_parts = original_node.database, "dbt_diff", temp_node_name - ref_B, did_exist = runner.get_or_create_relation(*git_node_parts) - if not did_exist: - ref_B = runner.adapter.Relation.create(*git_node_parts) - logger().info("Creating new relation for %s", ref_B) - with runner.adapter.connection_named("dbt-osmosis"): - runner.execute_macro( - "create_schema", - kwargs={"relation": ref_B}, - ) - runner.execute_macro( - "create_table_as", - kwargs={ - "sql": original_node.compiled_sql, - "relation": ref_B, - "temporary": True, - }, - run_compiled_sql=True, - ) - - return ref_A, ref_B - - -def diff_tables( - ref_A: BaseRelation, - ref_B: BaseRelation, - pk: str, - runner: DbtProject, - aggregate: bool = True, -) -> agate.Table: - logger().info("Running diff") - _, table = runner.adapter_execute( - runner.execute_macro( - "_dbt_osmosis_compare_relations_agg" if aggregate else "_dbt_osmosis_compare_relations", - kwargs={ - "a_relation": ref_A, - "b_relation": ref_B, - "primary_key": pk, - }, - ), - auto_begin=True, - fetch=True, - ) - return table - - -def diff_queries( - sql_A: str, sql_B: str, pk: str, runner: DbtProject, aggregate: bool = True -) -> agate.Table: - logger().info("Running diff") - _, table = runner.adapter_execute( - runner.execute_macro( - "_dbt_osmosis_compare_queries_agg" if aggregate else "_dbt_osmosis_compare_queries", - kwargs={ - "a_query": sql_A, - "b_query": sql_B, - "primary_key": pk, - }, - ), - auto_begin=True, - fetch=True, - ) - return table - - -def diff_and_print_to_console( - model: str, - pk: str, - runner: DbtProject, - make_temp_tables: bool = False, - agg: bool = True, - output: str = "table", -) -> None: - """ - Compare two tables and print the results to the console - """ - if make_temp_tables: - table = diff_tables(*build_diff_tables(model, runner), pk, runner, agg) - else: - table = diff_queries(*build_diff_queries(model, runner), pk, runner, agg) - print("") - output = output.lower() - if output == "table": - table.print_table() - elif output in ("chart", "bar"): - if not agg: - logger().warn( - "Cannot render output format chart with --no-agg option, defaulting to table" - ) - table.print_table() - else: - _table = table.compute( - [ - ( - "in_original, in_changed", - agate.Formula(agate.Text(), lambda r: "%(in_a)s, %(in_b)s" % r), - ) - ] - ) - _table.print_bars( - label_column_name="in_original, in_changed", value_column_name="count" - ) - elif output == "csv": - table.to_csv("dbt-osmosis-diff.csv") - else: - logger().warn("No such output format %s, defaulting to table", output) - table.print_table() diff --git a/src/dbt_osmosis/core/macros.py b/src/dbt_osmosis/core/macros.py deleted file mode 100644 index ba788b4..0000000 --- a/src/dbt_osmosis/core/macros.py +++ /dev/null @@ -1,14 +0,0 @@ -from pathlib import Path - -from dbt_osmosis.core.log_controller import logger -from dbt_osmosis.core.osmosis import DbtProject - - -def inject_macros(dbt: DbtProject) -> None: - logger().info("Injecting macros, please wait...") - macro_overrides = {} - for node in dbt.macro_parser.parse_remote( - 
(Path(__file__).parent / "audit_macros.jinja2").read_text()
-    ):
-        macro_overrides[node.unique_id] = node
-    dbt.dbt.macros.update(macro_overrides)
diff --git a/src/dbt_osmosis/core/osmosis.py b/src/dbt_osmosis/core/osmosis.py
index b5fa875..a4d5c05 100644
--- a/src/dbt_osmosis/core/osmosis.py
+++ b/src/dbt_osmosis/core/osmosis.py
@@ -1,25 +1,16 @@
 import json
 import os
 import re
+import sys
+import typing as t
 from collections import OrderedDict
+from collections.abc import Iterable, Iterator, MutableMapping, Sequence
 from concurrent.futures import ThreadPoolExecutor, wait
 from dataclasses import dataclass, field
 from functools import lru_cache
 from itertools import chain
 from pathlib import Path
 from threading import Lock
-from typing import (
-    Any,
-    Dict,
-    Iterable,
-    Iterator,
-    List,
-    MutableMapping,
-    Optional,
-    Sequence,
-    Set,
-    Tuple,
-)
 
 import ruamel.yaml
 from dbt.adapters.base.column import Column
@@ -53,7 +44,7 @@ def __init__(self, **kwargs) -> None:
 @dataclass
 class SchemaFileLocation:
     target: Path
-    current: Optional[Path] = None
+    current: Path | None = None
     node_type: NodeType = NodeType.Model
 
     @property
@@ -63,15 +54,16 @@ def is_valid(self) -> bool:
 
 @dataclass
 class SchemaFileMigration:
-    output: Dict[str, Any] = field(default_factory=dict)
-    supersede: Dict[Path, List[str]] = field(default_factory=dict)
+    output: dict[str, t.Any] = field(default_factory=dict)
+    supersede: dict[Path, list[str]] = field(default_factory=dict)
 
 
+@(t.final if sys.version_info >= (3, 8) else lambda f: f)
 class DbtYamlManager(DbtProject):
     """The DbtYamlManager class handles developer automation tasks surrounding schema yaml files
     organization, documentation, and coverage."""
 
-    audit_report = """
+    audit_report: t.ClassVar[str] = """
    :white_check_mark: [bold]Audit Report[/bold]
    -------------------------------
 
@@ -91,7 +83,7 @@ class DbtYamlManager(DbtProject):
     # TODO: Let user supply a custom arg / config file / csv of strings which we
     # consider placeholders which are not valid documentation, these are just my own
     # We may well drop the placeholder concept too. 
It is just a convenience for refactors - placeholders = [ + placeholders: t.ClassVar[list[str]] = [ "Pending further documentation", "Pending further documentation.", "No description for this column", @@ -105,18 +97,18 @@ class DbtYamlManager(DbtProject): # NOTE: we use an arbitrarily large TTL since the YAML manager is not # a long-running service which needs to periodically invalidate and refresh - ADAPTER_TTL = 1e9 + ADAPTER_TTL: t.ClassVar[float] = 1e9 def __init__( self, - target: Optional[str] = None, - profiles_dir: Optional[str] = None, - project_dir: Optional[str] = None, - catalog_file: Optional[str] = None, - threads: Optional[int] = 1, - fqn: Optional[str] = None, + target: str | None = None, + profiles_dir: str | None = None, + project_dir: str | None = None, + catalog_file: str | None = None, + threads: int | None = 1, + fqn: str | None = None, dry_run: bool = False, - models: Optional[List[str]] = None, + models: list[str] | None = None, skip_add_columns: bool = False, skip_add_tags: bool = False, skip_add_data_types: bool = False, @@ -124,19 +116,19 @@ def __init__( char_length: bool = False, skip_merge_meta: bool = False, add_progenitor_to_meta: bool = False, - vars: Optional[str] = None, + vars: str | None = None, use_unrendered_descriptions: bool = False, - profile: Optional[str] = None, - add_inheritance_for_specified_keys: Optional[List[str]] = None, + profile: str | None = None, + add_inheritance_for_specified_keys: list[str] | None = None, output_to_lower: bool = False, ): """Initializes the DbtYamlManager class.""" - super().__init__(target, profiles_dir, project_dir, threads, vars=vars, profile=profile) + super().__init__(target, profiles_dir, project_dir, threads, vars=vars, profile=profile) # pyright: ignore[reportArgumentType] self.fqn = fqn self.models = models or [] self.dry_run = dry_run self.catalog_file = catalog_file - self._catalog: Optional[CatalogArtifact] = None + self._catalog: CatalogArtifact | None = None self.skip_add_columns = skip_add_columns self.skip_add_tags = skip_add_tags self.skip_add_data_types = skip_add_data_types @@ -156,7 +148,7 @@ def __init__( ) logger().info( "Please supply a valid fqn segment if using --fqn or a valid model name, path, or" - " subpath if using positional arguments" + + " subpath if using positional arguments" ) exit(0) @@ -240,14 +232,14 @@ def _filter_model(self, node: ManifestNode) -> bool: ) @staticmethod - def get_patch_path(node: ManifestNode) -> Optional[Path]: + def get_patch_path(node: ManifestNode) -> Path | None: """Returns the patch path for a node if it exists""" if node is not None and node.patch_path: return as_path(node.patch_path.split("://")[-1]) def filtered_models( - self, subset: Optional[MutableMapping[str, ManifestNode]] = None - ) -> Iterator[Tuple[str, ManifestNode]]: + self, subset: MutableMapping[str, ManifestNode] | None = None + ) -> Iterator[tuple[str, ManifestNode]]: """Generates an iterator of valid models""" for unique_id, dbt_node in ( subset.items() @@ -257,7 +249,7 @@ def filtered_models( if self._filter_model(dbt_node): yield unique_id, dbt_node - def get_osmosis_path_spec(self, node: ManifestNode) -> Optional[str]: + def get_osmosis_path_spec(self, node: ManifestNode) -> str | None: """Validates a config string. 
If input is a source, we return the resource type str instead @@ -286,7 +278,7 @@ def get_node_path(self, node: ManifestNode): """Resolve absolute file path for a manifest node""" return as_path(self.config.project_root, node.original_file_path).resolve() - def get_schema_path(self, node: ManifestNode) -> Optional[Path]: + def get_schema_path(self, node: ManifestNode) -> Path | None: """Resolve absolute schema file path for a manifest node""" schema_path = None if node.resource_type == NodeType.Model and node.patch_path: @@ -327,7 +319,7 @@ def get_catalog_key(node: ManifestNode) -> CatalogKey: return CatalogKey(node.database, node.schema, getattr(node, "identifier", node.name)) return CatalogKey(node.database, node.schema, getattr(node, "alias", node.name)) - def get_base_model(self, node: ManifestNode, output_to_lower: bool) -> Dict[str, Any]: + def get_base_model(self, node: ManifestNode, output_to_lower: bool) -> dict[str, t.Any]: """Construct a base model object with model name, column names populated from database""" columns = self.get_columns(self.get_catalog_key(node), output_to_lower) return { @@ -336,10 +328,10 @@ def get_base_model(self, node: ManifestNode, output_to_lower: bool) -> Dict[str, } def augment_existing_model( - self, documentation: Dict[str, Any], node: ManifestNode, output_to_lower: bool - ) -> Dict[str, Any]: + self, documentation: dict[str, t.Any], node: ManifestNode, output_to_lower: bool + ) -> dict[str, t.Any]: """Injects columns from database into existing model if not found""" - model_columns: List[str] = [c["name"] for c in documentation.get("columns", [])] + model_columns: list[str] = [c["name"] for c in documentation.get("columns", [])] database_columns = self.get_columns(self.get_catalog_key(node), output_to_lower) for column in ( c for c in database_columns if not any(c.lower() == m.lower() for m in model_columns) @@ -357,13 +349,13 @@ def augment_existing_model( ) return documentation - def get_columns(self, catalog_key: CatalogKey, output_to_lower: bool) -> List[str]: + def get_columns(self, catalog_key: CatalogKey, output_to_lower: bool) -> list[str]: """Get all columns in a list for a model""" return list(self.get_columns_meta(catalog_key, output_to_lower).keys()) @property - def catalog(self) -> Optional[CatalogArtifact]: + def catalog(self) -> CatalogArtifact | None: """Get the catalog data from the catalog file Catalog data is cached in memory to avoid reading and parsing the file multiple times @@ -391,13 +383,13 @@ def _get_column_type(self, column: Column) -> str: @lru_cache(maxsize=5000) def get_columns_meta( self, catalog_key: CatalogKey, output_to_lower: bool = False - ) -> Dict[str, ColumnMetadata]: + ) -> dict[str, ColumnMetadata]: """Get all columns in a list for a model""" columns = OrderedDict() blacklist = self.config.vars.vars.get("dbt-osmosis", {}).get("_blacklist", []) # If we provide a catalog, we read from it if self.catalog: - matching_models_or_sources: List[CatalogTable] = [ + matching_models_or_sources: list[CatalogTable] = [ model_or_source_values for model_or_source, model_or_source_values in dict( **self.catalog.nodes, **self.catalog.sources @@ -548,7 +540,7 @@ def bootstrap_sources(self, output_to_lower: bool = False) -> None: logger().info("...reloading project to pick up new sources") self.safe_parse_project(reinit=True) - def build_schema_folder_mapping(self, output_to_lower: bool) -> Dict[str, SchemaFileLocation]: + def build_schema_folder_mapping(self, output_to_lower: bool) -> dict[str, SchemaFileLocation]: 
"""Builds a mapping of models or sources to their existing and target schema file paths""" # Resolve target nodes @@ -605,8 +597,8 @@ def _draft( # Model/Source Is Documented but Must be Migrated with self.mutex: schema = self.yaml_handler.load(schema_file.current) - models_in_file: Sequence[Dict[str, Any]] = schema.get("models", []) - sources_in_file: Sequence[Dict[str, Any]] = schema.get("sources", []) + models_in_file: Sequence[dict[str, t.Any]] = schema.get("models", []) + sources_in_file: Sequence[dict[str, t.Any]] = schema.get("sources", []) for documented_model in ( model for model in models_in_file if model["name"] == node.name ): @@ -669,7 +661,7 @@ def _draft( def draft_project_structure_update_plan( self, output_to_lower: bool = False - ) -> Dict[Path, SchemaFileMigration]: + ) -> dict[Path, SchemaFileMigration]: """Build project structure update plan based on `dbt-osmosis:` configs set across dbt_project.yml and model files. The update plan includes injection of undocumented models. Unless this plan is constructed and executed by the `commit_project_restructure` function, @@ -682,7 +674,7 @@ def draft_project_structure_update_plan( """ # Container for output - blueprint: Dict[Path, SchemaFileMigration] = {} + blueprint: dict[Path, SchemaFileMigration] = {} logger().info( ":chart_increasing: Searching project stucture for required updates and building action" " plan" @@ -711,7 +703,7 @@ def cleanup_blueprint(self, blueprint: dict) -> None: def commit_project_restructure_to_disk( self, - blueprint: Optional[Dict[Path, SchemaFileMigration]] = None, + blueprint: dict[Path, SchemaFileMigration] | None = None, output_to_lower: bool = False, ) -> bool: """Given a project restrucure plan of pathlib Paths to a mapping of output and supersedes @@ -720,9 +712,9 @@ def commit_project_restructure_to_disk( as needed. Args: - blueprint (Dict[Path, SchemaFileMigration]): Project restructure plan as typically + blueprint (dict[Path, SchemaFileMigration]): Project restructure plan as typically created by `build_project_structure_update_plan` - output_to_lower (bool): Set column casing to lowercase. + output_to_lower (bool): set column casing to lowercase. 
Returns:
             bool: True if the project was restructured, False if no action was required
@@ -759,7 +751,7 @@ def commit_project_restructure_to_disk(
             else:
                 # Update File
                 logger().info(":toolbox: Updating schema file %s", target.name)
-                target_schema: Optional[Dict[str, Any]] = self.yaml_handler.load(target)
+                target_schema: dict[str, t.Any] | None = self.yaml_handler.load(target)
                 # Add version if not present
                 if not target_schema:
                     target_schema = {"version": 2}
@@ -776,7 +768,7 @@ def commit_project_restructure_to_disk(
 
         # Clean superseded schema files
         for dir, nodes in structure.supersede.items():
-            raw_schema: Dict[str, Any] = self.yaml_handler.load(dir)
+            raw_schema: dict[str, t.Any] = self.yaml_handler.load(dir)
             # Gather models and sources marked for superseding
             models_marked_for_superseding = set(
                 node.name for node in nodes if node.resource_type == NodeType.Model
@@ -793,7 +785,7 @@ def commit_project_restructure_to_disk(
                 for s in raw_schema.get("sources", [])
                 for t in s.get("tables", [])
             )
-            # Set difference to determine non-superseded models and sources
+            # set difference to determine non-superseded models and sources
             non_superseded_models = models_in_schema - models_marked_for_superseding
             non_superseded_sources = sources_in_schema - sources_marked_for_superseding
             if len(non_superseded_models) + len(non_superseded_sources) == 0:
@@ -835,7 +827,7 @@ def commit_project_restructure_to_disk(
         return True
 
     @staticmethod
-    def pretty_print_restructure_plan(blueprint: Dict[Path, SchemaFileMigration]) -> None:
+    def pretty_print_restructure_plan(blueprint: dict[Path, SchemaFileMigration]) -> None:
         logger().info(
             list(
                 map(
@@ -854,7 +846,7 @@ def get_column_sets(
         database_columns: Iterable[str],
         yaml_columns: Iterable[str],
         documented_columns: Iterable[str],
-    ) -> Tuple[List[str], List[str], List[str]]:
+    ) -> tuple[list[str], list[str], list[str]]:
         """Returns:
         missing_columns: Columns in database not in dbt -- will be injected into schema file
         undocumented_columns: Columns missing documentation -- descriptions will be inherited and
@@ -876,7 +868,7 @@ def _run(
         self,
         unique_id: str,
         node: ManifestNode,
-        schema_map: Dict[str, SchemaFileLocation],
+        schema_map: dict[str, SchemaFileLocation],
         force_inheritance: bool = False,
         output_to_lower: bool = False,
     ):
@@ -884,7 +876,7 @@ def _run(
         with self.mutex:
             logger().info(":point_right: Processing model: [bold]%s[/bold]", unique_id)
         # Get schema file path, must exist to propagate documentation
-        schema_path: Optional[SchemaFileLocation] = schema_map.get(unique_id)
+        schema_path: SchemaFileLocation | None = schema_map.get(unique_id)
         if schema_path is None or schema_path.current is None:
             with self.mutex:
                 logger().info(
                 )  # We can't take action
                 return
 
-        # Build Sets
+        # Build sets
         logger().info(":mag: Resolving columns in database")
         database_columns_ordered = self.get_columns(self.get_catalog_key(node), output_to_lower)
         columns_db_meta = self.get_columns_meta(self.get_catalog_key(node), output_to_lower)
-        database_columns: Set[str] = set(database_columns_ordered)
+        database_columns: set[str] = set(database_columns_ordered)
         yaml_columns_ordered = [column for column in node.columns]
-        yaml_columns: Set[str] = set(yaml_columns_ordered)
+        yaml_columns: set[str] = set(yaml_columns_ordered)
 
         if not database_columns:
             with self.mutex:
 
             database_columns = yaml_columns
 
         # Get documented columns
-        documented_columns: Set[str] = set(
+        documented_columns: set[str] = set(
             column for column, info in 
node.columns.items() if info.description and info.description not in self.placeholders @@ -1056,7 +1048,7 @@ def propagate_documentation_downstream( def remove_columns_not_in_database( extra_columns: Iterable[str], node: ManifestNode, - yaml_file_model_section: Dict[str, Any], + yaml_file_model_section: dict[str, t.Any], ) -> int: """Removes columns found in dbt model that do not exist in database from both node and model simultaneously @@ -1076,11 +1068,11 @@ def remove_columns_not_in_database( def update_columns_attribute( self, node: ManifestNode, - yaml_file_model_section: Dict[str, Any], - columns_db_meta: Dict[str, ColumnMetadata], + yaml_file_model_section: dict[str, t.Any], + columns_db_meta: dict[str, ColumnMetadata], attribute_name: str, meta_key: str, - skip_attribute_update: Any, + skip_attribute_update: t.Any, output_to_lower: bool = False, ) -> int: changes_committed = 0 @@ -1114,8 +1106,8 @@ def update_columns_attribute( def add_missing_cols_to_node_and_model( missing_columns: Iterable, node: ManifestNode, - yaml_file_model_section: Dict[str, Any], - columns_db_meta: Dict[str, ColumnMetadata], + yaml_file_model_section: dict[str, t.Any], + columns_db_meta: dict[str, ColumnMetadata], output_to_lower: bool, ) -> int: """Add missing columns to node and model simultaneously @@ -1164,10 +1156,10 @@ def update_schema_file_and_node( undocumented_columns: Iterable[str], extra_columns: Iterable[str], node: ManifestNode, - section: Dict[str, Any], - columns_db_meta: Dict[str, ColumnMetadata], + section: dict[str, t.Any], + columns_db_meta: dict[str, ColumnMetadata], output_to_lower: bool, - ) -> Tuple[int, int, int, int, int]: + ) -> tuple[int, int, int, int, int]: """Take action on a schema file mirroring changes in the node.""" logger().info(":microscope: Looking for actions for %s", node.unique_id) n_cols_added = 0 @@ -1224,8 +1216,8 @@ def update_schema_file_and_node( @staticmethod def maybe_get_section_from_schema_file( - yaml_file: Dict[str, Any], node: ManifestNode - ) -> Optional[Dict[str, Any]]: + yaml_file: dict[str, t.Any], node: ManifestNode + ) -> dict[str, t.Any] | None: """Get the section of a schema file that corresponds to a node.""" if node.resource_type == NodeType.Source: section = next(