From 0dd22b8228c81de7e903b9f050222ee2c8fe6165 Mon Sep 17 00:00:00 2001 From: Reeba Qureshi <64488642+reeba212@users.noreply.github.com> Date: Thu, 22 Aug 2024 23:28:31 +0530 Subject: [PATCH 01/14] Create use case for enriching spanner data with bigquery End to end use case that demonstrates how spanner IO and enrichment transform coupled with other YAML transforms can be used in the real world --- .../enrich_spanner_with_bigquery.yaml | 79 +++++++++++++++++++ 1 file changed, 79 insertions(+) create mode 100644 sdks/python/apache_beam/yaml/examples/enrich_spanner_with_bigquery.yaml diff --git a/sdks/python/apache_beam/yaml/examples/enrich_spanner_with_bigquery.yaml b/sdks/python/apache_beam/yaml/examples/enrich_spanner_with_bigquery.yaml new file mode 100644 index 000000000000..f4b897302a7b --- /dev/null +++ b/sdks/python/apache_beam/yaml/examples/enrich_spanner_with_bigquery.yaml @@ -0,0 +1,79 @@ +pipeline: + transforms: + # Step 1: Read orders details from Spanner + - type: ReadFromSpanner + name: ReadOrders + config: + project_id: 'apache-beam-testing' + instance_id: 'orders-test' + database_id: 'order-database' + query: 'SELECT customer_id, product_id, order_date, order_amount FROM orders' + + # Step 2: Enrich order details with customers details from BigQuery + - type: Enrichment + name: Enriched + input: ReadOrders + config: + enrichment_handler: 'BigQuery' + handler_config: + project: "apache-beam-testing" + table_name: "apache-beam-testing.ALL_TEST.customers" + row_restriction_template: "customer_id = 1001 or customer_id = 1003" + fields: ["customer_id"] + + # Step 3: Map enriched values to fields + - type: MapToFields + name: MapEnrichedValues + input: Enriched + config: + language: python + fields: + customer_id: + callable: 'lambda x: x.customer_id' + output_type: integer + customer_name: + callable: 'lambda x: x.customer_name' + output_type: string + customer_email: + callable: 'lambda x: x.customer_email' + output_type: string + product_id: + callable: 'lambda x: x.product_id' + output_type: integer + order_date: + callable: 'lambda x: x.order_date' + output_type: string + order_amount: + callable: 'lambda x: x.order_amount' + output_type: integer + + # Step 4: Filter orders with amount greater than 100 + - type: Filter + name: FilterHighValueOrders + input: MapEnrichedValues + config: + keep: "order_amount > 100" + language: "python" + + + # Step 6: Write processed order to another spanner table + - type: WriteToSpanner + name: WriteProcessedOrders + input: FilterHighValueOrders + config: + project_id: 'apache-beam-testing' + instance_id: 'orders-test' + database_id: 'order-database' + table_id: 'orders_with_customers' + error_handling: + output: my_error_output + + # Step 7: Handle write errors by writing to JSON + - type: WriteToJson + name: WriteErrorsToJson + input: WriteProcessedOrders.my_error_output + config: + path: 'errors.json' +options: + project_id: "apache-beam-testing" + From c0ea43abc11b81e0e206a58dc1c6d9cbb25789d7 Mon Sep 17 00:00:00 2001 From: Reeba Qureshi <64488642+reeba212@users.noreply.github.com> Date: Thu, 22 Aug 2024 23:45:02 +0530 Subject: [PATCH 02/14] Create example for bigtable enrichment --- .../yaml/examples/bigtable_enrichment.yaml | 35 +++++++++++++++++++ 1 file changed, 35 insertions(+) create mode 100644 sdks/python/apache_beam/yaml/examples/bigtable_enrichment.yaml diff --git a/sdks/python/apache_beam/yaml/examples/bigtable_enrichment.yaml b/sdks/python/apache_beam/yaml/examples/bigtable_enrichment.yaml new file mode 100644 index 000000000000..59ddbeb77b1e --- /dev/null +++ b/sdks/python/apache_beam/yaml/examples/bigtable_enrichment.yaml @@ -0,0 +1,35 @@ +pipeline: + type: chain + transforms: + + # Step 1: Creating a collection of elements that needs + # to be enriched. Here we are simulating sales data + - type: Create + config: + elements: + - sale_id: 1 + customer_id: 1 + product_id: 1 + quantity: 1 + + # Step 2: Enriching the data with Bigtable + # This specific bigtable stores product data in the below format + # product:product_id, product:product_name, product:product_stock + - type: Enrichment + config: + enrichment_handler: 'BigTable' + handler_config: + project_id: 'apache-beam-testing' + instance_id: 'beam-test' + table_id: 'bigtable-enrichment-test' + row_key: 'product_id' + timeout: 30 + + # Step 3: Logging for testing + # This is a simple way to view the enriched data + # We can also store it somewhere like a json file + - type: LogForTesting + + # The logs will show the below output + # {"sale_id": 1, "customer_id": 1, "product_id": 1, "quantity": 1, "product": {"product_id": "1", "product_name": "pixel 5", "product_stock": "2"}} + # We can see that the original collection we created has been enriched from the product data from bigtable From 78da9025880c821f3e8f44644434ceee81c7f4e2 Mon Sep 17 00:00:00 2001 From: Reeba Qureshi <64488642+reeba212@users.noreply.github.com> Date: Fri, 23 Aug 2024 00:15:12 +0530 Subject: [PATCH 03/14] Add project_id parameter to BigQueryWrapper --- sdks/python/apache_beam/io/gcp/bigquery_tools.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/io/gcp/bigquery_tools.py b/sdks/python/apache_beam/io/gcp/bigquery_tools.py index 48da929a07b2..6ee5fd0462e2 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_tools.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_tools.py @@ -352,8 +352,8 @@ class BigQueryWrapper(object): HISTOGRAM_METRIC_LOGGER = MetricLogger() - def __init__(self, client=None, temp_dataset_id=None, temp_table_ref=None): - self.client = client or BigQueryWrapper._bigquery_client(PipelineOptions()) + def __init__(self, client=None, temp_dataset_id=None, temp_table_ref=None, project_id = "apache-beam-testing"): + self.client = client or BigQueryWrapper._bigquery_client(PipelineOptions(project = project_id)) self.gcp_bq_client = client or gcp_bigquery.Client( client_info=ClientInfo( user_agent="apache-beam-%s" % apache_beam.__version__)) From ae0d9729918c94c1e936dc2e3f437d30adab6bf9 Mon Sep 17 00:00:00 2001 From: Reeba Qureshi Date: Sat, 24 Aug 2024 01:48:53 +0530 Subject: [PATCH 04/14] minor changes --- sdks/python/apache_beam/io/gcp/bigquery_tools.py | 2 +- .../apache_beam/yaml/examples/bigtable_enrichment.yaml | 5 ++--- .../yaml/examples/enrich_spanner_with_bigquery.yaml | 5 ++--- sdks/python/apache_beam/yaml/integration_tests.py | 2 +- 4 files changed, 6 insertions(+), 8 deletions(-) diff --git a/sdks/python/apache_beam/io/gcp/bigquery_tools.py b/sdks/python/apache_beam/io/gcp/bigquery_tools.py index 6ee5fd0462e2..4c9fc47d23c6 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_tools.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_tools.py @@ -352,7 +352,7 @@ class BigQueryWrapper(object): HISTOGRAM_METRIC_LOGGER = MetricLogger() - def __init__(self, client=None, temp_dataset_id=None, temp_table_ref=None, project_id = "apache-beam-testing"): + def __init__(self, client=None, temp_dataset_id=None, temp_table_ref=None, project_id = None): self.client = client or BigQueryWrapper._bigquery_client(PipelineOptions(project = project_id)) self.gcp_bq_client = client or gcp_bigquery.Client( client_info=ClientInfo( diff --git a/sdks/python/apache_beam/yaml/examples/bigtable_enrichment.yaml b/sdks/python/apache_beam/yaml/examples/bigtable_enrichment.yaml index 59ddbeb77b1e..8a55ea9a6e8e 100644 --- a/sdks/python/apache_beam/yaml/examples/bigtable_enrichment.yaml +++ b/sdks/python/apache_beam/yaml/examples/bigtable_enrichment.yaml @@ -30,6 +30,5 @@ pipeline: # We can also store it somewhere like a json file - type: LogForTesting - # The logs will show the below output - # {"sale_id": 1, "customer_id": 1, "product_id": 1, "quantity": 1, "product": {"product_id": "1", "product_name": "pixel 5", "product_stock": "2"}} - # We can see that the original collection we created has been enriched from the product data from bigtable + # Expected: + # Row(sale_id = 1, customer_id = 1, product_id = 1, quantity = 1, product = Row(product_id: "1", product_name = "pixel 5", product_stock = "2")) diff --git a/sdks/python/apache_beam/yaml/examples/enrich_spanner_with_bigquery.yaml b/sdks/python/apache_beam/yaml/examples/enrich_spanner_with_bigquery.yaml index f4b897302a7b..6804bc9729f7 100644 --- a/sdks/python/apache_beam/yaml/examples/enrich_spanner_with_bigquery.yaml +++ b/sdks/python/apache_beam/yaml/examples/enrich_spanner_with_bigquery.yaml @@ -21,7 +21,8 @@ pipeline: row_restriction_template: "customer_id = 1001 or customer_id = 1003" fields: ["customer_id"] - # Step 3: Map enriched values to fields + # Step 3: Map enriched values to Beam schema + # TODO: This should be removed when schema'd enrichment is available - type: MapToFields name: MapEnrichedValues input: Enriched @@ -74,6 +75,4 @@ pipeline: input: WriteProcessedOrders.my_error_output config: path: 'errors.json' -options: - project_id: "apache-beam-testing" diff --git a/sdks/python/apache_beam/yaml/integration_tests.py b/sdks/python/apache_beam/yaml/integration_tests.py index 72b3918195da..08533e2ccfdc 100644 --- a/sdks/python/apache_beam/yaml/integration_tests.py +++ b/sdks/python/apache_beam/yaml/integration_tests.py @@ -65,7 +65,7 @@ def temp_spanner_table(project, prefix='temp_spanner_db_'): @contextlib.contextmanager def temp_bigquery_table(project, prefix='yaml_bq_it_'): - bigquery_client = BigQueryWrapper() + bigquery_client = BigQueryWrapper(project_id = "apache-beam-testing") dataset_id = '%s_%s' % (prefix, uuid.uuid4().hex) bigquery_client.get_or_create_dataset(project, dataset_id) logging.info("Created dataset %s in project %s", dataset_id, project) From 85eda70969fb4a923bc3b8ae9cfe91f5e9d578f5 Mon Sep 17 00:00:00 2001 From: Reeba Qureshi Date: Wed, 28 Aug 2024 22:42:10 +0530 Subject: [PATCH 05/14] remove project id being passed into bigquery wrapper --- sdks/python/apache_beam/io/gcp/bigquery_tools.py | 2 +- sdks/python/apache_beam/yaml/integration_tests.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/io/gcp/bigquery_tools.py b/sdks/python/apache_beam/io/gcp/bigquery_tools.py index 4c9fc47d23c6..42087d76b3f8 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_tools.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_tools.py @@ -353,7 +353,7 @@ class BigQueryWrapper(object): HISTOGRAM_METRIC_LOGGER = MetricLogger() def __init__(self, client=None, temp_dataset_id=None, temp_table_ref=None, project_id = None): - self.client = client or BigQueryWrapper._bigquery_client(PipelineOptions(project = project_id)) + self.client = client or BigQueryWrapper._bigquery_client(PipelineOptions()) self.gcp_bq_client = client or gcp_bigquery.Client( client_info=ClientInfo( user_agent="apache-beam-%s" % apache_beam.__version__)) diff --git a/sdks/python/apache_beam/yaml/integration_tests.py b/sdks/python/apache_beam/yaml/integration_tests.py index 08533e2ccfdc..72b3918195da 100644 --- a/sdks/python/apache_beam/yaml/integration_tests.py +++ b/sdks/python/apache_beam/yaml/integration_tests.py @@ -65,7 +65,7 @@ def temp_spanner_table(project, prefix='temp_spanner_db_'): @contextlib.contextmanager def temp_bigquery_table(project, prefix='yaml_bq_it_'): - bigquery_client = BigQueryWrapper(project_id = "apache-beam-testing") + bigquery_client = BigQueryWrapper() dataset_id = '%s_%s' % (prefix, uuid.uuid4().hex) bigquery_client.get_or_create_dataset(project, dataset_id) logging.info("Created dataset %s in project %s", dataset_id, project) From 29507beb2382e5a36eaeb4bccda4796aa168abb3 Mon Sep 17 00:00:00 2001 From: Reeba Qureshi Date: Fri, 18 Oct 2024 01:24:27 +0530 Subject: [PATCH 06/14] add license --- .../yaml/examples/bigtable_enrichment.yaml | 18 ++++++++++++++++++ .../examples/enrich_spanner_with_bigquery.yaml | 18 ++++++++++++++++++ 2 files changed, 36 insertions(+) diff --git a/sdks/python/apache_beam/yaml/examples/bigtable_enrichment.yaml b/sdks/python/apache_beam/yaml/examples/bigtable_enrichment.yaml index 8a55ea9a6e8e..daaafe0d4031 100644 --- a/sdks/python/apache_beam/yaml/examples/bigtable_enrichment.yaml +++ b/sdks/python/apache_beam/yaml/examples/bigtable_enrichment.yaml @@ -1,3 +1,21 @@ +# coding=utf-8 +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + pipeline: type: chain transforms: diff --git a/sdks/python/apache_beam/yaml/examples/enrich_spanner_with_bigquery.yaml b/sdks/python/apache_beam/yaml/examples/enrich_spanner_with_bigquery.yaml index 6804bc9729f7..96ddd65afa06 100644 --- a/sdks/python/apache_beam/yaml/examples/enrich_spanner_with_bigquery.yaml +++ b/sdks/python/apache_beam/yaml/examples/enrich_spanner_with_bigquery.yaml @@ -1,3 +1,21 @@ +# coding=utf-8 +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + pipeline: transforms: # Step 1: Read orders details from Spanner From b4aa22feb32755a2ec7cb45102500ed78ab1a2ca Mon Sep 17 00:00:00 2001 From: Reeba Qureshi Date: Wed, 6 Nov 2024 14:14:26 +0530 Subject: [PATCH 07/14] add expected blocks --- .../apache_beam/yaml/examples/bigtable_enrichment.yaml | 7 +++++-- .../yaml/examples/enrich_spanner_with_bigquery.yaml | 3 +++ .../apache_beam/yaml/examples/testing/examples_test.py | 6 ++++-- 3 files changed, 12 insertions(+), 4 deletions(-) diff --git a/sdks/python/apache_beam/yaml/examples/bigtable_enrichment.yaml b/sdks/python/apache_beam/yaml/examples/bigtable_enrichment.yaml index daaafe0d4031..788b69de7857 100644 --- a/sdks/python/apache_beam/yaml/examples/bigtable_enrichment.yaml +++ b/sdks/python/apache_beam/yaml/examples/bigtable_enrichment.yaml @@ -48,5 +48,8 @@ pipeline: # We can also store it somewhere like a json file - type: LogForTesting - # Expected: - # Row(sale_id = 1, customer_id = 1, product_id = 1, quantity = 1, product = Row(product_id: "1", product_name = "pixel 5", product_stock = "2")) +options: + yaml_experimental_features: Enrichment + +# Expected: +# Row(sale_id=1, customer_id=1, product_id=1, quantity=1, product={'product_id': '1', 'product_name': 'pixel 5', 'product_stock': '2'}) \ No newline at end of file diff --git a/sdks/python/apache_beam/yaml/examples/enrich_spanner_with_bigquery.yaml b/sdks/python/apache_beam/yaml/examples/enrich_spanner_with_bigquery.yaml index 96ddd65afa06..56d1408f0818 100644 --- a/sdks/python/apache_beam/yaml/examples/enrich_spanner_with_bigquery.yaml +++ b/sdks/python/apache_beam/yaml/examples/enrich_spanner_with_bigquery.yaml @@ -94,3 +94,6 @@ pipeline: config: path: 'errors.json' +# Expected: +# Row(customer_id=1001, customer_name='Alice', customer_email='alice@gmail.com', product_id=2001, order_date='24-03-24', order_amount=150) +# Row(customer_id=1003, customer_name='Claire', customer_email='claire@gmail.com', product_id=2003, order_date='7-05-24', order_amount=110) diff --git a/sdks/python/apache_beam/yaml/examples/testing/examples_test.py b/sdks/python/apache_beam/yaml/examples/testing/examples_test.py index 3b497ed1efab..c75d21f79eb0 100644 --- a/sdks/python/apache_beam/yaml/examples/testing/examples_test.py +++ b/sdks/python/apache_beam/yaml/examples/testing/examples_test.py @@ -213,9 +213,11 @@ def _wordcount_test_preprocessor( 'test_simple_filter_yaml', 'test_simple_filter_and_combine_yaml', 'test_spanner_read_yaml', - 'test_spanner_write_yaml' + 'test_spanner_write_yaml', + 'test_bigtable_enrichment_yaml', + 'test_enrich_spanner_with_bigquery_yaml' ]) -def _io_write_test_preprocessor( +def _file_io_write_test_preprocessor( test_spec: dict, expected: List[str], env: TestEnvironment): if pipeline := test_spec.get('pipeline', None): From a138d42a6a2914f870914add7519e52d9e08b18e Mon Sep 17 00:00:00 2001 From: Reeba Qureshi Date: Wed, 4 Dec 2024 15:51:18 +0530 Subject: [PATCH 08/14] Update examples_test.py --- .../apache_beam/yaml/examples/testing/examples_test.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/sdks/python/apache_beam/yaml/examples/testing/examples_test.py b/sdks/python/apache_beam/yaml/examples/testing/examples_test.py index c75d21f79eb0..3b497ed1efab 100644 --- a/sdks/python/apache_beam/yaml/examples/testing/examples_test.py +++ b/sdks/python/apache_beam/yaml/examples/testing/examples_test.py @@ -213,11 +213,9 @@ def _wordcount_test_preprocessor( 'test_simple_filter_yaml', 'test_simple_filter_and_combine_yaml', 'test_spanner_read_yaml', - 'test_spanner_write_yaml', - 'test_bigtable_enrichment_yaml', - 'test_enrich_spanner_with_bigquery_yaml' + 'test_spanner_write_yaml' ]) -def _file_io_write_test_preprocessor( +def _io_write_test_preprocessor( test_spec: dict, expected: List[str], env: TestEnvironment): if pipeline := test_spec.get('pipeline', None): From 7ae42843ff6ecd921124ac8a701219c5f17c43ef Mon Sep 17 00:00:00 2001 From: Reeba Qureshi Date: Thu, 5 Dec 2024 23:06:43 +0530 Subject: [PATCH 09/14] Update examples_test.py --- .../python/apache_beam/yaml/examples/testing/examples_test.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/yaml/examples/testing/examples_test.py b/sdks/python/apache_beam/yaml/examples/testing/examples_test.py index 3b497ed1efab..6ceae918b168 100644 --- a/sdks/python/apache_beam/yaml/examples/testing/examples_test.py +++ b/sdks/python/apache_beam/yaml/examples/testing/examples_test.py @@ -213,7 +213,9 @@ def _wordcount_test_preprocessor( 'test_simple_filter_yaml', 'test_simple_filter_and_combine_yaml', 'test_spanner_read_yaml', - 'test_spanner_write_yaml' + 'test_spanner_write_yaml', + 'test_bigtable_enrichment_yaml', + 'test_enrich_spanner_with_bigquery_yaml' ]) def _io_write_test_preprocessor( test_spec: dict, expected: List[str], env: TestEnvironment): From fad1f7fe8f27d2780a31f3ebc1d39041c4960f34 Mon Sep 17 00:00:00 2001 From: Reeba Qureshi Date: Thu, 5 Dec 2024 23:56:42 +0530 Subject: [PATCH 10/14] fix formatting --- sdks/python/apache_beam/io/gcp/bigquery_tools.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/io/gcp/bigquery_tools.py b/sdks/python/apache_beam/io/gcp/bigquery_tools.py index 42087d76b3f8..c3d59aa08670 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_tools.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_tools.py @@ -352,7 +352,12 @@ class BigQueryWrapper(object): HISTOGRAM_METRIC_LOGGER = MetricLogger() - def __init__(self, client=None, temp_dataset_id=None, temp_table_ref=None, project_id = None): + def __init__( + self, + client=None, + temp_dataset_id=None, + temp_table_ref=None, + project_id=None): self.client = client or BigQueryWrapper._bigquery_client(PipelineOptions()) self.gcp_bq_client = client or gcp_bigquery.Client( client_info=ClientInfo( From 0f323235de77e1bcfdb93a90229b80e118fa0a1e Mon Sep 17 00:00:00 2001 From: Jeffrey Kinard Date: Fri, 6 Dec 2024 17:38:36 -0500 Subject: [PATCH 11/14] fix examples_test Signed-off-by: Jeffrey Kinard --- .../yaml/examples/testing/examples_test.py | 166 +++++++++++++++++- .../ml}/bigtable_enrichment.yaml | 0 .../ml}/enrich_spanner_with_bigquery.yaml | 17 +- 3 files changed, 169 insertions(+), 14 deletions(-) rename sdks/python/apache_beam/yaml/examples/{ => transforms/ml}/bigtable_enrichment.yaml (100%) rename sdks/python/apache_beam/yaml/examples/{ => transforms/ml}/enrich_spanner_with_bigquery.yaml (89%) diff --git a/sdks/python/apache_beam/yaml/examples/testing/examples_test.py b/sdks/python/apache_beam/yaml/examples/testing/examples_test.py index 6ceae918b168..ce1948d51a2e 100644 --- a/sdks/python/apache_beam/yaml/examples/testing/examples_test.py +++ b/sdks/python/apache_beam/yaml/examples/testing/examples_test.py @@ -21,9 +21,11 @@ import os import random import unittest +from typing import Any from typing import Callable from typing import Dict from typing import List +from typing import Optional from typing import Union from unittest import mock @@ -34,11 +36,63 @@ from apache_beam.examples.snippets.util import assert_matches_stdout from apache_beam.options.pipeline_options import PipelineOptions from apache_beam.testing.test_pipeline import TestPipeline +from apache_beam.yaml import yaml_provider from apache_beam.yaml import yaml_transform from apache_beam.yaml.readme_test import TestEnvironment from apache_beam.yaml.readme_test import replace_recursive +# Used to simulate Enrichment transform during tests +# The GitHub action that invokes these tests does not +# have gcp dependencies installed which is a prerequisite +# to apache_beam.transforms.enrichment.Enrichment as a top-level +# import. +@beam.ptransform.ptransform_fn +def test_enrichment( + pcoll, + enrichment_handler: str, + handler_config: Dict[str, Any], + timeout: Optional[float] = 30): + if enrichment_handler == 'BigTable': + row_key = handler_config['row_key'] + bt_data = INPUT_TABLES[( + 'BigTable', handler_config['instance_id'], handler_config['table_id'])] + products = {str(data[row_key]): data for data in bt_data} + + def _fn(row): + left = row._asdict() + right = products[str(row[row_key])] + left['product'] = left.get('product', None) or right + return beam.Row(**row) + elif enrichment_handler == 'BigQuery': + row_key = handler_config['fields'] + dataset, table = handler_config['table_name'].split('.')[-2:] + bq_data = INPUT_TABLES[('BigQuery', str(dataset), str(table))] + products = { + tuple(str(data[key]) for key in row_key): data + for data in bq_data + } + + def _fn(row): + left = row._asdict() + right = products[tuple(str(left[k]) for k in row_key)] + row = { + key: left.get(key, None) or right[key] + for key in {*left.keys(), *right.keys()} + } + return beam.Row(**row) + + else: + raise ValueError(f'{enrichment_handler} is not a valid enrichment_handler.') + + return pcoll | beam.Map(_fn) + + +TEST_PROVIDERS = { + 'TestEnrichment': test_enrichment, +} + + def check_output(expected: List[str]): def _check_inner(actual: List[PCollection[str]]): formatted_actual = actual | beam.Flatten() | beam.Map( @@ -59,7 +113,31 @@ def products_csv(): ]) -def spanner_data(): +def spanner_orders_data(): + return [{ + 'order_id': 1, + 'customer_id': 1001, + 'product_id': 2001, + 'order_date': '24-03-24', + 'order_amount': 150, + }, + { + 'order_id': 2, + 'customer_id': 1002, + 'product_id': 2002, + 'order_date': '19-04-24', + 'order_amount': 90, + }, + { + 'order_id': 3, + 'customer_id': 1003, + 'product_id': 2003, + 'order_date': '7-05-24', + 'order_amount': 110, + }] + + +def spanner_shipments_data(): return [{ 'shipment_id': 'S1', 'customer_id': 'C1', @@ -110,6 +188,44 @@ def spanner_data(): }] +def bigtable_data(): + return [{ + 'product_id': '1', 'product_name': 'pixel 5', 'product_stock': '2' + }, { + 'product_id': '2', 'product_name': 'pixel 6', 'product_stock': '4' + }, { + 'product_id': '3', 'product_name': 'pixel 7', 'product_stock': '20' + }, { + 'product_id': '4', 'product_name': 'pixel 8', 'product_stock': '10' + }, { + 'product_id': '5', 'product_name': 'pixel 11', 'product_stock': '3' + }, { + 'product_id': '6', 'product_name': 'pixel 12', 'product_stock': '7' + }, { + 'product_id': '7', 'product_name': 'pixel 13', 'product_stock': '8' + }, { + 'product_id': '8', 'product_name': 'pixel 14', 'product_stock': '3' + }] + + +def bigquery_data(): + return [{ + 'customer_id': 1001, + 'customer_name': 'Alice', + 'customer_email': 'alice@gmail.com' + }, + { + 'customer_id': 1002, + 'customer_name': 'Bob', + 'customer_email': 'bob@gmail.com' + }, + { + 'customer_id': 1003, + 'customer_name': 'Claire', + 'customer_email': 'claire@gmail.com' + }] + + def create_test_method( pipeline_spec_file: str, custom_preprocessors: List[Callable[..., Union[Dict, List]]]): @@ -135,7 +251,11 @@ def test_yaml_example(self): pickle_library='cloudpickle', **yaml_transform.SafeLineLoader.strip_metadata(pipeline_spec.get( 'options', {})))) as p: - actual = [yaml_transform.expand_pipeline(p, pipeline_spec)] + actual = [ + yaml_transform.expand_pipeline( + p, + pipeline_spec, [yaml_provider.InlineProvider(TEST_PROVIDERS)]) + ] if not actual[0]: actual = list(p.transforms_stack[0].parts[-1].outputs.values()) for transform in p.transforms_stack[0].parts[:-1]: @@ -214,7 +334,6 @@ def _wordcount_test_preprocessor( 'test_simple_filter_and_combine_yaml', 'test_spanner_read_yaml', 'test_spanner_write_yaml', - 'test_bigtable_enrichment_yaml', 'test_enrich_spanner_with_bigquery_yaml' ]) def _io_write_test_preprocessor( @@ -251,7 +370,8 @@ def _file_io_read_test_preprocessor( return test_spec -@YamlExamplesTestSuite.register_test_preprocessor(['test_spanner_read_yaml']) +@YamlExamplesTestSuite.register_test_preprocessor( + ['test_spanner_read_yaml', 'test_enrich_spanner_with_bigquery_yaml']) def _spanner_io_read_test_preprocessor( test_spec: dict, expected: List[str], env: TestEnvironment): @@ -267,14 +387,42 @@ def _spanner_io_read_test_preprocessor( k: v for k, v in config.items() if k.startswith('__') } - transform['config']['elements'] = INPUT_TABLES[( - str(instance), str(database), str(table))] + elements = INPUT_TABLES[(str(instance), str(database), str(table))] + if config.get('query', None): + config['query'].replace('select ', + 'SELECT ').replace(' from ', ' FROM ') + columns = set( + ''.join(config['query'].split('SELECT ')[1:]).split( + ' FROM', maxsplit=1)[0]) + if columns != {'*'}: + elements = [{ + column: element[column] + for column in element if column in columns + } for element in elements] + transform['config']['elements'] = elements + + return test_spec + + +@YamlExamplesTestSuite.register_test_preprocessor( + ['test_bigtable_enrichment_yaml', 'test_enrich_spanner_with_bigquery_yaml']) +def _enrichment_test_preprocessor( + test_spec: dict, expected: List[str], env: TestEnvironment): + if pipeline := test_spec.get('pipeline', None): + for transform in pipeline.get('transforms', []): + if transform.get('type', '').startswith('Enrichment'): + transform['type'] = 'TestEnrichment' return test_spec INPUT_FILES = {'products.csv': products_csv()} -INPUT_TABLES = {('shipment-test', 'shipment', 'shipments'): spanner_data()} +INPUT_TABLES = { + ('shipment-test', 'shipment', 'shipments'): spanner_shipments_data(), + ('orders-test', 'order-database', 'orders'): spanner_orders_data(), + ('BigTable', 'beam-test', 'bigtable-enrichment-test'): bigtable_data(), + ('BigQuery', 'ALL_TEST', 'customers'): bigquery_data() +} YAML_DOCS_DIR = os.path.join(os.path.dirname(__file__)) ExamplesTest = YamlExamplesTestSuite( @@ -292,6 +440,10 @@ def _spanner_io_read_test_preprocessor( 'IOExamplesTest', os.path.join(YAML_DOCS_DIR, '../transforms/io/*.yaml')).run() +MLTest = YamlExamplesTestSuite( + 'MLExamplesTest', os.path.join(YAML_DOCS_DIR, + '../transforms/ml/*.yaml')).run() + if __name__ == '__main__': logging.getLogger().setLevel(logging.INFO) unittest.main() diff --git a/sdks/python/apache_beam/yaml/examples/bigtable_enrichment.yaml b/sdks/python/apache_beam/yaml/examples/transforms/ml/bigtable_enrichment.yaml similarity index 100% rename from sdks/python/apache_beam/yaml/examples/bigtable_enrichment.yaml rename to sdks/python/apache_beam/yaml/examples/transforms/ml/bigtable_enrichment.yaml diff --git a/sdks/python/apache_beam/yaml/examples/enrich_spanner_with_bigquery.yaml b/sdks/python/apache_beam/yaml/examples/transforms/ml/enrich_spanner_with_bigquery.yaml similarity index 89% rename from sdks/python/apache_beam/yaml/examples/enrich_spanner_with_bigquery.yaml rename to sdks/python/apache_beam/yaml/examples/transforms/ml/enrich_spanner_with_bigquery.yaml index 56d1408f0818..e63b3105cc0c 100644 --- a/sdks/python/apache_beam/yaml/examples/enrich_spanner_with_bigquery.yaml +++ b/sdks/python/apache_beam/yaml/examples/transforms/ml/enrich_spanner_with_bigquery.yaml @@ -66,24 +66,25 @@ pipeline: callable: 'lambda x: x.order_amount' output_type: integer - # Step 4: Filter orders with amount greater than 100 + # Step 4: Filter orders with amount greater than 110 - type: Filter name: FilterHighValueOrders input: MapEnrichedValues config: - keep: "order_amount > 100" + keep: "order_amount > 110" language: "python" # Step 6: Write processed order to another spanner table + # Note: Make sure to replace $VARS with your values. - type: WriteToSpanner name: WriteProcessedOrders input: FilterHighValueOrders config: - project_id: 'apache-beam-testing' - instance_id: 'orders-test' - database_id: 'order-database' - table_id: 'orders_with_customers' + project_id: '$PROJECT' + instance_id: '$INSTANCE' + database_id: '$DATABASE' + table_id: '$TABLE' error_handling: output: my_error_output @@ -94,6 +95,8 @@ pipeline: config: path: 'errors.json' +options: + yaml_experimental_features: Enrichment + # Expected: # Row(customer_id=1001, customer_name='Alice', customer_email='alice@gmail.com', product_id=2001, order_date='24-03-24', order_amount=150) -# Row(customer_id=1003, customer_name='Claire', customer_email='claire@gmail.com', product_id=2003, order_date='7-05-24', order_amount=110) From bb511e9c089f11bf02e87e5c7a50fff0969c4296 Mon Sep 17 00:00:00 2001 From: Reeba Qureshi <64488642+reeba212@users.noreply.github.com> Date: Tue, 10 Dec 2024 13:11:36 +0530 Subject: [PATCH 12/14] Apply suggestions from code review Co-authored-by: Jeff Kinard --- .../apache_beam/yaml/examples/testing/examples_test.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/sdks/python/apache_beam/yaml/examples/testing/examples_test.py b/sdks/python/apache_beam/yaml/examples/testing/examples_test.py index ce1948d51a2e..109e98410852 100644 --- a/sdks/python/apache_beam/yaml/examples/testing/examples_test.py +++ b/sdks/python/apache_beam/yaml/examples/testing/examples_test.py @@ -61,21 +61,21 @@ def test_enrichment( def _fn(row): left = row._asdict() - right = products[str(row[row_key])] + right = products[str(left[row_key])] left['product'] = left.get('product', None) or right - return beam.Row(**row) + return beam.Row(**left) elif enrichment_handler == 'BigQuery': row_key = handler_config['fields'] dataset, table = handler_config['table_name'].split('.')[-2:] bq_data = INPUT_TABLES[('BigQuery', str(dataset), str(table))] - products = { + bq_data = { tuple(str(data[key]) for key in row_key): data for data in bq_data } def _fn(row): left = row._asdict() - right = products[tuple(str(left[k]) for k in row_key)] + right = bq_data[tuple(str(left[k]) for k in row_key)] row = { key: left.get(key, None) or right[key] for key in {*left.keys(), *right.keys()} @@ -393,7 +393,7 @@ def _spanner_io_read_test_preprocessor( 'SELECT ').replace(' from ', ' FROM ') columns = set( ''.join(config['query'].split('SELECT ')[1:]).split( - ' FROM', maxsplit=1)[0]) + ' FROM', maxsplit=1)[0].split(', ')) if columns != {'*'}: elements = [{ column: element[column] From 1a2e9daad23b69f4cf6e3bb7d51548b26634c36a Mon Sep 17 00:00:00 2001 From: Reeba Qureshi <64488642+reeba212@users.noreply.github.com> Date: Fri, 13 Dec 2024 23:36:55 +0530 Subject: [PATCH 13/14] Update bigquery_tools.py --- sdks/python/apache_beam/io/gcp/bigquery_tools.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/io/gcp/bigquery_tools.py b/sdks/python/apache_beam/io/gcp/bigquery_tools.py index c3d59aa08670..dfe47475f180 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_tools.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_tools.py @@ -356,8 +356,7 @@ def __init__( self, client=None, temp_dataset_id=None, - temp_table_ref=None, - project_id=None): + temp_table_ref=None): self.client = client or BigQueryWrapper._bigquery_client(PipelineOptions()) self.gcp_bq_client = client or gcp_bigquery.Client( client_info=ClientInfo( From cd136c90db85f541c936de5d1288e142f3c767e5 Mon Sep 17 00:00:00 2001 From: Reeba Qureshi <64488642+reeba212@users.noreply.github.com> Date: Fri, 13 Dec 2024 23:53:02 +0530 Subject: [PATCH 14/14] Update bigquery_tools.py --- sdks/python/apache_beam/io/gcp/bigquery_tools.py | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/sdks/python/apache_beam/io/gcp/bigquery_tools.py b/sdks/python/apache_beam/io/gcp/bigquery_tools.py index dfe47475f180..48da929a07b2 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_tools.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_tools.py @@ -352,11 +352,7 @@ class BigQueryWrapper(object): HISTOGRAM_METRIC_LOGGER = MetricLogger() - def __init__( - self, - client=None, - temp_dataset_id=None, - temp_table_ref=None): + def __init__(self, client=None, temp_dataset_id=None, temp_table_ref=None): self.client = client or BigQueryWrapper._bigquery_client(PipelineOptions()) self.gcp_bq_client = client or gcp_bigquery.Client( client_info=ClientInfo(