Skip to content

Commit

Permalink
[PBM-1171] support mixed env in pre-check
Browse files Browse the repository at this point in the history
  • Loading branch information
defbin committed Sep 13, 2023
1 parent d21f1de commit 9a0d75d
Show file tree
Hide file tree
Showing 9 changed files with 214 additions and 27 deletions.
18 changes: 13 additions & 5 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -472,10 +472,19 @@ func (a *Agent) HbIsRun() bool {
}

func (a *Agent) HbStatus() {
l := a.log.NewEvent("agentCheckup", "", "", primitive.Timestamp{})

nodeVersion, err := a.node.GetMongoVersion()
if err != nil {
l.Error("get mongo version: %v", err)
}

hb := pbm.AgentStat{
Node: a.node.Name(),
RS: a.node.RS(),
Ver: version.Current().Version,
Node: a.node.Name(),
RS: a.node.RS(),
AgentVer: version.Current().Version,
MongoVer: nodeVersion.VersionString,
PerconaVer: nodeVersion.PSMDBVersion,
}
defer func() {
if err := a.pbm.RemoveAgentStatus(hb); err != nil {
Expand All @@ -487,8 +496,6 @@ func (a *Agent) HbStatus() {
tk := time.NewTicker(pbm.AgentsStatCheckRange)
defer tk.Stop()

l := a.log.NewEvent("agentCheckup", "", "", primitive.Timestamp{})

// check storage once in a while if all is ok (see https://jira.percona.com/browse/PBM-647)
const checkStoreIn = int(60 / (pbm.AgentsStatCheckRange / time.Second))
cc := 0
Expand Down Expand Up @@ -535,6 +542,7 @@ func (a *Agent) HbStatus() {
hb.Hidden = inf.Hidden
hb.Passive = inf.Passive
}
hb.Arbiter = inf.ArbiterOnly

err = a.pbm.SetAgentStatus(hb)
if err != nil {
Expand Down
25 changes: 17 additions & 8 deletions agent/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,16 +67,19 @@ func (a *Agent) Backup(cmd *pbm.BackupCmd, opid pbm.OPID, ep pbm.Epoch) {
return
}

q, err := backup.NodeSuits(a.node, nodeInfo)
isClusterLeader := nodeInfo.IsClusterLeader()
canRunBackup, err := backup.NodeSuitsExt(a.node, nodeInfo, cmd.Type)
if err != nil {
l.Error("node check: %v", err)
return
if !isClusterLeader {
return
}
}

// node is not suitable for doing backup
if !q {
if !canRunBackup {
l.Info("node is not suitable for backup")
return
if !isClusterLeader {
return
}
}

// wakeup the slicer not to wait for the tick
Expand Down Expand Up @@ -109,7 +112,7 @@ func (a *Agent) Backup(cmd *pbm.BackupCmd, opid pbm.OPID, ep pbm.Epoch) {
}
bcp.SetTimeouts(cfg.Backup.Timeouts)

if nodeInfo.IsClusterLeader() {
if isClusterLeader {
balancer := pbm.BalancerModeOff
if nodeInfo.IsSharded() {
bs, err := a.pbm.GetBalancerStatus()
Expand Down Expand Up @@ -145,7 +148,13 @@ func (a *Agent) Backup(cmd *pbm.BackupCmd, opid pbm.OPID, ep pbm.Epoch) {
}
}
}
nodes, err := a.pbm.BcpNodesPriority(c)

exclude := ""
if !canRunBackup {
exclude = a.node.Name()
}

nodes, err := a.pbm.BcpNodesPriority(c, exclude)
if err != nil {
l.Error("get nodes priority: %v", err)
return
Expand Down
123 changes: 117 additions & 6 deletions cli/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,121 @@ type descBcp struct {
name string
}

func checkTopoForBackup(cn *pbm.PBM, ts pbm.BackupType) error {
members, err := cn.ClusterMembers()
if err != nil {
return errors.WithMessage(err, "get cluster members")
}

time, err := cn.ClusterTime()
if err != nil {
return errors.Wrap(err, "get cluster time")
}

agentList, err := cn.ListAgents()
if err != nil {
return errors.WithMessage(err, "list agents")
}

agents := make(map[string]map[string]pbm.AgentStat)
for _, a := range agentList {
if agents[a.RS] == nil {
agents[a.RS] = make(map[string]pbm.AgentStat)
}
agents[a.RS][a.Node] = a
}

return collectTopoCheckErrors(members, agents, time, ts)
}

type (
ReplsetName = string
NodeURI = string
)

type topoCheckError struct {
Replsets map[ReplsetName]map[NodeURI][]error
Missed []string
}

func (r topoCheckError) hasError() bool {
return len(r.Missed) != 0
}

func (r topoCheckError) Error() string {
if !r.hasError() {
return ""
}

return fmt.Sprintf("no available agent(s) on replsets: %s", strings.Join(r.Missed, ", "))
}

func collectTopoCheckErrors(
replsets []pbm.Shard,
agentsByRS map[ReplsetName]map[NodeURI]pbm.AgentStat,
ts primitive.Timestamp,
type_ pbm.BackupType,
) error {
rv := topoCheckError{
Replsets: make(map[string]map[NodeURI][]error),
Missed: make([]string, 0),
}

for _, rs := range replsets {
rsName, uri, _ := strings.Cut(rs.Host, "/")
agents := agentsByRS[rsName]
if len(agents) == 0 {
rv.Missed = append(rv.Missed, rsName)
continue
}

hosts := strings.Split(uri, ",")
members := make(map[NodeURI][]error, len(hosts))
anyAvail := false
for _, host := range hosts {
a, ok := agents[host]
if !ok || a.Arbiter || a.Passive {
continue
}

errs := []error{}
if a.Err != "" {
errs = append(errs, errors.New(a.Err))
}
if ok, estrs := a.OK(); !ok {
for _, e := range estrs {
errs = append(errs, errors.New(e))
}
}

const maxReplicationLag uint32 = 35
if ts.T-a.Heartbeat.T > maxReplicationLag {
errs = append(errs, errors.New("stale"))
}
if err := pbm.FeatureSupport(a.MongoVersion()).BackupType(type_); err != nil {
errs = append(errs, errors.WithMessage(err, "unsupported backup type"))
}

members[host] = errs
if len(errs) == 0 {
anyAvail = true
}
}

rv.Replsets[rsName] = members

if !anyAvail {
rv.Missed = append(rv.Missed, rsName)
}
}

if rv.hasError() {
return rv
}

return nil
}

func runBackup(cn *pbm.PBM, b *backupOpts, outf outFormat) (fmt.Stringer, error) {
nss, err := parseCLINSOption(b.ns)
if err != nil {
Expand All @@ -80,12 +195,8 @@ func runBackup(cn *pbm.PBM, b *backupOpts, outf outFormat) (fmt.Stringer, error)
return nil, errors.New("--ns flag is only allowed for logical backup")
}

ver, err := pbm.GetMongoVersion(cn.Context(), cn.Conn)
if err != nil {
return nil, errors.WithMessage(err, "get mongo version")
}
if err = pbm.FeatureSupport(ver).BackupType(pbm.BackupType(b.typ)); err != nil {
return nil, errors.WithMessage(err, "unsupported backup type")
if err := checkTopoForBackup(cn, pbm.BackupType(b.typ)); err != nil {
return nil, errors.WithMessage(err, "backup pre-check")
}

if err := checkConcurrentOp(cn); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion cli/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ func clusterStatus(cn *pbm.PBM, uri string) (fmt.Stringer, error) {
nd.Errs = append(nd.Errs, fmt.Sprintf("ERROR: lost agent, last heartbeat: %v", stat.Heartbeat.T))
continue
}
nd.Ver = "v" + stat.Ver
nd.Ver = "v" + stat.AgentVer
nd.OK, nd.Errs = stat.OK()
}

Expand Down
28 changes: 27 additions & 1 deletion pbm/agent_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,15 @@ package pbm
import (
"context"
"fmt"
"strconv"
"strings"

"github.com/pkg/errors"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"golang.org/x/mod/semver"
)

type AgentStat struct {
Expand All @@ -18,7 +21,10 @@ type AgentStat struct {
StateStr string `bson:"str"`
Hidden bool `bson:"hdn"`
Passive bool `bson:"psv"`
Ver string `bson:"v"`
Arbiter bool `bson:"arb"`
AgentVer string `bson:"v"`
MongoVer string `bson:"mv"`
PerconaVer string `bson:"pv,omitempty"`
PBMStatus SubsysStatus `bson:"pbms"`
NodeStatus SubsysStatus `bson:"nodes"`
StorageStatus SubsysStatus `bson:"stors"`
Expand Down Expand Up @@ -50,6 +56,22 @@ func (s *AgentStat) OK() (bool, []string) {
return ok, errs
}

func (s *AgentStat) MongoVersion() MongoVersion {
v := MongoVersion{
PSMDBVersion: s.PerconaVer,
VersionString: s.MongoVer,
}

vs := semver.Canonical("v" + s.MongoVer)[1:]
vs = strings.SplitN(vs, "-", 2)[0]
for _, a := range strings.Split(vs, ".")[:3] {
n, _ := strconv.Atoi(a)
v.Version = append(v.Version, n)
}

return v
}

func (p *PBM) SetAgentStatus(stat AgentStat) error {
ct, err := p.ClusterTime()
if err != nil {
Expand Down Expand Up @@ -117,6 +139,10 @@ func (p *PBM) ListAgentStatuses() ([]AgentStat, error) {
return nil, errors.WithMessage(err, "remove stale statuses")
}

return p.ListAgents()
}

func (p *PBM) ListAgents() ([]AgentStat, error) {
cur, err := p.Conn.Database(DB).Collection(AgentsStatusCollection).Find(p.ctx, bson.M{})
if err != nil {
return nil, errors.WithMessage(err, "query")
Expand Down
14 changes: 14 additions & 0 deletions pbm/backup/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,20 @@ func NodeSuits(node *pbm.Node, inf *pbm.NodeInfo) (bool, error) {
nil
}

func NodeSuitsExt(node *pbm.Node, inf *pbm.NodeInfo, t pbm.BackupType) (bool, error) {
if ok, err := NodeSuits(node, inf); err != nil || !ok {
return false, err
}

ver, err := node.GetMongoVersion()
if err != nil {
return false, errors.Wrap(err, "get mongo version")
}

err = pbm.FeatureSupport(*ver).BackupType(t)
return err == nil, err
}

// rwError multierror for the read/compress/write-to-store operations set
type rwError struct {
read error
Expand Down
10 changes: 9 additions & 1 deletion pbm/bcp_nodes_priority.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ type agentScore func(AgentStat) float64
// in descended order. First are nodes with the highest priority.
// Custom coefficients might be passed. These will be ignored though
// if the config is set.
func (p *PBM) BcpNodesPriority(c map[string]float64) (*NodesPriority, error) {
func (p *PBM) BcpNodesPriority(c map[string]float64, exclude string) (*NodesPriority, error) {
cfg, err := p.GetConfig()
if err != nil {
return nil, errors.Wrap(err, "get config")
Expand All @@ -50,6 +50,14 @@ func (p *PBM) BcpNodesPriority(c map[string]float64) (*NodesPriority, error) {
if err != nil {
return nil, errors.Wrap(err, "get agents list")
}
if exclude != "" {
for i := range agents {
if agents[i].Node == exclude {
agents = append(agents[:i], agents[i+1:]...)
break
}
}
}

// if cfg.Backup.Priority doesn't set apply defaults
f := func(a AgentStat) float64 {
Expand Down
5 changes: 5 additions & 0 deletions pbm/bsontypes.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,11 @@ type NodeInfo struct {
opts MongodOpts
}

// IsSharded returns true is replset is part sharded cluster
func (i *NodeInfo) IsMongos() bool {
return i.Msg == "isdbgrid"
}

// IsSharded returns true is replset is part sharded cluster
func (i *NodeInfo) IsSharded() bool {
return i.SetName != "" && (i.ConfigServerState != nil || i.opts.Sharding.ClusterRole != "" || i.ConfigSvr == 2)
Expand Down
16 changes: 11 additions & 5 deletions pbm/pbm.go
Original file line number Diff line number Diff line change
Expand Up @@ -925,11 +925,14 @@ func (p *PBM) ClusterMembers() ([]Shard, error) {
return nil, errors.Wrap(err, "define cluster state")
}

shards := []Shard{{
RS: inf.SetName,
Host: inf.SetName + "/" + strings.Join(inf.Hosts, ","),
}}
if inf.IsSharded() {
shards := []Shard{}
if !inf.IsMongos() {
shards = append(shards, Shard{
RS: inf.SetName,
Host: inf.SetName + "/" + strings.Join(inf.Hosts, ","),
})
}
if inf.IsMongos() || inf.IsSharded() {
s, err := p.GetShards()
if err != nil {
return nil, errors.Wrap(err, "get shards")
Expand Down Expand Up @@ -979,6 +982,9 @@ func (p *PBM) GetNodeInfo() (*NodeInfo, error) {
if err != nil {
return nil, errors.Wrap(err, "get NodeInfo")
}
if inf.IsMongos() {
return inf, nil
}

opts := struct {
Parsed MongodOpts `bson:"parsed" json:"parsed"`
Expand Down

0 comments on commit 9a0d75d

Please sign in to comment.