Skip to content

Commit 8a2939e

Browse files
committed
update
1 parent 6e825f1 commit 8a2939e

File tree

9 files changed

+57
-39
lines changed

9 files changed

+57
-39
lines changed

controllers/apps/component/transformer_component_service.go

Lines changed: 3 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@ import (
2323
"context"
2424
"fmt"
2525
"reflect"
26-
"strconv"
2726
"strings"
2827

2928
"golang.org/x/exp/maps"
@@ -42,7 +41,6 @@ import (
4241
"github.com/apecloud/kubeblocks/pkg/controller/factory"
4342
"github.com/apecloud/kubeblocks/pkg/controller/graph"
4443
"github.com/apecloud/kubeblocks/pkg/controller/model"
45-
"github.com/apecloud/kubeblocks/pkg/controller/multicluster"
4644
intctrlutil "github.com/apecloud/kubeblocks/pkg/controllerutil"
4745
)
4846

@@ -278,21 +276,15 @@ func (t *componentServiceTransformer) createOrUpdateService(ctx graph.TransformC
278276
)
279277

280278
if service.Annotations != nil {
281-
kind = service.Annotations[constant.MultiClusterServicePlacementKey]
282-
delete(service.Annotations, constant.MultiClusterServicePlacementKey)
279+
kind = service.Annotations[constant.KBAppMultiClusterServicePlacementKey]
280+
delete(service.Annotations, constant.KBAppMultiClusterServicePlacementKey)
283281
}
284282
if podService && len(kind) > 0 && kind != multiClusterServicePlacementInMirror && kind != multiClusterServicePlacementInUnique {
285283
return fmt.Errorf("invalid multi-cluster pod-service placement kind %s for service %s", kind, service.Name)
286284
}
287285

288286
if podService && kind == multiClusterServicePlacementInUnique {
289-
// create or update service in unique, by hacking the pod placement strategy.
290-
ordinal := func() int {
291-
subs := strings.Split(service.GetName(), "-")
292-
o, _ := strconv.Atoi(subs[len(subs)-1])
293-
return o
294-
}
295-
multicluster.Assign(ctx.GetContext(), service, ordinal)
287+
service.Annotations[constant.KBAppMultiClusterObjectProvisionPolicyKey] = "ordinal" // HACK
296288
}
297289

298290
createOrUpdateService := func(service *corev1.Service) error {

controllers/workloads/instanceset_controller_2.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -87,11 +87,11 @@ func (r *InstanceSetReconciler2) setupWithMultiClusterManager(mgr ctrl.Manager,
8787
For(&workloads.InstanceSet{}).
8888
WithOptions(controller.Options{
8989
MaxConcurrentReconciles: viper.GetInt(constant.CfgKBReconcileWorkers),
90-
})
90+
}).
91+
Owns(&corev1.Service{}) // headless service
9192

9293
eventHandler := handler.EnqueueRequestsFromMapFunc(r.instanceFilter)
93-
multiClusterMgr.Watch(b, &workloads.Instance{}, eventHandler).
94-
Watch(b, &corev1.Service{}, eventHandler) // headless service
94+
multiClusterMgr.Watch(b, &workloads.Instance{}, eventHandler)
9595

9696
return b.Complete(r)
9797
}

pkg/constant/annotations.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -61,8 +61,9 @@ const (
6161

6262
// annotations for multi-cluster
6363
const (
64-
KBAppMultiClusterPlacementKey = "apps.kubeblocks.io/multi-cluster-placement"
65-
MultiClusterServicePlacementKey = "apps.kubeblocks.io/multi-cluster-service-placement"
64+
KBAppMultiClusterPlacementKey = "apps.kubeblocks.io/multi-cluster-placement"
65+
KBAppMultiClusterServicePlacementKey = "apps.kubeblocks.io/multi-cluster-service-placement"
66+
KBAppMultiClusterObjectProvisionPolicyKey = "apps.kubeblocks.io/multi-cluster-object-provision-policy"
6667
)
6768

6869
func InheritedAnnotations() []string {

pkg/controller/component/service_reference.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,7 @@ func referencedServiceVars(ctx context.Context, cli client.Reader, namespace str
154154
case len(selector.Service.Component) == 0:
155155
obj, err = clusterServiceGetter(ctx, cli, svcNamespace, selector.Cluster, selector.Service.Service)
156156
case selector.Service.Service == "headless":
157-
obj, err = headlessCompServiceGetter(ctx, cli, svcNamespace, selector.Cluster, selector.Service.Component)
157+
obj, err = headlessServiceGetter(ctx, cli, svcNamespace, selector.Cluster, selector.Service.Component)
158158
default:
159159
obj, err = compServiceGetter(ctx, cli, svcNamespace, selector.Cluster, selector.Service.Component, selector.Service.Service)
160160
}

pkg/controller/component/utils.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ import (
4242
)
4343

4444
func inDataContext() *multicluster.ClientOption {
45-
return multicluster.InDataContext()
45+
return multicluster.InDataContext() // TODO
4646
}
4747

4848
func ValidateDefNameRegexp(defNamePattern string) error {

pkg/controller/component/vars.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -925,7 +925,7 @@ func resolveServiceVarRefLow(ctx context.Context, cli client.Reader, synthesized
925925
selector appsv1.ServiceVarSelector, option *appsv1.VarOption, resolveVar func(any) (*corev1.EnvVar, *corev1.EnvVar, error)) ([]*corev1.EnvVar, []*corev1.EnvVar, error) {
926926
resolveObjs := func() (map[string]any, error) {
927927
headlessGetter := func(compName string) (any, error) {
928-
return headlessCompServiceGetter(ctx, cli, synthesizedComp.Namespace, synthesizedComp.ClusterName, compName)
928+
return headlessServiceGetter(ctx, cli, synthesizedComp.Namespace, synthesizedComp.ClusterName, compName)
929929
}
930930
getter := func(compName string) (any, error) {
931931
return compServiceGetter(ctx, cli, synthesizedComp.Namespace, synthesizedComp.ClusterName, compName, selector.Name)
@@ -944,7 +944,7 @@ func clusterServiceGetter(ctx context.Context, cli client.Reader, namespace, clu
944944
Name: constant.GenerateClusterServiceName(clusterName, name),
945945
}
946946
obj := &corev1.Service{}
947-
err := cli.Get(ctx, key, obj, inDataContext())
947+
err := cli.Get(ctx, key, obj, inDataContext()) // TODO: cluster service
948948
return &resolvedServiceObj{service: obj}, err
949949
}
950950

@@ -975,18 +975,18 @@ func compServiceGetter(ctx context.Context, cli client.Reader, namespace, cluste
975975
Name: svcName,
976976
}
977977
obj := &corev1.Service{}
978-
err = cli.Get(ctx, key, obj, inDataContext())
978+
err = cli.Get(ctx, key, obj, inDataContext()) // TODO: cmp service
979979
if err == nil {
980980
return &resolvedServiceObj{service: obj}, nil
981981
}
982-
if err != nil && !apierrors.IsNotFound(err) {
982+
if !apierrors.IsNotFound(err) {
983983
return nil, err
984984
}
985985

986986
// fall-back to list services and find the matched prefix
987987
svcList := &corev1.ServiceList{}
988988
matchingLabels := client.MatchingLabels(constant.GetCompLabels(clusterName, compName))
989-
err = cli.List(ctx, svcList, matchingLabels, inDataContext())
989+
err = cli.List(ctx, svcList, matchingLabels, inDataContext()) // TODO: cmp service
990990
if err != nil {
991991
return nil, err
992992
}
@@ -1003,13 +1003,13 @@ func compServiceGetter(ctx context.Context, cli client.Reader, namespace, cluste
10031003
return &resolvedServiceObj{podServices: objs}, nil
10041004
}
10051005

1006-
func headlessCompServiceGetter(ctx context.Context, cli client.Reader, namespace, clusterName, compName string) (any, error) {
1006+
func headlessServiceGetter(ctx context.Context, cli client.Reader, namespace, clusterName, compName string) (any, error) {
10071007
key := types.NamespacedName{
10081008
Namespace: namespace,
10091009
Name: constant.GenerateDefaultComponentHeadlessServiceName(clusterName, compName),
10101010
}
10111011
obj := &corev1.Service{}
1012-
err := cli.Get(ctx, key, obj, inDataContext())
1012+
err := cli.Get(ctx, key, obj, inDataContext()) // TODO: headless service
10131013
return &resolvedServiceObj{service: obj}, err
10141014
}
10151015

pkg/controller/component/workload_utils.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ func ListOwnedPVCs(ctx context.Context, cli client.Reader, namespace, clusterNam
6060
if opts == nil {
6161
opts = make([]client.ListOption, 0)
6262
}
63-
opts = append(opts, inDataContext())
63+
opts = append(opts, inDataContext()) // TODO: pvc
6464
return listObjWithLabelsInNamespace(ctx, cli, generics.PersistentVolumeClaimSignature, namespace, labels, opts...)
6565
}
6666

@@ -99,7 +99,7 @@ func listPods(ctx context.Context, cli client.Reader, namespace, clusterName, co
9999
if opts == nil {
100100
opts = make([]client.ListOption, 0)
101101
}
102-
opts = append(opts, inDataContext())
102+
opts = append(opts, inDataContext()) // TODO: pod
103103
return listObjWithLabelsInNamespace(ctx, cli, generics.PodSignature, namespace, labels, opts...)
104104
}
105105

pkg/controller/instance/reconciler_assistant_object.go

Lines changed: 35 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,10 @@ along with this program. If not, see <http://www.gnu.org/licenses/>.
2020
package instance
2121

2222
import (
23+
"fmt"
2324
"reflect"
25+
"strconv"
26+
"strings"
2427

2528
"golang.org/x/exp/maps"
2629
corev1 "k8s.io/api/core/v1"
@@ -30,6 +33,7 @@ import (
3033
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
3134

3235
workloads "github.com/apecloud/kubeblocks/apis/workloads/v1"
36+
"github.com/apecloud/kubeblocks/pkg/constant"
3337
"github.com/apecloud/kubeblocks/pkg/controller/kubebuilderx"
3438
"github.com/apecloud/kubeblocks/pkg/controller/model"
3539
)
@@ -55,19 +59,21 @@ func (r *assistantObjectReconciler) PreCondition(tree *kubebuilderx.ObjectTree)
5559
func (r *assistantObjectReconciler) Reconcile(tree *kubebuilderx.ObjectTree) (kubebuilderx.Result, error) {
5660
inst := tree.GetRoot().(*workloads.Instance)
5761
for _, obj := range inst.Spec.InstanceAssistantObjects {
58-
_, err := r.createOrUpdate(tree, inst, obj)
59-
if err != nil {
62+
if err := r.createOrUpdate(tree, inst, obj); err != nil {
6063
return kubebuilderx.Continue, err
6164
}
6265
}
6366
return kubebuilderx.Continue, nil
6467
}
6568

66-
func (r *assistantObjectReconciler) createOrUpdate(tree *kubebuilderx.ObjectTree, inst *workloads.Instance, assistantObj workloads.InstanceAssistantObject) (bool, error) {
67-
obj := r.instanceAssistantObject(assistantObj)
69+
func (r *assistantObjectReconciler) createOrUpdate(tree *kubebuilderx.ObjectTree, inst *workloads.Instance, assistantObj workloads.InstanceAssistantObject) error {
70+
obj := r.checkObjectProvisionPolicy(inst, r.instanceAssistantObject(assistantObj))
71+
if obj == nil {
72+
return nil // skip the object
73+
}
6874
robj, err := tree.Get(obj)
6975
if err != nil && !errors.IsNotFound(err) {
70-
return false, err
76+
return err
7177
}
7278
if err != nil || robj == nil {
7379
labels := obj.GetLabels()
@@ -78,14 +84,14 @@ func (r *assistantObjectReconciler) createOrUpdate(tree *kubebuilderx.ObjectTree
7884
}
7985
obj.SetLabels(labels)
8086
if err := controllerutil.SetControllerReference(inst, obj, model.GetScheme()); err != nil {
81-
return false, err
87+
return err
8288
}
83-
return true, tree.Add(obj)
89+
return tree.Add(obj)
8490
}
8591
if merged := r.copyAndMerge(assistantObj, robj, obj); merged != nil {
86-
return true, tree.Update(merged)
92+
return tree.Update(merged)
8793
}
88-
return false, nil
94+
return nil
8995
}
9096

9197
func (r *assistantObjectReconciler) instanceAssistantObject(obj workloads.InstanceAssistantObject) client.Object {
@@ -107,6 +113,26 @@ func (r *assistantObjectReconciler) instanceAssistantObject(obj workloads.Instan
107113
return obj.RoleBinding
108114
}
109115

116+
func (r *assistantObjectReconciler) checkObjectProvisionPolicy(inst *workloads.Instance, obj client.Object) client.Object {
117+
var policy string
118+
if obj.GetAnnotations() != nil {
119+
policy = obj.GetAnnotations()[constant.KBAppMultiClusterObjectProvisionPolicyKey]
120+
}
121+
if policy != "ordinal" { // HACK
122+
return obj
123+
}
124+
125+
ordinal := func() int {
126+
subs := strings.Split(inst.GetName(), "-")
127+
o, _ := strconv.Atoi(subs[len(subs)-1])
128+
return o
129+
}
130+
if strings.HasSuffix(obj.GetName(), fmt.Sprintf("-%d", ordinal())) {
131+
return obj
132+
}
133+
return nil
134+
}
135+
110136
func (r *assistantObjectReconciler) copyAndMerge(obj workloads.InstanceAssistantObject, oldObj, newObj client.Object) client.Object {
111137
service := func() client.Object {
112138
return copyAndMergeAssistantObject(oldObj, newObj,

pkg/controller/kubebuilderx/utils.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,11 +26,11 @@ import (
2626
"strconv"
2727
"strings"
2828

29-
corev1 "k8s.io/api/core/v1"
3029
apierrors "k8s.io/apimachinery/pkg/api/errors"
3130
ctrl "sigs.k8s.io/controller-runtime"
3231
"sigs.k8s.io/controller-runtime/pkg/client"
3332

33+
workloads "github.com/apecloud/kubeblocks/apis/workloads/v1"
3434
"github.com/apecloud/kubeblocks/pkg/constant"
3535
"github.com/apecloud/kubeblocks/pkg/controller/model"
3636
"github.com/apecloud/kubeblocks/pkg/controller/multicluster"
@@ -92,8 +92,7 @@ func placement(obj client.Object) string {
9292

9393
func assign(ctx context.Context, obj client.Object) client.Object {
9494
switch obj.(type) {
95-
// only handle Pod and PersistentVolumeClaim
96-
case *corev1.Pod, *corev1.PersistentVolumeClaim:
95+
case *workloads.Instance: // TODO
9796
ordinal := func() int {
9897
subs := strings.Split(obj.GetName(), "-")
9998
o, _ := strconv.Atoi(subs[len(subs)-1])

0 commit comments

Comments
 (0)