diff --git a/go.mod b/go.mod index 45717bb79..249225bd5 100644 --- a/go.mod +++ b/go.mod @@ -2,8 +2,6 @@ module kubedb.dev/db-client-go go 1.22.0 -toolchain go1.22.1 - require ( github.com/IBM/sarama v1.42.1 github.com/Masterminds/semver/v3 v3.2.1 diff --git a/rabbitmq/http_client.go b/rabbitmq/http_client.go index fe50e04db..7470d08d7 100644 --- a/rabbitmq/http_client.go +++ b/rabbitmq/http_client.go @@ -18,6 +18,7 @@ package rabbitmq import ( "fmt" + rabbithole "github.com/michaelklishin/rabbit-hole/v2" "k8s.io/klog/v2" ) @@ -42,3 +43,76 @@ func (c *HTTPClient) IsAllNodesRunningInCluster(replicas int) (bool, error) { klog.Info("All required nodes running in cluster") return true, nil } + +func (c *HTTPClient) getQueues() ([]rabbithole.QueueInfo, error) { + queues, err := c.Client.ListQueues() + if err != nil { + klog.Error(err, "Failed to get queue lists") + return nil, err + } + return queues, nil +} + +func (c *HTTPClient) getClassicQueues() ([]rabbithole.QueueInfo, error) { + queues, err := c.getQueues() + if err != nil { + klog.Error(err, "Failed to get queue lists") + return nil, err + } + classicQueues := []rabbithole.QueueInfo{} + + for _, q := range queues { + if q.Type == rabbitmqQueueTypeClassic { + classicQueues = append(classicQueues, q) + } + } + + return classicQueues, nil +} + +func (c *HTTPClient) hasNodeAnyClassicQueue(queues []rabbithole.QueueInfo, node string) bool { + for _, q := range queues { + if q.Type == rabbitmqQueueTypeClassic && q.Node == node { + return true + } + } + return false +} + +func (c *HTTPClient) getQuorumQueues() ([]rabbithole.QueueInfo, error) { + queues, err := c.getQueues() + if err != nil { + klog.Error(err, "Failed to get queue lists") + return nil, err + } + quorumQueues := []rabbithole.QueueInfo{} + + for _, q := range queues { + if q.Type == rabbitmqQueueTypeQuorum { + quorumQueues = append(quorumQueues, q) + } + } + + return quorumQueues, nil +} + +func (c *HTTPClient) hasNodeAnyQuorumQueue(queues []rabbithole.QueueInfo, node string) bool { + for _, q := range queues { + if q.Type == rabbitmqQueueTypeQuorum && q.Node == node { + return true + } + } + return false +} + +func (c *HTTPClient) getNodeNameFromPodURL(url string) string { + podClient, err := rabbithole.NewClient(url, c.Username, c.Password) + if err != nil { + return "" + } + overview, err := podClient.Overview() + if err != nil { + return "" + } + return overview.Node +} diff --git a/rabbitmq/kubedb_client_builder.go b/rabbitmq/kubedb_client_builder.go index 688ca8039..20fbc100a 100644 --- a/rabbitmq/kubedb_client_builder.go +++ b/rabbitmq/kubedb_client_builder.go @@ -44,6 +44,11 @@ type KubeDBClientBuilder struct { disableAMQPClient bool } +const ( + rabbitmqQueueTypeQuorum = "quorum" + rabbitmqQueueTypeClassic = "classic" +) + func NewKubeDBClientBuilder(kc client.Client, db *api.RabbitMQ) *KubeDBClientBuilder { return &KubeDBClientBuilder{ kc: kc,