diff --git a/test-proxy/src/main/java/com/google/cloud/bigtable/testproxy/CbtTestProxy.java b/test-proxy/src/main/java/com/google/cloud/bigtable/testproxy/CbtTestProxy.java index 6e563d4df0..1c72704b62 100644 --- a/test-proxy/src/main/java/com/google/cloud/bigtable/testproxy/CbtTestProxy.java +++ b/test-proxy/src/main/java/com/google/cloud/bigtable/testproxy/CbtTestProxy.java @@ -1,5 +1,5 @@ /* - * Copyright 2022 Google LLC + * Copyright 2024 Google LLC * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -41,6 +41,7 @@ import com.google.cloud.bigtable.data.v2.models.ReadModifyWriteRow; import com.google.cloud.bigtable.data.v2.models.RowCell; import com.google.cloud.bigtable.data.v2.models.RowMutation; +import com.google.cloud.bigtable.data.v2.models.sql.ResultSet; import com.google.cloud.bigtable.data.v2.stub.EnhancedBigtableStubSettings; import com.google.cloud.bigtable.testproxy.CloudBigtableV2TestProxyGrpc.CloudBigtableV2TestProxyImplBase; import com.google.common.base.Preconditions; @@ -50,6 +51,7 @@ import io.grpc.ManagedChannelBuilder; import io.grpc.Status; import io.grpc.StatusException; +import io.grpc.StatusRuntimeException; import io.grpc.netty.shaded.io.grpc.netty.GrpcSslContexts; import io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder; import io.grpc.netty.shaded.io.netty.handler.ssl.SslContext; @@ -65,6 +67,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; import java.util.logging.Logger; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -159,6 +162,8 @@ private static BigtableDataSettings.Builder overrideTimeoutSetting( settingsBuilder.stubSettings().readModifyWriteRowSettings().retrySettings(), newTimeout); updateTimeout( settingsBuilder.stubSettings().sampleRowKeysSettings().retrySettings(), newTimeout); + updateTimeout( + settingsBuilder.stubSettings().executeQuerySettings().retrySettings(), newTimeout); return settingsBuilder; } @@ -698,6 +703,64 @@ public void readModifyWriteRow( responseObserver.onCompleted(); } + @Override + public void executeQuery( + ExecuteQueryRequest request, StreamObserver responseObserver) { + CbtClient client; + try { + client = getClient(request.getClientId()); + } catch (StatusException e) { + responseObserver.onError(e); + return; + } + try (ResultSet resultSet = + client.dataClient().executeQuery(StatementDeserializer.toStatement(request))) { + responseObserver.onNext(ResultSetSerializer.toExecuteQueryResult(resultSet)); + } catch (InterruptedException e) { + responseObserver.onError(e); + return; + } catch (ExecutionException e) { + responseObserver.onError(e); + return; + } catch (ApiException e) { + responseObserver.onNext( + ExecuteQueryResult.newBuilder() + .setStatus( + com.google.rpc.Status.newBuilder() + .setCode(e.getStatusCode().getCode().ordinal()) + .setMessage(e.getMessage()) + .build()) + .build()); + responseObserver.onCompleted(); + return; + } catch (StatusRuntimeException e) { + responseObserver.onNext( + ExecuteQueryResult.newBuilder() + .setStatus( + com.google.rpc.Status.newBuilder() + .setCode(e.getStatus().getCode().value()) + .setMessage(e.getStatus().getDescription()) + .build()) + .build()); + responseObserver.onCompleted(); + return; + } catch (RuntimeException e) { + // If client encounters problem, don't return any results. + responseObserver.onNext( + ExecuteQueryResult.newBuilder() + .setStatus( + com.google.rpc.Status.newBuilder() + .setCode(Code.INTERNAL.getNumber()) + .setMessage(e.getMessage()) + .build()) + .build()); + responseObserver.onCompleted(); + return; + } + responseObserver.onCompleted(); + return; + } + @Override public synchronized void close() { Iterator> it = idClientMap.entrySet().iterator(); diff --git a/test-proxy/src/main/java/com/google/cloud/bigtable/testproxy/ResultSetSerializer.java b/test-proxy/src/main/java/com/google/cloud/bigtable/testproxy/ResultSetSerializer.java new file mode 100644 index 0000000000..c138c82a6b --- /dev/null +++ b/test-proxy/src/main/java/com/google/cloud/bigtable/testproxy/ResultSetSerializer.java @@ -0,0 +1,233 @@ +/* + * Copyright 2024 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigtable.testproxy; + +import com.google.bigtable.v2.ArrayValue; +import com.google.bigtable.v2.Type; +import com.google.bigtable.v2.Type.Array; +import com.google.bigtable.v2.Type.Bool; +import com.google.bigtable.v2.Type.Bytes; +import com.google.bigtable.v2.Type.Float32; +import com.google.bigtable.v2.Type.Float64; +import com.google.bigtable.v2.Type.Int64; +import com.google.bigtable.v2.Type.Map; +import com.google.bigtable.v2.Type.Struct; +import com.google.bigtable.v2.Type.Timestamp; +import com.google.bigtable.v2.Value; +import com.google.cloud.Date; +import com.google.cloud.bigtable.data.v2.models.sql.ColumnMetadata; +import com.google.cloud.bigtable.data.v2.models.sql.ResultSet; +import com.google.cloud.bigtable.data.v2.models.sql.SqlType; +import com.google.cloud.bigtable.data.v2.models.sql.StructReader; +import com.google.protobuf.ByteString; +import java.util.List; +import java.util.concurrent.ExecutionException; +import org.threeten.bp.Instant; + +public class ResultSetSerializer { + public static ExecuteQueryResult toExecuteQueryResult(ResultSet resultSet) + throws ExecutionException, InterruptedException { + ExecuteQueryResult.Builder resultBuilder = ExecuteQueryResult.newBuilder(); + for (ColumnMetadata columnMetadata : resultSet.getMetadata().getColumns()) { + resultBuilder + .getMetadataBuilder() + .addColumnsBuilder() + .setName(columnMetadata.name()) + .setType(toProtoType(columnMetadata.type())); + } + + while (resultSet.next()) { + SqlRow.Builder rowBuilder = resultBuilder.addRowsBuilder(); + + for (int i = 0; i < resultSet.getMetadata().getColumns().size(); i++) { + SqlType colType = resultSet.getMetadata().getColumnType(i); + rowBuilder.addValues(toProtoValue(getColumn(resultSet, i, colType), colType)); + } + } + + return resultBuilder.build(); + } + + private static Value toProtoValue(Object value, SqlType type) { + if (value == null) { + return Value.getDefaultInstance(); + } + + Value.Builder valueBuilder = Value.newBuilder(); + switch (type.getCode()) { + case BYTES: + valueBuilder.setBytesValue((ByteString) value); + break; + case STRING: + valueBuilder.setStringValue((String) value); + break; + case INT64: + valueBuilder.setIntValue((Long) value); + break; + case FLOAT32: + valueBuilder.setFloatValue((Float) value); + break; + case FLOAT64: + valueBuilder.setFloatValue((Double) value); + break; + case BOOL: + valueBuilder.setBoolValue((Boolean) value); + break; + case TIMESTAMP: + Instant ts = (Instant) value; + valueBuilder.setTimestampValue( + com.google.protobuf.Timestamp.newBuilder() + .setSeconds(ts.getEpochSecond()) + .setNanos(ts.getNano()) + .build()); + break; + case DATE: + Date date = (Date) value; + valueBuilder.setDateValue( + com.google.type.Date.newBuilder() + .setYear(date.getYear()) + .setMonth(date.getMonth()) + .setDay(date.getDayOfMonth()) + .build()); + break; + case ARRAY: + SqlType elementType = ((SqlType.Array) type).getElementType(); + ArrayValue.Builder arrayValue = ArrayValue.newBuilder(); + for (Object item : (List) value) { + arrayValue.addValues(toProtoValue(item, elementType)); + } + valueBuilder.setArrayValue(arrayValue.build()); + break; + case MAP: + SqlType.Map mapType = (SqlType.Map) type; + SqlType mapKeyType = mapType.getKeyType(); + SqlType mapValueType = mapType.getValueType(); + + ArrayValue.Builder mapArrayValue = ArrayValue.newBuilder(); + ((java.util.Map) value) + .forEach( + (k, v) -> + mapArrayValue.addValues( + Value.newBuilder() + .setArrayValue( + ArrayValue.newBuilder() + .addValues(toProtoValue(k, mapKeyType)) + .addValues(toProtoValue(v, mapValueType)) + .build()))); + valueBuilder.setArrayValue(mapArrayValue.build()); + break; + case STRUCT: + StructReader structValue = (StructReader) value; + SqlType.Struct structType = (SqlType.Struct) type; + ArrayValue.Builder structArrayValue = ArrayValue.newBuilder(); + for (int i = 0; i < structType.getFields().size(); ++i) { + SqlType fieldType = structType.getType(i); + structArrayValue.addValues(toProtoValue(getColumn(structValue, i, fieldType), fieldType)); + } + valueBuilder.setArrayValue(structArrayValue); + break; + default: + throw new IllegalStateException("Unexpected Type: " + type); + } + + return valueBuilder.build(); + } + + private static Object getColumn(StructReader struct, int fieldIndex, SqlType fieldType) { + if (struct.isNull(fieldIndex)) { + return null; + } + + switch (fieldType.getCode()) { + case ARRAY: + return struct.getList(fieldIndex, (SqlType.Array) fieldType); + case BOOL: + return struct.getBoolean(fieldIndex); + case BYTES: + return struct.getBytes(fieldIndex); + case DATE: + return struct.getDate(fieldIndex); + case FLOAT32: + return struct.getFloat(fieldIndex); + case FLOAT64: + return struct.getDouble(fieldIndex); + case INT64: + return struct.getLong(fieldIndex); + case MAP: + return struct.getMap(fieldIndex, (SqlType.Map) fieldType); + case STRING: + return struct.getString(fieldIndex); + case STRUCT: + return struct.getStruct(fieldIndex); + case TIMESTAMP: + return struct.getTimestamp(fieldIndex); + default: + throw new IllegalStateException("Unexpected Type: " + fieldType); + } + } + + private static Type toProtoType(SqlType type) { + switch (type.getCode()) { + case BYTES: + return Type.newBuilder().setBytesType(Bytes.getDefaultInstance()).build(); + case STRING: + return Type.newBuilder() + .setStringType(com.google.bigtable.v2.Type.String.getDefaultInstance()) + .build(); + case INT64: + return Type.newBuilder().setInt64Type(Int64.getDefaultInstance()).build(); + case FLOAT32: + return Type.newBuilder().setFloat32Type(Float32.getDefaultInstance()).build(); + case FLOAT64: + return Type.newBuilder().setFloat64Type(Float64.getDefaultInstance()).build(); + case BOOL: + return Type.newBuilder().setBoolType(Bool.getDefaultInstance()).build(); + case TIMESTAMP: + return Type.newBuilder().setTimestampType(Timestamp.getDefaultInstance()).build(); + case DATE: + return Type.newBuilder() + .setDateType(com.google.bigtable.v2.Type.Date.getDefaultInstance()) + .build(); + case ARRAY: + SqlType.Array arrayType = (SqlType.Array) type; + return Type.newBuilder() + .setArrayType( + Array.newBuilder().setElementType(toProtoType(arrayType.getElementType()))) + .build(); + case MAP: + SqlType.Map mapType = (SqlType.Map) type; + return Type.newBuilder() + .setMapType( + Map.newBuilder() + .setKeyType(toProtoType(mapType.getKeyType())) + .setValueType(toProtoType(mapType.getValueType()))) + .build(); + case STRUCT: + SqlType.Struct structType = (SqlType.Struct) type; + Struct.Builder structBuilder = Struct.newBuilder(); + for (SqlType.Struct.Field field : structType.getFields()) { + structBuilder + .addFieldsBuilder() + .setFieldName(field.name()) + .setType(toProtoType(field.type())); + } + return Type.newBuilder().setStructType(structBuilder.build()).build(); + + default: + throw new IllegalStateException("Unexpected Type: " + type); + } + } +} diff --git a/test-proxy/src/main/java/com/google/cloud/bigtable/testproxy/StatementDeserializer.java b/test-proxy/src/main/java/com/google/cloud/bigtable/testproxy/StatementDeserializer.java new file mode 100644 index 0000000000..ae3b50aa7f --- /dev/null +++ b/test-proxy/src/main/java/com/google/cloud/bigtable/testproxy/StatementDeserializer.java @@ -0,0 +1,167 @@ +/* + * Copyright 2024 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigtable.testproxy; + +import com.google.bigtable.v2.Value; +import com.google.bigtable.v2.Value.KindCase; +import com.google.cloud.Date; +import com.google.cloud.bigtable.data.v2.models.sql.SqlType; +import com.google.cloud.bigtable.data.v2.models.sql.Statement; +import com.google.protobuf.Timestamp; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import org.threeten.bp.Instant; + +public class StatementDeserializer { + + static Statement toStatement(ExecuteQueryRequest request) { + Statement.Builder statementBuilder = Statement.newBuilder(request.getRequest().getQuery()); + for (Map.Entry paramEntry : request.getRequest().getParamsMap().entrySet()) { + String name = paramEntry.getKey(); + Value value = paramEntry.getValue(); + switch (value.getType().getKindCase()) { + case BYTES_TYPE: + if (value.getKindCase().equals(KindCase.KIND_NOT_SET)) { + statementBuilder.setBytesParam(name, null); + } else if (value.getKindCase().equals(KindCase.BYTES_VALUE)) { + statementBuilder.setBytesParam(name, value.getBytesValue()); + } else { + throw new IllegalArgumentException("Unexpected bytes value: " + value); + } + break; + case STRING_TYPE: + if (value.getKindCase().equals(KindCase.KIND_NOT_SET)) { + statementBuilder.setStringParam(name, null); + } else if (value.getKindCase().equals(KindCase.STRING_VALUE)) { + statementBuilder.setStringParam(name, value.getStringValue()); + } else { + throw new IllegalArgumentException("Malformed string value: " + value); + } + break; + case INT64_TYPE: + if (value.getKindCase().equals(KindCase.KIND_NOT_SET)) { + statementBuilder.setLongParam(name, null); + } else if (value.getKindCase().equals(KindCase.INT_VALUE)) { + statementBuilder.setLongParam(name, value.getIntValue()); + } else { + throw new IllegalArgumentException("Malformed int64 value: " + value); + } + break; + case FLOAT32_TYPE: + if (value.getKindCase().equals(KindCase.KIND_NOT_SET)) { + statementBuilder.setFloatParam(name, null); + } else if (value.getKindCase().equals(KindCase.FLOAT_VALUE)) { + statementBuilder.setFloatParam(name, (float) value.getFloatValue()); + } else { + throw new IllegalArgumentException("Malformed float32 value: " + value); + } + break; + case FLOAT64_TYPE: + if (value.getKindCase().equals(KindCase.KIND_NOT_SET)) { + statementBuilder.setDoubleParam(name, null); + } else if (value.getKindCase().equals(KindCase.FLOAT_VALUE)) { + statementBuilder.setDoubleParam(name, value.getFloatValue()); + } else { + throw new IllegalArgumentException("Malformed float64 value: " + value); + } + break; + case BOOL_TYPE: + if (value.getKindCase().equals(KindCase.KIND_NOT_SET)) { + statementBuilder.setBooleanParam(name, null); + } else if (value.getKindCase().equals(KindCase.BOOL_VALUE)) { + statementBuilder.setBooleanParam(name, value.getBoolValue()); + } else { + throw new IllegalArgumentException("Malformed boolean value: " + value); + } + break; + case TIMESTAMP_TYPE: + if (value.getKindCase().equals(KindCase.KIND_NOT_SET)) { + statementBuilder.setTimestampParam(name, null); + } else if (value.getKindCase().equals(KindCase.TIMESTAMP_VALUE)) { + Timestamp ts = value.getTimestampValue(); + statementBuilder.setTimestampParam(name, toInstant(ts)); + } else { + throw new IllegalArgumentException("Malformed timestamp value: " + value); + } + break; + case DATE_TYPE: + if (value.getKindCase().equals(KindCase.KIND_NOT_SET)) { + statementBuilder.setDateParam(name, null); + } else if (value.getKindCase().equals(KindCase.DATE_VALUE)) { + com.google.type.Date protoDate = value.getDateValue(); + statementBuilder.setDateParam(name, fromProto(protoDate)); + } else { + throw new IllegalArgumentException("Malformed boolean value: " + value); + } + break; + case ARRAY_TYPE: + SqlType.Array sqlType = (SqlType.Array) SqlType.fromProto(value.getType()); + if (value.getKindCase().equals(KindCase.KIND_NOT_SET)) { + statementBuilder.setListParam(name, null, sqlType); + } else if (value.getKindCase().equals(KindCase.ARRAY_VALUE)) { + List array = new ArrayList<>(); + for (Value elem : value.getArrayValue().getValuesList()) { + array.add(decodeArrayElement(elem, sqlType.getElementType())); + } + statementBuilder.setListParam(name, array, sqlType); + } else { + throw new IllegalArgumentException("Malformed array value: " + value); + } + break; + default: + throw new IllegalArgumentException("Unexpected query param type in param: " + value); + } + } + return statementBuilder.build(); + } + + static Object decodeArrayElement(Value value, SqlType elemType) { + if (value.getKindCase().equals(KindCase.KIND_NOT_SET)) { + return null; + } + switch (elemType.getCode()) { + case BYTES: + return value.getBytesValue(); + case STRING: + return value.getStringValue(); + case INT64: + return value.getIntValue(); + case FLOAT64: + return value.getFloatValue(); + case FLOAT32: + // cast to float so we produce List, etc + return (float) value.getFloatValue(); + case BOOL: + return value.getBoolValue(); + case TIMESTAMP: + return toInstant(value.getTimestampValue()); + case DATE: + return fromProto(value.getDateValue()); + default: + // We should have already thrown an exception in the SqlRowMerger + throw new IllegalStateException("Unsupported array query param element type: " + elemType); + } + } + + private static Instant toInstant(Timestamp timestamp) { + return Instant.ofEpochSecond(timestamp.getSeconds(), timestamp.getNanos()); + } + + private static Date fromProto(com.google.type.Date proto) { + return Date.fromYearMonthDay(proto.getYear(), proto.getMonth(), proto.getDay()); + } +} diff --git a/test-proxy/src/main/proto/test_proxy.proto b/test-proxy/src/main/proto/test_proxy.proto index e7caef0e7b..753ca82cc0 100644 --- a/test-proxy/src/main/proto/test_proxy.proto +++ b/test-proxy/src/main/proto/test_proxy.proto @@ -1,4 +1,4 @@ -// Copyright 2023 Google LLC +// Copyright 2024 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -217,6 +217,42 @@ message ReadModifyWriteRowRequest { google.bigtable.v2.ReadModifyWriteRowRequest request = 2; } +// Request to test proxy service to execute a query. +message ExecuteQueryRequest { + // The ID of the target client object. + string client_id = 1; + + // The raw request to the Bigtable server. + google.bigtable.v2.ExecuteQueryRequest request = 2; +} + +// Response from test proxy service for ExecuteQueryRequest. +message ExecuteQueryResult { + // The RPC status from the client binding. + google.rpc.Status status = 1; + + // deprecated + google.bigtable.v2.ResultSetMetadata result_set_metadata = 2; + + // Name and type information for the query result. + ResultSetMetadata metadata = 4; + + // Encoded version of the ResultSet. Should not contain type information. + repeated SqlRow rows = 3; +} + +// Schema information for the query result. +message ResultSetMetadata { + // Column metadata for each column inthe query result. + repeated google.bigtable.v2.ColumnMetadata columns = 1; +} + +// Representation of a single row in the query result. +message SqlRow { + // Columnar values returned by the query. + repeated google.bigtable.v2.Value values = 1; +} + // Note that all RPCs are unary, even when the equivalent client binding call // may be streaming. This is an intentional simplification. // @@ -279,4 +315,7 @@ service CloudBigtableV2TestProxy { // Performs a read-modify-write operation with the client. rpc ReadModifyWriteRow(ReadModifyWriteRowRequest) returns (RowResult) {} + + // Executes a BTQL query with the client. + rpc ExecuteQuery(ExecuteQueryRequest) returns (ExecuteQueryResult) {} }