Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
* [BUGFIX] Compactor: Add back deletion of partition group info file even if not complete #7157
* [BUGFIX] Query Frontend: Add Native Histogram extraction logic in results cache #7167
* [BUGFIX] Alertmanager: Fix alertmanager reloading bug that removes user template files #7196
* [BUGFIX] Query Scheduler: If max_outstanding_requests_per_tenant value is updated to lesser value than the current number of requests in the queue, the excess requests (newest ones) will be dropped to prevent deadlocks. #7188

## 1.20.1 2025-12-03

Expand Down
54 changes: 54 additions & 0 deletions pkg/scheduler/queue/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,60 @@ func TestQueriersShouldGetHighPriorityQueryFirst(t *testing.T) {
assert.Equal(t, highPriorityRequest, nextRequest) // high priority request returned, although it was enqueued the last
}

func TestGetOrAddQueue_ShouldNotDeadlockWhenLimitsAreReduced(t *testing.T) {
// Setup: Large initial limit
initialLimit := 100
newLimit := 50

limits := MockLimits{
MaxOutstanding: initialLimit,
}

// Initialize queues
q := newUserQueues(0, limits, nil)

// Create user queue
userID := "test-user-deadlock"
queue := q.getOrAddQueue(userID, 1)

// Fill queue to capacity (near initialLimit)
// We fill it more than newLimit
itemsToFill := 80
for range itemsToFill {
queue.enqueueRequest(MockRequest{priority: 1})
}

require.Equal(t, itemsToFill, queue.length())

// Reduce limit below current size
// We change the mock limits return value.
// In real app this comes from runtime config reload.
limits.MaxOutstanding = newLimit
q.limits = limits // Update strict reference in queues struct (mocking the reload effect)

// Now call getOrAddQueue again.
// This triggers the migration logic: existing queue (80 items) -> new queue (cap 50).
done := make(chan struct{})
go func() {
_ = q.getOrAddQueue(userID, 1)
close(done)
}()

select {
case <-done:
// Success: no deadlock
case <-time.After(2 * time.Second):
t.Fatal("Deadlock detected! getOrAddQueue timed out while migrating queue with reduced limits.")
}

// The new queue should be capped at newLimit or contain what managed to fit.
// Logic: it breaks when full. So new queue should be full (length == newLimit).
newQueue := q.getOrAddQueue(userID, 1) // Should be fast now

// Note: The actual items in queue should be newLimit (50). The rest (30) are dropped.
assert.Equal(t, newLimit, newQueue.length())
}

func TestReservedQueriersShouldOnlyGetHighPriorityQueries(t *testing.T) {
queue := NewRequestQueue(0,
prometheus.NewGaugeVec(prometheus.GaugeOpts{}, []string{"user", "priority", "type"}),
Expand Down
4 changes: 3 additions & 1 deletion pkg/scheduler/queue/user_queues.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,9 @@ func (q *queues) getOrAddQueue(userID string, maxQueriers int) userRequestQueue
tmpQueue := q.createUserRequestQueue(userID)

// flush to new queue
for uq.queue.length() > 0 {
// If the new limit is lower than the current number of requests,
// the excess requests (newest ones) will be dropped to prevent deadlocks.
for (uq.queue.length() > 0) && (tmpQueue.length() < maxOutstanding) {
tmpQueue.enqueueRequest(uq.queue.dequeueRequest(0, false))
}

Expand Down
Loading