From aa6297e164290d67d22f88bc832eb2f72fdad2ad Mon Sep 17 00:00:00 2001 From: Robert Vasek Date: Wed, 2 Feb 2022 14:45:39 +0100 Subject: [PATCH] cleanup: refactor helper functions in nodeserver.go Refactored a couple of helper functions for easier resue. * Code for building store.VolumeOptions is factored out into a separate function. * Changed args of getCredentailsForVolume() and NodeServer.mount() so that instead of passing in whole csi.NodeStageVolumeRequest, only necessary properties are passed explicitly. This is to allow these functions to be called outside of NodeStageVolume() where NodeStageVolumeRequest is not available. Signed-off-by: Robert Vasek --- internal/cephfs/nodeserver.go | 116 ++++++++++++++++++++-------------- 1 file changed, 70 insertions(+), 46 deletions(-) diff --git a/internal/cephfs/nodeserver.go b/internal/cephfs/nodeserver.go index 573a0e543..a6a6cb794 100644 --- a/internal/cephfs/nodeserver.go +++ b/internal/cephfs/nodeserver.go @@ -47,11 +47,10 @@ type NodeServer struct { func getCredentialsForVolume( volOptions *store.VolumeOptions, - req *csi.NodeStageVolumeRequest) (*util.Credentials, error) { + secrets map[string]string) (*util.Credentials, error) { var ( - err error - cr *util.Credentials - secrets = req.GetSecrets() + err error + cr *util.Credentials ) if volOptions.ProvisionVolume { @@ -64,7 +63,7 @@ func getCredentialsForVolume( } else { // The volume is pre-made, credentials are in node stage secrets - cr, err = util.NewUserCredentials(req.GetSecrets()) + cr, err = util.NewUserCredentials(secrets) if err != nil { return nil, fmt.Errorf("failed to get user credentials from node stage secrets: %w", err) } @@ -73,11 +72,38 @@ func getCredentialsForVolume( return cr, nil } +func (ns *NodeServer) getVolumeOptions( + ctx context.Context, + volID fsutil.VolumeID, + volContext, + volSecrets map[string]string, +) (*store.VolumeOptions, error) { + volOptions, _, err := store.NewVolumeOptionsFromVolID(ctx, string(volID), volContext, volSecrets) + if err != nil { + if !errors.Is(err, cerrors.ErrInvalidVolID) { + return nil, status.Error(codes.Internal, err.Error()) + } + + volOptions, _, err = store.NewVolumeOptionsFromStaticVolume(string(volID), volContext) + if err != nil { + if !errors.Is(err, cerrors.ErrNonStaticVolume) { + return nil, status.Error(codes.Internal, err.Error()) + } + + volOptions, _, err = store.NewVolumeOptionsFromMonitorList(string(volID), volContext, volSecrets) + if err != nil { + return nil, status.Error(codes.Internal, err.Error()) + } + } + } + + return volOptions, nil +} + // NodeStageVolume mounts the volume to a staging path on the node. func (ns *NodeServer) NodeStageVolume( ctx context.Context, req *csi.NodeStageVolumeRequest) (*csi.NodeStageVolumeResponse, error) { - var volOptions *store.VolumeOptions if err := util.ValidateNodeStageVolumeRequest(req); err != nil { return nil, err } @@ -94,29 +120,19 @@ func (ns *NodeServer) NodeStageVolume( } defer ns.VolumeLocks.Release(req.GetVolumeId()) - volOptions, _, err := store.NewVolumeOptionsFromVolID(ctx, string(volID), req.GetVolumeContext(), req.GetSecrets()) + volOptions, err := ns.getVolumeOptions(ctx, volID, req.GetVolumeContext(), req.GetSecrets()) if err != nil { - if !errors.Is(err, cerrors.ErrInvalidVolID) { - return nil, status.Error(codes.Internal, err.Error()) - } - - // gets mon IPs from the supplied cluster info - volOptions, _, err = store.NewVolumeOptionsFromStaticVolume(string(volID), req.GetVolumeContext()) - if err != nil { - if !errors.Is(err, cerrors.ErrNonStaticVolume) { - return nil, status.Error(codes.Internal, err.Error()) - } - - // get mon IPs from the volume context - volOptions, _, err = store.NewVolumeOptionsFromMonitorList(string(volID), req.GetVolumeContext(), - req.GetSecrets()) - if err != nil { - return nil, status.Error(codes.Internal, err.Error()) - } - } + return nil, status.Error(codes.Internal, err.Error()) } defer volOptions.Destroy() + mnt, err := mounter.New(volOptions) + if err != nil { + log.ErrorLog(ctx, "failed to create mounter for volume %s: %v", volID, err) + + return nil, status.Error(codes.Internal, err.Error()) + } + // Check if the volume is already mounted isMnt, err := util.IsMountPoint(stagingTargetPath) @@ -133,7 +149,16 @@ func (ns *NodeServer) NodeStageVolume( } // It's not, mount now - if err = ns.mount(ctx, volOptions, req); err != nil { + + if err = ns.mount( + ctx, + mnt, + volOptions, + fsutil.VolumeID(req.GetVolumeId()), + req.GetStagingTargetPath(), + req.GetSecrets(), + req.GetVolumeCapability(), + ); err != nil { return nil, err } @@ -142,11 +167,16 @@ func (ns *NodeServer) NodeStageVolume( return &csi.NodeStageVolumeResponse{}, nil } -func (*NodeServer) mount(ctx context.Context, volOptions *store.VolumeOptions, req *csi.NodeStageVolumeRequest) error { - stagingTargetPath := req.GetStagingTargetPath() - volID := fsutil.VolumeID(req.GetVolumeId()) - - cr, err := getCredentialsForVolume(volOptions, req) +func (*NodeServer) mount( + ctx context.Context, + mnt mounter.VolumeMounter, + volOptions *store.VolumeOptions, + volID fsutil.VolumeID, + stagingTargetPath string, + secrets map[string]string, + volCap *csi.VolumeCapability, +) error { + cr, err := getCredentialsForVolume(volOptions, secrets) if err != nil { log.ErrorLog(ctx, "failed to get ceph credentials for volume %s: %v", volID, err) @@ -154,20 +184,13 @@ func (*NodeServer) mount(ctx context.Context, volOptions *store.VolumeOptions, r } defer cr.DeleteCredentials() - m, err := mounter.New(volOptions) - if err != nil { - log.ErrorLog(ctx, "failed to create mounter for volume %s: %v", volID, err) - - return status.Error(codes.Internal, err.Error()) - } - - log.DebugLog(ctx, "cephfs: mounting volume %s with %s", volID, m.Name()) + log.DebugLog(ctx, "cephfs: mounting volume %s with %s", volID, mnt.Name()) readOnly := "ro" - if req.VolumeCapability.AccessMode.Mode == csi.VolumeCapability_AccessMode_MULTI_NODE_READER_ONLY || - req.VolumeCapability.AccessMode.Mode == csi.VolumeCapability_AccessMode_SINGLE_NODE_READER_ONLY { - switch m.(type) { + if volCap.AccessMode.Mode == csi.VolumeCapability_AccessMode_MULTI_NODE_READER_ONLY || + volCap.AccessMode.Mode == csi.VolumeCapability_AccessMode_SINGLE_NODE_READER_ONLY { + switch mnt.(type) { case *mounter.FuseMounter: if !csicommon.MountOptionContains(strings.Split(volOptions.FuseMountOptions, ","), readOnly) { volOptions.FuseMountOptions = util.MountOptionsAdd(volOptions.FuseMountOptions, readOnly) @@ -179,7 +202,7 @@ func (*NodeServer) mount(ctx context.Context, volOptions *store.VolumeOptions, r } } - if err = m.Mount(ctx, stagingTargetPath, cr, volOptions); err != nil { + if err = mnt.Mount(ctx, stagingTargetPath, cr, volOptions); err != nil { log.ErrorLog(ctx, "failed to mount volume %s: %v Check dmesg logs if required.", volID, @@ -201,8 +224,9 @@ func (ns *NodeServer) NodePublishVolume( return nil, err } + stagingTargetPath := req.GetStagingTargetPath() targetPath := req.GetTargetPath() - volID := req.GetVolumeId() + volID := fsutil.VolumeID(req.GetVolumeId()) // Considering kubelet make sure the stage and publish operations // are serialized, we dont need any extra locking in nodePublish @@ -238,8 +262,8 @@ func (ns *NodeServer) NodePublishVolume( if err = mounter.BindMount( ctx, - req.GetStagingTargetPath(), - req.GetTargetPath(), + stagingTargetPath, + targetPath, req.GetReadonly(), mountOptions); err != nil { log.ErrorLog(ctx, "failed to bind-mount volume %s: %v", volID, err)