Skip to content

Commit

Permalink
Support sql check
Browse files Browse the repository at this point in the history
  • Loading branch information
Michiel De Smet committed Nov 6, 2024
1 parent f20c81b commit 6d04ff0
Show file tree
Hide file tree
Showing 5 changed files with 142 additions and 0 deletions.
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ def read(*names, **kwargs):
"ruamel.yaml==0.18.6",
"tabulate==0.9.0",
"requests==2.31.0",
"sqlglot",
],
extras_require={
# eg:
Expand Down
2 changes: 2 additions & 0 deletions src/datapilot/core/platforms/dbt/insights/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
from datapilot.core.platforms.dbt.insights.modelling.unused_sources import DBTUnusedSources
from datapilot.core.platforms.dbt.insights.performance.chain_view_linking import DBTChainViewLinking
from datapilot.core.platforms.dbt.insights.performance.exposure_parent_materializations import DBTExposureParentMaterialization
from datapilot.core.platforms.dbt.insights.sql.sql_check import SqlCheck
from datapilot.core.platforms.dbt.insights.structure.model_directories_structure import DBTModelDirectoryStructure
from datapilot.core.platforms.dbt.insights.structure.model_naming_conventions import DBTModelNamingConvention
from datapilot.core.platforms.dbt.insights.structure.source_directories_structure import DBTSourceDirectoryStructure
Expand Down Expand Up @@ -112,4 +113,5 @@
CheckSourceHasTests,
CheckSourceTableHasDescription,
CheckSourceTags,
SqlCheck,
]
Empty file.
23 changes: 23 additions & 0 deletions src/datapilot/core/platforms/dbt/insights/sql/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
from abc import abstractmethod
from typing import Tuple

from datapilot.core.platforms.dbt.insights.base import DBTInsight


class SqlInsight(DBTInsight):
TYPE = "governance"

@abstractmethod
def generate(self, *args, **kwargs) -> dict:
pass

@classmethod
def has_all_required_data(cls, has_manifest: bool, **kwargs) -> Tuple[bool, str]:
"""
Check if all required data is available for the insight to run.
:param has_manifest: A boolean indicating if manifest is available.
:return: A boolean indicating if all required data is available.
"""
if not has_manifest:
return False, "manifest is required for insight to run."
return True, ""
116 changes: 116 additions & 0 deletions src/datapilot/core/platforms/dbt/insights/sql/sql_check.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
import inspect
from typing import List

from sqlglot import parse_one
from sqlglot.optimizer.eliminate_ctes import eliminate_ctes
from sqlglot.optimizer.eliminate_joins import eliminate_joins
from sqlglot.optimizer.eliminate_subqueries import eliminate_subqueries
from sqlglot.optimizer.normalize import normalize
from sqlglot.optimizer.pushdown_projections import pushdown_projections
from sqlglot.optimizer.qualify import qualify
from sqlglot.optimizer.unnest_subqueries import unnest_subqueries

from datapilot.core.insights.sql.base.insight import SqlInsight
from datapilot.core.insights.utils import get_severity
from datapilot.core.platforms.dbt.insights.schema import DBTInsightResult, DBTModelInsightResponse

RULES = (
pushdown_projections,
normalize,
unnest_subqueries,
eliminate_subqueries,
eliminate_joins,
eliminate_ctes,
)

class SqlCheck(SqlInsight):
"""
This class identifies DBT models with test coverage below a specified threshold.
It aims to ensure that a minimum percentage of tests are applied to each model to maintain data integrity.
"""

NAME = "sql optimization issues"
ALIAS = "check_sql_optimization"
DESCRIPTION = "Checks if the model has SQL optimization issues. "
REASON_TO_FLAG = "The query can be optimized."
FAILURE_MESSAGE = "The query for model `{model_unique_id}` has optimization opportunities:\n{rule_name}. "
RECOMMENDATION = "Please adapt the query of the model `{model_unique_id}` as in following example:\n{optimized_sql}"

def _build_failure_result(self, model_unique_id: str, rule_name: str, optimized_sql: str) -> DBTInsightResult:
"""
Constructs a failure result for a given model with low test coverage.
:param coverage: The calculated test coverage percentage for the model.
:param min_coverage: The minimum required test coverage percentage.
:return: An instance of DBTInsightResult containing failure details.
"""
failure_message = self.FAILURE_MESSAGE.format(model_unique_id=model_unique_id, rule_name=rule_name)
recommendation = self.RECOMMENDATION.format(model_unique_id=model_unique_id, optimized_sql=optimized_sql)
return DBTInsightResult(
type=self.TYPE,
name=self.NAME,
message=failure_message,
recommendation=recommendation,
reason_to_flag=self.REASON_TO_FLAG,
metadata={"model_unique_id": model_unique_id, "rule_name": rule_name},
)

def generate(self, *args, **kwargs) -> List[DBTModelInsightResponse]:
"""
Generates insights for each DBT model in the project, focusing on test coverage.
:return: A list of DBTModelInsightResponse objects with insights for each model.
"""
self.logger.debug("Generating sql insights for DBT models")
insights = []

possible_kwargs = {
"db": None,
"catalog": None,
"dialect": self.dialect,
"isolate_tables": True, # needed for other optimizations to perform well
"quote_identifiers": False,
**kwargs,
}
raise "test"
for node_id, node in self.nodes.items():
parsed_query = node.compiled_code
qualified = qualify(parsed_query, **possible_kwargs)
changed = qualified.copy()
for rule in RULES:
original = changed.copy()
rule_params = inspect.getfullargspec(rule).args
rule_kwargs = {
param: possible_kwargs[param]
for param in rule_params
if param in possible_kwargs
}
changed = rule(changed, **rule_kwargs)
if changed.sql() != original.sql():
insights.append(
DBTModelInsightResponse(
unique_id=node_id,
package_name=node.package_name,
path=node.original_file_path,
original_file_path=node.original_file_path,
insight=self._build_failure_result(
node_id,
rule.__name__,
),
severity=get_severity(self.config, self.ALIAS, self.DEFAULT_SEVERITY),
)
)

return insights

def parse_query(
query: str,
dialect: str = "snowflake",
):
"""
Parses the query and returns the parsed query object
"""
try:
parsed = parse_one(query, read=dialect)
except Exception as e:
parsed = None
return parsed

0 comments on commit 6d04ff0

Please sign in to comment.