From c474d918edde426495fd2f895856b8a1a77203c3 Mon Sep 17 00:00:00 2001 From: Raihan Khan Date: Fri, 30 Aug 2024 16:05:11 +0600 Subject: [PATCH] Fix RabbitMQ client for default Vhost setup (#133) * Use dashboard svc for http connectivity Signed-off-by: raihankhan --- rabbitmq/amqp_client.go | 8 ++++- rabbitmq/kubedb_client_builder.go | 57 +++++++++++++++++++------------ 2 files changed, 43 insertions(+), 22 deletions(-) diff --git a/rabbitmq/amqp_client.go b/rabbitmq/amqp_client.go index 7a9222f7..7e651fa3 100644 --- a/rabbitmq/amqp_client.go +++ b/rabbitmq/amqp_client.go @@ -35,6 +35,12 @@ func (c *AMQPClient) GetMessagingChannel() *Channel { } func (ch *Channel) GetNewQueue(name string, isDurable bool, isTypeQuoeum bool) (*amqp.Queue, error) { + var args amqp.Table + if isTypeQuoeum { + args = amqp.Table{ // queue args + amqp.QueueTypeArg: amqp.QueueTypeQuorum, + } + } // Declare a non-persistent queue, where messages will be sent q, err := ch.QueueDeclare( name, // name @@ -42,7 +48,7 @@ func (ch *Channel) GetNewQueue(name string, isDurable bool, isTypeQuoeum bool) ( false, // delete when unused false, // exclusive false, // no-wait - nil, // arguments + args, // arguments ) if err != nil { klog.Error(err, "Failed to create a queue for publishing message") diff --git a/rabbitmq/kubedb_client_builder.go b/rabbitmq/kubedb_client_builder.go index a7186306..87f6fef2 100644 --- a/rabbitmq/kubedb_client_builder.go +++ b/rabbitmq/kubedb_client_builder.go @@ -19,7 +19,6 @@ package rabbitmq import ( "context" "errors" - "fmt" "strings" @@ -107,6 +106,7 @@ func (o *KubeDBClientBuilder) GetRabbitMQClient() (*Client, error) { o.ctx = context.TODO() } authSecret := &core.Secret{} + var username, password string if !o.db.Spec.DisableSecurity { if o.db.Spec.AuthSecret == nil { klog.Info("Auth-secret not set") @@ -124,17 +124,44 @@ func (o *KubeDBClientBuilder) GetRabbitMQClient() (*Client, error) { klog.Error(err, "Failed to get auth-secret") return nil, err } + username, password = string(authSecret.Data[core.BasicAuthUsernameKey]), string(authSecret.Data[core.BasicAuthPasswordKey]) + } else { + username, password = "guest", "guest" } rmqClient := &Client{} + defaultVhost := "/" - if !o.disableAMQPClient { - if o.amqpURL == "" { - o.amqpURL = o.GetAMQPconnURL(string(authSecret.Data[core.BasicAuthUsernameKey]), string(authSecret.Data[core.BasicAuthPasswordKey])) + if o.enableHTTPClient { + if o.httpURL == "" { + o.httpURL = o.GetHTTPconnURL() + } + httpClient, err := rmqhttp.NewClient(o.httpURL, username, password) + if err != nil { + klog.Error(err, "Failed to get http client for rabbitmq") + return nil, err + } + + vhosts, err := httpClient.ListVhosts() + if err != nil { + klog.Error(err, "Failed to list virtual hosts") + return nil, err + } + for _, vhost := range vhosts { + if vhost.Description == "Default virtual host" { + defaultVhost = vhost.Name + break + } } + rmqClient.HTTPClient = HTTPClient{httpClient} + } - if o.vhost == "" { - o.vhost = o.GetVirtualHostFromURL(o.amqpURL) + if !o.disableAMQPClient { + if o.amqpURL == "" { + if o.vhost == "" { + o.vhost = defaultVhost + } + o.amqpURL = o.GetAMQPconnURL(username, password, o.vhost) } rabbitConnection, err := amqp.DialConfig(o.amqpURL, amqp.Config{ @@ -149,23 +176,11 @@ func (o *KubeDBClientBuilder) GetRabbitMQClient() (*Client, error) { rmqClient.AMQPClient = AMQPClient{rabbitConnection} } - if o.enableHTTPClient { - if o.httpURL == "" { - o.httpURL = o.GetHTTPconnURL() - } - httpClient, err := rmqhttp.NewClient(o.httpURL, string(authSecret.Data[core.BasicAuthUsernameKey]), string(authSecret.Data[core.BasicAuthPasswordKey])) - if err != nil { - klog.Error(err, "Failed to get http client for rabbitmq") - return nil, err - } - rmqClient.HTTPClient = HTTPClient{httpClient} - } - return rmqClient, nil } -func (o *KubeDBClientBuilder) GetAMQPconnURL(username string, password string) string { - return fmt.Sprintf("amqp://%s:%s@%s.%s.svc.cluster.local:%d/", username, password, o.db.OffshootName(), o.db.Namespace, kubedb.RabbitMQAMQPPort) +func (o *KubeDBClientBuilder) GetAMQPconnURL(username string, password string, vhost string) string { + return fmt.Sprintf("amqp://%s:%s@%s.%s.svc.cluster.local:%d%s", username, password, o.db.ServiceName(), o.db.Namespace, kubedb.RabbitMQAMQPPort, vhost) } func (o *KubeDBClientBuilder) GetHTTPconnURL() string { @@ -173,7 +188,7 @@ func (o *KubeDBClientBuilder) GetHTTPconnURL() string { if o.podName != "" { return fmt.Sprintf("%s://%s.%s.%s.svc:%d", protocolScheme, o.podName, o.db.GoverningServiceName(), o.db.Namespace, kubedb.RabbitMQManagementUIPort) } - return fmt.Sprintf("%s://%s.%s.svc.cluster.local:%d", protocolScheme, o.db.ServiceName(), o.db.Namespace, kubedb.RabbitMQManagementUIPort) + return fmt.Sprintf("%s://%s.%s.svc.cluster.local:%d", protocolScheme, o.db.DashboardServiceName(), o.db.Namespace, kubedb.RabbitMQManagementUIPort) } // RabbitMQ server have a default virtual host "/"