BigQueryIO uniformize direct and export reads #32360

Open · wants to merge 6 commits into base: master
[File 1 of 6: Java BigQuery snippets]
@@ -43,7 +43,6 @@
 import java.util.Map;
 import java.util.stream.Collectors;
 import javax.annotation.Nullable;
-import org.apache.avro.generic.GenericRecord;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.DefaultCoder;
@@ -60,7 +59,6 @@
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryOptions;
 import org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinations;
 import org.apache.beam.sdk.io.gcp.bigquery.InsertRetryPolicy;
-import org.apache.beam.sdk.io.gcp.bigquery.SchemaAndRecord;
 import org.apache.beam.sdk.io.gcp.bigquery.TableDestination;
 import org.apache.beam.sdk.io.gcp.bigquery.WriteResult;
 import org.apache.beam.sdk.io.range.OffsetRange;
@@ -228,8 +226,7 @@ public static void modelBigQueryIO(
     // [START BigQueryReadFunction]
     PCollection<Double> maxTemperatures =
         p.apply(
-            BigQueryIO.read(
-                (SchemaAndRecord elem) -> (Double) elem.getRecord().get("max_temperature"))
+            BigQueryIO.parseGenericRecords(record -> (Double) record.get("max_temperature"))
                 .from(tableSpec)
                 .withCoder(DoubleCoder.of()));
     // [END BigQueryReadFunction]
@@ -239,8 +236,7 @@
     // [START BigQueryReadQuery]
     PCollection<Double> maxTemperatures =
         p.apply(
-            BigQueryIO.read(
-                (SchemaAndRecord elem) -> (Double) elem.getRecord().get("max_temperature"))
+            BigQueryIO.parseGenericRecords(record -> (Double) record.get("max_temperature"))
                 .fromQuery(
                     "SELECT max_temperature FROM [apache-beam-testing.samples.weather_stations]")
                 .withCoder(DoubleCoder.of()));
@@ -251,8 +247,7 @@
     // [START BigQueryReadQueryStdSQL]
     PCollection<Double> maxTemperatures =
         p.apply(
-            BigQueryIO.read(
-                (SchemaAndRecord elem) -> (Double) elem.getRecord().get("max_temperature"))
+            BigQueryIO.parseGenericRecords(record -> (Double) record.get("max_temperature"))
                 .fromQuery(
                     "SELECT max_temperature FROM `clouddataflow-readonly.samples.weather_stations`")
                 .usingStandardSql()
@@ -392,15 +387,13 @@ public WeatherData(long year, long month, long day, double maxTemp) {

     PCollection<WeatherData> weatherData =
         p.apply(
-            BigQueryIO.read(
-                (SchemaAndRecord elem) -> {
-                  GenericRecord record = elem.getRecord();
-                  return new WeatherData(
-                      (Long) record.get("year"),
-                      (Long) record.get("month"),
-                      (Long) record.get("day"),
-                      (Double) record.get("max_temperature"));
-                })
+            BigQueryIO.parseGenericRecords(
+                record ->
+                    new WeatherData(
+                        (Long) record.get("year"),
+                        (Long) record.get("month"),
+                        (Long) record.get("day"),
+                        (Double) record.get("max_temperature")))
                 .fromQuery(
                     "SELECT year, month, day, max_temperature "
                         + "FROM [apache-beam-testing.samples.weather_stations] "
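The snippet hunks above all make the same mechanical change: BigQueryIO.read with a SchemaAndRecord parse function becomes BigQueryIO.parseGenericRecords with a function over the Avro GenericRecord itself. A minimal end-to-end sketch of the new-style read, assuming parseGenericRecords keeps the SerializableFunction<GenericRecord, T> shape used in this diff (the surrounding pipeline boilerplate is illustrative, not part of the PR):

import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.DoubleCoder;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.values.PCollection;

public class ParseGenericRecordsSketch {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    // The parse function receives the GenericRecord itself; there is no
    // SchemaAndRecord wrapper left to unwrap with elem.getRecord().
    PCollection<Double> maxTemperatures =
        p.apply(
            BigQueryIO.parseGenericRecords(
                    (GenericRecord record) -> (Double) record.get("max_temperature"))
                .from("apache-beam-testing.samples.weather_stations")
                .withCoder(DoubleCoder.of()));

    p.run().waitUntilFinish();
  }
}

Because the lambda no longer sees a SchemaAndRecord wrapper, the same parse function can serve both the export-based and the Storage Read API paths, which is the uniformization the PR title describes.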
[File 2 of 6: Kotlin BigQuery snippets]
@@ -121,7 +121,7 @@ object Snippets {
         val tableSpec = "apache-beam-testing.samples.weather_stations"
         // [START BigQueryReadFunction]
         val maxTemperatures = pipeline.apply(
-            BigQueryIO.read { it.record["max_temperature"] as Double? }
+            BigQueryIO.parseGenericRecords { it["max_temperature"] as Double? }
                 .from(tableSpec)
                 .withCoder(DoubleCoder.of()))
         // [END BigQueryReadFunction]
@@ -130,7 +130,7 @@
     run {
         // [START BigQueryReadQuery]
         val maxTemperatures = pipeline.apply(
-            BigQueryIO.read { it.record["max_temperature"] as Double? }
+            BigQueryIO.parseGenericRecords { it["max_temperature"] as Double? }
                 .fromQuery(
                     "SELECT max_temperature FROM [apache-beam-testing.samples.weather_stations]")
                 .withCoder(DoubleCoder.of()))
@@ -140,7 +140,7 @@
     run {
         // [START BigQueryReadQueryStdSQL]
         val maxTemperatures = pipeline.apply(
-            BigQueryIO.read { it.record["max_temperature"] as Double? }
+            BigQueryIO.parseGenericRecords { it["max_temperature"] as Double? }
                 .fromQuery(
                     "SELECT max_temperature FROM `clouddataflow-readonly.samples.weather_stations`")
                 .usingStandardSql()
@@ -249,13 +249,12 @@ object Snippets {
             )
         */
         val weatherData = pipeline.apply(
-            BigQueryIO.read {
-                val record = it.record
+            BigQueryIO.parseGenericRecords {
                 WeatherData(
-                    record.get("year") as Long,
-                    record.get("month") as Long,
-                    record.get("day") as Long,
-                    record.get("max_temperature") as Double)
+                    it.get("year") as Long,
+                    it.get("month") as Long,
+                    it.get("day") as Long,
+                    it.get("max_temperature") as Double)
             }
             .fromQuery("""
                 SELECT year, month, day, max_temperature
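The Kotlin lambdas cast with as Double?, so the parsed value can be null for a NULL-able column. If null outputs are expected, wrapping the coder in NullableCoder is the safer choice; a sketch of that variant in Java (whether parseGenericRecords permits null outputs is an assumption here, not something this diff shows):

import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.DoubleCoder;
import org.apache.beam.sdk.coders.NullableCoder;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.values.PCollection;

public class NullableReadSketch {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    // If max_temperature can be NULL (the Kotlin snippet casts to Double?),
    // the parsed element may be null; NullableCoder makes that explicit
    // instead of failing at encoding time with DoubleCoder alone.
    PCollection<Double> maxTemperatures =
        p.apply(
            BigQueryIO.parseGenericRecords(
                    (GenericRecord record) -> (Double) record.get("max_temperature"))
                .from("apache-beam-testing.samples.weather_stations")
                .withCoder(NullableCoder.of(DoubleCoder.of())));

    p.run().waitUntilFinish();
  }
}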
[File 3 of 6: Arrow-to-Beam row conversion utilities]
@@ -252,6 +252,16 @@ public static RecordBatchRowIterator rowsFromSerializedRecordBatch(
       InputStream inputStream,
       RootAllocator allocator)
       throws IOException {
+    return rowsFromSerializedRecordBatch(
+        arrowSchema, ArrowSchemaTranslator.toBeamSchema(arrowSchema), inputStream, allocator);
+  }
+
+  public static RecordBatchRowIterator rowsFromSerializedRecordBatch(
+      org.apache.arrow.vector.types.pojo.Schema arrowSchema,
+      Schema schema,
+      InputStream inputStream,
+      RootAllocator allocator)
+      throws IOException {
     VectorSchemaRoot vectorRoot = VectorSchemaRoot.create(arrowSchema, allocator);
     VectorLoader vectorLoader = new VectorLoader(vectorRoot);
     vectorRoot.clear();
@@ -261,7 +271,7 @@ public static RecordBatchRowIterator rowsFromSerializedRecordBatch(
         vectorLoader.load(arrowMessage);
       }
     }
-    return rowsFromRecordBatch(ArrowSchemaTranslator.toBeamSchema(arrowSchema), vectorRoot);
+    return rowsFromRecordBatch(schema, vectorRoot);
   }

   public static org.apache.arrow.vector.types.pojo.Schema arrowSchemaFromInput(InputStream input)
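The new overload exists so the Arrow-to-Beam schema translation can be hoisted out of per-batch code: translate once, then hand the cached Beam Schema to every call. A sketch under that reading (the surrounding class and the input streams are hypothetical; only the two rowsFromSerializedRecordBatch overloads, ArrowSchemaTranslator, and arrowSchemaFromInput come from the file being patched):

import java.io.IOException;
import java.io.InputStream;
import java.util.List;
import org.apache.arrow.memory.RootAllocator;
import org.apache.beam.sdk.extensions.arrow.ArrowConversion;
import org.apache.beam.sdk.schemas.Schema;

class ReuseTranslatedSchema {
  static void printAllRows(InputStream schemaStream, List<InputStream> serializedBatches)
      throws IOException {
    org.apache.arrow.vector.types.pojo.Schema arrowSchema =
        ArrowConversion.arrowSchemaFromInput(schemaStream);
    // Translate once; the single-schema overload would redo this per batch.
    Schema beamSchema = ArrowConversion.ArrowSchemaTranslator.toBeamSchema(arrowSchema);
    try (RootAllocator allocator = new RootAllocator(Long.MAX_VALUE)) {
      for (InputStream batch : serializedBatches) {
        ArrowConversion.RecordBatchRowIterator rows =
            ArrowConversion.rowsFromSerializedRecordBatch(
                arrowSchema, beamSchema, batch, allocator);
        rows.forEachRemaining(System.out::println);
        // Production code should also release the iterator's Arrow buffers
        // when done with it.
      }
    }
  }
}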
[File 4 of 6: AvroSource]
@@ -142,11 +142,11 @@ public interface DatumReaderFactory<T> extends Serializable {
   // Use cases of AvroSource are:
   // 1) AvroSource<GenericRecord> Reading GenericRecord records with a specified schema.
   // 2) AvroSource<Foo> Reading records of a generated Avro class Foo.
-  // 3) AvroSource<T> Reading GenericRecord records with an unspecified schema
+  // 3) AvroSource<T> Reading GenericRecord records with an (un)specified schema
   //    and converting them to type T.
   //                     | Case 1        | Case 2   | Case 3        |
   // type                | GenericRecord | Foo      | GenericRecord |
-  // readerSchemaString  | non-null      | non-null | null          |
+  // readerSchemaString  | non-null      | non-null | either        |
   // parseFn             | null          | null     | non-null      |
   // outputCoder         | either        | either   | non-null      |
   // readerFactory       | either        | either   | either        |
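The table change says that Case 3 (a parseFn is set) no longer forces a null reader schema, which is what lets BigQuery export reads pin a reader schema while still converting records to a user type. From user code, Case 3 is reachable through AvroIO.parseGenericRecords, which builds an AvroSource with a parseFn under the hood; a small sketch (the file pattern and the id field are hypothetical):

import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.extensions.avro.io.AvroIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.values.PCollection;

public class AvroParseCase3 {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    // Case 3: records are read as GenericRecord and converted to T by a parseFn.
    // Avro strings arrive as org.apache.avro.util.Utf8, hence the explicit
    // toString() (assumes the id field is non-null).
    PCollection<String> ids =
        p.apply(
            AvroIO.parseGenericRecords(
                    (GenericRecord record) -> record.get("id").toString())
                .from("gs://my-bucket/records-*.avro")
                .withCoder(StringUtf8Coder.of()));

    p.run().waitUntilFinish();
  }
}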
[File 5 of 6: BigQuery HLL sketch compatibility integration test]
@@ -26,20 +26,19 @@
 import com.google.api.services.bigquery.model.TableReference;
 import com.google.api.services.bigquery.model.TableRow;
 import com.google.api.services.bigquery.model.TableSchema;
-import com.google.cloud.bigquery.storage.v1.DataFormat;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Date;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
+import org.apache.avro.generic.GenericRecord;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.ByteArrayCoder;
 import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead.Method;
-import org.apache.beam.sdk.io.gcp.bigquery.SchemaAndRecord;
 import org.apache.beam.sdk.io.gcp.testing.BigqueryClient;
 import org.apache.beam.sdk.options.ApplicationNameOptions;
 import org.apache.beam.sdk.testing.PAssert;
@@ -179,20 +178,18 @@ private void readSketchFromBigQuery(String tableId, Long expectedCount) {
             "SELECT HLL_COUNT.INIT(%s) AS %s FROM %s",
             DATA_FIELD_NAME, QUERY_RESULT_FIELD_NAME, tableSpec);

-    SerializableFunction<SchemaAndRecord, byte[]> parseQueryResultToByteArray =
-        input ->
+    SerializableFunction<GenericRecord, byte[]> parseQueryResultToByteArray =
+        record ->
             // BigQuery BYTES type corresponds to Java java.nio.ByteBuffer type
-            HllCount.getSketchFromByteBuffer(
-                (ByteBuffer) input.getRecord().get(QUERY_RESULT_FIELD_NAME));
+            HllCount.getSketchFromByteBuffer((ByteBuffer) record.get(QUERY_RESULT_FIELD_NAME));

     TestPipelineOptions options =
         TestPipeline.testingPipelineOptions().as(TestPipelineOptions.class);

     Pipeline p = Pipeline.create(options);
     PCollection<Long> result =
         p.apply(
-            BigQueryIO.read(parseQueryResultToByteArray)
-                .withFormat(DataFormat.AVRO)
+            BigQueryIO.parseGenericRecords(parseQueryResultToByteArray)
                 .fromQuery(query)
                 .usingStandardSql()
                 .withMethod(Method.DIRECT_READ)
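For existing callers, this test shows the whole migration: the parse function's input type changes from SchemaAndRecord to GenericRecord, the getRecord() unwrapping disappears, and withFormat(DataFormat.AVRO) is dropped, which suggests the new entry point implies Avro-decoded records on both read methods. A condensed before-and-after of just the function (the column name and helper are hypothetical stand-ins):

import java.nio.ByteBuffer;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.io.gcp.bigquery.SchemaAndRecord;
import org.apache.beam.sdk.transforms.SerializableFunction;

class MigrationSketch {
  // Before: the parse function unwraps a SchemaAndRecord ("sketch" is a
  // hypothetical BYTES column; BigQuery BYTES arrives as java.nio.ByteBuffer).
  static final SerializableFunction<SchemaAndRecord, byte[]> BEFORE =
      input -> toBytes((ByteBuffer) input.getRecord().get("sketch"));

  // After: the same logic against the GenericRecord directly.
  static final SerializableFunction<GenericRecord, byte[]> AFTER =
      record -> toBytes((ByteBuffer) record.get("sketch"));

  // Hypothetical helper standing in for HllCount.getSketchFromByteBuffer.
  static byte[] toBytes(ByteBuffer buffer) {
    byte[] bytes = new byte[buffer.remaining()];
    buffer.get(bytes);
    return bytes;
  }
}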
[File 6 of 6: AvroRowWriter]
@@ -20,7 +20,7 @@
 import java.io.IOException;
 import org.apache.avro.Schema;
 import org.apache.avro.file.DataFileWriter;
-import org.apache.avro.io.DatumWriter;
+import org.apache.beam.sdk.extensions.avro.io.AvroSink;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.util.MimeTypes;

@@ -36,7 +36,7 @@ class AvroRowWriter<AvroT, T> extends BigQueryRowWriter<T> {
       String basename,
       Schema schema,
       SerializableFunction<AvroWriteRequest<T>, AvroT> toAvroRecord,
-      SerializableFunction<Schema, DatumWriter<AvroT>> writerFactory)
+      AvroSink.DatumWriterFactory<AvroT> writerFactory)
       throws Exception {
     super(basename, MimeTypes.BINARY);

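The constructor now takes Beam's AvroSink.DatumWriterFactory instead of a raw SerializableFunction over DatumWriter. Assuming DatumWriterFactory is the functional interface from Schema to DatumWriter<T> in Beam's Avro extension (as I understand it, it extends SerializableFunction there), existing lambda factories keep working unchanged; a sketch:

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.extensions.avro.io.AvroSink;

class DatumWriterFactorySketch {
  // Invoked with the writer schema when a file is opened; the same kind of
  // factory is accepted publicly by AvroIO.TypedWrite#withDatumWriterFactory.
  static final AvroSink.DatumWriterFactory<GenericRecord> GENERIC =
      (Schema schema) -> new GenericDatumWriter<>(schema);
}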