diff --git a/internal/controller/controller.go b/internal/controller/controller.go
new file mode 100644
index 000000000..700d81327
--- /dev/null
+++ b/internal/controller/controller.go
@@ -0,0 +1,78 @@
+/*
+Copyright 2020 The Ceph-CSI Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+package controller
+
+import (
+    "github.com/ceph/ceph-csi/internal/util"
+
+    clientConfig "sigs.k8s.io/controller-runtime/pkg/client/config"
+    "sigs.k8s.io/controller-runtime/pkg/manager"
+    "sigs.k8s.io/controller-runtime/pkg/manager/signals"
+)
+
+// ControllerManager is the interface that wraps the Add function. New
+// controllers that get added have to implement the Add function in order
+// to be started by the manager.
+type ControllerManager interface {
+    Add(manager.Manager, Config) error
+}
+
+// Config holds the driver name and namespace name.
+type Config struct {
+    DriverName string
+    Namespace  string
+}
+
+// ControllerList holds the list of controllers that need to be started.
+var ControllerList []ControllerManager
+
+// addToManager calls the Add method of every registered controller.
+func addToManager(mgr manager.Manager, config Config) error {
+    for _, c := range ControllerList {
+        err := c.Add(mgr, config)
+        if err != nil {
+            return err
+        }
+    }
+    return nil
+}
+
+// Start creates a leader-elected manager and starts all registered controllers.
+func Start(config Config) error {
+    electionID := config.DriverName + "-" + config.Namespace
+    opts := manager.Options{
+        LeaderElection: true,
+        // disable metrics
+        MetricsBindAddress:      "0",
+        LeaderElectionNamespace: config.Namespace,
+        LeaderElectionID:        electionID,
+    }
+    mgr, err := manager.New(clientConfig.GetConfigOrDie(), opts)
+    if err != nil {
+        util.ErrorLogMsg("failed to create manager: %s", err)
+        return err
+    }
+    err = addToManager(mgr, config)
+    if err != nil {
+        util.ErrorLogMsg("failed to add controllers to the manager: %s", err)
+        return err
+    }
+    err = mgr.Start(signals.SetupSignalHandler())
+    if err != nil {
+        util.ErrorLogMsg("failed to start manager: %s", err)
+    }
+    return err
+}
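For context, this is how a consumer of the registry above would plug in. The sketch below is hypothetical (`dummyController` and its package are illustrative, not part of this change); it only shows the contract: implement `Add`, append to `ControllerList`, and let a single call to `ctrl.Start(config)` run everything under one leader-elected manager.

```go
package dummy

import (
	ctrl "github.com/ceph/ceph-csi/internal/controller"

	"sigs.k8s.io/controller-runtime/pkg/manager"
)

// dummyController is a stand-in for a real controller.
type dummyController struct{}

// compile-time check that the interface is satisfied.
var _ ctrl.ControllerManager = dummyController{}

// Add would normally create a controller-runtime controller on mgr and
// register its watches; it is a no-op in this sketch.
func (dummyController) Add(mgr manager.Manager, config ctrl.Config) error {
	return nil
}

// Init appends the controller to the shared list; the driver then calls
// ctrl.Start(config) once to run every registered controller.
func Init() {
	ctrl.ControllerList = append(ctrl.ControllerList, dummyController{})
}
```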
diff --git a/internal/controller/persistentvolume/persistentvolume.go b/internal/controller/persistentvolume/persistentvolume.go
new file mode 100644
index 000000000..fd53d0600
--- /dev/null
+++ b/internal/controller/persistentvolume/persistentvolume.go
@@ -0,0 +1,167 @@
+/*
+Copyright 2020 The Ceph-CSI Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+package persistentvolume
+
+import (
+    "context"
+    "errors"
+    "fmt"
+
+    ctrl "github.com/ceph/ceph-csi/internal/controller"
+    "github.com/ceph/ceph-csi/internal/rbd"
+    "github.com/ceph/ceph-csi/internal/util"
+
+    corev1 "k8s.io/api/core/v1"
+    apierrors "k8s.io/apimachinery/pkg/api/errors"
+    "k8s.io/apimachinery/pkg/runtime"
+    "k8s.io/apimachinery/pkg/types"
+    "sigs.k8s.io/controller-runtime/pkg/client"
+    "sigs.k8s.io/controller-runtime/pkg/controller"
+    "sigs.k8s.io/controller-runtime/pkg/handler"
+    "sigs.k8s.io/controller-runtime/pkg/manager"
+    "sigs.k8s.io/controller-runtime/pkg/reconcile"
+    "sigs.k8s.io/controller-runtime/pkg/source"
+)
+
+// ReconcilePersistentVolume reconciles a PersistentVolume object.
+type ReconcilePersistentVolume struct {
+    client client.Client
+    config ctrl.Config
+}
+
+var _ reconcile.Reconciler = &ReconcilePersistentVolume{}
+var _ ctrl.ControllerManager = &ReconcilePersistentVolume{}
+
+// Init adds the ReconcilePersistentVolume to the controller list.
+func Init() {
+    // add ReconcilePersistentVolume to the list
+    ctrl.ControllerList = append(ctrl.ControllerList, ReconcilePersistentVolume{})
+}
+
+// Add registers the PersistentVolume reconciler with the manager.
+func (r ReconcilePersistentVolume) Add(mgr manager.Manager, config ctrl.Config) error {
+    return add(mgr, newPVReconciler(mgr, config))
+}
+
+// newPVReconciler returns a new reconcile.Reconciler for PersistentVolumes.
+func newPVReconciler(mgr manager.Manager, config ctrl.Config) reconcile.Reconciler {
+    r := &ReconcilePersistentVolume{
+        client: mgr.GetClient(),
+        config: config,
+    }
+    return r
+}
+
+func add(mgr manager.Manager, r reconcile.Reconciler) error {
+    // Create a new controller
+    c, err := controller.New("persistentvolume-controller", mgr, controller.Options{MaxConcurrentReconciles: 1, Reconciler: r})
+    if err != nil {
+        return err
+    }
+
+    // Watch for changes to PersistentVolumes
+    err = c.Watch(&source.Kind{Type: &corev1.PersistentVolume{}}, &handler.EnqueueRequestForObject{})
+    if err != nil {
+        return err
+    }
+    return nil
+}
+
+func (r *ReconcilePersistentVolume) getCredentials(name, namespace string) (map[string]string, error) {
+    secret := &corev1.Secret{}
+    err := r.client.Get(context.TODO(), types.NamespacedName{Name: name, Namespace: namespace}, secret)
+    if err != nil {
+        return nil, fmt.Errorf("error getting secret %s in namespace %s: %v", name, namespace, err)
+    }
+
+    credentials := map[string]string{}
+    for key, value := range secret.Data {
+        credentials[key] = string(value)
+    }
+    return credentials, nil
+}
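To make the expectations of `reconcilePV` (below) concrete, here is a sketch of a statically provisioned RBD PV as a Go fixture, roughly as a unit test might build one. Every value is fabricated, and the driver name depends on the deployment:

```go
package persistentvolume

import (
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// examplePV shows the fields reconcilePV reads; all values are made up.
var examplePV = &corev1.PersistentVolume{
	ObjectMeta: metav1.ObjectMeta{
		Name: "static-rbd-pv", // doubles as the omap request name
	},
	Spec: corev1.PersistentVolumeSpec{
		PersistentVolumeSource: corev1.PersistentVolumeSource{
			CSI: &corev1.CSIPersistentVolumeSource{
				Driver:       "rbd.csi.ceph.com", // must match config.DriverName
				VolumeHandle: "0001-0009-rook-ceph-0000000000000002-b0285c97-2a91-4be0-8212-6f02d812cbbb",
				VolumeAttributes: map[string]string{
					"pool":        "replicapool",
					"journalPool": "replicapool",
					"imageName":   "csi-vol-b0285c97-2a91-4be0-8212-6f02d812cbbb",
				},
				// reconcilePV prefers ControllerExpandSecretRef and falls
				// back to NodeStageSecretRef for the Ceph credentials.
				NodeStageSecretRef: &corev1.SecretReference{
					Name:      "csi-rbd-secret",
					Namespace: "ceph-csi",
				},
			},
		},
	},
}
```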
+// reconcilePV extracts the image details from the PV spec and regenerates
+// the omap data.
+func (r ReconcilePersistentVolume) reconcilePV(obj runtime.Object) error {
+    pv, ok := obj.(*corev1.PersistentVolume)
+    if !ok {
+        return nil
+    }
+    if pv.Spec.CSI != nil && pv.Spec.CSI.Driver == r.config.DriverName {
+        pool := pv.Spec.CSI.VolumeAttributes["pool"]
+        journalPool := pv.Spec.CSI.VolumeAttributes["journalPool"]
+        requestName := pv.Name
+        imageName := pv.Spec.CSI.VolumeAttributes["imageName"]
+        volumeHandle := pv.Spec.CSI.VolumeHandle
+        secretName := ""
+        secretNamespace := ""
+
+        if pv.Spec.CSI.ControllerExpandSecretRef != nil {
+            secretName = pv.Spec.CSI.ControllerExpandSecretRef.Name
+            secretNamespace = pv.Spec.CSI.ControllerExpandSecretRef.Namespace
+        } else if pv.Spec.CSI.NodeStageSecretRef != nil {
+            secretName = pv.Spec.CSI.NodeStageSecretRef.Name
+            secretNamespace = pv.Spec.CSI.NodeStageSecretRef.Namespace
+        }
+        if secretName == "" || secretNamespace == "" {
+            errStr := "secret name or secret namespace is empty"
+            util.ErrorLogMsg(errStr)
+            return errors.New(errStr)
+        }
+
+        secrets, err := r.getCredentials(secretName, secretNamespace)
+        if err != nil {
+            util.ErrorLogMsg("failed to get secrets: %s", err)
+            return err
+        }
+        cr, err := util.NewUserCredentials(secrets)
+        if err != nil {
+            util.ErrorLogMsg("failed to get user credentials: %s", err)
+            return err
+        }
+        defer cr.DeleteCredentials()
+        err = rbd.RegenerateJournal(imageName, volumeHandle, pool, journalPool, requestName, cr)
+        if err != nil {
+            util.ErrorLogMsg("failed to regenerate journal: %s", err)
+            return err
+        }
+    }
+    return nil
+}
+
+// Reconcile reconciles the PersistentVolume object and creates new omap
+// entries for the volume.
+func (r *ReconcilePersistentVolume) Reconcile(request reconcile.Request) (reconcile.Result, error) {
+    pv := &corev1.PersistentVolume{}
+    err := r.client.Get(context.TODO(), request.NamespacedName, pv)
+    if err != nil {
+        if apierrors.IsNotFound(err) {
+            return reconcile.Result{}, nil
+        }
+        return reconcile.Result{}, err
+    }
+    // Check if the object is under deletion
+    if !pv.GetDeletionTimestamp().IsZero() {
+        return reconcile.Result{}, nil
+    }
+
+    err = r.reconcilePV(pv)
+    if err != nil {
+        return reconcile.Result{}, err
+    }
+    return reconcile.Result{}, nil
+}
diff --git a/internal/journal/voljournal.go b/internal/journal/voljournal.go
index c133a84f2..45a459d75 100644
--- a/internal/journal/voljournal.go
+++ b/internal/journal/voljournal.go
@@ -434,9 +434,12 @@ func (conn *Connection) UndoReservation(ctx context.Context,
 	return err
 }
 
-// reserveOMapName creates an omap with passed in oMapNamePrefix and a generated .
-// It ensures generated omap name does not already exist and if conflicts are detected, a set
-// number of retires with newer uuids are attempted before returning an error.
+// reserveOMapName creates an omap with the passed in oMapNamePrefix and a
+// generated UUID. If the passed volUUID is not empty, it is used instead of
+// generating a new UUID, and an error is returned immediately if the omap
+// already exists. If the passed volUUID is empty, it ensures that the
+// generated omap name does not already exist and, if conflicts are detected,
+// a set number of retries with newer UUIDs are attempted before failing.
 func reserveOMapName(ctx context.Context, monitors string, cr *util.Credentials, pool, namespace, oMapNamePrefix, volUUID string) (string, error) {
 	var iterUUID string
@@ -489,8 +492,8 @@ Input arguments:
 	- namePrefix: Prefix to use when generating the image/subvolume name (suffix is an auto-genetated UUID)
 	- parentName: Name of the parent image/subvolume if reservation is for a snapshot (optional)
 	- kmsConf: Name of the key management service used to encrypt the image (optional)
-	- volUUID: UUID need to be reserved instead of auto-generating one (this is
-	useful for mirroring and metro-DR)
+	- volUUID: UUID that needs to be reserved instead of auto-generating one
+	  (this is useful for mirroring and metro-DR)
 
 Return values:
 	- string: Contains the UUID that was reserved for the passed in reqName
@@ -689,3 +692,43 @@ func (conn *Connection) Destroy() {
 	conn.monitors = ""
 	conn.cr = nil
 }
+
+// CheckNewUUIDMapping checks if there is any UUID mapping between the old
+// volumeHandle and the newly generated volumeHandle.
+func (conn *Connection) CheckNewUUIDMapping(ctx context.Context,
+	journalPool, volumeHandle string) (string, error) {
+	var cj = conn.config
+
+	// check if the volumeHandle is already part of the directory omap
+	fetchKeys := []string{
+		cj.csiNameKeyPrefix + volumeHandle,
+	}
+	values, err := getOMapValues(
+		ctx, conn, journalPool, cj.namespace, cj.csiDirectory,
+		cj.commonPrefix, fetchKeys)
+	if err != nil {
+		if errors.Is(err, util.ErrKeyNotFound) || errors.Is(err, util.ErrPoolNotFound) {
+			// pool or omap (oid) was not present
+			// stop processing but without an error for no reservation exists
+			return "", nil
+		}
+		return "", err
+	}
+	return values[cj.csiNameKeyPrefix+volumeHandle], nil
+}
+
+// ReserveNewUUIDMapping creates the omap mapping between the oldVolumeHandle
+// and the newVolumeHandle. In case of async mirroring, the statically created
+// PV will carry the oldVolumeHandle. As the volumeHandle is composed of the
+// clusterID, poolID, etc., and those might be different on the secondary
+// cluster, cephcsi generates the new mapping and keeps it for internal
+// reference.
+func (conn *Connection) ReserveNewUUIDMapping(ctx context.Context,
+	journalPool, oldVolumeHandle, newVolumeHandle string) error {
+	var cj = conn.config
+
+	setKeys := map[string]string{
+		cj.csiNameKeyPrefix + oldVolumeHandle: newVolumeHandle,
+	}
+	return setOMapKeys(ctx, conn, journalPool, cj.namespace, cj.csiDirectory, setKeys)
+}
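A sketch of how the two new journal helpers are meant to pair up. `mapVolumeHandles` is a hypothetical wrapper, and the omap object name assumes the defaults set up by `NewCSIVolumeJournal("default")`:

```go
package journal_test

import (
	"context"

	"github.com/ceph/ceph-csi/internal/journal"
)

// mapVolumeHandles stores old->new and reads it back. Under the default
// journal configuration the mapping lands as an omap entry on the
// csiDirectory object (e.g. "csi.volumes.default") in journalPool:
//
//	csi.volume.<oldHandle> => <newHandle>
func mapVolumeHandles(ctx context.Context, j *journal.Connection,
	journalPool, oldHandle, newHandle string) (string, error) {
	if err := j.ReserveNewUUIDMapping(ctx, journalPool, oldHandle, newHandle); err != nil {
		return "", err
	}
	// CheckNewUUIDMapping returns "" without an error when no mapping
	// exists, so callers can distinguish "not mapped" from lookup failures.
	return j.CheckNewUUIDMapping(ctx, journalPool, oldHandle)
}
```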
diff --git a/internal/rbd/rbd_journal.go b/internal/rbd/rbd_journal.go
index 0f6506e91..1b525e186 100644
--- a/internal/rbd/rbd_journal.go
+++ b/internal/rbd/rbd_journal.go
@@ -21,6 +21,7 @@ import (
 	"errors"
 	"fmt"
 
+	"github.com/ceph/ceph-csi/internal/journal"
 	"github.com/ceph/ceph-csi/internal/util"
 )
 
@@ -466,3 +467,150 @@ func undoVolReservation(ctx context.Context, rbdVol *rbdVolume, cr *util.Credent
 
 	return err
 }
+
+// RegenerateJournal regenerates the omap data for the static volumes. The
+// input parameters imageName, volumeID, pool, journalPool and requestName
+// are taken from the PV.Spec.CSI object, and based on those we can regenerate
+// the complete omap mapping between imageName and volumeID.
+//
+// RegenerateJournal performs the below operations:
+//  - extract information from the volumeID
+//  - get the pool ID from the pool name
+//  - extract the UUID from the volumeID
+//  - reserve the omap data
+//  - generate a new volume handle
+//  - create a mapping from the old volume handle to the new one
+// The volume handle will not remain the same, as it contains the poolID,
+// clusterID, etc., which are not the same across clusters.
+func RegenerateJournal(imageName, volumeID, pool, journalPool, requestName string, cr *util.Credentials) error {
+    ctx := context.Background()
+    var (
+        options map[string]string
+        vi      util.CSIIdentifier
+        rbdVol  *rbdVolume
+    )
+
+    options = make(map[string]string)
+    rbdVol = &rbdVolume{VolID: volumeID}
+
+    err := vi.DecomposeCSIID(rbdVol.VolID)
+    if err != nil {
+        return fmt.Errorf("%w: error decoding volume ID (%s) (%s)",
+            ErrInvalidVolID, err, rbdVol.VolID)
+    }
+
+    // TODO check clusterID mapping exists
+    rbdVol.ClusterID = vi.ClusterID
+    options["clusterID"] = rbdVol.ClusterID
+
+    rbdVol.Monitors, _, err = util.GetMonsAndClusterID(options)
+    if err != nil {
+        util.ErrorLog(ctx, "failed getting mons (%s)", err)
+        return err
+    }
+
+    rbdVol.Pool = pool
+    err = rbdVol.Connect(cr)
+    if err != nil {
+        return err
+    }
+    rbdVol.JournalPool = journalPool
+    if rbdVol.JournalPool == "" {
+        rbdVol.JournalPool = rbdVol.Pool
+    }
+    volJournal = journal.NewCSIVolumeJournal("default")
+    j, err := volJournal.Connect(rbdVol.Monitors, rbdVol.RadosNamespace, cr)
+    if err != nil {
+        return err
+    }
+    defer j.Destroy()
+
+    journalPoolID, imagePoolID, err := util.GetPoolIDs(ctx, rbdVol.Monitors, rbdVol.JournalPool, rbdVol.Pool, cr)
+    if err != nil {
+        return err
+    }
+
+    rbdVol.RequestName = requestName
+    // TODO add Nameprefix also
+
+    kmsID := ""
+    imageData, err := j.CheckReservation(
+        ctx, rbdVol.JournalPool, rbdVol.RequestName, rbdVol.NamePrefix, "", kmsID)
+    if err != nil {
+        return err
+    }
+    if imageData != nil {
+        rbdVol.ReservedID = imageData.ImageUUID
+        rbdVol.ImageID = imageData.ImageAttributes.ImageID
+        if rbdVol.ImageID == "" {
+            err = rbdVol.getImageID()
+            if err != nil {
+                util.ErrorLog(ctx, "failed to get image ID %s: %v", rbdVol, err)
+                return err
+            }
+            err = j.StoreImageID(ctx, rbdVol.JournalPool, rbdVol.ReservedID, rbdVol.ImageID)
+            if err != nil {
+                util.ErrorLog(ctx, "failed to store image ID %s: %v", rbdVol, err)
+                return err
+            }
+        }
+        err = rbdVol.addNewUUIDMapping(ctx, imagePoolID, j)
+        if err != nil {
+            util.ErrorLog(ctx, "failed to add UUID mapping %s: %v", rbdVol, err)
+            return err
+        }
+        // the reservation and mappings already exist; reserving again with
+        // the same UUID would fail, so stop here
+        return nil
+    }
+
+    rbdVol.ReservedID, rbdVol.RbdImageName, err = j.ReserveName(
+        ctx, rbdVol.JournalPool, journalPoolID, rbdVol.Pool, imagePoolID,
+        rbdVol.RequestName, rbdVol.NamePrefix, "", kmsID, vi.ObjectUUID)
+    if err != nil {
+        return err
+    }
+
+    rbdVol.VolID, err = util.GenerateVolID(ctx, rbdVol.Monitors, cr, imagePoolID, rbdVol.Pool,
+        rbdVol.ClusterID, rbdVol.ReservedID, volIDVersion)
+    if err != nil {
+        return err
+    }
+
+    util.DebugLog(ctx, "re-generated Volume ID (%s) and image name (%s) for request name (%s)",
+        rbdVol.VolID, rbdVol.RbdImageName, rbdVol.RequestName)
+    if rbdVol.ImageID == "" {
+        err = rbdVol.getImageID()
+        if err != nil {
+            util.ErrorLog(ctx, "failed to get image ID %s: %v", rbdVol, err)
+            return err
+        }
+        err = j.StoreImageID(ctx, rbdVol.JournalPool, rbdVol.ReservedID, rbdVol.ImageID)
+        if err != nil {
+            util.ErrorLog(ctx, "failed to store image ID %s: %v", rbdVol, err)
+            return err
+        }
+    }
+
+    if volumeID != rbdVol.VolID {
+        return j.ReserveNewUUIDMapping(ctx, rbdVol.JournalPool, volumeID, rbdVol.VolID)
+    }
+
+    return nil
+}
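Why the handle changes across clusters: the CSI volume handle encodes cluster-local identifiers, as the sketch below illustrates. The handle value is fabricated and the field layout is an assumption based on `util.CSIIdentifier`; it also only compiles inside the ceph-csi repo, since `internal/util` cannot be imported externally:

```go
package main

import (
	"fmt"

	"github.com/ceph/ceph-csi/internal/util"
)

func main() {
	var vi util.CSIIdentifier
	// Fabricated handle with the assumed shape
	// <version>-<clusterID length>-<clusterID>-<pool ID>-<image UUID>.
	err := vi.DecomposeCSIID("0001-0009-rook-ceph-0000000000000002-b0285c97-2a91-4be0-8212-6f02d812cbbb")
	if err != nil {
		panic(err)
	}
	// The pool ID (LocationID) and ClusterID are cluster-local, which is
	// why the same image gets a different handle on the secondary cluster.
	fmt.Println(vi.ClusterID, vi.LocationID, vi.ObjectUUID) // rook-ceph 2 b0285c97-...
}
```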
+
+// addNewUUIDMapping creates the mapping between two volumeIDs.
+func (rv *rbdVolume) addNewUUIDMapping(ctx context.Context, imagePoolID int64, j *journal.Connection) error {
+    var err error
+    volID := ""
+
+    id, err := j.CheckNewUUIDMapping(ctx, rv.JournalPool, rv.VolID)
+    if err == nil && id == "" {
+        volID, err = util.GenerateVolID(ctx, rv.Monitors, rv.conn.Creds, imagePoolID, rv.Pool,
+            rv.ClusterID, rv.ReservedID, volIDVersion)
+        if err != nil {
+            return err
+        }
+        if rv.VolID == volID {
+            return nil
+        }
+        return j.ReserveNewUUIDMapping(ctx, rv.JournalPool, rv.VolID, volID)
+    }
+    return err
+}
diff --git a/internal/rbd/rbd_util.go b/internal/rbd/rbd_util.go
index fc52e1f57..edc4572f7 100644
--- a/internal/rbd/rbd_util.go
+++ b/internal/rbd/rbd_util.go
@@ -663,6 +663,7 @@ func genVolFromVolID(ctx context.Context, volumeID string, cr *util.Credentials,
 		options map[string]string
 		vi      util.CSIIdentifier
 		rbdVol  *rbdVolume
+		err     error
 	)
 
 	options = make(map[string]string)
@@ -670,12 +671,14 @@ func genVolFromVolID(ctx context.Context, volumeID string, cr *util.Credentials,
 	// Mounter, MultiNodeWritable
 	rbdVol = &rbdVolume{VolID: volumeID}
 
-	err := vi.DecomposeCSIID(rbdVol.VolID)
+	err = vi.DecomposeCSIID(rbdVol.VolID)
 	if err != nil {
 		return rbdVol, fmt.Errorf("%w: error decoding volume ID (%s) (%s)",
 			ErrInvalidVolID, err, rbdVol.VolID)
 	}
 
+	// TODO check clusterID mapping exists
+
 	rbdVol.ClusterID = vi.ClusterID
 	options["clusterID"] = rbdVol.ClusterID
 
@@ -685,17 +688,6 @@ func genVolFromVolID(ctx context.Context, volumeID string, cr *util.Credentials,
 		return rbdVol, err
 	}
 
-	rbdVol.Pool, err = util.GetPoolName(rbdVol.Monitors, cr, vi.LocationID)
-	if err != nil {
-		return rbdVol, err
-	}
-
-	err = rbdVol.Connect(cr)
-	if err != nil {
-		return rbdVol, err
-	}
-	rbdVol.JournalPool = rbdVol.Pool
-
 	rbdVol.RadosNamespace, err = util.RadosNamespace(util.CsiConfigFile, rbdVol.ClusterID)
 	if err != nil {
 		return nil, err
@@ -707,6 +699,31 @@ func genVolFromVolID(ctx context.Context, volumeID string, cr *util.Credentials,
 	}
 	defer j.Destroy()
 
+	// check if any volumeID mapping exists
+	id, err := j.CheckNewUUIDMapping(ctx, rbdVol.JournalPool, volumeID)
+	if err != nil {
+		return rbdVol, fmt.Errorf("failed to get volume ID %s mapping: %w",
+			volumeID, err)
+	}
+	if id != "" {
+		rbdVol.VolID = id
+		err = vi.DecomposeCSIID(rbdVol.VolID)
+		if err != nil {
+			return rbdVol, fmt.Errorf("%w: error decoding volume ID (%s) (%s)",
+				ErrInvalidVolID, err, rbdVol.VolID)
+		}
+	}
+	rbdVol.Pool, err = util.GetPoolName(rbdVol.Monitors, cr, vi.LocationID)
+	if err != nil {
+		return rbdVol, err
+	}
+
+	err = rbdVol.Connect(cr)
+	if err != nil {
+		return rbdVol, err
+	}
+	rbdVol.JournalPool = rbdVol.Pool
+
 	imageAttributes, err := j.GetImageAttributes(
 		ctx, rbdVol.Pool, vi.ObjectUUID, false)
 	if err != nil {
@@ -982,10 +999,7 @@ type imageInfo struct {
 
 // parentInfo spec for parent volume info.
 type mirroring struct {
-	Mode     string `json:"mode"`
-	State    string `json:"state"`
-	GlobalID string `json:"global_id"`
-	Primary  bool   `json:"primary"`
+	Primary bool `json:"primary"`
 }
 
 // updateVolWithImageInfo updates provided rbdVolume with information from on-disk data
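Trimming the unused fields from `mirroring` is safe for decoding, since `encoding/json` silently skips JSON keys that have no matching struct field. A quick self-contained sketch (the status blob below is illustrative):

```go
package main

import (
	"encoding/json"
	"fmt"
)

// mirroring matches the trimmed struct from this change.
type mirroring struct {
	Primary bool `json:"primary"`
}

func main() {
	// "mode", "state" and "global_id" have no matching field and are
	// ignored during unmarshalling, so only "primary" is decoded.
	blob := []byte(`{"mode":"journal","state":"up+stopped","global_id":"x","primary":true}`)
	var m mirroring
	if err := json.Unmarshal(blob, &m); err != nil {
		panic(err)
	}
	fmt.Println(m.Primary) // true
}
```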