Skip to content

Commit

Permalink
refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
mutianf committed Nov 27, 2024
1 parent 1d6f3b9 commit 3b3138e
Show file tree
Hide file tree
Showing 3 changed files with 132 additions and 127 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
package com.google.cloud.bigtable.data.v2;

import com.google.api.core.BetaApi;
import com.google.api.gax.core.BackgroundResource;
import com.google.api.gax.rpc.ClientContext;
import com.google.cloud.bigtable.data.v2.stub.BigtableClientContext;
import com.google.cloud.bigtable.data.v2.stub.EnhancedBigtableStub;
Expand Down Expand Up @@ -94,10 +93,7 @@ private BigtableDataClientFactory(
*/
@Override
public void close() throws Exception {
for (BackgroundResource resource :
sharedClientContext.getClientContext().getBackgroundResources()) {
resource.close();
}
sharedClientContext.close();
}

/**
Expand All @@ -117,7 +113,7 @@ public BigtableDataClient createDefault() {
.toBuilder()
.setTracerFactory(
EnhancedBigtableStub.createBigtableTracerFactory(
defaultSettings.getStubSettings(), sharedClientContext.createOpenTelemetry()))
defaultSettings.getStubSettings(), sharedClientContext.getOpenTelemetry()))
.build();

return BigtableDataClient.createWithClientContext(defaultSettings, clientContext);
Expand Down Expand Up @@ -147,7 +143,7 @@ public BigtableDataClient createForAppProfile(@Nonnull String appProfileId) thro
.toBuilder()
.setTracerFactory(
EnhancedBigtableStub.createBigtableTracerFactory(
settings.getStubSettings(), sharedClientContext.createOpenTelemetry()))
settings.getStubSettings(), sharedClientContext.getOpenTelemetry()))
.build();
return BigtableDataClient.createWithClientContext(settings, clientContext);
}
Expand Down Expand Up @@ -177,7 +173,7 @@ public BigtableDataClient createForInstance(@Nonnull String projectId, @Nonnull
.toBuilder()
.setTracerFactory(
EnhancedBigtableStub.createBigtableTracerFactory(
settings.getStubSettings(), sharedClientContext.createOpenTelemetry()))
settings.getStubSettings(), sharedClientContext.getOpenTelemetry()))
.build();

return BigtableDataClient.createWithClientContext(settings, clientContext);
Expand Down Expand Up @@ -208,7 +204,7 @@ public BigtableDataClient createForInstance(
.toBuilder()
.setTracerFactory(
EnhancedBigtableStub.createBigtableTracerFactory(
settings.getStubSettings(), sharedClientContext.createOpenTelemetry()))
settings.getStubSettings(), sharedClientContext.getOpenTelemetry()))
.build();
return BigtableDataClient.createWithClientContext(settings, clientContext);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,27 +15,27 @@
*/
package com.google.cloud.bigtable.data.v2.stub;

import static com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsConstants.APP_PROFILE_KEY;
import static com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsConstants.BIGTABLE_PROJECT_ID_KEY;
import static com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsConstants.CLIENT_NAME_KEY;
import static com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsConstants.INSTANCE_ID_KEY;

import com.google.api.core.ApiFunction;
import com.google.api.core.InternalApi;
import com.google.api.gax.core.BackgroundResource;
import com.google.api.gax.core.CredentialsProvider;
import com.google.api.gax.core.FixedCredentialsProvider;
import com.google.api.gax.grpc.InstantiatingGrpcChannelProvider;
import com.google.api.gax.rpc.ClientContext;
import com.google.auth.Credentials;
import com.google.cloud.bigtable.Version;
import com.google.auth.oauth2.ServiceAccountJwtAccessCredentials;
import com.google.cloud.bigtable.data.v2.BigtableDataSettings;
import com.google.cloud.bigtable.data.v2.internal.JwtCredentialsWithAudience;
import com.google.cloud.bigtable.data.v2.stub.metrics.CustomOpenTelemetryMetricsProvider;
import com.google.cloud.bigtable.data.v2.stub.metrics.DefaultMetricsProvider;
import com.google.cloud.bigtable.data.v2.stub.metrics.ErrorCountPerConnectionMetricTracker;
import com.google.cloud.bigtable.data.v2.stub.metrics.MetricsProvider;
import com.google.cloud.bigtable.data.v2.stub.metrics.NoopMetricsProvider;
import io.grpc.ManagedChannelBuilder;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.common.Attributes;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nullable;
Expand All @@ -49,107 +49,99 @@ public class BigtableClientContext {

private static final Logger logger = Logger.getLogger(BigtableClientContext.class.getName());

private OpenTelemetry openTelemetry = null;
@Nullable private final OpenTelemetry openTelemetry;
private final ClientContext clientContext;

public static BigtableClientContext create(EnhancedBigtableStubSettings settings)
throws IOException {
return new BigtableClientContext(settings);
}

private BigtableClientContext(EnhancedBigtableStubSettings settings) throws IOException {
Credentials credentials = settings.getCredentialsProvider().getCredentials();

EnhancedBigtableStubSettings.Builder builder = settings.toBuilder();

InstantiatingGrpcChannelProvider.Builder transportProvider =
builder.getTransportChannelProvider() instanceof InstantiatingGrpcChannelProvider
? ((InstantiatingGrpcChannelProvider) builder.getTransportChannelProvider()).toBuilder()
: null;
// Set up credentials
patchCredentials(builder);

// Fix the credentials so that they can be shared
Credentials credentials = null;
if (builder.getCredentialsProvider() != null) {
credentials = builder.getCredentialsProvider().getCredentials();
}
builder.setCredentialsProvider(FixedCredentialsProvider.create(credentials));

// Set up OpenTelemetry
OpenTelemetry openTelemetry = null;
try {
// We don't want client side metrics to crash the client, so catch any exception when getting
// the OTEL instance and log the exception instead.
this.openTelemetry =
createOpenTelemetry(
openTelemetry =
getOpenTelemetryFromMetricsProvider(
settings.getProjectId(),
settings.getMetricsProvider(),
credentials,
settings.getMetricsEndpoint());
} catch (Throwable t) {
logger.log(Level.WARNING, "Failed to get OTEL, will skip exporting client side metrics", t);
}
ErrorCountPerConnectionMetricTracker errorCountPerConnectionMetricTracker;
// Skip setting up ErrorCountPerConnectionMetricTracker if openTelemetry is null
if (openTelemetry != null && transportProvider != null) {
errorCountPerConnectionMetricTracker =
new ErrorCountPerConnectionMetricTracker(
openTelemetry, EnhancedBigtableStub.createBuiltinAttributes(settings));
ApiFunction<ManagedChannelBuilder, ManagedChannelBuilder> oldChannelConfigurator =
transportProvider.getChannelConfigurator();
transportProvider.setChannelConfigurator(
managedChannelBuilder -> {
if (settings.getEnableRoutingCookie()) {
managedChannelBuilder.intercept(new CookiesInterceptor());
}

managedChannelBuilder.intercept(errorCountPerConnectionMetricTracker.getInterceptor());

if (oldChannelConfigurator != null) {
managedChannelBuilder = oldChannelConfigurator.apply(managedChannelBuilder);
}
return managedChannelBuilder;
});
} else {
errorCountPerConnectionMetricTracker = null;
}

// Inject channel priming
if (settings.isRefreshingChannel()) {
// Set up channel
InstantiatingGrpcChannelProvider.Builder transportProvider =
builder.getTransportChannelProvider() instanceof InstantiatingGrpcChannelProvider
? ((InstantiatingGrpcChannelProvider) builder.getTransportChannelProvider()).toBuilder()
: null;

ErrorCountPerConnectionMetricTracker errorCountPerConnectionMetricTracker = null;

if (transportProvider != null) {
if (transportProvider != null) {
// Set up cookie holder if routing cookie is enabled
if (builder.getEnableRoutingCookie()) {
setupCookieHolder(transportProvider);
}
// Set up per connection error count tracker if OpenTelemetry is not null
if (openTelemetry != null) {
errorCountPerConnectionMetricTracker =
setupPerConnectionErrorTracer(builder, transportProvider, openTelemetry);
}
// Inject channel priming if enabled
if (builder.isRefreshingChannel()) {
transportProvider.setChannelPrimer(
BigtableChannelPrimer.create(
credentials,
settings.getProjectId(),
settings.getInstanceId(),
settings.getAppProfileId()));
}
}

if (transportProvider != null) {
builder.setTransportChannelProvider(transportProvider.build());
}

clientContext = ClientContext.create(builder.build());
ClientContext clientContext = ClientContext.create(builder.build());

if (errorCountPerConnectionMetricTracker != null) {
errorCountPerConnectionMetricTracker.startConnectionErrorCountTracker(
clientContext.getExecutor());
}

return new BigtableClientContext(clientContext, openTelemetry);
}

public OpenTelemetry createOpenTelemetry() {
private BigtableClientContext(ClientContext clientContext, OpenTelemetry openTelemetry) {
this.clientContext = clientContext;
this.openTelemetry = openTelemetry;
}

public OpenTelemetry getOpenTelemetry() {
return this.openTelemetry;
}

public ClientContext getClientContext() {
return this.clientContext;
}

private static Attributes createBuiltinAttributes(EnhancedBigtableStubSettings settings) {
return Attributes.of(
BIGTABLE_PROJECT_ID_KEY,
settings.getProjectId(),
INSTANCE_ID_KEY,
settings.getInstanceId(),
APP_PROFILE_KEY,
settings.getAppProfileId(),
CLIENT_NAME_KEY,
"bigtable-java/" + Version.VERSION);
public void close() throws Exception {
for (BackgroundResource resource : clientContext.getBackgroundResources()) {
resource.close();
}
}

private static OpenTelemetry createOpenTelemetry(
private static OpenTelemetry getOpenTelemetryFromMetricsProvider(
String projectId,
MetricsProvider metricsProvider,
@Nullable Credentials defaultCredentials,
Expand All @@ -171,4 +163,75 @@ private static OpenTelemetry createOpenTelemetry(
}
throw new IOException("Invalid MetricsProvider type " + metricsProvider);
}

private static void patchCredentials(EnhancedBigtableStubSettings.Builder settings)
throws IOException {
int i = settings.getEndpoint().lastIndexOf(":");
String host = settings.getEndpoint().substring(0, i);
String audience = settings.getJwtAudienceMapping().get(host);

if (audience == null) {
return;
}
URI audienceUri = null;
try {
audienceUri = new URI(audience);
} catch (URISyntaxException e) {
throw new IllegalStateException("invalid JWT audience override", e);
}

CredentialsProvider credentialsProvider = settings.getCredentialsProvider();
if (credentialsProvider == null) {
return;
}

Credentials credentials = credentialsProvider.getCredentials();
if (credentials == null) {
return;
}

if (!(credentials instanceof ServiceAccountJwtAccessCredentials)) {
return;
}

ServiceAccountJwtAccessCredentials jwtCreds = (ServiceAccountJwtAccessCredentials) credentials;
JwtCredentialsWithAudience patchedCreds = new JwtCredentialsWithAudience(jwtCreds, audienceUri);
settings.setCredentialsProvider(FixedCredentialsProvider.create(patchedCreds));
}

private static ErrorCountPerConnectionMetricTracker setupPerConnectionErrorTracer(
EnhancedBigtableStubSettings.Builder builder,
InstantiatingGrpcChannelProvider.Builder transportProvider,
OpenTelemetry openTelemetry) {
ErrorCountPerConnectionMetricTracker errorCountPerConnectionMetricTracker =
new ErrorCountPerConnectionMetricTracker(
openTelemetry, EnhancedBigtableStub.createBuiltinAttributes(builder.build()));
ApiFunction<ManagedChannelBuilder, ManagedChannelBuilder> oldChannelConfigurator =
transportProvider.getChannelConfigurator();
transportProvider.setChannelConfigurator(
managedChannelBuilder -> {
managedChannelBuilder.intercept(errorCountPerConnectionMetricTracker.getInterceptor());

if (oldChannelConfigurator != null) {
managedChannelBuilder = oldChannelConfigurator.apply(managedChannelBuilder);
}
return managedChannelBuilder;
});
return errorCountPerConnectionMetricTracker;
}

private static void setupCookieHolder(
InstantiatingGrpcChannelProvider.Builder transportProvider) {
ApiFunction<ManagedChannelBuilder, ManagedChannelBuilder> oldChannelConfigurator =
transportProvider.getChannelConfigurator();
transportProvider.setChannelConfigurator(
managedChannelBuilder -> {
managedChannelBuilder.intercept(new CookiesInterceptor());

if (oldChannelConfigurator != null) {
managedChannelBuilder = oldChannelConfigurator.apply(managedChannelBuilder);
}
return managedChannelBuilder;
});
}
}
Loading

0 comments on commit 3b3138e

Please sign in to comment.