Skip to content

Commit

Permalink
[PBM-780] PBM SDK (inc impl) (#882)
Browse files Browse the repository at this point in the history
  • Loading branch information
defbin authored Oct 5, 2023
1 parent adb2b08 commit c17e43d
Show file tree
Hide file tree
Showing 116 changed files with 4,003 additions and 3,545 deletions.
348 changes: 142 additions & 206 deletions cmd/pbm-agent/agent.go

Large diffs are not rendered by default.

72 changes: 36 additions & 36 deletions cmd/pbm-agent/backup.go
Original file line number Diff line number Diff line change
@@ -1,25 +1,23 @@
package main

import (
"context"
"time"

"github.com/percona/percona-backup-mongodb/internal/backup"
"github.com/percona/percona-backup-mongodb/internal/config"
"github.com/percona/percona-backup-mongodb/internal/context"
"github.com/percona/percona-backup-mongodb/internal/ctrl"
"github.com/percona/percona-backup-mongodb/internal/defs"
"github.com/percona/percona-backup-mongodb/internal/errors"
"github.com/percona/percona-backup-mongodb/internal/lock"
"github.com/percona/percona-backup-mongodb/internal/log"
"github.com/percona/percona-backup-mongodb/internal/priority"
"github.com/percona/percona-backup-mongodb/internal/query"
"github.com/percona/percona-backup-mongodb/internal/storage"
"github.com/percona/percona-backup-mongodb/internal/topo"
"github.com/percona/percona-backup-mongodb/internal/types"
"github.com/percona/percona-backup-mongodb/internal/version"
"github.com/percona/percona-backup-mongodb/pbm/backup"
)

type currentBackup struct {
header *types.BackupCmd
header *ctrl.BackupCmd
cancel context.CancelFunc
}

Expand Down Expand Up @@ -52,17 +50,19 @@ func (a *Agent) CancelBackup() {
}

// Backup starts backup
func (a *Agent) Backup(ctx context.Context, cmd *types.BackupCmd, opid types.OPID, ep config.Epoch) {
func (a *Agent) Backup(ctx context.Context, cmd *ctrl.BackupCmd, opid ctrl.OPID, ep config.Epoch) {
logger := log.FromContext(ctx)

if cmd == nil {
l := a.log.NewEvent(string(defs.CmdBackup), "", opid.String(), ep.TS())
l := logger.NewEvent(string(ctrl.CmdBackup), "", opid.String(), ep.TS())
l.Error("missed command")
return
}

l := a.log.NewEvent(string(defs.CmdBackup), cmd.Name, opid.String(), ep.TS())
ctx = log.SetLoggerToContext(ctx, a.log)
l := logger.NewEvent(string(ctrl.CmdBackup), cmd.Name, opid.String(), ep.TS())
ctx = log.SetLoggerToContext(ctx, logger)

nodeInfo, err := topo.GetNodeInfoExt(ctx, a.node.Session())
nodeInfo, err := topo.GetNodeInfoExt(ctx, a.nodeConn)
if err != nil {
l.Error("get node info: %v", err)
return
Expand All @@ -75,7 +75,7 @@ func (a *Agent) Backup(ctx context.Context, cmd *types.BackupCmd, opid types.OPI
}

isClusterLeader := nodeInfo.IsClusterLeader()
canRunBackup, err := topo.NodeSuitsExt(ctx, a.node.Session(), nodeInfo, cmd.Type)
canRunBackup, err := topo.NodeSuitsExt(ctx, a.nodeConn, nodeInfo, cmd.Type)
if err != nil {
l.Error("node check: %v", err)
if !isClusterLeader {
Expand All @@ -97,18 +97,18 @@ func (a *Agent) Backup(ctx context.Context, cmd *types.BackupCmd, opid types.OPI
var bcp *backup.Backup
switch cmd.Type {
case defs.PhysicalBackup:
bcp = backup.NewPhysical(a.pbm, a.node)
bcp = backup.NewPhysical(a.leadConn, a.nodeConn, a.brief)
case defs.ExternalBackup:
bcp = backup.NewExternal(a.pbm, a.node)
bcp = backup.NewExternal(a.leadConn, a.nodeConn, a.brief)
case defs.IncrementalBackup:
bcp = backup.NewIncremental(a.pbm, a.node, cmd.IncrBase)
bcp = backup.NewIncremental(a.leadConn, a.nodeConn, a.brief, cmd.IncrBase)
case defs.LogicalBackup:
fallthrough
default:
bcp = backup.New(a.pbm, a.node)
bcp = backup.New(a.leadConn, a.nodeConn, a.brief, a.dumpConns)
}

cfg, err := config.GetConfig(ctx, a.pbm.Conn)
cfg, err := config.GetConfig(ctx, a.leadConn)
if err != nil {
l.Error("unable to get PBM config settings: " + err.Error())
return
Expand All @@ -122,7 +122,7 @@ func (a *Agent) Backup(ctx context.Context, cmd *types.BackupCmd, opid types.OPI
if isClusterLeader {
balancer := topo.BalancerModeOff
if nodeInfo.IsSharded() {
bs, err := topo.GetBalancerStatus(ctx, a.pbm.Conn)
bs, err := topo.GetBalancerStatus(ctx, a.leadConn)
if err != nil {
l.Error("get balancer status: %v", err)
return
Expand All @@ -138,8 +138,8 @@ func (a *Agent) Backup(ctx context.Context, cmd *types.BackupCmd, opid types.OPI
}
l.Debug("init backup meta")

if err = topo.CheckTopoForBackup(ctx, a.pbm.Conn, cmd.Type); err != nil {
ferr := query.ChangeBackupState(a.pbm.Conn, cmd.Name, defs.StatusError, err.Error())
if err = topo.CheckTopoForBackup(ctx, a.leadConn, cmd.Type); err != nil {
ferr := backup.ChangeBackupState(a.leadConn, cmd.Name, defs.StatusError, err.Error())
l.Info("mark backup as %s `%v`: %v", defs.StatusError, err, ferr)
return
}
Expand All @@ -150,7 +150,7 @@ func (a *Agent) Backup(ctx context.Context, cmd *types.BackupCmd, opid types.OPI
const srcHostMultiplier = 3.0
var c map[string]float64
if cmd.Type == defs.IncrementalBackup && !cmd.IncrBase {
src, err := query.LastIncrementalBackup(ctx, a.pbm.Conn)
src, err := backup.LastIncrementalBackup(ctx, a.leadConn)
if err != nil {
// try backup anyway
l.Warning("define source backup: %v", err)
Expand All @@ -162,7 +162,7 @@ func (a *Agent) Backup(ctx context.Context, cmd *types.BackupCmd, opid types.OPI
}
}

agents, err := topo.ListAgentStatuses(ctx, a.pbm.Conn)
agents, err := topo.ListAgentStatuses(ctx, a.leadConn)
if err != nil {
l.Error("get agents list: %v", err)
return
Expand All @@ -177,12 +177,12 @@ func (a *Agent) Backup(ctx context.Context, cmd *types.BackupCmd, opid types.OPI
validCandidates = append(validCandidates, s)
}

nodes, err := priority.BcpNodesPriority(ctx, a.pbm.Conn, c, validCandidates)
nodes, err := BcpNodesPriority(ctx, a.leadConn, c, validCandidates)
if err != nil {
l.Error("get nodes priority: %v", err)
return
}
shards, err := topo.ClusterMembers(ctx, a.pbm.Conn.MongoClient())
shards, err := topo.ClusterMembers(ctx, a.leadConn.MongoClient())
if err != nil {
l.Error("get cluster members: %v", err)
return
Expand All @@ -208,8 +208,8 @@ func (a *Agent) Backup(ctx context.Context, cmd *types.BackupCmd, opid types.OPI
}

epoch := ep.TS()
lck := lock.NewLock(a.pbm.Conn, lock.LockHeader{
Type: defs.CmdBackup,
lck := lock.NewLock(a.leadConn, lock.LockHeader{
Type: ctrl.CmdBackup,
Replset: nodeInfo.SetName,
Node: nodeInfo.Me,
OPID: opid.String(),
Expand All @@ -219,8 +219,8 @@ func (a *Agent) Backup(ctx context.Context, cmd *types.BackupCmd, opid types.OPI
// install a backup lock despite having PITR one
got, err := a.acquireLock(ctx, lck, l, func(ctx context.Context) (bool, error) {
return lck.Rewrite(ctx, &lock.LockHeader{
Replset: a.node.RS(),
Type: defs.CmdPITR,
Replset: a.brief.SetName,
Type: ctrl.CmdPITR,
})
})
if err != nil {
Expand All @@ -232,7 +232,7 @@ func (a *Agent) Backup(ctx context.Context, cmd *types.BackupCmd, opid types.OPI
return
}

err = query.SetRSNomineeACK(ctx, a.pbm.Conn, cmd.Name, nodeInfo.SetName, nodeInfo.Me)
err = backup.SetRSNomineeACK(ctx, a.leadConn, cmd.Name, nodeInfo.SetName, nodeInfo.Me)
if err != nil {
l.Warning("set nominee ack: %v", err)
}
Expand Down Expand Up @@ -264,15 +264,15 @@ func (a *Agent) Backup(ctx context.Context, cmd *types.BackupCmd, opid types.OPI

const renominationFrame = 5 * time.Second

func (a *Agent) nominateRS(ctx context.Context, bcp, rs string, nodes [][]string, l *log.Event) error {
func (a *Agent) nominateRS(ctx context.Context, bcp, rs string, nodes [][]string, l log.LogEvent) error {
l.Debug("nomination list for %s: %v", rs, nodes)
err := query.SetRSNomination(ctx, a.pbm.Conn, bcp, rs)
err := backup.SetRSNomination(ctx, a.leadConn, bcp, rs)
if err != nil {
return errors.Wrap(err, "set nomination meta")
}

for _, n := range nodes {
nms, err := query.GetRSNominees(ctx, a.pbm.Conn, bcp, rs)
nms, err := backup.GetRSNominees(ctx, a.leadConn, bcp, rs)
if err != nil && !errors.Is(err, errors.ErrNotFound) {
return errors.Wrap(err, "get nomination meta")
}
Expand All @@ -281,13 +281,13 @@ func (a *Agent) nominateRS(ctx context.Context, bcp, rs string, nodes [][]string
return nil
}

err = query.SetRSNominees(ctx, a.pbm.Conn, bcp, rs, n)
err = backup.SetRSNominees(ctx, a.leadConn, bcp, rs, n)
if err != nil {
return errors.Wrap(err, "set nominees")
}
l.Debug("nomination %s, set candidates %v", rs, n)

err = query.BackupHB(ctx, a.pbm.Conn, bcp)
err = backup.BackupHB(ctx, a.leadConn, bcp)
if err != nil {
l.Warning("send heartbeat: %v", err)
}
Expand All @@ -298,7 +298,7 @@ func (a *Agent) nominateRS(ctx context.Context, bcp, rs string, nodes [][]string
return nil
}

func (a *Agent) waitNomination(ctx context.Context, bcp, rs, node string, l *log.Event) (bool, error) {
func (a *Agent) waitNomination(ctx context.Context, bcp, rs, node string, l log.LogEvent) (bool, error) {
tk := time.NewTicker(time.Millisecond * 500)
defer tk.Stop()

Expand All @@ -308,7 +308,7 @@ func (a *Agent) waitNomination(ctx context.Context, bcp, rs, node string, l *log
for {
select {
case <-tk.C:
nm, err := query.GetRSNominees(ctx, a.pbm.Conn, bcp, rs)
nm, err := backup.GetRSNominees(ctx, a.leadConn, bcp, rs)
if err != nil {
if errors.Is(err, errors.ErrNotFound) {
continue
Expand Down
51 changes: 51 additions & 0 deletions cmd/pbm-agent/lock.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package main

import (
"context"

"go.mongodb.org/mongo-driver/bson/primitive"

"github.com/percona/percona-backup-mongodb/internal/backup"
"github.com/percona/percona-backup-mongodb/internal/ctrl"
"github.com/percona/percona-backup-mongodb/internal/defs"
"github.com/percona/percona-backup-mongodb/internal/errors"
"github.com/percona/percona-backup-mongodb/internal/lock"
"github.com/percona/percona-backup-mongodb/internal/log"
"github.com/percona/percona-backup-mongodb/internal/restore"
)

func markBcpStale(ctx context.Context, l *lock.Lock, opid string) error {
bcp, err := backup.GetBackupByOPID(ctx, l.Connect(), opid)
if err != nil {
return errors.Wrap(err, "get backup meta")
}

// not to rewrite an error emitted by the agent
if bcp.Status == defs.StatusError || bcp.Status == defs.StatusDone {
return nil
}

if logger := log.FromContext(ctx); logger != nil {
logger.Debug(string(ctrl.CmdBackup), "", opid, primitive.Timestamp{}, "mark stale meta")
}
return backup.ChangeBackupStateOPID(l.Connect(), opid, defs.StatusError,
"some of pbm-agents were lost during the backup")
}

func markRestoreStale(ctx context.Context, l *lock.Lock, opid string) error {
r, err := restore.GetRestoreMetaByOPID(ctx, l.Connect(), opid)
if err != nil {
return errors.Wrap(err, "get retore meta")
}

// not to rewrite an error emitted by the agent
if r.Status == defs.StatusError || r.Status == defs.StatusDone {
return nil
}

if logger := log.FromContext(ctx); logger != nil {
logger.Debug(string(ctrl.CmdRestore), "", opid, primitive.Timestamp{}, "mark stale meta")
}
return restore.ChangeRestoreStateOPID(ctx, l.Connect(), opid, defs.StatusError,
"some of pbm-agents were lost during the restore")
}
43 changes: 25 additions & 18 deletions cmd/pbm-agent/main.go
Original file line number Diff line number Diff line change
@@ -1,22 +1,22 @@
package main

import (
"context"
"fmt"
"log"
stdlog "log"
"os"
"runtime"
"strconv"
"strings"

"github.com/alecthomas/kingpin"
mlog "github.com/mongodb/mongo-tools/common/log"
mtLog "github.com/mongodb/mongo-tools/common/log"
"github.com/mongodb/mongo-tools/common/options"

"github.com/percona/percona-backup-mongodb/internal/context"
"github.com/percona/percona-backup-mongodb/internal/connect"
"github.com/percona/percona-backup-mongodb/internal/errors"
plog "github.com/percona/percona-backup-mongodb/internal/log"
"github.com/percona/percona-backup-mongodb/internal/log"
"github.com/percona/percona-backup-mongodb/internal/version"
"github.com/percona/percona-backup-mongodb/pbm"
)

const mongoConnFlag = "mongodb-uri"
Expand Down Expand Up @@ -51,7 +51,7 @@ func main() {

cmd, err := pbmCmd.DefaultEnvars().Parse(os.Args[1:])
if err != nil && cmd != versionCmd.FullCommand() {
log.Println("Error: Parse command line parameters:", err)
stdlog.Println("Error: Parse command line parameters:", err)
return
}

Expand All @@ -73,38 +73,45 @@ func main() {
hidecreds()

err = runAgent(url, *dumpConns)
log.Println("Exit:", err)
stdlog.Println("Exit:", err)
if err != nil {
os.Exit(1)
}
}

func runAgent(mongoURI string, dumpConns int) error {
mlog.SetDateFormat(plog.LogTimeFormat)
mlog.SetVerbosity(&options.Verbosity{VLevel: mlog.DebugLow})
mtLog.SetDateFormat(log.LogTimeFormat)
mtLog.SetVerbosity(&options.Verbosity{VLevel: mtLog.DebugLow})

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

pbmClient, err := pbm.New(ctx, mongoURI, "pbm-agent")
leadConn, err := connect.Connect(ctx, mongoURI, &connect.ConnectOptions{AppName: "pbm-agent"})
if err != nil {
return errors.Wrap(err, "connect to PBM")
}

agnt := newAgent(pbmClient)
defer agnt.Close()
err = agnt.AddNode(ctx, mongoURI, dumpConns)
agent, err := newAgent(ctx, leadConn, mongoURI, dumpConns)
if err != nil {
return errors.Wrap(err, "connect to the node")
}
agnt.InitLogger()

if err := agnt.CanStart(ctx); err != nil {
logger := log.New(agent.leadConn.LogCollection(), agent.brief.SetName, agent.brief.Me)
defer logger.Close()

ctx = log.SetLoggerToContext(ctx, logger)

if err := agent.CanStart(ctx); err != nil {
return errors.Wrap(err, "pre-start check")
}

go agnt.PITR(ctx)
go agnt.HbStatus(ctx)
err = setupNewDB(ctx, agent.leadConn)
if err != nil {
return errors.Wrap(err, "setup pbm collections")
}

go agent.PITR(ctx)
go agent.HbStatus(ctx)

return errors.Wrap(agnt.Start(ctx), "listen the commands stream")
return errors.Wrap(agent.Start(ctx), "listen the commands stream")
}
Loading

0 comments on commit c17e43d

Please sign in to comment.