The fundamental units of systems built with LabGraph are messages, streams, topics, nodes, groups, and graphs.
Together, these concepts allow us to construct real-time computational [graphs](https://en.wikipedia.org/wiki/Graph_(discrete_mathematics)): networks of algorithms that work together to produce a result within a time constraint. LabGraph is designed to let us iterate on systems built this way as efficiently as possible. Because nodes in such a graph can be swapped in and out quickly, we can rapidly try out different hardware sensors, preprocessing pipelines, machine learning models, and user interfaces.
In this overview we will use a toy example of a simple visualization. The example is intended to illustrate the end-to-end combination of a data source, processing step, and user interface. While this may not seem like a useful piece of software to build, this example is a jumping-off point that is extensible to any combination of hardware, processing, and UI.
A message is a piece of data with a consistent structure. For example:
```python
import numpy as np
import labgraph.v1 as lg


class RandomMessage(lg.Message):
    timestamp: float
    data: np.ndarray
```
Here, we define two fields:
- `timestamp`, a float that associates a timestamp with the data
- `data`, a NumPy array that stores some data (which we will generate randomly in this example)
This should look familiar if you have used dataclasses or named tuples in Python: both allow the concise construction of simple data-wrapping classes using Python type hints. Like these standard-library tools, LabGraph relies on Python type hints to let users specify how data is composed. As you'll see when we look at groups, LabGraph relies heavily on type hints for composition in general.
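To make the role of type hints concrete, here is a minimal sketch that uses only the standard library (no LabGraph). It shows that annotations written in a class body are ordinary runtime data that a framework can read back. `SampleRecord` and `SampleTuple` are hypothetical stand-ins for a message type, and `List[float]` stands in for the NumPy array so the sketch has no third-party dependencies. It does not describe how LabGraph itself processes annotations.

```python
from dataclasses import dataclass, fields
from typing import List, NamedTuple, get_type_hints


@dataclass
class SampleRecord:
    # Hypothetical stand-in for a message type; not a LabGraph class.
    timestamp: float
    data: List[float]


class SampleTuple(NamedTuple):
    # The same structure expressed as a typed named tuple.
    timestamp: float
    data: List[float]


# The annotations are available at runtime as a name -> type mapping.
record_hints = get_type_hints(SampleRecord)
tuple_hints = get_type_hints(SampleTuple)

# dataclasses reads the same annotations to generate __init__ and the field list.
field_names = [f.name for f in fields(SampleRecord)]

record = SampleRecord(timestamp=0.0, data=[0.1, 0.2])
```

Both declarations yield the same mapping of field names to types, which is the information a framework needs in order to build, validate, or serialize instances.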
To create a message, we simply create an instance of the `RandomMessage` class:
```python
import time

random_message = RandomMessage(
    timestamp=time.time(), data=np.random.rand(100)
)
# Now LabGraph can use random_message
```
Suppose we were running an experiment where someone presses a button, and we wanted to include the state of the button with every frame. We could just add a `button_pressed` field to `RandomMessage`, but this would couple `RandomMessage` to that experiment and make it harder to reuse in other experiments. Instead, we can compose message types via class inheritance (in the same way that the `dataclasses` module allows):
```python
class CustomMessage(RandomMessage):
    button_pressed: bool
```
Then `CustomMessage` retains all the fields of `RandomMessage`, and we can instantiate it like so:
```python
custom_message = CustomMessage(
    timestamp=time.time(),
    data=np.random.rand(100),
    button_pressed=True,
)
```
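Since the text compares this to inheritance in the `dataclasses` module, the following sketch shows that standard-library behavior on its own. `BaseRecord` and `ButtonRecord` are hypothetical analogues of `RandomMessage` and `CustomMessage`, and a plain list stands in for the NumPy array. This is an analogy, not LabGraph code.

```python
from dataclasses import dataclass, fields
from typing import List


@dataclass
class BaseRecord:
    # Hypothetical analogue of RandomMessage.
    timestamp: float
    data: List[float]


@dataclass
class ButtonRecord(BaseRecord):
    # Hypothetical analogue of CustomMessage: adds one experiment-specific field.
    button_pressed: bool


record = ButtonRecord(timestamp=1.5, data=[0.0, 1.0], button_pressed=True)

# Inherited fields come first, followed by the subclass's own fields.
names = [f.name for f in fields(ButtonRecord)]

# Code written against the base type still accepts the extended record.
is_base = isinstance(record, BaseRecord)
```

The base type stays free of experiment-specific fields, while anything that consumes `BaseRecord` can still be handed a `ButtonRecord`.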
A stream is a sequence of messages that is accessible in real time. Streams are a useful construct when we want to build systems that respond in real time to input from a hardware device. In this example we will stream `RandomMessage` instances, which is not particularly useful in itself, but in an experimental setting you could imagine a stream of sensor readings instead.
A topic is a named reference to a stream. This is useful because we usually want to refer to a stream by different names in different contexts.
We'll see how to create topics when we discuss nodes below, and we'll see how to create streams when we discuss groups.
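Before looking at the real API, it may help to see the relationship between streams and topics as a toy model. The sketch below is plain Python and makes no claim about LabGraph's internals: `ToyStream` and the topic names `producer.OUTPUT` and `consumer.INPUT` are hypothetical, chosen only to show two names referring to one underlying sequence of messages.

```python
from typing import Any, Dict, List


class ToyStream:
    """Hypothetical stand-in for a stream: an append-only sequence of messages."""

    def __init__(self) -> None:
        self.messages: List[Any] = []

    def send(self, message: Any) -> None:
        self.messages.append(message)


# One underlying stream ...
sensor_stream = ToyStream()

# ... referred to by two topic names, as seen from two different contexts.
topics: Dict[str, ToyStream] = {
    "producer.OUTPUT": sensor_stream,
    "consumer.INPUT": sensor_stream,
}

# Publishing through one name makes the message visible through the other.
topics["producer.OUTPUT"].send({"timestamp": 0.0, "data": [0.5]})
received = topics["consumer.INPUT"].messages
```

In this model a topic is just a name bound to a stream, so each component can keep its own local naming while sharing data with others.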
A node is a class that describes a self-contained algorithm that operates on topics. Nodes have the following properties:
- They define topics which act as inputs and outputs for streams
- They describe their algorithms using methods that are marked with special LabGraph "decorators"
- They are guaranteed to always be running in the same Python process throughout the life of a graph, with access to the same memory. This means a node can store state that is unique to the algorithm it implements.
Here is an example of a node that takes a rolling average of some data:
```python
from dataclasses import field
from typing import List


class RollingState(lg.State):
    # Messages received within the current averaging window
    messages: List[RandomMessage] = field(default_factory=list)


class RollingConfig(lg.Config):
    window: float  # Window length, in seconds


class RollingAverager(lg.Node):
    INPUT = lg.Topic(RandomMessage)
    OUTPUT = lg.Topic(RandomMessage)

    state: RollingState
    config: RollingConfig

    @lg.subscriber(INPUT)
    @lg.publisher(OUTPUT)
    async def average(self, message: RandomMessage) -> lg.AsyncPublisher:
        current_time = time.time()
        self.state.messages.append(message)
        # Drop messages that have fallen out of the window
        self.state.messages = [
            message
            for message in self.state.messages
            if message.timestamp >= current_time - self.config.window
        ]
        if len(self.state.messages) == 0:
            return
        all_data = np.stack([message.data for message in self.state.messages])
        mean_data = np.mean(all_data, axis=0)
        yield self.OUTPUT, RandomMessage(timestamp=current_time, data=mean_data)
```
First, we define some state and configuration (see Lifecycles and Configuration for details). Then we define the `RollingAverager` node:
- We define two topics, `INPUT` and `OUTPUT`, which are references to streams of type `RandomMessage`. These topics are local to the `RollingAverager` node; it is the only node that can interact with them. When we construct a graph, we can point these topics at whatever streams we want.
- We define a method `average` to describe the averaging algorithm.
  - The subscriber decorator indicates that this method subscribes to the `INPUT` topic. Whenever a message is sent into the stream that the `INPUT` topic points to, the `average` method will be called with that message.
  - The publisher decorator indicates that this method publishes to the `OUTPUT` topic.
  - The `async` keyword indicates that this is an asyncio-enabled method. asyncio is a standard-library Python module for writing concurrent code. `AsyncPublisher` is a type hint provided by LabGraph to help type publisher functions.
  - The averaging algorithm produces an average sample by looking at all previous samples that fall within the window.
  - The `yield` keyword is used to publish messages using asyncio. We yield a tuple containing the topic and the message to be published.
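The windowing logic and the `async`/`yield` pattern described above can be tried out without LabGraph. The following is a minimal sketch using only the standard library. `rolling_mean` and `collect` are hypothetical names, scalar values stand in for NumPy arrays, and, unlike the node above, the sketch treats each sample's own timestamp as "now" instead of calling `time.time()`, so that the result is deterministic.

```python
import asyncio
from typing import AsyncIterator, Iterable, List, Tuple

Sample = Tuple[float, float]  # (timestamp, value)


async def rolling_mean(
    samples: Iterable[Sample], window: float
) -> AsyncIterator[Sample]:
    """Yield (timestamp, mean) for each sample, averaged over the trailing window."""
    kept: List[Sample] = []
    for timestamp, value in samples:
        kept.append((timestamp, value))
        # Keep only samples whose timestamp falls inside the window.
        kept = [(t, v) for t, v in kept if t >= timestamp - window]
        # A function that is both `async def` and uses `yield` is an async generator.
        yield timestamp, sum(v for _, v in kept) / len(kept)


async def collect(samples: Iterable[Sample], window: float) -> List[Sample]:
    # `async for` (here in a comprehension) consumes the async generator.
    return [item async for item in rolling_mean(samples, window)]


results = asyncio.run(collect([(0.0, 1.0), (1.0, 3.0), (3.0, 5.0)], window=2.0))
```

With a 2-second window, `results` is `[(0.0, 1.0), (1.0, 2.0), (3.0, 4.0)]`: by the third sample, the sample at time 0.0 has fallen out of the window, so only the last two values are averaged.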
A group is a container that packages functionality for reuse, much like a node does. A group can contain nodes, as well as other groups. As a result, groups enable composition and swappability of subgraphs.
Suppose we wanted a way to quickly package together a data source and a transformation of that data. Here is a `Generator` node, as well as a group that packages it with `RollingAverager` above:
```python
import asyncio


class GeneratorConfig(lg.Config):
    sample_rate: float
    num_features: int


class Generator(lg.Node):
    OUTPUT = lg.Topic(RandomMessage)
    config: GeneratorConfig

    @lg.publisher(OUTPUT)
    async def generate_noise(self) -> lg.AsyncPublisher:
        while True:
            yield self.OUTPUT, RandomMessage(
                timestamp=time.time(), data=np.random.rand(self.config.num_features)
            )
            # Pace the output at roughly sample_rate messages per second
            await asyncio.sleep(1 / self.config.sample_rate)


class AveragedNoiseConfig(lg.Config):
    sample_rate: float
    num_features: int
    window: float


class AveragedNoise(lg.Group):
    OUTPUT = lg.Topic(RandomMessage)

    config: AveragedNoiseConfig
    GENERATOR: Generator
    ROLLING_AVERAGER: RollingAverager

    def connections(self) -> lg.Connections:
        return (
            (self.GENERATOR.OUTPUT, self.ROLLING_AVERAGER.INPUT),
            (self.ROLLING_AVERAGER.OUTPUT, self.OUTPUT),
        )

    def setup(self) -> None:
        # Cascade configuration to contained nodes
        self.GENERATOR.configure(
            GeneratorConfig(
                sample_rate=self.config.sample_rate,
                num_features=self.config.num_features,
            )
        )
        self.ROLLING_AVERAGER.configure(RollingConfig(window=self.config.window))
```
- We create an output topic for the group; other groups that contain this group can connect to this topic.
- We assign a configuration to this group that contains all the information both nodes require.
- We add the type annotations `GENERATOR` and `ROLLING_AVERAGER` to indicate that each `AveragedNoise` group contains these nodes.
- The `connections` method lets us specify how topics will be connected. When two topics are connected, they point to the same stream at runtime. In this case we specify that the `OUTPUT` topic of the `Generator` will be connected to the `INPUT` topic of the `RollingAverager`. We also connect the `OUTPUT` topic of the `RollingAverager` to the `OUTPUT` topic of `AveragedNoise`, which exposes the output stream from `RollingAverager`.
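One way to picture "connected topics point to the same stream" is to treat each connection as an edge and group topics into connected sets, one set per stream. The sketch below does this with a small union-find in plain Python. `resolve_streams` is a hypothetical helper and the topic names are written as strings for illustration; this is a toy model, not a description of how LabGraph resolves connections internally.

```python
from typing import Dict, Iterable, List, Tuple


def resolve_streams(connections: Iterable[Tuple[str, str]]) -> List[List[str]]:
    """Group connected topic names; each group would share one stream."""
    parent: Dict[str, str] = {}

    def find(topic: str) -> str:
        # Return the representative of the set containing `topic`.
        parent.setdefault(topic, topic)
        while parent[topic] != topic:
            parent[topic] = parent[parent[topic]]  # path halving
            topic = parent[topic]
        return topic

    for left, right in connections:
        # Merge the two sets that the connected topics belong to.
        parent[find(left)] = find(right)

    groups: Dict[str, List[str]] = {}
    for topic in parent:
        groups.setdefault(find(topic), []).append(topic)
    return sorted(sorted(group) for group in groups.values())


streams = resolve_streams(
    [
        ("GENERATOR.OUTPUT", "ROLLING_AVERAGER.INPUT"),
        ("ROLLING_AVERAGER.OUTPUT", "OUTPUT"),
    ]
)
```

For the two connections in `AveragedNoise`, this yields two groups: one shared by the generator's output and the averager's input, and one shared by the averager's output and the group's own `OUTPUT` topic.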
Now we can use `AveragedNoise` as if it were a node: it is a self-contained module that we can reuse anywhere we want.
A graph is a complete set of topics and nodes that describes a functioning system. It is actually a group, but it is assumed to be the outermost container for the description of a system. Suppose we want to display a live visualization using our group from earlier. We can define a graph as follows:
```python
from typing import Tuple

# SAMPLE_RATE, NUM_FEATURES, WINDOW, and REFRESH_RATE are module-level constants,
# and Plot / PlotConfig are the plotting node and its configuration; their
# definitions are left out here.


class Demo(lg.Graph):
    AVERAGED_NOISE: AveragedNoise
    PLOT: Plot

    def setup(self) -> None:
        self.AVERAGED_NOISE.configure(
            AveragedNoiseConfig(
                sample_rate=SAMPLE_RATE, num_features=NUM_FEATURES, window=WINDOW
            )
        )
        self.PLOT.configure(
            PlotConfig(refresh_rate=REFRESH_RATE, num_bars=NUM_FEATURES)
        )

    def connections(self) -> lg.Connections:
        return ((self.AVERAGED_NOISE.OUTPUT, self.PLOT.INPUT),)

    def process_modules(self) -> Tuple[lg.Module, ...]:
        return (self.AVERAGED_NOISE, self.PLOT)
```
- We add the `AVERAGED_NOISE` group from before, as well as a `Plot` node (implementation left out) that displays a live bar graph in a UI.
- Since graphs are groups, we can use `connections()` to connect `AveragedNoise` to `Plot`.
- Graphs have the additional functionality of being able to define `process_modules()`, which lets us specify the process architecture of the groups and nodes the graph contains. In this case, by returning `(self.AVERAGED_NOISE, self.PLOT)` from this method, we cause the `AveragedNoise` group and the `Plot` node to run in separate processes. If we excluded this method, everything would run in a single process. In this way, `process_modules()` lets us concisely specify how a graph should be parallelized.
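The effect of `process_modules()` on this graph can be pictured as a mapping from nodes to processes. The sketch below is a toy model in plain Python, not LabGraph's scheduler: `DEMO_MODULES` and `assign_processes` are hypothetical, and the rule that any module not listed falls back to a shared `"main"` process is an assumption made for illustration (the text above only states the behavior when the method is omitted entirely).

```python
from typing import Dict, List, Sequence

# Hypothetical description of the Demo graph: each top-level module maps to
# the nodes it contains (a bare node contains only itself).
DEMO_MODULES: Dict[str, List[str]] = {
    "AVERAGED_NOISE": ["GENERATOR", "ROLLING_AVERAGER"],
    "PLOT": ["PLOT"],
}


def assign_processes(
    modules: Dict[str, List[str]], process_modules: Sequence[str]
) -> Dict[str, str]:
    """Map each node to a process label.

    Nodes inside a listed module share that module's process; all remaining
    nodes share a single "main" process (an assumption of this sketch).
    """
    assignment: Dict[str, str] = {}
    for module, nodes in modules.items():
        label = module if module in process_modules else "main"
        for node in nodes:
            assignment[node] = label
    return assignment


# Mirrors returning (self.AVERAGED_NOISE, self.PLOT) from process_modules().
split = assign_processes(DEMO_MODULES, ("AVERAGED_NOISE", "PLOT"))

# Mirrors leaving process_modules() out: everything shares one process.
single = assign_processes(DEMO_MODULES, ())
```

In the split case, `GENERATOR` and `ROLLING_AVERAGER` share one process because they live in the same group, while `PLOT` gets its own.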
Then we can run the graph using `lg.run()`:
```python
if __name__ == "__main__":
    lg.run(Demo)
```
That's it! This runs the graph, which generates noise, averages it over a rolling window, and displays the result in real time.