From 8cd901d2ddc09c57dee924a0257a656c9d7199a0 Mon Sep 17 00:00:00 2001
From: Madhu Rajanna
Date: Mon, 8 Feb 2021 12:45:23 +0530
Subject: [PATCH] cephfs: add subvolume path to volume context

There are many use cases for adding the subvolume path to the PV
object. The volume context returned in CreateVolumeResponse is added
to the PV object by the external provisioner.

More details about the use cases are in the link below:
https://github.com/rook/rook/issues/5471

Signed-off-by: Madhu Rajanna
---
 internal/cephfs/controllerserver.go | 22 ++++++++++++++++++++++
 internal/cephfs/fsjournal.go        | 21 ++++++++++++++-------
 2 files changed, 36 insertions(+), 7 deletions(-)

diff --git a/internal/cephfs/controllerserver.go b/internal/cephfs/controllerserver.go
index 032778683..4b2eec6d1 100644
--- a/internal/cephfs/controllerserver.go
+++ b/internal/cephfs/controllerserver.go
@@ -207,6 +207,7 @@ func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol
 	}
 	volumeContext := req.GetParameters()
 	volumeContext["subvolumeName"] = vID.FsSubvolName
+	volumeContext["subvolumePath"] = volOptions.RootPath
 	volume := &csi.Volume{
 		VolumeId:      vID.VolumeID,
 		CapacityBytes: volOptions.Size,
@@ -250,10 +251,31 @@ func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol
 		}
 		return nil, err
 	}
+
+	volOptions.RootPath, err = volOptions.getVolumeRootPathCeph(ctx, volumeID(vID.FsSubvolName))
+	if err != nil {
+		purgeErr := volOptions.purgeVolume(ctx, volumeID(vID.FsSubvolName), true)
+		if purgeErr != nil {
+			util.ErrorLog(ctx, "failed to delete volume %s: %v", vID.FsSubvolName, purgeErr)
+			// All errors other than ErrVolumeNotFound should return an error back to the caller.
+			if !errors.Is(purgeErr, ErrVolumeNotFound) {
+				// If the subvolume deletion failed, we should not clean up
+				// the OMAP entry; that would leave a stale subvolume in the cluster.
+				// Set err=nil so that when we get the request again we can get
+				// the subvolume info.
+				err = nil
+				return nil, status.Error(codes.Internal, purgeErr.Error())
+			}
+		}
+		util.ErrorLog(ctx, "failed to get subvolume path %s: %v", vID.FsSubvolName, err)
+		return nil, status.Error(codes.Internal, err.Error())
+	}
+
 	util.DebugLog(ctx, "cephfs: successfully created backing volume named %s for request name %s",
 		vID.FsSubvolName, requestName)
 	volumeContext := req.GetParameters()
 	volumeContext["subvolumeName"] = vID.FsSubvolName
+	volumeContext["subvolumePath"] = volOptions.RootPath
 	volume := &csi.Volume{
 		VolumeId:      vID.VolumeID,
 		CapacityBytes: volOptions.Size,
diff --git a/internal/cephfs/fsjournal.go b/internal/cephfs/fsjournal.go
index baf187f59..1e06c225d 100644
--- a/internal/cephfs/fsjournal.go
+++ b/internal/cephfs/fsjournal.go
@@ -129,16 +129,23 @@ func checkVolExists(ctx context.Context,
 		if cloneState != cephFSCloneComplete {
 			return nil, fmt.Errorf("clone is not in complete state for %s", vid.FsSubvolName)
 		}
-	} else {
-		_, err = volOptions.getVolumeRootPathCeph(ctx, volumeID(vid.FsSubvolName))
-		if err != nil {
-			if errors.Is(err, ErrVolumeNotFound) {
-				err = j.UndoReservation(ctx, volOptions.MetadataPool,
-					volOptions.MetadataPool, vid.FsSubvolName, volOptions.RequestName)
-				return nil, err
+	}
+	volOptions.RootPath, err = volOptions.getVolumeRootPathCeph(ctx, volumeID(vid.FsSubvolName))
+	if err != nil {
+		if errors.Is(err, ErrVolumeNotFound) {
+			// If the subvolume is not present, clean up the stale snapshot
+			// created for the clone.
+			if parentVolOpt != nil && pvID != nil {
+				err = cleanupCloneFromSubvolumeSnapshot(ctx, volumeID(pvID.FsSubvolName), volumeID(vid.FsSubvolName), parentVolOpt)
+				if err != nil {
+					return nil, err
+				}
 			}
+			err = j.UndoReservation(ctx, volOptions.MetadataPool,
+				volOptions.MetadataPool, vid.FsSubvolName, volOptions.RequestName)
 			return nil, err
 		}
+		return nil, err
 	}
 
 	// check if topology constraints match what is found
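
For reference, below is a minimal sketch (not part of this patch) of how a
consumer such as Rook could read the new key once the external provisioner
has copied the volume context into the PV's spec.csi.volumeAttributes. The
PV name "my-cephfs-pv" and the kubeconfig lookup are illustrative
assumptions, not ceph-csi code:

package main

import (
	"context"
	"fmt"
	"os"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	// Build a client from the kubeconfig pointed to by $KUBECONFIG
	// (an assumption; in-cluster config would work equally well).
	cfg, err := clientcmd.BuildConfigFromFlags("", os.Getenv("KUBECONFIG"))
	if err != nil {
		panic(err)
	}
	client, err := kubernetes.NewForConfig(cfg)
	if err != nil {
		panic(err)
	}

	// "my-cephfs-pv" is a placeholder name for a PV provisioned by ceph-csi.
	pv, err := client.CoreV1().PersistentVolumes().Get(context.TODO(), "my-cephfs-pv", metav1.GetOptions{})
	if err != nil {
		panic(err)
	}

	// The external provisioner stores the volume context returned in
	// CreateVolumeResponse in spec.csi.volumeAttributes, so the keys set
	// by this patch surface here.
	if pv.Spec.CSI != nil {
		fmt.Println("subvolumeName:", pv.Spec.CSI.VolumeAttributes["subvolumeName"])
		fmt.Println("subvolumePath:", pv.Spec.CSI.VolumeAttributes["subvolumePath"])
	}
}

With the path exposed this way, tooling no longer has to derive the subvolume
location from the volume ID; it can be read directly off the PV object.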