Skip to content

Commit

Permalink
Fix RabbitMQ HTTP client with TLS (#136)
Browse files Browse the repository at this point in the history
* Fix RabbitMQ HTTP client with TLS

Signed-off-by: raihankhan <[email protected]>
  • Loading branch information
raihankhan authored Sep 4, 2024
1 parent b8c278d commit 2af340c
Showing 1 changed file with 66 additions and 4 deletions.
70 changes: 66 additions & 4 deletions rabbitmq/kubedb_client_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,14 @@ package rabbitmq

import (
"context"
"crypto/tls"
"crypto/x509"
"errors"
"fmt"
"net"
"net/http"
"strings"
"time"

rmqhttp "github.com/michaelklishin/rabbit-hole/v2"
amqp "github.com/rabbitmq/amqp091-go"
Expand All @@ -29,6 +34,7 @@ import (
"k8s.io/apimachinery/pkg/types"
"k8s.io/klog/v2"
"kubedb.dev/apimachinery/apis/kubedb"
dbapi "kubedb.dev/apimachinery/apis/kubedb/v1"
olddbapi "kubedb.dev/apimachinery/apis/kubedb/v1alpha2"
"sigs.k8s.io/controller-runtime/pkg/client"
)
Expand Down Expand Up @@ -129,14 +135,63 @@ func (o *KubeDBClientBuilder) GetRabbitMQClient() (*Client, error) {
username, password = "guest", "guest"
}

var tlsConfig *tls.Config
if o.db.Spec.EnableSSL {
certSecret := &core.Secret{}
err := o.kc.Get(o.ctx, types.NamespacedName{
Namespace: o.db.Namespace,
Name: o.db.GetCertSecretName(olddbapi.RabbitmqClientCert),
}, certSecret)
if err != nil {
if kerr.IsNotFound(err) {
klog.Error(err, "Client certificate secret not found")
return nil, errors.New("client certificate secret is not found")
}
klog.Error(err, "Failed to get client certificate Secret")
return nil, err
}

// get tls cert, clientCA and rootCA for tls config
clientCA := x509.NewCertPool()
rootCA := x509.NewCertPool()

crt, err := tls.X509KeyPair(certSecret.Data[core.TLSCertKey], certSecret.Data[core.TLSPrivateKeyKey])
if err != nil {
klog.Error(err, "Failed to parse private key pair")
return nil, err
}
clientCA.AppendCertsFromPEM(certSecret.Data[dbapi.TLSCACertFileName])
rootCA.AppendCertsFromPEM(certSecret.Data[dbapi.TLSCACertFileName])

tlsConfig = &tls.Config{
Certificates: []tls.Certificate{crt},
ClientAuth: tls.RequireAndVerifyClientCert,
ClientCAs: clientCA,
RootCAs: rootCA,
MaxVersion: tls.VersionTLS13,
}
}

rmqClient := &Client{}
defaultVhost := "/"

if o.enableHTTPClient {
if o.httpURL == "" {
o.httpURL = o.GetHTTPconnURL()
}
httpClient, err := rmqhttp.NewClient(o.httpURL, username, password)
httpClient, err := func(isTLSEnabled bool) (*rmqhttp.Client, error) {
if isTLSEnabled {
return rmqhttp.NewTLSClient(o.httpURL, username, password, &http.Transport{
IdleConnTimeout: time.Second * 3,
DialContext: (&net.Dialer{
Timeout: time.Second * 30,
}).DialContext,
TLSClientConfig: tlsConfig,
TLSHandshakeTimeout: time.Second * 30,
})
}
return rmqhttp.NewClient(o.httpURL, username, password)
}(o.db.Spec.EnableSSL)
if err != nil {
klog.Error(err, "Failed to get http client for rabbitmq")
return nil, err
Expand Down Expand Up @@ -184,11 +239,18 @@ func (o *KubeDBClientBuilder) GetAMQPconnURL(username string, password string, v
}

func (o *KubeDBClientBuilder) GetHTTPconnURL() string {
protocolScheme := rmqhttp.HTTP
protocolScheme := o.db.GetConnectionScheme()
connectionPort := func(scheme string) int {
if scheme == "http" {
return kubedb.RabbitMQManagementUIPort
} else {
return kubedb.RabbitMQManagementUIPortWithSSL
}
}(protocolScheme)
if o.podName != "" {
return fmt.Sprintf("%s://%s.%s.%s.svc:%d", protocolScheme, o.podName, o.db.GoverningServiceName(), o.db.Namespace, kubedb.RabbitMQManagementUIPort)
return fmt.Sprintf("%s://%s.%s.%s.svc:%d", protocolScheme, o.podName, o.db.GoverningServiceName(), o.db.Namespace, connectionPort)
}
return fmt.Sprintf("%s://%s.%s.svc.cluster.local:%d", protocolScheme, o.db.DashboardServiceName(), o.db.Namespace, kubedb.RabbitMQManagementUIPort)
return fmt.Sprintf("%s://%s.%s.svc.cluster.local:%d", protocolScheme, o.db.DashboardServiceName(), o.db.Namespace, connectionPort)
}

// RabbitMQ server have a default virtual host "/"
Expand Down

0 comments on commit 2af340c

Please sign in to comment.