Skip to content

Commit

Permalink
[PBM-1187] add --wait option to pbm config --force-resync (#883)
Browse files Browse the repository at this point in the history
  • Loading branch information
defbin authored Oct 15, 2023
1 parent c17e43d commit 9a0e29d
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 4 deletions.
26 changes: 23 additions & 3 deletions cmd/pbm/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"os"
"reflect"
"strings"
"time"

"go.mongodb.org/mongo-driver/mongo"
"gopkg.in/yaml.v2"
Expand All @@ -17,8 +18,11 @@ import (
"github.com/percona/percona-backup-mongodb/sdk"
)

const resyncWaitDuration = 30 * time.Second

type configOpts struct {
rsync bool
wait bool
list bool
file string
set map[string]string
Expand Down Expand Up @@ -75,11 +79,27 @@ func runConfig(ctx context.Context, conn connect.Client, pbmSDK sdk.Client, c *c
}
return confKV{c.key, fmt.Sprint(k)}, nil
case c.rsync:

if _, err := pbmSDK.SyncFromStorage(ctx); err != nil {
cid, err := pbmSDK.SyncFromStorage(ctx)
if err != nil {
return nil, errors.Wrap(err, "resync")
}
return outMsg{"Storage resync started"}, nil

if !c.wait {
return outMsg{"Storage resync started"}, nil
}

ctx, cancel := context.WithTimeout(ctx, resyncWaitDuration)
defer cancel()

err = sdk.WaitForResync(ctx, pbmSDK, cid)
if err != nil {
if errors.Is(err, context.DeadlineExceeded) {
err = errors.New("timeout")
}
return nil, errors.Wrapf(err, "waiting for resync [opid %q]", cid)
}

return outMsg{"Storage resync finished"}, nil
case len(c.file) > 0:
var buf []byte
var err error
Expand Down
3 changes: 3 additions & 0 deletions cmd/pbm/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,9 @@ func main() {
StringMapVar(&cfg.set)
configCmd.Arg("key", "Show the value of a specified key").
StringVar(&cfg.key)
configCmd.Flag("wait", "Wait for finish").
Short('w').
BoolVar(&cfg.wait)

backupCmd := pbmCmd.Command("backup", "Make backup")
backup := backupOpts{}
Expand Down
4 changes: 4 additions & 0 deletions internal/log/history.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,10 @@ func Follow(
e.ObjID, _ = cur.Current.Lookup("_id").ObjectIDOK()
outC <- e
}

if err := cur.Err(); err != nil {
errC <- err
}
}()

return outC, errC
Expand Down
30 changes: 29 additions & 1 deletion sdk/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@ package sdk

import (
"context"
"errors"

"go.mongodb.org/mongo-driver/bson/primitive"
"go.mongodb.org/mongo-driver/mongo"

"github.com/percona/percona-backup-mongodb/internal/ctrl"
"github.com/percona/percona-backup-mongodb/internal/defs"
"github.com/percona/percona-backup-mongodb/internal/errors"
"github.com/percona/percona-backup-mongodb/internal/log"
"github.com/percona/percona-backup-mongodb/internal/topo"
)

Expand All @@ -28,3 +30,29 @@ func GetClusterTime(ctx context.Context, m *mongo.Client) (Timestamp, error) {

return info.ClusterTime.ClusterTime, nil
}

func WaitForResync(ctx context.Context, c Client, cid CommandID) error {
ctx, cancel := context.WithCancel(ctx)
defer cancel()

r := &log.LogRequest{
LogKeys: log.LogKeys{
Event: string(ctrl.CmdResync),
OPID: string(cid),
Severity: log.Info,
},
}

outC, errC := log.Follow(ctx, c.(*clientImpl).conn.LogCollection(), r, false)

for {
select {
case entry := <-outC:
if entry != nil && entry.Msg == "succeed" {
return nil
}
case err := <-errC:
return err
}
}
}

0 comments on commit 9a0e29d

Please sign in to comment.