Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 38 additions & 1 deletion tests/v1/core/test_scheduler.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
import dataclasses
from unittest.mock import Mock
from unittest.mock import Mock, patch

import pytest
import torch
Expand Down Expand Up @@ -62,6 +62,43 @@ def test_finish_request():
assert len(scheduler.waiting) == 9 - i


def test_abort_request_with_kv_connector():
# `use_kv_connector=True` will expose a kv_connector to the scheduler, but
# we will need to mimick the delay_freed since the default kv_connector is
# too simple
scheduler = create_scheduler(use_kv_connector=True)
requests = create_requests(num_requests=10)
for request in requests:
scheduler.add_request(request)

with patch.object(
scheduler,
"_connector_finished",
side_effect=lambda req: (
req.status == RequestStatus.FINISHED_LENGTH_CAPPED,
{"fake_kv_params": False},
),
):
for i, request in enumerate(requests):
scheduler.finish_requests(
request.request_id, RequestStatus.FINISHED_LENGTH_CAPPED
)
assert request.request_id in scheduler.requests # since delayed
assert len(scheduler.waiting) == 9 - i

assert not scheduler.waiting and not scheduler.running
assert len(scheduler.requests) == 10

for i, request in enumerate(requests):
scheduler.finish_requests(
request.request_id, RequestStatus.FINISHED_ABORTED
)
assert request.request_id not in scheduler.requests # since aborted

assert not scheduler.waiting and not scheduler.running
assert not scheduler.requests

Comment on lines +65 to +100
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

This is a great test for the new force-abort logic. To align with the suggestion to handle all 'properly finished' statuses in scheduler.py, I recommend parameterizing this test to run for both RequestStatus.FINISHED_LENGTH_CAPPED and RequestStatus.FINISHED_STOPPED. This will ensure the fix is robust and covers all scenarios where a request might be subject to delayed free.

Here's an example of how you could structure the parameterized test:

@pytest.mark.parametrize(
    "finish_status",
    [RequestStatus.FINISHED_LENGTH_CAPPED, RequestStatus.FINISHED_STOPPED],
)
def test_abort_request_with_kv_connector(finish_status):
    # ... setup ...
    with patch.object(
        scheduler,
        "_connector_finished",
        side_effect=lambda req: (
            req.status == finish_status,
            {"fake_kv_params": False},
        ),
    ):
        # ... first loop ...
        scheduler.finish_requests(
            request.request_id, finish_status
        )
        # ... assertions ...

        # ... second loop ...


def test_get_num_unfinished_requests():
scheduler = create_scheduler()
requests = create_requests(num_requests=10)
Expand Down
29 changes: 27 additions & 2 deletions vllm/v1/core/sched/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -1326,11 +1326,31 @@ def finish_requests(
waiting_requests_to_remove = []
valid_requests = []

# this is only required only if we have a kv connector
should_force_abort = (
finished_status == RequestStatus.FINISHED_ABORTED
and self.get_kv_connector() is not None
)
forced_aborted_requests = []
Comment on lines +1329 to +1334
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

have you tried to set the timeout to a much lower value? if I understand correctly, this fix is freeing requests asap, which should be similar. Not sure if this is optimal in the case D is hogged as well

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another todo we have is to propagate the deadline from P to D to make smaller timeouts "safe". Otherwise D could pull invalid blocks in this case. This should be a pretty simple change


# First pass: collect requests to remove from queues
for req_id in request_ids:
request = self.requests.get(req_id)
if request is None or request.is_finished():
# Invalid request ID.
if request is None:
continue # Invalid request ID.
elif request.is_finished():
if (
should_force_abort
and request.status == RequestStatus.FINISHED_LENGTH_CAPPED
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The current logic to force an abort on a request that might be subject to delayed free only considers requests with the status FINISHED_LENGTH_CAPPED. However, a request that is FINISHED_STOPPED could also be considered 'finished properly' and be eligible for delayed free by a KV connector. To make the fix more robust and cover all such cases, it would be better to handle both FINISHED_LENGTH_CAPPED and FINISHED_STOPPED statuses. This would also require updating the new test case to cover this additional status.

Suggested change
and request.status == RequestStatus.FINISHED_LENGTH_CAPPED
and request.status in (RequestStatus.FINISHED_LENGTH_CAPPED, RequestStatus.FINISHED_STOPPED)

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

talked offline about this, not really a case for P (it outputs just one token)

):
# we need to force the status to FINISHED_ABORTED to avoid
# the request being delayed freed. The kv_connector will
# delay the free if it the status is FINISHED_LENGTH_CAPPED
logger.info(
"Request %s is finished but will get forced aborted.",
req_id,
)
forced_aborted_requests.append(request)
continue

valid_requests.append(request)
Expand All @@ -1350,6 +1370,11 @@ def finish_requests(
request.status = finished_status
self._free_request(request)

# Free the requests that are being delayed
for request in forced_aborted_requests:
request.status = finished_status
self._free_request(request)

def _free_request(self, request: Request) -> dict[str, Any] | None:
assert request.is_finished()

Expand Down
Loading