diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/BigtableChannelPrimer.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/BigtableChannelPrimer.java index 3fd6614a3..80e977f76 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/BigtableChannelPrimer.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/BigtableChannelPrimer.java @@ -16,17 +16,22 @@ package com.google.cloud.bigtable.data.v2.stub; import com.google.api.core.BetaApi; -import com.google.api.gax.core.InstantiatingExecutorProvider; +import com.google.api.core.SettableApiFuture; import com.google.api.gax.grpc.ChannelPrimer; -import com.google.api.gax.grpc.GrpcTransportChannel; -import com.google.api.gax.rpc.FixedTransportChannelProvider; -import com.google.api.gax.rpc.InstantiatingWatchdogProvider; +import com.google.api.gax.grpc.GrpcCallContext; +import com.google.bigtable.v2.BigtableGrpc; +import com.google.bigtable.v2.InstanceName; import com.google.bigtable.v2.PingAndWarmRequest; -import com.google.cloud.bigtable.data.v2.internal.NameUtil; -import com.google.common.base.Preconditions; +import com.google.bigtable.v2.PingAndWarmResponse; +import com.google.common.net.PercentEscaper; +import io.grpc.CallCredentials; +import io.grpc.ClientCall; import io.grpc.ManagedChannel; +import io.grpc.Metadata; +import io.grpc.Status; +import io.grpc.auth.MoreCallCredentials; import java.io.IOException; -import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; import java.util.logging.Logger; /** @@ -40,22 +45,46 @@ class BigtableChannelPrimer implements ChannelPrimer { private static Logger LOG = Logger.getLogger(BigtableChannelPrimer.class.toString()); - private final EnhancedBigtableStubSettings settingsTemplate; + private final Metadata.Key requestParams = + Metadata.Key.of("x-goog-request-params", Metadata.ASCII_STRING_MARSHALLER); + private final PingAndWarmRequest request; + private final Metadata metadata = new Metadata(); + private final CallCredentials credentials; - static BigtableChannelPrimer create(EnhancedBigtableStubSettings settings) { - EnhancedBigtableStubSettings.Builder builder = settings.toBuilder(); - builder - .setRefreshingChannel(false) - .setBackgroundExecutorProvider( - InstantiatingExecutorProvider.newBuilder().setExecutorThreadCount(1).build()) - .setStreamWatchdogProvider(InstantiatingWatchdogProvider.create()); - - return new BigtableChannelPrimer(builder.build()); + static BigtableChannelPrimer create(EnhancedBigtableStubSettings settings) throws IOException { + return new BigtableChannelPrimer(settings); } - private BigtableChannelPrimer(EnhancedBigtableStubSettings settingsTemplate) { - Preconditions.checkNotNull(settingsTemplate, "settingsTemplate can't be null"); - this.settingsTemplate = settingsTemplate; + BigtableChannelPrimer(EnhancedBigtableStubSettings settings) throws IOException { + String projectId = settings.getProjectId(); + String instanceId = settings.getInstanceId(); + String appProfileId = settings.getAppProfileId(); + + if (settings.getCredentialsProvider().getCredentials() != null) { + credentials = MoreCallCredentials.from(settings.getCredentialsProvider().getCredentials()); + } else { + credentials = null; + } + + request = + PingAndWarmRequest.newBuilder() + .setName(InstanceName.format(projectId, instanceId)) + .setAppProfileId(appProfileId) + .build(); + + PercentEscaper escaper = new PercentEscaper("._-~", false); + + Metadata metadata = new Metadata(); + settings + .getHeaderProvider() + .getHeaders() + .forEach((k, v) -> metadata.put(Metadata.Key.of(k, Metadata.ASCII_STRING_MARSHALLER), v)); + + metadata.put( + requestParams, + escaper.escape( + String.format( + "name=%s&app_profile_id=%s", request.getName(), request.getAppProfileId()))); } @Override @@ -72,35 +101,43 @@ private void primeChannelUnsafe(ManagedChannel managedChannel) throws IOExceptio sendPrimeRequests(managedChannel); } - private void sendPrimeRequests(ManagedChannel managedChannel) throws IOException { - // Wrap the channel in a temporary stub - EnhancedBigtableStubSettings primingSettings = - settingsTemplate - .toBuilder() - .setTransportChannelProvider( - FixedTransportChannelProvider.create(GrpcTransportChannel.create(managedChannel))) - .build(); + private void sendPrimeRequests(ManagedChannel managedChannel) { + try { + ClientCall clientCall = + managedChannel.newCall( + BigtableGrpc.getPingAndWarmMethod(), + GrpcCallContext.createDefault().getCallOptions().withCallCredentials(credentials)); + + SettableApiFuture future = SettableApiFuture.create(); + clientCall.start( + new ClientCall.Listener() { + PingAndWarmResponse response; + + @Override + public void onMessage(PingAndWarmResponse message) { + response = message; + } + + @Override + public void onClose(Status status, Metadata trailers) { + if (status.isOk()) { + future.set(response); + } else { + future.setException(status.asException()); + } + } + }, + metadata); + clientCall.sendMessage(request); + clientCall.halfClose(); + clientCall.request(1); - try (EnhancedBigtableStub stub = EnhancedBigtableStub.create(primingSettings)) { - PingAndWarmRequest request = - PingAndWarmRequest.newBuilder() - .setName( - NameUtil.formatInstanceName( - primingSettings.getProjectId(), primingSettings.getInstanceId())) - .setAppProfileId(primingSettings.getAppProfileId()) - .build(); - - try { - stub.pingAndWarmCallable().call(request); - } catch (Throwable e) { - // TODO: Not sure if we should swallow the error here. We are pre-emptively swapping - // channels if the new - // channel is bad. - if (e instanceof ExecutionException) { - e = e.getCause(); - } - LOG.warning(String.format("Failed to prime channel: %s", e)); - } + future.get(1, TimeUnit.MINUTES); + } catch (Throwable e) { + // TODO: Not sure if we should swallow the error here. We are pre-emptively swapping + // channels if the new + // channel is bad. + LOG.warning(String.format("Failed to prime channel: %s", e)); } } } diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java index 5cab91c92..cc9524397 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java @@ -188,7 +188,6 @@ public class EnhancedBigtableStub implements AutoCloseable { private final UnaryCallable externalBulkMutateRowsCallable; private final UnaryCallable checkAndMutateRowCallable; private final UnaryCallable readModifyWriteRowCallable; - private final UnaryCallable pingAndWarmCallable; private final ServerStreamingCallable generateInitialChangeStreamPartitionsCallable; @@ -321,7 +320,6 @@ public EnhancedBigtableStub( createGenerateInitialChangeStreamPartitionsCallable(); readChangeStreamCallable = createReadChangeStreamCallable(new DefaultChangeStreamRecordAdapter()); - pingAndWarmCallable = createPingAndWarmCallable(); executeQueryCallable = createExecuteQueryCallable(); } @@ -1252,28 +1250,6 @@ ServerStreamingCallSettings convertUnaryToServerStreamingSettings( .build(); } - private UnaryCallable createPingAndWarmCallable() { - UnaryCallable pingAndWarm = - GrpcRawCallableFactory.createUnaryCallable( - GrpcCallSettings.newBuilder() - .setMethodDescriptor(BigtableGrpc.getPingAndWarmMethod()) - .setParamsExtractor( - new RequestParamsExtractor() { - @Override - public Map extract(PingAndWarmRequest request) { - return ImmutableMap.of( - "name", request.getName(), - "app_profile_id", request.getAppProfileId()); - } - }) - .build(), - Collections.emptySet()); - return pingAndWarm.withDefaultCallContext( - clientContext - .getDefaultCallContext() - .withRetrySettings(settings.pingAndWarmSettings().getRetrySettings())); - } - private UnaryCallable withRetries( UnaryCallable innerCallable, UnaryCallSettings unaryCallSettings) { UnaryCallable retrying; @@ -1381,10 +1357,6 @@ public ExecuteQueryCallable executeQueryCallable() { return executeQueryCallable; } - UnaryCallable pingAndWarmCallable() { - return pingAndWarmCallable; - } - // private SpanName getSpanName(String methodName) {