diff --git a/examples/java/src/main/java/org/apache/beam/examples/snippets/Snippets.java b/examples/java/src/main/java/org/apache/beam/examples/snippets/Snippets.java
index 4f24c69f74b7..afa736202cb9 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/snippets/Snippets.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/snippets/Snippets.java
@@ -43,7 +43,6 @@
 import java.util.Map;
 import java.util.stream.Collectors;
 import javax.annotation.Nullable;
-import org.apache.avro.generic.GenericRecord;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.DefaultCoder;
@@ -60,7 +59,6 @@
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryOptions;
 import org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinations;
 import org.apache.beam.sdk.io.gcp.bigquery.InsertRetryPolicy;
-import org.apache.beam.sdk.io.gcp.bigquery.SchemaAndRecord;
 import org.apache.beam.sdk.io.gcp.bigquery.TableDestination;
 import org.apache.beam.sdk.io.gcp.bigquery.WriteResult;
 import org.apache.beam.sdk.io.range.OffsetRange;
@@ -228,8 +226,7 @@ public static void modelBigQueryIO(
       // [START BigQueryReadFunction]
       PCollection<Double> maxTemperatures =
           p.apply(
-              BigQueryIO.read(
-                      (SchemaAndRecord elem) -> (Double) elem.getRecord().get("max_temperature"))
+              BigQueryIO.readAvro(record -> (Double) record.get("max_temperature"))
                   .from(tableSpec)
                   .withCoder(DoubleCoder.of()));
       // [END BigQueryReadFunction]
@@ -239,8 +236,7 @@ public static void modelBigQueryIO(
       // [START BigQueryReadQuery]
       PCollection<Double> maxTemperatures =
           p.apply(
-              BigQueryIO.read(
-                      (SchemaAndRecord elem) -> (Double) elem.getRecord().get("max_temperature"))
+              BigQueryIO.readAvro(record -> (Double) record.get("max_temperature"))
                   .fromQuery(
                       "SELECT max_temperature FROM [apache-beam-testing.samples.weather_stations]")
                   .withCoder(DoubleCoder.of()));
@@ -251,8 +247,7 @@ public static void modelBigQueryIO(
       // [START BigQueryReadQueryStdSQL]
       PCollection<Double> maxTemperatures =
           p.apply(
-              BigQueryIO.read(
-                      (SchemaAndRecord elem) -> (Double) elem.getRecord().get("max_temperature"))
+              BigQueryIO.readAvro(record -> (Double) record.get("max_temperature"))
                   .fromQuery(
                       "SELECT max_temperature FROM `clouddataflow-readonly.samples.weather_stations`")
                   .usingStandardSql()
@@ -392,15 +387,13 @@ public WeatherData(long year, long month, long day, double maxTemp) {
 
       PCollection<WeatherData> weatherData =
           p.apply(
-              BigQueryIO.read(
-                      (SchemaAndRecord elem) -> {
-                        GenericRecord record = elem.getRecord();
-                        return new WeatherData(
-                            (Long) record.get("year"),
-                            (Long) record.get("month"),
-                            (Long) record.get("day"),
-                            (Double) record.get("max_temperature"));
-                      })
+              BigQueryIO.readAvro(
+                      record ->
+                          new WeatherData(
+                              (Long) record.get("year"),
+                              (Long) record.get("month"),
+                              (Long) record.get("day"),
+                              (Double) record.get("max_temperature")))
                   .fromQuery(
                       "SELECT year, month, day, max_temperature "
                           + "FROM [apache-beam-testing.samples.weather_stations] "
diff --git a/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/snippets/Snippets.kt b/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/snippets/Snippets.kt
index d2f58c215a56..e84f70027d5a 100644
--- a/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/snippets/Snippets.kt
+++ b/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/snippets/Snippets.kt
@@ -121,7 +121,7 @@ object Snippets {
         val tableSpec = "apache-beam-testing.samples.weather_stations"
         // [START BigQueryReadFunction]
         val maxTemperatures = pipeline.apply(
-            BigQueryIO.read { it.record["max_temperature"] as Double? }
+            BigQueryIO.readAvro { it["max_temperature"] as Double? }
                 .from(tableSpec)
                 .withCoder(DoubleCoder.of()))
         // [END BigQueryReadFunction]
@@ -130,7 +130,7 @@ object Snippets {
        run {
            // [START BigQueryReadQuery]
            val maxTemperatures = pipeline.apply(
-               BigQueryIO.read { it.record["max_temperature"] as Double? }
+               BigQueryIO.readAvro { it["max_temperature"] as Double? }
                    .fromQuery(
                        "SELECT max_temperature FROM [apache-beam-testing.samples.weather_stations]")
                    .withCoder(DoubleCoder.of()))
@@ -140,7 +140,7 @@ object Snippets {
        run {
            // [START BigQueryReadQueryStdSQL]
            val maxTemperatures = pipeline.apply(
-               BigQueryIO.read { it.record["max_temperature"] as Double? }
+               BigQueryIO.readAvro { it["max_temperature"] as Double? }
                    .fromQuery(
                        "SELECT max_temperature FROM `clouddataflow-readonly.samples.weather_stations`")
                    .usingStandardSql()
@@ -249,13 +249,12 @@ object Snippets {
            )
         */
         val weatherData = pipeline.apply(
-            BigQueryIO.read {
-                val record = it.record
+            BigQueryIO.readAvro {
                 WeatherData(
-                    record.get("year") as Long,
-                    record.get("month") as Long,
-                    record.get("day") as Long,
-                    record.get("max_temperature") as Double)
+                    it.get("year") as Long,
+                    it.get("month") as Long,
+                    it.get("day") as Long,
+                    it.get("max_temperature") as Double)
             }
                 .fromQuery("""
                     SELECT year, month, day, max_temperature
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryTable.java
index 1898c28f670c..84d580b4cb85 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryTable.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryTable.java
@@ -237,7 +237,7 @@ private String generateRowRestrictions(Schema schema, List<String> supported) {
 
   private TypedRead<Row> getBigQueryTypedRead(Schema schema) {
     return BigQueryIO.read(
-            record -> BigQueryUtils.toBeamRow(record.getRecord(), schema, conversionOptions))
+            record -> BigQueryUtils.toBeamRow(record.getElement(), schema, conversionOptions))
         .withMethod(method)
         .from(bqLocation)
         .withCoder(SchemaCoder.of(schema));
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index c1fd1a61b0ea..c3330368bc3a 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -119,7 +119,6 @@
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.Reshuffle;
-import org.apache.beam.sdk.transforms.SerializableBiFunction;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.SerializableFunctions;
 import org.apache.beam.sdk.transforms.SimpleFunction;
@@ -662,13 +661,13 @@ public static TypedRead<TableRow> readTableRows(DataFormat dataFormat) {
           null,
           true,
           AvroDatumFactory.generic(),
-          (s, r) -> BigQueryAvroUtils.convertGenericRecordToTableRow(r),
+          input -> BigQueryAvroUtils.convertGenericRecordToTableRow(input.getElement()),
           TableRowJsonCoder.of(),
           TypeDescriptor.of(TableRow.class));
     } else if (dataFormat == DataFormat.ARROW) {
       return readArrowImpl(
           null,
-          (s, r) -> BigQueryUtils.toTableRow(r),
+          input -> BigQueryUtils.toTableRow(input.getElement()),
           TableRowJsonCoder.of(),
           TypeDescriptor.of(TableRow.class));
     } else {
@@ -742,12 +741,7 @@ public static TypedRead<Row> readRows(DataFormat dataFormat) {
   @Deprecated
   public static <T> TypedRead<T> read(SerializableFunction<SchemaAndRecord, T> parseFn) {
     return readAvroImpl(
-        null,
-        false,
-        AvroDatumFactory.generic(),
-        (s, r) -> parseFn.apply(new SchemaAndRecord(r, s)),
-        null,
-        TypeDescriptors.outputOf(parseFn));
+        null, false, AvroDatumFactory.generic(), parseFn, null, TypeDescriptors.outputOf(parseFn));
   }
 
   /**
@@ -767,7 +761,7 @@ public static <T> TypedRead<T> readWithDatumReader(
     if (readerFactory instanceof AvroDatumFactory) {
       td = TypeDescriptor.of(((AvroDatumFactory<T>) readerFactory).getType());
     }
-    return readAvroImpl(null, false, readerFactory, (s, r) -> r, null, td);
+    return readAvroImpl(null, false, readerFactory, SchemaAndElement::getElement, null, td);
   }
 
   /**
@@ -780,7 +774,7 @@ public static TypedRead<GenericRecord> readAvro() {
         null,
         true,
         AvroDatumFactory.generic(),
-        (s, r) -> r,
+        SchemaAndElement::getRecord,
         null,
         TypeDescriptor.of(GenericRecord.class));
   }
@@ -795,7 +789,7 @@ public static TypedRead<GenericRecord> readAvro(org.apache.avro.Schema schema) {
         schema,
         true,
         AvroDatumFactory.generic(),
-        (s, r) -> r,
+        SchemaAndElement::getRecord,
         AvroCoder.generic(schema),
         TypeDescriptor.of(GenericRecord.class));
   }
@@ -817,7 +811,7 @@ public static <T> TypedRead<T> readAvro(Class<T> recordClass) {
     }
     AvroCoder<T> coder = AvroCoder.of(factory, schema);
     TypeDescriptor<T> td = TypeDescriptor.of(recordClass);
-    return readAvroImpl(schema, true, factory, (s, r) -> r, coder, td);
+    return readAvroImpl(schema, true, factory, SchemaAndElement::getElement, coder, td);
   }
 
   /**
@@ -834,7 +828,7 @@ public static <T> TypedRead<T> readAvro(
       coder = AvroCoder.of((AvroDatumFactory<T>) readerFactory, schema);
       td = TypeDescriptor.of(((AvroDatumFactory<T>) readerFactory).getType());
     }
-    return readAvroImpl(schema, true, readerFactory, (s, r) -> r, coder, td);
+    return readAvroImpl(schema, true, readerFactory, SchemaAndElement::getElement, coder, td);
   }
 
   /**
@@ -848,32 +842,35 @@ public static <T> TypedRead<T> readAvro(
         null,
         true,
         AvroDatumFactory.generic(),
-        (s, r) -> avroFormatFunction.apply(r),
+        input -> avroFormatFunction.apply(input.getElement()),
         null,
         TypeDescriptors.outputOf(avroFormatFunction));
   }
 
+  @SuppressWarnings("unchecked")
   private static <AvroT, T> TypedRead<T> readAvroImpl(
       org.apache.avro.@Nullable Schema schema, // when null infer from TableSchema at runtime
       Boolean useAvroLogicalTypes,
       AvroSource.DatumReaderFactory<AvroT> readerFactory,
-      SerializableBiFunction<TableSchema, AvroT, T> avroFormatFunction,
+      SerializableFunction<? extends SchemaAndElement<AvroT>, T> parseFn,
       @Nullable Coder<T> coder,
       @Nullable TypeDescriptor<T> typeDescriptor) {
-    BigQueryReaderFactory<T> bqReaderFactory =
-        BigQueryReaderFactory.avro(schema, useAvroLogicalTypes, readerFactory, avroFormatFunction);
+
     if (typeDescriptor != null && typeDescriptor.hasUnresolvedParameters()) {
       // type extraction failed and will not be serializable
       typeDescriptor = null;
     }
+
     return new AutoValue_BigQueryIO_TypedRead.Builder<T>()
         .setValidate(true)
         .setWithTemplateCompatibility(false)
         .setBigQueryServices(new BigQueryServicesImpl())
-        .setBigQueryReaderFactory(bqReaderFactory)
         .setMethod(TypedRead.Method.DEFAULT)
-        .setUseAvroLogicalTypes(useAvroLogicalTypes)
         .setFormat(DataFormat.AVRO)
+        .setAvroSchema(schema)
+        .setDatumReaderFactory(readerFactory)
+        .setParseFn(parseFn)
+        .setUseAvroLogicalTypes(useAvroLogicalTypes)
         .setProjectionPushdownApplied(false)
         .setBadRecordErrorHandler(new DefaultErrorHandler<>())
         .setBadRecordRouter(BadRecordRouter.THROWING_ROUTER)
@@ -887,15 +884,16 @@ private static <AvroT, T> TypedRead<T> readAvroImpl(
    * each row of the table or query result as {@link Row}.
    */
   public static TypedRead<Row> readArrow() {
-    return readArrowImpl(null, (s, r) -> r, null, TypeDescriptor.of(Row.class));
+    return readArrowImpl(null, SchemaAndRow::getElement, null, TypeDescriptor.of(Row.class));
   }
 
   /**
    * Reads from a BigQuery table or query and returns a {@link PCollection} with one element per
-   * each row of the table or query result as {@link Row} with the desired schema..
+   * each row of the table or query result as {@link Row} with the desired schema.
    */
   public static TypedRead<Row> readArrow(Schema schema) {
-    return readArrowImpl(schema, (s, r) -> r, RowCoder.of(schema), TypeDescriptor.of(Row.class));
+    return readArrowImpl(
+        schema, SchemaAndRow::getElement, RowCoder.of(schema), TypeDescriptor.of(Row.class));
   }
 
   /**
@@ -906,18 +904,17 @@ public static TypedRead<Row> readArrow(Schema schema) {
   public static <T> TypedRead<T> readArrow(SerializableFunction<Row, T> arrowFormatFunction) {
     return readArrowImpl(
         null,
-        (s, r) -> arrowFormatFunction.apply(r),
+        input -> arrowFormatFunction.apply(input.getElement()),
         null,
         TypeDescriptors.outputOf(arrowFormatFunction));
   }
 
   private static <T> TypedRead<T> readArrowImpl(
       @Nullable Schema schema, // when null infer from TableSchema at runtime
-      SerializableBiFunction<TableSchema, Row, T> arrowFormatFunction,
+      SerializableFunction<SchemaAndRow, T> parseFn,
       @Nullable Coder<T> coder,
       TypeDescriptor<T> typeDescriptor) {
-    BigQueryReaderFactory<T> bqReaderFactory =
-        BigQueryReaderFactory.arrow(schema, arrowFormatFunction);
+
     if (typeDescriptor != null && typeDescriptor.hasUnresolvedParameters()) {
       // type extraction failed and will not be serializable
       typeDescriptor = null;
@@ -926,10 +923,11 @@ private static <T> TypedRead<T> readArrowImpl(
         .setValidate(true)
         .setWithTemplateCompatibility(false)
         .setBigQueryServices(new BigQueryServicesImpl())
-        .setBigQueryReaderFactory(bqReaderFactory)
         .setMethod(TypedRead.Method.DIRECT_READ) // arrow is only available in direct read
-        .setUseAvroLogicalTypes(false)
         .setFormat(DataFormat.ARROW)
+        .setArrowSchema(schema)
+        .setArrowParseFn(parseFn)
+        .setUseAvroLogicalTypes(false)
         .setProjectionPushdownApplied(false)
         .setBadRecordErrorHandler(new DefaultErrorHandler<>())
         .setBadRecordRouter(BadRecordRouter.THROWING_ROUTER)
@@ -1136,7 +1134,16 @@ abstract static class Builder<T> {
       abstract TypedRead<T> build();
 
-      abstract Builder<T> setBigQueryReaderFactory(BigQueryReaderFactory<T> factory);
+      abstract Builder<T> setAvroSchema(org.apache.avro.Schema avroSchema);
+
+      abstract Builder<T> setDatumReaderFactory(AvroSource.DatumReaderFactory<?> readerFactory);
+
+      abstract Builder<T> setParseFn(
+          SerializableFunction<? extends SchemaAndElement<?>, T> parseFn);
+
+      abstract Builder<T> setArrowSchema(Schema arrowSchema);
+
+      abstract Builder<T> setArrowParseFn(SerializableFunction<SchemaAndRow, T> parseFn);
 
       abstract Builder<T> setCoder(Coder<T> coder);
@@ -1172,7 +1179,15 @@ abstract Builder<T> setBadRecordErrorHandler(
 
     abstract BigQueryServices getBigQueryServices();
 
-    abstract BigQueryReaderFactory<T> getBigQueryReaderFactory();
+    abstract org.apache.avro.@Nullable Schema getAvroSchema();
+
+    abstract AvroSource.@Nullable DatumReaderFactory<?> getDatumReaderFactory();
+
+    abstract @Nullable SerializableFunction<? extends SchemaAndElement<?>, T> getParseFn();
+
+    abstract @Nullable Schema getArrowSchema();
+
+    abstract @Nullable SerializableFunction<SchemaAndRow, T> getArrowParseFn();
 
     abstract @Nullable QueryPriority getQueryPriority();
@@ -1274,7 +1289,7 @@ private BigQuerySourceDef createSourceDef() {
     }
 
     private BigQueryStorageQuerySource<T> createStorageQuerySource(
-        String stepUuid, Coder<T> outputCoder) {
+        String stepUuid, BigQueryReaderFactory<T> bqReaderFactory, Coder<T> outputCoder) {
       return BigQueryStorageQuerySource.create(
           stepUuid,
           getQuery(),
@@ -1286,7 +1301,7 @@ private BigQueryStorageQuerySource<T> createStorageQuerySource(
           getQueryTempProject(),
           getKmsKey(),
           getFormat(),
-          getBigQueryReaderFactory(),
+          bqReaderFactory,
           outputCoder,
           getBigQueryServices());
     }
@@ -1440,6 +1455,47 @@ public PCollection<T> expand(PBegin input) {
                 + "which only applies when reading from a table");
       }
 
+      BigQueryReaderFactory<T> bqReaderFactory;
+      switch (getFormat()) {
+        case ARROW:
+          @Nullable Schema arrowSchema = getArrowSchema();
+          SerializableFunction<SchemaAndRow, T> arrowParseFn = getArrowParseFn();
+
+          if (arrowParseFn == null) {
+            checkArgument(getParseFn() != null, "Arrow or Avro parseFn is required");
+            LOG.warn(
+                "Reading ARROW from AVRO. Consider using readArrow() instead of withFormat(DataFormat.ARROW)");
+            // withFormat() was probably used
+            SerializableFunction<SchemaAndRecord, T> parseFn =
+                (SerializableFunction<SchemaAndRecord, T>) getParseFn();
+            arrowParseFn =
+                arrowInput -> {
+                  GenericRecord record = AvroUtils.toGenericRecord(arrowInput.getElement());
+                  return parseFn.apply(new SchemaAndRecord(record, arrowInput.getTableSchema()));
+                };
+          }
+
+          bqReaderFactory = BigQueryReaderFactory.arrow(arrowSchema, arrowParseFn);
+          break;
+        case AVRO:
+          checkArgument(getDatumReaderFactory() != null, "Avro datumReaderFactory is required");
+          checkArgument(getParseFn() != null, "Avro parseFn is required");
+
+          org.apache.avro.@Nullable Schema avroSchema = getAvroSchema();
+          AvroSource.DatumReaderFactory<?> datumFactory = getDatumReaderFactory();
+          SerializableFunction<? extends SchemaAndElement<?>, T> avroParseFn = getParseFn();
+          boolean useAvroLogicalTypes = getUseAvroLogicalTypes();
+          bqReaderFactory =
+              BigQueryReaderFactory.avro(
+                  avroSchema,
+                  useAvroLogicalTypes,
+                  (AvroSource.DatumReaderFactory) datumFactory,
+                  (SerializableFunction) avroParseFn);
+          break;
+        default:
+          throw new IllegalArgumentException("Unsupported format: " + getFormat());
+      }
+
       // if both toRowFn and fromRowFn values are set, enable Beam schema support
       Pipeline p = input.getPipeline();
       BigQueryOptions bqOptions = p.getOptions().as(BigQueryOptions.class);
@@ -1458,7 +1516,7 @@ public PCollection<T> expand(PBegin input) {
       final Coder<T> coder = inferCoder(p.getCoderRegistry());
 
       if (getMethod() == TypedRead.Method.DIRECT_READ) {
-        return expandForDirectRead(input, coder, beamSchema, bqOptions);
+        return expandForDirectRead(input, coder, beamSchema, bqReaderFactory, bqOptions);
       }
 
       final PCollectionView<String> jobIdTokenView;
@@ -1475,10 +1533,7 @@ public PCollection<T> expand(PBegin input) {
             p.apply(
                 org.apache.beam.sdk.io.Read.from(
                     sourceDef.toSource(
-                        staticJobUuid,
-                        coder,
-                        getBigQueryReaderFactory(),
-                        getUseAvroLogicalTypes())));
+                        staticJobUuid, coder, bqReaderFactory, getUseAvroLogicalTypes())));
       } else {
         // Create a singleton job ID token at execution time.
         jobIdTokenCollection =
@@ -1506,10 +1561,7 @@ public void processElement(ProcessContext c) throws Exception {
                           String jobUuid = c.element();
                           BigQuerySourceBase<T> source =
                               sourceDef.toSource(
-                                  jobUuid,
-                                  coder,
-                                  getBigQueryReaderFactory(),
-                                  getUseAvroLogicalTypes());
+                                  jobUuid, coder, bqReaderFactory, getUseAvroLogicalTypes());
                           BigQueryOptions options =
                               c.getPipelineOptions().as(BigQueryOptions.class);
                           ExtractResult res = source.extractFiles(options);
@@ -1542,10 +1594,7 @@ public void processElement(ProcessContext c) throws Exception {
                         String jobUuid = c.sideInput(jobIdTokenView);
                         BigQuerySourceBase<T> source =
                             sourceDef.toSource(
-                                jobUuid,
-                                coder,
-                                getBigQueryReaderFactory(),
-                                getUseAvroLogicalTypes());
+                                jobUuid, coder, bqReaderFactory, getUseAvroLogicalTypes());
                         List<BoundedSource<T>> sources =
                             source.createSources(
                                 ImmutableList.of(
@@ -1612,7 +1661,11 @@ void cleanup(PassThroughThenCleanup.ContextContainer c) throws Exception {
     }
 
     private PCollection<T> expandForDirectRead(
-        PBegin input, Coder<T> outputCoder, Schema beamSchema, BigQueryOptions bqOptions) {
+        PBegin input,
+        Coder<T> outputCoder,
+        Schema beamSchema,
+        BigQueryReaderFactory<T> bqReaderFactory,
+        BigQueryOptions bqOptions) {
       ValueProvider<TableReference> tableProvider = getTableProvider();
       Pipeline p = input.getPipeline();
       if (tableProvider != null) {
@@ -1628,7 +1681,7 @@ private PCollection<T> expandForDirectRead(
                       getFormat(),
                       getSelectedFields(),
                       getRowRestriction(),
-                      getBigQueryReaderFactory(),
+                      bqReaderFactory,
                       outputCoder,
                       getBigQueryServices(),
                       getProjectionPushdownApplied())));
@@ -1649,7 +1702,7 @@ private PCollection<T> expandForDirectRead(
                 getFormat(),
                 getSelectedFields(),
                 getRowRestriction(),
-                getBigQueryReaderFactory(),
+                bqReaderFactory,
                 outputCoder,
                 getBigQueryServices(),
                 getProjectionPushdownApplied());
@@ -1725,7 +1778,7 @@ && getBadRecordRouter() instanceof ThrowingBadRecordRouter) {
         rows =
             p.apply(
                 org.apache.beam.sdk.io.Read.from(
-                    createStorageQuerySource(staticJobUuid, outputCoder)));
+                    createStorageQuerySource(staticJobUuid, bqReaderFactory, outputCoder)));
       } else {
         // Create a singleton job ID token at pipeline execution time.
         PCollection<String> jobIdTokenCollection =
@@ -1748,7 +1801,12 @@ public String apply(String input) {
 
           PCollectionTuple tuple =
               createTupleForDirectRead(
-                  jobIdTokenCollection, outputCoder, readStreamsTag, readSessionTag, tableSchemaTag);
+                  jobIdTokenCollection,
+                  bqReaderFactory,
+                  outputCoder,
+                  readStreamsTag,
+                  readSessionTag,
+                  tableSchemaTag);
           tuple.get(readStreamsTag).setCoder(ProtoCoder.of(ReadStream.class));
           tuple.get(readSessionTag).setCoder(ProtoCoder.of(ReadSession.class));
           tuple.get(tableSchemaTag).setCoder(StringUtf8Coder.of());
@@ -1760,7 +1818,12 @@ public String apply(String input) {
 
           rows =
               createPCollectionForDirectRead(
-                  tuple, outputCoder, readStreamsTag, readSessionView, tableSchemaView);
+                  tuple,
+                  bqReaderFactory,
+                  outputCoder,
+                  readStreamsTag,
+                  readSessionView,
+                  tableSchemaView);
         }
 
         PassThroughThenCleanup.CleanupOperation cleanupOperation =
@@ -1812,6 +1875,7 @@ void cleanup(ContextContainer c) throws Exception {
 
     private PCollectionTuple createTupleForDirectRead(
         PCollection<String> jobIdTokenCollection,
+        BigQueryReaderFactory<T> bqReaderFactory,
         Coder<T> outputCoder,
         TupleTag<ReadStream> readStreamsTag,
         TupleTag<ReadSession> readSessionTag,
@@ -1830,7 +1894,7 @@ public void processElement(ProcessContext c) throws Exception {
                         // The getTargetTable call runs a new instance of the query and returns
                         // the destination table created to hold the results.
                         BigQueryStorageQuerySource<T> querySource =
-                            createStorageQuerySource(jobUuid, outputCoder);
+                            createStorageQuerySource(jobUuid, bqReaderFactory, outputCoder);
                         Table queryResultTable = querySource.getTargetTable(options);
 
                         // Create a read session without specifying a desired stream count and
@@ -1875,6 +1939,7 @@ public void processElement(ProcessContext c) throws Exception {
 
     private PCollection<T> createPCollectionForDirectRead(
         PCollectionTuple tuple,
+        BigQueryReaderFactory<T> bqReaderFactory,
         Coder<T> outputCoder,
         TupleTag<ReadStream> readStreamsTag,
         PCollectionView<ReadSession> readSessionView,
@@ -1902,7 +1967,7 @@ public void processElement(
                             readSession,
                             readStream,
                             tableSchema,
-                            getBigQueryReaderFactory(),
+                            bqReaderFactory,
                             outputCoder,
                             getBigQueryServices());
 
@@ -2142,6 +2207,16 @@ public TypedRead<T> withMethod(TypedRead.Method method) {
       return toBuilder().setMethod(method).build();
     }
 
+    /**
+     * See {@link DataFormat}.
+     *
+     * @deprecated Use {@link #readAvro()} or {@link #readArrow()} instead
+     */
+    @Deprecated
+    public TypedRead<T> withFormat(DataFormat format) {
+      return toBuilder().setFormat(format).build();
+    }
+
     /** See {@link #withSelectedFields(ValueProvider)}. */
     public TypedRead<T> withSelectedFields(List<String> selectedFields) {
       return withSelectedFields(StaticValueProvider.of(selectedFields));
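Reviewer note: the user-facing effect of the BigQueryIO.java hunks above is the move from the deprecated read(SerializableFunction<SchemaAndRecord, T>) entry point to readAvro, whose parse function receives the GenericRecord directly. A minimal before/after sketch mirroring the Snippets.java edits (the table spec and pipeline `p` are hypothetical placeholders):

    // Before (now deprecated): the lambda unwraps a SchemaAndRecord.
    PCollection<Double> before =
        p.apply(
            BigQueryIO.read(
                    (SchemaAndRecord elem) -> (Double) elem.getRecord().get("max_temperature"))
                .from("project:dataset.weather_stations")
                .withCoder(DoubleCoder.of()));

    // After: readAvro hands the GenericRecord straight to the lambda.
    PCollection<Double> after =
        p.apply(
            BigQueryIO.readAvro(record -> (Double) record.get("max_temperature"))
                .from("project:dataset.weather_stations")
                .withCoder(DoubleCoder.of()));

Callers that still need the TableSchema can keep using the deprecated read(), which continues to work because SchemaAndRecord now extends SchemaAndElement<GenericRecord> (see the wrapper classes later in this diff).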
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslation.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslation.java
index 4c6ae6db668c..5c881e8ab9e2 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslation.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslation.java
@@ -36,11 +36,11 @@
 import java.util.Set;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
-import org.apache.avro.generic.GenericRecord;
 import org.apache.beam.model.pipeline.v1.RunnerApi;
 import org.apache.beam.model.pipeline.v1.RunnerApi.FunctionSpec;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.extensions.avro.io.AvroDatumFactory;
+import org.apache.beam.sdk.extensions.avro.io.AvroSource;
 import org.apache.beam.sdk.extensions.avro.schemas.utils.AvroUtils;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead.FromBeamRowFunction;
@@ -59,7 +59,6 @@
 import org.apache.beam.sdk.schemas.Schema.FieldType;
 import org.apache.beam.sdk.schemas.logicaltypes.NanosDuration;
 import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.SerializableBiFunction;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.errorhandling.BadRecord;
 import org.apache.beam.sdk.transforms.errorhandling.BadRecordRouter;
@@ -71,7 +70,6 @@
 import org.apache.beam.sdk.util.construction.TransformUpgrader;
 import org.apache.beam.sdk.values.Row;
 import org.apache.beam.sdk.values.TypeDescriptor;
-import org.apache.beam.sdk.values.TypeDescriptors;
 import org.apache.beam.sdk.values.ValueInSingleWindow;
 import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Predicates;
@@ -97,8 +95,11 @@ static class BigQueryIOReadTranslator implements TransformPayloadTranslator<TypedRead<?>> {
     public Row toConfigRow(TypedRead<?> transform) {
       if (transform.getBigQueryServices() != null) {
         fieldValues.put("bigquery_services", toByteArray(transform.getBigQueryServices()));
       }
-      if (transform.getBigQueryReaderFactory() != null) {
-        fieldValues.put(
-            "bigquery_reader_factory", toByteArray(transform.getBigQueryReaderFactory()));
+      if (transform.getAvroSchema() != null) {
+        fieldValues.put("avro_schema", toByteArray(transform.getAvroSchema()));
+      }
+      if (transform.getDatumReaderFactory() != null) {
+        fieldValues.put("datum_reader_factory", toByteArray(transform.getDatumReaderFactory()));
+      }
+      if (transform.getParseFn() != null) {
+        fieldValues.put("parse_fn", toByteArray(transform.getParseFn()));
+      }
+      if (transform.getArrowSchema() != null) {
+        fieldValues.put("arrow_schema", toByteArray(transform.getArrowSchema()));
+      }
+      if (transform.getArrowParseFn() != null) {
+        fieldValues.put("arrow_parse_fn", toByteArray(transform.getArrowParseFn()));
       }
       if (transform.getQueryPriority() != null) {
         fieldValues.put("query_priority", toByteArray(transform.getQueryPriority()));
@@ -258,6 +270,61 @@ public TypedRead<?> fromConfigRow(Row configRow, PipelineOptions options) {
             builder.setBigQueryServices(new BigQueryServicesImpl());
           }
         }
+        byte[] formatBytes = configRow.getBytes("format");
+        DataFormat format = null;
+        if (formatBytes != null) {
+          format = (DataFormat) fromByteArray(formatBytes);
+          builder = builder.setFormat(format);
+        }
+        byte[] avroSchemaBytes = configRow.getBytes("avro_schema");
+        if (avroSchemaBytes != null) {
+          builder = builder.setAvroSchema((org.apache.avro.Schema) fromByteArray(avroSchemaBytes));
+        }
+        byte[] parseFnBytes = configRow.getBytes("parse_fn");
+        if (parseFnBytes != null) {
+          builder = builder.setParseFn((SerializableFunction) fromByteArray(parseFnBytes));
+        }
+        byte[] datumReaderFactoryBytes = configRow.getBytes("datum_reader_factory");
+        if (datumReaderFactoryBytes != null) {
+          if (TransformUpgrader.compareVersions(updateCompatibilityBeamVersion, "2.62.0") < 0) {
+            // on old version, readWithDatumReader sets a SerializableFunction with unused parameter
+            // when parseFnBytes was set, just read as GenericRecord
+            if (parseFnBytes == null) {
+              SerializableFunction<TableSchema, AvroSource.DatumReaderFactory<?>>
                  datumReaderFactoryFn =
+                      (SerializableFunction<TableSchema, AvroSource.DatumReaderFactory<?>>)
+                          fromByteArray(datumReaderFactoryBytes);
+              builder = builder.setDatumReaderFactory(datumReaderFactoryFn.apply(null));
+            } else {
+              builder = builder.setDatumReaderFactory(AvroDatumFactory.generic());
+            }
+          } else {
+            builder =
+                builder.setDatumReaderFactory(
+                    (AvroSource.DatumReaderFactory<?>) fromByteArray(datumReaderFactoryBytes));
+          }
+        }
+        byte[] arrowSchemaBytes = configRow.getBytes("arrow_schema");
+        if (arrowSchemaBytes != null) {
+          builder = builder.setArrowSchema((Schema) fromByteArray(arrowSchemaBytes));
+        }
+        byte[] arrowParseFnBytes = configRow.getBytes("arrow_parse_fn");
+        if (arrowParseFnBytes != null) {
+          builder =
+              builder.setArrowParseFn((SerializableFunction) fromByteArray(arrowParseFnBytes));
+        } else if (format == DataFormat.ARROW
+            && TransformUpgrader.compareVersions(updateCompatibilityBeamVersion, "2.62.0") < 0) {
+          if (parseFnBytes != null) {
+            // on old version, arrow was read from avro record
+            SerializableFunction<SchemaAndRecord, Object> avroFn =
+                (SerializableFunction<SchemaAndRecord, Object>) fromByteArray(parseFnBytes);
+            SerializableFunction<SchemaAndRow, Object> arrowFn =
+                input ->
+                    avroFn.apply(
+                        new SchemaAndRecord(
+                            AvroUtils.toGenericRecord(input.getElement()), input.getTableSchema()));
+            builder = builder.setArrowParseFn(arrowFn);
+          }
+        }
         byte[] queryPriorityBytes = configRow.getBytes("query_priority");
         if (queryPriorityBytes != null) {
           builder = builder.setQueryPriority((QueryPriority) fromByteArray(queryPriorityBytes));
@@ -284,10 +351,6 @@ public TypedRead<?> fromConfigRow(Row configRow, PipelineOptions options) {
         if (methodBytes != null) {
           builder = builder.setMethod((TypedRead.Method) fromByteArray(methodBytes));
         }
-        byte[] formatBytes = configRow.getBytes("format");
-        if (formatBytes != null) {
-          builder = builder.setFormat((DataFormat) fromByteArray(formatBytes));
-        }
         Collection<String> selectedFields = configRow.getArray("selected_fields");
         if (selectedFields != null && !selectedFields.isEmpty()) {
           builder.setSelectedFields(StaticValueProvider.of(ImmutableList.of(selectedFields)));
@@ -346,54 +409,6 @@ public TypedRead<?> fromConfigRow(Row configRow, PipelineOptions options) {
                   (ErrorHandler<BadRecord, ?>) fromByteArray(badRecordErrorHandler));
         }
 
-        if (TransformUpgrader.compareVersions(updateCompatibilityBeamVersion, "2.61.0") < 0) {
-          // best effort migration
-          DataFormat dataFormat = null;
-          if (formatBytes != null) {
-            dataFormat = (DataFormat) fromByteArray(formatBytes);
-          }
-
-          byte[] parseFnBytes = configRow.getBytes("parse_fn");
-          if (parseFnBytes == null) {
-            // parseFn is null only when creating IO with readWithDatumReader
-            throw new RuntimeException(
-                "Upgrading BigqueryIO readWithDatumReader transforms is not supported.");
-          } else {
-            SerializableFunction<SchemaAndRecord, Object> parseFn =
-                (SerializableFunction<SchemaAndRecord, Object>) fromByteArray(parseFnBytes);
-            BigQueryReaderFactory<Object> readerFactory;
-            if (DataFormat.ARROW.equals(dataFormat)) {
-              SerializableBiFunction<TableSchema, Row, Object> fromArrow =
-                  (s, r) -> parseFn.apply(new SchemaAndRecord(AvroUtils.toGenericRecord(r), s));
-              readerFactory = BigQueryReaderFactory.arrow(null, fromArrow);
-            } else {
-              // default to avro
-              SerializableBiFunction<TableSchema, GenericRecord, Object> fromAvro =
-                  (s, r) -> parseFn.apply(new SchemaAndRecord(r, s));
-              boolean extractWithLogicalTypes = useAvroLogicalTypes != null && useAvroLogicalTypes;
-              readerFactory =
-                  BigQueryReaderFactory.avro(
-                      null, extractWithLogicalTypes, AvroDatumFactory.generic(), fromAvro);
-            }
-            builder.setBigQueryReaderFactory(readerFactory);
-
-            if (configRow.getBytes("type_descriptor") == null) {
-              TypeDescriptor<?> typeDescriptor = TypeDescriptors.outputOf(parseFn);
-              if (!typeDescriptor.hasUnresolvedParameters()) {
-                builder.setTypeDescriptor(typeDescriptor);
-              }
-            }
-          }
-        } else {
-          // This property was added for Beam 2.60.0 hence not available when
-          // upgrading the transform from previous Beam versions.
-          byte[] readerFactoryBytes = configRow.getBytes("bigquery_reader_factory");
-          if (readerFactoryBytes != null) {
-            builder.setBigQueryReaderFactory(
-                (BigQueryReaderFactory<?>) fromByteArray(readerFactoryBytes));
-          }
-        }
-
         return builder.build();
       } catch (InvalidClassException e) {
         throw new RuntimeException(e);
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryReaderFactory.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryReaderFactory.java
index c57f0952b748..b9c4296b13ac 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryReaderFactory.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryReaderFactory.java
@@ -29,7 +29,6 @@
 import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.io.fs.MatchResult;
 import org.apache.beam.sdk.schemas.Schema;
-import org.apache.beam.sdk.transforms.SerializableBiFunction;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.util.SerializableSupplier;
 import org.apache.beam.sdk.values.Row;
@@ -47,13 +46,13 @@ static <AvroT, T> BigQueryReaderFactory<T> avro(
       org.apache.avro.@Nullable Schema schema,
       boolean extractWithLogicalTypes,
       AvroSource.DatumReaderFactory<AvroT> readerFactory,
-      SerializableBiFunction<TableSchema, AvroT, T> fromAvro) {
+      SerializableFunction<SchemaAndElement<AvroT>, T> fromAvro) {
     return new BigQueryAvroReaderFactory<>(
         schema, extractWithLogicalTypes, readerFactory, fromAvro);
   }
 
   static <T> BigQueryReaderFactory<T> arrow(
-      @Nullable Schema schema, SerializableBiFunction<TableSchema, Row, T> fromArrow) {
+      @Nullable Schema schema, SerializableFunction<SchemaAndRow, T> fromArrow) {
     return new BigQueryArrowReaderFactory<>(schema, fromArrow);
   }
 
@@ -84,13 +83,13 @@ static class BigQueryAvroReaderFactory<AvroT, T> extends BigQueryReaderFactory<T> {
     private final AvroSource.DatumReaderFactory<AvroT> readerFactory;
-    private final SerializableBiFunction<TableSchema, AvroT, T> fromAvro;
+    private final SerializableFunction<SchemaAndElement<AvroT>, T> fromAvro;
 
     BigQueryAvroReaderFactory(
         org.apache.avro.@Nullable Schema schema,
         boolean extractWithLogicalTypes,
         AvroSource.DatumReaderFactory<AvroT> readerFactory,
-        SerializableBiFunction<TableSchema, AvroT, T> fromAvro) {
+        SerializableFunction<SchemaAndElement<AvroT>, T> fromAvro) {
       this.schemaSupplier =
           schema == null ? null : new SerializableSchemaSupplier(schema);
       this.extractWithLogicalTypes = extractWithLogicalTypes;
@@ -119,7 +118,7 @@ private AvroSource<T> getSource(
         readerSchema = BigQueryUtils.toGenericAvroSchema(tableSchema, extractWithLogicalTypes);
       }
       SerializableFunction<AvroT, T> parseFn =
-          (r) -> fromAvro.apply(tableSchema, (AvroT) r);
+          (r) -> fromAvro.apply(new SchemaAndElement<>((AvroT) r, tableSchema));
       return source
           .withSchema(readerSchema)
           .withDatumReaderFactory(readerFactory)
@@ -138,7 +137,8 @@ public BigQueryStorageAvroReader<AvroT, T> getReader(
         // BQ storage always uses logical-types
         readerSchema = BigQueryUtils.toGenericAvroSchema(tableSchema, true);
       }
-      SerializableFunction<AvroT, T> fromAvroRecord = (r) -> fromAvro.apply(tableSchema, r);
+      SerializableFunction<AvroT, T> fromAvroRecord =
+          (r) -> fromAvro.apply(new SchemaAndElement<>(r, tableSchema));
       return new BigQueryStorageAvroReader<>(
           writerSchema, readerSchema, readerFactory, fromAvroRecord);
     }
@@ -149,10 +149,10 @@ public BigQueryStorageAvroReader<AvroT, T> getReader(
   /////////////////////////////////////////////////////////////////////////////
 
   static class BigQueryArrowReaderFactory<T> extends BigQueryReaderFactory<T> {
     private final SerializableFunction<TableSchema, Schema> schemaFactory;
-    private final SerializableBiFunction<TableSchema, Row, T> parseFn;
+    private final SerializableFunction<SchemaAndRow, T> parseFn;
 
     BigQueryArrowReaderFactory(
-        @Nullable Schema schema, SerializableBiFunction<TableSchema, Row, T> parseFn) {
+        @Nullable Schema schema, SerializableFunction<SchemaAndRow, T> parseFn) {
       this.parseFn = parseFn;
       if (schema == null) {
         this.schemaFactory = BigQueryUtils::fromTableSchema;
@@ -179,7 +179,8 @@ public BigQueryStorageArrowReader<T> getReader(TableSchema tableSchema, ReadSession
       org.apache.arrow.vector.types.pojo.Schema writerSchema =
           ArrowConversion.arrowSchemaFromInput(input);
       Schema readerSchema = schemaFactory.apply(tableSchema);
-      SerializableFunction<Row, T> fromRow = (r) -> parseFn.apply(tableSchema, r);
+      SerializableFunction<Row, T> fromRow =
+          (r) -> parseFn.apply(new SchemaAndRow(r, tableSchema));
       return new BigQueryStorageArrowReader<>(writerSchema, readerSchema, fromRow);
     }
   }
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/SchemaAndElement.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/SchemaAndElement.java
new file mode 100644
index 000000000000..ebae513ee8db
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/SchemaAndElement.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.api.services.bigquery.model.TableSchema;
+
+/**
+ * A wrapper for a record and the {@link TableSchema} representing the schema of the table (or
+ * query) it was generated from.
+ */
+public class SchemaAndElement<T> {
+  private final T element;
+  private final TableSchema tableSchema;
+
+  public SchemaAndElement(T record, TableSchema tableSchema) {
+    this.element = record;
+    this.tableSchema = tableSchema;
+  }
+
+  public T getElement() {
+    return element;
+  }
+
+  // getRecord is defined here so method is present when cast to SchemaAndRecord
+  public T getRecord() {
+    return element;
+  }
+
+  // getRow is defined here so method is present when cast to SchemaAndRow
+  protected T getRow() {
+    return element;
+  }
+
+  public TableSchema getTableSchema() {
+    return tableSchema;
+  }
+}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/SchemaAndRecord.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/SchemaAndRecord.java
index e6811efd3d82..2716b5e34e1e 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/SchemaAndRecord.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/SchemaAndRecord.java
@@ -24,20 +24,8 @@
  * A wrapper for a {@link GenericRecord} and the {@link TableSchema} representing the schema of the
  * table (or query) it was generated from.
  */
-public class SchemaAndRecord {
-  private final GenericRecord record;
-  private final TableSchema tableSchema;
-
+public class SchemaAndRecord extends SchemaAndElement<GenericRecord> {
   public SchemaAndRecord(GenericRecord record, TableSchema tableSchema) {
-    this.record = record;
-    this.tableSchema = tableSchema;
-  }
-
-  public GenericRecord getRecord() {
-    return record;
-  }
-
-  public TableSchema getTableSchema() {
-    return tableSchema;
+    super(record, tableSchema);
   }
 }
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/SchemaAndRow.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/SchemaAndRow.java
new file mode 100644
index 000000000000..a79952d708d5
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/SchemaAndRow.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.api.services.bigquery.model.TableSchema;
+import org.apache.beam.sdk.values.Row;
+
+/**
+ * A wrapper for an Arrow {@link Row} and the {@link TableSchema} representing the schema of the
+ * table (or query) it was generated from.
+ */
+public class SchemaAndRow extends SchemaAndElement<Row> {
+  public SchemaAndRow(Row row, TableSchema tableSchema) {
+    super(row, tableSchema);
+  }
+}
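Reviewer note: a hedged sketch of how the new wrapper hierarchy composes (field name "name" is a hypothetical placeholder). Both SchemaAndRecord and SchemaAndRow are now thin subclasses of SchemaAndElement, so a parse function can use getElement() and getTableSchema() uniformly regardless of the underlying format:

    SerializableFunction<SchemaAndRecord, String> parseFn =
        input -> {
          GenericRecord record = input.getElement(); // identical to input.getRecord()
          TableSchema tableSchema = input.getTableSchema();
          // Combine the record value with schema metadata, just to show both accessors.
          return record.get("name") + "/" + tableSchema.getFields().size();
        };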
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOReadTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOReadTest.java
index 8ab5af6c91d2..fa0a709f2fbc 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOReadTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOReadTest.java
@@ -150,7 +150,7 @@ public void evaluate() throws Throwable {
           null,
           false,
           AvroDatumFactory.generic(),
-          (s, r) -> BigQueryAvroUtils.convertGenericRecordToTableRow(r));
+          (input) -> BigQueryAvroUtils.convertGenericRecordToTableRow(input.getElement()));
 
   private static class MyData implements Serializable {
     private String name;
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageQueryTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageQueryTest.java
index bdccd35cef1c..68a17190f125 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageQueryTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageQueryTest.java
@@ -105,7 +105,7 @@ public class BigQueryIOStorageQueryTest {
           null,
           false,
           AvroDatumFactory.generic(),
-          (s, r) -> BigQueryAvroUtils.convertGenericRecordToTableRow(r));
+          input -> BigQueryAvroUtils.convertGenericRecordToTableRow(input.getRecord()));
 
   private transient BigQueryOptions options;
   private transient TemporaryFolder testFolder = new TemporaryFolder();
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadTest.java
index cfabd58768df..435ef9e0742c 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadTest.java
@@ -177,10 +177,10 @@ public class BigQueryIOStorageReadTest {
           null,
           false,
           AvroDatumFactory.generic(),
-          (s, r) -> BigQueryAvroUtils.convertGenericRecordToTableRow(r));
+          input -> BigQueryAvroUtils.convertGenericRecordToTableRow(input.getRecord()));
 
   private static final BigQueryStorageReaderFactory<TableRow> TABLE_ROW_ARROW_READER_FACTORY =
-      BigQueryReaderFactory.arrow(null, (s, r) -> BigQueryUtils.toTableRow(r));
+      BigQueryReaderFactory.arrow(null, input -> BigQueryUtils.toTableRow(input.getRow()));
 
   private transient PipelineOptions options;
   private final transient TemporaryFolder testFolder = new TemporaryFolder();
@@ -2275,7 +2275,7 @@ public void testActuateProjectionPushdown() {
             BigQueryIO.read(
                     record ->
                         BigQueryUtils.toBeamRow(
-                            record.getRecord(), schema, ConversionOptions.builder().build()))
+                            record.getElement(), schema, ConversionOptions.builder().build()))
                 .withMethod(Method.DIRECT_READ)
                 .withCoder(SchemaCoder.of(schema));
@@ -2301,7 +2301,7 @@ public void testReadFromQueryDoesNotSupportProjectionPushdown() {
             BigQueryIO.read(
                     record ->
                         BigQueryUtils.toBeamRow(
-                            record.getRecord(), schema, ConversionOptions.builder().build()))
+                            record.getElement(), schema, ConversionOptions.builder().build()))
                 .fromQuery("SELECT bar FROM `dataset.table`")
                 .withMethod(Method.DIRECT_READ)
                 .withCoder(SchemaCoder.of(schema));
@@ -2339,7 +2339,8 @@ public void testReadFromBigQueryAvroObjectsMutation() throws Exception {
         .thenReturn(new FakeBigQueryServerStream<>(responses));
 
     BigQueryStorageReaderFactory<GenericRecord> readerFactory =
-        BigQueryReaderFactory.avro(null, false, AvroDatumFactory.generic(), (s, r) -> r);
+        BigQueryReaderFactory.avro(
+            null, false, AvroDatumFactory.generic(), SchemaAndElement::getRecord);
     BigQueryStorageStreamSource<GenericRecord> streamSource =
         BigQueryStorageStreamSource.create(
             readSession,
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslationTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslationTest.java
index 6699b115bcc1..857d90cf2dfd 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslationTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslationTest.java
@@ -24,11 +24,15 @@
 
 import com.google.api.services.bigquery.model.Clustering;
 import com.google.api.services.bigquery.model.TableRow;
+import com.google.api.services.bigquery.model.TableSchema;
+import com.google.cloud.bigquery.storage.v1.DataFormat;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
+import org.apache.beam.sdk.extensions.avro.io.AvroDatumFactory;
+import org.apache.beam.sdk.extensions.avro.io.AvroSource;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
@@ -36,6 +40,7 @@
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.options.StreamingOptions;
 import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.util.construction.TransformUpgrader;
 import org.apache.beam.sdk.values.Row;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
 import org.junit.Test;
@@ -58,7 +63,11 @@ public class BigQueryIOTranslationTest {
     READ_TRANSFORM_SCHEMA_MAPPING.put(
         "getWithTemplateCompatibility", "with_template_compatibility");
     READ_TRANSFORM_SCHEMA_MAPPING.put("getBigQueryServices", "bigquery_services");
-    READ_TRANSFORM_SCHEMA_MAPPING.put("getBigQueryReaderFactory", "bigquery_reader_factory");
+    READ_TRANSFORM_SCHEMA_MAPPING.put("getAvroSchema", "avro_schema");
+    READ_TRANSFORM_SCHEMA_MAPPING.put("getParseFn", "parse_fn");
+    READ_TRANSFORM_SCHEMA_MAPPING.put("getDatumReaderFactory", "datum_reader_factory");
+    READ_TRANSFORM_SCHEMA_MAPPING.put("getArrowSchema", "arrow_schema");
+    READ_TRANSFORM_SCHEMA_MAPPING.put("getArrowParseFn", "arrow_parse_fn");
     READ_TRANSFORM_SCHEMA_MAPPING.put("getQueryPriority", "query_priority");
     READ_TRANSFORM_SCHEMA_MAPPING.put("getQueryLocation", "query_location");
     READ_TRANSFORM_SCHEMA_MAPPING.put("getQueryTempDataset", "query_temp_dataset");
@@ -141,9 +150,82 @@ public class BigQueryIOTranslationTest {
     WRITE_TRANSFORM_SCHEMA_MAPPING.put("getBadRecordErrorHandler", "bad_record_error_handler");
   }
 
+  static class DummyParseFn implements SerializableFunction<SchemaAndRecord, Object> {
+    @Override
+    public Object apply(SchemaAndRecord input) {
+      return null;
+    }
+  }
+
+  @Test
+  public void testReCreateReadTransformFromDeprecatedArrow() {
+    BigQueryIO.TypedRead<Object> readTransform =
+        BigQueryIO.read(new DummyParseFn())
+            .withFormat(DataFormat.ARROW)
+            .from("dummyproject:dummydataset.dummytable")
+            .withMethod(TypedRead.Method.DIRECT_READ);
+
+    BigQueryIOTranslation.BigQueryIOReadTranslator translator =
+        new BigQueryIOTranslation.BigQueryIOReadTranslator();
+
+    // old versions do not set arrow_parse_fn
+    Row row =
+        Row.fromRow(translator.toConfigRow(readTransform))
+            .withFieldValue("arrow_parse_fn", null)
+            .build();
+
+    PipelineOptions options = PipelineOptionsFactory.create();
+    options.as(StreamingOptions.class).setUpdateCompatibilityVersion("2.60.0");
+    BigQueryIO.TypedRead<?> readTransformFromRow =
+        (BigQueryIO.TypedRead<?>) translator.fromConfigRow(row, options);
+    assertNotNull(readTransformFromRow.getTable());
+    assertEquals("dummyproject", readTransformFromRow.getTable().getProjectId());
+    assertEquals("dummydataset", readTransformFromRow.getTable().getDatasetId());
+    assertEquals("dummytable", readTransformFromRow.getTable().getTableId());
+    assertNotNull(readTransformFromRow.getArrowParseFn());
+    assertEquals(TypedRead.Method.DIRECT_READ, readTransformFromRow.getMethod());
+  }
+
+  public static class DummyClass {
+
+    public String name;
+
+    @org.apache.avro.reflect.Nullable public Integer age;
+  }
+
+  @Test
+  public void testReCreateReadTransformFromDatumReader() {
+    AvroSource.DatumReaderFactory<DummyClass> readerFactory =
+        AvroDatumFactory.reflect(DummyClass.class);
+    BigQueryIO.TypedRead<DummyClass> readTransform =
+        BigQueryIO.readWithDatumReader(readerFactory).from("dummyproject:dummydataset.dummytable");
+
+    BigQueryIOTranslation.BigQueryIOReadTranslator translator =
+        new BigQueryIOTranslation.BigQueryIOReadTranslator();
+
+    // old versions set a SerializableFunction with unused input and do not set parseFn
+    SerializableFunction<TableSchema, AvroSource.DatumReaderFactory<?>> oldDatumFactory =
+        (schema) -> readerFactory;
+    Row row =
+        Row.fromRow(translator.toConfigRow(readTransform))
+            .withFieldValue("datum_reader_factory", TransformUpgrader.toByteArray(oldDatumFactory))
+            .withFieldValue("parse_fn", null)
+            .build();
+
+    PipelineOptions options = PipelineOptionsFactory.create();
+    options.as(StreamingOptions.class).setUpdateCompatibilityVersion("2.60.0");
+    BigQueryIO.TypedRead<?> readTransformFromRow =
+        (BigQueryIO.TypedRead<?>) translator.fromConfigRow(row, options);
+    assertNotNull(readTransformFromRow.getTable());
+    assertEquals("dummyproject", readTransformFromRow.getTable().getProjectId());
+    assertEquals("dummydataset", readTransformFromRow.getTable().getDatasetId());
+    assertEquals("dummytable", readTransformFromRow.getTable().getTableId());
+    assertTrue(
+        readTransformFromRow.getDatumReaderFactory() instanceof AvroSource.DatumReaderFactory);
+  }
+
   @Test
   public void testReCreateReadTransformFromRowTable() {
-    // setting a subset of fields here.
     BigQueryIO.TypedRead<TableRow> readTransform =
         BigQueryIO.readTableRows()
             .from("dummyproject:dummydataset.dummytable")
@@ -155,10 +237,8 @@ public void testReCreateReadTransformFromRowTable() {
         new BigQueryIOTranslation.BigQueryIOReadTranslator();
     Row row = translator.toConfigRow(readTransform);
 
-    PipelineOptions options = PipelineOptionsFactory.create();
-    options.as(StreamingOptions.class).setUpdateCompatibilityVersion("2.61.0");
-    BigQueryIO.TypedRead readTransformFromRow =
-        (BigQueryIO.TypedRead) translator.fromConfigRow(row, options);
+    BigQueryIO.TypedRead<?> readTransformFromRow =
+        translator.fromConfigRow(row, PipelineOptionsFactory.create());
     assertNotNull(readTransformFromRow.getTable());
     assertEquals("dummyproject", readTransformFromRow.getTable().getProjectId());
     assertEquals("dummydataset", readTransformFromRow.getTable().getDatasetId());
@@ -168,16 +248,8 @@ public void testReCreateReadTransformFromRowTable() {
     assertTrue(readTransformFromRow.getWithTemplateCompatibility());
   }
 
-  static class DummyParseFn implements SerializableFunction<SchemaAndRecord, Object> {
-    @Override
-    public Object apply(SchemaAndRecord input) {
-      return null;
-    }
-  }
-
   @Test
   public void testReCreateReadTransformFromRowQuery() {
-    // setting a subset of fields here.
     BigQueryIO.TypedRead<Object> readTransform =
         BigQueryIO.read(new DummyParseFn())
             .fromQuery("dummyquery")
@@ -188,11 +260,11 @@ public void testReCreateReadTransformFromRowQuery() {
         new BigQueryIOTranslation.BigQueryIOReadTranslator();
     Row row = translator.toConfigRow(readTransform);
 
-    PipelineOptions options = PipelineOptionsFactory.create();
-    options.as(StreamingOptions.class).setUpdateCompatibilityVersion("2.61.0");
-    BigQueryIO.TypedRead readTransformFromRow = translator.fromConfigRow(row, options);
+    BigQueryIO.TypedRead<?> readTransformFromRow =
+        translator.fromConfigRow(row, PipelineOptionsFactory.create());
     assertEquals("dummyquery", readTransformFromRow.getQuery().get());
-    assertNotNull(readTransformFromRow.getBigQueryReaderFactory());
+    assertNotNull(readTransformFromRow.getParseFn());
+    assertTrue(readTransformFromRow.getParseFn() instanceof DummyParseFn);
     assertTrue(readTransformFromRow.getUseAvroLogicalTypes());
     assertFalse(readTransformFromRow.getUseLegacySql());
   }
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageReaderTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageReaderTest.java
index 746fbe4ce9ed..2cef22d7d64b 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageReaderTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageReaderTest.java
@@ -95,7 +95,8 @@ public class BigQueryStorageReaderTest {
 
   @Test
   public void bigQueryStorageReaderFactory_arrowReader() throws Exception {
-    BigQueryReaderFactory<Row> factory = BigQueryReaderFactory.arrow(BEAM_SCHEMA, (s, r) -> r);
+    BigQueryReaderFactory<Row> factory =
+        BigQueryReaderFactory.arrow(BEAM_SCHEMA, SchemaAndRow::getRow);
     BigQueryStorageReader<Row> reader = factory.getReader(TABLE_SCHEMA, ARROW_READ_SESSION);
 
     assertThat(reader, instanceOf(BigQueryStorageArrowReader.class));
@@ -106,7 +107,7 @@ public void bigQueryStorageReaderFactory_avroReader() throws Exception {
     AvroDatumFactory<GenericRecord> datumFactory = AvroDatumFactory.generic();
     BigQueryReaderFactory<GenericRecord> factory =
-        BigQueryReaderFactory.avro(AVRO_SCHEMA, false, datumFactory, SchemaAndElement::getRecord);
     BigQueryStorageReader<GenericRecord> reader = factory.getReader(TABLE_SCHEMA, AVRO_READ_SESSION);
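Reviewer note: the diff is truncated above. For completeness, a hedged usage sketch of the Arrow entry point this PR promotes (schema, table spec, and pipeline `p` are hypothetical placeholders). readArrow(Schema) pins the Beam schema up front and sets a RowCoder, while the deprecated withFormat(DataFormat.ARROW) route now wraps a legacy Avro parseFn by converting each Arrow-read Row to a GenericRecord, as the expand() switch earlier in this diff shows:

    Schema rowSchema = Schema.builder().addStringField("name").build();
    PCollection<Row> rows =
        p.apply(
            BigQueryIO.readArrow(rowSchema)
                .from("project:dataset.table")); // DIRECT_READ is preset for Arrow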