From 5dd5b1181fb1e78abe9ca64af40b0b02eea9c72b Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Sat, 30 May 2026 03:24:00 +0000 Subject: [PATCH 1/3] Initial plan From 84c4d705db757526810c48e925a30200f65c5729 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Sat, 30 May 2026 03:27:43 +0000 Subject: [PATCH 2/3] Tighten multiprocess adapter type annotations --- .../v1/multiprocess/adapter_connector/base.py | 35 ++++++++++++------- .../multiprocess/adapter_connector/pickle.py | 15 ++++---- .../v1/multiprocess/adapter_connector/shm.py | 14 +++++--- .../test_non_cuda_data_transfer.py | 9 ++--- 4 files changed, 45 insertions(+), 28 deletions(-) diff --git a/lmcache/v1/multiprocess/adapter_connector/base.py b/lmcache/v1/multiprocess/adapter_connector/base.py index 531ec968104..701db9831ad 100644 --- a/lmcache/v1/multiprocess/adapter_connector/base.py +++ b/lmcache/v1/multiprocess/adapter_connector/base.py @@ -15,7 +15,7 @@ # Standard from abc import ABC, abstractmethod from dataclasses import dataclass -from typing import Any, cast +from typing import TYPE_CHECKING, cast # Third Party import torch @@ -24,6 +24,13 @@ from lmcache.logging import init_logger from lmcache.utils import EngineType from lmcache.v1.distributed.api import MemoryLayoutDesc +from lmcache.v1.gpu_connector.utils import LayoutHints +from lmcache.v1.multiprocess.custom_types import IPCCacheEngineKey +from lmcache.v1.multiprocess.mq import MessageQueueClient + +if TYPE_CHECKING: + # First Party + import lmcache.c_ops as lmc_ops logger = init_logger(__name__) @@ -59,7 +66,7 @@ class NonGpuContext(ABC): def __init__( self, metadata: NonGpuContextMetadata, - mq_client: Any, + mq_client: MessageQueueClient, mq_timeout: float, ) -> None: self.metadata = metadata @@ -73,7 +80,7 @@ def layout_desc(self) -> MemoryLayoutDesc: @abstractmethod def prepare_store( - self, key: Any, instance_id: int + self, key: IPCCacheEngineKey, instance_id: int ) -> tuple[list[torch.Tensor], list[int]] | None: """Prepare SHM buffers for a store operation. @@ -95,18 +102,20 @@ def prepare_store( @abstractmethod def commit_store( - self, key: Any, instance_id: int, chunks: list[torch.Tensor] + self, key: IPCCacheEngineKey, instance_id: int, chunks: list[torch.Tensor] ) -> bool: """Commit store. Pickle: serialize and send. Shm: notify server.""" ... @abstractmethod - def prepare_retrieve(self, key: Any, instance_id: int) -> list[torch.Tensor] | None: + def prepare_retrieve( + self, key: IPCCacheEngineKey, instance_id: int + ) -> list[torch.Tensor] | None: """Prepare retrieve. Returns chunks or shm views, or None on miss.""" ... @abstractmethod - def commit_retrieve(self, key: Any, instance_id: int) -> bool: + def commit_retrieve(self, key: IPCCacheEngineKey, instance_id: int) -> bool: """Commit retrieve. Pickle: no-op. Shm: release read locks.""" ... @@ -118,7 +127,7 @@ def close(self) -> None: def create_non_gpu_context( metadata: NonGpuContextMetadata, - mq_client: Any, + mq_client: MessageQueueClient, mq_timeout: float, shm_name: str = "", pool_size: int = 0, @@ -175,8 +184,8 @@ def create_non_gpu_context( def compute_kv_layout( kv_caches: dict[str, torch.Tensor], - layout_hints: Any | None = None, -) -> tuple[int, int, int, str, Any]: + layout_hints: LayoutHints | None = None, +) -> tuple[int, int, int, str, "lmc_ops.GPUKVFormat"]: """Compute KV layout metadata from KV tensors. Args: @@ -216,8 +225,8 @@ def gather_paged_kv_to_cpu( kv_caches: dict[str, torch.Tensor], block_ids: list[int], blocks_per_chunk: int, - layout_hints: Any | None = None, - gpu_kv_format: Any | None = None, + layout_hints: LayoutHints | None = None, + gpu_kv_format: "lmc_ops.GPUKVFormat" | None = None, out: list[torch.Tensor] | None = None, chunk_indices: list[int] | None = None, ) -> list[torch.Tensor]: @@ -360,8 +369,8 @@ def scatter_cpu_to_paged_kv( chunks: list[torch.Tensor], blocks_per_chunk: int, skip_first_n_tokens: int = 0, - layout_hints: Any | None = None, - gpu_kv_format: Any | None = None, + layout_hints: LayoutHints | None = None, + gpu_kv_format: "lmc_ops.GPUKVFormat" | None = None, ) -> None: """Scatter CPU chunk tensors back into paged KV tensors. diff --git a/lmcache/v1/multiprocess/adapter_connector/pickle.py b/lmcache/v1/multiprocess/adapter_connector/pickle.py index a8843b0a109..e419307c93b 100644 --- a/lmcache/v1/multiprocess/adapter_connector/pickle.py +++ b/lmcache/v1/multiprocess/adapter_connector/pickle.py @@ -2,7 +2,6 @@ """Pickle-based NonGpuContext implementation for multiprocess mode.""" # Standard -from typing import Any import pickle # Third Party @@ -13,6 +12,8 @@ NonGpuContext, NonGpuContextMetadata, ) +from lmcache.v1.multiprocess.custom_types import IPCCacheEngineKey +from lmcache.v1.multiprocess.mq import MessageQueueClient from lmcache.v1.multiprocess.protocol import RequestType, get_response_class @@ -31,13 +32,13 @@ class NonGpuContextPickle(NonGpuContext): def __init__( self, metadata: NonGpuContextMetadata, - mq_client: Any, + mq_client: MessageQueueClient, mq_timeout: float, ) -> None: super().__init__(metadata, mq_client, mq_timeout) def prepare_store( - self, key: Any, instance_id: int + self, key: IPCCacheEngineKey, instance_id: int ) -> tuple[list[torch.Tensor], list[int]] | None: """Send PREPARE_STORE RPC. For pickle, returns no pre-allocated buffers.""" future = self.mq_client.submit_request( @@ -52,7 +53,7 @@ def prepare_store( return None def commit_store( - self, key: Any, instance_id: int, chunks: list[torch.Tensor] + self, key: IPCCacheEngineKey, instance_id: int, chunks: list[torch.Tensor] ) -> bool: """Serialize chunks and send via COMMIT_STORE. @@ -70,7 +71,9 @@ def commit_store( except TimeoutError: return False - def prepare_retrieve(self, key: Any, instance_id: int) -> list[torch.Tensor] | None: + def prepare_retrieve( + self, key: IPCCacheEngineKey, instance_id: int + ) -> list[torch.Tensor] | None: """Send PREPARE_RETRIEVE and deserialize the response data. Returns: @@ -90,7 +93,7 @@ def prepare_retrieve(self, key: Any, instance_id: int) -> list[torch.Tensor] | N chunks: list[torch.Tensor] = pickle.loads(response.data) return chunks - def commit_retrieve(self, key: Any, instance_id: int) -> bool: + def commit_retrieve(self, key: IPCCacheEngineKey, instance_id: int) -> bool: """Send COMMIT_RETRIEVE (no-op for pickle path).""" future = self.mq_client.submit_request( RequestType.COMMIT_RETRIEVE, diff --git a/lmcache/v1/multiprocess/adapter_connector/shm.py b/lmcache/v1/multiprocess/adapter_connector/shm.py index 18528e0901d..dc1e08b4fec 100644 --- a/lmcache/v1/multiprocess/adapter_connector/shm.py +++ b/lmcache/v1/multiprocess/adapter_connector/shm.py @@ -15,6 +15,8 @@ NonGpuContext, NonGpuContextMetadata, ) +from lmcache.v1.multiprocess.custom_types import IPCCacheEngineKey +from lmcache.v1.multiprocess.mq import MessageQueueClient from lmcache.v1.multiprocess.protocol import RequestType, get_response_class @@ -77,7 +79,7 @@ class NonGpuContextShm(NonGpuContext): def __init__( self, metadata: NonGpuContextMetadata, - mq_client: Any, + mq_client: MessageQueueClient, mq_timeout: float, shm_name: str, pool_size: int, @@ -141,7 +143,7 @@ def _build_slot_tensors(self, slots: list[dict[str, Any]]) -> list[torch.Tensor] ] def prepare_store( - self, key: Any, instance_id: int + self, key: IPCCacheEngineKey, instance_id: int ) -> tuple[list[torch.Tensor], list[int]] | None: future = self.mq_client.submit_request( RequestType.PREPARE_STORE, @@ -166,7 +168,7 @@ def prepare_store( return self._build_slot_tensors(slots), chunk_indices def commit_store( - self, key: Any, instance_id: int, _chunks: list[torch.Tensor] + self, key: IPCCacheEngineKey, instance_id: int, _chunks: list[torch.Tensor] ) -> bool: future = self.mq_client.submit_request( RequestType.COMMIT_STORE, @@ -178,7 +180,9 @@ def commit_store( except TimeoutError: return False - def prepare_retrieve(self, key: Any, instance_id: int) -> list[torch.Tensor] | None: + def prepare_retrieve( + self, key: IPCCacheEngineKey, instance_id: int + ) -> list[torch.Tensor] | None: future = self.mq_client.submit_request( RequestType.PREPARE_RETRIEVE, [key, instance_id], @@ -193,7 +197,7 @@ def prepare_retrieve(self, key: Any, instance_id: int) -> list[torch.Tensor] | N slots = response.context.get("slots", []) return self._build_slot_tensors(slots) if slots else None - def commit_retrieve(self, key: Any, instance_id: int) -> bool: + def commit_retrieve(self, key: IPCCacheEngineKey, instance_id: int) -> bool: future = self.mq_client.submit_request( RequestType.COMMIT_RETRIEVE, [key, instance_id], diff --git a/tests/v1/multiprocess/test_non_cuda_data_transfer.py b/tests/v1/multiprocess/test_non_cuda_data_transfer.py index f619a2cd2d7..d885a52d7b5 100644 --- a/tests/v1/multiprocess/test_non_cuda_data_transfer.py +++ b/tests/v1/multiprocess/test_non_cuda_data_transfer.py @@ -982,21 +982,22 @@ def _submit_request(req_type, payload, response_cls): # noqa: ARG001 pool_size=4096, ) try: - store_result = context.prepare_store(key="k", instance_id=1) + key = _default_key() + store_result = context.prepare_store(key=key, instance_id=1) assert store_result is not None store_views, _ = store_result store_views[0].copy_( torch.tensor([[1.0, 2.0], [3.0, 4.0]], dtype=torch.float32) ) - assert context.commit_store("k", 1, store_views) + assert context.commit_store(key, 1, store_views) - retrieve_views = context.prepare_retrieve(key="k", instance_id=1) + retrieve_views = context.prepare_retrieve(key=key, instance_id=1) assert retrieve_views is not None assert torch.equal( retrieve_views[0], torch.tensor([[1.0, 2.0], [3.0, 4.0]], dtype=torch.float32), ) - assert context.commit_retrieve("k", 1) + assert context.commit_retrieve(key, 1) finally: context.close() if os.path.exists(shm_path): From 990884488db536fd966360fb669814fc60049f2d Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Sat, 30 May 2026 03:36:54 +0000 Subject: [PATCH 3/3] Revert GPUKVFormat annotations back to Any --- lmcache/v1/multiprocess/adapter_connector/base.py | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/lmcache/v1/multiprocess/adapter_connector/base.py b/lmcache/v1/multiprocess/adapter_connector/base.py index 701db9831ad..803e7c122a2 100644 --- a/lmcache/v1/multiprocess/adapter_connector/base.py +++ b/lmcache/v1/multiprocess/adapter_connector/base.py @@ -15,7 +15,7 @@ # Standard from abc import ABC, abstractmethod from dataclasses import dataclass -from typing import TYPE_CHECKING, cast +from typing import Any, cast # Third Party import torch @@ -28,10 +28,6 @@ from lmcache.v1.multiprocess.custom_types import IPCCacheEngineKey from lmcache.v1.multiprocess.mq import MessageQueueClient -if TYPE_CHECKING: - # First Party - import lmcache.c_ops as lmc_ops - logger = init_logger(__name__) @@ -185,7 +181,7 @@ def create_non_gpu_context( def compute_kv_layout( kv_caches: dict[str, torch.Tensor], layout_hints: LayoutHints | None = None, -) -> tuple[int, int, int, str, "lmc_ops.GPUKVFormat"]: +) -> tuple[int, int, int, str, Any]: """Compute KV layout metadata from KV tensors. Args: @@ -226,7 +222,7 @@ def gather_paged_kv_to_cpu( block_ids: list[int], blocks_per_chunk: int, layout_hints: LayoutHints | None = None, - gpu_kv_format: "lmc_ops.GPUKVFormat" | None = None, + gpu_kv_format: Any | None = None, out: list[torch.Tensor] | None = None, chunk_indices: list[int] | None = None, ) -> list[torch.Tensor]: @@ -370,7 +366,7 @@ def scatter_cpu_to_paged_kv( blocks_per_chunk: int, skip_first_n_tokens: int = 0, layout_hints: LayoutHints | None = None, - gpu_kv_format: "lmc_ops.GPUKVFormat" | None = None, + gpu_kv_format: Any | None = None, ) -> None: """Scatter CPU chunk tensors back into paged KV tensors.