Skip to content

Commit

Permalink
Use Scylla Manager cluster labels for cluster reconciliation
Browse files Browse the repository at this point in the history
  • Loading branch information
rzetelskik committed Oct 16, 2024
1 parent 04a5ae3 commit 5ffcffb
Show file tree
Hide file tree
Showing 4 changed files with 870 additions and 645 deletions.
20 changes: 15 additions & 5 deletions pkg/controller/manager/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,30 +6,41 @@ import (
"context"

scyllav1 "github.com/scylladb/scylla-operator/pkg/api/scylla/v1"
"github.com/scylladb/scylla-operator/pkg/naming"
"github.com/scylladb/scylla-operator/pkg/pointer"
apiequality "k8s.io/apimachinery/pkg/api/equality"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/klog/v2"
)

func (c *Controller) calculateStatus(sc *scyllav1.ScyllaCluster, managerState *state) *scyllav1.ScyllaClusterStatus {
func (c *Controller) calculateStatus(sc *scyllav1.ScyllaCluster, state *managerClusterState) *scyllav1.ScyllaClusterStatus {
status := sc.Status.DeepCopy()

status.ManagerID = pointer.Ptr("")
status.Backups = []scyllav1.BackupTaskStatus{}
status.Repairs = []scyllav1.RepairTaskStatus{}

if state.Cluster == nil || state.Cluster.Labels[naming.OwnerUIDLabel] != string(sc.UID) {
return status
}

status.ManagerID = pointer.Ptr(state.Cluster.ID)

repairTaskClientErrorMap := map[string]string{}
for _, rts := range status.Repairs {
if rts.Error != nil {
repairTaskClientErrorMap[rts.Name] = *rts.Error
}
}

status.Repairs = []scyllav1.RepairTaskStatus{}
for _, rt := range sc.Spec.Repairs {
repairTaskStatus := scyllav1.RepairTaskStatus{
TaskStatus: scyllav1.TaskStatus{
Name: rt.Name,
},
}

managerTaskStatus, isInManagerState := managerState.RepairTasks[rt.Name]
managerTaskStatus, isInManagerState := state.RepairTasks[rt.Name]
if isInManagerState {
repairTaskStatus = managerTaskStatus
} else {
Expand All @@ -52,15 +63,14 @@ func (c *Controller) calculateStatus(sc *scyllav1.ScyllaCluster, managerState *s
}
}

status.Backups = []scyllav1.BackupTaskStatus{}
for _, bt := range sc.Spec.Backups {
backupTaskStatus := scyllav1.BackupTaskStatus{
TaskStatus: scyllav1.TaskStatus{
Name: bt.Name,
},
}

managerTaskStatus, isInManagerState := managerState.BackupTasks[bt.Name]
managerTaskStatus, isInManagerState := state.BackupTasks[bt.Name]
if isInManagerState {
backupTaskStatus = managerTaskStatus
} else {
Expand Down
128 changes: 70 additions & 58 deletions pkg/controller/manager/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,11 @@ import (
"fmt"
"time"

"github.com/scylladb/scylla-manager/v3/pkg/managerclient"
"github.com/scylladb/scylla-manager/v3/swagger/gen/scylla-manager/models"
scyllav1 "github.com/scylladb/scylla-operator/pkg/api/scylla/v1"
"github.com/scylladb/scylla-operator/pkg/helpers"
"github.com/scylladb/scylla-operator/pkg/helpers/slices"
"github.com/scylladb/scylla-operator/pkg/naming"
apierrors "k8s.io/apimachinery/pkg/api/errors"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
Expand All @@ -24,59 +27,73 @@ func (c *Controller) getAuthToken(sc *scyllav1.ScyllaCluster) (string, error) {
return helpers.GetAgentAuthTokenFromSecret(secret)
}

func (c *Controller) getManagerState(ctx context.Context, clusterID string) (*state, error) {
clusters, err := c.managerClient.ListClusters(ctx)
func (c *Controller) getManagerClusterState(ctx context.Context, sc *scyllav1.ScyllaCluster) (*managerClusterState, error) {
managerClusters, err := c.managerClient.ListClusters(ctx)
if err != nil {
return nil, err
}
var (
repairTasks map[string]scyllav1.RepairTaskStatus
backupTasks map[string]scyllav1.BackupTaskStatus
)

if clusterID != "" {
clusterFound := false
for _, c := range clusters {
if c.ID == clusterID {
clusterFound = true
}
return nil, fmt.Errorf("can't list clusters registered with manager: %w", err)
}

clusterName := naming.ManagerClusterName(sc)
// Cluster names in manager state are unique, so it suffices to only find one with a matching name.
managerCluster, _, found := slices.Find(managerClusters, func(c *models.Cluster) bool {
return c.Name == clusterName
})
if !found {
return &managerClusterState{}, nil
}

ownerUIDLabel := managerCluster.Labels[naming.OwnerUIDLabel]
if ownerUIDLabel != string(sc.UID) {
// Despite the label mismatch the cluster needs to be propagated to state so that we can delete it to avoid a name collision.
return &managerClusterState{
Cluster: managerCluster,
}, nil
}

// Sanity check.
if len(managerCluster.ID) == 0 {
return nil, fmt.Errorf("manager cluster is missing an ID")
}

var repairTaskStatuses map[string]scyllav1.RepairTaskStatus
var backupTaskStatuses map[string]scyllav1.BackupTaskStatus

var managerRepairTasks managerclient.TaskListItems
managerRepairTasks, err = c.managerClient.ListTasks(ctx, managerCluster.ID, "repair", true, "", "")
if err != nil {
return nil, fmt.Errorf("can't list repair tasks registered with manager: %w", err)
}

repairTaskStatuses = make(map[string]scyllav1.RepairTaskStatus, len(managerRepairTasks.TaskListItemSlice))
for _, managerRepairTask := range managerRepairTasks.TaskListItemSlice {
var repairTaskStatus *scyllav1.RepairTaskStatus
repairTaskStatus, err = NewRepairStatusFromManager(managerRepairTask)
if err != nil {
return nil, fmt.Errorf("can't get repair task status from manager task: %w", err)
}
repairTaskStatuses[repairTaskStatus.Name] = *repairTaskStatus
}

var managerBackupTasks managerclient.TaskListItems
managerBackupTasks, err = c.managerClient.ListTasks(ctx, managerCluster.ID, "backup", true, "", "")
if err != nil {
return nil, fmt.Errorf("can't list backup tasks registered with manager: %w", err)
}

if clusterFound {
managerRepairTasks, err := c.managerClient.ListTasks(ctx, clusterID, "repair", true, "", "")
if err != nil {
return nil, err
}

repairTasks = make(map[string]scyllav1.RepairTaskStatus, len(managerRepairTasks.TaskListItemSlice))
for _, managerRepairTask := range managerRepairTasks.TaskListItemSlice {
rts, err := NewRepairStatusFromManager(managerRepairTask)
if err != nil {
return nil, err
}
repairTasks[rts.Name] = *rts
}

managerBackupTasks, err := c.managerClient.ListTasks(ctx, clusterID, "backup", true, "", "")
if err != nil {
return nil, err
}

backupTasks = make(map[string]scyllav1.BackupTaskStatus, len(managerBackupTasks.TaskListItemSlice))
for _, managerBackupTask := range managerBackupTasks.TaskListItemSlice {
bts, err := NewBackupStatusFromManager(managerBackupTask)
if err != nil {
return nil, err
}
backupTasks[bts.Name] = *bts
}
backupTaskStatuses = make(map[string]scyllav1.BackupTaskStatus, len(managerBackupTasks.TaskListItemSlice))
for _, managerBackupTask := range managerBackupTasks.TaskListItemSlice {
var backupTaskStatus *scyllav1.BackupTaskStatus
backupTaskStatus, err = NewBackupStatusFromManager(managerBackupTask)
if err != nil {
return nil, fmt.Errorf("can't get backup task status from manager backup task: %w", err)
}
backupTaskStatuses[backupTaskStatus.Name] = *backupTaskStatus
}

return &state{
Clusters: clusters,
BackupTasks: backupTasks,
RepairTasks: repairTasks,
return &managerClusterState{
Cluster: managerCluster,
BackupTasks: backupTaskStatuses,
RepairTasks: repairTaskStatuses,
}, nil
}

Expand All @@ -102,30 +119,25 @@ func (c *Controller) sync(ctx context.Context, key string) error {
return err
}

clusterID := ""
if sc.Status.ManagerID != nil {
clusterID = *sc.Status.ManagerID
}

managerState, err := c.getManagerState(ctx, clusterID)
state, err := c.getManagerClusterState(ctx, sc)
if err != nil {
return fmt.Errorf("can't get manager state: %w", err)
return fmt.Errorf("can't get manager state for cluster %q: %w", naming.ObjRef(sc), err)
}

status := c.calculateStatus(sc, managerState)
status := c.calculateStatus(sc, state)

if sc.DeletionTimestamp != nil {
return c.updateStatus(ctx, sc, status)
}

authToken, err := c.getAuthToken(sc)
if err != nil {
return fmt.Errorf("can't get auth token: %w", err)
return fmt.Errorf("can't get auth token for cluster %q: %w", naming.ObjRef(sc), err)
}

actions, requeue, err := runSync(ctx, sc, authToken, managerState)
actions, requeue, err := runSync(ctx, sc, authToken, state)
if err != nil {
return fmt.Errorf("can't run sync: %w", err)
return fmt.Errorf("can't run sync for cluster %q: %w", naming.ObjRef(sc), err)
}

var errs []error
Expand Down
Loading

0 comments on commit 5ffcffb

Please sign in to comment.