Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 18 additions & 10 deletions lmcache/v1/distributed/memory_manager.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
# SPDX-License-Identifier: Apache-2.0

# Standard
import os
from multiprocessing import shared_memory
import shutil
import sys

# First Party
from lmcache.logging import init_logger
Expand All @@ -28,13 +29,16 @@ def _unlink_stale_shm(shm_name: str) -> None:
return
if not normalized.startswith("lmcache_l1_pool_"):
return
shm_path = os.path.join("/dev/shm", normalized)
try:
os.unlink(shm_path)
shm = shared_memory.SharedMemory(name=normalized, create=False)
shm.close()
shm.unlink()
except FileNotFoundError:
return
except OSError:
logger.warning("Failed to remove stale shm segment %s", shm_path, exc_info=True)
logger.warning(
"Failed to remove stale shm segment %s", normalized, exc_info=True
)


def create_memory_allocator(config: L1MemoryManagerConfig) -> MemoryAllocatorInterface:
Expand Down Expand Up @@ -73,12 +77,16 @@ def create_memory_allocator(config: L1MemoryManagerConfig) -> MemoryAllocatorInt
if not bare.startswith("lmcache_l1_pool_"):
shm_name = f"lmcache_l1_pool_{bare}"
try:
free_bytes = shutil.disk_usage("/dev/shm").free
if free_bytes < config.size_in_bytes:
raise RuntimeError(
"insufficient /dev/shm capacity: "
f"need {config.size_in_bytes} bytes, have {free_bytes} bytes"
)
# /dev/shm capacity is only meaningful on Linux, where POSIX shm
# is backed by a tmpfs mount with a bounded free-space view.
if sys.platform.startswith("linux"):
free_bytes = shutil.disk_usage("/dev/shm").free
if free_bytes < config.size_in_bytes:
raise RuntimeError(
"insufficient /dev/shm capacity: "
f"need {config.size_in_bytes} bytes, "
f"have {free_bytes} bytes"
)
_unlink_stale_shm(shm_name)
return MixedMemoryAllocator(
config.size_in_bytes,
Expand Down
22 changes: 16 additions & 6 deletions lmcache/v1/multiprocess/non_gpu_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,8 @@

Returns the SHM-based implementation when shared-memory pool information
is available; otherwise falls back to the pickle-based implementation.
If SHM initialization fails for any reason (e.g. segment not found,
permission error), logs a warning and falls back to the pickle-based
implementation.

Args:
metadata: Layout metadata for the non-GPU context.
Expand All @@ -135,12 +137,20 @@
# Local
from .non_gpu_context_shm import NonGpuContextShm

logger.info(
"Creating NonGpuContextShm (shm_name=%s, pool_size=%d)",
shm_name,
pool_size,
)
return NonGpuContextShm(metadata, mq_client, mq_timeout, shm_name, pool_size)
try:
logger.info(
"Creating NonGpuContextShm (shm_name=%s, pool_size=%d)",
shm_name,
pool_size,
)
return NonGpuContextShm(metadata, mq_client, mq_timeout, shm_name, pool_size)

Check failure on line 146 in lmcache/v1/multiprocess/non_gpu_context.py

View workflow job for this annotation

GitHub Actions / Check code quality

Ruff (E501)

lmcache/v1/multiprocess/non_gpu_context.py:146:89: E501 Line too long (89 > 88)
except Exception:
logger.warning(
"Failed to initialize SHM context (shm_name=%s), "
"falling back to pickle transport",
shm_name,
exc_info=True,
)

# Local
from .non_gpu_context_pickle import NonGpuContextPickle
Expand Down
39 changes: 22 additions & 17 deletions lmcache/v1/multiprocess/non_gpu_context_shm.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@
"""Shared-memory NonGpuContext implementation for multiprocess mode."""

# Standard
from multiprocessing import shared_memory
from multiprocessing.resource_tracker import unregister
from typing import Any
import mmap
import os

# Third Party
import torch
Expand All @@ -16,8 +16,6 @@
)
from lmcache.v1.multiprocess.protocol import RequestType, get_response_class

INVALID_SHM_FD = -1


class NonGpuContextShm(NonGpuContext):
"""Shared-memory implementation of :class:`NonGpuContext`."""
Expand All @@ -36,16 +34,20 @@ def __init__(

self._shm_name = shm_name
self._pool_size = pool_size
self._shm_fd = INVALID_SHM_FD
shm_path = os.path.join("/dev/shm", shm_name.lstrip("/"))
self._shm_fd = os.open(shm_path, os.O_RDWR)
self._shm: shared_memory.SharedMemory | None = None
self._shm_buffer: memoryview | None = None
try:
self._mmap_obj = mmap.mmap(
self._shm_fd, self._pool_size, access=mmap.ACCESS_WRITE
self._shm = shared_memory.SharedMemory(
name=shm_name.lstrip("/"), create=False
)
# The SHM segment is owned by the server process. Unregister it
# from this worker's resource tracker so that Python does not
# unlink the segment when this worker exits.
unregister(f"/{self._shm.name}", "shared_memory")
self._shm_buffer = self._shm.buf
except Exception:
os.close(self._shm_fd)
self._shm_fd = INVALID_SHM_FD
self._shm = None
self._shm_buffer = None
raise

def _make_tensor_view(
Expand All @@ -63,8 +65,12 @@ def _make_tensor_view(
if itemsize <= 0:
raise ValueError(f"Invalid dtype size for {dtype_str}")
count = length // itemsize
if self._shm_buffer is None:
raise RuntimeError(
f"Shared memory buffer not initialized for shm_name={self._shm_name}"
)
tensor_1d = torch.frombuffer(
self._mmap_obj, dtype=dtype, count=count, offset=offset
self._shm_buffer, dtype=dtype, count=count, offset=offset
)
return tensor_1d.view(torch.Size(shape))

Expand Down Expand Up @@ -150,11 +156,10 @@ def commit_retrieve(self, key: Any, instance_id: int) -> bool:
return False

def close(self) -> None:
if self._shm_fd == INVALID_SHM_FD:
if self._shm is None:
return
try:
self._mmap_obj.close()
self._shm.close()
finally:
fd = self._shm_fd
self._shm_fd = INVALID_SHM_FD
os.close(fd)
self._shm = None
self._shm_buffer = None
Loading