Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -121,11 +121,18 @@ tidy: ## Run go mod tidy on every mod file in the repo
PROCS?=$(shell expr $(shell nproc --ignore 2) / 2)
PROC_CMD = --procs ${PROCS}

# Skip instanceha tests if --focus or --skip is used (focused test run)
ifeq (,$(findstring --focus,$(GINKGO_ARGS))$(findstring --skip,$(GINKGO_ARGS)))
INSTANCEHA_DEP = test-instanceha
else
INSTANCEHA_DEP =
endif

.PHONY: test
test: manifests generate gowork fmt vet envtest ginkgo test-instanceha ## Run tests.
test: manifests generate gowork fmt vet envtest ginkgo $(INSTANCEHA_DEP) ## Run tests.
KUBEBUILDER_ASSETS="$(shell $(ENVTEST) -v debug --bin-dir $(LOCALBIN) use $(ENVTEST_K8S_VERSION) -p path)" \
OPERATOR_TEMPLATES="$(PWD)/templates" \
$(GINKGO) --trace --cover --coverpkg=./pkg/...,./internal/...,./apis/network/v1beta1/...,./apis/rabbitmq/v1beta1/... --coverprofile cover.out --covermode=atomic ${PROC_CMD} $(GINKGO_ARGS) ./test/... ./apis/network/... ./apis/rabbitmq/... ./internal/webhook/...
$(GINKGO) --trace --cover --coverpkg=./pkg/...,./internal/...,./apis/network/v1beta1/...,./apis/rabbitmq/v1beta1/... --coverprofile cover.out --covermode=atomic ${PROC_CMD} $(GINKGO_ARGS) ./test/... ./apis/network/... ./apis/rabbitmq/... ./internal/webhook/... ./internal/controller/...

.PHONY: test-instanceha
test-instanceha: ## Run instanceha tests.
Expand Down
23 changes: 23 additions & 0 deletions apis/bases/rabbitmq.openstack.org_rabbitmqs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1940,6 +1940,12 @@ spec:
- type
type: object
type: array
currentVersion:
description: |-
CurrentVersion - the currently deployed RabbitMQ version (e.g., "3.9", "4.2")
This is controller-managed and reflects the actual running version.
openstack-operator should use the "rabbitmq.openstack.org/target-version" annotation to request version changes.
type: string
lastAppliedTopology:
description: LastAppliedTopology - the last applied Topology
properties:
Expand All @@ -1963,6 +1969,13 @@ spec:
the opentack-operator in the top-level CR (e.g. the ContainerImage)
format: int64
type: integer
proxyRequired:
description: |-
ProxyRequired - tracks whether the AMQP proxy sidecar is required for this cluster.
Set to true when upgrading from RabbitMQ 3.x to 4.x with Quorum queues.
The proxy allows non-durable clients to work with quorum queues during the upgrade window.
Only cleared when the "rabbitmq.openstack.org/clients-reconfigured" annotation is set.
type: boolean
queueType:
description: QueueType - store whether default ha-all policy is present
or not
Expand All @@ -1975,6 +1988,16 @@ spec:
type: string
type: array
x-kubernetes-list-type: atomic
upgradePhase:
description: |-
UpgradePhase - tracks the current phase of a version upgrade or migration
Valid values:
"" (no upgrade in progress)
"DeletingResources" (deleting cluster and ha-all policy)
"WaitingForPods" (waiting for pods to terminate after cluster deletion)
"WaitingForCluster" (pods terminated, waiting for new cluster creation)
This allows resuming upgrades that failed midway.
type: string
type: object
type: object
served: true
Expand Down
24 changes: 24 additions & 0 deletions apis/rabbitmq/v1beta1/rabbitmq_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ const (
QueueTypeQuorum = "Quorum"
// QueueTypeNone - no special queue type
QueueTypeNone = "None"

// Annotations
// AnnotationTargetVersion - annotation key for target RabbitMQ version (set by openstack-operator)
AnnotationTargetVersion = "rabbitmq.openstack.org/target-version"
)

// PodOverride defines per-pod service configurations
Expand Down Expand Up @@ -133,6 +137,26 @@ type RabbitMqStatus struct {
// When populated, transport URLs use these hostnames instead of pod names.
// +listType=atomic
ServiceHostnames []string `json:"serviceHostnames,omitempty"`

// CurrentVersion - the currently deployed RabbitMQ version (e.g., "3.9", "4.2")
// This is controller-managed and reflects the actual running version.
// openstack-operator should use the "rabbitmq.openstack.org/target-version" annotation to request version changes.
CurrentVersion string `json:"currentVersion,omitempty"`

// UpgradePhase - tracks the current phase of a version upgrade or migration
// Valid values:
// "" (no upgrade in progress)
// "DeletingResources" (deleting cluster and ha-all policy)
// "WaitingForPods" (waiting for pods to terminate after cluster deletion)
// "WaitingForCluster" (pods terminated, waiting for new cluster creation)
// This allows resuming upgrades that failed midway.
UpgradePhase string `json:"upgradePhase,omitempty"`

// ProxyRequired - tracks whether the AMQP proxy sidecar is required for this cluster.
// Set to true when upgrading from RabbitMQ 3.x to 4.x with Quorum queues.
// The proxy allows non-durable clients to work with quorum queues during the upgrade window.
// Only cleared when the "rabbitmq.openstack.org/clients-reconfigured" annotation is set.
ProxyRequired bool `json:"proxyRequired,omitempty"`
}

//+kubebuilder:object:root=true
Expand Down
150 changes: 146 additions & 4 deletions apis/rabbitmq/v1beta1/rabbitmq_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ package v1beta1

import (
"context"
"fmt"
"strconv"
"strings"

common_webhook "github.com/openstack-k8s-operators/lib-common/modules/common/webhook"
rabbitmqv2 "github.com/rabbitmq/cluster-operator/v2/api/v1beta1"
Expand Down Expand Up @@ -48,6 +51,22 @@ func SetupRabbitMqDefaults(defaults RabbitMqDefaults) {
rabbitmqlog.Info("RabbitMq defaults initialized", "defaults", defaults)
}

// parseVersionMajor extracts the major version number from a version string
// Returns the major version and an error if parsing fails
func parseVersionMajor(version string) (int, error) {
parts := strings.Split(version, ".")
if len(parts) < 1 {
return 0, fmt.Errorf("invalid version format: %s", version)
}

major, err := strconv.Atoi(parts[0])
if err != nil {
return 0, fmt.Errorf("invalid major version: %s", parts[0])
}

return major, nil
}

// Default sets default values for the RabbitMq using the provided Kubernetes client
// to check if the cluster already exists
func (r *RabbitMq) Default(k8sClient client.Client) {
Expand All @@ -68,9 +87,55 @@ func (r *RabbitMq) Default(k8sClient client.Client) {
panic("cannot determine if RabbitMq resource is new or existing due to API error")
}

if err == nil && existingRabbitMq.Spec.QueueType != nil && *existingRabbitMq.Spec.QueueType != "" {
r.Spec.QueueType = existingRabbitMq.Spec.QueueType
rabbitmqlog.Info("preserving QueueType from existing CR", "name", r.Name, "queueType", *r.Spec.QueueType)
if err == nil && existingRabbitMq.Spec.QueueType != nil {
// Check if we should override Mirrored to Quorum on RabbitMQ 4.2+
// Only override if the queueType is NOT being explicitly CHANGED to Mirrored
shouldOverride := false
if *existingRabbitMq.Spec.QueueType == "Mirrored" {
// Kubernetes fills in all spec fields during updates, so we need to detect
// if the user is explicitly CHANGING to Mirrored (from something else)
// vs just preserving the existing Mirrored value
userChangingToMirrored := r.Spec.QueueType != nil &&
*r.Spec.QueueType == "Mirrored" &&
*existingRabbitMq.Spec.QueueType != "Mirrored"

if !userChangingToMirrored {
// User is not explicitly changing TO Mirrored, so we can auto-override
// Check the target version (annotation) to see if upgrading to 4.2+
if r.Annotations != nil {
if targetVersion, hasTarget := r.Annotations[AnnotationTargetVersion]; hasTarget {
// Parse version to check if major version is 4 or higher
majorVersion, err := parseVersionMajor(targetVersion)
if err == nil && majorVersion >= 4 {
shouldOverride = true
queueType := "Quorum"
r.Spec.QueueType = &queueType
rabbitmqlog.Info("overriding Mirrored to Quorum on RabbitMQ 4.2+",
"name", r.Name,
"targetVersion", targetVersion,
"queueType", "Quorum")
}
}
}
}
}

// Only preserve existing queueType if we didn't override above
if !shouldOverride {
// Preserve existing queueType if the incoming request doesn't specify one
// This allows operators to explicitly change the queueType for migration purposes
if r.Spec.QueueType == nil || *r.Spec.QueueType == "" {
r.Spec.QueueType = existingRabbitMq.Spec.QueueType
rabbitmqlog.Info("preserving QueueType from existing CR", "name", r.Name, "queueType", *r.Spec.QueueType)
} else if *r.Spec.QueueType != *existingRabbitMq.Spec.QueueType {
// User is explicitly changing queueType - allow it to proceed to validation
rabbitmqlog.Info("allowing queueType change",
"name", r.Name,
"oldQueueType", *existingRabbitMq.Spec.QueueType,
"newQueueType", *r.Spec.QueueType)
}
}

isNew = false
} else {
// Check if RabbitMQCluster exists (upgrade scenario: cluster exists but CR is new)
Expand All @@ -93,6 +158,16 @@ func (r *RabbitMq) Default(k8sClient client.Client) {
}

r.Spec.Default(isNew)

// For updates (isNew == false), also apply version-aware defaults
// This handles cases where existing CRs don't have queueType set
if !isNew {
if r.Annotations != nil {
if targetVersion, hasTarget := r.Annotations[AnnotationTargetVersion]; hasTarget {
r.Spec.RabbitMqSpecCore.DefaultForUpdate(targetVersion)
}
}
}
}

// Default - set defaults for this RabbitMq spec
Expand All @@ -105,12 +180,35 @@ func (spec *RabbitMqSpec) Default(isNew bool) {

// Default - set defaults for this RabbitMqSpecCore
func (spec *RabbitMqSpecCore) Default(isNew bool) {
// Default QueueType for new instances
if isNew && (spec.QueueType == nil || *spec.QueueType == "") {
queueType := "Quorum"
spec.QueueType = &queueType
}
}

// DefaultForUpdate - set defaults for RabbitMqSpecCore during updates
// This is called when updating existing CRs
// For RabbitMQ 4.2+, we enforce Quorum queues (override Mirrored if present)
func (spec *RabbitMqSpecCore) DefaultForUpdate(targetVersion string) {
if targetVersion == "" {
return
}

majorVersion, err := parseVersionMajor(targetVersion)
if err != nil || majorVersion < 4 {
return
}

// For RabbitMQ 4.2+:
// 1. If queueType is not set, default to Quorum
// 2. If queueType is Mirrored, override to Quorum (mirrored queues removed in 4.2)
if spec.QueueType == nil || *spec.QueueType == "" || *spec.QueueType == "Mirrored" {
queueType := "Quorum"
spec.QueueType = &queueType
}
}

var _ webhook.Validator = &RabbitMq{}

// ValidateCreate implements webhook.Validator so a webhook will be registered for the type
Expand Down Expand Up @@ -149,7 +247,7 @@ func (r *RabbitMq) ValidateCreate() (admission.Warnings, error) {
}

// ValidateUpdate implements webhook.Validator so a webhook will be registered for the type
func (r *RabbitMq) ValidateUpdate(_ runtime.Object) (admission.Warnings, error) {
func (r *RabbitMq) ValidateUpdate(old runtime.Object) (admission.Warnings, error) {
rabbitmqlog.Info("validate update", "name", r.Name)

var allErrs field.ErrorList
Expand All @@ -167,6 +265,50 @@ func (r *RabbitMq) ValidateUpdate(_ runtime.Object) (admission.Warnings, error)
// Validate QueueType if specified
allErrs = append(allErrs, r.Spec.ValidateQueueType(basePath)...)

// Note: We don't block queueType: Mirrored on RabbitMQ 4.x here because:
// 1. Default() function automatically overrides Mirrored → Quorum when target-version is 4.2+
// 2. DefaultForUpdate() handles cases where queueType is not explicitly set
// 3. Controller also enforces Quorum queues on RabbitMQ 4.x upgrades
// 4. RabbitMQ server will reject mirrored queues on 4.2+ if they somehow get through
oldRabbitMq := old.(*RabbitMq)

// Block migrating TO Quorum on RabbitMQ 3.x (no server-side enforcement available)
// Allow creating new clusters with Quorum on 3.x, only block migrations
// UNLESS there's a concurrent version upgrade to 4.x (which will wipe storage)
if r.Spec.QueueType != nil && *r.Spec.QueueType == "Quorum" {
// Check if queueType is being changed TO Quorum (wasn't Quorum before)
queueTypeChanged := oldRabbitMq.Spec.QueueType != nil && *oldRabbitMq.Spec.QueueType != "Quorum"

if queueTypeChanged {
// Check current running version from Status (controller-managed)
currentVersion := oldRabbitMq.Status.CurrentVersion
if currentVersion != "" {
// Parse version - if major version is 3.x, check if upgrading
majorVersion, err := parseVersionMajor(currentVersion)
if err == nil && majorVersion == 3 {
// Check if there's a concurrent version upgrade to 4.x via annotation
isUpgradingTo4x := false
if r.Annotations != nil {
if targetVersion, hasTarget := r.Annotations[AnnotationTargetVersion]; hasTarget {
targetMajor, err := parseVersionMajor(targetVersion)
if err == nil && targetMajor >= 4 {
isUpgradingTo4x = true
}
}
}

// Only block if NOT upgrading to 4.x
if !isUpgradingTo4x {
allErrs = append(allErrs, field.Forbidden(
basePath.Child("queueType"),
"Migrating to Quorum queues on RabbitMQ 3.x is not supported due to lack of server-side enforcement. "+
"Upgrade to RabbitMQ 4.x first to enable automatic Quorum queue migration."))
}
}
}
}
}

if len(allErrs) != 0 {
return allWarn, apierrors.NewInvalid(
schema.GroupKind{Group: "rabbitmq.openstack.org", Kind: "RabbitMq"},
Expand Down
23 changes: 23 additions & 0 deletions config/crd/bases/rabbitmq.openstack.org_rabbitmqs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1940,6 +1940,12 @@ spec:
- type
type: object
type: array
currentVersion:
description: |-
CurrentVersion - the currently deployed RabbitMQ version (e.g., "3.9", "4.2")
This is controller-managed and reflects the actual running version.
openstack-operator should use the "rabbitmq.openstack.org/target-version" annotation to request version changes.
type: string
lastAppliedTopology:
description: LastAppliedTopology - the last applied Topology
properties:
Expand All @@ -1963,6 +1969,13 @@ spec:
the opentack-operator in the top-level CR (e.g. the ContainerImage)
format: int64
type: integer
proxyRequired:
description: |-
ProxyRequired - tracks whether the AMQP proxy sidecar is required for this cluster.
Set to true when upgrading from RabbitMQ 3.x to 4.x with Quorum queues.
The proxy allows non-durable clients to work with quorum queues during the upgrade window.
Only cleared when the "rabbitmq.openstack.org/clients-reconfigured" annotation is set.
type: boolean
queueType:
description: QueueType - store whether default ha-all policy is present
or not
Expand All @@ -1975,6 +1988,16 @@ spec:
type: string
type: array
x-kubernetes-list-type: atomic
upgradePhase:
description: |-
UpgradePhase - tracks the current phase of a version upgrade or migration
Valid values:
"" (no upgrade in progress)
"DeletingResources" (deleting cluster and ha-all policy)
"WaitingForPods" (waiting for pods to terminate after cluster deletion)
"WaitingForCluster" (pods terminated, waiting for new cluster creation)
This allows resuming upgrades that failed midway.
type: string
type: object
type: object
served: true
Expand Down
Loading