diff --git a/internal/cephfs/core/metadata.go b/internal/cephfs/core/metadata.go index e0cedbd72..d5fb8a73a 100644 --- a/internal/cephfs/core/metadata.go +++ b/internal/cephfs/core/metadata.go @@ -33,9 +33,7 @@ const ( var ErrSubVolMetadataNotSupported = errors.New("subvolume metadata operations are not supported") func (s *subVolumeClient) supportsSubVolMetadata() bool { - if _, keyPresent := clusterAdditionalInfo[s.clusterID]; !keyPresent { - clusterAdditionalInfo[s.clusterID] = &localClusterState{} - } + newLocalClusterState(s.clusterID) return clusterAdditionalInfo[s.clusterID].subVolMetadataState != unsupported } diff --git a/internal/cephfs/core/snapshot_metadata.go b/internal/cephfs/core/snapshot_metadata.go index 5d00798c7..5cf67ddc0 100644 --- a/internal/cephfs/core/snapshot_metadata.go +++ b/internal/cephfs/core/snapshot_metadata.go @@ -29,9 +29,7 @@ import ( var ErrSubVolSnapMetadataNotSupported = errors.New("subvolume snapshot metadata operations are not supported") func (s *snapshotClient) supportsSubVolSnapMetadata() bool { - if _, keyPresent := clusterAdditionalInfo[s.clusterID]; !keyPresent { - clusterAdditionalInfo[s.clusterID] = &localClusterState{} - } + newLocalClusterState(s.clusterID) return clusterAdditionalInfo[s.clusterID].subVolSnapshotMetadataState != unsupported } diff --git a/internal/cephfs/core/volume.go b/internal/cephfs/core/volume.go index 316d2c7cd..ee0611c9e 100644 --- a/internal/cephfs/core/volume.go +++ b/internal/cephfs/core/volume.go @@ -202,18 +202,25 @@ type localClusterState struct { resizeState operationState subVolMetadataState operationState subVolSnapshotMetadataState operationState + // A cluster can have multiple filesystem for that we need to have a map of + // subvolumegroups to check filesystem is created nor not. // set true once a subvolumegroup is created - // for corresponding cluster. - subVolumeGroupCreated bool + // for corresponding filesystem in a cluster. + subVolumeGroupsCreated map[string]bool +} + +func newLocalClusterState(clusterID string) { + // verify if corresponding clusterID key is present in the map, + // and if not, initialize with default values(false). + if _, keyPresent := clusterAdditionalInfo[clusterID]; !keyPresent { + clusterAdditionalInfo[clusterID] = &localClusterState{} + clusterAdditionalInfo[clusterID].subVolumeGroupsCreated = make(map[string]bool) + } } // CreateVolume creates a subvolume. func (s *subVolumeClient) CreateVolume(ctx context.Context) error { - // verify if corresponding clusterID key is present in the map, - // and if not, initialize with default values(false). - if _, keyPresent := clusterAdditionalInfo[s.clusterID]; !keyPresent { - clusterAdditionalInfo[s.clusterID] = &localClusterState{} - } + newLocalClusterState(s.clusterID) ca, err := s.conn.GetFSAdmin() if err != nil { @@ -223,7 +230,7 @@ func (s *subVolumeClient) CreateVolume(ctx context.Context) error { } // create subvolumegroup if not already created for the cluster. - if !clusterAdditionalInfo[s.clusterID].subVolumeGroupCreated { + if !clusterAdditionalInfo[s.clusterID].subVolumeGroupsCreated[s.FsName] { opts := fsAdmin.SubVolumeGroupOptions{} err = ca.CreateSubVolumeGroup(s.FsName, s.SubvolumeGroup, &opts) if err != nil { @@ -237,7 +244,7 @@ func (s *subVolumeClient) CreateVolume(ctx context.Context) error { return err } log.DebugLog(ctx, "cephfs: created subvolume group %s", s.SubvolumeGroup) - clusterAdditionalInfo[s.clusterID].subVolumeGroupCreated = true + clusterAdditionalInfo[s.clusterID].subVolumeGroupsCreated[s.FsName] = true } opts := fsAdmin.SubVolumeOptions{ @@ -279,13 +286,7 @@ func (s *subVolumeClient) ExpandVolume(ctx context.Context, bytesQuota int64) er // subvolume. If the command is not available as a fallback it will use // CreateVolume to resize the subvolume. func (s *subVolumeClient) ResizeVolume(ctx context.Context, bytesQuota int64) error { - // keyPresent checks whether corresponding clusterID key is present in clusterAdditionalInfo - var keyPresent bool - // verify if corresponding clusterID key is present in the map, - // and if not, initialize with default values(false). - if _, keyPresent = clusterAdditionalInfo[s.clusterID]; !keyPresent { - clusterAdditionalInfo[s.clusterID] = &localClusterState{} - } + newLocalClusterState(s.clusterID) // resize subvolume when either it's supported, or when corresponding // clusterID key was not present. if clusterAdditionalInfo[s.clusterID].resizeState == unknown ||