From 56688ba1a37159aff04b0daa7a58bb6c45ee45dc Mon Sep 17 00:00:00 2001 From: Rewant Soni Date: Mon, 4 Nov 2024 21:11:02 +0530 Subject: [PATCH] add server side implementation Signed-off-by: Rewant Soni --- controllers/storagecluster/reconcile.go | 27 +++++ services/provider/server/consumer.go | 15 +++ services/provider/server/server.go | 131 ++++++++++++++++++++++++ 3 files changed, 173 insertions(+) diff --git a/controllers/storagecluster/reconcile.go b/controllers/storagecluster/reconcile.go index 5ecd8d8350..06d04a6b40 100644 --- a/controllers/storagecluster/reconcile.go +++ b/controllers/storagecluster/reconcile.go @@ -408,6 +408,10 @@ func (r *StorageClusterReconciler) reconcilePhases( return reconcile.Result{}, err } + if res, err := r.ownStorageConsumersInNamespace(instance); err != nil || !res.IsZero() { + return reconcile.Result{}, err + } + // in-memory conditions should start off empty. It will only ever hold // negative conditions (!Available, Degraded, Progressing) r.conditions = nil @@ -816,6 +820,29 @@ func (r *StorageClusterReconciler) ownStorageClusterPeersInNamespace(instance *o return reconcile.Result{}, nil } +func (r *StorageClusterReconciler) ownStorageConsumersInNamespace(instance *ocsv1.StorageCluster) (reconcile.Result, error) { + storageConsumerList := &ocsv1alpha1.StorageConsumerList{} + err := r.Client.List(r.ctx, storageConsumerList, client.InNamespace(instance.Namespace)) + if err != nil { + return reconcile.Result{}, fmt.Errorf("failed to list storageConsumer: %w", err) + } + for i := range storageConsumerList.Items { + scp := &storageConsumerList.Items[i] + lenOwners := len(scp.OwnerReferences) + err := controllerutil.SetOwnerReference(instance, scp, r.Scheme) + if err != nil { + return reconcile.Result{}, fmt.Errorf("failed to set owner reference on storageConsumer %v: %w", scp.Name, err) + } + if lenOwners != len(scp.OwnerReferences) { + err = r.Client.Update(r.ctx, scp) + if err != nil { + return reconcile.Result{}, fmt.Errorf("failed to persist StorageCluster owner ref on storageConsumer %v: %w", scp.Name, err) + } + } + } + return reconcile.Result{}, nil +} + // Checks whether a string is contained within a slice func contains(slice []string, s string) bool { for _, item := range slice { diff --git a/services/provider/server/consumer.go b/services/provider/server/consumer.go index 63423fb620..f20f1c45cc 100644 --- a/services/provider/server/consumer.go +++ b/services/provider/server/consumer.go @@ -270,3 +270,18 @@ func (c *ocsConsumerManager) RemoveAnnotation(ctx context.Context, id string, an } return nil } + +func (c *ocsConsumerManager) GetByClientID(ctx context.Context, clientID string) (*ocsv1alpha1.StorageConsumer, error) { + consumerObjList := &ocsv1alpha1.StorageConsumerList{} + err := c.client.List(ctx, consumerObjList, client.InNamespace(c.namespace)) + if err != nil { + return nil, fmt.Errorf("failed to list storageConsumer objects: %v", err) + } + for i := range consumerObjList.Items { + consumer := consumerObjList.Items[i] + if consumer.Status.Client.ID == clientID { + return &consumer, nil + } + } + return nil, nil +} diff --git a/services/provider/server/server.go b/services/provider/server/server.go index 1733875208..6d8f8a9178 100644 --- a/services/provider/server/server.go +++ b/services/provider/server/server.go @@ -65,6 +65,7 @@ const ( monConfigMap = "rook-ceph-mon-endpoints" monSecret = "rook-ceph-mon" volumeReplicationClass5mSchedule = "5m" + mirroringTokenKey = "rbdMirrorBootstrapPeerSecretName" ) type OCSProviderServer struct { @@ -1124,6 +1125,136 @@ func (s *OCSProviderServer) RequestMaintenanceMode(ctx context.Context, req *pb. return &pb.RequestMaintenanceModeResponse{}, nil } +func (s *OCSProviderServer) GetStorageClientsInfo(ctx context.Context, req *pb.StorageClientsInfoRequest) (*pb.StorageClientsInfoResponse, error) { + klog.Infof("GetStorageClientsInfo called with request: %s", req) + + response := &pb.StorageClientsInfoResponse{} + for i := range req.ClientIDs { + consumer, err := s.consumerManager.GetByClientID(ctx, req.ClientIDs[i]) + if err != nil { + klog.Errorf("failed to get consumer with client id %v: %v", req.ClientIDs[i], err) + response.Errors = append(response.Errors, + &pb.StorageClientInfoError{ + ClientID: req.ClientIDs[i], + Code: pb.ErrorCode_Internal, + Message: "failed loading client information", + }, + ) + } + if consumer == nil { + klog.Infof("no consumer found with client id %v", req.ClientIDs[i]) + continue + } + + owner := util.FindOwnerRefByKind(consumer, "StorageCluster") + if owner == nil { + klog.Infof("no owner found for consumer %v", req.ClientIDs[i]) + continue + } + + if owner.UID != types.UID(req.StorageClusterUID) { + klog.Infof("storageCluster specified on the req does not own the client %v", req.ClientIDs[i]) + continue + } + + rnsList := &rookCephv1.CephBlockPoolRadosNamespaceList{} + err = s.client.List( + ctx, + rnsList, + client.InNamespace(s.namespace), + client.MatchingLabels{controllers.StorageConsumerNameLabel: consumer.Name}, + client.Limit(2), + ) + if err != nil { + response.Errors = append(response.Errors, + &pb.StorageClientInfoError{ + ClientID: req.ClientIDs[i], + Code: pb.ErrorCode_Internal, + Message: "failed loading client information", + }, + ) + klog.Error(err) + continue + } + if len(rnsList.Items) > 1 { + response.Errors = append(response.Errors, + &pb.StorageClientInfoError{ + ClientID: req.ClientIDs[i], + Code: pb.ErrorCode_Internal, + Message: "failed loading client information", + }, + ) + klog.Errorf("invalid number of radosnamespace found for the Client %v", req.ClientIDs[i]) + continue + } + clientInfo := &pb.ClientInfo{ClientID: req.ClientIDs[i]} + if len(rnsList.Items) == 1 { + clientInfo.RadosNamespace = rnsList.Items[0].Name + } + response.ClientsInfo = append(response.ClientsInfo, clientInfo) + } + + return response, nil +} + +func (s *OCSProviderServer) GetBlockPoolsInfo(ctx context.Context, req *pb.BlockPoolsInfoRequest) (*pb.BlockPoolsInfoResponse, error) { + klog.Infof("GetBlockPoolsInfo called with request: %s", req) + + response := &pb.BlockPoolsInfoResponse{} + for i := range req.BlockPoolNames { + cephBlockPool := &rookCephv1.CephBlockPool{} + cephBlockPool.Name = req.BlockPoolNames[i] + cephBlockPool.Namespace = s.namespace + err := s.client.Get(ctx, client.ObjectKeyFromObject(cephBlockPool), cephBlockPool) + if kerrors.IsNotFound(err) { + klog.Infof("blockpool %v not found", cephBlockPool.Name) + continue + } else if err != nil { + klog.Errorf("failed to get blockpool %v: %v", cephBlockPool.Name, err) + response.Errors = append(response.Errors, + &pb.BlockPoolInfoError{ + BlockPoolName: cephBlockPool.Name, + Code: pb.ErrorCode_Internal, + Message: "failed loading block pool information", + }, + ) + } + + var mirroringToken string + if cephBlockPool.Spec.Mirroring.Enabled && + cephBlockPool.Status.Info != nil && + cephBlockPool.Status.Info[mirroringTokenKey] != "" { + secret := &corev1.Secret{} + secret.Name = cephBlockPool.Status.Info[mirroringTokenKey] + secret.Namespace = s.namespace + err := s.client.Get(ctx, client.ObjectKeyFromObject(secret), secret) + if kerrors.IsNotFound(err) { + klog.Infof("bootstrap secret %v for blockpool %v not found", secret.Name, cephBlockPool.Name) + continue + } else if err != nil { + errMsg := fmt.Sprintf( + "failed to get bootstrap secret %s for CephBlockPool %s: %v", + cephBlockPool.Status.Info[mirroringTokenKey], + cephBlockPool.Name, + err, + ) + klog.Error(errMsg) + continue + } + mirroringToken = string(secret.Data["token"]) + } + + response.BlockPoolsInfo = append(response.BlockPoolsInfo, &pb.BlockPoolInfo{ + BlockPoolName: cephBlockPool.Name, + MirroringToken: mirroringToken, + BlockPoolID: strconv.Itoa(cephBlockPool.Status.PoolID), + }) + + } + + return response, nil +} + func (s *OCSProviderServer) isSystemInMaintenanceMode(ctx context.Context) (bool, error) { // found - false, not found - true cephRBDMirrors := &rookCephv1.CephRBDMirror{}