From 9f391a869b851e4ab68dd59a7d310a29fdffb67e Mon Sep 17 00:00:00 2001 From: Yi Hu Date: Thu, 3 Oct 2024 08:46:06 -0400 Subject: [PATCH] Disable blob level Lineage metrics for FileSystems --- .../beam/sdk/extensions/gcp/storage/GcsFileSystem.java | 2 +- .../java/org/apache/beam/sdk/io/aws/s3/S3FileSystem.java | 2 +- .../org/apache/beam/sdk/io/aws2/s3/S3FileSystem.java | 2 +- .../sdk/io/azure/blobstore/AzureBlobStoreFileSystem.java | 9 +-------- sdks/python/apache_beam/io/aws/s3filesystem.py | 2 ++ .../python/apache_beam/io/azure/blobstoragefilesystem.py | 2 ++ sdks/python/apache_beam/io/gcp/gcsfilesystem.py | 4 ++-- 7 files changed, 10 insertions(+), 13 deletions(-) diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystem.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystem.java index 6332051c0ddc..fbbc80d97ba6 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystem.java +++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystem.java @@ -219,7 +219,7 @@ protected String getScheme() { protected void reportLineage(GcsResourceId resourceId, Lineage lineage) { GcsPath path = resourceId.getGcsPath(); if (!path.getBucket().isEmpty()) { - lineage.add("gcs", ImmutableList.of(path.getBucket(), path.getObject())); + lineage.add("gcs", ImmutableList.of(path.getBucket())); } else { LOG.warn("Report Lineage on relative path {} is unsupported", path.getObject()); } diff --git a/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/s3/S3FileSystem.java b/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/s3/S3FileSystem.java index 7ed56efa44bd..90011e27de71 100644 --- a/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/s3/S3FileSystem.java +++ b/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/s3/S3FileSystem.java @@ -627,7 +627,7 @@ protected S3ResourceId matchNewResource(String singleResourceSpec, boolean isDir @Override protected void reportLineage(S3ResourceId resourceId, Lineage lineage) { - lineage.add("s3", ImmutableList.of(resourceId.getBucket(), resourceId.getKey())); + lineage.add("s3", ImmutableList.of(resourceId.getBucket())); } /** diff --git a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/s3/S3FileSystem.java b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/s3/S3FileSystem.java index 384c8c627ee7..02b3d782bcf3 100644 --- a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/s3/S3FileSystem.java +++ b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/s3/S3FileSystem.java @@ -658,7 +658,7 @@ protected S3ResourceId matchNewResource(String singleResourceSpec, boolean isDir @Override protected void reportLineage(S3ResourceId resourceId, Lineage lineage) { - lineage.add("s3", ImmutableList.of(resourceId.getBucket(), resourceId.getKey())); + lineage.add("s3", ImmutableList.of(resourceId.getBucket())); } /** diff --git a/sdks/java/io/azure/src/main/java/org/apache/beam/sdk/io/azure/blobstore/AzureBlobStoreFileSystem.java b/sdks/java/io/azure/src/main/java/org/apache/beam/sdk/io/azure/blobstore/AzureBlobStoreFileSystem.java index 5137eaf9bb2d..a01f2f89f9d3 100644 --- a/sdks/java/io/azure/src/main/java/org/apache/beam/sdk/io/azure/blobstore/AzureBlobStoreFileSystem.java +++ b/sdks/java/io/azure/src/main/java/org/apache/beam/sdk/io/azure/blobstore/AzureBlobStoreFileSystem.java @@ -453,13 +453,6 @@ protected AzfsResourceId matchNewResource(String singleResourceSpec, boolean isD @Override protected void reportLineage(AzfsResourceId resourceId, Lineage lineage) { - if (!Strings.isNullOrEmpty(resourceId.getBlob())) { - lineage.add( - "abs", - ImmutableList.of( - resourceId.getAccount(), resourceId.getContainer(), resourceId.getBlob())); - } else { - lineage.add("abs", ImmutableList.of(resourceId.getAccount(), resourceId.getContainer())); - } + lineage.add("abs", ImmutableList.of(resourceId.getAccount(), resourceId.getContainer())); } } diff --git a/sdks/python/apache_beam/io/aws/s3filesystem.py b/sdks/python/apache_beam/io/aws/s3filesystem.py index e181beac4a58..8144c74014a9 100644 --- a/sdks/python/apache_beam/io/aws/s3filesystem.py +++ b/sdks/python/apache_beam/io/aws/s3filesystem.py @@ -321,4 +321,6 @@ def report_lineage(self, path, lineage): except ValueError: # report lineage is fail-safe return + if len(components) > 1: + components = components[:-1] lineage.add('s3', *components) diff --git a/sdks/python/apache_beam/io/azure/blobstoragefilesystem.py b/sdks/python/apache_beam/io/azure/blobstoragefilesystem.py index bb56fa09d370..d7bd3f4b3a60 100644 --- a/sdks/python/apache_beam/io/azure/blobstoragefilesystem.py +++ b/sdks/python/apache_beam/io/azure/blobstoragefilesystem.py @@ -323,4 +323,6 @@ def report_lineage(self, path, lineage): except ValueError: # report lineage is fail-safe return + if len(components) > 1: + components = components[:-1] lineage.add('abs', *components) diff --git a/sdks/python/apache_beam/io/gcp/gcsfilesystem.py b/sdks/python/apache_beam/io/gcp/gcsfilesystem.py index 053b02d325a5..9310d04cb010 100644 --- a/sdks/python/apache_beam/io/gcp/gcsfilesystem.py +++ b/sdks/python/apache_beam/io/gcp/gcsfilesystem.py @@ -368,8 +368,8 @@ def delete(self, paths): def report_lineage(self, path, lineage): try: - bucket, blob = gcsio.parse_gcs_path(path) + bucket, _ = gcsio.parse_gcs_path(path) except ValueError: # report lineage is fail-safe return - lineage.add('gcs', bucket, blob) + lineage.add('gcs', bucket)