From 67c25a97ff79f8c9f7111c11107171b80544d6e4 Mon Sep 17 00:00:00 2001 From: Kishore K G Date: Tue, 6 Jan 2026 16:18:19 +0000 Subject: [PATCH 1/7] This commit fixes a critical deadlock in the query scheduler that occurs when a tenant's max_outstanding_requests_per_tenant limit is dynamically reduced via runtime configuration. When MaxOutstandingPerTenant is reduced while a user's FIFORequestQueue is full, the getOrAddQueue method attempts to migrate requests to a smaller queue. Previously, this loop blocked indefinitely when the new queue capacity was reached, causing the scheduler to freeze. The fix ensures the migration loop breaks when the new queue is full, effectively dropping excess requests instead of blocking. Signed-off-by: Kishore K G --- patch.diff | 78 ++++++++++++++++++++++++++++++ pkg/scheduler/queue/queue_test.go | 54 +++++++++++++++++++++ pkg/scheduler/queue/user_queues.go | 2 +- 3 files changed, 133 insertions(+), 1 deletion(-) create mode 100644 patch.diff diff --git a/patch.diff b/patch.diff new file mode 100644 index 00000000000..2bab212d7bd --- /dev/null +++ b/patch.diff @@ -0,0 +1,78 @@ +diff --git a/pkg/scheduler/queue/queue_test.go b/pkg/scheduler/queue/queue_test.go +index 47bd5295271..4b9561fc76d 100644 +--- a/pkg/scheduler/queue/queue_test.go ++++ b/pkg/scheduler/queue/queue_test.go +@@ -285,6 +285,60 @@ func TestQueriersShouldGetHighPriorityQueryFirst(t *testing.T) { + assert.Equal(t, highPriorityRequest, nextRequest) // high priority request returned, although it was enqueued the last + } + ++func TestGetOrAddQueue_ShouldNotDeadlockWhenLimitsAreReduced(t *testing.T) { ++ // Setup: Large initial limit ++ initialLimit := 100 ++ newLimit := 50 ++ ++ limits := MockLimits{ ++ MaxOutstanding: initialLimit, ++ } ++ ++ // Initialize queues ++ q := newUserQueues(0, limits, nil) ++ ++ // Create user queue ++ userID := "test-user-deadlock" ++ queue := q.getOrAddQueue(userID, 1) ++ ++ // Fill queue to capacity (near initialLimit) ++ // We fill it more than newLimit ++ itemsToFill := 80 ++ for i := 0; i < itemsToFill; i++ { ++ queue.enqueueRequest(MockRequest{priority: 1}) ++ } ++ ++ require.Equal(t, itemsToFill, queue.length()) ++ ++ // Reduce limit below current size ++ // We change the mock limits return value. ++ // In real app this comes from runtime config reload. ++ limits.MaxOutstanding = newLimit ++ q.limits = limits // Update strict reference in queues struct (mocking the reload effect) ++ ++ // Now call getOrAddQueue again. ++ // This triggers the migration logic: existing queue (80 items) -> new queue (cap 50). ++ done := make(chan struct{}) ++ go func() { ++ _ = q.getOrAddQueue(userID, 1) ++ close(done) ++ }() ++ ++ select { ++ case <-done: ++ // Success: no deadlock ++ case <-time.After(2 * time.Second): ++ t.Fatal("Deadlock detected! getOrAddQueue timed out while migrating queue with reduced limits.") ++ } ++ ++ // The new queue should be capped at newLimit or contain what managed to fit. ++ // Logic: it breaks when full. So new queue should be full (length == newLimit). ++ newQueue := q.getOrAddQueue(userID, 1) // Should be fast now ++ ++ // Note: The actual items in queue should be newLimit (50). The rest (30) are dropped. ++ assert.Equal(t, newLimit, newQueue.length()) ++} ++ + func TestReservedQueriersShouldOnlyGetHighPriorityQueries(t *testing.T) { + queue := NewRequestQueue(0, + prometheus.NewGaugeVec(prometheus.GaugeOpts{}, []string{"user", "priority", "type"}), +diff --git a/pkg/scheduler/queue/user_queues.go b/pkg/scheduler/queue/user_queues.go +index c7e30d87375..eaed52ad2e4 100644 +--- a/pkg/scheduler/queue/user_queues.go ++++ b/pkg/scheduler/queue/user_queues.go +@@ -169,7 +169,7 @@ func (q *queues) getOrAddQueue(userID string, maxQueriers int) userRequestQueue + tmpQueue := q.createUserRequestQueue(userID) + + // flush to new queue +- for uq.queue.length() > 0 { ++ for (uq.queue.length() > 0) && (tmpQueue.length() < maxOutstanding) { + tmpQueue.enqueueRequest(uq.queue.dequeueRequest(0, false)) + } + diff --git a/pkg/scheduler/queue/queue_test.go b/pkg/scheduler/queue/queue_test.go index 47bd5295271..4b9561fc76d 100644 --- a/pkg/scheduler/queue/queue_test.go +++ b/pkg/scheduler/queue/queue_test.go @@ -285,6 +285,60 @@ func TestQueriersShouldGetHighPriorityQueryFirst(t *testing.T) { assert.Equal(t, highPriorityRequest, nextRequest) // high priority request returned, although it was enqueued the last } +func TestGetOrAddQueue_ShouldNotDeadlockWhenLimitsAreReduced(t *testing.T) { + // Setup: Large initial limit + initialLimit := 100 + newLimit := 50 + + limits := MockLimits{ + MaxOutstanding: initialLimit, + } + + // Initialize queues + q := newUserQueues(0, limits, nil) + + // Create user queue + userID := "test-user-deadlock" + queue := q.getOrAddQueue(userID, 1) + + // Fill queue to capacity (near initialLimit) + // We fill it more than newLimit + itemsToFill := 80 + for i := 0; i < itemsToFill; i++ { + queue.enqueueRequest(MockRequest{priority: 1}) + } + + require.Equal(t, itemsToFill, queue.length()) + + // Reduce limit below current size + // We change the mock limits return value. + // In real app this comes from runtime config reload. + limits.MaxOutstanding = newLimit + q.limits = limits // Update strict reference in queues struct (mocking the reload effect) + + // Now call getOrAddQueue again. + // This triggers the migration logic: existing queue (80 items) -> new queue (cap 50). + done := make(chan struct{}) + go func() { + _ = q.getOrAddQueue(userID, 1) + close(done) + }() + + select { + case <-done: + // Success: no deadlock + case <-time.After(2 * time.Second): + t.Fatal("Deadlock detected! getOrAddQueue timed out while migrating queue with reduced limits.") + } + + // The new queue should be capped at newLimit or contain what managed to fit. + // Logic: it breaks when full. So new queue should be full (length == newLimit). + newQueue := q.getOrAddQueue(userID, 1) // Should be fast now + + // Note: The actual items in queue should be newLimit (50). The rest (30) are dropped. + assert.Equal(t, newLimit, newQueue.length()) +} + func TestReservedQueriersShouldOnlyGetHighPriorityQueries(t *testing.T) { queue := NewRequestQueue(0, prometheus.NewGaugeVec(prometheus.GaugeOpts{}, []string{"user", "priority", "type"}), diff --git a/pkg/scheduler/queue/user_queues.go b/pkg/scheduler/queue/user_queues.go index c7e30d87375..eaed52ad2e4 100644 --- a/pkg/scheduler/queue/user_queues.go +++ b/pkg/scheduler/queue/user_queues.go @@ -169,7 +169,7 @@ func (q *queues) getOrAddQueue(userID string, maxQueriers int) userRequestQueue tmpQueue := q.createUserRequestQueue(userID) // flush to new queue - for uq.queue.length() > 0 { + for (uq.queue.length() > 0) && (tmpQueue.length() < maxOutstanding) { tmpQueue.enqueueRequest(uq.queue.dequeueRequest(0, false)) } From 1d87cd83945cb96f040c2990a989f523b139eadd Mon Sep 17 00:00:00 2001 From: Kishore K G Date: Tue, 6 Jan 2026 16:21:01 +0000 Subject: [PATCH 2/7] remove patch file Signed-off-by: Kishore K G --- patch.diff | 78 ------------------------------------------------------ 1 file changed, 78 deletions(-) delete mode 100644 patch.diff diff --git a/patch.diff b/patch.diff deleted file mode 100644 index 2bab212d7bd..00000000000 --- a/patch.diff +++ /dev/null @@ -1,78 +0,0 @@ -diff --git a/pkg/scheduler/queue/queue_test.go b/pkg/scheduler/queue/queue_test.go -index 47bd5295271..4b9561fc76d 100644 ---- a/pkg/scheduler/queue/queue_test.go -+++ b/pkg/scheduler/queue/queue_test.go -@@ -285,6 +285,60 @@ func TestQueriersShouldGetHighPriorityQueryFirst(t *testing.T) { - assert.Equal(t, highPriorityRequest, nextRequest) // high priority request returned, although it was enqueued the last - } - -+func TestGetOrAddQueue_ShouldNotDeadlockWhenLimitsAreReduced(t *testing.T) { -+ // Setup: Large initial limit -+ initialLimit := 100 -+ newLimit := 50 -+ -+ limits := MockLimits{ -+ MaxOutstanding: initialLimit, -+ } -+ -+ // Initialize queues -+ q := newUserQueues(0, limits, nil) -+ -+ // Create user queue -+ userID := "test-user-deadlock" -+ queue := q.getOrAddQueue(userID, 1) -+ -+ // Fill queue to capacity (near initialLimit) -+ // We fill it more than newLimit -+ itemsToFill := 80 -+ for i := 0; i < itemsToFill; i++ { -+ queue.enqueueRequest(MockRequest{priority: 1}) -+ } -+ -+ require.Equal(t, itemsToFill, queue.length()) -+ -+ // Reduce limit below current size -+ // We change the mock limits return value. -+ // In real app this comes from runtime config reload. -+ limits.MaxOutstanding = newLimit -+ q.limits = limits // Update strict reference in queues struct (mocking the reload effect) -+ -+ // Now call getOrAddQueue again. -+ // This triggers the migration logic: existing queue (80 items) -> new queue (cap 50). -+ done := make(chan struct{}) -+ go func() { -+ _ = q.getOrAddQueue(userID, 1) -+ close(done) -+ }() -+ -+ select { -+ case <-done: -+ // Success: no deadlock -+ case <-time.After(2 * time.Second): -+ t.Fatal("Deadlock detected! getOrAddQueue timed out while migrating queue with reduced limits.") -+ } -+ -+ // The new queue should be capped at newLimit or contain what managed to fit. -+ // Logic: it breaks when full. So new queue should be full (length == newLimit). -+ newQueue := q.getOrAddQueue(userID, 1) // Should be fast now -+ -+ // Note: The actual items in queue should be newLimit (50). The rest (30) are dropped. -+ assert.Equal(t, newLimit, newQueue.length()) -+} -+ - func TestReservedQueriersShouldOnlyGetHighPriorityQueries(t *testing.T) { - queue := NewRequestQueue(0, - prometheus.NewGaugeVec(prometheus.GaugeOpts{}, []string{"user", "priority", "type"}), -diff --git a/pkg/scheduler/queue/user_queues.go b/pkg/scheduler/queue/user_queues.go -index c7e30d87375..eaed52ad2e4 100644 ---- a/pkg/scheduler/queue/user_queues.go -+++ b/pkg/scheduler/queue/user_queues.go -@@ -169,7 +169,7 @@ func (q *queues) getOrAddQueue(userID string, maxQueriers int) userRequestQueue - tmpQueue := q.createUserRequestQueue(userID) - - // flush to new queue -- for uq.queue.length() > 0 { -+ for (uq.queue.length() > 0) && (tmpQueue.length() < maxOutstanding) { - tmpQueue.enqueueRequest(uq.queue.dequeueRequest(0, false)) - } - From 6594a58c0fb9a9e36c96309bad36e77a40432765 Mon Sep 17 00:00:00 2001 From: Kishore K G Date: Wed, 7 Jan 2026 06:09:14 +0000 Subject: [PATCH 3/7] fix lint Signed-off-by: Kishore K G --- pkg/scheduler/queue/queue_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/scheduler/queue/queue_test.go b/pkg/scheduler/queue/queue_test.go index 4b9561fc76d..665ead376e5 100644 --- a/pkg/scheduler/queue/queue_test.go +++ b/pkg/scheduler/queue/queue_test.go @@ -304,7 +304,7 @@ func TestGetOrAddQueue_ShouldNotDeadlockWhenLimitsAreReduced(t *testing.T) { // Fill queue to capacity (near initialLimit) // We fill it more than newLimit itemsToFill := 80 - for i := 0; i < itemsToFill; i++ { + for range itemsToFill { queue.enqueueRequest(MockRequest{priority: 1}) } From d014c5d948f6041de77400f6731eb39ab58f0454 Mon Sep 17 00:00:00 2001 From: Kishore K G Date: Thu, 8 Jan 2026 09:12:47 +0000 Subject: [PATCH 4/7] add comment and change log Signed-off-by: Kishore K G --- CHANGELOG.md | 1 + pkg/scheduler/queue/user_queues.go | 2 ++ 2 files changed, 3 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index f110486d740..10bdc2e882d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2651,3 +2651,4 @@ This release has several exciting features, the most notable of them being setti * [FEATURE] You can specify "heap ballast" to reduce Go GC Churn #1489 * [BUGFIX] HA Tracker no longer always makes a request to Consul/Etcd when a request is not from the active replica #1516 * [BUGFIX] Queries are now correctly cancelled by the query-frontend #1508 +* [BUGFIX] If max_outstanding_requests_per_tenant value is updated to lesser value than the current number of requests in the queue, the excess requests (newest ones) will be dropped to prevent deadlocks. #7188 \ No newline at end of file diff --git a/pkg/scheduler/queue/user_queues.go b/pkg/scheduler/queue/user_queues.go index eaed52ad2e4..91d3de73a8a 100644 --- a/pkg/scheduler/queue/user_queues.go +++ b/pkg/scheduler/queue/user_queues.go @@ -169,6 +169,8 @@ func (q *queues) getOrAddQueue(userID string, maxQueriers int) userRequestQueue tmpQueue := q.createUserRequestQueue(userID) // flush to new queue + // If the new limit is lower than the current number of requests, + // the excess requests (newest ones) will be dropped to prevent deadlocks. for (uq.queue.length() > 0) && (tmpQueue.length() < maxOutstanding) { tmpQueue.enqueueRequest(uq.queue.dequeueRequest(0, false)) } From 7f3f3cdb23eac913c91dde8d74607e43e8baf07b Mon Sep 17 00:00:00 2001 From: Kishore K G Date: Thu, 8 Jan 2026 10:16:13 +0000 Subject: [PATCH 5/7] fix lint Signed-off-by: Kishore K G --- pkg/scheduler/queue/user_queues.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/scheduler/queue/user_queues.go b/pkg/scheduler/queue/user_queues.go index 91d3de73a8a..b96f5ab4876 100644 --- a/pkg/scheduler/queue/user_queues.go +++ b/pkg/scheduler/queue/user_queues.go @@ -169,8 +169,8 @@ func (q *queues) getOrAddQueue(userID string, maxQueriers int) userRequestQueue tmpQueue := q.createUserRequestQueue(userID) // flush to new queue - // If the new limit is lower than the current number of requests, - // the excess requests (newest ones) will be dropped to prevent deadlocks. + // If the new limit is lower than the current number of requests, + // the excess requests (newest ones) will be dropped to prevent deadlocks. for (uq.queue.length() > 0) && (tmpQueue.length() < maxOutstanding) { tmpQueue.enqueueRequest(uq.queue.dequeueRequest(0, false)) } From 1245011f10a877136219a3ec47c0628ee4133295 Mon Sep 17 00:00:00 2001 From: Kishore K G Date: Fri, 9 Jan 2026 04:50:11 +0000 Subject: [PATCH 6/7] move changelog to master /unreleased Signed-off-by: Kishore K G --- CHANGELOG.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 10bdc2e882d..2528f95929b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -34,6 +34,7 @@ * [BUGFIX] Scheduler: Fix memory leak by properly cleaning up query fragment registry. #7148 * [BUGFIX] Compactor: Add back deletion of partition group info file even if not complete #7157 * [BUGFIX] Query Frontend: Add Native Histogram extraction logic in results cache #7167 +* [BUGFIX] Query Scheduler: If max_outstanding_requests_per_tenant value is updated to lesser value than the current number of requests in the queue, the excess requests (newest ones) will be dropped to prevent deadlocks. #7188 ## 1.20.1 2025-12-03 @@ -2650,5 +2651,4 @@ This release has several exciting features, the most notable of them being setti * `ha-tracker.cluster` is now `distributor.ha-tracker.cluster` * [FEATURE] You can specify "heap ballast" to reduce Go GC Churn #1489 * [BUGFIX] HA Tracker no longer always makes a request to Consul/Etcd when a request is not from the active replica #1516 -* [BUGFIX] Queries are now correctly cancelled by the query-frontend #1508 -* [BUGFIX] If max_outstanding_requests_per_tenant value is updated to lesser value than the current number of requests in the queue, the excess requests (newest ones) will be dropped to prevent deadlocks. #7188 \ No newline at end of file +* [BUGFIX] Queries are now correctly cancelled by the query-frontend #1508 \ No newline at end of file From 188b0e2e72ce572ece363195fdcc31dbe0a3fb33 Mon Sep 17 00:00:00 2001 From: kishorekg1999 Date: Fri, 9 Jan 2026 10:22:25 +0530 Subject: [PATCH 7/7] Update CHANGELOG.md Signed-off-by: kishorekg1999 --- CHANGELOG.md | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 46ff6d792e6..382bf3983fa 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -41,7 +41,6 @@ * [BUGFIX] Alertmanager: Fix alertmanager reloading bug that removes user template files #7196 * [BUGFIX] Query Scheduler: If max_outstanding_requests_per_tenant value is updated to lesser value than the current number of requests in the queue, the excess requests (newest ones) will be dropped to prevent deadlocks. #7188 - ## 1.20.1 2025-12-03 * [BUGFIX] Distributor: Fix panic on health check failure when using stream push. #7116 @@ -2657,4 +2656,4 @@ This release has several exciting features, the most notable of them being setti * `ha-tracker.cluster` is now `distributor.ha-tracker.cluster` * [FEATURE] You can specify "heap ballast" to reduce Go GC Churn #1489 * [BUGFIX] HA Tracker no longer always makes a request to Consul/Etcd when a request is not from the active replica #1516 -* [BUGFIX] Queries are now correctly cancelled by the query-frontend #1508 \ No newline at end of file +* [BUGFIX] Queries are now correctly cancelled by the query-frontend #1508