Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions pkg/controller/as3Handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,11 @@ import (
"net/http"
"strings"

"sync"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"k8s.io/apimachinery/pkg/util/intstr"
"sync"
)

var _ = Describe("AS3Handler Tests", func() {
Expand Down Expand Up @@ -853,5 +854,4 @@ var _ = Describe("AS3Handler Tests", func() {
Expect(config.tenantResponseMap["test"].agentResponseCode).To(Equal(http.StatusUnprocessableEntity), "Response code should be 422")
})
})

})
92 changes: 46 additions & 46 deletions pkg/controller/clusterHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,12 @@ func (ch *ClusterHandler) addClusterConfig(clusterName string, config *ClusterCo
ch.Unlock()
}

// deleteClusterConfig removes a cluster configuration from the ClusterHandler.
func (ch *ClusterHandler) deleteClusterConfig(clusterName string) {
ch.Lock()
delete(ch.ClusterConfigs, clusterName)
ch.Unlock()
}
//// deleteClusterConfig removes a cluster configuration from the ClusterHandler.
//func (ch *ClusterHandler) deleteClusterConfig(clusterName string) {
// ch.Lock()
// delete(ch.ClusterConfigs, clusterName)
// ch.Unlock()
//}

// getClusterConfig returns the cluster configuration for the specified cluster.
func (ch *ClusterHandler) getClusterConfig(clusterName string) *ClusterConfig {
Expand Down Expand Up @@ -96,37 +96,37 @@ func (ch *ClusterHandler) getInformerStore(clusterName string) *InformerStore {
}

// enqueueEvent adds an event to the eventQueue after checking for uniqueness.
func (ch *ClusterHandler) enqueueEvent(clusterName string, obj interface{}) {
key := fmt.Sprintf("%s/%s", clusterName, "")
if _, exists := ch.uniqueAppIdentifier[key]; exists {
fmt.Printf("Duplicate event discarded: %s\n", key)
return
}
ch.uniqueAppIdentifier[key] = struct{}{}
ch.eventQueue.Add(obj)
fmt.Printf("Event queued: %s\n", key)
}

// ProcessEvents processes events from the eventQueue, applying deduplication and passing unique events to the controller.
func (ch *ClusterHandler) ProcessEvents() {
for {
obj, shutdown := ch.eventQueue.Get()
if shutdown {
break
}

// Process event
ch.processEvent(obj)
ch.eventQueue.Done(obj)
}
}
//func (ch *ClusterHandler) enqueueEvent(clusterName string, obj interface{}) {
// key := fmt.Sprintf("%s/%s", clusterName, "")
// if _, exists := ch.uniqueAppIdentifier[key]; exists {
// fmt.Printf("Duplicate event discarded: %s\n", key)
// return
// }
// ch.uniqueAppIdentifier[key] = struct{}{}
// ch.eventQueue.Add(obj)
// fmt.Printf("Event queued: %s\n", key)
//}

//// ProcessEvents processes events from the eventQueue, applying deduplication and passing unique events to the controller.
//func (ch *ClusterHandler) ProcessEvents() {
// for {
// obj, shutdown := ch.eventQueue.Get()
// if shutdown {
// break
// }
//
// // Process event
// ch.processEvent(obj)
// ch.eventQueue.Done(obj)
// }
//}

// processEvent handles individual events, simulating sending to a controller.
func (ch *ClusterHandler) processEvent(obj interface{}) {
// Here you would handle the business logic for the event.
fmt.Printf("Processing event: %v\n", obj)
// Add actual controller handling logic here.
}
//func (ch *ClusterHandler) processEvent(obj interface{}) {
// // Here you would handle the business logic for the event.
// fmt.Printf("Processing event: %v\n", obj)
// // Add actual controller handling logic here.
//}

// remove any cluster which is not provided in externalClustersConfig or not part of the HA cluster
func (ch *ClusterHandler) cleanClusterCache(primaryClusterName, secondaryClusterName string, activeClusters map[string]bool) {
Expand Down Expand Up @@ -262,17 +262,17 @@ func (ch *ClusterHandler) getMonitoredNamespaces(clusterName string) map[string]
return ns
}

func (ch *ClusterHandler) getClusterNames() map[string]struct{} {
ch.RLock()
defer ch.RUnlock()
clusterNames := make(map[string]struct{})
for clusterName, config := range ch.ClusterConfigs {
if config.clusterDetails.ServiceTypeLBDiscovery {
clusterNames[clusterName] = struct{}{}
}
}
return clusterNames
}
//func (ch *ClusterHandler) getClusterNames() map[string]struct{} {
// ch.RLock()
// defer ch.RUnlock()
// clusterNames := make(map[string]struct{})
// for clusterName, config := range ch.ClusterConfigs {
// if config.clusterDetails.ServiceTypeLBDiscovery {
// clusterNames[clusterName] = struct{}{}
// }
// }
// return clusterNames
//}

// ResourceStatusUpdater is a go routine that listens to the resourceStatusUpdateChan
func (ch *ClusterHandler) ResourceStatusUpdater() {
Expand Down
9 changes: 6 additions & 3 deletions pkg/controller/controller_suit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,14 @@ import (
"fmt"
"github.com/f5devcentral/go-bigip"
"io"
"net/http"
"strings"
"testing"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
dynamicfake "k8s.io/client-go/dynamic/fake"
"net/http"
"strings"
"testing"

cisapiv1 "github.com/F5Networks/k8s-bigip-ctlr/v2/config/apis/cis/v1"
"github.com/F5Networks/k8s-bigip-ctlr/v2/pkg/test"
Expand Down Expand Up @@ -123,6 +124,8 @@ func newMockRequestHandler(writer writer.Writer) *RequestHandler {
},
}
return &RequestHandler{
reqChan: make(chan ResourceConfigRequest, 1),
respChan: make(chan *agentPostConfig, 1),
PrimaryBigIPWorker: &Agent{
APIHandler: &APIHandler{LTM: &LTMAPIHandler{
&BaseAPIHandler{
Expand Down
40 changes: 20 additions & 20 deletions pkg/controller/eventNotifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,26 +80,26 @@ func (en *EventNotifier) CreateNotifierForNamespace(
return evNotifier
}

// Get the notifier for a namespace
func (en *EventNotifier) GetNotifierForNamespace(
namespace string,
) *NamespaceEventNotifier {

en.mutex.Lock()
defer en.mutex.Unlock()

evNotifier, found := en.notifierMap[namespace]
if !found {
return nil
}
return evNotifier
}

func (en *EventNotifier) DeleteNotifierForNamespace(namespace string) {
en.mutex.Lock()
defer en.mutex.Unlock()
delete(en.notifierMap, namespace)
}
//// Get the notifier for a namespace
//func (en *EventNotifier) GetNotifierForNamespace(
// namespace string,
//) *NamespaceEventNotifier {
//
// en.mutex.Lock()
// defer en.mutex.Unlock()
//
// evNotifier, found := en.notifierMap[namespace]
// if !found {
// return nil
// }
// return evNotifier
//}
//
//func (en *EventNotifier) DeleteNotifierForNamespace(namespace string) {
// en.mutex.Lock()
// defer en.mutex.Unlock()
// delete(en.notifierMap, namespace)
//}

func (nen *NamespaceEventNotifier) RecordEvent(
obj runtime.Object,
Expand Down
4 changes: 4 additions & 0 deletions pkg/controller/gtmBackend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,17 @@ var _ = Describe("Backend Tests", func() {
BaseAPIHandler: mockBaseAPIHandler,
},
}
agent.GTM.PostManager.firstPost = true
agent.GTM.PostManager.PostDelay = 30
go agent.gtmWorker()
agent.GTM.PostManager.postChan <- postConfig
response := <-agent.GTM.PostManager.respChan

Expect(response).NotTo(BeNil(), "response should not be nil")
Expect(response.tenantResponseMap["test_gtm"].agentResponseCode).To(Equal(http.StatusOK), "response code should be 200")

postConfig.incomingTenantDeclMap = make(map[string]as3Tenant)
agent.GTM.PostManager.postChan <- postConfig
close(agent.LTM.PostManager.postChan)
close(agent.LTM.PostManager.respChan)
})
Expand Down
32 changes: 30 additions & 2 deletions pkg/controller/multiClusterHealthProbeManager_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package controller

import (
"crypto/tls"
"net/http"

"github.com/F5Networks/k8s-bigip-ctlr/v2/pkg/teem"
"github.com/F5Networks/k8s-bigip-ctlr/v2/pkg/test"
. "github.com/onsi/ginkgo/v2"
Expand All @@ -10,7 +13,6 @@ import (
"gopkg.in/yaml.v2"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
k8sfake "k8s.io/client-go/kubernetes/fake"
"net/http"
)

var _ = Describe("Multi Cluster Health Probe", func() {
Expand Down Expand Up @@ -124,7 +126,9 @@ extendedRouteSpec:
Expect(mockCtlr.RequestHandler.PrimaryClusterHealthProbeParams.statusRunning).To(BeFalse(), "incorrect primary cluster health status")
mockCtlr.getPrimaryClusterHealthStatus()
Expect(mockCtlr.RequestHandler.PrimaryClusterHealthProbeParams.statusRunning).To(BeFalse(), "incorrect primary cluster health status")

mockCtlr.RequestHandler.PrimaryClusterHealthProbeParams.statusChanged = false
mockCtlr.getPrimaryClusterHealthStatus()
Expect(mockCtlr.RequestHandler.PrimaryClusterHealthProbeParams.statusRunning).To(BeFalse(), "incorrect primary cluster health status")
})
It("Check Primary Cluster HealthProbe with valid http endpoint", func() {
server := ghttp.NewServer()
Expand All @@ -139,6 +143,30 @@ extendedRouteSpec:
Expect(mockCtlr.RequestHandler.checkPrimaryClusterHealthStatus()).To(BeTrue(), "incorrect primary cluster health status")
server.Close()
})
It("Check Primary Cluster HealthProbe with valid https endpoint", func() {
http.DefaultTransport.(*http.Transport).TLSClientConfig = &tls.Config{
InsecureSkipVerify: true,
}
server := ghttp.NewTLSServer()
statusCode := 200
server.AppendHandlers(
ghttp.CombineHandlers(
ghttp.VerifyRequest("GET", "/"),
ghttp.RespondWithJSONEncoded(statusCode, ""),
))
es.HAClusterConfig.PrimaryClusterEndPoint = "https://" + server.Addr()
mockCtlr.updateHealthProbeConfig(es.HAClusterConfig)
mockCtlr.RequestHandler.PrimaryClusterHealthProbeParams.EndPointType = "https"
Expect(mockCtlr.RequestHandler.checkPrimaryClusterHealthStatus()).To(BeTrue(), "incorrect primary cluster health status")
server.Close()
})
It("Check Primary Cluster HealthProbe with invalid https endpoint", func() {
Expect(mockCtlr.RequestHandler.getPrimaryClusterHealthStatusFromHTTPSEndPoint()).To(BeFalse())
mockCtlr.RequestHandler.PrimaryClusterHealthProbeParams.EndPoint = "http://0.0.0.0:80"
Expect(mockCtlr.RequestHandler.getPrimaryClusterHealthStatusFromHTTPSEndPoint()).To(BeFalse())
mockCtlr.RequestHandler.PrimaryClusterHealthProbeParams.EndPoint = "https://"
Expect(mockCtlr.RequestHandler.getPrimaryClusterHealthStatusFromHTTPSEndPoint()).To(BeFalse())
})

It("Check Primary Cluster HealthProbe with invalid http endpoint", func() {
Expect(mockCtlr.RequestHandler.getPrimaryClusterHealthStatusFromHTTPEndPoint()).To(BeFalse())
Expand Down
1 change: 1 addition & 0 deletions pkg/controller/node_poll_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -420,6 +420,7 @@ var _ = Describe("Node Poller Handler", func() {
}

mockCtlr.SetupNodeProcessing("")
mockCtlr.processBlockAffinities("")
mockWriter, ok = mockCtlr.PrimaryBigIPWorker.ConfigWriter.(*test.MockWriter)
Expect(ok).To(Equal(true))
Expect(len(mockWriter.Sections)).To(Equal(1))
Expand Down
Loading
Loading