This demo uses Materialize to tap into what’s happening on Twitch right now, and shows how far you can push standard SQL to explore streaming data.
First things first 👇
To work with data from Twitch, you need to register an app and get a hold of your app access tokens. If you already have an account, the process should be pretty smooth! After cloning this repo, remember to replace `<client_id>` and `<client_secret>` in the Kafka producer file with valid credentials.
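If you'd rather script the replacement, a one-liner like the following works (hypothetical invocation: substitute `<path-to-producer-file>` with the actual producer file in this repo, and note that macOS sed expects `-i ''`):
# Replace the placeholders with your Twitch app credentials
sed -i 's/<client_id>/YOUR_CLIENT_ID/g; s/<client_secret>/YOUR_CLIENT_SECRET/g' <path-to-producer-file>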
Warning: The setup in this repo does not run on M1 MacBooks. For an ARM-compatible version (using Redpanda as a drop-in replacement for Kafka), check out this repo.
We'll use Docker Compose to make it easier to bundle up all the services for our Twitch analytics pipeline:
# Start the setup
docker-compose up -d
# Is everything really up and running?
docker-compose ps
The data generator produces JSON-formatted events about active streams on Twitch into the `twitch-streams` Kafka topic. To check that the topic has been created:
docker-compose exec kafka kafka-topics \
--list \
--bootstrap-server kafka:9092
and that there's data landing:
docker-compose exec kafka kafka-console-consumer \
--bootstrap-server kafka:9092 \
--topic twitch-streams \
--from-beginning
Postgres is bootstrapped with the list of stream tags that can be used to describe live streams and categories. This reference data allows us to enrich `twitch-streams` events with the description for each `tag_id`.
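If you want to eyeball this reference data, you can query Postgres directly from the running container (assuming the service is named postgres in the compose file, with the same postgres/postgres credentials used for the connection string further down):
docker-compose exec postgres psql -U postgres -c \
  "SELECT tag_id, is_auto, localization_name FROM stream_tag_ids LIMIT 5;"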
To connect to the running Materialize service, we can use any compatible CLI. Because we're on Docker, let's roll with `mzcli`:
docker-compose run mzcli
1. The first step to consume JSON events from Kafka in Materialize is to create a Kafka source (reading the raw bytes for now; we'll parse them into JSON in the next step):
CREATE SOURCE kafka_twitch
FROM KAFKA BROKER 'kafka:9092' TOPIC 'twitch-streams'
KEY FORMAT BYTES
VALUE FORMAT BYTES
ENVELOPE UPSERT;
Why do we need `ENVELOPE UPSERT`? The Twitch Helix API returns events sorted by number of current viewers. This means that there might be duplicate or missing events as viewers join and leave a broadcast — which we'll have to deal with. As new events flow in for the same Kafka key (`id`), we're only ever interested in keeping the most recent values, so we'll use `ENVELOPE UPSERT` to make sure Materialize can also handle updates (and the associated retractions!).
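Before moving on, you can double-check that the source landed in the catalog:
SHOW SOURCES;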
2. The data is stored as raw bytes, so next we need to do some casting to convert it into a readable format (and appropriate data types):
CREATE VIEW v_twitch_stream_conv AS
SELECT CAST(data AS JSONB) AS data
FROM (
SELECT CONVERT_FROM(data, 'utf8') AS data
FROM kafka_twitch
);
CREATE VIEW v_twitch_stream AS
SELECT CAST(nullif((data->>'id')::string, '') AS bigint) AS id,
CAST(nullif((data->>'user_id')::string, '') AS int) AS user_id,
(data->>'user_login')::string AS user_login,
(data->>'user_name')::string AS user_name,
CAST(nullif((data->>'game_id')::string, '') AS int) AS game_id,
(data->>'game_name')::string AS game_name,
(data->>'type')::string AS type,
(data->>'title')::string AS title,
(data->>'viewer_count')::int AS viewer_count,
(data->>'started_at')::timestamp AS started_at,
(data->>'language')::string AS language,
(data->>'thumbnail_url')::string AS thumbnail_url,
       LIST[(data->'tag_ids'->>0)::string,
            (data->'tag_ids'->>1)::string,
            (data->'tag_ids'->>2)::string,
            (data->'tag_ids'->>3)::string,
            (data->'tag_ids'->>4)::string] AS tag_ids,
(data->>'is_mature')::boolean AS is_mature
FROM v_twitch_stream_conv;
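To get a quick look at the columns and data types Materialize derived for the view:
SHOW COLUMNS FROM v_twitch_stream;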
3. One way to connect to a Postgres database in Materialize is to use a Postgres source, which uses logical replication to continuously ingest changes and maintain freshly updated results:
CREATE MATERIALIZED SOURCE mz_source
FROM POSTGRES
CONNECTION 'host=postgres port=5432 user=postgres dbname=postgres password=postgres'
PUBLICATION 'mz_source';
CREATE VIEWS FROM SOURCE mz_source (stream_tag_ids);
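To confirm that the reference data is flowing in, you can query the replicated view directly (since `mz_source` is materialized, this works out of the box):
SELECT * FROM stream_tag_ids LIMIT 5;
With both sources in place, we can start asking questions. To keep a running count of streams and viewers per game: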
CREATE MATERIALIZED VIEW mv_agg_stream_game AS
SELECT game_id,
game_name,
COUNT(id) AS cnt_streams,
SUM(viewer_count) AS agg_viewer_cnt
FROM v_twitch_stream
WHERE game_id IS NOT NULL
GROUP BY game_id, game_name;
What are the top 10 games being played?
SELECT game_name,
cnt_streams,
agg_viewer_cnt
FROM mv_agg_stream_game
ORDER BY agg_viewer_cnt DESC
LIMIT 10;
Is anyone playing DOOM?
SELECT game_name,
cnt_streams,
agg_viewer_cnt
FROM mv_agg_stream_game
WHERE upper(game_name) LIKE 'DOOM%';
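Which streams kicked off in the last 15 minutes? Materialize supports temporal filters: using `mz_logical_timestamp()` (the current logical time, in milliseconds), we can keep just the streams whose `started_at` falls within the last 900000 milliseconds, and the view stays fresh as time ticks along: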
CREATE MATERIALIZED VIEW mv_stream_15min AS
SELECT title,
user_name,
game_name,
started_at
FROM v_twitch_stream
WHERE game_id IS NOT NULL
AND (mz_logical_timestamp() >= (extract('epoch' from started_at)*1000)::bigint
AND mz_logical_timestamp() < (extract('epoch' from started_at)*1000)::bigint + 900000);
SELECT * FROM mv_stream_15min ORDER BY started_at ASC LIMIT 10;
SELECT MIN(started_at) FROM mv_stream_15min;
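What are the most popular tags? To find out, we can unnest the `tag_ids` list and join the result against the `stream_tag_ids` reference data replicated from Postgres, skipping auto-generated tags: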
CREATE MATERIALIZED VIEW mv_agg_stream_tag AS
SELECT st.localization_name AS tag,
cnt_tag
FROM (
SELECT tg, COUNT(*) AS cnt_tag
FROM v_twitch_stream ts,
unnest(tag_ids) tg
WHERE game_id IS NOT NULL
GROUP BY tg
) un
JOIN stream_tag_ids st ON un.tg = st.tag_id AND NOT st.is_auto;
SELECT * FROM mv_agg_stream_tag ORDER BY cnt_tag DESC;
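And who is the most-watched streamer for each of the top 10 games? We can stack a (non-materialized) top 10 view on top of `mv_agg_stream_game`, and then use a `LATERAL` join to pick out the leading streamer within each game: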
CREATE VIEW v_stream_game_top10 AS
SELECT game_id, game_name, agg_viewer_cnt
FROM mv_agg_stream_game
ORDER BY agg_viewer_cnt DESC
LIMIT 10;
CREATE MATERIALIZED VIEW mv_stream_game_top10 AS
SELECT t.game_name, user_name, sum_viewer_cnt
FROM v_stream_game_top10 t,
LATERAL (
SELECT game_name, user_name, SUM(viewer_count) AS sum_viewer_cnt
FROM v_twitch_stream ts
WHERE t.game_id = ts.game_id
AND game_id IS NOT NULL
GROUP BY game_name, user_name
ORDER BY sum_viewer_cnt DESC
LIMIT 1
);
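To watch these results change without re-running the query, you can stream updates straight out of the view with `TAIL`, wrapped in `COPY` so mzcli prints rows as they arrive:
COPY (TAIL mv_stream_game_top10) TO STDOUT;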
To visualize the results in Metabase:
1. In a browser, navigate to localhost:3030 (or <host>:3030, if running on a VM).
2. Click Let's get started.
3. Complete the first set of fields asking for your email address. This information isn't crucial for anything, but the fields have to be filled in.
4. On the Add your data page, specify the connection properties for the Materialize database:
Field | Value
--- | ---
Database | PostgreSQL
Name | twitch
Host | materialized
Port | 6875
Database name | materialize
Database username | materialize
Database password | Leave empty
5. Click Ask a question -> Native query. You can find instructions to reproduce the dashboard below under metabase/README.md.
6. Under Select a database, choose twitch.
7. In the query editor, enter:
SELECT SUM(cnt_streams) FROM mv_agg_stream_game;
and hit Save. You need to do this for each visualization you're planning to add to the dashboard that Metabase prompts you to create (another example query is included after these steps).
8. Once you have a dashboard set up, you can manually set the refresh rate to 1 second by adding `#refresh=1` to the end of the URL and opening the modified URL in a new tab:
http://localhost:3030/dashboard/1-whats-streaming-on-twitch#refresh=1
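If you're looking for ideas for more cards, the tag aggregation from earlier makes for a nice bar chart; as with the query above, save it and add it to the dashboard:
SELECT tag, cnt_tag FROM mv_agg_stream_tag ORDER BY cnt_tag DESC LIMIT 10;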
- Improve the Kafka producer: In the future, it'd be preferable to use PubSub to subscribe to a topic for updates instead of periodically sending requests to the Twitch Helix API.
- Pre-load the Metabase dashboard: Include a backup of Metabase's embedded database (H2) with a bootstrapped dashboard to save users some time in this step.