diff --git a/lmcache/v1/distributed/memory_manager.py b/lmcache/v1/distributed/memory_manager.py index 048dee3d246..ca28fb1ab2f 100644 --- a/lmcache/v1/distributed/memory_manager.py +++ b/lmcache/v1/distributed/memory_manager.py @@ -1,8 +1,9 @@ # SPDX-License-Identifier: Apache-2.0 # Standard -import os +from multiprocessing import shared_memory import shutil +import sys # First Party from lmcache.logging import init_logger @@ -28,13 +29,16 @@ def _unlink_stale_shm(shm_name: str) -> None: return if not normalized.startswith("lmcache_l1_pool_"): return - shm_path = os.path.join("/dev/shm", normalized) try: - os.unlink(shm_path) + shm = shared_memory.SharedMemory(name=normalized, create=False) + shm.close() + shm.unlink() except FileNotFoundError: return except OSError: - logger.warning("Failed to remove stale shm segment %s", shm_path, exc_info=True) + logger.warning( + "Failed to remove stale shm segment %s", normalized, exc_info=True + ) def create_memory_allocator(config: L1MemoryManagerConfig) -> MemoryAllocatorInterface: @@ -73,12 +77,16 @@ def create_memory_allocator(config: L1MemoryManagerConfig) -> MemoryAllocatorInt if not bare.startswith("lmcache_l1_pool_"): shm_name = f"lmcache_l1_pool_{bare}" try: - free_bytes = shutil.disk_usage("/dev/shm").free - if free_bytes < config.size_in_bytes: - raise RuntimeError( - "insufficient /dev/shm capacity: " - f"need {config.size_in_bytes} bytes, have {free_bytes} bytes" - ) + # /dev/shm capacity is only meaningful on Linux, where POSIX shm + # is backed by a tmpfs mount with a bounded free-space view. + if sys.platform.startswith("linux"): + free_bytes = shutil.disk_usage("/dev/shm").free + if free_bytes < config.size_in_bytes: + raise RuntimeError( + "insufficient /dev/shm capacity: " + f"need {config.size_in_bytes} bytes, " + f"have {free_bytes} bytes" + ) _unlink_stale_shm(shm_name) return MixedMemoryAllocator( config.size_in_bytes, diff --git a/lmcache/v1/multiprocess/non_gpu_context.py b/lmcache/v1/multiprocess/non_gpu_context.py index 4017c1700e1..9fac3ea3ad9 100644 --- a/lmcache/v1/multiprocess/non_gpu_context.py +++ b/lmcache/v1/multiprocess/non_gpu_context.py @@ -120,6 +120,8 @@ def create_non_gpu_context( Returns SHM-based implementation when shared-memory pool information is available; otherwise falls back to the pickle-based implementation. + If SHM initialization fails for any reason (e.g. segment not found, + permission error), gracefully falls back to pickle transport. Args: metadata: Layout metadata for the non-GPU context. @@ -135,12 +137,20 @@ def create_non_gpu_context( # Local from .non_gpu_context_shm import NonGpuContextShm - logger.info( - "Creating NonGpuContextShm (shm_name=%s, pool_size=%d)", - shm_name, - pool_size, - ) - return NonGpuContextShm(metadata, mq_client, mq_timeout, shm_name, pool_size) + try: + logger.info( + "Creating NonGpuContextShm (shm_name=%s, pool_size=%d)", + shm_name, + pool_size, + ) + return NonGpuContextShm(metadata, mq_client, mq_timeout, shm_name, pool_size) + except Exception: + logger.warning( + "Failed to initialize SHM context (shm_name=%s), " + "falling back to pickle transport", + shm_name, + exc_info=True, + ) # Local from .non_gpu_context_pickle import NonGpuContextPickle diff --git a/lmcache/v1/multiprocess/non_gpu_context_shm.py b/lmcache/v1/multiprocess/non_gpu_context_shm.py index d42a8031eba..b52c62956e9 100644 --- a/lmcache/v1/multiprocess/non_gpu_context_shm.py +++ b/lmcache/v1/multiprocess/non_gpu_context_shm.py @@ -2,9 +2,9 @@ """Shared-memory NonGpuContext implementation for multiprocess mode.""" # Standard +from multiprocessing import shared_memory +from multiprocessing.resource_tracker import unregister from typing import Any -import mmap -import os # Third Party import torch @@ -16,8 +16,6 @@ ) from lmcache.v1.multiprocess.protocol import RequestType, get_response_class -INVALID_SHM_FD = -1 - class NonGpuContextShm(NonGpuContext): """Shared-memory implementation of :class:`NonGpuContext`.""" @@ -36,16 +34,20 @@ def __init__( self._shm_name = shm_name self._pool_size = pool_size - self._shm_fd = INVALID_SHM_FD - shm_path = os.path.join("/dev/shm", shm_name.lstrip("/")) - self._shm_fd = os.open(shm_path, os.O_RDWR) + self._shm: shared_memory.SharedMemory | None = None + self._shm_buffer: memoryview | None = None try: - self._mmap_obj = mmap.mmap( - self._shm_fd, self._pool_size, access=mmap.ACCESS_WRITE + self._shm = shared_memory.SharedMemory( + name=shm_name.lstrip("/"), create=False ) + # The SHM segment is owned by the server process. Unregister it + # from this worker's resource tracker so that Python does not + # unlink the segment when this worker exits. + unregister(f"/{self._shm.name}", "shared_memory") + self._shm_buffer = self._shm.buf except Exception: - os.close(self._shm_fd) - self._shm_fd = INVALID_SHM_FD + self._shm = None + self._shm_buffer = None raise def _make_tensor_view( @@ -63,8 +65,12 @@ def _make_tensor_view( if itemsize <= 0: raise ValueError(f"Invalid dtype size for {dtype_str}") count = length // itemsize + if self._shm_buffer is None: + raise RuntimeError( + f"Shared memory buffer not initialized for shm_name={self._shm_name}" + ) tensor_1d = torch.frombuffer( - self._mmap_obj, dtype=dtype, count=count, offset=offset + self._shm_buffer, dtype=dtype, count=count, offset=offset ) return tensor_1d.view(torch.Size(shape)) @@ -150,11 +156,10 @@ def commit_retrieve(self, key: Any, instance_id: int) -> bool: return False def close(self) -> None: - if self._shm_fd == INVALID_SHM_FD: + if self._shm is None: return try: - self._mmap_obj.close() + self._shm.close() finally: - fd = self._shm_fd - self._shm_fd = INVALID_SHM_FD - os.close(fd) + self._shm = None + self._shm_buffer = None