Skip to content

Commit

Permalink
K8SPSMDB-1212: Disable balancer during backups/restores
Browse files Browse the repository at this point in the history
  • Loading branch information
egegunes committed Nov 9, 2024
1 parent 4e0b70e commit 8eb5ae4
Show file tree
Hide file tree
Showing 16 changed files with 193 additions and 128 deletions.
1 change: 0 additions & 1 deletion e2e-tests/balancer/run
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
#!/bin/bash

set -o errexit
set -o xtrace

test_dir=$(realpath "$(dirname "$0")")
. "${test_dir}/../functions"
Expand Down
4 changes: 4 additions & 0 deletions pkg/apis/psmdb/v1/psmdb_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,10 @@ type BalancerSpec struct {
Enabled *bool `json:"enabled,omitempty"`
}

func (b *BalancerSpec) IsEnabled() bool {
return b.Enabled == nil || *b.Enabled
}

type UpgradeOptions struct {
VersionServiceEndpoint string `json:"versionServiceEndpoint,omitempty"`
Apply UpgradeStrategy `json:"apply,omitempty"`
Expand Down
11 changes: 8 additions & 3 deletions pkg/controller/perconaservermongodb/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,10 +278,15 @@ func (r *ReconcilePerconaServerMongoDB) isRestoreRunning(ctx context.Context, cr
}

for _, rst := range restores.Items {
if rst.Status.State != api.RestoreStateReady && rst.Status.State != api.RestoreStateNew && rst.Status.State != api.RestoreStateError &&
rst.Spec.ClusterName == cr.Name {
return true, nil
if rst.Spec.ClusterName != cr.Name {
continue
}

if rst.Status.State == api.RestoreStateReady || rst.Status.State == api.RestoreStateError {
continue
}

return true, nil
}

return false, nil
Expand Down
20 changes: 17 additions & 3 deletions pkg/controller/perconaservermongodb/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,13 @@ import (
func (r *ReconcilePerconaServerMongoDB) enableBalancerIfNeeded(ctx context.Context, cr *api.PerconaServerMongoDB) error {
log := logf.FromContext(ctx)

if s := cr.Spec.Sharding; !s.Enabled || s.Mongos.Size == 0 || cr.Spec.Unmanaged || (s.Balancer.Enabled != nil && !*s.Balancer.Enabled) || cr.DeletionTimestamp != nil || cr.Spec.Pause {
if s := cr.Spec.Sharding; !s.Enabled ||
s.Mongos.Size == 0 ||
!s.Balancer.IsEnabled() ||
cr.Spec.Unmanaged ||
cr.DeletionTimestamp != nil ||
cr.Spec.Pause {

return nil
}

Expand All @@ -26,12 +32,17 @@ func (r *ReconcilePerconaServerMongoDB) enableBalancerIfNeeded(ctx context.Conte
return errors.Wrap(err, "failed to check if all sfs are up to date")
}

bcpRunning, err := r.isBackupRunning(ctx, cr)
if err != nil {
return errors.Wrap(err, "failed to check running backups")
}

rstRunning, err := r.isRestoreRunning(ctx, cr)
if err != nil {
return errors.Wrap(err, "failed to check running restores")
}

if !uptodate || rstRunning {
if !uptodate || bcpRunning || rstRunning {
return nil
}

Expand Down Expand Up @@ -104,7 +115,10 @@ func (r *ReconcilePerconaServerMongoDB) enableBalancerIfNeeded(ctx context.Conte
}

func (r *ReconcilePerconaServerMongoDB) disableBalancerIfNeeded(ctx context.Context, cr *api.PerconaServerMongoDB) error {
if s := cr.Spec.Sharding; !s.Enabled || s.Mongos.Size == 0 || cr.Spec.Unmanaged || s.Balancer.Enabled == nil || *s.Balancer.Enabled {
if s := cr.Spec.Sharding; !s.Enabled ||
s.Mongos.Size == 0 ||
s.Balancer.IsEnabled() ||
cr.Spec.Unmanaged {
return nil
}
return r.disableBalancer(ctx, cr)
Expand Down
34 changes: 1 addition & 33 deletions pkg/controller/perconaservermongodb/connections.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (

"github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
"sigs.k8s.io/controller-runtime/pkg/client"

api "github.com/percona/percona-server-mongodb-operator/pkg/apis/psmdb/v1"
"github.com/percona/percona-server-mongodb-operator/pkg/psmdb"
Expand All @@ -20,42 +19,11 @@ type MongoClientProvider interface {

func (r *ReconcilePerconaServerMongoDB) MongoClientProvider() MongoClientProvider {
if r.mongoClientProvider == nil {
return &mongoClientProvider{r.client}
return &psmdb.MongoClientProvider{K8sClient: r.client}
}
return r.mongoClientProvider
}

type mongoClientProvider struct {
k8sclient client.Client
}

func (p *mongoClientProvider) Mongo(ctx context.Context, cr *api.PerconaServerMongoDB, rs *api.ReplsetSpec, role api.SystemUserRole) (mongo.Client, error) {
c, err := getInternalCredentials(ctx, p.k8sclient, cr, role)
if err != nil {
return nil, errors.Wrap(err, "failed to get credentials")
}

return psmdb.MongoClient(ctx, p.k8sclient, cr, rs, c)
}

func (p *mongoClientProvider) Mongos(ctx context.Context, cr *api.PerconaServerMongoDB, role api.SystemUserRole) (mongo.Client, error) {
c, err := getInternalCredentials(ctx, p.k8sclient, cr, role)
if err != nil {
return nil, errors.Wrap(err, "failed to get credentials")
}

return psmdb.MongosClient(ctx, p.k8sclient, cr, c)
}

func (p *mongoClientProvider) Standalone(ctx context.Context, cr *api.PerconaServerMongoDB, role api.SystemUserRole, host string, tlsEnabled bool) (mongo.Client, error) {
c, err := getInternalCredentials(ctx, p.k8sclient, cr, role)
if err != nil {
return nil, errors.Wrap(err, "failed to get credentials")
}

return psmdb.StandaloneClient(ctx, p.k8sclient, cr, c, host, tlsEnabled)
}

func (r *ReconcilePerconaServerMongoDB) mongoClientWithRole(ctx context.Context, cr *api.PerconaServerMongoDB, rs *api.ReplsetSpec, role api.SystemUserRole) (mongo.Client, error) {
return r.MongoClientProvider().Mongo(ctx, cr, rs, role)
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/controller/perconaservermongodb/custom_users.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
logf "sigs.k8s.io/controller-runtime/pkg/log"

api "github.com/percona/percona-server-mongodb-operator/pkg/apis/psmdb/v1"
"github.com/percona/percona-server-mongodb-operator/pkg/psmdb"
"github.com/percona/percona-server-mongodb-operator/pkg/psmdb/mongo"
)

Expand Down Expand Up @@ -86,7 +87,7 @@ func (r *ReconcilePerconaServerMongoDB) reconcileCustomUsers(ctx context.Context
user.PasswordSecretRef.Key = "password"
}

sec, err := getUserSecret(ctx, r.client, cr, user.PasswordSecretRef.Name)
sec, err := psmdb.GetUserSecret(ctx, r.client, cr, user.PasswordSecretRef.Name)
if err != nil {
log.Error(err, "failed to get user secret", "user", user)
continue
Expand Down
4 changes: 2 additions & 2 deletions pkg/controller/perconaservermongodb/mgo.go
Original file line number Diff line number Diff line change
Expand Up @@ -710,7 +710,7 @@ func (r *ReconcilePerconaServerMongoDB) handleReplsetInit(ctx context.Context, c
time.Sleep(time.Second * 5)

log.Info("creating user admin", "replset", replsetName, "pod", pod.Name, "user", api.RoleUserAdmin)
userAdmin, err := getInternalCredentials(ctx, r.client, cr, api.RoleUserAdmin)
userAdmin, err := psmdb.GetInternalCredentials(ctx, r.client, cr, api.RoleUserAdmin)
if err != nil {
return errors.Wrap(err, "failed to get userAdmin credentials")
}
Expand Down Expand Up @@ -976,7 +976,7 @@ func (r *ReconcilePerconaServerMongoDB) createOrUpdateSystemUsers(ctx context.Co
}

for _, role := range users {
creds, err := getInternalCredentials(ctx, r.client, cr, role)
creds, err := psmdb.GetInternalCredentials(ctx, r.client, cr, role)
if err != nil {
log.Error(err, "failed to get credentials", "role", role)
continue
Expand Down
31 changes: 0 additions & 31 deletions pkg/controller/perconaservermongodb/psmdb_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -872,33 +872,6 @@ func (r *ReconcilePerconaServerMongoDB) deleteCfgIfNeeded(ctx context.Context, c
return nil
}

func (r *ReconcilePerconaServerMongoDB) stopMongosInCaseOfRestore(ctx context.Context, cr *api.PerconaServerMongoDB) error {
if !cr.Spec.Sharding.Enabled {
return nil
}

rstRunning, err := r.isRestoreRunning(ctx, cr)
if err != nil {
return errors.Wrap(err, "failed to check running restores")
}

if !rstRunning {
return nil
}

err = r.disableBalancer(ctx, cr)
if err != nil {
return errors.Wrap(err, "failed to disable balancer")
}

err = r.deleteMongos(ctx, cr)
if err != nil {
return errors.Wrap(err, "failed to delete mongos")
}

return nil
}

func (r *ReconcilePerconaServerMongoDB) upgradeFCVIfNeeded(ctx context.Context, cr *api.PerconaServerMongoDB, newFCV string) error {
if !cr.Spec.UpgradeOptions.SetFCV || newFCV == "" {
return nil
Expand Down Expand Up @@ -1121,10 +1094,6 @@ func (r *ReconcilePerconaServerMongoDB) createOrUpdateConfigMap(ctx context.Cont
}

func (r *ReconcilePerconaServerMongoDB) reconcileMongos(ctx context.Context, cr *api.PerconaServerMongoDB) error {
if err := r.stopMongosInCaseOfRestore(ctx, cr); err != nil {
return errors.Wrap(err, "on restore")
}

if err := r.reconcileMongosStatefulset(ctx, cr); err != nil {
return errors.Wrap(err, "reconcile mongos")
}
Expand Down
46 changes: 0 additions & 46 deletions pkg/controller/perconaservermongodb/secrets.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,58 +10,12 @@ import (
k8serrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"

api "github.com/percona/percona-server-mongodb-operator/pkg/apis/psmdb/v1"
"github.com/percona/percona-server-mongodb-operator/pkg/naming"
"github.com/percona/percona-server-mongodb-operator/pkg/psmdb"
"github.com/percona/percona-server-mongodb-operator/pkg/psmdb/secret"
)

func getUserSecret(ctx context.Context, cl client.Reader, cr *api.PerconaServerMongoDB, name string) (corev1.Secret, error) {
secrets := corev1.Secret{}
err := cl.Get(ctx, types.NamespacedName{Name: name, Namespace: cr.Namespace}, &secrets)
return secrets, errors.Wrap(err, "get user secrets")
}

func getInternalCredentials(ctx context.Context, cl client.Reader, cr *api.PerconaServerMongoDB, role api.SystemUserRole) (psmdb.Credentials, error) {
return getCredentials(ctx, cl, cr, api.UserSecretName(cr), role)
}

func getCredentials(ctx context.Context, cl client.Reader, cr *api.PerconaServerMongoDB, name string, role api.SystemUserRole) (psmdb.Credentials, error) {
creds := psmdb.Credentials{}
usersSecret, err := getUserSecret(ctx, cl, cr, name)
if err != nil {
return creds, errors.Wrap(err, "failed to get user secret")
}

switch role {
case api.RoleDatabaseAdmin:
creds.Username = string(usersSecret.Data[api.EnvMongoDBDatabaseAdminUser])
creds.Password = string(usersSecret.Data[api.EnvMongoDBDatabaseAdminPassword])
case api.RoleClusterAdmin:
creds.Username = string(usersSecret.Data[api.EnvMongoDBClusterAdminUser])
creds.Password = string(usersSecret.Data[api.EnvMongoDBClusterAdminPassword])
case api.RoleUserAdmin:
creds.Username = string(usersSecret.Data[api.EnvMongoDBUserAdminUser])
creds.Password = string(usersSecret.Data[api.EnvMongoDBUserAdminPassword])
case api.RoleClusterMonitor:
creds.Username = string(usersSecret.Data[api.EnvMongoDBClusterMonitorUser])
creds.Password = string(usersSecret.Data[api.EnvMongoDBClusterMonitorPassword])
case api.RoleBackup:
creds.Username = string(usersSecret.Data[api.EnvMongoDBBackupUser])
creds.Password = string(usersSecret.Data[api.EnvMongoDBBackupPassword])
default:
return creds, errors.Errorf("not implemented for role: %s", role)
}

if creds.Username == "" || creds.Password == "" {
return creds, errors.Errorf("can't find credentials for role %s", role)
}

return creds, nil
}

func (r *ReconcilePerconaServerMongoDB) reconcileUsersSecret(ctx context.Context, cr *api.PerconaServerMongoDB) error {
secretObj := corev1.Secret{}
err := r.client.Get(ctx,
Expand Down
22 changes: 19 additions & 3 deletions pkg/controller/perconaservermongodbbackup/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/percona/percona-backup-mongodb/pbm/defs"
pbmErrors "github.com/percona/percona-backup-mongodb/pbm/errors"
api "github.com/percona/percona-server-mongodb-operator/pkg/apis/psmdb/v1"
"github.com/percona/percona-server-mongodb-operator/pkg/psmdb"
"github.com/percona/percona-server-mongodb-operator/pkg/psmdb/backup"
)

Expand Down Expand Up @@ -44,19 +45,34 @@ func (r *ReconcilePerconaServerMongoDBBackup) newBackup(ctx context.Context, clu

// Start requests backup on PBM
func (b *Backup) Start(ctx context.Context, k8sclient client.Client, cluster *api.PerconaServerMongoDB, cr *api.PerconaServerMongoDBBackup) (api.PerconaServerMongoDBBackupStatus, error) {
log := logf.FromContext(ctx)
log.Info("Starting backup", "backup", cr.Name, "storage", cr.Spec.StorageName)
log := logf.FromContext(ctx).WithValues("backup", cr.Name, "storage", cr.Spec.StorageName)

log.Info("Starting backup")

var status api.PerconaServerMongoDBBackupStatus

if cluster.Spec.Sharding.Enabled {
provider := psmdb.NewClientProvider(k8sclient)
mongos, err := provider.Mongos(ctx, cluster, api.RoleClusterAdmin)
if err != nil {
return status, errors.Wrap(err, "get mongos session")
}
defer mongos.Disconnect(ctx)

log.Info("Stopping balancer")
if err := mongos.StopBalancer(ctx); err != nil {
return status, errors.Wrap(err, "stop balancer")
}
}

stg, ok := b.spec.Storages[cr.Spec.StorageName]
if !ok {
return status, errors.Errorf("unable to get storage '%s'", cr.Spec.StorageName)
}

err := b.pbm.GetNSetConfig(ctx, k8sclient, cluster, stg)
if err != nil {
return api.PerconaServerMongoDBBackupStatus{}, errors.Wrapf(err, "set backup config with storage %s", cr.Spec.StorageName)
return status, errors.Wrapf(err, "set backup config with storage %s", cr.Spec.StorageName)
}

name := time.Now().UTC().Format(time.RFC3339)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ func (r *ReconcilePerconaServerMongoDBBackup) Reconcile(ctx context.Context, req
log.Error(err, "failed to make backup", "backup", cr.Name)
}
if cr.Status.State != status.State || cr.Status.Error != status.Error {
log.Info("Backup state changed", "previous", cr.Status.State, "current", status.State)
cr.Status = status
uerr := r.updateStatus(ctx, cr)
if uerr != nil {
Expand Down
1 change: 1 addition & 0 deletions pkg/controller/perconaservermongodbrestore/logical.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ func runRestore(ctx context.Context, backup string, pbmc backup.PBM, pitr *psmdb

cfg, err := pbmc.GetConfig(ctx)
if err != nil {
return "", errors.Wrap(err, "get PBM config")
}

if err := pbmc.ResyncStorage(ctx, &cfg.Storage); err != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

"github.com/percona/percona-server-mongodb-operator/clientcmd"
psmdbv1 "github.com/percona/percona-server-mongodb-operator/pkg/apis/psmdb/v1"
"github.com/percona/percona-server-mongodb-operator/pkg/psmdb"
"github.com/percona/percona-server-mongodb-operator/pkg/psmdb/backup"
"github.com/percona/percona-server-mongodb-operator/pkg/util"
"github.com/percona/percona-server-mongodb-operator/version"
Expand Down Expand Up @@ -184,6 +185,36 @@ func (r *ReconcilePerconaServerMongoDBRestore) Reconcile(ctx context.Context, re
}

if cr.Status.State == psmdbv1.RestoreStateNew {
if cluster.Spec.Sharding.Enabled {
provider := psmdb.NewClientProvider(r.client)
mongos, err := provider.Mongos(ctx, cluster, psmdbv1.RoleClusterAdmin)
if err != nil {
return rr, errors.Wrap(err, "get mongos session")
}
defer mongos.Disconnect(ctx)

log.Info("Stopping balancer")
if err := mongos.StopBalancer(ctx); err != nil {
return rr, errors.Wrap(err, "stop balancer")
}

log.Info("Terminating mongos pods")
err = r.client.Delete(ctx, psmdb.MongosStatefulset(cluster))
if err != nil && !k8serrors.IsNotFound(err) {
return rr, errors.Wrap(err, "failed to delete mongos statefulset")
}

mongosPods, err := psmdb.GetMongosPods(ctx, r.client, cluster)
if err != nil {
return rr, errors.Wrap(err, "get mongos pods")
}

if len(mongosPods.Items) > 0 {
log.Info("Waiting for mongos pods to terminate")
return rr, nil
}
}

err = r.validate(ctx, cr, cluster)
if err != nil {
if errors.Is(err, errWaitingPBM) || errors.Is(err, errWaitingRestore) {
Expand Down
5 changes: 0 additions & 5 deletions pkg/psmdb/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,6 @@ import (
"github.com/percona/percona-server-mongodb-operator/pkg/psmdb/tls"
)

type Credentials struct {
Username string
Password string
}

func MongoClient(ctx context.Context, k8sclient client.Client, cr *api.PerconaServerMongoDB, rs *api.ReplsetSpec, c Credentials) (mongo.Client, error) {
pods, err := GetRSPods(ctx, k8sclient, cr, rs.Name)
if err != nil {
Expand Down
Loading

0 comments on commit 8eb5ae4

Please sign in to comment.