Skip to content

Commit

Permalink
Fix RabbitMQ client for default Vhost setup (#133)
Browse files Browse the repository at this point in the history
* Use dashboard svc for http connectivity

Signed-off-by: raihankhan <[email protected]>
  • Loading branch information
raihankhan authored and sabbir-hossain70 committed Sep 4, 2024
1 parent d8af824 commit c474d91
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 22 deletions.
8 changes: 7 additions & 1 deletion rabbitmq/amqp_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,20 @@ func (c *AMQPClient) GetMessagingChannel() *Channel {
}

func (ch *Channel) GetNewQueue(name string, isDurable bool, isTypeQuoeum bool) (*amqp.Queue, error) {
var args amqp.Table
if isTypeQuoeum {
args = amqp.Table{ // queue args
amqp.QueueTypeArg: amqp.QueueTypeQuorum,
}
}
// Declare a non-persistent queue, where messages will be sent
q, err := ch.QueueDeclare(
name, // name
isDurable, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
args, // arguments
)
if err != nil {
klog.Error(err, "Failed to create a queue for publishing message")
Expand Down
57 changes: 36 additions & 21 deletions rabbitmq/kubedb_client_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package rabbitmq
import (
"context"
"errors"

"fmt"
"strings"

Expand Down Expand Up @@ -107,6 +106,7 @@ func (o *KubeDBClientBuilder) GetRabbitMQClient() (*Client, error) {
o.ctx = context.TODO()
}
authSecret := &core.Secret{}
var username, password string
if !o.db.Spec.DisableSecurity {
if o.db.Spec.AuthSecret == nil {
klog.Info("Auth-secret not set")
Expand All @@ -124,17 +124,44 @@ func (o *KubeDBClientBuilder) GetRabbitMQClient() (*Client, error) {
klog.Error(err, "Failed to get auth-secret")
return nil, err
}
username, password = string(authSecret.Data[core.BasicAuthUsernameKey]), string(authSecret.Data[core.BasicAuthPasswordKey])
} else {
username, password = "guest", "guest"
}

rmqClient := &Client{}
defaultVhost := "/"

if !o.disableAMQPClient {
if o.amqpURL == "" {
o.amqpURL = o.GetAMQPconnURL(string(authSecret.Data[core.BasicAuthUsernameKey]), string(authSecret.Data[core.BasicAuthPasswordKey]))
if o.enableHTTPClient {
if o.httpURL == "" {
o.httpURL = o.GetHTTPconnURL()
}
httpClient, err := rmqhttp.NewClient(o.httpURL, username, password)
if err != nil {
klog.Error(err, "Failed to get http client for rabbitmq")
return nil, err
}

vhosts, err := httpClient.ListVhosts()
if err != nil {
klog.Error(err, "Failed to list virtual hosts")
return nil, err
}
for _, vhost := range vhosts {
if vhost.Description == "Default virtual host" {
defaultVhost = vhost.Name
break
}
}
rmqClient.HTTPClient = HTTPClient{httpClient}
}

if o.vhost == "" {
o.vhost = o.GetVirtualHostFromURL(o.amqpURL)
if !o.disableAMQPClient {
if o.amqpURL == "" {
if o.vhost == "" {
o.vhost = defaultVhost
}
o.amqpURL = o.GetAMQPconnURL(username, password, o.vhost)
}

rabbitConnection, err := amqp.DialConfig(o.amqpURL, amqp.Config{
Expand All @@ -149,31 +176,19 @@ func (o *KubeDBClientBuilder) GetRabbitMQClient() (*Client, error) {
rmqClient.AMQPClient = AMQPClient{rabbitConnection}
}

if o.enableHTTPClient {
if o.httpURL == "" {
o.httpURL = o.GetHTTPconnURL()
}
httpClient, err := rmqhttp.NewClient(o.httpURL, string(authSecret.Data[core.BasicAuthUsernameKey]), string(authSecret.Data[core.BasicAuthPasswordKey]))
if err != nil {
klog.Error(err, "Failed to get http client for rabbitmq")
return nil, err
}
rmqClient.HTTPClient = HTTPClient{httpClient}
}

return rmqClient, nil
}

func (o *KubeDBClientBuilder) GetAMQPconnURL(username string, password string) string {
return fmt.Sprintf("amqp://%s:%s@%s.%s.svc.cluster.local:%d/", username, password, o.db.OffshootName(), o.db.Namespace, kubedb.RabbitMQAMQPPort)
func (o *KubeDBClientBuilder) GetAMQPconnURL(username string, password string, vhost string) string {
return fmt.Sprintf("amqp://%s:%s@%s.%s.svc.cluster.local:%d%s", username, password, o.db.ServiceName(), o.db.Namespace, kubedb.RabbitMQAMQPPort, vhost)
}

func (o *KubeDBClientBuilder) GetHTTPconnURL() string {
protocolScheme := rmqhttp.HTTP
if o.podName != "" {
return fmt.Sprintf("%s://%s.%s.%s.svc:%d", protocolScheme, o.podName, o.db.GoverningServiceName(), o.db.Namespace, kubedb.RabbitMQManagementUIPort)
}
return fmt.Sprintf("%s://%s.%s.svc.cluster.local:%d", protocolScheme, o.db.ServiceName(), o.db.Namespace, kubedb.RabbitMQManagementUIPort)
return fmt.Sprintf("%s://%s.%s.svc.cluster.local:%d", protocolScheme, o.db.DashboardServiceName(), o.db.Namespace, kubedb.RabbitMQManagementUIPort)
}

// RabbitMQ server have a default virtual host "/"
Expand Down

0 comments on commit c474d91

Please sign in to comment.