File tree Expand file tree Collapse file tree 5 files changed +21
-5
lines changed
distributed/kv_transfer/kv_connector/v1 Expand file tree Collapse file tree 5 files changed +21
-5
lines changed Original file line number Diff line number Diff line change @@ -355,3 +355,14 @@ def get_required_kvcache_layout(
355355 raise TypeError ("get_required_kvcache_layout should not be called "
356356 "on the abstract base class" )
357357 return None
358+
359+ def get_finished_count (self ) -> Optional [int ]:
360+ """
361+ Get the count of requests expected to complete send/receive operations
362+ via this connector.
363+
364+ Returns:
365+ Optional[int]: expected sending or receiving completion count, or None (the value this default implementation returns).
366+ """
367+
368+ return None
Original file line number Diff line number Diff line change 1313
1414import vllm .platforms
1515from vllm .config import VllmConfig
16+ from vllm .distributed .kv_transfer .kv_connector .utils import KVOutputAggregator
1617from vllm .logger import init_logger
1718from vllm .lora .request import LoRARequest
1819from vllm .model_executor .layers .sampler import SamplerOutput
@@ -54,6 +55,7 @@ def __init__(
5455 self ._init_executor ()
5556 self .is_sleeping = False
5657 self .sleeping_tags : set [str ] = set ()
58+ self .kv_output_aggregator = None
5759
5860 @abstractmethod
5961 def _init_executor (self ) -> None :
@@ -252,6 +254,11 @@ async def check_health_async(self) -> None:
252254 exception."""
253255 self .check_health ()
254256
257+ def init_kv_output_aggregator (self , finished_count : Optional [int ]) -> None :
258+ """Init KVOutputAggregator with finished_count, falling back to parallel_config.world_size when finished_count is None or 0."""
259+ self .kv_output_aggregator = KVOutputAggregator (
260+ finished_count or self .parallel_config .world_size )
261+
255262
256263class DistributedExecutorBase (ExecutorBase ):
257264 """Abstract superclass of distributed executor implementations."""
Original file line number Diff line number Diff line change @@ -128,6 +128,9 @@ def __init__(self,
128128 log_stats = self .log_stats ,
129129 )
130130 self .use_spec_decode = vllm_config .speculative_config is not None
131+ if self .scheduler .connector is not None : # type: ignore
132+ self .model_executor .init_kv_output_aggregator (
133+ self .scheduler .connector .get_finished_count ()) # type: ignore
131134
132135 self .mm_registry = mm_registry = MULTIMODAL_REGISTRY
133136 self .mm_receiver_cache = engine_receiver_cache_from_config (
Original file line number Diff line number Diff line change 2626 destroy_model_parallel )
2727from vllm .distributed .device_communicators .shm_broadcast import (Handle ,
2828 MessageQueue )
29- from vllm .distributed .kv_transfer .kv_connector .utils import KVOutputAggregator
3029from vllm .distributed .parallel_state import (get_dp_group , get_ep_group ,
3130 get_pp_group , get_tp_group )
3231from vllm .executor .multiproc_worker_utils import (
@@ -135,8 +134,6 @@ def _init_executor(self) -> None:
135134
136135 self .output_rank = self ._get_output_rank ()
137136 self .has_connector = self .vllm_config .kv_transfer_config is not None
138- self .kv_output_aggregator = KVOutputAggregator (
139- self .parallel_config .world_size )
140137
141138 def start_worker_monitor (self ):
142139 workers = self .workers
Original file line number Diff line number Diff line change @@ -51,8 +51,6 @@ def _init_executor(self) -> None:
5151
5252 # KV connector setup
5353 self .has_connector = self .vllm_config .kv_transfer_config is not None
54- self .kv_output_aggregator = KVOutputAggregator (
55- self .parallel_config .world_size )
5654
5755 @property
5856 def max_concurrent_batches (self ) -> int :
You can’t perform that action at this time.
0 commit comments