From 7e830593e61ba1fbff16411b5825bfb4aea53ba2 Mon Sep 17 00:00:00 2001 From: Damon Date: Mon, 18 Sep 2023 16:10:20 +0000 Subject: [PATCH] Remove TableSchema to JSON conversion. (#28274) * Rethrow error converting TableSchema to JSON * Remove need to parse TableSchema to/from JSON * Remove GenericDatumTransformer's JSON string param * Remove unused TableSchemaFunction --- .../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 40 +++++-------------- .../io/gcp/bigquery/BigQueryIOReadTest.java | 18 +++------ 2 files changed, 15 insertions(+), 43 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java index 58d769312444..3c006d24d037 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java @@ -19,6 +19,7 @@ import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.resolveTempLocation; import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryResourceNaming.createTempTableReference; +import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull; import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument; import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState; @@ -49,7 +50,6 @@ import com.google.protobuf.DynamicMessage; import com.google.protobuf.Message; import java.io.IOException; -import java.io.Serializable; import java.lang.reflect.InvocationTargetException; import java.util.Collections; import java.util.List; @@ -132,13 +132,10 @@ import org.apache.beam.sdk.values.TypeDescriptors; import org.apache.beam.sdk.values.ValueInSingleWindow; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Function; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Predicates; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Supplier; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Suppliers; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists; @@ -649,29 +646,19 @@ public static TypedRead readTableRowsWithSchema() { BigQueryUtils.tableRowFromBeamRow()); } - private static class TableSchemaFunction - implements Serializable, Function<@Nullable String, @Nullable TableSchema> { - @Override - public @Nullable TableSchema apply(@Nullable String input) { - return BigQueryHelpers.fromJsonString(input, TableSchema.class); - } - } - @VisibleForTesting static class GenericDatumTransformer implements DatumReader { private final SerializableFunction parseFn; - private final Supplier tableSchema; + private final TableSchema tableSchema; private GenericDatumReader reader; private org.apache.avro.Schema writerSchema; public GenericDatumTransformer( SerializableFunction parseFn, - String tableSchema, + TableSchema tableSchema, org.apache.avro.Schema writer) { this.parseFn = parseFn; - this.tableSchema = - Suppliers.memoize( - Suppliers.compose(new TableSchemaFunction(), Suppliers.ofInstance(tableSchema))); + this.tableSchema = tableSchema; this.writerSchema = writer; this.reader = new GenericDatumReader<>(this.writerSchema); } @@ -689,7 +676,7 @@ public void setSchema(org.apache.avro.Schema schema) { @Override public T read(T reuse, Decoder in) throws IOException { GenericRecord record = (GenericRecord) this.reader.read(reuse, in); - return parseFn.apply(new SchemaAndRecord(record, this.tableSchema.get())); + return parseFn.apply(new SchemaAndRecord(record, this.tableSchema)); } } @@ -721,16 +708,9 @@ public static TypedRead read(SerializableFunction par .setDatumReaderFactory( (SerializableFunction>) input -> { - try { - String jsonTableSchema = BigQueryIO.JSON_FACTORY.toString(input); - return (AvroSource.DatumReaderFactory) - (writer, reader) -> - new GenericDatumTransformer<>(parseFn, jsonTableSchema, writer); - } catch (IOException e) { - LOG.warn( - String.format("Error while converting table schema %s to JSON!", input), e); - return null; - } + TableSchema safeInput = checkStateNotNull(input); + return (AvroSource.DatumReaderFactory) + (writer, reader) -> new GenericDatumTransformer<>(parseFn, safeInput, writer); }) // TODO: Remove setParseFn once https://github.com/apache/beam/issues/21076 is fixed. .setParseFn(parseFn) @@ -3386,9 +3366,7 @@ private WriteResult expandTyped( @SuppressWarnings({"unchecked", "nullness"}) Descriptors.Descriptor descriptor = (Descriptors.Descriptor) - org.apache.beam.sdk.util.Preconditions.checkStateNotNull( - writeProtoClass.getMethod("getDescriptor")) - .invoke(null); + checkStateNotNull(writeProtoClass.getMethod("getDescriptor")).invoke(null); TableSchema tableSchema = TableRowToStorageApiProto.protoSchemaToTableSchema( TableRowToStorageApiProto.tableSchemaFromDescriptor(descriptor)); diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOReadTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOReadTest.java index bc75ba8bd9ba..e274a8ac68ef 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOReadTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOReadTest.java @@ -19,6 +19,7 @@ import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryResourceNaming.createTempTableReference; import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; +import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.junit.Assert.assertEquals; @@ -143,18 +144,11 @@ public void evaluate() throws Throwable { private SerializableFunction> datumReaderFactoryFn = - (SerializableFunction>) - input -> { - try { - String jsonSchema = BigQueryIO.JSON_FACTORY.toString(input); - return (AvroSource.DatumReaderFactory) - (writer, reader) -> - new BigQueryIO.GenericDatumTransformer<>( - BigQueryIO.TableRowParser.INSTANCE, jsonSchema, writer); - } catch (IOException e) { - return null; - } - }; + input -> + (AvroSource.DatumReaderFactory) + (writer, reader) -> + new BigQueryIO.GenericDatumTransformer<>( + BigQueryIO.TableRowParser.INSTANCE, checkStateNotNull(input), writer); private static class MyData implements Serializable { private String name;