Skip to content

Commit

Permalink
fix: pass deadline through ExecuteQuery RetrySettings (#2355)
Browse files Browse the repository at this point in the history
Change-Id: I47fbb6761205c8ab34ecfa490954c56cf01b5288
  • Loading branch information
jackdingilian authored Sep 25, 2024
1 parent 0330d77 commit 6bc9820
Show file tree
Hide file tree
Showing 2 changed files with 79 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1357,11 +1357,21 @@ public Map<String, String> extract(ExecuteQueryRequest executeQueryRequest) {

ServerStreamingCallSettings<ExecuteQueryCallContext, SqlRow> retrySettings =
ServerStreamingCallSettings.<ExecuteQueryCallContext, SqlRow>newBuilder()
// TODO resumption strategy and retry settings
// TODO add resumption strategy and pass through retry settings unchanged
// we pass through retry settings to use the deadlines now but don't
// support retries
.setRetrySettings(
settings
.executeQuerySettings()
.getRetrySettings()
.toBuilder()
// override maxAttempts as a safeguard against changes from user
.setMaxAttempts(1)
.build())
.build();

// Adding RetryingCallable to the callable chain so that client side metrics can be
// measured correctly. Retries are currently disabled.
// measured correctly and deadlines are set. Retries are currently disabled.
ServerStreamingCallable<ExecuteQueryCallContext, SqlRow> retries =
withRetries(withBigtableTracer, retrySettings);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import static com.google.common.truth.Truth.assertThat;
import static org.junit.Assert.assertThrows;

import com.google.api.gax.retrying.RetrySettings;
import com.google.api.gax.rpc.UnavailableException;
import com.google.bigtable.v2.BigtableGrpc;
import com.google.bigtable.v2.ExecuteQueryRequest;
Expand All @@ -35,18 +36,23 @@
import com.google.cloud.bigtable.data.v2.models.sql.Statement;
import com.google.cloud.bigtable.data.v2.stub.EnhancedBigtableStub;
import com.google.cloud.bigtable.gaxx.testing.FakeStreamingApi.ServerStreamingStashCallable;
import com.google.common.collect.Range;
import io.grpc.Context;
import io.grpc.Deadline;
import io.grpc.Server;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.util.Collections;
import java.util.Iterator;
import java.util.concurrent.TimeUnit;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.threeten.bp.Duration;

@RunWith(JUnit4.class)
public class ExecuteQueryCallableTest {
Expand Down Expand Up @@ -108,13 +114,74 @@ public void testExecuteQueryRequestsAreNotRetried() {
assertThat(fakeService.attempts).isEqualTo(1);
}

@Test
public void testExecuteQueryRequestsIgnoreOverriddenMaxAttempts() throws IOException {
BigtableDataSettings.Builder overrideSettings =
BigtableDataSettings.newBuilderForEmulator(server.getPort())
.setProjectId("fake-project")
.setInstanceId("fake-instance");
overrideSettings
.stubSettings()
.executeQuerySettings()
.setRetrySettings(RetrySettings.newBuilder().setMaxAttempts(10).build());
EnhancedBigtableStub overrideStub =
EnhancedBigtableStub.create(overrideSettings.build().getStubSettings());
SqlServerStream stream =
overrideStub.executeQueryCallable().call(Statement.of("SELECT * FROM table"));

Iterator<SqlRow> iterator = stream.rows().iterator();

assertThrows(UnavailableException.class, iterator::next).getCause();
assertThat(fakeService.attempts).isEqualTo(1);
}

@Test
public void testExecuteQueryRequestsSetDefaultDeadline() {
SqlServerStream stream = stub.executeQueryCallable().call(Statement.of("SELECT * FROM table"));
Iterator<SqlRow> iterator = stream.rows().iterator();
// We don't care about this but are reusing the fake service that tests retries
assertThrows(UnavailableException.class, iterator::next).getCause();
// We have 30s default, we assume less than 1s has been burned when the fake service sets it
assertThat(fakeService.deadlineMillisRemaining).isIn(Range.closed(29000L, 30100L));
}

@Test
public void testExecuteQueryRequestsRespectOverridenDeadline() throws IOException {
BigtableDataSettings.Builder overrideSettings =
BigtableDataSettings.newBuilderForEmulator(server.getPort())
.setProjectId("fake-project")
.setInstanceId("fake-instance");
overrideSettings
.stubSettings()
.executeQuerySettings()
.setRetrySettings(
RetrySettings.newBuilder()
.setInitialRpcTimeout(Duration.ofMinutes(5))
.setMaxRpcTimeout(Duration.ofMinutes(5))
.build());
EnhancedBigtableStub overrideDeadline =
EnhancedBigtableStub.create(overrideSettings.build().getStubSettings());
SqlServerStream streamOverride =
overrideDeadline.executeQueryCallable().call(Statement.of("SELECT * FROM table"));
Iterator<SqlRow> overrideIterator = streamOverride.rows().iterator();
// We don't care about this but are reusing the fake service that tests retries
assertThrows(UnavailableException.class, overrideIterator::next).getCause();
// We have 30s default, we assume less than 1s has been burned when the fake service sets it
assertThat(fakeService.deadlineMillisRemaining).isIn(Range.closed(299000L, 300100L));
}

private static class FakeService extends BigtableGrpc.BigtableImplBase {

private int attempts = 0;
private long deadlineMillisRemaining;

@Override
public void executeQuery(
ExecuteQueryRequest request, StreamObserver<ExecuteQueryResponse> responseObserver) {
Deadline deadline = Context.current().getDeadline();
if (deadline != null) {
deadlineMillisRemaining = deadline.timeRemaining(TimeUnit.MILLISECONDS);
}
attempts++;
responseObserver.onNext(metadata(columnMetadata("test", stringType())));
responseObserver.onError(new StatusRuntimeException(Status.UNAVAILABLE));
Expand Down

0 comments on commit 6bc9820

Please sign in to comment.