Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

added spring boot 3.2 support #675

Merged
merged 6 commits into from
Aug 31, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions feign-reactor-benchmarks/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -52,17 +52,17 @@

<dependency>
<groupId>org.eclipse.jetty.http2</groupId>
<artifactId>http2-client</artifactId>
<artifactId>jetty-http2-client</artifactId>
</dependency>

<dependency>
<groupId>org.eclipse.jetty.http2</groupId>
<artifactId>http2-http-client-transport</artifactId>
<artifactId>jetty-http2-client-transport</artifactId>
</dependency>

<dependency>
<groupId>org.eclipse.jetty.http2</groupId>
<artifactId>http2-server</artifactId>
<artifactId>jetty-http2-server</artifactId>
</dependency>

<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,19 @@
import io.reactivex.netty.protocol.http.server.HttpServerRequest;
import io.reactivex.netty.protocol.http.server.HttpServerResponse;
import io.reactivex.netty.protocol.http.server.RequestHandler;
import jakarta.servlet.http.HttpServletRequest;
import jakarta.servlet.http.HttpServletResponse;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.http2.client.HTTP2Client;
import org.eclipse.jetty.http2.client.http.HttpClientTransportOverHTTP2;
import org.eclipse.jetty.http2.client.transport.HttpClientTransportOverHTTP2;
import org.eclipse.jetty.http2.server.HTTP2CServerConnectionFactory;
import org.eclipse.jetty.server.*;
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.eclipse.jetty.io.Content;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.server.HttpConnectionFactory;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.Response;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.springframework.web.reactive.function.client.WebClient;
import reactivefeign.java11.Java11ReactiveFeign;
Expand All @@ -27,8 +32,8 @@
import reactivefeign.webclient.WebReactiveFeign;
import reactor.core.publisher.Mono;

import java.io.IOException;
import java.lang.reflect.ParameterizedType;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -161,16 +166,17 @@ public rx.Observable<Void> handle(HttpServerRequest<ByteBuf> request,
private Server jettyH2c(int port){
Server serverJetty = new Server();

serverJetty.setHandler(new AbstractHandler(){
serverJetty.setHandler(new Handler.Abstract(){
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException {
request.getInputStream().skip(Integer.MAX_VALUE);
if(target.equals(PATH_WITH_PAYLOAD)){
response.addHeader("Content-Type", "application/json");
response.getOutputStream().write(responseJson);
response.getOutputStream().flush();
public boolean handle(Request request, Response response, Callback callback) throws Exception {
Content.Chunk chunk = request.read();
chunk.skip(Integer.MAX_VALUE);
if (request.getHttpURI().getPath().startsWith(PATH_WITH_PAYLOAD)) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why did you changed equals to startsWith?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've reverted changes

response.getHeaders().add("Content-Type", "application/json");
response.write(true, ByteBuffer.wrap(responseJson), callback);
}
baseRequest.setHandled(true);
callback.succeeded();
return true;
}
});

Expand Down
4 changes: 2 additions & 2 deletions feign-reactor-cloud/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,8 @@
</dependency>

<dependency>
<groupId>com.github.tomakehurst</groupId>
<artifactId>wiremock-jre8-standalone</artifactId>
<groupId>org.wiremock</groupId>
<artifactId>wiremock-standalone</artifactId>
<scope>test</scope>
</dependency>

Expand Down
10 changes: 8 additions & 2 deletions feign-reactor-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,8 @@
</dependency>

<dependency>
<groupId>com.github.tomakehurst</groupId>
<artifactId>wiremock-jre8-standalone</artifactId>
<groupId>org.wiremock</groupId>
<artifactId>wiremock-standalone</artifactId>
<scope>test</scope>
</dependency>

Expand Down Expand Up @@ -151,6 +151,12 @@
</exclusions>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-log4j2</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,10 @@ public long retryDelay(Throwable error, int attemptNo) {
if (attemptNo <= maxRetries) {
if(periodInMs > 0) {
long delay;
Date retryAfter;
Long retryAfter;
// "Retry-After" header set
if (error instanceof RetryableException
&& (retryAfter = ((RetryableException) error)
.retryAfter()) != null) {
delay = retryAfter.getTime() - clock.millis();
if (error instanceof RetryableException re && (retryAfter = re.retryAfter()) != null) {
delay = retryAfter - clock.millis();
delay = Math.min(delay, periodInMs);
delay = Math.max(delay, 0);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,17 +49,19 @@ public static void installBlockHound() {
builder.allowBlockingCallsInside("org.eclipse.jetty.util.BlockingArrayQueue", "offer");
builder.allowBlockingCallsInside("org.eclipse.jetty.util.BlockingArrayQueue", "peek");
//java.net.InMemoryCookieStore.get
builder.allowBlockingCallsInside("org.eclipse.jetty.client.HttpConnection", "normalizeRequest");
builder.allowBlockingCallsInside("org.eclipse.jetty.client.transport.HttpConnection", "normalizeRequest");
builder.allowBlockingCallsInside("java.util.concurrent.FutureTask", "handlePossibleCancellationInterrupt");
builder.allowBlockingCallsInside("org.eclipse.jetty.http2.HTTP2Session$StreamsState", "reserveSlot");
builder.allowBlockingCallsInside("org.eclipse.jetty.http2.HTTP2Session$StreamsState", "flush");

//jetty http2 server
builder.allowBlockingCallsInside("org.eclipse.jetty.util.IteratingCallback", "processing");
builder.allowBlockingCallsInside("org.eclipse.jetty.util.IteratingCallback", "iterate");
builder.allowBlockingCallsInside("org.eclipse.jetty.util.thread.AutoLock", "lock");

//java11
builder.allowBlockingCallsInside("jdk.internal.net.http.MultiExchange", "responseAsync");
builder.allowBlockingCallsInside("jdk.internal.misc.Unsafe", "park");

builder.allowBlockingCallsInside("com.sun.jmx.mbeanserver.Repository", "remove");
builder.allowBlockingCallsInside("com.sun.jmx.mbeanserver.Repository", "contains");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import reactor.core.publisher.Mono;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;

public class DefaultMethodHandlerTest extends BaseReactorTest {
Expand Down Expand Up @@ -36,7 +37,7 @@ public void shouldCallNotDefaultMethodOnActualImplementation() throws Throwable
DefaultMethodHandler defaultMethodHandler
= new DefaultMethodHandler(TestInterface.class.getMethod("defaultMethod"));

TestInterface mockImplementation = mock(TestInterface.class);
TestInterface mockImplementation = spy(TestInterface.class);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why spy?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've reverted changes to use mock


defaultMethodHandler.bindTo(mockImplementation);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ public void shouldThrowOnStatusCode() {
return new RetryableException(
response.status(),
"Should retry on next node",
httpMethod, null, request);
httpMethod, (Long) null, request);
}),
throwOnStatus(
status -> status == SC_UNAUTHORIZED,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,16 +42,16 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import static java.util.Arrays.asList;
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.waitAtMost;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static reactivefeign.ReactivityTest.CALLS_NUMBER;
import static reactivefeign.ReactivityTest.timeToCompleteReactively;
import static reactivefeign.TestUtils.toLowerCaseKeys;
Expand Down Expand Up @@ -99,16 +99,17 @@ public void setUp() {

@Test
public void shouldReturnAllPassedParameters() {
Map<String, String> paramMap = new HashMap<String, String>() {{
put("paramKey1", "paramValue1");
put("paramKey2", "paramValue2");
}};
Map<String, String> returned = client.mirrorParameters(555,"777", paramMap)
.subscribeOn(testScheduler()).block();
Map<String, String> paramMap = Map.of("paramKey1", "paramValue1", "paramKey2", "paramValue2");

assertThat(returned).containsEntry("paramInPath", "555");
assertThat(returned).containsEntry("paramInUrl", "777");
assertThat(returned).containsAllEntriesOf(paramMap);
Mono<Map<String, String>> result = client.mirrorParameters(555,"777", paramMap)
.subscribeOn(testScheduler());

StepVerifier.create(result)
.consumeNextWith(returned -> {
assertThat(returned).containsEntry("paramInPath", "555");
assertThat(returned).containsEntry("paramInUrl", "777");
assertThat(returned).containsAllEntriesOf(paramMap);
});
}

@Test
Expand All @@ -118,11 +119,15 @@ public void shouldReturnEmptyPassedParameters() {
put("paramKey", "");
}
};
Map<String, String> returned = client.mirrorParameters(555,"", paramMap)
.subscribeOn(testScheduler()).block();
Mono<Map<String, String>> returned = client.mirrorParameters(555,"", paramMap)
.subscribeOn(testScheduler());

assertThat(returned).containsEntry("paramKey", "");
assertThat(returned).containsEntry("paramInUrl", "");
StepVerifier.create(returned)
.consumeNextWith(map -> {
assertThat(map).containsEntry("paramKey", "");
assertThat(map).containsEntry("paramInUrl", "");
})
.verifyComplete();
}

@Test
Expand Down Expand Up @@ -173,10 +178,12 @@ public void shouldNotReturnNullPassedParametersNew() {
public void shouldReturnAllPassedListParametersNew() {

List<Integer> dynamicListParam = asList(1, 2, 3);
List<Integer> returned = client.mirrorListParametersNew(dynamicListParam)
.subscribeOn(testScheduler()).block();
Mono<List<Integer>> result = client.mirrorListParametersNew(dynamicListParam)
.subscribeOn(testScheduler());

assertThat(returned).containsAll(dynamicListParam);
StepVerifier.create(result)
.consumeNextWith(returned -> assertThat(returned).containsAll(dynamicListParam))
.verifyComplete();
}

@Test
Expand Down Expand Up @@ -251,13 +258,17 @@ public void shouldReturnAllPassedListHeaders() {
public void shouldReturnAllPassedMultiMapHeaders() {
Map<String, List<String>> headersMap = new HashMap<String, List<String>>() {
{
put("headerKey1", asList("headerValue1", "headerValue2"));
put("headerKey1", List.of("headerValue1, headerValue2"));
}
};
Map<String, List<String>> returned = client.mirrorMultiMapHeaders(headersMap)
.subscribeOn(testScheduler()).block();
Mono<Map<String, List<String>>> result = client.mirrorMultiMapHeaders(headersMap)
.subscribeOn(testScheduler());

assertThat(toLowerCaseKeys(returned)).containsAllEntriesOf(toLowerCaseKeys(headersMap));
StepVerifier.create(result)
.consumeNextWith(returned -> {
assertNotNull(returned);
assertThat(toLowerCaseKeys(returned)).containsAllEntriesOf(toLowerCaseKeys(headersMap));
});
}

@Test
Expand Down Expand Up @@ -321,7 +332,7 @@ public void shouldReturnFirstResultBeforeSecondSent() throws InterruptedExceptio
AtomicInteger sentCount = new AtomicInteger();
AtomicInteger receivedCount = new AtomicInteger();

CompletableFuture<AllFeaturesApi.TestObject> firstReceived = new CompletableFuture<>();
CompletableFuture<AllFeaturesApi.TestObject> firstReceived = CompletableFuture.completedFuture(null);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't want completed future here.
I'm going to complete it later

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done


Flux<AllFeaturesApi.TestObject> returned = client
.mirrorBodyStream(Flux.just(new AllFeaturesApi.TestObject("testMessage1"),
Expand All @@ -337,14 +348,17 @@ public void shouldReturnFirstResultBeforeSecondSent() throws InterruptedExceptio
countDownLatch.countDown();
}).subscribe();

countDownLatch.await();
boolean await = countDownLatch.await(5, TimeUnit.SECONDS);
assertThat(await).isTrue();
}

@Test
public void shouldReturnEmpty() {
Optional<AllFeaturesApi.TestObject> returned = client.empty()
.subscribeOn(testScheduler()).blockOptional();
assertThat(!returned.isPresent());
Mono<AllFeaturesApi.TestObject> returned = client.empty()
.subscribeOn(testScheduler());

StepVerifier.create(returned)
.verifyComplete();
}

@Test
Expand All @@ -354,7 +368,6 @@ public void shouldReturnDefaultBody() {
assertThat(returned).isEqualTo("default");
}


@Test
public void shouldRunReactively() {

Expand Down Expand Up @@ -411,33 +424,30 @@ public void shouldMirrorBinaryBody() {

@Test
public void shouldMirrorStreamingBinaryBodyReactive() throws InterruptedException {

CountDownLatch countDownLatch = new CountDownLatch(2);

AtomicInteger sentCount = new AtomicInteger();
ConcurrentLinkedQueue<byte[]> receivedAll = new ConcurrentLinkedQueue<>();

CompletableFuture<ByteBuffer> firstReceived = new CompletableFuture<>();
CompletableFuture<ByteBuffer> firstReceived = CompletableFuture.completedFuture(null);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't want completed future here.
I'm going to complete it later

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done


Flux<ByteBuffer> returned = client.mirrorStreamingBinaryBodyReactive(
Flux.just(fromByteArray(new byte[]{1,2,3}), fromByteArray(new byte[]{4,5,6})))
Flux.just(fromByteArray(new byte[]{1, 2, 3}), fromByteArray(new byte[]{4, 5, 6})))
.subscribeOn(testScheduler())
.delayUntil(testObject -> sentCount.get() == 1 ? fromFuture(firstReceived)
: empty())
.doOnNext(sent -> sentCount.incrementAndGet());

returned.doOnNext(received -> {
byte[] dataReceived = new byte[received.limit()];
received.get(dataReceived);
receivedAll.add(dataReceived);
assertThat(receivedAll.size()).isEqualTo(sentCount.get());
firstReceived.complete(received);
countDownLatch.countDown();
}).subscribe();

countDownLatch.await();

assertThat(receivedAll).containsExactly(new byte[]{1,2,3}, new byte[]{4,5,6});
StepVerifier.create(returned)
.consumeNextWith(byteBuffer -> {
ByteBuffer expectedBuffer = ByteBuffer.allocateDirect(3)
.put(new byte[]{1, 2, 3})
.position(0);
assertEquals(expectedBuffer, byteBuffer);
})
.consumeNextWith(byteBuffer -> {
ByteBuffer expectedBuffer = ByteBuffer.allocateDirect(3)
.put(new byte[]{4, 5, 6})
.position(0);
assertEquals(expectedBuffer, byteBuffer);
})
.verifyComplete();
}

@Test(expected = IllegalArgumentException.class)
Expand Down Expand Up @@ -480,7 +490,7 @@ public void shouldEncodePathParam() {

@Test
public void shouldEncodePathParamWithReservedChars() {
String PATH_PARAM = "workers?in=(\"123/321\")";
String PATH_PARAM = "workers?in=(\"123321\")";
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why did you change this test string?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


StepVerifier.create(client.encodePath(PATH_PARAM)
.subscribeOn(testScheduler()))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public void shouldTakeIntoAccountRetryAfter(){
assertThat(retryDelay).isEqualTo(delay);

retryDelay = retryPolicy.retryDelay(new RetryableException(-1, "error msg", Request.HttpMethod.GET,
null, request), 1);
(Long) null, request), 1);
assertThat(retryDelay).isEqualTo(backoff);
}

Expand Down
Loading
Loading