- Polling Outbox Publisher is an outbox implementation for distributed systems. It uses an outbox algorithm and async transactions to ensure that no events are lost during processing.
- This application uses a straightforward approach where it periodically checks for new messages and publishes them in groups to the Kafka cluster. It deliberately avoids more complex data transfer technologies like Debezium Change Data Capture (CDC) or Couchbase Data Change Protocol (DCP). While those options might offer better performance and efficiency, this application prioritizes simplicity, ease of use, and maintaining independence from other systems.
- It also supports multiple instances to increase the reliability of the system.
- .NET 8.0 or higher
- SQL Server 2016 or higher
- Redis 5.0 or higher
- Kafka 2.5 or higher
- Clone the repository
git clone https://github.com/Trendyol/PollingOutboxPublisher.git
- Open the project in a IDE of your choice.
- Add the config file (
/src/config/config.json
) and secret file (/src/config/secret.json
). - Set as
0
the first row in theOutboxOffset
. - Run the project.
- The project will start the publish messages in the
OutboxEvents
data store. - That's it! You're ready to go!
- The application can be use Couchbase, PostgreSQL or MSSQL as a data store.
- Each data store has its own implementation and corresponding repository.
- Each data store is corresponding to a table in the database or a bucket in the Couchbase.
- The application uses four different data stores to manage the messages.
OutboxEvents
: The data store that holds the messages to be published.MissingEvents
: The data store that holds the messages that could not be published.ExceededEvents
: The data store that holds the messages that could not be published after the retry limit is reached.OutboxOffset
: The data store that holds the last publishedOutboxEvents
ID.
- The scritps for creating data stores can be found under examples](https://github.com/Trendyol/PollingOutboxPublisher/tree/master/examples) folder
- The application will start getting the next batch of messages from the
OutboxEvents
data store. - Then, it will publish the messages in parallel to the Kafka cluster.
- If an error occurs during the publishing process, the application will move the messages to
MissingEvents
and retry the publishing messages in batches.- When getting batch of event from
OutboxEvents
incremental ID list expected to be continuous. If there is a gap in the ID list, the application will move the messages to theMissingEvents
and retry the publishing messages in batches.
- When getting batch of event from
- If the retry limit is reached, the application will move the message to the
ExceededEvents
. - If the batch of messages is successfully published, the application will update the
OutboxOffset
with the last message id. - The application will continue to publish messages from the last message id in the
OutboxEvents
data store. - If the application is stopped, it will continue from the last message id in the
OutboxOffset
data store.
You can see some basic diagrams below to summarize the algorithm:
Multiple instances of the application can be run at the same time to increase the reliability of the system. But, only
one instance should be the Master Pod at a time.
MasterPodLock
is a distributed lock that is used to determine the Master Pod, and It has a TTL(time-to-live
value).
- The first instance that take the
MasterPodLock
will be the Master Pod and will start publishing messages.MasterPodLock
is a simple redis key. If lock taken, the value of the key is setted as pod name which is getting from Environment value. If Environment doesn't have pod name, GUID is used.
- The rest of the instances will be Follower Pods and will try to take the
MasterPodLock
every certain amount of time. - Also, the Master Pod will try to extend the
MasterPodLock
every certain amount of time. - If the Master Pod fails, the one of the Follower Pods will take the
MasterPodLock
and become the new Master Pod. - This feature can be disabled by setting the
MasterPodSettings.IsActive
tofalse
, but don't forget to make sure that only one instance is running at a time.
Warning
If there will be multiple instances, the MasterPodSettings.IsActive
should be set to true
. Otherwise, messages can
be duplicated or not published.
The application can be configured using the config.json
and secret.json
files. Here are the configurations you can
set:
Warning
The default values for Kafka.SaslMechanism
and Kafka.SecurityProtocol
have been removed in version 1.3.0. Please
ensure to set these values in your configuration to avoid any issues.
Key | Type | Description |
---|---|---|
Kafka.ReloadOnChange |
bool | The flag indicating whether the Kafka configuration should be reloaded when the configuration file changes. |
Kafka.SaslUsername |
string | The username for the SASL authentication of the Kafka cluster. |
Kafka.Brokers |
string | The addresses of the Kafka brokers. |
Kafka.SaslPassword |
string | The password for the SASL authentication of the Kafka cluster. |
Kafka.SslCaLocation |
string | The location of the SSL certificate for the Kafka cluster. |
Kafka.SslKeystorePassword |
string | The SSL Keystore Password |
Kafka.SaslMechanism |
string | The SSL Mechanism. |
Kafka.SecurityProtocol |
string | The SSL Protocol. |
Kafka.BatchSize |
string | The Batch Size of the Kafka Publisher. Default: 512 * 1024 |
Kafka.LingerMs |
string | The Linger of the Kafka Publisher. Default: 10 |
Kafka.CompressionType |
string | The Compression Type of the Kafka Publisher. Default: Snappy |
Kafka.MessageMaxBytes |
string | The Message Max Bytes of the Kafka Publisher. Default: 30000000 |
Kafka.Acks |
string | The Acks of the Kafka Publisher Default: Leader |
BenchMarkOptions.IsPublishingOn |
bool | A flag indicating whether publishing is on. Can be used when load testing. |
WorkerSettings.OutboxEventsBatchSize |
int | The batch size for outbox events. The amount of messages is published in paralells. |
WorkerSettings.QueueWaitDuration |
int | The wait duration for to take next batch of messages. |
WorkerSettings.MissingEventsBatchSize |
int | The batch size for missing events. The amount of missing messages is published in paralells. |
WorkerSettings.MissingEventsWaitDuration |
int | The wait duration to take next batch of missing events. |
WorkerSettings.MissingEventsMaxRetryCount |
int | The maximum retry count for missing events. |
WorkerSettings.BrokerErrorsMaxRetryCount |
int | The maximum retry count for broker errors. |
WorkerSettings.RedeliveryDelayAfterError |
int | The delay after an error before redelivery the batch of missing events. |
ConnectionString |
string | The connection string for the SQL database. It can be used for both MsSQL and PostgreSQL. |
ReadOnlyConnectionString |
string | The connection string for the Readonly SQL database. This config is optional and only work for MsSQL. If doesn't provided ConnectionString will be used. |
Couchbase.Host |
string | The hostname or IP address of the Couchbase server. |
Couchbase.Username |
string | The username for authentication with Couchbase. |
Couchbase.Bucket |
string | The name of the Couchbase bucket to access data. |
Couchbase.Scope |
string | The scope within the bucket to access data. |
Couchbase.Password |
string | The password for authentication with Couchbase. |
DataStoreSettings.DatabaseType |
string | The chosen database type. It can be MSSQL, Couchbase or PostgreSQL |
DataStoreSettings.OutboxEvents |
string | The name of the outbox event data store. |
DataStoreSettings.MissingEvents |
string | The name of the missing event data store. |
DataStoreSettings.ExceededEvents |
string | The name of the exceeded event data store. |
DataStoreSettings.OutboxOffset |
string | The name of the outbox offset data store. Holds the last published OutboxEvents Id |
MasterPodSettings.IsActive |
bool | A flag indicating whether the master pod checker is active. Should be active if multiple pods is using. |
MasterPodSettings.CacheName |
string | The name of the distributed lock key. This key should be same for the multiple instances of the app. |
MasterPodSettings.MasterPodLifetime |
int | The lifetime of the master pod. The TTL of the distributed lock. |
MasterPodSettings.MasterPodRaceInterval |
int | The interval to take MasterPodLock for MasterPod and FollowerPods. |
MasterPodSettings.IsMasterPodCheckInterval |
int | The check interval for the FollowerPods . The check without intervals causes high CPU usage; because of that, this is needed. |
Redis.Endpoints |
string | The endpoints for the Redis instance. |
Redis.DefaultDatabase |
int | The default database for the Redis instance. |
Redis.Config |
string | The configuration for the Redis instance. |
Redis.Password |
string | The password for the Redis instance. |
Serilog |
object | The configuration for Serilog. |
In the examples folder, you'll find example
files for config.json
, secret.json
, and implementation on how to insert messages for each database type
Warning
For the Couchbase, incremental ID is used for the OutboxEvents
data store. If you want to use the Couchbase, you
should use a Counter for the ID. You can find the example code in the CouchbaseExample
class.
Released under the MIT License.
See the CONTRIBUTING file for details.