From e9802c4940b1ae491e6890e52700508b428fac55 Mon Sep 17 00:00:00 2001 From: Madhu Rajanna Date: Tue, 15 Feb 2022 17:41:09 +0530 Subject: [PATCH] cephfs: refactor cephfs core functions This commits refactors the cephfs core functions with interfaces. This helps in better code structuring and writing the unit test cases. update #852 Signed-off-by: Madhu Rajanna --- internal/cephfs/controllerserver.go | 128 +++++++------- internal/cephfs/core/clone.go | 137 +++++++-------- internal/cephfs/core/clone_test.go | 10 +- internal/cephfs/core/snapshot.go | 140 +++++++++------ internal/cephfs/core/volume.go | 160 ++++++++++++------ internal/cephfs/driver.go | 6 +- internal/cephfs/mounter/fuse.go | 6 +- internal/cephfs/mounter/kernel.go | 6 +- internal/cephfs/mounter/volumemounter.go | 6 +- internal/cephfs/nodeserver.go | 14 +- internal/cephfs/{core => store}/fsjournal.go | 63 +++---- .../cephfs/{core => store}/volumeoptions.go | 56 +++--- 12 files changed, 416 insertions(+), 316 deletions(-) rename internal/cephfs/{core => store}/fsjournal.go (89%) rename internal/cephfs/{core => store}/volumeoptions.go (91%) diff --git a/internal/cephfs/controllerserver.go b/internal/cephfs/controllerserver.go index 2e4fb8077..9a35475e8 100644 --- a/internal/cephfs/controllerserver.go +++ b/internal/cephfs/controllerserver.go @@ -23,6 +23,7 @@ import ( "github.com/ceph/ceph-csi/internal/cephfs/core" cerrors "github.com/ceph/ceph-csi/internal/cephfs/errors" + "github.com/ceph/ceph-csi/internal/cephfs/store" fsutil "github.com/ceph/ceph-csi/internal/cephfs/util" csicommon "github.com/ceph/ceph-csi/internal/csi-common" "github.com/ceph/ceph-csi/internal/util" @@ -55,12 +56,12 @@ type ControllerServer struct { func (cs *ControllerServer) createBackingVolume( ctx context.Context, volOptions, - parentVolOpt *core.VolumeOptions, - - vID, - pvID *core.VolumeIdentifier, - sID *core.SnapshotIdentifier) error { + parentVolOpt *store.VolumeOptions, + pvID *store.VolumeIdentifier, + sID *store.SnapshotIdentifier) error { var err error + volClient := core.NewSubVolume(volOptions.GetConnection(), &volOptions.SubVolume, volOptions.ClusterID) + if sID != nil { if err = cs.OperationLocks.GetRestoreLock(sID.SnapshotID); err != nil { log.ErrorLog(ctx, err.Error()) @@ -68,8 +69,12 @@ func (cs *ControllerServer) createBackingVolume( return status.Error(codes.Aborted, err.Error()) } defer cs.OperationLocks.ReleaseRestoreLock(sID.SnapshotID) + snap := core.Snapshot{ + SnapshotID: sID.FsSnapshotName, + SubVolume: &parentVolOpt.SubVolume, + } - err = core.CreateCloneFromSnapshot(ctx, parentVolOpt, volOptions, vID, sID) + err = volClient.CreateCloneFromSnapshot(ctx, snap) if err != nil { log.ErrorLog(ctx, "failed to create clone from snapshot %s: %v", sID.FsSnapshotName, err) @@ -85,12 +90,8 @@ func (cs *ControllerServer) createBackingVolume( return status.Error(codes.Aborted, err.Error()) } defer cs.OperationLocks.ReleaseCloneLock(pvID.VolumeID) - err = core.CreateCloneFromSubvolume( - ctx, - fsutil.VolumeID(pvID.FsSubvolName), - fsutil.VolumeID(vID.FsSubvolName), - volOptions, - parentVolOpt) + err = volClient.CreateCloneFromSubvolume( + ctx, &parentVolOpt.SubVolume) if err != nil { log.ErrorLog(ctx, "failed to create clone from subvolume %s: %v", fsutil.VolumeID(pvID.FsSubvolName), err) @@ -99,8 +100,7 @@ func (cs *ControllerServer) createBackingVolume( return nil } - - if err = core.CreateVolume(ctx, volOptions, fsutil.VolumeID(vID.FsSubvolName), volOptions.Size); err != nil { + if err = volClient.CreateVolume(ctx); err != nil { log.ErrorLog(ctx, "failed to create volume %s: %v", volOptions.RequestName, err) return status.Error(codes.Internal, err.Error()) @@ -112,7 +112,7 @@ func (cs *ControllerServer) createBackingVolume( func checkContentSource( ctx context.Context, req *csi.CreateVolumeRequest, - cr *util.Credentials) (*core.VolumeOptions, *core.VolumeIdentifier, *core.SnapshotIdentifier, error) { + cr *util.Credentials) (*store.VolumeOptions, *store.VolumeIdentifier, *store.SnapshotIdentifier, error) { if req.VolumeContentSource == nil { return nil, nil, nil, nil } @@ -120,7 +120,7 @@ func checkContentSource( switch volumeSource.Type.(type) { case *csi.VolumeContentSource_Snapshot: snapshotID := req.VolumeContentSource.GetSnapshot().GetSnapshotId() - volOpt, _, sid, err := core.NewSnapshotOptionsFromID(ctx, snapshotID, cr) + volOpt, _, sid, err := store.NewSnapshotOptionsFromID(ctx, snapshotID, cr) if err != nil { if errors.Is(err, cerrors.ErrSnapNotFound) { return nil, nil, nil, status.Error(codes.NotFound, err.Error()) @@ -133,7 +133,7 @@ func checkContentSource( case *csi.VolumeContentSource_Volume: // Find the volume using the provided VolumeID volID := req.VolumeContentSource.GetVolume().GetVolumeId() - parentVol, pvID, err := core.NewVolumeOptionsFromVolID(ctx, volID, nil, req.Secrets) + parentVol, pvID, err := store.NewVolumeOptionsFromVolID(ctx, volID, nil, req.Secrets) if err != nil { if !errors.Is(err, cerrors.ErrVolumeNotFound) { return nil, nil, nil, status.Error(codes.NotFound, err.Error()) @@ -179,7 +179,7 @@ func (cs *ControllerServer) CreateVolume( } defer cs.VolumeLocks.Release(requestName) - volOptions, err := core.NewVolumeOptions(ctx, requestName, req, cr) + volOptions, err := store.NewVolumeOptions(ctx, requestName, req, cr) if err != nil { log.ErrorLog(ctx, "validation and extraction of volume options failed: %v", err) @@ -199,7 +199,7 @@ func (cs *ControllerServer) CreateVolume( defer parentVol.Destroy() } - vID, err := core.CheckVolExists(ctx, volOptions, parentVol, pvID, sID, cr) + vID, err := store.CheckVolExists(ctx, volOptions, parentVol, pvID, sID, cr) if err != nil { if cerrors.IsCloneRetryError(err) { return nil, status.Error(codes.Aborted, err.Error()) @@ -211,9 +211,10 @@ func (cs *ControllerServer) CreateVolume( if vID != nil { if sID != nil || pvID != nil { - err = volOptions.ExpandVolume(ctx, fsutil.VolumeID(vID.FsSubvolName), volOptions.Size) + volClient := core.NewSubVolume(volOptions.GetConnection(), &volOptions.SubVolume, volOptions.ClusterID) + err = volClient.ExpandVolume(ctx, volOptions.Size) if err != nil { - purgeErr := volOptions.PurgeVolume(ctx, fsutil.VolumeID(vID.FsSubvolName), false) + purgeErr := volClient.PurgeVolume(ctx, false) if purgeErr != nil { log.ErrorLog(ctx, "failed to delete volume %s: %v", requestName, purgeErr) // All errors other than ErrVolumeNotFound should return an error back to the caller @@ -221,7 +222,7 @@ func (cs *ControllerServer) CreateVolume( return nil, status.Error(codes.Internal, purgeErr.Error()) } } - errUndo := core.UndoVolReservation(ctx, volOptions, *vID, secret) + errUndo := store.UndoVolReservation(ctx, volOptions, *vID, secret) if errUndo != nil { log.WarningLog(ctx, "failed undoing reservation of volume: %s (%s)", requestName, errUndo) @@ -254,7 +255,7 @@ func (cs *ControllerServer) CreateVolume( } // Reservation - vID, err = core.ReserveVol(ctx, volOptions, secret) + vID, err = store.ReserveVol(ctx, volOptions, secret) if err != nil { return nil, status.Error(codes.Internal, err.Error()) } @@ -262,7 +263,7 @@ func (cs *ControllerServer) CreateVolume( defer func() { if err != nil { if !cerrors.IsCloneRetryError(err) { - errDefer := core.UndoVolReservation(ctx, volOptions, *vID, secret) + errDefer := store.UndoVolReservation(ctx, volOptions, *vID, secret) if errDefer != nil { log.WarningLog(ctx, "failed undoing reservation of volume: %s (%s)", requestName, errDefer) @@ -272,7 +273,7 @@ func (cs *ControllerServer) CreateVolume( }() // Create a volume - err = cs.createBackingVolume(ctx, volOptions, parentVol, vID, pvID, sID) + err = cs.createBackingVolume(ctx, volOptions, parentVol, pvID, sID) if err != nil { if cerrors.IsCloneRetryError(err) { return nil, status.Error(codes.Aborted, err.Error()) @@ -281,9 +282,10 @@ func (cs *ControllerServer) CreateVolume( return nil, err } - volOptions.RootPath, err = volOptions.GetVolumeRootPathCeph(ctx, fsutil.VolumeID(vID.FsSubvolName)) + volClient := core.NewSubVolume(volOptions.GetConnection(), &volOptions.SubVolume, volOptions.ClusterID) + volOptions.RootPath, err = volClient.GetVolumeRootPathCeph(ctx) if err != nil { - purgeErr := volOptions.PurgeVolume(ctx, fsutil.VolumeID(vID.FsSubvolName), true) + purgeErr := volClient.PurgeVolume(ctx, true) if purgeErr != nil { log.ErrorLog(ctx, "failed to delete volume %s: %v", vID.FsSubvolName, purgeErr) // All errors other than ErrVolumeNotFound should return an error back to the caller @@ -355,7 +357,7 @@ func (cs *ControllerServer) DeleteVolume( defer cs.OperationLocks.ReleaseDeleteLock(req.GetVolumeId()) // Find the volume using the provided VolumeID - volOptions, vID, err := core.NewVolumeOptionsFromVolID(ctx, string(volID), nil, secrets) + volOptions, vID, err := store.NewVolumeOptionsFromVolID(ctx, string(volID), nil, secrets) if err != nil { // if error is ErrPoolNotFound, the pool is already deleted we dont // need to worry about deleting subvolume or omap data, return success @@ -386,7 +388,7 @@ func (cs *ControllerServer) DeleteVolume( } defer cs.VolumeLocks.Release(volOptions.RequestName) - if err = core.UndoVolReservation(ctx, volOptions, *vID, secrets); err != nil { + if err = store.UndoVolReservation(ctx, volOptions, *vID, secrets); err != nil { return nil, status.Error(codes.Internal, err.Error()) } @@ -410,7 +412,8 @@ func (cs *ControllerServer) DeleteVolume( } defer cr.DeleteCredentials() - if err = volOptions.PurgeVolume(ctx, fsutil.VolumeID(vID.FsSubvolName), false); err != nil { + volClient := core.NewSubVolume(volOptions.GetConnection(), &volOptions.SubVolume, volOptions.ClusterID) + if err = volClient.PurgeVolume(ctx, false); err != nil { log.ErrorLog(ctx, "failed to delete volume %s: %v", volID, err) if errors.Is(err, cerrors.ErrVolumeHasSnapshots) { return nil, status.Error(codes.FailedPrecondition, err.Error()) @@ -421,7 +424,7 @@ func (cs *ControllerServer) DeleteVolume( } } - if err := core.UndoVolReservation(ctx, volOptions, *vID, secrets); err != nil { + if err := store.UndoVolReservation(ctx, volOptions, *vID, secrets); err != nil { return nil, status.Error(codes.Internal, err.Error()) } @@ -484,7 +487,7 @@ func (cs *ControllerServer) ControllerExpandVolume( } defer cr.DeleteCredentials() - volOptions, volIdentifier, err := core.NewVolumeOptionsFromVolID(ctx, volID, nil, secret) + volOptions, volIdentifier, err := store.NewVolumeOptionsFromVolID(ctx, volID, nil, secret) if err != nil { log.ErrorLog(ctx, "validation and extraction of volume options failed: %v", err) @@ -493,8 +496,8 @@ func (cs *ControllerServer) ControllerExpandVolume( defer volOptions.Destroy() RoundOffSize := util.RoundOffBytes(req.GetCapacityRange().GetRequiredBytes()) - - if err = volOptions.ResizeVolume(ctx, fsutil.VolumeID(volIdentifier.FsSubvolName), RoundOffSize); err != nil { + volClient := core.NewSubVolume(volOptions.GetConnection(), &volOptions.SubVolume, volOptions.ClusterID) + if err = volClient.ResizeVolume(ctx, RoundOffSize); err != nil { log.ErrorLog(ctx, "failed to expand volume %s: %v", fsutil.VolumeID(volIdentifier.FsSubvolName), err) return nil, status.Error(codes.Internal, err.Error()) @@ -521,7 +524,7 @@ func (cs *ControllerServer) CreateSnapshot( } defer cr.DeleteCredentials() - clusterData, err := core.GetClusterInformation(req.GetParameters()) + clusterData, err := store.GetClusterInformation(req.GetParameters()) if err != nil { return nil, status.Error(codes.Internal, err.Error()) } @@ -545,7 +548,7 @@ func (cs *ControllerServer) CreateSnapshot( defer cs.OperationLocks.ReleaseSnapshotCreateLock(sourceVolID) // Find the volume using the provided VolumeID - parentVolOptions, vid, err := core.NewVolumeOptionsFromVolID(ctx, sourceVolID, nil, req.GetSecrets()) + parentVolOptions, vid, err := store.NewVolumeOptionsFromVolID(ctx, sourceVolID, nil, req.GetSecrets()) if err != nil { if errors.Is(err, util.ErrPoolNotFound) { log.WarningLog(ctx, "failed to get backend volume for %s: %v", sourceVolID, err) @@ -569,7 +572,7 @@ func (cs *ControllerServer) CreateSnapshot( parentVolOptions.ClusterID) } - cephfsSnap, genSnapErr := core.GenSnapFromOptions(ctx, req) + cephfsSnap, genSnapErr := store.GenSnapFromOptions(ctx, req) if genSnapErr != nil { return nil, status.Error(codes.Internal, genSnapErr.Error()) } @@ -582,7 +585,7 @@ func (cs *ControllerServer) CreateSnapshot( } defer cs.VolumeLocks.Release(sourceVolID) snapName := req.GetName() - sid, snapInfo, err := core.CheckSnapExists(ctx, parentVolOptions, vid.FsSubvolName, cephfsSnap, cr) + sid, snapInfo, err := store.CheckSnapExists(ctx, parentVolOptions, cephfsSnap, cr) if err != nil { return nil, status.Error(codes.Internal, err.Error()) } @@ -591,8 +594,12 @@ func (cs *ControllerServer) CreateSnapshot( // ceph fs subvolume info command got added in 14.2.10 and 15.+ // as we are not able to retrieve the parent size we are rejecting the // request to create snapshot. - // TODO: For this purpose we could make use of cached clusterAdditionalInfo too. - info, err := parentVolOptions.GetSubVolumeInfo(ctx, fsutil.VolumeID(vid.FsSubvolName)) + // TODO: For this purpose we could make use of cached clusterAdditionalInfo + // too. + volClient := core.NewSubVolume(parentVolOptions.GetConnection(), + &parentVolOptions.SubVolume, + parentVolOptions.ClusterID) + info, err := volClient.GetSubVolumeInfo(ctx) if err != nil { // Check error code value against ErrInvalidCommand to understand the cluster // support it or not, It's safe to evaluate as the filtering @@ -603,7 +610,7 @@ func (cs *ControllerServer) CreateSnapshot( "subvolume info command not supported in current ceph cluster") } if sid != nil { - errDefer := core.UndoSnapReservation(ctx, parentVolOptions, *sid, snapName, cr) + errDefer := store.UndoSnapReservation(ctx, parentVolOptions, *sid, snapName, cr) if errDefer != nil { log.WarningLog(ctx, "failed undoing reservation of snapshot: %s (%s)", requestName, errDefer) @@ -617,7 +624,8 @@ func (cs *ControllerServer) CreateSnapshot( // check snapshot is protected protected := true if !(snapInfo.Protected == core.SnapshotIsProtected) { - err = parentVolOptions.ProtectSnapshot(ctx, fsutil.VolumeID(sid.FsSnapshotName), fsutil.VolumeID(vid.FsSubvolName)) + snapClient := core.NewSnapshot(parentVolOptions.GetConnection(), sid.FsSnapshotName, &parentVolOptions.SubVolume) + err = snapClient.ProtectSnapshot(ctx) if err != nil { protected = false log.WarningLog(ctx, "failed to protect snapshot of snapshot: %s (%s)", @@ -637,20 +645,20 @@ func (cs *ControllerServer) CreateSnapshot( } // Reservation - sID, err := core.ReserveSnap(ctx, parentVolOptions, vid.FsSubvolName, cephfsSnap, cr) + sID, err := store.ReserveSnap(ctx, parentVolOptions, vid.FsSubvolName, cephfsSnap, cr) if err != nil { return nil, status.Error(codes.Internal, err.Error()) } defer func() { if err != nil { - errDefer := core.UndoSnapReservation(ctx, parentVolOptions, *sID, snapName, cr) + errDefer := store.UndoSnapReservation(ctx, parentVolOptions, *sID, snapName, cr) if errDefer != nil { log.WarningLog(ctx, "failed undoing reservation of snapshot: %s (%s)", requestName, errDefer) } } }() - snap, err := doSnapshot(ctx, parentVolOptions, vid.FsSubvolName, sID.FsSnapshotName) + snap, err := doSnapshot(ctx, parentVolOptions, sID.FsSnapshotName) if err != nil { return nil, status.Error(codes.Internal, err.Error()) } @@ -668,13 +676,12 @@ func (cs *ControllerServer) CreateSnapshot( func doSnapshot( ctx context.Context, - volOpt *core.VolumeOptions, - subvolumeName, + volOpt *store.VolumeOptions, snapshotName string) (core.SnapshotInfo, error) { - volID := fsutil.VolumeID(subvolumeName) snapID := fsutil.VolumeID(snapshotName) snap := core.SnapshotInfo{} - err := volOpt.CreateSnapshot(ctx, snapID, volID) + snapClient := core.NewSnapshot(volOpt.GetConnection(), snapshotName, &volOpt.SubVolume) + err := snapClient.CreateSnapshot(ctx) if err != nil { log.ErrorLog(ctx, "failed to create snapshot %s %v", snapID, err) @@ -682,13 +689,13 @@ func doSnapshot( } defer func() { if err != nil { - dErr := volOpt.DeleteSnapshot(ctx, snapID, volID) + dErr := snapClient.DeleteSnapshot(ctx) if dErr != nil { log.ErrorLog(ctx, "failed to delete snapshot %s %v", snapID, err) } } }() - snap, err = volOpt.GetSnapshotInfo(ctx, snapID, volID) + snap, err = snapClient.GetSnapshotInfo(ctx) if err != nil { log.ErrorLog(ctx, "failed to get snapshot info %s %v", snapID, err) @@ -700,7 +707,7 @@ func doSnapshot( return snap, err } snap.CreationTime = t - err = volOpt.ProtectSnapshot(ctx, snapID, volID) + err = snapClient.ProtectSnapshot(ctx) if err != nil { log.ErrorLog(ctx, "failed to protect snapshot %s %v", snapID, err) } @@ -764,7 +771,7 @@ func (cs *ControllerServer) DeleteSnapshot( } defer cs.OperationLocks.ReleaseDeleteLock(snapshotID) - volOpt, snapInfo, sid, err := core.NewSnapshotOptionsFromID(ctx, snapshotID, cr) + volOpt, snapInfo, sid, err := store.NewSnapshotOptionsFromID(ctx, snapshotID, cr) if err != nil { switch { case errors.Is(err, util.ErrPoolNotFound): @@ -779,10 +786,10 @@ func (cs *ControllerServer) DeleteSnapshot( // success as deletion is complete return &csi.DeleteSnapshotResponse{}, nil case errors.Is(err, cerrors.ErrSnapNotFound): - err = core.UndoSnapReservation(ctx, volOpt, *sid, sid.RequestName, cr) + err = store.UndoSnapReservation(ctx, volOpt, *sid, sid.FsSnapshotName, cr) if err != nil { log.ErrorLog(ctx, "failed to remove reservation for snapname (%s) with backing snap (%s) (%s)", - sid.RequestName, sid.FsSnapshotName, err) + sid.FsSubvolName, sid.FsSnapshotName, err) return nil, status.Error(codes.Internal, err.Error()) } @@ -792,10 +799,10 @@ func (cs *ControllerServer) DeleteSnapshot( // if the error is ErrVolumeNotFound, the subvolume is already deleted // from backend, Hence undo the omap entries and return success log.ErrorLog(ctx, "Volume not present") - err = core.UndoSnapReservation(ctx, volOpt, *sid, sid.RequestName, cr) + err = store.UndoSnapReservation(ctx, volOpt, *sid, sid.FsSnapshotName, cr) if err != nil { log.ErrorLog(ctx, "failed to remove reservation for snapname (%s) with backing snap (%s) (%s)", - sid.RequestName, sid.FsSnapshotName, err) + sid.FsSubvolName, sid.FsSnapshotName, err) return nil, status.Error(codes.Internal, err.Error()) } @@ -819,17 +826,18 @@ func (cs *ControllerServer) DeleteSnapshot( if snapInfo.HasPendingClones == "yes" { return nil, status.Errorf(codes.FailedPrecondition, "snapshot %s has pending clones", snapshotID) } + snapClient := core.NewSnapshot(volOpt.GetConnection(), sid.FsSnapshotName, &volOpt.SubVolume) if snapInfo.Protected == core.SnapshotIsProtected { - err = volOpt.UnprotectSnapshot(ctx, fsutil.VolumeID(sid.FsSnapshotName), fsutil.VolumeID(sid.FsSubvolName)) + err = snapClient.UnprotectSnapshot(ctx) if err != nil { return nil, status.Error(codes.Internal, err.Error()) } } - err = volOpt.DeleteSnapshot(ctx, fsutil.VolumeID(sid.FsSnapshotName), fsutil.VolumeID(sid.FsSubvolName)) + err = snapClient.DeleteSnapshot(ctx) if err != nil { return nil, status.Error(codes.Internal, err.Error()) } - err = core.UndoSnapReservation(ctx, volOpt, *sid, sid.RequestName, cr) + err = store.UndoSnapReservation(ctx, volOpt, *sid, sid.FsSnapshotName, cr) if err != nil { log.ErrorLog(ctx, "failed to remove reservation for snapname (%s) with backing snap (%s) (%s)", sid.RequestName, sid.FsSnapshotName, err) diff --git a/internal/cephfs/core/clone.go b/internal/cephfs/core/clone.go index b821d0aa7..8dabb74de 100644 --- a/internal/cephfs/core/clone.go +++ b/internal/cephfs/core/clone.go @@ -21,7 +21,6 @@ import ( "errors" cerrors "github.com/ceph/ceph-csi/internal/cephfs/errors" - fsutil "github.com/ceph/ceph-csi/internal/cephfs/util" "github.com/ceph/ceph-csi/internal/util/log" ) @@ -29,16 +28,16 @@ import ( type cephFSCloneState string const ( - // cephFSCloneError indicates that fetching the clone state returned an error. - cephFSCloneError = cephFSCloneState("") - // cephFSCloneFailed indicates that clone is in failed state. - cephFSCloneFailed = cephFSCloneState("failed") - // cephFSClonePending indicates that clone is in pending state. - cephFSClonePending = cephFSCloneState("pending") - // cephFSCloneInprogress indicates that clone is in in-progress state. - cephFSCloneInprogress = cephFSCloneState("in-progress") - // cephFSCloneComplete indicates that clone is in complete state. - cephFSCloneComplete = cephFSCloneState("complete") + // CephFSCloneError indicates that fetching the clone state returned an error. + CephFSCloneError = cephFSCloneState("") + // CephFSCloneFailed indicates that clone is in failed state. + CephFSCloneFailed = cephFSCloneState("failed") + // CephFSClonePending indicates that clone is in pending state. + CephFSClonePending = cephFSCloneState("pending") + // CephFSCloneInprogress indicates that clone is in in-progress state. + CephFSCloneInprogress = cephFSCloneState("in-progress") + // CephFSCloneComplete indicates that clone is in complete state. + CephFSCloneComplete = cephFSCloneState("complete") // SnapshotIsProtected string indicates that the snapshot is currently protected. SnapshotIsProtected = "yes" @@ -47,28 +46,28 @@ const ( // toError checks the state of the clone if it's not cephFSCloneComplete. func (cs cephFSCloneState) toError() error { switch cs { - case cephFSCloneComplete: + case CephFSCloneComplete: return nil - case cephFSCloneError: + case CephFSCloneError: return cerrors.ErrInvalidClone - case cephFSCloneInprogress: + case CephFSCloneInprogress: return cerrors.ErrCloneInProgress - case cephFSClonePending: + case CephFSClonePending: return cerrors.ErrClonePending - case cephFSCloneFailed: + case CephFSCloneFailed: return cerrors.ErrCloneFailed } return nil } -func CreateCloneFromSubvolume( +// CreateCloneFromSubvolume creates a clone from a subvolume. +func (s *subVolumeClient) CreateCloneFromSubvolume( ctx context.Context, - volID, cloneID fsutil.VolumeID, - volOpt, - parentvolOpt *VolumeOptions) error { - snapshotID := cloneID - err := parentvolOpt.CreateSnapshot(ctx, snapshotID, volID) + parentvolOpt *SubVolume) error { + snapshotID := s.VolID + snapClient := NewSnapshot(s.conn, snapshotID, parentvolOpt) + err := snapClient.CreateSnapshot(ctx) if err != nil { log.ErrorLog(ctx, "failed to create snapshot %s %v", snapshotID, err) @@ -82,17 +81,17 @@ func CreateCloneFromSubvolume( ) defer func() { if protectErr != nil { - err = parentvolOpt.DeleteSnapshot(ctx, snapshotID, volID) + err = snapClient.DeleteSnapshot(ctx) if err != nil { log.ErrorLog(ctx, "failed to delete snapshot %s %v", snapshotID, err) } } if cloneErr != nil { - if err = volOpt.PurgeVolume(ctx, cloneID, true); err != nil { - log.ErrorLog(ctx, "failed to delete volume %s: %v", cloneID, err) + if err = s.PurgeVolume(ctx, true); err != nil { + log.ErrorLog(ctx, "failed to delete volume %s: %v", s.VolID, err) } - if err = parentvolOpt.UnprotectSnapshot(ctx, snapshotID, volID); err != nil { + if err = snapClient.UnprotectSnapshot(ctx); err != nil { // In case the snap is already unprotected we get ErrSnapProtectionExist error code // in that case we are safe and we could discard this error and we are good to go // ahead with deletion @@ -100,47 +99,46 @@ func CreateCloneFromSubvolume( log.ErrorLog(ctx, "failed to unprotect snapshot %s %v", snapshotID, err) } } - if err = parentvolOpt.DeleteSnapshot(ctx, snapshotID, volID); err != nil { + if err = snapClient.DeleteSnapshot(ctx); err != nil { log.ErrorLog(ctx, "failed to delete snapshot %s %v", snapshotID, err) } } }() - protectErr = parentvolOpt.ProtectSnapshot(ctx, snapshotID, volID) + protectErr = snapClient.ProtectSnapshot(ctx) if protectErr != nil { log.ErrorLog(ctx, "failed to protect snapshot %s %v", snapshotID, protectErr) return protectErr } - - cloneErr = parentvolOpt.cloneSnapshot(ctx, volID, snapshotID, cloneID, volOpt) + cloneErr = snapClient.CloneSnapshot(ctx, s.SubVolume) if cloneErr != nil { - log.ErrorLog(ctx, "failed to clone snapshot %s %s to %s %v", volID, snapshotID, cloneID, cloneErr) + log.ErrorLog(ctx, "failed to clone snapshot %s %s to %s %v", parentvolOpt.VolID, snapshotID, s.VolID, cloneErr) return cloneErr } - cloneState, cloneErr := volOpt.getCloneState(ctx, cloneID) + cloneState, cloneErr := s.GetCloneState(ctx) if cloneErr != nil { log.ErrorLog(ctx, "failed to get clone state: %v", cloneErr) return cloneErr } - if cloneState != cephFSCloneComplete { - log.ErrorLog(ctx, "clone %s did not complete: %v", cloneID, cloneState.toError()) + if cloneState != CephFSCloneComplete { + log.ErrorLog(ctx, "clone %s did not complete: %v", s.VolID, cloneState.toError()) return cloneState.toError() } - err = volOpt.ExpandVolume(ctx, cloneID, volOpt.Size) + err = s.ExpandVolume(ctx, s.Size) if err != nil { - log.ErrorLog(ctx, "failed to expand volume %s: %v", cloneID, err) + log.ErrorLog(ctx, "failed to expand volume %s: %v", s.VolID, err) return err } // As we completed clone, remove the intermediate snap - if err = parentvolOpt.UnprotectSnapshot(ctx, snapshotID, volID); err != nil { + if err = snapClient.UnprotectSnapshot(ctx); err != nil { // In case the snap is already unprotected we get ErrSnapProtectionExist error code // in that case we are safe and we could discard this error and we are good to go // ahead with deletion @@ -150,7 +148,7 @@ func CreateCloneFromSubvolume( return err } } - if err = parentvolOpt.DeleteSnapshot(ctx, snapshotID, volID); err != nil { + if err = snapClient.DeleteSnapshot(ctx); err != nil { log.ErrorLog(ctx, "failed to delete snapshot %s %v", snapshotID, err) return err @@ -159,14 +157,14 @@ func CreateCloneFromSubvolume( return nil } -func cleanupCloneFromSubvolumeSnapshot( - ctx context.Context, - volID, cloneID fsutil.VolumeID, - parentVolOpt *VolumeOptions) error { +// CleanupSnapshotFromSubvolume removes the snapshot from the subvolume. +func (s *subVolumeClient) CleanupSnapshotFromSubvolume( + ctx context.Context, parentVol *SubVolume) error { // snapshot name is same as clone name as we need a name which can be // identified during PVC-PVC cloning. - snapShotID := cloneID - snapInfo, err := parentVolOpt.GetSnapshotInfo(ctx, snapShotID, volID) + snapShotID := s.VolID + snapClient := NewSnapshot(s.conn, snapShotID, parentVol) + snapInfo, err := snapClient.GetSnapshotInfo(ctx) if err != nil { if errors.Is(err, cerrors.ErrSnapNotFound) { return nil @@ -176,14 +174,14 @@ func cleanupCloneFromSubvolumeSnapshot( } if snapInfo.Protected == SnapshotIsProtected { - err = parentVolOpt.UnprotectSnapshot(ctx, snapShotID, volID) + err = snapClient.UnprotectSnapshot(ctx) if err != nil { log.ErrorLog(ctx, "failed to unprotect snapshot %s %v", snapShotID, err) return err } } - err = parentVolOpt.DeleteSnapshot(ctx, snapShotID, volID) + err = snapClient.DeleteSnapshot(ctx) if err != nil { log.ErrorLog(ctx, "failed to delete snapshot %s %v", snapShotID, err) @@ -193,45 +191,39 @@ func cleanupCloneFromSubvolumeSnapshot( return nil } -func CreateCloneFromSnapshot( - ctx context.Context, - parentVolOpt, volOptions *VolumeOptions, - vID *VolumeIdentifier, - sID *SnapshotIdentifier) error { - snapID := fsutil.VolumeID(sID.FsSnapshotName) - err := parentVolOpt.cloneSnapshot( - ctx, - fsutil.VolumeID(sID.FsSubvolName), - snapID, - fsutil.VolumeID(vID.FsSubvolName), - volOptions) +// CreateSnapshotFromSubvolume creates a clone from subvolume snapshot. +func (s *subVolumeClient) CreateCloneFromSnapshot( + ctx context.Context, snap Snapshot) error { + snapID := snap.SnapshotID + snapClient := NewSnapshot(s.conn, snapID, snap.SubVolume) + err := snapClient.CloneSnapshot(ctx, s.SubVolume) if err != nil { return err } defer func() { if err != nil { if !cerrors.IsCloneRetryError(err) { - if dErr := volOptions.PurgeVolume(ctx, fsutil.VolumeID(vID.FsSubvolName), true); dErr != nil { - log.ErrorLog(ctx, "failed to delete volume %s: %v", vID.FsSubvolName, dErr) + if dErr := s.PurgeVolume(ctx, true); dErr != nil { + log.ErrorLog(ctx, "failed to delete volume %s: %v", s.VolID, dErr) } } } }() - cloneState, err := volOptions.getCloneState(ctx, fsutil.VolumeID(vID.FsSubvolName)) + cloneState, err := s.GetCloneState(ctx) if err != nil { log.ErrorLog(ctx, "failed to get clone state: %v", err) return err } - if cloneState != cephFSCloneComplete { + if cloneState != CephFSCloneComplete { return cloneState.toError() } - err = volOptions.ExpandVolume(ctx, fsutil.VolumeID(vID.FsSubvolName), volOptions.Size) + err = s.ExpandVolume(ctx, s.Size) if err != nil { - log.ErrorLog(ctx, "failed to expand volume %s with error: %v", vID.FsSubvolName, err) + log.ErrorLog(ctx, "failed to expand volume %s with error: %v", s.VolID, err) return err } @@ -239,24 +231,25 @@ func CreateCloneFromSnapshot( return nil } -func (vo *VolumeOptions) getCloneState(ctx context.Context, volID fsutil.VolumeID) (cephFSCloneState, error) { - fsa, err := vo.conn.GetFSAdmin() +// GetCloneState returns the clone state of the subvolume. +func (s *subVolumeClient) GetCloneState(ctx context.Context) (cephFSCloneState, error) { + fsa, err := s.conn.GetFSAdmin() if err != nil { log.ErrorLog( ctx, "could not get FSAdmin, can get clone status for volume %s with ID %s: %v", - vo.FsName, - string(volID), + s.FsName, + s.VolID, err) - return cephFSCloneError, err + return CephFSCloneError, err } - cs, err := fsa.CloneStatus(vo.FsName, vo.SubvolumeGroup, string(volID)) + cs, err := fsa.CloneStatus(s.FsName, s.SubvolumeGroup, s.VolID) if err != nil { - log.ErrorLog(ctx, "could not get clone state for volume %s with ID %s: %v", vo.FsName, string(volID), err) + log.ErrorLog(ctx, "could not get clone state for volume %s with ID %s: %v", s.FsName, s.VolID, err) - return cephFSCloneError, err + return CephFSCloneError, err } return cephFSCloneState(cs.State), nil diff --git a/internal/cephfs/core/clone_test.go b/internal/cephfs/core/clone_test.go index 2b7e21863..72b852b2b 100644 --- a/internal/cephfs/core/clone_test.go +++ b/internal/cephfs/core/clone_test.go @@ -27,11 +27,11 @@ import ( func TestCloneStateToError(t *testing.T) { t.Parallel() errorState := make(map[cephFSCloneState]error) - errorState[cephFSCloneComplete] = nil - errorState[cephFSCloneError] = cerrors.ErrInvalidClone - errorState[cephFSCloneInprogress] = cerrors.ErrCloneInProgress - errorState[cephFSClonePending] = cerrors.ErrClonePending - errorState[cephFSCloneFailed] = cerrors.ErrCloneFailed + errorState[CephFSCloneComplete] = nil + errorState[CephFSCloneError] = cerrors.ErrInvalidClone + errorState[CephFSCloneInprogress] = cerrors.ErrCloneInProgress + errorState[CephFSClonePending] = cerrors.ErrClonePending + errorState[CephFSCloneFailed] = cerrors.ErrCloneFailed for state, err := range errorState { assert.Equal(t, state.toError(), err) diff --git a/internal/cephfs/core/snapshot.go b/internal/cephfs/core/snapshot.go index 4973ce6be..30b23177f 100644 --- a/internal/cephfs/core/snapshot.go +++ b/internal/cephfs/core/snapshot.go @@ -22,7 +22,7 @@ import ( "time" cerrors "github.com/ceph/ceph-csi/internal/cephfs/errors" - fsutil "github.com/ceph/ceph-csi/internal/cephfs/util" + "github.com/ceph/ceph-csi/internal/util" "github.com/ceph/ceph-csi/internal/util/log" "github.com/ceph/go-ceph/cephfs/admin" @@ -36,32 +36,59 @@ const ( autoProtect = "snapshot-autoprotect" ) -// CephfsSnapshot represents a CSI snapshot and its cluster information. -type CephfsSnapshot struct { - NamePrefix string - Monitors string - // MetadataPool & Pool fields are not used atm. But its definitely good to have it in this struct - // so keeping it here - MetadataPool string - Pool string - ClusterID string - RequestName string - // ReservedID represents the ID reserved for a snapshot - ReservedID string +// SnapshotClient is the interface that holds the signature of snapshot methods +// that interacts with CephFS snapshot API's. +type SnapshotClient interface { + // CreateSnapshot creates a snapshot of the subvolume. + CreateSnapshot(ctx context.Context) error + // DeleteSnapshot deletes the snapshot of the subvolume. + DeleteSnapshot(ctx context.Context) error + // GetSnapshotInfo returns the snapshot info of the subvolume. + GetSnapshotInfo(ctx context.Context) (SnapshotInfo, error) + // ProtectSnapshot protects the snapshot of the subvolume. + ProtectSnapshot(ctx context.Context) error + // UnprotectSnapshot unprotects the snapshot of the subvolume. + UnprotectSnapshot(ctx context.Context) error + // CloneSnapshot clones the snapshot of the subvolume. + CloneSnapshot(ctx context.Context, cloneVolOptions *SubVolume) error } -func (vo *VolumeOptions) CreateSnapshot(ctx context.Context, snapID, volID fsutil.VolumeID) error { - fsa, err := vo.conn.GetFSAdmin() +// snapshotClient is the implementation of SnapshotClient interface. +type snapshotClient struct { + *Snapshot // Embedded snapshot struct. + conn *util.ClusterConnection // Cluster connection. +} + +// Snapshot represents a subvolume snapshot and its cluster information. +type Snapshot struct { + SnapshotID string // subvolume snapshot id. + *SubVolume // parent subvolume information. +} + +// NewSnapshot creates a new snapshot client. +func NewSnapshot(conn *util.ClusterConnection, snapshotID string, vol *SubVolume) SnapshotClient { + return &snapshotClient{ + Snapshot: &Snapshot{ + SnapshotID: snapshotID, + SubVolume: vol, + }, + conn: conn, + } +} + +// CreateSnapshot creates a snapshot of the subvolume. +func (s *snapshotClient) CreateSnapshot(ctx context.Context) error { + fsa, err := s.conn.GetFSAdmin() if err != nil { log.ErrorLog(ctx, "could not get FSAdmin: %s", err) return err } - err = fsa.CreateSubVolumeSnapshot(vo.FsName, vo.SubvolumeGroup, string(volID), string(snapID)) + err = fsa.CreateSubVolumeSnapshot(s.FsName, s.SubvolumeGroup, s.VolID, s.SnapshotID) if err != nil { log.ErrorLog(ctx, "failed to create subvolume snapshot %s %s in fs %s: %s", - string(snapID), string(volID), vo.FsName, err) + s.SnapshotID, s.VolID, s.FsName, err) return err } @@ -69,18 +96,19 @@ func (vo *VolumeOptions) CreateSnapshot(ctx context.Context, snapID, volID fsuti return nil } -func (vo *VolumeOptions) DeleteSnapshot(ctx context.Context, snapID, volID fsutil.VolumeID) error { - fsa, err := vo.conn.GetFSAdmin() +// DeleteSnapshot deletes the snapshot of the subvolume. +func (s *snapshotClient) DeleteSnapshot(ctx context.Context) error { + fsa, err := s.conn.GetFSAdmin() if err != nil { log.ErrorLog(ctx, "could not get FSAdmin: %s", err) return err } - err = fsa.ForceRemoveSubVolumeSnapshot(vo.FsName, vo.SubvolumeGroup, string(volID), string(snapID)) + err = fsa.ForceRemoveSubVolumeSnapshot(s.FsName, s.SubvolumeGroup, s.VolID, s.SnapshotID) if err != nil { log.ErrorLog(ctx, "failed to delete subvolume snapshot %s %s in fs %s: %s", - string(snapID), string(volID), vo.FsName, err) + s.SnapshotID, s.VolID, s.FsName, err) return err } @@ -95,16 +123,17 @@ type SnapshotInfo struct { Protected string } -func (vo *VolumeOptions) GetSnapshotInfo(ctx context.Context, snapID, volID fsutil.VolumeID) (SnapshotInfo, error) { +// GetSnapshotInfo returns the snapshot info of the subvolume. +func (s *snapshotClient) GetSnapshotInfo(ctx context.Context) (SnapshotInfo, error) { snap := SnapshotInfo{} - fsa, err := vo.conn.GetFSAdmin() + fsa, err := s.conn.GetFSAdmin() if err != nil { log.ErrorLog(ctx, "could not get FSAdmin: %s", err) return snap, err } - info, err := fsa.SubVolumeSnapshotInfo(vo.FsName, vo.SubvolumeGroup, string(volID), string(snapID)) + info, err := fsa.SubVolumeSnapshotInfo(s.FsName, s.SubvolumeGroup, s.VolID, s.SnapshotID) if err != nil { if errors.Is(err, rados.ErrNotFound) { return snap, cerrors.ErrSnapNotFound @@ -112,9 +141,9 @@ func (vo *VolumeOptions) GetSnapshotInfo(ctx context.Context, snapID, volID fsut log.ErrorLog( ctx, "failed to get subvolume snapshot info %s %s in fs %s with error %s", - string(volID), - string(snapID), - vo.FsName, + s.VolID, + s.SnapshotID, + s.FsName, err) return snap, err @@ -126,21 +155,21 @@ func (vo *VolumeOptions) GetSnapshotInfo(ctx context.Context, snapID, volID fsut return snap, nil } -func (vo *VolumeOptions) ProtectSnapshot(ctx context.Context, snapID, volID fsutil.VolumeID) error { +// ProtectSnapshot protects the snapshot of the subvolume. +func (s *snapshotClient) ProtectSnapshot(ctx context.Context) error { // If "snapshot-autoprotect" feature is present, The ProtectSnapshot // call should be treated as a no-op. - if checkSubvolumeHasFeature(autoProtect, vo.Features) { + if checkSubvolumeHasFeature(autoProtect, s.Features) { return nil } - fsa, err := vo.conn.GetFSAdmin() + fsa, err := s.conn.GetFSAdmin() if err != nil { log.ErrorLog(ctx, "could not get FSAdmin: %s", err) return err } - err = fsa.ProtectSubVolumeSnapshot(vo.FsName, vo.SubvolumeGroup, string(volID), - string(snapID)) + err = fsa.ProtectSubVolumeSnapshot(s.FsName, s.SubvolumeGroup, s.VolID, s.SnapshotID) if err != nil { if errors.Is(err, rados.ErrObjectExists) { return nil @@ -148,9 +177,9 @@ func (vo *VolumeOptions) ProtectSnapshot(ctx context.Context, snapID, volID fsut log.ErrorLog( ctx, "failed to protect subvolume snapshot %s %s in fs %s with error: %s", - string(volID), - string(snapID), - vo.FsName, + s.VolID, + s.SnapshotID, + s.FsName, err) return err @@ -159,21 +188,22 @@ func (vo *VolumeOptions) ProtectSnapshot(ctx context.Context, snapID, volID fsut return nil } -func (vo *VolumeOptions) UnprotectSnapshot(ctx context.Context, snapID, volID fsutil.VolumeID) error { +// UnprotectSnapshot unprotects the snapshot of the subvolume. +func (s *snapshotClient) UnprotectSnapshot(ctx context.Context) error { // If "snapshot-autoprotect" feature is present, The UnprotectSnapshot // call should be treated as a no-op. - if checkSubvolumeHasFeature(autoProtect, vo.Features) { + if checkSubvolumeHasFeature(autoProtect, s.Features) { return nil } - fsa, err := vo.conn.GetFSAdmin() + fsa, err := s.conn.GetFSAdmin() if err != nil { log.ErrorLog(ctx, "could not get FSAdmin: %s", err) return err } - err = fsa.UnprotectSubVolumeSnapshot(vo.FsName, vo.SubvolumeGroup, string(volID), - string(snapID)) + err = fsa.UnprotectSubVolumeSnapshot(s.FsName, s.SubvolumeGroup, s.VolID, + s.SnapshotID) if err != nil { // In case the snap is already unprotected we get ErrSnapProtectionExist error code // in that case we are safe and we could discard this error. @@ -183,9 +213,9 @@ func (vo *VolumeOptions) UnprotectSnapshot(ctx context.Context, snapID, volID fs log.ErrorLog( ctx, "failed to unprotect subvolume snapshot %s %s in fs %s with error: %s", - string(volID), - string(snapID), - vo.FsName, + s.VolID, + s.SnapshotID, + s.FsName, err) return err @@ -194,33 +224,33 @@ func (vo *VolumeOptions) UnprotectSnapshot(ctx context.Context, snapID, volID fs return nil } -func (vo *VolumeOptions) cloneSnapshot( +// CloneSnapshot clones the snapshot of the subvolume. +func (s *snapshotClient) CloneSnapshot( ctx context.Context, - volID, snapID, cloneID fsutil.VolumeID, - cloneVolOptions *VolumeOptions, + cloneSubVol *SubVolume, ) error { - fsa, err := vo.conn.GetFSAdmin() + fsa, err := s.conn.GetFSAdmin() if err != nil { log.ErrorLog(ctx, "could not get FSAdmin: %s", err) return err } co := &admin.CloneOptions{ - TargetGroup: cloneVolOptions.SubvolumeGroup, + TargetGroup: cloneSubVol.SubvolumeGroup, } - if cloneVolOptions.Pool != "" { - co.PoolLayout = cloneVolOptions.Pool + if cloneSubVol.Pool != "" { + co.PoolLayout = cloneSubVol.Pool } - err = fsa.CloneSubVolumeSnapshot(vo.FsName, vo.SubvolumeGroup, string(volID), string(snapID), string(cloneID), co) + err = fsa.CloneSubVolumeSnapshot(s.FsName, s.SubvolumeGroup, s.VolID, s.SnapshotID, cloneSubVol.VolID, co) if err != nil { log.ErrorLog( ctx, "failed to clone subvolume snapshot %s %s in fs %s with error: %s", - string(volID), - string(snapID), - string(cloneID), - vo.FsName, + s.VolID, + s.SnapshotID, + cloneSubVol.VolID, + s.FsName, err) if errors.Is(err, rados.ErrNotFound) { return cerrors.ErrVolumeNotFound diff --git a/internal/cephfs/core/volume.go b/internal/cephfs/core/volume.go index a21c0d2fa..d01b5c143 100644 --- a/internal/cephfs/core/volume.go +++ b/internal/cephfs/core/volume.go @@ -53,20 +53,75 @@ type Subvolume struct { Features []string } +// SubVolumeClient is the interface that holds the signature of subvolume methods +// that interacts with CephFS subvolume API's. +type SubVolumeClient interface { + // GetVolumeRootPathCeph returns the root path of the subvolume. + GetVolumeRootPathCeph(ctx context.Context) (string, error) + // CreateVolume creates a subvolume. + CreateVolume(ctx context.Context) error + // GetSubVolumeInfo returns the subvolume information. + GetSubVolumeInfo(ctx context.Context) (*Subvolume, error) + // ExpandVolume expands the volume if the requested size is greater than + // the subvolume size. + ExpandVolume(ctx context.Context, bytesQuota int64) error + // ResizeVolume resizes the volume. + ResizeVolume(ctx context.Context, bytesQuota int64) error + // PurgSubVolume removes the subvolume. + PurgeVolume(ctx context.Context, force bool) error + + // CreateCloneFromSubVolume creates a clone from the subvolume. + CreateCloneFromSubvolume(ctx context.Context, parentvolOpt *SubVolume) error + // GetCloneState returns the clone state of the subvolume. + GetCloneState(ctx context.Context) (cephFSCloneState, error) + // CreateCloneFromSnapshot creates a clone from the subvolume snapshot. + CreateCloneFromSnapshot(ctx context.Context, snap Snapshot) error + // CleanupSnapshotFromSubvolume removes the snapshot from the subvolume. + CleanupSnapshotFromSubvolume(ctx context.Context, parentVol *SubVolume) error +} + +// subVolumeClient implements SubVolumeClient interface. +type subVolumeClient struct { + *SubVolume // Embedded SubVolume struct. + clusterID string // Cluster ID to check subvolumegroup and resize functionality. + conn *util.ClusterConnection // Cluster connection. +} + +// SubVolume holds the information about the subvolume. +type SubVolume struct { + VolID string // subvolume id. + FsName string // filesystem name. + SubvolumeGroup string // subvolume group name where subvolume will be created. + Pool string // pool name where subvolume will be created. + Features []string // subvolume features. + Size int64 // subvolume size. +} + +// NewSubVolume returns a new subvolume client. +func NewSubVolume(conn *util.ClusterConnection, vol *SubVolume, clusterID string) SubVolumeClient { + return &subVolumeClient{ + SubVolume: vol, + clusterID: clusterID, + conn: conn, + } +} + +// GetVolumeRootPathCephDeprecated returns the root path of the subvolume. func GetVolumeRootPathCephDeprecated(volID fsutil.VolumeID) string { return path.Join("/", "csi-volumes", string(volID)) } -func (vo *VolumeOptions) GetVolumeRootPathCeph(ctx context.Context, volID fsutil.VolumeID) (string, error) { - fsa, err := vo.conn.GetFSAdmin() +// GetVolumeRootPathCeph returns the root path of the subvolume. +func (s *subVolumeClient) GetVolumeRootPathCeph(ctx context.Context) (string, error) { + fsa, err := s.conn.GetFSAdmin() if err != nil { log.ErrorLog(ctx, "could not get FSAdmin err %s", err) return "", err } - svPath, err := fsa.SubVolumePath(vo.FsName, vo.SubvolumeGroup, string(volID)) + svPath, err := fsa.SubVolumePath(s.FsName, s.SubvolumeGroup, s.VolID) if err != nil { - log.ErrorLog(ctx, "failed to get the rootpath for the vol %s: %s", string(volID), err) + log.ErrorLog(ctx, "failed to get the rootpath for the vol %s: %s", s.VolID, err) if errors.Is(err, rados.ErrNotFound) { return "", util.JoinErrors(cerrors.ErrVolumeNotFound, err) } @@ -77,17 +132,18 @@ func (vo *VolumeOptions) GetVolumeRootPathCeph(ctx context.Context, volID fsutil return svPath, nil } -func (vo *VolumeOptions) GetSubVolumeInfo(ctx context.Context, volID fsutil.VolumeID) (*Subvolume, error) { - fsa, err := vo.conn.GetFSAdmin() +// GetSubVolumeInfo returns the subvolume information. +func (s *subVolumeClient) GetSubVolumeInfo(ctx context.Context) (*Subvolume, error) { + fsa, err := s.conn.GetFSAdmin() if err != nil { - log.ErrorLog(ctx, "could not get FSAdmin, can not fetch metadata pool for %s:", vo.FsName, err) + log.ErrorLog(ctx, "could not get FSAdmin, can not fetch metadata pool for %s:", s.FsName, err) return nil, err } - info, err := fsa.SubVolumeInfo(vo.FsName, vo.SubvolumeGroup, string(volID)) + info, err := fsa.SubVolumeInfo(s.FsName, s.SubvolumeGroup, s.VolID) if err != nil { - log.ErrorLog(ctx, "failed to get subvolume info for the vol %s: %s", string(volID), err) + log.ErrorLog(ctx, "failed to get subvolume info for the vol %s: %s", s.VolID, err) if errors.Is(err, rados.ErrNotFound) { return nil, cerrors.ErrVolumeNotFound } @@ -111,7 +167,7 @@ func (vo *VolumeOptions) GetSubVolumeInfo(ctx context.Context, volID fsutil.Volu // or nil (in case the subvolume is in snapshot-retained state), // just continue without returning quota information. if !(info.BytesQuota == fsAdmin.Infinite || info.State == fsAdmin.StateSnapRetained) { - return nil, fmt.Errorf("subvolume %s has unsupported quota: %v", string(volID), info.BytesQuota) + return nil, fmt.Errorf("subvolume %s has unsupported quota: %v", s.VolID, info.BytesQuota) } } else { subvol.BytesQuota = int64(bc) @@ -140,50 +196,52 @@ type localClusterState struct { subVolumeGroupCreated bool } -func CreateVolume(ctx context.Context, volOptions *VolumeOptions, volID fsutil.VolumeID, bytesQuota int64) error { - // verify if corresponding ClusterID key is present in the map, +// CreateVolume creates a subvolume. +func (s *subVolumeClient) CreateVolume(ctx context.Context) error { + // verify if corresponding clusterID key is present in the map, // and if not, initialize with default values(false). - if _, keyPresent := clusterAdditionalInfo[volOptions.ClusterID]; !keyPresent { - clusterAdditionalInfo[volOptions.ClusterID] = &localClusterState{} + if _, keyPresent := clusterAdditionalInfo[s.clusterID]; !keyPresent { + clusterAdditionalInfo[s.clusterID] = &localClusterState{} } - ca, err := volOptions.conn.GetFSAdmin() + ca, err := s.conn.GetFSAdmin() if err != nil { - log.ErrorLog(ctx, "could not get FSAdmin, can not create subvolume %s: %s", string(volID), err) + log.ErrorLog(ctx, "could not get FSAdmin, can not create subvolume %s: %s", s.VolID, err) return err } // create subvolumegroup if not already created for the cluster. - if !clusterAdditionalInfo[volOptions.ClusterID].subVolumeGroupCreated { + if !clusterAdditionalInfo[s.clusterID].subVolumeGroupCreated { opts := fsAdmin.SubVolumeGroupOptions{} - err = ca.CreateSubVolumeGroup(volOptions.FsName, volOptions.SubvolumeGroup, &opts) + err = ca.CreateSubVolumeGroup(s.FsName, s.SubvolumeGroup, &opts) if err != nil { log.ErrorLog( ctx, "failed to create subvolume group %s, for the vol %s: %s", - volOptions.SubvolumeGroup, - string(volID), + s.SubvolumeGroup, + s.VolID, err) return err } - log.DebugLog(ctx, "cephfs: created subvolume group %s", volOptions.SubvolumeGroup) - clusterAdditionalInfo[volOptions.ClusterID].subVolumeGroupCreated = true + log.DebugLog(ctx, "cephfs: created subvolume group %s", s.SubvolumeGroup) + clusterAdditionalInfo[s.clusterID].subVolumeGroupCreated = true } opts := fsAdmin.SubVolumeOptions{ - Size: fsAdmin.ByteCount(bytesQuota), + Size: fsAdmin.ByteCount(s.Size), Mode: modeAllRWX, } - if volOptions.Pool != "" { - opts.PoolLayout = volOptions.Pool + if s.Pool != "" { + opts.PoolLayout = s.Pool } + fmt.Println("this is for debugging ") // FIXME: check if the right credentials are used ("-n", cephEntityClientPrefix + cr.ID) - err = ca.CreateSubVolume(volOptions.FsName, volOptions.SubvolumeGroup, string(volID), &opts) + err = ca.CreateSubVolume(s.FsName, s.SubvolumeGroup, s.VolID, &opts) if err != nil { - log.ErrorLog(ctx, "failed to create subvolume %s in fs %s: %s", string(volID), volOptions.FsName, err) + log.ErrorLog(ctx, "failed to create subvolume %s in fs %s: %s", s.VolID, s.FsName, err) return err } @@ -193,16 +251,16 @@ func CreateVolume(ctx context.Context, volOptions *VolumeOptions, volID fsutil.V // ExpandVolume will expand the volume if the requested size is greater than // the subvolume size. -func (vo *VolumeOptions) ExpandVolume(ctx context.Context, volID fsutil.VolumeID, bytesQuota int64) error { +func (s *subVolumeClient) ExpandVolume(ctx context.Context, bytesQuota int64) error { // get the subvolume size for comparison with the requested size. - info, err := vo.GetSubVolumeInfo(ctx, volID) + info, err := s.GetSubVolumeInfo(ctx) if err != nil { return err } // resize if the requested size is greater than the current size. - if vo.Size > info.BytesQuota { - log.DebugLog(ctx, "clone %s size %d is greater than requested size %d", volID, info.BytesQuota, bytesQuota) - err = vo.ResizeVolume(ctx, volID, bytesQuota) + if s.Size > info.BytesQuota { + log.DebugLog(ctx, "clone %s size %d is greater than requested size %d", s.VolID, info.BytesQuota, bytesQuota) + err = s.ResizeVolume(ctx, bytesQuota) } return err @@ -211,45 +269,47 @@ func (vo *VolumeOptions) ExpandVolume(ctx context.Context, volID fsutil.VolumeID // ResizeVolume will try to use ceph fs subvolume resize command to resize the // subvolume. If the command is not available as a fallback it will use // CreateVolume to resize the subvolume. -func (vo *VolumeOptions) ResizeVolume(ctx context.Context, volID fsutil.VolumeID, bytesQuota int64) error { +func (s *subVolumeClient) ResizeVolume(ctx context.Context, bytesQuota int64) error { // keyPresent checks whether corresponding clusterID key is present in clusterAdditionalInfo var keyPresent bool - // verify if corresponding ClusterID key is present in the map, + // verify if corresponding clusterID key is present in the map, // and if not, initialize with default values(false). - if _, keyPresent = clusterAdditionalInfo[vo.ClusterID]; !keyPresent { - clusterAdditionalInfo[vo.ClusterID] = &localClusterState{} + if _, keyPresent = clusterAdditionalInfo[s.clusterID]; !keyPresent { + clusterAdditionalInfo[s.clusterID] = &localClusterState{} } // resize subvolume when either it's supported, or when corresponding // clusterID key was not present. - if clusterAdditionalInfo[vo.ClusterID].resizeState == unknown || - clusterAdditionalInfo[vo.ClusterID].resizeState == supported { - fsa, err := vo.conn.GetFSAdmin() + if clusterAdditionalInfo[s.clusterID].resizeState == unknown || + clusterAdditionalInfo[s.clusterID].resizeState == supported { + fsa, err := s.conn.GetFSAdmin() if err != nil { - log.ErrorLog(ctx, "could not get FSAdmin, can not resize volume %s:", vo.FsName, err) + log.ErrorLog(ctx, "could not get FSAdmin, can not resize volume %s:", s.FsName, err) return err } - _, err = fsa.ResizeSubVolume(vo.FsName, vo.SubvolumeGroup, string(volID), fsAdmin.ByteCount(bytesQuota), true) + _, err = fsa.ResizeSubVolume(s.FsName, s.SubvolumeGroup, s.VolID, fsAdmin.ByteCount(bytesQuota), true) if err == nil { - clusterAdditionalInfo[vo.ClusterID].resizeState = supported + clusterAdditionalInfo[s.clusterID].resizeState = supported return nil } var invalid fsAdmin.NotImplementedError // In case the error is other than invalid command return error to the caller. if !errors.As(err, &invalid) { - log.ErrorLog(ctx, "failed to resize subvolume %s in fs %s: %s", string(volID), vo.FsName, err) + log.ErrorLog(ctx, "failed to resize subvolume %s in fs %s: %s", s.VolID, s.FsName, err) return err } } - clusterAdditionalInfo[vo.ClusterID].resizeState = unsupported + clusterAdditionalInfo[s.clusterID].resizeState = unsupported + s.Size = bytesQuota - return CreateVolume(ctx, vo, volID, bytesQuota) + return s.CreateVolume(ctx) } -func (vo *VolumeOptions) PurgeVolume(ctx context.Context, volID fsutil.VolumeID, force bool) error { - fsa, err := vo.conn.GetFSAdmin() +// PurgSubVolume removes the subvolume. +func (s *subVolumeClient) PurgeVolume(ctx context.Context, force bool) error { + fsa, err := s.conn.GetFSAdmin() if err != nil { log.ErrorLog(ctx, "could not get FSAdmin %s:", err) @@ -259,13 +319,13 @@ func (vo *VolumeOptions) PurgeVolume(ctx context.Context, volID fsutil.VolumeID, opt := fsAdmin.SubVolRmFlags{} opt.Force = force - if checkSubvolumeHasFeature("snapshot-retention", vo.Features) { + if checkSubvolumeHasFeature("snapshot-retention", s.Features) { opt.RetainSnapshots = true } - err = fsa.RemoveSubVolumeWithFlags(vo.FsName, vo.SubvolumeGroup, string(volID), opt) + err = fsa.RemoveSubVolumeWithFlags(s.FsName, s.SubvolumeGroup, s.VolID, opt) if err != nil { - log.ErrorLog(ctx, "failed to purge subvolume %s in fs %s: %s", string(volID), vo.FsName, err) + log.ErrorLog(ctx, "failed to purge subvolume %s in fs %s: %s", s.VolID, s.FsName, err) if strings.Contains(err.Error(), cerrors.VolumeNotEmpty) { return util.JoinErrors(cerrors.ErrVolumeHasSnapshots, err) } diff --git a/internal/cephfs/driver.go b/internal/cephfs/driver.go index 2a268b8ce..4b7295b0b 100644 --- a/internal/cephfs/driver.go +++ b/internal/cephfs/driver.go @@ -17,8 +17,8 @@ limitations under the License. package cephfs import ( - "github.com/ceph/ceph-csi/internal/cephfs/core" "github.com/ceph/ceph-csi/internal/cephfs/mounter" + "github.com/ceph/ceph-csi/internal/cephfs/store" fsutil "github.com/ceph/ceph-csi/internal/cephfs/util" csicommon "github.com/ceph/ceph-csi/internal/csi-common" "github.com/ceph/ceph-csi/internal/journal" @@ -87,9 +87,9 @@ func (fs *Driver) Run(conf *util.Config) { CSIInstanceID = conf.InstanceID } // Create an instance of the volume journal - core.VolJournal = journal.NewCSIVolumeJournalWithNamespace(CSIInstanceID, fsutil.RadosNamespace) + store.VolJournal = journal.NewCSIVolumeJournalWithNamespace(CSIInstanceID, fsutil.RadosNamespace) - core.SnapJournal = journal.NewCSISnapshotJournalWithNamespace(CSIInstanceID, fsutil.RadosNamespace) + store.SnapJournal = journal.NewCSISnapshotJournalWithNamespace(CSIInstanceID, fsutil.RadosNamespace) // Initialize default library driver fs.cd = csicommon.NewCSIDriver(conf.DriverName, util.DriverVersion, conf.NodeID) diff --git a/internal/cephfs/mounter/fuse.go b/internal/cephfs/mounter/fuse.go index 5ba873684..91077398b 100644 --- a/internal/cephfs/mounter/fuse.go +++ b/internal/cephfs/mounter/fuse.go @@ -25,7 +25,7 @@ import ( "strings" "sync" - "github.com/ceph/ceph-csi/internal/cephfs/core" + "github.com/ceph/ceph-csi/internal/cephfs/store" "github.com/ceph/ceph-csi/internal/util" "github.com/ceph/ceph-csi/internal/util/log" ) @@ -47,7 +47,7 @@ var ( type FuseMounter struct{} -func mountFuse(ctx context.Context, mountPoint string, cr *util.Credentials, volOptions *core.VolumeOptions) error { +func mountFuse(ctx context.Context, mountPoint string, cr *util.Credentials, volOptions *store.VolumeOptions) error { args := []string{ mountPoint, "-m", volOptions.Monitors, @@ -99,7 +99,7 @@ func (m *FuseMounter) Mount( ctx context.Context, mountPoint string, cr *util.Credentials, - volOptions *core.VolumeOptions) error { + volOptions *store.VolumeOptions) error { if err := util.CreateMountPoint(mountPoint); err != nil { return err } diff --git a/internal/cephfs/mounter/kernel.go b/internal/cephfs/mounter/kernel.go index 7fa2f7c2c..ff515be28 100644 --- a/internal/cephfs/mounter/kernel.go +++ b/internal/cephfs/mounter/kernel.go @@ -20,7 +20,7 @@ import ( "context" "fmt" - "github.com/ceph/ceph-csi/internal/cephfs/core" + "github.com/ceph/ceph-csi/internal/cephfs/store" "github.com/ceph/ceph-csi/internal/util" ) @@ -31,7 +31,7 @@ const ( type KernelMounter struct{} -func mountKernel(ctx context.Context, mountPoint string, cr *util.Credentials, volOptions *core.VolumeOptions) error { +func mountKernel(ctx context.Context, mountPoint string, cr *util.Credentials, volOptions *store.VolumeOptions) error { if err := execCommandErr(ctx, "modprobe", "ceph"); err != nil { return err } @@ -63,7 +63,7 @@ func (m *KernelMounter) Mount( ctx context.Context, mountPoint string, cr *util.Credentials, - volOptions *core.VolumeOptions) error { + volOptions *store.VolumeOptions) error { if err := util.CreateMountPoint(mountPoint); err != nil { return err } diff --git a/internal/cephfs/mounter/volumemounter.go b/internal/cephfs/mounter/volumemounter.go index 7f2628d83..e740f4f13 100644 --- a/internal/cephfs/mounter/volumemounter.go +++ b/internal/cephfs/mounter/volumemounter.go @@ -23,7 +23,7 @@ import ( "os/exec" "strings" - "github.com/ceph/ceph-csi/internal/cephfs/core" + "github.com/ceph/ceph-csi/internal/cephfs/store" "github.com/ceph/ceph-csi/internal/util" "github.com/ceph/ceph-csi/internal/util/log" ) @@ -99,11 +99,11 @@ func LoadAvailableMounters(conf *util.Config) error { } type VolumeMounter interface { - Mount(ctx context.Context, mountPoint string, cr *util.Credentials, volOptions *core.VolumeOptions) error + Mount(ctx context.Context, mountPoint string, cr *util.Credentials, volOptions *store.VolumeOptions) error Name() string } -func New(volOptions *core.VolumeOptions) (VolumeMounter, error) { +func New(volOptions *store.VolumeOptions) (VolumeMounter, error) { // Get the mounter from the configuration wantMounter := volOptions.Mounter diff --git a/internal/cephfs/nodeserver.go b/internal/cephfs/nodeserver.go index 9c2bc145a..573a0e543 100644 --- a/internal/cephfs/nodeserver.go +++ b/internal/cephfs/nodeserver.go @@ -23,9 +23,9 @@ import ( "os" "strings" - "github.com/ceph/ceph-csi/internal/cephfs/core" cerrors "github.com/ceph/ceph-csi/internal/cephfs/errors" "github.com/ceph/ceph-csi/internal/cephfs/mounter" + "github.com/ceph/ceph-csi/internal/cephfs/store" fsutil "github.com/ceph/ceph-csi/internal/cephfs/util" csicommon "github.com/ceph/ceph-csi/internal/csi-common" "github.com/ceph/ceph-csi/internal/util" @@ -46,7 +46,7 @@ type NodeServer struct { } func getCredentialsForVolume( - volOptions *core.VolumeOptions, + volOptions *store.VolumeOptions, req *csi.NodeStageVolumeRequest) (*util.Credentials, error) { var ( err error @@ -77,7 +77,7 @@ func getCredentialsForVolume( func (ns *NodeServer) NodeStageVolume( ctx context.Context, req *csi.NodeStageVolumeRequest) (*csi.NodeStageVolumeResponse, error) { - var volOptions *core.VolumeOptions + var volOptions *store.VolumeOptions if err := util.ValidateNodeStageVolumeRequest(req); err != nil { return nil, err } @@ -94,21 +94,21 @@ func (ns *NodeServer) NodeStageVolume( } defer ns.VolumeLocks.Release(req.GetVolumeId()) - volOptions, _, err := core.NewVolumeOptionsFromVolID(ctx, string(volID), req.GetVolumeContext(), req.GetSecrets()) + volOptions, _, err := store.NewVolumeOptionsFromVolID(ctx, string(volID), req.GetVolumeContext(), req.GetSecrets()) if err != nil { if !errors.Is(err, cerrors.ErrInvalidVolID) { return nil, status.Error(codes.Internal, err.Error()) } // gets mon IPs from the supplied cluster info - volOptions, _, err = core.NewVolumeOptionsFromStaticVolume(string(volID), req.GetVolumeContext()) + volOptions, _, err = store.NewVolumeOptionsFromStaticVolume(string(volID), req.GetVolumeContext()) if err != nil { if !errors.Is(err, cerrors.ErrNonStaticVolume) { return nil, status.Error(codes.Internal, err.Error()) } // get mon IPs from the volume context - volOptions, _, err = core.NewVolumeOptionsFromMonitorList(string(volID), req.GetVolumeContext(), + volOptions, _, err = store.NewVolumeOptionsFromMonitorList(string(volID), req.GetVolumeContext(), req.GetSecrets()) if err != nil { return nil, status.Error(codes.Internal, err.Error()) @@ -142,7 +142,7 @@ func (ns *NodeServer) NodeStageVolume( return &csi.NodeStageVolumeResponse{}, nil } -func (*NodeServer) mount(ctx context.Context, volOptions *core.VolumeOptions, req *csi.NodeStageVolumeRequest) error { +func (*NodeServer) mount(ctx context.Context, volOptions *store.VolumeOptions, req *csi.NodeStageVolumeRequest) error { stagingTargetPath := req.GetStagingTargetPath() volID := fsutil.VolumeID(req.GetVolumeId()) diff --git a/internal/cephfs/core/fsjournal.go b/internal/cephfs/store/fsjournal.go similarity index 89% rename from internal/cephfs/core/fsjournal.go rename to internal/cephfs/store/fsjournal.go index f22b7b9a8..9c28ce84a 100644 --- a/internal/cephfs/core/fsjournal.go +++ b/internal/cephfs/store/fsjournal.go @@ -14,13 +14,14 @@ See the License for the specific language governing permissions and limitations under the License. */ -package core +package store import ( "context" "errors" "fmt" + "github.com/ceph/ceph-csi/internal/cephfs/core" cerrors "github.com/ceph/ceph-csi/internal/cephfs/errors" fsutil "github.com/ceph/ceph-csi/internal/cephfs/util" "github.com/ceph/ceph-csi/internal/journal" @@ -95,16 +96,16 @@ func CheckVolExists(ctx context.Context, } imageUUID := imageData.ImageUUID vid.FsSubvolName = imageData.ImageAttributes.ImageName + volOptions.VolID = vid.FsSubvolName + vol := core.NewSubVolume(volOptions.conn, &volOptions.SubVolume, volOptions.ClusterID) if sID != nil || pvID != nil { - cloneState, cloneStateErr := volOptions.getCloneState(ctx, fsutil.VolumeID(vid.FsSubvolName)) + cloneState, cloneStateErr := vol.GetCloneState(ctx) if cloneStateErr != nil { if errors.Is(cloneStateErr, cerrors.ErrVolumeNotFound) { if pvID != nil { - err = cleanupCloneFromSubvolumeSnapshot( - ctx, fsutil.VolumeID(pvID.FsSubvolName), - fsutil.VolumeID(vid.FsSubvolName), - parentVolOpt) + err = vol.CleanupSnapshotFromSubvolume( + ctx, &parentVolOpt.SubVolume) if err != nil { return nil, err } @@ -117,29 +118,27 @@ func CheckVolExists(ctx context.Context, return nil, err } - if cloneState == cephFSCloneInprogress { + if cloneState == core.CephFSCloneInprogress { return nil, cerrors.ErrCloneInProgress } - if cloneState == cephFSClonePending { + if cloneState == core.CephFSClonePending { return nil, cerrors.ErrClonePending } - if cloneState == cephFSCloneFailed { + if cloneState == core.CephFSCloneFailed { log.ErrorLog(ctx, "clone failed, deleting subvolume clone. vol=%s, subvol=%s subvolgroup=%s", volOptions.FsName, vid.FsSubvolName, volOptions.SubvolumeGroup) - err = volOptions.PurgeVolume(ctx, fsutil.VolumeID(vid.FsSubvolName), true) + err = vol.PurgeVolume(ctx, true) if err != nil { log.ErrorLog(ctx, "failed to delete volume %s: %v", vid.FsSubvolName, err) return nil, err } if pvID != nil { - err = cleanupCloneFromSubvolumeSnapshot( - ctx, fsutil.VolumeID(pvID.FsSubvolName), - fsutil.VolumeID(vid.FsSubvolName), - parentVolOpt) + err = vol.CleanupSnapshotFromSubvolume( + ctx, &parentVolOpt.SubVolume) if err != nil { return nil, err } @@ -149,21 +148,18 @@ func CheckVolExists(ctx context.Context, return nil, err } - if cloneState != cephFSCloneComplete { + if cloneState != core.CephFSCloneComplete { return nil, fmt.Errorf("clone is not in complete state for %s", vid.FsSubvolName) } } - volOptions.RootPath, err = volOptions.GetVolumeRootPathCeph(ctx, fsutil.VolumeID(vid.FsSubvolName)) + volOptions.RootPath, err = vol.GetVolumeRootPathCeph(ctx) if err != nil { if errors.Is(err, cerrors.ErrVolumeNotFound) { // If the subvolume is not present, cleanup the stale snapshot // created for clone. if parentVolOpt != nil && pvID != nil { - err = cleanupCloneFromSubvolumeSnapshot( - ctx, - fsutil.VolumeID(pvID.FsSubvolName), - fsutil.VolumeID(vid.FsSubvolName), - parentVolOpt) + err = vol.CleanupSnapshotFromSubvolume( + ctx, &parentVolOpt.SubVolume) if err != nil { return nil, err } @@ -194,11 +190,8 @@ func CheckVolExists(ctx context.Context, vid.VolumeID, vid.FsSubvolName, volOptions.RequestName) if parentVolOpt != nil && pvID != nil { - err = cleanupCloneFromSubvolumeSnapshot( - ctx, - fsutil.VolumeID(pvID.FsSubvolName), - fsutil.VolumeID(vid.FsSubvolName), - parentVolOpt) + err = vol.CleanupSnapshotFromSubvolume( + ctx, &parentVolOpt.SubVolume) if err != nil { return nil, err } @@ -280,7 +273,7 @@ func ReserveVol(ctx context.Context, volOptions *VolumeOptions, secret map[strin if err != nil { return nil, err } - + volOptions.VolID = vid.FsSubvolName // generate the volume ID to return to the CO system vid.VolumeID, err = util.GenerateVolID(ctx, volOptions.Monitors, cr, volOptions.FscID, "", volOptions.ClusterID, imageUUID, fsutil.VolIDVersion) @@ -300,7 +293,7 @@ func ReserveSnap( ctx context.Context, volOptions *VolumeOptions, parentSubVolName string, - snap *CephfsSnapshot, + snap *SnapshotOption, cr *util.Credentials) (*SnapshotIdentifier, error) { var ( vid SnapshotIdentifier @@ -373,9 +366,8 @@ hence safe to garbage collect. func CheckSnapExists( ctx context.Context, volOptions *VolumeOptions, - parentSubVolName string, - snap *CephfsSnapshot, - cr *util.Credentials) (*SnapshotIdentifier, *SnapshotInfo, error) { + snap *SnapshotOption, + cr *util.Credentials) (*SnapshotIdentifier, *core.SnapshotInfo, error) { // Connect to cephfs' default radosNamespace (csi) j, err := SnapJournal.Connect(volOptions.Monitors, fsutil.RadosNamespace, cr) if err != nil { @@ -384,7 +376,7 @@ func CheckSnapExists( defer j.Destroy() snapData, err := j.CheckReservation( - ctx, volOptions.MetadataPool, snap.RequestName, snap.NamePrefix, parentSubVolName, "") + ctx, volOptions.MetadataPool, snap.RequestName, snap.NamePrefix, volOptions.VolID, "") if err != nil { return nil, nil, err } @@ -395,7 +387,8 @@ func CheckSnapExists( snapUUID := snapData.ImageUUID snapID := snapData.ImageAttributes.ImageName sid.FsSnapshotName = snapData.ImageAttributes.ImageName - snapInfo, err := volOptions.GetSnapshotInfo(ctx, fsutil.VolumeID(snapID), fsutil.VolumeID(parentSubVolName)) + snapClient := core.NewSnapshot(volOptions.conn, snapID, &volOptions.SubVolume) + snapInfo, err := snapClient.GetSnapshotInfo(ctx) if err != nil { if errors.Is(err, cerrors.ErrSnapNotFound) { err = j.UndoReservation(ctx, volOptions.MetadataPool, @@ -409,7 +402,7 @@ func CheckSnapExists( defer func() { if err != nil { - err = volOptions.DeleteSnapshot(ctx, fsutil.VolumeID(snapID), fsutil.VolumeID(parentSubVolName)) + err = snapClient.DeleteSnapshot(ctx) if err != nil { log.ErrorLog(ctx, "failed to delete snapshot %s: %v", snapID, err) @@ -435,7 +428,7 @@ func CheckSnapExists( return nil, nil, err } log.DebugLog(ctx, "Found existing snapshot (%s) with subvolume name (%s) for request (%s)", - snapData.ImageAttributes.RequestName, parentSubVolName, sid.FsSnapshotName) + snapData.ImageAttributes.RequestName, volOptions.VolID, sid.FsSnapshotName) return sid, &snapInfo, nil } diff --git a/internal/cephfs/core/volumeoptions.go b/internal/cephfs/store/volumeoptions.go similarity index 91% rename from internal/cephfs/core/volumeoptions.go rename to internal/cephfs/store/volumeoptions.go index cbfbb9277..44706f61f 100644 --- a/internal/cephfs/core/volumeoptions.go +++ b/internal/cephfs/store/volumeoptions.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package core +package store import ( "context" @@ -25,6 +25,7 @@ import ( "github.com/container-storage-interface/spec/lib/go/csi" + "github.com/ceph/ceph-csi/internal/cephfs/core" cerrors "github.com/ceph/ceph-csi/internal/cephfs/errors" fsutil "github.com/ceph/ceph-csi/internal/cephfs/util" "github.com/ceph/ceph-csi/internal/util" @@ -32,27 +33,23 @@ import ( ) type VolumeOptions struct { + core.SubVolume TopologyPools *[]util.TopologyConstrainedPool TopologyRequirement *csi.TopologyRequirement Topology map[string]string RequestName string NamePrefix string - Size int64 ClusterID string - FsName string FscID int64 + MetadataPool string // ReservedID represents the ID reserved for a subvolume ReservedID string - MetadataPool string Monitors string `json:"monitors"` - Pool string `json:"pool"` RootPath string `json:"rootPath"` Mounter string `json:"mounter"` ProvisionVolume bool `json:"provisionVolume"` KernelMountOptions string `json:"kernelMountOptions"` FuseMountOptions string `json:"fuseMountOptions"` - SubvolumeGroup string - Features []string // conn is a connection to the Ceph cluster obtained from a ConnPool conn *util.ClusterConnection @@ -180,6 +177,11 @@ func GetClusterInformation(options map[string]string) (*util.ClusterInfo, error) return clusterData, nil } +// GetConnection returns the cluster connection. +func (vo *VolumeOptions) GetConnection() *util.ClusterConnection { + return vo.conn +} + // NewVolumeOptions generates a new instance of volumeOptions from the provided // CSI request parameters. func NewVolumeOptions(ctx context.Context, requestName string, req *csi.CreateVolumeRequest, @@ -230,7 +232,7 @@ func NewVolumeOptions(ctx context.Context, requestName string, req *csi.CreateVo return nil, err } - fs := NewFileSystem(opts.conn) + fs := core.NewFileSystem(opts.conn) opts.FscID, err = fs.GetFscID(ctx, opts.FsName) if err != nil { return nil, err @@ -281,6 +283,7 @@ func NewVolumeOptionsFromVolID( } volOptions.ClusterID = vi.ClusterID vid.VolumeID = volID + volOptions.VolID = volID volOptions.FscID = vi.LocationID if volOptions.Monitors, err = util.Mons(util.CsiConfigFile, vi.ClusterID); err != nil { @@ -309,7 +312,7 @@ func NewVolumeOptionsFromVolID( } }() - fs := NewFileSystem(volOptions.conn) + fs := core.NewFileSystem(volOptions.conn) volOptions.FsName, err = fs.GetFsName(ctx, volOptions.FscID) if err != nil { return nil, nil, err @@ -358,8 +361,9 @@ func NewVolumeOptionsFromVolID( } volOptions.ProvisionVolume = true - - info, err := volOptions.GetSubVolumeInfo(ctx, fsutil.VolumeID(vid.FsSubvolName)) + volOptions.SubVolume.VolID = vid.FsSubvolName + vol := core.NewSubVolume(volOptions.conn, &volOptions.SubVolume, volOptions.ClusterID) + info, err := vol.GetSubVolumeInfo(ctx) if err == nil { volOptions.RootPath = info.Path volOptions.Features = info.Features @@ -367,7 +371,7 @@ func NewVolumeOptionsFromVolID( } if errors.Is(err, cerrors.ErrInvalidCommand) { - volOptions.RootPath, err = volOptions.GetVolumeRootPathCeph(ctx, fsutil.VolumeID(vid.FsSubvolName)) + volOptions.RootPath, err = vol.GetVolumeRootPathCeph(ctx) } return &volOptions, &vid, err @@ -410,7 +414,7 @@ func NewVolumeOptionsFromMonitorList( return nil, nil, err } - opts.RootPath = GetVolumeRootPathCephDeprecated(fsutil.VolumeID(volID)) + opts.RootPath = core.GetVolumeRootPathCephDeprecated(fsutil.VolumeID(volID)) } else { if err = extractOption(&opts.RootPath, "rootPath", options); err != nil { return nil, nil, err @@ -509,7 +513,7 @@ func NewVolumeOptionsFromStaticVolume( func NewSnapshotOptionsFromID( ctx context.Context, snapID string, - cr *util.Credentials) (*VolumeOptions, *SnapshotInfo, *SnapshotIdentifier, error) { + cr *util.Credentials) (*VolumeOptions, *core.SnapshotInfo, *SnapshotIdentifier, error) { var ( vi util.CSIIdentifier volOptions VolumeOptions @@ -551,7 +555,7 @@ func NewSnapshotOptionsFromID( } }() - fs := NewFileSystem(volOptions.conn) + fs := core.NewFileSystem(volOptions.conn) volOptions.FsName, err = fs.GetFsName(ctx, volOptions.FscID) if err != nil { return &volOptions, nil, &sid, err @@ -579,14 +583,17 @@ func NewSnapshotOptionsFromID( sid.FsSnapshotName = imageAttributes.ImageName sid.FsSubvolName = imageAttributes.SourceName - subvolInfo, err := volOptions.GetSubVolumeInfo(ctx, fsutil.VolumeID(sid.FsSubvolName)) + volOptions.SubVolume.VolID = sid.FsSubvolName + vol := core.NewSubVolume(volOptions.conn, &volOptions.SubVolume, volOptions.ClusterID) + + subvolInfo, err := vol.GetSubVolumeInfo(ctx) if err != nil { return &volOptions, nil, &sid, err } volOptions.Features = subvolInfo.Features volOptions.Size = subvolInfo.BytesQuota - - info, err := volOptions.GetSnapshotInfo(ctx, fsutil.VolumeID(sid.FsSnapshotName), fsutil.VolumeID(sid.FsSubvolName)) + snap := core.NewSnapshot(volOptions.conn, sid.FsSnapshotName, &volOptions.SubVolume) + info, err := snap.GetSnapshotInfo(ctx) if err != nil { return &volOptions, nil, &sid, err } @@ -594,8 +601,17 @@ func NewSnapshotOptionsFromID( return &volOptions, &info, &sid, nil } -func GenSnapFromOptions(ctx context.Context, req *csi.CreateSnapshotRequest) (snap *CephfsSnapshot, err error) { - cephfsSnap := &CephfsSnapshot{} +// SnapshotOption is a struct that holds the information about the snapshot. +type SnapshotOption struct { + ReservedID string // ID reserved for the snapshot. + RequestName string // Request name of the snapshot. + ClusterID string // Cluster ID of to identify ceph cluster connection information. + Monitors string // Monitors of the ceph cluster. + NamePrefix string // Name prefix of the snapshot. +} + +func GenSnapFromOptions(ctx context.Context, req *csi.CreateSnapshotRequest) (*SnapshotOption, error) { + cephfsSnap := &SnapshotOption{} cephfsSnap.RequestName = req.GetName() snapOptions := req.GetParameters()