Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add support for awaiting Data Boost #2329

Merged
merged 27 commits into from
Sep 16, 2024
Merged
Show file tree
Hide file tree
Changes from 21 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
1617301
Introduce ConsistencyParams model
djyau Aug 30, 2024
54e6540
Create AwaitConsistencyCallable, a delegate for AwaitReplicationCallable
djyau Sep 4, 2024
d1b4320
Address some PR comments
djyau Sep 4, 2024
febfb02
Remove unused imports from AwaitReplicationCallable
djyau Sep 4, 2024
84a2422
Plumb the Consistency callable through to some places, add some tests
djyau Sep 6, 2024
0f1faec
Add integration test
djyau Sep 6, 2024
20f7809
Rework the ConsistencyRequest model, plumb through RequestContext to …
djyau Sep 6, 2024
2c47294
Fix imports
djyau Sep 6, 2024
ce0a439
Fix more imports, fix some tests
djyau Sep 6, 2024
0327038
Rename some things
djyau Sep 6, 2024
c7fbb27
Add tests for ConsistencyRequest model
djyau Sep 6, 2024
c8ff829
Add newline
djyau Sep 6, 2024
f57b84d
Fix broken test
djyau Sep 6, 2024
86f6d8a
Make request context a final variable in test
djyau Sep 6, 2024
916535f
Get test working using correct expectations
djyau Sep 6, 2024
119ba23
Add a couple of tests for AwaitReplicationCallable
djyau Sep 6, 2024
c16491c
Use RequestContextNoAP class
djyau Sep 9, 2024
09a230f
Make ConsistencyRequest model an AutoValue
djyau Sep 9, 2024
4aa93f2
Fix license year, fix some formatting
djyau Sep 9, 2024
1da2dc7
Run auto formatter
djyau Sep 9, 2024
beadfb0
Rename new RequestContext to TableAdminRequestContext, re run auto fo…
djyau Sep 9, 2024
f774895
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] Sep 9, 2024
e263edd
Add license header to ConsistencyRequestTest
djyau Sep 10, 2024
5dbc61a
Merge branch 'feature-await-data-boost-2' of https://github.com/djyau…
djyau Sep 10, 2024
f628eb6
Add EnhancedBigtableTableAdminStub to clirr-ignored-differences
djyau Sep 11, 2024
3614dca
Fix IT tests, skip data boost one for now until we run it concurrently
djyau Sep 12, 2024
1240b3c
Run autoformatter
djyau Sep 13, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import com.google.cloud.bigtable.admin.v2.internal.NameUtil;
import com.google.cloud.bigtable.admin.v2.models.AuthorizedView;
import com.google.cloud.bigtable.admin.v2.models.Backup;
import com.google.cloud.bigtable.admin.v2.models.ConsistencyRequest;
import com.google.cloud.bigtable.admin.v2.models.CopyBackupRequest;
import com.google.cloud.bigtable.admin.v2.models.CreateAuthorizedViewRequest;
import com.google.cloud.bigtable.admin.v2.models.CreateBackupRequest;
Expand All @@ -61,6 +62,7 @@
import com.google.cloud.bigtable.admin.v2.models.UpdateBackupRequest;
import com.google.cloud.bigtable.admin.v2.models.UpdateTableRequest;
import com.google.cloud.bigtable.admin.v2.stub.EnhancedBigtableTableAdminStub;
import com.google.cloud.bigtable.data.v2.internal.TableAdminRequestContext;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
Expand Down Expand Up @@ -154,8 +156,10 @@ public static BigtableTableAdminClient create(
/** Constructs an instance of BigtableTableAdminClient with the given settings. */
public static BigtableTableAdminClient create(@Nonnull BigtableTableAdminSettings settings)
throws IOException {
TableAdminRequestContext requestContext =
TableAdminRequestContext.create(settings.getProjectId(), settings.getInstanceId());
EnhancedBigtableTableAdminStub stub =
EnhancedBigtableTableAdminStub.createEnhanced(settings.getStubSettings());
EnhancedBigtableTableAdminStub.createEnhanced(settings.getStubSettings(), requestContext);
return create(settings.getProjectId(), settings.getInstanceId(), stub);
}

Expand Down Expand Up @@ -917,6 +921,11 @@ public void awaitReplication(String tableId) {
stub.awaitReplicationCallable().futureCall(tableName));
}

public void awaitConsistency(ConsistencyRequest consistencyRequest) {
ApiExceptions.callAndTranslateApiException(
stub.awaitConsistencyCallable().futureCall(consistencyRequest));
}

/**
* Creates a backup with the specified configuration.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Copyright 2024 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.bigtable.admin.v2.models;

import com.google.api.core.InternalApi;
import com.google.auto.value.AutoValue;
import com.google.bigtable.admin.v2.CheckConsistencyRequest;
import com.google.bigtable.admin.v2.DataBoostReadLocalWrites;
import com.google.bigtable.admin.v2.GenerateConsistencyTokenRequest;
import com.google.bigtable.admin.v2.StandardReadRemoteWrites;
import com.google.bigtable.admin.v2.TableName;
import com.google.cloud.bigtable.data.v2.internal.TableAdminRequestContext;
import javax.annotation.Nonnull;

@AutoValue
public abstract class ConsistencyRequest {
@Nonnull
protected abstract String getTableId();

@Nonnull
protected abstract CheckConsistencyRequest.ModeCase getMode();

public static ConsistencyRequest forReplication(String tableId) {
return new AutoValue_ConsistencyRequest(
tableId, CheckConsistencyRequest.ModeCase.STANDARD_READ_REMOTE_WRITES);
}

public static ConsistencyRequest forDataBoost(String tableId) {
return new AutoValue_ConsistencyRequest(
tableId, CheckConsistencyRequest.ModeCase.DATA_BOOST_READ_LOCAL_WRITES);
}

@InternalApi
public CheckConsistencyRequest toCheckConsistencyProto(
TableAdminRequestContext requestContext, String token) {
CheckConsistencyRequest.Builder builder = CheckConsistencyRequest.newBuilder();
TableName tableName =
TableName.of(requestContext.getProjectId(), requestContext.getInstanceId(), getTableId());

if (getMode().equals(CheckConsistencyRequest.ModeCase.STANDARD_READ_REMOTE_WRITES)) {
builder.setStandardReadRemoteWrites(StandardReadRemoteWrites.newBuilder().build());
} else {
builder.setDataBoostReadLocalWrites(DataBoostReadLocalWrites.newBuilder().build());
}

return builder.setName(tableName.toString()).setConsistencyToken(token).build();
}

@InternalApi
public GenerateConsistencyTokenRequest toGenerateTokenProto(
TableAdminRequestContext requestContext) {
GenerateConsistencyTokenRequest.Builder builder = GenerateConsistencyTokenRequest.newBuilder();
TableName tableName =
TableName.of(requestContext.getProjectId(), requestContext.getInstanceId(), getTableId());

return builder.setName(tableName.toString()).build();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,195 @@
/*
* Copyright 2024 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.bigtable.admin.v2.stub;

import com.google.api.core.ApiAsyncFunction;
import com.google.api.core.ApiFunction;
import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.api.gax.retrying.ExponentialPollAlgorithm;
import com.google.api.gax.retrying.NonCancellableFuture;
import com.google.api.gax.retrying.ResultRetryAlgorithm;
import com.google.api.gax.retrying.RetryAlgorithm;
import com.google.api.gax.retrying.RetrySettings;
import com.google.api.gax.retrying.RetryingExecutor;
import com.google.api.gax.retrying.RetryingFuture;
import com.google.api.gax.retrying.ScheduledRetryingExecutor;
import com.google.api.gax.retrying.TimedAttemptSettings;
import com.google.api.gax.rpc.ApiCallContext;
import com.google.api.gax.rpc.ClientContext;
import com.google.api.gax.rpc.UnaryCallable;
import com.google.bigtable.admin.v2.CheckConsistencyRequest;
import com.google.bigtable.admin.v2.CheckConsistencyResponse;
import com.google.bigtable.admin.v2.GenerateConsistencyTokenRequest;
import com.google.bigtable.admin.v2.GenerateConsistencyTokenResponse;
import com.google.cloud.bigtable.admin.v2.models.ConsistencyRequest;
import com.google.cloud.bigtable.data.v2.internal.TableAdminRequestContext;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.MoreExecutors;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;

/**
* Callable that waits until either replication or Data Boost has caught up to the point it was
* called.
*
* <p>This callable wraps GenerateConsistencyToken and CheckConsistency RPCs. It will generate a
* token then poll until isConsistent is true.
*/
class AwaitConsistencyCallable extends UnaryCallable<ConsistencyRequest, Void> {
private final UnaryCallable<GenerateConsistencyTokenRequest, GenerateConsistencyTokenResponse>
generateCallable;
private final UnaryCallable<CheckConsistencyRequest, CheckConsistencyResponse> checkCallable;
private final RetryingExecutor<CheckConsistencyResponse> executor;

private final TableAdminRequestContext requestContext;

static AwaitConsistencyCallable create(
UnaryCallable<GenerateConsistencyTokenRequest, GenerateConsistencyTokenResponse>
generateCallable,
UnaryCallable<CheckConsistencyRequest, CheckConsistencyResponse> checkCallable,
ClientContext clientContext,
RetrySettings pollingSettings,
TableAdminRequestContext requestContext) {

RetryAlgorithm<CheckConsistencyResponse> retryAlgorithm =
new RetryAlgorithm<>(
new PollResultAlgorithm(),
new ExponentialPollAlgorithm(pollingSettings, clientContext.getClock()));

RetryingExecutor<CheckConsistencyResponse> retryingExecutor =
new ScheduledRetryingExecutor<>(retryAlgorithm, clientContext.getExecutor());

return new AwaitConsistencyCallable(
generateCallable, checkCallable, retryingExecutor, requestContext);
}

@VisibleForTesting
AwaitConsistencyCallable(
UnaryCallable<GenerateConsistencyTokenRequest, GenerateConsistencyTokenResponse>
generateCallable,
UnaryCallable<CheckConsistencyRequest, CheckConsistencyResponse> checkCallable,
RetryingExecutor<CheckConsistencyResponse> executor,
TableAdminRequestContext requestContext) {
this.generateCallable = generateCallable;
this.checkCallable = checkCallable;
this.executor = executor;
this.requestContext = requestContext;
}

@Override
public ApiFuture<Void> futureCall(
final ConsistencyRequest consistencyRequest, final ApiCallContext apiCallContext) {
ApiFuture<GenerateConsistencyTokenResponse> tokenFuture =
generateToken(consistencyRequest.toGenerateTokenProto(requestContext), apiCallContext);

return ApiFutures.transformAsync(
tokenFuture,
new ApiAsyncFunction<GenerateConsistencyTokenResponse, Void>() {
@Override
public ApiFuture<Void> apply(GenerateConsistencyTokenResponse input) {
CheckConsistencyRequest request =
consistencyRequest.toCheckConsistencyProto(
requestContext, input.getConsistencyToken());
return pollToken(request, apiCallContext);
}
},
MoreExecutors.directExecutor());
}

private ApiFuture<GenerateConsistencyTokenResponse> generateToken(
GenerateConsistencyTokenRequest generateRequest, ApiCallContext context) {
return generateCallable.futureCall(generateRequest, context);
}

private ApiFuture<Void> pollToken(CheckConsistencyRequest request, ApiCallContext context) {
AttemptCallable<CheckConsistencyRequest, CheckConsistencyResponse> attemptCallable =
new AttemptCallable<>(checkCallable, request, context);
RetryingFuture<CheckConsistencyResponse> retryingFuture =
executor.createFuture(attemptCallable);
attemptCallable.setExternalFuture(retryingFuture);
attemptCallable.call();

return ApiFutures.transform(
retryingFuture,
new ApiFunction<CheckConsistencyResponse, Void>() {
@Override
public Void apply(CheckConsistencyResponse input) {
return null;
}
},
MoreExecutors.directExecutor());
}

/** A callable representing an attempt to make an RPC call. */
private static class AttemptCallable<RequestT, ResponseT> implements Callable<ResponseT> {
private final UnaryCallable<RequestT, ResponseT> callable;
private final RequestT request;

private volatile RetryingFuture<ResponseT> externalFuture;
private volatile ApiCallContext callContext;

AttemptCallable(
UnaryCallable<RequestT, ResponseT> callable, RequestT request, ApiCallContext callContext) {
this.callable = callable;
this.request = request;
this.callContext = callContext;
}

void setExternalFuture(RetryingFuture<ResponseT> externalFuture) {
this.externalFuture = externalFuture;
}

@Override
public ResponseT call() {
try {
// NOTE: unlike gax's AttemptCallable, this ignores rpc timeouts
externalFuture.setAttemptFuture(new NonCancellableFuture<ResponseT>());
if (externalFuture.isDone()) {
return null;
}
ApiFuture<ResponseT> internalFuture = callable.futureCall(request, callContext);
externalFuture.setAttemptFuture(internalFuture);
} catch (Throwable e) {
externalFuture.setAttemptFuture(ApiFutures.<ResponseT>immediateFailedFuture(e));
}

return null;
}
}

/**
* A polling algorithm for waiting for a consistent {@link CheckConsistencyResponse}. Please note
* that this class doesn't handle retryable errors and expects the underlying callable chain to
* handle this.
*/
private static class PollResultAlgorithm
implements ResultRetryAlgorithm<CheckConsistencyResponse> {
@Override
public TimedAttemptSettings createNextAttempt(
Throwable prevThrowable,
CheckConsistencyResponse prevResponse,
TimedAttemptSettings prevSettings) {
return null;
}

@Override
public boolean shouldRetry(Throwable prevThrowable, CheckConsistencyResponse prevResponse)
throws CancellationException {
return prevResponse != null && !prevResponse.getConsistent();
}
}
}
Loading
Loading