Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Take operator doesn't dispose upstream observable #209

Open
wants to merge 1 commit into
base: master
Choose a base branch
from

Conversation

WyriHaximus
Copy link
Contributor

If there is an active polling observable upstream it will keep polling as long as it isn't disposed. By disposing when we reach the desired amount of items, the possible upstream polling is also stopped.

If there is an active polling observable upstream it will keep polling as long as it isn't disposed. By disposing when we reach the desired amount of items, the possible upstream polling is also stopped.
@asm89
Copy link
Member

asm89 commented Jan 11, 2021

Could you add test for this case? Ideally calling dispose both in the operator and from the outside to make sure things still work.

I'm not 100% convinced this is the right thing to do yet, but on the surface it seems sensible. I don't see language implementations dispose the source inside take()? I wonder if it'd make sense to have an operator or the like that auto disposes on error/completed for users that want this behavior?

@asm89
Copy link
Member

asm89 commented Jan 11, 2021

cc @davidwdan @mbonneau

@davidwdan
Copy link
Member

take is called with lift which wraps it with an AnonymousObservable that uses an AutoDetachObserver. AutoDetachObservershould dispose of when completed. A test would be good.

@WyriHaximus
Copy link
Contributor Author

@asm89 Sure, here is the full use case, I want to consume 13 items from Redis and then stop consuming. Everything before take(13) comes from a different class and just provides a constant stream of items when available. It is also used in different places of my code base. Now the problem I ran into is that it keeps popping items from Redis when the "fix" in this PR isn't applied.

Observable::defer(
    fn (): Observable => Observable::fromPromise(resolve($this->redis->blpop(
        $key,
        1
    )))
        ->flatMap(
            static fn (?array $event): Observable => $event === null ? Observable::error(new Exception()) : observableFromArray([$event[1]])
        ),
    $this->asyncScheduler
)
->retryWhen(static fn (Observable $errors): Observable => $errors->delay(0))
->repeatWhen(static fn (Observable $errors): Observable => $errors->delay(0))
->map(
    static fn (string $event): array => json_decode($event, true)
)->take(13)->map(
    // Mainly preventing a memory leak here
    static fn (): bool => true,
)->toArray()->toPromise()

@davidwdan
Copy link
Member

I'm not sure that this is an issue with the take operator. This appears to be a scheduling issue. Does it fix the issue if you change the delay on repeatWhen to something like 10?

@WyriHaximus
Copy link
Contributor Author

@davidwdan Will have a try and report back 👍 . So you're suggesting it's a race condition?

@davidwdan
Copy link
Member

I don't think it's a race condition. On some of the other operators, we schedule the subscriptions so they happen in the next tick. We might have to do something like that with defer.

@ReactiveX ReactiveX deleted a comment from Johnstean Jan 11, 2021
@WyriHaximus
Copy link
Contributor Author

@davidwdan @asm89 Changed the repeatWhen defer to 10 and it worked perfectly. Don't like it as a long term solution tho. Can we solve this in RxPHP? If so, please guide me in the right direction to do that :)

@mbonneau
Copy link
Member

@WyriHaximus That is definitely not the long term solution.

In almost all of the Observable objects (wrapped by operators of, error, fromArray, etc.), there is care not to call onNext (or any notifications) immediately (in the same scheduled time).

I believe the correct way to resolve this issue would be to schedule the ->then call on the promise. This will ensure that the onFulfilled and onRejected handlers do not get invoked recursively. This also seems to be in line with the other Observable factory functions.

This change would require the addition of a scheduler argument to Rx\Promise::toObservable.

This change would also break \Rx\Functional\React\PromiseToObservableTest::two_observables_one_disposed_before_resolve. However, this appears to be an incorrect test depending on what you think the behavior is supposed to be.

Here is what I was thinking:

    public static function toObservable(PromiseInterface $promise, SchedulerInterface $scheduler = null): Observable
    {
        $scheduler = $scheduler ?: Scheduler::getDefault();

        return new AnonymousObservable(function ($observer) use ($promise, $scheduler) {
            $subject = new AsyncSubject();
            
            $disp = $scheduler->schedule(function () use ($subject, $promise) {
                $promise->then(
                    function ($value) use ($subject) {
                        $subject->onNext($value);
                        $subject->onCompleted();
                    },
                    function ($error) use ($subject) {
                        $error = $error instanceof \Exception ? $error : new RejectedPromiseException($error);
                        $subject->onError($error);
                    }
                );
            });

            return new CompositeDisposable([
                $subject->subscribe($observer),
                $disp,
                new CallbackDisposable(function () use ($promise) {
                    if ($promise instanceof CancellablePromiseInterface) {
                        $promise->cancel();
                    }
                })
           ]);
        });
    }

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants