e2e: handle ceph-csi-operator deployment changes

This commit adds e2e/operator.go containing utility
methods specific to the operator.

Signed-off-by: Praveen M <m.praveen@ibm.com>
This commit is contained in:
Praveen M 2024-10-23 13:40:52 +05:30 committed by mergify[bot]
parent 954f36520b
commit 0f6b93ed17
13 changed files with 492 additions and 185 deletions

View File

@ -46,8 +46,19 @@ var (
subvolumegroup = "e2e"
fileSystemName = "myfs"
fileSystemPoolName = "myfs-replicated"
helmCephFSPodsLabel = "ceph-csi-cephfs"
operatorCephFSDeploymentName = "cephfs.csi.ceph.com-ctrlplugin"
operatorCephFSDaemonsetName = "cephfs.csi.ceph.com-nodeplugin"
cephFSDeployment CephFSDeploymentMethod
)
// CephFSDeployment implements the CephFS deployment-method interface for the
// classic (non-operator, non-helm) driver deployment created from the e2e
// manifests; all behavior comes from the embedded DriverInfo.
type CephFSDeployment struct {
	DriverInfo
}
func deployCephfsPlugin() {
// delete objects deployed by rook
@ -165,6 +176,18 @@ func validateSubvolumePath(f *framework.Framework, pvcName, pvcNamespace, fileSy
return nil
}
// NewCephFSDeployment returns a CephFSDeploymentMethod backed by the standard
// (non-operator) CephFS provisioner deployment and node-plugin daemonset names.
func NewCephFSDeployment(c clientset.Interface) CephFSDeploymentMethod {
	info := DriverInfo{
		clientSet:        c,
		deploymentName:   cephFSDeploymentName,
		daemonsetName:    cephFSDeamonSetName,
		helmPodLabelName: helmCephFSPodsLabel,
		driverContainers: []string{cephFSContainerName},
	}

	return &CephFSDeployment{DriverInfo: info}
}
var _ = Describe(cephfsType, func() {
f := framework.NewDefaultFramework(cephfsType)
f.NamespacePodSecurityEnforceLevel = api.LevelPrivileged
@ -175,13 +198,20 @@ var _ = Describe(cephfsType, func() {
Skip("Skipping CephFS E2E")
}
c = f.ClientSet
if deployCephFS {
if cephCSINamespace != defaultNs {
err := createNamespace(c, cephCSINamespace)
if err != nil {
framework.Failf("failed to create namespace %s: %v", cephCSINamespace, err)
}
cephFSDeployment = NewCephFSDeployment(c)
if operatorDeployment {
cephFSDeployment = NewCephFSOperatorDeployment(c)
}
// No need to create the namespace if ceph-csi is deployed via helm or operator.
if cephCSINamespace != defaultNs && !(helmTest || operatorDeployment) {
err := createNamespace(c, cephCSINamespace)
if err != nil {
framework.Failf("failed to create namespace %s: %v", cephCSINamespace, err)
}
}
if deployCephFS {
deployCephfsPlugin()
}
err := createConfigMap(cephFSDirPath, f.ClientSet, f)
@ -208,12 +238,9 @@ var _ = Describe(cephfsType, func() {
}
deployVault(f.ClientSet, deployTimeout)
// wait for cluster name update in deployment
containers := []string{cephFSContainerName}
err = waitForContainersArgsUpdate(c, cephCSINamespace, cephFSDeploymentName,
"clustername", defaultClusterName, containers, deployTimeout)
err = cephFSDeployment.setClusterName(defaultClusterName)
if err != nil {
framework.Failf("timeout waiting for deployment update %s/%s: %v", cephCSINamespace, cephFSDeploymentName, err)
framework.Failf("failed to set cluster name: %v", err)
}
err = createSubvolumegroup(f, fileSystemName, subvolumegroup)
@ -226,13 +253,14 @@ var _ = Describe(cephfsType, func() {
if !testCephFS || upgradeTesting {
Skip("Skipping CephFS E2E")
}
if CurrentSpecReport().Failed() {
// log pods created by helm chart
logsCSIPods("app=ceph-csi-cephfs", c)
logsCSIPods("app="+helmCephFSPodsLabel, c)
// log provisioner
logsCSIPods("app=csi-cephfsplugin-provisioner", c)
logsCSIPods("app="+cephFSDeployment.getDeploymentName(), c)
// log node plugin
logsCSIPods("app=csi-cephfsplugin", c)
logsCSIPods("app="+cephFSDeployment.getDaemonsetName(), c)
// log all details from the namespace where Ceph-CSI is deployed
e2edebug.DumpAllNamespaceInfo(context.TODO(), c, cephCSINamespace)
@ -266,11 +294,12 @@ var _ = Describe(cephfsType, func() {
if deployCephFS {
deleteCephfsPlugin()
if cephCSINamespace != defaultNs {
err = deleteNamespace(c, cephCSINamespace)
if err != nil {
framework.Failf("failed to delete namespace %s: %v", cephCSINamespace, err)
}
}
// No need to delete the namespace if ceph-csi is deployed via helm or operator.
if cephCSINamespace != defaultNs && !(helmTest || operatorDeployment) {
err = deleteNamespace(c, cephCSINamespace)
if err != nil {
framework.Failf("failed to delete namespace %s: %v", cephCSINamespace, err)
}
}
})
@ -299,16 +328,16 @@ var _ = Describe(cephfsType, func() {
}
By("checking provisioner deployment is running", func() {
err := waitForDeploymentComplete(f.ClientSet, cephFSDeploymentName, cephCSINamespace, deployTimeout)
err := waitForDeploymentComplete(f.ClientSet, cephFSDeployment.getDeploymentName(), cephCSINamespace, deployTimeout)
if err != nil {
framework.Failf("timeout waiting for deployment %s: %v", cephFSDeploymentName, err)
framework.Failf("timeout waiting for deployment %s: %v", cephFSDeployment.getDeploymentName(), err)
}
})
By("checking nodeplugin daemonset pods are running", func() {
err := waitForDaemonSets(cephFSDeamonSetName, cephCSINamespace, f.ClientSet, deployTimeout)
err := waitForDaemonSets(cephFSDeployment.getDaemonsetName(), cephCSINamespace, f.ClientSet, deployTimeout)
if err != nil {
framework.Failf("timeout waiting for daemonset %s: %v", cephFSDeamonSetName, err)
framework.Failf("timeout waiting for daemonset %s: %v", cephFSDeployment.getDaemonsetName(), err)
}
})
@ -338,7 +367,7 @@ var _ = Describe(cephfsType, func() {
}
err = verifySeLinuxMountOption(f, pvcPath, appPath,
cephFSDeamonSetName, cephFSContainerName, cephCSINamespace)
cephFSDeployment.getDaemonsetName(), cephFSContainerName, cephCSINamespace)
if err != nil {
framework.Failf("failed to verify mount options: %v", err)
}
@ -764,7 +793,7 @@ var _ = Describe(cephfsType, func() {
}
}
// Kill ceph-fuse in cephfs-csi node plugin Pods.
nodePluginSelector, err := getDaemonSetLabelSelector(f, cephCSINamespace, cephFSDeamonSetName)
nodePluginSelector, err := getDaemonSetLabelSelector(f, cephCSINamespace, cephFSDeployment.getDaemonsetName())
if err != nil {
framework.Failf("failed to get node plugin DaemonSet label selector: %v", err)
}
@ -2498,20 +2527,11 @@ var _ = Describe(cephfsType, func() {
framework.Failf("failed to create configmap: %v", err)
}
// delete csi pods
err = deletePodWithLabel("app in (ceph-csi-cephfs, csi-cephfsplugin, csi-cephfsplugin-provisioner)",
cephCSINamespace, false)
// restart csi pods for the configmap to take effect.
err = recreateCSIPods(f, cephFSDeployment.getPodSelector(),
cephFSDeployment.getDaemonsetName(), cephFSDeployment.getDeploymentName())
if err != nil {
framework.Failf("failed to delete pods with labels: %v", err)
}
// wait for csi pods to come up
err = waitForDaemonSets(cephFSDeamonSetName, cephCSINamespace, f.ClientSet, deployTimeout)
if err != nil {
framework.Failf("timeout waiting for daemonset pods: %v", err)
}
err = waitForDeploymentComplete(f.ClientSet, cephFSDeploymentName, cephCSINamespace, deployTimeout)
if err != nil {
framework.Failf("timeout waiting for deployment pods: %v", err)
framework.Failf("failed to recreate cephfs csi pods: %v", err)
}
}

View File

@ -161,7 +161,7 @@ func unmountCephFSVolume(f *framework.Framework, appName, pvcName string) error
stdErr, err := execCommandInDaemonsetPod(
f,
cmd,
cephFSDeamonSetName,
cephFSDeployment.getDaemonsetName(),
pod.Spec.NodeName,
cephFSContainerName,
cephCSINamespace)
@ -396,7 +396,7 @@ func validateEncryptedCephfs(f *framework.Framework, pvName, appName string) err
pod.UID,
pvName)
selector, err := getDaemonSetLabelSelector(f, cephCSINamespace, cephFSDeamonSetName)
selector, err := getDaemonSetLabelSelector(f, cephCSINamespace, cephFSDeployment.getDaemonsetName())
if err != nil {
return fmt.Errorf("failed to get labels: %w", err)
}

View File

@ -52,6 +52,7 @@ func init() {
flag.StringVar(&fileSystemName, "filesystem", "myfs", "CephFS filesystem to use")
flag.StringVar(&clusterID, "clusterid", "", "Ceph cluster ID to use (defaults to `ceph fsid` detection)")
flag.StringVar(&nfsDriverName, "nfs-driver", "nfs.csi.ceph.com", "name of the driver for NFS-volumes")
flag.BoolVar(&operatorDeployment, "operator-deployment", false, "test running on deployment via operator")
setDefaultKubeconfig()
// Register framework flags, then handle flags
@ -91,4 +92,8 @@ func handleFlags() {
testNFS = testCephFS
deployNFS = deployCephFS
}
if operatorDeployment {
cephCSINamespace = "ceph-csi-operator-system"
}
}

View File

@ -63,7 +63,8 @@ func generateClusterIDConfigMapForMigration(f *framework.Framework, c kubernetes
return fmt.Errorf("failed to create configmap: %w", err)
}
// restart csi pods for the configmap to take effect.
err = recreateCSIPods(f, rbdPodLabels, rbdDaemonsetName, rbdDeploymentName)
err = recreateCSIPods(f,
rbdDeployment.getPodSelector(), rbdDeployment.getDaemonsetName(), rbdDeployment.getDeploymentName())
if err != nil {
return fmt.Errorf("failed to recreate rbd csi pods: %w", err)
}

View File

@ -50,9 +50,20 @@ var (
// FIXME: some tests change the subvolumegroup to "e2e".
defaultSubvolumegroup = "csi"
helmNFSPodsLabel = "ceph-csi-nfs"
operatorNFSDeploymentName = "nfs.csi.ceph.com-ctrlplugin"
operatorNFSDaemonsetName = "nfs.csi.ceph.com-nodeplugin"
nfsDeployment NFSDeploymentMethod
)
func deployNFSPlugin(f *framework.Framework) {
// NFSDeployment implements the NFS deployment-method interface for the
// classic (non-operator) driver deployment created from the e2e manifests;
// all behavior comes from the embedded DriverInfo.
type NFSDeployment struct {
	DriverInfo
}
func deployNFSPlugin() {
// delete objects deployed by rook
err := deleteResource(nfsDirPath + nfsProvisionerRBAC)
@ -65,13 +76,6 @@ func deployNFSPlugin(f *framework.Framework) {
framework.Failf("failed to delete nodeplugin rbac %s: %v", nfsDirPath+nfsNodePluginRBAC, err)
}
// the pool should not be deleted, as it may contain configurations
// from non-e2e related CephNFS objects
err = createPool(f, nfsPoolName)
if err != nil {
framework.Failf("failed to create pool for NFS config %q: %v", nfsPoolName, err)
}
createORDeleteNFSResources(kubectlCreate)
}
@ -79,6 +83,30 @@ func deleteNFSPlugin() {
createORDeleteNFSResources(kubectlDelete)
}
// createNFSPool creates the Ceph pool used for the NFS configuration and then
// deploys the rook CephNFS server resource into the rook namespace.
func createNFSPool(f *framework.Framework) {
	// the pool should not be deleted, as it may contain configurations
	// from non-e2e related CephNFS objects
	if err := createPool(f, nfsPoolName); err != nil {
		framework.Failf("failed to create pool for NFS config %q: %v", nfsPoolName, err)
	}

	// NFS server deployment
	server := &yamlResourceNamespaced{
		filename:  nfsExamplePath + nfsRookCephNFS,
		namespace: rookNamespace,
	}
	if err := server.Do(kubectlCreate); err != nil {
		framework.Failf("failed to %s resource: %v", kubectlCreate, err)
	}
}
func createORDeleteNFSResources(action kubectlAction) {
cephConfigFile := getConfigFile(cephConfconfigMap, deployPath, examplePath)
resources := []ResourceDeployer{
@ -221,7 +249,7 @@ func unmountNFSVolume(f *framework.Framework, appName, pvcName string) error {
stdErr, err := execCommandInDaemonsetPod(
f,
cmd,
nfsDeamonSetName,
nfsDeployment.getDaemonsetName(),
pod.Spec.NodeName,
"csi-nfsplugin", // name of the container
cephCSINamespace)
@ -242,14 +270,36 @@ var _ = Describe("nfs", func() {
Skip("Skipping NFS E2E")
}
c = f.ClientSet
if deployNFS {
if cephCSINamespace != defaultNs {
err := createNamespace(c, cephCSINamespace)
if err != nil {
framework.Failf("failed to create namespace %s: %v", cephCSINamespace, err)
}
nfsDeployment = &NFSDeployment{
DriverInfo: DriverInfo{
clientSet: c,
deploymentName: nfsDeploymentName,
daemonsetName: nfsDeamonSetName,
driverContainers: []string{nfsContainerName},
},
}
if operatorDeployment {
nfsDeployment = &OperatorDeployment{
DriverInfo: DriverInfo{
clientSet: c,
deploymentName: operatorNFSDeploymentName,
daemonsetName: operatorNFSDaemonsetName,
helmPodLabelName: helmNFSPodsLabel,
driverContainers: []string{nfsContainerName},
},
}
deployNFSPlugin(f)
}
// No need to create the namespace if ceph-csi is deployed via operator.
if cephCSINamespace != defaultNs && !operatorDeployment {
err := createNamespace(c, cephCSINamespace)
if err != nil {
framework.Failf("failed to create namespace %s: %v", cephCSINamespace, err)
}
}
createNFSPool(f)
if deployNFS {
deployNFSPlugin()
}
// cephfs testing might have changed the default subvolumegroup
@ -287,13 +337,14 @@ var _ = Describe("nfs", func() {
if !testNFS || upgradeTesting {
Skip("Skipping NFS E2E")
}
if CurrentSpecReport().Failed() {
// log pods created by helm chart
logsCSIPods("app=ceph-csi-nfs", c)
logsCSIPods("app="+helmNFSPodsLabel, c)
// log provisioner
logsCSIPods("app=csi-nfsplugin-provisioner", c)
logsCSIPods("app="+nfsDeployment.getDeploymentName(), c)
// log node plugin
logsCSIPods("app=csi-nfsplugin", c)
logsCSIPods("app="+nfsDeployment.getDaemonsetName(), c)
// log all details from the namespace where Ceph-CSI is deployed
e2edebug.DumpAllNamespaceInfo(context.TODO(), c, cephCSINamespace)
@ -325,11 +376,12 @@ var _ = Describe("nfs", func() {
if deployNFS {
deleteNFSPlugin()
if cephCSINamespace != defaultNs {
err = deleteNamespace(c, cephCSINamespace)
if err != nil {
framework.Failf("failed to delete namespace %s: %v", cephCSINamespace, err)
}
}
// No need to delete the namespace if ceph-csi is deployed via operator.
if cephCSINamespace != defaultNs && !operatorDeployment {
err = deleteNamespace(c, cephCSINamespace)
if err != nil {
framework.Failf("failed to delete namespace %s: %v", cephCSINamespace, err)
}
}
})
@ -356,16 +408,16 @@ var _ = Describe("nfs", func() {
}
By("checking provisioner deployment is running", func() {
err := waitForDeploymentComplete(f.ClientSet, nfsDeploymentName, cephCSINamespace, deployTimeout)
err := waitForDeploymentComplete(f.ClientSet, nfsDeployment.getDeploymentName(), cephCSINamespace, deployTimeout)
if err != nil {
framework.Failf("timeout waiting for deployment %s: %v", nfsDeploymentName, err)
framework.Failf("timeout waiting for deployment %s: %v", nfsDeployment.getDeploymentName(), err)
}
})
By("checking nodeplugin deamonset pods are running", func() {
err := waitForDaemonSets(nfsDeamonSetName, cephCSINamespace, f.ClientSet, deployTimeout)
err := waitForDaemonSets(nfsDeployment.getDaemonsetName(), cephCSINamespace, f.ClientSet, deployTimeout)
if err != nil {
framework.Failf("timeout waiting for daemonset %s: %v", nfsDeamonSetName, err)
framework.Failf("timeout waiting for daemonset %s: %v", nfsDeployment.getDaemonsetName(), err)
}
})
@ -376,7 +428,7 @@ var _ = Describe("nfs", func() {
}
err = verifySeLinuxMountOption(f, pvcPath, appPath,
nfsDeamonSetName, nfsContainerName, cephCSINamespace)
nfsDeployment.getDaemonsetName(), nfsContainerName, cephCSINamespace)
if err != nil {
framework.Failf("failed to verify mount options: %v", err)
}

126
e2e/operator.go Normal file
View File

@ -0,0 +1,126 @@
/*
Copyright 2025 The Ceph-CSI Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package e2e
import (
"encoding/json"
"fmt"
clientset "k8s.io/client-go/kubernetes"
)
const (
OperatorConfigName = "ceph-csi-operator-config"
)
// OperatorDeployment implements the deployment-method interfaces for a
// ceph-csi driver that is installed and reconciled by the ceph-csi-operator;
// configuration changes are applied by patching the OperatorConfig CR rather
// than by editing the deployment objects directly.
type OperatorDeployment struct {
	DriverInfo
}
// NewRBDOperatorDeployment returns an RBDDeploymentMethod that targets the
// controller-plugin deployment and node-plugin daemonset created by the
// ceph-csi-operator for the RBD driver.
func NewRBDOperatorDeployment(c clientset.Interface) RBDDeploymentMethod {
	info := DriverInfo{
		clientSet:        c,
		deploymentName:   operatorRBDDeploymentName,
		daemonsetName:    operatorRBDDaemonsetName,
		driverContainers: rbdContainersName,
	}

	return &OperatorDeployment{DriverInfo: info}
}
// NewCephFSOperatorDeployment returns a CephFSDeploymentMethod that targets
// the controller-plugin deployment and node-plugin daemonset created by the
// ceph-csi-operator for the CephFS driver.
func NewCephFSOperatorDeployment(c clientset.Interface) CephFSDeploymentMethod {
	info := DriverInfo{
		clientSet:        c,
		deploymentName:   operatorCephFSDeploymentName,
		daemonsetName:    operatorCephFSDaemonsetName,
		driverContainers: []string{cephFSContainerName},
	}

	return &OperatorDeployment{DriverInfo: info}
}
// getPodSelector returns a label selector matching every pod that can belong
// to the driver: the helm pod labels of all three drivers plus the
// operator-created controller and node plugin pods of this driver.
func (r *OperatorDeployment) getPodSelector() string {
	apps := []interface{}{
		helmRBDPodsLabel, helmCephFSPodsLabel, helmNFSPodsLabel,
		r.deploymentName, r.daemonsetName,
	}

	return fmt.Sprintf("app in (%s, %s, %s, %s, %s)", apps...)
}
// setEnableMetadata toggles the enableMetadata option for all drivers by
// merge-patching the OperatorConfig CR; the operator then reconciles the
// driver pods with the updated setting.
//
// The returned error is wrapped with context, matching the sibling
// setClusterName and setDomainLabels helpers (the original returned it bare).
func (OperatorDeployment) setEnableMetadata(value bool) error {
	command := []string{
		"operatorconfigs.csi.ceph.io",
		OperatorConfigName,
		"--type=merge",
		"-p",
		fmt.Sprintf(`{"spec": {"driverSpecDefaults": {"enableMetadata": %t}}}`, value),
	}

	// Patch the operator config
	err := retryKubectlArgs(cephCSINamespace, kubectlPatch, deployTimeout, command...)
	if err != nil {
		return fmt.Errorf("failed to set enableMetadata: %w", err)
	}

	return nil
}
// setClusterName sets the clusterName option for all drivers by
// merge-patching the OperatorConfig CR.
func (OperatorDeployment) setClusterName(value string) error {
	patch := fmt.Sprintf(`{"spec": {"driverSpecDefaults": {"clusterName": %q}}}`, value)
	args := []string{
		"operatorconfigs.csi.ceph.io",
		OperatorConfigName,
		"--type=merge",
		"-p",
		patch,
	}

	// Patch the operator config
	if err := retryKubectlArgs(cephCSINamespace, kubectlPatch, deployTimeout, args...); err != nil {
		return fmt.Errorf("failed to set cluster name: %w", err)
	}

	return nil
}
// setDomainLabels configures the node-plugin topology domain labels via a
// JSON patch on the OperatorConfig CR. The parent objects are added first so
// the final "add" of domainLabels cannot fail on a missing path.
func (OperatorDeployment) setDomainLabels(labels []string) error {
	// Define the patch operations; order matters (parents before leaf).
	ops := []map[string]interface{}{
		{"op": "add", "path": "/spec/driverSpecDefaults/nodePlugin", "value": map[string]interface{}{}},
		{"op": "add", "path": "/spec/driverSpecDefaults/nodePlugin/topology", "value": map[string]interface{}{}},
		{"op": "add", "path": "/spec/driverSpecDefaults/nodePlugin/topology/domainLabels", "value": labels},
	}

	// Serialize to JSON
	patch, err := json.Marshal(ops)
	if err != nil {
		return fmt.Errorf("failed to marshal patch JSON: %w", err)
	}

	args := []string{
		"operatorconfigs.csi.ceph.io",
		OperatorConfigName,
		"--type=json",
		"-p",
		string(patch),
	}

	// Patch the operator config
	if err := retryKubectlArgs(cephCSINamespace, kubectlPatch, deployTimeout, args...); err != nil {
		return fmt.Errorf("failed to set domain labels: %w", err)
	}

	return nil
}

View File

@ -433,7 +433,6 @@ func deletePod(name, ns string, c kubernetes.Interface, t int) error {
})
}
//nolint:unparam // currently skipNotFound is always false, this can change in the future
func deletePodWithLabel(label, ns string, skipNotFound bool) error {
err := retryKubectlArgs(
ns,

View File

@ -110,8 +110,34 @@ var (
volSnapNameKey = "csi.storage.k8s.io/volumesnapshot/name"
volSnapNamespaceKey = "csi.storage.k8s.io/volumesnapshot/namespace"
volSnapContentNameKey = "csi.storage.k8s.io/volumesnapshotcontent/name"
helmRBDPodsLabel = "ceph-csi-rbd"
operatorRBDDeploymentName = "rbd.csi.ceph.com-ctrlplugin"
operatorRBDDaemonsetName = "rbd.csi.ceph.com-nodeplugin"
rbdContainersName = []string{"csi-rbdplugin", "csi-rbdplugin-controller"}
rbdDeployment RBDDeploymentMethod
)
// RBDDeployment implements the RBD deployment-method interface for the
// classic (non-operator, non-helm) driver deployment created from the e2e
// manifests; all behavior comes from the embedded DriverInfo.
type RBDDeployment struct {
	DriverInfo
}
// setDomainLabels is a no-op for the classic RBD deployment; presumably the
// topology domain labels are already present in the deployed manifests, so
// nothing needs patching here — NOTE(review): confirm against the rbd
// deployment templates.
func (r *RBDDeployment) setDomainLabels(labels []string) error {
	return nil
}
// setEnableMetadata waits until the driver containers of the provisioner
// deployment run with the updated "setmetadata" argument value.
//
// Uses %w (not %v) so callers can unwrap the underlying error with
// errors.Is/errors.As.
func (r *RBDDeployment) setEnableMetadata(value bool) error {
	err := waitForContainersArgsUpdate(r.clientSet, cephCSINamespace, r.deploymentName,
		"setmetadata", strconv.FormatBool(value), r.driverContainers, deployTimeout)
	if err != nil {
		return fmt.Errorf("timeout waiting for setmetadata arg update %s/%s: %w", cephCSINamespace, r.deploymentName, err)
	}

	return nil
}
func deployRBDPlugin() {
// delete objects deployed by rook
data, err := replaceNamespaceInTemplate(rbdDirPath + rbdProvisionerRBAC)
@ -294,6 +320,18 @@ func ByFileAndBlockEncryption(
})
}
// NewRBDDeployment returns an RBDDeploymentMethod backed by the standard
// (non-operator) RBD provisioner deployment and node-plugin daemonset names.
func NewRBDDeployment(c clientset.Interface) RBDDeploymentMethod {
	info := DriverInfo{
		clientSet:        c,
		deploymentName:   rbdDeploymentName,
		daemonsetName:    rbdDaemonsetName,
		helmPodLabelName: helmRBDPodsLabel,
		driverContainers: rbdContainersName,
	}

	return &RBDDeployment{DriverInfo: info}
}
var _ = Describe("RBD", func() {
f := framework.NewDefaultFramework(rbdType)
f.NamespacePodSecurityEnforceLevel = api.LevelPrivileged
@ -305,7 +343,20 @@ var _ = Describe("RBD", func() {
Skip("Skipping RBD E2E")
}
c = f.ClientSet
if deployRBD {
rbdDeployment = NewRBDDeployment(c)
if operatorDeployment {
rbdDeployment = NewRBDOperatorDeployment(c)
}
// No need to create the namespace if ceph-csi is deployed via helm or operator.
if cephCSINamespace != defaultNs && !(helmTest || operatorDeployment) {
err := createNamespace(c, cephCSINamespace)
if err != nil {
framework.Failf("failed to create namespace: %v", err)
}
}
// helm script already adds node labels
if !helmTest {
err := addLabelsToNodes(f, map[string]string{
nodeRegionLabel: regionValue,
nodeZoneLabel: zoneValue,
@ -315,12 +366,8 @@ var _ = Describe("RBD", func() {
if err != nil {
framework.Failf("failed to add node labels: %v", err)
}
if cephCSINamespace != defaultNs {
err = createNamespace(c, cephCSINamespace)
if err != nil {
framework.Failf("failed to create namespace: %v", err)
}
}
}
if deployRBD {
deployRBDPlugin()
}
err := createConfigMap(rbdDirPath, f.ClientSet, f)
@ -356,18 +403,18 @@ var _ = Describe("RBD", func() {
deployVault(f.ClientSet, deployTimeout)
// wait for provisioner deployment
err = waitForDeploymentComplete(f.ClientSet, rbdDeploymentName, cephCSINamespace, deployTimeout)
err = waitForDeploymentComplete(f.ClientSet, rbdDeployment.getDeploymentName(), cephCSINamespace, deployTimeout)
if err != nil {
framework.Failf("timeout waiting for deployment %s: %v", rbdDeploymentName, err)
framework.Failf("timeout waiting for deployment %s: %v", rbdDeployment.getDeploymentName(), err)
}
// wait for nodeplugin deamonset pods
err = waitForDaemonSets(rbdDaemonsetName, cephCSINamespace, f.ClientSet, deployTimeout)
err = waitForDaemonSets(rbdDeployment.getDaemonsetName(), cephCSINamespace, f.ClientSet, deployTimeout)
if err != nil {
framework.Failf("timeout waiting for daemonset %s: %v", rbdDaemonsetName, err)
framework.Failf("timeout waiting for daemonset %s: %v", rbdDeployment.getDaemonsetName(), err)
}
kernelRelease, err = getKernelVersionFromDaemonset(f, cephCSINamespace, rbdDaemonsetName, "csi-rbdplugin")
kernelRelease, err = getKernelVersionFromDaemonset(f, cephCSINamespace, rbdDeployment.getDaemonsetName(), rbdContainerName)
if err != nil {
framework.Failf("failed to get the kernel version: %v", err)
}
@ -376,12 +423,14 @@ var _ = Describe("RBD", func() {
nbdMapOptions = "nbd:debug-rbd=20,io-timeout=330"
}
// wait for cluster name update in deployment
containers := []string{"csi-rbdplugin", "csi-rbdplugin-controller"}
err = waitForContainersArgsUpdate(c, cephCSINamespace, rbdDeploymentName,
"clustername", defaultClusterName, containers, deployTimeout)
err = rbdDeployment.setDomainLabels([]string{nodeRegionLabel, nodeZoneLabel})
if err != nil {
framework.Failf("timeout waiting for deployment update %s/%s: %v", cephCSINamespace, rbdDeploymentName, err)
framework.Failf("failed to set domain labels: %v", err)
}
err = rbdDeployment.setClusterName(defaultClusterName)
if err != nil {
framework.Failf("failed to set cluster name: %v", err)
}
})
@ -389,13 +438,14 @@ var _ = Describe("RBD", func() {
if !testRBD || upgradeTesting {
Skip("Skipping RBD E2E")
}
if CurrentSpecReport().Failed() {
// log pods created by helm chart
logsCSIPods("app=ceph-csi-rbd", c)
logsCSIPods("app="+helmRBDPodsLabel, c)
// log provisioner
logsCSIPods("app=csi-rbdplugin-provisioner", c)
logsCSIPods("app="+rbdDeployment.getDeploymentName(), c)
// log node plugin
logsCSIPods("app=csi-rbdplugin", c)
logsCSIPods("app="+rbdDeployment.getDaemonsetName(), c)
// log all details from the namespace where Ceph-CSI is deployed
e2edebug.DumpAllNamespaceInfo(context.TODO(), c, cephCSINamespace)
@ -425,11 +475,12 @@ var _ = Describe("RBD", func() {
deleteVault()
if deployRBD {
deleteRBDPlugin()
if cephCSINamespace != defaultNs {
err = deleteNamespace(c, cephCSINamespace)
if err != nil {
framework.Failf("failed to delete namespace: %v", err)
}
}
// No need to delete the namespace if ceph-csi is deployed via helm or operator.
if cephCSINamespace != defaultNs && !(helmTest || operatorDeployment) {
err = deleteNamespace(c, cephCSINamespace)
if err != nil {
framework.Failf("failed to delete namespace: %v", err)
}
}
err = deleteNodeLabels(c, []string{
@ -480,7 +531,7 @@ var _ = Describe("RBD", func() {
By("verify readAffinity support", func() {
err := verifyReadAffinity(f, pvcPath, appPath,
rbdDaemonsetName, rbdContainerName, cephCSINamespace)
rbdDeployment.getDaemonsetName(), rbdContainerName, cephCSINamespace)
if err != nil {
framework.Failf("failed to verify readAffinity: %v", err)
}
@ -488,7 +539,7 @@ var _ = Describe("RBD", func() {
By("verify mountOptions support", func() {
err := verifySeLinuxMountOption(f, pvcPath, appPath,
rbdDaemonsetName, rbdContainerName, cephCSINamespace)
rbdDeployment.getDaemonsetName(), rbdContainerName, cephCSINamespace)
if err != nil {
framework.Failf("failed to verify mount options: %v", err)
}
@ -1911,7 +1962,7 @@ var _ = Describe("RBD", func() {
validateRBDImageCount(f, 1, defaultRBDPool)
validateOmapCount(f, 1, rbdType, defaultRBDPool, volumesType)
selector, err := getDaemonSetLabelSelector(f, cephCSINamespace, rbdDaemonsetName)
selector, err := getDaemonSetLabelSelector(f, cephCSINamespace, rbdDeployment.getDaemonsetName())
if err != nil {
framework.Failf("failed to get the labels: %v", err)
}
@ -1922,7 +1973,7 @@ var _ = Describe("RBD", func() {
}
// wait for nodeplugin pods to come up
err = waitForDaemonSets(rbdDaemonsetName, cephCSINamespace, f.ClientSet, deployTimeout)
err = waitForDaemonSets(rbdDeployment.getDaemonsetName(), cephCSINamespace, f.ClientSet, deployTimeout)
if err != nil {
framework.Failf("timeout waiting for daemonset pods: %v", err)
}
@ -2847,12 +2898,16 @@ var _ = Describe("RBD", func() {
validateRBDImageCount(f, 1, defaultRBDPool)
validateOmapCount(f, 1, rbdType, defaultRBDPool, volumesType)
// delete rbd nodeplugin pods
err = deletePodWithLabel("app=csi-rbdplugin", cephCSINamespace, false)
selector, err := getDaemonSetLabelSelector(f, cephCSINamespace, rbdDeployment.getDaemonsetName())
if err != nil {
framework.Failf("failed to get the labels: %v", err)
}
err = deletePodWithLabel(selector, cephCSINamespace, false)
if err != nil {
framework.Failf("fail to delete pod: %v", err)
}
// wait for nodeplugin pods to come up
err = waitForDaemonSets(rbdDaemonsetName, cephCSINamespace, f.ClientSet, deployTimeout)
err = waitForDaemonSets(rbdDeployment.getDaemonsetName(), cephCSINamespace, f.ClientSet, deployTimeout)
if err != nil {
framework.Failf("timeout waiting for daemonset pods: %v", err)
}
@ -3926,20 +3981,11 @@ var _ = Describe("RBD", func() {
if err != nil {
framework.Failf("failed to create rados namespace: %v", err)
}
// delete csi pods
err = deletePodWithLabel("app in (ceph-csi-rbd, csi-rbdplugin, csi-rbdplugin-provisioner)",
cephCSINamespace, false)
// restart csi pods for the configmap to take effect.
err = recreateCSIPods(f,
rbdDeployment.getPodSelector(), rbdDeployment.getDaemonsetName(), rbdDeployment.getDeploymentName())
if err != nil {
framework.Failf("failed to delete pods with labels: %v", err)
}
// wait for csi pods to come up
err = waitForDaemonSets(rbdDaemonsetName, cephCSINamespace, f.ClientSet, deployTimeout)
if err != nil {
framework.Failf("timeout waiting for daemonset pods: %v", err)
}
err = waitForDeploymentComplete(f.ClientSet, rbdDeploymentName, cephCSINamespace, deployTimeout)
if err != nil {
framework.Failf("timeout waiting for deployment to be in running state: %v", err)
framework.Failf("failed to recreate rbd csi pods: %v", err)
}
}
@ -5023,13 +5069,12 @@ var _ = Describe("RBD", func() {
validateOmapCount(f, 1, rbdType, defaultRBDPool, volumesType)
validateOmapCount(f, 1, rbdType, defaultRBDPool, snapsType)
// wait for cluster name update in deployment
containers := []string{"csi-rbdplugin", "csi-rbdplugin-controller"}
err = waitForContainersArgsUpdate(c, cephCSINamespace, rbdDeploymentName,
"setmetadata", "false", containers, deployTimeout)
err = rbdDeployment.setEnableMetadata(false)
if err != nil {
framework.Failf("timeout waiting for deployment update %s/%s: %v", cephCSINamespace, rbdDeploymentName, err)
framework.Failf("failed to update setmetadata arg in %s/%s: %v",
cephCSINamespace, rbdDeployment.getDeploymentName(), err)
}
pvcSmartClone, err := loadPVC(pvcSmartClonePath)
if err != nil {
framework.Failf("failed to load PVC: %v", err)
@ -5128,11 +5173,11 @@ var _ = Describe("RBD", func() {
validateRBDImageCount(f, 0, defaultRBDPool)
validateOmapCount(f, 0, rbdType, defaultRBDPool, volumesType)
validateOmapCount(f, 0, rbdType, defaultRBDPool, snapsType)
// wait for cluster name update in deployment
err = waitForContainersArgsUpdate(c, cephCSINamespace, rbdDeploymentName,
"setmetadata", "true", containers, deployTimeout)
err = rbdDeployment.setEnableMetadata(true)
if err != nil {
framework.Failf("timeout waiting for deployment update %s/%s: %v", cephCSINamespace, rbdDeploymentName, err)
framework.Failf("failed to update setmetadata arg in %s/%s: %v",
cephCSINamespace, rbdDeployment.getDeploymentName(), err)
}
})

View File

@ -647,7 +647,7 @@ func validateEncryptedImage(f *framework.Framework, rbdImageSpec, pvName, appNam
"/var/lib/kubelet/pods/%s/volumes/kubernetes.io~csi/%s/mount",
pod.UID,
pvName)
selector, err := getDaemonSetLabelSelector(f, cephCSINamespace, rbdDaemonsetName)
selector, err := getDaemonSetLabelSelector(f, cephCSINamespace, rbdDeployment.getDaemonsetName())
if err != nil {
return fmt.Errorf("failed to get labels: %w", err)
}
@ -672,7 +672,7 @@ func validateEncryptedFilesystem(f *framework.Framework, rbdImageSpec, pvName, a
pod.UID,
pvName)
selector, err := getDaemonSetLabelSelector(f, cephCSINamespace, rbdDaemonsetName)
selector, err := getDaemonSetLabelSelector(f, cephCSINamespace, rbdDeployment.getDaemonsetName())
if err != nil {
return fmt.Errorf("failed to get labels: %w", err)
}
@ -708,7 +708,7 @@ func validateEncryptedFilesystem(f *framework.Framework, rbdImageSpec, pvName, a
// librbd.so.* in a ceph-csi container. If this function is available,
// VolumeGroupSnapshot support is available.
func librbdSupportsVolumeGroupSnapshot(f *framework.Framework) (bool, error) {
selector, err := getDaemonSetLabelSelector(f, cephCSINamespace, rbdDaemonsetName)
selector, err := getDaemonSetLabelSelector(f, cephCSINamespace, rbdDeployment.getDaemonsetName())
if err != nil {
return false, fmt.Errorf("failed to get labels: %w", err)
}

View File

@ -61,7 +61,12 @@ var _ = Describe("CephFS Upgrade Testing", func() {
Skip("Skipping CephFS Upgrade Test")
}
c = f.ClientSet
if cephCSINamespace != defaultNs {
cephFSDeployment = NewCephFSDeployment(c)
if operatorDeployment {
cephFSDeployment = NewCephFSOperatorDeployment(c)
}
// No need to create the namespace if ceph-csi is deployed via helm or operator.
if cephCSINamespace != defaultNs && !(helmTest || operatorDeployment) {
err = createNamespace(c, cephCSINamespace)
if err != nil {
framework.Failf("failed to create namespace: %v", err)
@ -154,13 +159,12 @@ var _ = Describe("CephFS Upgrade Testing", func() {
deleteVault()
if deployCephFS {
deleteCephfsPlugin()
if cephCSINamespace != defaultNs {
err = deleteNamespace(c, cephCSINamespace)
if err != nil {
if err != nil {
framework.Failf("failed to delete namespace: %v", err)
}
}
}
// No need to delete the namespace if ceph-csi is deployed via helm or operator.
if cephCSINamespace != defaultNs && !(helmTest || operatorDeployment) {
err = deleteNamespace(c, cephCSINamespace)
if err != nil {
framework.Failf("failed to delete namespace %s: %v", cephCSINamespace, err)
}
}
})
@ -172,15 +176,15 @@ var _ = Describe("CephFS Upgrade Testing", func() {
It("Cephfs Upgrade Test", func() {
By("checking provisioner deployment is running", func() {
err = waitForDeploymentComplete(f.ClientSet, cephFSDeploymentName, cephCSINamespace, deployTimeout)
err = waitForDeploymentComplete(f.ClientSet, cephFSDeployment.getDeploymentName(), cephCSINamespace, deployTimeout)
if err != nil {
framework.Failf("timeout waiting for deployment %s: %v", cephFSDeploymentName, err)
framework.Failf("timeout waiting for deployment %s: %v", cephFSDeployment.getDeploymentName(), err)
}
})
By("checking nodeplugin deamonset pods are running", func() {
err = waitForDaemonSets(cephFSDeamonSetName, cephCSINamespace, f.ClientSet, deployTimeout)
err = waitForDaemonSets(cephFSDeployment.getDaemonsetName(), cephCSINamespace, f.ClientSet, deployTimeout)
if err != nil {
framework.Failf("timeout waiting for daemonset %s: %v", cephFSDeamonSetName, err)
framework.Failf("timeout waiting for daemonset %s: %v", cephFSDeployment.getDaemonsetName(), err)
}
})
@ -269,14 +273,14 @@ var _ = Describe("CephFS Upgrade Testing", func() {
}
deployCephfsPlugin()
err = waitForDeploymentComplete(f.ClientSet, cephFSDeploymentName, cephCSINamespace, deployTimeout)
err = waitForDeploymentComplete(f.ClientSet, cephFSDeployment.getDeploymentName(), cephCSINamespace, deployTimeout)
if err != nil {
framework.Failf("timeout waiting for upgraded deployment %s: %v", cephFSDeploymentName, err)
framework.Failf("timeout waiting for upgraded deployment %s: %v", cephFSDeployment.getDeploymentName(), err)
}
err = waitForDaemonSets(cephFSDeamonSetName, cephCSINamespace, f.ClientSet, deployTimeout)
err = waitForDaemonSets(cephFSDeployment.getDaemonsetName(), cephCSINamespace, f.ClientSet, deployTimeout)
if err != nil {
framework.Failf("timeout waiting for upgraded daemonset %s: %v", cephFSDeamonSetName, err)
framework.Failf("timeout waiting for upgraded daemonset %s: %v", cephFSDeployment.getDaemonsetName(), err)
}
app.Labels = label

View File

@ -57,7 +57,12 @@ var _ = Describe("RBD Upgrade Testing", func() {
Skip("Skipping RBD Upgrade Testing")
}
c = f.ClientSet
if cephCSINamespace != defaultNs {
rbdDeployment = NewRBDDeployment(c)
if operatorDeployment {
rbdDeployment = NewRBDOperatorDeployment(c)
}
// No need to create the namespace if ceph-csi is deployed via helm or operator.
if cephCSINamespace != defaultNs && !(helmTest || operatorDeployment) {
err := createNamespace(c, cephCSINamespace)
if err != nil {
framework.Failf("failed to create namespace: %v", err)
@ -158,11 +163,12 @@ var _ = Describe("RBD Upgrade Testing", func() {
deleteVault()
if deployRBD {
deleteRBDPlugin()
if cephCSINamespace != defaultNs {
err = deleteNamespace(c, cephCSINamespace)
if err != nil {
framework.Failf("failed to delete namespace: %v", err)
}
}
// No need to delete the namespace if ceph-csi is deployed via helm or operator.
if cephCSINamespace != defaultNs && !(helmTest || operatorDeployment) {
err = deleteNamespace(c, cephCSINamespace)
if err != nil {
framework.Failf("failed to delete namespace: %v", err)
}
}
err = deleteNodeLabels(c, []string{
@ -184,16 +190,16 @@ var _ = Describe("RBD Upgrade Testing", func() {
appPath := rbdExamplePath + "pod.yaml"
By("checking provisioner deployment is running", func() {
err := waitForDeploymentComplete(f.ClientSet, rbdDeploymentName, cephCSINamespace, deployTimeout)
err := waitForDeploymentComplete(f.ClientSet, rbdDeployment.getDeploymentName(), cephCSINamespace, deployTimeout)
if err != nil {
framework.Failf("timeout waiting for deployment %s: %v", rbdDeploymentName, err)
framework.Failf("timeout waiting for deployment %s: %v", rbdDeployment.getDeploymentName(), err)
}
})
By("checking nodeplugin deamonset pods are running", func() {
err := waitForDaemonSets(rbdDaemonsetName, cephCSINamespace, f.ClientSet, deployTimeout)
err := waitForDaemonSets(rbdDeployment.getDaemonsetName(), cephCSINamespace, f.ClientSet, deployTimeout)
if err != nil {
framework.Failf("timeout waiting for daemonset %s: %v", rbdDaemonsetName, err)
framework.Failf("timeout waiting for daemonset %s: %v", rbdDeployment.getDaemonsetName(), err)
}
})
@ -277,14 +283,14 @@ var _ = Describe("RBD Upgrade Testing", func() {
deployRBDPlugin()
err = waitForDeploymentComplete(f.ClientSet, rbdDeploymentName, cephCSINamespace, deployTimeout)
err = waitForDeploymentComplete(f.ClientSet, rbdDeployment.getDeploymentName(), cephCSINamespace, deployTimeout)
if err != nil {
framework.Failf("timeout waiting for upgraded deployment %s: %v", rbdDeploymentName, err)
framework.Failf("timeout waiting for upgraded deployment %s: %v", rbdDeployment.getDeploymentName(), err)
}
err = waitForDaemonSets(rbdDaemonsetName, cephCSINamespace, f.ClientSet, deployTimeout)
err = waitForDaemonSets(rbdDeployment.getDaemonsetName(), cephCSINamespace, f.ClientSet, deployTimeout)
if err != nil {
framework.Failf("timeout waiting for upgraded daemonset %s: %v", rbdDaemonsetName, err)
framework.Failf("timeout waiting for upgraded daemonset %s: %v", rbdDeployment.getDaemonsetName(), err)
}
// validate if the app gets bound to a pvc created by

View File

@ -67,10 +67,7 @@ const (
appLabel = "write-data-in-pod"
appCloneLabel = "app-clone"
noError = ""
// labels/selector used to list/delete rbd pods.
rbdPodLabels = "app in (ceph-csi-rbd, csi-rbdplugin, csi-rbdplugin-provisioner)"
noError = ""
exitOneErr = "command terminated with exit code 1"
// cluster Name, set by user.
@ -80,26 +77,27 @@ const (
var (
	// cli flags.
	//
	// NOTE(review): the stripped diff left both the pre- and post-change
	// variable lists in place, declaring every name twice (which cannot
	// compile). This keeps the post-change set: the union of both halves,
	// which adds only operatorDeployment.
	deployTimeout      int
	deployCephFS       bool
	deployRBD          bool
	deployNFS          bool
	testCephFS         bool
	testCephFSFscrypt  bool
	testRBD            bool
	testRBDFSCrypt     bool
	testNBD            bool
	testNFS            bool
	helmTest           bool
	upgradeTesting     bool
	upgradeVersion     string
	cephCSINamespace   string
	rookNamespace      string
	radosNamespace     string
	poll               = 2 * time.Second
	isOpenShift        bool
	clusterID          string
	nfsDriverName      string
	operatorDeployment bool
)
type cephfsFilesystem struct {
@ -107,6 +105,57 @@ type cephfsFilesystem struct {
MetadataPool string `json:"metadata_pool"`
}
// DeploymentMethod abstracts how a Ceph-CSI driver was deployed for the
// e2e tests (plain manifests, Helm chart or the ceph-csi-operator), so
// tests can look up deployment-specific resource names uniformly.
type DeploymentMethod interface {
	// getDeploymentName returns the name of the provisioner Deployment.
	getDeploymentName() string
	// getDaemonsetName returns the name of the nodeplugin DaemonSet.
	getDaemonsetName() string
	// getPodSelector returns a label selector matching the driver pods.
	getPodSelector() string
	// setClusterName sets the cluster name argument on the driver
	// containers and waits for the change to be applied.
	setClusterName(clusterName string) error
}
// RBDDeploymentMethod extends DeploymentMethod with operations that only
// apply to the RBD driver deployment.
type RBDDeploymentMethod interface {
	DeploymentMethod
	// setDomainLabels configures the topology domain labels on the driver.
	setDomainLabels(labels []string) error
	// setEnableMetadata toggles setting of metadata on volumes.
	setEnableMetadata(value bool) error
}
// CephFSDeploymentMethod is the deployment abstraction for the CephFS
// driver; it currently needs no operations beyond DeploymentMethod.
type CephFSDeploymentMethod interface {
	DeploymentMethod
}
// NFSDeploymentMethod is the deployment abstraction for the NFS driver;
// it currently needs no operations beyond DeploymentMethod.
type NFSDeploymentMethod interface {
	DeploymentMethod
}
// Compile-time assertion that DriverInfo implements DeploymentMethod.
var _ DeploymentMethod = &DriverInfo{}
// DriverInfo is the default DeploymentMethod implementation, holding the
// resource names of a deployed Ceph-CSI driver.
type DriverInfo struct {
	// clientSet is used to talk to the Kubernetes API.
	clientSet kubernetes.Interface
	// deploymentName is the name of the provisioner Deployment.
	deploymentName string
	// daemonsetName is the name of the nodeplugin DaemonSet.
	daemonsetName string
	// helmPodLabelName is the "app" label value used by Helm-deployed pods.
	helmPodLabelName string
	// driverContainers are the container names whose args get updated
	// (e.g. by setClusterName).
	driverContainers []string
}
// getDeploymentName returns the name of the provisioner Deployment.
func (d *DriverInfo) getDeploymentName() string {
	return d.deploymentName
}
// getDaemonsetName returns the name of the nodeplugin DaemonSet.
func (d *DriverInfo) getDaemonsetName() string {
	return d.daemonsetName
}
// getPodSelector returns a set-based label selector that matches the
// driver pods regardless of how they were deployed: it covers the Helm
// "app" label, the provisioner Deployment name and the nodeplugin
// DaemonSet name.
func (d *DriverInfo) getPodSelector() string {
	selector := "app in (" + d.helmPodLabelName
	selector += ", " + d.deploymentName
	selector += ", " + d.daemonsetName + ")"

	return selector
}
// setClusterName sets the "clustername" argument on the driver containers
// of the provisioner Deployment and waits (up to deployTimeout) until the
// updated arguments have been rolled out.
func (d *DriverInfo) setClusterName(clusterName string) error {
	err := waitForContainersArgsUpdate(d.clientSet, cephCSINamespace, d.deploymentName,
		"clustername", clusterName, d.driverContainers, deployTimeout)
	if err != nil {
		// wrap with %w (not %v) so callers can inspect the cause with
		// errors.Is/As, matching the error wrapping used elsewhere in
		// this file (see retryKubectlArgs).
		return fmt.Errorf("timeout waiting for clustername arg update %s/%s: %w", cephCSINamespace, d.deploymentName, err)
	}

	return nil
}
// listCephFSFileSystems list CephFS filesystems in json format.
func listCephFSFileSystems(f *framework.Framework) ([]cephfsFilesystem, error) {
var fsList []cephfsFilesystem
@ -1643,6 +1692,8 @@ const (
kubectlCreate = kubectlAction("create")
// kubectlDelete tells retryKubectlInput() to run "delete".
kubectlDelete = kubectlAction("delete")
// kubectlPatch tells retryKubectlInput() to run "patch".
kubectlPatch = kubectlAction("patch")
)
// String returns the string format of the kubectlAction, this is automatically
@ -1733,8 +1784,6 @@ func retryKubectlFile(namespace string, action kubectlAction, filename string, t
// retryKubectlArgs takes a namespace and action telling kubectl what to do
// with the passed arguments. This function retries until no error occurred, or
// the timeout passed.
//
//nolint:unparam // retryKubectlArgs will be used with kubectlDelete arg later on.
func retryKubectlArgs(namespace string, action kubectlAction, t int, args ...string) error {
timeout := time.Duration(t) * time.Minute
args = append([]string{string(action)}, args...)
@ -1758,7 +1807,7 @@ func retryKubectlArgs(namespace string, action kubectlAction, t int, args ...str
args,
int(time.Since(start).Seconds()))
return false, fmt.Errorf("failed to run kubectl: %w", err)
return false, fmt.Errorf("failed to run kubectl: %v, error: %w", args, err)
}
return true, nil

View File

@ -169,7 +169,7 @@ spec:
- name: PLUGIN_ROLE
value: csi-kubernetes
- name: SERVICE_ACCOUNTS
value: rbd-csi-nodeplugin,rbd-csi-provisioner,csi-rbdplugin,csi-rbdplugin-provisioner,cephfs-csi-nodeplugin,cephfs-csi-provisioner,csi-cephfsplugin,csi-cephfsplugin-provisioner
value: rbd-csi-nodeplugin,rbd-csi-provisioner,csi-rbdplugin,csi-rbdplugin-provisioner,cephfs-csi-nodeplugin,cephfs-csi-provisioner,csi-cephfsplugin,csi-cephfsplugin-provisioner,ceph-csi-operator-rbd-ctrlplugin-sa,ceph-csi-operator-rbd-nodeplugin-sa,ceph-csi-operator-cephfs-ctrlplugin-sa,ceph-csi-operator-cephfs-nodeplugin-sa
- name: SERVICE_ACCOUNTS_NAMESPACE
value: default
- name: VAULT_ADDR