From 16b0a5f1a7070cee6601f80724082e887a23f31c Mon Sep 17 00:00:00 2001 From: Jonathan Ogilvie Date: Thu, 21 Aug 2025 16:57:27 -0400 Subject: [PATCH 1/3] Add taint functionality to cluster cache Signed-off-by: Jonathan Ogilvie --- pkg/cache/cluster.go | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/pkg/cache/cluster.go b/pkg/cache/cluster.go index 38f1e6016..d827a9299 100644 --- a/pkg/cache/cluster.go +++ b/pkg/cache/cluster.go @@ -96,6 +96,12 @@ type ClusterInfo struct { SyncError error // APIResources holds list of API resources supported by the cluster APIResources []kube.APIResourceInfo + // FailedResourceGVKs holds list of GVKs that failed to sync due to conversion webhook errors + FailedResourceGVKs []string + // IsTainted indicates if the cluster cache is in a tainted state + IsTainted bool + // TaintReason provides information about why the cluster is tainted + TaintReason string } // OnEventHandler is a function that handles Kubernetes event @@ -1378,6 +1384,9 @@ func (c *clusterCache) GetClusterInfo() ClusterInfo { c.syncStatus.lock.Lock() defer c.syncStatus.lock.Unlock() + // Retrieve failed resource GVKs from the internal state + failedGVKs := c.getFailedResourceGVKs() + return ClusterInfo{ APIsCount: len(c.apisMeta), K8SVersion: c.serverVersion, @@ -1386,9 +1395,18 @@ func (c *clusterCache) GetClusterInfo() ClusterInfo { LastCacheSyncTime: c.syncStatus.syncTime, SyncError: c.syncStatus.syncError, APIResources: c.apiResources, + FailedResourceGVKs: failedGVKs, } } +// getFailedResourceGVKs returns a list of GVKs that failed to sync due to conversion webhook errors. +// This method should be called with the clusterCache lock held. +func (c *clusterCache) getFailedResourceGVKs() []string { + // This would be implemented by the liveStateCache in Argo CD and tracked there + // For now, return an empty list as this is just the gitops-engine interface + return []string{} +} + // skipAppRequeuing checks if the object is an API type which we want to skip requeuing against. // We ignore API types which have a high churn rate, and/or whose updates are irrelevant to the app func skipAppRequeuing(key kube.ResourceKey) bool { From 3b71e335c6770c9b2062a2413f09cf19f899bbfd Mon Sep 17 00:00:00 2001 From: Jonathan Ogilvie Date: Wed, 27 Aug 2025 17:19:09 -0400 Subject: [PATCH 2/3] WIP: track tainted GVKs Signed-off-by: Jonathan Ogilvie --- pkg/cache/cluster.go | 105 +++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 101 insertions(+), 4 deletions(-) diff --git a/pkg/cache/cluster.go b/pkg/cache/cluster.go index d827a9299..e8484781f 100644 --- a/pkg/cache/cluster.go +++ b/pkg/cache/cluster.go @@ -174,6 +174,7 @@ func NewClusterCache(config *rest.Config, opts ...UpdateSettingsFunc) *clusterCa listSemaphore: semaphore.NewWeighted(defaultListSemaphoreWeight), resources: make(map[kube.ResourceKey]*Resource), nsIndex: make(map[string]map[kube.ResourceKey]*Resource), + failedResourceGVKs: make(map[string]bool), config: config, kubectl: &kube.KubectlCmd{ Log: log, @@ -233,6 +234,9 @@ type clusterCache struct { lock sync.RWMutex resources map[kube.ResourceKey]*Resource nsIndex map[string]map[kube.ResourceKey]*Resource + + // failedResourceGVKs tracks GVKs that failed to sync due to conversion webhook errors + failedResourceGVKs map[string]bool kubectl kube.Kubectl log logr.Logger @@ -493,6 +497,7 @@ func (c *clusterCache) Invalidate(opts ...UpdateSettingsFunc) { } c.apisMeta = nil c.namespacedResources = nil + c.failedResourceGVKs = make(map[string]bool) c.log.Info("Invalidated cluster") } @@ -643,15 +648,42 @@ func (c *clusterCache) loadInitialState(ctx context.Context, api kube.APIResourc }) }) if err != nil { + if c.errorTaintsCache(err) { + // For cache-tainting errors, track the failed GVK but don't fail completely + gvk := schema.GroupVersionKind{ + Group: api.GroupKind.Group, + Version: api.GroupVersionResource.Version, + Kind: api.GroupKind.Kind, + } + if lock { + runSynced(&c.lock, func() error { + c.trackFailedGVK(gvk, err) + return nil + }) + } else { + c.trackFailedGVK(gvk, err) + } + // Return empty items and successful resource version for cache-tainting errors + return "", nil + } return "", fmt.Errorf("failed to load initial state of resource %s: %w", api.GroupKind.String(), err) } + // Successfully loaded resources - clear any previously failed GVK + gvk := schema.GroupVersionKind{ + Group: api.GroupKind.Group, + Version: api.GroupVersionResource.Version, + Kind: api.GroupKind.Kind, + } + if lock { return resourceVersion, runSynced(&c.lock, func() error { + c.clearFailedGVK(gvk) c.replaceResourceCache(api.GroupKind, items, ns) return nil }) } + c.clearFailedGVK(gvk) c.replaceResourceCache(api.GroupKind, items, ns) return resourceVersion, nil } @@ -972,9 +1004,31 @@ func (c *clusterCache) sync() error { return nil } } + if c.errorTaintsCache(err) { + // For cache-tainting errors, track the failed GVK but don't fail the sync + gvk := schema.GroupVersionKind{ + Group: api.GroupKind.Group, + Version: api.GroupVersionResource.Version, + Kind: api.GroupKind.Kind, + } + lock.Lock() + c.trackFailedGVK(gvk, err) + lock.Unlock() + return nil + } return fmt.Errorf("failed to load initial state of resource %s: %w", api.GroupKind.String(), err) } + // Successfully loaded resources - clear any previously failed GVK + gvk := schema.GroupVersionKind{ + Group: api.GroupKind.Group, + Version: api.GroupVersionResource.Version, + Kind: api.GroupKind.Kind, + } + lock.Lock() + c.clearFailedGVK(gvk) + lock.Unlock() + go c.watchEvents(ctx, api, resClient, ns, resourceVersion) return nil @@ -1399,12 +1453,55 @@ func (c *clusterCache) GetClusterInfo() ClusterInfo { } } -// getFailedResourceGVKs returns a list of GVKs that failed to sync due to conversion webhook errors. +// getFailedResourceGVKs returns a list of GVKs that failed to sync due to cache-tainting errors. // This method should be called with the clusterCache lock held. func (c *clusterCache) getFailedResourceGVKs() []string { - // This would be implemented by the liveStateCache in Argo CD and tracked there - // For now, return an empty list as this is just the gitops-engine interface - return []string{} + var failedGVKs []string + for gvk := range c.failedResourceGVKs { + failedGVKs = append(failedGVKs, gvk) + } + return failedGVKs +} + +// trackFailedGVK tracks a GVK that failed to sync due to a cache-tainting error. +// This method should be called with the clusterCache lock held. +func (c *clusterCache) trackFailedGVK(gvk schema.GroupVersionKind, err error) { + gvkString := gvk.String() + c.failedResourceGVKs[gvkString] = true + c.log.Info("Tracked failed resource GVK due to cache-tainting error", "gvk", gvkString, "error", err.Error()) +} + +// clearFailedGVK removes a GVK from the failed tracking when it recovers. +// This method should be called with the clusterCache lock held. +func (c *clusterCache) clearFailedGVK(gvk schema.GroupVersionKind) { + gvkString := gvk.String() + if _, exists := c.failedResourceGVKs[gvkString]; exists { + delete(c.failedResourceGVKs, gvkString) + c.log.Info("Cleared failed resource GVK after recovery", "gvk", gvkString) + } +} + +// errorTaintsCache checks if an error should cause the cache to be marked as tainted +// but allow it to continue operating. This includes errors like conversion webhook +// failures and other recoverable sync errors. +func (c *clusterCache) errorTaintsCache(err error) bool { + if err == nil { + return false + } + + errStr := err.Error() + + switch { + case strings.Contains(errStr, "conversion webhook"): + return true + case strings.Contains(errStr, "failed") && strings.Contains(errStr, "convert"): + return true + // Future extension: pagination token expiration errors + // case strings.Contains(errStr, "expired") && strings.Contains(errStr, "token"): + // return true + default: + return false + } } // skipAppRequeuing checks if the object is an API type which we want to skip requeuing against. From 9cb3a71bec5ee7626064c53024d10033d4202235 Mon Sep 17 00:00:00 2001 From: Jonathan Ogilvie Date: Fri, 29 Aug 2025 11:42:53 -0400 Subject: [PATCH 3/3] Add tests and fix lint Signed-off-by: Jonathan Ogilvie --- pkg/cache/cluster.go | 26 ++-- pkg/cache/cluster_test.go | 271 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 284 insertions(+), 13 deletions(-) diff --git a/pkg/cache/cluster.go b/pkg/cache/cluster.go index e8484781f..4684ca9ee 100644 --- a/pkg/cache/cluster.go +++ b/pkg/cache/cluster.go @@ -234,7 +234,7 @@ type clusterCache struct { lock sync.RWMutex resources map[kube.ResourceKey]*Resource nsIndex map[string]map[kube.ResourceKey]*Resource - + // failedResourceGVKs tracks GVKs that failed to sync due to conversion webhook errors failedResourceGVKs map[string]bool @@ -656,7 +656,7 @@ func (c *clusterCache) loadInitialState(ctx context.Context, api kube.APIResourc Kind: api.GroupKind.Kind, } if lock { - runSynced(&c.lock, func() error { + _ = runSynced(&c.lock, func() error { c.trackFailedGVK(gvk, err) return nil }) @@ -675,7 +675,7 @@ func (c *clusterCache) loadInitialState(ctx context.Context, api kube.APIResourc Version: api.GroupVersionResource.Version, Kind: api.GroupKind.Kind, } - + if lock { return resourceVersion, runSynced(&c.lock, func() error { c.clearFailedGVK(gvk) @@ -1440,15 +1440,15 @@ func (c *clusterCache) GetClusterInfo() ClusterInfo { // Retrieve failed resource GVKs from the internal state failedGVKs := c.getFailedResourceGVKs() - + return ClusterInfo{ - APIsCount: len(c.apisMeta), - K8SVersion: c.serverVersion, - ResourcesCount: len(c.resources), - Server: c.config.Host, - LastCacheSyncTime: c.syncStatus.syncTime, - SyncError: c.syncStatus.syncError, - APIResources: c.apiResources, + APIsCount: len(c.apisMeta), + K8SVersion: c.serverVersion, + ResourcesCount: len(c.resources), + Server: c.config.Host, + LastCacheSyncTime: c.syncStatus.syncTime, + SyncError: c.syncStatus.syncError, + APIResources: c.apiResources, FailedResourceGVKs: failedGVKs, } } @@ -1488,9 +1488,9 @@ func (c *clusterCache) errorTaintsCache(err error) bool { if err == nil { return false } - + errStr := err.Error() - + switch { case strings.Contains(errStr, "conversion webhook"): return true diff --git a/pkg/cache/cluster_test.go b/pkg/cache/cluster_test.go index 7bac78043..5634df7e5 100644 --- a/pkg/cache/cluster_test.go +++ b/pkg/cache/cluster_test.go @@ -1292,3 +1292,274 @@ func BenchmarkIterateHierarchyV2(b *testing.B) { }) } } + +func Test_errorTaintsCache(t *testing.T) { + cache := newCluster(t) + + testCases := []struct { + name string + err error + expected bool + }{ + { + name: "nil error should not taint cache", + err: nil, + expected: false, + }, + { + name: "conversion webhook error should taint cache", + err: errors.New("conversion webhook for example.com/v1, Kind=Example failed: Post \"https://webhook-service.svc:443/convert\": connection refused"), + expected: true, + }, + { + name: "failed convert error should taint cache", + err: errors.New("failed to convert resource: conversion error"), + expected: true, + }, + { + name: "generic error should not taint cache", + err: errors.New("some other error"), + expected: false, + }, + { + name: "authentication error should not taint cache", + err: errors.New("Unauthorized"), + expected: false, + }, + { + name: "network error should not taint cache", + err: errors.New("connection timeout"), + expected: false, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + result := cache.errorTaintsCache(tc.err) + assert.Equal(t, tc.expected, result, "errorTaintsCache should return %v for error: %v", tc.expected, tc.err) + }) + } +} + +func Test_trackFailedGVK(t *testing.T) { + cache := newCluster(t) + + testGVK := schema.GroupVersionKind{ + Group: "example.com", + Version: "v1", + Kind: "TestResource", + } + + testError := errors.New("conversion webhook failed") + + // Initially, no failed GVKs should be tracked + failedGVKs := cache.getFailedResourceGVKs() + assert.Empty(t, failedGVKs, "Initially no failed GVKs should be tracked") + + // Track a failed GVK + cache.trackFailedGVK(testGVK, testError) + + // Verify it's tracked + failedGVKs = cache.getFailedResourceGVKs() + assert.Len(t, failedGVKs, 1, "Should have exactly one failed GVK tracked") + assert.Contains(t, failedGVKs, testGVK.String(), "Should contain the tracked GVK") + + // Track the same GVK again (should not duplicate) + cache.trackFailedGVK(testGVK, testError) + failedGVKs = cache.getFailedResourceGVKs() + assert.Len(t, failedGVKs, 1, "Should still have exactly one failed GVK tracked") + + // Track a different GVK + anotherGVK := schema.GroupVersionKind{ + Group: "other.com", + Version: "v2", + Kind: "AnotherResource", + } + cache.trackFailedGVK(anotherGVK, testError) + + // Verify both are tracked + failedGVKs = cache.getFailedResourceGVKs() + assert.Len(t, failedGVKs, 2, "Should have exactly two failed GVKs tracked") + assert.Contains(t, failedGVKs, testGVK.String(), "Should contain the first GVK") + assert.Contains(t, failedGVKs, anotherGVK.String(), "Should contain the second GVK") +} + +func Test_clearFailedGVK(t *testing.T) { + cache := newCluster(t) + + testGVK := schema.GroupVersionKind{ + Group: "example.com", + Version: "v1", + Kind: "TestResource", + } + + anotherGVK := schema.GroupVersionKind{ + Group: "other.com", + Version: "v2", + Kind: "AnotherResource", + } + + testError := errors.New("conversion webhook failed") + + // Track two failed GVKs + cache.trackFailedGVK(testGVK, testError) + cache.trackFailedGVK(anotherGVK, testError) + + // Verify both are tracked + failedGVKs := cache.getFailedResourceGVKs() + assert.Len(t, failedGVKs, 2, "Should have exactly two failed GVKs tracked") + + // Clear one GVK + cache.clearFailedGVK(testGVK) + + // Verify only one remains + failedGVKs = cache.getFailedResourceGVKs() + assert.Len(t, failedGVKs, 1, "Should have exactly one failed GVK tracked after clearing one") + assert.Contains(t, failedGVKs, anotherGVK.String(), "Should contain the non-cleared GVK") + assert.NotContains(t, failedGVKs, testGVK.String(), "Should not contain the cleared GVK") + + // Clear a non-existent GVK (should be safe) + nonExistentGVK := schema.GroupVersionKind{ + Group: "nonexistent.com", + Version: "v1", + Kind: "NonExistent", + } + cache.clearFailedGVK(nonExistentGVK) // Should not panic or affect other entries + + // Verify the remaining GVK is still there + failedGVKs = cache.getFailedResourceGVKs() + assert.Len(t, failedGVKs, 1, "Should still have exactly one failed GVK tracked") + + // Clear the last GVK + cache.clearFailedGVK(anotherGVK) + + // Verify no GVKs remain + failedGVKs = cache.getFailedResourceGVKs() + assert.Empty(t, failedGVKs, "Should have no failed GVKs tracked after clearing all") +} + +func Test_GetClusterInfo_FailedResourceGVKs(t *testing.T) { + cache := newCluster(t) + + // Initially, cluster info should have no failed GVKs + clusterInfo := cache.GetClusterInfo() + assert.Empty(t, clusterInfo.FailedResourceGVKs, "Initially no failed GVKs should be in cluster info") + + // Track a failed GVK + testGVK := schema.GroupVersionKind{ + Group: "example.com", + Version: "v1", + Kind: "TestResource", + } + testError := errors.New("conversion webhook failed") + cache.trackFailedGVK(testGVK, testError) + + // Verify cluster info includes the failed GVK + clusterInfo = cache.GetClusterInfo() + assert.Len(t, clusterInfo.FailedResourceGVKs, 1, "Cluster info should include exactly one failed GVK") + assert.Contains(t, clusterInfo.FailedResourceGVKs, testGVK.String(), "Cluster info should contain the tracked GVK") + + // Track another failed GVK + anotherGVK := schema.GroupVersionKind{ + Group: "other.com", + Version: "v2", + Kind: "AnotherResource", + } + cache.trackFailedGVK(anotherGVK, testError) + + // Verify cluster info includes both failed GVKs + clusterInfo = cache.GetClusterInfo() + assert.Len(t, clusterInfo.FailedResourceGVKs, 2, "Cluster info should include exactly two failed GVKs") + assert.Contains(t, clusterInfo.FailedResourceGVKs, testGVK.String(), "Cluster info should contain the first GVK") + assert.Contains(t, clusterInfo.FailedResourceGVKs, anotherGVK.String(), "Cluster info should contain the second GVK") + + // Clear one GVK and verify cluster info is updated + cache.clearFailedGVK(testGVK) + clusterInfo = cache.GetClusterInfo() + assert.Len(t, clusterInfo.FailedResourceGVKs, 1, "Cluster info should include exactly one failed GVK after clearing one") + assert.Contains(t, clusterInfo.FailedResourceGVKs, anotherGVK.String(), "Cluster info should contain the remaining GVK") + assert.NotContains(t, clusterInfo.FailedResourceGVKs, testGVK.String(), "Cluster info should not contain the cleared GVK") +} + +func Test_ConversionWebhookErrorHandlingDuringSync(t *testing.T) { + // Create a test pod that we'll try to load + pod := &corev1.Pod{ + TypeMeta: metav1.TypeMeta{Kind: "Pod", APIVersion: "v1"}, + ObjectMeta: metav1.ObjectMeta{Name: "test-pod", Namespace: "default"}, + Spec: corev1.PodSpec{Containers: []corev1.Container{{Name: "test", Image: "test"}}}, + } + + // Create a cluster cache with the pod + cache := newCluster(t, pod) + + // Override the client with one that returns conversion webhook errors for certain operations + client := fake.NewSimpleDynamicClient(scheme.Scheme, pod) + + conversionWebhookError := errors.New("conversion webhook for v1, Kind=Pod failed: Post \"https://conversion-webhook.svc:443/convert?timeout=30s\": connection refused") + + // Set up reactor to simulate conversion webhook failures for list operations + client.PrependReactor("list", "pods", func(_ testcore.Action) (handled bool, ret runtime.Object, err error) { + return true, nil, conversionWebhookError + }) + + // Since we can't easily replace the client, we'll test the error handling logic directly + + // Test the error handling in loadInitialState (this is a bit of a unit test within an integration test) + testGVK := schema.GroupVersionKind{Group: "", Version: "v1", Kind: "Pod"} + + // Initially no failed GVKs should be tracked + failedGVKs := cache.getFailedResourceGVKs() + assert.Empty(t, failedGVKs, "Initially no failed GVKs should be tracked") + + // Simulate the error taint detection and tracking that happens in loadInitialState + if cache.errorTaintsCache(conversionWebhookError) { + cache.trackFailedGVK(testGVK, conversionWebhookError) + } + + // Verify that the GVK was tracked as failed + failedGVKs = cache.getFailedResourceGVKs() + assert.Len(t, failedGVKs, 1, "Should have exactly one failed GVK tracked") + assert.Contains(t, failedGVKs, testGVK.String(), "Should contain the pod GVK") + + // Verify that cluster info reflects the failed GVK + clusterInfo := cache.GetClusterInfo() + assert.Len(t, clusterInfo.FailedResourceGVKs, 1, "Cluster info should include exactly one failed GVK") + assert.Contains(t, clusterInfo.FailedResourceGVKs, testGVK.String(), "Cluster info should contain the failed pod GVK") + + // Simulate recovery: clear the failed GVK (this would happen when the resource successfully loads later) + cache.clearFailedGVK(testGVK) + + // Verify recovery was tracked + failedGVKs = cache.getFailedResourceGVKs() + assert.Empty(t, failedGVKs, "Should have no failed GVKs after recovery") + + clusterInfo = cache.GetClusterInfo() + assert.Empty(t, clusterInfo.FailedResourceGVKs, "Cluster info should have no failed GVKs after recovery") +} + +func Test_ConversionWebhookErrorDoesNotFailSync(t *testing.T) { + cache := newCluster(t) + + // Test that conversion webhook errors are properly identified as cache-tainting errors + conversionWebhookError := errors.New("conversion webhook for example.com/v1, Kind=Example failed") + assert.True(t, cache.errorTaintsCache(conversionWebhookError), "Conversion webhook errors should taint cache") + + // Test that other errors are not cache-tainting + networkError := errors.New("connection timeout") + assert.False(t, cache.errorTaintsCache(networkError), "Network errors should not taint cache") + + authError := errors.New("Unauthorized") + assert.False(t, cache.errorTaintsCache(authError), "Auth errors should not taint cache") + + // Test various conversion webhook error patterns + testCases := []string{ + "conversion webhook for apps/v1, Kind=Deployment failed: Post \"https://webhook.svc:443/convert\": connection refused", + "failed to convert resource from apps/v1beta1 to apps/v1: conversion error", + "conversion webhook for stable.example.com/v1, Kind=CronTab failed: timeout", + } + + for _, errorMsg := range testCases { + err := errors.New(errorMsg) + assert.True(t, cache.errorTaintsCache(err), "Error '%s' should taint cache", errorMsg) + } +}