From c3400bfb97f63e308c5258edad72e483502c9ec5 Mon Sep 17 00:00:00 2001 From: Humble Chirammal Date: Tue, 4 Aug 2020 09:55:28 +0530 Subject: [PATCH] cephfs: add snapshot create and delete functionalilies It also add helper routines like parsetime,doSnapshot..etc Signed-off-by: Humble Chirammal --- internal/cephfs/controllerserver.go | 290 ++++++++++++++++++++++++++++ 1 file changed, 290 insertions(+) diff --git a/internal/cephfs/controllerserver.go b/internal/cephfs/controllerserver.go index f40691c5b..db34aa69d 100644 --- a/internal/cephfs/controllerserver.go +++ b/internal/cephfs/controllerserver.go @@ -19,11 +19,14 @@ package cephfs import ( "context" "errors" + "fmt" csicommon "github.com/ceph/ceph-csi/internal/csi-common" "github.com/ceph/ceph-csi/internal/util" "github.com/container-storage-interface/spec/lib/go/csi" + "github.com/golang/protobuf/ptypes/timestamp" + "github.com/kubernetes-csi/csi-lib-utils/protosanitizer" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" klog "k8s.io/klog/v2" @@ -423,3 +426,290 @@ func (cs *ControllerServer) ControllerExpandVolume(ctx context.Context, req *csi NodeExpansionRequired: false, }, nil } + +// CreateSnapshot creates the snapshot in backend and stores metadata +// in store +// nolint:gocyclo // golangci-lint did not catch this earlier, needs to get fixed late +func (cs *ControllerServer) CreateSnapshot(ctx context.Context, req *csi.CreateSnapshotRequest) (*csi.CreateSnapshotResponse, error) { + if err := cs.validateSnapshotReq(ctx, req); err != nil { + return nil, err + } + cr, err := util.NewAdminCredentials(req.GetSecrets()) + if err != nil { + return nil, err + } + defer cr.DeleteCredentials() + + clusterData, err := getClusterInformation(req.GetParameters()) + if err != nil { + return nil, status.Error(codes.Internal, err.Error()) + } + + requestName := req.GetName() + sourceVolID := req.GetSourceVolumeId() + // Existence and conflict checks + if acquired := cs.SnapshotLocks.TryAcquire(requestName); !acquired { + klog.Errorf(util.Log(ctx, util.SnapshotOperationAlreadyExistsFmt), requestName) + return nil, status.Errorf(codes.Aborted, util.SnapshotOperationAlreadyExistsFmt, requestName) + } + defer cs.SnapshotLocks.Release(requestName) + + if err = cs.OperationLocks.GetSnapshotCreateLock(sourceVolID); err != nil { + klog.Error(util.Log(ctx, err.Error())) + return nil, status.Error(codes.Aborted, err.Error()) + } + + defer cs.OperationLocks.ReleaseSnapshotCreateLock(sourceVolID) + + // Find the volume using the provided VolumeID + parentVolOptions, vid, err := newVolumeOptionsFromVolID(ctx, sourceVolID, nil, req.GetSecrets()) + if err != nil { + if errors.Is(err, util.ErrPoolNotFound) { + klog.Warningf(util.Log(ctx, "failed to get backend volume for %s: %v"), sourceVolID, err) + return nil, status.Error(codes.NotFound, err.Error()) + } + + if errors.Is(err, ErrVolumeNotFound) { + return nil, status.Error(codes.NotFound, err.Error()) + } + return nil, status.Error(codes.Internal, err.Error()) + } + + if clusterData.ClusterID != parentVolOptions.ClusterID { + return nil, status.Errorf(codes.InvalidArgument, "requested cluster id %s not matching subvolume cluster id %s", clusterData.ClusterID, parentVolOptions.ClusterID) + } + + cephfsSnap, genSnapErr := genSnapFromOptions(ctx, req) + if genSnapErr != nil { + return nil, status.Error(codes.Internal, genSnapErr.Error()) + } + + // lock out parallel snapshot create operations + if acquired := cs.VolumeLocks.TryAcquire(sourceVolID); !acquired { + klog.Errorf(util.Log(ctx, util.VolumeOperationAlreadyExistsFmt), sourceVolID) + return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, sourceVolID) + } + defer cs.VolumeLocks.Release(sourceVolID) + snapName := req.GetName() + sid, snapInfo, err := checkSnapExists(ctx, parentVolOptions, vid.FsSubvolName, cephfsSnap, cr) + if err != nil { + return nil, status.Error(codes.Internal, err.Error()) + } + + // check are we able to retrieve the size of parent + // ceph fs subvolume info command got added in 14.2.10 and 15.+ + // as we are not able to retrieve the parent size we are rejecting the + // request to create snapshot. + // TODO: For this purpose we could make use of cached clusterAdditionalInfo too. + var info Subvolume + info, err = getSubVolumeInfo(ctx, parentVolOptions, cr, volumeID(vid.FsSubvolName)) + if err != nil { + // Check error code value against ErrInvalidCommand to understand the cluster + // support it or not, its safe to evaluat as the filtering + // is already done from getSubVolumeInfo() and send out the error here. + if errors.Is(err, ErrInvalidCommand) { + return nil, status.Error(codes.FailedPrecondition, "subvolume info command not supported in current ceph cluster") + } + if sid != nil { + errDefer := undoSnapReservation(ctx, parentVolOptions, *sid, snapName, cr) + if errDefer != nil { + klog.Warningf(util.Log(ctx, "failed undoing reservation of snapshot: %s (%s)"), + requestName, errDefer) + } + } + return nil, status.Error(codes.Internal, err.Error()) + } + + if sid != nil { + // check snapshot is protected + protected := true + if !(snapInfo.Protected == snapshotIsProtected) { + err = protectSnapshot(ctx, parentVolOptions, cr, volumeID(sid.FsSnapshotName), volumeID(vid.FsSubvolName)) + if err != nil { + protected = false + klog.Warningf(util.Log(ctx, "failed to protect snapshot of snapshot: %s (%s)"), + sid.FsSnapshotName, err) + } + } + + return &csi.CreateSnapshotResponse{ + Snapshot: &csi.Snapshot{ + SizeBytes: int64(info.BytesQuota), + SnapshotId: sid.SnapshotID, + SourceVolumeId: req.GetSourceVolumeId(), + CreationTime: sid.CreationTime, + ReadyToUse: protected, + }, + }, nil + } + + // Reservation + sID, err := reserveSnap(ctx, parentVolOptions, vid.FsSubvolName, cephfsSnap, cr) + if err != nil { + return nil, status.Error(codes.Internal, err.Error()) + } + defer func() { + if err != nil { + errDefer := undoSnapReservation(ctx, parentVolOptions, *sID, snapName, cr) + if errDefer != nil { + klog.Warningf(util.Log(ctx, "failed undoing reservation of snapshot: %s (%s)"), + requestName, errDefer) + } + } + }() + snap := snapshotInfo{} + snap, err = doSnapshot(ctx, parentVolOptions, vid.FsSubvolName, sID.FsSnapshotName, cr) + if err != nil { + return nil, status.Error(codes.Internal, err.Error()) + } + return &csi.CreateSnapshotResponse{ + Snapshot: &csi.Snapshot{ + SizeBytes: int64(info.BytesQuota), + SnapshotId: sID.SnapshotID, + SourceVolumeId: req.GetSourceVolumeId(), + CreationTime: snap.CreationTime, + ReadyToUse: true, + }, + }, nil +} + +func doSnapshot(ctx context.Context, volOpt *volumeOptions, subvolumeName, snapshotName string, cr *util.Credentials) (snapshotInfo, error) { + volID := volumeID(subvolumeName) + snapID := volumeID(snapshotName) + snap := snapshotInfo{} + err := createSnapshot(ctx, volOpt, cr, snapID, volID) + if err != nil { + klog.Errorf(util.Log(ctx, "failed to create snapshot %s %v"), snapID, err) + return snap, err + } + defer func() { + if err != nil { + dErr := deleteSnapshot(ctx, volOpt, cr, snapID, volID) + if dErr != nil { + klog.Errorf(util.Log(ctx, "failed to delete snapshot %s %v"), snapID, err) + } + } + }() + snap, err = getSnapshotInfo(ctx, volOpt, cr, snapID, volID) + if err != nil { + klog.Errorf(util.Log(ctx, "failed to get snapshot info %s %v"), snapID, err) + return snap, fmt.Errorf("failed to get snapshot info for snapshot:%s", snapID) + } + var t *timestamp.Timestamp + t, err = parseTime(ctx, snap.CreatedAt) + if err != nil { + return snap, err + } + snap.CreationTime = t + err = protectSnapshot(ctx, volOpt, cr, snapID, volID) + if err != nil { + klog.Errorf(util.Log(ctx, "failed to protect snapshot %s %v"), snapID, err) + } + return snap, err +} + +func (cs *ControllerServer) validateSnapshotReq(ctx context.Context, req *csi.CreateSnapshotRequest) error { + if err := cs.Driver.ValidateControllerServiceRequest(csi.ControllerServiceCapability_RPC_CREATE_DELETE_SNAPSHOT); err != nil { + klog.Errorf(util.Log(ctx, "invalid create snapshot req: %v"), protosanitizer.StripSecrets(req)) + return err + } + + // Check sanity of request Snapshot Name, Source Volume Id + if req.Name == "" { + return status.Error(codes.NotFound, "snapshot Name cannot be empty") + } + if req.SourceVolumeId == "" { + return status.Error(codes.NotFound, "source Volume ID cannot be empty") + } + + return nil +} + +// DeleteSnapshot deletes the snapshot in backend and removes the +// snapshot metadata from store. +func (cs *ControllerServer) DeleteSnapshot(ctx context.Context, req *csi.DeleteSnapshotRequest) (*csi.DeleteSnapshotResponse, error) { + if err := cs.Driver.ValidateControllerServiceRequest(csi.ControllerServiceCapability_RPC_CREATE_DELETE_SNAPSHOT); err != nil { + klog.Errorf(util.Log(ctx, "invalid delete snapshot req: %v"), protosanitizer.StripSecrets(req)) + return nil, err + } + + cr, err := util.NewAdminCredentials(req.GetSecrets()) + if err != nil { + return nil, status.Error(codes.InvalidArgument, err.Error()) + } + defer cr.DeleteCredentials() + snapshotID := req.GetSnapshotId() + if snapshotID == "" { + return nil, status.Error(codes.InvalidArgument, "snapshot ID cannot be empty") + } + + if acquired := cs.SnapshotLocks.TryAcquire(snapshotID); !acquired { + klog.Errorf(util.Log(ctx, util.SnapshotOperationAlreadyExistsFmt), snapshotID) + return nil, status.Errorf(codes.Aborted, util.SnapshotOperationAlreadyExistsFmt, snapshotID) + } + defer cs.SnapshotLocks.Release(snapshotID) + + // lock out snapshotID for restore operation + if err = cs.OperationLocks.GetDeleteLock(snapshotID); err != nil { + klog.Error(util.Log(ctx, err.Error())) + return nil, status.Error(codes.Aborted, err.Error()) + } + defer cs.OperationLocks.ReleaseDeleteLock(snapshotID) + + volOpt, snapInfo, sid, err := newSnapshotOptionsFromID(ctx, snapshotID, cr) + if err != nil { + // if error is ErrPoolNotFound, the pool is already deleted we dont + // need to worry about deleting snapshot or omap data, return success + if errors.Is(err, util.ErrPoolNotFound) { + klog.Warningf(util.Log(ctx, "failed to get backend snapshot for %s: %v"), snapshotID, err) + return &csi.DeleteSnapshotResponse{}, nil + } + + // if error is ErrKeyNotFound, then a previous attempt at deletion was complete + // or partially complete (snap and snapOMap are garbage collected already), hence return + // success as deletion is complete + if errors.Is(err, util.ErrKeyNotFound) { + return &csi.DeleteSnapshotResponse{}, nil + } + if errors.Is(err, ErrSnapNotFound) { + err = undoSnapReservation(ctx, volOpt, *sid, sid.FsSnapshotName, cr) + if err != nil { + klog.Errorf(util.Log(ctx, "failed to remove reservation for snapname (%s) with backing snap (%s) (%s)"), + sid.FsSubvolName, sid.FsSnapshotName, err) + return nil, status.Error(codes.Internal, err.Error()) + } + return &csi.DeleteSnapshotResponse{}, nil + } + return nil, status.Error(codes.Internal, err.Error()) + } + + // safeguard against parallel create or delete requests against the same + // name + if acquired := cs.SnapshotLocks.TryAcquire(sid.RequestName); !acquired { + klog.Errorf(util.Log(ctx, util.SnapshotOperationAlreadyExistsFmt), sid.RequestName) + return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, sid.RequestName) + } + defer cs.SnapshotLocks.Release(sid.RequestName) + + if snapInfo.HasPendingClones == "yes" { + return nil, status.Errorf(codes.FailedPrecondition, "snapshot %s has pending clones", snapshotID) + } + if snapInfo.Protected == snapshotIsProtected { + err = unprotectSnapshot(ctx, volOpt, cr, volumeID(sid.FsSnapshotName), volumeID(sid.FsSubvolName)) + if err != nil { + return nil, status.Error(codes.Internal, err.Error()) + } + } + err = deleteSnapshot(ctx, volOpt, cr, volumeID(sid.FsSnapshotName), volumeID(sid.FsSubvolName)) + if err != nil { + return nil, status.Error(codes.Internal, err.Error()) + } + err = undoSnapReservation(ctx, volOpt, *sid, sid.FsSnapshotName, cr) + if err != nil { + klog.Errorf(util.Log(ctx, "failed to remove reservation for snapname (%s) with backing snap (%s) (%s)"), + sid.RequestName, sid.FsSnapshotName, err) + return nil, status.Error(codes.Internal, err.Error()) + } + + return &csi.DeleteSnapshotResponse{}, nil +}