Crystal shard that captures Postgres database change events via the LISTEN/NOTIFY mechanism and publishes them to EventBus::EventHandlers for further processing. The shard comes with retry and watchdog functionality and tries to reconnect automatically until it has exhausted all of the configured retry_attempts. To configure the retry mechanism and the watchdog, use the configuration options below.
- retry_attempts: Number of reconnection attempts before giving up. Use 0 or less for infinite attempts. Defaults to 0.
- retry_interval: Interval in seconds to wait before the next reconnection attempt. Defaults to 5 seconds.

Set the on_error callback if you want to receive the final give-up message along with the last exception raised during a reconnection attempt. If no on_error callback is configured, the shard raises the last exception.
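The retry semantics described above can be sketched in plain Crystal. This is only an illustration and not the shard's actual implementation: `with_retry` is a hypothetical helper, and only the option names `retry_attempts`, `retry_interval` and `on_error` come from this README.

```crystal
# Hypothetical helper (not part of the shard) mirroring the documented options.
def with_retry(retry_attempts : Int32 = 0,
               retry_interval : Time::Span = 5.seconds,
               on_error : Proc(Exception, Nil)? = nil)
  attempt = 0
  loop do
    begin
      return yield
    rescue ex
      attempt += 1
      # Zero or less means "never give up".
      if retry_attempts > 0 && attempt >= retry_attempts
        if handler = on_error
          # Hand the last exception to the callback instead of raising.
          handler.call(ex)
          return nil
        end
        raise ex
      end
      sleep retry_interval
    end
  end
end

# Usage: the block fails twice, then succeeds on the third attempt.
calls = 0
result = with_retry(retry_attempts: 3, retry_interval: 10.milliseconds) do
  calls += 1
  raise IO::Error.new("connection refused") if calls < 3
  "connected"
end
puts result
```

With `retry_attempts: 0` the same loop would keep trying forever, which matches the documented default.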
The shard monitors database connectivity over a separate connection, watches for disconnections and freezes, and follows the same retry semantics, reconnecting after the configured interval. To set the watchdog heartbeat interval and connection timeout, configure EventBus with the options below.
- watchdog_interval: Heartbeat interval in seconds. Defaults to 5 seconds.
- timeout: Interval in seconds to wait for network connectivity. Defaults to 5 seconds.

The watchdog triggers every watchdog_interval seconds and waits up to timeout seconds for the connection status before timing out.
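A heartbeat check with a timeout can be sketched with a channel and `select`. This is a minimal illustration under assumptions, not the shard's own code: `healthy?` is a hypothetical helper, and the block stands in for a cheap query on the separate monitoring connection.

```crystal
# Hypothetical helper: runs `check` in a fiber and reports whether it
# finished within `wait`. A raised exception counts as unhealthy.
def healthy?(wait : Time::Span, &check : ->) : Bool
  # Buffered so the fiber never blocks on send after a timeout.
  done = Channel(Bool).new(1)
  spawn do
    begin
      check.call
      done.send(true)
    rescue
      done.send(false)
    end
  end

  select
  when ok = done.receive
    ok
  when timeout(wait)
    false
  end
end

# Usage: sleeps stand in for a responsive and a frozen connection.
fast = healthy?(50.milliseconds) { sleep 1.millisecond }
slow = healthy?(50.milliseconds) { sleep 1.second }
puts fast
puts slow
```

A watchdog loop would call such a check once per watchdog_interval and start the reconnection logic whenever it returns false.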
The lifecycle methods below are invoked for all registered handlers:
- on_start - invoked when EventBus is going to start. Override this method
- on_connect - invoked when the EventBus PG listener connects to Postgres.
- on_event - invoked when an event is received. Refer to the EventBus::Event struct for the structure.
- on_close - invoked when EventBus is going to shut down.

EventBus::Event fields:
- timestamp : Time - PG timestamp when the event occurred
- schema : String - PG schema name
- table : String - PG table name where the event occurred
- action : EventBus::Action - one of INSERT | UPDATE | DELETE, based on the event
- id : JSON::Any - value of the PG table's id column
- data : String - JSON object, as a String, representing the table row data
- changes : String? - JSON array, as a String, of hashes with column name, old value and new value. Set for UPDATE events only.
- Add the dependency to your shard.yml:

  dependencies:
    eventbus:
      github: spider-gazelle/eventbus

- Run shards install
require "eventbus"
# Instantiate EventBus object with Postgres URI
eventbus = EventBus.new(PG_DATABASE_URL, retry_attempts: 5, retry_interval: 5)
# Register Custom EventHandlers which will receive events
eventbus.add_handler MyLogger.new, MyRedisPub.new
# Register Error handler which will get invoked on fatal error
eventbus.on_error ->(ex : EventBus::ErrHandlerType) {
  puts " Received Fatal error from EventBus\n"
  puts ex
  puts "\n terminating gracefully"
  eventbus.close rescue nil
}
# Enable the CDC mechanism on all tables or on a particular table
eventbus.ensure_cdc_for_all_tables
# OR
eventbus.ensure_cdc_for("MyTable")
# Start Event Bus
eventbus.start # for async (non-blocking) mode
# OR
eventbus.run # for sync (blocking) mode
# Once done
eventbus.close
Located under the example folder.
Complete demo application that uses the handlers under the example folder and publishes PG events to a Redis Cluster. To run the demo application, set the environment variables below.
# Database config:
PG_DATABASE_URL=postgresql://user:password@hostname/database
REDIS_URL=redis://user:password@redis:port/database
Demo client application that connects to REDIS_URL and subscribes to CHANNEL for events.
To run the demo client subscriber, set the environment variables below.
# Database config:
REDIS_URL=redis://user:password@redis:port/database
CHANNEL="name of your application published channel"
The sample implementations below are provided and used by the demo application.
Sample implementation that simply logs events as they are received.
Sample implementation that publishes events to a Redis Cluster for delivery to subscribers.
Events are captured and published to Redis channels named using the scheme schema.table.cdc_events, where schema and table refer to your database schema and table name.
For example, to subscribe to change events for the table mytable in the public schema, subscribe to the channel public.mytable.cdc_events.
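The naming scheme is simple enough to build with string interpolation. The helper below is a hypothetical convenience for subscriber code, not something the shard provides, and defaulting the schema to public is an assumption made for illustration.

```crystal
# Hypothetical helper: builds the channel name for a schema and table
# following the schema.table.cdc_events scheme.
def cdc_channel(table : String, schema : String = "public") : String
  "#{schema}.#{table}.cdc_events"
end

channel = cdc_channel("mytable")
puts channel
```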
All change events are published to channels in JSON format
{
  "timestamp": "Timestamp with timezone",
  "schema": "PG Schema",
  "table": "PG Table name",
  "action": "one of insert|update|delete",
  "id": "Table row ID",
  "data": "JSON object representing table row data",
  "changes": "JSON Array object representing updated columns with old and new value. This is only set for update events"
}
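On the subscriber side, a message in this shape could be decoded with JSON::Serializable from Crystal's standard library. This is a sketch under assumptions: `ChangeEvent` is a hypothetical consumer-side type, the sample payload is made up, the timestamp is kept as a String because its exact format is not specified here, and data and changes are read as JSON::Any so that either embedded JSON or a JSON-encoded string is accepted.

```crystal
require "json"

# Hypothetical consumer-side type; key names follow the payload above.
struct ChangeEvent
  include JSON::Serializable

  getter timestamp : String
  getter schema : String
  getter table : String
  getter action : String
  getter id : JSON::Any
  getter data : JSON::Any
  # Only present for update events, so it is nilable.
  getter changes : JSON::Any?
end

# Made-up sample message for illustration only.
payload = %({"timestamp":"2024-01-01T00:00:00Z","schema":"public","table":"mytable","action":"insert","id":1,"data":{"id":1,"name":"example"}})

event = ChangeEvent.from_json(payload)
puts "#{event.action} on #{event.schema}.#{event.table} (id=#{event.id})"
```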
Given you have the following dependencies...
It is simple to develop the service with docker.
- Run specs, tearing down the docker-compose environment upon completion.
  $ ./test
- Run specs on changes to Crystal files within the src and spec folders.
  $ ./test
- To run tests
  $ crystal spec
NOTE: The upstream dependencies specified in docker-compose.yml are required...
$ shards build