diff --git a/internal/journal/omap.go b/internal/journal/omap.go index 10e01cab5..23654c8e2 100644 --- a/internal/journal/omap.go +++ b/internal/journal/omap.go @@ -25,14 +25,18 @@ import ( "k8s.io/klog" ) -func getOneOMapValue( +// listExcess is the number of false-positive key-value pairs we will +// accept from ceph when getting omap values. +var listExcess = 32 + +func getOMapValues( ctx context.Context, conn *Connection, - poolName, namespace, oMapName, oMapKey string) (string, error) { + poolName, namespace, oid, prefix string, keys []string) (map[string]string, error) { // fetch and configure the rados ioctx ioctx, err := conn.conn.GetIoctx(poolName) if err != nil { - return "", omapPoolError(poolName, err) + return nil, omapPoolError(poolName, err) } defer ioctx.Destroy() @@ -40,34 +44,37 @@ func getOneOMapValue( ioctx.SetNamespace(namespace) } - pairs, err := ioctx.GetOmapValues( - oMapName, // oid (name of object) - "", // startAfter (ignored) - oMapKey, // filterPrefix - match only keys with this prefix - 1, // maxReturn - fetch no more than N values + results := map[string]string{} + // want is our "lookup map" that ensures O(1) checks for keys + // while iterating, without needing to complicate the caller. + want := make(map[string]bool, len(keys)) + for i := range keys { + want[keys[i]] = true + } + + err = ioctx.ListOmapValues( + oid, "", prefix, int64(len(want)+listExcess), + func(key string, value []byte) { + if want[key] { + results[key] = string(value) + } + }, ) switch err { case nil: case rados.ErrNotFound: klog.Errorf( - util.Log(ctx, "omap not found (pool=%q, namespace=%q, name=%q, key=%q): %v"), - poolName, namespace, oMapName, oMapKey, err) - return "", util.NewErrKeyNotFound(oMapKey, err) + util.Log(ctx, "omap not found (pool=%q, namespace=%q, name=%q): %v"), + poolName, namespace, oid, err) + return nil, util.NewErrKeyNotFound(oid, err) default: - return "", err + return nil, err } - result, found := pairs[oMapKey] - if !found { - klog.Errorf( - util.Log(ctx, "key not found in omap (pool=%q, namespace=%q, name=%q, key=%q): %v"), - poolName, namespace, oMapName, oMapKey, err) - return "", util.NewErrKeyNotFound(oMapKey, nil) - } - klog.Infof( - util.Log(ctx, "XXX key found in omap! (pool=%q, namespace=%q, name=%q, key=%q): %v"), - poolName, namespace, oMapName, oMapKey, result) - return string(result), nil + klog.V(4).Infof( + util.Log(ctx, "got omap values: (pool=%q, namespace=%q, name=%q): %+v"), + poolName, namespace, oid, results) + return results, nil } func removeOneOMapKey( diff --git a/internal/journal/voljournal.go b/internal/journal/voljournal.go index 9357ef520..ff3556a0a 100644 --- a/internal/journal/voljournal.go +++ b/internal/journal/voljournal.go @@ -144,6 +144,9 @@ type Config struct { // encryptKMS in which encryption passphrase was saved, default is no encryption encryptKMSKey string + + // commonPrefix is the prefix common to all omap keys for this Config + commonPrefix string } // NewCSIVolumeJournal returns an instance of CSIJournal for volumes @@ -158,6 +161,7 @@ func NewCSIVolumeJournal(suffix string) *Config { cephSnapSourceKey: "", namespace: "", encryptKMSKey: "csi.volume.encryptKMS", + commonPrefix: "csi.", } } @@ -173,6 +177,7 @@ func NewCSISnapshotJournal(suffix string) *Config { cephSnapSourceKey: "csi.source", namespace: "", encryptKMSKey: "csi.volume.encryptKMS", + commonPrefix: "csi.", } } @@ -264,17 +269,27 @@ func (conn *Connection) CheckReservation(ctx context.Context, } // check if request name is already part of the directory omap - objUUIDAndPool, err := getOneOMapValue(ctx, conn, journalPool, cj.namespace, cj.csiDirectory, - cj.csiNameKeyPrefix+reqName) - if err != nil { - // error should specifically be not found, for volume to be absent, any other error - // is not conclusive, and we should not proceed - switch err.(type) { - case util.ErrKeyNotFound, util.ErrPoolNotFound: - return nil, nil - } + fetchKeys := []string{ + cj.csiNameKeyPrefix + reqName, + } + values, err := getOMapValues( + ctx, conn, journalPool, cj.namespace, cj.csiDirectory, + cj.commonPrefix, fetchKeys) + switch err.(type) { + case nil: + case util.ErrKeyNotFound, util.ErrPoolNotFound: + // pool or omap (oid) was not present + // stop processing but without an error for no reservation exists + return nil, nil + default: return nil, err } + objUUIDAndPool, found := values[cj.csiNameKeyPrefix+reqName] + if !found { + // oamp was read but was missing the desired key-value pair + // stop processing but without an error for no reservation exists + return nil, nil + } // check UUID only encoded value if len(objUUIDAndPool) == uuidEncodedLength { @@ -590,26 +605,33 @@ func (conn *Connection) GetImageAttributes(ctx context.Context, pool, objectUUID return nil, err } - // TODO: fetch all omap vals in one call, than make multiple listomapvals - imageAttributes.RequestName, err = getOneOMapValue(ctx, conn, pool, cj.namespace, - cj.cephUUIDDirectoryPrefix+objectUUID, cj.csiNameKey) - if err != nil { + fetchKeys := []string{ + cj.csiNameKey, + cj.csiImageKey, + cj.encryptKMSKey, + cj.csiJournalPool, + cj.cephSnapSourceKey, + } + values, err := getOMapValues( + ctx, conn, pool, cj.namespace, cj.cephUUIDDirectoryPrefix+objectUUID, + cj.commonPrefix, fetchKeys) + switch err.(type) { + case nil: + case util.ErrPoolNotFound, util.ErrKeyNotFound: + klog.Warningf(util.Log(ctx, "unable to read omap keys: pool or key missing: %v"), err) + default: return nil, err } - // image key was added at some point, so not all volumes will have this key set - // when ceph-csi was upgraded - imageAttributes.ImageName, err = getOneOMapValue(ctx, conn, pool, cj.namespace, - cj.cephUUIDDirectoryPrefix+objectUUID, cj.csiImageKey) - if err != nil { - // if the key was not found, assume the default key + UUID - // otherwise return error - switch err.(type) { - default: - return nil, err - case util.ErrKeyNotFound, util.ErrPoolNotFound: - } + var found bool + imageAttributes.RequestName = values[cj.csiNameKey] + imageAttributes.KmsID = values[cj.encryptKMSKey] + // image key was added at a later point, so not all volumes will have this + // key set when ceph-csi was upgraded + imageAttributes.ImageName, found = values[cj.csiImageKey] + if !found { + // if the key was not found, assume the default key + UUID if snapSource { imageAttributes.ImageName = defaultSnapshotNamingPrefix + objectUUID } else { @@ -617,24 +639,8 @@ func (conn *Connection) GetImageAttributes(ctx context.Context, pool, objectUUID } } - imageAttributes.KmsID, err = getOneOMapValue(ctx, conn, pool, cj.namespace, - cj.cephUUIDDirectoryPrefix+objectUUID, cj.encryptKMSKey) - if err != nil { - // ErrKeyNotFound means no encryption KMS was used - switch err.(type) { - default: - return nil, fmt.Errorf("OMapVal for %s/%s failed to get encryption KMS value: %s", - pool, cj.cephUUIDDirectoryPrefix+objectUUID, err) - case util.ErrKeyNotFound, util.ErrPoolNotFound: - } - } - - journalPoolIDStr, err := getOneOMapValue(ctx, conn, pool, cj.namespace, - cj.cephUUIDDirectoryPrefix+objectUUID, cj.csiJournalPool) - if err != nil { - if _, ok := err.(util.ErrKeyNotFound); !ok { - return nil, err - } + journalPoolIDStr, found := values[cj.csiJournalPool] + if !found { imageAttributes.JournalPoolID = util.InvalidPoolID } else { var buf64 []byte @@ -646,10 +652,11 @@ func (conn *Connection) GetImageAttributes(ctx context.Context, pool, objectUUID } if snapSource { - imageAttributes.SourceName, err = getOneOMapValue(ctx, conn, pool, cj.namespace, - cj.cephUUIDDirectoryPrefix+objectUUID, cj.cephSnapSourceKey) - if err != nil { - return nil, err + imageAttributes.SourceName, found = values[cj.cephSnapSourceKey] + if !found { + return nil, util.NewErrKeyNotFound( + cj.cephSnapSourceKey, + fmt.Errorf("no snap source in omap for %q", cj.cephUUIDDirectoryPrefix+objectUUID)) } }