Skip to content

Commit

Permalink
CLOUDP-94859: allow to reclaim free space for processes [ops manager …
Browse files Browse the repository at this point in the history
…go client] (#114)
  • Loading branch information
andreaangiolillo authored Jul 13, 2021
1 parent 0820f6a commit c03daf0
Show file tree
Hide file tree
Showing 2 changed files with 120 additions and 17 deletions.
71 changes: 57 additions & 14 deletions atmcfg/atmcfg.go
Original file line number Diff line number Diff line change
Expand Up @@ -558,44 +558,87 @@ func Restart(out *opsmngr.AutomationConfig, name string) {
}

// ReclaimFreeSpace sets all process of a cluster to reclaim free space.
func ReclaimFreeSpace(out *opsmngr.AutomationConfig, name string) {
// This value may not be present and is mandatory
if out.Auth.DeploymentAuthMechanisms == nil {
out.Auth.DeploymentAuthMechanisms = make([]string, 0)
func ReclaimFreeSpace(out *opsmngr.AutomationConfig, clusterName string) {
ReclaimFreeSpaceWithLastCompact(out, clusterName, "")
}

func ReclaimFreeSpaceWithLastCompact(out *opsmngr.AutomationConfig, clusterName, lastCompact string) {
if lastCompact == "" {
lastCompact = time.Now().Format(time.RFC3339)
}
newDeploymentAuthMechanisms(out)
reclaimByReplicaSetName(out, clusterName, lastCompact)
reclaimByShardName(out, clusterName, lastCompact)
}

// ReclaimFreeSpaceForProcessesByClusterName reclaims free space for a cluster. Processes are provided in the format {"hostname:port","hostname2:port2"}.
func ReclaimFreeSpaceForProcessesByClusterName(out *opsmngr.AutomationConfig, clusterName, lastCompact string, processes []string) error {
if len(processes) == 0 {
ReclaimFreeSpaceWithLastCompact(out, clusterName, lastCompact)
return nil
}
lastCompact := time.Now().Format(time.RFC3339)
reclaimByReplicaSetName(out, name, lastCompact)
reclaimByShardName(out, name, lastCompact)

return reclaimFreeSpaceForProcesses(out, clusterName, lastCompact, processes)
}

func reclaimFreeSpaceForProcesses(out *opsmngr.AutomationConfig, clusterName, lastCompact string, processes []string) error {
newDeploymentAuthMechanisms(out)
processesMap := newProcessMap(processes)
reclaimByReplicaSetNameAndProcesses(out, processesMap, clusterName, lastCompact)
reclaimByShardNameAndProcesses(out, processesMap, clusterName, lastCompact)

return newProcessNotFoundError(clusterName, processesMap)
}

func reclaimByReplicaSetName(out *opsmngr.AutomationConfig, name, lastCompact string) {
func reclaimByReplicaSetName(out *opsmngr.AutomationConfig, clusterName, lastCompact string) {
reclaimByReplicaSetNameAndProcesses(out, nil, clusterName, lastCompact)
}

func reclaimByReplicaSetNameAndProcesses(out *opsmngr.AutomationConfig, processesMap map[string]bool, clusterName, lastCompact string) {
i, found := search.ReplicaSets(out.ReplicaSets, func(rs *opsmngr.ReplicaSet) bool {
return rs.ID == name
return rs.ID == clusterName
})
if found {
rs := out.ReplicaSets[i]
for _, m := range rs.Members {
for k, p := range out.Processes {
if p.Name == m.Host {
out.Processes[k].LastCompact = lastCompact
setLastCompact(out.Processes[k], processesMap, lastCompact)
}
}
}
}
}

func reclaimByShardName(out *opsmngr.AutomationConfig, name, lastCompact string) {
func setLastCompact(process *opsmngr.Process, processesMap map[string]bool, lastCompact string) {
if len(processesMap) == 0 {
process.LastCompact = lastCompact
return
}

key := fmt.Sprintf("%s:%d", process.Hostname, process.Args26.NET.Port)
if _, ok := processesMap[key]; ok {
process.LastCompact = lastCompact
processesMap[key] = true
}
}

func reclaimByShardName(out *opsmngr.AutomationConfig, clusterName, lastCompact string) {
reclaimByShardNameAndProcesses(out, nil, clusterName, lastCompact)
}

func reclaimByShardNameAndProcesses(out *opsmngr.AutomationConfig, processesMap map[string]bool, clusterName, lastCompact string) {
i, found := search.ShardingConfig(out.Sharding, func(s *opsmngr.ShardingConfig) bool {
return s.Name == name
return s.Name == clusterName
})
if found {
s := out.Sharding[i]
// compact shards
for _, rs := range s.Shards {
reclaimByReplicaSetName(out, rs.ID, lastCompact)
reclaimByReplicaSetNameAndProcesses(out, processesMap, rs.ID, lastCompact)
}
// compact config rs
reclaimByReplicaSetName(out, s.ConfigServerReplica, lastCompact)
reclaimByReplicaSetNameAndProcesses(out, processesMap, s.ConfigServerReplica, lastCompact)
// compact doesn't run on mongoses
}
}
66 changes: 63 additions & 3 deletions atmcfg/atmcfg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package atmcfg
import (
"errors"
"testing"
"time"

"go.mongodb.org/ops-manager/opsmngr"
)
Expand Down Expand Up @@ -684,12 +685,71 @@ func TestReclaimFreeSpace(t *testing.T) {
config := automationConfigWithOneShardedCluster(clusterName, false)
ReclaimFreeSpace(config, clusterName)
for i := range config.Processes {
if config.Processes[i].ProcessType == "mongod" && config.Processes[i].LastCompact == "" {
t.Errorf("ReclaimFreeSpace\n got=%#v", config.Processes[i].LastRestart)
}
isLastCompactEmpty(t, config.Processes[i], "", -1)
if config.Processes[i].ProcessType == "mongos" && config.Processes[i].LastCompact != "" {
t.Errorf("ReclaimFreeSpace\n got=%#v", config.Processes[i].LastRestart)
}
}
})
}

func TestReclaimFreeSpaceForProcessesByClusterName(t *testing.T) {
lastCompact := time.Now().Format(time.RFC3339)
t.Run("replica set", func(t *testing.T) {
config := automationConfigWithOneReplicaSet(clusterName, true)
err := ReclaimFreeSpaceForProcessesByClusterName(config, clusterName, lastCompact, []string{"host0:27017"})
if err != nil {
t.Fatalf("ReclaimFreeSpaceForProcessesByClusterName() returned an unexpected error: %v", err)
}

if config.Processes[0].LastCompact == "" {
t.Errorf("Got = %#v", config.Processes[0].LastRestart)
}
})

t.Run("sharded cluster - two processes", func(t *testing.T) {
config := automationConfigWithOneShardedCluster(clusterName, true)
err := ReclaimFreeSpaceForProcessesByClusterName(config, clusterName, lastCompact, []string{"host0:27017", "host2:27018"})
if err != nil {
t.Fatalf("ReclaimFreeSpaceForProcessesByClusterName() returned an unexpected error: %v", err)
}
for i := range config.Processes {
isLastCompactEmpty(t, config.Processes[i], "host2", 27017)
isLastCompactEmpty(t, config.Processes[i], "host0", 27018)
}
})
t.Run("reclaim free space for entire sharded cluster", func(t *testing.T) {
config := automationConfigWithOneShardedCluster(clusterName, true)
err := ReclaimFreeSpaceForProcessesByClusterName(config, clusterName, lastCompact, nil)
if err != nil {
t.Fatalf("ReclaimFreeSpaceForProcessesByClusterName() returned an unexpected error: %v", err)
}
for i := range config.Processes {
isLastCompactEmpty(t, config.Processes[i], "", -1)
}
})
t.Run("provide a process that does not exist", func(t *testing.T) {
config := automationConfigWithOneShardedCluster(clusterName, true)
err := ReclaimFreeSpaceForProcessesByClusterName(config, clusterName, lastCompact, []string{"hostTest:21021"})
if !errors.Is(err, ErrProcessNotFound) {
t.Fatalf("Got = %#v, want = %#v", err, ErrProcessNotFound)
}

for i := range config.Processes {
if config.Processes[i].LastRestart != "" {
t.Errorf("Got = %#v, want = %#v", config.Processes[i].LastRestart, "")
}
}
})
}

func isLastCompactEmpty(t *testing.T, process *opsmngr.Process, hostname string, portN int) {
t.Helper()
if hostname != "" && process.Args26.NET.Port != portN && process.Hostname != hostname {
return
}

if process.ProcessType == "mongod" && process.LastCompact == "" {
t.Errorf("ReclaimFreeSpace\n got=%#v", process.LastRestart)
}
}

0 comments on commit c03daf0

Please sign in to comment.