Skip to content

Commit 829faae

Browse files
Add Capacity Buffer controller logic
1 parent 7b9cb8c commit 829faae

13 files changed

+1065
-0
lines changed
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
/*
2+
Copyright 2025 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package common
18+
19+
import (
20+
"context"
21+
22+
v1 "k8s.io/autoscaler/cluster-autoscaler/apis/capacitybuffer/autoscaling.x-k8s.io/v1"
23+
client "k8s.io/autoscaler/cluster-autoscaler/apis/capacitybuffer/client/clientset/versioned"
24+
25+
corev1 "k8s.io/api/core/v1"
26+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
27+
"k8s.io/client-go/kubernetes"
28+
)
29+
30+
// Constants to use in Capacity Buffers objects
31+
const (
32+
ActiveProvisioningStrategy = "active-capacity"
33+
ReadyForProvisioningCondition = "ReadyForProvisioning"
34+
ProvisioningCondition = "Provisioning"
35+
ConditionTrue = "True"
36+
ConditionFalse = "False"
37+
DefaultNamespace = "default"
38+
)
39+
40+
// CreatePodTemplate creates a pod template object by calling API server
41+
func CreatePodTemplate(client *kubernetes.Clientset, podTemplate *corev1.PodTemplate) (*corev1.PodTemplate, error) {
42+
return client.CoreV1().PodTemplates(DefaultNamespace).Create(context.TODO(), podTemplate, metav1.CreateOptions{})
43+
}
44+
45+
// UpdateBufferStatus updates the passed buffer object with its defined status
46+
func UpdateBufferStatus(buffersClient client.Interface, buffer *v1.CapacityBuffer) error {
47+
_, err := buffersClient.AutoscalingV1().CapacityBuffers(DefaultNamespace).UpdateStatus(context.TODO(), buffer, metav1.UpdateOptions{})
48+
return err
49+
}
Lines changed: 142 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,142 @@
1+
/*
2+
Copyright 2025 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package controller
18+
19+
import (
20+
"time"
21+
22+
"k8s.io/klog/v2"
23+
24+
"k8s.io/apimachinery/pkg/labels"
25+
buffers_client "k8s.io/autoscaler/cluster-autoscaler/apis/capacitybuffer/client/clientset/versioned"
26+
27+
v1 "k8s.io/autoscaler/cluster-autoscaler/apis/capacitybuffer/client/listers/autoscaling.x-k8s.io/v1"
28+
"k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
29+
30+
common "k8s.io/autoscaler/cluster-autoscaler/capacitybuffer/common"
31+
buffers_filters "k8s.io/autoscaler/cluster-autoscaler/capacitybuffer/filters"
32+
buffers_translators "k8s.io/autoscaler/cluster-autoscaler/capacitybuffer/translators"
33+
buffers_updater "k8s.io/autoscaler/cluster-autoscaler/capacitybuffer/updater"
34+
35+
kube_client "k8s.io/client-go/kubernetes"
36+
)
37+
38+
const loopInterval = time.Second * 5
39+
40+
// BufferController performs updates on Buffers and convert them to pods to be injected
41+
type BufferController interface {
42+
// Run to run the reconciliation loop frequently every x seconds
43+
Run(stopCh <-chan struct{})
44+
}
45+
46+
type bufferController struct {
47+
buffersLister v1.CapacityBufferLister
48+
buffersStrategyFilter buffers_filters.BuffersFilter
49+
buffersStatusFilter buffers_filters.BuffersFilter
50+
buffersTranslator buffers_translators.BuffersTranslator
51+
buffersUpdater buffers_updater.BuffersStatusUpdater
52+
loopInterval time.Duration
53+
}
54+
55+
// NewBufferController creates new bufferController object
56+
func NewBufferController(
57+
buffersLister v1.CapacityBufferLister,
58+
buffersStrategyFilter buffers_filters.BuffersFilter,
59+
buffersStatusFilter buffers_filters.BuffersFilter,
60+
buffersTranslator buffers_translators.BuffersTranslator,
61+
buffersUpdater buffers_updater.BuffersStatusUpdater,
62+
loopInterval time.Duration,
63+
) (BufferController, error) {
64+
return &bufferController{
65+
buffersLister: buffersLister,
66+
buffersStrategyFilter: buffersStrategyFilter,
67+
buffersStatusFilter: buffersStatusFilter,
68+
buffersTranslator: buffersTranslator,
69+
buffersUpdater: buffersUpdater,
70+
loopInterval: loopInterval,
71+
}, nil
72+
}
73+
74+
// NewDefaultBufferController creates Updater with default configs
75+
func NewDefaultBufferController(
76+
listerRegistry kubernetes.ListerRegistry,
77+
capacityBufferClinet buffers_client.Clientset,
78+
nodeBufferListener v1.CapacityBufferLister,
79+
kubeClient kube_client.Clientset,
80+
) BufferController {
81+
return &bufferController{
82+
buffersLister: nodeBufferListener,
83+
buffersStrategyFilter: buffers_filters.NewDefaultBuffersStrategyFilter([]string{common.ActiveProvisioningStrategy}),
84+
buffersStatusFilter: buffers_filters.NewDefaultBuffersStatusFilter(map[string]string{
85+
common.ReadyForProvisioningCondition: common.ConditionTrue,
86+
common.ProvisioningCondition: common.ConditionTrue,
87+
}),
88+
buffersTranslator: buffers_translators.NewCombinedBuffersTranslator(
89+
[]buffers_translators.BuffersTranslator{
90+
buffers_translators.NewDefaultPodTemplateBufferTranslator(),
91+
},
92+
),
93+
buffersUpdater: *buffers_updater.NewDefaultBuffersStatusUpdater(&capacityBufferClinet),
94+
loopInterval: loopInterval,
95+
}
96+
}
97+
98+
// Run to run the controller reconcile loop
99+
func (c *bufferController) Run(stopCh <-chan struct{}) {
100+
for {
101+
select {
102+
case <-stopCh:
103+
return
104+
case <-time.After(c.loopInterval):
105+
c.reconcile()
106+
}
107+
}
108+
}
109+
110+
// Reconcile represents single iteration in the main-loop of Updater
111+
func (c *bufferController) reconcile() {
112+
113+
// 1. List all capacity buffers objects
114+
buffers, err := c.buffersLister.List(labels.Everything())
115+
if err != nil {
116+
klog.Errorf("Capacity buffer controller failed to list buffers with error: %v", err.Error())
117+
return
118+
}
119+
klog.V(2).Infof("Capacity buffer controller listed [%v] buffers", len(buffers))
120+
121+
// 2. Filter the desired provisioning strategy
122+
filteredBuffers, _ := c.buffersStrategyFilter.Filter(buffers)
123+
klog.V(2).Infof("Capacity buffer controller filtered %v buffers with buffers strategy filter", len(filteredBuffers))
124+
125+
// 2. Filter the desired status
126+
_, toBeTranslatedBuffers := c.buffersStatusFilter.Filter(filteredBuffers)
127+
klog.V(2).Infof("Capacity buffer controller filtered %v buffers with buffers status filter", len(filteredBuffers))
128+
129+
// 3. Extract a) pod specs and b) number of replicase from filtered buffers
130+
errors := c.buffersTranslator.Translate(toBeTranslatedBuffers)
131+
logErrors(errors)
132+
133+
// 4. Update buffer status by calling API server
134+
errors = c.buffersUpdater.Update(toBeTranslatedBuffers)
135+
logErrors(errors)
136+
}
137+
138+
func logErrors(errors []error) {
139+
for _, error := range errors {
140+
klog.Errorf("Capacity buffer controller error %v", error.Error())
141+
}
142+
}
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
/*
2+
Copyright 2025 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package buffersfilter
18+
19+
import (
20+
api_v1 "k8s.io/autoscaler/cluster-autoscaler/apis/capacitybuffer/autoscaling.x-k8s.io/v1"
21+
)
22+
23+
// BuffersFilter filters CapacityBuffer based on some criteria.
24+
type BuffersFilter interface {
25+
Filter(buffers []*api_v1.CapacityBuffer) ([]*api_v1.CapacityBuffer, []*api_v1.CapacityBuffer)
26+
CleanUp()
27+
}
28+
29+
// NoOpBuffersFilter is returning buffers lists without filtering any of them.
30+
type NoOpBuffersFilter struct {
31+
}
32+
33+
// NewDefaultNoOpBuffersFilter creates an instance of PodListProcessor.
34+
func NewDefaultNoOpBuffersFilter() *NoOpBuffersFilter {
35+
return &NoOpBuffersFilter{}
36+
}
37+
38+
// Filter do no filteration
39+
func (f *NoOpBuffersFilter) Filter(buffers []*api_v1.CapacityBuffer) ([]*api_v1.CapacityBuffer, []*api_v1.CapacityBuffer) {
40+
return buffers, []*api_v1.CapacityBuffer{}
41+
}
42+
43+
// CleanUp cleans up the filter's internal structures.
44+
func (f *NoOpBuffersFilter) CleanUp() {
45+
}
46+
47+
// CombinedBuffersFilter is a list of BuffersFilter
48+
type CombinedBuffersFilter struct {
49+
filters []BuffersFilter
50+
}
51+
52+
// NewCombinedBuffersFilter construct CombinedBuffersFilter.
53+
func NewCombinedBuffersFilter(filters []BuffersFilter) *CombinedBuffersFilter {
54+
return &CombinedBuffersFilter{filters}
55+
}
56+
57+
// AddFilter append a filter to the list.
58+
func (f *CombinedBuffersFilter) AddFilter(filter BuffersFilter) {
59+
f.filters = append(f.filters, filter)
60+
}
61+
62+
// Filter runs sub-filters sequentially
63+
func (f *CombinedBuffersFilter) Filter(buffers []*api_v1.CapacityBuffer) ([]*api_v1.CapacityBuffer, []*api_v1.CapacityBuffer) {
64+
var totalFilteredOutBuffers []*api_v1.CapacityBuffer
65+
for _, buffersFilter := range f.filters {
66+
updatedBuffersList, filteredOutBuffers := buffersFilter.Filter(buffers)
67+
buffers = updatedBuffersList
68+
totalFilteredOutBuffers = append(totalFilteredOutBuffers, filteredOutBuffers...)
69+
}
70+
return buffers, totalFilteredOutBuffers
71+
}
72+
73+
// CleanUp cleans up the filter's internal structures.
74+
func (f *CombinedBuffersFilter) CleanUp() {
75+
for _, filter := range f.filters {
76+
filter.CleanUp()
77+
}
78+
}
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
/*
2+
Copyright 2025 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package buffersfilter
18+
19+
import (
20+
api_v1 "k8s.io/autoscaler/cluster-autoscaler/apis/capacitybuffer/autoscaling.x-k8s.io/v1"
21+
)
22+
23+
// BuffersStatusFilter filters out buffers with conditions defined in conditionsToFilterOut
24+
type BuffersStatusFilter struct {
25+
conditions map[string]string
26+
}
27+
28+
// NewDefaultBuffersStatusFilter creates an instance of BuffersStatusFilter that filters out the buffers with non of any passed conditions.
29+
func NewDefaultBuffersStatusFilter(conditions map[string]string) *BuffersStatusFilter {
30+
return &BuffersStatusFilter{
31+
conditions: conditions,
32+
}
33+
}
34+
35+
// Filter filters the passed buffers based on buffer status conditions
36+
func (b *BuffersStatusFilter) Filter(buffersToFilter []*api_v1.CapacityBuffer) ([]*api_v1.CapacityBuffer, []*api_v1.CapacityBuffer) {
37+
var buffers []*api_v1.CapacityBuffer
38+
var filteredOutBuffers []*api_v1.CapacityBuffer
39+
40+
for _, buffer := range buffersToFilter {
41+
if b.hasCondition(buffer) {
42+
buffers = append(buffers, buffer)
43+
} else {
44+
filteredOutBuffers = append(filteredOutBuffers, buffer)
45+
}
46+
}
47+
return buffers, filteredOutBuffers
48+
}
49+
50+
func (b *BuffersStatusFilter) hasCondition(buffer *api_v1.CapacityBuffer) bool {
51+
bufferConditions := buffer.Status.Conditions
52+
for _, condition := range bufferConditions {
53+
if val, found := b.conditions[condition.Type]; found && val == string(condition.Status) {
54+
return true
55+
}
56+
}
57+
return false
58+
}
59+
60+
// CleanUp cleans up the filter's internal structures.
61+
func (b *BuffersStatusFilter) CleanUp() {
62+
b.conditions = map[string]string{}
63+
}

0 commit comments

Comments
 (0)