Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PBM-1455: Oplog restore tests #1073

Merged
merged 9 commits into from
Dec 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,14 @@ require (
github.com/fsnotify/fsnotify v1.7.0
github.com/golang/snappy v0.0.4
github.com/google/uuid v1.6.0
github.com/klauspost/compress v1.17.8
github.com/klauspost/compress v1.17.11
github.com/klauspost/pgzip v1.2.6
github.com/mongodb/mongo-tools v0.0.0-20240723193119-837c2bc263f4
github.com/pierrec/lz4 v2.6.1+incompatible
github.com/pkg/errors v0.9.1
github.com/spf13/cobra v1.8.1
github.com/spf13/viper v1.19.0
go.mongodb.org/mongo-driver v1.16.0
go.mongodb.org/mongo-driver v1.17.1
golang.org/x/mod v0.19.0
golang.org/x/sync v0.10.0
gopkg.in/yaml.v2 v2.4.0
Expand All @@ -30,6 +30,7 @@ require (
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 // indirect
github.com/alecthomas/units v0.0.0-20231202071711-9a357b53e9c9 // indirect
github.com/containerd/log v0.1.0 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/distribution/reference v0.5.0 // indirect
github.com/docker/go-connections v0.5.0 // indirect
github.com/docker/go-units v0.5.0 // indirect
Expand Down Expand Up @@ -60,7 +61,7 @@ require (
github.com/xdg-go/pbkdf2 v1.0.0 // indirect
github.com/xdg-go/scram v1.1.2 // indirect
github.com/xdg-go/stringprep v1.0.4 // indirect
github.com/youmark/pkcs8 v0.0.0-20201027041543-1326539a0a0a // indirect
github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.49.0 // indirect
go.opentelemetry.io/otel v1.24.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.24.0 // indirect
Expand Down
13 changes: 6 additions & 7 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,8 @@ github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7
github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU=
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/klauspost/compress v1.17.8 h1:YcnTYrq7MikUT7k0Yb5eceMmALQPYBW/Xltxn0NAMnU=
github.com/klauspost/compress v1.17.8/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw=
github.com/klauspost/compress v1.17.11 h1:In6xLpyWOi1+C7tXUUWv2ot1QvBjxevKAaI6IXrJmUc=
github.com/klauspost/compress v1.17.11/go.mod h1:pMDklpSncoRMuLFrf1W9Ss9KT+0rH90U12bZKk7uwG0=
github.com/klauspost/pgzip v1.2.6 h1:8RXeL5crjEUFnR2/Sn6GJNWtSQ3Dk8pq4CL3jvdDyjU=
github.com/klauspost/pgzip v1.2.6/go.mod h1:Ch1tH69qFZu15pkjo5kYi6mth2Zzwzt50oCQKQE9RUs=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
Expand Down Expand Up @@ -170,13 +170,13 @@ github.com/xdg-go/scram v1.1.2 h1:FHX5I5B4i4hKRVRBCFRxq1iQRej7WO3hhBuJf+UUySY=
github.com/xdg-go/scram v1.1.2/go.mod h1:RT/sEzTbU5y00aCK8UOx6R7YryM0iF1N2MOmC3kKLN4=
github.com/xdg-go/stringprep v1.0.4 h1:XLI/Ng3O1Atzq0oBs3TWm+5ZVgkq2aqdlvP9JtoZ6c8=
github.com/xdg-go/stringprep v1.0.4/go.mod h1:mPGuuIYwz7CmR2bT9j4GbQqutWS1zV24gijq1dTyGkM=
github.com/youmark/pkcs8 v0.0.0-20201027041543-1326539a0a0a h1:fZHgsYlfvtyqToslyjUt3VOPF4J7aK/3MPcK7xp3PDk=
github.com/youmark/pkcs8 v0.0.0-20201027041543-1326539a0a0a/go.mod h1:ul22v+Nro/R083muKhosV54bj5niojjWZvU8xrevuH4=
github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78 h1:ilQV1hzziu+LLM3zUTJ0trRztfwgjqKnBWNtSRkbmwM=
github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78/go.mod h1:aL8wCCfTfSfmXjznFBSZNN13rSJjlIOI1fUNAtF7rmI=
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
go.mongodb.org/mongo-driver v1.16.0 h1:tpRsfBJMROVHKpdGyc1BBEzzjDUWjItxbVSZ8Ls4BQ4=
go.mongodb.org/mongo-driver v1.16.0/go.mod h1:oB6AhJQvFQL4LEHyXi6aJzQJtBiTQHiAd83l0GdFaiw=
go.mongodb.org/mongo-driver v1.17.1 h1:Wic5cJIwJgSpBhe3lx3+/RybR5PiYRMpVFgO7cOHyIM=
go.mongodb.org/mongo-driver v1.17.1/go.mod h1:wwWm/+BuOddhcq3n68LKRmgk2wXzmF6s0SFOa0GINL4=
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.49.0 h1:jq9TW8u3so/bN+JPT166wjOI6/vQPF6Xe7nMNIltagk=
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.49.0/go.mod h1:p8pYQP+m5XfbZm9fxtSKAbM6oIllS7s2AfxrChvc7iw=
go.opentelemetry.io/otel v1.24.0 h1:0LAOdjNmQeSTzGBzduGe/rU4tZhMwL5rWgtp9Ku5Jfo=
Expand All @@ -199,7 +199,6 @@ go.uber.org/multierr v1.9.0 h1:7fIwc/ZtS0q++VgcfqFDxSBZVv/Xo49/SYnDFupUwlI=
go.uber.org/multierr v1.9.0/go.mod h1:X2jQV1h+kxSjClGpnseKVIxpmcjrj7MNnI0bnlfKTVQ=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20200302210943-78000ba7a073/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/crypto v0.31.0 h1:ihbySMvVjLAeSH1IbfcRTkD/iNscyz8rGzjF/E5hV6U=
Expand Down
84 changes: 84 additions & 0 deletions pbm/oplog/db.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package oplog

import (
"context"
"strings"

"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
"go.mongodb.org/mongo-driver/mongo"

"github.com/percona/percona-backup-mongodb/pbm/errors"
)

// mDB represents MongoDB access functionality.
type mDB struct {
m *mongo.Client
}

func newMDB(m *mongo.Client) *mDB {
return &mDB{m: m}
}

// getUUIDForNS ruturns UUID of existing collection.
// When ns doesn't exist, it returns zero value without an error.
// In case of error, it returns zero value for UUID in addition to error.
func (d *mDB) getUUIDForNS(ctx context.Context, ns string) (primitive.Binary, error) {
var uuid primitive.Binary

db, coll, _ := strings.Cut(ns, ".")
cur, err := d.m.Database(db).ListCollections(ctx, bson.D{{"name", coll}})
if err != nil {
return uuid, errors.Wrap(err, "list collections")
}
defer cur.Close(ctx)

for cur.Next(ctx) {
if subtype, data, ok := cur.Current.Lookup("info", "uuid").BinaryOK(); ok {
uuid = primitive.Binary{
Subtype: subtype,
Data: data,
}
break
}
}

return uuid, errors.Wrap(cur.Err(), "list collections cursor")
}

// ensureCollExists ensures that the collection exists before "creating" views or timeseries.
// See PBM-921 for details.
func (d *mDB) ensureCollExists(dbName string) error {
err := d.m.Database(dbName).CreateCollection(context.TODO(), "system.views")
if err != nil {
// MongoDB 5.0 and 6.0 returns NamespaceExists error.
// MongoDB 7.0 and 8.0 does not return error.
// https://github.com/mongodb/mongo/blob/v6.0/src/mongo/base/error_codes.yml#L84
const NamespaceExists = 48
var cmdError mongo.CommandError
if !errors.As(err, &cmdError) || cmdError.Code != NamespaceExists {
return errors.Wrapf(err, "ensure %s.system.views collection", dbName)
}
}

return nil
}

// applyOps is a wrapper for the applyOps database command, we pass in
// a session to avoid opening a new connection for a few inserts at a time.
func (d *mDB) applyOps(entries []interface{}) error {
singleRes := d.m.Database("admin").RunCommand(context.TODO(), bson.D{{"applyOps", entries}})
if err := singleRes.Err(); err != nil {
return errors.Wrap(err, "applyOps")
}
res := bson.M{}
err := singleRes.Decode(&res)
if err != nil {
return errors.Wrap(err, "decode singleRes")
}
if isFalsy(res["ok"]) {
return errors.Errorf("applyOps command: %v", res["errmsg"])
}

return nil
}
57 changes: 57 additions & 0 deletions pbm/oplog/db_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package oplog

import (
"context"
"strings"
"testing"

"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
"go.mongodb.org/mongo-driver/mongo/integration/mtest"
)

func TestGetUUIDForNS(t *testing.T) {
mt := mtest.New(t, mtest.NewOptions().ClientType(mtest.Mock))

mt.Run("successful response from db", func(mt *mtest.T) {
expectedUUID := primitive.Binary{Subtype: 0xFF, Data: []byte{0x01, 0x02, 0x03}}
listCollRes := bson.D{
{"name", "c1"},
{"type", "collection"},
{"info", bson.D{
{"readOnly", false},
{"uuid", expectedUUID},
}},
}
mt.AddMockResponses(mtest.CreateCursorResponse(1, "mydb.c1", mtest.FirstBatch, listCollRes))

db := newMDB(mt.Client)
uuid, err := db.getUUIDForNS(context.Background(), "mydb.c1")
if err != nil {
t.Errorf("got err=%v", err)
}
primitive.NewObjectID()

if !uuid.Equal(expectedUUID) {
t.Errorf("wrong uuid for ns: expected=%v, got=%v", expectedUUID, uuid)
}
t.Log(uuid)
})

mt.Run("failed response from db", func(mt *mtest.T) {
errRes := mtest.CreateCommandErrorResponse(mtest.CommandError{
Code: 11601,
Name: "error",
Message: "querying list collections",
})
mt.AddMockResponses(errRes)
db := newMDB(mt.Client)
_, err := db.getUUIDForNS(context.Background(), "mydb.c1")
if err == nil {
t.Error("expected to get error from getUUIDForNS")
}
if !strings.Contains(err.Error(), "list collections") {
t.Error("wrong err")
}
})
}
78 changes: 17 additions & 61 deletions pbm/oplog/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,9 +118,16 @@ func (c *cloneNS) SetNSPair(nsPair snapshot.CloneNS) {
c.toDB, c.toColl = nsPair.SplitToNS()
}

// mDBCl represents client interface for MongoDB logic used by OplogRestore
type mDBCl interface {
getUUIDForNS(ctx context.Context, ns string) (primitive.Binary, error)
ensureCollExists(dbName string) error
applyOps(entries []interface{}) error
}

// OplogRestore is the oplog applyer
type OplogRestore struct {
dst *mongo.Client
mdb mDBCl
ver *db.Version
needIdxWorkaround bool
preserveUUIDopt bool
Expand Down Expand Up @@ -157,15 +164,15 @@ const saveLastDistTxns = 100

// NewOplogRestore creates an object for an oplog applying
func NewOplogRestore(
dst *mongo.Client,
m *mongo.Client,
ic *idx.IndexCatalog,
sv *version.MongoVersion,
unsafe,
preserveUUID bool,
ctxn chan phys.RestoreTxn,
txnErr chan error,
) (*OplogRestore, error) {
m, err := ns.NewMatcher(append(snapshot.ExcludeFromRestore, excludeFromOplog...))
matcher, err := ns.NewMatcher(append(snapshot.ExcludeFromRestore, excludeFromOplog...))
if err != nil {
return nil, errors.Wrap(err, "create matcher for the collections exclude")
}
Expand All @@ -185,13 +192,13 @@ func NewOplogRestore(
}
ver := &db.Version{v[0], v[1], v[2]}
return &OplogRestore{
dst: dst,
mdb: newMDB(m),
ver: ver,
preserveUUIDopt: preserveUUID,
preserveUUID: preserveUUID,
needIdxWorkaround: needsCreateIndexWorkaround(ver),
indexCatalog: ic,
excludeNS: m,
excludeNS: matcher,
noUUIDns: noUUID,
txn: ctxn,
txnSyncErr: txnErr,
Expand Down Expand Up @@ -298,7 +305,7 @@ func (o *OplogRestore) SetCloneNS(ctx context.Context, ns snapshot.CloneNS) erro
o.cloneNS.SetNSPair(ns)

var err error
o.cloneNS.toUUID, err = getUUIDForNS(ctx, o.dst, o.cloneNS.ToNS)
o.cloneNS.toUUID, err = o.mdb.getUUIDForNS(ctx, o.cloneNS.ToNS)
if err != nil {
return errors.Wrap(err, "get to ns uuid")
}
Expand Down Expand Up @@ -956,20 +963,14 @@ func (o *OplogRestore) handleNonTxnOp(op db.Oplog) error {
}
} else if op.Operation == "i" && collName == "system.views" {
// PBM-921: ensure the collection exists before "creating" views or timeseries
err := o.dst.Database(dbName).CreateCollection(context.TODO(), "system.views")
err := o.mdb.ensureCollExists(dbName)
if err != nil {
// MongoDB 5.0 and 6.0 returns NamespaceExists error.
// MongoDB 7.0 and 8.0 does not return error.
// https://github.com/mongodb/mongo/blob/v6.0/src/mongo/base/error_codes.yml#L84
const NamespaceExists = 48
var cmdError mongo.CommandError
if !errors.As(err, &cmdError) || cmdError.Code != NamespaceExists {
return errors.Wrapf(err, "ensure %s.system.views collection", dbName)
}
return err
}

}

err = o.applyOps([]interface{}{op})
err = o.mdb.applyOps([]interface{}{op})
if err != nil {
// https://jira.percona.com/browse/PBM-818
if o.unsafe && op.Namespace == "config.chunks" {
Expand Down Expand Up @@ -1071,25 +1072,6 @@ func extractIndexDocumentFromCommitIndexBuilds(op db.Oplog) (string, []*idx.Inde
return collectionName, nil
}

// applyOps is a wrapper for the applyOps database command, we pass in
// a session to avoid opening a new connection for a few inserts at a time.
func (o *OplogRestore) applyOps(entries []interface{}) error {
singleRes := o.dst.Database("admin").RunCommand(context.TODO(), bson.D{{"applyOps", entries}})
if err := singleRes.Err(); err != nil {
return errors.Wrap(err, "applyOps")
}
res := bson.M{}
err := singleRes.Decode(&res)
if err != nil {
return errors.Wrap(err, "decode singleRes")
}
if isFalsy(res["ok"]) {
return errors.Errorf("applyOps command: %v", res["errmsg"])
}

return nil
}

// filterUUIDs removes 'ui' entries from ops, including nested applyOps ops.
// It also modifies ops that rely on 'ui'.
func (o *OplogRestore) filterUUIDs(op db.Oplog) (db.Oplog, error) {
Expand Down Expand Up @@ -1261,29 +1243,3 @@ func isTruthy(val interface{}) bool {
func isFalsy(val interface{}) bool {
return !isTruthy(val)
}

// getUUIDForNS ruturns UUID of existing collection.
// When ns doesn't exist, it returns zero value without an error.
// In case of error, it returns zero value for UUID in addition to error.
func getUUIDForNS(ctx context.Context, m *mongo.Client, ns string) (primitive.Binary, error) {
var uuid primitive.Binary

d, c, _ := strings.Cut(ns, ".")
cur, err := m.Database(d).ListCollections(ctx, bson.D{{"name", c}})
if err != nil {
return uuid, errors.Wrap(err, "list collections")
}
defer cur.Close(ctx)

for cur.Next(ctx) {
if subtype, data, ok := cur.Current.Lookup("info", "uuid").BinaryOK(); ok {
uuid = primitive.Binary{
Subtype: subtype,
Data: data,
}
break
}
}

return uuid, errors.Wrap(cur.Err(), "list collections cursor")
}
Loading
Loading