Skip to content

Commit ee6c26d

Browse files
committed
Propagate errors from cluster cache down to affected leaf applications
1 parent e35de02 commit ee6c26d

File tree

6 files changed: 281 additions and 49 deletions.

controller/appcontroller.go

Lines changed: 77 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1823,33 +1823,82 @@ func (ctrl *ApplicationController) processAppRefreshQueueItem() (processNext boo
18231823
app.Status.ReconciledAt = &now
18241824
}
18251825
app.Status.Sync = *compareResult.syncStatus
1826-
// Check both for conversion webhook errors in app state and compareResult conditions
1827-
hasConversionIssues := hasConversionWebhookErrors(app)
1826+
// Check for cluster cache issues that affect this application
1827+
hasCacheIssues := false
1828+
usesTaintedResources := false
18281829

1829-
// Also check if compareResult detected conversion webhook errors during refresh
1830-
// This helps identify conversion webhook errors during refresh operations
1830+
// 1. First check if the app itself reports conversion webhook errors
1831+
hasCacheIssues = hasClusterCacheIssues(app)
1832+
1833+
// 2. Check app conditions for cache-related errors during refresh
18311834
for _, condition := range app.Status.Conditions {
18321835
if condition.Type == appv1.ApplicationConditionComparisonError &&
18331836
(strings.Contains(condition.Message, "conversion webhook") ||
18341837
strings.Contains(condition.Message, "known conversion webhook failures") ||
1835-
strings.Contains(condition.Message, "unavailable resource types")) {
1836-
hasConversionIssues = true
1838+
strings.Contains(condition.Message, "unavailable resource types") ||
1839+
strings.Contains(condition.Message, "failed to list resources") ||
1840+
strings.Contains(condition.Message, "Expired: too old resource version")) {
1841+
hasCacheIssues = true
18371842
break
18381843
}
18391844
}
18401845

1841-
// Also check resources for conversion webhook errors
1846+
// 3. Check resources for errors
18421847
for _, res := range app.Status.Resources {
18431848
if res.Health != nil && res.Health.Message != "" &&
18441849
(strings.Contains(res.Health.Message, "conversion webhook") ||
1845-
strings.Contains(res.Health.Message, "unavailable resource types")) {
1846-
hasConversionIssues = true
1850+
strings.Contains(res.Health.Message, "unavailable resource types") ||
1851+
strings.Contains(res.Health.Message, "failed to get resource")) {
1852+
hasCacheIssues = true
18471853
break
18481854
}
18491855
}
18501856

1857+
// 4. Check if the app directly uses any tainted resources
1858+
clusterURL := app.Spec.Destination.Server
1859+
// Check if cluster is tainted directly using our cluster taint tracking
1860+
statecache.ClusterTaintLock.RLock()
1861+
taints, exists := statecache.GetClusterTaints()[clusterURL]
1862+
statecache.ClusterTaintLock.RUnlock()
1863+
1864+
// If the cluster is tainted, check if the app uses any of the affected resource types
1865+
if exists && len(taints) > 0 {
1866+
// Check if app directly uses any of the tainted GVKs
1867+
failedGVKs := statecache.GetTaintedGVKs(clusterURL)
1868+
1869+
resLoop:
1870+
for _, res := range app.Status.Resources {
1871+
gvkStr := fmt.Sprintf("%s/%s, Kind=%s", res.Group, res.Version, res.Kind)
1872+
gvkWildcard := fmt.Sprintf("%s/*, Kind=%s", res.Group, res.Kind)
1873+
1874+
for _, failedGVK := range failedGVKs {
1875+
if failedGVK == gvkStr || failedGVK == gvkWildcard {
1876+
usesTaintedResources = true
1877+
break resLoop
1878+
}
1879+
}
1880+
}
1881+
}
1882+
18511883
// If the application has conversion webhook errors, set health status to degraded
1852-
if hasConversionIssues {
1884+
// Set health status based on the severity of detected issues
1885+
if usesTaintedResources {
1886+
// Application directly uses tainted resources - mark as degraded
1887+
app.Status.Health.Status = health.HealthStatusDegraded
1888+
// Add a condition explaining why it's degraded
1889+
now := metav1.Now()
1890+
app.Status.SetConditions(
1891+
[]appv1.ApplicationCondition{
1892+
{
1893+
Type: appv1.ApplicationConditionComparisonError,
1894+
Message: "Application directly uses resources with known issues in the cluster cache",
1895+
LastTransitionTime: &now,
1896+
},
1897+
},
1898+
map[appv1.ApplicationConditionType]bool{appv1.ApplicationConditionComparisonError: true},
1899+
)
1900+
} else if hasCacheIssues {
1901+
// Application has cache issues but doesn't directly use tainted resources
18531902
app.Status.Health.Status = health.HealthStatusDegraded
18541903
} else {
18551904
app.Status.Health.Status = compareResult.healthStatus
@@ -2014,18 +2063,26 @@ func (ctrl *ApplicationController) needRefreshAppStatus(app *appv1.Application,
20142063
return false, refreshType, compareWith
20152064
}
20162065

2017-
// hasConversionWebhookErrors checks if the application has conversion webhook errors
2018-
// in either its sync results or conditions
2019-
func hasConversionWebhookErrors(app *appv1.Application) bool {
2066+
// hasClusterCacheIssues checks if the application has cluster cache issues
2067+
// like conversion webhook errors, pagination token expiration, etc.
2068+
func hasClusterCacheIssues(app *appv1.Application) bool {
20202069
// Check operation state for conversion webhook errors
20212070
if app.Status.OperationState != nil {
2022-
if strings.Contains(app.Status.OperationState.Message, "conversion webhook") || strings.Contains(app.Status.OperationState.Message, "known conversion webhook failures") || strings.Contains(app.Status.OperationState.Message, "unavailable resource types") {
2071+
if strings.Contains(app.Status.OperationState.Message, "conversion webhook") ||
2072+
strings.Contains(app.Status.OperationState.Message, "known conversion webhook failures") ||
2073+
strings.Contains(app.Status.OperationState.Message, "unavailable resource types") ||
2074+
strings.Contains(app.Status.OperationState.Message, "failed to list resources") ||
2075+
strings.Contains(app.Status.OperationState.Message, "Expired: too old resource version") {
20232076
return true
20242077
}
20252078
// Also check sync result resources
20262079
if app.Status.OperationState.SyncResult != nil {
20272080
for _, res := range app.Status.OperationState.SyncResult.Resources {
2028-
if strings.Contains(res.Message, "conversion webhook") || strings.Contains(res.Message, "known conversion webhook failures") || strings.Contains(res.Message, "unavailable resource types") {
2081+
if strings.Contains(res.Message, "conversion webhook") ||
2082+
strings.Contains(res.Message, "known conversion webhook failures") ||
2083+
strings.Contains(res.Message, "unavailable resource types") ||
2084+
strings.Contains(res.Message, "failed to list resources") ||
2085+
strings.Contains(res.Message, "Expired: too old resource version") {
20292086
return true
20302087
}
20312088
}
@@ -2036,7 +2093,11 @@ func hasConversionWebhookErrors(app *appv1.Application) bool {
20362093
for _, condition := range app.Status.Conditions {
20372094
if (condition.Type == appv1.ApplicationConditionComparisonError ||
20382095
condition.Type == appv1.ApplicationConditionSyncError) &&
2039-
(strings.Contains(condition.Message, "conversion webhook") || strings.Contains(condition.Message, "known conversion webhook failures") || strings.Contains(condition.Message, "unavailable resource types")) {
2096+
(strings.Contains(condition.Message, "conversion webhook") ||
2097+
strings.Contains(condition.Message, "known conversion webhook failures") ||
2098+
strings.Contains(condition.Message, "unavailable resource types") ||
2099+
strings.Contains(condition.Message, "failed to list resources") ||
2100+
strings.Contains(condition.Message, "Expired: too old resource version")) {
20402101
return true
20412102
}
20422103
}

controller/cache/cache.go

Lines changed: 161 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -655,14 +655,42 @@ func (c *liveStateCache) getSyncedCluster(server *appv1.Cluster) (clustercache.C
655655

656656
err = clusterCache.EnsureSynced()
657657
if err != nil {
658-
if isConversionWebhookError(err) {
659-
log.WithField("cluster", server.Server).Warnf("Conversion webhook error during cluster sync, cluster cache may be incomplete: %v", err)
658+
if isClusterCacheError(err) {
659+
log.WithField("cluster", server.Server).Warnf("Cluster cache sync error detected, cluster cache may be incomplete: %v", err)
660+
660661
// Extract the GVK from the error and track it as a failed GVK
661-
gvkStr := extractGVKFromConversionWebhookError(err)
662+
gvkStr := extractGVKFromCacheError(err)
662663
if gvkStr != "" {
663664
c.trackFailedResourceGVK(server.Server, gvkStr)
665+
666+
// Also mark the cluster as tainted
667+
errorType := "unknown"
668+
errStr := err.Error()
669+
if strings.Contains(errStr, "conversion webhook") {
670+
errorType = "conversion_webhook"
671+
} else if strings.Contains(errStr, "Expired: too old resource version") {
672+
errorType = "pagination_token_expired"
673+
} else if strings.Contains(errStr, "connection") {
674+
errorType = "connection_issue"
675+
}
676+
677+
markClusterTainted(server.Server, errStr, gvkStr, errorType)
678+
}
679+
680+
// Mark the cluster as tainted
681+
errorType := "unknown"
682+
errStr := err.Error()
683+
if strings.Contains(errStr, "conversion webhook") {
684+
errorType = "conversion_webhook"
685+
} else if strings.Contains(errStr, "Expired: too old resource version") {
686+
errorType = "pagination_token_expired"
687+
} else if strings.Contains(errStr, "connection") {
688+
errorType = "connection_issue"
664689
}
665-
// For conversion webhook errors, we return the cluster cache without an error
690+
691+
markClusterTainted(server.Server, errStr, gvkStr, errorType)
692+
693+
// For known cache errors, we return the cluster cache without an error
666694
// but log the issue. This allows applications to continue with
667695
// whatever state is available rather than failing completely.
668696
// We're handling partial state in each of the methods that use getSyncedCluster
@@ -1068,18 +1096,40 @@ func (c *liveStateCache) GetClustersInfo() []clustercache.ClusterInfo {
10681096
for gvk := range failedGVKs {
10691097
failedGVKList = append(failedGVKList, gvk)
10701098
}
1099+
}
1100+
c.failedGVKLock.RUnlock()
1101+
1102+
// Update cluster info with failed resource GVKs from our tracking
1103+
1104+
// Check if this cluster is tainted
1105+
ClusterTaintLock.RLock()
1106+
taints, exists := clusterTaints[server]
1107+
ClusterTaintLock.RUnlock()
1108+
1109+
// Update cluster info with taint status and reason
1110+
if exists && len(taints) > 0 {
1111+
info.IsTainted = true
10711112

1072-
// If there are failed GVKs, modify SyncError to include this information
1073-
if len(failedGVKList) > 0 {
1074-
msg := fmt.Sprintf("Cluster has %d unavailable resource types due to conversion webhook errors", len(failedGVKList))
1075-
if info.SyncError == nil {
1076-
info.SyncError = fmt.Errorf(msg)
1077-
} else {
1078-
info.SyncError = fmt.Errorf("%s; %s", info.SyncError.Error(), msg)
1113+
// Add any tainted GVKs that aren't already in the list
1114+
for gvk := range taints {
1115+
if !contains(failedGVKList, gvk) {
1116+
info.FailedResourceGVKs = append(info.FailedResourceGVKs, gvk)
10791117
}
10801118
}
1119+
1120+
// Set taint reason
1121+
info.TaintReason = "Cluster has tainted resource types"
1122+
}
1123+
1124+
// If there are failed GVKs, modify SyncError to include this information
1125+
if len(failedGVKList) > 0 {
1126+
msg := fmt.Sprintf("Cluster has %d unavailable resource types", len(failedGVKList))
1127+
if info.SyncError == nil {
1128+
info.SyncError = errors.New(msg)
1129+
} else {
1130+
info.SyncError = errors.New(info.SyncError.Error() + "; " + msg)
1131+
}
10811132
}
1082-
c.failedGVKLock.RUnlock()
10831133

10841134
res = append(res, info)
10851135
}
@@ -1095,7 +1145,42 @@ func (c *liveStateCache) UpdateShard(shard int) bool {
10951145
return c.clusterSharding.UpdateShard(shard)
10961146
}
10971147

1148+
// isClusterCacheError reports whether err's message contains one of the
// substrings this package treats as a known cluster cache sync failure.
//
// Matching is a case-sensitive substring test on err.Error(). A nil err is
// never a cache error.
func isClusterCacheError(err error) bool {
	if err == nil {
		return false
	}
	msg := err.Error()

	switch {
	// Conversion webhook errors.
	case strings.Contains(msg, "conversion webhook"):
	// Pagination token expiration errors.
	case strings.Contains(msg, "Expired: too old resource version"):
	// General resource list errors.
	case strings.Contains(msg, "failed to list resources"):
	// Connection issues.
	case strings.Contains(msg, "connection refused"):
	case strings.Contains(msg, "connection reset by peer"):
	case strings.Contains(msg, "i/o timeout"):
	default:
		// None of the known patterns matched.
		return false
	}
	return true
}
1181+
10981182
// isConversionWebhookError checks if an error is related to conversion webhooks
1183+
// Kept for backward compatibility
10991184
func isConversionWebhookError(err error) bool {
11001185
if err == nil {
11011186
return false
@@ -1104,9 +1189,9 @@ func isConversionWebhookError(err error) bool {
11041189
return strings.Contains(errStr, "conversion webhook")
11051190
}
11061191

1107-
// extractGVKFromConversionWebhookError attempts to extract the GroupVersionKind from a conversion webhook error
1192+
// extractGVKFromCacheError attempts to extract the GroupVersionKind from various cache errors
11081193
// Returns an empty string if no GVK could be extracted
1109-
func extractGVKFromConversionWebhookError(err error) string {
1194+
func extractGVKFromCacheError(err error) string {
11101195
if err == nil {
11111196
return ""
11121197
}
@@ -1152,6 +1237,53 @@ func (c *liveStateCache) trackFailedResourceGVK(server string, gvkStr string) {
11521237
}).Infof("Tracked failed resource GVK due to conversion webhook error")
11531238
}
11541239

1240+
// Package-level record of tainted clusters and the lock that protects it.
var (
	// clusterTaints maps server URL -> tainted resource GVK -> error type.
	clusterTaints = map[string]map[string]string{}

	// ClusterTaintLock is held around reads and writes of clusterTaints.
	ClusterTaintLock sync.RWMutex
)
1243+
1244+
// markClusterTainted marks a cluster as having tainted cache state
1245+
func markClusterTainted(server string, reason string, gvk string, errorType string) {
1246+
ClusterTaintLock.Lock()
1247+
defer ClusterTaintLock.Unlock()
1248+
1249+
// Initialize if not exists
1250+
_, exists := clusterTaints[server]
1251+
if !exists {
1252+
clusterTaints[server] = make(map[string]string)
1253+
}
1254+
1255+
// Store the GVK and error type
1256+
if gvk != "" {
1257+
clusterTaints[server][gvk] = errorType
1258+
}
1259+
}
1260+
1261+
// IsClusterTainted checks if a cluster is in tainted state
1262+
func IsClusterTainted(server string) bool {
1263+
ClusterTaintLock.RLock()
1264+
defer ClusterTaintLock.RUnlock()
1265+
1266+
taints, exists := clusterTaints[server]
1267+
return exists && len(taints) > 0
1268+
}
1269+
1270+
// GetTaintedGVKs returns a list of tainted GVKs for a cluster
1271+
func GetTaintedGVKs(server string) []string {
1272+
ClusterTaintLock.RLock()
1273+
defer ClusterTaintLock.RUnlock()
1274+
1275+
taints, exists := clusterTaints[server]
1276+
if !exists {
1277+
return nil
1278+
}
1279+
1280+
gvks := make([]string, 0, len(taints))
1281+
for gvk := range taints {
1282+
gvks = append(gvks, gvk)
1283+
}
1284+
return gvks
1285+
}
1286+
11551287
// isResourceGVKFailed checks if a resource GVK is in the failed resources map
11561288
func (c *liveStateCache) isResourceGVKFailed(server string, gvkStr string) bool {
11571289
c.failedGVKLock.RLock()
@@ -1176,6 +1308,21 @@ func (c *liveStateCache) clearFailedResourceGVKs(server string) {
11761308
delete(c.failedResourceGVKs, server)
11771309
}
11781310

1311+
// contains reports whether str is equal to any element of slice.
// A nil or empty slice never contains anything.
func contains(slice []string, str string) bool {
	found := false
	for i := 0; i < len(slice) && !found; i++ {
		found = slice[i] == str
	}
	return found
}
1320+
1321+
// GetClusterTaints returns the map of cluster taints
1322+
func GetClusterTaints() map[string]map[string]string {
1323+
return clusterTaints
1324+
}
1325+
11791326
// cleanupExpiredFailedGVKs removes expired entries from the failed GVKs cache
11801327
// This should be called periodically to avoid the cache growing too large
11811328
func (c *liveStateCache) cleanupExpiredFailedGVKs() {

0 commit comments

Comments
 (0)