This repository demonstrates how to process a stream of sensor data using Kafka Streams.
The sensors produce a stream of records, each consisting of a sensor ID, a timestamp, and the current state (on or off). The desired result is a stream of records enriched with the duration the sensor has been in that state.
For example, a stream
Name | Timestamp | State |
---|---|---|
Sensor 1 | 1984-01-22T15:45:00Z | off |
Sensor 1 | 1984-01-22T15:45:10Z | off |
Sensor 1 | 1984-01-22T15:45:30Z | on |
Sensor 1 | 1984-01-22T15:46:30Z | off |
should produce
Name | Timestamp | State | Duration |
---|---|---|---|
Sensor 1 | 1984-01-22T15:45:00Z | off | 10s |
Sensor 1 | 1984-01-22T15:45:00Z | off | 30s |
Sensor 1 | 1984-01-22T15:45:30Z | on | 60s |
This tells us that “Sensor 1” was “off” from 15:45:00 for 30 seconds and “on” from 15:45:30 for 60 seconds.
Note that the second “off” reading produced an intermediate result.
Duplicate readings of the same state generate intermediate results, and delayed readings (timestamps preceding previously seen values) are treated as errors.
These are deliberate choices and can easily be changed.
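A minimal sketch of these semantics, assuming hypothetical `Reading` and `ReadingWithDuration` types (the repository's actual model and logic differ in detail):

```java
import java.time.Duration;
import java.time.Instant;
import java.util.Optional;

// Hypothetical types for illustration; the real model lives in the `model` directory.
record Reading(String id, Instant time, boolean on) {}
record ReadingWithDuration(Reading reading, Duration duration) {}

final class DurationProcessor {
  private Reading last; // per-sensor state; kept in a state store in the real topology

  /** Enriches a reading with the time spent in the previous state. */
  Optional<ReadingWithDuration> process(Reading current) {
    if (last != null && current.time().isBefore(last.time())) {
      // Delayed reading: timestamps must not move backwards.
      throw new IllegalArgumentException("out-of-order reading for " + current.id());
    }
    Optional<ReadingWithDuration> result = Optional.ofNullable(last)
        .map(l -> new ReadingWithDuration(l, Duration.between(l.time(), current.time())));
    // A duplicate reading keeps the stored timestamp, so durations are always
    // measured from the moment the state was first entered.
    if (last == null || last.on() != current.on()) {
      last = current;
    }
    return result;
  }
}
```

Tracing the example readings through `process` reproduces the three result rows above, including the intermediate one for the duplicate “off” reading.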
Care has been taken to keep the business logic independent of implementation details like serialization formats.
The data model is in the `model` directory, the business logic in `logic`.
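For illustration, a serialization-agnostic model could be defined with Immutables along these lines; the type and accessor names here are invented, not the repository's actual API:

```java
import java.time.Instant;
import org.immutables.value.Value;

// Illustrative only: a plain data model with no ties to any wire format.
// JavaBeans-style accessors keep it compatible with MapStruct (see the
// section on hexagonal architecture below).
@Value.Immutable
public interface SensorState {
  String getId();
  Instant getTime();
  State getState();

  enum State { OFF, ON }
}
```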
The tests exercise the topology with nine different formats: Protocol Buffers, JSON, and Apache Avro, the Confluent variants of these three, XML, Apache Thrift, and Amazon Ion. Randomly chosen combinations of input, result, and state store formats are tested.
While this abstraction might not be necessary in practice, it demonstrates two important design considerations:
- The business logic should only depend on a data model, not on capabilities of the serialization mechanism.
  We can simply use `Duration::between`, a single call that is easy to understand and test, instead of cluttering our logic with conversions and unnecessary, error-prone calculations (see the sketch after this list).
- The choice of (de-)serializers should depend on the requirements, not on what happens to be at hand.
  While internal processing pipelines tend (but don't have) to use a single serialization mechanism, it is perfectly valid, and a good design decision, to use different mechanisms for the parts interfacing with external components.
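To make the first point concrete, here is the kind of call the logic can make when timestamps are plain `java.time.Instant` values (the values are taken from the example above):

```java
import java.time.Duration;
import java.time.Instant;

Instant stateEntered = Instant.parse("1984-01-22T15:45:00Z");
Instant current = Instant.parse("1984-01-22T15:45:30Z");

// One self-explanatory call instead of format-specific timestamp arithmetic.
Duration inState = Duration.between(stateEntered, current); // PT30S
```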
Since the business logic is independent of the serialization mechanism, changing the serialization is simple and usually does not require retesting the logic.
By refactoring the business logic to depend only on an abstract store, we speed up testing by a factor of seven (`bazel test //src/test/java/com/fillmore_labs/kafka/sensors/logic:all` vs. `bazel test //src/test/java/com/fillmore_labs/kafka/sensors/topology:all`), which demonstrates the potential for improvement in development speed and testability.
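The shape of such a store abstraction is not spelled out here, but a hypothetical version (reusing the illustrative `Reading` type from the sketch above) could be as small as this, letting the topology bind it to a Kafka Streams state store while unit tests use a plain map:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical abstraction: the topology wires this to a Kafka Streams
// KeyValueStore, while logic tests run against the in-memory variant.
public interface LastReadingStore {
  Reading load(String sensorId);
  void store(String sensorId, Reading reading);
}

// In-memory implementation for fast unit tests; no Kafka involved.
final class InMemoryLastReadingStore implements LastReadingStore {
  private final Map<String, Reading> map = new HashMap<>();

  @Override public Reading load(String sensorId) { return map.get(sensorId); }
  @Override public void store(String sensorId, Reading reading) { map.put(sensorId, reading); }
}
```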
You need Bazelisk installed; see also “Installing Bazel using Bazelisk”.
Using Chocolatey, enter

```sh
choco install bazelisk
```
Enable developer mode:

- Open Windows settings
- Go to “Update & security”, then “For developers”
- Under the “Developer Mode” section, enable “Install apps from any source, including loose files”

Alternatively, run with administrator privileges.
To run all tests, use

```sh
bazel test //src/test/...
```

To run a single test, use

```sh
bazel test //src/test/java/com/fillmore_labs/kafka/sensors/topology:all
```
The tests run against an embedded Kafka and a mock schema registry when necessary.
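For comparison, Kafka Streams also ships `TopologyTestDriver`, which exercises a topology without any broker at all; this is not the repository's actual test setup, and the topic names and serdes below are placeholders:

```java
import java.util.Properties;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;

// Sketch only: "input"/"result" topics and String serdes are illustrative.
class TopologyDriverSketch {
  void exercise(Topology topology) { // topology under test, built elsewhere
    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "topology-test");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");

    try (TopologyTestDriver driver = new TopologyTestDriver(topology, props)) {
      TestInputTopic<String, String> input =
          driver.createInputTopic("input", new StringSerializer(), new StringSerializer());
      TestOutputTopic<String, String> output =
          driver.createOutputTopic("result", new StringDeserializer(), new StringDeserializer());

      input.pipeInput("sensor-1", "...serialized reading...");
      // assert on output.readKeyValuesToList()
    }
  }
}
```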
The main app needs Kafka running on localhost, port 9092 (see application.yaml). There is a script that starts one:

```sh
scripts/kafka-server.sh
```
When Kafka has finished starting, create the topics in a different terminal:

```sh
scripts/kafka-topics.sh
```

Now start the main app:

```sh
bazel run //:kafka-sensors
```

Open another terminal to watch the results:

```sh
scripts/kafka-consume.sh
```

Publish sensor values:

```sh
scripts/kafka-produce.sh
```
Run the JMH microbenchmarks with

```sh
bazel run //:benchmark
```

Compare deserialization of two formats:

```sh
bazel run //:benchmark -- -p "format=proto,thrift" "Bench\\.deserialize"
```

Generate a flame graph for detailed analysis:

```sh
bazel run //:benchmark -- -p "format=proto" "Bench\\.deserialize" \
  -prof "async:output=flamegraph;direction=forward"
open "$(bazel info bazel-bin)/src/main/java/com/fillmore_labs/kafka/sensors/benchmark/benchmark.runfiles/com_fillmore_labs_kafka_sensors/com.fillmore_labs.kafka.sensors.benchmark.Bench.deserialize-AverageTime-format-proto/flame-cpu-forward.html"
```
Run the latest image on your Kubernetes cluster:

```sh
kubectl run serialization-benchmark --image=fillmorelabs/serialization-benchmark \
  --attach --rm --restart=Never -- -p "format=proto,json,json-iso" "Bench\\.serialize"
```
As noted in Implementation of Business Logic, the business logic is independent of the serialization, in the spirit of hexagonal architecture. This of course requires some mapping, for which we mostly use MapStruct. That necessitates some limitations in data model naming conventions: MapStruct uses a fixed and quite inflexible accessor naming strategy, so you can’t really decide that Protocol Buffers should follow one convention but Immutables another. Especially for Immutables we are forced to use the JavaBeans-style naming convention, although this is not a JEE application.
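For illustration, such a mapper might look like this; `SensorState` and `SensorStateDto` are hypothetical bean-style types, chosen to show the shared accessor naming MapStruct expects:

```java
import org.mapstruct.Mapper;
import org.mapstruct.factory.Mappers;

// SensorState and SensorStateDto are hypothetical types; MapStruct matches
// their getId()/getTime()/getState() accessors by name at compile time,
// which is exactly the naming constraint described above.
@Mapper
public interface SensorStateMapper {
  SensorStateMapper INSTANCE = Mappers.getMapper(SensorStateMapper.class);

  SensorState fromDto(SensorStateDto dto);
  SensorStateDto toDto(SensorState model);
}
```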