diff --git a/internal/rbd/controllerserver.go b/internal/rbd/controllerserver.go index 1edc2c48b..86a880505 100644 --- a/internal/rbd/controllerserver.go +++ b/internal/rbd/controllerserver.go @@ -18,11 +18,12 @@ package rbd import ( "context" - "fmt" csicommon "github.com/ceph/ceph-csi/internal/csi-common" + "github.com/ceph/ceph-csi/internal/journal" "github.com/ceph/ceph-csi/internal/util" + librbd "github.com/ceph/go-ceph/rbd" "github.com/container-storage-interface/spec/lib/go/csi" "github.com/kubernetes-csi/csi-lib-utils/protosanitizer" "google.golang.org/grpc/codes" @@ -178,7 +179,6 @@ func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol return nil, err } defer rbdVol.Destroy() - // Existence and conflict checks if acquired := cs.VolumeLocks.TryAcquire(req.GetName()); !acquired { klog.Errorf(util.Log(ctx, util.VolumeOperationAlreadyExistsFmt), req.GetName()) @@ -192,53 +192,50 @@ func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol return nil, status.Error(codes.Internal, err.Error()) } + rbdSnap, err := cs.checkSnapshotSource(ctx, req, cr) + if err != nil { + return nil, err + } + found, err := rbdVol.Exists(ctx) if err != nil { if _, ok := err.(ErrVolNameConflict); ok { return nil, status.Error(codes.AlreadyExists, err.Error()) } - return nil, status.Error(codes.Internal, err.Error()) } if found { + if rbdSnap != nil { + // check if image depth is reached limit and requires flatten + err = checkFlatten(ctx, rbdVol, cr) + if err != nil { + return nil, err + } + } return buildCreateVolumeResponse(ctx, req, rbdVol) } - rbdSnap, err := cs.checkSnapshotSource(ctx, req, cr) - if err != nil { - return nil, err - } - err = reserveVol(ctx, rbdVol, rbdSnap, cr) if err != nil { return nil, status.Error(codes.Internal, err.Error()) } defer func() { if err != nil { - errDefer := undoVolReservation(ctx, rbdVol, cr) - if errDefer != nil { - klog.Warningf(util.Log(ctx, "failed undoing reservation of volume: %s (%s)"), req.GetName(), errDefer) + if _, ok := err.(ErrFlattenInProgress); !ok { + errDefer := undoVolReservation(ctx, rbdVol, cr) + if errDefer != nil { + klog.Warningf(util.Log(ctx, "failed undoing reservation of volume: %s (%s)"), req.GetName(), errDefer) + } } } }() - err = createBackingImage(ctx, cr, rbdVol, rbdSnap) + err = cs.createBackingImage(ctx, cr, rbdVol, rbdSnap) if err != nil { - return nil, err - } - - if rbdVol.Encrypted { - err = rbdVol.ensureEncryptionMetadataSet(rbdImageRequiresEncryption) - if err != nil { - klog.Errorf(util.Log(ctx, "failed to save encryption status, deleting image %s: %s"), - rbdVol, err) - if deleteErr := deleteImage(ctx, rbdVol, cr); deleteErr != nil { - klog.Errorf(util.Log(ctx, "failed to delete rbd image: %s with error: %v"), - rbdVol, deleteErr) - return nil, deleteErr - } - return nil, err + if _, ok := err.(ErrFlattenInProgress); ok { + return nil, status.Error(codes.Aborted, err.Error()) } + return nil, err } volumeContext := req.GetParameters() @@ -262,27 +259,121 @@ func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol return &csi.CreateVolumeResponse{Volume: volume}, nil } -func createBackingImage(ctx context.Context, cr *util.Credentials, rbdVol *rbdVolume, rbdSnap *rbdSnapshot) error { +// checkFlatten ensures that that the image chain depth is not reached +// hardlimit or softlimit. if the softlimit is reached it adds a task and +// return success,the hardlimit is reached it starts a task to flatten the +// image and return Aborted +func checkFlatten(ctx context.Context, rbdVol *rbdVolume, cr *util.Credentials) error { + err := rbdVol.flattenRbdImage(ctx, cr, false) + if err != nil { + if _, ok := err.(ErrFlattenInProgress); ok { + return status.Error(codes.Aborted, err.Error()) + } + if errDefer := deleteImage(ctx, rbdVol, cr); errDefer != nil { + klog.Errorf(util.Log(ctx, "failed to delete rbd image: %s with error: %v"), rbdVol, errDefer) + return status.Error(codes.Internal, err.Error()) + } + errDefer := undoVolReservation(ctx, rbdVol, cr) + if errDefer != nil { + klog.Warningf(util.Log(ctx, "failed undoing reservation of volume: %s (%s)"), rbdVol.RequestName, errDefer) + } + return status.Error(codes.Internal, err.Error()) + } + return nil +} + +func (cs *ControllerServer) createVolumeFromSnapshot(ctx context.Context, cr *util.Credentials, rbdVol *rbdVolume, snapshotID string) error { + rbdSnap := &rbdSnapshot{} + if acquired := cs.SnapshotLocks.TryAcquire(snapshotID); !acquired { + klog.Infof(util.Log(ctx, util.SnapshotOperationAlreadyExistsFmt), snapshotID) + return status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, snapshotID) + } + defer cs.SnapshotLocks.Release(snapshotID) + + err := genSnapFromSnapID(ctx, rbdSnap, snapshotID, cr) + if err != nil { + if _, ok := err.(util.ErrPoolNotFound); ok { + klog.Errorf(util.Log(ctx, "failed to get backend snapshot for %s: %v"), snapshotID, err) + return status.Error(codes.InvalidArgument, err.Error()) + } + return status.Error(codes.Internal, err.Error()) + } + + // update parent name(rbd image name in snapshot) + rbdSnap.RbdImageName = rbdSnap.RbdSnapName + // create clone image and delete snapshot + err = rbdVol.cloneRbdImageFromSnapshot(ctx, rbdSnap) + if err != nil { + klog.Errorf(util.Log(ctx, "failed to clone rbd image %s from snapshot %s: %v"), rbdSnap, err) + return err + } + + klog.V(4).Infof(util.Log(ctx, "create volume %s from snapshot %s"), rbdVol.RequestName, rbdSnap.RbdSnapName) + return nil +} + +func (cs *ControllerServer) createBackingImage(ctx context.Context, cr *util.Credentials, rbdVol *rbdVolume, rbdSnap *rbdSnapshot) error { var err error + var j = &journal.Connection{} + j, err = volJournal.Connect(rbdVol.Monitors, cr) + if err != nil { + return status.Error(codes.Internal, err.Error()) + } + defer j.Destroy() + if rbdSnap != nil { - err = restoreSnapshot(ctx, rbdVol, rbdSnap, cr) + err = cs.createVolumeFromSnapshot(ctx, cr, rbdVol, rbdSnap.SnapID) if err != nil { return err } - klog.V(4).Infof(util.Log(ctx, "created volume %s from snapshot %s"), rbdVol.RequestName, rbdSnap.RbdSnapName) - return nil - } - - err = createImage(ctx, rbdVol, cr) - if err != nil { - klog.Errorf(util.Log(ctx, "failed to create volume: %v"), err) - return status.Error(codes.Internal, err.Error()) + } else { + err = createImage(ctx, rbdVol, cr) + if err != nil { + klog.Errorf(util.Log(ctx, "failed to create volume: %v"), err) + return status.Error(codes.Internal, err.Error()) + } } klog.V(4).Infof(util.Log(ctx, "created volume %s backed by image %s"), rbdVol.RequestName, rbdVol.RbdImageName) + defer func() { + if err != nil { + if _, ok := err.(ErrFlattenInProgress); !ok { + if deleteErr := deleteImage(ctx, rbdVol, cr); deleteErr != nil { + klog.Errorf(util.Log(ctx, "failed to delete rbd image: %s with error: %v"), rbdVol, deleteErr) + } + } + } + }() + err = rbdVol.getImageID() + if err != nil { + klog.Errorf(util.Log(ctx, "failed to get volume id %s: %v"), rbdVol, err) + return status.Error(codes.Internal, err.Error()) + } + + err = j.StoreImageID(ctx, rbdVol.JournalPool, rbdVol.ReservedID, rbdVol.ImageID, cr) + if err != nil { + klog.Errorf(util.Log(ctx, "failed to reserve volume %s: %v"), rbdVol, err) + return status.Error(codes.Internal, err.Error()) + } + + if rbdSnap != nil { + err = rbdVol.flattenRbdImage(ctx, cr, false) + if err != nil { + klog.Errorf(util.Log(ctx, "failed to flatten image %s: %v"), rbdVol, err) + return err + } + } + if rbdVol.Encrypted { + err = rbdVol.ensureEncryptionMetadataSet(rbdImageRequiresEncryption) + if err != nil { + klog.Errorf(util.Log(ctx, "failed to save encryption status, deleting image %s: %s"), + rbdVol, err) + return status.Error(codes.Internal, err.Error()) + } + } return nil } @@ -315,7 +406,6 @@ func (cs *ControllerServer) checkSnapshotSource(ctx context.Context, req *csi.Cr return nil, status.Error(codes.InvalidArgument, "missing requested Snapshot ID") } - return rbdSnap, nil } @@ -335,7 +425,6 @@ func (cs *ControllerServer) DeleteLegacyVolume(ctx context.Context, req *csi.Del defer cs.VolumeLocks.Release(volumeID) rbdVol := &rbdVolume{} - defer rbdVol.Destroy() if err := cs.MetadataStore.Get(volumeID, rbdVol); err != nil { if err, ok := err.(*util.CacheEntryNotFound); ok { klog.Warningf(util.Log(ctx, "metadata for legacy volume %s not found, assuming the volume to be already deleted (%v)"), volumeID, err) @@ -350,6 +439,13 @@ func (cs *ControllerServer) DeleteLegacyVolume(ctx context.Context, req *csi.Del return nil, status.Error(codes.Internal, err.Error()) } + defer rbdVol.Destroy() + + err := rbdVol.Connect(cr) + if err != nil { + return nil, status.Error(codes.Internal, err.Error()) + } + // Update rbdImageName as the VolName when dealing with version 1 volumes rbdVol.RbdImageName = rbdVol.VolName @@ -393,7 +489,10 @@ func (cs *ControllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol } defer cs.VolumeLocks.Release(volumeID) - rbdVol, err := genVolFromVolID(ctx, volumeID, cr, req.GetSecrets()) + rbdVol := &rbdVolume{} + defer rbdVol.Destroy() + + rbdVol, err = genVolFromVolID(ctx, volumeID, cr, req.GetSecrets()) if err != nil { switch err.(type) { case util.ErrPoolNotFound: @@ -420,7 +519,6 @@ func (cs *ControllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol // All errors other than ErrImageNotFound should return an error back to the caller case ErrImageNotFound: - break default: return nil, status.Error(codes.Internal, err.Error()) } @@ -509,8 +607,10 @@ func (cs *ControllerServer) CreateSnapshot(ctx context.Context, req *csi.CreateS } defer cr.DeleteCredentials() + rbdVol := &rbdVolume{} + defer rbdVol.Destroy() // Fetch source volume information - rbdVol, err := genVolFromVolID(ctx, req.GetSourceVolumeId(), cr, req.GetSecrets()) + rbdVol, err = genVolFromVolID(ctx, req.GetSourceVolumeId(), cr, req.GetSecrets()) if err != nil { switch err.(type) { case ErrImageNotFound: @@ -536,7 +636,6 @@ func (cs *ControllerServer) CreateSnapshot(ctx context.Context, req *csi.CreateS return nil, status.Errorf(codes.InvalidArgument, "volume(%s) has not snapshot feature(layering)", req.GetSourceVolumeId()) } - // Create snap volume rbdSnap := genSnapFromOptions(ctx, rbdVol, req.GetParameters()) rbdSnap.RbdImageName = rbdVol.RbdImageName rbdSnap.SizeBytes = rbdVol.VolSize @@ -551,15 +650,45 @@ func (cs *ControllerServer) CreateSnapshot(ctx context.Context, req *csi.CreateS // Need to check for already existing snapshot name, and if found // check for the requested source volume id and already allocated source volume id - found, err := checkSnapExists(ctx, rbdSnap, cr) + found, err := checkSnapCloneExists(ctx, rbdVol, rbdSnap, cr) if err != nil { if _, ok := err.(util.ErrSnapNameConflict); ok { return nil, status.Error(codes.AlreadyExists, err.Error()) } - return nil, status.Errorf(codes.Internal, err.Error()) } if found { + vol := generateVolFromSnap(rbdSnap) + err = vol.Connect(cr) + if err != nil { + uErr := undoSnapshotCloning(ctx, vol, rbdSnap, vol, cr) + if uErr != nil { + klog.Warningf(util.Log(ctx, "failed undoing reservation of snapshot: %s %v"), req.GetName(), uErr) + } + return nil, status.Errorf(codes.Internal, err.Error()) + } + defer vol.Destroy() + + err = vol.flattenRbdImage(ctx, cr, false) + if _, ok := err.(ErrFlattenInProgress); ok { + return &csi.CreateSnapshotResponse{ + Snapshot: &csi.Snapshot{ + SizeBytes: rbdSnap.SizeBytes, + SnapshotId: rbdSnap.SnapID, + SourceVolumeId: rbdSnap.SourceVolumeID, + CreationTime: rbdSnap.CreatedAt, + ReadyToUse: false, + }, + }, nil + } + if err != nil { + uErr := undoSnapshotCloning(ctx, vol, rbdSnap, vol, cr) + if uErr != nil { + klog.Warningf(util.Log(ctx, "failed undoing reservation of snapshot: %s %v"), req.GetName(), uErr) + } + return nil, status.Errorf(codes.Internal, err.Error()) + } + return &csi.CreateSnapshotResponse{ Snapshot: &csi.Snapshot{ SizeBytes: rbdSnap.SizeBytes, @@ -571,7 +700,7 @@ func (cs *ControllerServer) CreateSnapshot(ctx context.Context, req *csi.CreateS }, nil } - err = reserveSnap(ctx, rbdSnap, cr) + err = reserveSnap(ctx, rbdSnap, rbdVol, cr) if err != nil { return nil, status.Error(codes.Internal, err.Error()) } @@ -584,18 +713,21 @@ func (cs *ControllerServer) CreateSnapshot(ctx context.Context, req *csi.CreateS } }() - err = cs.doSnapshot(ctx, rbdSnap, cr) + ready := false + var vol = new(rbdVolume) + + ready, vol, err = cs.doSnapshotClone(ctx, rbdVol, rbdSnap, cr) if err != nil { return nil, err } return &csi.CreateSnapshotResponse{ Snapshot: &csi.Snapshot{ - SizeBytes: rbdSnap.SizeBytes, - SnapshotId: rbdSnap.SnapID, + SizeBytes: vol.VolSize, + SnapshotId: vol.VolID, SourceVolumeId: req.GetSourceVolumeId(), - CreationTime: rbdSnap.CreatedAt, - ReadyToUse: true, + CreationTime: vol.CreatedAt, + ReadyToUse: ready, }, }, nil } @@ -622,26 +754,75 @@ func (cs *ControllerServer) validateSnapshotReq(ctx context.Context, req *csi.Cr return nil } -func (cs *ControllerServer) doSnapshot(ctx context.Context, rbdSnap *rbdSnapshot, cr *util.Credentials) (err error) { - err = createSnapshot(ctx, rbdSnap, cr) - // If snap creation fails, even due to snapname already used, fail, next attempt will get a new - // uuid for use as the snap name +func (cs *ControllerServer) doSnapshotClone(ctx context.Context, parentVol *rbdVolume, rbdSnap *rbdSnapshot, cr *util.Credentials) (bool, *rbdVolume, error) { + // generate cloned volume details from snapshot + cloneRbd := generateVolFromSnap(rbdSnap) + defer cloneRbd.Destroy() + // add image feature for cloneRbd + f := []string{librbd.FeatureNameLayering, librbd.FeatureNameDeepFlatten} + cloneRbd.imageFeatureSet = librbd.FeatureSetFromNames(f) + ready := false + + err := cloneRbd.Connect(cr) + if err != nil { + return ready, cloneRbd, err + } + + err = createRBDClone(ctx, parentVol, cloneRbd, rbdSnap, cr) if err != nil { klog.Errorf(util.Log(ctx, "failed to create snapshot: %v"), err) - return status.Error(codes.Internal, err.Error()) + return ready, cloneRbd, status.Error(codes.Internal, err.Error()) } + defer func() { if err != nil { - errDefer := deleteSnapshot(ctx, rbdSnap, cr) - if errDefer != nil { - klog.Errorf(util.Log(ctx, "failed to delete snapshot: %v"), errDefer) - err = fmt.Errorf("snapshot created but failed to delete snapshot due to"+ - " other failures: %v", err) + if _, ok := err.(ErrFlattenInProgress); !ok { + // cleanup clone and snapshot + errCleanUp := cleanUpSnapshot(ctx, cloneRbd, rbdSnap, cloneRbd, cr) + if errCleanUp != nil { + klog.Errorf(util.Log(ctx, "failed to cleanup snapshot and clone: %v"), errCleanUp) + } } - err = status.Error(codes.Internal, err.Error()) } }() - return err + + err = cloneRbd.createSnapshot(ctx, rbdSnap) + if err != nil { + // update rbd image name for logging + rbdSnap.RbdImageName = cloneRbd.RbdImageName + klog.Errorf(util.Log(ctx, "failed to create snapshot %s: %v"), rbdSnap, err) + return ready, cloneRbd, err + } + + err = cloneRbd.getImageID() + if err != nil { + klog.Errorf(util.Log(ctx, "failed to get image id: %v"), err) + return ready, cloneRbd, err + } + var j = &journal.Connection{} + // save image ID + j, err = snapJournal.Connect(rbdSnap.Monitors, cr) + if err != nil { + klog.Errorf(util.Log(ctx, "failed to connect to cluster: %v"), err) + return ready, cloneRbd, err + } + defer j.Destroy() + + err = j.StoreImageID(ctx, rbdSnap.JournalPool, rbdSnap.ReservedID, cloneRbd.ImageID, cr) + if err != nil { + klog.Errorf(util.Log(ctx, "failed to reserve volume id: %v"), err) + return ready, cloneRbd, err + } + + err = cloneRbd.flattenRbdImage(ctx, cr, false) + if err != nil { + if _, ok := err.(ErrFlattenInProgress); ok { + return ready, cloneRbd, nil + } + return ready, cloneRbd, err + } + ready = true + return ready, cloneRbd, nil } // DeleteSnapshot deletes the snapshot in backend and removes the @@ -685,24 +866,7 @@ func (cs *ControllerServer) DeleteSnapshot(ctx context.Context, req *csi.DeleteS return &csi.DeleteSnapshotResponse{}, nil } - // All errors other than ErrSnapNotFound should return an error back to the caller - if _, ok := err.(ErrSnapNotFound); !ok { - return nil, status.Error(codes.Internal, err.Error()) - } - - // Consider missing snap as already deleted, and proceed to remove the omap values, - // safeguarding against parallel create or delete requests against the - // same name. - if acquired := cs.SnapshotLocks.TryAcquire(rbdSnap.RequestName); !acquired { - klog.Errorf(util.Log(ctx, util.SnapshotOperationAlreadyExistsFmt), rbdSnap.RequestName) - return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, rbdSnap.RequestName) - } - defer cs.SnapshotLocks.Release(rbdSnap.RequestName) - - if err = undoSnapReservation(ctx, rbdSnap, cr); err != nil { - return nil, status.Error(codes.Internal, err.Error()) - } - return &csi.DeleteSnapshotResponse{}, nil + return nil, status.Error(codes.Internal, err.Error()) } // safeguard against parallel create or delete requests against the same @@ -713,11 +877,52 @@ func (cs *ControllerServer) DeleteSnapshot(ctx context.Context, req *csi.DeleteS } defer cs.SnapshotLocks.Release(rbdSnap.RequestName) - // Deleting snapshot - klog.V(4).Infof(util.Log(ctx, "deleting Snaphot %s"), rbdSnap) - if err := deleteSnapshot(ctx, rbdSnap, cr); err != nil { - return nil, status.Errorf(codes.FailedPrecondition, - "failed to delete snapshot: %s with error: %v", rbdSnap, err) + // Deleting snapshot and cloned volume + klog.V(4).Infof(util.Log(ctx, "deleting cloned rbd volume %s"), rbdSnap.RbdSnapName) + + rbdVol := generateVolFromSnap(rbdSnap) + + err = rbdVol.Connect(cr) + if err != nil { + return nil, status.Error(codes.Internal, err.Error()) + } + defer rbdVol.Destroy() + + err = rbdVol.getImageInfo() + if err != nil { + if _, ok := err.(ErrImageNotFound); !ok { + klog.Errorf(util.Log(ctx, "failed to delete rbd image: %s/%s with error: %v"), rbdVol.Pool, rbdVol.VolName, err) + return nil, status.Error(codes.Internal, err.Error()) + } + } else { + // save image ID + var j = &journal.Connection{} + j, err = snapJournal.Connect(rbdSnap.Monitors, cr) + if err != nil { + klog.Errorf(util.Log(ctx, "failed to connect to cluster: %v"), err) + return nil, status.Error(codes.Internal, err.Error()) + } + defer j.Destroy() + // TODO replace GetStoredImageID with GetImageAttributes in all places + rbdVol.ImageID, err = j.GetStoredImageID(ctx, rbdSnap.JournalPool, rbdSnap.ReservedID, cr) + if err != nil { + klog.Errorf(util.Log(ctx, "failed to get reserved image id: %v"), err) + return nil, status.Error(codes.Internal, err.Error()) + } + // update parent name to delete the snapshot + rbdSnap.RbdImageName = rbdVol.RbdImageName + + err = cleanUpSnapshot(ctx, rbdVol, rbdSnap, rbdVol, cr) + if err != nil { + klog.Errorf(util.Log(ctx, "failed to delete image: %v"), err) + return nil, status.Error(codes.Internal, err.Error()) + } + } + err = undoSnapReservation(ctx, rbdSnap, cr) + if err != nil { + klog.Errorf(util.Log(ctx, "failed to remove reservation for snapname (%s) with backing snap (%s) on image (%s) (%s)"), + rbdSnap.RequestName, rbdSnap.RbdSnapName, rbdSnap.RbdImageName, err) + return nil, status.Error(codes.Internal, err.Error()) } return &csi.DeleteSnapshotResponse{}, nil @@ -753,7 +958,10 @@ func (cs *ControllerServer) ControllerExpandVolume(ctx context.Context, req *csi } defer cr.DeleteCredentials() - rbdVol, err := genVolFromVolID(ctx, volID, cr, req.GetSecrets()) + rbdVol := &rbdVolume{} + defer rbdVol.Destroy() + + rbdVol, err = genVolFromVolID(ctx, volID, cr, req.GetSecrets()) if err != nil { switch err.(type) { case ErrImageNotFound: diff --git a/internal/rbd/errors.go b/internal/rbd/errors.go index 865292e53..7fb8db38f 100644 --- a/internal/rbd/errors.go +++ b/internal/rbd/errors.go @@ -66,3 +66,12 @@ type ErrMissingStash struct { func (e ErrMissingStash) Error() string { return e.err.Error() } + +// ErrFlattenInProgress is returned when flatten is inprogess for an image +type ErrFlattenInProgress struct { + err error +} + +func (e ErrFlattenInProgress) Error() string { + return e.err.Error() +} diff --git a/internal/rbd/rbd_journal.go b/internal/rbd/rbd_journal.go index 38ab53a0c..600f7f7b8 100644 --- a/internal/rbd/rbd_journal.go +++ b/internal/rbd/rbd_journal.go @@ -87,28 +87,31 @@ func validateRbdVol(rbdVol *rbdVolume) error { } /* -checkSnapExists, and its counterpart checkVolExists, function checks if the passed in rbdSnapshot -or rbdVolume exists on the backend. +checkSnapCloneExists, and its counterpart checkVolExists, function checks if +the passed in rbdSnapshot or rbdVolume exists on the backend. -**NOTE:** These functions manipulate the rados omaps that hold information regarding -volume names as requested by the CSI drivers. Hence, these need to be invoked only when the -respective CSI driver generated snapshot or volume name based locks are held, as otherwise racy -access to these omaps may end up leaving them in an inconsistent state. +**NOTE:** These functions manipulate the rados omaps that hold information +regarding volume names as requested by the CSI drivers. Hence, these need to be +invoked only when the respective CSI driver generated snapshot or volume name +based locks are held, as otherwise racy access to these omaps may end up +leaving them in an inconsistent state. -These functions need enough information about cluster and pool (ie, Monitors, Pool, IDs filled in) -to operate. They further require that the RequestName element of the structure have a valid value -to operate on and determine if the said RequestName already exists on the backend. +These functions need enough information about cluster and pool (ie, Monitors, +Pool, IDs filled in) to operate. They further require that the RequestName +element of the structure have a valid value to operate on and determine if the +said RequestName already exists on the backend. -These functions populate the snapshot or the image name, its attributes and the CSI snapshot/volume -ID for the same when successful. +These functions populate the snapshot or the image name, its attributes and the +CSI snapshot/volume ID for the same when successful. -These functions also cleanup omap reservations that are stale. I.e when omap entries exist and -backing images or snapshots are missing, or one of the omaps exist and the next is missing. This is -because, the order of omap creation and deletion are inverse of each other, and protected by the -request name lock, and hence any stale omaps are leftovers from incomplete transactions and are -hence safe to garbage collect. +These functions also cleanup omap reservations that are stale. I.e when omap +entries exist and backing images or snapshots are missing, or one of the omaps +exist and the next is missing. This is because, the order of omap creation and +deletion are inverse of each other, and protected by the request name lock, and +hence any stale omaps are leftovers from incomplete transactions and are hence +safe to garbage collect. */ -func checkSnapExists(ctx context.Context, rbdSnap *rbdSnapshot, cr *util.Credentials) (bool, error) { +func checkSnapCloneExists(ctx context.Context, parentVol *rbdVolume, rbdSnap *rbdSnapshot, cr *util.Credentials) (bool, error) { err := validateRbdSnap(rbdSnap) if err != nil { return false, err @@ -137,6 +140,36 @@ func checkSnapExists(ctx context.Context, rbdSnap *rbdSnapshot, cr *util.Credent snapData.ImagePool, rbdSnap.Pool) } + vol := generateVolFromSnap(rbdSnap) + defer vol.Destroy() + err = vol.Connect(cr) + if err != nil { + return false, err + } + vol.ReservedID = snapUUID + // Fetch on-disk image attributes + err = vol.getImageInfo() + if err != nil { + if _, ok := err.(ErrImageNotFound); ok { + err = parentVol.deleteSnapshot(ctx, rbdSnap) + if err != nil { + if _, ok := err.(ErrSnapNotFound); !ok { + klog.Errorf(util.Log(ctx, "failed to delete snapshot %s: %v"), rbdSnap, err) + return false, err + } + } + err = undoSnapshotCloning(ctx, vol, rbdSnap, vol, cr) + } + return false, err + } + + // Snapshot creation transaction is rolled forward if rbd clone image + // representing the snapshot is found. Any failures till finding the image + // causes a roll back of the snapshot creation transaction. + // Code from here on, rolls the transaction forward. + + rbdSnap.CreatedAt = vol.CreatedAt + rbdSnap.SizeBytes = vol.VolSize // found a snapshot already available, process and return its information rbdSnap.SnapID, err = util.GenerateVolID(ctx, rbdSnap.Monitors, cr, snapData.ImagePoolID, rbdSnap.Pool, rbdSnap.ClusterID, snapUUID, volIDVersion) @@ -144,9 +177,43 @@ func checkSnapExists(ctx context.Context, rbdSnap *rbdSnapshot, cr *util.Credent return false, err } - klog.V(4).Infof(util.Log(ctx, "found existing snap (%s) with snap name (%s) for request (%s)"), - rbdSnap.SnapID, rbdSnap.RbdSnapName, rbdSnap.RequestName) + // check snapshot exists if not create it + _, err = vol.getSnapInfo(rbdSnap) + if _, ok := err.(ErrSnapNotFound); ok { + // create snapshot + sErr := vol.createSnapshot(ctx, rbdSnap) + if sErr != nil { + klog.Errorf(util.Log(ctx, "failed to create snapshot %s: %v"), rbdSnap, sErr) + err = undoSnapshotCloning(ctx, vol, rbdSnap, vol, cr) + return false, err + } + } + if err != nil { + return false, err + } + vol.ImageID, err = j.GetStoredImageID(ctx, vol.JournalPool, vol.ReservedID, cr) + if _, ok := err.(util.ErrKeyNotFound); ok { + sErr := vol.getImageID() + if sErr != nil { + klog.Errorf(util.Log(ctx, "failed to get image id %s: %v"), vol, sErr) + err = undoSnapshotCloning(ctx, vol, rbdSnap, vol, cr) + return false, err + } + sErr = j.StoreImageID(ctx, vol.JournalPool, vol.ReservedID, vol.ImageID, cr) + if sErr != nil { + klog.Errorf(util.Log(ctx, "failed to store volume id %s: %v"), vol, sErr) + err = undoSnapshotCloning(ctx, vol, rbdSnap, vol, cr) + return false, err + } + } + + if err != nil { + return false, err + } + + klog.V(4).Infof(util.Log(ctx, "found existing image (%s) with name (%s) for request (%s)"), + rbdSnap.SnapID, rbdSnap.RbdSnapName, rbdSnap.RequestName) return true, nil } @@ -184,7 +251,7 @@ func (rv *rbdVolume) Exists(ctx context.Context) (bool, error) { return false, nil } - imageUUID := imageData.ImageUUID + rv.ReservedID = imageData.ImageUUID rv.RbdImageName = imageData.ImageAttributes.ImageName // check if topology constraints match what is found @@ -223,7 +290,7 @@ func (rv *rbdVolume) Exists(ctx context.Context) (bool, error) { // found a volume already available, process and return it! rv.VolID, err = util.GenerateVolID(ctx, rv.Monitors, rv.conn.Creds, imageData.ImagePoolID, rv.Pool, - rv.ClusterID, imageUUID, volIDVersion) + rv.ClusterID, rv.ReservedID, volIDVersion) if err != nil { return false, err } @@ -236,10 +303,9 @@ func (rv *rbdVolume) Exists(ctx context.Context) (bool, error) { // reserveSnap is a helper routine to request a rbdSnapshot name reservation and generate the // volume ID for the generated name -func reserveSnap(ctx context.Context, rbdSnap *rbdSnapshot, cr *util.Credentials) error { +func reserveSnap(ctx context.Context, rbdSnap *rbdSnapshot, rbdVol *rbdVolume, cr *util.Credentials) error { var ( - snapUUID string - err error + err error ) journalPoolID, imagePoolID, err := util.GetPoolIDs(ctx, rbdSnap.Monitors, rbdSnap.JournalPool, rbdSnap.Pool, cr) @@ -253,15 +319,15 @@ func reserveSnap(ctx context.Context, rbdSnap *rbdSnapshot, cr *util.Credentials } defer j.Destroy() - snapUUID, rbdSnap.RbdSnapName, err = j.ReserveName( + rbdSnap.ReservedID, rbdSnap.RbdSnapName, err = j.ReserveName( ctx, rbdSnap.JournalPool, journalPoolID, rbdSnap.Pool, imagePoolID, - rbdSnap.RequestName, rbdSnap.NamePrefix, rbdSnap.RbdImageName, "") + rbdSnap.RequestName, rbdSnap.NamePrefix, rbdVol.RbdImageName, "") if err != nil { return err } rbdSnap.SnapID, err = util.GenerateVolID(ctx, rbdSnap.Monitors, cr, imagePoolID, rbdSnap.Pool, - rbdSnap.ClusterID, snapUUID, volIDVersion) + rbdSnap.ClusterID, rbdSnap.ReservedID, volIDVersion) if err != nil { return err } @@ -307,8 +373,7 @@ func updateTopologyConstraints(rbdVol *rbdVolume, rbdSnap *rbdSnapshot) error { // volume ID for the generated name func reserveVol(ctx context.Context, rbdVol *rbdVolume, rbdSnap *rbdSnapshot, cr *util.Credentials) error { var ( - imageUUID string - err error + err error ) err = updateTopologyConstraints(rbdVol, rbdSnap) @@ -332,7 +397,7 @@ func reserveVol(ctx context.Context, rbdVol *rbdVolume, rbdSnap *rbdSnapshot, cr } defer j.Destroy() - imageUUID, rbdVol.RbdImageName, err = j.ReserveName( + rbdVol.ReservedID, rbdVol.RbdImageName, err = j.ReserveName( ctx, rbdVol.JournalPool, journalPoolID, rbdVol.Pool, imagePoolID, rbdVol.RequestName, rbdVol.NamePrefix, "", kmsID) if err != nil { @@ -340,7 +405,7 @@ func reserveVol(ctx context.Context, rbdVol *rbdVolume, rbdSnap *rbdSnapshot, cr } rbdVol.VolID, err = util.GenerateVolID(ctx, rbdVol.Monitors, cr, imagePoolID, rbdVol.Pool, - rbdVol.ClusterID, imageUUID, volIDVersion) + rbdVol.ClusterID, rbdVol.ReservedID, volIDVersion) if err != nil { return err } diff --git a/internal/rbd/rbd_util.go b/internal/rbd/rbd_util.go index c2459d93c..5c4900486 100644 --- a/internal/rbd/rbd_util.go +++ b/internal/rbd/rbd_util.go @@ -33,6 +33,7 @@ import ( "github.com/ceph/go-ceph/rados" librbd "github.com/ceph/go-ceph/rbd" "github.com/container-storage-interface/spec/lib/go/csi" + "github.com/golang/protobuf/ptypes" "github.com/golang/protobuf/ptypes/timestamp" "github.com/pborman/uuid" "github.com/pkg/errors" @@ -62,6 +63,9 @@ const ( rbdImageRequiresEncryption = "requiresEncryption" // image metadata key for encryption encryptionMetaKey = ".rbd.csi.ceph.com/encrypted" + + // go-ceph will provide rbd.ImageOptionCloneFormat + imageOptionCloneFormat = librbd.RbdImageOption(12) ) // rbdVolume represents a CSI volume and its RBD image specifics @@ -89,6 +93,7 @@ type rbdVolume struct { Pool string `json:"pool"` DataPool string ImageID string + ParentName string imageFeatureSet librbd.FeatureSet AdminID string `json:"adminId"` UserID string `json:"userId"` @@ -103,7 +108,7 @@ type rbdVolume struct { Encrypted bool readOnly bool KMS util.EncryptionKMS - + CreatedAt *timestamp.Timestamp // conn is a connection to the Ceph cluster obtained from a ConnPool conn *util.ClusterConnection // an opened IOContext, call .openIoctx() before using @@ -391,24 +396,113 @@ func deleteImage(ctx context.Context, pOpts *rbdVolume, cr *util.Credentials) er return nil } -func (rv *rbdVolume) removeImageFromTrash() error { - err := rv.openIoctx() +func (rv *rbdVolume) getCloneDepth(ctx context.Context) (uint, error) { + var depth uint + vol := rbdVolume{ + Pool: rv.Pool, + Monitors: rv.Monitors, + RbdImageName: rv.RbdImageName, + conn: rv.conn, + } + + err := vol.openIoctx() + if err != nil { + return depth, err + } + + defer func() { + vol.ioctx.Destroy() + }() + for { + if vol.RbdImageName == "" { + return depth, nil + } + err = vol.getImageInfo() + if err != nil { + klog.Errorf(util.Log(ctx, "failed to check depth on image %s: %s"), vol, err) + return depth, err + } + if vol.ParentName != "" { + depth++ + } + vol.RbdImageName = vol.ParentName + } +} + +func (rv *rbdVolume) flattenRbdImage(ctx context.Context, cr *util.Credentials, forceFlatten bool) error { + depth, err := rv.getCloneDepth(ctx) if err != nil { return err } - list, err := librbd.GetTrashList(rv.ioctx) - if err != nil { - return err - } - for _, l := range list { - if l.Name == rv.RbdImageName { - err = librbd.TrashRemove(rv.ioctx, l.Id, true) - return err + klog.Infof(util.Log(ctx, "clone depth is (%d), configured softlimit (%d) and hardlimit (%d) for %s"), depth, rbdSoftMaxCloneDepth, rbdHardMaxCloneDepth, rv) + + if forceFlatten || (depth >= rbdHardMaxCloneDepth) || (depth >= rbdSoftMaxCloneDepth) { + args := []string{"flatten", rv.Pool + "/" + rv.RbdImageName, "--id", cr.ID, "--keyfile=" + cr.KeyFile, "-m", rv.Monitors} + supported, err := addRbdManagerTask(ctx, rv, args) + if supported { + if err != nil { + klog.Errorf(util.Log(ctx, "failed to add task flatten for %s : %v"), rv, err) + return err + } + if forceFlatten || depth >= rbdHardMaxCloneDepth { + return ErrFlattenInProgress{err: fmt.Errorf("flatten is in progress for image %s", rv.RbdImageName)} + } + } + if !supported { + klog.Errorf(util.Log(ctx, "task manager does not support flatten,image will be flattened once hardlimit is reached: %v"), err) + if forceFlatten || depth >= rbdHardMaxCloneDepth { + err = rv.Connect(cr) + if err != nil { + return err + } + rbdImage, err := rv.open() + if err != nil { + return err + } + defer rbdImage.Close() + if err = rbdImage.Flatten(); err != nil { + klog.Errorf(util.Log(ctx, "rbd failed to flatten image %s %s: %v"), rv.Pool, rv.RbdImageName, err) + return err + } + } } } return nil } +func (rv *rbdVolume) hasFeature(feature uint64) bool { + return (uint64(rv.imageFeatureSet) & feature) == feature +} + +func (rv *rbdVolume) checkImageChainHasFeature(ctx context.Context, feature uint64) (bool, error) { + vol := rbdVolume{ + Pool: rv.Pool, + Monitors: rv.Monitors, + RbdImageName: rv.RbdImageName, + conn: rv.conn, + } + err := vol.openIoctx() + if err != nil { + return false, err + } + defer vol.ioctx.Destroy() + + for { + if vol.RbdImageName == "" { + return false, nil + } + err = vol.getImageInfo() + if err != nil { + klog.Errorf(util.Log(ctx, "failed to get image info for %s: %s"), vol, err) + return false, err + } + if f := vol.hasFeature(feature); f { + return true, nil + } + vol.RbdImageName = vol.ParentName + } +} + // genSnapFromSnapID generates a rbdSnapshot structure from the provided identifier, updating // the structure with elements from on-disk snapshot metadata as well func genSnapFromSnapID(ctx context.Context, rbdSnap *rbdSnapshot, snapshotID string, cr *util.Credentials) error { @@ -454,7 +548,7 @@ func genSnapFromSnapID(ctx context.Context, rbdSnap *rbdSnapshot, snapshotID str rbdSnap.RequestName = imageAttributes.RequestName rbdSnap.RbdImageName = imageAttributes.SourceName rbdSnap.RbdSnapName = imageAttributes.ImageName - + rbdSnap.ReservedID = vi.ObjectUUID // convert the journal pool ID to name, for use in DeleteSnapshot cases if imageAttributes.JournalPoolID != util.InvalidPoolID { rbdSnap.JournalPool, err = util.GetPoolName(rbdSnap.Monitors, cr, imageAttributes.JournalPoolID) @@ -478,13 +572,13 @@ func genVolFromVolID(ctx context.Context, volumeID string, cr *util.Credentials, options = make(map[string]string) // rbdVolume fields that are not filled up in this function are: - // Mounter, MultiNodeWritable + // Mounter, MultiNodeWritable rbdVol = &rbdVolume{VolID: volumeID} err := vi.DecomposeCSIID(rbdVol.VolID) if err != nil { err = fmt.Errorf("error decoding volume ID (%s) (%s)", err, rbdVol.VolID) - return nil, ErrInvalidVolID{err} + return rbdVol, ErrInvalidVolID{err} } rbdVol.ClusterID = vi.ClusterID @@ -492,53 +586,71 @@ func genVolFromVolID(ctx context.Context, volumeID string, cr *util.Credentials, rbdVol.Monitors, _, err = getMonsAndClusterID(ctx, options) if err != nil { - return nil, err + return rbdVol, err } rbdVol.Pool, err = util.GetPoolName(rbdVol.Monitors, cr, vi.LocationID) if err != nil { - return nil, err + return rbdVol, err } err = rbdVol.Connect(cr) if err != nil { - return nil, err + return rbdVol, err } rbdVol.JournalPool = rbdVol.Pool j, err := volJournal.Connect(rbdVol.Monitors, cr) if err != nil { - return nil, err + return rbdVol, err } defer j.Destroy() imageAttributes, err := j.GetImageAttributes( ctx, rbdVol.Pool, vi.ObjectUUID, false) if err != nil { - return nil, err + return rbdVol, err } + rbdVol.RequestName = imageAttributes.RequestName + rbdVol.RbdImageName = imageAttributes.ImageName + rbdVol.ReservedID = vi.ObjectUUID + if imageAttributes.KmsID != "" { rbdVol.Encrypted = true rbdVol.KMS, err = util.GetKMS(imageAttributes.KmsID, secrets) if err != nil { - return nil, err + return rbdVol, err } } - rbdVol.RequestName = imageAttributes.RequestName - rbdVol.RbdImageName = imageAttributes.ImageName - // convert the journal pool ID to name, for use in DeleteVolume cases if imageAttributes.JournalPoolID >= 0 { rbdVol.JournalPool, err = util.GetPoolName(rbdVol.Monitors, cr, imageAttributes.JournalPoolID) if err != nil { // TODO: If pool is not found we may leak the image (as DeleteVolume will return success) - return nil, err + return rbdVol, err } } - err = rbdVol.getImageInfo() + rbdVol.ImageID, err = j.GetStoredImageID(ctx, rbdVol.JournalPool, rbdVol.ReservedID, cr) + if _, ok := err.(util.ErrKeyNotFound); ok { + err = rbdVol.getImageID() + if err != nil { + klog.Errorf(util.Log(ctx, "failed to get image id %s: %v"), rbdVol, err) + return rbdVol, err + } + err = j.StoreImageID(ctx, rbdVol.JournalPool, rbdVol.ReservedID, rbdVol.ImageID, cr) + if err != nil { + klog.Errorf(util.Log(ctx, "failed to store volume id %s: %v"), rbdVol, err) + return rbdVol, err + } + } + if err != nil { + klog.Errorf(util.Log(ctx, "failed to get stored image id: %v"), err) + return rbdVol, err + } + err = rbdVol.getImageInfo() return rbdVol, err } @@ -789,6 +901,62 @@ func (rv *rbdVolume) cloneRbdImageFromSnapshot(ctx context.Context, pSnapOpts *r return nil } +// imageInfo strongly typed JSON spec for image info +type imageInfo struct { + ObjectUUID string `json:"name"` + Size int64 `json:"size"` + Features []string `json:"features"` + CreatedAt string `json:"create_timestamp"` + Parent parentInfo `json:"parent"` +} + +// parentInfo spec for parent volume info +type parentInfo struct { + Image string `json:"image"` + Pool string `json:"pool"` + Snapshot string `json:"snapshost"` +} + +// updateVolWithImageInfo updates provided rbdVolume with information from on-disk data +// regarding the same +func (rv *rbdVolume) updateVolWithImageInfo(cr *util.Credentials) error { + // rbd --format=json info [image-spec | snap-spec] + var imgInfo imageInfo + + stdout, stderr, err := util.ExecCommand("rbd", + "-m", rv.Monitors, + "--id", cr.ID, + "--keyfile="+cr.KeyFile, + "-c", util.CephConfigPath, + "--format="+"json", + "info", rv.String()) + if err != nil { + klog.Errorf("failed getting information for image (%s): (%s)", rv, err) + if strings.Contains(string(stderr), "rbd: error opening image "+rv.RbdImageName+ + ": (2) No such file or directory") { + return ErrImageNotFound{rv.String(), err} + } + return err + } + + err = json.Unmarshal(stdout, &imgInfo) + if err != nil { + klog.Errorf("failed to parse JSON output of image info (%s): (%s)", rv, err) + return fmt.Errorf("unmarshal failed: %+v. raw buffer response: %s", err, string(stdout)) + } + + rv.VolSize = imgInfo.Size + rv.ParentName = imgInfo.Parent.Image + + tm, err := time.Parse(time.ANSIC, imgInfo.CreatedAt) + if err != nil { + return err + } + + rv.CreatedAt, err = ptypes.TimestampProto(tm) + return err +} + // getImageInfo queries rbd about the given image and returns its metadata, and returns // ErrImageNotFound if provided image is not found func (rv *rbdVolume) getImageInfo() error { @@ -810,6 +978,10 @@ func (rv *rbdVolume) getImageInfo() error { return err } rv.imageFeatureSet = librbd.FeatureSet(features) + err = rv.updateVolWithImageInfo(rv.conn.Creds) + if err != nil { + return err + } return nil } diff --git a/internal/rbd/snapshot.go b/internal/rbd/snapshot.go new file mode 100644 index 000000000..e0642f08a --- /dev/null +++ b/internal/rbd/snapshot.go @@ -0,0 +1,101 @@ +/* +Copyright 2020 The Ceph-CSI Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +package rbd + +import ( + "context" + + "github.com/ceph/ceph-csi/internal/util" + + "github.com/pkg/errors" + "k8s.io/klog" +) + +func createRBDClone(ctx context.Context, parentVol, cloneRbdVol *rbdVolume, snap *rbdSnapshot, cr *util.Credentials) error { + // create snapshot + err := parentVol.createSnapshot(ctx, snap) + if err != nil { + klog.Errorf(util.Log(ctx, "failed to create snapshot %s: %v"), snap, err) + return err + } + + // create clone image and delete snapshot + err = cloneRbdVol.cloneRbdImageFromSnapshot(ctx, snap) + if err != nil { + klog.Errorf(util.Log(ctx, "failed to clone rbd image %s from snapshot %s: %v"), cloneRbdVol.RbdImageName, snap.RbdSnapName, err) + err = errors.Errorf("failed to clone rbd image %s from snapshot %s: %v", cloneRbdVol.RbdImageName, snap.RbdSnapName, err) + } + errSnap := parentVol.deleteSnapshot(ctx, snap) + if errSnap != nil { + klog.Errorf(util.Log(ctx, "failed to delete snapshot: %v"), errSnap) + delErr := deleteImage(ctx, cloneRbdVol, cr) + if delErr != nil { + klog.Errorf(util.Log(ctx, "failed to delete rbd image: %s with error: %v"), cloneRbdVol, delErr) + } + return err + } + + err = cloneRbdVol.getImageInfo() + if err != nil { + klog.Errorf(util.Log(ctx, "failed to get rbd image: %s details with error: %v"), cloneRbdVol, err) + delErr := deleteImage(ctx, cloneRbdVol, cr) + if delErr != nil { + klog.Errorf(util.Log(ctx, "failed to delete rbd image: %s with error: %v"), cloneRbdVol, delErr) + } + return err + } + + return nil +} + +func cleanUpSnapshot(ctx context.Context, parentVol *rbdVolume, rbdSnap *rbdSnapshot, rbdVol *rbdVolume, cr *util.Credentials) error { + err := parentVol.deleteSnapshot(ctx, rbdSnap) + if err != nil { + if _, ok := err.(ErrSnapNotFound); !ok { + klog.Errorf(util.Log(ctx, "failed to delete snapshot: %v"), err) + return err + } + } + err = deleteImage(ctx, rbdVol, cr) + if err != nil { + if _, ok := err.(ErrImageNotFound); !ok { + klog.Errorf(util.Log(ctx, "failed to delete rbd image: %s/%s with error: %v"), rbdVol.Pool, rbdVol.VolName, err) + return err + } + } + return nil +} + +func generateVolFromSnap(rbdSnap *rbdSnapshot) *rbdVolume { + vol := new(rbdVolume) + vol.ClusterID = rbdSnap.ClusterID + vol.VolID = rbdSnap.SnapID + vol.Monitors = rbdSnap.Monitors + vol.Pool = rbdSnap.Pool + vol.JournalPool = rbdSnap.JournalPool + vol.RbdImageName = rbdSnap.RbdSnapName + return vol +} + +func undoSnapshotCloning(ctx context.Context, parentVol *rbdVolume, rbdSnap *rbdSnapshot, cloneVol *rbdVolume, cr *util.Credentials) error { + err := cleanUpSnapshot(ctx, parentVol, rbdSnap, cloneVol, cr) + if err != nil { + klog.Errorf(util.Log(ctx, "failed to clean up %s or %s: %v"), cloneVol, rbdSnap, err) + return err + } + err = undoSnapReservation(ctx, rbdSnap, cr) + return err +}