Skip to content

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
mutianf committed Nov 16, 2023
1 parent 1fa268d commit 91989cd
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import io.grpc.Metadata;
import java.util.HashMap;
import java.util.Map;
import javax.annotation.Nullable;

/** A cookie that holds information for retry or routing */
class CookiesHolder {
Expand All @@ -32,8 +33,13 @@ class CookiesHolder {
/** A map that stores all the routing cookies. */
private final Map<Metadata.Key<String>, String> cookies = new HashMap<>();

/** Returns CookiesHolder if presents in CallOptions. Otherwise returns null. */
/** Returns CookiesHolder if presents in CallOptions, otherwise returns null. */
@Nullable
static CookiesHolder fromCallOptions(CallOptions options) {
// CookiesHolder should be added by CookiesServerStreamingCallable and
// CookiesUnaryCallable for most methods. However, methods like PingAndWarm
// or ReadChangeStream doesn't support routing cookie, in which case this
// method will return null.
return options.getOption(COOKIES_HOLDER_KEY);
}

Expand All @@ -49,7 +55,7 @@ Metadata injectCookiesInRequestHeaders(Metadata headers) {
* Iterate through all the keys in trailing metadata, and add all the keys that match
* COOKIE_KEY_PREFIX to cookies.
*/
void extractCookiesFromResponseTrailers(Metadata trailers) {
void extractCookiesFromResponseTrailers(@Nullable Metadata trailers) {
if (trailers == null) {
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,16 @@
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.Status;
import java.util.logging.Logger;

/**
* A cookie interceptor that checks the cookie value from returned trailer, updates the cookie
* holder, and inject it in the header of the next request.
*/
class CookiesInterceptor implements ClientInterceptor {

private static final Logger LOG = Logger.getLogger(CookiesInterceptor.class.getName());

@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
MethodDescriptor<ReqT, RespT> methodDescriptor, CallOptions callOptions, Channel channel) {
Expand All @@ -41,12 +44,17 @@ public void start(Listener<RespT> responseListener, Metadata headers) {
// Gets the CookiesHolder added from CookiesServerStreamingCallable and
// CookiesUnaryCallable.
// Add CookiesHolder content to request headers if there's any.
CookiesHolder cookie = CookiesHolder.fromCallOptions(callOptions);
if (cookie != null) {
cookie.injectCookiesInRequestHeaders(headers);
responseListener = new UpdateCookieListener<>(responseListener, cookie);
try {
CookiesHolder cookie = CookiesHolder.fromCallOptions(callOptions);
if (cookie != null) {
cookie.injectCookiesInRequestHeaders(headers);
responseListener = new UpdateCookieListener<>(responseListener, cookie);
}
} catch (Throwable e) {
LOG.warning("Failed to inject cookie to request headers: " + e);
} finally {
super.start(responseListener, headers);
}
super.start(responseListener, headers);
}
};
}
Expand All @@ -64,8 +72,13 @@ static class UpdateCookieListener<RespT>

@Override
public void onClose(Status status, Metadata trailers) {
cookie.extractCookiesFromResponseTrailers(trailers);
super.onClose(status, trailers);
try {
cookie.extractCookiesFromResponseTrailers(trailers);
} catch (Throwable e) {
LOG.warning("Failed to extract cookie from response trailers: " + e);
} finally {
super.onClose(status, trailers);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,10 @@ public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
.setProjectId("fake-project")
.setInstanceId("fake-instance");

// Override CheckAndMutate and ReadModifyWrite retry settings here. These operations
// are currently not retryable but this could change in the future after we
// have routing cookie sends back meaningful information and changes how retry works.
// Routing cookie still needs to be respected and handled by the callables.
settings
.stubSettings()
.checkAndMutateRowSettings()
Expand Down Expand Up @@ -306,6 +310,10 @@ public void testNoCookieSucceedSampleRowKeys() {

@Test
public void testAllMethodsAreCalled() {
// This test ensures that all methods respect the retry cookie except for the ones that are
// explicitly added to the methods list. It requires that any newly method is exercised in this
// test.
// This is enforced by introspecting grpc method descriptors.
client.readRows(Query.create("fake-table")).iterator().hasNext();
fakeService.count.set(0);
client.mutateRow(RowMutation.create("fake-table", "key").setCell("cf", "q", "v"));
Expand Down Expand Up @@ -351,11 +359,7 @@ public void readRows(
ReadRowsRequest request, StreamObserver<ReadRowsResponse> responseObserver) {
if (count.getAndIncrement() < 1) {
Metadata trailers = new Metadata();
if (returnCookie) {
trailers.put(ROUTING_COOKIE_1, "readRows");
trailers.put(ROUTING_COOKIE_2, testCookie);
trailers.put(BAD_KEY, "bad-key");
}
maybePopulateCookie(trailers, "readRows");
StatusRuntimeException exception = new StatusRuntimeException(Status.UNAVAILABLE, trailers);
responseObserver.onError(exception);
return;
Expand All @@ -369,11 +373,7 @@ public void mutateRow(
MutateRowRequest request, StreamObserver<MutateRowResponse> responseObserver) {
if (count.getAndIncrement() < 1) {
Metadata trailers = new Metadata();
if (returnCookie) {
trailers.put(ROUTING_COOKIE_1, "mutateRow");
trailers.put(ROUTING_COOKIE_2, testCookie);
trailers.put(BAD_KEY, "bad-key");
}
maybePopulateCookie(trailers, "mutateRow");
StatusRuntimeException exception = new StatusRuntimeException(Status.UNAVAILABLE, trailers);
responseObserver.onError(exception);
return;
Expand All @@ -387,11 +387,7 @@ public void mutateRows(
MutateRowsRequest request, StreamObserver<MutateRowsResponse> responseObserver) {
if (count.getAndIncrement() < 1) {
Metadata trailers = new Metadata();
if (returnCookie) {
trailers.put(ROUTING_COOKIE_1, "mutateRows");
trailers.put(ROUTING_COOKIE_2, testCookie);
trailers.put(BAD_KEY, "bad-key");
}
maybePopulateCookie(trailers, "mutateRows");
StatusRuntimeException exception = new StatusRuntimeException(Status.UNAVAILABLE, trailers);
responseObserver.onError(exception);
return;
Expand All @@ -408,11 +404,7 @@ public void sampleRowKeys(
SampleRowKeysRequest request, StreamObserver<SampleRowKeysResponse> responseObserver) {
if (count.getAndIncrement() < 1) {
Metadata trailers = new Metadata();
if (returnCookie) {
trailers.put(ROUTING_COOKIE_1, "sampleRowKeys");
trailers.put(ROUTING_COOKIE_2, testCookie);
trailers.put(BAD_KEY, "bad-key");
}
maybePopulateCookie(trailers, "sampleRowKeys");
StatusRuntimeException exception = new StatusRuntimeException(Status.UNAVAILABLE, trailers);
responseObserver.onError(exception);
return;
Expand All @@ -427,11 +419,7 @@ public void checkAndMutateRow(
StreamObserver<CheckAndMutateRowResponse> responseObserver) {
if (count.getAndIncrement() < 1) {
Metadata trailers = new Metadata();
if (returnCookie) {
trailers.put(ROUTING_COOKIE_1, "checkAndMutate");
trailers.put(ROUTING_COOKIE_2, testCookie);
trailers.put(BAD_KEY, "bad-key");
}
maybePopulateCookie(trailers, "checkAndMutate");
StatusRuntimeException exception = new StatusRuntimeException(Status.UNAVAILABLE, trailers);
responseObserver.onError(exception);
return;
Expand All @@ -446,17 +434,21 @@ public void readModifyWriteRow(
StreamObserver<ReadModifyWriteRowResponse> responseObserver) {
if (count.getAndIncrement() < 1) {
Metadata trailers = new Metadata();
if (returnCookie) {
trailers.put(ROUTING_COOKIE_1, "readModifyWrite");
trailers.put(ROUTING_COOKIE_2, testCookie);
trailers.put(BAD_KEY, "bad-key");
}
maybePopulateCookie(trailers, "readModifyWrite");
StatusRuntimeException exception = new StatusRuntimeException(Status.UNAVAILABLE, trailers);
responseObserver.onError(exception);
return;
}
responseObserver.onNext(ReadModifyWriteRowResponse.getDefaultInstance());
responseObserver.onCompleted();
}

private void maybePopulateCookie(Metadata trailers, String label) {
if (returnCookie) {
trailers.put(ROUTING_COOKIE_1, label);
trailers.put(ROUTING_COOKIE_2, testCookie);
trailers.put(BAD_KEY, "bad-key");
}
}
}
}

0 comments on commit 91989cd

Please sign in to comment.