From 1566da53c1317dad23705dc2b954f01f7f2cabf7 Mon Sep 17 00:00:00 2001 From: Hung Nguyen Date: Mon, 24 Feb 2025 23:25:43 +0000 Subject: [PATCH 1/2] update RAIDing and validation steps --- cmd/gce-pd-csi-driver/main.go | 85 ++++++--- pkg/common/constants.go | 7 + pkg/common/runcmd.go | 12 +- pkg/gce-pd-csi-driver/cache.go | 280 ++++++++++++++++++++-------- pkg/gce-pd-csi-driver/controller.go | 2 +- pkg/gce-pd-csi-driver/node.go | 6 +- test/e2e/tests/setup_e2e_test.go | 5 +- test/remote/client-wrappers.go | 11 +- 8 files changed, 293 insertions(+), 115 deletions(-) diff --git a/cmd/gce-pd-csi-driver/main.go b/cmd/gce-pd-csi-driver/main.go index 458b45619..e33fa18ef 100644 --- a/cmd/gce-pd-csi-driver/main.go +++ b/cmd/gce-pd-csi-driver/main.go @@ -27,12 +27,8 @@ import ( "strings" "time" - "k8s.io/client-go/kubernetes" - "k8s.io/client-go/rest" "k8s.io/klog/v2" "k8s.io/utils/strings/slices" - - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/common" "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/deviceutils" gce "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/gce-cloud-provider/compute" @@ -102,9 +98,7 @@ var ( ) const ( - driverName = "pd.csi.storage.gke.io" - dataCacheLabel = "datacache-storage-gke-io" - dataCacheLabelValue = "enabled" + driverName = "pd.csi.storage.gke.io" ) func init() { @@ -350,29 +344,74 @@ func urlFlag(target **url.URL, name string, usage string) { }) } +func fetchLssdsForRaiding(lssdCount int) ([]string, error) { + allLssds, err := driver.FetchAllLssds() + if err != nil { + return nil, fmt.Errorf("Error listing all LSSDs %v", err) + } + + raidedLssds, err := driver.FetchRaidedLssds() + if err != nil { + return nil, fmt.Errorf("Error listing RAIDed LSSDs %v", err) + } + + unRaidedLssds := []string{} + for _, l := range allLssds { + if !slices.Contains(raidedLssds, l) { + unRaidedLssds = append(unRaidedLssds, l) + } + if len(unRaidedLssds) == lssdCount { + break + } + } + + LSSDsWithEmptyMountPoint, err := driver.FetchLSSDsWihtEmptyMountPoint() + if err != nil { + return nil, fmt.Errorf("Error listing LSSDs with empty mountpoint: %v", err) + } + + // We need to ensure the disks to be used for Datacache are both unRAIDed & not containing mountpoints for ephemeral storage already + availableLssds := slices.Filter(nil, unRaidedLssds, func(e string) bool { + return slices.Contains(LSSDsWithEmptyMountPoint, e) + }) + + if len(availableLssds) == 0 { + return nil, fmt.Errorf("No LSSDs available to set up caching") + } + + if len(availableLssds) < lssdCount { + return nil, fmt.Errorf("Not enough LSSDs available to set up caching. Available LSSDs: %v, wanted LSSDs: %v", len(availableLssds), lssdCount) + } + return availableLssds, nil +} + func setupDataCache(ctx context.Context, nodeName string) error { - klog.V(2).Infof("Setting up data cache for node %s", nodeName) + isAlreadyRaided, err := driver.IsRaided() + if err != nil { + klog.V(2).Infof("Errored while scanning for available LocalSSDs err:%v; continuing Raiding", err) + } else if isAlreadyRaided { + klog.V(2).Infof("Local SSDs are already RAIDed. 
Skipping Datacache setup.") + return nil + } + + lssdCount := common.LocalSSDCountForDataCache if nodeName != common.TestNode { - cfg, err := rest.InClusterConfig() - if err != nil { - return err - } - kubeClient, err := kubernetes.NewForConfig(cfg) - if err != nil { - return err + var err error + lssdCount, err = driver.GetDataCacheCountFromNodeLabel(ctx, nodeName) + if lssdCount == 0 { + klog.Infof("Datacache is not enabled on node %v", nodeName) + return nil } - node, err := kubeClient.CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{}) if err != nil { - // We could retry, but this error will also crashloop the driver which may be as good a way to retry as any. return err } - if val, found := node.GetLabels()[dataCacheLabel]; !found || val != dataCacheLabelValue { - klog.V(2).Infof("Datacache not enabled for node %s; node label %s=%s and not %s", nodeName, dataCacheLabel, val, dataCacheLabelValue) - return nil - } } - klog.V(2).Info("Raiding local ssds to setup data cache") - if err := driver.RaidLocalSsds(); err != nil { + lssdNames, err := fetchLssdsForRaiding(lssdCount) + if err != nil { + klog.Fatalf("Failed to get sufficient SSDs for Datacache's caching setup: %v", err) + } + klog.V(2).Infof("Raiding local ssds to setup data cache: %v", lssdNames) + if err := driver.RaidLocalSsds(lssdNames); err != nil { return fmt.Errorf("Failed to Raid local SSDs, unable to setup data caching, got error %v", err) } diff --git a/pkg/common/constants.go b/pkg/common/constants.go index a9f070f82..f5a5dcf5f 100644 --- a/pkg/common/constants.go +++ b/pkg/common/constants.go @@ -44,4 +44,11 @@ const ( ContexLocalSsdCacheSize = "local-ssd-cache-size" // Node name for E2E tests TestNode = "test-node-csi-e2e" + + // Default LSSD count for datacache E2E tests + LocalSSDCountForDataCache = 2 + + // Node label for datacache + NodeLabelPrefix = "cloud.google.com/%s" + DataCacheLssdCountLabel = "gke-data-cache-disk" ) diff --git a/pkg/common/runcmd.go b/pkg/common/runcmd.go index 71240d2a9..39457dfb1 100644 --- a/pkg/common/runcmd.go +++ b/pkg/common/runcmd.go @@ -16,7 +16,7 @@ const ( // RunCommand wraps a k8s exec to deal with the no child process error. Same as exec.CombinedOutput. // On error, the output is included so callers don't need to echo it again. -func RunCommand(pipeCmd string, pipeCmdArg string, cmd1 string, execCmdArgs ...string) ([]byte, error) { +func RunCommand(pipeCmd string, pipeCmdArg []string, cmd1 string, execCmdArgs ...string) ([]byte, error) { execCmd1 := exec.Command(cmd1, execCmdArgs...) if pipeCmd != "" { @@ -47,9 +47,9 @@ func checkError(err error, execCmd exec.Cmd) error { } return err } -func execPipeCommand(pipeCmd string, pipeCmdArg string, execCmd1 *exec.Cmd) ([]byte, error) { +func execPipeCommand(pipeCmd string, pipeCmdArg []string, execCmd1 *exec.Cmd) ([]byte, error) { - execPipeCmd := exec.Command(pipeCmd, pipeCmdArg) + execPipeCmd := exec.Command(pipeCmd, pipeCmdArg...) 
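+	// pipeCmdArg is a slice so that filters needing more than one argument (e.g. grep with flags plus a pattern) can be expanded into the piped command.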
stdoutPipe, err := execCmd1.StdoutPipe() if err != nil { klog.Errorf("failed command %v: got error:%v", execCmd1, err) @@ -63,8 +63,12 @@ func execPipeCommand(pipeCmd string, pipeCmdArg string, execCmd1 *exec.Cmd) ([]b execPipeCmd.Stdin = stdoutPipe output, err := execPipeCmd.CombinedOutput() if err != nil { + // Some commands (such as grep) will return an error with exit status of 1 + if len(output) == 0 && err.(*exec.ExitError).ExitCode() == 1 { + return output, nil + } err = checkError(err, *execPipeCmd) - return nil, fmt.Errorf("%s failed: %w; output: %s", pipeCmd, err, string(output)) + return nil, fmt.Errorf("%s failed: %w; output: %s", execPipeCmd, err, string(output)) } return output, nil diff --git a/pkg/gce-pd-csi-driver/cache.go b/pkg/gce-pd-csi-driver/cache.go index d9bd5454f..032bf68ae 100644 --- a/pkg/gce-pd-csi-driver/cache.go +++ b/pkg/gce-pd-csi-driver/cache.go @@ -1,42 +1,57 @@ package gceGCEDriver import ( + "context" "fmt" "regexp" "strconv" "strings" csi "github.com/container-storage-interface/spec/lib/go/csi" - + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" "k8s.io/klog/v2" - "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/common" ) const ( - cacheSuffix = "csi-fast" - mainLvSuffix = "csi-main" - raidedLocalSsdName = "csi-driver-data-cache" - raidMode = "0" - raidedLssdPrefix = "/dev/md/" + cacheSuffix = "csi-fast" + mainLvSuffix = "csi-main" + raidedLocalSsdName = "csi-driver-data-cache" + raidMode = "0" + initialRaidedLocalSsdPath = "/dev/md0" ) -var raidedLocalSsdPath = raidedLssdPrefix + raidedLocalSsdName +func fetchRAIDedLocalSsdPath() (string, error) { + args := []string{ + "--detail", + "--scan", + } + info, err := common.RunCommand("grep", []string{raidedLocalSsdName}, "mdadm", args...) + if err != nil || len(info) == 0 { + return "", fmt.Errorf("Error getting RAIDed device path for Datacache %v, output:%v ===============", err, string(info)) + } + infoString := strings.TrimSpace(string(info)) + infoSlice := strings.Split(infoString, " ") + + // We want to get the second element in the array, which is the path to the RAIDed device + return infoSlice[1], nil +} func setupCaching(devicePath string, req *csi.NodeStageVolumeRequest, nodeId string) (string, error) { + + // The device path may have changed after rebooting, so we need to fetch the path again + raidedLocalSsdPath, err := fetchRAIDedLocalSsdPath() + if err != nil { + return "", err + } + volumeId := req.GetVolumeId() volumeGroupName := getVolumeGroupName(nodeId) mainDevicePath := "/dev/" + volumeGroupName + "/" + getLvName(mainLvSuffix, volumeId) mainLvName := getLvName(mainLvSuffix, volumeId) klog.V(2).Infof("Volume group available on node %v ", volumeGroupName) - - info, err := common.RunCommand("grep", raidedLocalSsdName, "ls", raidedLssdPrefix) - if err != nil { - klog.Errorf("failed while listing raided devices, err: %v, output:%v", err, info) - } - infoString := strings.TrimSpace(string(info)) - raidedLocalSsdPath = raidedLssdPrefix + infoString - vgExists := checkVgExists(volumeGroupName) if vgExists { // Clean up Volume Group before adding the PD @@ -55,22 +70,24 @@ func setupCaching(devicePath string, req *csi.NodeStageVolumeRequest, nodeId str "-o", "vg_name", } - info, err = common.RunCommand("" /* pipedCmd */, "" /* pipedCmdArg */, "pvs", args...) + info, err := common.RunCommand("" /* pipedCmd */, nil /* pipedCmdArg */, "pvs", args...) 
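+		// pvs -o vg_name reports which volume group, if any, already owns this physical volume.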
if err != nil { klog.Errorf("errored while checking physical volume details %v: %s", err, info) // On error info contains the error message which we cannot use for further steps info = nil } - infoString = strings.TrimSpace(strings.ReplaceAll(string(info), "\n", " ")) + infoString := strings.TrimSpace(strings.ReplaceAll(string(info), "\n", " ")) infoString = strings.ReplaceAll(infoString, ".", "") infoString = strings.ReplaceAll(infoString, "\"", "") infoSlice := strings.Split(strings.TrimSpace(infoString), " ") vgNameForPv := strings.TrimSpace(infoSlice[(len(infoSlice) - 1)]) + klog.V(2).Infof("============================== Physical volume is part of Volume group: %v ==============================", vgNameForPv) if vgNameForPv == volumeGroupName { - klog.V(2).Infof("Physical Volume(PV) already exists in the Volume Group %v", volumeGroupName) + klog.V(2).Infof("============================== Physical Volume(PV) already exists in the Volume Group ==============================") } else if vgNameForPv != "VG" && vgNameForPv != "" { - info, err = common.RunCommand("" /* pipedCmd */, "" /* pipedCmdArg */, "vgchange", []string{"-an", vgNameForPv}...) + + info, err = common.RunCommand("" /* pipedCmd */, nil /* pipedCmdArg */, "vgchange", []string{"-an", vgNameForPv}...) if err != nil { klog.Errorf("Errored while deactivating VG %v: err: %v: %s", vgNameForPv, err, info) } @@ -78,6 +95,7 @@ func setupCaching(devicePath string, req *csi.NodeStageVolumeRequest, nodeId str reduceVolumeGroup(vgNameForPv, false) _, isCached := isCachingSetup(mainLvName) // We will continue to uncache even if it errors to check caching as it is not a terminal issue. + if isCached { // Uncache LV args = []string{ @@ -86,20 +104,20 @@ func setupCaching(devicePath string, req *csi.NodeStageVolumeRequest, nodeId str "--force", "-y", // force remove cache without flushing data } - info, err = common.RunCommand("" /* pipedCmd */, "" /* pipedCmdArg */, "lvconvert", args...) + info, err = common.RunCommand("" /* pipedCmd */, nil /* pipedCmdArg */, "lvconvert", args...) if err != nil { return "", fmt.Errorf("errored while uncaching main LV. %v: %s", err, info) } // CLean up volume group to remove any dangling PV refrences reduceVolumeGroup(vgNameForPv, false) } - info, err = common.RunCommand("" /* pipedCmd */, "" /* pipedCmdArg */, "vgmerge", []string{volumeGroupName, vgNameForPv}...) + info, err = common.RunCommand("" /* pipedCmd */, nil /* pipedCmdArg */, "vgmerge", []string{volumeGroupName, vgNameForPv}...) if err != nil { return "", fmt.Errorf("Errored while merging the PV Volume group %s into %s %v: %s", vgNameForPv, volumeGroupName, err, info) } } else { - info, err := common.RunCommand("" /* pipedCmd */, "" /* pipedCmdArg */, "vgextend", []string{volumeGroupName, devicePath}...) + info, err := common.RunCommand("" /* pipedCmd */, nil /* pipedCmdArg */, "vgextend", []string{volumeGroupName, devicePath}...) if err != nil { return "", fmt.Errorf("Errored while extending Volume group to add PV %v, error: %v: %s", devicePath, err, info) } @@ -112,7 +130,7 @@ func setupCaching(devicePath string, req *csi.NodeStageVolumeRequest, nodeId str "-o", "lv_name", } - lvList, err := common.RunCommand("" /* pipedCmd */, "" /* pipedCmdArg */, "lvs", args...) + lvList, err := common.RunCommand("" /* pipedCmd */, nil /* pipedCmdArg */, "lvs", args...) 
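+	// lvs -o lv_name lists the logical volumes already present in the volume group, so the main LV is only created below if it is missing.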
if err != nil { return mainDevicePath, fmt.Errorf("Errored while checking logical volume for the device %s %w: %s", devicePath, err, info) } @@ -126,7 +144,7 @@ func setupCaching(devicePath string, req *csi.NodeStageVolumeRequest, nodeId str volumeGroupName, devicePath, } - info, err = common.RunCommand("" /* pipedCmd */, "" /* pipedCmdArg */, "lvcreate", args...) + info, err = common.RunCommand("" /* pipedCmd */, nil /* pipedCmdArg */, "lvcreate", args...) if err != nil { return mainDevicePath, fmt.Errorf("Errored setting up logical volume for the volume %s %w: %s", devicePath, err, info) } @@ -141,7 +159,7 @@ func setupCaching(devicePath string, req *csi.NodeStageVolumeRequest, nodeId str // Validate that cache is setup for required size klog.V(2).Infof("Assuming valid data cache size and mode, resizing cache is not supported") } else { - fastCacheSize := req.GetPublishContext()[common.ContexLocalSsdCacheSize] + fastCacheSize := req.GetPublishContext()[common.ContextDataCacheSize] chunkSize := "960" // Cannot use default chunk size(64KiB) as it errors on maxChunksAllowed. Unit - KiB args = []string{ "--yes", @@ -153,7 +171,7 @@ func setupCaching(devicePath string, req *csi.NodeStageVolumeRequest, nodeId str volumeGroupName, raidedLocalSsdPath, } - info, err = common.RunCommand("" /* pipedCmd */, "" /* pipedCmdArg */, "lvcreate", args...) + info, err = common.RunCommand("" /* pipedCmd */, nil /* pipedCmdArg */, "lvcreate", args...) if err != nil { return mainDevicePath, fmt.Errorf("Errored while creating cache %w: %s", err, info) } @@ -174,14 +192,14 @@ func setupCaching(devicePath string, req *csi.NodeStageVolumeRequest, nodeId str "--force", "-y", } - info, err = common.RunCommand("" /* pipedCmd */, "" /* pipedCmdArg */, "lvconvert", args...) + info, err = common.RunCommand("" /* pipedCmd */, nil /* pipedCmdArg */, "lvconvert", args...) if err != nil { return mainDevicePath, fmt.Errorf("Errored while setting up caching for volume %s %w: %s", devicePath, err, info) } } // activate all the LVs in the Volume group - info, err = common.RunCommand("" /* pipedCmd */, "" /* pipedCmdArg */, "vgchange", []string{"-ay", volumeGroupName}...) + info, err = common.RunCommand("" /* pipedCmd */, nil /* pipedCmdArg */, "vgchange", []string{"-ay", volumeGroupName}...) if err != nil { // The logical volumes would not be accessible if the group is not activated return mainDevicePath, fmt.Errorf("Failed to activate volume group %v %v:%s", volumeGroupName, err, info) @@ -189,9 +207,142 @@ func setupCaching(devicePath string, req *csi.NodeStageVolumeRequest, nodeId str return mainDevicePath, nil } +func ValidateDataCacheConfig(dataCacheMode string, datacacheSize string, ctx context.Context, nodeName string) error { + if dataCacheMode != "" && datacacheSize != "" { + isAlreadyRaided, err := IsRaided() + if err != nil { + return fmt.Errorf("Local SSDs are not setup for caching; got error: %v", err) + } + if !isAlreadyRaided { + return fmt.Errorf("Local SSDs are not setup for caching") + } + return nil + } + klog.Infof("Data cache is not enabled for PVC") + return nil +} + +func GetDataCacheCountFromNodeLabel(ctx context.Context, nodeName string) (int, error) { + if nodeName == common.TestNode { + return common.LocalSSDCountForDataCache, nil + } + cfg, err := rest.InClusterConfig() + // We want to capture API errors with node label fetching, so return -1 + // in those cases instead of 0. 
+ if err != nil { + return -1, err + } + kubeClient, err := kubernetes.NewForConfig(cfg) + if err != nil { + return -1, err + } + node, err := kubeClient.CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{}) + if err != nil { + // We could retry, but this error will also crashloop the driver which may be as good a way to retry as any. + return -1, err + } + if val, found := node.GetLabels()[fmt.Sprintf(common.NodeLabelPrefix, common.DataCacheLssdCountLabel)]; found { + dataCacheCount, err := strconv.Atoi(val) + if err != nil { + return -1, fmt.Errorf("Error getting Datacache's LSSD count from node label: %v", err) + } + klog.Infof("Number of local SSDs requested for Datacache: %v", dataCacheCount) + return dataCacheCount, nil + } + return 0, fmt.Errorf("Cannot get Datacache's LSSD count from node label") +} + +func FetchRaidedLssdCountForDatacache() (int, error) { + args := []string{ + "--detail", + initialRaidedLocalSsdPath, + } + info, err := common.RunCommand("grep", []string{"Raid Devices"}, "mdadm", args...) + if err != nil { + return 0, fmt.Errorf("Error getting RAIDed devices for Datacache") + } + if len(info) != 0 { + raidedDeviceInfo := strings.Split(strings.TrimSpace(string(info)), ":") + // raidedDeviceInfo should be in "Raid Devices : X" format + raidedDeviceCount, _ := strconv.Atoi(strings.TrimSpace(raidedDeviceInfo[1])) + return raidedDeviceCount, nil + } + return 0, nil +} + +func FetchRaidedLssds() ([]string, error) { + raidedLssdList := []string{} + + args := []string{ + "--detail", + "--scan", + "--export", + } + + info, err := common.RunCommand("grep", []string{"/dev"}, "mdadm", args...) + if err != nil { + return nil, fmt.Errorf("error fetching RAIDed LSSDs: %v; err:%v", info, err) + } + + if len(info) != 0 { + infoList := strings.Split(strings.TrimSpace(string(info)), "\n") + for _, ssd := range infoList { + ssdInfo := strings.TrimSpace(ssd) + // SSD name comes after "=" on each output line (e.g. MD_DEVICE_dev_nvme3n1_DEV=/dev/nvme3n1) + ssdName := strings.Split(ssdInfo, "=")[1] + raidedLssdList = append(raidedLssdList, ssdName) + } + } + + klog.V(2).Infof("Raided NVME list %v", raidedLssdList) + + return raidedLssdList, nil +} + +func FetchAllLssds() ([]string, error) { + diskList := []string{} + + info, err := common.RunCommand("" /* pipedCmd */, nil /* pipeCmdArg */, "lsblk", []string{"-o", "NAME,MODEL", "-p", "-d", "-n"}...) + if err != nil { + return nil, fmt.Errorf("errored while fetching NVME disks info: %v; err:%v", info, err) + } + infoList := strings.Split(strings.TrimSpace(string(info)), "\n") + re, err := regexp.Compile("nvme_card([0-9]+)?$") + if err != nil { + klog.V(2).ErrorS(err, "Errored while compiling to check PD or LSSD") + } + for _, ssd := range infoList { + ssd = strings.TrimSpace(ssd) + if strings.HasPrefix(ssd, "/dev/nvme") { + ssdDetails := strings.Split(ssd, " ") + lssd := re.MatchString(ssdDetails[1]) + if lssd { + diskList = append(diskList, strings.TrimSpace(ssdDetails[0])) + } + } + } + + klog.V(2).Infof("NVME list %v", diskList) + + return diskList, nil +} + +func FetchLSSDsWihtEmptyMountPoint() ([]string, error) { + info, err := common.RunCommand("grep", []string{"-E", `^\S+\s*$`} /* pipeCmdArg */, "lsblk", []string{"-o", "NAME,MOUNTPOINT", "-pdn"}...) 
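+	// lsblk prints "NAME MOUNTPOINT" per disk; the grep pattern keeps only rows whose MOUNTPOINT column is empty, i.e. disks not already mounted for ephemeral storage.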
+ if err != nil { + return nil, fmt.Errorf("Error while fetching disks with no mount point: %v; err:%v", info, err) + } + infoList := strings.Split(string(info), "\n") + diskList := []string{} + for _, ssd := range infoList { + diskList = append(diskList, strings.TrimSpace(ssd)) + } + return diskList, nil +} + func checkVgExists(volumeGroupName string) bool { args := []string{} - info, err := common.RunCommand("" /* pipedCmd */, "" /* pipedCmdArg */, "vgscan", args...) + info, err := common.RunCommand("" /* pipedCmd */, nil /* pipedCmdArg */, "vgscan", args...) if err != nil { klog.Errorf("Errored while checking if volume group exists %v: %s", err, info) return false @@ -212,7 +363,7 @@ func cleanupCache(volumeId string, nodeId string) error { "-an", "/dev/" + volumeGroupName + "/" + mainLvName, } - info, err := common.RunCommand("" /* pipedCmd */, "" /* pipedCmdArg */, "lvchange", args...) + info, err := common.RunCommand("" /* pipedCmd */, nil /* pipedCmdArg */, "lvchange", args...) if err != nil { return fmt.Errorf("Failed to deactivate volume for uncaching %s %v: %s", volumeId, err, info) } @@ -221,7 +372,7 @@ func cleanupCache(volumeId string, nodeId string) error { volumeGroupName + "/" + mainLvName, "-y", } - info, err = common.RunCommand("" /* pipedCmd */, "" /* pipedCmdArg */, "lvconvert", args...) + info, err = common.RunCommand("" /* pipedCmd */, nil /* pipedCmdArg */, "lvconvert", args...) if err != nil { return fmt.Errorf("Failed to uncache volume %s %w: %s", volumeId, err, info) } @@ -249,14 +400,14 @@ func createVg(volumeGroupName string, devicePath string, raidedLocalSsds string) raidedLocalSsds, "-v", } - info, err := common.RunCommand("" /* pipedCmd */, "" /* pipedCmdArg */, "vgcreate", args...) + info, err := common.RunCommand("" /* pipedCmd */, nil /* pipedCmdArg */, "vgcreate", args...) if err != nil { return fmt.Errorf("Volume group creation failed %w: %s", err, info) } klog.Infof("Volume group creation succeeded for %v", volumeGroupName) args = []string{} - info, err = common.RunCommand("" /* pipedCmd */, "" /* pipedCmdArg */, "vgscan", args...) + info, err = common.RunCommand("" /* pipedCmd */, nil /* pipedCmdArg */, "vgscan", args...) if err != nil { klog.Errorf("Failed to scan for volume group post creation, continuing: %v: %s", err, info) } @@ -271,75 +422,54 @@ func reduceVolumeGroup(volumeGroupName string, force bool) { if force { args = append(args, "--force") } - info, err := common.RunCommand("" /* pipedCmd */, "" /* pipedCmdArg */, "vgreduce", args...) + info, err := common.RunCommand("" /* pipedCmd */, nil /* pipedCmdArg */, "vgreduce", args...) if err != nil { klog.Errorf("Errored while cleaning up volume group %v: %s", err, info) } } -func RaidLocalSsds() error { - isAlreadyRaided, err := isRaided() - if err != nil { - klog.V(2).Infof("Errored while scanning for available LocalSSDs err:%v; continuing Raiding", err) - } else if isAlreadyRaided { - klog.V(2).Infof("Local SSDs are already RAIDed, no further action needed here") - return nil - } - diskList := []string{} - info, err := common.RunCommand("" /* pipedCmd */, "" /* pipeCmdArg */, "lsblk", []string{"-o", "NAME,MODEL", "-p", "-d", "-n"}...) 
- if err != nil { - return fmt.Errorf("Failed to fetch LSSD info: %v; err:%v", info, err) - } - infoList := strings.Split(strings.TrimSpace(string(info)), "\n") - re, err := regexp.Compile("nvme_card([0-9]+)?$") - if err != nil { - return fmt.Errorf("Errored while compiling to check PD or LSSD %s", err) - } - for _, ssd := range infoList { - ssd = strings.TrimSpace(ssd) - if strings.HasPrefix(ssd, "/dev/nvme") { - ssdDetails := strings.Split(ssd, " ") - lssd := re.MatchString(ssdDetails[1]) - if lssd { - diskList = append(diskList, strings.TrimSpace(ssdDetails[0])) - } - } - } - nvmeDiskCount := len(diskList) - if nvmeDiskCount == 0 { - return fmt.Errorf("No local SSDs found for raiding") - } +func RaidLocalSsds(availableLssds []string) error { args := []string{ "--create", - raidedLssdPrefix + raidedLocalSsdName, + initialRaidedLocalSsdPath, + "--name", + raidedLocalSsdName, "-l" + raidMode, // Force RAIDing as sometime it might fail for caution if there is just 1 LSSD present as 1 LSSD need not be RAIDed "--force", "-n", - strconv.Itoa(nvmeDiskCount), + strconv.Itoa(len(availableLssds)), } - args = append(args, diskList...) - info, err = common.RunCommand("" /* pipedCmd */, "" /* pipeCmdArg */, "mdadm", args...) + args = append(args, availableLssds...) + info, err := common.RunCommand("" /* pipedCmd */, nil /* pipeCmdArg */, "mdadm", args...) if err != nil { return fmt.Errorf("errored while RAIDing LSSDs info: %v; err:%v", info, err) } // Validate if Raided successfully - isAlreadyRaided, err = isRaided() + isAlreadyRaided, err := IsRaided() if err != nil { klog.V(2).Infof("Errored while scanning for available raided LocalSSDs err:%v=", err) } if !isAlreadyRaided { return fmt.Errorf("failed raiding, raided device not found on scanning") } + + raidedDataCacheCount, err := FetchRaidedLssdCountForDatacache() + if err != nil { + return err + } + if raidedDataCacheCount != len(availableLssds) { + return fmt.Errorf("Local SSDs reserved do not match the requested count") + } return nil } -func isRaided() (bool, error) { +func IsRaided() (bool, error) { args := []string{ "--detail", "--scan", } - info, err := common.RunCommand("" /* pipedCmd */, "" /* pipeCmdArg */, "mdadm", args...) + info, err := common.RunCommand("" /* pipedCmd */, nil /* pipeCmdArg */, "mdadm", args...) if err != nil { return false, fmt.Errorf("errored while scanning for raided LSSD %v: %s", err, info) } @@ -357,7 +487,7 @@ func isCachingSetup(mainLvName string) (error, bool) { "-o", "pool_lv", } - poolName, err := common.RunCommand("" /* pipedCmd */, "" /* pipeCmdArg */, "lvs", args...) + poolName, err := common.RunCommand("" /* pipedCmd */, nil /* pipeCmdArg */, "lvs", args...) 
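+	// lvs -o pool_lv prints the cache pool attached to the main LV; a non-empty value means caching is already configured.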
if err != nil { return fmt.Errorf("Failed to check if caching is setup %w", err), false } diff --git a/pkg/gce-pd-csi-driver/controller.go b/pkg/gce-pd-csi-driver/controller.go index 2688efa4d..92f4a0037 100644 --- a/pkg/gce-pd-csi-driver/controller.go +++ b/pkg/gce-pd-csi-driver/controller.go @@ -1063,7 +1063,7 @@ func (gceCS *GCEControllerServer) executeControllerPublishVolume(ctx context.Con if gceCS.enableDataCache && req.GetVolumeContext() != nil { if req.GetVolumeContext()[common.ContextDataCacheSize] != "" { pubVolResp.PublishContext = map[string]string{} - pubVolResp.PublishContext[common.ContexLocalSsdCacheSize] = req.GetVolumeContext()[common.ContextDataCacheSize] + pubVolResp.PublishContext[common.ContextDataCacheSize] = req.GetVolumeContext()[common.ContextDataCacheSize] pubVolResp.PublishContext[common.ContextDataCacheMode] = req.GetVolumeContext()[common.ContextDataCacheMode] } } diff --git a/pkg/gce-pd-csi-driver/node.go b/pkg/gce-pd-csi-driver/node.go index 0815d404e..0f7dbe5bf 100644 --- a/pkg/gce-pd-csi-driver/node.go +++ b/pkg/gce-pd-csi-driver/node.go @@ -337,7 +337,7 @@ func (ns *GCENodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStage klog.Infof("Successfully found attached GCE PD %q at device path %s.", volumeKey.Name, devicePath) - if ns.EnableDataCache && req.GetPublishContext()[common.ContexLocalSsdCacheSize] != "" { + if ns.EnableDataCache && req.GetPublishContext()[common.ContextDataCacheSize] != "" { if len(nodeId) == 0 { return nil, status.Error(codes.InvalidArgument, "NodeStageVolume Node ID must be provided") } @@ -345,6 +345,10 @@ func (ns *GCENodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStage if err != nil { klog.Errorf("filepath.EvalSymlinks(%q) failed when trying to create volume group: %v", devicePath, err) } + configError := ValidateDataCacheConfig(req.GetPublishContext()[common.ContextDataCacheMode], req.GetPublishContext()[common.ContextDataCacheSize], ctx, nodeId) + if configError != nil { + return nil, status.Error(codes.Internal, fmt.Sprintf("Error validate configuration for Datacache: %v", err.Error())) + } devicePath, err = setupCaching(devFsPath, req, nodeId) if err != nil { return nil, status.Error(codes.DataLoss, fmt.Sprintf("Error setting up cache: %v", err.Error())) diff --git a/test/e2e/tests/setup_e2e_test.go b/test/e2e/tests/setup_e2e_test.go index 5222cc663..bd6de4581 100644 --- a/test/e2e/tests/setup_e2e_test.go +++ b/test/e2e/tests/setup_e2e_test.go @@ -33,6 +33,7 @@ import ( compute "google.golang.org/api/compute/v1" "k8s.io/klog/v2" "k8s.io/utils/strings/slices" + "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/common" testutils "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/test/e2e/utils" remote "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/test/remote" ) @@ -65,8 +66,6 @@ var ( kmsClient *cloudkms.KeyManagementClient ) -const localSSDCount int64 = 2 - func init() { klog.InitFlags(flag.CommandLine) } @@ -194,7 +193,7 @@ func NewTestContext(zone, minCpuPlatform, machineType string, instanceNumber str CloudtopHost: *cloudtopHost, EnableConfidentialCompute: *enableConfidentialCompute, ComputeService: computeService, - LocalSSDCount: localSSDCount, + LocalSSDCount: common.LocalSSDCountForDataCache, } if machineType == *hdMachineType { diff --git a/test/remote/client-wrappers.go b/test/remote/client-wrappers.go index 28b9887cf..83c6db3af 100644 --- a/test/remote/client-wrappers.go +++ b/test/remote/client-wrappers.go @@ -54,12 +54,7 @@ var ( const ( // Keys in the volume context. 
- contextForceAttach = "force-attach" - contextDataCacheSize = "data-cache-size" - contextDataCacheMode = "data-cache-mode" - - // Keys in the publish context - contexLocalSsdCacheSize = "local-ssd-cache-size" + contextForceAttach = "force-attach" defaultLocalSsdCacheSize = "200Gi" defaultDataCacheMode = common.DataCacheModeWriteThrough @@ -203,8 +198,8 @@ func (c *CsiClient) NodeStageBlockVolume(volId, stageDir string, setupDataCache func (c *CsiClient) NodeStageVolume(volId string, stageDir string, volumeCap *csipb.VolumeCapability, setupDataCache bool) error { publishContext := map[string]string{} if setupDataCache { - publishContext[contexLocalSsdCacheSize] = defaultLocalSsdCacheSize - publishContext[contextDataCacheMode] = defaultDataCacheMode + publishContext[common.ContextDataCacheSize] = defaultLocalSsdCacheSize + publishContext[common.ContextDataCacheMode] = defaultDataCacheMode } nodeStageReq := &csipb.NodeStageVolumeRequest{ VolumeId: volId, From e33bb0de61d57835d6dc41db384683cabd622d42 Mon Sep 17 00:00:00 2001 From: Hung Nguyen Date: Tue, 25 Feb 2025 00:13:51 +0000 Subject: [PATCH 2/2] update test setup to avoid running Datacache setup on machines not supporting LSSDs --- cmd/gce-pd-csi-driver/main.go | 30 ++++++++-------- pkg/common/constants.go | 2 +- pkg/gce-pd-csi-driver/cache.go | 65 +++++++++++++++++----------------- pkg/gce-pd-csi-driver/node.go | 2 +- test/e2e/utils/utils.go | 7 ++-- test/remote/instance.go | 4 +++ 6 files changed, 59 insertions(+), 51 deletions(-) diff --git a/cmd/gce-pd-csi-driver/main.go b/cmd/gce-pd-csi-driver/main.go index e33fa18ef..4b3d01e31 100644 --- a/cmd/gce-pd-csi-driver/main.go +++ b/cmd/gce-pd-csi-driver/main.go @@ -75,7 +75,7 @@ var ( fallbackRequisiteZonesFlag = flag.String("fallback-requisite-zones", "", "Comma separated list of requisite zones that will be used if there are not sufficient zones present in requisite topologies when provisioning a disk") enableStoragePoolsFlag = flag.Bool("enable-storage-pools", false, "If set to true, the CSI Driver will allow volumes to be provisioned in Storage Pools") enableHdHAFlag = flag.Bool("allow-hdha-provisioning", false, "If set to true, will allow the driver to provision Hyperdisk-balanced High Availability disks") - enableDataCacheFlag = flag.Bool("enable-data-cache", false, "If set to true, the CSI Driver will allow volumes to be provisioned with data cache configuration") + enableDataCacheFlag = flag.Bool("enable-data-cache", false, "If set to true, the CSI Driver will allow volumes to be provisioned with Data Cache configuration") nodeName = flag.String("node-name", "", "The node this driver is running on") multiZoneVolumeHandleDiskTypesFlag = flag.String("multi-zone-volume-handle-disk-types", "", "Comma separated list of allowed disk types that can use the multi-zone volumeHandle. 
Used only if --multi-zone-volume-handle-enable") @@ -130,7 +130,7 @@ func handle() { if version == "" { klog.Fatalf("version must be set at compile time") } - klog.V(2).Infof("Driver vendor version %v", version) + klog.V(4).Infof("Driver vendor version %v", version) // Start tracing as soon as possible if *enableOtelTracing { @@ -258,10 +258,10 @@ func handle() { if *enableDataCacheFlag { if nodeName == nil || *nodeName == "" { - klog.Errorf("Data cache enabled, but --node-name not passed") + klog.Errorf("Data Cache enabled, but --node-name not passed") } if err := setupDataCache(ctx, *nodeName); err != nil { - klog.Errorf("DataCache setup failed: %v", err) + klog.Errorf("Data Cache setup failed: %v", err) } } @@ -370,7 +370,7 @@ func fetchLssdsForRaiding(lssdCount int) ([]string, error) { return nil, fmt.Errorf("Error listing LSSDs with empty mountpoint: %v", err) } - // We need to ensure the disks to be used for Datacache are both unRAIDed & not containing mountpoints for ephemeral storage already + // We need to ensure the disks to be used for Data Cache are both unRAIDed & not containing mountpoints for ephemeral storage already availableLssds := slices.Filter(nil, unRaidedLssds, func(e string) bool { return slices.Contains(LSSDsWithEmptyMountPoint, e) }) @@ -388,9 +388,9 @@ func fetchLssdsForRaiding(lssdCount int) ([]string, error) { func setupDataCache(ctx context.Context, nodeName string) error { isAlreadyRaided, err := driver.IsRaided() if err != nil { - klog.V(2).Infof("Errored while scanning for available LocalSSDs err:%v; continuing Raiding", err) + klog.V(4).Infof("Errored while scanning for available LocalSSDs err:%v; continuing Raiding", err) } else if isAlreadyRaided { - klog.V(2).Infof("Local SSDs are already RAIDed. Skipping Datacache setup.") + klog.V(4).Infof("Local SSDs are already RAIDed. 
Skipping Data Cache setup.") return nil } @@ -398,23 +398,23 @@ func setupDataCache(ctx context.Context, nodeName string) error { if nodeName != common.TestNode { var err error lssdCount, err = driver.GetDataCacheCountFromNodeLabel(ctx, nodeName) - if lssdCount == 0 { - klog.Infof("Datacache is not enabled on node %v", nodeName) - return nil - } if err != nil { return err } + if lssdCount == 0 { + klog.V(4).Infof("Data Cache is not enabled on node %v, so skipping caching setup", nodeName) + return nil + } } lssdNames, err := fetchLssdsForRaiding(lssdCount) if err != nil { - klog.Fatalf("Failed to get sufficient SSDs for Datacache's caching setup: %v", err) + klog.Fatalf("Failed to get sufficient SSDs for Data Cache's caching setup: %v", err) } - klog.V(2).Infof("Raiding local ssds to setup data cache: %v", lssdNames) + klog.V(4).Infof("Raiding local ssds to setup Data Cache: %v", lssdNames) if err := driver.RaidLocalSsds(lssdNames); err != nil { - return fmt.Errorf("Failed to Raid local SSDs, unable to setup data caching, got error %v", err) + return fmt.Errorf("Failed to Raid local SSDs, unable to setup Data Cache, got error %v", err) } - klog.V(2).Infof("Datacache enabled for node %s", nodeName) + klog.V(4).Infof("LSSD caching is setup for the Data Cache enabled node %s", nodeName) return nil } diff --git a/pkg/common/constants.go b/pkg/common/constants.go index f5a5dcf5f..f7697c50b 100644 --- a/pkg/common/constants.go +++ b/pkg/common/constants.go @@ -48,7 +48,7 @@ const ( // Default LSSD count for datacache E2E tests LocalSSDCountForDataCache = 2 - // Node label for datacache + // Node label for Data Cache (only applicable to GKE nodes) NodeLabelPrefix = "cloud.google.com/%s" DataCacheLssdCountLabel = "gke-data-cache-disk" ) diff --git a/pkg/gce-pd-csi-driver/cache.go b/pkg/gce-pd-csi-driver/cache.go index 032bf68ae..91be2eb95 100644 --- a/pkg/gce-pd-csi-driver/cache.go +++ b/pkg/gce-pd-csi-driver/cache.go @@ -16,11 +16,10 @@ import ( ) const ( - cacheSuffix = "csi-fast" - mainLvSuffix = "csi-main" - raidedLocalSsdName = "csi-driver-data-cache" - raidMode = "0" - initialRaidedLocalSsdPath = "/dev/md0" + cacheSuffix = "csi-fast" + mainLvSuffix = "csi-main" + raidedLocalSsdName = "csi-driver-data-cache" + raidMode = "0" ) func fetchRAIDedLocalSsdPath() (string, error) { @@ -30,12 +29,13 @@ func fetchRAIDedLocalSsdPath() (string, error) { } info, err := common.RunCommand("grep", []string{raidedLocalSsdName}, "mdadm", args...) 
if err != nil || len(info) == 0 { - return "", fmt.Errorf("Error getting RAIDed device path for Datacache %v, output:%v ===============", err, string(info)) + return "", fmt.Errorf("Error getting RAIDed device path for Data Cache %v, output:%v", err, string(info)) } infoString := strings.TrimSpace(string(info)) infoSlice := strings.Split(infoString, " ") - // We want to get the second element in the array, which is the path to the RAIDed device + // We want to get the second element in the array (sample: ARRAY /dev/md126 metadata=1.2 name=csi-driver-data-cache UUID=*), + // which is the path to the RAIDed device return infoSlice[1], nil } @@ -51,7 +51,7 @@ func setupCaching(devicePath string, req *csi.NodeStageVolumeRequest, nodeId str volumeGroupName := getVolumeGroupName(nodeId) mainDevicePath := "/dev/" + volumeGroupName + "/" + getLvName(mainLvSuffix, volumeId) mainLvName := getLvName(mainLvSuffix, volumeId) - klog.V(2).Infof("Volume group available on node %v ", volumeGroupName) + klog.V(4).Infof("Volume group available on node %v ", volumeGroupName) vgExists := checkVgExists(volumeGroupName) if vgExists { // Clean up Volume Group before adding the PD @@ -82,9 +82,9 @@ func setupCaching(devicePath string, req *csi.NodeStageVolumeRequest, nodeId str infoString = strings.ReplaceAll(infoString, "\"", "") infoSlice := strings.Split(strings.TrimSpace(infoString), " ") vgNameForPv := strings.TrimSpace(infoSlice[(len(infoSlice) - 1)]) - klog.V(2).Infof("============================== Physical volume is part of Volume group: %v ==============================", vgNameForPv) + klog.V(4).Infof("Physical volume is part of Volume group: %v", vgNameForPv) if vgNameForPv == volumeGroupName { - klog.V(2).Infof("============================== Physical Volume(PV) already exists in the Volume Group ==============================") + klog.V(4).Infof("Physical Volume(PV) already exists in the Volume Group") } else if vgNameForPv != "VG" && vgNameForPv != "" { info, err = common.RunCommand("" /* pipedCmd */, nil /* pipedCmdArg */, "vgchange", []string{"-an", vgNameForPv}...) @@ -157,7 +157,7 @@ func setupCaching(devicePath string, req *csi.NodeStageVolumeRequest, nodeId str cacheLvName := getLvName(cacheSuffix, volumeId) if isCached { // Validate that cache is setup for required size - klog.V(2).Infof("Assuming valid data cache size and mode, resizing cache is not supported") + klog.V(4).Infof("Assuming valid data cache size and mode, resizing cache is not supported") } else { fastCacheSize := req.GetPublishContext()[common.ContextDataCacheSize] chunkSize := "960" // Cannot use default chunk size(64KiB) as it errors on maxChunksAllowed. Unit - KiB @@ -207,8 +207,8 @@ func setupCaching(devicePath string, req *csi.NodeStageVolumeRequest, nodeId str return mainDevicePath, nil } -func ValidateDataCacheConfig(dataCacheMode string, datacacheSize string, ctx context.Context, nodeName string) error { - if dataCacheMode != "" && datacacheSize != "" { +func ValidateDataCacheConfig(dataCacheMode string, dataCacheSize string, ctx context.Context, nodeName string) error { + if dataCacheMode != "" && dataCacheSize != "" { isAlreadyRaided, err := IsRaided() if err != nil { return fmt.Errorf("Local SSDs are not setup for caching; got error: %v", err) @@ -218,48 +218,50 @@ func ValidateDataCacheConfig(dataCacheMode string, datacacheSize string, ctx con } return nil } - klog.Infof("Data cache is not enabled for PVC") + klog.V(4).Infof("Data Cache is not enabled for PVC (data-cache-size: %v, data-cache-mode: %v). 
Please set both these parameters in StorageClass to enable caching", dataCacheSize, dataCacheMode) return nil } func GetDataCacheCountFromNodeLabel(ctx context.Context, nodeName string) (int, error) { - if nodeName == common.TestNode { - return common.LocalSSDCountForDataCache, nil - } cfg, err := rest.InClusterConfig() // We want to capture API errors with node label fetching, so return -1 // in those cases instead of 0. if err != nil { - return -1, err + return 0, err } kubeClient, err := kubernetes.NewForConfig(cfg) if err != nil { - return -1, err + return 0, err } node, err := kubeClient.CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{}) if err != nil { // We could retry, but this error will also crashloop the driver which may be as good a way to retry as any. - return -1, err + return 0, err } if val, found := node.GetLabels()[fmt.Sprintf(common.NodeLabelPrefix, common.DataCacheLssdCountLabel)]; found { dataCacheCount, err := strconv.Atoi(val) if err != nil { - return -1, fmt.Errorf("Error getting Datacache's LSSD count from node label: %v", err) + return 0, fmt.Errorf("Error getting Data Cache's LSSD count from node label: %v", err) } - klog.Infof("Number of local SSDs requested for Datacache: %v", dataCacheCount) + klog.V(4).Infof("Number of local SSDs requested for Data Cache: %v", dataCacheCount) return dataCacheCount, nil } - return 0, fmt.Errorf("Cannot get Datacache's LSSD count from node label") + // This will be returned for a non-Data-Cache node pool + return 0, nil } func FetchRaidedLssdCountForDatacache() (int, error) { + raidedPath, err := fetchRAIDedLocalSsdPath() + if err != nil { + return 0, err + } args := []string{ "--detail", - initialRaidedLocalSsdPath, + raidedPath, } info, err := common.RunCommand("grep", []string{"Raid Devices"}, "mdadm", args...) 
 	if err != nil {
-		return 0, fmt.Errorf("Error getting RAIDed devices for Datacache")
+		return 0, fmt.Errorf("Error getting RAIDed devices for Data Cache")
 	}
 	if len(info) != 0 {
 		raidedDeviceInfo := strings.Split(strings.TrimSpace(string(info)), ":")
 		// raidedDeviceInfo should be in "Raid Devices : X" format
 		raidedDeviceCount, _ := strconv.Atoi(strings.TrimSpace(raidedDeviceInfo[1]))
 		return raidedDeviceCount, nil
@@ -294,7 +296,7 @@ func FetchRaidedLssds() ([]string, error) {
 		}
 	}
 
-	klog.V(2).Infof("Raided NVME list %v", raidedLssdList)
+	klog.V(4).Infof("Raided NVME list %v", raidedLssdList)
 
 	return raidedLssdList, nil
 }
@@ -309,7 +311,7 @@ func FetchAllLssds() ([]string, error) {
 	infoList := strings.Split(strings.TrimSpace(string(info)), "\n")
 	re, err := regexp.Compile("nvme_card([0-9]+)?$")
 	if err != nil {
-		klog.V(2).ErrorS(err, "Errored while compiling to check PD or LSSD")
+		klog.V(4).ErrorS(err, "Errored while compiling to check PD or LSSD")
 	}
 	for _, ssd := range infoList {
 		ssd = strings.TrimSpace(ssd)
@@ -322,7 +324,7 @@ func FetchAllLssds() ([]string, error) {
 		}
 	}
 
-	klog.V(2).Infof("NVME list %v", diskList)
+	klog.V(4).Infof("NVME list %v", diskList)
 
 	return diskList, nil
 }
@@ -358,6 +360,7 @@ func cleanupCache(volumeId string, nodeId string) error {
 		// If volume group doesn't exist then there's nothing to uncache
 		return nil
 	}
+	reduceVolumeGroup(volumeGroupName, true)
 	mainLvName := getLvName(mainLvSuffix, volumeId)
 	args := []string{
 		"-an",
@@ -404,7 +407,7 @@ func createVg(volumeGroupName string, devicePath string, raidedLocalSsds string)
 	if err != nil {
 		return fmt.Errorf("Volume group creation failed %w: %s", err, info)
 	}
-	klog.Infof("Volume group creation succeeded for %v", volumeGroupName)
+	klog.V(4).Infof("Volume group creation succeeded for %v", volumeGroupName)
 
 	args = []string{}
 	info, err = common.RunCommand("" /* pipedCmd */, nil /* pipedCmdArg */, "vgscan", args...)
@@ -431,8 +434,6 @@ func reduceVolumeGroup(volumeGroupName string, force bool) {
 func RaidLocalSsds(availableLssds []string) error {
 	args := []string{
 		"--create",
-		initialRaidedLocalSsdPath,
-		"--name",
 		raidedLocalSsdName,
 		"-l" + raidMode,
 		// Force RAIDing as sometime it might fail for caution if there is just 1 LSSD present as 1 LSSD need not be RAIDed
@@ -448,7 +449,7 @@ func RaidLocalSsds(availableLssds []string) error {
 	// Validate if Raided successfully
 	isAlreadyRaided, err := IsRaided()
 	if err != nil {
-		klog.V(2).Infof("Errored while scanning for available raided LocalSSDs err:%v=", err)
+		klog.V(4).Infof("Errored while scanning for available raided LocalSSDs err:%v=", err)
 	}
 	if !isAlreadyRaided {
 		return fmt.Errorf("failed raiding, raided device not found on scanning")
diff --git a/pkg/gce-pd-csi-driver/node.go b/pkg/gce-pd-csi-driver/node.go
index 0f7dbe5bf..62f0db372 100644
--- a/pkg/gce-pd-csi-driver/node.go
+++ b/pkg/gce-pd-csi-driver/node.go
@@ -347,7 +347,7 @@ func (ns *GCENodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStage
 	}
 	configError := ValidateDataCacheConfig(req.GetPublishContext()[common.ContextDataCacheMode], req.GetPublishContext()[common.ContextDataCacheSize], ctx, nodeId)
 	if configError != nil {
-		return nil, status.Error(codes.Internal, fmt.Sprintf("Error validate configuration for Datacache: %v", err.Error()))
+		return nil, status.Error(codes.Internal, fmt.Sprintf("Error validating configuration for Data Cache: %v", configError.Error()))
 	}
 	devicePath, err = setupCaching(devFsPath, req, nodeId)
 	if err != nil {
diff --git a/test/e2e/utils/utils.go b/test/e2e/utils/utils.go
index b4eb8020a..949c62e40 100644
--- a/test/e2e/utils/utils.go
+++ b/test/e2e/utils/utils.go
@@ -71,8 +71,11 @@ func GCEClientAndDriverSetup(instance *remote.InstanceInfo, driverConfig DriverC
 		"--allow-hdha-provisioning",
 		"--device-in-use-timeout=10s", // Set lower than the usual value to expedite tests
 		fmt.Sprintf("--fallback-requisite-zones=%s", strings.Join(driverConfig.Zones, ",")),
-		"--enable-data-cache",
-		fmt.Sprintf("--node-name=%s", utilcommon.TestNode),
+	}
+
+	if instance.GetLocalSSD() > 0 {
+		extra_flags = append(extra_flags, "--enable-data-cache")
+		extra_flags = append(extra_flags, fmt.Sprintf("--node-name=%s", utilcommon.TestNode))
 	}
 	extra_flags = append(extra_flags, fmt.Sprintf("--compute-endpoint=%s", driverConfig.ComputeEndpoint))
 	extra_flags = append(extra_flags, driverConfig.ExtraFlags...)
diff --git a/test/remote/instance.go b/test/remote/instance.go
index c9c3dd6d5..554e7612e 100644
--- a/test/remote/instance.go
+++ b/test/remote/instance.go
@@ -80,6 +80,10 @@ func (i *InstanceInfo) GetNodeID() string {
 	return common.CreateNodeID(i.cfg.Project, i.cfg.Zone, i.cfg.Name)
 }
 
+func (i *InstanceInfo) GetLocalSSD() int64 {
+	return i.cfg.LocalSSDCount
+}
+
 func machineTypeMismatch(curInst *compute.Instance, newInst *compute.Instance) bool {
 	if !strings.Contains(curInst.MachineType, newInst.MachineType) {
 		klog.Infof("Machine type mismatch")