diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java index 9694384e2f..17aa382f96 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java @@ -1357,11 +1357,21 @@ public Map extract(ExecuteQueryRequest executeQueryRequest) { ServerStreamingCallSettings retrySettings = ServerStreamingCallSettings.newBuilder() - // TODO resumption strategy and retry settings + // TODO add resumption strategy and pass through retry settings unchanged + // we pass through retry settings to use the deadlines now but don't + // support retries + .setRetrySettings( + settings + .executeQuerySettings() + .getRetrySettings() + .toBuilder() + // override maxAttempts as a safeguard against changes from user + .setMaxAttempts(1) + .build()) .build(); // Adding RetryingCallable to the callable chain so that client side metrics can be - // measured correctly. Retries are currently disabled. + // measured correctly and deadlines are set. Retries are currently disabled. ServerStreamingCallable retries = withRetries(withBigtableTracer, retrySettings); diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/sql/ExecuteQueryCallableTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/sql/ExecuteQueryCallableTest.java index 38eb70cf13..14275d3cd8 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/sql/ExecuteQueryCallableTest.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/sql/ExecuteQueryCallableTest.java @@ -22,6 +22,7 @@ import static com.google.common.truth.Truth.assertThat; import static org.junit.Assert.assertThrows; +import com.google.api.gax.retrying.RetrySettings; import com.google.api.gax.rpc.UnavailableException; import com.google.bigtable.v2.BigtableGrpc; import com.google.bigtable.v2.ExecuteQueryRequest; @@ -35,6 +36,9 @@ import com.google.cloud.bigtable.data.v2.models.sql.Statement; import com.google.cloud.bigtable.data.v2.stub.EnhancedBigtableStub; import com.google.cloud.bigtable.gaxx.testing.FakeStreamingApi.ServerStreamingStashCallable; +import com.google.common.collect.Range; +import io.grpc.Context; +import io.grpc.Deadline; import io.grpc.Server; import io.grpc.Status; import io.grpc.StatusRuntimeException; @@ -42,11 +46,13 @@ import java.io.IOException; import java.util.Collections; import java.util.Iterator; +import java.util.concurrent.TimeUnit; import org.junit.After; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; +import org.threeten.bp.Duration; @RunWith(JUnit4.class) public class ExecuteQueryCallableTest { @@ -108,13 +114,74 @@ public void testExecuteQueryRequestsAreNotRetried() { assertThat(fakeService.attempts).isEqualTo(1); } + @Test + public void testExecuteQueryRequestsIgnoreOverriddenMaxAttempts() throws IOException { + BigtableDataSettings.Builder overrideSettings = + BigtableDataSettings.newBuilderForEmulator(server.getPort()) + .setProjectId("fake-project") + .setInstanceId("fake-instance"); + overrideSettings + .stubSettings() + .executeQuerySettings() + .setRetrySettings(RetrySettings.newBuilder().setMaxAttempts(10).build()); + EnhancedBigtableStub overrideStub = + EnhancedBigtableStub.create(overrideSettings.build().getStubSettings()); + SqlServerStream stream = + overrideStub.executeQueryCallable().call(Statement.of("SELECT * FROM table")); + + Iterator iterator = stream.rows().iterator(); + + assertThrows(UnavailableException.class, iterator::next).getCause(); + assertThat(fakeService.attempts).isEqualTo(1); + } + + @Test + public void testExecuteQueryRequestsSetDefaultDeadline() { + SqlServerStream stream = stub.executeQueryCallable().call(Statement.of("SELECT * FROM table")); + Iterator iterator = stream.rows().iterator(); + // We don't care about this but are reusing the fake service that tests retries + assertThrows(UnavailableException.class, iterator::next).getCause(); + // We have 30s default, we assume less than 1s has been burned when the fake service sets it + assertThat(fakeService.deadlineMillisRemaining).isIn(Range.closed(29000L, 30100L)); + } + + @Test + public void testExecuteQueryRequestsRespectOverridenDeadline() throws IOException { + BigtableDataSettings.Builder overrideSettings = + BigtableDataSettings.newBuilderForEmulator(server.getPort()) + .setProjectId("fake-project") + .setInstanceId("fake-instance"); + overrideSettings + .stubSettings() + .executeQuerySettings() + .setRetrySettings( + RetrySettings.newBuilder() + .setInitialRpcTimeout(Duration.ofMinutes(5)) + .setMaxRpcTimeout(Duration.ofMinutes(5)) + .build()); + EnhancedBigtableStub overrideDeadline = + EnhancedBigtableStub.create(overrideSettings.build().getStubSettings()); + SqlServerStream streamOverride = + overrideDeadline.executeQueryCallable().call(Statement.of("SELECT * FROM table")); + Iterator overrideIterator = streamOverride.rows().iterator(); + // We don't care about this but are reusing the fake service that tests retries + assertThrows(UnavailableException.class, overrideIterator::next).getCause(); + // We have 30s default, we assume less than 1s has been burned when the fake service sets it + assertThat(fakeService.deadlineMillisRemaining).isIn(Range.closed(299000L, 300100L)); + } + private static class FakeService extends BigtableGrpc.BigtableImplBase { private int attempts = 0; + private long deadlineMillisRemaining; @Override public void executeQuery( ExecuteQueryRequest request, StreamObserver responseObserver) { + Deadline deadline = Context.current().getDeadline(); + if (deadline != null) { + deadlineMillisRemaining = deadline.timeRemaining(TimeUnit.MILLISECONDS); + } attempts++; responseObserver.onNext(metadata(columnMetadata("test", stringType()))); responseObserver.onError(new StatusRuntimeException(Status.UNAVAILABLE));