diff --git a/apis/apps/v1/componentdefinition_types.go b/apis/apps/v1/componentdefinition_types.go
index 3dae6e08817..3164dac4336 100644
--- a/apis/apps/v1/componentdefinition_types.go
+++ b/apis/apps/v1/componentdefinition_types.go
@@ -1057,6 +1057,12 @@ type ComponentFileTemplate struct {
// +optional
Namespace string `json:"namespace,omitempty"`
+ // Indicates whether the template requires rendering at the pod level to incorporate pod-specific variables.
+ //
+ // +kubebuilder:default=false
+ // +optional
+ RequiresPodRender bool `json:"requiresPodRender,omitempty"`
+
// Refers to the volume name of PodTemplate. The file produced through the template will be mounted to
// the corresponding volume. Must be a DNS_LABEL name.
// The volume name must be defined in podSpec.containers[*].volumeMounts.
diff --git a/config/crd/bases/apps.kubeblocks.io_componentdefinitions.yaml b/config/crd/bases/apps.kubeblocks.io_componentdefinitions.yaml
index ef1cbc51378..09b59bd6745 100644
--- a/config/crd/bases/apps.kubeblocks.io_componentdefinitions.yaml
+++ b/config/crd/bases/apps.kubeblocks.io_componentdefinitions.yaml
@@ -4329,6 +4329,11 @@ spec:
maxLength: 63
pattern: ^[a-z0-9]([a-z0-9\-]*[a-z0-9])?$
type: string
+ requiresPodRender:
+ default: false
+ description: Indicates whether the template requires rendering
+ at the pod level to incorporate pod-specific variables.
+ type: boolean
template:
description: Specifies the name of the referenced template ConfigMap
object.
@@ -16057,6 +16062,11 @@ spec:
maxLength: 63
pattern: ^[a-z0-9]([a-z0-9\-]*[a-z0-9])?$
type: string
+ requiresPodRender:
+ default: false
+ description: Indicates whether the template requires rendering
+ at the pod level to incorporate pod-specific variables.
+ type: boolean
template:
description: Specifies the name of the referenced template ConfigMap
object.
diff --git a/controllers/apps/component/transformer_component_reconfigure.go b/controllers/apps/component/transformer_component_reconfigure.go
index dfb80b75faf..5d655415f96 100644
--- a/controllers/apps/component/transformer_component_reconfigure.go
+++ b/controllers/apps/component/transformer_component_reconfigure.go
@@ -229,6 +229,7 @@ func (t *componentReconfigureTransformer) reconfigureReplicaTemplate(transCtx *c
return nil // disabled by the external
}
}
+ // TODO: variables, dynamic render
if tpl.Reconfigure != nil {
actionName := component.UDFReconfigureActionName(tpl)
args := lifecycle.FileTemplateChanges(changes.Created, changes.Removed, changes.Updated)
diff --git a/controllers/apps/component/transformer_component_template.go b/controllers/apps/component/transformer_component_template.go
index 8d121f63fcc..63c443fda80 100644
--- a/controllers/apps/component/transformer_component_template.go
+++ b/controllers/apps/component/transformer_component_template.go
@@ -51,6 +51,7 @@ var _ graph.Transformer = &componentFileTemplateTransformer{}
func (t *componentFileTemplateTransformer) Transform(ctx graph.TransformContext, dag *graph.DAG) error {
transCtx, _ := ctx.(*componentTransformContext)
+ synthesizedComp := transCtx.SynthesizeComponent
if model.IsObjectDeleting(transCtx.ComponentOrig) {
return nil
}
@@ -73,7 +74,14 @@ func (t *componentFileTemplateTransformer) Transform(ctx graph.TransformContext,
t.handleTemplateObjectChanges(transCtx, dag, runningObjs, protoObjs, toCreate, toDelete, toUpdate)
- return t.buildPodVolumes(transCtx)
+ if err = t.buildPodVolumes(transCtx); err != nil {
+ return err
+ }
+
+ synthesizedComp.KBAgentTasks = append(synthesizedComp.KBAgentTasks,
+ *component.NewRenderTask(synthesizedComp.FullCompName, synthesizedComp.Generation, nil, synthesizedComp, nil))
+
+ return nil
}
func (t *componentFileTemplateTransformer) precheck(transCtx *componentTransformContext) error {
@@ -234,7 +242,10 @@ func renderFileTemplateData(transCtx *componentTransformContext,
variables[k] = v // override
}
- tpl := template.New(fileTemplate.Name).Option("missingkey=error").Funcs(sprig.TxtFuncMap())
+ tpl := template.New(fileTemplate.Name).Funcs(sprig.TxtFuncMap())
+ if !fileTemplate.RequiresPodRender {
+ tpl = tpl.Option("missingkey=error")
+ }
for key, val := range data {
ptpl, err := tpl.Parse(val)
if err != nil {
diff --git a/controllers/apps/component/transformer_component_workload.go b/controllers/apps/component/transformer_component_workload.go
index 5f3ecb8f466..98e620ce73e 100644
--- a/controllers/apps/component/transformer_component_workload.go
+++ b/controllers/apps/component/transformer_component_workload.go
@@ -50,6 +50,7 @@ import (
"github.com/apecloud/kubeblocks/pkg/controller/lifecycle"
"github.com/apecloud/kubeblocks/pkg/controller/model"
intctrlutil "github.com/apecloud/kubeblocks/pkg/controllerutil"
+ "github.com/apecloud/kubeblocks/pkg/kbagent"
)
const (
@@ -236,6 +237,10 @@ func (t *componentWorkloadTransformer) handleUpdate(reqCtx intctrlutil.RequestCt
if err := t.handleWorkloadUpdate(reqCtx, dag, synthesizedComp, comp, runningITS, protoITS); err != nil {
return err
}
+
+ if err := updateEnvCM4KBAgentTask(reqCtx.Ctx, t.Client, dag, synthesizedComp, comp); err != nil {
+ return err
+ }
}
objCopy := copyAndMergeITS(runningITS, protoITS)
@@ -250,9 +255,6 @@ func (t *componentWorkloadTransformer) handleUpdate(reqCtx intctrlutil.RequestCt
})
}
- // if start {
- // return intctrlutil.NewDelayedRequeueError(time.Second, "workload is starting")
- // }
return nil
}
@@ -411,6 +413,26 @@ func buildPodSpecVolumeMounts(synthesizeComp *component.SynthesizedComponent) {
synthesizeComp.PodSpec = podSpec
}
+func updateEnvCM4KBAgentTask(ctx context.Context, cli client.Reader, dag *graph.DAG,
+ synthesizedComp *component.SynthesizedComponent, comp *appsv1.Component) error {
+ if len(synthesizedComp.KBAgentTasks) == 0 {
+ return nil
+ }
+ envVar, err := kbagent.BuildEnv4Worker(synthesizedComp.KBAgentTasks)
+ if err != nil {
+ return err
+ }
+ // apply the updated env to the env CM
+ transCtx := &componentTransformContext{
+ Context: ctx,
+ Client: model.NewGraphClient(cli),
+ SynthesizeComponent: synthesizedComp,
+ Component: comp,
+ }
+ parameters := map[string]string{envVar.Name: envVar.Value}
+ return createOrUpdateEnvConfigMap(transCtx, dag, nil, parameters)
+}
+
// copyAndMergeITS merges two ITS objects for updating:
// 1. new an object targetObj by copying from oldObj
// 2. merge all fields can be updated from newObj into targetObj
@@ -529,32 +551,6 @@ func checkNRollbackProtoImages(itsObj, itsProto *workloads.InstanceSet) {
}
}
-// expandVolume handles workload expand volume
-func (r *componentWorkloadOps) expandVolume() error {
- return r.expandVolumeClaimTemplates(r.runningITS.Spec.VolumeClaimTemplates, r.synthesizeComp.VolumeClaimTemplates)
-}
-
-func (r *componentWorkloadOps) expandVolumeClaimTemplates(runningVCTs []corev1.PersistentVolumeClaim, protoVCTs []corev1.PersistentVolumeClaimTemplate) error {
- for _, vct := range runningVCTs {
- var proto *corev1.PersistentVolumeClaimTemplate
- for i, v := range protoVCTs {
- if v.Name == vct.Name {
- proto = &protoVCTs[i]
- break
- }
- }
- // REVIEW: seems we can remove a volume claim from templates at runtime, without any changes and warning messages?
- if proto == nil {
- continue
- }
-
- if err := r.expandVolumes(vct.Name, proto); err != nil {
- return err
- }
- }
- return nil
-}
-
func (r *componentWorkloadOps) horizontalScale() error {
var (
in = r.runningItsPodNameSet.Difference(r.desiredCompPodNameSet)
@@ -731,20 +727,11 @@ func (r *componentWorkloadOps) scaleOut() error {
}
replicas := append(slices.Clone(newReplicas), provisioningReplicas...)
- parameters, err := component.NewReplicaTask(r.synthesizeComp.FullCompName, r.synthesizeComp.Generation, source, replicas)
+ task, err := component.NewReplicaTask(r.synthesizeComp.FullCompName, r.synthesizeComp.Generation, source, replicas)
if err != nil {
return err
}
- // apply the updated env to the env CM
- transCtx := &componentTransformContext{
- Context: r.reqCtx.Ctx,
- Client: model.NewGraphClient(r.cli),
- SynthesizeComponent: r.synthesizeComp,
- Component: r.component,
- }
- if err = createOrUpdateEnvConfigMap(transCtx, r.dag, nil, parameters); err != nil {
- return err
- }
+ r.synthesizeComp.KBAgentTasks = append(r.synthesizeComp.KBAgentTasks, *task)
return nil
}(); err != nil {
return err
@@ -848,6 +835,27 @@ func (r *componentWorkloadOps) joinMemberForPod(pod *corev1.Pod, pods []*corev1.
return nil
}
+func (r *componentWorkloadOps) expandVolume() error {
+ for _, vct := range r.runningITS.Spec.VolumeClaimTemplates {
+ var proto *corev1.PersistentVolumeClaimTemplate
+ for i, v := range r.synthesizeComp.VolumeClaimTemplates {
+ if v.Name == vct.Name {
+ proto = &r.synthesizeComp.VolumeClaimTemplates[i]
+ break
+ }
+ }
+ // REVIEW: seems we can remove a volume claim from templates at runtime, without any changes and warning messages?
+ if proto == nil {
+ continue
+ }
+
+ if err := r.expandVolumes(vct.Name, proto); err != nil {
+ return err
+ }
+ }
+ return nil
+}
+
func (r *componentWorkloadOps) expandVolumes(vctName string, proto *corev1.PersistentVolumeClaimTemplate) error {
for _, pod := range r.runningItsPodNames {
pvc := &corev1.PersistentVolumeClaim{}
diff --git a/deploy/helm/crds/apps.kubeblocks.io_componentdefinitions.yaml b/deploy/helm/crds/apps.kubeblocks.io_componentdefinitions.yaml
index ef1cbc51378..09b59bd6745 100644
--- a/deploy/helm/crds/apps.kubeblocks.io_componentdefinitions.yaml
+++ b/deploy/helm/crds/apps.kubeblocks.io_componentdefinitions.yaml
@@ -4329,6 +4329,11 @@ spec:
maxLength: 63
pattern: ^[a-z0-9]([a-z0-9\-]*[a-z0-9])?$
type: string
+ requiresPodRender:
+ default: false
+ description: Indicates whether the template requires rendering
+ at the pod level to incorporate pod-specific variables.
+ type: boolean
template:
description: Specifies the name of the referenced template ConfigMap
object.
@@ -16057,6 +16062,11 @@ spec:
maxLength: 63
pattern: ^[a-z0-9]([a-z0-9\-]*[a-z0-9])?$
type: string
+ requiresPodRender:
+ default: false
+ description: Indicates whether the template requires rendering
+ at the pod level to incorporate pod-specific variables.
+ type: boolean
template:
description: Specifies the name of the referenced template ConfigMap
object.
diff --git a/docs/developer_docs/api-reference/cluster.md b/docs/developer_docs/api-reference/cluster.md
index 3e96fc62ee3..2b4468d81f8 100644
--- a/docs/developer_docs/api-reference/cluster.md
+++ b/docs/developer_docs/api-reference/cluster.md
@@ -5622,6 +5622,18 @@ string
+
volumeName
string
diff --git a/pkg/controller/component/kbagent_task.go b/pkg/controller/component/kbagent_task.go
new file mode 100644
index 00000000000..1b28f02585b
--- /dev/null
+++ b/pkg/controller/component/kbagent_task.go
@@ -0,0 +1,244 @@
+/*
+Copyright (C) 2022-2025 ApeCloud Co., Ltd
+
+This file is part of KubeBlocks project
+
+This program is free software: you can redistribute it and/or modify
+it under the terms of the GNU Affero General Public License as published by
+the Free Software Foundation, either version 3 of the License, or
+(at your option) any later version.
+
+This program is distributed in the hope that it will be useful
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU Affero General Public License for more details.
+
+You should have received a copy of the GNU Affero General Public License
+along with this program. If not, see .
+*/
+
+package component
+
+import (
+ "context"
+ "encoding/json"
+ "fmt"
+ "slices"
+ "strings"
+
+ "github.com/go-logr/logr"
+ corev1 "k8s.io/api/core/v1"
+ "k8s.io/apimachinery/pkg/types"
+ "k8s.io/client-go/tools/record"
+ "k8s.io/utils/ptr"
+ "sigs.k8s.io/controller-runtime/pkg/client"
+
+ workloads "github.com/apecloud/kubeblocks/apis/workloads/v1"
+ "github.com/apecloud/kubeblocks/pkg/constant"
+ intctrlutil "github.com/apecloud/kubeblocks/pkg/controllerutil"
+ "github.com/apecloud/kubeblocks/pkg/kbagent"
+ "github.com/apecloud/kubeblocks/pkg/kbagent/proto"
+)
+
+const (
+ // new replicas task & event
+ newReplicaTask = "newReplica"
+ defaultNewReplicaTaskReportPeriodSeconds = 60
+
+ // render task
+ renderTask = "render"
+)
+
+func NewReplicaTask(compName, uid string, source *corev1.Pod, replicas []string) (*proto.Task, error) {
+ port, err := intctrlutil.GetPortByName(*source, kbagent.ContainerName, kbagent.DefaultStreamingPortName)
+ if err != nil {
+ return nil, err
+ }
+ return &proto.Task{
+ Instance: compName,
+ Task: newReplicaTask,
+ UID: uid,
+ Replicas: strings.Join(replicas, ","),
+ NotifyAtFinish: true,
+ ReportPeriodSeconds: defaultNewReplicaTaskReportPeriodSeconds,
+ NewReplica: &proto.NewReplicaTask{
+ Remote: intctrlutil.PodFQDN(source.Namespace, compName, source.Name),
+ Port: port,
+ Replicas: strings.Join(replicas, ","),
+ },
+ }, nil
+}
+
+func NewRenderTask(compName, uid string, replicas []string, synthesizedComp *SynthesizedComponent, files map[string][]string) *proto.Task {
+ task := proto.Task{
+ Instance: compName,
+ Task: renderTask,
+ UID: uid,
+ Replicas: strings.Join(replicas, ","),
+ NotifyAtFinish: false,
+ ReportPeriodSeconds: 0,
+ Render: &proto.RenderTask{
+ Templates: []proto.RenderTaskFileTemplate{},
+ },
+ }
+ for _, tpl := range synthesizedComp.FileTemplates {
+ if tpl.RequiresPodRender {
+ task.Render.Templates = append(task.Render.Templates, proto.RenderTaskFileTemplate{
+ Name: tpl.Name,
+ Files: templateFiles(synthesizedComp, tpl.Name, files[tpl.Name]),
+ Variables: tpl.Variables,
+ })
+ }
+ }
+ if len(task.Render.Templates) > 0 {
+ return &task
+ }
+ return nil
+}
+
+type KBAgentTaskEventHandler struct{}
+
+func (h *KBAgentTaskEventHandler) Handle(cli client.Client, reqCtx intctrlutil.RequestCtx, recorder record.EventRecorder, event *corev1.Event) error {
+ if !h.isTaskEvent(event) {
+ return nil
+ }
+
+ taskEvent := &proto.TaskEvent{}
+ if err := json.Unmarshal([]byte(event.Message), taskEvent); err != nil {
+ return err
+ }
+
+ return h.handleEvent(reqCtx, cli, event.InvolvedObject.Namespace, *taskEvent)
+}
+
+func (h *KBAgentTaskEventHandler) isTaskEvent(event *corev1.Event) bool {
+ return event.ReportingController == proto.ProbeEventReportingController &&
+ event.Reason == "task" && event.InvolvedObject.FieldPath == proto.ProbeEventFieldPath
+}
+
+func (h *KBAgentTaskEventHandler) handleEvent(reqCtx intctrlutil.RequestCtx, cli client.Client, namespace string, event proto.TaskEvent) error {
+ if event.Task == newReplicaTask {
+ return handleNewReplicaTaskEvent(reqCtx.Log, reqCtx.Ctx, cli, namespace, event)
+ }
+ return fmt.Errorf("unsupported kind of task event: %s", event.Task)
+}
+
+func handleNewReplicaTaskEvent(logger logr.Logger, ctx context.Context, cli client.Client, namespace string, event proto.TaskEvent) error {
+ key := types.NamespacedName{
+ Namespace: namespace,
+ Name: event.Instance,
+ }
+ its := &workloads.InstanceSet{}
+ if err := cli.Get(ctx, key, its); err != nil {
+ logger.Error(err, "get ITS failed when handle new replica task event",
+ "code", event.Code, "finished", !event.EndTime.IsZero(), "message", event.Message)
+ return err
+ }
+
+ var err error
+ finished := !event.EndTime.IsZero()
+ switch {
+ case finished && event.Code == 0:
+ err = handleNewReplicaTaskEvent4Finished(ctx, cli, its, event)
+ case finished:
+ err = handleNewReplicaTaskEvent4Failed(ctx, cli, its, event)
+ default:
+ err = handleNewReplicaTaskEvent4Unfinished(ctx, cli, its, event)
+ }
+ if err != nil {
+ logger.Error(err, "handle new replica task event failed",
+ "code", event.Code, "finished", finished, "message", event.Message)
+ } else {
+ logger.Info("handle new replica task event success",
+ "code", event.Code, "finished", finished, "message", event.Message)
+ }
+ return err
+}
+
+func handleNewReplicaTaskEvent4Finished(ctx context.Context, cli client.Client, its *workloads.InstanceSet, event proto.TaskEvent) error {
+ if err := func() error {
+ envKey := types.NamespacedName{
+ Namespace: its.Namespace,
+ Name: constant.GetCompEnvCMName(its.Name),
+ }
+ obj := &corev1.ConfigMap{}
+ err := cli.Get(ctx, envKey, obj, inDataContext())
+ if err != nil {
+ return err
+ }
+
+ parameters, err := updateKBAgentTaskEnv(obj.Data, func(task proto.Task) *proto.Task {
+ if task.Task == newReplicaTask {
+ replicas := strings.Split(task.Replicas, ",")
+ replicas = slices.DeleteFunc(replicas, func(r string) bool {
+ return r == event.Replica
+ })
+ if len(replicas) == 0 {
+ return nil
+ }
+ task.Replicas = strings.Join(replicas, ",")
+ if task.NewReplica != nil {
+ task.NewReplica.Replicas = task.Replicas
+ }
+ }
+ return &task
+ })
+ if err != nil {
+ return err
+ }
+ if parameters == nil {
+ return nil // do nothing
+ }
+
+ if obj.Data == nil {
+ obj.Data = make(map[string]string)
+ }
+ for k, v := range parameters {
+ obj.Data[k] = v
+ }
+ return cli.Update(ctx, obj, inDataContext())
+ }(); err != nil {
+ return err
+ }
+ return updateReplicaStatusFunc(ctx, cli, its, event.Replica, func(status *ReplicaStatus) error {
+ status.Message = ""
+ status.Provisioned = true
+ status.DataLoaded = ptr.To(true)
+ return nil
+ })
+}
+
+func handleNewReplicaTaskEvent4Unfinished(ctx context.Context, cli client.Client, its *workloads.InstanceSet, event proto.TaskEvent) error {
+ return updateReplicaStatusFunc(ctx, cli, its, event.Replica, func(status *ReplicaStatus) error {
+ status.Message = event.Message
+ status.Provisioned = true
+ status.DataLoaded = ptr.To(false)
+ return nil
+ })
+}
+
+func handleNewReplicaTaskEvent4Failed(ctx context.Context, cli client.Client, its *workloads.InstanceSet, event proto.TaskEvent) error {
+ return updateReplicaStatusFunc(ctx, cli, its, event.Replica, func(status *ReplicaStatus) error {
+ status.Message = event.Message
+ status.Provisioned = true
+ return nil
+ })
+}
+
+func updateReplicaStatusFunc(ctx context.Context, cli client.Client,
+ its *workloads.InstanceSet, replicaName string, f func(*ReplicaStatus) error) error {
+ if err := UpdateReplicasStatusFunc(its, func(status *ReplicasStatus) error {
+ for i := range status.Status {
+ if status.Status[i].Name == replicaName {
+ if f != nil {
+ return f(&status.Status[i])
+ }
+ return nil
+ }
+ }
+ return fmt.Errorf("replica %s not found", replicaName)
+ }); err != nil {
+ return err
+ }
+ return cli.Update(ctx, its)
+}
diff --git a/pkg/controller/component/kbagent_task_event.go b/pkg/controller/component/kbagent_task_event.go
deleted file mode 100644
index d48e576c3d0..00000000000
--- a/pkg/controller/component/kbagent_task_event.go
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
-Copyright (C) 2022-2025 ApeCloud Co., Ltd
-
-This file is part of KubeBlocks project
-
-This program is free software: you can redistribute it and/or modify
-it under the terms of the GNU Affero General Public License as published by
-the Free Software Foundation, either version 3 of the License, or
-(at your option) any later version.
-
-This program is distributed in the hope that it will be useful
-but WITHOUT ANY WARRANTY; without even the implied warranty of
-MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-GNU Affero General Public License for more details.
-
-You should have received a copy of the GNU Affero General Public License
-along with this program. If not, see .
-*/
-
-package component
-
-import (
- "encoding/json"
- "fmt"
-
- corev1 "k8s.io/api/core/v1"
- "k8s.io/client-go/tools/record"
- "sigs.k8s.io/controller-runtime/pkg/client"
-
- intctrlutil "github.com/apecloud/kubeblocks/pkg/controllerutil"
- "github.com/apecloud/kubeblocks/pkg/kbagent/proto"
-)
-
-type KBAgentTaskEventHandler struct{}
-
-func (h *KBAgentTaskEventHandler) Handle(cli client.Client, reqCtx intctrlutil.RequestCtx, recorder record.EventRecorder, event *corev1.Event) error {
- if !h.isTaskEvent(event) {
- return nil
- }
-
- taskEvent := &proto.TaskEvent{}
- if err := json.Unmarshal([]byte(event.Message), taskEvent); err != nil {
- return err
- }
-
- return h.handleEvent(reqCtx, cli, event.InvolvedObject.Namespace, *taskEvent)
-}
-
-func (h *KBAgentTaskEventHandler) isTaskEvent(event *corev1.Event) bool {
- return event.ReportingController == proto.ProbeEventReportingController &&
- event.Reason == "task" && event.InvolvedObject.FieldPath == proto.ProbeEventFieldPath
-}
-
-func (h *KBAgentTaskEventHandler) handleEvent(reqCtx intctrlutil.RequestCtx, cli client.Client, namespace string, event proto.TaskEvent) error {
- if event.Task == newReplicaTask {
- return handleNewReplicaTaskEvent(reqCtx.Log, reqCtx.Ctx, cli, namespace, event)
- }
- return fmt.Errorf("unsupported kind of task event: %s", event.Task)
-}
diff --git a/pkg/controller/component/replicas.go b/pkg/controller/component/replicas.go
index f8b370366de..57d8bd8dd2d 100644
--- a/pkg/controller/component/replicas.go
+++ b/pkg/controller/component/replicas.go
@@ -20,32 +20,18 @@ along with this program. If not, see .
package component
import (
- "context"
"encoding/json"
- "fmt"
"slices"
- "strings"
"time"
- "github.com/go-logr/logr"
- corev1 "k8s.io/api/core/v1"
- "k8s.io/apimachinery/pkg/types"
"k8s.io/utils/ptr"
- "sigs.k8s.io/controller-runtime/pkg/client"
workloads "github.com/apecloud/kubeblocks/apis/workloads/v1"
"github.com/apecloud/kubeblocks/pkg/constant"
- intctrlutil "github.com/apecloud/kubeblocks/pkg/controllerutil"
- "github.com/apecloud/kubeblocks/pkg/kbagent"
- "github.com/apecloud/kubeblocks/pkg/kbagent/proto"
)
const (
replicaStatusAnnotationKey = "apps.kubeblocks.io/replicas-status"
-
- // new replicas task & event
- newReplicaTask = "newReplica"
- defaultNewReplicaTaskReportPeriodSeconds = 60
)
type ReplicasStatus struct {
@@ -209,27 +195,6 @@ func GetReplicasStatusFunc(its *workloads.InstanceSet, f func(ReplicaStatus) boo
return replicas, nil
}
-func NewReplicaTask(compName, uid string, source *corev1.Pod, replicas []string) (map[string]string, error) {
- port, err := intctrlutil.GetPortByName(*source, kbagent.ContainerName, kbagent.DefaultStreamingPortName)
- if err != nil {
- return nil, err
- }
- task := proto.Task{
- Instance: compName,
- Task: newReplicaTask,
- UID: uid,
- Replicas: strings.Join(replicas, ","),
- NotifyAtFinish: true,
- ReportPeriodSeconds: defaultNewReplicaTaskReportPeriodSeconds,
- NewReplica: &proto.NewReplicaTask{
- Remote: intctrlutil.PodFQDN(source.Namespace, compName, source.Name),
- Port: port,
- Replicas: strings.Join(replicas, ","),
- },
- }
- return buildKBAgentTaskEnv(task)
-}
-
func compGenerationFromITS(its *workloads.InstanceSet) string {
if its == nil {
return ""
@@ -277,123 +242,3 @@ func setReplicasStatus(its *workloads.InstanceSet, status ReplicasStatus) error
its.SetAnnotations(annotations)
return nil
}
-
-func handleNewReplicaTaskEvent(logger logr.Logger, ctx context.Context, cli client.Client, namespace string, event proto.TaskEvent) error {
- key := types.NamespacedName{
- Namespace: namespace,
- Name: event.Instance,
- }
- its := &workloads.InstanceSet{}
- if err := cli.Get(ctx, key, its); err != nil {
- logger.Error(err, "get ITS failed when handle new replica task event",
- "code", event.Code, "finished", !event.EndTime.IsZero(), "message", event.Message)
- return err
- }
-
- var err error
- finished := !event.EndTime.IsZero()
- switch {
- case finished && event.Code == 0:
- err = handleNewReplicaTaskEvent4Finished(ctx, cli, its, event)
- case finished:
- err = handleNewReplicaTaskEvent4Failed(ctx, cli, its, event)
- default:
- err = handleNewReplicaTaskEvent4Unfinished(ctx, cli, its, event)
- }
- if err != nil {
- logger.Error(err, "handle new replica task event failed",
- "code", event.Code, "finished", finished, "message", event.Message)
- } else {
- logger.Info("handle new replica task event success",
- "code", event.Code, "finished", finished, "message", event.Message)
- }
- return err
-}
-
-func handleNewReplicaTaskEvent4Finished(ctx context.Context, cli client.Client, its *workloads.InstanceSet, event proto.TaskEvent) error {
- if err := func() error {
- envKey := types.NamespacedName{
- Namespace: its.Namespace,
- Name: constant.GetCompEnvCMName(its.Name),
- }
- obj := &corev1.ConfigMap{}
- err := cli.Get(ctx, envKey, obj, inDataContext())
- if err != nil {
- return err
- }
-
- parameters, err := updateKBAgentTaskEnv(obj.Data, func(task proto.Task) *proto.Task {
- if task.Task == newReplicaTask {
- replicas := strings.Split(task.Replicas, ",")
- replicas = slices.DeleteFunc(replicas, func(r string) bool {
- return r == event.Replica
- })
- if len(replicas) == 0 {
- return nil
- }
- task.Replicas = strings.Join(replicas, ",")
- if task.NewReplica != nil {
- task.NewReplica.Replicas = task.Replicas
- }
- }
- return &task
- })
- if err != nil {
- return err
- }
- if parameters == nil {
- return nil // do nothing
- }
-
- if obj.Data == nil {
- obj.Data = make(map[string]string)
- }
- for k, v := range parameters {
- obj.Data[k] = v
- }
- return cli.Update(ctx, obj, inDataContext())
- }(); err != nil {
- return err
- }
- return updateReplicaStatusFunc(ctx, cli, its, event.Replica, func(status *ReplicaStatus) error {
- status.Message = ""
- status.Provisioned = true
- status.DataLoaded = ptr.To(true)
- return nil
- })
-}
-
-func handleNewReplicaTaskEvent4Unfinished(ctx context.Context, cli client.Client, its *workloads.InstanceSet, event proto.TaskEvent) error {
- return updateReplicaStatusFunc(ctx, cli, its, event.Replica, func(status *ReplicaStatus) error {
- status.Message = event.Message
- status.Provisioned = true
- status.DataLoaded = ptr.To(false)
- return nil
- })
-}
-
-func handleNewReplicaTaskEvent4Failed(ctx context.Context, cli client.Client, its *workloads.InstanceSet, event proto.TaskEvent) error {
- return updateReplicaStatusFunc(ctx, cli, its, event.Replica, func(status *ReplicaStatus) error {
- status.Message = event.Message
- status.Provisioned = true
- return nil
- })
-}
-
-func updateReplicaStatusFunc(ctx context.Context, cli client.Client,
- its *workloads.InstanceSet, replicaName string, f func(*ReplicaStatus) error) error {
- if err := UpdateReplicasStatusFunc(its, func(status *ReplicasStatus) error {
- for i := range status.Status {
- if status.Status[i].Name == replicaName {
- if f != nil {
- return f(&status.Status[i])
- }
- return nil
- }
- }
- return fmt.Errorf("replica %s not found", replicaName)
- }); err != nil {
- return err
- }
- return cli.Update(ctx, its)
-}
diff --git a/pkg/controller/component/synthesize_component.go b/pkg/controller/component/synthesize_component.go
index 2ce7b7baef3..b3b64567003 100644
--- a/pkg/controller/component/synthesize_component.go
+++ b/pkg/controller/component/synthesize_component.go
@@ -36,6 +36,7 @@ import (
"github.com/apecloud/kubeblocks/pkg/constant"
intctrlutil "github.com/apecloud/kubeblocks/pkg/controllerutil"
"github.com/apecloud/kubeblocks/pkg/generics"
+ "github.com/apecloud/kubeblocks/pkg/kbagent/proto"
viper "github.com/apecloud/kubeblocks/pkg/viperx"
)
@@ -118,6 +119,7 @@ func BuildSynthesizedComponent(ctx context.Context, cli client.Reader,
PodUpdatePolicy: comp.Spec.PodUpdatePolicy,
UpdateStrategy: compDef.Spec.UpdateStrategy,
InstanceUpdateStrategy: comp.Spec.InstanceUpdateStrategy,
+ KBAgentTasks: make([]proto.Task, 0),
}
if err = mergeUserDefinedEnv(synthesizeComp, comp); err != nil {
diff --git a/pkg/controller/component/template.go b/pkg/controller/component/template.go
new file mode 100644
index 00000000000..ab8fac50921
--- /dev/null
+++ b/pkg/controller/component/template.go
@@ -0,0 +1,65 @@
+/*
+Copyright (C) 2022-2025 ApeCloud Co., Ltd
+
+This file is part of KubeBlocks project
+
+This program is free software: you can redistribute it and/or modify
+it under the terms of the GNU Affero General Public License as published by
+the Free Software Foundation, either version 3 of the License, or
+(at your option) any later version.
+
+This program is distributed in the hope that it will be useful
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU Affero General Public License for more details.
+
+You should have received a copy of the GNU Affero General Public License
+along with this program. If not, see .
+*/
+
+package component
+
+import (
+ "path/filepath"
+)
+
+func templateFiles(synthesizedComp *SynthesizedComponent, tpl string, files []string) []string {
+ result := make([]string, 0)
+ for _, f := range files {
+ fullPath := absoluteTemplateFilePath(synthesizedComp, tpl, f)
+ if len(fullPath) > 0 {
+ result = append(result, fullPath)
+ }
+ }
+ return result
+}
+
+func absoluteTemplateFilePath(synthesizedComp *SynthesizedComponent, tpl, file string) string {
+ var volName, mountPath string
+ for _, fileTpl := range synthesizedComp.FileTemplates {
+ if fileTpl.Name == tpl {
+ volName = fileTpl.VolumeName
+ break
+ }
+ }
+ if volName == "" {
+ return "" // has no volumes specified
+ }
+
+ for _, container := range synthesizedComp.PodSpec.Containers {
+ for _, mount := range container.VolumeMounts {
+ if mount.Name == volName {
+ mountPath = mount.MountPath
+ break
+ }
+ }
+ if mountPath != "" {
+ break
+ }
+ }
+ if mountPath == "" {
+ return "" // the template is not mounted, ignore it
+ }
+
+ return filepath.Join(mountPath, file)
+}
diff --git a/pkg/controller/component/type.go b/pkg/controller/component/type.go
index 9e718dcc686..d3d30e71728 100644
--- a/pkg/controller/component/type.go
+++ b/pkg/controller/component/type.go
@@ -26,6 +26,7 @@ import (
"k8s.io/apimachinery/pkg/util/intstr"
kbappsv1 "github.com/apecloud/kubeblocks/apis/apps/v1"
+ "github.com/apecloud/kubeblocks/pkg/kbagent/proto"
)
type SynthesizedComponent struct {
@@ -79,6 +80,7 @@ type SynthesizedComponent struct {
MinReadySeconds int32 `json:"minReadySeconds,omitempty"`
DisableExporter *bool `json:"disableExporter,omitempty"`
Stop *bool
+ KBAgentTasks []proto.Task
}
type SynthesizedFileTemplate struct {
diff --git a/pkg/kbagent/proto/proto.go b/pkg/kbagent/proto/proto.go
index c0778352946..0813dc1a453 100644
--- a/pkg/kbagent/proto/proto.go
+++ b/pkg/kbagent/proto/proto.go
@@ -88,6 +88,7 @@ type Task struct {
NotifyAtFinish bool `json:"notifyAtFinish,omitempty"` // whether to notify the controller when the task is finished
ReportPeriodSeconds int32 `json:"reportPeriodSeconds,omitempty"` // the period to report the progress of the task
NewReplica *NewReplicaTask `json:"newReplica,omitempty"`
+ Render *RenderTask `json:"render,omitempty"`
}
type TaskEvent struct {
@@ -109,3 +110,13 @@ type NewReplicaTask struct {
Parameters map[string]string `json:"parameters,omitempty"` // parameters for data dump and load
TimeoutSeconds *int32 `json:"timeoutSeconds,omitempty"`
}
+
+type RenderTask struct {
+ Templates []RenderTaskFileTemplate `json:"templates"`
+}
+
+type RenderTaskFileTemplate struct {
+ Name string `json:"name"`
+ Files []string `json:"files"`
+ Variables map[string]string `json:"variables,omitempty"`
+}
diff --git a/pkg/kbagent/service/action.go b/pkg/kbagent/service/action.go
index db5a5d81ebc..5520bc14e4d 100644
--- a/pkg/kbagent/service/action.go
+++ b/pkg/kbagent/service/action.go
@@ -120,13 +120,20 @@ func (s *actionService) handleRequest(ctx context.Context, req *proto.ActionRequ
if action.Exec == nil {
return nil, errors.Wrap(proto.ErrNotImplemented, "only exec action is supported")
}
- // HACK: pre-check for the reconfigure action
- if err := checkReconfigure(ctx, req); err != nil {
+ if err := s.preExec(ctx, req, action); err != nil {
return nil, err
}
return s.handleExecAction(ctx, req, action)
}
+func (s *actionService) preExec(ctx context.Context, req *proto.ActionRequest, action *proto.Action) error {
+ // HACK: pre-check for the reconfigure action
+ if err := checkReconfigure(ctx, req); err != nil {
+ return err
+ }
+ return nil
+}
+
func (s *actionService) handleExecAction(ctx context.Context, req *proto.ActionRequest, action *proto.Action) ([]byte, error) {
if req.NonBlocking == nil || !*req.NonBlocking {
return runCommand(ctx, action.Exec, req.Parameters, req.TimeoutSeconds)
diff --git a/pkg/kbagent/service/reconfigure.go b/pkg/kbagent/service/reconfigure.go
index c7d0d22a1af..199e6594f39 100644
--- a/pkg/kbagent/service/reconfigure.go
+++ b/pkg/kbagent/service/reconfigure.go
@@ -37,8 +37,8 @@ const (
configFilesUpdated = "KB_CONFIG_FILES_UPDATED"
)
-func checkReconfigure(_ context.Context, req *proto.ActionRequest) error {
- if req.Action != "reconfigure" && !strings.HasPrefix(req.Action, "udf-reconfigure") {
+func checkReconfigure(ctx context.Context, req *proto.ActionRequest) error {
+ if !isReconfigureRequest(req) {
return nil
}
@@ -51,7 +51,12 @@ func checkReconfigure(_ context.Context, req *proto.ActionRequest) error {
if err := checkReconfigureUpdated(req); err != nil {
return err
}
- return nil
+
+ return reconfigurePostRender(ctx, req)
+}
+
+func isReconfigureRequest(req *proto.ActionRequest) bool {
+ return req.Action == "reconfigure" || strings.HasPrefix(req.Action, "udf-reconfigure")
}
func checkReconfigureCreated(req *proto.ActionRequest) error {
@@ -128,3 +133,50 @@ func checkLocalFileUpToDate(file, checksum string) error {
}
return nil
}
+
+func reconfigurePostRender(ctx context.Context, req *proto.ActionRequest) error {
+ files := reconfigurePostRenderFiles(req)
+ if len(files) == 0 {
+ return nil
+ }
+
+ render := renderTask{
+ task: &proto.RenderTask{
+ Templates: []proto.RenderTaskFileTemplate{
+ {
+ Name: "reconfigure", // TODO: tpl name
+ Files: files,
+ Variables: req.Parameters,
+ },
+ },
+ },
+ }
+ ch, err := render.run(ctx)
+ if err != nil {
+ return err
+ }
+ if ch != nil {
+ err1, ok := <-ch
+ if !ok {
+ err1 = errors.New("runtime error: error chan closed unexpectedly")
+ }
+ return err1
+ }
+ return nil
+}
+
+func reconfigurePostRenderFiles(req *proto.ActionRequest) []string {
+ files := make([]string, 0)
+ created := req.Parameters[configFilesCreated]
+ updated := req.Parameters[configFilesUpdated]
+ if len(created) > 0 {
+ files = append(files, strings.Split(created, ",")...)
+ }
+ if len(updated) > 0 {
+ for _, item := range strings.Split(updated, ",") {
+ tokens := strings.Split(item, ":")
+ files = append(files, tokens[0])
+ }
+ }
+ return files
+}
diff --git a/pkg/kbagent/service/task.go b/pkg/kbagent/service/task.go
index 063e3ec5030..21b3890eb75 100644
--- a/pkg/kbagent/service/task.go
+++ b/pkg/kbagent/service/task.go
@@ -114,6 +114,12 @@ func (s *taskService) newTask(task proto.Task) task {
task: task.NewReplica,
}
}
+ if task.Render != nil {
+ return &renderTask{
+ logger: s.logger,
+ task: task.Render,
+ }
+ }
return nil
}
diff --git a/pkg/kbagent/service/task_render.go b/pkg/kbagent/service/task_render.go
new file mode 100644
index 00000000000..6e0a4135b6d
--- /dev/null
+++ b/pkg/kbagent/service/task_render.go
@@ -0,0 +1,150 @@
+/*
+Copyright (C) 2022-2025 ApeCloud Co., Ltd
+
+This file is part of KubeBlocks project
+
+This program is free software: you can redistribute it and/or modify
+it under the terms of the GNU Affero General Public License as published by
+the Free Software Foundation, either version 3 of the License, or
+(at your option) any later version.
+
+This program is distributed in the hope that it will be useful
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU Affero General Public License for more details.
+
+You should have received a copy of the GNU Affero General Public License
+along with this program. If not, see .
+*/
+
+package service
+
+import (
+ "context"
+ "fmt"
+ "os"
+ "path/filepath"
+ "strings"
+ "text/template"
+ "time"
+
+ "github.com/Masterminds/sprig/v3"
+ "github.com/go-logr/logr"
+
+ "github.com/apecloud/kubeblocks/pkg/kbagent/proto"
+ "github.com/apecloud/kubeblocks/pkg/kbagent/util"
+)
+
+type renderTask struct {
+ logger logr.Logger
+ task *proto.RenderTask
+}
+
+var _ task = &renderTask{}
+
+func (s *renderTask) run(ctx context.Context) (chan error, error) {
+ for _, tpl := range s.task.Templates {
+ if err := s.renderTemplate(tpl); err != nil {
+ return nil, err
+ }
+ }
+ return nil, nil
+}
+
+func (s *renderTask) status(ctx context.Context, event *proto.TaskEvent) {
+}
+
+func (s *renderTask) renderTemplate(tpl proto.RenderTaskFileTemplate) error {
+ variables := util.EnvL2M(os.Environ())
+ for k, v := range tpl.Variables {
+ variables[k] = v // override
+ }
+
+ for _, f := range tpl.Files {
+ if err := s.renderFile(tpl.Name, f, variables); err != nil {
+ return err
+ }
+ }
+ return nil
+}
+
+func (s *renderTask) renderFile(name, path string, variables map[string]string) error {
+ data, err := s.readFile(path)
+ if err != nil {
+ return err
+ }
+ if len(data) == 0 {
+ return nil
+ }
+
+ rendered, err := s.renderFileData(name, data, variables)
+ if err != nil {
+ return err
+ }
+
+ return s.writeFile(path, rendered)
+}
+
+func (s *renderTask) readFile(path string) (string, error) {
+ var (
+ f *os.File
+ info os.FileInfo
+ err error
+ )
+
+ info, err = os.Stat(path)
+ if err != nil {
+ if os.IsNotExist(err) {
+ return "", nil
+ }
+ return "", err
+ }
+ if info.Size() == 0 {
+ return "", nil
+ }
+
+ f, err = os.Open(path)
+ if err != nil {
+ return "", err
+ }
+
+ buf := make([]byte, info.Size()+1)
+ _, err = f.Read(buf)
+ if err != nil {
+ return "", err
+ }
+
+ if err = f.Close(); err != nil {
+ return "", err
+ }
+
+ return string(buf), nil
+}
+
+func (s *renderTask) writeFile(path string, data string) error {
+ tmpPath := filepath.Join("tmp", fmt.Sprintf("%d.tmp", time.Now().UnixMicro()))
+ f, err := os.Create(tmpPath)
+ if err != nil {
+ return err
+ }
+ _, err = f.Write([]byte(data))
+ if err != nil {
+ return err
+ }
+ if err = f.Close(); err != nil {
+ return err
+ }
+ return os.Rename(tmpPath, path)
+}
+
+func (s *renderTask) renderFileData(name, data string, variables map[string]string) (string, error) {
+ tpl, err := template.New(name).Option("missingkey=error").Funcs(sprig.TxtFuncMap()).Parse(data)
+ if err != nil {
+ return "", err
+ }
+ var buf strings.Builder
+ if err = tpl.Execute(&buf, variables); err != nil {
+ return "", err
+ }
+ return buf.String(), nil
+}
diff --git a/pkg/kbagent/service/task_new_replica.go b/pkg/kbagent/service/task_replica.go
similarity index 100%
rename from pkg/kbagent/service/task_new_replica.go
rename to pkg/kbagent/service/task_replica.go
|