diff --git a/pkg/cmd/cluster.go b/pkg/cmd/cluster.go
index 595111fa..ea9cacb3 100644
--- a/pkg/cmd/cluster.go
+++ b/pkg/cmd/cluster.go
@@ -732,6 +732,9 @@ var (
extendClientParam bool
grpcClientParam bool
profileFirstParam bool
+ machineParam string
+ rackParam string
+ siteParam string
skipMavenDepsParam bool
backupLogFilesParam bool
validPersistenceModes = []string{"on-demand", "active", "active-backup", "active-async"}
@@ -1450,6 +1453,9 @@ func init() {
createClusterCmd.Flags().Int32VarP(&jmxRemotePortParam, jmxPortArg, "J", 0, jmxPortMessage)
createClusterCmd.Flags().StringVarP(&jmxRemoteHostParam, jmxHostArg, "j", "", jmxHostMessage)
createClusterCmd.Flags().BoolVarP(&profileFirstParam, profileFirstArg, "F", false, profileFirstMessage)
+ createClusterCmd.Flags().StringVarP(&machineParam, machineArg, "", "", machineMessage)
+ createClusterCmd.Flags().StringVarP(&rackParam, rackArg, "", "", rackMessage)
+ createClusterCmd.Flags().StringVarP(&siteParam, siteArg, "", "", siteMessage)
stopClusterCmd.Flags().BoolVarP(&automaticallyConfirm, "yes", "y", false, confirmOptionMessage)
@@ -1465,6 +1471,9 @@ func init() {
startClusterCmd.Flags().StringVarP(&jmxRemoteHostParam, jmxHostArg, "j", "", jmxHostMessage)
startClusterCmd.Flags().BoolVarP(&profileFirstParam, profileFirstArg, "F", false, profileFirstMessage)
startClusterCmd.Flags().BoolVarP(&backupLogFilesParam, backupLogFilesArg, "B", false, backupLogFilesMessage)
+ startClusterCmd.Flags().StringVarP(&machineParam, machineArg, "", "", machineMessage)
+ startClusterCmd.Flags().StringVarP(&rackParam, rackArg, "", "", rackMessage)
+ startClusterCmd.Flags().StringVarP(&siteParam, siteArg, "", "", siteMessage)
startConsoleCmd.Flags().StringVarP(&heapMemoryParam, heapMemoryArg, "M", defaultHeap, heapMemoryMessage)
startConsoleCmd.Flags().Int32VarP(&logLevelParam, logLevelArg, "l", 5, logLevelMessage)
@@ -1493,6 +1502,9 @@ func init() {
scaleClusterCmd.Flags().StringVarP(&profileValueParam, profileArg, "P", "", profileMessage)
scaleClusterCmd.Flags().StringVarP(&serverStartClassParam, startClassArg, "S", "", startClassMessage)
scaleClusterCmd.Flags().BoolVarP(&backupLogFilesParam, backupLogFilesArg, "B", false, backupLogFilesMessage)
+ scaleClusterCmd.Flags().StringVarP(&machineParam, machineArg, "", "", machineMessage)
+ scaleClusterCmd.Flags().StringVarP(&rackParam, rackArg, "", "", rackMessage)
+ scaleClusterCmd.Flags().StringVarP(&siteParam, siteArg, "", "", siteMessage)
}
// sanitizeConnectionName sanitizes a cluster connection
diff --git a/pkg/cmd/cluster_utils.go b/pkg/cmd/cluster_utils.go
index 0f4a9520..8f4006ce 100644
--- a/pkg/cmd/cluster_utils.go
+++ b/pkg/cmd/cluster_utils.go
@@ -412,6 +412,18 @@ func getCacheServerArgs(connection ClusterConnection, member string, httpPort in
}
}
+ if machineParam != "" {
+ baseArgs = append(baseArgs, fmt.Sprintf("-Dcoherence.machine=%s", machineParam))
+ }
+
+ if rackParam != "" {
+ baseArgs = append(baseArgs, fmt.Sprintf("-Dcoherence.rack=%s", rackParam))
+ }
+
+ if siteParam != "" {
+ baseArgs = append(baseArgs, fmt.Sprintf("-Dcoherence.site=%s", siteParam))
+ }
+
// if default heap is overridden, then use this
if heapMemoryParam != defaultHeap {
heap = heapMemoryParam
diff --git a/pkg/cmd/formatting.go b/pkg/cmd/formatting.go
index 65b98e6e..771fd359 100644
--- a/pkg/cmd/formatting.go
+++ b/pkg/cmd/formatting.go
@@ -55,6 +55,9 @@ const (
NameColumn = "NAME"
publisherColumn = "PUBLISHER"
receiverColumn = "RECEIVER"
+ machineColumn = "MACHINE"
+ rackColumn = "RACK"
+ siteColumn = "SITE"
avgSize = "AVG SIZE"
avgApply = "AVG APPLY"
avgBacklogDelay = "AVG BACKLOG DELAY"
@@ -579,6 +582,76 @@ func FormatTopicsSummary(topicDetails []config.TopicDetail) string {
return table.String()
}
+// FormatPartitionOwnership returns the partition ownership in column formatted output.
+func FormatPartitionOwnership(partitionDetails map[int]*config.PartitionOwnership) string {
+ var (
+ ownershipCount = len(partitionDetails)
+ keys = make([]int, 0)
+ header = []string{MemberColumn, "PRIMARIES", "BACKUPS", "PRIMARY PARTITIONS"}
+ )
+ if ownershipCount == 0 {
+ return ""
+ }
+
+ // get and sort the keys
+ for k := range partitionDetails {
+ keys = append(keys, k)
+ }
+ sort.Ints(keys)
+
+ // get the backup-count
+ backupCount := utils.GetBackupCount(partitionDetails)
+
+ if OutputFormat == constants.WIDE {
+ header = []string{MemberColumn, machineColumn, rackColumn, siteColumn, "PRIMARIES", "BACKUPS", "PRIMARY"}
+ }
+
+ // build the header for the backups
+ for i := 0; i < backupCount; i++ {
+ header = append(header, fmt.Sprintf("BACKUP %d", i+1))
+ }
+
+ table := newFormattedTable().WithAlignment(generateColumnFormats(backupCount)...).WithHeader(header...)
+
+ for j := 0; j < len(keys); j++ {
+ key := keys[j]
+ value := partitionDetails[key]
+
+ memberID := "Orphaned"
+ if value.MemberID != -1 {
+ memberID = fmt.Sprintf("%v", value.MemberID)
+ }
+
+ table.AddRow(memberID)
+
+ if OutputFormat == constants.WIDE {
+ table.AddColumnsToRow(value.Machine, value.Rack, value.Site)
+ }
+
+ table.AddColumnsToRow(formatSmallInteger(int32(value.PrimaryPartitions)),
+ formatSmallInteger(int32(value.BackupPartitions)))
+
+ // add primaries and backups
+ for i := 0; i <= backupCount; i++ {
+ table.AddColumnsToRow(utils.FormatPartitions(value.PartitionMap[i]))
+ }
+ }
+
+ return table.String()
+}
+
+func generateColumnFormats(count int) []string {
+ result := []string{R, R, R, L}
+ if OutputFormat == constants.WIDE {
+ result = []string{R, L, L, L, R, R, L}
+ }
+
+ for i := 0; i < count; i++ {
+ result = append(result, L)
+ }
+ return result
+}
+
// FormatTopicsSubscribers returns the topics subscriber details in column formatted output
func FormatTopicsSubscribers(topicsSubscribers []config.TopicsSubscriberDetail) string {
var (
@@ -1388,7 +1461,7 @@ func FormatMembers(members []config.Member, verbose bool, storageMap map[int]boo
WithAlignment(finalAlignment...)
if OutputFormat == constants.WIDE {
- table.AddHeaderColumns("MACHINE", "RACK", "SITE", publisherColumn, receiverColumn)
+ table.AddHeaderColumns(machineColumn, rackColumn, siteColumn, publisherColumn, receiverColumn)
table.AddFormattingFunction(9, networkStatsFormatter)
table.AddFormattingFunction(10, networkStatsFormatter)
}
@@ -1813,7 +1886,7 @@ func FormatMachines(machines []config.Machine) string {
return strings.Compare(machines[p].MachineName, machines[q].MachineName) < 0
})
- table := newFormattedTable().WithHeader("MACHINE", "PROCESSORS", "LOAD", "TOTAL MEMORY", "FREE MEMORY",
+ table := newFormattedTable().WithHeader(machineColumn, "PROCESSORS", "LOAD", "TOTAL MEMORY", "FREE MEMORY",
"% FREE", "OS", "ARCH", "VERSION").WithAlignment(L, R, R, R, R, R, L, L, L)
table.AddFormattingFunction(5, machineMemoryFormatting)
diff --git a/pkg/cmd/monitor_cluster.go b/pkg/cmd/monitor_cluster.go
index 1ccd78eb..cf29436c 100644
--- a/pkg/cmd/monitor_cluster.go
+++ b/pkg/cmd/monitor_cluster.go
@@ -102,6 +102,7 @@ var validPanels = []panelImpl{
createContentPanel(8, "services", "Services", "show services", servicesContent, servicesPanelData),
createContentPanel(8, "service-members", "Service Members (%SERVICE)", "show service members", serviceMembersContent, servicesPanelData),
createContentPanel(8, "service-distributions", "Service Distributions (%SERVICE)", "show service distributions", serviceDistributionsContent, servicesPanelData),
+ createContentPanel(8, "service-ownership", "Service Ownership (%SERVICE)", "show service ownership", serviceOwnershipContent, servicesPanelData),
createContentPanel(8, "service-storage", "Service Storage", "show service storage", serviceStorageContent, servicesPanelData),
createContentPanel(8, "topic-members", "Topic Members (%SERVICE/%TOPIC)", "show topic members", topicMembersContent, topicsPanelData),
createContentPanel(8, "subscribers", "Topic Subscribers (%SERVICE/%TOPIC)", "show topic subscribers", topicSubscribersContent, topicsPanelData),
@@ -639,6 +640,61 @@ var serviceDistributionsContent = func(dataFetcher fetcher.Fetcher, _ clusterSum
return strings.Split(distributions.ScheduledDistributions, "\n"), nil
}
+var serviceOwnershipContent = func(dataFetcher fetcher.Fetcher, _ clusterSummaryInfo) ([]string, error) {
+ var (
+ membersResult []byte
+ memberNodeID string
+ membersDetails = config.ServiceMemberDetails{}
+ )
+
+ if serviceName == "" {
+ return emptyStringArray, errSelectService
+ }
+
+ servicesResult, err := GetDistributedServices(dataFetcher)
+ if err != nil {
+ return emptyStringArray, err
+ }
+
+ if !utils.SliceContains(servicesResult, serviceName) {
+ return emptyStringArray, fmt.Errorf(unableToFindService, serviceName)
+ }
+
+ // find storage member node
+ membersResult, err = dataFetcher.GetServiceMembersDetailsJSON(serviceName)
+ if err != nil {
+ return emptyStringArray, err
+ }
+
+ if len(membersResult) != 0 {
+ err = json.Unmarshal(membersResult, &membersDetails)
+ if err != nil {
+ return emptyStringArray, utils.GetError("unable to unmarshall members ownership", err)
+ }
+
+ for _, v := range membersDetails.Services {
+ memberNodeID = v.NodeID
+ break
+ }
+
+ var ownershipData []byte
+
+ ownershipData, err = dataFetcher.GetServiceOwnershipJSON(serviceName, memberNodeID)
+ if err != nil {
+ return emptyStringArray, err
+ }
+
+ result, err := getOwnershipData(dataFetcher, ownershipData)
+ if err != nil {
+ return emptyStringArray, err
+ }
+
+ return strings.Split(FormatPartitionOwnership(result), "\n"), nil
+ }
+
+ return noContentArray, nil
+}
+
var serviceStorageContent = func(dataFetcher fetcher.Fetcher, _ clusterSummaryInfo) ([]string, error) {
storageSummary, err := getServiceStorageDetails(dataFetcher)
diff --git a/pkg/cmd/root.go b/pkg/cmd/root.go
index 63a8c74d..7799daf2 100644
--- a/pkg/cmd/root.go
+++ b/pkg/cmd/root.go
@@ -85,6 +85,9 @@ const (
healthPortMessage = "starting port for health"
jmxPortMessage = "remote JMX port for management member"
jmxHostMessage = "remote JMX RMI host for management member"
+ machineMessage = "the machine name to use"
+ rackMessage = "the rack name to use"
+ siteMessage = "the site name to use"
cacheConfigMessage = "cache configuration file"
operationalConfigMessage = "override override file"
cacheConfigArg = "cache-config"
@@ -94,6 +97,9 @@ const (
healthPortArg = "health-port"
jmxPortArg = "jmx-port"
jmxHostArg = "jmx-host"
+ machineArg = "machine"
+ rackArg = "rack"
+ siteArg = "site"
logLevelMessage = "coherence log level"
profileMessage = "profile to add to cluster startup command line"
backupLogFilesMessage = "backup old cache server log files"
@@ -507,6 +513,7 @@ func Initialize(command *cobra.Command) *cobra.Command {
getCmd.AddCommand(getProxyConnectionsCmd)
getCmd.AddCommand(getUseGradleCmd)
getCmd.AddCommand(getServiceStorageCmd)
+ getCmd.AddCommand(getServiceOwnershipCmd)
getCmd.AddCommand(getCacheStoresCmd)
getCmd.AddCommand(getColorCmd)
getCmd.AddCommand(getNetworkStatsCmd)
diff --git a/pkg/cmd/service.go b/pkg/cmd/service.go
index c647d50c..60d8da59 100644
--- a/pkg/cmd/service.go
+++ b/pkg/cmd/service.go
@@ -45,6 +45,7 @@ const (
provideServiceName = "you must provide a service name"
unableToFindService = "unable to find service with service name '%s'"
noDistributionsData = "No distributions data is available"
+ serviceUnmarshall = "unable to unmarshall members result"
)
// getServicesCmd represents the get services command.
@@ -367,6 +368,156 @@ var getServiceDistributionsCmd = &cobra.Command{
},
}
+// getServiceOwnershipCmd represents the get service-ownership command.
+var getServiceOwnershipCmd = &cobra.Command{
+ Use: "service-ownership service-name",
+ Short: "display partition ownership information for a service",
+ Long: `The 'get service-ownership' command displays partition ownership for a service.`,
+ Args: func(cmd *cobra.Command, args []string) error {
+ if len(args) != 1 {
+ displayErrorAndExit(cmd, provideServiceName)
+ }
+ return nil
+ },
+ ValidArgsFunction: completionDistributedService,
+ RunE: func(cmd *cobra.Command, args []string) error {
+ var (
+ err error
+ dataFetcher fetcher.Fetcher
+ connection string
+ membersResult []byte
+ membersDetails = config.ServiceMemberDetails{}
+ memberNodeID string
+ )
+
+ connection, dataFetcher, err = GetConnectionAndDataFetcher()
+ if err != nil {
+ return err
+ }
+
+ servicesResult, err := GetDistributedServices(dataFetcher)
+ if err != nil {
+ return err
+ }
+
+ if !utils.SliceContains(servicesResult, args[0]) {
+ return fmt.Errorf(unableToFindService, args[0])
+ }
+
+ // find storage member node
+ membersResult, err = dataFetcher.GetServiceMembersDetailsJSON(args[0])
+ if err != nil {
+ return err
+ }
+
+ err = json.Unmarshal(membersResult, &membersDetails)
+ if err != nil {
+ return utils.GetError(serviceUnmarshall, err)
+ }
+
+ // find the first node
+ for _, v := range membersDetails.Services {
+ memberNodeID = v.NodeID
+ break
+ }
+
+ if memberNodeID == "" {
+ return fmt.Errorf("cannot find a node for service %s", args[0])
+ }
+
+ for {
+ var ownershipData []byte
+
+ ownershipData, err = dataFetcher.GetServiceOwnershipJSON(args[0], memberNodeID)
+ if err != nil {
+ return err
+ }
+
+ if strings.Contains(OutputFormat, constants.JSONPATH) || OutputFormat == constants.JSON {
+ if strings.Contains(OutputFormat, constants.JSONPATH) {
+ result, err := utils.GetJSONPathResults(ownershipData, OutputFormat)
+ if err != nil {
+ return err
+ }
+ cmd.Println(result)
+ } else {
+ cmd.Println(string(ownershipData))
+ }
+ } else {
+ printWatchHeader(cmd)
+
+ result, err := getOwnershipData(dataFetcher, ownershipData)
+ if err != nil {
+ return err
+ }
+ cmd.Println(FormatCurrentCluster(connection))
+ cmd.Println(FormatPartitionOwnership(result))
+ }
+
+ // check to see if we should exit if we are not watching
+ if !isWatchEnabled() {
+ break
+ }
+
+ // we are watching so sleep and then repeat until CTRL-C
+ time.Sleep(time.Duration(watchDelay) * time.Second)
+ }
+
+ return nil
+ },
+}
+
+func getOwnershipData(dataFetcher fetcher.Fetcher, ownershipData []byte) (map[int]*config.PartitionOwnership, error) {
+ var ownership config.Ownership
+
+ if len(ownershipData) != 0 {
+ err := json.Unmarshal(ownershipData, &ownership)
+ if err != nil {
+ return nil, err
+ }
+ } else {
+ ownership.Details = ""
+ }
+
+ results, err := utils.ParsePartitionOwnership(ownership.Details)
+ if err != nil {
+ return nil, err
+ }
+
+ if OutputFormat == constants.WIDE {
+ var (
+ members = config.Members{}
+ membersResult []byte
+ )
+
+ membersResult, err = dataFetcher.GetMemberDetailsJSON(OutputFormat != constants.TABLE && OutputFormat != constants.WIDE)
+ if err != nil {
+ return nil, err
+ }
+
+ err = json.Unmarshal(membersResult, &members)
+ if err != nil {
+ return nil, utils.GetError(unableToDecode, err)
+ }
+
+ // retrieve the machine, rack and site for display in wide mode
+ for _, v := range results {
+ v.Machine, v.Rack, v.Site = getMachineRackSite(fmt.Sprintf("%v", v.MemberID), members.Members)
+ }
+ }
+
+ return results, nil
+}
+
+func getMachineRackSite(nodeID string, members []config.Member) (string, string, string) {
+ for _, v := range members {
+ if v.NodeID == nodeID {
+ return v.MachineName, v.RackName, v.SiteName
+ }
+ }
+ return "", "", ""
+}
+
// getServiceDescriptionCmd represents the get service-description command.
var getServiceDescriptionCmd = &cobra.Command{
Use: "service-description service-name",
@@ -515,7 +666,7 @@ var getServiceMembersCmd = &cobra.Command{
err = json.Unmarshal(membersResult, &membersDetails)
if err != nil {
- return utils.GetError("unable to unmarshall members result", err)
+ return utils.GetError(serviceUnmarshall, err)
}
var finalDetails []config.ServiceMemberDetail
@@ -697,7 +848,7 @@ service is a cache service.`,
err = json.Unmarshal(membersResult, &membersDetails)
if err != nil {
- return utils.GetError("unable to unmarshall members result", err)
+ return utils.GetError(serviceUnmarshall, err)
}
err = json.Unmarshal(membersResult, &persistenceDetails)
diff --git a/pkg/config/config_helper.go b/pkg/config/config_helper.go
index f65df8d2..4788a726 100644
--- a/pkg/config/config_helper.go
+++ b/pkg/config/config_helper.go
@@ -775,7 +775,29 @@ type Distributions struct {
ScheduledDistributions string `json:"scheduledDistributions"`
}
+// Ownership contains partition ownership distributions.
+type Ownership struct {
+ Details string `json:"ownership"`
+}
+
// Description contains description for an item.
type Description struct {
Description string `json:"description"`
}
+
+// PartitionOwnership contains partition ownership for a service.
+type PartitionOwnership struct {
+ MemberID int
+ TotalPartitions int
+ PrimaryPartitions int
+ BackupPartitions int
+ // a map of partition ownership keyed by int where
+ // 0 = primary, 1 = first backup, 2 = second backup....
+ // and value are the owned partitions
+ PartitionMap map[int][]int
+
+ // -o wide options
+ Machine string
+ Rack string
+ Site string
+}
diff --git a/pkg/fetcher/fetcher.go b/pkg/fetcher/fetcher.go
index b09e4adf..ca696f2b 100644
--- a/pkg/fetcher/fetcher.go
+++ b/pkg/fetcher/fetcher.go
@@ -109,6 +109,9 @@ type Fetcher interface {
// GetScheduledDistributionsJSON returns scheduled distributions for a service.
GetScheduledDistributionsJSON(serviceName string) ([]byte, error)
+ // GetServiceOwnershipJSON returns service ownership for a service.
+ GetServiceOwnershipJSON(serviceName string, nodeID string) ([]byte, error)
+
// GetServiceDescriptionJSON returns service description.
GetServiceDescriptionJSON(serviceName string) ([]byte, error)
diff --git a/pkg/fetcher/http_fetcher.go b/pkg/fetcher/http_fetcher.go
index 969ac7f6..3d85068a 100644
--- a/pkg/fetcher/http_fetcher.go
+++ b/pkg/fetcher/http_fetcher.go
@@ -158,7 +158,7 @@ func (h HTTPFetcher) GetMemberDetailsJSON(verbose bool) ([]byte, error) {
// GetNetworkStatsJSON returns network stats in raw json.
func (h HTTPFetcher) GetNetworkStatsJSON(nodeID string) ([]byte, error) {
- result, err := httpGetRequest(h, "/members/"+nodeID+"/networkStats"+links)
+ result, err := httpGetRequest(h, membersPath+nodeID+"/networkStats"+links)
if err != nil {
return constants.EmptyByte, utils.GetError("cannot get get networkStats information", err)
}
@@ -447,6 +447,16 @@ func (h HTTPFetcher) GetScheduledDistributionsJSON(serviceName string) ([]byte,
return result, nil
}
+// GetServiceOwnershipJSON returns service ownership for a service.
+func (h HTTPFetcher) GetServiceOwnershipJSON(serviceName string, nodeID string) ([]byte, error) {
+ result, err := httpGetRequest(h, servicesPath+getSafeServiceName(h, serviceName)+
+ membersPath+nodeID+"/ownership?links=&verbose=true")
+ if err != nil && !strings.Contains(err.Error(), errorCode404) {
+ return constants.EmptyByte, utils.GetError("cannot get service ownership for service "+serviceName, err)
+ }
+ return result, nil
+}
+
// GetServiceDescriptionJSON returns service description.
func (h HTTPFetcher) GetServiceDescriptionJSON(serviceName string) ([]byte, error) {
result, err := httpGetRequest(h, servicesPath+getSafeServiceName(h, serviceName)+descriptionPath)
diff --git a/pkg/utils/utils.go b/pkg/utils/utils.go
index 9269075a..c074f16f 100644
--- a/pkg/utils/utils.go
+++ b/pkg/utils/utils.go
@@ -17,7 +17,9 @@ import (
"go.uber.org/zap/zapcore"
"os"
"path/filepath"
+ "regexp"
"runtime"
+ "sort"
"strconv"
"strings"
"unicode"
@@ -303,3 +305,145 @@ func GetStartupDelayInMillis(startupDelay string) (int64, error) {
}
return int64(millis), nil
}
+
+var (
+ backupPattern = regexp.MustCompile(`\[(\d+)\]`)
+ replacePattern = regexp.MustCompile(`^.*?:\s`)
+ memberPattern = regexp.MustCompile(`\*\*\* Member:\s+(\d+)\s+total=(\d+)\s+\(primary=(\d+),\s+backup=(\d+)\)`)
+)
+
+func extractBackup(sBackupString string) int {
+ matches := backupPattern.FindStringSubmatch(sBackupString)
+ if len(matches) > 1 {
+ if backup, err := strconv.Atoi(matches[1]); err == nil {
+ return backup
+ }
+ }
+ return -1
+}
+
+func removePrefix(s string) string {
+ if !strings.Contains(s, ":") {
+ return ""
+ }
+ return replacePattern.ReplaceAllString(s, "")
+}
+
+func extractPartitions(spart string) []int {
+ var partitions []int
+ s := strings.ReplaceAll(spart, "+", " ")
+ parts := strings.Split(removePrefix(s), ", ")
+
+ for _, part := range parts {
+ if part != "" {
+ if num, err := strconv.Atoi(part); err == nil {
+ partitions = append(partitions, num)
+ }
+ }
+ }
+
+ return partitions
+}
+
+func newPartitionOwnership(memberID, totalPartitions, primaryPartitions, backupPartitions int) *config.PartitionOwnership {
+ return &config.PartitionOwnership{
+ MemberID: memberID,
+ TotalPartitions: totalPartitions,
+ PrimaryPartitions: primaryPartitions,
+ BackupPartitions: backupPartitions,
+ PartitionMap: make(map[int][]int),
+ }
+}
+
+func ParsePartitionOwnership(sOwnership string) (map[int]*config.PartitionOwnership, error) {
+ mapOwnership := make(map[int]*config.PartitionOwnership)
+
+ if len(sOwnership) == 0 {
+ return mapOwnership, nil
+ }
+
+ var (
+ parts = strings.Split(sOwnership, "
")
+ currentMember = -2
+ ownership = &config.PartitionOwnership{}
+ )
+
+ for _, line := range parts {
+ switch {
+ case strings.Contains(line, "*** Member:"):
+ matches := memberPattern.FindStringSubmatch(line)
+ if len(matches) > 1 {
+ memberID, _ := strconv.Atoi(matches[1])
+ totalPartitions, _ := strconv.Atoi(matches[2])
+ primaryPartitions, _ := strconv.Atoi(matches[3])
+ backupPartitions, _ := strconv.Atoi(matches[4])
+
+ ownership = newPartitionOwnership(memberID, totalPartitions, primaryPartitions, backupPartitions)
+ mapOwnership[memberID] = ownership
+ currentMember = memberID
+ } else {
+ return nil, fmt.Errorf("unable to parse line [%s]", line)
+ }
+
+ case strings.Contains(line, "*** Orphans"):
+ currentMember = -1
+ ownership = newPartitionOwnership(-1, 0, 0, 0)
+ mapOwnership[-1] = ownership
+
+ case strings.Contains(line, "Primary["):
+ ownership = mapOwnership[currentMember]
+ ownership.PartitionMap[0] = extractPartitions(line)
+
+ case strings.Contains(line, "Backup["):
+ backup := extractBackup(line)
+ if backup == -1 {
+ return nil, fmt.Errorf("negative backup from %s", line)
+ }
+ ownership = mapOwnership[currentMember]
+ ownership.PartitionMap[backup] = extractPartitions(line)
+ }
+ }
+
+ return mapOwnership, nil
+}
+
+func FormatPartitions(partitions []int) string {
+ if len(partitions) == 0 {
+ return "-"
+ }
+
+ sort.Ints(partitions)
+
+ var result []string
+ start := partitions[0]
+ prev := partitions[0]
+
+ for i := 1; i < len(partitions); i++ {
+ if partitions[i] == prev+1 {
+ prev = partitions[i]
+ } else {
+ if start == prev {
+ result = append(result, strconv.Itoa(start))
+ } else {
+ result = append(result, fmt.Sprintf("%d..%d", start, prev))
+ }
+ start = partitions[i]
+ prev = partitions[i]
+ }
+ }
+
+ if start == prev {
+ result = append(result, strconv.Itoa(start))
+ } else {
+ result = append(result, fmt.Sprintf("%d..%d", start, prev))
+ }
+
+ return strings.Join(result, ", ")
+}
+
+func GetBackupCount(ownership map[int]*config.PartitionOwnership) int {
+ for _, v := range ownership {
+ return len(v.PartitionMap) - 1
+ }
+ return 1
+}
diff --git a/pkg/utils/utils_test.go b/pkg/utils/utils_test.go
index ef944b04..c27a3c5f 100644
--- a/pkg/utils/utils_test.go
+++ b/pkg/utils/utils_test.go
@@ -265,3 +265,127 @@ func TestNoWritableHomeDir(t *testing.T) {
// required
fmt.Println("")
}
+
+const (
+ Ownership71 = `There are currently no pending or scheduled distributions for this service.
+
*** Member: 1 total=5 (primary=3, backup=2)
Primary[]#003: 000, 001, 002
+
Backup[1]#002: 003, 004
*** Member: 2 total=5 (primary=2, backup=3)
+
Primary[]#002: 005, 006
Backup[1]#003: 000, 001, 002
*** Member: 3 total=4
+(primary=2, backup=2)
Primary[]#002: 003, 004
Backup[1]#002: 005, 006
+*** Orphans:
Primary[]#000
Backup[1]#000
`
+
+ Ownership192 = `There are currently no pending or scheduled distributions for this service.
+
*** Member: 1 total=9 (primary=3, backup=6)
Primary[]#003: 000, 008, 012
+
Backup[1]#003: 013, 015, 017
Backup[2]#003: 002, 004, 007
+*** Member: 2 total=9 (primary=3, backup=6)
Primary[]#003: 005, 009, 013
+
Backup[1]#002: 006, 008
Backup[2]#004: 010, 012, 015, 017
+*** Member: 3 total=9 (primary=3, backup=6)
Primary[]#003: 001, 002, 004
+
Backup[1]#006: 000, 003, 005, 010, 011, 016
Backup[2]#000
+*** Member: 4 total=9 (primary=3, backup=6)
Primary[]#003: 006, 010, 014
+
Backup[1]#001: 018
Backup[2]#005: 000, 003, 005, 008, 013
+*** Member: 5 total=10 (primary=3, backup=7)
Primary[]#003: 003, 007, 011
+
Backup[1]#003: 009, 012, 014
Backup[2]#004: 001, 006, 016, 018
+*** Member: 6 total=11 (primary=4, backup=7)
Primary[]#004: 015, 016, 017, 018
+
Backup[1]#004: 001, 002, 004, 007
Backup[2]#003: 009, 011, 014
+*** Orphans:
Primary[]#000
Backup[1]#000
Backup[2]#000
`
+
+ Ownership71Safe = `There are currently no pending or scheduled distributions for this service.
+
*** Member: 1 total=5 (primary=3, backup=2)
Primary[]#003:+000,+001,+002
+
Backup[1]#002:+003,+004
*** Member: 2 total=5 (primary=2, backup=3)
+
Primary[]#002:+005,+006
Backup[1]#003:+000,+001,+002
*** Member: 3 total=4
+(primary=2, backup=2)
Primary[]#002:+003,+004
Backup[1]#002:+005,+006
+*** Orphans:
Primary[]#000
Backup[1]#000
`
+)
+
+func TestParsePartitions(t *testing.T) {
+ var (
+ g = gomega.NewGomegaWithT(t)
+ )
+ partitions := extractPartitions("Backup[1]#008: ")
+ g.Expect(len(partitions)).To(gomega.Equal(0))
+
+ partitions = extractPartitions("Backup[1]#000")
+ g.Expect(len(partitions)).To(gomega.Equal(0))
+
+ partitions = extractPartitions("Backup[1]#008: 333, 444, 5555")
+ g.Expect(len(partitions)).To(gomega.Equal(3))
+
+ partitions = extractPartitions("Primary[]#006: 031, 032, 033, 034, 035, 036")
+ g.Expect(len(partitions)).To(gomega.Equal(6))
+
+ partitions = extractPartitions("Primary[]#006:+031,+032,+033,+034,+035,+036")
+ g.Expect(len(partitions)).To(gomega.Equal(6))
+}
+
+func TestExtractBackup(t *testing.T) {
+ g := gomega.NewGomegaWithT(t)
+
+ g.Expect(extractBackup("Rubbish")).To(gomega.Equal(-1))
+ g.Expect(extractBackup("Backup[1]#008:")).To(gomega.Equal(1))
+ g.Expect(extractBackup("Backup[1]#008: 333, 444, 5555")).To(gomega.Equal(1))
+ g.Expect(extractBackup("Backup[2]#008: 333, 444, 5555")).To(gomega.Equal(2))
+ g.Expect(extractBackup("Backup[2]#008:+333,+444,+5555")).To(gomega.Equal(2))
+}
+
+func TestRemovePrefix(t *testing.T) {
+ g := gomega.NewGomegaWithT(t)
+
+ g.Expect(removePrefix("Rubbish")).To(gomega.Equal(""))
+ g.Expect(removePrefix("Backup[1]#008: ")).To(gomega.Equal(""))
+ g.Expect(removePrefix("Backup[1]#000")).To(gomega.Equal(""))
+ g.Expect(removePrefix("Backup[1]#008: 333, 444, 5555")).To(gomega.Equal("333, 444, 5555"))
+ g.Expect(removePrefix("Backup[2]#008: 333, 444")).To(gomega.Equal("333, 444"))
+ g.Expect(removePrefix("Primary[]#006: 031, 032, 033, 034, 035, 036")).To(gomega.Equal("031, 032, 033, 034, 035, 036"))
+}
+
+func TestFormatPartitions(t *testing.T) {
+ g := gomega.NewGomegaWithT(t)
+
+ g.Expect(FormatPartitions([]int{0, 1, 3, 4, 5, 10})).To(gomega.Equal("0..1, 3..5, 10"))
+ g.Expect(FormatPartitions([]int{0, 1, 2, 3, 4, 5})).To(gomega.Equal("0..5"))
+ g.Expect(FormatPartitions([]int{0, 3, 5, 7})).To(gomega.Equal("0, 3, 5, 7"))
+ g.Expect(FormatPartitions([]int{10, 9, 8, 22, 21})).To(gomega.Equal("8..10, 21..22"))
+}
+
+func Test7Partitions1Backup(t *testing.T) {
+ g := gomega.NewGomegaWithT(t)
+
+ // Parse the partition ownership from Ownership_7_1
+ mapOwnership, err := ParsePartitionOwnership(encodeOwnership(Ownership71))
+ g.Expect(err).ToNot(gomega.HaveOccurred())
+
+ for _, v := range mapOwnership {
+ g.Expect(v.PartitionMap).To(gomega.Not(gomega.BeNil()))
+ }
+
+ // Parse the partition ownership from Ownership_7_1
+ mapOwnership, err = ParsePartitionOwnership(encodeOwnership(Ownership71Safe))
+ g.Expect(err).ToNot(gomega.HaveOccurred())
+
+ for _, v := range mapOwnership {
+ g.Expect(v.PartitionMap).To(gomega.Not(gomega.BeNil()))
+ }
+
+ // Assert that the map size is correct
+ g.Expect(len(mapOwnership)).To(gomega.Equal(4))
+}
+
+func Test19Partitions2Backup(t *testing.T) {
+ g := gomega.NewGomegaWithT(t)
+
+ // Parse the partition ownership from Ownership_19_2
+ mapOwnership, err := ParsePartitionOwnership(encodeOwnership(Ownership192))
+ g.Expect(err).ToNot(gomega.HaveOccurred(), "Expected no error during parsing")
+
+ // Print the map for visualization (optional)
+ for k, v := range mapOwnership {
+ fmt.Printf("k=%d, v=%+v\n", k, v)
+ }
+
+ // Assert that the map size is correct
+ g.Expect(len(mapOwnership)).To(gomega.Equal(7), "Expected map size to be 7")
+}
+
+func encodeOwnership(sText string) string {
+ return fmt.Sprintf("{\"ownership\":\"%s\"}", sText)
+}
diff --git a/scripts/test-create-cluster.sh b/scripts/test-create-cluster.sh
index c90b971e..6bff3bd2 100755
--- a/scripts/test-create-cluster.sh
+++ b/scripts/test-create-cluster.sh
@@ -103,7 +103,7 @@ runCommand set debug on
# Create a cluster
message "Create Cluster"
-runCommand create cluster local -y -v $VERSION $COM -S com.tangosol.net.Coherence
+runCommand create cluster local -y -v $VERSION $COM -S com.tangosol.net.Coherence --machine machine1
wait_for_ready
@@ -118,7 +118,7 @@ grep "[3,3,3]" $OUTPUT
# Scale the cluster to 6 members
message "Scale Cluster to 6 members and delay each by 5 seconds"
-runCommand scale cluster local -r 6 -D 5s
+runCommand scale cluster local -r 6 -D 5s --machine machine2
pause && pause && pause
# Check the members of PartitionedCache
@@ -131,7 +131,7 @@ grep "[6,6,6]" $OUTPUT
runCommand stop cluster local -y
message "Startup cluster with 5 members"
-runCommand start cluster local -r 5
+runCommand start cluster local -r 5 --site site1
wait_for_ready
runCommand get services -o jsonpath="$.items[?(@.name=='PartitionedCache')].memberCount"
@@ -186,7 +186,7 @@ pause && pause && pause
LOGS_DEST=$(mktemp -d)
message "Test different log location - ${LOGS_DEST}"
-runCommand create cluster local -y -M 512m -I $COM -v $VERSION -L ${LOGS_DEST}
+runCommand create cluster local -y -M 512m -I $COM -v $VERSION -L ${LOGS_DEST} --rack rack1
wait_for_ready
# check to see a storage-0.log file exists