diff --git a/httpclient-core/src/main/java/esa/httpclient/core/exception/ClosedConnectionException.java b/httpclient-core/src/main/java/esa/httpclient/core/exception/ClosedConnectionException.java index fcd11824..64fbfa95 100644 --- a/httpclient-core/src/main/java/esa/httpclient/core/exception/ClosedConnectionException.java +++ b/httpclient-core/src/main/java/esa/httpclient/core/exception/ClosedConnectionException.java @@ -19,9 +19,6 @@ public class ClosedConnectionException extends IOException { - public static final ClosedConnectionException INSTANCE = new ClosedConnectionException( - "Connection is inactive"); - private static final long serialVersionUID = -7491330351921922628L; public ClosedConnectionException(String msg) { diff --git a/httpclient-core/src/main/java/esa/httpclient/core/exception/ClosedStreamException.java b/httpclient-core/src/main/java/esa/httpclient/core/exception/ClosedStreamException.java new file mode 100644 index 00000000..23ddd515 --- /dev/null +++ b/httpclient-core/src/main/java/esa/httpclient/core/exception/ClosedStreamException.java @@ -0,0 +1,34 @@ +/* + * Copyright 2021 OPPO ESA Stack Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package esa.httpclient.core.exception; + +import java.io.IOException; + +public class ClosedStreamException extends IOException { + + public static final ClosedStreamException CAUSED_BY_RST = + new ClosedStreamException("Received reset stream and current stream will be discarded"); + + public static final ClosedStreamException CAUSED_BY_REMOVED = + new ClosedStreamException("Stream has been removed"); + + private static final long serialVersionUID = -7491330351921922628L; + + public ClosedStreamException(String msg) { + super(msg); + } +} + diff --git a/httpclient-core/src/main/java/esa/httpclient/core/exec/RetryPredicateImpl.java b/httpclient-core/src/main/java/esa/httpclient/core/exec/RetryPredicateImpl.java index 296022c5..709c77c6 100644 --- a/httpclient-core/src/main/java/esa/httpclient/core/exec/RetryPredicateImpl.java +++ b/httpclient-core/src/main/java/esa/httpclient/core/exec/RetryPredicateImpl.java @@ -19,6 +19,7 @@ import esa.httpclient.core.HttpRequest; import esa.httpclient.core.HttpResponse; import esa.httpclient.core.exception.ClosedConnectionException; +import esa.httpclient.core.exception.ClosedStreamException; import esa.httpclient.core.util.Futures; import java.net.ConnectException; @@ -40,7 +41,9 @@ public boolean canRetry(HttpRequest request, } final Throwable unwrapped = Futures.unwrapped(cause); - if (unwrapped instanceof ConnectException || unwrapped instanceof ClosedConnectionException) { + if (unwrapped instanceof ConnectException + || unwrapped instanceof ClosedConnectionException + || unwrapped instanceof ClosedStreamException) { return true; } diff --git a/httpclient-core/src/main/java/esa/httpclient/core/netty/Http2FrameHandler.java b/httpclient-core/src/main/java/esa/httpclient/core/netty/Http2FrameHandler.java index 79a708fa..a98122f0 100644 --- a/httpclient-core/src/main/java/esa/httpclient/core/netty/Http2FrameHandler.java +++ b/httpclient-core/src/main/java/esa/httpclient/core/netty/Http2FrameHandler.java @@ -19,7 +19,7 @@ import esa.commons.netty.core.BufferImpl; import esa.commons.netty.http.Http2HeadersAdaptor; import esa.httpclient.core.Context; -import esa.httpclient.core.exception.ClosedConnectionException; +import esa.httpclient.core.exception.ClosedStreamException; import esa.httpclient.core.exception.ContentOverSizedException; import esa.httpclient.core.util.HttpHeadersUtils; import esa.httpclient.core.util.LoggerUtils; @@ -30,7 +30,6 @@ import io.netty.handler.codec.http.HttpStatusClass; import io.netty.handler.codec.http2.Http2CodecUtil; import io.netty.handler.codec.http2.Http2Connection; -import io.netty.handler.codec.http2.Http2Error; import io.netty.handler.codec.http2.Http2EventAdapter; import io.netty.handler.codec.http2.Http2Exception; import io.netty.handler.codec.http2.Http2Headers; @@ -155,8 +154,11 @@ public void onRstStreamRead(ChannelHandlerContext ctx, long errorCode) { final Http2Stream stream = connection.stream(streamId); if (stream != null) { - onError(Http2Exception.streamError(streamId, Http2Error.valueOf(errorCode), - "Received reset stream"), stream, streamId, true); + final Throwable ex = ClosedStreamException.CAUSED_BY_RST; + if (LoggerUtils.logger().isDebugEnabled()) { + LoggerUtils.logger().debug(ex.getMessage()); + } + onError(ex, stream, streamId, false); } } @@ -165,12 +167,12 @@ public void onGoAwayRead(ChannelHandlerContext ctx, int lastStreamId, long errorCode, ByteBuf debugData) throws Http2Exception { final String errMsg = debugData.toString(StandardCharsets.UTF_8); - final ClosedConnectionException ex; + final ClosedStreamException ex; if (NO_ERROR.code() == errorCode) { - ex = new ClosedConnectionException("Received goAway stream in connection: " + ctx.channel() + ex = new ClosedStreamException("Received goAway stream in connection: " + ctx.channel() + ", maybe server has closed the connection"); } else { - ex = new ClosedConnectionException("Received goAway stream in connection: " + + ex = new ClosedStreamException("Received goAway stream in connection: " + ctx.channel() + ", msg: " + errMsg); } @@ -221,8 +223,8 @@ public void onStreamRemoved(Http2Stream stream) { try { super.onStreamRemoved(stream); } finally { - onError(Http2Exception.streamError(stream.id(), Http2Error.STREAM_CLOSED, "Stream removed"), - stream, -1, false); + final Throwable ex = ClosedStreamException.CAUSED_BY_REMOVED; + onError(ex, stream, -1, false); } } diff --git a/httpclient-core/src/main/java/esa/httpclient/core/netty/NettyHandle.java b/httpclient-core/src/main/java/esa/httpclient/core/netty/NettyHandle.java index 6bfacc68..a6129970 100644 --- a/httpclient-core/src/main/java/esa/httpclient/core/netty/NettyHandle.java +++ b/httpclient-core/src/main/java/esa/httpclient/core/netty/NettyHandle.java @@ -139,7 +139,7 @@ private void onError0(Throwable cause) { listener.onError(request, ctx, cause); } catch (Throwable ex) { - LoggerUtils.logger().warn("Unexpected exception occurred on handle#onError0", cause); + LoggerUtils.logger().error("Unexpected exception occurred on handle#onError0", cause); } } diff --git a/httpclient-core/src/test/java/esa/httpclient/core/exec/RetryPredicateImplTest.java b/httpclient-core/src/test/java/esa/httpclient/core/exec/RetryPredicateImplTest.java index c87722e9..bd9b6dde 100644 --- a/httpclient-core/src/test/java/esa/httpclient/core/exec/RetryPredicateImplTest.java +++ b/httpclient-core/src/test/java/esa/httpclient/core/exec/RetryPredicateImplTest.java @@ -19,6 +19,7 @@ import esa.httpclient.core.HttpRequest; import esa.httpclient.core.HttpResponse; import esa.httpclient.core.exception.ClosedConnectionException; +import esa.httpclient.core.exception.ClosedStreamException; import org.junit.jupiter.api.Test; import java.net.ConnectException; @@ -36,7 +37,9 @@ void testCanRetry() { then(predicate.canRetry(mock(HttpRequest.class), mock(HttpResponse.class), mock(Context.class), new ConnectException())).isTrue(); then(predicate.canRetry(mock(HttpRequest.class), mock(HttpResponse.class), mock(Context.class), - ClosedConnectionException.INSTANCE)).isTrue(); + new ClosedConnectionException(""))).isTrue(); + then(predicate.canRetry(mock(HttpRequest.class), mock(HttpResponse.class), mock(Context.class), + new ClosedStreamException(""))).isTrue(); then(predicate.canRetry(mock(HttpRequest.class), mock(HttpResponse.class), mock(Context.class), new RuntimeException())).isFalse(); } diff --git a/httpclient-core/src/test/java/esa/httpclient/core/netty/Http2FrameHandlerTest.java b/httpclient-core/src/test/java/esa/httpclient/core/netty/Http2FrameHandlerTest.java index 5ac068a8..3b7ee237 100644 --- a/httpclient-core/src/test/java/esa/httpclient/core/netty/Http2FrameHandlerTest.java +++ b/httpclient-core/src/test/java/esa/httpclient/core/netty/Http2FrameHandlerTest.java @@ -17,14 +17,13 @@ import esa.commons.http.HttpHeaderNames; import esa.commons.http.HttpVersion; -import esa.commons.netty.core.Buffer; import esa.httpclient.core.Context; import esa.httpclient.core.HttpClient; import esa.httpclient.core.HttpRequest; import esa.httpclient.core.HttpResponse; import esa.httpclient.core.Listener; import esa.httpclient.core.NoopListener; -import esa.httpclient.core.exception.ClosedConnectionException; +import esa.httpclient.core.exception.ClosedStreamException; import esa.httpclient.core.exception.ContentOverSizedException; import esa.httpclient.core.util.Futures; import io.netty.buffer.ByteBufAllocator; @@ -43,7 +42,6 @@ import io.netty.handler.codec.http2.Http2ConnectionDecoder; import io.netty.handler.codec.http2.Http2ConnectionEncoder; import io.netty.handler.codec.http2.Http2Error; -import io.netty.handler.codec.http2.Http2Exception; import io.netty.handler.codec.http2.Http2FrameReader; import io.netty.handler.codec.http2.Http2FrameWriter; import io.netty.handler.codec.http2.Http2Headers; @@ -572,7 +570,7 @@ void testOnRstStreamRead() { frameInboundWriter.writeInboundRstStream(requestId, Http2Error.INTERNAL_ERROR.code()); then(response.isDone() && response.isCompletedExceptionally()).isTrue(); - then(Futures.getCause(response)).isInstanceOf(Http2Exception.class); + then(Futures.getCause(response)).isInstanceOf(ClosedStreamException.class); channel.flush(); then(registry.get(requestId)).isNull(); @@ -635,7 +633,7 @@ void testOnStreamRemoved() { new DefaultHttp2Headers(), 0, true)); then(response.isDone() && response.isCompletedExceptionally()).isTrue(); - then(Futures.getCause(response)).isInstanceOf(Http2Exception.class); + then(Futures.getCause(response)).isInstanceOf(ClosedStreamException.class); channel.flush(); then(registry.get(requestId)).isNull(); @@ -668,216 +666,9 @@ void testOnGoAwayRead() { channel.flushInbound(); then(response.isDone() && response.isCompletedExceptionally()).isTrue(); - then(Futures.getCause(response)).isInstanceOf(ClosedConnectionException.class); + then(Futures.getCause(response)).isInstanceOf(ClosedStreamException.class); then(registry.get(requestId)).isNull(); channel.finishAndReleaseAll(); } - - @Test - void testErrorWhileHandlingOnMessage() { - final HandleRegistry registry = new HandleRegistry(2, 0); - final long maxContentLength = -1L; - - setUp(registry, maxContentLength); - - final HttpRequest request = client.get("/abc"); - final Context ctx = new Context(); - final Listener listener = new NoopListener(); - final CompletableFuture response = new CompletableFuture<>(); - - final NettyHandle handle = new NettyHandle(new DefaultHandle(ByteBufAllocator.DEFAULT), - request, ctx, listener, response) { - @Override - public void onMessage(esa.httpclient.core.HttpMessage message) { - throw new IllegalArgumentException(); - } - }; - - final int requestId = registry.put(handle); - - final Http2Headers headers = new DefaultHttp2Headers(); - headers.add("a", "b"); - headers.add("x", "y"); - headers.status(HttpResponseStatus.BAD_REQUEST.codeAsText()); - - frameInboundWriter.writeInboundHeaders(requestId, headers, 0, true); - - then(response.isDone()).isTrue(); - then(registry.get(requestId)).isNull(); - then(response.isDone()).isTrue(); - then(response.isCompletedExceptionally()).isTrue(); - then(Futures.getCause(response)).isInstanceOf(Http2Exception.StreamException.class); - - then(registry.get(requestId)).isNull(); - channel.finishAndReleaseAll(); - } - - @Test - void testErrorWhileHandlingOnData() { - final HandleRegistry registry = new HandleRegistry(2, 0); - final long maxContentLength = -1L; - - setUp(registry, maxContentLength); - - final HttpRequest request = client.get("/abc"); - final Context ctx = new Context(); - final Listener listener = new NoopListener(); - final CompletableFuture response = new CompletableFuture<>(); - - final NettyHandle handle = new NettyHandle(new DefaultHandle(ByteBufAllocator.DEFAULT), - request, ctx, listener, response) { - @Override - public void onData(Buffer content) { - throw new IllegalArgumentException(); - } - }; - - final int requestId = registry.put(handle); - - final Http2Headers headers = new DefaultHttp2Headers(); - headers.add("a", "b"); - headers.add("x", "y"); - headers.status(HttpResponseStatus.BAD_REQUEST.codeAsText()); - - frameInboundWriter.writeInboundHeaders(requestId, headers, 0, false); - frameInboundWriter.writeInboundData(requestId, Unpooled.buffer().writeBytes(DATA), - 0, true); - - then(response.isDone()).isTrue(); - then(registry.get(requestId)).isNull(); - then(response.isDone()).isTrue(); - then(response.isCompletedExceptionally()).isTrue(); - then(Futures.getCause(response)).isInstanceOf(Http2Exception.StreamException.class); - - then(registry.get(requestId)).isNull(); - channel.finishAndReleaseAll(); - } - - @Test - void testErrorWhileHandingOnTrailers() { - final HandleRegistry registry = new HandleRegistry(2, 0); - final long maxContentLength = -1L; - - setUp(registry, maxContentLength); - - final HttpRequest request = client.get("/abc"); - final Context ctx = new Context(); - final Listener listener = new NoopListener(); - final CompletableFuture response = new CompletableFuture<>(); - - final NettyHandle handle = new NettyHandle(new DefaultHandle(ByteBufAllocator.DEFAULT), - request, ctx, listener, response) { - - @Override - public void onTrailers(esa.commons.http.HttpHeaders trailers) { - throw new RuntimeException(); - } - }; - - final int requestId = registry.put(handle); - - final Http2Headers headers = new DefaultHttp2Headers(); - headers.add("a", "b"); - headers.add("x", "y"); - headers.status(HttpResponseStatus.BAD_REQUEST.codeAsText()); - - frameInboundWriter.writeInboundHeaders(requestId, headers, 0, false); - frameInboundWriter.writeInboundData(requestId, Unpooled.EMPTY_BUFFER, 0, false); - - frameInboundWriter.writeInboundHeaders(requestId, headers, 0, true); - - then(response.isDone()).isTrue(); - then(registry.get(requestId)).isNull(); - then(response.isDone()).isTrue(); - then(response.isCompletedExceptionally()).isTrue(); - then(Futures.getCause(response)).isInstanceOf(Http2Exception.StreamException.class); - - then(registry.get(requestId)).isNull(); - channel.finishAndReleaseAll(); - } - - @Test - void testErrorWhileHandingOnEnd() { - final HandleRegistry registry = new HandleRegistry(2, 0); - final long maxContentLength = -1L; - - setUp(registry, maxContentLength); - - final HttpRequest request = client.get("/abc"); - final Context ctx = new Context(); - final Listener listener = new NoopListener(); - final CompletableFuture response = new CompletableFuture<>(); - - final NettyHandle handle = new NettyHandle(new DefaultHandle(ByteBufAllocator.DEFAULT), - request, ctx, listener, response) { - @Override - public void onEnd() { - throw new IllegalStateException(); - } - }; - - final int requestId = registry.put(handle); - - final Http2Headers headers = new DefaultHttp2Headers(); - headers.add("a", "b"); - headers.add("x", "y"); - headers.status(HttpResponseStatus.BAD_REQUEST.codeAsText()); - - frameInboundWriter.writeInboundHeaders(requestId, headers, 0, false); - frameInboundWriter.writeInboundData(requestId, Unpooled.EMPTY_BUFFER, 0, false); - - frameInboundWriter.writeInboundHeaders(requestId, headers, 0, true); - - then(response.isDone()).isTrue(); - then(registry.get(requestId)).isNull(); - then(response.isDone()).isTrue(); - then(response.isCompletedExceptionally()).isTrue(); - then(Futures.getCause(response)).isInstanceOf(Http2Exception.StreamException.class); - - then(registry.get(requestId)).isNull(); - channel.finishAndReleaseAll(); - } - - @Test - void testErrorWhileHandingOnError() { - final HandleRegistry registry = new HandleRegistry(2, 0); - - final long maxContentLength = 1L; - - setUp(registry, maxContentLength); - - final HttpRequest request = client.get("/abc"); - final Context ctx = new Context(); - final Listener listener = new NoopListener(); - final CompletableFuture response = new CompletableFuture<>(); - - final NettyHandle handle = new NettyHandle(new DefaultHandle(ByteBufAllocator.DEFAULT), - request, ctx, listener, response) { - - @Override - public void onError(Throwable cause) { - throw new IllegalArgumentException(); - } - }; - - final int requestId = registry.put(handle); - - final Http2Headers headers = new DefaultHttp2Headers(); - headers.add("a", "b"); - headers.add("x", "y"); - headers.status(HttpResponseStatus.BAD_REQUEST.codeAsText()); - - frameInboundWriter.writeInboundHeaders(requestId, headers, 0, false); - frameInboundWriter.writeInboundData(requestId, Unpooled.buffer().writeBytes(DATA), 0, - false); - - assertThrows(ClosedChannelException.class, - () -> frameInboundWriter.writeInboundHeaders(requestId, headers, 0, true)); - - then(response.isDone()).isFalse(); - then(registry.get(requestId)).isNull(); - channel.finishAndReleaseAll(); - } - }