Skip to content

Commit

Permalink
Add schema to SpannerIO read (#32008)
Browse files Browse the repository at this point in the history
* Add schema to SpannerIO read

Signed-off-by: Jeffrey Kinard <[email protected]>

* fix spotless

Signed-off-by: Jeffrey Kinard <[email protected]>

* fix spotless

Signed-off-by: Jeffrey Kinard <[email protected]>

* address comments

Signed-off-by: Jeffrey Kinard <[email protected]>

* spotless

Signed-off-by: Jeffrey Kinard <[email protected]>

---------

Signed-off-by: Jeffrey Kinard <[email protected]>
  • Loading branch information
Polber authored Aug 27, 2024
1 parent 3dec995 commit 24255ac
Show file tree
Hide file tree
Showing 9 changed files with 442 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -475,6 +475,14 @@ public static ReadAll readAll() {
.build();
}

public static Read readWithSchema() {
return read()
.withBeamRowConverters(
TypeDescriptor.of(Struct.class),
StructUtils.structToBeamRow(),
StructUtils.structFromBeamRow());
}

/**
* Returns a transform that creates a batch transaction. By default, {@link
* TimestampBound#strong()} transaction is created, to override this use {@link
Expand Down Expand Up @@ -708,6 +716,12 @@ static ServiceCallMetric buildServiceCallMetricForReadOp(
@AutoValue
public abstract static class Read extends PTransform<PBegin, PCollection<Struct>> {

interface ToBeamRowFunction
extends SerializableFunction<Schema, SerializableFunction<Struct, Row>> {}

interface FromBeamRowFunction
extends SerializableFunction<Schema, SerializableFunction<Row, Struct>> {}

abstract SpannerConfig getSpannerConfig();

abstract ReadOperation getReadOperation();
Expand All @@ -720,6 +734,12 @@ public abstract static class Read extends PTransform<PBegin, PCollection<Struct>

abstract Boolean getBatching();

abstract @Nullable TypeDescriptor<Struct> getTypeDescriptor();

abstract @Nullable ToBeamRowFunction getToBeamRowFn();

abstract @Nullable FromBeamRowFunction getFromBeamRowFn();

abstract Builder toBuilder();

@AutoValue.Builder
Expand All @@ -737,9 +757,26 @@ abstract static class Builder {

abstract Builder setBatching(Boolean batching);

abstract Builder setTypeDescriptor(TypeDescriptor<Struct> typeDescriptor);

abstract Builder setToBeamRowFn(ToBeamRowFunction toRowFn);

abstract Builder setFromBeamRowFn(FromBeamRowFunction fromRowFn);

abstract Read build();
}

public Read withBeamRowConverters(
TypeDescriptor<Struct> typeDescriptor,
ToBeamRowFunction toRowFn,
FromBeamRowFunction fromRowFn) {
return toBuilder()
.setTypeDescriptor(typeDescriptor)
.setToBeamRowFn(toRowFn)
.setFromBeamRowFn(fromRowFn)
.build();
}

/** Specifies the Cloud Spanner configuration. */
public Read withSpannerConfig(SpannerConfig spannerConfig) {
return toBuilder().setSpannerConfig(spannerConfig).build();
Expand Down Expand Up @@ -876,6 +913,14 @@ public Read withHighPriority() {
return withSpannerConfig(config.withRpcPriority(RpcPriority.HIGH));
}

private SpannerSourceDef createSourceDef() {
if (getReadOperation().getQuery() != null) {
return SpannerQuerySourceDef.create(getSpannerConfig(), getReadOperation().getQuery());
}
return SpannerTableSourceDef.create(
getSpannerConfig(), getReadOperation().getTable(), getReadOperation().getColumns());
}

@Override
public PCollection<Struct> expand(PBegin input) {
getSpannerConfig().validate();
Expand Down Expand Up @@ -905,13 +950,32 @@ public PCollection<Struct> expand(PBegin input) {
"SpannerIO.read() requires query OR table to set with withTable OR withQuery method.");
}

final SpannerSourceDef sourceDef = createSourceDef();

Schema beamSchema = null;
if (getTypeDescriptor() != null && getToBeamRowFn() != null && getFromBeamRowFn() != null) {
beamSchema = sourceDef.getBeamSchema();
}

ReadAll readAll =
readAll()
.withSpannerConfig(getSpannerConfig())
.withTimestampBound(getTimestampBound())
.withBatching(getBatching())
.withTransaction(getTransaction());
return input.apply(Create.of(getReadOperation())).apply("Execute query", readAll);

PCollection<Struct> rows =
input.apply(Create.of(getReadOperation())).apply("Execute query", readAll);

if (beamSchema != null) {
rows.setSchema(
beamSchema,
getTypeDescriptor(),
getToBeamRowFn().apply(beamSchema),
getFromBeamRowFn().apply(beamSchema));
}

return rows;
}

SerializableFunction<Struct, Row> getFormatFn() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.gcp.spanner;

import static org.apache.beam.sdk.io.gcp.spanner.StructUtils.structTypeToBeamRowSchema;

import com.google.cloud.spanner.ReadContext;
import com.google.cloud.spanner.ResultSet;
import com.google.cloud.spanner.Statement;
import org.apache.beam.sdk.schemas.Schema;

class SpannerQuerySourceDef implements SpannerSourceDef {

private final SpannerConfig config;
private final Statement query;

static SpannerQuerySourceDef create(SpannerConfig config, Statement query) {
return new SpannerQuerySourceDef(config, query);
}

private SpannerQuerySourceDef(SpannerConfig config, Statement query) {
this.config = config;
this.query = query;
}

/** {@inheritDoc} */
@Override
public Schema getBeamSchema() {
Schema beamSchema;
try (SpannerAccessor spannerAccessor = SpannerAccessor.getOrCreate(config)) {
try (ReadContext readContext = spannerAccessor.getDatabaseClient().singleUse()) {
ResultSet result = readContext.analyzeQuery(query, ReadContext.QueryAnalyzeMode.PLAN);
result.next();
beamSchema = structTypeToBeamRowSchema(result.getMetadata().getRowType(), true);
}
} catch (Exception e) {
throw new SpannerSchemaRetrievalException("Exception while trying to retrieve schema", e);
}
return beamSchema;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.gcp.spanner;

/** Exception to signal that Spanner schema retrieval failed. */
public class SpannerSchemaRetrievalException extends RuntimeException {
SpannerSchemaRetrievalException(String message, Throwable cause) {
super(message, cause);
}

SpannerSchemaRetrievalException(String message) {
super(message);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.gcp.spanner;

import java.io.Serializable;
import org.apache.beam.sdk.schemas.Schema;

/**
* Represents a source used for {@link SpannerIO#read()}. Currently, this could be either a table or
* a query. Direct read sources are not yet supported.
*/
interface SpannerSourceDef extends Serializable {

/**
* Extract the Beam {@link Schema} corresponding to this source.
*
* @return Beam schema of the source
* @throws SpannerSchemaRetrievalException if schema retrieval fails
*/
Schema getBeamSchema();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.gcp.spanner;

import static org.apache.beam.sdk.io.gcp.spanner.StructUtils.structTypeToBeamRowSchema;

import com.google.cloud.spanner.KeySet;
import com.google.cloud.spanner.Options;
import com.google.cloud.spanner.ReadContext;
import com.google.cloud.spanner.ResultSet;
import org.apache.beam.sdk.schemas.Schema;

class SpannerTableSourceDef implements SpannerSourceDef {

private final SpannerConfig config;
private final String table;
private final Iterable<String> columns;

static SpannerTableSourceDef create(
SpannerConfig config, String table, Iterable<String> columns) {
return new SpannerTableSourceDef(config, table, columns);
}

private SpannerTableSourceDef(SpannerConfig config, String table, Iterable<String> columns) {
this.table = table;
this.config = config;
this.columns = columns;
}

/** {@inheritDoc} */
@Override
public Schema getBeamSchema() {
Schema beamSchema;
try (SpannerAccessor spannerAccessor = SpannerAccessor.getOrCreate(config)) {
try (ReadContext readContext = spannerAccessor.getDatabaseClient().singleUse()) {
ResultSet result = readContext.read(table, KeySet.all(), columns, Options.limit(1));
if (result.next()) {
beamSchema = structTypeToBeamRowSchema(result.getMetadata().getRowType(), true);
} else {
throw new SpannerSchemaRetrievalException("Cannot find Spanner table.");
}
}
} catch (Exception e) {
throw new SpannerSchemaRetrievalException("Exception while trying to retrieve schema", e);
}
return beamSchema;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.google.cloud.Timestamp;
import com.google.cloud.spanner.Struct;
import com.google.cloud.spanner.Type;
import com.google.spanner.v1.StructType;
import java.math.BigDecimal;
import java.util.HashMap;
import java.util.List;
Expand All @@ -38,6 +39,20 @@

final class StructUtils {

private static final SpannerIO.Read.ToBeamRowFunction STRUCT_TO_BEAM_ROW_FUNCTION =
schema -> (Struct struct) -> structToBeamRow(struct, schema);

public static SpannerIO.Read.ToBeamRowFunction structToBeamRow() {
return STRUCT_TO_BEAM_ROW_FUNCTION;
}

private static final SpannerIO.Read.FromBeamRowFunction STRUCT_FROM_BEAM_ROW_FUNCTION =
ignored -> StructUtils::beamRowToStruct;

public static SpannerIO.Read.FromBeamRowFunction structFromBeamRow() {
return STRUCT_FROM_BEAM_ROW_FUNCTION;
}

// It's not possible to pass nulls as values even with a field is nullable
@SuppressWarnings({
"nullness" // TODO(https://github.com/apache/beam/issues/20497)
Expand All @@ -52,6 +67,58 @@ public static Row structToBeamRow(Struct struct, Schema schema) {
return Row.withSchema(schema).withFieldValues(structValues).build();
}

public static Schema structTypeToBeamRowSchema(StructType structType, boolean isRead) {
Schema.Builder beamSchema = Schema.builder();
structType
.getFieldsList()
.forEach(
field -> {
Schema.FieldType fieldType;
try {
fieldType = convertSpannerTypeToBeamFieldType(field.getType());
} catch (IllegalArgumentException e) {
throw new IllegalArgumentException(
"Error processing struct to row: " + e.getMessage());
}
// Treat reads from Spanner as Nullable and leave Null handling to Spanner
if (isRead) {
beamSchema.addNullableField(field.getName(), fieldType);
} else {
beamSchema.addField(field.getName(), fieldType);
}
});
return beamSchema.build();
}

public static Schema.FieldType convertSpannerTypeToBeamFieldType(
com.google.spanner.v1.Type spannerType) {
switch (spannerType.getCode()) {
case BOOL:
return Schema.FieldType.BOOLEAN;
case BYTES:
return Schema.FieldType.BYTES;
case TIMESTAMP:
case DATE:
return Schema.FieldType.DATETIME;
case INT64:
return Schema.FieldType.INT64;
case FLOAT32:
return Schema.FieldType.FLOAT;
case FLOAT64:
return Schema.FieldType.DOUBLE;
case NUMERIC:
return Schema.FieldType.DECIMAL;
case ARRAY:
return Schema.FieldType.array(
convertSpannerTypeToBeamFieldType(spannerType.getArrayElementType()));
case STRUCT:
throw new IllegalArgumentException(
String.format("Unsupported type '%s'.", spannerType.getCode()));
default:
return Schema.FieldType.STRING;
}
}

public static Struct beamRowToStruct(Row row) {
Struct.Builder structBuilder = Struct.newBuilder();
List<Schema.Field> fields = row.getSchema().getFields();
Expand Down
Loading

0 comments on commit 24255ac

Please sign in to comment.