@@ -21,7 +21,6 @@ package component
2121
2222import (
2323 "crypto/sha256"
24- "errors"
2524 "fmt"
2625 "path/filepath"
2726 "reflect"
@@ -42,7 +41,6 @@ import (
4241 "github.com/apecloud/kubeblocks/pkg/controller/graph"
4342 "github.com/apecloud/kubeblocks/pkg/controller/lifecycle"
4443 "github.com/apecloud/kubeblocks/pkg/controller/model"
45- intctrlutil "github.com/apecloud/kubeblocks/pkg/controllerutil"
4644)
4745
4846type componentWorkloadOps struct {
@@ -95,162 +93,19 @@ func (r *componentWorkloadOps) horizontalScale() error {
9593 in = r .runningItsPodNameSet .Difference (r .desiredCompPodNameSet )
9694 out = r .desiredCompPodNameSet .Difference (r .runningItsPodNameSet )
9795 )
98- if in .Len () == 0 && out .Len () == 0 {
99- return r .postHorizontalScale () // TODO: how about consecutive horizontal scales?
100- }
101-
102- if in .Len () > 0 {
103- if err := r .scaleIn (); err != nil {
104- return err
105- }
106- }
107-
108- if out .Len () > 0 {
109- if err := r .scaleOut (); err != nil {
110- return err
111- }
112- }
113-
114- r .transCtx .EventRecorder .Eventf (r .component ,
115- corev1 .EventTypeNormal ,
116- "HorizontalScale" ,
117- "start horizontal scale component %s of cluster %s from %d to %d" ,
118- r .synthesizeComp .Name , r .synthesizeComp .ClusterName , int (* r .runningITS .Spec .Replicas ), r .synthesizeComp .Replicas )
119-
120- return nil
121- }
122-
123- func (r * componentWorkloadOps ) scaleIn () error {
124- if r .synthesizeComp .Replicas == 0 && len (r .synthesizeComp .VolumeClaimTemplates ) > 0 {
125- if r .synthesizeComp .PVCRetentionPolicy .WhenScaled != appsv1 .RetainPersistentVolumeClaimRetentionPolicyType {
126- return fmt .Errorf ("when intending to scale-in to 0, only the \" Retain\" option is supported for the PVC retention policy" )
127- }
128- }
129-
130- deleteReplicas := r .runningItsPodNameSet .Difference (r .desiredCompPodNameSet ).UnsortedList ()
131- joinedReplicas := make ([]string , 0 )
132- err := component .DeleteReplicasStatus (r .protoITS , deleteReplicas , func (s component.ReplicaStatus ) {
133- // has no member join defined or has joined successfully
134- if s .Provisioned && (s .MemberJoined == nil || * s .MemberJoined ) {
135- joinedReplicas = append (joinedReplicas , s .Name )
136- }
137- })
138- if err != nil {
139- return err
140- }
141-
142- // TODO: check the component definition to determine whether we need to call leave member before deleting replicas.
143- if err := r .leaveMember4ScaleIn (deleteReplicas , joinedReplicas ); err != nil {
144- r .transCtx .Logger .Error (err , "leave member at scale-in error" )
145- return err
146- }
147- return nil
148- }
149-
150- func (r * componentWorkloadOps ) leaveMember4ScaleIn (deleteReplicas , joinedReplicas []string ) error {
151- pods , err := component .ListOwnedPods (r .transCtx .Context , r .cli ,
152- r .synthesizeComp .Namespace , r .synthesizeComp .ClusterName , r .synthesizeComp .Name )
153- if err != nil {
154- return err
155- }
156-
157- deleteReplicasSet := sets .New (deleteReplicas ... )
158- joinedReplicasSet := sets .New (joinedReplicas ... )
159- hasMemberLeaveDefined := r .synthesizeComp .LifecycleActions != nil && r .synthesizeComp .LifecycleActions .MemberLeave != nil
160- r .transCtx .Logger .Info ("leave member at scaling-in" , "delete replicas" , deleteReplicas ,
161- "joined replicas" , joinedReplicas , "has member-leave action defined" , hasMemberLeaveDefined )
162-
163- leaveErrors := make ([]error , 0 )
164- for _ , pod := range pods {
165- if deleteReplicasSet .Has (pod .Name ) {
166- if joinedReplicasSet .Has (pod .Name ) { // else: hasn't joined yet, no need to leave
167- if err = r .leaveMemberForPod (pod , pods ); err != nil {
168- leaveErrors = append (leaveErrors , err )
169- }
170- joinedReplicasSet .Delete (pod .Name )
171- }
172- deleteReplicasSet .Delete (pod .Name )
173- }
174- }
175-
176- if hasMemberLeaveDefined && len (joinedReplicasSet ) > 0 {
177- leaveErrors = append (leaveErrors ,
178- fmt .Errorf ("some replicas have joined but not leaved since the Pod object is not exist: %v" , sets .List (joinedReplicasSet )))
179- }
180- if len (leaveErrors ) > 0 {
181- return intctrlutil .NewRequeueError (time .Second , fmt .Sprintf ("%v" , leaveErrors ))
182- }
183- return nil
184- }
185-
186- func (r * componentWorkloadOps ) leaveMemberForPod (pod * corev1.Pod , pods []* corev1.Pod ) error {
187- var (
188- synthesizedComp = r .synthesizeComp
189- lifecycleActions = synthesizedComp .LifecycleActions
190- )
191-
192- trySwitchover := func (lfa lifecycle.Lifecycle , pod * corev1.Pod ) error {
193- if lifecycleActions .Switchover == nil {
194- return nil
195- }
196- err := lfa .Switchover (r .transCtx .Context , r .cli , nil , "" )
197- if err != nil {
198- if errors .Is (err , lifecycle .ErrActionNotDefined ) {
199- return nil
200- }
201- return err
202- }
203- r .transCtx .Logger .Info ("successfully call switchover action for pod" , "pod" , pod .Name )
204- return nil
205- }
206-
207- tryMemberLeave := func (lfa lifecycle.Lifecycle , pod * corev1.Pod ) error {
208- if lifecycleActions .MemberLeave == nil {
209- return nil
210- }
211- err := lfa .MemberLeave (r .transCtx .Context , r .cli , nil )
212- if err != nil {
213- if errors .Is (err , lifecycle .ErrActionNotDefined ) {
214- return nil
215- }
216- return err
217- }
218- r .transCtx .Logger .Info ("successfully call leave member action for pod" , "pod" , pod .Name )
219- return nil
220- }
221-
222- if lifecycleActions == nil || (lifecycleActions .Switchover == nil && lifecycleActions .MemberLeave == nil ) {
223- return nil
224- }
225-
226- lfa , err := lifecycle .New (synthesizedComp .Namespace , synthesizedComp .ClusterName , synthesizedComp .Name ,
227- lifecycleActions , synthesizedComp .TemplateVars , pod , pods ... )
228- if err != nil {
229- return err
230- }
231-
232- if err := trySwitchover (lfa , pod ); err != nil {
96+ if err := r .buildDataReplicationTask (); err != nil {
23397 return err
23498 }
235-
236- if err := tryMemberLeave (lfa , pod ); err != nil {
237- return err
99+ if in .Len () != 0 || out .Len () != 0 {
100+ r .transCtx .EventRecorder .Eventf (r .component ,
101+ corev1 .EventTypeNormal ,
102+ "HorizontalScale" ,
103+ "start horizontal scale component %s of cluster %s from %d to %d" ,
104+ r .synthesizeComp .Name , r .synthesizeComp .ClusterName , int (* r .runningITS .Spec .Replicas ), r .synthesizeComp .Replicas )
238105 }
239-
240106 return nil
241107}
242108
243- func (r * componentWorkloadOps ) scaleOut () error {
244- if err := r .buildDataReplicationTask (); err != nil {
245- return err
246- }
247-
248- // replicas to be created
249- newReplicas := r .desiredCompPodNameSet .Difference (r .runningItsPodNameSet ).UnsortedList ()
250- hasMemberJoinDefined , hasDataActionDefined := hasMemberJoinNDataActionDefined (r .synthesizeComp .LifecycleActions )
251- return component .NewReplicasStatus (r .protoITS , newReplicas , hasMemberJoinDefined , hasDataActionDefined )
252- }
253-
254109func (r * componentWorkloadOps ) buildDataReplicationTask () error {
255110 _ , hasDataActionDefined := hasMemberJoinNDataActionDefined (r .synthesizeComp .LifecycleActions )
256111 if ! hasDataActionDefined {
@@ -271,7 +126,7 @@ func (r *componentWorkloadOps) buildDataReplicationTask() error {
271126 return err
272127 }
273128
274- // the source replica
129+ // choose the source replica
275130 source , err := r .sourceReplica (r .synthesizeComp .LifecycleActions .DataDump , provisioningReplicas )
276131 if err != nil {
277132 return err
@@ -320,90 +175,6 @@ func (r *componentWorkloadOps) sourceReplica(dataDump *appsv1.Action, provisioni
320175 return nil , fmt .Errorf ("no available pod to dump data" )
321176}
322177
323- func (r * componentWorkloadOps ) postHorizontalScale () error {
324- if err := r .postScaleOut (); err != nil {
325- return err
326- }
327- return nil
328- }
329-
330- func (r * componentWorkloadOps ) postScaleOut () error {
331- if err := r .buildDataReplicationTask (); err != nil {
332- return err
333- }
334- if err := r .joinMember4ScaleOut (); err != nil {
335- return err
336- }
337- return nil
338- }
339-
340- func (r * componentWorkloadOps ) joinMember4ScaleOut () error {
341- pods , err := component .ListOwnedPods (r .transCtx .Context , r .cli ,
342- r .synthesizeComp .Namespace , r .synthesizeComp .ClusterName , r .synthesizeComp .Name )
343- if err != nil {
344- return err
345- }
346-
347- joinErrors := make ([]error , 0 )
348- if err = component .UpdateReplicasStatusFunc (r .protoITS , func (replicas * component.ReplicasStatus ) error {
349- for _ , pod := range pods {
350- i := slices .IndexFunc (replicas .Status , func (r component.ReplicaStatus ) bool {
351- return r .Name == pod .Name
352- })
353- if i < 0 {
354- continue // the pod is not in the replicas status?
355- }
356-
357- status := replicas .Status [i ]
358- if status .MemberJoined == nil || * status .MemberJoined {
359- continue // no need to join or already joined
360- }
361-
362- // TODO: should wait for the data to be loaded before joining the member?
363-
364- if err := r .joinMemberForPod (pod , pods ); err != nil {
365- joinErrors = append (joinErrors , fmt .Errorf ("pod %s: %w" , pod .Name , err ))
366- } else {
367- replicas .Status [i ].MemberJoined = ptr .To (true )
368- }
369- }
370-
371- notJoinedReplicas := make ([]string , 0 )
372- for _ , r := range replicas .Status {
373- if r .MemberJoined != nil && ! * r .MemberJoined {
374- notJoinedReplicas = append (notJoinedReplicas , r .Name )
375- }
376- }
377- if len (notJoinedReplicas ) > 0 {
378- joinErrors = append (joinErrors , fmt .Errorf ("some replicas have not joined: %v" , notJoinedReplicas ))
379- }
380- return nil
381- }); err != nil {
382- return err
383- }
384-
385- if len (joinErrors ) > 0 {
386- return intctrlutil .NewRequeueError (time .Second , fmt .Sprintf ("%v" , joinErrors ))
387- }
388- return nil
389- }
390-
391- func (r * componentWorkloadOps ) joinMemberForPod (pod * corev1.Pod , pods []* corev1.Pod ) error {
392- synthesizedComp := r .synthesizeComp
393- lfa , err := lifecycle .New (synthesizedComp .Namespace , synthesizedComp .ClusterName , synthesizedComp .Name ,
394- synthesizedComp .LifecycleActions , synthesizedComp .TemplateVars , pod , pods ... )
395- if err != nil {
396- return err
397- }
398- if err = lfa .MemberJoin (r .transCtx .Context , r .cli , nil ); err != nil {
399- if ! errors .Is (err , lifecycle .ErrActionNotDefined ) {
400- return err
401- }
402- }
403- r .transCtx .Logger .Info ("succeed to join member for pod" , "pod" , pod .Name )
404- return nil
405- }
406-
407178func (r * componentWorkloadOps ) reconfigure () error {
408179 runningObjs , protoObjs , err := prepareFileTemplateObjects (r .transCtx )
409180 if err != nil {
0 commit comments