diff --git a/.gitignore b/.gitignore index 6810abe9..0996c17a 100644 --- a/.gitignore +++ b/.gitignore @@ -27,6 +27,7 @@ logs/ mise.log destination/logs.json +destination/worker_log.json temp .env diff --git a/adapter/etcd/adapter.go b/adapter/etcd/adapter.go new file mode 100644 index 00000000..e09aa652 --- /dev/null +++ b/adapter/etcd/adapter.go @@ -0,0 +1,61 @@ +package etcd + +import ( + "context" + "fmt" + "log/slog" + "time" + + clientv3 "go.etcd.io/etcd/client/v3" + "go.etcd.io/etcd/client/v3/concurrency" +) + +type Config struct { + Host string `koanf:"host"` + Port int `koanf:"port"` + DialTimeoutSeconds uint8 `koanf:"dial_timeout"` +} + +type Adapter struct { + client *clientv3.Client +} + +func New(config Config) (Adapter, error) { + etcdClient, err := clientv3.New(clientv3.Config{ + Endpoints: []string{fmt.Sprintf("%s:%d", config.Host, config.Port)}, + DialTimeout: time.Duration(config.DialTimeoutSeconds) * time.Second, + }) + if err != nil { + slog.Error("Error creating etcd client: ", err) + + return Adapter{}, err + } + + return Adapter{ + client: etcdClient, + }, nil +} + +func (a Adapter) Client() *clientv3.Client { + return a.client +} + +func (a Adapter) Close() error { + return a.client.Close() +} + +func (a Adapter) Lock(ctx context.Context, key string, ttl int64) (unlock func() error, err error) { + session, err := concurrency.NewSession(a.client, concurrency.WithTTL(int(ttl))) + if err != nil { + return nil, err + } + + mutex := concurrency.NewMutex(session, key) + if err := mutex.Lock(ctx); err != nil { + return nil, err + } + + return func() error { + return mutex.Unlock(ctx) + }, nil +} diff --git a/cmd/destination/delivery_workers/webhook_delivery_worker.go b/cmd/destination/delivery_workers/webhook_delivery_worker.go new file mode 100644 index 00000000..07d190f4 --- /dev/null +++ b/cmd/destination/delivery_workers/webhook_delivery_worker.go @@ -0,0 +1,108 @@ +package main + +import ( + "log" + "log/slog" + "os" + "os/signal" + "sync" + "time" + + "github.com/ormushq/ormus/adapter/etcd" + "github.com/ormushq/ormus/adapter/redis" + "github.com/ormushq/ormus/config" + "github.com/ormushq/ormus/destination/taskdelivery" + "github.com/ormushq/ormus/destination/taskdelivery/adapters/fakedeliveryhandler" + "github.com/ormushq/ormus/destination/taskmanager/adapter/rabbitmqtaskmanager" + "github.com/ormushq/ormus/destination/taskservice" + "github.com/ormushq/ormus/destination/taskservice/adapter/idempotency/redistaskidempotency" + "github.com/ormushq/ormus/destination/taskservice/adapter/repository/inmemorytaskrepo" + "github.com/ormushq/ormus/destination/worker" + "github.com/ormushq/ormus/logger" +) + +const waitingAfterShutdownInSeconds = 2 + +func main() { + done := make(chan bool) + wg := sync.WaitGroup{} + + //----------------- Setup Logger -----------------// + + fileMaxSizeInMB := 10 + fileMaxAgeInDays := 30 + + cfg := logger.Config{ + FilePath: "./destination/worker_log.json", + UseLocalTime: false, + FileMaxSizeInMB: fileMaxSizeInMB, + FileMaxAgeInDays: fileMaxAgeInDays, + } + + logLevel := slog.LevelInfo + if config.C().Destination.DebugMode { + logLevel = slog.LevelDebug + } + + opt := slog.HandlerOptions{ + // todo should level debug be read from config? + Level: logLevel, + } + l := logger.New(cfg, &opt) + slog.SetDefault(l) + + //----------------- Setup Task Service -----------------// + + redisAdapter, err := redis.New(config.C().Redis) + if err != nil { + log.Panicf("error in new redis") + } + taskIdempotency := redistaskidempotency.New(redisAdapter, "tasks:", 30*24*time.Hour) + + taskRepo := inmemorytaskrepo.New() + + // Set up etcd as distributed locker. + distributedLocker, err := etcd.New(config.C().Etcd) + if err != nil { + log.Panicf("Error on new etcd") + } + + taskHandler := taskservice.New(taskIdempotency, taskRepo, distributedLocker) + + // Register delivery handlers + // each destination type can have specific delivery handler + fakeTaskDeliveryHandler := fakedeliveryhandler.New() + taskdelivery.Register("webhook", fakeTaskDeliveryHandler) + + //----------------- Consume ProcessEvents -----------------// + + taskConsumerConf := config.C().Destination.RabbitMQTaskManagerConnection + webhookTaskConsumer := rabbitmqtaskmanager.NewTaskConsumer(taskConsumerConf, "webhook_tasks_queue") + + processedEvents, err := webhookTaskConsumer.Consume(done, &wg) + if err != nil { + log.Panicf("Error on consuming tasks.") + } + + w1 := worker.NewWorker(processedEvents, taskHandler) + + w1Err := w1.Run(done, &wg) + if w1Err != nil { + log.Panicf("%s: %s", "Error on webhook worker", err) + } + + //----------------- Handling graceful shutdown -----------------// + + quit := make(chan os.Signal, 1) + signal.Notify(quit, os.Interrupt) + <-quit + + slog.Info("Received interrupt signal, shutting down gracefully...") + done <- true + + close(done) + + // todo use config for waiting time after graceful shutdown + time.Sleep(waitingAfterShutdownInSeconds * time.Second) + wg.Wait() +} diff --git a/cmd/destination/faker/fake_processed_event_producer.go b/cmd/destination/faker/fake_processed_event_producer.go index 72e1eda3..63fcfc76 100644 --- a/cmd/destination/faker/fake_processed_event_producer.go +++ b/cmd/destination/faker/fake_processed_event_producer.go @@ -4,10 +4,10 @@ import ( "context" "encoding/json" "fmt" - "github.com/ormushq/ormus/config" "log" "time" + "github.com/ormushq/ormus/config" "github.com/ormushq/ormus/event" "github.com/ormushq/ormus/manager/entity" amqp "github.com/rabbitmq/amqp091-go" @@ -25,7 +25,6 @@ func main() { rmqConsumerConnConfig := config.C().Destination.RabbitMQConsumerConnection conn, err := amqp.Dial(fmt.Sprintf("amqp://%s:%s@%s:%d/", rmqConsumerConnConfig.User, rmqConsumerConnConfig.Password, rmqConsumerConnConfig.Host, rmqConsumerConnConfig.Port)) - //conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") defer func(conn *amqp.Connection) { err = conn.Close() @@ -54,7 +53,7 @@ func main() { defer cancel() fakeIntegration := entity.Integration{ - ID: "5", + ID: "10", SourceID: "1", Metadata: entity.DestinationMetadata{ ID: "1", diff --git a/cmd/destination/main.go b/cmd/destination/main.go index 47134301..1deb4d23 100644 --- a/cmd/destination/main.go +++ b/cmd/destination/main.go @@ -1,7 +1,6 @@ package main import ( - "fmt" "log" "log/slog" "os" @@ -9,26 +8,25 @@ import ( "sync" "time" - "github.com/ormushq/ormus/adapter/redis" "github.com/ormushq/ormus/config" "github.com/ormushq/ormus/destination/processedevent/adapter/rabbitmqconsumer" "github.com/ormushq/ormus/destination/taskcoordinator/adapter/dtcoordinator" - "github.com/ormushq/ormus/destination/taskservice" - "github.com/ormushq/ormus/destination/taskservice/adapter/idempotency/redistaskidempotency" - "github.com/ormushq/ormus/destination/taskservice/adapter/repository/inmemorytaskrepo" + "github.com/ormushq/ormus/destination/taskmanager/adapter/rabbitmqtaskmanager" "github.com/ormushq/ormus/logger" + "github.com/ormushq/ormus/manager/entity" ) -const waitingAfterShutdownInSeconds = 2 +const waitingAfterShutdownInSeconds = 1 func main() { done := make(chan bool) wg := sync.WaitGroup{} + //----------------- Setup Logger -----------------// + fileMaxSizeInMB := 10 fileMaxAgeInDays := 30 - //------ Setup logger ------ cfg := logger.Config{ FilePath: "./destination/logs.json", UseLocalTime: false, @@ -46,23 +44,11 @@ func main() { Level: logLevel, } l := logger.New(cfg, &opt) - slog.SetDefault(l) - // Setup Task Service - - // In-Memory task idempotency - // taskIdempotency := inmemorytaskidempotency.New() + // use slog as default logger. + slog.SetDefault(l) - // Redis task idempotency - // todo do we need to use separate db number for redis task idempotency or destination module? - redisAdapter, err := redis.New(config.C().Redis) - if err != nil { - log.Panicf("error in new redis") - } - taskIdempotency := redistaskidempotency.New(redisAdapter, "tasks:", 30*24*time.Hour) - taskRepo := inmemorytaskrepo.New() - taskService := taskservice.New(taskIdempotency, taskRepo) - //----- Consuming processed events -----// + //----------------- Consume Processed Events From Core -----------------// // Get connection config for rabbitMQ consumer rmqConsumerConnConfig := config.C().Destination.RabbitMQConsumerConnection @@ -71,35 +57,39 @@ func main() { // todo should we consider array of topics? rmqConsumer := rabbitmqconsumer.New(rmqConsumerConnConfig, rmqConsumerTopic) - log.Println("Start Consuming processed events.") + slog.Info("Start Consuming processed events.") processedEvents, err := rmqConsumer.Consume(done, &wg) if err != nil { log.Panicf("Error on consuming processed events.") } - //----- Setup Task Coordinator -----// - // Task coordinator specifies which task manager should handle incoming processed events. - // we can have different task coordinators base on destination type, customer plans, etc. - // Now we just create dtcoordinator that stands for destination type coordinator. - // It determines which task manager should be used for processed evens considering destination type of processed events. + //----------------- Setup Task Coordinator -----------------// + + // Task coordinator is responsible for considering task's characteristics + // and publish it to task queue using task publisher. currently we support + // destination type coordinator which means every task with specific destination type + // will be published to its corresponding task publisher. + + taskPublisherCnf := config.C().Destination.RabbitMQTaskManagerConnection + webhookTaskPublisher := rabbitmqtaskmanager.NewTaskPublisher(taskPublisherCnf, "webhook_tasks_queue") - // todo maybe it is better to having configs for setup of task coordinator. + taskPublishers := make(dtcoordinator.TaskPublisherMap) + taskPublishers[entity.WebhookDestinationType] = webhookTaskPublisher - rmqTaskManagerConnConfig := config.C().Destination.RabbitMQTaskManagerConnection - coordinator := dtcoordinator.New(taskService, rmqTaskManagerConnConfig) + coordinator := dtcoordinator.New(taskPublishers) cErr := coordinator.Start(processedEvents, done, &wg) if cErr != nil { log.Panicf("Error on starting destination type coordinator.") } - //----- Handling graceful shutdown -----// + //----------------- Handling graceful shutdown -----------------// quit := make(chan os.Signal, 1) signal.Notify(quit, os.Interrupt) <-quit - fmt.Println("Received interrupt signal, shutting down gracefully...") + slog.Info("Received interrupt signal, shutting down gracefully...") done <- true close(done) diff --git a/config.yml b/config.yml index 2e7ff682..9aafecce 100644 --- a/config.yml +++ b/config.yml @@ -9,6 +9,10 @@ redis: host: localhost db: 0 password: "" +etcd: + port: 2379 + host: localhost + dial_timeout: 5 destination: debug_mode: true consumer_topic: "pe.#" # pe stands for processed event. and # substitute for zero or more words. diff --git a/config/config.go b/config/config.go index 619cb6b9..06084de3 100644 --- a/config/config.go +++ b/config/config.go @@ -1,6 +1,7 @@ package config import ( + "github.com/ormushq/ormus/adapter/etcd" "github.com/ormushq/ormus/adapter/redis" "github.com/ormushq/ormus/destination/dconfig" "github.com/ormushq/ormus/manager" @@ -8,8 +9,9 @@ import ( ) type Config struct { - Manager manager.Config `koanf:"manager"` Redis redis.Config `koanf:"redis"` + Etcd etcd.Config `koanf:"etcd"` + Manager manager.Config `koanf:"manager"` Source source.Config `koanf:"source"` Destination dconfig.Config `koanf:"destination"` } diff --git a/destination/entity/taskentity/task.go b/destination/entity/taskentity/task.go index db36823a..e59ae42b 100644 --- a/destination/entity/taskentity/task.go +++ b/destination/entity/taskentity/task.go @@ -6,6 +6,7 @@ import ( "time" "github.com/ormushq/ormus/event" + "github.com/ormushq/ormus/manager/entity" ) type IntegrationDeliveryStatus uint8 @@ -20,13 +21,13 @@ const ( // Task represents a delivering processed event to corresponding third party integrations. type Task struct { - ID string - IntegrationDeliveryStatus IntegrationDeliveryStatus - Attempts uint8 - FailedReason *string - ProcessedEvent event.ProcessedEvent - CreatedAt time.Time - UpdatedAt time.Time + ID string + DeliveryStatus IntegrationDeliveryStatus + Attempts uint8 + FailedReason *string + ProcessedEvent event.ProcessedEvent + CreatedAt time.Time + UpdatedAt time.Time } func (t IntegrationDeliveryStatus) String() string { @@ -51,15 +52,16 @@ func (t IntegrationDeliveryStatus) IsBroadcast() bool { func MakeTaskUsingProcessedEvent(pe event.ProcessedEvent) Task { return Task{ - ID: pe.ID(), - IntegrationDeliveryStatus: NotExecutedTaskStatus, - Attempts: 0, - CreatedAt: time.Time{}, - UpdatedAt: time.Time{}, + ID: pe.ID(), + ProcessedEvent: pe, + DeliveryStatus: NotExecutedTaskStatus, + Attempts: 0, + CreatedAt: time.Time{}, + UpdatedAt: time.Time{}, } } -func (t Task) DestinationSlug() string { +func (t Task) DestinationSlug() entity.DestinationType { return t.ProcessedEvent.Integration.Metadata.Slug } diff --git a/destination/taskcoordinator/adapter/dtcoordinator/adapter.go b/destination/taskcoordinator/adapter/dtcoordinator/adapter.go index 0eff9609..073c5b03 100644 --- a/destination/taskcoordinator/adapter/dtcoordinator/adapter.go +++ b/destination/taskcoordinator/adapter/dtcoordinator/adapter.go @@ -2,36 +2,23 @@ package dtcoordinator import ( "fmt" - "log" "log/slog" "sync" - "github.com/ormushq/ormus/destination/dconfig" - "github.com/ormushq/ormus/destination/taskdelivery/adapters/fakedeliveryhandler" "github.com/ormushq/ormus/destination/taskmanager" - "github.com/ormushq/ormus/destination/taskmanager/adapter/rabbitmqtaskmanager" - "github.com/ormushq/ormus/destination/taskservice" "github.com/ormushq/ormus/event" + "github.com/ormushq/ormus/manager/entity" ) -// DestinationTypeCoordinator is responsible for setup task managers and publish incoming processed events using suitable task publishers. +type TaskPublisherMap map[entity.DestinationType]taskmanager.Publisher + type DestinationTypeCoordinator struct { - TaskService taskservice.Service - TaskPublishers map[string]taskmanager.Publisher - RabbitMQConnectionConfig dconfig.RabbitMQTaskManagerConnection + TaskPublishers TaskPublisherMap } -func New(ts taskservice.Service, rmqCnf dconfig.RabbitMQTaskManagerConnection) DestinationTypeCoordinator { - // Create RabbitMQ task manager for webhook events - rmqTaskManagerForWebhooks := rabbitmqtaskmanager.NewTaskManager(rmqCnf, "webhook_tasks_queue") - - taskPublishers := make(map[string]taskmanager.Publisher) - taskPublishers["webhook"] = rmqTaskManagerForWebhooks - +func New(taskPublishers TaskPublisherMap) DestinationTypeCoordinator { return DestinationTypeCoordinator{ - TaskService: ts, - TaskPublishers: taskPublishers, - RabbitMQConnectionConfig: rmqCnf, + TaskPublishers: taskPublishers, } } @@ -39,7 +26,7 @@ func (d DestinationTypeCoordinator) Start(processedEvents <-chan event.Processed wg.Add(1) go func() { defer wg.Done() - slog.Info("Start task coordinator [DestinationType].") + slog.Info("Starting destination type task coordinator.") for { select { @@ -54,7 +41,7 @@ func (d DestinationTypeCoordinator) Start(processedEvents <-chan event.Processed pErr := taskPublisher.Publish(pe) if pErr != nil { - fmt.Println(pErr) + slog.Error(fmt.Sprintf("Error on publishing event : %s", pErr)) break } @@ -66,22 +53,5 @@ func (d DestinationTypeCoordinator) Start(processedEvents <-chan event.Processed } }() - webhookTaskConsumer := rabbitmqtaskmanager.NewTaskConsumer(d.RabbitMQConnectionConfig, "webhook_tasks_queue") - - // Run workers - // todo we can use loop in range of slices of workers. - // also we can use config for number of each worker for different destination types. - - taskDeliveryHandler := fakedeliveryhandler.New() - taskHandler := taskservice.NewTaskHandler(d.TaskService, taskDeliveryHandler) - - webhookWorker1 := rabbitmqtaskmanager.NewWorker(webhookTaskConsumer, taskHandler) - err := webhookWorker1.Run(done, wg) - if err != nil { - log.Panicf("%s: %s", "Error on webhook worker", err) - - return err - } - return nil } diff --git a/destination/taskcoordinator/coordinator.go b/destination/taskcoordinator/coordinator.go index fb41136d..6458a93f 100644 --- a/destination/taskcoordinator/coordinator.go +++ b/destination/taskcoordinator/coordinator.go @@ -1,4 +1,4 @@ -package taskrouter +package taskcoordinator import ( "sync" @@ -6,7 +6,6 @@ import ( "github.com/ormushq/ormus/event" ) -// Coordinator is responsible for setup task managers and publish coming process events using suitable task publishers. type Coordinator interface { - Start(processedEvents <-chan *event.ProcessedEvent, done <-chan bool, wg *sync.WaitGroup) + Start(processedEvents <-chan event.ProcessedEvent, done <-chan bool, wg *sync.WaitGroup) } diff --git a/destination/taskdelivery/adapters/fakedeliveryhandler/handler.go b/destination/taskdelivery/adapters/fakedeliveryhandler/handler.go index 04fc62e5..608db2e3 100644 --- a/destination/taskdelivery/adapters/fakedeliveryhandler/handler.go +++ b/destination/taskdelivery/adapters/fakedeliveryhandler/handler.go @@ -7,7 +7,6 @@ import ( "github.com/ormushq/ormus/destination/entity/taskentity" "github.com/ormushq/ormus/destination/taskdelivery/param" - "github.com/ormushq/ormus/event" ) type FakeHandler struct{} @@ -18,10 +17,10 @@ func New() *FakeHandler { const fakeProcessingTimeSecond = 2 -func (h FakeHandler) Handle(t taskentity.Task, _ event.ProcessedEvent) (param.DeliveryTaskResponse, error) { +func (h FakeHandler) Handle(t taskentity.Task) (param.DeliveryTaskResponse, error) { time.Sleep(fakeProcessingTimeSecond * time.Second) - slog.Info(fmt.Sprintf("Task [%s] handled successfully!", t.ID)) + slog.Info(fmt.Sprintf("Task [%s] handled successfully! ✅ ", t.ID)) res := param.DeliveryTaskResponse{ Attempts: 1, diff --git a/destination/taskdelivery/adapters/webhookdeliveryhandler/handler.go b/destination/taskdelivery/adapters/webhookdeliveryhandler/handler.go index 90fa8c7d..6ad31256 100644 --- a/destination/taskdelivery/adapters/webhookdeliveryhandler/handler.go +++ b/destination/taskdelivery/adapters/webhookdeliveryhandler/handler.go @@ -3,18 +3,14 @@ package webhookdeliveryhandler import ( "github.com/ormushq/ormus/destination/entity/taskentity" "github.com/ormushq/ormus/destination/taskdelivery/param" - "github.com/ormushq/ormus/event" ) type WebhookHandler struct{} -func (h WebhookHandler) Handle(t taskentity.Task, pe event.ProcessedEvent) (param.DeliveryTaskResponse, error) { +func (h WebhookHandler) Handle(t taskentity.Task) (param.DeliveryTaskResponse, error) { // todo webhook handler is responsible for sending processed event to url of webhook and making DeliveryTaskResponse // todo webhook handler should consider max_retry_exceeded and other necessary policies in e.Integration.Config - // get configs from processed event - println(pe.Integration.Config) - res := param.DeliveryTaskResponse{ FailedReason: nil, DeliveryStatus: taskentity.SuccessTaskStatus, diff --git a/destination/taskdelivery/delivery_handler.go b/destination/taskdelivery/delivery_handler.go index fbc2bf42..e51d4a4c 100644 --- a/destination/taskdelivery/delivery_handler.go +++ b/destination/taskdelivery/delivery_handler.go @@ -3,10 +3,9 @@ package taskdelivery import ( "github.com/ormushq/ormus/destination/entity/taskentity" "github.com/ormushq/ormus/destination/taskdelivery/param" - "github.com/ormushq/ormus/event" ) -// DeliveryHandler is responsible for delivering event to third party destinations. +// DeliveryHandler is responsible for delivering processed event to third party destinations. type DeliveryHandler interface { - Handle(task taskentity.Task, processedEvent event.ProcessedEvent) (param.DeliveryTaskResponse, error) + Handle(task taskentity.Task) (param.DeliveryTaskResponse, error) } diff --git a/destination/taskdelivery/delivery_mapper.go b/destination/taskdelivery/delivery_mapper.go new file mode 100644 index 00000000..dbfb4d92 --- /dev/null +++ b/destination/taskdelivery/delivery_mapper.go @@ -0,0 +1,10 @@ +package taskdelivery + +import "github.com/ormushq/ormus/manager/entity" + +var Mapper = make(map[entity.DestinationType]DeliveryHandler) + +// Register registers a new delivery handler for a destination type. +func Register(destinationType entity.DestinationType, dh DeliveryHandler) { + Mapper[destinationType] = dh +} diff --git a/destination/taskdelivery/param/handle_task.go b/destination/taskdelivery/param/task_delivery.go similarity index 100% rename from destination/taskdelivery/param/handle_task.go rename to destination/taskdelivery/param/task_delivery.go diff --git a/destination/taskmanager/adapter/inmemorytaskmanager/task_manager.go b/destination/taskmanager/adapter/inmemorytaskmanager/task_manager.go index cd14fd03..b6a2b74c 100644 --- a/destination/taskmanager/adapter/inmemorytaskmanager/task_manager.go +++ b/destination/taskmanager/adapter/inmemorytaskmanager/task_manager.go @@ -1,7 +1,7 @@ package inmemorytaskmanager import ( - "fmt" + "log/slog" "github.com/ormushq/ormus/event" ) @@ -22,7 +22,7 @@ func (tm *TaskManager) Publish(e event.ProcessedEvent) error { // send task to queue err := tm.Queue.Enqueue(e) if err != nil { - fmt.Println("enqueue Error : ", err) + slog.Error("enqueue Error : ", err) return err } diff --git a/destination/taskmanager/adapter/rabbitmqtaskmanager/queue.go b/destination/taskmanager/adapter/rabbitmqtaskmanager/queue.go index e5de47c2..ae112ad7 100644 --- a/destination/taskmanager/adapter/rabbitmqtaskmanager/queue.go +++ b/destination/taskmanager/adapter/rabbitmqtaskmanager/queue.go @@ -5,6 +5,7 @@ import ( "encoding/json" "fmt" "log" + "log/slog" "time" "github.com/ormushq/ormus/destination/dconfig" @@ -50,11 +51,10 @@ func (q *Queue) Enqueue(pe event.ProcessedEvent) error { nil, // arguments ) failOnError(err, "Failed to declare a queue") - // Convert Processed event to json jsonEvent, err := json.Marshal(pe) if err != nil { - fmt.Println("Error:", err) + slog.Error("Error:", err) return err } diff --git a/destination/taskmanager/adapter/rabbitmqtaskmanager/task_consumer.go b/destination/taskmanager/adapter/rabbitmqtaskmanager/task_consumer.go index e1d6485c..a4bea50e 100644 --- a/destination/taskmanager/adapter/rabbitmqtaskmanager/task_consumer.go +++ b/destination/taskmanager/adapter/rabbitmqtaskmanager/task_consumer.go @@ -2,6 +2,7 @@ package rabbitmqtaskmanager import ( "fmt" + "log" "sync" "github.com/ormushq/ormus/destination/dconfig" @@ -95,3 +96,13 @@ func (c Consumer) Consume(done <-chan bool, wg *sync.WaitGroup) (<-chan event.Pr return eventsChannel, nil } + +func panicOnWorkersError(err error, msg string) { + if err != nil { + log.Panicf("%s: %s", msg, err) + } +} + +func printWorkersError(err error, msg string) { + log.Printf("%s: %s", msg, err) +} diff --git a/destination/taskmanager/adapter/rabbitmqtaskmanager/task_manager.go b/destination/taskmanager/adapter/rabbitmqtaskmanager/task_manager.go index 15c94072..eeef3bf4 100644 --- a/destination/taskmanager/adapter/rabbitmqtaskmanager/task_manager.go +++ b/destination/taskmanager/adapter/rabbitmqtaskmanager/task_manager.go @@ -5,19 +5,19 @@ import ( "github.com/ormushq/ormus/event" ) -type TaskManager struct { +type TaskPublisher struct { queue *Queue } -func NewTaskManager(c dconfig.RabbitMQTaskManagerConnection, queueName string) *TaskManager { +func NewTaskPublisher(c dconfig.RabbitMQTaskManagerConnection, queueName string) *TaskPublisher { q := newQueue(c, queueName) - return &TaskManager{ + return &TaskPublisher{ queue: q, } } -func (tm *TaskManager) Publish(pe event.ProcessedEvent) error { +func (tm *TaskPublisher) Publish(pe event.ProcessedEvent) error { err := tm.queue.Enqueue(pe) if err != nil { return err diff --git a/destination/taskservice/adapter/repository/inmemorytaskrepo/repo.go b/destination/taskservice/adapter/repository/inmemorytaskrepo/repo.go index 53a09691..fb3f6402 100644 --- a/destination/taskservice/adapter/repository/inmemorytaskrepo/repo.go +++ b/destination/taskservice/adapter/repository/inmemorytaskrepo/repo.go @@ -28,7 +28,7 @@ func (db DB) UpsertTask(taskID string, request param.UpsertTaskRequest) error { } db.tasks[taskID] = task } - task.IntegrationDeliveryStatus = request.IntegrationDeliveryStatus + task.DeliveryStatus = request.IntegrationDeliveryStatus task.Attempts = request.Attempts task.FailedReason = request.FailedReason diff --git a/destination/taskservice/task_handler.go b/destination/taskservice/handle_task.go similarity index 52% rename from destination/taskservice/task_handler.go rename to destination/taskservice/handle_task.go index 6263b95d..930a2ef0 100644 --- a/destination/taskservice/task_handler.go +++ b/destination/taskservice/handle_task.go @@ -10,69 +10,65 @@ import ( "github.com/ormushq/ormus/event" ) -type TaskService interface { - GetTaskStatusByID(ctx context.Context, taskID string) (taskentity.IntegrationDeliveryStatus, error) - GetTaskByID(taskID string) (taskentity.Task, error) - UpsertTaskAndSaveIdempotency(ctx context.Context, t taskentity.Task) error -} - -type Handler struct { - DeliveryHandler taskdelivery.DeliveryHandler - TaskService TaskService -} - -func NewTaskHandler(ts TaskService, dh taskdelivery.DeliveryHandler) Handler { - return Handler{ - DeliveryHandler: dh, - TaskService: ts, - } -} - -func (h Handler) HandleTask(ctx context.Context, newEvent event.ProcessedEvent) error { +func (s Service) HandleTask(ctx context.Context, newEvent event.ProcessedEvent) error { var task taskentity.Task var err error var taskStatus taskentity.IntegrationDeliveryStatus - taskID := newEvent.ID() // Get task status using idempotency in the task service. - if taskStatus, err = h.TaskService.GetTaskStatusByID(ctx, taskID); err != nil { + if taskStatus, err = s.GetTaskStatusByID(ctx, taskID); err != nil { // todo use richError return err } + // If task status is not executable, we don't need to do anything. if !taskStatus.CanBeExecuted() { slog.Debug(fmt.Sprintf("Task [%s] has %s status and is not executable", taskID, taskStatus.String())) return nil } + // If task status is broadcast, we need to get task info from repository. if taskStatus.IsBroadcast() { - // Get task info such as attempts, failed reason and... using repository. - task, err = h.TaskService.GetTaskByID(taskID) + task, err = s.GetTaskByID(taskID) + task.ProcessedEvent = newEvent if err != nil { return err } } else { + // If task status is not broadcast, we need to create task using processed event. task = taskentity.MakeTaskUsingProcessedEvent(newEvent) } - // DeliveryHandler is responsible for delivering event to third party destinations. - // DeliveryHandler should consider max_retries base on integration configs. - deliveryResponse, err := h.DeliveryHandler.Handle(task, newEvent) + // DeliveryHandler is responsible for delivering processed event to third party destinations. + destinationType := task.DestinationSlug() + + // Get Delivery handler from task delivery mapper. + deliveryHandler, ok := taskdelivery.Mapper[destinationType] + + fmt.Printf("%v", destinationType) + + if !ok { + return fmt.Errorf("destination type %s is not supported", destinationType) + } + + // Deliver processed event to third party destinations using corresponding delivery handler. + deliveryResponse, err := deliveryHandler.Handle(task) if err != nil { return err } // DeliveryStatus is set in delivery handler in case of success, retriable failed, unretriable failed. - task.IntegrationDeliveryStatus = deliveryResponse.DeliveryStatus + task.DeliveryStatus = deliveryResponse.DeliveryStatus // Attempts is incremented in delivery handler in case of success. task.Attempts = deliveryResponse.Attempts // FailedReason describes what caused the failure. task.FailedReason = deliveryResponse.FailedReason - err = h.TaskService.UpsertTaskAndSaveIdempotency(ctx, task) + err = s.UpsertTaskAndSaveIdempotency(ctx, task) + // dispatch event of success delivery if err != nil { // todo what should we do if error occurs in updating task repo or idempotency ? return err diff --git a/destination/taskservice/service.go b/destination/taskservice/service.go index 02545544..5f7749ef 100644 --- a/destination/taskservice/service.go +++ b/destination/taskservice/service.go @@ -18,19 +18,38 @@ type Idempotency interface { SaveTaskStatus(ctx context.Context, taskID string, status taskentity.IntegrationDeliveryStatus) error } +type Locker interface { + Lock(ctx context.Context, key string, ttl int64) (unlock func() error, err error) +} + type Service struct { idempotency Idempotency repo Repository + locker Locker } -func New(idempotency Idempotency, repo Repository) Service { +func New(idempotency Idempotency, repo Repository, l Locker) Service { return Service{ idempotency: idempotency, repo: repo, + locker: l, } } func (s Service) GetTaskStatusByID(ctx context.Context, taskID string) (taskentity.IntegrationDeliveryStatus, error) { + // Acquire a lock with a 10-second TTL + // todo get lock prefix and ttl from config + lockKey := "task:" + taskID + const ttl = 10 + unlock, err := s.locker.Lock(ctx, lockKey, ttl) + if err != nil { + return taskentity.InvalidTaskStatus, err + } + + defer func() { + err = unlock() + }() + status, err := s.idempotency.GetTaskStatusByID(ctx, taskID) if err != nil { return taskentity.InvalidTaskStatus, err @@ -50,7 +69,7 @@ func (s Service) GetTaskByID(taskID string) (taskentity.Task, error) { func (s Service) UpsertTaskAndSaveIdempotency(ctx context.Context, t taskentity.Task) error { req := param.UpsertTaskRequest{ - IntegrationDeliveryStatus: t.IntegrationDeliveryStatus, + IntegrationDeliveryStatus: t.DeliveryStatus, Attempts: t.Attempts, FailedReason: t.FailedReason, UpdatedAt: time.Now(), @@ -61,7 +80,7 @@ func (s Service) UpsertTaskAndSaveIdempotency(ctx context.Context, t taskentity. return rErr } - iErr := s.idempotency.SaveTaskStatus(ctx, t.ID, t.IntegrationDeliveryStatus) + iErr := s.idempotency.SaveTaskStatus(ctx, t.ID, t.DeliveryStatus) if iErr != nil { // todo is it better to rollback updated task status in idempotency? return iErr diff --git a/destination/taskmanager/adapter/rabbitmqtaskmanager/worker.go b/destination/worker/worker.go similarity index 57% rename from destination/taskmanager/adapter/rabbitmqtaskmanager/worker.go rename to destination/worker/worker.go index 742c09ec..d1882c0a 100644 --- a/destination/taskmanager/adapter/rabbitmqtaskmanager/worker.go +++ b/destination/worker/worker.go @@ -1,37 +1,35 @@ -package rabbitmqtaskmanager +package worker import ( "context" "fmt" - "log" "log/slog" "sync" "time" "github.com/ormushq/ormus/destination/taskmanager" + "github.com/ormushq/ormus/event" ) const timeoutInSeconds = 5 type Worker struct { - TaskConsumer taskmanager.Consumer - TaskHandler taskmanager.TaskHandler + TaskHandler TaskHandler + EventsChannel <-chan event.ProcessedEvent } -func (w *Worker) Run(done <-chan bool, wg *sync.WaitGroup) error { - processedEventsChannel, err := w.TaskConsumer.Consume(done, wg) - if err != nil { - return err - } +type TaskHandler interface { + HandleTask(ctx context.Context, newEvent event.ProcessedEvent) error +} +func (w *Worker) Run(done <-chan bool, wg *sync.WaitGroup) error { wg.Add(1) go func() { defer wg.Done() slog.Info("Start rabbitmq worker for handling tasks.") - for { select { - case newEvent := <-processedEventsChannel: + case newEvent := <-w.EventsChannel: go func() { ctx, cancel := context.WithTimeout(context.Background(), timeoutInSeconds*time.Second) defer cancel() @@ -50,19 +48,9 @@ func (w *Worker) Run(done <-chan bool, wg *sync.WaitGroup) error { return nil } -func NewWorker(c taskmanager.Consumer, th taskmanager.TaskHandler) *Worker { +func NewWorker(events <-chan event.ProcessedEvent, th taskmanager.TaskHandler) *Worker { return &Worker{ - TaskConsumer: c, - TaskHandler: th, + EventsChannel: events, + TaskHandler: th, } } - -func panicOnWorkersError(err error, msg string) { - if err != nil { - log.Panicf("%s: %s", msg, err) - } -} - -func printWorkersError(err error, msg string) { - log.Printf("%s: %s", msg, err) -} diff --git a/docker-compose.yml b/docker-compose.yml index 207e51f8..dda69c7c 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -12,5 +12,25 @@ services: volumes: - rabbitmq_data:/var/lib/rabbitmq # Persist data + redis: + image: redis:7.2.4 + container_name: ormus-redis + ports: + - "6379:6379" + volumes: + - redis_data:/data + + etcd: + image: quay.io/coreos/etcd:v3.5.13 + container_name: etcd + command: /usr/local/bin/etcd -name etcd0 -advertise-client-urls http://etcd:2379 -listen-client-urls http://0.0.0.0:2379 + ports: + - "2379:2379" + - "2380:2380" + volumes: + - etcd_data:/etcd-data + volumes: + redis_data: + etcd_data: rabbitmq_data: \ No newline at end of file diff --git a/event/processed_event.go b/event/processed_event.go index 8dc35a1a..15681dca 100644 --- a/event/processed_event.go +++ b/event/processed_event.go @@ -35,6 +35,6 @@ func (e ProcessedEvent) ID() string { return e.MessageID + "-" + e.Integration.ID } -func (e ProcessedEvent) DestinationType() string { +func (e ProcessedEvent) DestinationType() entity.DestinationType { return e.Integration.Metadata.Slug } diff --git a/go.mod b/go.mod index b2a9ec42..bd02d14d 100644 --- a/go.mod +++ b/go.mod @@ -5,6 +5,9 @@ go 1.21.0 require github.com/rabbitmq/amqp091-go v1.9.0 require ( + github.com/coreos/go-semver v0.3.0 // indirect + github.com/coreos/go-systemd/v22 v22.3.2 // indirect + github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/snappy v0.0.4 // indirect github.com/google/go-cmp v0.6.0 // indirect github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed // indirect @@ -13,9 +16,15 @@ require ( github.com/kr/pretty v0.3.1 // indirect github.com/rogpeppe/go-internal v1.12.0 // indirect github.com/scylladb/go-reflectx v1.0.1 // indirect + go.etcd.io/etcd/api/v3 v3.5.13 // indirect + go.etcd.io/etcd/client/pkg/v3 v3.5.13 // indirect go.uber.org/atomic v1.7.0 // indirect + go.uber.org/multierr v1.6.0 // indirect + go.uber.org/zap v1.17.0 // indirect golang.org/x/sync v0.6.0 // indirect golang.org/x/tools v0.17.0 // indirect + google.golang.org/genproto v0.0.0-20231120223509-83a465c0220f // indirect + google.golang.org/genproto/googleapis/api v0.0.0-20231106174013-bbf56f31fb17 // indirect gopkg.in/inf.v0 v0.9.1 // indirect ) @@ -29,7 +38,7 @@ require ( github.com/gocql/gocql v1.6.0 github.com/golang-jwt/jwt/v5 v5.2.0 github.com/golang-migrate/migrate/v4 v4.16.2 - github.com/golang/protobuf v1.5.3 // indirect + github.com/golang/protobuf v1.5.4 // indirect github.com/knadh/koanf v1.5.0 github.com/labstack/echo/v4 v4.11.4 github.com/labstack/gommon v0.4.2 // indirect @@ -51,7 +60,7 @@ require ( golang.org/x/text v0.14.0 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20231127180814-3a041ad873d4 // indirect google.golang.org/grpc v1.59.0 - google.golang.org/protobuf v1.31.0 // indirect + google.golang.org/protobuf v1.33.0 // indirect gopkg.in/natefinch/lumberjack.v2 v2.2.1 gopkg.in/yaml.v3 v3.0.1 // indirect ) @@ -59,4 +68,5 @@ require ( require ( github.com/google/uuid v1.3.1 github.com/streadway/amqp v1.1.0 + go.etcd.io/etcd/client/v3 v3.5.13 ) diff --git a/go.sum b/go.sum index f109b068..1e0c2237 100644 --- a/go.sum +++ b/go.sum @@ -46,7 +46,9 @@ github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XL github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk= +github.com/coreos/go-semver v0.3.0 h1:wkHLiw0WNATZnSG7epLsujiMCgPAc9xhjJ4tgnAxmfM= github.com/coreos/go-semver v0.3.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk= +github.com/coreos/go-systemd/v22 v22.3.2 h1:D9/bQk5vlXQFZ6Kwuu6zaiXJ9oTPe68++AzAJc1DzSI= github.com/coreos/go-systemd/v22 v22.3.2/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= @@ -114,8 +116,8 @@ github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw github.com/golang/protobuf v1.4.3/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= -github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg= -github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= +github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= +github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= @@ -337,13 +339,21 @@ github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9de github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= go.etcd.io/etcd/api/v3 v3.5.4/go.mod h1:5GB2vv4A4AOn3yk7MftYGHkUfGtDHnEraIjym4dYz5A= +go.etcd.io/etcd/api/v3 v3.5.13 h1:8WXU2/NBge6AUF1K1gOexB6e07NgsN1hXK0rSTtgSp4= +go.etcd.io/etcd/api/v3 v3.5.13/go.mod h1:gBqlqkcMMZMVTMm4NDZloEVJzxQOQIls8splbqBDa0c= go.etcd.io/etcd/client/pkg/v3 v3.5.4/go.mod h1:IJHfcCEKxYu1Os13ZdwCwIUTUVGYTSAM3YSwc9/Ac1g= +go.etcd.io/etcd/client/pkg/v3 v3.5.13 h1:RVZSAnWWWiI5IrYAXjQorajncORbS0zI48LQlE2kQWg= +go.etcd.io/etcd/client/pkg/v3 v3.5.13/go.mod h1:XxHT4u1qU12E2+po+UVPrEeL94Um6zL58ppuJWXSAB8= go.etcd.io/etcd/client/v3 v3.5.4/go.mod h1:ZaRkVgBZC+L+dLCjTcF1hRXpgZXQPOvnA/Ak/gq3kiY= +go.etcd.io/etcd/client/v3 v3.5.13 h1:o0fHTNJLeO0MyVbc7I3fsCf6nrOqn5d+diSarKnB2js= +go.etcd.io/etcd/client/v3 v3.5.13/go.mod h1:cqiAeY8b5DEEcpxvgWKsbLIWNM/8Wy2xJSDMtioMcoI= go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw= go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/goleak v1.2.1 h1:NBol2c7O1ZokfZ0LEU9K6Whx/KnwvepVetCUhtKja4A= go.uber.org/goleak v1.2.1/go.mod h1:qlT2yGI9QafXHhZZLxlSuNsMw3FFLxBr+tBRlmO1xH4= +go.uber.org/multierr v1.6.0 h1:y6IPFStTAIT5Ytl7/XYmHvzXQ7S3g/IeZW9hyZ5thw4= go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU= +go.uber.org/zap v1.17.0 h1:MTjgFu6ZLKvY6Pvaqk97GlxNBuMpV4Hy/3P6tRGlI2U= go.uber.org/zap v1.17.0/go.mod h1:MXVU+bhUf/A7Xi2HNOnopQOrmycQ5Ih87HtOu4q5SSo= golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= @@ -465,6 +475,10 @@ google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98 google.golang.org/genproto v0.0.0-20200513103714-09dca8ec2884/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c= google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo= google.golang.org/genproto v0.0.0-20210602131652-f16073e35f0c/go.mod h1:UODoCrxHCcBojKKwX1terBiRUaqAsFqJiF615XL43r0= +google.golang.org/genproto v0.0.0-20231120223509-83a465c0220f h1:Vn+VyHU5guc9KjB5KrjI2q0wCOWEOIh0OEsleqakHJg= +google.golang.org/genproto v0.0.0-20231120223509-83a465c0220f/go.mod h1:nWSwAFPb+qfNJXsoeO3Io7zf4tMSfN8EA8RlDA04GhY= +google.golang.org/genproto/googleapis/api v0.0.0-20231106174013-bbf56f31fb17 h1:JpwMPBpFN3uKhdaekDpiNlImDdkUAyiJ6ez/uxGaUSo= +google.golang.org/genproto/googleapis/api v0.0.0-20231106174013-bbf56f31fb17/go.mod h1:0xJLfVdJqpAPl8tDg1ujOCGzx6LFLttXT5NhllGOXY4= google.golang.org/genproto/googleapis/rpc v0.0.0-20231127180814-3a041ad873d4 h1:DC7wcm+i+P1rN3Ff07vL+OndGg5OhNddHyTA+ocPqYE= google.golang.org/genproto/googleapis/rpc v0.0.0-20231127180814-3a041ad873d4/go.mod h1:eJVxU6o+4G1PSczBr85xmyvSNYAKvAYgkub40YGomFM= google.golang.org/grpc v1.14.0/go.mod h1:yo6s7OP7yaDglbqo1J04qKzAhqBH6lvTonzMVmEdcZw= @@ -488,8 +502,8 @@ google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpAD google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c= google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= -google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs8= -google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= +google.golang.org/protobuf v1.33.0 h1:uNO2rsAINq/JlFpSdYEKIZ0uKD/R9cpdv0T+yoGwGmI= +google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= gopkg.in/asn1-ber.v1 v1.0.0-20181015200546-f715ec2f112d/go.mod h1:cuepJuh7vyXfUyUwEgHQXw849cJrilpS5NeIjOWESAw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= @@ -508,6 +522,7 @@ gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.5/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/manager/entity/integration.go b/manager/entity/integration.go index bb9f2586..64e03ae8 100644 --- a/manager/entity/integration.go +++ b/manager/entity/integration.go @@ -6,6 +6,12 @@ type IntegrationConfig map[string]any type DestinationCategory string +type DestinationType string + +const ( + WebhookDestinationType DestinationType = "webhook" +) + const ( Analytics DestinationCategory = "analytics" Advertising DestinationCategory = "advertising" @@ -41,7 +47,7 @@ type Integration struct { type DestinationMetadata struct { ID string - Name string // Javascript, Google Universal Analytics - Slug string // javascript, google-analytics + Name string // webhook, Google Universal Analytics + Slug DestinationType // webhook, google-analytics Categories []DestinationCategory } diff --git a/pkg/broker/rabbitmq/test/rabbitmq_fanout_test.go b/pkg/broker/rabbitmq/test/rabbitmq_fanout_test.go index 6cfbdaa7..05722d46 100644 --- a/pkg/broker/rabbitmq/test/rabbitmq_fanout_test.go +++ b/pkg/broker/rabbitmq/test/rabbitmq_fanout_test.go @@ -2,10 +2,11 @@ package test import ( "fmt" - "github.com/ormushq/ormus/pkg/broker/messagebroker" - "github.com/ormushq/ormus/pkg/broker/rabbitmq" "testing" "time" + + "github.com/ormushq/ormus/pkg/broker/messagebroker" + "github.com/ormushq/ormus/pkg/broker/rabbitmq" ) // Define a struct to hold parameters for the fanout test case