Skip to content

Commit

Permalink
fix: send priming requests on the channel directly (#2435)
Browse files Browse the repository at this point in the history
Send priming requests on the channel instead of using the stub. This means that we'll not collect metrics on ping and warm requests.

Fixes #2371 ☕️

If you write sample code, please follow the [samples format](
https://togithub.com/GoogleCloudPlatform/java-docs-samples/blob/main/SAMPLE_FORMAT.md).
  • Loading branch information
mutianf authored Dec 3, 2024
1 parent 73557c3 commit b76698d
Show file tree
Hide file tree
Showing 5 changed files with 128 additions and 92 deletions.
1 change: 0 additions & 1 deletion google-cloud-bigtable/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -709,7 +709,6 @@
grpc-auth is not directly used transitively, but is pulled to align with other grpc parts
opencensus-impl-core is brought in transitively through opencensus-impl
-->
<usedDependencies>io.grpc:grpc-auth</usedDependencies>
<ignoredUsedUndeclaredDependencies>
<ignoredUsedUndeclaredDependency>io.opencensus:opencensus-impl-core</ignoredUsedUndeclaredDependency>
</ignoredUsedUndeclaredDependencies>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,27 @@
package com.google.cloud.bigtable.data.v2.stub;

import com.google.api.core.BetaApi;
import com.google.api.gax.core.FixedCredentialsProvider;
import com.google.api.gax.core.InstantiatingExecutorProvider;
import com.google.api.core.SettableApiFuture;
import com.google.api.gax.grpc.ChannelPrimer;
import com.google.api.gax.grpc.GrpcTransportChannel;
import com.google.api.gax.rpc.FixedTransportChannelProvider;
import com.google.auth.Credentials;
import com.google.bigtable.v2.BigtableGrpc;
import com.google.bigtable.v2.InstanceName;
import com.google.bigtable.v2.PingAndWarmRequest;
import com.google.cloud.bigtable.data.v2.internal.NameUtil;
import com.google.common.base.Preconditions;
import com.google.bigtable.v2.PingAndWarmResponse;
import io.grpc.CallCredentials;
import io.grpc.CallOptions;
import io.grpc.ClientCall;
import io.grpc.Deadline;
import io.grpc.ManagedChannel;
import io.grpc.Metadata;
import io.grpc.Status;
import io.grpc.auth.MoreCallCredentials;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
Expand All @@ -41,72 +50,113 @@
class BigtableChannelPrimer implements ChannelPrimer {
private static Logger LOG = Logger.getLogger(BigtableChannelPrimer.class.toString());

private final EnhancedBigtableStubSettings settingsTemplate;
static final Metadata.Key<String> REQUEST_PARAMS =
Metadata.Key.of("x-goog-request-params", Metadata.ASCII_STRING_MARSHALLER);
private final PingAndWarmRequest request;
private final CallCredentials callCredentials;
private final Map<String, String> headers;

static BigtableChannelPrimer create(
Credentials credentials, String projectId, String instanceId, String appProfileId) {
EnhancedBigtableStubSettings.Builder builder =
EnhancedBigtableStubSettings.newBuilder()
.setProjectId(projectId)
.setInstanceId(instanceId)
.setAppProfileId(appProfileId)
.setCredentialsProvider(FixedCredentialsProvider.create(credentials))
// Disable refreshing channel here to avoid creating settings in a loop
.setRefreshingChannel(false)
.setExecutorProvider(
InstantiatingExecutorProvider.newBuilder().setExecutorThreadCount(1).build());

return new BigtableChannelPrimer(builder.build());
String projectId,
String instanceId,
String appProfileId,
Credentials credentials,
Map<String, String> headers) {
return new BigtableChannelPrimer(projectId, instanceId, appProfileId, credentials, headers);
}

private BigtableChannelPrimer(EnhancedBigtableStubSettings settingsTemplate) {
Preconditions.checkNotNull(settingsTemplate, "settingsTemplate can't be null");
this.settingsTemplate = settingsTemplate;
BigtableChannelPrimer(
String projectId,
String instanceId,
String appProfileId,
Credentials credentials,
Map<String, String> headers) {
if (credentials != null) {
callCredentials = MoreCallCredentials.from(credentials);
} else {
callCredentials = null;
}

request =
PingAndWarmRequest.newBuilder()
.setName(InstanceName.format(projectId, instanceId))
.setAppProfileId(appProfileId)
.build();

this.headers = headers;
}

@Override
public void primeChannel(ManagedChannel managedChannel) {
try {
primeChannelUnsafe(managedChannel);
} catch (IOException | RuntimeException e) {
LOG.warning(
String.format("Unexpected error while trying to prime a channel: %s", e.getMessage()));
LOG.log(Level.WARNING, "Unexpected error while trying to prime a channel", e);
}
}

private void primeChannelUnsafe(ManagedChannel managedChannel) throws IOException {
sendPrimeRequests(managedChannel);
}

private void sendPrimeRequests(ManagedChannel managedChannel) throws IOException {
// Wrap the channel in a temporary stub
EnhancedBigtableStubSettings primingSettings =
settingsTemplate
.toBuilder()
.setTransportChannelProvider(
FixedTransportChannelProvider.create(GrpcTransportChannel.create(managedChannel)))
.build();
private void sendPrimeRequests(ManagedChannel managedChannel) {
try {
ClientCall<PingAndWarmRequest, PingAndWarmResponse> clientCall =
managedChannel.newCall(
BigtableGrpc.getPingAndWarmMethod(),
CallOptions.DEFAULT
.withCallCredentials(callCredentials)
.withDeadline(Deadline.after(1, TimeUnit.MINUTES)));

try (EnhancedBigtableStub stub = EnhancedBigtableStub.create(primingSettings)) {
PingAndWarmRequest request =
PingAndWarmRequest.newBuilder()
.setName(
NameUtil.formatInstanceName(
primingSettings.getProjectId(), primingSettings.getInstanceId()))
.setAppProfileId(primingSettings.getAppProfileId())
.build();

try {
stub.pingAndWarmCallable().call(request);
} catch (Throwable e) {
// TODO: Not sure if we should swallow the error here. We are pre-emptively swapping
// channels if the new
// channel is bad.
if (e instanceof ExecutionException) {
e = e.getCause();
}
LOG.warning(String.format("Failed to prime channel: %s", e));
}
SettableApiFuture<PingAndWarmResponse> future = SettableApiFuture.create();
clientCall.start(
new ClientCall.Listener<PingAndWarmResponse>() {
PingAndWarmResponse response;

@Override
public void onMessage(PingAndWarmResponse message) {
response = message;
}

@Override
public void onClose(Status status, Metadata trailers) {
if (status.isOk()) {
future.set(response);
} else {
future.setException(status.asException());
}
}
},
createMetadata(headers, request));
clientCall.sendMessage(request);
clientCall.halfClose();
clientCall.request(Integer.MAX_VALUE);

future.get(1, TimeUnit.MINUTES);
} catch (Throwable e) {
// TODO: Not sure if we should swallow the error here. We are pre-emptively swapping
// channels if the new
// channel is bad.
LOG.log(Level.WARNING, "Failed to prime channel", e);
}
}

private static Metadata createMetadata(Map<String, String> headers, PingAndWarmRequest request) {
Metadata metadata = new Metadata();

headers.forEach(
(k, v) -> metadata.put(Metadata.Key.of(k, Metadata.ASCII_STRING_MARSHALLER), v));
try {
metadata.put(
REQUEST_PARAMS,
String.format(
"name=%s&app_profile_id=%s",
URLEncoder.encode(request.getName(), "UTF-8"),
URLEncoder.encode(request.getAppProfileId(), "UTF-8")));
} catch (UnsupportedEncodingException e) {
LOG.log(Level.WARNING, "Failed to encode request params", e);
}

return metadata;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -100,10 +100,11 @@ public static BigtableClientContext create(EnhancedBigtableStubSettings settings
if (builder.isRefreshingChannel()) {
transportProvider.setChannelPrimer(
BigtableChannelPrimer.create(
builder.getProjectId(),
builder.getInstanceId(),
builder.getAppProfileId(),
credentials,
settings.getProjectId(),
settings.getInstanceId(),
settings.getAppProfileId()));
builder.getHeaderProvider().getHeaders()));
}

builder.setTransportChannelProvider(transportProvider.build());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,6 @@
import com.google.bigtable.v2.GenerateInitialChangeStreamPartitionsResponse;
import com.google.bigtable.v2.MutateRowsRequest;
import com.google.bigtable.v2.MutateRowsResponse;
import com.google.bigtable.v2.PingAndWarmRequest;
import com.google.bigtable.v2.PingAndWarmResponse;
import com.google.bigtable.v2.ReadChangeStreamRequest;
import com.google.bigtable.v2.ReadChangeStreamResponse;
import com.google.bigtable.v2.ReadRowsRequest;
Expand Down Expand Up @@ -188,7 +186,6 @@ public class EnhancedBigtableStub implements AutoCloseable {
private final UnaryCallable<BulkMutation, Void> externalBulkMutateRowsCallable;
private final UnaryCallable<ConditionalRowMutation, Boolean> checkAndMutateRowCallable;
private final UnaryCallable<ReadModifyWriteRow, Row> readModifyWriteRowCallable;
private final UnaryCallable<PingAndWarmRequest, PingAndWarmResponse> pingAndWarmCallable;

private final ServerStreamingCallable<String, ByteStringRange>
generateInitialChangeStreamPartitionsCallable;
Expand Down Expand Up @@ -321,7 +318,6 @@ public EnhancedBigtableStub(
createGenerateInitialChangeStreamPartitionsCallable();
readChangeStreamCallable =
createReadChangeStreamCallable(new DefaultChangeStreamRecordAdapter());
pingAndWarmCallable = createPingAndWarmCallable();
executeQueryCallable = createExecuteQueryCallable();
}

Expand Down Expand Up @@ -1252,28 +1248,6 @@ ServerStreamingCallSettings<ReqT, RespT> convertUnaryToServerStreamingSettings(
.build();
}

private UnaryCallable<PingAndWarmRequest, PingAndWarmResponse> createPingAndWarmCallable() {
UnaryCallable<PingAndWarmRequest, PingAndWarmResponse> pingAndWarm =
GrpcRawCallableFactory.createUnaryCallable(
GrpcCallSettings.<PingAndWarmRequest, PingAndWarmResponse>newBuilder()
.setMethodDescriptor(BigtableGrpc.getPingAndWarmMethod())
.setParamsExtractor(
new RequestParamsExtractor<PingAndWarmRequest>() {
@Override
public Map<String, String> extract(PingAndWarmRequest request) {
return ImmutableMap.of(
"name", request.getName(),
"app_profile_id", request.getAppProfileId());
}
})
.build(),
Collections.emptySet());
return pingAndWarm.withDefaultCallContext(
clientContext
.getDefaultCallContext()
.withRetrySettings(settings.pingAndWarmSettings().getRetrySettings()));
}

private <RequestT, ResponseT> UnaryCallable<RequestT, ResponseT> withRetries(
UnaryCallable<RequestT, ResponseT> innerCallable, UnaryCallSettings<?, ?> unaryCallSettings) {
UnaryCallable<RequestT, ResponseT> retrying;
Expand Down Expand Up @@ -1381,10 +1355,6 @@ public ExecuteQueryCallable executeQueryCallable() {
return executeQueryCallable;
}

UnaryCallable<PingAndWarmRequest, PingAndWarmResponse> pingAndWarmCallable() {
return pingAndWarmCallable;
}

// </editor-fold>

private SpanName getSpanName(String methodName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.google.bigtable.v2.PingAndWarmRequest;
import com.google.bigtable.v2.PingAndWarmResponse;
import com.google.cloud.bigtable.data.v2.FakeServiceBuilder;
import com.google.common.collect.ImmutableMap;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.Metadata;
Expand Down Expand Up @@ -69,10 +70,11 @@ public void setup() throws IOException {

primer =
BigtableChannelPrimer.create(
OAuth2Credentials.create(new AccessToken(TOKEN_VALUE, null)),
"fake-project",
"fake-instance",
"fake-app-profile");
"fake-app-profile",
OAuth2Credentials.create(new AccessToken(TOKEN_VALUE, null)),
ImmutableMap.of("bigtable-feature", "fake-feature"));

channel =
ManagedChannelBuilder.forAddress("localhost", server.getPort()).usePlaintext().build();
Expand Down Expand Up @@ -133,7 +135,7 @@ public PingAndWarmResponse apply(PingAndWarmRequest pingAndWarmRequest) {

assertThat(logHandler.logs).hasSize(1);
for (LogRecord log : logHandler.logs) {
assertThat(log.getMessage()).contains("FAILED_PRECONDITION");
assertThat(log.getThrown().getMessage()).contains("FAILED_PRECONDITION");
}
}

Expand All @@ -146,7 +148,21 @@ public void testChannelErrorsAreLogged() {

assertThat(logHandler.logs).hasSize(1);
for (LogRecord log : logHandler.logs) {
assertThat(log.getMessage()).contains("UnsupportedOperationException");
assertThat(log.getThrown()).isInstanceOf(UnsupportedOperationException.class);
}
}

@Test
public void testHeadersAreSent() {
primer.primeChannel(channel);

for (Metadata metadata : metadataInterceptor.metadataList) {
assertThat(metadata.get(BigtableChannelPrimer.REQUEST_PARAMS))
.isEqualTo(
"name=projects%2Ffake-project%2Finstances%2Ffake-instance&app_profile_id=fake-app-profile");
assertThat(
metadata.get(Metadata.Key.of("bigtable-feature", Metadata.ASCII_STRING_MARSHALLER)))
.isEqualTo("fake-feature");
}
}

Expand Down

0 comments on commit b76698d

Please sign in to comment.