diff --git a/api/v1alpha1/perconaservermysql_types.go b/api/v1alpha1/perconaservermysql_types.go index 861ae026e..da1b5b3e8 100644 --- a/api/v1alpha1/perconaservermysql_types.go +++ b/api/v1alpha1/perconaservermysql_types.go @@ -628,6 +628,7 @@ func (cr *PerconaServerMySQL) CheckNSetDefaults(ctx context.Context, serverVersi if cr.Spec.MySQL.ClusterType == ClusterTypeGR && !cr.Spec.AllowUnsafeConfig { if cr.Spec.MySQL.Size < MinSafeGRSize { + log.Info("Setting safe defaults, updating MySQL cluster size", "oldSize", cr.Spec.MySQL.Size, "newSafeSize", MinSafeGRSize) cr.Spec.MySQL.Size = MinSafeGRSize } @@ -646,6 +647,7 @@ func (cr *PerconaServerMySQL) CheckNSetDefaults(ctx context.Context, serverVersi if cr.RouterEnabled() && !cr.Spec.AllowUnsafeConfig { if cr.Spec.Proxy.Router.Size < MinSafeProxySize { + log.Info("Setting safe defaults, updating Router size", "oldSize", cr.Spec.Proxy.Router.Size, "newSafeSize", MinSafeProxySize) cr.Spec.Proxy.Router.Size = MinSafeProxySize } } @@ -656,6 +658,7 @@ func (cr *PerconaServerMySQL) CheckNSetDefaults(ctx context.Context, serverVersi if cr.HAProxyEnabled() && !cr.Spec.AllowUnsafeConfig { if cr.Spec.Proxy.HAProxy.Size < MinSafeProxySize { + log.Info("Setting safe defaults, updating HAProxy size", "oldSize", cr.Spec.Proxy.HAProxy.Size, "newSafeSize", MinSafeProxySize) cr.Spec.Proxy.HAProxy.Size = MinSafeProxySize } } diff --git a/cmd/bootstrap/async_replication.go b/cmd/bootstrap/async_replication.go index 25959293a..9679ca53a 100644 --- a/cmd/bootstrap/async_replication.go +++ b/cmd/bootstrap/async_replication.go @@ -11,8 +11,9 @@ import ( "k8s.io/apimachinery/pkg/util/sets" apiv1alpha1 "github.com/percona/percona-server-mysql-operator/api/v1alpha1" + database "github.com/percona/percona-server-mysql-operator/cmd/db" + mysqldb "github.com/percona/percona-server-mysql-operator/pkg/db" "github.com/percona/percona-server-mysql-operator/pkg/mysql" - "github.com/percona/percona-server-mysql-operator/pkg/replicator" ) func bootstrapAsyncReplication(ctx context.Context) error { @@ -87,9 +88,9 @@ func bootstrapAsyncReplication(ctx context.Context) error { return errors.Wrapf(err, "get %s password", apiv1alpha1.UserOperator) } - db, err := replicator.NewReplicator(ctx, apiv1alpha1.UserOperator, operatorPass, podIp, mysql.DefaultAdminPort) + db, err := database.NewDatabase(ctx, apiv1alpha1.UserOperator, operatorPass, podIp, mysql.DefaultAdminPort) if err != nil { - return errors.Wrap(err, "connect to db") + return errors.Wrap(err, "connect to database") } defer db.Close() @@ -148,7 +149,7 @@ func bootstrapAsyncReplication(ctx context.Context) error { log.Printf("Cloning from %s", donor) err = db.Clone(ctx, donor, string(apiv1alpha1.UserOperator), operatorPass, mysql.DefaultAdminPort) timer.Stop("clone") - if err != nil && !errors.Is(err, replicator.ErrRestartAfterClone) { + if err != nil && !errors.Is(err, database.ErrRestartAfterClone) { return errors.Wrapf(err, "clone from donor %s", donor) } @@ -174,7 +175,7 @@ func bootstrapAsyncReplication(ctx context.Context) error { return errors.Wrap(err, "check replication status") } - if rStatus == replicator.ReplicationStatusNotInitiated { + if rStatus == mysqldb.ReplicationStatusNotInitiated { log.Println("configuring replication") replicaPass, err := getSecret(apiv1alpha1.UserReplication) @@ -208,7 +209,7 @@ func getTopology(ctx context.Context, peers sets.Set[string]) (string, []string, } for _, peer := range sets.List(peers) { - db, err := replicator.NewReplicator(ctx, apiv1alpha1.UserOperator, operatorPass, peer, mysql.DefaultAdminPort) + db, err := database.NewDatabase(ctx, apiv1alpha1.UserOperator, operatorPass, peer, mysql.DefaultAdminPort) if err != nil { return "", nil, errors.Wrapf(err, "connect to %s", peer) } @@ -228,7 +229,7 @@ func getTopology(ctx context.Context, peers sets.Set[string]) (string, []string, } replicas.Insert(replicaHost) - if status == replicator.ReplicationStatusActive { + if status == mysqldb.ReplicationStatusActive { primary = source } } @@ -255,7 +256,7 @@ func selectDonor(ctx context.Context, fqdn, primary string, replicas []string) ( } for _, replica := range replicas { - db, err := replicator.NewReplicator(ctx, apiv1alpha1.UserOperator, operatorPass, replica, mysql.DefaultAdminPort) + db, err := database.NewDatabase(ctx, apiv1alpha1.UserOperator, operatorPass, replica, mysql.DefaultAdminPort) if err != nil { continue } diff --git a/cmd/db/db.go b/cmd/db/db.go new file mode 100644 index 000000000..a887b1318 --- /dev/null +++ b/cmd/db/db.go @@ -0,0 +1,240 @@ +package db + +import ( + "context" + "database/sql" + "fmt" + + "github.com/go-sql-driver/mysql" + "github.com/pkg/errors" + + apiv1alpha1 "github.com/percona/percona-server-mysql-operator/api/v1alpha1" + "github.com/percona/percona-server-mysql-operator/pkg/db" +) + +const defaultChannelName = "" + +var ErrRestartAfterClone = errors.New("Error 3707: Restart server failed (mysqld is not managed by supervisor process).") + +type ReplicationStatus int8 + +type DB struct { + db *sql.DB +} + +func NewDatabase(ctx context.Context, user apiv1alpha1.SystemUser, pass, host string, port int32) (*DB, error) { + config := mysql.NewConfig() + + config.User = string(user) + config.Passwd = pass + config.Net = "tcp" + config.Addr = fmt.Sprintf("%s:%d", host, port) + config.DBName = "performance_schema" + config.Params = map[string]string{ + "interpolateParams": "true", + "timeout": "10s", + "readTimeout": "10s", + "writeTimeout": "10s", + "tls": "preferred", + } + + db, err := sql.Open("mysql", config.FormatDSN()) + if err != nil { + return nil, errors.Wrap(err, "connect to MySQL") + } + + if err := db.PingContext(ctx); err != nil { + return nil, errors.Wrap(err, "ping DB") + } + + return &DB{db}, nil +} + +func (d *DB) StartReplication(ctx context.Context, host, replicaPass string, port int32) error { + // TODO: Make retries configurable + _, err := d.db.ExecContext(ctx, ` + CHANGE REPLICATION SOURCE TO + SOURCE_USER=?, + SOURCE_PASSWORD=?, + SOURCE_HOST=?, + SOURCE_PORT=?, + SOURCE_SSL=1, + SOURCE_CONNECTION_AUTO_FAILOVER=1, + SOURCE_AUTO_POSITION=1, + SOURCE_RETRY_COUNT=3, + SOURCE_CONNECT_RETRY=60 + `, apiv1alpha1.UserReplication, replicaPass, host, port) + if err != nil { + return errors.Wrap(err, "exec CHANGE REPLICATION SOURCE TO") + } + + _, err = d.db.ExecContext(ctx, "START REPLICA") + return errors.Wrap(err, "start replication") +} + +func (d *DB) StopReplication(ctx context.Context) error { + _, err := d.db.ExecContext(ctx, "STOP REPLICA") + return errors.Wrap(err, "stop replication") +} + +func (d *DB) ResetReplication(ctx context.Context) error { + _, err := d.db.ExecContext(ctx, "RESET REPLICA ALL") + return errors.Wrap(err, "reset replication") +} + +func (d *DB) ReplicationStatus(ctx context.Context) (db.ReplicationStatus, string, error) { + row := d.db.QueryRowContext(ctx, ` + SELECT + connection_status.SERVICE_STATE, + applier_status.SERVICE_STATE, + HOST + FROM replication_connection_status connection_status + JOIN replication_connection_configuration connection_configuration + ON connection_status.channel_name = connection_configuration.channel_name + JOIN replication_applier_status applier_status + ON connection_status.channel_name = applier_status.channel_name + WHERE connection_status.channel_name = ? + `, defaultChannelName) + + var ioState, sqlState, host string + if err := row.Scan(&ioState, &sqlState, &host); err != nil { + if errors.Is(err, sql.ErrNoRows) { + return db.ReplicationStatusNotInitiated, "", nil + } + return db.ReplicationStatusError, "", errors.Wrap(err, "scan replication status") + } + + if ioState == "ON" && sqlState == "ON" { + return db.ReplicationStatusActive, host, nil + } + + return db.ReplicationStatusNotInitiated, "", nil +} + +func (d *DB) IsReplica(ctx context.Context) (bool, error) { + status, _, err := d.ReplicationStatus(ctx) + return status == db.ReplicationStatusActive, errors.Wrap(err, "get replication status") +} + +func (d *DB) DisableSuperReadonly(ctx context.Context) error { + _, err := d.db.ExecContext(ctx, "SET GLOBAL SUPER_READ_ONLY=0") + return errors.Wrap(err, "set global super_read_only param to 0") +} + +func (d *DB) IsReadonly(ctx context.Context) (bool, error) { + var readonly int + err := d.db.QueryRowContext(ctx, "select @@read_only and @@super_read_only").Scan(&readonly) + return readonly == 1, errors.Wrap(err, "select global read_only param") +} + +func (d *DB) ReportHost(ctx context.Context) (string, error) { + var reportHost string + err := d.db.QueryRowContext(ctx, "select @@report_host").Scan(&reportHost) + return reportHost, errors.Wrap(err, "select report_host param") +} + +func (d *DB) Close() error { + return d.db.Close() +} + +func (d *DB) CloneInProgress(ctx context.Context) (bool, error) { + rows, err := d.db.QueryContext(ctx, "SELECT STATE FROM clone_status") + if err != nil { + return false, errors.Wrap(err, "fetch clone status") + } + defer rows.Close() + + for rows.Next() { + var state string + if err := rows.Scan(&state); err != nil { + return false, errors.Wrap(err, "scan rows") + } + + if state != "Completed" && state != "Failed" { + return true, nil + } + } + + return false, nil +} + +func (d *DB) Clone(ctx context.Context, donor, user, pass string, port int32) error { + _, err := d.db.ExecContext(ctx, "SET GLOBAL clone_valid_donor_list=?", fmt.Sprintf("%s:%d", donor, port)) + if err != nil { + return errors.Wrap(err, "set clone_valid_donor_list") + } + + _, err = d.db.ExecContext(ctx, "CLONE INSTANCE FROM ?@?:? IDENTIFIED BY ?", user, donor, port, pass) + + mErr, ok := err.(*mysql.MySQLError) + if !ok { + return errors.Wrap(err, "clone instance") + } + + // Error 3707: Restart server failed (mysqld is not managed by supervisor process). + if mErr.Number == uint16(3707) { + return ErrRestartAfterClone + } + + return nil +} + +func (d *DB) DumbQuery(ctx context.Context) error { + _, err := d.db.ExecContext(ctx, "SELECT 1") + return errors.Wrap(err, "SELECT 1") +} + +func (d *DB) GetMemberState(ctx context.Context, host string) (db.MemberState, error) { + var state db.MemberState + + err := d.db.QueryRowContext(ctx, "SELECT MEMBER_STATE FROM replication_group_members WHERE MEMBER_HOST=?", host).Scan(&state) + if err != nil { + if errors.Is(err, sql.ErrNoRows) { + return db.MemberStateOffline, nil + } + return db.MemberStateError, errors.Wrap(err, "query member state") + } + + return state, nil +} + +func (d *DB) CheckIfInPrimaryPartition(ctx context.Context) (bool, error) { + var in bool + + err := d.db.QueryRowContext(ctx, ` + SELECT + MEMBER_STATE = 'ONLINE' + AND ( + ( + SELECT + COUNT(*) + FROM + performance_schema.replication_group_members + WHERE + MEMBER_STATE NOT IN ('ONLINE', 'RECOVERING') + ) >= ( + ( + SELECT + COUNT(*) + FROM + performance_schema.replication_group_members + ) / 2 + ) = 0 + ) + FROM + performance_schema.replication_group_members + JOIN performance_schema.replication_group_member_stats USING(member_id) + WHERE + member_id = @@global.server_uuid; + `).Scan(&in) + if err != nil { + return false, err + } + + return in, nil +} + +func (d *DB) EnableSuperReadonly(ctx context.Context) error { + _, err := d.db.ExecContext(ctx, "SET GLOBAL SUPER_READ_ONLY=1") + return errors.Wrap(err, "set global super_read_only param to 1") +} diff --git a/cmd/healthcheck/main.go b/cmd/healthcheck/main.go index 5dd982f2d..cac0020d9 100644 --- a/cmd/healthcheck/main.go +++ b/cmd/healthcheck/main.go @@ -13,9 +13,10 @@ import ( "github.com/pkg/errors" apiv1alpha1 "github.com/percona/percona-server-mysql-operator/api/v1alpha1" + database "github.com/percona/percona-server-mysql-operator/cmd/db" + mysqldb "github.com/percona/percona-server-mysql-operator/pkg/db" "github.com/percona/percona-server-mysql-operator/pkg/k8s" "github.com/percona/percona-server-mysql-operator/pkg/mysql" - "github.com/percona/percona-server-mysql-operator/pkg/replicator" ) func main() { @@ -81,7 +82,7 @@ func checkReadinessAsync(ctx context.Context) error { return errors.Wrapf(err, "get %s password", apiv1alpha1.UserMonitor) } - db, err := replicator.NewReplicator(ctx, apiv1alpha1.UserMonitor, monitorPass, podIP, mysql.DefaultAdminPort) + db, err := database.NewDatabase(ctx, apiv1alpha1.UserMonitor, monitorPass, podIP, mysql.DefaultAdminPort) if err != nil { return errors.Wrap(err, "connect to db") } @@ -116,7 +117,7 @@ func checkReadinessGR(ctx context.Context) error { return errors.Wrapf(err, "get %s password", apiv1alpha1.UserMonitor) } - db, err := replicator.NewReplicator(ctx, apiv1alpha1.UserMonitor, monitorPass, podIP, mysql.DefaultAdminPort) + db, err := database.NewDatabase(ctx, apiv1alpha1.UserMonitor, monitorPass, podIP, mysql.DefaultAdminPort) if err != nil { return errors.Wrap(err, "connect to db") } @@ -132,7 +133,7 @@ func checkReadinessGR(ctx context.Context) error { return errors.Wrap(err, "get member state") } - if state != replicator.MemberStateOnline { + if state != mysqldb.MemberStateOnline { return errors.Errorf("Member state: %s", state) } @@ -150,7 +151,7 @@ func checkLivenessAsync(ctx context.Context) error { return errors.Wrapf(err, "get %s password", apiv1alpha1.UserMonitor) } - db, err := replicator.NewReplicator(ctx, apiv1alpha1.UserMonitor, monitorPass, podIP, mysql.DefaultAdminPort) + db, err := database.NewDatabase(ctx, apiv1alpha1.UserMonitor, monitorPass, podIP, mysql.DefaultAdminPort) if err != nil { return errors.Wrap(err, "connect to db") } @@ -170,7 +171,7 @@ func checkLivenessGR(ctx context.Context) error { return errors.Wrapf(err, "get %s password", apiv1alpha1.UserMonitor) } - db, err := replicator.NewReplicator(ctx, apiv1alpha1.UserMonitor, monitorPass, podIP, mysql.DefaultAdminPort) + db, err := database.NewDatabase(ctx, apiv1alpha1.UserMonitor, monitorPass, podIP, mysql.DefaultAdminPort) if err != nil { return errors.Wrap(err, "connect to db") } @@ -201,7 +202,7 @@ func checkReplication(ctx context.Context) error { return errors.Wrapf(err, "get %s password", apiv1alpha1.UserMonitor) } - db, err := replicator.NewReplicator(ctx, apiv1alpha1.UserMonitor, monitorPass, podIP, mysql.DefaultAdminPort) + db, err := database.NewDatabase(ctx, apiv1alpha1.UserMonitor, monitorPass, podIP, mysql.DefaultAdminPort) if err != nil { return errors.Wrap(err, "connect to db") } diff --git a/cmd/manager/main.go b/cmd/manager/main.go index 2bca7945f..5ff5535fd 100644 --- a/cmd/manager/main.go +++ b/cmd/manager/main.go @@ -163,6 +163,7 @@ func main() { Client: nsClient, Scheme: mgr.GetScheme(), ServerVersion: serverVersion, + ClientCmd: cliCmd, }).SetupWithManager(mgr); err != nil { setupLog.Error(err, "unable to create controller", "controller", "PerconaServerMySQLBackup") os.Exit(1) diff --git a/e2e-tests/license/compare/go-licenses b/e2e-tests/license/compare/go-licenses index 9cff4042d..bfe5cc51f 100644 --- a/e2e-tests/license/compare/go-licenses +++ b/e2e-tests/license/compare/go-licenses @@ -3,4 +3,3 @@ BSD-2-Clause BSD-3-Clause ISC MIT -MPL-2.0 diff --git a/e2e-tests/license/compare/golicense b/e2e-tests/license/compare/golicense index 64653f306..db7c44349 100644 --- a/e2e-tests/license/compare/golicense +++ b/e2e-tests/license/compare/golicense @@ -3,4 +3,3 @@ BSD 2-Clause "Simplified" License BSD 3-Clause "New" or "Revised" License ISC License MIT License -Mozilla Public License 2.0 diff --git a/go.mod b/go.mod index ffd6d9063..0630c3239 100644 --- a/go.mod +++ b/go.mod @@ -15,7 +15,7 @@ require ( github.com/go-sql-driver/mysql v1.7.1 github.com/gocarina/gocsv v0.0.0-20230616125104-99d496ca653d github.com/minio/minio-go/v7 v7.0.66 - github.com/onsi/ginkgo/v2 v2.13.2 + github.com/onsi/ginkgo/v2 v2.14.0 github.com/onsi/gomega v1.30.0 github.com/pkg/errors v0.9.1 github.com/sjmudd/stopwatch v0.1.1 @@ -27,12 +27,13 @@ require ( k8s.io/apimachinery v0.29.0 k8s.io/client-go v0.29.0 k8s.io/utils v0.0.0-20230726121419-3b25d923346b - sigs.k8s.io/controller-runtime v0.16.3 + sigs.k8s.io/controller-runtime v0.17.0 ) require ( github.com/google/gnostic-models v0.6.8 // indirect github.com/gorilla/websocket v1.5.0 // indirect + github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0 // indirect github.com/moby/spdystream v0.2.0 // indirect github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f // indirect go.opentelemetry.io/otel/metric v1.20.0 // indirect @@ -53,10 +54,10 @@ require ( github.com/dustin/go-humanize v1.0.1 // indirect github.com/emicklei/go-restful/v3 v3.11.0 // indirect github.com/evanphx/json-patch v5.6.0+incompatible // indirect - github.com/evanphx/json-patch/v5 v5.6.0 // indirect - github.com/fsnotify/fsnotify v1.6.0 // indirect + github.com/evanphx/json-patch/v5 v5.8.0 // indirect + github.com/fsnotify/fsnotify v1.7.0 // indirect github.com/go-logr/stdr v1.2.2 // indirect - github.com/go-logr/zapr v1.2.4 // indirect + github.com/go-logr/zapr v1.3.0 // indirect github.com/go-openapi/analysis v0.22.0 // indirect github.com/go-openapi/jsonpointer v0.20.2 // indirect github.com/go-openapi/jsonreference v0.20.4 // indirect @@ -79,7 +80,6 @@ require ( github.com/klauspost/compress v1.17.4 // indirect github.com/klauspost/cpuid/v2 v2.2.6 // indirect github.com/mailru/easyjson v0.7.7 // indirect - github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect github.com/minio/md5-simd v1.1.2 // indirect github.com/minio/sha256-simd v1.0.1 // indirect github.com/mitchellh/mapstructure v1.5.0 // indirect @@ -89,10 +89,10 @@ require ( github.com/oklog/ulid v1.3.1 // indirect github.com/opentracing/opentracing-go v1.2.0 // indirect github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect - github.com/prometheus/client_golang v1.16.0 // indirect - github.com/prometheus/client_model v0.4.0 // indirect - github.com/prometheus/common v0.44.0 // indirect - github.com/prometheus/procfs v0.10.1 // indirect + github.com/prometheus/client_golang v1.18.0 // indirect + github.com/prometheus/client_model v0.5.0 // indirect + github.com/prometheus/common v0.45.0 // indirect + github.com/prometheus/procfs v0.12.0 // indirect github.com/rs/xid v1.5.0 // indirect github.com/sergi/go-diff v1.3.1 // indirect github.com/sirupsen/logrus v1.9.3 // indirect @@ -111,11 +111,11 @@ require ( golang.org/x/crypto v0.17.0 // indirect golang.org/x/net v0.19.0 // indirect golang.org/x/oauth2 v0.15.0 // indirect - golang.org/x/sys v0.15.0 // indirect + golang.org/x/sys v0.16.0 // indirect golang.org/x/term v0.15.0 // indirect golang.org/x/text v0.14.0 // indirect golang.org/x/time v0.5.0 // indirect - golang.org/x/tools v0.14.0 // indirect + golang.org/x/tools v0.16.1 // indirect gomodules.xyz/jsonpatch/v2 v2.4.0 // indirect google.golang.org/appengine v1.6.8 // indirect google.golang.org/genproto v0.0.0-20231212172506-995d672761c0 // indirect @@ -124,12 +124,12 @@ require ( gopkg.in/ini.v1 v1.67.0 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect - k8s.io/apiextensions-apiserver v0.28.3 // indirect - k8s.io/component-base v0.28.3 // indirect + k8s.io/apiextensions-apiserver v0.29.0 // indirect + k8s.io/component-base v0.29.0 // indirect k8s.io/klog/v2 v2.110.1 // indirect k8s.io/kube-openapi v0.0.0-20231010175941-2dd684a91f00 // indirect sigs.k8s.io/gateway-api v0.8.0 // indirect sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect sigs.k8s.io/structured-merge-diff/v4 v4.4.1 // indirect - sigs.k8s.io/yaml v1.3.0 // indirect + sigs.k8s.io/yaml v1.4.0 // indirect ) diff --git a/go.sum b/go.sum index bab1a45d7..0bc435695 100644 --- a/go.sum +++ b/go.sum @@ -52,23 +52,22 @@ github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1m github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= github.com/evanphx/json-patch v5.6.0+incompatible h1:jBYDEEiFBPxA0v50tFdvOzQQTCvpL6mnFh5mB2/l16U= github.com/evanphx/json-patch v5.6.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk= -github.com/evanphx/json-patch/v5 v5.6.0 h1:b91NhWfaz02IuVxO9faSllyAtNXHMPkC5J8sJCLunww= -github.com/evanphx/json-patch/v5 v5.6.0/go.mod h1:G79N1coSVB93tBe7j6PhzjmR3/2VvlbKOFpnXhI9Bw4= +github.com/evanphx/json-patch/v5 v5.8.0 h1:lRj6N9Nci7MvzrXuX6HFzU8XjmhPiXPlsKEy1u0KQro= +github.com/evanphx/json-patch/v5 v5.8.0/go.mod h1:VNkHZ/282BpEyt/tObQO8s5CMPmYYq14uClGH4abBuQ= github.com/flosch/pongo2 v0.0.0-20200913210552-0d938eb266f3 h1:fmFk0Wt3bBxxwZnu48jqMdaOR/IZ4vdtJFuaFV8MpIE= github.com/flosch/pongo2 v0.0.0-20200913210552-0d938eb266f3/go.mod h1:bJWSKrZyQvfTnb2OudyUjurSG4/edverV7n82+K3JiM= -github.com/fsnotify/fsnotify v1.6.0 h1:n+5WquG0fcWoWp6xPWfHdbskMCQaFnG6PfBrh1Ky4HY= -github.com/fsnotify/fsnotify v1.6.0/go.mod h1:sl3t1tCWJFWoRz9R8WJCbQihKKwmorjAbSClcnxKAGw= +github.com/fsnotify/fsnotify v1.7.0 h1:8JEhPFa5W2WU7YfeZzPNqzMP6Lwt7L2715Ggo0nosvA= +github.com/fsnotify/fsnotify v1.7.0/go.mod h1:40Bi/Hjc2AVfZrqy+aj+yEI+/bRxZnMJyTJwOpGvigM= github.com/go-kit/log v0.1.0/go.mod h1:zbhenjAZHb184qTLMA9ZjW7ThYL0H2mk7Q6pNt4vbaY= github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG1KdI/P7A= github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= -github.com/go-logr/logr v1.2.4/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= github.com/go-logr/logr v1.3.0/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= github.com/go-logr/logr v1.4.1 h1:pKouT5E8xu9zeFC39JXRDukb6JFQPXM5p5I91188VAQ= github.com/go-logr/logr v1.4.1/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= -github.com/go-logr/zapr v1.2.4 h1:QHVo+6stLbfJmYGkQ7uGHUCu5hnAFAj6mDe6Ea0SeOo= -github.com/go-logr/zapr v1.2.4/go.mod h1:FyHWQIzQORZ0QVE1BtVHv3cKtNLuXsbNLtpuhNapBOA= +github.com/go-logr/zapr v1.3.0 h1:XGdV8XW8zdwFiwOA2Dryh1gj2KRQyOOoNmBy4EplIcQ= +github.com/go-logr/zapr v1.3.0/go.mod h1:YKepepNBd1u/oyhd/yQmtjVXmm9uML4IXUgMOwR8/Gg= github.com/go-openapi/analysis v0.22.0 h1:wQ/d07nf78HNj4u+KiSY0sT234IAyePPbMgpUjUJQR0= github.com/go-openapi/analysis v0.22.0/go.mod h1:acDnkkCI2QxIo8sSIPgmp1wUlRohV7vfGtAIVae73b0= github.com/go-openapi/errors v0.21.0 h1:FhChC/duCnfoLj1gZ0BgaBmzhJC2SL/sJr8a2vAobSY= @@ -139,7 +138,6 @@ github.com/iancoleman/orderedmap v0.3.0/go.mod h1:XuLcCUkdL5owUCQeF2Ue9uuw1EptkJ github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= github.com/imdario/mergo v0.3.12 h1:b6R2BslTbIEToALKP7LxUvijTsNI9TAe80pLWN2g/HU= github.com/imdario/mergo v0.3.12/go.mod h1:jmQim1M+e3UYxmgPu/WyfjB3N3VflVyUjjjwH0dnCYA= -github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI= github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY= github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y= github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= @@ -168,8 +166,8 @@ github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxec github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg= github.com/mattn/go-isatty v0.0.17 h1:BTarxUcIeDqL27Mc+vyvdWYSL28zpIhv3RoTdsLMPng= github.com/mattn/go-isatty v0.0.17/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= -github.com/matttproud/golang_protobuf_extensions v1.0.4 h1:mmDVorXM7PCGKw94cs5zkfA9PSy5pEvNWRP0ET0TIVo= -github.com/matttproud/golang_protobuf_extensions v1.0.4/go.mod h1:BSXmuO+STAnVfrANrmjBb36TMTDstsz7MSK+HVaYKv4= +github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0 h1:jWpvCLoY8Z/e3VKvlsiIGKtc+UG6U5vzxaoagmhXfyg= +github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0/go.mod h1:QUyp042oQthUoa9bqDv0ER0wrtXnBruoNd7aNjkbP+k= github.com/minio/md5-simd v1.1.2 h1:Gdi1DZK69+ZVMoNHRXJyNcxrMA4dSxoYHZSQbirFg34= github.com/minio/md5-simd v1.1.2/go.mod h1:MzdKDxYpY2BT9XQFocsiZf/NKVtR7nkE4RoEpN+20RM= github.com/minio/minio-go/v7 v7.0.66 h1:bnTOXOHjOqv/gcMuiVbN9o2ngRItvqE774dG9nq0Dzw= @@ -197,8 +195,8 @@ github.com/oklog/ulid v1.3.1 h1:EGfNDEx6MqHz8B3uNV6QAib1UR2Lm97sHi3ocA6ESJ4= github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U= github.com/onsi/ginkgo v1.15.2 h1:l77YT15o814C2qVL47NOyjV/6RbaP7kKdrvZnxQ3Org= github.com/onsi/ginkgo v1.15.2/go.mod h1:Dd6YFfwBW84ETqqtL0CPyPXillHgY6XhQH3uuCCTr/o= -github.com/onsi/ginkgo/v2 v2.13.2 h1:Bi2gGVkfn6gQcjNjZJVO8Gf0FHzMPf2phUei9tejVMs= -github.com/onsi/ginkgo/v2 v2.13.2/go.mod h1:XStQ8QcGwLyF4HdfcZB8SFOS/MWCgDuXMSBe6zrvLgM= +github.com/onsi/ginkgo/v2 v2.14.0 h1:vSmGj2Z5YPb9JwCWT6z6ihcUvDhuXLc3sJiqd3jMKAY= +github.com/onsi/ginkgo/v2 v2.14.0/go.mod h1:JkUdW7JkN0V6rFvsHcJ478egV3XH9NxpD27Hal/PhZw= github.com/onsi/gomega v1.30.0 h1:hvMK7xYz4D3HapigLTeGdId/NcfQx1VHMJc60ew99+8= github.com/onsi/gomega v1.30.0/go.mod h1:9sxs+SwGrKI0+PWe4Fxa9tFQQBG5xSsSbMXOI8PPpoQ= github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o= @@ -212,15 +210,15 @@ github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINE github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U= github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/prometheus/client_golang v1.16.0 h1:yk/hx9hDbrGHovbci4BY+pRMfSuuat626eFsHb7tmT8= -github.com/prometheus/client_golang v1.16.0/go.mod h1:Zsulrv/L9oM40tJ7T815tM89lFEugiJ9HzIqaAx4LKc= +github.com/prometheus/client_golang v1.18.0 h1:HzFfmkOzH5Q8L8G+kSJKUx5dtG87sewO+FoDDqP5Tbk= +github.com/prometheus/client_golang v1.18.0/go.mod h1:T+GXkCk5wSJyOqMIzVgvvjFDlkOQntgjkJWKrN5txjA= github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= -github.com/prometheus/client_model v0.4.0 h1:5lQXD3cAg1OXBf4Wq03gTrXHeaV0TQvGfUooCfx1yqY= -github.com/prometheus/client_model v0.4.0/go.mod h1:oMQmHW1/JoDwqLtg57MGgP/Fb1CJEYF2imWWhWtMkYU= -github.com/prometheus/common v0.44.0 h1:+5BrQJwiBB9xsMygAB3TNvpQKOwlkc25LbISbrdOOfY= -github.com/prometheus/common v0.44.0/go.mod h1:ofAIvZbQ1e/nugmZGz4/qCb9Ap1VoSTIO7x0VV9VvuY= -github.com/prometheus/procfs v0.10.1 h1:kYK1Va/YMlutzCGazswoHKo//tZVlFpKYh+PymziUAg= -github.com/prometheus/procfs v0.10.1/go.mod h1:nwNm2aOCAYw8uTR/9bWRREkZFxAUcWzPHWJq+XBB/FM= +github.com/prometheus/client_model v0.5.0 h1:VQw1hfvPvk3Uv6Qf29VrPF32JB6rtbgI6cYPYQjL0Qw= +github.com/prometheus/client_model v0.5.0/go.mod h1:dTiFglRmd66nLR9Pv9f0mZi7B7fk5Pm3gvsjB5tr+kI= +github.com/prometheus/common v0.45.0 h1:2BGz0eBc2hdMDLnO/8n0jeB3oPrt2D08CekT0lneoxM= +github.com/prometheus/common v0.45.0/go.mod h1:YJmSTw9BoKxJplESWWxlbyttQR4uaEcGyv9MZjVOJsY= +github.com/prometheus/procfs v0.12.0 h1:jluTpSng7V9hY0O2R9DzzJHYb2xULk9VTR1V1R/k6Bo= +github.com/prometheus/procfs v0.12.0/go.mod h1:pcuDEFsWDnvcgNzo4EEweacyhjeA9Zk3cnaOZAZEfOo= github.com/rogpeppe/go-internal v1.11.0 h1:cWPaGQEPrBb5/AsnsZesgZZ9yb1OQ+GOISoDNXVBh4M= github.com/rogpeppe/go-internal v1.11.0/go.mod h1:ddIwULY96R17DhadqLgMfk9H9tvdUzkipdSkR5nkCZA= github.com/rs/xid v1.5.0 h1:mKX4bl4iPYJtEIxp6CYiUuLQ/8DYMoz0PUdtGgMFRVc= @@ -238,7 +236,6 @@ github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA= github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= -github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/objx v0.5.1 h1:4VhoImhV/Bm0ToFkXFi8hXNXwpDRZ/ynw3amt82mzq0= github.com/stretchr/objx v0.5.1/go.mod h1:/iHQpkQwBD6DLUmQ4pE+s1TXdob1mORJ4/UFdrifcy0= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= @@ -246,8 +243,6 @@ github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UV github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= github.com/swaggest/assertjson v1.9.0 h1:dKu0BfJkIxv/xe//mkCrK5yZbs79jL7OVf9Ija7o2xQ= @@ -264,7 +259,6 @@ github.com/yudai/pp v2.0.1+incompatible h1:Q4//iY4pNF6yPLZIigmvcl7k/bPgrcTPIFIcm github.com/yudai/pp v2.0.1+incompatible/go.mod h1:PuxR/8QJ7cyCkFp/aUDS+JY727OFEZkTdatxwunjIkc= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= -github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= go.mongodb.org/mongo-driver v1.13.1 h1:YIc7HTYsKndGK4RFzJ3covLz1byri52x0IoMB0Pt/vk= go.mongodb.org/mongo-driver v1.13.1/go.mod h1:wcDf1JBCXy2mOW0bWHwO/IOYqdca1MPCwDtFu/Z9+eo= @@ -286,14 +280,12 @@ go.opentelemetry.io/otel/trace v1.20.0 h1:+yxVAPZPbQhbC3OfAkeIVTky6iTFpcr4SiY9om go.opentelemetry.io/otel/trace v1.20.0/go.mod h1:HJSK7F/hA5RlzpZ0zKDCHCDHm556LCDtKaAo6JmBFUU= go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/goleak v1.1.10/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A= -go.uber.org/goleak v1.1.11/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ= -go.uber.org/goleak v1.2.1 h1:NBol2c7O1ZokfZ0LEU9K6Whx/KnwvepVetCUhtKja4A= -go.uber.org/goleak v1.2.1/go.mod h1:qlT2yGI9QafXHhZZLxlSuNsMw3FFLxBr+tBRlmO1xH4= +go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= +go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU= go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= go.uber.org/zap v1.18.1/go.mod h1:xg/QME4nWcxGxrpdeYfq7UvYrLh66cuVKdrbD1XF/NI= -go.uber.org/zap v1.24.0/go.mod h1:2kMP+WWQ8aoFoedH3T2sq6iJ2yDWpHbP0f6MQbS9Gkg= go.uber.org/zap v1.26.0 h1:sI7k6L95XOKS281NhVKOFCUNIvv9e0w4BF8N3u+tCRo= go.uber.org/zap v1.26.0/go.mod h1:dtElttAiwGvoJ/vj4IwHBS/gXsEu/pZ50mUIRWuG0so= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= @@ -312,10 +304,7 @@ golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHl golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= -golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= -golang.org/x/mod v0.13.0 h1:I/DsJXRlw/8l/0c24sM9yb0T4z9liZTduXvdAWYiysY= -golang.org/x/mod v0.13.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c= golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= @@ -325,7 +314,6 @@ golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLL golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= -golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM= golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= golang.org/x/net v0.19.0 h1:zTwKpTd2XuCqf8huc7Fo2iSy+4RHPd10s4KzeTnVr1c= @@ -335,11 +323,9 @@ golang.org/x/oauth2 v0.15.0 h1:s8pnnxNVzjWyrvYdFUQq5llS1PX2zhPXmccZv99h7uQ= golang.org/x/oauth2 v0.15.0/go.mod h1:q48ptWNTY5XWf+JNten23lcvHpLJ0ZSxF5ttTHKVCAM= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.6.0 h1:5BMeUDZ7vkXGfEr1x9B4bRcTH4lpkTkpdh0T/J+qjbQ= golang.org/x/sync v0.6.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= @@ -350,18 +336,15 @@ golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20191204072324-ce4227a45e2e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20211025201205-69cdffdb9359/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20220908164124-27713097b956/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.15.0 h1:h48lPFYpsTvQJZF4EKyI4aLHaev3CxivZmv7yZig9pc= -golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.16.0 h1:xWw16ngr6ZMtmxDyKyIgsE93KNKz5HKmMa3b8ALHidU= +golang.org/x/sys v0.16.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/term v0.15.0 h1:y/Oo/a/q3IXu26lQgl04j/gjuBDOBlx7X6Om1j2CPW4= @@ -385,10 +368,9 @@ golang.org/x/tools v0.0.0-20191108193012-7d206e10da11/go.mod h1:b+2E5dAYhXwXZwtn golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= -golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= -golang.org/x/tools v0.14.0 h1:jvNa2pY0M4r62jkRQ6RwEZZyPcymeL9XZMLBbV7U2nc= -golang.org/x/tools v0.14.0/go.mod h1:uYBEerGOWcJyEORxN+Ek8+TT266gXkNlHdJBwexUsBg= +golang.org/x/tools v0.16.1 h1:TLyB3WofjdOEepBHAU20JdNC1Zbg87elYofWYAY5oZA= +golang.org/x/tools v0.16.1/go.mod h1:kYVVN6I1mBNoB1OX+noeBjbRk4IUEPa7JJ+TJMEooJ0= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= @@ -444,27 +426,27 @@ honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWh honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= k8s.io/api v0.29.0 h1:NiCdQMY1QOp1H8lfRyeEf8eOwV6+0xA6XEE44ohDX2A= k8s.io/api v0.29.0/go.mod h1:sdVmXoz2Bo/cb77Pxi71IPTSErEW32xa4aXwKH7gfBA= -k8s.io/apiextensions-apiserver v0.28.3 h1:Od7DEnhXHnHPZG+W9I97/fSQkVpVPQx2diy+2EtmY08= -k8s.io/apiextensions-apiserver v0.28.3/go.mod h1:NE1XJZ4On0hS11aWWJUTNkmVB03j9LM7gJSisbRt8Lc= +k8s.io/apiextensions-apiserver v0.29.0 h1:0VuspFG7Hj+SxyF/Z/2T0uFbI5gb5LRgEyUVE3Q4lV0= +k8s.io/apiextensions-apiserver v0.29.0/go.mod h1:TKmpy3bTS0mr9pylH0nOt/QzQRrW7/h7yLdRForMZwc= k8s.io/apimachinery v0.29.0 h1:+ACVktwyicPz0oc6MTMLwa2Pw3ouLAfAon1wPLtG48o= k8s.io/apimachinery v0.29.0/go.mod h1:eVBxQ/cwiJxH58eK/jd/vAk4mrxmVlnpBH5J2GbMeis= k8s.io/client-go v0.29.0 h1:KmlDtFcrdUzOYrBhXHgKw5ycWzc3ryPX5mQe0SkG3y8= k8s.io/client-go v0.29.0/go.mod h1:yLkXH4HKMAywcrD82KMSmfYg2DlE8mepPR4JGSo5n38= -k8s.io/component-base v0.28.3 h1:rDy68eHKxq/80RiMb2Ld/tbH8uAE75JdCqJyi6lXMzI= -k8s.io/component-base v0.28.3/go.mod h1:fDJ6vpVNSk6cRo5wmDa6eKIG7UlIQkaFmZN2fYgIUD8= +k8s.io/component-base v0.29.0 h1:T7rjd5wvLnPBV1vC4zWd/iWRbV8Mdxs+nGaoaFzGw3s= +k8s.io/component-base v0.29.0/go.mod h1:sADonFTQ9Zc9yFLghpDpmNXEdHyQmFIGbiuZbqAXQ1M= k8s.io/klog/v2 v2.110.1 h1:U/Af64HJf7FcwMcXyKm2RPM22WZzyR7OSpYj5tg3cL0= k8s.io/klog/v2 v2.110.1/go.mod h1:YGtd1984u+GgbuZ7e08/yBuAfKLSO0+uR1Fhi6ExXjo= k8s.io/kube-openapi v0.0.0-20231010175941-2dd684a91f00 h1:aVUu9fTY98ivBPKR9Y5w/AuzbMm96cd3YHRTU83I780= k8s.io/kube-openapi v0.0.0-20231010175941-2dd684a91f00/go.mod h1:AsvuZPBlUDVuCdzJ87iajxtXuR9oktsTctW/R9wwouA= k8s.io/utils v0.0.0-20230726121419-3b25d923346b h1:sgn3ZU783SCgtaSJjpcVVlRqd6GSnlTLKgpAAttJvpI= k8s.io/utils v0.0.0-20230726121419-3b25d923346b/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0= -sigs.k8s.io/controller-runtime v0.16.3 h1:2TuvuokmfXvDUamSx1SuAOO3eTyye+47mJCigwG62c4= -sigs.k8s.io/controller-runtime v0.16.3/go.mod h1:j7bialYoSn142nv9sCOJmQgDXQXxnroFU4VnX/brVJ0= +sigs.k8s.io/controller-runtime v0.17.0 h1:fjJQf8Ukya+VjogLO6/bNX9HE6Y2xpsO5+fyS26ur/s= +sigs.k8s.io/controller-runtime v0.17.0/go.mod h1:+MngTvIQQQhfXtwfdGw/UOQ/aIaqsYywfCINOtwMO/s= sigs.k8s.io/gateway-api v0.8.0 h1:isQQ3Jx2qFP7vaA3ls0846F0Amp9Eq14P08xbSwVbQg= sigs.k8s.io/gateway-api v0.8.0/go.mod h1:okOnjPNBFbIS/Rw9kAhuIUaIkLhTKEu+ARIuXk2dgaM= sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd h1:EDPBXCAspyGV4jQlpZSudPeMmr1bNJefnuqLsRAsHZo= sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd/go.mod h1:B8JuhiUyNFVKdsE8h686QcCxMaH6HrOAZj4vswFpcB0= sigs.k8s.io/structured-merge-diff/v4 v4.4.1 h1:150L+0vs/8DA78h1u02ooW1/fFq/Lwr+sGiqlzvrtq4= sigs.k8s.io/structured-merge-diff/v4 v4.4.1/go.mod h1:N8hJocpFajUSSeSJ9bOZ77VzejKZaXsTtZo4/u7Io08= -sigs.k8s.io/yaml v1.3.0 h1:a2VclLzOGrwOHDiV8EfBGhvjHvP46CtW5j6POvhYGGo= -sigs.k8s.io/yaml v1.3.0/go.mod h1:GeOyir5tyXNByN85N/dRIT9es5UQNerPYEKK56eTBm8= +sigs.k8s.io/yaml v1.4.0 h1:Mk1wCc2gy/F0THH0TAp1QYyJNzRm2KCLy3o5ASXVI5E= +sigs.k8s.io/yaml v1.4.0/go.mod h1:Ejl7/uTz7PSA4eKMyQCUTnhZYNmLIl+5c2lQPGR2BPY= diff --git a/pkg/controller/ps/controller.go b/pkg/controller/ps/controller.go index 1305ab333..cfcd3b688 100644 --- a/pkg/controller/ps/controller.go +++ b/pkg/controller/ps/controller.go @@ -45,13 +45,13 @@ import ( apiv1alpha1 "github.com/percona/percona-server-mysql-operator/api/v1alpha1" "github.com/percona/percona-server-mysql-operator/pkg/clientcmd" "github.com/percona/percona-server-mysql-operator/pkg/controller/psrestore" + "github.com/percona/percona-server-mysql-operator/pkg/db" "github.com/percona/percona-server-mysql-operator/pkg/haproxy" "github.com/percona/percona-server-mysql-operator/pkg/k8s" "github.com/percona/percona-server-mysql-operator/pkg/mysql" "github.com/percona/percona-server-mysql-operator/pkg/mysqlsh" "github.com/percona/percona-server-mysql-operator/pkg/orchestrator" "github.com/percona/percona-server-mysql-operator/pkg/platform" - "github.com/percona/percona-server-mysql-operator/pkg/replicator" "github.com/percona/percona-server-mysql-operator/pkg/router" "github.com/percona/percona-server-mysql-operator/pkg/util" ) @@ -204,11 +204,7 @@ func (r *PerconaServerMySQLReconciler) deleteMySQLPods(ctx context.Context, cr * firstPodFQDN := fmt.Sprintf("%s.%s.%s", firstPod.Name, mysql.ServiceName(cr), cr.Namespace) firstPodUri := fmt.Sprintf("%s:%s@%s", apiv1alpha1.UserOperator, operatorPass, firstPodFQDN) - db, err := replicator.NewReplicatorExec(&firstPod, r.ClientCmd, apiv1alpha1.UserOperator, operatorPass, firstPodFQDN) - if err != nil { - return errors.Wrapf(err, "connect to %s", firstPod.Name) - } - defer db.Close() + um := db.NewReplicationManager(&firstPod, r.ClientCmd, apiv1alpha1.UserOperator, operatorPass, firstPodFQDN) mysh, err := mysqlsh.NewWithExec(r.ClientCmd, &firstPod, firstPodUri) if err != nil { @@ -223,12 +219,12 @@ func (r *PerconaServerMySQLReconciler) deleteMySQLPods(ctx context.Context, cr * podFQDN := fmt.Sprintf("%s.%s.%s", pod.Name, mysql.ServiceName(cr), cr.Namespace) - state, err := db.GetMemberState(ctx, podFQDN) + state, err := um.GetMemberState(ctx, podFQDN) if err != nil { return errors.Wrapf(err, "get member state of %s from performance_schema", pod.Name) } - if state == replicator.MemberStateOffline { + if state == db.MemberStateOffline { log.Info("Member is not part of GR or already removed", "member", pod.Name, "memberState", state) continue } @@ -538,15 +534,15 @@ func (r *PerconaServerMySQLReconciler) reconcileOrchestrator(ctx context.Context return nil } - cm := &corev1.ConfigMap{} - err := r.Client.Get(ctx, orchestrator.NamespacedName(cr), cm) + cmap := &corev1.ConfigMap{} + err := r.Client.Get(ctx, orchestrator.NamespacedName(cr), cmap) if client.IgnoreNotFound(err) != nil { return errors.Wrap(err, "get config map") } existingNodes := make([]string, 0) if !k8serrors.IsNotFound(err) { - cfg, ok := cm.Data[orchestrator.ConfigFileName] + cfg, ok := cmap.Data[orchestrator.ConfigFileName] if !ok { return errors.Errorf("key %s not found in ConfigMap", orchestrator.ConfigFileName) } @@ -1009,12 +1005,9 @@ func (r *PerconaServerMySQLReconciler) getPrimaryFromGR(ctx context.Context, cr return "", err } - db, err := replicator.NewReplicatorExec(firstPod, r.ClientCmd, apiv1alpha1.UserOperator, operatorPass, fqdn) - if err != nil { - return "", errors.Wrapf(err, "open connection to %s", fqdn) - } + um := db.NewReplicationManager(firstPod, r.ClientCmd, apiv1alpha1.UserOperator, operatorPass, fqdn) - return db.GetGroupReplicationPrimary(ctx) + return um.GetGroupReplicationPrimary(ctx) } func (r *PerconaServerMySQLReconciler) getPrimaryHost(ctx context.Context, cr *apiv1alpha1.PerconaServerMySQL) (string, error) { @@ -1060,10 +1053,7 @@ func (r *PerconaServerMySQLReconciler) stopAsyncReplication(ctx context.Context, if err != nil { return err } - repDb, err := replicator.NewReplicatorExec(pod, r.ClientCmd, apiv1alpha1.UserOperator, operatorPass, hostname) - if err != nil { - return errors.Wrapf(err, "connect to replica %s", hostname) - } + repDb := db.NewReplicationManager(pod, r.ClientCmd, apiv1alpha1.UserOperator, operatorPass, hostname) if err := orchestrator.StopReplicationExec(gCtx, r.ClientCmd, orcPod, hostname, port); err != nil { return errors.Wrapf(err, "stop replica %s", hostname) @@ -1074,7 +1064,7 @@ func (r *PerconaServerMySQLReconciler) stopAsyncReplication(ctx context.Context, return errors.Wrapf(err, "get replication status of %s", hostname) } - for status == replicator.ReplicationStatusActive { + for status == db.ReplicationStatusActive { time.Sleep(250 * time.Millisecond) status, _, err = repDb.ReplicationStatus(ctx) if err != nil { @@ -1117,14 +1107,10 @@ func (r *PerconaServerMySQLReconciler) startAsyncReplication(ctx context.Context if err != nil { return err } - db, err := replicator.NewReplicatorExec(pod, r.ClientCmd, apiv1alpha1.UserOperator, operatorPass, hostname) - if err != nil { - return errors.Wrapf(err, "get db connection to %s", hostname) - } - defer db.Close() + um := db.NewReplicationManager(pod, r.ClientCmd, apiv1alpha1.UserOperator, operatorPass, hostname) log.V(1).Info("Change replication source", "primary", primary.Key.Hostname, "replica", hostname) - if err := db.ChangeReplicationSource(ctx, primary.Key.Hostname, replicaPass, primary.Key.Port); err != nil { + if err := um.ChangeReplicationSource(ctx, primary.Key.Hostname, replicaPass, primary.Key.Port); err != nil { return errors.Wrapf(err, "change replication source on %s", hostname) } diff --git a/pkg/controller/ps/user.go b/pkg/controller/ps/user.go index ccfff446c..ab25349d7 100644 --- a/pkg/controller/ps/user.go +++ b/pkg/controller/ps/user.go @@ -17,13 +17,13 @@ import ( logf "sigs.k8s.io/controller-runtime/pkg/log" apiv1alpha1 "github.com/percona/percona-server-mysql-operator/api/v1alpha1" + "github.com/percona/percona-server-mysql-operator/pkg/db" "github.com/percona/percona-server-mysql-operator/pkg/haproxy" "github.com/percona/percona-server-mysql-operator/pkg/k8s" "github.com/percona/percona-server-mysql-operator/pkg/mysql" "github.com/percona/percona-server-mysql-operator/pkg/orchestrator" "github.com/percona/percona-server-mysql-operator/pkg/router" "github.com/percona/percona-server-mysql-operator/pkg/secret" - "github.com/percona/percona-server-mysql-operator/pkg/users" ) const ( @@ -216,11 +216,7 @@ func (r *PerconaServerMySQLReconciler) reconcileUsers(ctx context.Context, cr *a return err } - um, err := users.NewManagerExec(primPod, r.ClientCmd, apiv1alpha1.UserOperator, operatorPass, primaryHost) - if err != nil { - return errors.Wrap(err, "init user manager") - } - defer um.Close() + um := db.NewUserManager(primPod, r.ClientCmd, apiv1alpha1.UserOperator, operatorPass, primaryHost) var asyncPrimary *orchestrator.Instance @@ -236,7 +232,7 @@ func (r *PerconaServerMySQLReconciler) reconcileUsers(ctx context.Context, cr *a } } - if err := um.UpdateUserPasswords(updatedUsers); err != nil { + if err := um.UpdateUserPasswords(ctx, updatedUsers); err != nil { return errors.Wrapf(err, "update passwords") } @@ -346,13 +342,9 @@ func (r *PerconaServerMySQLReconciler) discardOldPasswordsAfterNewPropagated( return err } - um, err := users.NewManagerExec(primPod, r.ClientCmd, apiv1alpha1.UserOperator, operatorPass, primaryHost) - if err != nil { - return errors.Wrap(err, "init user manager") - } - defer um.Close() + um := db.NewUserManager(primPod, r.ClientCmd, apiv1alpha1.UserOperator, operatorPass, primaryHost) - if err := um.DiscardOldPasswords(updatedUsers); err != nil { + if err := um.DiscardOldPasswords(ctx, updatedUsers); err != nil { return errors.Wrap(err, "discard old passwords") } diff --git a/pkg/controller/psbackup/controller.go b/pkg/controller/psbackup/controller.go index 9b81b1bf1..dbb24c966 100644 --- a/pkg/controller/psbackup/controller.go +++ b/pkg/controller/psbackup/controller.go @@ -41,8 +41,8 @@ import ( logf "sigs.k8s.io/controller-runtime/pkg/log" apiv1alpha1 "github.com/percona/percona-server-mysql-operator/api/v1alpha1" + "github.com/percona/percona-server-mysql-operator/pkg/clientcmd" "github.com/percona/percona-server-mysql-operator/pkg/k8s" - "github.com/percona/percona-server-mysql-operator/pkg/mysql/topology" "github.com/percona/percona-server-mysql-operator/pkg/platform" "github.com/percona/percona-server-mysql-operator/pkg/secret" "github.com/percona/percona-server-mysql-operator/pkg/xtrabackup" @@ -53,6 +53,7 @@ type PerconaServerMySQLBackupReconciler struct { client.Client Scheme *runtime.Scheme ServerVersion *platform.ServerVersion + ClientCmd clientcmd.Client } //+kubebuilder:rbac:groups=ps.percona.com,resources=perconaservermysqlbackups;perconaservermysqlbackups/status;perconaservermysqlbackups/finalizers,verbs=get;list;watch;create;update;patch;delete @@ -317,17 +318,17 @@ func (r *PerconaServerMySQLBackupReconciler) getBackupSource(ctx context.Context return "", errors.Wrap(err, "get operator password") } - top, err := topology.Get(ctx, cluster, operatorPass) + top, err := getDBTopology(ctx, r.Client, r.ClientCmd, cluster, operatorPass) if err != nil { return "", errors.Wrap(err, "get topology") } var source string - if len(top.Replicas) < 1 { - source = top.Primary - log.Info("no replicas found, using primary as the backup source", "primary", top.Primary) + if len(top.replicas) < 1 { + source = top.primary + log.Info("no replicas found, using primary as the backup source", "primary", top.primary) } else { - source = top.Replicas[0] + source = top.replicas[0] } return source, nil diff --git a/pkg/controller/psbackup/topology.go b/pkg/controller/psbackup/topology.go new file mode 100644 index 000000000..02087120d --- /dev/null +++ b/pkg/controller/psbackup/topology.go @@ -0,0 +1,75 @@ +package psbackup + +import ( + "context" + + "github.com/pkg/errors" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/controller-runtime/pkg/client" + + apiv1alpha1 "github.com/percona/percona-server-mysql-operator/api/v1alpha1" + "github.com/percona/percona-server-mysql-operator/pkg/clientcmd" + "github.com/percona/percona-server-mysql-operator/pkg/db" + "github.com/percona/percona-server-mysql-operator/pkg/mysql" + "github.com/percona/percona-server-mysql-operator/pkg/orchestrator" +) + +// topology represents the topology of the database cluster. +type topology struct { + primary string + replicas []string +} + +// getDBTopology returns the topology of the database cluster. +func getDBTopology(ctx context.Context, cli client.Client, cliCmd clientcmd.Client, cluster *apiv1alpha1.PerconaServerMySQL, operatorPass string) (topology, error) { + switch cluster.Spec.MySQL.ClusterType { + case apiv1alpha1.ClusterTypeGR: + firstPod := &corev1.Pod{} + nn := types.NamespacedName{Namespace: cluster.Namespace, Name: mysql.PodName(cluster, 0)} + if err := cli.Get(ctx, nn, firstPod); err != nil { + return topology{}, err + } + + fqdn := mysql.FQDN(cluster, 0) + + rm := db.NewReplicationManager(firstPod, cliCmd, apiv1alpha1.UserOperator, operatorPass, fqdn) + + replicas, err := rm.GetGroupReplicationReplicas(ctx) + if err != nil { + return topology{}, errors.Wrap(err, "get group-replication replicas") + } + + primary, err := rm.GetGroupReplicationPrimary(ctx) + if err != nil { + return topology{}, errors.Wrap(err, "get group-replication primary") + } + return topology{ + primary: primary, + replicas: replicas, + }, nil + case apiv1alpha1.ClusterTypeAsync: + pod := &corev1.Pod{} + nn := types.NamespacedName{Namespace: cluster.Namespace, Name: orchestrator.PodName(cluster, 0)} + if err := cli.Get(ctx, nn, pod); err != nil { + return topology{}, err + } + + primary, err := orchestrator.ClusterPrimaryExec(ctx, cliCmd, pod, cluster.ClusterHint()) + + if err != nil { + return topology{}, errors.Wrap(err, "get primary") + } + + replicas := make([]string, 0, len(primary.Replicas)) + for _, r := range primary.Replicas { + replicas = append(replicas, r.Hostname) + } + return topology{ + primary: primary.Key.Hostname, + replicas: replicas, + }, nil + default: + return topology{}, errors.New("unknown cluster type") + } +} diff --git a/pkg/db/db.go b/pkg/db/db.go new file mode 100644 index 000000000..0d9636996 --- /dev/null +++ b/pkg/db/db.go @@ -0,0 +1,46 @@ +package db + +import ( + "bytes" + "context" + "fmt" + "regexp" + "strings" + + "github.com/pkg/errors" + corev1 "k8s.io/api/core/v1" + + apiv1alpha1 "github.com/percona/percona-server-mysql-operator/api/v1alpha1" + "github.com/percona/percona-server-mysql-operator/pkg/clientcmd" +) + +var sensitiveRegexp = regexp.MustCompile(":.*@") + +type db struct { + client clientcmd.Client + pod *corev1.Pod + user apiv1alpha1.SystemUser + pass string + host string +} + +func newDB(pod *corev1.Pod, cliCmd clientcmd.Client, user apiv1alpha1.SystemUser, pass, host string) *db { + return &db{client: cliCmd, pod: pod, user: user, pass: pass, host: host} +} + +func (d *db) exec(ctx context.Context, stm string, stdout, stderr *bytes.Buffer) error { + cmd := []string{"mysql", "--database", "performance_schema", fmt.Sprintf("-p%s", d.pass), "-u", string(d.user), "-h", d.host, "-e", stm} + + err := d.client.Exec(ctx, d.pod, "mysql", cmd, nil, stdout, stderr, false) + if err != nil { + sout := sensitiveRegexp.ReplaceAllString(stdout.String(), ":*****@") + serr := sensitiveRegexp.ReplaceAllString(stderr.String(), ":*****@") + return errors.Wrapf(err, "stdout: %s, stderr: %s", sout, serr) + } + + if strings.Contains(stderr.String(), "ERROR") { + return fmt.Errorf("sql error: %s", stderr) + } + + return nil +} diff --git a/pkg/db/replication.go b/pkg/db/replication.go new file mode 100644 index 000000000..f5a04b1f3 --- /dev/null +++ b/pkg/db/replication.go @@ -0,0 +1,169 @@ +package db + +import ( + "bytes" + "context" + "database/sql" + "encoding/csv" + "fmt" + "github.com/gocarina/gocsv" + "strings" + + "github.com/pkg/errors" + corev1 "k8s.io/api/core/v1" + + apiv1alpha1 "github.com/percona/percona-server-mysql-operator/api/v1alpha1" + "github.com/percona/percona-server-mysql-operator/pkg/clientcmd" +) + +const defaultChannelName = "" + +type ReplicationStatus int8 + +const ( + ReplicationStatusActive ReplicationStatus = iota + ReplicationStatusError + ReplicationStatusNotInitiated +) + +type MemberState string + +const ( + MemberStateOnline MemberState = "ONLINE" + MemberStateOffline MemberState = "OFFLINE" + MemberStateError MemberState = "ERROR" +) + +type ReplicationDBManager struct { + db *db +} + +func NewReplicationManager(pod *corev1.Pod, cliCmd clientcmd.Client, user apiv1alpha1.SystemUser, pass, host string) *ReplicationDBManager { + return &ReplicationDBManager{db: newDB(pod, cliCmd, user, pass, host)} +} + +func (m *ReplicationDBManager) query(ctx context.Context, query string, out interface{}) error { + var errb, outb bytes.Buffer + err := m.db.exec(ctx, query, &outb, &errb) + if err != nil { + return err + } + + if !strings.Contains(errb.String(), "ERROR") && outb.Len() == 0 { + return sql.ErrNoRows + } + + r := csv.NewReader(bytes.NewReader(outb.Bytes())) + r.Comma = '\t' + + if err = gocsv.UnmarshalCSV(r, out); err != nil { + return err + } + + return nil +} + +func (m *ReplicationDBManager) ChangeReplicationSource(ctx context.Context, host, replicaPass string, port int32) error { + var errb, outb bytes.Buffer + q := fmt.Sprintf(` + CHANGE REPLICATION SOURCE TO + SOURCE_USER='%s', + SOURCE_PASSWORD='%s', + SOURCE_HOST='%s', + SOURCE_PORT=%d, + SOURCE_SSL=1, + SOURCE_CONNECTION_AUTO_FAILOVER=1, + SOURCE_AUTO_POSITION=1, + SOURCE_RETRY_COUNT=3, + SOURCE_CONNECT_RETRY=60 + `, apiv1alpha1.UserReplication, replicaPass, host, port) + err := m.db.exec(ctx, q, &outb, &errb) + + if err != nil { + return errors.Wrap(err, "exec CHANGE REPLICATION SOURCE TO") + } + + return nil +} + +func (m *ReplicationDBManager) ReplicationStatus(ctx context.Context) (ReplicationStatus, string, error) { + rows := []*struct { + IoState string `csv:"conn_state"` + SqlState string `csv:"applier_state"` + Host string `csv:"host"` + }{} + + q := fmt.Sprintf(` + SELECT + connection_status.SERVICE_STATE as conn_state, + applier_status.SERVICE_STATE as applier_state, + HOST as host + FROM replication_connection_status connection_status + JOIN replication_connection_configuration connection_configuration + ON connection_status.channel_name = connection_configuration.channel_name + JOIN replication_applier_status applier_status + ON connection_status.channel_name = applier_status.channel_name + WHERE connection_status.channel_name = '%s' + `, defaultChannelName) + err := m.query(ctx, q, &rows) + if err != nil { + if errors.Is(err, sql.ErrNoRows) { + return ReplicationStatusNotInitiated, "", nil + } + return ReplicationStatusError, "", errors.Wrap(err, "scan replication status") + } + + if rows[0].IoState == "ON" && rows[0].SqlState == "ON" { + return ReplicationStatusActive, rows[0].Host, nil + } + + return ReplicationStatusNotInitiated, "", err +} + +func (m *ReplicationDBManager) GetGroupReplicationPrimary(ctx context.Context) (string, error) { + rows := []*struct { + Host string `csv:"host"` + }{} + + err := m.query(ctx, "SELECT MEMBER_HOST as host FROM replication_group_members WHERE MEMBER_ROLE='PRIMARY' AND MEMBER_STATE='ONLINE'", &rows) + if err != nil { + return "", errors.Wrap(err, "query primary member") + } + + return rows[0].Host, nil +} + +// TODO: finish implementation +func (m *ReplicationDBManager) GetGroupReplicationReplicas(ctx context.Context) ([]string, error) { + rows := []*struct { + Host string `csv:"host"` + }{} + + err := m.query(ctx, "SELECT MEMBER_HOST as host FROM replication_group_members WHERE MEMBER_ROLE='SECONDARY' AND MEMBER_STATE='ONLINE'", &rows) + if err != nil && !errors.Is(err, sql.ErrNoRows) { + return nil, errors.Wrap(err, "query replicas") + } + + replicas := make([]string, 0) + for _, row := range rows { + replicas = append(replicas, row.Host) + } + + return replicas, nil +} + +func (m *ReplicationDBManager) GetMemberState(ctx context.Context, host string) (MemberState, error) { + rows := []*struct { + State MemberState `csv:"state"` + }{} + q := fmt.Sprintf(`SELECT MEMBER_STATE as state FROM replication_group_members WHERE MEMBER_HOST='%s'`, host) + err := m.query(ctx, q, &rows) + if err != nil { + if errors.Is(err, sql.ErrNoRows) { + return MemberStateOffline, nil + } + return MemberStateError, errors.Wrap(err, "query member state") + } + + return rows[0].State, nil +} diff --git a/pkg/db/users.go b/pkg/db/users.go new file mode 100644 index 000000000..64e167769 --- /dev/null +++ b/pkg/db/users.go @@ -0,0 +1,73 @@ +package db + +import ( + "bytes" + "context" + "fmt" + "github.com/pkg/errors" + corev1 "k8s.io/api/core/v1" + "strings" + + apiv1alpha1 "github.com/percona/percona-server-mysql-operator/api/v1alpha1" + "github.com/percona/percona-server-mysql-operator/pkg/clientcmd" + "github.com/percona/percona-server-mysql-operator/pkg/mysql" +) + +type UserManager struct { + db *db +} + +func NewUserManager(pod *corev1.Pod, cliCmd clientcmd.Client, user apiv1alpha1.SystemUser, pass, host string) *UserManager { + return &UserManager{db: newDB(pod, cliCmd, user, pass, host)} +} + +// UpdateUserPasswords updates user passwords but retains the current password using Dual Password feature of MySQL 8 +func (m *UserManager) UpdateUserPasswords(ctx context.Context, users []mysql.User) error { + for _, user := range users { + for _, host := range user.Hosts { + q := fmt.Sprintf("ALTER USER '%s'@'%s' IDENTIFIED BY '%s' RETAIN CURRENT PASSWORD", user.Username, host, escapePass(user.Password)) + var errb, outb bytes.Buffer + err := m.db.exec(ctx, q, &outb, &errb) + if err != nil { + return errors.Wrap(err, "alter user") + } + } + } + + var errb, outb bytes.Buffer + err := m.db.exec(ctx, "FLUSH PRIVILEGES", &outb, &errb) + if err != nil { + return errors.Wrap(err, "flush privileges") + } + + return nil +} + +// DiscardOldPasswords discards old passwords of givens users +func (m *UserManager) DiscardOldPasswords(ctx context.Context, users []mysql.User) error { + for _, user := range users { + for _, host := range user.Hosts { + q := fmt.Sprintf("ALTER USER '%s'@'%s' DISCARD OLD PASSWORD", user.Username, host) + var errb, outb bytes.Buffer + err := m.db.exec(ctx, q, &outb, &errb) + if err != nil { + return errors.Wrap(err, "discard old password") + } + } + } + + var errb, outb bytes.Buffer + err := m.db.exec(ctx, "FLUSH PRIVILEGES", &outb, &errb) + if err != nil { + return errors.Wrap(err, "flush privileges") + } + + return nil +} + +func escapePass(pass string) string { + s := strings.ReplaceAll(pass, `'`, `\'`) + s = strings.ReplaceAll(s, `"`, `\"`) + s = strings.ReplaceAll(s, `\`, `\\`) + return s +} diff --git a/pkg/mysql/topology/topology.go b/pkg/mysql/topology/topology.go deleted file mode 100644 index 32a5b143a..000000000 --- a/pkg/mysql/topology/topology.go +++ /dev/null @@ -1,77 +0,0 @@ -package topology - -import ( - "context" - - "github.com/pkg/errors" - - apiv1alpha1 "github.com/percona/percona-server-mysql-operator/api/v1alpha1" - "github.com/percona/percona-server-mysql-operator/pkg/mysql" - "github.com/percona/percona-server-mysql-operator/pkg/orchestrator" - "github.com/percona/percona-server-mysql-operator/pkg/replicator" -) - -type Topology struct { - Primary string - Replicas []string -} - -func Get(ctx context.Context, cluster *apiv1alpha1.PerconaServerMySQL, operatorPass string) (Topology, error) { - var err error - var top Topology - switch cluster.Spec.MySQL.ClusterType { - case apiv1alpha1.ClusterTypeGR: - top, err = getGRTopology(ctx, cluster, operatorPass) - if err != nil { - return Topology{}, errors.Wrap(err, "get group-replication topology") - } - case apiv1alpha1.ClusterTypeAsync: - top, err = getAsyncTopology(ctx, cluster) - if err != nil { - return Topology{}, errors.Wrap(err, "get async topology") - } - default: - return Topology{}, errors.New("unknown cluster type") - } - return top, nil -} - -func getGRTopology(ctx context.Context, cluster *apiv1alpha1.PerconaServerMySQL, operatorPass string) (Topology, error) { - fqdn := mysql.FQDN(cluster, 0) - db, err := replicator.NewReplicator(ctx, apiv1alpha1.UserOperator, operatorPass, fqdn, mysql.DefaultAdminPort) - if err != nil { - return Topology{}, errors.Wrapf(err, "open connection to %s", fqdn) - } - defer db.Close() - - replicas, err := db.GetGroupReplicationReplicas(ctx) - if err != nil { - return Topology{}, errors.Wrap(err, "get group-replication replicas") - } - - primary, err := db.GetGroupReplicationPrimary(ctx) - if err != nil { - return Topology{}, errors.Wrap(err, "get group-replication primary") - } - return Topology{ - Primary: primary, - Replicas: replicas, - }, nil -} - -func getAsyncTopology(ctx context.Context, cluster *apiv1alpha1.PerconaServerMySQL) (Topology, error) { - orcHost := orchestrator.APIHost(cluster) - primary, err := orchestrator.ClusterPrimary(ctx, orcHost, cluster.ClusterHint()) - if err != nil { - return Topology{}, errors.Wrap(err, "get primary") - } - - replicas := make([]string, 0, len(primary.Replicas)) - for _, r := range primary.Replicas { - replicas = append(replicas, r.Hostname) - } - return Topology{ - Primary: primary.Key.Hostname, - Replicas: replicas, - }, nil -} diff --git a/pkg/orchestrator/client.go b/pkg/orchestrator/client.go deleted file mode 100644 index 5f9ef2bbe..000000000 --- a/pkg/orchestrator/client.go +++ /dev/null @@ -1,242 +0,0 @@ -package orchestrator - -import ( - "context" - "encoding/json" - "fmt" - "io" - "net/http" - - "github.com/pkg/errors" -) - -type orcResponse struct { - Code string `json:"Code"` - Message string `json:"Message"` - Details interface{} `json:"Details,omitempty"` -} - -type InstanceKey struct { - Hostname string `json:"Hostname"` - Port int32 `json:"Port"` -} - -type Instance struct { - Key InstanceKey `json:"Key"` - Alias string `json:"InstanceAlias"` - MasterKey InstanceKey `json:"MasterKey"` - Replicas []InstanceKey `json:"Replicas"` - ReadOnly bool `json:"ReadOnly"` -} - -func ClusterPrimary(ctx context.Context, apiHost, clusterHint string) (*Instance, error) { - url := fmt.Sprintf("%s/api/master/%s", apiHost, clusterHint) - - resp, err := doRequest(ctx, url) - if err != nil { - return nil, errors.Wrapf(err, "do request to %s", url) - } - defer resp.Body.Close() - - body, err := io.ReadAll(resp.Body) - if err != nil { - return nil, errors.Wrap(err, "read response body") - } - - primary := &Instance{} - if err := json.Unmarshal(body, primary); err == nil { - return primary, nil - } - - orcResp := &orcResponse{} - if err := json.Unmarshal(body, orcResp); err != nil { - return nil, errors.Wrap(err, "json decode") - } - - if orcResp.Code == "ERROR" { - return nil, errors.New(orcResp.Message) - } - - return primary, nil -} - -func StopReplication(ctx context.Context, apiHost, host string, port int32) error { - url := fmt.Sprintf("%s/api/stop-replica/%s/%d", apiHost, host, port) - - resp, err := doRequest(ctx, url) - if err != nil { - return errors.Wrapf(err, "do request to %s", url) - } - defer resp.Body.Close() - - orcResp := &orcResponse{} - if err := json.NewDecoder(resp.Body).Decode(orcResp); err != nil { - return errors.Wrap(err, "json decode") - } - - if orcResp.Code == "ERROR" { - return errors.New(orcResp.Message) - } - - return nil -} - -func StartReplication(ctx context.Context, apiHost, host string, port int32) error { - url := fmt.Sprintf("%s/api/start-replica/%s/%d", apiHost, host, port) - - resp, err := doRequest(ctx, url) - if err != nil { - return errors.Wrapf(err, "do request to %s", url) - } - defer resp.Body.Close() - - orcResp := &orcResponse{} - if err := json.NewDecoder(resp.Body).Decode(orcResp); err != nil { - return errors.Wrap(err, "json decode") - } - - if orcResp.Code == "ERROR" { - return errors.New(orcResp.Message) - } - - return nil -} - -func AddPeer(ctx context.Context, apiHost string, peer string) error { - url := fmt.Sprintf("%s/api/raft-add-peer/%s", apiHost, peer) - - resp, err := doRequest(ctx, url) - if err != nil { - return errors.Wrapf(err, "do request to %s", url) - } - defer resp.Body.Close() - - body, err := io.ReadAll(resp.Body) - if err != nil { - return errors.Wrap(err, "read response body") - } - - // Orchestrator returns peer IP as string on success - o := "" - if err := json.Unmarshal(body, &o); err == nil { - return nil - } - - orcResp := &orcResponse{} - if err := json.Unmarshal(body, &orcResp); err != nil { - return errors.Wrap(err, "json decode") - } - - if orcResp.Code == "ERROR" { - return errors.New(orcResp.Message) - } - - return nil -} - -func RemovePeer(ctx context.Context, apiHost string, peer string) error { - url := fmt.Sprintf("%s/api/raft-remove-peer/%s", apiHost, peer) - - resp, err := doRequest(ctx, url) - if err != nil { - return errors.Wrapf(err, "do request to %s", url) - } - defer resp.Body.Close() - - body, err := io.ReadAll(resp.Body) - if err != nil { - return errors.Wrap(err, "read response body") - } - - // Orchestrator returns peer IP as string on success - o := "" - if err := json.Unmarshal(body, &o); err == nil { - return nil - } - - orcResp := &orcResponse{} - if err := json.Unmarshal(body, &orcResp); err != nil { - return errors.Wrap(err, "json decode") - } - - if orcResp.Code == "ERROR" { - return errors.New(orcResp.Message) - } - - return nil -} - -func EnsureNodeIsPrimary(ctx context.Context, apiHost, clusterHint, host string, port int) error { - primary, err := ClusterPrimary(ctx, apiHost, clusterHint) - if err != nil { - return errors.Wrap(err, "get cluster primary") - } - - if primary.Alias == host { - return nil - } - - // /api/graceful-master-takeover-auto/cluster1.default/cluster1-mysql-0/3306 - url := fmt.Sprintf("%s/api/graceful-master-takeover-auto/%s/%s/%d", apiHost, clusterHint, host, port) - - resp, err := doRequest(ctx, url) - if err != nil { - return errors.Wrapf(err, "do request to %s", url) - } - defer resp.Body.Close() - - orcResp := &orcResponse{} - if err := json.NewDecoder(resp.Body).Decode(orcResp); err != nil { - return errors.Wrap(err, "json decode") - } - - if orcResp.Code == "ERROR" { - return errors.New(orcResp.Message) - } - - return nil -} - -var ErrEmptyResponse = errors.New("empty response") - -func Discover(ctx context.Context, apiHost, host string, port int) error { - url := fmt.Sprintf("%s/api/discover/%s/%d", apiHost, host, port) - - resp, err := doRequest(ctx, url) - if err != nil { - return errors.Wrapf(err, "do request to %s", url) - } - defer resp.Body.Close() - - orcResp := new(orcResponse) - data, err := io.ReadAll(resp.Body) - if err != nil { - return errors.Wrap(err, "read response body") - } - - if len(data) == 0 { - return ErrEmptyResponse - } - - if err := json.Unmarshal(data, orcResp); err != nil { - return errors.Wrapf(err, "json decode \"%s\"", string(data)) - } - - if orcResp.Code == "ERROR" { - return errors.New(orcResp.Message) - } - return nil -} - -func doRequest(ctx context.Context, url string) (*http.Response, error) { - req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) - if err != nil { - return nil, errors.Wrap(err, "make request") - } - resp, err := http.DefaultClient.Do(req) - if err != nil { - return nil, errors.Wrap(err, "do request") - } - - return resp, nil -} diff --git a/pkg/orchestrator/clientexec.go b/pkg/orchestrator/clientexec.go index 41242621d..046eac244 100644 --- a/pkg/orchestrator/clientexec.go +++ b/pkg/orchestrator/clientexec.go @@ -12,6 +12,27 @@ import ( "github.com/percona/percona-server-mysql-operator/pkg/clientcmd" ) +type orcResponse struct { + Code string `json:"Code"` + Message string `json:"Message"` + Details interface{} `json:"Details,omitempty"` +} + +type InstanceKey struct { + Hostname string `json:"Hostname"` + Port int32 `json:"Port"` +} + +type Instance struct { + Key InstanceKey `json:"Key"` + Alias string `json:"InstanceAlias"` + MasterKey InstanceKey `json:"MasterKey"` + Replicas []InstanceKey `json:"Replicas"` + ReadOnly bool `json:"ReadOnly"` +} + +var ErrEmptyResponse = errors.New("empty response") + func exec(ctx context.Context, cliCmd clientcmd.Client, pod *corev1.Pod, endpoint string, outb, errb *bytes.Buffer) error { c := []string{"curl", fmt.Sprintf("localhost:%d/%s", defaultWebPort, endpoint)} err := cliCmd.Exec(ctx, pod, "orc", c, nil, outb, errb, false) diff --git a/pkg/platform/platform.go b/pkg/platform/platform.go index c23373963..5b1877121 100644 --- a/pkg/platform/platform.go +++ b/pkg/platform/platform.go @@ -29,7 +29,7 @@ var ( mx sync.Mutex ) -// Server returns server version and platform (k8s|oc) +// GetServerVersion returns server version and platform (k8s|oc) // it performs API requests for the first invocation and then returns "cached" value func GetServerVersion(cliCmd clientcmd.Client) (*ServerVersion, error) { mx.Lock() diff --git a/pkg/replicator/replicator.go b/pkg/replicator/replicator.go deleted file mode 100644 index 435d3527c..000000000 --- a/pkg/replicator/replicator.go +++ /dev/null @@ -1,431 +0,0 @@ -package replicator - -import ( - "context" - "database/sql" - "fmt" - - "github.com/go-sql-driver/mysql" - "github.com/pkg/errors" - - apiv1alpha1 "github.com/percona/percona-server-mysql-operator/api/v1alpha1" - "github.com/percona/percona-server-mysql-operator/pkg/innodbcluster" -) - -const DefaultChannelName = "" - -type ReplicationStatus int8 - -var ErrRestartAfterClone error = errors.New("Error 3707: Restart server failed (mysqld is not managed by supervisor process).") - -const ( - ReplicationStatusActive ReplicationStatus = iota - ReplicationStatusError - ReplicationStatusNotInitiated -) - -type MemberState string - -const ( - MemberStateOnline MemberState = "ONLINE" - MemberStateRecovering MemberState = "RECOVERING" - MemberStateOffline MemberState = "OFFLINE" - MemberStateError MemberState = "ERROR" - MemberStateUnreachable MemberState = "UNREACHABLE" -) - -var ErrGroupReplicationNotReady = errors.New("Error 3092: The server is not configured properly to be an active member of the group.") - -type Replicator interface { - ChangeReplicationSource(ctx context.Context, host, replicaPass string, port int32) error - StartReplication(ctx context.Context, host, replicaPass string, port int32) error - StopReplication(ctx context.Context) error - ResetReplication(ctx context.Context) error - ReplicationStatus(ctx context.Context) (ReplicationStatus, string, error) - EnableSuperReadonly(ctx context.Context) error - DisableSuperReadonly(ctx context.Context) error - IsReadonly(ctx context.Context) (bool, error) - ReportHost(ctx context.Context) (string, error) - Close() error - CloneInProgress(ctx context.Context) (bool, error) - NeedsClone(ctx context.Context, donor string, port int32) (bool, error) - Clone(ctx context.Context, donor, user, pass string, port int32) error - IsReplica(ctx context.Context) (bool, error) - DumbQuery(ctx context.Context) error - GetGlobal(ctx context.Context, variable string) (interface{}, error) - SetGlobal(ctx context.Context, variable, value interface{}) error - StartGroupReplication(ctx context.Context, password string) error - StopGroupReplication(ctx context.Context) error - GetGroupReplicationPrimary(ctx context.Context) (string, error) - GetGroupReplicationReplicas(ctx context.Context) ([]string, error) - GetMemberState(ctx context.Context, host string) (MemberState, error) - GetGroupReplicationMembers(ctx context.Context) ([]string, error) - CheckIfDatabaseExists(ctx context.Context, name string) (bool, error) - CheckIfInPrimaryPartition(ctx context.Context) (bool, error) - CheckIfPrimaryUnreachable(ctx context.Context) (bool, error) -} - -type dbImpl struct{ db *sql.DB } - -func NewReplicator(ctx context.Context, user apiv1alpha1.SystemUser, pass, host string, port int32) (Replicator, error) { - config := mysql.NewConfig() - - config.User = string(user) - config.Passwd = pass - config.Net = "tcp" - config.Addr = fmt.Sprintf("%s:%d", host, port) - config.DBName = "performance_schema" - config.Params = map[string]string{ - "interpolateParams": "true", - "timeout": "10s", - "readTimeout": "10s", - "writeTimeout": "10s", - "tls": "preferred", - } - - db, err := sql.Open("mysql", config.FormatDSN()) - if err != nil { - return nil, errors.Wrap(err, "connect to MySQL") - } - - if err := db.PingContext(ctx); err != nil { - return nil, errors.Wrap(err, "ping database") - } - - return &dbImpl{db}, nil -} - -func (d *dbImpl) ChangeReplicationSource(ctx context.Context, host, replicaPass string, port int32) error { - // TODO: Make retries configurable - _, err := d.db.ExecContext(ctx, ` - CHANGE REPLICATION SOURCE TO - SOURCE_USER=?, - SOURCE_PASSWORD=?, - SOURCE_HOST=?, - SOURCE_PORT=?, - SOURCE_SSL=1, - SOURCE_CONNECTION_AUTO_FAILOVER=1, - SOURCE_AUTO_POSITION=1, - SOURCE_RETRY_COUNT=3, - SOURCE_CONNECT_RETRY=60 - `, apiv1alpha1.UserReplication, replicaPass, host, port) - if err != nil { - return errors.Wrap(err, "exec CHANGE REPLICATION SOURCE TO") - } - - return nil -} - -func (d *dbImpl) StartReplication(ctx context.Context, host, replicaPass string, port int32) error { - if err := d.ChangeReplicationSource(ctx, host, replicaPass, port); err != nil { - return errors.Wrap(err, "change replication source") - } - - _, err := d.db.ExecContext(ctx, "START REPLICA") - return errors.Wrap(err, "start replication") -} - -func (d *dbImpl) StopReplication(ctx context.Context) error { - _, err := d.db.ExecContext(ctx, "STOP REPLICA") - return errors.Wrap(err, "stop replication") -} - -func (d *dbImpl) ResetReplication(ctx context.Context) error { - _, err := d.db.ExecContext(ctx, "RESET REPLICA ALL") - return errors.Wrap(err, "reset replication") -} - -func (d *dbImpl) ReplicationStatus(ctx context.Context) (ReplicationStatus, string, error) { - row := d.db.QueryRowContext(ctx, ` - SELECT - connection_status.SERVICE_STATE, - applier_status.SERVICE_STATE, - HOST - FROM replication_connection_status connection_status - JOIN replication_connection_configuration connection_configuration - ON connection_status.channel_name = connection_configuration.channel_name - JOIN replication_applier_status applier_status - ON connection_status.channel_name = applier_status.channel_name - WHERE connection_status.channel_name = ? - `, DefaultChannelName) - - var ioState, sqlState, host string - if err := row.Scan(&ioState, &sqlState, &host); err != nil { - if errors.Is(err, sql.ErrNoRows) { - return ReplicationStatusNotInitiated, "", nil - } - return ReplicationStatusError, "", errors.Wrap(err, "scan replication status") - } - - if ioState == "ON" && sqlState == "ON" { - return ReplicationStatusActive, host, nil - } - - return ReplicationStatusNotInitiated, "", nil -} - -func (d *dbImpl) IsReplica(ctx context.Context) (bool, error) { - status, _, err := d.ReplicationStatus(ctx) - return status == ReplicationStatusActive, errors.Wrap(err, "get replication status") -} - -func (d *dbImpl) EnableSuperReadonly(ctx context.Context) error { - _, err := d.db.ExecContext(ctx, "SET GLOBAL SUPER_READ_ONLY=1") - return errors.Wrap(err, "set global super_read_only param to 1") -} - -func (d *dbImpl) DisableSuperReadonly(ctx context.Context) error { - _, err := d.db.ExecContext(ctx, "SET GLOBAL SUPER_READ_ONLY=0") - return errors.Wrap(err, "set global super_read_only param to 0") -} - -func (d *dbImpl) IsReadonly(ctx context.Context) (bool, error) { - var readonly int - err := d.db.QueryRowContext(ctx, "select @@read_only and @@super_read_only").Scan(&readonly) - return readonly == 1, errors.Wrap(err, "select global read_only param") -} - -func (d *dbImpl) ReportHost(ctx context.Context) (string, error) { - var reportHost string - err := d.db.QueryRowContext(ctx, "select @@report_host").Scan(&reportHost) - return reportHost, errors.Wrap(err, "select report_host param") -} - -func (d *dbImpl) Close() error { - return d.db.Close() -} - -func (d *dbImpl) CloneInProgress(ctx context.Context) (bool, error) { - rows, err := d.db.QueryContext(ctx, "SELECT STATE FROM clone_status") - if err != nil { - return false, errors.Wrap(err, "fetch clone status") - } - defer rows.Close() - - for rows.Next() { - var state string - if err := rows.Scan(&state); err != nil { - return false, errors.Wrap(err, "scan rows") - } - - if state != "Completed" && state != "Failed" { - return true, nil - } - } - - return false, nil -} - -func (d *dbImpl) NeedsClone(ctx context.Context, donor string, port int32) (bool, error) { - rows, err := d.db.QueryContext(ctx, "SELECT SOURCE, STATE FROM clone_status") - if err != nil { - return false, errors.Wrap(err, "fetch clone status") - } - defer rows.Close() - - for rows.Next() { - var source, state string - if err := rows.Scan(&source, &state); err != nil { - return false, errors.Wrap(err, "scan rows") - } - if source == fmt.Sprintf("%s:%d", donor, port) && state == "Completed" { - return false, nil - } - } - - return true, nil -} - -func (d *dbImpl) Clone(ctx context.Context, donor, user, pass string, port int32) error { - _, err := d.db.ExecContext(ctx, "SET GLOBAL clone_valid_donor_list=?", fmt.Sprintf("%s:%d", donor, port)) - if err != nil { - return errors.Wrap(err, "set clone_valid_donor_list") - } - - _, err = d.db.ExecContext(ctx, "CLONE INSTANCE FROM ?@?:? IDENTIFIED BY ?", user, donor, port, pass) - - mErr, ok := err.(*mysql.MySQLError) - if !ok { - return errors.Wrap(err, "clone instance") - } - - // Error 3707: Restart server failed (mysqld is not managed by supervisor process). - if mErr.Number == uint16(3707) { - return ErrRestartAfterClone - } - - return nil -} - -func (d *dbImpl) DumbQuery(ctx context.Context) error { - _, err := d.db.ExecContext(ctx, "SELECT 1") - return errors.Wrap(err, "SELECT 1") -} - -func (d *dbImpl) GetGlobal(ctx context.Context, variable string) (interface{}, error) { - // TODO: check how to do this without being vulnerable to injection - var value interface{} - err := d.db.QueryRowContext(ctx, fmt.Sprintf("SELECT @@%s", variable)).Scan(&value) - return value, errors.Wrapf(err, "SELECT @@%s", variable) -} - -func (d *dbImpl) SetGlobal(ctx context.Context, variable, value interface{}) error { - _, err := d.db.ExecContext(ctx, fmt.Sprintf("SET GLOBAL %s=?", variable), value) - return errors.Wrapf(err, "SET GLOBAL %s=%s", variable, value) -} - -func (d *dbImpl) StartGroupReplication(ctx context.Context, password string) error { - _, err := d.db.ExecContext(ctx, "START GROUP_REPLICATION USER=?, PASSWORD=?", apiv1alpha1.UserReplication, password) - - mErr, ok := err.(*mysql.MySQLError) - if !ok { - return errors.Wrap(err, "start group replication") - } - - // Error 3092: The server is not configured properly to be an active member of the group. - if mErr.Number == uint16(3092) { - return ErrGroupReplicationNotReady - } - - return errors.Wrap(err, "start group replication") -} - -func (d *dbImpl) StopGroupReplication(ctx context.Context) error { - _, err := d.db.ExecContext(ctx, "STOP GROUP_REPLICATION") - return errors.Wrap(err, "stop group replication") -} - -func (d *dbImpl) GetGroupReplicationPrimary(ctx context.Context) (string, error) { - var host string - - err := d.db.QueryRowContext(ctx, "SELECT MEMBER_HOST FROM replication_group_members WHERE MEMBER_ROLE='PRIMARY' AND MEMBER_STATE='ONLINE'").Scan(&host) - if err != nil { - return "", errors.Wrap(err, "query primary member") - } - - return host, nil -} - -func (d *dbImpl) GetGroupReplicationReplicas(ctx context.Context) ([]string, error) { - replicas := make([]string, 0) - - rows, err := d.db.QueryContext(ctx, "SELECT MEMBER_HOST FROM replication_group_members WHERE MEMBER_ROLE='SECONDARY' AND MEMBER_STATE='ONLINE'") - if err != nil { - return nil, errors.Wrap(err, "query replicas") - } - defer rows.Close() - - for rows.Next() { - var host string - if err := rows.Scan(&host); err != nil { - return nil, errors.Wrap(err, "scan rows") - } - - replicas = append(replicas, host) - } - - return replicas, nil -} - -func (d *dbImpl) GetMemberState(ctx context.Context, host string) (MemberState, error) { - var state MemberState - - err := d.db.QueryRowContext(ctx, "SELECT MEMBER_STATE FROM replication_group_members WHERE MEMBER_HOST=?", host).Scan(&state) - if err != nil { - if errors.Is(err, sql.ErrNoRows) { - return MemberStateOffline, nil - } - return MemberStateError, errors.Wrap(err, "query member state") - } - - return state, nil -} - -func (d *dbImpl) GetGroupReplicationMembers(ctx context.Context) ([]string, error) { - members := make([]string, 0) - - rows, err := d.db.QueryContext(ctx, "SELECT MEMBER_HOST FROM replication_group_members") - if err != nil { - return nil, errors.Wrap(err, "query members") - } - defer rows.Close() - - for rows.Next() { - var host string - if err := rows.Scan(&host); err != nil { - return nil, errors.Wrap(err, "scan rows") - } - - members = append(members, host) - } - - return members, nil -} - -func (d *dbImpl) CheckIfDatabaseExists(ctx context.Context, name string) (bool, error) { - var db string - - err := d.db.QueryRowContext(ctx, "SHOW DATABASES LIKE ?", name).Scan(&db) - if err != nil { - if errors.Is(err, sql.ErrNoRows) { - return false, nil - } - return false, err - } - - return true, nil -} - -func (d *dbImpl) CheckIfInPrimaryPartition(ctx context.Context) (bool, error) { - var in bool - - err := d.db.QueryRowContext(ctx, ` - SELECT - MEMBER_STATE = 'ONLINE' - AND ( - ( - SELECT - COUNT(*) - FROM - performance_schema.replication_group_members - WHERE - MEMBER_STATE NOT IN ('ONLINE', 'RECOVERING') - ) >= ( - ( - SELECT - COUNT(*) - FROM - performance_schema.replication_group_members - ) / 2 - ) = 0 - ) - FROM - performance_schema.replication_group_members - JOIN performance_schema.replication_group_member_stats USING(member_id) - WHERE - member_id = @@global.server_uuid; - `).Scan(&in) - if err != nil { - return false, err - } - - return in, nil -} - -func (d *dbImpl) CheckIfPrimaryUnreachable(ctx context.Context) (bool, error) { - var state string - - err := d.db.QueryRowContext(ctx, ` - SELECT - MEMBER_STATE - FROM - performance_schema.replication_group_members - WHERE - MEMBER_ROLE = 'PRIMARY' - `).Scan(&state) - if err != nil { - return false, err - } - - return state == string(innodbcluster.MemberStateUnreachable), nil -} diff --git a/pkg/replicator/replicatorexec.go b/pkg/replicator/replicatorexec.go deleted file mode 100644 index 46cb2ee03..000000000 --- a/pkg/replicator/replicatorexec.go +++ /dev/null @@ -1,457 +0,0 @@ -package replicator - -import ( - "bytes" - "context" - "database/sql" - "encoding/csv" - "fmt" - "regexp" - "strings" - - "github.com/go-sql-driver/mysql" - "github.com/gocarina/gocsv" - "github.com/pkg/errors" - corev1 "k8s.io/api/core/v1" - - apiv1alpha1 "github.com/percona/percona-server-mysql-operator/api/v1alpha1" - "github.com/percona/percona-server-mysql-operator/pkg/clientcmd" - "github.com/percona/percona-server-mysql-operator/pkg/innodbcluster" -) - -var sensitiveRegexp = regexp.MustCompile(":.*@") - -type dbImplExec struct { - client clientcmd.Client - pod *corev1.Pod - user apiv1alpha1.SystemUser - pass string - host string -} - -func NewReplicatorExec(pod *corev1.Pod, cliCmd clientcmd.Client, user apiv1alpha1.SystemUser, pass, host string) (Replicator, error) { - return &dbImplExec{client: cliCmd, pod: pod, user: user, pass: pass, host: host}, nil -} - -func (d *dbImplExec) exec(ctx context.Context, stm string, stdout, stderr *bytes.Buffer) error { - cmd := []string{"mysql", "--database", "performance_schema", fmt.Sprintf("-p%s", d.pass), "-u", string(d.user), "-h", d.host, "-e", stm} - - err := d.client.Exec(ctx, d.pod, "mysql", cmd, nil, stdout, stderr, false) - if err != nil { - sout := sensitiveRegexp.ReplaceAllString(stdout.String(), ":*****@") - serr := sensitiveRegexp.ReplaceAllString(stderr.String(), ":*****@") - return errors.Wrapf(err, "run %s, stdout: %s, stderr: %s", cmd, sout, serr) - } - - if strings.Contains(stderr.String(), "ERROR") { - return fmt.Errorf("sql error: %s", stderr) - } - - return nil -} - -func (d *dbImplExec) query(ctx context.Context, query string, out interface{}) error { - var errb, outb bytes.Buffer - err := d.exec(ctx, query, &outb, &errb) - if err != nil { - return err - } - - if !strings.Contains(errb.String(), "ERROR") && outb.Len() == 0 { - return sql.ErrNoRows - } - - csv := csv.NewReader(bytes.NewReader(outb.Bytes())) - csv.Comma = '\t' - - if err = gocsv.UnmarshalCSV(csv, out); err != nil { - return err - } - - return nil -} - -func (d *dbImplExec) ChangeReplicationSource(ctx context.Context, host, replicaPass string, port int32) error { - var errb, outb bytes.Buffer - q := fmt.Sprintf(` - CHANGE REPLICATION SOURCE TO - SOURCE_USER='%s', - SOURCE_PASSWORD='%s', - SOURCE_HOST='%s', - SOURCE_PORT=%d, - SOURCE_SSL=1, - SOURCE_CONNECTION_AUTO_FAILOVER=1, - SOURCE_AUTO_POSITION=1, - SOURCE_RETRY_COUNT=3, - SOURCE_CONNECT_RETRY=60 - `, apiv1alpha1.UserReplication, replicaPass, host, port) - err := d.exec(ctx, q, &outb, &errb) - - if err != nil { - return errors.Wrap(err, "exec CHANGE REPLICATION SOURCE TO") - } - - return nil -} - -func (d *dbImplExec) StartReplication(ctx context.Context, host, replicaPass string, port int32) error { - if err := d.ChangeReplicationSource(ctx, host, replicaPass, port); err != nil { - return errors.Wrap(err, "change replication source") - } - - var errb, outb bytes.Buffer - err := d.exec(ctx, "START REPLICA", &outb, &errb) - return errors.Wrap(err, "start replication") -} - -func (d *dbImplExec) StopReplication(ctx context.Context) error { - var errb, outb bytes.Buffer - err := d.exec(ctx, "STOP REPLICA", &outb, &errb) - return errors.Wrap(err, "stop replication") -} - -func (d *dbImplExec) ResetReplication(ctx context.Context) error { - var errb, outb bytes.Buffer - err := d.exec(ctx, "RESET REPLICA ALL", &outb, &errb) - return errors.Wrap(err, "reset replication") - -} - -func (d *dbImplExec) ReplicationStatus(ctx context.Context) (ReplicationStatus, string, error) { - rows := []*struct { - IoState string `csv:"conn_state"` - SqlState string `csv:"applier_state"` - Host string `csv:"host"` - }{} - - q := fmt.Sprintf(` - SELECT - connection_status.SERVICE_STATE as conn_state, - applier_status.SERVICE_STATE as applier_state, - HOST as host - FROM replication_connection_status connection_status - JOIN replication_connection_configuration connection_configuration - ON connection_status.channel_name = connection_configuration.channel_name - JOIN replication_applier_status applier_status - ON connection_status.channel_name = applier_status.channel_name - WHERE connection_status.channel_name = '%s' - `, DefaultChannelName) - err := d.query(ctx, q, &rows) - if err != nil { - if errors.Is(err, sql.ErrNoRows) { - return ReplicationStatusNotInitiated, "", nil - } - return ReplicationStatusError, "", errors.Wrap(err, "scan replication status") - } - - if rows[0].IoState == "ON" && rows[0].SqlState == "ON" { - return ReplicationStatusActive, rows[0].Host, nil - } - - return ReplicationStatusNotInitiated, "", err -} - -func (d *dbImplExec) IsReplica(ctx context.Context) (bool, error) { - status, _, err := d.ReplicationStatus(ctx) - return status == ReplicationStatusActive, errors.Wrap(err, "get replication status") -} - -func (d *dbImplExec) EnableSuperReadonly(ctx context.Context) error { - var errb, outb bytes.Buffer - err := d.exec(ctx, "SET GLOBAL SUPER_READ_ONLY=1", &outb, &errb) - return errors.Wrap(err, "set global super_read_only param to 1") -} - -func (d *dbImplExec) DisableSuperReadonly(ctx context.Context) error { - var errb, outb bytes.Buffer - err := d.exec(ctx, "SET GLOBAL SUPER_READ_ONLY=0", &outb, &errb) - return errors.Wrap(err, "set global super_read_only param to 0") -} - -func (d *dbImplExec) IsReadonly(ctx context.Context) (bool, error) { - rows := []*struct { - Readonly int `csv:"readonly"` - }{} - - err := d.query(ctx, "select @@read_only and @@super_read_only as readonly", &rows) - if err != nil { - return false, err - } - - return rows[0].Readonly == 1, nil -} - -func (d *dbImplExec) ReportHost(ctx context.Context) (string, error) { - rows := []*struct { - Host string `csv:"host"` - }{} - - err := d.query(ctx, "select @@report_host as host", &rows) - if err != nil { - return "", err - } - - return rows[0].Host, nil -} - -func (d *dbImplExec) Close() error { - return nil -} - -func (d *dbImplExec) CloneInProgress(ctx context.Context) (bool, error) { - rows := []*struct { - State string `csv:"state"` - }{} - err := d.query(ctx, "SELECT STATE FROM clone_status as state", &rows) - if err != nil { - return false, errors.Wrap(err, "fetch clone status") - } - - for _, row := range rows { - if row.State != "Completed" && row.State != "Failed" { - return true, nil - } - } - - return false, nil -} - -func (d *dbImplExec) NeedsClone(ctx context.Context, donor string, port int32) (bool, error) { - rows := []*struct { - Source string `csv:"source"` - State string `csv:"state"` - }{} - err := d.query(ctx, "SELECT SOURCE as source, STATE as state FROM clone_status", &rows) - if err != nil { - return false, errors.Wrap(err, "fetch clone status") - } - - for _, row := range rows { - if row.Source == fmt.Sprintf("%s:%d", donor, port) && row.State == "Completed" { - return false, nil - } - } - - return true, nil -} - -func (d *dbImplExec) Clone(ctx context.Context, donor, user, pass string, port int32) error { - var errb, outb bytes.Buffer - q := fmt.Sprintf("SET GLOBAL clone_valid_donor_list='%s'", fmt.Sprintf("%s:%d", donor, port)) - err := d.exec(ctx, q, &outb, &errb) - if err != nil { - return errors.Wrap(err, "set clone_valid_donor_list") - } - - q = fmt.Sprintf("CLONE INSTANCE FROM %s@%s:%d IDENTIFIED BY %s", user, donor, port, pass) - err = d.exec(ctx, q, &outb, &errb) - - if strings.Contains(errb.String(), "ERROR") { - return errors.Wrap(err, "clone instance") - } - - // Error 3707: Restart server failed (mysqld is not managed by supervisor process). - if strings.Contains(errb.String(), "3707") { - return ErrRestartAfterClone - } - - return nil -} - -func (d *dbImplExec) DumbQuery(ctx context.Context) error { - var errb, outb bytes.Buffer - err := d.exec(ctx, "SELECT 1", &outb, &errb) - - return errors.Wrap(err, "SELECT 1") -} - -func (d *dbImplExec) GetGlobal(ctx context.Context, variable string) (interface{}, error) { - rows := []*struct { - Val interface{} `csv:"val"` - }{} - - // TODO: check how to do this without being vulnerable to injection - err := d.query(ctx, fmt.Sprintf("SELECT @@%s as val", variable), &rows) - if err != nil { - return nil, errors.Wrapf(err, "SELECT @@%s", variable) - } - - return rows[0].Val, nil -} - -func (d *dbImplExec) SetGlobal(ctx context.Context, variable, value interface{}) error { - var errb, outb bytes.Buffer - q := fmt.Sprintf("SET GLOBAL %s=%s", variable, value) - err := d.exec(ctx, q, &outb, &errb) - if err != nil { - return errors.Wrapf(err, "SET GLOBAL %s=%s", variable, value) - - } - return nil -} - -func (d *dbImplExec) StartGroupReplication(ctx context.Context, password string) error { - var errb, outb bytes.Buffer - q := fmt.Sprintf("START GROUP_REPLICATION USER='%s', PASSWORD='%s'", apiv1alpha1.UserReplication, password) - err := d.exec(ctx, q, &outb, &errb) - - mErr, ok := err.(*mysql.MySQLError) - if !ok { - return errors.Wrap(err, "start group replication") - } - - // Error 3092: The server is not configured properly to be an active member of the group. - if mErr.Number == uint16(3092) { - return ErrGroupReplicationNotReady - } - - return errors.Wrap(err, "start group replication") -} - -func (d *dbImplExec) StopGroupReplication(ctx context.Context) error { - var errb, outb bytes.Buffer - err := d.exec(ctx, "STOP GROUP_REPLICATION", &outb, &errb) - return errors.Wrap(err, "stop group replication") -} - -func (d *dbImplExec) GetGroupReplicationPrimary(ctx context.Context) (string, error) { - rows := []*struct { - Host string `csv:"host"` - }{} - - err := d.query(ctx, "SELECT MEMBER_HOST as host FROM replication_group_members WHERE MEMBER_ROLE='PRIMARY' AND MEMBER_STATE='ONLINE'", &rows) - if err != nil { - return "", errors.Wrap(err, "query primary member") - } - - return rows[0].Host, nil -} - -// TODO: finish implementation -func (d *dbImplExec) GetGroupReplicationReplicas(ctx context.Context) ([]string, error) { - rows := []*struct { - Host string `csv:"host"` - }{} - - err := d.query(ctx, "SELECT MEMBER_HOST as host FROM replication_group_members WHERE MEMBER_ROLE='SECONDARY' AND MEMBER_STATE='ONLINE'", &rows) - if err != nil { - return nil, errors.Wrap(err, "query replicas") - } - - replicas := make([]string, 0) - for _, row := range rows { - replicas = append(replicas, row.Host) - } - - return replicas, nil -} - -func (d *dbImplExec) GetMemberState(ctx context.Context, host string) (MemberState, error) { - rows := []*struct { - State MemberState `csv:"state"` - }{} - q := fmt.Sprintf(`SELECT MEMBER_STATE as state FROM replication_group_members WHERE MEMBER_HOST='%s'`, host) - err := d.query(ctx, q, &rows) - if err != nil { - if errors.Is(err, sql.ErrNoRows) { - return MemberStateOffline, nil - } - return MemberStateError, errors.Wrap(err, "query member state") - } - - return rows[0].State, nil -} - -func (d *dbImplExec) GetGroupReplicationMembers(ctx context.Context) ([]string, error) { - rows := []*struct { - Member string `csv:"member"` - }{} - - err := d.query(ctx, "SELECT MEMBER_HOST as member FROM replication_group_members", &rows) - if err != nil { - return nil, errors.Wrap(err, "query members") - } - - members := make([]string, 0) - for _, row := range rows { - members = append(members, row.Member) - } - - return members, nil -} - -func (d *dbImplExec) CheckIfDatabaseExists(ctx context.Context, name string) (bool, error) { - rows := []*struct { - DB string `csv:"db"` - }{} - - q := fmt.Sprintf("SELECT SCHEMA_NAME AS db FROM INFORMATION_SCHEMA.SCHEMATA WHERE SCHEMA_NAME LIKE '%s'", name) - err := d.query(ctx, q, &rows) - - if err != nil { - if errors.Is(err, sql.ErrNoRows) { - return false, nil - } - return false, err - } - - return true, nil -} - -// TODO: finish implementation -func (d *dbImplExec) CheckIfInPrimaryPartition(ctx context.Context) (bool, error) { - rows := []*struct { - In bool `csv:"in"` - }{} - - err := d.query(ctx, ` - SELECT - MEMBER_STATE = 'ONLINE' - AND ( - ( - SELECT - COUNT(*) - FROM - performance_schema.replication_group_members - WHERE - MEMBER_STATE NOT IN ('ONLINE', 'RECOVERING') - ) >= ( - ( - SELECT - COUNT(*) - FROM - performance_schema.replication_group_members - ) / 2 - ) = 0 - ) as in - FROM - performance_schema.replication_group_members - JOIN performance_schema.replication_group_member_stats USING(member_id) - WHERE - member_id = @@glob, &outb, &errba - `, &rows) - - if err != nil { - return false, err - } - - return rows[0].In, nil -} - -func (d *dbImplExec) CheckIfPrimaryUnreachable(ctx context.Context) (bool, error) { - var state string - - err := d.query(ctx, ` - SELECT - MEMBER_STATE - FROM - performance_schema.replication_group_members - WHERE - MEMBER_ROLE = 'PRIMARY' - `, &state) - if err != nil { - return false, err - } - - return state == string(innodbcluster.MemberStateUnreachable), nil -} diff --git a/pkg/users/users.go b/pkg/users/users.go deleted file mode 100644 index 33ed1a0bd..000000000 --- a/pkg/users/users.go +++ /dev/null @@ -1,132 +0,0 @@ -package users - -import ( - "database/sql" - "fmt" - - mysqldriver "github.com/go-sql-driver/mysql" - "github.com/pkg/errors" - - apiv1alpha1 "github.com/percona/percona-server-mysql-operator/api/v1alpha1" - "github.com/percona/percona-server-mysql-operator/pkg/mysql" -) - -type Manager interface { - UpdateUserPasswords(users []mysql.User) error - DiscardOldPasswords(users []mysql.User) error - Close() error -} - -type dbImpl struct{ db *sql.DB } - -func NewManager(user apiv1alpha1.SystemUser, pass, host string, port int32) (Manager, error) { - config := mysqldriver.NewConfig() - - config.User = string(user) - config.Passwd = pass - config.Net = "tcp" - config.Addr = fmt.Sprintf("%s:%d", host, port) - config.DBName = "performance_schema" - config.Params = map[string]string{ - "interpolateParams": "true", - "timeout": "20s", - "readTimeout": "20s", - "writeTimeout": "20s", - "tls": "preferred", - } - - db, err := sql.Open("mysql", config.FormatDSN()) - if err != nil { - return nil, errors.Wrap(err, "connect to MySQL") - } - - if err := db.Ping(); err != nil { - return nil, errors.Wrap(err, "ping database") - } - - return &dbImpl{db}, nil -} - -// UpdateUserPasswords updates user passwords but retains the current password using Dual Password feature of MySQL 8 -func (d *dbImpl) UpdateUserPasswords(users []mysql.User) error { - tx, err := d.db.Begin() - if err != nil { - return errors.Wrap(err, "begin transaction") - } - - for _, user := range users { - for _, host := range user.Hosts { - _, err = tx.Exec("ALTER USER ?@? IDENTIFIED BY ? RETAIN CURRENT PASSWORD", user.Username, host, user.Password) - if err != nil { - err = errors.Wrap(err, "alter user") - - if errT := tx.Rollback(); errT != nil { - return errors.Wrap(errors.Wrap(errT, "rollback"), err.Error()) - } - - return err - } - } - } - - _, err = tx.Exec("FLUSH PRIVILEGES") - if err != nil { - err = errors.Wrap(err, "flush privileges") - - if errT := tx.Rollback(); errT != nil { - return errors.Wrap(errors.Wrap(errT, "rollback"), err.Error()) - } - - return err - } - - if err := tx.Commit(); err != nil { - return errors.Wrap(err, "commit transaction") - } - - return nil -} - -// DiscardOldPasswords discards old passwords of givens users -func (d *dbImpl) DiscardOldPasswords(users []mysql.User) error { - tx, err := d.db.Begin() - if err != nil { - return errors.Wrap(err, "begin transaction") - } - - for _, user := range users { - for _, host := range user.Hosts { - _, err = tx.Exec("ALTER USER ?@? DISCARD OLD PASSWORD", user.Username, host) - if err != nil { - err = errors.Wrap(err, "alter user") - - if errT := tx.Rollback(); errT != nil { - return errors.Wrap(errors.Wrap(errT, "rollback"), err.Error()) - } - - return err - } - } - } - - _, err = tx.Exec("FLUSH PRIVILEGES") - if err != nil { - err = errors.Wrap(err, "flush privileges") - - if errT := tx.Rollback(); errT != nil { - return errors.Wrap(errors.Wrap(errT, "rollback"), err.Error()) - } - - return err - } - - if err := tx.Commit(); err != nil { - return errors.Wrap(err, "commit transaction") - } - - return nil -} - -func (d *dbImpl) Close() error { - return d.db.Close() -} diff --git a/pkg/users/usersexec.go b/pkg/users/usersexec.go deleted file mode 100644 index b143c5150..000000000 --- a/pkg/users/usersexec.go +++ /dev/null @@ -1,101 +0,0 @@ -package users - -import ( - "bytes" - "context" - "fmt" - "regexp" - "strings" - - "github.com/pkg/errors" - corev1 "k8s.io/api/core/v1" - - apiv1alpha1 "github.com/percona/percona-server-mysql-operator/api/v1alpha1" - "github.com/percona/percona-server-mysql-operator/pkg/clientcmd" - "github.com/percona/percona-server-mysql-operator/pkg/mysql" -) - -var sensitiveRegexp = regexp.MustCompile(":.*@") - -type dbExecImpl struct { - client clientcmd.Client - pod *corev1.Pod - user apiv1alpha1.SystemUser - pass string - host string -} - -func NewManagerExec(pod *corev1.Pod, cliCmd clientcmd.Client, user apiv1alpha1.SystemUser, pass, host string) (Manager, error) { - return &dbExecImpl{client: cliCmd, pod: pod, user: user, pass: pass, host: host}, nil -} - -func (d *dbExecImpl) exec(stm string) error { - - cmd := []string{"mysql", "--database", "performance_schema", fmt.Sprintf("-p%s", escapePass(d.pass)), "-u", string(d.user), "-h", d.host, "-e", stm} - - var outb, errb bytes.Buffer - err := d.client.Exec(context.TODO(), d.pod, "mysql", cmd, nil, &outb, &errb, false) - if err != nil { - sout := sensitiveRegexp.ReplaceAllString(outb.String(), ":*****@") - serr := sensitiveRegexp.ReplaceAllString(errb.String(), ":*****@") - return errors.Wrapf(err, "run %s, stdout: %s, stderr: %s", cmd, sout, serr) - } - - if strings.Contains(errb.String(), "ERROR") { - serr := sensitiveRegexp.ReplaceAllString(errb.String(), ":*****@") - return fmt.Errorf("sql error: %s", serr) - } - - return nil -} - -// UpdateUserPasswords updates user passwords but retains the current password using Dual Password feature of MySQL 8 -func (d *dbExecImpl) UpdateUserPasswords(users []mysql.User) error { - for _, user := range users { - for _, host := range user.Hosts { - q := fmt.Sprintf("ALTER USER '%s'@'%s' IDENTIFIED BY '%s' RETAIN CURRENT PASSWORD", user.Username, host, escapePass(user.Password)) - err := d.exec(q) - if err != nil { - return errors.Wrap(err, "alter user") - } - } - } - - err := d.exec("FLUSH PRIVILEGES") - if err != nil { - return errors.Wrap(err, "flush privileges") - } - - return nil -} - -// DiscardOldPasswords discards old passwords of givens users -func (d *dbExecImpl) DiscardOldPasswords(users []mysql.User) error { - for _, user := range users { - for _, host := range user.Hosts { - q := fmt.Sprintf("ALTER USER '%s'@'%s' DISCARD OLD PASSWORD", user.Username, host) - err := d.exec(q) - if err != nil { - return errors.Wrap(err, "discard old password") - } - } - } - - err := d.exec("FLUSH PRIVILEGES") - if err != nil { - return errors.Wrap(err, "flush privileges") - } - - return nil -} - -func (d *dbExecImpl) Close() error { - return nil -} - -func escapePass(pass string) string { - s := strings.ReplaceAll(pass, `'`, `\'`) - s = strings.ReplaceAll(s, `"`, `\"`) - s = strings.ReplaceAll(s, `\`, `\\`) - return s -}