Skip to content

Commit

Permalink
Remove file-specific lineage bounding.
Browse files Browse the repository at this point in the history
  • Loading branch information
robertwb committed Dec 17, 2024
1 parent 76db105 commit 30288ae
Show file tree
Hide file tree
Showing 8 changed files with 25 additions and 98 deletions.
7 changes: 3 additions & 4 deletions sdks/python/apache_beam/io/aws/s3filesystem.py
Original file line number Diff line number Diff line change
Expand Up @@ -315,14 +315,13 @@ def delete(self, paths):
if exceptions:
raise BeamIOError("Delete operation failed", exceptions)

def report_lineage(self, path, lineage, level=None):
def report_lineage(self, path, lineage):
try:
components = s3io.parse_s3_path(path, object_optional=True)
except ValueError:
# report lineage is fail-safe
traceback.print_exc()
return
if level == FileSystem.LineageLevel.TOP_LEVEL or \
(len(components) > 1 and components[-1] == ''):
# bucket only
if components and not components[-1]:
components = components[:-1]
lineage.add('s3', *components, last_segment_sep='/')
7 changes: 3 additions & 4 deletions sdks/python/apache_beam/io/azure/blobstoragefilesystem.py
Original file line number Diff line number Diff line change
Expand Up @@ -317,15 +317,14 @@ def delete(self, paths):
if exceptions:
raise BeamIOError("Delete operation failed", exceptions)

def report_lineage(self, path, lineage, level=None):
def report_lineage(self, path, lineage):
try:
components = blobstorageio.parse_azfs_path(
path, blob_optional=True, get_account=True)
except ValueError:
# report lineage is fail-safe
traceback.print_exc()
return
if level == FileSystem.LineageLevel.TOP_LEVEL \
or(len(components) > 1 and components[-1] == ''):
# bucket only
if components and not components[-1]:
components = components[:-1]
lineage.add('abs', *components, last_segment_sep='/')
20 changes: 6 additions & 14 deletions sdks/python/apache_beam/io/filebasedsink.py
Original file line number Diff line number Diff line change
Expand Up @@ -286,24 +286,16 @@ def _check_state_for_finalize_write(self, writer_results, num_shards):

def _report_sink_lineage(self, dst_glob, dst_files):
"""
Report sink Lineage. Report every file if number of files no more than 100,
otherwise only report at directory level.
Report sink Lineage. Report every file if the number of files is no more
than 10, otherwise only report the glob.
"""
if len(dst_files) <= 100:
# There is rollup at the higher level, but this loses glob information.
# Better to report multiple globs than just the parent directory.
if len(dst_files) <= 10:
for dst in dst_files:
FileSystems.report_sink_lineage(dst)
else:
dst = dst_glob
# dst_glob has a wildcard for shard number (see _shard_name_template)
sep = dst_glob.find('*')
if sep > 0:
dst = dst[:sep]
try:
dst, _ = FileSystems.split(dst)
except ValueError:
return # lineage report is fail-safe

FileSystems.report_sink_lineage(dst)
FileSystems.report_sink_lineage(dst_glob)

@check_accessible(['file_path_prefix'])
def finalize_write(
Expand Down
53 changes: 2 additions & 51 deletions sdks/python/apache_beam/io/filebasedsource.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
from apache_beam.io import range_trackers
from apache_beam.io.filesystem import CompressionTypes
from apache_beam.io.filesystem import FileMetadata
from apache_beam.io.filesystem import FileSystem
from apache_beam.io.filesystems import FileSystems
from apache_beam.io.restriction_trackers import OffsetRange
from apache_beam.options.value_provider import StaticValueProvider
Expand Down Expand Up @@ -170,37 +169,11 @@ def _get_concat_source(self) -> concat_source.ConcatSource:
splittable=splittable)
single_file_sources.append(single_file_source)

self._report_source_lineage(files_metadata)
FileSystems.report_source_lineage(pattern)
self._concat_source = concat_source.ConcatSource(single_file_sources)

return self._concat_source

def _report_source_lineage(self, files_metadata):
"""
Report source Lineage. Depending on the number of files, report the full file
name, only the directory, or only the top level.
"""
if len(files_metadata) <= 100:
for file_metadata in files_metadata:
FileSystems.report_source_lineage(file_metadata.path)
else:
size_track = set()
for file_metadata in files_metadata:
if len(size_track) >= 100:
FileSystems.report_source_lineage(
file_metadata.path, level=FileSystem.LineageLevel.TOP_LEVEL)
return

try:
base, _ = FileSystems.split(file_metadata.path)
except ValueError:
pass
else:
size_track.add(base)

for base in size_track:
FileSystems.report_source_lineage(base)

def open_file(self, file_name):
return FileSystems.open(
file_name,
Expand Down Expand Up @@ -382,7 +355,7 @@ def process(self, element: Union[str, FileMetadata], *args,
match_results = FileSystems.match([element])
metadata_list = match_results[0].metadata_list
for metadata in metadata_list:
self._report_source_lineage(metadata.path)
FileSystems.report_source_lineage(metadata.path)

splittable = (
self._splittable and _determine_splittability_from_compression_type(
Expand All @@ -397,28 +370,6 @@ def process(self, element: Union[str, FileMetadata], *args,
metadata,
OffsetRange(0, range_trackers.OffsetRangeTracker.OFFSET_INFINITY))

def _report_source_lineage(self, path):
"""
Report source Lineage. Due to the size limit of Beam metrics, report the full
file name or only the top level, depending on the number of files.
* Number of files<=100, report full file paths;
* Otherwise, report top level only.
"""
if self._size_track is None:
self._size_track = set()
elif len(self._size_track) == 0:
FileSystems.report_source_lineage(
path, level=FileSystem.LineageLevel.TOP_LEVEL)
return

self._size_track.add(path)
FileSystems.report_source_lineage(path)

if len(self._size_track) >= 100:
self._size_track.clear()


class _ReadRange(DoFn):
def __init__(
Expand Down
6 changes: 1 addition & 5 deletions sdks/python/apache_beam/io/filesystem.py
Original file line number Diff line number Diff line change
Expand Up @@ -934,11 +934,7 @@ def delete(self, paths):
"""
raise NotImplementedError

class LineageLevel:
FILE = 'FILE'
TOP_LEVEL = 'TOP_LEVEL'

def report_lineage(self, path, unused_lineage, level=None):
def report_lineage(self, path, unused_lineage):
"""
Report Lineage metrics for path.
Expand Down
16 changes: 5 additions & 11 deletions sdks/python/apache_beam/io/filesystems.py
Original file line number Diff line number Diff line change
Expand Up @@ -391,27 +391,21 @@ def get_chunk_size(path):
return filesystem.CHUNK_SIZE

@staticmethod
def report_source_lineage(path, level=None):
def report_source_lineage(path):
"""
Report source :class:`~apache_beam.metrics.metric.LineageLevel`.
Report source :class:`~apache_beam.metrics.metric.Lineage`.
Args:
path: string path to be reported.
level: the level of file path. default to
:class:`~apache_beam.io.filesystem.FileSystem.LineageLevel`.FILE.
"""
filesystem = FileSystems.get_filesystem(path)
filesystem.report_lineage(path, Lineage.sources(), level=level)
FileSystems.get_filesystem(path).report_lineage(path, Lineage.sources())

@staticmethod
def report_sink_lineage(path, level=None):
def report_sink_lineage(path):
"""
Report sink :class:`~apache_beam.metrics.metric.Lineage`.
Args:
path: string path to be reported.
level: the level of file path. default to
:class:`~apache_beam.io.filesystem.FileSystem.Lineage`.FILE.
"""
filesystem = FileSystems.get_filesystem(path)
filesystem.report_lineage(path, Lineage.sinks(), level=level)
FileSystems.get_filesystem(path).report_lineage(path, Lineage.sinks())
7 changes: 3 additions & 4 deletions sdks/python/apache_beam/io/gcp/gcsfilesystem.py
Original file line number Diff line number Diff line change
Expand Up @@ -366,14 +366,13 @@ def delete(self, paths):
if exceptions:
raise BeamIOError("Delete operation failed", exceptions)

def report_lineage(self, path, lineage, level=None):
def report_lineage(self, path, lineage):
try:
components = gcsio.parse_gcs_path(path, object_optional=True)
except ValueError:
# report lineage is fail-safe
traceback.print_exc()
return
if level == FileSystem.LineageLevel.TOP_LEVEL \
or(len(components) > 1 and components[-1] == ''):
# bucket only
if components and not components[-1]:
components = components[:-1]
lineage.add('gcs', *components, last_segment_sep='/')
7 changes: 2 additions & 5 deletions sdks/python/apache_beam/io/localfilesystem.py
Original file line number Diff line number Diff line change
Expand Up @@ -365,8 +365,5 @@ def try_delete(path):
if exceptions:
raise BeamIOError("Delete operation failed", exceptions)

def report_lineage(self, path, lineage, level=None):
if level == FileSystem.LineageLevel.TOP_LEVEL:
lineage.add('filesystem', 'localhost')
else:
lineage.add('filesystem', 'localhost', path, last_segment_sep='/')
def report_lineage(self, path, lineage):
lineage.add('filesystem', 'localhost', path, last_segment_sep='/')

0 comments on commit 30288ae

Please sign in to comment.