Skip to content
schlangster edited this page Jun 7, 2014 · 20 revisions

Motivation

These days, many programs are interactive. As such, they have to react to dynamic inputs and update the program state accordingly. Due to data inter-dependencies, the effects of such changes may ripple through the program as cascading updates.

Implementing this propagation manually is tedious and error-prone. If callback mechanisms are required, things get even worse due to the mix of an inverted control flow and the explicit use of side effects.

Signals provide an abstraction in the form of reactive values that automatically update themselves when their dependencies have changed. Each signal is defined as a function that takes its input values as arguments. An underlying runtime engine automatically takes care of the propagation. Since the engine has the "whole picture", it can schedule updates more efficiently, so that:

  • No value is updated unnecessarily;
  • intermediate results are cached to avoid redundant calculations;
  • no glitches will occur (i.e. inconsistent sets of input values).

It can even implicitly parallelize the updates, while automatically taking care of thread-safety and effective utilization of available parallel hardware.

Basics

We assume the following set up for each of the following examples:

#include "react/Domain.h"
#include "react/Signal.h"

#include <iostream>
// ... include any additional required std headers

using std::cout;
using std::endl;

using namespace react;

REACTIVE_DOMAIN(D)

Signal.h defines the signal functionality and Domain.h allows to define a new reactive domain. Purpose of the latter is explained later; for now, it suffices to know that each reactive type (and that includes signals) is tagged with a domain. The last line defines a reactive domain named D. You are free to pick a different name and define multiple domains, but reactives of different domains can not be combined.

Hello World

We start by creating a VarSignal that holds the string "Hello World":

using std::string;

D::VarSignalT<string> myString = MakeVar<D>(string( "Hello world" ));

D::VarSignalT<S> is an alias for VarSignal<D,S>. Putting the domain name in front is arguably easier to read. To omit the domain name from type declarations, the macro USING_REACTIVE_DOMAIN(name) can be used to define aliases for all reactive types of the given domain.

A VarSignal is similar to a regular variable. Its inner value can be read imperatively with myString.Value() and changed with myString.Set(x). An alternative method for changing the value of a VarSignal is the overloaded <<= operator, i.e. myString <<= x. This should not be mixed up with the assignment operator, which is reserved to assign the signal itself, not the inner value.

So far that's not very reactive, so let's make it more interesting. We define a VarSignal for each word and create a signal that concatenates them:

string concatFunc(string first, string second) {
    return first + string(" ") + second;
};
USING_REACTIVE_DOMAIN(D)

// The two words
VarSignalT<string> firstWord  = MakeVar<D>(string( "Change" ));
VarSignalT<string> secondWord = MakeVar<D>(string( "me!" ));

// A signal that concatenates both words
SignalT<string> bothWords = MakeSignal(With(firstWord,secondWord), concatFunc);
cout << bothWords.Value() << endl; // output: "Change me!"
firstWord  <<= string( "Hello" );
secondWord <<= string( "World" );
cout << bothWords.Value() << endl; // output: "Hello World"

Would this work, too?

bothWords <<= "Hello World?"; // No it wouldn't. Only VarSignals use imperative input.

Unlike VarSignal, the value of a Signal cannot be modified directly. Instead, it is the return value of a function. With MakeSignal, we have connected the values of the signals in the With(...) expression to the function arguments of concatFunc. The value type of the created signal matches the return type of the function. The value of bothWords is automatically set by calling concatFunc(firstWord.Value(), secondWord.Value()). This happens to set the initial value. and when firstWord or secondWord have been changed.

The With utility function creates a SignalPack from a variable number of given signals. A single signal can be used directly as the first argument, but multiple signals are usually wrapped in SignalPack instead of passing them directly.

MakeSignal accepts any type of function, including lambdas:

MakeSignal(With(firstWord,secondWord), [] (string first, string second) {
    return first + string(" ") + second;
});

The code can be made even more concise, because the body of concatFunc consists of operators only:

SignalT<string> bothWords = firstWord + string( " " ) + secondWord;

The definition of bothWords now essentially is the body of the function used to calculate it. Most unary and binary operators (see Reference for details) are overloaded to automatically lift expressions with a signal operand to create new signals.

Reacting to value changes

Here's a signal that computes the absolute value of an integer:

USING_REACTIVE_DOMAIN(D)

VarSignalT<int> x    = MakeVar<D>(1);
SignalT<int>    xAbs = MakeSignal(x, [] (int x) { return std::abs(x); };
// There exist several overloads for std::abs().
// Otherwise, using std::abs directly would have worked, too.

We want to print out the current value of xAbs after every change. xAbs.Value() allows to get the current value at any time, but constantly polling that is not feasible. Technically, we could add the output during the computation itself:

SignalT<int> xAbs = MakeSignal(x, [] (int x) {
    auto result = std::abs(x);
    // Note: Don't actually do this, it's wrong.
    cout << "xAbs changed to " << result << endl;
    return result;
});

This is problematic, because now the function to compute xAbs is no longer a pure function. In general, a function used to calculate a signal value should do only that; it should not cause side effects or depend on values other than its arguments. This makes it easier to reason about the program behavior, and more importantly, you can allow the internal propagation engine to parallelize updates without having to worry about data races.

Instead of printing the value in the function that computes it, we create an observer of xAbs:

#include "Observer.h"

Observe(xAbs, [] (int newValue) {
    cout << "xAbs changed to " << newValue << endl;
});

The passed callback function is called when the value of the observed signal changes. Observers are meant to cause side effects. They don't return any value, so it's in fact all they can do.

By default, the lifetime of an observer is attached to its observed subject (that's sum in this case). It's also possible to detach observers manually, but this topic is covered in the Observers guide.

Here's an output sample:

            // initially x is 1
x <<=  2;   // output: xAbs changed to 2
x <<= -3;   // output: xAbs changed to 3
x <<=  3;   // no output, xAbs is still 3

Changing multiple inputs

Consider the following example:

#include "Observer.h"

USING_REACTIVE_DOMAIN(D)

VarSignalT<int> a = MakeVar<D>(1);
VarSignalT<int> b = MakeVar<D>(1);

SignalT<int>     x = a + b;
SignalT<int>     y = a + b;
SignalT<int>     z = x + y;
Observe(z, [] (int newValue) {
    std::cout << "z changed to " << newValue << std::endl;
});

a <<= 2;

How many outputs does this generate? After a is changed, the change is propagated to x and y. z depends on both of them, so it could be updated twice. But there will be just one output: z changed to 6. All propagation engines are guaranteed to be output-minimal. This means that for a single propagation turn, no observer is going be called more than once. The input to a executes a single turn. a <<= 2 <<= 3 executes two turns.

Does this also mean that internally z is only updated once? Not necessarily, but most engines are also update-minimal, which extends the previous guarantee to all reactive values, i.e. no reactive value is going to be updated more than once during a turn.

For

a <<= 2; // output: z changed to 6
b <<= 2; // output: z changed to 8

there are two outputs. Here's how to avoid the intermediate result:

D::DoTransaction([] {
    a <<= 2;
    b <<= 2; 
}); // output: z changed to 8

Input inside the function passed to DoTransaction does not immediately start a turn, but rather waits until the function returns. Then the changes of all inputs are propagated in a single turn. Besides offering a performance gain, this allows to process related inputs together.

If there are multiple inputs to the same signal, all but the last one are discarded:

a <<= 3; // output: z changed to 10

D::DoTransaction([] {
    a <<= 4;
    a <<= 3; 
}); // still 3, no change and no turn

Modifying inputs in-place

VarSignals require imperative input. So far, we've used .Set (or the <<= equivalent notation) to do that, but there might be situations where we want to modify the current signal value rather than replacing it:

using std::string;
using std::vector;

USING_REACTIVE_DOMAIN(D)

VarSignalT<vector<string>> data = MakeVar<D>(vector<string>{ });
auto v = data.Value(); // Copy
v.push_back("hello");  // Change
data <<= std::move(v); // Replace (using move to avoid extra copy)

Using this method, the new and old values will be compared internally, so in summary thats one copy, one comparison and two moves (one for the input, one after the comparison to apply the new value).

The following method allows us to eliminate these intermediate steps by modifying the current value in-place:

data.Modify([] (vector<string>& data) {
    data.push_back("Hello");
});

data.Modify([] (vector<string>& data) {
    data.push_back("World");
});

for (const auto& s : data.Value())
    cout << s << endl;
// output: Hello World

The drawback is that since we can not compare the old and new values, we loose the ability to detect whether the data was actually changed. We always have to assume that it did and re-calculate dependent signals.

Creating complex signals

Signals can be combined to create more complex expressions, as shown in the next example.

For two operands a and b, we want to compute sum, difference and product, get the respective expression as a string, put it into a pair together with the result and put those into a vector.

First, we set up some type definitions and helper functions:

USING_REACTIVE_DOMAIN(D)

using std::string;

using ExprPairT = std::pair<string,int>;
using ExprVectT = std::vector<ExprPairT>;

string makeExprStr(int a, int b, const char* op)
{
    return std::to_string(a) + string( op ) + std::to_string(b);
}

ExprPairT makeExprPair(const string& s, int v)
{
    return std::make_pair(s, v);
}

void printExpressions(const ExprVectT& expressions)
{
    cout << "Expressions: " << endl;
    for (const auto& p : expressions)
        cout << "\t" << p.first << " is " << p.second << endl;
}

From here on, we will explore multiple possible implemenations.

Version 1

In the first implemenation, we successively define intermediate signals:

    // Input operands
    VarSignalT<int> a = MakeVar<D>(1);
    VarSignalT<int> b = MakeVar<D>(2);

    // Calculations
    SignalT<int> sum  = a + b;
    SignalT<int> diff = a - b;
    SignalT<int> prod = a * b;

    using std::placeholders::_1;
    using std::placeholders::_2;

    // Stringified expressions
    SignalT<string> sumExpr =
        MakeSignal(With(a,b), bind(makeExprStr, _1, _2, "+"));

    SignalT<string> diffExpr =
        MakeSignal(With(a,b), bind(makeExprStr, _1, _2, "-"));

    SignalT<string> prodExpr =
        MakeSignal(With(a,b), bind(makeExprStr, _1, _2, "*"));

    // The expression vector
    SignalT<ExprVectT> expressions = MakeSignal(
        With(
            MakeSignal(With(sumExpr, sum),   &makeExprPair),
            MakeSignal(With(diffExpr, diff), &makeExprPair),
            MakeSignal(With(prodExpr, prod), &makeExprPair)
        ),
        [] (const ExprPairT& sumP, const ExprPairT& diffP, const ExprPairT& prodP) {
            return ExprVectT{ sumP, diffP, prodP};
        });
Observe(expressions, printExpressions);

This is essentially a purely functional approach. One issue is that all the intermediate helper signals are exposed, even if we are only interested in expressions. The next implementation addresses that.

Version 2

We define a helper function createExpressionSignal, which takes the two operand signals as inputs and returns the vector signal:

SignalT<ExprVectT> createExpressionSignal(const SignalT<int>& a, const SignalT<int>& b)
{
    using std::placeholders::_1;
    using std::placeholders::_2;

    auto sumExpr =
        MakeSignal(With(a,b), bind(makeExprStr, _1, _2, "+"));

    auto diffExpr =
        MakeSignal(With(a,b), bind(makeExprStr, _1, _2, "-"));

    auto prodExpr =
        MakeSignal(With(a,b), bind(makeExprStr, _1, _2, "*"));

    return MakeSignal(
        With(
            MakeSignal(With(sumExpr,  a + b), &makeExprPair),
            MakeSignal(With(diffExpr, a - b), &makeExprPair),
            MakeSignal(With(prodExpr, a * b), &makeExprPair)
        ),
        [] (const ExprPairT& sumP, const ExprPairT& diffP, const ExprPairT& prodP) {
            return ExprVectT{ sumP, diffP, prodP };
        });
}

// Input operands
VarSignalT<int> a = MakeVar<D>(1);
VarSignalT<int> b = MakeVar<D>(2);

// The expression vector
SignalT<ExprVectT> expressions = createExpressionSignal(a, b);

Is it a problem that expressions depends on signals that are out of scope after we leave the function? And if no, does that mean that there are memory leaks? Neither is the case. The actual signal - referred to as signal node from here on - is heap allocated. Signal and VarSignal have semantics of a shared pointer to that node. Furthermore, dependent signals share ownership of their inputs, so as long as expressions is still around sumP, diffP and prodP won't be destroyed either.

The previous example might seem unnecessarily complicated, but the declarative style does not have to be used for everything, as shown in the next version.

Version 3

Instead of defining intermediate signals, we can just use regular imperative style inside the function that generates expressions:

// Input operands
VarSignalT<int> a = MakeVar<D>(1);
VarSignalT<int> b = MakeVar<D>(2);

// The expression vector
SignalT<ExprVectT> expressions = MakeSignal(With(a,b), [] (int a, int b) {
    ExprVectT result;

    result.push_back(
        make_pair(
            makeExprStr(a, b, "+"),
            a + b));

    result.push_back(
        make_pair(
            makeExprStr(a, b, "-"),
            a - b));

    result.push_back(
        make_pair(
            makeExprStr(a, b, "*"),
            a * b));

    return result;
});

The result is the same, and of the 3 versions, this one seems to be the most feasible.

Details

Graph model

The interdependencies between signals can be modeled as a directed acyclic graph, where each signal is a node and edges denote a dependency relation. Viewing the reactive system as a graph is key to understanding two aspects: dataflow and memory management.

Consider the following example and the respective graph representations:

VarSignalT<int> a = MakeVar(...);
VarSignalT<int> b = MakeVar(...);
VarSignalT<int> c = MakeVar(...);
SignalT<int>    x = (a + b) * c;
Dataflow graph Reference graph
Drawing Drawing

In the dataflow graph, each node is a signal. If an edge exists from v1 to v2 that means v1 will propagate its changes to v2. In other words, after a node changed, all its sucessors will be updated. From a dataflow perspectve, VarSignals are sources. One observation taken from this example is that not all nodes in the graph are named signals; the temporary sub-expression a + b results in a node as well.

While the dataflow graph abstracts from certain details, the second graph shows a more complete picture of the implementation. Each circle represents the actual signal node that is created internally. The number inside the node denotes its reference count. The boxes on the left are the Signal and VarSignal proxies exposed to the API. They are lightweight objects that hold no data other than a shared pointers to the heap allocated signal nodes. As such, they have similar semantics to a std::shared_ptr, i.e. after the statement VarSignalT aCopy = a, the reference count of the node is incremented and both a and aCopy would point to the same node. If a new node is created, it takes shared ownership of its dependencies, because it needs them to calculate its own value.

Assuming the handles for a, b and c would go out of scope but x remains, that would still keep the reference count of all nodes at 1, until x disappears as well. Once that happens, the graph is deconstructed from the bottom up.

Cycles

It's generally not possible to construct cyclic graphs. In the following example

SignalT<U> a;
SignalT<U> b;

a = MakeSignal(b, ...);
b = MakeSignal(a, ...);

b is still invalid at the time of construction, because it's not linked to any node. As such, the code will compile, but fail a runtime assertion.

The exception to this rule are higher order signals:

VarSignalT<U>           in      = MakeVar(...);
VarSignalT<SignalT<U>>  outer   = MakeVar(a);

SignalT<U>              flat    = Flatten(outer);

// Cycle'd
outer <<= flat;

Creating cyclic graphs results in undefined behaviour.

Temporary signals

There's a difference between using MakeSignal and implicit lifting with operators:

VarSignalT<int> a = MakeVar(...);
VarSignalT<int> b = MakeVar(...);
VarSignalT<int> c = MakeVar(...);

// Implicit lifting
SignalT<int> xLift   = (a + b) * c;

// Explicit function
SignalT<int> xFunc = MakeSignal(
    Width(a,b,c),
    [] (int a, int b, int c) {
        (a + b) * c;
    });

While both versions calculate the same result, they are not equivalent. As shown by the graph earlier, the sub-expression a + b results in an extra node with lifting, whereas xFunc only creates a single node for the whole expression. That's because xLift is equivalent to

// Explicit function
SignalT<int> xLift = MakeSignal
(
    With
    (
        MakeSignal(With(a,b), AddFunctor( )),
        c
    ),
    MultFunctor( )
);

The overloaded operators create a new signal with a function that wraps their operation on the values of the signals.

This presents a dilemma: On the one hand, being able to use operators directly results in more readable code, on the other hand creating additional nodes is rather expensive. They are heap-allocated, cache their value, store their predecessors, the propagation engine has to process them, and the compiler can't optimize the expression.

To resolve this, all r-value signals are merged to a single node. This is requires a new type of signal: TempSignal. Unlike SignalT<U>, TempSignalT<U,Op> also stores information about it's underlying operation in the type. The following code shows the simplified principle:

TempSignalT<int,AddOp<int,int>>
    t = a + b;
TempSignalT<int,MultOp<AddOp<int,int>,int>>
    x = t * c;

When an r-value TempSignal is passed to an operator, it'll move all the data out of the previous node and merge it into the newly created one. Since the operation is statically encoded in the type, the merged function can be optmized by the compiler. Besides some overhead during creation due to the node merging, the extra cost of temporary nodes is completely mitigated.

TempSignals are usually not exposed outside of temporary expressions, because they get downcasted to regular signals:

SignalT<int> x = (a + b) * c;
  1. a + b returns a TempSignal t
  2. Since t is an r-value, it gets merged by t * c, which returns a TempSignal as well.
  3. By assigning the TempSignal to a Signal, the extra type information is erased.

There is one situation, where temporary signals stay around longer; that is, when using auto. This allows to request node merging manually, for example when creating a complex signal with several intermediate signals:

// t still has its TempSignal type
auto t = a + b; 

// explicitly merged t into x
auto x = std::move(t) * c;

Note that without the std::move, there would be no merging, as t may be a TempSignal but can't be bound to an r-value reference.

Internals

Internally, each SignalT<S> stores a value of type S. When a signal is updated, the following actions take place:

void Update()
{
    S newValue = evaluate(predecessors.Value() ...);

    if (currentValue != newValue)
    {
        currentValue = std::move(newValue);
        D::PropagationPolicy::OnChange(*this);
    }
}

The propagation engine is responsible for calling Update on successors. It may not do so immediately, but order updates so that other predecessors that require updating in the same turn come first. This allows to eliminate repeated re-calculations. It also ensures that all values of predecessors.Value() are consistently from the current turn.