Skip to content

Commit

Permalink
chore: add gax package private classes (#2027)
Browse files Browse the repository at this point in the history
* chore: add gax package private classes

* fix format

* add callables

* update retry algorithm to be in sync with gax

* fix clirr

* mark as internal

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

---------

Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
  • Loading branch information
mutianf and gcf-owl-bot[bot] authored Dec 13, 2023
1 parent 24f8cc0 commit ccc2764
Show file tree
Hide file tree
Showing 8 changed files with 741 additions and 23 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ If you are using Maven without the BOM, add this to your dependencies:
If you are using Gradle 5.x or later, add this to your dependencies:

```Groovy
implementation platform('com.google.cloud:libraries-bom:26.28.0')
implementation platform('com.google.cloud:libraries-bom:26.29.0')
implementation 'com.google.cloud:google-cloud-bigtable'
```
Expand Down
6 changes: 6 additions & 0 deletions google-cloud-bigtable/clirr-ignored-differences.xml
Original file line number Diff line number Diff line change
Expand Up @@ -150,4 +150,10 @@
<differenceType>8001</differenceType>
<className>com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracerBatchedUnaryCallable</className>
</difference>
<!-- InternalApi was updated -->
<difference>
<differenceType>6001</differenceType>
<className>com/google/cloud/bigtable/gaxx/retrying/ApiResultRetryAlgorithm</className>
<field>*</field>
</difference>
</differences>
Original file line number Diff line number Diff line change
Expand Up @@ -16,36 +16,38 @@
package com.google.cloud.bigtable.gaxx.retrying;

import com.google.api.core.InternalApi;
import com.google.api.gax.retrying.ResultRetryAlgorithm;
import com.google.api.gax.retrying.TimedAttemptSettings;
import com.google.api.gax.retrying.BasicResultRetryAlgorithm;
import com.google.api.gax.retrying.RetryingContext;
import com.google.api.gax.rpc.ApiException;
import com.google.api.gax.rpc.DeadlineExceededException;
import org.threeten.bp.Duration;

/** For internal use, public for technical reasons. */
@InternalApi
public class ApiResultRetryAlgorithm<ResponseT> implements ResultRetryAlgorithm<ResponseT> {
// Duration to sleep on if the error is DEADLINE_EXCEEDED.
public static final Duration DEADLINE_SLEEP_DURATION = Duration.ofMillis(1);
public class ApiResultRetryAlgorithm<ResponseT> extends BasicResultRetryAlgorithm<ResponseT> {

/** Returns true if previousThrowable is an {@link ApiException} that is retryable. */
@Override
public TimedAttemptSettings createNextAttempt(
Throwable prevThrowable, ResponseT prevResponse, TimedAttemptSettings prevSettings) {
if (prevThrowable != null && prevThrowable instanceof DeadlineExceededException) {
return TimedAttemptSettings.newBuilder()
.setGlobalSettings(prevSettings.getGlobalSettings())
.setRetryDelay(prevSettings.getRetryDelay())
.setRpcTimeout(prevSettings.getRpcTimeout())
.setRandomizedRetryDelay(DEADLINE_SLEEP_DURATION)
.setAttemptCount(prevSettings.getAttemptCount() + 1)
.setFirstAttemptStartTimeNanos(prevSettings.getFirstAttemptStartTimeNanos())
.build();
}
return null;
public boolean shouldRetry(Throwable previousThrowable, ResponseT previousResponse) {
return (previousThrowable instanceof ApiException)
&& ((ApiException) previousThrowable).isRetryable();
}

/**
* If {@link RetryingContext#getRetryableCodes()} is not null: Returns true if the status code of
* previousThrowable is in the list of retryable code of the {@link RetryingContext}.
*
* <p>Otherwise it returns the result of {@link #shouldRetry(Throwable, Object)}.
*/
@Override
public boolean shouldRetry(Throwable prevThrowable, ResponseT prevResponse) {
return (prevThrowable instanceof ApiException) && ((ApiException) prevThrowable).isRetryable();
public boolean shouldRetry(
RetryingContext context, Throwable previousThrowable, ResponseT previousResponse) {
if (context.getRetryableCodes() != null) {
// Ignore the isRetryable() value of the throwable if the RetryingContext has a specific list
// of codes that should be retried.
return (previousThrowable instanceof ApiException)
&& context
.getRetryableCodes()
.contains(((ApiException) previousThrowable).getStatusCode().getCode());
}
return shouldRetry(previousThrowable, previousResponse);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
* Copyright 2023 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.bigtable.gaxx.retrying;

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.api.core.InternalApi;
import com.google.api.gax.retrying.NonCancellableFuture;
import com.google.api.gax.retrying.RetryingFuture;
import com.google.api.gax.rpc.ApiCallContext;
import com.google.api.gax.rpc.UnaryCallable;
import com.google.common.base.Preconditions;
import java.util.concurrent.Callable;
import org.threeten.bp.Duration;

// TODO: remove this once ApiResultRetryAlgorithm is added to gax.
/**
* A callable representing an attempt to make an RPC call. This class is used from {@link
* RetryingCallable}.
*
* @param <RequestT> request type
* @param <ResponseT> response type
*/
@InternalApi
public class AttemptCallable<RequestT, ResponseT> implements Callable<ResponseT> {
private final UnaryCallable<RequestT, ResponseT> callable;
private final RequestT request;
private final ApiCallContext originalCallContext;

private volatile RetryingFuture<ResponseT> externalFuture;

AttemptCallable(
UnaryCallable<RequestT, ResponseT> callable, RequestT request, ApiCallContext callContext) {
this.callable = Preconditions.checkNotNull(callable);
this.request = Preconditions.checkNotNull(request);
this.originalCallContext = Preconditions.checkNotNull(callContext);
}

public void setExternalFuture(RetryingFuture<ResponseT> externalFuture) {
this.externalFuture = Preconditions.checkNotNull(externalFuture);
}

@Override
public ResponseT call() {
ApiCallContext callContext = originalCallContext;

try {
// Set the RPC timeout if the caller did not provide their own.
Duration rpcTimeout = externalFuture.getAttemptSettings().getRpcTimeout();
if (!rpcTimeout.isZero() && callContext.getTimeout() == null) {
callContext = callContext.withTimeout(rpcTimeout);
}

externalFuture.setAttemptFuture(new NonCancellableFuture<ResponseT>());
if (externalFuture.isDone()) {
return null;
}

callContext
.getTracer()
.attemptStarted(request, externalFuture.getAttemptSettings().getOverallAttemptCount());

ApiFuture<ResponseT> internalFuture = callable.futureCall(request, callContext);
externalFuture.setAttemptFuture(internalFuture);
} catch (Throwable e) {
externalFuture.setAttemptFuture(ApiFutures.<ResponseT>immediateFailedFuture(e));
}

return null;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
/*
* Copyright 2023 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.bigtable.gaxx.retrying;

import com.google.api.core.InternalApi;
import com.google.api.gax.retrying.ExponentialRetryAlgorithm;
import com.google.api.gax.retrying.RetryAlgorithm;
import com.google.api.gax.retrying.RetrySettings;
import com.google.api.gax.retrying.ScheduledRetryingExecutor;
import com.google.api.gax.retrying.StreamingRetryAlgorithm;
import com.google.api.gax.rpc.ClientContext;
import com.google.api.gax.rpc.ServerStreamingCallSettings;
import com.google.api.gax.rpc.ServerStreamingCallable;
import com.google.api.gax.rpc.StatusCode;
import com.google.api.gax.rpc.UnaryCallSettings;
import com.google.api.gax.rpc.UnaryCallable;
import java.util.Collection;

// TODO: remove this once ApiResultRetryAlgorithm is added to gax.
/**
* Class with utility methods to create callable objects using provided settings.
*
* <p>The callable objects wrap a given direct callable with features like retry and exception
* translation.
*/
@InternalApi
public class Callables {

private Callables() {}

public static <RequestT, ResponseT> UnaryCallable<RequestT, ResponseT> retrying(
UnaryCallable<RequestT, ResponseT> innerCallable,
UnaryCallSettings<?, ?> callSettings,
ClientContext clientContext) {

UnaryCallSettings<?, ?> settings = callSettings;

if (areRetriesDisabled(settings.getRetryableCodes(), settings.getRetrySettings())) {
// When retries are disabled, the total timeout can be treated as the rpc timeout.
settings =
settings
.toBuilder()
.setSimpleTimeoutNoRetries(settings.getRetrySettings().getTotalTimeout())
.build();
}

RetryAlgorithm<ResponseT> retryAlgorithm =
new RetryAlgorithm<>(
new ApiResultRetryAlgorithm<ResponseT>(),
new ExponentialRetryAlgorithm(settings.getRetrySettings(), clientContext.getClock()));
ScheduledRetryingExecutor<ResponseT> retryingExecutor =
new ScheduledRetryingExecutor<>(retryAlgorithm, clientContext.getExecutor());
return new RetryingCallable<>(
clientContext.getDefaultCallContext(), innerCallable, retryingExecutor);
}

public static <RequestT, ResponseT> ServerStreamingCallable<RequestT, ResponseT> retrying(
ServerStreamingCallable<RequestT, ResponseT> innerCallable,
ServerStreamingCallSettings<RequestT, ResponseT> callSettings,
ClientContext clientContext) {

ServerStreamingCallSettings<RequestT, ResponseT> settings = callSettings;
if (areRetriesDisabled(settings.getRetryableCodes(), settings.getRetrySettings())) {
// When retries are disabled, the total timeout can be treated as the rpc timeout.
settings =
settings
.toBuilder()
.setSimpleTimeoutNoRetries(settings.getRetrySettings().getTotalTimeout())
.build();
}

StreamingRetryAlgorithm<Void> retryAlgorithm =
new StreamingRetryAlgorithm<>(
new ApiResultRetryAlgorithm<Void>(),
new ExponentialRetryAlgorithm(settings.getRetrySettings(), clientContext.getClock()));

ScheduledRetryingExecutor<Void> retryingExecutor =
new ScheduledRetryingExecutor<>(retryAlgorithm, clientContext.getExecutor());

return new RetryingServerStreamingCallable<>(
innerCallable, retryingExecutor, settings.getResumptionStrategy());
}

private static boolean areRetriesDisabled(
Collection<StatusCode.Code> retryableCodes, RetrySettings retrySettings) {
return retrySettings.getMaxAttempts() == 1
|| retryableCodes.isEmpty()
|| (retrySettings.getMaxAttempts() == 0 && retrySettings.getTotalTimeout().isZero());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Copyright 2023 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.bigtable.gaxx.retrying;

import com.google.api.core.InternalApi;
import com.google.api.gax.retrying.RetryingExecutorWithContext;
import com.google.api.gax.retrying.RetryingFuture;
import com.google.api.gax.rpc.ApiCallContext;
import com.google.api.gax.rpc.UnaryCallable;
import com.google.common.base.Preconditions;

// TODO: remove this once ApiResultRetryAlgorithm is added to gax.
/**
* A UnaryCallable that will keep issuing calls to an inner callable until it succeeds or times out.
*/
@InternalApi
public class RetryingCallable<RequestT, ResponseT> extends UnaryCallable<RequestT, ResponseT> {
private final ApiCallContext callContextPrototype;
private final UnaryCallable<RequestT, ResponseT> callable;
private final RetryingExecutorWithContext<ResponseT> executor;

public RetryingCallable(
ApiCallContext callContextPrototype,
UnaryCallable<RequestT, ResponseT> callable,
RetryingExecutorWithContext<ResponseT> executor) {
this.callContextPrototype = (ApiCallContext) Preconditions.checkNotNull(callContextPrototype);
this.callable = (UnaryCallable) Preconditions.checkNotNull(callable);
this.executor = (RetryingExecutorWithContext) Preconditions.checkNotNull(executor);
}

public RetryingFuture<ResponseT> futureCall(RequestT request, ApiCallContext inputContext) {
ApiCallContext context = this.callContextPrototype.nullToSelf(inputContext);
AttemptCallable<RequestT, ResponseT> retryCallable =
new AttemptCallable(this.callable, request, context);
RetryingFuture<ResponseT> retryingFuture =
this.executor.createFuture(retryCallable, inputContext);
retryCallable.setExternalFuture(retryingFuture);
retryCallable.call();
return retryingFuture;
}

public String toString() {
return String.format("retrying(%s)", this.callable);
}
}
Loading

0 comments on commit ccc2764

Please sign in to comment.