diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/CookieInterceptor.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/CookieInterceptor.java new file mode 100644 index 0000000000..55df349bb9 --- /dev/null +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/CookieInterceptor.java @@ -0,0 +1,83 @@ +/* + * Copyright 2023 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigtable.data.v2.stub; + +import static com.google.cloud.bigtable.data.v2.stub.CookiesHolder.COOKIES_HOLDER_KEY; +import static com.google.cloud.bigtable.data.v2.stub.CookiesHolder.ROUTING_COOKIE_KEY; +import static com.google.cloud.bigtable.data.v2.stub.CookiesHolder.ROUTING_COOKIE_METADATA_KEY; + +import com.google.protobuf.ByteString; +import com.google.rpc.ErrorInfo; +import io.grpc.CallOptions; +import io.grpc.Channel; +import io.grpc.ClientCall; +import io.grpc.ClientInterceptor; +import io.grpc.ForwardingClientCall; +import io.grpc.ForwardingClientCallListener; +import io.grpc.Metadata; +import io.grpc.MethodDescriptor; +import io.grpc.Status; +import io.grpc.protobuf.ProtoUtils; + +/** + * A cookie interceptor that checks the cookie value from returned ErrorInfo, updates the cookie + * holder, and inject it in the header of the next request. + */ +class CookieInterceptor implements ClientInterceptor { + + static final Metadata.Key ERROR_INFO_KEY = + ProtoUtils.keyForProto(ErrorInfo.getDefaultInstance()); + + @Override + public ClientCall interceptCall( + MethodDescriptor methodDescriptor, CallOptions callOptions, Channel channel) { + return new ForwardingClientCall.SimpleForwardingClientCall( + channel.newCall(methodDescriptor, callOptions)) { + @Override + public void start(Listener responseListener, Metadata headers) { + CookiesHolder cookie = callOptions.getOption(COOKIES_HOLDER_KEY); + if (cookie != null && cookie.getRoutingCookie() != null) { + headers.put(ROUTING_COOKIE_METADATA_KEY, cookie.getRoutingCookie().toByteArray()); + } + super.start(new UpdateCookieListener<>(responseListener, callOptions), headers); + } + }; + } + + static class UpdateCookieListener + extends ForwardingClientCallListener.SimpleForwardingClientCallListener { + + private final CallOptions callOptions; + + UpdateCookieListener(ClientCall.Listener delegate, CallOptions callOptions) { + super(delegate); + this.callOptions = callOptions; + } + + @Override + public void onClose(Status status, Metadata trailers) { + if (status != Status.OK && trailers != null) { + ErrorInfo errorInfo = trailers.get(ERROR_INFO_KEY); + if (errorInfo != null) { + CookiesHolder cookieHolder = callOptions.getOption(COOKIES_HOLDER_KEY); + cookieHolder.setRoutingCookie( + ByteString.copyFromUtf8(errorInfo.getMetadataMap().get(ROUTING_COOKIE_KEY))); + } + } + super.onClose(status, trailers); + } + } +} diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/CookiesHolder.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/CookiesHolder.java new file mode 100644 index 0000000000..9a60de15dc --- /dev/null +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/CookiesHolder.java @@ -0,0 +1,43 @@ +/* + * Copyright 2023 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigtable.data.v2.stub; + +import com.google.protobuf.ByteString; +import io.grpc.CallOptions; +import io.grpc.Metadata; +import javax.annotation.Nullable; + +/** A cookie that holds information for the retry */ +class CookiesHolder { + + static final CallOptions.Key COOKIES_HOLDER_KEY = + CallOptions.Key.create("bigtable-cookies"); + + static final String ROUTING_COOKIE_KEY = "bigtable-routing-cookie"; + + static final Metadata.Key ROUTING_COOKIE_METADATA_KEY = + Metadata.Key.of("bigtable-routing-cookie-bin", Metadata.BINARY_BYTE_MARSHALLER); + + @Nullable private ByteString routingCookie; + + void setRoutingCookie(@Nullable ByteString routingCookie) { + this.routingCookie = routingCookie; + } + + ByteString getRoutingCookie() { + return this.routingCookie; + } +} diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/CookiesServerStreamingCallable.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/CookiesServerStreamingCallable.java new file mode 100644 index 0000000000..1d897f18cf --- /dev/null +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/CookiesServerStreamingCallable.java @@ -0,0 +1,44 @@ +/* + * Copyright 2023 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigtable.data.v2.stub; + +import static com.google.cloud.bigtable.data.v2.stub.CookiesHolder.COOKIES_HOLDER_KEY; + +import com.google.api.gax.grpc.GrpcCallContext; +import com.google.api.gax.rpc.ApiCallContext; +import com.google.api.gax.rpc.ResponseObserver; +import com.google.api.gax.rpc.ServerStreamingCallable; + +public class CookiesServerStreamingCallable + extends ServerStreamingCallable { + + private final ServerStreamingCallable callable; + + CookiesServerStreamingCallable(ServerStreamingCallable innerCallable) { + this.callable = innerCallable; + } + + @Override + public void call( + RequestT request, ResponseObserver responseObserver, ApiCallContext context) { + GrpcCallContext grpcCallContext = (GrpcCallContext) context; + callable.call( + request, + responseObserver, + grpcCallContext.withCallOptions( + grpcCallContext.getCallOptions().withOption(COOKIES_HOLDER_KEY, new CookiesHolder()))); + } +} diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/CookiesUnaryCallable.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/CookiesUnaryCallable.java new file mode 100644 index 0000000000..5e0b33d03a --- /dev/null +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/CookiesUnaryCallable.java @@ -0,0 +1,41 @@ +/* + * Copyright 2023 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigtable.data.v2.stub; + +import static com.google.cloud.bigtable.data.v2.stub.CookiesHolder.COOKIES_HOLDER_KEY; + +import com.google.api.core.ApiFuture; +import com.google.api.gax.grpc.GrpcCallContext; +import com.google.api.gax.rpc.ApiCallContext; +import com.google.api.gax.rpc.UnaryCallable; + +/** Cookie callable injects a placeholder for bigtable retry cookie. */ +class CookiesUnaryCallable extends UnaryCallable { + private UnaryCallable innerCallable; + + CookiesUnaryCallable(UnaryCallable callable) { + this.innerCallable = callable; + } + + @Override + public ApiFuture futureCall(RequestT request, ApiCallContext context) { + GrpcCallContext grpcCallContext = (GrpcCallContext) context; + return innerCallable.futureCall( + request, + grpcCallContext.withCallOptions( + grpcCallContext.getCallOptions().withOption(COOKIES_HOLDER_KEY, new CookiesHolder()))); + } +} diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java index 474c140392..3ab9957259 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java @@ -365,7 +365,9 @@ public ServerStreamingCallable createReadRowsCallable( new TracedServerStreamingCallable<>( readRowsUserCallable, clientContext.getTracerFactory(), span); - return traced.withDefaultCallContext(clientContext.getDefaultCallContext()); + ServerStreamingCallable withCookie = new CookiesServerStreamingCallable<>(traced); + + return withCookie.withDefaultCallContext(clientContext.getDefaultCallContext()); } /** @@ -401,7 +403,9 @@ public UnaryCallable createReadRowCallable(RowAdapter new TracedUnaryCallable<>( firstRow, clientContext.getTracerFactory(), getSpanName("ReadRow")); - return traced.withDefaultCallContext(clientContext.getDefaultCallContext()); + UnaryCallable withCookie = new CookiesUnaryCallable<>(traced); + + return withCookie.withDefaultCallContext(clientContext.getDefaultCallContext()); } /** @@ -1013,7 +1017,9 @@ private UnaryCallable createUserFacin UnaryCallable traced = new TracedUnaryCallable<>(inner, clientContext.getTracerFactory(), getSpanName(methodName)); - return traced.withDefaultCallContext(clientContext.getDefaultCallContext()); + UnaryCallable withCookie = new CookiesUnaryCallable<>(traced); + + return withCookie.withDefaultCallContext(clientContext.getDefaultCallContext()); } private UnaryCallable createPingAndWarmCallable() { diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettings.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettings.java index fd54daa0d5..365b2a9205 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettings.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettings.java @@ -329,7 +329,8 @@ public static InstantiatingGrpcChannelProvider.Builder defaultGrpcTransportProvi Duration.ofSeconds(10)) // wait this long before considering the connection dead // Attempts direct access to CBT service over gRPC to improve throughput, // whether the attempt is allowed is totally controlled by service owner. - .setAttemptDirectPath(true); + .setAttemptDirectPath(true) + .setInterceptorProvider(() -> ImmutableList.of(new CookieInterceptor())); } @SuppressWarnings("WeakerAccess") diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/CookieHolderTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/CookieHolderTest.java new file mode 100644 index 0000000000..5321144f95 --- /dev/null +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/CookieHolderTest.java @@ -0,0 +1,131 @@ +/* + * Copyright 2023 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigtable.data.v2.stub; + +import static com.google.cloud.bigtable.data.v2.stub.CookieInterceptor.ERROR_INFO_KEY; +import static com.google.cloud.bigtable.data.v2.stub.CookiesHolder.ROUTING_COOKIE_KEY; +import static com.google.cloud.bigtable.data.v2.stub.CookiesHolder.ROUTING_COOKIE_METADATA_KEY; +import static com.google.common.truth.Truth.assertThat; + +import com.google.api.gax.grpc.InstantiatingGrpcChannelProvider; +import com.google.bigtable.v2.BigtableGrpc; +import com.google.bigtable.v2.MutateRowRequest; +import com.google.bigtable.v2.MutateRowResponse; +import com.google.cloud.bigtable.data.v2.BigtableDataClient; +import com.google.cloud.bigtable.data.v2.BigtableDataSettings; +import com.google.cloud.bigtable.data.v2.FakeServiceBuilder; +import com.google.cloud.bigtable.data.v2.models.RowMutation; +import com.google.common.collect.ImmutableList; +import com.google.rpc.ErrorInfo; +import io.grpc.Metadata; +import io.grpc.Server; +import io.grpc.ServerCall; +import io.grpc.ServerCallHandler; +import io.grpc.ServerInterceptor; +import io.grpc.Status; +import io.grpc.StatusRuntimeException; +import io.grpc.stub.StreamObserver; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(JUnit4.class) +public class CookieHolderTest { + private Server server; + private FakeService fakeService = new FakeService(); + + private BigtableDataClient client; + + private List serverMetadata = new ArrayList<>(); + + @Before + public void setup() throws Exception { + ServerInterceptor serverInterceptor = + new ServerInterceptor() { + @Override + public ServerCall.Listener interceptCall( + ServerCall serverCall, + Metadata metadata, + ServerCallHandler serverCallHandler) { + serverMetadata.add(metadata); + return serverCallHandler.startCall(serverCall, metadata); + } + }; + + server = FakeServiceBuilder.create(fakeService).intercept(serverInterceptor).start(); + + BigtableDataSettings.Builder settings = + BigtableDataSettings.newBuilderForEmulator(server.getPort()) + .setProjectId("fake-project") + .setInstanceId("fake-instance"); + + InstantiatingGrpcChannelProvider channelProvider = + (InstantiatingGrpcChannelProvider) settings.stubSettings().getTransportChannelProvider(); + settings + .stubSettings() + .setTransportChannelProvider( + channelProvider + .toBuilder() + .setInterceptorProvider(() -> ImmutableList.of(new CookieInterceptor())) + .build()); + + client = BigtableDataClient.create(settings.build()); + } + + @Test + public void testRetryCookieIsForwarded() { + client.mutateRow(RowMutation.create("fake-table", "fake-row").setCell("cf", "q", "v")); + + assertThat(serverMetadata.size()).isEqualTo(fakeService.count.get()); + byte[] bytes = serverMetadata.get(1).get(ROUTING_COOKIE_METADATA_KEY); + assertThat(new String(bytes, StandardCharsets.UTF_8)).isEqualTo("test-routing-cookie"); + + serverMetadata.clear(); + } + + @After + public void tearDown() throws Exception { + client.close(); + server.shutdown(); + } + + class FakeService extends BigtableGrpc.BigtableImplBase { + + private AtomicInteger count = new AtomicInteger(); + + @Override + public void mutateRow( + MutateRowRequest request, StreamObserver responseObserver) { + if (count.getAndIncrement() < 1) { + Metadata trailers = new Metadata(); + ErrorInfo errorInfo = + ErrorInfo.newBuilder().putMetadata(ROUTING_COOKIE_KEY, "test-routing-cookie").build(); + trailers.put(ERROR_INFO_KEY, errorInfo); + StatusRuntimeException exception = new StatusRuntimeException(Status.UNAVAILABLE, trailers); + responseObserver.onError(exception); + return; + } + responseObserver.onNext(MutateRowResponse.getDefaultInstance()); + responseObserver.onCompleted(); + } + } +}