forked from apache/beam
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[yaml] Add examples for Spanner IO in YAML (apache#32288)
* Add example for spanner read * Add example for spanner write * move spanner examples * minor update * minor changes 1. Add good element in spanner write example to pass checks. 2. Remove spanner examples from examples_test.py for the time being. * add license
- Loading branch information
Showing
2 changed files
with
133 additions
and
0 deletions.
There are no files selected for viewing
80 changes: 80 additions & 0 deletions
80
sdks/python/apache_beam/yaml/examples/io/spanner_read.yaml
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,80 @@ | ||
# coding=utf-8 | ||
# | ||
# Licensed to the Apache Software Foundation (ASF) under one or more | ||
# contributor license agreements. See the NOTICE file distributed with | ||
# this work for additional information regarding copyright ownership. | ||
# The ASF licenses this file to You under the Apache License, Version 2.0 | ||
# (the "License"); you may not use this file except in compliance with | ||
# the License. You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, software | ||
# distributed under the License is distributed on an "AS IS" BASIS, | ||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
# See the License for the specific language governing permissions and | ||
# limitations under the License. | ||
|
||
pipeline: | ||
transforms: | ||
|
||
# Reading data from a Spanner database. The table used here has the following columns: | ||
# shipment_id (String), customer_id (String), shipment_date (String), shipment_cost (Float64), customer_name (String), customer_email (String) | ||
# ReadFromSpanner transform is called using project_id, instance_id, database_id and a query | ||
# A table with a list of columns can also be specified instead of a query | ||
- type: ReadFromSpanner | ||
name: ReadShipments | ||
config: | ||
project_id: 'apache-beam-testing' | ||
instance_id: 'shipment-test' | ||
database_id: 'shipment' | ||
query: 'SELECT * FROM shipments' | ||
|
||
# Filtering the data based on a specific condition | ||
# Here, the condition is used to keep only the rows where the customer_id is 'C1' | ||
- type: Filter | ||
name: FilterShipments | ||
input: ReadShipments | ||
config: | ||
language: python | ||
keep: "customer_id == 'C1'" | ||
|
||
# Mapping the data fields and applying transformations | ||
# A new field 'shipment_cost_category' is added with a custom transformation | ||
# A callable is defined to categorize shipment cost | ||
- type: MapToFields | ||
name: MapFieldsForSpanner | ||
input: FilterShipments | ||
config: | ||
language: python | ||
fields: | ||
shipment_id: shipment_id | ||
customer_id: customer_id | ||
shipment_date: shipment_date | ||
shipment_cost: shipment_cost | ||
customer_name: customer_name | ||
customer_email: customer_email | ||
shipment_cost_category: | ||
callable: | | ||
def categorize_cost(row): | ||
cost = float(row[3]) | ||
if cost < 50: | ||
return 'Low Cost' | ||
elif cost < 200: | ||
return 'Medium Cost' | ||
else: | ||
return 'High Cost' | ||
# Writing the transformed data to a CSV file | ||
- type: WriteToCsv | ||
name: WriteBig | ||
input: MapFieldsForSpanner | ||
config: | ||
path: shipments.csv | ||
|
||
|
||
# On executing the above pipeline, a new CSV file is created with the following records | ||
|
||
# Expected: | ||
# Row(shipment_id='S1', customer_id='C1', shipment_date='2023-05-01', shipment_cost=150.0, customer_name='Alice', customer_email='[email protected]', shipment_cost_category='Medium Cost') | ||
# Row(shipment_id='S3', customer_id='C1', shipment_date='2023-05-10', shipment_cost=20.0, customer_name='Alice', customer_email='[email protected]', shipment_cost_category='Low Cost') |
53 changes: 53 additions & 0 deletions
53
sdks/python/apache_beam/yaml/examples/io/spanner_write.yaml
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,53 @@ | ||
# coding=utf-8 | ||
# | ||
# Licensed to the Apache Software Foundation (ASF) under one or more | ||
# contributor license agreements. See the NOTICE file distributed with | ||
# this work for additional information regarding copyright ownership. | ||
# The ASF licenses this file to You under the Apache License, Version 2.0 | ||
# (the "License"); you may not use this file except in compliance with | ||
# the License. You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, software | ||
# distributed under the License is distributed on an "AS IS" BASIS, | ||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
# See the License for the specific language governing permissions and | ||
# limitations under the License. | ||
|
||
pipeline: | ||
transforms: | ||
|
||
# Step 1: Creating rows to be written to Spanner | ||
# The element names correspond to the column names in the Spanner table | ||
- type: Create | ||
name: CreateRows | ||
config: | ||
elements: | ||
- shipment_id: "S5" | ||
customer_id: "C5" | ||
shipment_date: "2023-05-09" | ||
shipment_cost: 300.0 | ||
customer_name: "Erin" | ||
customer_email: "[email protected]" | ||
|
||
# Step 2: Writing the created rows to a Spanner database | ||
# We require the project ID, instance ID, database ID and table ID to connect to Spanner | ||
# Error handling can be specified optionally to ensure any failed operations aren't lost | ||
# The failed data is passed on in the pipeline and can be handled | ||
- type: WriteToSpanner | ||
name: WriteSpanner | ||
input: CreateRows | ||
config: | ||
project_id: 'apache-beam-testing' | ||
instance_id: 'shipment-test' | ||
database_id: 'shipment' | ||
table_id: 'shipments' | ||
error_handling: | ||
output: my_error_output | ||
|
||
# Step 3: Writing the failed records to a JSON file | ||
- type: WriteToJson | ||
input: WriteSpanner.my_error_output | ||
config: | ||
path: errors.json |