diff --git a/cmd/cephcsi.go b/cmd/cephcsi.go index 0e35961f4..09f8cef35 100644 --- a/cmd/cephcsi.go +++ b/cmd/cephcsi.go @@ -121,11 +121,11 @@ func main() { switch driverType { case rbdType: driver := rbd.NewDriver() - driver.Run(dname, *nodeID, *endpoint, *instanceID, *containerized, cp) + driver.Run(dname, *nodeID, *endpoint, *instanceID, *containerized, cp, driverType) case cephfsType: driver := cephfs.NewDriver() - driver.Run(dname, *nodeID, *endpoint, *volumeMounter, *mountCacheDir, *instanceID, csipluginPath, cp) + driver.Run(dname, *nodeID, *endpoint, *volumeMounter, *mountCacheDir, *instanceID, csipluginPath, cp, driverType) default: klog.Fatalln("invalid volume type", vtype) // calls exit diff --git a/pkg/cephfs/driver.go b/pkg/cephfs/driver.go index d49d3a726..9d960394a 100644 --- a/pkg/cephfs/driver.go +++ b/pkg/cephfs/driver.go @@ -83,15 +83,15 @@ func NewControllerServer(d *csicommon.CSIDriver, cachePersister util.CachePersis } // NewNodeServer initialize a node server for ceph CSI driver. -func NewNodeServer(d *csicommon.CSIDriver) *NodeServer { +func NewNodeServer(d *csicommon.CSIDriver, t string) *NodeServer { return &NodeServer{ - DefaultNodeServer: csicommon.NewDefaultNodeServer(d), + DefaultNodeServer: csicommon.NewDefaultNodeServer(d, t), } } // Run start a non-blocking grpc controller,node and identityserver for // ceph CSI driver which can serve multiple parallel requests -func (fs *Driver) Run(driverName, nodeID, endpoint, volumeMounter, mountCacheDir, instanceID, pluginPath string, cachePersister util.CachePersister) { +func (fs *Driver) Run(driverName, nodeID, endpoint, volumeMounter, mountCacheDir, instanceID, pluginPath string, cachePersister util.CachePersister, t string) { // Configuration PluginFolder = pluginPath @@ -158,7 +158,7 @@ func (fs *Driver) Run(driverName, nodeID, endpoint, volumeMounter, mountCacheDir // Create gRPC servers fs.is = NewIdentityServer(fs.cd) - fs.ns = NewNodeServer(fs.cd) + fs.ns = NewNodeServer(fs.cd, t) fs.cs = NewControllerServer(fs.cd, cachePersister) diff --git a/pkg/cephfs/nodeserver.go b/pkg/cephfs/nodeserver.go index 8c75f5257..eedd49040 100644 --- a/pkg/cephfs/nodeserver.go +++ b/pkg/cephfs/nodeserver.go @@ -23,13 +23,11 @@ import ( csicommon "github.com/ceph/ceph-csi/pkg/csi-common" "github.com/ceph/ceph-csi/pkg/util" - csipbv1 "github.com/container-storage-interface/spec/lib/go/csi" "github.com/container-storage-interface/spec/lib/go/csi" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" "k8s.io/klog" - "k8s.io/kubernetes/pkg/volume" ) // NodeServer struct of ceph CSI driver with supported methods of CSI @@ -289,92 +287,6 @@ func (ns *NodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstag return &csi.NodeUnstageVolumeResponse{}, nil } -func (ns *NodeServer) NodeGetVolumeStats(ctx context.Context, req *csi.NodeGetVolumeStatsRequest) (*csi.NodeGetVolumeStatsResponse, error) { - - var err error - targetPath := req.GetVolumePath() - if targetPath == "" { - err = fmt.Errorf("targetpath %v is empty", targetPath) - return nil, status.Error(codes.InvalidArgument, err.Error()) - } - /* - volID := req.GetVolumeId() - - TODO: Map the volumeID to the targetpath. - - we need secret to connect to the ceph cluster to get the volumeID from volume - Name, however `secret` field/option is not available in NodeGetVolumeStats spec, - Below issue covers this request and once its available, we can do the validation - as per the spec. - https://github.com/container-storage-interface/spec/issues/371 - - */ - - isMnt, err := util.IsMountPoint(targetPath) - - if err != nil { - if os.IsNotExist(err) { - return nil, status.Errorf(codes.InvalidArgument, "targetpath %s doesnot exist", targetPath) - } - return nil, err - } - if !isMnt { - return nil, status.Errorf(codes.InvalidArgument, "targetpath %s is not mounted", targetPath) - } - - cephfsProvider := volume.NewMetricsStatFS(targetPath) - volMetrics, volMetErr := cephfsProvider.GetMetrics() - if volMetErr != nil { - return nil, status.Error(codes.Internal, volMetErr.Error()) - } - - available, ok := (*(volMetrics.Available)).AsInt64() - if !ok { - klog.Errorf("failed to fetch available bytes") - } - capacity, ok := (*(volMetrics.Capacity)).AsInt64() - if !ok { - klog.Errorf("failed to fetch capacity bytes") - return nil, status.Error(codes.Unknown, "failed to fetch capacity bytes") - } - used, ok := (*(volMetrics.Used)).AsInt64() - if !ok { - klog.Errorf("failed to fetch used bytes") - } - inodes, ok := (*(volMetrics.Inodes)).AsInt64() - if !ok { - klog.Errorf("failed to fetch available inodes") - return nil, status.Error(codes.Unknown, "failed to fetch available inodes") - - } - inodesFree, ok := (*(volMetrics.InodesFree)).AsInt64() - if !ok { - klog.Errorf("failed to fetch free inodes") - } - - inodesUsed, ok := (*(volMetrics.InodesUsed)).AsInt64() - if !ok { - klog.Errorf("failed to fetch used inodes") - } - return &csi.NodeGetVolumeStatsResponse{ - Usage: []*csi.VolumeUsage{ - { - Available: available, - Total: capacity, - Used: used, - Unit: csipbv1.VolumeUsage_BYTES, - }, - { - Available: inodesFree, - Total: inodes, - Used: inodesUsed, - Unit: csipbv1.VolumeUsage_INODES, - }, - }, - }, nil - -} - // NodeGetCapabilities returns the supported capabilities of the node server func (ns *NodeServer) NodeGetCapabilities(ctx context.Context, req *csi.NodeGetCapabilitiesRequest) (*csi.NodeGetCapabilitiesResponse, error) { return &csi.NodeGetCapabilitiesResponse{ diff --git a/pkg/csi-common/nodeserver-default.go b/pkg/csi-common/nodeserver-default.go index 20141487b..79128f133 100644 --- a/pkg/csi-common/nodeserver-default.go +++ b/pkg/csi-common/nodeserver-default.go @@ -17,16 +17,24 @@ limitations under the License. package csicommon import ( + "fmt" + "os" + + "github.com/ceph/ceph-csi/pkg/util" + "github.com/container-storage-interface/spec/lib/go/csi" + csipbv1 "github.com/container-storage-interface/spec/lib/go/csi" "golang.org/x/net/context" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" "k8s.io/klog" + "k8s.io/kubernetes/pkg/volume" ) // DefaultNodeServer stores driver object type DefaultNodeServer struct { Driver *CSIDriver + Type string } // NodeStageVolume returns unimplemented response @@ -71,6 +79,96 @@ func (ns *DefaultNodeServer) NodeGetCapabilities(ctx context.Context, req *csi.N } // NodeGetVolumeStats returns volume stats -func (ns *DefaultNodeServer) NodeGetVolumeStats(ctx context.Context, in *csi.NodeGetVolumeStatsRequest) (*csi.NodeGetVolumeStatsResponse, error) { - return nil, status.Error(codes.Unimplemented, "") +func (ns *DefaultNodeServer) NodeGetVolumeStats(ctx context.Context, req *csi.NodeGetVolumeStatsRequest) (*csi.NodeGetVolumeStatsResponse, error) { + + var err error + targetPath := req.GetVolumePath() + if targetPath == "" { + err = fmt.Errorf("targetpath %v is empty", targetPath) + return nil, status.Error(codes.InvalidArgument, err.Error()) + } + /* + volID := req.GetVolumeId() + + TODO: Map the volumeID to the targetpath. + + CephFS: + we need secret to connect to the ceph cluster to get the volumeID from volume + Name, however `secret` field/option is not available in NodeGetVolumeStats spec, + Below issue covers this request and once its available, we can do the validation + as per the spec. + + https://github.com/container-storage-interface/spec/issues/371 + + RBD: + Below issue covers this request for RBD and once its available, we can do the validation + as per the spec. + + https://github.com/ceph/ceph-csi/issues/511 + + */ + + isMnt, err := util.IsMountPoint(targetPath) + + if err != nil { + if os.IsNotExist(err) { + return nil, status.Errorf(codes.InvalidArgument, "targetpath %s doesnot exist", targetPath) + } + return nil, err + } + if !isMnt { + return nil, status.Errorf(codes.InvalidArgument, "targetpath %s is not mounted", targetPath) + } + + cephMetricsProvider := volume.NewMetricsStatFS(targetPath) + volMetrics, volMetErr := cephMetricsProvider.GetMetrics() + if volMetErr != nil { + return nil, status.Error(codes.Internal, volMetErr.Error()) + } + + available, ok := (*(volMetrics.Available)).AsInt64() + if !ok { + klog.Errorf("failed to fetch available bytes") + } + capacity, ok := (*(volMetrics.Capacity)).AsInt64() + if !ok { + klog.Errorf("failed to fetch capacity bytes") + return nil, status.Error(codes.Unknown, "failed to fetch capacity bytes") + } + used, ok := (*(volMetrics.Used)).AsInt64() + if !ok { + klog.Errorf("failed to fetch used bytes") + } + inodes, ok := (*(volMetrics.Inodes)).AsInt64() + if !ok { + klog.Errorf("failed to fetch available inodes") + return nil, status.Error(codes.Unknown, "failed to fetch available inodes") + + } + inodesFree, ok := (*(volMetrics.InodesFree)).AsInt64() + if !ok { + klog.Errorf("failed to fetch free inodes") + } + + inodesUsed, ok := (*(volMetrics.InodesUsed)).AsInt64() + if !ok { + klog.Errorf("failed to fetch used inodes") + } + return &csi.NodeGetVolumeStatsResponse{ + Usage: []*csi.VolumeUsage{ + { + Available: available, + Total: capacity, + Used: used, + Unit: csipbv1.VolumeUsage_BYTES, + }, + { + Available: inodesFree, + Total: inodes, + Used: inodesUsed, + Unit: csipbv1.VolumeUsage_INODES, + }, + }, + }, nil + } diff --git a/pkg/csi-common/utils.go b/pkg/csi-common/utils.go index be643633b..28b4fc8c4 100644 --- a/pkg/csi-common/utils.go +++ b/pkg/csi-common/utils.go @@ -46,9 +46,10 @@ func NewVolumeCapabilityAccessMode(mode csi.VolumeCapability_AccessMode_Mode) *c } // NewDefaultNodeServer initializes default node server -func NewDefaultNodeServer(d *CSIDriver) *DefaultNodeServer { +func NewDefaultNodeServer(d *CSIDriver, t string) *DefaultNodeServer { return &DefaultNodeServer{ Driver: d, + Type: t, } } diff --git a/pkg/rbd/nodeserver.go b/pkg/rbd/nodeserver.go index a0b9ced33..d40c931b1 100644 --- a/pkg/rbd/nodeserver.go +++ b/pkg/rbd/nodeserver.go @@ -511,6 +511,13 @@ func (ns *NodeServer) NodeGetCapabilities(ctx context.Context, req *csi.NodeGetC }, }, }, + { + Type: &csi.NodeServiceCapability_Rpc{ + Rpc: &csi.NodeServiceCapability_RPC{ + Type: csi.NodeServiceCapability_RPC_GET_VOLUME_STATS, + }, + }, + }, }, }, nil } diff --git a/pkg/rbd/rbd.go b/pkg/rbd/rbd.go index a3612413e..313561604 100644 --- a/pkg/rbd/rbd.go +++ b/pkg/rbd/rbd.go @@ -78,7 +78,7 @@ func NewControllerServer(d *csicommon.CSIDriver, cachePersister util.CachePersis } // NewNodeServer initialize a node server for rbd CSI driver. -func NewNodeServer(d *csicommon.CSIDriver, containerized bool) (*NodeServer, error) { +func NewNodeServer(d *csicommon.CSIDriver, containerized bool, t string) (*NodeServer, error) { mounter := mount.New("") if containerized { ne, err := nsenter.NewNsenter(nsenter.DefaultHostRootFsPath, exec.New()) @@ -88,14 +88,14 @@ func NewNodeServer(d *csicommon.CSIDriver, containerized bool) (*NodeServer, err mounter = nsutil.NewMounter("", ne) } return &NodeServer{ - DefaultNodeServer: csicommon.NewDefaultNodeServer(d), + DefaultNodeServer: csicommon.NewDefaultNodeServer(d, t), mounter: mounter, }, nil } // Run start a non-blocking grpc controller,node and identityserver for // rbd CSI driver which can serve multiple parallel requests -func (r *Driver) Run(driverName, nodeID, endpoint, instanceID string, containerized bool, cachePersister util.CachePersister) { +func (r *Driver) Run(driverName, nodeID, endpoint, instanceID string, containerized bool, cachePersister util.CachePersister, t string) { var err error // Create ceph.conf for use with CLI commands @@ -137,7 +137,7 @@ func (r *Driver) Run(driverName, nodeID, endpoint, instanceID string, containeri // Create GRPC servers r.ids = NewIdentityServer(r.cd) - r.ns, err = NewNodeServer(r.cd, containerized) + r.ns, err = NewNodeServer(r.cd, containerized, t) if err != nil { klog.Fatalf("failed to start node server, err %v\n", err) }