From e79db84c43b33c209d2e78e47aeaa6003fcc959d Mon Sep 17 00:00:00 2001 From: Ezequiel Werner Date: Wed, 2 Oct 2024 16:45:34 -0300 Subject: [PATCH 1/2] honour selector timeout in first read --- .../GrizzlyRequestDispatcherFilter.java | 7 +- .../server/grizzly/GrizzlyServerManager.java | 2 +- .../ResponseStreamingCompletionHandler.java | 47 ++++++++-- ...nseStreamingCompletionHandlerTestCase.java | 92 +++++++++++++------ 4 files changed, 109 insertions(+), 39 deletions(-) diff --git a/src/main/java/org/mule/service/http/impl/service/server/grizzly/GrizzlyRequestDispatcherFilter.java b/src/main/java/org/mule/service/http/impl/service/server/grizzly/GrizzlyRequestDispatcherFilter.java index 4aa67b03..8cea54ed 100644 --- a/src/main/java/org/mule/service/http/impl/service/server/grizzly/GrizzlyRequestDispatcherFilter.java +++ b/src/main/java/org/mule/service/http/impl/service/server/grizzly/GrizzlyRequestDispatcherFilter.java @@ -50,6 +50,7 @@ import java.nio.charset.Charset; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicInteger; import javax.net.ssl.SSLSession; @@ -60,14 +61,16 @@ public class GrizzlyRequestDispatcherFilter extends BaseFilter { private final RequestHandlerProvider requestHandlerProvider; + private final ExecutorService workerPool; private final byte[] SERVER_NOT_AVAILABLE_CONTENT = ("Server not available to handle this request, either not initialized yet " + "or it has been disposed.").getBytes(defaultCharset()); private ConcurrentMap activeRequests = new ConcurrentHashMap<>(); - GrizzlyRequestDispatcherFilter(final RequestHandlerProvider requestHandlerProvider) { + GrizzlyRequestDispatcherFilter(final RequestHandlerProvider requestHandlerProvider, ExecutorService workerPool) { this.requestHandlerProvider = requestHandlerProvider; + this.workerPool = workerPool; } @Override @@ -140,7 +143,7 @@ public void responseReady(HttpResponse response, ResponseStatusCallback response if (response.getEntity().isStreaming()) { new ResponseStreamingCompletionHandler(ctx, requestHandler.getContextClassLoader(), request, response, - requestAdapterNotifyingResponseStatusCallback).start(); + requestAdapterNotifyingResponseStatusCallback, workerPool).start(); } else { new ResponseCompletionHandler(ctx, requestHandler.getContextClassLoader(), request, response, requestAdapterNotifyingResponseStatusCallback).start(); diff --git a/src/main/java/org/mule/service/http/impl/service/server/grizzly/GrizzlyServerManager.java b/src/main/java/org/mule/service/http/impl/service/server/grizzly/GrizzlyServerManager.java index 4f72da01..78252d3a 100644 --- a/src/main/java/org/mule/service/http/impl/service/server/grizzly/GrizzlyServerManager.java +++ b/src/main/java/org/mule/service/http/impl/service/server/grizzly/GrizzlyServerManager.java @@ -116,7 +116,7 @@ public GrizzlyServerManager(ExecutorService selectorPool, this.httpListenerRegistry = httpListenerRegistry; // TODO - MULE-14960: Remove system property once this can be configured through a file this.serverTimeout = getInteger("mule.http.server.timeout", DEFAULT_SERVER_TIMEOUT_MILLIS); - requestHandlerFilter = new GrizzlyRequestDispatcherFilter(httpListenerRegistry); + requestHandlerFilter = new GrizzlyRequestDispatcherFilter(httpListenerRegistry, workerPool); timeoutFilterDelegate = new GrizzlyAddressDelegateFilter<>(); sslFilterDelegate = new GrizzlyAddressDelegateFilter<>(); webSocketFilter = new GrizzlyAddressDelegateFilter<>(); diff --git a/src/main/java/org/mule/service/http/impl/service/server/grizzly/ResponseStreamingCompletionHandler.java b/src/main/java/org/mule/service/http/impl/service/server/grizzly/ResponseStreamingCompletionHandler.java index 8f0c8707..2832d3d6 100644 --- a/src/main/java/org/mule/service/http/impl/service/server/grizzly/ResponseStreamingCompletionHandler.java +++ b/src/main/java/org/mule/service/http/impl/service/server/grizzly/ResponseStreamingCompletionHandler.java @@ -6,22 +6,24 @@ */ package org.mule.service.http.impl.service.server.grizzly; -import static com.google.common.base.Preconditions.checkArgument; +import static org.mule.runtime.api.util.DataUnit.KB; +import static org.mule.runtime.api.util.MuleSystemProperties.MULE_LOG_SEPARATION_DISABLED; +import static org.mule.runtime.api.util.MuleSystemProperties.SYSTEM_PROPERTY_PREFIX; +import static org.mule.runtime.core.api.util.ClassUtils.setContextClassLoader; +import static org.mule.runtime.core.api.util.StringUtils.isEmpty; +import static org.mule.runtime.http.api.HttpHeaders.Names.CONTENT_LENGTH; +import static org.mule.service.http.impl.service.server.grizzly.ExecutorPerServerAddressIOStrategy.DELEGATE_WRITES_IN_CONFIGURED_EXECUTOR; + import static java.lang.Integer.valueOf; import static java.lang.Math.min; import static java.lang.System.getProperty; import static java.lang.System.nanoTime; -import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.lang.Thread.currentThread; +import static java.util.concurrent.TimeUnit.MILLISECONDS; + +import static com.google.common.base.Preconditions.checkArgument; import static org.glassfish.grizzly.http.HttpServerFilter.RESPONSE_COMPLETE_EVENT; import static org.glassfish.grizzly.nio.transport.TCPNIOTransport.MAX_SEND_BUFFER_SIZE; -import static org.mule.runtime.api.util.DataUnit.KB; -import static org.mule.runtime.api.util.MuleSystemProperties.SYSTEM_PROPERTY_PREFIX; -import static org.mule.runtime.api.util.MuleSystemProperties.MULE_LOG_SEPARATION_DISABLED; -import static org.mule.runtime.core.api.util.ClassUtils.setContextClassLoader; -import static org.mule.runtime.core.api.util.StringUtils.isEmpty; -import static org.mule.runtime.http.api.HttpHeaders.Names.CONTENT_LENGTH; -import static org.mule.service.http.impl.service.server.grizzly.ExecutorPerServerAddressIOStrategy.DELEGATE_WRITES_IN_CONFIGURED_EXECUTOR; import static org.slf4j.LoggerFactory.getLogger; import org.mule.runtime.api.connection.SourceRemoteConnectionException; @@ -33,6 +35,8 @@ import java.io.IOException; import java.io.InputStream; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.RejectedExecutionException; import org.glassfish.grizzly.Buffer; import org.glassfish.grizzly.Connection; @@ -62,6 +66,7 @@ public class ResponseStreamingCompletionHandler extends BaseResponseCompletionHa private static final String SELECTOR_TIMEOUT = SYSTEM_PROPERTY_PREFIX + "timeoutToUseSelectorWhileStreamingResponseMillis"; private final long selectorTimeoutNanos = MILLISECONDS.toNanos(Long.valueOf(getProperty(SELECTOR_TIMEOUT, "50"))); + private final ExecutorService workerPool; private volatile boolean isDone; private boolean alreadyFailed = false; @@ -69,7 +74,9 @@ public class ResponseStreamingCompletionHandler extends BaseResponseCompletionHa public ResponseStreamingCompletionHandler(final FilterChainContext ctx, ClassLoader ctxClassLoader, final HttpRequestPacket request, - final HttpResponse httpResponse, ResponseStatusCallback responseStatusCallback) { + final HttpResponse httpResponse, + ResponseStatusCallback responseStatusCallback, + ExecutorService workerPool) { checkArgument((httpResponse.getEntity().isStreaming()), "HTTP response entity must be stream based"); this.ctx = ctx; this.ctxClassLoader = ctxClassLoader; @@ -79,6 +86,7 @@ public ResponseStreamingCompletionHandler(final FilterChainContext ctx, bufferSize = calculateBufferSize(ctx, ctxClassLoader); this.responseStatusCallback = responseStatusCallback; this.startTimeNanos = nanoTime(); + this.workerPool = workerPool; } /** @@ -117,6 +125,25 @@ private int calculateBufferSize(FilterChainContext ctx, ClassLoader ctxClassLoad } public void start() throws IOException { + if (isSelectorTimeout()) { + markConnectionToDelegateWritesInConfiguredExecutor(true); + try { + workerPool.submit(() -> { + try { + start0(); + } catch (Exception exception) { + responseStatusCallback.onErrorSendingResponse(exception); + } + }); + } catch (RejectedExecutionException ree) { + start0(); + } + } else { + start0(); + } + } + + private void start0() throws IOException { Thread thread = null; ClassLoader currentClassLoader = null; ClassLoader newClassLoader = null; diff --git a/src/test/java/org/mule/service/http/impl/service/server/grizzly/ResponseStreamingCompletionHandlerTestCase.java b/src/test/java/org/mule/service/http/impl/service/server/grizzly/ResponseStreamingCompletionHandlerTestCase.java index 4c985f86..7d8daeac 100644 --- a/src/test/java/org/mule/service/http/impl/service/server/grizzly/ResponseStreamingCompletionHandlerTestCase.java +++ b/src/test/java/org/mule/service/http/impl/service/server/grizzly/ResponseStreamingCompletionHandlerTestCase.java @@ -6,41 +6,45 @@ */ package org.mule.service.http.impl.service.server.grizzly; +import static org.mule.runtime.http.api.HttpHeaders.Names.CONNECTION; +import static org.mule.service.http.impl.AllureConstants.HttpFeature.HTTP_SERVICE; +import static org.mule.service.http.impl.AllureConstants.HttpFeature.HttpStory.RESPONSES; +import static org.mule.tck.MuleTestUtils.testWithSystemProperty; + import static java.lang.Thread.currentThread; + import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; -import static org.junit.rules.ExpectedException.none; +import static org.junit.Assert.assertThrows; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.RETURNS_DEEP_STUBS; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -import static org.mule.runtime.http.api.HttpHeaders.Names.CONNECTION; -import static org.mule.service.http.impl.AllureConstants.HttpFeature.HTTP_SERVICE; -import static org.mule.service.http.impl.AllureConstants.HttpFeature.HttpStory.RESPONSES; - -import io.qameta.allure.Issue; -import org.glassfish.grizzly.http.ProcessingState; -import org.junit.Rule; -import org.junit.Test; import org.mule.runtime.api.exception.MuleRuntimeException; import org.mule.runtime.api.util.MultiMap; import org.mule.runtime.http.api.domain.entity.InputStreamHttpEntity; import org.mule.runtime.http.api.domain.message.response.HttpResponse; -import org.glassfish.grizzly.Transport; -import org.junit.Before; - import java.io.IOException; import java.io.InputStream; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import io.qameta.allure.Feature; +import io.qameta.allure.Issue; import io.qameta.allure.Story; -import org.junit.rules.ExpectedException; +import org.glassfish.grizzly.Transport; +import org.glassfish.grizzly.http.HttpContent; +import org.glassfish.grizzly.http.ProcessingState; +import org.junit.Before; +import org.junit.Test; @Feature(HTTP_SERVICE) @Story(RESPONSES) @@ -48,9 +52,7 @@ public class ResponseStreamingCompletionHandlerTestCase extends BaseResponseComp private ResponseStreamingCompletionHandler handler; private InputStream mockStream; - - @Rule - public ExpectedException exception = none(); + private final ExecutorService workerPool = mock(ExecutorService.class); @Before public void setUp() { @@ -62,7 +64,8 @@ public void setUp() { currentThread().getContextClassLoader(), request, responseMock, - callback); + callback, + workerPool); } @Override @@ -80,7 +83,8 @@ public void keepAliveConnection() { currentThread().getContextClassLoader(), request, responseMock, - callback); + callback, + workerPool); assertThat(handler.getHttpResponsePacket().getHeader(CONNECTION), equalTo(KEEP_ALIVE)); } @@ -94,7 +98,8 @@ public void cLoseConnection() { currentThread().getContextClassLoader(), request, responseMock, - callback); + callback, + workerPool); assertThat(getHandler().getHttpResponsePacket().getHeader(CONNECTION), equalTo(CLOSE)); } @@ -106,7 +111,8 @@ public void completionHandlerFailsIfAReadOperationThrowsAMuleRuntimeException() currentThread().getContextClassLoader(), request, responseMock, - callback)); + callback, + workerPool)); when(mockStream.read(any(), anyInt(), anyInt())).thenThrow(new MuleRuntimeException(new NullPointerException())); handler.sendInputStreamChunk(); @@ -122,11 +128,10 @@ public void IOExceptionIsRethrownIfCauseOfFailure() throws IOException { currentThread().getContextClassLoader(), request, responseMock, - callback)); - - exception.expect(IOException.class); + callback, + workerPool)); when(mockStream.read(any(), anyInt(), anyInt())).thenThrow(new MuleRuntimeException(new IOException())); - handler.sendInputStreamChunk(); + assertThrows(IOException.class, () -> handler.sendInputStreamChunk()); } @Test @@ -137,7 +142,8 @@ public void handlerDoesntThrowNPEWhenConnectionIsNull() { currentThread().getContextClassLoader(), request, responseMock, - callback); + callback, + workerPool); // When an unexpected error makes getConnection return null. when(ctx.getConnection()).thenReturn(null); // Then the failed() method is executed without throwing NPE. @@ -152,7 +158,8 @@ public void failedMethodBehaviorIsExecutedOnlyOnceForTheSameHandler() throws IOE currentThread().getContextClassLoader(), request, responseMock, - callback); + callback, + workerPool); // When the failed() method is invoked several times handler.failed(createExpectedException()); @@ -164,6 +171,39 @@ public void failedMethodBehaviorIsExecutedOnlyOnceForTheSameHandler() throws IOE verify(callback, times(1)).onErrorSendingResponse(any(Exception.class)); } + @Test + public void whenTheTimeoutIsElapsedThenTheStartIsExecutedInTheWorkerScheduler() throws Exception { + responseMock = HttpResponse.builder().entity(new InputStreamHttpEntity(mockStream)).build(); + testWithSystemProperty("mule.timeoutToUseSelectorWhileStreamingResponseMillis", "0", () -> { + handler = new ResponseStreamingCompletionHandler(ctx, + currentThread().getContextClassLoader(), + request, + responseMock, + callback, + workerPool); + + handler.start(); + verify(workerPool, times(1)).submit(any(Runnable.class)); + }); + } + + @Test + public void whenTheTimeoutIsNotElapsedThenTheStartIsNotExecutedInTheWorkerScheduler() throws Exception { + responseMock = HttpResponse.builder().entity(new InputStreamHttpEntity(mockStream)).build(); + testWithSystemProperty("mule.timeoutToUseSelectorWhileStreamingResponseMillis", "100000", () -> { + handler = new ResponseStreamingCompletionHandler(ctx, + currentThread().getContextClassLoader(), + request, + responseMock, + callback, + workerPool); + + handler.start(); + verify(workerPool, never()).submit(any(Runnable.class)); + verify(ctx, times(1)).write(any(HttpContent.class), eq(handler)); + }); + } + private Exception createExpectedException() { return new Exception("EXPECTED EXCEPTION"); } From ba6e28d2a7c227c16a65cd5a270c97c4ea1091c8 Mon Sep 17 00:00:00 2001 From: Ezequiel Werner Date: Wed, 2 Oct 2024 17:12:51 -0300 Subject: [PATCH 2/2] allure --- .../grizzly/ResponseStreamingCompletionHandlerTestCase.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/test/java/org/mule/service/http/impl/service/server/grizzly/ResponseStreamingCompletionHandlerTestCase.java b/src/test/java/org/mule/service/http/impl/service/server/grizzly/ResponseStreamingCompletionHandlerTestCase.java index 7d8daeac..6a54cf17 100644 --- a/src/test/java/org/mule/service/http/impl/service/server/grizzly/ResponseStreamingCompletionHandlerTestCase.java +++ b/src/test/java/org/mule/service/http/impl/service/server/grizzly/ResponseStreamingCompletionHandlerTestCase.java @@ -172,6 +172,7 @@ public void failedMethodBehaviorIsExecutedOnlyOnceForTheSameHandler() throws IOE } @Test + @Issue("W-16892585") public void whenTheTimeoutIsElapsedThenTheStartIsExecutedInTheWorkerScheduler() throws Exception { responseMock = HttpResponse.builder().entity(new InputStreamHttpEntity(mockStream)).build(); testWithSystemProperty("mule.timeoutToUseSelectorWhileStreamingResponseMillis", "0", () -> { @@ -188,6 +189,7 @@ public void whenTheTimeoutIsElapsedThenTheStartIsExecutedInTheWorkerScheduler() } @Test + @Issue("W-16892585") public void whenTheTimeoutIsNotElapsedThenTheStartIsNotExecutedInTheWorkerScheduler() throws Exception { responseMock = HttpResponse.builder().entity(new InputStreamHttpEntity(mockStream)).build(); testWithSystemProperty("mule.timeoutToUseSelectorWhileStreamingResponseMillis", "100000", () -> {