Skip to content

Commit

Permalink
Triple stream support (issue#1332) (#1360)
Browse files Browse the repository at this point in the history
* Add bi-stream call support for triple

* triple pojo mode support stream

---------

Co-authored-by: liujianjun.ljj <[email protected]>
  • Loading branch information
namelessssssssssss and liujianjun.ljj authored May 14, 2024
1 parent 12fbfe1 commit 56ed259
Show file tree
Hide file tree
Showing 27 changed files with 2,001 additions and 104 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,22 @@ public class RpcConstants {
* 调用方式:future
*/
public static final String INVOKER_TYPE_FUTURE = "future";
/**
* 调用方式:一元调用
*/
public static final String INVOKER_TYPE_UNARY = "unary";
/**
* 调用方式:客户端流
*/
public static final String INVOKER_TYPE_CLIENT_STREAMING = "clientStream";
/**
* 调用方式:服务端流
*/
public static final String INVOKER_TYPE_SERVER_STREAMING = "serverStream";
/**
* 调用方式:双向流
*/
public static final String INVOKER_TYPE_BI_STREAMING = "bidirectionalStream";

/**
* Hessian序列化 [不推荐]
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alipay.sofa.rpc.transport;

/**
* StreamHandler, works just like gRPC StreamObserver.
*/
public interface SofaStreamObserver<T> {

/**
* Sends a message, or defines the behavior when a message is received.
* <p>This method should never be called after {@link SofaStreamObserver#onCompleted()} has been invoked.
*/
void onNext(T message);

/**
* Note: This method MUST be invoked after the transport is complete.
* Failure to do so may result in unexpected errors.
* <p>
* Signals that all messages have been sent/received normally, and closes this stream.
*/
void onCompleted();

/**
* Signals an exception to terminate this stream, or defines the behavior when an error occurs.
* <p></p>
* Once this method is invoked by one side, it can't send more messages, and the corresponding method on the other side will be triggered.
* Depending on the protocol implementation, it's possible that the other side can still call {@link SofaStreamObserver#onNext(Object)} after this method has been invoked, although this is not recommended.
* <p></p>
* As a best practice, it is advised not to send any more information once this method is called.
*
*/
void onError(Throwable throwable);
}
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,11 @@ public class RpcErrorType {
*/
public static final int CLIENT_NETWORK = 250;

/**
* 不支持的RPC调用方式异常
*/
public static final int CLIENT_CALL_TYPE = 260;

/**
* 客户端过滤器异常
*/
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,68 @@ triple.Response> getGenericMethod() {
return getGenericMethod;
}

private static volatile io.grpc.MethodDescriptor<triple.Request,
triple.Response> getGenericBiStreamMethod;

@io.grpc.stub.annotations.RpcMethod(
fullMethodName = SERVICE_NAME + '/' + "genericBiStream",
requestType = triple.Request.class,
responseType = triple.Response.class,
methodType = io.grpc.MethodDescriptor.MethodType.BIDI_STREAMING)
public static io.grpc.MethodDescriptor<triple.Request,
triple.Response> getGenericBiStreamMethod() {
io.grpc.MethodDescriptor<triple.Request, triple.Response> getGenericBiStreamMethod;
if ((getGenericBiStreamMethod = GenericServiceGrpc.getGenericBiStreamMethod) == null) {
synchronized (GenericServiceGrpc.class) {
if ((getGenericBiStreamMethod = GenericServiceGrpc.getGenericBiStreamMethod) == null) {
GenericServiceGrpc.getGenericBiStreamMethod = getGenericBiStreamMethod =
io.grpc.MethodDescriptor.<triple.Request, triple.Response>newBuilder()
.setType(io.grpc.MethodDescriptor.MethodType.BIDI_STREAMING)
.setFullMethodName(generateFullMethodName(SERVICE_NAME, "genericBiStream"))
.setSampledToLocalTracing(true)
.setRequestMarshaller(io.grpc.protobuf.ProtoUtils.marshaller(
triple.Request.getDefaultInstance()))
.setResponseMarshaller(io.grpc.protobuf.ProtoUtils.marshaller(
triple.Response.getDefaultInstance()))
.setSchemaDescriptor(new GenericServiceMethodDescriptorSupplier("genericBiStream"))
.build();
}
}
}
return getGenericBiStreamMethod;
}

private static volatile io.grpc.MethodDescriptor<triple.Request,
triple.Response> getGenericServerStreamMethod;

@io.grpc.stub.annotations.RpcMethod(
fullMethodName = SERVICE_NAME + '/' + "genericServerStream",
requestType = triple.Request.class,
responseType = triple.Response.class,
methodType = io.grpc.MethodDescriptor.MethodType.SERVER_STREAMING)
public static io.grpc.MethodDescriptor<triple.Request,
triple.Response> getGenericServerStreamMethod() {
io.grpc.MethodDescriptor<triple.Request, triple.Response> getGenericServerStreamMethod;
if ((getGenericServerStreamMethod = GenericServiceGrpc.getGenericServerStreamMethod) == null) {
synchronized (GenericServiceGrpc.class) {
if ((getGenericServerStreamMethod = GenericServiceGrpc.getGenericServerStreamMethod) == null) {
GenericServiceGrpc.getGenericServerStreamMethod = getGenericServerStreamMethod =
io.grpc.MethodDescriptor.<triple.Request, triple.Response>newBuilder()
.setType(io.grpc.MethodDescriptor.MethodType.SERVER_STREAMING)
.setFullMethodName(generateFullMethodName(SERVICE_NAME, "genericServerStream"))
.setSampledToLocalTracing(true)
.setRequestMarshaller(io.grpc.protobuf.ProtoUtils.marshaller(
triple.Request.getDefaultInstance()))
.setResponseMarshaller(io.grpc.protobuf.ProtoUtils.marshaller(
triple.Response.getDefaultInstance()))
.setSchemaDescriptor(new GenericServiceMethodDescriptorSupplier("genericServerStream"))
.build();
}
}
}
return getGenericServerStreamMethod;
}

/**
* Creates a new async stub that supports all call types for the service
*/
Expand Down Expand Up @@ -101,6 +163,20 @@ public void generic(triple.Request request,
io.grpc.stub.ServerCalls.asyncUnimplementedUnaryCall(getGenericMethod(), responseObserver);
}

/**
*/
public io.grpc.stub.StreamObserver<triple.Request> genericBiStream(
io.grpc.stub.StreamObserver<triple.Response> responseObserver) {
return io.grpc.stub.ServerCalls.asyncUnimplementedStreamingCall(getGenericBiStreamMethod(), responseObserver);
}

/**
*/
public void genericServerStream(triple.Request request,
io.grpc.stub.StreamObserver<triple.Response> responseObserver) {
io.grpc.stub.ServerCalls.asyncUnimplementedUnaryCall(getGenericServerStreamMethod(), responseObserver);
}

@java.lang.Override public final io.grpc.ServerServiceDefinition bindService() {
return io.grpc.ServerServiceDefinition.builder(getServiceDescriptor())
.addMethod(
Expand All @@ -110,6 +186,20 @@ public void generic(triple.Request request,
triple.Request,
triple.Response>(
this, METHODID_GENERIC)))
.addMethod(
getGenericBiStreamMethod(),
io.grpc.stub.ServerCalls.asyncBidiStreamingCall(
new MethodHandlers<
triple.Request,
triple.Response>(
this, METHODID_GENERIC_BI_STREAM)))
.addMethod(
getGenericServerStreamMethod(),
io.grpc.stub.ServerCalls.asyncServerStreamingCall(
new MethodHandlers<
triple.Request,
triple.Response>(
this, METHODID_GENERIC_SERVER_STREAM)))
.build();
}
}
Expand All @@ -135,6 +225,22 @@ public void generic(triple.Request request,
io.grpc.stub.ClientCalls.asyncUnaryCall(
getChannel().newCall(getGenericMethod(), getCallOptions()), request, responseObserver);
}

/**
*/
public io.grpc.stub.StreamObserver<triple.Request> genericBiStream(
io.grpc.stub.StreamObserver<triple.Response> responseObserver) {
return io.grpc.stub.ClientCalls.asyncBidiStreamingCall(
getChannel().newCall(getGenericBiStreamMethod(), getCallOptions()), responseObserver);
}

/**
*/
public void genericServerStream(triple.Request request,
io.grpc.stub.StreamObserver<triple.Response> responseObserver) {
io.grpc.stub.ClientCalls.asyncServerStreamingCall(
getChannel().newCall(getGenericServerStreamMethod(), getCallOptions()), request, responseObserver);
}
}

/**
Expand All @@ -157,6 +263,14 @@ public triple.Response generic(triple.Request request) {
return io.grpc.stub.ClientCalls.blockingUnaryCall(
getChannel(), getGenericMethod(), getCallOptions(), request);
}

/**
*/
public java.util.Iterator<triple.Response> genericServerStream(
triple.Request request) {
return io.grpc.stub.ClientCalls.blockingServerStreamingCall(
getChannel(), getGenericServerStreamMethod(), getCallOptions(), request);
}
}

/**
Expand All @@ -183,6 +297,8 @@ public com.google.common.util.concurrent.ListenableFuture<triple.Response> gener
}

private static final int METHODID_GENERIC = 0;
private static final int METHODID_GENERIC_SERVER_STREAM = 1;
private static final int METHODID_GENERIC_BI_STREAM = 2;

private static final class MethodHandlers<Req, Resp> implements
io.grpc.stub.ServerCalls.UnaryMethod<Req, Resp>,
Expand All @@ -205,6 +321,10 @@ public void invoke(Req request, io.grpc.stub.StreamObserver<Resp> responseObserv
serviceImpl.generic((triple.Request) request,
(io.grpc.stub.StreamObserver<triple.Response>) responseObserver);
break;
case METHODID_GENERIC_SERVER_STREAM:
serviceImpl.genericServerStream((triple.Request) request,
(io.grpc.stub.StreamObserver<triple.Response>) responseObserver);
break;
default:
throw new AssertionError();
}
Expand All @@ -215,6 +335,9 @@ public void invoke(Req request, io.grpc.stub.StreamObserver<Resp> responseObserv
public io.grpc.stub.StreamObserver<Req> invoke(
io.grpc.stub.StreamObserver<Resp> responseObserver) {
switch (methodId) {
case METHODID_GENERIC_BI_STREAM:
return (io.grpc.stub.StreamObserver<Req>) serviceImpl.genericBiStream(
(io.grpc.stub.StreamObserver<triple.Response>) responseObserver);
default:
throw new AssertionError();
}
Expand Down Expand Up @@ -267,6 +390,8 @@ public static io.grpc.ServiceDescriptor getServiceDescriptor() {
serviceDescriptor = result = io.grpc.ServiceDescriptor.newBuilder(SERVICE_NAME)
.setSchemaDescriptor(new GenericServiceFileDescriptorSupplier())
.addMethod(getGenericMethod())
.addMethod(getGenericBiStreamMethod())
.addMethod(getGenericServerStreamMethod())
.build();
}
}
Expand Down
Loading

0 comments on commit 56ed259

Please sign in to comment.