Skip to content

Commit

Permalink
Merge branch 'dev' into PBM-1454-Configuration-file-for-all-pbm-agent…
Browse files Browse the repository at this point in the history
…-s-options
  • Loading branch information
veceraj authored Dec 9, 2024
2 parents 10a5944 + d6bacd7 commit 59337f8
Show file tree
Hide file tree
Showing 9 changed files with 270 additions and 21 deletions.
25 changes: 25 additions & 0 deletions .github/workflows/codecov.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
name: 'codecov'

on:
push:
branches:
- dev

jobs:
go-test:
name: runner / go-test
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- uses: actions/setup-go@v4
with:
go-version: "1.22"
- name: test
run: go test -v ./... -covermode=atomic -coverprofile=cover.out

- name: upload coverage report
uses: codecov/codecov-action@v4
with:
file: cover.out
fail_ci_if_error: false
token: ${{ secrets.CODECOV_TOKEN }}
9 changes: 8 additions & 1 deletion .github/workflows/reviewdog.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,14 @@ jobs:
with:
go-version: "1.22"
- name: test
run: go test -v ./...
run: go test -v ./... -covermode=atomic -coverprofile=cover.out

- name: upload coverage report
uses: codecov/codecov-action@v4
with:
file: cover.out
fail_ci_if_error: false
token: ${{ secrets.CODECOV_TOKEN }}

shellcheck:
name: runner / shellcheck
Expand Down
17 changes: 17 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -112,3 +112,20 @@ install-stest-static:
$(ENVS_STATIC) go install -ldflags="$(LDFLAGS_STATIC)" $(BUILD_FLAGS) ./cmd/pbm-speed-test
install-static-entrypoint:
$(ENVS_STATIC) go install -ldflags="$(LDFLAGS_STATIC)" $(BUILD_FLAGS) ./cmd/pbm-agent-entrypoint

# BUILD WITH COVERAGE PROFILING
build-cover: build-pbm-cover build-agent-cover build-stest-cover
build-pbm-cover:
$(ENVS) go build -cover -ldflags="$(LDFLAGS)" $(BUILD_FLAGS) -o ./bin/pbm ./cmd/pbm
build-agent-cover:
$(ENVS) go build -cover -ldflags="$(LDFLAGS)" $(BUILD_FLAGS) -o ./bin/pbm-agent ./cmd/pbm-agent
build-stest-cover:
$(ENVS) go build -cover -ldflags="$(LDFLAGS)" $(BUILD_FLAGS) -o ./bin/pbm-speed-test ./cmd/pbm-speed-test

install-cover: install-pbm-cover install-agent-cover install-stest-cover
install-pbm-cover:
$(ENVS) go install -cover -ldflags="$(LDFLAGS)" $(BUILD_FLAGS) ./cmd/pbm
install-agent-cover:
$(ENVS) go install -cover -ldflags="$(LDFLAGS)" $(BUILD_FLAGS) ./cmd/pbm-agent
install-stest-cover:
$(ENVS) go install -cover -ldflags="$(LDFLAGS)" $(BUILD_FLAGS) ./cmd/pbm-speed-test
37 changes: 23 additions & 14 deletions cmd/pbm-agent/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (
)

const (
numInsertionWorkersDefault = 1
numInsertionWorkersDefault = 10
)

func (a *Agent) Restore(ctx context.Context, r *ctrl.RestoreCmd, opid ctrl.OPID, ep config.Epoch) {
Expand Down Expand Up @@ -135,19 +135,8 @@ func (a *Agent) Restore(ctx context.Context, r *ctrl.RestoreCmd, opid ctrl.OPID,
return
}

numParallelColls := runtime.NumCPU() / 2
if r.NumParallelColls != nil && *r.NumParallelColls > 0 {
numParallelColls = int(*r.NumParallelColls)
} else if cfg.Restore != nil && cfg.Restore.NumParallelCollections > 0 {
numParallelColls = cfg.Restore.NumParallelCollections
}

numInsertionWorkersPerCol := numInsertionWorkersDefault
if r.NumInsertionWorkers != nil && *r.NumInsertionWorkers > 0 {
numInsertionWorkersPerCol = int(*r.NumInsertionWorkers)
} else if cfg.Restore != nil && cfg.Restore.NumInsertionWorkers > 0 {
numInsertionWorkersPerCol = cfg.Restore.NumInsertionWorkers
}
numParallelColls := getNumParallelCollsConfig(r.NumParallelColls, cfg.Restore)
numInsertionWorkersPerCol := getNumInsertionWorkersConfig(r.NumInsertionWorkers, cfg.Restore)

rr := restore.New(a.leadConn, a.nodeConn, a.brief, cfg, r.RSMap, numParallelColls, numInsertionWorkersPerCol)
if r.OplogTS.IsZero() {
Expand Down Expand Up @@ -194,6 +183,26 @@ func (a *Agent) Restore(ctx context.Context, r *ctrl.RestoreCmd, opid ctrl.OPID,
l.Info("recovery successfully finished")
}

func getNumParallelCollsConfig(rParallelColls *int32, restoreConf *config.RestoreConf) int {
numParallelColls := runtime.NumCPU() / 2
if rParallelColls != nil && *rParallelColls > 0 {
numParallelColls = int(*rParallelColls)
} else if restoreConf != nil && restoreConf.NumParallelCollections > 0 {
numParallelColls = restoreConf.NumParallelCollections
}
return numParallelColls
}

func getNumInsertionWorkersConfig(rInsWorkers *int32, restoreConf *config.RestoreConf) int {
numInsertionWorkersPerCol := numInsertionWorkersDefault
if rInsWorkers != nil && int(*rInsWorkers) > 0 {
numInsertionWorkersPerCol = int(*rInsWorkers)
} else if restoreConf != nil && restoreConf.NumInsertionWorkers > 0 {
numInsertionWorkersPerCol = restoreConf.NumInsertionWorkers
}
return numInsertionWorkersPerCol
}

func addRestoreMetaWithError(
ctx context.Context,
conn connect.Client,
Expand Down
153 changes: 153 additions & 0 deletions cmd/pbm-agent/restore_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
package main

import (
"runtime"
"testing"

"github.com/percona/percona-backup-mongodb/pbm/config"
)

func TestGetNumInsertionWorkersConfig(t *testing.T) {
type args struct {
rInsWorkers *int32
cfg *config.RestoreConf
}

rZeroInsWorkers := int32(0)
rValidInsWorkers := int32(99)

tests := []struct {
name string
args args
want int
}{
{
name: "When no command line param and no Restore config, return default value",
args: args{
rInsWorkers: nil,
cfg: nil,
},
want: numInsertionWorkersDefault,
},
{
name: "When no command line param and no Restore.NumInsertionWorkers config, return default value",
args: args{
rInsWorkers: nil,
cfg: &config.RestoreConf{},
},
want: numInsertionWorkersDefault,
},
{
name: "When zero command line param, return default value",
args: args{
rInsWorkers: &rZeroInsWorkers,
cfg: &config.RestoreConf{},
},
want: numInsertionWorkersDefault,
},
{
name: "NumInsertionWorkers passed from commandline",
args: args{
rInsWorkers: &rValidInsWorkers,
cfg: nil,
},
want: 99,
},
{
name: "NumInsertionWorkers passed from config",
args: args{
rInsWorkers: nil,
cfg: &config.RestoreConf{NumInsertionWorkers: 42},
},
want: 42,
},
{
name: "NumInsertionWorkers passed from command line and from config, return from command line value",
args: args{
rInsWorkers: &rValidInsWorkers,
cfg: &config.RestoreConf{NumInsertionWorkers: 42},
},
want: 99,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := getNumInsertionWorkersConfig(tt.args.rInsWorkers, tt.args.cfg); got != tt.want {
t.Errorf("getNumInsertionWorkersConfig() = %v, want %v", got, tt.want)
}
})
}
}

func TestGetNumParallelCollsConfig(t *testing.T) {
type args struct {
rParallelColls *int32
restoreConf *config.RestoreConf
}

rZeroParallelColls := int32(0)
rValidParallelColls := int32(99)
defaultValue := runtime.NumCPU() / 2

tests := []struct {
name string
args args
want int
}{
{
name: "When no command line param and no Restore config, return default value",
args: args{
rParallelColls: nil,
restoreConf: nil,
},
want: defaultValue,
},
{
name: "When no command line param and no Restore.NumInsertionWorkers config, return default value",
args: args{
rParallelColls: nil,
restoreConf: &config.RestoreConf{},
},
want: defaultValue,
},
{
name: "When zero command line param, return default value",
args: args{
rParallelColls: &rZeroParallelColls,
restoreConf: &config.RestoreConf{},
},
want: defaultValue,
},
{
name: "NumInsertionWorkers passed from commandline",
args: args{
rParallelColls: &rValidParallelColls,
restoreConf: nil,
},
want: 99,
},
{
name: "NumInsertionWorkers passed from config",
args: args{
rParallelColls: nil,
restoreConf: &config.RestoreConf{NumParallelCollections: 42},
},
want: 42,
},
{
name: "NumInsertionWorkers passed from command line and from config, return from command line value",
args: args{
rParallelColls: &rValidParallelColls,
restoreConf: &config.RestoreConf{NumParallelCollections: 42},
},
want: 99,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := getNumParallelCollsConfig(tt.args.rParallelColls, tt.args.restoreConf); got != tt.want {
t.Errorf("getNumParallelCollsConfig() = %v, want %v", got, tt.want)
}
})
}
}
1 change: 0 additions & 1 deletion cmd/pbm/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,6 @@ func main() {
restoreCmd.Flag("num-insertion-workers-per-collection",
"Specifies the number of insertion workers to run concurrently per collection. For large imports, "+
"increasing the number of insertion workers may increase the speed of the import.").
Default("1").
Int32Var(&restore.numInsertionWorkers)
restoreCmd.Flag("ns", `Namespaces to restore (e.g. "db1.*,db2.collection2"). If not set, restore all ("*.*")`).
StringVar(&restore.ns)
Expand Down
9 changes: 5 additions & 4 deletions packaging/scripts/mongodb-backup_builder.sh
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,11 @@ get_system() {
ARCH=$(echo $(uname -m) | sed -e 's:i686:i386:g')
OS_NAME="el$RHEL"
OS="rpm"
elif [ -f /etc/amazon-linux-release ]; then
RHEL=$(rpm --eval %amzn)
ARCH=$(echo $(uname -m) | sed -e 's:i686:i386:g')
OS_NAME="amzn$RHEL"
OS="rpm"
else
ARCH=$(uname -m)
OS_NAME="$(lsb_release -sc)"
Expand Down Expand Up @@ -164,7 +169,6 @@ install_deps() {
CURPLACE=$(pwd)

if [ "x$OS" = "xrpm" ]; then
RHEL=$(rpm --eval %rhel)
yum clean all
yum -y install epel-release git wget
yum -y install rpm-build make rpmlint rpmdevtools golang krb5-devel
Expand Down Expand Up @@ -282,9 +286,6 @@ build_rpm() {
mkdir -vp rpmbuild/{SOURCES,SPECS,BUILD,SRPMS,RPMS}
cp $SRC_RPM rpmbuild/SRPMS/

RHEL=$(rpm --eval %rhel)
ARCH=$(echo $(uname -m) | sed -e 's:i686:i386:g')

echo "RHEL=${RHEL}" >>percona-backup-mongodb.properties
echo "ARCH=${ARCH}" >>percona-backup-mongodb.properties
[[ ${PATH} == *"/usr/local/go/bin"* && -x /usr/local/go/bin/go ]] || export PATH=/usr/local/go/bin:${PATH}
Expand Down
38 changes: 38 additions & 0 deletions pbm/restore/physical.go
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,39 @@ func waitMgoShutdown(dbpath string) error {
return nil
}

// waitToBecomePrimary pause execution until RS member becomes primary node.
// Error is returned in case of timeout.
// Unexpected error while getting node info is just logged.
func (r *PhysRestore) waitToBecomePrimary(ctx context.Context, m *mongo.Client) error {
tk := time.NewTicker(time.Second)
defer tk.Stop()

tout := time.NewTimer(2 * time.Minute)
defer tout.Stop()

for {
select {
case <-tk.C:
inf, err := topo.GetNodeInfo(ctx, m)
if err != nil {
r.log.Debug("get node info error while waiting to become primary: %v", err)
continue
}

if inf.IsPrimary {
return nil
}
r.log.Debug("node: %s is still not primary, waiting for another cycle", inf.Me)

case <-tout.C:
return errors.New("timeout while waiting the node to become primary")

case <-ctx.Done():
return ctx.Err()
}
}
}

// toState moves cluster to the given restore state.
// All communication happens via files in the restore dir on storage.
//
Expand Down Expand Up @@ -1347,6 +1380,11 @@ func (r *PhysRestore) replayOplog(
return errors.Wrap(err, "define mongo version")
}

err = r.waitToBecomePrimary(ctx, nodeConn)
if err != nil {
return errors.Wrap(err, "wait to become primary before applying oplog")
}

oplogOption := applyOplogOption{
start: &from,
end: &to,
Expand Down
2 changes: 1 addition & 1 deletion pbm/version/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (
)

// current PBM version
const version = "2.7.0"
const version = "2.8.0"

var (
platform string
Expand Down

0 comments on commit 59337f8

Please sign in to comment.