This repo contains several examples of the Dataflow Python API. The examples are solutions to common use cases we see in the field.
- Ingesting data from a file into BigQuery
- Transforming data in Dataflow
- Joining file and BigQuery datasets in Dataflow
- Ingesting data from files into BigQuery, reading the file structure from Datastore
- Data lake to data mart
The solutions below become more complex as we incorporate more Dataflow features.
This example shows how to ingest a raw CSV file into BigQuery with minimal transformation. It is the simplest example and a great one to start with in order to become familiar with Dataflow.
There are three main steps:
Using the built-in TextIO connector allows Beam to have several workers read the file in parallel. This lets Beam scale well to large files and to a large number of input files.
Dataflow will read in each row of data from the file and distribute the data to the next stage of the data pipeline.
This is the stage of the code where you would typically put your business logic. In this example we are simply transforming the data from CSV format into a Python dictionary. The dictionary maps column names to the values we want to store in BigQuery.
Writing the data to BigQuery does not require custom code. Passing the table name and a few other optional arguments into BigQueryIO sets up the final stage of the pipeline.
This stage of the pipeline is typically referred to as the sink. The sink is the final destination of the data; no more processing occurs in the pipeline after this stage.
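The three steps above can be sketched as follows. This is a minimal illustration, not the repo's code: the column names, the schema string and the `run()` signature are assumptions. Beam (`apache-beam[gcp]`) is imported inside `run()` so the parsing helper can be tried without it installed.

```python
import csv

# Assumed column order of the input CSV; adjust to match your file.
COLUMNS = ["name", "surname", "age"]


def parse_line(line):
    """Transform step: turn one CSV line into a dict keyed by column name."""
    # csv.reader (rather than str.split) keeps quoted values containing commas intact.
    values = next(csv.reader([line]))
    return dict(zip(COLUMNS, values))


def run(input_pattern, output_table, argv=None):
    """Read -> transform -> write. output_table is e.g. 'project:dataset.table'."""
    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions

    with beam.Pipeline(options=PipelineOptions(argv)) as p:
        (
            p
            # Read: TextIO lets several workers read the input in parallel.
            | "Read" >> beam.io.ReadFromText(input_pattern, skip_header_lines=1)
            # Transform: CSV line -> dict of column name to value.
            | "ToDict" >> beam.Map(parse_line)
            # Write (the sink): no custom code, only the table name and options.
            | "Write" >> beam.io.WriteToBigQuery(
                output_table,
                schema="name:STRING,surname:STRING,age:INTEGER",
                create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
                write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
            )
        )
```

`skip_header_lines=1` assumes every input file starts with a header row; drop it if your files have none.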
Ready to dive deeper? Check out the complete code here.
This example builds upon simple ingestion, and demonstrates some basic data type transformations.
As in the previous example, there are three steps. The transformation step is made more useful by translating the date format in the source data into a date format that BigQuery accepts.
- Read in the file.
- Transform the CSV format into a dictionary format and translate the date format.
- Write the data to BigQuery.
Similar to the previous example, this example uses TextIO to read the file from Google Cloud Storage.
The transformation step builds upon the simple ingestion example: besides converting each CSV row into a dictionary, it translates date values into a format BigQuery accepts.
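The date translation itself is plain Python. In the sketch below, the source format `%m/%d/%Y` and the field name `order_date` are assumptions for illustration; BigQuery `DATE` columns expect `YYYY-MM-DD`. In a pipeline, `transform_row` would run in a `beam.Map` right after the CSV-to-dictionary step.

```python
from datetime import datetime

# Assumed format of dates in the source CSV, e.g. 03/15/2019.
SOURCE_DATE_FORMAT = "%m/%d/%Y"


def to_bigquery_date(value, source_format=SOURCE_DATE_FORMAT):
    """Convert a date string to the YYYY-MM-DD form accepted by BigQuery DATE columns."""
    return datetime.strptime(value, source_format).strftime("%Y-%m-%d")


def transform_row(row, date_fields=("order_date",)):
    """Return a copy of a row dict with its (hypothetical) date fields converted."""
    converted = dict(row)
    for field in date_fields:
        value = converted.get(field)
        if value:
            converted[field] = to_bigquery_date(value)
        elif field in converted:
            # An empty string is not a valid DATE; send NULL instead.
            converted[field] = None
    return converted
```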
Just as in the previous example, this example uses BigQueryIO to write out to BigQuery.
Ready to dive deeper? Check out the complete code here.
This example demonstrates how to work with two datasets. A primary dataset is read from a file, and another dataset containing reference data is read from BigQuery. The two datasets are then joined in Dataflow before the joined dataset is written to BigQuery.
This pipeline contains 4 steps:
- Read in the primary dataset from a file.
- Read in the reference data from BigQuery.
- Custom Python code is used to join the two datasets.
- The joined dataset is written out to BigQuery.
Similar to previous examples, we use TextIO to read the dataset from a CSV file.
Using BigQueryIO, we can specify a query to read data from. Dataflow then distributes the query results to the next stages of the pipeline.
In this example the additional dataset is represented as a side input. Side inputs in Dataflow are typically reference datasets that fit into memory. Other examples will explore alternative methods for joining datasets which work well for datasets that do not fit into memory.
Using custom Python code, we join the two datasets together. Because the records in both datasets are Python dictionaries, the join code is the same as merging any two Python dictionaries.
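A minimal sketch of the side-input join, assuming both datasets share a hypothetical join column `state`, a hypothetical primary CSV layout, and an output table that already exists. The reference rows are keyed and handed to `beam.Map` as a dictionary side input through `beam.pvalue.AsDict`, which is why the reference data must fit in memory. `ReadFromBigQuery` also needs a GCS temp location in the pipeline options.

```python
import csv

# Hypothetical layout of the primary CSV and the column shared with the reference data.
COLUMNS = ["order_id", "state", "quantity"]
JOIN_KEY = "state"


def parse_line(line):
    """Turn one CSV line from the primary file into a dict."""
    return dict(zip(COLUMNS, next(csv.reader([line]))))


def join_with_reference(row, reference_by_key):
    """Merge a primary row with its matching reference row, if one exists."""
    joined = dict(row)
    # On a column-name collision the reference value wins.
    joined.update(reference_by_key.get(row.get(JOIN_KEY), {}))
    return joined


def run(input_pattern, reference_query, output_table, argv=None):
    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions

    with beam.Pipeline(options=PipelineOptions(argv)) as p:
        reference = (
            p
            | "ReadReference" >> beam.io.ReadFromBigQuery(
                query=reference_query, use_standard_sql=True)
            # Key each reference row so the side input can be viewed as a dict.
            | "KeyReference" >> beam.Map(lambda r: (r[JOIN_KEY], r))
        )
        (
            p
            | "ReadPrimary" >> beam.io.ReadFromText(input_pattern, skip_header_lines=1)
            | "ToDict" >> beam.Map(parse_line)
            | "Join" >> beam.Map(
                join_with_reference,
                reference_by_key=beam.pvalue.AsDict(reference))
            | "Write" >> beam.io.WriteToBigQuery(
                output_table,
                create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER,
                write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
            )
        )
```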
Finally the joined dataset is written out to BigQuery. This uses the same BigQueryIO API which is used in previous examples.
Ready to dive deeper? Check out the complete code here.
In this example we create a Python Apache Beam pipeline running on Google Cloud Dataflow to import CSV files into BigQuery using the following architecture:
The architecture uses:
- Google Cloud Storage to store CSV source files
- Google Cloud Datastore to store the CSV file structure and field types
- Google Cloud Dataflow to read files from Google Cloud Storage, transform the data based on the structure of the file, and import the data into Google BigQuery
- Google BigQuery to store data in a Data Lake.
You can use this script as a starting point to import your files into Google BigQuery. You'll probably need to adapt the script logic to your file naming structure or to your specific needs.
- A GCP project with an enabled billing account
- gcloud installed and initialized for your project
- Google Cloud Datastore enabled
- Google Cloud Dataflow API enabled
- Google Cloud Storage Bucket containing the file to import (CSV format) using the following naming convention:
TABLENAME_*.csv
- Google Cloud Storage Bucket for temp and staging Google Dataflow files
- Google BigQuery dataset
- Python >= 2.7 and python-dev module
- gcc
- Google Cloud Application Default Credentials
Create a new virtual environment (recommended) and install requirements:
virtualenv env
source ./env/bin/activate
pip install -r requirements.txt
Create a file that contains the structure of the CSVs to be imported. The filename needs to follow the convention TABLENAME.csv.
Example:
name,STRING
surname,STRING
age,INTEGER
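To make the schema file format concrete, the sketch below parses a `TABLENAME.csv` schema file and renders it in the `name:TYPE,...` string form that Beam's `WriteToBigQuery` accepts for its `schema` argument. It is an illustration only and does not reproduce what `datastore_schema_import.py` actually stores in Datastore.

```python
import csv
import io
import os


def table_name_from_schema_path(path):
    """TABLENAME.csv -> TABLENAME, following the naming convention above."""
    return os.path.splitext(os.path.basename(path))[0]


def load_schema(schema_text):
    """Parse 'column,TYPE' lines into an ordered list of (column, TYPE) pairs."""
    pairs = []
    for row in csv.reader(io.StringIO(schema_text)):
        if not row:
            continue  # ignore blank lines
        name, field_type = (value.strip() for value in row)
        pairs.append((name, field_type.upper()))
    return pairs


def to_bigquery_schema(pairs):
    """Render (column, TYPE) pairs as 'name:TYPE,...'."""
    return ",".join(f"{name}:{field_type}" for name, field_type in pairs)
```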
You can check the parameters accepted by the datastore_schema_import.py script with the following command:
python dataflow_python_examples/datastore_schema_import.py --help
Run the datastore_schema_import.py script to create the entry in Google Cloud Datastore using the following command:
python dataflow_python_examples/datastore_schema_import.py --schema-file=<path_to_TABLENAME.csv>
Upload the files to be imported into Google BigQuery to a Google Cloud Storage Bucket. You can use gsutil with a command like:
gsutil cp [LOCAL_OBJECT_LOCATION] gs://[DESTINATION_BUCKET_NAME]/
To optimize the upload of big files, see the documentation. Files need to be in CSV format, with the column names as the first row. For example:
name,surname,age
test_1,test_1,30
test_2,test_2,40
"test_3, jr",surname,50
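Applying such a schema to a data file could look like the sketch below. `csv.DictReader` takes column names from the header row and keeps the quoted `"test_3, jr"` value intact, and each value is converted according to its declared type. The type mapping and the rule for deriving the table name from a `TABLENAME_*.csv` object name (the text before the first underscore, so table names containing underscores would need a different rule) are assumptions, not the script's confirmed behavior.

```python
import csv
import io
import posixpath

# Assumed conversions from BigQuery type names to Python types; other types stay strings.
CONVERTERS = {"INTEGER": int, "FLOAT": float}


def table_name_from_object(object_name):
    """gs://bucket/path/TABLENAME_suffix.csv -> TABLENAME (text before the first '_')."""
    return posixpath.basename(object_name).split("_", 1)[0]


def parse_rows(csv_text, schema):
    """Yield one dict per data row, converting values per the (column, TYPE) schema."""
    types = dict(schema)
    for record in csv.DictReader(io.StringIO(csv_text)):
        row = {}
        for column, value in record.items():
            convert = CONVERTERS.get(types.get(column, "STRING"), str)
            # Empty cells become None so BigQuery stores NULL.
            row[column] = convert(value) if value != "" else None
        yield row
```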
You can check the parameters accepted by the data_ingestion_configurable.py script with the following command:
python dataflow_python_examples/data_ingestion_configurable.py --help
You can run the pipeline locally with the following command:
python dataflow_python_examples/data_ingestion_configurable.py \
--project=###PUT HERE PROJECT ID### \
--input-bucket=###PUT HERE GCS BUCKET NAME: gs://bucket_name ### \
--input-path=###PUT HERE INPUT FOLDER### \
--input-files=###PUT HERE FILE NAMES### \
--bq-dataset=###PUT HERE BQ DATASET NAME###
Or you can run the pipeline on Google Cloud Dataflow using the following command:
python dataflow_python_examples/data_ingestion_configurable.py \
--runner=DataflowRunner \
--max_num_workers=100 \
--autoscaling_algorithm=THROUGHPUT_BASED \
--region=###PUT HERE REGION### \
--staging_location=###PUT HERE GCS STAGING LOCATION### \
--temp_location=###PUT HERE GCS TMP LOCATION### \
--project=###PUT HERE PROJECT ID### \
--input-bucket=###PUT HERE GCS BUCKET NAME### \
--input-path=###PUT HERE INPUT FOLDER### \
--input-files=###PUT HERE FILE NAMES### \
--bq-dataset=###PUT HERE BQ DATASET NAME###
You can check data imported into Google BigQuery from the Google Cloud Console UI.
This example demonstrates joining data from two different datasets in BigQuery and applying transformations to the joined dataset before uploading it to BigQuery.
Joining two datasets from BigQuery is a common use case when a data lake has been implemented in BigQuery. Creating a data mart with denormalized datasets facilitates better performance when using visualization tools.
This pipeline contains 4 steps:
- Read in the primary dataset from BigQuery.
- Read in the reference data from BigQuery.
- Custom Python code is used to join the two datasets. Alternatively, CoGroupByKey can be used to join the two datasets.
- The joined dataset is written out to BigQuery.
Similar to previous examples, we use BigQueryIO to read the dataset from the results of a query. In this case our main dataset is a fake orders dataset, containing a history of orders and associated data like quantity.
In this example we use a fake account details dataset. This represents a common use case for denormalizing a dataset. The account details information contains attributes linked to the accounts in the orders dataset, for example, the address and city of the account.
Using custom Python code, we join the two datasets together. We provide two examples of joining these datasets. The first example uses side inputs, which require the reference dataset to fit into memory. The second example demonstrates how to use CoGroupByKey to join the datasets.
CoGroupByKey facilitates joins between two datasets even if neither fits into memory. Explore the comments in the two code examples for a more in-depth explanation.
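The CoGroupByKey variant can be sketched as below, assuming both tables share a hypothetical `account_id` column and that the output table already exists. Each side is keyed by account, `CoGroupByKey` groups orders and account details per key without requiring either whole dataset to fit in memory, and `merge_grouped` then emits one denormalized row per order (a left outer join on orders).

```python
ORDERS, ACCOUNTS = "orders", "accounts"
JOIN_KEY = "account_id"  # hypothetical column shared by both tables


def merge_grouped(element):
    """Turn one CoGroupByKey result into joined rows, one per order."""
    _, grouped = element
    accounts = list(grouped[ACCOUNTS])
    # Assumes at most one account-details row per account; orders without details pass through.
    details = accounts[0] if accounts else {}
    for order in grouped[ORDERS]:
        joined = dict(order)
        joined.update(details)
        yield joined


def run(orders_query, accounts_query, output_table, argv=None):
    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions

    with beam.Pipeline(options=PipelineOptions(argv)) as p:
        orders = (
            p
            | "ReadOrders" >> beam.io.ReadFromBigQuery(
                query=orders_query, use_standard_sql=True)
            | "KeyOrders" >> beam.Map(lambda r: (r[JOIN_KEY], r))
        )
        accounts = (
            p
            | "ReadAccounts" >> beam.io.ReadFromBigQuery(
                query=accounts_query, use_standard_sql=True)
            | "KeyAccounts" >> beam.Map(lambda r: (r[JOIN_KEY], r))
        )
        (
            {ORDERS: orders, ACCOUNTS: accounts}
            | "Join" >> beam.CoGroupByKey()
            | "Merge" >> beam.FlatMap(merge_grouped)
            | "Write" >> beam.io.WriteToBigQuery(
                output_table,
                create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER,
                write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
            )
        )
```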
Finally the joined dataset is written out to BigQuery. This uses the same BigQueryIO API which is used in previous examples.
Ready to dive deeper? Check out the complete code. The example using side inputs is here and the example using CoGroupByKey is here.