From 4737528f1e2dac885eea72dcba23ce5292ff9b5c Mon Sep 17 00:00:00 2001 From: Rewant Soni Date: Wed, 19 Jun 2024 18:08:22 +0530 Subject: [PATCH] add server implementation Signed-off-by: Rewant Soni --- controllers/util/k8sutil.go | 3 + .../ocs-operator/manifests/provider-role.yaml | 18 +- rbac/provider-role.yaml | 18 +- services/provider/server/configmap.go | 150 +++++++++++++++++ services/provider/server/server.go | 159 ++++++++++++++++-- 5 files changed, 329 insertions(+), 19 deletions(-) create mode 100644 services/provider/server/configmap.go diff --git a/controllers/util/k8sutil.go b/controllers/util/k8sutil.go index 71c1c08270..f13637dd8c 100644 --- a/controllers/util/k8sutil.go +++ b/controllers/util/k8sutil.go @@ -45,6 +45,9 @@ const ( OwnerUIDIndexName = "ownerUID" OdfInfoNamespacedNameClaimName = "odfinfo.odf.openshift.io" + + // CephBlockPoolForbidMirroringLabel is the label key that marks a cephBlockPool as excluded from mirroring + CephBlockPoolForbidMirroringLabel = "ocs.openshift.io/forbid-mirroring" ) // GetWatchNamespace returns the namespace the operator should be watching for changes diff --git a/deploy/ocs-operator/manifests/provider-role.yaml b/deploy/ocs-operator/manifests/provider-role.yaml index aea3ada2e5..32f978d64e 100644 --- a/deploy/ocs-operator/manifests/provider-role.yaml +++ b/deploy/ocs-operator/manifests/provider-role.yaml @@ -6,16 +6,26 @@ rules: - apiGroups: - "" resources: - - configmaps - secrets - services verbs: - get +- apiGroups: + - "" + resources: + - configmaps + verbs: + - get + - list + - create + - update + - delete - apiGroups: - ceph.rook.io resources: - cephfilesystemsubvolumegroups - cephblockpoolradosnamespaces + - cephblockpools verbs: - get - list @@ -54,6 +64,12 @@ rules: - list - create - delete +- apiGroups: + - ocs.openshift.io + resources: + - storageclusters + verbs: + - get - apiGroups: - operators.coreos.com resources: diff --git a/rbac/provider-role.yaml b/rbac/provider-role.yaml index aea3ada2e5..32f978d64e 100644 --- 
a/rbac/provider-role.yaml +++ b/rbac/provider-role.yaml @@ -6,16 +6,26 @@ rules: - apiGroups: - "" resources: - - configmaps - secrets - services verbs: - get +- apiGroups: + - "" + resources: + - configmaps + verbs: + - get + - list + - create + - update + - delete - apiGroups: - ceph.rook.io resources: - cephfilesystemsubvolumegroups - cephblockpoolradosnamespaces + - cephblockpools verbs: - get - list @@ -54,6 +64,12 @@ rules: - list - create - delete +- apiGroups: + - ocs.openshift.io + resources: + - storageclusters + verbs: + - get - apiGroups: - operators.coreos.com resources: diff --git a/services/provider/server/configmap.go b/services/provider/server/configmap.go new file mode 100644 index 0000000000..b3b41ef0c9 --- /dev/null +++ b/services/provider/server/configmap.go @@ -0,0 +1,150 @@ +package server + +import ( + "context" + "errors" + "fmt" + "sync" + + corev1 "k8s.io/api/core/v1" + kerrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/klog/v2" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +var ( + errTicketAlreadyExistsStorageClusterPeer = errors.New("onboarding ticket already used by another storageClusterPeer representation") +) + +type ocsConfigMapManager struct { + client client.Client + namespace string + nameByTicket map[string]string + mutex sync.RWMutex +} + +func newConfigMapManager(ctx context.Context, cl client.Client, namespace string) (*ocsConfigMapManager, error) { + configMaps := &corev1.ConfigMapList{} + + // TODO: list only the configMaps that represent a storageClusterPeer; this call lists every configMap in the namespace. + // Options: a cached client with an indexer on the ticket annotation, a name prefix, or a dedicated label. + err := cl.List(ctx, configMaps, client.InNamespace(namespace)) + if err != nil { + return nil, fmt.Errorf("failed to list storage consumers. 
%v", err) + } + + nameByTicket := map[string]string{} + + for _, configMap := range configMaps.Items { + if ticket, ok := configMap.GetAnnotations()[TicketAnnotation]; ok { + nameByTicket[ticket] = configMap.Name + } + } + + return &ocsConfigMapManager{ + client: cl, + namespace: namespace, + nameByTicket: nameByTicket, + }, nil +} + +// Create creates the ConfigMap that represents a storageClusterPeer, annotated with the onboarding ticket, and caches ticket -> name. It returns errTicketAlreadyExistsStorageClusterPeer if the ticket is already cached and the unwrapped API error if the ConfigMap already exists. NOTE(review): the ticket check and the cache insert lock separately, so concurrent calls with the same ticket can both pass the check. +func (c *ocsConfigMapManager) Create(ctx context.Context, ticket, name, namespace string) error { + + c.mutex.RLock() + if _, ok := c.nameByTicket[ticket]; ok { + c.mutex.RUnlock() + klog.Warning("onboarding ticket already in use") + return errTicketAlreadyExistsStorageClusterPeer + } + c.mutex.RUnlock() + + configMapObj := &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: namespace, + Annotations: map[string]string{ + TicketAnnotation: ticket, + }, + }, + } + + err := c.client.Create(ctx, configMapObj) + if err != nil { + if kerrors.IsAlreadyExists(err) { + klog.Warningf("storageClusterPeer representation %q already exists", name) + return err + } + return fmt.Errorf("failed to create storageClusterPeer representation %q. 
%v", configMapObj.Name, err) + } + + c.mutex.Lock() + c.nameByTicket[ticket] = configMapObj.Name + c.mutex.Unlock() + + klog.Infof("successfully created storageClusterPeer representation %q", name) + + return nil +} + +// Delete deletes the storageClusterPeer representation ConfigMap identified by name and namespace; NotFound is logged and treated as success. NOTE(review): the nameByTicket cache is not touched here, so the ticket of a deleted representation stays cached. +func (c *ocsConfigMapManager) Delete(ctx context.Context, name, namespace string) error { + + configMapObj := &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: namespace, + }, + } + + if err := c.client.Delete(ctx, configMapObj); err != nil { + if kerrors.IsNotFound(err) { + klog.Warningf("storageClusterPeer representation %q not found.", configMapObj.Name) + return nil + } + return fmt.Errorf("failed to delete storageClusterPeer representation %q. %v", configMapObj.Name, err) + } + + klog.Infof("successfully deleted storageClusterPeer representation %q", configMapObj.Name) + + return nil +} + +func (c *ocsConfigMapManager) Enable(ctx context.Context, name, namespace string) error { + // Get storage cluster peer representation resource using name + configMapObj, err := c.GetByName(ctx, name, namespace) + if err != nil { + return err + } + + if configMapObj.Data == nil { + configMapObj.Data = map[string]string{} + } + configMapObj.Data["enable"] = "true" + err = c.client.Update(ctx, configMapObj) + if err != nil { + klog.Errorf("failed to enable storageClusterPeer representation %v", err) + return fmt.Errorf("failed to update storageClusterPeer representation %q. 
%v", configMapObj.Name, err) + } + + klog.Infof("successfully Enabled the storageClusterPeer representation %q", configMapObj.Name) + return nil +} + +// GetByName fetches the storageClusterPeer representation ConfigMap by name and namespace; on failure the client error is logged and returned unwrapped. +func (c *ocsConfigMapManager) GetByName(ctx context.Context, representationName, namespace string) (*corev1.ConfigMap, error) { + + configMapObj := &corev1.ConfigMap{} + if err := c.client.Get(ctx, types.NamespacedName{Name: representationName, Namespace: namespace}, configMapObj); err != nil { + klog.Errorf("Failed to get the storageClusterPeer representation configMap %s: %v", representationName, err) + return nil, err + } + + return configMapObj, nil +} + +func (c *ocsConfigMapManager) IsEnabled(configMap *corev1.ConfigMap) bool { + return configMap.Data["enable"] == "true" +} diff --git a/services/provider/server/server.go b/services/provider/server/server.go index b046529804..25234c4aa8 100644 --- a/services/provider/server/server.go +++ b/services/provider/server/server.go @@ -57,16 +57,18 @@ const ( ) const ( - monConfigMap = "rook-ceph-mon-endpoints" - monSecret = "rook-ceph-mon" + monConfigMap = "rook-ceph-mon-endpoints" + monSecret = "rook-ceph-mon" + rbdMirrorBootstrapPeerSecretName = "rbdMirrorBootstrapPeerSecretName" ) type OCSProviderServer struct { pb.UnimplementedOCSProviderServer - client client.Client - consumerManager *ocsConsumerManager - storageRequestManager *storageRequestManager - namespace string + client client.Client + consumerManager *ocsConsumerManager + storageRequestManager *storageRequestManager + storageClusterPeerManager *ocsConfigMapManager + namespace string } func NewOCSProviderServer(ctx context.Context, namespace string) (*OCSProviderServer, error) { @@ -85,11 +87,17 @@ func NewOCSProviderServer(ctx context.Context, namespace string) (*OCSProviderSe return nil, fmt.Errorf("failed to create new StorageRequest instance. 
%v", err) + } + storageClusterPeerManager, err := newConfigMapManager(ctx, client, namespace) + if err != nil { + return nil, fmt.Errorf("failed to create new StorageClusterPeerManager instance. %v", err) + } + return &OCSProviderServer{ - client: client, - consumerManager: consumerManager, - storageRequestManager: storageRequestManager, - namespace: namespace, + client: client, + consumerManager: consumerManager, + storageRequestManager: storageRequestManager, + storageClusterPeerManager: storageClusterPeerManager, + namespace: namespace, }, nil } @@ -112,7 +120,7 @@ func (s *OCSProviderServer) OnboardConsumer(ctx context.Context, req *pb.Onboard return nil, status.Errorf(codes.Internal, "failed to get public key to validate onboarding ticket for consumer %q. %v", req.ConsumerName, err) } - onboardingTicket, err := decodeAndValidateTicket(req.OnboardingTicket, pubKey) + onboardingTicket, err := decodeAndValidateTicket(req.OnboardingTicket, pubKey, services.ClientRole) if err != nil { klog.Errorf("failed to validate onboarding ticket for consumer %q. %v", req.ConsumerName, err) return nil, status.Errorf(codes.InvalidArgument, "onboarding ticket is not valid. %v", err) @@ -238,6 +246,10 @@ func newClient() (client.Client, error) { if err != nil { return nil, fmt.Errorf("failed to add ocsv1alpha1 to scheme. %v", err) } + err = ocsv1.AddToScheme(scheme) + if err != nil { + return nil, fmt.Errorf("failed to add ocsv1 to scheme. %v", err) + } err = corev1.AddToScheme(scheme) if err != nil { return nil, fmt.Errorf("failed to add ocsv1alpha1 to scheme. 
%v", err) @@ -497,7 +509,7 @@ func getSubVolumeGroupClusterID(subVolumeGroup *rookCephv1.CephFilesystemSubVolu return hex.EncodeToString(hash[:16]) } -func decodeAndValidateTicket(ticket string, pubKey *rsa.PublicKey) (*services.OnboardingTicket, error) { +func decodeAndValidateTicket(ticket string, pubKey *rsa.PublicKey, desiredRole services.OnboardingSubjectRole) (*services.OnboardingTicket, error) { ticketArr := strings.Split(string(ticket), ".") if len(ticketArr) != 2 { return nil, fmt.Errorf("invalid ticket") @@ -518,6 +530,10 @@ func decodeAndValidateTicket(ticket string, pubKey *rsa.PublicKey) (*services.On return nil, fmt.Errorf("invalid value sent in onboarding ticket, storage quota should be greater than 0 and less than %v: %v", math.MaxInt, ticketData.StorageQuotaInGiB) } + if ticketData.SubjectRole != desiredRole { + return nil, fmt.Errorf("failed to validate onboarding ticket role, %s role used instead of %s", ticketData.SubjectRole, desiredRole) + } + signature, err := base64.StdEncoding.DecodeString(ticketArr[1]) if err != nil { return nil, fmt.Errorf("failed to decode onboarding ticket %s signature: %v", ticketData.ID, err) @@ -865,18 +881,127 @@ func extractMonitorIps(data string) ([]string, error) { return ips, nil } -func (s *OCSProviderServer) OnboardStorageClusterPeer(_ context.Context, _ *pb.OnboardStorageClusterPeerRequest) (*pb.OnboardStorageClusterPeerResponse, error) { +func (s *OCSProviderServer) OnboardStorageClusterPeer(ctx context.Context, req *pb.OnboardStorageClusterPeerRequest) (*pb.OnboardStorageClusterPeerResponse, error) { + + pubKey, err := s.getOnboardingValidationKey(ctx) + if err != nil { + return nil, status.Errorf(codes.Internal, "failed to get public key to validate onboarding ticket for StorageClusterPeer %q. 
%v", req.StorageClusterPeerUID, err) + } + + if _, err := decodeAndValidateTicket(req.OnboardingTicket, pubKey, services.PeerRole); err != nil { + klog.Errorf("failed to validate onboarding ticket for StorageClusterPeer %q. %v", req.StorageClusterPeerUID, err) + return nil, status.Errorf(codes.InvalidArgument, "onboarding ticket is not valid. %v", err) + } + + err = s.storageClusterPeerManager.Create(ctx, req.OnboardingTicket, req.StorageClusterPeerUID, s.namespace) + if err != nil { + if !kerrors.IsAlreadyExists(err) && err != errTicketAlreadyExistsStorageClusterPeer { + return nil, status.Errorf(codes.Internal, "failed to create StorageClusterPeer representation %q. %v", req.StorageClusterPeerUID, err) + } + + representation, err := s.storageClusterPeerManager.GetByName(ctx, req.StorageClusterPeerUID, s.namespace) + if err != nil { + return nil, status.Errorf(codes.Internal, "failed to get StorageClusterPeer representation. %v", err) + } + + if s.storageClusterPeerManager.IsEnabled(representation) { + err = fmt.Errorf("storageClusterPeer representation %s already exists", req.StorageClusterPeerUID) + return nil, status.Errorf(codes.AlreadyExists, "failed to create storageClusterPeer representation %q. %v", req.StorageClusterPeerUID, err) + } + } + return &pb.OnboardStorageClusterPeerResponse{}, nil } -func (s *OCSProviderServer) OffboardStorageClusterPeer(_ context.Context, _ *pb.OffboardStorageClusterPeerRequest) (*pb.OffboardStorageClusterPeerResponse, error) { +func (s *OCSProviderServer) OffboardStorageClusterPeer(ctx context.Context, req *pb.OffboardStorageClusterPeerRequest) (*pb.OffboardStorageClusterPeerResponse, error) { + + err := s.storageClusterPeerManager.Delete(ctx, req.StorageClusterPeerUID, s.namespace) + if err != nil { + return nil, status.Errorf(codes.Internal, "failed to delete storageClusterPeer representation with the provided UUID. 
%v", err) + } + return &pb.OffboardStorageClusterPeerResponse{}, nil } -func (s *OCSProviderServer) AcknowledgeOnboardingStorageClusterPeer(_ context.Context, _ *pb.AcknowledgeOnboardingStorageClusterPeerRequest) (*pb.AcknowledgeOnboardingStorageClusterPeerResponse, error) { +func (s *OCSProviderServer) AcknowledgeOnboardingStorageClusterPeer(ctx context.Context, req *pb.AcknowledgeOnboardingStorageClusterPeerRequest) (*pb.AcknowledgeOnboardingStorageClusterPeerResponse, error) { + + if err := s.storageClusterPeerManager.Enable(ctx, req.StorageClusterPeerUID, s.namespace); err != nil { + if kerrors.IsNotFound(err) { + return nil, status.Errorf(codes.NotFound, "storageClusterPeer representation not found. %v", err) + } + return nil, status.Errorf(codes.Internal, "Failed to update the storageClusterPeer representation. %v", err) + } return &pb.AcknowledgeOnboardingStorageClusterPeerResponse{}, nil } -func (s *OCSProviderServer) GetMirroringInfo(_ context.Context, _ *pb.MirroringInfoRequest) (*pb.MirroringInfoResponse, error) { - return &pb.MirroringInfoResponse{}, nil +func (s *OCSProviderServer) GetMirroringInfo(ctx context.Context, req *pb.MirroringInfoRequest) (*pb.MirroringInfoResponse, error) { + + representation, err := s.storageClusterPeerManager.GetByName(ctx, req.StorageClusterPeerUID, s.namespace) + if err != nil { + errMsg := fmt.Sprintf("failed to get storageClusterPeer representation: %q. 
%v", req.StorageClusterPeerUID, err) + if kerrors.IsNotFound(err) { + return nil, status.Error(codes.NotFound, errMsg) + } + return nil, status.Error(codes.Internal, errMsg) + } + + if !s.storageClusterPeerManager.IsEnabled(representation) { + errMsg := fmt.Sprintf("failed to process the request as StorageClusterPeer is not acknowledged: %q", req.StorageClusterPeerUID) + return nil, status.Errorf(codes.Unauthenticated, errMsg) + + } + + var extR []*pb.ExternalResource + + cephBlockPool := &rookCephv1.CephBlockPool{} + for _, blockPoolName := range req.BlockPoolNames { + err := s.client.Get(ctx, types.NamespacedName{Name: blockPoolName, Namespace: s.namespace}, cephBlockPool) + if err != nil { + return nil, status.Errorf(codes.NotFound, err.Error()) + } + + // reject the request if the blockPool carries the forbid-mirroring label; only the label's presence is checked, not its value + if _, ok := cephBlockPool.Labels[util.CephBlockPoolForbidMirroringLabel]; ok { + errMsg := fmt.Sprintf("BlockPool is forbidden for mirroring: %v", blockPoolName) + klog.Error(errMsg) + return nil, status.Errorf(codes.Internal, errMsg) + } + + // reject the request if mirroring is not enabled in the blockPool spec + if !cephBlockPool.Spec.Mirroring.Enabled { + errMsg := fmt.Sprintf("Mirroring is not enabled for blockpool: %v", blockPoolName) + klog.Error(errMsg) + return nil, status.Errorf(codes.Internal, errMsg) + } + + if cephBlockPool.Status.Info != nil && + cephBlockPool.Status.Info[rbdMirrorBootstrapPeerSecretName] != "" { + + secret := &corev1.Secret{} + err := s.client.Get(ctx, + types.NamespacedName{ + Name: cephBlockPool.Status.Info[rbdMirrorBootstrapPeerSecretName], + Namespace: s.namespace}, + secret) + if err != nil { + errMsg := fmt.Sprintf("Error fetching bootstrap secret %s for blockPool %s: %v", + cephBlockPool.Status.Info[rbdMirrorBootstrapPeerSecretName], + blockPoolName, + err) + klog.Error(errMsg) + return nil, status.Errorf(codes.Internal, errMsg) + } + + extR = append(extR, &pb.ExternalResource{ + Name: cephBlockPool.Name, + Kind: 
"Secret", + Data: mustMarshal(secret.Data)}) + } else { + errMsg := fmt.Sprintf("Bootstrap secret for BlockPool %s is not generated", blockPoolName) + klog.Error(errMsg) + return nil, status.Errorf(codes.Internal, errMsg) + } + } + + return &pb.MirroringInfoResponse{ExternalResource: extR}, nil }