Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GrpcServiceBridgeImpl handle HttpClosedException #102

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import io.grpc.ServerServiceDefinition;
import io.grpc.Status;
import io.vertx.core.Vertx;
import io.vertx.core.http.HttpClosedException;
import io.vertx.core.net.SocketAddress;
import io.vertx.grpc.common.GrpcError;
import io.vertx.grpc.common.GrpcStatus;
Expand Down Expand Up @@ -152,6 +153,11 @@ void init(ServerCall.Listener<Req> listener) {
listener.onCancel();
}
});
req.exceptionHandler(throwable -> {
if (throwable instanceof HttpClosedException && !closed) {
listener.onCancel();
}
});
readAdapter.init(req, new BridgeMessageDecoder<>(methodDef.getMethodDescriptor().getRequestMarshaller(), decompressor));
writeAdapter.init(req.response(), new BridgeMessageEncoder<>(methodDef.getMethodDescriptor().getResponseMarshaller(), compressor));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,29 @@
import io.grpc.examples.streaming.Empty;
import io.grpc.examples.streaming.Item;
import io.grpc.examples.streaming.StreamingGrpc;
import io.grpc.protobuf.StatusProto;
import io.grpc.examples.streaming.StreamingGrpc.StreamingImplBase;
import io.grpc.protobuf.StatusProto;
import io.grpc.stub.ServerCallStreamObserver;
import io.grpc.stub.StreamObserver;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.net.NetClient;
import io.vertx.core.net.NetServer;
import io.vertx.core.net.NetSocket;
import io.vertx.ext.unit.Async;
import io.vertx.ext.unit.TestContext;
import io.vertx.grpc.server.GrpcServer;
import io.vertx.grpc.server.GrpcServiceBridge;
import org.junit.Ignore;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.jetbrains.annotations.Nullable;
import org.junit.Test;

import java.util.concurrent.atomic.AtomicInteger;
Expand Down Expand Up @@ -475,4 +491,128 @@ public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, Re
HelloReply res = stub.sayHello(request);
should.assertEquals(1, testAttributesStep.get());
}

@Test
public void testCallNetworkInterrupted(TestContext should) throws InterruptedException, IOException {
AtomicInteger requestCount = new AtomicInteger();
Promise<Void> completed = Promise.promise();
Async async = should.async();

StreamingGrpc.StreamingImplBase impl = new StreamingImplBase() {
@Override
public StreamObserver<Item> pipe(StreamObserver<Item> responseObserver) {
return new StreamObserver<>() {
@Override
public void onNext(Item item) {
requestCount.incrementAndGet();
}

@Override
public void onError(Throwable throwable) {
completed.fail(throwable);
async.complete();
}

@Override
public void onCompleted() {
completed.complete();
async.complete();
}
};
}
};

GrpcServer server = GrpcServer.server(vertx);
GrpcServiceBridge serverStub = GrpcServiceBridge.bridge(impl);
serverStub.bind(server);
startServer(server);

try (var proxyServer = new ProxyServer(vertx, port + 1, port)) {
proxyServer.start();

int proxyPort = proxyServer.proxyServer.actualPort();
Channel channel = ManagedChannelBuilder.forAddress("localhost", proxyPort).usePlaintext().build();
StreamingGrpc.StreamingStub stub = StreamingGrpc.newStub(channel);
StreamObserver<Item> requestObserver = stub.pipe(new NoopStreamObserver<>());
Item request = Item.newBuilder().setValue("item").build();
requestObserver.onNext(request);
requestObserver.onNext(request);
requestObserver.onNext(request);

// waiting for the connection to be established.
Thread.sleep(1000);
}

async.await(20_000);

should.assertEquals(requestCount.get(), 3);
should.assertTrue(completed.future().failed());
}

static class NoopStreamObserver<T> implements StreamObserver<T> {
@Override public void onNext(T ignored) {}

@Override public void onError(Throwable ignored) {}

@Override public void onCompleted() {}
}

static class ProxyServer implements AutoCloseable {

private final int listenPort;

private final int targetPort;

private final NetServer proxyServer;

private final NetClient proxyClient;

// live or dead
private final List<Map.Entry<NetSocket, NetSocket>> sockets = new ArrayList<>();

ProxyServer(Vertx vertx, int listenPort, int targetPort) {
this.listenPort = listenPort;
this.targetPort = targetPort;
this.proxyServer = vertx.createNetServer().connectHandler(this::handle);
this.proxyClient = vertx.createNetClient();
}

void start() {
this.proxyServer.listen(listenPort).toCompletionStage().toCompletableFuture().join();
}

void handle(NetSocket socket) {
socket.pause();

proxyClient.connect(targetPort, "localhost")
.onComplete(ar -> {
if (ar.succeeded()) {
NetSocket proxySocket = ar.result();
proxySocket.pause();

socket.handler(proxySocket::write);
proxySocket.handler(socket::write);
socket.closeHandler(ignored -> proxySocket.close());
proxySocket.closeHandler(ignored -> socket.close());

sockets.add(Map.entry(socket, proxySocket));

proxySocket.resume();
socket.resume();
} else {
socket.close();
}
});
}

@Override
public void close() {
this.sockets.forEach(entry -> {
entry.getKey().close();
entry.getValue().close();
});
this.proxyClient.close();
this.proxyServer.close();
}
}
}