diff --git a/internal/rbd/replicationcontrollerserver.go b/internal/rbd/replicationcontrollerserver.go index 16e16c418..021ac06e6 100644 --- a/internal/rbd/replicationcontrollerserver.go +++ b/internal/rbd/replicationcontrollerserver.go @@ -23,6 +23,7 @@ import ( "regexp" "strconv" "strings" + "sync" "time" "github.com/ceph/ceph-csi/internal/util" @@ -63,6 +64,19 @@ const ( schedulingStartTimeKey = "schedulingStartTime" ) +// operation identifies a step performed on the dummy image; see the values below. +type operation string + +var ( + // dummyImageCreated is joined with the pool name (pool+"/"+key) to record + // that the dummy image has been created. + dummyImageCreated operation = "dummyImageCreated" + // dummyImageMirroringEnabled is joined with the pool name (pool+"/"+key) to + // record that mirroring has been enabled on the dummy image. + dummyImageMirroringEnabled operation = "dummyImageMirrorEnabled" + // dummyImageMirroringDisabled is joined with the pool name (pool+"/"+key) to + // record that mirroring has been disabled on the dummy image. + dummyImageMirroringDisabled operation = "dummyImageMirrorDisabled" + // operationLock is a sync.Map (not a mutex) keyed by pool+"/"+operation. + // A stored key means that operation was already performed, so later calls + // skip it until the key is deleted again. + operationLock = sync.Map{} +) + // ReplicationServer struct of rbd CSI driver with supported methods of Replication // controller server spec. type ReplicationServer struct { @@ -232,6 +246,11 @@ func (rs *ReplicationServer) EnableVolumeReplication(ctx context.Context, return nil, status.Error(codes.Internal, err.Error()) } + err = createDummyImage(ctx, rbdVol) + if err != nil { + return nil, status.Errorf(codes.Internal, "failed to create dummy image %s", err.Error()) + } + if mirroringInfo.State != librbd.MirrorImageEnabled { err = rbdVol.enableImageMirroring(mirroringMode) if err != nil { @@ -244,6 +263,96 @@ func (rs *ReplicationServer) EnableVolumeReplication(ctx context.Context, return &replication.EnableVolumeReplicationResponse{}, nil } +// getDummyImageName returns "csi-vol-dummy-<cluster FSID>" as the image name. +// Each cluster should have its own unique dummy image, which is why the +// cluster FSID is used as the suffix. 
+func getDummyImageName(conn *util.ClusterConnection) (string, error) {
+	// The FSID is what makes the name differ from one cluster to another.
+	id, err := conn.GetFSID()
+	if err != nil {
+		return "", err
+	}
+
+	return fmt.Sprintf("csi-vol-dummy-%s", id), nil
+}
+
+// getOperationName returns the key under which the given operation is tracked
+// for a pool, in the form "<poolName>/<optName>".
+func getOperationName(poolName string, optName operation) string {
+	return fmt.Sprintf("%s/%s", poolName, optName)
+}
+
+// createDummyImage creates a dummy image as a workaround for the rbd
+// scheduling problem.
+//
+// The creation is recorded in operationLock under the pool's
+// dummyImageCreated key, so later calls for the same pool return immediately.
+// A "File exists" error from createImage is treated as success. The passed
+// rbdVol is never modified.
+func createDummyImage(ctx context.Context, rbdVol *rbdVolume) error {
+	optName := getOperationName(rbdVol.Pool, dummyImageCreated)
+	if _, ok := operationLock.Load(optName); ok {
+		// already created for this pool, nothing to do.
+		return nil
+	}
+
+	imgName, err := getDummyImageName(rbdVol.conn)
+	if err != nil {
+		return err
+	}
+	// Work on a copy of the struct. rbdVol is a pointer, so assigning through
+	// it would rename the caller's volume to the dummy image and every later
+	// call the caller makes on rbdVol would act on the wrong image.
+	dummyVol := *rbdVol
+	dummyVol.RbdImageName = imgName
+	err = createImage(ctx, &dummyVol, dummyVol.conn.Creds)
+	if err != nil && !strings.Contains(err.Error(), "File exists") {
+		return err
+	}
+	operationLock.Store(optName, true)
+
+	return nil
+}
+
+// enableMirroringOnDummyImage enables mirroring, in the given mode, on the
+// dummy image.
+//
+// Success is recorded in operationLock under the pool's
+// dummyImageMirroringEnabled key, so later calls are skipped until
+// disableMirroringOnDummyImage clears that key. The passed rbdVol is never
+// modified.
+func enableMirroringOnDummyImage(rbdVol *rbdVolume, mirroringMode librbd.ImageMirrorMode) error {
+	optName := getOperationName(rbdVol.Pool, dummyImageMirroringEnabled)
+	if _, ok := operationLock.Load(optName); ok {
+		// already enabled for this pool, nothing to do.
+		return nil
+	}
+
+	imgName, err := getDummyImageName(rbdVol.conn)
+	if err != nil {
+		return err
+	}
+	// Copy the struct instead of aliasing the pointer, otherwise the caller's
+	// volume would be renamed to the dummy image.
+	dummyVol := *rbdVol
+	dummyVol.RbdImageName = imgName
+	// this is an idempotent call, we don't need to worry about multiple
+	// enable calls.
+	err = dummyVol.enableImageMirroring(mirroringMode)
+	if err != nil {
+		return err
+	}
+	operationLock.Store(optName, true)
+	// Remove the dummyImageMirroringDisabled lock so that the mirroring can
+	// be disabled on the image.
+	operationLock.Delete(getOperationName(rbdVol.Pool, dummyImageMirroringDisabled))
+
+	return nil
+}
+
+// disableMirroringOnDummyImage disables the mirroring for the dummy image.
+func disableMirroringOnDummyImage(rbdVol *rbdVolume) error {
+	// Success is recorded in operationLock under the pool's
+	// dummyImageMirroringDisabled key, so repeated calls are skipped until
+	// that key is deleted again.
+	optName := getOperationName(rbdVol.Pool, dummyImageMirroringDisabled)
+	if _, ok := operationLock.Load(optName); ok {
+		// already disabled for this pool, nothing to do.
+		return nil
+	}
+
+	imgName, err := getDummyImageName(rbdVol.conn)
+	if err != nil {
+		return err
+	}
+	// Copy the struct instead of aliasing the pointer. Renaming through the
+	// pointer would make the caller's follow-up calls on rbdVol (for example
+	// demoting the image) act on the dummy image instead of the real volume.
+	dummyVol := *rbdVol
+	dummyVol.RbdImageName = imgName
+
+	err = dummyVol.disableImageMirroring(false)
+	if err != nil {
+		return err
+	}
+	operationLock.Store(optName, true)
+	// Remove the dummyImageMirroringEnabled lock so that the mirroring can
+	// be re-enabled on the image.
+	operationLock.Delete(getOperationName(rbdVol.Pool, dummyImageMirroringEnabled))
+
+	return nil
+}
+
 // DisableVolumeReplication extracts the RBD volume information from the // volumeID, If the image is present and the mirroring is enabled on the RBD // image it will disable the mirroring. @@ -428,6 +537,16 @@ func (rs *ReplicationServer) PromoteVolume(ctx context.Context, } } + var mode librbd.ImageMirrorMode + mode, err = getMirroringMode(ctx, req.GetParameters()) + if err != nil { + return nil, status.Errorf(codes.Internal, "failed to get mirroring mode %s", err.Error()) + } + err = enableMirroringOnDummyImage(rbdVol, mode) + if err != nil { + return nil, status.Errorf(codes.Internal, "failed to enable mirroring on dummy image %s", err.Error()) + } + interval, startTime := getSchedulingDetails(req.GetParameters()) if interval != admin.NoInterval { err = rbdVol.addSnapshotScheduling(interval, startTime) @@ -500,6 +619,10 @@ func (rs *ReplicationServer) DemoteVolume(ctx context.Context, // demote image to secondary if mirroringInfo.Primary { + err = disableMirroringOnDummyImage(rbdVol) + if err != nil { + return nil, status.Errorf(codes.Internal, "failed to disable mirroring on dummy image %s", err.Error()) + } err = rbdVol.demoteImage() if err != nil { log.ErrorLog(ctx, err.Error())