[yaml] Add examples for Spanner IO in YAML (#32288)
* Add example for spanner read

* Add example for spanner write

* move spanner examples

* minor update

* minor changes

1. Add good element in spanner write example to pass checks.
2. Remove spanner examples from examples_test.py for the time being.

* add license
reeba212 authored Sep 16, 2024
1 parent f8475c9 commit 9f8a4b2
Showing 2 changed files with 133 additions and 0 deletions.
80 changes: 80 additions & 0 deletions sdks/python/apache_beam/yaml/examples/io/spanner_read.yaml
@@ -0,0 +1,80 @@
# coding=utf-8
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

pipeline:
  transforms:

    # Reading data from a Spanner database. The table used here has the following columns:
    # shipment_id (String), customer_id (String), shipment_date (String), shipment_cost (Float64), customer_name (String), customer_email (String)
    # The ReadFromSpanner transform is configured with a project_id, instance_id, database_id and a query.
    # A table and a list of columns can also be specified instead of a query.
    - type: ReadFromSpanner
      name: ReadShipments
      config:
        project_id: 'apache-beam-testing'
        instance_id: 'shipment-test'
        database_id: 'shipment'
        query: 'SELECT * FROM shipments'

    # Filtering the data based on a specific condition
    # Here, the condition keeps only the rows where customer_id is 'C1'
    - type: Filter
      name: FilterShipments
      input: ReadShipments
      config:
        language: python
        keep: "customer_id == 'C1'"

    # Mapping the data fields and applying transformations
    # A new field 'shipment_cost_category' is added with a custom transformation
    # A callable is defined to categorize the shipment cost
    - type: MapToFields
      name: MapFieldsForSpanner
      input: FilterShipments
      config:
        language: python
        fields:
          shipment_id: shipment_id
          customer_id: customer_id
          shipment_date: shipment_date
          shipment_cost: shipment_cost
          customer_name: customer_name
          customer_email: customer_email
          shipment_cost_category:
            callable: |
              def categorize_cost(row):
                # Bucket each shipment by its cost.
                cost = float(row.shipment_cost)
                if cost < 50:
                  return 'Low Cost'
                elif cost < 200:
                  return 'Medium Cost'
                else:
                  return 'High Cost'

    # Writing the transformed data to a CSV file
    - type: WriteToCsv
      name: WriteBig
      input: MapFieldsForSpanner
      config:
        path: shipments.csv


# On executing the above pipeline, a new CSV file is created with the following records

# Expected:
# Row(shipment_id='S1', customer_id='C1', shipment_date='2023-05-01', shipment_cost=150.0, customer_name='Alice', customer_email='[email protected]', shipment_cost_category='Medium Cost')
# Row(shipment_id='S3', customer_id='C1', shipment_date='2023-05-10', shipment_cost=20.0, customer_name='Alice', customer_email='[email protected]', shipment_cost_category='Low Cost')
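
For comparison, the same filter and categorization steps can be sketched with the Beam Python SDK. This is a minimal illustrative sketch, not part of the example file above: the in-memory sample row stands in for the actual Spanner read, and it assumes the rows carry the columns listed in the comments.

import apache_beam as beam


def categorize_cost(cost):
  # Same thresholds as the YAML callable above.
  if cost < 50:
    return 'Low Cost'
  elif cost < 200:
    return 'Medium Cost'
  else:
    return 'High Cost'


with beam.Pipeline() as p:
  # Hypothetical in-memory row standing in for the Spanner read.
  shipments = p | 'CreateSample' >> beam.Create([
      beam.Row(
          shipment_id='S1', customer_id='C1', shipment_date='2023-05-01',
          shipment_cost=150.0, customer_name='Alice',
          customer_email='[email protected]'),
  ])
  categorized = (
      shipments
      | 'FilterShipments' >> beam.Filter(lambda row: row.customer_id == 'C1')
      | 'MapFieldsForSpanner' >> beam.Map(
          lambda row: beam.Row(
              shipment_id=row.shipment_id,
              customer_id=row.customer_id,
              shipment_date=row.shipment_date,
              shipment_cost=row.shipment_cost,
              customer_name=row.customer_name,
              customer_email=row.customer_email,
              shipment_cost_category=categorize_cost(float(row.shipment_cost)))))
  categorized | 'Print' >> beam.Map(print)

Run on the DirectRunner, this prints a single categorized row matching the 'Medium Cost' record in the expected output above.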
53 changes: 53 additions & 0 deletions sdks/python/apache_beam/yaml/examples/io/spanner_write.yaml
@@ -0,0 +1,53 @@
# coding=utf-8
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

pipeline:
  transforms:

    # Step 1: Creating rows to be written to Spanner
    # The element names correspond to the column names in the Spanner table
    - type: Create
      name: CreateRows
      config:
        elements:
          - shipment_id: "S5"
            customer_id: "C5"
            shipment_date: "2023-05-09"
            shipment_cost: 300.0
            customer_name: "Erin"
            customer_email: "[email protected]"

    # Step 2: Writing the created rows to a Spanner database
    # The project ID, instance ID, database ID and table ID are required to connect to Spanner
    # Error handling can optionally be configured so that failed writes are not lost
    # The failed records are emitted on a separate output and can be handled downstream
    - type: WriteToSpanner
      name: WriteSpanner
      input: CreateRows
      config:
        project_id: 'apache-beam-testing'
        instance_id: 'shipment-test'
        database_id: 'shipment'
        table_id: 'shipments'
        error_handling:
          output: my_error_output

    # Step 3: Writing the failed records to a JSON file
    - type: WriteToJson
      input: WriteSpanner.my_error_output
      config:
        path: errors.json
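
To try these examples end to end, the YAML files can be handed to the Beam YAML entry point. The sketch below submits the write example from Python; the apache_beam.yaml.main module and the --yaml_pipeline_file flag follow the Beam YAML quickstart, and the runner plus Spanner credentials are assumed to be configured separately.

import subprocess

# Minimal sketch: launch the write example with the Beam YAML runner.
# Verify the flag name against your SDK version; runner options such as
# --runner and any Spanner credentials must be supplied separately.
subprocess.run(
    [
        'python', '-m', 'apache_beam.yaml.main',
        '--yaml_pipeline_file=spanner_write.yaml',
    ],
    check=True,  # raise CalledProcessError if the pipeline fails to submit
)

Swapping in spanner_read.yaml runs the read example; for the write example, any failed records land in errors.json via the error_handling output configured above.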
