From 41c5355c0ccd0352dc0cbd5aee7d4fcea2d34d44 Mon Sep 17 00:00:00 2001 From: Mattie Fu Date: Thu, 7 Dec 2023 18:06:19 -0500 Subject: [PATCH 1/9] chore: add gax package private classes --- .../gaxx/retrying/AttemptCallable.java | 84 ++++ .../gaxx/retrying/RetryingCallable.java | 58 +++ .../RetryingServerStreamingCallable.java | 99 +++++ .../ServerStreamingAttemptCallable.java | 366 ++++++++++++++++++ 4 files changed, 607 insertions(+) create mode 100644 google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/retrying/AttemptCallable.java create mode 100644 google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/retrying/RetryingCallable.java create mode 100644 google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/retrying/RetryingServerStreamingCallable.java create mode 100644 google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/retrying/ServerStreamingAttemptCallable.java diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/retrying/AttemptCallable.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/retrying/AttemptCallable.java new file mode 100644 index 0000000000..5bb8847e60 --- /dev/null +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/retrying/AttemptCallable.java @@ -0,0 +1,84 @@ +/* +* Copyright 2023 Google LLC +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* https://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ +package com.google.cloud.bigtable.gaxx.retrying; + +import com.google.api.core.ApiFuture; +import com.google.api.core.ApiFutures; +import com.google.api.core.InternalApi; +import com.google.api.gax.retrying.NonCancellableFuture; +import com.google.api.gax.retrying.RetryingFuture; +import com.google.api.gax.rpc.ApiCallContext; +import com.google.api.gax.rpc.UnaryCallable; +import com.google.common.base.Preconditions; +import java.util.concurrent.Callable; +import org.threeten.bp.Duration; + +// TODO: remove this once ApiResultRetryAlgorithm is added to gax. +/** + * A callable representing an attempt to make an RPC call. This class is used from {@link + * com.google.cloud.bigtable.gaxx.retrying.RetryingCallable}. + * + * @param request type + * @param response type + */ +@InternalApi +public class AttemptCallable implements Callable { + private final UnaryCallable callable; + private final RequestT request; + private final ApiCallContext originalCallContext; + + private volatile RetryingFuture externalFuture; + + AttemptCallable( + UnaryCallable callable, RequestT request, ApiCallContext callContext) { + this.callable = Preconditions.checkNotNull(callable); + this.request = Preconditions.checkNotNull(request); + this.originalCallContext = Preconditions.checkNotNull(callContext); + } + + public void setExternalFuture(RetryingFuture externalFuture) { + this.externalFuture = Preconditions.checkNotNull(externalFuture); + } + + @Override + public ResponseT call() { + ApiCallContext callContext = originalCallContext; + + try { + // Set the RPC timeout if the caller did not provide their own. + Duration rpcTimeout = externalFuture.getAttemptSettings().getRpcTimeout(); + if (!rpcTimeout.isZero() && callContext.getTimeout() == null) { + callContext = callContext.withTimeout(rpcTimeout); + } + + externalFuture.setAttemptFuture(new NonCancellableFuture()); + if (externalFuture.isDone()) { + return null; + } + + callContext + .getTracer() + .attemptStarted(request, externalFuture.getAttemptSettings().getOverallAttemptCount()); + + ApiFuture internalFuture = callable.futureCall(request, callContext); + externalFuture.setAttemptFuture(internalFuture); + } catch (Throwable e) { + externalFuture.setAttemptFuture(ApiFutures.immediateFailedFuture(e)); + } + + return null; + } +} \ No newline at end of file diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/retrying/RetryingCallable.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/retrying/RetryingCallable.java new file mode 100644 index 0000000000..bfb5541624 --- /dev/null +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/retrying/RetryingCallable.java @@ -0,0 +1,58 @@ +/* + * Copyright 2023 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigtable.gaxx.retrying; + +import com.google.api.core.InternalApi; +import com.google.api.gax.retrying.RetryingExecutorWithContext; +import com.google.api.gax.retrying.RetryingFuture; +import com.google.api.gax.rpc.ApiCallContext; +import com.google.api.gax.rpc.UnaryCallable; +import com.google.common.base.Preconditions; + +// TODO: remove this once ApiResultRetryAlgorithm is added to gax. +/** + * A UnaryCallable that will keep issuing calls to an inner callable until it succeeds or times out. + */ +@InternalApi +public class RetryingCallable extends UnaryCallable { + private final ApiCallContext callContextPrototype; + private final UnaryCallable callable; + private final RetryingExecutorWithContext executor; + + public RetryingCallable( + ApiCallContext callContextPrototype, + UnaryCallable callable, + RetryingExecutorWithContext executor) { + this.callContextPrototype = (ApiCallContext) Preconditions.checkNotNull(callContextPrototype); + this.callable = (UnaryCallable) Preconditions.checkNotNull(callable); + this.executor = (RetryingExecutorWithContext) Preconditions.checkNotNull(executor); + } + + public RetryingFuture futureCall(RequestT request, ApiCallContext inputContext) { + ApiCallContext context = this.callContextPrototype.nullToSelf(inputContext); + AttemptCallable retryCallable = + new AttemptCallable(this.callable, request, context); + RetryingFuture retryingFuture = + this.executor.createFuture(retryCallable, inputContext); + retryCallable.setExternalFuture(retryingFuture); + retryCallable.call(); + return retryingFuture; + } + + public String toString() { + return String.format("retrying(%s)", this.callable); + } +} \ No newline at end of file diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/retrying/RetryingServerStreamingCallable.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/retrying/RetryingServerStreamingCallable.java new file mode 100644 index 0000000000..5b693d9075 --- /dev/null +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/retrying/RetryingServerStreamingCallable.java @@ -0,0 +1,99 @@ +/* + * Copyright 2023 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigtable.gaxx.retrying; + +import static com.google.common.util.concurrent.MoreExecutors.directExecutor; + +import com.google.api.core.ApiFutureCallback; +import com.google.api.core.ApiFutures; +import com.google.api.core.InternalApi; +import com.google.api.gax.retrying.RetryingFuture; +import com.google.api.gax.retrying.ScheduledRetryingExecutor; +import com.google.api.gax.retrying.ServerStreamingAttemptException; +import com.google.api.gax.retrying.StreamResumptionStrategy; +import com.google.api.gax.rpc.ApiCallContext; +import com.google.api.gax.rpc.ResponseObserver; +import com.google.api.gax.rpc.ServerStreamingCallable; + +// TODO: remove this once ApiResultRetryAlgorithm is added to gax. +/** + * A ServerStreamingCallable that implements resumable retries. + * + *

Wraps a request, a {@link ResponseObserver} and an inner {@link ServerStreamingCallable} and + * coordinates retries between them. When the inner callable throws an error, this class will + * schedule retries using the configured {@link ScheduledRetryingExecutor}. + * + *

Streams can be resumed using a {@link StreamResumptionStrategy}. The {@link + * StreamResumptionStrategy} is notified of incoming responses and is expected to track the progress + * of the stream. Upon receiving an error, the {@link StreamResumptionStrategy} is asked to modify + * the original request to resume the stream. + */ +@InternalApi +public final class RetryingServerStreamingCallable + extends ServerStreamingCallable { + + private final ServerStreamingCallable innerCallable; + private final ScheduledRetryingExecutor executor; + private final StreamResumptionStrategy resumptionStrategyPrototype; + + public RetryingServerStreamingCallable( + ServerStreamingCallable innerCallable, + ScheduledRetryingExecutor executor, + StreamResumptionStrategy resumptionStrategyPrototype) { + this.innerCallable = innerCallable; + this.executor = executor; + this.resumptionStrategyPrototype = resumptionStrategyPrototype; + } + + @Override + public void call( + RequestT request, + final ResponseObserver responseObserver, + ApiCallContext context) { + + ServerStreamingAttemptCallable attemptCallable = + new ServerStreamingAttemptCallable<>( + innerCallable, + resumptionStrategyPrototype.createNew(), + request, + context, + responseObserver); + + RetryingFuture retryingFuture = executor.createFuture(attemptCallable, context); + attemptCallable.setExternalFuture(retryingFuture); + attemptCallable.start(); + + // Bridge the future result back to the external responseObserver + ApiFutures.addCallback( + retryingFuture, + new ApiFutureCallback() { + @Override + public void onFailure(Throwable throwable) { + // Make sure to unwrap the underlying ApiException + if (throwable instanceof ServerStreamingAttemptException) { + throwable = throwable.getCause(); + } + responseObserver.onError(throwable); + } + + @Override + public void onSuccess(Void ignored) { + responseObserver.onComplete(); + } + }, + directExecutor()); + } +} \ No newline at end of file diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/retrying/ServerStreamingAttemptCallable.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/retrying/ServerStreamingAttemptCallable.java new file mode 100644 index 0000000000..2b8e7d39a5 --- /dev/null +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/retrying/ServerStreamingAttemptCallable.java @@ -0,0 +1,366 @@ +/* + * Copyright 2023 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigtable.gaxx.retrying; + +import com.google.api.core.InternalApi; +import com.google.api.core.SettableApiFuture; +import com.google.api.gax.retrying.RetryingFuture; +import com.google.api.gax.retrying.ServerStreamingAttemptException; +import com.google.api.gax.retrying.StreamResumptionStrategy; +import com.google.api.gax.rpc.ApiCallContext; +import com.google.api.gax.rpc.ResponseObserver; +import com.google.api.gax.rpc.ServerStreamingCallable; +import com.google.api.gax.rpc.StateCheckingResponseObserver; +import com.google.api.gax.rpc.StreamController; +import com.google.common.base.Preconditions; +import java.util.concurrent.Callable; +import java.util.concurrent.CancellationException; + +// TODO: remove this once ApiResultRetryAlgorithm is added to gax. +/** + * A callable that generates Server Streaming attempts. At any one time, it is responsible for at + * most a single outstanding attempt. During an attempt, it proxies all incoming message to the + * outer {@link ResponseObserver} and the {@link StreamResumptionStrategy}. Once the attempt + * completes, the external {@link RetryingFuture} future is notified. If the {@link RetryingFuture} + * decides to retry the attempt, it will invoke {@link #call()}. + * + *

The lifecycle of this class is: + * + *

    + *
  1. The caller instantiates this class. + *
  2. The caller sets the {@link RetryingFuture} via {@link #setExternalFuture(RetryingFuture)}. + * The {@link RetryingFuture} will be responsible for scheduling future attempts. + *
  3. The caller calls {@link #start()}. This notifies the outer {@link ResponseObserver} that + * call is about to start. + *
  4. The outer {@link ResponseObserver} configures inbound flow control via the {@link + * StreamController} that it received in {@link ResponseObserver#onStart(StreamController)}. + *
  5. The attempt call is sent via the inner/upstream {@link ServerStreamingCallable}. + *
  6. A future representing the end state of the inner attempt is passed to the outer {@link + * RetryingFuture}. + *
  7. All messages received from the inner {@link ServerStreamingCallable} are recorded by the + * {@link StreamResumptionStrategy}. + *
  8. All messages received from the inner {@link ServerStreamingCallable} are forwarded to the + * outer {@link ResponseObserver}. + *
  9. Upon attempt completion (either success or failure) are communicated to the outer {@link + * RetryingFuture}. + *
  10. If the {@link RetryingFuture} decides to resume the RPC, it will invoke {@link #call()}, + * which will consult the {@link StreamResumptionStrategy} for the resuming request and + * restart the process at step 5. + *
  11. Once the {@link RetryingFuture} decides to stop the retry loop, it will notify the outer + * {@link ResponseObserver}. + *
+ * + *

This class is meant to be used as middleware between an outer {@link ResponseObserver} and an + * inner {@link ServerStreamingCallable}. As such it follows the general threading model of {@link + * ServerStreamingCallable}s: + * + *

    + *
  • {@code onStart} must be called in the same thread that invoked {@code call()} + *
  • The outer {@link ResponseObserver} can call {@code request()} and {@code cancel()} on this + * class' {@link StreamController} from any thread + *
  • The inner callable will serialize calls to {@code onResponse()}, {@code onError()} and + * {@code onComplete} + *
+ * + *

With this model in mind, this class only needs to synchronize access data that is shared + * between: the outer {@link ResponseObserver} (via this class' {@link StreamController}) and the + * inner {@link ServerStreamingCallable}: pendingRequests, cancellationCause and the current + * innerController. + * + * @param request type + * @param response type + */ +@InternalApi +public final class ServerStreamingAttemptCallable implements Callable { + private final Object lock = new Object(); + + private final ServerStreamingCallable innerCallable; + private final StreamResumptionStrategy resumptionStrategy; + private final RequestT initialRequest; + private ApiCallContext context; + private final ResponseObserver outerObserver; + + // Start state + private boolean autoFlowControl = true; + private boolean isStarted; + + // Outer state + private Throwable cancellationCause; + + private int pendingRequests; + + private RetryingFuture outerRetryingFuture; + + // Internal retry state + private int numAttempts; + + private StreamController innerController; + + private boolean seenSuccessSinceLastError; + private SettableApiFuture innerAttemptFuture; + + public ServerStreamingAttemptCallable( + ServerStreamingCallable innerCallable, + StreamResumptionStrategy resumptionStrategy, + RequestT initialRequest, + ApiCallContext context, + ResponseObserver outerObserver) { + this.innerCallable = innerCallable; + this.resumptionStrategy = resumptionStrategy; + this.initialRequest = initialRequest; + this.context = context; + this.outerObserver = outerObserver; + } + + /** Sets controlling {@link RetryingFuture}. Must be called be before {@link #start()}. */ + void setExternalFuture(RetryingFuture retryingFuture) { + Preconditions.checkState(!isStarted, "Can't change the RetryingFuture once the call has start"); + Preconditions.checkNotNull(retryingFuture, "RetryingFuture can't be null"); + + this.outerRetryingFuture = retryingFuture; + } + + /** + * Starts the initial call. The call is attempted on the caller's thread. Further call attempts + * will be scheduled by the {@link RetryingFuture}. + */ + public void start() { + Preconditions.checkState(!isStarted, "Already started"); + + // Initialize the outer observer + outerObserver.onStart( + new StreamController() { + @Override + public void disableAutoInboundFlowControl() { + Preconditions.checkState( + !isStarted, "Can't disable auto flow control once the stream is started"); + autoFlowControl = false; + } + + @Override + public void request(int count) { + onRequest(count); + } + + @Override + public void cancel() { + onCancel(); + } + }); + + if (autoFlowControl) { + synchronized (lock) { + pendingRequests = Integer.MAX_VALUE; + } + } + isStarted = true; + + // Call the inner callable + call(); + } + + /** + * Sends the actual RPC. The request being sent will first be transformed by the {@link + * StreamResumptionStrategy}. + * + *

This method expects to be called by one thread at a time. Furthermore, it expects that the + * current RPC finished before the next time it's called. + */ + @Override + public Void call() { + Preconditions.checkState(isStarted, "Must be started first"); + + RequestT request = + (++numAttempts == 1) ? initialRequest : resumptionStrategy.getResumeRequest(initialRequest); + + // Should never happen. onAttemptError will check if ResumptionStrategy can create a resume + // request, + // which the RetryingFuture/StreamResumptionStrategy should respect. + Preconditions.checkState(request != null, "ResumptionStrategy returned a null request."); + + innerAttemptFuture = SettableApiFuture.create(); + seenSuccessSinceLastError = false; + + ApiCallContext attemptContext = context; + + if (!outerRetryingFuture.getAttemptSettings().getRpcTimeout().isZero() + && attemptContext.getTimeout() == null) { + attemptContext = + attemptContext.withTimeout(outerRetryingFuture.getAttemptSettings().getRpcTimeout()); + } + + attemptContext + .getTracer() + .attemptStarted(request, outerRetryingFuture.getAttemptSettings().getOverallAttemptCount()); + + innerCallable.call( + request, + new StateCheckingResponseObserver() { + @Override + public void onStartImpl(StreamController controller) { + onAttemptStart(controller); + } + + @Override + public void onResponseImpl(ResponseT response) { + onAttemptResponse(response); + } + + @Override + public void onErrorImpl(Throwable t) { + onAttemptError(t); + } + + @Override + public void onCompleteImpl() { + onAttemptComplete(); + } + }, + attemptContext); + + outerRetryingFuture.setAttemptFuture(innerAttemptFuture); + + return null; + } + + /** + * Called by the inner {@link ServerStreamingCallable} when the call is about to start. This will + * transfer unfinished state from the previous attempt. + * + * @see ResponseObserver#onStart(StreamController) + */ + private void onAttemptStart(StreamController controller) { + if (!autoFlowControl) { + controller.disableAutoInboundFlowControl(); + } + + Throwable localCancellationCause; + int numToRequest = 0; + + synchronized (lock) { + innerController = controller; + + localCancellationCause = this.cancellationCause; + + if (!autoFlowControl) { + numToRequest = pendingRequests; + } + } + + if (localCancellationCause != null) { + controller.cancel(); + } else if (numToRequest > 0) { + controller.request(numToRequest); + } + } + + /** + * Called when the outer {@link ResponseObserver} wants to prematurely cancel the stream. + * + * @see StreamController#cancel() + */ + private void onCancel() { + StreamController localInnerController; + + synchronized (lock) { + if (cancellationCause != null) { + return; + } + // NOTE: BasicRetryingFuture will replace j.u.c.CancellationExceptions with it's own, + // which will not have the current stacktrace, so a special wrapper has be used here. + cancellationCause = + new ServerStreamingAttemptException( + new CancellationException("User cancelled stream"), + resumptionStrategy.canResume(), + seenSuccessSinceLastError); + localInnerController = innerController; + } + + if (localInnerController != null) { + localInnerController.cancel(); + } + } + + /** + * Called when the outer {@link ResponseObserver} is ready for more data. + * + * @see StreamController#request(int) + */ + private void onRequest(int count) { + Preconditions.checkState(!autoFlowControl, "Automatic flow control is enabled"); + Preconditions.checkArgument(count > 0, "Count must be > 0"); + + final StreamController localInnerController; + + synchronized (lock) { + int maxInc = Integer.MAX_VALUE - pendingRequests; + count = Math.min(maxInc, count); + + pendingRequests += count; + localInnerController = this.innerController; + } + + // Note: there is a race condition here where the count might go to the previous attempt's + // StreamController after it failed. But it doesn't matter, because the controller will just + // ignore it and the current controller will pick it up onStart. + if (localInnerController != null) { + localInnerController.request(count); + } + } + + /** Called when the inner callable has responses to deliver. */ + private void onAttemptResponse(ResponseT message) { + if (!autoFlowControl) { + synchronized (lock) { + pendingRequests--; + } + } + // Update local state to allow for future resume. + seenSuccessSinceLastError = true; + message = resumptionStrategy.processResponse(message); + // Notify the outer observer. + outerObserver.onResponse(message); + } + + /** + * Called when the current RPC fails. The error will be bubbled up to the outer {@link + * RetryingFuture} via the {@link #innerAttemptFuture}. + */ + private void onAttemptError(Throwable throwable) { + Throwable localCancellationCause; + synchronized (lock) { + localCancellationCause = cancellationCause; + } + + if (localCancellationCause != null) { + // Take special care to preserve the cancellation's stack trace. + innerAttemptFuture.setException(localCancellationCause); + } else { + // Wrap the original exception and provide more context for StreamingRetryAlgorithm. + innerAttemptFuture.setException( + new ServerStreamingAttemptException( + throwable, resumptionStrategy.canResume(), seenSuccessSinceLastError)); + } + } + + /** + * Called when the current RPC successfully completes. Notifies the outer {@link RetryingFuture} + * via {@link #innerAttemptFuture}. + */ + private void onAttemptComplete() { + innerAttemptFuture.set(null); + } +} \ No newline at end of file From 03190c15f02d29451c82a42c238863d9f7a694b3 Mon Sep 17 00:00:00 2001 From: Mattie Fu Date: Fri, 8 Dec 2023 09:33:53 -0500 Subject: [PATCH 2/9] fix format --- .../gaxx/retrying/AttemptCallable.java | 102 ++-- .../gaxx/retrying/RetryingCallable.java | 50 +- .../RetryingServerStreamingCallable.java | 94 ++-- .../ServerStreamingAttemptCallable.java | 518 +++++++++--------- 4 files changed, 382 insertions(+), 382 deletions(-) diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/retrying/AttemptCallable.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/retrying/AttemptCallable.java index 5bb8847e60..d9a6d8f9c4 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/retrying/AttemptCallable.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/retrying/AttemptCallable.java @@ -1,18 +1,18 @@ /* -* Copyright 2023 Google LLC -* -* Licensed under the Apache License, Version 2.0 (the "License"); -* you may not use this file except in compliance with the License. -* You may obtain a copy of the License at -* -* https://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ + * Copyright 2023 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package com.google.cloud.bigtable.gaxx.retrying; import com.google.api.core.ApiFuture; @@ -36,49 +36,49 @@ */ @InternalApi public class AttemptCallable implements Callable { - private final UnaryCallable callable; - private final RequestT request; - private final ApiCallContext originalCallContext; - - private volatile RetryingFuture externalFuture; + private final UnaryCallable callable; + private final RequestT request; + private final ApiCallContext originalCallContext; - AttemptCallable( - UnaryCallable callable, RequestT request, ApiCallContext callContext) { - this.callable = Preconditions.checkNotNull(callable); - this.request = Preconditions.checkNotNull(request); - this.originalCallContext = Preconditions.checkNotNull(callContext); - } + private volatile RetryingFuture externalFuture; - public void setExternalFuture(RetryingFuture externalFuture) { - this.externalFuture = Preconditions.checkNotNull(externalFuture); - } + AttemptCallable( + UnaryCallable callable, RequestT request, ApiCallContext callContext) { + this.callable = Preconditions.checkNotNull(callable); + this.request = Preconditions.checkNotNull(request); + this.originalCallContext = Preconditions.checkNotNull(callContext); + } - @Override - public ResponseT call() { - ApiCallContext callContext = originalCallContext; + public void setExternalFuture(RetryingFuture externalFuture) { + this.externalFuture = Preconditions.checkNotNull(externalFuture); + } - try { - // Set the RPC timeout if the caller did not provide their own. - Duration rpcTimeout = externalFuture.getAttemptSettings().getRpcTimeout(); - if (!rpcTimeout.isZero() && callContext.getTimeout() == null) { - callContext = callContext.withTimeout(rpcTimeout); - } + @Override + public ResponseT call() { + ApiCallContext callContext = originalCallContext; - externalFuture.setAttemptFuture(new NonCancellableFuture()); - if (externalFuture.isDone()) { - return null; - } + try { + // Set the RPC timeout if the caller did not provide their own. + Duration rpcTimeout = externalFuture.getAttemptSettings().getRpcTimeout(); + if (!rpcTimeout.isZero() && callContext.getTimeout() == null) { + callContext = callContext.withTimeout(rpcTimeout); + } - callContext - .getTracer() - .attemptStarted(request, externalFuture.getAttemptSettings().getOverallAttemptCount()); + externalFuture.setAttemptFuture(new NonCancellableFuture()); + if (externalFuture.isDone()) { + return null; + } - ApiFuture internalFuture = callable.futureCall(request, callContext); - externalFuture.setAttemptFuture(internalFuture); - } catch (Throwable e) { - externalFuture.setAttemptFuture(ApiFutures.immediateFailedFuture(e)); - } + callContext + .getTracer() + .attemptStarted(request, externalFuture.getAttemptSettings().getOverallAttemptCount()); - return null; + ApiFuture internalFuture = callable.futureCall(request, callContext); + externalFuture.setAttemptFuture(internalFuture); + } catch (Throwable e) { + externalFuture.setAttemptFuture(ApiFutures.immediateFailedFuture(e)); } -} \ No newline at end of file + + return null; + } +} diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/retrying/RetryingCallable.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/retrying/RetryingCallable.java index bfb5541624..d78bf08322 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/retrying/RetryingCallable.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/retrying/RetryingCallable.java @@ -28,31 +28,31 @@ */ @InternalApi public class RetryingCallable extends UnaryCallable { - private final ApiCallContext callContextPrototype; - private final UnaryCallable callable; - private final RetryingExecutorWithContext executor; + private final ApiCallContext callContextPrototype; + private final UnaryCallable callable; + private final RetryingExecutorWithContext executor; - public RetryingCallable( - ApiCallContext callContextPrototype, - UnaryCallable callable, - RetryingExecutorWithContext executor) { - this.callContextPrototype = (ApiCallContext) Preconditions.checkNotNull(callContextPrototype); - this.callable = (UnaryCallable) Preconditions.checkNotNull(callable); - this.executor = (RetryingExecutorWithContext) Preconditions.checkNotNull(executor); - } + public RetryingCallable( + ApiCallContext callContextPrototype, + UnaryCallable callable, + RetryingExecutorWithContext executor) { + this.callContextPrototype = (ApiCallContext) Preconditions.checkNotNull(callContextPrototype); + this.callable = (UnaryCallable) Preconditions.checkNotNull(callable); + this.executor = (RetryingExecutorWithContext) Preconditions.checkNotNull(executor); + } - public RetryingFuture futureCall(RequestT request, ApiCallContext inputContext) { - ApiCallContext context = this.callContextPrototype.nullToSelf(inputContext); - AttemptCallable retryCallable = - new AttemptCallable(this.callable, request, context); - RetryingFuture retryingFuture = - this.executor.createFuture(retryCallable, inputContext); - retryCallable.setExternalFuture(retryingFuture); - retryCallable.call(); - return retryingFuture; - } + public RetryingFuture futureCall(RequestT request, ApiCallContext inputContext) { + ApiCallContext context = this.callContextPrototype.nullToSelf(inputContext); + AttemptCallable retryCallable = + new AttemptCallable(this.callable, request, context); + RetryingFuture retryingFuture = + this.executor.createFuture(retryCallable, inputContext); + retryCallable.setExternalFuture(retryingFuture); + retryCallable.call(); + return retryingFuture; + } - public String toString() { - return String.format("retrying(%s)", this.callable); - } -} \ No newline at end of file + public String toString() { + return String.format("retrying(%s)", this.callable); + } +} diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/retrying/RetryingServerStreamingCallable.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/retrying/RetryingServerStreamingCallable.java index 5b693d9075..504cf4f2b7 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/retrying/RetryingServerStreamingCallable.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/retrying/RetryingServerStreamingCallable.java @@ -43,57 +43,57 @@ */ @InternalApi public final class RetryingServerStreamingCallable - extends ServerStreamingCallable { + extends ServerStreamingCallable { - private final ServerStreamingCallable innerCallable; - private final ScheduledRetryingExecutor executor; - private final StreamResumptionStrategy resumptionStrategyPrototype; + private final ServerStreamingCallable innerCallable; + private final ScheduledRetryingExecutor executor; + private final StreamResumptionStrategy resumptionStrategyPrototype; - public RetryingServerStreamingCallable( - ServerStreamingCallable innerCallable, - ScheduledRetryingExecutor executor, - StreamResumptionStrategy resumptionStrategyPrototype) { - this.innerCallable = innerCallable; - this.executor = executor; - this.resumptionStrategyPrototype = resumptionStrategyPrototype; - } + public RetryingServerStreamingCallable( + ServerStreamingCallable innerCallable, + ScheduledRetryingExecutor executor, + StreamResumptionStrategy resumptionStrategyPrototype) { + this.innerCallable = innerCallable; + this.executor = executor; + this.resumptionStrategyPrototype = resumptionStrategyPrototype; + } - @Override - public void call( - RequestT request, - final ResponseObserver responseObserver, - ApiCallContext context) { + @Override + public void call( + RequestT request, + final ResponseObserver responseObserver, + ApiCallContext context) { - ServerStreamingAttemptCallable attemptCallable = - new ServerStreamingAttemptCallable<>( - innerCallable, - resumptionStrategyPrototype.createNew(), - request, - context, - responseObserver); + ServerStreamingAttemptCallable attemptCallable = + new ServerStreamingAttemptCallable<>( + innerCallable, + resumptionStrategyPrototype.createNew(), + request, + context, + responseObserver); - RetryingFuture retryingFuture = executor.createFuture(attemptCallable, context); - attemptCallable.setExternalFuture(retryingFuture); - attemptCallable.start(); + RetryingFuture retryingFuture = executor.createFuture(attemptCallable, context); + attemptCallable.setExternalFuture(retryingFuture); + attemptCallable.start(); - // Bridge the future result back to the external responseObserver - ApiFutures.addCallback( - retryingFuture, - new ApiFutureCallback() { - @Override - public void onFailure(Throwable throwable) { - // Make sure to unwrap the underlying ApiException - if (throwable instanceof ServerStreamingAttemptException) { - throwable = throwable.getCause(); - } - responseObserver.onError(throwable); - } + // Bridge the future result back to the external responseObserver + ApiFutures.addCallback( + retryingFuture, + new ApiFutureCallback() { + @Override + public void onFailure(Throwable throwable) { + // Make sure to unwrap the underlying ApiException + if (throwable instanceof ServerStreamingAttemptException) { + throwable = throwable.getCause(); + } + responseObserver.onError(throwable); + } - @Override - public void onSuccess(Void ignored) { - responseObserver.onComplete(); - } - }, - directExecutor()); - } -} \ No newline at end of file + @Override + public void onSuccess(Void ignored) { + responseObserver.onComplete(); + } + }, + directExecutor()); + } +} diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/retrying/ServerStreamingAttemptCallable.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/retrying/ServerStreamingAttemptCallable.java index 2b8e7d39a5..793cf2e91c 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/retrying/ServerStreamingAttemptCallable.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/retrying/ServerStreamingAttemptCallable.java @@ -85,282 +85,282 @@ */ @InternalApi public final class ServerStreamingAttemptCallable implements Callable { - private final Object lock = new Object(); - - private final ServerStreamingCallable innerCallable; - private final StreamResumptionStrategy resumptionStrategy; - private final RequestT initialRequest; - private ApiCallContext context; - private final ResponseObserver outerObserver; - - // Start state - private boolean autoFlowControl = true; - private boolean isStarted; - - // Outer state - private Throwable cancellationCause; - - private int pendingRequests; + private final Object lock = new Object(); + + private final ServerStreamingCallable innerCallable; + private final StreamResumptionStrategy resumptionStrategy; + private final RequestT initialRequest; + private ApiCallContext context; + private final ResponseObserver outerObserver; + + // Start state + private boolean autoFlowControl = true; + private boolean isStarted; + + // Outer state + private Throwable cancellationCause; + + private int pendingRequests; + + private RetryingFuture outerRetryingFuture; + + // Internal retry state + private int numAttempts; + + private StreamController innerController; + + private boolean seenSuccessSinceLastError; + private SettableApiFuture innerAttemptFuture; + + public ServerStreamingAttemptCallable( + ServerStreamingCallable innerCallable, + StreamResumptionStrategy resumptionStrategy, + RequestT initialRequest, + ApiCallContext context, + ResponseObserver outerObserver) { + this.innerCallable = innerCallable; + this.resumptionStrategy = resumptionStrategy; + this.initialRequest = initialRequest; + this.context = context; + this.outerObserver = outerObserver; + } + + /** Sets controlling {@link RetryingFuture}. Must be called be before {@link #start()}. */ + void setExternalFuture(RetryingFuture retryingFuture) { + Preconditions.checkState(!isStarted, "Can't change the RetryingFuture once the call has start"); + Preconditions.checkNotNull(retryingFuture, "RetryingFuture can't be null"); + + this.outerRetryingFuture = retryingFuture; + } + + /** + * Starts the initial call. The call is attempted on the caller's thread. Further call attempts + * will be scheduled by the {@link RetryingFuture}. + */ + public void start() { + Preconditions.checkState(!isStarted, "Already started"); + + // Initialize the outer observer + outerObserver.onStart( + new StreamController() { + @Override + public void disableAutoInboundFlowControl() { + Preconditions.checkState( + !isStarted, "Can't disable auto flow control once the stream is started"); + autoFlowControl = false; + } + + @Override + public void request(int count) { + onRequest(count); + } + + @Override + public void cancel() { + onCancel(); + } + }); + + if (autoFlowControl) { + synchronized (lock) { + pendingRequests = Integer.MAX_VALUE; + } + } + isStarted = true; + + // Call the inner callable + call(); + } + + /** + * Sends the actual RPC. The request being sent will first be transformed by the {@link + * StreamResumptionStrategy}. + * + *

This method expects to be called by one thread at a time. Furthermore, it expects that the + * current RPC finished before the next time it's called. + */ + @Override + public Void call() { + Preconditions.checkState(isStarted, "Must be started first"); + + RequestT request = + (++numAttempts == 1) ? initialRequest : resumptionStrategy.getResumeRequest(initialRequest); + + // Should never happen. onAttemptError will check if ResumptionStrategy can create a resume + // request, + // which the RetryingFuture/StreamResumptionStrategy should respect. + Preconditions.checkState(request != null, "ResumptionStrategy returned a null request."); + + innerAttemptFuture = SettableApiFuture.create(); + seenSuccessSinceLastError = false; + + ApiCallContext attemptContext = context; + + if (!outerRetryingFuture.getAttemptSettings().getRpcTimeout().isZero() + && attemptContext.getTimeout() == null) { + attemptContext = + attemptContext.withTimeout(outerRetryingFuture.getAttemptSettings().getRpcTimeout()); + } - private RetryingFuture outerRetryingFuture; + attemptContext + .getTracer() + .attemptStarted(request, outerRetryingFuture.getAttemptSettings().getOverallAttemptCount()); + + innerCallable.call( + request, + new StateCheckingResponseObserver() { + @Override + public void onStartImpl(StreamController controller) { + onAttemptStart(controller); + } + + @Override + public void onResponseImpl(ResponseT response) { + onAttemptResponse(response); + } + + @Override + public void onErrorImpl(Throwable t) { + onAttemptError(t); + } + + @Override + public void onCompleteImpl() { + onAttemptComplete(); + } + }, + attemptContext); + + outerRetryingFuture.setAttemptFuture(innerAttemptFuture); + + return null; + } + + /** + * Called by the inner {@link ServerStreamingCallable} when the call is about to start. This will + * transfer unfinished state from the previous attempt. + * + * @see ResponseObserver#onStart(StreamController) + */ + private void onAttemptStart(StreamController controller) { + if (!autoFlowControl) { + controller.disableAutoInboundFlowControl(); + } - // Internal retry state - private int numAttempts; + Throwable localCancellationCause; + int numToRequest = 0; - private StreamController innerController; + synchronized (lock) { + innerController = controller; - private boolean seenSuccessSinceLastError; - private SettableApiFuture innerAttemptFuture; + localCancellationCause = this.cancellationCause; - public ServerStreamingAttemptCallable( - ServerStreamingCallable innerCallable, - StreamResumptionStrategy resumptionStrategy, - RequestT initialRequest, - ApiCallContext context, - ResponseObserver outerObserver) { - this.innerCallable = innerCallable; - this.resumptionStrategy = resumptionStrategy; - this.initialRequest = initialRequest; - this.context = context; - this.outerObserver = outerObserver; + if (!autoFlowControl) { + numToRequest = pendingRequests; + } } - /** Sets controlling {@link RetryingFuture}. Must be called be before {@link #start()}. */ - void setExternalFuture(RetryingFuture retryingFuture) { - Preconditions.checkState(!isStarted, "Can't change the RetryingFuture once the call has start"); - Preconditions.checkNotNull(retryingFuture, "RetryingFuture can't be null"); - - this.outerRetryingFuture = retryingFuture; + if (localCancellationCause != null) { + controller.cancel(); + } else if (numToRequest > 0) { + controller.request(numToRequest); } - - /** - * Starts the initial call. The call is attempted on the caller's thread. Further call attempts - * will be scheduled by the {@link RetryingFuture}. - */ - public void start() { - Preconditions.checkState(!isStarted, "Already started"); - - // Initialize the outer observer - outerObserver.onStart( - new StreamController() { - @Override - public void disableAutoInboundFlowControl() { - Preconditions.checkState( - !isStarted, "Can't disable auto flow control once the stream is started"); - autoFlowControl = false; - } - - @Override - public void request(int count) { - onRequest(count); - } - - @Override - public void cancel() { - onCancel(); - } - }); - - if (autoFlowControl) { - synchronized (lock) { - pendingRequests = Integer.MAX_VALUE; - } - } - isStarted = true; - - // Call the inner callable - call(); + } + + /** + * Called when the outer {@link ResponseObserver} wants to prematurely cancel the stream. + * + * @see StreamController#cancel() + */ + private void onCancel() { + StreamController localInnerController; + + synchronized (lock) { + if (cancellationCause != null) { + return; + } + // NOTE: BasicRetryingFuture will replace j.u.c.CancellationExceptions with it's own, + // which will not have the current stacktrace, so a special wrapper has be used here. + cancellationCause = + new ServerStreamingAttemptException( + new CancellationException("User cancelled stream"), + resumptionStrategy.canResume(), + seenSuccessSinceLastError); + localInnerController = innerController; } - /** - * Sends the actual RPC. The request being sent will first be transformed by the {@link - * StreamResumptionStrategy}. - * - *

This method expects to be called by one thread at a time. Furthermore, it expects that the - * current RPC finished before the next time it's called. - */ - @Override - public Void call() { - Preconditions.checkState(isStarted, "Must be started first"); - - RequestT request = - (++numAttempts == 1) ? initialRequest : resumptionStrategy.getResumeRequest(initialRequest); - - // Should never happen. onAttemptError will check if ResumptionStrategy can create a resume - // request, - // which the RetryingFuture/StreamResumptionStrategy should respect. - Preconditions.checkState(request != null, "ResumptionStrategy returned a null request."); - - innerAttemptFuture = SettableApiFuture.create(); - seenSuccessSinceLastError = false; - - ApiCallContext attemptContext = context; - - if (!outerRetryingFuture.getAttemptSettings().getRpcTimeout().isZero() - && attemptContext.getTimeout() == null) { - attemptContext = - attemptContext.withTimeout(outerRetryingFuture.getAttemptSettings().getRpcTimeout()); - } - - attemptContext - .getTracer() - .attemptStarted(request, outerRetryingFuture.getAttemptSettings().getOverallAttemptCount()); - - innerCallable.call( - request, - new StateCheckingResponseObserver() { - @Override - public void onStartImpl(StreamController controller) { - onAttemptStart(controller); - } - - @Override - public void onResponseImpl(ResponseT response) { - onAttemptResponse(response); - } - - @Override - public void onErrorImpl(Throwable t) { - onAttemptError(t); - } - - @Override - public void onCompleteImpl() { - onAttemptComplete(); - } - }, - attemptContext); - - outerRetryingFuture.setAttemptFuture(innerAttemptFuture); - - return null; + if (localInnerController != null) { + localInnerController.cancel(); } + } - /** - * Called by the inner {@link ServerStreamingCallable} when the call is about to start. This will - * transfer unfinished state from the previous attempt. - * - * @see ResponseObserver#onStart(StreamController) - */ - private void onAttemptStart(StreamController controller) { - if (!autoFlowControl) { - controller.disableAutoInboundFlowControl(); - } - - Throwable localCancellationCause; - int numToRequest = 0; - - synchronized (lock) { - innerController = controller; - - localCancellationCause = this.cancellationCause; - - if (!autoFlowControl) { - numToRequest = pendingRequests; - } - } - - if (localCancellationCause != null) { - controller.cancel(); - } else if (numToRequest > 0) { - controller.request(numToRequest); - } - } + /** + * Called when the outer {@link ResponseObserver} is ready for more data. + * + * @see StreamController#request(int) + */ + private void onRequest(int count) { + Preconditions.checkState(!autoFlowControl, "Automatic flow control is enabled"); + Preconditions.checkArgument(count > 0, "Count must be > 0"); - /** - * Called when the outer {@link ResponseObserver} wants to prematurely cancel the stream. - * - * @see StreamController#cancel() - */ - private void onCancel() { - StreamController localInnerController; - - synchronized (lock) { - if (cancellationCause != null) { - return; - } - // NOTE: BasicRetryingFuture will replace j.u.c.CancellationExceptions with it's own, - // which will not have the current stacktrace, so a special wrapper has be used here. - cancellationCause = - new ServerStreamingAttemptException( - new CancellationException("User cancelled stream"), - resumptionStrategy.canResume(), - seenSuccessSinceLastError); - localInnerController = innerController; - } - - if (localInnerController != null) { - localInnerController.cancel(); - } - } + final StreamController localInnerController; - /** - * Called when the outer {@link ResponseObserver} is ready for more data. - * - * @see StreamController#request(int) - */ - private void onRequest(int count) { - Preconditions.checkState(!autoFlowControl, "Automatic flow control is enabled"); - Preconditions.checkArgument(count > 0, "Count must be > 0"); - - final StreamController localInnerController; - - synchronized (lock) { - int maxInc = Integer.MAX_VALUE - pendingRequests; - count = Math.min(maxInc, count); - - pendingRequests += count; - localInnerController = this.innerController; - } - - // Note: there is a race condition here where the count might go to the previous attempt's - // StreamController after it failed. But it doesn't matter, because the controller will just - // ignore it and the current controller will pick it up onStart. - if (localInnerController != null) { - localInnerController.request(count); - } - } + synchronized (lock) { + int maxInc = Integer.MAX_VALUE - pendingRequests; + count = Math.min(maxInc, count); - /** Called when the inner callable has responses to deliver. */ - private void onAttemptResponse(ResponseT message) { - if (!autoFlowControl) { - synchronized (lock) { - pendingRequests--; - } - } - // Update local state to allow for future resume. - seenSuccessSinceLastError = true; - message = resumptionStrategy.processResponse(message); - // Notify the outer observer. - outerObserver.onResponse(message); + pendingRequests += count; + localInnerController = this.innerController; } - /** - * Called when the current RPC fails. The error will be bubbled up to the outer {@link - * RetryingFuture} via the {@link #innerAttemptFuture}. - */ - private void onAttemptError(Throwable throwable) { - Throwable localCancellationCause; - synchronized (lock) { - localCancellationCause = cancellationCause; - } - - if (localCancellationCause != null) { - // Take special care to preserve the cancellation's stack trace. - innerAttemptFuture.setException(localCancellationCause); - } else { - // Wrap the original exception and provide more context for StreamingRetryAlgorithm. - innerAttemptFuture.setException( - new ServerStreamingAttemptException( - throwable, resumptionStrategy.canResume(), seenSuccessSinceLastError)); - } + // Note: there is a race condition here where the count might go to the previous attempt's + // StreamController after it failed. But it doesn't matter, because the controller will just + // ignore it and the current controller will pick it up onStart. + if (localInnerController != null) { + localInnerController.request(count); + } + } + + /** Called when the inner callable has responses to deliver. */ + private void onAttemptResponse(ResponseT message) { + if (!autoFlowControl) { + synchronized (lock) { + pendingRequests--; + } + } + // Update local state to allow for future resume. + seenSuccessSinceLastError = true; + message = resumptionStrategy.processResponse(message); + // Notify the outer observer. + outerObserver.onResponse(message); + } + + /** + * Called when the current RPC fails. The error will be bubbled up to the outer {@link + * RetryingFuture} via the {@link #innerAttemptFuture}. + */ + private void onAttemptError(Throwable throwable) { + Throwable localCancellationCause; + synchronized (lock) { + localCancellationCause = cancellationCause; } - /** - * Called when the current RPC successfully completes. Notifies the outer {@link RetryingFuture} - * via {@link #innerAttemptFuture}. - */ - private void onAttemptComplete() { - innerAttemptFuture.set(null); + if (localCancellationCause != null) { + // Take special care to preserve the cancellation's stack trace. + innerAttemptFuture.setException(localCancellationCause); + } else { + // Wrap the original exception and provide more context for StreamingRetryAlgorithm. + innerAttemptFuture.setException( + new ServerStreamingAttemptException( + throwable, resumptionStrategy.canResume(), seenSuccessSinceLastError)); } -} \ No newline at end of file + } + + /** + * Called when the current RPC successfully completes. Notifies the outer {@link RetryingFuture} + * via {@link #innerAttemptFuture}. + */ + private void onAttemptComplete() { + innerAttemptFuture.set(null); + } +} From 300cba9c9e2469f55e334e51d57c36d3e2cb4c68 Mon Sep 17 00:00:00 2001 From: Mattie Fu Date: Tue, 12 Dec 2023 12:52:02 -0500 Subject: [PATCH 3/9] add callables --- .../gaxx/retrying/AttemptCallable.java | 2 +- .../bigtable/gaxx/retrying/Callables.java | 117 ++++++++++++++++++ 2 files changed, 118 insertions(+), 1 deletion(-) create mode 100644 google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/retrying/Callables.java diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/retrying/AttemptCallable.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/retrying/AttemptCallable.java index d9a6d8f9c4..3599e1e4df 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/retrying/AttemptCallable.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/retrying/AttemptCallable.java @@ -29,7 +29,7 @@ // TODO: remove this once ApiResultRetryAlgorithm is added to gax. /** * A callable representing an attempt to make an RPC call. This class is used from {@link - * com.google.cloud.bigtable.gaxx.retrying.RetryingCallable}. + * RetryingCallable}. * * @param request type * @param response type diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/retrying/Callables.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/retrying/Callables.java new file mode 100644 index 0000000000..3c051048b3 --- /dev/null +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/retrying/Callables.java @@ -0,0 +1,117 @@ +/* + * Copyright 2017 Google LLC + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google LLC nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ +package com.google.cloud.bigtable.gaxx.retrying; + +import com.google.api.core.BetaApi; +import com.google.api.gax.retrying.ExponentialRetryAlgorithm; +import com.google.api.gax.retrying.RetryAlgorithm; +import com.google.api.gax.retrying.RetrySettings; +import com.google.api.gax.retrying.ScheduledRetryingExecutor; +import com.google.api.gax.retrying.StreamingRetryAlgorithm; +import com.google.api.gax.rpc.ClientContext; +import com.google.api.gax.rpc.ServerStreamingCallSettings; +import com.google.api.gax.rpc.ServerStreamingCallable; +import com.google.api.gax.rpc.StatusCode; +import com.google.api.gax.rpc.UnaryCallSettings; +import com.google.api.gax.rpc.UnaryCallable; +import java.util.Collection; + +// TODO: remove this once ApiResultRetryAlgorithm is added to gax. +/** + * Class with utility methods to create callable objects using provided settings. + * + *

The callable objects wrap a given direct callable with features like retry and exception + * translation. + */ +@BetaApi +public class Callables { + + private Callables() {} + + public static UnaryCallable retrying( + UnaryCallable innerCallable, + UnaryCallSettings callSettings, + ClientContext clientContext) { + + UnaryCallSettings settings = callSettings; + + if (areRetriesDisabled(settings.getRetryableCodes(), settings.getRetrySettings())) { + // When retries are disabled, the total timeout can be treated as the rpc timeout. + settings = + settings + .toBuilder() + .setSimpleTimeoutNoRetries(settings.getRetrySettings().getTotalTimeout()) + .build(); + } + + RetryAlgorithm retryAlgorithm = + new RetryAlgorithm<>( + new ApiResultRetryAlgorithm(), + new ExponentialRetryAlgorithm(settings.getRetrySettings(), clientContext.getClock())); + ScheduledRetryingExecutor retryingExecutor = + new ScheduledRetryingExecutor<>(retryAlgorithm, clientContext.getExecutor()); + return new RetryingCallable<>( + clientContext.getDefaultCallContext(), innerCallable, retryingExecutor); + } + + public static ServerStreamingCallable retrying( + ServerStreamingCallable innerCallable, + ServerStreamingCallSettings callSettings, + ClientContext clientContext) { + + ServerStreamingCallSettings settings = callSettings; + if (areRetriesDisabled(settings.getRetryableCodes(), settings.getRetrySettings())) { + // When retries are disabled, the total timeout can be treated as the rpc timeout. + settings = + settings + .toBuilder() + .setSimpleTimeoutNoRetries(settings.getRetrySettings().getTotalTimeout()) + .build(); + } + + StreamingRetryAlgorithm retryAlgorithm = + new StreamingRetryAlgorithm<>( + new ApiResultRetryAlgorithm(), + new ExponentialRetryAlgorithm(settings.getRetrySettings(), clientContext.getClock())); + + ScheduledRetryingExecutor retryingExecutor = + new ScheduledRetryingExecutor<>(retryAlgorithm, clientContext.getExecutor()); + + return new RetryingServerStreamingCallable<>( + innerCallable, retryingExecutor, settings.getResumptionStrategy()); + } + + private static boolean areRetriesDisabled( + Collection retryableCodes, RetrySettings retrySettings) { + return retrySettings.getMaxAttempts() == 1 + || retryableCodes.isEmpty() + || (retrySettings.getMaxAttempts() == 0 && retrySettings.getTotalTimeout().isZero()); + } +} From b99917d5049ad9a88141eb5c3e9151264a1144f8 Mon Sep 17 00:00:00 2001 From: Mattie Fu Date: Tue, 12 Dec 2023 13:11:45 -0500 Subject: [PATCH 4/9] update retry algorithm to be in sync with gax --- .../retrying/ApiResultRetryAlgorithm.java | 46 ++++++++++--------- .../bigtable/gaxx/retrying/Callables.java | 34 ++++---------- 2 files changed, 34 insertions(+), 46 deletions(-) diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/retrying/ApiResultRetryAlgorithm.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/retrying/ApiResultRetryAlgorithm.java index c7f3d18b62..d71a044235 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/retrying/ApiResultRetryAlgorithm.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/retrying/ApiResultRetryAlgorithm.java @@ -16,36 +16,38 @@ package com.google.cloud.bigtable.gaxx.retrying; import com.google.api.core.InternalApi; -import com.google.api.gax.retrying.ResultRetryAlgorithm; -import com.google.api.gax.retrying.TimedAttemptSettings; +import com.google.api.gax.retrying.BasicResultRetryAlgorithm; +import com.google.api.gax.retrying.RetryingContext; import com.google.api.gax.rpc.ApiException; -import com.google.api.gax.rpc.DeadlineExceededException; -import org.threeten.bp.Duration; /** For internal use, public for technical reasons. */ @InternalApi -public class ApiResultRetryAlgorithm implements ResultRetryAlgorithm { - // Duration to sleep on if the error is DEADLINE_EXCEEDED. - public static final Duration DEADLINE_SLEEP_DURATION = Duration.ofMillis(1); +public class ApiResultRetryAlgorithm extends BasicResultRetryAlgorithm { + /** Returns true if previousThrowable is an {@link ApiException} that is retryable. */ @Override - public TimedAttemptSettings createNextAttempt( - Throwable prevThrowable, ResponseT prevResponse, TimedAttemptSettings prevSettings) { - if (prevThrowable != null && prevThrowable instanceof DeadlineExceededException) { - return TimedAttemptSettings.newBuilder() - .setGlobalSettings(prevSettings.getGlobalSettings()) - .setRetryDelay(prevSettings.getRetryDelay()) - .setRpcTimeout(prevSettings.getRpcTimeout()) - .setRandomizedRetryDelay(DEADLINE_SLEEP_DURATION) - .setAttemptCount(prevSettings.getAttemptCount() + 1) - .setFirstAttemptStartTimeNanos(prevSettings.getFirstAttemptStartTimeNanos()) - .build(); - } - return null; + public boolean shouldRetry(Throwable previousThrowable, ResponseT previousResponse) { + return (previousThrowable instanceof ApiException) + && ((ApiException) previousThrowable).isRetryable(); } + /** + * If {@link RetryingContext#getRetryableCodes()} is not null: Returns true if the status code of + * previousThrowable is in the list of retryable code of the {@link RetryingContext}. + * + *

Otherwise it returns the result of {@link #shouldRetry(Throwable, Object)}. + */ @Override - public boolean shouldRetry(Throwable prevThrowable, ResponseT prevResponse) { - return (prevThrowable instanceof ApiException) && ((ApiException) prevThrowable).isRetryable(); + public boolean shouldRetry( + RetryingContext context, Throwable previousThrowable, ResponseT previousResponse) { + if (context.getRetryableCodes() != null) { + // Ignore the isRetryable() value of the throwable if the RetryingContext has a specific list + // of codes that should be retried. + return (previousThrowable instanceof ApiException) + && context + .getRetryableCodes() + .contains(((ApiException) previousThrowable).getStatusCode().getCode()); + } + return shouldRetry(previousThrowable, previousResponse); } } diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/retrying/Callables.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/retrying/Callables.java index 3c051048b3..49ddfddb35 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/retrying/Callables.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/retrying/Callables.java @@ -1,31 +1,17 @@ /* - * Copyright 2017 Google LLC + * Copyright 2023 Google LLC * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google LLC nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. + * https://www.apache.org/licenses/LICENSE-2.0 * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package com.google.cloud.bigtable.gaxx.retrying; From b6eeea825fd4e04778ef7d6bfab853e13b0eda3c Mon Sep 17 00:00:00 2001 From: Mattie Fu Date: Tue, 12 Dec 2023 13:14:49 -0500 Subject: [PATCH 5/9] fix clirr --- google-cloud-bigtable/clirr-ignored-differences.xml | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/google-cloud-bigtable/clirr-ignored-differences.xml b/google-cloud-bigtable/clirr-ignored-differences.xml index 4bb4684c38..1f0ff9f4a1 100644 --- a/google-cloud-bigtable/clirr-ignored-differences.xml +++ b/google-cloud-bigtable/clirr-ignored-differences.xml @@ -150,4 +150,10 @@ 8001 com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracerBatchedUnaryCallable + + + 6001 + com/google/cloud/bigtable/gaxx/retrying/ApiResultRetryAlgorithm + * + From 3dc8585834d03d4a8926430f37dd99dbf2793b7f Mon Sep 17 00:00:00 2001 From: Mattie Fu Date: Wed, 13 Dec 2023 13:42:05 -0500 Subject: [PATCH 6/9] mark as internal --- .../com/google/cloud/bigtable/gaxx/retrying/Callables.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/retrying/Callables.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/retrying/Callables.java index 49ddfddb35..e62d371ac3 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/retrying/Callables.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/retrying/Callables.java @@ -15,7 +15,7 @@ */ package com.google.cloud.bigtable.gaxx.retrying; -import com.google.api.core.BetaApi; +import com.google.api.core.InternalApi; import com.google.api.gax.retrying.ExponentialRetryAlgorithm; import com.google.api.gax.retrying.RetryAlgorithm; import com.google.api.gax.retrying.RetrySettings; @@ -36,7 +36,7 @@ *

The callable objects wrap a given direct callable with features like retry and exception * translation. */ -@BetaApi +@InternalApi public class Callables { private Callables() {} From ab838fa9dac006c58883aeea345fa2e245cdb09e Mon Sep 17 00:00:00 2001 From: Owl Bot Date: Wed, 13 Dec 2023 19:26:03 +0000 Subject: [PATCH 7/9] =?UTF-8?q?=F0=9F=A6=89=20Updates=20from=20OwlBot=20po?= =?UTF-8?q?st-processor?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 8c7027f42f..a1a00577d2 100644 --- a/README.md +++ b/README.md @@ -50,7 +50,7 @@ If you are using Maven without the BOM, add this to your dependencies: If you are using Gradle 5.x or later, add this to your dependencies: ```Groovy -implementation platform('com.google.cloud:libraries-bom:26.27.0') +implementation platform('com.google.cloud:libraries-bom:26.29.0') implementation 'com.google.cloud:google-cloud-bigtable' ``` From 35f22c3fadcfb6c4ad3a2fbff692be4f139db23a Mon Sep 17 00:00:00 2001 From: Owl Bot Date: Wed, 13 Dec 2023 19:55:46 +0000 Subject: [PATCH 8/9] =?UTF-8?q?=F0=9F=A6=89=20Updates=20from=20OwlBot=20po?= =?UTF-8?q?st-processor?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index d9ba07b218..a1a00577d2 100644 --- a/README.md +++ b/README.md @@ -50,7 +50,7 @@ If you are using Maven without the BOM, add this to your dependencies: If you are using Gradle 5.x or later, add this to your dependencies: ```Groovy -implementation platform('com.google.cloud:libraries-bom:26.28.0') +implementation platform('com.google.cloud:libraries-bom:26.29.0') implementation 'com.google.cloud:google-cloud-bigtable' ``` From 4318999259e6417cc27a5f16bf32f89327d6e771 Mon Sep 17 00:00:00 2001 From: Owl Bot Date: Wed, 13 Dec 2023 19:56:52 +0000 Subject: [PATCH 9/9] =?UTF-8?q?=F0=9F=A6=89=20Updates=20from=20OwlBot=20po?= =?UTF-8?q?st-processor?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index d9ba07b218..a1a00577d2 100644 --- a/README.md +++ b/README.md @@ -50,7 +50,7 @@ If you are using Maven without the BOM, add this to your dependencies: If you are using Gradle 5.x or later, add this to your dependencies: ```Groovy -implementation platform('com.google.cloud:libraries-bom:26.28.0') +implementation platform('com.google.cloud:libraries-bom:26.29.0') implementation 'com.google.cloud:google-cloud-bigtable' ```