Skip to content

Commit

Permalink
Merge branch 'main' into feature/orcMonitResources
Browse files Browse the repository at this point in the history
  • Loading branch information
SlavaUtesinov authored Jan 19, 2024
2 parents a45d435 + b292362 commit 2e79d63
Show file tree
Hide file tree
Showing 24 changed files with 718 additions and 1,569 deletions.
3 changes: 3 additions & 0 deletions api/v1alpha1/perconaservermysql_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -628,6 +628,7 @@ func (cr *PerconaServerMySQL) CheckNSetDefaults(ctx context.Context, serverVersi

if cr.Spec.MySQL.ClusterType == ClusterTypeGR && !cr.Spec.AllowUnsafeConfig {
if cr.Spec.MySQL.Size < MinSafeGRSize {
log.Info("Setting safe defaults, updating MySQL cluster size", "oldSize", cr.Spec.MySQL.Size, "newSafeSize", MinSafeGRSize)
cr.Spec.MySQL.Size = MinSafeGRSize
}

Expand All @@ -646,6 +647,7 @@ func (cr *PerconaServerMySQL) CheckNSetDefaults(ctx context.Context, serverVersi

if cr.RouterEnabled() && !cr.Spec.AllowUnsafeConfig {
if cr.Spec.Proxy.Router.Size < MinSafeProxySize {
log.Info("Setting safe defaults, updating Router size", "oldSize", cr.Spec.Proxy.Router.Size, "newSafeSize", MinSafeProxySize)
cr.Spec.Proxy.Router.Size = MinSafeProxySize
}
}
Expand All @@ -656,6 +658,7 @@ func (cr *PerconaServerMySQL) CheckNSetDefaults(ctx context.Context, serverVersi

if cr.HAProxyEnabled() && !cr.Spec.AllowUnsafeConfig {
if cr.Spec.Proxy.HAProxy.Size < MinSafeProxySize {
log.Info("Setting safe defaults, updating HAProxy size", "oldSize", cr.Spec.Proxy.HAProxy.Size, "newSafeSize", MinSafeProxySize)
cr.Spec.Proxy.HAProxy.Size = MinSafeProxySize
}
}
Expand Down
17 changes: 9 additions & 8 deletions cmd/bootstrap/async_replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,9 @@ import (
"k8s.io/apimachinery/pkg/util/sets"

apiv1alpha1 "github.com/percona/percona-server-mysql-operator/api/v1alpha1"
database "github.com/percona/percona-server-mysql-operator/cmd/db"
mysqldb "github.com/percona/percona-server-mysql-operator/pkg/db"
"github.com/percona/percona-server-mysql-operator/pkg/mysql"
"github.com/percona/percona-server-mysql-operator/pkg/replicator"
)

func bootstrapAsyncReplication(ctx context.Context) error {
Expand Down Expand Up @@ -87,9 +88,9 @@ func bootstrapAsyncReplication(ctx context.Context) error {
return errors.Wrapf(err, "get %s password", apiv1alpha1.UserOperator)
}

db, err := replicator.NewReplicator(ctx, apiv1alpha1.UserOperator, operatorPass, podIp, mysql.DefaultAdminPort)
db, err := database.NewDatabase(ctx, apiv1alpha1.UserOperator, operatorPass, podIp, mysql.DefaultAdminPort)
if err != nil {
return errors.Wrap(err, "connect to db")
return errors.Wrap(err, "connect to database")
}
defer db.Close()

Expand Down Expand Up @@ -148,7 +149,7 @@ func bootstrapAsyncReplication(ctx context.Context) error {
log.Printf("Cloning from %s", donor)
err = db.Clone(ctx, donor, string(apiv1alpha1.UserOperator), operatorPass, mysql.DefaultAdminPort)
timer.Stop("clone")
if err != nil && !errors.Is(err, replicator.ErrRestartAfterClone) {
if err != nil && !errors.Is(err, database.ErrRestartAfterClone) {
return errors.Wrapf(err, "clone from donor %s", donor)
}

Expand All @@ -174,7 +175,7 @@ func bootstrapAsyncReplication(ctx context.Context) error {
return errors.Wrap(err, "check replication status")
}

if rStatus == replicator.ReplicationStatusNotInitiated {
if rStatus == mysqldb.ReplicationStatusNotInitiated {
log.Println("configuring replication")

replicaPass, err := getSecret(apiv1alpha1.UserReplication)
Expand Down Expand Up @@ -208,7 +209,7 @@ func getTopology(ctx context.Context, peers sets.Set[string]) (string, []string,
}

for _, peer := range sets.List(peers) {
db, err := replicator.NewReplicator(ctx, apiv1alpha1.UserOperator, operatorPass, peer, mysql.DefaultAdminPort)
db, err := database.NewDatabase(ctx, apiv1alpha1.UserOperator, operatorPass, peer, mysql.DefaultAdminPort)
if err != nil {
return "", nil, errors.Wrapf(err, "connect to %s", peer)
}
Expand All @@ -228,7 +229,7 @@ func getTopology(ctx context.Context, peers sets.Set[string]) (string, []string,
}
replicas.Insert(replicaHost)

if status == replicator.ReplicationStatusActive {
if status == mysqldb.ReplicationStatusActive {
primary = source
}
}
Expand All @@ -255,7 +256,7 @@ func selectDonor(ctx context.Context, fqdn, primary string, replicas []string) (
}

for _, replica := range replicas {
db, err := replicator.NewReplicator(ctx, apiv1alpha1.UserOperator, operatorPass, replica, mysql.DefaultAdminPort)
db, err := database.NewDatabase(ctx, apiv1alpha1.UserOperator, operatorPass, replica, mysql.DefaultAdminPort)
if err != nil {
continue
}
Expand Down
240 changes: 240 additions & 0 deletions cmd/db/db.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,240 @@
package db

import (
"context"
"database/sql"
"fmt"

"github.com/go-sql-driver/mysql"
"github.com/pkg/errors"

apiv1alpha1 "github.com/percona/percona-server-mysql-operator/api/v1alpha1"
"github.com/percona/percona-server-mysql-operator/pkg/db"
)

// defaultChannelName is the empty string, which MySQL treats as the
// default (unnamed) replication channel.
const defaultChannelName = ""

// ErrRestartAfterClone is returned by Clone when the donor data was copied
// successfully but mysqld could not restart itself because it is not
// managed by a supervisor process (MySQL error 3707). Callers treat this
// as a signal to restart the server themselves.
var ErrRestartAfterClone = errors.New("Error 3707: Restart server failed (mysqld is not managed by supervisor process).")

// ReplicationStatus represents the state of replication.
// NOTE(review): this local type appears unused in this file — the methods
// below return statuses from pkg/db instead. Confirm before relying on it.
type ReplicationStatus int8

// DB wraps a *sql.DB connection to a single MySQL instance.
type DB struct {
	db *sql.DB
}

// NewDatabase opens a connection to the MySQL instance at host:port using
// the given system user credentials and verifies it with a ping. The
// connection targets the performance_schema database because the methods
// on DB query replication tables that live there. The caller is
// responsible for calling Close on the returned DB.
func NewDatabase(ctx context.Context, user apiv1alpha1.SystemUser, pass, host string, port int32) (*DB, error) {
	config := mysql.NewConfig()

	config.User = string(user)
	config.Passwd = pass
	config.Net = "tcp"
	config.Addr = fmt.Sprintf("%s:%d", host, port)
	config.DBName = "performance_schema"
	config.Params = map[string]string{
		"interpolateParams": "true",
		"timeout":           "10s",
		"readTimeout":       "10s",
		"writeTimeout":      "10s",
		"tls":               "preferred",
	}

	db, err := sql.Open("mysql", config.FormatDSN())
	if err != nil {
		return nil, errors.Wrap(err, "connect to MySQL")
	}

	if err := db.PingContext(ctx); err != nil {
		// Close the handle so the connection pool is not leaked when the
		// instance is unreachable.
		_ = db.Close()
		return nil, errors.Wrap(err, "ping DB")
	}

	return &DB{db}, nil
}

// StartReplication points this instance at host:port as its replication
// source, authenticating with the operator-managed replication user, and
// starts the replica threads. Auto-positioning, SSL, and automatic source
// failover are enabled; connecting to the source is retried up to 3 times
// with 60 seconds between attempts.
func (d *DB) StartReplication(ctx context.Context, host, replicaPass string, port int32) error {
	// TODO: Make retries configurable
	if _, err := d.db.ExecContext(ctx, `
	CHANGE REPLICATION SOURCE TO
		SOURCE_USER=?,
		SOURCE_PASSWORD=?,
		SOURCE_HOST=?,
		SOURCE_PORT=?,
		SOURCE_SSL=1,
		SOURCE_CONNECTION_AUTO_FAILOVER=1,
		SOURCE_AUTO_POSITION=1,
		SOURCE_RETRY_COUNT=3,
		SOURCE_CONNECT_RETRY=60
	`, apiv1alpha1.UserReplication, replicaPass, host, port); err != nil {
		return errors.Wrap(err, "exec CHANGE REPLICATION SOURCE TO")
	}

	_, err := d.db.ExecContext(ctx, "START REPLICA")
	return errors.Wrap(err, "start replication")
}

// StopReplication stops the replica threads on this instance.
func (d *DB) StopReplication(ctx context.Context) error {
	if _, err := d.db.ExecContext(ctx, "STOP REPLICA"); err != nil {
		return errors.Wrap(err, "stop replication")
	}
	return nil
}

// ResetReplication removes all replication configuration from this
// instance (RESET REPLICA ALL).
func (d *DB) ResetReplication(ctx context.Context) error {
	if _, err := d.db.ExecContext(ctx, "RESET REPLICA ALL"); err != nil {
		return errors.Wrap(err, "reset replication")
	}
	return nil
}

// ReplicationStatus reports the state of the default replication channel.
// It returns ReplicationStatusActive together with the source host when
// both the IO and SQL replica threads are running, and
// ReplicationStatusNotInitiated (with an empty host) when the channel does
// not exist or either thread is stopped.
func (d *DB) ReplicationStatus(ctx context.Context) (db.ReplicationStatus, string, error) {
	row := d.db.QueryRowContext(ctx, `
	SELECT
		connection_status.SERVICE_STATE,
		applier_status.SERVICE_STATE,
		HOST
	FROM replication_connection_status connection_status
	JOIN replication_connection_configuration connection_configuration
		ON connection_status.channel_name = connection_configuration.channel_name
	JOIN replication_applier_status applier_status
		ON connection_status.channel_name = applier_status.channel_name
	WHERE connection_status.channel_name = ?
	`, defaultChannelName)

	var ioThreadState, sqlThreadState, sourceHost string
	switch err := row.Scan(&ioThreadState, &sqlThreadState, &sourceHost); {
	case errors.Is(err, sql.ErrNoRows):
		// No row for the channel: replication was never configured.
		return db.ReplicationStatusNotInitiated, "", nil
	case err != nil:
		return db.ReplicationStatusError, "", errors.Wrap(err, "scan replication status")
	}

	if ioThreadState == "ON" && sqlThreadState == "ON" {
		return db.ReplicationStatusActive, sourceHost, nil
	}

	return db.ReplicationStatusNotInitiated, "", nil
}

// IsReplica reports whether this instance is an actively replicating
// replica of some source.
func (d *DB) IsReplica(ctx context.Context) (bool, error) {
	status, _, err := d.ReplicationStatus(ctx)
	if err != nil {
		return status == db.ReplicationStatusActive, errors.Wrap(err, "get replication status")
	}
	return status == db.ReplicationStatusActive, nil
}

// DisableSuperReadonly clears the global super_read_only flag, allowing
// writes from privileged users.
func (d *DB) DisableSuperReadonly(ctx context.Context) error {
	if _, err := d.db.ExecContext(ctx, "SET GLOBAL SUPER_READ_ONLY=0"); err != nil {
		return errors.Wrap(err, "set global super_read_only param to 0")
	}
	return nil
}

// IsReadonly reports whether the instance currently refuses writes, i.e.
// both @@read_only and @@super_read_only evaluate to true.
func (d *DB) IsReadonly(ctx context.Context) (bool, error) {
	var readonly int
	if err := d.db.QueryRowContext(ctx, "select @@read_only and @@super_read_only").Scan(&readonly); err != nil {
		return false, errors.Wrap(err, "select global read_only param")
	}
	return readonly == 1, nil
}

// ReportHost returns the value of the @@report_host system variable, the
// hostname this instance announces to its replication peers.
func (d *DB) ReportHost(ctx context.Context) (string, error) {
	var reportHost string
	if err := d.db.QueryRowContext(ctx, "select @@report_host").Scan(&reportHost); err != nil {
		return "", errors.Wrap(err, "select report_host param")
	}
	return reportHost, nil
}

// Close closes the underlying database connection pool.
func (d *DB) Close() error {
	return d.db.Close()
}

// CloneInProgress reports whether a CLONE operation is currently running
// on this instance. Any clone_status row whose STATE is neither
// "Completed" nor "Failed" counts as in progress.
func (d *DB) CloneInProgress(ctx context.Context) (bool, error) {
	rows, err := d.db.QueryContext(ctx, "SELECT STATE FROM clone_status")
	if err != nil {
		return false, errors.Wrap(err, "fetch clone status")
	}
	defer rows.Close()

	for rows.Next() {
		var state string
		if err := rows.Scan(&state); err != nil {
			return false, errors.Wrap(err, "scan rows")
		}

		if state != "Completed" && state != "Failed" {
			return true, nil
		}
	}

	// rows.Next returns false both at end of set and on error; surface an
	// iteration error instead of silently reporting no clone in progress.
	if err := rows.Err(); err != nil {
		return false, errors.Wrap(err, "iterate clone status rows")
	}

	return false, nil
}

// Clone copies the data directory from donor:port onto this instance using
// the MySQL CLONE plugin, authenticating on the donor as user/pass. It
// returns ErrRestartAfterClone when the clone itself succeeded but mysqld
// could not restart automatically (MySQL error 3707); the caller is then
// expected to restart the server.
func (d *DB) Clone(ctx context.Context, donor, user, pass string, port int32) error {
	_, err := d.db.ExecContext(ctx, "SET GLOBAL clone_valid_donor_list=?", fmt.Sprintf("%s:%d", donor, port))
	if err != nil {
		return errors.Wrap(err, "set clone_valid_donor_list")
	}

	_, err = d.db.ExecContext(ctx, "CLONE INSTANCE FROM ?@?:? IDENTIFIED BY ?", user, donor, port, pass)
	if err == nil {
		return nil
	}

	// Error 3707: Restart server failed (mysqld is not managed by supervisor process).
	var mErr *mysql.MySQLError
	if errors.As(err, &mErr) && mErr.Number == uint16(3707) {
		return ErrRestartAfterClone
	}

	// Previously any other *mysql.MySQLError (e.g. access denied, plugin
	// not loaded) was silently swallowed here, masking real clone failures.
	return errors.Wrap(err, "clone instance")
}

// DumbQuery issues a trivial SELECT 1 to verify the connection is alive.
func (d *DB) DumbQuery(ctx context.Context) error {
	if _, err := d.db.ExecContext(ctx, "SELECT 1"); err != nil {
		return errors.Wrap(err, "SELECT 1")
	}
	return nil
}

// GetMemberState returns the group replication member state for the given
// host, or MemberStateOffline when the host is not listed in
// replication_group_members at all.
func (d *DB) GetMemberState(ctx context.Context, host string) (db.MemberState, error) {
	var state db.MemberState

	row := d.db.QueryRowContext(ctx, "SELECT MEMBER_STATE FROM replication_group_members WHERE MEMBER_HOST=?", host)
	switch err := row.Scan(&state); {
	case errors.Is(err, sql.ErrNoRows):
		// Not part of the group: report offline rather than an error.
		return db.MemberStateOffline, nil
	case err != nil:
		return db.MemberStateError, errors.Wrap(err, "query member state")
	}

	return state, nil
}

// CheckIfInPrimaryPartition reports whether this instance is an ONLINE
// member of the majority (primary) partition of its replication group:
// the member itself is ONLINE and fewer than half of the group's members
// are in a state other than ONLINE or RECOVERING.
func (d *DB) CheckIfInPrimaryPartition(ctx context.Context) (bool, error) {
	var in bool

	err := d.db.QueryRowContext(ctx, `
	SELECT
		MEMBER_STATE = 'ONLINE'
		AND (
			(
				SELECT
					COUNT(*)
				FROM
					performance_schema.replication_group_members
				WHERE
					MEMBER_STATE NOT IN ('ONLINE', 'RECOVERING')
			) >= (
				(
					SELECT
						COUNT(*)
					FROM
						performance_schema.replication_group_members
				) / 2
			) = 0
		)
	FROM
		performance_schema.replication_group_members
		JOIN performance_schema.replication_group_member_stats USING(member_id)
	WHERE
		member_id = @@global.server_uuid;
	`).Scan(&in)
	if err != nil {
		// Wrap for consistency with every other method on DB; the raw
		// error was previously returned unwrapped.
		return false, errors.Wrap(err, "query primary partition status")
	}

	return in, nil
}

// EnableSuperReadonly sets the global super_read_only flag, blocking
// writes even from privileged users.
func (d *DB) EnableSuperReadonly(ctx context.Context) error {
	if _, err := d.db.ExecContext(ctx, "SET GLOBAL SUPER_READ_ONLY=1"); err != nil {
		return errors.Wrap(err, "set global super_read_only param to 1")
	}
	return nil
}
15 changes: 8 additions & 7 deletions cmd/healthcheck/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,10 @@ import (
"github.com/pkg/errors"

apiv1alpha1 "github.com/percona/percona-server-mysql-operator/api/v1alpha1"
database "github.com/percona/percona-server-mysql-operator/cmd/db"
mysqldb "github.com/percona/percona-server-mysql-operator/pkg/db"
"github.com/percona/percona-server-mysql-operator/pkg/k8s"
"github.com/percona/percona-server-mysql-operator/pkg/mysql"
"github.com/percona/percona-server-mysql-operator/pkg/replicator"
)

func main() {
Expand Down Expand Up @@ -81,7 +82,7 @@ func checkReadinessAsync(ctx context.Context) error {
return errors.Wrapf(err, "get %s password", apiv1alpha1.UserMonitor)
}

db, err := replicator.NewReplicator(ctx, apiv1alpha1.UserMonitor, monitorPass, podIP, mysql.DefaultAdminPort)
db, err := database.NewDatabase(ctx, apiv1alpha1.UserMonitor, monitorPass, podIP, mysql.DefaultAdminPort)
if err != nil {
return errors.Wrap(err, "connect to db")
}
Expand Down Expand Up @@ -116,7 +117,7 @@ func checkReadinessGR(ctx context.Context) error {
return errors.Wrapf(err, "get %s password", apiv1alpha1.UserMonitor)
}

db, err := replicator.NewReplicator(ctx, apiv1alpha1.UserMonitor, monitorPass, podIP, mysql.DefaultAdminPort)
db, err := database.NewDatabase(ctx, apiv1alpha1.UserMonitor, monitorPass, podIP, mysql.DefaultAdminPort)
if err != nil {
return errors.Wrap(err, "connect to db")
}
Expand All @@ -132,7 +133,7 @@ func checkReadinessGR(ctx context.Context) error {
return errors.Wrap(err, "get member state")
}

if state != replicator.MemberStateOnline {
if state != mysqldb.MemberStateOnline {
return errors.Errorf("Member state: %s", state)
}

Expand All @@ -150,7 +151,7 @@ func checkLivenessAsync(ctx context.Context) error {
return errors.Wrapf(err, "get %s password", apiv1alpha1.UserMonitor)
}

db, err := replicator.NewReplicator(ctx, apiv1alpha1.UserMonitor, monitorPass, podIP, mysql.DefaultAdminPort)
db, err := database.NewDatabase(ctx, apiv1alpha1.UserMonitor, monitorPass, podIP, mysql.DefaultAdminPort)
if err != nil {
return errors.Wrap(err, "connect to db")
}
Expand All @@ -170,7 +171,7 @@ func checkLivenessGR(ctx context.Context) error {
return errors.Wrapf(err, "get %s password", apiv1alpha1.UserMonitor)
}

db, err := replicator.NewReplicator(ctx, apiv1alpha1.UserMonitor, monitorPass, podIP, mysql.DefaultAdminPort)
db, err := database.NewDatabase(ctx, apiv1alpha1.UserMonitor, monitorPass, podIP, mysql.DefaultAdminPort)
if err != nil {
return errors.Wrap(err, "connect to db")
}
Expand Down Expand Up @@ -201,7 +202,7 @@ func checkReplication(ctx context.Context) error {
return errors.Wrapf(err, "get %s password", apiv1alpha1.UserMonitor)
}

db, err := replicator.NewReplicator(ctx, apiv1alpha1.UserMonitor, monitorPass, podIP, mysql.DefaultAdminPort)
db, err := database.NewDatabase(ctx, apiv1alpha1.UserMonitor, monitorPass, podIP, mysql.DefaultAdminPort)
if err != nil {
return errors.Wrap(err, "connect to db")
}
Expand Down
Loading

0 comments on commit 2e79d63

Please sign in to comment.