From 266a62fab8d8bf6fccb0076b550638d810e82d06 Mon Sep 17 00:00:00 2001 From: Hans Rakers Date: Wed, 19 Jun 2024 16:12:24 +0200 Subject: [PATCH] fix: Fix the way operation locks are implemented, improve volume expand --- pkg/driver/controller.go | 48 +++++++++++++++++++++++++++++---------- pkg/driver/node.go | 21 ++++++++++++----- pkg/util/idlocker.go | 6 +---- pkg/util/idlocker_test.go | 3 +-- 4 files changed, 53 insertions(+), 25 deletions(-) diff --git a/pkg/driver/controller.go b/pkg/driver/controller.go index 54b90a5..dbef2b4 100644 --- a/pkg/driver/controller.go +++ b/pkg/driver/controller.go @@ -26,15 +26,23 @@ var onlyVolumeCapAccessMode = csi.VolumeCapability_AccessMode{ type controllerServer struct { csi.UnimplementedControllerServer - connector cloud.Interface + // connector is the CloudStack client interface + connector cloud.Interface + + // A map storing all volumes with ongoing operations so that additional operations + // for that same volume (as defined by VolumeID/volume name) return an Aborted error volumeLocks *util.VolumeLocks + + // A map storing all volumes/snapshots with ongoing operations. + operationLocks *util.OperationLock } // NewControllerServer creates a new Controller gRPC server. func NewControllerServer(connector cloud.Interface) csi.ControllerServer { return &controllerServer{ - connector: connector, - volumeLocks: util.NewVolumeLocks(), + connector: connector, + volumeLocks: util.NewVolumeLocks(), + operationLocks: util.NewOperationLock(), } } @@ -237,6 +245,14 @@ func (cs *controllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol } defer cs.volumeLocks.Release(volumeID) + // lock out volumeID for clone and expand operation + if err := cs.operationLocks.GetDeleteLock(volumeID); err != nil { + logger.Error(err, "Failed to acquire delete operation lock") + + return nil, status.Error(codes.Aborted, err.Error()) + } + defer cs.operationLocks.ReleaseDeleteLock(volumeID) + logger.Info("Deleting volume", "volumeID", volumeID, ) @@ -448,25 +464,24 @@ func (cs *controllerServer) ControllerExpandVolume(ctx context.Context, req *csi logger := klog.FromContext(ctx) logger.V(6).Info("ControllerExpandVolume: called", "args", protosanitizer.StripSecrets(*req)) - expandVolumeLock := util.NewOperationLock(ctx) - volumeID := req.GetVolumeId() if len(volumeID) == 0 { return nil, status.Error(codes.InvalidArgument, "Volume ID not provided") } - err := expandVolumeLock.GetExpandLock(volumeID) - if err != nil { - logger.Error(err, "failed acquiring expand lock", "volumeID", volumeID) - - return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, volumeID) - } - defer expandVolumeLock.ReleaseExpandLock(volumeID) capRange := req.GetCapacityRange() if capRange == nil { return nil, status.Error(codes.InvalidArgument, "Capacity range not provided") } + // lock out parallel requests against the same volume ID + if acquired := cs.volumeLocks.TryAcquire(volumeID); !acquired { + logger.Error(errors.New(util.ErrVolumeOperationAlreadyExistsVolumeID), "failed to acquire volume lock", "volumeID", volumeID) + + return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, volumeID) + } + defer cs.volumeLocks.Release(volumeID) + volSizeBytes := capRange.GetRequiredBytes() volSizeGB := util.RoundUpBytesToGB(volSizeBytes) maxVolSize := capRange.GetLimitBytes() @@ -496,6 +511,15 @@ func (cs *controllerServer) ControllerExpandVolume(ctx context.Context, req *csi NodeExpansionRequired: true, }, nil } + + // lock out volumeID for clone and delete operation + if err := cs.operationLocks.GetExpandLock(volumeID); err != nil { + logger.Error(err, "failed acquiring expand lock", "volumeID", volumeID) + + return nil, status.Error(codes.Aborted, err.Error()) + } + defer cs.operationLocks.ReleaseExpandLock(volumeID) + err = cs.connector.ExpandVolume(ctx, volumeID, volSizeGB) if err != nil { return nil, status.Errorf(codes.Internal, "Could not resize volume %q to size %v: %v", volumeID, volSizeGB, err) diff --git a/pkg/driver/node.go b/pkg/driver/node.go index 64f83de..fac21ab 100644 --- a/pkg/driver/node.go +++ b/pkg/driver/node.go @@ -403,7 +403,14 @@ func (ns *nodeServer) NodeExpandVolume(ctx context.Context, req *csi.NodeExpandV if len(volumeID) == 0 { return nil, status.Error(codes.InvalidArgument, "Volume ID not provided") } - volumePath := req.GetVolumePath() + + // Get volume path + // This should work for Kubernetes >= 1.26, see https://github.com/kubernetes/kubernetes/issues/115343 + volumePath := req.GetStagingTargetPath() + if volumePath == "" { + // Except that it doesn't work in the sanity test, so we need a fallback to volumePath. + volumePath = req.GetVolumePath() + } if len(volumePath) == 0 { return nil, status.Error(codes.InvalidArgument, "Volume path not provided") } @@ -418,6 +425,13 @@ func (ns *nodeServer) NodeExpandVolume(ctx context.Context, req *csi.NodeExpandV } } + if acquired := ns.volumeLocks.TryAcquire(volumeID); !acquired { + logger.Error(errors.New(util.ErrVolumeOperationAlreadyExistsVolumeID), "failed to acquire volume lock", "volumeID", volumeID) + + return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, volumeID) + } + defer ns.volumeLocks.Release(volumeID) + _, err := ns.connector.GetVolumeByID(ctx, volumeID) if err != nil { if errors.Is(err, cloud.ErrNotFound) { @@ -427,11 +441,6 @@ func (ns *nodeServer) NodeExpandVolume(ctx context.Context, req *csi.NodeExpandV return nil, status.Error(codes.Internal, fmt.Sprintf("NodeExpandVolume failed with error %v", err)) } - _, err = ns.mounter.GetMountRefs(volumePath) - if err != nil { - return nil, status.Error(codes.Internal, fmt.Sprintf("Failed to find mount file system %s: %v", volumePath, err)) - } - devicePath, err := ns.mounter.GetDevicePath(ctx, volumeID) if devicePath == "" { return nil, status.Error(codes.Internal, fmt.Sprintf("Unable to find Device path for volume %s: %v", volumeID, err)) diff --git a/pkg/util/idlocker.go b/pkg/util/idlocker.go index f60fabf..619a3ae 100644 --- a/pkg/util/idlocker.go +++ b/pkg/util/idlocker.go @@ -14,7 +14,6 @@ limitations under the License. package util import ( - "context" "fmt" "sync" @@ -95,12 +94,10 @@ type OperationLock struct { locks map[operation]map[string]int // lock to avoid concurrent operation on map mux sync.Mutex - // context for logging - ctx context.Context //nolint:containedctx } // NewOperationLock returns new OperationLock. -func NewOperationLock(ctx context.Context) *OperationLock { +func NewOperationLock() *OperationLock { lock := make(map[operation]map[string]int) lock[createOp] = make(map[string]int) lock[deleteOp] = make(map[string]int) @@ -110,7 +107,6 @@ func NewOperationLock(ctx context.Context) *OperationLock { return &OperationLock{ locks: lock, - ctx: ctx, } } diff --git a/pkg/util/idlocker_test.go b/pkg/util/idlocker_test.go index f89efc5..365e841 100644 --- a/pkg/util/idlocker_test.go +++ b/pkg/util/idlocker_test.go @@ -17,7 +17,6 @@ limitations under the License. package util import ( - "context" "testing" ) @@ -56,7 +55,7 @@ func TestIDLocker(t *testing.T) { func TestOperationLocks(t *testing.T) { t.Parallel() volumeID := "test-vol" - lock := NewOperationLock(context.Background()) + lock := NewOperationLock() err := lock.GetCloneLock(volumeID) if err != nil { t.Errorf("failed to acquire clone lock for %s %s", volumeID, err)