Skip to content

Commit

Permalink
chore: add gax package private classes
Browse files Browse the repository at this point in the history
  • Loading branch information
mutianf committed Dec 7, 2023
1 parent 4b64482 commit 41c5355
Show file tree
Hide file tree
Showing 4 changed files with 607 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
* Copyright 2023 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.bigtable.gaxx.retrying;

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.api.core.InternalApi;
import com.google.api.gax.retrying.NonCancellableFuture;
import com.google.api.gax.retrying.RetryingFuture;
import com.google.api.gax.rpc.ApiCallContext;
import com.google.api.gax.rpc.UnaryCallable;
import com.google.common.base.Preconditions;
import java.util.concurrent.Callable;
import org.threeten.bp.Duration;

// TODO: remove this once ApiResultRetryAlgorithm is added to gax.
/**
* A callable representing an attempt to make an RPC call. This class is used from {@link
* com.google.cloud.bigtable.gaxx.retrying.RetryingCallable}.
*
* @param <RequestT> request type
* @param <ResponseT> response type
*/
@InternalApi
public class AttemptCallable<RequestT, ResponseT> implements Callable<ResponseT> {
private final UnaryCallable<RequestT, ResponseT> callable;
private final RequestT request;
private final ApiCallContext originalCallContext;

private volatile RetryingFuture<ResponseT> externalFuture;

AttemptCallable(
UnaryCallable<RequestT, ResponseT> callable, RequestT request, ApiCallContext callContext) {
this.callable = Preconditions.checkNotNull(callable);
this.request = Preconditions.checkNotNull(request);
this.originalCallContext = Preconditions.checkNotNull(callContext);
}

public void setExternalFuture(RetryingFuture<ResponseT> externalFuture) {
this.externalFuture = Preconditions.checkNotNull(externalFuture);
}

@Override
public ResponseT call() {
ApiCallContext callContext = originalCallContext;

try {
// Set the RPC timeout if the caller did not provide their own.
Duration rpcTimeout = externalFuture.getAttemptSettings().getRpcTimeout();
if (!rpcTimeout.isZero() && callContext.getTimeout() == null) {
callContext = callContext.withTimeout(rpcTimeout);
}

externalFuture.setAttemptFuture(new NonCancellableFuture<ResponseT>());
if (externalFuture.isDone()) {
return null;
}

callContext
.getTracer()
.attemptStarted(request, externalFuture.getAttemptSettings().getOverallAttemptCount());

ApiFuture<ResponseT> internalFuture = callable.futureCall(request, callContext);
externalFuture.setAttemptFuture(internalFuture);
} catch (Throwable e) {
externalFuture.setAttemptFuture(ApiFutures.<ResponseT>immediateFailedFuture(e));
}

return null;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Copyright 2023 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.bigtable.gaxx.retrying;

import com.google.api.core.InternalApi;
import com.google.api.gax.retrying.RetryingExecutorWithContext;
import com.google.api.gax.retrying.RetryingFuture;
import com.google.api.gax.rpc.ApiCallContext;
import com.google.api.gax.rpc.UnaryCallable;
import com.google.common.base.Preconditions;

// TODO: remove this once ApiResultRetryAlgorithm is added to gax.
/**
* A UnaryCallable that will keep issuing calls to an inner callable until it succeeds or times out.
*/
@InternalApi
public class RetryingCallable<RequestT, ResponseT> extends UnaryCallable<RequestT, ResponseT> {
private final ApiCallContext callContextPrototype;
private final UnaryCallable<RequestT, ResponseT> callable;
private final RetryingExecutorWithContext<ResponseT> executor;

public RetryingCallable(
ApiCallContext callContextPrototype,
UnaryCallable<RequestT, ResponseT> callable,
RetryingExecutorWithContext<ResponseT> executor) {
this.callContextPrototype = (ApiCallContext) Preconditions.checkNotNull(callContextPrototype);
this.callable = (UnaryCallable) Preconditions.checkNotNull(callable);
this.executor = (RetryingExecutorWithContext) Preconditions.checkNotNull(executor);
}

public RetryingFuture<ResponseT> futureCall(RequestT request, ApiCallContext inputContext) {
ApiCallContext context = this.callContextPrototype.nullToSelf(inputContext);
AttemptCallable<RequestT, ResponseT> retryCallable =
new AttemptCallable(this.callable, request, context);
RetryingFuture<ResponseT> retryingFuture =
this.executor.createFuture(retryCallable, inputContext);
retryCallable.setExternalFuture(retryingFuture);
retryCallable.call();
return retryingFuture;
}

public String toString() {
return String.format("retrying(%s)", this.callable);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/*
* Copyright 2023 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.bigtable.gaxx.retrying;

import static com.google.common.util.concurrent.MoreExecutors.directExecutor;

import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.api.core.InternalApi;
import com.google.api.gax.retrying.RetryingFuture;
import com.google.api.gax.retrying.ScheduledRetryingExecutor;
import com.google.api.gax.retrying.ServerStreamingAttemptException;
import com.google.api.gax.retrying.StreamResumptionStrategy;
import com.google.api.gax.rpc.ApiCallContext;
import com.google.api.gax.rpc.ResponseObserver;
import com.google.api.gax.rpc.ServerStreamingCallable;

// TODO: remove this once ApiResultRetryAlgorithm is added to gax.
/**
* A ServerStreamingCallable that implements resumable retries.
*
* <p>Wraps a request, a {@link ResponseObserver} and an inner {@link ServerStreamingCallable} and
* coordinates retries between them. When the inner callable throws an error, this class will
* schedule retries using the configured {@link ScheduledRetryingExecutor}.
*
* <p>Streams can be resumed using a {@link StreamResumptionStrategy}. The {@link
* StreamResumptionStrategy} is notified of incoming responses and is expected to track the progress
* of the stream. Upon receiving an error, the {@link StreamResumptionStrategy} is asked to modify
* the original request to resume the stream.
*/
@InternalApi
public final class RetryingServerStreamingCallable<RequestT, ResponseT>
extends ServerStreamingCallable<RequestT, ResponseT> {

private final ServerStreamingCallable<RequestT, ResponseT> innerCallable;
private final ScheduledRetryingExecutor<Void> executor;
private final StreamResumptionStrategy<RequestT, ResponseT> resumptionStrategyPrototype;

public RetryingServerStreamingCallable(
ServerStreamingCallable<RequestT, ResponseT> innerCallable,
ScheduledRetryingExecutor<Void> executor,
StreamResumptionStrategy<RequestT, ResponseT> resumptionStrategyPrototype) {
this.innerCallable = innerCallable;
this.executor = executor;
this.resumptionStrategyPrototype = resumptionStrategyPrototype;
}

@Override
public void call(
RequestT request,
final ResponseObserver<ResponseT> responseObserver,
ApiCallContext context) {

ServerStreamingAttemptCallable<RequestT, ResponseT> attemptCallable =
new ServerStreamingAttemptCallable<>(
innerCallable,
resumptionStrategyPrototype.createNew(),
request,
context,
responseObserver);

RetryingFuture<Void> retryingFuture = executor.createFuture(attemptCallable, context);
attemptCallable.setExternalFuture(retryingFuture);
attemptCallable.start();

// Bridge the future result back to the external responseObserver
ApiFutures.addCallback(
retryingFuture,
new ApiFutureCallback<Void>() {
@Override
public void onFailure(Throwable throwable) {
// Make sure to unwrap the underlying ApiException
if (throwable instanceof ServerStreamingAttemptException) {
throwable = throwable.getCause();
}
responseObserver.onError(throwable);
}

@Override
public void onSuccess(Void ignored) {
responseObserver.onComplete();
}
},
directExecutor());
}
}
Loading

0 comments on commit 41c5355

Please sign in to comment.