Support dataset identifier with schema name #18

Open · wants to merge 4 commits into base: v0.8.33.affirm

Changes from 2 commits
@@ -0,0 +1,22 @@
[
{
"auditHeader": null,
"proposedSnapshot": {
"com.linkedin.pegasus2avro.metadata.snapshot.DataPlatformSnapshot": {
"urn": "urn:li:dataPlatform:affirm3rdParty",
"aspects": [
{
"com.linkedin.pegasus2avro.dataplatform.DataPlatformInfo": {
"datasetNameDelimiter": "/",
"name": "affirm3rdParty",
"displayName": "3rd-party Artifact",
"type": "OTHERS",
"logoUrl": "https://cdn-assets.affirm.com/images/black_logo-white_bg.jpg"
}
}
]
}
},
"proposedDelta": null
}
]
@@ -0,0 +1,9 @@
source:
type: file
config:
filename: './3rd-party.dataplatform.json'

sink:
type: 'datahub-rest'
config:
server: 'http://localhost:8080'
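
File-source recipes like the one above can be run with the DataHub CLI (datahub ingest -c <recipe>). As an illustration, here is a minimal sketch of the same ingestion driven programmatically; the config dict mirrors the recipe, and Pipeline is DataHub's standard ingestion entry point:

from datahub.ingestion.run.pipeline import Pipeline

# Mirrors the file-source recipe above: read the local MCE JSON file
# and push its snapshots to a local DataHub REST sink.
pipeline = Pipeline.create(
    {
        "source": {
            "type": "file",
            "config": {"filename": "./3rd-party.dataplatform.json"},
        },
        "sink": {
            "type": "datahub-rest",
            "config": {"server": "http://localhost:8080"},
        },
    }
)
pipeline.run()
pipeline.raise_from_status()  # surface any ingestion errors as an exception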
metadata-ingestion/examples/affirm_artifact/3rd-party.recipe.yml (12 additions, 0 deletions)
@@ -0,0 +1,12 @@
source:
type: affirm-artifact
config:
directory: '/Users/tao.sun/tao/datahub-metadata/artifacts/3rd_party'
platform: 'affirm3rdParty'
platform_instance: ''
env: 'PROD'

sink:
type: 'datahub-rest'
config:
server: 'http://localhost:8080'
@@ -0,0 +1,22 @@
[
{
"auditHeader": null,
"proposedSnapshot": {
"com.linkedin.pegasus2avro.metadata.snapshot.DataPlatformSnapshot": {
"urn": "urn:li:dataPlatform:affirmInfra",
"aspects": [
{
"com.linkedin.pegasus2avro.dataplatform.DataPlatformInfo": {
"datasetNameDelimiter": "/",
"name": "affirmInfra",
"displayName": "Infra Artifact",
"type": "OTHERS",
"logoUrl": "https://cdn-assets.affirm.com/images/black_logo-white_bg.jpg"
}
}
]
}
},
"proposedDelta": null
}
]
@@ -0,0 +1,9 @@
source:
type: file
config:
filename: './infra.dataplatform.json'

sink:
type: 'datahub-rest'
config:
server: 'http://localhost:8080'
metadata-ingestion/examples/affirm_artifact/infra.recipe.yml (12 additions, 0 deletions)
@@ -0,0 +1,12 @@
source:
type: affirm-artifact
config:
directory: '/Users/tao.sun/tao/datahub-metadata/artifacts/infra'
platform: 'affirmInfra'
platform_instance: ''
env: 'PROD'

sink:
type: 'datahub-rest'
config:
server: 'http://localhost:8080'
@@ -0,0 +1,12 @@
source:
type: affirm-artifact
config:
directory: '/Users/tao.sun/tao/datahub-metadata/artifacts/unstructured_s3'
platform: 's3'
platform_instance: 'unstructured_s3'
env: 'PROD'

sink:
type: 'datahub-rest'
config:
server: 'http://localhost:8080'
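
Setting platform_instance in this recipe changes the generated dataset URN: the instance is prefixed onto the dataset name. A hedged sketch using the builder helper this PR adopts (the dataset name is hypothetical, and the URN shown is the expected shape rather than verified output):

import datahub.emitter.mce_builder as builder

# 'logs.application_log' is a hypothetical artifact name for illustration.
urn = builder.make_dataset_urn_with_platform_instance(
    platform='s3',
    name='logs.application_log',
    platform_instance='unstructured_s3',
    env='PROD',
)
# Expected shape:
# urn:li:dataset:(urn:li:dataPlatform:s3,unstructured_s3.logs.application_log,PROD)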
@@ -4,7 +4,6 @@
from typing import Dict, Set

import pandas as pd
import spacy

from datahub.classification.privacy.privacy.api import PIIEngine
from datahub.ingestion.api.common import RecordEnvelope
metadata-ingestion/src/datahub/ingestion/source/affirm/artifact.py (26 additions, 7 deletions)
@@ -6,7 +6,7 @@
from ruamel.yaml import YAML

import datahub.emitter.mce_builder as builder
from datahub.configuration.common import ConfigModel
from datahub.configuration.source_common import PlatformSourceConfigBase
from datahub.ingestion.api.source import Source, SourceReport
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.metadata.com.linkedin.pegasus2avro.mxe import MetadataChangeEvent
@@ -20,10 +20,12 @@


yaml = YAML(typ='rt')
logging.basicConfig(stream=sys.stderr, level=logging.INFO)


@dataclass
class AffirmArtifact:
schema_name: str

Review thread on the schema_name field:

Reviewer: What is this supposed to be for?

Reviewer: Looking later in the PR, I get that this is to display the path in the repo for the corresponding artifact. Are we going to do the same for the datasets?

Author (imtaos): Hmm, MySQL does not have a schema concept, so we don't have it there. After rethinking it, I feel it's better not to use a schema to represent the nested directory structure of artifacts. E.g. with logs/application_log.yml, we would be representing logs as something database-schema-like, which isn't very accurate. We can simply name it logs.application_log. WDYT?

Reviewer: What is the purpose/value for the end user we plan to get out of this? Can we just provide the file path in datahub-metadata that corresponds to the dataset/artifact in DataHub? That may be useful to cross-compare, update, etc. I suppose it is not possible to have links in properties, but we could also consider adding a link to the GitHub file from elsewhere in DataHub.

Reviewer: Btw, this is all going away as we will be moving it to the export-to-datahub script, correct?

Author (imtaos): It won't go away; it is part of the dataset URN, e.g. urn:li:dataset:(urn:li:dataPlatform:affirmInfra,development.bluejays,PROD). The difference is whether we treat the whole development.bluejays as a schema.table format, or treat development as a folder and bluejays as a table.
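
To make the two interpretations concrete, a small sketch using the builder already imported in artifact.py (the URN in the comment shows the expected shape, not verified output):

import datahub.emitter.mce_builder as builder

# This PR's choice: fold the directory into a schema-qualified dataset name,
# so 'development/bluejays.yaml' becomes the dataset 'development.bluejays'.
urn = builder.make_dataset_urn(
    platform='affirmInfra',
    name='development.bluejays',
    env='PROD',
)
# Expected: urn:li:dataset:(urn:li:dataPlatform:affirmInfra,development.bluejays,PROD)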

name: str
description: str
owner: str
@@ -36,27 +38,34 @@ def __post_init__(self):
self.privacy_entrypoint = ''
if self.processing_purposes is None:
self.processing_purposes = []
if self.schema_name is None:
self.schema_name = ''


class AffirmArtifactSourceConfig(ConfigModel):
class AffirmArtifactSourceConfig(PlatformSourceConfigBase):
''' TODO support git repo to automate the whole process:
git clone, locate artifact and ingest
'''
directory: str
platform: str
env: str


def iterate_artifact(directory: str) -> Iterable[AffirmArtifact]:
def fix_description(description: str):
return ' '.join(description.split())
for r, _, filenames in os.walk(directory):
def get_schema(dir: str):
relative_dir = dir.replace(os.path.abspath(directory), '').strip()
relative_dir = relative_dir if not relative_dir.startswith(os.sep) else relative_dir[len(os.sep):]
relative_dir = relative_dir if not relative_dir.endswith(os.sep) else relative_dir[:-len(os.sep)]
return '.'.join(relative_dir.split(os.sep))
Reviewer (on lines +46 to +50): Can we expect a git directory and get the git root to get the relative path instead?
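
A hedged sketch of that suggestion (the function name is illustrative; it shells out to git to find the repo root, then derives the schema from the path relative to it):

import os
import subprocess

def get_git_relative_schema(path: str) -> str:
    # Illustrative only: ask git for the repository root, then build the
    # dotted schema name from the directory path relative to that root.
    root = subprocess.check_output(
        ['git', 'rev-parse', '--show-toplevel'],
        cwd=path,
        text=True,
    ).strip()
    relative = os.path.relpath(path, root)
    return '.'.join(relative.split(os.sep))

Note that os.path.relpath would also remove the need for the manual separator stripping in get_schema above.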

for dir, _, filenames in os.walk(directory):
for filename in filenames:
if filename.endswith('.yaml'):
filepath = os.path.join(r, filename)
filepath = os.path.join(dir, filename)
with open(filepath, 'r') as f:
content = yaml.load(f)
artifact = AffirmArtifact(
schema_name=get_schema(dir),
name=filename.replace('.yaml', ''),
description=fix_description(content.get('description', '')),
owner=content.get('owner', ''),
@@ -99,11 +108,21 @@ def create(cls, config_dict, ctx):
def get_workunits(self) -> Iterable[MetadataWorkUnit]:
directory = self.config.directory
platform = self.config.platform
platform_instance = self.config.platform_instance
env = self.config.env
for artifact in iterate_artifact(directory):
dataset_name = artifact.name
dataset_name = (
f'{artifact.schema_name}.{artifact.name}'
if len(artifact.schema_name) > 0
else artifact.name
)
logging.info(f'> Processing dataset {dataset_name}')
dataset_urn = builder.make_dataset_urn(platform, dataset_name, env)
dataset_urn = builder.make_dataset_urn_with_platform_instance(
platform=platform,
name=dataset_name,
platform_instance=platform_instance,
env=env
)
dataset_snapshot = DatasetSnapshot(
urn=dataset_urn,
aspects=[],