Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 16 additions & 13 deletions cmd/manager/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -479,7 +479,7 @@ func main() {
}

if err = (&component.ComponentReconciler{
Client: client,
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
Recorder: mgr.GetEventRecorderFor("component-controller"),
}).SetupWithManager(mgr); err != nil {
Expand All @@ -497,10 +497,10 @@ func main() {
}

if err = (&k8scorecontrollers.EventReconciler{
Client: client,
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
Recorder: mgr.GetEventRecorderFor("event-controller"),
}).SetupWithManager(mgr, multiClusterMgr); err != nil {
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "Event")
os.Exit(1)
}
Expand Down Expand Up @@ -542,6 +542,15 @@ func main() {
setupLog.Error(err, "unable to create controller", "controller", "Instance")
os.Exit(1)
}

if err = (&workloadscontrollers.InstanceEventReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
Recorder: mgr.GetEventRecorderFor("instance-event-controller"),
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "InstanceEvent")
os.Exit(1)
}
}

if viper.GetBool(operationsFlagKey.viperName()) {
Expand Down Expand Up @@ -675,24 +684,18 @@ func main() {
setupLog.Error(err, "unable to create controller", "controller", "ReconfigureRequest")
os.Exit(1)
}
if err = (&parameterscontrollers.ParametersDefinitionReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "ParametersDefinition")
os.Exit(1)
}
if err = (&parameterscontrollers.ParameterDrivenConfigRenderReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
Recorder: mgr.GetEventRecorderFor("component-driven-config-render-controller"),
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "ParamConfigRenderer")
os.Exit(1)
}
if err = (&parameterscontrollers.ParameterTemplateExtensionReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
Recorder: mgr.GetEventRecorderFor("parameter-extension"),
Recorder: mgr.GetEventRecorderFor("parameter-template-extension-controller"),
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "ParameterTemplateExtension")
os.Exit(1)
Expand Down
12 changes: 6 additions & 6 deletions controllers/apps/cluster/cluster_plan_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -321,15 +321,15 @@ func (c *clusterPlanBuilder) reconcileObject(node *model.ObjectVertex) error {
}

func (c *clusterPlanBuilder) reconcileCreateObject(ctx context.Context, node *model.ObjectVertex) error {
err := c.cli.Create(ctx, node.Obj, appsutil.ClientOption(node))
err := c.cli.Create(ctx, node.Obj, multiClusterClientOption(node))
if err != nil && !apierrors.IsAlreadyExists(err) {
return err
}
return nil
}

func (c *clusterPlanBuilder) reconcileUpdateObject(ctx context.Context, node *model.ObjectVertex) error {
err := c.cli.Update(ctx, node.Obj, appsutil.ClientOption(node))
err := c.cli.Update(ctx, node.Obj, multiClusterClientOption(node))
if err != nil && !apierrors.IsNotFound(err) {
return err
}
Expand All @@ -338,7 +338,7 @@ func (c *clusterPlanBuilder) reconcileUpdateObject(ctx context.Context, node *mo

func (c *clusterPlanBuilder) reconcilePatchObject(ctx context.Context, node *model.ObjectVertex) error {
patch := client.MergeFrom(node.OriObj)
err := c.cli.Patch(ctx, node.Obj, patch, appsutil.ClientOption(node))
err := c.cli.Patch(ctx, node.Obj, patch, multiClusterClientOption(node))
if err != nil && !apierrors.IsNotFound(err) {
return err
}
Expand All @@ -347,7 +347,7 @@ func (c *clusterPlanBuilder) reconcilePatchObject(ctx context.Context, node *mod

func (c *clusterPlanBuilder) reconcileDeleteObject(ctx context.Context, node *model.ObjectVertex) error {
if controllerutil.RemoveFinalizer(node.Obj, constant.DBClusterFinalizerName) {
err := c.cli.Update(ctx, node.Obj, appsutil.ClientOption(node))
err := c.cli.Update(ctx, node.Obj, multiClusterClientOption(node))
if err != nil && !apierrors.IsNotFound(err) {
return err
}
Expand All @@ -357,7 +357,7 @@ func (c *clusterPlanBuilder) reconcileDeleteObject(ctx context.Context, node *mo
deleteOptions := &client.DeleteOptions{
PropagationPolicy: &deletePropagation,
}
if err := c.cli.Delete(ctx, node.Obj, deleteOptions, appsutil.ClientOption(node)); err != nil {
if err := c.cli.Delete(ctx, node.Obj, deleteOptions, multiClusterClientOption(node)); err != nil {
return client.IgnoreNotFound(err)
}
return nil
Expand All @@ -374,7 +374,7 @@ func (c *clusterPlanBuilder) reconcileDeleteObject(ctx context.Context, node *mo

func (c *clusterPlanBuilder) reconcileStatusObject(ctx context.Context, node *model.ObjectVertex) error {
patch := client.MergeFrom(node.OriObj)
if err := c.cli.Status().Patch(ctx, node.Obj, patch, appsutil.ClientOption(node)); err != nil {
if err := c.cli.Status().Patch(ctx, node.Obj, patch, multiClusterClientOption(node)); err != nil {
return err
}
// handle condition and phase changing triggered events
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,35 +17,20 @@ You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/

package util
package cluster

import (
"context"
"fmt"

"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/apecloud/kubeblocks/pkg/constant"
"github.com/apecloud/kubeblocks/pkg/controller/model"
"github.com/apecloud/kubeblocks/pkg/controller/multicluster"
)

func Placement(obj client.Object) string {
if obj == nil || obj.GetAnnotations() == nil {
return ""
}
return obj.GetAnnotations()[constant.KBAppMultiClusterPlacementKey]
}

func IntoContext(ctx context.Context, placement string) context.Context {
return multicluster.IntoContext(ctx, placement)
}

func InDataContext4G() model.GraphOption {
func inDataContext4G() model.GraphOption {
return model.WithClientOption(multicluster.InDataContext())
}

func ClientOption(v *model.ObjectVertex) *multicluster.ClientOption {
func multiClusterClientOption(v *model.ObjectVertex) *multicluster.ClientOption {
if v.ClientOpt != nil {
opt, ok := v.ClientOpt.(*multicluster.ClientOption)
if ok {
Expand Down
4 changes: 0 additions & 4 deletions controllers/apps/cluster/transformer_cluster_placement.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"strings"

appsv1 "github.com/apecloud/kubeblocks/apis/apps/v1"
appsutil "github.com/apecloud/kubeblocks/controllers/apps/util"
"github.com/apecloud/kubeblocks/pkg/constant"
"github.com/apecloud/kubeblocks/pkg/controller/graph"
"github.com/apecloud/kubeblocks/pkg/controller/multicluster"
Expand All @@ -49,18 +48,15 @@ func (t *clusterPlacementTransformer) Transform(ctx graph.TransformContext, dag
}

if t.assigned(transCtx) {
transCtx.Context = appsutil.IntoContext(transCtx.Context, appsutil.Placement(transCtx.OrigCluster))
return nil
}

p := t.assign(transCtx)

cluster := transCtx.Cluster
if cluster.Annotations == nil {
cluster.Annotations = make(map[string]string)
}
cluster.Annotations[constant.KBAppMultiClusterPlacementKey] = strings.Join(p, ",")
transCtx.Context = appsutil.IntoContext(transCtx.Context, appsutil.Placement(cluster))

return nil
}
Expand Down
6 changes: 3 additions & 3 deletions controllers/apps/cluster/transformer_cluster_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,13 +69,13 @@ func (t *clusterServiceTransformer) Transform(ctx graph.TransformContext, dag *g
toCreateServices, toDeleteServices, toUpdateServices := mapDiff(services, protoServices)

for svc := range toCreateServices {
graphCli.Create(dag, protoServices[svc], appsutil.InDataContext4G())
graphCli.Create(dag, protoServices[svc], inDataContext4G())
}
for svc := range toUpdateServices {
t.updateService(dag, graphCli, services[svc], protoServices[svc])
}
for svc := range toDeleteServices {
graphCli.Delete(dag, services[svc], appsutil.InDataContext4G())
graphCli.Delete(dag, services[svc], inDataContext4G())
}
return nil
}
Expand Down Expand Up @@ -229,6 +229,6 @@ func (t *clusterServiceTransformer) updateService(dag *graph.DAG, graphCli model
appsutil.ResolveServiceDefaultFields(&running.Spec, &newSvc.Spec)

if !reflect.DeepEqual(running, newSvc) {
graphCli.Update(dag, running, newSvc, appsutil.InDataContext4G())
graphCli.Update(dag, running, newSvc, inDataContext4G())
}
}
12 changes: 5 additions & 7 deletions controllers/apps/component/component_plan_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ import (
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"

appsv1 "github.com/apecloud/kubeblocks/apis/apps/v1"
appsutil "github.com/apecloud/kubeblocks/controllers/apps/util"
"github.com/apecloud/kubeblocks/pkg/constant"
"github.com/apecloud/kubeblocks/pkg/controller/component"
"github.com/apecloud/kubeblocks/pkg/controller/graph"
Expand Down Expand Up @@ -189,15 +188,15 @@ func (c *componentPlanBuilder) defaultWalkFunc(v graph.Vertex) error {
}

func (c *componentPlanBuilder) reconcileCreateObject(ctx context.Context, vertex *model.ObjectVertex) error {
err := c.cli.Create(ctx, vertex.Obj, appsutil.ClientOption(vertex))
err := c.cli.Create(ctx, vertex.Obj)
if err != nil && !apierrors.IsAlreadyExists(err) {
return err
}
return nil
}

func (c *componentPlanBuilder) reconcileUpdateObject(ctx context.Context, vertex *model.ObjectVertex) error {
err := c.cli.Update(ctx, vertex.Obj, appsutil.ClientOption(vertex))
err := c.cli.Update(ctx, vertex.Obj)
if err != nil && !apierrors.IsNotFound(err) {
return err
}
Expand All @@ -206,7 +205,7 @@ func (c *componentPlanBuilder) reconcileUpdateObject(ctx context.Context, vertex

func (c *componentPlanBuilder) reconcilePatchObject(ctx context.Context, vertex *model.ObjectVertex) error {
patch := client.MergeFrom(vertex.OriObj)
err := c.cli.Patch(ctx, vertex.Obj, patch, appsutil.ClientOption(vertex))
err := c.cli.Patch(ctx, vertex.Obj, patch)
if err != nil && !apierrors.IsNotFound(err) {
return err
}
Expand All @@ -220,7 +219,7 @@ func (c *componentPlanBuilder) reconcileDeleteObject(ctx context.Context, vertex
finalizers := []string{constant.DBComponentFinalizerName, constant.DBClusterFinalizerName}
for _, finalizer := range finalizers {
if controllerutil.RemoveFinalizer(vertex.Obj, finalizer) {
err := c.cli.Update(ctx, vertex.Obj, appsutil.ClientOption(vertex))
err := c.cli.Update(ctx, vertex.Obj)
if err != nil && !apierrors.IsNotFound(err) {
return err
}
Expand All @@ -229,7 +228,6 @@ func (c *componentPlanBuilder) reconcileDeleteObject(ctx context.Context, vertex

if !model.IsObjectDeleting(vertex.Obj) {
var opts []client.DeleteOption
opts = append(opts, appsutil.ClientOption(vertex))
if len(vertex.PropagationPolicy) > 0 {
opts = append(opts, vertex.PropagationPolicy)
}
Expand All @@ -242,5 +240,5 @@ func (c *componentPlanBuilder) reconcileDeleteObject(ctx context.Context, vertex
}

func (c *componentPlanBuilder) reconcileStatusObject(ctx context.Context, vertex *model.ObjectVertex) error {
return c.cli.Status().Update(ctx, vertex.Obj, appsutil.ClientOption(vertex))
return c.cli.Status().Update(ctx, vertex.Obj)
}
2 changes: 1 addition & 1 deletion controllers/apps/component/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ var _ = BeforeSuite(func() {
Client: k8sManager.GetClient(),
Scheme: k8sManager.GetScheme(),
Recorder: k8sManager.GetEventRecorderFor("event-controller"),
}).SetupWithManager(k8sManager, nil)
}).SetupWithManager(k8sManager)
Expect(err).ToNot(HaveOccurred())

testCtx = testutil.NewDefaultTestContext(ctx, k8sClient, testEnv)
Expand Down
4 changes: 0 additions & 4 deletions controllers/apps/component/transformer_component_init.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ along with this program. If not, see <http://www.gnu.org/licenses/>.
package component

import (
appsutil "github.com/apecloud/kubeblocks/controllers/apps/util"
"github.com/apecloud/kubeblocks/pkg/controller/graph"
"github.com/apecloud/kubeblocks/pkg/controller/model"
intctrlutil "github.com/apecloud/kubeblocks/pkg/controllerutil"
Expand All @@ -37,9 +36,6 @@ func (t *componentInitTransformer) Transform(ctx graph.TransformContext, dag *gr
rootVertex := &model.ObjectVertex{Obj: transCtx.Component, OriObj: transCtx.ComponentOrig, Action: model.ActionStatusPtr()}
dag.AddVertex(rootVertex)

// init placement
transCtx.Context = appsutil.IntoContext(transCtx.Context, appsutil.Placement(transCtx.Component))

if !intctrlutil.ObjectAPIVersionSupported(transCtx.Component) {
return graph.ErrPrematureStop
}
Expand Down
14 changes: 3 additions & 11 deletions controllers/apps/component/transformer_component_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"context"
"fmt"
"reflect"
"strconv"
"strings"

"golang.org/x/exp/maps"
Expand All @@ -42,7 +41,6 @@ import (
"github.com/apecloud/kubeblocks/pkg/controller/factory"
"github.com/apecloud/kubeblocks/pkg/controller/graph"
"github.com/apecloud/kubeblocks/pkg/controller/model"
"github.com/apecloud/kubeblocks/pkg/controller/multicluster"
intctrlutil "github.com/apecloud/kubeblocks/pkg/controllerutil"
)

Expand Down Expand Up @@ -278,21 +276,15 @@ func (t *componentServiceTransformer) createOrUpdateService(ctx graph.TransformC
)

if service.Annotations != nil {
kind = service.Annotations[constant.MultiClusterServicePlacementKey]
delete(service.Annotations, constant.MultiClusterServicePlacementKey)
kind = service.Annotations[constant.KBAppMultiClusterServicePlacementKey]
delete(service.Annotations, constant.KBAppMultiClusterServicePlacementKey)
}
if podService && len(kind) > 0 && kind != multiClusterServicePlacementInMirror && kind != multiClusterServicePlacementInUnique {
return fmt.Errorf("invalid multi-cluster pod-service placement kind %s for service %s", kind, service.Name)
}

if podService && kind == multiClusterServicePlacementInUnique {
// create or update service in unique, by hacking the pod placement strategy.
ordinal := func() int {
subs := strings.Split(service.GetName(), "-")
o, _ := strconv.Atoi(subs[len(subs)-1])
return o
}
multicluster.Assign(ctx.GetContext(), service, ordinal)
service.Annotations[constant.KBAppMultiClusterObjectProvisionPolicyKey] = "ordinal" // HACK
}

createOrUpdateService := func(service *corev1.Service) error {
Expand Down
13 changes: 7 additions & 6 deletions controllers/apps/component/transformer_component_workload.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ import (

appsv1 "github.com/apecloud/kubeblocks/apis/apps/v1"
workloads "github.com/apecloud/kubeblocks/apis/workloads/v1"
appsutil "github.com/apecloud/kubeblocks/controllers/apps/util"
"github.com/apecloud/kubeblocks/pkg/constant"
"github.com/apecloud/kubeblocks/pkg/controller/component"
"github.com/apecloud/kubeblocks/pkg/controller/factory"
Expand Down Expand Up @@ -125,12 +124,14 @@ func (t *componentWorkloadTransformer) reconcileWorkload(ctx context.Context, cl
}

func (t *componentWorkloadTransformer) buildInstanceSetPlacementAnnotation(comp *appsv1.Component, its *workloads.InstanceSet) {
p := appsutil.Placement(comp)
if len(p) > 0 {
if its.Annotations == nil {
its.Annotations = make(map[string]string)
if comp.Annotations != nil {
placement := comp.Annotations[constant.KBAppMultiClusterPlacementKey]
if len(placement) > 0 {
if its.Annotations == nil {
its.Annotations = make(map[string]string)
}
its.Annotations[constant.KBAppMultiClusterPlacementKey] = placement
}
its.Annotations[constant.KBAppMultiClusterPlacementKey] = p
}
}

Expand Down
Loading
Loading