From 7c54afa6ffd703de3c57538ae910bb9b50e6fe38 Mon Sep 17 00:00:00 2001 From: pritamdas99 Date: Fri, 20 Sep 2024 15:11:00 +0600 Subject: [PATCH] Add methods Signed-off-by: pritamdas99 --- solr/client.go | 11 ++++ solr/util.go | 161 +++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 172 insertions(+) diff --git a/solr/client.go b/solr/client.go index aa8dd7b4..595ff38d 100644 --- a/solr/client.go +++ b/solr/client.go @@ -91,3 +91,14 @@ type BalanceReplica struct { WaitForFinalState bool `json:"waitForFinalState,omitempty" yaml:"waitForFinalState,omitempty"` Async string `json:"async,omitempty" yaml:"async,omitempty"` } + +type CoreList struct { + coreName string + collection string +} + +type UpdateList struct { + target string + replica string + collection string +} diff --git a/solr/util.go b/solr/util.go index 880e2079..877b9b60 100644 --- a/solr/util.go +++ b/solr/util.go @@ -279,3 +279,164 @@ func (sc *Client) CleanupAsync(async string) error { wg.Wait() return errr } + +func (sc *Client) Balance() error { + var errr error + async := "balance-replica" + err := sc.CleanupAsync(async) + if err != nil { + klog.Error(fmt.Sprintf("Failed to clean asyncid******************************* %v\n", async)) + time.Sleep(30 * time.Second) + errr = err + } else { + klog.Info(fmt.Sprintf("Cleanup async successful for %v", async)) + time.Sleep(10 * time.Second) + } + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + resp, err := sc.BalanceReplica(async) + if err != nil { + klog.Error(fmt.Errorf("failed to do balance request. err %v", err)) + errr = err + } + responseBody, err := sc.DecodeResponse(resp) + if err != nil { + klog.Error(fmt.Errorf("failed to decode response for async %s, err %v", async, err)) + errr = err + } + _, err = sc.GetResponseStatus(responseBody) + if err != nil { + klog.Error(fmt.Errorf("failed to decode response for async %s, err %v", async, err)) + errr = err + } + + err = sc.CheckupStatus(async) + if err != nil { + klog.Error("Error while checking status************ ", err) + errr = err + } + }() + wg.Wait() + return errr +} + +func (sc *Client) Run(lst []UpdateList) error { + var errr error + var wg sync.WaitGroup + for _, x := range lst { + target := x.target + replica := x.replica + collection := x.collection + async := fmt.Sprintf("%s-%s-%s", replica, collection, target) + err := sc.CleanupAsync(async) + if err != nil { + klog.Error(fmt.Sprintf("Failed to clean asyncid******************************* %v\n", async)) + time.Sleep(30 * time.Second) + errr = err + } else { + klog.Info(fmt.Sprintf("Cleanup async successful for %v", async)) + time.Sleep(10 * time.Second) + } + wg.Add(1) + go func() { + defer wg.Done() + resp, err := sc.MoveReplica(target, replica, collection, async) + if err != nil { + klog.Error(fmt.Errorf("failed to do request for target %s, replica %s, collection %s, err %v", target, replica, collection, err)) + errr = err + } + responseBody, err := sc.DecodeResponse(resp) + if err != nil { + klog.Error(fmt.Errorf("failed to decode response for target %s, replica %s, collection %s, err %v", target, replica, collection, err)) + errr = err + } + _, err = sc.GetResponseStatus(responseBody) + if err != nil { + klog.Error(fmt.Errorf("failed to decode response for target %s, replica %s, collection %s, err %v", target, replica, collection, err)) + errr = err + } + + err = sc.CheckupStatus(async) + if err != nil { + errr = err + klog.Error("Error while checking status************ ", err) + } + }() + } + wg.Wait() + return errr +} + +func (sc *Client) Down(nodeList []string, x int, mp map[string][]CoreList) error { + n := len(nodeList) + ls2 := nodeList[n-x:] + ls1 := nodeList[:n-x] + fmt.Println("ls1 ", ls1) + fmt.Println("ls2 ", ls2) + ar := make([]UpdateList, 0) + for _, node := range ls2 { + for _, core := range mp[node] { + id := -1 + mx := 1000000000 + for j, l1 := range ls1 { + if len(mp[l1]) < mx { + mx = len(mp[l1]) + id = j + } + } + ar = append(ar, UpdateList{ + target: ls1[id], + replica: core.coreName, + collection: core.collection, + }) + mp[ls1[id]] = append(mp[ls1[id]], core) + fmt.Println(core.coreName, core.collection, ls1[id]) + } + } + err := sc.Run(ar) + return err +} +func (sc *Client) Up(nodeList []string, mp map[string][]CoreList) error { + for _, x := range nodeList { + if _, ok := mp[x]; !ok { + mp[x] = make([]CoreList, 0) + } + } + ar := make([]UpdateList, 0) + for { + mn := 10000000000 + minNode := "" + mx := -1 + maxNode := "" + for x, y := range mp { + n := len(y) + if mx < n { + mx = n + maxNode = x + } + + if mn > n { + mn = n + minNode = x + } + } + if maxNode == minNode || mx-mn <= 1 { + break + } + target := minNode + core := mp[maxNode][0].coreName + collection := mp[maxNode][0].collection + mp[minNode] = append(mp[minNode], mp[maxNode][0]) + mp[maxNode] = mp[maxNode][1:] + ar = append(ar, UpdateList{ + target: target, + replica: core, + collection: collection, + }) + fmt.Println(target, core, collection) + } + err := sc.Run(ar) + return err +}