From e116c0c150649b715fb3dc59a7ee83d33a029823 Mon Sep 17 00:00:00 2001 From: Amit Roushan Date: Thu, 2 Sep 2021 22:25:52 +0530 Subject: [PATCH] added code changes for protocol topological awarness --- src/csi/backend/backend.go | 106 +++++++++++++++++++++++--------- src/csi/driver/controller.go | 58 +++++++++-------- src/csi/main.go | 4 +- src/utils/k8sutils/k8s_utils.go | 94 ++++++++++++++++++++++++++++ src/utils/k8sutils/k8sutils.go | 75 ---------------------- 5 files changed, 204 insertions(+), 133 deletions(-) create mode 100644 src/utils/k8sutils/k8s_utils.go delete mode 100644 src/utils/k8sutils/k8sutils.go diff --git a/src/csi/backend/backend.go b/src/csi/backend/backend.go index 314f0b58..6d192f43 100644 --- a/src/csi/backend/backend.go +++ b/src/csi/backend/backend.go @@ -8,12 +8,15 @@ import ( "regexp" "sync" "utils" + "utils/k8sutils" "utils/log" ) const ( + // TopologyRequirement constant for topology filter function TopologyRequirement = "topologyRequirement" - SupportedTopologies = "supportedTopologies" + // supported topology key in CSI plugin configuration + supportedTopologiesKey = "supportedTopologies" ) var ( @@ -39,6 +42,7 @@ var ( } ) +// AccessibleTopology represents selected node topology type AccessibleTopology struct { RequisiteTopologies []map[string]string PreferredTopologies []map[string]string @@ -122,26 +126,10 @@ func newBackend(backendName string, config map[string]interface{}) (*Backend, er return nil, errors.New("parameters must be configured for backend") } - supportedTopologies := make([]map[string]string, 0) - if topologies, exist := config[SupportedTopologies]; exist { - topologyArray, ok := topologies.([]interface{}) - if !ok { - return nil, errors.New("invalid supported topologies configuration") - } - for _, topologyArrElem := range topologyArray { - topologyMap, ok := topologyArrElem.(map[string]interface{}) - if !ok { - return nil, errors.New("invalid supported topologies configuration") - } - tempMap := make(map[string]string, 0) - for topologyKey, value := range topologyMap { - if topologyValue, ok := value.(string); ok { - tempMap[topologyKey] = topologyValue - } - } - - supportedTopologies = append(supportedTopologies, tempMap) - } + // Get supported topologies for backend + supportedTopologies, err := getSupportedTopologies(config) + if err != nil { + return nil, err } plugin := plugin.GetPlugin(storage) @@ -174,6 +162,50 @@ func newBackend(backendName string, config map[string]interface{}) (*Backend, er }, nil } +func getSupportedTopologies(config map[string]interface{}) ([]map[string]string, error) { + supportedTopologies := make([]map[string]string, 0) + + topologies, exist := config[supportedTopologiesKey] + if !exist { + return supportedTopologies, nil + } + + // populate configured topologies + topologyArray, ok := topologies.([]interface{}) + if !ok { + return nil, errors.New("invalid supported topologies configuration") + } + for _, topologyArrElem := range topologyArray { + topologyMap, ok := topologyArrElem.(map[string]interface{}) + if !ok { + return nil, errors.New("invalid supported topologies configuration") + } + tempMap := make(map[string]string, 0) + for topologyKey, value := range topologyMap { + if topologyValue, ok := value.(string); ok { + tempMap[topologyKey] = topologyValue + } + } + supportedTopologies = append(supportedTopologies, tempMap) + } + + return supportedTopologies, nil +} + +// addProtocolTopology add up protocol specific topological support +func addProtocolTopology(backend *Backend, driverName string) { + proto, protocolAvailable := backend.Parameters["protocol"] + if protocol, isString := proto.(string); protocolAvailable && isString { + backend.SupportedTopologies = append(backend.SupportedTopologies, map[string]string{ + k8sutils.TopologyPrefix + "/protocol." + protocol: driverName, + }) + return + } + + log.Warningf("supported topology for protocol may not work as protocol is miss configured " + + "in backend configuration") +} + func analyzeBackend(config map[string]interface{}) (*Backend, error) { backendName, exist := config["name"].(string) if !exist { @@ -245,7 +277,7 @@ func updateReplicaBackends() { } } -func RegisterBackend(backendConfigs []map[string]interface{}, keepLogin bool) error { +func RegisterBackend(backendConfigs []map[string]interface{}, keepLogin bool, driverName string) error { for _, i := range backendConfigs { backend, err := analyzeBackend(i) if err != nil { @@ -259,6 +291,18 @@ func RegisterBackend(backendConfigs []map[string]interface{}, keepLogin bool) er return err } + // Note: Protocol is considered as special topological parameter. The protocol topology + // is populated internally by plugin using protocol name. + // If configured protocol for backend is "iscsi", CSI plugin internally add + // topology.kubernetes.io/protocol.iscsi = csi.huawei.com in supportedTopologies. + // + // Now users can opt to provision volumes based on protocol by + // 1. Labeling kubernetes nodes with protocol specific label (ie topology.kubernetes.io/protocol.iscsi = csi.huawei.com) + // 2. Configure topology support in plugin + // 3. Configure protocol topology in allowedTopologies fo Storage class + // addProtocolTopology is called after backend plugin init as init takes care of protocol validation + addProtocolTopology(backend, driverName) + csiBackends[backend.Name] = backend } @@ -590,15 +634,17 @@ func filterPoolsOnTopology(candidatePools []*StoragePool, requisiteTopologies [] continue } - if len(backend.SupportedTopologies) > 0 { - for _, topology := range requisiteTopologies { - if isTopologySupportedByBackend(backend, topology) { - filteredPools = append(filteredPools, pool) - break - } - } - } else { + // when backend is not configured with supported topology + if len(backend.SupportedTopologies) == 0 { filteredPools = append(filteredPools, pool) + continue + } + + for _, topology := range requisiteTopologies { + if isTopologySupportedByBackend(backend, topology) { + filteredPools = append(filteredPools, pool) + break + } } } diff --git a/src/csi/driver/controller.go b/src/csi/driver/controller.go index 8c38ae17..274c1715 100644 --- a/src/csi/driver/controller.go +++ b/src/csi/driver/controller.go @@ -56,32 +56,8 @@ func (d *Driver) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) } } - // Get topology requirements - accessibleTopology := req.GetAccessibilityRequirements() - if accessibleTopology != nil { - var requisiteTopologies = make([]map[string]string, 0) - for _, requisite := range accessibleTopology.GetRequisite() { - requirement := make(map[string]string) - for k, v := range requisite.GetSegments() { - requirement[k] = v - } - requisiteTopologies = append(requisiteTopologies, requirement) - } - - var preferredTopologies = make([]map[string]string, 0) - for _, preferred := range accessibleTopology.GetPreferred() { - preference := make(map[string]string) - for k, v := range preferred.GetSegments() { - preference[k] = v - } - preferredTopologies = append(preferredTopologies, preference) - } - - parameters[backend.TopologyRequirement] = backend.AccessibleTopology{ - RequisiteTopologies: requisiteTopologies, - PreferredTopologies: preferredTopologies, - } - } + // process accessibility requirements + d.processAccessibilityRequirements(req, parameters) localPool, remotePool, err := backend.SelectStoragePool(size, parameters) if err != nil { @@ -128,6 +104,36 @@ func (d *Driver) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) }, nil } +func (d *Driver) processAccessibilityRequirements(req *csi.CreateVolumeRequest, parameters map[string]interface{}) { + accessibleTopology := req.GetAccessibilityRequirements() + if accessibleTopology == nil { + return + } + + var requisiteTopologies = make([]map[string]string, 0) + for _, requisite := range accessibleTopology.GetRequisite() { + requirement := make(map[string]string) + for k, v := range requisite.GetSegments() { + requirement[k] = v + } + requisiteTopologies = append(requisiteTopologies, requirement) + } + + var preferredTopologies = make([]map[string]string, 0) + for _, preferred := range accessibleTopology.GetPreferred() { + preference := make(map[string]string) + for k, v := range preferred.GetSegments() { + preference[k] = v + } + preferredTopologies = append(preferredTopologies, preference) + } + + parameters[backend.TopologyRequirement] = backend.AccessibleTopology{ + RequisiteTopologies: requisiteTopologies, + PreferredTopologies: preferredTopologies, + } +} + func (d *Driver) DeleteVolume(ctx context.Context, req *csi.DeleteVolumeRequest) (*csi.DeleteVolumeResponse, error) { volumeId := req.GetVolumeId() diff --git a/src/csi/main.go b/src/csi/main.go index 5f781d14..8f819f65 100644 --- a/src/csi/main.go +++ b/src/csi/main.go @@ -182,14 +182,14 @@ func main() { }() if *controller || *controllerFlagFile != "" { - err := backend.RegisterBackend(config.Backends, true) + err := backend.RegisterBackend(config.Backends, true, *driverName) if err != nil { log.Fatalf("Register backends error: %v", err) } go updateBackendCapabilities() } else { - err := backend.RegisterBackend(config.Backends, false) + err := backend.RegisterBackend(config.Backends, false, *driverName) if err != nil { log.Fatalf("Register backends error: %v", err) } diff --git a/src/utils/k8sutils/k8s_utils.go b/src/utils/k8sutils/k8s_utils.go new file mode 100644 index 00000000..8a38ec75 --- /dev/null +++ b/src/utils/k8sutils/k8s_utils.go @@ -0,0 +1,94 @@ +/* + Copyright (c) Huawei Technologies Co., Ltd. 2021-2021. All rights reserved. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +// Package k8sutils provides Kubernetes utilities +package k8sutils + +import ( + "fmt" + "regexp" + + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/clientcmd" +) + +const ( + // TopologyPrefix supported by CSI plugin + TopologyPrefix = "topology.kubernetes.io" + topologyRegx = TopologyPrefix + "/.*" +) + +// Interface is a kubernetes utility interface required by CSI plugin to interact with Kubernetes +type Interface interface { + // GetNodeTopology returns configured kubernetes node's topological labels + GetNodeTopology(nodeName string) (map[string]string, error) +} + +type kubeClient struct { + clientSet *kubernetes.Clientset +} + +// NewK8SUtils returns an object of Kubernetes utility interface +func NewK8SUtils(kubeConfig string) (Interface, error) { + var clientset *kubernetes.Clientset + + if kubeConfig != "" { + config, err := clientcmd.BuildConfigFromFlags("", kubeConfig) + if err != nil { + return nil, err + } + + clientset, err = kubernetes.NewForConfig(config) + if err != nil { + return nil, err + } + } else { + config, err := rest.InClusterConfig() + if err != nil { + return nil, err + } + + clientset, err = kubernetes.NewForConfig(config) + if err != nil { + return nil, err + } + } + + return &kubeClient{ + clientSet: clientset, + }, nil +} + +func (k *kubeClient) GetNodeTopology(nodeName string) (map[string]string, error) { + k8sNode, err := k.getNode(nodeName) + if err != nil { + return nil, fmt.Errorf("failed to get node topology with error: %v", err) + } + + topology := make(map[string]string) + for key, value := range k8sNode.Labels { + if match, err := regexp.MatchString(topologyRegx, key); err == nil && match { + topology[key] = value + } + } + + return topology, nil +} + +func (k *kubeClient) getNode(nodeName string) (*corev1.Node, error) { + return k.clientSet.CoreV1().Nodes().Get(nodeName, metav1.GetOptions{}) +} diff --git a/src/utils/k8sutils/k8sutils.go b/src/utils/k8sutils/k8sutils.go deleted file mode 100644 index dff1deac..00000000 --- a/src/utils/k8sutils/k8sutils.go +++ /dev/null @@ -1,75 +0,0 @@ -package k8sutils - -import ( - "errors" - "fmt" - "regexp" - - corev1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/client-go/kubernetes" - "k8s.io/client-go/rest" - "k8s.io/client-go/tools/clientcmd" -) - -const ( - topologyRegx = "topology.kubernetes.io/*" -) - -type Interface interface { - GetNodeTopology(nodeName string) (map[string]string, error) -} - -type KubeClient struct { - clientSet *kubernetes.Clientset -} - -func NewK8SUtils(kubeConfig string) (Interface, error) { - var clientset *kubernetes.Clientset - - if kubeConfig != "" { - config, err := clientcmd.BuildConfigFromFlags("", kubeConfig) - if err != nil { - return nil, err - } - - clientset, err = kubernetes.NewForConfig(config) - if err != nil { - return nil, err - } - } else { - config, err := rest.InClusterConfig() - if err != nil { - return nil, err - } - - clientset, err = kubernetes.NewForConfig(config) - if err != nil { - return nil, err - } - } - - return &KubeClient{ - clientSet: clientset, - }, nil -} - -func (k *KubeClient) GetNodeTopology(nodeName string) (map[string]string, error) { - k8sNode, err := k.getNode(nodeName) - if err != nil { - return nil, errors.New(fmt.Sprintf("failed to get node topology with error: %v", err)) - } - - topology := make(map[string]string) - for key, value := range k8sNode.Labels { - if match, err := regexp.MatchString(topologyRegx, key); err == nil && match { - topology[key] = value - } - } - - return topology, nil -} - -func (k *KubeClient) getNode(nodeName string) (*corev1.Node, error) { - return k.clientSet.CoreV1().Nodes().Get(nodeName, metav1.GetOptions{}) -}