Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[PBM-1171] support mixed env in pre-check validation #872

Merged
merged 2 commits into from
Sep 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 13 additions & 5 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -472,10 +472,19 @@ func (a *Agent) HbIsRun() bool {
}

func (a *Agent) HbStatus() {
l := a.log.NewEvent("agentCheckup", "", "", primitive.Timestamp{})

nodeVersion, err := a.node.GetMongoVersion()
if err != nil {
l.Error("get mongo version: %v", err)
}

hb := pbm.AgentStat{
Node: a.node.Name(),
RS: a.node.RS(),
Ver: version.Current().Version,
Node: a.node.Name(),
RS: a.node.RS(),
AgentVer: version.Current().Version,
MongoVer: nodeVersion.VersionString,
PerconaVer: nodeVersion.PSMDBVersion,
}
defer func() {
if err := a.pbm.RemoveAgentStatus(hb); err != nil {
Expand All @@ -487,8 +496,6 @@ func (a *Agent) HbStatus() {
tk := time.NewTicker(pbm.AgentsStatCheckRange)
defer tk.Stop()

l := a.log.NewEvent("agentCheckup", "", "", primitive.Timestamp{})

// check storage once in a while if all is ok (see https://jira.percona.com/browse/PBM-647)
const checkStoreIn = int(60 / (pbm.AgentsStatCheckRange / time.Second))
cc := 0
Expand Down Expand Up @@ -535,6 +542,7 @@ func (a *Agent) HbStatus() {
hb.Hidden = inf.Hidden
hb.Passive = inf.Passive
}
hb.Arbiter = inf.ArbiterOnly

err = a.pbm.SetAgentStatus(hb)
if err != nil {
Expand Down
41 changes: 33 additions & 8 deletions agent/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,16 +67,19 @@ func (a *Agent) Backup(cmd *pbm.BackupCmd, opid pbm.OPID, ep pbm.Epoch) {
return
}

q, err := backup.NodeSuits(a.node, nodeInfo)
isClusterLeader := nodeInfo.IsClusterLeader()
canRunBackup, err := backup.NodeSuitsExt(a.node, nodeInfo, cmd.Type)
if err != nil {
l.Error("node check: %v", err)
return
if !isClusterLeader {
return
}
}

// node is not suitable for doing backup
if !q {
if !canRunBackup {
l.Info("node is not suitable for backup")
return
if !isClusterLeader {
return
}
}

// wakeup the slicer not to wait for the tick
Expand Down Expand Up @@ -109,7 +112,7 @@ func (a *Agent) Backup(cmd *pbm.BackupCmd, opid pbm.OPID, ep pbm.Epoch) {
}
bcp.SetTimeouts(cfg.Backup.Timeouts)

if nodeInfo.IsClusterLeader() {
if isClusterLeader {
balancer := pbm.BalancerModeOff
if nodeInfo.IsSharded() {
bs, err := a.pbm.GetBalancerStatus()
Expand All @@ -128,6 +131,12 @@ func (a *Agent) Backup(cmd *pbm.BackupCmd, opid pbm.OPID, ep pbm.Epoch) {
}
l.Debug("init backup meta")

if err = pbm.CheckTopoForBackup(a.pbm, cmd.Type); err != nil {
ferr := a.pbm.ChangeBackupState(cmd.Name, pbm.StatusError, err.Error())
l.Info("mark backup as %s `%v`: %v", pbm.StatusError, err, ferr)
return
}

// Incremental backup history is stored by WiredTiger on the node
// not replset. So an `incremental && not_base` backup should land on
// the agent that made a previous (src) backup.
Expand All @@ -145,7 +154,23 @@ func (a *Agent) Backup(cmd *pbm.BackupCmd, opid pbm.OPID, ep pbm.Epoch) {
}
}
}
nodes, err := a.pbm.BcpNodesPriority(c)

agents, err := a.pbm.ListAgentStatuses()
if err != nil {
l.Error("get agents list: %v", err)
return
}

validCandidates := make([]pbm.AgentStat, 0, len(agents))
for _, s := range agents {
if pbm.FeatureSupport(s.MongoVersion()).BackupType(cmd.Type) != nil {
continue
}

validCandidates = append(validCandidates, s)
}

nodes, err := a.pbm.BcpNodesPriority(c, validCandidates)
if err != nil {
l.Error("get nodes priority: %v", err)
return
Expand Down
8 changes: 2 additions & 6 deletions cli/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,12 +80,8 @@ func runBackup(cn *pbm.PBM, b *backupOpts, outf outFormat) (fmt.Stringer, error)
return nil, errors.New("--ns flag is only allowed for logical backup")
}

ver, err := pbm.GetMongoVersion(cn.Context(), cn.Conn)
if err != nil {
return nil, errors.WithMessage(err, "get mongo version")
}
if err = pbm.FeatureSupport(ver).BackupType(pbm.BackupType(b.typ)); err != nil {
return nil, errors.WithMessage(err, "unsupported backup type")
if err := pbm.CheckTopoForBackup(cn, pbm.BackupType(b.typ)); err != nil {
return nil, errors.WithMessage(err, "backup pre-check")
}

if err := checkConcurrentOp(cn); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion cli/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ func clusterStatus(cn *pbm.PBM, uri string) (fmt.Stringer, error) {
nd.Errs = append(nd.Errs, fmt.Sprintf("ERROR: lost agent, last heartbeat: %v", stat.Heartbeat.T))
continue
}
nd.Ver = "v" + stat.Ver
nd.Ver = "v" + stat.AgentVer
nd.OK, nd.Errs = stat.OK()
}

Expand Down
28 changes: 27 additions & 1 deletion pbm/agent_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,15 @@ package pbm
import (
"context"
"fmt"
"strconv"
"strings"

"github.com/pkg/errors"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"golang.org/x/mod/semver"
)

type AgentStat struct {
Expand All @@ -18,7 +21,10 @@ type AgentStat struct {
StateStr string `bson:"str"`
Hidden bool `bson:"hdn"`
Passive bool `bson:"psv"`
Ver string `bson:"v"`
Arbiter bool `bson:"arb"`
AgentVer string `bson:"v"`
MongoVer string `bson:"mv"`
PerconaVer string `bson:"pv,omitempty"`
PBMStatus SubsysStatus `bson:"pbms"`
NodeStatus SubsysStatus `bson:"nodes"`
StorageStatus SubsysStatus `bson:"stors"`
Expand Down Expand Up @@ -50,6 +56,22 @@ func (s *AgentStat) OK() (bool, []string) {
return ok, errs
}

func (s *AgentStat) MongoVersion() MongoVersion {
v := MongoVersion{
PSMDBVersion: s.PerconaVer,
VersionString: s.MongoVer,
}

vs := semver.Canonical("v" + s.MongoVer)[1:]
vs = strings.SplitN(vs, "-", 2)[0]
for _, a := range strings.Split(vs, ".")[:3] {
n, _ := strconv.Atoi(a)
v.Version = append(v.Version, n)
}

return v
}

func (p *PBM) SetAgentStatus(stat AgentStat) error {
ct, err := p.ClusterTime()
if err != nil {
Expand Down Expand Up @@ -117,6 +139,10 @@ func (p *PBM) ListAgentStatuses() ([]AgentStat, error) {
return nil, errors.WithMessage(err, "remove stale statuses")
}

return p.ListAgents()
}

func (p *PBM) ListAgents() ([]AgentStat, error) {
cur, err := p.Conn.Database(DB).Collection(AgentsStatusCollection).Find(p.ctx, bson.M{})
if err != nil {
return nil, errors.WithMessage(err, "query")
Expand Down
14 changes: 14 additions & 0 deletions pbm/backup/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,20 @@ func NodeSuits(node *pbm.Node, inf *pbm.NodeInfo) (bool, error) {
nil
}

func NodeSuitsExt(node *pbm.Node, inf *pbm.NodeInfo, t pbm.BackupType) (bool, error) {
if ok, err := NodeSuits(node, inf); err != nil || !ok {
return false, err
}

ver, err := node.GetMongoVersion()
if err != nil {
return false, errors.Wrap(err, "get mongo version")
}

err = pbm.FeatureSupport(*ver).BackupType(t)
return err == nil, err
}

// rwError multierror for the read/compress/write-to-store operations set
type rwError struct {
read error
Expand Down
6 changes: 1 addition & 5 deletions pbm/bcp_nodes_priority.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,11 @@ type agentScore func(AgentStat) float64
// in descended order. First are nodes with the highest priority.
// Custom coefficients might be passed. These will be ignored though
// if the config is set.
func (p *PBM) BcpNodesPriority(c map[string]float64) (*NodesPriority, error) {
func (p *PBM) BcpNodesPriority(c map[string]float64, agents []AgentStat) (*NodesPriority, error) {
cfg, err := p.GetConfig()
if err != nil {
return nil, errors.Wrap(err, "get config")
}
agents, err := p.ListAgentStatuses()
if err != nil {
return nil, errors.Wrap(err, "get agents list")
}

// if cfg.Backup.Priority doesn't set apply defaults
f := func(a AgentStat) float64 {
Expand Down
5 changes: 5 additions & 0 deletions pbm/bsontypes.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,11 @@ type NodeInfo struct {
opts MongodOpts
}

// IsSharded returns true is replset is part sharded cluster
func (i *NodeInfo) IsMongos() bool {
return i.Msg == "isdbgrid"
}

// IsSharded returns true is replset is part sharded cluster
func (i *NodeInfo) IsSharded() bool {
return i.SetName != "" && (i.ConfigServerState != nil || i.opts.Sharding.ClusterRole != "" || i.ConfigSvr == 2)
Expand Down
44 changes: 39 additions & 5 deletions pbm/pbm.go
Original file line number Diff line number Diff line change
Expand Up @@ -925,16 +925,47 @@ func (p *PBM) ClusterMembers() ([]Shard, error) {
return nil, errors.Wrap(err, "define cluster state")
}

if inf.IsMongos() || inf.IsSharded() {
return getClusterMembersImpl(p.ctx, p.Conn)
}

shards := []Shard{{
RS: inf.SetName,
Host: inf.SetName + "/" + strings.Join(inf.Hosts, ","),
}}
if inf.IsSharded() {
s, err := p.GetShards()
if err != nil {
return nil, errors.Wrap(err, "get shards")
return shards, nil
}

func getClusterMembersImpl(ctx context.Context, m *mongo.Client) ([]Shard, error) {
res := m.Database("admin").RunCommand(ctx, bson.D{{"getShardMap", 1}})
if err := res.Err(); err != nil {
return nil, errors.WithMessage(err, "query")
}

// the map field is mapping of shard names to replset uri
// if shard name is not set, mongodb will provide unique name for it
// (e.g. the replset name of the shard)
// for configsvr, key name is "config"
var shardMap struct{ Map map[string]string }
if err := res.Decode(&shardMap); err != nil {
return nil, errors.WithMessage(err, "decode")
}

shards := make([]Shard, 0, len(shardMap.Map))
for id, host := range shardMap.Map {
if id == "<local>" || strings.ContainsAny(id, "/:") {
// till 4.2, map field is like connStrings (added in 4.4)
// and <local> key is uri of the directly (w/o mongos) connected replset
// skip not shard name
continue
}
shards = append(shards, s...)

rs, _, _ := strings.Cut(host, "/")
shards = append(shards, Shard{
ID: id,
RS: rs,
Host: host,
})
}

return shards, nil
Expand Down Expand Up @@ -979,6 +1010,9 @@ func (p *PBM) GetNodeInfo() (*NodeInfo, error) {
if err != nil {
return nil, errors.Wrap(err, "get NodeInfo")
}
if inf.IsMongos() {
return inf, nil
}

opts := struct {
Parsed MongodOpts `bson:"parsed" json:"parsed"`
Expand Down
Loading
Loading