diff --git a/CHANGELOG.md b/CHANGELOG.md
index 7a1aeef8f7e..382bf3983fa 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -39,6 +39,7 @@
 * [BUGFIX] Compactor: Add back deletion of partition group info file even if not complete #7157
 * [BUGFIX] Query Frontend: Add Native Histogram extraction logic in results cache #7167
 * [BUGFIX] Alertmanager: Fix alertmanager reloading bug that removes user template files #7196
+* [BUGFIX] Query Scheduler: If max_outstanding_requests_per_tenant is updated to a value lower than the current number of requests in the queue, the excess requests (newest ones) will be dropped to prevent deadlocks. #7188
 
 ## 1.20.1 2025-12-03
 
diff --git a/pkg/scheduler/queue/queue_test.go b/pkg/scheduler/queue/queue_test.go
index 464c0e3e238..de98453978f 100644
--- a/pkg/scheduler/queue/queue_test.go
+++ b/pkg/scheduler/queue/queue_test.go
@@ -283,6 +283,60 @@ func TestQueriersShouldGetHighPriorityQueryFirst(t *testing.T) {
 	assert.Equal(t, highPriorityRequest, nextRequest) // high priority request returned, although it was enqueued the last
 }
 
+func TestGetOrAddQueue_ShouldNotDeadlockWhenLimitsAreReduced(t *testing.T) {
+	// Setup: large initial limit.
+	initialLimit := 100
+	newLimit := 50
+
+	limits := MockLimits{
+		MaxOutstanding: initialLimit,
+	}
+
+	// Initialize the queues.
+	q := newUserQueues(0, limits, nil)
+
+	// Create the user queue.
+	userID := "test-user-deadlock"
+	queue := q.getOrAddQueue(userID, 1)
+
+	// Fill the queue towards capacity (near initialLimit),
+	// with more items than newLimit.
+	itemsToFill := 80
+	for range itemsToFill {
+		queue.enqueueRequest(MockRequest{priority: 1})
+	}
+
+	require.Equal(t, itemsToFill, queue.length())
+
+	// Reduce the limit below the current queue size
+	// by changing the mock limits return value.
+	// In the real application this comes from a runtime config reload.
+	limits.MaxOutstanding = newLimit
+	q.limits = limits // Update the limits in the queues struct (mocking the reload effect).
+
+	// Now call getOrAddQueue again.
+	// This triggers the migration logic: existing queue (80 items) -> new queue (capacity 50).
+	done := make(chan struct{})
+	go func() {
+		_ = q.getOrAddQueue(userID, 1)
+		close(done)
+	}()
+
+	select {
+	case <-done:
+		// Success: no deadlock.
+	case <-time.After(2 * time.Second):
+		t.Fatal("Deadlock detected! getOrAddQueue timed out while migrating queue with reduced limits.")
+	}
+
+	// The new queue should be capped at newLimit, holding only what fit.
+	// The migration loop stops when the new queue is full, so its length equals newLimit.
+	newQueue := q.getOrAddQueue(userID, 1) // Should return quickly now.
+
+	// The new queue holds newLimit (50) requests; the remaining 30 are dropped.
+	assert.Equal(t, newLimit, newQueue.length())
+}
+
 func TestReservedQueriersShouldOnlyGetHighPriorityQueries(t *testing.T) {
 	queue := NewRequestQueue(0, prometheus.NewGaugeVec(prometheus.GaugeOpts{}, []string{"user", "priority", "type"}),
diff --git a/pkg/scheduler/queue/user_queues.go b/pkg/scheduler/queue/user_queues.go
index c7e30d87375..b96f5ab4876 100644
--- a/pkg/scheduler/queue/user_queues.go
+++ b/pkg/scheduler/queue/user_queues.go
@@ -169,7 +169,9 @@ func (q *queues) getOrAddQueue(userID string, maxQueriers int) userRequestQueue {
 	tmpQueue := q.createUserRequestQueue(userID)
 
 	// flush to new queue
-	for uq.queue.length() > 0 {
+	// If the new limit is lower than the current number of requests,
+	// the excess requests (newest ones) will be dropped to prevent deadlocks.
+	for (uq.queue.length() > 0) && (tmpQueue.length() < maxOutstanding) {
 		tmpQueue.enqueueRequest(uq.queue.dequeueRequest(0, false))
 	}
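For context, here is a minimal, self-contained sketch of the failure mode and of why the capped flush loop avoids it. It assumes (as an illustration, not the actual Cortex userRequestQueue implementation) that the per-tenant queue is backed by a bounded buffered channel whose enqueue blocks when full; fifoQueue and migrate are hypothetical names, not identifiers from this PR.

```go
package main

import "fmt"

// fifoQueue is a stand-in for a per-tenant request queue backed by a bounded
// buffered channel, where enqueue blocks once the channel is full. This is an
// illustrative assumption, not the actual Cortex queue code.
type fifoQueue struct {
	ch chan int
}

func newFIFOQueue(capacity int) *fifoQueue {
	return &fifoQueue{ch: make(chan int, capacity)}
}

func (q *fifoQueue) enqueue(r int) { q.ch <- r } // blocks while the channel is full
func (q *fifoQueue) dequeue() int  { return <-q.ch }
func (q *fifoQueue) length() int   { return len(q.ch) }

// migrate flushes old into a queue sized to the new limit. Stopping once the
// new queue is full mirrors the fix: excess (newest) requests are dropped
// instead of the flush loop blocking forever on a full channel.
func migrate(old *fifoQueue, newLimit int) *fifoQueue {
	tmp := newFIFOQueue(newLimit)
	for old.length() > 0 && tmp.length() < newLimit {
		tmp.enqueue(old.dequeue())
	}
	return tmp
}

func main() {
	old := newFIFOQueue(100)
	for i := 0; i < 80; i++ {
		old.enqueue(i)
	}

	// Limit reduced from 100 to 50 while 80 requests are queued: without the
	// tmp.length() < newLimit check, the 51st enqueue would block forever.
	migrated := migrate(old, 50)
	fmt.Println("migrated:", migrated.length(), "dropped:", old.length())
}
```

Under that assumption the sketch prints "migrated: 50 dropped: 30": once the new queue reaches its reduced capacity, the remaining (newest) requests are discarded rather than letting the migration block indefinitely.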