From 6d04ff0e768ac7b992e7fb63e4e773bac754fbbc Mon Sep 17 00:00:00 2001 From: Michiel De Smet Date: Wed, 6 Nov 2024 11:02:03 +0800 Subject: [PATCH] Support sql check --- setup.py | 1 + .../core/platforms/dbt/insights/__init__.py | 2 + .../platforms/dbt/insights/sql/__init__.py | 0 .../core/platforms/dbt/insights/sql/base.py | 23 ++++ .../platforms/dbt/insights/sql/sql_check.py | 116 ++++++++++++++++++ 5 files changed, 142 insertions(+) create mode 100644 src/datapilot/core/platforms/dbt/insights/sql/__init__.py create mode 100644 src/datapilot/core/platforms/dbt/insights/sql/base.py create mode 100644 src/datapilot/core/platforms/dbt/insights/sql/sql_check.py diff --git a/setup.py b/setup.py index 7551cc27..e8d932a9 100755 --- a/setup.py +++ b/setup.py @@ -67,6 +67,7 @@ def read(*names, **kwargs): "ruamel.yaml==0.18.6", "tabulate==0.9.0", "requests==2.31.0", + "sqlglot", ], extras_require={ # eg: diff --git a/src/datapilot/core/platforms/dbt/insights/__init__.py b/src/datapilot/core/platforms/dbt/insights/__init__.py index d903bb7f..c7cf6b0c 100644 --- a/src/datapilot/core/platforms/dbt/insights/__init__.py +++ b/src/datapilot/core/platforms/dbt/insights/__init__.py @@ -51,6 +51,7 @@ from datapilot.core.platforms.dbt.insights.modelling.unused_sources import DBTUnusedSources from datapilot.core.platforms.dbt.insights.performance.chain_view_linking import DBTChainViewLinking from datapilot.core.platforms.dbt.insights.performance.exposure_parent_materializations import DBTExposureParentMaterialization +from datapilot.core.platforms.dbt.insights.sql.sql_check import SqlCheck from datapilot.core.platforms.dbt.insights.structure.model_directories_structure import DBTModelDirectoryStructure from datapilot.core.platforms.dbt.insights.structure.model_naming_conventions import DBTModelNamingConvention from datapilot.core.platforms.dbt.insights.structure.source_directories_structure import DBTSourceDirectoryStructure @@ -112,4 +113,5 @@ CheckSourceHasTests, 
from abc import abstractmethod
from typing import Tuple

from datapilot.core.platforms.dbt.insights.base import DBTInsight


class SqlInsight(DBTInsight):
    """Base class for insights that analyze the (compiled) SQL of dbt models.

    Subclasses implement :meth:`generate`; only a manifest is required to run.
    """

    TYPE = "governance"

    @abstractmethod
    def generate(self, *args, **kwargs) -> dict:
        """Produce the insight results. Implemented by concrete subclasses."""
        pass

    @classmethod
    def has_all_required_data(cls, has_manifest: bool, **kwargs) -> Tuple[bool, str]:
        """
        Check if all required data is available for the insight to run.

        :param has_manifest: A boolean indicating if the manifest is available.
        :return: A ``(ok, message)`` tuple — ``ok`` is True when all required
            data is present; ``message`` explains what is missing otherwise.
            (FIX: docstring previously claimed a bare boolean return.)
        """
        if not has_manifest:
            return False, "manifest is required for insight to run."
        return True, ""
import inspect
from typing import List

from sqlglot import parse_one
from sqlglot.optimizer.eliminate_ctes import eliminate_ctes
from sqlglot.optimizer.eliminate_joins import eliminate_joins
from sqlglot.optimizer.eliminate_subqueries import eliminate_subqueries
from sqlglot.optimizer.normalize import normalize
from sqlglot.optimizer.pushdown_projections import pushdown_projections
from sqlglot.optimizer.qualify import qualify
from sqlglot.optimizer.unnest_subqueries import unnest_subqueries

from datapilot.core.insights.utils import get_severity
from datapilot.core.platforms.dbt.insights.schema import DBTInsightResult, DBTModelInsightResponse
# FIX: import SqlInsight from the base module this patch creates; the old path
# (datapilot.core.insights.sql.base.insight) does not exist in the package.
from datapilot.core.platforms.dbt.insights.sql.base import SqlInsight

# Optimizer rewrite rules, applied one at a time so each finding can report
# the specific rule that changed the query.
RULES = (
    pushdown_projections,
    normalize,
    unnest_subqueries,
    eliminate_subqueries,
    eliminate_joins,
    eliminate_ctes,
)


class SqlCheck(SqlInsight):
    """
    Flags dbt models whose compiled SQL can be simplified by sqlglot optimizer
    rules (projection pushdown, normalization, subquery/join/CTE elimination).
    One insight is emitted per (model, rule) pair whose application changes
    the query, with the optimized SQL offered as the recommendation.
    """

    NAME = "sql optimization issues"
    ALIAS = "check_sql_optimization"
    DESCRIPTION = "Checks if the model has SQL optimization issues. "
    REASON_TO_FLAG = "The query can be optimized."
    FAILURE_MESSAGE = "The query for model `{model_unique_id}` has optimization opportunities:\n{rule_name}. "
    RECOMMENDATION = "Please adapt the query of the model `{model_unique_id}` as in following example:\n{optimized_sql}"

    def _build_failure_result(self, model_unique_id: str, rule_name: str, optimized_sql: str) -> DBTInsightResult:
        """
        Construct a failure result for a model with an optimization opportunity.

        :param model_unique_id: Unique id of the flagged model.
        :param rule_name: Name of the sqlglot rule that changed the query.
        :param optimized_sql: The query after applying the rule, shown to the user.
        :return: An instance of DBTInsightResult containing failure details.
        """
        failure_message = self.FAILURE_MESSAGE.format(model_unique_id=model_unique_id, rule_name=rule_name)
        recommendation = self.RECOMMENDATION.format(model_unique_id=model_unique_id, optimized_sql=optimized_sql)
        return DBTInsightResult(
            type=self.TYPE,
            name=self.NAME,
            message=failure_message,
            recommendation=recommendation,
            reason_to_flag=self.REASON_TO_FLAG,
            metadata={"model_unique_id": model_unique_id, "rule_name": rule_name},
        )

    def generate(self, *args, **kwargs) -> List[DBTModelInsightResponse]:
        """
        Run each optimizer rule over every model's compiled SQL and collect an
        insight for each rule application that produced a different query.

        :return: A list of DBTModelInsightResponse objects with insights per model.
        """
        self.logger.debug("Generating sql insights for DBT models")
        insights = []

        possible_kwargs = {
            "db": None,
            "catalog": None,
            "dialect": self.dialect,
            "isolate_tables": True,  # needed for other optimizations to perform well
            "quote_identifiers": False,
            **kwargs,
        }
        # FIX: removed leftover debug statement `raise "test"` — raising a str is
        # itself a TypeError and made the whole insight unusable.
        for node_id, node in self.nodes.items():
            # FIX: qualify() expects a parsed sqlglot expression, not the raw
            # compiled SQL string; parse first and skip unparseable models.
            parsed_query = self.parse_query(node.compiled_code, dialect=self.dialect)
            if parsed_query is None:
                continue
            qualified = qualify(parsed_query, **possible_kwargs)
            changed = qualified.copy()
            for rule in RULES:
                original = changed.copy()
                # Pass only the keyword arguments this particular rule accepts.
                rule_params = inspect.getfullargspec(rule).args
                rule_kwargs = {param: possible_kwargs[param] for param in rule_params if param in possible_kwargs}
                changed = rule(changed, **rule_kwargs)
                if changed.sql() != original.sql():
                    insights.append(
                        DBTModelInsightResponse(
                            unique_id=node_id,
                            package_name=node.package_name,
                            path=node.original_file_path,
                            original_file_path=node.original_file_path,
                            # FIX: the required optimized_sql argument was missing,
                            # raising TypeError on every finding.
                            insight=self._build_failure_result(
                                node_id,
                                rule.__name__,
                                changed.sql(),
                            ),
                            severity=get_severity(self.config, self.ALIAS, self.DEFAULT_SEVERITY),
                        )
                    )

        return insights

    # FIX: was declared without `self` (nor @staticmethod), so any method call
    # would have bound `query` to the instance; it was also never used.
    @staticmethod
    def parse_query(query: str, dialect: str = "snowflake"):
        """
        Parse *query* with sqlglot and return the parsed expression, or None
        when parsing fails (callers skip such models instead of crashing).
        """
        try:
            return parse_one(query, read=dialect)
        except Exception:
            return None