Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(destination): Introduced distributed lock, task delivery mapper, and deployable workers #77

Merged
merged 2 commits into from
May 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ logs/
mise.log

destination/logs.json
destination/worker_log.json

temp
.env
Expand Down
61 changes: 61 additions & 0 deletions adapter/etcd/adapter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package etcd

import (
"context"
"fmt"
"log/slog"
"time"

clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/client/v3/concurrency"
)

type Config struct {
Host string `koanf:"host"`
Port int `koanf:"port"`
DialTimeoutSeconds uint8 `koanf:"dial_timeout"`
}

type Adapter struct {
client *clientv3.Client
}

func New(config Config) (Adapter, error) {
etcdClient, err := clientv3.New(clientv3.Config{
Endpoints: []string{fmt.Sprintf("%s:%d", config.Host, config.Port)},
DialTimeout: time.Duration(config.DialTimeoutSeconds) * time.Second,
})
if err != nil {
slog.Error("Error creating etcd client: ", err)

return Adapter{}, err
}

return Adapter{
client: etcdClient,
}, nil
}

func (a Adapter) Client() *clientv3.Client {
return a.client
}

func (a Adapter) Close() error {
return a.client.Close()
}

func (a Adapter) Lock(ctx context.Context, key string, ttl int64) (unlock func() error, err error) {
session, err := concurrency.NewSession(a.client, concurrency.WithTTL(int(ttl)))
if err != nil {
return nil, err
}

mutex := concurrency.NewMutex(session, key)
if err := mutex.Lock(ctx); err != nil {
return nil, err
}

return func() error {
return mutex.Unlock(ctx)
}, nil
}
108 changes: 108 additions & 0 deletions cmd/destination/delivery_workers/webhook_delivery_worker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
package main

import (
"log"
"log/slog"
"os"
"os/signal"
"sync"
"time"

"github.com/ormushq/ormus/adapter/etcd"
"github.com/ormushq/ormus/adapter/redis"
"github.com/ormushq/ormus/config"
"github.com/ormushq/ormus/destination/taskdelivery"
"github.com/ormushq/ormus/destination/taskdelivery/adapters/fakedeliveryhandler"
"github.com/ormushq/ormus/destination/taskmanager/adapter/rabbitmqtaskmanager"
"github.com/ormushq/ormus/destination/taskservice"
"github.com/ormushq/ormus/destination/taskservice/adapter/idempotency/redistaskidempotency"
"github.com/ormushq/ormus/destination/taskservice/adapter/repository/inmemorytaskrepo"
"github.com/ormushq/ormus/destination/worker"
"github.com/ormushq/ormus/logger"
)

const waitingAfterShutdownInSeconds = 2

func main() {
done := make(chan bool)
wg := sync.WaitGroup{}

//----------------- Setup Logger -----------------//

fileMaxSizeInMB := 10
fileMaxAgeInDays := 30

cfg := logger.Config{
FilePath: "./destination/worker_log.json",
UseLocalTime: false,
FileMaxSizeInMB: fileMaxSizeInMB,
FileMaxAgeInDays: fileMaxAgeInDays,
}

logLevel := slog.LevelInfo
if config.C().Destination.DebugMode {
logLevel = slog.LevelDebug
}

opt := slog.HandlerOptions{
// todo should level debug be read from config?
Level: logLevel,
}
l := logger.New(cfg, &opt)
slog.SetDefault(l)

//----------------- Setup Task Service -----------------//

redisAdapter, err := redis.New(config.C().Redis)
if err != nil {
log.Panicf("error in new redis")
}
taskIdempotency := redistaskidempotency.New(redisAdapter, "tasks:", 30*24*time.Hour)

taskRepo := inmemorytaskrepo.New()

// Set up etcd as distributed locker.
distributedLocker, err := etcd.New(config.C().Etcd)
if err != nil {
log.Panicf("Error on new etcd")
}

taskHandler := taskservice.New(taskIdempotency, taskRepo, distributedLocker)

// Register delivery handlers
// each destination type can have specific delivery handler
fakeTaskDeliveryHandler := fakedeliveryhandler.New()
taskdelivery.Register("webhook", fakeTaskDeliveryHandler)

//----------------- Consume ProcessEvents -----------------//

taskConsumerConf := config.C().Destination.RabbitMQTaskManagerConnection
webhookTaskConsumer := rabbitmqtaskmanager.NewTaskConsumer(taskConsumerConf, "webhook_tasks_queue")

processedEvents, err := webhookTaskConsumer.Consume(done, &wg)
if err != nil {
log.Panicf("Error on consuming tasks.")
}

w1 := worker.NewWorker(processedEvents, taskHandler)

w1Err := w1.Run(done, &wg)
if w1Err != nil {
log.Panicf("%s: %s", "Error on webhook worker", err)
}

//----------------- Handling graceful shutdown -----------------//

quit := make(chan os.Signal, 1)
signal.Notify(quit, os.Interrupt)
<-quit

slog.Info("Received interrupt signal, shutting down gracefully...")
done <- true

close(done)

// todo use config for waiting time after graceful shutdown
time.Sleep(waitingAfterShutdownInSeconds * time.Second)
wg.Wait()
}
5 changes: 2 additions & 3 deletions cmd/destination/faker/fake_processed_event_producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@ import (
"context"
"encoding/json"
"fmt"
"github.com/ormushq/ormus/config"
"log"
"time"

"github.com/ormushq/ormus/config"
"github.com/ormushq/ormus/event"
"github.com/ormushq/ormus/manager/entity"
amqp "github.com/rabbitmq/amqp091-go"
Expand All @@ -25,7 +25,6 @@ func main() {
rmqConsumerConnConfig := config.C().Destination.RabbitMQConsumerConnection
conn, err := amqp.Dial(fmt.Sprintf("amqp://%s:%s@%s:%d/", rmqConsumerConnConfig.User,
rmqConsumerConnConfig.Password, rmqConsumerConnConfig.Host, rmqConsumerConnConfig.Port))
//conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")

defer func(conn *amqp.Connection) {
err = conn.Close()
Expand Down Expand Up @@ -54,7 +53,7 @@ func main() {
defer cancel()

fakeIntegration := entity.Integration{
ID: "5",
ID: "10",
SourceID: "1",
Metadata: entity.DestinationMetadata{
ID: "1",
Expand Down
56 changes: 23 additions & 33 deletions cmd/destination/main.go
Original file line number Diff line number Diff line change
@@ -1,34 +1,32 @@
package main

import (
"fmt"
"log"
"log/slog"
"os"
"os/signal"
"sync"
"time"

"github.com/ormushq/ormus/adapter/redis"
"github.com/ormushq/ormus/config"
"github.com/ormushq/ormus/destination/processedevent/adapter/rabbitmqconsumer"
"github.com/ormushq/ormus/destination/taskcoordinator/adapter/dtcoordinator"
"github.com/ormushq/ormus/destination/taskservice"
"github.com/ormushq/ormus/destination/taskservice/adapter/idempotency/redistaskidempotency"
"github.com/ormushq/ormus/destination/taskservice/adapter/repository/inmemorytaskrepo"
"github.com/ormushq/ormus/destination/taskmanager/adapter/rabbitmqtaskmanager"
"github.com/ormushq/ormus/logger"
"github.com/ormushq/ormus/manager/entity"
)

const waitingAfterShutdownInSeconds = 2
const waitingAfterShutdownInSeconds = 1

func main() {
done := make(chan bool)
wg := sync.WaitGroup{}

//----------------- Setup Logger -----------------//

fileMaxSizeInMB := 10
fileMaxAgeInDays := 30

//------ Setup logger ------
cfg := logger.Config{
FilePath: "./destination/logs.json",
UseLocalTime: false,
Expand All @@ -46,23 +44,11 @@ func main() {
Level: logLevel,
}
l := logger.New(cfg, &opt)
slog.SetDefault(l)

// Setup Task Service

// In-Memory task idempotency
// taskIdempotency := inmemorytaskidempotency.New()
// use slog as default logger.
slog.SetDefault(l)

// Redis task idempotency
// todo do we need to use separate db number for redis task idempotency or destination module?
redisAdapter, err := redis.New(config.C().Redis)
if err != nil {
log.Panicf("error in new redis")
}
taskIdempotency := redistaskidempotency.New(redisAdapter, "tasks:", 30*24*time.Hour)
taskRepo := inmemorytaskrepo.New()
taskService := taskservice.New(taskIdempotency, taskRepo)
//----- Consuming processed events -----//
//----------------- Consume Processed Events From Core -----------------//

// Get connection config for rabbitMQ consumer
rmqConsumerConnConfig := config.C().Destination.RabbitMQConsumerConnection
Expand All @@ -71,35 +57,39 @@ func main() {
// todo should we consider array of topics?
rmqConsumer := rabbitmqconsumer.New(rmqConsumerConnConfig, rmqConsumerTopic)

log.Println("Start Consuming processed events.")
slog.Info("Start Consuming processed events.")
processedEvents, err := rmqConsumer.Consume(done, &wg)
if err != nil {
log.Panicf("Error on consuming processed events.")
}

//----- Setup Task Coordinator -----//
// Task coordinator specifies which task manager should handle incoming processed events.
// we can have different task coordinators base on destination type, customer plans, etc.
// Now we just create dtcoordinator that stands for destination type coordinator.
// It determines which task manager should be used for processed evens considering destination type of processed events.
//----------------- Setup Task Coordinator -----------------//

// Task coordinator is responsible for considering task's characteristics
// and publish it to task queue using task publisher. currently we support
// destination type coordinator which means every task with specific destination type
// will be published to its corresponding task publisher.

taskPublisherCnf := config.C().Destination.RabbitMQTaskManagerConnection
webhookTaskPublisher := rabbitmqtaskmanager.NewTaskPublisher(taskPublisherCnf, "webhook_tasks_queue")

// todo maybe it is better to having configs for setup of task coordinator.
taskPublishers := make(dtcoordinator.TaskPublisherMap)
taskPublishers[entity.WebhookDestinationType] = webhookTaskPublisher

rmqTaskManagerConnConfig := config.C().Destination.RabbitMQTaskManagerConnection
coordinator := dtcoordinator.New(taskService, rmqTaskManagerConnConfig)
coordinator := dtcoordinator.New(taskPublishers)

cErr := coordinator.Start(processedEvents, done, &wg)
if cErr != nil {
log.Panicf("Error on starting destination type coordinator.")
}

//----- Handling graceful shutdown -----//
//----------------- Handling graceful shutdown -----------------//

quit := make(chan os.Signal, 1)
signal.Notify(quit, os.Interrupt)
<-quit

fmt.Println("Received interrupt signal, shutting down gracefully...")
slog.Info("Received interrupt signal, shutting down gracefully...")
done <- true

close(done)
Expand Down
4 changes: 4 additions & 0 deletions config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@ redis:
host: localhost
db: 0
password: ""
etcd:
port: 2379
host: localhost
dial_timeout: 5
destination:
debug_mode: true
consumer_topic: "pe.#" # pe stands for processed event. and # substitute for zero or more words.
Expand Down
4 changes: 3 additions & 1 deletion config/config.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
package config

import (
"github.com/ormushq/ormus/adapter/etcd"
"github.com/ormushq/ormus/adapter/redis"
"github.com/ormushq/ormus/destination/dconfig"
"github.com/ormushq/ormus/manager"
"github.com/ormushq/ormus/source"
)

type Config struct {
Manager manager.Config `koanf:"manager"`
Redis redis.Config `koanf:"redis"`
Etcd etcd.Config `koanf:"etcd"`
Manager manager.Config `koanf:"manager"`
Source source.Config `koanf:"source"`
Destination dconfig.Config `koanf:"destination"`
}
Loading
Loading