Skip to content

Commit

Permalink
Merge branch 'master-dev' into my-main
Browse files Browse the repository at this point in the history
  • Loading branch information
silentred committed Jan 24, 2024
2 parents 75a260f + f7a43f6 commit acdbfc7
Show file tree
Hide file tree
Showing 14 changed files with 234 additions and 48 deletions.
7 changes: 7 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -97,3 +97,10 @@ However, it is important to note that LiteIO does not currently support data rep

- [x] Disk-Agent exposes metric service
- [ ] SPDK volume replica


## Contact

Wechat Group QRCode

![Wechat Group](doc/image/wechat_group.JPG)
65 changes: 36 additions & 29 deletions cmd/controller/antplugins/localstorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ import (
"fmt"
"strconv"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/controller-runtime/pkg/client"

v1 "code.alipay.com/dbplatform/node-disk-controller/pkg/api/volume.antstor.alipay.com/v1"
"code.alipay.com/dbplatform/node-disk-controller/pkg/controller/kubeutil"
Expand Down Expand Up @@ -41,6 +43,7 @@ func NewReportLocalStoragePlugin(h *controllers.PluginHandle) (p plugin.Plugin,
}

p = &ReportLocalStoragePlugin{
Client: h.Client,
NodeUpdater: kubeutil.NewKubeNodeInfoGetter(h.Req.KubeCli),
PoolUtil: kubeutil.NewStoragePoolUtil(h.Client),
ReportLocalConfigs: pluginCfg.DefaultLocalSpaceRules,
Expand All @@ -50,7 +53,7 @@ func NewReportLocalStoragePlugin(h *controllers.PluginHandle) (p plugin.Plugin,

// ReportLocalStoragePlugin is a AntstorVolume plugin.
type ReportLocalStoragePlugin struct {
// NodeGetter kubeutil.NodeInfoGetterIface
Client client.Client
NodeUpdater kubeutil.NodeUpdaterIface
PoolUtil kubeutil.StoragePoolUpdater

Expand Down Expand Up @@ -96,18 +99,44 @@ func (r *ReportLocalStoragePlugin) Reconcile(ctx *plugin.Context) (result plugin

// report the local storage when the StoragePool is created in the first place.
if isPool && pool != nil {
totalBs := pool.GetAvailableBytes()
if _, has := pool.Labels[v1.PoolLocalStorageBytesKey]; !has {
log.Info("update node/status capacity", "local-storage", totalBs)
// update Pool Label "obnvmf/node-local-storage-size" = totalBs
err = r.PoolUtil.SavePoolLocalStorageMark(pool, uint64(totalBs))
var (
localBS uint64
node corev1.Node
snode *state.Node
hasNodeRes bool
hasPoolLabel bool
)

// calculate local storage
snode, err = stateObj.GetNodeByNodeID(pool.Name)
if err != nil {
log.Error(err, "find node failed")
return plugin.Result{Error: err}
}
localBS = CalculateLocalStorageCapacity(snode)

// get node
err = r.Client.Get(ctx.ReqCtx.Ctx, client.ObjectKey{Name: pool.Name}, &node)
if err != nil {
log.Error(err, "getting Node failed")
return plugin.Result{Error: err}
}

_, hasNodeRes = node.Status.Allocatable[kubeutil.SdsLocalStorageResourceKey]
_, hasPoolLabel = pool.Labels[v1.PoolLocalStorageBytesKey]
log.Info("check pool PoolLocalStorageBytesKey and node SdsLocalStorageResourceKey", "nodeResource", hasNodeRes, "hasPoolLabel", hasPoolLabel)

if !hasPoolLabel || !hasNodeRes {
log.Info("update node/status capacity", "local-storage", localBS)
// update Pool Label "obnvmf/local-storage-bytes" = totalBs
err = r.PoolUtil.SavePoolLocalStorageMark(pool, localBS)
if err != nil {
log.Error(err, "SavePoolLocalStorageMark failed")
return plugin.Result{Error: err}
}

// update node/status capacity = totalBs
_, err = r.NodeUpdater.ReportLocalDiskResource(pool.Name, uint64(totalBs))
_, err = r.NodeUpdater.ReportLocalDiskResource(pool.Name, localBS)
if err != nil {
log.Error(err, "ReportLocalDiskResource failed")
return plugin.Result{Error: err}
Expand All @@ -126,28 +155,6 @@ func (r *ReportLocalStoragePlugin) Reconcile(ctx *plugin.Context) (result plugin
}
var sp = node.Pool

/*
var localStorePct int
var volInState *v1.AntstorVolume
for _, item := range r.ReportLocalConfigs {
selector, err := metav1.LabelSelectorAsSelector(&item.LabelSelector)
if err != nil {
log.Error(err, "LabelSelectorAsSelector failed", "selector", item.LabelSelector)
continue
}
if selector.Matches(labels.Set(sp.Spec.NodeInfo.Labels)) && item.EnableDefault {
localStorePct = item.DefaultLocalStoragePct
log.Info("matched local-storage percentage", "pct", localStorePct)
}
}
volInState, err = node.GetVolumeByID(volume.Spec.Uuid)
if err == nil {
log.Info("copy volume into state")
*volInState = *volume
}
*/

var expectLocalSize = CalculateLocalStorageCapacity(node)
var localSizeStr = strconv.Itoa(int(expectLocalSize))
log.Info("compare local storage size", "in label", sp.Labels[v1.PoolLocalStorageBytesKey], "expect", localSizeStr, "delTS", volume.DeletionTimestamp)
Expand Down
Binary file added doc/image/wechat_group.JPG
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
3 changes: 3 additions & 0 deletions hack/deploy/lvm/050-configmap.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ data:
nodeTaints:
- key: node.sigma.ali/lifecycle
operator: Exists
#nodeReservations:
#- id: obnvmf/app-vol
# size: 107374182400 # 100Gi
pluginConfigs:
defaultLocalSpaceRules:
- enableDefault: true
Expand Down
7 changes: 7 additions & 0 deletions pkg/controller/manager/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,13 @@ type SchedulerConfig struct {
NodeCacheSelector map[string]string `json:"nodeCacheSelector" yaml:"nodeCacheSelector"`
// MinLocalStoragePct defines the minimun percentage of local storage to be reserved on one node.
MinLocalStoragePct int `json:"minLocalStoragePct" yaml:"minLocalStoragePct"`
// NodeReservations defines the reservations on each node
NodeReservations []NodeReservation `json:"nodeReservations" yaml:"nodeReservations"`
}

type NodeReservation struct {
ID string `json:"id" yaml:"id"`
Size int64 `json:"size" yaml:"size"`
}

type NoScheduleConfig struct {
Expand Down
1 change: 1 addition & 0 deletions pkg/controller/manager/controllers/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ func NewAndInitControllerManager(req NewManagerRequest) manager.Manager {
Concurrency: 4,
MainHandler: &reconciler.StoragePoolReconcileHandler{
Client: mgr.GetClient(),
Cfg: req.ControllerConfig,
State: stateObj,
PoolUtil: poolUtil,
KubeCli: kubeClient,
Expand Down
16 changes: 16 additions & 0 deletions pkg/controller/manager/reconciler/pool_reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (

v1 "code.alipay.com/dbplatform/node-disk-controller/pkg/api/volume.antstor.alipay.com/v1"
"code.alipay.com/dbplatform/node-disk-controller/pkg/controller/kubeutil"
"code.alipay.com/dbplatform/node-disk-controller/pkg/controller/manager/config"
"code.alipay.com/dbplatform/node-disk-controller/pkg/controller/manager/reconciler/plugin"
"code.alipay.com/dbplatform/node-disk-controller/pkg/controller/manager/state"
"code.alipay.com/dbplatform/node-disk-controller/pkg/util/misc"
Expand All @@ -39,6 +40,7 @@ var (
type StoragePoolReconcileHandler struct {
client.Client

Cfg config.Config
State state.StateIface
PoolUtil kubeutil.StoragePoolUpdater
KubeCli kubernetes.Interface
Expand Down Expand Up @@ -227,6 +229,7 @@ func (r *StoragePoolReconcileHandler) handleDeletion(pCtx *plugin.Context) (resu
func (r *StoragePoolReconcileHandler) saveToState(sp *v1.StoragePool, log logr.Logger) (result plugin.Result) {
var patch = client.MergeFrom(sp.DeepCopy())
var err error
var node *state.Node

r.State.SetStoragePool(sp)

Expand All @@ -244,6 +247,19 @@ func (r *StoragePoolReconcileHandler) saveToState(sp *v1.StoragePool, log logr.L
}
}

// try to add reservation by config
node, err = r.State.GetNodeByNodeID(sp.Name)
if err != nil {
log.Error(err, "GetNodeByNodeID error")
}
for _, item := range r.Cfg.Scheduler.NodeReservations {
if node != nil {
if _, has := node.GetReservation(item.ID); !has {
node.Reserve(state.NewReservation(item.ID, item.Size))
}
}
}

return plugin.Result{}
}

Expand Down
25 changes: 25 additions & 0 deletions pkg/controller/manager/scheduler/filter/basic.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,11 @@ func BasicFilterFunc(ctx *FilterContext, n *state.Node, vol *v1.AntstorVolume) b
)
// check if err is nil

// if voume matches reservation, then do not do following checks
if pass, hasErr := matchReservationFilter(ctx, n, vol); hasErr || pass {
return pass
}

// consider Pool FreeSpace
var freeRes = n.GetFreeResourceNonLock()
var freeDisk = freeRes[v1.ResourceDiskPoolByte]
Expand Down Expand Up @@ -100,3 +105,23 @@ func BasicFilterFunc(ctx *FilterContext, n *state.Node, vol *v1.AntstorVolume) b

return true
}

func matchReservationFilter(ctx *FilterContext, n *state.Node, vol *v1.AntstorVolume) (pass, hasError bool) {
if resvId, has := vol.Annotations[v1.ReservationIDKey]; has {
free := n.FreeResource.Storage()
if free.CmpInt64(0) < 0 {
ctx.Error.AddReason(ReasonPoolFreeSize)
return false, true
}

if r, has := n.GetReservation(resvId); has {
if r.Size() < int64(vol.Spec.SizeByte) {
ctx.Error.AddReason(ReasonReservationSize)
return false, true
}
return true, false
}
}

return false, false
}
1 change: 1 addition & 0 deletions pkg/controller/manager/scheduler/filter/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ const (
ReasonNodeAffinity = "NodeAffinity"
ReasonPoolAffinity = "PoolAffinity"
ReasonPoolUnschedulable = "PoolUnschedulable"
ReasonReservationSize = "ReservationTooSmall"

NoStoragePoolAvailable = "NoStoragePoolAvailable"
//
Expand Down
43 changes: 33 additions & 10 deletions pkg/controller/manager/state/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,20 +13,31 @@ type NodeStateAPI struct {
KernelLVM *v1.KernelLVM `json:"kernelLVM,omitempty"`
SpdkLVS *v1.SpdkLVStore `json:"spdkLVS,omitempty"`
// Volumes breif info
Volumes []VolumeBreif `json:"volumes"`
// FreeSize of the pool
FreeSize int64 `json:"freeSize"`
Volumes []VolumeBrief `json:"volumes"`
// VgFreeSize of the pool
VgFreeSize int64 `json:"vgFreeSize"`
// MemFreeSize in controller memory
MemFreeSize int64 `json:"memFreeSize"`
// MemFreeSizeStr readable size in controller memory
MemFreeSizeStr string `json:"memFreeSizeStr"`
// Conditions of the pool status
Conditions map[v1.PoolConditionType]v1.ConditionStatus `json:"conditions"`
// Resvervations on the node
Resvervations []ReservationBreif `json:"reservations"`
}

type VolumeBreif struct {
type VolumeBrief struct {
Namespace string `json:"ns"`
Name string `json:"name"`
DataHolder string `json:"dataHolder"`
Size int64 `json:"size"`
}

type ReservationBreif struct {
ID string `json:"id"`
Size int64 `json:"size"`
}

func NewStateHandler(s StateIface) *StateHandler {
return &StateHandler{state: s}
}
Expand All @@ -49,11 +60,14 @@ func (h *StateHandler) ServeHTTP(writer http.ResponseWriter, req *http.Request)
}

var api = NodeStateAPI{
Name: spName,
PoolLabels: node.Pool.Labels,
KernelLVM: &node.Pool.Spec.KernelLVM,
SpdkLVS: &node.Pool.Spec.SpdkLVStore,
FreeSize: node.Pool.Status.VGFreeSize.Value(),
Name: spName,
PoolLabels: node.Pool.Labels,
KernelLVM: &node.Pool.Spec.KernelLVM,
SpdkLVS: &node.Pool.Spec.SpdkLVStore,
VgFreeSize: node.Pool.Status.VGFreeSize.Value(),
MemFreeSize: int64(node.FreeResource.Storage().AsApproximateFloat64()),
MemFreeSizeStr: node.FreeResource.Storage().String(),

Conditions: make(map[v1.PoolConditionType]v1.ConditionStatus),
}

Expand All @@ -62,14 +76,23 @@ func (h *StateHandler) ServeHTTP(writer http.ResponseWriter, req *http.Request)
}

for _, vol := range node.Volumes {
api.Volumes = append(api.Volumes, VolumeBreif{
api.Volumes = append(api.Volumes, VolumeBrief{
Namespace: vol.Namespace,
Name: vol.Name,
Size: int64(vol.Spec.SizeByte),
DataHolder: vol.Labels[v1.VolumeDataHolderKey],
})
}

if node.resvSet != nil {
for _, resv := range node.resvSet.Items() {
api.Resvervations = append(api.Resvervations, ReservationBreif{
ID: resv.ID(),
Size: resv.Size(),
})
}
}

bs, err := json.Marshal(api)
if err != nil {
writer.Write([]byte(err.Error()))
Expand Down
46 changes: 38 additions & 8 deletions pkg/controller/manager/state/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,13 @@ func (n *Node) AddVolume(vol *v1.AntstorVolume) (err error) {
defer n.volLock.Unlock()

var nodeID = n.Info.ID
var duplicate bool

// delete reservation if volume has reservation id
if resvID := getVolumeReservationID(vol); resvID != "" {
n.resvSet.Unreserve(resvID)
}

// check duplicate
for _, item := range n.Volumes {
if item.Name == vol.Name {
Expand All @@ -77,19 +84,16 @@ func (n *Node) AddVolume(vol *v1.AntstorVolume) (err error) {
// save the newer volume
*item = *vol.DeepCopy()
klog.Infof("vol %s already in node %s. type and sizes equal to each other", vol.Name, nodeID)
return
duplicate = true
}
}

// delete reservation if volume has reservation id
if resvID := getVolumeReservationID(vol); resvID != "" {
n.resvSet.Unreserve(resvID)
if !duplicate {
// volume reside on Node
vol.Spec.TargetNodeId = n.Pool.Spec.NodeInfo.ID
n.Volumes = append(n.Volumes, vol)
}

n.Volumes = append(n.Volumes, vol)
// volume reside on Node
vol.Spec.TargetNodeId = n.Pool.Spec.NodeInfo.ID

// update free resource
n.FreeResource = n.GetFreeResourceNonLock()

Expand Down Expand Up @@ -231,14 +235,40 @@ func (n *Node) GetFreeResourceNonLock() (free corev1.ResourceList) {

// Reserve storage resource for Node
func (n *Node) Reserve(r ReservationIface) {
// if volume is already binded, then skip reservation.
var resvID = r.ID()
for _, vol := range n.Volumes {
if resvID == getVolumeReservationID(vol) {
return
}
}

// check free resource
if free := n.FreeResource.Storage(); free != nil {
if free.CmpInt64(r.Size()) < 0 {
klog.Errorf("node %s have no enough disk pool space for reservation %s", n.Info.ID, resvID)
return
}
}

n.volLock.Lock()
defer n.volLock.Unlock()

n.resvSet.Reserve(r)
// update free resource
n.FreeResource = n.GetFreeResourceNonLock()
}

// Unreserve storage resource
func (n *Node) Unreserve(id string) {
n.volLock.Lock()
defer n.volLock.Unlock()

n.resvSet.Unreserve(id)
// update free resource
n.FreeResource = n.GetFreeResourceNonLock()
}

func (n *Node) GetReservation(id string) (r ReservationIface, has bool) {
return n.resvSet.GetById(id)
}
Loading

0 comments on commit acdbfc7

Please sign in to comment.