Skip to content

Commit

Permalink
GrpcServiceBridgeImpl handle HttpClosedException
Browse files Browse the repository at this point in the history
fixes #101
  • Loading branch information
hu-chia committed Jun 19, 2024
1 parent 741022c commit a70e463
Show file tree
Hide file tree
Showing 3 changed files with 114 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import io.grpc.ServerServiceDefinition;
import io.grpc.Status;
import io.vertx.core.Vertx;
import io.vertx.core.http.HttpClosedException;
import io.vertx.core.net.SocketAddress;
import io.vertx.grpc.common.GrpcError;
import io.vertx.grpc.common.GrpcStatus;
Expand Down Expand Up @@ -152,6 +153,11 @@ void init(ServerCall.Listener<Req> listener) {
listener.onCancel();
}
});
req.exceptionHandler(throwable -> {
if (throwable instanceof HttpClosedException && !closed) {
listener.onCancel();
}
});
readAdapter.init(req, new BridgeMessageDecoder<>(methodDef.getMethodDescriptor().getRequestMarshaller(), decompressor));
writeAdapter.init(req.response(), new BridgeMessageEncoder<>(methodDef.getMethodDescriptor().getResponseMarshaller(), compressor));
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package io.vertx.grpc.server;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;

public class ProcessHelper {

private ProcessHelper() {}

public static Process exec(Class<?> main, List<String> args) throws IOException {

List<String> command = new ArrayList<>();
// java binary executable
command.add(System.getProperty("java.home") + File.separator + "bin" + File.separator + "java");
command.add("-cp");
// inherit classpath
command.add(System.getProperty("java.class.path"));
// main class name
command.add(main.getName());
// args
command.addAll(Objects.requireNonNullElse(args, Collections.emptyList()));

return new ProcessBuilder(command).inheritIO().start();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,16 @@
import io.grpc.examples.streaming.Empty;
import io.grpc.examples.streaming.Item;
import io.grpc.examples.streaming.StreamingGrpc;
import io.grpc.examples.streaming.StreamingGrpc.StreamingImplBase;
import io.grpc.protobuf.StatusProto;
import io.grpc.stub.ServerCallStreamObserver;
import io.grpc.stub.StreamObserver;
import io.vertx.core.Promise;
import io.vertx.ext.unit.Async;
import io.vertx.ext.unit.TestContext;
import java.io.IOException;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
import org.junit.Test;

import java.util.concurrent.atomic.AtomicInteger;
Expand Down Expand Up @@ -463,4 +468,78 @@ public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, Re
HelloReply res = stub.sayHello(request);
should.assertEquals(1, testAttributesStep.get());
}

@Test
public void testCallNetworkInterrupted(TestContext should) throws InterruptedException, IOException {
AtomicInteger requestCount = new AtomicInteger();
Promise<Void> completed = Promise.promise();
Async async = should.async();

StreamingGrpc.StreamingImplBase impl = new StreamingImplBase() {
@Override
public StreamObserver<Item> pipe(StreamObserver<Item> responseObserver) {
return new StreamObserver<Item>() {
@Override
public void onNext(Item item) {
requestCount.incrementAndGet();
}

@Override
public void onError(Throwable throwable) {
completed.fail(throwable);
async.complete();
}

@Override
public void onCompleted() {
completed.complete();
async.complete();
}
};
}
};

GrpcServer server = GrpcServer.server(vertx);
GrpcServiceBridge serverStub = GrpcServiceBridge.bridge(impl);
serverStub.bind(server);
startServer(server);

Process client = ProcessHelper.exec(ServerBridgeTest.class, Collections.singletonList(String.valueOf(port)));
// waiting for doing request
Thread.sleep(1_000);
client.destroy();
client.waitFor();

async.await(20_000);

should.assertEquals(requestCount.get(), 3);
should.assertTrue(completed.future().failed());
}

public static void main(String... args) throws InterruptedException {
StreamObserver<Item> noop = new StreamObserver<Item>() {
@Override public void onNext(Item item) {

}

@Override public void onError(Throwable throwable) {

}

@Override public void onCompleted() {

}
};

Channel channel = ManagedChannelBuilder.forAddress("localhost", Integer.parseInt(args[0])).usePlaintext().build();
StreamingGrpc.StreamingStub stub = StreamingGrpc.newStub(channel);
StreamObserver<Item> requestObserver = stub.pipe(noop);
Item request = Item.newBuilder().setValue("item").build();
requestObserver.onNext(request);
requestObserver.onNext(request);
requestObserver.onNext(request);

// waiting to be killed
Thread.currentThread().join();
}
}

0 comments on commit a70e463

Please sign in to comment.