[GDP-2726] - add domain to mapping logic for dbt and kafka
eesanoble authored May 23, 2024
1 parent 0ffdfb7 commit 627678b
Showing 20 changed files with 6,441 additions and 2,764 deletions.
15 changes: 12 additions & 3 deletions metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_common.py
@@ -115,7 +115,6 @@
     UpstreamLineageClass,
     ViewPropertiesClass,
 )
-from datahub.utilities.hive_schema_to_avro import get_schema_fields_for_hive_column
 from datahub.metadata.urns import DatasetUrn
 from datahub.sql_parsing.schema_resolver import SchemaResolver
 from datahub.sql_parsing.sqlglot_lineage import (
@@ -126,6 +125,7 @@
     sqlglot_lineage,
 )
 from datahub.sql_parsing.sqlglot_utils import detach_ctes, try_format_query
+from datahub.utilities.hive_schema_to_avro import get_schema_fields_for_hive_column
 from datahub.utilities.mapping import Constants, OperationProcessor
 from datahub.utilities.time import datetime_to_ts_millis
 from datahub.utilities.topological_sort import topological_sort
@@ -1246,6 +1246,14 @@ def create_dbt_platform_mces(
             aspect=self._make_data_platform_instance_aspect(),
         ).as_workunit()

+        # Domain aspects
+        meta_domain_aspect = meta_aspects.get(Constants.ADD_DOMAIN_OPERATION)
+        if meta_domain_aspect:
+            yield MetadataChangeProposalWrapper(
+                entityUrn=node_datahub_urn,
+                aspect=meta_domain_aspect,
+            ).as_workunit()
+
         # add browsePathsV2 aspect
         browse_paths_v2_path = []
         if mce_platform_instance:
@@ -1257,7 +1265,8 @@ def create_dbt_platform_mces(
                     id=platform_instance_urn, urn=platform_instance_urn
                 )
             )
-        browse_paths_v2_path.append(BrowsePathEntryClass(id=node.schema))
+        if node.schema:
+            browse_paths_v2_path.append(BrowsePathEntryClass(id=node.schema))
         aspects.append(BrowsePathsV2Class(path=browse_paths_v2_path))

         if len(aspects) == 0:
@@ -1588,7 +1597,7 @@ def get_schema_metadata(
                     description=description,
                     default_nullable=True,
                     custom_tags=column.tags,
-                    **meta_mapping_args,
+                    **meta_mapping_args,  # type: ignore
                 )
                 assert schema_fields
                 canonical_schema.extend(schema_fields)
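Note: the domain aspect emitted in create_dbt_platform_mces above is produced by the source's meta mapping. Below is a minimal sketch of how the new add_domain operation could be configured and exercised; the "domain" meta key, the "{{ $match }}" placeholder, and the "marketing" value are illustrative assumptions, not part of this commit.

from datahub.utilities.mapping import Constants, OperationProcessor

# Hypothetical mapping: a node whose meta contains {"domain": "marketing"}
# gets a DataHub domain attached; "{{ $match }}" is replaced with the
# matched meta value.
operation_defs = {
    "domain": {
        "match": ".*",
        "operation": Constants.ADD_DOMAIN_OPERATION,
        "config": {Constants.DOMAIN: "{{ $match }}"},
    }
}

processor = OperationProcessor(operation_defs)
meta_aspects = processor.process({"domain": "marketing"})
# meta_aspects[Constants.ADD_DOMAIN_OPERATION] is a DomainsClass aspect
# containing "urn:li:domain:marketing".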
9 changes: 9 additions & 0 deletions metadata-ingestion/src/datahub/ingestion/source/kafka.py
@@ -375,6 +375,7 @@ def _extract_record(

         # 4. Set dataset's description, tags, ownership, etc, if topic schema type is avro
         description: Optional[str] = None
+        meta_domain_aspect = None
         if (
             schema_metadata is not None
             and isinstance(schema_metadata.platformSchema, KafkaSchemaClass)
@@ -421,6 +422,8 @@ def _extract_record(
                     for tag_association in meta_tags_aspect.tags
                 ]

+                meta_domain_aspect = meta_aspects.get(Constants.ADD_DOMAIN_OPERATION)
+
         if all_tags:
             dataset_snapshot.aspects.append(
                 mce_builder.make_global_tag_aspect_with_tag_list(all_tags)
@@ -466,6 +469,12 @@ def _extract_record(
                 entity_urn=dataset_urn,
                 domain_urn=domain_urn,
             )
+        # If domain is defined in the avro schema and captured by meta mapping
+        if meta_domain_aspect:
+            yield MetadataChangeProposalWrapper(
+                entityUrn=dataset_urn,
+                aspect=meta_domain_aspect,
+            ).as_workunit()

     def build_custom_properties(
         self,
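Note: in the Kafka source the meta mapping runs over the top-level properties of the topic's registered Avro schema (the path guarded by the KafkaSchemaClass check above). Assuming a meta_mapping config like the dbt sketch earlier, a value schema along these lines would yield the new domain workunit; the record and property values are hypothetical.

# Hypothetical Avro value schema; the custom top-level "domain" property is
# what the meta-mapping processor captures.
avro_value_schema = {
    "type": "record",
    "name": "PurchaseEvent",
    "namespace": "com.example.events",
    "domain": "marketing",
    "fields": [
        {"name": "purchase_id", "type": "string"},
        {"name": "amount", "type": "double"},
    ],
}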
12 changes: 8 additions & 4 deletions metadata-ingestion/src/datahub/ingestion/source/tableau.py
@@ -1795,7 +1795,7 @@ def parse_custom_sql(
             )
             return None

-        query = clean_query(datasource.get(c.QUERY))
+        query = clean_query(datasource.get(c.QUERY, ""))
         if query is None:
             logger.debug(
                 f"raw sql query is not available for datasource {datasource_urn}"
@@ -2218,11 +2218,15 @@ def emit_table(

         # Browse path V2
         platform = get_dataset_platform_from_urn(database_table.urn)
-        platform_instance = self.config.platform_instance_map.get(
-            platform, self.config.platform_instance
+        platform_instance = (
+            self.config.platform_instance_map.get(
+                platform, self.config.platform_instance
+            )
+            if self.config.platform_instance_map and platform
+            else None
         )
         browse_paths_V2_path = []
-        if platform_instance:
+        if platform_instance and platform:
             platform_instance_urn = builder.make_dataplatform_instance_urn(
                 platform, platform_instance
             )
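Note: the emit_table change makes the platform-instance lookup safe when platform_instance_map is unset or the platform could not be derived from the URN. A standalone restatement of the guarded expression, with hypothetical names:

from typing import Dict, Optional

def resolve_platform_instance(
    platform_instance_map: Optional[Dict[str, str]],
    platform: Optional[str],
    default_instance: Optional[str],
) -> Optional[str]:
    # Consult the map only when both it and the platform are present;
    # previously an unset map raised AttributeError on .get().
    if platform_instance_map and platform:
        return platform_instance_map.get(platform, default_instance)
    return None

print(resolve_platform_instance({"postgres": "prod"}, "postgres", None))  # prod
print(resolve_platform_instance(None, "postgres", "fallback"))  # None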
@@ -640,12 +640,7 @@ def create(
             )

             # TODO: See if we can remove this -- made for redshift
-            if (
-                schema
-                and t_table
-                and t_full_name
-                and t_table == t_full_name
-            ):
+            if schema and t_table and t_full_name and t_table == t_full_name:
                 logger.debug(
                     f"Omitting schema for upstream table {t_id}, schema included in table name"
                 )
metadata-ingestion/src/datahub/sql_parsing/sqlglot_lineage.py

@@ -844,7 +844,10 @@ def _sqlglot_lineage_inner(
         # For select statements, qualification will be a no-op. For other statements, this
         # is where the qualification actually happens.
         qualified_table = table.qualified(
-            dialect=dialect, default_db=default_db, default_schema=default_schema, platform=schema_resolver.platform
+            dialect=dialect,
+            default_db=default_db,
+            default_schema=default_schema,
+            platform=schema_resolver.platform,
         )

         urn, schema_info = schema_resolver.resolve_table(qualified_table)
metadata-ingestion/src/datahub/utilities/hive_schema_to_avro.py

@@ -272,11 +272,11 @@ def get_schema_fields_for_hive_column(
         avro_schema_json = get_avro_schema_for_hive_column(
             hive_column_name=hive_column_name, hive_column_type=hive_column_type
         )
-        if meta_props:
+        if meta_props and isinstance(avro_schema_json, dict):
             avro_schema_json.update(meta_props)

         schema_tag_args = {}
-        if custom_tags:
+        if custom_tags and isinstance(avro_schema_json, dict):
             schema_tag_args["schema_tags_field"] = "custom_tags"
             # tag_prefix is required if passing schema_tags_field
             schema_tag_args["tag_prefix"] = ""
@@ -286,7 +286,7 @@ def get_schema_fields_for_hive_column(
             default_nullable=default_nullable,
             swallow_exceptions=False,
             meta_mapping_processor=meta_mapping_processor,
-            **schema_tag_args,
+            **schema_tag_args,  # type: ignore
         )
     except Exception as e:
         logger.warning(
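Note: the added isinstance checks narrow the Avro schema value to a dict before mutating it, which is also why the matching # type: ignore appears in the second hunk above. A sketch of the same defensive pattern in isolation, with hypothetical names:

from typing import Any, Dict, Union

def merge_meta_props(
    avro_schema_json: Union[str, Dict[str, Any]],
    meta_props: Dict[str, Any],
) -> Union[str, Dict[str, Any]]:
    # Only dict-shaped Avro schemas can carry extra top-level properties;
    # anything else (e.g. a bare '"string"' schema) passes through untouched.
    if meta_props and isinstance(avro_schema_json, dict):
        avro_schema_json.update(meta_props)
    return avro_schema_json

print(merge_meta_props({"type": "record", "name": "c", "fields": []}, {"domain": "marketing"}))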
13 changes: 12 additions & 1 deletion metadata-ingestion/src/datahub/utilities/mapping.py
@@ -13,6 +13,7 @@
 from datahub.emitter.mce_builder import OwnerType
 from datahub.metadata.schema_classes import (
     AuditStampClass,
+    DomainsClass,
     InstitutionalMemoryClass,
     InstitutionalMemoryMetadataClass,
     OwnerClass,
@@ -71,12 +72,14 @@ class Constants:
     ADD_TERM_OPERATION = "add_term"
     ADD_TERMS_OPERATION = "add_terms"
     ADD_OWNER_OPERATION = "add_owner"
+    ADD_DOMAIN_OPERATION = "add_domain"
     OPERATION = "operation"
     OPERATION_CONFIG = "config"
     TAG = "tag"
     TERM = "term"
     DOC_LINK = "link"
     DOC_DESCRIPTION = "description"
+    DOMAIN = "domain"
     OWNER_TYPE = "owner_type"
     OWNER_CATEGORY = "owner_category"
     MATCH = "match"
@@ -291,7 +294,11 @@ def convert_to_aspects(
                 logger.error(
                     f"Error while constructing aspect for documentation link and description : {e}"
                 )
-
+        if Constants.ADD_DOMAIN_OPERATION in operation_map:
+            domain_aspect = DomainsClass(
+                domains=list(operation_map[Constants.ADD_DOMAIN_OPERATION])
+            )
+            aspect_map[Constants.ADD_DOMAIN_OPERATION] = domain_aspect
         return aspect_map

     def get_operation_value(
@@ -374,6 +381,10 @@ def get_operation_value
                 for term in captured_terms.split(separator)
                 if term.strip()
             ]
+        elif operation_type == Constants.ADD_DOMAIN_OPERATION:
+            domain = operation_config[Constants.DOMAIN]
+            domain = _insert_match_value(domain, _get_best_match(match, "domain"))
+            return mce_builder.make_domain_urn(domain)
         return None

     def sanitize_owner_ids(self, owner_id: str) -> str:
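Note: tying the mapping.py pieces together, get_operation_value resolves the configured domain (after any "{{ $match }}" substitution) to a URN via make_domain_urn, and convert_to_aspects wraps the collected URNs in a DomainsClass aspect that the dbt and kafka sources then emit. A quick check of the URN helper:

from datahub.emitter import mce_builder
from datahub.metadata.schema_classes import DomainsClass

# make_domain_urn prefixes bare names; already-formed URNs pass through as-is.
assert mce_builder.make_domain_urn("marketing") == "urn:li:domain:marketing"

# The shape of the aspect convert_to_aspects builds for add_domain:
aspect = DomainsClass(domains=[mce_builder.make_domain_urn("marketing")])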