diff --git a/internal/cephfs/controllerserver.go b/internal/cephfs/controllerserver.go index 1c1c91b35..e012ca4e9 100644 --- a/internal/cephfs/controllerserver.go +++ b/internal/cephfs/controllerserver.go @@ -55,6 +55,9 @@ type ControllerServer struct { // Cluster name ClusterName string + + // Set metadata on volume + SetMetadata bool } // createBackingVolume creates the backing subvolume and on any error cleans up any created entities. @@ -67,7 +70,7 @@ func (cs *ControllerServer) createBackingVolume( ) error { var err error volClient := core.NewSubVolume(volOptions.GetConnection(), - &volOptions.SubVolume, volOptions.ClusterID, cs.ClusterName) + &volOptions.SubVolume, volOptions.ClusterID, cs.ClusterName, cs.SetMetadata) if sID != nil { return cs.createBackingVolumeFromSnapshotSource(ctx, volOptions, parentVolOpt, volClient, sID) @@ -101,7 +104,7 @@ func (cs *ControllerServer) createBackingVolumeFromSnapshotSource( defer cs.OperationLocks.ReleaseRestoreLock(sID.SnapshotID) if volOptions.BackingSnapshot { - if err := store.AddSnapshotBackedVolumeRef(ctx, volOptions, cs.ClusterName); err != nil { + if err := store.AddSnapshotBackedVolumeRef(ctx, volOptions, cs.ClusterName, cs.SetMetadata); err != nil { log.ErrorLog(ctx, "failed to create snapshot-backed volume from snapshot %s: %v", sID.FsSnapshotName, err) @@ -146,11 +149,10 @@ func (cs *ControllerServer) createBackingVolumeFromVolumeSource( return nil } -func checkContentSource( +func (cs *ControllerServer) checkContentSource( ctx context.Context, req *csi.CreateVolumeRequest, cr *util.Credentials, - clusterName string, ) (*store.VolumeOptions, *store.VolumeIdentifier, *store.SnapshotIdentifier, error) { if req.VolumeContentSource == nil { return nil, nil, nil, nil @@ -159,7 +161,7 @@ func checkContentSource( switch volumeSource.Type.(type) { case *csi.VolumeContentSource_Snapshot: snapshotID := req.VolumeContentSource.GetSnapshot().GetSnapshotId() - volOpt, _, sid, err := store.NewSnapshotOptionsFromID(ctx, snapshotID, cr, clusterName) + volOpt, _, sid, err := store.NewSnapshotOptionsFromID(ctx, snapshotID, cr, cs.ClusterName, cs.SetMetadata) if err != nil { if errors.Is(err, cerrors.ErrSnapNotFound) { return nil, nil, nil, status.Error(codes.NotFound, err.Error()) @@ -172,7 +174,8 @@ func checkContentSource( case *csi.VolumeContentSource_Volume: // Find the volume using the provided VolumeID volID := req.VolumeContentSource.GetVolume().GetVolumeId() - parentVol, pvID, err := store.NewVolumeOptionsFromVolID(ctx, volID, nil, req.Secrets, clusterName) + parentVol, pvID, err := store.NewVolumeOptionsFromVolID(ctx, + volID, nil, req.Secrets, cs.ClusterName, cs.SetMetadata) if err != nil { if !errors.Is(err, cerrors.ErrVolumeNotFound) { return nil, nil, nil, status.Error(codes.NotFound, err.Error()) @@ -257,7 +260,7 @@ func (cs *ControllerServer) CreateVolume( } defer cs.VolumeLocks.Release(requestName) - volOptions, err := store.NewVolumeOptions(ctx, requestName, cs.ClusterName, req, cr) + volOptions, err := store.NewVolumeOptions(ctx, requestName, cs.ClusterName, cs.SetMetadata, req, cr) if err != nil { log.ErrorLog(ctx, "validation and extraction of volume options failed: %v", err) @@ -269,7 +272,7 @@ func (cs *ControllerServer) CreateVolume( volOptions.Size = util.RoundOffCephFSVolSize(req.GetCapacityRange().GetRequiredBytes()) } - parentVol, pvID, sID, err := checkContentSource(ctx, req, cr, cs.ClusterName) + parentVol, pvID, sID, err := cs.checkContentSource(ctx, req, cr) if err != nil { return nil, err } @@ -282,7 +285,7 @@ func (cs *ControllerServer) CreateVolume( return nil, status.Error(codes.InvalidArgument, err.Error()) } - vID, err := store.CheckVolExists(ctx, volOptions, parentVol, pvID, sID, cr, cs.ClusterName) + vID, err := store.CheckVolExists(ctx, volOptions, parentVol, pvID, sID, cr, cs.ClusterName, cs.SetMetadata) if err != nil { if cerrors.IsCloneRetryError(err) { return nil, status.Error(codes.Aborted, err.Error()) @@ -295,7 +298,7 @@ func (cs *ControllerServer) CreateVolume( metadata := k8s.GetVolumeMetadata(req.GetParameters()) if vID != nil { volClient := core.NewSubVolume(volOptions.GetConnection(), &volOptions.SubVolume, - volOptions.ClusterID, cs.ClusterName) + volOptions.ClusterID, cs.ClusterName, cs.SetMetadata) if sID != nil || pvID != nil && !volOptions.BackingSnapshot { err = volClient.ExpandVolume(ctx, volOptions.Size) if err != nil { @@ -376,7 +379,7 @@ func (cs *ControllerServer) CreateVolume( } volClient := core.NewSubVolume(volOptions.GetConnection(), - &volOptions.SubVolume, volOptions.ClusterID, cs.ClusterName) + &volOptions.SubVolume, volOptions.ClusterID, cs.ClusterName, cs.SetMetadata) if !volOptions.BackingSnapshot { // Get root path for the created subvolume. // Note that root path for snapshot-backed volumes has been already set when @@ -464,7 +467,8 @@ func (cs *ControllerServer) DeleteVolume( defer cs.OperationLocks.ReleaseDeleteLock(req.GetVolumeId()) // Find the volume using the provided VolumeID - volOptions, vID, err := store.NewVolumeOptionsFromVolID(ctx, string(volID), nil, secrets, cs.ClusterName) + volOptions, vID, err := store.NewVolumeOptionsFromVolID(ctx, string(volID), nil, secrets, + cs.ClusterName, cs.SetMetadata) if err != nil { // if error is ErrPoolNotFound, the pool is already deleted we dont // need to worry about deleting subvolume or omap data, return success @@ -519,7 +523,7 @@ func (cs *ControllerServer) DeleteVolume( } defer cr.DeleteCredentials() - if err := cleanUpBackingVolume(ctx, volOptions, vID, cr, cs.ClusterName); err != nil { + if err := cs.cleanUpBackingVolume(ctx, volOptions, vID, cr); err != nil { return nil, err } @@ -532,18 +536,17 @@ func (cs *ControllerServer) DeleteVolume( return &csi.DeleteVolumeResponse{}, nil } -func cleanUpBackingVolume( +func (cs *ControllerServer) cleanUpBackingVolume( ctx context.Context, volOptions *store.VolumeOptions, volID *store.VolumeIdentifier, cr *util.Credentials, - clusterName string, ) error { if !volOptions.BackingSnapshot { // Regular volumes need to be purged. volClient := core.NewSubVolume(volOptions.GetConnection(), - &volOptions.SubVolume, volOptions.ClusterID, clusterName) + &volOptions.SubVolume, volOptions.ClusterID, cs.ClusterName, cs.SetMetadata) if err := volClient.PurgeVolume(ctx, false); err != nil { log.ErrorLog(ctx, "failed to delete volume %s: %v", volID, err) if errors.Is(err, cerrors.ErrVolumeHasSnapshots) { @@ -576,7 +579,7 @@ func cleanUpBackingVolume( } snapParentVolOptions, _, snapID, err := store.NewSnapshotOptionsFromID(ctx, - volOptions.BackingSnapshotID, cr, clusterName) + volOptions.BackingSnapshotID, cr, cs.ClusterName, cs.SetMetadata) if err != nil { absorbErrs := []error{ util.ErrPoolNotFound, @@ -602,7 +605,7 @@ func cleanUpBackingVolume( snapParentVolOptions.GetConnection(), snapID.FsSnapshotName, volOptions.ClusterID, - clusterName, + cs.ClusterName, &snapParentVolOptions.SubVolume, ) @@ -671,7 +674,8 @@ func (cs *ControllerServer) ControllerExpandVolume( } defer cr.DeleteCredentials() - volOptions, volIdentifier, err := store.NewVolumeOptionsFromVolID(ctx, volID, nil, secret, cs.ClusterName) + volOptions, volIdentifier, err := store.NewVolumeOptionsFromVolID(ctx, volID, nil, secret, + cs.ClusterName, cs.SetMetadata) if err != nil { log.ErrorLog(ctx, "validation and extraction of volume options failed: %v", err) @@ -686,7 +690,7 @@ func (cs *ControllerServer) ControllerExpandVolume( RoundOffSize := util.RoundOffCephFSVolSize(req.GetCapacityRange().GetRequiredBytes()) volClient := core.NewSubVolume(volOptions.GetConnection(), - &volOptions.SubVolume, volOptions.ClusterID, cs.ClusterName) + &volOptions.SubVolume, volOptions.ClusterID, cs.ClusterName, cs.SetMetadata) if err = volClient.ResizeVolume(ctx, RoundOffSize); err != nil { log.ErrorLog(ctx, "failed to expand volume %s: %v", fsutil.VolumeID(volIdentifier.FsSubvolName), err) @@ -740,7 +744,7 @@ func (cs *ControllerServer) CreateSnapshot( // Find the volume using the provided VolumeID parentVolOptions, vid, err := store.NewVolumeOptionsFromVolID(ctx, - sourceVolID, nil, req.GetSecrets(), cs.ClusterName) + sourceVolID, nil, req.GetSecrets(), cs.ClusterName, cs.SetMetadata) if err != nil { if errors.Is(err, util.ErrPoolNotFound) { log.WarningLog(ctx, "failed to get backend volume for %s: %v", sourceVolID, err) @@ -792,10 +796,8 @@ func (cs *ControllerServer) CreateSnapshot( // request to create snapshot. // TODO: For this purpose we could make use of cached clusterAdditionalInfo // too. - volClient := core.NewSubVolume(parentVolOptions.GetConnection(), - &parentVolOptions.SubVolume, - parentVolOptions.ClusterID, - cs.ClusterName) + volClient := core.NewSubVolume(parentVolOptions.GetConnection(), &parentVolOptions.SubVolume, + parentVolOptions.ClusterID, cs.ClusterName, cs.SetMetadata) info, err := volClient.GetSubVolumeInfo(ctx) if err != nil { // Check error code value against ErrInvalidCommand to understand the cluster @@ -998,7 +1000,7 @@ func (cs *ControllerServer) DeleteSnapshot( } defer cs.OperationLocks.ReleaseDeleteLock(snapshotID) - volOpt, snapInfo, sid, err := store.NewSnapshotOptionsFromID(ctx, snapshotID, cr, cs.ClusterName) + volOpt, snapInfo, sid, err := store.NewSnapshotOptionsFromID(ctx, snapshotID, cr, cs.ClusterName, cs.SetMetadata) if err != nil { switch { case errors.Is(err, util.ErrPoolNotFound): diff --git a/internal/cephfs/core/metadata.go b/internal/cephfs/core/metadata.go index bb02e0a9e..d124d2eda 100644 --- a/internal/cephfs/core/metadata.go +++ b/internal/cephfs/core/metadata.go @@ -93,6 +93,10 @@ func (s *subVolumeClient) removeMetadata(key string) error { // SetAllMetadata set all the metadata from arg parameters on Ssubvolume. func (s *subVolumeClient) SetAllMetadata(parameters map[string]string) error { + if !s.enableMetadata { + return nil + } + for k, v := range parameters { err := s.setMetadata(k, v) if err != nil { @@ -113,6 +117,10 @@ func (s *subVolumeClient) SetAllMetadata(parameters map[string]string) error { // UnsetAllMetadata unset all the metadata from arg keys on subvolume. func (s *subVolumeClient) UnsetAllMetadata(keys []string) error { + if !s.enableMetadata { + return nil + } + for _, key := range keys { err := s.removeMetadata(key) // TODO: replace string comparison with errno. diff --git a/internal/cephfs/core/volume.go b/internal/cephfs/core/volume.go index 61e32c3bb..316d2c7cd 100644 --- a/internal/cephfs/core/volume.go +++ b/internal/cephfs/core/volume.go @@ -81,10 +81,11 @@ type SubVolumeClient interface { // subVolumeClient implements SubVolumeClient interface. type subVolumeClient struct { - *SubVolume // Embedded SubVolume struct. - clusterID string // Cluster ID to check subvolumegroup and resize functionality. - clusterName string // Cluster name - conn *util.ClusterConnection // Cluster connection. + *SubVolume // Embedded SubVolume struct. + clusterID string // Cluster ID to check subvolumegroup and resize functionality. + clusterName string // Cluster name + enableMetadata bool // Set metadata on volume + conn *util.ClusterConnection // Cluster connection. } // SubVolume holds the information about the subvolume. @@ -98,12 +99,19 @@ type SubVolume struct { } // NewSubVolume returns a new subvolume client. -func NewSubVolume(conn *util.ClusterConnection, vol *SubVolume, clusterID, clusterName string) SubVolumeClient { +func NewSubVolume( + conn *util.ClusterConnection, + vol *SubVolume, + clusterID, + clusterName string, + setMetadata bool, +) SubVolumeClient { return &subVolumeClient{ - SubVolume: vol, - clusterID: clusterID, - clusterName: clusterName, - conn: conn, + SubVolume: vol, + clusterID: clusterID, + clusterName: clusterName, + enableMetadata: setMetadata, + conn: conn, } } diff --git a/internal/cephfs/driver.go b/internal/cephfs/driver.go index 94d4d2d4d..5261d12aa 100644 --- a/internal/cephfs/driver.go +++ b/internal/cephfs/driver.go @@ -136,6 +136,7 @@ func (fs *Driver) Run(conf *util.Config) { if conf.IsControllerServer { fs.cs = NewControllerServer(fs.cd) fs.cs.ClusterName = conf.ClusterName + fs.cs.SetMetadata = conf.SetMetadata } if !conf.IsControllerServer && !conf.IsNodeServer { topology, err = util.GetTopologyFromDomainLabels(conf.DomainLabels, conf.NodeID, conf.DriverName) diff --git a/internal/cephfs/nodeserver.go b/internal/cephfs/nodeserver.go index d8cdf0063..cb540541f 100644 --- a/internal/cephfs/nodeserver.go +++ b/internal/cephfs/nodeserver.go @@ -82,7 +82,7 @@ func (ns *NodeServer) getVolumeOptions( volContext, volSecrets map[string]string, ) (*store.VolumeOptions, error) { - volOptions, _, err := store.NewVolumeOptionsFromVolID(ctx, string(volID), volContext, volSecrets, "") + volOptions, _, err := store.NewVolumeOptionsFromVolID(ctx, string(volID), volContext, volSecrets, "", false) if err != nil { if !errors.Is(err, cerrors.ErrInvalidVolID) { return nil, status.Error(codes.Internal, err.Error()) diff --git a/internal/cephfs/store/backingsnapshot.go b/internal/cephfs/store/backingsnapshot.go index d9500e28c..b57321ceb 100644 --- a/internal/cephfs/store/backingsnapshot.go +++ b/internal/cephfs/store/backingsnapshot.go @@ -35,6 +35,7 @@ func AddSnapshotBackedVolumeRef( ctx context.Context, volOptions *VolumeOptions, clusterName string, + setMetadata bool, ) error { ioctx, err := volOptions.conn.GetIoctx(volOptions.MetadataPool) if err != nil { @@ -96,7 +97,8 @@ func AddSnapshotBackedVolumeRef( // There may have been a race between adding a ref to the reftracker and // deleting the backing snapshot. Make sure the snapshot still exists by // trying to retrieve it again. - _, _, _, err = NewSnapshotOptionsFromID(ctx, volOptions.BackingSnapshotID, volOptions.conn.Creds, clusterName) + _, _, _, err = NewSnapshotOptionsFromID(ctx, + volOptions.BackingSnapshotID, volOptions.conn.Creds, clusterName, setMetadata) if err != nil { log.ErrorLog(ctx, "failed to get backing snapshot %s: %v", volOptions.BackingSnapshotID, err) } diff --git a/internal/cephfs/store/fsjournal.go b/internal/cephfs/store/fsjournal.go index 83ef1d3c7..11b836a6d 100644 --- a/internal/cephfs/store/fsjournal.go +++ b/internal/cephfs/store/fsjournal.go @@ -79,6 +79,7 @@ func CheckVolExists(ctx context.Context, sID *SnapshotIdentifier, cr *util.Credentials, clusterName string, + setMetadata bool, ) (*VolumeIdentifier, error) { var vid VolumeIdentifier // Connect to cephfs' default radosNamespace (csi) @@ -100,7 +101,7 @@ func CheckVolExists(ctx context.Context, vid.FsSubvolName = imageData.ImageAttributes.ImageName volOptions.VolID = vid.FsSubvolName - vol := core.NewSubVolume(volOptions.conn, &volOptions.SubVolume, volOptions.ClusterID, clusterName) + vol := core.NewSubVolume(volOptions.conn, &volOptions.SubVolume, volOptions.ClusterID, clusterName, setMetadata) if (sID != nil || pvID != nil) && imageData.ImageAttributes.BackingSnapshotID == "" { cloneState, cloneStateErr := vol.GetCloneState(ctx) if cloneStateErr != nil { diff --git a/internal/cephfs/store/volumeoptions.go b/internal/cephfs/store/volumeoptions.go index 29dbb3742..937f2c4fa 100644 --- a/internal/cephfs/store/volumeoptions.go +++ b/internal/cephfs/store/volumeoptions.go @@ -196,7 +196,12 @@ func fmtBackingSnapshotOptionMismatch(optName, expected, actual string) error { // NewVolumeOptions generates a new instance of volumeOptions from the provided // CSI request parameters. -func NewVolumeOptions(ctx context.Context, requestName, clusterName string, req *csi.CreateVolumeRequest, +func NewVolumeOptions( + ctx context.Context, + requestName, + clusterName string, + setMetadata bool, + req *csi.CreateVolumeRequest, cr *util.Credentials, ) (*VolumeOptions, error) { var ( @@ -289,7 +294,7 @@ func NewVolumeOptions(ctx context.Context, requestName, clusterName string, req opts.BackingSnapshotID = req.GetVolumeContentSource().GetSnapshot().GetSnapshotId() - err = opts.populateVolumeOptionsFromBackingSnapshot(ctx, cr, clusterName) + err = opts.populateVolumeOptionsFromBackingSnapshot(ctx, cr, clusterName, setMetadata) if err != nil { return nil, err } @@ -305,6 +310,7 @@ func NewVolumeOptionsFromVolID( volID string, volOpt, secrets map[string]string, clusterName string, + setMetadata bool, ) (*VolumeOptions, *VolumeIdentifier, error) { var ( vi util.CSIIdentifier @@ -408,16 +414,20 @@ func NewVolumeOptionsFromVolID( volOptions.SubVolume.VolID = vid.FsSubvolName if volOptions.BackingSnapshot { - err = volOptions.populateVolumeOptionsFromBackingSnapshot(ctx, cr, clusterName) + err = volOptions.populateVolumeOptionsFromBackingSnapshot(ctx, cr, clusterName, setMetadata) } else { - err = volOptions.populateVolumeOptionsFromSubvolume(ctx, clusterName) + err = volOptions.populateVolumeOptionsFromSubvolume(ctx, clusterName, setMetadata) } return &volOptions, &vid, err } -func (vo *VolumeOptions) populateVolumeOptionsFromSubvolume(ctx context.Context, clusterName string) error { - vol := core.NewSubVolume(vo.conn, &vo.SubVolume, vo.ClusterID, clusterName) +func (vo *VolumeOptions) populateVolumeOptionsFromSubvolume( + ctx context.Context, + clusterName string, + setMetadata bool, +) error { + vol := core.NewSubVolume(vo.conn, &vo.SubVolume, vo.ClusterID, clusterName, setMetadata) var info *core.Subvolume info, err := vol.GetSubVolumeInfo(ctx) @@ -438,6 +448,7 @@ func (vo *VolumeOptions) populateVolumeOptionsFromBackingSnapshot( ctx context.Context, cr *util.Credentials, clusterName string, + setMetadata bool, ) error { // As of CephFS snapshot v2 API, snapshots may be found in two locations: // @@ -459,7 +470,8 @@ func (vo *VolumeOptions) populateVolumeOptionsFromBackingSnapshot( return nil } - parentBackingSnapVolOpts, _, snapID, err := NewSnapshotOptionsFromID(ctx, vo.BackingSnapshotID, cr, clusterName) + parentBackingSnapVolOpts, _, snapID, err := NewSnapshotOptionsFromID(ctx, + vo.BackingSnapshotID, cr, clusterName, setMetadata) if err != nil { return fmt.Errorf("failed to retrieve backing snapshot %s: %w", vo.BackingSnapshotID, err) } @@ -655,6 +667,7 @@ func NewSnapshotOptionsFromID( snapID string, cr *util.Credentials, clusterName string, + setMetadata bool, ) (*VolumeOptions, *core.SnapshotInfo, *SnapshotIdentifier, error) { var ( vi util.CSIIdentifier @@ -726,7 +739,7 @@ func NewSnapshotOptionsFromID( sid.FsSubvolName = imageAttributes.SourceName volOptions.SubVolume.VolID = sid.FsSubvolName - vol := core.NewSubVolume(volOptions.conn, &volOptions.SubVolume, volOptions.ClusterID, clusterName) + vol := core.NewSubVolume(volOptions.conn, &volOptions.SubVolume, volOptions.ClusterID, clusterName, setMetadata) subvolInfo, err := vol.GetSubVolumeInfo(ctx) if err != nil {