From a96f2932cf081d1cc9e79e6615c51bb8859942e1 Mon Sep 17 00:00:00 2001 From: Tao Sun Date: Thu, 4 Aug 2022 13:30:00 -0700 Subject: [PATCH 1/4] Support dataset identifier with schema name, e.g. metrics.application_metrics --- .../src/datahub/ingestion/source/affirm/artifact.py | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/affirm/artifact.py b/metadata-ingestion/src/datahub/ingestion/source/affirm/artifact.py index 5f059a2e75a48..ba5e09cc38b87 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/affirm/artifact.py +++ b/metadata-ingestion/src/datahub/ingestion/source/affirm/artifact.py @@ -24,6 +24,7 @@ @dataclass class AffirmArtifact: + schema_name: str name: str description: str owner: str @@ -50,13 +51,19 @@ class AffirmArtifactSourceConfig(ConfigModel): def iterate_artifact(directory: str) -> Iterable[AffirmArtifact]: def fix_description(description: str): return ' '.join(description.split()) - for r, _, filenames in os.walk(directory): + def get_schema(dir: str): + relative_dir = dir.replace(os.path.abspath(directory), '').strip() + relative_dir = relative_dir if not relative_dir.startswith(os.sep) else relative_dir[len(os.sep):] + relative_dir = relative_dir if not relative_dir.endswith(os.sep) else relative_dir[len(os.sep):] + return '.'.join(relative_dir.split(os.sep)) + for dir, _, filenames in os.walk(directory): for filename in filenames: if filename.endswith('.yaml'): - filepath = os.path.join(r, filename) + filepath = os.path.join(dir, filename) with open(filepath, 'r') as f: content = yaml.load(f) artifact = AffirmArtifact( + schema_name=get_schema(dir), name=filename.replace('.yaml', ''), description=fix_description(content.get('description', '')), owner=content.get('owner', ''), @@ -101,7 +108,7 @@ def get_workunits(self) -> Iterable[MetadataWorkUnit]: platform = self.config.platform env = self.config.env for artifact in iterate_artifact(directory): - dataset_name = artifact.name + dataset_name = f'{artifact.schema_name}.{artifact.name}' if len(artifact.schema_name) > 0 else artifact.name logging.info(f'> Processing dataset {dataset_name}') dataset_urn = builder.make_dataset_urn(platform, dataset_name, env) dataset_snapshot = DatasetSnapshot( From 5e1604642cd47c9b5cbcb21356f21a2271754177 Mon Sep 17 00:00:00 2001 From: Tao Sun Date: Wed, 17 Aug 2022 17:58:03 -0700 Subject: [PATCH 2/4] Add artifact example recipes --- .../3rd-party.dataplatform.json | 22 +++++++++++++++++++ .../3rd-party.dataplatform.recipe.yml | 9 ++++++++ .../affirm_artifact/3rd-party.recipe.yml | 12 ++++++++++ .../affirm_artifact/infra.dataplatform.json | 22 +++++++++++++++++++ .../infra.dataplatform.recipe.yml | 9 ++++++++ .../examples/affirm_artifact/infra.recipe.yml | 12 ++++++++++ .../unstructured-s3-recipe.yml | 12 ++++++++++ .../src/datahub/classification/classifier.py | 1 - .../ingestion/source/affirm/artifact.py | 22 ++++++++++++++----- 9 files changed, 115 insertions(+), 6 deletions(-) create mode 100644 metadata-ingestion/examples/affirm_artifact/3rd-party.dataplatform.json create mode 100644 metadata-ingestion/examples/affirm_artifact/3rd-party.dataplatform.recipe.yml create mode 100644 metadata-ingestion/examples/affirm_artifact/3rd-party.recipe.yml create mode 100644 metadata-ingestion/examples/affirm_artifact/infra.dataplatform.json create mode 100644 metadata-ingestion/examples/affirm_artifact/infra.dataplatform.recipe.yml create mode 100644 metadata-ingestion/examples/affirm_artifact/infra.recipe.yml create mode 100644 metadata-ingestion/examples/affirm_artifact/unstructured-s3-recipe.yml diff --git a/metadata-ingestion/examples/affirm_artifact/3rd-party.dataplatform.json b/metadata-ingestion/examples/affirm_artifact/3rd-party.dataplatform.json new file mode 100644 index 0000000000000..dc9a2f0624049 --- /dev/null +++ b/metadata-ingestion/examples/affirm_artifact/3rd-party.dataplatform.json @@ -0,0 +1,22 @@ +[ + { + "auditHeader": null, + "proposedSnapshot": { + "com.linkedin.pegasus2avro.metadata.snapshot.DataPlatformSnapshot": { + "urn": "urn:li:dataPlatform:affirm3rdParty", + "aspects": [ + { + "com.linkedin.pegasus2avro.dataplatform.DataPlatformInfo": { + "datasetNameDelimiter": "/", + "name": "affirm3rdParty", + "displayName": "3rd-party Artifact", + "type": "OTHERS", + "logoUrl": "https://cdn-assets.affirm.com/images/black_logo-white_bg.jpg" + } + } + ] + } + }, + "proposedDelta": null + } +] diff --git a/metadata-ingestion/examples/affirm_artifact/3rd-party.dataplatform.recipe.yml b/metadata-ingestion/examples/affirm_artifact/3rd-party.dataplatform.recipe.yml new file mode 100644 index 0000000000000..1509c74fa569a --- /dev/null +++ b/metadata-ingestion/examples/affirm_artifact/3rd-party.dataplatform.recipe.yml @@ -0,0 +1,9 @@ +source: + type: file + config: + filename: './3rd-party.dataplatform.json' + +sink: + type: 'datahub-rest' + config: + server: 'http://localhost:8080' diff --git a/metadata-ingestion/examples/affirm_artifact/3rd-party.recipe.yml b/metadata-ingestion/examples/affirm_artifact/3rd-party.recipe.yml new file mode 100644 index 0000000000000..b0289e196e9a8 --- /dev/null +++ b/metadata-ingestion/examples/affirm_artifact/3rd-party.recipe.yml @@ -0,0 +1,12 @@ +source: + type: affirm-artifact + config: + directory: '/Users/tao.sun/tao/datahub-metadata/artifacts/3rd_party' + platform: 'affirm3rdParty' + platform_instance: '' + env: 'PROD' + +sink: + type: 'datahub-rest' + config: + server: 'http://localhost:8080' diff --git a/metadata-ingestion/examples/affirm_artifact/infra.dataplatform.json b/metadata-ingestion/examples/affirm_artifact/infra.dataplatform.json new file mode 100644 index 0000000000000..730f6e8182681 --- /dev/null +++ b/metadata-ingestion/examples/affirm_artifact/infra.dataplatform.json @@ -0,0 +1,22 @@ +[ + { + "auditHeader": null, + "proposedSnapshot": { + "com.linkedin.pegasus2avro.metadata.snapshot.DataPlatformSnapshot": { + "urn": "urn:li:dataPlatform:affirmInfra", + "aspects": [ + { + "com.linkedin.pegasus2avro.dataplatform.DataPlatformInfo": { + "datasetNameDelimiter": "/", + "name": "affirmInfra", + "displayName": "Infra Artifact", + "type": "OTHERS", + "logoUrl": "https://cdn-assets.affirm.com/images/black_logo-white_bg.jpg" + } + } + ] + } + }, + "proposedDelta": null + } +] diff --git a/metadata-ingestion/examples/affirm_artifact/infra.dataplatform.recipe.yml b/metadata-ingestion/examples/affirm_artifact/infra.dataplatform.recipe.yml new file mode 100644 index 0000000000000..dd8b91cbcbe43 --- /dev/null +++ b/metadata-ingestion/examples/affirm_artifact/infra.dataplatform.recipe.yml @@ -0,0 +1,9 @@ +source: + type: file + config: + filename: './infra.dataplatform.json' + +sink: + type: 'datahub-rest' + config: + server: 'http://localhost:8080' diff --git a/metadata-ingestion/examples/affirm_artifact/infra.recipe.yml b/metadata-ingestion/examples/affirm_artifact/infra.recipe.yml new file mode 100644 index 0000000000000..2d0bc8cdd8117 --- /dev/null +++ b/metadata-ingestion/examples/affirm_artifact/infra.recipe.yml @@ -0,0 +1,12 @@ +source: + type: affirm-artifact + config: + directory: '/Users/tao.sun/tao/datahub-metadata/artifacts/infra' + platform: 'affirmInfra' + platform_instance: '' + env: 'PROD' + +sink: + type: 'datahub-rest' + config: + server: 'http://localhost:8080' diff --git a/metadata-ingestion/examples/affirm_artifact/unstructured-s3-recipe.yml b/metadata-ingestion/examples/affirm_artifact/unstructured-s3-recipe.yml new file mode 100644 index 0000000000000..b19b3c6b87197 --- /dev/null +++ b/metadata-ingestion/examples/affirm_artifact/unstructured-s3-recipe.yml @@ -0,0 +1,12 @@ +source: + type: affirm-artifact + config: + directory: '/Users/tao.sun/tao/datahub-metadata/artifacts/unstructured_s3' + platform: 's3' + platform_instance: 'unstructured_s3' + env: 'PROD' + +sink: + type: 'datahub-rest' + config: + server: 'http://localhost:8080' diff --git a/metadata-ingestion/src/datahub/classification/classifier.py b/metadata-ingestion/src/datahub/classification/classifier.py index d44ab51712c74..2ccff85df92a5 100644 --- a/metadata-ingestion/src/datahub/classification/classifier.py +++ b/metadata-ingestion/src/datahub/classification/classifier.py @@ -4,7 +4,6 @@ from typing import Dict, Set import pandas as pd -import spacy from datahub.classification.privacy.privacy.api import PIIEngine from datahub.ingestion.api.common import RecordEnvelope diff --git a/metadata-ingestion/src/datahub/ingestion/source/affirm/artifact.py b/metadata-ingestion/src/datahub/ingestion/source/affirm/artifact.py index ba5e09cc38b87..a188ff82570b7 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/affirm/artifact.py +++ b/metadata-ingestion/src/datahub/ingestion/source/affirm/artifact.py @@ -6,7 +6,7 @@ from ruamel.yaml import YAML import datahub.emitter.mce_builder as builder -from datahub.configuration.common import ConfigModel +from datahub.configuration.source_common import PlatformSourceConfigBase from datahub.ingestion.api.source import Source, SourceReport from datahub.ingestion.api.workunit import MetadataWorkUnit from datahub.metadata.com.linkedin.pegasus2avro.mxe import MetadataChangeEvent @@ -20,6 +20,7 @@ yaml = YAML(typ='rt') +logging.basicConfig(stream=sys.stderr, level=logging.INFO) @dataclass @@ -37,14 +38,15 @@ def __post_init__(self): self.privacy_entrypoint = '' if self.processing_purposes is None: self.processing_purposes = [] + if self.schema_name is None: + self.schema_name = '' -class AffirmArtifactSourceConfig(ConfigModel): +class AffirmArtifactSourceConfig(PlatformSourceConfigBase): ''' TODO support git repo to automate the whole process: git clone, locate artifact and ingest ''' directory: str - platform: str env: str @@ -106,11 +108,21 @@ def create(cls, config_dict, ctx): def get_workunits(self) -> Iterable[MetadataWorkUnit]: directory = self.config.directory platform = self.config.platform + platform_instance = self.config.platform_instance env = self.config.env for artifact in iterate_artifact(directory): - dataset_name = f'{artifact.schema_name}.{artifact.name}' if len(artifact.schema_name) > 0 else artifact.name + dataset_name = ( + f'{artifact.schema_name}.{artifact.name}' + if len(artifact.schema_name) > 0 + else artifact.name + ) logging.info(f'> Processing dataset {dataset_name}') - dataset_urn = builder.make_dataset_urn(platform, dataset_name, env) + dataset_urn = builder.make_dataset_urn_with_platform_instance( + platform=platform, + name=dataset_name, + platform_instance=platform_instance, + env=env + ) dataset_snapshot = DatasetSnapshot( urn=dataset_urn, aspects=[], From 29f3d48a51cfd6c6c0231664d26e2d7153048428 Mon Sep 17 00:00:00 2001 From: Tao Sun Date: Thu, 25 Aug 2022 11:58:08 -0700 Subject: [PATCH 3/4] Address review comments --- .../examples/affirm_artifact/3rd-party.recipe.yml | 3 +-- metadata-ingestion/examples/affirm_artifact/infra.recipe.yml | 3 +-- .../examples/affirm_artifact/unstructured-s3-recipe.yml | 2 +- 3 files changed, 3 insertions(+), 5 deletions(-) diff --git a/metadata-ingestion/examples/affirm_artifact/3rd-party.recipe.yml b/metadata-ingestion/examples/affirm_artifact/3rd-party.recipe.yml index b0289e196e9a8..b7679e251e1c7 100644 --- a/metadata-ingestion/examples/affirm_artifact/3rd-party.recipe.yml +++ b/metadata-ingestion/examples/affirm_artifact/3rd-party.recipe.yml @@ -1,9 +1,8 @@ source: type: affirm-artifact config: - directory: '/Users/tao.sun/tao/datahub-metadata/artifacts/3rd_party' + directory: '/datahub-metadata/artifacts/3rd_party' platform: 'affirm3rdParty' - platform_instance: '' env: 'PROD' sink: diff --git a/metadata-ingestion/examples/affirm_artifact/infra.recipe.yml b/metadata-ingestion/examples/affirm_artifact/infra.recipe.yml index 2d0bc8cdd8117..4444b16c6ff04 100644 --- a/metadata-ingestion/examples/affirm_artifact/infra.recipe.yml +++ b/metadata-ingestion/examples/affirm_artifact/infra.recipe.yml @@ -1,9 +1,8 @@ source: type: affirm-artifact config: - directory: '/Users/tao.sun/tao/datahub-metadata/artifacts/infra' + directory: '/datahub-metadata/artifacts/infra' platform: 'affirmInfra' - platform_instance: '' env: 'PROD' sink: diff --git a/metadata-ingestion/examples/affirm_artifact/unstructured-s3-recipe.yml b/metadata-ingestion/examples/affirm_artifact/unstructured-s3-recipe.yml index b19b3c6b87197..c65e341d47d9e 100644 --- a/metadata-ingestion/examples/affirm_artifact/unstructured-s3-recipe.yml +++ b/metadata-ingestion/examples/affirm_artifact/unstructured-s3-recipe.yml @@ -1,7 +1,7 @@ source: type: affirm-artifact config: - directory: '/Users/tao.sun/tao/datahub-metadata/artifacts/unstructured_s3' + directory: '/datahub-metadata/artifacts/unstructured_s3' platform: 's3' platform_instance: 'unstructured_s3' env: 'PROD' From f87ae52c400068488d60d21f7e635ff6bb3c1af0 Mon Sep 17 00:00:00 2001 From: Tao Sun Date: Thu, 25 Aug 2022 12:17:45 -0700 Subject: [PATCH 4/4] Simplify artifact ingestion source --- .../ingestion/source/affirm/artifact.py | 92 +++++-------------- 1 file changed, 21 insertions(+), 71 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/affirm/artifact.py b/metadata-ingestion/src/datahub/ingestion/source/affirm/artifact.py index a188ff82570b7..b2fa55df663da 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/affirm/artifact.py +++ b/metadata-ingestion/src/datahub/ingestion/source/affirm/artifact.py @@ -2,42 +2,34 @@ import os import logging from dataclasses import dataclass, field -from typing import Iterable, Sequence +from typing import Iterable from ruamel.yaml import YAML import datahub.emitter.mce_builder as builder +from datahub.emitter.mcp import MetadataChangeProposalWrapper from datahub.configuration.source_common import PlatformSourceConfigBase from datahub.ingestion.api.source import Source, SourceReport from datahub.ingestion.api.workunit import MetadataWorkUnit from datahub.metadata.com.linkedin.pegasus2avro.mxe import MetadataChangeEvent from datahub.metadata.com.linkedin.pegasus2avro.metadata.snapshot import DatasetSnapshot from datahub.metadata.schema_classes import ( - DatasetPropertiesClass, - OwnerClass, - OwnershipClass, - OwnershipTypeClass, + ChangeTypeClass, + StatusClass, ) yaml = YAML(typ='rt') logging.basicConfig(stream=sys.stderr, level=logging.INFO) +logger = logging.getLogger(__name__) @dataclass class AffirmArtifact: schema_name: str name: str - description: str - owner: str - privacy_entrypoint: str - retention_days: str - processing_purposes: Sequence[str] + is_deprecated: bool def __post_init__(self): - if self.privacy_entrypoint is None: - self.privacy_entrypoint = '' - if self.processing_purposes is None: - self.processing_purposes = [] if self.schema_name is None: self.schema_name = '' @@ -51,8 +43,6 @@ class AffirmArtifactSourceConfig(PlatformSourceConfigBase): def iterate_artifact(directory: str) -> Iterable[AffirmArtifact]: - def fix_description(description: str): - return ' '.join(description.split()) def get_schema(dir: str): relative_dir = dir.replace(os.path.abspath(directory), '').strip() relative_dir = relative_dir if not relative_dir.startswith(os.sep) else relative_dir[len(os.sep):] @@ -67,34 +57,11 @@ def get_schema(dir: str): artifact = AffirmArtifact( schema_name=get_schema(dir), name=filename.replace('.yaml', ''), - description=fix_description(content.get('description', '')), - owner=content.get('owner', ''), - privacy_entrypoint=content.get('privacy_entrypoint', ''), - retention_days=str(content.get('retention_days')) if content.get('retention_days') else '', - processing_purposes=content.get('processing_purposes', []) + is_deprecated=content.get('is_deprecated', False) ) yield artifact -def make_groupname(team: str) -> str: - prefix = 'teams/' - if team.startswith(prefix): - team = team[len(prefix):] - dot_delimited = ".".join(reversed(team.split('/'))) - camel_case = dot_delimited.replace('_', ' ').title().replace(' ', '') - return f'{camel_case}.Team' - -def make_processing_purpose_term(term: str) -> str: - """ - processing_purposes.payment_processing -> ProcessingPurpose.PaymentProcessing - """ - prefix = 'processing_purposes.' - if term.startswith(prefix): - term = term[len(prefix):] - camel_case = term.replace('_', ' ').title().replace(' ', '') - return f'ProcessingPurpose.{camel_case}' - - @dataclass class AffirmArtifactSource(Source): config: AffirmArtifactSourceConfig @@ -116,49 +83,32 @@ def get_workunits(self) -> Iterable[MetadataWorkUnit]: if len(artifact.schema_name) > 0 else artifact.name ) - logging.info(f'> Processing dataset {dataset_name}') + logger.info(f'Processing dataset {dataset_name}') dataset_urn = builder.make_dataset_urn_with_platform_instance( platform=platform, name=dataset_name, platform_instance=platform_instance, env=env ) - dataset_snapshot = DatasetSnapshot( - urn=dataset_urn, - aspects=[], - ) - # set up dataset properties - dataset_properties = DatasetPropertiesClass( - description=artifact.description, - tags=[], - customProperties={ - 'privacy_entrypoint': artifact.privacy_entrypoint, - 'retention_days': f'{artifact.retention_days}' - } + + mcp = MetadataChangeProposalWrapper( + entityType='dataset', + entityUrn=dataset_urn, + changeType=ChangeTypeClass.UPSERT, + aspectName='status', + aspect=StatusClass(removed=artifact.is_deprecated) ) - dataset_snapshot.aspects.append(dataset_properties) - # set up ownership - ownership = OwnershipClass( - owners=[ - OwnerClass( - owner= builder.make_group_urn(make_groupname(artifact.owner)), - type=OwnershipTypeClass.DATAOWNER - ) - ] + + wu = MetadataWorkUnit( + id=dataset_urn, + mcp=mcp ) - dataset_snapshot.aspects.append(ownership) - # set up processing purposes glossary terms - processing_purposes = [make_processing_purpose_term(x) for x in artifact.processing_purposes] - processing_purpose_urns = [builder.make_term_urn(x) for x in processing_purposes] - processing_purpose_terms = builder.make_glossary_terms_aspect_from_urn_list(processing_purpose_urns) - dataset_snapshot.aspects.append(processing_purpose_terms) - # build mce & metadata work unit - mce = MetadataChangeEvent(proposedSnapshot=dataset_snapshot) - wu = MetadataWorkUnit(id=dataset_name, mce=mce) yield wu + def get_report(self): return self.report + def close(self): pass