From 9a0d75da53f703b0c50d542702c9f26a5dc3b8e2 Mon Sep 17 00:00:00 2001 From: Dmytro Zghoba Date: Thu, 7 Sep 2023 11:09:41 +0300 Subject: [PATCH] [PBM-1171] support mixed env in pre-check --- agent/agent.go | 18 ++++-- agent/backup.go | 25 +++++--- cli/backup.go | 123 ++++++++++++++++++++++++++++++++++++-- cli/status.go | 2 +- pbm/agent_status.go | 28 ++++++++- pbm/backup/backup.go | 14 +++++ pbm/bcp_nodes_priority.go | 10 +++- pbm/bsontypes.go | 5 ++ pbm/pbm.go | 16 +++-- 9 files changed, 214 insertions(+), 27 deletions(-) diff --git a/agent/agent.go b/agent/agent.go index a68017780..197f8cf45 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -472,10 +472,19 @@ func (a *Agent) HbIsRun() bool { } func (a *Agent) HbStatus() { + l := a.log.NewEvent("agentCheckup", "", "", primitive.Timestamp{}) + + nodeVersion, err := a.node.GetMongoVersion() + if err != nil { + l.Error("get mongo version: %v", err) + } + hb := pbm.AgentStat{ - Node: a.node.Name(), - RS: a.node.RS(), - Ver: version.Current().Version, + Node: a.node.Name(), + RS: a.node.RS(), + AgentVer: version.Current().Version, + MongoVer: nodeVersion.VersionString, + PerconaVer: nodeVersion.PSMDBVersion, } defer func() { if err := a.pbm.RemoveAgentStatus(hb); err != nil { @@ -487,8 +496,6 @@ func (a *Agent) HbStatus() { tk := time.NewTicker(pbm.AgentsStatCheckRange) defer tk.Stop() - l := a.log.NewEvent("agentCheckup", "", "", primitive.Timestamp{}) - // check storage once in a while if all is ok (see https://jira.percona.com/browse/PBM-647) const checkStoreIn = int(60 / (pbm.AgentsStatCheckRange / time.Second)) cc := 0 @@ -535,6 +542,7 @@ func (a *Agent) HbStatus() { hb.Hidden = inf.Hidden hb.Passive = inf.Passive } + hb.Arbiter = inf.ArbiterOnly err = a.pbm.SetAgentStatus(hb) if err != nil { diff --git a/agent/backup.go b/agent/backup.go index 2d6a555bd..ba072fb85 100644 --- a/agent/backup.go +++ b/agent/backup.go @@ -67,16 +67,19 @@ func (a *Agent) Backup(cmd *pbm.BackupCmd, opid pbm.OPID, ep pbm.Epoch) { 
return } - q, err := backup.NodeSuits(a.node, nodeInfo) + isClusterLeader := nodeInfo.IsClusterLeader() + canRunBackup, err := backup.NodeSuitsExt(a.node, nodeInfo, cmd.Type) if err != nil { l.Error("node check: %v", err) - return + if !isClusterLeader { + return + } } - - // node is not suitable for doing backup - if !q { + if !canRunBackup { l.Info("node is not suitable for backup") - return + if !isClusterLeader { + return + } } // wakeup the slicer not to wait for the tick @@ -109,7 +112,7 @@ func (a *Agent) Backup(cmd *pbm.BackupCmd, opid pbm.OPID, ep pbm.Epoch) { } bcp.SetTimeouts(cfg.Backup.Timeouts) - if nodeInfo.IsClusterLeader() { + if isClusterLeader { balancer := pbm.BalancerModeOff if nodeInfo.IsSharded() { bs, err := a.pbm.GetBalancerStatus() @@ -145,7 +148,13 @@ func (a *Agent) Backup(cmd *pbm.BackupCmd, opid pbm.OPID, ep pbm.Epoch) { } } } - nodes, err := a.pbm.BcpNodesPriority(c) + + exclude := "" + if !canRunBackup { + exclude = a.node.Name() + } + + nodes, err := a.pbm.BcpNodesPriority(c, exclude) if err != nil { l.Error("get nodes priority: %v", err) return diff --git a/cli/backup.go b/cli/backup.go index ec1a7b88c..38d170618 100644 --- a/cli/backup.go +++ b/cli/backup.go @@ -68,6 +68,121 @@ type descBcp struct { name string } +func checkTopoForBackup(cn *pbm.PBM, ts pbm.BackupType) error { + members, err := cn.ClusterMembers() + if err != nil { + return errors.WithMessage(err, "get cluster members") + } + + time, err := cn.ClusterTime() + if err != nil { + return errors.Wrap(err, "get cluster time") + } + + agentList, err := cn.ListAgents() + if err != nil { + return errors.WithMessage(err, "list agents") + } + + agents := make(map[string]map[string]pbm.AgentStat) + for _, a := range agentList { + if agents[a.RS] == nil { + agents[a.RS] = make(map[string]pbm.AgentStat) + } + agents[a.RS][a.Node] = a + } + + return collectTopoCheckErrors(members, agents, time, ts) +} + +type ( + ReplsetName = string + NodeURI = string +) + +type topoCheckError 
struct { + Replsets map[ReplsetName]map[NodeURI][]error + Missed []string +} + +func (r topoCheckError) hasError() bool { + return len(r.Missed) != 0 +} + +func (r topoCheckError) Error() string { + if !r.hasError() { + return "" + } + + return fmt.Sprintf("no available agent(s) on replsets: %s", strings.Join(r.Missed, ", ")) +} + +func collectTopoCheckErrors( + replsets []pbm.Shard, + agentsByRS map[ReplsetName]map[NodeURI]pbm.AgentStat, + ts primitive.Timestamp, + type_ pbm.BackupType, +) error { + rv := topoCheckError{ + Replsets: make(map[string]map[NodeURI][]error), + Missed: make([]string, 0), + } + + for _, rs := range replsets { + rsName, uri, _ := strings.Cut(rs.Host, "/") + agents := agentsByRS[rsName] + if len(agents) == 0 { + rv.Missed = append(rv.Missed, rsName) + continue + } + + hosts := strings.Split(uri, ",") + members := make(map[NodeURI][]error, len(hosts)) + anyAvail := false + for _, host := range hosts { + a, ok := agents[host] + if !ok || a.Arbiter || a.Passive { + continue + } + + errs := []error{} + if a.Err != "" { + errs = append(errs, errors.New(a.Err)) + } + if ok, estrs := a.OK(); !ok { + for _, e := range estrs { + errs = append(errs, errors.New(e)) + } + } + + const maxReplicationLag uint32 = 35 + if ts.T-a.Heartbeat.T > maxReplicationLag { + errs = append(errs, errors.New("stale")) + } + if err := pbm.FeatureSupport(a.MongoVersion()).BackupType(type_); err != nil { + errs = append(errs, errors.WithMessage(err, "unsupported backup type")) + } + + members[host] = errs + if len(errs) == 0 { + anyAvail = true + } + } + + rv.Replsets[rsName] = members + + if !anyAvail { + rv.Missed = append(rv.Missed, rsName) + } + } + + if rv.hasError() { + return rv + } + + return nil +} + func runBackup(cn *pbm.PBM, b *backupOpts, outf outFormat) (fmt.Stringer, error) { nss, err := parseCLINSOption(b.ns) if err != nil { @@ -80,12 +195,8 @@ func runBackup(cn *pbm.PBM, b *backupOpts, outf outFormat) (fmt.Stringer, error) return nil, errors.New("--ns 
flag is only allowed for logical backup") } - ver, err := pbm.GetMongoVersion(cn.Context(), cn.Conn) - if err != nil { - return nil, errors.WithMessage(err, "get mongo version") - } - if err = pbm.FeatureSupport(ver).BackupType(pbm.BackupType(b.typ)); err != nil { - return nil, errors.WithMessage(err, "unsupported backup type") + if err := checkTopoForBackup(cn, pbm.BackupType(b.typ)); err != nil { + return nil, errors.WithMessage(err, "backup pre-check") } if err := checkConcurrentOp(cn); err != nil { diff --git a/cli/status.go b/cli/status.go index 73507fb4b..5a365fb90 100644 --- a/cli/status.go +++ b/cli/status.go @@ -276,7 +276,7 @@ func clusterStatus(cn *pbm.PBM, uri string) (fmt.Stringer, error) { nd.Errs = append(nd.Errs, fmt.Sprintf("ERROR: lost agent, last heartbeat: %v", stat.Heartbeat.T)) continue } - nd.Ver = "v" + stat.Ver + nd.Ver = "v" + stat.AgentVer nd.OK, nd.Errs = stat.OK() } diff --git a/pbm/agent_status.go b/pbm/agent_status.go index 76e9003f6..474376712 100644 --- a/pbm/agent_status.go +++ b/pbm/agent_status.go @@ -3,12 +3,15 @@ package pbm import ( "context" "fmt" + "strconv" + "strings" "github.com/pkg/errors" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/bson/primitive" "go.mongodb.org/mongo-driver/mongo" "go.mongodb.org/mongo-driver/mongo/options" + "golang.org/x/mod/semver" ) type AgentStat struct { @@ -18,7 +21,10 @@ type AgentStat struct { StateStr string `bson:"str"` Hidden bool `bson:"hdn"` Passive bool `bson:"psv"` - Ver string `bson:"v"` + Arbiter bool `bson:"arb"` + AgentVer string `bson:"v"` + MongoVer string `bson:"mv"` + PerconaVer string `bson:"pv,omitempty"` PBMStatus SubsysStatus `bson:"pbms"` NodeStatus SubsysStatus `bson:"nodes"` StorageStatus SubsysStatus `bson:"stors"` @@ -50,6 +56,22 @@ func (s *AgentStat) OK() (bool, []string) { return ok, errs } +func (s *AgentStat) MongoVersion() MongoVersion { + v := MongoVersion{ + PSMDBVersion: s.PerconaVer, + VersionString: s.MongoVer, + } + + vs := 
semver.Canonical("v" + s.MongoVer)[1:] + vs = strings.SplitN(vs, "-", 2)[0] + for _, a := range strings.Split(vs, ".")[:3] { + n, _ := strconv.Atoi(a) + v.Version = append(v.Version, n) + } + + return v +} + func (p *PBM) SetAgentStatus(stat AgentStat) error { ct, err := p.ClusterTime() if err != nil { @@ -117,6 +139,10 @@ func (p *PBM) ListAgentStatuses() ([]AgentStat, error) { return nil, errors.WithMessage(err, "remove stale statuses") } + return p.ListAgents() +} + +func (p *PBM) ListAgents() ([]AgentStat, error) { cur, err := p.Conn.Database(DB).Collection(AgentsStatusCollection).Find(p.ctx, bson.M{}) if err != nil { return nil, errors.WithMessage(err, "query") diff --git a/pbm/backup/backup.go b/pbm/backup/backup.go index 8fcf12eb8..b90b87f56 100644 --- a/pbm/backup/backup.go +++ b/pbm/backup/backup.go @@ -357,6 +357,20 @@ func NodeSuits(node *pbm.Node, inf *pbm.NodeInfo) (bool, error) { nil } +func NodeSuitsExt(node *pbm.Node, inf *pbm.NodeInfo, t pbm.BackupType) (bool, error) { + if ok, err := NodeSuits(node, inf); err != nil || !ok { + return false, err + } + + ver, err := node.GetMongoVersion() + if err != nil { + return false, errors.Wrap(err, "get mongo version") + } + + err = pbm.FeatureSupport(*ver).BackupType(t) + return err == nil, err +} + // rwError multierror for the read/compress/write-to-store operations set type rwError struct { read error diff --git a/pbm/bcp_nodes_priority.go b/pbm/bcp_nodes_priority.go index d3820edd4..5c34bfc96 100644 --- a/pbm/bcp_nodes_priority.go +++ b/pbm/bcp_nodes_priority.go @@ -41,7 +41,7 @@ type agentScore func(AgentStat) float64 // in descended order. First are nodes with the highest priority. // Custom coefficients might be passed. These will be ignored though // if the config is set. 
-func (p *PBM) BcpNodesPriority(c map[string]float64) (*NodesPriority, error) { +func (p *PBM) BcpNodesPriority(c map[string]float64, exclude string) (*NodesPriority, error) { cfg, err := p.GetConfig() if err != nil { return nil, errors.Wrap(err, "get config") @@ -50,6 +50,14 @@ func (p *PBM) BcpNodesPriority(c map[string]float64) (*NodesPriority, error) { if err != nil { return nil, errors.Wrap(err, "get agents list") } + if exclude != "" { + for i := range agents { + if agents[i].Node == exclude { + agents = append(agents[:i], agents[i+1:]...) + break + } + } + } // if cfg.Backup.Priority doesn't set apply defaults f := func(a AgentStat) float64 { diff --git a/pbm/bsontypes.go b/pbm/bsontypes.go index aaf13c4bf..01f1cfbfb 100644 --- a/pbm/bsontypes.go +++ b/pbm/bsontypes.go @@ -50,6 +50,11 @@ type NodeInfo struct { opts MongodOpts } +// IsMongos returns true if the node is a mongos (its reply msg is "isdbgrid") +func (i *NodeInfo) IsMongos() bool { + return i.Msg == "isdbgrid" +} + // IsSharded returns true is replset is part sharded cluster func (i *NodeInfo) IsSharded() bool { return i.SetName != "" && (i.ConfigServerState != nil || i.opts.Sharding.ClusterRole != "" || i.ConfigSvr == 2) diff --git a/pbm/pbm.go b/pbm/pbm.go index 935411493..325c4450f 100644 --- a/pbm/pbm.go +++ b/pbm/pbm.go @@ -925,11 +925,14 @@ func (p *PBM) ClusterMembers() ([]Shard, error) { return nil, errors.Wrap(err, "define cluster state") } - shards := []Shard{{ - RS: inf.SetName, - Host: inf.SetName + "/" + strings.Join(inf.Hosts, ","), - }} - if inf.IsSharded() { + shards := []Shard{} + if !inf.IsMongos() { + shards = append(shards, Shard{ + RS: inf.SetName, + Host: inf.SetName + "/" + strings.Join(inf.Hosts, ","), + }) + } + if inf.IsMongos() || inf.IsSharded() { s, err := p.GetShards() if err != nil { return nil, errors.Wrap(err, "get shards") @@ -979,6 +982,9 @@ func (p *PBM) GetNodeInfo() (*NodeInfo, error) { if err != nil { return nil, errors.Wrap(err, "get NodeInfo") } + if inf.IsMongos() { + 
return inf, nil + } opts := struct { Parsed MongodOpts `bson:"parsed" json:"parsed"`