Skip to content

Commit

Permalink
storageclusterpeer: setup the reconciler
Browse files Browse the repository at this point in the history
Signed-off-by: Rewant Soni <[email protected]>
  • Loading branch information
rewantsoni committed Feb 19, 2024
1 parent 1c98153 commit ba6d227
Show file tree
Hide file tree
Showing 3 changed files with 182 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"crypto/md5"
"encoding/hex"
"fmt"
"sigs.k8s.io/controller-runtime/pkg/controller"
"strings"

"github.com/go-logr/logr"
Expand Down Expand Up @@ -175,6 +176,7 @@ func (r *StorageClassRequestReconciler) SetupWithManager(mgr ctrl.Manager) error
For(&v1alpha1.StorageClassRequest{}, builder.WithPredicates(
predicate.GenerationChangedPredicate{},
)).
WithOptions(controller.Options{MaxConcurrentReconciles: 1}).
Watches(&rookCephv1.CephBlockPool{}, enqueueForOwner).
Watches(&rookCephv1.CephFilesystemSubVolumeGroup{}, enqueueForOwner).
Watches(&rookCephv1.CephClient{}, enqueueForOwner).
Expand Down
184 changes: 179 additions & 5 deletions controllers/storageclusterpeer/storageclusterpeer_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,37 @@ package storageclusterpeer

import (
"context"
"crypto/tls"
"fmt"
v1 "k8s.io/api/core/v1"

"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/types"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"time"

"github.com/go-logr/logr"
"k8s.io/apimachinery/pkg/runtime"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"
ctrllog "sigs.k8s.io/controller-runtime/pkg/log"

ocsv1 "github.com/red-hat-storage/ocs-operator/api/v4/v1"
providerClient "github.com/red-hat-storage/ocs-operator/v4/services/provider/client"
rookCephv1 "github.com/rook/rook/pkg/apis/ceph.rook.io/v1"
)

// StorageClusterPeerReconciler reconciles a StorageClusterPeer object
type StorageClusterPeerReconciler struct {

Check failure on line 44 in controllers/storageclusterpeer/storageclusterpeer_controller.go

View workflow job for this annotation

GitHub Actions / golangci-lint (1.21)

exported: type name will be used as storageclusterpeer.StorageClusterPeerReconciler by other packages, and that stutters; consider calling this Reconciler (revive)

Check warning on line 44 in controllers/storageclusterpeer/storageclusterpeer_controller.go

View workflow job for this annotation

GitHub Actions / golangci-lint (1.21)

exported: type name will be used as storageclusterpeer.StorageClusterPeerReconciler by other packages, and that stutters; consider calling this Reconciler (revive)
client.Client
Scheme *runtime.Scheme
Log logr.Logger

ctx context.Context
storageClusterPeer *ocsv1.StorageClusterPeer
cephBlockPoolList *rookCephv1.CephBlockPoolList
}

//+kubebuilder:rbac:groups=ocs.openshift.io,resources=storageclusterpeers,verbs=get;list;watch;create;update;patch;delete
Expand All @@ -46,17 +64,173 @@ type StorageClusterPeerReconciler struct {
//
// For more details, check Reconcile and its Result here:
// - https://pkg.go.dev/sigs.k8s.io/[email protected]/pkg/reconcile
func (r *StorageClusterPeerReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
_ = log.FromContext(ctx)
func (r *StorageClusterPeerReconciler) Reconcile(ctx context.Context, request ctrl.Request) (ctrl.Result, error) {

r.ctx = ctrllog.IntoContext(ctx, r.Log)
r.Log.Info("Reconciling StorageClusterPeer.")

// Fetch the StorageClassRequest instance
r.storageClusterPeer = &ocsv1.StorageClusterPeer{}
r.storageClusterPeer.Name = request.Name
r.storageClusterPeer.Namespace = request.Namespace

if err := r.get(r.storageClusterPeer); err != nil {
if errors.IsNotFound(err) {
r.Log.Info("StorageClusterPeer resource not found. Ignoring since object must be deleted.")
return reconcile.Result{}, nil
}
r.Log.Error(err, "Failed to get StorageClusterPeer.")
return reconcile.Result{}, err
}

r.cephBlockPoolList = &rookCephv1.CephBlockPoolList{}
if err := r.list(r.cephBlockPoolList, client.InNamespace(r.storageClusterPeer.Namespace)); err != nil {
return reconcile.Result{}, err
}

// TODO(user): your logic here
var result reconcile.Result
var reconcileError error

return ctrl.Result{}, nil
result, reconcileError = r.reconcilePhases()

return result, reconcileError
}

// SetupWithManager sets up the controller with the Manager.
func (r *StorageClusterPeerReconciler) SetupWithManager(mgr ctrl.Manager) error {

enqueueStorageClusterPeerRequest := handler.EnqueueRequestsFromMapFunc(
func(context context.Context, obj client.Object) []reconcile.Request {
// Get the StorageClusterPeer objects
scpList := &ocsv1.StorageClusterPeerList{}
err := r.Client.List(context, scpList, &client.ListOptions{Namespace: obj.GetNamespace()})
if err != nil {
r.Log.Error(err, "Unable to list StorageClusterPeer objects")
return []reconcile.Request{}
}

// Return name and namespace of the StorageClusterPeers object
request := []reconcile.Request{}
for _, scp := range scpList.Items {
request = append(request, reconcile.Request{
NamespacedName: types.NamespacedName{
Namespace: scp.Namespace,
Name: scp.Name,
},
})
}

return request
},
)

return ctrl.NewControllerManagedBy(mgr).
For(&ocsv1.StorageClusterPeer{}).
Watches(&rookCephv1.CephBlockPool{}, enqueueStorageClusterPeerRequest).
Complete(r)
}

func (r *StorageClusterPeerReconciler) reconcilePhases() (reconcile.Result, error) {
r.Log.Info("Running StorageClusterPeer controller")

//marked for deletion
if !r.storageClusterPeer.GetDeletionTimestamp().IsZero() {

Check failure on line 137 in controllers/storageclusterpeer/storageclusterpeer_controller.go

View workflow job for this annotation

GitHub Actions / golangci-lint (1.21)

empty-block: this block is empty, you can remove it (revive)

Check warning on line 137 in controllers/storageclusterpeer/storageclusterpeer_controller.go

View workflow job for this annotation

GitHub Actions / golangci-lint (1.21)

empty-block: this block is empty, you can remove it (revive)

}
// not marked for deletion
ocsClient, err := r.newExternalClient()
if err != nil {
return reconcile.Result{}, err
}
defer ocsClient.Close()

if err := r.reconcileCephBlockPool(); err != nil {
return reconcile.Result{}, err
}

if err := r.reconcilePeerBlockPool(ocsClient); err != nil {
return reconcile.Result{}, err
}

return reconcile.Result{}, nil
}

func (r *StorageClusterPeerReconciler) newExternalClient() (*providerClient.OCSProviderClient, error) {
//TODO: tls config
ocsClient, err := providerClient.NewAuthenticatedOCSClient(
r.ctx, r.storageClusterPeer.Spec.OCSAPIServerURI, time.Second*10, &tls.Config{})
if err != nil {
return nil, fmt.Errorf("failed to create a new ocs client: %v", err)
}

return ocsClient, nil
}

func (r *StorageClusterPeerReconciler) reconcileCephBlockPool() error {

for _, cephBlockPool := range r.cephBlockPoolList.Items {

cephBlockPoolInstance := &rookCephv1.CephBlockPool{}
cephBlockPoolInstance.Name = cephBlockPool.Name
cephBlockPoolInstance.Namespace = cephBlockPool.Namespace

if err := r.get(cephBlockPoolInstance); err != nil {
r.Log.Error(err, "unable to fetch the cephBlockPool",
"CephBlockPool", klog.KRef(cephBlockPool.Namespace, cephBlockPool.Name))
return err
}

cephBlockPoolInstance.Spec.Mirroring.Enabled = true
cephBlockPoolInstance.Spec.Mirroring.Mode = "image"

if err := r.update(cephBlockPoolInstance); err != nil {
r.Log.Error(err, "unable to update the cephBlockPool with mirroring spec",
"CephBlockPool", klog.KRef(cephBlockPool.Namespace, cephBlockPool.Name))
return err
}

}
return nil
}

func (r *StorageClusterPeerReconciler) reconcilePeerBlockPool(ocsClient *providerClient.OCSProviderClient) error {

for _, cephBlockPool := range r.cephBlockPoolList.Items {
if _, ok := cephBlockPool.Status.Info["rbdMirrorBootstrapPeerSecretName"]; !ok {
r.Log.Info("waiting for bootstrap secret to be generated",
"CephBlockPool", klog.KRef(cephBlockPool.Namespace, cephBlockPool.Name))
return fmt.Errorf("waiting for bootstrap secret for %s blockpool in %s namespace to be generated",
cephBlockPool.Name, cephBlockPool.Namespace)
}

secret := &v1.Secret{}
secret.Name = cephBlockPool.Status.Info["rbdMirrorBootstrapPeerSecretName"]
secret.Namespace = cephBlockPool.Namespace

if err := r.get(secret); err != nil {
r.Log.Error(err, "unable to fetch the bootstrap secret for blockPool",
"CephBlockPool", klog.KRef(cephBlockPool.Namespace, cephBlockPool.Name))
return err
}

_, err := ocsClient.PeerBlockPool(r.ctx, cephBlockPool.Name, secret.Data["pool"], secret.Data["token"])
if err != nil {
return err
}

}
return nil
}

func (r *StorageClusterPeerReconciler) get(obj client.Object) error {
key := client.ObjectKeyFromObject(obj)
return r.Client.Get(r.ctx, key, obj)
}

func (r *StorageClusterPeerReconciler) list(obj client.ObjectList, listOptions ...client.ListOption) error {
return r.Client.List(r.ctx, obj, listOptions...)
}

func (r *StorageClusterPeerReconciler) update(obj client.Object) error {
return r.Client.Update(r.ctx, obj)
}
2 changes: 1 addition & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ import (
apiclient "sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log/zap"
metrics "sigs.k8s.io/controller-runtime/pkg/metrics/server"

// +kubebuilder:scaffold:imports
)

Expand Down Expand Up @@ -220,6 +219,7 @@ func main() {
if err = (&storageclusterpeer.StorageClusterPeerReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
Log: ctrl.Log.WithName("controllers").WithName("StorageClusterPeer"),
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "StorageClusterPeer")
os.Exit(1)
Expand Down

0 comments on commit ba6d227

Please sign in to comment.