Skip to content

Commit

Permalink
feat(destination): implement task handler using etcd adapter, separat…
Browse files Browse the repository at this point in the history
…e workers from task coordinators
  • Loading branch information
PouriaSeyfi committed May 7, 2024
1 parent 1f47321 commit 1001d91
Show file tree
Hide file tree
Showing 22 changed files with 277 additions and 160 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ logs/
mise.log

destination/logs.json
destination/worker_log.json

temp
.env
Expand Down
109 changes: 109 additions & 0 deletions cmd/destination/delivery_workers/webhook_delivery_worker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
package main

import (
"fmt"
"log"
"log/slog"
"os"
"os/signal"
"sync"
"time"

"github.com/ormushq/ormus/adapter/etcd"
"github.com/ormushq/ormus/adapter/redis"
"github.com/ormushq/ormus/config"
"github.com/ormushq/ormus/destination/taskdelivery"
"github.com/ormushq/ormus/destination/taskdelivery/adapters/fakedeliveryhandler"
"github.com/ormushq/ormus/destination/taskmanager/adapter/rabbitmqtaskmanager"
"github.com/ormushq/ormus/destination/taskservice"
"github.com/ormushq/ormus/destination/taskservice/adapter/idempotency/redistaskidempotency"
"github.com/ormushq/ormus/destination/taskservice/adapter/repository/inmemorytaskrepo"
"github.com/ormushq/ormus/destination/worker"
"github.com/ormushq/ormus/logger"
)

const waitingAfterShutdownInSeconds = 2

func main() {
done := make(chan bool)
wg := sync.WaitGroup{}

//----------------- Setup Logger -----------------//

fileMaxSizeInMB := 10
fileMaxAgeInDays := 30

cfg := logger.Config{
FilePath: "./destination/worker_log.json",
UseLocalTime: false,
FileMaxSizeInMB: fileMaxSizeInMB,
FileMaxAgeInDays: fileMaxAgeInDays,
}

logLevel := slog.LevelInfo
if config.C().Destination.DebugMode {
logLevel = slog.LevelDebug
}

opt := slog.HandlerOptions{
// todo should level debug be read from config?
Level: logLevel,
}
l := logger.New(cfg, &opt)
slog.SetDefault(l)

//----------------- Setup Task Service -----------------//

redisAdapter, err := redis.New(config.C().Redis)
if err != nil {
log.Panicf("error in new redis")
}
taskIdempotency := redistaskidempotency.New(redisAdapter, "tasks:", 30*24*time.Hour)

taskRepo := inmemorytaskrepo.New()

// Set up etcd as distributed locker.
distributedLocker, err := etcd.New(config.C().Etcd)
if err != nil {
log.Panicf("Error on new etcd")
}

taskService := taskservice.New(taskIdempotency, taskRepo, distributedLocker)

// Register delivery handlers
// each destination type can have specific delivery handler
fakeTaskDeliveryHandler := fakedeliveryhandler.New()
taskdelivery.Register("webhook", fakeTaskDeliveryHandler)

//----------------- Consume ProcessEvents -----------------//

taskConsumerConf := config.C().Destination.RabbitMQTaskManagerConnection
webhookTaskConsumer := rabbitmqtaskmanager.NewTaskConsumer(taskConsumerConf, "webhook_tasks_queue")

processedEvents, err := webhookTaskConsumer.Consume(done, &wg)
if err != nil {
log.Panicf("Error on consuming tasks.")
}

w1 := worker.NewWorker(processedEvents, taskService)

w1Err := w1.Run(done, &wg)
if w1Err != nil {
log.Panicf("%s: %s", "Error on webhook worker", err)
}

//----------------- Handling graceful shutdown -----------------//

quit := make(chan os.Signal, 1)
signal.Notify(quit, os.Interrupt)
<-quit

fmt.Println("Received interrupt signal, shutting down gracefully...")
done <- true

close(done)

// todo use config for waiting time after graceful shutdown
time.Sleep(waitingAfterShutdownInSeconds * time.Second)
wg.Wait()
}
5 changes: 2 additions & 3 deletions cmd/destination/faker/fake_processed_event_producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@ import (
"context"
"encoding/json"
"fmt"
"github.com/ormushq/ormus/config"
"log"
"time"

"github.com/ormushq/ormus/config"
"github.com/ormushq/ormus/event"
"github.com/ormushq/ormus/manager/entity"
amqp "github.com/rabbitmq/amqp091-go"
Expand All @@ -25,7 +25,6 @@ func main() {
rmqConsumerConnConfig := config.C().Destination.RabbitMQConsumerConnection
conn, err := amqp.Dial(fmt.Sprintf("amqp://%s:%s@%s:%d/", rmqConsumerConnConfig.User,
rmqConsumerConnConfig.Password, rmqConsumerConnConfig.Host, rmqConsumerConnConfig.Port))
//conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")

defer func(conn *amqp.Connection) {
err = conn.Close()
Expand Down Expand Up @@ -54,7 +53,7 @@ func main() {
defer cancel()

fakeIntegration := entity.Integration{
ID: "5",
ID: "9",
SourceID: "1",
Metadata: entity.DestinationMetadata{
ID: "1",
Expand Down
48 changes: 19 additions & 29 deletions cmd/destination/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,13 @@ import (
"sync"
"time"

"github.com/ormushq/ormus/adapter/redis"
"github.com/ormushq/ormus/config"
"github.com/ormushq/ormus/destination/processedevent/adapter/rabbitmqconsumer"
"github.com/ormushq/ormus/destination/taskcoordinator/adapter/dtcoordinator"
"github.com/ormushq/ormus/destination/taskservice"
"github.com/ormushq/ormus/destination/taskservice/adapter/idempotency/redistaskidempotency"
"github.com/ormushq/ormus/destination/taskservice/adapter/repository/inmemorytaskrepo"
"github.com/ormushq/ormus/destination/taskmanager"
"github.com/ormushq/ormus/destination/taskmanager/adapter/rabbitmqtaskmanager"
"github.com/ormushq/ormus/logger"
"github.com/ormushq/ormus/manager/entity"
)

const waitingAfterShutdownInSeconds = 2
Expand All @@ -25,10 +24,11 @@ func main() {
done := make(chan bool)
wg := sync.WaitGroup{}

//----------------- Setup Logger -----------------//

fileMaxSizeInMB := 10
fileMaxAgeInDays := 30

//------ Setup logger ------
cfg := logger.Config{
FilePath: "./destination/logs.json",
UseLocalTime: false,
Expand All @@ -46,23 +46,11 @@ func main() {
Level: logLevel,
}
l := logger.New(cfg, &opt)
slog.SetDefault(l)

// Setup Task Service

// In-Memory task idempotency
// taskIdempotency := inmemorytaskidempotency.New()
// use slog as default logger.
slog.SetDefault(l)

// Redis task idempotency
// todo do we need to use separate db number for redis task idempotency or destination module?
redisAdapter, err := redis.New(config.C().Redis)
if err != nil {
log.Panicf("error in new redis")
}
taskIdempotency := redistaskidempotency.New(redisAdapter, "tasks:", 30*24*time.Hour)
taskRepo := inmemorytaskrepo.New()
taskService := taskservice.New(taskIdempotency, taskRepo)
//----- Consuming processed events -----//
//----------------- Consume Processed Events From Core -----------------//

// Get connection config for rabbitMQ consumer
rmqConsumerConnConfig := config.C().Destination.RabbitMQConsumerConnection
Expand All @@ -77,23 +65,25 @@ func main() {
log.Panicf("Error on consuming processed events.")
}

//----- Setup Task Coordinator -----//
// Task coordinator specifies which task manager should handle incoming processed events.
// we can have different task coordinators base on destination type, customer plans, etc.
// Now we just create dtcoordinator that stands for destination type coordinator.
// It determines which task manager should be used for processed evens considering destination type of processed events.
//----------------- Setup Task Coordinator -----------------//

// Task coordinator is responsible for considering task's characteristics
// and publish it to task queue using task publisher, base on specific logics

taskPublisherCnf := config.C().Destination.RabbitMQTaskManagerConnection
webhookTaskPublisher := rabbitmqtaskmanager.NewTaskPublisher(taskPublisherCnf, "webhook_tasks_queue")

// todo maybe it is better to having configs for setup of task coordinator.
taskPublishers := make(map[entity.DestinationType]taskmanager.Publisher)
taskPublishers[entity.WebhookDestinationType] = webhookTaskPublisher

rmqTaskManagerConnConfig := config.C().Destination.RabbitMQTaskManagerConnection
coordinator := dtcoordinator.New(taskService, rmqTaskManagerConnConfig)
coordinator := dtcoordinator.New(taskPublishers)

cErr := coordinator.Start(processedEvents, done, &wg)
if cErr != nil {
log.Panicf("Error on starting destination type coordinator.")
}

//----- Handling graceful shutdown -----//
//----------------- Handling graceful shutdown -----------------//

quit := make(chan os.Signal, 1)
signal.Notify(quit, os.Interrupt)
Expand Down
28 changes: 15 additions & 13 deletions destination/entity/taskentity/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"time"

"github.com/ormushq/ormus/event"
"github.com/ormushq/ormus/manager/entity"
)

type IntegrationDeliveryStatus uint8
Expand All @@ -20,13 +21,13 @@ const (

// Task represents a delivering processed event to corresponding third party integrations.
type Task struct {
ID string
IntegrationDeliveryStatus IntegrationDeliveryStatus
Attempts uint8
FailedReason *string
ProcessedEvent event.ProcessedEvent
CreatedAt time.Time
UpdatedAt time.Time
ID string
DeliveryStatus IntegrationDeliveryStatus
Attempts uint8
FailedReason *string
ProcessedEvent event.ProcessedEvent
CreatedAt time.Time
UpdatedAt time.Time
}

func (t IntegrationDeliveryStatus) String() string {
Expand All @@ -51,15 +52,16 @@ func (t IntegrationDeliveryStatus) IsBroadcast() bool {

func MakeTaskUsingProcessedEvent(pe event.ProcessedEvent) Task {
return Task{
ID: pe.ID(),
IntegrationDeliveryStatus: NotExecutedTaskStatus,
Attempts: 0,
CreatedAt: time.Time{},
UpdatedAt: time.Time{},
ID: pe.ID(),
ProcessedEvent: pe,
DeliveryStatus: NotExecutedTaskStatus,
Attempts: 0,
CreatedAt: time.Time{},
UpdatedAt: time.Time{},
}
}

func (t Task) DestinationSlug() string {
func (t Task) DestinationSlug() entity.DestinationType {
return t.ProcessedEvent.Integration.Metadata.Slug
}

Expand Down
44 changes: 6 additions & 38 deletions destination/taskcoordinator/adapter/dtcoordinator/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,44 +2,29 @@ package dtcoordinator

import (
"fmt"
"log"
"log/slog"
"sync"

"github.com/ormushq/ormus/destination/dconfig"
"github.com/ormushq/ormus/destination/taskdelivery/adapters/fakedeliveryhandler"
"github.com/ormushq/ormus/destination/taskmanager"
"github.com/ormushq/ormus/destination/taskmanager/adapter/rabbitmqtaskmanager"
"github.com/ormushq/ormus/destination/taskservice"
"github.com/ormushq/ormus/event"
"github.com/ormushq/ormus/manager/entity"
)

// DestinationTypeCoordinator is responsible for setup task managers and publish incoming processed events using suitable task publishers.
type DestinationTypeCoordinator struct {
TaskService taskservice.Service
TaskPublishers map[string]taskmanager.Publisher
RabbitMQConnectionConfig dconfig.RabbitMQTaskManagerConnection
TaskPublishers map[entity.DestinationType]taskmanager.Publisher
}

func New(ts taskservice.Service, rmqCnf dconfig.RabbitMQTaskManagerConnection) DestinationTypeCoordinator {
// Create RabbitMQ task manager for webhook events
rmqTaskManagerForWebhooks := rabbitmqtaskmanager.NewTaskManager(rmqCnf, "webhook_tasks_queue")

taskPublishers := make(map[string]taskmanager.Publisher)
taskPublishers["webhook"] = rmqTaskManagerForWebhooks

func New(taskPublishers map[entity.DestinationType]taskmanager.Publisher) DestinationTypeCoordinator {
return DestinationTypeCoordinator{
TaskService: ts,
TaskPublishers: taskPublishers,
RabbitMQConnectionConfig: rmqCnf,
TaskPublishers: taskPublishers,
}
}

func (d DestinationTypeCoordinator) Start(processedEvents <-chan event.ProcessedEvent, done <-chan bool, wg *sync.WaitGroup) error {
wg.Add(1)
go func() {
defer wg.Done()
slog.Info("Start task coordinator [DestinationType].")
slog.Info("Start task coordinator [DestinationType]")

for {
select {
Expand All @@ -54,7 +39,7 @@ func (d DestinationTypeCoordinator) Start(processedEvents <-chan event.Processed

pErr := taskPublisher.Publish(pe)
if pErr != nil {
fmt.Println(pErr)
slog.Error(fmt.Sprintf("Error on publishing event : %s", pErr))

break
}
Expand All @@ -66,22 +51,5 @@ func (d DestinationTypeCoordinator) Start(processedEvents <-chan event.Processed
}
}()

webhookTaskConsumer := rabbitmqtaskmanager.NewTaskConsumer(d.RabbitMQConnectionConfig, "webhook_tasks_queue")

// Run workers
// todo we can use loop in range of slices of workers.
// also we can use config for number of each worker for different destination types.

taskDeliveryHandler := fakedeliveryhandler.New()
taskHandler := taskservice.NewTaskHandler(d.TaskService, taskDeliveryHandler)

webhookWorker1 := rabbitmqtaskmanager.NewWorker(webhookTaskConsumer, taskHandler)
err := webhookWorker1.Run(done, wg)
if err != nil {
log.Panicf("%s: %s", "Error on webhook worker", err)

return err
}

return nil
}
5 changes: 2 additions & 3 deletions destination/taskcoordinator/coordinator.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
package taskrouter
package taskcoordinator

import (
"sync"

"github.com/ormushq/ormus/event"
)

// Coordinator is responsible for setup task managers and publish coming process events using suitable task publishers.
type Coordinator interface {
Start(processedEvents <-chan *event.ProcessedEvent, done <-chan bool, wg *sync.WaitGroup)
Start(processedEvents <-chan event.ProcessedEvent, done <-chan bool, wg *sync.WaitGroup)
}
Loading

0 comments on commit 1001d91

Please sign in to comment.