From 9c000add29dade837ac44e2fc7ba8ed3e9bd83af Mon Sep 17 00:00:00 2001 From: Humble Chirammal Date: Fri, 7 Aug 2020 12:37:36 +0530 Subject: [PATCH] cephfs: Change checkVolExist for snapshot and clone workflow Signed-off-by: Humble Chirammal --- internal/cephfs/fsjournal.go | 91 +++++++++++++++++++++++++++++++----- 1 file changed, 79 insertions(+), 12 deletions(-) diff --git a/internal/cephfs/fsjournal.go b/internal/cephfs/fsjournal.go index 3c7040847..5ac021f37 100644 --- a/internal/cephfs/fsjournal.go +++ b/internal/cephfs/fsjournal.go @@ -19,8 +19,12 @@ package cephfs import ( "context" "errors" + "fmt" "github.com/ceph/ceph-csi/internal/util" + + "github.com/golang/protobuf/ptypes/timestamp" + klog "k8s.io/klog/v2" ) // volumeIdentifier structure contains an association between the CSI VolumeID to its subvolume @@ -30,6 +34,14 @@ type volumeIdentifier struct { VolumeID string } +type snapshotIdentifier struct { + FsSnapshotName string + SnapshotID string + RequestName string + CreationTime *timestamp.Timestamp + FsSubvolName string +} + /* checkVolExists checks to determine if passed in RequestName in volOptions exists on the backend. @@ -44,15 +56,15 @@ because, the order of omap creation and deletion are inverse of each other, and request name lock, and hence any stale omaps are leftovers from incomplete transactions and are hence safe to garbage collect. */ -func checkVolExists(ctx context.Context, volOptions *volumeOptions, secret map[string]string) (*volumeIdentifier, error) { +// nolint:gocognit:gocyclo // TODO: reduce complexity +func checkVolExists(ctx context.Context, + volOptions, + parentVolOpt *volumeOptions, + + pvID *volumeIdentifier, + sID *snapshotIdentifier, + cr *util.Credentials) (*volumeIdentifier, error) { var vid volumeIdentifier - - cr, err := util.NewAdminCredentials(secret) - if err != nil { - return nil, err - } - defer cr.DeleteCredentials() - j, err := volJournal.Connect(volOptions.Monitors, cr) if err != nil { return nil, err @@ -70,14 +82,62 @@ func checkVolExists(ctx context.Context, volOptions *volumeOptions, secret map[s imageUUID := imageData.ImageUUID vid.FsSubvolName = imageData.ImageAttributes.ImageName - _, err = getVolumeRootPathCeph(ctx, volOptions, cr, volumeID(vid.FsSubvolName)) - if err != nil { - if errors.Is(err, ErrVolumeNotFound) { + if sID != nil || pvID != nil { + clone, cloneInfoErr := getCloneInfo(ctx, volOptions, cr, volumeID(vid.FsSubvolName)) + if cloneInfoErr != nil { + if errors.Is(cloneInfoErr, ErrVolumeNotFound) { + if pvID != nil { + err = cleanupCloneFromSubvolumeSnapshot( + ctx, volumeID(pvID.FsSubvolName), + volumeID(vid.FsSubvolName), + parentVolOpt, + cr) + if err != nil { + return nil, err + } + } + err = j.UndoReservation(ctx, volOptions.MetadataPool, + volOptions.MetadataPool, vid.FsSubvolName, volOptions.RequestName) + return nil, err + } + return nil, err + } + if clone.Status.State == cephFSCloneInprogress { + return nil, ErrCloneInProgress + } + if clone.Status.State == cephFSCloneFailed { + err = purgeVolume(ctx, volumeID(vid.FsSubvolName), cr, volOptions, true) + if err != nil { + klog.Errorf(util.Log(ctx, "failed to delete volume %s: %v"), vid.FsSubvolName, err) + return nil, err + } + if pvID != nil { + err = cleanupCloneFromSubvolumeSnapshot( + ctx, volumeID(pvID.FsSubvolName), + volumeID(vid.FsSubvolName), + parentVolOpt, + cr) + if err != nil { + return nil, err + } + } err = j.UndoReservation(ctx, volOptions.MetadataPool, volOptions.MetadataPool, vid.FsSubvolName, volOptions.RequestName) return nil, err } - return nil, err + if clone.Status.State != cephFSCloneComplete { + return nil, fmt.Errorf("clone is not in complete state for %s", vid.FsSubvolName) + } + } else { + _, err = getVolumeRootPathCeph(ctx, volOptions, cr, volumeID(vid.FsSubvolName)) + if err != nil { + if errors.Is(err, ErrVolumeNotFound) { + err = j.UndoReservation(ctx, volOptions.MetadataPool, + volOptions.MetadataPool, vid.FsSubvolName, volOptions.RequestName) + return nil, err + } + return nil, err + } } // check if topology constraints match what is found @@ -96,6 +156,13 @@ func checkVolExists(ctx context.Context, volOptions *volumeOptions, secret map[s util.DebugLog(ctx, "Found existing volume (%s) with subvolume name (%s) for request (%s)", vid.VolumeID, vid.FsSubvolName, volOptions.RequestName) + if parentVolOpt != nil && pvID != nil { + err = cleanupCloneFromSubvolumeSnapshot(ctx, volumeID(pvID.FsSubvolName), volumeID(vid.FsSubvolName), parentVolOpt, cr) + if err != nil { + return nil, err + } + } + return &vid, nil }