The connector uses the Spark SQL Data Source API to read data from Google BigQuery.
The BigQuery Storage API and this connector are in Beta and are subject to change.
Changes may include, but are not limited to:
- Type conversion
- Partitioning
- Parameters
Breaking changes will be restricted to major and minor versions.
The Storage API streams data in parallel directly from BigQuery via gRPC without using Google Cloud Storage as an intermediary.
It has a number of advantages over the previous export-based read flow that should generally lead to better read performance:
It does not leave any temporary files in Google Cloud Storage. Rows are read directly from BigQuery servers using an Avro wire format.
The new API allows column and limited predicate filtering to only read the data you are interested in.
Since BigQuery is backed by a columnar datastore, it can efficiently stream data without reading all columns.
The Storage API supports limited pushdown of predicate filters. It supports a single comparison to a literal e.g.
col1 = 'val'
The API rebalances records between readers until they all complete. This means that all Map phases will finish nearly concurrently. See this blog article on how dynamic sharding is similarly used in Google Cloud Dataflow.
See Configuring Partitioning for more details.
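Conceptually, this rebalancing behaves like readers pulling work from a shared pool rather than being assigned fixed ranges up front, so no reader is left holding a long tail. The following is a simplified, hypothetical Python sketch of the idea (illustrative only, not the connector's implementation):

```python
from collections import deque

def read_with_dynamic_sharding(total_rows, num_readers, chunk_size=10):
    """Simulate readers pulling fixed-size chunks of rows from a shared
    pool until it is empty, instead of pre-assigning fixed row ranges."""
    pool = deque(range(0, total_rows, chunk_size))  # remaining chunk offsets
    rows_read = [0] * num_readers
    reader = 0
    while pool:
        offset = pool.popleft()
        rows_read[reader] += min(chunk_size, total_rows - offset)
        reader = (reader + 1) % num_readers  # round-robin: readers stay busy
    return rows_read

# All readers finish within one chunk of each other.
counts = read_with_dynamic_sharding(total_rows=100, num_readers=4)
```

Because readers draw from the shared pool until it is exhausted, their totals stay within one chunk of each other, which is why the Map phases finish nearly concurrently.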
Follow these instructions.
If you do not have an Apache Spark environment, you can create a Cloud Dataproc cluster with pre-configured auth. The following examples assume you are using Cloud Dataproc, but you can use spark-submit on any cluster.
Any Dataproc cluster using the API needs the 'bigquery' or 'cloud-platform' scopes. Dataproc clusters have the 'bigquery' scope by default, so most clusters in enabled projects should work by default e.g.
MY_CLUSTER=...
gcloud dataproc clusters create "$MY_CLUSTER"
The latest version of the connector is publicly available in gs://spark-lib/bigquery/spark-bigquery-latest.jar.
You can run a simple PySpark wordcount against the API without compilation by running
gcloud dataproc jobs submit pyspark --cluster "$MY_CLUSTER" \
--jars gs://spark-lib/bigquery/spark-bigquery-latest.jar \
examples/python/shakespeare.py
Unless you wish to use the implicit Scala API spark.read.bigquery("TABLE_ID"), there is no need to compile against the connector.
To include the connector in your project:
<dependency>
<groupId>com.google.cloud.spark</groupId>
<artifactId>spark-bigquery_${scala.version}</artifactId>
<version>0.5.1-beta</version>
</dependency>
libraryDependencies += "com.google.cloud.spark" %% "spark-bigquery" % "0.5.1-beta"
The connector uses the cross language Spark SQL Data Source API:
df = spark.read
.format("bigquery")
.option("table", "publicdata.samples.shakespeare")
.load()
or the Scala only implicit API:
import com.google.cloud.spark.bigquery._
val df: DataFrame = spark.read.bigquery("publicdata.samples.shakespeare")
See Shakespeare.scala and shakespeare.py for more information.
The API supports a number of options to configure the read:

| Property | Meaning |
| --- | --- |
| `table` | The BigQuery table to read, in the format `[[project:]dataset.]table`. (Required) |
| `dataset` | The dataset containing the table to read. (Optional unless omitted in `table`) |
| `project` | The Google Cloud Project ID of the table to read from. (Optional. Defaults to the project of the Service Account being used) |
| `parentProject` | The Google Cloud Project ID of the table to bill for the export. (Optional. Defaults to the project of the Service Account being used) |
| `parallelism` | The number of partitions to split the data into. The actual number may be less if BigQuery deems the data small enough. If there are not enough executors to schedule a reader per partition, some partitions may be empty. (Optional. Defaults to the Spark default parallelism. See Configuring Partitioning.) |
| `skewLimit` | A soft limit on how many extra rows each partition may read after reading its expected number of rows (total rows / number of partitions). It is a float giving the ratio of the limit to the expected number of rows, e.g. 2.0 allows each partition to be twice as large as expected. Must be at least 1.0. (Optional. Defaults to 1.5 (150%). See Configuring Partitioning.) |
| `filter` | A manual predicate filter expression to pass to BigQuery. (Optional. See Filtering.) |
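To make the `[[project:]dataset.]table` format concrete, here is a hypothetical Python helper showing how such a reference could resolve against the separate `dataset` and `project` options (the function name and behavior are illustrative, not the connector's API):

```python
def parse_table_id(table, dataset=None, project=None):
    """Split a BigQuery table reference of the form [[project:]dataset.]table
    into (project, dataset, table), falling back to the separate options."""
    if ":" in table:
        project, table = table.split(":", 1)
    if "." in table:
        dataset, table = table.rsplit(".", 1)
        # A remaining dot means the prefix was project.dataset
        if "." in dataset and project is None:
            project, dataset = dataset.split(".", 1)
    if dataset is None:
        raise ValueError("dataset must be set when omitted from the table id")
    return project, dataset, table

# Both colon and dot separators resolve to the same coordinates:
coords = parse_table_id("publicdata.samples.shakespeare")
```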
With the exception of `DATETIME` and `TIME`, all BigQuery data types map directly to the corresponding Spark SQL data type. Here are all of the mappings:

| BigQuery Standard SQL Data Type | Spark SQL Data Type | Notes |
| --- | --- | --- |
| `BOOL` | `BooleanType` | |
| `INT64` | `LongType` | |
| `FLOAT64` | `DoubleType` | |
| `NUMERIC` | `DecimalType` | This preserves `NUMERIC`'s full 38 digits of precision and 9 digits of scale. |
| `STRING` | `StringType` | |
| `BYTES` | `BinaryType` | |
| `STRUCT` | `StructType` | |
| `ARRAY` | `ArrayType` | |
| `TIMESTAMP` | `TimestampType` | |
| `DATE` | `DateType` | |
| `DATETIME` | `StringType` | Spark has no DATETIME type. Casting to TIMESTAMP uses a configured TimeZone, which defaults to the local timezone (UTC in GCE / Dataproc). We are considering adding an optional TimeZone property to allow automatic conversion to TimestampType; this would be consistent with Spark's handling of CSV/JSON (except that they always try to convert when inferring schema, and default to the local timezone). |
| `TIME` | `LongType` | Spark has no TIME type. The generated longs, which indicate microseconds since midnight, can be safely cast to TimestampType, but this causes the date to be inferred as the current day. Times are therefore left as longs, and users can cast them if they like. When cast to TimestampType, TIME values have the same TimeZone issues as DATETIME. |
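For example, a TIME value surfaced as a long of microseconds since midnight can be converted to a time-of-day on the client side without any date or timezone inference. A sketch in plain Python (not connector functionality):

```python
import datetime

def micros_to_time(micros):
    """Convert microseconds since midnight (BigQuery TIME surfaced as a long)
    to a datetime.time, avoiding any date/timezone inference."""
    seconds, microsecond = divmod(micros, 1_000_000)
    minutes, second = divmod(seconds, 60)
    hour, minute = divmod(minutes, 60)
    return datetime.time(hour, minute, second, microsecond)

# 3_723_000_004 microseconds since midnight is 01:02:03.000004
t = micros_to_time(3_723_000_004)
```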
The connector automatically computes column and predicate pushdown filters from the DataFrame's SELECT statement, e.g.
spark.read.bigquery("publicdata:samples.shakespeare")
.select("word")
.where("word = 'Hamlet'")
.collect()
filters to the column `word` and pushes down the predicate filter `word = 'Hamlet'`.
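Since pushdown is limited to a single comparison of a column against a literal, the pushed filter is essentially a small SQL-like restriction string. A hypothetical sketch of how such a predicate might be rendered (illustrative only, not the connector's internals):

```python
def render_filter(column, op, value):
    """Render a single column-vs-literal comparison as a BigQuery-style
    restriction string, quoting string literals."""
    allowed_ops = {"=", "!=", "<", "<=", ">", ">="}
    if op not in allowed_ops:
        raise ValueError(f"unsupported operator for pushdown: {op}")
    if isinstance(value, str):
        literal = "'" + value.replace("'", "\\'") + "'"
    else:
        literal = str(value)
    return f"{column} {op} {literal}"

f = render_filter("word", "=", "Hamlet")  # "word = 'Hamlet'"
```

More complex predicates (conjunctions, function calls, comparisons between columns) fall outside this shape and are evaluated by Spark after the rows are read.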
If you do not wish to make multiple read requests to BigQuery, you can cache the DataFrame before filtering e.g.:
val cachedDF = spark.read.bigquery("publicdata:samples.shakespeare").cache()
val rows = cachedDF.select("word")
.where("word = 'Hamlet'")
.collect()
// All of the table was cached and this doesn't require an API call
val otherRows = cachedDF.select("word_count")
.where("word = 'Romeo'")
.collect()
You can also manually specify the `filter` option, which overrides automatic pushdown; Spark will perform the rest of the filtering in the client.
By default the connector creates one partition per core currently available (the Spark default parallelism) to get maximum concurrent bandwidth. This can be configured explicitly with the `parallelism` property. BigQuery may limit the number of partitions based on server constraints.
If not all partitions are being read concurrently, some partitions may grow larger while others end up smaller or even empty. The fraction by which a partition may grow beyond the expected number of rows (total rows / number of partitions) is bounded by the `skewLimit` parameter. The limit is soft and does not guarantee exact partitioning, especially on small tables.
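The arithmetic behind the soft cap can be sketched as follows (assuming the semantics described above; the function is illustrative, not part of the connector):

```python
def partition_soft_cap(total_rows, num_partitions, skew_limit=1.5):
    """Expected rows per partition and the soft cap implied by skewLimit.
    skewLimit is the ratio of the cap to the expected rows per partition."""
    if skew_limit < 1.0:
        raise ValueError("skewLimit must be at least 1.0")
    expected = total_rows / num_partitions
    return expected, expected * skew_limit

# 1,000,000 rows over 20 partitions: each expects 50,000 rows,
# and may read up to 75,000 before the soft limit applies.
expected, cap = partition_soft_cap(1_000_000, 20, skew_limit=1.5)
```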
The connector is built using SBT:
sbt assembly
See the BigQuery pricing documentation.
You can manually set the number of partitions with the parallelism
property. BigQuery may provide fewer partitions than you ask for. See Configuring Partitioning.
You can also always repartition after reading in Spark.
Because the Storage API balances records between partitions as you read, if you don't schedule all of your map tasks concurrently, the last scheduled partitions may be empty.
Decreasing the skewLimit parameter to 1.0 (or something near it) should make your partitions more uniform (at the expense of tail latency). Alternatively, you can increase your cluster size to schedule all partitions concurrently. See Configuring Partitioning.
You can also always repartition after reading in Spark, which should remove the empty partitions.
You can use the existing MapReduce connector or write DataFrames to GCS and then load the data into BigQuery.
Use a service account JSON key and GOOGLE_APPLICATION_CREDENTIALS
as described here.
TODO(#6): Wire auth through Spark/Hadoop properties.