Skip to content

Commit

Permalink
Remove TableSchema to JSON conversion. (#28274)
Browse files Browse the repository at this point in the history
* Rethrow error converting TableSchema to JSON

* Remove need to parse TableSchema to/from JSON

* Remove GenericDatumTransformer's JSON string param

* Remove unused TableSchemaFunction
  • Loading branch information
damondouglas authored Sep 18, 2023
1 parent de10fbd commit 7e83059
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.resolveTempLocation;
import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryResourceNaming.createTempTableReference;
import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;

Expand Down Expand Up @@ -49,7 +50,6 @@
import com.google.protobuf.DynamicMessage;
import com.google.protobuf.Message;
import java.io.IOException;
import java.io.Serializable;
import java.lang.reflect.InvocationTargetException;
import java.util.Collections;
import java.util.List;
Expand Down Expand Up @@ -132,13 +132,10 @@
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.beam.sdk.values.ValueInSingleWindow;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Function;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Predicates;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Supplier;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Suppliers;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
Expand Down Expand Up @@ -649,29 +646,19 @@ public static TypedRead<TableRow> readTableRowsWithSchema() {
BigQueryUtils.tableRowFromBeamRow());
}

/**
 * Serializable Guava {@link Function} that converts a JSON string into a {@link TableSchema}.
 *
 * <p>The conversion is delegated entirely to {@code BigQueryHelpers.fromJsonString}; this class
 * adds no logic of its own. Both the input and the result are declared {@code @Nullable}.
 */
private static class TableSchemaFunction
implements Serializable, Function<@Nullable String, @Nullable TableSchema> {
/**
 * Parses {@code input} into a {@link TableSchema}.
 *
 * @param input JSON text describing a table schema; declared nullable. NOTE(review): how a
 *     null or malformed input is handled is decided by {@code BigQueryHelpers.fromJsonString},
 *     which is not visible here - confirm before relying on it.
 * @return whatever {@code BigQueryHelpers.fromJsonString(input, TableSchema.class)} returns
 */
@Override
public @Nullable TableSchema apply(@Nullable String input) {
return BigQueryHelpers.fromJsonString(input, TableSchema.class);
}
}

@VisibleForTesting
static class GenericDatumTransformer<T> implements DatumReader<T> {
private final SerializableFunction<SchemaAndRecord, T> parseFn;
private final Supplier<TableSchema> tableSchema;
private final TableSchema tableSchema;
private GenericDatumReader<T> reader;
private org.apache.avro.Schema writerSchema;

public GenericDatumTransformer(
SerializableFunction<SchemaAndRecord, T> parseFn,
String tableSchema,
TableSchema tableSchema,
org.apache.avro.Schema writer) {
this.parseFn = parseFn;
this.tableSchema =
Suppliers.memoize(
Suppliers.compose(new TableSchemaFunction(), Suppliers.ofInstance(tableSchema)));
this.tableSchema = tableSchema;
this.writerSchema = writer;
this.reader = new GenericDatumReader<>(this.writerSchema);
}
Expand All @@ -689,7 +676,7 @@ public void setSchema(org.apache.avro.Schema schema) {
@Override
public T read(T reuse, Decoder in) throws IOException {
GenericRecord record = (GenericRecord) this.reader.read(reuse, in);
return parseFn.apply(new SchemaAndRecord(record, this.tableSchema.get()));
return parseFn.apply(new SchemaAndRecord(record, this.tableSchema));
}
}

Expand Down Expand Up @@ -721,16 +708,9 @@ public static <T> TypedRead<T> read(SerializableFunction<SchemaAndRecord, T> par
.setDatumReaderFactory(
(SerializableFunction<TableSchema, AvroSource.DatumReaderFactory<T>>)
input -> {
try {
String jsonTableSchema = BigQueryIO.JSON_FACTORY.toString(input);
return (AvroSource.DatumReaderFactory<T>)
(writer, reader) ->
new GenericDatumTransformer<>(parseFn, jsonTableSchema, writer);
} catch (IOException e) {
LOG.warn(
String.format("Error while converting table schema %s to JSON!", input), e);
return null;
}
TableSchema safeInput = checkStateNotNull(input);
return (AvroSource.DatumReaderFactory<T>)
(writer, reader) -> new GenericDatumTransformer<>(parseFn, safeInput, writer);
})
// TODO: Remove setParseFn once https://github.com/apache/beam/issues/21076 is fixed.
.setParseFn(parseFn)
Expand Down Expand Up @@ -3386,9 +3366,7 @@ private <DestinationT> WriteResult expandTyped(
@SuppressWarnings({"unchecked", "nullness"})
Descriptors.Descriptor descriptor =
(Descriptors.Descriptor)
org.apache.beam.sdk.util.Preconditions.checkStateNotNull(
writeProtoClass.getMethod("getDescriptor"))
.invoke(null);
checkStateNotNull(writeProtoClass.getMethod("getDescriptor")).invoke(null);
TableSchema tableSchema =
TableRowToStorageApiProto.protoSchemaToTableSchema(
TableRowToStorageApiProto.tableSchemaFromDescriptor(descriptor));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryResourceNaming.createTempTableReference;
import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.junit.Assert.assertEquals;
Expand Down Expand Up @@ -143,18 +144,11 @@ public void evaluate() throws Throwable {

private SerializableFunction<TableSchema, AvroSource.DatumReaderFactory<TableRow>>
datumReaderFactoryFn =
(SerializableFunction<TableSchema, AvroSource.DatumReaderFactory<TableRow>>)
input -> {
try {
String jsonSchema = BigQueryIO.JSON_FACTORY.toString(input);
return (AvroSource.DatumReaderFactory<TableRow>)
(writer, reader) ->
new BigQueryIO.GenericDatumTransformer<>(
BigQueryIO.TableRowParser.INSTANCE, jsonSchema, writer);
} catch (IOException e) {
return null;
}
};
input ->
(AvroSource.DatumReaderFactory<TableRow>)
(writer, reader) ->
new BigQueryIO.GenericDatumTransformer<>(
BigQueryIO.TableRowParser.INSTANCE, checkStateNotNull(input), writer);

private static class MyData implements Serializable {
private String name;
Expand Down

0 comments on commit 7e83059

Please sign in to comment.