From b318964af5a361098d8cb455abda45ce02e63f57 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=8B=E6=80=80=E5=AE=97?= Date: Mon, 25 Mar 2019 22:47:39 +0800 Subject: [PATCH 01/10] issue #91 issue #217 Goal: we try to solve the case where the csi plugin exits unexpectedly and a pod using a cephfs pv cannot recover automatically, because the mount relation is lost until the pod is killed and rescheduled to another node. I think this may be a problem. The csi plugin can do more to remount the old paths, so that when the pod exits and restarts it can recover automatically and the old mount path is usable again. NoGoal: the pod should still exit and restart when the csi plugin pod exits and the mount point is lost. If the pod does not exit, it will get the error **transport endpoint is not connected**. Implementation logic. csi-plugin start: 1. load all MountCachEntry from the node-local dir 2. check if the volID exists in the cluster; if not, ignore this entry; if yes, continue 3. check if the stagingPath exists; if yes, mount the path 4. check if each targetPath exists; if yes, bind-mount the staging path onto it NodeServer: 1. NodeStageVolume: add a MountCachEntry to the local dir, including the readonly attr and ceph secret 2. NodePublishVolume: add the pod bind-mount path to the MountCachEntry and persist it to the local dir 3. NodeUnpublishVolume: remove the pod bind-mount path from the MountCachEntry and persist it to the local dir 4. 
NodeStageunStageVolume: remove MountCachEntry from local dir --- pkg/cephfs/driver.go | 4 + pkg/cephfs/mountcache.go | 314 ++++++++++++++++++++++++++++++++++ pkg/cephfs/mountcache_test.go | 38 ++++ pkg/cephfs/nodeserver.go | 17 ++ pkg/util/cachepersister.go | 1 + pkg/util/nodecache.go | 29 +++- 6 files changed, 395 insertions(+), 8 deletions(-) create mode 100644 pkg/cephfs/mountcache.go create mode 100644 pkg/cephfs/mountcache_test.go diff --git a/pkg/cephfs/driver.go b/pkg/cephfs/driver.go index b2272e853..ee7b446b8 100644 --- a/pkg/cephfs/driver.go +++ b/pkg/cephfs/driver.go @@ -105,6 +105,10 @@ func (fs *Driver) Run(driverName, nodeID, endpoint, volumeMounter string, cacheP klog.Fatalf("failed to write ceph configuration file: %v", err) } + if err := remountHisMountedPath(driverName, version, nodeID, cachePersister); err != nil { + klog.Warningf("failed to remounted history mounted path: %v", err) + //ignore remount fail + } // Initialize default library driver fs.cd = csicommon.NewCSIDriver(driverName, version, nodeID) diff --git a/pkg/cephfs/mountcache.go b/pkg/cephfs/mountcache.go new file mode 100644 index 000000000..2f7a0a5c6 --- /dev/null +++ b/pkg/cephfs/mountcache.go @@ -0,0 +1,314 @@ +package cephfs + +import ( + "encoding/base64" + "os" + "sync" + "syscall" + "time" + + "github.com/ceph/ceph-csi/pkg/util" + "github.com/pkg/errors" + "k8s.io/klog" +) + +type volumeMountEntry struct { + NodeID string `json:"nodeID"` + DriverName string `json:"driverName"` + DriverVersion string `json:"driverVersion"` + + Namespace string `json:"namespace"` + + VolumeID string `json:"volumeID"` + Secrets map[string]string `json:"secrets"` + StagingPath string `json:"stagingPath"` + TargetPaths map[string]bool `json:"targetPaths"` + CreateTime time.Time `json:"createTime"` + LastMountTime time.Time `json:"lastMountTime"` + LoadCount uint64 `json:"loadCount"` +} + +type volumeMountCacheMap struct { + DriverName string + DriverVersion string + NodeID string + MountFailNum 
int64 + MountSuccNum int64 + Volumes map[string]volumeMountEntry + NodeCacheStore util.NodeCache + MetadataStore util.CachePersister +} + +var ( + csiPersistentVolumeRoot = "/var/lib/kubelet/plugins/kubernetes.io/csi" + volumeMountCachePrefix = "cephfs-mount-cache-" + volumeMountCache volumeMountCacheMap + volumeMountCacheMtx sync.Mutex +) + +func remountHisMountedPath(name string, v string, nodeID string, cachePersister util.CachePersister) error { + volumeMountCache.Volumes = make(map[string]volumeMountEntry) + volumeMountCache.NodeID = nodeID + volumeMountCache.DriverName = name + volumeMountCache.DriverVersion = v + volumeMountCache.MountSuccNum = 0 + volumeMountCache.MountFailNum = 0 + + volumeMountCache.MetadataStore = cachePersister + + volumeMountCache.NodeCacheStore.BasePath = PluginFolder + volumeMountCache.NodeCacheStore.CacheDir = "volumes-mount-cache" + + if _, err := os.Stat(csiPersistentVolumeRoot); err != nil { + klog.Infof("mount-cache: csi pv root path %s stat fail %v, may not in daemonset csi plugin, exit", csiPersistentVolumeRoot, err) + return err + } + + if err := os.MkdirAll(volumeMountCache.NodeCacheStore.BasePath, 0755); err != nil { + klog.Fatalf("mount-cache: failed to create %s: %v", volumeMountCache.NodeCacheStore.BasePath, err) + return err + } + me := &volumeMountEntry{} + ce := &controllerCacheEntry{} + err := volumeMountCache.NodeCacheStore.ForAll(volumeMountCachePrefix, me, func(identifier string) error { + volID := me.VolumeID + klog.Infof("mount-cache: load %v", me) + if err := volumeMountCache.MetadataStore.Get(volID, ce); err != nil { + if err, ok := err.(*util.CacheEntryNotFound); ok { + klog.Infof("cephfs: metadata for volume %s not found, assuming the volume to be already deleted (%v)", volID, err) + if err := volumeMountCache.NodeCacheStore.Delete(genVolumeMountCacheFileName(volID)); err == nil { + klog.Infof("mount-cache: metadata nofound, delete volume cache entry for volume %s", volID) + } + } + } else { + if err := 
mountOneCacheEntry(ce, me); err == nil { + volumeMountCache.MountSuccNum++ + volumeMountCache.Volumes[me.VolumeID] = *me + } else { + volumeMountCache.MountFailNum++ + } + } + return nil + }) + if err != nil { + klog.Infof("mount-cache: metastore list cache fail %v", err) + return err + } + if volumeMountCache.MountFailNum > volumeMountCache.MountSuccNum { + return errors.New("mount-cache: too many volumes mount fail") + } + klog.Infof("mount-cache: succ remount %d volumes, fail remount %d volumes", volumeMountCache.MountSuccNum, volumeMountCache.MountFailNum) + return nil +} + +func mountOneCacheEntry(ce *controllerCacheEntry, me *volumeMountEntry) error { + volumeMountCacheMtx.Lock() + defer volumeMountCacheMtx.Unlock() + + var err error + volID := ce.VolumeID + volOptions := ce.VolOptions + + adminCr, err := getAdminCredentials(decodeCredentials(me.Secrets)) + if err != nil { + return err + } + entity, err := getCephUser(&volOptions, adminCr, volID) + if err != nil { + klog.Infof("mount-cache: failed to get ceph user: %s %v", volID, me.StagingPath) + } + cr := entity.toCredentials() + + if volOptions.ProvisionVolume { + volOptions.RootPath = getVolumeRootPathCeph(volID) + } + + err = cleanupMountPoint(me.StagingPath) + if err != nil { + klog.Infof("mount-cache: failed to cleanup volume mount point %s, remove it: %s %v", volID, me.StagingPath, err) + return err + } + + isMnt, err := isMountPoint(me.StagingPath) + if err != nil { + isMnt = false + klog.Infof("mount-cache: failed to check volume mounted %s: %s %v", volID, me.StagingPath, err) + } + + if !isMnt { + m, err := newMounter(&volOptions) + if err != nil { + klog.Errorf("mount-cache: failed to create mounter for volume %s: %v", volID, err) + return err + } + if err := m.mount(me.StagingPath, cr, &volOptions); err != nil { + klog.Errorf("mount-cache: failed to mount volume %s: %v", volID, err) + return err + } + } + for targetPath, readOnly := range me.TargetPaths { + if err := 
cleanupMountPoint(targetPath); err == nil { + if err := bindMount(me.StagingPath, targetPath, readOnly); err != nil { + klog.Errorf("mount-cache: failed to bind-mount volume %s: %s %s %v %v", + volID, me.StagingPath, targetPath, readOnly, err) + } else { + klog.Infof("mount-cache: succ bind-mount volume %s: %s %s %v", + volID, me.StagingPath, targetPath, readOnly) + } + } + } + return nil +} + +func cleanupMountPoint(mountPoint string) error { + if _, err := os.Stat(mountPoint); err != nil { + if IsCorruptedMnt(err) { + klog.Infof("mount-cache: corrupted mount point %s, need unmount", mountPoint) + err := execCommandErr("umount", mountPoint) + if err != nil { + klog.Infof("mount-cache: unmount %s fail %v", mountPoint, err) + //ignore error return err + } + } + } + if _, err := os.Stat(mountPoint); err != nil { + klog.Errorf("mount-cache: mount point %s stat fail %v", mountPoint, err) + return err + } + return nil +} + +func IsCorruptedMnt(err error) bool { + if err == nil { + return false + } + var underlyingError error + switch pe := err.(type) { + case nil: + return false + case *os.PathError: + underlyingError = pe.Err + case *os.LinkError: + underlyingError = pe.Err + case *os.SyscallError: + underlyingError = pe.Err + } + + return underlyingError == syscall.ENOTCONN || underlyingError == syscall.ESTALE || underlyingError == syscall.EIO || underlyingError == syscall.EACCES +} + +func genVolumeMountCacheFileName(volID string) string { + cachePath := volumeMountCachePrefix + volID + return cachePath +} + +func (mc *volumeMountCacheMap) nodeStageVolume(volID string, stagingTargetPath string, secrets map[string]string) error { + volumeMountCacheMtx.Lock() + defer volumeMountCacheMtx.Unlock() + + lastTargetPaths := make(map[string]bool) + me, ok := volumeMountCache.Volumes[volID] + if ok { + if me.StagingPath == stagingTargetPath { + klog.Infof("mount-cache: node stage volume last cache entry for volume %s stagingTargetPath %s no equal %s", + volID, me.StagingPath, 
stagingTargetPath) + return nil + } + lastTargetPaths = me.TargetPaths + klog.Warningf("mount-cache: node stage volume ignore last cache entry for volume %s", volID) + } + + me = volumeMountEntry{NodeID: mc.NodeID, DriverName: mc.DriverName, DriverVersion: mc.DriverVersion} + + me.VolumeID = volID + me.Secrets = encodeCredentials(secrets) + me.StagingPath = stagingTargetPath + me.TargetPaths = lastTargetPaths + + curTime := time.Now() + me.CreateTime = curTime + me.CreateTime = curTime + me.LoadCount = 0 + volumeMountCache.Volumes[volID] = me + if err := mc.NodeCacheStore.Create(genVolumeMountCacheFileName(volID), me); err != nil { + klog.Errorf("mount-cache: node stage volume failed to store a cache entry for volume %s: %v", volID, err) + return err + } + klog.Infof("mount-cache: node stage volume succ to store a cache entry for volume %s: %v", volID, me) + return nil +} + +func (mc *volumeMountCacheMap) nodeUnStageVolume(volID string, stagingTargetPath string) error { + volumeMountCacheMtx.Lock() + defer volumeMountCacheMtx.Unlock() + delete(volumeMountCache.Volumes, volID) + if err := mc.NodeCacheStore.Delete(genVolumeMountCacheFileName(volID)); err != nil { + klog.Infof("mount-cache: node unstage volume failed to delete cache entry for volume %s: %s %v", volID, stagingTargetPath, err) + return err + } + return nil +} + +func (mc *volumeMountCacheMap) nodePublishVolume(volID string, targetPath string, readOnly bool) error { + volumeMountCacheMtx.Lock() + defer volumeMountCacheMtx.Unlock() + + _, ok := volumeMountCache.Volumes[volID] + if !ok { + klog.Errorf("mount-cache: node publish volume failed to find cache entry for volume %s", volID) + return errors.New("mount-cache: node publish volume failed to find cache entry for volume") + } + volumeMountCache.Volumes[volID].TargetPaths[targetPath] = readOnly + me := volumeMountCache.Volumes[volID] + if err := mc.NodeCacheStore.Update(genVolumeMountCacheFileName(volID), me); err != nil { + klog.Errorf("mount-cache: 
node publish volume failed to store a cache entry for volume %s: %v", volID, err) + return err + } + return nil +} + +func (mc *volumeMountCacheMap) nodeUnPublishVolume(volID string, targetPath string) error { + volumeMountCacheMtx.Lock() + defer volumeMountCacheMtx.Unlock() + + _, ok := volumeMountCache.Volumes[volID] + if !ok { + klog.Errorf("mount-cache: node unpublish volume failed to find cache entry for volume %s", volID) + return errors.New("mount-cache: node unpublish volume failed to find cache entry for volume") + } + delete(volumeMountCache.Volumes[volID].TargetPaths, targetPath) + me := volumeMountCache.Volumes[volID] + if err := mc.NodeCacheStore.Update(genVolumeMountCacheFileName(volID), me); err != nil { + klog.Errorf("mount-cache: node unpublish volume failed to store a cache entry for volume %s: %v", volID, err) + return err + } + return nil +} + +func encodeCredentials(input map[string]string) (output map[string]string) { + output = make(map[string]string) + for key, value := range input { + nKey := base64.StdEncoding.EncodeToString([]byte(key)) + nValue := base64.StdEncoding.EncodeToString([]byte(value)) + output[nKey] = nValue + } + return output +} + +func decodeCredentials(input map[string]string) (output map[string]string) { + output = make(map[string]string) + for key, value := range input { + nKey, err := base64.StdEncoding.DecodeString(key) + if err != nil { + klog.Errorf("mount-cache: decode secret fail") + continue + } + nValue, err := base64.StdEncoding.DecodeString(value) + if err != nil { + klog.Errorf("mount-cache: decode secret fail") + continue + } + output[string(nKey)] = string(nValue) + } + return output +} diff --git a/pkg/cephfs/mountcache_test.go b/pkg/cephfs/mountcache_test.go new file mode 100644 index 000000000..6bba59c55 --- /dev/null +++ b/pkg/cephfs/mountcache_test.go @@ -0,0 +1,38 @@ +package cephfs + +import ( + "testing" +) + +func init() { +} + +func TestMountOneCacheEntry(t *testing.T) { +} + +func 
TestRemountHisMountedPath(t *testing.T) { +} + +func TestNodeStageVolume(t *testing.T) { +} + +func TestNodeUnStageVolume(t *testing.T) { +} + +func TestNodePublishVolume(t *testing.T) { +} + +func TestNodeUnpublishVolume(t *testing.T) { +} + +func TestEncodeDecodeCredentials(t *testing.T) { + secrets := make(map[string]string) + secrets["user_1"] = "value_1" + enSecrets := encodeCredentials(secrets) + deSecrets := decodeCredentials(enSecrets) + for key, value := range secrets { + if deSecrets[key] != value { + t.Errorf("key %s value %s not equal %s after encode decode", key, value, deSecrets[key]) + } + } +} diff --git a/pkg/cephfs/nodeserver.go b/pkg/cephfs/nodeserver.go index 51c44933a..345e4904d 100644 --- a/pkg/cephfs/nodeserver.go +++ b/pkg/cephfs/nodeserver.go @@ -154,6 +154,9 @@ func (*NodeServer) mount(volOptions *volumeOptions, req *csi.NodeStageVolumeRequ klog.Errorf("failed to mount volume %s: %v", volID, err) return status.Error(codes.Internal, err.Error()) } + if err := volumeMountCache.nodeStageVolume(req.GetVolumeId(), stagingTargetPath, req.GetSecrets()); err != nil { + klog.Warningf("mount-cache: failed stage volume %s %s: %v", volID, stagingTargetPath, err) + } return nil } @@ -195,6 +198,10 @@ func (ns *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis return nil, status.Error(codes.Internal, err.Error()) } + if err := volumeMountCache.nodePublishVolume(volID, targetPath, req.GetReadonly()); err != nil { + klog.Warningf("mount-cache: failed publish volume %s %s: %v", volID, targetPath, err) + } + klog.Infof("cephfs: successfully bind-mounted volume %s to %s", volID, targetPath) return &csi.NodePublishVolumeResponse{}, nil @@ -209,6 +216,11 @@ func (ns *NodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpu targetPath := req.GetTargetPath() + volID := req.GetVolumeId() + if err = volumeMountCache.nodeUnPublishVolume(volID, targetPath); err != nil { + klog.Warningf("mount-cache: failed unpublish volume %s %s: 
%v", volID, targetPath, err) + } + // Unmount the bind-mount if err = unmountVolume(targetPath); err != nil { return nil, status.Error(codes.Internal, err.Error()) @@ -232,6 +244,11 @@ func (ns *NodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstag stagingTargetPath := req.GetStagingTargetPath() + volID := req.GetVolumeId() + if err = volumeMountCache.nodeUnStageVolume(volID, stagingTargetPath); err != nil { + klog.Warningf("mount-cache: failed unstage volume %s %s: %v", volID, stagingTargetPath, err) + } + // Unmount the volume if err = unmountVolume(stagingTargetPath); err != nil { return nil, status.Error(codes.Internal, err.Error()) diff --git a/pkg/util/cachepersister.go b/pkg/util/cachepersister.go index ba8918587..4faf6366c 100644 --- a/pkg/util/cachepersister.go +++ b/pkg/util/cachepersister.go @@ -56,6 +56,7 @@ func NewCachePersister(metadataStore, driverName string) (CachePersister, error) klog.Infof("cache-persister: using node as metadata cache persister") nc := &NodeCache{} nc.BasePath = PluginFolder + "/" + driverName + nc.CacheDir = "controller" return nc, nil } return nil, errors.New("cache-persister: couldn't parse metadatastorage flag") diff --git a/pkg/util/nodecache.go b/pkg/util/nodecache.go index 5659d4eaa..86278f4d2 100644 --- a/pkg/util/nodecache.go +++ b/pkg/util/nodecache.go @@ -32,10 +32,9 @@ import ( // NodeCache to store metadata type NodeCache struct { BasePath string + CacheDir string } -var cacheDir = "controller" - var errDec = errors.New("file not found") // EnsureCacheDirectory creates cache directory if not present @@ -52,15 +51,15 @@ func (nc *NodeCache) EnsureCacheDirectory(cacheDir string) error { //ForAll list the metadata in Nodecache and filters outs based on the pattern func (nc *NodeCache) ForAll(pattern string, destObj interface{}, f ForAllFunc) error { - err := nc.EnsureCacheDirectory(cacheDir) + err := nc.EnsureCacheDirectory(nc.CacheDir) if err != nil { return errors.Wrap(err, "node-cache: couldn't 
ensure cache directory exists") } - files, err := ioutil.ReadDir(path.Join(nc.BasePath, cacheDir)) + files, err := ioutil.ReadDir(path.Join(nc.BasePath, nc.CacheDir)) if err != nil { return errors.Wrapf(err, "node-cache: failed to read %s folder", nc.BasePath) } - path := path.Join(nc.BasePath, cacheDir) + path := path.Join(nc.BasePath, nc.CacheDir) for _, file := range files { err = decodeObj(path, pattern, file, destObj) if err == errDec { @@ -102,9 +101,23 @@ func decodeObj(filepath, pattern string, file os.FileInfo, destObj interface{}) } +func (nc *NodeCache) Update(identifier string, data interface{}) error { + file := path.Join(nc.BasePath, nc.CacheDir, identifier+".json") + identifierTmp := identifier + ".creating" + fileTmp := path.Join(nc.BasePath, nc.CacheDir, identifierTmp+".json") + os.Remove(fileTmp) + if err := nc.Create(identifierTmp, data); err != nil { + return errors.Wrapf(err, "node-cache: failed to create metadata storage file %s\n", file) + } + if err := os.Rename(fileTmp, file); err != nil { + return errors.Wrapf(err, "node-cache: couldn't rename %s as %s", fileTmp, file) + } + return nil +} + // Create creates the metadata file in cache directory with identifier name func (nc *NodeCache) Create(identifier string, data interface{}) error { - file := path.Join(nc.BasePath, cacheDir, identifier+".json") + file := path.Join(nc.BasePath, nc.CacheDir, identifier+".json") fp, err := os.Create(file) if err != nil { return errors.Wrapf(err, "node-cache: failed to create metadata storage file %s\n", file) @@ -126,7 +139,7 @@ func (nc *NodeCache) Create(identifier string, data interface{}) error { // Get retrieves the metadata from cache directory with identifier name func (nc *NodeCache) Get(identifier string, data interface{}) error { - file := path.Join(nc.BasePath, cacheDir, identifier+".json") + file := path.Join(nc.BasePath, nc.CacheDir, identifier+".json") // #nosec fp, err := os.Open(file) if err != nil { @@ -153,7 +166,7 @@ func (nc 
*NodeCache) Get(identifier string, data interface{}) error { // Delete deletes the metadata file from cache directory with identifier name func (nc *NodeCache) Delete(identifier string) error { - file := path.Join(nc.BasePath, cacheDir, identifier+".json") + file := path.Join(nc.BasePath, nc.CacheDir, identifier+".json") err := os.Remove(file) if err != nil { if err == os.ErrNotExist { From af330fe68ea4d1e9c295e0dc224839bb6e94f134 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=8B=E6=80=80=E5=AE=97?= Date: Wed, 27 Mar 2019 16:04:58 +0800 Subject: [PATCH 02/10] 1. fix mountcache race conflict 2. support user-defined cache dir 3. if not define mountcachedir disable mountcache --- cmd/cephfs/main.go | 2 ++ pkg/cephfs/mountcache.go | 59 ++++++++++++++++++++++++++-------------- pkg/util/nodecache.go | 14 ---------- 3 files changed, 41 insertions(+), 34 deletions(-) diff --git a/cmd/cephfs/main.go b/cmd/cephfs/main.go index fc5c0dbcc..e0972f17f 100644 --- a/cmd/cephfs/main.go +++ b/cmd/cephfs/main.go @@ -31,6 +31,7 @@ var ( nodeID = flag.String("nodeid", "", "node id") volumeMounter = flag.String("volumemounter", "", "default volume mounter (possible options are 'kernel', 'fuse')") metadataStorage = flag.String("metadatastorage", "", "metadata persistence method [node|k8s_configmap]") + mountCacheDir = flag.String("mountcachedir", "", "mount info cache save dir") ) func init() { @@ -49,6 +50,7 @@ func main() { } //update plugin name cephfs.PluginFolder = cephfs.PluginFolder + *driverName + cephfs.MountCacheDir = *mountCacheDir cp, err := util.CreatePersistanceStorage(cephfs.PluginFolder, *metadataStorage, *driverName) if err != nil { diff --git a/pkg/cephfs/mountcache.go b/pkg/cephfs/mountcache.go index 2f7a0a5c6..dd7e89cb0 100644 --- a/pkg/cephfs/mountcache.go +++ b/pkg/cephfs/mountcache.go @@ -40,10 +40,10 @@ type volumeMountCacheMap struct { } var ( - csiPersistentVolumeRoot = "/var/lib/kubelet/plugins/kubernetes.io/csi" - volumeMountCachePrefix = 
"cephfs-mount-cache-" - volumeMountCache volumeMountCacheMap - volumeMountCacheMtx sync.Mutex + MountCacheDir = "" + volumeMountCachePrefix = "cephfs-mount-cache-" + volumeMountCache volumeMountCacheMap + volumeMountCacheMtx sync.Mutex ) func remountHisMountedPath(name string, v string, nodeID string, cachePersister util.CachePersister) error { @@ -56,16 +56,18 @@ func remountHisMountedPath(name string, v string, nodeID string, cachePersister volumeMountCache.MetadataStore = cachePersister - volumeMountCache.NodeCacheStore.BasePath = PluginFolder - volumeMountCache.NodeCacheStore.CacheDir = "volumes-mount-cache" + volumeMountCache.NodeCacheStore.BasePath = MountCacheDir + volumeMountCache.NodeCacheStore.CacheDir = "" - if _, err := os.Stat(csiPersistentVolumeRoot); err != nil { - klog.Infof("mount-cache: csi pv root path %s stat fail %v, may not in daemonset csi plugin, exit", csiPersistentVolumeRoot, err) - return err + if len(MountCacheDir) == 0 { + //if mount cache dir unset, disable remount + klog.Infof("mount-cache: mountcachedir no define disalbe mount cache.") + return nil } + klog.Infof("mount-cache: MountCacheDir: %s", MountCacheDir) if err := os.MkdirAll(volumeMountCache.NodeCacheStore.BasePath, 0755); err != nil { - klog.Fatalf("mount-cache: failed to create %s: %v", volumeMountCache.NodeCacheStore.BasePath, err) + klog.Errorf("mount-cache: failed to create %s: %v", volumeMountCache.NodeCacheStore.BasePath, err) return err } me := &volumeMountEntry{} @@ -203,6 +205,10 @@ func genVolumeMountCacheFileName(volID string) string { } func (mc *volumeMountCacheMap) nodeStageVolume(volID string, stagingTargetPath string, secrets map[string]string) error { + if len(MountCacheDir) == 0 { + //if mount cache dir unset, disable remount + return nil + } volumeMountCacheMtx.Lock() defer volumeMountCacheMtx.Unlock() @@ -210,8 +216,7 @@ func (mc *volumeMountCacheMap) nodeStageVolume(volID string, stagingTargetPath s me, ok := volumeMountCache.Volumes[volID] if ok { if 
me.StagingPath == stagingTargetPath { - klog.Infof("mount-cache: node stage volume last cache entry for volume %s stagingTargetPath %s no equal %s", - volID, me.StagingPath, stagingTargetPath) + klog.Warningf("mount-cache: node unexpected restage volume for volume %s", volID) return nil } lastTargetPaths = me.TargetPaths @@ -239,6 +244,10 @@ func (mc *volumeMountCacheMap) nodeStageVolume(volID string, stagingTargetPath s } func (mc *volumeMountCacheMap) nodeUnStageVolume(volID string, stagingTargetPath string) error { + if len(MountCacheDir) == 0 { + //if mount cache dir unset, disable remount + return nil + } volumeMountCacheMtx.Lock() defer volumeMountCacheMtx.Unlock() delete(volumeMountCache.Volumes, volID) @@ -250,6 +259,10 @@ func (mc *volumeMountCacheMap) nodeUnStageVolume(volID string, stagingTargetPath } func (mc *volumeMountCacheMap) nodePublishVolume(volID string, targetPath string, readOnly bool) error { + if len(MountCacheDir) == 0 { + //if mount cache dir unset, disable remount + return nil + } volumeMountCacheMtx.Lock() defer volumeMountCacheMtx.Unlock() @@ -259,15 +272,14 @@ func (mc *volumeMountCacheMap) nodePublishVolume(volID string, targetPath string return errors.New("mount-cache: node publish volume failed to find cache entry for volume") } volumeMountCache.Volumes[volID].TargetPaths[targetPath] = readOnly - me := volumeMountCache.Volumes[volID] - if err := mc.NodeCacheStore.Update(genVolumeMountCacheFileName(volID), me); err != nil { - klog.Errorf("mount-cache: node publish volume failed to store a cache entry for volume %s: %v", volID, err) - return err - } - return nil + return mc.updateNodeCache(volID) } func (mc *volumeMountCacheMap) nodeUnPublishVolume(volID string, targetPath string) error { + if len(MountCacheDir) == 0 { + //if mount cache dir unset, disable remount + return nil + } volumeMountCacheMtx.Lock() defer volumeMountCacheMtx.Unlock() @@ -277,9 +289,16 @@ func (mc *volumeMountCacheMap) nodeUnPublishVolume(volID string, 
targetPath stri return errors.New("mount-cache: node unpublish volume failed to find cache entry for volume") } delete(volumeMountCache.Volumes[volID].TargetPaths, targetPath) + return mc.updateNodeCache(volID) +} + +func (mc *volumeMountCacheMap) updateNodeCache(volID string) error { me := volumeMountCache.Volumes[volID] - if err := mc.NodeCacheStore.Update(genVolumeMountCacheFileName(volID), me); err != nil { - klog.Errorf("mount-cache: node unpublish volume failed to store a cache entry for volume %s: %v", volID, err) + if err := volumeMountCache.NodeCacheStore.Delete(genVolumeMountCacheFileName(volID)); err == nil { + klog.Infof("mount-cache: metadata nofound, delete mount cache failed for volume %s", volID) + } + if err := mc.NodeCacheStore.Create(genVolumeMountCacheFileName(volID), me); err != nil { + klog.Errorf("mount-cache: mount cache failed to update for volume %s: %v", volID, err) return err } return nil diff --git a/pkg/util/nodecache.go b/pkg/util/nodecache.go index 86278f4d2..bcb94debc 100644 --- a/pkg/util/nodecache.go +++ b/pkg/util/nodecache.go @@ -101,20 +101,6 @@ func decodeObj(filepath, pattern string, file os.FileInfo, destObj interface{}) } -func (nc *NodeCache) Update(identifier string, data interface{}) error { - file := path.Join(nc.BasePath, nc.CacheDir, identifier+".json") - identifierTmp := identifier + ".creating" - fileTmp := path.Join(nc.BasePath, nc.CacheDir, identifierTmp+".json") - os.Remove(fileTmp) - if err := nc.Create(identifierTmp, data); err != nil { - return errors.Wrapf(err, "node-cache: failed to create metadata storage file %s\n", file) - } - if err := os.Rename(fileTmp, file); err != nil { - return errors.Wrapf(err, "node-cache: couldn't rename %s as %s", fileTmp, file) - } - return nil -} - // Create creates the metadata file in cache directory with identifier name func (nc *NodeCache) Create(identifier string, data interface{}) error { file := path.Join(nc.BasePath, nc.CacheDir, identifier+".json") From 
5b53e90ee442f9b68e3c8e1ed014fa2398fdc82c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=8B=E6=80=80=E5=AE=97?= Date: Fri, 29 Mar 2019 10:18:59 +0800 Subject: [PATCH 03/10] fix code style --- cmd/cephfs/main.go | 3 +- pkg/cephfs/driver.go | 11 ++- pkg/cephfs/mountcache.go | 168 ++++++++++++++++++--------------------- pkg/cephfs/nodeserver.go | 2 +- 4 files changed, 85 insertions(+), 99 deletions(-) diff --git a/cmd/cephfs/main.go b/cmd/cephfs/main.go index e0972f17f..a4d01a5bc 100644 --- a/cmd/cephfs/main.go +++ b/cmd/cephfs/main.go @@ -50,7 +50,6 @@ func main() { } //update plugin name cephfs.PluginFolder = cephfs.PluginFolder + *driverName - cephfs.MountCacheDir = *mountCacheDir cp, err := util.CreatePersistanceStorage(cephfs.PluginFolder, *metadataStorage, *driverName) if err != nil { @@ -58,7 +57,7 @@ func main() { } driver := cephfs.NewDriver() - driver.Run(*driverName, *nodeID, *endpoint, *volumeMounter, cp) + driver.Run(*driverName, *nodeID, *endpoint, *volumeMounter, *mountCacheDir, cp) os.Exit(0) } diff --git a/pkg/cephfs/driver.go b/pkg/cephfs/driver.go index ee7b446b8..7c5dfc61e 100644 --- a/pkg/cephfs/driver.go +++ b/pkg/cephfs/driver.go @@ -77,7 +77,7 @@ func NewNodeServer(d *csicommon.CSIDriver) *NodeServer { // Run start a non-blocking grpc controller,node and identityserver for // ceph CSI driver which can serve multiple parallel requests -func (fs *Driver) Run(driverName, nodeID, endpoint, volumeMounter string, cachePersister util.CachePersister) { +func (fs *Driver) Run(driverName, nodeID, endpoint, volumeMounter, mountCacheDir string, cachePersister util.CachePersister) { klog.Infof("Driver: %v version: %v", driverName, version) // Configuration @@ -105,9 +105,12 @@ func (fs *Driver) Run(driverName, nodeID, endpoint, volumeMounter string, cacheP klog.Fatalf("failed to write ceph configuration file: %v", err) } - if err := remountHisMountedPath(driverName, version, nodeID, cachePersister); err != nil { - klog.Warningf("failed to remounted history 
mounted path: %v", err) - //ignore remount fail + initVolumeMountCache(driverName, mountCacheDir, cachePersister) + if mountCacheDir != "" { + if err := remountCachedVolumes(); err != nil { + klog.Warningf("failed to remount cached volumes: %v", err) + //ignore remount fail + } } // Initialize default library driver diff --git a/pkg/cephfs/mountcache.go b/pkg/cephfs/mountcache.go index dd7e89cb0..64daf9a01 100644 --- a/pkg/cephfs/mountcache.go +++ b/pkg/cephfs/mountcache.go @@ -12,82 +12,66 @@ import ( "k8s.io/klog" ) -type volumeMountEntry struct { - NodeID string `json:"nodeID"` +type volumeMountCacheEntry struct { DriverName string `json:"driverName"` DriverVersion string `json:"driverVersion"` - Namespace string `json:"namespace"` - VolumeID string `json:"volumeID"` Secrets map[string]string `json:"secrets"` StagingPath string `json:"stagingPath"` TargetPaths map[string]bool `json:"targetPaths"` CreateTime time.Time `json:"createTime"` LastMountTime time.Time `json:"lastMountTime"` - LoadCount uint64 `json:"loadCount"` } type volumeMountCacheMap struct { - DriverName string - DriverVersion string - NodeID string - MountFailNum int64 - MountSuccNum int64 - Volumes map[string]volumeMountEntry - NodeCacheStore util.NodeCache - MetadataStore util.CachePersister + driverName string + volumes map[string]volumeMountCacheEntry + nodeCacheStore util.NodeCache + metadataStore util.CachePersister } var ( - MountCacheDir = "" volumeMountCachePrefix = "cephfs-mount-cache-" volumeMountCache volumeMountCacheMap volumeMountCacheMtx sync.Mutex ) -func remountHisMountedPath(name string, v string, nodeID string, cachePersister util.CachePersister) error { - volumeMountCache.Volumes = make(map[string]volumeMountEntry) - volumeMountCache.NodeID = nodeID - volumeMountCache.DriverName = name - volumeMountCache.DriverVersion = v - volumeMountCache.MountSuccNum = 0 - volumeMountCache.MountFailNum = 0 +func initVolumeMountCache(driverName string, mountCacheDir string, cachePersister 
util.CachePersister) { + volumeMountCache.volumes = make(map[string]volumeMountCacheEntry) - volumeMountCache.MetadataStore = cachePersister + volumeMountCache.driverName = driverName + volumeMountCache.metadataStore = cachePersister + volumeMountCache.nodeCacheStore.BasePath = mountCacheDir + volumeMountCache.nodeCacheStore.CacheDir = "" + klog.Infof("mount-cache: name: %s, version: %s, mountCacheDir: %s", driverName, version, mountCacheDir) +} - volumeMountCache.NodeCacheStore.BasePath = MountCacheDir - volumeMountCache.NodeCacheStore.CacheDir = "" - - if len(MountCacheDir) == 0 { - //if mount cache dir unset, disable remount - klog.Infof("mount-cache: mountcachedir no define disalbe mount cache.") - return nil - } - - klog.Infof("mount-cache: MountCacheDir: %s", MountCacheDir) - if err := os.MkdirAll(volumeMountCache.NodeCacheStore.BasePath, 0755); err != nil { - klog.Errorf("mount-cache: failed to create %s: %v", volumeMountCache.NodeCacheStore.BasePath, err) +func remountCachedVolumes() error { + if err := os.MkdirAll(volumeMountCache.nodeCacheStore.BasePath, 0755); err != nil { + klog.Errorf("mount-cache: failed to create %s: %v", volumeMountCache.nodeCacheStore.BasePath, err) return err } - me := &volumeMountEntry{} + var remountFailCount, remountSuccCount int64 + me := &volumeMountCacheEntry{} ce := &controllerCacheEntry{} - err := volumeMountCache.NodeCacheStore.ForAll(volumeMountCachePrefix, me, func(identifier string) error { + err := volumeMountCache.nodeCacheStore.ForAll(volumeMountCachePrefix, me, func(identifier string) error { volID := me.VolumeID - klog.Infof("mount-cache: load %v", me) - if err := volumeMountCache.MetadataStore.Get(volID, ce); err != nil { + if err := volumeMountCache.metadataStore.Get(volID, ce); err != nil { if err, ok := err.(*util.CacheEntryNotFound); ok { - klog.Infof("cephfs: metadata for volume %s not found, assuming the volume to be already deleted (%v)", volID, err) - if err := 
volumeMountCache.NodeCacheStore.Delete(genVolumeMountCacheFileName(volID)); err == nil { + klog.Infof("mount-cache: metadata for volume %s not found, assuming the volume to be already deleted (%v)", volID, err) + if err := volumeMountCache.nodeCacheStore.Delete(genVolumeMountCacheFileName(volID)); err == nil { klog.Infof("mount-cache: metadata nofound, delete volume cache entry for volume %s", volID) } } } else { if err := mountOneCacheEntry(ce, me); err == nil { - volumeMountCache.MountSuccNum++ - volumeMountCache.Volumes[me.VolumeID] = *me + remountSuccCount++ + volumeMountCache.volumes[me.VolumeID] = *me + klog.Infof("mount-cache: remount volume %s succ", volID) } else { - volumeMountCache.MountFailNum++ + remountFailCount++ + klog.Infof("mount-cache: remount volume cache %s fail", volID) } } return nil @@ -96,33 +80,42 @@ func remountHisMountedPath(name string, v string, nodeID string, cachePersister klog.Infof("mount-cache: metastore list cache fail %v", err) return err } - if volumeMountCache.MountFailNum > volumeMountCache.MountSuccNum { - return errors.New("mount-cache: too many volumes mount fail") + if remountFailCount > 0 { + klog.Infof("mount-cache: succ remount %d volumes, fail remount %d volumes", remountSuccCount, remountFailCount) + } else { + klog.Infof("mount-cache: volume cache num %d, all succ remount", remountSuccCount) } - klog.Infof("mount-cache: succ remount %d volumes, fail remount %d volumes", volumeMountCache.MountSuccNum, volumeMountCache.MountFailNum) return nil } -func mountOneCacheEntry(ce *controllerCacheEntry, me *volumeMountEntry) error { +func mountOneCacheEntry(ce *controllerCacheEntry, me *volumeMountCacheEntry) error { volumeMountCacheMtx.Lock() defer volumeMountCacheMtx.Unlock() - var err error + var ( + err error + cr *credentials + ) volID := ce.VolumeID volOptions := ce.VolOptions - adminCr, err := getAdminCredentials(decodeCredentials(me.Secrets)) - if err != nil { - return err - } - entity, err := getCephUser(&volOptions, 
adminCr, volID) - if err != nil { - klog.Infof("mount-cache: failed to get ceph user: %s %v", volID, me.StagingPath) - } - cr := entity.toCredentials() - if volOptions.ProvisionVolume { volOptions.RootPath = getVolumeRootPathCeph(volID) + cr, err = getAdminCredentials(decodeCredentials(me.Secrets)) + if err != nil { + return err + } + var entity *cephEntity + entity, err = getCephUser(&volOptions, cr, volID) + if err != nil { + return err + } + cr = entity.toCredentials() + } else { + cr, err = getUserCredentials(decodeCredentials(me.Secrets)) + if err != nil { + return err + } } err = cleanupMountPoint(me.StagingPath) @@ -164,7 +157,7 @@ func mountOneCacheEntry(ce *controllerCacheEntry, me *volumeMountEntry) error { func cleanupMountPoint(mountPoint string) error { if _, err := os.Stat(mountPoint); err != nil { - if IsCorruptedMnt(err) { + if isCorruptedMnt(err) { klog.Infof("mount-cache: corrupted mount point %s, need unmount", mountPoint) err := execCommandErr("umount", mountPoint) if err != nil { @@ -180,7 +173,7 @@ func cleanupMountPoint(mountPoint string) error { return nil } -func IsCorruptedMnt(err error) bool { +func isCorruptedMnt(err error) bool { if err == nil { return false } @@ -203,17 +196,20 @@ func genVolumeMountCacheFileName(volID string) string { cachePath := volumeMountCachePrefix + volID return cachePath } +func (mc *volumeMountCacheMap) isEnable() bool { + //if mount cache dir unset, disable state + return mc.nodeCacheStore.BasePath != "" +} func (mc *volumeMountCacheMap) nodeStageVolume(volID string, stagingTargetPath string, secrets map[string]string) error { - if len(MountCacheDir) == 0 { - //if mount cache dir unset, disable remount + if !mc.isEnable() { return nil } volumeMountCacheMtx.Lock() defer volumeMountCacheMtx.Unlock() lastTargetPaths := make(map[string]bool) - me, ok := volumeMountCache.Volumes[volID] + me, ok := volumeMountCache.volumes[volID] if ok { if me.StagingPath == stagingTargetPath { klog.Warningf("mount-cache: node 
unexpected restage volume for volume %s", volID) @@ -223,82 +219,70 @@ func (mc *volumeMountCacheMap) nodeStageVolume(volID string, stagingTargetPath s klog.Warningf("mount-cache: node stage volume ignore last cache entry for volume %s", volID) } - me = volumeMountEntry{NodeID: mc.NodeID, DriverName: mc.DriverName, DriverVersion: mc.DriverVersion} + me = volumeMountCacheEntry{DriverName: mc.driverName, DriverVersion: version} me.VolumeID = volID me.Secrets = encodeCredentials(secrets) me.StagingPath = stagingTargetPath me.TargetPaths = lastTargetPaths - curTime := time.Now() - me.CreateTime = curTime - me.CreateTime = curTime - me.LoadCount = 0 - volumeMountCache.Volumes[volID] = me - if err := mc.NodeCacheStore.Create(genVolumeMountCacheFileName(volID), me); err != nil { - klog.Errorf("mount-cache: node stage volume failed to store a cache entry for volume %s: %v", volID, err) + me.CreateTime = time.Now() + volumeMountCache.volumes[volID] = me + if err := mc.nodeCacheStore.Create(genVolumeMountCacheFileName(volID), me); err != nil { return err } - klog.Infof("mount-cache: node stage volume succ to store a cache entry for volume %s: %v", volID, me) return nil } -func (mc *volumeMountCacheMap) nodeUnStageVolume(volID string, stagingTargetPath string) error { - if len(MountCacheDir) == 0 { - //if mount cache dir unset, disable remount +func (mc *volumeMountCacheMap) nodeUnStageVolume(volID string) error { + if !mc.isEnable() { return nil } volumeMountCacheMtx.Lock() defer volumeMountCacheMtx.Unlock() - delete(volumeMountCache.Volumes, volID) - if err := mc.NodeCacheStore.Delete(genVolumeMountCacheFileName(volID)); err != nil { - klog.Infof("mount-cache: node unstage volume failed to delete cache entry for volume %s: %s %v", volID, stagingTargetPath, err) + delete(volumeMountCache.volumes, volID) + if err := mc.nodeCacheStore.Delete(genVolumeMountCacheFileName(volID)); err != nil { return err } return nil } func (mc *volumeMountCacheMap) nodePublishVolume(volID 
string, targetPath string, readOnly bool) error { - if len(MountCacheDir) == 0 { - //if mount cache dir unset, disable remount + if !mc.isEnable() { return nil } volumeMountCacheMtx.Lock() defer volumeMountCacheMtx.Unlock() - _, ok := volumeMountCache.Volumes[volID] + _, ok := volumeMountCache.volumes[volID] if !ok { - klog.Errorf("mount-cache: node publish volume failed to find cache entry for volume %s", volID) return errors.New("mount-cache: node publish volume failed to find cache entry for volume") } - volumeMountCache.Volumes[volID].TargetPaths[targetPath] = readOnly + volumeMountCache.volumes[volID].TargetPaths[targetPath] = readOnly return mc.updateNodeCache(volID) } func (mc *volumeMountCacheMap) nodeUnPublishVolume(volID string, targetPath string) error { - if len(MountCacheDir) == 0 { - //if mount cache dir unset, disable remount + if !mc.isEnable() { return nil } volumeMountCacheMtx.Lock() defer volumeMountCacheMtx.Unlock() - _, ok := volumeMountCache.Volumes[volID] + _, ok := volumeMountCache.volumes[volID] if !ok { - klog.Errorf("mount-cache: node unpublish volume failed to find cache entry for volume %s", volID) return errors.New("mount-cache: node unpublish volume failed to find cache entry for volume") } - delete(volumeMountCache.Volumes[volID].TargetPaths, targetPath) + delete(volumeMountCache.volumes[volID].TargetPaths, targetPath) return mc.updateNodeCache(volID) } func (mc *volumeMountCacheMap) updateNodeCache(volID string) error { - me := volumeMountCache.Volumes[volID] - if err := volumeMountCache.NodeCacheStore.Delete(genVolumeMountCacheFileName(volID)); err == nil { + me := volumeMountCache.volumes[volID] + if err := volumeMountCache.nodeCacheStore.Delete(genVolumeMountCacheFileName(volID)); err == nil { klog.Infof("mount-cache: metadata nofound, delete mount cache failed for volume %s", volID) } - if err := mc.NodeCacheStore.Create(genVolumeMountCacheFileName(volID), me); err != nil { - klog.Errorf("mount-cache: mount cache failed to 
update for volume %s: %v", volID, err) + if err := mc.nodeCacheStore.Create(genVolumeMountCacheFileName(volID), me); err != nil { return err } return nil diff --git a/pkg/cephfs/nodeserver.go b/pkg/cephfs/nodeserver.go index 345e4904d..56d909ba9 100644 --- a/pkg/cephfs/nodeserver.go +++ b/pkg/cephfs/nodeserver.go @@ -245,7 +245,7 @@ func (ns *NodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstag stagingTargetPath := req.GetStagingTargetPath() volID := req.GetVolumeId() - if err = volumeMountCache.nodeUnStageVolume(volID, stagingTargetPath); err != nil { + if err = volumeMountCache.nodeUnStageVolume(volID); err != nil { klog.Warningf("mount-cache: failed unstage volume %s %s: %v", volID, stagingTargetPath, err) } From 043d3603ff2464e28fbc0cc91e126a329525005f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=8B=E6=80=80=E5=AE=97?= Date: Fri, 29 Mar 2019 10:49:22 +0800 Subject: [PATCH 04/10] remove unuse var --- pkg/cephfs/mountcache.go | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/pkg/cephfs/mountcache.go b/pkg/cephfs/mountcache.go index 64daf9a01..a7b73d85e 100644 --- a/pkg/cephfs/mountcache.go +++ b/pkg/cephfs/mountcache.go @@ -16,12 +16,11 @@ type volumeMountCacheEntry struct { DriverName string `json:"driverName"` DriverVersion string `json:"driverVersion"` - VolumeID string `json:"volumeID"` - Secrets map[string]string `json:"secrets"` - StagingPath string `json:"stagingPath"` - TargetPaths map[string]bool `json:"targetPaths"` - CreateTime time.Time `json:"createTime"` - LastMountTime time.Time `json:"lastMountTime"` + VolumeID string `json:"volumeID"` + Secrets map[string]string `json:"secrets"` + StagingPath string `json:"stagingPath"` + TargetPaths map[string]bool `json:"targetPaths"` + CreateTime time.Time `json:"createTime"` } type volumeMountCacheMap struct { From 4ec3a5777a4a20eaaa0b8e99a90529670365d4d4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=8B=E6=80=80=E5=AE=97?= Date: Fri, 29 Mar 2019 16:09:05 
+0800 Subject: [PATCH 05/10] code style --- pkg/cephfs/mountcache.go | 49 ++++++++++++++++++---------------------- 1 file changed, 22 insertions(+), 27 deletions(-) diff --git a/pkg/cephfs/mountcache.go b/pkg/cephfs/mountcache.go index a7b73d85e..fbc24df54 100644 --- a/pkg/cephfs/mountcache.go +++ b/pkg/cephfs/mountcache.go @@ -13,7 +13,6 @@ import ( ) type volumeMountCacheEntry struct { - DriverName string `json:"driverName"` DriverVersion string `json:"driverVersion"` VolumeID string `json:"volumeID"` @@ -24,7 +23,6 @@ type volumeMountCacheEntry struct { } type volumeMountCacheMap struct { - driverName string volumes map[string]volumeMountCacheEntry nodeCacheStore util.NodeCache metadataStore util.CachePersister @@ -39,7 +37,6 @@ var ( func initVolumeMountCache(driverName string, mountCacheDir string, cachePersister util.CachePersister) { volumeMountCache.volumes = make(map[string]volumeMountCacheEntry) - volumeMountCache.driverName = driverName volumeMountCache.metadataStore = cachePersister volumeMountCache.nodeCacheStore.BasePath = mountCacheDir volumeMountCache.nodeCacheStore.CacheDir = "" @@ -58,19 +55,19 @@ func remountCachedVolumes() error { volID := me.VolumeID if err := volumeMountCache.metadataStore.Get(volID, ce); err != nil { if err, ok := err.(*util.CacheEntryNotFound); ok { - klog.Infof("mount-cache: metadata for volume %s not found, assuming the volume to be already deleted (%v)", volID, err) + klog.Infof("mount-cache: metadata not found, assuming the volume %s to be already deleted (%v)", volID, err) if err := volumeMountCache.nodeCacheStore.Delete(genVolumeMountCacheFileName(volID)); err == nil { - klog.Infof("mount-cache: metadata nofound, delete volume cache entry for volume %s", volID) + klog.Infof("mount-cache: metadata not found, delete volume cache entry for volume %s", volID) } } } else { if err := mountOneCacheEntry(ce, me); err == nil { remountSuccCount++ volumeMountCache.volumes[me.VolumeID] = *me - klog.Infof("mount-cache: remount 
volume %s succ", volID) + klog.Infof("mount-cache: remount volume %s success", volID) } else { remountFailCount++ - klog.Infof("mount-cache: remount volume cache %s fail", volID) + klog.Errorf("mount-cache: remount volume cache %s fail", volID) } } return nil @@ -80,7 +77,7 @@ func remountCachedVolumes() error { return err } if remountFailCount > 0 { - klog.Infof("mount-cache: succ remount %d volumes, fail remount %d volumes", remountSuccCount, remountFailCount) + klog.Infof("mount-cache: success remount %d volumes, fail remount %d volumes", remountSuccCount, remountFailCount) } else { klog.Infof("mount-cache: volume cache num %d, all succ remount", remountSuccCount) } @@ -146,7 +143,7 @@ func mountOneCacheEntry(ce *controllerCacheEntry, me *volumeMountCacheEntry) err klog.Errorf("mount-cache: failed to bind-mount volume %s: %s %s %v %v", volID, me.StagingPath, targetPath, readOnly, err) } else { - klog.Infof("mount-cache: succ bind-mount volume %s: %s %s %v", + klog.Infof("mount-cache: successfully bind-mount volume %s: %s %s %v", volID, me.StagingPath, targetPath, readOnly) } } @@ -173,9 +170,6 @@ func cleanupMountPoint(mountPoint string) error { } func isCorruptedMnt(err error) bool { - if err == nil { - return false - } var underlyingError error switch pe := err.(type) { case nil: @@ -186,9 +180,19 @@ func isCorruptedMnt(err error) bool { underlyingError = pe.Err case *os.SyscallError: underlyingError = pe.Err + default: + return false } - return underlyingError == syscall.ENOTCONN || underlyingError == syscall.ESTALE || underlyingError == syscall.EIO || underlyingError == syscall.EACCES + CorruptedErrors := []error{ + syscall.ENOTCONN, syscall.ESTALE, syscall.EIO, syscall.EACCES} + + for _, v := range CorruptedErrors { + if underlyingError == v { + return true + } + } + return false } func genVolumeMountCacheFileName(volID string) string { @@ -218,7 +222,7 @@ func (mc *volumeMountCacheMap) nodeStageVolume(volID string, stagingTargetPath s 
klog.Warningf("mount-cache: node stage volume ignore last cache entry for volume %s", volID) } - me = volumeMountCacheEntry{DriverName: mc.driverName, DriverVersion: version} + me = volumeMountCacheEntry{DriverVersion: version} me.VolumeID = volID me.Secrets = encodeCredentials(secrets) @@ -227,10 +231,7 @@ func (mc *volumeMountCacheMap) nodeStageVolume(volID string, stagingTargetPath s me.CreateTime = time.Now() volumeMountCache.volumes[volID] = me - if err := mc.nodeCacheStore.Create(genVolumeMountCacheFileName(volID), me); err != nil { - return err - } - return nil + return mc.nodeCacheStore.Create(genVolumeMountCacheFileName(volID), me) } func (mc *volumeMountCacheMap) nodeUnStageVolume(volID string) error { @@ -240,10 +241,7 @@ func (mc *volumeMountCacheMap) nodeUnStageVolume(volID string) error { volumeMountCacheMtx.Lock() defer volumeMountCacheMtx.Unlock() delete(volumeMountCache.volumes, volID) - if err := mc.nodeCacheStore.Delete(genVolumeMountCacheFileName(volID)); err != nil { - return err - } - return nil + return mc.nodeCacheStore.Delete(genVolumeMountCacheFileName(volID)) } func (mc *volumeMountCacheMap) nodePublishVolume(volID string, targetPath string, readOnly bool) error { @@ -279,12 +277,9 @@ func (mc *volumeMountCacheMap) nodeUnPublishVolume(volID string, targetPath stri func (mc *volumeMountCacheMap) updateNodeCache(volID string) error { me := volumeMountCache.volumes[volID] if err := volumeMountCache.nodeCacheStore.Delete(genVolumeMountCacheFileName(volID)); err == nil { - klog.Infof("mount-cache: metadata nofound, delete mount cache failed for volume %s", volID) + klog.Infof("mount-cache: metadata notfound, delete mount cache failed for volume %s", volID) } - if err := mc.nodeCacheStore.Create(genVolumeMountCacheFileName(volID), me); err != nil { - return err - } - return nil + return mc.nodeCacheStore.Create(genVolumeMountCacheFileName(volID), me) } func encodeCredentials(input map[string]string) (output map[string]string) { From 
dfdefe40c9221f2adb7ac77b838cc8d8ed1e163c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=8B=E6=80=80=E5=AE=97?= Date: Fri, 29 Mar 2019 16:11:02 +0800 Subject: [PATCH 06/10] add cephfs driver **--mountcachedir** parameter document --- docs/deploy-cephfs.md | 1 + 1 file changed, 1 insertion(+) diff --git a/docs/deploy-cephfs.md b/docs/deploy-cephfs.md index a88293a14..6266107e7 100644 --- a/docs/deploy-cephfs.md +++ b/docs/deploy-cephfs.md @@ -34,6 +34,7 @@ Option | Default value | Description `--nodeid` | _empty_ | This node's ID `--volumemounter` | _empty_ | default volume mounter. Available options are `kernel` and `fuse`. This is the mount method used if volume parameters don't specify otherwise. If left unspecified, the driver will first probe for `ceph-fuse` in system's path and will choose Ceph kernel client if probing failed. `--metadatastorage` | _empty_ | Whether metadata should be kept on node as file or in a k8s configmap (`node` or `k8s_configmap`) +`--mountcachedir` | _empty_ | Directory in which volume mount cache info is saved. If left unspecified, the driver will not record mount info; otherwise it saves mount info and, when the driver restarts, remounts the volumes it has cached. 
**Available environmental variables:** From 1ccbb5b6a53b00794612e1d1067a507e71454f70 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=8B=E6=80=80=E5=AE=97?= Date: Fri, 29 Mar 2019 16:12:09 +0800 Subject: [PATCH 07/10] cephfs driver deploy support remount volume --- deploy/cephfs/helm/templates/nodeplugin-clusterrole.yaml | 3 +++ deploy/cephfs/helm/templates/nodeplugin-daemonset.yaml | 5 +++++ deploy/cephfs/kubernetes/csi-cephfsplugin.yaml | 5 +++++ deploy/cephfs/kubernetes/csi-nodeplugin-rbac.yaml | 3 +++ 4 files changed, 16 insertions(+) diff --git a/deploy/cephfs/helm/templates/nodeplugin-clusterrole.yaml b/deploy/cephfs/helm/templates/nodeplugin-clusterrole.yaml index 290dd3f33..de4aaeaaa 100644 --- a/deploy/cephfs/helm/templates/nodeplugin-clusterrole.yaml +++ b/deploy/cephfs/helm/templates/nodeplugin-clusterrole.yaml @@ -10,6 +10,9 @@ metadata: release: {{ .Release.Name }} heritage: {{ .Release.Service }} rules: + - apiGroups: [""] + resources: ["configmaps"] + verbs: ["get", "list"] - apiGroups: [""] resources: ["nodes"] verbs: ["get", "list", "update"] diff --git a/deploy/cephfs/helm/templates/nodeplugin-daemonset.yaml b/deploy/cephfs/helm/templates/nodeplugin-daemonset.yaml index 9181d6102..c56e70bb4 100644 --- a/deploy/cephfs/helm/templates/nodeplugin-daemonset.yaml +++ b/deploy/cephfs/helm/templates/nodeplugin-daemonset.yaml @@ -70,6 +70,7 @@ spec: - "--v=5" - "--drivername=$(DRIVER_NAME)" - "--metadatastorage=k8s_configmap" + - "--mountcachedir=/mount-cache-dir" env: - name: HOST_ROOTFS value: "/rootfs" @@ -83,6 +84,8 @@ spec: value: "unix:/{{ .Values.socketDir }}/{{ .Values.socketFile }}" imagePullPolicy: {{ .Values.nodeplugin.plugin.image.imagePullPolicy }} volumeMounts: + - name: mount-cache-dir + mountPath: /mount-cache-dir - name: plugin-dir mountPath: {{ .Values.socketDir }} - name: pods-mount-dir @@ -103,6 +106,8 @@ spec: resources: {{ toYaml .Values.nodeplugin.plugin.resources | indent 12 }} volumes: + - name: mount-cache-dir + emptyDir: {} - 
name: plugin-dir hostPath: path: {{ .Values.socketDir }} diff --git a/deploy/cephfs/kubernetes/csi-cephfsplugin.yaml b/deploy/cephfs/kubernetes/csi-cephfsplugin.yaml index af4322fee..849cf57b3 100644 --- a/deploy/cephfs/kubernetes/csi-cephfsplugin.yaml +++ b/deploy/cephfs/kubernetes/csi-cephfsplugin.yaml @@ -55,6 +55,7 @@ spec: - "--v=5" - "--drivername=cephfs.csi.ceph.com" - "--metadatastorage=k8s_configmap" + - "--mountcachedir=/mount-cache-dir" env: - name: NODE_ID valueFrom: @@ -68,6 +69,8 @@ spec: value: unix:///csi/csi.sock imagePullPolicy: "IfNotPresent" volumeMounts: + - name: mount-cache-dir + mountPath: /mount-cache-dir - name: plugin-dir mountPath: /csi - name: csi-plugins-dir @@ -84,6 +87,8 @@ spec: - name: host-dev mountPath: /dev volumes: + - name: mount-cache-dir + emptyDir: {} - name: plugin-dir hostPath: path: /var/lib/kubelet/plugins/cephfs.csi.ceph.com/ diff --git a/deploy/cephfs/kubernetes/csi-nodeplugin-rbac.yaml b/deploy/cephfs/kubernetes/csi-nodeplugin-rbac.yaml index cc2919b0e..918bdc983 100644 --- a/deploy/cephfs/kubernetes/csi-nodeplugin-rbac.yaml +++ b/deploy/cephfs/kubernetes/csi-nodeplugin-rbac.yaml @@ -10,6 +10,9 @@ apiVersion: rbac.authorization.k8s.io/v1 metadata: name: cephfs-csi-nodeplugin rules: + - apiGroups: [""] + resources: ["configmaps"] + verbs: ["get", "list"] - apiGroups: [""] resources: ["nodes"] verbs: ["get", "list", "update"] From f3e5f83ee06a96d304b3ae15ed6250e9f44fdd18 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=8B=E6=80=80=E5=AE=97?= Date: Fri, 29 Mar 2019 16:13:48 +0800 Subject: [PATCH 08/10] mount info cache dir support multi cephfsdriver --- pkg/cephfs/mountcache.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/cephfs/mountcache.go b/pkg/cephfs/mountcache.go index fbc24df54..5a5ade560 100644 --- a/pkg/cephfs/mountcache.go +++ b/pkg/cephfs/mountcache.go @@ -39,7 +39,7 @@ func initVolumeMountCache(driverName string, mountCacheDir string, cachePersiste volumeMountCache.metadataStore = 
cachePersister volumeMountCache.nodeCacheStore.BasePath = mountCacheDir - volumeMountCache.nodeCacheStore.CacheDir = "" + volumeMountCache.nodeCacheStore.CacheDir = driverName klog.Infof("mount-cache: name: %s, version: %s, mountCacheDir: %s", driverName, version, mountCacheDir) } From 6de862d6cb2ba67a8e47619a2943ae3e169ea953 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=8B=E6=80=80=E5=AE=97?= Date: Mon, 1 Apr 2019 21:20:53 +0800 Subject: [PATCH 09/10] code style --- pkg/cephfs/mountcache.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/pkg/cephfs/mountcache.go b/pkg/cephfs/mountcache.go index 5a5ade560..60e5fad81 100644 --- a/pkg/cephfs/mountcache.go +++ b/pkg/cephfs/mountcache.go @@ -64,10 +64,10 @@ func remountCachedVolumes() error { if err := mountOneCacheEntry(ce, me); err == nil { remountSuccCount++ volumeMountCache.volumes[me.VolumeID] = *me - klog.Infof("mount-cache: remount volume %s success", volID) + klog.Infof("mount-cache: successfully remounted volume %s", volID) } else { remountFailCount++ - klog.Errorf("mount-cache: remount volume cache %s fail", volID) + klog.Errorf("mount-cache: failed to remount volume %s", volID) } } return nil @@ -77,9 +77,9 @@ func remountCachedVolumes() error { return err } if remountFailCount > 0 { - klog.Infof("mount-cache: success remount %d volumes, fail remount %d volumes", remountSuccCount, remountFailCount) + klog.Infof("mount-cache: successfully remounted %d volumes, failed to remount %d volumes", remountSuccCount, remountFailCount) } else { - klog.Infof("mount-cache: volume cache num %d, all succ remount", remountSuccCount) + klog.Infof("mount-cache: successfully remounted %d volumes", remountSuccCount) } return nil } @@ -157,13 +157,13 @@ func cleanupMountPoint(mountPoint string) error { klog.Infof("mount-cache: corrupted mount point %s, need unmount", mountPoint) err := execCommandErr("umount", mountPoint) if err != nil { - klog.Infof("mount-cache: unmount %s fail %v", mountPoint, 
err) + klog.Infof("mount-cache: failed to umount %s %v", mountPoint, err) //ignore error return err } } } if _, err := os.Stat(mountPoint); err != nil { - klog.Errorf("mount-cache: mount point %s stat fail %v", mountPoint, err) + klog.Errorf("mount-cache: failed to stat mount point %s %v", mountPoint, err) return err } return nil From 1f1d5f47c3953f7c69b524ba3b2d8ff4023b6761 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=8B=E6=80=80=E5=AE=97?= Date: Mon, 1 Apr 2019 23:02:19 +0800 Subject: [PATCH 10/10] code style --- pkg/cephfs/mountcache.go | 4 ++-- pkg/cephfs/mountcache_test.go | 2 +- pkg/cephfs/nodeserver.go | 8 ++++---- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/pkg/cephfs/mountcache.go b/pkg/cephfs/mountcache.go index 60e5fad81..c8bc2a03b 100644 --- a/pkg/cephfs/mountcache.go +++ b/pkg/cephfs/mountcache.go @@ -143,7 +143,7 @@ func mountOneCacheEntry(ce *controllerCacheEntry, me *volumeMountCacheEntry) err klog.Errorf("mount-cache: failed to bind-mount volume %s: %s %s %v %v", volID, me.StagingPath, targetPath, readOnly, err) } else { - klog.Infof("mount-cache: successfully bind-mount volume %s: %s %s %v", + klog.Infof("mount-cache: successfully bind-mounted volume %s: %s %s %v", volID, me.StagingPath, targetPath, readOnly) } } @@ -277,7 +277,7 @@ func (mc *volumeMountCacheMap) nodeUnPublishVolume(volID string, targetPath stri func (mc *volumeMountCacheMap) updateNodeCache(volID string) error { me := volumeMountCache.volumes[volID] if err := volumeMountCache.nodeCacheStore.Delete(genVolumeMountCacheFileName(volID)); err == nil { - klog.Infof("mount-cache: metadata notfound, delete mount cache failed for volume %s", volID) + klog.Infof("mount-cache: metadata not found, delete mount cache failed for volume %s", volID) } return mc.nodeCacheStore.Create(genVolumeMountCacheFileName(volID), me) } diff --git a/pkg/cephfs/mountcache_test.go b/pkg/cephfs/mountcache_test.go index 6bba59c55..e27053cd4 100644 --- a/pkg/cephfs/mountcache_test.go +++ 
b/pkg/cephfs/mountcache_test.go @@ -32,7 +32,7 @@ func TestEncodeDecodeCredentials(t *testing.T) { deSecrets := decodeCredentials(enSecrets) for key, value := range secrets { if deSecrets[key] != value { - t.Errorf("key %s value %s not equal %s after encode decode", key, value, deSecrets[key]) + t.Errorf("key %s of credentials's value %s change after decode %s ", key, value, deSecrets[key]) } } } diff --git a/pkg/cephfs/nodeserver.go b/pkg/cephfs/nodeserver.go index 56d909ba9..273471e05 100644 --- a/pkg/cephfs/nodeserver.go +++ b/pkg/cephfs/nodeserver.go @@ -155,7 +155,7 @@ func (*NodeServer) mount(volOptions *volumeOptions, req *csi.NodeStageVolumeRequ return status.Error(codes.Internal, err.Error()) } if err := volumeMountCache.nodeStageVolume(req.GetVolumeId(), stagingTargetPath, req.GetSecrets()); err != nil { - klog.Warningf("mount-cache: failed stage volume %s %s: %v", volID, stagingTargetPath, err) + klog.Warningf("mount-cache: failed to stage volume %s %s: %v", volID, stagingTargetPath, err) } return nil } @@ -199,7 +199,7 @@ func (ns *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis } if err := volumeMountCache.nodePublishVolume(volID, targetPath, req.GetReadonly()); err != nil { - klog.Warningf("mount-cache: failed publish volume %s %s: %v", volID, targetPath, err) + klog.Warningf("mount-cache: failed to publish volume %s %s: %v", volID, targetPath, err) } klog.Infof("cephfs: successfully bind-mounted volume %s to %s", volID, targetPath) @@ -218,7 +218,7 @@ func (ns *NodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpu volID := req.GetVolumeId() if err = volumeMountCache.nodeUnPublishVolume(volID, targetPath); err != nil { - klog.Warningf("mount-cache: failed unpublish volume %s %s: %v", volID, targetPath, err) + klog.Warningf("mount-cache: failed to unpublish volume %s %s: %v", volID, targetPath, err) } // Unmount the bind-mount @@ -246,7 +246,7 @@ func (ns *NodeServer) NodeUnstageVolume(ctx context.Context, req 
*csi.NodeUnstag volID := req.GetVolumeId() if err = volumeMountCache.nodeUnStageVolume(volID); err != nil { - klog.Warningf("mount-cache: failed unstage volume %s %s: %v", volID, stagingTargetPath, err) + klog.Warningf("mount-cache: failed to unstage volume %s %s: %v", volID, stagingTargetPath, err) } // Unmount the volume