@@ -29,7 +29,6 @@ import (
29
29
30
30
"k8s.io/klog/v2"
31
31
"k8s.io/utils/strings/slices"
32
-
33
32
"sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/common"
34
33
"sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/deviceutils"
35
34
gce "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/gce-cloud-provider/compute"
72
71
formatAndMountTimeout = flag .Duration ("format-and-mount-timeout" , 1 * time .Minute , "The maximum duration of a format and mount operation before another such operation will be started. Used only if --serialize-format-and-mount" )
73
72
fallbackRequisiteZonesFlag = flag .String ("fallback-requisite-zones" , "" , "Comma separated list of requisite zones that will be used if there are not sufficient zones present in requisite topologies when provisioning a disk" )
74
73
enableStoragePoolsFlag = flag .Bool ("enable-storage-pools" , false , "If set to true, the CSI Driver will allow volumes to be provisioned in Storage Pools" )
74
+ enableDataCacheFlag = flag .Bool ("enable-data-cache" , false , "If set to true, the CSI Driver will allow volumes to be provisioned with Data Cache configuration" )
75
+ nodeName = flag .String ("node-name" , "" , "The node this driver is running on" )
75
76
76
77
multiZoneVolumeHandleDiskTypesFlag = flag .String ("multi-zone-volume-handle-disk-types" , "" , "Comma separated list of allowed disk types that can use the multi-zone volumeHandle. Used only if --multi-zone-volume-handle-enable" )
77
78
multiZoneVolumeHandleEnableFlag = flag .Bool ("multi-zone-volume-handle-enable" , false , "If set to true, the multi-zone volumeHandle feature will be enabled" )
@@ -122,7 +123,7 @@ func handle() {
122
123
if version == "" {
123
124
klog .Fatalf ("version must be set at compile time" )
124
125
}
125
- klog .V (2 ).Infof ("Driver vendor version %v" , version )
126
+ klog .V (4 ).Infof ("Driver vendor version %v" , version )
126
127
127
128
// Start tracing as soon as possible
128
129
if * enableOtelTracing {
@@ -209,7 +210,7 @@ func handle() {
209
210
}
210
211
initialBackoffDuration := time .Duration (* errorBackoffInitialDurationMs ) * time .Millisecond
211
212
maxBackoffDuration := time .Duration (* errorBackoffMaxDurationMs ) * time .Millisecond
212
- controllerServer = driver .NewControllerServer (gceDriver , cloudProvider , initialBackoffDuration , maxBackoffDuration , fallbackRequisiteZones , * enableStoragePoolsFlag , multiZoneVolumeHandleConfig , listVolumesConfig )
213
+ controllerServer = driver .NewControllerServer (gceDriver , cloudProvider , initialBackoffDuration , maxBackoffDuration , fallbackRequisiteZones , * enableStoragePoolsFlag , * enableDataCacheFlag , multiZoneVolumeHandleConfig , listVolumesConfig )
213
214
} else if * cloudConfigFilePath != "" {
214
215
klog .Warningf ("controller service is disabled but cloud config given - it has no effect" )
215
216
}
@@ -227,10 +228,29 @@ func handle() {
227
228
if err != nil {
228
229
klog .Fatalf ("Failed to set up metadata service: %v" , err .Error ())
229
230
}
230
- nodeServer = driver .NewNodeServer (gceDriver , mounter , deviceUtils , meta , statter )
231
+ isDataCacheEnabledNodePool , err := isDataCacheEnabledNodePool (ctx , * nodeName )
232
+ if err != nil {
233
+ klog .Fatalf ("Failed to get node info from API server: %v" , err .Error ())
234
+ }
235
+ nsArgs := driver.NodeServerArgs {
236
+ EnableDataCache : * enableDataCacheFlag ,
237
+ DataCacheEnabledNodePool : isDataCacheEnabledNodePool ,
238
+ }
239
+ nodeServer = driver .NewNodeServer (gceDriver , mounter , deviceUtils , meta , statter , nsArgs )
231
240
if * maxConcurrentFormatAndMount > 0 {
232
241
nodeServer = nodeServer .WithSerializedFormatAndMount (* formatAndMountTimeout , * maxConcurrentFormatAndMount )
233
242
}
243
+ if * enableDataCacheFlag {
244
+ if nodeName == nil || * nodeName == "" {
245
+ klog .Errorf ("Data Cache enabled, but --node-name not passed" )
246
+ }
247
+ if nsArgs .DataCacheEnabledNodePool {
248
+ if err := setupDataCache (ctx , * nodeName , nodeServer .MetadataService .GetName ()); err != nil {
249
+ klog .Errorf ("Data Cache setup failed: %v" , err )
250
+ }
251
+ go driver .StartWatcher (* nodeName )
252
+ }
253
+ }
234
254
}
235
255
236
256
err = gceDriver .SetupGCEDriver (driverName , version , extraVolumeLabels , extraTags , identityServer , controllerServer , nodeServer )
@@ -311,3 +331,85 @@ func urlFlag(target **url.URL, name string, usage string) {
311
331
return err
312
332
})
313
333
}
334
+
335
+ func isDataCacheEnabledNodePool (ctx context.Context , nodeName string ) (bool , error ) {
336
+ if ! * enableDataCacheFlag {
337
+ return false , nil
338
+ }
339
+ if len (nodeName ) > 0 && nodeName != common .TestNode { // disregard logic below when E2E testing.
340
+ dataCacheLSSDCount , err := driver .GetDataCacheCountFromNodeLabel (ctx , nodeName )
341
+ return dataCacheLSSDCount != 0 , err
342
+ }
343
+ return true , nil
344
+ }
345
+
346
+ func fetchLssdsForRaiding (lssdCount int ) ([]string , error ) {
347
+ allLssds , err := driver .FetchAllLssds ()
348
+ if err != nil {
349
+ return nil , fmt .Errorf ("Error listing all LSSDs %v" , err )
350
+ }
351
+
352
+ raidedLssds , err := driver .FetchRaidedLssds ()
353
+ if err != nil {
354
+ return nil , fmt .Errorf ("Error listing RAIDed LSSDs %v" , err )
355
+ }
356
+
357
+ LSSDsWithEmptyMountPoint , err := driver .FetchLSSDsWithEmptyMountPoint ()
358
+ if err != nil {
359
+ return nil , fmt .Errorf ("Error listing LSSDs with empty mountpoint: %v" , err )
360
+ }
361
+
362
+ // We need to ensure the disks to be used for Data Cache are both unRAIDed & not containing mountpoints for ephemeral storage already
363
+ availableLssds := slices .Filter (nil , allLssds , func (e string ) bool {
364
+ return slices .Contains (LSSDsWithEmptyMountPoint , e ) && ! slices .Contains (raidedLssds , e )
365
+ })
366
+
367
+ if len (availableLssds ) == 0 {
368
+ return nil , fmt .Errorf ("No LSSDs available to set up caching" )
369
+ }
370
+
371
+ if len (availableLssds ) < lssdCount {
372
+ return nil , fmt .Errorf ("Not enough LSSDs available to set up caching. Available LSSDs: %v, wanted LSSDs: %v" , len (availableLssds ), lssdCount )
373
+ }
374
+
375
+ return availableLssds [:lssdCount ], nil
376
+ }
377
+
378
+ func setupDataCache (ctx context.Context , nodeName string , nodeId string ) error {
379
+ isAlreadyRaided , err := driver .IsRaided ()
380
+ if err != nil {
381
+ klog .V (4 ).Infof ("Errored while scanning for available LocalSSDs err:%v; continuing Raiding" , err )
382
+ } else if isAlreadyRaided {
383
+ klog .V (4 ).Infof ("Local SSDs are already RAIDed. Skipping Data Cache setup." )
384
+ return nil
385
+ }
386
+
387
+ lssdCount := common .LocalSSDCountForDataCache
388
+ if nodeName != common .TestNode {
389
+ var err error
390
+ lssdCount , err = driver .GetDataCacheCountFromNodeLabel (ctx , nodeName )
391
+ if err != nil {
392
+ return err
393
+ }
394
+ if lssdCount == 0 {
395
+ klog .V (4 ).Infof ("Data Cache is not enabled on node %v, so skipping caching setup" , nodeName )
396
+ return nil
397
+ }
398
+ }
399
+ lssdNames , err := fetchLssdsForRaiding (lssdCount )
400
+ if err != nil {
401
+ klog .Fatalf ("Failed to get sufficient SSDs for Data Cache's caching setup: %v" , err )
402
+ }
403
+ klog .V (4 ).Infof ("Raiding local ssds to setup Data Cache: %v" , lssdNames )
404
+ if err := driver .RaidLocalSsds (lssdNames ); err != nil {
405
+ return fmt .Errorf ("Failed to Raid local SSDs, unable to setup Data Cache, got error %v" , err )
406
+ }
407
+
408
+ // Initializing data cache node (VG checks w/ raided lssd)
409
+ if err := driver .InitializeDataCacheNode (nodeId ); err != nil {
410
+ return err
411
+ }
412
+
413
+ klog .V (4 ).Infof ("LSSD caching is setup for the Data Cache enabled node %s" , nodeName )
414
+ return nil
415
+ }
0 commit comments