From 30244bf11ba1b5b4e946de61622dc9a693d4ba22 Mon Sep 17 00:00:00 2001 From: Prasanna Kumar Kalever Date: Thu, 28 Jul 2022 17:56:55 +0530 Subject: [PATCH] cephfs: snapshots honor `--setmetadata` option `--setmetadata` is false by default, honoring it will keep the metadata disabled by default Signed-off-by: Prasanna Kumar Kalever --- internal/cephfs/controllerserver.go | 30 ++++++++--------------- internal/cephfs/core/clone.go | 6 ++--- internal/cephfs/core/snapshot.go | 17 +++++++------ internal/cephfs/core/snapshot_metadata.go | 8 ++++++ internal/cephfs/store/fsjournal.go | 4 ++- internal/cephfs/store/volumeoptions.go | 2 +- 6 files changed, 35 insertions(+), 32 deletions(-) diff --git a/internal/cephfs/controllerserver.go b/internal/cephfs/controllerserver.go index e012ca4e9..87f1d2617 100644 --- a/internal/cephfs/controllerserver.go +++ b/internal/cephfs/controllerserver.go @@ -601,13 +601,8 @@ func (cs *ControllerServer) cleanUpBackingVolume( return status.Error(codes.Internal, err.Error()) } } else { - snapClient := core.NewSnapshot( - snapParentVolOptions.GetConnection(), - snapID.FsSnapshotName, - volOptions.ClusterID, - cs.ClusterName, - &snapParentVolOptions.SubVolume, - ) + snapClient := core.NewSnapshot(snapParentVolOptions.GetConnection(), snapID.FsSnapshotName, + volOptions.ClusterID, cs.ClusterName, cs.SetMetadata, &snapParentVolOptions.SubVolume) err = deleteSnapshotAndUndoReservation(ctx, snapClient, snapParentVolOptions, snapID, cr) if err != nil { @@ -785,7 +780,7 @@ func (cs *ControllerServer) CreateSnapshot( } defer cs.VolumeLocks.Release(sourceVolID) snapName := req.GetName() - sid, snapInfo, err := store.CheckSnapExists(ctx, parentVolOptions, cephfsSnap, cs.ClusterName, cr) + sid, snapInfo, err := store.CheckSnapExists(ctx, parentVolOptions, cephfsSnap, cs.ClusterName, cs.SetMetadata, cr) if err != nil { return nil, status.Error(codes.Internal, err.Error()) } @@ -823,12 +818,8 @@ func (cs *ControllerServer) CreateSnapshot( if sid != nil { // check snapshot is protected protected := true - snapClient := core.NewSnapshot( - parentVolOptions.GetConnection(), - sid.FsSnapshotName, - parentVolOptions.ClusterID, - cs.ClusterName, - &parentVolOptions.SubVolume) + snapClient := core.NewSnapshot(parentVolOptions.GetConnection(), sid.FsSnapshotName, + parentVolOptions.ClusterID, cs.ClusterName, cs.SetMetadata, &parentVolOptions.SubVolume) if !(snapInfo.Protected == core.SnapshotIsProtected) { err = snapClient.ProtectSnapshot(ctx) if err != nil { @@ -872,7 +863,7 @@ func (cs *ControllerServer) CreateSnapshot( } } }() - snap, err := doSnapshot(ctx, parentVolOptions, sID.FsSnapshotName, cs.ClusterName, metadata) + snap, err := cs.doSnapshot(ctx, parentVolOptions, sID.FsSnapshotName, metadata) if err != nil { return nil, status.Error(codes.Internal, err.Error()) } @@ -888,17 +879,16 @@ func (cs *ControllerServer) CreateSnapshot( }, nil } -func doSnapshot( +func (cs *ControllerServer) doSnapshot( ctx context.Context, volOpt *store.VolumeOptions, - snapshotName, - clusterName string, + snapshotName string, metadata map[string]string, ) (core.SnapshotInfo, error) { snapID := fsutil.VolumeID(snapshotName) snap := core.SnapshotInfo{} snapClient := core.NewSnapshot(volOpt.GetConnection(), snapshotName, - volOpt.ClusterID, clusterName, &volOpt.SubVolume) + volOpt.ClusterID, cs.ClusterName, cs.SetMetadata, &volOpt.SubVolume) err := snapClient.CreateSnapshot(ctx) if err != nil { log.ErrorLog(ctx, "failed to create snapshot %s %v", snapID, err) @@ -1056,7 +1046,7 @@ func (cs *ControllerServer) DeleteSnapshot( return nil, status.Errorf(codes.FailedPrecondition, "snapshot %s has pending clones", snapshotID) } snapClient := core.NewSnapshot(volOpt.GetConnection(), sid.FsSnapshotName, - volOpt.ClusterID, cs.ClusterName, &volOpt.SubVolume) + volOpt.ClusterID, cs.ClusterName, cs.SetMetadata, &volOpt.SubVolume) if snapInfo.Protected == core.SnapshotIsProtected { err = snapClient.UnprotectSnapshot(ctx) if err != nil { diff --git a/internal/cephfs/core/clone.go b/internal/cephfs/core/clone.go index 1e801750f..8f3a75a4d 100644 --- a/internal/cephfs/core/clone.go +++ b/internal/cephfs/core/clone.go @@ -66,7 +66,7 @@ func (s *subVolumeClient) CreateCloneFromSubvolume( parentvolOpt *SubVolume, ) error { snapshotID := s.VolID - snapClient := NewSnapshot(s.conn, snapshotID, s.clusterID, s.clusterName, parentvolOpt) + snapClient := NewSnapshot(s.conn, snapshotID, s.clusterID, s.clusterName, s.enableMetadata, parentvolOpt) err := snapClient.CreateSnapshot(ctx) if err != nil { log.ErrorLog(ctx, "failed to create snapshot %s %v", snapshotID, err) @@ -165,7 +165,7 @@ func (s *subVolumeClient) CleanupSnapshotFromSubvolume( // snapshot name is same as clone name as we need a name which can be // identified during PVC-PVC cloning. snapShotID := s.VolID - snapClient := NewSnapshot(s.conn, snapShotID, s.clusterID, s.clusterName, parentVol) + snapClient := NewSnapshot(s.conn, snapShotID, s.clusterID, s.clusterName, s.enableMetadata, parentVol) snapInfo, err := snapClient.GetSnapshotInfo(ctx) if err != nil { if errors.Is(err, cerrors.ErrSnapNotFound) { @@ -198,7 +198,7 @@ func (s *subVolumeClient) CreateCloneFromSnapshot( ctx context.Context, snap Snapshot, ) error { snapID := snap.SnapshotID - snapClient := NewSnapshot(s.conn, snapID, s.clusterID, s.clusterName, snap.SubVolume) + snapClient := NewSnapshot(s.conn, snapID, s.clusterID, s.clusterName, s.enableMetadata, snap.SubVolume) err := snapClient.CloneSnapshot(ctx, s.SubVolume) if err != nil { return err diff --git a/internal/cephfs/core/snapshot.go b/internal/cephfs/core/snapshot.go index 4de877c2e..b1994266e 100644 --- a/internal/cephfs/core/snapshot.go +++ b/internal/cephfs/core/snapshot.go @@ -61,10 +61,11 @@ type SnapshotClient interface { // snapshotClient is the implementation of SnapshotClient interface. type snapshotClient struct { - *Snapshot // Embedded snapshot struct. - clusterID string // Cluster ID. - clusterName string // Cluster Name. - conn *util.ClusterConnection // Cluster connection. + *Snapshot // Embedded snapshot struct. + clusterID string // Cluster ID. + clusterName string // Cluster Name. + enableMetadata bool // Set metadata on volume + conn *util.ClusterConnection // Cluster connection. } // Snapshot represents a subvolume snapshot and its cluster information. @@ -79,6 +80,7 @@ func NewSnapshot( snapshotID, clusterID, clusterName string, + setMetadata bool, vol *SubVolume, ) SnapshotClient { return &snapshotClient{ @@ -86,9 +88,10 @@ func NewSnapshot( SnapshotID: snapshotID, SubVolume: vol, }, - clusterID: clusterID, - clusterName: clusterName, - conn: conn, + clusterID: clusterID, + clusterName: clusterName, + enableMetadata: setMetadata, + conn: conn, } } diff --git a/internal/cephfs/core/snapshot_metadata.go b/internal/cephfs/core/snapshot_metadata.go index 060a7bbcb..5d00798c7 100644 --- a/internal/cephfs/core/snapshot_metadata.go +++ b/internal/cephfs/core/snapshot_metadata.go @@ -91,6 +91,10 @@ func (s *snapshotClient) removeSnapshotMetadata(key string) error { // SetAllSnapshotMetadata set all the metadata from arg parameters on // subvolume snapshot. func (s *snapshotClient) SetAllSnapshotMetadata(parameters map[string]string) error { + if !s.enableMetadata { + return nil + } + for k, v := range parameters { err := s.setSnapshotMetadata(k, v) if err != nil { @@ -113,6 +117,10 @@ func (s *snapshotClient) SetAllSnapshotMetadata(parameters map[string]string) er // UnsetAllSnapshotMetadata unset all the metadata from arg keys on subvolume // snapshot. func (s *snapshotClient) UnsetAllSnapshotMetadata(keys []string) error { + if !s.enableMetadata { + return nil + } + for _, key := range keys { err := s.removeSnapshotMetadata(key) // TODO: replace string comparison with errno. diff --git a/internal/cephfs/store/fsjournal.go b/internal/cephfs/store/fsjournal.go index 11b836a6d..d2a7e0678 100644 --- a/internal/cephfs/store/fsjournal.go +++ b/internal/cephfs/store/fsjournal.go @@ -379,6 +379,7 @@ func CheckSnapExists( volOptions *VolumeOptions, snap *SnapshotOption, clusterName string, + setMetadata bool, cr *util.Credentials, ) (*SnapshotIdentifier, *core.SnapshotInfo, error) { // Connect to cephfs' default radosNamespace (csi) @@ -400,7 +401,8 @@ func CheckSnapExists( snapUUID := snapData.ImageUUID snapID := snapData.ImageAttributes.ImageName sid.FsSnapshotName = snapData.ImageAttributes.ImageName - snapClient := core.NewSnapshot(volOptions.conn, snapID, volOptions.ClusterID, clusterName, &volOptions.SubVolume) + snapClient := core.NewSnapshot(volOptions.conn, snapID, + volOptions.ClusterID, clusterName, setMetadata, &volOptions.SubVolume) snapInfo, err := snapClient.GetSnapshotInfo(ctx) if err != nil { if errors.Is(err, cerrors.ErrSnapNotFound) { diff --git a/internal/cephfs/store/volumeoptions.go b/internal/cephfs/store/volumeoptions.go index 937f2c4fa..044f1c7fd 100644 --- a/internal/cephfs/store/volumeoptions.go +++ b/internal/cephfs/store/volumeoptions.go @@ -749,7 +749,7 @@ func NewSnapshotOptionsFromID( volOptions.Size = subvolInfo.BytesQuota volOptions.RootPath = subvolInfo.Path snap := core.NewSnapshot(volOptions.conn, sid.FsSnapshotName, - volOptions.ClusterID, clusterName, &volOptions.SubVolume) + volOptions.ClusterID, clusterName, setMetadata, &volOptions.SubVolume) info, err := snap.GetSnapshotInfo(ctx) if err != nil { return &volOptions, nil, &sid, err