Skip to content

Commit 464ab57

Browse files
committed
take the headless service as an assistant object
1 parent 8f2a542 commit 464ab57

File tree

4 files changed

+85
-169
lines changed

4 files changed

+85
-169
lines changed

controllers/workloads/instanceset_controller2.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,6 @@ func (r *InstanceSetReconciler2) Reconcile(ctx context.Context, req ctrl.Request
5959
Do(instanceset2.NewStatusReconciler()).
6060
Do(instanceset2.NewRevisionUpdateReconciler()).
6161
Do(instanceset2.NewHeadlessServiceReconciler()).
62-
// Do(instanceset2.NewAssistantObjectReconciler()).
6362
Do(instanceset2.NewAlignmentReconciler()).
6463
Do(instanceset2.NewUpdateReconciler()).
6564
Commit()

pkg/controller/instance/reconciler_assistant_object.go

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -144,21 +144,18 @@ func (r *assistantObjectReconciler) create(tree *kubebuilderx.ObjectTree, inst *
144144
// if err := controllerutil.SetControllerReference(inst, obj, model.GetScheme()); err != nil {
145145
// return err
146146
// }
147-
tree.Logger.Info("create object", "kind", obj.GetObjectKind().GroupVersionKind().Kind, "name", obj.GetName(), "labels", obj.GetLabels())
148147
return tree.Add(obj)
149148
}
150149

151150
func (r *assistantObjectReconciler) update(tree *kubebuilderx.ObjectTree, assistantObj workloads.InstanceAssistantObject, robj, obj client.Object) error {
152151
ng, og := r.generation(obj), r.generation(robj)
153152
if ng > 0 && og > 0 && ng < og {
154-
tree.Logger.Info("skip update object", "kind", obj.GetObjectKind().GroupVersionKind().Kind, "name", obj.GetName(), "labels", obj.GetLabels())
155153
return nil
156154
}
157155
merged := r.copyAndMerge(assistantObj, robj, obj)
158156
if merged == nil {
159157
return nil
160158
}
161-
tree.Logger.Info("update object", "kind", obj.GetObjectKind().GroupVersionKind().Kind, "name", obj.GetName(), "labels", obj.GetLabels())
162159
return tree.Update(merged)
163160
}
164161

pkg/controller/instanceset2/reconciler_assistant_object.go

Lines changed: 0 additions & 160 deletions
This file was deleted.

pkg/controller/instanceset2/reconciler_headless_service.go

Lines changed: 85 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,17 @@ along with this program. If not, see <http://www.gnu.org/licenses/>.
2020
package instanceset2
2121

2222
import (
23+
"fmt"
24+
"slices"
25+
"strings"
26+
2327
corev1 "k8s.io/api/core/v1"
28+
"k8s.io/apimachinery/pkg/util/sets"
29+
"sigs.k8s.io/controller-runtime/pkg/client"
2430

2531
workloads "github.com/apecloud/kubeblocks/apis/workloads/v1"
32+
"github.com/apecloud/kubeblocks/pkg/constant"
33+
"github.com/apecloud/kubeblocks/pkg/controller/builder"
2634
"github.com/apecloud/kubeblocks/pkg/controller/kubebuilderx"
2735
"github.com/apecloud/kubeblocks/pkg/controller/model"
2836
intctrlutil "github.com/apecloud/kubeblocks/pkg/controllerutil"
@@ -36,7 +44,7 @@ type headlessServiceReconciler struct{}
3644

3745
var _ kubebuilderx.Reconciler = &headlessServiceReconciler{}
3846

39-
func (a *headlessServiceReconciler) PreCondition(tree *kubebuilderx.ObjectTree) *kubebuilderx.CheckResult {
47+
func (r *headlessServiceReconciler) PreCondition(tree *kubebuilderx.ObjectTree) *kubebuilderx.CheckResult {
4048
if tree.GetRoot() == nil || model.IsObjectDeleting(tree.GetRoot()) {
4149
return kubebuilderx.ConditionUnsatisfied
4250
}
@@ -46,7 +54,7 @@ func (a *headlessServiceReconciler) PreCondition(tree *kubebuilderx.ObjectTree)
4654
return kubebuilderx.ConditionSatisfied
4755
}
4856

49-
func (a *headlessServiceReconciler) Reconcile(tree *kubebuilderx.ObjectTree) (kubebuilderx.Result, error) {
57+
func (r *headlessServiceReconciler) Reconcile(tree *kubebuilderx.ObjectTree) (kubebuilderx.Result, error) {
5058
its, _ := tree.GetRoot().(*workloads.InstanceSet)
5159
var headlessService *corev1.Service
5260
if !its.Spec.DisableDefaultHeadlessService {
@@ -65,21 +73,93 @@ func (a *headlessServiceReconciler) Reconcile(tree *kubebuilderx.ObjectTree) (ku
6573
return kubebuilderx.Continue, err
6674
}
6775

76+
skipToReconcileOpt := kubebuilderx.SkipToReconcile(shouldCloneInstanceAssistantObjects(its))
6877
if oldHeadlessService == nil && headlessService != nil {
69-
if err := tree.Add(headlessService); err != nil {
78+
if err := tree.AddWithOption(headlessService, skipToReconcileOpt); err != nil {
7079
return kubebuilderx.Continue, err
7180
}
7281
}
7382
if oldHeadlessService != nil && headlessService != nil {
7483
newObj := copyAndMerge(oldHeadlessService, headlessService)
75-
if err := tree.Update(newObj); err != nil {
84+
if err := tree.Update(newObj, skipToReconcileOpt); err != nil {
7685
return kubebuilderx.Continue, err
7786
}
7887
}
7988
if oldHeadlessService != nil && headlessService == nil {
80-
if err := tree.Delete(oldHeadlessService); err != nil {
89+
if err := tree.DeleteWithOption(oldHeadlessService, skipToReconcileOpt); err != nil {
8190
return kubebuilderx.Continue, err
8291
}
8392
}
93+
94+
if headlessService != nil {
95+
r.addHeadlessService(its, headlessService)
96+
} else {
97+
r.deleteHeadlessService(its, oldHeadlessService)
98+
}
99+
84100
return kubebuilderx.Continue, nil
85101
}
102+
103+
func (r *headlessServiceReconciler) addHeadlessService(its *workloads.InstanceSet, svc *corev1.Service) {
104+
if shouldCloneInstanceAssistantObjects(its) && svc != nil {
105+
if its.Spec.InstanceAssistantObjects == nil {
106+
its.Spec.InstanceAssistantObjects = make([]corev1.ObjectReference, 0)
107+
}
108+
gvk, _ := model.GetGVKName(svc)
109+
its.Spec.InstanceAssistantObjects = append(its.Spec.InstanceAssistantObjects,
110+
corev1.ObjectReference{
111+
Kind: gvk.Kind,
112+
Namespace: gvk.Namespace,
113+
Name: gvk.Name,
114+
})
115+
}
116+
}
117+
118+
func (r *headlessServiceReconciler) deleteHeadlessService(its *workloads.InstanceSet, obj client.Object) {
119+
var svc *corev1.Service
120+
if obj != nil {
121+
svc = obj.(*corev1.Service)
122+
}
123+
if svc != nil {
124+
gvk, _ := model.GetGVKName(svc)
125+
its.Spec.InstanceAssistantObjects = slices.DeleteFunc(its.Spec.InstanceAssistantObjects,
126+
func(o corev1.ObjectReference) bool {
127+
return o.Kind == gvk.Kind && o.Namespace == gvk.Namespace && o.Name == gvk.Name
128+
})
129+
}
130+
}
131+
132+
func getHeadlessSvcSelector(its *workloads.InstanceSet) map[string]string {
133+
selectors := make(map[string]string)
134+
for k, v := range its.Spec.Selector.MatchLabels {
135+
selectors[k] = v
136+
}
137+
selectors[constant.KBAppReleasePhaseKey] = constant.ReleasePhaseStable
138+
return selectors
139+
}
140+
141+
func buildHeadlessSvc(its workloads.InstanceSet, labels, selectors map[string]string) *corev1.Service {
142+
hdlBuilder := builder.NewHeadlessServiceBuilder(its.Namespace, getHeadlessSvcName(its.Name)).
143+
AddLabelsInMap(labels).
144+
AddSelectorsInMap(selectors).
145+
SetPublishNotReadyAddresses(true)
146+
147+
portNames := sets.New[string]()
148+
for _, container := range its.Spec.Template.Spec.Containers {
149+
for _, port := range container.Ports {
150+
servicePort := corev1.ServicePort{
151+
Protocol: port.Protocol,
152+
Port: port.ContainerPort,
153+
}
154+
switch {
155+
case len(port.Name) > 0 && !portNames.Has(port.Name):
156+
portNames.Insert(port.Name)
157+
servicePort.Name = port.Name
158+
default:
159+
servicePort.Name = fmt.Sprintf("%s-%d", strings.ToLower(string(port.Protocol)), port.ContainerPort)
160+
}
161+
hdlBuilder.AddPorts(servicePort)
162+
}
163+
}
164+
return hdlBuilder.GetObject()
165+
}

0 commit comments

Comments
 (0)