Otherwise it returns the result of {@link #shouldRetry(Throwable, Object)}.
+ */
@Override
- public boolean shouldRetry(Throwable prevThrowable, ResponseT prevResponse) {
- return (prevThrowable instanceof ApiException) && ((ApiException) prevThrowable).isRetryable();
+ public boolean shouldRetry(
+ RetryingContext context, Throwable previousThrowable, ResponseT previousResponse) {
+ if (context.getRetryableCodes() != null) {
+ // Ignore the isRetryable() value of the throwable if the RetryingContext has a specific list
+ // of codes that should be retried.
+ return (previousThrowable instanceof ApiException)
+ && context
+ .getRetryableCodes()
+ .contains(((ApiException) previousThrowable).getStatusCode().getCode());
+ }
+ return shouldRetry(previousThrowable, previousResponse);
}
}
diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/retrying/AttemptCallable.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/retrying/AttemptCallable.java
new file mode 100644
index 0000000000..3599e1e4df
--- /dev/null
+++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/retrying/AttemptCallable.java
@@ -0,0 +1,84 @@
+/*
+ * Copyright 2023 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.bigtable.gaxx.retrying;
+
+import com.google.api.core.ApiFuture;
+import com.google.api.core.ApiFutures;
+import com.google.api.core.InternalApi;
+import com.google.api.gax.retrying.NonCancellableFuture;
+import com.google.api.gax.retrying.RetryingFuture;
+import com.google.api.gax.rpc.ApiCallContext;
+import com.google.api.gax.rpc.UnaryCallable;
+import com.google.common.base.Preconditions;
+import java.util.concurrent.Callable;
+import org.threeten.bp.Duration;
+
+// TODO: remove this once ApiResultRetryAlgorithm is added to gax.
+/**
+ * A callable representing an attempt to make an RPC call. This class is used from {@link
+ * RetryingCallable}.
+ *
+ * @param The callable objects wrap a given direct callable with features like retry and exception
+ * translation.
+ */
+@InternalApi
+public class Callables {
+
+ private Callables() {}
+
+ public static Wraps a request, a {@link ResponseObserver} and an inner {@link ServerStreamingCallable} and
+ * coordinates retries between them. When the inner callable throws an error, this class will
+ * schedule retries using the configured {@link ScheduledRetryingExecutor}.
+ *
+ * Streams can be resumed using a {@link StreamResumptionStrategy}. The {@link
+ * StreamResumptionStrategy} is notified of incoming responses and is expected to track the progress
+ * of the stream. Upon receiving an error, the {@link StreamResumptionStrategy} is asked to modify
+ * the original request to resume the stream.
+ */
+@InternalApi
+public final class RetryingServerStreamingCallable The lifecycle of this class is:
+ *
+ * This class is meant to be used as middleware between an outer {@link ResponseObserver} and an
+ * inner {@link ServerStreamingCallable}. As such it follows the general threading model of {@link
+ * ServerStreamingCallable}s:
+ *
+ * With this model in mind, this class only needs to synchronize access data that is shared
+ * between: the outer {@link ResponseObserver} (via this class' {@link StreamController}) and the
+ * inner {@link ServerStreamingCallable}: pendingRequests, cancellationCause and the current
+ * innerController.
+ *
+ * @param This method expects to be called by one thread at a time. Furthermore, it expects that the
+ * current RPC finished before the next time it's called.
+ */
+ @Override
+ public Void call() {
+ Preconditions.checkState(isStarted, "Must be started first");
+
+ RequestT request =
+ (++numAttempts == 1) ? initialRequest : resumptionStrategy.getResumeRequest(initialRequest);
+
+ // Should never happen. onAttemptError will check if ResumptionStrategy can create a resume
+ // request,
+ // which the RetryingFuture/StreamResumptionStrategy should respect.
+ Preconditions.checkState(request != null, "ResumptionStrategy returned a null request.");
+
+ innerAttemptFuture = SettableApiFuture.create();
+ seenSuccessSinceLastError = false;
+
+ ApiCallContext attemptContext = context;
+
+ if (!outerRetryingFuture.getAttemptSettings().getRpcTimeout().isZero()
+ && attemptContext.getTimeout() == null) {
+ attemptContext =
+ attemptContext.withTimeout(outerRetryingFuture.getAttemptSettings().getRpcTimeout());
+ }
+
+ attemptContext
+ .getTracer()
+ .attemptStarted(request, outerRetryingFuture.getAttemptSettings().getOverallAttemptCount());
+
+ innerCallable.call(
+ request,
+ new StateCheckingResponseObserver
+ *
+ *
+ *
+ *
+ *
+ *