mirror of
				https://git.mirrors.martin98.com/https://github.com/ceph/ceph-csi.git
				synced 2025-10-21 04:11:09 +08:00 
			
		
		
		
	 3eeac3d36c
			
		
	
	
		3eeac3d36c
		
	
	
	
	
		
			
			The rbd-driver calls rbd.runVolumeHealer() which is not available outside the rbd package. By moving the rbd-driver into its own package, RunVolumeHealer() needs to be exported. Signed-off-by: Niels de Vos <ndevos@redhat.com>
		
			
				
	
	
		
			208 lines
		
	
	
		
			6.1 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			208 lines
		
	
	
		
			6.1 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| /*
 | |
| Copyright 2021 The Ceph-CSI Authors.
 | |
| 
 | |
| Licensed under the Apache License, Version 2.0 (the "License");
 | |
| you may not use this file except in compliance with the License.
 | |
| You may obtain a copy of the License at
 | |
| 
 | |
|     http://www.apache.org/licenses/LICENSE-2.0
 | |
| 
 | |
| Unless required by applicable law or agreed to in writing, software
 | |
| distributed under the License is distributed on an "AS IS" BASIS,
 | |
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | |
| See the License for the specific language governing permissions and
 | |
| limitations under the License.
 | |
| */
 | |
| 
 | |
| package rbd
 | |
| 
 | |
| import (
 | |
| 	"context"
 | |
| 	"sync"
 | |
| 
 | |
| 	"github.com/ceph/ceph-csi/internal/util"
 | |
| 	kubeclient "github.com/ceph/ceph-csi/internal/util/k8s"
 | |
| 	"github.com/ceph/ceph-csi/internal/util/log"
 | |
| 
 | |
| 	"github.com/container-storage-interface/spec/lib/go/csi"
 | |
| 	v1 "k8s.io/api/core/v1"
 | |
| 	apierrors "k8s.io/apimachinery/pkg/api/errors"
 | |
| 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 | |
| 	k8s "k8s.io/client-go/kubernetes"
 | |
| )
 | |
| 
 | |
| const (
 | |
| 	fsTypeBlockName = "block"
 | |
| )
 | |
| 
 | |
| // accessModeStrToInt convert access mode type string to int32.
 | |
| // Make sure to update this function as and when there are new modes introduced.
 | |
| func accessModeStrToInt(mode v1.PersistentVolumeAccessMode) csi.VolumeCapability_AccessMode_Mode {
 | |
| 	switch mode {
 | |
| 	case v1.ReadWriteOnce:
 | |
| 		return csi.VolumeCapability_AccessMode_SINGLE_NODE_WRITER
 | |
| 	case v1.ReadOnlyMany:
 | |
| 		return csi.VolumeCapability_AccessMode_MULTI_NODE_READER_ONLY
 | |
| 	case v1.ReadWriteMany:
 | |
| 		return csi.VolumeCapability_AccessMode_MULTI_NODE_MULTI_WRITER
 | |
| 	case v1.ReadWriteOncePod:
 | |
| 		return csi.VolumeCapability_AccessMode_SINGLE_NODE_SINGLE_WRITER
 | |
| 	}
 | |
| 
 | |
| 	return csi.VolumeCapability_AccessMode_UNKNOWN
 | |
| }
 | |
| 
 | |
| // getSecret get the secret details by name.
 | |
| func getSecret(c *k8s.Clientset, ns, name string) (map[string]string, error) {
 | |
| 	deviceSecret := make(map[string]string)
 | |
| 
 | |
| 	secret, err := c.CoreV1().Secrets(ns).Get(context.TODO(), name, metav1.GetOptions{})
 | |
| 	if err != nil {
 | |
| 		log.ErrorLogMsg("get secret failed, err: %v", err)
 | |
| 
 | |
| 		return nil, err
 | |
| 	}
 | |
| 
 | |
| 	for k, v := range secret.Data {
 | |
| 		deviceSecret[k] = string(v)
 | |
| 	}
 | |
| 
 | |
| 	return deviceSecret, nil
 | |
| }
 | |
| 
 | |
| func callNodeStageVolume(ns *NodeServer, c *k8s.Clientset, pv *v1.PersistentVolume, stagingPath string) error {
 | |
| 	publishContext := make(map[string]string)
 | |
| 
 | |
| 	volID := pv.Spec.PersistentVolumeSource.CSI.VolumeHandle
 | |
| 	stagingParentPath := stagingPath + pv.Name + "/globalmount"
 | |
| 
 | |
| 	log.DefaultLog("sending nodeStageVolume for volID: %s, stagingPath: %s",
 | |
| 		volID, stagingParentPath)
 | |
| 
 | |
| 	deviceSecret, err := getSecret(c,
 | |
| 		pv.Spec.PersistentVolumeSource.CSI.NodeStageSecretRef.Namespace,
 | |
| 		pv.Spec.PersistentVolumeSource.CSI.NodeStageSecretRef.Name)
 | |
| 	if err != nil {
 | |
| 		log.ErrorLogMsg("getSecret failed for volID: %s, err: %v", volID, err)
 | |
| 
 | |
| 		return err
 | |
| 	}
 | |
| 
 | |
| 	volumeContext := pv.Spec.PersistentVolumeSource.CSI.VolumeAttributes
 | |
| 	volumeContext["volumeHealerContext"] = "true"
 | |
| 
 | |
| 	req := &csi.NodeStageVolumeRequest{
 | |
| 		VolumeId:          volID,
 | |
| 		PublishContext:    publishContext,
 | |
| 		StagingTargetPath: stagingParentPath,
 | |
| 		VolumeCapability: &csi.VolumeCapability{
 | |
| 			AccessMode: &csi.VolumeCapability_AccessMode{
 | |
| 				Mode: accessModeStrToInt(pv.Spec.AccessModes[0]),
 | |
| 			},
 | |
| 		},
 | |
| 		Secrets:       deviceSecret,
 | |
| 		VolumeContext: volumeContext,
 | |
| 	}
 | |
| 	if pv.Spec.PersistentVolumeSource.CSI.FSType == fsTypeBlockName {
 | |
| 		req.VolumeCapability.AccessType = &csi.VolumeCapability_Block{
 | |
| 			Block: &csi.VolumeCapability_BlockVolume{},
 | |
| 		}
 | |
| 	} else {
 | |
| 		req.VolumeCapability.AccessType = &csi.VolumeCapability_Mount{
 | |
| 			Mount: &csi.VolumeCapability_MountVolume{
 | |
| 				FsType:     pv.Spec.PersistentVolumeSource.CSI.FSType,
 | |
| 				MountFlags: pv.Spec.MountOptions,
 | |
| 			},
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	_, err = ns.NodeStageVolume(context.TODO(), req)
 | |
| 	if err != nil {
 | |
| 		log.ErrorLogMsg("nodeStageVolume request failed, volID: %s, stagingPath: %s, err: %v",
 | |
| 			volID, stagingParentPath, err)
 | |
| 
 | |
| 		return err
 | |
| 	}
 | |
| 
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // RunVolumeHealer heal the volumes attached on a node.
 | |
| func RunVolumeHealer(ns *NodeServer, conf *util.Config) error {
 | |
| 	c, err := kubeclient.NewK8sClient()
 | |
| 	if err != nil {
 | |
| 		log.ErrorLogMsg("failed to connect to Kubernetes: %v", err)
 | |
| 
 | |
| 		return err
 | |
| 	}
 | |
| 
 | |
| 	val, err := c.StorageV1().VolumeAttachments().List(context.TODO(), metav1.ListOptions{})
 | |
| 	if err != nil {
 | |
| 		log.ErrorLogMsg("list volumeAttachments failed, err: %v", err)
 | |
| 
 | |
| 		return err
 | |
| 	}
 | |
| 
 | |
| 	var wg sync.WaitGroup
 | |
| 	channel := make(chan error)
 | |
| 	for i := range val.Items {
 | |
| 		// skip if the volumeattachments doesn't belong to current node or driver
 | |
| 		if val.Items[i].Spec.NodeName != conf.NodeID || val.Items[i].Spec.Attacher != conf.DriverName {
 | |
| 			continue
 | |
| 		}
 | |
| 		pvName := *val.Items[i].Spec.Source.PersistentVolumeName
 | |
| 		pv, err := c.CoreV1().PersistentVolumes().Get(context.TODO(), pvName, metav1.GetOptions{})
 | |
| 		if err != nil {
 | |
| 			// skip if volume doesn't exist
 | |
| 			if !apierrors.IsNotFound(err) {
 | |
| 				log.ErrorLogMsg("get persistentVolumes failed for pv: %s, err: %v", pvName, err)
 | |
| 			}
 | |
| 
 | |
| 			continue
 | |
| 		}
 | |
| 		// skip this volumeattachment if its pv is not bound or marked for deletion
 | |
| 		if pv.Status.Phase != v1.VolumeBound || pv.DeletionTimestamp != nil {
 | |
| 			continue
 | |
| 		}
 | |
| 		// skip if mounter is not rbd-nbd
 | |
| 		if pv.Spec.PersistentVolumeSource.CSI.VolumeAttributes["mounter"] != "rbd-nbd" {
 | |
| 			continue
 | |
| 		}
 | |
| 
 | |
| 		// ensure that the volume is still in attached state
 | |
| 		va, err := c.StorageV1().VolumeAttachments().Get(context.TODO(), val.Items[i].Name, metav1.GetOptions{})
 | |
| 		if err != nil {
 | |
| 			// skip if volume attachment doesn't exist
 | |
| 			if !apierrors.IsNotFound(err) {
 | |
| 				log.ErrorLogMsg("get volumeAttachments failed for volumeAttachment: %s, volID: %s, err: %v",
 | |
| 					val.Items[i].Name, pv.Spec.PersistentVolumeSource.CSI.VolumeHandle, err)
 | |
| 			}
 | |
| 
 | |
| 			continue
 | |
| 		}
 | |
| 		if !va.Status.Attached {
 | |
| 			continue
 | |
| 		}
 | |
| 
 | |
| 		wg.Add(1)
 | |
| 		// run multiple NodeStageVolume calls concurrently
 | |
| 		go func(wg *sync.WaitGroup, ns *NodeServer, c *k8s.Clientset, pv *v1.PersistentVolume, stagingPath string) {
 | |
| 			defer wg.Done()
 | |
| 			channel <- callNodeStageVolume(ns, c, pv, stagingPath)
 | |
| 		}(&wg, ns, c, pv, conf.StagingPath)
 | |
| 	}
 | |
| 
 | |
| 	go func() {
 | |
| 		wg.Wait()
 | |
| 		close(channel)
 | |
| 	}()
 | |
| 
 | |
| 	for s := range channel {
 | |
| 		if s != nil {
 | |
| 			log.ErrorLogMsg("callNodeStageVolume failed, err: %v", s)
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	return nil
 | |
| }
 |