diff --git a/client/src/main/java/org/asynchttpclient/netty/request/body/NettyReactiveStreamsBody.java b/client/src/main/java/org/asynchttpclient/netty/request/body/NettyReactiveStreamsBody.java index 9002e600e1..781152bdf1 100644 --- a/client/src/main/java/org/asynchttpclient/netty/request/body/NettyReactiveStreamsBody.java +++ b/client/src/main/java/org/asynchttpclient/netty/request/body/NettyReactiveStreamsBody.java @@ -32,96 +32,97 @@ public class NettyReactiveStreamsBody implements NettyBody { - private static final Logger LOGGER = LoggerFactory.getLogger(NettyReactiveStreamsBody.class); - private static final String NAME_IN_CHANNEL_PIPELINE = "request-body-streamer"; - - private final Publisher publisher; - - private final long contentLength; - - public NettyReactiveStreamsBody(Publisher publisher, long contentLength) { - this.publisher = publisher; - this.contentLength = contentLength; - } - - @Override - public long getContentLength() { - return contentLength; - } - - @Override - public void write(Channel channel, NettyResponseFuture future) throws IOException { - if (future.isStreamConsumed()) { - LOGGER.warn("Stream has already been consumed and cannot be reset"); - } else { - future.setStreamConsumed(true); - NettySubscriber subscriber = new NettySubscriber(channel, future); - channel.pipeline().addLast(NAME_IN_CHANNEL_PIPELINE, subscriber); - publisher.subscribe(new SubscriberAdapter(subscriber)); - } - } - - private static class SubscriberAdapter implements Subscriber { - private final Subscriber subscriber; - - public SubscriberAdapter(Subscriber subscriber) { - this.subscriber = subscriber; - } - - @Override - public void onSubscribe(Subscription s) { - subscriber.onSubscribe(s); - } - - @Override - public void onNext(ByteBuf buffer) { - HttpContent content = new DefaultHttpContent(buffer); - subscriber.onNext(content); - } - - @Override - public void onError(Throwable t) { - subscriber.onError(t); - } - - @Override - public void onComplete() { - subscriber.onComplete(); - } - } - - private static class NettySubscriber extends HandlerSubscriber { - private static final Logger LOGGER = LoggerFactory.getLogger(NettySubscriber.class); - - private final Channel channel; - private final NettyResponseFuture future; - - public NettySubscriber(Channel channel, NettyResponseFuture future) { - super(channel.eventLoop()); - this.channel = channel; - this.future = future; - } - - @Override - protected void complete() { - channel.eventLoop().execute(() -> channel.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT).addListener(future -> removeFromPipeline())); - } - - @Override - protected void error(Throwable error) { - if (error == null) - throw null; - removeFromPipeline(); - future.abort(error); - } - - private void removeFromPipeline() { - try { - channel.pipeline().remove(this); - LOGGER.debug(String.format("Removed handler %s from pipeline.", NAME_IN_CHANNEL_PIPELINE)); - } catch (NoSuchElementException e) { - LOGGER.debug(String.format("Failed to remove handler %s from pipeline.", NAME_IN_CHANNEL_PIPELINE), e); - } - } - } + private static final Logger LOGGER = LoggerFactory.getLogger(NettyReactiveStreamsBody.class); + private static final String NAME_IN_CHANNEL_PIPELINE = "request-body-streamer"; + + private final Publisher publisher; + + private final long contentLength; + + public NettyReactiveStreamsBody(Publisher publisher, long contentLength) { + this.publisher = publisher; + this.contentLength = contentLength; + } + + @Override + public long getContentLength() { + return contentLength; + } + + @Override + public void write(Channel channel, NettyResponseFuture future) throws IOException { + if (future.isStreamConsumed()) { + LOGGER.warn("Stream has already been consumed and cannot be reset"); + } else { + future.setStreamConsumed(true); + NettySubscriber subscriber = new NettySubscriber(channel, future); + channel.pipeline().addLast(NAME_IN_CHANNEL_PIPELINE, subscriber); + publisher.subscribe(new SubscriberAdapter(subscriber)); + } + } + + private static class SubscriberAdapter implements Subscriber { + private final Subscriber subscriber; + + public SubscriberAdapter(Subscriber subscriber) { + this.subscriber = subscriber; + } + + @Override + public void onSubscribe(Subscription s) { + subscriber.onSubscribe(s); + } + + @Override + public void onNext(ByteBuf buffer) { + HttpContent content = new DefaultHttpContent(buffer); + subscriber.onNext(content); + } + + @Override + public void onError(Throwable t) { + subscriber.onError(t); + } + + @Override + public void onComplete() { + subscriber.onComplete(); + } + } + + private static class NettySubscriber extends HandlerSubscriber { + private static final Logger LOGGER = LoggerFactory.getLogger(NettySubscriber.class); + + private final Channel channel; + private final NettyResponseFuture future; + + public NettySubscriber(Channel channel, NettyResponseFuture future) { + super(channel.eventLoop()); + this.channel = channel; + this.future = future; + } + + @Override + protected void complete() { + channel.eventLoop().execute(() -> channel.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT) + .addListener(future -> removeFromPipeline())); + } + + @Override + protected void error(Throwable error) { + if (error == null) + throw null; + removeFromPipeline(); + future.abort(error); + } + + private void removeFromPipeline() { + try { + channel.pipeline().remove(this); + LOGGER.debug(String.format("Removed handler %s from pipeline.", NAME_IN_CHANNEL_PIPELINE)); + } catch (NoSuchElementException e) { + LOGGER.debug(String.format("Failed to remove handler %s from pipeline.", NAME_IN_CHANNEL_PIPELINE), e); + } + } + } }