Skip to content

Commit

Permalink
[PBM-1171] support mixed env in pre-check
Browse files Browse the repository at this point in the history
  • Loading branch information
defbin committed Sep 13, 2023
1 parent d21f1de commit 3033ea5
Show file tree
Hide file tree
Showing 11 changed files with 254 additions and 32 deletions.
18 changes: 13 additions & 5 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -472,10 +472,19 @@ func (a *Agent) HbIsRun() bool {
}

func (a *Agent) HbStatus() {
l := a.log.NewEvent("agentCheckup", "", "", primitive.Timestamp{})

nodeVersion, err := a.node.GetMongoVersion()
if err != nil {
l.Error("get mongo version: %v", err)
}

hb := pbm.AgentStat{
Node: a.node.Name(),
RS: a.node.RS(),
Ver: version.Current().Version,
Node: a.node.Name(),
RS: a.node.RS(),
AgentVer: version.Current().Version,
MongoVer: nodeVersion.VersionString,
PerconaVer: nodeVersion.PSMDBVersion,
}
defer func() {
if err := a.pbm.RemoveAgentStatus(hb); err != nil {
Expand All @@ -487,8 +496,6 @@ func (a *Agent) HbStatus() {
tk := time.NewTicker(pbm.AgentsStatCheckRange)
defer tk.Stop()

l := a.log.NewEvent("agentCheckup", "", "", primitive.Timestamp{})

// check storage once in a while if all is ok (see https://jira.percona.com/browse/PBM-647)
const checkStoreIn = int(60 / (pbm.AgentsStatCheckRange / time.Second))
cc := 0
Expand Down Expand Up @@ -535,6 +542,7 @@ func (a *Agent) HbStatus() {
hb.Hidden = inf.Hidden
hb.Passive = inf.Passive
}
hb.Arbiter = inf.ArbiterOnly

err = a.pbm.SetAgentStatus(hb)
if err != nil {
Expand Down
41 changes: 33 additions & 8 deletions agent/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,16 +67,19 @@ func (a *Agent) Backup(cmd *pbm.BackupCmd, opid pbm.OPID, ep pbm.Epoch) {
return
}

q, err := backup.NodeSuits(a.node, nodeInfo)
isClusterLeader := nodeInfo.IsClusterLeader()
canRunBackup, err := backup.NodeSuitsExt(a.node, nodeInfo, cmd.Type)
if err != nil {
l.Error("node check: %v", err)
return
if !isClusterLeader {
return
}
}

// node is not suitable for doing backup
if !q {
if !canRunBackup {
l.Info("node is not suitable for backup")
return
if !isClusterLeader {
return
}
}

// wakeup the slicer not to wait for the tick
Expand Down Expand Up @@ -109,7 +112,7 @@ func (a *Agent) Backup(cmd *pbm.BackupCmd, opid pbm.OPID, ep pbm.Epoch) {
}
bcp.SetTimeouts(cfg.Backup.Timeouts)

if nodeInfo.IsClusterLeader() {
if isClusterLeader {
balancer := pbm.BalancerModeOff
if nodeInfo.IsSharded() {
bs, err := a.pbm.GetBalancerStatus()
Expand All @@ -128,6 +131,12 @@ func (a *Agent) Backup(cmd *pbm.BackupCmd, opid pbm.OPID, ep pbm.Epoch) {
}
l.Debug("init backup meta")

if err = pbm.CheckTopoForBackup(a.pbm, cmd.Type); err != nil {
ferr := a.pbm.ChangeBackupState(cmd.Name, pbm.StatusError, err.Error())
l.Info("mark backup as %s `%v`: %v", pbm.StatusError, err, ferr)
return
}

// Incremental backup history is stored by WiredTiger on the node
// not replset. So an `incremental && not_base` backup should land on
// the agent that made a previous (src) backup.
Expand All @@ -145,7 +154,23 @@ func (a *Agent) Backup(cmd *pbm.BackupCmd, opid pbm.OPID, ep pbm.Epoch) {
}
}
}
nodes, err := a.pbm.BcpNodesPriority(c)

agents, err := a.pbm.ListAgentStatuses()
if err != nil {
l.Error("get agents list: %v", err)
return
}

validCandidates := make([]pbm.AgentStat, 0, len(agents))
for _, s := range agents {
if pbm.FeatureSupport(s.MongoVersion()).BackupType(cmd.Type) != nil {
continue
}

validCandidates = append(validCandidates, s)
}

nodes, err := a.pbm.BcpNodesPriority(c, validCandidates)
if err != nil {
l.Error("get nodes priority: %v", err)
return
Expand Down
8 changes: 2 additions & 6 deletions cli/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,12 +80,8 @@ func runBackup(cn *pbm.PBM, b *backupOpts, outf outFormat) (fmt.Stringer, error)
return nil, errors.New("--ns flag is only allowed for logical backup")
}

ver, err := pbm.GetMongoVersion(cn.Context(), cn.Conn)
if err != nil {
return nil, errors.WithMessage(err, "get mongo version")
}
if err = pbm.FeatureSupport(ver).BackupType(pbm.BackupType(b.typ)); err != nil {
return nil, errors.WithMessage(err, "unsupported backup type")
if err := pbm.CheckTopoForBackup(cn, pbm.BackupType(b.typ)); err != nil {
return nil, errors.WithMessage(err, "backup pre-check")
}

if err := checkConcurrentOp(cn); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion cli/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ func clusterStatus(cn *pbm.PBM, uri string) (fmt.Stringer, error) {
nd.Errs = append(nd.Errs, fmt.Sprintf("ERROR: lost agent, last heartbeat: %v", stat.Heartbeat.T))
continue
}
nd.Ver = "v" + stat.Ver
nd.Ver = "v" + stat.AgentVer
nd.OK, nd.Errs = stat.OK()
}

Expand Down
28 changes: 27 additions & 1 deletion pbm/agent_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,15 @@ package pbm
import (
"context"
"fmt"
"strconv"
"strings"

"github.com/pkg/errors"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"golang.org/x/mod/semver"
)

type AgentStat struct {
Expand All @@ -18,7 +21,10 @@ type AgentStat struct {
StateStr string `bson:"str"`
Hidden bool `bson:"hdn"`
Passive bool `bson:"psv"`
Ver string `bson:"v"`
Arbiter bool `bson:"arb"`
AgentVer string `bson:"v"`
MongoVer string `bson:"mv"`
PerconaVer string `bson:"pv,omitempty"`
PBMStatus SubsysStatus `bson:"pbms"`
NodeStatus SubsysStatus `bson:"nodes"`
StorageStatus SubsysStatus `bson:"stors"`
Expand Down Expand Up @@ -50,6 +56,22 @@ func (s *AgentStat) OK() (bool, []string) {
return ok, errs
}

func (s *AgentStat) MongoVersion() MongoVersion {
v := MongoVersion{
PSMDBVersion: s.PerconaVer,
VersionString: s.MongoVer,
}

vs := semver.Canonical("v" + s.MongoVer)[1:]
vs = strings.SplitN(vs, "-", 2)[0]
for _, a := range strings.Split(vs, ".")[:3] {
n, _ := strconv.Atoi(a)
v.Version = append(v.Version, n)
}

return v
}

func (p *PBM) SetAgentStatus(stat AgentStat) error {
ct, err := p.ClusterTime()
if err != nil {
Expand Down Expand Up @@ -117,6 +139,10 @@ func (p *PBM) ListAgentStatuses() ([]AgentStat, error) {
return nil, errors.WithMessage(err, "remove stale statuses")
}

return p.ListAgents()
}

func (p *PBM) ListAgents() ([]AgentStat, error) {
cur, err := p.Conn.Database(DB).Collection(AgentsStatusCollection).Find(p.ctx, bson.M{})
if err != nil {
return nil, errors.WithMessage(err, "query")
Expand Down
14 changes: 14 additions & 0 deletions pbm/backup/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,20 @@ func NodeSuits(node *pbm.Node, inf *pbm.NodeInfo) (bool, error) {
nil
}

func NodeSuitsExt(node *pbm.Node, inf *pbm.NodeInfo, t pbm.BackupType) (bool, error) {
if ok, err := NodeSuits(node, inf); err != nil || !ok {
return false, err
}

ver, err := node.GetMongoVersion()
if err != nil {
return false, errors.Wrap(err, "get mongo version")
}

err = pbm.FeatureSupport(*ver).BackupType(t)
return err == nil, err
}

// rwError multierror for the read/compress/write-to-store operations set
type rwError struct {
read error
Expand Down
6 changes: 1 addition & 5 deletions pbm/bcp_nodes_priority.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,11 @@ type agentScore func(AgentStat) float64
// in descended order. First are nodes with the highest priority.
// Custom coefficients might be passed. These will be ignored though
// if the config is set.
func (p *PBM) BcpNodesPriority(c map[string]float64) (*NodesPriority, error) {
func (p *PBM) BcpNodesPriority(c map[string]float64, agents []AgentStat) (*NodesPriority, error) {
cfg, err := p.GetConfig()
if err != nil {
return nil, errors.Wrap(err, "get config")
}
agents, err := p.ListAgentStatuses()
if err != nil {
return nil, errors.Wrap(err, "get agents list")
}

// if cfg.Backup.Priority doesn't set apply defaults
f := func(a AgentStat) float64 {
Expand Down
5 changes: 5 additions & 0 deletions pbm/bsontypes.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,11 @@ type NodeInfo struct {
opts MongodOpts
}

// IsSharded returns true is replset is part sharded cluster
func (i *NodeInfo) IsMongos() bool {
return i.Msg == "isdbgrid"
}

// IsSharded returns true is replset is part sharded cluster
func (i *NodeInfo) IsSharded() bool {
return i.SetName != "" && (i.ConfigServerState != nil || i.opts.Sharding.ClusterRole != "" || i.ConfigSvr == 2)
Expand Down
35 changes: 29 additions & 6 deletions pbm/pbm.go
Original file line number Diff line number Diff line change
Expand Up @@ -925,16 +925,36 @@ func (p *PBM) ClusterMembers() ([]Shard, error) {
return nil, errors.Wrap(err, "define cluster state")
}

if inf.IsMongos() || inf.IsSharded() {
return getClusterMembersImpl(p.ctx, p.Conn)
}

shards := []Shard{{
RS: inf.SetName,
Host: inf.SetName + "/" + strings.Join(inf.Hosts, ","),
}}
if inf.IsSharded() {
s, err := p.GetShards()
if err != nil {
return nil, errors.Wrap(err, "get shards")
}
shards = append(shards, s...)
return shards, nil
}

func getClusterMembersImpl(ctx context.Context, m *mongo.Client) ([]Shard, error) {
res := m.Database("admin").RunCommand(ctx, bson.D{{"getShardMap", 1}})
if err := res.Err(); err != nil {
return nil, errors.WithMessage(err, "query")
}

var shardMap struct{ Map map[string]string }
if err := res.Decode(&shardMap); err != nil {
return nil, errors.WithMessage(err, "decode")
}

shards := make([]Shard, 0, len(shardMap.Map))
for id, host := range shardMap.Map {
rs, _, _ := strings.Cut(host, "/")
shards = append(shards, Shard{
ID: id,
RS: rs,
Host: host,
})
}

return shards, nil
Expand Down Expand Up @@ -979,6 +999,9 @@ func (p *PBM) GetNodeInfo() (*NodeInfo, error) {
if err != nil {
return nil, errors.Wrap(err, "get NodeInfo")
}
if inf.IsMongos() {
return inf, nil
}

opts := struct {
Parsed MongodOpts `bson:"parsed" json:"parsed"`
Expand Down
Loading

0 comments on commit 3033ea5

Please sign in to comment.