Skip to content

Commit 9d6f391

Browse files
authored
chore: implement multi-cluster with the Instance API (#9697)
1 parent cce5f38 commit 9d6f391

31 files changed

+382
-543
lines changed

cmd/manager/main.go

Lines changed: 15 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -497,10 +497,10 @@ func main() {
497497
}
498498

499499
if err = (&k8scorecontrollers.EventReconciler{
500-
Client: client,
500+
Client: mgr.GetClient(),
501501
Scheme: mgr.GetScheme(),
502502
Recorder: mgr.GetEventRecorderFor("event-controller"),
503-
}).SetupWithManager(mgr, multiClusterMgr); err != nil {
503+
}).SetupWithManager(mgr); err != nil {
504504
setupLog.Error(err, "unable to create controller", "controller", "Event")
505505
os.Exit(1)
506506
}
@@ -542,6 +542,15 @@ func main() {
542542
setupLog.Error(err, "unable to create controller", "controller", "Instance")
543543
os.Exit(1)
544544
}
545+
546+
if err = (&workloadscontrollers.InstanceEventReconciler{
547+
Client: mgr.GetClient(),
548+
Scheme: mgr.GetScheme(),
549+
Recorder: mgr.GetEventRecorderFor("instance-event-controller"),
550+
}).SetupWithManager(mgr); err != nil {
551+
setupLog.Error(err, "unable to create controller", "controller", "InstanceEvent")
552+
os.Exit(1)
553+
}
545554
}
546555

547556
if viper.GetBool(operationsFlagKey.viperName()) {
@@ -675,24 +684,18 @@ func main() {
675684
setupLog.Error(err, "unable to create controller", "controller", "ReconfigureRequest")
676685
os.Exit(1)
677686
}
678-
if err = (&parameterscontrollers.ParametersDefinitionReconciler{
679-
Client: mgr.GetClient(),
680-
Scheme: mgr.GetScheme(),
681-
}).SetupWithManager(mgr); err != nil {
682-
setupLog.Error(err, "unable to create controller", "controller", "ParametersDefinition")
683-
os.Exit(1)
684-
}
685687
if err = (&parameterscontrollers.ParameterDrivenConfigRenderReconciler{
686-
Client: mgr.GetClient(),
687-
Scheme: mgr.GetScheme(),
688+
Client: mgr.GetClient(),
689+
Scheme: mgr.GetScheme(),
690+
Recorder: mgr.GetEventRecorderFor("component-driven-config-render-controller"),
688691
}).SetupWithManager(mgr); err != nil {
689692
setupLog.Error(err, "unable to create controller", "controller", "ParamConfigRenderer")
690693
os.Exit(1)
691694
}
692695
if err = (&parameterscontrollers.ParameterTemplateExtensionReconciler{
693696
Client: mgr.GetClient(),
694697
Scheme: mgr.GetScheme(),
695-
Recorder: mgr.GetEventRecorderFor("parameter-extension"),
698+
Recorder: mgr.GetEventRecorderFor("parameter-template-extension-controller"),
696699
}).SetupWithManager(mgr); err != nil {
697700
setupLog.Error(err, "unable to create controller", "controller", "ParameterTemplateExtension")
698701
os.Exit(1)

controllers/apps/cluster/cluster_plan_builder.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -321,15 +321,15 @@ func (c *clusterPlanBuilder) reconcileObject(node *model.ObjectVertex) error {
321321
}
322322

323323
func (c *clusterPlanBuilder) reconcileCreateObject(ctx context.Context, node *model.ObjectVertex) error {
324-
err := c.cli.Create(ctx, node.Obj, appsutil.ClientOption(node))
324+
err := c.cli.Create(ctx, node.Obj, clientOption(node))
325325
if err != nil && !apierrors.IsAlreadyExists(err) {
326326
return err
327327
}
328328
return nil
329329
}
330330

331331
func (c *clusterPlanBuilder) reconcileUpdateObject(ctx context.Context, node *model.ObjectVertex) error {
332-
err := c.cli.Update(ctx, node.Obj, appsutil.ClientOption(node))
332+
err := c.cli.Update(ctx, node.Obj, clientOption(node))
333333
if err != nil && !apierrors.IsNotFound(err) {
334334
return err
335335
}
@@ -338,7 +338,7 @@ func (c *clusterPlanBuilder) reconcileUpdateObject(ctx context.Context, node *mo
338338

339339
func (c *clusterPlanBuilder) reconcilePatchObject(ctx context.Context, node *model.ObjectVertex) error {
340340
patch := client.MergeFrom(node.OriObj)
341-
err := c.cli.Patch(ctx, node.Obj, patch, appsutil.ClientOption(node))
341+
err := c.cli.Patch(ctx, node.Obj, patch, clientOption(node))
342342
if err != nil && !apierrors.IsNotFound(err) {
343343
return err
344344
}
@@ -347,7 +347,7 @@ func (c *clusterPlanBuilder) reconcilePatchObject(ctx context.Context, node *mod
347347

348348
func (c *clusterPlanBuilder) reconcileDeleteObject(ctx context.Context, node *model.ObjectVertex) error {
349349
if controllerutil.RemoveFinalizer(node.Obj, constant.DBClusterFinalizerName) {
350-
err := c.cli.Update(ctx, node.Obj, appsutil.ClientOption(node))
350+
err := c.cli.Update(ctx, node.Obj, clientOption(node))
351351
if err != nil && !apierrors.IsNotFound(err) {
352352
return err
353353
}
@@ -357,7 +357,7 @@ func (c *clusterPlanBuilder) reconcileDeleteObject(ctx context.Context, node *mo
357357
deleteOptions := &client.DeleteOptions{
358358
PropagationPolicy: &deletePropagation,
359359
}
360-
if err := c.cli.Delete(ctx, node.Obj, deleteOptions, appsutil.ClientOption(node)); err != nil {
360+
if err := c.cli.Delete(ctx, node.Obj, deleteOptions, clientOption(node)); err != nil {
361361
return client.IgnoreNotFound(err)
362362
}
363363
return nil
@@ -374,7 +374,7 @@ func (c *clusterPlanBuilder) reconcileDeleteObject(ctx context.Context, node *mo
374374

375375
func (c *clusterPlanBuilder) reconcileStatusObject(ctx context.Context, node *model.ObjectVertex) error {
376376
patch := client.MergeFrom(node.OriObj)
377-
if err := c.cli.Status().Patch(ctx, node.Obj, patch, appsutil.ClientOption(node)); err != nil {
377+
if err := c.cli.Status().Patch(ctx, node.Obj, patch, clientOption(node)); err != nil {
378378
return err
379379
}
380380
// handle condition and phase changing triggered events

controllers/apps/util/multicluster.go renamed to controllers/apps/cluster/multicluster.go

Lines changed: 3 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -17,35 +17,20 @@ You should have received a copy of the GNU Affero General Public License
1717
along with this program. If not, see <http://www.gnu.org/licenses/>.
1818
*/
1919

20-
package util
20+
package cluster
2121

2222
import (
23-
"context"
2423
"fmt"
2524

26-
"sigs.k8s.io/controller-runtime/pkg/client"
27-
28-
"github.com/apecloud/kubeblocks/pkg/constant"
2925
"github.com/apecloud/kubeblocks/pkg/controller/model"
3026
"github.com/apecloud/kubeblocks/pkg/controller/multicluster"
3127
)
3228

33-
func Placement(obj client.Object) string {
34-
if obj == nil || obj.GetAnnotations() == nil {
35-
return ""
36-
}
37-
return obj.GetAnnotations()[constant.KBAppMultiClusterPlacementKey]
38-
}
39-
40-
func IntoContext(ctx context.Context, placement string) context.Context {
41-
return multicluster.IntoContext(ctx, placement)
42-
}
43-
44-
func InDataContext4G() model.GraphOption {
29+
func inDataContext4G() model.GraphOption {
4530
return model.WithClientOption(multicluster.InDataContext())
4631
}
4732

48-
func ClientOption(v *model.ObjectVertex) *multicluster.ClientOption {
33+
func clientOption(v *model.ObjectVertex) *multicluster.ClientOption {
4934
if v.ClientOpt != nil {
5035
opt, ok := v.ClientOpt.(*multicluster.ClientOption)
5136
if ok {

controllers/apps/cluster/transformer_cluster_placement.go

Lines changed: 45 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,14 @@ along with this program. If not, see <http://www.gnu.org/licenses/>.
2020
package cluster
2121

2222
import (
23+
"fmt"
2324
"math/rand"
2425
"slices"
2526
"strings"
2627

28+
"k8s.io/utils/ptr"
29+
2730
appsv1 "github.com/apecloud/kubeblocks/apis/apps/v1"
28-
appsutil "github.com/apecloud/kubeblocks/controllers/apps/util"
2931
"github.com/apecloud/kubeblocks/pkg/constant"
3032
"github.com/apecloud/kubeblocks/pkg/controller/graph"
3133
"github.com/apecloud/kubeblocks/pkg/controller/multicluster"
@@ -44,33 +46,65 @@ func (t *clusterPlacementTransformer) Transform(ctx graph.TransformContext, dag
4446
return nil
4547
}
4648

47-
if t.multiClusterMgr == nil {
48-
return nil // do nothing
49+
if !t.enabled(transCtx) {
50+
return nil
51+
}
52+
53+
if err := t.precheck(transCtx); err != nil {
54+
return err
4955
}
5056

5157
if t.assigned(transCtx) {
52-
transCtx.Context = appsutil.IntoContext(transCtx.Context, appsutil.Placement(transCtx.OrigCluster))
5358
return nil
5459
}
5560

56-
p := t.assign(transCtx)
57-
61+
contexts := t.assign(transCtx)
5862
cluster := transCtx.Cluster
5963
if cluster.Annotations == nil {
6064
cluster.Annotations = make(map[string]string)
6165
}
62-
cluster.Annotations[constant.KBAppMultiClusterPlacementKey] = strings.Join(p, ",")
63-
transCtx.Context = appsutil.IntoContext(transCtx.Context, appsutil.Placement(cluster))
66+
cluster.Annotations[constant.KBAppMultiClusterPlacementKey] = strings.Join(contexts, ",")
6467

6568
return nil
6669
}
6770

68-
func (t *clusterPlacementTransformer) assigned(transCtx *clusterTransformContext) bool {
71+
func (t *clusterPlacementTransformer) enabled(transCtx *clusterTransformContext) bool {
6972
cluster := transCtx.OrigCluster
70-
if cluster.Annotations == nil {
71-
return false
73+
_, ok := cluster.Annotations[constant.KBAppMultiClusterPlacementKey]
74+
return ok
75+
}
76+
77+
func (t *clusterPlacementTransformer) precheck(transCtx *clusterTransformContext) error {
78+
if t.multiClusterMgr == nil {
79+
return fmt.Errorf("intend to create a multi-cluster object, but the multi-cluster manager is not set up properly")
7280
}
7381

82+
var components []string
83+
for _, spec := range transCtx.components {
84+
if !ptr.Deref(spec.EnableInstanceAPI, false) {
85+
components = append(components, spec.Name)
86+
87+
}
88+
}
89+
if len(components) > 0 {
90+
return fmt.Errorf("the multi-cluster object is only supported for components that enable the instance API: %s", strings.Join(components, ","))
91+
}
92+
93+
var shardings []string
94+
for _, spec := range transCtx.shardings {
95+
if !ptr.Deref(spec.Template.EnableInstanceAPI, false) {
96+
shardings = append(shardings, spec.Name)
97+
}
98+
}
99+
if len(shardings) > 0 {
100+
return fmt.Errorf("the multi-cluster object is only supported for shardings that enable the instance API: %s", strings.Join(shardings, ","))
101+
}
102+
103+
return nil
104+
}
105+
106+
func (t *clusterPlacementTransformer) assigned(transCtx *clusterTransformContext) bool {
107+
cluster := transCtx.OrigCluster
74108
p, ok := cluster.Annotations[constant.KBAppMultiClusterPlacementKey]
75109
return ok && len(strings.TrimSpace(p)) > 0
76110
}

controllers/apps/cluster/transformer_cluster_service.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -69,13 +69,13 @@ func (t *clusterServiceTransformer) Transform(ctx graph.TransformContext, dag *g
6969
toCreateServices, toDeleteServices, toUpdateServices := mapDiff(services, protoServices)
7070

7171
for svc := range toCreateServices {
72-
graphCli.Create(dag, protoServices[svc], appsutil.InDataContext4G())
72+
graphCli.Create(dag, protoServices[svc], inDataContext4G())
7373
}
7474
for svc := range toUpdateServices {
7575
t.updateService(dag, graphCli, services[svc], protoServices[svc])
7676
}
7777
for svc := range toDeleteServices {
78-
graphCli.Delete(dag, services[svc], appsutil.InDataContext4G())
78+
graphCli.Delete(dag, services[svc], inDataContext4G())
7979
}
8080
return nil
8181
}
@@ -229,6 +229,6 @@ func (t *clusterServiceTransformer) updateService(dag *graph.DAG, graphCli model
229229
appsutil.ResolveServiceDefaultFields(&running.Spec, &newSvc.Spec)
230230

231231
if !reflect.DeepEqual(running, newSvc) {
232-
graphCli.Update(dag, running, newSvc, appsutil.InDataContext4G())
232+
graphCli.Update(dag, running, newSvc, inDataContext4G())
233233
}
234234
}

controllers/apps/component/component_plan_builder.go

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@ import (
3131
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
3232

3333
appsv1 "github.com/apecloud/kubeblocks/apis/apps/v1"
34-
appsutil "github.com/apecloud/kubeblocks/controllers/apps/util"
3534
"github.com/apecloud/kubeblocks/pkg/constant"
3635
"github.com/apecloud/kubeblocks/pkg/controller/component"
3736
"github.com/apecloud/kubeblocks/pkg/controller/graph"
@@ -189,15 +188,15 @@ func (c *componentPlanBuilder) defaultWalkFunc(v graph.Vertex) error {
189188
}
190189

191190
func (c *componentPlanBuilder) reconcileCreateObject(ctx context.Context, vertex *model.ObjectVertex) error {
192-
err := c.cli.Create(ctx, vertex.Obj, appsutil.ClientOption(vertex))
191+
err := c.cli.Create(ctx, vertex.Obj)
193192
if err != nil && !apierrors.IsAlreadyExists(err) {
194193
return err
195194
}
196195
return nil
197196
}
198197

199198
func (c *componentPlanBuilder) reconcileUpdateObject(ctx context.Context, vertex *model.ObjectVertex) error {
200-
err := c.cli.Update(ctx, vertex.Obj, appsutil.ClientOption(vertex))
199+
err := c.cli.Update(ctx, vertex.Obj)
201200
if err != nil && !apierrors.IsNotFound(err) {
202201
return err
203202
}
@@ -206,7 +205,7 @@ func (c *componentPlanBuilder) reconcileUpdateObject(ctx context.Context, vertex
206205

207206
func (c *componentPlanBuilder) reconcilePatchObject(ctx context.Context, vertex *model.ObjectVertex) error {
208207
patch := client.MergeFrom(vertex.OriObj)
209-
err := c.cli.Patch(ctx, vertex.Obj, patch, appsutil.ClientOption(vertex))
208+
err := c.cli.Patch(ctx, vertex.Obj, patch)
210209
if err != nil && !apierrors.IsNotFound(err) {
211210
return err
212211
}
@@ -220,7 +219,7 @@ func (c *componentPlanBuilder) reconcileDeleteObject(ctx context.Context, vertex
220219
finalizers := []string{constant.DBComponentFinalizerName, constant.DBClusterFinalizerName}
221220
for _, finalizer := range finalizers {
222221
if controllerutil.RemoveFinalizer(vertex.Obj, finalizer) {
223-
err := c.cli.Update(ctx, vertex.Obj, appsutil.ClientOption(vertex))
222+
err := c.cli.Update(ctx, vertex.Obj)
224223
if err != nil && !apierrors.IsNotFound(err) {
225224
return err
226225
}
@@ -229,7 +228,6 @@ func (c *componentPlanBuilder) reconcileDeleteObject(ctx context.Context, vertex
229228

230229
if !model.IsObjectDeleting(vertex.Obj) {
231230
var opts []client.DeleteOption
232-
opts = append(opts, appsutil.ClientOption(vertex))
233231
if len(vertex.PropagationPolicy) > 0 {
234232
opts = append(opts, vertex.PropagationPolicy)
235233
}
@@ -242,5 +240,5 @@ func (c *componentPlanBuilder) reconcileDeleteObject(ctx context.Context, vertex
242240
}
243241

244242
func (c *componentPlanBuilder) reconcileStatusObject(ctx context.Context, vertex *model.ObjectVertex) error {
245-
return c.cli.Status().Update(ctx, vertex.Obj, appsutil.ClientOption(vertex))
243+
return c.cli.Status().Update(ctx, vertex.Obj)
246244
}

controllers/apps/component/suite_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -196,7 +196,7 @@ var _ = BeforeSuite(func() {
196196
Client: k8sManager.GetClient(),
197197
Scheme: k8sManager.GetScheme(),
198198
Recorder: k8sManager.GetEventRecorderFor("event-controller"),
199-
}).SetupWithManager(k8sManager, nil)
199+
}).SetupWithManager(k8sManager)
200200
Expect(err).ToNot(HaveOccurred())
201201

202202
testCtx = testutil.NewDefaultTestContext(ctx, k8sClient, testEnv)

controllers/apps/component/transformer_component_init.go

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ along with this program. If not, see <http://www.gnu.org/licenses/>.
2020
package component
2121

2222
import (
23-
appsutil "github.com/apecloud/kubeblocks/controllers/apps/util"
2423
"github.com/apecloud/kubeblocks/pkg/controller/graph"
2524
"github.com/apecloud/kubeblocks/pkg/controller/model"
2625
intctrlutil "github.com/apecloud/kubeblocks/pkg/controllerutil"
@@ -37,9 +36,6 @@ func (t *componentInitTransformer) Transform(ctx graph.TransformContext, dag *gr
3736
rootVertex := &model.ObjectVertex{Obj: transCtx.Component, OriObj: transCtx.ComponentOrig, Action: model.ActionStatusPtr()}
3837
dag.AddVertex(rootVertex)
3938

40-
// init placement
41-
transCtx.Context = appsutil.IntoContext(transCtx.Context, appsutil.Placement(transCtx.Component))
42-
4339
if !intctrlutil.ObjectAPIVersionSupported(transCtx.Component) {
4440
return graph.ErrPrematureStop
4541
}

controllers/apps/component/transformer_component_service.go

Lines changed: 3 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@ import (
2323
"context"
2424
"fmt"
2525
"reflect"
26-
"strconv"
2726
"strings"
2827

2928
"golang.org/x/exp/maps"
@@ -42,7 +41,6 @@ import (
4241
"github.com/apecloud/kubeblocks/pkg/controller/factory"
4342
"github.com/apecloud/kubeblocks/pkg/controller/graph"
4443
"github.com/apecloud/kubeblocks/pkg/controller/model"
45-
"github.com/apecloud/kubeblocks/pkg/controller/multicluster"
4644
intctrlutil "github.com/apecloud/kubeblocks/pkg/controllerutil"
4745
)
4846

@@ -278,21 +276,15 @@ func (t *componentServiceTransformer) createOrUpdateService(ctx graph.TransformC
278276
)
279277

280278
if service.Annotations != nil {
281-
kind = service.Annotations[constant.MultiClusterServicePlacementKey]
282-
delete(service.Annotations, constant.MultiClusterServicePlacementKey)
279+
kind = service.Annotations[constant.KBAppMultiClusterServicePlacementKey]
280+
delete(service.Annotations, constant.KBAppMultiClusterServicePlacementKey)
283281
}
284282
if podService && len(kind) > 0 && kind != multiClusterServicePlacementInMirror && kind != multiClusterServicePlacementInUnique {
285283
return fmt.Errorf("invalid multi-cluster pod-service placement kind %s for service %s", kind, service.Name)
286284
}
287285

288286
if podService && kind == multiClusterServicePlacementInUnique {
289-
// create or update service in unique, by hacking the pod placement strategy.
290-
ordinal := func() int {
291-
subs := strings.Split(service.GetName(), "-")
292-
o, _ := strconv.Atoi(subs[len(subs)-1])
293-
return o
294-
}
295-
multicluster.Assign(ctx.GetContext(), service, ordinal)
287+
service.Annotations[constant.KBAppMultiClusterObjectProvisionPolicyKey] = "ordinal" // HACK
296288
}
297289

298290
createOrUpdateService := func(service *corev1.Service) error {

0 commit comments

Comments
 (0)