diff --git a/metadata-ingestion/examples/affirm_artifact/3rd-party.dataplatform.json b/metadata-ingestion/examples/affirm_artifact/3rd-party.dataplatform.json
new file mode 100644
index 0000000000000..dc9a2f0624049
--- /dev/null
+++ b/metadata-ingestion/examples/affirm_artifact/3rd-party.dataplatform.json
@@ -0,0 +1,22 @@
+[
+  {
+    "auditHeader": null,
+    "proposedSnapshot": {
+      "com.linkedin.pegasus2avro.metadata.snapshot.DataPlatformSnapshot": {
+        "urn": "urn:li:dataPlatform:affirm3rdParty",
+        "aspects": [
+          {
+            "com.linkedin.pegasus2avro.dataplatform.DataPlatformInfo": {
+              "datasetNameDelimiter": "/",
+              "name": "affirm3rdParty",
+              "displayName": "3rd-party Artifact",
+              "type": "OTHERS",
+              "logoUrl": "https://cdn-assets.affirm.com/images/black_logo-white_bg.jpg"
+            }
+          }
+        ]
+      }
+    },
+    "proposedDelta": null
+  }
+]
diff --git a/metadata-ingestion/examples/affirm_artifact/3rd-party.dataplatform.recipe.yml b/metadata-ingestion/examples/affirm_artifact/3rd-party.dataplatform.recipe.yml
new file mode 100644
index 0000000000000..1509c74fa569a
--- /dev/null
+++ b/metadata-ingestion/examples/affirm_artifact/3rd-party.dataplatform.recipe.yml
@@ -0,0 +1,9 @@
+source:
+  type: file
+  config:
+    filename: './3rd-party.dataplatform.json'
+
+sink:
+  type: 'datahub-rest'
+  config:
+    server: 'http://localhost:8080'
diff --git a/metadata-ingestion/examples/affirm_artifact/3rd-party.recipe.yml b/metadata-ingestion/examples/affirm_artifact/3rd-party.recipe.yml
new file mode 100644
index 0000000000000..b7679e251e1c7
--- /dev/null
+++ b/metadata-ingestion/examples/affirm_artifact/3rd-party.recipe.yml
@@ -0,0 +1,11 @@
+source:
+  type: affirm-artifact
+  config:
+    directory: '/datahub-metadata/artifacts/3rd_party'
+    platform: 'affirm3rdParty'
+    env: 'PROD'
+
+sink:
+  type: 'datahub-rest'
+  config:
+    server: 'http://localhost:8080'
diff --git a/metadata-ingestion/examples/affirm_artifact/infra.dataplatform.json b/metadata-ingestion/examples/affirm_artifact/infra.dataplatform.json
new file mode 100644
index 0000000000000..730f6e8182681
--- /dev/null
+++ b/metadata-ingestion/examples/affirm_artifact/infra.dataplatform.json
@@ -0,0 +1,22 @@
+[
+  {
+    "auditHeader": null,
+    "proposedSnapshot": {
+      "com.linkedin.pegasus2avro.metadata.snapshot.DataPlatformSnapshot": {
+        "urn": "urn:li:dataPlatform:affirmInfra",
+        "aspects": [
+          {
+            "com.linkedin.pegasus2avro.dataplatform.DataPlatformInfo": {
+              "datasetNameDelimiter": "/",
+              "name": "affirmInfra",
+              "displayName": "Infra Artifact",
+              "type": "OTHERS",
+              "logoUrl": "https://cdn-assets.affirm.com/images/black_logo-white_bg.jpg"
+            }
+          }
+        ]
+      }
+    },
+    "proposedDelta": null
+  }
+]
diff --git a/metadata-ingestion/examples/affirm_artifact/infra.dataplatform.recipe.yml b/metadata-ingestion/examples/affirm_artifact/infra.dataplatform.recipe.yml
new file mode 100644
index 0000000000000..dd8b91cbcbe43
--- /dev/null
+++ b/metadata-ingestion/examples/affirm_artifact/infra.dataplatform.recipe.yml
@@ -0,0 +1,9 @@
+source:
+  type: file
+  config:
+    filename: './infra.dataplatform.json'
+
+sink:
+  type: 'datahub-rest'
+  config:
+    server: 'http://localhost:8080'
diff --git a/metadata-ingestion/examples/affirm_artifact/infra.recipe.yml b/metadata-ingestion/examples/affirm_artifact/infra.recipe.yml
new file mode 100644
index 0000000000000..4444b16c6ff04
--- /dev/null
+++ b/metadata-ingestion/examples/affirm_artifact/infra.recipe.yml
@@ -0,0 +1,11 @@
+source:
+  type: affirm-artifact
+  config:
+    directory: '/datahub-metadata/artifacts/infra'
+    platform: 'affirmInfra'
+    env: 'PROD'
+
+sink:
+  type: 'datahub-rest'
+  config:
+    server: 'http://localhost:8080'
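Note: the two `*.dataplatform.json` files above are plain MCE snapshot payloads, so the same platform registration can also be done without a recipe. A minimal sketch using the REST emitter, with values copied from `infra.dataplatform.json` (this mirrors what `infra.dataplatform.recipe.yml` does through the file source):

```python
# Sketch: emit the affirmInfra DataPlatformInfo aspect directly over REST,
# equivalent to running infra.dataplatform.recipe.yml through the file source.
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.metadata.schema_classes import (
    ChangeTypeClass,
    DataPlatformInfoClass,
    PlatformTypeClass,
)

emitter = DatahubRestEmitter(gms_server='http://localhost:8080')
emitter.emit_mcp(
    MetadataChangeProposalWrapper(
        entityType='dataPlatform',
        entityUrn='urn:li:dataPlatform:affirmInfra',
        changeType=ChangeTypeClass.UPSERT,
        aspectName='dataPlatformInfo',
        aspect=DataPlatformInfoClass(
            name='affirmInfra',
            displayName='Infra Artifact',
            type=PlatformTypeClass.OTHERS,
            datasetNameDelimiter='/',
            logoUrl='https://cdn-assets.affirm.com/images/black_logo-white_bg.jpg',
        ),
    )
)
```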
diff --git a/metadata-ingestion/examples/affirm_artifact/unstructured-s3-recipe.yml b/metadata-ingestion/examples/affirm_artifact/unstructured-s3-recipe.yml
new file mode 100644
index 0000000000000..c65e341d47d9e
--- /dev/null
+++ b/metadata-ingestion/examples/affirm_artifact/unstructured-s3-recipe.yml
@@ -0,0 +1,12 @@
+source:
+  type: affirm-artifact
+  config:
+    directory: '/datahub-metadata/artifacts/unstructured_s3'
+    platform: 's3'
+    platform_instance: 'unstructured_s3'
+    env: 'PROD'
+
+sink:
+  type: 'datahub-rest'
+  config:
+    server: 'http://localhost:8080'
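This is the only recipe that sets `platform_instance`. The instance name gets folded into the dataset identifier, as the sketch below illustrates (the dataset name `loans.events` is hypothetical):

```python
# Illustration: the URN shape produced when platform_instance is set,
# as the reworked source below does via make_dataset_urn_with_platform_instance.
from datahub.emitter.mce_builder import make_dataset_urn_with_platform_instance

urn = make_dataset_urn_with_platform_instance(
    platform='s3',
    name='loans.events',
    platform_instance='unstructured_s3',
    env='PROD',
)
print(urn)
# urn:li:dataset:(urn:li:dataPlatform:s3,unstructured_s3.loans.events,PROD)
```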
diff --git a/metadata-ingestion/src/datahub/classification/classifier.py b/metadata-ingestion/src/datahub/classification/classifier.py
index d44ab51712c74..2ccff85df92a5 100644
--- a/metadata-ingestion/src/datahub/classification/classifier.py
+++ b/metadata-ingestion/src/datahub/classification/classifier.py
@@ -4,7 +4,6 @@
 from typing import Dict, Set
 
 import pandas as pd
-import spacy
 
 from datahub.classification.privacy.privacy.api import PIIEngine
 from datahub.ingestion.api.common import RecordEnvelope
diff --git a/metadata-ingestion/src/datahub/ingestion/source/affirm/artifact.py b/metadata-ingestion/src/datahub/ingestion/source/affirm/artifact.py
index 5f059a2e75a48..b2fa55df663da 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/affirm/artifact.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/affirm/artifact.py
@@ -2,90 +2,66 @@
 import os
 import logging
 from dataclasses import dataclass, field
-from typing import Iterable, Sequence
+from typing import Iterable
 from ruamel.yaml import YAML
 
 import datahub.emitter.mce_builder as builder
-from datahub.configuration.common import ConfigModel
+from datahub.emitter.mcp import MetadataChangeProposalWrapper
+from datahub.configuration.source_common import PlatformSourceConfigBase
 from datahub.ingestion.api.source import Source, SourceReport
 from datahub.ingestion.api.workunit import MetadataWorkUnit
 from datahub.metadata.com.linkedin.pegasus2avro.mxe import MetadataChangeEvent
 from datahub.metadata.com.linkedin.pegasus2avro.metadata.snapshot import DatasetSnapshot
 from datahub.metadata.schema_classes import (
-    DatasetPropertiesClass,
-    OwnerClass,
-    OwnershipClass,
-    OwnershipTypeClass,
+    ChangeTypeClass,
+    StatusClass,
 )
 
 yaml = YAML(typ='rt')
+logging.basicConfig(stream=sys.stderr, level=logging.INFO)
+logger = logging.getLogger(__name__)
 
 
 @dataclass
 class AffirmArtifact:
+    schema_name: str
     name: str
-    description: str
-    owner: str
-    privacy_entrypoint: str
-    retention_days: str
-    processing_purposes: Sequence[str]
+    is_deprecated: bool
 
     def __post_init__(self):
-        if self.privacy_entrypoint is None:
-            self.privacy_entrypoint = ''
-        if self.processing_purposes is None:
-            self.processing_purposes = []
+        if self.schema_name is None:
+            self.schema_name = ''
 
 
-class AffirmArtifactSourceConfig(ConfigModel):
+class AffirmArtifactSourceConfig(PlatformSourceConfigBase):
     '''
     TODO support git repo to automate the whole process: git clone, locate artifact and ingest
     '''
     directory: str
-    platform: str
     env: str
 
 
 def iterate_artifact(directory: str) -> Iterable[AffirmArtifact]:
-    def fix_description(description: str):
-        return ' '.join(description.split())
-    for r, _, filenames in os.walk(directory):
+    def get_schema(dir: str):
+        relative_dir = os.path.abspath(dir).replace(os.path.abspath(directory), '').strip()
+        relative_dir = relative_dir if not relative_dir.startswith(os.sep) else relative_dir[len(os.sep):]
+        relative_dir = relative_dir if not relative_dir.endswith(os.sep) else relative_dir[:-len(os.sep)]
+        return '.'.join(relative_dir.split(os.sep))
+    for dir, _, filenames in os.walk(directory):
         for filename in filenames:
             if filename.endswith('.yaml'):
-                filepath = os.path.join(r, filename)
+                filepath = os.path.join(dir, filename)
                 with open(filepath, 'r') as f:
                     content = yaml.load(f)
                     artifact = AffirmArtifact(
+                        schema_name=get_schema(dir),
                         name=filename.replace('.yaml', ''),
-                        description=fix_description(content.get('description', '')),
-                        owner=content.get('owner', ''),
-                        privacy_entrypoint=content.get('privacy_entrypoint', ''),
-                        retention_days=str(content.get('retention_days')) if content.get('retention_days') else '',
-                        processing_purposes=content.get('processing_purposes', [])
+                        is_deprecated=content.get('is_deprecated', False)
                     )
                     yield artifact
 
 
-def make_groupname(team: str) -> str:
-    prefix = 'teams/'
-    if team.startswith(prefix):
-        team = team[len(prefix):]
-    dot_delimited = ".".join(reversed(team.split('/')))
-    camel_case = dot_delimited.replace('_', ' ').title().replace(' ', '')
-    return f'{camel_case}.Team'
-
-def make_processing_purpose_term(term: str) -> str:
-    """
-    processing_purposes.payment_processing -> ProcessingPurpose.PaymentProcessing
-    """
-    prefix = 'processing_purposes.'
-    if term.startswith(prefix):
-        term = term[len(prefix):]
-    camel_case = term.replace('_', ' ').title().replace(' ', '')
-    return f'ProcessingPurpose.{camel_case}'
-
-
 @dataclass
 class AffirmArtifactSource(Source):
     config: AffirmArtifactSourceConfig
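Before the second hunk of this file, a quick illustration of what the new `get_schema` helper computes. All paths here are invented for illustration; note the trailing-separator branch above originally sliced from the front (`relative_dir[len(os.sep):]`) and has been corrected to slice from the end:

```python
# Hypothetical walk through get_schema for one artifact file.
import os

directory = '/datahub-metadata/artifacts/infra'              # config.directory
walked_dir = os.path.join(directory, 'payments', 'ledger')   # yielded by os.walk

relative_dir = os.path.abspath(walked_dir).replace(os.path.abspath(directory), '')
schema_name = '.'.join(relative_dir.strip(os.sep).split(os.sep))
print(schema_name)  # payments.ledger
# An events.yaml in that folder is then emitted as dataset 'payments.ledger.events'.
```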
@@ -99,47 +75,40 @@ def create(cls, config_dict, ctx):
 
     def get_workunits(self) -> Iterable[MetadataWorkUnit]:
         directory = self.config.directory
         platform = self.config.platform
+        platform_instance = self.config.platform_instance
         env = self.config.env
         for artifact in iterate_artifact(directory):
-            dataset_name = artifact.name
-            logging.info(f'> Processing dataset {dataset_name}')
-            dataset_urn = builder.make_dataset_urn(platform, dataset_name, env)
-            dataset_snapshot = DatasetSnapshot(
-                urn=dataset_urn,
-                aspects=[],
+            dataset_name = (
+                f'{artifact.schema_name}.{artifact.name}'
+                if len(artifact.schema_name) > 0
+                else artifact.name
             )
-            # set up dataset properties
-            dataset_properties = DatasetPropertiesClass(
-                description=artifact.description,
-                tags=[],
-                customProperties={
-                    'privacy_entrypoint': artifact.privacy_entrypoint,
-                    'retention_days': f'{artifact.retention_days}'
-                }
+            logger.info(f'Processing dataset {dataset_name}')
+            dataset_urn = builder.make_dataset_urn_with_platform_instance(
+                platform=platform,
+                name=dataset_name,
+                platform_instance=platform_instance,
+                env=env
             )
-            dataset_snapshot.aspects.append(dataset_properties)
-            # set up ownership
-            ownership = OwnershipClass(
-                owners=[
-                    OwnerClass(
-                        owner= builder.make_group_urn(make_groupname(artifact.owner)),
-                        type=OwnershipTypeClass.DATAOWNER
-                    )
-                ]
+
+            mcp = MetadataChangeProposalWrapper(
+                entityType='dataset',
+                entityUrn=dataset_urn,
+                changeType=ChangeTypeClass.UPSERT,
+                aspectName='status',
+                aspect=StatusClass(removed=artifact.is_deprecated)
+            )
+
+            wu = MetadataWorkUnit(
+                id=dataset_urn,
+                mcp=mcp
             )
-            dataset_snapshot.aspects.append(ownership)
-            # set up processing purposes glossary terms
-            processing_purposes = [make_processing_purpose_term(x) for x in artifact.processing_purposes]
-            processing_purpose_urns = [builder.make_term_urn(x) for x in processing_purposes]
-            processing_purpose_terms = builder.make_glossary_terms_aspect_from_urn_list(processing_purpose_urns)
-            dataset_snapshot.aspects.append(processing_purpose_terms)
-            # build mce & metadata work unit
-            mce = MetadataChangeEvent(proposedSnapshot=dataset_snapshot)
-            wu = MetadataWorkUnit(id=dataset_name, mce=mce)
             yield wu
+
     def get_report(self):
         return self.report
+
     def close(self):
         pass
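Finally, a sketch of how one of the recipes above could be run programmatically rather than via `datahub ingest -c infra.recipe.yml`. Config values are copied from `infra.recipe.yml`; this assumes the custom `affirm-artifact` source is registered in the running environment:

```python
# Sketch: run infra.recipe.yml through the ingestion Pipeline API.
from datahub.ingestion.run.pipeline import Pipeline

pipeline = Pipeline.create(
    {
        'source': {
            'type': 'affirm-artifact',
            'config': {
                'directory': '/datahub-metadata/artifacts/infra',
                'platform': 'affirmInfra',
                'env': 'PROD',
            },
        },
        'sink': {
            'type': 'datahub-rest',
            'config': {'server': 'http://localhost:8080'},
        },
    }
)
pipeline.run()
pipeline.raise_from_status()
```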