Skip to content

Commit

Permalink
fix(ingest/powerbi): m-query fixes (datahub-project#11906)
Browse files Browse the repository at this point in the history
Co-authored-by: Harshal Sheth <[email protected]>
Co-authored-by: Aseem Bansal <[email protected]>
  • Loading branch information
3 people authored Nov 20, 2024
1 parent 218c059 commit 3f267af
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,24 @@ def get_tables(native_query: str) -> List[str]:
def remove_drop_statement(query: str) -> str:
# Certain PowerBI M-Queries contain a combination of DROP and SELECT statements within SQL, causing SQLParser to fail on these queries.
# Therefore, these occurrences are being removed.
# Regular expression to match patterns like "DROP TABLE IF EXISTS #<identifier>;"
pattern = r"DROP TABLE IF EXISTS #\w+;?"

return re.sub(pattern, "", query)
patterns = [
# Regular expression to match patterns like:
# "DROP TABLE IF EXISTS #<identifier>;"
# "DROP TABLE IF EXISTS #<identifier>, <identifier2>, ...;"
# "DROP TABLE IF EXISTS #<identifier>, <identifier2>, ...\n"
r"DROP\s+TABLE\s+IF\s+EXISTS\s+(?:#?\w+(?:,\s*#?\w+)*)[;\n]",
]

new_query = query

for pattern in patterns:
new_query = re.sub(pattern, "", new_query, flags=re.IGNORECASE)

# Remove extra spaces caused by consecutive replacements
new_query = re.sub(r"\s+", " ", new_query).strip()

return new_query


def parse_custom_sql(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,16 @@ def urn_creator(
)


def get_next_item(items: List[str], item: str) -> Optional[str]:
if item in items:
try:
index = items.index(item)
return items[index + 1]
except IndexError:
logger.debug(f'item:"{item}", not found in item-list: {items}')
return None


class AbstractDataPlatformTableCreator(ABC):
"""
Base class to share common functionalities among different dataplatform for M-Query parsing.
Expand Down Expand Up @@ -675,7 +685,7 @@ def two_level_access_pattern(
data_access_func_detail.arg_list
)
if server is None or db_name is None:
return Lineage.empty() # Return empty list
return Lineage.empty() # Return an empty list

schema_name: str = cast(
IdentifierAccessor, data_access_func_detail.identifier_accessor
Expand Down Expand Up @@ -782,32 +792,38 @@ def create_lineage(
),
)

if len(arguments) == 2:
# It is a regular case of MS-SQL
logger.debug("Handling with regular case")
return self.two_level_access_pattern(data_access_func_detail)

if len(arguments) >= 4 and arguments[2] != "Query":
logger.debug("Unsupported case is found. Second index is not the Query")
return Lineage.empty()
server, database = self.get_db_detail_from_argument(
data_access_func_detail.arg_list
)
if server is None or database is None:
return Lineage.empty() # Return an empty list

assert server
assert database # to silent the lint

query: Optional[str] = get_next_item(arguments, "Query")
if query:
if self.config.enable_advance_lineage_sql_construct is False:
# Use previous parser to generate URN to keep backward compatibility
return Lineage(
upstreams=self.create_urn_using_old_parser(
query=query,
db_name=database,
server=server,
),
column_lineage=[],
)

if self.config.enable_advance_lineage_sql_construct is False:
# Use previous parser to generate URN to keep backward compatibility
return Lineage(
upstreams=self.create_urn_using_old_parser(
query=arguments[3],
db_name=arguments[1],
server=arguments[0],
),
column_lineage=[],
return self.parse_custom_sql(
query=query,
database=database,
server=server,
schema=MSSqlDataPlatformTableCreator.DEFAULT_SCHEMA,
)

return self.parse_custom_sql(
query=arguments[3],
database=arguments[1],
server=arguments[0],
schema=MSSqlDataPlatformTableCreator.DEFAULT_SCHEMA,
)
# It is a regular case of MS-SQL
logger.debug("Handling with regular case")
return self.two_level_access_pattern(data_access_func_detail)


class OracleDataPlatformTableCreator(AbstractDataPlatformTableCreator):
Expand Down Expand Up @@ -1154,27 +1170,19 @@ def get_db_name(self, data_access_tokens: List[str]) -> Optional[str]:
!= SupportedDataPlatform.DatabricksMultiCloud_SQL.value.powerbi_data_platform_name
):
return None
try:
if "Database" in data_access_tokens:
index = data_access_tokens.index("Database")
if data_access_tokens[index + 1] != Constant.M_QUERY_NULL:
# Database name is explicitly set in argument
return data_access_tokens[index + 1]

if "Name" in data_access_tokens:
index = data_access_tokens.index("Name")
# Next element is value of the Name. It is a database name
return data_access_tokens[index + 1]
database: Optional[str] = get_next_item(data_access_tokens, "Database")

if "Catalog" in data_access_tokens:
index = data_access_tokens.index("Catalog")
# Next element is value of the Catalog. In Databricks Catalog can also be used in place of a database.
return data_access_tokens[index + 1]

except IndexError as e:
logger.debug("Database name is not available", exc_info=e)

return None
if (
database and database != Constant.M_QUERY_NULL
): # database name is explicitly set
return database

return get_next_item( # database name is set in Name argument
data_access_tokens, "Name"
) or get_next_item( # If both above arguments are not available, then try Catalog
data_access_tokens, "Catalog"
)

def create_lineage(
self, data_access_func_detail: DataAccessFunctionDetail
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,13 @@ def test_simple_from():

assert len(tables) == 1
assert tables[0] == "OPERATIONS_ANALYTICS.TRANSFORMED_PROD.V_APS_SME_UNITS_V4"


def test_drop_statement():
expected: str = "SELECT#(lf)concat((UPPER(REPLACE(SELLER,'-',''))), MONTHID) as AGENT_KEY,#(lf)concat((UPPER(REPLACE(CLIENT_DIRECTOR,'-',''))), MONTHID) as CD_AGENT_KEY,#(lf) *#(lf)FROM#(lf)OPERATIONS_ANALYTICS.TRANSFORMED_PROD.V_APS_SME_UNITS_V4"

query: str = "DROP TABLE IF EXISTS #table1; DROP TABLE IF EXISTS #table1,#table2; DROP TABLE IF EXISTS table1; DROP TABLE IF EXISTS table1, #table2;SELECT#(lf)concat((UPPER(REPLACE(SELLER,'-',''))), MONTHID) as AGENT_KEY,#(lf)concat((UPPER(REPLACE(CLIENT_DIRECTOR,'-',''))), MONTHID) as CD_AGENT_KEY,#(lf) *#(lf)FROM#(lf)OPERATIONS_ANALYTICS.TRANSFORMED_PROD.V_APS_SME_UNITS_V4"

actual: str = native_sql_parser.remove_drop_statement(query)

assert actual == expected

0 comments on commit 3f267af

Please sign in to comment.