diff --git a/apis/apps/v1/componentdefinition_types.go b/apis/apps/v1/componentdefinition_types.go index 3dae6e08817..3164dac4336 100644 --- a/apis/apps/v1/componentdefinition_types.go +++ b/apis/apps/v1/componentdefinition_types.go @@ -1057,6 +1057,12 @@ type ComponentFileTemplate struct { // +optional Namespace string `json:"namespace,omitempty"` + // Indicates whether the template requires rendering at the pod level to incorporate pod-specific variables. + // + // +kubebuilder:default=false + // +optional + RequiresPodRender bool `json:"requiresPodRender,omitempty"` + // Refers to the volume name of PodTemplate. The file produced through the template will be mounted to // the corresponding volume. Must be a DNS_LABEL name. // The volume name must be defined in podSpec.containers[*].volumeMounts. diff --git a/config/crd/bases/apps.kubeblocks.io_componentdefinitions.yaml b/config/crd/bases/apps.kubeblocks.io_componentdefinitions.yaml index ef1cbc51378..09b59bd6745 100644 --- a/config/crd/bases/apps.kubeblocks.io_componentdefinitions.yaml +++ b/config/crd/bases/apps.kubeblocks.io_componentdefinitions.yaml @@ -4329,6 +4329,11 @@ spec: maxLength: 63 pattern: ^[a-z0-9]([a-z0-9\-]*[a-z0-9])?$ type: string + requiresPodRender: + default: false + description: Indicates whether the template requires rendering + at the pod level to incorporate pod-specific variables. + type: boolean template: description: Specifies the name of the referenced template ConfigMap object. @@ -16057,6 +16062,11 @@ spec: maxLength: 63 pattern: ^[a-z0-9]([a-z0-9\-]*[a-z0-9])?$ type: string + requiresPodRender: + default: false + description: Indicates whether the template requires rendering + at the pod level to incorporate pod-specific variables. + type: boolean template: description: Specifies the name of the referenced template ConfigMap object. diff --git a/controllers/apps/component/transformer_component_reconfigure.go b/controllers/apps/component/transformer_component_reconfigure.go index dfb80b75faf..5d655415f96 100644 --- a/controllers/apps/component/transformer_component_reconfigure.go +++ b/controllers/apps/component/transformer_component_reconfigure.go @@ -229,6 +229,7 @@ func (t *componentReconfigureTransformer) reconfigureReplicaTemplate(transCtx *c return nil // disabled by the external } } + // TODO: variables, dynamic render if tpl.Reconfigure != nil { actionName := component.UDFReconfigureActionName(tpl) args := lifecycle.FileTemplateChanges(changes.Created, changes.Removed, changes.Updated) diff --git a/controllers/apps/component/transformer_component_template.go b/controllers/apps/component/transformer_component_template.go index 8d121f63fcc..63c443fda80 100644 --- a/controllers/apps/component/transformer_component_template.go +++ b/controllers/apps/component/transformer_component_template.go @@ -51,6 +51,7 @@ var _ graph.Transformer = &componentFileTemplateTransformer{} func (t *componentFileTemplateTransformer) Transform(ctx graph.TransformContext, dag *graph.DAG) error { transCtx, _ := ctx.(*componentTransformContext) + synthesizedComp := transCtx.SynthesizeComponent if model.IsObjectDeleting(transCtx.ComponentOrig) { return nil } @@ -73,7 +74,14 @@ func (t *componentFileTemplateTransformer) Transform(ctx graph.TransformContext, t.handleTemplateObjectChanges(transCtx, dag, runningObjs, protoObjs, toCreate, toDelete, toUpdate) - return t.buildPodVolumes(transCtx) + if err = t.buildPodVolumes(transCtx); err != nil { + return err + } + + synthesizedComp.KBAgentTasks = append(synthesizedComp.KBAgentTasks, + *component.NewRenderTask(synthesizedComp.FullCompName, synthesizedComp.Generation, nil, synthesizedComp, nil)) + + return nil } func (t *componentFileTemplateTransformer) precheck(transCtx *componentTransformContext) error { @@ -234,7 +242,10 @@ func renderFileTemplateData(transCtx *componentTransformContext, variables[k] = v // override } - tpl := template.New(fileTemplate.Name).Option("missingkey=error").Funcs(sprig.TxtFuncMap()) + tpl := template.New(fileTemplate.Name).Funcs(sprig.TxtFuncMap()) + if !fileTemplate.RequiresPodRender { + tpl = tpl.Option("missingkey=error") + } for key, val := range data { ptpl, err := tpl.Parse(val) if err != nil { diff --git a/controllers/apps/component/transformer_component_workload.go b/controllers/apps/component/transformer_component_workload.go index 5f3ecb8f466..98e620ce73e 100644 --- a/controllers/apps/component/transformer_component_workload.go +++ b/controllers/apps/component/transformer_component_workload.go @@ -50,6 +50,7 @@ import ( "github.com/apecloud/kubeblocks/pkg/controller/lifecycle" "github.com/apecloud/kubeblocks/pkg/controller/model" intctrlutil "github.com/apecloud/kubeblocks/pkg/controllerutil" + "github.com/apecloud/kubeblocks/pkg/kbagent" ) const ( @@ -236,6 +237,10 @@ func (t *componentWorkloadTransformer) handleUpdate(reqCtx intctrlutil.RequestCt if err := t.handleWorkloadUpdate(reqCtx, dag, synthesizedComp, comp, runningITS, protoITS); err != nil { return err } + + if err := updateEnvCM4KBAgentTask(reqCtx.Ctx, t.Client, dag, synthesizedComp, comp); err != nil { + return err + } } objCopy := copyAndMergeITS(runningITS, protoITS) @@ -250,9 +255,6 @@ func (t *componentWorkloadTransformer) handleUpdate(reqCtx intctrlutil.RequestCt }) } - // if start { - // return intctrlutil.NewDelayedRequeueError(time.Second, "workload is starting") - // } return nil } @@ -411,6 +413,26 @@ func buildPodSpecVolumeMounts(synthesizeComp *component.SynthesizedComponent) { synthesizeComp.PodSpec = podSpec } +func updateEnvCM4KBAgentTask(ctx context.Context, cli client.Reader, dag *graph.DAG, + synthesizedComp *component.SynthesizedComponent, comp *appsv1.Component) error { + if len(synthesizedComp.KBAgentTasks) == 0 { + return nil + } + envVar, err := kbagent.BuildEnv4Worker(synthesizedComp.KBAgentTasks) + if err != nil { + return err + } + // apply the updated env to the env CM + transCtx := &componentTransformContext{ + Context: ctx, + Client: model.NewGraphClient(cli), + SynthesizeComponent: synthesizedComp, + Component: comp, + } + parameters := map[string]string{envVar.Name: envVar.Value} + return createOrUpdateEnvConfigMap(transCtx, dag, nil, parameters) +} + // copyAndMergeITS merges two ITS objects for updating: // 1. new an object targetObj by copying from oldObj // 2. merge all fields can be updated from newObj into targetObj @@ -529,32 +551,6 @@ func checkNRollbackProtoImages(itsObj, itsProto *workloads.InstanceSet) { } } -// expandVolume handles workload expand volume -func (r *componentWorkloadOps) expandVolume() error { - return r.expandVolumeClaimTemplates(r.runningITS.Spec.VolumeClaimTemplates, r.synthesizeComp.VolumeClaimTemplates) -} - -func (r *componentWorkloadOps) expandVolumeClaimTemplates(runningVCTs []corev1.PersistentVolumeClaim, protoVCTs []corev1.PersistentVolumeClaimTemplate) error { - for _, vct := range runningVCTs { - var proto *corev1.PersistentVolumeClaimTemplate - for i, v := range protoVCTs { - if v.Name == vct.Name { - proto = &protoVCTs[i] - break - } - } - // REVIEW: seems we can remove a volume claim from templates at runtime, without any changes and warning messages? - if proto == nil { - continue - } - - if err := r.expandVolumes(vct.Name, proto); err != nil { - return err - } - } - return nil -} - func (r *componentWorkloadOps) horizontalScale() error { var ( in = r.runningItsPodNameSet.Difference(r.desiredCompPodNameSet) @@ -731,20 +727,11 @@ func (r *componentWorkloadOps) scaleOut() error { } replicas := append(slices.Clone(newReplicas), provisioningReplicas...) - parameters, err := component.NewReplicaTask(r.synthesizeComp.FullCompName, r.synthesizeComp.Generation, source, replicas) + task, err := component.NewReplicaTask(r.synthesizeComp.FullCompName, r.synthesizeComp.Generation, source, replicas) if err != nil { return err } - // apply the updated env to the env CM - transCtx := &componentTransformContext{ - Context: r.reqCtx.Ctx, - Client: model.NewGraphClient(r.cli), - SynthesizeComponent: r.synthesizeComp, - Component: r.component, - } - if err = createOrUpdateEnvConfigMap(transCtx, r.dag, nil, parameters); err != nil { - return err - } + r.synthesizeComp.KBAgentTasks = append(r.synthesizeComp.KBAgentTasks, *task) return nil }(); err != nil { return err @@ -848,6 +835,27 @@ func (r *componentWorkloadOps) joinMemberForPod(pod *corev1.Pod, pods []*corev1. return nil } +func (r *componentWorkloadOps) expandVolume() error { + for _, vct := range r.runningITS.Spec.VolumeClaimTemplates { + var proto *corev1.PersistentVolumeClaimTemplate + for i, v := range r.synthesizeComp.VolumeClaimTemplates { + if v.Name == vct.Name { + proto = &r.synthesizeComp.VolumeClaimTemplates[i] + break + } + } + // REVIEW: seems we can remove a volume claim from templates at runtime, without any changes and warning messages? + if proto == nil { + continue + } + + if err := r.expandVolumes(vct.Name, proto); err != nil { + return err + } + } + return nil +} + func (r *componentWorkloadOps) expandVolumes(vctName string, proto *corev1.PersistentVolumeClaimTemplate) error { for _, pod := range r.runningItsPodNames { pvc := &corev1.PersistentVolumeClaim{} diff --git a/deploy/helm/crds/apps.kubeblocks.io_componentdefinitions.yaml b/deploy/helm/crds/apps.kubeblocks.io_componentdefinitions.yaml index ef1cbc51378..09b59bd6745 100644 --- a/deploy/helm/crds/apps.kubeblocks.io_componentdefinitions.yaml +++ b/deploy/helm/crds/apps.kubeblocks.io_componentdefinitions.yaml @@ -4329,6 +4329,11 @@ spec: maxLength: 63 pattern: ^[a-z0-9]([a-z0-9\-]*[a-z0-9])?$ type: string + requiresPodRender: + default: false + description: Indicates whether the template requires rendering + at the pod level to incorporate pod-specific variables. + type: boolean template: description: Specifies the name of the referenced template ConfigMap object. @@ -16057,6 +16062,11 @@ spec: maxLength: 63 pattern: ^[a-z0-9]([a-z0-9\-]*[a-z0-9])?$ type: string + requiresPodRender: + default: false + description: Indicates whether the template requires rendering + at the pod level to incorporate pod-specific variables. + type: boolean template: description: Specifies the name of the referenced template ConfigMap object. diff --git a/docs/developer_docs/api-reference/cluster.md b/docs/developer_docs/api-reference/cluster.md index 3e96fc62ee3..2b4468d81f8 100644 --- a/docs/developer_docs/api-reference/cluster.md +++ b/docs/developer_docs/api-reference/cluster.md @@ -5622,6 +5622,18 @@ string +requiresPodRender
+ +bool + + + +(Optional) +

Indicates whether the template requires rendering at the pod level to incorporate pod-specific variables.

+ + + + volumeName
string diff --git a/pkg/controller/component/kbagent_task.go b/pkg/controller/component/kbagent_task.go new file mode 100644 index 00000000000..1b28f02585b --- /dev/null +++ b/pkg/controller/component/kbagent_task.go @@ -0,0 +1,244 @@ +/* +Copyright (C) 2022-2025 ApeCloud Co., Ltd + +This file is part of KubeBlocks project + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU Affero General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU Affero General Public License for more details. + +You should have received a copy of the GNU Affero General Public License +along with this program. If not, see . +*/ + +package component + +import ( + "context" + "encoding/json" + "fmt" + "slices" + "strings" + + "github.com/go-logr/logr" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/tools/record" + "k8s.io/utils/ptr" + "sigs.k8s.io/controller-runtime/pkg/client" + + workloads "github.com/apecloud/kubeblocks/apis/workloads/v1" + "github.com/apecloud/kubeblocks/pkg/constant" + intctrlutil "github.com/apecloud/kubeblocks/pkg/controllerutil" + "github.com/apecloud/kubeblocks/pkg/kbagent" + "github.com/apecloud/kubeblocks/pkg/kbagent/proto" +) + +const ( + // new replicas task & event + newReplicaTask = "newReplica" + defaultNewReplicaTaskReportPeriodSeconds = 60 + + // render task + renderTask = "render" +) + +func NewReplicaTask(compName, uid string, source *corev1.Pod, replicas []string) (*proto.Task, error) { + port, err := intctrlutil.GetPortByName(*source, kbagent.ContainerName, kbagent.DefaultStreamingPortName) + if err != nil { + return nil, err + } + return &proto.Task{ + Instance: compName, + Task: newReplicaTask, + UID: uid, + Replicas: strings.Join(replicas, ","), + NotifyAtFinish: true, + ReportPeriodSeconds: defaultNewReplicaTaskReportPeriodSeconds, + NewReplica: &proto.NewReplicaTask{ + Remote: intctrlutil.PodFQDN(source.Namespace, compName, source.Name), + Port: port, + Replicas: strings.Join(replicas, ","), + }, + }, nil +} + +func NewRenderTask(compName, uid string, replicas []string, synthesizedComp *SynthesizedComponent, files map[string][]string) *proto.Task { + task := proto.Task{ + Instance: compName, + Task: renderTask, + UID: uid, + Replicas: strings.Join(replicas, ","), + NotifyAtFinish: false, + ReportPeriodSeconds: 0, + Render: &proto.RenderTask{ + Templates: []proto.RenderTaskFileTemplate{}, + }, + } + for _, tpl := range synthesizedComp.FileTemplates { + if tpl.RequiresPodRender { + task.Render.Templates = append(task.Render.Templates, proto.RenderTaskFileTemplate{ + Name: tpl.Name, + Files: templateFiles(synthesizedComp, tpl.Name, files[tpl.Name]), + Variables: tpl.Variables, + }) + } + } + if len(task.Render.Templates) > 0 { + return &task + } + return nil +} + +type KBAgentTaskEventHandler struct{} + +func (h *KBAgentTaskEventHandler) Handle(cli client.Client, reqCtx intctrlutil.RequestCtx, recorder record.EventRecorder, event *corev1.Event) error { + if !h.isTaskEvent(event) { + return nil + } + + taskEvent := &proto.TaskEvent{} + if err := json.Unmarshal([]byte(event.Message), taskEvent); err != nil { + return err + } + + return h.handleEvent(reqCtx, cli, event.InvolvedObject.Namespace, *taskEvent) +} + +func (h *KBAgentTaskEventHandler) isTaskEvent(event *corev1.Event) bool { + return event.ReportingController == proto.ProbeEventReportingController && + event.Reason == "task" && event.InvolvedObject.FieldPath == proto.ProbeEventFieldPath +} + +func (h *KBAgentTaskEventHandler) handleEvent(reqCtx intctrlutil.RequestCtx, cli client.Client, namespace string, event proto.TaskEvent) error { + if event.Task == newReplicaTask { + return handleNewReplicaTaskEvent(reqCtx.Log, reqCtx.Ctx, cli, namespace, event) + } + return fmt.Errorf("unsupported kind of task event: %s", event.Task) +} + +func handleNewReplicaTaskEvent(logger logr.Logger, ctx context.Context, cli client.Client, namespace string, event proto.TaskEvent) error { + key := types.NamespacedName{ + Namespace: namespace, + Name: event.Instance, + } + its := &workloads.InstanceSet{} + if err := cli.Get(ctx, key, its); err != nil { + logger.Error(err, "get ITS failed when handle new replica task event", + "code", event.Code, "finished", !event.EndTime.IsZero(), "message", event.Message) + return err + } + + var err error + finished := !event.EndTime.IsZero() + switch { + case finished && event.Code == 0: + err = handleNewReplicaTaskEvent4Finished(ctx, cli, its, event) + case finished: + err = handleNewReplicaTaskEvent4Failed(ctx, cli, its, event) + default: + err = handleNewReplicaTaskEvent4Unfinished(ctx, cli, its, event) + } + if err != nil { + logger.Error(err, "handle new replica task event failed", + "code", event.Code, "finished", finished, "message", event.Message) + } else { + logger.Info("handle new replica task event success", + "code", event.Code, "finished", finished, "message", event.Message) + } + return err +} + +func handleNewReplicaTaskEvent4Finished(ctx context.Context, cli client.Client, its *workloads.InstanceSet, event proto.TaskEvent) error { + if err := func() error { + envKey := types.NamespacedName{ + Namespace: its.Namespace, + Name: constant.GetCompEnvCMName(its.Name), + } + obj := &corev1.ConfigMap{} + err := cli.Get(ctx, envKey, obj, inDataContext()) + if err != nil { + return err + } + + parameters, err := updateKBAgentTaskEnv(obj.Data, func(task proto.Task) *proto.Task { + if task.Task == newReplicaTask { + replicas := strings.Split(task.Replicas, ",") + replicas = slices.DeleteFunc(replicas, func(r string) bool { + return r == event.Replica + }) + if len(replicas) == 0 { + return nil + } + task.Replicas = strings.Join(replicas, ",") + if task.NewReplica != nil { + task.NewReplica.Replicas = task.Replicas + } + } + return &task + }) + if err != nil { + return err + } + if parameters == nil { + return nil // do nothing + } + + if obj.Data == nil { + obj.Data = make(map[string]string) + } + for k, v := range parameters { + obj.Data[k] = v + } + return cli.Update(ctx, obj, inDataContext()) + }(); err != nil { + return err + } + return updateReplicaStatusFunc(ctx, cli, its, event.Replica, func(status *ReplicaStatus) error { + status.Message = "" + status.Provisioned = true + status.DataLoaded = ptr.To(true) + return nil + }) +} + +func handleNewReplicaTaskEvent4Unfinished(ctx context.Context, cli client.Client, its *workloads.InstanceSet, event proto.TaskEvent) error { + return updateReplicaStatusFunc(ctx, cli, its, event.Replica, func(status *ReplicaStatus) error { + status.Message = event.Message + status.Provisioned = true + status.DataLoaded = ptr.To(false) + return nil + }) +} + +func handleNewReplicaTaskEvent4Failed(ctx context.Context, cli client.Client, its *workloads.InstanceSet, event proto.TaskEvent) error { + return updateReplicaStatusFunc(ctx, cli, its, event.Replica, func(status *ReplicaStatus) error { + status.Message = event.Message + status.Provisioned = true + return nil + }) +} + +func updateReplicaStatusFunc(ctx context.Context, cli client.Client, + its *workloads.InstanceSet, replicaName string, f func(*ReplicaStatus) error) error { + if err := UpdateReplicasStatusFunc(its, func(status *ReplicasStatus) error { + for i := range status.Status { + if status.Status[i].Name == replicaName { + if f != nil { + return f(&status.Status[i]) + } + return nil + } + } + return fmt.Errorf("replica %s not found", replicaName) + }); err != nil { + return err + } + return cli.Update(ctx, its) +} diff --git a/pkg/controller/component/kbagent_task_event.go b/pkg/controller/component/kbagent_task_event.go deleted file mode 100644 index d48e576c3d0..00000000000 --- a/pkg/controller/component/kbagent_task_event.go +++ /dev/null @@ -1,59 +0,0 @@ -/* -Copyright (C) 2022-2025 ApeCloud Co., Ltd - -This file is part of KubeBlocks project - -This program is free software: you can redistribute it and/or modify -it under the terms of the GNU Affero General Public License as published by -the Free Software Foundation, either version 3 of the License, or -(at your option) any later version. - -This program is distributed in the hope that it will be useful -but WITHOUT ANY WARRANTY; without even the implied warranty of -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -GNU Affero General Public License for more details. - -You should have received a copy of the GNU Affero General Public License -along with this program. If not, see . -*/ - -package component - -import ( - "encoding/json" - "fmt" - - corev1 "k8s.io/api/core/v1" - "k8s.io/client-go/tools/record" - "sigs.k8s.io/controller-runtime/pkg/client" - - intctrlutil "github.com/apecloud/kubeblocks/pkg/controllerutil" - "github.com/apecloud/kubeblocks/pkg/kbagent/proto" -) - -type KBAgentTaskEventHandler struct{} - -func (h *KBAgentTaskEventHandler) Handle(cli client.Client, reqCtx intctrlutil.RequestCtx, recorder record.EventRecorder, event *corev1.Event) error { - if !h.isTaskEvent(event) { - return nil - } - - taskEvent := &proto.TaskEvent{} - if err := json.Unmarshal([]byte(event.Message), taskEvent); err != nil { - return err - } - - return h.handleEvent(reqCtx, cli, event.InvolvedObject.Namespace, *taskEvent) -} - -func (h *KBAgentTaskEventHandler) isTaskEvent(event *corev1.Event) bool { - return event.ReportingController == proto.ProbeEventReportingController && - event.Reason == "task" && event.InvolvedObject.FieldPath == proto.ProbeEventFieldPath -} - -func (h *KBAgentTaskEventHandler) handleEvent(reqCtx intctrlutil.RequestCtx, cli client.Client, namespace string, event proto.TaskEvent) error { - if event.Task == newReplicaTask { - return handleNewReplicaTaskEvent(reqCtx.Log, reqCtx.Ctx, cli, namespace, event) - } - return fmt.Errorf("unsupported kind of task event: %s", event.Task) -} diff --git a/pkg/controller/component/replicas.go b/pkg/controller/component/replicas.go index f8b370366de..57d8bd8dd2d 100644 --- a/pkg/controller/component/replicas.go +++ b/pkg/controller/component/replicas.go @@ -20,32 +20,18 @@ along with this program. If not, see . package component import ( - "context" "encoding/json" - "fmt" "slices" - "strings" "time" - "github.com/go-logr/logr" - corev1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/types" "k8s.io/utils/ptr" - "sigs.k8s.io/controller-runtime/pkg/client" workloads "github.com/apecloud/kubeblocks/apis/workloads/v1" "github.com/apecloud/kubeblocks/pkg/constant" - intctrlutil "github.com/apecloud/kubeblocks/pkg/controllerutil" - "github.com/apecloud/kubeblocks/pkg/kbagent" - "github.com/apecloud/kubeblocks/pkg/kbagent/proto" ) const ( replicaStatusAnnotationKey = "apps.kubeblocks.io/replicas-status" - - // new replicas task & event - newReplicaTask = "newReplica" - defaultNewReplicaTaskReportPeriodSeconds = 60 ) type ReplicasStatus struct { @@ -209,27 +195,6 @@ func GetReplicasStatusFunc(its *workloads.InstanceSet, f func(ReplicaStatus) boo return replicas, nil } -func NewReplicaTask(compName, uid string, source *corev1.Pod, replicas []string) (map[string]string, error) { - port, err := intctrlutil.GetPortByName(*source, kbagent.ContainerName, kbagent.DefaultStreamingPortName) - if err != nil { - return nil, err - } - task := proto.Task{ - Instance: compName, - Task: newReplicaTask, - UID: uid, - Replicas: strings.Join(replicas, ","), - NotifyAtFinish: true, - ReportPeriodSeconds: defaultNewReplicaTaskReportPeriodSeconds, - NewReplica: &proto.NewReplicaTask{ - Remote: intctrlutil.PodFQDN(source.Namespace, compName, source.Name), - Port: port, - Replicas: strings.Join(replicas, ","), - }, - } - return buildKBAgentTaskEnv(task) -} - func compGenerationFromITS(its *workloads.InstanceSet) string { if its == nil { return "" @@ -277,123 +242,3 @@ func setReplicasStatus(its *workloads.InstanceSet, status ReplicasStatus) error its.SetAnnotations(annotations) return nil } - -func handleNewReplicaTaskEvent(logger logr.Logger, ctx context.Context, cli client.Client, namespace string, event proto.TaskEvent) error { - key := types.NamespacedName{ - Namespace: namespace, - Name: event.Instance, - } - its := &workloads.InstanceSet{} - if err := cli.Get(ctx, key, its); err != nil { - logger.Error(err, "get ITS failed when handle new replica task event", - "code", event.Code, "finished", !event.EndTime.IsZero(), "message", event.Message) - return err - } - - var err error - finished := !event.EndTime.IsZero() - switch { - case finished && event.Code == 0: - err = handleNewReplicaTaskEvent4Finished(ctx, cli, its, event) - case finished: - err = handleNewReplicaTaskEvent4Failed(ctx, cli, its, event) - default: - err = handleNewReplicaTaskEvent4Unfinished(ctx, cli, its, event) - } - if err != nil { - logger.Error(err, "handle new replica task event failed", - "code", event.Code, "finished", finished, "message", event.Message) - } else { - logger.Info("handle new replica task event success", - "code", event.Code, "finished", finished, "message", event.Message) - } - return err -} - -func handleNewReplicaTaskEvent4Finished(ctx context.Context, cli client.Client, its *workloads.InstanceSet, event proto.TaskEvent) error { - if err := func() error { - envKey := types.NamespacedName{ - Namespace: its.Namespace, - Name: constant.GetCompEnvCMName(its.Name), - } - obj := &corev1.ConfigMap{} - err := cli.Get(ctx, envKey, obj, inDataContext()) - if err != nil { - return err - } - - parameters, err := updateKBAgentTaskEnv(obj.Data, func(task proto.Task) *proto.Task { - if task.Task == newReplicaTask { - replicas := strings.Split(task.Replicas, ",") - replicas = slices.DeleteFunc(replicas, func(r string) bool { - return r == event.Replica - }) - if len(replicas) == 0 { - return nil - } - task.Replicas = strings.Join(replicas, ",") - if task.NewReplica != nil { - task.NewReplica.Replicas = task.Replicas - } - } - return &task - }) - if err != nil { - return err - } - if parameters == nil { - return nil // do nothing - } - - if obj.Data == nil { - obj.Data = make(map[string]string) - } - for k, v := range parameters { - obj.Data[k] = v - } - return cli.Update(ctx, obj, inDataContext()) - }(); err != nil { - return err - } - return updateReplicaStatusFunc(ctx, cli, its, event.Replica, func(status *ReplicaStatus) error { - status.Message = "" - status.Provisioned = true - status.DataLoaded = ptr.To(true) - return nil - }) -} - -func handleNewReplicaTaskEvent4Unfinished(ctx context.Context, cli client.Client, its *workloads.InstanceSet, event proto.TaskEvent) error { - return updateReplicaStatusFunc(ctx, cli, its, event.Replica, func(status *ReplicaStatus) error { - status.Message = event.Message - status.Provisioned = true - status.DataLoaded = ptr.To(false) - return nil - }) -} - -func handleNewReplicaTaskEvent4Failed(ctx context.Context, cli client.Client, its *workloads.InstanceSet, event proto.TaskEvent) error { - return updateReplicaStatusFunc(ctx, cli, its, event.Replica, func(status *ReplicaStatus) error { - status.Message = event.Message - status.Provisioned = true - return nil - }) -} - -func updateReplicaStatusFunc(ctx context.Context, cli client.Client, - its *workloads.InstanceSet, replicaName string, f func(*ReplicaStatus) error) error { - if err := UpdateReplicasStatusFunc(its, func(status *ReplicasStatus) error { - for i := range status.Status { - if status.Status[i].Name == replicaName { - if f != nil { - return f(&status.Status[i]) - } - return nil - } - } - return fmt.Errorf("replica %s not found", replicaName) - }); err != nil { - return err - } - return cli.Update(ctx, its) -} diff --git a/pkg/controller/component/synthesize_component.go b/pkg/controller/component/synthesize_component.go index 2ce7b7baef3..b3b64567003 100644 --- a/pkg/controller/component/synthesize_component.go +++ b/pkg/controller/component/synthesize_component.go @@ -36,6 +36,7 @@ import ( "github.com/apecloud/kubeblocks/pkg/constant" intctrlutil "github.com/apecloud/kubeblocks/pkg/controllerutil" "github.com/apecloud/kubeblocks/pkg/generics" + "github.com/apecloud/kubeblocks/pkg/kbagent/proto" viper "github.com/apecloud/kubeblocks/pkg/viperx" ) @@ -118,6 +119,7 @@ func BuildSynthesizedComponent(ctx context.Context, cli client.Reader, PodUpdatePolicy: comp.Spec.PodUpdatePolicy, UpdateStrategy: compDef.Spec.UpdateStrategy, InstanceUpdateStrategy: comp.Spec.InstanceUpdateStrategy, + KBAgentTasks: make([]proto.Task, 0), } if err = mergeUserDefinedEnv(synthesizeComp, comp); err != nil { diff --git a/pkg/controller/component/template.go b/pkg/controller/component/template.go new file mode 100644 index 00000000000..ab8fac50921 --- /dev/null +++ b/pkg/controller/component/template.go @@ -0,0 +1,65 @@ +/* +Copyright (C) 2022-2025 ApeCloud Co., Ltd + +This file is part of KubeBlocks project + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU Affero General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU Affero General Public License for more details. + +You should have received a copy of the GNU Affero General Public License +along with this program. If not, see . +*/ + +package component + +import ( + "path/filepath" +) + +func templateFiles(synthesizedComp *SynthesizedComponent, tpl string, files []string) []string { + result := make([]string, 0) + for _, f := range files { + fullPath := absoluteTemplateFilePath(synthesizedComp, tpl, f) + if len(fullPath) > 0 { + result = append(result, fullPath) + } + } + return result +} + +func absoluteTemplateFilePath(synthesizedComp *SynthesizedComponent, tpl, file string) string { + var volName, mountPath string + for _, fileTpl := range synthesizedComp.FileTemplates { + if fileTpl.Name == tpl { + volName = fileTpl.VolumeName + break + } + } + if volName == "" { + return "" // has no volumes specified + } + + for _, container := range synthesizedComp.PodSpec.Containers { + for _, mount := range container.VolumeMounts { + if mount.Name == volName { + mountPath = mount.MountPath + break + } + } + if mountPath != "" { + break + } + } + if mountPath == "" { + return "" // the template is not mounted, ignore it + } + + return filepath.Join(mountPath, file) +} diff --git a/pkg/controller/component/type.go b/pkg/controller/component/type.go index 9e718dcc686..d3d30e71728 100644 --- a/pkg/controller/component/type.go +++ b/pkg/controller/component/type.go @@ -26,6 +26,7 @@ import ( "k8s.io/apimachinery/pkg/util/intstr" kbappsv1 "github.com/apecloud/kubeblocks/apis/apps/v1" + "github.com/apecloud/kubeblocks/pkg/kbagent/proto" ) type SynthesizedComponent struct { @@ -79,6 +80,7 @@ type SynthesizedComponent struct { MinReadySeconds int32 `json:"minReadySeconds,omitempty"` DisableExporter *bool `json:"disableExporter,omitempty"` Stop *bool + KBAgentTasks []proto.Task } type SynthesizedFileTemplate struct { diff --git a/pkg/kbagent/proto/proto.go b/pkg/kbagent/proto/proto.go index c0778352946..0813dc1a453 100644 --- a/pkg/kbagent/proto/proto.go +++ b/pkg/kbagent/proto/proto.go @@ -88,6 +88,7 @@ type Task struct { NotifyAtFinish bool `json:"notifyAtFinish,omitempty"` // whether to notify the controller when the task is finished ReportPeriodSeconds int32 `json:"reportPeriodSeconds,omitempty"` // the period to report the progress of the task NewReplica *NewReplicaTask `json:"newReplica,omitempty"` + Render *RenderTask `json:"render,omitempty"` } type TaskEvent struct { @@ -109,3 +110,13 @@ type NewReplicaTask struct { Parameters map[string]string `json:"parameters,omitempty"` // parameters for data dump and load TimeoutSeconds *int32 `json:"timeoutSeconds,omitempty"` } + +type RenderTask struct { + Templates []RenderTaskFileTemplate `json:"templates"` +} + +type RenderTaskFileTemplate struct { + Name string `json:"name"` + Files []string `json:"files"` + Variables map[string]string `json:"variables,omitempty"` +} diff --git a/pkg/kbagent/service/action.go b/pkg/kbagent/service/action.go index db5a5d81ebc..5520bc14e4d 100644 --- a/pkg/kbagent/service/action.go +++ b/pkg/kbagent/service/action.go @@ -120,13 +120,20 @@ func (s *actionService) handleRequest(ctx context.Context, req *proto.ActionRequ if action.Exec == nil { return nil, errors.Wrap(proto.ErrNotImplemented, "only exec action is supported") } - // HACK: pre-check for the reconfigure action - if err := checkReconfigure(ctx, req); err != nil { + if err := s.preExec(ctx, req, action); err != nil { return nil, err } return s.handleExecAction(ctx, req, action) } +func (s *actionService) preExec(ctx context.Context, req *proto.ActionRequest, action *proto.Action) error { + // HACK: pre-check for the reconfigure action + if err := checkReconfigure(ctx, req); err != nil { + return err + } + return nil +} + func (s *actionService) handleExecAction(ctx context.Context, req *proto.ActionRequest, action *proto.Action) ([]byte, error) { if req.NonBlocking == nil || !*req.NonBlocking { return runCommand(ctx, action.Exec, req.Parameters, req.TimeoutSeconds) diff --git a/pkg/kbagent/service/reconfigure.go b/pkg/kbagent/service/reconfigure.go index c7d0d22a1af..199e6594f39 100644 --- a/pkg/kbagent/service/reconfigure.go +++ b/pkg/kbagent/service/reconfigure.go @@ -37,8 +37,8 @@ const ( configFilesUpdated = "KB_CONFIG_FILES_UPDATED" ) -func checkReconfigure(_ context.Context, req *proto.ActionRequest) error { - if req.Action != "reconfigure" && !strings.HasPrefix(req.Action, "udf-reconfigure") { +func checkReconfigure(ctx context.Context, req *proto.ActionRequest) error { + if !isReconfigureRequest(req) { return nil } @@ -51,7 +51,12 @@ func checkReconfigure(_ context.Context, req *proto.ActionRequest) error { if err := checkReconfigureUpdated(req); err != nil { return err } - return nil + + return reconfigurePostRender(ctx, req) +} + +func isReconfigureRequest(req *proto.ActionRequest) bool { + return req.Action == "reconfigure" || strings.HasPrefix(req.Action, "udf-reconfigure") } func checkReconfigureCreated(req *proto.ActionRequest) error { @@ -128,3 +133,50 @@ func checkLocalFileUpToDate(file, checksum string) error { } return nil } + +func reconfigurePostRender(ctx context.Context, req *proto.ActionRequest) error { + files := reconfigurePostRenderFiles(req) + if len(files) == 0 { + return nil + } + + render := renderTask{ + task: &proto.RenderTask{ + Templates: []proto.RenderTaskFileTemplate{ + { + Name: "reconfigure", // TODO: tpl name + Files: files, + Variables: req.Parameters, + }, + }, + }, + } + ch, err := render.run(ctx) + if err != nil { + return err + } + if ch != nil { + err1, ok := <-ch + if !ok { + err1 = errors.New("runtime error: error chan closed unexpectedly") + } + return err1 + } + return nil +} + +func reconfigurePostRenderFiles(req *proto.ActionRequest) []string { + files := make([]string, 0) + created := req.Parameters[configFilesCreated] + updated := req.Parameters[configFilesUpdated] + if len(created) > 0 { + files = append(files, strings.Split(created, ",")...) + } + if len(updated) > 0 { + for _, item := range strings.Split(updated, ",") { + tokens := strings.Split(item, ":") + files = append(files, tokens[0]) + } + } + return files +} diff --git a/pkg/kbagent/service/task.go b/pkg/kbagent/service/task.go index 063e3ec5030..21b3890eb75 100644 --- a/pkg/kbagent/service/task.go +++ b/pkg/kbagent/service/task.go @@ -114,6 +114,12 @@ func (s *taskService) newTask(task proto.Task) task { task: task.NewReplica, } } + if task.Render != nil { + return &renderTask{ + logger: s.logger, + task: task.Render, + } + } return nil } diff --git a/pkg/kbagent/service/task_render.go b/pkg/kbagent/service/task_render.go new file mode 100644 index 00000000000..6e0a4135b6d --- /dev/null +++ b/pkg/kbagent/service/task_render.go @@ -0,0 +1,150 @@ +/* +Copyright (C) 2022-2025 ApeCloud Co., Ltd + +This file is part of KubeBlocks project + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU Affero General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU Affero General Public License for more details. + +You should have received a copy of the GNU Affero General Public License +along with this program. If not, see . +*/ + +package service + +import ( + "context" + "fmt" + "os" + "path/filepath" + "strings" + "text/template" + "time" + + "github.com/Masterminds/sprig/v3" + "github.com/go-logr/logr" + + "github.com/apecloud/kubeblocks/pkg/kbagent/proto" + "github.com/apecloud/kubeblocks/pkg/kbagent/util" +) + +type renderTask struct { + logger logr.Logger + task *proto.RenderTask +} + +var _ task = &renderTask{} + +func (s *renderTask) run(ctx context.Context) (chan error, error) { + for _, tpl := range s.task.Templates { + if err := s.renderTemplate(tpl); err != nil { + return nil, err + } + } + return nil, nil +} + +func (s *renderTask) status(ctx context.Context, event *proto.TaskEvent) { +} + +func (s *renderTask) renderTemplate(tpl proto.RenderTaskFileTemplate) error { + variables := util.EnvL2M(os.Environ()) + for k, v := range tpl.Variables { + variables[k] = v // override + } + + for _, f := range tpl.Files { + if err := s.renderFile(tpl.Name, f, variables); err != nil { + return err + } + } + return nil +} + +func (s *renderTask) renderFile(name, path string, variables map[string]string) error { + data, err := s.readFile(path) + if err != nil { + return err + } + if len(data) == 0 { + return nil + } + + rendered, err := s.renderFileData(name, data, variables) + if err != nil { + return err + } + + return s.writeFile(path, rendered) +} + +func (s *renderTask) readFile(path string) (string, error) { + var ( + f *os.File + info os.FileInfo + err error + ) + + info, err = os.Stat(path) + if err != nil { + if os.IsNotExist(err) { + return "", nil + } + return "", err + } + if info.Size() == 0 { + return "", nil + } + + f, err = os.Open(path) + if err != nil { + return "", err + } + + buf := make([]byte, info.Size()+1) + _, err = f.Read(buf) + if err != nil { + return "", err + } + + if err = f.Close(); err != nil { + return "", err + } + + return string(buf), nil +} + +func (s *renderTask) writeFile(path string, data string) error { + tmpPath := filepath.Join("tmp", fmt.Sprintf("%d.tmp", time.Now().UnixMicro())) + f, err := os.Create(tmpPath) + if err != nil { + return err + } + _, err = f.Write([]byte(data)) + if err != nil { + return err + } + if err = f.Close(); err != nil { + return err + } + return os.Rename(tmpPath, path) +} + +func (s *renderTask) renderFileData(name, data string, variables map[string]string) (string, error) { + tpl, err := template.New(name).Option("missingkey=error").Funcs(sprig.TxtFuncMap()).Parse(data) + if err != nil { + return "", err + } + var buf strings.Builder + if err = tpl.Execute(&buf, variables); err != nil { + return "", err + } + return buf.String(), nil +} diff --git a/pkg/kbagent/service/task_new_replica.go b/pkg/kbagent/service/task_replica.go similarity index 100% rename from pkg/kbagent/service/task_new_replica.go rename to pkg/kbagent/service/task_replica.go