4 changes: 4 additions & 0 deletions .gitignore
@@ -109,3 +109,7 @@ profiling_results*

# Direnv
.envrc


# START Ruler Generated Files
# END Ruler Generated Files
83 changes: 65 additions & 18 deletions lib/kvbm/src/block_manager/vllm/connector/leader.rs
@@ -68,6 +68,9 @@ pub trait Leader: Send + Sync + std::fmt::Debug {
block_ids: Vec<BlockId>,
) -> anyhow::Result<bool>;

#[allow(dead_code)]
fn update_connector_output(&mut self) -> anyhow::Result<()>;

fn has_slot(&self, request_id: String) -> bool;

fn create_slot(&mut self, request: KvbmRequest, tokens: Vec<u32>) -> anyhow::Result<()>;
@@ -471,17 +474,42 @@ impl Leader for KvConnectorLeader {
}

for unscheduled_req in inflight_requests.iter() {
tracing::debug!("evaluating state of unscheduled request: {unscheduled_req}");
let shared_slot = self.slot_manager().get_slot(unscheduled_req)?;
let mut slot_guard = shared_slot
.lock()
.map_err(|e| anyhow::anyhow!("Failed to lock slot: {}", e))?;

let slot = slot_guard
.as_any_mut()
.downcast_mut::<VllmConnectorSlot>()
.ok_or_else(|| anyhow::anyhow!("Expected VllmConnectorSlot, got different type"))?;

slot.mark_as_skipped()?;
let state = slot_guard.state();
if state == SlotState::Finishing {
// re-evaluate the slot state, try to mark it as finished
tracing::debug!(
"slot {} was marked as finishing; checking if it can be marked as finished",
unscheduled_req
);
slot_guard.mark_as_finished(self.iteration_counter)?;

if let SlotState::Finished = slot_guard.state() {
// slot was marked as finished, so we can remove it
tracing::debug!("slot was marked as finished, so we can remove it");
self.slot_manager().remove_slot(unscheduled_req)?;
self.inflight_requests.remove(unscheduled_req);
} else {
tracing::debug!("slot was not marked as finished, so we cannot remove it");
}
} else {
// this deals with possible eviction scenarios or the slot being in a "paused" state.
// we will want to more clearly evaluate how/why this can happen and have an improved
// state management system in v2.
let slot = slot_guard
.as_any_mut()
.downcast_mut::<VllmConnectorSlot>()
.ok_or_else(|| {
anyhow::anyhow!("Expected VllmConnectorSlot, got different type")
})?;

slot.mark_as_skipped()?;
}
}

tracing::debug!("metadata: {md:#?}");
@@ -507,35 +535,54 @@
// grab the slot
let shared_slot = self.slot_manager().get_slot(&request_id)?;

// mark the slot as finished
// Acquire lock BEFORE marking as finished
// This ensures we check state and prevent new operations from being created
let mut slot = shared_slot
.lock()
.map_err(|e| anyhow::anyhow!("Failed to lock slot: {}", e))?;
slot.mark_as_finished(self.iteration_counter)?;

// todo: allow the request to resolve when it should exit
// the request may have some outstanding operations
// we would like to inform it to shut down, then have it signal to the worker that it is officially gone,
// then we can remove the slot and trigger the worker to clean up as well.

// remove the request from the inflight requests
self.inflight_requests.remove(&request_id);

// remove it from the manager as we will never use it again
self.slot_manager().remove_slot(&request_id)?;
// Mark the slot as finished (sets state to Finishing if there are operations,
// or Finished if all operations are complete)
slot.mark_as_finished(self.iteration_counter)?;

// if the slot has finished, we can return false to vllm, indicating all gpu blocks are free to be reused
// otherwise, we return true, which means there are still outstanding operations on gpu blocks which
// must be awaited before the gpu blocks can be reused. if we return true, then it is the worker side
// of the connector api which will be used to inform vllm that the request is finished.
if let SlotState::Finished = slot.state() {
// All operations complete - safe to remove slot and tell vLLM blocks are free
self.slot_manager().remove_slot(&request_id)?;
self.inflight_requests.remove(&request_id);
Ok(false)
} else {
debug_assert!(matches!(slot.state(), SlotState::Finishing));
// The slot still has pending operations - keep slot alive for worker to process
// Don't remove slot here. Worker needs it to process the finish event.
// Worker will remove it after verifying all operations are complete.
// The lock on the slot prevents new operations from being created in offload_blocks()
//
// We still need to clean this up on the leader side, but in v0.10 we don't have a strong
// signal for when the worker is finished. However, with the new v0.11 method,
// update_connector_output, we can get a strong signal for when the worker is finished.
Ok(true)
}
}

// v0.11 of vllm adds this method; we have not yet implemented it, nor do we have the signature
// of this function correctly defined yet.
//
// initially, like all new methods on the connector, the policy on when this is called is not well
// defined, so we will first need to evaluate how and when the scheduler calls this function.
//
// in theory, this method is the correct place to re-evaluate the ::Finishing -> ::Finished transition
// for requests that could not be immediately marked as finished due to pending operations.
//
// once we have a clear picture, we should be able to migrate the logic in the `unscheduled_req` request
// loop to this method for slots in the ::Finishing state.
fn update_connector_output(&mut self) -> anyhow::Result<()> {
Ok(())
}

fn has_slot(&self, request_id: String) -> bool {
self.slot_manager().has_slot(&request_id)
}
@@ -320,6 +320,10 @@ impl Leader for KvConnectorLeaderRecorder {
Ok(output)
}

fn update_connector_output(&mut self) -> anyhow::Result<()> {
self.connector_leader.update_connector_output()
}

fn has_slot(&self, request_id: String) -> bool {
let input_copy = HasSlotInput {
request_id: request_id.clone(),
75 changes: 65 additions & 10 deletions lib/kvbm/src/block_manager/vllm/connector/leader/slot.rs
@@ -712,14 +712,35 @@ impl Slot for VllmConnectorSlot {
}

fn mark_as_finished(&mut self, _iteration: u64) -> Result<(), SlotError> {
self.state = SlotState::Finishing;
tracing::info!(
request_id = %self.request_id,
"request set to finish: cached_gpu_tokens: {}; cached_host_tokens: {}; cached_disk_tokens: {}",
self.tokens_cached_from_device,
self.tokens_cached_from_host,
self.tokens_cached_from_disk
);
// Check if there are any pending operations
let has_pending_ops = self
.pending_operations
.as_ref()
.map(|ops| !ops.is_empty())
.unwrap_or(false);

if has_pending_ops {
// There are pending operations - need to wait for them to complete
self.state = SlotState::Finishing;
tracing::debug!(
request_id = %self.request_id,
pending_operations = self.pending_operations.as_ref().unwrap().len(),
"request set to finish (with pending operations): cached_gpu_tokens: {}; cached_host_tokens: {}; cached_disk_tokens: {}",
self.tokens_cached_from_device,
self.tokens_cached_from_host,
self.tokens_cached_from_disk
);
} else {
// No pending operations - can immediately mark as finished
self.state = SlotState::Finished;
tracing::debug!(
request_id = %self.request_id,
"request set to finished (no pending operations): cached_gpu_tokens: {}; cached_host_tokens: {}; cached_disk_tokens: {}",
self.tokens_cached_from_device,
self.tokens_cached_from_host,
self.tokens_cached_from_disk
);
}
Ok(())
}
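// For orientation (a summary added here, not new behavior): mark_as_finished() leaves the
// slot in ::Finishing when pending_operations is non-empty and moves it straight to
// ::Finished otherwise. A slot parked in ::Finishing is re-checked by the leader (today in
// the unscheduled-request loop, eventually in update_connector_output) and is only removed
// once a later mark_as_finished() call observes no pending operations.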

@@ -989,6 +1010,12 @@ impl VllmConnectorSlot {
block_ids: &[BlockId],
token_blocks: &[TokenBlock],
) -> Result<(), SlotError> {
// Check if the slot is already in the Finishing or Finished state before creating operations.
// If the request is wrapping up, don't create new operations.
if matches!(self.state, SlotState::Finishing | SlotState::Finished) {
return Ok(());
}

assert!(block_ids.len() == token_blocks.len());
let operation_id = uuid::Uuid::new_v4();

@@ -1173,8 +1200,8 @@ impl LocalTransferEngine {
task_token: CancellationToken,
kvbm_metrics: KvbmMetrics,
) -> anyhow::Result<()> {
let (onboard_tx, mut onboard_rx) = mpsc::unbounded_channel();
let (offload_tx, mut offload_rx) = mpsc::unbounded_channel();
let (onboard_tx, mut onboard_rx) = mpsc::unbounded_channel::<LocalOnboardRequest>();
let (offload_tx, mut offload_rx) = mpsc::unbounded_channel::<LocalOffloadRequest>();

// Clone resources needed for tasks
let block_manager_offload = self.block_manager.clone();
@@ -1212,6 +1239,10 @@
tracing::debug!("LocalOffloadTask: received cancellation signal");
break;
}

let request_id = req.request_id.clone();
let operation_id = req.operation_id;

if let Err(e) = process_offload_request(
req,
&block_manager_offload,
@@ -1221,6 +1252,30 @@
.await
{
tracing::error!("LocalOffloadTask: error processing request: {:?}", e);

// Create a fake/immediate transfer request that completes instantly.
// Otherwise, the worker side might get stuck and cause a memory leak.
let fake_xfer = BlockTransferRequest {
from_pool: BlockTransferPool::Device, // Use valid Device->Host transfer type
to_pool: BlockTransferPool::Host, // (offload path, but no blocks)
blocks: vec![], // Empty - nothing to transfer
connector_req: Some(LeaderTransferRequest {
request_id: request_id.clone(),
uuid: operation_id,
requirement: None,
request_type: RequestType::Immediate, // Immediate = completes instantly
}),
};

match leader_offload.transfer_blocks_request(fake_xfer).await {
Ok(notify_receiver) => {
// Wait for the fake transfer to "complete" (should be instant)
let _ = notify_receiver.await;
}
Err(_xfer_err) => {
// Failed to create completion notification - error already logged above
}
}
}
}
Ok(())