From 3271af93a0425819ad147f2d794fe69cb3bcb2bc Mon Sep 17 00:00:00 2001 From: Rewant Soni Date: Thu, 7 Nov 2024 16:33:13 +0530 Subject: [PATCH] implemenent the server side rpc call for peering Signed-off-by: Rewant Soni --- .../ocs-operator/manifests/provider-role.yaml | 7 ++ rbac/provider-role.yaml | 7 ++ services/provider/server/server.go | 69 ++++++++++++++----- .../provider/server/storageclusterpeer.go | 41 +++++++++++ 4 files changed, 108 insertions(+), 16 deletions(-) create mode 100644 services/provider/server/storageclusterpeer.go diff --git a/deploy/ocs-operator/manifests/provider-role.yaml b/deploy/ocs-operator/manifests/provider-role.yaml index dd1c692975..42259a455d 100644 --- a/deploy/ocs-operator/manifests/provider-role.yaml +++ b/deploy/ocs-operator/manifests/provider-role.yaml @@ -75,3 +75,10 @@ rules: verbs: - get - list + - apiGroups: + - ocs.openshift.io + resources: + - storageclusterpeers + verbs: + - get + - list diff --git a/rbac/provider-role.yaml b/rbac/provider-role.yaml index dd1c692975..42259a455d 100644 --- a/rbac/provider-role.yaml +++ b/rbac/provider-role.yaml @@ -75,3 +75,10 @@ rules: verbs: - get - list + - apiGroups: + - ocs.openshift.io + resources: + - storageclusterpeers + verbs: + - get + - list diff --git a/services/provider/server/server.go b/services/provider/server/server.go index 1e7f82ae4a..3952b47827 100644 --- a/services/provider/server/server.go +++ b/services/provider/server/server.go @@ -66,10 +66,11 @@ const ( type OCSProviderServer struct { pb.UnimplementedOCSProviderServer - client client.Client - consumerManager *ocsConsumerManager - storageRequestManager *storageRequestManager - namespace string + client client.Client + consumerManager *ocsConsumerManager + storageRequestManager *storageRequestManager + storageClusterPeerManager *storageClusterPeerManager + namespace string } func NewOCSProviderServer(ctx context.Context, namespace string) (*OCSProviderServer, error) { @@ -93,11 +94,17 @@ func NewOCSProviderServer(ctx context.Context, namespace string) (*OCSProviderSe return nil, fmt.Errorf("failed to create new StorageRequest instance. %v", err) } + storageClusterPeerManager, err := newStorageClusterPeerManager(client, namespace) + if err != nil { + return nil, fmt.Errorf("failed to create new StorageClusterPeer instance. %v", err) + } + return &OCSProviderServer{ - client: client, - consumerManager: consumerManager, - storageRequestManager: storageRequestManager, - namespace: namespace, + client: client, + consumerManager: consumerManager, + storageRequestManager: storageRequestManager, + storageClusterPeerManager: storageClusterPeerManager, + namespace: namespace, }, nil } @@ -141,7 +148,7 @@ func (s *OCSProviderServer) OnboardConsumer(ctx context.Context, req *pb.Onboard storageQuotaInGiB := ptr.Deref(onboardingTicket.StorageQuotaInGiB, 0) if onboardingTicket.SubjectRole != services.ClientRole { - err := fmt.Errorf("unsupported ticket role for consumer %q, found %s, expected %s", req.ConsumerName, onboardingTicket.SubjectRole, services.ClientRole) + err := fmt.Errorf("invalid onboarding ticket for %q, expecting role %s found role %s", req.ConsumerName, services.ClientRole, onboardingTicket.SubjectRole) klog.Error(err) return nil, status.Error(codes.InvalidArgument, err.Error()) } @@ -551,6 +558,11 @@ func decodeAndValidateTicket(ticket string, pubKey *rsa.PublicKey) (*services.On return nil, fmt.Errorf("failed to unmarshal onboarding ticket message. %v", err) } + signature, err := base64.StdEncoding.DecodeString(ticketArr[1]) + if err != nil { + return nil, fmt.Errorf("failed to decode onboarding ticket %s signature: %v", ticketData.ID, err) + } + switch ticketData.SubjectRole { case services.ClientRole: if ticketData.StorageQuotaInGiB != nil { @@ -564,11 +576,6 @@ func decodeAndValidateTicket(ticket string, pubKey *rsa.PublicKey) (*services.On return nil, fmt.Errorf("invalid onboarding ticket subject role") } - signature, err := base64.StdEncoding.DecodeString(ticketArr[1]) - if err != nil { - return nil, fmt.Errorf("failed to decode onboarding ticket %s signature: %v", ticketData.ID, err) - } - hash := sha256.Sum256(message) err = rsa.VerifyPKCS1v15(pubKey, crypto.SHA256, hash[:], signature) if err != nil { @@ -925,6 +932,36 @@ func extractMonitorIps(data string) ([]string, error) { return ips, nil } -func (s *OCSProviderServer) PeerStorageCluster(_ context.Context, _ *pb.PeerStorageClusterRequest) (*pb.PeerStorageClusterResponse, error) { - return &pb.PeerStorageClusterResponse{}, nil +func (s *OCSProviderServer) PeerStorageCluster(ctx context.Context, req *pb.PeerStorageClusterRequest) (*pb.PeerStorageClusterResponse, error) { + + pubKey, err := s.getOnboardingValidationKey(ctx) + if err != nil { + return nil, status.Errorf(codes.Internal, "failed to get public key to validate peer onboarding ticket %v", err) + } + + onboardingToken, err := decodeAndValidateTicket(req.OnboardingToken, pubKey) + if err != nil { + klog.Errorf("Invalid onboarding token. %v", err) + return nil, status.Errorf(codes.InvalidArgument, "onboarding ticket is invalid. %v", err) + } + + if onboardingToken.SubjectRole != services.PeerRole { + err := fmt.Errorf("invalid onboarding ticket for %q, expecting role %s found role %s", req.StorageClusterUID, services.PeerRole, onboardingToken.SubjectRole) + klog.Error(err) + return nil, status.Error(codes.InvalidArgument, err.Error()) + } + + storageClusterPeer, err := s.storageClusterPeerManager.GetByPeerStorageClusterUID(ctx, types.UID(req.StorageClusterUID)) + if err != nil { + klog.Error(err) + return nil, err + } + + klog.Infof("Found StorageClusterPeer %s for PeerStorageCluster", storageClusterPeer.Name) + + if storageClusterPeer.Status.State == ocsv1.StorageClusterPeerStatePending || storageClusterPeer.Status.State == ocsv1.StorageClusterPeerStatePeered { + return &pb.PeerStorageClusterResponse{}, nil + } + + return nil, status.Errorf(codes.NotFound, "Peer Cluster is not reachable") } diff --git a/services/provider/server/storageclusterpeer.go b/services/provider/server/storageclusterpeer.go new file mode 100644 index 0000000000..610a24829e --- /dev/null +++ b/services/provider/server/storageclusterpeer.go @@ -0,0 +1,41 @@ +package server + +import ( + "context" + "fmt" + ocsv1 "github.com/red-hat-storage/ocs-operator/api/v4/v1" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + "sigs.k8s.io/controller-runtime/pkg/client" + + "k8s.io/apimachinery/pkg/types" +) + +type storageClusterPeerManager struct { + client client.Client + namespace string +} + +func newStorageClusterPeerManager(cl client.Client, namespace string) (*storageClusterPeerManager, error) { + return &storageClusterPeerManager{ + client: cl, + namespace: namespace, + }, nil +} + +func (s *storageClusterPeerManager) GetByPeerStorageClusterUID(ctx context.Context, storageClusterUID types.UID) (*ocsv1.StorageClusterPeer, error) { + storageClusterPeerList := &ocsv1.StorageClusterPeerList{} + err := s.client.List(ctx, storageClusterPeerList, client.InNamespace(s.namespace)) + if err != nil { + return nil, status.Error(codes.InvalidArgument, fmt.Sprintf("failed to list StorageClusterPeer: %v", err)) + } + for i := range storageClusterPeerList.Items { + storageClusterPeer := &storageClusterPeerList.Items[i] + if storageClusterPeer.Status.PeerInfo != nil { + if storageClusterUID == types.UID(storageClusterPeer.Status.PeerInfo.StorageClusterUid) { + return storageClusterPeer, nil + } + } + } + return nil, status.Error(codes.NotFound, fmt.Sprintf("StorageClusterPeer linked to StorageCluster with uid %q not found", storageClusterUID)) +}