From 520ee5f0c3349d4bd36de77755673c28936bdb68 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Wed, 20 May 2026 06:32:30 +0000 Subject: [PATCH 1/4] Initial plan From 7c642290a00729a982604127ba9abdc13c165b69 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Wed, 20 May 2026 06:43:10 +0000 Subject: [PATCH 2/4] Fix SHM non-GPU transport idempotency, locking, and cleanup issues Agent-Logs-Url: https://github.com/hlin99/LMCache/sessions/661cbeee-d0d4-40ef-9312-4044e4696a51 Co-authored-by: hlin99 <73271530+hlin99@users.noreply.github.com> --- lmcache/v1/distributed/memory_manager.py | 12 -- .../v1/multiprocess/non_gpu_context_shm.py | 26 +++- lmcache/v1/multiprocess/server.py | 89 ++++++----- tests/v1/distributed/test_shm_l1_pool.py | 64 ++++++++ .../v1/multiprocess/test_non_cuda_context.py | 139 ++++++++++++++++++ 5 files changed, 276 insertions(+), 54 deletions(-) diff --git a/lmcache/v1/distributed/memory_manager.py b/lmcache/v1/distributed/memory_manager.py index b571113e316..5cbede5c057 100644 --- a/lmcache/v1/distributed/memory_manager.py +++ b/lmcache/v1/distributed/memory_manager.py @@ -20,18 +20,6 @@ logger = init_logger(__name__) -# HELPER FUNCTIONS -def _check_shm_capacity(required_bytes: int) -> bool: - """Return whether ``/dev/shm`` has enough free space.""" - if required_bytes <= 0: - return True - try: - free_bytes = shutil.disk_usage("/dev/shm").free - except OSError: - return False - return free_bytes >= required_bytes - - def _unlink_stale_shm(shm_name: str) -> None: """Remove a stale LMCache shm segment if it exists.""" normalized = shm_name.lstrip("/") diff --git a/lmcache/v1/multiprocess/non_gpu_context_shm.py b/lmcache/v1/multiprocess/non_gpu_context_shm.py index acbe1a5a0a6..42f616c3933 100644 --- a/lmcache/v1/multiprocess/non_gpu_context_shm.py +++ b/lmcache/v1/multiprocess/non_gpu_context_shm.py @@ -35,13 +35,15 @@ def __init__( self._shm_name = shm_name self._pool_size = pool_size shm_path = os.path.join("/dev/shm", shm_name.lstrip("/")) - shm_fd = os.open(shm_path, os.O_RDWR) + self._shm_fd = os.open(shm_path, os.O_RDWR) try: self._mmap_obj = mmap.mmap( - shm_fd, self._pool_size, access=mmap.ACCESS_WRITE + self._shm_fd, self._pool_size, access=mmap.ACCESS_WRITE ) - finally: - os.close(shm_fd) + except Exception: + os.close(self._shm_fd) + self._shm_fd = -1 + raise def _make_tensor_view( self, @@ -84,7 +86,12 @@ def prepare_store(self, key: Any, instance_id: int) -> list[torch.Tensor] | None response = future.result(timeout=self.mq_timeout) except TimeoutError: return None - slots = response.context.get("slots", []) + context = response.context if isinstance(response.context, dict) else {} + slots = context.get("slots") + if not slots: + return None + if not isinstance(slots, list): + return None return self._build_slot_tensors(slots) if slots else None def commit_store( @@ -127,4 +134,11 @@ def commit_retrieve(self, key: Any, instance_id: int) -> bool: return False def close(self) -> None: - self._mmap_obj.close() + if self._shm_fd < 0: + return + try: + self._mmap_obj.close() + finally: + fd = self._shm_fd + self._shm_fd = -1 + os.close(fd) diff --git a/lmcache/v1/multiprocess/server.py b/lmcache/v1/multiprocess/server.py index e306da208dc..5c0787ffceb 100644 --- a/lmcache/v1/multiprocess/server.py +++ b/lmcache/v1/multiprocess/server.py @@ -260,8 +260,14 @@ def __init__( # for crash resilience (e.g., client calls lookup but never queries) self._prefetch_jobs: dict[str, _PrefetchJob] = {} self._prefetch_job_lock = threading.Lock() - self._pending_shm_writes: dict[tuple[object, ...], list[ObjectKey]] = {} - self._pending_shm_reads: dict[tuple[object, ...], list[ObjectKey]] = {} + self._pending_shm_writes: dict[ + tuple[int, IPCCacheEngineKey], list[ObjectKey] + ] = {} + self._pending_shm_reads: dict[ + tuple[int, IPCCacheEngineKey], list[ObjectKey] + ] = {} + self._pending_shm_lock = threading.Lock() + self._shm_active = False self._setup_metrics() @@ -341,12 +347,23 @@ def unregister_kv_cache(self, instance_id: int) -> None: torch_dev.empty_cache() else: logger.info("Unregistered non-CUDA context for instance ID %d", instance_id) - self._pending_shm_writes = { - k: v for k, v in self._pending_shm_writes.items() if k[0] != instance_id - } - self._pending_shm_reads = { - k: v for k, v in self._pending_shm_reads.items() if k[0] != instance_id - } + with self._pending_shm_lock: + stale_writes = { + k: v for k, v in self._pending_shm_writes.items() if k[0] == instance_id + } + for transfer_key in stale_writes: + del self._pending_shm_writes[transfer_key] + stale_reads = { + k: v for k, v in self._pending_shm_reads.items() if k[0] == instance_id + } + for transfer_key in stale_reads: + del self._pending_shm_reads[transfer_key] + for reserved_keys in stale_writes.values(): + if reserved_keys: + self.storage_manager.finish_write(reserved_keys) + for prefetched_keys in stale_reads.values(): + if prefetched_keys: + self.storage_manager.finish_read_prefetched(prefetched_keys) def register_kv_cache_non_gpu_context( self, @@ -397,17 +414,21 @@ def register_kv_cache_non_gpu_context( ) shm_pool_info = self.storage_manager.get_shm_pool_info() if not isinstance(shm_pool_info, dict): + self._shm_active = False logger.info( "Instance %s non-GPU context using pickle transport " "(no SHM pool info returned)", payload.instance_id, ) return RegisterNonGpuContextResponse() + shm_name = str(shm_pool_info.get("shm_name", "")) + pool_size = int(shm_pool_info.get("pool_size", 0)) + self._shm_active = bool(shm_name) and pool_size > 0 response = RegisterNonGpuContextResponse( - shm_name=str(shm_pool_info.get("shm_name", "")), - pool_size=int(shm_pool_info.get("pool_size", 0)), + shm_name=shm_name, + pool_size=pool_size, ) - if response.shm_name and response.pool_size > 0: + if self._shm_active: logger.info( "Instance %s non-GPU context using SHM transport " "(shm_name=%s, pool_size=%d)", @@ -425,28 +446,12 @@ def register_kv_cache_non_gpu_context( @staticmethod def _make_non_gpu_transfer_key( key: IPCCacheEngineKey, instance_id: int - ) -> tuple[object, ...]: + ) -> tuple[int, IPCCacheEngineKey]: """Build a unique key for pending SHM write/read transfer tracking.""" - return ( - instance_id, - key.model_name, - key.world_size, - key.worker_id, - key.token_ids, - key.start, - key.end, - key.request_id, - key.cache_salt, - ) + return (instance_id, key) def _is_shm_active(self) -> bool: - shm_pool_info = self.storage_manager.get_shm_pool_info() - if not isinstance(shm_pool_info, dict): - return False - return ( - bool(shm_pool_info.get("shm_name")) - and int(shm_pool_info.get("pool_size", 0)) > 0 - ) + return self._shm_active def _resolve_obj_keys(self, key: IPCCacheEngineKey) -> list[ObjectKey]: """Resolve object keys from an IPC cache key. @@ -512,8 +517,11 @@ def prepare_store( } ) reserved_keys.append(obj_key) + if not reserved_keys: + return PrepareStoreResponse(context={}) transfer_key = self._make_non_gpu_transfer_key(key, instance_id) - self._pending_shm_writes[transfer_key] = reserved_keys + with self._pending_shm_lock: + self._pending_shm_writes[transfer_key] = reserved_keys return PrepareStoreResponse(context={"slots": slots}) @_lmcache_nvtx_annotate @@ -537,9 +545,10 @@ def commit_store( """ if cpu_data == b"" and self._is_shm_active(): transfer_key = self._make_non_gpu_transfer_key(key, instance_id) - reserved_keys = self._pending_shm_writes.pop(transfer_key, []) + with self._pending_shm_lock: + reserved_keys = self._pending_shm_writes.pop(transfer_key, []) if not reserved_keys: - return False + return True self.storage_manager.finish_write(reserved_keys) return True @@ -601,10 +610,16 @@ def prepare_retrieve( ) if self._is_shm_active(): + # Precondition: lookup stage already acquired read locks for obj_keys, + # so unsafe_read is valid here and avoids duplicate lock operations. shm_prefetched_keys, shm_memory_objs = self.storage_manager.unsafe_read( obj_keys ) - if not shm_memory_objs or len(shm_prefetched_keys) != len(obj_keys): + if ( + not shm_memory_objs + or len(shm_prefetched_keys) != len(obj_keys) + or len(shm_memory_objs) != len(obj_keys) + ): if shm_prefetched_keys: self.storage_manager.finish_read_prefetched(shm_prefetched_keys) return PrepareRetrieveResponse(success=False, data=b"", context={}) @@ -622,7 +637,8 @@ def prepare_retrieve( } ) transfer_key = self._make_non_gpu_transfer_key(key, instance_id) - self._pending_shm_reads[transfer_key] = shm_prefetched_keys + with self._pending_shm_lock: + self._pending_shm_reads[transfer_key] = shm_prefetched_keys return PrepareRetrieveResponse( success=True, data=b"", context={"slots": slots} ) @@ -665,7 +681,8 @@ def commit_retrieve( """ if self._is_shm_active(): transfer_key = self._make_non_gpu_transfer_key(key, instance_id) - prefetched_keys = self._pending_shm_reads.pop(transfer_key, []) + with self._pending_shm_lock: + prefetched_keys = self._pending_shm_reads.pop(transfer_key, []) if prefetched_keys: self.storage_manager.finish_read_prefetched(prefetched_keys) return True diff --git a/tests/v1/distributed/test_shm_l1_pool.py b/tests/v1/distributed/test_shm_l1_pool.py index 3ac3102cf0d..a52c6f21e86 100644 --- a/tests/v1/distributed/test_shm_l1_pool.py +++ b/tests/v1/distributed/test_shm_l1_pool.py @@ -6,6 +6,7 @@ import os # Third Party +import pytest import torch # First Party @@ -14,6 +15,8 @@ from lmcache.v1.distributed.memory_manager import create_memory_allocator from lmcache.v1.memory_management import MixedMemoryAllocator from lmcache.v1.multiprocess.non_gpu_context import NonGpuContextMetadata +from lmcache.v1.multiprocess.non_gpu_context import create_non_gpu_context +from lmcache.v1.multiprocess.non_gpu_context_pickle import NonGpuContextPickle from lmcache.v1.multiprocess.non_gpu_context_shm import NonGpuContextShm from lmcache.v1.multiprocess.protocol import RequestType from lmcache.v1.multiprocess.protocols.engine import ( @@ -159,3 +162,64 @@ def _submit_request(req_type, payload, response_cls): # noqa: ARG001 context.close() if os.path.exists(shm_path): os.unlink(shm_path) + + +def test_non_gpu_context_shm_init_raises_when_segment_missing() -> None: + with pytest.raises(FileNotFoundError): + NonGpuContextShm( + metadata=NonGpuContextMetadata( + layout_desc=MemoryLayoutDesc( + shapes=[torch.Size([2, 2])], + dtypes=[torch.float32], + ), + block_size=1, + use_mla=False, + ), + mq_client=MagicMock(), + mq_timeout=1.0, + shm_name="lmcache_missing_shm_segment", + pool_size=4096, + ) + + +def test_create_non_gpu_context_falls_back_to_pickle_without_shm_info() -> None: + context = create_non_gpu_context( + metadata=NonGpuContextMetadata( + layout_desc=MemoryLayoutDesc( + shapes=[torch.Size([2, 2])], + dtypes=[torch.float32], + ), + block_size=1, + use_mla=False, + ), + mq_client=MagicMock(), + mq_timeout=1.0, + shm_name="", + pool_size=0, + ) + assert isinstance(context, NonGpuContextPickle) + + +def test_non_gpu_context_shm_close_is_idempotent() -> None: + shm_name = f"lmcache_test_close_{os.getpid()}" + shm_path = _create_shm_file(shm_name, 4096) + try: + context = NonGpuContextShm( + metadata=NonGpuContextMetadata( + layout_desc=MemoryLayoutDesc( + shapes=[torch.Size([2, 2])], + dtypes=[torch.float32], + ), + block_size=1, + use_mla=False, + ), + mq_client=MagicMock(), + mq_timeout=1.0, + shm_name=shm_name, + pool_size=4096, + ) + context.close() + context.close() + finally: + if os.path.exists(shm_path): + os.unlink(shm_path) diff --git a/tests/v1/multiprocess/test_non_cuda_context.py b/tests/v1/multiprocess/test_non_cuda_context.py index 5da7dc47aca..0e83fea62b4 100644 --- a/tests/v1/multiprocess/test_non_cuda_context.py +++ b/tests/v1/multiprocess/test_non_cuda_context.py @@ -459,3 +459,142 @@ def _read_prefetched_results(_keys: Any) -> Any: recovered_chunks: list[torch.Tensor] = pickle.loads(cpu_data) assert len(recovered_chunks) == 1 assert torch.allclose(recovered_chunks[0], payload) + + +def test_server_shm_store_idempotent_on_duplicate_key( + stub_native_storage_ops: Any, +) -> None: + """Ensure SHM prepare/commit treats duplicate keys as idempotent success.""" + # First Party + from lmcache.v1.multiprocess.custom_types import ( + IPCCacheEngineKey, + RegisterNonGpuContextPayload, + ) + from lmcache.v1.multiprocess.server import MPCacheEngine + + mock_storage = MagicMock() + mock_storage.get_shm_pool_info.return_value = { + "shm_name": "lmcache_l1_pool_test", + "pool_size": 4096, + } + mock_storage.reserve_write.return_value = {} + mock_session = MagicMock() + mock_session.get_hashes.return_value = [b"h"] + + with ( + patch( + "lmcache.v1.multiprocess.server.StorageManager", + return_value=mock_storage, + ), + patch("lmcache.v1.multiprocess.server.TokenHasher"), + patch("lmcache.v1.multiprocess.server.SessionManager") as session_cls, + patch("lmcache.v1.multiprocess.server.get_event_bus"), + patch( + "lmcache.v1.multiprocess.server.ipc_key_to_object_keys", + return_value=["obj"], + ), + ): + session_cls.return_value.get_or_create.return_value = mock_session + engine = MPCacheEngine(storage_manager_config=MagicMock(), chunk_size=8) + + engine.register_kv_cache_non_gpu_context( + RegisterNonGpuContextPayload( + instance_id=3, + model_name="m", + world_size=1, + block_size=4, + num_layers=2, + hidden_dim_size=16, + dtype_str="float32", + use_mla=False, + ) + ) + key = IPCCacheEngineKey.from_token_ids( + "m", + 1, + 0, + [1] * 8, + start=0, + end=8, + request_id="req", + ) + response = engine.prepare_store(key, 3) + assert response.context == {} + assert engine.commit_store(key, 3, b"") is True + + +def test_server_unregister_non_gpu_context_releases_pending_shm_locks( + stub_native_storage_ops: Any, +) -> None: + """Ensure unregister releases pending SHM read/write reservations.""" + # First Party + from lmcache.v1.multiprocess.custom_types import ( + IPCCacheEngineKey, + RegisterNonGpuContextPayload, + ) + from lmcache.v1.multiprocess.server import MPCacheEngine + + mock_storage = MagicMock() + mock_storage.get_shm_pool_info.return_value = { + "shm_name": "lmcache_l1_pool_test", + "pool_size": 4096, + } + mock_memory_obj = MagicMock() + mock_memory_obj.tensor = torch.zeros(2, 2, 8, 16) + mock_memory_obj.shm_offset = 0 + mock_memory_obj.shm_byte_length = 2048 + mock_storage.reserve_write.side_effect = ( + lambda obj_keys, *_args, **_kwargs: { + obj_key: mock_memory_obj for obj_key in obj_keys + } + ) + mock_storage.unsafe_read.side_effect = ( + lambda obj_keys: (obj_keys, [mock_memory_obj for _ in obj_keys]) + ) + mock_session = MagicMock() + mock_session.get_hashes.return_value = [b"h"] + + with ( + patch( + "lmcache.v1.multiprocess.server.StorageManager", + return_value=mock_storage, + ), + patch("lmcache.v1.multiprocess.server.TokenHasher"), + patch("lmcache.v1.multiprocess.server.SessionManager") as session_cls, + patch("lmcache.v1.multiprocess.server.get_event_bus"), + patch( + "lmcache.v1.multiprocess.server.ipc_key_to_object_keys", + return_value=["obj"], + ), + ): + session_cls.return_value.get_or_create.return_value = mock_session + engine = MPCacheEngine(storage_manager_config=MagicMock(), chunk_size=8) + + engine.register_kv_cache_non_gpu_context( + RegisterNonGpuContextPayload( + instance_id=4, + model_name="m", + world_size=1, + block_size=4, + num_layers=2, + hidden_dim_size=16, + dtype_str="float32", + use_mla=False, + ) + ) + key = IPCCacheEngineKey.from_token_ids( + "m", + 1, + 0, + [1] * 8, + start=0, + end=8, + request_id="req", + ) + assert engine.prepare_store(key, 4).context.get("slots") + assert engine.prepare_retrieve(key, 4).success is True + + engine.unregister_kv_cache(4) + + mock_storage.finish_write.assert_called_once() + mock_storage.finish_read_prefetched.assert_called_once() From 8ae9bc4f69f52f4143709a5497c874ef5e76c7ba Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Wed, 20 May 2026 06:48:07 +0000 Subject: [PATCH 3/4] Polish SHM feedback fixes and align validation comments Agent-Logs-Url: https://github.com/hlin99/LMCache/sessions/661cbeee-d0d4-40ef-9312-4044e4696a51 Co-authored-by: hlin99 <73271530+hlin99@users.noreply.github.com> --- lmcache/v1/multiprocess/non_gpu_context_shm.py | 15 ++++++++------- lmcache/v1/multiprocess/server.py | 10 ++++++++-- tests/v1/distributed/test_shm_l1_pool.py | 8 +++++--- 3 files changed, 21 insertions(+), 12 deletions(-) diff --git a/lmcache/v1/multiprocess/non_gpu_context_shm.py b/lmcache/v1/multiprocess/non_gpu_context_shm.py index 42f616c3933..cfa7c58748c 100644 --- a/lmcache/v1/multiprocess/non_gpu_context_shm.py +++ b/lmcache/v1/multiprocess/non_gpu_context_shm.py @@ -16,6 +16,8 @@ ) from lmcache.v1.multiprocess.protocol import RequestType, get_response_class +INVALID_SHM_FD = -1 + class NonGpuContextShm(NonGpuContext): """Shared-memory implementation of :class:`NonGpuContext`.""" @@ -34,6 +36,7 @@ def __init__( self._shm_name = shm_name self._pool_size = pool_size + self._shm_fd = INVALID_SHM_FD shm_path = os.path.join("/dev/shm", shm_name.lstrip("/")) self._shm_fd = os.open(shm_path, os.O_RDWR) try: @@ -42,7 +45,7 @@ def __init__( ) except Exception: os.close(self._shm_fd) - self._shm_fd = -1 + self._shm_fd = INVALID_SHM_FD raise def _make_tensor_view( @@ -88,11 +91,9 @@ def prepare_store(self, key: Any, instance_id: int) -> list[torch.Tensor] | None return None context = response.context if isinstance(response.context, dict) else {} slots = context.get("slots") - if not slots: - return None - if not isinstance(slots, list): + if not isinstance(slots, list) or not slots: return None - return self._build_slot_tensors(slots) if slots else None + return self._build_slot_tensors(slots) def commit_store( self, key: Any, instance_id: int, _chunks: list[torch.Tensor] @@ -134,11 +135,11 @@ def commit_retrieve(self, key: Any, instance_id: int) -> bool: return False def close(self) -> None: - if self._shm_fd < 0: + if self._shm_fd == INVALID_SHM_FD: return try: self._mmap_obj.close() finally: fd = self._shm_fd - self._shm_fd = -1 + self._shm_fd = INVALID_SHM_FD os.close(fd) diff --git a/lmcache/v1/multiprocess/server.py b/lmcache/v1/multiprocess/server.py index 5c0787ffceb..8a67c6d9317 100644 --- a/lmcache/v1/multiprocess/server.py +++ b/lmcache/v1/multiprocess/server.py @@ -260,6 +260,9 @@ def __init__( # for crash resilience (e.g., client calls lookup but never queries) self._prefetch_jobs: dict[str, _PrefetchJob] = {} self._prefetch_job_lock = threading.Lock() + # Pending SHM transfer tracking, keyed by (instance_id, IPC key). + # IPCCacheEngineKey is a frozen dataclass and hashable, so it is safe + # and efficient for dict lookups across pending transfer tracking. self._pending_shm_writes: dict[ tuple[int, IPCCacheEngineKey], list[ObjectKey] ] = {} @@ -548,6 +551,9 @@ def commit_store( with self._pending_shm_lock: reserved_keys = self._pending_shm_writes.pop(transfer_key, []) if not reserved_keys: + # Idempotent SHM semantics: duplicate store or missing prepare + # after a prior successful store should still be treated as + # success because key data already exists in storage. return True self.storage_manager.finish_write(reserved_keys) return True @@ -610,8 +616,8 @@ def prepare_retrieve( ) if self._is_shm_active(): - # Precondition: lookup stage already acquired read locks for obj_keys, - # so unsafe_read is valid here and avoids duplicate lock operations. + # Precondition: read locks for these keys were acquired during the + # lookup/prefetch phase before this retrieve call. shm_prefetched_keys, shm_memory_objs = self.storage_manager.unsafe_read( obj_keys ) diff --git a/tests/v1/distributed/test_shm_l1_pool.py b/tests/v1/distributed/test_shm_l1_pool.py index a52c6f21e86..7919214f85d 100644 --- a/tests/v1/distributed/test_shm_l1_pool.py +++ b/tests/v1/distributed/test_shm_l1_pool.py @@ -14,8 +14,10 @@ from lmcache.v1.distributed.config import L1MemoryManagerConfig from lmcache.v1.distributed.memory_manager import create_memory_allocator from lmcache.v1.memory_management import MixedMemoryAllocator -from lmcache.v1.multiprocess.non_gpu_context import NonGpuContextMetadata -from lmcache.v1.multiprocess.non_gpu_context import create_non_gpu_context +from lmcache.v1.multiprocess.non_gpu_context import ( + NonGpuContextMetadata, + create_non_gpu_context, +) from lmcache.v1.multiprocess.non_gpu_context_pickle import NonGpuContextPickle from lmcache.v1.multiprocess.non_gpu_context_shm import NonGpuContextShm from lmcache.v1.multiprocess.protocol import RequestType @@ -165,7 +167,7 @@ def _submit_request(req_type, payload, response_cls): # noqa: ARG001 def test_non_gpu_context_shm_init_raises_when_segment_missing() -> None: - with pytest.raises(FileNotFoundError): + with pytest.raises(FileNotFoundError, match="No such file or directory"): NonGpuContextShm( metadata=NonGpuContextMetadata( layout_desc=MemoryLayoutDesc( From 338abb746689cc53abc095822fbb43a245e1b3a1 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Wed, 20 May 2026 07:10:50 +0000 Subject: [PATCH 4/4] Fix memory leak: early-return from prepare_store when all keys exist When reserve_write returns empty (all object keys already cached), return PrepareStoreResponse(context={}) immediately without storing an entry in _pending_shm_writes. This prevents leaked entries that would never be popped since the worker won't call commit_store. Agent-Logs-Url: https://github.com/hlin99/LMCache/sessions/182111d5-1737-49c0-be65-0287d5b9d6c5 Co-authored-by: hlin99 <73271530+hlin99@users.noreply.github.com> --- lmcache/v1/multiprocess/server.py | 2 ++ tests/v1/multiprocess/test_non_cuda_context.py | 15 ++++++--------- 2 files changed, 8 insertions(+), 9 deletions(-) diff --git a/lmcache/v1/multiprocess/server.py b/lmcache/v1/multiprocess/server.py index 64e862c5b89..794fa7d79b5 100644 --- a/lmcache/v1/multiprocess/server.py +++ b/lmcache/v1/multiprocess/server.py @@ -520,6 +520,8 @@ def prepare_store( } ) reserved_keys.append(obj_key) + if not reserved_keys: + return PrepareStoreResponse(context={}) transfer_key = self._make_non_gpu_transfer_key(key, instance_id) with self._pending_shm_lock: self._pending_shm_writes[transfer_key] = reserved_keys diff --git a/tests/v1/multiprocess/test_non_cuda_context.py b/tests/v1/multiprocess/test_non_cuda_context.py index fe991614003..05fb83def6a 100644 --- a/tests/v1/multiprocess/test_non_cuda_context.py +++ b/tests/v1/multiprocess/test_non_cuda_context.py @@ -467,9 +467,9 @@ def test_server_shm_commit_store_allows_noop_when_all_keys_exist( """Regression: repeated prompt after worker restart should no-op-store cleanly. When all object keys already exist in cache, SHM ``prepare_store`` reserves - no new objects and returns empty slots. ``commit_store`` must still succeed - as a valid no-op for that prepared transfer, but fail without a matching - prepare state. + no new objects and returns empty context (no "slots" key). The worker sees + no slots and does not call ``commit_store``, so no entry leaks in + ``_pending_shm_writes``. """ # First Party from lmcache.v1.multiprocess.custom_types import ( @@ -526,13 +526,10 @@ def test_server_shm_commit_store_allows_noop_when_all_keys_exist( request_id="req", ) prepare_response = engine.prepare_store(key, 3) - assert prepare_response.context["slots"] == [] + # Empty context means no slots reserved — worker won't call commit_store. + assert prepare_response.context == {} - store_ok = engine.commit_store(key, 3, b"") - assert store_ok is True - mock_storage.finish_write.assert_not_called() - - # A second commit without a matching prepare must fail. + # commit_store without a matching prepare must fail (no entry leaked). assert engine.commit_store(key, 3, b"") is False