Skip to content

Commit

Permalink
adding RabbitMQ as Queue driver
Browse files Browse the repository at this point in the history
  • Loading branch information
fwidjaya20 committed Oct 4, 2023
1 parent 2daaf6a commit fb7263c
Show file tree
Hide file tree
Showing 13 changed files with 230 additions and 37 deletions.
9 changes: 8 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,5 +25,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

- Support `Queue` feature with `Sync` and `Redis` drivers.

[1.1.0]: https://github.com/fwidjaya20/symphonic/releases/tag/v1.1.0
### [1.1.1] - 2023-10-04

### Added

- Support `Queue` feature with `RabbitMQ` driver.

[1.1.1]: https://github.com/fwidjaya20/symphonic/compare/v1.1.0...v1.1.1
[1.1.0]: https://github.com/fwidjaya20/symphonic/compare/v1.0.0...v1.1.0
[1.0.0]: https://github.com/fwidjaya20/symphonic/releases/tag/v1.0.0
2 changes: 2 additions & 0 deletions contracts/event/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ type Event interface {

type Bus interface {
OnConnection(connection string) Bus
OnQueue(queueName string) Bus
Publish() error
}

Expand All @@ -17,4 +18,5 @@ type Collection = map[string][]Listener
type RunEvent struct {
Connection string
Job Job
QueueName string
}
2 changes: 2 additions & 0 deletions contracts/event/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,12 @@ type DriverArgs struct {
Job Job
Listeners []Listener
Logger ContractLog.Logger
QueueName string
}

type QueueDriver interface {
Driver() string
Publish() error
Subscribe(c context.Context) error
Flush() error
}
1 change: 1 addition & 0 deletions event/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ func (a *Application) Run(config event.RunEvent) error {
Job: config.Job,
Listeners: a.Collection()[config.Job.Signature()],
Logger: a.logger,
QueueName: config.QueueName,
})

return driver.Subscribe(context.Background())
Expand Down
7 changes: 7 additions & 0 deletions event/bus.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ type Bus struct {
listeners []event.Listener
locker sync.Mutex
logger log.Logger
queueName string
}

func NewEventBus(config config.Config, event event.Job, listeners []event.Listener, logger log.Logger) event.Bus {
Expand All @@ -44,6 +45,11 @@ func (b *Bus) OnConnection(driver string) event.Bus {
return b
}

func (b *Bus) OnQueue(queueName string) event.Bus {
b.queueName = queueName
return b
}

func (b *Bus) Publish() error {
b.locker.Lock()
defer b.locker.Unlock()
Expand All @@ -70,5 +76,6 @@ func (b *Bus) determineDriver(driver string) {
Job: b.event,
Listeners: b.listeners,
Logger: b.logger,
QueueName: b.queueName,
})
}
2 changes: 2 additions & 0 deletions event/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import "github.com/fwidjaya20/symphonic/contracts/event"

func GetQueueDriver(driver string, args event.DriverArgs) event.QueueDriver {
switch driver {
case DriverRabbitMQ:
return NewRabbitMQDriver(args)
case DriverRedis:
return NewRedisDriver(args)
default:
Expand Down
166 changes: 166 additions & 0 deletions event/rabbitmq_driver.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
package event

import (
"context"
"encoding/json"
"fmt"
"reflect"

ContractEvent "github.com/fwidjaya20/symphonic/contracts/event"
"github.com/rabbitmq/amqp091-go"
)

type RabbitMQDriver struct {
ContractEvent.DriverArgs

channel *amqp091.Channel
connection *amqp091.Connection
}

func NewRabbitMQDriver(args ContractEvent.DriverArgs) ContractEvent.QueueDriver {
addr := fmt.Sprintf(
"%s://%s:%s@%s:%s/",
args.Config.GetString("queue.connections.rabbitmq.protocol"),
args.Config.GetString("queue.connections.rabbitmq.username"),
args.Config.GetString("queue.connections.rabbitmq.password"),
args.Config.GetString("queue.connections.rabbitmq.host"),
args.Config.GetString("queue.connections.rabbitmq.port"),
)

connection, err := amqp091.Dial(addr)
if nil != err {
args.Logger.Errorf("RabbitMQ connection failed. %v\n", err.Error())
}

channel, err := connection.Channel()
if nil != err {
args.Logger.Errorf("RabbitMQ channel failed. %v\n", err.Error())
}

err = channel.ExchangeDeclare(
args.Job.Signature(),
amqp091.ExchangeFanout,
true,
false,
false,
false,
nil,
)
if nil != err {
args.Logger.Errorf("failed to declare '%s' exchange. %v", err.Error())
}

return &RabbitMQDriver{
DriverArgs: args,
channel: channel,
connection: connection,
}
}

func (r *RabbitMQDriver) Driver() string {
return DriverRabbitMQ
}

func (r *RabbitMQDriver) Publish() error {
var err error

payload, err := json.Marshal(r.Job.GetPayload())
if nil != err {
r.Logger.Errorf("failed to marshal Job payload. %v", err.Error())
return err
}

err = r.channel.PublishWithContext(
context.Background(),
r.Job.Signature(),
"",
false,
false,
amqp091.Publishing{
ContentType: "application/json",
Body: payload,
},
)
if nil != err {
r.Logger.Errorf("failed to publish '%s'. %v", r.Job.Signature(), err.Error())
return err
}

return nil
}

func (r *RabbitMQDriver) Subscribe(c context.Context) error {
queue, err := r.channel.QueueDeclare(
r.QueueName,
true,
false,
false,
false,
nil,
)
if nil != err {
r.Logger.Errorf("failed to declare '%s' queue. %v", r.QueueName, err.Error())
return err
}

err = r.channel.QueueBind(
queue.Name,
"",
r.Job.Signature(),
false,
nil,
)
if nil != err {
r.Logger.Errorf("failed to bind '%s' to '%s' queue. %v", r.Job.Signature(), r.QueueName, err.Error())
return err
}

messages, err := r.channel.ConsumeWithContext(
c,
r.QueueName,
"",
true,
false,
false,
false,
nil,
)
if nil != err {
r.Logger.Errorf("failed to consume messages on '%s' queue. %v", r.QueueName, err.Error())
}

var forever chan struct{}

go func() {
for msg := range messages {
jobInstance := reflect.New(reflect.TypeOf(r.Job)).Interface().(ContractEvent.Job)

if err = json.Unmarshal([]byte(msg.Body), jobInstance); nil != err {
r.Logger.Infof("error unmarshalling payload: %v\n", err.Error())
continue
}

for _, listener := range r.Listeners {
if err = listener.Handle(jobInstance); nil != err {
r.Logger.Errorf("error calling Handle method: %v\n", err.Error())
}
}
}
}()

<-forever

return nil
}

func (r *RabbitMQDriver) Flush() error {
if err := r.channel.Close(); nil != err {
r.Logger.Errorf("failed to close channel. %v", err.Error())
return err
}
if err := r.connection.Close(); nil != err {
r.Logger.Errorf("failed to close connection. %v", err.Error())
return err
}
return nil
}
4 changes: 4 additions & 0 deletions event/redis_driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,3 +73,7 @@ func (r *RedisDriver) Subscribe(c context.Context) error {
}
}
}

func (r *RedisDriver) Flush() error {
return r.connection.Close()
}
4 changes: 4 additions & 0 deletions event/sync_driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,3 +50,7 @@ func (d *SyncDriver) Subscribe(c context.Context) error {
d.Logger.Infof("Running the Sync Driver explicitly is unnecessary and could potentially disrupt system operations.")
return nil
}

func (d *SyncDriver) Flush() error {
return nil
}
6 changes: 3 additions & 3 deletions example/event/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ package event
import "time"

type PostCreated struct {
Id int64
Author string
CreatedAt time.Time
Id int64 `json:"id"`
Author string `json:"author"`
CreatedAt time.Time `json:"created_at"`
}

func (pc PostCreated) Signature() string {
Expand Down
53 changes: 21 additions & 32 deletions example/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,14 @@ func main() {

facades.Config().Add("queue", map[string]any{
"connections": map[string]any{
"kafka": map[string]any{},
"rabbitmq": map[string]any{},
"kafka": map[string]any{},
"rabbitmq": map[string]any{
"protocol": "amqp",
"username": "guest",
"password": "guest",
"host": "localhost",
"port": "5672",
},
"redis": map[string]any{
"host": facades.Config().Get("database.connections.redis.host"),
"port": facades.Config().Get("database.connections.redis.port"),
Expand All @@ -50,47 +56,30 @@ func main() {

facades.Event().Register(kernel.Listen())

go func() {
if err := facades.Event().Run(ContractEvent.RunEvent{
Connection: event.DriverRedis,
Job: ExampleEvent.PostCreated{},
QueueName: "symphonic-example-queue",
}); nil != err {
SysLog.Fatalln(err.Error())
}
}()

time.Sleep(5 * time.Second)

go func() {
for i := 1; i <= 3; i++ {
time.Sleep(1 * time.Second)
if err := facades.Event().Job(&ExampleEvent.PostCreated{
Id: int64(i),
Author: "Fredrick Widjaya",
CreatedAt: time.Now(),
}).OnConnection("redis").Publish(); nil != err {
}).OnConnection(event.DriverRedis).OnQueue("testing-queue").Publish(); nil != err {
SysLog.Fatalln(err.Error())
}
}
}()

go func() {
if err := facades.Event().Run(ContractEvent.RunEvent{
Connection: event.DriverRedis,
Job: ExampleEvent.PostCreated{},
}); nil != err {
SysLog.Fatalln(err.Error())
}
}()

//go func() {
// client := redis.NewClient(&redis.Options{
// Addr: "localhost:6379", // Update with your Redis server address
// })
//
// pubsub := client.Subscribe(context.Background(), ExampleEvent.PostCreated{}.Signature())
// defer pubsub.Close()
//
// for {
// msg, err := pubsub.ReceiveMessage(context.Background())
// if err != nil {
// fmt.Printf("Error receiving message: %v\n", err)
// continue
// }
//
// // Process the message here
// fmt.Printf("Received: %s\n", msg.Payload)
// }
//}()

select {}
}
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ require (
github.com/golang-migrate/migrate/v4 v4.16.2
github.com/golang-module/carbon/v2 v2.2.6
github.com/gookit/color v1.5.4
github.com/rabbitmq/amqp091-go v1.9.0
github.com/redis/go-redis/v9 v9.2.0
github.com/robfig/cron/v3 v3.0.0
github.com/sirupsen/logrus v1.9.3
github.com/spf13/cast v1.5.1
Expand All @@ -25,7 +27,6 @@ require (
github.com/magiconair/properties v1.8.7 // indirect
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/pelletier/go-toml/v2 v2.0.8 // indirect
github.com/redis/go-redis/v9 v9.2.0 // indirect
github.com/russross/blackfriday/v2 v2.1.0 // indirect
github.com/spf13/afero v1.9.5 // indirect
github.com/spf13/jwalterweatherman v1.1.0 // indirect
Expand Down
Loading

0 comments on commit fb7263c

Please sign in to comment.