From 89732d923ffda1459f83d9f6ee65e6b5454ad230 Mon Sep 17 00:00:00 2001
From: Madhu Rajanna
Date: Wed, 14 Aug 2019 11:27:45 +0530
Subject: [PATCH] move flag configuration variable to util

remove unwanted checks

remove getting drivertype from binary name

Signed-off-by: Madhu Rajanna
---
 cmd/cephcsi.go           | 101 +++++++++++++++++----------------
 pkg/cephfs/driver.go     |  24 +++++-----
 pkg/liveness/liveness.go |  10 ++--
 pkg/rbd/driver.go        |  12 ++---
 pkg/util/util.go         |  28 +++++++++++
 5 files changed, 95 insertions(+), 80 deletions(-)

diff --git a/cmd/cephcsi.go b/cmd/cephcsi.go
index 43031b811..3505ceecd 100644
--- a/cmd/cephcsi.go
+++ b/cmd/cephcsi.go
@@ -19,9 +19,7 @@ package main
 import (
 	"flag"
 	"os"
-	"path"
 	"path/filepath"
-	"strings"
 	"time"
 
 	"github.com/ceph/ceph-csi/pkg/cephfs"
@@ -42,32 +40,34 @@ const (
 )
 
 var (
-	// common flags
-	vtype      = flag.String("type", "", "driver type [rbd|cephfs|liveness]")
-	endpoint   = flag.String("endpoint", "unix://tmp/csi.sock", "CSI endpoint")
-	driverName = flag.String("drivername", "", "name of the driver")
-	nodeID     = flag.String("nodeid", "", "node id")
-	instanceID = flag.String("instanceid", "", "Unique ID distinguishing this instance of Ceph CSI among other"+
-		" instances, when sharing Ceph clusters across CSI instances for provisioning")
-	metadataStorage = flag.String("metadatastorage", "", "metadata persistence method [node|k8s_configmap]")
-	pluginPath      = flag.String("pluginpath", "/var/lib/kubelet/plugins/", "the location of cephcsi plugin")
-	pidLimit        = flag.Int("pidlimit", 0, "the PID limit to configure through cgroups")
-
-	// rbd related flags
-	containerized = flag.Bool("containerized", true, "whether run as containerized")
-
-	// cephfs related flags
-	volumeMounter = flag.String("volumemounter", "", "default volume mounter (possible options are 'kernel', 'fuse')")
-	mountCacheDir = flag.String("mountcachedir", "", "mount info cache save dir")
-
-	// livenes related flags
-	livenessport = flag.Int("livenessport", 8080, "TCP port for liveness requests")
-	livenesspath = flag.String("livenesspath", "/metrics", "path of prometheus endpoint where metrics will be available")
-	pollTime     = flag.Duration("polltime", time.Second*60, "time interval in seconds between each poll")
-	timeout      = flag.Duration("timeout", time.Second*3, "probe timeout in seconds")
+	conf util.Config
 )
 
 func init() {
+
+	// common flags
+	flag.StringVar(&conf.Vtype, "type", "", "driver type [rbd|cephfs|liveness]")
+	flag.StringVar(&conf.Endpoint, "endpoint", "unix://tmp/csi.sock", "CSI endpoint")
+	flag.StringVar(&conf.DriverName, "drivername", "", "name of the driver")
+	flag.StringVar(&conf.NodeID, "nodeid", "", "node id")
+	flag.StringVar(&conf.InstanceID, "instanceid", "", "Unique ID distinguishing this instance of Ceph CSI among other"+
+		" instances, when sharing Ceph clusters across CSI instances for provisioning")
+	flag.StringVar(&conf.MetadataStorage, "metadatastorage", "", "metadata persistence method [node|k8s_configmap]")
+	flag.StringVar(&conf.PluginPath, "pluginpath", "/var/lib/kubelet/plugins/", "the location of cephcsi plugin")
+	flag.IntVar(&conf.PidLimit, "pidlimit", 0, "the PID limit to configure through cgroups")
+
+	// rbd related flags
+	flag.BoolVar(&conf.Containerized, "containerized", true, "whether run as containerized")
+
+	// cephfs related flags
+	flag.StringVar(&conf.VolumeMounter, "volumemounter", "", "default volume mounter (possible options are 'kernel', 'fuse')")
+	flag.StringVar(&conf.MountCacheDir, "mountcachedir", "", "mount info cache save dir")
+
+	// liveness related flags
+	flag.IntVar(&conf.LivenessPort, "livenessport", 8080, "TCP port for liveness requests")
+	flag.StringVar(&conf.LivenessPath, "livenesspath", "/metrics", "path of prometheus endpoint where metrics will be available")
+	flag.DurationVar(&conf.PollTime, "polltime", time.Second*60, "time interval in seconds between each poll")
+	flag.DurationVar(&conf.PoolTimeout, "timeout", time.Second*3, "probe timeout in seconds")
 	klog.InitFlags(nil)
 	if err := flag.Set("logtostderr", "true"); err != nil {
 		klog.Exitf("failed to set logtostderr flag: %v", err)
@@ -75,27 +75,13 @@ func init() {
 	flag.Parse()
 }
 
-func getType() string {
-	if vtype == nil || *vtype == "" {
-		a0 := path.Base(os.Args[0])
-		if strings.Contains(a0, rbdType) {
-			return rbdType
-		}
-		if strings.Contains(a0, cephfsType) {
-			return cephfsType
-		}
-		return ""
-	}
-	return *vtype
-}
-
 func getDriverName() string {
 	// was explicitly passed a driver name
-	if driverName != nil && *driverName != "" {
-		return *driverName
+	if conf.DriverName != "" {
+		return conf.DriverName
 	}
 	// select driver name based on volume type
-	switch getType() {
+	switch conf.Vtype {
 	case rbdType:
 		return rbdDefaultName
 	case cephfsType:
@@ -111,8 +97,7 @@ func main() {
 	klog.Infof("Driver version: %s and Git version: %s", util.DriverVersion, util.GitCommit)
 	var cp util.CachePersister
 
-	driverType := getType()
-	if driverType == "" {
+	if conf.Vtype == "" {
 		klog.Fatalln("driver type not specified")
 	}
 
@@ -121,50 +106,50 @@ func main() {
 	if err != nil {
 		klog.Fatalln(err) // calls exit
 	}
-	csipluginPath := filepath.Join(*pluginPath, dname)
-	if *metadataStorage != "" {
+	csipluginPath := filepath.Join(conf.PluginPath, dname)
+	if conf.MetadataStorage != "" {
 		cp, err = util.CreatePersistanceStorage(
-			csipluginPath, *metadataStorage, *pluginPath)
+			csipluginPath, conf.MetadataStorage, conf.PluginPath)
 		if err != nil {
 			os.Exit(1)
 		}
 	}
 
 	// the driver may need a higher PID limit for handling all concurrent requests
-	if pidLimit != nil && *pidLimit != 0 {
+	if conf.PidLimit != 0 {
 		currentLimit, err := util.GetPIDLimit()
 		if err != nil {
 			klog.Errorf("Failed to get the PID limit, can not reconfigure: %v", err)
 		} else {
 			klog.Infof("Initial PID limit is set to %d", currentLimit)
-			err = util.SetPIDLimit(*pidLimit)
+			err = util.SetPIDLimit(conf.PidLimit)
 			if err != nil {
-				klog.Errorf("Failed to set new PID limit to %d: %v", *pidLimit, err)
+				klog.Errorf("Failed to set new PID limit to %d: %v", conf.PidLimit, err)
 			} else {
 				s := ""
-				if *pidLimit == -1 {
+				if conf.PidLimit == -1 {
 					s = " (max)"
 				}
-				klog.Infof("Reconfigured PID limit to %d%s", *pidLimit, s)
+				klog.Infof("Reconfigured PID limit to %d%s", conf.PidLimit, s)
 			}
 		}
 	}
 
-	klog.Infof("Starting driver type: %v with name: %v", driverType, dname)
-	switch driverType {
+	klog.Infof("Starting driver type: %v with name: %v", conf.Vtype, dname)
+	switch conf.Vtype {
 	case rbdType:
 		driver := rbd.NewDriver()
-		driver.Run(dname, *nodeID, *endpoint, *instanceID, *containerized, cp, driverType)
+		driver.Run(&conf, cp)
 
 	case cephfsType:
 		driver := cephfs.NewDriver()
-		driver.Run(dname, *nodeID, *endpoint, *volumeMounter, *mountCacheDir, *instanceID, csipluginPath, cp, driverType)
+		driver.Run(&conf, cp)
 
 	case livenessType:
-		liveness.Run(*endpoint, *livenesspath, *livenessport, *pollTime, *timeout)
+		liveness.Run(&conf)
 
 	default:
-		klog.Fatalln("invalid volume type", vtype) // calls exit
+		klog.Fatalln("invalid volume type", conf.Vtype) // calls exit
 	}
 
 	os.Exit(0)
diff --git a/pkg/cephfs/driver.go b/pkg/cephfs/driver.go
index 9d960394a..2b620b5a7 100644
--- a/pkg/cephfs/driver.go
+++ b/pkg/cephfs/driver.go
@@ -91,20 +91,20 @@ func NewNodeServer(d *csicommon.CSIDriver, t string) *NodeServer {
 
 // Run start a non-blocking grpc controller,node and identityserver for
 // ceph CSI driver which can serve multiple parallel requests
-func (fs *Driver) Run(driverName, nodeID, endpoint, volumeMounter, mountCacheDir, instanceID, pluginPath string, cachePersister util.CachePersister, t string) {
+func (fs *Driver) Run(conf *util.Config, cachePersister util.CachePersister) {
 
 	// Configuration
-	PluginFolder = pluginPath
+	PluginFolder = conf.PluginPath
 
 	if err := loadAvailableMounters(); err != nil {
 		klog.Fatalf("cephfs: failed to load ceph mounters: %v", err)
 	}
 
-	if volumeMounter != "" {
-		if err := validateMounter(volumeMounter); err != nil {
+	if conf.VolumeMounter != "" {
+		if err := validateMounter(conf.VolumeMounter); err != nil {
 			klog.Fatalln(err)
 		} else {
-			DefaultVolumeMounter = volumeMounter
+			DefaultVolumeMounter = conf.VolumeMounter
 		}
 	} else {
 		// Pick the first available mounter as the default one.
@@ -120,8 +120,8 @@ func (fs *Driver) Run(driverName, nodeID, endpoint, volumeMounter, mountCacheDir
 	}
 
 	// Use passed in instance ID, if provided for omap suffix naming
-	if instanceID != "" {
-		CSIInstanceID = instanceID
+	if conf.InstanceID != "" {
+		CSIInstanceID = conf.InstanceID
 	}
 	// Get an instance of the volume journal
 	volJournal = util.NewCSIVolumeJournal()
@@ -133,8 +133,8 @@ func (fs *Driver) Run(driverName, nodeID, endpoint, volumeMounter, mountCacheDir
 	// metadata pool
 	volJournal.SetNamespace(radosNamespace)
 
-	initVolumeMountCache(driverName, mountCacheDir)
-	if mountCacheDir != "" {
+	initVolumeMountCache(conf.DriverName, conf.MountCacheDir)
+	if conf.MountCacheDir != "" {
 		if err := remountCachedVolumes(); err != nil {
 			klog.Warningf("failed to remount cached volumes: %v", err)
 			// ignore remount fail
@@ -142,7 +142,7 @@ func (fs *Driver) Run(driverName, nodeID, endpoint, volumeMounter, mountCacheDir
 	}
 
 	// Initialize default library driver
-	fs.cd = csicommon.NewCSIDriver(driverName, util.DriverVersion, nodeID)
+	fs.cd = csicommon.NewCSIDriver(conf.DriverName, util.DriverVersion, conf.NodeID)
 	if fs.cd == nil {
 		klog.Fatalln("failed to initialize CSI driver")
 	}
@@ -158,11 +158,11 @@ func (fs *Driver) Run(driverName, nodeID, endpoint, volumeMounter, mountCacheDir
 
 	// Create gRPC servers
 	fs.is = NewIdentityServer(fs.cd)
-	fs.ns = NewNodeServer(fs.cd, t)
+	fs.ns = NewNodeServer(fs.cd, conf.Vtype)
 
 	fs.cs = NewControllerServer(fs.cd, cachePersister)
 
 	server := csicommon.NewNonBlockingGRPCServer()
-	server.Start(endpoint, fs.is, fs.cs, fs.ns)
+	server.Start(conf.Endpoint, fs.is, fs.cs, fs.ns)
 	server.Wait()
 }
diff --git a/pkg/liveness/liveness.go b/pkg/liveness/liveness.go
index 411045a57..5ec873bdc 100644
--- a/pkg/liveness/liveness.go
+++ b/pkg/liveness/liveness.go
@@ -24,6 +24,8 @@ import (
 	"strconv"
 	"time"
 
+	"github.com/ceph/ceph-csi/pkg/util"
+
 	connlib "github.com/kubernetes-csi/csi-lib-utils/connection"
 	"github.com/kubernetes-csi/csi-lib-utils/rpc"
 	"github.com/prometheus/client_golang/prometheus"
@@ -83,7 +85,7 @@ func recordLiveness(endpoint string, pollTime, timeout time.Duration) {
 	}
 }
 
-func Run(endpoint, livenessendpoint string, port int, pollTime, timeout time.Duration) {
+func Run(conf *util.Config) {
 	klog.Infof("Liveness Running")
 	ip := os.Getenv("POD_IP")
 
@@ -94,11 +96,11 @@ func Run(endpoint, livenessendpoint string, port int, pollTime, timeout time.Dur
 	}
 
 	// start liveness collection
-	go recordLiveness(endpoint, pollTime, timeout)
+	go recordLiveness(conf.Endpoint, conf.PollTime, conf.PoolTimeout)
 
 	// start up prometheus endpoint
-	addr := net.JoinHostPort(ip, strconv.Itoa(port))
-	http.Handle(livenessendpoint, promhttp.Handler())
+	addr := net.JoinHostPort(ip, strconv.Itoa(conf.LivenessPort))
+	http.Handle(conf.LivenessPath, promhttp.Handler())
 	err := http.ListenAndServe(addr, nil)
 	if err != nil {
 		klog.Fatalln(err)
diff --git a/pkg/rbd/driver.go b/pkg/rbd/driver.go
index 313561604..361c201fc 100644
--- a/pkg/rbd/driver.go
+++ b/pkg/rbd/driver.go
@@ -95,7 +95,7 @@ func NewNodeServer(d *csicommon.CSIDriver, containerized bool, t string) (*NodeS
 
 // Run start a non-blocking grpc controller,node and identityserver for
 // rbd CSI driver which can serve multiple parallel requests
-func (r *Driver) Run(driverName, nodeID, endpoint, instanceID string, containerized bool, cachePersister util.CachePersister, t string) {
+func (r *Driver) Run(conf *util.Config, cachePersister util.CachePersister) {
 	var err error
 
 	// Create ceph.conf for use with CLI commands
@@ -104,8 +104,8 @@ func (r *Driver) Run(driverName, nodeID, endpoint, instanceID string, containeri
 	}
 
 	// Use passed in instance ID, if provided for omap suffix naming
-	if instanceID != "" {
-		CSIInstanceID = instanceID
+	if conf.InstanceID != "" {
+		CSIInstanceID = conf.InstanceID
 	}
 
 	// Get an instance of the volume and snapshot journal keys
@@ -117,7 +117,7 @@ func (r *Driver) Run(driverName, nodeID, endpoint, instanceID string, containeri
 	snapJournal.SetCSIDirectorySuffix(CSIInstanceID)
 
 	// Initialize default library driver
-	r.cd = csicommon.NewCSIDriver(driverName, util.DriverVersion, nodeID)
+	r.cd = csicommon.NewCSIDriver(conf.DriverName, util.DriverVersion, conf.NodeID)
 	if r.cd == nil {
 		klog.Fatalln("Failed to initialize CSI Driver.")
 	}
@@ -137,7 +137,7 @@ func (r *Driver) Run(driverName, nodeID, endpoint, instanceID string, containeri
 
 	// Create GRPC servers
 	r.ids = NewIdentityServer(r.cd)
-	r.ns, err = NewNodeServer(r.cd, containerized, t)
+	r.ns, err = NewNodeServer(r.cd, conf.Containerized, conf.Vtype)
 	if err != nil {
 		klog.Fatalf("failed to start node server, err %v\n", err)
 	}
@@ -145,6 +145,6 @@ func (r *Driver) Run(driverName, nodeID, endpoint, instanceID string, containeri
 	r.cs = NewControllerServer(r.cd, cachePersister)
 
 	s := csicommon.NewNonBlockingGRPCServer()
-	s.Start(endpoint, r.ids, r.cs, r.ns)
+	s.Start(conf.Endpoint, r.ids, r.cs, r.ns)
 	s.Wait()
 }
diff --git a/pkg/util/util.go b/pkg/util/util.go
index 187dcf472..58794a50a 100644
--- a/pkg/util/util.go
+++ b/pkg/util/util.go
@@ -20,6 +20,7 @@ import (
 	"os"
 	"path"
 	"strings"
+	"time"
 
 	"github.com/pkg/errors"
 	"google.golang.org/grpc/codes"
@@ -50,6 +51,33 @@ var (
 	DriverVersion string
 )
 
+// Config holds the parameters that can be configured
+type Config struct {
+	// common flags
+	Vtype           string // driver type [rbd|cephfs|liveness]
+	Endpoint        string // CSI endpoint
+	DriverName      string // name of the driver
+	NodeID          string // node id
+	InstanceID      string // unique ID distinguishing this instance of Ceph CSI
+	MetadataStorage string // metadata persistence method [node|k8s_configmap]
+	PluginPath      string // location of cephcsi plugin
+	PidLimit        int    // PID limit to configure through cgroups
+
+	// rbd related flags
+	Containerized bool // whether run as containerized
+
+	// cephfs related flags
+	VolumeMounter string // default volume mounter (possible options are 'kernel', 'fuse')
+	MountCacheDir string // mount info cache save dir
+
+	// liveness related flags
+	LivenessPort int           // TCP port for liveness requests
+	LivenessPath string        // path of prometheus endpoint where metrics will be available
+	PollTime     time.Duration // time interval in seconds between each poll
+	PoolTimeout  time.Duration // probe timeout in seconds
+
+}
+
 func roundUpSize(volumeSizeBytes, allocationUnitBytes int64) int64 {
 	roundedUp := volumeSizeBytes / allocationUnitBytes
 	if volumeSizeBytes%allocationUnitBytes > 0 {
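
Illustration (not part of the patch): the refactor in miniature -- flag registration writes parsed values directly into one shared Config struct, and each driver's Run receives a *Config pointer instead of a long positional parameter list. The Config and run below are trimmed, hypothetical stand-ins for util.Config and the drivers' Run methods.

package main

import (
	"flag"
	"fmt"
	"time"
)

// Config mirrors, in abridged form, the util.Config added by the patch.
type Config struct {
	Vtype       string
	Endpoint    string
	PollTime    time.Duration
	PoolTimeout time.Duration
}

// run stands in for Driver.Run(conf *util.Config, ...): it reads settings
// from the struct rather than from individual parameters.
func run(conf *Config) {
	fmt.Printf("starting %s driver on %s (poll %s, timeout %s)\n",
		conf.Vtype, conf.Endpoint, conf.PollTime, conf.PoolTimeout)
}

func main() {
	var conf Config
	// flag.StringVar/DurationVar parse directly into the struct fields,
	// so adding a flag means adding a field, not another Run parameter.
	flag.StringVar(&conf.Vtype, "type", "", "driver type [rbd|cephfs|liveness]")
	flag.StringVar(&conf.Endpoint, "endpoint", "unix://tmp/csi.sock", "CSI endpoint")
	flag.DurationVar(&conf.PollTime, "polltime", 60*time.Second, "interval between liveness polls")
	flag.DurationVar(&conf.PoolTimeout, "timeout", 3*time.Second, "probe timeout")
	flag.Parse()

	run(&conf)
}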