Skip to content

Commit bf56652

Browse files
committed
[BugFix] Eagerly abort final-step requests
Signed-off-by: Nick Hill <[email protected]>
1 parent d1f7392 commit bf56652

File tree

1 file changed

+21
-3
lines changed

1 file changed

+21
-3
lines changed

vllm/v1/engine/core.py

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -204,6 +204,8 @@ def __init__(
204204
)
205205
self.async_scheduling = vllm_config.scheduler_config.async_scheduling
206206

207+
self.aborts_queue = queue.Queue[list[str]]()
208+
207209
# Mark the startup heap as static so that it's ignored by GC.
208210
# Reduces pause times of oldest generation collections.
209211
freeze_gc_heap()
@@ -347,6 +349,8 @@ def step(self) -> tuple[dict[int, EngineCoreOutputs], bool]:
347349
if model_output is None:
348350
model_output = self.model_executor.sample_tokens(grammar_output)
349351

352+
# Ensure we handle aborts which happened during the model execution.
353+
self._process_aborts_queue()
350354
engine_core_outputs = self.scheduler.update_from_output(
351355
scheduler_output, model_output
352356
)
@@ -440,6 +444,8 @@ def step_with_batch_queue(
440444
with self.log_error_detail(scheduler_output):
441445
model_output = future.result()
442446

447+
# Ensure we handle aborts which happened during the model execution.
448+
self._process_aborts_queue()
443449
engine_core_outputs = self.scheduler.update_from_output(
444450
scheduler_output, model_output
445451
)
@@ -458,6 +464,10 @@ def step_with_batch_queue(
458464

459465
return engine_core_outputs, model_executed
460466

467+
def _process_aborts_queue(self):
468+
while not self.aborts_queue.empty():
469+
self.abort_requests(self.aborts_queue.get_nowait())
470+
461471
def shutdown(self):
462472
self.structured_output_manager.clear_backend()
463473
if self.model_executor:
@@ -871,9 +881,13 @@ def _process_input_queue(self):
871881
and not self.scheduler.has_requests()
872882
and not self.batch_queue
873883
):
874-
if logger.isEnabledFor(DEBUG) and self.input_queue.empty():
875-
logger.debug("EngineCore waiting for work.")
876-
waited = True
884+
if self.input_queue.empty():
885+
# Drain aborts queue; all aborts are also processed via input_queue.
886+
with self.aborts_queue.mutex:
887+
self.aborts_queue.queue.clear()
888+
if logger.isEnabledFor(DEBUG):
889+
logger.debug("EngineCore waiting for work.")
890+
waited = True
877891
req = self.input_queue.get()
878892
self._handle_client_request(*req)
879893

@@ -1027,6 +1041,10 @@ def process_input_sockets(
10271041
else:
10281042
request = generic_decoder.decode(data_frames)
10291043

1044+
if request_type == EngineCoreRequestType.ABORT:
1045+
# Aborts are added to *both* queues.
1046+
self.aborts_queue.put_nowait(request)
1047+
10301048
# Push to input queue for core busy loop.
10311049
self.input_queue.put_nowait((request_type, request))
10321050

0 commit comments

Comments
 (0)