From 391b4d8e0eced22e56f390d4c5ee8a51c1c98d27 Mon Sep 17 00:00:00 2001 From: Stephane Landelle Date: Tue, 28 Nov 2017 13:09:42 +0100 Subject: [PATCH] Don't automatically start requesting on subscribe, close #1380 Motivation: HandlerSubscriber automatically starts requests when calling onSubscribe. But when a publisher subscribes, RxJava automatically requests the Subscription if there are some pending requests. When such race condition happen, we end up with 2 threads (calling and event loop) competing for requesting the Subscription. This results is out of order published messages. Modification: Make HandlerSubscriber#onSubscribe noop and actually delay it to after Publisher has subscribed. Result: No more out of order messages. --- .../netty/request/body/NettyReactiveStreamsBody.java | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/client/src/main/java/org/asynchttpclient/netty/request/body/NettyReactiveStreamsBody.java b/client/src/main/java/org/asynchttpclient/netty/request/body/NettyReactiveStreamsBody.java index 781152bdf1..26ed0667cf 100644 --- a/client/src/main/java/org/asynchttpclient/netty/request/body/NettyReactiveStreamsBody.java +++ b/client/src/main/java/org/asynchttpclient/netty/request/body/NettyReactiveStreamsBody.java @@ -58,6 +58,7 @@ public void write(Channel channel, NettyResponseFuture future) throws IOExcep NettySubscriber subscriber = new NettySubscriber(channel, future); channel.pipeline().addLast(NAME_IN_CHANNEL_PIPELINE, subscriber); publisher.subscribe(new SubscriberAdapter(subscriber)); + subscriber.delayedStart(); } } @@ -108,6 +109,17 @@ protected void complete() { .addListener(future -> removeFromPipeline())); } + private volatile Subscription deferredSubscription; + + @Override + public void onSubscribe(Subscription subscription) { + deferredSubscription = subscription; + } + + public void delayedStart() { + super.onSubscribe(deferredSubscription); + } + @Override protected void error(Throwable error) { if (error == null)