Skip to content

Commit

Permalink
Support dataset identifier with schema name, e.g. metrics.application_metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
imtaos committed Aug 4, 2022
1 parent e4ea582 commit a96f293
Showing 1 changed file with 10 additions and 3 deletions.
13 changes: 10 additions & 3 deletions metadata-ingestion/src/datahub/ingestion/source/affirm/artifact.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

@dataclass
class AffirmArtifact:
schema_name: str
name: str
description: str
owner: str
Expand All @@ -50,13 +51,19 @@ class AffirmArtifactSourceConfig(ConfigModel):
def iterate_artifact(directory: str) -> Iterable[AffirmArtifact]:
def fix_description(description: str):
    """Collapse every run of whitespace in ``description`` into one space.

    Leading and trailing whitespace is dropped as a side effect of splitting
    on whitespace before re-joining.
    """
    words = description.split()
    return ' '.join(words)
for r, _, filenames in os.walk(directory):
def get_schema(dir: str):
    """Return the dotted schema name for a directory found under ``directory``.

    ``dir`` is a directory path as yielded by ``os.walk(directory)``. The
    path of ``dir`` relative to ``directory`` is converted to a dotted name,
    e.g. ``<directory>/metrics/daily`` becomes ``'metrics.daily'``. The root
    directory itself maps to the empty string (no schema).
    """
    # os.path.relpath is used instead of textually removing
    # os.path.abspath(directory): os.walk yields paths prefixed with
    # ``directory`` exactly as it was passed, so a relative ``directory``
    # never matched the absolute prefix and the whole path ended up in the
    # schema name. relpath also normalizes leading/trailing separators, which
    # the previous slicing handled incorrectly for a trailing separator.
    relative_dir = os.path.relpath(dir, directory).strip()
    if relative_dir == os.curdir:
        # ``dir`` is the root directory itself: artifacts there have no schema.
        return ''
    return '.'.join(relative_dir.split(os.sep))
for dir, _, filenames in os.walk(directory):
for filename in filenames:
if filename.endswith('.yaml'):
filepath = os.path.join(r, filename)
filepath = os.path.join(dir, filename)
with open(filepath, 'r') as f:
content = yaml.load(f)
artifact = AffirmArtifact(
schema_name=get_schema(dir),
name=filename.replace('.yaml', ''),
description=fix_description(content.get('description', '')),
owner=content.get('owner', ''),
Expand Down Expand Up @@ -101,7 +108,7 @@ def get_workunits(self) -> Iterable[MetadataWorkUnit]:
platform = self.config.platform
env = self.config.env
for artifact in iterate_artifact(directory):
dataset_name = artifact.name
dataset_name = f'{artifact.schema_name}.{artifact.name}' if len(artifact.schema_name) > 0 else artifact.name
logging.info(f'> Processing dataset {dataset_name}')
dataset_urn = builder.make_dataset_urn(platform, dataset_name, env)
dataset_snapshot = DatasetSnapshot(
Expand Down

0 comments on commit a96f293

Please sign in to comment.