Skip to content

Commit e1f24be

Browse files
committed
remove replica status
1 parent 1846fa3 commit e1f24be

File tree

6 files changed

+158
-750
lines changed

6 files changed

+158
-750
lines changed

controllers/apps/component/transformer_component_pre_terminate.go

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -107,13 +107,12 @@ func (t *componentPreTerminateTransformer) provisioned(transCtx *componentTransf
107107
return false, client.IgnoreNotFound(err)
108108
}
109109

110-
provisioned, err := component.GetReplicasStatusFunc(its, func(s component.ReplicaStatus) bool {
111-
return s.Provisioned
112-
})
113-
if err != nil {
114-
return false, err
110+
for _, inst := range its.Status.InstanceStatus {
111+
if inst.Provisioned {
112+
return true, nil
113+
}
115114
}
116-
return len(provisioned) > 0, nil
115+
return false, nil
117116
}
118117

119118
func (t *componentPreTerminateTransformer) checkPreTerminateDone(transCtx *componentTransformContext, dag *graph.DAG) bool {

controllers/apps/component/transformer_component_pre_terminate_test.go

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,6 @@ import (
3737
workloads "github.com/apecloud/kubeblocks/apis/workloads/v1"
3838
appsutil "github.com/apecloud/kubeblocks/controllers/apps/util"
3939
"github.com/apecloud/kubeblocks/pkg/constant"
40-
"github.com/apecloud/kubeblocks/pkg/controller/component"
4140
"github.com/apecloud/kubeblocks/pkg/controller/graph"
4241
"github.com/apecloud/kubeblocks/pkg/controller/model"
4342
kbacli "github.com/apecloud/kubeblocks/pkg/kbagent/client"
@@ -65,10 +64,12 @@ var _ = Describe("pre-terminate transformer test", func() {
6564
}
6665

6766
provisioned := func(its *workloads.InstanceSet) {
68-
replicas := []string{
69-
fmt.Sprintf("%s-0", its.Name),
67+
its.Status.InstanceStatus = []workloads.InstanceStatus{
68+
{
69+
PodName: fmt.Sprintf("%s-0", its.Name),
70+
Provisioned: true,
71+
},
7072
}
71-
Expect(component.StatusReplicasStatus(its, replicas, false, false)).Should(Succeed())
7273
}
7374

7475
BeforeEach(func() {
@@ -199,12 +200,7 @@ var _ = Describe("pre-terminate transformer test", func() {
199200

200201
It("not provisioned", func() {
201202
its := reader.Objects[1].(*workloads.InstanceSet)
202-
Expect(component.UpdateReplicasStatusFunc(its, func(r *component.ReplicasStatus) error {
203-
for i := range r.Status {
204-
r.Status[i].Provisioned = false
205-
}
206-
return nil
207-
})).Should(Succeed())
203+
its.Status.InstanceStatus[0].Provisioned = false
208204

209205
transformer := &componentPreTerminateTransformer{}
210206
err := transformer.Transform(transCtx, dag)

controllers/apps/component/transformer_component_workload.go

Lines changed: 0 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@ import (
2828
"golang.org/x/exp/maps"
2929
corev1 "k8s.io/api/core/v1"
3030
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
31-
"k8s.io/apimachinery/pkg/util/sets"
3231
"k8s.io/utils/ptr"
3332
"sigs.k8s.io/controller-runtime/pkg/client"
3433

@@ -116,10 +115,6 @@ func (t *componentWorkloadTransformer) reconcileWorkload(ctx context.Context, cl
116115

117116
t.buildInstanceSetPlacementAnnotation(comp, protoITS)
118117

119-
if err := t.reconcileReplicasStatus(ctx, cli, synthesizedComp, runningITS, protoITS); err != nil {
120-
return err
121-
}
122-
123118
return nil
124119
}
125120

@@ -135,43 +130,6 @@ func (t *componentWorkloadTransformer) buildInstanceSetPlacementAnnotation(comp
135130
}
136131
}
137132

138-
func (t *componentWorkloadTransformer) reconcileReplicasStatus(ctx context.Context, cli client.Reader,
139-
synthesizedComp *component.SynthesizedComponent, runningITS, protoITS *workloads.InstanceSet) error {
140-
var (
141-
namespace = synthesizedComp.Namespace
142-
clusterName = synthesizedComp.ClusterName
143-
compName = synthesizedComp.Name
144-
)
145-
146-
// HACK: sync replicas status from runningITS to protoITS
147-
component.BuildReplicasStatus(runningITS, protoITS)
148-
149-
replicas, err := func() ([]string, error) {
150-
pods, err := component.ListOwnedPods(ctx, cli, namespace, clusterName, compName)
151-
if err != nil {
152-
return nil, err
153-
}
154-
podNameSet := sets.New[string]()
155-
for _, pod := range pods {
156-
podNameSet.Insert(pod.Name)
157-
}
158-
159-
desiredPodNames, err := component.GeneratePodNamesByITS(protoITS)
160-
if err != nil {
161-
return nil, err
162-
}
163-
desiredPodNameSet := sets.New(desiredPodNames...)
164-
165-
return desiredPodNameSet.Intersection(podNameSet).UnsortedList(), nil
166-
}()
167-
if err != nil {
168-
return err
169-
}
170-
171-
hasMemberJoinDefined, hasDataActionDefined := hasMemberJoinNDataActionDefined(synthesizedComp.LifecycleActions)
172-
return component.StatusReplicasStatus(protoITS, replicas, hasMemberJoinDefined, hasDataActionDefined)
173-
}
174-
175133
func (t *componentWorkloadTransformer) handleUpdate(transCtx *componentTransformContext, cli model.GraphClient, dag *graph.DAG,
176134
synthesizedComp *component.SynthesizedComponent, comp *appsv1.Component, runningITS, protoITS *workloads.InstanceSet) error {
177135
start, stop, err := t.handleWorkloadStartNStop(synthesizedComp, runningITS, &protoITS)
Lines changed: 147 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,147 @@
1+
/*
2+
Copyright (C) 2022-2025 ApeCloud Co., Ltd
3+
4+
This file is part of KubeBlocks project
5+
6+
This program is free software: you can redistribute it and/or modify
7+
it under the terms of the GNU Affero General Public License as published by
8+
the Free Software Foundation, either version 3 of the License, or
9+
(at your option) any later version.
10+
11+
This program is distributed in the hope that it will be useful
12+
but WITHOUT ANY WARRANTY; without even the implied warranty of
13+
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14+
GNU Affero General Public License for more details.
15+
16+
You should have received a copy of the GNU Affero General Public License
17+
along with this program. If not, see <http://www.gnu.org/licenses/>.
18+
*/
19+
20+
package component
21+
22+
import (
23+
"context"
24+
"slices"
25+
"strings"
26+
27+
"github.com/go-logr/logr"
28+
corev1 "k8s.io/api/core/v1"
29+
"k8s.io/apimachinery/pkg/types"
30+
"k8s.io/utils/ptr"
31+
"sigs.k8s.io/controller-runtime/pkg/client"
32+
33+
workloads "github.com/apecloud/kubeblocks/apis/workloads/v1"
34+
"github.com/apecloud/kubeblocks/pkg/constant"
35+
intctrlutil "github.com/apecloud/kubeblocks/pkg/controllerutil"
36+
"github.com/apecloud/kubeblocks/pkg/kbagent"
37+
"github.com/apecloud/kubeblocks/pkg/kbagent/proto"
38+
)
39+
40+
const (
41+
// new replicas task & event
42+
newReplicaTask = "newReplica"
43+
defaultNewReplicaTaskReportPeriodSeconds = 60
44+
)
45+
46+
func NewReplicaTask(compName, uid string, source *corev1.Pod, replicas []string) (map[string]string, error) {
47+
port, err := intctrlutil.GetPortByName(*source, kbagent.ContainerName, kbagent.DefaultStreamingPortName)
48+
if err != nil {
49+
return nil, err
50+
}
51+
task := proto.Task{
52+
Instance: compName,
53+
Task: newReplicaTask,
54+
UID: uid,
55+
Replicas: strings.Join(replicas, ","),
56+
NotifyAtFinish: true,
57+
ReportPeriodSeconds: defaultNewReplicaTaskReportPeriodSeconds,
58+
NewReplica: &proto.NewReplicaTask{
59+
Remote: intctrlutil.PodFQDN(source.Namespace, compName, source.Name),
60+
Port: port,
61+
Replicas: strings.Join(replicas, ","),
62+
},
63+
}
64+
return buildKBAgentTaskEnv(task)
65+
}
66+
67+
func handleNewReplicaTaskEvent(logger logr.Logger, ctx context.Context, cli client.Client, namespace string, event proto.TaskEvent) error {
68+
key := types.NamespacedName{
69+
Namespace: namespace,
70+
Name: event.Instance,
71+
}
72+
its := &workloads.InstanceSet{}
73+
if err := cli.Get(ctx, key, its); err != nil {
74+
logger.Error(err, "get ITS failed when handle new replica task event",
75+
"code", event.Code, "finished", !event.EndTime.IsZero(), "message", event.Message)
76+
return err
77+
}
78+
79+
var err error
80+
finished := !event.EndTime.IsZero()
81+
if finished && event.Code == 0 {
82+
err = handleNewReplicaTaskEvent4Finished(ctx, cli, its, event)
83+
}
84+
if err != nil {
85+
logger.Error(err, "handle new replica task event failed",
86+
"code", event.Code, "finished", finished, "message", event.Message)
87+
} else {
88+
logger.Info("handle new replica task event success",
89+
"code", event.Code, "finished", finished, "message", event.Message)
90+
}
91+
return err
92+
}
93+
94+
func handleNewReplicaTaskEvent4Finished(ctx context.Context, cli client.Client, its *workloads.InstanceSet, event proto.TaskEvent) error {
95+
if err := func() error {
96+
envKey := types.NamespacedName{
97+
Namespace: its.Namespace,
98+
Name: constant.GetCompEnvCMName(its.Name),
99+
}
100+
obj := &corev1.ConfigMap{}
101+
err := cli.Get(ctx, envKey, obj)
102+
if err != nil {
103+
return err
104+
}
105+
106+
parameters, err := updateKBAgentTaskEnv(obj.Data, func(task proto.Task) *proto.Task {
107+
if task.Task == newReplicaTask {
108+
replicas := strings.Split(task.Replicas, ",")
109+
replicas = slices.DeleteFunc(replicas, func(r string) bool {
110+
return r == event.Replica
111+
})
112+
if len(replicas) == 0 {
113+
return nil
114+
}
115+
task.Replicas = strings.Join(replicas, ",")
116+
if task.NewReplica != nil {
117+
task.NewReplica.Replicas = task.Replicas
118+
}
119+
}
120+
return &task
121+
})
122+
if err != nil {
123+
return err
124+
}
125+
if parameters == nil {
126+
return nil // do nothing
127+
}
128+
129+
if obj.Data == nil {
130+
obj.Data = make(map[string]string)
131+
}
132+
for k, v := range parameters {
133+
obj.Data[k] = v
134+
}
135+
return cli.Update(ctx, obj)
136+
}(); err != nil {
137+
return err
138+
}
139+
140+
// TODO: do NOT update the ITS status directly
141+
for i, inst := range its.Status.InstanceStatus {
142+
if inst.PodName == event.Replica {
143+
its.Status.InstanceStatus[i].DataLoaded = ptr.To(true)
144+
}
145+
}
146+
return cli.Status().Update(ctx, its)
147+
}

0 commit comments

Comments
 (0)