From 00445adda914c210a938924df59d84d815060bd0 Mon Sep 17 00:00:00 2001 From: Yi Hu Date: Tue, 1 Oct 2024 16:09:43 -0400 Subject: [PATCH] Support string FQN as a way to add lineage information (#32613) * Support string FQN as a way to add lineage information * clarify the two use cases of lineage.add --- sdks/python/apache_beam/metrics/metric.py | 26 ++++++++++++++++++- .../python/apache_beam/metrics/metric_test.py | 11 ++++++++ 2 files changed, 36 insertions(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/metrics/metric.py b/sdks/python/apache_beam/metrics/metric.py index 065a497786cb..f402c0acab2f 100644 --- a/sdks/python/apache_beam/metrics/metric.py +++ b/sdks/python/apache_beam/metrics/metric.py @@ -370,7 +370,31 @@ def get_fq_name( def add( self, system: str, *segments: str, subtype: Optional[str] = None) -> None: - self.metric.add(self.get_fq_name(system, *segments, subtype=subtype)) + """ + Adds the given details as Lineage. + + For asset level lineage the resource location should be specified as + Dataplex FQN, see + https://cloud.google.com/data-catalog/docs/fully-qualified-names + + Example of adding FQN components: + + - `add("system", "segment1", "segment2")` + - `add("system", "segment1", "segment2", subtype="subtype")` + + Example of adding an FQN: + + - `add("system:segment1.segment2")` + - `add("system:subtype:segment1.segment2")` + + The first positional argument serves as the system when segments or a + subtype are provided, or as the full FQN when it is the only argument. 
+ """ + system_or_details = system + if len(segments) == 0 and subtype is None: + self.metric.add(system_or_details) + else: + self.metric.add(self.get_fq_name(system, *segments, subtype=subtype)) @staticmethod def query(results: MetricResults, label: str) -> Set[str]: diff --git a/sdks/python/apache_beam/metrics/metric_test.py b/sdks/python/apache_beam/metrics/metric_test.py index 1c6a11e200a5..524a2143172d 100644 --- a/sdks/python/apache_beam/metrics/metric_test.py +++ b/sdks/python/apache_beam/metrics/metric_test.py @@ -269,6 +269,17 @@ def test_fq_name(self): "apache:beam:" + v + '.' + v, Lineage.get_fq_name("apache", k, k, subtype="beam")) + def test_add(self): + lineage = Lineage(Lineage.SOURCE) + stringset = set() + # override + lineage.metric = stringset + lineage.add("s", "1", "2") + lineage.add("s:3.4") + lineage.add("s", "5", "6.7") + lineage.add("s", "1", "2", subtype="t") + self.assertSetEqual(stringset, {"s:1.2", "s:3.4", "s:t:1.2", "s:5.`6.7`"}) + if __name__ == '__main__': unittest.main()