diff --git a/deploy/cephfs/helm/templates/nodeplugin-daemonset.yaml b/deploy/cephfs/helm/templates/nodeplugin-daemonset.yaml index 31d27337e..9c78f53b7 100644 --- a/deploy/cephfs/helm/templates/nodeplugin-daemonset.yaml +++ b/deploy/cephfs/helm/templates/nodeplugin-daemonset.yaml @@ -87,13 +87,13 @@ spec: volumeMounts: - name: mount-cache-dir mountPath: /mount-cache-dir - - name: plugin-dir + - name: socket-dir mountPath: {{ .Values.socketDir }} - - name: pods-mount-dir - mountPath: /var/lib/kubelet/pods + - name: plugin-dir + mountPath: {{ .Values.pluginDir }} mountPropagation: "Bidirectional" - - name: plugin-mount-dir - mountPath: {{ .Values.volumeDevicesDir }} + - name: mointpoint-dir + mountPath: /var/lib/kubelet/pods mountPropagation: "Bidirectional" - mountPath: /dev name: host-dev @@ -111,22 +111,22 @@ spec: volumes: - name: mount-cache-dir emptyDir: {} - - name: plugin-dir + - name: socket-dir hostPath: path: {{ .Values.socketDir }} type: DirectoryOrCreate - - name: plugin-mount-dir - hostPath: - path: {{ .Values.volumeDevicesDir }} - type: DirectoryOrCreate - name: registration-dir hostPath: path: {{ .Values.registrationDir }} type: Directory - - name: pods-mount-dir + - name: plugin-dir + hostPath: + path: {{ .Values.pluginDir }} + type: Directory + - name: mountpoint-dir hostPath: path: /var/lib/kubelet/pods - type: Directory + type: DirectoryOrCreate - name: host-dev hostPath: path: /dev diff --git a/deploy/cephfs/helm/values.yaml b/deploy/cephfs/helm/values.yaml index 0a15962a1..35b6cd30a 100644 --- a/deploy/cephfs/helm/values.yaml +++ b/deploy/cephfs/helm/values.yaml @@ -16,7 +16,7 @@ serviceAccounts: socketDir: /var/lib/kubelet/plugins/cephfs.csi.ceph.com socketFile: csi.sock registrationDir: /var/lib/kubelet/plugins_registry -volumeDevicesDir: /var/lib/kubelet/plugins/kubernetes.io/csi/volumeDevices +pluginDir: /var/lib/kubelet/plugins driverName: cephfs.csi.ceph.com configMapName: ceph-csi-config attacher: diff --git a/deploy/cephfs/kubernetes/csi-cephfsplugin.yaml b/deploy/cephfs/kubernetes/csi-cephfsplugin.yaml index 237a9d413..1440b5c91 100644 --- a/deploy/cephfs/kubernetes/csi-cephfsplugin.yaml +++ b/deploy/cephfs/kubernetes/csi-cephfsplugin.yaml @@ -38,7 +38,7 @@ spec: fieldRef: fieldPath: spec.nodeName volumeMounts: - - name: plugin-dir + - name: socket-dir mountPath: /csi - name: registration-dir mountPath: /registration @@ -73,13 +73,13 @@ spec: volumeMounts: - name: mount-cache-dir mountPath: /mount-cache-dir - - name: plugin-dir + - name: socket-dir mountPath: /csi - - name: csi-plugins-dir - mountPath: /var/lib/kubelet/plugins/kubernetes.io/csi - mountPropagation: "Bidirectional" - - name: pods-mount-dir + - name: mountpoint-dir mountPath: /var/lib/kubelet/pods + mountPropagation: Bidirectional + - name: plugin-dir + mountPath: /var/lib/kubelet/plugins mountPropagation: "Bidirectional" - name: host-sys mountPath: /sys @@ -93,21 +93,21 @@ spec: volumes: - name: mount-cache-dir emptyDir: {} - - name: plugin-dir + - name: socket-dir hostPath: path: /var/lib/kubelet/plugins/cephfs.csi.ceph.com/ type: DirectoryOrCreate - - name: csi-plugins-dir - hostPath: - path: /var/lib/kubelet/plugins/kubernetes.io/csi - type: DirectoryOrCreate - name: registration-dir hostPath: path: /var/lib/kubelet/plugins_registry/ type: Directory - - name: pods-mount-dir + - name: mountpoint-dir hostPath: path: /var/lib/kubelet/pods + type: DirectoryOrCreate + - name: plugin-dir + hostPath: + path: /var/lib/kubelet/plugins type: Directory - name: host-sys hostPath: diff --git a/deploy/rbd/helm/templates/nodeplugin-daemonset.yaml b/deploy/rbd/helm/templates/nodeplugin-daemonset.yaml index 5c31ee990..15abc5147 100644 --- a/deploy/rbd/helm/templates/nodeplugin-daemonset.yaml +++ b/deploy/rbd/helm/templates/nodeplugin-daemonset.yaml @@ -14,6 +14,8 @@ spec: app: {{ include "ceph-csi-rbd.name" . }} component: {{ .Values.nodeplugin.name }} release: {{ .Release.Name }} + updateStrategy: + type: OnDelete template: metadata: labels: @@ -51,7 +53,7 @@ spec: fieldPath: spec.nodeName imagePullPolicy: {{ .Values.nodeplugin.registrar.image.pullPolicy }} volumeMounts: - - name: plugin-dir + - name: socket-dir mountPath: /csi - name: registration-dir mountPath: /registration @@ -84,13 +86,13 @@ spec: value: "unix:/{{ .Values.socketDir }}/{{ .Values.socketFile }}" imagePullPolicy: {{ .Values.nodeplugin.plugin.image.pullPolicy }} volumeMounts: - - name: plugin-dir + - name: socket-dir mountPath: {{ .Values.socketDir }} - - name: pods-mount-dir - mountPath: /var/lib/kubelet/pods + - name: plugin-dir + mountPath: {{ .Values.pluginDir }} mountPropagation: "Bidirectional" - - name: plugin-mount-dir - mountPath: {{ .Values.volumeDevicesDir }} + - name: mointpoint-dir + mountPath: /var/lib/kubelet/pods mountPropagation: "Bidirectional" - mountPath: /dev name: host-dev @@ -106,22 +108,22 @@ spec: resources: {{ toYaml .Values.nodeplugin.plugin.resources | indent 12 }} volumes: - - name: plugin-dir + - name: socket-dir hostPath: path: {{ .Values.socketDir }} type: DirectoryOrCreate - - name: plugin-mount-dir - hostPath: - path: {{ .Values.volumeDevicesDir }} - type: DirectoryOrCreate - name: registration-dir hostPath: path: {{ .Values.registrationDir }} type: Directory - - name: pods-mount-dir + - name: plugin-dir + hostPath: + path: {{ .Values.pluginDir }} + type: Directory + - name: mountpoint-dir hostPath: path: /var/lib/kubelet/pods - type: Directory + type: DirectoryOrCreate - name: host-dev hostPath: path: /dev diff --git a/deploy/rbd/helm/values.yaml b/deploy/rbd/helm/values.yaml index 7421f1d0c..6db42cb17 100644 --- a/deploy/rbd/helm/values.yaml +++ b/deploy/rbd/helm/values.yaml @@ -16,7 +16,7 @@ serviceAccounts: socketDir: /var/lib/kubelet/plugins/rbd.csi.ceph.com socketFile: csi.sock registrationDir: /var/lib/kubelet/plugins_registry -volumeDevicesDir: /var/lib/kubelet/plugins/kubernetes.io/csi/volumeDevices +pluginDir: /var/lib/kubelet/plugins driverName: rbd.csi.ceph.com configMapName: ceph-csi-config diff --git a/deploy/rbd/kubernetes/csi-rbdplugin.yaml b/deploy/rbd/kubernetes/csi-rbdplugin.yaml index 02b268fb6..e259cce89 100644 --- a/deploy/rbd/kubernetes/csi-rbdplugin.yaml +++ b/deploy/rbd/kubernetes/csi-rbdplugin.yaml @@ -7,6 +7,8 @@ spec: selector: matchLabels: app: csi-rbdplugin + updateStrategy: + type: OnDelete template: metadata: labels: @@ -39,7 +41,7 @@ spec: fieldRef: fieldPath: spec.nodeName volumeMounts: - - name: plugin-dir + - name: socket-dir mountPath: /csi - name: registration-dir mountPath: /registration @@ -69,14 +71,8 @@ spec: value: unix:///csi/csi.sock imagePullPolicy: "IfNotPresent" volumeMounts: - - name: plugin-dir + - name: socket-dir mountPath: /csi - - name: pods-mount-dir - mountPath: /var/lib/kubelet/pods - mountPropagation: "Bidirectional" - - name: plugin-mount-dir - mountPath: /var/lib/kubelet/plugins/kubernetes.io/csi/volumeDevices/ - mountPropagation: "Bidirectional" - mountPath: /dev name: host-dev - mountPath: /rootfs @@ -88,23 +84,29 @@ spec: readOnly: true - name: ceph-csi-config mountPath: /etc/ceph-csi-config/ + - name: plugin-dir + mountPath: /var/lib/kubelet/plugins + mountPropagation: "Bidirectional" + - name: mountpoint-dir + mountPath: /var/lib/kubelet/pods + mountPropagation: "Bidirectional" volumes: - - name: plugin-dir + - name: socket-dir hostPath: path: /var/lib/kubelet/plugins/rbd.csi.ceph.com type: DirectoryOrCreate - - name: plugin-mount-dir + - name: plugin-dir hostPath: - path: /var/lib/kubelet/plugins/kubernetes.io/csi/volumeDevices/ + path: /var/lib/kubelet/plugins + type: Directory + - name: mountpoint-dir + hostPath: + path: /var/lib/kubelet/pods type: DirectoryOrCreate - name: registration-dir hostPath: path: /var/lib/kubelet/plugins_registry/ type: Directory - - name: pods-mount-dir - hostPath: - path: /var/lib/kubelet/pods - type: Directory - name: host-dev hostPath: path: /dev diff --git a/e2e/cephfs.go b/e2e/cephfs.go index f8173ba34..de16ea4d0 100644 --- a/e2e/cephfs.go +++ b/e2e/cephfs.go @@ -117,6 +117,13 @@ var _ = Describe("cephfs", func() { } }) + By("check data persist after recreating pod with same pvc", func() { + err := checkDataPersist(pvcPath, appPath, f) + if err != nil { + Fail(err.Error()) + } + }) + }) }) diff --git a/e2e/rbd.go b/e2e/rbd.go index 8882b2712..6ddd1f1d9 100644 --- a/e2e/rbd.go +++ b/e2e/rbd.go @@ -189,6 +189,13 @@ var _ = Describe("RBD", func() { Fail("validate multiple pvc failed") } }) + + By("check data persist after recreating pod with same pvc", func() { + err := checkDataPersist(pvcPath, appPath, f) + if err != nil { + Fail(err.Error()) + } + }) }) }) diff --git a/e2e/utils.go b/e2e/utils.go index de44d6e37..0d63a7e35 100644 --- a/e2e/utils.go +++ b/e2e/utils.go @@ -759,3 +759,52 @@ func GivePermToCephfsRoot(f *framework.Framework) { out = execCommandInPod(f, "chmod 777 /mnt/cephfs/", rookNS, &opt) e2elog.Logf("Setting chmod 777 on the cepfs root %s", out) } + +func checkDataPersist(pvcPath, appPath string, f *framework.Framework) error { + data := "checking data persist" + pvc, err := loadPVC(pvcPath) + if pvc == nil { + return err + } + pvc.Namespace = f.UniqueName + e2elog.Logf("The PVC template %+v", pvc) + + app, err := loadApp(appPath) + if err != nil { + return err + } + app.Labels = map[string]string{"app": "validate-data"} + app.Namespace = f.UniqueName + + err = createPVCAndApp("", f, pvc, app) + if err != nil { + return err + } + + opt := metav1.ListOptions{ + LabelSelector: "app=validate-data", + } + // write data to PVC + filePath := app.Spec.Containers[0].VolumeMounts[0].MountPath + "/test" + + execCommandInPod(f, fmt.Sprintf("echo %s > %s", data, filePath), app.Namespace, &opt) + + // delete app + err = deletePod(app.Name, app.Namespace, f.ClientSet, deployTimeout) + if err != nil { + return err + } + // recreate app and check data persist + err = createApp(f.ClientSet, app, deployTimeout) + if err != nil { + return err + } + persistData := execCommandInPod(f, fmt.Sprintf("cat %s", filePath), app.Namespace, &opt) + + if !strings.Contains(persistData, data) { + return fmt.Errorf("data not persistent expected data %s received data %s ", data, persistData) + } + + err = deletePVCAndApp("", f, pvc, app) + return err +} diff --git a/examples/rbd/storageclass.yaml b/examples/rbd/storageclass.yaml index c87f45348..a28a8021c 100644 --- a/examples/rbd/storageclass.yaml +++ b/examples/rbd/storageclass.yaml @@ -28,8 +28,8 @@ parameters: # to the 'pool'. csi.storage.k8s.io/provisioner-secret-name: csi-rbd-secret csi.storage.k8s.io/provisioner-secret-namespace: default - csi.storage.k8s.io/node-publish-secret-name: csi-rbd-secret - csi.storage.k8s.io/node-publish-secret-namespace: default + csi.storage.k8s.io/node-stage-secret-name: csi-rbd-secret + csi.storage.k8s.io/node-stage-secret-namespace: default # uncomment the following to use rbd-nbd as mounter on supported nodes # mounter: rbd-nbd diff --git a/pkg/cephfs/mountcache.go b/pkg/cephfs/mountcache.go index eafcbe2fd..9991df1e7 100644 --- a/pkg/cephfs/mountcache.go +++ b/pkg/cephfs/mountcache.go @@ -43,7 +43,7 @@ func initVolumeMountCache(driverName, mountCacheDir string) { } func remountCachedVolumes() error { - if err := os.MkdirAll(volumeMountCache.nodeCacheStore.BasePath, 0755); err != nil { + if err := util.CreateMountPoint(volumeMountCache.nodeCacheStore.BasePath); err != nil { klog.Errorf("mount-cache: failed to create %s: %v", volumeMountCache.nodeCacheStore.BasePath, err) return err } @@ -124,7 +124,7 @@ func mountOneCacheEntry(volOptions *volumeOptions, vid *volumeIdentifier, me *vo return err } - isMnt, err := isMountPoint(me.StagingPath) + isMnt, err := util.IsMountPoint(me.StagingPath) if err != nil { isMnt = false klog.Infof("mount-cache: failed to check volume mounted %s: %s %v", volID, me.StagingPath, err) diff --git a/pkg/cephfs/nodeserver.go b/pkg/cephfs/nodeserver.go index 9656cfd16..af4779435 100644 --- a/pkg/cephfs/nodeserver.go +++ b/pkg/cephfs/nodeserver.go @@ -84,8 +84,8 @@ func (ns *NodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol volOptions *volumeOptions vid *volumeIdentifier ) - if err := validateNodeStageVolumeRequest(req); err != nil { - return nil, status.Error(codes.InvalidArgument, err.Error()) + if err := util.ValidateNodeStageVolumeRequest(req); err != nil { + return nil, err } // Configuration @@ -115,7 +115,7 @@ func (ns *NodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol } } - if err = createMountPoint(stagingTargetPath); err != nil { + if err = util.CreateMountPoint(stagingTargetPath); err != nil { klog.Errorf("failed to create staging mount point at %s for volume %s: %v", stagingTargetPath, volID, err) return nil, status.Error(codes.Internal, err.Error()) } @@ -125,7 +125,7 @@ func (ns *NodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol // Check if the volume is already mounted - isMnt, err := isMountPoint(stagingTargetPath) + isMnt, err := util.IsMountPoint(stagingTargetPath) if err != nil { klog.Errorf("stat failed: %v", err) @@ -180,8 +180,8 @@ func (*NodeServer) mount(volOptions *volumeOptions, req *csi.NodeStageVolumeRequ func (ns *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error) { mountOptions := []string{"bind"} - if err := validateNodePublishVolumeRequest(req); err != nil { - return nil, status.Error(codes.InvalidArgument, err.Error()) + if err := util.ValidateNodePublishVolumeRequest(req); err != nil { + return nil, err } // Configuration @@ -189,7 +189,7 @@ func (ns *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis targetPath := req.GetTargetPath() volID := req.GetVolumeId() - if err := createMountPoint(targetPath); err != nil { + if err := util.CreateMountPoint(targetPath); err != nil { klog.Errorf("failed to create mount point at %s: %v", targetPath, err) return nil, status.Error(codes.Internal, err.Error()) } @@ -218,7 +218,7 @@ func (ns *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis // Check if the volume is already mounted - isMnt, err := isMountPoint(targetPath) + isMnt, err := util.IsMountPoint(targetPath) if err != nil { klog.Errorf("stat failed: %v", err) @@ -255,8 +255,8 @@ func (ns *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis // NodeUnpublishVolume unmounts the volume from the target path func (ns *NodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpublishVolumeRequest) (*csi.NodeUnpublishVolumeResponse, error) { var err error - if err = validateNodeUnpublishVolumeRequest(req); err != nil { - return nil, status.Error(codes.InvalidArgument, err.Error()) + if err = util.ValidateNodeUnpublishVolumeRequest(req); err != nil { + return nil, err } targetPath := req.GetTargetPath() @@ -283,8 +283,8 @@ func (ns *NodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpu // NodeUnstageVolume unstages the volume from the staging path func (ns *NodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstageVolumeRequest) (*csi.NodeUnstageVolumeResponse, error) { var err error - if err = validateNodeUnstageVolumeRequest(req); err != nil { - return nil, status.Error(codes.InvalidArgument, err.Error()) + if err = util.ValidateNodeUnstageVolumeRequest(req); err != nil { + return nil, err } stagingTargetPath := req.GetStagingTargetPath() diff --git a/pkg/cephfs/util.go b/pkg/cephfs/util.go index 52d9c9bb5..377d03fd6 100644 --- a/pkg/cephfs/util.go +++ b/pkg/cephfs/util.go @@ -19,7 +19,6 @@ package cephfs import ( "bytes" "encoding/json" - "errors" "fmt" "os" "os/exec" @@ -30,7 +29,6 @@ import ( "github.com/ceph/ceph-csi/pkg/util" "github.com/container-storage-interface/spec/lib/go/csi" - "k8s.io/kubernetes/pkg/util/mount" ) type volumeID string @@ -79,18 +77,6 @@ func execCommandJSON(v interface{}, program string, args ...string) error { return nil } -// Used in isMountPoint() -var dummyMount = mount.New("") - -func isMountPoint(p string) (bool, error) { - notMnt, err := dummyMount.IsLikelyNotMountPoint(p) - if err != nil { - return false, status.Error(codes.Internal, err.Error()) - } - - return !notMnt, nil -} - func pathExists(p string) bool { _, err := os.Stat(p) return err == nil @@ -127,68 +113,3 @@ func (cs *ControllerServer) validateDeleteVolumeRequest() error { return nil } - -// Node service request validation -func validateNodeStageVolumeRequest(req *csi.NodeStageVolumeRequest) error { - if req.GetVolumeCapability() == nil { - return errors.New("volume capability missing in request") - } - - if req.GetVolumeId() == "" { - return errors.New("volume ID missing in request") - } - - if req.GetStagingTargetPath() == "" { - return errors.New("staging target path missing in request") - } - - if req.GetSecrets() == nil || len(req.GetSecrets()) == 0 { - return errors.New("stage secrets cannot be nil or empty") - } - - return nil -} - -func validateNodeUnstageVolumeRequest(req *csi.NodeUnstageVolumeRequest) error { - if req.GetVolumeId() == "" { - return errors.New("volume ID missing in request") - } - - if req.GetStagingTargetPath() == "" { - return errors.New("staging target path missing in request") - } - - return nil -} - -func validateNodePublishVolumeRequest(req *csi.NodePublishVolumeRequest) error { - if req.GetVolumeCapability() == nil { - return errors.New("volume capability missing in request") - } - - if req.GetVolumeId() == "" { - return errors.New("volume ID missing in request") - } - - if req.GetTargetPath() == "" { - return errors.New("target path missing in request") - } - - if req.GetStagingTargetPath() == "" { - return errors.New("staging target path missing in request") - } - - return nil -} - -func validateNodeUnpublishVolumeRequest(req *csi.NodeUnpublishVolumeRequest) error { - if req.GetVolumeId() == "" { - return errors.New("volume ID missing in request") - } - - if req.GetTargetPath() == "" { - return errors.New("target path missing in request") - } - - return nil -} diff --git a/pkg/cephfs/volume.go b/pkg/cephfs/volume.go index 7a613f3d1..3c34b9adc 100644 --- a/pkg/cephfs/volume.go +++ b/pkg/cephfs/volume.go @@ -132,7 +132,7 @@ func mountCephRoot(volID volumeID, volOptions *volumeOptions, adminCr *util.Cred // Access to cephfs's / is required volOptions.RootPath = "/" - if err := createMountPoint(cephRoot); err != nil { + if err := util.CreateMountPoint(cephRoot); err != nil { return err } diff --git a/pkg/cephfs/volumemounter.go b/pkg/cephfs/volumemounter.go index b3c7dae8b..a21471f3e 100644 --- a/pkg/cephfs/volumemounter.go +++ b/pkg/cephfs/volumemounter.go @@ -155,7 +155,7 @@ func mountFuse(mountPoint string, cr *util.Credentials, volOptions *volumeOption } func (m *fuseMounter) mount(mountPoint string, cr *util.Credentials, volOptions *volumeOptions) error { - if err := createMountPoint(mountPoint); err != nil { + if err := util.CreateMountPoint(mountPoint); err != nil { return err } @@ -186,7 +186,7 @@ func mountKernel(mountPoint string, cr *util.Credentials, volOptions *volumeOpti } func (m *kernelMounter) mount(mountPoint string, cr *util.Credentials, volOptions *volumeOptions) error { - if err := createMountPoint(mountPoint); err != nil { + if err := util.CreateMountPoint(mountPoint); err != nil { return err } @@ -236,7 +236,3 @@ func unmountVolume(mountPoint string) error { return nil } - -func createMountPoint(root string) error { - return os.MkdirAll(root, 0750) -} diff --git a/pkg/rbd/nodeserver.go b/pkg/rbd/nodeserver.go index 105954bc4..5f1548e94 100644 --- a/pkg/rbd/nodeserver.go +++ b/pkg/rbd/nodeserver.go @@ -41,59 +41,15 @@ type NodeServer struct { mounter mount.Interface } -// NodePublishVolume mounts the volume mounted to the device path to the target -// path -func (ns *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error) { - targetPath := req.GetTargetPath() - if targetPath == "" { - return nil, status.Error(codes.InvalidArgument, "empty target path in request") - } - - if req.GetVolumeCapability() == nil { - return nil, status.Error(codes.InvalidArgument, "empty volume capability in request") - } - - if req.GetVolumeId() == "" { - return nil, status.Error(codes.InvalidArgument, "empty volume ID in request") - } - - cr, err := util.GetUserCredentials(req.GetSecrets()) - if err != nil { - return nil, status.Error(codes.Internal, err.Error()) - } - - idLk := targetPathLocker.Lock(targetPath) - defer targetPathLocker.Unlock(idLk, targetPath) - - disableInUseChecks := false - - isLegacyVolume := false - volName, err := getVolumeName(req) - if err != nil { - // error ErrInvalidVolID may mean this is an 1.0.0 version volume, check for name - // pattern match in addition to error to ensure this is a likely v1.0.0 volume - if _, ok := err.(ErrInvalidVolID); !ok || !isLegacyVolumeID(req.GetVolumeId()) { - return nil, status.Error(codes.InvalidArgument, err.Error()) - } - - volName, err = getLegacyVolumeName(req) - if err != nil { - return nil, status.Error(codes.InvalidArgument, err.Error()) - } - isLegacyVolume = true - } - - isBlock := req.GetVolumeCapability().GetBlock() != nil - // Check if that target path exists properly - notMnt, err := ns.createTargetPath(targetPath, isBlock) - if err != nil { +// NodeStageVolume mounts the volume to a staging path on the node. +func (ns *NodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVolumeRequest) (*csi.NodeStageVolumeResponse, error) { + if err := util.ValidateNodeStageVolumeRequest(req); err != nil { return nil, err } - if !notMnt { - return &csi.NodePublishVolumeResponse{}, nil - } - + stagingTargetPath := req.GetStagingTargetPath() + isBlock := req.GetVolumeCapability().GetBlock() != nil + disableInUseChecks := false // MULTI_NODE_MULTI_WRITER is supported by default for Block access type volumes if req.VolumeCapability.AccessMode.Mode == csi.VolumeCapability_AccessMode_MULTI_NODE_MULTI_WRITER { if isBlock { @@ -104,114 +60,215 @@ func (ns *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis } } + volID := req.GetVolumeId() + + cr, err := util.GetUserCredentials(req.GetSecrets()) + if err != nil { + return nil, status.Error(codes.Internal, err.Error()) + } + + isLegacyVolume := false + volName, err := getVolumeName(req.GetVolumeId()) + if err != nil { + // error ErrInvalidVolID may mean this is an 1.0.0 version volume, check for name + // pattern match in addition to error to ensure this is a likely v1.0.0 volume + if _, ok := err.(ErrInvalidVolID); !ok || !isLegacyVolumeID(req.GetVolumeId()) { + return nil, status.Error(codes.InvalidArgument, err.Error()) + } + + volName, err = getLegacyVolumeName(req.GetStagingTargetPath()) + if err != nil { + return nil, status.Error(codes.InvalidArgument, err.Error()) + } + isLegacyVolume = true + } + + if isBlock { + stagingTargetPath += "/" + volID + } + + idLk := nodeVolumeIDLocker.Lock(volID) + defer nodeVolumeIDLocker.Unlock(idLk, volID) + + // Check if that target path exists properly + isNotMnt, err := ns.createMountPath(stagingTargetPath, isBlock) + if err != nil { + klog.Errorf("stat failed: %v", err) + return nil, status.Error(codes.Internal, err.Error()) + } + + if !isNotMnt { + klog.Infof("rbd: volume %s is already mounted to %s, skipping", req.GetVolumeId(), stagingTargetPath) + return &csi.NodeStageVolumeResponse{}, nil + } + volOptions, err := genVolFromVolumeOptions(req.GetVolumeContext(), req.GetSecrets(), disableInUseChecks, isLegacyVolume) if err != nil { - return nil, err + return nil, status.Error(codes.Internal, err.Error()) } volOptions.RbdImageName = volName // Mapping RBD image devicePath, err := attachRBDImage(volOptions, cr) if err != nil { - return nil, err + return nil, status.Error(codes.Internal, err.Error()) } klog.V(4).Infof("rbd image: %s/%s was successfully mapped at %s\n", req.GetVolumeId(), volOptions.Pool, devicePath) - // Publish Path - err = ns.mountVolume(req, devicePath) - if err != nil { - return nil, err - } - err = os.Chmod(targetPath, 0777) + // nodeStage Path + err = ns.mountVolumeToStagePath(req, stagingTargetPath, devicePath) if err != nil { return nil, status.Error(codes.Internal, err.Error()) } + + err = os.Chmod(stagingTargetPath, 0777) + if err != nil { + return nil, status.Error(codes.Internal, err.Error()) + } + + klog.Infof("rbd: successfully mounted volume %s to stagingTargetPath %s", req.GetVolumeId(), stagingTargetPath) + + return &csi.NodeStageVolumeResponse{}, nil +} + +// NodePublishVolume mounts the volume mounted to the device path to the target +// path +func (ns *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error) { + + err := util.ValidateNodePublishVolumeRequest(req) + if err != nil { + return nil, err + } + targetPath := req.GetTargetPath() + isBlock := req.GetVolumeCapability().GetBlock() != nil + stagingPath := req.GetStagingTargetPath() + if isBlock { + stagingPath += "/" + req.GetVolumeId() + } + + idLk := targetPathLocker.Lock(targetPath) + defer targetPathLocker.Unlock(idLk, targetPath) + + // Check if that target path exists properly + notMnt, err := ns.createMountPath(targetPath, isBlock) + if err != nil { + return nil, err + } + + if !notMnt { + return &csi.NodePublishVolumeResponse{}, nil + } + + // Publish Path + err = ns.mountVolume(stagingPath, req) + if err != nil { + return nil, err + } + + klog.Infof("rbd: successfully mounted stagingPath %s to targetPath %s", stagingPath, targetPath) return &csi.NodePublishVolumeResponse{}, nil } -func getVolumeName(req *csi.NodePublishVolumeRequest) (string, error) { +func getVolumeName(volID string) (string, error) { var vi util.CSIIdentifier - err := vi.DecomposeCSIID(req.GetVolumeId()) + err := vi.DecomposeCSIID(volID) if err != nil { - err = fmt.Errorf("error decoding volume ID (%s) (%s)", err, req.GetVolumeId()) + err = fmt.Errorf("error decoding volume ID (%s) (%s)", err, volID) return "", ErrInvalidVolID{err} } return volJournal.NamingPrefix() + vi.ObjectUUID, nil } -func getLegacyVolumeName(req *csi.NodePublishVolumeRequest) (string, error) { +func getLegacyVolumeName(mountPath string) (string, error) { var volName string - isBlock := req.GetVolumeCapability().GetBlock() != nil - targetPath := req.GetTargetPath() - - if isBlock { - // Get volName from targetPath - s := strings.Split(targetPath, "/") - volName = s[len(s)-1] - } else { - // Get volName from targetPath - if !strings.HasSuffix(targetPath, "/mount") { - return "", fmt.Errorf("rbd: malformed value of target path: %s", targetPath) - } - s := strings.Split(strings.TrimSuffix(targetPath, "/mount"), "/") + if strings.HasSuffix(mountPath, "/globalmount") { + s := strings.Split(strings.TrimSuffix(mountPath, "/globalmount"), "/") volName = s[len(s)-1] + return volName, nil } + if strings.HasSuffix(mountPath, "/mount") { + s := strings.Split(strings.TrimSuffix(mountPath, "/mount"), "/") + volName = s[len(s)-1] + return volName, nil + } + + // get volume name for block volume + s := strings.Split(mountPath, "/") + if len(s) == 0 { + return "", fmt.Errorf("rbd: malformed value of stage target path: %s", mountPath) + } + volName = s[len(s)-1] return volName, nil } -func (ns *NodeServer) mountVolume(req *csi.NodePublishVolumeRequest, devicePath string) error { +func (ns *NodeServer) mountVolumeToStagePath(req *csi.NodeStageVolumeRequest, stagingPath, devicePath string) error { + // Publish Path + fsType := req.GetVolumeCapability().GetMount().GetFsType() + diskMounter := &mount.SafeFormatAndMount{Interface: ns.mounter, Exec: mount.NewOsExec()} + opt := []string{} + isBlock := req.GetVolumeCapability().GetBlock() != nil + var err error + + if isBlock { + opt = append(opt, "bind") + err = diskMounter.Mount(devicePath, stagingPath, fsType, opt) + } else { + err = diskMounter.FormatAndMount(devicePath, stagingPath, fsType, opt) + } + if err != nil { + klog.Errorf("failed to mount device path (%s) to staging path (%s) for volume (%s) error %s", devicePath, stagingPath, req.GetVolumeId(), err) + } + return err +} + +func (ns *NodeServer) mountVolume(stagingPath string, req *csi.NodePublishVolumeRequest) error { // Publish Path fsType := req.GetVolumeCapability().GetMount().GetFsType() readOnly := req.GetReadonly() - attrib := req.GetVolumeContext() mountFlags := req.GetVolumeCapability().GetMount().GetMountFlags() isBlock := req.GetVolumeCapability().GetBlock() != nil targetPath := req.GetTargetPath() - - klog.V(4).Infof("target %v\nisBlock %v\nfstype %v\ndevice %v\nreadonly %v\nattributes %v\n mountflags %v\n", - targetPath, isBlock, fsType, devicePath, readOnly, attrib, mountFlags) - - diskMounter := &mount.SafeFormatAndMount{Interface: ns.mounter, Exec: mount.NewOsExec()} + klog.V(4).Infof("target %v\nisBlock %v\nfstype %v\nstagingPath %v\nreadonly %v\nmountflags %v\n", + targetPath, isBlock, fsType, stagingPath, readOnly, mountFlags) + mountFlags = append(mountFlags, "bind") + if readOnly { + mountFlags = append(mountFlags, "ro") + } if isBlock { - mountFlags = append(mountFlags, "bind") - if err := diskMounter.Mount(devicePath, targetPath, fsType, mountFlags); err != nil { - return err + if err := util.Mount(stagingPath, targetPath, fsType, mountFlags); err != nil { + return status.Error(codes.Internal, err.Error()) } } else { - if readOnly { - mountFlags = append(mountFlags, "ro") - } - if err := diskMounter.FormatAndMount(devicePath, targetPath, fsType, mountFlags); err != nil { - return err + if err := util.Mount(stagingPath, targetPath, "", mountFlags); err != nil { + return status.Error(codes.Internal, err.Error()) } } return nil } -func (ns *NodeServer) createTargetPath(targetPath string, isBlock bool) (bool, error) { - // Check if that target path exists properly - notMnt, err := mount.IsNotMountPoint(ns.mounter, targetPath) +func (ns *NodeServer) createMountPath(mountPath string, isBlock bool) (bool, error) { + // Check if that mount path exists properly + notMnt, err := mount.IsNotMountPoint(ns.mounter, mountPath) if err != nil { if os.IsNotExist(err) { if isBlock { - // create an empty file // #nosec - targetPathFile, e := os.OpenFile(targetPath, os.O_CREATE|os.O_RDWR, 0750) + pathFile, e := os.OpenFile(mountPath, os.O_CREATE|os.O_RDWR, 0750) if e != nil { - klog.V(4).Infof("Failed to create targetPath:%s with error: %v", targetPath, err) + klog.V(4).Infof("Failed to create mountPath:%s with error: %v", mountPath, err) return notMnt, status.Error(codes.Internal, e.Error()) } - if err = targetPathFile.Close(); err != nil { - klog.V(4).Infof("Failed to close targetPath:%s with error: %v", targetPath, err) + if err = pathFile.Close(); err != nil { + klog.V(4).Infof("Failed to close mountPath:%s with error: %v", mountPath, err) return notMnt, status.Error(codes.Internal, err.Error()) } } else { // Create a directory - if err = os.MkdirAll(targetPath, 0750); err != nil { + if err = util.CreateMountPoint(mountPath); err != nil { return notMnt, status.Error(codes.Internal, err.Error()) } } @@ -226,18 +283,12 @@ func (ns *NodeServer) createTargetPath(targetPath string, isBlock bool) (bool, e // NodeUnpublishVolume unmounts the volume from the target path func (ns *NodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpublishVolumeRequest) (*csi.NodeUnpublishVolumeResponse, error) { + err := util.ValidateNodeUnpublishVolumeRequest(req) + if err != nil { + return nil, err + } + targetPath := req.GetTargetPath() - if targetPath == "" { - return nil, status.Error(codes.InvalidArgument, "empty target path in request") - } - - if req.GetVolumeId() == "" { - return nil, status.Error(codes.InvalidArgument, "empty volume ID in request") - } - - idLk := targetPathLocker.Lock(targetPath) - defer targetPathLocker.Unlock(idLk, targetPath) - notMnt, err := mount.IsNotMountPoint(ns.mounter, targetPath) if err != nil { if os.IsNotExist(err) { @@ -248,21 +299,75 @@ func (ns *NodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpu return nil, status.Error(codes.NotFound, err.Error()) } if notMnt { - // TODO should consider deleting path instead of returning error, - // once all codes become ready for csi 1.0. - return nil, status.Error(codes.NotFound, "volume not mounted") + if err = os.RemoveAll(targetPath); err != nil { + return nil, status.Error(codes.Internal, err.Error()) + } + return &csi.NodeUnpublishVolumeResponse{}, nil } - devicePath, cnt, err := mount.GetDeviceNameFromMount(ns.mounter, targetPath) + if err = ns.mounter.Unmount(targetPath); err != nil { + return nil, status.Error(codes.Internal, err.Error()) + } + + if err = os.RemoveAll(targetPath); err != nil { + return nil, status.Error(codes.Internal, err.Error()) + } + + klog.Infof("rbd: successfully unbinded volume %s from %s", req.GetVolumeId(), targetPath) + + return &csi.NodeUnpublishVolumeResponse{}, nil +} + +// NodeUnstageVolume unstages the volume from the staging path +func (ns *NodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstageVolumeRequest) (*csi.NodeUnstageVolumeResponse, error) { + var err error + if err = util.ValidateNodeUnstageVolumeRequest(req); err != nil { + return nil, err + } + + stagingTargetPath := req.GetStagingTargetPath() + + // kind of hack to unmount block volumes + blockStagingPath := stagingTargetPath + "/" + req.GetVolumeId() +unmount: + notMnt, err := mount.IsNotMountPoint(ns.mounter, stagingTargetPath) + if err != nil { + if os.IsNotExist(err) { + // staging targetPath has already been deleted + klog.V(4).Infof("stagingTargetPath: %s has already been deleted", stagingTargetPath) + return &csi.NodeUnstageVolumeResponse{}, nil + } + return nil, status.Error(codes.NotFound, err.Error()) + } + + if notMnt { + _, err = os.Stat(blockStagingPath) + if err == nil && (stagingTargetPath != blockStagingPath) { + stagingTargetPath = blockStagingPath + goto unmount + } + if err = os.RemoveAll(stagingTargetPath); err != nil { + return nil, status.Error(codes.Internal, err.Error()) + } + return &csi.NodeUnstageVolumeResponse{}, nil + } + // Unmount the volume + devicePath, cnt, err := mount.GetDeviceNameFromMount(ns.mounter, stagingTargetPath) if err != nil { return nil, status.Error(codes.Internal, err.Error()) } - if err = ns.unmount(targetPath, devicePath, cnt); err != nil { + if err = ns.unmount(stagingTargetPath, devicePath, cnt); err != nil { return nil, err } - return &csi.NodeUnpublishVolumeResponse{}, nil + if err = os.RemoveAll(stagingTargetPath); err != nil { + return nil, status.Error(codes.Internal, err.Error()) + } + + klog.Infof("rbd: successfully unmounted volume %s from %s", req.GetVolumeId(), stagingTargetPath) + + return &csi.NodeUnstageVolumeResponse{}, nil } func (ns *NodeServer) unmount(targetPath, devicePath string, cnt int) error { @@ -299,14 +404,15 @@ func (ns *NodeServer) unmount(targetPath, devicePath string, cnt int) error { // Unmapping rbd device if err = detachRBDDevice(devicePath); err != nil { klog.V(3).Infof("failed to unmap rbd device: %s with error: %v", devicePath, err) - return err + return status.Error(codes.Internal, err.Error()) } // Remove targetPath if err = os.RemoveAll(targetPath); err != nil { klog.V(3).Infof("failed to remove targetPath: %s with error: %v", targetPath, err) + return status.Error(codes.Internal, err.Error()) } - return err + return nil } func resolveBindMountedBlockDevice(mountPath string) (string, error) { // #nosec @@ -336,3 +442,18 @@ func parseFindMntResolveSource(out string) (string, error) { } return "", fmt.Errorf("parseFindMntResolveSource: %s doesn't match to any expected findMnt output", out) } + +// NodeGetCapabilities returns the supported capabilities of the node server +func (ns *NodeServer) NodeGetCapabilities(ctx context.Context, req *csi.NodeGetCapabilitiesRequest) (*csi.NodeGetCapabilitiesResponse, error) { + return &csi.NodeGetCapabilitiesResponse{ + Capabilities: []*csi.NodeServiceCapability{ + { + Type: &csi.NodeServiceCapability_Rpc{ + Rpc: &csi.NodeServiceCapability_RPC{ + Type: csi.NodeServiceCapability_RPC_STAGE_UNSTAGE_VOLUME, + }, + }, + }, + }, + }, nil +} diff --git a/pkg/rbd/rbd_attach.go b/pkg/rbd/rbd_attach.go index 8b966a099..2a9c0c3d6 100644 --- a/pkg/rbd/rbd_attach.go +++ b/pkg/rbd/rbd_attach.go @@ -231,8 +231,6 @@ func attachRBDImage(volOptions *rbdVolume, cr *util.Credentials) (string, error) var err error image := volOptions.RbdImageName - imagePath := fmt.Sprintf("%s/%s", volOptions.Pool, image) - useNBD := false moduleName := rbd if volOptions.Mounter == rbdTonbd && hasNBD { @@ -242,9 +240,6 @@ func attachRBDImage(volOptions *rbdVolume, cr *util.Credentials) (string, error) devicePath, found := waitForPath(volOptions.Pool, image, 1, useNBD) if !found { - idLk := attachdetachLocker.Lock(imagePath) - defer attachdetachLocker.Unlock(idLk, imagePath) - _, err = execCommand("modprobe", []string{moduleName}) if err != nil { klog.Warningf("rbd: failed to load rbd kernel module:%v", err) diff --git a/pkg/rbd/rbd_util.go b/pkg/rbd/rbd_util.go index aeaa62435..d77cbb145 100644 --- a/pkg/rbd/rbd_util.go +++ b/pkg/rbd/rbd_util.go @@ -90,16 +90,16 @@ type rbdSnapshot struct { } var ( - // serializes operations based on "/" as key - attachdetachLocker = util.NewIDLocker() // serializes operations based on "volume name" as key volumeNameLocker = util.NewIDLocker() // serializes operations based on "snapshot name" as key snapshotNameLocker = util.NewIDLocker() - // serializes operations based on "mount target path" as key - targetPathLocker = util.NewIDLocker() // serializes delete operations on legacy volumes legacyVolumeIDLocker = util.NewIDLocker() + // serializes operations based on "mount staging path" as key + nodeVolumeIDLocker = util.NewIDLocker() + // serializes operations based on "mount target path" as key + targetPathLocker = util.NewIDLocker() supportedFeatures = sets.NewString("layering") ) diff --git a/pkg/util/util.go b/pkg/util/util.go index a53a5c4fe..fe23ad48d 100644 --- a/pkg/util/util.go +++ b/pkg/util/util.go @@ -22,8 +22,11 @@ import ( "strings" "github.com/pkg/errors" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" "k8s.io/apimachinery/pkg/util/validation" "k8s.io/klog" + "k8s.io/kubernetes/pkg/util/mount" ) // remove this once kubernetes v1.14.0 release is done @@ -58,12 +61,12 @@ func roundUpSize(volumeSizeBytes, allocationUnitBytes int64) int64 { // CreatePersistanceStorage creates storage path and initializes new cache func CreatePersistanceStorage(sPath, metaDataStore, driverName string) (CachePersister, error) { var err error - if err = createPersistentStorage(path.Join(sPath, "controller")); err != nil { + if err = CreateMountPoint(path.Join(sPath, "controller")); err != nil { klog.Errorf("failed to create persistent storage for controller: %v", err) return nil, err } - if err = createPersistentStorage(path.Join(sPath, "node")); err != nil { + if err = CreateMountPoint(path.Join(sPath, "node")); err != nil { klog.Errorf("failed to create persistent storage for node: %v", err) return nil, err } @@ -76,10 +79,6 @@ func CreatePersistanceStorage(sPath, metaDataStore, driverName string) (CachePer return cp, err } -func createPersistentStorage(persistentStoragePath string) error { - return os.MkdirAll(persistentStoragePath, os.FileMode(0755)) -} - // ValidateDriverName validates the driver name func ValidateDriverName(driverName string) error { if driverName == "" { @@ -120,3 +119,25 @@ func GenerateVolID(monitors string, cr *Credentials, pool, clusterID, objUUID st return volID, err } + +// CreateMountPoint creates the directory with given path +func CreateMountPoint(mountPath string) error { + return os.MkdirAll(mountPath, 0750) +} + +// IsMountPoint checks if the given path is mountpoint or not +func IsMountPoint(p string) (bool, error) { + dummyMount := mount.New("") + notMnt, err := dummyMount.IsLikelyNotMountPoint(p) + if err != nil { + return false, status.Error(codes.Internal, err.Error()) + } + + return !notMnt, nil +} + +// Mount mounts the source to target path +func Mount(source, target, fstype string, options []string) error { + dummyMount := mount.New("") + return dummyMount.Mount(source, target, fstype, options) +} diff --git a/pkg/util/validate.go b/pkg/util/validate.go new file mode 100644 index 000000000..9db09fa13 --- /dev/null +++ b/pkg/util/validate.go @@ -0,0 +1,75 @@ +package util + +import ( + "github.com/container-storage-interface/spec/lib/go/csi" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +// ValidateNodeStageVolumeRequest validates the node stage request +func ValidateNodeStageVolumeRequest(req *csi.NodeStageVolumeRequest) error { + if req.GetVolumeCapability() == nil { + return status.Error(codes.InvalidArgument, "volume capability missing in request") + } + + if req.GetVolumeId() == "" { + return status.Error(codes.InvalidArgument, "volume ID missing in request") + } + + if req.GetStagingTargetPath() == "" { + return status.Error(codes.InvalidArgument, "staging target path missing in request") + } + + if req.GetSecrets() == nil || len(req.GetSecrets()) == 0 { + return status.Error(codes.InvalidArgument, "stage secrets cannot be nil or empty") + } + + return nil +} + +// ValidateNodeUnstageVolumeRequest validates the node unstage request +func ValidateNodeUnstageVolumeRequest(req *csi.NodeUnstageVolumeRequest) error { + if req.GetVolumeId() == "" { + return status.Error(codes.InvalidArgument, "volume ID missing in request") + } + + if req.GetStagingTargetPath() == "" { + return status.Error(codes.InvalidArgument, "staging target path missing in request") + } + + return nil +} + +// ValidateNodePublishVolumeRequest validates the node publish request +func ValidateNodePublishVolumeRequest(req *csi.NodePublishVolumeRequest) error { + if req.GetVolumeCapability() == nil { + return status.Error(codes.InvalidArgument, "volume capability missing in request") + } + + if req.GetVolumeId() == "" { + return status.Error(codes.InvalidArgument, "volume ID missing in request") + } + + if req.GetTargetPath() == "" { + return status.Error(codes.InvalidArgument, "target path missing in request") + } + + if req.GetStagingTargetPath() == "" { + return status.Error(codes.InvalidArgument, "staging target path missing in request") + } + + return nil +} + +// ValidateNodeUnpublishVolumeRequest validates the node unpublish request +func ValidateNodeUnpublishVolumeRequest(req *csi.NodeUnpublishVolumeRequest) error { + if req.GetVolumeId() == "" { + return status.Error(codes.InvalidArgument, "volume ID missing in request") + } + + if req.GetTargetPath() == "" { + return status.Error(codes.InvalidArgument, "target path missing in request") + } + + return nil +}