From b3a4f510e6a3d834aa86c499c541aba5b04929c3 Mon Sep 17 00:00:00 2001 From: Madhu Rajanna Date: Mon, 13 Jul 2020 10:58:17 +0530 Subject: [PATCH] rbd: take operation locks before operating on resource Take operation locks on the resources before operating on the resources. This allows us to do parallel operations for some RPC calls such as Clone and Restore of PVC. These operations will only be blocked while the image is being expanded or while the snapshot or RBD image is being deleted. Signed-off-by: Madhu Rajanna --- internal/rbd/controllerserver.go | 46 ++++++++++++++++++++++++++------ internal/rbd/driver.go | 1 + 2 files changed, 39 insertions(+), 8 deletions(-) diff --git a/internal/rbd/controllerserver.go b/internal/rbd/controllerserver.go index d5637b76d..d59e7a93c 100644 --- a/internal/rbd/controllerserver.go +++ b/internal/rbd/controllerserver.go @@ -47,6 +47,9 @@ type ControllerServer struct { // A map storing all volumes with ongoing operations so that additional operations // for that same snapshot (as defined by SnapshotID/snapshot name) return an Aborted error SnapshotLocks *util.VolumeLocks + + // A map storing all volumes/snapshots with ongoing operations. 
+ OperationLocks *util.OperationLock } func (cs *ControllerServer) validateVolumeReq(ctx context.Context, req *csi.CreateVolumeRequest) error { @@ -421,17 +424,23 @@ func (cs *ControllerServer) createBackingImage(ctx context.Context, cr *util.Cre // nolint:gocritic // this ifElseChain can not be rewritten to a switch statement if rbdSnap != nil { + if err = cs.OperationLocks.GetRestoreLock(rbdSnap.SnapID); err != nil { + klog.Error(util.Log(ctx, err.Error())) + return status.Error(codes.Aborted, err.Error()) + } + defer cs.OperationLocks.ReleaseRestoreLock(rbdSnap.SnapID) + err = cs.createVolumeFromSnapshot(ctx, cr, rbdVol, rbdSnap.SnapID) if err != nil { return err } util.DebugLog(ctx, "created volume %s from snapshot %s", rbdVol.RequestName, rbdSnap.RbdSnapName) } else if parentVol != nil { - if acquired := cs.VolumeLocks.TryAcquire(parentVol.VolID); !acquired { - klog.Infof(util.Log(ctx, util.VolumeOperationAlreadyExistsFmt), parentVol.VolID) - return status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, parentVol.VolID) + if err = cs.OperationLocks.GetCloneLock(parentVol.VolID); err != nil { + klog.Error(util.Log(ctx, err.Error())) + return status.Error(codes.Aborted, err.Error()) } - defer cs.VolumeLocks.Release(parentVol.VolID) + defer cs.OperationLocks.ReleaseCloneLock(parentVol.VolID) return rbdVol.createCloneFromImage(ctx, parentVol) } else { err = createImage(ctx, rbdVol, cr) @@ -560,6 +569,13 @@ func (cs *ControllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol } defer cs.VolumeLocks.Release(volumeID) + // lock out volumeID for clone and expand operation + if err = cs.OperationLocks.GetDeleteLock(volumeID); err != nil { + klog.Error(util.Log(ctx, err.Error())) + return nil, status.Error(codes.Aborted, err.Error()) + } + defer cs.OperationLocks.ReleaseDeleteLock(volumeID) + rbdVol := &rbdVolume{} defer rbdVol.Destroy() @@ -739,11 +755,11 @@ func (cs *ControllerServer) CreateSnapshot(ctx context.Context, req *csi.CreateS defer 
cs.SnapshotLocks.Release(req.GetName()) // Take lock on parent rbd image - if acquired := cs.VolumeLocks.TryAcquire(rbdSnap.SourceVolumeID); !acquired { - klog.Errorf(util.Log(ctx, util.VolumeOperationAlreadyExistsFmt), rbdSnap.SourceVolumeID) - return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, rbdSnap.SourceVolumeID) + if err = cs.OperationLocks.GetSnapshotCreateLock(rbdSnap.SourceVolumeID); err != nil { + klog.Error(util.Log(ctx, err.Error())) + return nil, status.Error(codes.Aborted, err.Error()) } - defer cs.VolumeLocks.Release(rbdSnap.SourceVolumeID) + defer cs.OperationLocks.ReleaseSnapshotCreateLock(rbdSnap.SourceVolumeID) // Need to check for already existing snapshot name, and if found // check for the requested source volume id and already allocated source volume id @@ -956,6 +972,13 @@ func (cs *ControllerServer) DeleteSnapshot(ctx context.Context, req *csi.DeleteS } defer cs.SnapshotLocks.Release(snapshotID) + // lock out snapshotID for restore operation + if err = cs.OperationLocks.GetDeleteLock(snapshotID); err != nil { + klog.Error(util.Log(ctx, err.Error())) + return nil, status.Error(codes.Aborted, err.Error()) + } + defer cs.OperationLocks.ReleaseDeleteLock(snapshotID) + rbdSnap := &rbdSnapshot{} if err = genSnapFromSnapID(ctx, rbdSnap, snapshotID, cr); err != nil { // if error is ErrPoolNotFound, the pool is already deleted we dont @@ -1047,6 +1070,13 @@ func (cs *ControllerServer) ControllerExpandVolume(ctx context.Context, req *csi } defer cs.VolumeLocks.Release(volID) + // lock out volumeID for clone and delete operation + if err := cs.OperationLocks.GetExpandLock(volID); err != nil { + klog.Error(util.Log(ctx, err.Error())) + return nil, status.Error(codes.Aborted, err.Error()) + } + defer cs.OperationLocks.ReleaseExpandLock(volID) + cr, err := util.NewUserCredentials(req.GetSecrets()) if err != nil { return nil, status.Error(codes.Internal, err.Error()) diff --git a/internal/rbd/driver.go b/internal/rbd/driver.go 
index faf6aeb99..0767f7acc 100644 --- a/internal/rbd/driver.go +++ b/internal/rbd/driver.go @@ -80,6 +80,7 @@ func NewControllerServer(d *csicommon.CSIDriver) *ControllerServer { DefaultControllerServer: csicommon.NewDefaultControllerServer(d), VolumeLocks: util.NewVolumeLocks(), SnapshotLocks: util.NewVolumeLocks(), + OperationLocks: util.NewOperationLock(), } }