diff --git a/agent/agent.go b/agent/agent.go
index a68017780..197f8cf45 100644
--- a/agent/agent.go
+++ b/agent/agent.go
@@ -472,10 +472,19 @@ func (a *Agent) HbIsRun() bool {
 }
 
 func (a *Agent) HbStatus() {
+    l := a.log.NewEvent("agentCheckup", "", "", primitive.Timestamp{})
+
+    nodeVersion, err := a.node.GetMongoVersion()
+    if err != nil {
+        l.Error("get mongo version: %v", err)
+    }
+
     hb := pbm.AgentStat{
-        Node: a.node.Name(),
-        RS:   a.node.RS(),
-        Ver:  version.Current().Version,
+        Node:       a.node.Name(),
+        RS:         a.node.RS(),
+        AgentVer:   version.Current().Version,
+        MongoVer:   nodeVersion.VersionString,
+        PerconaVer: nodeVersion.PSMDBVersion,
     }
     defer func() {
         if err := a.pbm.RemoveAgentStatus(hb); err != nil {
@@ -487,8 +496,6 @@ func (a *Agent) HbStatus() {
     tk := time.NewTicker(pbm.AgentsStatCheckRange)
     defer tk.Stop()
 
-    l := a.log.NewEvent("agentCheckup", "", "", primitive.Timestamp{})
-
     // check storage once in a while if all is ok (see https://jira.percona.com/browse/PBM-647)
     const checkStoreIn = int(60 / (pbm.AgentsStatCheckRange / time.Second))
     cc := 0
@@ -535,6 +542,7 @@ func (a *Agent) HbStatus() {
             hb.Hidden = inf.Hidden
             hb.Passive = inf.Passive
         }
+        hb.Arbiter = inf.ArbiterOnly
 
         err = a.pbm.SetAgentStatus(hb)
         if err != nil {
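
Note: each heartbeat now reports the node's MongoDB server version, the Percona build (if any), and whether the node is an arbiter. For orientation, a hypothetical status document as stored — the keys follow the bson tags added in pbm/agent_status.go below, the values are made up:

    // { ..., "str": "PRIMARY", "hdn": false, "psv": false, "arb": false,
    //   "v": "2.0.4", "mv": "6.0.4", "pv": "6.0.4-3" }
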
diff --git a/agent/backup.go b/agent/backup.go
index 2d6a555bd..b4306ad5c 100644
--- a/agent/backup.go
+++ b/agent/backup.go
@@ -67,16 +67,19 @@ func (a *Agent) Backup(cmd *pbm.BackupCmd, opid pbm.OPID, ep pbm.Epoch) {
         return
     }
 
-    q, err := backup.NodeSuits(a.node, nodeInfo)
+    isClusterLeader := nodeInfo.IsClusterLeader()
+    canRunBackup, err := backup.NodeSuitsExt(a.node, nodeInfo, cmd.Type)
     if err != nil {
         l.Error("node check: %v", err)
-        return
+        if !isClusterLeader {
+            return
+        }
     }
-
-    // node is not suitable for doing backup
-    if !q {
+    if !canRunBackup {
         l.Info("node is not suitable for backup")
-        return
+        if !isClusterLeader {
+            return
+        }
     }
 
     // wakeup the slicer not to wait for the tick
@@ -109,7 +112,7 @@ func (a *Agent) Backup(cmd *pbm.BackupCmd, opid pbm.OPID, ep pbm.Epoch) {
     }
     bcp.SetTimeouts(cfg.Backup.Timeouts)
 
-    if nodeInfo.IsClusterLeader() {
+    if isClusterLeader {
         balancer := pbm.BalancerModeOff
         if nodeInfo.IsSharded() {
             bs, err := a.pbm.GetBalancerStatus()
@@ -128,6 +131,12 @@ func (a *Agent) Backup(cmd *pbm.BackupCmd, opid pbm.OPID, ep pbm.Epoch) {
         }
         l.Debug("init backup meta")
 
+        if err = pbm.CheckTopoForBackup(a.pbm, cmd.Type); err != nil {
+            ferr := a.pbm.ChangeBackupState(cmd.Name, pbm.StatusError, err.Error())
+            l.Info("mark backup as %s `%v`: %v", pbm.StatusError, err, ferr)
+            return
+        }
+
         // Incremental backup history is stored by WiredTiger on the node
         // not replset. So an `incremental && not_base` backup should land on
         // the agent that made a previous (src) backup.
@@ -145,7 +154,23 @@ func (a *Agent) Backup(cmd *pbm.BackupCmd, opid pbm.OPID, ep pbm.Epoch) {
             }
         }
     }
-        nodes, err := a.pbm.BcpNodesPriority(c)
+
+        agents, err := a.pbm.ListAgentStatuses()
+        if err != nil {
+            l.Error("get agents list: %v", err)
+            return
+        }
+
+        validCandidates := make([]pbm.AgentStat, 0, len(agents))
+        for _, s := range agents {
+            if pbm.FeatureSupport(s.MongoVersion()).BackupType(cmd.Type) != nil {
+                continue
+            }
+
+            validCandidates = append(validCandidates, s)
+        }
+
+        nodes, err := a.pbm.BcpNodesPriority(c, validCandidates)
     if err != nil {
         l.Error("get nodes priority: %v", err)
         return
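
Note: on a failed node check the agent no longer returns unconditionally — only non-leaders bail out. The cluster leader proceeds so that pbm.CheckTopoForBackup can fail the backup explicitly for the whole cluster instead of letting it stall with no agent ever marking it errored. Candidate filtering against each agent's reported mongod version then happens once, on the leader, before node priorities are computed.
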
diff --git a/cli/backup.go b/cli/backup.go
index ec1a7b88c..575a6df81 100644
--- a/cli/backup.go
+++ b/cli/backup.go
@@ -80,12 +80,8 @@ func runBackup(cn *pbm.PBM, b *backupOpts, outf outFormat) (fmt.Stringer, error) {
         return nil, errors.New("--ns flag is only allowed for logical backup")
     }
 
-    ver, err := pbm.GetMongoVersion(cn.Context(), cn.Conn)
-    if err != nil {
-        return nil, errors.WithMessage(err, "get mongo version")
-    }
-    if err = pbm.FeatureSupport(ver).BackupType(pbm.BackupType(b.typ)); err != nil {
-        return nil, errors.WithMessage(err, "unsupported backup type")
+    if err := pbm.CheckTopoForBackup(cn, pbm.BackupType(b.typ)); err != nil {
+        return nil, errors.WithMessage(err, "backup pre-check")
     }
 
     if err := checkConcurrentOp(cn); err != nil {
diff --git a/cli/status.go b/cli/status.go
index 73507fb4b..5a365fb90 100644
--- a/cli/status.go
+++ b/cli/status.go
@@ -276,7 +276,7 @@ func clusterStatus(cn *pbm.PBM, uri string) (fmt.Stringer, error) {
                 nd.Errs = append(nd.Errs, fmt.Sprintf("ERROR: lost agent, last heartbeat: %v", stat.Heartbeat.T))
                 continue
             }
 
-            nd.Ver = "v" + stat.Ver
+            nd.Ver = "v" + stat.AgentVer
             nd.OK, nd.Errs = stat.OK()
         }
diff --git a/pbm/agent_status.go b/pbm/agent_status.go
index 76e9003f6..474376712 100644
--- a/pbm/agent_status.go
+++ b/pbm/agent_status.go
@@ -3,12 +3,15 @@ package pbm
 import (
     "context"
     "fmt"
+    "strconv"
+    "strings"
 
     "github.com/pkg/errors"
     "go.mongodb.org/mongo-driver/bson"
     "go.mongodb.org/mongo-driver/bson/primitive"
     "go.mongodb.org/mongo-driver/mongo"
     "go.mongodb.org/mongo-driver/mongo/options"
+    "golang.org/x/mod/semver"
 )
 
 type AgentStat struct {
@@ -18,7 +21,10 @@ type AgentStat struct {
     StateStr      string       `bson:"str"`
     Hidden        bool         `bson:"hdn"`
     Passive       bool         `bson:"psv"`
-    Ver           string       `bson:"v"`
+    Arbiter       bool         `bson:"arb"`
+    AgentVer      string       `bson:"v"`
+    MongoVer      string       `bson:"mv"`
+    PerconaVer    string       `bson:"pv,omitempty"`
     PBMStatus     SubsysStatus `bson:"pbms"`
     NodeStatus    SubsysStatus `bson:"nodes"`
     StorageStatus SubsysStatus `bson:"stors"`
@@ -50,6 +56,22 @@ func (s *AgentStat) OK() (bool, []string) {
     return ok, errs
 }
 
+func (s *AgentStat) MongoVersion() MongoVersion {
+    v := MongoVersion{
+        PSMDBVersion:  s.PerconaVer,
+        VersionString: s.MongoVer,
+    }
+
+    vs := semver.Canonical("v" + s.MongoVer)[1:]
+    vs = strings.SplitN(vs, "-", 2)[0]
+    for _, a := range strings.Split(vs, ".")[:3] {
+        n, _ := strconv.Atoi(a)
+        v.Version = append(v.Version, n)
+    }
+
+    return v
+}
+
 func (p *PBM) SetAgentStatus(stat AgentStat) error {
     ct, err := p.ClusterTime()
     if err != nil {
@@ -117,6 +139,10 @@ func (p *PBM) ListAgentStatuses() ([]AgentStat, error) {
         return nil, errors.WithMessage(err, "remove stale statuses")
     }
 
+    return p.ListAgents()
+}
+
+func (p *PBM) ListAgents() ([]AgentStat, error) {
     cur, err := p.Conn.Database(DB).Collection(AgentsStatusCollection).Find(p.ctx, bson.M{})
     if err != nil {
         return nil, errors.WithMessage(err, "query")
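
Note: MongoVersion() above normalizes the reported version string via golang.org/x/mod/semver before splitting it into numeric components. A few illustrative inputs (my examples, assuming the parsing above):

    // "6.0.4"     -> Version [6 0 4]
    // "4.4"       -> Version [4 4 0]   (semver.Canonical fills in the missing patch part)
    // "5.0.14-12" -> Version [5 0 14]  (the "-..." suffix is cut before splitting)
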
diff --git a/pbm/backup/backup.go b/pbm/backup/backup.go
index 8fcf12eb8..b90b87f56 100644
--- a/pbm/backup/backup.go
+++ b/pbm/backup/backup.go
@@ -357,6 +357,20 @@ func NodeSuits(node *pbm.Node, inf *pbm.NodeInfo) (bool, error) {
         nil
 }
 
+func NodeSuitsExt(node *pbm.Node, inf *pbm.NodeInfo, t pbm.BackupType) (bool, error) {
+    if ok, err := NodeSuits(node, inf); err != nil || !ok {
+        return false, err
+    }
+
+    ver, err := node.GetMongoVersion()
+    if err != nil {
+        return false, errors.Wrap(err, "get mongo version")
+    }
+
+    err = pbm.FeatureSupport(*ver).BackupType(t)
+    return err == nil, err
+}
+
 // rwError multierror for the read/compress/write-to-store operations set
 type rwError struct {
     read error
diff --git a/pbm/bcp_nodes_priority.go b/pbm/bcp_nodes_priority.go
index d3820edd4..e4de5d970 100644
--- a/pbm/bcp_nodes_priority.go
+++ b/pbm/bcp_nodes_priority.go
@@ -41,15 +41,11 @@ type agentScore func(AgentStat) float64
 // in descended order. First are nodes with the highest priority.
 // Custom coefficients might be passed. These will be ignored though
 // if the config is set.
-func (p *PBM) BcpNodesPriority(c map[string]float64) (*NodesPriority, error) {
+func (p *PBM) BcpNodesPriority(c map[string]float64, agents []AgentStat) (*NodesPriority, error) {
     cfg, err := p.GetConfig()
     if err != nil {
         return nil, errors.Wrap(err, "get config")
     }
-    agents, err := p.ListAgentStatuses()
-    if err != nil {
-        return nil, errors.Wrap(err, "get agents list")
-    }
 
     // if cfg.Backup.Priority doesn't set apply defaults
     f := func(a AgentStat) float64 {
diff --git a/pbm/bsontypes.go b/pbm/bsontypes.go
index aaf13c4bf..01f1cfbfb 100644
--- a/pbm/bsontypes.go
+++ b/pbm/bsontypes.go
@@ -50,6 +50,11 @@ type NodeInfo struct {
     opts MongodOpts
 }
 
+// IsMongos returns true if the node is a mongos router
+func (i *NodeInfo) IsMongos() bool {
+    return i.Msg == "isdbgrid"
+}
+
 // IsSharded returns true is replset is part sharded cluster
 func (i *NodeInfo) IsSharded() bool {
     return i.SetName != "" && (i.ConfigServerState != nil || i.opts.Sharding.ClusterRole != "" || i.ConfigSvr == 2)
diff --git a/pbm/pbm.go b/pbm/pbm.go
index 935411493..edf51a67e 100644
--- a/pbm/pbm.go
+++ b/pbm/pbm.go
@@ -925,16 +925,47 @@ func (p *PBM) ClusterMembers() ([]Shard, error) {
         return nil, errors.Wrap(err, "define cluster state")
     }
 
+    if inf.IsMongos() || inf.IsSharded() {
+        return getClusterMembersImpl(p.ctx, p.Conn)
+    }
+
     shards := []Shard{{
         RS:   inf.SetName,
         Host: inf.SetName + "/" + strings.Join(inf.Hosts, ","),
     }}
-    if inf.IsSharded() {
-        s, err := p.GetShards()
-        if err != nil {
-            return nil, errors.Wrap(err, "get shards")
+    return shards, nil
+}
+
+func getClusterMembersImpl(ctx context.Context, m *mongo.Client) ([]Shard, error) {
+    res := m.Database("admin").RunCommand(ctx, bson.D{{"getShardMap", 1}})
+    if err := res.Err(); err != nil {
+        return nil, errors.WithMessage(err, "query")
+    }
+
+    // the "map" field maps shard names to replset URIs;
+    // if a shard name is not set, mongodb provides a unique name for it
+    // (e.g. the replset name of the shard);
+    // for the configsvr, the key is "config"
+    var shardMap struct{ Map map[string]string }
+    if err := res.Decode(&shardMap); err != nil {
+        return nil, errors.WithMessage(err, "decode")
+    }
+
+    shards := make([]Shard, 0, len(shardMap.Map))
+    for id, host := range shardMap.Map {
+        if id == "" || strings.ContainsAny(id, "/:") {
+            // till 4.2 the map field also holds connString-like entries (the
+            // connStrings field was added in 4.4), keyed by the URI of the
+            // directly (w/o mongos) connected replset; skip non-shard-name keys
             continue
         }
-        shards = append(shards, s...)
+
+        rs, _, _ := strings.Cut(host, "/")
+        shards = append(shards, Shard{
+            ID:   id,
+            RS:   rs,
+            Host: host,
+        })
     }
 
     return shards, nil
@@ -979,6 +1010,9 @@ func (p *PBM) GetNodeInfo() (*NodeInfo, error) {
     if err != nil {
         return nil, errors.Wrap(err, "get NodeInfo")
     }
+    if inf.IsMongos() {
+        return inf, nil
+    }
 
     opts := struct {
         Parsed MongodOpts `bson:"parsed" json:"parsed"`
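
Note: for context on getClusterMembersImpl, a getShardMap reply has roughly the shape below (abridged and illustrative; exact fields vary by server version). The URI-keyed entry is what the ContainsAny(id, "/:") check skips on pre-4.4 servers:

    // db.adminCommand({getShardMap: 1})
    // {
    //   "map": {
    //     "config": "cfg/cfg01:27019,cfg02:27019",
    //     "rs0":    "rs0/node01:27017,node02:27017",
    //     "node01:27017": "rs0/node01:27017,node02:27017"  // pre-4.4 only
    //   }, ...
    // }
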
diff --git a/pbm/topo.go b/pbm/topo.go
new file mode 100644
index 000000000..e252c6cc2
--- /dev/null
+++ b/pbm/topo.go
@@ -0,0 +1,124 @@
+package pbm
+
+import (
+    "fmt"
+    "strings"
+
+    "github.com/pkg/errors"
+    "go.mongodb.org/mongo-driver/bson/primitive"
+)
+
+func CheckTopoForBackup(cn *PBM, type_ BackupType) error {
+    members, err := cn.ClusterMembers()
+    if err != nil {
+        return errors.WithMessage(err, "get cluster members")
+    }
+
+    ts, err := cn.ClusterTime()
+    if err != nil {
+        return errors.Wrap(err, "get cluster time")
+    }
+
+    agentList, err := cn.ListAgents()
+    if err != nil {
+        return errors.WithMessage(err, "list agents")
+    }
+
+    agents := make(map[string]map[string]AgentStat)
+    for _, a := range agentList {
+        if agents[a.RS] == nil {
+            agents[a.RS] = make(map[string]AgentStat)
+        }
+        agents[a.RS][a.Node] = a
+    }
+
+    return collectTopoCheckErrors(members, agents, ts, type_)
+}
+
+type (
+    ReplsetName = string
+    NodeURI     = string
+)
+
+type topoCheckError struct {
+    Replsets map[ReplsetName]map[NodeURI][]error
+    Missed   []string
+}
+
+func (r topoCheckError) hasError() bool {
+    return len(r.Missed) != 0
+}
+
+func (r topoCheckError) Error() string {
+    if !r.hasError() {
+        return ""
+    }
+
+    return fmt.Sprintf("no available agent(s) on replsets: %s", strings.Join(r.Missed, ", "))
+}
+
+func collectTopoCheckErrors(
+    replsets []Shard,
+    agentsByRS map[ReplsetName]map[NodeURI]AgentStat,
+    ts primitive.Timestamp,
+    type_ BackupType,
+) error {
+    rv := topoCheckError{
+        Replsets: make(map[string]map[NodeURI][]error),
+        Missed:   make([]string, 0),
+    }
+
+    for _, rs := range replsets {
+        rsName, uri, _ := strings.Cut(rs.Host, "/")
+        agents := agentsByRS[rsName]
+        if len(agents) == 0 {
+            rv.Missed = append(rv.Missed, rsName)
+            continue
+        }
+
+        hosts := strings.Split(uri, ",")
+        members := make(map[NodeURI][]error, len(hosts))
+        anyAvail := false
+        for _, host := range hosts {
+            a, ok := agents[host]
+            if !ok || a.Arbiter || a.Passive {
+                continue
+            }
+
+            errs := []error{}
+            if a.Err != "" {
+                errs = append(errs, errors.New(a.Err))
+            }
+            if ok, estrs := a.OK(); !ok {
+                for _, e := range estrs {
+                    errs = append(errs, errors.New(e))
+                }
+            }
+
+            const maxReplicationLag uint32 = 35
+            if ts.T-a.Heartbeat.T > maxReplicationLag {
+                errs = append(errs, errors.New("stale"))
+            }
+            if err := FeatureSupport(a.MongoVersion()).BackupType(type_); err != nil {
+                errs = append(errs, errors.WithMessage(err, "unsupported backup type"))
+            }
+
+            members[host] = errs
+            if len(errs) == 0 {
+                anyAvail = true
+            }
+        }
+
+        rv.Replsets[rsName] = members
+
+        if !anyAvail {
+            rv.Missed = append(rv.Missed, rsName)
+        }
+    }
+
+    if rv.hasError() {
+        return rv
+    }
+
+    return nil
+}
diff --git a/pbm/version.go b/pbm/version.go
index fac1d5fa8..4e16122ea 100644
--- a/pbm/version.go
+++ b/pbm/version.go
@@ -101,6 +101,11 @@ func (f FeatureSupport) BackupType(t BackupType) error {
             return errors.New("incremental physical backup works since " +
                 "Percona Server for MongoDB 4.2.24, 4.4.18, 5.0.14, 6.0.3")
         }
+    case ExternalBackup:
+        if !f.FullPhysicalBackup() {
+            return errors.New("external backup works since " +
+                "Percona Server for MongoDB 4.2.15, 4.4.6")
+        }
     }
 
     return nil
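
Note: the same topology pre-check now guards both entry points — the CLI refuses to schedule a backup the cluster cannot serve, and the cluster leader re-checks before initializing the backup meta. A minimal usage sketch (cn is a connected *pbm.PBM; the error text is illustrative):

    if err := pbm.CheckTopoForBackup(cn, pbm.ExternalBackup); err != nil {
        // err.Error() == "no available agent(s) on replsets: rs1" (illustrative)
        return errors.WithMessage(err, "backup pre-check")
    }
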