Skip to content

Commit

Permalink
Merge pull request #127 from Leaseweb/operationlock
Browse files Browse the repository at this point in the history
fix: Fix the way operation locks are implemented, improve volume expand
  • Loading branch information
hrak authored Jun 20, 2024
2 parents f222b54 + 266a62f commit 40fbd87
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 25 deletions.
48 changes: 36 additions & 12 deletions pkg/driver/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,23 @@ var onlyVolumeCapAccessMode = csi.VolumeCapability_AccessMode{

type controllerServer struct {
csi.UnimplementedControllerServer
connector cloud.Interface
// connector is the CloudStack client interface
connector cloud.Interface

// A map storing all volumes with ongoing operations so that additional operations
// for that same volume (as defined by VolumeID/volume name) return an Aborted error
volumeLocks *util.VolumeLocks

// A map storing all volumes/snapshots with ongoing operations.
operationLocks *util.OperationLock
}

// NewControllerServer creates a new Controller gRPC server.
func NewControllerServer(connector cloud.Interface) csi.ControllerServer {
return &controllerServer{
connector: connector,
volumeLocks: util.NewVolumeLocks(),
connector: connector,
volumeLocks: util.NewVolumeLocks(),
operationLocks: util.NewOperationLock(),
}
}

Expand Down Expand Up @@ -237,6 +245,14 @@ func (cs *controllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol
}
defer cs.volumeLocks.Release(volumeID)

// lock out volumeID for clone and expand operation
if err := cs.operationLocks.GetDeleteLock(volumeID); err != nil {
logger.Error(err, "Failed to acquire delete operation lock")

return nil, status.Error(codes.Aborted, err.Error())
}
defer cs.operationLocks.ReleaseDeleteLock(volumeID)

logger.Info("Deleting volume",
"volumeID", volumeID,
)
Expand Down Expand Up @@ -448,25 +464,24 @@ func (cs *controllerServer) ControllerExpandVolume(ctx context.Context, req *csi
logger := klog.FromContext(ctx)
logger.V(6).Info("ControllerExpandVolume: called", "args", protosanitizer.StripSecrets(*req))

expandVolumeLock := util.NewOperationLock(ctx)

volumeID := req.GetVolumeId()
if len(volumeID) == 0 {
return nil, status.Error(codes.InvalidArgument, "Volume ID not provided")
}
err := expandVolumeLock.GetExpandLock(volumeID)
if err != nil {
logger.Error(err, "failed acquiring expand lock", "volumeID", volumeID)

return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, volumeID)
}
defer expandVolumeLock.ReleaseExpandLock(volumeID)

capRange := req.GetCapacityRange()
if capRange == nil {
return nil, status.Error(codes.InvalidArgument, "Capacity range not provided")
}

// lock out parallel requests against the same volume ID
if acquired := cs.volumeLocks.TryAcquire(volumeID); !acquired {
logger.Error(errors.New(util.ErrVolumeOperationAlreadyExistsVolumeID), "failed to acquire volume lock", "volumeID", volumeID)

return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, volumeID)
}
defer cs.volumeLocks.Release(volumeID)

volSizeBytes := capRange.GetRequiredBytes()
volSizeGB := util.RoundUpBytesToGB(volSizeBytes)
maxVolSize := capRange.GetLimitBytes()
Expand Down Expand Up @@ -496,6 +511,15 @@ func (cs *controllerServer) ControllerExpandVolume(ctx context.Context, req *csi
NodeExpansionRequired: true,
}, nil
}

// lock out volumeID for clone and delete operation
if err := cs.operationLocks.GetExpandLock(volumeID); err != nil {
logger.Error(err, "failed acquiring expand lock", "volumeID", volumeID)

return nil, status.Error(codes.Aborted, err.Error())
}
defer cs.operationLocks.ReleaseExpandLock(volumeID)

err = cs.connector.ExpandVolume(ctx, volumeID, volSizeGB)
if err != nil {
return nil, status.Errorf(codes.Internal, "Could not resize volume %q to size %v: %v", volumeID, volSizeGB, err)
Expand Down
21 changes: 15 additions & 6 deletions pkg/driver/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -403,7 +403,14 @@ func (ns *nodeServer) NodeExpandVolume(ctx context.Context, req *csi.NodeExpandV
if len(volumeID) == 0 {
return nil, status.Error(codes.InvalidArgument, "Volume ID not provided")
}
volumePath := req.GetVolumePath()

// Get volume path
// This should work for Kubernetes >= 1.26, see https://github.com/kubernetes/kubernetes/issues/115343
volumePath := req.GetStagingTargetPath()
if volumePath == "" {
// Except that it doesn't work in the sanity test, so we need a fallback to volumePath.
volumePath = req.GetVolumePath()
}
if len(volumePath) == 0 {
return nil, status.Error(codes.InvalidArgument, "Volume path not provided")
}
Expand All @@ -418,6 +425,13 @@ func (ns *nodeServer) NodeExpandVolume(ctx context.Context, req *csi.NodeExpandV
}
}

if acquired := ns.volumeLocks.TryAcquire(volumeID); !acquired {
logger.Error(errors.New(util.ErrVolumeOperationAlreadyExistsVolumeID), "failed to acquire volume lock", "volumeID", volumeID)

return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, volumeID)
}
defer ns.volumeLocks.Release(volumeID)

_, err := ns.connector.GetVolumeByID(ctx, volumeID)
if err != nil {
if errors.Is(err, cloud.ErrNotFound) {
Expand All @@ -427,11 +441,6 @@ func (ns *nodeServer) NodeExpandVolume(ctx context.Context, req *csi.NodeExpandV
return nil, status.Error(codes.Internal, fmt.Sprintf("NodeExpandVolume failed with error %v", err))
}

_, err = ns.mounter.GetMountRefs(volumePath)
if err != nil {
return nil, status.Error(codes.Internal, fmt.Sprintf("Failed to find mount file system %s: %v", volumePath, err))
}

devicePath, err := ns.mounter.GetDevicePath(ctx, volumeID)
if devicePath == "" {
return nil, status.Error(codes.Internal, fmt.Sprintf("Unable to find Device path for volume %s: %v", volumeID, err))
Expand Down
6 changes: 1 addition & 5 deletions pkg/util/idlocker.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ limitations under the License.
package util

import (
"context"
"fmt"
"sync"

Expand Down Expand Up @@ -95,12 +94,10 @@ type OperationLock struct {
locks map[operation]map[string]int
// lock to avoid concurrent operation on map
mux sync.Mutex
// context for logging
ctx context.Context //nolint:containedctx
}

// NewOperationLock returns new OperationLock.
func NewOperationLock(ctx context.Context) *OperationLock {
func NewOperationLock() *OperationLock {
lock := make(map[operation]map[string]int)
lock[createOp] = make(map[string]int)
lock[deleteOp] = make(map[string]int)
Expand All @@ -110,7 +107,6 @@ func NewOperationLock(ctx context.Context) *OperationLock {

return &OperationLock{
locks: lock,
ctx: ctx,
}
}

Expand Down
3 changes: 1 addition & 2 deletions pkg/util/idlocker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ limitations under the License.
package util

import (
"context"
"testing"
)

Expand Down Expand Up @@ -56,7 +55,7 @@ func TestIDLocker(t *testing.T) {
func TestOperationLocks(t *testing.T) {
t.Parallel()
volumeID := "test-vol"
lock := NewOperationLock(context.Background())
lock := NewOperationLock()
err := lock.GetCloneLock(volumeID)
if err != nil {
t.Errorf("failed to acquire clone lock for %s %s", volumeID, err)
Expand Down

0 comments on commit 40fbd87

Please sign in to comment.