From 2ebffec04c11b7af759d24c95088efad20bebb7d Mon Sep 17 00:00:00 2001
From: Rewant Soni
Date: Wed, 19 Jun 2024 18:08:22 +0530
Subject: [PATCH] add server implementation

Signed-off-by: Rewant Soni
---
 controllers/util/k8sutil.go                   |   3 +
 .../ocs-operator/manifests/provider-role.yaml |  12 +-
 rbac/provider-role.yaml                       |  12 +-
 services/provider/server/configmap.go         | 197 ++++++++++++++++++
 services/provider/server/server.go            | 164 +++++++++++++--
 5 files changed, 368 insertions(+), 20 deletions(-)
 create mode 100644 services/provider/server/configmap.go

diff --git a/controllers/util/k8sutil.go b/controllers/util/k8sutil.go
index 01a68536c8..e9146c1d20 100644
--- a/controllers/util/k8sutil.go
+++ b/controllers/util/k8sutil.go
@@ -42,6 +42,9 @@ const (
 
 	// This is the name for the OwnerUID FieldIndex
 	OwnerUIDIndexName = "ownerUID"
+
+	// CephBlockPoolBlackListAnnotation is used to disable cephBlockPool mirroring for Regional DR (RDR)
+	CephBlockPoolBlackListAnnotation = "ocs.openshift.io/disable-mirroring"
 )
 
 // GetWatchNamespace returns the namespace the operator should be watching for changes
diff --git a/deploy/ocs-operator/manifests/provider-role.yaml b/deploy/ocs-operator/manifests/provider-role.yaml
index bd708fc6fe..8a61b85d66 100644
--- a/deploy/ocs-operator/manifests/provider-role.yaml
+++ b/deploy/ocs-operator/manifests/provider-role.yaml
@@ -6,16 +6,26 @@ rules:
 - apiGroups:
   - ""
   resources:
-  - configmaps
   - secrets
   - services
   verbs:
   - get
+- apiGroups:
+  - ""
+  resources:
+  - configmaps
+  verbs:
+  - get
+  - list
+  - create
+  - update
+  - delete
 - apiGroups:
   - ceph.rook.io
   resources:
   - cephfilesystemsubvolumegroups
   - cephblockpoolradosnamespaces
+  - cephblockpools
   verbs:
   - get
   - list
diff --git a/rbac/provider-role.yaml b/rbac/provider-role.yaml
index bd708fc6fe..8a61b85d66 100644
--- a/rbac/provider-role.yaml
+++ b/rbac/provider-role.yaml
@@ -6,16 +6,26 @@ rules:
 - apiGroups:
   - ""
   resources:
-  - configmaps
   - secrets
   - services
   verbs:
   - get
+- apiGroups:
+  - ""
+  resources:
+  - configmaps
+  verbs:
+  - get
+  - list
+  - create
+  - update
+  - delete
 - apiGroups:
   - ceph.rook.io
   resources:
   - cephfilesystemsubvolumegroups
   - cephblockpoolradosnamespaces
+  - cephblockpools
   verbs:
   - get
   - list
diff --git a/services/provider/server/configmap.go b/services/provider/server/configmap.go
new file mode 100644
index 0000000000..3674c3a202
--- /dev/null
+++ b/services/provider/server/configmap.go
@@ -0,0 +1,197 @@
+package server
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"sync"
+
+	corev1 "k8s.io/api/core/v1"
+	kerrors "k8s.io/apimachinery/pkg/api/errors"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/types"
+	"k8s.io/klog/v2"
+	"sigs.k8s.io/controller-runtime/pkg/client"
+)
+
+var (
+	errTicketAlreadyExistsStorageClusterPeer = errors.New("onboarding ticket already used by another storageClusterPeer")
+)
+
+type ocsConfigMapManager struct {
+	client       client.Client
+	namespace    string
+	nameByTicket map[string]string
+	nameByUID    map[types.UID]string
+	mutex        sync.RWMutex
+}
+
+func newConfigMapManager(ctx context.Context, cl client.Client, namespace string) (*ocsConfigMapManager, error) {
+	configMaps := &corev1.ConfigMapList{}
+
+	// TODO: figure out a way to list only the configMaps representing storageClusterPeers;
+	// we could use a cached client and set up an indexer on the ticket annotation, or use a name prefix or a label.
+	err := cl.List(ctx, configMaps, client.InNamespace(namespace))
+	if err != nil {
+		return nil, fmt.Errorf("failed to list storageClusterPeer representation configMaps. %v", err)
+	}
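+
+	// Prime the in-memory caches so that onboarding lookups by ticket or by
+	// UID do not require listing configMaps on every request.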
%v", err) + } + + nameByTicket := map[string]string{} + nameByUID := map[types.UID]string{} + + for _, configMap := range configMaps.Items { + if ticket, ok := configMap.GetAnnotations()[TicketAnnotation]; ok { + nameByUID[configMap.UID] = configMap.Name + nameByTicket[ticket] = configMap.Name + } + } + + return &ocsConfigMapManager{ + client: cl, + namespace: namespace, + nameByTicket: nameByTicket, + nameByUID: nameByUID, + }, nil +} + +// Create creates a new storageClusterPeer representation resource, updates the cache and returns the representation UID +func (c *ocsConfigMapManager) Create(ctx context.Context, ticket, name string) (string, error) { + + c.mutex.RLock() + if _, ok := c.nameByTicket[ticket]; ok { + c.mutex.RUnlock() + klog.Warning("onboarding ticket already in use") + return "", errTicketAlreadyExistsStorageClusterPeer + } + c.mutex.RUnlock() + + configMapObj := &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: c.namespace, + Annotations: map[string]string{ + TicketAnnotation: ticket, + }, + }, + } + + err := c.client.Create(ctx, configMapObj) + if err != nil { + if kerrors.IsAlreadyExists(err) { + klog.Warningf("storageClusterPeer representation %q already exists", name) + return "", err + } + return "", fmt.Errorf("failed to create storageClusterPeer representation %q. %v", configMapObj.Name, err) + } + + c.mutex.Lock() + c.nameByUID[configMapObj.UID] = configMapObj.Name + c.nameByTicket[ticket] = configMapObj.Name + c.mutex.Unlock() + + klog.Infof("successfully created storageClusterPeer representation %q", name) + + return string(configMapObj.UID), nil +} + +// Delete deletes the storageClusterPeer representation resource using UID and updates the cache +func (c *ocsConfigMapManager) Delete(ctx context.Context, id string) error { + uid := types.UID(id) + c.mutex.RLock() + representationName, ok := c.nameByUID[uid] + if !ok { + klog.Warningf("no storageClusterPeer representation found with UID %q", id) + c.mutex.RUnlock() + return nil + } + c.mutex.RUnlock() + + configMapObj := &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: representationName, + Namespace: c.namespace, + }, + } + + if err := c.client.Delete(ctx, configMapObj); err != nil { + if kerrors.IsNotFound(err) { + // update uidStore + c.mutex.Lock() + delete(c.nameByUID, uid) + c.mutex.Unlock() + klog.Warningf("storageClusterPeer representation %q not found.", configMapObj.Name) + return nil + } + return fmt.Errorf("failed to delete storageClusterPeer representation %q. %v", configMapObj.Name, err) + } + + c.mutex.Lock() + delete(c.nameByUID, uid) + c.mutex.Unlock() + + klog.Infof("successfully deleted storageClusterPeer representation %q", configMapObj.Name) + + return nil +} + +func (c *ocsConfigMapManager) Enable(ctx context.Context, id string) error { + // Get storage consumer resource using UID + configMapObj, err := c.Get(ctx, id) + if err != nil { + return err + } + + if configMapObj.Data == nil { + configMapObj.Data = map[string]string{} + } + + configMapObj.Data["enable"] = "true" + err = c.client.Update(ctx, configMapObj) + if err != nil { + klog.Errorf("Failed to enable storageClusterPeer representation %v", err) + return fmt.Errorf("failed to update storageClusterPeer representation %q. 
%v", configMapObj.Name, err) + } + + klog.Infof("successfully Enabled the storageClusterPeer representation %q", configMapObj.Name) + return nil +} + +// GetByName returns a storageClusterPeer representation resource using the Name +func (c *ocsConfigMapManager) GetByName(ctx context.Context, representationName string) (*corev1.ConfigMap, error) { + + configMapObj := &corev1.ConfigMap{} + if err := c.client.Get(ctx, types.NamespacedName{Name: representationName, Namespace: c.namespace}, configMapObj); err != nil { + klog.Errorf("Failed to get the storageClusterPeer representation configMap %s: %v", representationName, err) + return nil, err + } + + return configMapObj, nil +} + +// Get returns a storageClusterPeer representation resource using the UID +func (c *ocsConfigMapManager) Get(ctx context.Context, id string) (*corev1.ConfigMap, error) { + uid := types.UID(id) + + c.mutex.RLock() + representationName, ok := c.nameByUID[uid] + if !ok { + c.mutex.RUnlock() + klog.Errorf("no storageClusterPeer representation found with the UID %q", id) + return nil, fmt.Errorf("no storageClusterPeer representation found with the UID %q", id) + } + c.mutex.RUnlock() + + consumerObj := &corev1.ConfigMap{} + if err := c.client.Get(ctx, types.NamespacedName{Name: representationName, Namespace: c.namespace}, consumerObj); err != nil { + if kerrors.IsNotFound(err) { + // update uidStore + c.mutex.Lock() + delete(c.nameByUID, uid) + c.mutex.Unlock() + return nil, fmt.Errorf("storageClusterPeer representation resource %q not found. %v", representationName, err) + } + return nil, fmt.Errorf("failed to get storageClusterPeer representation resource with name %q. %v", representationName, err) + } + + return consumerObj, nil +} diff --git a/services/provider/server/server.go b/services/provider/server/server.go index 1ebd497fa8..24c8c8170e 100644 --- a/services/provider/server/server.go +++ b/services/provider/server/server.go @@ -51,16 +51,18 @@ const ( ) const ( - monConfigMap = "rook-ceph-mon-endpoints" - monSecret = "rook-ceph-mon" + monConfigMap = "rook-ceph-mon-endpoints" + monSecret = "rook-ceph-mon" + rbdMirrorBootstrapPeerSecretName = "rbdMirrorBootstrapPeerSecretName" ) type OCSProviderServer struct { pb.UnimplementedOCSProviderServer - client client.Client - consumerManager *ocsConsumerManager - storageRequestManager *storageRequestManager - namespace string + client client.Client + consumerManager *ocsConsumerManager + storageRequestManager *storageRequestManager + storageClusterPeerManager *ocsConfigMapManager + namespace string } func NewOCSProviderServer(ctx context.Context, namespace string) (*OCSProviderServer, error) { @@ -79,11 +81,17 @@ func NewOCSProviderServer(ctx context.Context, namespace string) (*OCSProviderSe return nil, fmt.Errorf("failed to create new StorageRequest instance. %v", err) } + storageClusterPeerManager, err := newConfigMapManager(ctx, client, namespace) + if err != nil { + return nil, fmt.Errorf("failed to create new StorageClusterPeerManager instance. 
%v", err) + } + return &OCSProviderServer{ - client: client, - consumerManager: consumerManager, - storageRequestManager: storageRequestManager, - namespace: namespace, + client: client, + consumerManager: consumerManager, + storageRequestManager: storageRequestManager, + storageClusterPeerManager: storageClusterPeerManager, + namespace: namespace, }, nil } @@ -106,7 +114,7 @@ func (s *OCSProviderServer) OnboardConsumer(ctx context.Context, req *pb.Onboard return nil, status.Errorf(codes.Internal, "failed to get public key to validate onboarding ticket for consumer %q. %v", req.ConsumerName, err) } - if err := validateTicket(req.OnboardingTicket, pubKey); err != nil { + if err := validateTicket(req.OnboardingTicket, pubKey, services.ClientRole); err != nil { klog.Errorf("failed to validate onboarding ticket for consumer %q. %v", req.ConsumerName, err) return nil, status.Errorf(codes.InvalidArgument, "onboarding ticket is not valid. %v", err) } @@ -436,7 +444,7 @@ func getSubVolumeGroupClusterID(subVolumeGroup *rookCephv1.CephFilesystemSubVolu return hex.EncodeToString(hash[:16]) } -func validateTicket(ticket string, pubKey *rsa.PublicKey) error { +func validateTicket(ticket string, pubKey *rsa.PublicKey, role services.Role) error { ticketArr := strings.Split(string(ticket), ".") if len(ticketArr) != 2 { return fmt.Errorf("invalid ticket") @@ -453,6 +461,10 @@ func validateTicket(ticket string, pubKey *rsa.PublicKey) error { return fmt.Errorf("failed to unmarshal onboarding ticket message. %v", err) } + if ticketData.Role != role { + return fmt.Errorf("failed to validate onboarding ticket role, %s role used instead of %s", ticketData.Role, role) + } + signature, err := base64.StdEncoding.DecodeString(ticketArr[1]) if err != nil { return fmt.Errorf("failed to decode onboarding ticket %s signature: %v", ticketData.ID, err) @@ -723,18 +735,134 @@ func (s *OCSProviderServer) getOCSSubscriptionChannel(ctx context.Context) (stri return subscription.Spec.Channel, nil } -func (s *OCSProviderServer) OnboardMirroringPeer(_ context.Context, _ *pb.OnboardMirroringPeerRequest) (*pb.OnboardMirroringPeerResponse, error) { - return &pb.OnboardMirroringPeerResponse{}, nil +func (s *OCSProviderServer) OnboardMirroringPeer(ctx context.Context, req *pb.OnboardMirroringPeerRequest) (*pb.OnboardMirroringPeerResponse, error) { + + pubKey, err := s.getOnboardingValidationKey(ctx) + if err != nil { + return nil, status.Errorf(codes.Internal, "failed to get public key to validate onboarding ticket for StorageClusterPeer %q. %v", req.StorageClusterPeerName, err) + } + + if err := validateTicket(req.OnboardingTicket, pubKey, services.MirroringPeerRole); err != nil { + klog.Errorf("failed to validate onboarding ticket for StorageClusterPeer %q. %v", req.StorageClusterPeerName, err) + return nil, status.Errorf(codes.InvalidArgument, "onboarding ticket is not valid. %v", err) + } + + representationID, err := s.storageClusterPeerManager.Create(ctx, req.OnboardingTicket, req.StorageClusterPeerName) + if err != nil { + if !kerrors.IsAlreadyExists(err) && err != errTicketAlreadyExistsStorageClusterPeer { + return nil, status.Errorf(codes.Internal, "failed to create StorageClusterPeer representation %q. %v", req.StorageClusterPeerName, err) + } + + representation, err := s.storageClusterPeerManager.GetByName(ctx, req.StorageClusterPeerName) + if err != nil { + return nil, status.Errorf(codes.Internal, "Failed to get StorageClusterPeer representation. 
%v", err) + } + + if representation.Data["enable"] != "" { + err = fmt.Errorf("storageClusterPeer representation %s already exists", req.StorageClusterPeerName) + return nil, status.Errorf(codes.AlreadyExists, "failed to create storageClusterPeer representation %q. %v", req.StorageClusterPeerName, err) + } + representationID = string(representation.UID) + } + + return &pb.OnboardMirroringPeerResponse{UUID: representationID}, nil } -func (s *OCSProviderServer) OffboardMirroringPeer(_ context.Context, _ *pb.OffboardMirroringPeerRequest) (*pb.OffboardMirroringPeerResponse, error) { +func (s *OCSProviderServer) OffboardMirroringPeer(ctx context.Context, req *pb.OffboardMirroringPeerRequest) (*pb.OffboardMirroringPeerResponse, error) { + + err := s.storageClusterPeerManager.Delete(ctx, req.UUID) + if err != nil { + return nil, status.Errorf(codes.Internal, "failed to delete storageClusterPeer representation with the provided UUID. %v", err) + } + return &pb.OffboardMirroringPeerResponse{}, nil } -func (s *OCSProviderServer) AcknowledgeMirrorPeerOnboarding(_ context.Context, _ *pb.AcknowledgeMirrorPeerOnboardingRequest) (*pb.AcknowledgeMirrorPeerOnboardingResponse, error) { +func (s *OCSProviderServer) AcknowledgeMirrorPeerOnboarding(ctx context.Context, req *pb.AcknowledgeMirrorPeerOnboardingRequest) (*pb.AcknowledgeMirrorPeerOnboardingResponse, error) { + if err := s.storageClusterPeerManager.Enable(ctx, req.UUID); err != nil { + if kerrors.IsNotFound(err) { + return nil, status.Errorf(codes.NotFound, "storageClusterPeer representation not found. %v", err) + } + return nil, status.Errorf(codes.Internal, "Failed to update the storageClusterPeer representation. %v", err) + } return &pb.AcknowledgeMirrorPeerOnboardingResponse{}, nil } -func (s *OCSProviderServer) GetMirroringInfo(_ context.Context, _ *pb.MirroringInfoRequest) (*pb.MirroringInfoResponse, error) { - return &pb.MirroringInfoResponse{}, nil +func (s *OCSProviderServer) GetMirroringInfo(ctx context.Context, req *pb.MirroringInfoRequest) (*pb.MirroringInfoResponse, error) { + representation, err := s.storageClusterPeerManager.Get(ctx, req.UUID) + if err != nil { + errMsg := fmt.Sprintf("failed to get storageClusterPeer representation: %q. 
%v", req.UUID, err) + if kerrors.IsNotFound(err) { + return nil, status.Error(codes.NotFound, errMsg) + } + return nil, status.Error(codes.Internal, errMsg) + } + + if representation.Data == nil || representation.Data["enable"] != "true" { + errMsg := fmt.Sprintf("failed to process the request as StorageClusterPeer is not acknowledged: %q", req.UUID) + return nil, status.Errorf(codes.Unauthenticated, errMsg) + + } + + var extR []*pb.ExternalResource + + cephBlockPool := &rookCephv1.CephBlockPool{} + for _, blockPoolName := range req.BlockPoolNames { + err := s.client.Get(ctx, types.NamespacedName{Name: blockPoolName, Namespace: s.namespace}, cephBlockPool) + if err != nil { + return nil, status.Errorf(codes.NotFound, err.Error()) + } + + // if the blockPool is disabled for mirroring return an error + if cephBlockPool.GetAnnotations() != nil { + if _, ok := cephBlockPool.GetAnnotations()[util.CephBlockPoolBlackListAnnotation]; ok { + errMsg := fmt.Sprintf("BlockPool is disabled for mirroring: %v", blockPoolName) + klog.Error(errMsg) + return nil, status.Errorf(codes.Internal, errMsg) + } + } + + // if the blockPool mirroring is not enabled return an error + if !cephBlockPool.Spec.Mirroring.Enabled { + errMsg := fmt.Sprintf("Mirroring is not enabled for blockpool: %v", blockPoolName) + klog.Error(errMsg) + return nil, status.Errorf(codes.Internal, errMsg) + } + + if cephBlockPool.Status.Info != nil && + cephBlockPool.Status.Info[rbdMirrorBootstrapPeerSecretName] != "" { + + secret := &corev1.Secret{} + err := s.client.Get(ctx, + types.NamespacedName{Name: cephBlockPool.Status.Info[rbdMirrorBootstrapPeerSecretName], Namespace: s.namespace}, + secret) + if err != nil { + errMsg := fmt.Sprintf("Error fetching bootstrap secret %s for blockPool %s: %v", + cephBlockPool.Status.Info[rbdMirrorBootstrapPeerSecretName], + blockPoolName, + err) + klog.Error(errMsg) + return nil, status.Errorf(codes.Internal, errMsg) + } + + extR = append(extR, &pb.ExternalResource{ + Name: cephBlockPool.Name, + Kind: "Secret", + Data: mustMarshalByte(secret.Data)}) + } else { + errMsg := fmt.Sprintf("Bootstrap secret for BlockPool %s is not generated", blockPoolName) + klog.Error(errMsg) + return nil, status.Errorf(codes.Internal, errMsg) + } + } + + return &pb.MirroringInfoResponse{ExternalResource: extR}, nil +} + +func mustMarshalByte(data map[string][]byte) []byte { + newData, err := json.Marshal(data) + if err != nil { + panic("failed to marshal") + } + return newData }