From 5385c9eb5308848046393f7a13309dabbb156607 Mon Sep 17 00:00:00 2001 From: Rewant Soni Date: Wed, 9 Oct 2024 12:07:26 +0530 Subject: [PATCH] implement server side rpc call for peering Signed-off-by: Rewant Soni --- rbac/provider-role.yaml | 7 ++ services/provider/server/server.go | 80 +++++++++++++++---- .../provider/server/storageclusterpeer.go | 66 +++++++++++++++ 3 files changed, 137 insertions(+), 16 deletions(-) create mode 100644 services/provider/server/storageclusterpeer.go diff --git a/rbac/provider-role.yaml b/rbac/provider-role.yaml index dd1c692975..42259a455d 100644 --- a/rbac/provider-role.yaml +++ b/rbac/provider-role.yaml @@ -75,3 +75,10 @@ rules: verbs: - get - list + - apiGroups: + - ocs.openshift.io + resources: + - storageclusterpeers + verbs: + - get + - list diff --git a/services/provider/server/server.go b/services/provider/server/server.go index 1e7f82ae4a..71a6a800e8 100644 --- a/services/provider/server/server.go +++ b/services/provider/server/server.go @@ -66,10 +66,11 @@ const ( type OCSProviderServer struct { pb.UnimplementedOCSProviderServer - client client.Client - consumerManager *ocsConsumerManager - storageRequestManager *storageRequestManager - namespace string + client client.Client + consumerManager *ocsConsumerManager + storageRequestManager *storageRequestManager + storageClusterPeerManager *storageClusterPeerManager + namespace string } func NewOCSProviderServer(ctx context.Context, namespace string) (*OCSProviderServer, error) { @@ -93,11 +94,17 @@ func NewOCSProviderServer(ctx context.Context, namespace string) (*OCSProviderSe return nil, fmt.Errorf("failed to create new StorageRequest instance. %v", err) } + storageClusterPeerManager, err := newStorageClusterPeerManager(client, namespace) + if err != nil { + return nil, fmt.Errorf("failed to create new StorageClusterPeer instance. %v", err) + } + return &OCSProviderServer{ - client: client, - consumerManager: consumerManager, - storageRequestManager: storageRequestManager, - namespace: namespace, + client: client, + consumerManager: consumerManager, + storageRequestManager: storageRequestManager, + storageClusterPeerManager: storageClusterPeerManager, + namespace: namespace, }, nil } @@ -141,7 +148,7 @@ func (s *OCSProviderServer) OnboardConsumer(ctx context.Context, req *pb.Onboard storageQuotaInGiB := ptr.Deref(onboardingTicket.StorageQuotaInGiB, 0) if onboardingTicket.SubjectRole != services.ClientRole { - err := fmt.Errorf("unsupported ticket role for consumer %q, found %s, expected %s", req.ConsumerName, onboardingTicket.SubjectRole, services.ClientRole) + err := fmt.Errorf("invalid onboarding ticket for %q, expecting role %s found role %s", req.ConsumerName, services.ClientRole, onboardingTicket.SubjectRole) klog.Error(err) return nil, status.Error(codes.InvalidArgument, err.Error()) } @@ -551,6 +558,11 @@ func decodeAndValidateTicket(ticket string, pubKey *rsa.PublicKey) (*services.On return nil, fmt.Errorf("failed to unmarshal onboarding ticket message. %v", err) } + signature, err := base64.StdEncoding.DecodeString(ticketArr[1]) + if err != nil { + return nil, fmt.Errorf("failed to decode onboarding ticket %s signature: %v", ticketData.ID, err) + } + switch ticketData.SubjectRole { case services.ClientRole: if ticketData.StorageQuotaInGiB != nil { @@ -564,11 +576,6 @@ func decodeAndValidateTicket(ticket string, pubKey *rsa.PublicKey) (*services.On return nil, fmt.Errorf("invalid onboarding ticket subject role") } - signature, err := base64.StdEncoding.DecodeString(ticketArr[1]) - if err != nil { - return nil, fmt.Errorf("failed to decode onboarding ticket %s signature: %v", ticketData.ID, err) - } - hash := sha256.Sum256(message) err = rsa.VerifyPKCS1v15(pubKey, crypto.SHA256, hash[:], signature) if err != nil { @@ -925,6 +932,47 @@ func extractMonitorIps(data string) ([]string, error) { return ips, nil } -func (s *OCSProviderServer) PeerStorageCluster(_ context.Context, _ *pb.PeerStorageClusterRequest) (*pb.PeerStorageClusterResponse, error) { - return &pb.PeerStorageClusterResponse{}, nil +func (s *OCSProviderServer) PeerStorageCluster(ctx context.Context, req *pb.PeerStorageClusterRequest) (*pb.PeerStorageClusterResponse, error) { + + pubKey, err := s.getOnboardingValidationKey(ctx) + if err != nil { + return nil, status.Errorf(codes.Internal, "failed to get public key to validate peer onboarding ticket %v", err) + } + + onboardingToken, err := decodeAndValidateTicket(req.OnboardingToken, pubKey) + if err != nil { + klog.Errorf("Invalid onboarding token. %v", err) + return nil, status.Errorf(codes.InvalidArgument, "onboarding ticket is invalid. %v", err) + } + + if onboardingToken.SubjectRole != services.PeerRole { + err := fmt.Errorf("invalid onboarding ticket for %q, expecting role %s found role %s", req.StorageClusterUID, services.PeerRole, onboardingToken.SubjectRole) + klog.Error(err) + return nil, status.Error(codes.InvalidArgument, err.Error()) + } + + storageCluster, err := util.GetStorageClusterInNamespace(ctx, s.client, s.namespace) + if err != nil { + return nil, status.Errorf(codes.Internal, "failed to get storageCluster. %v", err) + } + + if storageCluster.UID != onboardingToken.StorageCluster { + return nil, status.Errorf(codes.InvalidArgument, "storageClusterUID specified on the onboarding token does not match any existing storage cluster") + } + + klog.Infof("Found StorageCluster %s for PeerStorageCluster", storageCluster.Name) + + storageClusterPeer, err := s.storageClusterPeerManager.GetByStorageClusterUID(ctx, types.UID(req.StorageClusterUID)) + if err != nil { + klog.Error(err) + return nil, status.Error(codes.InvalidArgument, err.Error()) + } + + klog.Infof("Found StorageClusterPeer %s for PeerStorageCluster", storageClusterPeer.Name) + + if storageClusterPeer.Status.State == ocsv1.StorageClusterPeerStatePending || storageClusterPeer.Status.State == ocsv1.StorageClusterPeerStatePeered { + return &pb.PeerStorageClusterResponse{StorageClusterUID: string(storageCluster.UID)}, nil + } + + return nil, status.Errorf(codes.InvalidArgument, "Peer Cluster is not reachable") } diff --git a/services/provider/server/storageclusterpeer.go b/services/provider/server/storageclusterpeer.go new file mode 100644 index 0000000000..d7eb071d9a --- /dev/null +++ b/services/provider/server/storageclusterpeer.go @@ -0,0 +1,66 @@ +package server + +import ( + "context" + "encoding/base64" + "encoding/json" + "fmt" + "strings" + + ocsv1 "github.com/red-hat-storage/ocs-operator/api/v4/v1" + "github.com/red-hat-storage/ocs-operator/v4/services" + "sigs.k8s.io/controller-runtime/pkg/client" + + "k8s.io/apimachinery/pkg/types" +) + +type storageClusterPeerManager struct { + client client.Client + namespace string +} + +func newStorageClusterPeerManager(cl client.Client, namespace string) (*storageClusterPeerManager, error) { + return &storageClusterPeerManager{ + client: cl, + namespace: namespace, + }, nil +} + +func (s *storageClusterPeerManager) GetByStorageClusterUID(ctx context.Context, storageClusterUID types.UID) (*ocsv1.StorageClusterPeer, error) { + storageClusterPeerList := &ocsv1.StorageClusterPeerList{} + err := s.client.List(ctx, storageClusterPeerList, client.InNamespace(s.namespace)) + if err != nil { + return nil, err + } + for i := range storageClusterPeerList.Items { + storageClusterUIDInTicket, err := readStorageClusterUID(storageClusterPeerList.Items[i].Spec.OnboardingToken) + if err != nil { + return nil, err + } + + if storageClusterUIDInTicket == storageClusterUID { + return &storageClusterPeerList.Items[i], nil + } + } + return nil, fmt.Errorf("StorageClusterPeer linked to StorageCluster with uid %q not found", storageClusterUID) +} + +func readStorageClusterUID(ticket string) (types.UID, error) { + ticketArr := strings.Split(string(ticket), ".") + if len(ticketArr) != 2 { + return "", fmt.Errorf("invalid ticket") + } + + message, err := base64.StdEncoding.DecodeString(ticketArr[0]) + if err != nil { + return "", fmt.Errorf("failed to decode onboarding ticket message: %v", err) + } + + var ticketData services.OnboardingTicket + err = json.Unmarshal(message, &ticketData) + if err != nil { + return "", fmt.Errorf("failed to unmarshal onboarding ticket message. %v", err) + } + + return ticketData.StorageCluster, nil +}