Skip to content

Commit

Permalink
implement server side rpc call for peering
Browse files Browse the repository at this point in the history
Signed-off-by: Rewant Soni <[email protected]>
  • Loading branch information
rewantsoni committed Nov 6, 2024
1 parent 51fd45d commit e0ed19e
Show file tree
Hide file tree
Showing 3 changed files with 152 additions and 23 deletions.
9 changes: 9 additions & 0 deletions rbac/provider-role.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -75,3 +75,12 @@ rules:
verbs:
- get
- list
- apiGroups:
- ocs.openshift.io
resources:
- storageclusterpeers
- storageclusterpeers/status
verbs:
- get
- list
- update
108 changes: 85 additions & 23 deletions services/provider/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,11 @@ const (

type OCSProviderServer struct {
pb.UnimplementedOCSProviderServer
client client.Client
consumerManager *ocsConsumerManager
storageRequestManager *storageRequestManager
namespace string
client client.Client
consumerManager *ocsConsumerManager
storageRequestManager *storageRequestManager
storageClusterPeerManager *storageClusterPeerManager
namespace string
}

func NewOCSProviderServer(ctx context.Context, namespace string) (*OCSProviderServer, error) {
Expand All @@ -93,11 +94,17 @@ func NewOCSProviderServer(ctx context.Context, namespace string) (*OCSProviderSe
return nil, fmt.Errorf("failed to create new StorageRequest instance. %v", err)
}

storageClusterPeerManager, err := newStorageClusterPeerManager(client, namespace)
if err != nil {
return nil, fmt.Errorf("failed to create new StorageClusterPeer instance. %v", err)
}

return &OCSProviderServer{
client: client,
consumerManager: consumerManager,
storageRequestManager: storageRequestManager,
namespace: namespace,
client: client,
consumerManager: consumerManager,
storageRequestManager: storageRequestManager,
storageClusterPeerManager: storageClusterPeerManager,
namespace: namespace,
}, nil
}

Expand Down Expand Up @@ -141,7 +148,7 @@ func (s *OCSProviderServer) OnboardConsumer(ctx context.Context, req *pb.Onboard
storageQuotaInGiB := ptr.Deref(onboardingTicket.StorageQuotaInGiB, 0)

if onboardingTicket.SubjectRole != services.ClientRole {
err := fmt.Errorf("unsupported ticket role for consumer %q, found %s, expected %s", req.ConsumerName, onboardingTicket.SubjectRole, services.ClientRole)
err := fmt.Errorf("invalid onboarding ticket for %q, expecting role %s found role %s", req.ConsumerName, services.ClientRole, onboardingTicket.SubjectRole)
klog.Error(err)
return nil, status.Error(codes.InvalidArgument, err.Error())
}
Expand Down Expand Up @@ -535,14 +542,9 @@ func getSubVolumeGroupClusterID(subVolumeGroup *rookCephv1.CephFilesystemSubVolu
}

func decodeAndValidateTicket(ticket string, pubKey *rsa.PublicKey) (*services.OnboardingTicket, error) {
ticketArr := strings.Split(string(ticket), ".")
if len(ticketArr) != 2 {
return nil, fmt.Errorf("invalid ticket")
}

message, err := base64.StdEncoding.DecodeString(ticketArr[0])
message, signature, err := decodeTicket(ticket)
if err != nil {
return nil, fmt.Errorf("failed to decode onboarding ticket: %v", err)
return nil, err
}

var ticketData services.OnboardingTicket
Expand All @@ -564,11 +566,6 @@ func decodeAndValidateTicket(ticket string, pubKey *rsa.PublicKey) (*services.On
return nil, fmt.Errorf("invalid onboarding ticket subject role")
}

signature, err := base64.StdEncoding.DecodeString(ticketArr[1])
if err != nil {
return nil, fmt.Errorf("failed to decode onboarding ticket %s signature: %v", ticketData.ID, err)
}

hash := sha256.Sum256(message)
err = rsa.VerifyPKCS1v15(pubKey, crypto.SHA256, hash[:], signature)
if err != nil {
Expand All @@ -584,6 +581,25 @@ func decodeAndValidateTicket(ticket string, pubKey *rsa.PublicKey) (*services.On
return &ticketData, nil
}

func decodeTicket(ticket string) ([]byte, []byte, error) {
ticketArr := strings.Split(string(ticket), ".")
if len(ticketArr) != 2 {
return nil, nil, fmt.Errorf("invalid ticket")
}

message, err := base64.StdEncoding.DecodeString(ticketArr[0])
if err != nil {
return nil, nil, fmt.Errorf("failed to decode onboarding ticket message: %v", err)
}

signature, err := base64.StdEncoding.DecodeString(ticketArr[1])
if err != nil {
return nil, nil, fmt.Errorf("failed to decode onboarding ticket signature: %v", err)
}

return message, signature, nil
}

// FulfillStorageClaim RPC call to create the StorageClaim CR on
// provider cluster.
func (s *OCSProviderServer) FulfillStorageClaim(ctx context.Context, req *pb.FulfillStorageClaimRequest) (*pb.FulfillStorageClaimResponse, error) {
Expand Down Expand Up @@ -925,6 +941,52 @@ func extractMonitorIps(data string) ([]string, error) {
return ips, nil
}

func (s *OCSProviderServer) PeerStorageCluster(_ context.Context, _ *pb.PeerStorageClusterRequest) (*pb.PeerStorageClusterResponse, error) {
return &pb.PeerStorageClusterResponse{}, nil
func (s *OCSProviderServer) PeerStorageCluster(ctx context.Context, req *pb.PeerStorageClusterRequest) (*pb.PeerStorageClusterResponse, error) {

pubKey, err := s.getOnboardingValidationKey(ctx)
if err != nil {
return nil, status.Errorf(codes.Internal, "failed to get public key to validate peer onboarding ticket %v", err)
}

onboardingToken, err := decodeAndValidateTicket(req.OnboardingToken, pubKey)
if err != nil {
klog.Errorf("Invalid onboarding token. %v", err)
return nil, status.Errorf(codes.InvalidArgument, "onboarding ticket is invalid. %v", err)
}

if onboardingToken.SubjectRole != services.PeerRole {
err := fmt.Errorf("invalid onboarding ticket for %q, expecting role %s found role %s", req.StorageClusterUID, services.PeerRole, onboardingToken.SubjectRole)
klog.Error(err)
return nil, status.Error(codes.InvalidArgument, err.Error())
}

storageCluster, err := util.GetStorageClusterInNamespace(ctx, s.client, s.namespace)
if err != nil {
return nil, status.Errorf(codes.Internal, "failed to get storageCluster. %v", err)
}

if storageCluster.UID != onboardingToken.StorageCluster {
return nil, status.Errorf(codes.InvalidArgument, "storageClusterUID specified on the onboarding token does not match any existing storage cluster")
}

klog.Infof("Found StorageCluster %s for PeerStorageCluster", storageCluster.Name)

storageClusterPeer, err := s.storageClusterPeerManager.GetByStorageClusterUID(ctx, types.UID(req.StorageClusterUID))
if err != nil {
klog.Error(err)
return nil, status.Error(codes.InvalidArgument, err.Error())
}

klog.Infof("Found StorageClusterPeer %s for PeerStorageCluster", storageClusterPeer.Name)

//TODO: Should we store the onboarding token in storageClusterPeer?

storageClusterPeer.Status.PeerInfo.StorageClusterUid = req.StorageClusterUID
err = s.storageClusterPeerManager.UpdateStatus(ctx, storageClusterPeer)
if err != nil {
klog.Error(err)
return nil, status.Error(codes.InvalidArgument, err.Error())
}

return &pb.PeerStorageClusterResponse{StorageClusterUID: string(storageCluster.UID)}, nil
}
58 changes: 58 additions & 0 deletions services/provider/server/storageclusterpeer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package server

import (
"context"
"encoding/json"
"fmt"
"github.com/red-hat-storage/ocs-operator/v4/services"
"k8s.io/apimachinery/pkg/types"

ocsv1 "github.com/red-hat-storage/ocs-operator/api/v4/v1"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/client"
)

type storageClusterPeerManager struct {
client client.Client
namespace string
}

func newStorageClusterPeerManager(cl client.Client, namespace string) (*storageClusterPeerManager, error) {
return &storageClusterPeerManager{
client: cl,
namespace: namespace,
}, nil
}

func (s *storageClusterPeerManager) GetByStorageClusterUID(ctx context.Context, storageClusterUid types.UID) (*ocsv1.StorageClusterPeer, error) {
storageClusterPeerList := &ocsv1.StorageClusterPeerList{}
err := s.client.List(ctx, storageClusterPeerList, client.InNamespace(s.namespace))
if err != nil {
return nil, err
}
for i := range storageClusterPeerList.Items {
message, _, err := decodeTicket(storageClusterPeerList.Items[i].Spec.OnboardingToken)
if err != nil {
return nil, err
}

var ticketData services.OnboardingTicket
err = json.Unmarshal(message, &ticketData)
if err != nil {
return nil, fmt.Errorf("failed to unmarshal onboarding ticket message. %v", err)
}

if ticketData.StorageCluster == storageClusterUid {
return &storageClusterPeerList.Items[i], nil
}
}
return nil, fmt.Errorf("StorageClusterPeer linked to StorageCluster with uid %q not found", storageClusterUid)
}

func (s *storageClusterPeerManager) UpdateStatus(ctx context.Context, storageClusterPeer client.Object) error {
if err := s.client.Status().Update(ctx, storageClusterPeer); err != nil {
return fmt.Errorf("failed to update Status for StorageClusterPeer %v: %v", storageClusterPeer.GetName(), err)
}
klog.Infof("successfully updated Status for StorageClusterPeer %v", storageClusterPeer.GetName())
return nil
}

0 comments on commit e0ed19e

Please sign in to comment.