diff --git a/cmd/controller/antplugins/filter/localline.go b/cmd/controller/antplugins/filter/localline.go
deleted file mode 100644
index 183c606..0000000
--- a/cmd/controller/antplugins/filter/localline.go
+++ /dev/null
@@ -1,58 +0,0 @@
-package filter
-
-import (
-	v1 "code.alipay.com/dbplatform/node-disk-controller/pkg/api/volume.antstor.alipay.com/v1"
-	"code.alipay.com/dbplatform/node-disk-controller/pkg/controller/manager/scheduler/filter"
-	"code.alipay.com/dbplatform/node-disk-controller/pkg/controller/manager/state"
-	"code.alipay.com/dbplatform/node-disk-controller/pkg/util"
-	"k8s.io/klog/v2"
-)
-
-const (
-	minLocalStoragePct float64 = 20
-	//
-	ReasonLocalStorageTooLow = "LocalStorageTooLow"
-)
-
-// MinLocalStorageFilterFunc ensures local storage cannot be less than 20% of total storage
-func MinLocalStorageFilterFunc(ctx *filter.FilterContext, n *state.Node, vol *v1.AntstorVolume) bool {
-	var err = ctx.Error
-	var isLocalVol = vol.Spec.HostNode.ID == n.Pool.Spec.NodeInfo.ID
-
-	if !isLocalVol {
-		allocRemotes := n.GetAllocatedRemoteBytes()
-		total := n.Pool.GetAvailableBytes()
-		localPct := float64(total-int64(allocRemotes)-int64(vol.Spec.SizeByte)) / float64(total) * 100
-		if localPct < minLocalStoragePct {
-			klog.Infof("[SchedFail] vol=%s Pool %s local-storage pct too low (%f)", vol.Name, n.Pool.Name, localPct)
-			err.AddReason(ReasonLocalStorageTooLow)
-			return false
-		}
-	}
-
-	return true
-}
-
-func GetAllocatableRemoveVolumeSize(node *state.Node, volSize int64) (result int64) {
-	result = volSize
-	if result == 0 {
-		return
-	}
-	if node != nil {
-		allocRemotes := node.GetAllocatedRemoteBytes()
-		total := node.Pool.GetAvailableBytes()
-		// maxResultSize := int64(float64(total)*(100-minLocalStoragePct)*100) - int64(allocRemotes)
-		maxResultSize := total - int64(float64(total)*minLocalStoragePct/100) - int64(allocRemotes)
-		// cannot allocate remote volume
-		if maxResultSize < 0 {
-			return 0
-		}
-
-		if int64(maxResultSize) < result {
-			result = int64(maxResultSize)
-		}
-	}
-
-	result = result / util.FourMiB * util.FourMiB
-	return
-}
diff --git a/cmd/controller/antplugins/localstorage.go b/cmd/controller/antplugins/localstorage.go
index f356138..1f7efd3 100644
--- a/cmd/controller/antplugins/localstorage.go
+++ b/cmd/controller/antplugins/localstorage.go
@@ -84,8 +84,8 @@ func (r *ReportLocalStoragePlugin) Reconcile(ctx *plugin.Context) (result plugin
 		r.PoolUtil = kubeutil.NewStoragePoolUtil(cli)
 	}
 
-	volume, isVolume = ctx.Object.(*v1.AntstorVolume)
-	pool, isPool = ctx.Object.(*v1.StoragePool)
+	volume, isVolume = ctx.ReqCtx.Object.(*v1.AntstorVolume)
+	pool, isPool = ctx.ReqCtx.Object.(*v1.StoragePool)
 
 	if !isVolume && !isPool {
 		err = fmt.Errorf("obj is not *v1.AntstorVolume or *v1.StoragePool")
diff --git a/cmd/controller/antplugins/patchpv.go b/cmd/controller/antplugins/patchpv.go
index d67796a..9d54707 100644
--- a/cmd/controller/antplugins/patchpv.go
+++ b/cmd/controller/antplugins/patchpv.go
@@ -32,7 +32,7 @@ func (p *PatchPVPlugin) Reconcile(ctx *plugin.Context) (result plugin.Result) {
 	var (
 		log    = ctx.Log
-		obj    = ctx.Object
+		obj    = ctx.ReqCtx.Object
 		volume *v1.AntstorVolume
 		ok     bool
 		err    error
diff --git a/cmd/controller/main.go b/cmd/controller/main.go
index 0d1bf5d..0368a7d 100644
--- a/cmd/controller/main.go
+++ b/cmd/controller/main.go
@@ -6,27 +6,23 @@ import (
 	antplugin "code.alipay.com/dbplatform/node-disk-controller/cmd/controller/antplugins"
 	antfilter "code.alipay.com/dbplatform/node-disk-controller/cmd/controller/antplugins/filter"
 	"code.alipay.com/dbplatform/node-disk-controller/pkg/controller/manager/controllers"
-	"code.alipay.com/dbplatform/node-disk-controller/pkg/controller/manager/scheduler"
 	"code.alipay.com/dbplatform/node-disk-controller/pkg/controller/manager/scheduler/filter"
 	"code.alipay.com/dbplatform/node-disk-controller/pkg/csi"
 )
 
 func main() {
 	// add plugins
-	controllers.RegisterPlugins([]controllers.PluginFactoryFunc{
+	controllers.RegisterPluginsInPoolReconciler([]controllers.PluginFactoryFunc{
 		antplugin.NewReportLocalStoragePlugin,
-	}, []controllers.PluginFactoryFunc{
+	})
+	controllers.RegisterPluginsInVolumeReconciler([]controllers.PluginFactoryFunc{
 		antplugin.NewReportLocalStoragePlugin,
 		antplugin.NewPatchPVPlugin,
 	})
 
 	// add filters
-	filter.RegisterFilter("MinLocalStorage", antfilter.MinLocalStorageFilterFunc)
 	filter.RegisterFilter("ObReplica", antfilter.ObReplicaFilterFunc)
 
-	// for volumegroup
-	scheduler.RegisterVolumeGroupPickSizeFn("MinLocalStorage", antfilter.GetAllocatableRemoveVolumeSize)
-
 	cmd := controllers.NewApplicationCmd()
 	// add CSI command
 	cmd.AddCommand(csi.NewCSICommand())
diff --git a/hack/deploy/aio-lvs/050-configmap.yaml b/hack/deploy/aio-lvs/050-configmap.yaml
index 0eb601e..9a8f15c 100644
--- a/hack/deploy/aio-lvs/050-configmap.yaml
+++ b/hack/deploy/aio-lvs/050-configmap.yaml
@@ -7,6 +7,7 @@ data:
   config.yaml: |
     scheduler:
       maxRemoteVolumeCount: 3
+      minLocalStoragePct: 20
       filters:
       - Basic
       - Affinity
diff --git a/hack/deploy/lvm/050-configmap.yaml b/hack/deploy/lvm/050-configmap.yaml
index 9f5d0ad..c70e611 100644
--- a/hack/deploy/lvm/050-configmap.yaml
+++ b/hack/deploy/lvm/050-configmap.yaml
@@ -7,6 +7,7 @@ data:
   config.yaml: |
     scheduler:
      maxRemoteVolumeCount: 3
+      minLocalStoragePct: 20
       filters:
       - Basic
       - Affinity
diff --git a/pkg/controller/manager/config/config.go b/pkg/controller/manager/config/config.go
index 33bbda6..825ef9e 100644
--- a/pkg/controller/manager/config/config.go
+++ b/pkg/controller/manager/config/config.go
@@ -28,6 +28,8 @@ type SchedulerConfig struct {
 	// NodeCacheSelector specify which nodes are cached to Node Informer.
 	// Empty selector means all nodes are allowd to be cached.
 	NodeCacheSelector map[string]string `json:"nodeCacheSelector" yaml:"nodeCacheSelector"`
+	// MinLocalStoragePct defines the minimum percentage of local storage to be reserved on one node.
+	MinLocalStoragePct int `json:"minLocalStoragePct" yaml:"minLocalStoragePct"`
 }
 
 type NoScheduleConfig struct {
diff --git a/pkg/controller/manager/controllers/manager.go b/pkg/controller/manager/controllers/manager.go
index d7eb6e1..ace6d24 100644
--- a/pkg/controller/manager/controllers/manager.go
+++ b/pkg/controller/manager/controllers/manager.go
@@ -85,44 +85,87 @@ func NewAndInitControllerManager(req NewManagerRequest) manager.Manager {
 		scheduler = sched.NewScheduler(req.ControllerConfig)
 	)
 
-	// setup StoragePoolReconciler
-	poolReconciler := &reconciler.StoragePoolReconciler{
-		Plugable: plugin.NewPluginList(),
+	poolReconciler := reconciler.PlugableReconciler{
 		Client:   mgr.GetClient(),
-		Log:      rt.Log.WithName("controllers").WithName("StoragePool"),
-		State:    stateObj,
-		PoolUtil: poolUtil,
-		KubeCli:  kubeClient,
-		Lock:     misc.NewResourceLocks(),
+		Plugable: plugin.NewPluginList(),
+
+		Log:     rt.Log.WithName("Controller:StoragePool"),
+		KubeCli: kubeClient,
+		State:   stateObj,
+
+		Concurrency: 4,
+		MainHandler: &reconciler.StoragePoolReconcileHandler{
+			Client:   mgr.GetClient(),
+			State:    stateObj,
+			PoolUtil: poolUtil,
+			KubeCli:  kubeClient,
+		},
+		WatchType: &v1.StoragePool{},
+	}
+	if err = poolReconciler.SetupWithManager(mgr); err != nil {
+		klog.Error(err, "unable to create controller StoragePoolReconciler")
+		os.Exit(1)
 	}
 
 	// setup AntstorVolumeReconciler
-	volReconciler := &reconciler.AntstorVolumeReconciler{
-		Plugable:    plugin.NewPluginList(),
-		Client:      mgr.GetClient(),
-		Log:         rt.Log.WithName("controllers").WithName("AntstorVolume"),
-		State:       stateObj,
-		AntstoreCli: antstorCli,
-		Scheduler:   scheduler,
-		// EventRecorder for AntstorVolume
-		EventRecorder: mgr.GetEventRecorderFor("AntstorVolume"),
+	volReconciler := reconciler.PlugableReconciler{
+		Client:   mgr.GetClient(),
+		Plugable: plugin.NewPluginList(),
+
+		Log:     rt.Log.WithName("Controller:AntstorVolume"),
+		KubeCli: kubeClient,
+		State:   stateObj,
+
+		Concurrency: 1,
+		MainHandler: &reconciler.AntstorVolumeReconcileHandler{
+			Client:      mgr.GetClient(),
+			State:       stateObj,
+			AntstoreCli: antstorCli,
+			Scheduler:   scheduler,
+		},
+		WatchType: &v1.AntstorVolume{},
 	}
+	if err = volReconciler.SetupWithManager(mgr); err != nil {
+		klog.Error(err, "unable to create controller VolumeReconciler")
+		os.Exit(1)
+	}
+
+	// setup AntstorVolumeGroupReconciler
+	volGroupReconciler := reconciler.PlugableReconciler{
+		Client:   mgr.GetClient(),
+		Plugable: plugin.NewPluginList(),
 
-	volGroupReconciler := &reconciler.AntstorVolumeGroupReconciler{
-		Client:    mgr.GetClient(),
-		Plugable:  plugin.NewPluginList(),
-		Log:       rt.Log.WithName("controllers").WithName("AntstorVolumeGroup"),
-		Scheduler: scheduler,
-		State:     stateObj,
+		Log:     rt.Log.WithName("Controller:AntstorVolumeGroup"),
+		KubeCli: kubeClient,
+		State:   stateObj,
+
+		Concurrency: 1,
+		MainHandler: &reconciler.AntstorVolumeGroupReconcileHandler{
+			Client:    mgr.GetClient(),
+			Scheduler: scheduler,
+			State:     stateObj,
+		},
+		WatchType: &v1.AntstorVolumeGroup{},
+	}
+	if err = volGroupReconciler.SetupWithManager(mgr); err != nil {
+		klog.Error(err, "unable to create controller VolumeGroupReconciler")
+		os.Exit(1)
 	}
 
 	// setup AntstorDataControlReconciler
-	dataControlReconciler := &reconciler.AntstorDataControlReconciler{
+	dataControlReconciler := reconciler.PlugableReconciler{
 		Client:   mgr.GetClient(),
 		Plugable: plugin.NewPluginList(),
-		Log:      rt.Log.WithName("controllers").WithName("DataControl"),
-		State:    stateObj,
+		Log:     rt.Log.WithName("Controller:AntstorDataControl"),
+		KubeCli: kubeClient,
+		State:   stateObj,
+
+		Concurrency: 1,
+		MainHandler: &reconciler.AntstorDataControlReconcileHandler{
+			Client: mgr.GetClient(),
+		},
+		WatchType: &v1.AntstorDataControl{},
 	}
 	if err = dataControlReconciler.SetupWithManager(mgr); err != nil {
 		klog.Error(err, "unable to create controller AntstorDataControlReconciler")
@@ -169,20 +212,6 @@ func NewAndInitControllerManager(req NewManagerRequest) manager.Manager {
 		dataControlReconciler.RegisterPlugin(p)
 	}
 
-	// setup StoragePool/Volume Reconciler
-	if err = poolReconciler.SetupWithManager(mgr); err != nil {
-		klog.Error(err, "unable to create controller StoragePoolReconciler")
-		os.Exit(1)
-	}
-	if err = volReconciler.SetupWithManager(mgr); err != nil {
-		klog.Error(err, "unable to create controller VolumeReconciler")
-		os.Exit(1)
-	}
-	if err = volGroupReconciler.SetupWithManager(mgr); err != nil {
-		klog.Error(err, "unable to create controller VolumeGroupReconciler")
-		os.Exit(1)
-	}
-
 	// setup SnapshotReconsiler
 	snapshotReconciler := &reconciler.SnapshotReconciler{
 		// kube client
diff --git a/pkg/controller/manager/controllers/plugin.go b/pkg/controller/manager/controllers/plugin.go
index a2ab316..aae9815 100644
--- a/pkg/controller/manager/controllers/plugin.go
+++ b/pkg/controller/manager/controllers/plugin.go
@@ -26,8 +26,11 @@ func init() {
 	DataControlReconcilerPluginCreaters = append(DataControlReconcilerPluginCreaters, NewMetaSyncerPlugin)
 }
 
-func RegisterPlugins(poolPlugins, volumePlugins []PluginFactoryFunc) {
+func RegisterPluginsInPoolReconciler(poolPlugins []PluginFactoryFunc) {
 	PoolReconcilerPluginCreaters = append(PoolReconcilerPluginCreaters, poolPlugins...)
+}
+
+func RegisterPluginsInVolumeReconciler(volumePlugins []PluginFactoryFunc) {
 	VolumeReconcilerPluginCreaters = append(VolumeReconcilerPluginCreaters, volumePlugins...)
 }
diff --git a/pkg/controller/manager/reconciler/data_control_reconciler.go b/pkg/controller/manager/reconciler/data_control_reconciler.go
index 303b721..ce4454a 100644
--- a/pkg/controller/manager/reconciler/data_control_reconciler.go
+++ b/pkg/controller/manager/reconciler/data_control_reconciler.go
@@ -1,86 +1,55 @@
 package reconciler
 
 import (
-	"context"
 	"fmt"
 	"time"
 
-	"github.com/go-logr/logr"
-	"k8s.io/apimachinery/pkg/api/errors"
+	"k8s.io/apimachinery/pkg/runtime"
 	ctrl "sigs.k8s.io/controller-runtime"
 	"sigs.k8s.io/controller-runtime/pkg/client"
-	"sigs.k8s.io/controller-runtime/pkg/controller"
-	"sigs.k8s.io/controller-runtime/pkg/reconcile"
 
 	v1 "code.alipay.com/dbplatform/node-disk-controller/pkg/api/volume.antstor.alipay.com/v1"
 	"code.alipay.com/dbplatform/node-disk-controller/pkg/controller/manager/reconciler/plugin"
-	"code.alipay.com/dbplatform/node-disk-controller/pkg/controller/manager/state"
 	"code.alipay.com/dbplatform/node-disk-controller/pkg/util/misc"
 )
 
-type AntstorDataControlReconciler struct {
+type AntstorDataControlReconcileHandler struct {
 	client.Client
-	plugin.Plugable
+}
+
+func (r *AntstorDataControlReconcileHandler) ResourceName() string {
+	return "AntstorDataControl"
+}
 
-	Log   logr.Logger
-	State state.StateIface
+func (r *AntstorDataControlReconcileHandler) GetObject(req plugin.RequestContent) (obj runtime.Object, err error) {
+	var dataControl v1.AntstorDataControl
+	err = r.Client.Get(req.Ctx, req.Request.NamespacedName, &dataControl)
+	return &dataControl, err
 }
 
-// SetupWithManager sets up the controller with the Manager.
-func (r *AntstorDataControlReconciler) SetupWithManager(mgr ctrl.Manager) error {
-	return ctrl.NewControllerManagedBy(mgr).
-		WithOptions(controller.Options{
-			MaxConcurrentReconciles: 1,
-		}).
-		For(&v1.AntstorDataControl{}).
-		Complete(r)
+func (r *AntstorDataControlReconcileHandler) HandleDeletion(ctx *plugin.Context) (result plugin.Result) {
+	return
 }
 
-func (r *AntstorDataControlReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
+func (r *AntstorDataControlReconcileHandler) HandleReconcile(pCtx *plugin.Context) plugin.Result {
 	var (
-		resourceID  = req.NamespacedName.String()
-		log         = r.Log.WithValues("DataControl", resourceID)
-		dataControl v1.AntstorDataControl
-		err         error
-		result      plugin.Result
-		pCtx        = &plugin.Context{
-			Client:  r.Client,
-			Ctx:     ctx,
-			Object:  &dataControl,
-			Request: req,
-			Log:     log,
-			State:   r.State,
-		}
+		dataControl, ok = pCtx.ReqCtx.Object.(*v1.AntstorDataControl)
+		result          plugin.Result
+		ctx             = pCtx.ReqCtx.Ctx
+		err             error
+		log             = pCtx.Log
 	)
 
-	if err = r.Get(ctx, req.NamespacedName, &dataControl); err != nil {
-		// When user deleted a volume, a request will be recieved.
-		// However the volume does not exists. Therefore the code goes to here
-		log.Error(err, "unable to fetch DataControl")
-		// we'll ignore not-found errors, since they can't be fixed by an immediate
-		// requeue (we'll need to wait for a new notification), and we can get them
-		// on deleted requests.
-		if errors.IsNotFound(err) {
-			// remove SP from State
-			log.Info("cannot find DataControl in apiserver")
+	if !ok {
+		return plugin.Result{
+			Error: fmt.Errorf("object is not *v1.AntstorDataControl, %#v", pCtx.ReqCtx.Object),
 		}
-
-		return ctrl.Result{}, client.IgnoreNotFound(err)
-	}
-
-	// not handle delete request
-	if dataControl.DeletionTimestamp != nil {
-		// run plugins
-		for _, plugin := range r.Plugable.Plugins() {
-			plugin.HandleDeletion(pCtx)
-		}
-		return r.handleDeletion(pCtx, &dataControl)
 	}
 
 	// validate and mutate DataControl
-	result = r.validateAndMutate(pCtx, &dataControl)
+	result = r.validateAndMutate(pCtx, dataControl)
 	if result.NeedBreak() {
-		return result.Result, result.Error
+		return result
 	}
 
 	// sync volume group status
@@ -99,7 +68,9 @@ func (r *AntstorDataControlReconciler) Reconcile(ctx context.Context, req ctrl.R
 		if vg.Status.Status != v1.VolumeStatusReady {
 			log.Info("VolumeGroup is not ready yet, retry in 20 sec", "name", key, "status", vg.Status.Status)
-			return ctrl.Result{RequeueAfter: 20 * time.Second}, nil
+			return plugin.Result{
+				Result: ctrl.Result{RequeueAfter: 20 * time.Second},
+			}
 		}
 	}
 
@@ -127,7 +98,9 @@ func (r *AntstorDataControlReconciler) Reconcile(ctx context.Context, req ctrl.R
 		}, &volGroup)
 		if err != nil {
 			log.Error(err, "fetching VolGroup failed")
-			return ctrl.Result{RequeueAfter: 10 * time.Second}, nil
+			return plugin.Result{
+				Result: ctrl.Result{RequeueAfter: 10 * time.Second},
+			}
 		} else {
 			for _, item := range volGroup.Spec.Volumes {
 				var vol v1.AntstorVolume
@@ -137,23 +110,29 @@ func (r *AntstorDataControlReconciler) Reconcile(ctx context.Context, req ctrl.R
 				}, &vol)
 				if err != nil {
 					log.Error(err, "fetching VolGroup failed")
-					return ctrl.Result{RequeueAfter: 10 * time.Second}, nil
+					return plugin.Result{
+						Result: ctrl.Result{RequeueAfter: 10 * time.Second},
+					}
 				}
 				hostNode := dataControl.Spec.HostNode
 				vol.Spec.HostNode = &hostNode
 				err = r.Client.Update(ctx, &vol)
 				if err != nil {
 					log.Error(err, "updating Volume failed")
-					return ctrl.Result{RequeueAfter: 10 * time.Second}, nil
+					return plugin.Result{
+						Result: ctrl.Result{RequeueAfter: 10 * time.Second},
+					}
 				}
 			}
 		}
 	}
 
-	err = r.Client.Update(ctx, &dataControl)
+	err = r.Client.Update(ctx, dataControl)
 	if err != nil {
 		log.Error(err, "updating DataControl failed")
-		return ctrl.Result{RequeueAfter: 10 * time.Second}, nil
+		return plugin.Result{
+			Result: ctrl.Result{RequeueAfter: 10 * time.Second},
+		}
 	}
 }
 
@@ -167,7 +146,9 @@ func (r *AntstorDataControlReconciler) Reconcile(ctx context.Context, req ctrl.R
 	}, &volGroup)
 	if err != nil {
 		log.Error(err, "fetching VolGroup failed")
-		return ctrl.Result{RequeueAfter: 10 * time.Second}, nil
+		return plugin.Result{
+			Result: ctrl.Result{RequeueAfter: 10 * time.Second},
+		}
 	} else {
 		for _, item := range volGroup.Spec.Volumes {
 			var vol v1.AntstorVolume
@@ -177,14 +158,18 @@ func (r *AntstorDataControlReconciler) Reconcile(ctx context.Context, req ctrl.R
 			}, &vol)
 			if err != nil {
 				log.Error(err, "fetching VolGroup failed")
-				return ctrl.Result{RequeueAfter: 10 * time.Second}, nil
+				return plugin.Result{
+					Result: ctrl.Result{RequeueAfter: 10 * time.Second},
+				}
 			}
 			if vol.Status.CSINodePubParams == nil || vol.Status.CSINodePubParams.TargetPath == "" {
 				vol.Status.CSINodePubParams = dataControl.Status.CSINodePubParams
 				err = r.Client.Status().Update(ctx, &vol)
 				if err != nil {
 					log.Error(err, "updating Volume Status failed")
-					return ctrl.Result{RequeueAfter: 10 * time.Second}, nil
+					return plugin.Result{
+						Result: ctrl.Result{RequeueAfter: 10 * time.Second},
+					}
 				}
 			}
 		}
 	}
 }
 
@@ -192,24 +177,10 @@ func (r *AntstorDataControlReconciler) Reconcile(ctx context.Context, req ctrl.R
-	// run plugins
-	for _, plugin := range r.Plugable.Plugins() {
-		result = plugin.Reconcile(pCtx)
-		if result.NeedBreak() {
-			return result.Result, result.Error
-		}
-	}
-
-	return ctrl.Result{}, nil
-}
-
-func (r *AntstorDataControlReconciler) handleDeletion(ctx *plugin.Context, dataControl *v1.AntstorDataControl) (result reconcile.Result, err error) {
-	// resouce cleaning is done in agent
-
-	return ctrl.Result{}, nil
+	return result
 }
 
-func (r *AntstorDataControlReconciler) validateAndMutate(ctx *plugin.Context, dataControl *v1.AntstorDataControl) (result plugin.Result) {
+func (r *AntstorDataControlReconcileHandler) validateAndMutate(ctx *plugin.Context, dataControl *v1.AntstorDataControl) (result plugin.Result) {
 	if !misc.InSliceString(string(dataControl.Spec.EngineType), []string{string(v1.PoolModeKernelLVM), string(v1.PoolModeSpdkLVStore)}) {
 		return plugin.Result{Error: fmt.Errorf("invalid type %s", dataControl.Spec.EngineType)}
diff --git a/pkg/controller/manager/reconciler/plugable_reconciler.go b/pkg/controller/manager/reconciler/plugable_reconciler.go
new file mode 100644
index 0000000..dd07b6f
--- /dev/null
+++ b/pkg/controller/manager/reconciler/plugable_reconciler.go
@@ -0,0 +1,159 @@
+package reconciler
+
+import (
+	"context"
+	"fmt"
+	"time"
+
+	"github.com/go-logr/logr"
+	"k8s.io/apimachinery/pkg/api/errors"
+	"k8s.io/apimachinery/pkg/api/meta"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/client-go/kubernetes"
+	ctrl "sigs.k8s.io/controller-runtime"
+	"sigs.k8s.io/controller-runtime/pkg/client"
+	"sigs.k8s.io/controller-runtime/pkg/controller"
+	"sigs.k8s.io/controller-runtime/pkg/reconcile"
+
+	"code.alipay.com/dbplatform/node-disk-controller/pkg/controller/manager/reconciler/plugin"
+	"code.alipay.com/dbplatform/node-disk-controller/pkg/controller/manager/state"
+	"code.alipay.com/dbplatform/node-disk-controller/pkg/util/misc"
+)
+
+type PlugableReconcilerIface interface {
+	plugin.Plugable
+	reconcile.Reconciler
+}
+
+type ReconcileHandler interface {
+	ResourceName() string
+	GetObject(plugin.RequestContent) (runtime.Object, error)
+	HandleReconcile(*plugin.Context) plugin.Result
+	HandleDeletion(*plugin.Context) plugin.Result
+}
+
+type SetupWithManagerProvider interface {
+	GetSetupWithManagerFn() SetupWithManagerFn
+}
+
+type SetupWithManagerFn func(r reconcile.Reconciler, mgr ctrl.Manager) error
+
+type PlugableReconciler struct {
+	client.Client
+	plugin.Plugable
+
+	KubeCli kubernetes.Interface
+	State   state.StateIface
+	Log     logr.Logger
+
+	Concurrency int
+	WatchType   client.Object
+	MainHandler ReconcileHandler
+
+	Lock misc.ResourceLockIface
+}
+
+// SetupWithManager sets up the controller with the Manager.
+func (r *PlugableReconciler) SetupWithManager(mgr ctrl.Manager) error {
+	if setupProvider, ok := r.MainHandler.(SetupWithManagerProvider); ok {
+		fn := setupProvider.GetSetupWithManagerFn()
+		return fn(r, mgr)
+	}
+
+	if r.Concurrency <= 0 {
+		r.Concurrency = 1
+	}
+	if r.Concurrency > 1 {
+		r.Lock = misc.NewResourceLocks()
+	}
+
+	if r.WatchType == nil {
+		return fmt.Errorf("WatchType is nil")
+	}
+
+	return ctrl.NewControllerManagedBy(mgr).
+		WithOptions(controller.Options{
+			MaxConcurrentReconciles: r.Concurrency,
+		}).
+		For(r.WatchType).
+		Complete(r)
+}
+
+func (r *PlugableReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
+	var (
+		resourceID = req.NamespacedName.String()
+		log        = r.Log.WithValues(r.MainHandler.ResourceName(), resourceID)
+		err        error
+		result     plugin.Result
+		obj        runtime.Object
+		metaObj    metav1.Object
+	)
+
+	if r.Lock != nil {
+		// try to get lock by id (ns/name)
+		if !r.Lock.TryAcquire(resourceID) {
+			log.Info("cannot get lock of the resource, try reconciling in 10 sec")
+			return ctrl.Result{RequeueAfter: 10 * time.Second}, nil
+		}
+		defer r.Lock.Release(resourceID)
+	}
+
+	if obj, err = r.MainHandler.GetObject(plugin.RequestContent{
+		Ctx:     ctx,
+		Request: req,
+	}); err != nil {
+		// When a user deletes an object, a request will still be received,
+		// but the object no longer exists. Therefore the code goes to here.
+		log.Error(err, "unable to fetch Object")
+		// we'll ignore not-found errors, since they can't be fixed by an immediate
+		// requeue (we'll need to wait for a new notification), and we can get them
+		// on deleted requests.
+		if errors.IsNotFound(err) {
+			log.Info("cannot find Object in apiserver")
+		}
+
+		return ctrl.Result{}, client.IgnoreNotFound(err)
+	}
+
+	// get metadata
+	if metaObj, err = meta.Accessor(obj); err != nil {
+		log.Error(err, "cannot access object meta")
+		return ctrl.Result{}, err
+	}
+
+	// create Context
+	var pCtx = &plugin.Context{
+		KubeCli: r.KubeCli,
+		Client:  r.Client,
+		State:   r.State,
+		Log:     log,
+		ReqCtx: plugin.RequestContent{
+			Ctx:     ctx,
+			Request: req,
+			Object:  obj,
+		},
+	}
+
+	// not handle delete request
+	if metaObj.GetDeletionTimestamp() != nil {
+		for _, plugin := range r.Plugable.Plugins() {
+			plugin.HandleDeletion(pCtx)
+		}
+		result = r.MainHandler.HandleDeletion(pCtx)
+		return result.Result, result.Error
+	}
+
+	// run plugins
+	for _, plugin := range r.Plugable.Plugins() {
+		result = plugin.Reconcile(pCtx)
+		if result.NeedBreak() {
+			return result.Result, result.Error
+		}
+	}
+
+	// main reconciling
+	result = r.MainHandler.HandleReconcile(pCtx)
+	return result.Result, result.Error
+}
diff --git a/pkg/controller/manager/reconciler/plugin/lockpool.go b/pkg/controller/manager/reconciler/plugin/lockpool.go
index 82dc3ae..bd5a845 100644
--- a/pkg/controller/manager/reconciler/plugin/lockpool.go
+++ b/pkg/controller/manager/reconciler/plugin/lockpool.go
@@ -27,7 +27,7 @@ func (p *LockPoolPlugin) Name() string {
 func (p *LockPoolPlugin) Reconcile(ctx *Context) (result Result) {
 	var (
 		log = ctx.Log
-		obj = ctx.Object
+		obj = ctx.ReqCtx.Object
 
 		ok  bool
 		err error
@@ -41,7 +41,7 @@ func (p *LockPoolPlugin) Reconcile(ctx *Context) (result Result) {
 		return Result{}
 	}
 
-	err = p.Client.Get(ctx.Ctx, client.ObjectKey{
+	err = p.Client.Get(ctx.ReqCtx.Ctx, client.ObjectKey{
 		Name: sp.Name,
 	}, &node)
 	if err != nil {
@@ -104,7 +104,7 @@ func (p *LockPoolPlugin) Reconcile(ctx *Context) (result Result) {
 			sp.Status.Conditions[condIdx].Status = v1.StatusError
 			sp.Status.Conditions[condIdx].Message = v1.KubeNodeMsgNcOffline
 		}
-		err = p.Client.Status().Update(ctx.Ctx, sp)
+		err = p.Client.Status().Update(ctx.ReqCtx.Ctx, sp)
 		if err != nil {
 			return Result{Error: err}
 		}
@@ -115,7 +115,7 @@ func (p *LockPoolPlugin) Reconcile(ctx *Context) (result Result) {
 		log.Info("node is not in NC_OFFLINE, unlock storagepool", "name", sp.Name)
 		sp.Status.Conditions[condIdx].Status = v1.StatusOK
 		sp.Status.Conditions[condIdx].Message = ""
-		err = p.Client.Status().Update(ctx.Ctx, sp)
+		err = p.Client.Status().Update(ctx.ReqCtx.Ctx, sp)
 		if err != nil {
 			return Result{Error: err}
 		}
diff --git a/pkg/controller/manager/reconciler/plugin/metasync.go b/pkg/controller/manager/reconciler/plugin/metasync.go
index ebeb5ba..7fad491 100644
--- a/pkg/controller/manager/reconciler/plugin/metasync.go
+++ b/pkg/controller/manager/reconciler/plugin/metasync.go
@@ -28,8 +28,8 @@ func (r *MetaSyncPlugin) Reconcile(ctx *Context) (result Result) {
 	}
 
 	var (
-		obj = ctx.Object
-		req = ctx.Request
+		obj = ctx.ReqCtx.Object
+		req = ctx.ReqCtx.Request
 	)
 	r.log = ctx.Log
diff --git a/pkg/controller/manager/reconciler/plugin/plugable.go b/pkg/controller/manager/reconciler/plugin/plugable.go
index abdae61..3ba0d9b 100644
--- a/pkg/controller/manager/reconciler/plugin/plugable.go
+++ b/pkg/controller/manager/reconciler/plugin/plugable.go
@@ -12,14 +12,19 @@ import (
 )
 
 type Context struct {
-	// client
+	// per reconciler
 	KubeCli kubernetes.Interface
 	Client  client.Client
+	State   state.StateIface
+	Log     logr.Logger
 
+	// per request
+	ReqCtx RequestContent
+}
+
+type RequestContent struct {
 	Ctx     context.Context
 	Request ctrl.Request
 	Object  runtime.Object
-	State   state.StateIface
-	Log     logr.Logger
 }
 
 type Result struct {
diff --git a/pkg/controller/manager/reconciler/pool_reconciler.go b/pkg/controller/manager/reconciler/pool_reconciler.go
index c18180b..061b090 100644
--- a/pkg/controller/manager/reconciler/pool_reconciler.go
+++ b/pkg/controller/manager/reconciler/pool_reconciler.go
@@ -12,10 +12,10 @@ import (
 	"k8s.io/apimachinery/pkg/api/errors"
 	"k8s.io/apimachinery/pkg/api/resource"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/client-go/kubernetes"
 	ctrl "sigs.k8s.io/controller-runtime"
 	"sigs.k8s.io/controller-runtime/pkg/client"
-	"sigs.k8s.io/controller-runtime/pkg/controller"
 	"sigs.k8s.io/controller-runtime/pkg/reconcile"
 
 	v1 "code.alipay.com/dbplatform/node-disk-controller/pkg/api/volume.antstor.alipay.com/v1"
@@ -36,125 +36,77 @@ var (
 	nodeOfflineExpireDuration = 24 * time.Hour
 )
 
-type StoragePoolReconciler struct {
+type StoragePoolReconcileHandler struct {
 	client.Client
-	plugin.Plugable
-	//
-	Log logr.Logger
-	//
-	// NodeGetter kubeutil.NodeInfoGetterIface
+	State    state.StateIface
 	PoolUtil kubeutil.StoragePoolUpdater
 	KubeCli  kubernetes.Interface
-	//
-	Lock misc.ResourceLockIface
 }
 
-// SetupWithManager sets up the controller with the Manager.
-func (r *StoragePoolReconciler) SetupWithManager(mgr ctrl.Manager) error {
-	// setup indexer
-	// mgr.GetFieldIndexer().IndexField(context.Background(), obj client.Object, field string, extractValue client.IndexerFunc)
+func (r *StoragePoolReconcileHandler) ResourceName() string {
+	return "StoragePool"
+}
 
-	var concurrency = 1
-	if r.Lock != nil {
-		concurrency = 4
-	}
-	return ctrl.NewControllerManagedBy(mgr).
-		WithOptions(controller.Options{
-			MaxConcurrentReconciles: concurrency,
-		}).
-		For(&v1.StoragePool{}).
-		Complete(r)
+func (r *StoragePoolReconcileHandler) GetObject(req plugin.RequestContent) (obj runtime.Object, err error) {
+	var pool v1.StoragePool
+	err = r.Client.Get(req.Ctx, req.Request.NamespacedName, &pool)
+	return &pool, err
+}
+
+func (r *StoragePoolReconcileHandler) HandleDeletion(pCtx *plugin.Context) (result plugin.Result) {
+	result.Result, result.Error = r.handleDeletion(pCtx)
+	return
 }
 
-func (r *StoragePoolReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
+func (r *StoragePoolReconcileHandler) HandleReconcile(pCtx *plugin.Context) (result plugin.Result) {
 	var (
-		resourceID = req.NamespacedName.String()
-		log        = r.Log.WithValues("StoragePool", resourceID)
-		sp         v1.StoragePool
-		err        error
-		result     plugin.Result
-		pCtx       = &plugin.Context{
-			Client:  r.Client,
-			KubeCli: r.KubeCli,
-			Ctx:     ctx,
-			Object:  &sp,
-			Request: req,
-			Log:     log,
-			State:   r.State,
-		}
+		log = pCtx.Log
+		ctx = pCtx.ReqCtx.Ctx
+		sp  *v1.StoragePool
+		ok  bool
 	)
 
-	// try get lock by id (ns/name)
-	if !r.Lock.TryAcquire(resourceID) {
-		log.Info("cannot get lock of the storagepool, skip reconciling.")
-		return ctrl.Result{}, nil
-	}
-	defer r.Lock.Release(resourceID)
-
-	if err := r.Get(ctx, req.NamespacedName, &sp); err != nil {
-		// When user deleted a volume, a request will be recieved.
-		// However the volume does not exists. Therefore the code goes to here
-		log.Error(err, "unable to fetch StoragePool")
-		// we'll ignore not-found errors, since they can't be fixed by an immediate
-		// requeue (we'll need to wait for a new notification), and we can get them
-		// on deleted requests.
-		if errors.IsNotFound(err) {
-			// remove SP from State
-			log.Info("cannot find StoragePool in apiserver, so remove it from State", "error", r.State.RemoveStoragePool(req.Name))
-		}
-
-		return ctrl.Result{}, client.IgnoreNotFound(err)
-	}
-
-	// not handle delete request
-	if sp.DeletionTimestamp != nil {
-		// run plugins
-		for _, plugin := range r.Plugable.Plugins() {
-			plugin.HandleDeletion(pCtx)
-		}
-		return r.handleDeletion(ctx, sp, log)
+	if sp, ok = pCtx.ReqCtx.Object.(*v1.StoragePool); !ok {
+		result.Error = fmt.Errorf("object is not *v1.StoragePool, %#v", pCtx.ReqCtx.Object)
+		return
 	}
 
 	// TODO: move to webhook
 	// validate and mutate StoragePool
 	result = r.validateAndMutate(sp, log)
 	if result.NeedBreak() {
-		return result.Result, result.Error
+		return
 	}
 
 	// save StoragePool to State
 	result = r.saveToState(sp, log)
 	if result.NeedBreak() {
-		return result.Result, result.Error
+		return
 	}
 
 	// check
-	err = r.checkHeartbeat(ctx, sp, log)
+	err := r.checkHeartbeat(ctx, sp, log)
 	if err != nil {
 		log.Error(err, "checking heartbeat with error")
+		result.Error = err
+		return
 	}
 
 	// check if node is deleted
 	result = r.processNodeOffline(sp, log)
 	if result.NeedBreak() {
-		return result.Result, result.Error
-	}
-
-	// run plugins
-	for _, plugin := range r.Plugable.Plugins() {
-		result = plugin.Reconcile(pCtx)
-		if result.NeedBreak() {
-			return result.Result, result.Error
-		}
+		return
 	}
 
 	// requeue StoragePool every 5 minutes, to check heartbeat and check Node status
-	return ctrl.Result{RequeueAfter: 5 * time.Minute}, nil
+	return plugin.Result{
+		Result: ctrl.Result{RequeueAfter: 5 * time.Minute},
	}
 }
 
 // checkHeartbeat check Lease and update StoragePool's status
-func (r *StoragePoolReconciler) checkHeartbeat(ctx context.Context, sp v1.StoragePool, log logr.Logger) (err error) {
+func (r *StoragePoolReconcileHandler) checkHeartbeat(ctx context.Context, sp *v1.StoragePool, log logr.Logger) (err error) {
 	var (
 		leaseCli = r.KubeCli.CoordinationV1().Leases(LeaseNamespace)
 		lease    *coorv1.Lease
@@ -219,8 +171,18 @@ func (r *StoragePoolReconciler) checkHeartbeat(ctx context.Context, sp v1.Storag
 	return
 }
 
-func (r *StoragePoolReconciler) handleDeletion(ctx context.Context, sp v1.StoragePool, log logr.Logger) (result reconcile.Result, err error) {
-	var node *state.Node
+func (r *StoragePoolReconcileHandler) handleDeletion(pCtx *plugin.Context) (result reconcile.Result, err error) {
+	var (
+		log  = pCtx.Log
+		ctx  = pCtx.ReqCtx.Ctx
+		sp   *v1.StoragePool
+		ok   bool
+		node *state.Node
+	)
+	if sp, ok = pCtx.ReqCtx.Object.(*v1.StoragePool); !ok {
+		err = fmt.Errorf("object is not *v1.StoragePool, %#v", pCtx.ReqCtx.Object)
+		return
+	}
 
 	var name = sp.Name
 	node, err = r.State.GetNodeByNodeID(name)
@@ -235,7 +197,7 @@ func (r *StoragePoolReconciler) handleDeletion(ctx context.Context, sp v1.Storag
 	}
 	if !strings.HasPrefix(sp.Status.Message, "Volumes left on node") {
 		sp.Status.Message = "Volumes left on node: " + strings.Join(volNames, ", ")
-		err = r.Client.Status().Update(ctx, &sp)
+		err = r.Client.Status().Update(ctx, sp)
 		if err != nil {
 			log.Error(err, "updating StoragePool status failed")
 		}
@@ -253,7 +215,7 @@ func (r *StoragePoolReconciler) handleDeletion(ctx context.Context, sp v1.Storag
 		log.Info("remove finalizers of StoragePool")
 		// remove Finalizers
 		sp.Finalizers = []string{}
-		err = r.Client.Update(ctx, &sp)
+		err = r.Client.Update(ctx, sp)
 		if err != nil {
 			log.Error(err, "removing finalizer failed")
 		}
@@ -262,17 +224,17 @@ func (r *StoragePoolReconciler) handleDeletion(ctx context.Context, sp v1.Storag
 	return ctrl.Result{}, nil
 }
 
-func (r *StoragePoolReconciler) saveToState(sp v1.StoragePool, log logr.Logger) (result plugin.Result) {
+func (r *StoragePoolReconcileHandler) saveToState(sp *v1.StoragePool, log logr.Logger) (result plugin.Result) {
 	var patch = client.MergeFrom(sp.DeepCopy())
 	var err error
 
-	r.State.SetStoragePool(&sp)
+	r.State.SetStoragePool(sp)
 
 	if !misc.InSliceString(v1.InStateFinalizer, sp.Finalizers) {
 		sp.Finalizers = append(sp.Finalizers, v1.InStateFinalizer)
 		// do update in APIServer
 		log.Info("inject InStateFinalizer to pool")
-		err = r.Patch(context.Background(), &sp, patch)
+		err = r.Patch(context.Background(), sp, patch)
 		if err != nil {
 			log.Error(err, "Update StoragePool failed")
 		}
@@ -285,7 +247,7 @@ func (r *StoragePoolReconciler) saveToState(sp v1.StoragePool, log logr.Logger)
 	return plugin.Result{}
 }
 
-func (r *StoragePoolReconciler) validateAndMutate(sp v1.StoragePool, log logr.Logger) (result plugin.Result) {
+func (r *StoragePoolReconcileHandler) validateAndMutate(sp *v1.StoragePool, log logr.Logger) (result plugin.Result) {
 	var err error
 	var patch = client.MergeFrom(sp.DeepCopy())
 
@@ -303,7 +265,7 @@ func (r *StoragePoolReconciler) validateAndMutate(sp v1.StoragePool, log logr.Lo
 	if len(sp.Labels) == 0 {
 		sp.Labels = make(map[string]string)
 		sp.Labels[v1.PoolLabelsNodeSnKey] = sp.Name
-		err = r.Patch(context.Background(), &sp, patch)
+		err = r.Patch(context.Background(), sp, patch)
 		if err != nil {
 			log.Error(err, "Update StoragePool Labels failed")
 		}
@@ -325,7 +287,7 @@ func (r *StoragePoolReconciler) validateAndMutate(sp v1.StoragePool, log logr.Lo
 		sp.Status.Capacity[v1.ResourceDiskPoolByte] = *quant
 
 		log.Info("update pool status and capacity", "status", sp.Status)
-		err = r.Status().Update(context.Background(), &sp)
+		err = r.Status().Update(context.Background(), sp)
 		if err != nil {
 			log.Error(err, "update StoragePool/Status failed")
 		}
@@ -338,7 +300,7 @@ func (r *StoragePoolReconciler) validateAndMutate(sp v1.StoragePool, log logr.Lo
 	return plugin.Result{}
 }
 
-func (r *StoragePoolReconciler) processNodeOffline(sp v1.StoragePool, log logr.Logger) (result plugin.Result) {
+func (r *StoragePoolReconcileHandler) processNodeOffline(sp *v1.StoragePool, log logr.Logger) (result plugin.Result) {
 	/*
 		Question: If node does not exist, is it safe to delete the associating StoragePool;
 		Condition:
@@ -379,7 +341,7 @@ func (r *StoragePoolReconciler) processNodeOffline(sp v1.StoragePool, log logr.L
 	if nodeNotFound {
 		log.Info("deleting StoragePool because node does not exist anymore")
-		err = r.Delete(context.Background(), &sp)
+		err = r.Delete(context.Background(), sp)
 		if err != nil {
 			log.Error(err, "deleting StoragePool failed")
 			return plugin.Result{Error: err}
@@ -389,30 +351,3 @@ func (r *StoragePoolReconciler) processNodeOffline(sp v1.StoragePool, log logr.L
 
 	return plugin.Result{}
 }
-
-/*
-// TODO: move to agent
-func (r *StoragePoolReconciler) fillNodeInfo(sp v1.StoragePool, log logr.Logger, newNodeInfo v1.NodeInfo) (needReturn bool, result reconcile.Result, err error) {
-	var patch = client.MergeFrom(sp.DeepCopy())
-	if kubeutil.IsNodeInfoDifferent(sp.Spec.NodeInfo, newNodeInfo) {
-		sp.Spec.NodeInfo = newNodeInfo
-		sp.Spec.Addresses = []corev1.NodeAddress{
-			{
-				Type:    corev1.NodeInternalIP,
-				Address: newNodeInfo.IP,
-			},
-		}
-		// do update in APIServer
-		log.Info("update NodeInfo of pool")
-		err = r.Patch(context.Background(), &sp, patch)
-		if err != nil {
-			log.Error(err, "update StoragePool failed")
-			return true, ctrl.Result{}, err
-		}
-		return true, ctrl.Result{}, nil
-	}
-
-	return false, ctrl.Result{}, nil
-}
-
-*/
diff --git a/pkg/controller/manager/reconciler/volume_group_reconciler.go b/pkg/controller/manager/reconciler/volume_group_reconciler.go
index e01391c..72f46e5 100644
--- a/pkg/controller/manager/reconciler/volume_group_reconciler.go
+++ b/pkg/controller/manager/reconciler/volume_group_reconciler.go
@@ -7,13 +7,12 @@ import (
 	"strings"
 	"time"
 
-	"github.com/go-logr/logr"
 	corev1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/runtime"
 	ctrl "sigs.k8s.io/controller-runtime"
 	"sigs.k8s.io/controller-runtime/pkg/client"
-	"sigs.k8s.io/controller-runtime/pkg/controller"
 	"sigs.k8s.io/controller-runtime/pkg/reconcile"
 
 	v1 "code.alipay.com/dbplatform/node-disk-controller/pkg/api/volume.antstor.alipay.com/v1"
@@ -24,110 +23,80 @@ import (
 	"code.alipay.com/dbplatform/node-disk-controller/pkg/util/misc"
 )
 
-type AntstorVolumeGroupReconciler struct {
+type AntstorVolumeGroupReconcileHandler struct {
 	client.Client
-	plugin.Plugable
 
-	Log       logr.Logger
 	State     state.StateIface
 	Scheduler sched.SchedulerIface
 }
 
-// SetupWithManager sets up the controller with the Manager.
-func (r *AntstorVolumeGroupReconciler) SetupWithManager(mgr ctrl.Manager) error {
-	return ctrl.NewControllerManagedBy(mgr).
-		WithOptions(controller.Options{
-			MaxConcurrentReconciles: 1,
-		}).
-		For(&v1.AntstorVolumeGroup{}).
-		Complete(r)
+func (r *AntstorVolumeGroupReconcileHandler) ResourceName() string {
+	return "AntstorVolumeGroup"
 }
 
-func (r *AntstorVolumeGroupReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
-	var (
-		resourceID = req.NamespacedName.String()
-		log        = r.Log.WithValues("VolumeGroup", resourceID)
-		volGroup   v1.AntstorVolumeGroup
-		err        error
-		result     plugin.Result
-		pCtx       = &plugin.Context{
-			Client:  r.Client,
-			Ctx:     ctx,
-			Object:  &volGroup,
-			Request: req,
-			Log:     log,
-			State:   r.State,
-		}
-	)
-
-	if err = r.Get(ctx, req.NamespacedName, &volGroup); err != nil {
-		// When user deleted a volume, a request will be recieved.
-		// However the volume does not exists. Therefore the code goes to here
-		log.Error(err, "unable to fetch VolumeGroup")
-		// we'll ignore not-found errors, since they can't be fixed by an immediate
-		// requeue (we'll need to wait for a new notification), and we can get them
-		// on deleted requests.
-		if errors.IsNotFound(err) {
-			// remove SP from State
-			log.Info("cannot find VolumeGroup in apiserver")
-		}
+func (r *AntstorVolumeGroupReconcileHandler) GetObject(req plugin.RequestContent) (obj runtime.Object, err error) {
+	var volGroup v1.AntstorVolumeGroup
+	err = r.Client.Get(req.Ctx, req.Request.NamespacedName, &volGroup)
+	return &volGroup, err
+}
 
-		return ctrl.Result{}, client.IgnoreNotFound(err)
-	}
+func (r *AntstorVolumeGroupReconcileHandler) HandleDeletion(pCtx *plugin.Context) (result plugin.Result) {
+	result.Result, result.Error = r.handleDeletion(pCtx)
+	return
+}
 
-	// not handle delete request
-	if volGroup.DeletionTimestamp != nil {
-		// run plugins
-		for _, plugin := range r.Plugable.Plugins() {
-			plugin.HandleDeletion(pCtx)
-		}
-		return r.handleDeletion(pCtx, &volGroup)
+func (r *AntstorVolumeGroupReconcileHandler) HandleReconcile(pCtx *plugin.Context) (result plugin.Result) {
+	var (
+		volGroup *v1.AntstorVolumeGroup
+		ok       bool
+	)
+	if volGroup, ok = pCtx.ReqCtx.Object.(*v1.AntstorVolumeGroup); !ok {
+		result.Error = fmt.Errorf("object is not *v1.AntstorVolumeGroup, %#v", pCtx.ReqCtx.Object)
+		return
 	}
 
 	// validate and mutate VolumeGroup
-	result = r.validateAndMutate(pCtx, &volGroup)
+	result = r.validateAndMutate(pCtx, volGroup)
 	if result.NeedBreak() {
-		return result.Result, result.Error
+		return
 	}
 
 	// sync volume
-	result = r.syncVolumes(pCtx, &volGroup)
+	result = r.syncVolumes(pCtx, volGroup)
 	if result.NeedBreak() {
-		return result.Result, result.Error
+		return
 	}
 
-	result = r.rollbackUnrecoverable(pCtx, &volGroup)
+	result = r.rollbackUnrecoverable(pCtx, volGroup)
 	if result.NeedBreak() {
-		return result.Result, result.Error
+		return
 	}
 
 	// schedule volumes for volume group
-	result = r.scheduleVolGroup(pCtx, &volGroup)
+	result = r.scheduleVolGroup(pCtx, volGroup)
 	if result.NeedBreak() {
-		return result.Result, result.Error
-	}
-
-	// run plugins
-	for _, plugin := range r.Plugable.Plugins() {
-		result = plugin.Reconcile(pCtx)
-		if result.NeedBreak() {
-			return result.Result, result.Error
-		}
+		return
 	}
 
 	// if volumes are not all ready, reconcile the volGroup
-	result = r.waitVolumesReady(pCtx, &volGroup)
+	result = r.waitVolumesReady(pCtx, volGroup)
 	if result.NeedBreak() {
-		return result.Result, result.Error
+		return
 	}
 
-	return ctrl.Result{}, nil
+	return
 }
 
-func (r *AntstorVolumeGroupReconciler) handleDeletion(ctx *plugin.Context, volGroup *v1.AntstorVolumeGroup) (result reconcile.Result, err error) {
+func (r *AntstorVolumeGroupReconcileHandler) handleDeletion(ctx *plugin.Context) (result reconcile.Result, err error) {
 	var (
-		log = ctx.Log
+		log      = ctx.Log
+		volGroup *v1.AntstorVolumeGroup
+		ok       bool
 	)
+	if volGroup, ok = ctx.ReqCtx.Object.(*v1.AntstorVolumeGroup); !ok {
+		err = fmt.Errorf("object is not *v1.AntstorVolumeGroup, %#v", ctx.ReqCtx.Object)
+		return
+	}
 
 	// TODO: wait until data control is deleted
 	if val, has := volGroup.Labels[v1.DataControlNameKey]; has {
@@ -136,7 +105,7 @@ func (r *AntstorVolumeGroupReconciler) handleDeletion(ctx *plugin.Context, volGr
 			Namespace: v1.DefaultNamespace,
 			Name:      val,
 		}
-		err = r.Client.Get(ctx.Ctx, key, &dc)
+		err = r.Client.Get(ctx.ReqCtx.Ctx, key, &dc)
 		log.Info("try to find datacontrol", "key", key, "err", err)
 		if !errors.IsNotFound(err) {
 			log.Info("wait datacontrol to be deleted, retry in 20 second", "key", key)
@@ -152,11 +121,11 @@ func (r *AntstorVolumeGroupReconciler) handleDeletion(ctx *plugin.Context, volGr
 			Namespace: volMeta.VolId.Namespace,
 			Name:      volMeta.VolId.Name,
 		}
-		err = r.Client.Get(ctx.Ctx, key, &volume)
+		err = r.Client.Get(ctx.ReqCtx.Ctx, key, &volume)
 		if errors.IsNotFound(err) {
 			log.Info("volume is deleted", "vol", key)
 		} else {
-			err = r.Client.Delete(ctx.Ctx, &volume)
+			err = r.Client.Delete(ctx.ReqCtx.Ctx, &volume)
 			if err != nil {
 				log.Error(err, "delete vol failed. retry in 10 sec", "vol", key)
 				return reconcile.Result{RequeueAfter: 10 * time.Second}, nil
@@ -178,7 +147,7 @@ func (r *AntstorVolumeGroupReconciler) handleDeletion(ctx *plugin.Context, volGr
 		volGroup.Finalizers = append(volGroup.Finalizers[:toDelIdx], volGroup.Finalizers[toDelIdx+1:]...)
 
 		// update object
-		err = r.Client.Update(ctx.Ctx, volGroup)
+		err = r.Client.Update(ctx.ReqCtx.Ctx, volGroup)
 		if err != nil {
 			log.Error(err, "update volumegroup failed")
 			return ctrl.Result{}, err
@@ -188,7 +157,7 @@ func (r *AntstorVolumeGroupReconciler) handleDeletion(ctx *plugin.Context, volGr
 	return ctrl.Result{}, nil
 }
 
-func (r *AntstorVolumeGroupReconciler) validateAndMutate(ctx *plugin.Context, volGroup *v1.AntstorVolumeGroup) (result plugin.Result) {
+func (r *AntstorVolumeGroupReconcileHandler) validateAndMutate(ctx *plugin.Context, volGroup *v1.AntstorVolumeGroup) (result plugin.Result) {
 	var err error
 	var minQ = volGroup.Spec.DesiredVolumeSpec.SizeRange.Min
 	var maxQ = volGroup.Spec.DesiredVolumeSpec.SizeRange.Max
@@ -248,7 +217,7 @@ func (r *AntstorVolumeGroupReconciler) validateAndMutate(ctx *plugin.Context, vo
 
 // rollbackUnrecoverable rollback unrecoverable error of Volumes
 // unrecoverable error example: no enough free space, node failure, e.g.
-func (r *AntstorVolumeGroupReconciler) rollbackUnrecoverable(ctx *plugin.Context, volGroup *v1.AntstorVolumeGroup) (result plugin.Result) {
+func (r *AntstorVolumeGroupReconcileHandler) rollbackUnrecoverable(ctx *plugin.Context, volGroup *v1.AntstorVolumeGroup) (result plugin.Result) {
 	var (
 		log = ctx.Log
 		err error
@@ -269,7 +238,7 @@ func (r *AntstorVolumeGroupReconciler) rollbackUnrecoverable(ctx *plugin.Context
 		var volId = volGroup.Spec.Volumes[idx]
 		// delete volume
 		var vol v1.AntstorVolume
-		err = r.Client.Get(ctx.Ctx, client.ObjectKey{
+		err = r.Client.Get(ctx.ReqCtx.Ctx, client.ObjectKey{
 			Namespace: volId.VolId.Namespace,
 			Name:      volId.VolId.Name,
 		}, &vol)
@@ -279,7 +248,7 @@ func (r *AntstorVolumeGroupReconciler) rollbackUnrecoverable(ctx *plugin.Context
 			sinceCreation := time.Since(vol.CreationTimestamp.Time)
 			if sinceCreation > time.Minute {
 				log.Info("deleting volume", "vol", volId, "sinceCreation", sinceCreation)
-				err = r.Client.Delete(ctx.Ctx, &vol)
+				err = r.Client.Delete(ctx.ReqCtx.Ctx, &vol)
 				if err != nil {
 					log.Error(err, "delete volume failed", "vol", volId)
 				}
@@ -303,7 +272,7 @@ func (r *AntstorVolumeGroupReconciler) rollbackUnrecoverable(ctx *plugin.Context
 	// update volumegroup spec
 	if rollback {
-		err = r.Client.Update(ctx.Ctx, volGroup)
+		err = r.Client.Update(ctx.ReqCtx.Ctx, volGroup)
 		if err != nil {
 			return plugin.Result{Error: err}
 		}
@@ -313,7 +282,7 @@ func (r *AntstorVolumeGroupReconciler) rollbackUnrecoverable(ctx *plugin.Context
 	return plugin.Result{}
 }
 
-func (r *AntstorVolumeGroupReconciler) scheduleVolGroup(ctx *plugin.Context, volGroup *v1.AntstorVolumeGroup) (result plugin.Result) {
+func (r *AntstorVolumeGroupReconcileHandler) scheduleVolGroup(ctx *plugin.Context, volGroup *v1.AntstorVolumeGroup) (result plugin.Result) {
 	var (
 		err error
 		log = ctx.Log
@@ -332,7 +301,7 @@ func (r *AntstorVolumeGroupReconciler) scheduleVolGroup(ctx *plugin.Context, vol
 		// TODO: update status
 		log.Error(err, "sched volumegroup failed, retry in 1 min")
 		volGroup.Status.Message = err.Error()
-		updateErr := r.Status().Update(ctx.Ctx, volGroup)
+		updateErr := r.Status().Update(ctx.ReqCtx.Ctx, volGroup)
 		if updateErr != nil {
 			log.Error(updateErr, err.Error())
 		}
@@ -344,7 +313,7 @@ func (r *AntstorVolumeGroupReconciler) scheduleVolGroup(ctx *plugin.Context, vol
 	if !misc.InSliceString(v1.VolumesFinalizer, volGroup.Finalizers) {
 		volGroup.Finalizers = append(volGroup.Finalizers, v1.VolumesFinalizer)
 	}
-	err = r.Client.Update(ctx.Ctx, volGroup)
+	err = r.Client.Update(ctx.ReqCtx.Ctx, volGroup)
 	if err != nil {
 		log.Error(err, "update volumegroup failed")
 		return plugin.Result{Error: err}
@@ -355,9 +324,7 @@ func (r *AntstorVolumeGroupReconciler) scheduleVolGroup(ctx *plugin.Context, vol
 	return
 }
 
-/*
- */
-func (r *AntstorVolumeGroupReconciler) syncVolumes(ctx *plugin.Context, volGroup *v1.AntstorVolumeGroup) (result plugin.Result) {
+func (r *AntstorVolumeGroupReconcileHandler) syncVolumes(ctx *plugin.Context, volGroup *v1.AntstorVolumeGroup) (result plugin.Result) {
 	// vol group is not scheduled yet
 	if len(volGroup.Spec.Volumes) == 0 {
 		return plugin.Result{}
@@ -397,7 +364,7 @@ func (r *AntstorVolumeGroupReconciler) syncVolumes(ctx *plugin.Context, volGroup
 		// if vol is scheduled and vol is not found, create volume
 		if (item.Size > 0 && item.TargetNodeName != "") && errors.IsNotFound(err) {
 			newVol := newVolume(volGroup, item.VolId, item.Size, item.TargetNodeName)
-			errCreate := r.Client.Create(ctx.Ctx, newVol)
+			errCreate := r.Client.Create(ctx.ReqCtx.Ctx, newVol)
 			if errCreate == nil || errors.IsAlreadyExists(errCreate) {
 				log.Info("successfully created volume", "vol", item.VolId)
 			} else {
@@ -434,7 +401,7 @@ func (r *AntstorVolumeGroupReconciler) syncVolumes(ctx *plugin.Context, volGroup
 	// if status is changed, update status
 	if !reflect.DeepEqual(volGroup.Status, copyVolGroup.Status) {
-		err = r.Client.Status().Update(ctx.Ctx, volGroup)
+		err = r.Client.Status().Update(ctx.ReqCtx.Ctx, volGroup)
 		if err != nil {
 			log.Error(err, "update volumegroup status failed")
 			return plugin.Result{Error: err}
@@ -446,7 +413,7 @@ func (r *AntstorVolumeGroupReconciler) syncVolumes(ctx *plugin.Context, volGroup
 	return
 }
 
-func (r *AntstorVolumeGroupReconciler) waitVolumesReady(ctx *plugin.Context, volGroup *v1.AntstorVolumeGroup) (result plugin.Result) {
+func (r *AntstorVolumeGroupReconcileHandler) waitVolumesReady(ctx *plugin.Context, volGroup *v1.AntstorVolumeGroup) (result plugin.Result) {
 	var (
 		log            = ctx.Log
 		volNotAllReady bool
diff --git a/pkg/controller/manager/reconciler/volume_reconciler.go b/pkg/controller/manager/reconciler/volume_reconciler.go
index b217f79..256ec15 100644
--- a/pkg/controller/manager/reconciler/volume_reconciler.go
+++ b/pkg/controller/manager/reconciler/volume_reconciler.go
@@ -16,8 +16,8 @@ import (
 	"code.alipay.com/dbplatform/node-disk-controller/pkg/util/misc"
 	"github.com/go-logr/logr"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/client-go/kubernetes"
-	"k8s.io/client-go/tools/record"
 	ctrl "sigs.k8s.io/controller-runtime"
 	"sigs.k8s.io/controller-runtime/pkg/client"
 	"sigs.k8s.io/controller-runtime/pkg/controller"
@@ -33,123 +33,112 @@ const (
 	EventReasonCreateSpdkFailure = "CreateSpdkFailure"
 )
 
-type AntstorVolumeReconciler struct {
+type AntstorVolumeReconcileHandler struct {
 	client.Client
-	plugin.Plugable
 
 	KubeCli kubernetes.Interface
-	Log     logr.Logger
 	State   state.StateIface
 	// if Scheduler is nil, Reconciler will not schedule Volume
 	Scheduler sched.SchedulerIface
 
 	AntstoreCli versioned.Interface
 	// EventRecorder
-	EventRecorder record.EventRecorder
+	// EventRecorder record.EventRecorder
 }
 
 // SetupWithManager sets up the controller with the Manager.
-func (r *AntstorVolumeReconciler) SetupWithManager(mgr ctrl.Manager) error {
-	// setup indexer
-	// example code: https://github.com/kubernetes-sigs/kubebuilder/blob/master/docs/book/src/cronjob-tutorial/testdata/project/controllers/cronjob_controller.go#L548
-	mgr.GetFieldIndexer().IndexField(context.Background(), &v1.AntstorVolume{}, v1.IndexKeyUUID, func(rawObj client.Object) []string {
-		// grab the volume, extract the uuid
-		if vol, ok := rawObj.(*v1.AntstorVolume); ok {
-			return []string{vol.Spec.Uuid}
-		}
-		return nil
-	})
+func (rh *AntstorVolumeReconcileHandler) GetSetupWithManagerFn() SetupWithManagerFn {
+	return func(r reconcile.Reconciler, mgr ctrl.Manager) error {
+		// setup indexer
+		// example code: https://github.com/kubernetes-sigs/kubebuilder/blob/master/docs/book/src/cronjob-tutorial/testdata/project/controllers/cronjob_controller.go#L548
+		mgr.GetFieldIndexer().IndexField(context.Background(), &v1.AntstorVolume{}, v1.IndexKeyUUID, func(rawObj client.Object) []string {
+			// grab the volume, extract the uuid
+			if vol, ok := rawObj.(*v1.AntstorVolume); ok {
+				return []string{vol.Spec.Uuid}
+			}
+			return nil
+		})
 
-	mgr.GetFieldIndexer().IndexField(context.Background(), &v1.AntstorVolume{}, v1.IndexKeyTargetNodeID, func(rawObj client.Object) []string {
-		// grab the volume, extract the targetNodeId
-		if vol, ok := rawObj.(*v1.AntstorVolume); ok {
-			return []string{vol.Spec.TargetNodeId}
-		}
-		return nil
-	})
-
-	return ctrl.NewControllerManagedBy(mgr).
-		WithOptions(controller.Options{
-			MaxConcurrentReconciles: 1,
-		}).
-		For(&v1.AntstorVolume{}).
-		Watches(&source.Kind{Type: &v1.AntstorVolume{}}, &handler.VolumeEventHandler{
-			State: r.State,
-		}).
-		Complete(r)
+		mgr.GetFieldIndexer().IndexField(context.Background(), &v1.AntstorVolume{}, v1.IndexKeyTargetNodeID, func(rawObj client.Object) []string {
+			// grab the volume, extract the targetNodeId
+			if vol, ok := rawObj.(*v1.AntstorVolume); ok {
+				return []string{vol.Spec.TargetNodeId}
+			}
+			return nil
+		})
+
+		return ctrl.NewControllerManagedBy(mgr).
+			WithOptions(controller.Options{
+				MaxConcurrentReconciles: 1,
+			}).
+			For(&v1.AntstorVolume{}).
+			Watches(&source.Kind{Type: &v1.AntstorVolume{}}, &handler.VolumeEventHandler{
+				State: rh.State,
+			}).
+			Complete(r)
+	}
+}
+
+func (r *AntstorVolumeReconcileHandler) ResourceName() string {
+	return "AntstorVolume"
 }
 
-func (r *AntstorVolumeReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
+func (r *AntstorVolumeReconcileHandler) GetObject(req plugin.RequestContent) (obj runtime.Object, err error) {
+	var vol v1.AntstorVolume
+	err = r.Client.Get(req.Ctx, req.Request.NamespacedName, &vol)
+	return &vol, err
+}
+
+func (r *AntstorVolumeReconcileHandler) HandleDeletion(pCtx *plugin.Context) (result plugin.Result) {
+	result = r.handleDeletion(pCtx)
+	return
+}
+
+func (r *AntstorVolumeReconcileHandler) HandleReconcile(pCtx *plugin.Context) (result plugin.Result) {
 	var (
-		log    = r.Log.WithValues("AntstorVolume", req.NamespacedName)
-		volume v1.AntstorVolume
+		ctx    = pCtx.ReqCtx.Ctx
+		log    = pCtx.Log
+		volume *v1.AntstorVolume
+		ok     bool
 	)
-
-	// get volume
-	if err := r.Get(ctx, req.NamespacedName, &volume); err != nil {
-		// When user deleted a volume, a request will be recieved.
-		// However the volume does not exists. Therefore the code goes to here
-		log.Error(err, "unable to fetch Volume")
-		// we'll ignore not-found errors, since they can't be fixed by an immediate
-		// requeue (we'll need to wait for a new notification), and we can get them
-		// on deleted requests.
-		return ctrl.Result{}, client.IgnoreNotFound(err)
+	if volume, ok = pCtx.ReqCtx.Object.(*v1.AntstorVolume); !ok {
+		result.Error = fmt.Errorf("object is not *v1.AntstorVolume, %#v", pCtx.ReqCtx.Object)
+		return
 	}
 
 	log.Info("Start handle Volume", "status", volume.Status)
 
-	// check if stop reconcling
 	if volume.Spec.StopReconcile {
-		return ctrl.Result{}, nil
-	}
-
-	var (
-		result plugin.Result
-		pCtx   = &plugin.Context{
-			Client:  r.Client,
-			KubeCli: r.KubeCli,
-			Ctx:     ctx,
-			Object:  &volume,
-			Request: req,
-			Log:     log,
-			State:   r.State,
-		}
-	)
-
-	// handle deletion request
-	if volume.DeletionTimestamp != nil {
-		// run plugins
-		for _, p := range r.Plugable.Plugins() {
-			p.HandleDeletion(pCtx)
-		}
-		result = r.handleDeletion(ctx, volume, log)
-		return result.Result, result.Error
+		return
 	}
 
 	// validate and mutate volume
 	result = r.validateAndMutate(ctx, volume, log)
 	if result.NeedBreak() {
-		return result.Result, result.Error
-	}
-
-	// run plugins
-	for _, p := range r.Plugable.Plugins() {
-		result = p.Reconcile(pCtx)
-		if result.NeedBreak() {
-			return result.Result, result.Error
-		}
+		return
 	}
 
 	// schedule volume
 	result = r.scheduleVolume(ctx, volume, log)
 	if result.NeedBreak() {
-		return result.Result, result.Error
+		return
 	}
 
-	return ctrl.Result{}, nil
+	return
 }
 
-func (r *AntstorVolumeReconciler) handleDeletion(ctx context.Context, volume v1.AntstorVolume, log logr.Logger) (result plugin.Result) {
-	var err error
+func (r *AntstorVolumeReconcileHandler) handleDeletion(pCtx *plugin.Context) (result plugin.Result) {
+	var (
+		err    error
+		ctx    = pCtx.ReqCtx.Ctx
+		log    = pCtx.Log
+		volume *v1.AntstorVolume
+		ok     bool
	)
+	if volume, ok = pCtx.ReqCtx.Object.(*v1.AntstorVolume); !ok {
+		result.Error = fmt.Errorf("object is not *v1.AntstorVolume, %#v", pCtx.ReqCtx.Object)
+		return
+	}
+
 	// 1. if there is only InStateFinalizer Finalizer left, unbind and delete volume
 	if len(volume.Finalizers) == 1 && misc.Contains(volume.Finalizers, v1.InStateFinalizer) {
 		// remove from state
@@ -161,7 +150,7 @@ func (r *AntstorVolumeReconciler) handleDeletion(ctx context.Context, volume v1.
 
 		// remove all Finalizers
 		volume.Finalizers = []string{}
-		err = r.Client.Update(ctx, &volume)
+		err = r.Client.Update(ctx, volume)
 		if err != nil {
 			log.Error(err, "remove Finalizers failed")
 		}
@@ -180,7 +169,7 @@ func (r *AntstorVolumeReconciler) handleDeletion(ctx context.Context, volume v1.
 		if targetPool == nil {
 			log.Info("cannot find pool in APIServer, remove volume Finalizers", "nodeid", volume.Spec.TargetNodeId, "UnbindAntstorVolume error", r.State.UnbindAntstorVolume(volume.Spec.Uuid))
 			volume.Finalizers = []string{}
-			err = r.Client.Update(ctx, &volume)
+			err = r.Client.Update(ctx, volume)
 			if err != nil {
 				log.Error(err, "remove Finalizers failed")
 			}
@@ -210,7 +199,7 @@ func (r *AntstorVolumeReconciler) handleDeletion(ctx context.Context, volume v1.
 		}
 
 		volume.Finalizers = finalizers
-		err = r.Client.Update(ctx, &volume)
+		err = r.Client.Update(ctx, volume)
 		if err != nil {
 			log.Error(err, "update volume Finalizer failed")
 		}
@@ -223,7 +212,7 @@ func (r *AntstorVolumeReconciler) handleDeletion(ctx context.Context, volume v1.
 	}
 }
 
-func (r *AntstorVolumeReconciler) validateAndMutate(ctx context.Context, volume v1.AntstorVolume, log logr.Logger) (result plugin.Result) {
+func (r *AntstorVolumeReconcileHandler) validateAndMutate(ctx context.Context, volume *v1.AntstorVolume, log logr.Logger) (result plugin.Result) {
 	if volume.Spec.Uuid == "" {
 		log.Error(nil, "volume uuid is empty, break reconcilinig")
 		return plugin.Result{
@@ -245,7 +234,7 @@ func (r *AntstorVolumeReconciler) validateAndMutate(ctx context.Context, volume
 	if !foundUuidLabel && volume.Spec.Uuid != "" {
 		volume.Labels[v1.UuidLabelKey] = volume.Spec.Uuid
 		// after adding Label, start a new reconciling
-		err := r.Client.Patch(context.Background(), &volume, patch)
+		err := r.Client.Patch(context.Background(), volume, patch)
 		if err != nil {
 			log.Error(err, "add uuid label failed")
 			return plugin.Result{
@@ -260,7 +249,7 @@ func (r *AntstorVolumeReconciler) validateAndMutate(ctx context.Context, volume
 
 	if volume.Status.Status == "" {
 		volume.Status.Status = v1.VolumeStatusCreating
-		err := r.Status().Update(ctx, &volume)
+		err := r.Status().Update(ctx, volume)
 		return plugin.Result{
 			Break: true,
 			Error: err,
@@ -271,7 +260,7 @@ func (r *AntstorVolumeReconciler) validateAndMutate(ctx context.Context, volume
 	if val, has := volume.Annotations[v1.LvLayoutAnnoKey]; has {
 		if !misc.InSliceString(val, []string{"", string(v1.LVLayoutLinear), string(v1.LVLayoutStriped)}) {
 			volume.Status.Message = fmt.Sprintf("invalide value of Anno obnvmf/lv-layout=%s", val)
-			err := r.Client.Status().Update(context.Background(), &volume)
+			err := r.Client.Status().Update(context.Background(), volume)
 			if err != nil {
 				log.Error(err, "update status message failed")
 				return plugin.Result{
@@ -291,7 +280,7 @@ func (r *AntstorVolumeReconciler) validateAndMutate(ctx context.Context, volume
 		// volume was scheduled to TargetNodeId, check if Pool is in state
 		// BindAntstorVolume one volume twice will not return error
 		log.Info("bind volume to node", "nodeId", volume.Spec.TargetNodeId)
-		err = stateObj.BindAntstorVolume(volume.Spec.TargetNodeId, &volume)
+		err = stateObj.BindAntstorVolume(volume.Spec.TargetNodeId, volume)
 		if err != nil {
 			log.Error(err, "binding volume failed")
 			return plugin.Result{
@@ -320,7 +309,7 @@ func (r *AntstorVolumeReconciler) validateAndMutate(ctx context.Context, volume
 
 	// update Volume
 	if updated {
-		err = r.Client.Patch(context.Background(), &volume, patch)
+		err = r.Client.Patch(context.Background(), volume, patch)
 		if err != nil {
 			log.Error(err, "patch volume failed")
 			return plugin.Result{
@@ -333,7 +322,7 @@ func (r *AntstorVolumeReconciler) validateAndMutate(ctx context.Context, volume
 	return plugin.Result{}
 }
 
-func (r *AntstorVolumeReconciler) scheduleVolume(ctx context.Context, volume v1.AntstorVolume, log logr.Logger) (result plugin.Result) {
+func (r *AntstorVolumeReconcileHandler) scheduleVolume(ctx context.Context, volume *v1.AntstorVolume, log logr.Logger) (result plugin.Result) {
 	var (
 		scheduler = r.Scheduler
 		stateObj  = r.State
@@ -352,7 +341,7 @@ func (r *AntstorVolumeReconciler) scheduleVolume(ctx context.Context, volume v1.
if err == nil && boundVol != nil && boundVol.Spec.TargetNodeId != "" { log.Info("volume is already scheduled and bind to node", "nodeId", boundVol.Spec.TargetNodeId) volume.Spec.TargetNodeId = boundVol.Spec.TargetNodeId - err = r.Client.Update(ctx, &volume) + err = r.Client.Update(ctx, volume) if err != nil { log.Error(err, "persist binding of volume failed") return plugin.Result{Error: err} @@ -364,12 +353,12 @@ func (r *AntstorVolumeReconciler) scheduleVolume(ctx context.Context, volume v1. } // do scehdule - nodeInfo, err = scheduler.ScheduleVolume(stateObj.GetAllNodes(), &volume) + nodeInfo, err = scheduler.ScheduleVolume(stateObj.GetAllNodes(), volume) if filter.IsNoStoragePoolAvailable(err) { if !strings.Contains(volume.Status.Message, filter.NoStoragePoolAvailable) { volume.Status.Message = err.Error() volume.Status.Status = v1.VolumeStatusCreating - errUpdate := r.Client.Status().Update(ctx, &volume) + errUpdate := r.Client.Status().Update(ctx, volume) if errUpdate != nil { log.Error(errUpdate, "update volume status failed") } @@ -393,7 +382,7 @@ func (r *AntstorVolumeReconciler) scheduleVolume(ctx context.Context, volume v1. // save binding to state log.Info("volume is scheduled to node", "nodeId", nodeInfo.ID) - err = stateObj.BindAntstorVolume(nodeInfo.ID, &volume) + err = stateObj.BindAntstorVolume(nodeInfo.ID, volume) if err != nil { log.Error(err, "bind volume to node failed", "nodeID", nodeInfo.ID) return plugin.Result{Error: err} @@ -407,7 +396,7 @@ func (r *AntstorVolumeReconciler) scheduleVolume(ctx context.Context, volume v1. volume.Spec.TargetNodeId = nodeInfo.ID volume.Labels[v1.TargetNodeIdLabelKey] = volume.Spec.TargetNodeId volume.Status.Status = v1.VolumeStatusCreating - err = r.Client.Patch(ctx, &volume, patch) + err = r.Client.Patch(ctx, volume, patch) if err != nil { log.Error(err, "patching volume failed") return plugin.Result{Error: err} diff --git a/pkg/controller/manager/scheduler/filter/localline.go b/pkg/controller/manager/scheduler/filter/localline.go new file mode 100644 index 0000000..cc87467 --- /dev/null +++ b/pkg/controller/manager/scheduler/filter/localline.go @@ -0,0 +1,31 @@ +package filter + +import ( + v1 "code.alipay.com/dbplatform/node-disk-controller/pkg/api/volume.antstor.alipay.com/v1" + "code.alipay.com/dbplatform/node-disk-controller/pkg/controller/manager/state" + "k8s.io/klog/v2" +) + +const ( + ReasonLocalStorageTooLow = "LocalStorageTooLow" +) + +// MinLocalStorageFilterFunc rejects a remote volume if placing it would drop the node's local storage below the configured MinLocalStoragePct of total storage +func MinLocalStorageFilterFunc(ctx *FilterContext, n *state.Node, vol *v1.AntstorVolume) bool { + var err = ctx.Error + var isLocalVol = vol.Spec.HostNode.ID == n.Pool.Spec.NodeInfo.ID + var minLocalStoragePct = float64(ctx.Config.MinLocalStoragePct) + + if !isLocalVol { + allocRemotes := n.GetAllocatedRemoteBytes() + total := n.Pool.GetAvailableBytes() + localPct := float64(total-int64(allocRemotes)-int64(vol.Spec.SizeByte)) / float64(total) * 100 + if localPct < minLocalStoragePct { + klog.Infof("[SchedFail] vol=%s Pool %s local-storage pct too low (%f)", vol.Name, n.Pool.Name, localPct) + err.AddReason(ReasonLocalStorageTooLow) + return false + } + } + + return true +} diff --git a/pkg/controller/manager/scheduler/filter/registry.go b/pkg/controller/manager/scheduler/filter/registry.go index 2b1fc22..54fe493 100644 --- a/pkg/controller/manager/scheduler/filter/registry.go +++ b/pkg/controller/manager/scheduler/filter/registry.go @@ -14,6 +14,7 @@ var ( func init() { RegisterFilter("Basic",
BasicFilterFunc) RegisterFilter("Affinity", AffinityFilterFunc) + RegisterFilter("MinLocalStorage", MinLocalStorageFilterFunc) } func RegisterFilter(name string, filter PredicateFunc) { diff --git a/pkg/controller/manager/scheduler/sched_vol_group.go b/pkg/controller/manager/scheduler/sched_vol_group.go index 94da30a..fc1b994 100644 --- a/pkg/controller/manager/scheduler/sched_vol_group.go +++ b/pkg/controller/manager/scheduler/sched_vol_group.go @@ -6,6 +6,7 @@ import ( "sort" v1 "code.alipay.com/dbplatform/node-disk-controller/pkg/api/volume.antstor.alipay.com/v1" + "code.alipay.com/dbplatform/node-disk-controller/pkg/controller/manager/config" "code.alipay.com/dbplatform/node-disk-controller/pkg/controller/manager/scheduler/filter" "code.alipay.com/dbplatform/node-disk-controller/pkg/controller/manager/state" "code.alipay.com/dbplatform/node-disk-controller/pkg/util" @@ -15,16 +16,6 @@ import ( "k8s.io/klog/v2" ) -var ( - ExtraPickSizeFnMap = make(map[string]GetAllocatableVolumeSizeFn) -) - -type GetAllocatableVolumeSizeFn func(node *state.Node, volSize int64) (result int64) - -func RegisterVolumeGroupPickSizeFn(name string, fn GetAllocatableVolumeSizeFn) { - ExtraPickSizeFnMap[name] = fn -} - // ScheduleVolume return error if there is no StoragePool available func (s *scheduler) ScheduleVolumeGroup(allNodes []*state.Node, volGroup *v1.AntstorVolumeGroup) (err error) { var ( @@ -60,7 +51,7 @@ func (s *scheduler) ScheduleVolumeGroup(allNodes []*state.Node, volGroup *v1.Ant // node usage < empty threashold, set score to 0, last of the list sort.Sort(sort.Reverse(SortByStorage(qualified))) - err = schedVolGroup(qualified, volGroup) + err = schedVolGroup(s.cfg, qualified, volGroup) if err != nil { klog.Error(err) return @@ -71,7 +62,7 @@ func (s *scheduler) ScheduleVolumeGroup(allNodes []*state.Node, volGroup *v1.Ant return } -func schedVolGroup(nodes []*state.Node, volGroup *v1.AntstorVolumeGroup) (err error) { +func schedVolGroup(cfg config.Config, nodes []*state.Node, volGroup *v1.AntstorVolumeGroup) (err error) { var ( maxVolCnt = volGroup.Spec.DesiredVolumeSpec.CountRange.Max maxSize = volGroup.Spec.DesiredVolumeSpec.SizeRange.Max @@ -128,13 +119,10 @@ func schedVolGroup(nodes []*state.Node, volGroup *v1.AntstorVolumeGroup) (err er for _, item := range nodes { if !tgtNodeSet.Contains(item.Info.ID) { result = pickSizeFn(item, leftSize) - for name, extraFn := range ExtraPickSizeFnMap { - result = extraFn(item, result) - klog.Info("pickSize Fn %s, picked size %d on node %s", name, result, item.Info.ID) - if result == 0 { - break - } - } + // calculate allocatable bytes by min local line + result = getAllocatableRemoteVolumeSize(item, result, float64(cfg.Scheduler.MinLocalStoragePct)) + klog.Infof("getAllocatableRemoteVolumeSize picked size %d on node %s", result, item.Info.ID) + // success if result > 0 { volGroup.Spec.Volumes[idx].Size = result @@ -165,13 +153,10 @@ func schedVolGroup(nodes []*state.Node, volGroup *v1.AntstorVolumeGroup) (err er for _, item := range nodes { if !tgtNodeSet.Contains(item.Info.ID) { result = pickSizeFn(item, leftSize) - for name, extraFn := range ExtraPickSizeFnMap { - result = extraFn(item, result) - klog.Info("pickSize Fn %s, picked size %d on node %s", name, result, item.Info.ID) - if result == 0 { - break - } - } + // calculate allocatable bytes by min local line + result = getAllocatableRemoteVolumeSize(item, result, float64(cfg.Scheduler.MinLocalStoragePct)) + klog.Infof("getAllocatableRemoteVolumeSize picked size %d on node %s", result, item.Info.ID) + 
// success if result > 0 { tgtNodeSet.Add(item.Info.ID) @@ -278,3 +263,27 @@ func (p SortByStorage) Less(i, j int) bool { func (p SortByStorage) Swap(i, j int) { p[i], p[j] = p[j], p[i] } + +func getAllocatableRemoteVolumeSize(node *state.Node, volSize int64, minLocalStoragePct float64) (result int64) { + result = volSize + if result == 0 { + return + } + if node != nil { + allocRemotes := node.GetAllocatedRemoteBytes() + total := node.Pool.GetAvailableBytes() + // reserve minLocalStoragePct of the pool for local volumes; only the remainder may be allocated to remote volumes + maxResultSize := total - int64(float64(total)*minLocalStoragePct/100) - int64(allocRemotes) + // cannot allocate remote volume + if maxResultSize < 0 { + return 0 + } + + if maxResultSize < result { + result = maxResultSize + } + } + + result = result / util.FourMiB * util.FourMiB // align down to a 4MiB boundary + return +}
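
Reviewer note: below is a minimal, self-contained sketch of the reservation arithmetic that getAllocatableRemoteVolumeSize implements, handy for sanity-checking the new minLocalStoragePct config value. The function capRemoteSize, the fourMiB constant, and the sample numbers are hypothetical stand-ins for illustration only; they are not part of this patch.

package main

import "fmt"

// fourMiB mirrors util.FourMiB from the patch (assumed to equal 4 * 1024 * 1024).
const fourMiB = int64(4 << 20)

// capRemoteSize reproduces the arithmetic of getAllocatableRemoteVolumeSize:
// keep minLocalStoragePct percent of the pool for local volumes, subtract the
// bytes remote volumes already hold, cap the requested size at the remainder,
// and align the result down to a 4MiB boundary.
func capRemoteSize(total, allocRemotes, req int64, minLocalStoragePct float64) int64 {
	if req == 0 {
		return 0
	}
	maxAlloc := total - int64(float64(total)*minLocalStoragePct/100) - allocRemotes
	if maxAlloc < 0 {
		// the reserved local share is already consumed; allocate nothing
		return 0
	}
	if maxAlloc < req {
		req = maxAlloc
	}
	return req / fourMiB * fourMiB
}

func main() {
	const gib = int64(1 << 30)
	// Hypothetical pool: 100 GiB total, 30 GiB already allocated to remote
	// volumes, a 60 GiB request, and minLocalStoragePct = 20.
	// Allocatable: 100 - 20 (reserved) - 30 (remote) = 50 GiB, already 4MiB-aligned.
	fmt.Println(capRemoteSize(100*gib, 30*gib, 60*gib, 20) / gib) // prints 50
}

With the same inputs, MinLocalStorageFilterFunc would likewise reject a single 60 GiB remote volume, since (100-30-60)/100*100 = 10% local storage falls below the 20% line; the filter and the volume-group path enforce the same invariant from two directions.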