Skip to content

Commit

Permalink
do not use the stub
Browse files Browse the repository at this point in the history
  • Loading branch information
mutianf committed Dec 2, 2024
1 parent 990330b commit dd6b7ce
Show file tree
Hide file tree
Showing 2 changed files with 85 additions and 76 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,22 @@
package com.google.cloud.bigtable.data.v2.stub;

import com.google.api.core.BetaApi;
import com.google.api.gax.core.InstantiatingExecutorProvider;
import com.google.api.core.SettableApiFuture;
import com.google.api.gax.grpc.ChannelPrimer;
import com.google.api.gax.grpc.GrpcTransportChannel;
import com.google.api.gax.rpc.FixedTransportChannelProvider;
import com.google.api.gax.rpc.InstantiatingWatchdogProvider;
import com.google.api.gax.grpc.GrpcCallContext;
import com.google.bigtable.v2.BigtableGrpc;
import com.google.bigtable.v2.InstanceName;
import com.google.bigtable.v2.PingAndWarmRequest;
import com.google.cloud.bigtable.data.v2.internal.NameUtil;
import com.google.common.base.Preconditions;
import com.google.bigtable.v2.PingAndWarmResponse;
import com.google.common.net.PercentEscaper;
import io.grpc.CallCredentials;
import io.grpc.ClientCall;
import io.grpc.ManagedChannel;
import io.grpc.Metadata;
import io.grpc.Status;
import io.grpc.auth.MoreCallCredentials;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;

/**
Expand All @@ -40,22 +45,46 @@
class BigtableChannelPrimer implements ChannelPrimer {
private static Logger LOG = Logger.getLogger(BigtableChannelPrimer.class.toString());

private final EnhancedBigtableStubSettings settingsTemplate;
private final Metadata.Key<String> requestParams =
Metadata.Key.of("x-goog-request-params", Metadata.ASCII_STRING_MARSHALLER);
private final PingAndWarmRequest request;
private final Metadata metadata = new Metadata();
private final CallCredentials credentials;

static BigtableChannelPrimer create(EnhancedBigtableStubSettings settings) {
EnhancedBigtableStubSettings.Builder builder = settings.toBuilder();
builder
.setRefreshingChannel(false)
.setBackgroundExecutorProvider(
InstantiatingExecutorProvider.newBuilder().setExecutorThreadCount(1).build())
.setStreamWatchdogProvider(InstantiatingWatchdogProvider.create());

return new BigtableChannelPrimer(builder.build());
static BigtableChannelPrimer create(EnhancedBigtableStubSettings settings) throws IOException {
return new BigtableChannelPrimer(settings);
}

private BigtableChannelPrimer(EnhancedBigtableStubSettings settingsTemplate) {
Preconditions.checkNotNull(settingsTemplate, "settingsTemplate can't be null");
this.settingsTemplate = settingsTemplate;
BigtableChannelPrimer(EnhancedBigtableStubSettings settings) throws IOException {
String projectId = settings.getProjectId();
String instanceId = settings.getInstanceId();
String appProfileId = settings.getAppProfileId();

if (settings.getCredentialsProvider().getCredentials() != null) {
credentials = MoreCallCredentials.from(settings.getCredentialsProvider().getCredentials());
} else {
credentials = null;
}

request =
PingAndWarmRequest.newBuilder()
.setName(InstanceName.format(projectId, instanceId))
.setAppProfileId(appProfileId)
.build();

PercentEscaper escaper = new PercentEscaper("._-~", false);

Metadata metadata = new Metadata();
settings
.getHeaderProvider()
.getHeaders()
.forEach((k, v) -> metadata.put(Metadata.Key.of(k, Metadata.ASCII_STRING_MARSHALLER), v));

metadata.put(
requestParams,
escaper.escape(
String.format(
"name=%s&app_profile_id=%s", request.getName(), request.getAppProfileId())));
}

@Override
Expand All @@ -72,35 +101,43 @@ private void primeChannelUnsafe(ManagedChannel managedChannel) throws IOExceptio
sendPrimeRequests(managedChannel);
}

private void sendPrimeRequests(ManagedChannel managedChannel) throws IOException {
// Wrap the channel in a temporary stub
EnhancedBigtableStubSettings primingSettings =
settingsTemplate
.toBuilder()
.setTransportChannelProvider(
FixedTransportChannelProvider.create(GrpcTransportChannel.create(managedChannel)))
.build();
private void sendPrimeRequests(ManagedChannel managedChannel) {
try {
ClientCall<PingAndWarmRequest, PingAndWarmResponse> clientCall =
managedChannel.newCall(
BigtableGrpc.getPingAndWarmMethod(),
GrpcCallContext.createDefault().getCallOptions().withCallCredentials(credentials));

SettableApiFuture<PingAndWarmResponse> future = SettableApiFuture.create();
clientCall.start(
new ClientCall.Listener<PingAndWarmResponse>() {
PingAndWarmResponse response;

@Override
public void onMessage(PingAndWarmResponse message) {
response = message;
}

@Override
public void onClose(Status status, Metadata trailers) {
if (status.isOk()) {
future.set(response);
} else {
future.setException(status.asException());
}
}
},
metadata);
clientCall.sendMessage(request);
clientCall.halfClose();
clientCall.request(1);

try (EnhancedBigtableStub stub = EnhancedBigtableStub.create(primingSettings)) {
PingAndWarmRequest request =
PingAndWarmRequest.newBuilder()
.setName(
NameUtil.formatInstanceName(
primingSettings.getProjectId(), primingSettings.getInstanceId()))
.setAppProfileId(primingSettings.getAppProfileId())
.build();

try {
stub.pingAndWarmCallable().call(request);
} catch (Throwable e) {
// TODO: Not sure if we should swallow the error here. We are pre-emptively swapping
// channels if the new
// channel is bad.
if (e instanceof ExecutionException) {
e = e.getCause();
}
LOG.warning(String.format("Failed to prime channel: %s", e));
}
future.get(1, TimeUnit.MINUTES);
} catch (Throwable e) {
// TODO: Not sure if we should swallow the error here. We are pre-emptively swapping
// channels if the new
// channel is bad.
LOG.warning(String.format("Failed to prime channel: %s", e));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,6 @@ public class EnhancedBigtableStub implements AutoCloseable {
private final UnaryCallable<BulkMutation, Void> externalBulkMutateRowsCallable;
private final UnaryCallable<ConditionalRowMutation, Boolean> checkAndMutateRowCallable;
private final UnaryCallable<ReadModifyWriteRow, Row> readModifyWriteRowCallable;
private final UnaryCallable<PingAndWarmRequest, PingAndWarmResponse> pingAndWarmCallable;

private final ServerStreamingCallable<String, ByteStringRange>
generateInitialChangeStreamPartitionsCallable;
Expand Down Expand Up @@ -321,7 +320,6 @@ public EnhancedBigtableStub(
createGenerateInitialChangeStreamPartitionsCallable();
readChangeStreamCallable =
createReadChangeStreamCallable(new DefaultChangeStreamRecordAdapter());
pingAndWarmCallable = createPingAndWarmCallable();
executeQueryCallable = createExecuteQueryCallable();
}

Expand Down Expand Up @@ -1252,28 +1250,6 @@ ServerStreamingCallSettings<ReqT, RespT> convertUnaryToServerStreamingSettings(
.build();
}

private UnaryCallable<PingAndWarmRequest, PingAndWarmResponse> createPingAndWarmCallable() {
UnaryCallable<PingAndWarmRequest, PingAndWarmResponse> pingAndWarm =
GrpcRawCallableFactory.createUnaryCallable(
GrpcCallSettings.<PingAndWarmRequest, PingAndWarmResponse>newBuilder()
.setMethodDescriptor(BigtableGrpc.getPingAndWarmMethod())
.setParamsExtractor(
new RequestParamsExtractor<PingAndWarmRequest>() {
@Override
public Map<String, String> extract(PingAndWarmRequest request) {
return ImmutableMap.of(
"name", request.getName(),
"app_profile_id", request.getAppProfileId());
}
})
.build(),
Collections.emptySet());
return pingAndWarm.withDefaultCallContext(
clientContext
.getDefaultCallContext()
.withRetrySettings(settings.pingAndWarmSettings().getRetrySettings()));
}

private <RequestT, ResponseT> UnaryCallable<RequestT, ResponseT> withRetries(
UnaryCallable<RequestT, ResponseT> innerCallable, UnaryCallSettings<?, ?> unaryCallSettings) {
UnaryCallable<RequestT, ResponseT> retrying;
Expand Down Expand Up @@ -1381,10 +1357,6 @@ public ExecuteQueryCallable executeQueryCallable() {
return executeQueryCallable;
}

UnaryCallable<PingAndWarmRequest, PingAndWarmResponse> pingAndWarmCallable() {
return pingAndWarmCallable;
}

// </editor-fold>

private SpanName getSpanName(String methodName) {
Expand Down

0 comments on commit dd6b7ce

Please sign in to comment.