diff --git a/client/src/main/java/org/asynchttpclient/netty/request/body/NettyReactiveStreamsBody.java b/client/src/main/java/org/asynchttpclient/netty/request/body/NettyReactiveStreamsBody.java index 781152bdf1..26ed0667cf 100644 --- a/client/src/main/java/org/asynchttpclient/netty/request/body/NettyReactiveStreamsBody.java +++ b/client/src/main/java/org/asynchttpclient/netty/request/body/NettyReactiveStreamsBody.java @@ -58,6 +58,7 @@ public void write(Channel channel, NettyResponseFuture future) throws IOExcep NettySubscriber subscriber = new NettySubscriber(channel, future); channel.pipeline().addLast(NAME_IN_CHANNEL_PIPELINE, subscriber); publisher.subscribe(new SubscriberAdapter(subscriber)); + subscriber.delayedStart(); } } @@ -108,6 +109,17 @@ protected void complete() { .addListener(future -> removeFromPipeline())); } + private volatile Subscription deferredSubscription; + + @Override + public void onSubscribe(Subscription subscription) { + deferredSubscription = subscription; + } + + public void delayedStart() { + super.onSubscribe(deferredSubscription); + } + @Override protected void error(Throwable error) { if (error == null)