Skip to content

Commit

Permalink
Migrate lineage counters to bounded tries.
Browse files Browse the repository at this point in the history
  • Loading branch information
robertwb committed Dec 14, 2024
1 parent 0768e10 commit d15b55a
Show file tree
Hide file tree
Showing 9 changed files with 61 additions and 18 deletions.
2 changes: 1 addition & 1 deletion sdks/python/apache_beam/io/aws/s3filesystem.py
Original file line number Diff line number Diff line change
Expand Up @@ -325,4 +325,4 @@ def report_lineage(self, path, lineage, level=None):
(len(components) > 1 and components[-1] == ''):
# bucket only
components = components[:-1]
lineage.add('s3', *components)
lineage.add('s3', *components, last_segment_sep='/')
3 changes: 2 additions & 1 deletion sdks/python/apache_beam/io/aws/s3filesystem_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,8 @@ def test_lineage(self):
def _verify_lineage(self, uri, expected_segments):
lineage_mock = mock.MagicMock()
self.fs.report_lineage(uri, lineage_mock)
lineage_mock.add.assert_called_once_with("s3", *expected_segments)
lineage_mock.add.assert_called_once_with(
"s3", *expected_segments, last_segment_sep='/')


if __name__ == '__main__':
Expand Down
2 changes: 1 addition & 1 deletion sdks/python/apache_beam/io/azure/blobstoragefilesystem.py
Original file line number Diff line number Diff line change
Expand Up @@ -328,4 +328,4 @@ def report_lineage(self, path, lineage, level=None):
or(len(components) > 1 and components[-1] == ''):
# bucket only
components = components[:-1]
lineage.add('abs', *components)
lineage.add('abs', *components, last_segment_sep='/')
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,8 @@ def test_lineage(self):
def _verify_lineage(self, uri, expected_segments):
lineage_mock = mock.MagicMock()
self.fs.report_lineage(uri, lineage_mock)
lineage_mock.add.assert_called_once_with("abs", *expected_segments)
lineage_mock.add.assert_called_once_with(
"abs", *expected_segments, last_segment_sep='/')


if __name__ == '__main__':
Expand Down
2 changes: 1 addition & 1 deletion sdks/python/apache_beam/io/gcp/bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -1163,7 +1163,7 @@ def split(self, desired_bundle_size, start_position=None, stop_position=None):
self.table_reference.datasetId,
self.table_reference.tableId)
Lineage.sources().add(
"bigquery",
'bigquery',
self.table_reference.projectId,
self.table_reference.datasetId,
self.table_reference.tableId)
Expand Down
2 changes: 1 addition & 1 deletion sdks/python/apache_beam/io/gcp/gcsfilesystem.py
Original file line number Diff line number Diff line change
Expand Up @@ -376,4 +376,4 @@ def report_lineage(self, path, lineage, level=None):
or(len(components) > 1 and components[-1] == ''):
# bucket only
components = components[:-1]
lineage.add('gcs', *components)
lineage.add('gcs', *components, last_segment_sep='/')
3 changes: 2 additions & 1 deletion sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -382,7 +382,8 @@ def test_lineage(self):
def _verify_lineage(self, uri, expected_segments):
lineage_mock = mock.MagicMock()
self.fs.report_lineage(uri, lineage_mock)
lineage_mock.add.assert_called_once_with("gcs", *expected_segments)
lineage_mock.add.assert_called_once_with(
"gcs", *expected_segments, last_segment_sep='/')


if __name__ == '__main__':
Expand Down
51 changes: 43 additions & 8 deletions sdks/python/apache_beam/metrics/metric.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
from typing import Dict
from typing import FrozenSet
from typing import Iterable
from typing import Iterator
from typing import List
from typing import Optional
from typing import Set
Expand Down Expand Up @@ -342,8 +343,8 @@ class Lineage:
SINK = "sinks"

_METRICS = {
SOURCE: Metrics.string_set(LINEAGE_NAMESPACE, SOURCE),
SINK: Metrics.string_set(LINEAGE_NAMESPACE, SINK)
SOURCE: Metrics.bounded_trie(LINEAGE_NAMESPACE, SOURCE),
SINK: Metrics.bounded_trie(LINEAGE_NAMESPACE, SINK)
}

def __init__(self, label: str) -> None:
Expand Down Expand Up @@ -392,8 +393,32 @@ def get_fq_name(
return ':'.join((system, subtype, segs))
return ':'.join((system, segs))

@staticmethod
def _get_fqn_parts(
system: str,
*segments: str,
subtype: Optional[str] = None,
last_segment_sep=None) -> Iterator[str]:
yield system + ':'
if subtype:
yield subtype + ':'
if segments:
for segment in segments[:-1]:
yield segment + '.'
if last_segment_sep:
sub_segments = segments[-1].split(last_segment_sep)
for sub_segment in sub_segments[:-1]:
yield sub_segment + last_segment_sep
yield sub_segments[-1]
else:
yield segments[-1]

def add(
self, system: str, *segments: str, subtype: Optional[str] = None) -> None:
self,
system: str,
*segments: str,
subtype: Optional[str] = None,
last_segment_sep=None) -> None:
"""
Adds the given details as Lineage.
Expand All @@ -414,11 +439,21 @@ def add(
The first positional argument serves as system, if full segments are
provided, or the full FQN if it is provided as a single argument.
"""
system_or_details = system
if len(segments) == 0 and subtype is None:
self.metric.add(system_or_details)
else:
self.metric.add(self.get_fq_name(system, *segments, subtype=subtype))
self.add_raw(
*self._get_fqn_parts(
system,
*segments,
subtype=subtype,
last_segment_sep=last_segment_sep))

def add_raw(self, *rollup_segments: str):
"""Adds the given fqn as lineage.
`rollup_segments` should be an iterable of strings whose concatenation
is a valid Dataplex FQN. In particular, this means they will often have
trailing delimiters.
"""
self.metric.add(rollup_segments)

@staticmethod
def query(results: MetricResults, label: str) -> Set[str]:
Expand Down
11 changes: 8 additions & 3 deletions sdks/python/apache_beam/metrics/metric_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -271,14 +271,19 @@ def test_fq_name(self):

def test_add(self):
lineage = Lineage(Lineage.SOURCE)
stringset = set()
added = set()
# override
lineage.metric = stringset
lineage.metric = added
lineage.add("s", "1", "2")
lineage.add("s:3.4")
lineage.add("s", "5", "6.7")
lineage.add("s", "1", "2", subtype="t")
self.assertSetEqual(stringset, {"s:1.2", "s:3.4", "s:t:1.2", "s:5.`6.7`"})
lineage.add("sys", "seg1", "seg2", "seg3/part2/part3", last_segment_sep='/')
self.assertSetEqual(
added,
{('s:', '1.', '2'), ('s:3.4:', ), ('s:', '5.', '6.7'),
('s:', 't:', '1.', '2'),
('sys:', 'seg1.', 'seg2.', 'seg3/', 'part2/', 'part3')})


if __name__ == '__main__':
Expand Down

0 comments on commit d15b55a

Please sign in to comment.