From 429347978674b0897d90f0efd25d33c23871adfa Mon Sep 17 00:00:00 2001 From: EitanLiu Date: Sun, 14 Apr 2024 20:20:47 +0800 Subject: [PATCH 1/2] Support kotlinx-coroutines suspend function --- feign-reactor-core/pom.xml | 42 +++++++++++++++++++ .../java/reactivefeign/ReactiveContract.java | 24 +++++++++++ .../java/reactivefeign/ReactiveFeign.java | 5 ++- .../ReactiveMethodHandlerFactory.java | 6 +++ .../methodhandler/SuspendMethodHandler.java | 33 +++++++++++++++ .../reactivefeign/utils/KtCoroutinesUtils.kt | 19 +++++++++ feign-reactor-parent/pom.xml | 1 + feign-reactor-webclient-core/pom.xml | 6 +++ .../client/WebReactiveHttpResponse.java | 3 +- 9 files changed, 136 insertions(+), 3 deletions(-) create mode 100644 feign-reactor-core/src/main/java/reactivefeign/methodhandler/SuspendMethodHandler.java create mode 100644 feign-reactor-core/src/main/java/reactivefeign/utils/KtCoroutinesUtils.kt diff --git a/feign-reactor-core/pom.xml b/feign-reactor-core/pom.xml index 8f415583..0b6ce3b4 100644 --- a/feign-reactor-core/pom.xml +++ b/feign-reactor-core/pom.xml @@ -31,6 +31,17 @@ Feign Reactive Core + + org.jetbrains.kotlin + kotlin-stdlib + provided + + + + org.jetbrains.kotlinx + kotlinx-coroutines-reactor + provided + io.projectreactor @@ -196,6 +207,37 @@ + + org.jetbrains.kotlin + kotlin-maven-plugin + + + compile + process-sources + + compile + + + + src/main/java + target/generated-sources/annotations + + + + + + + -Xjsr305=strict + + + + + org.jetbrains.kotlin + kotlin-maven-allopen + ${kotlin.version} + + + diff --git a/feign-reactor-core/src/main/java/reactivefeign/ReactiveContract.java b/feign-reactor-core/src/main/java/reactivefeign/ReactiveContract.java index 2e7c57ab..0a5485ff 100644 --- a/feign-reactor-core/src/main/java/reactivefeign/ReactiveContract.java +++ b/feign-reactor-core/src/main/java/reactivefeign/ReactiveContract.java @@ -15,6 +15,7 @@ import feign.Contract; import feign.MethodMetadata; +import reactivefeign.utils.KtCoroutinesUtils; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -48,6 +49,12 @@ public List parseAndValidateMetadata(final Class targetType) for (final MethodMetadata metadata : methodsMetadata) { final Type type = metadata.returnType(); + + if (KtCoroutinesUtils.isSuspend(metadata.method())) { + modifySuspendMethodMetadata(metadata); + continue; + } + if (!isReactorType(type)) { throw new IllegalArgumentException(String.format( "Method %s of contract %s doesn't returns reactor.core.publisher.Mono or reactor.core.publisher.Flux", @@ -64,6 +71,23 @@ public List parseAndValidateMetadata(final Class targetType) return methodsMetadata; } + private static void modifySuspendMethodMetadata(MethodMetadata metadata) { + Type kotlinMethodReturnType = KtCoroutinesUtils.suspendReturnType(metadata.method()); + if (kotlinMethodReturnType == null) { + throw new IllegalArgumentException(String.format( + "Method %s can't have continuation argument, only kotlin method is allowed", + metadata.configKey())); + } + metadata.returnType(kotlinMethodReturnType); + + int continuationIndex = metadata.method().getParameterCount() - 1; + metadata.ignoreParamater(continuationIndex); + + if(metadata.bodyIndex() != null && metadata.bodyIndex().equals(continuationIndex)) { + metadata.bodyIndex(null); + } + } + private static final Set REACTOR_PUBLISHERS = new HashSet<>(asList(Mono.class, Flux.class)); public static boolean isReactorType(final Type type) { diff --git a/feign-reactor-core/src/main/java/reactivefeign/ReactiveFeign.java b/feign-reactor-core/src/main/java/reactivefeign/ReactiveFeign.java index 2bc74210..cb04e4ce 100644 --- a/feign-reactor-core/src/main/java/reactivefeign/ReactiveFeign.java +++ b/feign-reactor-core/src/main/java/reactivefeign/ReactiveFeign.java @@ -43,6 +43,7 @@ import reactivefeign.publisher.retry.FluxRetryPublisherHttpClient; import reactivefeign.publisher.retry.MonoRetryPublisherHttpClient; import reactivefeign.retry.ReactiveRetryPolicy; +import reactivefeign.utils.KtCoroutinesUtils; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -316,7 +317,7 @@ public static PublisherHttpClient retry( MethodMetadata methodMetadata, ReactiveRetryPolicy retryPolicy) { Type returnPublisherType = returnPublisherType(methodMetadata); - if(returnPublisherType == Mono.class){ + if (returnPublisherType == Mono.class || KtCoroutinesUtils.isSuspend(methodMetadata.method())) { return new MonoRetryPublisherHttpClient(publisherClient, methodMetadata, retryPolicy); } else if(returnPublisherType == Flux.class) { return new FluxRetryPublisherHttpClient(publisherClient, methodMetadata, retryPolicy); @@ -331,7 +332,7 @@ protected PublisherHttpClient toPublisher(ReactiveHttpClient reactiveHttpClient, } Class returnPublisherType = returnPublisherType(methodMetadata); - if(returnPublisherType == Mono.class){ + if(returnPublisherType == Mono.class || KtCoroutinesUtils.isSuspend(methodMetadata.method())) { return new MonoPublisherHttpClient(reactiveHttpClient); } else if(returnPublisherType == Flux.class){ return new FluxPublisherHttpClient(reactiveHttpClient); diff --git a/feign-reactor-core/src/main/java/reactivefeign/methodhandler/ReactiveMethodHandlerFactory.java b/feign-reactor-core/src/main/java/reactivefeign/methodhandler/ReactiveMethodHandlerFactory.java index f7996dd8..625732a5 100644 --- a/feign-reactor-core/src/main/java/reactivefeign/methodhandler/ReactiveMethodHandlerFactory.java +++ b/feign-reactor-core/src/main/java/reactivefeign/methodhandler/ReactiveMethodHandlerFactory.java @@ -2,8 +2,10 @@ import feign.MethodMetadata; import feign.Target; +import kotlin.coroutines.Continuation; import reactivefeign.publisher.PublisherClientFactory; import reactivefeign.publisher.ResponsePublisherHttpClient; +import reactivefeign.utils.KtCoroutinesUtils; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -44,6 +46,8 @@ public MethodHandler create(MethodMetadata metadata) { return new MonoMethodHandler(methodHandler); } else if(returnPublisherType == Flux.class) { return new FluxMethodHandler(methodHandler); + } else if(KtCoroutinesUtils.isSuspend(metadata.method())){ + return new SuspendMethodHandler(methodHandler); } else { throw new IllegalArgumentException("Unknown returnPublisherType: " + returnPublisherType); } @@ -57,6 +61,8 @@ public MethodHandler createDefault(Method method) { return new MonoMethodHandler(defaultMethodHandler); } else if(method.getReturnType() == Flux.class) { return new FluxMethodHandler(defaultMethodHandler); + } else if(method.getReturnType() == Continuation.class){ + return new SuspendMethodHandler(defaultMethodHandler); } else { throw new IllegalArgumentException("Unknown returnPublisherType: " + method.getReturnType()); } diff --git a/feign-reactor-core/src/main/java/reactivefeign/methodhandler/SuspendMethodHandler.java b/feign-reactor-core/src/main/java/reactivefeign/methodhandler/SuspendMethodHandler.java new file mode 100644 index 00000000..a6084b54 --- /dev/null +++ b/feign-reactor-core/src/main/java/reactivefeign/methodhandler/SuspendMethodHandler.java @@ -0,0 +1,33 @@ +package reactivefeign.methodhandler; + +import kotlin.coroutines.Continuation; +import kotlinx.coroutines.reactor.MonoKt; +import org.reactivestreams.Publisher; +import reactor.core.publisher.Mono; + +import java.util.Arrays; + +public class SuspendMethodHandler implements MethodHandler { + + private final MethodHandler methodHandler; + + public SuspendMethodHandler(MethodHandler methodHandler) { + this.methodHandler = methodHandler; + } + + @Override + public Object invoke(final Object[] argv) { + Object[] args = Arrays.copyOf(argv, argv.length - 1); + Continuation continuation = (Continuation) argv[argv.length - 1]; + return MonoKt.awaitSingleOrNull(invokeMono(args), continuation); + } + + @SuppressWarnings("unchecked") + public Mono invokeMono(final Object[] argv) { + try { + return Mono.from((Publisher) methodHandler.invoke(argv)); + } catch (Throwable throwable) { + return Mono.error(throwable); + } + } +} diff --git a/feign-reactor-core/src/main/java/reactivefeign/utils/KtCoroutinesUtils.kt b/feign-reactor-core/src/main/java/reactivefeign/utils/KtCoroutinesUtils.kt new file mode 100644 index 00000000..05049cf7 --- /dev/null +++ b/feign-reactor-core/src/main/java/reactivefeign/utils/KtCoroutinesUtils.kt @@ -0,0 +1,19 @@ +@file:JvmName("KtCoroutinesUtils") + +package reactivefeign.utils + +import java.lang.reflect.Method +import java.lang.reflect.ParameterizedType +import java.lang.reflect.Type +import kotlin.coroutines.Continuation + +internal fun Method.isSuspend(): Boolean { + val classes: Array> = parameterTypes + return classes.isNotEmpty() && classes[classes.size - 1] == Continuation::class.java +} + +internal fun Method.suspendReturnType() = if (isSuspend()) { + val types: Array = genericParameterTypes + val conType = types[types.size - 1] as ParameterizedType + conType +} else returnType \ No newline at end of file diff --git a/feign-reactor-parent/pom.xml b/feign-reactor-parent/pom.xml index 53500c58..b67767da 100644 --- a/feign-reactor-parent/pom.xml +++ b/feign-reactor-parent/pom.xml @@ -14,6 +14,7 @@ 17 + 1.8.22 UTF-8 UTF-8 diff --git a/feign-reactor-webclient-core/pom.xml b/feign-reactor-webclient-core/pom.xml index 02b7c978..c8a5537a 100644 --- a/feign-reactor-webclient-core/pom.xml +++ b/feign-reactor-webclient-core/pom.xml @@ -12,6 +12,12 @@ feign-reactor-webclient-core + + org.jetbrains.kotlin + kotlin-stdlib + provided + + com.playtika.reactivefeign feign-reactor-core diff --git a/feign-reactor-webclient-core/src/main/java/reactivefeign/webclient/client/WebReactiveHttpResponse.java b/feign-reactor-webclient-core/src/main/java/reactivefeign/webclient/client/WebReactiveHttpResponse.java index 544fd9aa..f3d46e22 100644 --- a/feign-reactor-webclient-core/src/main/java/reactivefeign/webclient/client/WebReactiveHttpResponse.java +++ b/feign-reactor-webclient-core/src/main/java/reactivefeign/webclient/client/WebReactiveHttpResponse.java @@ -1,5 +1,6 @@ package reactivefeign.webclient.client; +import kotlin.coroutines.Continuation; import org.reactivestreams.Publisher; import org.springframework.core.ParameterizedTypeReference; import org.springframework.core.io.buffer.DataBuffer; @@ -49,7 +50,7 @@ public Map> headers() { @Override public P body() { - if (returnPublisherType == Mono.class) { + if (returnPublisherType == Mono.class || returnPublisherType == Continuation.class) { return (P)clientResponse.bodyToMono(returnActualType); } else if(returnPublisherType == Flux.class){ return (P)clientResponse.bodyToFlux(returnActualType); From 6926954de51382af75399d7c903cbcd19c8f5658 Mon Sep 17 00:00:00 2001 From: EitanLiu Date: Sun, 14 Apr 2024 22:33:34 +0800 Subject: [PATCH 2/2] Support kotlinx-coroutines Flow --- .../java/reactivefeign/ReactiveContract.java | 3 +- .../java/reactivefeign/ReactiveFeign.java | 5 ++-- .../methodhandler/FlowMethodHandler.java | 30 +++++++++++++++++++ .../ReactiveMethodHandlerFactory.java | 5 ++++ feign-reactor-webclient-core/pom.xml | 5 ++++ .../client/WebReactiveHttpResponse.java | 3 +- 6 files changed, 47 insertions(+), 4 deletions(-) create mode 100644 feign-reactor-core/src/main/java/reactivefeign/methodhandler/FlowMethodHandler.java diff --git a/feign-reactor-core/src/main/java/reactivefeign/ReactiveContract.java b/feign-reactor-core/src/main/java/reactivefeign/ReactiveContract.java index 0a5485ff..c4964760 100644 --- a/feign-reactor-core/src/main/java/reactivefeign/ReactiveContract.java +++ b/feign-reactor-core/src/main/java/reactivefeign/ReactiveContract.java @@ -15,6 +15,7 @@ import feign.Contract; import feign.MethodMetadata; +import kotlinx.coroutines.flow.Flow; import reactivefeign.utils.KtCoroutinesUtils; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -88,7 +89,7 @@ private static void modifySuspendMethodMetadata(MethodMetadata metadata) { } } - private static final Set REACTOR_PUBLISHERS = new HashSet<>(asList(Mono.class, Flux.class)); + private static final Set REACTOR_PUBLISHERS = new HashSet<>(asList(Mono.class, Flux.class, Flow.class)); public static boolean isReactorType(final Type type) { return (type instanceof ParameterizedType) diff --git a/feign-reactor-core/src/main/java/reactivefeign/ReactiveFeign.java b/feign-reactor-core/src/main/java/reactivefeign/ReactiveFeign.java index cb04e4ce..a77d6b5b 100644 --- a/feign-reactor-core/src/main/java/reactivefeign/ReactiveFeign.java +++ b/feign-reactor-core/src/main/java/reactivefeign/ReactiveFeign.java @@ -18,6 +18,7 @@ import feign.InvocationHandlerFactory; import feign.MethodMetadata; import feign.Target; +import kotlinx.coroutines.flow.Flow; import org.reactivestreams.Publisher; import reactivefeign.client.ReactiveErrorMapper; import reactivefeign.client.ReactiveHttpClient; @@ -319,7 +320,7 @@ public static PublisherHttpClient retry( Type returnPublisherType = returnPublisherType(methodMetadata); if (returnPublisherType == Mono.class || KtCoroutinesUtils.isSuspend(methodMetadata.method())) { return new MonoRetryPublisherHttpClient(publisherClient, methodMetadata, retryPolicy); - } else if(returnPublisherType == Flux.class) { + } else if(returnPublisherType == Flux.class || returnPublisherType == Flow.class) { return new FluxRetryPublisherHttpClient(publisherClient, methodMetadata, retryPolicy); } else { throw new IllegalArgumentException("Unknown returnPublisherType: " + returnPublisherType); @@ -334,7 +335,7 @@ protected PublisherHttpClient toPublisher(ReactiveHttpClient reactiveHttpClient, Class returnPublisherType = returnPublisherType(methodMetadata); if(returnPublisherType == Mono.class || KtCoroutinesUtils.isSuspend(methodMetadata.method())) { return new MonoPublisherHttpClient(reactiveHttpClient); - } else if(returnPublisherType == Flux.class){ + } else if(returnPublisherType == Flux.class || returnPublisherType == Flow.class) { return new FluxPublisherHttpClient(reactiveHttpClient); } else { throw new IllegalArgumentException("Unknown returnPublisherType: " + returnPublisherType); diff --git a/feign-reactor-core/src/main/java/reactivefeign/methodhandler/FlowMethodHandler.java b/feign-reactor-core/src/main/java/reactivefeign/methodhandler/FlowMethodHandler.java new file mode 100644 index 00000000..3ee78306 --- /dev/null +++ b/feign-reactor-core/src/main/java/reactivefeign/methodhandler/FlowMethodHandler.java @@ -0,0 +1,30 @@ +package reactivefeign.methodhandler; + +import kotlinx.coroutines.flow.Flow; +import kotlinx.coroutines.reactive.ReactiveFlowKt; +import org.reactivestreams.Publisher; +import reactor.core.publisher.Flux; + +public class FlowMethodHandler implements MethodHandler { + + private final MethodHandler methodHandler; + + public FlowMethodHandler(MethodHandler methodHandler) { + this.methodHandler = methodHandler; + } + + @Override + public Flow invoke(final Object[] argv) { + return ReactiveFlowKt.asFlow(invokeFlux(argv)); + } + + @SuppressWarnings("unchecked") + protected Flux invokeFlux(final Object[] argv) { + try { + return Flux.from((Publisher) methodHandler.invoke(argv)); + } catch (Throwable throwable) { + return Flux.error(throwable); + } + } + +} diff --git a/feign-reactor-core/src/main/java/reactivefeign/methodhandler/ReactiveMethodHandlerFactory.java b/feign-reactor-core/src/main/java/reactivefeign/methodhandler/ReactiveMethodHandlerFactory.java index 625732a5..279ae216 100644 --- a/feign-reactor-core/src/main/java/reactivefeign/methodhandler/ReactiveMethodHandlerFactory.java +++ b/feign-reactor-core/src/main/java/reactivefeign/methodhandler/ReactiveMethodHandlerFactory.java @@ -3,6 +3,7 @@ import feign.MethodMetadata; import feign.Target; import kotlin.coroutines.Continuation; +import kotlinx.coroutines.flow.Flow; import reactivefeign.publisher.PublisherClientFactory; import reactivefeign.publisher.ResponsePublisherHttpClient; import reactivefeign.utils.KtCoroutinesUtils; @@ -48,6 +49,8 @@ public MethodHandler create(MethodMetadata metadata) { return new FluxMethodHandler(methodHandler); } else if(KtCoroutinesUtils.isSuspend(metadata.method())){ return new SuspendMethodHandler(methodHandler); + } else if(returnPublisherType == Flow.class) { + return new FlowMethodHandler(methodHandler); } else { throw new IllegalArgumentException("Unknown returnPublisherType: " + returnPublisherType); } @@ -63,6 +66,8 @@ public MethodHandler createDefault(Method method) { return new FluxMethodHandler(defaultMethodHandler); } else if(method.getReturnType() == Continuation.class){ return new SuspendMethodHandler(defaultMethodHandler); + } else if(method.getReturnType() == Flow.class) { + return new FlowMethodHandler(defaultMethodHandler); } else { throw new IllegalArgumentException("Unknown returnPublisherType: " + method.getReturnType()); } diff --git a/feign-reactor-webclient-core/pom.xml b/feign-reactor-webclient-core/pom.xml index c8a5537a..6db3f3be 100644 --- a/feign-reactor-webclient-core/pom.xml +++ b/feign-reactor-webclient-core/pom.xml @@ -17,6 +17,11 @@ kotlin-stdlib provided + + org.jetbrains.kotlinx + kotlinx-coroutines-core + provided + com.playtika.reactivefeign diff --git a/feign-reactor-webclient-core/src/main/java/reactivefeign/webclient/client/WebReactiveHttpResponse.java b/feign-reactor-webclient-core/src/main/java/reactivefeign/webclient/client/WebReactiveHttpResponse.java index f3d46e22..984663eb 100644 --- a/feign-reactor-webclient-core/src/main/java/reactivefeign/webclient/client/WebReactiveHttpResponse.java +++ b/feign-reactor-webclient-core/src/main/java/reactivefeign/webclient/client/WebReactiveHttpResponse.java @@ -1,6 +1,7 @@ package reactivefeign.webclient.client; import kotlin.coroutines.Continuation; +import kotlinx.coroutines.flow.Flow; import org.reactivestreams.Publisher; import org.springframework.core.ParameterizedTypeReference; import org.springframework.core.io.buffer.DataBuffer; @@ -52,7 +53,7 @@ public Map> headers() { public P body() { if (returnPublisherType == Mono.class || returnPublisherType == Continuation.class) { return (P)clientResponse.bodyToMono(returnActualType); - } else if(returnPublisherType == Flux.class){ + } else if(returnPublisherType == Flux.class || returnPublisherType == Flow.class) { return (P)clientResponse.bodyToFlux(returnActualType); } else { throw new IllegalArgumentException("Unknown returnPublisherType: " + returnPublisherType);