csi: refresh monitor IP
As part of status-report, update the monitor IP for the
connected provider cluster.

Signed-off-by: Madhu Rajanna <[email protected]>
Madhu-1 committed Jan 23, 2023
1 parent 3c83672 commit 7aae1c7
Showing 3 changed files with 63 additions and 32 deletions.
20 changes: 5 additions & 15 deletions controllers/storageclassclaim_controller.go
@@ -21,7 +21,6 @@ import (
 	"encoding/json"
 	"fmt"
 	"reflect"
-	"strings"
 	"time"
 
 	v1alpha1 "github.com/red-hat-storage/ocs-client-operator/api/v1alpha1"
@@ -298,20 +297,11 @@ func (r *StorageClassClaimReconciler) reconcilePhases() (reconcile.Result, error
 	}
 	for _, eResource := range scResponse.ExternalResource {
 		if eResource.Kind == "ConfigMap" && eResource.Name == "rook-ceph-mon-endpoints" {
-			data := map[string]string{}
-			err = json.Unmarshal(eResource.Data, &data)
+			monitorIps, err := csi.ExtractMonitor(eResource.Data)
 			if err != nil {
-				return reconcile.Result{}, fmt.Errorf("failed to unmarshal data: %v", err)
+				return reconcile.Result{}, fmt.Errorf("failed to extract monitor data: %v", err)
 			}
-			// Ip will be in the format of "b=172.30.60.238:6789","c=172.30.162.124:6789","a=172.30.1.100:6789"
-			monIPs := strings.Split(data["data"], ",")
-			for _, monIP := range monIPs {
-				ip := strings.Split(monIP, "=")
-				if len(ip) != 2 {
-					return reconcile.Result{}, fmt.Errorf("invalid mon ips: %s", monIPs)
-				}
-				csiClusterConfigEntry.Monitors = append(csiClusterConfigEntry.Monitors, ip[1])
-			}
+			csiClusterConfigEntry.Monitors = append(csiClusterConfigEntry.Monitors, monitorIps...)
 		}
 	}
 	// Go over the received objects and operate on them accordingly.
@@ -389,7 +379,7 @@ func (r *StorageClassClaimReconciler) reconcilePhases() (reconcile.Result, error
 	}
 
 	// update monitor configuration for cephcsi
-	err = csi.UpdateMonConfigMap(r.ctx, r.Client, r.log, r.storageClassClaim.Name, csiClusterConfigEntry)
+	err = csi.UpdateMonConfigMap(r.ctx, r.Client, r.log, r.storageClassClaim.Name, r.storageClient.Status.ConsumerID, csiClusterConfigEntry)
 	if err != nil {
 		return reconcile.Result{}, fmt.Errorf("failed to update mon configmap: %v", err)
 	}
@@ -422,7 +412,7 @@ func (r *StorageClassClaimReconciler) reconcilePhases() (reconcile.Result, error
 	}
 
 	// Delete configmap entry for cephcsi
-	err = csi.UpdateMonConfigMap(r.ctx, r.Client, r.log, r.storageClassClaim.Name, nil)
+	err = csi.UpdateMonConfigMap(r.ctx, r.Client, r.log, r.storageClassClaim.Name, r.storageClient.Status.ConsumerID, nil)
 	if err != nil {
 		return reconcile.Result{}, fmt.Errorf("failed to update mon configmap: %v", err)
 	}
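Note: UpdateMonConfigMap now receives the storage client's ConsumerID alongside the claim name, so each entry written to the csi mon ConfigMap's config.json records which storage client owns it. With the StorageClientID field added below in csi/monconfigmap.go, an entry looks roughly like this (values are illustrative placeholders):

	[
	  {
	    "clusterID": "<storageclassclaim-name>",
	    "storageClientID": "<consumer-id>",
	    "monitors": ["172.30.60.238:6789", "172.30.162.124:6789", "172.30.1.100:6789"]
	  }
	]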
39 changes: 30 additions & 9 deletions csi/monconfigmap.go
@@ -18,6 +18,8 @@ package csi
 import (
 	"context"
 	"encoding/json"
+	"fmt"
+	"strings"
 	"sync"
 
 	"github.com/pkg/errors"
@@ -40,9 +42,10 @@ var (
 )
 
 type ClusterConfigEntry struct {
-	ClusterID string      `json:"clusterID"`
-	Monitors  []string    `json:"monitors"`
-	CephFS    *CephFSSpec `json:"cephFS,omitempty"`
+	ClusterID       string      `json:"clusterID"`
+	StorageClientID string      `json:"storageClientID"`
+	Monitors        []string    `json:"monitors"`
+	CephFS          *CephFSSpec `json:"cephFS,omitempty"`
 }
 
 type CephFSSpec struct {
@@ -70,7 +73,7 @@ func formatCsiClusterConfig(cc csiClusterConfig) (string, error) {
 
 // updateCsiClusterConfig returns a json-formatted string containing
 // the cluster-to-mon mapping required to configure ceph csi.
-func updateCsiClusterConfig(curr, clusterKey string, newClusterConfigEntry *ClusterConfigEntry) (string, error) {
+func updateCsiClusterConfig(curr, clusterKey, storageClientID string, newClusterConfigEntry *ClusterConfigEntry) (string, error) {
 	var (
 		cc     csiClusterConfig
 		centry ClusterConfigEntry
@@ -89,8 +92,7 @@ func updateCsiClusterConfig(curr, clusterKey string, newClusterConfigEntry *Clus
 	if newClusterConfigEntry != nil {
 		for i, centry := range cc {
 			// If the clusterID belongs to the same cluster, update the entry.
-			// update default clusterID's entry
-			if clusterKey == newClusterConfigEntry.ClusterID {
+			if storageClientID == cc[i].StorageClientID || clusterKey == newClusterConfigEntry.ClusterID {
 				centry.Monitors = newClusterConfigEntry.Monitors
 				cc[i] = centry
 			}
@@ -116,7 +118,7 @@ func updateCsiClusterConfig(curr, clusterKey string, newClusterConfigEntry *Clus
 	if !found {
 		// If it's the first time we create the cluster, the entry does not exist, so the removal
 		// will fail with a dangling pointer
-		if newClusterConfigEntry != nil {
+		if newClusterConfigEntry != nil && clusterKey != "" {
 			centry.ClusterID = clusterKey
 			centry.Monitors = newClusterConfigEntry.Monitors
 			// Add a condition not to fill with empty values
@@ -162,7 +164,7 @@ func createMonConfigMap(ctx context.Context, c client.Client, ownerDep *appsv1.D
 // value that is provided to ceph-csi uses in the storage class.
 // The locker l is typically a mutex and is used to prevent the config
 // map from being updated for multiple clusters simultaneously.
-func UpdateMonConfigMap(ctx context.Context, c client.Client, log klog.Logger, clusterID string, newClusterConfigEntry *ClusterConfigEntry) error {
+func UpdateMonConfigMap(ctx context.Context, c client.Client, log klog.Logger, clusterID, storageClientID string, newClusterConfigEntry *ClusterConfigEntry) error {
 	ConfigKey := "config.json"
 	configMap := &corev1.ConfigMap{
 		ObjectMeta: metav1.ObjectMeta{
@@ -185,7 +187,7 @@ func UpdateMonConfigMap(ctx context.Context, c client.Client, log klog.Logger, c
 
 	// update ConfigMap contents for current cluster
 	currData := configMap.Data[ConfigKey]
-	newData, err := updateCsiClusterConfig(currData, clusterID, newClusterConfigEntry)
+	newData, err := updateCsiClusterConfig(currData, clusterID, storageClientID, newClusterConfigEntry)
 	if err != nil {
 		return errors.Wrap(err, "failed to update csi config map data")
 	}
@@ -201,3 +203,22 @@ func UpdateMonConfigMap(ctx context.Context, c client.Client, log klog.Logger, c
 
 	return nil
 }
+
+func ExtractMonitor(monitorData []byte) ([]string, error) {
+	data := map[string]string{}
+	monitorIPs := []string{}
+	err := json.Unmarshal(monitorData, &data)
+	if err != nil {
+		return nil, fmt.Errorf("failed to unmarshal data: %v", err)
+	}
+	// Ip will be in the format of "b=172.30.60.238:6789","c=172.30.162.124:6789","a=172.30.1.100:6789"
+	monIPs := strings.Split(data["data"], ",")
+	for _, monIP := range monIPs {
+		ip := strings.Split(monIP, "=")
+		if len(ip) != 2 {
+			return nil, fmt.Errorf("invalid mon ips: %s", monIPs)
+		}
+		monitorIPs = append(monitorIPs, ip[1])
+	}
+	return monitorIPs, nil
+}
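Note: a minimal standalone sketch of how the new csi.ExtractMonitor helper behaves, assuming a rook-ceph-mon-endpoints payload whose "data" field holds comma-separated "<id>=<ip:port>" pairs (the sample payload below is made up):

package main

import (
	"fmt"

	"github.com/red-hat-storage/ocs-client-operator/csi"
)

func main() {
	// "data" carries one "<id>=<ip:port>" entry per monitor.
	payload := []byte(`{"data":"a=172.30.1.100:6789,b=172.30.60.238:6789"}`)
	monitors, err := csi.ExtractMonitor(payload)
	if err != nil {
		panic(err)
	}
	fmt.Println(monitors) // [172.30.1.100:6789 172.30.60.238:6789]
}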
36 changes: 28 additions & 8 deletions status-report/main.go
@@ -21,7 +21,9 @@ import (
 	"os"
 	"time"
 
+	"github.com/go-logr/logr"
 	"github.com/red-hat-storage/ocs-client-operator/api/v1alpha1"
+	"github.com/red-hat-storage/ocs-client-operator/csi"
 	"github.com/red-hat-storage/ocs-client-operator/pkg/utils"
 
 	providerclient "github.com/red-hat-storage/ocs-operator/services/provider/client"
@@ -59,26 +61,44 @@ func main() {
 		klog.Exitf("%s env var not set", utils.StorageClientNameEnvVar)
 	}
 
-	storageclient := &v1alpha1.StorageClient{}
-	storageclient.Name = storageClientName
-	storageclient.Namespace = storageClientNamespace
+	storageClient := &v1alpha1.StorageClient{}
+	storageClient.Name = storageClientName
+	storageClient.Namespace = storageClientNamespace
 
-	if err = cl.Get(ctx, types.NamespacedName{Name: storageclient.Name, Namespace: storageclient.Namespace}, storageclient); err != nil {
-		klog.Exitf("Failed to get storageClient %q/%q: %v", storageclient.Namespace, storageclient.Name, err)
+	if err = cl.Get(ctx, types.NamespacedName{Name: storageClient.Name, Namespace: storageClient.Namespace}, storageClient); err != nil {
+		klog.Exitf("Failed to get storageClient %q/%q: %v", storageClient.Namespace, storageClient.Name, err)
 	}
 
 	providerClient, err := providerclient.NewProviderClient(
 		ctx,
-		storageclient.Spec.StorageProviderEndpoint,
+		storageClient.Spec.StorageProviderEndpoint,
 		10*time.Second,
 	)
 	if err != nil {
 		klog.Exitf("Failed to create grpc client: %v", err)
 	}
 	defer providerClient.Close()
 
-	if _, err = providerClient.ReportStatus(ctx, storageclient.Status.ConsumerID); err != nil {
-		klog.Exitf("Failed to update lastHeartbeat of storageClient %v: %v", storageclient.Status.ConsumerID, err)
+	if _, err = providerClient.ReportStatus(ctx, storageClient.Status.ConsumerID); err != nil {
+		klog.Exitf("Failed to update lastHeartbeat of storageClient %v: %v", storageClient.Status.ConsumerID, err)
 	}
+
+	var csiClusterConfigEntry = new(csi.ClusterConfigEntry)
+	scResponse, err := providerClient.GetStorageConfig(ctx, storageClient.Status.ConsumerID)
+	if err != nil {
+		klog.Exitf("Failed to get StorageConfig of storageClient %v: %v", storageClient.Status.ConsumerID, err)
+	}
+	for _, eResource := range scResponse.ExternalResource {
+		if eResource.Kind == "ConfigMap" && eResource.Name == "rook-ceph-mon-endpoints" {
+			monitorIps, err := csi.ExtractMonitor(eResource.Data)
+			if err != nil {
+				klog.Exitf("Failed to extract monitor data for storageClient %v: %v", storageClient.Status.ConsumerID, err)
+			}
+			csiClusterConfigEntry.Monitors = append(csiClusterConfigEntry.Monitors, monitorIps...)
+		}
+	}
+	err = csi.UpdateMonConfigMap(ctx, cl, logr.FromContextOrDiscard(ctx), "", storageClient.Status.ConsumerID, csiClusterConfigEntry)
+	if err != nil {
+		klog.Exitf("Failed to update mon configmap for storageClient %v: %v", storageClient.Status.ConsumerID, err)
+	}
 }
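Note: a rough sketch (not the commit's code) of why this status-report path can pass an empty clusterID to UpdateMonConfigMap: entries are now matched by storageClientID, and the added clusterKey != "" guard keeps this refresh-only path from appending a new entry with an empty clusterID. The refresh function below only approximates updateCsiClusterConfig, and all names and values are hypothetical:

package main

import "fmt"

type entry struct {
	ClusterID       string
	StorageClientID string
	Monitors        []string
}

// refresh approximates the update rule: overwrite monitors on a
// match, and only append a brand-new entry when clusterKey is non-empty.
func refresh(cc []entry, clusterKey, storageClientID string, monitors []string) []entry {
	found := false
	for i := range cc {
		if cc[i].StorageClientID == storageClientID || (clusterKey != "" && cc[i].ClusterID == clusterKey) {
			cc[i].Monitors = monitors
			found = true
		}
	}
	if !found && clusterKey != "" {
		cc = append(cc, entry{clusterKey, storageClientID, monitors})
	}
	return cc
}

func main() {
	cc := []entry{{"claim-a", "consumer-1", []string{"172.30.1.100:6789"}}}
	// status-report path: empty clusterKey, match by storage client ID only.
	fmt.Println(refresh(cc, "", "consumer-1", []string{"172.30.60.238:6789"}))
}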
