Skip to content

Commit

Permalink
Merge pull request #1073 from percona/PBM-1455-oplog-restore-tests-wi…
Browse files Browse the repository at this point in the history
…th-mocks

PBM-1455: Oplog restore tests
  • Loading branch information
boris-ilijic authored Dec 20, 2024
2 parents 2bd080e + fc616b1 commit 197d680
Show file tree
Hide file tree
Showing 148 changed files with 23,521 additions and 11,742 deletions.
7 changes: 4 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,14 @@ require (
github.com/fsnotify/fsnotify v1.7.0
github.com/golang/snappy v0.0.4
github.com/google/uuid v1.6.0
github.com/klauspost/compress v1.17.8
github.com/klauspost/compress v1.17.11
github.com/klauspost/pgzip v1.2.6
github.com/mongodb/mongo-tools v0.0.0-20240723193119-837c2bc263f4
github.com/pierrec/lz4 v2.6.1+incompatible
github.com/pkg/errors v0.9.1
github.com/spf13/cobra v1.8.1
github.com/spf13/viper v1.19.0
go.mongodb.org/mongo-driver v1.16.0
go.mongodb.org/mongo-driver v1.17.1
golang.org/x/mod v0.19.0
golang.org/x/sync v0.10.0
gopkg.in/yaml.v2 v2.4.0
Expand All @@ -30,6 +30,7 @@ require (
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 // indirect
github.com/alecthomas/units v0.0.0-20231202071711-9a357b53e9c9 // indirect
github.com/containerd/log v0.1.0 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/distribution/reference v0.5.0 // indirect
github.com/docker/go-connections v0.5.0 // indirect
github.com/docker/go-units v0.5.0 // indirect
Expand Down Expand Up @@ -60,7 +61,7 @@ require (
github.com/xdg-go/pbkdf2 v1.0.0 // indirect
github.com/xdg-go/scram v1.1.2 // indirect
github.com/xdg-go/stringprep v1.0.4 // indirect
github.com/youmark/pkcs8 v0.0.0-20201027041543-1326539a0a0a // indirect
github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.49.0 // indirect
go.opentelemetry.io/otel v1.24.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.24.0 // indirect
Expand Down
13 changes: 6 additions & 7 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,8 @@ github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7
github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU=
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/klauspost/compress v1.17.8 h1:YcnTYrq7MikUT7k0Yb5eceMmALQPYBW/Xltxn0NAMnU=
github.com/klauspost/compress v1.17.8/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw=
github.com/klauspost/compress v1.17.11 h1:In6xLpyWOi1+C7tXUUWv2ot1QvBjxevKAaI6IXrJmUc=
github.com/klauspost/compress v1.17.11/go.mod h1:pMDklpSncoRMuLFrf1W9Ss9KT+0rH90U12bZKk7uwG0=
github.com/klauspost/pgzip v1.2.6 h1:8RXeL5crjEUFnR2/Sn6GJNWtSQ3Dk8pq4CL3jvdDyjU=
github.com/klauspost/pgzip v1.2.6/go.mod h1:Ch1tH69qFZu15pkjo5kYi6mth2Zzwzt50oCQKQE9RUs=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
Expand Down Expand Up @@ -170,13 +170,13 @@ github.com/xdg-go/scram v1.1.2 h1:FHX5I5B4i4hKRVRBCFRxq1iQRej7WO3hhBuJf+UUySY=
github.com/xdg-go/scram v1.1.2/go.mod h1:RT/sEzTbU5y00aCK8UOx6R7YryM0iF1N2MOmC3kKLN4=
github.com/xdg-go/stringprep v1.0.4 h1:XLI/Ng3O1Atzq0oBs3TWm+5ZVgkq2aqdlvP9JtoZ6c8=
github.com/xdg-go/stringprep v1.0.4/go.mod h1:mPGuuIYwz7CmR2bT9j4GbQqutWS1zV24gijq1dTyGkM=
github.com/youmark/pkcs8 v0.0.0-20201027041543-1326539a0a0a h1:fZHgsYlfvtyqToslyjUt3VOPF4J7aK/3MPcK7xp3PDk=
github.com/youmark/pkcs8 v0.0.0-20201027041543-1326539a0a0a/go.mod h1:ul22v+Nro/R083muKhosV54bj5niojjWZvU8xrevuH4=
github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78 h1:ilQV1hzziu+LLM3zUTJ0trRztfwgjqKnBWNtSRkbmwM=
github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78/go.mod h1:aL8wCCfTfSfmXjznFBSZNN13rSJjlIOI1fUNAtF7rmI=
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
go.mongodb.org/mongo-driver v1.16.0 h1:tpRsfBJMROVHKpdGyc1BBEzzjDUWjItxbVSZ8Ls4BQ4=
go.mongodb.org/mongo-driver v1.16.0/go.mod h1:oB6AhJQvFQL4LEHyXi6aJzQJtBiTQHiAd83l0GdFaiw=
go.mongodb.org/mongo-driver v1.17.1 h1:Wic5cJIwJgSpBhe3lx3+/RybR5PiYRMpVFgO7cOHyIM=
go.mongodb.org/mongo-driver v1.17.1/go.mod h1:wwWm/+BuOddhcq3n68LKRmgk2wXzmF6s0SFOa0GINL4=
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.49.0 h1:jq9TW8u3so/bN+JPT166wjOI6/vQPF6Xe7nMNIltagk=
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.49.0/go.mod h1:p8pYQP+m5XfbZm9fxtSKAbM6oIllS7s2AfxrChvc7iw=
go.opentelemetry.io/otel v1.24.0 h1:0LAOdjNmQeSTzGBzduGe/rU4tZhMwL5rWgtp9Ku5Jfo=
Expand All @@ -199,7 +199,6 @@ go.uber.org/multierr v1.9.0 h1:7fIwc/ZtS0q++VgcfqFDxSBZVv/Xo49/SYnDFupUwlI=
go.uber.org/multierr v1.9.0/go.mod h1:X2jQV1h+kxSjClGpnseKVIxpmcjrj7MNnI0bnlfKTVQ=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20200302210943-78000ba7a073/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/crypto v0.31.0 h1:ihbySMvVjLAeSH1IbfcRTkD/iNscyz8rGzjF/E5hV6U=
Expand Down
84 changes: 84 additions & 0 deletions pbm/oplog/db.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package oplog

import (
"context"
"strings"

"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
"go.mongodb.org/mongo-driver/mongo"

"github.com/percona/percona-backup-mongodb/pbm/errors"
)

// mDB represents MongoDB access functionality.
type mDB struct {
m *mongo.Client
}

func newMDB(m *mongo.Client) *mDB {
return &mDB{m: m}
}

// getUUIDForNS ruturns UUID of existing collection.
// When ns doesn't exist, it returns zero value without an error.
// In case of error, it returns zero value for UUID in addition to error.
func (d *mDB) getUUIDForNS(ctx context.Context, ns string) (primitive.Binary, error) {
var uuid primitive.Binary

db, coll, _ := strings.Cut(ns, ".")
cur, err := d.m.Database(db).ListCollections(ctx, bson.D{{"name", coll}})
if err != nil {
return uuid, errors.Wrap(err, "list collections")
}
defer cur.Close(ctx)

for cur.Next(ctx) {
if subtype, data, ok := cur.Current.Lookup("info", "uuid").BinaryOK(); ok {
uuid = primitive.Binary{
Subtype: subtype,
Data: data,
}
break
}
}

return uuid, errors.Wrap(cur.Err(), "list collections cursor")
}

// ensureCollExists ensures that the collection exists before "creating" views or timeseries.
// See PBM-921 for details.
func (d *mDB) ensureCollExists(dbName string) error {
err := d.m.Database(dbName).CreateCollection(context.TODO(), "system.views")
if err != nil {
// MongoDB 5.0 and 6.0 returns NamespaceExists error.
// MongoDB 7.0 and 8.0 does not return error.
// https://github.com/mongodb/mongo/blob/v6.0/src/mongo/base/error_codes.yml#L84
const NamespaceExists = 48
var cmdError mongo.CommandError
if !errors.As(err, &cmdError) || cmdError.Code != NamespaceExists {
return errors.Wrapf(err, "ensure %s.system.views collection", dbName)
}
}

return nil
}

// applyOps is a wrapper for the applyOps database command, we pass in
// a session to avoid opening a new connection for a few inserts at a time.
func (d *mDB) applyOps(entries []interface{}) error {
singleRes := d.m.Database("admin").RunCommand(context.TODO(), bson.D{{"applyOps", entries}})
if err := singleRes.Err(); err != nil {
return errors.Wrap(err, "applyOps")
}
res := bson.M{}
err := singleRes.Decode(&res)
if err != nil {
return errors.Wrap(err, "decode singleRes")
}
if isFalsy(res["ok"]) {
return errors.Errorf("applyOps command: %v", res["errmsg"])
}

return nil
}
57 changes: 57 additions & 0 deletions pbm/oplog/db_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package oplog

import (
"context"
"strings"
"testing"

"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
"go.mongodb.org/mongo-driver/mongo/integration/mtest"
)

func TestGetUUIDForNS(t *testing.T) {
mt := mtest.New(t, mtest.NewOptions().ClientType(mtest.Mock))

mt.Run("successful response from db", func(mt *mtest.T) {
expectedUUID := primitive.Binary{Subtype: 0xFF, Data: []byte{0x01, 0x02, 0x03}}
listCollRes := bson.D{
{"name", "c1"},
{"type", "collection"},
{"info", bson.D{
{"readOnly", false},
{"uuid", expectedUUID},
}},
}
mt.AddMockResponses(mtest.CreateCursorResponse(1, "mydb.c1", mtest.FirstBatch, listCollRes))

db := newMDB(mt.Client)
uuid, err := db.getUUIDForNS(context.Background(), "mydb.c1")
if err != nil {
t.Errorf("got err=%v", err)
}
primitive.NewObjectID()

if !uuid.Equal(expectedUUID) {
t.Errorf("wrong uuid for ns: expected=%v, got=%v", expectedUUID, uuid)
}
t.Log(uuid)
})

mt.Run("failed response from db", func(mt *mtest.T) {
errRes := mtest.CreateCommandErrorResponse(mtest.CommandError{
Code: 11601,
Name: "error",
Message: "querying list collections",
})
mt.AddMockResponses(errRes)
db := newMDB(mt.Client)
_, err := db.getUUIDForNS(context.Background(), "mydb.c1")
if err == nil {
t.Error("expected to get error from getUUIDForNS")
}
if !strings.Contains(err.Error(), "list collections") {
t.Error("wrong err")
}
})
}
78 changes: 17 additions & 61 deletions pbm/oplog/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,9 +118,16 @@ func (c *cloneNS) SetNSPair(nsPair snapshot.CloneNS) {
c.toDB, c.toColl = nsPair.SplitToNS()
}

// mDBCl represents client interface for MongoDB logic used by OplogRestore
type mDBCl interface {
getUUIDForNS(ctx context.Context, ns string) (primitive.Binary, error)
ensureCollExists(dbName string) error
applyOps(entries []interface{}) error
}

// OplogRestore is the oplog applyer
type OplogRestore struct {
dst *mongo.Client
mdb mDBCl
ver *db.Version
needIdxWorkaround bool
preserveUUIDopt bool
Expand Down Expand Up @@ -157,15 +164,15 @@ const saveLastDistTxns = 100

// NewOplogRestore creates an object for an oplog applying
func NewOplogRestore(
dst *mongo.Client,
m *mongo.Client,
ic *idx.IndexCatalog,
sv *version.MongoVersion,
unsafe,
preserveUUID bool,
ctxn chan phys.RestoreTxn,
txnErr chan error,
) (*OplogRestore, error) {
m, err := ns.NewMatcher(append(snapshot.ExcludeFromRestore, excludeFromOplog...))
matcher, err := ns.NewMatcher(append(snapshot.ExcludeFromRestore, excludeFromOplog...))
if err != nil {
return nil, errors.Wrap(err, "create matcher for the collections exclude")
}
Expand All @@ -185,13 +192,13 @@ func NewOplogRestore(
}
ver := &db.Version{v[0], v[1], v[2]}
return &OplogRestore{
dst: dst,
mdb: newMDB(m),
ver: ver,
preserveUUIDopt: preserveUUID,
preserveUUID: preserveUUID,
needIdxWorkaround: needsCreateIndexWorkaround(ver),
indexCatalog: ic,
excludeNS: m,
excludeNS: matcher,
noUUIDns: noUUID,
txn: ctxn,
txnSyncErr: txnErr,
Expand Down Expand Up @@ -298,7 +305,7 @@ func (o *OplogRestore) SetCloneNS(ctx context.Context, ns snapshot.CloneNS) erro
o.cloneNS.SetNSPair(ns)

var err error
o.cloneNS.toUUID, err = getUUIDForNS(ctx, o.dst, o.cloneNS.ToNS)
o.cloneNS.toUUID, err = o.mdb.getUUIDForNS(ctx, o.cloneNS.ToNS)
if err != nil {
return errors.Wrap(err, "get to ns uuid")
}
Expand Down Expand Up @@ -956,20 +963,14 @@ func (o *OplogRestore) handleNonTxnOp(op db.Oplog) error {
}
} else if op.Operation == "i" && collName == "system.views" {
// PBM-921: ensure the collection exists before "creating" views or timeseries
err := o.dst.Database(dbName).CreateCollection(context.TODO(), "system.views")
err := o.mdb.ensureCollExists(dbName)
if err != nil {
// MongoDB 5.0 and 6.0 returns NamespaceExists error.
// MongoDB 7.0 and 8.0 does not return error.
// https://github.com/mongodb/mongo/blob/v6.0/src/mongo/base/error_codes.yml#L84
const NamespaceExists = 48
var cmdError mongo.CommandError
if !errors.As(err, &cmdError) || cmdError.Code != NamespaceExists {
return errors.Wrapf(err, "ensure %s.system.views collection", dbName)
}
return err
}

}

err = o.applyOps([]interface{}{op})
err = o.mdb.applyOps([]interface{}{op})
if err != nil {
// https://jira.percona.com/browse/PBM-818
if o.unsafe && op.Namespace == "config.chunks" {
Expand Down Expand Up @@ -1071,25 +1072,6 @@ func extractIndexDocumentFromCommitIndexBuilds(op db.Oplog) (string, []*idx.Inde
return collectionName, nil
}

// applyOps is a wrapper for the applyOps database command, we pass in
// a session to avoid opening a new connection for a few inserts at a time.
func (o *OplogRestore) applyOps(entries []interface{}) error {
singleRes := o.dst.Database("admin").RunCommand(context.TODO(), bson.D{{"applyOps", entries}})
if err := singleRes.Err(); err != nil {
return errors.Wrap(err, "applyOps")
}
res := bson.M{}
err := singleRes.Decode(&res)
if err != nil {
return errors.Wrap(err, "decode singleRes")
}
if isFalsy(res["ok"]) {
return errors.Errorf("applyOps command: %v", res["errmsg"])
}

return nil
}

// filterUUIDs removes 'ui' entries from ops, including nested applyOps ops.
// It also modifies ops that rely on 'ui'.
func (o *OplogRestore) filterUUIDs(op db.Oplog) (db.Oplog, error) {
Expand Down Expand Up @@ -1261,29 +1243,3 @@ func isTruthy(val interface{}) bool {
func isFalsy(val interface{}) bool {
return !isTruthy(val)
}

// getUUIDForNS ruturns UUID of existing collection.
// When ns doesn't exist, it returns zero value without an error.
// In case of error, it returns zero value for UUID in addition to error.
func getUUIDForNS(ctx context.Context, m *mongo.Client, ns string) (primitive.Binary, error) {
var uuid primitive.Binary

d, c, _ := strings.Cut(ns, ".")
cur, err := m.Database(d).ListCollections(ctx, bson.D{{"name", c}})
if err != nil {
return uuid, errors.Wrap(err, "list collections")
}
defer cur.Close(ctx)

for cur.Next(ctx) {
if subtype, data, ok := cur.Current.Lookup("info", "uuid").BinaryOK(); ok {
uuid = primitive.Binary{
Subtype: subtype,
Data: data,
}
break
}
}

return uuid, errors.Wrap(cur.Err(), "list collections cursor")
}
Loading

0 comments on commit 197d680

Please sign in to comment.