Skip to content

Commit

Permalink
chore: refactor to wrap client context in BigtableClientContext (#2433)
Browse files Browse the repository at this point in the history
Refactor ClientContext creation.

We need to create OpenTelemetry before client context is created so we can inject the PerConnectionErrorTracker interceptor on the ManagedChannel.
We need to access the open telemetry instance later when we create the TracerFactory.

This PR creates a new BigtableCleintContext class that wraps gax ClientContext and OpenTelemetry so we can access both later to avoid creating a global open telemetry instance.

Also moved client context creation logic from EnhancedBigtableStub to BigtableClientContext.
  • Loading branch information
mutianf authored Nov 27, 2024
1 parent 3156889 commit bfa156d
Show file tree
Hide file tree
Showing 6 changed files with 283 additions and 239 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,10 @@
package com.google.cloud.bigtable.data.v2;

import com.google.api.core.BetaApi;
import com.google.api.gax.core.BackgroundResource;
import com.google.api.gax.rpc.ClientContext;
import com.google.cloud.bigtable.data.v2.stub.BigtableClientContext;
import com.google.cloud.bigtable.data.v2.stub.EnhancedBigtableStub;
import io.opentelemetry.api.OpenTelemetry;
import java.io.IOException;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nonnull;

/**
Expand Down Expand Up @@ -66,11 +63,8 @@
@BetaApi("This feature is currently experimental and can change in the future")
public final class BigtableDataClientFactory implements AutoCloseable {

private static final Logger logger = Logger.getLogger(BigtableDataClientFactory.class.getName());

private final BigtableDataSettings defaultSettings;
private final ClientContext sharedClientContext;
private final OpenTelemetry openTelemetry;
private final BigtableClientContext sharedClientContext;

/**
* Create a instance of this factory.
Expand All @@ -80,31 +74,16 @@ public final class BigtableDataClientFactory implements AutoCloseable {
*/
public static BigtableDataClientFactory create(BigtableDataSettings defaultSettings)
throws IOException {
ClientContext sharedClientContext =
EnhancedBigtableStub.createClientContext(defaultSettings.getStubSettings());
OpenTelemetry openTelemetry = null;
try {
// We don't want client side metrics to crash the client, so catch any exception when getting
// the OTEL instance and log the exception instead.
openTelemetry =
EnhancedBigtableStub.getOpenTelemetry(
defaultSettings.getProjectId(),
defaultSettings.getMetricsProvider(),
sharedClientContext.getCredentials(),
defaultSettings.getStubSettings().getMetricsEndpoint());
} catch (Throwable t) {
logger.log(Level.WARNING, "Failed to get OTEL, will skip exporting client side metrics", t);
}
return new BigtableDataClientFactory(sharedClientContext, defaultSettings, openTelemetry);
BigtableClientContext sharedClientContext =
EnhancedBigtableStub.createBigtableClientContext(defaultSettings.getStubSettings());

return new BigtableDataClientFactory(sharedClientContext, defaultSettings);
}

private BigtableDataClientFactory(
ClientContext sharedClientContext,
BigtableDataSettings defaultSettings,
OpenTelemetry openTelemetry) {
BigtableClientContext sharedClientContext, BigtableDataSettings defaultSettings) {
this.sharedClientContext = sharedClientContext;
this.defaultSettings = defaultSettings;
this.openTelemetry = openTelemetry;
}

/**
Expand All @@ -114,9 +93,7 @@ private BigtableDataClientFactory(
*/
@Override
public void close() throws Exception {
for (BackgroundResource resource : sharedClientContext.getBackgroundResources()) {
resource.close();
}
sharedClientContext.close();
}

/**
Expand All @@ -132,10 +109,11 @@ public BigtableDataClient createDefault() {
try {
ClientContext clientContext =
sharedClientContext
.getClientContext()
.toBuilder()
.setTracerFactory(
EnhancedBigtableStub.createBigtableTracerFactory(
defaultSettings.getStubSettings(), openTelemetry))
defaultSettings.getStubSettings(), sharedClientContext.getOpenTelemetry()))
.build();

return BigtableDataClient.createWithClientContext(defaultSettings, clientContext);
Expand All @@ -161,10 +139,11 @@ public BigtableDataClient createForAppProfile(@Nonnull String appProfileId) thro

ClientContext clientContext =
sharedClientContext
.getClientContext()
.toBuilder()
.setTracerFactory(
EnhancedBigtableStub.createBigtableTracerFactory(
settings.getStubSettings(), openTelemetry))
settings.getStubSettings(), sharedClientContext.getOpenTelemetry()))
.build();
return BigtableDataClient.createWithClientContext(settings, clientContext);
}
Expand All @@ -190,10 +169,11 @@ public BigtableDataClient createForInstance(@Nonnull String projectId, @Nonnull

ClientContext clientContext =
sharedClientContext
.getClientContext()
.toBuilder()
.setTracerFactory(
EnhancedBigtableStub.createBigtableTracerFactory(
settings.getStubSettings(), openTelemetry))
settings.getStubSettings(), sharedClientContext.getOpenTelemetry()))
.build();

return BigtableDataClient.createWithClientContext(settings, clientContext);
Expand All @@ -220,10 +200,11 @@ public BigtableDataClient createForInstance(
.build();
ClientContext clientContext =
sharedClientContext
.getClientContext()
.toBuilder()
.setTracerFactory(
EnhancedBigtableStub.createBigtableTracerFactory(
settings.getStubSettings(), openTelemetry))
settings.getStubSettings(), sharedClientContext.getOpenTelemetry()))
.build();
return BigtableDataClient.createWithClientContext(settings, clientContext);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,239 @@
/*
* Copyright 2024 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.bigtable.data.v2.stub;

import com.google.api.core.ApiFunction;
import com.google.api.core.InternalApi;
import com.google.api.gax.core.BackgroundResource;
import com.google.api.gax.core.CredentialsProvider;
import com.google.api.gax.core.FixedCredentialsProvider;
import com.google.api.gax.grpc.InstantiatingGrpcChannelProvider;
import com.google.api.gax.rpc.ClientContext;
import com.google.auth.Credentials;
import com.google.auth.oauth2.ServiceAccountJwtAccessCredentials;
import com.google.cloud.bigtable.data.v2.BigtableDataSettings;
import com.google.cloud.bigtable.data.v2.internal.JwtCredentialsWithAudience;
import com.google.cloud.bigtable.data.v2.stub.metrics.CustomOpenTelemetryMetricsProvider;
import com.google.cloud.bigtable.data.v2.stub.metrics.DefaultMetricsProvider;
import com.google.cloud.bigtable.data.v2.stub.metrics.ErrorCountPerConnectionMetricTracker;
import com.google.cloud.bigtable.data.v2.stub.metrics.MetricsProvider;
import com.google.cloud.bigtable.data.v2.stub.metrics.NoopMetricsProvider;
import io.grpc.ManagedChannelBuilder;
import io.opentelemetry.api.OpenTelemetry;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nullable;

/**
* This class wraps all state needed during the lifetime of the Bigtable client. This includes gax's
* {@link ClientContext} plus any additional state that Bigtable Client needs.
*/
@InternalApi
public class BigtableClientContext {

private static final Logger logger = Logger.getLogger(BigtableClientContext.class.getName());

@Nullable private final OpenTelemetry openTelemetry;
private final ClientContext clientContext;

public static BigtableClientContext create(EnhancedBigtableStubSettings settings)
throws IOException {
EnhancedBigtableStubSettings.Builder builder = settings.toBuilder();

// Set up credentials
patchCredentials(builder);

// Fix the credentials so that they can be shared
Credentials credentials = null;
if (builder.getCredentialsProvider() != null) {
credentials = builder.getCredentialsProvider().getCredentials();
}
builder.setCredentialsProvider(FixedCredentialsProvider.create(credentials));

// Set up OpenTelemetry
OpenTelemetry openTelemetry = null;
try {
// We don't want client side metrics to crash the client, so catch any exception when getting
// the OTEL instance and log the exception instead.
// TODO openTelemetry doesn't need to be tied to a project id. This is incorrect and will be
// fixed in the following PR.
openTelemetry =
getOpenTelemetryFromMetricsProvider(
settings.getProjectId(),
settings.getMetricsProvider(),
credentials,
settings.getMetricsEndpoint());
} catch (Throwable t) {
logger.log(Level.WARNING, "Failed to get OTEL, will skip exporting client side metrics", t);
}

// Set up channel
InstantiatingGrpcChannelProvider.Builder transportProvider =
builder.getTransportChannelProvider() instanceof InstantiatingGrpcChannelProvider
? ((InstantiatingGrpcChannelProvider) builder.getTransportChannelProvider()).toBuilder()
: null;

ErrorCountPerConnectionMetricTracker errorCountPerConnectionMetricTracker = null;

if (transportProvider != null) {
// Set up cookie holder if routing cookie is enabled
if (builder.getEnableRoutingCookie()) {
setupCookieHolder(transportProvider);
}
// Set up per connection error count tracker if OpenTelemetry is not null
if (openTelemetry != null) {
errorCountPerConnectionMetricTracker =
setupPerConnectionErrorTracer(builder, transportProvider, openTelemetry);
}
// Inject channel priming if enabled
if (builder.isRefreshingChannel()) {
transportProvider.setChannelPrimer(
BigtableChannelPrimer.create(
credentials,
settings.getProjectId(),
settings.getInstanceId(),
settings.getAppProfileId()));
}

builder.setTransportChannelProvider(transportProvider.build());
}

ClientContext clientContext = ClientContext.create(builder.build());

if (errorCountPerConnectionMetricTracker != null) {
errorCountPerConnectionMetricTracker.startConnectionErrorCountTracker(
clientContext.getExecutor());
}

return new BigtableClientContext(clientContext, openTelemetry);
}

private BigtableClientContext(ClientContext clientContext, OpenTelemetry openTelemetry) {
this.clientContext = clientContext;
this.openTelemetry = openTelemetry;
}

public OpenTelemetry getOpenTelemetry() {
return this.openTelemetry;
}

public ClientContext getClientContext() {
return this.clientContext;
}

public void close() throws Exception {
for (BackgroundResource resource : clientContext.getBackgroundResources()) {
resource.close();
}
}

private static OpenTelemetry getOpenTelemetryFromMetricsProvider(
String projectId,
MetricsProvider metricsProvider,
@Nullable Credentials defaultCredentials,
@Nullable String metricsEndpoint)
throws IOException {
if (metricsProvider instanceof CustomOpenTelemetryMetricsProvider) {
CustomOpenTelemetryMetricsProvider customMetricsProvider =
(CustomOpenTelemetryMetricsProvider) metricsProvider;
return customMetricsProvider.getOpenTelemetry();
} else if (metricsProvider instanceof DefaultMetricsProvider) {
Credentials credentials =
BigtableDataSettings.getMetricsCredentials() != null
? BigtableDataSettings.getMetricsCredentials()
: defaultCredentials;
DefaultMetricsProvider defaultMetricsProvider = (DefaultMetricsProvider) metricsProvider;
return defaultMetricsProvider.getOpenTelemetry(projectId, metricsEndpoint, credentials);
} else if (metricsProvider instanceof NoopMetricsProvider) {
return null;
}
throw new IOException("Invalid MetricsProvider type " + metricsProvider);
}

private static void patchCredentials(EnhancedBigtableStubSettings.Builder settings)
throws IOException {
int i = settings.getEndpoint().lastIndexOf(":");
String host = settings.getEndpoint().substring(0, i);
String audience = settings.getJwtAudienceMapping().get(host);

if (audience == null) {
return;
}
URI audienceUri = null;
try {
audienceUri = new URI(audience);
} catch (URISyntaxException e) {
throw new IllegalStateException("invalid JWT audience override", e);
}

CredentialsProvider credentialsProvider = settings.getCredentialsProvider();
if (credentialsProvider == null) {
return;
}

Credentials credentials = credentialsProvider.getCredentials();
if (credentials == null) {
return;
}

if (!(credentials instanceof ServiceAccountJwtAccessCredentials)) {
return;
}

ServiceAccountJwtAccessCredentials jwtCreds = (ServiceAccountJwtAccessCredentials) credentials;
JwtCredentialsWithAudience patchedCreds = new JwtCredentialsWithAudience(jwtCreds, audienceUri);
settings.setCredentialsProvider(FixedCredentialsProvider.create(patchedCreds));
}

private static ErrorCountPerConnectionMetricTracker setupPerConnectionErrorTracer(
EnhancedBigtableStubSettings.Builder builder,
InstantiatingGrpcChannelProvider.Builder transportProvider,
OpenTelemetry openTelemetry) {
ErrorCountPerConnectionMetricTracker errorCountPerConnectionMetricTracker =
new ErrorCountPerConnectionMetricTracker(
openTelemetry, EnhancedBigtableStub.createBuiltinAttributes(builder.build()));
ApiFunction<ManagedChannelBuilder, ManagedChannelBuilder> oldChannelConfigurator =
transportProvider.getChannelConfigurator();
transportProvider.setChannelConfigurator(
managedChannelBuilder -> {
managedChannelBuilder.intercept(errorCountPerConnectionMetricTracker.getInterceptor());

if (oldChannelConfigurator != null) {
managedChannelBuilder = oldChannelConfigurator.apply(managedChannelBuilder);
}
return managedChannelBuilder;
});
return errorCountPerConnectionMetricTracker;
}

private static void setupCookieHolder(
InstantiatingGrpcChannelProvider.Builder transportProvider) {
ApiFunction<ManagedChannelBuilder, ManagedChannelBuilder> oldChannelConfigurator =
transportProvider.getChannelConfigurator();
transportProvider.setChannelConfigurator(
managedChannelBuilder -> {
managedChannelBuilder.intercept(new CookiesInterceptor());

if (oldChannelConfigurator != null) {
managedChannelBuilder = oldChannelConfigurator.apply(managedChannelBuilder);
}
return managedChannelBuilder;
});
}
}
Loading

0 comments on commit bfa156d

Please sign in to comment.