Skip to content

Commit 9a8ccd9

Browse files
committed
feat: add option to disable ClusterIP routing via revision annotation
Adds the ability to optionally disable ClusterIP routing on a per-revision basis through the 'serving.knative.dev/disable-clusterip-routing' annotation. When this annotation is set to 'true' on a revision, the activator will bypass ClusterIP routing and use direct pod IP routing exclusively, even when a healthy ClusterIP is available. Key changes: - Add DisableClusterIPRoutingAnnotationKey constant for the new annotation - Extend revisionThrottler to track and respect the disableClusterIP setting - Modify routing logic to skip ClusterIP when disabled - Read annotation value during throttler creation from revision metadata - Maintain full backward compatibility (ClusterIP routing enabled by default) This feature is useful for scenarios where direct pod routing is preferred over ClusterIP load balancing, such as for debugging, performance testing, or when specific routing behaviors are required. Includes comprehensive tests to verify both disabled and default behaviors.
1 parent 4853ead commit 9a8ccd9

File tree

6 files changed

+120
-9
lines changed

6 files changed

+120
-9
lines changed
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
apiVersion: serving.knative.dev/v1
2+
kind: Service
3+
metadata:
4+
name: example-service
5+
spec:
6+
template:
7+
metadata:
8+
annotations:
9+
# Disable ClusterIP routing for this revision
10+
# When set to "true", the activator will only use pod IP routing
11+
serving.knative.dev/disable-clusterip-routing: "true"
12+
spec:
13+
containers:
14+
- image: gcr.io/knative-samples/helloworld-go
15+
ports:
16+
- containerPort: 8080
17+
env:
18+
- name: TARGET
19+
value: "Go Sample v1"
20+

pkg/activator/net/throttler.go

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,7 @@ type revisionThrottler struct {
132132
revID types.NamespacedName
133133
containerConcurrency int
134134
lbPolicy lbPolicy
135+
disableClusterIP bool
135136

136137
// These are used in slicing to infer which pods to assign
137138
// to this activator.
@@ -171,6 +172,7 @@ type revisionThrottler struct {
171172
func newRevisionThrottler(revID types.NamespacedName,
172173
containerConcurrency int, proto string,
173174
breakerParams queue.BreakerParams,
175+
disableClusterIP bool,
174176
logger *zap.SugaredLogger,
175177
) *revisionThrottler {
176178
logger = logger.With(zap.String(logkey.Key, revID.String()))
@@ -198,6 +200,7 @@ func newRevisionThrottler(revID types.NamespacedName,
198200
logger: logger,
199201
protocol: proto,
200202
lbPolicy: lbp,
203+
disableClusterIP: disableClusterIP,
201204
}
202205

203206
// Start with unknown
@@ -414,10 +417,11 @@ func (rt *revisionThrottler) handleUpdate(update revisionDestsUpdate) {
414417
rt.logger.Debugw("Handling update",
415418
zap.String("ClusterIP", update.ClusterIPDest), zap.Object("dests", logging.StringSet(update.Dests)))
416419

417-
// ClusterIP is not yet ready, so we want to send requests directly to the pods.
420+
// If ClusterIP routing is disabled OR ClusterIP is not yet ready,
421+
// we want to send requests directly to the pods.
418422
// NB: this will not be called in parallel, thus we can build a new podTrackers
419423
// array before taking out a lock.
420-
if update.ClusterIPDest == "" {
424+
if rt.disableClusterIP || update.ClusterIPDest == "" {
421425
// Create a map for fast lookup of existing trackers.
422426
trackersMap := make(map[string]*podTracker, len(rt.podTrackers))
423427
for _, tracker := range rt.podTrackers {
@@ -546,11 +550,16 @@ func (t *Throttler) getOrCreateRevisionThrottler(revID types.NamespacedName) (*r
546550
if err != nil {
547551
return nil, err
548552
}
553+
// Check if ClusterIP routing is disabled via annotation
554+
disableClusterIP := rev.Annotations != nil &&
555+
rev.Annotations[serving.DisableClusterIPRoutingAnnotationKey] == "true"
556+
549557
revThrottler = newRevisionThrottler(
550558
revID,
551559
int(rev.Spec.GetContainerConcurrency()),
552560
pkgnet.ServicePortName(rev.GetProtocol()),
553561
queue.BreakerParams{QueueDepth: breakerQueueDepth, MaxConcurrency: revisionMaxConcurrency},
562+
disableClusterIP,
554563
t.logger,
555564
)
556565
t.revisionThrottlers[revID] = revThrottler

pkg/activator/net/throttler_test.go

Lines changed: 78 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -618,7 +618,7 @@ func TestPodAssignmentFinite(t *testing.T) {
618618
defer cancel()
619619

620620
throttler := newTestThrottler(ctx)
621-
rt := newRevisionThrottler(revName, 42 /*cc*/, pkgnet.ServicePortNameHTTP1, testBreakerParams, logger)
621+
rt := newRevisionThrottler(revName, 42 /*cc*/, pkgnet.ServicePortNameHTTP1, testBreakerParams, false /*disableClusterIP*/, logger)
622622
rt.numActivators.Store(4)
623623
rt.activatorIndex.Store(0)
624624
throttler.revisionThrottlers[revName] = rt
@@ -670,7 +670,7 @@ func TestPodAssignmentInfinite(t *testing.T) {
670670
defer cancel()
671671

672672
throttler := newTestThrottler(ctx)
673-
rt := newRevisionThrottler(revName, 0 /*cc*/, pkgnet.ServicePortNameHTTP1, testBreakerParams, logger)
673+
rt := newRevisionThrottler(revName, 0 /*cc*/, pkgnet.ServicePortNameHTTP1, testBreakerParams, false /*disableClusterIP*/, logger)
674674
throttler.revisionThrottlers[revName] = rt
675675

676676
update := revisionDestsUpdate{
@@ -902,12 +902,87 @@ func TestMultipleActivators(t *testing.T) {
902902
func TestInfiniteBreakerCreation(t *testing.T) {
903903
// This test verifies that we use infiniteBreaker when CC==0.
904904
tttl := newRevisionThrottler(types.NamespacedName{Namespace: "a", Name: "b"}, 0, /*cc*/
905-
pkgnet.ServicePortNameHTTP1, queue.BreakerParams{}, TestLogger(t))
905+
pkgnet.ServicePortNameHTTP1, queue.BreakerParams{}, false /*disableClusterIP*/, TestLogger(t))
906906
if _, ok := tttl.breaker.(*infiniteBreaker); !ok {
907907
t.Errorf("The type of revisionBreaker = %T, want %T", tttl, (*infiniteBreaker)(nil))
908908
}
909909
}
910910

911+
func TestDisableClusterIPRouting(t *testing.T) {
912+
// Test that when disableClusterIP is true, we always use pod trackers even when ClusterIP is available
913+
logger := TestLogger(t)
914+
revName := types.NamespacedName{Namespace: testNamespace, Name: testRevision}
915+
916+
// Create a revision throttler with ClusterIP disabled
917+
rt := newRevisionThrottler(revName, 42 /*cc*/, pkgnet.ServicePortNameHTTP1,
918+
testBreakerParams, true /*disableClusterIP*/, logger)
919+
920+
// Update with both ClusterIP and pod destinations
921+
update := revisionDestsUpdate{
922+
Rev: revName,
923+
ClusterIPDest: "10.0.0.1:80", // ClusterIP is available
924+
Dests: sets.New("ip1", "ip2", "ip3"),
925+
}
926+
927+
rt.handleUpdate(update)
928+
929+
// Verify that we're using pod trackers, not ClusterIP
930+
rt.mux.RLock()
931+
defer rt.mux.RUnlock()
932+
933+
if rt.clusterIPTracker != nil {
934+
t.Error("Expected clusterIPTracker to be nil when ClusterIP routing is disabled")
935+
}
936+
937+
if len(rt.podTrackers) != 3 {
938+
t.Errorf("Expected 3 pod trackers, got %d", len(rt.podTrackers))
939+
}
940+
941+
// Verify pod trackers have correct destinations
942+
expectedDests := sets.New("ip1", "ip2", "ip3")
943+
for _, tracker := range rt.podTrackers {
944+
if !expectedDests.Has(tracker.dest) {
945+
t.Errorf("Unexpected pod tracker destination: %s", tracker.dest)
946+
}
947+
expectedDests.Delete(tracker.dest)
948+
}
949+
}
950+
951+
func TestEnableClusterIPRoutingByDefault(t *testing.T) {
952+
// Test that when disableClusterIP is false (default), we use ClusterIP when available
953+
logger := TestLogger(t)
954+
revName := types.NamespacedName{Namespace: testNamespace, Name: testRevision}
955+
956+
// Create a revision throttler with ClusterIP enabled (default)
957+
rt := newRevisionThrottler(revName, 42 /*cc*/, pkgnet.ServicePortNameHTTP1,
958+
testBreakerParams, false /*disableClusterIP*/, logger)
959+
960+
// Update with both ClusterIP and pod destinations
961+
update := revisionDestsUpdate{
962+
Rev: revName,
963+
ClusterIPDest: "10.0.0.1:80", // ClusterIP is available
964+
Dests: sets.New("ip1", "ip2", "ip3"),
965+
}
966+
967+
rt.handleUpdate(update)
968+
969+
// Verify that we're using ClusterIP, not pod trackers
970+
rt.mux.RLock()
971+
defer rt.mux.RUnlock()
972+
973+
if rt.clusterIPTracker == nil {
974+
t.Error("Expected clusterIPTracker to be set when ClusterIP routing is enabled")
975+
}
976+
977+
if rt.clusterIPTracker.dest != "10.0.0.1:80" {
978+
t.Errorf("Expected clusterIPTracker dest to be '10.0.0.1:80', got %s", rt.clusterIPTracker.dest)
979+
}
980+
981+
if rt.podTrackers != nil {
982+
t.Error("Expected podTrackers to be nil when using ClusterIP")
983+
}
984+
}
985+
911986
func (t *Throttler) try(ctx context.Context, requests int, try func(string) error) chan tryResult {
912987
resultChan := make(chan tryResult)
913988

pkg/apis/serving/register.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,11 @@ const (
144144

145145
// ProgressDeadlineAnnotationKey is the label key for the per revision progress deadline to set for the deployment
146146
ProgressDeadlineAnnotationKey = GroupName + "/progress-deadline"
147+
148+
// DisableClusterIPRoutingAnnotationKey is the annotation key to disable ClusterIP routing for a revision
149+
// When set to "true", the activator will only use pod IP routing and not use ClusterIP for load balancing.
150+
// By default, ClusterIP routing is enabled.
151+
DisableClusterIPRoutingAnnotationKey = GroupName + "/disable-clusterip-routing"
147152
)
148153

149154
var (

pkg/autoscaler/metrics/stat.pb.go

Lines changed: 2 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

test/test_images/grpc-ping/proto/ping.pb.go

Lines changed: 4 additions & 3 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)