feat(ingest/sql): auto extract and use mode query user metadata (datahub-project#11307)

Co-authored-by: Harshal Sheth <[email protected]>
mayurinehate and hsheth2 authored Sep 9, 2024
1 parent 28310bb commit cf49f80
Showing 10 changed files with 1,010 additions and 181 deletions.
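The core of the change: Mode appends a JSON metadata comment as the last line of every query it issues against the warehouse, and the new ToolMetaExtractor parses that comment to attribute the query to the actual BI user instead of the service account reported by the warehouse. A sketch of such an annotated query (the extractor relies only on the "--" prefix, an "email" key, and a "url" value pointing at https://modeanalytics.com; the table names here are illustrative):

    SELECT o.id, o.amount
    FROM analytics.orders o
    -- {"email":"jdoe@acme.com","url":"https://modeanalytics.com/acme/reports/abc123"}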
16 changes: 0 additions & 16 deletions metadata-ingestion/src/datahub/emitter/mce_builder.py
@@ -50,7 +50,6 @@
UpstreamLineageClass,
_Aspect as AspectAbstract,
)
from datahub.metadata.urns import CorpGroupUrn, CorpUserUrn
from datahub.utilities.urn_encoder import UrnEncoder
from datahub.utilities.urns.data_flow_urn import DataFlowUrn
from datahub.utilities.urns.dataset_urn import DatasetUrn
@@ -225,21 +224,6 @@ def make_user_urn(username: str) -> str:
)


def make_actor_urn(actor: str) -> Union[CorpUserUrn, CorpGroupUrn]:
"""
Makes a user urn if the input is not a user or group urn already
"""
return (
CorpUserUrn(actor)
if not actor.startswith(("urn:li:corpuser:", "urn:li:corpGroup:"))
else (
CorpUserUrn.from_string(actor)
if actor.startswith("urn:li:corpuser:")
else CorpGroupUrn.from_string(actor)
)
)


def make_group_urn(groupname: str) -> str:
"""
Makes a group urn if the input is not a user or group urn already
metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/queries_extractor.py
Expand Up @@ -36,6 +36,7 @@
BigQueryIdentifierBuilder,
)
from datahub.ingestion.source.usage.usage_common import BaseUsageConfig
from datahub.metadata.urns import CorpUserUrn
from datahub.sql_parsing.schema_resolver import SchemaResolver
from datahub.sql_parsing.sql_parsing_aggregator import (
ObservedQuery,
@@ -363,7 +364,9 @@ def _parse_audit_log_row(self, row: BigQueryJob) -> ObservedQuery:
session_id=row["session_id"],
timestamp=row["creation_time"],
user=(
self.identifiers.gen_user_urn(row["user_email"])
CorpUserUrn.from_string(
self.identifiers.gen_user_urn(row["user_email"])
)
if row["user_email"]
else None
),
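ObservedQuery.user is now a typed urn rather than a raw string, hence the wrap around gen_user_urn above. A minimal sketch of the conversion (the urn value is illustrative):

    from datahub.metadata.urns import CorpUserUrn

    # gen_user_urn returns a string urn; wrapping yields the typed urn ObservedQuery expects
    user = CorpUserUrn.from_string("urn:li:corpuser:jdoe@acme.com")  # illustrative value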
metadata-ingestion/src/datahub/ingestion/source/redshift/lineage_v2.py
@@ -31,6 +31,7 @@
from datahub.metadata.urns import DatasetUrn
from datahub.sql_parsing.sql_parsing_aggregator import (
KnownQueryLineageInfo,
ObservedQuery,
SqlParsingAggregator,
)
from datahub.utilities.perf_timer import PerfTimer
@@ -118,11 +119,13 @@ def build(
if self.config.resolve_temp_table_in_lineage:
for temp_row in self._lineage_v1.get_temp_tables(connection=connection):
self.aggregator.add_observed_query(
query=temp_row.query_text,
default_db=self.database,
default_schema=self.config.default_schema,
session_id=temp_row.session_id,
query_timestamp=temp_row.start_time,
ObservedQuery(
query=temp_row.query_text,
default_db=self.database,
default_schema=self.config.default_schema,
session_id=temp_row.session_id,
timestamp=temp_row.start_time,
),
# The "temp table" query actually returns all CREATE TABLE statements, even if they
# aren't explicitly a temp table. As such, setting is_known_temp_table=True
# would not be correct. We already have mechanisms to autodetect temp tables,
@@ -263,11 +266,13 @@ def _process_sql_parser_lineage(self, lineage_row: LineageRow) -> None:
# TODO actor

self.aggregator.add_observed_query(
query=ddl,
default_db=self.database,
default_schema=self.config.default_schema,
query_timestamp=lineage_row.timestamp,
session_id=lineage_row.session_id,
ObservedQuery(
query=ddl,
default_db=self.database,
default_schema=self.config.default_schema,
timestamp=lineage_row.timestamp,
session_id=lineage_row.session_id,
)
)

def _make_filtered_target(self, lineage_row: LineageRow) -> Optional[DatasetUrn]:
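Both call sites above illustrate the PR-wide signature change: add_observed_query now takes a single ObservedQuery value object instead of loose keyword arguments (note that the query_timestamp kwarg became the timestamp field). A minimal usage sketch under assumed names and values:

    from datetime import datetime, timezone

    from datahub.sql_parsing.sql_parsing_aggregator import ObservedQuery

    aggregator.add_observed_query(  # assumes an existing SqlParsingAggregator instance
        ObservedQuery(
            query="CREATE TABLE staging.tmp_orders AS SELECT * FROM public.orders",  # illustrative DDL
            default_db="dev",  # hypothetical database
            default_schema="public",  # hypothetical schema
            session_id="12345",  # temp tables are only visible within the same session
            timestamp=datetime(2024, 9, 9, tzinfo=timezone.utc),
        )
    )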
metadata-ingestion/src/datahub/sql_parsing/sql_parsing_aggregator.py
@@ -16,7 +16,7 @@
import datahub.emitter.mce_builder as builder
import datahub.metadata.schema_classes as models
from datahub.configuration.time_window_config import get_time_bucket
from datahub.emitter.mce_builder import get_sys_time, make_actor_urn, make_ts_millis
from datahub.emitter.mce_builder import get_sys_time, make_ts_millis
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.sql_parsing_builder import compute_upstream_fields
from datahub.ingestion.api.closeable import Closeable
@@ -47,6 +47,10 @@
get_query_fingerprint,
try_format_query,
)
from datahub.sql_parsing.tool_meta_extractor import (
ToolMetaExtractor,
ToolMetaExtractorReport,
)
from datahub.utilities.cooperative_timeout import CooperativeTimeoutError
from datahub.utilities.file_backed_collections import (
ConnectionWrapper,
@@ -88,9 +92,16 @@ class LoggedQuery:


@dataclasses.dataclass
class ObservedQuery(LoggedQuery):
class ObservedQuery:
query: str
session_id: Optional[str] = None
timestamp: Optional[datetime] = None
user: Optional[Union[CorpUserUrn, CorpGroupUrn]] = None
default_db: Optional[str] = None
default_schema: Optional[str] = None
query_hash: Optional[str] = None
usage_multiplier: int = 1

# Use this to store additional key-value information about query for debugging
extra_info: Optional[dict] = None

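ObservedQuery no longer inherits from LoggedQuery; it is a standalone dataclass whose user field is a typed urn and whose extra_info dict gives extractors a place to stash debugging context. A small construction sketch with assumed values:

    from datahub.metadata.urns import CorpUserUrn
    from datahub.sql_parsing.sql_parsing_aggregator import ObservedQuery

    oq = ObservedQuery(
        query="SELECT 1",
        user=CorpUserUrn("jdoe"),  # typed urn, not a raw "urn:li:corpuser:..." string
        extra_info={"audit_log_file": "2024-09-09.jsonl"},  # hypothetical debug key
    )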
@@ -179,6 +190,8 @@ class PreparsedQuery:
query_type_props: QueryTypeProps = dataclasses.field(
default_factory=lambda: QueryTypeProps()
)
# Use this to store additional key-value information about query for debugging
extra_info: Optional[dict] = None


@dataclasses.dataclass
@@ -248,6 +261,9 @@ class SqlAggregatorReport(Report):
num_operations_generated: int = 0
num_operations_skipped_due_to_filters: int = 0

# Tool Metadata Extraction
tool_meta_report: Optional[ToolMetaExtractorReport] = None

def compute_stats(self) -> None:
self.schema_resolver_count = self._aggregator._schema_resolver.schema_count()
self.num_unique_query_fingerprints = len(self._aggregator._query_map)
@@ -422,6 +438,10 @@ def __init__(
tablename="query_usage_counts",
)

# Tool Extractor
self._tool_meta_extractor = ToolMetaExtractor()
self.report.tool_meta_report = self._tool_meta_extractor.report

def close(self) -> None:
self._exit_stack.close()

@@ -497,16 +517,7 @@ def add(
elif isinstance(item, PreparsedQuery):
self.add_preparsed_query(item)
elif isinstance(item, ObservedQuery):
self.add_observed_query(
query=item.query,
default_db=item.default_db,
default_schema=item.default_schema,
session_id=item.session_id,
usage_multiplier=item.usage_multiplier,
query_timestamp=item.timestamp,
user=make_actor_urn(item.user) if item.user else None,
query_hash=item.query_hash,
)
self.add_observed_query(item)
else:
raise ValueError(f"Cannot add unknown item type: {type(item)}")

@@ -650,18 +661,9 @@

def add_observed_query(
self,
query: str,
default_db: Optional[str] = None,
default_schema: Optional[str] = None,
query_timestamp: Optional[datetime] = None,
user: Optional[Union[CorpUserUrn, CorpGroupUrn]] = None,
session_id: Optional[
str
] = None, # can only see temp tables with the same session
usage_multiplier: int = 1,
observed: ObservedQuery,
is_known_temp_table: bool = False,
require_out_table_schema: bool = False,
query_hash: Optional[str] = None,
) -> None:
"""Add an observed query to the aggregator.
@@ -675,7 +677,7 @@
self.report.num_observed_queries += 1

# All queries with no session ID are assumed to be part of the same session.
session_id = session_id or _MISSING_SESSION_ID
session_id = observed.session_id or _MISSING_SESSION_ID

# Load in the temp tables for this session.
schema_resolver: SchemaResolverInterface = (
@@ -685,17 +687,17 @@

# Run the SQL parser.
parsed = self._run_sql_parser(
query,
default_db=default_db,
default_schema=default_schema,
observed.query,
default_db=observed.default_db,
default_schema=observed.default_schema,
schema_resolver=schema_resolver,
session_id=session_id,
timestamp=query_timestamp,
user=user,
timestamp=observed.timestamp,
user=observed.user,
)
if parsed.debug_info.error:
self.report.observed_query_parse_failures.append(
f"{parsed.debug_info.error} on query: {query[:100]}"
f"{parsed.debug_info.error} on query: {observed.query[:100]}"
)
if parsed.debug_info.table_error:
self.report.num_observed_queries_failed += 1
@@ -705,14 +707,14 @@
if isinstance(parsed.debug_info.column_error, CooperativeTimeoutError):
self.report.num_observed_queries_column_timeout += 1

query_fingerprint = query_hash or parsed.query_fingerprint
query_fingerprint = observed.query_hash or parsed.query_fingerprint
self.add_preparsed_query(
PreparsedQuery(
query_id=query_fingerprint,
query_text=query,
query_count=usage_multiplier,
timestamp=query_timestamp,
user=user,
query_text=observed.query,
query_count=observed.usage_multiplier,
timestamp=observed.timestamp,
user=observed.user,
session_id=session_id,
query_type=parsed.query_type,
query_type_props=parsed.query_type_props,
Expand All @@ -723,6 +725,7 @@ def add_observed_query(
column_usage=compute_upstream_fields(parsed),
inferred_schema=infer_output_schema(parsed),
confidence_score=parsed.debug_info.confidence,
extra_info=observed.extra_info,
),
is_known_temp_table=is_known_temp_table,
require_out_table_schema=require_out_table_schema,
@@ -738,6 +741,12 @@ def add_preparsed_query(
session_has_temp_tables: bool = True,
_is_internal: bool = False,
) -> None:

# Adding tool-specific metadata extraction here allows it
# to work for both ObservedQuery and PreparsedQuery, as
# add_preparsed_query is used within add_observed_query.
self._tool_meta_extractor.extract_bi_metadata(parsed)

if not _is_internal:
self.report.num_preparsed_queries += 1

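Because the extraction hook sits in add_preparsed_query, both the ObservedQuery and PreparsedQuery ingestion paths get enriched, and the aggregator report surfaces per-tool counts. A hedged sketch of inspecting the new report field after a run:

    # assumes an existing SqlParsingAggregator instance named aggregator
    if aggregator.report.tool_meta_report:
        # e.g. {"mode": 42} after Mode-annotated queries were processed (illustrative count)
        print(aggregator.report.tool_meta_report.num_queries_meta_extracted)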
96 changes: 96 additions & 0 deletions metadata-ingestion/src/datahub/sql_parsing/tool_meta_extractor.py
@@ -0,0 +1,96 @@
import json
import logging
from dataclasses import dataclass, field
from typing import Callable, Dict, List, Optional, Tuple, Union

from typing_extensions import Protocol

from datahub.ingestion.api.report import Report
from datahub.metadata.urns import CorpGroupUrn, CorpUserUrn
from datahub.utilities.stats_collections import int_top_k_dict

UrnStr = str

logger = logging.getLogger(__name__)


class QueryLog(Protocol):
    """Represents a query log entry
    expected by the ToolMetaExtractor interface.
    """

    query_text: str
    user: Optional[Union[CorpUserUrn, CorpGroupUrn]]
    extra_info: Optional[dict]


def _get_last_line(query: str) -> str:
    return query.rstrip().rsplit("\n", maxsplit=1)[-1]


@dataclass
class ToolMetaExtractorReport(Report):
    num_queries_meta_extracted: Dict[str, int] = field(default_factory=int_top_k_dict)


class ToolMetaExtractor:
    """Enriches input query log entries with tool-specific details captured as part
    of the query log.

    For example, queries executed on the warehouse by the Mode BI tool contain
    information about the actual user interacting with the BI tool, which is more
    useful than the service account used to execute the query as reported by
    warehouse query logs.
    """

    def __init__(self) -> None:
        self.report = ToolMetaExtractorReport()
        self.known_tool_extractors: List[Tuple[str, Callable[[QueryLog], bool]]] = [
            (
                "mode",
                self._extract_mode_query,
            )
        ]

    def _extract_mode_query(self, entry: QueryLog) -> bool:
        """
        Returns:
            bool: whether the QueryLog entry is from Mode, in which case the
            Mode user info is extracted into the entry.
        """
        last_line = _get_last_line(entry.query_text)

        if not (
            last_line.startswith("--")
            and '"url":"https://modeanalytics.com' in last_line
        ):
            return False

        mode_json_raw = last_line[2:]
        mode_json = json.loads(mode_json_raw)

        original_user = entry.user

        entry.user = email_to_user_urn(mode_json["email"])
        entry.extra_info = entry.extra_info or {}
        entry.extra_info["user_via"] = original_user

        # TODO: Generate an "origin" urn.

        return True

    def extract_bi_metadata(self, entry: QueryLog) -> bool:
        for tool, meta_extractor in self.known_tool_extractors:
            try:
                if meta_extractor(entry):
                    self.report.num_queries_meta_extracted[tool] += 1
                    return True
            except Exception as e:
                logger.debug(f"Tool metadata extraction failed with error: {e}")
        return False


# NOTE: This is implementing the most common user urn generation scenario,
# however it may need to be revisited at a later point.
def email_to_user_urn(email: str) -> CorpUserUrn:
    return CorpUserUrn(email.split("@", 1)[0])
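Taken together, a self-contained sketch of the extractor's behavior; the dataclass below is a hypothetical stand-in satisfying the QueryLog protocol, and the JSON payload shape beyond the email and url keys is an assumption:

    import dataclasses
    from typing import Optional, Union

    from datahub.metadata.urns import CorpGroupUrn, CorpUserUrn
    from datahub.sql_parsing.tool_meta_extractor import ToolMetaExtractor


    @dataclasses.dataclass
    class FakeQueryLog:  # hypothetical stand-in for PreparsedQuery
        query_text: str
        user: Optional[Union[CorpUserUrn, CorpGroupUrn]] = None
        extra_info: Optional[dict] = None


    entry = FakeQueryLog(
        query_text='SELECT 1\n-- {"email":"jdoe@acme.com","url":"https://modeanalytics.com/acme/reports/abc"}',
        user=CorpUserUrn("mode_service_account"),
    )

    extractor = ToolMetaExtractor()
    assert extractor.extract_bi_metadata(entry)  # matches the Mode signature on the last line
    assert entry.user == CorpUserUrn("jdoe")  # email local-part becomes the user urn
    assert entry.extra_info["user_via"] == CorpUserUrn("mode_service_account")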