From c0af1ebb8f01fbe803e65f3492b3eaebfd41d03b Mon Sep 17 00:00:00 2001 From: "Ernst Michael (INST/ECS2)" Date: Wed, 5 Jul 2017 13:40:46 +0200 Subject: [PATCH 1/4] 1412 Wait for async close tasks before close completes --- .../asynchttpclient/AsyncHttpClientState.java | 2 +- .../DefaultAsyncHttpClient.java | 16 +++++++++++-- .../netty/channel/ChannelManager.java | 23 ++++++++++++++++++- .../netty/request/NettyChannelConnector.java | 2 +- .../netty/request/NettyRequestSender.java | 2 +- 5 files changed, 39 insertions(+), 6 deletions(-) diff --git a/client/src/main/java/org/asynchttpclient/AsyncHttpClientState.java b/client/src/main/java/org/asynchttpclient/AsyncHttpClientState.java index b2570056f5..b1fa0d5334 100644 --- a/client/src/main/java/org/asynchttpclient/AsyncHttpClientState.java +++ b/client/src/main/java/org/asynchttpclient/AsyncHttpClientState.java @@ -23,7 +23,7 @@ public AsyncHttpClientState(AtomicBoolean closed) { this.closed = closed; } - public boolean isClosed() { + public boolean isClosedOrClosingIsTriggered() { return closed.get(); } } diff --git a/client/src/main/java/org/asynchttpclient/DefaultAsyncHttpClient.java b/client/src/main/java/org/asynchttpclient/DefaultAsyncHttpClient.java index bded469db2..46fec4d351 100644 --- a/client/src/main/java/org/asynchttpclient/DefaultAsyncHttpClient.java +++ b/client/src/main/java/org/asynchttpclient/DefaultAsyncHttpClient.java @@ -19,8 +19,10 @@ import static org.asynchttpclient.util.Assertions.assertNotNull; import io.netty.channel.EventLoopGroup; import io.netty.util.HashedWheelTimer; +import io.netty.util.ThreadDeathWatcher; import io.netty.util.Timer; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Predicate; @@ -42,6 +44,7 @@ public class DefaultAsyncHttpClient implements AsyncHttpClient { private final static Logger LOGGER = LoggerFactory.getLogger(DefaultAsyncHttpClient.class); private final AsyncHttpClientConfig config; + private final AtomicBoolean closeTriggered = new AtomicBoolean(false); private final AtomicBoolean closed = new AtomicBoolean(false); private final ChannelManager channelManager; private final ConnectionSemaphore connectionSemaphore; @@ -87,7 +90,7 @@ public DefaultAsyncHttpClient(AsyncHttpClientConfig config) { channelManager = new ChannelManager(config, nettyTimer); connectionSemaphore = new ConnectionSemaphore(config); - requestSender = new NettyRequestSender(config, channelManager, connectionSemaphore, nettyTimer, new AsyncHttpClientState(closed)); + requestSender = new NettyRequestSender(config, channelManager, connectionSemaphore, nettyTimer, new AsyncHttpClientState(closeTriggered)); channelManager.configureBootstraps(requestSender); } @@ -99,7 +102,7 @@ private Timer newNettyTimer() { @Override public void close() { - if (closed.compareAndSet(false, true)) { + if (closeTriggered.compareAndSet(false, true)) { try { channelManager.close(); } catch (Throwable t) { @@ -112,6 +115,15 @@ public void close() { LOGGER.warn("Unexpected error on HashedWheelTimer close", t); } } + + //see https://github.com/netty/netty/issues/2084#issuecomment-44822314 + try { + ThreadDeathWatcher.awaitInactivity(5, TimeUnit.SECONDS); + } catch(InterruptedException t) { + // Ignore + } + + closed.compareAndSet(false, true); } } diff --git a/client/src/main/java/org/asynchttpclient/netty/channel/ChannelManager.java b/client/src/main/java/org/asynchttpclient/netty/channel/ChannelManager.java index 99a9bb4699..5a7736d054 100755 --- a/client/src/main/java/org/asynchttpclient/netty/channel/ChannelManager.java +++ b/client/src/main/java/org/asynchttpclient/netty/channel/ChannelManager.java @@ -43,6 +43,7 @@ import java.net.InetSocketAddress; import java.util.Map; import java.util.Map.Entry; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.function.Function; @@ -99,11 +100,15 @@ public class ChannelManager { private final ChannelPool channelPool; private final ChannelGroup openChannels; + private final CountDownLatch closeLatch; + private AsyncHttpClientHandler wsHandler; public ChannelManager(final AsyncHttpClientConfig config, Timer nettyTimer) { this.config = config; + + closeLatch = new CountDownLatch(2); this.sslEngineFactory = config.getSslEngineFactory() != null ? config.getSslEngineFactory() : new DefaultSslEngineFactory(); try { @@ -300,8 +305,17 @@ public boolean removeAll(Channel connection) { } private void doClose() { - openChannels.close(); + openChannels.close().addListener(future -> closeLatch.countDown()); channelPool.destroy(); + + //see https://github.com/netty/netty/issues/2084#issuecomment-44822314 + try { + GlobalEventExecutor.INSTANCE.awaitInactivity(5, TimeUnit.SECONDS); + } catch(InterruptedException t) { + // Ignore + } + + closeLatch.countDown(); } public void close() { @@ -310,6 +324,13 @@ public void close() { .addListener(future -> doClose()); } else doClose(); + + try { + closeLatch.await(); + } + catch (InterruptedException e) { + // Ignore + } } public void closeChannel(Channel channel) { diff --git a/client/src/main/java/org/asynchttpclient/netty/request/NettyChannelConnector.java b/client/src/main/java/org/asynchttpclient/netty/request/NettyChannelConnector.java index 1bccecec42..89434578ea 100644 --- a/client/src/main/java/org/asynchttpclient/netty/request/NettyChannelConnector.java +++ b/client/src/main/java/org/asynchttpclient/netty/request/NettyChannelConnector.java @@ -72,7 +72,7 @@ public void connect(final Bootstrap bootstrap, final NettyConnectListener con try { connect0(bootstrap, connectListener, remoteAddress); } catch (RejectedExecutionException e) { - if (clientState.isClosed()) { + if (clientState.isClosedOrClosingIsTriggered()) { LOGGER.info("Connect crash but engine is shutting down"); } else { connectListener.onFailure(null, e); diff --git a/client/src/main/java/org/asynchttpclient/netty/request/NettyRequestSender.java b/client/src/main/java/org/asynchttpclient/netty/request/NettyRequestSender.java index 6e3e0e0f62..f12611710b 100755 --- a/client/src/main/java/org/asynchttpclient/netty/request/NettyRequestSender.java +++ b/client/src/main/java/org/asynchttpclient/netty/request/NettyRequestSender.java @@ -597,7 +597,7 @@ public void replayRequest(final NettyResponseFuture future, FilterContext fc, } public boolean isClosed() { - return clientState.isClosed(); + return clientState.isClosedOrClosingIsTriggered(); } public void drainChannelAndExecuteNextRequest(final Channel channel, final NettyResponseFuture future, Request nextRequest) { From 88ce96d97073f856c56c83dbdde3545ef75a7dcb Mon Sep 17 00:00:00 2001 From: "Ernst Michael (INST/ECS2)" Date: Wed, 5 Jul 2017 15:42:48 +0200 Subject: [PATCH 2/4] 1412 reused timeouts for waits and covered corner cases for await, renamed state property --- .../java/org/asynchttpclient/AsyncHttpClientState.java | 10 +++++----- .../org/asynchttpclient/DefaultAsyncHttpClient.java | 2 +- .../asynchttpclient/netty/channel/ChannelManager.java | 8 ++++---- .../netty/request/NettyChannelConnector.java | 2 +- .../netty/request/NettyRequestSender.java | 2 +- 5 files changed, 12 insertions(+), 12 deletions(-) diff --git a/client/src/main/java/org/asynchttpclient/AsyncHttpClientState.java b/client/src/main/java/org/asynchttpclient/AsyncHttpClientState.java index b1fa0d5334..dd09ca386a 100644 --- a/client/src/main/java/org/asynchttpclient/AsyncHttpClientState.java +++ b/client/src/main/java/org/asynchttpclient/AsyncHttpClientState.java @@ -17,13 +17,13 @@ public class AsyncHttpClientState { - private final AtomicBoolean closed; + private final AtomicBoolean closeTriggered; - public AsyncHttpClientState(AtomicBoolean closed) { - this.closed = closed; + public AsyncHttpClientState(AtomicBoolean closeTriggered) { + this.closeTriggered = closeTriggered; } - public boolean isClosedOrClosingIsTriggered() { - return closed.get(); + public boolean isCloseTriggered() { + return closeTriggered.get(); } } diff --git a/client/src/main/java/org/asynchttpclient/DefaultAsyncHttpClient.java b/client/src/main/java/org/asynchttpclient/DefaultAsyncHttpClient.java index 46fec4d351..c81768c466 100644 --- a/client/src/main/java/org/asynchttpclient/DefaultAsyncHttpClient.java +++ b/client/src/main/java/org/asynchttpclient/DefaultAsyncHttpClient.java @@ -118,7 +118,7 @@ public void close() { //see https://github.com/netty/netty/issues/2084#issuecomment-44822314 try { - ThreadDeathWatcher.awaitInactivity(5, TimeUnit.SECONDS); + ThreadDeathWatcher.awaitInactivity(config.getShutdownTimeout(), TimeUnit.MILLISECONDS); } catch(InterruptedException t) { // Ignore } diff --git a/client/src/main/java/org/asynchttpclient/netty/channel/ChannelManager.java b/client/src/main/java/org/asynchttpclient/netty/channel/ChannelManager.java index 5a7736d054..e8caa14f94 100755 --- a/client/src/main/java/org/asynchttpclient/netty/channel/ChannelManager.java +++ b/client/src/main/java/org/asynchttpclient/netty/channel/ChannelManager.java @@ -310,12 +310,12 @@ private void doClose() { //see https://github.com/netty/netty/issues/2084#issuecomment-44822314 try { - GlobalEventExecutor.INSTANCE.awaitInactivity(5, TimeUnit.SECONDS); + GlobalEventExecutor.INSTANCE.awaitInactivity(config.getShutdownTimeout(), TimeUnit.MILLISECONDS); } catch(InterruptedException t) { // Ignore + } finally { + closeLatch.countDown(); } - - closeLatch.countDown(); } public void close() { @@ -326,7 +326,7 @@ public void close() { doClose(); try { - closeLatch.await(); + closeLatch.await(config.getShutdownTimeout(), TimeUnit.MILLISECONDS); } catch (InterruptedException e) { // Ignore diff --git a/client/src/main/java/org/asynchttpclient/netty/request/NettyChannelConnector.java b/client/src/main/java/org/asynchttpclient/netty/request/NettyChannelConnector.java index 89434578ea..8ec3f78725 100644 --- a/client/src/main/java/org/asynchttpclient/netty/request/NettyChannelConnector.java +++ b/client/src/main/java/org/asynchttpclient/netty/request/NettyChannelConnector.java @@ -72,7 +72,7 @@ public void connect(final Bootstrap bootstrap, final NettyConnectListener con try { connect0(bootstrap, connectListener, remoteAddress); } catch (RejectedExecutionException e) { - if (clientState.isClosedOrClosingIsTriggered()) { + if (clientState.isCloseTriggered()) { LOGGER.info("Connect crash but engine is shutting down"); } else { connectListener.onFailure(null, e); diff --git a/client/src/main/java/org/asynchttpclient/netty/request/NettyRequestSender.java b/client/src/main/java/org/asynchttpclient/netty/request/NettyRequestSender.java index f12611710b..7241c3f008 100755 --- a/client/src/main/java/org/asynchttpclient/netty/request/NettyRequestSender.java +++ b/client/src/main/java/org/asynchttpclient/netty/request/NettyRequestSender.java @@ -597,7 +597,7 @@ public void replayRequest(final NettyResponseFuture future, FilterContext fc, } public boolean isClosed() { - return clientState.isClosedOrClosingIsTriggered(); + return clientState.isCloseTriggered(); } public void drainChannelAndExecuteNextRequest(final Channel channel, final NettyResponseFuture future, Request nextRequest) { From 3bb5ae475706a305ea84e32b9e7afafbac0b8ef9 Mon Sep 17 00:00:00 2001 From: "Ernst Michael (INST/ECS2)" Date: Thu, 6 Jul 2017 10:50:16 +0200 Subject: [PATCH 3/4] 1412 fixed compile error after merge --- .../org/asynchttpclient/netty/channel/ChannelManager.java | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/client/src/main/java/org/asynchttpclient/netty/channel/ChannelManager.java b/client/src/main/java/org/asynchttpclient/netty/channel/ChannelManager.java index c9df93a10e..7f93988b59 100755 --- a/client/src/main/java/org/asynchttpclient/netty/channel/ChannelManager.java +++ b/client/src/main/java/org/asynchttpclient/netty/channel/ChannelManager.java @@ -322,13 +322,12 @@ public void close() { if (allowReleaseEventLoopGroup) { eventLoopGroup.shutdownGracefully(config.getShutdownQuietPeriod(), config.getShutdownTimeout(), TimeUnit.MILLISECONDS)// .addListener(future -> doClose()); - } else { + } else doClose(); try { closeLatch.await(config.getShutdownTimeout(), TimeUnit.MILLISECONDS); - } - catch (InterruptedException e) { + } catch (InterruptedException e) { // Ignore } } From 7026fc237167e245018092cfab4c44d68f569898 Mon Sep 17 00:00:00 2001 From: Michael Ernst Date: Tue, 26 Sep 2017 10:52:57 +0200 Subject: [PATCH 4/4] ahc-1412 added a new close method to wait on shared netty resources --- .../org/asynchttpclient/AsyncHttpClient.java | 8 +++ .../DefaultAsyncHttpClient.java | 65 ++++++++++++++----- .../netty/channel/ChannelManager.java | 48 ++++++-------- .../extras/registry/BadAsyncHttpClient.java | 5 ++ .../extras/registry/TestAsyncHttpClient.java | 4 ++ 5 files changed, 85 insertions(+), 45 deletions(-) diff --git a/client/src/main/java/org/asynchttpclient/AsyncHttpClient.java b/client/src/main/java/org/asynchttpclient/AsyncHttpClient.java index 7d1c0c6506..a9411621f4 100755 --- a/client/src/main/java/org/asynchttpclient/AsyncHttpClient.java +++ b/client/src/main/java/org/asynchttpclient/AsyncHttpClient.java @@ -287,4 +287,12 @@ public interface AsyncHttpClient extends Closeable { * @return the config associated to this client. */ AsyncHttpClientConfig getConfig(); + + /** + * Similar to calling the method {@link #close()} but additionally waits for inactivity on shared resources between + * multiple instances of netty. Calling this method instead of the method {@link #close()} might be helpful + * on application shutdown to prevent errors like a {@link ClassNotFoundException} because the class loader was + * already removed but there are still some active tasks on this shared resources which want to access these classes. + */ + void closeAndAwaitInactivity(); } diff --git a/client/src/main/java/org/asynchttpclient/DefaultAsyncHttpClient.java b/client/src/main/java/org/asynchttpclient/DefaultAsyncHttpClient.java index c81768c466..fe26f4ab58 100644 --- a/client/src/main/java/org/asynchttpclient/DefaultAsyncHttpClient.java +++ b/client/src/main/java/org/asynchttpclient/DefaultAsyncHttpClient.java @@ -21,8 +21,12 @@ import io.netty.util.HashedWheelTimer; import io.netty.util.ThreadDeathWatcher; import io.netty.util.Timer; +import io.netty.util.concurrent.GlobalEventExecutor; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Predicate; @@ -102,31 +106,60 @@ private Timer newNettyTimer() { @Override public void close() { + closeInternal(false); + } + + public void closeAndAwaitInactivity() { + closeInternal(true); + } + + private void closeInternal(boolean awaitInactivity) { if (closeTriggered.compareAndSet(false, true)) { - try { - channelManager.close(); - } catch (Throwable t) { - LOGGER.warn("Unexpected error on ChannelManager close", t); - } - if (allowStopNettyTimer) { - try { - nettyTimer.stop(); - } catch (Throwable t) { - LOGGER.warn("Unexpected error on HashedWheelTimer close", t); + CompletableFuture handledCloseFuture = channelManager.close().whenComplete((v, t) -> { + if(t != null) { + LOGGER.warn("Unexpected error on ChannelManager close", t); + } + if (allowStopNettyTimer) { + try { + nettyTimer.stop(); + } catch (Throwable th) { + LOGGER.warn("Unexpected error on HashedWheelTimer close", th); + } } + }); + + if(awaitInactivity) { + handledCloseFuture = handledCloseFuture.thenCombine(awaitInactivity(), (v1,v2) -> null) ; } - - //see https://github.com/netty/netty/issues/2084#issuecomment-44822314 + try { - ThreadDeathWatcher.awaitInactivity(config.getShutdownTimeout(), TimeUnit.MILLISECONDS); - } catch(InterruptedException t) { - // Ignore + handledCloseFuture.get(config.getShutdownTimeout(), TimeUnit.MILLISECONDS); + } catch (InterruptedException | TimeoutException t) { + LOGGER.warn("Unexpected error on AsyncHttpClient close", t); + } catch (ExecutionException e) { + // already handled and could be ignored } - closed.compareAndSet(false, true); } } + private CompletableFuture awaitInactivity() { + //see https://github.com/netty/netty/issues/2084#issuecomment-44822314 + CompletableFuture wait1 = CompletableFuture.runAsync(() -> { + try { + GlobalEventExecutor.INSTANCE.awaitInactivity(config.getShutdownTimeout(), TimeUnit.MILLISECONDS); + } catch(InterruptedException t) { + // Ignore + }}); + CompletableFuture wait2 = CompletableFuture.runAsync(() -> { + try { + ThreadDeathWatcher.awaitInactivity(config.getShutdownTimeout(), TimeUnit.MILLISECONDS); + } catch(InterruptedException t) { + // Ignore + }}); + return wait1.thenCombine(wait2, (v1,v2) -> null); + } + @Override public boolean isClosed() { return closed.get(); diff --git a/client/src/main/java/org/asynchttpclient/netty/channel/ChannelManager.java b/client/src/main/java/org/asynchttpclient/netty/channel/ChannelManager.java index 7f93988b59..5150169388 100755 --- a/client/src/main/java/org/asynchttpclient/netty/channel/ChannelManager.java +++ b/client/src/main/java/org/asynchttpclient/netty/channel/ChannelManager.java @@ -38,12 +38,13 @@ import io.netty.handler.stream.ChunkedWriteHandler; import io.netty.util.Timer; import io.netty.util.concurrent.DefaultThreadFactory; +import io.netty.util.concurrent.Future; import io.netty.util.concurrent.GlobalEventExecutor; import java.net.InetSocketAddress; import java.util.Map; import java.util.Map.Entry; -import java.util.concurrent.CountDownLatch; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.function.Function; @@ -100,15 +101,11 @@ public class ChannelManager { private final ChannelPool channelPool; private final ChannelGroup openChannels; - private final CountDownLatch closeLatch; - private AsyncHttpClientHandler wsHandler; public ChannelManager(final AsyncHttpClientConfig config, Timer nettyTimer) { this.config = config; - - closeLatch = new CountDownLatch(2); this.sslEngineFactory = config.getSslEngineFactory() != null ? config.getSslEngineFactory() : new DefaultSslEngineFactory(); try { @@ -304,32 +301,25 @@ public boolean removeAll(Channel connection) { return channelPool.removeAll(connection); } - private void doClose() { - openChannels.close().addListener(future -> closeLatch.countDown()); - channelPool.destroy(); - - //see https://github.com/netty/netty/issues/2084#issuecomment-44822314 - try { - GlobalEventExecutor.INSTANCE.awaitInactivity(config.getShutdownTimeout(), TimeUnit.MILLISECONDS); - } catch(InterruptedException t) { - // Ignore - } finally { - closeLatch.countDown(); - } - } - - public void close() { + public CompletableFuture close() { + CompletableFuture closeFuture = CompletableFuture.completedFuture(null); if (allowReleaseEventLoopGroup) { - eventLoopGroup.shutdownGracefully(config.getShutdownQuietPeriod(), config.getShutdownTimeout(), TimeUnit.MILLISECONDS)// - .addListener(future -> doClose()); - } else - doClose(); - - try { - closeLatch.await(config.getShutdownTimeout(), TimeUnit.MILLISECONDS); - } catch (InterruptedException e) { - // Ignore + closeFuture = toCompletableFuture( + eventLoopGroup.shutdownGracefully(config.getShutdownQuietPeriod(), config.getShutdownTimeout(), TimeUnit.MILLISECONDS)); } + return closeFuture.thenCompose(v -> toCompletableFuture(openChannels.close())).whenComplete((v,t) -> channelPool.destroy()); + } + + private static CompletableFuture toCompletableFuture(final Future future) { + final CompletableFuture completableFuture = new CompletableFuture<>(); + future.addListener(r -> { + if(r.isSuccess()) { + completableFuture.complete(null); + } else { + completableFuture.completeExceptionally(r.cause()); + } + }); + return completableFuture; } public void closeChannel(Channel channel) { diff --git a/extras/registry/src/test/java/org/asynchttpclient/extras/registry/BadAsyncHttpClient.java b/extras/registry/src/test/java/org/asynchttpclient/extras/registry/BadAsyncHttpClient.java index 713887c98d..8891e7ffae 100644 --- a/extras/registry/src/test/java/org/asynchttpclient/extras/registry/BadAsyncHttpClient.java +++ b/extras/registry/src/test/java/org/asynchttpclient/extras/registry/BadAsyncHttpClient.java @@ -143,4 +143,9 @@ public void flushChannelPoolPartitions(Predicate predicate) { public AsyncHttpClientConfig getConfig() { return null; } + + @Override + public void closeAndAwaitInactivity() { + + } } diff --git a/extras/registry/src/test/java/org/asynchttpclient/extras/registry/TestAsyncHttpClient.java b/extras/registry/src/test/java/org/asynchttpclient/extras/registry/TestAsyncHttpClient.java index 0e61c109fd..275ac2f3fc 100644 --- a/extras/registry/src/test/java/org/asynchttpclient/extras/registry/TestAsyncHttpClient.java +++ b/extras/registry/src/test/java/org/asynchttpclient/extras/registry/TestAsyncHttpClient.java @@ -139,4 +139,8 @@ public void flushChannelPoolPartitions(Predicate predicate) { public AsyncHttpClientConfig getConfig() { return null; } + + @Override + public void closeAndAwaitInactivity() { + } }