diff --git a/cmd/pbm/config.go b/cmd/pbm/config.go index bd01fae48..bda6ec90e 100644 --- a/cmd/pbm/config.go +++ b/cmd/pbm/config.go @@ -7,6 +7,7 @@ import ( "os" "reflect" "strings" + "time" "go.mongodb.org/mongo-driver/mongo" "gopkg.in/yaml.v2" @@ -17,8 +18,11 @@ import ( "github.com/percona/percona-backup-mongodb/sdk" ) +const resyncWaitDuration = 30 * time.Second + type configOpts struct { rsync bool + wait bool list bool file string set map[string]string @@ -75,11 +79,27 @@ func runConfig(ctx context.Context, conn connect.Client, pbmSDK sdk.Client, c *c } return confKV{c.key, fmt.Sprint(k)}, nil case c.rsync: - - if _, err := pbmSDK.SyncFromStorage(ctx); err != nil { + cid, err := pbmSDK.SyncFromStorage(ctx) + if err != nil { return nil, errors.Wrap(err, "resync") } - return outMsg{"Storage resync started"}, nil + + if !c.wait { + return outMsg{"Storage resync started"}, nil + } + + ctx, cancel := context.WithTimeout(ctx, resyncWaitDuration) + defer cancel() + + err = sdk.WaitForResync(ctx, pbmSDK, cid) + if err != nil { + if errors.Is(err, context.DeadlineExceeded) { + err = errors.New("timeout") + } + return nil, errors.Wrapf(err, "waiting for resync [opid %q]", cid) + } + + return outMsg{"Storage resync finished"}, nil case len(c.file) > 0: var buf []byte var err error diff --git a/cmd/pbm/main.go b/cmd/pbm/main.go index 7ca1c3432..192d07bba 100644 --- a/cmd/pbm/main.go +++ b/cmd/pbm/main.go @@ -96,6 +96,9 @@ func main() { StringMapVar(&cfg.set) configCmd.Arg("key", "Show the value of a specified key"). StringVar(&cfg.key) + configCmd.Flag("wait", "Wait for finish"). + Short('w'). + BoolVar(&cfg.wait) backupCmd := pbmCmd.Command("backup", "Make backup") backup := backupOpts{} diff --git a/internal/log/history.go b/internal/log/history.go index 656f25bf8..d35d074e8 100644 --- a/internal/log/history.go +++ b/internal/log/history.go @@ -214,6 +214,10 @@ func Follow( e.ObjID, _ = cur.Current.Lookup("_id").ObjectIDOK() outC <- e } + + if err := cur.Err(); err != nil { + errC <- err + } }() return outC, errC diff --git a/sdk/util.go b/sdk/util.go index f625dfb47..d233c9d1a 100644 --- a/sdk/util.go +++ b/sdk/util.go @@ -2,12 +2,14 @@ package sdk import ( "context" - "errors" "go.mongodb.org/mongo-driver/bson/primitive" "go.mongodb.org/mongo-driver/mongo" + "github.com/percona/percona-backup-mongodb/internal/ctrl" "github.com/percona/percona-backup-mongodb/internal/defs" + "github.com/percona/percona-backup-mongodb/internal/errors" + "github.com/percona/percona-backup-mongodb/internal/log" "github.com/percona/percona-backup-mongodb/internal/topo" ) @@ -28,3 +30,29 @@ func GetClusterTime(ctx context.Context, m *mongo.Client) (Timestamp, error) { return info.ClusterTime.ClusterTime, nil } + +func WaitForResync(ctx context.Context, c Client, cid CommandID) error { + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + r := &log.LogRequest{ + LogKeys: log.LogKeys{ + Event: string(ctrl.CmdResync), + OPID: string(cid), + Severity: log.Info, + }, + } + + outC, errC := log.Follow(ctx, c.(*clientImpl).conn.LogCollection(), r, false) + + for { + select { + case entry := <-outC: + if entry != nil && entry.Msg == "succeed" { + return nil + } + case err := <-errC: + return err + } + } +}