Skip to content

Commit

Permalink
Add RabbitMQ pubsub methods
Browse files Browse the repository at this point in the history
Signed-off-by: raihankhan <[email protected]>
  • Loading branch information
raihankhan committed May 6, 2024
1 parent d3a1eb1 commit 9893078
Show file tree
Hide file tree
Showing 2 changed files with 90 additions and 1 deletion.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ lint: $(BUILD_DIRS)
--env GO111MODULE=on \
--env GOFLAGS="-mod=vendor" \
$(BUILD_IMAGE) \
golangci-lint run --enable $(ADDTL_LINTERS) --timeout=10m --skip-files="generated.*\.go$\" --skip-dirs-use-default
golangci-lint run --enable $(ADDTL_LINTERS) --timeout=10m --exclude-files="generated.*\.go$\" --exclude-dirs-use-default

$(BUILD_DIRS):
@mkdir -p $@
Expand Down
89 changes: 89 additions & 0 deletions rabbitmq/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,98 @@ limitations under the License.
package rabbitmq

import (
"context"
"errors"

amqp "github.com/rabbitmq/amqp091-go"
"k8s.io/klog/v2"
)

type Client struct {
*amqp.Connection
}

type Channel struct {
*amqp.Channel
}

func (c *Client) GetMessagingChannel() *Channel {
// Create a channel for sending and receiving messages
messagingCh, err := c.Channel()
if err != nil {
klog.Error(err, "Failed to Open a Messaging Channel")
return &Channel{nil}
}
return &Channel{messagingCh}
}

func (ch *Channel) GetNewQueue(name string, isDurable bool, isTypeQuoeum bool) (*amqp.Queue, error) {
// Declare a non-persistent queue, where messages will be sent
q, err := ch.QueueDeclare(
name, // name
isDurable, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
if err != nil {
klog.Error(err, "Failed to create a queue for publishing message")
return nil, err
}
return &q, nil
}

func (ch *Channel) PublishMessageOnce(ctx context.Context, queueName string, message string) error {
// Declare a non-persistent queue, where messages will be sent
err := ch.PublishWithContext(
ctx,
"", // exchange
queueName, // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(message),
})
if err != nil {
klog.Error(err, "Failed to publish message")
return err
}
return nil
}

func (ch *Channel) ConsumeMessageOnce(ctx context.Context, cancelFunc context.CancelFunc, queueName string) error {
// start delivering messages through the channel,
// a goroutine will be collecting messages with the context timeout
deliveryCh, err := ch.ConsumeWithContext(
ctx,
queueName, // queue
"", // consumer
true, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
if err != nil {
klog.Error(err, "Failed to consume message")
return err
}
received := false
go func() {
for d := range deliveryCh {
if d.Body != nil {
received = true
cancelFunc()
return
}
}
}()

<-ctx.Done()
if !received {
return errors.New("failed to consume message due to timeout")
}
return nil
}

0 comments on commit 9893078

Please sign in to comment.