From cf49f80e778cf0a9038d1c36538ed9175a975b3b Mon Sep 17 00:00:00 2001 From: Mayuri Nehate <33225191+mayurinehate@users.noreply.github.com> Date: Tue, 10 Sep 2024 01:08:24 +0530 Subject: [PATCH] feat(ingest/sql): auto extract and use mode query user metadata (#11307) Co-authored-by: Harshal Sheth --- .../src/datahub/emitter/mce_builder.py | 16 - .../source/bigquery_v2/queries_extractor.py | 5 +- .../ingestion/source/redshift/lineage_v2.py | 25 +- .../sql_parsing/sql_parsing_aggregator.py | 77 +-- .../sql_parsing/tool_meta_extractor.py | 96 ++++ .../bigquery_lineage_usage_golden.json | 502 ++++++++++++++++++ .../integration/bigquery_v2/test_bigquery.py | 122 ++++- .../bigquery_v2/test_bigquery_queries.py | 4 + .../unit/sql_parsing/test_sql_aggregator.py | 289 ++++++---- .../sql_parsing/test_tool_meta_extractor.py | 55 ++ 10 files changed, 1010 insertions(+), 181 deletions(-) create mode 100644 metadata-ingestion/src/datahub/sql_parsing/tool_meta_extractor.py create mode 100644 metadata-ingestion/tests/integration/bigquery_v2/bigquery_lineage_usage_golden.json create mode 100644 metadata-ingestion/tests/unit/sql_parsing/test_tool_meta_extractor.py diff --git a/metadata-ingestion/src/datahub/emitter/mce_builder.py b/metadata-ingestion/src/datahub/emitter/mce_builder.py index df769f35b4778..e273bab62fe7a 100644 --- a/metadata-ingestion/src/datahub/emitter/mce_builder.py +++ b/metadata-ingestion/src/datahub/emitter/mce_builder.py @@ -50,7 +50,6 @@ UpstreamLineageClass, _Aspect as AspectAbstract, ) -from datahub.metadata.urns import CorpGroupUrn, CorpUserUrn from datahub.utilities.urn_encoder import UrnEncoder from datahub.utilities.urns.data_flow_urn import DataFlowUrn from datahub.utilities.urns.dataset_urn import DatasetUrn @@ -225,21 +224,6 @@ def make_user_urn(username: str) -> str: ) -def make_actor_urn(actor: str) -> Union[CorpUserUrn, CorpGroupUrn]: - """ - Makes a user urn if the input is not a user or group urn already - """ - return ( - CorpUserUrn(actor) - if not actor.startswith(("urn:li:corpuser:", "urn:li:corpGroup:")) - else ( - CorpUserUrn.from_string(actor) - if actor.startswith("urn:li:corpuser:") - else CorpGroupUrn.from_string(actor) - ) - ) - - def make_group_urn(groupname: str) -> str: """ Makes a group urn if the input is not a user or group urn already diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/queries_extractor.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/queries_extractor.py index 2719d8b95bea8..23106ce7d2f86 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/queries_extractor.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/queries_extractor.py @@ -36,6 +36,7 @@ BigQueryIdentifierBuilder, ) from datahub.ingestion.source.usage.usage_common import BaseUsageConfig +from datahub.metadata.urns import CorpUserUrn from datahub.sql_parsing.schema_resolver import SchemaResolver from datahub.sql_parsing.sql_parsing_aggregator import ( ObservedQuery, @@ -363,7 +364,9 @@ def _parse_audit_log_row(self, row: BigQueryJob) -> ObservedQuery: session_id=row["session_id"], timestamp=row["creation_time"], user=( - self.identifiers.gen_user_urn(row["user_email"]) + CorpUserUrn.from_string( + self.identifiers.gen_user_urn(row["user_email"]) + ) if row["user_email"] else None ), diff --git a/metadata-ingestion/src/datahub/ingestion/source/redshift/lineage_v2.py b/metadata-ingestion/src/datahub/ingestion/source/redshift/lineage_v2.py index bd0bbe742a219..4b7f710beed08 100644 --- 
a/metadata-ingestion/src/datahub/ingestion/source/redshift/lineage_v2.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/redshift/lineage_v2.py
@@ -31,6 +31,7 @@
 from datahub.metadata.urns import DatasetUrn
 from datahub.sql_parsing.sql_parsing_aggregator import (
     KnownQueryLineageInfo,
+    ObservedQuery,
     SqlParsingAggregator,
 )
 from datahub.utilities.perf_timer import PerfTimer
@@ -118,11 +119,13 @@ def build(
         if self.config.resolve_temp_table_in_lineage:
             for temp_row in self._lineage_v1.get_temp_tables(connection=connection):
                 self.aggregator.add_observed_query(
-                    query=temp_row.query_text,
-                    default_db=self.database,
-                    default_schema=self.config.default_schema,
-                    session_id=temp_row.session_id,
-                    query_timestamp=temp_row.start_time,
+                    ObservedQuery(
+                        query=temp_row.query_text,
+                        default_db=self.database,
+                        default_schema=self.config.default_schema,
+                        session_id=temp_row.session_id,
+                        timestamp=temp_row.start_time,
+                    ),
                     # The "temp table" query actually returns all CREATE TABLE statements, even if they
                     # aren't explicitly a temp table. As such, setting is_known_temp_table=True
                     # would not be correct. We already have mechanisms to autodetect temp tables,
@@ -263,11 +266,13 @@ def _process_sql_parser_lineage(self, lineage_row: LineageRow) -> None:
 
         # TODO actor
 
         self.aggregator.add_observed_query(
-            query=ddl,
-            default_db=self.database,
-            default_schema=self.config.default_schema,
-            query_timestamp=lineage_row.timestamp,
-            session_id=lineage_row.session_id,
+            ObservedQuery(
+                query=ddl,
+                default_db=self.database,
+                default_schema=self.config.default_schema,
+                timestamp=lineage_row.timestamp,
+                session_id=lineage_row.session_id,
+            )
         )
 
     def _make_filtered_target(self, lineage_row: LineageRow) -> Optional[DatasetUrn]:
diff --git a/metadata-ingestion/src/datahub/sql_parsing/sql_parsing_aggregator.py b/metadata-ingestion/src/datahub/sql_parsing/sql_parsing_aggregator.py
index 9308ca8a0edd5..d945e135f0012 100644
--- a/metadata-ingestion/src/datahub/sql_parsing/sql_parsing_aggregator.py
+++ b/metadata-ingestion/src/datahub/sql_parsing/sql_parsing_aggregator.py
@@ -16,7 +16,7 @@
 import datahub.emitter.mce_builder as builder
 import datahub.metadata.schema_classes as models
 from datahub.configuration.time_window_config import get_time_bucket
-from datahub.emitter.mce_builder import get_sys_time, make_actor_urn, make_ts_millis
+from datahub.emitter.mce_builder import get_sys_time, make_ts_millis
 from datahub.emitter.mcp import MetadataChangeProposalWrapper
 from datahub.emitter.sql_parsing_builder import compute_upstream_fields
 from datahub.ingestion.api.closeable import Closeable
@@ -47,6 +47,10 @@
     get_query_fingerprint,
     try_format_query,
 )
+from datahub.sql_parsing.tool_meta_extractor import (
+    ToolMetaExtractor,
+    ToolMetaExtractorReport,
+)
 from datahub.utilities.cooperative_timeout import CooperativeTimeoutError
 from datahub.utilities.file_backed_collections import (
     ConnectionWrapper,
@@ -88,9 +92,16 @@ class LoggedQuery:
 
 
 @dataclasses.dataclass
-class ObservedQuery(LoggedQuery):
+class ObservedQuery:
+    query: str
+    session_id: Optional[str] = None
+    timestamp: Optional[datetime] = None
+    user: Optional[Union[CorpUserUrn, CorpGroupUrn]] = None
+    default_db: Optional[str] = None
+    default_schema: Optional[str] = None
     query_hash: Optional[str] = None
     usage_multiplier: int = 1
+    # Use this to store additional key-value information about the query, for debugging
     extra_info: Optional[dict] = None
 
 
@@ -179,6 +190,8 @@ class PreparsedQuery:
     query_type_props: QueryTypeProps = dataclasses.field(
        default_factory=lambda: QueryTypeProps()
    )
+    # Use this to store additional key-value information about the query, for debugging
+    extra_info: Optional[dict] = None
 
 
 @dataclasses.dataclass
@@ -248,6 +261,9 @@ class SqlAggregatorReport(Report):
     num_operations_generated: int = 0
     num_operations_skipped_due_to_filters: int = 0
 
+    # Tool Metadata Extraction
+    tool_meta_report: Optional[ToolMetaExtractorReport] = None
+
     def compute_stats(self) -> None:
         self.schema_resolver_count = self._aggregator._schema_resolver.schema_count()
         self.num_unique_query_fingerprints = len(self._aggregator._query_map)
@@ -422,6 +438,10 @@ def __init__(
             tablename="query_usage_counts",
         )
 
+        # Tool Extractor
+        self._tool_meta_extractor = ToolMetaExtractor()
+        self.report.tool_meta_report = self._tool_meta_extractor.report
+
     def close(self) -> None:
         self._exit_stack.close()
 
@@ -497,16 +517,7 @@ def add(
         elif isinstance(item, PreparsedQuery):
             self.add_preparsed_query(item)
         elif isinstance(item, ObservedQuery):
-            self.add_observed_query(
-                query=item.query,
-                default_db=item.default_db,
-                default_schema=item.default_schema,
-                session_id=item.session_id,
-                usage_multiplier=item.usage_multiplier,
-                query_timestamp=item.timestamp,
-                user=make_actor_urn(item.user) if item.user else None,
-                query_hash=item.query_hash,
-            )
+            self.add_observed_query(item)
         else:
             raise ValueError(f"Cannot add unknown item type: {type(item)}")
 
@@ -650,18 +661,9 @@ def add_view_definition(
 
     def add_observed_query(
         self,
-        query: str,
-        default_db: Optional[str] = None,
-        default_schema: Optional[str] = None,
-        query_timestamp: Optional[datetime] = None,
-        user: Optional[Union[CorpUserUrn, CorpGroupUrn]] = None,
-        session_id: Optional[
-            str
-        ] = None,  # can only see temp tables with the same session
-        usage_multiplier: int = 1,
+        observed: ObservedQuery,
         is_known_temp_table: bool = False,
         require_out_table_schema: bool = False,
-        query_hash: Optional[str] = None,
     ) -> None:
         """Add an observed query to the aggregator.
 
@@ -675,7 +677,7 @@
         self.report.num_observed_queries += 1
 
         # All queries with no session ID are assumed to be part of the same session.
-        session_id = session_id or _MISSING_SESSION_ID
+        session_id = observed.session_id or _MISSING_SESSION_ID
 
         # Load in the temp tables for this session.
         schema_resolver: SchemaResolverInterface = (
@@ -685,17 +687,17 @@
         # Run the SQL parser.
        parsed = self._run_sql_parser(
-            query,
-            default_db=default_db,
-            default_schema=default_schema,
+            observed.query,
+            default_db=observed.default_db,
+            default_schema=observed.default_schema,
             schema_resolver=schema_resolver,
             session_id=session_id,
-            timestamp=query_timestamp,
-            user=user,
+            timestamp=observed.timestamp,
+            user=observed.user,
         )
         if parsed.debug_info.error:
             self.report.observed_query_parse_failures.append(
-                f"{parsed.debug_info.error} on query: {query[:100]}"
+                f"{parsed.debug_info.error} on query: {observed.query[:100]}"
             )
         if parsed.debug_info.table_error:
             self.report.num_observed_queries_failed += 1
@@ -705,14 +707,14 @@
             if isinstance(parsed.debug_info.column_error, CooperativeTimeoutError):
                 self.report.num_observed_queries_column_timeout += 1
 
-        query_fingerprint = query_hash or parsed.query_fingerprint
+        query_fingerprint = observed.query_hash or parsed.query_fingerprint
 
         self.add_preparsed_query(
             PreparsedQuery(
                 query_id=query_fingerprint,
-                query_text=query,
-                query_count=usage_multiplier,
-                timestamp=query_timestamp,
-                user=user,
+                query_text=observed.query,
+                query_count=observed.usage_multiplier,
+                timestamp=observed.timestamp,
+                user=observed.user,
                 session_id=session_id,
                 query_type=parsed.query_type,
                 query_type_props=parsed.query_type_props,
@@ -723,6 +725,7 @@
                 column_usage=compute_upstream_fields(parsed),
                 inferred_schema=infer_output_schema(parsed),
                 confidence_score=parsed.debug_info.confidence,
+                extra_info=observed.extra_info,
             ),
             is_known_temp_table=is_known_temp_table,
             require_out_table_schema=require_out_table_schema,
@@ -738,6 +741,12 @@ def add_preparsed_query(
         session_has_temp_tables: bool = True,
         _is_internal: bool = False,
     ) -> None:
+
+        # Running tool-specific metadata extraction here lets it apply to both
+        # ObservedQuery and PreparsedQuery, since add_preparsed_query is called
+        # from within add_observed_query.
+        self._tool_meta_extractor.extract_bi_metadata(parsed)
+
         if not _is_internal:
             self.report.num_preparsed_queries += 1
 
diff --git a/metadata-ingestion/src/datahub/sql_parsing/tool_meta_extractor.py b/metadata-ingestion/src/datahub/sql_parsing/tool_meta_extractor.py
new file mode 100644
index 0000000000000..cdd35c23e3088
--- /dev/null
+++ b/metadata-ingestion/src/datahub/sql_parsing/tool_meta_extractor.py
@@ -0,0 +1,96 @@
+import json
+import logging
+from dataclasses import dataclass, field
+from typing import Callable, Dict, List, Optional, Tuple, Union
+
+from typing_extensions import Protocol
+
+from datahub.ingestion.api.report import Report
+from datahub.metadata.urns import CorpGroupUrn, CorpUserUrn
+from datahub.utilities.stats_collections import int_top_k_dict
+
+UrnStr = str
+
+logger = logging.getLogger(__name__)
+
+
+class QueryLog(Protocol):
+    """Represents a query log entry,
+    as expected by the ToolMetaExtractor interface.
+    """
+
+    query_text: str
+    user: Optional[Union[CorpUserUrn, CorpGroupUrn]]
+    extra_info: Optional[dict]
+
+
+def _get_last_line(query: str) -> str:
+    return query.rstrip().rsplit("\n", maxsplit=1)[-1]
+
+
+@dataclass
+class ToolMetaExtractorReport(Report):
+    num_queries_meta_extracted: Dict[str, int] = field(default_factory=int_top_k_dict)
+
+
+class ToolMetaExtractor:
+    """Enriches a query log entry with tool-specific details captured as part of the query log.
+
+    For example, queries executed on a warehouse by the Mode BI tool embed the actual
+    user interacting with the tool, which is more useful than the service account that
+    executed the query as reported by the warehouse query logs.
+    """
+
+    def __init__(self) -> None:
+        self.report = ToolMetaExtractorReport()
+        self.known_tool_extractors: List[Tuple[str, Callable[[QueryLog], bool]]] = [
+            (
+                "mode",
+                self._extract_mode_query,
+            )
+        ]
+
+    def _extract_mode_query(self, entry: QueryLog) -> bool:
+        """
+        Returns:
+            bool: whether the QueryLog entry is from Mode and the Mode user info
+            was extracted into the entry.
+        """
+        last_line = _get_last_line(entry.query_text)
+
+        if not (
+            last_line.startswith("--")
+            and '"url":"https://modeanalytics.com' in last_line
+        ):
+            return False
+
+        mode_json_raw = last_line[2:]
+        mode_json = json.loads(mode_json_raw)
+
+        original_user = entry.user
+
+        entry.user = email_to_user_urn(mode_json["email"])
+        entry.extra_info = entry.extra_info or {}
+        entry.extra_info["user_via"] = original_user
+
+        # TODO: Generate an "origin" urn.
+
+        return True
+
+    def extract_bi_metadata(self, entry: QueryLog) -> bool:
+
+        for tool, meta_extractor in self.known_tool_extractors:
+            try:
+                if meta_extractor(entry):
+                    self.report.num_queries_meta_extracted[tool] += 1
+                    return True
+            except Exception as e:
+                logger.debug(f"Tool metadata extraction failed with error: {e}")
+        return False
+
+
+# NOTE: This implements the most common user urn generation scenario,
+# but it may need to be revisited at a later point.
+def email_to_user_urn(email: str) -> CorpUserUrn:
+    return CorpUserUrn(email.split("@", 1)[0])
diff --git a/metadata-ingestion/tests/integration/bigquery_v2/bigquery_lineage_usage_golden.json b/metadata-ingestion/tests/integration/bigquery_v2/bigquery_lineage_usage_golden.json
new file mode 100644
index 0000000000000..a7d46a2412b6c
--- /dev/null
+++ b/metadata-ingestion/tests/integration/bigquery_v2/bigquery_lineage_usage_golden.json
@@ -0,0 +1,502 @@
+[
+{
+  "entityType": "dataset",
+  "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:bigquery,project-id-1.bigquery-dataset-1.view-1,PROD)",
+  "changeType": "UPSERT",
+  "aspectName": "upstreamLineage",
+  "aspect": {
+    "json": {
+      "upstreams": [
+        {
+          "auditStamp": {
+            "time": 1643871600000,
+            "actor": "urn:li:corpuser:_ingestion"
+          },
+          "created": {
+            "time": 1643871600000,
+            "actor": "urn:li:corpuser:foo"
+          },
+          "dataset": "urn:li:dataset:(urn:li:dataPlatform:bigquery,project-id-1.bigquery-dataset-1.table-1,PROD)",
+          "type": "TRANSFORMED",
+          "query": "urn:li:query:176428c30cc3197730c20f6d0161efe869a0a041876d23c044aa2cd60d4c7a12"
+        }
+      ]
+    }
+  },
+  "systemMetadata": {
+    "lastObserved": 1643871600000,
+    "runId": "bigquery-2022_02_03-07_00_00",
+    "lastRunId": "no-run-id-provided"
+  }
+},
+{
+  "entityType": "dataset",
+  "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:bigquery,project-id-1.bigquery-dataset-1.table-1,PROD)",
+  "changeType": "UPSERT",
+  "aspectName": "datasetUsageStatistics",
+  "aspect": {
+    "json": {
+      "timestampMillis": 1643846400000,
+      "eventGranularity": {
+        "unit": "DAY",
+        "multiple": 1
+      },
+      "partitionSpec": {
+        "partition": "FULL_TABLE_SNAPSHOT",
+        "type": "FULL_TABLE"
+      },
+      "uniqueUserCount": 1,
+      "totalSqlQueries": 4,
+      "topSqlQueries": [
+        "create view `bigquery-dataset-1`.`view-1` as select * from `bigquery-dataset-1`.`table-1`",
+        "select * from `bigquery-dataset-1`.`table-1`"
+      ],
+      "userCounts": [
+        {
+          "user": "urn:li:corpuser:foo",
+          "count": 4
+        }
+      ],
+      "fieldCounts": []
+    }
+  },
+  "systemMetadata": {
+    "lastObserved": 1643871600000,
+    "runId": "bigquery-2022_02_03-07_00_00",
+    "lastRunId": "no-run-id-provided"
+  }
+},
+{
+  "entityType": "query",
+  "entityUrn": "urn:li:query:176428c30cc3197730c20f6d0161efe869a0a041876d23c044aa2cd60d4c7a12",
+  "changeType": "UPSERT",
+  "aspectName": "queryProperties",
+  "aspect": {
+    "json": {
+      "statement": {
+        "value": "create view `bigquery-dataset-1`.`view-1` as select * from `bigquery-dataset-1`.`table-1`",
+        "language": "SQL"
+      },
+      "source": "SYSTEM",
+      "created": {
+        "time": 1643871600000,
+        "actor": "urn:li:corpuser:foo"
+      },
+      "lastModified": {
+        "time": 1643871600000,
+        "actor": "urn:li:corpuser:foo"
+      }
+    }
+  },
+  "systemMetadata": {
+    "lastObserved": 1643871600000,
+    "runId": "bigquery-2022_02_03-07_00_00",
+    "lastRunId": "no-run-id-provided"
+  }
+},
+{
+  "entityType": "dataset",
+  "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:bigquery,project-id-1.bigquery-dataset-1.table-1,PROD)",
+  "changeType": "UPSERT",
+  "aspectName": "status",
+  "aspect": {
+    "json": {
+      "removed": false
+    }
+  },
"systemMetadata": { + "lastObserved": 1643871600000, + "runId": "bigquery-2022_02_03-07_00_00", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "query", + "entityUrn": "urn:li:query:176428c30cc3197730c20f6d0161efe869a0a041876d23c044aa2cd60d4c7a12", + "changeType": "UPSERT", + "aspectName": "queryUsageStatistics", + "aspect": { + "json": { + "timestampMillis": 1643846400000, + "eventGranularity": { + "unit": "DAY", + "multiple": 1 + }, + "partitionSpec": { + "partition": "FULL_TABLE_SNAPSHOT", + "type": "FULL_TABLE" + }, + "queryCount": 2, + "uniqueUserCount": 1, + "userCounts": [ + { + "user": "urn:li:corpuser:foo", + "count": 2 + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "bigquery-2022_02_03-07_00_00", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "query", + "entityUrn": "urn:li:query:176428c30cc3197730c20f6d0161efe869a0a041876d23c044aa2cd60d4c7a12", + "changeType": "UPSERT", + "aspectName": "dataPlatformInstance", + "aspect": { + "json": { + "platform": "urn:li:dataPlatform:bigquery" + } + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "bigquery-2022_02_03-07_00_00", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:bigquery,project-id-1.bigquery-dataset-1.view-1,PROD)", + "changeType": "UPSERT", + "aspectName": "datasetUsageStatistics", + "aspect": { + "json": { + "timestampMillis": 1643846400000, + "eventGranularity": { + "unit": "DAY", + "multiple": 1 + }, + "partitionSpec": { + "partition": "FULL_TABLE_SNAPSHOT", + "type": "FULL_TABLE" + }, + "uniqueUserCount": 1, + "totalSqlQueries": 4, + "topSqlQueries": [ + "select * from `bigquery-dataset-1`.`view-1`\nLIMIT 100\n-- {\"user\":\"@bar\",\"email\":\"bar@xyz.com\",\"url\":\"https://modeanalytics.com/acryltest/reports/6234ff78bc7d/runs/662b21949629/queries/f0aad24d5b37\",\"scheduled\":false}\n" + ], + "userCounts": [ + { + "user": "urn:li:corpuser:bar", + "count": 4 + } + ], + "fieldCounts": [] + } + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "bigquery-2022_02_03-07_00_00", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "query", + "entityUrn": "urn:li:query:176428c30cc3197730c20f6d0161efe869a0a041876d23c044aa2cd60d4c7a12", + "changeType": "UPSERT", + "aspectName": "querySubjects", + "aspect": { + "json": { + "subjects": [ + { + "entity": "urn:li:dataset:(urn:li:dataPlatform:bigquery,project-id-1.bigquery-dataset-1.table-1,PROD)" + }, + { + "entity": "urn:li:dataset:(urn:li:dataPlatform:bigquery,project-id-1.bigquery-dataset-1.view-1,PROD)" + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "bigquery-2022_02_03-07_00_00", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "query", + "entityUrn": "urn:li:query:23f1935934face229de381fad193e390153180bf1e7afaa6db58e91fc28d0021", + "changeType": "UPSERT", + "aspectName": "queryProperties", + "aspect": { + "json": { + "statement": { + "value": "select * from `bigquery-dataset-1`.`table-1`", + "language": "SQL" + }, + "source": "SYSTEM", + "created": { + "time": 1643871600000, + "actor": "urn:li:corpuser:foo" + }, + "lastModified": { + "time": 1643871600000, + "actor": "urn:li:corpuser:foo" + } + } + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "bigquery-2022_02_03-07_00_00", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "query", + "entityUrn": 
"urn:li:query:176428c30cc3197730c20f6d0161efe869a0a041876d23c044aa2cd60d4c7a12", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "bigquery-2022_02_03-07_00_00", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "query", + "entityUrn": "urn:li:query:23f1935934face229de381fad193e390153180bf1e7afaa6db58e91fc28d0021", + "changeType": "UPSERT", + "aspectName": "querySubjects", + "aspect": { + "json": { + "subjects": [ + { + "entity": "urn:li:dataset:(urn:li:dataPlatform:bigquery,project-id-1.bigquery-dataset-1.table-1,PROD)" + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "bigquery-2022_02_03-07_00_00", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "query", + "entityUrn": "urn:li:query:23f1935934face229de381fad193e390153180bf1e7afaa6db58e91fc28d0021", + "changeType": "UPSERT", + "aspectName": "queryUsageStatistics", + "aspect": { + "json": { + "timestampMillis": 1643846400000, + "eventGranularity": { + "unit": "DAY", + "multiple": 1 + }, + "partitionSpec": { + "partition": "FULL_TABLE_SNAPSHOT", + "type": "FULL_TABLE" + }, + "queryCount": 2, + "uniqueUserCount": 1, + "userCounts": [ + { + "user": "urn:li:corpuser:foo", + "count": 2 + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "bigquery-2022_02_03-07_00_00", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "query", + "entityUrn": "urn:li:query:5c0fb184e7b94a6e8203b8c9772f7296ced18d89a38e487247c45751122e3199", + "changeType": "UPSERT", + "aspectName": "queryProperties", + "aspect": { + "json": { + "statement": { + "value": "select * from `bigquery-dataset-1`.`view-1`\nLIMIT 100\n-- {\"user\":\"@bar\",\"email\":\"bar@xyz.com\",\"url\":\"https://modeanalytics.com/acryltest/reports/6234ff78bc7d/runs/662b21949629/queries/f0aad24d5b37\",\"scheduled\":false}\n", + "language": "SQL" + }, + "source": "SYSTEM", + "created": { + "time": 1643871600000, + "actor": "urn:li:corpuser:bar" + }, + "lastModified": { + "time": 1643871600000, + "actor": "urn:li:corpuser:bar" + } + } + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "bigquery-2022_02_03-07_00_00", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "query", + "entityUrn": "urn:li:query:23f1935934face229de381fad193e390153180bf1e7afaa6db58e91fc28d0021", + "changeType": "UPSERT", + "aspectName": "dataPlatformInstance", + "aspect": { + "json": { + "platform": "urn:li:dataPlatform:bigquery" + } + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "bigquery-2022_02_03-07_00_00", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "query", + "entityUrn": "urn:li:query:5c0fb184e7b94a6e8203b8c9772f7296ced18d89a38e487247c45751122e3199", + "changeType": "UPSERT", + "aspectName": "querySubjects", + "aspect": { + "json": { + "subjects": [ + { + "entity": "urn:li:dataset:(urn:li:dataPlatform:bigquery,project-id-1.bigquery-dataset-1.view-1,PROD)" + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "bigquery-2022_02_03-07_00_00", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "query", + "entityUrn": "urn:li:query:5c0fb184e7b94a6e8203b8c9772f7296ced18d89a38e487247c45751122e3199", + "changeType": "UPSERT", + "aspectName": "dataPlatformInstance", + "aspect": { + "json": { + "platform": "urn:li:dataPlatform:bigquery" + } + }, + "systemMetadata": { + "lastObserved": 
1643871600000, + "runId": "bigquery-2022_02_03-07_00_00", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "query", + "entityUrn": "urn:li:query:5c0fb184e7b94a6e8203b8c9772f7296ced18d89a38e487247c45751122e3199", + "changeType": "UPSERT", + "aspectName": "queryUsageStatistics", + "aspect": { + "json": { + "timestampMillis": 1643846400000, + "eventGranularity": { + "unit": "DAY", + "multiple": 1 + }, + "partitionSpec": { + "partition": "FULL_TABLE_SNAPSHOT", + "type": "FULL_TABLE" + }, + "queryCount": 4, + "uniqueUserCount": 1, + "userCounts": [ + { + "user": "urn:li:corpuser:bar", + "count": 4 + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "bigquery-2022_02_03-07_00_00", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:bigquery,project-id-1.bigquery-dataset-1.view-1,PROD)", + "changeType": "UPSERT", + "aspectName": "operation", + "aspect": { + "json": { + "timestampMillis": 1643871600000, + "partitionSpec": { + "partition": "FULL_TABLE_SNAPSHOT", + "type": "FULL_TABLE" + }, + "actor": "urn:li:corpuser:foo", + "operationType": "CREATE", + "customProperties": { + "query_urn": "urn:li:query:176428c30cc3197730c20f6d0161efe869a0a041876d23c044aa2cd60d4c7a12" + }, + "lastUpdatedTimestamp": 1643871600000 + } + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "bigquery-2022_02_03-07_00_00", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:bigquery,project-id-1.bigquery-dataset-1.view-1,PROD)", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "bigquery-2022_02_03-07_00_00", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "query", + "entityUrn": "urn:li:query:23f1935934face229de381fad193e390153180bf1e7afaa6db58e91fc28d0021", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "bigquery-2022_02_03-07_00_00", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "query", + "entityUrn": "urn:li:query:5c0fb184e7b94a6e8203b8c9772f7296ced18d89a38e487247c45751122e3199", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "bigquery-2022_02_03-07_00_00", + "lastRunId": "no-run-id-provided" + } +} +] \ No newline at end of file diff --git a/metadata-ingestion/tests/integration/bigquery_v2/test_bigquery.py b/metadata-ingestion/tests/integration/bigquery_v2/test_bigquery.py index 9029182bb14a3..1934e135457af 100644 --- a/metadata-ingestion/tests/integration/bigquery_v2/test_bigquery.py +++ b/metadata-ingestion/tests/integration/bigquery_v2/test_bigquery.py @@ -1,7 +1,8 @@ import random import string +from datetime import datetime, timezone from typing import Any, Dict -from unittest.mock import patch +from unittest.mock import MagicMock, patch from freezegun import freeze_time from google.cloud.bigquery.table import TableListItem @@ -24,6 +25,7 @@ ) from datahub.ingestion.source.bigquery_v2.bigquery_schema_gen import ( BigQuerySchemaGenerator, + BigQueryV2Config, ) from tests.test_helpers import mce_helpers from tests.test_helpers.state_helpers import run_and_get_pipeline @@ -43,7 +45,7 @@ def random_email(): ) 
-def recipe(mcp_output_path: str, override: dict = {}) -> dict: +def recipe(mcp_output_path: str, source_config_override: dict = {}) -> dict: return { "source": { "type": "bigquery", @@ -64,7 +66,7 @@ def recipe(mcp_output_path: str, override: dict = {}) -> dict: ], max_workers=1, ).dict(), - **override, + **source_config_override, }, }, "sink": {"type": "file", "config": {"filename": mcp_output_path}}, @@ -403,7 +405,119 @@ def test_bigquery_queries_v2_ingest( # if use_queries_v2 is set. pipeline_config_dict: Dict[str, Any] = recipe( mcp_output_path=mcp_output_path, - override={"use_queries_v2": True, "include_table_lineage": False}, + source_config_override={"use_queries_v2": True, "include_table_lineage": False}, + ) + + run_and_get_pipeline(pipeline_config_dict) + + mce_helpers.check_golden_file( + pytestconfig, + output_path=mcp_output_path, + golden_path=mcp_golden_path, + ) + + +@freeze_time(FROZEN_TIME) +@patch.object(BigQuerySchemaApi, "get_datasets_for_project_id") +@patch.object(BigQueryV2Config, "get_bigquery_client") +@patch("google.cloud.datacatalog_v1.PolicyTagManagerClient") +@patch("google.cloud.resourcemanager_v3.ProjectsClient") +def test_bigquery_queries_v2_lineage_usage_ingest( + projects_client, + policy_tag_manager_client, + get_bigquery_client, + get_datasets_for_project_id, + pytestconfig, + tmp_path, +): + test_resources_dir = pytestconfig.rootpath / "tests/integration/bigquery_v2" + mcp_golden_path = f"{test_resources_dir}/bigquery_lineage_usage_golden.json" + mcp_output_path = "{}/{}".format(tmp_path, "bigquery_lineage_usage_output.json") + + dataset_name = "bigquery-dataset-1" + get_datasets_for_project_id.return_value = [BigqueryDataset(name=dataset_name)] + + client = MagicMock() + get_bigquery_client.return_value = client + client.list_tables.return_value = [ + TableListItem( + {"tableReference": {"projectId": "", "datasetId": "", "tableId": "table-1"}} + ), + TableListItem( + {"tableReference": {"projectId": "", "datasetId": "", "tableId": "view-1"}} + ), + ] + + # mocking the query results for fetching audit log + # note that this is called twice, once for each region + client.query.return_value = [ + { + "job_id": "1", + "project_id": "project-id-1", + "creation_time": datetime.now(timezone.utc), + "user_email": "foo@xyz.com", + "query": "select * from `bigquery-dataset-1`.`table-1`", + "session_id": None, + "query_hash": None, + "statement_type": "SELECT", + "destination_table": None, + "referenced_tables": None, + }, + { + "job_id": "2", + "project_id": "project-id-1", + "creation_time": datetime.now(timezone.utc), + "user_email": "foo@xyz.com", + "query": "create view `bigquery-dataset-1`.`view-1` as select * from `bigquery-dataset-1`.`table-1`", + "session_id": None, + "query_hash": None, + "statement_type": "CREATE", + "destination_table": None, + "referenced_tables": None, + }, + { + "job_id": "3", + "project_id": "project-id-1", + "creation_time": datetime.now(timezone.utc), + "user_email": "service_account@xyz.com", + "query": """\ +select * from `bigquery-dataset-1`.`view-1` +LIMIT 100 +-- {"user":"@bar","email":"bar@xyz.com","url":"https://modeanalytics.com/acryltest/reports/6234ff78bc7d/runs/662b21949629/queries/f0aad24d5b37","scheduled":false} +""", + "session_id": None, + "query_hash": None, + "statement_type": "SELECT", + "destination_table": None, + "referenced_tables": None, + }, + { + "job_id": "4", + "project_id": "project-id-1", + "creation_time": datetime.now(timezone.utc), + "user_email": "service_account@xyz.com", + "query": """\ 
+select * from `bigquery-dataset-1`.`view-1` +LIMIT 100 +-- {"user":"@foo","email":"foo@xyz.com","url":"https://modeanalytics.com/acryltest/reports/6234ff78bc7d/runs/662b21949629/queries/f0aad24d5b37","scheduled":false} +""", + "session_id": None, + "query_hash": None, + "statement_type": "SELECT", + "destination_table": None, + "referenced_tables": None, + }, + ] + + pipeline_config_dict: Dict[str, Any] = recipe( + mcp_output_path=mcp_output_path, + source_config_override={ + "use_queries_v2": True, + "include_schema_metadata": False, + "include_table_lineage": True, + "include_usage_statistics": True, + "classification": {"enabled": False}, + }, ) run_and_get_pipeline(pipeline_config_dict) diff --git a/metadata-ingestion/tests/integration/bigquery_v2/test_bigquery_queries.py b/metadata-ingestion/tests/integration/bigquery_v2/test_bigquery_queries.py index 058a4094fe2fb..9290100b0c521 100644 --- a/metadata-ingestion/tests/integration/bigquery_v2/test_bigquery_queries.py +++ b/metadata-ingestion/tests/integration/bigquery_v2/test_bigquery_queries.py @@ -9,6 +9,7 @@ from datahub.ingestion.source.bigquery_v2.bigquery_queries import ( BigQueryQueriesSourceReport, ) +from datahub.metadata.urns import CorpUserUrn from datahub.sql_parsing.sql_parsing_aggregator import ObservedQuery from datahub.utilities.file_backed_collections import ConnectionWrapper, FileBackedList from tests.test_helpers import mce_helpers @@ -30,6 +31,9 @@ def _generate_queries_cached_file(tmp_path: Path, queries_json_path: Path) -> No assert isinstance(queries, list) for query in queries: query["timestamp"] = datetime.fromisoformat(query["timestamp"]) + query["user"] = ( + CorpUserUrn.from_string(query["user"]) if query["user"] else None + ) query_cache.append(ObservedQuery(**query)) query_cache.close() diff --git a/metadata-ingestion/tests/unit/sql_parsing/test_sql_aggregator.py b/metadata-ingestion/tests/unit/sql_parsing/test_sql_aggregator.py index 2e15dabb10d11..c730b4ee35e55 100644 --- a/metadata-ingestion/tests/unit/sql_parsing/test_sql_aggregator.py +++ b/metadata-ingestion/tests/unit/sql_parsing/test_sql_aggregator.py @@ -10,6 +10,7 @@ from datahub.metadata.urns import CorpUserUrn, DatasetUrn from datahub.sql_parsing.sql_parsing_aggregator import ( KnownQueryLineageInfo, + ObservedQuery, QueryLogSetting, SqlParsingAggregator, ) @@ -41,9 +42,11 @@ def test_basic_lineage(pytestconfig: pytest.Config, tmp_path: pathlib.Path) -> N ) aggregator.add_observed_query( - query="create table foo as select a, b from bar", - default_db="dev", - default_schema="public", + ObservedQuery( + query="create table foo as select a, b from bar", + default_db="dev", + default_schema="public", + ) ) mcps = list(aggregator.gen_metadata()) @@ -82,16 +85,20 @@ def test_overlapping_inserts(pytestconfig: pytest.Config) -> None: ) aggregator.add_observed_query( - query="insert into downstream (a, b) select a, b from upstream1", - default_db="dev", - default_schema="public", - query_timestamp=_ts(20), + ObservedQuery( + query="insert into downstream (a, b) select a, b from upstream1", + default_db="dev", + default_schema="public", + timestamp=_ts(20), + ) ) aggregator.add_observed_query( - query="insert into downstream (a, c) select a, c from upstream2", - default_db="dev", - default_schema="public", - query_timestamp=_ts(25), + ObservedQuery( + query="insert into downstream (a, c) select a, c from upstream2", + default_db="dev", + default_schema="public", + timestamp=_ts(25), + ) ) mcps = list(aggregator.gen_metadata()) @@ -118,28 +125,36 @@ def 
test_temp_table(pytestconfig: pytest.Config) -> None: ) aggregator.add_observed_query( - query="create table foo as select a, 2*b as b from bar", - default_db="dev", - default_schema="public", - session_id="session1", + ObservedQuery( + query="create table foo as select a, 2*b as b from bar", + default_db="dev", + default_schema="public", + session_id="session1", + ) ) aggregator.add_observed_query( - query="create temp table foo as select a, b+c as c from bar", - default_db="dev", - default_schema="public", - session_id="session2", + ObservedQuery( + query="create temp table foo as select a, b+c as c from bar", + default_db="dev", + default_schema="public", + session_id="session2", + ) ) aggregator.add_observed_query( - query="create table foo_session2 as select * from foo", - default_db="dev", - default_schema="public", - session_id="session2", + ObservedQuery( + query="create table foo_session2 as select * from foo", + default_db="dev", + default_schema="public", + session_id="session2", + ) ) aggregator.add_observed_query( - query="create table foo_session3 as select * from foo", - default_db="dev", - default_schema="public", - session_id="session3", + ObservedQuery( + query="create table foo_session3 as select * from foo", + default_db="dev", + default_schema="public", + session_id="session3", + ) ) # foo_session2 should come from bar (via temp table foo), have columns a and c, and depend on bar.{a,b,c} @@ -164,28 +179,36 @@ def test_multistep_temp_table(pytestconfig: pytest.Config) -> None: ) aggregator.add_observed_query( - query="create table #temp1 as select a, 2*b as b from upstream1", - default_db="dev", - default_schema="public", - session_id="session1", + ObservedQuery( + query="create table #temp1 as select a, 2*b as b from upstream1", + default_db="dev", + default_schema="public", + session_id="session1", + ) ) aggregator.add_observed_query( - query="create table #temp2 as select b, c from upstream2", - default_db="dev", - default_schema="public", - session_id="session1", + ObservedQuery( + query="create table #temp2 as select b, c from upstream2", + default_db="dev", + default_schema="public", + session_id="session1", + ) ) aggregator.add_observed_query( - query="create temp table staging_foo as select up1.a, up1.b, up2.c from #temp1 up1 left join #temp2 up2 on up1.b = up2.b where up1.b > 0", - default_db="dev", - default_schema="public", - session_id="session1", + ObservedQuery( + query="create temp table staging_foo as select up1.a, up1.b, up2.c from #temp1 up1 left join #temp2 up2 on up1.b = up2.b where up1.b > 0", + default_db="dev", + default_schema="public", + session_id="session1", + ) ) aggregator.add_observed_query( - query="insert into table prod_foo\nselect * from staging_foo", - default_db="dev", - default_schema="public", - session_id="session1", + ObservedQuery( + query="insert into table prod_foo\nselect * from staging_foo", + default_db="dev", + default_schema="public", + session_id="session1", + ) ) mcps = list(aggregator.gen_metadata()) @@ -224,46 +247,56 @@ def test_overlapping_inserts_from_temp_tables(pytestconfig: pytest.Config) -> No # #stage_online_returns is populated from "online_returns", "customer", and "online_survey". 
aggregator.add_observed_query( - query="create table #stage_in_person_returns as select ipr.customer_id, customer.customer_email, ipr.return_date " - "from in_person_returns ipr " - "left join customer on in_person_returns.customer_id = customer.customer_id", - default_db="dev", - default_schema="public", - session_id="1234", + ObservedQuery( + query="create table #stage_in_person_returns as select ipr.customer_id, customer.customer_email, ipr.return_date " + "from in_person_returns ipr " + "left join customer on in_person_returns.customer_id = customer.customer_id", + default_db="dev", + default_schema="public", + session_id="1234", + ) ) aggregator.add_observed_query( - query="create table #stage_online_returns as select online_ret.customer_id, customer.customer_email, online_ret.return_date, online_survey.return_reason " - "from online_returns online_ret " - "left join customer on online_ret.customer_id = customer.customer_id " - "left join online_survey on online_ret.customer_id = online_survey.customer_id and online_ret.return_id = online_survey.event_id", - default_db="dev", - default_schema="public", - session_id="2323", + ObservedQuery( + query="create table #stage_online_returns as select online_ret.customer_id, customer.customer_email, online_ret.return_date, online_survey.return_reason " + "from online_returns online_ret " + "left join customer on online_ret.customer_id = customer.customer_id " + "left join online_survey on online_ret.customer_id = online_survey.customer_id and online_ret.return_id = online_survey.event_id", + default_db="dev", + default_schema="public", + session_id="2323", + ) ) aggregator.add_observed_query( - query="insert into all_returns (customer_id, customer_email, return_date) select customer_id, customer_email, return_date from #stage_in_person_returns", - default_db="dev", - default_schema="public", - session_id="1234", + ObservedQuery( + query="insert into all_returns (customer_id, customer_email, return_date) select customer_id, customer_email, return_date from #stage_in_person_returns", + default_db="dev", + default_schema="public", + session_id="1234", + ) ) aggregator.add_observed_query( - query="insert into all_returns (customer_id, customer_email, return_date, return_reason) select customer_id, customer_email, return_date, return_reason from #stage_online_returns", - default_db="dev", - default_schema="public", - session_id="2323", + ObservedQuery( + query="insert into all_returns (customer_id, customer_email, return_date, return_reason) select customer_id, customer_email, return_date, return_reason from #stage_online_returns", + default_db="dev", + default_schema="public", + session_id="2323", + ) ) # We only have one create temp table, but the same insert command from multiple sessions. # This should get ignored. 
assert len(report.queries_with_non_authoritative_session) == 0 aggregator.add_observed_query( - query="insert into all_returns (customer_id, customer_email, return_date, return_reason) select customer_id, customer_email, return_date, return_reason from #stage_online_returns", - default_db="dev", - default_schema="public", - session_id="5435", + ObservedQuery( + query="insert into all_returns (customer_id, customer_email, return_date, return_reason) select customer_id, customer_email, return_date, return_reason from #stage_online_returns", + default_db="dev", + default_schema="public", + session_id="5435", + ) ) assert len(report.queries_with_non_authoritative_session) == 1 @@ -286,25 +319,31 @@ def test_aggregate_operations(pytestconfig: pytest.Config) -> None: ) aggregator.add_observed_query( - query="create table foo as select a, b from bar", - default_db="dev", - default_schema="public", - query_timestamp=_ts(20), - user=CorpUserUrn("user1"), + ObservedQuery( + query="create table foo as select a, b from bar", + default_db="dev", + default_schema="public", + timestamp=_ts(20), + user=CorpUserUrn("user1"), + ) ) aggregator.add_observed_query( - query="create table foo as select a, b from bar", - default_db="dev", - default_schema="public", - query_timestamp=_ts(25), - user=CorpUserUrn("user2"), + ObservedQuery( + query="create table foo as select a, b from bar", + default_db="dev", + default_schema="public", + timestamp=_ts(25), + user=CorpUserUrn("user2"), + ) ) aggregator.add_observed_query( - query="create table foo as select a, b+1 as b from bar", - default_db="dev", - default_schema="public", - query_timestamp=_ts(26), - user=CorpUserUrn("user3"), + ObservedQuery( + query="create table foo as select a, b+1 as b from bar", + default_db="dev", + default_schema="public", + timestamp=_ts(26), + user=CorpUserUrn("user3"), + ) ) # The first query will basically be ignored, as it's a duplicate of the second one. @@ -397,14 +436,18 @@ def test_column_lineage_deduplication(pytestconfig: pytest.Config) -> None: ) aggregator.add_observed_query( - query="/* query 1 */ insert into foo (a, b, c) select a, b, c from bar", - default_db="dev", - default_schema="public", + ObservedQuery( + query="/* query 1 */ insert into foo (a, b, c) select a, b, c from bar", + default_db="dev", + default_schema="public", + ) ) aggregator.add_observed_query( - query="/* query 2 */ insert into foo (a, b) select a, b from bar", - default_db="dev", - default_schema="public", + ObservedQuery( + query="/* query 2 */ insert into foo (a, b) select a, b from bar", + default_db="dev", + default_schema="public", + ) ) mcps = list(aggregator.gen_metadata()) @@ -483,16 +526,20 @@ def test_table_rename(pytestconfig: pytest.Config) -> None: # Add an unrelated query. aggregator.add_observed_query( - query="create table bar as select a, b from baz", - default_db="dev", - default_schema="public", + ObservedQuery( + query="create table bar as select a, b from baz", + default_db="dev", + default_schema="public", + ) ) # Add the query that created the staging table. 
aggregator.add_observed_query( - query="create table foo_staging as select a, b from foo_dep", - default_db="dev", - default_schema="public", + ObservedQuery( + query="create table foo_staging as select a, b from foo_dep", + default_db="dev", + default_schema="public", + ) ) mcps = list(aggregator.gen_metadata()) @@ -514,10 +561,12 @@ def test_create_table_query_mcps(pytestconfig: pytest.Config) -> None: ) aggregator.add_observed_query( - query="create or replace table `dataset.foo` (date_utc timestamp, revenue int);", - default_db="dev", - default_schema="public", - query_timestamp=datetime.now(), + ObservedQuery( + query="create or replace table `dataset.foo` (date_utc timestamp, revenue int);", + default_db="dev", + default_schema="public", + timestamp=datetime.now(), + ) ) mcps = list(aggregator.gen_metadata()) @@ -539,14 +588,18 @@ def test_lineage_via_temp_table_disordered_add(pytestconfig: pytest.Config) -> N ) aggregator.add_observed_query( - query="create table derived_from_foo as select * from foo", - default_db="dev", - default_schema="public", + ObservedQuery( + query="create table derived_from_foo as select * from foo", + default_db="dev", + default_schema="public", + ) ) aggregator.add_observed_query( - query="create temp table foo as select a, b+c as c from bar", - default_db="dev", - default_schema="public", + ObservedQuery( + query="create temp table foo as select a, b+c as c from bar", + default_db="dev", + default_schema="public", + ) ) mcps = list(aggregator.gen_metadata()) @@ -579,19 +632,23 @@ def test_basic_usage(pytestconfig: pytest.Config) -> None: ) aggregator.add_observed_query( - query="select * from foo", - default_db="dev", - default_schema="public", - usage_multiplier=5, - query_timestamp=frozen_timestamp, - user=CorpUserUrn("user1"), + ObservedQuery( + query="select * from foo", + default_db="dev", + default_schema="public", + usage_multiplier=5, + timestamp=frozen_timestamp, + user=CorpUserUrn("user1"), + ) ) aggregator.add_observed_query( - query="create table bar as select b+c as c from foo", - default_db="dev", - default_schema="public", - query_timestamp=frozen_timestamp, - user=CorpUserUrn("user2"), + ObservedQuery( + query="create table bar as select b+c as c from foo", + default_db="dev", + default_schema="public", + timestamp=frozen_timestamp, + user=CorpUserUrn("user2"), + ) ) mcps = list(aggregator.gen_metadata()) diff --git a/metadata-ingestion/tests/unit/sql_parsing/test_tool_meta_extractor.py b/metadata-ingestion/tests/unit/sql_parsing/test_tool_meta_extractor.py new file mode 100644 index 0000000000000..6f590b5307146 --- /dev/null +++ b/metadata-ingestion/tests/unit/sql_parsing/test_tool_meta_extractor.py @@ -0,0 +1,55 @@ +from datahub.configuration.datetimes import parse_absolute_time +from datahub.metadata.urns import CorpUserUrn +from datahub.sql_parsing.sql_parsing_aggregator import PreparsedQuery +from datahub.sql_parsing.tool_meta_extractor import ToolMetaExtractor + + +def test_extract_mode_metadata() -> None: + extractor = ToolMetaExtractor() + query = """\ +select * from LONG_TAIL_COMPANIONS.ADOPTION.PET_PROFILES +LIMIT 100 +-- {"user":"@foo","email":"foo@acryl.io","url":"https://modeanalytics.com/acryltest/reports/6234ff78bc7d/runs/662b21949629/queries/f0aad24d5b37","scheduled":false} +""" + + entry = PreparsedQuery( + query_id=None, + query_text=query, + upstreams=[], + downstream=None, + column_lineage=None, + column_usage=None, + inferred_schema=None, + user=CorpUserUrn("mode"), + 
timestamp=parse_absolute_time("2021-08-01T01:02:03Z"), + ) + + assert extractor.extract_bi_metadata(entry) + assert entry.user == CorpUserUrn("foo") + + assert extractor.report.num_queries_meta_extracted["mode"] == 1 + + +def test_extract_no_metadata() -> None: + extractor = ToolMetaExtractor() + query = """\ +select * from LONG_TAIL_COMPANIONS.ADOPTION.PET_PROFILES +LIMIT 100 +-- random comment on query +""" + + entry = PreparsedQuery( + query_id=None, + query_text=query, + upstreams=[], + downstream=None, + column_lineage=None, + column_usage=None, + inferred_schema=None, + user=CorpUserUrn("mode"), + timestamp=parse_absolute_time("2021-08-01T01:02:03Z"), + ) + + assert not extractor.extract_bi_metadata(entry) + + assert extractor.report.num_queries_meta_extracted["mode"] == 0
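
For reference, a minimal end-to-end sketch (not part of the patch) of how the new pieces fit together: an ObservedQuery attributed to a warehouse service account is re-attributed to the Mode analyst named in the trailing JSON comment. This assumes the aggregator's platform/generate_* constructor flags; the table name and service-account urn are illustrative.

from datahub.metadata.urns import CorpUserUrn
from datahub.sql_parsing.sql_parsing_aggregator import (
    ObservedQuery,
    SqlParsingAggregator,
)

aggregator = SqlParsingAggregator(
    platform="bigquery",
    generate_lineage=True,
    generate_usage_statistics=False,
    generate_operations=False,
)

# The warehouse audit log attributes this query to a service account, but the
# trailing Mode comment names the real analyst (bar@xyz.com).
aggregator.add_observed_query(
    ObservedQuery(
        query=(
            "select * from `bigquery-dataset-1`.`view-1`\n"
            "LIMIT 100\n"
            '-- {"user":"@bar","email":"bar@xyz.com",'
            '"url":"https://modeanalytics.com/acryltest/reports/6234ff78bc7d'
            '/runs/662b21949629/queries/f0aad24d5b37","scheduled":false}\n'
        ),
        default_db="project-id-1",
        user=CorpUserUrn("service_account"),
    )
)

# ToolMetaExtractor rewrites the actor before aspects are generated, so lineage
# and usage are attributed to urn:li:corpuser:bar, while the original service
# account is preserved under extra_info["user_via"].
mcps = list(aggregator.gen_metadata())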