diff --git a/sdks/python/apache_beam/yaml/examples/io/spanner_read.yaml b/sdks/python/apache_beam/yaml/examples/io/spanner_read.yaml new file mode 100644 index 000000000000..c86d42c1e0c6 --- /dev/null +++ b/sdks/python/apache_beam/yaml/examples/io/spanner_read.yaml @@ -0,0 +1,80 @@ +# coding=utf-8 +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +pipeline: + transforms: + + # Reading data from a Spanner database. The table used here has the following columns: + # shipment_id (String), customer_id (String), shipment_date (String), shipment_cost (Float64), customer_name (String), customer_email (String) + # ReadFromSpanner transform is called using project_id, instance_id, database_id and a query + # A table with a list of columns can also be specified instead of a query + - type: ReadFromSpanner + name: ReadShipments + config: + project_id: 'apache-beam-testing' + instance_id: 'shipment-test' + database_id: 'shipment' + query: 'SELECT * FROM shipments' + + # Filtering the data based on a specific condition + # Here, the condition is used to keep only the rows where the customer_id is 'C1' + - type: Filter + name: FilterShipments + input: ReadShipments + config: + language: python + keep: "customer_id == 'C1'" + + # Mapping the data fields and applying transformations + # A new field 'shipment_cost_category' is added with a custom transformation + # A callable is defined to categorize shipment cost + - type: MapToFields + name: MapFieldsForSpanner + input: FilterShipments + config: + language: python + fields: + shipment_id: shipment_id + customer_id: customer_id + shipment_date: shipment_date + shipment_cost: shipment_cost + customer_name: customer_name + customer_email: customer_email + shipment_cost_category: + callable: | + def categorize_cost(row): + cost = float(row[3]) + if cost < 50: + return 'Low Cost' + elif cost < 200: + return 'Medium Cost' + else: + return 'High Cost' + + # Writing the transformed data to a CSV file + - type: WriteToCsv + name: WriteBig + input: MapFieldsForSpanner + config: + path: shipments.csv + + + # On executing the above pipeline, a new CSV file is created with the following records + +# Expected: +# Row(shipment_id='S1', customer_id='C1', shipment_date='2023-05-01', shipment_cost=150.0, customer_name='Alice', customer_email='alice@example.com', shipment_cost_category='Medium Cost') +# Row(shipment_id='S3', customer_id='C1', shipment_date='2023-05-10', shipment_cost=20.0, customer_name='Alice', customer_email='alice@example.com', shipment_cost_category='Low Cost') diff --git a/sdks/python/apache_beam/yaml/examples/io/spanner_write.yaml b/sdks/python/apache_beam/yaml/examples/io/spanner_write.yaml new file mode 100644 index 000000000000..74ac35de260f --- /dev/null +++ b/sdks/python/apache_beam/yaml/examples/io/spanner_write.yaml @@ -0,0 +1,53 @@ +# coding=utf-8 +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +pipeline: + transforms: + + # Step 1: Creating rows to be written to Spanner + # The element names correspond to the column names in the Spanner table + - type: Create + name: CreateRows + config: + elements: + - shipment_id: "S5" + customer_id: "C5" + shipment_date: "2023-05-09" + shipment_cost: 300.0 + customer_name: "Erin" + customer_email: "erin@example.com" + + # Step 2: Writing the created rows to a Spanner database + # We require the project ID, instance ID, database ID and table ID to connect to Spanner + # Error handling can be specified optionally to ensure any failed operations aren't lost + # The failed data is passed on in the pipeline and can be handled + - type: WriteToSpanner + name: WriteSpanner + input: CreateRows + config: + project_id: 'apache-beam-testing' + instance_id: 'shipment-test' + database_id: 'shipment' + table_id: 'shipments' + error_handling: + output: my_error_output + + # Step 3: Writing the failed records to a JSON file + - type: WriteToJson + input: WriteSpanner.my_error_output + config: + path: errors.json