diff --git a/pkg/rbd/controllerserver.go b/pkg/rbd/controllerserver.go index b8a5c96ec..f7a968483 100644 --- a/pkg/rbd/controllerserver.go +++ b/pkg/rbd/controllerserver.go @@ -55,6 +55,9 @@ func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol return nil, status.Error(codes.InvalidArgument, "Volume Capabilities cannot be empty") } + volumeNameMutex.LockKey(req.GetName()) + defer volumeNameMutex.UnlockKey(req.GetName()) + // Need to check for already existing volume name, and if found // check for the requested capacity and already allocated capacity if exVol, err := getRBDVolumeByName(req.GetName()); err == nil { @@ -156,6 +159,8 @@ func (cs *controllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol } // For now the image get unconditionally deleted, but here retention policy can be checked volumeID := req.GetVolumeId() + volumeIDMutex.LockKey(volumeID) + defer volumeIDMutex.UnlockKey(volumeID) rbdVol := &rbdVolume{} if err := loadVolInfo(volumeID, path.Join(PluginFolder, "controller"), rbdVol); err != nil { if os.IsNotExist(errors.Cause(err)) { @@ -174,8 +179,6 @@ func (cs *controllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol } // Removing persistent storage file for the unmapped volume if err := deleteVolInfo(volumeID, path.Join(PluginFolder, "controller")); err != nil { - // TODO: we can theoretically end up here when two DeleteVolume calls - // get invoked concurrently. Serialize? return nil, err } @@ -214,6 +217,9 @@ func (cs *controllerServer) CreateSnapshot(ctx context.Context, req *csi.CreateS return nil, status.Error(codes.InvalidArgument, "Source Volume ID cannot be empty") } + snapshotNameMutex.LockKey(req.GetName()) + defer snapshotNameMutex.UnlockKey(req.GetName()) + // Need to check for already existing snapshot name, and if found // check for the requested source volume id and already allocated source volume id if exSnap, err := getRBDSnapshotByName(req.GetName()); err == nil { @@ -332,6 +338,9 @@ func (cs *controllerServer) DeleteSnapshot(ctx context.Context, req *csi.DeleteS if len(snapshotID) == 0 { return nil, status.Error(codes.InvalidArgument, "Snapshot ID cannot be empty") } + snapshotIDMutex.LockKey(snapshotID) + defer snapshotIDMutex.UnlockKey(snapshotID) + rbdSnap := &rbdSnapshot{} if err := loadSnapInfo(snapshotID, path.Join(PluginFolder, "controller-snap"), rbdSnap); err != nil { return nil, err @@ -368,6 +377,7 @@ func (cs *controllerServer) ListSnapshots(ctx context.Context, req *csi.ListSnap sourceVolumeId := req.GetSourceVolumeId() // TODO (sngchlko) list with token + // TODO (#94) protect concurrent access to global data structures // list only a specific snapshot which has snapshot ID if snapshotID := req.GetSnapshotId(); len(snapshotID) != 0 { diff --git a/pkg/rbd/nodeserver.go b/pkg/rbd/nodeserver.go index eb75cf48b..65c5ad194 100644 --- a/pkg/rbd/nodeserver.go +++ b/pkg/rbd/nodeserver.go @@ -47,6 +47,9 @@ func (ns *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis s := strings.Split(strings.TrimSuffix(targetPath, "/mount"), "/") volName := s[len(s)-1] + targetPathMutex.LockKey(targetPath) + defer targetPathMutex.UnlockKey(targetPath) + notMnt, err := ns.mounter.IsLikelyNotMountPoint(targetPath) if err != nil { if os.IsNotExist(err) { @@ -97,6 +100,8 @@ func (ns *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis func (ns *nodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpublishVolumeRequest) (*csi.NodeUnpublishVolumeResponse, error) { targetPath := req.GetTargetPath() + targetPathMutex.LockKey(targetPath) + defer targetPathMutex.UnlockKey(targetPath) notMnt, err := ns.mounter.IsLikelyNotMountPoint(targetPath) if err != nil { diff --git a/pkg/rbd/rbd_util.go b/pkg/rbd/rbd_util.go index ce3d87a59..accde1091 100644 --- a/pkg/rbd/rbd_util.go +++ b/pkg/rbd/rbd_util.go @@ -75,7 +75,19 @@ type rbdSnapshot struct { } var ( + // serializes operations based on "/" as key attachdetachMutex = keymutex.NewKeyMutex() + // serializes operations based on "volume name" as key + volumeNameMutex = keymutex.NewKeyMutex() + // serializes operations based on "volume id" as key + volumeIDMutex = keymutex.NewKeyMutex() + // serializes operations based on "snapshot name" as key + snapshotNameMutex = keymutex.NewKeyMutex() + // serializes operations based on "snapshot id" as key + snapshotIDMutex = keymutex.NewKeyMutex() + // serializes operations based on "mount target path" as key + targetPathMutex = keymutex.NewKeyMutex() + supportedFeatures = sets.NewString("layering") )