diff --git a/internal/rbd/nodeserver.go b/internal/rbd/nodeserver.go index b9b41f519..a9023354a 100644 --- a/internal/rbd/nodeserver.go +++ b/internal/rbd/nodeserver.go @@ -149,28 +149,14 @@ func healerStageTransaction(ctx context.Context, cr *util.Credentials, volOps *r return nil } -// NodeStageVolume mounts the volume to a staging path on the node. -// Implementation notes: -// - stagingTargetPath is the directory passed in the request where the volume needs to be staged -// - We stage the volume into a directory, named after the VolumeID inside stagingTargetPath if -// it is a file system -// - We stage the volume into a file, named after the VolumeID inside stagingTargetPath if it is -// a block volume -// - Order of operation execution: (useful for defer stacking and when Unstaging to ensure steps -// are done in reverse, this is done in undoStagingTransaction) -// - Stash image metadata under staging path -// - Map the image (creates a device) -// - Create the staging file/directory under staging path -// - Stage the device (mount the device mapped for image) -// TODO: make this function less complex. -// nolint:gocyclo,cyclop // reduce complexity -func (ns *NodeServer) NodeStageVolume( +// populateRbdVol update the fields in rbdVolume struct based on the request it received. +func populateRbdVol( ctx context.Context, - req *csi.NodeStageVolumeRequest) (*csi.NodeStageVolumeResponse, error) { - if err := util.ValidateNodeStageVolumeRequest(req); err != nil { - return nil, err - } - + req *csi.NodeStageVolumeRequest, + cr *util.Credentials) (*rbdVolume, error) { + var err error + var j *journal.Connection + volID := req.GetVolumeId() isBlock := req.GetVolumeCapability().GetBlock() != nil disableInUseChecks := false // MULTI_NODE_MULTI_WRITER is supported by default for Block access type volumes @@ -192,6 +178,77 @@ func (ns *NodeServer) NodeStageVolume( disableInUseChecks = true } + rv, err := genVolFromVolumeOptions(ctx, req.GetVolumeContext(), req.GetSecrets(), disableInUseChecks) + if err != nil { + return nil, status.Error(codes.Internal, err.Error()) + } + + rv.ThickProvision = isThickProvisionRequest(req.GetVolumeContext()) + isStaticVol := isStaticVolume(req.GetVolumeContext()) + // get rbd image name from the volume journal + // for static volumes, the image name is actually the volume ID itself + if isStaticVol { + rv.RbdImageName = volID + } else { + var vi util.CSIIdentifier + var imageAttributes *journal.ImageAttributes + err = vi.DecomposeCSIID(volID) + if err != nil { + err = fmt.Errorf("error decoding volume ID (%s): %w", volID, err) + + return nil, status.Error(codes.Internal, err.Error()) + } + + j, err = volJournal.Connect(rv.Monitors, rv.RadosNamespace, cr) + if err != nil { + log.ErrorLog(ctx, "failed to establish cluster connection: %v", err) + + return nil, status.Error(codes.Internal, err.Error()) + } + defer j.Destroy() + + imageAttributes, err = j.GetImageAttributes( + ctx, rv.Pool, vi.ObjectUUID, false) + if err != nil { + err = fmt.Errorf("error fetching image attributes for volume ID (%s): %w", volID, err) + + return nil, status.Error(codes.Internal, err.Error()) + } + rv.RbdImageName = imageAttributes.ImageName + } + + rv.VolID = volID + rv.MapOptions = req.GetVolumeContext()["mapOptions"] + rv.UnmapOptions = req.GetVolumeContext()["unmapOptions"] + rv.Mounter = req.GetVolumeContext()["mounter"] + rv.LogDir = req.GetVolumeContext()["cephLogDir"] + if rv.LogDir == "" { + rv.LogDir = defaultLogDir + } + + return rv, err +} + +// NodeStageVolume mounts the volume to a staging path on the node. +// Implementation notes: +// - stagingTargetPath is the directory passed in the request where the volume needs to be staged +// - We stage the volume into a directory, named after the VolumeID inside stagingTargetPath if +// it is a file system +// - We stage the volume into a file, named after the VolumeID inside stagingTargetPath if it is +// a block volume +// - Order of operation execution: (useful for defer stacking and when Unstaging to ensure steps +// are done in reverse, this is done in undoStagingTransaction) +// - Stash image metadata under staging path +// - Map the image (creates a device) +// - Create the staging file/directory under staging path +// - Stage the device (mount the device mapped for image) +func (ns *NodeServer) NodeStageVolume( + ctx context.Context, + req *csi.NodeStageVolumeRequest) (*csi.NodeStageVolumeResponse, error) { + if err := util.ValidateNodeStageVolumeRequest(req); err != nil { + return nil, err + } + volID := req.GetVolumeId() cr, err := util.NewUserCredentials(req.GetSecrets()) @@ -232,64 +289,21 @@ func (ns *NodeServer) NodeStageVolume( return nil, status.Error(codes.InvalidArgument, "missing required parameter imageFeatures") } - volOptions, err := genVolFromVolumeOptions(ctx, req.GetVolumeContext(), req.GetSecrets(), disableInUseChecks) + rv, err := populateRbdVol(ctx, req, cr) if err != nil { - return nil, status.Error(codes.Internal, err.Error()) + return nil, err } - volOptions.ThickProvision = isThickProvisionRequest(req.GetVolumeContext()) - - // get rbd image name from the volume journal - // for static volumes, the image name is actually the volume ID itself - if isStaticVol { - volOptions.RbdImageName = volID - } else { - var vi util.CSIIdentifier - var imageAttributes *journal.ImageAttributes - err = vi.DecomposeCSIID(volID) - if err != nil { - err = fmt.Errorf("error decoding volume ID (%s): %w", volID, err) - - return nil, status.Error(codes.Internal, err.Error()) - } - - j, connErr := volJournal.Connect(volOptions.Monitors, volOptions.RadosNamespace, cr) - if connErr != nil { - log.ErrorLog(ctx, "failed to establish cluster connection: %v", connErr) - - return nil, status.Error(codes.Internal, connErr.Error()) - } - defer j.Destroy() - - imageAttributes, err = j.GetImageAttributes( - ctx, volOptions.Pool, vi.ObjectUUID, false) - if err != nil { - err = fmt.Errorf("error fetching image attributes for volume ID (%s): %w", volID, err) - - return nil, status.Error(codes.Internal, err.Error()) - } - volOptions.RbdImageName = imageAttributes.ImageName - } - - volOptions.VolID = volID - volOptions.MapOptions = req.GetVolumeContext()["mapOptions"] - volOptions.UnmapOptions = req.GetVolumeContext()["unmapOptions"] - volOptions.Mounter = req.GetVolumeContext()["mounter"] - volOptions.LogDir = req.GetVolumeContext()["cephLogDir"] - if volOptions.LogDir == "" { - volOptions.LogDir = defaultLogDir - } - - err = volOptions.Connect(cr) + err = rv.Connect(cr) if err != nil { - log.ErrorLog(ctx, "failed to connect to volume %s: %v", volOptions, err) + log.ErrorLog(ctx, "failed to connect to volume %s: %v", rv, err) return nil, status.Error(codes.Internal, err.Error()) } - defer volOptions.Destroy() + defer rv.Destroy() if isHealer { - err = healerStageTransaction(ctx, cr, volOptions, stagingParentPath) + err = healerStageTransaction(ctx, cr, rv, stagingParentPath) if err != nil { return nil, status.Error(codes.Internal, err.Error()) } @@ -300,19 +314,19 @@ func (ns *NodeServer) NodeStageVolume( transaction := stageTransaction{} // Stash image details prior to mapping the image (useful during Unstage as it has no // voloptions passed to the RPC as per the CSI spec) - err = stashRBDImageMetadata(volOptions, stagingParentPath) + err = stashRBDImageMetadata(rv, stagingParentPath) if err != nil { return nil, status.Error(codes.Internal, err.Error()) } defer func() { if err != nil { - ns.undoStagingTransaction(ctx, req, transaction, volOptions) + ns.undoStagingTransaction(ctx, req, transaction, rv) } }() // perform the actual staging and if this fails, have undoStagingTransaction // cleans up for us - transaction, err = ns.stageTransaction(ctx, req, volOptions, isStaticVol) + transaction, err = ns.stageTransaction(ctx, req, rv, isStaticVol) if err != nil { return nil, status.Error(codes.Internal, err.Error()) }