Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
129 changes: 122 additions & 7 deletions pkg/cache/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,12 @@ type ClusterInfo struct {
SyncError error
// APIResources holds list of API resources supported by the cluster
APIResources []kube.APIResourceInfo
// FailedResourceGVKs holds list of GVKs that failed to sync due to conversion webhook errors
FailedResourceGVKs []string
// IsTainted indicates if the cluster cache is in a tainted state
IsTainted bool
// TaintReason provides information about why the cluster is tainted
TaintReason string
}

// OnEventHandler is a function that handles Kubernetes event
Expand Down Expand Up @@ -168,6 +174,7 @@ func NewClusterCache(config *rest.Config, opts ...UpdateSettingsFunc) *clusterCa
listSemaphore: semaphore.NewWeighted(defaultListSemaphoreWeight),
resources: make(map[kube.ResourceKey]*Resource),
nsIndex: make(map[string]map[kube.ResourceKey]*Resource),
failedResourceGVKs: make(map[string]bool),
config: config,
kubectl: &kube.KubectlCmd{
Log: log,
Expand Down Expand Up @@ -228,6 +235,9 @@ type clusterCache struct {
resources map[kube.ResourceKey]*Resource
nsIndex map[string]map[kube.ResourceKey]*Resource

// failedResourceGVKs tracks GVKs that failed to sync due to conversion webhook errors
failedResourceGVKs map[string]bool

kubectl kube.Kubectl
log logr.Logger
config *rest.Config
Expand Down Expand Up @@ -487,6 +497,7 @@ func (c *clusterCache) Invalidate(opts ...UpdateSettingsFunc) {
}
c.apisMeta = nil
c.namespacedResources = nil
c.failedResourceGVKs = make(map[string]bool)
c.log.Info("Invalidated cluster")
}

Expand Down Expand Up @@ -637,15 +648,42 @@ func (c *clusterCache) loadInitialState(ctx context.Context, api kube.APIResourc
})
})
if err != nil {
if c.errorTaintsCache(err) {
// For cache-tainting errors, track the failed GVK but don't fail completely
gvk := schema.GroupVersionKind{
Group: api.GroupKind.Group,
Version: api.GroupVersionResource.Version,
Kind: api.GroupKind.Kind,
}
if lock {
_ = runSynced(&c.lock, func() error {
c.trackFailedGVK(gvk, err)
return nil
})
} else {
c.trackFailedGVK(gvk, err)
}
// Return an empty resource version and a nil error so a cache-tainting error does not fail the load
return "", nil
}
return "", fmt.Errorf("failed to load initial state of resource %s: %w", api.GroupKind.String(), err)
}

// Successfully loaded resources - clear any previously failed GVK
gvk := schema.GroupVersionKind{
Group: api.GroupKind.Group,
Version: api.GroupVersionResource.Version,
Kind: api.GroupKind.Kind,
}

if lock {
return resourceVersion, runSynced(&c.lock, func() error {
c.clearFailedGVK(gvk)
c.replaceResourceCache(api.GroupKind, items, ns)
return nil
})
}
c.clearFailedGVK(gvk)
c.replaceResourceCache(api.GroupKind, items, ns)
return resourceVersion, nil
}
Expand Down Expand Up @@ -966,9 +1004,31 @@ func (c *clusterCache) sync() error {
return nil
}
}
if c.errorTaintsCache(err) {
// For cache-tainting errors, track the failed GVK but don't fail the sync
gvk := schema.GroupVersionKind{
Group: api.GroupKind.Group,
Version: api.GroupVersionResource.Version,
Kind: api.GroupKind.Kind,
}
lock.Lock()
c.trackFailedGVK(gvk, err)
lock.Unlock()
return nil
}
return fmt.Errorf("failed to load initial state of resource %s: %w", api.GroupKind.String(), err)
}

// Successfully loaded resources - clear any previously failed GVK
gvk := schema.GroupVersionKind{
Group: api.GroupKind.Group,
Version: api.GroupVersionResource.Version,
Kind: api.GroupKind.Kind,
}
lock.Lock()
c.clearFailedGVK(gvk)
lock.Unlock()

go c.watchEvents(ctx, api, resClient, ns, resourceVersion)

return nil
Expand Down Expand Up @@ -1378,14 +1438,69 @@ func (c *clusterCache) GetClusterInfo() ClusterInfo {
c.syncStatus.lock.Lock()
defer c.syncStatus.lock.Unlock()

// Retrieve failed resource GVKs from the internal state
failedGVKs := c.getFailedResourceGVKs()

return ClusterInfo{
APIsCount: len(c.apisMeta),
K8SVersion: c.serverVersion,
ResourcesCount: len(c.resources),
Server: c.config.Host,
LastCacheSyncTime: c.syncStatus.syncTime,
SyncError: c.syncStatus.syncError,
APIResources: c.apiResources,
APIsCount: len(c.apisMeta),
K8SVersion: c.serverVersion,
ResourcesCount: len(c.resources),
Server: c.config.Host,
LastCacheSyncTime: c.syncStatus.syncTime,
SyncError: c.syncStatus.syncError,
APIResources: c.apiResources,
FailedResourceGVKs: failedGVKs,
}
}

// getFailedResourceGVKs collects the keys of c.failedResourceGVKs into a slice.
// The result is nil when nothing is tracked. The order of the returned entries
// is unspecified because it follows Go map iteration order.
// This method should be called with the clusterCache lock held.
func (c *clusterCache) getFailedResourceGVKs() []string {
	if len(c.failedResourceGVKs) == 0 {
		// Keep the nil result for the "nothing failed" case.
		return nil
	}

	keys := make([]string, 0, len(c.failedResourceGVKs))
	for key := range c.failedResourceGVKs {
		keys = append(keys, key)
	}
	return keys
}

// trackFailedGVK marks gvk as failed in c.failedResourceGVKs, keyed by
// gvk.String(), and logs the GVK together with the message of err.
// This method should be called with the clusterCache lock held.
func (c *clusterCache) trackFailedGVK(gvk schema.GroupVersionKind, err error) {
	key := gvk.String()

	// Record the failure first, then report it.
	c.failedResourceGVKs[key] = true
	c.log.Info(
		"Tracked failed resource GVK due to cache-tainting error",
		"gvk", key,
		"error", err.Error(),
	)
}

// clearFailedGVK drops gvk from c.failedResourceGVKs if it is present and logs
// the recovery. It does nothing, and logs nothing, when gvk is not tracked.
// This method should be called with the clusterCache lock held.
func (c *clusterCache) clearFailedGVK(gvk schema.GroupVersionKind) {
	key := gvk.String()

	_, tracked := c.failedResourceGVKs[key]
	if !tracked {
		return
	}

	delete(c.failedResourceGVKs, key)
	c.log.Info("Cleared failed resource GVK after recovery", "gvk", key)
}

// errorTaintsCache reports whether err is one of the errors that should mark
// the cache as tainted while letting it keep operating. The decision is made
// by case-sensitive substring matching on err.Error():
//
//   - the message contains "conversion webhook", or
//   - the message contains both "failed" and "convert".
//
// A nil error never taints the cache.
//
// Future extension: pagination token expiration errors (a message containing
// both "expired" and "token") could be recognised here as well.
func (c *clusterCache) errorTaintsCache(err error) bool {
	if err == nil {
		return false
	}

	msg := err.Error()

	if strings.Contains(msg, "conversion webhook") {
		return true
	}

	// Broader match for conversion failures that do not mention the webhook.
	return strings.Contains(msg, "failed") && strings.Contains(msg, "convert")
}

Expand Down
Loading