Skip to content

Commit

Permalink
Disable blob level Lineage metrics for FileSystems
Browse files Browse the repository at this point in the history
  • Loading branch information
Abacn committed Oct 3, 2024
1 parent 2fb9efc commit 9f391a8
Show file tree
Hide file tree
Showing 7 changed files with 10 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ protected String getScheme() {
protected void reportLineage(GcsResourceId resourceId, Lineage lineage) {
GcsPath path = resourceId.getGcsPath();
if (!path.getBucket().isEmpty()) {
lineage.add("gcs", ImmutableList.of(path.getBucket(), path.getObject()));
lineage.add("gcs", ImmutableList.of(path.getBucket()));
} else {
LOG.warn("Report Lineage on relative path {} is unsupported", path.getObject());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -627,7 +627,7 @@ protected S3ResourceId matchNewResource(String singleResourceSpec, boolean isDir

@Override
protected void reportLineage(S3ResourceId resourceId, Lineage lineage) {
lineage.add("s3", ImmutableList.of(resourceId.getBucket(), resourceId.getKey()));
lineage.add("s3", ImmutableList.of(resourceId.getBucket()));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -658,7 +658,7 @@ protected S3ResourceId matchNewResource(String singleResourceSpec, boolean isDir

@Override
protected void reportLineage(S3ResourceId resourceId, Lineage lineage) {
lineage.add("s3", ImmutableList.of(resourceId.getBucket(), resourceId.getKey()));
lineage.add("s3", ImmutableList.of(resourceId.getBucket()));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -453,13 +453,6 @@ protected AzfsResourceId matchNewResource(String singleResourceSpec, boolean isD

@Override
protected void reportLineage(AzfsResourceId resourceId, Lineage lineage) {
if (!Strings.isNullOrEmpty(resourceId.getBlob())) {
lineage.add(
"abs",
ImmutableList.of(
resourceId.getAccount(), resourceId.getContainer(), resourceId.getBlob()));
} else {
lineage.add("abs", ImmutableList.of(resourceId.getAccount(), resourceId.getContainer()));
}
lineage.add("abs", ImmutableList.of(resourceId.getAccount(), resourceId.getContainer()));
}
}
2 changes: 2 additions & 0 deletions sdks/python/apache_beam/io/aws/s3filesystem.py
Original file line number Diff line number Diff line change
Expand Up @@ -321,4 +321,6 @@ def report_lineage(self, path, lineage):
except ValueError:
# report lineage is fail-safe
return
if len(components) > 1:
components = components[:-1]
lineage.add('s3', *components)
2 changes: 2 additions & 0 deletions sdks/python/apache_beam/io/azure/blobstoragefilesystem.py
Original file line number Diff line number Diff line change
Expand Up @@ -323,4 +323,6 @@ def report_lineage(self, path, lineage):
except ValueError:
# report lineage is fail-safe
return
if len(components) > 1:
components = components[:-1]
lineage.add('abs', *components)
4 changes: 2 additions & 2 deletions sdks/python/apache_beam/io/gcp/gcsfilesystem.py
Original file line number Diff line number Diff line change
Expand Up @@ -368,8 +368,8 @@ def delete(self, paths):

def report_lineage(self, path, lineage):
try:
bucket, blob = gcsio.parse_gcs_path(path)
bucket, _ = gcsio.parse_gcs_path(path)
except ValueError:
# report lineage is fail-safe
return
lineage.add('gcs', bucket, blob)
lineage.add('gcs', bucket)

0 comments on commit 9f391a8

Please sign in to comment.