Skip to content

Commit

Permalink
Don't automatically start requesting on subscribe, close #1380
Browse files Browse the repository at this point in the history
Motivation:

HandlerSubscriber automatically starts requests when calling
onSubscribe.

But when a publisher subscribes, RxJava automatically requests the
Subscription if there are some pending requests.

When such race condition happen, we end up with 2 threads (calling and
event loop) competing for requesting the Subscription. This results is
out of order published messages.

Modification:

Make HandlerSubscriber#onSubscribe noop and actually delay it to after
Publisher has subscribed.

Result:

No more out of order messages.
  • Loading branch information
slandelle committed Nov 28, 2017
1 parent 8c27b67 commit 391b4d8
Showing 1 changed file with 12 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ public void write(Channel channel, NettyResponseFuture<?> future) throws IOExcep
NettySubscriber subscriber = new NettySubscriber(channel, future);
channel.pipeline().addLast(NAME_IN_CHANNEL_PIPELINE, subscriber);
publisher.subscribe(new SubscriberAdapter(subscriber));
subscriber.delayedStart();
}
}

Expand Down Expand Up @@ -108,6 +109,17 @@ protected void complete() {
.addListener(future -> removeFromPipeline()));
}

private volatile Subscription deferredSubscription;

@Override
public void onSubscribe(Subscription subscription) {
deferredSubscription = subscription;
}

public void delayedStart() {
super.onSubscribe(deferredSubscription);
}

@Override
protected void error(Throwable error) {
if (error == null)
Expand Down

0 comments on commit 391b4d8

Please sign in to comment.