Skip to content

Commit

Permalink
Add RabbitMQ HTTP and AMQP clients
Browse files Browse the repository at this point in the history
Signed-off-by: raihankhan <[email protected]>
  • Loading branch information
raihankhan committed May 16, 2024
1 parent 94f6f27 commit 2d68697
Show file tree
Hide file tree
Showing 46 changed files with 6,260 additions and 29 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ lint: $(BUILD_DIRS)
--env GO111MODULE=on \
--env GOFLAGS="-mod=vendor" \
$(BUILD_IMAGE) \
golangci-lint run --enable $(ADDTL_LINTERS) --timeout=10m --skip-files="generated.*\.go$\" --skip-dirs-use-default
golangci-lint run --enable $(ADDTL_LINTERS) --timeout=10m --exclude-files="generated.*\.go$\" --exclude-dirs-use-default

$(BUILD_DIRS):
@mkdir -p $@
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ require (
github.com/go-resty/resty/v2 v2.11.0
github.com/go-sql-driver/mysql v1.7.1
github.com/lib/pq v1.10.7
github.com/michaelklishin/rabbit-hole/v2 v2.16.0
github.com/microsoft/go-mssqldb v1.6.0
github.com/opensearch-project/opensearch-go v1.1.0
github.com/opensearch-project/opensearch-go/v2 v2.3.0
Expand Down
152 changes: 146 additions & 6 deletions go.sum

Large diffs are not rendered by default.

105 changes: 105 additions & 0 deletions rabbitmq/amqp_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
/*
Copyright AppsCode Inc. and Contributors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package rabbitmq

import (
"context"
"errors"
amqp "github.com/rabbitmq/amqp091-go"
"k8s.io/klog/v2"
)

func (c *AMQPClient) GetMessagingChannel() *Channel {
// Create a channel for sending and receiving messages
messagingCh, err := c.Channel()
if err != nil {
klog.Error(err, "Failed to Open a Messaging Channel")
return &Channel{nil}
}
return &Channel{messagingCh}
}

func (ch *Channel) GetNewQueue(name string, isDurable bool, isTypeQuoeum bool) (*amqp.Queue, error) {
// Declare a non-persistent queue, where messages will be sent
q, err := ch.QueueDeclare(
name, // name
isDurable, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
if err != nil {
klog.Error(err, "Failed to create a queue for publishing message")
return nil, err
}
return &q, nil
}

func (ch *Channel) PublishMessageOnce(ctx context.Context, queueName string, message string) error {
// Declare a non-persistent queue, where messages will be sent
err := ch.PublishWithContext(
ctx,
"", // exchange
queueName, // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(message),
})
if err != nil {
klog.Error(err, "Failed to publish message")
return err
}
return nil
}

func (ch *Channel) ConsumeMessageOnce(ctx context.Context, cancelFunc context.CancelFunc, queueName string) error {
// start delivering messages through the channel,
// a goroutine will be collecting messages with the context timeout
deliveryCh, err := ch.ConsumeWithContext(
ctx,
queueName, // queue
"", // consumer
true, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
if err != nil {
klog.Error(err, "Failed to consume message")
return err
}
received := false
go func() {
for d := range deliveryCh {
if d.Body != nil {
received = true
cancelFunc()
return
}
}
}()

<-ctx.Done()
if !received {
return errors.New("failed to consume message due to timeout")
}
return nil
}
14 changes: 14 additions & 0 deletions rabbitmq/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,23 @@ limitations under the License.
package rabbitmq

import (
rmqhttp "github.com/michaelklishin/rabbit-hole/v2"
amqp "github.com/rabbitmq/amqp091-go"
)

type Client struct {
AMQPClient
HTTPClient
}

type AMQPClient struct {
*amqp.Connection
}

type HTTPClient struct {
*rmqhttp.Client
}

type Channel struct {
*amqp.Channel
}
44 changes: 44 additions & 0 deletions rabbitmq/http_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
Copyright AppsCode Inc. and Contributors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package rabbitmq

import (
"fmt"
"k8s.io/klog/v2"
)

func (c *HTTPClient) IsAllNodesRunningInCluster(replicas int) (bool, error) {
nodes, err := c.ListNodes()
if err != nil {
klog.Error(err, "Failed to get node lists")
return true, err
}

if len(nodes) < replicas {
klog.Info(fmt.Sprintf("Cluster requires %v nodes but only %v nodes joined", replicas, len(nodes)))
return false, nil
}
for _, node := range nodes {
if !node.IsRunning {
klog.Error(err, fmt.Sprintf("Node: %s is not running", node.Name))
return false, nil
}
}

klog.Info("All required nodes running in cluster")
return true, nil
}
110 changes: 88 additions & 22 deletions rabbitmq/kubedb_client_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ import (
"context"
"errors"
"fmt"
"strings"

rmqhttp "github.com/michaelklishin/rabbit-hole/v2"
amqp "github.com/rabbitmq/amqp091-go"
core "k8s.io/api/core/v1"
kerr "k8s.io/apimachinery/pkg/api/errors"
Expand All @@ -31,11 +33,15 @@ import (
)

type KubeDBClientBuilder struct {
kc client.Client
db *api.RabbitMQ
url string
podName string
ctx context.Context
kc client.Client
db *api.RabbitMQ
ctx context.Context
amqpURL string
httpURL string
podName string
vhost string
enableHTTPClient bool
disableAMQPClient bool
}

func NewKubeDBClientBuilder(kc client.Client, db *api.RabbitMQ) *KubeDBClientBuilder {
Expand All @@ -50,8 +56,18 @@ func (o *KubeDBClientBuilder) WithPod(podName string) *KubeDBClientBuilder {
return o
}

func (o *KubeDBClientBuilder) WithURL(url string) *KubeDBClientBuilder {
o.url = url
func (o *KubeDBClientBuilder) WithAMQPURL(url string) *KubeDBClientBuilder {
o.amqpURL = url
return o
}

func (o *KubeDBClientBuilder) WithHTTPURL(url string) *KubeDBClientBuilder {
o.httpURL = url
return o
}

func (o *KubeDBClientBuilder) WithVHost(vhost string) *KubeDBClientBuilder {
o.vhost = vhost
return o
}

Expand All @@ -60,11 +76,20 @@ func (o *KubeDBClientBuilder) WithContext(ctx context.Context) *KubeDBClientBuil
return o
}

func (o *KubeDBClientBuilder) GetAMQPconnURL(username string, password string) string {
return fmt.Sprintf("amqp://%s:%s@%s.%s.svc.cluster.local:%d/", username, password, o.db.OffshootName(), o.db.Namespace, api.RabbitMQAMQPPort)
func (o *KubeDBClientBuilder) WithHTTPClientEnabled() *KubeDBClientBuilder {
o.enableHTTPClient = true
return o
}

func (o *KubeDBClientBuilder) WithAMQPClientDisabled() *KubeDBClientBuilder {
o.disableAMQPClient = true
return o
}

func (o *KubeDBClientBuilder) GetRabbitMQClient() (*Client, error) {
if o.ctx == nil {
o.ctx = context.TODO()
}
authSecret := &core.Secret{}
if !o.db.Spec.DisableSecurity {
if o.db.Spec.AuthSecret == nil {
Expand All @@ -84,19 +109,60 @@ func (o *KubeDBClientBuilder) GetRabbitMQClient() (*Client, error) {
return nil, err
}
}
dbConnURL := o.GetAMQPconnURL(string(authSecret.Data[core.BasicAuthUsernameKey]), string(authSecret.Data[core.BasicAuthPasswordKey]))

rabbitConnection, err := amqp.DialConfig(dbConnURL, amqp.Config{
Vhost: "/",
Locale: "en_US",
})
if err != nil {
klog.Error(err, "Failed to connect to rabbitmq")
return nil, err

rmqClient := &Client{}

if !o.disableAMQPClient {
if o.amqpURL == "" {
o.amqpURL = o.GetAMQPconnURL(string(authSecret.Data[core.BasicAuthUsernameKey]), string(authSecret.Data[core.BasicAuthPasswordKey]))
}

if o.vhost == "" {
o.vhost = o.GetVirtualHostFromURL(o.amqpURL)
}

rabbitConnection, err := amqp.DialConfig(o.amqpURL, amqp.Config{
Vhost: o.vhost,
Locale: "en_US",
})
if err != nil {
klog.Error(err, "Failed to connect to rabbitmq")
return nil, err
}
klog.Info("Successfully created AMQP client for RabbitMQ")
rmqClient.AMQPClient = AMQPClient{rabbitConnection}
}

if o.enableHTTPClient {
if o.httpURL == "" {
o.httpURL = o.GetHTTPconnURL()
}
httpClient, err := rmqhttp.NewClient(o.httpURL, string(authSecret.Data[core.BasicAuthUsernameKey]), string(authSecret.Data[core.BasicAuthPasswordKey]))
if err != nil {
klog.Error(err, "Failed to get http client for rabbitmq")
return nil, err
}
rmqClient.HTTPClient = HTTPClient{httpClient}
}
klog.Info("Successfully created client for db")

return &Client{
Connection: rabbitConnection,
}, nil
return rmqClient, nil
}

func (o *KubeDBClientBuilder) GetAMQPconnURL(username string, password string) string {
return fmt.Sprintf("amqp://%s:%s@%s.%s.svc.cluster.local:%d/", username, password, o.db.OffshootName(), o.db.Namespace, api.RabbitMQAMQPPort)
}

func (o *KubeDBClientBuilder) GetHTTPconnURL() string {
return fmt.Sprintf("http://%s.%s.svc.cluster.local:%d/", o.db.OffshootName(), o.db.Namespace, api.RabbitMQManagementUIPort)
}

// RabbitMQ server have a default virtual host "/"
// for custom vhost, it must be appended at the end of the url separated by "/"
func (o *KubeDBClientBuilder) GetVirtualHostFromURL(url string) (vhost string) {
vhost = "/"
lastIndex := strings.LastIndex(url, vhost)
if lastIndex != -1 && lastIndex < len(url)-1 {
return url[lastIndex+1:]
}
return vhost
}
27 changes: 27 additions & 0 deletions vendor/github.com/michaelklishin/rabbit-hole/v2/.gitignore

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

29 changes: 29 additions & 0 deletions vendor/github.com/michaelklishin/rabbit-hole/v2/.travis.yml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 2d68697

Please sign in to comment.