From 3f31ca8a3ae80b3f09e9660c154e225d57364c07 Mon Sep 17 00:00:00 2001 From: Humble Chirammal Date: Thu, 2 Sep 2021 13:50:26 +0530 Subject: [PATCH] cleanup: introduce populateVolOptions(), to fill rbdVol from stage req At present the nodeStageVolume() handle many logic of filling rbdvol struct based on the request received and this method is complex to follow. with this patch, filling or populating volOptions has been segregrated and handled hence make the stage functions' job easy. Signed-off-by: Humble Chirammal --- internal/rbd/nodeserver.go | 160 ++++++++++++++++++++----------------- 1 file changed, 87 insertions(+), 73 deletions(-) diff --git a/internal/rbd/nodeserver.go b/internal/rbd/nodeserver.go index b9b41f519..a9023354a 100644 --- a/internal/rbd/nodeserver.go +++ b/internal/rbd/nodeserver.go @@ -149,28 +149,14 @@ func healerStageTransaction(ctx context.Context, cr *util.Credentials, volOps *r return nil } -// NodeStageVolume mounts the volume to a staging path on the node. -// Implementation notes: -// - stagingTargetPath is the directory passed in the request where the volume needs to be staged -// - We stage the volume into a directory, named after the VolumeID inside stagingTargetPath if -// it is a file system -// - We stage the volume into a file, named after the VolumeID inside stagingTargetPath if it is -// a block volume -// - Order of operation execution: (useful for defer stacking and when Unstaging to ensure steps -// are done in reverse, this is done in undoStagingTransaction) -// - Stash image metadata under staging path -// - Map the image (creates a device) -// - Create the staging file/directory under staging path -// - Stage the device (mount the device mapped for image) -// TODO: make this function less complex. -// nolint:gocyclo,cyclop // reduce complexity -func (ns *NodeServer) NodeStageVolume( +// populateRbdVol update the fields in rbdVolume struct based on the request it received. +func populateRbdVol( ctx context.Context, - req *csi.NodeStageVolumeRequest) (*csi.NodeStageVolumeResponse, error) { - if err := util.ValidateNodeStageVolumeRequest(req); err != nil { - return nil, err - } - + req *csi.NodeStageVolumeRequest, + cr *util.Credentials) (*rbdVolume, error) { + var err error + var j *journal.Connection + volID := req.GetVolumeId() isBlock := req.GetVolumeCapability().GetBlock() != nil disableInUseChecks := false // MULTI_NODE_MULTI_WRITER is supported by default for Block access type volumes @@ -192,6 +178,77 @@ func (ns *NodeServer) NodeStageVolume( disableInUseChecks = true } + rv, err := genVolFromVolumeOptions(ctx, req.GetVolumeContext(), req.GetSecrets(), disableInUseChecks) + if err != nil { + return nil, status.Error(codes.Internal, err.Error()) + } + + rv.ThickProvision = isThickProvisionRequest(req.GetVolumeContext()) + isStaticVol := isStaticVolume(req.GetVolumeContext()) + // get rbd image name from the volume journal + // for static volumes, the image name is actually the volume ID itself + if isStaticVol { + rv.RbdImageName = volID + } else { + var vi util.CSIIdentifier + var imageAttributes *journal.ImageAttributes + err = vi.DecomposeCSIID(volID) + if err != nil { + err = fmt.Errorf("error decoding volume ID (%s): %w", volID, err) + + return nil, status.Error(codes.Internal, err.Error()) + } + + j, err = volJournal.Connect(rv.Monitors, rv.RadosNamespace, cr) + if err != nil { + log.ErrorLog(ctx, "failed to establish cluster connection: %v", err) + + return nil, status.Error(codes.Internal, err.Error()) + } + defer j.Destroy() + + imageAttributes, err = j.GetImageAttributes( + ctx, rv.Pool, vi.ObjectUUID, false) + if err != nil { + err = fmt.Errorf("error fetching image attributes for volume ID (%s): %w", volID, err) + + return nil, status.Error(codes.Internal, err.Error()) + } + rv.RbdImageName = imageAttributes.ImageName + } + + rv.VolID = volID + rv.MapOptions = req.GetVolumeContext()["mapOptions"] + rv.UnmapOptions = req.GetVolumeContext()["unmapOptions"] + rv.Mounter = req.GetVolumeContext()["mounter"] + rv.LogDir = req.GetVolumeContext()["cephLogDir"] + if rv.LogDir == "" { + rv.LogDir = defaultLogDir + } + + return rv, err +} + +// NodeStageVolume mounts the volume to a staging path on the node. +// Implementation notes: +// - stagingTargetPath is the directory passed in the request where the volume needs to be staged +// - We stage the volume into a directory, named after the VolumeID inside stagingTargetPath if +// it is a file system +// - We stage the volume into a file, named after the VolumeID inside stagingTargetPath if it is +// a block volume +// - Order of operation execution: (useful for defer stacking and when Unstaging to ensure steps +// are done in reverse, this is done in undoStagingTransaction) +// - Stash image metadata under staging path +// - Map the image (creates a device) +// - Create the staging file/directory under staging path +// - Stage the device (mount the device mapped for image) +func (ns *NodeServer) NodeStageVolume( + ctx context.Context, + req *csi.NodeStageVolumeRequest) (*csi.NodeStageVolumeResponse, error) { + if err := util.ValidateNodeStageVolumeRequest(req); err != nil { + return nil, err + } + volID := req.GetVolumeId() cr, err := util.NewUserCredentials(req.GetSecrets()) @@ -232,64 +289,21 @@ func (ns *NodeServer) NodeStageVolume( return nil, status.Error(codes.InvalidArgument, "missing required parameter imageFeatures") } - volOptions, err := genVolFromVolumeOptions(ctx, req.GetVolumeContext(), req.GetSecrets(), disableInUseChecks) + rv, err := populateRbdVol(ctx, req, cr) if err != nil { - return nil, status.Error(codes.Internal, err.Error()) + return nil, err } - volOptions.ThickProvision = isThickProvisionRequest(req.GetVolumeContext()) - - // get rbd image name from the volume journal - // for static volumes, the image name is actually the volume ID itself - if isStaticVol { - volOptions.RbdImageName = volID - } else { - var vi util.CSIIdentifier - var imageAttributes *journal.ImageAttributes - err = vi.DecomposeCSIID(volID) - if err != nil { - err = fmt.Errorf("error decoding volume ID (%s): %w", volID, err) - - return nil, status.Error(codes.Internal, err.Error()) - } - - j, connErr := volJournal.Connect(volOptions.Monitors, volOptions.RadosNamespace, cr) - if connErr != nil { - log.ErrorLog(ctx, "failed to establish cluster connection: %v", connErr) - - return nil, status.Error(codes.Internal, connErr.Error()) - } - defer j.Destroy() - - imageAttributes, err = j.GetImageAttributes( - ctx, volOptions.Pool, vi.ObjectUUID, false) - if err != nil { - err = fmt.Errorf("error fetching image attributes for volume ID (%s): %w", volID, err) - - return nil, status.Error(codes.Internal, err.Error()) - } - volOptions.RbdImageName = imageAttributes.ImageName - } - - volOptions.VolID = volID - volOptions.MapOptions = req.GetVolumeContext()["mapOptions"] - volOptions.UnmapOptions = req.GetVolumeContext()["unmapOptions"] - volOptions.Mounter = req.GetVolumeContext()["mounter"] - volOptions.LogDir = req.GetVolumeContext()["cephLogDir"] - if volOptions.LogDir == "" { - volOptions.LogDir = defaultLogDir - } - - err = volOptions.Connect(cr) + err = rv.Connect(cr) if err != nil { - log.ErrorLog(ctx, "failed to connect to volume %s: %v", volOptions, err) + log.ErrorLog(ctx, "failed to connect to volume %s: %v", rv, err) return nil, status.Error(codes.Internal, err.Error()) } - defer volOptions.Destroy() + defer rv.Destroy() if isHealer { - err = healerStageTransaction(ctx, cr, volOptions, stagingParentPath) + err = healerStageTransaction(ctx, cr, rv, stagingParentPath) if err != nil { return nil, status.Error(codes.Internal, err.Error()) } @@ -300,19 +314,19 @@ func (ns *NodeServer) NodeStageVolume( transaction := stageTransaction{} // Stash image details prior to mapping the image (useful during Unstage as it has no // voloptions passed to the RPC as per the CSI spec) - err = stashRBDImageMetadata(volOptions, stagingParentPath) + err = stashRBDImageMetadata(rv, stagingParentPath) if err != nil { return nil, status.Error(codes.Internal, err.Error()) } defer func() { if err != nil { - ns.undoStagingTransaction(ctx, req, transaction, volOptions) + ns.undoStagingTransaction(ctx, req, transaction, rv) } }() // perform the actual staging and if this fails, have undoStagingTransaction // cleans up for us - transaction, err = ns.stageTransaction(ctx, req, volOptions, isStaticVol) + transaction, err = ns.stageTransaction(ctx, req, rv, isStaticVol) if err != nil { return nil, status.Error(codes.Internal, err.Error()) }