From e1f6d5a1ea7a8555e1c499a8118c33c7912e07d7 Mon Sep 17 00:00:00 2001 From: Michel Davit Date: Fri, 20 Sep 2024 11:52:45 +0200 Subject: [PATCH] Include minimal changeset --- .../io/gcp/bigquery/BigQueryAvroUtils.java | 41 +- .../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 30 +- .../gcp/bigquery/BigQueryQuerySourceDef.java | 12 +- .../io/gcp/bigquery/BigQuerySourceDef.java | 9 - .../bigquery/BigQueryStorageSourceBase.java | 28 +- .../gcp/bigquery/BigQueryTableSourceDef.java | 12 +- .../sdk/io/gcp/bigquery/BigQueryUtils.java | 60 +-- .../gcp/bigquery/BigQueryAvroUtilsTest.java | 393 +++++++----------- .../bigquery/BigQueryIOStorageQueryTest.java | 19 +- .../bigquery/BigQueryIOStorageReadTest.java | 80 ++-- 10 files changed, 328 insertions(+), 356 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java index b24fe13262c2..06a8e4e100ef 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java @@ -35,6 +35,8 @@ import java.util.HashSet; import java.util.List; import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; import org.apache.avro.Conversions; import org.apache.avro.LogicalType; import org.apache.avro.LogicalTypes; @@ -58,6 +60,7 @@ */ class BigQueryAvroUtils { + // org.apache.avro.LogicalType static class DateTimeLogicalType extends LogicalType { public DateTimeLogicalType() { super("datetime"); @@ -112,6 +115,7 @@ static Schema getPrimitiveType(TableFieldSchema schema, Boolean useAvroLogicalTy } else if (bqType.equals("NUMERIC")) { logicalType = LogicalTypes.decimal(38, 9); } else { + // BIGNUMERIC logicalType = LogicalTypes.decimal(77, 38); } return logicalType.addToSchema(SchemaBuilder.builder().bytesType()); @@ -230,6 +234,41 @@ private static String formatTime(long timeMicros) { return LocalTime.ofNanoOfDay(timeMicros * 1000).format(formatter); } + static TableSchema trimBigQueryTableSchema(TableSchema inputSchema, Schema avroSchema) { + List subSchemas = + inputSchema.getFields().stream() + .flatMap(fieldSchema -> mapTableFieldSchema(fieldSchema, avroSchema)) + .collect(Collectors.toList()); + + return new TableSchema().setFields(subSchemas); + } + + private static Stream mapTableFieldSchema( + TableFieldSchema fieldSchema, Schema avroSchema) { + Field avroFieldSchema = avroSchema.getField(fieldSchema.getName()); + if (avroFieldSchema == null) { + return Stream.empty(); + } else if (avroFieldSchema.schema().getType() != Type.RECORD) { + return Stream.of(fieldSchema); + } + + List subSchemas = + fieldSchema.getFields().stream() + .flatMap(subSchema -> mapTableFieldSchema(subSchema, avroFieldSchema.schema())) + .collect(Collectors.toList()); + + TableFieldSchema output = + new TableFieldSchema() + .setCategories(fieldSchema.getCategories()) + .setDescription(fieldSchema.getDescription()) + .setFields(subSchemas) + .setMode(fieldSchema.getMode()) + .setName(fieldSchema.getName()) + .setType(fieldSchema.getType()); + + return Stream.of(output); + } + /** * Utility function to convert from an Avro {@link GenericRecord} to a BigQuery {@link TableRow}. * @@ -303,8 +342,6 @@ private static Object convertRequiredField(String name, Schema schema, Object v) // INTEGER type maps to an Avro LONG type. 
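Note: a minimal sketch of how the recursive trimming added above (trimBigQueryTableSchema / mapTableFieldSchema) behaves, assuming same-package access since the method is package-private; the class, field, and schema names here are illustrative and not part of the patch.

package org.apache.beam.sdk.io.gcp.bigquery;

import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableSchema;
import java.util.Arrays;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;

public class TrimSchemaSketch {
  public static void main(String[] args) {
    // Full BigQuery schema: one scalar plus a nested RECORD with two leaves.
    TableSchema full =
        new TableSchema()
            .setFields(
                Arrays.asList(
                    new TableFieldSchema().setName("id").setType("INTEGER"),
                    new TableFieldSchema()
                        .setName("nested")
                        .setType("RECORD")
                        .setFields(
                            Arrays.asList(
                                new TableFieldSchema().setName("kept").setType("STRING"),
                                new TableFieldSchema().setName("dropped").setType("STRING")))));

    // Avro read-session schema that only carries "id" and "nested.kept".
    Schema avro =
        SchemaBuilder.record("root")
            .fields()
            .requiredLong("id")
            .name("nested")
            .type(SchemaBuilder.record("nested").fields().optionalString("kept").endRecord())
            .noDefault()
            .endRecord();

    // "dropped" has no counterpart in the Avro schema, so it is pruned
    // recursively, while "id" and "nested.kept" survive.
    TableSchema trimmed = BigQueryAvroUtils.trimBigQueryTableSchema(full, avro);
    System.out.println(trimmed);
  }
}

(If the Avro field for "nested" were a nullable union rather than a plain RECORD, the method as written returns the BigQuery field unmodified, since only Type.RECORD triggers the recursion.)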
checkNotNull(v, "REQUIRED field %s should not be null", name); - // For historical reasons, don't validate avroLogicalType except for with NUMERIC. - // BigQuery represents NUMERIC in Avro format as BYTES with a DECIMAL logical type. Type type = schema.getType(); LogicalType logicalType = schema.getLogicalType(); switch (type) { diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java index 7bd2eb7da62a..19ff1576bb24 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java @@ -647,12 +647,12 @@ public static Read read() { * domain-specific type, due to the overhead of converting the rows to {@link TableRow}. */ public static TypedRead readTableRows() { - return read(TableRowParser.INSTANCE).withCoder(TableRowJsonCoder.of()); + return read(new TableRowParser()).withCoder(TableRowJsonCoder.of()); } /** Like {@link #readTableRows()} but with {@link Schema} support. */ public static TypedRead readTableRowsWithSchema() { - return read(TableRowParser.INSTANCE) + return read(new TableRowParser()) .withCoder(TableRowJsonCoder.of()) .withBeamRowConverters( TypeDescriptor.of(TableRow.class), @@ -1272,12 +1272,8 @@ public PCollection expand(PBegin input) { Schema beamSchema = null; if (getTypeDescriptor() != null && getToBeamRowFn() != null && getFromBeamRowFn() != null) { - TableSchema tableSchema = sourceDef.getTableSchema(bqOptions); - ValueProvider> selectedFields = getSelectedFields(); - if (selectedFields != null) { - tableSchema = BigQueryUtils.trimSchema(tableSchema, selectedFields.get()); - } - beamSchema = BigQueryUtils.fromTableSchema(tableSchema); + beamSchema = sourceDef.getBeamSchema(bqOptions); + beamSchema = getFinalSchema(beamSchema, getSelectedFields()); } final Coder coder = inferCoder(p.getCoderRegistry()); @@ -1442,6 +1438,24 @@ void cleanup(PassThroughThenCleanup.ContextContainer c) throws Exception { return rows; } + private static Schema getFinalSchema( + Schema beamSchema, ValueProvider> selectedFields) { + List flds = + beamSchema.getFields().stream() + .filter( + field -> { + if (selectedFields != null + && selectedFields.isAccessible() + && selectedFields.get() != null) { + return selectedFields.get().contains(field.getName()); + } else { + return true; + } + }) + .collect(Collectors.toList()); + return Schema.builder().addFields(flds).build(); + } + private PCollection expandForDirectRead( PBegin input, Coder outputCoder, Schema beamSchema, BigQueryOptions bqOptions) { ValueProvider tableProvider = getTableProvider(); diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySourceDef.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySourceDef.java index 9d0d82f72eb8..b4035a4e9ac3 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySourceDef.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySourceDef.java @@ -178,7 +178,7 @@ public BigQuerySourceBase toSource( /** {@inheritDoc} */ @Override - public TableSchema getTableSchema(BigQueryOptions bqOptions) { + public Schema getBeamSchema(BigQueryOptions bqOptions) { try { JobStatistics 
stats = BigQueryQueryHelper.dryRunQueryIfNeeded( @@ -189,20 +189,14 @@ public TableSchema getTableSchema(BigQueryOptions bqOptions) { flattenResults, useLegacySql, location); - return stats.getQuery().getSchema(); + TableSchema tableSchema = stats.getQuery().getSchema(); + return BigQueryUtils.fromTableSchema(tableSchema); } catch (IOException | InterruptedException | NullPointerException e) { throw new BigQuerySchemaRetrievalException( "Exception while trying to retrieve schema of query", e); } } - /** {@inheritDoc} */ - @Override - public Schema getBeamSchema(BigQueryOptions bqOptions) { - TableSchema tableSchema = getTableSchema(bqOptions); - return BigQueryUtils.fromTableSchema(tableSchema); - } - ValueProvider getQuery() { return query; } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceDef.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceDef.java index 579a602ab552..c9b1d5f73224 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceDef.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceDef.java @@ -45,15 +45,6 @@ BigQuerySourceBase toSource( SerializableFunction> readerFactory, boolean useAvroLogicalTypes); - /** - * Extract the {@link TableSchema} corresponding to this source. - * - * @param bqOptions BigQueryOptions - * @return table schema of the source - * @throws BigQuerySchemaRetrievalException if schema retrieval fails - */ - TableSchema getTableSchema(BigQueryOptions bqOptions); - /** * Extract the Beam {@link Schema} corresponding to this source. * diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageSourceBase.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageSourceBase.java index cef2e3aee3f4..51a5a8f391a6 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageSourceBase.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageSourceBase.java @@ -28,7 +28,10 @@ import com.google.cloud.bigquery.storage.v1.ReadStream; import java.io.IOException; import java.util.List; +import org.apache.avro.Schema; import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.extensions.arrow.ArrowConversion; +import org.apache.beam.sdk.extensions.avro.schemas.utils.AvroUtils; import org.apache.beam.sdk.io.BoundedSource; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.StorageClient; import org.apache.beam.sdk.metrics.Lineage; @@ -179,17 +182,30 @@ public List> split( LOG.info("Read session returned {} streams", readSession.getStreamsList().size()); } - // TODO: this is inconsistent with method above, where it can be null - Preconditions.checkStateNotNull(targetTable); - TableSchema tableSchema = targetTable.getSchema(); - if (selectedFieldsProvider != null) { - tableSchema = BigQueryUtils.trimSchema(tableSchema, selectedFieldsProvider.get()); + Schema sessionSchema; + if (readSession.getDataFormat() == DataFormat.ARROW) { + org.apache.arrow.vector.types.pojo.Schema schema = + ArrowConversion.arrowSchemaFromInput( + readSession.getArrowSchema().getSerializedSchema().newInput()); + org.apache.beam.sdk.schemas.Schema beamSchema = + ArrowConversion.ArrowSchemaTranslator.toBeamSchema(schema); + sessionSchema = 
AvroUtils.toAvroSchema(beamSchema); + } else if (readSession.getDataFormat() == DataFormat.AVRO) { + sessionSchema = new Schema.Parser().parse(readSession.getAvroSchema().getSchema()); + } else { + throw new IllegalArgumentException( + "data is not in a supported dataFormat: " + readSession.getDataFormat()); } + + Preconditions.checkStateNotNull( + targetTable); // TODO: this is inconsistent with method above, where it can be null + TableSchema trimmedSchema = + BigQueryAvroUtils.trimBigQueryTableSchema(targetTable.getSchema(), sessionSchema); List> sources = Lists.newArrayList(); for (ReadStream readStream : readSession.getStreamsList()) { sources.add( BigQueryStorageStreamSource.create( - readSession, readStream, tableSchema, parseFn, outputCoder, bqServices)); + readSession, readStream, trimmedSchema, parseFn, outputCoder, bqServices)); } return ImmutableList.copyOf(sources); diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSourceDef.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSourceDef.java index cc1532a522bc..b399900f9a24 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSourceDef.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSourceDef.java @@ -102,22 +102,16 @@ public BigQuerySourceBase toSource( /** {@inheritDoc} */ @Override - public TableSchema getTableSchema(BigQueryOptions bqOptions) { + public Schema getBeamSchema(BigQueryOptions bqOptions) { try { try (DatasetService datasetService = bqServices.getDatasetService(bqOptions)) { TableReference tableRef = getTableReference(bqOptions); Table table = datasetService.getTable(tableRef); - return Preconditions.checkStateNotNull(table).getSchema(); + TableSchema tableSchema = Preconditions.checkStateNotNull(table).getSchema(); + return BigQueryUtils.fromTableSchema(tableSchema); } } catch (Exception e) { throw new BigQuerySchemaRetrievalException("Exception while trying to retrieve schema", e); } } - - /** {@inheritDoc} */ - @Override - public Schema getBeamSchema(BigQueryOptions bqOptions) { - TableSchema tableSchema = getTableSchema(bqOptions); - return BigQueryUtils.fromTableSchema(tableSchema); - } } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java index d9d2b135f22b..606ce31b8bea 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java @@ -310,44 +310,45 @@ static StandardSQLTypeName toStandardSQLTypeName(FieldType fieldType) { * *
Supports both standard and legacy SQL types. * - * @param typeName Name of the type + * @param typeName Name of the type returned by {@link TableFieldSchema#getType()} * @param nestedFields Nested fields for the given type (eg. RECORD type) * @return Corresponding Beam {@link FieldType} */ private static FieldType fromTableFieldSchemaType( String typeName, List nestedFields, SchemaConversionOptions options) { + // see + // https://googleapis.dev/java/google-api-services-bigquery/latest/com/google/api/services/bigquery/model/TableFieldSchema.html#getType-- switch (typeName) { case "STRING": return FieldType.STRING; case "BYTES": return FieldType.BYTES; - case "INT64": - case "INT": - case "SMALLINT": case "INTEGER": - case "BIGINT": - case "TINYINT": - case "BYTEINT": + case "INT64": return FieldType.INT64; + case "FLOAT": case "FLOAT64": - case "FLOAT": // even if not a valid BQ type, it is used in the schema return FieldType.DOUBLE; - case "BOOL": case "BOOLEAN": + case "BOOL": return FieldType.BOOLEAN; - case "NUMERIC": - case "BIGNUMERIC": - return FieldType.DECIMAL; case "TIMESTAMP": return FieldType.DATETIME; - case "TIME": - return FieldType.logicalType(SqlTypes.TIME); case "DATE": return FieldType.logicalType(SqlTypes.DATE); + case "TIME": + return FieldType.logicalType(SqlTypes.TIME); case "DATETIME": return FieldType.logicalType(SqlTypes.DATETIME); - case "STRUCT": + case "NUMERIC": + case "BIGNUMERIC": + return FieldType.DECIMAL; + case "GEOGRAPHY": + case "JSON": + // TODO Add metadata for custom sql types ? + return FieldType.STRING; case "RECORD": + case "STRUCT": if (options.getInferMaps() && nestedFields.size() == 2) { TableFieldSchema key = nestedFields.get(0); TableFieldSchema value = nestedFields.get(1); @@ -358,13 +359,9 @@ private static FieldType fromTableFieldSchemaType( fromTableFieldSchemaType(value.getType(), value.getFields(), options)); } } - Schema rowSchema = fromTableFieldSchema(nestedFields, options); return FieldType.row(rowSchema); - case "GEOGRAPHY": - case "JSON": - // TODO Add metadata for custom sql types - return FieldType.STRING; + case "RANGE": // TODO add support for range type default: throw new UnsupportedOperationException( "Converting BigQuery type " + typeName + " to Beam type is unsupported"); @@ -463,8 +460,8 @@ public static org.apache.avro.Schema toGenericAvroSchema(TableSchema tableSchema /** Convert a list of BigQuery {@link TableSchema} to Avro {@link org.apache.avro.Schema}. */ public static org.apache.avro.Schema toGenericAvroSchema( - TableSchema tableSchema, Boolean stringLogicalTypes) { - return toGenericAvroSchema("root", tableSchema.getFields(), stringLogicalTypes); + TableSchema tableSchema, Boolean useAvroLogicalTypes) { + return toGenericAvroSchema("root", tableSchema.getFields(), useAvroLogicalTypes); } /** Convert a list of BigQuery {@link TableFieldSchema} to Avro {@link org.apache.avro.Schema}. */ @@ -475,8 +472,8 @@ public static org.apache.avro.Schema toGenericAvroSchema( /** Convert a list of BigQuery {@link TableFieldSchema} to Avro {@link org.apache.avro.Schema}. 
*/ public static org.apache.avro.Schema toGenericAvroSchema( - String schemaName, List fieldSchemas, Boolean stringLogicalTypes) { - return BigQueryAvroUtils.toGenericAvroSchema(schemaName, fieldSchemas, stringLogicalTypes); + String schemaName, List fieldSchemas, Boolean useAvroLogicalTypes) { + return BigQueryAvroUtils.toGenericAvroSchema(schemaName, fieldSchemas, useAvroLogicalTypes); } private static final BigQueryIO.TypedRead.ToBeamRowFunction @@ -1077,21 +1074,6 @@ private static Object convertAvroNumeric(Object value) { return tableSpec; } - static TableSchema trimSchema(TableSchema schema, @Nullable List selectedFields) { - if (selectedFields == null || selectedFields.isEmpty()) { - return schema; - } - - List fields = schema.getFields(); - List trimmedFields = new ArrayList<>(); - for (TableFieldSchema field : fields) { - if (selectedFields.contains(field.getName())) { - trimmedFields.add(field); - } - } - return new TableSchema().setFields(trimmedFields); - } - private static @Nullable ServiceCallMetric callMetricForMethod( @Nullable TableReference tableReference, String method) { if (tableReference != null) { diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtilsTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtilsTest.java index 0447b2d07b1a..662f2658eb6b 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtilsTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtilsTest.java @@ -17,7 +17,6 @@ */ package org.apache.beam.sdk.io.gcp.bigquery; -import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryAvroUtils.DATETIME_LOGICAL_TYPE; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; @@ -29,22 +28,25 @@ import java.math.BigDecimal; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; +import java.time.Instant; import java.util.ArrayList; import java.util.List; import org.apache.avro.Conversions; import org.apache.avro.LogicalType; import org.apache.avro.LogicalTypes; import org.apache.avro.Schema; -import org.apache.avro.SchemaBuilder; +import org.apache.avro.Schema.Field; +import org.apache.avro.Schema.Type; +import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericRecord; -import org.apache.avro.generic.GenericRecordBuilder; import org.apache.avro.reflect.AvroSchema; import org.apache.avro.reflect.Nullable; import org.apache.avro.reflect.ReflectData; import org.apache.avro.util.Utf8; +import org.apache.beam.sdk.extensions.avro.coders.AvroCoder; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.io.BaseEncoding; -import org.joda.time.Instant; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -112,11 +114,8 @@ public void testConvertGenericRecordToTableRow() throws Exception { { // Test nullable fields. 
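Note: a minimal sketch of what the useAvroLogicalTypes flag (renamed above from stringLogicalTypes) toggles, matching the paired schema tests further below; the field name is illustrative.

import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableSchema;
import java.util.Arrays;
import org.apache.avro.Schema;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils;

public class LogicalTypeSketch {
  public static void main(String[] args) {
    TableSchema tableSchema =
        new TableSchema()
            .setFields(
                Arrays.asList(
                    new TableFieldSchema().setName("d").setType("DATE").setMode("REQUIRED")));

    // With Avro logical types, DATE maps to an int annotated with the
    // "date" logical type.
    Schema withLogical = BigQueryUtils.toGenericAvroSchema(tableSchema, true);
    System.out.println(withLogical.getField("d").schema());

    // Without them, DATE stays a string tagged with the BigQuery type in
    // the "sqlType" property, as the assertions below verify.
    Schema withoutLogical = BigQueryUtils.toGenericAvroSchema(tableSchema, false);
    System.out.println(withoutLogical.getField("d").schema().getProp("sqlType"));
  }
}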
- GenericRecord record = - new GenericRecordBuilder(avroSchema) - .set("number", 5L) - .set("associates", new ArrayList()) - .build(); + GenericRecord record = new GenericData.Record(avroSchema); + record.put("number", 5L); TableRow convertedRow = BigQueryAvroUtils.convertGenericRecordToTableRow(record); TableRow row = new TableRow().set("number", "5").set("associates", new ArrayList()); assertEquals(row, convertedRow); @@ -126,54 +125,50 @@ public void testConvertGenericRecordToTableRow() throws Exception { { // Test type conversion for: // INTEGER, FLOAT, NUMERIC, TIMESTAMP, BOOLEAN, BYTES, DATE, DATETIME, TIME. + GenericRecord record = new GenericData.Record(avroSchema); byte[] soundBytes = "chirp,chirp".getBytes(StandardCharsets.UTF_8); - GenericRecord record = - new GenericRecordBuilder(avroSchema) - .set("number", 5L) - .set("quality", 5.0) - .set("birthday", 5L) - .set("birthdayMoney", numericBytes) - .set("lotteryWinnings", bigNumericBytes) - .set("flighted", Boolean.TRUE) - .set("sound", ByteBuffer.wrap(soundBytes)) - .set("anniversaryDate", new Utf8("2000-01-01")) - .set("anniversaryDatetime", "2000-01-01 00:00:00.000005") - .set("anniversaryTime", new Utf8("00:00:00.000005")) - .set("geoPositions", "LINESTRING(1 2, 3 4, 5 6, 7 8)") - .set("associates", new ArrayList()) - .build(); - + ByteBuffer soundByteBuffer = ByteBuffer.wrap(soundBytes); + soundByteBuffer.rewind(); + record.put("number", 5L); + record.put("quality", 5.0); + record.put("birthday", 5L); + record.put("birthdayMoney", numericBytes); + record.put("lotteryWinnings", bigNumericBytes); + record.put("flighted", Boolean.TRUE); + record.put("sound", soundByteBuffer); + record.put("anniversaryDate", new Utf8("2000-01-01")); + record.put("anniversaryDatetime", new String("2000-01-01 00:00:00.000005")); + record.put("anniversaryTime", new Utf8("00:00:00.000005")); + record.put("geoPositions", new String("LINESTRING(1 2, 3 4, 5 6, 7 8)")); TableRow convertedRow = BigQueryAvroUtils.convertGenericRecordToTableRow(record); TableRow row = new TableRow() - .set("anniversaryDate", "2000-01-01") - .set("anniversaryDatetime", "2000-01-01 00:00:00.000005") - .set("anniversaryTime", "00:00:00.000005") - .set("associates", new ArrayList()) + .set("number", "5") .set("birthday", "1970-01-01 00:00:00.000005 UTC") .set("birthdayMoney", numeric.toString()) - .set("flighted", Boolean.TRUE) - .set("geoPositions", "LINESTRING(1 2, 3 4, 5 6, 7 8)") .set("lotteryWinnings", bigNumeric.toString()) - .set("number", "5") .set("quality", 5.0) - .set("sound", BaseEncoding.base64().encode(soundBytes)); - assertEquals(row, convertedRow); + .set("associates", new ArrayList()) + .set("flighted", Boolean.TRUE) + .set("sound", BaseEncoding.base64().encode(soundBytes)) + .set("anniversaryDate", "2000-01-01") + .set("anniversaryDatetime", "2000-01-01 00:00:00.000005") + .set("anniversaryTime", "00:00:00.000005") + .set("geoPositions", "LINESTRING(1 2, 3 4, 5 6, 7 8)"); TableRow clonedRow = convertedRow.clone(); - assertEquals(clonedRow, convertedRow); + assertEquals(convertedRow, clonedRow); + assertEquals(row, convertedRow); } { // Test repeated fields. 
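Note: the numericBytes and bigNumericBytes fixtures referenced in these assertions are defined outside this hunk; presumably they are produced with Avro's standard decimal conversion, roughly as sketched here (the value is illustrative).

import java.math.BigDecimal;
import java.nio.ByteBuffer;
import org.apache.avro.Conversions;
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;

public class DecimalBytesSketch {
  public static void main(String[] args) {
    // NUMERIC maps to bytes + decimal(38, 9), as getPrimitiveType above encodes.
    Schema numericSchema =
        LogicalTypes.decimal(38, 9).addToSchema(SchemaBuilder.builder().bytesType());

    // The BigDecimal's scale must match the logical type's scale (9).
    BigDecimal numeric = new BigDecimal("123456789.123456789");
    ByteBuffer numericBytes =
        new Conversions.DecimalConversion()
            .toBytes(numeric, numericSchema, numericSchema.getLogicalType());

    System.out.println(numericBytes.remaining() + " bytes for NUMERIC(38,9)");
  }
}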
- Schema subBirdSchema = ReflectData.get().getSchema(Bird.SubBird.class); - GenericRecord nestedRecord = - new GenericRecordBuilder(subBirdSchema).set("species", "other").build(); - GenericRecord record = - new GenericRecordBuilder(avroSchema) - .set("number", 5L) - .set("associates", Lists.newArrayList(nestedRecord)) - .set("birthdayMoney", numericBytes) - .set("lotteryWinnings", bigNumericBytes) - .build(); + Schema subBirdSchema = AvroCoder.of(Bird.SubBird.class).getSchema(); + GenericRecord nestedRecord = new GenericData.Record(subBirdSchema); + nestedRecord.put("species", "other"); + GenericRecord record = new GenericData.Record(avroSchema); + record.put("number", 5L); + record.put("associates", Lists.newArrayList(nestedRecord)); + record.put("birthdayMoney", numericBytes); + record.put("lotteryWinnings", bigNumericBytes); TableRow convertedRow = BigQueryAvroUtils.convertGenericRecordToTableRow(record); TableRow row = new TableRow() @@ -188,269 +183,201 @@ public void testConvertGenericRecordToTableRow() throws Exception { } @Test - public void testConvertBigQuerySchemaToAvroSchemaDisabledLogicalTypes() { + public void testConvertBigQuerySchemaToAvroSchema() { TableSchema tableSchema = new TableSchema(); tableSchema.setFields(fields); - Schema avroSchema = BigQueryAvroUtils.toGenericAvroSchema(tableSchema, false); + Schema avroSchema = BigQueryAvroUtils.toGenericAvroSchema(tableSchema); - assertThat(avroSchema.getField("number").schema(), equalTo(SchemaBuilder.builder().longType())); + assertThat(avroSchema.getField("number").schema(), equalTo(Schema.create(Type.LONG))); assertThat( avroSchema.getField("species").schema(), - equalTo(SchemaBuilder.builder().unionOf().nullType().and().stringType().endUnion())); + equalTo(Schema.createUnion(Schema.create(Type.NULL), Schema.create(Type.STRING)))); assertThat( avroSchema.getField("quality").schema(), - equalTo(SchemaBuilder.builder().unionOf().nullType().and().doubleType().endUnion())); + equalTo(Schema.createUnion(Schema.create(Type.NULL), Schema.create(Type.DOUBLE)))); assertThat( avroSchema.getField("quantity").schema(), - equalTo(SchemaBuilder.builder().unionOf().nullType().and().longType().endUnion())); + equalTo(Schema.createUnion(Schema.create(Type.NULL), Schema.create(Type.LONG)))); assertThat( avroSchema.getField("birthday").schema(), equalTo( - SchemaBuilder.builder() - .unionOf() - .nullType() - .and() - .type( - LogicalTypes.timestampMicros().addToSchema(SchemaBuilder.builder().longType())) - .endUnion())); + Schema.createUnion( + Schema.create(Type.NULL), + LogicalTypes.timestampMicros().addToSchema(Schema.create(Type.LONG))))); assertThat( avroSchema.getField("birthdayMoney").schema(), equalTo( - SchemaBuilder.builder() - .unionOf() - .nullType() - .and() - .type(LogicalTypes.decimal(38, 9).addToSchema(SchemaBuilder.builder().bytesType())) - .endUnion())); + Schema.createUnion( + Schema.create(Type.NULL), + LogicalTypes.decimal(38, 9).addToSchema(Schema.create(Type.BYTES))))); assertThat( avroSchema.getField("lotteryWinnings").schema(), equalTo( - SchemaBuilder.builder() - .unionOf() - .nullType() - .and() - .type(LogicalTypes.decimal(77, 38).addToSchema(SchemaBuilder.builder().bytesType())) - .endUnion())); + Schema.createUnion( + Schema.create(Type.NULL), + LogicalTypes.decimal(77, 38).addToSchema(Schema.create(Type.BYTES))))); assertThat( avroSchema.getField("flighted").schema(), - equalTo(SchemaBuilder.builder().unionOf().nullType().and().booleanType().endUnion())); + equalTo(Schema.createUnion(Schema.create(Type.NULL), 
Schema.create(Type.BOOLEAN)))); assertThat( avroSchema.getField("sound").schema(), - equalTo(SchemaBuilder.builder().unionOf().nullType().and().bytesType().endUnion())); + equalTo(Schema.createUnion(Schema.create(Type.NULL), Schema.create(Type.BYTES)))); + Schema dateSchema = Schema.create(Type.INT); + LogicalTypes.date().addToSchema(dateSchema); assertThat( avroSchema.getField("anniversaryDate").schema(), - equalTo( - SchemaBuilder.builder() - .unionOf() - .nullType() - .and() - .stringBuilder() - .prop("sqlType", "DATE") - .endString() - .endUnion())); + equalTo(Schema.createUnion(Schema.create(Type.NULL), dateSchema))); + Schema dateTimeSchema = Schema.create(Type.STRING); + BigQueryAvroUtils.DATETIME_LOGICAL_TYPE.addToSchema(dateTimeSchema); assertThat( avroSchema.getField("anniversaryDatetime").schema(), - equalTo( - SchemaBuilder.builder() - .unionOf() - .nullType() - .and() - .stringBuilder() - .prop("sqlType", "DATETIME") - .endString() - .endUnion())); + equalTo(Schema.createUnion(Schema.create(Type.NULL), dateTimeSchema))); + Schema timeSchema = Schema.create(Type.LONG); + LogicalTypes.timeMicros().addToSchema(timeSchema); assertThat( avroSchema.getField("anniversaryTime").schema(), - equalTo( - SchemaBuilder.builder() - .unionOf() - .nullType() - .and() - .stringBuilder() - .prop("sqlType", "TIME") - .endString() - .endUnion())); + equalTo(Schema.createUnion(Schema.create(Type.NULL), timeSchema))); + Schema geoSchema = Schema.create(Type.STRING); + geoSchema.addProp("sqlType", "GEOGRAPHY"); assertThat( avroSchema.getField("geoPositions").schema(), - equalTo( - SchemaBuilder.builder() - .unionOf() - .nullType() - .and() - .stringBuilder() - .prop("sqlType", "GEOGRAPHY") - .endString() - .endUnion())); + equalTo(Schema.createUnion(Schema.create(Type.NULL), geoSchema))); assertThat( avroSchema.getField("scion").schema(), equalTo( - SchemaBuilder.builder() - .unionOf() - .nullType() - .and() - .record("scion") - .doc("Translated Avro Schema for scion") - .namespace("org.apache.beam.sdk.io.gcp.bigquery") - .fields() - .name("species") - .type() - .unionOf() - .nullType() - .and() - .stringType() - .endUnion() - .noDefault() - .endRecord() - .endUnion())); - + Schema.createUnion( + Schema.create(Type.NULL), + Schema.createRecord( + "scion", + "Translated Avro Schema for scion", + "org.apache.beam.sdk.io.gcp.bigquery", + false, + ImmutableList.of( + new Field( + "species", + Schema.createUnion( + Schema.create(Type.NULL), Schema.create(Type.STRING)), + null, + (Object) null)))))); assertThat( avroSchema.getField("associates").schema(), equalTo( - SchemaBuilder.array() - .items() - .record("associates") - .doc("Translated Avro Schema for associates") - .namespace("org.apache.beam.sdk.io.gcp.bigquery") - .fields() - .name("species") - .type() - .unionOf() - .nullType() - .and() - .stringType() - .endUnion() - .noDefault() - .endRecord())); + Schema.createArray( + Schema.createRecord( + "associates", + "Translated Avro Schema for associates", + "org.apache.beam.sdk.io.gcp.bigquery", + false, + ImmutableList.of( + new Field( + "species", + Schema.createUnion( + Schema.create(Type.NULL), Schema.create(Type.STRING)), + null, + (Object) null)))))); } @Test - public void testConvertBigQuerySchemaToAvroSchema() { + public void testConvertBigQuerySchemaToAvroSchemaWithoutLogicalTypes() { TableSchema tableSchema = new TableSchema(); tableSchema.setFields(fields); - Schema avroSchema = BigQueryAvroUtils.toGenericAvroSchema(tableSchema); + Schema avroSchema = 
BigQueryAvroUtils.toGenericAvroSchema(tableSchema, false); - assertThat(avroSchema.getField("number").schema(), equalTo(SchemaBuilder.builder().longType())); + assertThat(avroSchema.getField("number").schema(), equalTo(Schema.create(Schema.Type.LONG))); assertThat( avroSchema.getField("species").schema(), - equalTo(SchemaBuilder.builder().unionOf().nullType().and().stringType().endUnion())); + equalTo( + Schema.createUnion( + Schema.create(Schema.Type.NULL), Schema.create(Schema.Type.STRING)))); assertThat( avroSchema.getField("quality").schema(), - equalTo(SchemaBuilder.builder().unionOf().nullType().and().doubleType().endUnion())); + equalTo( + Schema.createUnion( + Schema.create(Schema.Type.NULL), Schema.create(Schema.Type.DOUBLE)))); assertThat( avroSchema.getField("quantity").schema(), - equalTo(SchemaBuilder.builder().unionOf().nullType().and().longType().endUnion())); + equalTo( + Schema.createUnion(Schema.create(Schema.Type.NULL), Schema.create(Schema.Type.LONG)))); assertThat( avroSchema.getField("birthday").schema(), equalTo( - SchemaBuilder.builder() - .unionOf() - .nullType() - .and() - .type( - LogicalTypes.timestampMicros().addToSchema(SchemaBuilder.builder().longType())) - .endUnion())); + Schema.createUnion( + Schema.create(Schema.Type.NULL), + LogicalTypes.timestampMicros().addToSchema(Schema.create(Schema.Type.LONG))))); assertThat( avroSchema.getField("birthdayMoney").schema(), equalTo( - SchemaBuilder.builder() - .unionOf() - .nullType() - .and() - .type(LogicalTypes.decimal(38, 9).addToSchema(SchemaBuilder.builder().bytesType())) - .endUnion())); + Schema.createUnion( + Schema.create(Schema.Type.NULL), + LogicalTypes.decimal(38, 9).addToSchema(Schema.create(Schema.Type.BYTES))))); assertThat( avroSchema.getField("lotteryWinnings").schema(), equalTo( - SchemaBuilder.builder() - .unionOf() - .nullType() - .and() - .type(LogicalTypes.decimal(77, 38).addToSchema(SchemaBuilder.builder().bytesType())) - .endUnion())); + Schema.createUnion( + Schema.create(Schema.Type.NULL), + LogicalTypes.decimal(77, 38).addToSchema(Schema.create(Schema.Type.BYTES))))); assertThat( avroSchema.getField("flighted").schema(), - equalTo(SchemaBuilder.builder().unionOf().nullType().and().booleanType().endUnion())); + equalTo( + Schema.createUnion( + Schema.create(Schema.Type.NULL), Schema.create(Schema.Type.BOOLEAN)))); assertThat( avroSchema.getField("sound").schema(), - equalTo(SchemaBuilder.builder().unionOf().nullType().and().bytesType().endUnion())); + equalTo( + Schema.createUnion(Schema.create(Schema.Type.NULL), Schema.create(Schema.Type.BYTES)))); + Schema dateSchema = Schema.create(Schema.Type.STRING); + dateSchema.addProp("sqlType", "DATE"); assertThat( avroSchema.getField("anniversaryDate").schema(), - equalTo( - SchemaBuilder.builder() - .unionOf() - .nullType() - .and() - .type(LogicalTypes.date().addToSchema(SchemaBuilder.builder().intType())) - .endUnion())); + equalTo(Schema.createUnion(Schema.create(Schema.Type.NULL), dateSchema))); + Schema dateTimeSchema = Schema.create(Schema.Type.STRING); + dateTimeSchema.addProp("sqlType", "DATETIME"); assertThat( avroSchema.getField("anniversaryDatetime").schema(), - equalTo( - SchemaBuilder.builder() - .unionOf() - .nullType() - .and() - .type(DATETIME_LOGICAL_TYPE.addToSchema(SchemaBuilder.builder().stringType())) - .endUnion())); + equalTo(Schema.createUnion(Schema.create(Schema.Type.NULL), dateTimeSchema))); + Schema timeSchema = Schema.create(Schema.Type.STRING); + timeSchema.addProp("sqlType", "TIME"); assertThat( 
avroSchema.getField("anniversaryTime").schema(), - equalTo( - SchemaBuilder.builder() - .unionOf() - .nullType() - .and() - .type(LogicalTypes.timeMicros().addToSchema(SchemaBuilder.builder().longType())) - .endUnion())); + equalTo(Schema.createUnion(Schema.create(Schema.Type.NULL), timeSchema))); + Schema geoSchema = Schema.create(Type.STRING); + geoSchema.addProp("sqlType", "GEOGRAPHY"); assertThat( avroSchema.getField("geoPositions").schema(), - equalTo( - SchemaBuilder.builder() - .unionOf() - .nullType() - .and() - .stringBuilder() - .prop("sqlType", "GEOGRAPHY") - .endString() - .endUnion())); + equalTo(Schema.createUnion(Schema.create(Schema.Type.NULL), geoSchema))); assertThat( avroSchema.getField("scion").schema(), equalTo( - SchemaBuilder.builder() - .unionOf() - .nullType() - .and() - .record("scion") - .doc("Translated Avro Schema for scion") - .namespace("org.apache.beam.sdk.io.gcp.bigquery") - .fields() - .name("species") - .type() - .unionOf() - .nullType() - .and() - .stringType() - .endUnion() - .noDefault() - .endRecord() - .endUnion())); - + Schema.createUnion( + Schema.create(Schema.Type.NULL), + Schema.createRecord( + "scion", + "Translated Avro Schema for scion", + "org.apache.beam.sdk.io.gcp.bigquery", + false, + ImmutableList.of( + new Schema.Field( + "species", + Schema.createUnion( + Schema.create(Schema.Type.NULL), Schema.create(Schema.Type.STRING)), + null, + (Object) null)))))); assertThat( avroSchema.getField("associates").schema(), equalTo( - SchemaBuilder.array() - .items() - .record("associates") - .doc("Translated Avro Schema for associates") - .namespace("org.apache.beam.sdk.io.gcp.bigquery") - .fields() - .name("species") - .type() - .unionOf() - .nullType() - .and() - .stringType() - .endUnion() - .noDefault() - .endRecord())); + Schema.createArray( + Schema.createRecord( + "associates", + "Translated Avro Schema for associates", + "org.apache.beam.sdk.io.gcp.bigquery", + false, + ImmutableList.of( + new Schema.Field( + "species", + Schema.createUnion( + Schema.create(Schema.Type.NULL), Schema.create(Schema.Type.STRING)), + null, + (Object) null)))))); } @Test diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageQueryTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageQueryTest.java index 570435a4c95f..497653f9ab8d 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageQueryTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageQueryTest.java @@ -170,7 +170,9 @@ public void testDefaultQueryBasedSource() throws Exception { @Test public void testQueryBasedSourceWithCustomQuery() throws Exception { TypedRead typedRead = - BigQueryIO.readTableRows().fromQuery("SELECT * FROM `google.com:project.dataset.table`"); + BigQueryIO.read(new TableRowParser()) + .fromQuery("SELECT * FROM `google.com:project.dataset.table`") + .withCoder(TableRowJsonCoder.of()); checkTypedReadQueryObject(typedRead, "SELECT * FROM `google.com:project.dataset.table`"); } @@ -225,7 +227,10 @@ public void testQueryBasedSourceWithTemplateCompatibility() throws Exception { } private TypedRead getDefaultTypedRead() { - return BigQueryIO.readTableRows().fromQuery(DEFAULT_QUERY).withMethod(Method.DIRECT_READ); + return BigQueryIO.read(new TableRowParser()) + .fromQuery(DEFAULT_QUERY) + .withCoder(TableRowJsonCoder.of()) + 
.withMethod(Method.DIRECT_READ); } private void checkTypedReadQueryObject(TypedRead typedRead, String query) { @@ -305,7 +310,7 @@ public void testQuerySourceEstimatedSize() throws Exception { /* queryTempProject = */ null, /* kmsKey = */ null, null, - TableRowParser.INSTANCE, + new TableRowParser(), TableRowJsonCoder.of(), fakeBigQueryServices); @@ -418,7 +423,7 @@ private void doQuerySourceInitialSplit( /* queryTempProject = */ null, /* kmsKey = */ null, null, - TableRowParser.INSTANCE, + new TableRowParser(), TableRowJsonCoder.of(), new FakeBigQueryServices() .withDatasetService(fakeDatasetService) @@ -667,7 +672,7 @@ public void testQuerySourceInitialSplitWithBigQueryProject_EmptyResult() throws /* queryTempProject = */ null, /* kmsKey = */ null, DataFormat.AVRO, - TableRowParser.INSTANCE, + new TableRowParser(), TableRowJsonCoder.of(), new FakeBigQueryServices() .withDatasetService(fakeDatasetService) @@ -739,7 +744,7 @@ public void testQuerySourceInitialSplit_EmptyResult() throws Exception { /* queryTempProject = */ null, /* kmsKey = */ null, null, - TableRowParser.INSTANCE, + new TableRowParser(), TableRowJsonCoder.of(), new FakeBigQueryServices() .withDatasetService(fakeDatasetService) @@ -764,7 +769,7 @@ public void testQuerySourceCreateReader() throws Exception { /* queryTempProject = */ null, /* kmsKey = */ null, null, - TableRowParser.INSTANCE, + new TableRowParser(), TableRowJsonCoder.of(), fakeBigQueryServices); diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadTest.java index 4740b61d808d..d7930b595538 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadTest.java @@ -193,7 +193,8 @@ public void teardown() { @Test public void testBuildTableBasedSource() { BigQueryIO.TypedRead typedRead = - BigQueryIO.readTableRows() + BigQueryIO.read(new TableRowParser()) + .withCoder(TableRowJsonCoder.of()) .withMethod(Method.DIRECT_READ) .from("foo.com:project:dataset.table"); checkTypedReadTableObject(typedRead, "foo.com:project", "dataset", "table"); @@ -203,7 +204,8 @@ public void testBuildTableBasedSource() { @Test public void testBuildTableBasedSourceWithoutValidation() { BigQueryIO.TypedRead typedRead = - BigQueryIO.readTableRows() + BigQueryIO.read(new TableRowParser()) + .withCoder(TableRowJsonCoder.of()) .withMethod(Method.DIRECT_READ) .from("foo.com:project:dataset.table") .withoutValidation(); @@ -214,7 +216,10 @@ public void testBuildTableBasedSourceWithoutValidation() { @Test public void testBuildTableBasedSourceWithDefaultProject() { BigQueryIO.TypedRead typedRead = - BigQueryIO.readTableRows().withMethod(Method.DIRECT_READ).from("myDataset.myTable"); + BigQueryIO.read(new TableRowParser()) + .withCoder(TableRowJsonCoder.of()) + .withMethod(Method.DIRECT_READ) + .from("myDataset.myTable"); checkTypedReadTableObject(typedRead, null, "myDataset", "myTable"); } @@ -226,7 +231,10 @@ public void testBuildTableBasedSourceWithTableReference() { .setDatasetId("dataset") .setTableId("table"); BigQueryIO.TypedRead typedRead = - BigQueryIO.readTableRows().withMethod(Method.DIRECT_READ).from(tableReference); + BigQueryIO.read(new TableRowParser()) + .withCoder(TableRowJsonCoder.of()) + .withMethod(Method.DIRECT_READ) + 
.from(tableReference); checkTypedReadTableObject(typedRead, "foo.com:project", "dataset", "table"); } @@ -247,7 +255,8 @@ public void testBuildSourceWithTableAndFlatten() { + " which only applies to queries"); p.apply( "ReadMyTable", - BigQueryIO.readTableRows() + BigQueryIO.read(new TableRowParser()) + .withCoder(TableRowJsonCoder.of()) .withMethod(Method.DIRECT_READ) .from("foo.com:project:dataset.table") .withoutResultFlattening()); @@ -262,7 +271,8 @@ public void testBuildSourceWithTableAndSqlDialect() { + " which only applies to queries"); p.apply( "ReadMyTable", - BigQueryIO.readTableRows() + BigQueryIO.read(new TableRowParser()) + .withCoder(TableRowJsonCoder.of()) .withMethod(Method.DIRECT_READ) .from("foo.com:project:dataset.table") .usingStandardSql()); @@ -273,7 +283,8 @@ public void testBuildSourceWithTableAndSqlDialect() { public void testDisplayData() { String tableSpec = "foo.com:project:dataset.table"; BigQueryIO.TypedRead typedRead = - BigQueryIO.readTableRows() + BigQueryIO.read(new TableRowParser()) + .withCoder(TableRowJsonCoder.of()) .withMethod(Method.DIRECT_READ) .withSelectedFields(ImmutableList.of("foo", "bar")) .withProjectionPushdownApplied() @@ -288,7 +299,8 @@ public void testDisplayData() { public void testName() { assertEquals( "BigQueryIO.TypedRead", - BigQueryIO.readTableRows() + BigQueryIO.read(new TableRowParser()) + .withCoder(TableRowJsonCoder.of()) .withMethod(Method.DIRECT_READ) .from("foo.com:project:dataset.table") .getName()); @@ -335,7 +347,7 @@ private void doTableSourceEstimatedSizeTest(boolean useStreamingBuffer) throws E ValueProvider.StaticValueProvider.of(tableRef), null, null, - TableRowParser.INSTANCE, + new TableRowParser(), TableRowJsonCoder.of(), new FakeBigQueryServices().withDatasetService(fakeDatasetService)); @@ -355,7 +367,7 @@ public void testTableSourceEstimatedSize_WithBigQueryProject() throws Exception ValueProvider.StaticValueProvider.of(BigQueryHelpers.parseTableSpec("dataset.table")), null, null, - TableRowParser.INSTANCE, + new TableRowParser(), TableRowJsonCoder.of(), new FakeBigQueryServices().withDatasetService(fakeDatasetService)); @@ -374,7 +386,7 @@ public void testTableSourceEstimatedSize_WithDefaultProject() throws Exception { ValueProvider.StaticValueProvider.of(BigQueryHelpers.parseTableSpec("dataset.table")), null, null, - TableRowParser.INSTANCE, + new TableRowParser(), TableRowJsonCoder.of(), new FakeBigQueryServices().withDatasetService(fakeDatasetService)); @@ -466,7 +478,7 @@ private void doTableSourceInitialSplitTest(long bundleSize, int streamCount) thr ValueProvider.StaticValueProvider.of(tableRef), null, null, - TableRowParser.INSTANCE, + new TableRowParser(), TableRowJsonCoder.of(), new FakeBigQueryServices() .withDatasetService(fakeDatasetService) @@ -514,7 +526,7 @@ public void testTableSourceInitialSplit_WithSelectedFieldsAndRowRestriction() th ValueProvider.StaticValueProvider.of(tableRef), StaticValueProvider.of(Lists.newArrayList("name")), StaticValueProvider.of("number > 5"), - TableRowParser.INSTANCE, + new TableRowParser(), TableRowJsonCoder.of(), new FakeBigQueryServices() .withDatasetService(fakeDatasetService) @@ -559,7 +571,7 @@ public void testTableSourceInitialSplit_WithDefaultProject() throws Exception { ValueProvider.StaticValueProvider.of(BigQueryHelpers.parseTableSpec("dataset.table")), null, null, - TableRowParser.INSTANCE, + new TableRowParser(), TableRowJsonCoder.of(), new FakeBigQueryServices() .withDatasetService(fakeDatasetService) @@ -600,7 +612,7 @@ public void 
testTableSourceInitialSplit_EmptyTable() throws Exception { ValueProvider.StaticValueProvider.of(tableRef), null, null, - TableRowParser.INSTANCE, + new TableRowParser(), TableRowJsonCoder.of(), new FakeBigQueryServices() .withDatasetService(fakeDatasetService) @@ -618,7 +630,7 @@ public void testTableSourceCreateReader() throws Exception { BigQueryHelpers.parseTableSpec("foo.com:project:dataset.table")), null, null, - TableRowParser.INSTANCE, + new TableRowParser(), TableRowJsonCoder.of(), new FakeBigQueryServices().withDatasetService(fakeDatasetService)); @@ -737,7 +749,7 @@ public void testStreamSourceEstimatedSizeBytes() throws Exception { ReadSession.getDefaultInstance(), ReadStream.getDefaultInstance(), TABLE_SCHEMA, - TableRowParser.INSTANCE, + new TableRowParser(), TableRowJsonCoder.of(), new FakeBigQueryServices()); @@ -752,7 +764,7 @@ public void testStreamSourceSplit() throws Exception { ReadSession.getDefaultInstance(), ReadStream.getDefaultInstance(), TABLE_SCHEMA, - TableRowParser.INSTANCE, + new TableRowParser(), TableRowJsonCoder.of(), new FakeBigQueryServices()); @@ -781,7 +793,7 @@ public void testSplitReadStreamAtFraction() throws IOException { readSession, ReadStream.newBuilder().setName("readStream").build(), TABLE_SCHEMA, - TableRowParser.INSTANCE, + new TableRowParser(), TableRowJsonCoder.of(), new FakeBigQueryServices().withStorageClient(fakeStorageClient)); @@ -824,7 +836,7 @@ public void testReadFromStreamSource() throws Exception { readSession, ReadStream.newBuilder().setName("readStream").build(), TABLE_SCHEMA, - TableRowParser.INSTANCE, + new TableRowParser(), TableRowJsonCoder.of(), new FakeBigQueryServices().withStorageClient(fakeStorageClient)); @@ -880,7 +892,7 @@ public void testFractionConsumed() throws Exception { readSession, ReadStream.newBuilder().setName("readStream").build(), TABLE_SCHEMA, - TableRowParser.INSTANCE, + new TableRowParser(), TableRowJsonCoder.of(), new FakeBigQueryServices().withStorageClient(fakeStorageClient)); @@ -965,7 +977,7 @@ public void testFractionConsumedWithSplit() throws Exception { readSession, ReadStream.newBuilder().setName("parentStream").build(), TABLE_SCHEMA, - TableRowParser.INSTANCE, + new TableRowParser(), TableRowJsonCoder.of(), new FakeBigQueryServices().withStorageClient(fakeStorageClient)); @@ -1050,7 +1062,7 @@ public void testStreamSourceSplitAtFractionSucceeds() throws Exception { .build(), ReadStream.newBuilder().setName("parentStream").build(), TABLE_SCHEMA, - TableRowParser.INSTANCE, + new TableRowParser(), TableRowJsonCoder.of(), new FakeBigQueryServices().withStorageClient(fakeStorageClient)); @@ -1187,7 +1199,7 @@ public void testStreamSourceSplitAtFractionRepeated() throws Exception { .build(), readStreams.get(0), TABLE_SCHEMA, - TableRowParser.INSTANCE, + new TableRowParser(), TableRowJsonCoder.of(), new FakeBigQueryServices().withStorageClient(fakeStorageClient)); @@ -1251,7 +1263,7 @@ public void testStreamSourceSplitAtFractionFailsWhenSplitIsNotPossible() throws .build(), ReadStream.newBuilder().setName("parentStream").build(), TABLE_SCHEMA, - TableRowParser.INSTANCE, + new TableRowParser(), TableRowJsonCoder.of(), new FakeBigQueryServices().withStorageClient(fakeStorageClient)); @@ -1340,7 +1352,7 @@ public void testStreamSourceSplitAtFractionFailsWhenParentIsPastSplitPoint() thr .build(), ReadStream.newBuilder().setName("parentStream").build(), TABLE_SCHEMA, - TableRowParser.INSTANCE, + new TableRowParser(), TableRowJsonCoder.of(), new 
FakeBigQueryServices().withStorageClient(fakeStorageClient)); @@ -1432,7 +1444,7 @@ public void testStreamSourceSplitAtFractionFailsWhenReaderRunning() throws Excep readSession, ReadStream.newBuilder().setName("readStream").build(), TABLE_SCHEMA, - TableRowParser.INSTANCE, + new TableRowParser(), TableRowJsonCoder.of(), new FakeBigQueryServices().withStorageClient(fakeStorageClient)); @@ -1764,7 +1776,7 @@ public void testReadFromStreamSourceArrow() throws Exception { readSession, ReadStream.newBuilder().setName("readStream").build(), TABLE_SCHEMA, - TableRowParser.INSTANCE, + new TableRowParser(), TableRowJsonCoder.of(), new FakeBigQueryServices().withStorageClient(fakeStorageClient)); @@ -1813,7 +1825,7 @@ public void testFractionConsumedArrow() throws Exception { readSession, ReadStream.newBuilder().setName("readStream").build(), TABLE_SCHEMA, - TableRowParser.INSTANCE, + new TableRowParser(), TableRowJsonCoder.of(), new FakeBigQueryServices().withStorageClient(fakeStorageClient)); @@ -1895,7 +1907,7 @@ public void testFractionConsumedWithSplitArrow() throws Exception { readSession, ReadStream.newBuilder().setName("parentStream").build(), TABLE_SCHEMA, - TableRowParser.INSTANCE, + new TableRowParser(), TableRowJsonCoder.of(), new FakeBigQueryServices().withStorageClient(fakeStorageClient)); @@ -1976,7 +1988,7 @@ public void testStreamSourceSplitAtFractionSucceedsArrow() throws Exception { .build(), ReadStream.newBuilder().setName("parentStream").build(), TABLE_SCHEMA, - TableRowParser.INSTANCE, + new TableRowParser(), TableRowJsonCoder.of(), new FakeBigQueryServices().withStorageClient(fakeStorageClient)); @@ -2095,7 +2107,7 @@ public void testStreamSourceSplitAtFractionRepeatedArrow() throws Exception { .build(), readStreams.get(0), TABLE_SCHEMA, - TableRowParser.INSTANCE, + new TableRowParser(), TableRowJsonCoder.of(), new FakeBigQueryServices().withStorageClient(fakeStorageClient)); @@ -2155,7 +2167,7 @@ public void testStreamSourceSplitAtFractionFailsWhenSplitIsNotPossibleArrow() th .build(), ReadStream.newBuilder().setName("parentStream").build(), TABLE_SCHEMA, - TableRowParser.INSTANCE, + new TableRowParser(), TableRowJsonCoder.of(), new FakeBigQueryServices().withStorageClient(fakeStorageClient)); @@ -2241,7 +2253,7 @@ public void testStreamSourceSplitAtFractionFailsWhenParentIsPastSplitPointArrow( .build(), ReadStream.newBuilder().setName("parentStream").build(), TABLE_SCHEMA, - TableRowParser.INSTANCE, + new TableRowParser(), TableRowJsonCoder.of(), new FakeBigQueryServices().withStorageClient(fakeStorageClient));
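Note: for reference, the selected-fields filtering that getFinalSchema introduces in BigQueryIO can be exercised standalone, roughly as below; the class and field names are illustrative.

import java.util.Arrays;
import java.util.List;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
import org.apache.beam.sdk.schemas.Schema;

public class SelectedFieldsSketch {
  // Mirrors BigQueryIO.getFinalSchema: keep only the selected top-level fields,
  // or keep everything when no selection is accessible.
  static Schema getFinalSchema(Schema beamSchema, ValueProvider<List<String>> selectedFields) {
    Schema.Builder builder = Schema.builder();
    for (Schema.Field field : beamSchema.getFields()) {
      boolean keep =
          selectedFields == null
              || !selectedFields.isAccessible()
              || selectedFields.get() == null
              || selectedFields.get().contains(field.getName());
      if (keep) {
        builder.addField(field);
      }
    }
    return builder.build();
  }

  public static void main(String[] args) {
    Schema schema = Schema.builder().addInt64Field("number").addStringField("species").build();
    Schema trimmed = getFinalSchema(schema, StaticValueProvider.of(Arrays.asList("number")));
    System.out.println(trimmed.getFieldNames()); // [number]
  }
}

Unlike the removed BigQueryUtils.trimSchema, this filtering operates on the Beam Schema after conversion, so it composes with the session-schema trimming done in BigQueryStorageSourceBase.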