Sebastian Jeckel edited this page Jun 22, 2014 · 2 revisions

Motivation

Observers allow imperative actions to be executed in reaction to events or to value changes of signals.

A central point of interest is how the lifetime of these observers is controlled. It should be possible to have explicit control over how long these observers remain active. On the other hand, this should not extend to the point where failing to detach an observer manually would result in a resource leak.

Tutorials

The following tutorials use this domain definition:

#include "react/Domain.h"
REACTIVE_DOMAIN(D, sequential)

Creating subject-bound observers

As tutorials in the previous sections have shown, by default observers are attached to the lifetime of the observed subject. Here's another demonstration of this behaviour:

#include "react/Event.h"

void testFunc()
{
    auto trigger = D::MakeEventSource();

    Observe(trigger, [] (Token) {
        // ...
    });
}

After leaving testFunc, trigger gets destroyed. This automatically detaches and destroys the observer as well.
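
The ownership relation described above can be illustrated without the library. The following is a minimal sketch, assuming a hypothetical ToySubject class that stores its callbacks directly; it is not how the library implements observers, it only mirrors the rule that an observer lives exactly as long as its subject unless it is detached earlier.

```cpp
#include <cassert>
#include <functional>
#include <memory>
#include <utility>
#include <vector>

// Toy stand-in for an event source. Hypothetical; not the library's implementation.
class ToySubject
{
public:
    // The callback is stored inside the subject, so the subject owns it.
    void Observe(std::function<void()> func)
    {
        observers_.push_back(std::move(func));
    }

    void Emit()
    {
        for (auto& f : observers_) f();
    }

private:
    std::vector<std::function<void()>> observers_;
};

// Returns true if the observer's captured state is released
// once the subject goes out of scope.
bool ObserverDiesWithSubject()
{
    auto state = std::make_shared<int>(0);
    std::weak_ptr<int> watch = state;
    {
        ToySubject subject;
        subject.Observe([state] { ++*state; });
        state.reset();            // only the stored observer keeps the int alive
        subject.Emit();
        assert(!watch.expired()); // observer still exists while the subject does
    }                             // subject destroyed, observer destroyed with it
    return watch.expired();
}
```

The weak_ptr is only there to make the destruction observable from the outside.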

It's possible to detach all observers from a subject with DetachAllObservers.

Sometimes that's all the control we need. As an example, we create a simple callback registry that collects functions and executes them once when triggered:

#include <utility>
#include "react/Event.h"
#include "react/Observer.h"

class DelayedExecutor
{
public:
    template <typename F>
    void ScheduleForExecution(F&& func)
    {
        Observe(trigger_, std::forward<F>(func));
    }

    void Execute()
    {
        trigger_.Emit();
        DetachAllObservers(trigger_);
    }

private:
    D::EventSourceT<>    trigger_ = D::MakeEventSource();
};
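
For comparison, the same run-once behaviour can be sketched in plain C++ without any reactive types. ToyDelayedExecutor and RunTwice are hypothetical names used only for this illustration; the sketch assumes callbacks take no arguments and stores them in a vector instead of attaching them to an event source.

```cpp
#include <cassert>
#include <functional>
#include <utility>
#include <vector>

// Library-free analogue of DelayedExecutor (hypothetical, for illustration only).
class ToyDelayedExecutor
{
public:
    template <typename F>
    void ScheduleForExecution(F&& func)
    {
        pending_.emplace_back(std::forward<F>(func));
    }

    void Execute()
    {
        // Take the current batch out first, so that every callback runs at
        // most once and the registry is empty afterwards.
        std::vector<std::function<void()>> batch;
        batch.swap(pending_);
        for (auto& f : batch) f();
    }

private:
    std::vector<std::function<void()>> pending_;
};

// Schedules two callbacks, executes twice, and returns the total call count.
int RunTwice()
{
    int calls = 0;
    ToyDelayedExecutor exec;
    exec.ScheduleForExecution([&calls] { ++calls; });
    exec.ScheduleForExecution([&calls] { ++calls; });
    exec.Execute();     // both callbacks run
    assert(calls == 2);
    exec.Execute();     // nothing left to run
    return calls;
}
```

Clearing the batch here plays the role that DetachAllObservers plays in the reactive version.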

Detaching observers manually

To detach an observer before the lifetime of the observed subject ends, the observer handle returned by Observe has to be saved:

#include <iostream>
#include "react/Event.h"
#include "react/Observer.h"

void testFunc()
{
    auto trigger = D::MakeEventSource();

    D::ObserverT obs = Observe(trigger, [] (Token) {
        std::cout << "Triggered!" << std::endl;
    });

    trigger.Emit(); // output: Triggered!

    obs.Detach();   // Remove the observer

    trigger.Emit(); // no output
}

After the observer has been removed by calling Detach on its handle, the underlying observer node is destroyed and the handle becomes invalid.

While it exists and has not been invalidated, an observer handle also takes shared ownership of the observed subject, i.e. by saving the handle, we state our continued interest in the subject. This is required to enable observers on temporary reactives, which are created for the sole purpose of being observed and would otherwise be destroyed immediately.
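
The shared-ownership rule can be modelled with std::shared_ptr. This is a rough sketch under stated assumptions: ToySubject, ToyObserver and the demo function are hypothetical names, and the real library's node graph is certainly more involved. The sketch only shows that a saved handle keeps the subject alive and that Detach releases it and invalidates the handle.

```cpp
#include <cassert>
#include <functional>
#include <map>
#include <memory>
#include <utility>

// Toy subject that hands out ids for attached callbacks (hypothetical).
class ToySubject
{
public:
    int Attach(std::function<void()> func)
    {
        observers_[nextId_] = std::move(func);
        return nextId_++;
    }
    void Detach(int id) { observers_.erase(id); }
    void Emit() { for (auto& entry : observers_) entry.second(); }

private:
    std::map<int, std::function<void()>> observers_;
    int nextId_ = 0;
};

// Toy handle: holds a shared reference to the subject until detached.
class ToyObserver
{
public:
    ToyObserver(std::shared_ptr<ToySubject> subject, std::function<void()> func)
        : subject_(std::move(subject)), id_(subject_->Attach(std::move(func))) {}

    bool IsValid() const { return subject_ != nullptr; }

    void Detach()
    {
        if (!subject_) return;
        subject_->Detach(id_); // remove the callback
        subject_.reset();      // give up ownership; the handle is now invalid
    }

private:
    std::shared_ptr<ToySubject> subject_; // declared first: id_ is initialised from it
    int id_;
};

// Returns the number of observer calls seen while only the handle owns the
// subject, or -1 if the subject outlived the detached handle.
int CallsWhileHandleOwnsSubject()
{
    int calls = 0;
    auto subject = std::make_shared<ToySubject>();
    std::weak_ptr<ToySubject> watch = subject;

    ToyObserver obs(subject, [&calls] { ++calls; });
    subject.reset();                      // the handle is now the only owner
    assert(!watch.expired());             // subject kept alive by the handle
    if (auto s = watch.lock()) s->Emit(); // observer still fires

    obs.Detach();                         // releases the last reference
    return (watch.expired() && !obs.IsValid()) ? calls : -1;
}
```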

Using scoped observers

An alternative to detaching manually is to wrap the observer handle in a scoped observer, which detaches the observer automatically when it goes out of scope:

#include <iostream>
#include "react/Event.h"
#include "react/Observer.h"

void testFunc()
{
    auto trigger = D::MakeEventSource();

    // Start inner scope
    {
        D::ScopedObserverT scopedObs
        {
            Observe(trigger, [] (Token) {
                std::cout << "Triggered!" << std::endl;
            })
        };

        trigger.Emit(); // output: Triggered!
    }
    // End inner scope

    trigger.Emit(); // no output
}

An existing observer handle can also be moved into a scoped observer:

auto obs = Observe(trigger, [] (Token) { /* ... */ });

D::ScopedObserverT scopedObs{ std::move(obs) };
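
The scoped observer is an instance of the usual RAII pattern: a move-only wrapper whose destructor calls Detach. The sketch below assumes a hypothetical ToyObserver handle that simply runs a stored action on Detach; the names and the internals are illustrative and not taken from the library.

```cpp
#include <cassert>
#include <functional>
#include <utility>

// Hypothetical minimal handle: Detach() runs a stored action at most once.
class ToyObserver
{
public:
    explicit ToyObserver(std::function<void()> onDetach)
        : onDetach_(std::move(onDetach)) {}

    // Moving transfers the action and leaves the source empty.
    ToyObserver(ToyObserver&& other)
        : onDetach_(std::exchange(other.onDetach_, nullptr)) {}

    ToyObserver(const ToyObserver&) = delete;
    ToyObserver& operator=(const ToyObserver&) = delete;

    void Detach()
    {
        if (auto f = std::exchange(onDetach_, nullptr)) f();
    }

private:
    std::function<void()> onDetach_;
};

// RAII wrapper: takes over a handle and detaches it on destruction.
class ToyScopedObserver
{
public:
    explicit ToyScopedObserver(ToyObserver&& obs) : obs_(std::move(obs)) {}
    ~ToyScopedObserver() { obs_.Detach(); }

    ToyScopedObserver(const ToyScopedObserver&) = delete;
    ToyScopedObserver& operator=(const ToyScopedObserver&) = delete;

private:
    ToyObserver obs_;
};

// Returns how often the detach action ran once the inner scope has ended.
int DetachCountAfterScope()
{
    int detached = 0;
    {
        ToyObserver obs([&detached] { ++detached; });
        ToyScopedObserver scoped{ std::move(obs) };
        assert(detached == 0); // still attached inside the scope
    }                          // scoped is destroyed here and detaches
    return detached;
}
```

Making the wrapper non-copyable avoids two owners detaching the same observer.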

Observing temporary reactives

Consider the following case, where we create a merged event stream of two sources specifically for the purpose of being passed as an argument to Observe:

#include <iostream>
#include "react/Event.h"
#include "react/Observer.h"

void testFunc()
{
    auto e1 = D::MakeEventSource();
    auto e2 = D::MakeEventSource();

    Observe(e1 | e2, [] (Token) {
        std::cout << "Triggered!" << std::endl;
    });

    e1.Emit(); // no output
    e2.Emit(); // no output
}

The reason this prints no output is that the lifetime of the observer is attached to the subject, which in this case is a temporary value. Since nobody takes ownership of e1 | e2, it'll be destroyed immediately after the call. This can be prevented in two ways:

Method 1

auto merged = e1 | e2;

Observe(merged, [] (Token) {
    std::cout << "Triggered!" << std::endl;
});

e1.Emit(); // output: Triggered!
e2.Emit(); // output: Triggered!

e1 | e2 is no longer a temporary; as long as merged exists, the observer exists as well.

Method 2

auto obs = Observe(e1 | e2, [] (Token) {
    std::cout << "Triggered!" << std::endl;
});

e1.Emit(); // output: Triggered!
e2.Emit(); // output: Triggered!

Here we utilize the fact that while the ObserverT handle exists, it takes shared ownership of its subject.

Synchronized access to signal values

The tutorials presented so far observed a single reactive, but we might need access to other data as well:

#include <iostream>
#include <string>
#include "react/Signal.h"
#include "react/Event.h"
#include "react/Observer.h"
#include "react/ReactiveObject.h"

using std::string;

class Employee : public ReactiveObject<D>
{
public:
    VarSignalT<string>   Name;
    VarSignalT<int>      Years;

    EventSourceT<>       GotFired       = MakeEventSource();
    EventSourceT<string> GotPromoted    = MakeEventSource<string>();

    Employee(const char* name, int years) :
        Name{ MakeVar(string(name)) },
        Years{ MakeVar(years) }
    {
        // Accessing Value() during a turn is not thread-safe!
        // The lambdas capture this to reach the member signals.
        Observe(GotFired, [this] (Token) {
            std::cout << Name.Value() << " was fired after working "
                      << Years.Value() << " years for the company" << std::endl;
        });
        Observe(GotPromoted, [this] (const string& position) {
            std::cout << Name.Value() << " was promoted to "
                      << position << std::endl;
        });
    }
};

The problem with this code is that Name and/or Years might get changed in the same turn as the GotFired event is triggered. Because Value() is not thread-safe, this leads to potential data races and undefined behaviour. Even with single-threaded propagation, glitch-freedom is no longer guaranteed.

To avoid this, we can pass additional signal dependencies to Observe:

Observe(
    GotFired,
    With(Name, Years),
    [] (Token, const string& name, int years) {
        std::cout << name << " was fired after working " << years
                  << " years for the company" << std::endl;
    });
Observe(
    GotPromoted,
    With(Name),
    [] (const string& position, const string& name) {
        std::cout << name << " was promoted to " << position << std::endl;
    });

The current values of these extra dependencies are supplied as additional arguments to the observer function. The observer takes shared ownership of the extra signals to prevent them from disappearing as long as it requires their values. Changes of these signals do not trigger a call of the observer function; only events from the observed event stream do.
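
The calling convention described above can be mimicked in a few lines of plain C++. The sketch is single-threaded and uses hypothetical names (ToyVar, ToyEvents, ObserveWith, RunFiredDemo). It does not model the library's turn synchronization; it only shows that the extra dependencies are co-owned by the observer, that their current values are passed as arguments when an event arrives, and that changing them does not by itself call the observer.

```cpp
#include <cassert>
#include <functional>
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Toy signal: just holds a value (hypothetical).
template <typename T>
struct ToyVar
{
    explicit ToyVar(T v) : value(std::move(v)) {}
    T value;
};

// Toy event source whose observers receive the current values of extra dependencies.
class ToyEvents
{
public:
    template <typename F, typename... Ts>
    void ObserveWith(F func, std::shared_ptr<ToyVar<Ts>>... deps)
    {
        // Copying the shared_ptrs gives the observer shared ownership of the
        // dependencies; their values are read only when an event arrives.
        observers_.push_back([func, deps...] { func(deps->value...); });
    }

    void Emit()
    {
        for (auto& f : observers_) f();
    }

private:
    std::vector<std::function<void()>> observers_;
};

// Returns the messages produced by one emit after a dependency change.
std::vector<std::string> RunFiredDemo()
{
    std::vector<std::string> log;
    auto name  = std::make_shared<ToyVar<std::string>>(std::string("Bob"));
    auto years = std::make_shared<ToyVar<int>>(3);

    ToyEvents gotFired;
    gotFired.ObserveWith(
        [&log] (const std::string& n, int y) {
            log.push_back(n + " was fired after " + std::to_string(y) + " years");
        },
        name, years);

    years->value = 4;    // changing a dependency does not call the observer
    assert(log.empty());

    gotFired.Emit();     // the observer sees the values current at emit time
    return log;
}
```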

Details

TODO