Skip to content

Commit a9abd1a

Browse files
committed
remove replica status
1 parent 1846fa3 commit a9abd1a

File tree

6 files changed

+168
-750
lines changed

6 files changed

+168
-750
lines changed

controllers/apps/component/transformer_component_pre_terminate.go

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -107,13 +107,12 @@ func (t *componentPreTerminateTransformer) provisioned(transCtx *componentTransf
107107
return false, client.IgnoreNotFound(err)
108108
}
109109

110-
provisioned, err := component.GetReplicasStatusFunc(its, func(s component.ReplicaStatus) bool {
111-
return s.Provisioned
112-
})
113-
if err != nil {
114-
return false, err
110+
for _, inst := range its.Status.InstanceStatus {
111+
if inst.Provisioned {
112+
return true, nil
113+
}
115114
}
116-
return len(provisioned) > 0, nil
115+
return false, nil
117116
}
118117

119118
func (t *componentPreTerminateTransformer) checkPreTerminateDone(transCtx *componentTransformContext, dag *graph.DAG) bool {

controllers/apps/component/transformer_component_pre_terminate_test.go

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,6 @@ import (
3737
workloads "github.com/apecloud/kubeblocks/apis/workloads/v1"
3838
appsutil "github.com/apecloud/kubeblocks/controllers/apps/util"
3939
"github.com/apecloud/kubeblocks/pkg/constant"
40-
"github.com/apecloud/kubeblocks/pkg/controller/component"
4140
"github.com/apecloud/kubeblocks/pkg/controller/graph"
4241
"github.com/apecloud/kubeblocks/pkg/controller/model"
4342
kbacli "github.com/apecloud/kubeblocks/pkg/kbagent/client"
@@ -65,10 +64,12 @@ var _ = Describe("pre-terminate transformer test", func() {
6564
}
6665

6766
provisioned := func(its *workloads.InstanceSet) {
68-
replicas := []string{
69-
fmt.Sprintf("%s-0", its.Name),
67+
its.Status.InstanceStatus = []workloads.InstanceStatus{
68+
{
69+
PodName: fmt.Sprintf("%s-0", its.Name),
70+
Provisioned: true,
71+
},
7072
}
71-
Expect(component.StatusReplicasStatus(its, replicas, false, false)).Should(Succeed())
7273
}
7374

7475
BeforeEach(func() {
@@ -199,12 +200,7 @@ var _ = Describe("pre-terminate transformer test", func() {
199200

200201
It("not provisioned", func() {
201202
its := reader.Objects[1].(*workloads.InstanceSet)
202-
Expect(component.UpdateReplicasStatusFunc(its, func(r *component.ReplicasStatus) error {
203-
for i := range r.Status {
204-
r.Status[i].Provisioned = false
205-
}
206-
return nil
207-
})).Should(Succeed())
203+
its.Status.InstanceStatus[0].Provisioned = false
208204

209205
transformer := &componentPreTerminateTransformer{}
210206
err := transformer.Transform(transCtx, dag)

controllers/apps/component/transformer_component_workload.go

Lines changed: 0 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@ import (
2828
"golang.org/x/exp/maps"
2929
corev1 "k8s.io/api/core/v1"
3030
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
31-
"k8s.io/apimachinery/pkg/util/sets"
3231
"k8s.io/utils/ptr"
3332
"sigs.k8s.io/controller-runtime/pkg/client"
3433

@@ -116,10 +115,6 @@ func (t *componentWorkloadTransformer) reconcileWorkload(ctx context.Context, cl
116115

117116
t.buildInstanceSetPlacementAnnotation(comp, protoITS)
118117

119-
if err := t.reconcileReplicasStatus(ctx, cli, synthesizedComp, runningITS, protoITS); err != nil {
120-
return err
121-
}
122-
123118
return nil
124119
}
125120

@@ -135,43 +130,6 @@ func (t *componentWorkloadTransformer) buildInstanceSetPlacementAnnotation(comp
135130
}
136131
}
137132

138-
func (t *componentWorkloadTransformer) reconcileReplicasStatus(ctx context.Context, cli client.Reader,
139-
synthesizedComp *component.SynthesizedComponent, runningITS, protoITS *workloads.InstanceSet) error {
140-
var (
141-
namespace = synthesizedComp.Namespace
142-
clusterName = synthesizedComp.ClusterName
143-
compName = synthesizedComp.Name
144-
)
145-
146-
// HACK: sync replicas status from runningITS to protoITS
147-
component.BuildReplicasStatus(runningITS, protoITS)
148-
149-
replicas, err := func() ([]string, error) {
150-
pods, err := component.ListOwnedPods(ctx, cli, namespace, clusterName, compName)
151-
if err != nil {
152-
return nil, err
153-
}
154-
podNameSet := sets.New[string]()
155-
for _, pod := range pods {
156-
podNameSet.Insert(pod.Name)
157-
}
158-
159-
desiredPodNames, err := component.GeneratePodNamesByITS(protoITS)
160-
if err != nil {
161-
return nil, err
162-
}
163-
desiredPodNameSet := sets.New(desiredPodNames...)
164-
165-
return desiredPodNameSet.Intersection(podNameSet).UnsortedList(), nil
166-
}()
167-
if err != nil {
168-
return err
169-
}
170-
171-
hasMemberJoinDefined, hasDataActionDefined := hasMemberJoinNDataActionDefined(synthesizedComp.LifecycleActions)
172-
return component.StatusReplicasStatus(protoITS, replicas, hasMemberJoinDefined, hasDataActionDefined)
173-
}
174-
175133
func (t *componentWorkloadTransformer) handleUpdate(transCtx *componentTransformContext, cli model.GraphClient, dag *graph.DAG,
176134
synthesizedComp *component.SynthesizedComponent, comp *appsv1.Component, runningITS, protoITS *workloads.InstanceSet) error {
177135
start, stop, err := t.handleWorkloadStartNStop(synthesizedComp, runningITS, &protoITS)
Lines changed: 157 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,157 @@
1+
/*
2+
Copyright (C) 2022-2025 ApeCloud Co., Ltd
3+
4+
This file is part of KubeBlocks project
5+
6+
This program is free software: you can redistribute it and/or modify
7+
it under the terms of the GNU Affero General Public License as published by
8+
the Free Software Foundation, either version 3 of the License, or
9+
(at your option) any later version.
10+
11+
This program is distributed in the hope that it will be useful
12+
but WITHOUT ANY WARRANTY; without even the implied warranty of
13+
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14+
GNU Affero General Public License for more details.
15+
16+
You should have received a copy of the GNU Affero General Public License
17+
along with this program. If not, see <http://www.gnu.org/licenses/>.
18+
*/
19+
20+
package component
21+
22+
import (
23+
"context"
24+
"slices"
25+
"strings"
26+
27+
"github.com/go-logr/logr"
28+
corev1 "k8s.io/api/core/v1"
29+
"k8s.io/apimachinery/pkg/types"
30+
"sigs.k8s.io/controller-runtime/pkg/client"
31+
32+
workloads "github.com/apecloud/kubeblocks/apis/workloads/v1"
33+
"github.com/apecloud/kubeblocks/pkg/constant"
34+
intctrlutil "github.com/apecloud/kubeblocks/pkg/controllerutil"
35+
"github.com/apecloud/kubeblocks/pkg/kbagent"
36+
"github.com/apecloud/kubeblocks/pkg/kbagent/proto"
37+
)
38+
39+
const (
40+
// new replicas task & event
41+
newReplicaTask = "newReplica"
42+
defaultNewReplicaTaskReportPeriodSeconds = 60
43+
)
44+
45+
func NewReplicaTask(compName, uid string, source *corev1.Pod, replicas []string) (map[string]string, error) {
46+
port, err := intctrlutil.GetPortByName(*source, kbagent.ContainerName, kbagent.DefaultStreamingPortName)
47+
if err != nil {
48+
return nil, err
49+
}
50+
task := proto.Task{
51+
Instance: compName,
52+
Task: newReplicaTask,
53+
UID: uid,
54+
Replicas: strings.Join(replicas, ","),
55+
NotifyAtFinish: true,
56+
ReportPeriodSeconds: defaultNewReplicaTaskReportPeriodSeconds,
57+
NewReplica: &proto.NewReplicaTask{
58+
Remote: intctrlutil.PodFQDN(source.Namespace, compName, source.Name),
59+
Port: port,
60+
Replicas: strings.Join(replicas, ","),
61+
},
62+
}
63+
return buildKBAgentTaskEnv(task)
64+
}
65+
66+
func handleNewReplicaTaskEvent(logger logr.Logger, ctx context.Context, cli client.Client, namespace string, event proto.TaskEvent) error {
67+
key := types.NamespacedName{
68+
Namespace: namespace,
69+
Name: event.Instance,
70+
}
71+
its := &workloads.InstanceSet{}
72+
if err := cli.Get(ctx, key, its); err != nil {
73+
logger.Error(err, "get ITS failed when handle new replica task event",
74+
"code", event.Code, "finished", !event.EndTime.IsZero(), "message", event.Message)
75+
return err
76+
}
77+
78+
var err error
79+
finished := !event.EndTime.IsZero()
80+
switch {
81+
case finished && event.Code == 0:
82+
err = handleNewReplicaTaskEvent4Finished(ctx, cli, its, event)
83+
case finished:
84+
err = handleNewReplicaTaskEvent4Failed(ctx, cli, its, event)
85+
default:
86+
err = handleNewReplicaTaskEvent4Unfinished(ctx, cli, its, event)
87+
}
88+
if err != nil {
89+
logger.Error(err, "handle new replica task event failed",
90+
"code", event.Code, "finished", finished, "message", event.Message)
91+
} else {
92+
logger.Info("handle new replica task event success",
93+
"code", event.Code, "finished", finished, "message", event.Message)
94+
}
95+
return err
96+
}
97+
98+
func handleNewReplicaTaskEvent4Finished(ctx context.Context, cli client.Client, its *workloads.InstanceSet, event proto.TaskEvent) error {
99+
if err := func() error {
100+
envKey := types.NamespacedName{
101+
Namespace: its.Namespace,
102+
Name: constant.GetCompEnvCMName(its.Name),
103+
}
104+
obj := &corev1.ConfigMap{}
105+
err := cli.Get(ctx, envKey, obj)
106+
if err != nil {
107+
return err
108+
}
109+
110+
parameters, err := updateKBAgentTaskEnv(obj.Data, func(task proto.Task) *proto.Task {
111+
if task.Task == newReplicaTask {
112+
replicas := strings.Split(task.Replicas, ",")
113+
replicas = slices.DeleteFunc(replicas, func(r string) bool {
114+
return r == event.Replica
115+
})
116+
if len(replicas) == 0 {
117+
return nil
118+
}
119+
task.Replicas = strings.Join(replicas, ",")
120+
if task.NewReplica != nil {
121+
task.NewReplica.Replicas = task.Replicas
122+
}
123+
}
124+
return &task
125+
})
126+
if err != nil {
127+
return err
128+
}
129+
if parameters == nil {
130+
return nil // do nothing
131+
}
132+
133+
if obj.Data == nil {
134+
obj.Data = make(map[string]string)
135+
}
136+
for k, v := range parameters {
137+
obj.Data[k] = v
138+
}
139+
return cli.Update(ctx, obj)
140+
}(); err != nil {
141+
return err
142+
}
143+
144+
// TODO: mark data loaded
145+
146+
return nil
147+
}
148+
149+
func handleNewReplicaTaskEvent4Unfinished(ctx context.Context, cli client.Client, its *workloads.InstanceSet, event proto.TaskEvent) error {
150+
// TODO: do nothing, log the event.Message
151+
return nil
152+
}
153+
154+
func handleNewReplicaTaskEvent4Failed(ctx context.Context, cli client.Client, its *workloads.InstanceSet, event proto.TaskEvent) error {
155+
// TODO: do nothing, log the event.Message
156+
return nil
157+
}

0 commit comments

Comments
 (0)