Skip to content
Sebastian Jeckel edited this page Jun 22, 2014 · 3 revisions

Motivation

To be able to handle complex tasks with signals and events, more reactive operations are required. These include:

  • Transformations between signals and events;
  • mechanisms to incrementally modify signal state as a result of events.

Tutorials

The following tutorials use this domain definition:

#include "react/Domain.h"
REACTIVE_DOMAIN(D, sequential)

For convenience, all headers are included:

#include "react/Signal.h"
#include "react/Event.h"
#include "react/Observer.h"
#include "react/ReactiveObject.h"
#include "react/Algorithm.h"

Converting events to signals

The semantics of this conversion are to hold on to the most recent event and store it as a signal.

This is accomplished with Hold:

class Sensor : public ReactiveObject<D>
{
public:
    EventSourceT<int>   Samples     = MakeEventSource<int>();
    SignalT<int>        LastSample  = Hold(Samples);
};
Sensor mySensor;

Observe(mySensor.LastSample, [] (int v) {
    std::cout << v << std::endl;
});

mySensor.Samples << 20 << 21 << 21 << 22; // output: 20, 21, 22

If a stream emits multiple events during a single turn, only the last one is passed on to Hold:

D::DoTransaction([&] {
    mySensor.Samples << 20 << 21 << 21 << 22;
}); // output: 22

Converting signals to events

The semantics of this conversion are to generate an event stream of changed signal values. This is similar to what we already did with signal observers, which react to value changes. The difference is that rather than applying side-effects, the changed value is forwarded as an event.

The function implementing the conversion is named Monitor:

class Employee : public ReactiveObject<D>
{
public:
    VarSignalT<string>   Name    = MakeVar(string("Bob"));
    VarSignalT<int>      Salary  = MakeVar(3000);

    EventsT<int>         SalaryChanged = Monitor(Salary);
};

One potential use is to create synchronized observers with the resulting event stream:

Employee bob;

Observe(
    bob.SalaryChanged,
    With(bob.Name),
    [] (int newSalary, const string& name) {
        std::cout << name << " now earns " << newSalary << std::endl;
    });

Creating stateful signals

The signals shown so far were generated by pure functions, but what if we want to update a signal based on its current state?

This is shown on the example of a reactive counter:

class Counter : public ReactiveObject<D>
{
public:
    EventSourceT<>  Increment = MakeEventSource();
    
    SignalT<int> Count = Iterate(
        Increment,
        0,
        [] (Token, int oldCount) {
            return oldCount + 1;
        });
};
Counter myCounter;
myCounter.Increment();
myCounter.Increment();
myCounter.Increment();

std::cout << myCounter.Count() << std::endl; // output: 3

// Note: Using function-style operator() instead of .Emit() and .Value()

If you are familar with functional programming, then Iterate is semantically equivalent to the higher order function Fold. In this example, the signal is first initialized with zero; then, for every received increment event, the given function is called with the event value (a token in this case) and the current value of the signal. The return value of the function is used as the new signal value.

The event value can also be used in the computation. To show this, we calculate the average of measured samples:

class Sensor : public ReactiveObject<D>
{
public:
    EventSourceT<int>   Input = MakeEventSource<int>();
    
    SignalT<float> Average = Iterate(
        Input,
        0.,
        [] (int sample, float oldAvg) {
            return (oldAvg + sample) / 2.;
        });
};

If other signal values are required in the iterate function, they can be sychronized by adding them in a signal pack. As usual, changes of these signals do not trigger an update of the iterate function - only the event does.

Here's an example that extends the previous counter with some extra functionality:

enum class ECmd { increment, decrement, reset };

class Counter : public ReactiveObject<D>
{
public:
    EventSourceT<ECmd>  Update  = MakeEventSource<ECmd>();
    VarSignalT<int>     Delta   = MakeVar(1);
    VarSignalT<int>     Start   = MakeVar(0);
    
    SignalT<int> Count = Iterate(
        Update,
        Start.Value(),
        With(Delta, Start),
        [] (ECmd cmd, int oldCount, int delta, int start) {
            if (cmd == ECmd::increment)
                return oldCount + delta;
            else if (cmd == ECmd::decrement)
                return oldCount - delta;
            else
                return start;
        });
};

Avoiding expensive copies

Updating a signal usually involves copying its current value, moving the copy into the passed function and comparing the result to the old value to decide whether it has been changed. For some types, i.e. containers, these operations are rather expensive as they result in allocations and repeated element-wise copying and comparing.

To avoid this, Iterate also supports pass-by non-const reference. This automatically enables, if the return type of the passed function is void.

#include <vector>
using std::vector;

class Sensor : public ReactiveObject<D>
{
public:
    EventSourceT<int>   Input = MakeEventSource<int>();

    VarSignalT<int>     Threshold = MakeVar(1);
    
    SignalT<vector<int>> AllSamples = Iterate(
        Input,
        vector<int>{},
        [] (int input, vector<int>& all) {
            all.push_back(input);
        });

    SignalT<vector<int>> CriticalSamples = Iterate(
        Input,
        vector<int>{},
        With(Threshold),
        [] (int input, vector<int>& critical, int threshold) {
            if (input > threshold)
                critical.push_back(input);
        });
};

The downside is that since the new and old values can no longer be compared, the signal will always assume that it has been changed.

Details

TODO