Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
136 changes: 33 additions & 103 deletions internal/operator/lifecycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func (impl LifecycleImplementation) GetCapabilities(
Type: lifecycle.OperatorOperationType_TYPE_CREATE,
},
{
Type: lifecycle.OperatorOperationType_TYPE_PATCH,
Type: lifecycle.OperatorOperationType_TYPE_EVALUATE,
},
},
},
Expand Down Expand Up @@ -97,75 +97,14 @@ func (impl LifecycleImplementation) LifecycleHook(
}
switch kind {
case "Pod":
podName := "postgres"
env, _ := consolidateEnvVar(&cluster, request, podName)
return impl.reconcilePod(ctx, &cluster, request, env, pluginConfig)
return impl.reconcilePod(ctx, &cluster, request, pluginConfig)
case "Job":
env := staticEnvVarConfig()
return impl.reconcileJob(ctx, &cluster, request, env)
return impl.reconcileJob(ctx, &cluster, request)
default:
return nil, fmt.Errorf("unsupported kind: %s", kind)
}
}

func staticEnvVarConfig() []corev1.EnvVar {
return []corev1.EnvVar{
{Name: "PGBACKREST_pg1-path", Value: "/var/lib/postgresql/data/pgdata"},
}
}

func consolidateEnvVar(
cluster *cnpgv1.Cluster,
request *lifecycle.OperatorLifecycleRequest,
srcContainerName string,
) ([]corev1.EnvVar, error) {

// get pod definition, we will use it to retrieve environment variables set on a specific (srcContainerName)
// container)
pod, err := decoder.DecodePodJSON(request.GetObjectDefinition())
if err != nil {
return nil, err
}
envs := []corev1.EnvVar{
{Name: "CLUSTER_NAME", Value: cluster.Name},
{Name: "NAMESPACE", Value: cluster.Namespace},
}
envs = append(envs, staticEnvVarConfig()...)
envs = append(envs, envFromContainer(srcContainerName, pod.Spec, envs)...)
return envs, nil
}

func envFromContainer(
srcContainer string,
p corev1.PodSpec,
srcEnvs []corev1.EnvVar,
) []corev1.EnvVar {
var envs []corev1.EnvVar
// first retrieve the container
var c corev1.Container
found := false
for _, c = range p.Containers {
if c.Name == srcContainer {
found = true
break
}
}
if !found {
return envs
}
existing := make(map[string]struct{}, len(srcEnvs))
for _, e := range srcEnvs {
existing[e.Name] = struct{}{}
}
// then merge the env var from it
for _, e := range c.Env {
if _, found := existing[e.Name]; !found {
envs = append(envs, e)
}
}
return envs
}

// getCNPGJobRole gets the role associated to a CNPG job
func getCNPGJobRole(job *batchv1.Job) string {
const jobRoleLabelSuffix = "/jobRole"
Expand All @@ -181,7 +120,6 @@ func (impl LifecycleImplementation) reconcileJob(
ctx context.Context,
cluster *cnpgv1.Cluster,
request *lifecycle.OperatorLifecycleRequest,
env []corev1.EnvVar,
) (*lifecycle.OperatorLifecycleResponse, error) {
logger := log.FromContext(ctx).WithName("lifecycle")

Expand Down Expand Up @@ -210,7 +148,7 @@ func (impl LifecycleImplementation) reconcileJob(
mutatedJob := job.DeepCopy()
podSpec := &mutatedJob.Spec.Template.Spec

sidecarContainer := &corev1.Container{Env: env, Args: []string{"restore"}}
sidecarContainer := &corev1.Container{Args: []string{"restore"}}

reconcilePodSpec(cluster, podSpec, role, sidecarContainer)

Expand Down Expand Up @@ -253,18 +191,6 @@ func reconcilePodSpec(
mainContainerName string,
containerConfig *corev1.Container,
) {
// Merge cluster defaults and main container envs
defaultEnv := []corev1.EnvVar{
{Name: "NAMESPACE", Value: cluster.Namespace},
{Name: "CLUSTER_NAME", Value: cluster.Name},
}
var mainEnv []corev1.EnvVar
for _, c := range spec.Containers {
if c.Name == mainContainerName {
mainEnv = c.Env
break
}
}
baseProbe := &corev1.Probe{
FailureThreshold: 10,
TimeoutSeconds: 10,
Expand Down Expand Up @@ -298,12 +224,39 @@ func reconcilePodSpec(
Drop: []corev1.Capability{"ALL"},
},
}
containerConfig.Env = mergeEnvs(containerConfig.Env, mainEnv, defaultEnv)
containerConfig.Env = envFromContainer(mainContainerName, spec, containerConfig.Env)
containerConfig.StartupProbe = baseProbe.DeepCopy()
containerConfig.RestartPolicy = ptr.To(corev1.ContainerRestartPolicyAlways)
object.InjectPluginVolumeSpec(spec)
}

func envFromContainer(
srcContainerName string,
srcPod *corev1.PodSpec,
destEnvVar []corev1.EnvVar,
) []corev1.EnvVar {
var env []corev1.EnvVar
existing := make(map[string]struct{}, len(destEnvVar))
for _, d := range destEnvVar {
existing[d.Name] = struct{}{}
}
var oriContainer *corev1.Container
for i := range srcPod.Containers {
if srcPod.Containers[i].Name == srcContainerName {
oriContainer = &srcPod.Containers[i]
break
}
}
if oriContainer != nil {
for _, srcEnv := range oriContainer.Env {
if _, ok := existing[srcEnv.Name]; !ok {
env = append(env, srcEnv)
}
}
}
return env
}

// injects the plugin volume (/plugin) into a CNPG Pod spec.
func injectPluginVolumeMount(spec *corev1.PodSpec, mainContainerName string) {
const (
Expand Down Expand Up @@ -388,25 +341,6 @@ func addVolumeMountsFromContainer(
return fmt.Errorf("container %q not found", sourceName)
}

// mergeEnvs merges environment variables, skipping duplicates by name
func mergeEnvs(envSlices ...[]corev1.EnvVar) []corev1.EnvVar {
envMap := make(map[string]corev1.EnvVar)
// Iterate through all provided slices
for _, slice := range envSlices {
for _, env := range slice {
if _, exists := envMap[env.Name]; !exists {
envMap[env.Name] = env
}
}
}
// Convert map back to slice
merged := make([]corev1.EnvVar, 0, len(envMap))
for _, env := range envMap {
merged = append(merged, env)
}
return merged
}

func (impl LifecycleImplementation) injectSharedPluginConfig(
ctx context.Context,
pluginConfig *PluginConfiguration,
Expand Down Expand Up @@ -562,7 +496,6 @@ func (impl LifecycleImplementation) reconcilePod(
ctx context.Context,
cluster *cnpgv1.Cluster,
request *lifecycle.OperatorLifecycleRequest,
envVars []corev1.EnvVar,
pluginConfig *PluginConfiguration,
) (*lifecycle.OperatorLifecycleResponse, error) {

Expand All @@ -578,10 +511,7 @@ func (impl LifecycleImplementation) reconcilePod(

if len(pluginConfig.StanzaRef) != 0 || len(pluginConfig.ReplicaStanzaRef) != 0 {
// Build the container config using envVars from caller
sidecar := corev1.Container{
Env: envVars,
Args: []string{"instance"},
}
sidecar := corev1.Container{Args: []string{"instance"}}

if err := impl.injectSharedPluginConfig(ctx, pluginConfig, &sidecar); err != nil {
return nil, err
Expand Down
2 changes: 1 addition & 1 deletion internal/operator/lifecycle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func envVarSliceToMap(envVars []corev1.EnvVar) map[string]string {
return envMap
}
func TestEnvFromContainer(t *testing.T) {
srcPod := corev1.PodSpec{
srcPod := &corev1.PodSpec{
Containers: []corev1.Container{
{
Name: "test-container-with-var",
Expand Down
1 change: 1 addition & 0 deletions internal/pgbackrest/api/stanza.go
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,7 @@ func (r *Stanza) ToEnv() ([]string, error) {
envConf,
"PGBACKREST_LOG_LEVEL_FILE=off",
"PGBACKREST_LOCK_PATH=/controller/tmp/pgbackrest-cnpg-plugin.lock",
"PGBACKREST_PG1_PATH=/var/lib/postgresql/data/pgdata",
)

return envConf, nil
Expand Down
1 change: 1 addition & 0 deletions internal/pgbackrest/api/stanza_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (

func TestStanzaToEnv(t *testing.T) {
expected := []string{
"PGBACKREST_PG1_PATH=/var/lib/postgresql/data/pgdata",
"PGBACKREST_REPO1_S3_BUCKET=demo",
"PGBACKREST_REPO1_S3_ENDPOINT=s3.minio.svc.cluster.local",
"PGBACKREST_REPO1_S3_REGION=us-east-1",
Expand Down
Loading