From 60588d89685f1741bf8621e5dd328021d437129b Mon Sep 17 00:00:00 2001
From: gman
Date: Tue, 26 Feb 2019 11:06:16 +0100
Subject: [PATCH 1/3] cephfs/volume: create/delete-volume idempotency checks

---
 pkg/cephfs/util.go   |  78 +++++++++++++----------------
 pkg/cephfs/volume.go | 111 ++++++++++++++++++++++++-------------------
 2 files changed, 95 insertions(+), 94 deletions(-)

diff --git a/pkg/cephfs/util.go b/pkg/cephfs/util.go
index 323b965f5..19928c0fa 100644
--- a/pkg/cephfs/util.go
+++ b/pkg/cephfs/util.go
@@ -17,11 +17,11 @@ limitations under the License.
 package cephfs
 
 import (
+	"bytes"
 	"encoding/json"
 	"errors"
 	"fmt"
-	"io"
-	"io/ioutil"
+	"os"
 	"os/exec"
 
 	"google.golang.org/grpc/codes"
@@ -30,61 +30,46 @@ import (
 
 	"github.com/ceph/ceph-csi/pkg/util"
 	"github.com/container-storage-interface/spec/lib/go/csi"
+	"k8s.io/kubernetes/pkg/util/keymutex"
 	"k8s.io/kubernetes/pkg/util/mount"
 )
 
 type volumeID string
 
+func mustUnlock(m keymutex.KeyMutex, key string) {
+	if err := m.UnlockKey(key); err != nil {
+		klog.Fatalf("failed to unlock mutex for %s: %v", key, err)
+	}
+}
+
 func makeVolumeID(volName string) volumeID {
 	return volumeID("csi-cephfs-" + volName)
 }
 
-func closePipeOnError(pipe io.Closer, err error) {
-	if err != nil {
-		if err = pipe.Close(); err != nil {
-			klog.Warningf("failed to close pipe: %v", err)
-		}
-	}
-}
-
 func execCommand(program string, args ...string) (stdout, stderr []byte, err error) {
-	cmd := exec.Command(program, args...) // nolint: gosec
-	stripArgs := util.StripSecretInArgs(args)
-	klog.V(4).Infof("cephfs: EXEC %s %s", program, stripArgs)
+	var (
+		cmd           = exec.Command(program, args...) // nolint: gosec
+		sanitizedArgs = util.StripSecretInArgs(args)
+		stdoutBuf     bytes.Buffer
+		stderrBuf     bytes.Buffer
+	)
 
-	stdoutPipe, err := cmd.StdoutPipe()
-	if err != nil {
-		return nil, nil, fmt.Errorf("cannot open stdout pipe for %s %v: %v", program, stripArgs, err)
+	cmd.Stdout = &stdoutBuf
+	cmd.Stderr = &stderrBuf
+
+	klog.V(4).Infof("cephfs: EXEC %s %s", program, sanitizedArgs)
+
+	if err := cmd.Run(); err != nil {
+		// cmd.Process stays nil when the program could not be started at all
+		if cmd.Process == nil {
+			return nil, nil, fmt.Errorf("an error occurred while running %s %v: %v: %s",
+				program, sanitizedArgs, err, stderrBuf.Bytes())
+		}
+		return nil, nil, fmt.Errorf("an error occurred while running (%d) %s %v: %v: %s",
+			cmd.Process.Pid, program, sanitizedArgs, err, stderrBuf.Bytes())
 	}
 
-	defer closePipeOnError(stdoutPipe, err)
-
-	stderrPipe, err := cmd.StderrPipe()
-	if err != nil {
-		return nil, nil, fmt.Errorf("cannot open stdout pipe for %s %v: %v", program, stripArgs, err)
-	}
-
-	defer closePipeOnError(stderrPipe, err)
-
-	if err = cmd.Start(); err != nil {
-		return nil, nil, fmt.Errorf("failed to run %s %v: %v", program, stripArgs, err)
-	}
-
-	stdout, err = ioutil.ReadAll(stdoutPipe)
-	if err != nil {
-		return nil, nil, fmt.Errorf("failed to read from stdout for %s %v: %v", program, stripArgs, err)
-	}
-
-	stderr, err = ioutil.ReadAll(stderrPipe)
-	if err != nil {
-		return nil, nil, fmt.Errorf("failed to read from stderr for %s %v: %v", program, stripArgs, err)
-	}
-
-	if waitErr := cmd.Wait(); waitErr != nil {
-		return nil, nil, fmt.Errorf("an error occurred while running %s %v: %v: %s", program, stripArgs, waitErr, stderr)
-	}
-
-	return
+	return stdoutBuf.Bytes(), stderrBuf.Bytes(), nil
 }
 
 func execCommandErr(program string, args ...string) error {
@@ -117,6 +102,11 @@ func isMountPoint(p string) (bool, error) {
 	return !notMnt, nil
 }
 
+func pathExists(p string) bool {
+	_, err := os.Stat(p)
+	return err == nil
+}
+
 // Controller service request validation
 func (cs *ControllerServer) validateCreateVolumeRequest(req *csi.CreateVolumeRequest) error {
 	if err := cs.Driver.ValidateControllerServiceRequest(csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME); err != nil {
diff --git a/pkg/cephfs/volume.go b/pkg/cephfs/volume.go
index 683677b53..3103aa74e 100644
--- a/pkg/cephfs/volume.go
+++ b/pkg/cephfs/volume.go
@@ -52,79 +52,66 @@ func setVolumeAttribute(root, attrName, attrValue string) error {
 }
 
 func createVolume(volOptions *volumeOptions, adminCr *credentials, volID volumeID, bytesQuota int64) error {
-	cephRoot := getCephRootPathLocal(volID)
-
-	if err := createMountPoint(cephRoot); err != nil {
+	if err := mountCephRoot(volID, volOptions, adminCr); err != nil {
 		return err
 	}
+	defer unmountCephRoot(volID)
 
-	// RootPath is not set for a dynamically provisioned volume
-	// Access to cephfs's / is required
-	volOptions.RootPath = "/"
+	var (
+		volRoot         = getCephRootVolumePathLocal(volID)
+		volRootCreating = volRoot + "-creating"
+	)
 
-	m, err := newMounter(volOptions)
-	if err != nil {
-		return fmt.Errorf("failed to create mounter: %v", err)
+	if pathExists(volRoot) {
+		klog.V(4).Infof("cephfs: volume %s already exists, skipping creation", volID)
+		return nil
 	}
 
-	if err = m.mount(cephRoot, adminCr, volOptions, volID); err != nil {
-		return fmt.Errorf("error mounting ceph root: %v", err)
-	}
-
-	defer unmountAndRemove(cephRoot)
-
-	volOptions.RootPath = getVolumeRootPathCeph(volID)
-	localVolRoot := getCephRootVolumePathLocal(volID)
-
-	if err := createMountPoint(localVolRoot); err != nil {
+	if err := createMountPoint(volRootCreating); err != nil {
 		return err
 	}
 
 	if bytesQuota > 0 {
-		if err := setVolumeAttribute(localVolRoot, "ceph.quota.max_bytes", fmt.Sprintf("%d", bytesQuota)); err != nil {
+		if err := setVolumeAttribute(volRootCreating, "ceph.quota.max_bytes", fmt.Sprintf("%d", bytesQuota)); err != nil {
 			return err
 		}
 	}
 
-	if err := setVolumeAttribute(localVolRoot, "ceph.dir.layout.pool", volOptions.Pool); err != nil {
+	if err := setVolumeAttribute(volRootCreating, "ceph.dir.layout.pool", volOptions.Pool); err != nil {
 		return fmt.Errorf("%v\ncephfs: Does pool '%s' exist?", err, volOptions.Pool)
 	}
 
-	if err := setVolumeAttribute(localVolRoot, "ceph.dir.layout.pool_namespace", getVolumeNamespace(volID)); err != nil {
+	if err := setVolumeAttribute(volRootCreating, "ceph.dir.layout.pool_namespace", getVolumeNamespace(volID)); err != nil {
 		return err
 	}
 
+	if err := os.Rename(volRootCreating, volRoot); err != nil {
+		return fmt.Errorf("couldn't mark volume %s as created: %v", volID, err)
+	}
+
 	return nil
 }
 
 func purgeVolume(volID volumeID, adminCr *credentials, volOptions *volumeOptions) error {
+	if err := mountCephRoot(volID, volOptions, adminCr); err != nil {
+		return err
+	}
+	defer unmountCephRoot(volID)
+
 	var (
-		cephRoot        = getCephRootPathLocal(volID)
 		volRoot         = getCephRootVolumePathLocal(volID)
 		volRootDeleting = volRoot + "-deleting"
 	)
 
-	if err := createMountPoint(cephRoot); err != nil {
-		return err
-	}
-
-	// Root path is not set for dynamically provisioned volumes
-	// Access to cephfs's / is required
-	volOptions.RootPath = "/"
-
-	m, err := newMounter(volOptions)
-	if err != nil {
-		return fmt.Errorf("failed to create mounter: %v", err)
-	}
-
-	if err = m.mount(cephRoot, adminCr, volOptions, volID); err != nil {
-		return fmt.Errorf("error mounting ceph root: %v", err)
-	}
-
-	defer unmountAndRemove(cephRoot)
-
-	if err := os.Rename(volRoot, volRootDeleting); err != nil {
-		return fmt.Errorf("couldn't mark volume %s for deletion: %v", volID, err)
+	if pathExists(volRoot) {
+		if err := os.Rename(volRoot, volRootDeleting); err != nil {
+			return fmt.Errorf("couldn't mark volume %s for deletion: %v", volID, err)
+		}
+	} else {
+		if !pathExists(volRootDeleting) {
+			klog.V(4).Infof("cephfs: volume %s not found, assuming it to be already deleted", volID)
+			return nil
+		}
 	}
 
 	if err := os.RemoveAll(volRootDeleting); err != nil {
@@ -134,13 +121,37 @@ func purgeVolume(volID volumeID, adminCr *credentials, volOptions *volumeOptions
 	return nil
 }
 
-func unmountAndRemove(mountPoint string) {
-	var err error
-	if err = unmountVolume(mountPoint); err != nil {
-		klog.Errorf("failed to unmount %s with error %s", mountPoint, err)
+func mountCephRoot(volID volumeID, volOptions *volumeOptions, adminCr *credentials) error {
+	cephRoot := getCephRootPathLocal(volID)
+
+	// Root path is not set for dynamically provisioned volumes
+	// Access to cephfs's / is required
+	volOptions.RootPath = "/"
+
+	if err := createMountPoint(cephRoot); err != nil {
+		return err
 	}
 
-	if err = os.Remove(mountPoint); err != nil {
-		klog.Errorf("failed to remove %s with error %s", mountPoint, err)
+	m, err := newMounter(volOptions)
+	if err != nil {
+		return fmt.Errorf("failed to create mounter: %v", err)
+	}
+
+	if err = m.mount(cephRoot, adminCr, volOptions, volID); err != nil {
+		return fmt.Errorf("error mounting ceph root: %v", err)
+	}
+
+	return nil
+}
+
+func unmountCephRoot(volID volumeID) {
+	cephRoot := getCephRootPathLocal(volID)
+
+	if err := unmountVolume(cephRoot); err != nil {
+		klog.Errorf("failed to unmount %s with error %s", cephRoot, err)
+	}
+
+	if err := os.Remove(cephRoot); err != nil {
+		klog.Errorf("failed to remove %s with error %s", cephRoot, err)
 	}
 }

From 143003bcfdacd4b4ef32dd90b43b2a4d7055c2db Mon Sep 17 00:00:00 2001
From: gman
Date: Tue, 26 Feb 2019 11:06:25 +0100
Subject: [PATCH 2/3] cephfs: added locks for {Create,Delete}Volume, NodeStageVolume

---
 pkg/cephfs/controllerserver.go | 11 +++++++++++
 pkg/cephfs/nodeserver.go       | 10 +++++++++-
 2 files changed, 20 insertions(+), 1 deletion(-)

diff --git a/pkg/cephfs/controllerserver.go b/pkg/cephfs/controllerserver.go
index 67a342040..a3ea1290e 100644
--- a/pkg/cephfs/controllerserver.go
+++ b/pkg/cephfs/controllerserver.go
@@ -24,6 +24,7 @@ import (
 
 	"github.com/container-storage-interface/spec/lib/go/csi"
 	"github.com/kubernetes-csi/drivers/pkg/csi-common"
+	"k8s.io/kubernetes/pkg/util/keymutex"
 
 	"github.com/ceph/ceph-csi/pkg/util"
 )
@@ -40,6 +41,10 @@ type controllerCacheEntry struct {
 	VolumeID   volumeID
 }
 
+var (
+	mtxControllerVolumeID = keymutex.NewHashed(0)
+)
+
 // CreateVolume creates the volume in backend and store the volume metadata
 func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error) {
 	if err := cs.validateCreateVolumeRequest(req); err != nil {
@@ -58,6 +63,9 @@ func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol
 
 	volID := makeVolumeID(req.GetName())
 
+	mtxControllerVolumeID.LockKey(string(volID))
+	defer mustUnlock(mtxControllerVolumeID, string(volID))
+
 	// Create a volume in case the user didn't provide one
 
 	if volOptions.ProvisionVolume {
@@ -143,6 +151,9 @@ func (cs *ControllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol
 		return nil, status.Error(codes.InvalidArgument, err.Error())
 	}
 
+	mtxControllerVolumeID.LockKey(string(volID))
+	defer mustUnlock(mtxControllerVolumeID, string(volID))
+
 	if err = purgeVolume(volID, cr, &ce.VolOptions); err != nil {
 		klog.Errorf("failed to delete volume %s: %v", volID, err)
 		return nil, status.Error(codes.Internal, err.Error())
diff --git a/pkg/cephfs/nodeserver.go b/pkg/cephfs/nodeserver.go
index b9ec7284c..a5ffe1ad3 100644
--- a/pkg/cephfs/nodeserver.go
+++ b/pkg/cephfs/nodeserver.go
@@ -24,6 +24,7 @@ import (
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/status"
 	"k8s.io/klog"
+	"k8s.io/kubernetes/pkg/util/keymutex"
 
 	"github.com/container-storage-interface/spec/lib/go/csi"
 	"github.com/kubernetes-csi/drivers/pkg/csi-common"
@@ -35,6 +36,10 @@ type NodeServer struct {
 	*csicommon.DefaultNodeServer
 }
 
+var (
+	mtxNodeVolumeID = keymutex.NewHashed(0)
+)
+
 func getCredentialsForVolume(volOptions *volumeOptions, volID volumeID, req *csi.NodeStageVolumeRequest) (*credentials, error) {
 	var (
 		cr      *credentials
@@ -44,7 +49,7 @@ func getCredentialsForVolume(volOptions *volumeOptions, volID volumeID, req *csi
 	if volOptions.ProvisionVolume {
 		// The volume is provisioned dynamically, get the credentials directly from Ceph
 
-		// First, store admin credentials - those are needed for retrieving the user credentials
+		// First, get admin credentials - those are needed for retrieving the user credentials
 
 		adminCr, err := getAdminCredentials(secrets)
 		if err != nil {
@@ -100,6 +105,9 @@ func (ns *NodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol
 		return nil, status.Error(codes.Internal, err.Error())
 	}
 
+	mtxNodeVolumeID.LockKey(string(volID))
+	defer mustUnlock(mtxNodeVolumeID, string(volID))
+
 	// Check if the volume is already mounted
 
 	isMnt, err := isMountPoint(stagingTargetPath)

From 2f8931315a570a7183d18db6578af6803fd36783 Mon Sep 17 00:00:00 2001
From: gman
Date: Tue, 26 Feb 2019 14:46:21 +0100
Subject: [PATCH 3/3] don't attempt to delete mountpoint if unmount failed

---
 pkg/cephfs/volume.go | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/pkg/cephfs/volume.go b/pkg/cephfs/volume.go
index 3103aa74e..7b8dea03a 100644
--- a/pkg/cephfs/volume.go
+++ b/pkg/cephfs/volume.go
@@ -149,9 +149,9 @@ func unmountCephRoot(volID volumeID) {
 
 	if err := unmountVolume(cephRoot); err != nil {
 		klog.Errorf("failed to unmount %s with error %s", cephRoot, err)
-	}
-
-	if err := os.Remove(cephRoot); err != nil {
-		klog.Errorf("failed to remove %s with error %s", cephRoot, err)
+	} else {
+		if err := os.Remove(cephRoot); err != nil {
+			klog.Errorf("failed to remove %s with error %s", cephRoot, err)
+		}
 	}
 }