From 3969adaaa87999307bc7c401bdde9ea4e06f177b Mon Sep 17 00:00:00 2001 From: Rewant Soni Date: Wed, 9 Oct 2024 12:07:26 +0530 Subject: [PATCH] implement server side rpc call for peering Signed-off-by: Rewant Soni --- rbac/provider-role.yaml | 9 ++ services/provider/server/server.go | 98 +++++++++++++++++-- .../provider/server/storageclusterpeer.go | 51 ++++++++++ 3 files changed, 148 insertions(+), 10 deletions(-) create mode 100644 services/provider/server/storageclusterpeer.go diff --git a/rbac/provider-role.yaml b/rbac/provider-role.yaml index dd1c692975..953361821a 100644 --- a/rbac/provider-role.yaml +++ b/rbac/provider-role.yaml @@ -75,3 +75,12 @@ rules: verbs: - get - list + - apiGroups: + - ocs.openshift.io + resources: + - storageclusterpeers + - storageclusterpeers/status + verbs: + - get + - list + - update diff --git a/services/provider/server/server.go b/services/provider/server/server.go index 9ff8eba5d8..299fda1bdd 100644 --- a/services/provider/server/server.go +++ b/services/provider/server/server.go @@ -66,10 +66,11 @@ const ( type OCSProviderServer struct { pb.UnimplementedOCSProviderServer - client client.Client - consumerManager *ocsConsumerManager - storageRequestManager *storageRequestManager - namespace string + client client.Client + consumerManager *ocsConsumerManager + storageRequestManager *storageRequestManager + storageClusterPeerManager *storageClusterPeerManager + namespace string } func NewOCSProviderServer(ctx context.Context, namespace string) (*OCSProviderServer, error) { @@ -88,11 +89,17 @@ func NewOCSProviderServer(ctx context.Context, namespace string) (*OCSProviderSe return nil, fmt.Errorf("failed to create new StorageRequest instance. %v", err) } + storageClusterPeerManager, err := newStorageClusterPeerManager(client, namespace) + if err != nil { + return nil, fmt.Errorf("failed to create new StorageClusterPeer instance. %v", err) + } + return &OCSProviderServer{ - client: client, - consumerManager: consumerManager, - storageRequestManager: storageRequestManager, - namespace: namespace, + client: client, + consumerManager: consumerManager, + storageRequestManager: storageRequestManager, + storageClusterPeerManager: storageClusterPeerManager, + namespace: namespace, }, nil } @@ -893,6 +900,77 @@ func extractMonitorIps(data string) ([]string, error) { return ips, nil } -func (s *OCSProviderServer) PeerStorageCluster(_ context.Context, _ *pb.PeerStorageClusterRequest) (*pb.PeerStorageClusterResponse, error) { - return &pb.PeerStorageClusterResponse{}, nil +func (s *OCSProviderServer) PeerStorageCluster(ctx context.Context, req *pb.PeerStorageClusterRequest) (*pb.PeerStorageClusterResponse, error) { + + pubKey, err := s.getOnboardingValidationKey(ctx) + if err != nil { + return nil, status.Errorf(codes.Internal, "failed to get public key to validate onboarding ticket for StorageCluster %q. %v", req.StorageClusterUID, err) + } + + onboardingToken, err := decodeAndValidateTicket(req.OnboardingToken, pubKey) + if err != nil { + klog.Errorf("failed to validate onboarding ticket for StorageCluster %q. %v", req.StorageClusterUID, err) + return nil, status.Errorf(codes.InvalidArgument, "onboarding ticket is not valid. %v", err) + } + + if onboardingToken.SubjectRole != services.PeerRole { + err := fmt.Errorf("unsupported ticket role for StorageCluster %q, found %s, expected %s", req.StorageClusterUID, onboardingToken.SubjectRole, services.ClientRole) + klog.Error(err) + return nil, status.Error(codes.InvalidArgument, err.Error()) + } + + storageCluster, err := s.findStorageClusterWithUID(ctx, onboardingToken.StorageClusterUID) + if err != nil { + klog.Error(err) + return nil, status.Error(codes.InvalidArgument, err.Error()) + } + + storageClusterPeer, err := s.storageClusterPeerManager.FindStorageClusterPeerWithStorageClusterID(ctx, req.StorageClusterUID) + if err != nil { + klog.Error(err) + return nil, status.Error(codes.InvalidArgument, err.Error()) + } + + //TODO: Should we store the onboarding token in storageClusterPeer? + + err = s.storageClusterPeerManager.UpdateStorageClusterPeerStatus(ctx, storageClusterPeer, req.StorageClusterUID) + if err != nil { + klog.Error(err) + return nil, status.Error(codes.InvalidArgument, err.Error()) + } + + return &pb.PeerStorageClusterResponse{StorageClusterUID: string(storageCluster.UID)}, nil +} + +func (s *OCSProviderServer) findStorageClusterWithUID(ctx context.Context, uid string) (*ocsv1.StorageCluster, error) { + storageClusterList := &ocsv1.StorageClusterList{} + err := s.client.List(ctx, storageClusterList, client.InNamespace(s.namespace)) + if err != nil { + return nil, err + } + for i := range storageClusterList.Items { + if string(storageClusterList.Items[i].UID) == uid { + return &storageClusterList.Items[i], nil + } + } + return nil, fmt.Errorf("storage cluster with uid %q not found", uid) +} + +func decodeToken(ticket string) (*services.OnboardingTicket, error) { + ticketArr := strings.Split(string(ticket), ".") + if len(ticketArr) != 2 { + return nil, fmt.Errorf("invalid ticket") + } + + message, err := base64.StdEncoding.DecodeString(ticketArr[0]) + if err != nil { + return nil, fmt.Errorf("failed to decode onboarding ticket: %v", err) + } + + var ticketData services.OnboardingTicket + err = json.Unmarshal(message, &ticketData) + if err != nil { + return nil, fmt.Errorf("failed to unmarshal onboarding ticket message. %v", err) + } + return &ticketData, nil } diff --git a/services/provider/server/storageclusterpeer.go b/services/provider/server/storageclusterpeer.go new file mode 100644 index 0000000000..6085341e8f --- /dev/null +++ b/services/provider/server/storageclusterpeer.go @@ -0,0 +1,51 @@ +package server + +import ( + "context" + "fmt" + + ocsv1 "github.com/red-hat-storage/ocs-operator/api/v4/v1" + "k8s.io/klog/v2" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +type storageClusterPeerManager struct { + client client.Client + namespace string +} + +func newStorageClusterPeerManager(cl client.Client, namespace string) (*storageClusterPeerManager, error) { + return &storageClusterPeerManager{ + client: cl, + namespace: namespace, + }, nil +} + +func (s *storageClusterPeerManager) FindStorageClusterPeerWithStorageClusterID(ctx context.Context, storageClusterUID string) (*ocsv1.StorageClusterPeer, error) { + storageClusterPeerList := &ocsv1.StorageClusterPeerList{} + err := s.client.List(ctx, storageClusterPeerList, client.InNamespace(s.namespace)) + if err != nil { + return nil, err + } + for i := range storageClusterPeerList.Items { + token, err := decodeToken(storageClusterPeerList.Items[i].Spec.OnboardingToken) + if err != nil { + return nil, err + } + if token.StorageClusterUID == storageClusterUID { + return &storageClusterPeerList.Items[i], nil + } + } + return nil, fmt.Errorf("StorageClusterPeer linked to StorageCluster with uid %q not found", storageClusterUID) +} + +func (s *storageClusterPeerManager) UpdateStorageClusterPeerStatus(ctx context.Context, storageClusterPeer *ocsv1.StorageClusterPeer, storageClusterUID string) error { + + storageClusterPeer.Status.RemoteStorageClusterUID = storageClusterUID + + if err := s.client.Status().Update(ctx, storageClusterPeer); err != nil { + return fmt.Errorf("failed to patch Status for StorageClusterPeer %v: %v", storageClusterPeer.Name, err) + } + klog.Infof("successfully updated Status for StorageConsumer %v", storageClusterPeer.Name) + return nil +}