Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[PBM-780] PBM SDK (incremental impl) #882

Merged
merged 3 commits into from
Oct 5, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
348 changes: 142 additions & 206 deletions cmd/pbm-agent/agent.go

Large diffs are not rendered by default.

72 changes: 36 additions & 36 deletions cmd/pbm-agent/backup.go
Original file line number Diff line number Diff line change
@@ -1,25 +1,23 @@
package main

import (
"context"
"time"

"github.com/percona/percona-backup-mongodb/internal/backup"
"github.com/percona/percona-backup-mongodb/internal/config"
"github.com/percona/percona-backup-mongodb/internal/context"
"github.com/percona/percona-backup-mongodb/internal/ctrl"
"github.com/percona/percona-backup-mongodb/internal/defs"
"github.com/percona/percona-backup-mongodb/internal/errors"
"github.com/percona/percona-backup-mongodb/internal/lock"
"github.com/percona/percona-backup-mongodb/internal/log"
"github.com/percona/percona-backup-mongodb/internal/priority"
"github.com/percona/percona-backup-mongodb/internal/query"
"github.com/percona/percona-backup-mongodb/internal/storage"
"github.com/percona/percona-backup-mongodb/internal/topo"
"github.com/percona/percona-backup-mongodb/internal/types"
"github.com/percona/percona-backup-mongodb/internal/version"
"github.com/percona/percona-backup-mongodb/pbm/backup"
)

type currentBackup struct {
header *types.BackupCmd
header *ctrl.BackupCmd
cancel context.CancelFunc
}

Expand Down Expand Up @@ -52,17 +50,19 @@ func (a *Agent) CancelBackup() {
}

// Backup starts backup
func (a *Agent) Backup(ctx context.Context, cmd *types.BackupCmd, opid types.OPID, ep config.Epoch) {
func (a *Agent) Backup(ctx context.Context, cmd *ctrl.BackupCmd, opid ctrl.OPID, ep config.Epoch) {
logger := log.FromContext(ctx)

if cmd == nil {
l := a.log.NewEvent(string(defs.CmdBackup), "", opid.String(), ep.TS())
l := logger.NewEvent(string(ctrl.CmdBackup), "", opid.String(), ep.TS())
l.Error("missed command")
return
}

l := a.log.NewEvent(string(defs.CmdBackup), cmd.Name, opid.String(), ep.TS())
ctx = log.SetLoggerToContext(ctx, a.log)
l := logger.NewEvent(string(ctrl.CmdBackup), cmd.Name, opid.String(), ep.TS())
ctx = log.SetLoggerToContext(ctx, logger)

nodeInfo, err := topo.GetNodeInfoExt(ctx, a.node.Session())
nodeInfo, err := topo.GetNodeInfoExt(ctx, a.nodeConn)
if err != nil {
l.Error("get node info: %v", err)
return
Expand All @@ -75,7 +75,7 @@ func (a *Agent) Backup(ctx context.Context, cmd *types.BackupCmd, opid types.OPI
}

isClusterLeader := nodeInfo.IsClusterLeader()
canRunBackup, err := topo.NodeSuitsExt(ctx, a.node.Session(), nodeInfo, cmd.Type)
canRunBackup, err := topo.NodeSuitsExt(ctx, a.nodeConn, nodeInfo, cmd.Type)
if err != nil {
l.Error("node check: %v", err)
if !isClusterLeader {
Expand All @@ -97,18 +97,18 @@ func (a *Agent) Backup(ctx context.Context, cmd *types.BackupCmd, opid types.OPI
var bcp *backup.Backup
switch cmd.Type {
case defs.PhysicalBackup:
bcp = backup.NewPhysical(a.pbm, a.node)
bcp = backup.NewPhysical(a.leadConn, a.nodeConn, a.brief)
case defs.ExternalBackup:
bcp = backup.NewExternal(a.pbm, a.node)
bcp = backup.NewExternal(a.leadConn, a.nodeConn, a.brief)
case defs.IncrementalBackup:
bcp = backup.NewIncremental(a.pbm, a.node, cmd.IncrBase)
bcp = backup.NewIncremental(a.leadConn, a.nodeConn, a.brief, cmd.IncrBase)
case defs.LogicalBackup:
fallthrough
default:
bcp = backup.New(a.pbm, a.node)
bcp = backup.New(a.leadConn, a.nodeConn, a.brief, a.dumpConns)
}

cfg, err := config.GetConfig(ctx, a.pbm.Conn)
cfg, err := config.GetConfig(ctx, a.leadConn)
if err != nil {
l.Error("unable to get PBM config settings: " + err.Error())
return
Expand All @@ -122,7 +122,7 @@ func (a *Agent) Backup(ctx context.Context, cmd *types.BackupCmd, opid types.OPI
if isClusterLeader {
balancer := topo.BalancerModeOff
if nodeInfo.IsSharded() {
bs, err := topo.GetBalancerStatus(ctx, a.pbm.Conn)
bs, err := topo.GetBalancerStatus(ctx, a.leadConn)
if err != nil {
l.Error("get balancer status: %v", err)
return
Expand All @@ -138,8 +138,8 @@ func (a *Agent) Backup(ctx context.Context, cmd *types.BackupCmd, opid types.OPI
}
l.Debug("init backup meta")

if err = topo.CheckTopoForBackup(ctx, a.pbm.Conn, cmd.Type); err != nil {
ferr := query.ChangeBackupState(a.pbm.Conn, cmd.Name, defs.StatusError, err.Error())
if err = topo.CheckTopoForBackup(ctx, a.leadConn, cmd.Type); err != nil {
ferr := backup.ChangeBackupState(a.leadConn, cmd.Name, defs.StatusError, err.Error())
l.Info("mark backup as %s `%v`: %v", defs.StatusError, err, ferr)
return
}
Expand All @@ -150,7 +150,7 @@ func (a *Agent) Backup(ctx context.Context, cmd *types.BackupCmd, opid types.OPI
const srcHostMultiplier = 3.0
var c map[string]float64
if cmd.Type == defs.IncrementalBackup && !cmd.IncrBase {
src, err := query.LastIncrementalBackup(ctx, a.pbm.Conn)
src, err := backup.LastIncrementalBackup(ctx, a.leadConn)
if err != nil {
// try backup anyway
l.Warning("define source backup: %v", err)
Expand All @@ -162,7 +162,7 @@ func (a *Agent) Backup(ctx context.Context, cmd *types.BackupCmd, opid types.OPI
}
}

agents, err := topo.ListAgentStatuses(ctx, a.pbm.Conn)
agents, err := topo.ListAgentStatuses(ctx, a.leadConn)
if err != nil {
l.Error("get agents list: %v", err)
return
Expand All @@ -177,12 +177,12 @@ func (a *Agent) Backup(ctx context.Context, cmd *types.BackupCmd, opid types.OPI
validCandidates = append(validCandidates, s)
}

nodes, err := priority.BcpNodesPriority(ctx, a.pbm.Conn, c, validCandidates)
nodes, err := BcpNodesPriority(ctx, a.leadConn, c, validCandidates)
if err != nil {
l.Error("get nodes priority: %v", err)
return
}
shards, err := topo.ClusterMembers(ctx, a.pbm.Conn.MongoClient())
shards, err := topo.ClusterMembers(ctx, a.leadConn.MongoClient())
if err != nil {
l.Error("get cluster members: %v", err)
return
Expand All @@ -208,8 +208,8 @@ func (a *Agent) Backup(ctx context.Context, cmd *types.BackupCmd, opid types.OPI
}

epoch := ep.TS()
lck := lock.NewLock(a.pbm.Conn, lock.LockHeader{
Type: defs.CmdBackup,
lck := lock.NewLock(a.leadConn, lock.LockHeader{
Type: ctrl.CmdBackup,
Replset: nodeInfo.SetName,
Node: nodeInfo.Me,
OPID: opid.String(),
Expand All @@ -219,8 +219,8 @@ func (a *Agent) Backup(ctx context.Context, cmd *types.BackupCmd, opid types.OPI
// install a backup lock despite having PITR one
got, err := a.acquireLock(ctx, lck, l, func(ctx context.Context) (bool, error) {
return lck.Rewrite(ctx, &lock.LockHeader{
Replset: a.node.RS(),
Type: defs.CmdPITR,
Replset: a.brief.SetName,
Type: ctrl.CmdPITR,
})
})
if err != nil {
Expand All @@ -232,7 +232,7 @@ func (a *Agent) Backup(ctx context.Context, cmd *types.BackupCmd, opid types.OPI
return
}

err = query.SetRSNomineeACK(ctx, a.pbm.Conn, cmd.Name, nodeInfo.SetName, nodeInfo.Me)
err = backup.SetRSNomineeACK(ctx, a.leadConn, cmd.Name, nodeInfo.SetName, nodeInfo.Me)
if err != nil {
l.Warning("set nominee ack: %v", err)
}
Expand Down Expand Up @@ -264,15 +264,15 @@ func (a *Agent) Backup(ctx context.Context, cmd *types.BackupCmd, opid types.OPI

const renominationFrame = 5 * time.Second

func (a *Agent) nominateRS(ctx context.Context, bcp, rs string, nodes [][]string, l *log.Event) error {
func (a *Agent) nominateRS(ctx context.Context, bcp, rs string, nodes [][]string, l log.LogEvent) error {
l.Debug("nomination list for %s: %v", rs, nodes)
err := query.SetRSNomination(ctx, a.pbm.Conn, bcp, rs)
err := backup.SetRSNomination(ctx, a.leadConn, bcp, rs)
if err != nil {
return errors.Wrap(err, "set nomination meta")
}

for _, n := range nodes {
nms, err := query.GetRSNominees(ctx, a.pbm.Conn, bcp, rs)
nms, err := backup.GetRSNominees(ctx, a.leadConn, bcp, rs)
if err != nil && !errors.Is(err, errors.ErrNotFound) {
return errors.Wrap(err, "get nomination meta")
}
Expand All @@ -281,13 +281,13 @@ func (a *Agent) nominateRS(ctx context.Context, bcp, rs string, nodes [][]string
return nil
}

err = query.SetRSNominees(ctx, a.pbm.Conn, bcp, rs, n)
err = backup.SetRSNominees(ctx, a.leadConn, bcp, rs, n)
if err != nil {
return errors.Wrap(err, "set nominees")
}
l.Debug("nomination %s, set candidates %v", rs, n)

err = query.BackupHB(ctx, a.pbm.Conn, bcp)
err = backup.BackupHB(ctx, a.leadConn, bcp)
if err != nil {
l.Warning("send heartbeat: %v", err)
}
Expand All @@ -298,7 +298,7 @@ func (a *Agent) nominateRS(ctx context.Context, bcp, rs string, nodes [][]string
return nil
}

func (a *Agent) waitNomination(ctx context.Context, bcp, rs, node string, l *log.Event) (bool, error) {
func (a *Agent) waitNomination(ctx context.Context, bcp, rs, node string, l log.LogEvent) (bool, error) {
tk := time.NewTicker(time.Millisecond * 500)
defer tk.Stop()

Expand All @@ -308,7 +308,7 @@ func (a *Agent) waitNomination(ctx context.Context, bcp, rs, node string, l *log
for {
select {
case <-tk.C:
nm, err := query.GetRSNominees(ctx, a.pbm.Conn, bcp, rs)
nm, err := backup.GetRSNominees(ctx, a.leadConn, bcp, rs)
if err != nil {
if errors.Is(err, errors.ErrNotFound) {
continue
Expand Down
51 changes: 51 additions & 0 deletions cmd/pbm-agent/lock.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package main

import (
"context"

"go.mongodb.org/mongo-driver/bson/primitive"

"github.com/percona/percona-backup-mongodb/internal/backup"
"github.com/percona/percona-backup-mongodb/internal/ctrl"
"github.com/percona/percona-backup-mongodb/internal/defs"
"github.com/percona/percona-backup-mongodb/internal/errors"
"github.com/percona/percona-backup-mongodb/internal/lock"
"github.com/percona/percona-backup-mongodb/internal/log"
"github.com/percona/percona-backup-mongodb/internal/restore"
)

func markBcpStale(ctx context.Context, l *lock.Lock, opid string) error {
bcp, err := backup.GetBackupByOPID(ctx, l.Connect(), opid)
if err != nil {
return errors.Wrap(err, "get backup meta")
}

// not to rewrite an error emitted by the agent
if bcp.Status == defs.StatusError || bcp.Status == defs.StatusDone {
return nil
}

if logger := log.FromContext(ctx); logger != nil {
logger.Debug(string(ctrl.CmdBackup), "", opid, primitive.Timestamp{}, "mark stale meta")
}
return backup.ChangeBackupStateOPID(l.Connect(), opid, defs.StatusError,
"some of pbm-agents were lost during the backup")
}

func markRestoreStale(ctx context.Context, l *lock.Lock, opid string) error {
r, err := restore.GetRestoreMetaByOPID(ctx, l.Connect(), opid)
if err != nil {
return errors.Wrap(err, "get retore meta")
}

// not to rewrite an error emitted by the agent
if r.Status == defs.StatusError || r.Status == defs.StatusDone {
return nil
}

if logger := log.FromContext(ctx); logger != nil {
logger.Debug(string(ctrl.CmdRestore), "", opid, primitive.Timestamp{}, "mark stale meta")
}
return restore.ChangeRestoreStateOPID(ctx, l.Connect(), opid, defs.StatusError,
"some of pbm-agents were lost during the restore")
}
43 changes: 25 additions & 18 deletions cmd/pbm-agent/main.go
Original file line number Diff line number Diff line change
@@ -1,22 +1,22 @@
package main

import (
"context"
"fmt"
"log"
stdlog "log"
"os"
"runtime"
"strconv"
"strings"

"github.com/alecthomas/kingpin"
mlog "github.com/mongodb/mongo-tools/common/log"
mtLog "github.com/mongodb/mongo-tools/common/log"
"github.com/mongodb/mongo-tools/common/options"

"github.com/percona/percona-backup-mongodb/internal/context"
"github.com/percona/percona-backup-mongodb/internal/connect"
"github.com/percona/percona-backup-mongodb/internal/errors"
plog "github.com/percona/percona-backup-mongodb/internal/log"
"github.com/percona/percona-backup-mongodb/internal/log"
"github.com/percona/percona-backup-mongodb/internal/version"
"github.com/percona/percona-backup-mongodb/pbm"
)

const mongoConnFlag = "mongodb-uri"
Expand Down Expand Up @@ -51,7 +51,7 @@ func main() {

cmd, err := pbmCmd.DefaultEnvars().Parse(os.Args[1:])
if err != nil && cmd != versionCmd.FullCommand() {
log.Println("Error: Parse command line parameters:", err)
stdlog.Println("Error: Parse command line parameters:", err)
return
}

Expand All @@ -73,38 +73,45 @@ func main() {
hidecreds()

err = runAgent(url, *dumpConns)
log.Println("Exit:", err)
stdlog.Println("Exit:", err)
if err != nil {
os.Exit(1)
}
}

func runAgent(mongoURI string, dumpConns int) error {
mlog.SetDateFormat(plog.LogTimeFormat)
mlog.SetVerbosity(&options.Verbosity{VLevel: mlog.DebugLow})
mtLog.SetDateFormat(log.LogTimeFormat)
mtLog.SetVerbosity(&options.Verbosity{VLevel: mtLog.DebugLow})

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

pbmClient, err := pbm.New(ctx, mongoURI, "pbm-agent")
leadConn, err := connect.Connect(ctx, mongoURI, &connect.ConnectOptions{AppName: "pbm-agent"})
if err != nil {
return errors.Wrap(err, "connect to PBM")
}

agnt := newAgent(pbmClient)
defer agnt.Close()
err = agnt.AddNode(ctx, mongoURI, dumpConns)
agent, err := newAgent(ctx, leadConn, mongoURI, dumpConns)
if err != nil {
return errors.Wrap(err, "connect to the node")
}
agnt.InitLogger()

if err := agnt.CanStart(ctx); err != nil {
logger := log.New(agent.leadConn.LogCollection(), agent.brief.SetName, agent.brief.Me)
defer logger.Close()

ctx = log.SetLoggerToContext(ctx, logger)

if err := agent.CanStart(ctx); err != nil {
return errors.Wrap(err, "pre-start check")
}

go agnt.PITR(ctx)
go agnt.HbStatus(ctx)
err = setupNewDB(ctx, agent.leadConn)
if err != nil {
return errors.Wrap(err, "setup pbm collections")
}

go agent.PITR(ctx)
go agent.HbStatus(ctx)

return errors.Wrap(agnt.Start(ctx), "listen the commands stream")
return errors.Wrap(agent.Start(ctx), "listen the commands stream")
}
Loading
Loading