Skip to content

Commit

Permalink
implemenent the server side rpc call for peering
Browse files Browse the repository at this point in the history
Signed-off-by: Rewant Soni <[email protected]>
  • Loading branch information
rewantsoni committed Nov 7, 2024
1 parent 20951c4 commit 3271af9
Show file tree
Hide file tree
Showing 4 changed files with 108 additions and 16 deletions.
7 changes: 7 additions & 0 deletions deploy/ocs-operator/manifests/provider-role.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -75,3 +75,10 @@ rules:
verbs:
- get
- list
- apiGroups:
- ocs.openshift.io
resources:
- storageclusterpeers
verbs:
- get
- list
7 changes: 7 additions & 0 deletions rbac/provider-role.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -75,3 +75,10 @@ rules:
verbs:
- get
- list
- apiGroups:
- ocs.openshift.io
resources:
- storageclusterpeers
verbs:
- get
- list
69 changes: 53 additions & 16 deletions services/provider/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,11 @@ const (

type OCSProviderServer struct {
pb.UnimplementedOCSProviderServer
client client.Client
consumerManager *ocsConsumerManager
storageRequestManager *storageRequestManager
namespace string
client client.Client
consumerManager *ocsConsumerManager
storageRequestManager *storageRequestManager
storageClusterPeerManager *storageClusterPeerManager
namespace string
}

func NewOCSProviderServer(ctx context.Context, namespace string) (*OCSProviderServer, error) {
Expand All @@ -93,11 +94,17 @@ func NewOCSProviderServer(ctx context.Context, namespace string) (*OCSProviderSe
return nil, fmt.Errorf("failed to create new StorageRequest instance. %v", err)
}

storageClusterPeerManager, err := newStorageClusterPeerManager(client, namespace)
if err != nil {
return nil, fmt.Errorf("failed to create new StorageClusterPeer instance. %v", err)
}

return &OCSProviderServer{
client: client,
consumerManager: consumerManager,
storageRequestManager: storageRequestManager,
namespace: namespace,
client: client,
consumerManager: consumerManager,
storageRequestManager: storageRequestManager,
storageClusterPeerManager: storageClusterPeerManager,
namespace: namespace,
}, nil
}

Expand Down Expand Up @@ -141,7 +148,7 @@ func (s *OCSProviderServer) OnboardConsumer(ctx context.Context, req *pb.Onboard
storageQuotaInGiB := ptr.Deref(onboardingTicket.StorageQuotaInGiB, 0)

if onboardingTicket.SubjectRole != services.ClientRole {
err := fmt.Errorf("unsupported ticket role for consumer %q, found %s, expected %s", req.ConsumerName, onboardingTicket.SubjectRole, services.ClientRole)
err := fmt.Errorf("invalid onboarding ticket for %q, expecting role %s found role %s", req.ConsumerName, services.ClientRole, onboardingTicket.SubjectRole)
klog.Error(err)
return nil, status.Error(codes.InvalidArgument, err.Error())
}
Expand Down Expand Up @@ -551,6 +558,11 @@ func decodeAndValidateTicket(ticket string, pubKey *rsa.PublicKey) (*services.On
return nil, fmt.Errorf("failed to unmarshal onboarding ticket message. %v", err)
}

signature, err := base64.StdEncoding.DecodeString(ticketArr[1])
if err != nil {
return nil, fmt.Errorf("failed to decode onboarding ticket %s signature: %v", ticketData.ID, err)
}

switch ticketData.SubjectRole {
case services.ClientRole:
if ticketData.StorageQuotaInGiB != nil {
Expand All @@ -564,11 +576,6 @@ func decodeAndValidateTicket(ticket string, pubKey *rsa.PublicKey) (*services.On
return nil, fmt.Errorf("invalid onboarding ticket subject role")
}

signature, err := base64.StdEncoding.DecodeString(ticketArr[1])
if err != nil {
return nil, fmt.Errorf("failed to decode onboarding ticket %s signature: %v", ticketData.ID, err)
}

hash := sha256.Sum256(message)
err = rsa.VerifyPKCS1v15(pubKey, crypto.SHA256, hash[:], signature)
if err != nil {
Expand Down Expand Up @@ -925,6 +932,36 @@ func extractMonitorIps(data string) ([]string, error) {
return ips, nil
}

func (s *OCSProviderServer) PeerStorageCluster(_ context.Context, _ *pb.PeerStorageClusterRequest) (*pb.PeerStorageClusterResponse, error) {
return &pb.PeerStorageClusterResponse{}, nil
func (s *OCSProviderServer) PeerStorageCluster(ctx context.Context, req *pb.PeerStorageClusterRequest) (*pb.PeerStorageClusterResponse, error) {

pubKey, err := s.getOnboardingValidationKey(ctx)
if err != nil {
return nil, status.Errorf(codes.Internal, "failed to get public key to validate peer onboarding ticket %v", err)
}

onboardingToken, err := decodeAndValidateTicket(req.OnboardingToken, pubKey)
if err != nil {
klog.Errorf("Invalid onboarding token. %v", err)
return nil, status.Errorf(codes.InvalidArgument, "onboarding ticket is invalid. %v", err)
}

if onboardingToken.SubjectRole != services.PeerRole {
err := fmt.Errorf("invalid onboarding ticket for %q, expecting role %s found role %s", req.StorageClusterUID, services.PeerRole, onboardingToken.SubjectRole)
klog.Error(err)
return nil, status.Error(codes.InvalidArgument, err.Error())
}

storageClusterPeer, err := s.storageClusterPeerManager.GetByPeerStorageClusterUID(ctx, types.UID(req.StorageClusterUID))
if err != nil {
klog.Error(err)
return nil, err
}

klog.Infof("Found StorageClusterPeer %s for PeerStorageCluster", storageClusterPeer.Name)

if storageClusterPeer.Status.State == ocsv1.StorageClusterPeerStatePending || storageClusterPeer.Status.State == ocsv1.StorageClusterPeerStatePeered {
return &pb.PeerStorageClusterResponse{}, nil
}

return nil, status.Errorf(codes.NotFound, "Peer Cluster is not reachable")
}
41 changes: 41 additions & 0 deletions services/provider/server/storageclusterpeer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package server

import (
"context"
"fmt"
ocsv1 "github.com/red-hat-storage/ocs-operator/api/v4/v1"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"sigs.k8s.io/controller-runtime/pkg/client"

"k8s.io/apimachinery/pkg/types"
)

type storageClusterPeerManager struct {
client client.Client
namespace string
}

func newStorageClusterPeerManager(cl client.Client, namespace string) (*storageClusterPeerManager, error) {
return &storageClusterPeerManager{
client: cl,
namespace: namespace,
}, nil
}

func (s *storageClusterPeerManager) GetByPeerStorageClusterUID(ctx context.Context, storageClusterUID types.UID) (*ocsv1.StorageClusterPeer, error) {
storageClusterPeerList := &ocsv1.StorageClusterPeerList{}
err := s.client.List(ctx, storageClusterPeerList, client.InNamespace(s.namespace))
if err != nil {
return nil, status.Error(codes.InvalidArgument, fmt.Sprintf("failed to list StorageClusterPeer: %v", err))
}
for i := range storageClusterPeerList.Items {
storageClusterPeer := &storageClusterPeerList.Items[i]
if storageClusterPeer.Status.PeerInfo != nil {
if storageClusterUID == types.UID(storageClusterPeer.Status.PeerInfo.StorageClusterUid) {
return storageClusterPeer, nil
}
}
}
return nil, status.Error(codes.NotFound, fmt.Sprintf("StorageClusterPeer linked to StorageCluster with uid %q not found", storageClusterUID))
}

0 comments on commit 3271af9

Please sign in to comment.