This project offers a seamless SQL model for efficiently prototyping event-sourcing and event-streaming using a Postgres database.
Check the schema.sql and extensions.sql! It is all there! No additional tools, frameworks, or programming languages are required at this level.
- Run Postgres
- Examples of usage
  - Event Sourcing
  - Event Streaming
    - 4. Registering a (materialized) view `view1` with 1 second polling frequency, starting from 28th Jan.
    - 5. Appending two events for another decider `2ac37f68-9d66-11ed-a8fc-0242ac120002`.
    - 6a. Stream the events to concurrent consumers/views
    - 6b. Stream the events to concurrent consumers / edge-functions (views)
- Design
- fmodel
- Try YugabyteDB
- References and further reading
This model enables and supports:
- the `event-sourcing` data pattern (using a Postgres database) to durably store events
  - Append events to the ordered, append-only log, using `entity id`/`decider id` and `decider` type as a key
  - Load all the events for a single entity/decider, in an ordered sequence, using the `entity id`/`decider id` and `decider` type as a key
  - Support optimistic locking/concurrency
- `event-streaming` to concurrently coordinate reads over a stream of messages from multiple consumer instances/views
  - Support real-time concurrent consumers to project events to view/query models
  - Acknowledge that the event with `decider_id` and `offset` is successfully processed by the view / ACK
  - Acknowledge that the event with `decider_id` is NOT processed by the view, and the view will process it again automatically / NACK
  - (Optionally) Acknowledge that the event with `decider_id` is NOT processed by the view, and the view will process it again after a delay / SCHEDULE NACK
Every decider/entity stream of events represents an independent Kafka-like partition. The events within a partition are ordered. There is no ordering guarantee across different partitions.
The API is a set of SQL functions that you can use to interact with the database. You can use them in your application. The API is what you would expect from a typical event-sourcing and event-streaming database.
| SQL function / API | event-sourcing | event-streaming | description |
|---|---|---|---|
| `register_decider_event` | ✔️ | ❌ | Register a decider and event types that it can publish |
| `append_event` | ✔️ | ❌ | Append/Insert new event to the database `events` table |
| `get_events` | ✔️ | ❌ | Get/List events for the decider |
| `get_last_event` | ✔️ | ❌ | Get last event for the decider |
| `register_view` | ❌ | ✔️ | Register a view to stream events to |
| `stream_events` | ❌ | ✔️ | Stream events to the view/concurrent consumers |
| `ack_event` | ❌ | ✔️ | Acknowledge that event with `decider_id` and `offset` is successfully processed by the view |
| `nack_event` | ❌ | ✔️ | Acknowledge that event with `decider_id` is NOT processed by the view, and the view will process it again |
| `schedule_nack_event` | ❌ | ✔️ | Acknowledge that event with `decider_id` is NOT processed by the view, and the view will process it again after delay |
| `scedule_events` (cron extension) | ❌ | ✔️ | Schedule events to be published |
It is a Supabase Docker image of Postgres, with extensions installed:
Notice that we only need these two extensions to publish events to edge-functions/HTTP endpoints/serverless applications, as explained in section 6b below.
If you do not need to publish events directly to your serverless applications, vanilla Postgres will work just fine!
You can run the following command to start Postgres in a Docker container:
docker compose up -d
These examples are using SQL to interact with the database. Hopefully, you will find them useful, and you can use them in your application.
Import the schema.sql (imported by default) and extensions.sql (not imported!) into your database.
The `deciders` table controls the decider and event names/types that can be used in the `events` table itself through composite foreign keys.
It must be populated before events can be appended to the main table called `events`.
SELECT *
from register_decider_event('decider1', 'event1', 'description1', 1);
SELECT *
from register_decider_event('decider1', 'event2', 'description2', 1);
Multiple constraints are applied to the `events` table to ensure bad events do not make their way into the system.
This includes duplicated events, incorrect naming (event and decider names cannot be misspelled, and the client cannot insert an event from the wrong decider), ensured sequential events, disallowed delete, and disallowed update.
Notice how `previous_id` of the second event points to `event_id` of the first event (effectively implementing optimistic locking).
SELECT *
from append_event('event1', '21e19516-9bda-11ed-a8fc-0242ac120002', 'decider1', 'f156a3c4-9bd8-11ed-a8fc-0242ac120002',
'{}', 'f156a3c4-9bd8-11ed-a8fc-0242ac120002', null, 1);
SELECT *
from append_event('event2', 'eb411c34-9d64-11ed-a8fc-0242ac120002', 'decider1', 'f156a3c4-9bd8-11ed-a8fc-0242ac120002',
'{}', 'f156a3c4-9bd8-11ed-a8fc-0242ac120002', '21e19516-9bda-11ed-a8fc-0242ac120002', 1);
SELECT *
from get_events('f156a3c4-9bd8-11ed-a8fc-0242ac120002', 'decider1');
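The `previous_id` check described above can be pictured with a small in-memory model. This is only a sketch in Python, not the SQL from schema.sql: the class `InMemoryEventLog`, the `Event` dataclass, and `ConcurrencyError` are hypothetical names, and in the real model the rule is enforced by constraints inside Postgres. It assumes that the first event of a decider carries no `previous_id` and that every later event must name the current last event of the same decider.

```python
from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple


class ConcurrencyError(Exception):
    """Raised when previous_id does not match the last event of the decider."""


@dataclass(frozen=True)
class Event:
    event: str
    event_id: str
    decider: str
    decider_id: str
    previous_id: Optional[str]


class InMemoryEventLog:
    """Hypothetical stand-in for the events table, keyed by (decider, decider_id)."""

    def __init__(self) -> None:
        self._streams: Dict[Tuple[str, str], List[Event]] = {}

    def append_event(self, e: Event) -> Event:
        stream = self._streams.setdefault((e.decider, e.decider_id), [])
        last_id = stream[-1].event_id if stream else None
        # Optimistic locking: the writer must name the event it believes is last.
        if e.previous_id != last_id:
            raise ConcurrencyError(f"expected previous_id={last_id!r}, got {e.previous_id!r}")
        stream.append(e)
        return e

    def get_events(self, decider_id: str, decider: str) -> List[Event]:
        return list(self._streams.get((decider, decider_id), []))


log = InMemoryEventLog()
d = "f156a3c4-9bd8-11ed-a8fc-0242ac120002"
first = log.append_event(Event("event1", "21e19516-9bda-11ed-a8fc-0242ac120002", "decider1", d, None))
second = log.append_event(Event("event2", "eb411c34-9d64-11ed-a8fc-0242ac120002", "decider1", d, first.event_id))

# A stale writer that still believes event1 is the last event is rejected.
try:
    log.append_event(Event("event2", "00000000-0000-0000-0000-000000000000", "decider1", d, first.event_id))
    stale_rejected = False
except ConcurrencyError:
    stale_rejected = True
```

Two writers that load the same stream and both try to append will therefore conflict on `previous_id`: one succeeds, the other has to reload the events and decide again.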
The `View` must be registered before events can be streamed to it.
This streaming is Kafka-like, in that it models the concept of partitions and offsets.
Every unique stream of events for a single deciderId/entityId is a partition.
The `Lock` table is used to prevent concurrent reads of the same partition, guaranteeing that only one consumer can read from a partition at a time, which preserves the ordering within the partition on the reading side.
You can configure the `view` to publish event(s) every 1 second, starting from 28th Jan, 2023, with a lock/ACK timeout of 300 seconds (if you don't acknowledge that you processed the event within 300 seconds, the lock will be released and the event will be published again, automatically).
Notice how the `lock` for the two events with `decider_id`=`f156a3c4-9bd8-11ed-a8fc-0242ac120002` is created in the background (using triggers).
SELECT *
from register_view('view1', '2023-01-28 12:17:17.078384', 300, 1, 'https://localhost:3000/functions/v1/event-handler');
The mere existence of the View changes how `append_event` works. It now creates a new event and also updates the lock table:
- `offset` / current offset of the event stream for `decider_id`
- `offset_final` / an indicator if the offset is final / offset will not grow anymore
Notice how `previous_id` of the second event points to `event_id` of the first event.
Notice how an additional `lock` for the registered view and the two new events with `decider_id`=`2ac37f68-9d66-11ed-a8fc-0242ac120002` is created in the background (using triggers).
SELECT *
from append_event('event1', 'f7c370aa-9d65-11ed-a8fc-0242ac120002', 'decider1', '2ac37f68-9d66-11ed-a8fc-0242ac120002',
'{}', 'f156a3c4-9bd8-11ed-a8fc-0242ac120002', null, 1);
SELECT *
from append_event('event2', '42ee177e-9d66-11ed-a8fc-0242ac120002', 'decider1', '2ac37f68-9d66-11ed-a8fc-0242ac120002',
'{}', 'f156a3c4-9bd8-11ed-a8fc-0242ac120002', 'f7c370aa-9d65-11ed-a8fc-0242ac120002', 1);
The `stream_events` function is used to stream events to the view.
Every time an event is read, the lock table is updated to acquire a lock on that partition.
You can:
- unlock the partition with the `ack_event` function / acknowledge that the event with `decider_id` and `offset` is processed by the view
- unlock the partition with the `nack_event` function / acknowledge that the event with `decider_id` is NOT processed by the view, and the view should try to process it again / offset is not updated
- schedule the partition for retry with the `schedule_nack_event` function / acknowledge that the event with `decider_id` is NOT processed by the view, and the view should try to process it again after some time / offset is not updated
Notice that this query can run in a loop within your application.
-- Get first 100 events
SELECT * from stream_events('view1', 100);
SELECT * from ack_event('view1', 'f156a3c4-9bd8-11ed-a8fc-0242ac120002', 1);
-- ACK other 99 events, and call `stream_events` again to get the next 100 events.
-- If you do not ACK the events in 300 seconds as configured on the `view` table, they will be processed again on the next call to `stream_events`.
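The lock/ACK cycle above can be illustrated with a small in-memory model. This is a sketch in Python under stated assumptions, not the behaviour of the SQL functions themselves: the class `ViewLocks` and its fields are hypothetical, offsets are assumed to be 1-based (matching the `ack_event(..., 1)` call above), and each call is assumed to hand out at most one event per unlocked partition. Time is passed in explicitly as `now` (in seconds) so the timeout is easy to follow.

```python
from typing import Dict, List, Tuple


class ViewLocks:
    """Hypothetical in-memory model of the lock table for a single view."""

    def __init__(self, partitions: Dict[str, List[str]], timeout_s: float) -> None:
        self.partitions = partitions                      # decider_id -> ordered events
        self.timeout_s = timeout_s                        # lock/ACK timeout
        self.offset = {d: 0 for d in partitions}          # last ACKed offset
        self.locked_until = {d: 0.0 for d in partitions}  # 0.0 means unlocked

    def stream_events(self, limit: int, now: float) -> List[Tuple[str, int, str]]:
        """Return up to `limit` events, one per unlocked partition, and lock them."""
        out: List[Tuple[str, int, str]] = []
        for d, events in self.partitions.items():
            if len(out) == limit:
                break
            if now < self.locked_until[d] or self.offset[d] >= len(events):
                continue  # partition is locked or fully processed
            next_offset = self.offset[d] + 1
            out.append((d, next_offset, events[next_offset - 1]))
            self.locked_until[d] = now + self.timeout_s
        return out

    def ack_event(self, decider_id: str, offset: int) -> None:
        self.offset[decider_id] = offset       # advance the offset and unlock
        self.locked_until[decider_id] = 0.0

    def nack_event(self, decider_id: str) -> None:
        self.locked_until[decider_id] = 0.0    # unlock, offset is not updated


locks = ViewLocks({"a": ["event1", "event2"], "b": ["event1"]}, timeout_s=300)
batch1 = locks.stream_events(100, now=0)        # one event from each partition
while_locked = locks.stream_events(100, now=1)  # nothing: both partitions are locked
locks.ack_event("a", 1)                         # partition "a" moves on to offset 2
locks.nack_event("b")                           # partition "b" will be retried
batch2 = locks.stream_events(100, now=2)
```

Because a partition stays locked until it is ACKed, NACKed, or the timeout expires, two consumers polling the same view never process the same partition at the same time, which is what keeps events ordered per decider.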
Import the extensions.sql into your database.
It is very similar to the 6a case. The difference is that the cron job will run `SELECT * from stream_events('view1');` for you, and publish event(s) to your edge-functions/HTTP endpoints automatically. So, the database does all the work.
The `cron` job is managed (created/deleted) by triggers on the `view` table. So, whenever you register a new View, the cron job will be created automatically.
The SQL functions and schema we provide will help you persist, query, and stream events in a robust way, but the decision-making and view-handling logic is something you have to implement on your own.
- The decision-making process is a command handler responsible for handling the command/intent and producing new events/facts that can be saved in the database by using the `append_event` SQL function. The command handler can be implemented in any programming language: Kotlin, TypeScript, Rust, ...
- The view-handling process is an event handler responsible for handling the event/fact and producing a new view/query model. Either the event handler uses the `stream_events` SQL function from your application to fetch/poll events, or the `stream_events` SQL function is triggered by the cron job on the DB side and event(s) are published/pushed to your event handlers/HTTP endpoints/edge functions.
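The split between the two handlers can be sketched as follows. This is an illustrative Python sketch, not part of this project: the bank-account domain, the functions `decide`, `evolve`, `handle_command`, and `handle_event`, and the in-memory `EVENTS` list (standing in for the `get_events`, `append_event`, and `stream_events` SQL functions) are all hypothetical.

```python
from typing import Dict, List

# Hypothetical in-memory stand-in for the events table.
EVENTS: List[Dict] = []


def get_events(decider_id: str) -> List[Dict]:
    return [e for e in EVENTS if e["decider_id"] == decider_id]


def append_event(event: Dict) -> None:
    EVENTS.append(event)


def decide(command: Dict, state: int) -> List[Dict]:
    """Pure decision logic: a command plus the current state yields new events."""
    if command["type"] == "Withdraw" and command["amount"] > state:
        return [{"event": "WithdrawalRejected", "amount": command["amount"]}]
    name = "Deposited" if command["type"] == "Deposit" else "Withdrawn"
    return [{"event": name, "amount": command["amount"]}]


def evolve(state: int, event: Dict) -> int:
    """Pure state transition: fold one event into the current balance."""
    if event["event"] == "Deposited":
        return state + event["amount"]
    if event["event"] == "Withdrawn":
        return state - event["amount"]
    return state


def handle_command(decider_id: str, command: Dict) -> List[Dict]:
    """Command handler: load events, rebuild state, decide, append."""
    history = get_events(decider_id)
    state = 0
    for e in history:
        state = evolve(state, e)
    previous_id = history[-1]["event_id"] if history else None
    new_events = []
    for n, e in enumerate(decide(command, state), start=len(history) + 1):
        stored = {**e, "decider_id": decider_id,
                  "event_id": f"{decider_id}-{n}", "previous_id": previous_id}
        append_event(stored)
        previous_id = stored["event_id"]
        new_events.append(stored)
    return new_events


def handle_event(view: Dict[str, int], event: Dict) -> Dict[str, int]:
    """Event handler: project one event into a balance-per-account view."""
    current = view.get(event["decider_id"], 0)
    return {**view, event["decider_id"]: evolve(current, event)}


handle_command("acc-1", {"type": "Deposit", "amount": 100})
handle_command("acc-1", {"type": "Withdraw", "amount": 30})
handle_command("acc-1", {"type": "Withdraw", "amount": 500})

view: Dict[str, int] = {}
for e in EVENTS:  # in a real setup these events would arrive via stream_events
    view = handle_event(view, e)
```

Keeping `decide` and `evolve` free of I/O means the same logic can sit behind either delivery style described above: polled by the application or pushed by the cron job.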
`fmodel` is a set of libraries that aims to bring functional, algebraic, and reactive domain modeling to Kotlin / TypeScript / Rust / Java. It is inspired by the DDD, Event Sourcing, and functional programming communities.
💙 Accelerate the development of compositional, ergonomic, data-driven, and safe applications 💙
| | Event-Sourced | State-Stored |
|---|---|---|
| Kotlin (Spring) | fmodel-spring-demo | fmodel-spring-state-stored-demo |
| Kotlin (Ktor) | fmodel-ktor-demo | todo |
| TypeScript | todo | todo |
| Rust | fmodel-rust-demo | todo |
Alternatively, you can use YugabyteDB instead of Postgres. It is designed to be compatible with Postgres.
YugabyteDB is a high-performance, cloud-native distributed SQL database that aims to support all Postgres features. It is the best fit for cloud-native OLTP (i.e. real-time, business-critical) applications that need absolute data correctness and require at least one of the following: scalability, high tolerance to failures, and globally distributed deployments.
You can download it as ready-to-use packages or installers for various platforms.
./bin/yugabyted start --master_flags=ysql_sequence_cache_minval=0 --tserver_flags=ysql_sequence_cache_minval=0
Alternatively, you can run the following command to start YugabyteDB in a Docker container:
docker run -d --name yugabyte -p7000:7000 -p9000:9000 -p5433:5433 -p9042:9042 \
  yugabytedb/yugabyte:latest bin/yugabyted start \
  --daemon=false --master_flags=ysql_sequence_cache_minval=0 --tserver_flags=ysql_sequence_cache_minval=0
- (Marco Pegoraro) https://github.com/marcopeg/postgres-event-sourcing
- (Matt Bishop) https://github.com/mattbishop/sql-event-store
- FModel
- Supabase
Created with ❤️ by Fraktalio