Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion custom_ops/gpu_ops/swap_cache_batch.cu
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,6 @@ void SwapCacheAllLayers(
const std::vector<int64_t>& swap_block_ids_cpu,
int rank,
int mode) {
checkCudaErrors(cudaSetDevice(rank)); // used for distributed launch
assert(cache_gpu_tensors.size() > 0 &&
cache_gpu_tensors.size() == cache_cpu_ptrs.size());
switch (cache_gpu_tensors[0].dtype()) {
Expand Down
400 changes: 400 additions & 0 deletions custom_ops/gpu_ops/swap_cache_optimized.cu

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions custom_ops/setup_ops.py
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,7 @@ def find_end_files(directory, end_str):
"gpu_ops/swap_cache_batch.cu",
"gpu_ops/swap_cache.cu",
"gpu_ops/swap_cache_layout.cu",
"gpu_ops/swap_cache_optimized.cu", # 新增:优化的 KV cache 换入算子
"gpu_ops/step_system_cache.cu",
"gpu_ops/cpp_extensions.cc",
"gpu_ops/share_external_data.cu",
Expand Down
22 changes: 22 additions & 0 deletions fastdeploy/cache_manager/ops.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,12 @@

try:
if current_platform.is_cuda():
from fastdeploy.model_executor.ops.gpu import (
swap_cache_per_layer, # 单层 KV cache 换入算子(同步)
)
from fastdeploy.model_executor.ops.gpu import (
swap_cache_per_layer_async, # 单层 KV cache 换入算子(异步,无强制 sync)
)
from fastdeploy.model_executor.ops.gpu import (
cuda_host_alloc,
cuda_host_free,
Expand All @@ -43,6 +49,12 @@ def get_peer_mem_addr(*args, **kwargs):
raise RuntimeError("CUDA no need of get_peer_mem_addr!")

elif current_platform.is_maca():
from fastdeploy.model_executor.ops.gpu import (
swap_cache_per_layer, # 单层 KV cache 换入算子(同步)
)
from fastdeploy.model_executor.ops.gpu import (
swap_cache_per_layer_async, # 单层 KV cache 换入算子(异步,无强制 sync)
)
from fastdeploy.model_executor.ops.gpu import ( # get_output_kv_signal,; ipc_sent_key_value_cache_by_remote_ptr_block_sync,
cuda_host_alloc,
cuda_host_free,
Expand Down Expand Up @@ -89,6 +101,12 @@ def ipc_sent_key_value_cache_by_remote_ptr(*args, **kwargs):
def ipc_sent_key_value_cache_by_remote_ptr_block_sync(*args, **kwargs):
raise RuntimeError("XPU No ipc_sent_key_value_cache_by_remote_ptr UNIMPLENENTED")

def swap_cache_per_layer(*args, **kwargs): # 单层 KV cache 换入算子(同步)
raise RuntimeError("XPU swap_cache_per_layer UNIMPLENENTED")

def swap_cache_per_layer_async(*args, **kwargs): # 单层 KV cache 换入算子(异步)
raise RuntimeError("XPU swap_cache_per_layer_async UNIMPLENENTED")

else:
raise RuntimeError("Prefix cache ops only supported CUDA nor XPU platform ")

Expand Down Expand Up @@ -128,6 +146,8 @@ def get_all_visible_devices():
set_data_ipc = None
share_external_data_ = None
swap_cache_all_layers = None
swap_cache_per_layer = None # 单层 KV cache 换入算子(同步)
swap_cache_per_layer_async = None # 单层 KV cache 换入算子(异步)
unset_data_ipc = None
set_device = None
memory_allocated = None
Expand All @@ -146,6 +166,8 @@ def get_all_visible_devices():
"set_data_ipc",
"share_external_data_",
"swap_cache_all_layers",
"swap_cache_per_layer", # 单层 KV cache 换入算子(同步)
"swap_cache_per_layer_async", # 单层 KV cache 换入算子(异步,无强制 sync)
"unset_data_ipc", # XPU是 None
"set_device",
"memory_allocated",
Expand Down
72 changes: 72 additions & 0 deletions fastdeploy/cache_manager/v1/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
"""
Cache Manager V1 - Multi-level KV Cache Management System

This module provides a three-level cache hierarchy:
- Device (GPU) → Host (CPU) → Storage

Key components:
- KVCacheBase: Abstract base class defining common interface
- CacheManager: Scheduler-side cache management with block pools
- CacheController: Worker-side cache control for transfer operations
- CacheTransferManager: Manages cache transfer operations
- LayerDoneCounter: Tracks layer-by-layer transfer completion
- create_storage_scheduler: Factory function to create StorageScheduler
- create_storage_connector: Factory function to create StorageConnector
- create_transfer_connector: Factory function to create TransferConnector
"""

from .base import KVCacheBase
from .cache_controller import CacheController
from .cache_manager import CacheManager
from .cache_utils import LayerDoneCounter, LayerSwapTimeoutError
from .metadata import (
AsyncTaskHandler,
BlockNode,
CacheBlockMetadata,
CacheStatus,
MatchResult,
PDTransferMetadata,
StorageConfig,
StorageMetadata,
StorageType,
TransferConfig,
TransferResult,
TransferStatus,
TransferTask,
TransferType,
)
from .storage import create_storage_connector, create_storage_scheduler
from .transfer import create_transfer_connector
from .transfer_manager import CacheTransferManager

__all__ = [
# Base classes
"KVCacheBase",
# Managers
"CacheManager",
"CacheController",
"CacheTransferManager",
# Exceptions
"LayerSwapTimeoutError",
# Utils
"LayerDoneCounter",
# Metadata
"CacheBlockMetadata",
"BlockNode",
"CacheStatus",
"TransferTask",
"TransferStatus",
"TransferConfig",
"TransferResult",
"AsyncTaskHandler",
"MatchResult",
"StorageMetadata",
"PDTransferMetadata",
"StorageConfig",
"StorageType",
"TransferType",
# Factory functions
"create_storage_scheduler",
"create_storage_connector",
"create_transfer_connector",
]
71 changes: 71 additions & 0 deletions fastdeploy/cache_manager/v1/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
"""
KVCacheBase - Abstract base class for KV cache management

Defines the common interface that both CacheManager (Scheduler) and
CacheController (Worker) must implement.
"""

from abc import ABC, abstractmethod
from typing import TYPE_CHECKING

if TYPE_CHECKING:
from fastdeploy.config import FDConfig


class KVCacheBase(ABC):
"""
Abstract base class for KV cache management.

This class defines the common interface for cache management operations.
Subclasses (CacheManager and CacheController) implement specific behaviors
based on their roles in the system.

CacheManager (Scheduler process):
- Manages DeviceBlockPool and HostBlockPool
- Handles block allocation and release
- Coordinates storage operations via StorageScheduler

CacheController (Worker process):
- Manages cache transfer operations
- Handles layer-by-layer transfer synchronization
- Coordinates cross-node transfer via TransferConnector
"""

def __init__(self, config: "FDConfig"):
"""
Initialize the KV cache base.

Args:
config: FDConfig instance containing all fastdeploy configuration
"""
self.config = config

# Extract configuration from FDConfig
self.model_config = config.model_config
self.cache_config = config.cache_config
self.quant_config = config.quant_config
self.parallel_config = config.parallel_config

self._initialized = False

@abstractmethod
def reset_cache(self) -> bool:
"""
Reset the cache state.

This method should be implemented by subclasses to reset their
specific cache state (e.g., clear block pools, reset transfer state).

Returns:
True if reset was successful, False otherwise
"""
pass

def is_initialized(self) -> bool:
"""
Check if the cache has been initialized.

Returns:
True if initialized, False otherwise
"""
return self._initialized
Loading
Loading