diff --git a/internal/rbd/controllerserver.go b/internal/rbd/controllerserver.go index a13365777..3a41e9792 100644 --- a/internal/rbd/controllerserver.go +++ b/internal/rbd/controllerserver.go @@ -28,7 +28,6 @@ import ( librbd "github.com/ceph/go-ceph/rbd" "github.com/container-storage-interface/spec/lib/go/csi" - "github.com/kube-storage/spec/lib/go/replication" "github.com/kubernetes-csi/csi-lib-utils/protosanitizer" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -42,11 +41,6 @@ const ( // controller server spec. type ControllerServer struct { *csicommon.DefaultControllerServer - // added UnimplementedControllerServer as a member of - // ControllerServer. if replication spec add more RPC services in the proto - // file, then we don't need to add all RPC methods leading to forward - // compatibility. - *replication.UnimplementedControllerServer // A map storing all volumes with ongoing operations so that additional operations // for that same volume (as defined by VolumeID/volume name) return an Aborted error VolumeLocks *util.VolumeLocks diff --git a/internal/rbd/driver.go b/internal/rbd/driver.go index 1414e0494..4f43ce99d 100644 --- a/internal/rbd/driver.go +++ b/internal/rbd/driver.go @@ -37,6 +37,7 @@ type Driver struct { ids *IdentityServer ns *NodeServer cs *ControllerServer + rs *ReplicationServer } var ( @@ -81,6 +82,10 @@ func NewControllerServer(d *csicommon.CSIDriver) *ControllerServer { } } +func NewReplicationServer(c *ControllerServer) *ReplicationServer { + return &ReplicationServer{ControllerServer: c} +} + // NewNodeServer initialize a node server for rbd CSI driver. func NewNodeServer(d *csicommon.CSIDriver, t string, topology map[string]string) (*NodeServer, error) { mounter := mount.New("") @@ -154,6 +159,7 @@ func (r *Driver) Run(conf *util.Config) { if conf.IsControllerServer { r.cs = NewControllerServer(r.cd) + r.rs = NewReplicationServer(r.cs) } if !conf.IsControllerServer && !conf.IsNodeServer { topology, err = util.GetTopologyFromDomainLabels(conf.DomainLabels, conf.NodeID, conf.DriverName) @@ -172,8 +178,9 @@ func (r *Driver) Run(conf *util.Config) { IS: r.ids, CS: r.cs, NS: r.ns, - // Register the replication controller to expose replication operations. - RS: r.cs, + // Register the replication controller to expose replication + // operations. + RS: r.rs, } s.Start(conf.Endpoint, conf.HistogramOption, srv, conf.EnableGRPCMetrics) if conf.EnableGRPCMetrics { diff --git a/internal/rbd/replicationcontrollerserver.go b/internal/rbd/replicationcontrollerserver.go index 9645aaef8..bbe4fc9d2 100644 --- a/internal/rbd/replicationcontrollerserver.go +++ b/internal/rbd/replicationcontrollerserver.go @@ -59,37 +59,16 @@ const ( forceKey = "force" ) -// getVolumeFromID gets the rbd image details from the volumeID. -// TODO: move this to controllerserver.go and reuse it wherever its applicable. -func (cs *ControllerServer) getVolumeFromID(ctx context.Context, volumeID string, secrets map[string]string) (*rbdVolume, *util.Credentials, error) { - // validate the volume ID - cr, err := util.NewUserCredentials(secrets) - if err != nil { - return nil, nil, status.Error(codes.Internal, err.Error()) - } - - if volumeID == "" { - return nil, cr, status.Error(codes.InvalidArgument, "empty volume ID in request") - } - - if acquired := cs.VolumeLocks.TryAcquire(volumeID); !acquired { - util.ErrorLog(ctx, util.VolumeOperationAlreadyExistsFmt, volumeID) - return nil, cr, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, volumeID) - } - - var rbdVol = &rbdVolume{} - rbdVol, err = genVolFromVolID(ctx, volumeID, cr, secrets) - if err != nil { - switch { - case errors.Is(err, ErrImageNotFound): - err = status.Errorf(codes.NotFound, "volume %s not found", volumeID) - case errors.Is(err, util.ErrPoolNotFound): - err = status.Errorf(codes.NotFound, "pool %s not found for %s", rbdVol.Pool, volumeID) - default: - err = status.Errorf(codes.Internal, err.Error()) - } - } - return rbdVol, cr, err +// ReplicationServer struct of rbd CSI driver with supported methods of Replication +// controller server spec. +type ReplicationServer struct { + // added UnimplementedControllerServer as a member of + // ControllerServer. if replication spec add more RPC services in the proto + // file, then we don't need to add all RPC methods leading to forward + // compatibility. + *replication.UnimplementedControllerServer + // Embed ControllerServer as it implements helper functions + *ControllerServer } // getForceOption extracts the force option from the GRPC request parameters. @@ -126,30 +105,39 @@ func getMirroringMode(ctx context.Context, parameters map[string]string) (librbd return mirroringMode, nil } -// cleanup performs below resource cleanup operations. -func (cs *ControllerServer) cleanup(rbdVol *rbdVolume, cr *util.Credentials) { - if cr != nil { - // destroy the credential file - cr.DeleteCredentials() - } - if rbdVol != nil { - // release the volume lock - cs.VolumeLocks.Release(rbdVol.VolID) - // destroy the cluster connection - rbdVol.Destroy() - } -} - // EnableVolumeReplication extracts the RBD volume information from the // volumeID, If the image is present it will enable the mirroring based on the // user provided information. -// TODO: create new Replication controller struct for the replication operations. -func (cs *ControllerServer) EnableVolumeReplication(ctx context.Context, +func (rs *ReplicationServer) EnableVolumeReplication(ctx context.Context, req *replication.EnableVolumeReplicationRequest, ) (*replication.EnableVolumeReplicationResponse, error) { - rbdVol, cr, err := cs.getVolumeFromID(ctx, req.GetVolumeId(), req.GetSecrets()) - defer cs.cleanup(rbdVol, cr) + volumeID := req.GetVolumeId() + if volumeID == "" { + return nil, status.Error(codes.InvalidArgument, "empty volume ID in request") + } + cr, err := util.NewUserCredentials(req.GetSecrets()) if err != nil { + return nil, status.Error(codes.Internal, err.Error()) + } + defer cr.DeleteCredentials() + + if acquired := rs.VolumeLocks.TryAcquire(volumeID); !acquired { + util.ErrorLog(ctx, util.VolumeOperationAlreadyExistsFmt, volumeID) + return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, volumeID) + } + defer rs.VolumeLocks.Release(volumeID) + + rbdVol, err := genVolFromVolID(ctx, volumeID, cr, req.GetSecrets()) + defer rbdVol.Destroy() + if err != nil { + switch { + case errors.Is(err, ErrImageNotFound): + err = status.Errorf(codes.NotFound, "volume %s not found", volumeID) + case errors.Is(err, util.ErrPoolNotFound): + err = status.Errorf(codes.NotFound, "pool %s not found for %s", rbdVol.Pool, volumeID) + default: + err = status.Errorf(codes.Internal, err.Error()) + } return nil, err } // extract the mirroring mode @@ -177,15 +165,38 @@ func (cs *ControllerServer) EnableVolumeReplication(ctx context.Context, // DisableVolumeReplication extracts the RBD volume information from the // volumeID, If the image is present and the mirroring is enabled on the RBD // image it will disable the mirroring. -func (cs *ControllerServer) DisableVolumeReplication(ctx context.Context, +func (rs *ReplicationServer) DisableVolumeReplication(ctx context.Context, req *replication.DisableVolumeReplicationRequest, ) (*replication.DisableVolumeReplicationResponse, error) { - rbdVol, cr, err := cs.getVolumeFromID(ctx, req.GetVolumeId(), req.GetSecrets()) - defer cs.cleanup(rbdVol, cr) + volumeID := req.GetVolumeId() + if volumeID == "" { + return nil, status.Error(codes.InvalidArgument, "empty volume ID in request") + } + cr, err := util.NewUserCredentials(req.GetSecrets()) if err != nil { + return nil, status.Error(codes.Internal, err.Error()) + } + defer cr.DeleteCredentials() + + if acquired := rs.VolumeLocks.TryAcquire(volumeID); !acquired { + util.ErrorLog(ctx, util.VolumeOperationAlreadyExistsFmt, volumeID) + return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, volumeID) + } + defer rs.VolumeLocks.Release(volumeID) + + rbdVol, err := genVolFromVolID(ctx, volumeID, cr, req.GetSecrets()) + defer rbdVol.Destroy() + if err != nil { + switch { + case errors.Is(err, ErrImageNotFound): + err = status.Errorf(codes.NotFound, "volume %s not found", volumeID) + case errors.Is(err, util.ErrPoolNotFound): + err = status.Errorf(codes.NotFound, "pool %s not found for %s", rbdVol.Pool, volumeID) + default: + err = status.Errorf(codes.Internal, err.Error()) + } return nil, err } - // extract the force option force, err := getForceOption(ctx, req.GetParameters()) if err != nil { @@ -224,15 +235,38 @@ func (cs *ControllerServer) DisableVolumeReplication(ctx context.Context, // image is present, mirroring is enabled and the image is in demoted state it // will promote the volume as primary. // If the image is already primary it will return success. -func (cs *ControllerServer) PromoteVolume(ctx context.Context, +func (rs *ReplicationServer) PromoteVolume(ctx context.Context, req *replication.PromoteVolumeRequest, ) (*replication.PromoteVolumeResponse, error) { - rbdVol, cr, err := cs.getVolumeFromID(ctx, req.GetVolumeId(), req.GetSecrets()) - defer cs.cleanup(rbdVol, cr) + volumeID := req.GetVolumeId() + if volumeID == "" { + return nil, status.Error(codes.InvalidArgument, "empty volume ID in request") + } + cr, err := util.NewUserCredentials(req.GetSecrets()) if err != nil { + return nil, status.Error(codes.Internal, err.Error()) + } + defer cr.DeleteCredentials() + + if acquired := rs.VolumeLocks.TryAcquire(volumeID); !acquired { + util.ErrorLog(ctx, util.VolumeOperationAlreadyExistsFmt, volumeID) + return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, volumeID) + } + defer rs.VolumeLocks.Release(volumeID) + + rbdVol, err := genVolFromVolID(ctx, volumeID, cr, req.GetSecrets()) + defer rbdVol.Destroy() + if err != nil { + switch { + case errors.Is(err, ErrImageNotFound): + err = status.Errorf(codes.NotFound, "volume %s not found", volumeID) + case errors.Is(err, util.ErrPoolNotFound): + err = status.Errorf(codes.NotFound, "pool %s not found for %s", rbdVol.Pool, volumeID) + default: + err = status.Errorf(codes.Internal, err.Error()) + } return nil, err } - // extract the force option force, err := getForceOption(ctx, req.GetParameters()) if err != nil { @@ -265,15 +299,38 @@ func (cs *ControllerServer) PromoteVolume(ctx context.Context, // volumeID, If the image is present, mirroring is enabled and the // image is in promoted state it will demote the volume as secondary. // If the image is already secondary it will return success. -func (cs *ControllerServer) DemoteVolume(ctx context.Context, +func (rs *ReplicationServer) DemoteVolume(ctx context.Context, req *replication.DemoteVolumeRequest, ) (*replication.DemoteVolumeResponse, error) { - rbdVol, cr, err := cs.getVolumeFromID(ctx, req.GetVolumeId(), req.GetSecrets()) - defer cs.cleanup(rbdVol, cr) + volumeID := req.GetVolumeId() + if volumeID == "" { + return nil, status.Error(codes.InvalidArgument, "empty volume ID in request") + } + cr, err := util.NewUserCredentials(req.GetSecrets()) if err != nil { + return nil, status.Error(codes.Internal, err.Error()) + } + defer cr.DeleteCredentials() + + if acquired := rs.VolumeLocks.TryAcquire(volumeID); !acquired { + util.ErrorLog(ctx, util.VolumeOperationAlreadyExistsFmt, volumeID) + return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, volumeID) + } + defer rs.VolumeLocks.Release(volumeID) + + rbdVol, err := genVolFromVolID(ctx, volumeID, cr, req.GetSecrets()) + defer rbdVol.Destroy() + if err != nil { + switch { + case errors.Is(err, ErrImageNotFound): + err = status.Errorf(codes.NotFound, "volume %s not found", volumeID) + case errors.Is(err, util.ErrPoolNotFound): + err = status.Errorf(codes.NotFound, "pool %s not found for %s", rbdVol.Pool, volumeID) + default: + err = status.Errorf(codes.Internal, err.Error()) + } return nil, err } - mirroringInfo, err := rbdVol.getImageMirroringInfo() if err != nil { util.ErrorLog(ctx, err.Error()) @@ -298,12 +355,35 @@ func (cs *ControllerServer) DemoteVolume(ctx context.Context, // ResyncVolume extracts the RBD volume information from the volumeID, If the // image is present, mirroring is enabled and the image is in demoted state. // If yes it will resync the image to correct the split-brain. -func (cs *ControllerServer) ResyncVolume(ctx context.Context, +func (rs *ReplicationServer) ResyncVolume(ctx context.Context, req *replication.ResyncVolumeRequest, ) (*replication.ResyncVolumeResponse, error) { - rbdVol, cr, err := cs.getVolumeFromID(ctx, req.GetVolumeId(), req.GetSecrets()) - defer cs.cleanup(rbdVol, cr) + volumeID := req.GetVolumeId() + if volumeID == "" { + return nil, status.Error(codes.InvalidArgument, "empty volume ID in request") + } + cr, err := util.NewUserCredentials(req.GetSecrets()) if err != nil { + return nil, status.Error(codes.Internal, err.Error()) + } + defer cr.DeleteCredentials() + + if acquired := rs.VolumeLocks.TryAcquire(volumeID); !acquired { + util.ErrorLog(ctx, util.VolumeOperationAlreadyExistsFmt, volumeID) + return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, volumeID) + } + defer rs.VolumeLocks.Release(volumeID) + rbdVol, err := genVolFromVolID(ctx, volumeID, cr, req.GetSecrets()) + defer rbdVol.Destroy() + if err != nil { + switch { + case errors.Is(err, ErrImageNotFound): + err = status.Errorf(codes.NotFound, "volume %s not found", volumeID) + case errors.Is(err, util.ErrPoolNotFound): + err = status.Errorf(codes.NotFound, "pool %s not found for %s", rbdVol.Pool, volumeID) + default: + err = status.Errorf(codes.Internal, err.Error()) + } return nil, err }