diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java index d0022a1a46..9694384e2f 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java @@ -1335,9 +1335,8 @@ public Map extract(ExecuteQueryRequest executeQueryRequest) { ServerStreamingCallable withStatsHeaders = new StatsHeadersServerStreamingCallable<>(base); - ServerStreamingCallSettings innerSettings = + ServerStreamingCallSettings watchdogSettings = ServerStreamingCallSettings.newBuilder() - // TODO resumption strategy and retry settings .setIdleTimeout(settings.executeQuerySettings().getIdleTimeout()) .setWaitTimeout(settings.executeQuerySettings().getWaitTimeout()) .build(); @@ -1345,7 +1344,7 @@ public Map extract(ExecuteQueryRequest executeQueryRequest) { // Watchdog needs to stay above the metadata observer so that watchdog errors // are passed through to the metadata future. ServerStreamingCallable watched = - Callables.watched(withStatsHeaders, innerSettings, clientContext); + Callables.watched(withStatsHeaders, watchdogSettings, clientContext); ServerStreamingCallable withMetadataObserver = new MetadataResolvingCallable(watched); @@ -1356,10 +1355,19 @@ public Map extract(ExecuteQueryRequest executeQueryRequest) { ServerStreamingCallable withBigtableTracer = new BigtableTracerStreamingCallable<>(merging); + ServerStreamingCallSettings retrySettings = + ServerStreamingCallSettings.newBuilder() + // TODO resumption strategy and retry settings + .build(); + + // Adding RetryingCallable to the callable chain so that client side metrics can be + // measured correctly. Retries are currently disabled. + ServerStreamingCallable retries = + withRetries(withBigtableTracer, retrySettings); + SpanName span = getSpanName("ExecuteQuery"); ServerStreamingCallable traced = - new TracedServerStreamingCallable<>( - withBigtableTracer, clientContext.getTracerFactory(), span); + new TracedServerStreamingCallable<>(retries, clientContext.getTracerFactory(), span); return new ExecuteQueryCallable( traced.withDefaultCallContext(clientContext.getDefaultCallContext()), requestContext); diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/sql/ExecuteQueryCallableTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/sql/ExecuteQueryCallableTest.java index 9788e5d55d..38eb70cf13 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/sql/ExecuteQueryCallableTest.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/sql/ExecuteQueryCallableTest.java @@ -20,15 +20,30 @@ import static com.google.cloud.bigtable.data.v2.stub.sql.SqlProtoFactory.stringType; import static com.google.cloud.bigtable.data.v2.stub.sql.SqlProtoFactory.stringValue; import static com.google.common.truth.Truth.assertThat; +import static org.junit.Assert.assertThrows; +import com.google.api.gax.rpc.UnavailableException; +import com.google.bigtable.v2.BigtableGrpc; +import com.google.bigtable.v2.ExecuteQueryRequest; +import com.google.bigtable.v2.ExecuteQueryResponse; +import com.google.cloud.bigtable.data.v2.BigtableDataSettings; +import com.google.cloud.bigtable.data.v2.FakeServiceBuilder; import com.google.cloud.bigtable.data.v2.internal.ProtoResultSetMetadata; import com.google.cloud.bigtable.data.v2.internal.ProtoSqlRow; import com.google.cloud.bigtable.data.v2.internal.RequestContext; import com.google.cloud.bigtable.data.v2.internal.SqlRow; import com.google.cloud.bigtable.data.v2.models.sql.Statement; +import com.google.cloud.bigtable.data.v2.stub.EnhancedBigtableStub; import com.google.cloud.bigtable.gaxx.testing.FakeStreamingApi.ServerStreamingStashCallable; +import io.grpc.Server; +import io.grpc.Status; +import io.grpc.StatusRuntimeException; +import io.grpc.stub.StreamObserver; +import java.io.IOException; import java.util.Collections; import java.util.Iterator; +import org.junit.After; +import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -39,6 +54,29 @@ public class ExecuteQueryCallableTest { private static final RequestContext REQUEST_CONTEXT = RequestContext.create("fake-project", "fake-instance", "fake-profile"); + private Server server; + private FakeService fakeService = new FakeService(); + private EnhancedBigtableStub stub; + + @Before + public void setup() throws IOException { + server = FakeServiceBuilder.create(fakeService).start(); + + BigtableDataSettings settings = + BigtableDataSettings.newBuilderForEmulator(server.getPort()) + .setProjectId("fake-project") + .setInstanceId("fake-instance") + .build(); + + stub = EnhancedBigtableStub.create(settings.getStubSettings()); + } + + @After + public void tearDown() { + stub.close(); + server.shutdown(); + } + @Test public void testCallContextAndServerStreamSetup() { SqlRow row = @@ -57,4 +95,29 @@ public void testCallContextAndServerStreamSetup() { assertThat(responseIterator.next()).isEqualTo(row); assertThat(responseIterator.hasNext()).isFalse(); } + + @Test + public void testExecuteQueryRequestsAreNotRetried() { + // TODO: retries for execute query is currently disabled. This test should be + // updated once resumption token is in place. + SqlServerStream stream = stub.executeQueryCallable().call(Statement.of("SELECT * FROM table")); + + Iterator iterator = stream.rows().iterator(); + + assertThrows(UnavailableException.class, iterator::next).getCause(); + assertThat(fakeService.attempts).isEqualTo(1); + } + + private static class FakeService extends BigtableGrpc.BigtableImplBase { + + private int attempts = 0; + + @Override + public void executeQuery( + ExecuteQueryRequest request, StreamObserver responseObserver) { + attempts++; + responseObserver.onNext(metadata(columnMetadata("test", stringType()))); + responseObserver.onError(new StatusRuntimeException(Status.UNAVAILABLE)); + } + } }