diff --git a/CHANGELOG.md b/CHANGELOG.md index 8d4e302..a5b7d52 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -25,5 +25,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Support `Queue` feature with `Sync` and `Redis` drivers. -[1.1.0]: https://github.com/fwidjaya20/symphonic/releases/tag/v1.1.0 +### [1.1.1] - 2023-10-04 + +### Added + +- Support `Queue` feature with `RabbitMQ` driver. + +[1.1.1]: https://github.com/fwidjaya20/symphonic/compare/v1.1.0...v1.1.1 +[1.1.0]: https://github.com/fwidjaya20/symphonic/compare/v1.0.0...v1.1.0 [1.0.0]: https://github.com/fwidjaya20/symphonic/releases/tag/v1.0.0 diff --git a/contracts/event/event.go b/contracts/event/event.go index e223cd5..81519d2 100644 --- a/contracts/event/event.go +++ b/contracts/event/event.go @@ -9,6 +9,7 @@ type Event interface { type Bus interface { OnConnection(connection string) Bus + OnQueue(queueName string) Bus Publish() error } @@ -17,4 +18,5 @@ type Collection = map[string][]Listener type RunEvent struct { Connection string Job Job + QueueName string } diff --git a/contracts/event/queue.go b/contracts/event/queue.go index 6b90210..f52b3b9 100644 --- a/contracts/event/queue.go +++ b/contracts/event/queue.go @@ -12,10 +12,12 @@ type DriverArgs struct { Job Job Listeners []Listener Logger ContractLog.Logger + QueueName string } type QueueDriver interface { Driver() string Publish() error Subscribe(c context.Context) error + Flush() error } diff --git a/event/application.go b/event/application.go index 867f0b9..93705ae 100644 --- a/event/application.go +++ b/event/application.go @@ -40,6 +40,7 @@ func (a *Application) Run(config event.RunEvent) error { Job: config.Job, Listeners: a.Collection()[config.Job.Signature()], Logger: a.logger, + QueueName: config.QueueName, }) return driver.Subscribe(context.Background()) diff --git a/event/bus.go b/event/bus.go index e3bf5c4..4218802 100644 --- a/event/bus.go +++ b/event/bus.go @@ -18,6 +18,7 @@ type Bus struct { listeners []event.Listener locker sync.Mutex logger log.Logger + queueName string } func NewEventBus(config config.Config, event event.Job, listeners []event.Listener, logger log.Logger) event.Bus { @@ -44,6 +45,11 @@ func (b *Bus) OnConnection(driver string) event.Bus { return b } +func (b *Bus) OnQueue(queueName string) event.Bus { + b.queueName = queueName + return b +} + func (b *Bus) Publish() error { b.locker.Lock() defer b.locker.Unlock() @@ -70,5 +76,6 @@ func (b *Bus) determineDriver(driver string) { Job: b.event, Listeners: b.listeners, Logger: b.logger, + QueueName: b.queueName, }) } diff --git a/event/factory.go b/event/factory.go index d020cfb..bb2ab77 100644 --- a/event/factory.go +++ b/event/factory.go @@ -4,6 +4,8 @@ import "github.com/fwidjaya20/symphonic/contracts/event" func GetQueueDriver(driver string, args event.DriverArgs) event.QueueDriver { switch driver { + case DriverRabbitMQ: + return NewRabbitMQDriver(args) case DriverRedis: return NewRedisDriver(args) default: diff --git a/event/rabbitmq_driver.go b/event/rabbitmq_driver.go new file mode 100644 index 0000000..ff884ec --- /dev/null +++ b/event/rabbitmq_driver.go @@ -0,0 +1,166 @@ +package event + +import ( + "context" + "encoding/json" + "fmt" + "reflect" + + ContractEvent "github.com/fwidjaya20/symphonic/contracts/event" + "github.com/rabbitmq/amqp091-go" +) + +type RabbitMQDriver struct { + ContractEvent.DriverArgs + + channel *amqp091.Channel + connection *amqp091.Connection +} + +func NewRabbitMQDriver(args ContractEvent.DriverArgs) ContractEvent.QueueDriver { + addr := fmt.Sprintf( + "%s://%s:%s@%s:%s/", + args.Config.GetString("queue.connections.rabbitmq.protocol"), + args.Config.GetString("queue.connections.rabbitmq.username"), + args.Config.GetString("queue.connections.rabbitmq.password"), + args.Config.GetString("queue.connections.rabbitmq.host"), + args.Config.GetString("queue.connections.rabbitmq.port"), + ) + + connection, err := amqp091.Dial(addr) + if nil != err { + args.Logger.Errorf("RabbitMQ connection failed. %v\n", err.Error()) + } + + channel, err := connection.Channel() + if nil != err { + args.Logger.Errorf("RabbitMQ channel failed. %v\n", err.Error()) + } + + err = channel.ExchangeDeclare( + args.Job.Signature(), + amqp091.ExchangeFanout, + true, + false, + false, + false, + nil, + ) + if nil != err { + args.Logger.Errorf("failed to declare '%s' exchange. %v", err.Error()) + } + + return &RabbitMQDriver{ + DriverArgs: args, + channel: channel, + connection: connection, + } +} + +func (r *RabbitMQDriver) Driver() string { + return DriverRabbitMQ +} + +func (r *RabbitMQDriver) Publish() error { + var err error + + payload, err := json.Marshal(r.Job.GetPayload()) + if nil != err { + r.Logger.Errorf("failed to marshal Job payload. %v", err.Error()) + return err + } + + err = r.channel.PublishWithContext( + context.Background(), + r.Job.Signature(), + "", + false, + false, + amqp091.Publishing{ + ContentType: "application/json", + Body: payload, + }, + ) + if nil != err { + r.Logger.Errorf("failed to publish '%s'. %v", r.Job.Signature(), err.Error()) + return err + } + + return nil +} + +func (r *RabbitMQDriver) Subscribe(c context.Context) error { + queue, err := r.channel.QueueDeclare( + r.QueueName, + true, + false, + false, + false, + nil, + ) + if nil != err { + r.Logger.Errorf("failed to declare '%s' queue. %v", r.QueueName, err.Error()) + return err + } + + err = r.channel.QueueBind( + queue.Name, + "", + r.Job.Signature(), + false, + nil, + ) + if nil != err { + r.Logger.Errorf("failed to bind '%s' to '%s' queue. %v", r.Job.Signature(), r.QueueName, err.Error()) + return err + } + + messages, err := r.channel.ConsumeWithContext( + c, + r.QueueName, + "", + true, + false, + false, + false, + nil, + ) + if nil != err { + r.Logger.Errorf("failed to consume messages on '%s' queue. %v", r.QueueName, err.Error()) + } + + var forever chan struct{} + + go func() { + for msg := range messages { + jobInstance := reflect.New(reflect.TypeOf(r.Job)).Interface().(ContractEvent.Job) + + if err = json.Unmarshal([]byte(msg.Body), jobInstance); nil != err { + r.Logger.Infof("error unmarshalling payload: %v\n", err.Error()) + continue + } + + for _, listener := range r.Listeners { + if err = listener.Handle(jobInstance); nil != err { + r.Logger.Errorf("error calling Handle method: %v\n", err.Error()) + } + } + } + }() + + <-forever + + return nil +} + +func (r *RabbitMQDriver) Flush() error { + if err := r.channel.Close(); nil != err { + r.Logger.Errorf("failed to close channel. %v", err.Error()) + return err + } + if err := r.connection.Close(); nil != err { + r.Logger.Errorf("failed to close connection. %v", err.Error()) + return err + } + return nil +} diff --git a/event/redis_driver.go b/event/redis_driver.go index 445ddae..a213bd9 100644 --- a/event/redis_driver.go +++ b/event/redis_driver.go @@ -73,3 +73,7 @@ func (r *RedisDriver) Subscribe(c context.Context) error { } } } + +func (r *RedisDriver) Flush() error { + return r.connection.Close() +} diff --git a/event/sync_driver.go b/event/sync_driver.go index 307234b..1006355 100644 --- a/event/sync_driver.go +++ b/event/sync_driver.go @@ -50,3 +50,7 @@ func (d *SyncDriver) Subscribe(c context.Context) error { d.Logger.Infof("Running the Sync Driver explicitly is unnecessary and could potentially disrupt system operations.") return nil } + +func (d *SyncDriver) Flush() error { + return nil +} diff --git a/example/event/events.go b/example/event/events.go index 577cbcf..b579499 100644 --- a/example/event/events.go +++ b/example/event/events.go @@ -3,9 +3,9 @@ package event import "time" type PostCreated struct { - Id int64 - Author string - CreatedAt time.Time + Id int64 `json:"id"` + Author string `json:"author"` + CreatedAt time.Time `json:"created_at"` } func (pc PostCreated) Signature() string { diff --git a/example/queue.go b/example/queue.go index 113dacb..844b368 100644 --- a/example/queue.go +++ b/example/queue.go @@ -32,8 +32,14 @@ func main() { facades.Config().Add("queue", map[string]any{ "connections": map[string]any{ - "kafka": map[string]any{}, - "rabbitmq": map[string]any{}, + "kafka": map[string]any{}, + "rabbitmq": map[string]any{ + "protocol": "amqp", + "username": "guest", + "password": "guest", + "host": "localhost", + "port": "5672", + }, "redis": map[string]any{ "host": facades.Config().Get("database.connections.redis.host"), "port": facades.Config().Get("database.connections.redis.port"), @@ -50,6 +56,18 @@ func main() { facades.Event().Register(kernel.Listen()) + go func() { + if err := facades.Event().Run(ContractEvent.RunEvent{ + Connection: event.DriverRedis, + Job: ExampleEvent.PostCreated{}, + QueueName: "symphonic-example-queue", + }); nil != err { + SysLog.Fatalln(err.Error()) + } + }() + + time.Sleep(5 * time.Second) + go func() { for i := 1; i <= 3; i++ { time.Sleep(1 * time.Second) @@ -57,40 +75,11 @@ func main() { Id: int64(i), Author: "Fredrick Widjaya", CreatedAt: time.Now(), - }).OnConnection("redis").Publish(); nil != err { + }).OnConnection(event.DriverRedis).OnQueue("testing-queue").Publish(); nil != err { SysLog.Fatalln(err.Error()) } } }() - go func() { - if err := facades.Event().Run(ContractEvent.RunEvent{ - Connection: event.DriverRedis, - Job: ExampleEvent.PostCreated{}, - }); nil != err { - SysLog.Fatalln(err.Error()) - } - }() - - //go func() { - // client := redis.NewClient(&redis.Options{ - // Addr: "localhost:6379", // Update with your Redis server address - // }) - // - // pubsub := client.Subscribe(context.Background(), ExampleEvent.PostCreated{}.Signature()) - // defer pubsub.Close() - // - // for { - // msg, err := pubsub.ReceiveMessage(context.Background()) - // if err != nil { - // fmt.Printf("Error receiving message: %v\n", err) - // continue - // } - // - // // Process the message here - // fmt.Printf("Received: %s\n", msg.Payload) - // } - //}() - select {} } diff --git a/go.mod b/go.mod index f8c706a..7f0b6d5 100644 --- a/go.mod +++ b/go.mod @@ -6,6 +6,8 @@ require ( github.com/golang-migrate/migrate/v4 v4.16.2 github.com/golang-module/carbon/v2 v2.2.6 github.com/gookit/color v1.5.4 + github.com/rabbitmq/amqp091-go v1.9.0 + github.com/redis/go-redis/v9 v9.2.0 github.com/robfig/cron/v3 v3.0.0 github.com/sirupsen/logrus v1.9.3 github.com/spf13/cast v1.5.1 @@ -25,7 +27,6 @@ require ( github.com/magiconair/properties v1.8.7 // indirect github.com/mitchellh/mapstructure v1.5.0 // indirect github.com/pelletier/go-toml/v2 v2.0.8 // indirect - github.com/redis/go-redis/v9 v9.2.0 // indirect github.com/russross/blackfriday/v2 v2.1.0 // indirect github.com/spf13/afero v1.9.5 // indirect github.com/spf13/jwalterweatherman v1.1.0 // indirect diff --git a/go.sum b/go.sum index a3f0597..aacdf0b 100644 --- a/go.sum +++ b/go.sum @@ -42,6 +42,10 @@ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03 github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= github.com/Microsoft/go-winio v0.6.1 h1:9/kr64B9VUZrLm5YYwbGtUJnMgqWVOdUAXu6Migciow= github.com/Microsoft/go-winio v0.6.1/go.mod h1:LRdKpFKfdobln8UmuiYcKPot9D2v6svN5+sAH+4kjUM= +github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs= +github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c= +github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA= +github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= @@ -190,6 +194,8 @@ github.com/pkg/sftp v1.13.1/go.mod h1:3HaPG6Dq1ILlpPZRO0HVMrsydcdLt6HRDccSgb87qR github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= +github.com/rabbitmq/amqp091-go v1.9.0 h1:qrQtyzB4H8BQgEuJwhmVQqVHB9O4+MNDJCCAcpc3Aoo= +github.com/rabbitmq/amqp091-go v1.9.0/go.mod h1:+jPrT9iY2eLjRaMSRHUhc3z14E/l85kv/f+6luSD3pc= github.com/redis/go-redis/v9 v9.2.0 h1:zwMdX0A4eVzse46YN18QhuDiM4uf3JmkOB4VZrdt5uI= github.com/redis/go-redis/v9 v9.2.0/go.mod h1:hdY0cQFCN4fnSYT6TkisLufl/4W5UIXyv0b/CLO2V2M= github.com/robfig/cron/v3 v3.0.0 h1:kQ6Cb7aHOHTSzNVNEhmp8EcWKLb4CbiMW9h9VyIhO4E= @@ -244,6 +250,8 @@ go.opencensus.io v0.22.4/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= go.opencensus.io v0.22.5/go.mod h1:5pWMHQbX5EPX2/62yrJeAkowc+lfs/XD7Uxpq3pI6kk= go.uber.org/atomic v1.9.0 h1:ECmE8Bn/WFTYwEW/bpKD3M8VtR/zQVbavAoalC1PYyE= go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= +go.uber.org/goleak v1.2.1 h1:NBol2c7O1ZokfZ0LEU9K6Whx/KnwvepVetCUhtKja4A= +go.uber.org/goleak v1.2.1/go.mod h1:qlT2yGI9QafXHhZZLxlSuNsMw3FFLxBr+tBRlmO1xH4= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=