Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 67 additions & 28 deletions cmd/gce-pd-csi-driver/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,8 @@ import (
"strings"
"time"

"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/klog/v2"
"k8s.io/utils/strings/slices"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/common"
"sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/deviceutils"
gce "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/gce-cloud-provider/compute"
Expand Down Expand Up @@ -79,7 +75,7 @@ var (
fallbackRequisiteZonesFlag = flag.String("fallback-requisite-zones", "", "Comma separated list of requisite zones that will be used if there are not sufficient zones present in requisite topologies when provisioning a disk")
enableStoragePoolsFlag = flag.Bool("enable-storage-pools", false, "If set to true, the CSI Driver will allow volumes to be provisioned in Storage Pools")
enableHdHAFlag = flag.Bool("allow-hdha-provisioning", false, "If set to true, will allow the driver to provision Hyperdisk-balanced High Availability disks")
enableDataCacheFlag = flag.Bool("enable-data-cache", false, "If set to true, the CSI Driver will allow volumes to be provisioned with data cache configuration")
enableDataCacheFlag = flag.Bool("enable-data-cache", false, "If set to true, the CSI Driver will allow volumes to be provisioned with Data Cache configuration")
nodeName = flag.String("node-name", "", "The node this driver is running on")

multiZoneVolumeHandleDiskTypesFlag = flag.String("multi-zone-volume-handle-disk-types", "", "Comma separated list of allowed disk types that can use the multi-zone volumeHandle. Used only if --multi-zone-volume-handle-enable")
Expand All @@ -102,9 +98,7 @@ var (
)

const (
driverName = "pd.csi.storage.gke.io"
dataCacheLabel = "datacache-storage-gke-io"
dataCacheLabelValue = "enabled"
driverName = "pd.csi.storage.gke.io"
)

func init() {
Expand Down Expand Up @@ -136,7 +130,7 @@ func handle() {
if version == "" {
klog.Fatalf("version must be set at compile time")
}
klog.V(2).Infof("Driver vendor version %v", version)
klog.V(4).Infof("Driver vendor version %v", version)

// Start tracing as soon as possible
if *enableOtelTracing {
Expand Down Expand Up @@ -264,10 +258,10 @@ func handle() {

if *enableDataCacheFlag {
if nodeName == nil || *nodeName == "" {
klog.Errorf("Data cache enabled, but --node-name not passed")
klog.Errorf("Data Cache enabled, but --node-name not passed")
}
if err := setupDataCache(ctx, *nodeName); err != nil {
klog.Errorf("DataCache setup failed: %v", err)
klog.Errorf("Data Cache setup failed: %v", err)
}
}

Expand Down Expand Up @@ -350,32 +344,77 @@ func urlFlag(target **url.URL, name string, usage string) {
})
}

func setupDataCache(ctx context.Context, nodeName string) error {
klog.V(2).Infof("Setting up data cache for node %s", nodeName)
if nodeName != common.TestNode {
cfg, err := rest.InClusterConfig()
if err != nil {
return err
func fetchLssdsForRaiding(lssdCount int) ([]string, error) {
allLssds, err := driver.FetchAllLssds()
if err != nil {
return nil, fmt.Errorf("Error listing all LSSDs %v", err)
}

raidedLssds, err := driver.FetchRaidedLssds()
if err != nil {
return nil, fmt.Errorf("Error listing RAIDed LSSDs %v", err)
}

unRaidedLssds := []string{}
for _, l := range allLssds {
if !slices.Contains(raidedLssds, l) {
unRaidedLssds = append(unRaidedLssds, l)
}
kubeClient, err := kubernetes.NewForConfig(cfg)
if err != nil {
return err
if len(unRaidedLssds) == lssdCount {
break
}
node, err := kubeClient.CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{})
}

LSSDsWithEmptyMountPoint, err := driver.FetchLSSDsWihtEmptyMountPoint()
if err != nil {
return nil, fmt.Errorf("Error listing LSSDs with empty mountpoint: %v", err)
}

// We need to ensure the disks to be used for Data Cache are both unRAIDed & not containing mountpoints for ephemeral storage already
availableLssds := slices.Filter(nil, unRaidedLssds, func(e string) bool {
return slices.Contains(LSSDsWithEmptyMountPoint, e)
})

if len(availableLssds) == 0 {
return nil, fmt.Errorf("No LSSDs available to set up caching")
}

if len(availableLssds) < lssdCount {
return nil, fmt.Errorf("Not enough LSSDs available to set up caching. Available LSSDs: %v, wanted LSSDs: %v", len(availableLssds), lssdCount)
}
return availableLssds, nil
}

func setupDataCache(ctx context.Context, nodeName string) error {
isAlreadyRaided, err := driver.IsRaided()
if err != nil {
klog.V(4).Infof("Errored while scanning for available LocalSSDs err:%v; continuing Raiding", err)
} else if isAlreadyRaided {
klog.V(4).Infof("Local SSDs are already RAIDed. Skipping Data Cache setup.")
return nil
}

lssdCount := common.LocalSSDCountForDataCache
if nodeName != common.TestNode {
var err error
lssdCount, err = driver.GetDataCacheCountFromNodeLabel(ctx, nodeName)
if err != nil {
// We could retry, but this error will also crashloop the driver which may be as good a way to retry as any.
return err
}
if val, found := node.GetLabels()[dataCacheLabel]; !found || val != dataCacheLabelValue {
klog.V(2).Infof("Datacache not enabled for node %s; node label %s=%s and not %s", nodeName, dataCacheLabel, val, dataCacheLabelValue)
if lssdCount == 0 {
klog.V(4).Infof("Data Cache is not enabled on node %v, so skipping caching setup", nodeName)
return nil
}
}
klog.V(2).Info("Raiding local ssds to setup data cache")
if err := driver.RaidLocalSsds(); err != nil {
return fmt.Errorf("Failed to Raid local SSDs, unable to setup data caching, got error %v", err)
lssdNames, err := fetchLssdsForRaiding(lssdCount)
if err != nil {
klog.Fatalf("Failed to get sufficient SSDs for Data Cache's caching setup: %v", err)
}
klog.V(4).Infof("Raiding local ssds to setup Data Cache: %v", lssdNames)
if err := driver.RaidLocalSsds(lssdNames); err != nil {
return fmt.Errorf("Failed to Raid local SSDs, unable to setup Data Cache, got error %v", err)
}

klog.V(2).Infof("Datacache enabled for node %s", nodeName)
klog.V(4).Infof("LSSD caching is setup for the Data Cache enabled node %s", nodeName)
return nil
}
7 changes: 7 additions & 0 deletions pkg/common/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,4 +44,11 @@ const (
ContexLocalSsdCacheSize = "local-ssd-cache-size"
// Node name for E2E tests
TestNode = "test-node-csi-e2e"

// Default LSSD count for Data Cache E2E tests
LocalSSDCountForDataCache = 2

// Node label for Data Cache (only applicable to GKE nodes)
NodeLabelPrefix = "cloud.google.com/%s"
DataCacheLssdCountLabel = "gke-data-cache-disk"
)
12 changes: 8 additions & 4 deletions pkg/common/runcmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ const (
// RunCommand wraps a k8s exec to deal with the no child process error. Same as exec.CombinedOutput.
// On error, the output is included so callers don't need to echo it again.

func RunCommand(pipeCmd string, pipeCmdArg string, cmd1 string, execCmdArgs ...string) ([]byte, error) {
func RunCommand(pipeCmd string, pipeCmdArg []string, cmd1 string, execCmdArgs ...string) ([]byte, error) {
execCmd1 := exec.Command(cmd1, execCmdArgs...)

if pipeCmd != "" {
Expand Down Expand Up @@ -47,9 +47,9 @@ func checkError(err error, execCmd exec.Cmd) error {
}
return err
}
func execPipeCommand(pipeCmd string, pipeCmdArg string, execCmd1 *exec.Cmd) ([]byte, error) {
func execPipeCommand(pipeCmd string, pipeCmdArg []string, execCmd1 *exec.Cmd) ([]byte, error) {

execPipeCmd := exec.Command(pipeCmd, pipeCmdArg)
execPipeCmd := exec.Command(pipeCmd, pipeCmdArg...)
stdoutPipe, err := execCmd1.StdoutPipe()
if err != nil {
klog.Errorf("failed command %v: got error:%v", execCmd1, err)
Expand All @@ -63,8 +63,12 @@ func execPipeCommand(pipeCmd string, pipeCmdArg string, execCmd1 *exec.Cmd) ([]b
execPipeCmd.Stdin = stdoutPipe
output, err := execPipeCmd.CombinedOutput()
if err != nil {
// Some commands (such as grep) will return an error with exit status of 1
if len(output) == 0 && err.(*exec.ExitError).ExitCode() == 1 {
return output, nil
}
err = checkError(err, *execPipeCmd)
return nil, fmt.Errorf("%s failed: %w; output: %s", pipeCmd, err, string(output))
return nil, fmt.Errorf("%s failed: %w; output: %s", execPipeCmd, err, string(output))
}

return output, nil
Expand Down
Loading