diff --git a/sdks/python/apache_beam/io/aws/s3filesystem.py b/sdks/python/apache_beam/io/aws/s3filesystem.py index 494de14c83a8..229f69b039a0 100644 --- a/sdks/python/apache_beam/io/aws/s3filesystem.py +++ b/sdks/python/apache_beam/io/aws/s3filesystem.py @@ -315,14 +315,13 @@ def delete(self, paths): if exceptions: raise BeamIOError("Delete operation failed", exceptions) - def report_lineage(self, path, lineage, level=None): + def report_lineage(self, path, lineage): try: components = s3io.parse_s3_path(path, object_optional=True) except ValueError: # report lineage is fail-safe + traceback.print_exc() return - if level == FileSystem.LineageLevel.TOP_LEVEL or \ - (len(components) > 1 and components[-1] == ''): - # bucket only + if components and not components[-1]: components = components[:-1] lineage.add('s3', *components, last_segment_sep='/') diff --git a/sdks/python/apache_beam/io/azure/blobstoragefilesystem.py b/sdks/python/apache_beam/io/azure/blobstoragefilesystem.py index ff908451b1b7..e547eecc9b97 100644 --- a/sdks/python/apache_beam/io/azure/blobstoragefilesystem.py +++ b/sdks/python/apache_beam/io/azure/blobstoragefilesystem.py @@ -317,15 +317,14 @@ def delete(self, paths): if exceptions: raise BeamIOError("Delete operation failed", exceptions) - def report_lineage(self, path, lineage, level=None): + def report_lineage(self, path, lineage): try: components = blobstorageio.parse_azfs_path( path, blob_optional=True, get_account=True) except ValueError: # report lineage is fail-safe + traceback.print_exc() return - if level == FileSystem.LineageLevel.TOP_LEVEL \ - or(len(components) > 1 and components[-1] == ''): - # bucket only + if components and not components[-1]: components = components[:-1] lineage.add('abs', *components, last_segment_sep='/') diff --git a/sdks/python/apache_beam/io/filebasedsink.py b/sdks/python/apache_beam/io/filebasedsink.py index f9d4303c8c78..eb433bd60583 100644 --- a/sdks/python/apache_beam/io/filebasedsink.py +++ b/sdks/python/apache_beam/io/filebasedsink.py @@ -286,24 +286,16 @@ def _check_state_for_finalize_write(self, writer_results, num_shards): def _report_sink_lineage(self, dst_glob, dst_files): """ - Report sink Lineage. Report every file if number of files no more than 100, - otherwise only report at directory level. + Report sink Lineage. Report every file if number of files no more than 10, + otherwise only report glob. """ - if len(dst_files) <= 100: + # There is rollup at the higher level, but this loses glob information. + # Better to report multiple globs than just the parent directory. + if len(dst_files) <= 10: for dst in dst_files: FileSystems.report_sink_lineage(dst) else: - dst = dst_glob - # dst_glob has a wildcard for shard number (see _shard_name_template) - sep = dst_glob.find('*') - if sep > 0: - dst = dst[:sep] - try: - dst, _ = FileSystems.split(dst) - except ValueError: - return # lineage report is fail-safe - - FileSystems.report_sink_lineage(dst) + FileSystems.report_sink_lineage(dst_glob) @check_accessible(['file_path_prefix']) def finalize_write( diff --git a/sdks/python/apache_beam/io/filebasedsource.py b/sdks/python/apache_beam/io/filebasedsource.py index a02bc6de32c7..8569dce4f42f 100644 --- a/sdks/python/apache_beam/io/filebasedsource.py +++ b/sdks/python/apache_beam/io/filebasedsource.py @@ -170,37 +170,11 @@ def _get_concat_source(self) -> concat_source.ConcatSource: splittable=splittable) single_file_sources.append(single_file_source) - self._report_source_lineage(files_metadata) + FileSystems.report_source_lineage(pattern) self._concat_source = concat_source.ConcatSource(single_file_sources) return self._concat_source - def _report_source_lineage(self, files_metadata): - """ - Report source Lineage. depend on the number of files, report full file - name, only dir, or only top level - """ - if len(files_metadata) <= 100: - for file_metadata in files_metadata: - FileSystems.report_source_lineage(file_metadata.path) - else: - size_track = set() - for file_metadata in files_metadata: - if len(size_track) >= 100: - FileSystems.report_source_lineage( - file_metadata.path, level=FileSystem.LineageLevel.TOP_LEVEL) - return - - try: - base, _ = FileSystems.split(file_metadata.path) - except ValueError: - pass - else: - size_track.add(base) - - for base in size_track: - FileSystems.report_source_lineage(base) - def open_file(self, file_name): return FileSystems.open( file_name, @@ -382,7 +356,7 @@ def process(self, element: Union[str, FileMetadata], *args, match_results = FileSystems.match([element]) metadata_list = match_results[0].metadata_list for metadata in metadata_list: - self._report_source_lineage(metadata.path) + FileSystems.report_source_lineage(metadata.path) splittable = ( self._splittable and _determine_splittability_from_compression_type( @@ -397,28 +371,6 @@ def process(self, element: Union[str, FileMetadata], *args, metadata, OffsetRange(0, range_trackers.OffsetRangeTracker.OFFSET_INFINITY)) - def _report_source_lineage(self, path): - """ - Report source Lineage. Due to the size limit of Beam metrics, report full - file name or only top level depend on the number of files. - - * Number of files<=100, report full file paths; - - * Otherwise, report top level only. - """ - if self._size_track is None: - self._size_track = set() - elif len(self._size_track) == 0: - FileSystems.report_source_lineage( - path, level=FileSystem.LineageLevel.TOP_LEVEL) - return - - self._size_track.add(path) - FileSystems.report_source_lineage(path) - - if len(self._size_track) >= 100: - self._size_track.clear() - class _ReadRange(DoFn): def __init__( diff --git a/sdks/python/apache_beam/io/filesystem.py b/sdks/python/apache_beam/io/filesystem.py index 840fdf3309e7..bdc25dcf0fe5 100644 --- a/sdks/python/apache_beam/io/filesystem.py +++ b/sdks/python/apache_beam/io/filesystem.py @@ -934,11 +934,7 @@ def delete(self, paths): """ raise NotImplementedError - class LineageLevel: - FILE = 'FILE' - TOP_LEVEL = 'TOP_LEVEL' - - def report_lineage(self, path, unused_lineage, level=None): + def report_lineage(self, path, unused_lineage): """ Report Lineage metrics for path. diff --git a/sdks/python/apache_beam/io/filesystems.py b/sdks/python/apache_beam/io/filesystems.py index 87f45f3308ee..1d64f88684b8 100644 --- a/sdks/python/apache_beam/io/filesystems.py +++ b/sdks/python/apache_beam/io/filesystems.py @@ -391,27 +391,21 @@ def get_chunk_size(path): return filesystem.CHUNK_SIZE @staticmethod - def report_source_lineage(path, level=None): + def report_source_lineage(path): """ - Report source :class:`~apache_beam.metrics.metric.LineageLevel`. + Report source :class:`~apache_beam.metrics.metric.Lineage`. Args: path: string path to be reported. - level: the level of file path. default to - :class:`~apache_beam.io.filesystem.FileSystem.LineageLevel`.FILE. """ - filesystem = FileSystems.get_filesystem(path) - filesystem.report_lineage(path, Lineage.sources(), level=level) + FileSystems.get_filesystem(path).report_lineage(path, Lineage.sources()) @staticmethod - def report_sink_lineage(path, level=None): + def report_sink_lineage(path): """ Report sink :class:`~apache_beam.metrics.metric.Lineage`. Args: path: string path to be reported. - level: the level of file path. default to - :class:`~apache_beam.io.filesystem.FileSystem.Lineage`.FILE. """ - filesystem = FileSystems.get_filesystem(path) - filesystem.report_lineage(path, Lineage.sinks(), level=level) + FileSystems.get_filesystem(path).report_lineage(path, Lineage.sinks()) diff --git a/sdks/python/apache_beam/io/gcp/gcsfilesystem.py b/sdks/python/apache_beam/io/gcp/gcsfilesystem.py index 7e293ccd9d9f..a933f783cc0b 100644 --- a/sdks/python/apache_beam/io/gcp/gcsfilesystem.py +++ b/sdks/python/apache_beam/io/gcp/gcsfilesystem.py @@ -366,14 +366,13 @@ def delete(self, paths): if exceptions: raise BeamIOError("Delete operation failed", exceptions) - def report_lineage(self, path, lineage, level=None): + def report_lineage(self, path, lineage): try: components = gcsio.parse_gcs_path(path, object_optional=True) except ValueError: # report lineage is fail-safe + traceback.print_exc() return - if level == FileSystem.LineageLevel.TOP_LEVEL \ - or(len(components) > 1 and components[-1] == ''): - # bucket only + if components and not components[-1]: components = components[:-1] lineage.add('gcs', *components, last_segment_sep='/') diff --git a/sdks/python/apache_beam/io/localfilesystem.py b/sdks/python/apache_beam/io/localfilesystem.py index 5525f3b96f1d..daf69b8d030c 100644 --- a/sdks/python/apache_beam/io/localfilesystem.py +++ b/sdks/python/apache_beam/io/localfilesystem.py @@ -365,8 +365,5 @@ def try_delete(path): if exceptions: raise BeamIOError("Delete operation failed", exceptions) - def report_lineage(self, path, lineage, level=None): - if level == FileSystem.LineageLevel.TOP_LEVEL: - lineage.add('filesystem', 'localhost') - else: - lineage.add('filesystem', 'localhost', path, last_segment_sep='/') + def report_lineage(self, path, lineage): + lineage.add('filesystem', 'localhost', path, last_segment_sep='/')