diff --git a/docs/design/proposals/rbd-mirror.md b/docs/design/proposals/rbd-mirror.md
index f66ced4a3..7f39cafae 100644
--- a/docs/design/proposals/rbd-mirror.md
+++ b/docs/design/proposals/rbd-mirror.md
@@ -75,12 +75,12 @@ To solve this problem, We will have a new controller(rbdplugin controller)
 running as part of provisioner pod which watches for the PV objects. When a PV
 is created it will extract the required information from the PV spec and it
 will regenerate the OMAP data and also it will generate a new VolumeHandle
-(`newclusterID-newpoolID-volumeuniqueID`) and it creates an OMAP object for
-mapping between old VolumeHandle and new VolumeHandle. Whenever Ceph-CSI gets a
-RPC request with older VolumeHandle, it will check if any new VolumeHandle
-exists for the old VolumeHandle. If yes, it uses the new VolumeHandle for
-internal operations (to get pool name, Ceph monitor details from the ClusterID
-etc).
+(`newclusterID-newpoolID-volumeuniqueID`) and it adds a PV annotation
+`csi.ceph.io/volume-handle` for mapping between old VolumeHandle and new
+VolumeHandle. Whenever Ceph-CSI gets an RPC request with older VolumeHandle, it
+will check if any new VolumeHandle exists for the old VolumeHandle. If yes, it
+uses the new VolumeHandle for internal operations (to get pool name, Ceph
+monitor details from the ClusterID etc).
 
 Currently, We are making use of watchers in node stage request to make sure
 ReadWriteOnce (RWO) PVC is mounted on a single node at a given point in time.
diff --git a/internal/controller/persistentvolume/persistentvolume.go b/internal/controller/persistentvolume/persistentvolume.go
index 9d046fac8..c77ababc1 100644
--- a/internal/controller/persistentvolume/persistentvolume.go
+++ b/internal/controller/persistentvolume/persistentvolume.go
@@ -122,6 +122,24 @@ func checkStaticVolume(pv *corev1.PersistentVolume) (bool, error) {
 	return static, nil
 }
 
+// storeVolumeIDInPV stores the new volumeID in PV object.
+func (r ReconcilePersistentVolume) storeVolumeIDInPV(pv *corev1.PersistentVolume, newVolumeID string) error {
+	if v, ok := pv.Annotations[rbd.PVVolumeHandleAnnotationKey]; ok {
+		if v == newVolumeID {
+			return nil
+		}
+	}
+	if pv.Annotations == nil {
+		pv.Annotations = make(map[string]string)
+	}
+	if pv.Labels == nil {
+		pv.Labels = make(map[string]string)
+	}
+	pv.Labels[rbd.PVReplicatedLabelKey] = rbd.PVReplicatedLabelValue
+	pv.Annotations[rbd.PVVolumeHandleAnnotationKey] = newVolumeID
+	return r.client.Update(context.TODO(), pv)
+}
+
 // reconcilePV will extract the image details from the pv spec and regenerates
 // the omap data.
 func (r ReconcilePersistentVolume) reconcilePV(obj runtime.Object) error {
@@ -163,11 +181,18 @@ func (r ReconcilePersistentVolume) reconcilePV(obj runtime.Object) error {
 	}
 	defer cr.DeleteCredentials()
 
-	err = rbd.RegenerateJournal(imageName, volumeHandler, pool, journalPool, requestName, cr)
+	rbdVolID, err := rbd.RegenerateJournal(imageName, volumeHandler, pool, journalPool, requestName, cr)
 	if err != nil {
 		util.ErrorLogMsg("failed to regenerate journal %s", err)
 		return err
 	}
+	if rbdVolID != volumeHandler {
+		err = r.storeVolumeIDInPV(pv, rbdVolID)
+		if err != nil {
+			util.ErrorLogMsg("failed to store volumeID in PV %s", err)
+			return err
+		}
+	}
 	return nil
 }
 
diff --git a/internal/rbd/rbd_journal.go b/internal/rbd/rbd_journal.go
index beb657cf6..0d2f4ffa8 100644
--- a/internal/rbd/rbd_journal.go
+++ b/internal/rbd/rbd_journal.go
@@ -25,6 +25,15 @@ import (
 	"github.com/ceph/ceph-csi/internal/util"
 )
 
+const (
+	// PVVolumeHandleAnnotationKey is the annotation key set on the PV object.
+	PVVolumeHandleAnnotationKey = "csi.ceph.io/volume-handle"
+	// PVReplicatedLabelKey is the label key set on PV object.
+	PVReplicatedLabelKey = "csi.ceph.io/replicated-volume"
+	// PVReplicatedLabelValue is the label value set on PV object.
+	PVReplicatedLabelValue = "volume-handle-detected"
+)
+
 func validateNonEmptyField(field, fieldName, structName string) error {
 	if field == "" {
 		return fmt.Errorf("value '%s' in '%s' structure cannot be empty", fieldName, structName)
@@ -479,10 +488,9 @@ func undoVolReservation(ctx context.Context, rbdVol *rbdVolume, cr *util.Credent
 // Extract uuid from volumeID
 // Reserve omap data
 // Generate new volume Handler
-// Create old volumeHandler to new handler mapping
 // The volume handler won't remain same as its contains poolID,clusterID etc
 // which are not same across clusters.
-func RegenerateJournal(imageName, volumeID, pool, journalPool, requestName string, cr *util.Credentials) error {
+func RegenerateJournal(imageName, volumeID, pool, journalPool, requestName string, cr *util.Credentials) (string, error) {
 	ctx := context.Background()
 	var (
 		options map[string]string
@@ -496,7 +504,7 @@ func RegenerateJournal(imageName, volumeID, pool, journalPool, requestName strin
 
 	err := vi.DecomposeCSIID(rbdVol.VolID)
 	if err != nil {
-		return fmt.Errorf("%w: error decoding volume ID (%s) (%s)",
+		return "", fmt.Errorf("%w: error decoding volume ID (%s) (%s)",
 			ErrInvalidVolID, err, rbdVol.VolID)
 	}
 
@@ -507,13 +515,13 @@ func RegenerateJournal(imageName, volumeID, pool, journalPool, requestName strin
 	rbdVol.Monitors, _, err = util.GetMonsAndClusterID(options)
 	if err != nil {
 		util.ErrorLog(ctx, "failed getting mons (%s)", err)
-		return err
+		return "", err
 	}
 
 	rbdVol.Pool = pool
 	err = rbdVol.Connect(cr)
 	if err != nil {
-		return err
+		return "", err
 	}
 	rbdVol.JournalPool = journalPool
 	if rbdVol.JournalPool == "" {
@@ -522,13 +530,13 @@ func RegenerateJournal(imageName, volumeID, pool, journalPool, requestName strin
 	volJournal = journal.NewCSIVolumeJournal("default")
 	j, err := volJournal.Connect(rbdVol.Monitors, rbdVol.RadosNamespace, cr)
 	if err != nil {
-		return err
+		return "", err
 	}
 	defer j.Destroy()
 
 	journalPoolID, imagePoolID, err := util.GetPoolIDs(ctx, rbdVol.Monitors, rbdVol.JournalPool, rbdVol.Pool, cr)
 	if err != nil {
-		return err
+		return "", err
 	}
 
 	rbdVol.RequestName = requestName
@@ -538,7 +546,7 @@ func RegenerateJournal(imageName, volumeID, pool, journalPool, requestName strin
 	imageData, err := j.CheckReservation(
 		ctx, rbdVol.JournalPool, rbdVol.RequestName, rbdVol.NamePrefix, "", kmsID)
 	if err != nil {
-		return err
+		return "", err
 	}
 	if imageData != nil {
 		rbdVol.ReservedID = imageData.ImageUUID
@@ -547,23 +555,23 @@ func RegenerateJournal(imageName, volumeID, pool, journalPool, requestName strin
 		if rbdVol.ImageID == "" {
 			err = rbdVol.storeImageID(ctx, j)
 			if err != nil {
-				return err
+				return "", err
 			}
 		}
-		err = rbdVol.addNewUUIDMapping(ctx, imagePoolID, j)
-		if err != nil {
-			util.ErrorLog(ctx, "failed to add UUID mapping %s: %v", rbdVol, err)
-			return err
-		}
 		// As the omap already exists for this image ID return nil.
-		return nil
+		rbdVol.VolID, err = util.GenerateVolID(ctx, rbdVol.Monitors, cr, imagePoolID, rbdVol.Pool,
+			rbdVol.ClusterID, rbdVol.ReservedID, volIDVersion)
+		if err != nil {
+			return "", err
+		}
+		return rbdVol.VolID, nil
 	}
 
 	rbdVol.ReservedID, rbdVol.RbdImageName, err = j.ReserveName(
 		ctx, rbdVol.JournalPool, journalPoolID, rbdVol.Pool, imagePoolID, rbdVol.RequestName,
 		rbdVol.NamePrefix, "", kmsID, vi.ObjectUUID, rbdVol.Owner)
 	if err != nil {
-		return err
+		return "", err
 	}
 
 	defer func() {
@@ -578,7 +586,7 @@ func RegenerateJournal(imageName, volumeID, pool, journalPool, requestName strin
 	rbdVol.VolID, err = util.GenerateVolID(ctx, rbdVol.Monitors, cr, imagePoolID, rbdVol.Pool,
 		rbdVol.ClusterID, rbdVol.ReservedID, volIDVersion)
 	if err != nil {
-		return err
+		return "", err
 	}
 
 	util.DebugLog(ctx, "re-generated Volume ID (%s) and image name (%s) for request name (%s)",
@@ -586,14 +594,11 @@ func RegenerateJournal(imageName, volumeID, pool, journalPool, requestName strin
 	if rbdVol.ImageID == "" {
 		err = rbdVol.storeImageID(ctx, j)
 		if err != nil {
-			return err
+			return "", err
 		}
 	}
 
-	if volumeID != rbdVol.VolID {
-		return j.ReserveNewUUIDMapping(ctx, rbdVol.JournalPool, volumeID, rbdVol.VolID)
-	}
-	return nil
+	return rbdVol.VolID, nil
 }
 
 // storeImageID retrieves the image ID and stores it in OMAP.
@@ -610,23 +615,3 @@ func (rv *rbdVolume) storeImageID(ctx context.Context, j *journal.Connection) er
 	}
 	return nil
 }
-
-// addNewUUIDMapping creates the mapping between two volumeID.
-func (rv *rbdVolume) addNewUUIDMapping(ctx context.Context, imagePoolID int64, j *journal.Connection) error {
-	var err error
-	volID := ""
-
-	id, err := j.CheckNewUUIDMapping(ctx, rv.JournalPool, rv.VolID)
-	if err == nil && id == "" {
-		volID, err = util.GenerateVolID(ctx, rv.Monitors, rv.conn.Creds, imagePoolID, rv.Pool,
-			rv.ClusterID, rv.ReservedID, volIDVersion)
-		if err != nil {
-			return err
-		}
-		if rv.VolID == volID {
-			return nil
-		}
-		return j.ReserveNewUUIDMapping(ctx, rv.JournalPool, rv.VolID, volID)
-	}
-	return err
-}
diff --git a/internal/rbd/rbd_util.go b/internal/rbd/rbd_util.go
index b857c93d6..b0ff5a16a 100644
--- a/internal/rbd/rbd_util.go
+++ b/internal/rbd/rbd_util.go
@@ -35,6 +35,7 @@ import (
 	"github.com/container-storage-interface/spec/lib/go/csi"
 	"github.com/golang/protobuf/ptypes"
 	"github.com/golang/protobuf/ptypes/timestamp"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/util/sets"
 	"k8s.io/cloud-provider/volume/helpers"
 )
@@ -777,9 +778,8 @@ func genSnapFromSnapID(ctx context.Context, rbdSnap *rbdSnapshot, snapshotID str
 	return err
 }
 
-// genVolFromVolID generates a rbdVolume structure from the provided identifier, updating
-// the structure with elements from on-disk image metadata as well.
-func genVolFromVolID(ctx context.Context, volumeID string, cr *util.Credentials, secrets map[string]string) (*rbdVolume, error) {
+// generateVolumeFromVolumeID generates a rbdVolume structure from the provided identifier.
+func generateVolumeFromVolumeID(ctx context.Context, volumeID string, cr *util.Credentials, secrets map[string]string) (*rbdVolume, error) {
 	var (
 		options map[string]string
 		vi      util.CSIIdentifier
@@ -821,20 +821,6 @@ func genVolFromVolID(ctx context.Context, volumeID string, cr *util.Credentials,
 	}
 	defer j.Destroy()
 
-	// check is there any volumeID mapping exists.
-	id, err := j.CheckNewUUIDMapping(ctx, rbdVol.JournalPool, volumeID)
-	if err != nil {
-		return rbdVol, fmt.Errorf("failed to get volume id %s mapping %w",
-			volumeID, err)
-	}
-	if id != "" {
-		rbdVol.VolID = id
-		err = vi.DecomposeCSIID(rbdVol.VolID)
-		if err != nil {
-			return rbdVol, fmt.Errorf("%w: error decoding volume ID (%s) (%s)",
-				ErrInvalidVolID, err, rbdVol.VolID)
-		}
-	}
 	rbdVol.Pool, err = util.GetPoolName(rbdVol.Monitors, cr, vi.LocationID)
 	if err != nil {
 		return rbdVol, err
@@ -883,6 +869,38 @@ func genVolFromVolID(ctx context.Context, volumeID string, cr *util.Credentials,
 	return rbdVol, err
 }
 
+// genVolFromVolID generates a rbdVolume structure from the provided identifier, updating
+// the structure with elements from on-disk image metadata as well.
+func genVolFromVolID(ctx context.Context, volumeID string, cr *util.Credentials, secrets map[string]string) (*rbdVolume, error) {
+	vol, err := generateVolumeFromVolumeID(ctx, volumeID, cr, secrets)
+	if !errors.Is(err, util.ErrKeyNotFound) && !errors.Is(err, util.ErrPoolNotFound) {
+		return vol, err
+	}
+
+	// If the volume details are not found in the OMAP it can be a mirrored RBD
+	// image and the OMAP is already generated and the volumeHandle might not
+	// be the same in the PV.Spec.CSI.VolumeHandle. Check the PV annotation for
+	// the new volumeHandle. If the new volumeHandle is found, generate the RBD
+	// volume structure from the new volumeHandle.
+	c := util.NewK8sClient()
+	listOpt := metav1.ListOptions{
+		LabelSelector: PVReplicatedLabelKey,
+	}
+	pvlist, pErr := c.CoreV1().PersistentVolumes().List(context.TODO(), listOpt)
+	if pErr != nil {
+		return vol, pErr
+	}
+	for i := range pvlist.Items {
+		if pvlist.Items[i].Spec.CSI != nil && pvlist.Items[i].Spec.CSI.VolumeHandle == volumeID {
+			if v, ok := pvlist.Items[i].Annotations[PVVolumeHandleAnnotationKey]; ok {
+				util.UsefulLog(ctx, "found new volumeID %s for existing volumeID %s", v, volumeID)
+				return generateVolumeFromVolumeID(ctx, v, cr, secrets)
+			}
+		}
+	}
+	return vol, err
+}
+
 func genVolFromVolumeOptions(ctx context.Context, volOptions, credentials map[string]string, disableInUseChecks bool) (*rbdVolume, error) {
 	var (
 		ok bool
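
For readers following the data path: the change above makes `genVolFromVolID` fall back to the PV objects when a volumeHandle received over RPC has no OMAP entry on the local cluster, translating the stale handle into the one the rbdplugin controller stored on the PV. The sketch below illustrates that lookup in isolation. It is a minimal, hedged example rather than ceph-csi code: `resolveVolumeHandle` and the lower-case constants are hypothetical names, a client-go `kubernetes.Interface` stands in for `util.NewK8sClient()`, and only the label and annotation values mirror the constants added in `internal/rbd/rbd_journal.go`.

```go
// Package mirrorlookup is an illustrative sketch; it is not part of ceph-csi.
package mirrorlookup

import (
	"context"
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
)

const (
	// Values mirror the constants added in internal/rbd/rbd_journal.go.
	pvVolumeHandleAnnotationKey = "csi.ceph.io/volume-handle"
	pvReplicatedLabelKey        = "csi.ceph.io/replicated-volume"
)

// resolveVolumeHandle (hypothetical helper) translates a volumeHandle that is
// missing from the local OMAP into the regenerated handle stored on the PV by
// the rbdplugin controller. It returns the old handle unchanged when no
// mapping annotation is found.
func resolveVolumeHandle(ctx context.Context, c kubernetes.Interface, oldHandle string) (string, error) {
	// Only PVs labelled by the controller are candidates for the mapping.
	pvs, err := c.CoreV1().PersistentVolumes().List(ctx, metav1.ListOptions{
		LabelSelector: pvReplicatedLabelKey,
	})
	if err != nil {
		return "", fmt.Errorf("failed to list persistent volumes: %w", err)
	}
	for i := range pvs.Items {
		pv := &pvs.Items[i]
		// Match the PV whose spec still carries the old handle.
		if pv.Spec.CSI == nil || pv.Spec.CSI.VolumeHandle != oldHandle {
			continue
		}
		if newHandle, ok := pv.Annotations[pvVolumeHandleAnnotationKey]; ok {
			return newHandle, nil
		}
	}
	return oldHandle, nil
}
```

Listing only PVs that carry the `csi.ceph.io/replicated-volume` label keeps this fallback cheap: statically provisioned and non-mirrored volumes never enter the scan, and the common case (OMAP hit) never reaches the Kubernetes API at all.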