Skip to content

Commit

Permalink
update tests and handling of retry cookie
Browse files Browse the repository at this point in the history
  • Loading branch information
mutianf committed Nov 9, 2023
1 parent ba373a7 commit be27ca8
Show file tree
Hide file tree
Showing 3 changed files with 208 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,38 +17,47 @@

import io.grpc.CallOptions;
import io.grpc.Metadata;
import javax.annotation.Nullable;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** A cookie that holds information for the retry */
/** A cookie that holds information for retry or routing */
class CookiesHolder {

static final CallOptions.Key<CookiesHolder> COOKIES_HOLDER_KEY =
CallOptions.Key.create("bigtable-cookies");

static final String ROUTING_COOKIE_KEY = "x-goog-cbt-cookie-routing";
/** Routing cookie key prefix. */
static final String COOKIE_KEY_PREFIX = "x-goog-cbt-cookie";

static final Metadata.Key<String> ROUTING_COOKIE_METADATA_KEY =
Metadata.Key.of(ROUTING_COOKIE_KEY, Metadata.ASCII_STRING_MARSHALLER);

@Nullable private String routingCookie;
/** A map that stores all the routing cookies. */
private final Map<Metadata.Key<String>, String> cookies = new ConcurrentHashMap<>();

/** Returns CookiesHolder if presents in CallOptions. Otherwise returns null. */
static CookiesHolder fromCallOptions(CallOptions options) {
return options.getOption(COOKIES_HOLDER_KEY);
}

/** Adds routing cookie to header if routing cookie is not null. */
/** Add all the routing cookies to headers if any. */
Metadata addRoutingCookieToHeaders(Metadata headers) {
if (headers != null && routingCookie != null) {
headers.put(ROUTING_COOKIE_METADATA_KEY, routingCookie);
if (headers != null && !cookies.isEmpty()) {
for (Metadata.Key<String> key : cookies.keySet()) headers.put(key, cookies.get(key));
}
return headers;
}

/** Set the routing cookie from trailers to this CookiesHolder. */
/**
* Iterate through all the keys in trailing metadata, and add all the keys that match
* COOKIE_KEY_PREFIX to cookies.
*/
void setRoutingCookieFromTrailers(Metadata trailers) {
if (trailers != null && trailers.containsKey(ROUTING_COOKIE_METADATA_KEY)) {
this.routingCookie = trailers.get(ROUTING_COOKIE_METADATA_KEY);
if (trailers != null) {
for (String key : trailers.keys()) {
if (key.startsWith(COOKIE_KEY_PREFIX)) {
Metadata.Key<String> metadataKey = Metadata.Key.of(key, Metadata.ASCII_STRING_MARSHALLER);
String value = trailers.get(metadataKey);
cookies.put(metadataKey, value);
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -648,7 +648,9 @@ private UnaryCallable<BulkMutation, Void> createBulkMutateRowsCallable() {
new TracedUnaryCallable<>(
tracedBatcherUnaryCallable, clientContext.getTracerFactory(), spanName);

return traced.withDefaultCallContext(clientContext.getDefaultCallContext());
UnaryCallable<BulkMutation, Void> withCookie = new CookiesUnaryCallable<>(traced);

return withCookie.withDefaultCallContext(clientContext.getDefaultCallContext());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
*/
package com.google.cloud.bigtable.data.v2.stub;

import static com.google.cloud.bigtable.data.v2.stub.CookiesHolder.ROUTING_COOKIE_METADATA_KEY;
import static com.google.common.truth.Truth.assertThat;

import com.google.api.gax.grpc.InstantiatingGrpcChannelProvider;
Expand All @@ -31,7 +30,10 @@
import com.google.cloud.bigtable.data.v2.BigtableDataClient;
import com.google.cloud.bigtable.data.v2.BigtableDataSettings;
import com.google.cloud.bigtable.data.v2.FakeServiceBuilder;
import com.google.cloud.bigtable.data.v2.models.BulkMutation;
import com.google.cloud.bigtable.data.v2.models.Query;
import com.google.cloud.bigtable.data.v2.models.RowMutation;
import com.google.cloud.bigtable.data.v2.models.RowMutationEntry;
import com.google.common.collect.ImmutableList;
import io.grpc.Metadata;
import io.grpc.Server;
Expand All @@ -58,6 +60,13 @@ public class CookieHolderTest {
private List<Metadata> serverMetadata = new ArrayList<>();
private String testCookie = "test-routing-cookie";

private Metadata.Key<String> ROUTING_COOKIE_1 =
Metadata.Key.of("x-goog-cbt-cookie-routing", Metadata.ASCII_STRING_MARSHALLER);
private Metadata.Key<String> ROUTING_COOKIE_2 =
Metadata.Key.of("x-goog-cbt-cookie-random", Metadata.ASCII_STRING_MARSHALLER);
private Metadata.Key<String> BAD_KEY =
Metadata.Key.of("x-goog-cbt-not-cookie", Metadata.ASCII_STRING_MARSHALLER);

@Before
public void setup() throws Exception {
ServerInterceptor serverInterceptor =
Expand Down Expand Up @@ -99,18 +108,77 @@ public void testReadRows() {

assertThat(fakeService.count.get()).isGreaterThan(1);
assertThat(serverMetadata.size()).isEqualTo(fakeService.count.get());
String bytes = serverMetadata.get(1).get(ROUTING_COOKIE_METADATA_KEY);
assertThat(bytes).isNotNull();
assertThat(bytes).isEqualTo(testCookie);
String bytes1 = serverMetadata.get(1).get(ROUTING_COOKIE_1);
String bytes2 = serverMetadata.get(1).get(ROUTING_COOKIE_2);
assertThat(bytes1).isNotNull();
assertThat(bytes1).isEqualTo("readRows");
assertThat(bytes2).isNotNull();
assertThat(bytes2).isEqualTo(testCookie);

// make sure bad key is not added
assertThat(serverMetadata.get(1).get(BAD_KEY)).isNull();

serverMetadata.clear();
}

@Test
public void testMutateRows() {}
public void testReadRow() {
client.readRow("fake-table", "key");

assertThat(fakeService.count.get()).isGreaterThan(1);
assertThat(serverMetadata.size()).isEqualTo(fakeService.count.get());
String bytes1 = serverMetadata.get(1).get(ROUTING_COOKIE_1);
String bytes2 = serverMetadata.get(1).get(ROUTING_COOKIE_2);
assertThat(bytes1).isNotNull();
assertThat(bytes1).isEqualTo("readRows");
assertThat(bytes2).isNotNull();
assertThat(bytes2).isEqualTo(testCookie);

// make sure bad key is not added
assertThat(serverMetadata.get(1).get(BAD_KEY)).isNull();

serverMetadata.clear();
}

@Test
public void testMutateRow() {}
public void testMutateRows() {
client.bulkMutateRows(
BulkMutation.create("fake-table")
.add(RowMutationEntry.create("key").setCell("cf", "q", "v")));

assertThat(fakeService.count.get()).isGreaterThan(1);
assertThat(serverMetadata.size()).isEqualTo(fakeService.count.get());
String bytes1 = serverMetadata.get(1).get(ROUTING_COOKIE_1);
String bytes2 = serverMetadata.get(1).get(ROUTING_COOKIE_2);
assertThat(bytes1).isNotNull();
assertThat(bytes1).isEqualTo("mutateRows");
assertThat(bytes2).isNotNull();
assertThat(bytes2).isEqualTo(testCookie);

// make sure bad key is not added
assertThat(serverMetadata.get(1).get(BAD_KEY)).isNull();

serverMetadata.clear();
}

@Test
public void testMutateRow() {
client.mutateRow(RowMutation.create("table", "key").setCell("cf", "q", "v"));

assertThat(fakeService.count.get()).isGreaterThan(1);
assertThat(serverMetadata.size()).isEqualTo(fakeService.count.get());
String bytes1 = serverMetadata.get(1).get(ROUTING_COOKIE_1);
String bytes2 = serverMetadata.get(1).get(ROUTING_COOKIE_2);
assertThat(bytes1).isNotNull();
assertThat(bytes1).isEqualTo("mutateRow");
assertThat(bytes2).isNotNull();
assertThat(bytes2).isEqualTo(testCookie);

// make sure bad key is not added
assertThat(serverMetadata.get(1).get(BAD_KEY)).isNull();

serverMetadata.clear();
}

@Test
public void testSampleRowKeys() {
Expand All @@ -119,10 +187,90 @@ public void testSampleRowKeys() {

assertThat(fakeService.count.get()).isGreaterThan(1);
assertThat(serverMetadata.size()).isEqualTo(fakeService.count.get());
String bytes = serverMetadata.get(1).get(ROUTING_COOKIE_METADATA_KEY);
String bytes1 = serverMetadata.get(1).get(ROUTING_COOKIE_1);
String bytes2 = serverMetadata.get(1).get(ROUTING_COOKIE_2);
assertThat(bytes1).isNotNull();
assertThat(bytes1).isEqualTo("sampleRowKeys");
assertThat(bytes2).isNotNull();
assertThat(bytes2).isEqualTo(testCookie);

// make sure bad key is not added
assertThat(serverMetadata.get(1).get(BAD_KEY)).isNull();

serverMetadata.clear();
}

@Test
public void testNoCookieSucceedReadRows() {
fakeService.returnCookie = false;

client.readRows(Query.create("fake-table")).iterator().hasNext();

assertThat(fakeService.count.get()).isGreaterThan(1);
assertThat(serverMetadata.size()).isEqualTo(fakeService.count.get());

assertThat(serverMetadata.get(1).get(ROUTING_COOKIE_1)).isNull();
assertThat(serverMetadata.get(1).get(ROUTING_COOKIE_2)).isNull();
serverMetadata.clear();
}

@Test
public void testNoCookieSucceedReadRow() {
fakeService.returnCookie = false;

client.readRow("fake-table", "key");

assertThat(fakeService.count.get()).isGreaterThan(1);
assertThat(serverMetadata.size()).isEqualTo(fakeService.count.get());

assertThat(serverMetadata.get(1).get(ROUTING_COOKIE_1)).isNull();
assertThat(serverMetadata.get(1).get(ROUTING_COOKIE_2)).isNull();
serverMetadata.clear();
}

@Test
public void testNoCookieSucceedMutateRows() {
fakeService.returnCookie = false;

client.bulkMutateRows(
BulkMutation.create("fake-table")
.add(RowMutationEntry.create("key").setCell("cf", "q", "v")));

assertThat(fakeService.count.get()).isGreaterThan(1);
assertThat(serverMetadata.size()).isEqualTo(fakeService.count.get());

assertThat(serverMetadata.get(1).get(ROUTING_COOKIE_1)).isNull();
assertThat(serverMetadata.get(1).get(ROUTING_COOKIE_2)).isNull();

serverMetadata.clear();
}

@Test
public void testNoCookieSucceedMutateRow() {
fakeService.returnCookie = false;

client.mutateRow(RowMutation.create("fake-table", "key").setCell("cf", "q", "v"));

assertThat(fakeService.count.get()).isGreaterThan(1);
assertThat(serverMetadata.size()).isEqualTo(fakeService.count.get());

assertThat(serverMetadata.get(1).get(ROUTING_COOKIE_1)).isNull();
assertThat(serverMetadata.get(1).get(ROUTING_COOKIE_2)).isNull();

serverMetadata.clear();
}

@Test
public void testNoCookieSucceedSampleRowKeys() {
fakeService.returnCookie = false;

client.sampleRowKeys("fake-table");

assertThat(fakeService.count.get()).isGreaterThan(1);
assertThat(serverMetadata.size()).isEqualTo(fakeService.count.get());

assertThat(bytes).isNotNull();
assertThat(bytes).isEqualTo("sampleRowKeys");
assertThat(serverMetadata.get(1).get(ROUTING_COOKIE_1)).isNull();
assertThat(serverMetadata.get(1).get(ROUTING_COOKIE_2)).isNull();

serverMetadata.clear();
}
Expand All @@ -135,14 +283,19 @@ public void tearDown() throws Exception {

class FakeService extends BigtableGrpc.BigtableImplBase {

private AtomicInteger count = new AtomicInteger();
private boolean returnCookie = true;
private final AtomicInteger count = new AtomicInteger();

@Override
public void readRows(
ReadRowsRequest request, StreamObserver<ReadRowsResponse> responseObserver) {
if (count.getAndIncrement() < 1) {
Metadata trailers = new Metadata();
trailers.put(ROUTING_COOKIE_METADATA_KEY, "readRows");
if (returnCookie) {
trailers.put(ROUTING_COOKIE_1, "readRows");
trailers.put(ROUTING_COOKIE_2, testCookie);
trailers.put(BAD_KEY, "bad-key");
}
StatusRuntimeException exception = new StatusRuntimeException(Status.UNAVAILABLE, trailers);
responseObserver.onError(exception);
return;
Expand All @@ -156,7 +309,11 @@ public void mutateRow(
MutateRowRequest request, StreamObserver<MutateRowResponse> responseObserver) {
if (count.getAndIncrement() < 1) {
Metadata trailers = new Metadata();
trailers.put(ROUTING_COOKIE_METADATA_KEY, "mutateRow");
if (returnCookie) {
trailers.put(ROUTING_COOKIE_1, "mutateRow");
trailers.put(ROUTING_COOKIE_2, testCookie);
trailers.put(BAD_KEY, "bad-key");
}
StatusRuntimeException exception = new StatusRuntimeException(Status.UNAVAILABLE, trailers);
responseObserver.onError(exception);
return;
Expand All @@ -170,12 +327,19 @@ public void mutateRows(
MutateRowsRequest request, StreamObserver<MutateRowsResponse> responseObserver) {
if (count.getAndIncrement() < 1) {
Metadata trailers = new Metadata();
trailers.put(ROUTING_COOKIE_METADATA_KEY, "mutateRows");
if (returnCookie) {
trailers.put(ROUTING_COOKIE_1, "mutateRows");
trailers.put(ROUTING_COOKIE_2, testCookie);
trailers.put(BAD_KEY, "bad-key");
}
StatusRuntimeException exception = new StatusRuntimeException(Status.UNAVAILABLE, trailers);
responseObserver.onError(exception);
return;
}
responseObserver.onNext(MutateRowsResponse.getDefaultInstance());
responseObserver.onNext(
MutateRowsResponse.newBuilder()
.addEntries(MutateRowsResponse.Entry.getDefaultInstance())
.build());
responseObserver.onCompleted();
}

Expand All @@ -184,7 +348,11 @@ public void sampleRowKeys(
SampleRowKeysRequest request, StreamObserver<SampleRowKeysResponse> responseObserver) {
if (count.getAndIncrement() < 1) {
Metadata trailers = new Metadata();
trailers.put(ROUTING_COOKIE_METADATA_KEY, "sampleRowKeys");
if (returnCookie) {
trailers.put(ROUTING_COOKIE_1, "sampleRowKeys");
trailers.put(ROUTING_COOKIE_2, testCookie);
trailers.put(BAD_KEY, "bad-key");
}
StatusRuntimeException exception = new StatusRuntimeException(Status.UNAVAILABLE, trailers);
responseObserver.onError(exception);
return;
Expand Down

0 comments on commit be27ca8

Please sign in to comment.