diff --git a/pkg/cephfs/cephfs_util.go b/pkg/cephfs/cephfs_util.go index 1cad969ff..0c0e805ac 100644 --- a/pkg/cephfs/cephfs_util.go +++ b/pkg/cephfs/cephfs_util.go @@ -17,6 +17,7 @@ limitations under the License. package cephfs import ( + "context" "fmt" "github.com/ceph/ceph-csi/pkg/util" @@ -33,11 +34,11 @@ type CephFilesystemDetails struct { MDSMap MDSMap `json:"mdsmap"` } -func getFscID(monitors string, cr *util.Credentials, fsName string) (int64, error) { +func getFscID(ctx context.Context, monitors string, cr *util.Credentials, fsName string) (int64, error) { // ceph fs get myfs --format=json // {"mdsmap":{...},"id":2} var fsDetails CephFilesystemDetails - err := execCommandJSON(&fsDetails, + err := execCommandJSON(ctx, &fsDetails, "ceph", "-m", monitors, "--id", cr.ID, @@ -61,11 +62,11 @@ type CephFilesystem struct { DataPoolIDs []int `json:"data_pool_ids"` } -func getMetadataPool(monitors string, cr *util.Credentials, fsName string) (string, error) { +func getMetadataPool(ctx context.Context, monitors string, cr *util.Credentials, fsName string) (string, error) { // ./tbox ceph fs ls --format=json // [{"name":"myfs","metadata_pool":"myfs-metadata","metadata_pool_id":4,...},...] var filesystems []CephFilesystem - err := execCommandJSON(&filesystems, + err := execCommandJSON(ctx, &filesystems, "ceph", "-m", monitors, "--id", cr.ID, @@ -91,11 +92,11 @@ type CephFilesystemDump struct { Filesystems []CephFilesystemDetails `json:"filesystems"` } -func getFsName(monitors string, cr *util.Credentials, fscID int64) (string, error) { +func getFsName(ctx context.Context, monitors string, cr *util.Credentials, fscID int64) (string, error) { // ./tbox ceph fs dump --format=json // JSON: {...,"filesystems":[{"mdsmap":{},"id":},...],...} var fsDump CephFilesystemDump - err := execCommandJSON(&fsDump, + err := execCommandJSON(ctx, &fsDump, "ceph", "-m", monitors, "--id", cr.ID, diff --git a/pkg/cephfs/cephuser.go b/pkg/cephfs/cephuser.go index d48b30aba..74f0b179d 100644 --- a/pkg/cephfs/cephuser.go +++ b/pkg/cephfs/cephuser.go @@ -17,6 +17,8 @@ limitations under the License. package cephfs import ( + "context" + "github.com/ceph/ceph-csi/pkg/util" ) @@ -33,11 +35,11 @@ func getCephUserName(volID volumeID) string { return cephUserPrefix + string(volID) } -func deleteCephUserDeprecated(volOptions *volumeOptions, adminCr *util.Credentials, volID volumeID) error { +func deleteCephUserDeprecated(ctx context.Context, volOptions *volumeOptions, adminCr *util.Credentials, volID volumeID) error { adminID, userID := genUserIDs(adminCr, volID) // TODO: Need to return success if userID is not found - return execCommandErr("ceph", + return execCommandErr(ctx, "ceph", "-m", volOptions.Monitors, "-n", adminID, "--keyfile="+adminCr.KeyFile, diff --git a/pkg/cephfs/controllerserver.go b/pkg/cephfs/controllerserver.go index 12661ceb8..8c82b393e 100644 --- a/pkg/cephfs/controllerserver.go +++ b/pkg/cephfs/controllerserver.go @@ -46,21 +46,21 @@ var ( ) // createBackingVolume creates the backing subvolume and on any error cleans up any created entities -func (cs *ControllerServer) createBackingVolume(volOptions *volumeOptions, vID *volumeIdentifier, secret map[string]string) error { +func (cs *ControllerServer) createBackingVolume(ctx context.Context, volOptions *volumeOptions, vID *volumeIdentifier, secret map[string]string) error { cr, err := util.NewAdminCredentials(secret) if err != nil { return status.Error(codes.InvalidArgument, err.Error()) } defer cr.DeleteCredentials() - if err = createVolume(volOptions, cr, volumeID(vID.FsSubvolName), volOptions.Size); err != nil { - klog.Errorf("failed to create volume %s: %v", volOptions.RequestName, err) + if err = createVolume(ctx, volOptions, cr, volumeID(vID.FsSubvolName), volOptions.Size); err != nil { + klog.Errorf(util.Log(ctx, "failed to create volume %s: %v"), volOptions.RequestName, err) return status.Error(codes.Internal, err.Error()) } defer func() { if err != nil { - if errDefer := purgeVolume(volumeID(vID.FsSubvolName), cr, volOptions); errDefer != nil { - klog.Warningf("failed purging volume: %s (%s)", volOptions.RequestName, errDefer) + if errDefer := purgeVolume(ctx, volumeID(vID.FsSubvolName), cr, volOptions); errDefer != nil { + klog.Warningf(util.Log(ctx, "failed purging volume: %s (%s)"), volOptions.RequestName, errDefer) } } }() @@ -71,17 +71,17 @@ func (cs *ControllerServer) createBackingVolume(volOptions *volumeOptions, vID * // CreateVolume creates a reservation and the volume in backend, if it is not already present func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error) { if err := cs.validateCreateVolumeRequest(req); err != nil { - klog.Errorf("CreateVolumeRequest validation failed: %v", err) + klog.Errorf(util.Log(ctx, "CreateVolumeRequest validation failed: %v"), err) return nil, err } // Configuration secret := req.GetSecrets() requestName := req.GetName() - volOptions, err := newVolumeOptions(requestName, req.GetCapacityRange().GetRequiredBytes(), + volOptions, err := newVolumeOptions(ctx, requestName, req.GetCapacityRange().GetRequiredBytes(), req.GetParameters(), secret) if err != nil { - klog.Errorf("validation and extraction of volume options failed: %v", err) + klog.Errorf(util.Log(ctx, "validation and extraction of volume options failed: %v"), err) return nil, status.Error(codes.InvalidArgument, err.Error()) } @@ -89,7 +89,7 @@ func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol idLk := volumeNameLocker.Lock(requestName) defer volumeNameLocker.Unlock(idLk, requestName) - vID, err := checkVolExists(volOptions, secret) + vID, err := checkVolExists(ctx, volOptions, secret) if err != nil { return nil, status.Error(codes.Internal, err.Error()) } @@ -104,27 +104,27 @@ func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol } // Reservation - vID, err = reserveVol(volOptions, secret) + vID, err = reserveVol(ctx, volOptions, secret) if err != nil { return nil, status.Error(codes.Internal, err.Error()) } defer func() { if err != nil { - errDefer := undoVolReservation(volOptions, *vID, secret) + errDefer := undoVolReservation(ctx, volOptions, *vID, secret) if errDefer != nil { - klog.Warningf("failed undoing reservation of volume: %s (%s)", + klog.Warningf(util.Log(ctx, "failed undoing reservation of volume: %s (%s)"), requestName, errDefer) } } }() // Create a volume - err = cs.createBackingVolume(volOptions, vID, secret) + err = cs.createBackingVolume(ctx, volOptions, vID, secret) if err != nil { return nil, err } - klog.Infof("cephfs: successfully created backing volume named %s for request name %s", + klog.Infof(util.Log(ctx, "cephfs: successfully created backing volume named %s for request name %s"), vID.FsSubvolName, requestName) return &csi.CreateVolumeResponse{ @@ -138,7 +138,7 @@ func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol // deleteVolumeDeprecated is used to delete volumes created using version 1.0.0 of the plugin, // that have state information stored in files or kubernetes config maps -func (cs *ControllerServer) deleteVolumeDeprecated(req *csi.DeleteVolumeRequest) (*csi.DeleteVolumeResponse, error) { +func (cs *ControllerServer) deleteVolumeDeprecated(ctx context.Context, req *csi.DeleteVolumeRequest) (*csi.DeleteVolumeResponse, error) { var ( volID = volumeID(req.GetVolumeId()) secrets = req.GetSecrets() @@ -147,7 +147,7 @@ func (cs *ControllerServer) deleteVolumeDeprecated(req *csi.DeleteVolumeRequest) ce := &controllerCacheEntry{} if err := cs.MetadataStore.Get(string(volID), ce); err != nil { if err, ok := err.(*util.CacheEntryNotFound); ok { - klog.Infof("cephfs: metadata for volume %s not found, assuming the volume to be already deleted (%v)", volID, err) + klog.Infof(util.Log(ctx, "cephfs: metadata for volume %s not found, assuming the volume to be already deleted (%v)"), volID, err) return &csi.DeleteVolumeResponse{}, nil } @@ -157,14 +157,14 @@ func (cs *ControllerServer) deleteVolumeDeprecated(req *csi.DeleteVolumeRequest) if !ce.VolOptions.ProvisionVolume { // DeleteVolume() is forbidden for statically provisioned volumes! - klog.Warningf("volume %s is provisioned statically, aborting delete", volID) + klog.Warningf(util.Log(ctx, "volume %s is provisioned statically, aborting delete"), volID) return &csi.DeleteVolumeResponse{}, nil } // mons may have changed since create volume, // retrieve the latest mons and override old mons if mon, secretsErr := util.GetMonValFromSecret(secrets); secretsErr == nil && len(mon) > 0 { - klog.Infof("overriding monitors [%q] with [%q] for volume %s", ce.VolOptions.Monitors, mon, volID) + klog.Infof(util.Log(ctx, "overriding monitors [%q] with [%q] for volume %s"), ce.VolOptions.Monitors, mon, volID) ce.VolOptions.Monitors = mon } @@ -172,7 +172,7 @@ func (cs *ControllerServer) deleteVolumeDeprecated(req *csi.DeleteVolumeRequest) cr, err := util.NewAdminCredentials(secrets) if err != nil { - klog.Errorf("failed to retrieve admin credentials: %v", err) + klog.Errorf(util.Log(ctx, "failed to retrieve admin credentials: %v"), err) return nil, status.Error(codes.InvalidArgument, err.Error()) } defer cr.DeleteCredentials() @@ -180,13 +180,13 @@ func (cs *ControllerServer) deleteVolumeDeprecated(req *csi.DeleteVolumeRequest) idLk := volumeIDLocker.Lock(string(volID)) defer volumeIDLocker.Unlock(idLk, string(volID)) - if err = purgeVolumeDeprecated(volID, cr, &ce.VolOptions); err != nil { - klog.Errorf("failed to delete volume %s: %v", volID, err) + if err = purgeVolumeDeprecated(ctx, volID, cr, &ce.VolOptions); err != nil { + klog.Errorf(util.Log(ctx, "failed to delete volume %s: %v"), volID, err) return nil, status.Error(codes.Internal, err.Error()) } - if err = deleteCephUserDeprecated(&ce.VolOptions, cr, volID); err != nil { - klog.Errorf("failed to delete ceph user for volume %s: %v", volID, err) + if err = deleteCephUserDeprecated(ctx, &ce.VolOptions, cr, volID); err != nil { + klog.Errorf(util.Log(ctx, "failed to delete ceph user for volume %s: %v"), volID, err) return nil, status.Error(codes.Internal, err.Error()) } @@ -194,7 +194,7 @@ func (cs *ControllerServer) deleteVolumeDeprecated(req *csi.DeleteVolumeRequest) return nil, status.Error(codes.Internal, err.Error()) } - klog.Infof("cephfs: successfully deleted volume %s", volID) + klog.Infof(util.Log(ctx, "cephfs: successfully deleted volume %s"), volID) return &csi.DeleteVolumeResponse{}, nil } @@ -202,7 +202,7 @@ func (cs *ControllerServer) deleteVolumeDeprecated(req *csi.DeleteVolumeRequest) // DeleteVolume deletes the volume in backend and its reservation func (cs *ControllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVolumeRequest) (*csi.DeleteVolumeResponse, error) { if err := cs.validateDeleteVolumeRequest(); err != nil { - klog.Errorf("DeleteVolumeRequest validation failed: %v", err) + klog.Errorf(util.Log(ctx, "DeleteVolumeRequest validation failed: %v"), err) return nil, err } @@ -210,7 +210,7 @@ func (cs *ControllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol secrets := req.GetSecrets() // Find the volume using the provided VolumeID - volOptions, vID, err := newVolumeOptionsFromVolID(string(volID), nil, secrets) + volOptions, vID, err := newVolumeOptionsFromVolID(ctx, string(volID), nil, secrets) if err != nil { // if error is ErrKeyNotFound, then a previous attempt at deletion was complete // or partially complete (subvolume and imageOMap are garbage collected already), hence @@ -221,7 +221,7 @@ func (cs *ControllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol // ErrInvalidVolID may mean this is an 1.0.0 version volume if _, ok := err.(ErrInvalidVolID); ok && cs.MetadataStore != nil { - return cs.deleteVolumeDeprecated(req) + return cs.deleteVolumeDeprecated(ctx, req) } return nil, status.Error(codes.Internal, err.Error()) @@ -230,7 +230,7 @@ func (cs *ControllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol // Deleting a volume requires admin credentials cr, err := util.NewAdminCredentials(secrets) if err != nil { - klog.Errorf("failed to retrieve admin credentials: %v", err) + klog.Errorf(util.Log(ctx, "failed to retrieve admin credentials: %v"), err) return nil, status.Error(codes.InvalidArgument, err.Error()) } defer cr.DeleteCredentials() @@ -240,16 +240,16 @@ func (cs *ControllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol idLk := volumeNameLocker.Lock(volOptions.RequestName) defer volumeNameLocker.Unlock(idLk, volOptions.RequestName) - if err = purgeVolume(volumeID(vID.FsSubvolName), cr, volOptions); err != nil { - klog.Errorf("failed to delete volume %s: %v", volID, err) + if err = purgeVolume(ctx, volumeID(vID.FsSubvolName), cr, volOptions); err != nil { + klog.Errorf(util.Log(ctx, "failed to delete volume %s: %v"), volID, err) return nil, status.Error(codes.Internal, err.Error()) } - if err := undoVolReservation(volOptions, *vID, secrets); err != nil { + if err := undoVolReservation(ctx, volOptions, *vID, secrets); err != nil { return nil, status.Error(codes.Internal, err.Error()) } - klog.Infof("cephfs: successfully deleted volume %s", volID) + klog.Infof(util.Log(ctx, "cephfs: successfully deleted volume %s"), volID) return &csi.DeleteVolumeResponse{}, nil } diff --git a/pkg/cephfs/fsjournal.go b/pkg/cephfs/fsjournal.go index 9e96e173d..09ca9c9d1 100644 --- a/pkg/cephfs/fsjournal.go +++ b/pkg/cephfs/fsjournal.go @@ -17,6 +17,8 @@ limitations under the License. package cephfs import ( + "context" + "github.com/ceph/ceph-csi/pkg/util" "k8s.io/klog" @@ -43,7 +45,7 @@ because, the order of omap creation and deletion are inverse of each other, and request name lock, and hence any stale omaps are leftovers from incomplete transactions and are hence safe to garbage collect. */ -func checkVolExists(volOptions *volumeOptions, secret map[string]string) (*volumeIdentifier, error) { +func checkVolExists(ctx context.Context, volOptions *volumeOptions, secret map[string]string) (*volumeIdentifier, error) { var ( vi util.CSIIdentifier vid volumeIdentifier @@ -55,7 +57,7 @@ func checkVolExists(volOptions *volumeOptions, secret map[string]string) (*volum } defer cr.DeleteCredentials() - imageUUID, err := volJournal.CheckReservation(volOptions.Monitors, cr, + imageUUID, err := volJournal.CheckReservation(ctx, volOptions.Monitors, cr, volOptions.MetadataPool, volOptions.RequestName, "") if err != nil { return nil, err @@ -79,21 +81,21 @@ func checkVolExists(volOptions *volumeOptions, secret map[string]string) (*volum return nil, err } - klog.V(4).Infof("Found existing volume (%s) with subvolume name (%s) for request (%s)", + klog.V(4).Infof(util.Log(ctx, "Found existing volume (%s) with subvolume name (%s) for request (%s)"), vid.VolumeID, vid.FsSubvolName, volOptions.RequestName) return &vid, nil } // undoVolReservation is a helper routine to undo a name reservation for a CSI VolumeName -func undoVolReservation(volOptions *volumeOptions, vid volumeIdentifier, secret map[string]string) error { +func undoVolReservation(ctx context.Context, volOptions *volumeOptions, vid volumeIdentifier, secret map[string]string) error { cr, err := util.NewAdminCredentials(secret) if err != nil { return err } defer cr.DeleteCredentials() - err = volJournal.UndoReservation(volOptions.Monitors, cr, volOptions.MetadataPool, + err = volJournal.UndoReservation(ctx, volOptions.Monitors, cr, volOptions.MetadataPool, vid.FsSubvolName, volOptions.RequestName) return err @@ -101,7 +103,7 @@ func undoVolReservation(volOptions *volumeOptions, vid volumeIdentifier, secret // reserveVol is a helper routine to request a UUID reservation for the CSI VolumeName and, // to generate the volume identifier for the reserved UUID -func reserveVol(volOptions *volumeOptions, secret map[string]string) (*volumeIdentifier, error) { +func reserveVol(ctx context.Context, volOptions *volumeOptions, secret map[string]string) (*volumeIdentifier, error) { var ( vi util.CSIIdentifier vid volumeIdentifier @@ -113,7 +115,7 @@ func reserveVol(volOptions *volumeOptions, secret map[string]string) (*volumeIde } defer cr.DeleteCredentials() - imageUUID, err := volJournal.ReserveName(volOptions.Monitors, cr, + imageUUID, err := volJournal.ReserveName(ctx, volOptions.Monitors, cr, volOptions.MetadataPool, volOptions.RequestName, "") if err != nil { return nil, err @@ -132,7 +134,7 @@ func reserveVol(volOptions *volumeOptions, secret map[string]string) (*volumeIde return nil, err } - klog.V(4).Infof("Generated Volume ID (%s) and subvolume name (%s) for request name (%s)", + klog.V(4).Infof(util.Log(ctx, "Generated Volume ID (%s) and subvolume name (%s) for request name (%s)"), vid.VolumeID, vid.FsSubvolName, volOptions.RequestName) return &vid, nil diff --git a/pkg/cephfs/mountcache.go b/pkg/cephfs/mountcache.go index e20da712d..8c85cea82 100644 --- a/pkg/cephfs/mountcache.go +++ b/pkg/cephfs/mountcache.go @@ -1,6 +1,23 @@ +/* +Copyright 2019 The Ceph-CSI Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + package cephfs import ( + "context" "encoding/base64" "os" "sync" @@ -51,7 +68,7 @@ func remountCachedVolumes() error { me := &volumeMountCacheEntry{} err := volumeMountCache.nodeCacheStore.ForAll(volumeMountCachePrefix, me, func(identifier string) error { volID := me.VolumeID - if volOpts, vid, err := newVolumeOptionsFromVolID(me.VolumeID, nil, decodeCredentials(me.Secrets)); err != nil { + if volOpts, vid, err := newVolumeOptionsFromVolID(context.TODO(), me.VolumeID, nil, decodeCredentials(me.Secrets)); err != nil { if err, ok := err.(util.ErrKeyNotFound); ok { klog.Infof("mount-cache: image key not found, assuming the volume %s to be already deleted (%v)", volID, err) if err := volumeMountCache.nodeCacheStore.Delete(genVolumeMountCacheFileName(volID)); err == nil { @@ -101,7 +118,7 @@ func mountOneCacheEntry(volOptions *volumeOptions, vid *volumeIdentifier, me *vo } defer cr.DeleteCredentials() - volOptions.RootPath, err = getVolumeRootPathCeph(volOptions, cr, volumeID(vid.FsSubvolName)) + volOptions.RootPath, err = getVolumeRootPathCeph(context.TODO(), volOptions, cr, volumeID(vid.FsSubvolName)) if err != nil { return err } @@ -131,7 +148,7 @@ func mountOneCacheEntry(volOptions *volumeOptions, vid *volumeIdentifier, me *vo klog.Errorf("mount-cache: failed to create mounter for volume %s: %v", volID, err) return err } - if err := m.mount(me.StagingPath, cr, volOptions); err != nil { + if err := m.mount(context.TODO(), me.StagingPath, cr, volOptions); err != nil { klog.Errorf("mount-cache: failed to mount volume %s: %v", volID, err) return err } @@ -140,7 +157,7 @@ func mountOneCacheEntry(volOptions *volumeOptions, vid *volumeIdentifier, me *vo mountOptions := []string{"bind"} for targetPath, readOnly := range me.TargetPaths { if err := cleanupMountPoint(targetPath); err == nil { - if err := bindMount(me.StagingPath, targetPath, readOnly, mountOptions); err != nil { + if err := bindMount(context.TODO(), me.StagingPath, targetPath, readOnly, mountOptions); err != nil { klog.Errorf("mount-cache: failed to bind-mount volume %s: %s %s %v %v", volID, me.StagingPath, targetPath, readOnly, err) } else { @@ -156,7 +173,7 @@ func cleanupMountPoint(mountPoint string) error { if _, err := os.Stat(mountPoint); err != nil { if isCorruptedMnt(err) { klog.Infof("mount-cache: corrupted mount point %s, need unmount", mountPoint) - err := execCommandErr("umount", mountPoint) + err := execCommandErr(context.TODO(), "umount", mountPoint) if err != nil { klog.Infof("mount-cache: failed to umount %s %v", mountPoint, err) // ignore error return err @@ -205,7 +222,7 @@ func (mc *volumeMountCacheMap) isEnable() bool { return mc.nodeCacheStore.BasePath != "" } -func (mc *volumeMountCacheMap) nodeStageVolume(volID, stagingTargetPath, mounter string, secrets map[string]string) error { +func (mc *volumeMountCacheMap) nodeStageVolume(ctx context.Context, volID, stagingTargetPath, mounter string, secrets map[string]string) error { if !mc.isEnable() { return nil } @@ -216,11 +233,11 @@ func (mc *volumeMountCacheMap) nodeStageVolume(volID, stagingTargetPath, mounter me, ok := volumeMountCache.volumes[volID] if ok { if me.StagingPath == stagingTargetPath { - klog.Warningf("mount-cache: node unexpected restage volume for volume %s", volID) + klog.Warningf(util.Log(ctx, "mount-cache: node unexpected restage volume for volume %s"), volID) return nil } lastTargetPaths = me.TargetPaths - klog.Warningf("mount-cache: node stage volume ignore last cache entry for volume %s", volID) + klog.Warningf(util.Log(ctx, "mount-cache: node stage volume ignore last cache entry for volume %s"), volID) } me = volumeMountCacheEntry{DriverVersion: util.DriverVersion} @@ -246,7 +263,7 @@ func (mc *volumeMountCacheMap) nodeUnStageVolume(volID string) error { return mc.nodeCacheStore.Delete(genVolumeMountCacheFileName(volID)) } -func (mc *volumeMountCacheMap) nodePublishVolume(volID, targetPath string, readOnly bool) error { +func (mc *volumeMountCacheMap) nodePublishVolume(ctx context.Context, volID, targetPath string, readOnly bool) error { if !mc.isEnable() { return nil } @@ -258,10 +275,10 @@ func (mc *volumeMountCacheMap) nodePublishVolume(volID, targetPath string, readO return errors.New("mount-cache: node publish volume failed to find cache entry for volume") } volumeMountCache.volumes[volID].TargetPaths[targetPath] = readOnly - return mc.updateNodeCache(volID) + return mc.updateNodeCache(ctx, volID) } -func (mc *volumeMountCacheMap) nodeUnPublishVolume(volID, targetPath string) error { +func (mc *volumeMountCacheMap) nodeUnPublishVolume(ctx context.Context, volID, targetPath string) error { if !mc.isEnable() { return nil } @@ -273,13 +290,13 @@ func (mc *volumeMountCacheMap) nodeUnPublishVolume(volID, targetPath string) err return errors.New("mount-cache: node unpublish volume failed to find cache entry for volume") } delete(volumeMountCache.volumes[volID].TargetPaths, targetPath) - return mc.updateNodeCache(volID) + return mc.updateNodeCache(ctx, volID) } -func (mc *volumeMountCacheMap) updateNodeCache(volID string) error { +func (mc *volumeMountCacheMap) updateNodeCache(ctx context.Context, volID string) error { me := volumeMountCache.volumes[volID] if err := volumeMountCache.nodeCacheStore.Delete(genVolumeMountCacheFileName(volID)); err == nil { - klog.Infof("mount-cache: metadata not found, delete mount cache failed for volume %s", volID) + klog.Infof(util.Log(ctx, "mount-cache: metadata not found, delete mount cache failed for volume %s"), volID) } return mc.nodeCacheStore.Create(genVolumeMountCacheFileName(volID), me) } diff --git a/pkg/cephfs/nodeserver.go b/pkg/cephfs/nodeserver.go index eedd49040..27494aa44 100644 --- a/pkg/cephfs/nodeserver.go +++ b/pkg/cephfs/nodeserver.go @@ -80,7 +80,7 @@ func (ns *NodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol stagingTargetPath := req.GetStagingTargetPath() volID := volumeID(req.GetVolumeId()) - volOptions, _, err := newVolumeOptionsFromVolID(string(volID), req.GetVolumeContext(), req.GetSecrets()) + volOptions, _, err := newVolumeOptionsFromVolID(ctx, string(volID), req.GetVolumeContext(), req.GetSecrets()) if err != nil { if _, ok := err.(ErrInvalidVolID); !ok { return nil, status.Error(codes.Internal, err.Error()) @@ -110,50 +110,50 @@ func (ns *NodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol isMnt, err := util.IsMountPoint(stagingTargetPath) if err != nil { - klog.Errorf("stat failed: %v", err) + klog.Errorf(util.Log(ctx, "stat failed: %v"), err) return nil, status.Error(codes.Internal, err.Error()) } if isMnt { - klog.Infof("cephfs: volume %s is already mounted to %s, skipping", volID, stagingTargetPath) + klog.Infof(util.Log(ctx, "cephfs: volume %s is already mounted to %s, skipping"), volID, stagingTargetPath) return &csi.NodeStageVolumeResponse{}, nil } // It's not, mount now - if err = ns.mount(volOptions, req); err != nil { + if err = ns.mount(ctx, volOptions, req); err != nil { return nil, err } - klog.Infof("cephfs: successfully mounted volume %s to %s", volID, stagingTargetPath) + klog.Infof(util.Log(ctx, "cephfs: successfully mounted volume %s to %s"), volID, stagingTargetPath) return &csi.NodeStageVolumeResponse{}, nil } -func (*NodeServer) mount(volOptions *volumeOptions, req *csi.NodeStageVolumeRequest) error { +func (*NodeServer) mount(ctx context.Context, volOptions *volumeOptions, req *csi.NodeStageVolumeRequest) error { stagingTargetPath := req.GetStagingTargetPath() volID := volumeID(req.GetVolumeId()) cr, err := getCredentialsForVolume(volOptions, req) if err != nil { - klog.Errorf("failed to get ceph credentials for volume %s: %v", volID, err) + klog.Errorf(util.Log(ctx, "failed to get ceph credentials for volume %s: %v"), volID, err) return status.Error(codes.Internal, err.Error()) } defer cr.DeleteCredentials() m, err := newMounter(volOptions) if err != nil { - klog.Errorf("failed to create mounter for volume %s: %v", volID, err) + klog.Errorf(util.Log(ctx, "failed to create mounter for volume %s: %v"), volID, err) return status.Error(codes.Internal, err.Error()) } - klog.V(4).Infof("cephfs: mounting volume %s with %s", volID, m.name()) + klog.V(4).Infof(util.Log(ctx, "cephfs: mounting volume %s with %s"), volID, m.name()) - if err = m.mount(stagingTargetPath, cr, volOptions); err != nil { - klog.Errorf("failed to mount volume %s: %v", volID, err) + if err = m.mount(ctx, stagingTargetPath, cr, volOptions); err != nil { + klog.Errorf(util.Log(ctx, "failed to mount volume %s: %v"), volID, err) return status.Error(codes.Internal, err.Error()) } - if err := volumeMountCache.nodeStageVolume(req.GetVolumeId(), stagingTargetPath, volOptions.Mounter, req.GetSecrets()); err != nil { - klog.Warningf("mount-cache: failed to stage volume %s %s: %v", volID, stagingTargetPath, err) + if err := volumeMountCache.nodeStageVolume(ctx, req.GetVolumeId(), stagingTargetPath, volOptions.Mounter, req.GetSecrets()); err != nil { + klog.Warningf(util.Log(ctx, "mount-cache: failed to stage volume %s %s: %v"), volID, stagingTargetPath, err) } return nil } @@ -173,7 +173,7 @@ func (ns *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis volID := req.GetVolumeId() if err := util.CreateMountPoint(targetPath); err != nil { - klog.Errorf("failed to create mount point at %s: %v", targetPath, err) + klog.Errorf(util.Log(ctx, "failed to create mount point at %s: %v"), targetPath, err) return nil, status.Error(codes.Internal, err.Error()) } @@ -204,31 +204,31 @@ func (ns *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis isMnt, err := util.IsMountPoint(targetPath) if err != nil { - klog.Errorf("stat failed: %v", err) + klog.Errorf(util.Log(ctx, "stat failed: %v"), err) return nil, status.Error(codes.Internal, err.Error()) } if isMnt { - klog.Infof("cephfs: volume %s is already bind-mounted to %s", volID, targetPath) + klog.Infof(util.Log(ctx, "cephfs: volume %s is already bind-mounted to %s"), volID, targetPath) return &csi.NodePublishVolumeResponse{}, nil } // It's not, mount now - if err = bindMount(req.GetStagingTargetPath(), req.GetTargetPath(), req.GetReadonly(), mountOptions); err != nil { - klog.Errorf("failed to bind-mount volume %s: %v", volID, err) + if err = bindMount(ctx, req.GetStagingTargetPath(), req.GetTargetPath(), req.GetReadonly(), mountOptions); err != nil { + klog.Errorf(util.Log(ctx, "failed to bind-mount volume %s: %v"), volID, err) return nil, status.Error(codes.Internal, err.Error()) } - if err = volumeMountCache.nodePublishVolume(volID, targetPath, req.GetReadonly()); err != nil { - klog.Warningf("mount-cache: failed to publish volume %s %s: %v", volID, targetPath, err) + if err = volumeMountCache.nodePublishVolume(ctx, volID, targetPath, req.GetReadonly()); err != nil { + klog.Warningf(util.Log(ctx, "mount-cache: failed to publish volume %s %s: %v"), volID, targetPath, err) } - klog.Infof("cephfs: successfully bind-mounted volume %s to %s", volID, targetPath) + klog.Infof(util.Log(ctx, "cephfs: successfully bind-mounted volume %s to %s"), volID, targetPath) err = os.Chmod(targetPath, 0777) if err != nil { - klog.Errorf("failed to change targetpath permission for volume %s: %v", volID, err) + klog.Errorf(util.Log(ctx, "failed to change targetpath permission for volume %s: %v"), volID, err) return nil, status.Error(codes.Internal, err.Error()) } @@ -245,12 +245,12 @@ func (ns *NodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpu targetPath := req.GetTargetPath() volID := req.GetVolumeId() - if err = volumeMountCache.nodeUnPublishVolume(volID, targetPath); err != nil { - klog.Warningf("mount-cache: failed to unpublish volume %s %s: %v", volID, targetPath, err) + if err = volumeMountCache.nodeUnPublishVolume(ctx, volID, targetPath); err != nil { + klog.Warningf(util.Log(ctx, "mount-cache: failed to unpublish volume %s %s: %v"), volID, targetPath, err) } // Unmount the bind-mount - if err = unmountVolume(targetPath); err != nil { + if err = unmountVolume(ctx, targetPath); err != nil { return nil, status.Error(codes.Internal, err.Error()) } @@ -258,7 +258,7 @@ func (ns *NodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpu return nil, status.Error(codes.Internal, err.Error()) } - klog.Infof("cephfs: successfully unbinded volume %s from %s", req.GetVolumeId(), targetPath) + klog.Infof(util.Log(ctx, "cephfs: successfully unbinded volume %s from %s"), req.GetVolumeId(), targetPath) return &csi.NodeUnpublishVolumeResponse{}, nil } @@ -274,15 +274,15 @@ func (ns *NodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstag volID := req.GetVolumeId() if err = volumeMountCache.nodeUnStageVolume(volID); err != nil { - klog.Warningf("mount-cache: failed to unstage volume %s %s: %v", volID, stagingTargetPath, err) + klog.Warningf(util.Log(ctx, "mount-cache: failed to unstage volume %s %s: %v"), volID, stagingTargetPath, err) } // Unmount the volume - if err = unmountVolume(stagingTargetPath); err != nil { + if err = unmountVolume(ctx, stagingTargetPath); err != nil { return nil, status.Error(codes.Internal, err.Error()) } - klog.Infof("cephfs: successfully unmounted volume %s from %s", req.GetVolumeId(), stagingTargetPath) + klog.Infof(util.Log(ctx, "cephfs: successfully unmounted volume %s from %s"), req.GetVolumeId(), stagingTargetPath) return &csi.NodeUnstageVolumeResponse{}, nil } diff --git a/pkg/cephfs/util.go b/pkg/cephfs/util.go index 377d03fd6..e1b9af8a1 100644 --- a/pkg/cephfs/util.go +++ b/pkg/cephfs/util.go @@ -18,6 +18,7 @@ package cephfs import ( "bytes" + "context" "encoding/json" "fmt" "os" @@ -33,7 +34,7 @@ import ( type volumeID string -func execCommand(program string, args ...string) (stdout, stderr []byte, err error) { +func execCommand(ctx context.Context, program string, args ...string) (stdout, stderr []byte, err error) { var ( cmd = exec.Command(program, args...) // nolint: gosec sanitizedArgs = util.StripSecretInArgs(args) @@ -44,7 +45,7 @@ func execCommand(program string, args ...string) (stdout, stderr []byte, err err cmd.Stdout = &stdoutBuf cmd.Stderr = &stderrBuf - klog.V(4).Infof("cephfs: EXEC %s %s", program, sanitizedArgs) + klog.V(4).Infof(util.Log(ctx, "cephfs: EXEC %s %s"), program, sanitizedArgs) if err := cmd.Run(); err != nil { if cmd.Process == nil { @@ -58,14 +59,14 @@ func execCommand(program string, args ...string) (stdout, stderr []byte, err err return stdoutBuf.Bytes(), stderrBuf.Bytes(), nil } -func execCommandErr(program string, args ...string) error { - _, _, err := execCommand(program, args...) +func execCommandErr(ctx context.Context, program string, args ...string) error { + _, _, err := execCommand(ctx, program, args...) return err } //nolint: unparam -func execCommandJSON(v interface{}, program string, args ...string) error { - stdout, _, err := execCommand(program, args...) +func execCommandJSON(ctx context.Context, v interface{}, program string, args ...string) error { + stdout, _, err := execCommand(ctx, program, args...) if err != nil { return err } diff --git a/pkg/cephfs/volume.go b/pkg/cephfs/volume.go index 8f3e2fdb4..56e1eb482 100644 --- a/pkg/cephfs/volume.go +++ b/pkg/cephfs/volume.go @@ -17,6 +17,7 @@ limitations under the License. package cephfs import ( + "context" "fmt" "os" "path" @@ -51,7 +52,7 @@ func getCephRootPathLocalDeprecated(volID volumeID) string { return fmt.Sprintf("%s/controller/volumes/root-%s", PluginFolder, string(volID)) } -func getVolumeRootPathCeph(volOptions *volumeOptions, cr *util.Credentials, volID volumeID) (string, error) { +func getVolumeRootPathCeph(ctx context.Context, volOptions *volumeOptions, cr *util.Credentials, volID volumeID) (string, error) { stdout, _, err := util.ExecCommand( "ceph", "fs", @@ -67,16 +68,17 @@ func getVolumeRootPathCeph(volOptions *volumeOptions, cr *util.Credentials, volI "--keyfile="+cr.KeyFile) if err != nil { - klog.Errorf("failed to get the rootpath for the vol %s(%s)", string(volID), err) + klog.Errorf(util.Log(ctx, "failed to get the rootpath for the vol %s(%s)"), string(volID), err) return "", err } return strings.TrimSuffix(string(stdout), "\n"), nil } -func createVolume(volOptions *volumeOptions, cr *util.Credentials, volID volumeID, bytesQuota int64) error { +func createVolume(ctx context.Context, volOptions *volumeOptions, cr *util.Credentials, volID volumeID, bytesQuota int64) error { //TODO: When we support multiple fs, need to hande subvolume group create for all fs's if !cephfsInit { err := execCommandErr( + ctx, "ceph", "fs", "subvolumegroup", @@ -88,10 +90,10 @@ func createVolume(volOptions *volumeOptions, cr *util.Credentials, volID volumeI "-n", cephEntityClientPrefix+cr.ID, "--keyfile="+cr.KeyFile) if err != nil { - klog.Errorf("failed to create subvolume group csi, for the vol %s(%s)", string(volID), err) + klog.Errorf(util.Log(ctx, "failed to create subvolume group csi, for the vol %s(%s)"), string(volID), err) return err } - klog.V(4).Infof("cephfs: created subvolume group csi") + klog.V(4).Infof(util.Log(ctx, "cephfs: created subvolume group csi")) cephfsInit = true } @@ -116,17 +118,18 @@ func createVolume(volOptions *volumeOptions, cr *util.Credentials, volID volumeI } err := execCommandErr( + ctx, "ceph", args[:]...) if err != nil { - klog.Errorf("failed to create subvolume %s(%s) in fs %s", string(volID), err, volOptions.FsName) + klog.Errorf(util.Log(ctx, "failed to create subvolume %s(%s) in fs %s"), string(volID), err, volOptions.FsName) return err } return nil } -func mountCephRoot(volID volumeID, volOptions *volumeOptions, adminCr *util.Credentials) error { +func mountCephRoot(ctx context.Context, volID volumeID, volOptions *volumeOptions, adminCr *util.Credentials) error { cephRoot := getCephRootPathLocalDeprecated(volID) // Root path is not set for dynamically provisioned volumes @@ -142,30 +145,30 @@ func mountCephRoot(volID volumeID, volOptions *volumeOptions, adminCr *util.Cred return fmt.Errorf("failed to create mounter: %v", err) } - if err = m.mount(cephRoot, adminCr, volOptions); err != nil { + if err = m.mount(ctx, cephRoot, adminCr, volOptions); err != nil { return fmt.Errorf("error mounting ceph root: %v", err) } return nil } -func unmountCephRoot(volID volumeID) { +func unmountCephRoot(ctx context.Context, volID volumeID) { cephRoot := getCephRootPathLocalDeprecated(volID) - if err := unmountVolume(cephRoot); err != nil { - klog.Errorf("failed to unmount %s with error %s", cephRoot, err) + if err := unmountVolume(ctx, cephRoot); err != nil { + klog.Errorf(util.Log(ctx, "failed to unmount %s with error %s"), cephRoot, err) } else { if err := os.Remove(cephRoot); err != nil { - klog.Errorf("failed to remove %s with error %s", cephRoot, err) + klog.Errorf(util.Log(ctx, "failed to remove %s with error %s"), cephRoot, err) } } } -func purgeVolumeDeprecated(volID volumeID, adminCr *util.Credentials, volOptions *volumeOptions) error { - if err := mountCephRoot(volID, volOptions, adminCr); err != nil { +func purgeVolumeDeprecated(ctx context.Context, volID volumeID, adminCr *util.Credentials, volOptions *volumeOptions) error { + if err := mountCephRoot(ctx, volID, volOptions, adminCr); err != nil { return err } - defer unmountCephRoot(volID) + defer unmountCephRoot(ctx, volID) var ( volRoot = getCephRootVolumePathLocalDeprecated(volID) @@ -178,7 +181,7 @@ func purgeVolumeDeprecated(volID volumeID, adminCr *util.Credentials, volOptions } } else { if !pathExists(volRootDeleting) { - klog.V(4).Infof("cephfs: volume %s not found, assuming it to be already deleted", volID) + klog.V(4).Infof(util.Log(ctx, "cephfs: volume %s not found, assuming it to be already deleted"), volID) return nil } } @@ -190,8 +193,9 @@ func purgeVolumeDeprecated(volID volumeID, adminCr *util.Credentials, volOptions return nil } -func purgeVolume(volID volumeID, cr *util.Credentials, volOptions *volumeOptions) error { +func purgeVolume(ctx context.Context, volID volumeID, cr *util.Credentials, volOptions *volumeOptions) error { err := execCommandErr( + ctx, "ceph", "fs", "subvolume", @@ -206,7 +210,7 @@ func purgeVolume(volID volumeID, cr *util.Credentials, volOptions *volumeOptions "-n", cephEntityClientPrefix+cr.ID, "--keyfile="+cr.KeyFile) if err != nil { - klog.Errorf("failed to purge subvolume %s(%s) in fs %s", string(volID), err, volOptions.FsName) + klog.Errorf(util.Log(ctx, "failed to purge subvolume %s(%s) in fs %s"), string(volID), err, volOptions.FsName) return err } diff --git a/pkg/cephfs/volumemounter.go b/pkg/cephfs/volumemounter.go index fee7da0e5..7b656997c 100644 --- a/pkg/cephfs/volumemounter.go +++ b/pkg/cephfs/volumemounter.go @@ -17,6 +17,7 @@ limitations under the License. package cephfs import ( + "context" "errors" "fmt" "os" @@ -70,7 +71,7 @@ func loadAvailableMounters() error { } type volumeMounter interface { - mount(mountPoint string, cr *util.Credentials, volOptions *volumeOptions) error + mount(ctx context.Context, mountPoint string, cr *util.Credentials, volOptions *volumeOptions) error name() string } @@ -114,7 +115,7 @@ func newMounter(volOptions *volumeOptions) (volumeMounter, error) { type fuseMounter struct{} -func mountFuse(mountPoint string, cr *util.Credentials, volOptions *volumeOptions) error { +func mountFuse(ctx context.Context, mountPoint string, cr *util.Credentials, volOptions *volumeOptions) error { args := []string{ mountPoint, "-m", volOptions.Monitors, @@ -128,7 +129,7 @@ func mountFuse(mountPoint string, cr *util.Credentials, volOptions *volumeOption args = append(args, "--client_mds_namespace="+volOptions.FsName) } - _, stderr, err := execCommand("ceph-fuse", args[:]...) + _, stderr, err := execCommand(ctx, "ceph-fuse", args[:]...) if err != nil { return err } @@ -154,20 +155,20 @@ func mountFuse(mountPoint string, cr *util.Credentials, volOptions *volumeOption return nil } -func (m *fuseMounter) mount(mountPoint string, cr *util.Credentials, volOptions *volumeOptions) error { +func (m *fuseMounter) mount(ctx context.Context, mountPoint string, cr *util.Credentials, volOptions *volumeOptions) error { if err := util.CreateMountPoint(mountPoint); err != nil { return err } - return mountFuse(mountPoint, cr, volOptions) + return mountFuse(ctx, mountPoint, cr, volOptions) } func (m *fuseMounter) name() string { return "Ceph FUSE driver" } type kernelMounter struct{} -func mountKernel(mountPoint string, cr *util.Credentials, volOptions *volumeOptions) error { - if err := execCommandErr("modprobe", "ceph"); err != nil { +func mountKernel(ctx context.Context, mountPoint string, cr *util.Credentials, volOptions *volumeOptions) error { + if err := execCommandErr(ctx, "modprobe", "ceph"); err != nil { return err } @@ -182,28 +183,28 @@ func mountKernel(mountPoint string, cr *util.Credentials, volOptions *volumeOpti } args = append(args, "-o", optionsStr) - return execCommandErr("mount", args[:]...) + return execCommandErr(ctx, "mount", args[:]...) } -func (m *kernelMounter) mount(mountPoint string, cr *util.Credentials, volOptions *volumeOptions) error { +func (m *kernelMounter) mount(ctx context.Context, mountPoint string, cr *util.Credentials, volOptions *volumeOptions) error { if err := util.CreateMountPoint(mountPoint); err != nil { return err } - return mountKernel(mountPoint, cr, volOptions) + return mountKernel(ctx, mountPoint, cr, volOptions) } func (m *kernelMounter) name() string { return "Ceph kernel client" } -func bindMount(from, to string, readOnly bool, mntOptions []string) error { +func bindMount(ctx context.Context, from, to string, readOnly bool, mntOptions []string) error { mntOptionSli := strings.Join(mntOptions, ",") - if err := execCommandErr("mount", "-o", mntOptionSli, from, to); err != nil { + if err := execCommandErr(ctx, "mount", "-o", mntOptionSli, from, to); err != nil { return fmt.Errorf("failed to bind-mount %s to %s: %v", from, to, err) } if readOnly { mntOptionSli += ",remount" - if err := execCommandErr("mount", "-o", mntOptionSli, to); err != nil { + if err := execCommandErr(ctx, "mount", "-o", mntOptionSli, to); err != nil { return fmt.Errorf("failed read-only remount of %s: %v", to, err) } } @@ -211,8 +212,8 @@ func bindMount(from, to string, readOnly bool, mntOptions []string) error { return nil } -func unmountVolume(mountPoint string) error { - if err := execCommandErr("umount", mountPoint); err != nil { +func unmountVolume(ctx context.Context, mountPoint string) error { + if err := execCommandErr(ctx, "umount", mountPoint); err != nil { return err } @@ -226,10 +227,10 @@ func unmountVolume(mountPoint string) error { if ok { p, err := os.FindProcess(pid) if err != nil { - klog.Warningf("failed to find process %d: %v", pid, err) + klog.Warningf(util.Log(ctx, "failed to find process %d: %v"), pid, err) } else { if _, err = p.Wait(); err != nil { - klog.Warningf("%d is not a child process: %v", pid, err) + klog.Warningf(util.Log(ctx, "%d is not a child process: %v"), pid, err) } } } diff --git a/pkg/cephfs/volumeoptions.go b/pkg/cephfs/volumeoptions.go index 9d20f7d5a..36d5fd072 100644 --- a/pkg/cephfs/volumeoptions.go +++ b/pkg/cephfs/volumeoptions.go @@ -17,6 +17,7 @@ limitations under the License. package cephfs import ( + "context" "fmt" "strconv" @@ -123,7 +124,7 @@ func getMonsAndClusterID(options map[string]string) (string, string, error) { // newVolumeOptions generates a new instance of volumeOptions from the provided // CSI request parameters -func newVolumeOptions(requestName string, size int64, volOptions, secret map[string]string) (*volumeOptions, error) { +func newVolumeOptions(ctx context.Context, requestName string, size int64, volOptions, secret map[string]string) (*volumeOptions, error) { var ( opts volumeOptions err error @@ -155,12 +156,12 @@ func newVolumeOptions(requestName string, size int64, volOptions, secret map[str } defer cr.DeleteCredentials() - opts.FscID, err = getFscID(opts.Monitors, cr, opts.FsName) + opts.FscID, err = getFscID(ctx, opts.Monitors, cr, opts.FsName) if err != nil { return nil, err } - opts.MetadataPool, err = getMetadataPool(opts.Monitors, cr, opts.FsName) + opts.MetadataPool, err = getMetadataPool(ctx, opts.Monitors, cr, opts.FsName) if err != nil { return nil, err } @@ -172,7 +173,7 @@ func newVolumeOptions(requestName string, size int64, volOptions, secret map[str // newVolumeOptionsFromVolID generates a new instance of volumeOptions and volumeIdentifier // from the provided CSI VolumeID -func newVolumeOptionsFromVolID(volID string, volOpt, secrets map[string]string) (*volumeOptions, *volumeIdentifier, error) { +func newVolumeOptionsFromVolID(ctx context.Context, volID string, volOpt, secrets map[string]string) (*volumeOptions, *volumeIdentifier, error) { var ( vi util.CSIIdentifier volOptions volumeOptions @@ -201,17 +202,17 @@ func newVolumeOptionsFromVolID(volID string, volOpt, secrets map[string]string) } defer cr.DeleteCredentials() - volOptions.FsName, err = getFsName(volOptions.Monitors, cr, volOptions.FscID) + volOptions.FsName, err = getFsName(ctx, volOptions.Monitors, cr, volOptions.FscID) if err != nil { return nil, nil, err } - volOptions.MetadataPool, err = getMetadataPool(volOptions.Monitors, cr, volOptions.FsName) + volOptions.MetadataPool, err = getMetadataPool(ctx, volOptions.Monitors, cr, volOptions.FsName) if err != nil { return nil, nil, err } - volOptions.RequestName, _, err = volJournal.GetObjectUUIDData(volOptions.Monitors, cr, + volOptions.RequestName, _, err = volJournal.GetObjectUUIDData(ctx, volOptions.Monitors, cr, volOptions.MetadataPool, vi.ObjectUUID, false) if err != nil { return nil, nil, err @@ -227,7 +228,7 @@ func newVolumeOptionsFromVolID(volID string, volOpt, secrets map[string]string) } } - volOptions.RootPath, err = getVolumeRootPathCeph(&volOptions, cr, volumeID(vid.FsSubvolName)) + volOptions.RootPath, err = getVolumeRootPathCeph(ctx, &volOptions, cr, volumeID(vid.FsSubvolName)) if err != nil { return nil, nil, err } diff --git a/pkg/csi-common/controllerserver-default.go b/pkg/csi-common/controllerserver-default.go index 710fda1b3..eecb3a7f2 100644 --- a/pkg/csi-common/controllerserver-default.go +++ b/pkg/csi-common/controllerserver-default.go @@ -19,6 +19,8 @@ package csicommon import ( "context" + "github.com/ceph/ceph-csi/pkg/util" + "github.com/container-storage-interface/spec/lib/go/csi" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -58,7 +60,7 @@ func (cs *DefaultControllerServer) GetCapacity(ctx context.Context, req *csi.Get // ControllerGetCapabilities implements the default GRPC callout. // Default supports all capabilities func (cs *DefaultControllerServer) ControllerGetCapabilities(ctx context.Context, req *csi.ControllerGetCapabilitiesRequest) (*csi.ControllerGetCapabilitiesResponse, error) { - klog.V(5).Infof("Using default ControllerGetCapabilities") + klog.V(5).Infof(util.Log(ctx, "Using default ControllerGetCapabilities")) return &csi.ControllerGetCapabilitiesResponse{ Capabilities: cs.Driver.cap, diff --git a/pkg/csi-common/identityserver-default.go b/pkg/csi-common/identityserver-default.go index 2e4501ed4..cb83595ec 100644 --- a/pkg/csi-common/identityserver-default.go +++ b/pkg/csi-common/identityserver-default.go @@ -19,6 +19,8 @@ package csicommon import ( "context" + "github.com/ceph/ceph-csi/pkg/util" + "github.com/container-storage-interface/spec/lib/go/csi" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -32,7 +34,7 @@ type DefaultIdentityServer struct { // GetPluginInfo returns plugin information func (ids *DefaultIdentityServer) GetPluginInfo(ctx context.Context, req *csi.GetPluginInfoRequest) (*csi.GetPluginInfoResponse, error) { - klog.V(5).Infof("Using default GetPluginInfo") + klog.V(5).Infof(util.Log(ctx, "Using default GetPluginInfo")) if ids.Driver.name == "" { return nil, status.Error(codes.Unavailable, "Driver name not configured") @@ -55,7 +57,7 @@ func (ids *DefaultIdentityServer) Probe(ctx context.Context, req *csi.ProbeReque // GetPluginCapabilities returns plugin capabilities func (ids *DefaultIdentityServer) GetPluginCapabilities(ctx context.Context, req *csi.GetPluginCapabilitiesRequest) (*csi.GetPluginCapabilitiesResponse, error) { - klog.V(5).Infof("Using default capabilities") + klog.V(5).Infof(util.Log(ctx, "Using default capabilities")) return &csi.GetPluginCapabilitiesResponse{ Capabilities: []*csi.PluginCapability{ { diff --git a/pkg/csi-common/nodeserver-default.go b/pkg/csi-common/nodeserver-default.go index 3ef488756..67e792c59 100644 --- a/pkg/csi-common/nodeserver-default.go +++ b/pkg/csi-common/nodeserver-default.go @@ -55,7 +55,7 @@ func (ns *DefaultNodeServer) NodeExpandVolume(ctx context.Context, req *csi.Node // NodeGetInfo returns node ID func (ns *DefaultNodeServer) NodeGetInfo(ctx context.Context, req *csi.NodeGetInfoRequest) (*csi.NodeGetInfoResponse, error) { - klog.V(5).Infof("Using default NodeGetInfo") + klog.V(5).Infof(util.Log(ctx, "Using default NodeGetInfo")) return &csi.NodeGetInfoResponse{ NodeId: ns.Driver.nodeID, @@ -64,7 +64,7 @@ func (ns *DefaultNodeServer) NodeGetInfo(ctx context.Context, req *csi.NodeGetIn // NodeGetCapabilities returns RPC unknow capability func (ns *DefaultNodeServer) NodeGetCapabilities(ctx context.Context, req *csi.NodeGetCapabilitiesRequest) (*csi.NodeGetCapabilitiesResponse, error) { - klog.V(5).Infof("Using default NodeGetCapabilities") + klog.V(5).Infof(util.Log(ctx, "Using default NodeGetCapabilities")) return &csi.NodeGetCapabilitiesResponse{ Capabilities: []*csi.NodeServiceCapability{ @@ -129,31 +129,31 @@ func (ns *DefaultNodeServer) NodeGetVolumeStats(ctx context.Context, req *csi.No available, ok := (*(volMetrics.Available)).AsInt64() if !ok { - klog.Errorf("failed to fetch available bytes") + klog.Errorf(util.Log(ctx, "failed to fetch available bytes")) } capacity, ok := (*(volMetrics.Capacity)).AsInt64() if !ok { - klog.Errorf("failed to fetch capacity bytes") + klog.Errorf(util.Log(ctx, "failed to fetch capacity bytes")) return nil, status.Error(codes.Unknown, "failed to fetch capacity bytes") } used, ok := (*(volMetrics.Used)).AsInt64() if !ok { - klog.Errorf("failed to fetch used bytes") + klog.Errorf(util.Log(ctx, "failed to fetch used bytes")) } inodes, ok := (*(volMetrics.Inodes)).AsInt64() if !ok { - klog.Errorf("failed to fetch available inodes") + klog.Errorf(util.Log(ctx, "failed to fetch available inodes")) return nil, status.Error(codes.Unknown, "failed to fetch available inodes") } inodesFree, ok := (*(volMetrics.InodesFree)).AsInt64() if !ok { - klog.Errorf("failed to fetch free inodes") + klog.Errorf(util.Log(ctx, "failed to fetch free inodes")) } inodesUsed, ok := (*(volMetrics.InodesUsed)).AsInt64() if !ok { - klog.Errorf("failed to fetch used inodes") + klog.Errorf(util.Log(ctx, "failed to fetch used inodes")) } return &csi.NodeGetVolumeStatsResponse{ Usage: []*csi.VolumeUsage{ diff --git a/pkg/rbd/controllerserver.go b/pkg/rbd/controllerserver.go index ba45656fd..316d446de 100644 --- a/pkg/rbd/controllerserver.go +++ b/pkg/rbd/controllerserver.go @@ -150,7 +150,7 @@ func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol } defer func() { if err != nil { - errDefer := undoVolReservation(rbdVol, cr) + errDefer := undoVolReservation(ctx, rbdVol, cr) if errDefer != nil { klog.Warningf(util.Log(ctx, "failed undoing reservation of volume: %s (%s)"), req.GetName(), errDefer) } @@ -326,7 +326,7 @@ func (cs *ControllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol idLk := volumeNameLocker.Lock(rbdVol.RequestName) defer volumeNameLocker.Unlock(idLk, rbdVol.RequestName) - if err := undoVolReservation(rbdVol, cr); err != nil { + if err := undoVolReservation(ctx, rbdVol, cr); err != nil { return nil, status.Error(codes.Internal, err.Error()) } return &csi.DeleteVolumeResponse{}, nil @@ -345,7 +345,7 @@ func (cs *ControllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol return nil, status.Error(codes.Internal, err.Error()) } - if err := undoVolReservation(rbdVol, cr); err != nil { + if err := undoVolReservation(ctx, rbdVol, cr); err != nil { klog.Errorf(util.Log(ctx, "failed to remove reservation for volume (%s) with backing image (%s) (%s)"), rbdVol.RequestName, rbdVol.RbdImageName, err) return nil, status.Error(codes.Internal, err.Error()) @@ -444,7 +444,7 @@ func (cs *ControllerServer) CreateSnapshot(ctx context.Context, req *csi.CreateS } defer func() { if err != nil { - errDefer := undoSnapReservation(rbdSnap, cr) + errDefer := undoSnapReservation(ctx, rbdSnap, cr) if errDefer != nil { klog.Warningf(util.Log(ctx, "failed undoing reservation of snapshot: %s %v"), req.GetName(), errDefer) } @@ -568,7 +568,7 @@ func (cs *ControllerServer) DeleteSnapshot(ctx context.Context, req *csi.DeleteS idLk := snapshotNameLocker.Lock(rbdSnap.RequestName) defer snapshotNameLocker.Unlock(idLk, rbdSnap.RequestName) - if err = undoSnapReservation(rbdSnap, cr); err != nil { + if err = undoSnapReservation(ctx, rbdSnap, cr); err != nil { return nil, status.Error(codes.Internal, err.Error()) } return &csi.DeleteSnapshotResponse{}, nil diff --git a/pkg/rbd/rbd_journal.go b/pkg/rbd/rbd_journal.go index fcd350a00..46697ffff 100644 --- a/pkg/rbd/rbd_journal.go +++ b/pkg/rbd/rbd_journal.go @@ -114,7 +114,7 @@ func checkSnapExists(ctx context.Context, rbdSnap *rbdSnapshot, cr *util.Credent return false, err } - snapUUID, err := snapJournal.CheckReservation(rbdSnap.Monitors, cr, rbdSnap.Pool, + snapUUID, err := snapJournal.CheckReservation(ctx, rbdSnap.Monitors, cr, rbdSnap.Pool, rbdSnap.RequestName, rbdSnap.RbdImageName) if err != nil { return false, err @@ -128,7 +128,7 @@ func checkSnapExists(ctx context.Context, rbdSnap *rbdSnapshot, cr *util.Credent err = updateSnapWithImageInfo(ctx, rbdSnap, cr) if err != nil { if _, ok := err.(ErrSnapNotFound); ok { - err = snapJournal.UndoReservation(rbdSnap.Monitors, cr, rbdSnap.Pool, + err = snapJournal.UndoReservation(ctx, rbdSnap.Monitors, cr, rbdSnap.Pool, rbdSnap.RbdSnapName, rbdSnap.RequestName) return false, err } @@ -136,7 +136,7 @@ func checkSnapExists(ctx context.Context, rbdSnap *rbdSnapshot, cr *util.Credent } // found a snapshot already available, process and return its information - rbdSnap.SnapID, err = util.GenerateVolID(rbdSnap.Monitors, cr, rbdSnap.Pool, + rbdSnap.SnapID, err = util.GenerateVolID(ctx, rbdSnap.Monitors, cr, rbdSnap.Pool, rbdSnap.ClusterID, snapUUID, volIDVersion) if err != nil { return false, err @@ -162,7 +162,7 @@ func checkVolExists(ctx context.Context, rbdVol *rbdVolume, cr *util.Credentials return false, err } - imageUUID, err := volJournal.CheckReservation(rbdVol.Monitors, cr, rbdVol.Pool, + imageUUID, err := volJournal.CheckReservation(ctx, rbdVol.Monitors, cr, rbdVol.Pool, rbdVol.RequestName, "") if err != nil { return false, err @@ -179,7 +179,7 @@ func checkVolExists(ctx context.Context, rbdVol *rbdVolume, cr *util.Credentials err = updateVolWithImageInfo(ctx, rbdVol, cr) if err != nil { if _, ok := err.(ErrImageNotFound); ok { - err = volJournal.UndoReservation(rbdVol.Monitors, cr, rbdVol.Pool, + err = volJournal.UndoReservation(ctx, rbdVol.Monitors, cr, rbdVol.Pool, rbdVol.RbdImageName, rbdVol.RequestName) return false, err } @@ -195,7 +195,7 @@ func checkVolExists(ctx context.Context, rbdVol *rbdVolume, cr *util.Credentials // TODO: We should also ensure image features and format is the same // found a volume already available, process and return it! - rbdVol.VolID, err = util.GenerateVolID(rbdVol.Monitors, cr, rbdVol.Pool, + rbdVol.VolID, err = util.GenerateVolID(ctx, rbdVol.Monitors, cr, rbdVol.Pool, rbdVol.ClusterID, imageUUID, volIDVersion) if err != nil { return false, err @@ -210,13 +210,13 @@ func checkVolExists(ctx context.Context, rbdVol *rbdVolume, cr *util.Credentials // reserveSnap is a helper routine to request a rbdSnapshot name reservation and generate the // volume ID for the generated name func reserveSnap(ctx context.Context, rbdSnap *rbdSnapshot, cr *util.Credentials) error { - snapUUID, err := snapJournal.ReserveName(rbdSnap.Monitors, cr, rbdSnap.Pool, + snapUUID, err := snapJournal.ReserveName(ctx, rbdSnap.Monitors, cr, rbdSnap.Pool, rbdSnap.RequestName, rbdSnap.RbdImageName) if err != nil { return err } - rbdSnap.SnapID, err = util.GenerateVolID(rbdSnap.Monitors, cr, rbdSnap.Pool, + rbdSnap.SnapID, err = util.GenerateVolID(ctx, rbdSnap.Monitors, cr, rbdSnap.Pool, rbdSnap.ClusterID, snapUUID, volIDVersion) if err != nil { return err @@ -233,13 +233,13 @@ func reserveSnap(ctx context.Context, rbdSnap *rbdSnapshot, cr *util.Credentials // reserveVol is a helper routine to request a rbdVolume name reservation and generate the // volume ID for the generated name func reserveVol(ctx context.Context, rbdVol *rbdVolume, cr *util.Credentials) error { - imageUUID, err := volJournal.ReserveName(rbdVol.Monitors, cr, rbdVol.Pool, + imageUUID, err := volJournal.ReserveName(ctx, rbdVol.Monitors, cr, rbdVol.Pool, rbdVol.RequestName, "") if err != nil { return err } - rbdVol.VolID, err = util.GenerateVolID(rbdVol.Monitors, cr, rbdVol.Pool, + rbdVol.VolID, err = util.GenerateVolID(ctx, rbdVol.Monitors, cr, rbdVol.Pool, rbdVol.ClusterID, imageUUID, volIDVersion) if err != nil { return err @@ -254,16 +254,16 @@ func reserveVol(ctx context.Context, rbdVol *rbdVolume, cr *util.Credentials) er } // undoSnapReservation is a helper routine to undo a name reservation for rbdSnapshot -func undoSnapReservation(rbdSnap *rbdSnapshot, cr *util.Credentials) error { - err := snapJournal.UndoReservation(rbdSnap.Monitors, cr, rbdSnap.Pool, +func undoSnapReservation(ctx context.Context, rbdSnap *rbdSnapshot, cr *util.Credentials) error { + err := snapJournal.UndoReservation(ctx, rbdSnap.Monitors, cr, rbdSnap.Pool, rbdSnap.RbdSnapName, rbdSnap.RequestName) return err } // undoVolReservation is a helper routine to undo a name reservation for rbdVolume -func undoVolReservation(rbdVol *rbdVolume, cr *util.Credentials) error { - err := volJournal.UndoReservation(rbdVol.Monitors, cr, rbdVol.Pool, +func undoVolReservation(ctx context.Context, rbdVol *rbdVolume, cr *util.Credentials) error { + err := volJournal.UndoReservation(ctx, rbdVol.Monitors, cr, rbdVol.Pool, rbdVol.RbdImageName, rbdVol.RequestName) return err diff --git a/pkg/rbd/rbd_util.go b/pkg/rbd/rbd_util.go index 0db9f1f0a..011f1345a 100644 --- a/pkg/rbd/rbd_util.go +++ b/pkg/rbd/rbd_util.go @@ -298,12 +298,12 @@ func genSnapFromSnapID(ctx context.Context, rbdSnap *rbdSnapshot, snapshotID str return err } - rbdSnap.Pool, err = util.GetPoolName(rbdSnap.Monitors, cr, vi.LocationID) + rbdSnap.Pool, err = util.GetPoolName(ctx, rbdSnap.Monitors, cr, vi.LocationID) if err != nil { return err } - rbdSnap.RequestName, rbdSnap.RbdImageName, err = snapJournal.GetObjectUUIDData(rbdSnap.Monitors, + rbdSnap.RequestName, rbdSnap.RbdImageName, err = snapJournal.GetObjectUUIDData(ctx, rbdSnap.Monitors, cr, rbdSnap.Pool, vi.ObjectUUID, true) if err != nil { return err @@ -342,12 +342,12 @@ func genVolFromVolID(ctx context.Context, rbdVol *rbdVolume, volumeID string, cr return err } - rbdVol.Pool, err = util.GetPoolName(rbdVol.Monitors, cr, vi.LocationID) + rbdVol.Pool, err = util.GetPoolName(ctx, rbdVol.Monitors, cr, vi.LocationID) if err != nil { return err } - rbdVol.RequestName, _, err = volJournal.GetObjectUUIDData(rbdVol.Monitors, cr, + rbdVol.RequestName, _, err = volJournal.GetObjectUUIDData(ctx, rbdVol.Monitors, cr, rbdVol.Pool, vi.ObjectUUID, false) if err != nil { return err @@ -594,7 +594,7 @@ func deleteSnapshot(ctx context.Context, pOpts *rbdSnapshot, cr *util.Credential return errors.Wrapf(err, "failed to delete snapshot, command output: %s", string(output)) } - if err := undoSnapReservation(pOpts, cr); err != nil { + if err := undoSnapReservation(ctx, pOpts, cr); err != nil { klog.Errorf(util.Log(ctx, "failed to remove reservation for snapname (%s) with backing snap (%s) on image (%s) (%s)"), pOpts.RequestName, pOpts.RbdSnapName, pOpts.RbdImageName, err) } diff --git a/pkg/util/cephcmds.go b/pkg/util/cephcmds.go index aabc15196..034f95619 100644 --- a/pkg/util/cephcmds.go +++ b/pkg/util/cephcmds.go @@ -18,6 +18,7 @@ package util import ( "bytes" + "context" "encoding/json" "fmt" "io/ioutil" @@ -55,7 +56,7 @@ type cephStoragePoolSummary struct { } // GetPools fetches a list of pools from a cluster -func getPools(monitors string, cr *Credentials) ([]cephStoragePoolSummary, error) { +func getPools(ctx context.Context, monitors string, cr *Credentials) ([]cephStoragePoolSummary, error) { // ceph -f json osd lspools // JSON out: [{"poolnum":,"poolname":}] @@ -68,14 +69,14 @@ func getPools(monitors string, cr *Credentials) ([]cephStoragePoolSummary, error "-f", "json", "osd", "lspools") if err != nil { - klog.Errorf("failed getting pool list from cluster (%s)", err) + klog.Errorf(Log(ctx, "failed getting pool list from cluster (%s)"), err) return nil, err } var pools []cephStoragePoolSummary err = json.Unmarshal(stdout, &pools) if err != nil { - klog.Errorf("failed to parse JSON output of pool list from cluster (%s)", err) + klog.Errorf(Log(ctx, "failed to parse JSON output of pool list from cluster (%s)"), err) return nil, fmt.Errorf("unmarshal of pool list failed: %+v. raw buffer response: %s", err, string(stdout)) } @@ -84,8 +85,8 @@ func getPools(monitors string, cr *Credentials) ([]cephStoragePoolSummary, error // GetPoolID searches a list of pools in a cluster and returns the ID of the pool that matches // the passed in poolName parameter -func GetPoolID(monitors string, cr *Credentials, poolName string) (int64, error) { - pools, err := getPools(monitors, cr) +func GetPoolID(ctx context.Context, monitors string, cr *Credentials, poolName string) (int64, error) { + pools, err := getPools(ctx, monitors, cr) if err != nil { return 0, err } @@ -101,8 +102,8 @@ func GetPoolID(monitors string, cr *Credentials, poolName string) (int64, error) // GetPoolName lists all pools in a ceph cluster, and matches the pool whose pool ID is equal to // the requested poolID parameter -func GetPoolName(monitors string, cr *Credentials, poolID int64) (string, error) { - pools, err := getPools(monitors, cr) +func GetPoolName(ctx context.Context, monitors string, cr *Credentials, poolID int64) (string, error) { + pools, err := getPools(ctx, monitors, cr) if err != nil { return "", err } @@ -117,7 +118,7 @@ func GetPoolName(monitors string, cr *Credentials, poolID int64) (string, error) } // SetOMapKeyValue sets the given key and value into the provided Ceph omap name -func SetOMapKeyValue(monitors string, cr *Credentials, poolName, namespace, oMapName, oMapKey, keyValue string) error { +func SetOMapKeyValue(ctx context.Context, monitors string, cr *Credentials, poolName, namespace, oMapName, oMapKey, keyValue string) error { // Command: "rados setomapval oMapName oMapKey keyValue" args := []string{ "-m", monitors, @@ -134,8 +135,8 @@ func SetOMapKeyValue(monitors string, cr *Credentials, poolName, namespace, oMap _, _, err := ExecCommand("rados", args[:]...) if err != nil { - klog.Errorf("failed adding key (%s with value %s), to omap (%s) in "+ - "pool (%s): (%v)", oMapKey, keyValue, oMapName, poolName, err) + klog.Errorf(Log(ctx, "failed adding key (%s with value %s), to omap (%s) in "+ + "pool (%s): (%v)"), oMapKey, keyValue, oMapName, poolName, err) return err } @@ -143,12 +144,12 @@ func SetOMapKeyValue(monitors string, cr *Credentials, poolName, namespace, oMap } // GetOMapValue gets the value for the given key from the named omap -func GetOMapValue(monitors string, cr *Credentials, poolName, namespace, oMapName, oMapKey string) (string, error) { +func GetOMapValue(ctx context.Context, monitors string, cr *Credentials, poolName, namespace, oMapName, oMapKey string) (string, error) { // Command: "rados getomapval oMapName oMapKey " // No such key: replicapool/csi.volumes.directory.default/csi.volname tmpFile, err := ioutil.TempFile("", "omap-get-") if err != nil { - klog.Errorf("failed creating a temporary file for key contents") + klog.Errorf(Log(ctx, "failed creating a temporary file for key contents")) return "", err } defer tmpFile.Close() @@ -182,7 +183,7 @@ func GetOMapValue(monitors string, cr *Credentials, poolName, namespace, oMapNam } // log other errors for troubleshooting assistance - klog.Errorf("failed getting omap value for key (%s) from omap (%s) in pool (%s): (%v)", + klog.Errorf(Log(ctx, "failed getting omap value for key (%s) from omap (%s) in pool (%s): (%v)"), oMapKey, oMapName, poolName, err) return "", fmt.Errorf("error (%v) occurred, command output streams is (%s)", @@ -194,7 +195,7 @@ func GetOMapValue(monitors string, cr *Credentials, poolName, namespace, oMapNam } // RemoveOMapKey removes the omap key from the given omap name -func RemoveOMapKey(monitors string, cr *Credentials, poolName, namespace, oMapName, oMapKey string) error { +func RemoveOMapKey(ctx context.Context, monitors string, cr *Credentials, poolName, namespace, oMapName, oMapKey string) error { // Command: "rados rmomapkey oMapName oMapKey" args := []string{ "-m", monitors, @@ -212,8 +213,8 @@ func RemoveOMapKey(monitors string, cr *Credentials, poolName, namespace, oMapNa _, _, err := ExecCommand("rados", args[:]...) if err != nil { // NOTE: Missing omap key removal does not return an error - klog.Errorf("failed removing key (%s), from omap (%s) in "+ - "pool (%s): (%v)", oMapKey, oMapName, poolName, err) + klog.Errorf(Log(ctx, "failed removing key (%s), from omap (%s) in "+ + "pool (%s): (%v)"), oMapKey, oMapName, poolName, err) return err } @@ -222,7 +223,7 @@ func RemoveOMapKey(monitors string, cr *Credentials, poolName, namespace, oMapNa // CreateObject creates the object name passed in and returns ErrObjectExists if the provided object // is already present in rados -func CreateObject(monitors string, cr *Credentials, poolName, namespace, objectName string) error { +func CreateObject(ctx context.Context, monitors string, cr *Credentials, poolName, namespace, objectName string) error { // Command: "rados create objectName" args := []string{ "-m", monitors, @@ -239,7 +240,7 @@ func CreateObject(monitors string, cr *Credentials, poolName, namespace, objectN _, stderr, err := ExecCommand("rados", args[:]...) if err != nil { - klog.Errorf("failed creating omap (%s) in pool (%s): (%v)", objectName, poolName, err) + klog.Errorf(Log(ctx, "failed creating omap (%s) in pool (%s): (%v)"), objectName, poolName, err) if strings.Contains(string(stderr), "error creating "+poolName+"/"+objectName+ ": (17) File exists") { return ErrObjectExists{objectName, err} @@ -252,7 +253,7 @@ func CreateObject(monitors string, cr *Credentials, poolName, namespace, objectN // RemoveObject removes the entire omap name passed in and returns ErrObjectNotFound is provided omap // is not found in rados -func RemoveObject(monitors string, cr *Credentials, poolName, namespace, oMapName string) error { +func RemoveObject(ctx context.Context, monitors string, cr *Credentials, poolName, namespace, oMapName string) error { // Command: "rados rm oMapName" args := []string{ "-m", monitors, @@ -269,7 +270,7 @@ func RemoveObject(monitors string, cr *Credentials, poolName, namespace, oMapNam _, stderr, err := ExecCommand("rados", args[:]...) if err != nil { - klog.Errorf("failed removing omap (%s) in pool (%s): (%v)", oMapName, poolName, err) + klog.Errorf(Log(ctx, "failed removing omap (%s) in pool (%s): (%v)"), oMapName, poolName, err) if strings.Contains(string(stderr), "error removing "+poolName+">"+oMapName+ ": (2) No such file or directory") { return ErrObjectNotFound{oMapName, err} diff --git a/pkg/util/log.go b/pkg/util/log.go index bc1d2e779..21d2853cc 100644 --- a/pkg/util/log.go +++ b/pkg/util/log.go @@ -1,12 +1,9 @@ /* Copyright 2019 The Ceph-CSI Authors. - Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 - Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -28,6 +25,10 @@ var Key = contextKey("ID") // Log helps in context based logging func Log(ctx context.Context, format string) string { - a := fmt.Sprintf("ID: %v ", ctx.Value(Key)) + id := ctx.Value(Key) + if id == nil { + return format + } + a := fmt.Sprintf("ID: %v ", id) return a + format } diff --git a/pkg/util/util.go b/pkg/util/util.go index 69549003f..0ab6e7521 100644 --- a/pkg/util/util.go +++ b/pkg/util/util.go @@ -17,6 +17,7 @@ limitations under the License. package util import ( + "context" "os" "path" "strings" @@ -131,8 +132,8 @@ func ValidateDriverName(driverName string) error { // GenerateVolID generates a volume ID based on passed in parameters and version, to be returned // to the CO system -func GenerateVolID(monitors string, cr *Credentials, pool, clusterID, objUUID string, volIDVersion uint16) (string, error) { - poolID, err := GetPoolID(monitors, cr, pool) +func GenerateVolID(ctx context.Context, monitors string, cr *Credentials, pool, clusterID, objUUID string, volIDVersion uint16) (string, error) { + poolID, err := GetPoolID(ctx, monitors, cr, pool) if err != nil { return "", err } diff --git a/pkg/util/voljournal.go b/pkg/util/voljournal.go index 197192fd5..81f15dadb 100644 --- a/pkg/util/voljournal.go +++ b/pkg/util/voljournal.go @@ -17,6 +17,7 @@ limitations under the License. package util import ( + "context" "fmt" "strings" @@ -175,7 +176,7 @@ Return values: there was no reservation found - error: non-nil in case of any errors */ -func (cj *CSIJournal) CheckReservation(monitors string, cr *Credentials, pool, reqName, parentName string) (string, error) { +func (cj *CSIJournal) CheckReservation(ctx context.Context, monitors string, cr *Credentials, pool, reqName, parentName string) (string, error) { var snapSource bool if parentName != "" { @@ -187,7 +188,7 @@ func (cj *CSIJournal) CheckReservation(monitors string, cr *Credentials, pool, r } // check if request name is already part of the directory omap - objUUID, err := GetOMapValue(monitors, cr, pool, cj.namespace, cj.csiDirectory, + objUUID, err := GetOMapValue(ctx, monitors, cr, pool, cj.namespace, cj.csiDirectory, cj.csiNameKeyPrefix+reqName) if err != nil { // error should specifically be not found, for volume to be absent, any other error @@ -198,13 +199,13 @@ func (cj *CSIJournal) CheckReservation(monitors string, cr *Credentials, pool, r return "", err } - savedReqName, savedReqParentName, err := cj.GetObjectUUIDData(monitors, cr, pool, + savedReqName, savedReqParentName, err := cj.GetObjectUUIDData(ctx, monitors, cr, pool, objUUID, snapSource) if err != nil { // error should specifically be not found, for image to be absent, any other error // is not conclusive, and we should not proceed if _, ok := err.(ErrKeyNotFound); ok { - err = cj.UndoReservation(monitors, cr, pool, cj.namingPrefix+objUUID, reqName) + err = cj.UndoReservation(ctx, monitors, cr, pool, cj.namingPrefix+objUUID, reqName) } return "", err } @@ -243,23 +244,23 @@ prior to cleaning up the reservation NOTE: As the function manipulates omaps, it should be called with a lock against the request name held, to prevent parallel operations from modifying the state of the omaps for this request name. */ -func (cj *CSIJournal) UndoReservation(monitors string, cr *Credentials, pool, volName, reqName string) error { +func (cj *CSIJournal) UndoReservation(ctx context.Context, monitors string, cr *Credentials, pool, volName, reqName string) error { // delete volume UUID omap (first, inverse of create order) // TODO: Check cases where volName can be empty, and we need to just cleanup the reqName imageUUID := strings.TrimPrefix(volName, cj.namingPrefix) - err := RemoveObject(monitors, cr, pool, cj.namespace, cj.cephUUIDDirectoryPrefix+imageUUID) + err := RemoveObject(ctx, monitors, cr, pool, cj.namespace, cj.cephUUIDDirectoryPrefix+imageUUID) if err != nil { if _, ok := err.(ErrObjectNotFound); !ok { - klog.Errorf("failed removing oMap %s (%s)", cj.cephUUIDDirectoryPrefix+imageUUID, err) + klog.Errorf(Log(ctx, "failed removing oMap %s (%s)"), cj.cephUUIDDirectoryPrefix+imageUUID, err) return err } } // delete the request name key (last, inverse of create order) - err = RemoveOMapKey(monitors, cr, pool, cj.namespace, cj.csiDirectory, + err = RemoveOMapKey(ctx, monitors, cr, pool, cj.namespace, cj.csiDirectory, cj.csiNameKeyPrefix+reqName) if err != nil { - klog.Errorf("failed removing oMap key %s (%s)", cj.csiNameKeyPrefix+reqName, err) + klog.Errorf(Log(ctx, "failed removing oMap key %s (%s)"), cj.csiNameKeyPrefix+reqName, err) return err } @@ -269,7 +270,7 @@ func (cj *CSIJournal) UndoReservation(monitors string, cr *Credentials, pool, vo // reserveOMapName creates an omap with passed in oMapNamePrefix and a generated . // It ensures generated omap name does not already exist and if conflicts are detected, a set // number of retires with newer uuids are attempted before returning an error -func reserveOMapName(monitors string, cr *Credentials, pool, namespace, oMapNamePrefix string) (string, error) { +func reserveOMapName(ctx context.Context, monitors string, cr *Credentials, pool, namespace, oMapNamePrefix string) (string, error) { var iterUUID string maxAttempts := 5 @@ -278,12 +279,12 @@ func reserveOMapName(monitors string, cr *Credentials, pool, namespace, oMapName // generate a uuid for the image name iterUUID = uuid.NewUUID().String() - err := CreateObject(monitors, cr, pool, namespace, oMapNamePrefix+iterUUID) + err := CreateObject(ctx, monitors, cr, pool, namespace, oMapNamePrefix+iterUUID) if err != nil { if _, ok := err.(ErrObjectExists); ok { attempt++ // try again with a different uuid, for maxAttempts tries - klog.V(4).Infof("uuid (%s) conflict detected, retrying (attempt %d of %d)", + klog.V(4).Infof(Log(ctx, "uuid (%s) conflict detected, retrying (attempt %d of %d)"), iterUUID, attempt, maxAttempts) continue } @@ -310,7 +311,7 @@ Return values: - string: Contains the UUID that was reserved for the passed in reqName - error: non-nil in case of any errors */ -func (cj *CSIJournal) ReserveName(monitors string, cr *Credentials, pool, reqName, parentName string) (string, error) { +func (cj *CSIJournal) ReserveName(ctx context.Context, monitors string, cr *Credentials, pool, reqName, parentName string) (string, error) { var snapSource bool if parentName != "" { @@ -325,31 +326,31 @@ func (cj *CSIJournal) ReserveName(monitors string, cr *Credentials, pool, reqNam // NOTE: If any service loss occurs post creation of the UUID directory, and before // setting the request name key (csiNameKey) to point back to the UUID directory, the // UUID directory key will be leaked - volUUID, err := reserveOMapName(monitors, cr, pool, cj.namespace, cj.cephUUIDDirectoryPrefix) + volUUID, err := reserveOMapName(ctx, monitors, cr, pool, cj.namespace, cj.cephUUIDDirectoryPrefix) if err != nil { return "", err } // Create request name (csiNameKey) key in csiDirectory and store the UUId based // volume name into it - err = SetOMapKeyValue(monitors, cr, pool, cj.namespace, cj.csiDirectory, + err = SetOMapKeyValue(ctx, monitors, cr, pool, cj.namespace, cj.csiDirectory, cj.csiNameKeyPrefix+reqName, volUUID) if err != nil { return "", err } defer func() { if err != nil { - klog.Warningf("reservation failed for volume: %s", reqName) - errDefer := cj.UndoReservation(monitors, cr, pool, cj.namingPrefix+volUUID, + klog.Warningf(Log(ctx, "reservation failed for volume: %s"), reqName) + errDefer := cj.UndoReservation(ctx, monitors, cr, pool, cj.namingPrefix+volUUID, reqName) if errDefer != nil { - klog.Warningf("failed undoing reservation of volume: %s (%v)", reqName, errDefer) + klog.Warningf(Log(ctx, "failed undoing reservation of volume: %s (%v)"), reqName, errDefer) } } }() // Update UUID directory to store CSI request name - err = SetOMapKeyValue(monitors, cr, pool, cj.namespace, cj.cephUUIDDirectoryPrefix+volUUID, + err = SetOMapKeyValue(ctx, monitors, cr, pool, cj.namespace, cj.cephUUIDDirectoryPrefix+volUUID, cj.csiNameKey, reqName) if err != nil { return "", err @@ -357,7 +358,7 @@ func (cj *CSIJournal) ReserveName(monitors string, cr *Credentials, pool, reqNam if snapSource { // Update UUID directory to store source volume UUID in case of snapshots - err = SetOMapKeyValue(monitors, cr, pool, cj.namespace, cj.cephUUIDDirectoryPrefix+volUUID, + err = SetOMapKeyValue(ctx, monitors, cr, pool, cj.namespace, cj.cephUUIDDirectoryPrefix+volUUID, cj.cephSnapSourceKey, parentName) if err != nil { return "", err @@ -374,7 +375,7 @@ Return values: - string: Contains the parent image name for the passed in UUID, if it is a snapshot - error: non-nil in case of any errors */ -func (cj *CSIJournal) GetObjectUUIDData(monitors string, cr *Credentials, pool, objectUUID string, snapSource bool) (string, string, error) { +func (cj *CSIJournal) GetObjectUUIDData(ctx context.Context, monitors string, cr *Credentials, pool, objectUUID string, snapSource bool) (string, string, error) { var sourceName string if snapSource && cj.cephSnapSourceKey == "" { @@ -383,14 +384,14 @@ func (cj *CSIJournal) GetObjectUUIDData(monitors string, cr *Credentials, pool, } // TODO: fetch all omap vals in one call, than make multiple listomapvals - requestName, err := GetOMapValue(monitors, cr, pool, cj.namespace, + requestName, err := GetOMapValue(ctx, monitors, cr, pool, cj.namespace, cj.cephUUIDDirectoryPrefix+objectUUID, cj.csiNameKey) if err != nil { return "", "", err } if snapSource { - sourceName, err = GetOMapValue(monitors, cr, pool, cj.namespace, + sourceName, err = GetOMapValue(ctx, monitors, cr, pool, cj.namespace, cj.cephUUIDDirectoryPrefix+objectUUID, cj.cephSnapSourceKey) if err != nil { return "", "", err