From 68e93a31ccac7b9b383d55f8d1c7d3eef26f2a23 Mon Sep 17 00:00:00 2001 From: Madhu Rajanna Date: Tue, 19 Mar 2024 14:48:27 +0100 Subject: [PATCH] journal: fix connection problem with groupjournal Same group jounral config need to be reused for multiple connection where different monitors and users are used, for that reason create a unique connection each time. Signed-off-by: Madhu Rajanna --- internal/journal/volumegroupjournal.go | 106 +++++++++++++------------ 1 file changed, 55 insertions(+), 51 deletions(-) diff --git a/internal/journal/volumegroupjournal.go b/internal/journal/volumegroupjournal.go index 1498748d3..75f06a355 100644 --- a/internal/journal/volumegroupjournal.go +++ b/internal/journal/volumegroupjournal.go @@ -32,15 +32,7 @@ const ( ) type VolumeGroupJournal interface { - // Connect establishes a new connection to a ceph cluster for journal metadata. - Connect( - monitors, - namespace string, - cr *util.Credentials) error - // Destroy frees any resources and invalidates the journal connection. Destroy() - // SetNamespace sets the namespace for the journal. - SetNamespace(ns string) CheckReservation( ctx context.Context, journalPool, @@ -78,16 +70,20 @@ type VolumeGroupJournal interface { volumeID string) error } -// volumeGroupJournalConfig contains the configuration and connection details. -type volumeGroupJournalConfig struct { - *Config - *Connection +// VolumeGroupJournalConfig contains the configuration. +type VolumeGroupJournalConfig struct { + Config +} + +type VolumeGroupJournalConnection struct { + config *VolumeGroupJournalConfig + connection *Connection } // NewCSIVolumeGroupJournal returns an instance of VolumeGroupJournal for groups. -func NewCSIVolumeGroupJournal(suffix string) VolumeGroupJournal { - return &volumeGroupJournalConfig{ - Config: &Config{ +func NewCSIVolumeGroupJournal(suffix string) VolumeGroupJournalConfig { + return VolumeGroupJournalConfig{ + Config: Config{ csiDirectory: "csi.groups." + suffix, csiNameKeyPrefix: "csi.volume.group.", cephUUIDDirectoryPrefix: "csi.volume.group.", @@ -98,35 +94,42 @@ func NewCSIVolumeGroupJournal(suffix string) VolumeGroupJournal { } } -func (sgj *volumeGroupJournalConfig) SetNamespace(ns string) { - sgj.Config.namespace = ns +// SetNamespace sets the namespace for the journal. +func (vgc *VolumeGroupJournalConfig) SetNamespace(ns string) { + vgc.Config.namespace = ns } // NewCSIVolumeGroupJournalWithNamespace returns an instance of VolumeGroupJournal for // volume groups using a predetermined namespace value. -func NewCSIVolumeGroupJournalWithNamespace(suffix, ns string) VolumeGroupJournal { +func NewCSIVolumeGroupJournalWithNamespace(suffix, ns string) VolumeGroupJournalConfig { j := NewCSIVolumeGroupJournal(suffix) j.SetNamespace(ns) return j } -func (sgj *volumeGroupJournalConfig) Connect( +// Connect establishes a new connection to a ceph cluster for journal metadata. +func (vgc *VolumeGroupJournalConfig) Connect( monitors, namespace string, cr *util.Credentials, -) error { - conn, err := sgj.Config.Connect(monitors, namespace, cr) - if err != nil { - return err +) (VolumeGroupJournal, error) { + vgjc := &VolumeGroupJournalConnection{} + vgjc.config = &VolumeGroupJournalConfig{ + Config: vgc.Config, } - sgj.Connection = conn + conn, err := vgc.Config.Connect(monitors, namespace, cr) + if err != nil { + return nil, err + } + vgjc.connection = conn - return nil + return vgjc, nil } -func (sgj *volumeGroupJournalConfig) Destroy() { - sgj.Connection.Destroy() +// Destroy frees any resources and invalidates the journal connection. +func (vgjc *VolumeGroupJournalConnection) Destroy() { + vgjc.connection.Destroy() } // VolumeGroupData contains the GroupUUID and VolumeGroupAttributes for a @@ -162,11 +165,11 @@ Return values: reservation found. - error: non-nil in case of any errors. */ -func (sgj *volumeGroupJournalConfig) CheckReservation(ctx context.Context, +func (vgjc *VolumeGroupJournalConnection) CheckReservation(ctx context.Context, journalPool, reqName, namePrefix string, ) (*VolumeGroupData, error) { var ( - cj = sgj.Config + cj = vgjc.config volGroupData = &VolumeGroupData{} ) @@ -175,7 +178,7 @@ func (sgj *volumeGroupJournalConfig) CheckReservation(ctx context.Context, cj.csiNameKeyPrefix + reqName, } values, err := getOMapValues( - ctx, sgj.Connection, journalPool, cj.namespace, cj.csiDirectory, + ctx, vgjc.connection, journalPool, cj.namespace, cj.csiDirectory, cj.commonPrefix, fetchKeys) if err != nil { if errors.Is(err, util.ErrKeyNotFound) || errors.Is(err, util.ErrPoolNotFound) { @@ -195,13 +198,13 @@ func (sgj *volumeGroupJournalConfig) CheckReservation(ctx context.Context, } volGroupData.GroupUUID = objUUID - savedVolumeGroupAttributes, err := sgj.GetVolumeGroupAttributes(ctx, journalPool, + savedVolumeGroupAttributes, err := vgjc.GetVolumeGroupAttributes(ctx, journalPool, objUUID) if err != nil { // error should specifically be not found, for image to be absent, any other error // is not conclusive, and we should not proceed if errors.Is(err, util.ErrKeyNotFound) { - err = sgj.UndoReservation(ctx, journalPool, + err = vgjc.UndoReservation(ctx, journalPool, generateVolumeGroupName(namePrefix, objUUID), reqName) } @@ -239,11 +242,11 @@ Input arguments: - groupID: ID of the volume group, generated from the UUID - reqName: Request name for the volume group */ -func (sgj *volumeGroupJournalConfig) UndoReservation(ctx context.Context, +func (vgjc *VolumeGroupJournalConnection) UndoReservation(ctx context.Context, csiJournalPool, groupID, reqName string, ) error { // delete volume UUID omap (first, inverse of create order) - cj := sgj.Config + cj := vgjc.config if groupID != "" { if len(groupID) < uuidEncodedLength { return fmt.Errorf("unable to parse UUID from %s, too short", groupID) @@ -256,8 +259,8 @@ func (sgj *volumeGroupJournalConfig) UndoReservation(ctx context.Context, err := util.RemoveObject( ctx, - sgj.Connection.monitors, - sgj.Connection.cr, + vgjc.connection.monitors, + vgjc.connection.cr, csiJournalPool, cj.namespace, cj.cephUUIDDirectoryPrefix+groupUUID) @@ -271,7 +274,7 @@ func (sgj *volumeGroupJournalConfig) UndoReservation(ctx context.Context, } // delete the request name key (last, inverse of create order) - err := removeMapKeys(ctx, sgj.Connection, csiJournalPool, cj.namespace, cj.csiDirectory, + err := removeMapKeys(ctx, vgjc.connection, csiJournalPool, cj.namespace, cj.csiDirectory, []string{cj.csiNameKeyPrefix + reqName}) if err != nil { log.ErrorLog(ctx, "failed removing oMap key %s (%s)", cj.csiNameKeyPrefix+reqName, err) @@ -299,11 +302,11 @@ Return values: - string: Contains the VolumeGroup name that was reserved for the passed in reqName - error: non-nil in case of any errors */ -func (sgj *volumeGroupJournalConfig) ReserveName(ctx context.Context, +func (vgjc *VolumeGroupJournalConnection) ReserveName(ctx context.Context, journalPool string, journalPoolID int64, reqName, namePrefix string, ) (string, string, error) { - cj := sgj.Config + cj := vgjc.config // Create the UUID based omap first, to reserve the same and avoid conflicts // NOTE: If any service loss occurs post creation of the UUID directory, and before @@ -311,8 +314,8 @@ func (sgj *volumeGroupJournalConfig) ReserveName(ctx context.Context, // UUID directory key will be leaked objUUID, err := reserveOMapName( ctx, - sgj.Connection.monitors, - sgj.Connection.cr, + vgjc.connection.monitors, + vgjc.connection.cr, journalPool, cj.namespace, cj.cephUUIDDirectoryPrefix, @@ -325,7 +328,7 @@ func (sgj *volumeGroupJournalConfig) ReserveName(ctx context.Context, // After generating the UUID Directory omap, we populate the csiDirectory // omap with a key-value entry to map the request to the backend volume group: // `csiNameKeyPrefix + reqName: nameKeyVal` - err = setOMapKeys(ctx, sgj.Connection, journalPool, cj.namespace, cj.csiDirectory, + err = setOMapKeys(ctx, vgjc.connection, journalPool, cj.namespace, cj.csiDirectory, map[string]string{cj.csiNameKeyPrefix + reqName: nameKeyVal}) if err != nil { return "", "", err @@ -333,7 +336,7 @@ func (sgj *volumeGroupJournalConfig) ReserveName(ctx context.Context, defer func() { if err != nil { log.WarningLog(ctx, "reservation failed for volume group: %s", reqName) - errDefer := sgj.UndoReservation(ctx, journalPool, groupName, reqName) + errDefer := vgjc.UndoReservation(ctx, journalPool, groupName, reqName) if errDefer != nil { log.WarningLog(ctx, "failed undoing reservation of volume group: %s (%v)", reqName, errDefer) } @@ -347,7 +350,7 @@ func (sgj *volumeGroupJournalConfig) ReserveName(ctx context.Context, omapValues[cj.csiNameKey] = reqName omapValues[cj.csiImageKey] = groupName - err = setOMapKeys(ctx, sgj.Connection, journalPool, cj.namespace, oid, omapValues) + err = setOMapKeys(ctx, vgjc.connection, journalPool, cj.namespace, oid, omapValues) if err != nil { return "", "", err } @@ -363,18 +366,18 @@ type VolumeGroupAttributes struct { VolumeSnapshotMap map[string]string // Contains the volumeID and the corresponding snapshotID mapping } -func (sgj *volumeGroupJournalConfig) GetVolumeGroupAttributes( +func (vgjc *VolumeGroupJournalConnection) GetVolumeGroupAttributes( ctx context.Context, pool, objectUUID string, ) (*VolumeGroupAttributes, error) { var ( err error groupAttributes = &VolumeGroupAttributes{} - cj = sgj.Config + cj = vgjc.config ) values, err := listOMapValues( - ctx, sgj.Connection, pool, cj.namespace, cj.cephUUIDDirectoryPrefix+objectUUID, + ctx, vgjc.connection, pool, cj.namespace, cj.cephUUIDDirectoryPrefix+objectUUID, cj.commonPrefix) if err != nil { if !errors.Is(err, util.ErrKeyNotFound) && !errors.Is(err, util.ErrPoolNotFound) { @@ -398,14 +401,14 @@ func (sgj *volumeGroupJournalConfig) GetVolumeGroupAttributes( return groupAttributes, nil } -func (sgj *volumeGroupJournalConfig) AddVolumeSnapshotMapping( +func (vgjc *VolumeGroupJournalConnection) AddVolumeSnapshotMapping( ctx context.Context, pool, reservedUUID, volumeID, snapshotID string, ) error { - err := setOMapKeys(ctx, sgj.Connection, pool, sgj.Config.namespace, sgj.Config.cephUUIDDirectoryPrefix+reservedUUID, + err := setOMapKeys(ctx, vgjc.connection, pool, vgjc.config.namespace, vgjc.config.cephUUIDDirectoryPrefix+reservedUUID, map[string]string{volumeID: snapshotID}) if err != nil { log.ErrorLog(ctx, "failed adding volume snapshot mapping: %v", err) @@ -416,13 +419,14 @@ func (sgj *volumeGroupJournalConfig) AddVolumeSnapshotMapping( return nil } -func (sgj *volumeGroupJournalConfig) RemoveVolumeSnapshotMapping( +func (vgjc *VolumeGroupJournalConnection) RemoveVolumeSnapshotMapping( ctx context.Context, pool, reservedUUID, volumeID string, ) error { - err := removeMapKeys(ctx, sgj.Connection, pool, sgj.Config.namespace, sgj.Config.cephUUIDDirectoryPrefix+reservedUUID, + err := removeMapKeys(ctx, vgjc.connection, pool, vgjc.config.namespace, + vgjc.config.cephUUIDDirectoryPrefix+reservedUUID, []string{volumeID}) if err != nil { log.ErrorLog(ctx, "failed removing volume snapshot mapping: %v", err)