Skip to content

Commit

Permalink
feat(destination): Introduced distributed lock, task delivery mapper,…
Browse files Browse the repository at this point in the history
… and deployable workers (#77)
  • Loading branch information
PouriaSeyfi authored May 8, 2024
1 parent 7416db0 commit 9945004
Show file tree
Hide file tree
Showing 29 changed files with 383 additions and 177 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ logs/
mise.log

destination/logs.json
destination/worker_log.json

temp
.env
Expand Down
61 changes: 61 additions & 0 deletions adapter/etcd/adapter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package etcd

import (
"context"
"fmt"
"log/slog"
"time"

clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/client/v3/concurrency"
)

type Config struct {
Host string `koanf:"host"`
Port int `koanf:"port"`
DialTimeoutSeconds uint8 `koanf:"dial_timeout"`
}

type Adapter struct {
client *clientv3.Client
}

func New(config Config) (Adapter, error) {
etcdClient, err := clientv3.New(clientv3.Config{
Endpoints: []string{fmt.Sprintf("%s:%d", config.Host, config.Port)},
DialTimeout: time.Duration(config.DialTimeoutSeconds) * time.Second,
})
if err != nil {
slog.Error("Error creating etcd client: ", err)

return Adapter{}, err
}

return Adapter{
client: etcdClient,
}, nil
}

func (a Adapter) Client() *clientv3.Client {
return a.client
}

func (a Adapter) Close() error {
return a.client.Close()
}

func (a Adapter) Lock(ctx context.Context, key string, ttl int64) (unlock func() error, err error) {
session, err := concurrency.NewSession(a.client, concurrency.WithTTL(int(ttl)))
if err != nil {
return nil, err
}

mutex := concurrency.NewMutex(session, key)
if err := mutex.Lock(ctx); err != nil {
return nil, err
}

return func() error {
return mutex.Unlock(ctx)
}, nil
}
108 changes: 108 additions & 0 deletions cmd/destination/delivery_workers/webhook_delivery_worker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
package main

import (
"log"
"log/slog"
"os"
"os/signal"
"sync"
"time"

"github.com/ormushq/ormus/adapter/etcd"
"github.com/ormushq/ormus/adapter/redis"
"github.com/ormushq/ormus/config"
"github.com/ormushq/ormus/destination/taskdelivery"
"github.com/ormushq/ormus/destination/taskdelivery/adapters/fakedeliveryhandler"
"github.com/ormushq/ormus/destination/taskmanager/adapter/rabbitmqtaskmanager"
"github.com/ormushq/ormus/destination/taskservice"
"github.com/ormushq/ormus/destination/taskservice/adapter/idempotency/redistaskidempotency"
"github.com/ormushq/ormus/destination/taskservice/adapter/repository/inmemorytaskrepo"
"github.com/ormushq/ormus/destination/worker"
"github.com/ormushq/ormus/logger"
)

const waitingAfterShutdownInSeconds = 2

func main() {
done := make(chan bool)
wg := sync.WaitGroup{}

//----------------- Setup Logger -----------------//

fileMaxSizeInMB := 10
fileMaxAgeInDays := 30

cfg := logger.Config{
FilePath: "./destination/worker_log.json",
UseLocalTime: false,
FileMaxSizeInMB: fileMaxSizeInMB,
FileMaxAgeInDays: fileMaxAgeInDays,
}

logLevel := slog.LevelInfo
if config.C().Destination.DebugMode {
logLevel = slog.LevelDebug
}

opt := slog.HandlerOptions{
// todo should level debug be read from config?
Level: logLevel,
}
l := logger.New(cfg, &opt)
slog.SetDefault(l)

//----------------- Setup Task Service -----------------//

redisAdapter, err := redis.New(config.C().Redis)
if err != nil {
log.Panicf("error in new redis")
}
taskIdempotency := redistaskidempotency.New(redisAdapter, "tasks:", 30*24*time.Hour)

taskRepo := inmemorytaskrepo.New()

// Set up etcd as distributed locker.
distributedLocker, err := etcd.New(config.C().Etcd)
if err != nil {
log.Panicf("Error on new etcd")
}

taskHandler := taskservice.New(taskIdempotency, taskRepo, distributedLocker)

// Register delivery handlers
// each destination type can have specific delivery handler
fakeTaskDeliveryHandler := fakedeliveryhandler.New()
taskdelivery.Register("webhook", fakeTaskDeliveryHandler)

//----------------- Consume ProcessEvents -----------------//

taskConsumerConf := config.C().Destination.RabbitMQTaskManagerConnection
webhookTaskConsumer := rabbitmqtaskmanager.NewTaskConsumer(taskConsumerConf, "webhook_tasks_queue")

processedEvents, err := webhookTaskConsumer.Consume(done, &wg)
if err != nil {
log.Panicf("Error on consuming tasks.")
}

w1 := worker.NewWorker(processedEvents, taskHandler)

w1Err := w1.Run(done, &wg)
if w1Err != nil {
log.Panicf("%s: %s", "Error on webhook worker", err)
}

//----------------- Handling graceful shutdown -----------------//

quit := make(chan os.Signal, 1)
signal.Notify(quit, os.Interrupt)
<-quit

slog.Info("Received interrupt signal, shutting down gracefully...")
done <- true

close(done)

// todo use config for waiting time after graceful shutdown
time.Sleep(waitingAfterShutdownInSeconds * time.Second)
wg.Wait()
}
5 changes: 2 additions & 3 deletions cmd/destination/faker/fake_processed_event_producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@ import (
"context"
"encoding/json"
"fmt"
"github.com/ormushq/ormus/config"
"log"
"time"

"github.com/ormushq/ormus/config"
"github.com/ormushq/ormus/event"
"github.com/ormushq/ormus/manager/entity"
amqp "github.com/rabbitmq/amqp091-go"
Expand All @@ -25,7 +25,6 @@ func main() {
rmqConsumerConnConfig := config.C().Destination.RabbitMQConsumerConnection
conn, err := amqp.Dial(fmt.Sprintf("amqp://%s:%s@%s:%d/", rmqConsumerConnConfig.User,
rmqConsumerConnConfig.Password, rmqConsumerConnConfig.Host, rmqConsumerConnConfig.Port))
//conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")

defer func(conn *amqp.Connection) {
err = conn.Close()
Expand Down Expand Up @@ -54,7 +53,7 @@ func main() {
defer cancel()

fakeIntegration := entity.Integration{
ID: "5",
ID: "10",
SourceID: "1",
Metadata: entity.DestinationMetadata{
ID: "1",
Expand Down
56 changes: 23 additions & 33 deletions cmd/destination/main.go
Original file line number Diff line number Diff line change
@@ -1,34 +1,32 @@
package main

import (
"fmt"
"log"
"log/slog"
"os"
"os/signal"
"sync"
"time"

"github.com/ormushq/ormus/adapter/redis"
"github.com/ormushq/ormus/config"
"github.com/ormushq/ormus/destination/processedevent/adapter/rabbitmqconsumer"
"github.com/ormushq/ormus/destination/taskcoordinator/adapter/dtcoordinator"
"github.com/ormushq/ormus/destination/taskservice"
"github.com/ormushq/ormus/destination/taskservice/adapter/idempotency/redistaskidempotency"
"github.com/ormushq/ormus/destination/taskservice/adapter/repository/inmemorytaskrepo"
"github.com/ormushq/ormus/destination/taskmanager/adapter/rabbitmqtaskmanager"
"github.com/ormushq/ormus/logger"
"github.com/ormushq/ormus/manager/entity"
)

const waitingAfterShutdownInSeconds = 2
const waitingAfterShutdownInSeconds = 1

func main() {
done := make(chan bool)
wg := sync.WaitGroup{}

//----------------- Setup Logger -----------------//

fileMaxSizeInMB := 10
fileMaxAgeInDays := 30

//------ Setup logger ------
cfg := logger.Config{
FilePath: "./destination/logs.json",
UseLocalTime: false,
Expand All @@ -46,23 +44,11 @@ func main() {
Level: logLevel,
}
l := logger.New(cfg, &opt)
slog.SetDefault(l)

// Setup Task Service

// In-Memory task idempotency
// taskIdempotency := inmemorytaskidempotency.New()
// use slog as default logger.
slog.SetDefault(l)

// Redis task idempotency
// todo do we need to use separate db number for redis task idempotency or destination module?
redisAdapter, err := redis.New(config.C().Redis)
if err != nil {
log.Panicf("error in new redis")
}
taskIdempotency := redistaskidempotency.New(redisAdapter, "tasks:", 30*24*time.Hour)
taskRepo := inmemorytaskrepo.New()
taskService := taskservice.New(taskIdempotency, taskRepo)
//----- Consuming processed events -----//
//----------------- Consume Processed Events From Core -----------------//

// Get connection config for rabbitMQ consumer
rmqConsumerConnConfig := config.C().Destination.RabbitMQConsumerConnection
Expand All @@ -71,35 +57,39 @@ func main() {
// todo should we consider array of topics?
rmqConsumer := rabbitmqconsumer.New(rmqConsumerConnConfig, rmqConsumerTopic)

log.Println("Start Consuming processed events.")
slog.Info("Start Consuming processed events.")
processedEvents, err := rmqConsumer.Consume(done, &wg)
if err != nil {
log.Panicf("Error on consuming processed events.")
}

//----- Setup Task Coordinator -----//
// Task coordinator specifies which task manager should handle incoming processed events.
// we can have different task coordinators base on destination type, customer plans, etc.
// Now we just create dtcoordinator that stands for destination type coordinator.
// It determines which task manager should be used for processed evens considering destination type of processed events.
//----------------- Setup Task Coordinator -----------------//

// Task coordinator is responsible for considering task's characteristics
// and publish it to task queue using task publisher. currently we support
// destination type coordinator which means every task with specific destination type
// will be published to its corresponding task publisher.

taskPublisherCnf := config.C().Destination.RabbitMQTaskManagerConnection
webhookTaskPublisher := rabbitmqtaskmanager.NewTaskPublisher(taskPublisherCnf, "webhook_tasks_queue")

// todo maybe it is better to having configs for setup of task coordinator.
taskPublishers := make(dtcoordinator.TaskPublisherMap)
taskPublishers[entity.WebhookDestinationType] = webhookTaskPublisher

rmqTaskManagerConnConfig := config.C().Destination.RabbitMQTaskManagerConnection
coordinator := dtcoordinator.New(taskService, rmqTaskManagerConnConfig)
coordinator := dtcoordinator.New(taskPublishers)

cErr := coordinator.Start(processedEvents, done, &wg)
if cErr != nil {
log.Panicf("Error on starting destination type coordinator.")
}

//----- Handling graceful shutdown -----//
//----------------- Handling graceful shutdown -----------------//

quit := make(chan os.Signal, 1)
signal.Notify(quit, os.Interrupt)
<-quit

fmt.Println("Received interrupt signal, shutting down gracefully...")
slog.Info("Received interrupt signal, shutting down gracefully...")
done <- true

close(done)
Expand Down
4 changes: 4 additions & 0 deletions config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@ redis:
host: localhost
db: 0
password: ""
etcd:
port: 2379
host: localhost
dial_timeout: 5
destination:
debug_mode: true
consumer_topic: "pe.#" # pe stands for processed event. and # substitute for zero or more words.
Expand Down
4 changes: 3 additions & 1 deletion config/config.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
package config

import (
"github.com/ormushq/ormus/adapter/etcd"
"github.com/ormushq/ormus/adapter/redis"
"github.com/ormushq/ormus/destination/dconfig"
"github.com/ormushq/ormus/manager"
"github.com/ormushq/ormus/source"
)

type Config struct {
Manager manager.Config `koanf:"manager"`
Redis redis.Config `koanf:"redis"`
Etcd etcd.Config `koanf:"etcd"`
Manager manager.Config `koanf:"manager"`
Source source.Config `koanf:"source"`
Destination dconfig.Config `koanf:"destination"`
}
Loading

0 comments on commit 9945004

Please sign in to comment.