From caa720533dcf65ba1c73a4b8ff5562c8fa2a4157 Mon Sep 17 00:00:00 2001 From: Claude Date: Fri, 24 Apr 2026 05:23:38 +0000 Subject: [PATCH] feat: add NVMeOF storage backend and L2 adapter MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Implements KV cache storage on NVMe-over-Fabrics attached block devices formatted as a filesystem. Includes: - lmcache/v1/storage_backend/plugins/nvmeof_backend.py: StoragePluginInterface with LRU/LFU/FIFO/MRU eviction, O_DIRECT support, auto-connect/disconnect lifecycle via nvme-cli, and priority-queue async I/O. - lmcache/v1/distributed/l2_adapters/nvmeof_l2_adapter.py: L2AdapterInterface for MP mode using aiofiles, three event fds, atomic tmp→rename writes, and optional O_DIRECT. Self-registers as adapter type "nvmeof". - docs/design/v1/storage_backend/plugins/nvmeof_backend.md: design doc. - docs/design/v1/distributed/l2_adapters/nvmeof_l2_adapter.md: design doc. - tests/v1/storage_backend/test_nvmeof_backend.py: unit tests covering filename helpers, connect/disconnect helpers, config validation, store/ lookup/load/delete flows, listener notifications, and registration. 
https://claude.ai/code/session_01FtheXdEv1wT46uYr6fEPvy --- .../l2_adapters/nvmeof_l2_adapter.md | 138 +++ .../storage_backend/plugins/nvmeof_backend.md | 127 +++ .../l2_adapters/nvmeof_l2_adapter.py | 981 ++++++++++++++++++ .../storage_backend/plugins/nvmeof_backend.py | 942 +++++++++++++++++ .../v1/storage_backend/test_nvmeof_backend.py | 716 +++++++++++++ 5 files changed, 2904 insertions(+) create mode 100644 docs/design/v1/distributed/l2_adapters/nvmeof_l2_adapter.md create mode 100644 docs/design/v1/storage_backend/plugins/nvmeof_backend.md create mode 100644 lmcache/v1/distributed/l2_adapters/nvmeof_l2_adapter.py create mode 100644 lmcache/v1/storage_backend/plugins/nvmeof_backend.py create mode 100644 tests/v1/storage_backend/test_nvmeof_backend.py diff --git a/docs/design/v1/distributed/l2_adapters/nvmeof_l2_adapter.md b/docs/design/v1/distributed/l2_adapters/nvmeof_l2_adapter.md new file mode 100644 index 0000000000..34ec6200b4 --- /dev/null +++ b/docs/design/v1/distributed/l2_adapters/nvmeof_l2_adapter.md @@ -0,0 +1,138 @@ +# NVMeOF L2 Adapter + +**Source**: `lmcache/v1/distributed/l2_adapters/nvmeof_l2_adapter.py` +**Storage plugin counterpart**: `lmcache/v1/storage_backend/plugins/nvmeof_backend.py` + +## Overview + +`NVMeOFL2Adapter` is an `L2AdapterInterface` implementation for multi-process +(MP) mode that stores KV cache objects on an NVMeOF-attached block device +formatted and mounted as a filesystem. It follows the same event-fd protocol +as `FSL2Adapter` (three distinct fds: store, lookup, load) and uses +*aiofiles* for non-blocking filesystem I/O on a background asyncio event loop. + +The adapter self-registers as type `"nvmeof"` via +`register_l2_adapter_type` / `register_l2_adapter_factory` at import time. +The `__init__.py` auto-discovers it through `pkgutil` — no other changes are +required to make it available. 
+ +## Architecture + +``` +StoreController / PrefetchController + │ + │ submit_{store,lookup,load}_task() + ▼ +NVMeOFL2Adapter + ├── _store_efd ─── signals completed stores + ├── _lookup_efd ─── signals completed lookups + ├── _load_efd ─── signals completed loads + │ + └── background asyncio loop (daemon thread "nvmeof-l2-loop") + │ + ├── _execute_store() ── aiofiles.open / O_DIRECT write + ├── _execute_lookup() ── aiofiles.os.path.exists + └── _execute_load() ── aiofiles.open / O_DIRECT read + │ + ▼ + /mnt/nvmeof/.nvmeof +``` + +## Configuration + +Passed as a JSON object to `--l2-adapter`: + +| Field | Type | Default | Description | +|---|---|---|---| +| `type` | str | _(required)_ | Must be `"nvmeof"` | +| `mount_path` | str | _(required)_ | Filesystem mount point | +| `transport` | str | `"tcp"` | NVMeOF transport: `rdma`, `tcp`, `fc` | +| `target_addr` | str | `""` | Target IP/hostname (auto-connect only) | +| `target_port` | str | `"4420"` | Target port (auto-connect only) | +| `target_nqn` | str | `""` | Target NQN (auto-connect / auto-disconnect) | +| `auto_connect` | bool | `false` | Run `nvme connect` at init | +| `auto_disconnect` | bool | `false` | Run `nvme disconnect` at close | +| `use_odirect` | bool | `false` | Bypass page cache via `O_DIRECT` | + +### Example + +```bash +--l2-adapter '{ + "type": "nvmeof", + "mount_path": "/mnt/nvmeof", + "transport": "tcp", + "target_addr": "192.168.1.100", + "target_port": "4420", + "target_nqn": "nqn.2023-01.io.example:nvme-sub", + "auto_connect": false, + "auto_disconnect": false, + "use_odirect": false +}' +``` + +## File Naming + +``` +/@0x@[@ ].nvmeof +``` + +- `/` in `model_name` is replaced with `-SEP-`. +- `kv_rank` is hex-encoded with `0x` prefix. +- `chunk_hash` is hex-encoded. +- `cache_salt` is appended only when non-empty. +- Extension `.nvmeof` distinguishes files from other adapters (e.g. `.data` + from `FSL2Adapter`) allowing coexistence on the same filesystem. 
+ +## Event-fd Protocol + +Each of the three event fds is distinct (enforced by three separate +`os.eventfd()` calls): + +| Event fd | Signalled by | Read by | +|---|---|---| +| `_store_efd` | `_execute_store` on completion | StoreController | +| `_lookup_efd` | `_execute_lookup` on completion | PrefetchController | +| `_load_efd` | `_execute_load` on completion | PrefetchController | + +Each async coroutine appends to the matching completed-tasks dict under +`_lock`, then writes `1` to the event fd. The controller drains the dict +via `pop_completed_store_tasks()` or `query_{lookup,load}_result()`. + +## Lifecycle + +1. **Init**: create mount dir; optionally `nvme connect`; open three event + fds; start background event loop thread. +2. **Store**: `submit_store_task` → `asyncio.run_coroutine_threadsafe` → + `_execute_store` writes each file atomically (tmp → rename) → signal + `_store_efd`. +3. **Lookup**: `submit_lookup_and_lock_task` → `_execute_lookup` checks + `aiofiles.os.path.exists` for each key → populate Bitmap → signal + `_lookup_efd`. +4. **Unlock**: no-op (filesystem storage has no eviction race between lookup + and load). +5. **Load**: `submit_load_task` → `_execute_load` reads each file into the + caller-provided buffer → populate Bitmap → signal `_load_efd`. +6. **Delete**: synchronous `Path.unlink(missing_ok=True)` for each key; + fires `on_l2_keys_deleted` listener notifications. +7. **Close**: cancel pending tasks; stop event loop; join thread; close event + fds; optionally `nvme disconnect`. + +## Thread Safety + +- `_lock` protects all three completed-task dicts and `_next_task_id`. +- All I/O runs on the dedicated event loop thread; task submission is + thread-safe via `asyncio.run_coroutine_threadsafe`. + +## O_DIRECT + +When `use_odirect=true`, buffer sizes must be aligned to +`os.statvfs(mount_path).f_bsize`. Unaligned writes fall back to standard +buffered I/O with a warning. 
O_DIRECT reads and writes run in the default +executor (thread pool) via `loop.run_in_executor` so they don't block the +event loop. + +## Persistence + +Files are never deleted on normal shutdown — data persists across restarts. +Use the `delete()` method (called by the L2 eviction controller when +`"eviction"` is configured in the adapter JSON) to remove stale entries. diff --git a/docs/design/v1/storage_backend/plugins/nvmeof_backend.md b/docs/design/v1/storage_backend/plugins/nvmeof_backend.md new file mode 100644 index 0000000000..9ffc4fb933 --- /dev/null +++ b/docs/design/v1/storage_backend/plugins/nvmeof_backend.md @@ -0,0 +1,127 @@ +# NVMeOF Storage Plugin Backend + +**Source**: `lmcache/v1/storage_backend/plugins/nvmeof_backend.py` +**L2 counterpart**: `lmcache/v1/distributed/l2_adapters/nvmeof_l2_adapter.py` + +## Overview + +`NVMeOFBackend` is a `StoragePluginInterface` implementation that stores KV +cache chunks on an NVMe-over-Fabrics (NVMeOF) attached block device formatted +and mounted as a filesystem (ext4, XFS, etc.). Each KV chunk is a separate +file under `nvmeof.mount_path`. + +The backend fits into the single-process storage stack via +`storage_plugin_launcher()` in `lmcache/v1/storage_backend/__init__.py`. For +multi-process (MP) mode, see the companion L2 adapter. + +## Architecture + +``` +LMCache engine + │ + ▼ +StorageBackendInterface + │ + ▼ +NVMeOFBackend ──── _NVMeOFWorker (AsyncPQThreadPoolExecutor) + │ │ + │ ┌──────────┴───────────┐ + │ put (p=2) delete (p=1) prefetch (p=0) + │ + ▼ +NVMeOF mount point (e.g. 
/mnt/nvmeof, formatted as ext4/XFS) + │ + ▼ +NVMeOF block device (connected via nvme-cli or pre-mounted) +``` + +### Key design decisions + +| Decision | Choice | Rationale | +|---|---|---| +| Access mode | Filesystem (not raw block) | Simpler portability; works with any FS driver | +| Auto-connect | Off by default | Device is typically pre-mounted by the operator | +| Eviction | Pluggable policy (LRU/LFU/FIFO/MRU) | Matches LocalDiskBackend behavior | +| O_DIRECT | Optional | Avoids double-buffering; requires aligned buffers | +| Per-file storage | One file per KV chunk | Simple; enables parallel reads/writes | +| Async I/O | `AsyncPQThreadPoolExecutor` | Priority ordering: prefetch > delete > put | + +## Configuration + +All keys are prefixed `nvmeof.` in `extra_config`: + +| Key | Type | Default | Description | +|---|---|---|---| +| `nvmeof.mount_path` | str | _(required)_ | Filesystem mount point | +| `nvmeof.transport` | str | `"tcp"` | NVMeOF transport: `rdma`, `tcp`, `fc` | +| `nvmeof.target_addr` | str | `""` | Target IP/hostname (auto-connect only) | +| `nvmeof.target_port` | str | `"4420"` | Target port (auto-connect only) | +| `nvmeof.target_nqn` | str | `""` | Target NQN (auto-connect / auto-disconnect) | +| `nvmeof.auto_connect` | bool | `false` | Run `nvme connect` at init | +| `nvmeof.auto_disconnect` | bool | `false` | Run `nvme disconnect` at close | +| `nvmeof.use_odirect` | bool | `false` | Use `O_DIRECT` for I/O | +| `nvmeof.max_capacity_gb` | float | `0` (unlimited) | Max filesystem usage in GiB | +| `nvmeof.cache_policy` | str | `"lru"` | Eviction policy: `lru`, `lfu`, `fifo`, `mru` | + +### Example `extra_config` + +```yaml +storage_plugins: ["nvmeof"] +extra_config: + storage_plugin.nvmeof.module_path: lmcache.v1.storage_backend.plugins.nvmeof_backend + storage_plugin.nvmeof.class_name: NVMeOFBackend + nvmeof.mount_path: /mnt/nvmeof + nvmeof.transport: tcp + nvmeof.target_addr: 192.168.1.100 + nvmeof.target_port: "4420" + nvmeof.target_nqn: 
nqn.2023-01.io.example:nvme-sub + nvmeof.auto_connect: false + nvmeof.auto_disconnect: false + nvmeof.use_odirect: false + nvmeof.max_capacity_gb: 100.0 + nvmeof.cache_policy: lru +``` + +## File Naming + +``` +/.nvmeof +``` + +`CacheEngineKey.to_string()` produces a slash-separated string; slashes are +replaced with `-` to produce a flat filename with no subdirectory structure. + +## Lifecycle + +1. **Init**: `os.makedirs(mount_path, exist_ok=True)`. If `auto_connect=true`, + calls `nvme connect` via subprocess and raises `RuntimeError` on failure. +2. **Put**: `batched_submit_put_task` → evict if at capacity → schedule + `_write_chunk` on the priority executor (priority 2). +3. **Get (blocking)**: `get_blocking` reads the file synchronously into a CPU + staging buffer provided by `local_cpu_backend`. +4. **Get (async/prefetch)**: `batched_get_non_blocking` schedules + `_batched_read_files` on the executor at priority 0 (highest). +5. **Eviction**: `_cache_policy.get_evict_candidates()` selects keys; + `remove()` deletes the file and updates the in-memory index. +6. **Close**: Drains the executor, then calls `nvme disconnect` if + `auto_disconnect=true`. + +## Thread Safety + +- `_disk_lock` protects the in-memory index (`_dict`) and `_current_cache_size`. +- `_NVMeOFWorker._put_lock` protects the in-flight put-task list. +- Async tasks run on the caller-provided `loop` via `asyncio.run_coroutine_threadsafe`. + +## O_DIRECT + +When `use_odirect=true`, buffer sizes must be aligned to the filesystem block +size (`os.statvfs(mount_path).f_bsize`). Unaligned writes fall back to +standard buffered I/O with a warning log. NVMeOF devices typically have a +512 B or 4 KiB block size. + +## Extension Points + +- **Custom eviction**: implement a new policy and register it in + `lmcache/v1/storage_backend/cache_policy.py`. +- **Raw block access**: replace `_write_file` / `_read_file` with pread/pwrite + calls (see `rust_raw_block_backend.py` for the pattern). 
diff --git a/lmcache/v1/distributed/l2_adapters/nvmeof_l2_adapter.py b/lmcache/v1/distributed/l2_adapters/nvmeof_l2_adapter.py new file mode 100644 index 0000000000..571b4f9319 --- /dev/null +++ b/lmcache/v1/distributed/l2_adapters/nvmeof_l2_adapter.py @@ -0,0 +1,981 @@ +# SPDX-License-Identifier: Apache-2.0 +""" +NVMe-oF (NVMe over Fabrics) L2 adapter. + +Stores KV cache objects on an NVMeOF-connected block device that has been +formatted and mounted as a filesystem. Each :class:`ObjectKey` maps to a +separate ``.nvmeof`` file whose name encodes all key fields. + +Supports optional auto-connect / auto-disconnect lifecycle via ``nvme-cli``, +O_DIRECT I/O, and the standard L2 adapter event-fd protocol. + +Configuration JSON example:: + + { + "type": "nvmeof", + "mount_path": "/mnt/nvmeof", + "transport": "tcp", + "target_addr": "192.168.1.100", + "target_port": "4420", + "target_nqn": "nqn.2023-01.io.example:nvme-sub", + "auto_connect": false, + "auto_disconnect": false, + "use_odirect": false + } +""" + +# Future +from __future__ import annotations + +# Standard +from pathlib import Path +from typing import TYPE_CHECKING, Optional, Union +import asyncio +import os +import subprocess +import threading + +if TYPE_CHECKING: + from lmcache.v1.distributed.internal_api import L1MemoryDesc + +# Third Party +import aiofiles +import aiofiles.os + +# First Party +from lmcache.logging import init_logger +from lmcache.native_storage_ops import Bitmap +from lmcache.v1.distributed.api import ObjectKey +from lmcache.v1.distributed.l2_adapters.base import ( + L2AdapterInterface, + L2TaskId, +) +from lmcache.v1.distributed.l2_adapters.config import ( + L2AdapterConfigBase, + register_l2_adapter_type, +) +from lmcache.v1.distributed.l2_adapters.factory import ( + register_l2_adapter_factory, +) +from lmcache.v1.memory_management import MemoryObj + +logger = init_logger(__name__) + +_KEY_SEP = "@" +_PATH_SLASH_REPLACEMENT = "-SEP-" +_FILE_EXT = ".nvmeof" +_VALID_TRANSPORTS = {"rdma", 
"tcp", "fc"} + + +# --------------------------------------------------------------------------- +# Filename helpers +# --------------------------------------------------------------------------- + + +def _object_key_to_filename(key: ObjectKey) -> str: + """Build a reversible, filesystem-safe filename for *key*. + + Format:: + + @0x@.nvmeof + @0x@@.nvmeof + + Args: + key: The object key to encode. + + Returns: + Filename string (no directory component). + """ + safe_model = key.model_name.replace("/", _PATH_SLASH_REPLACEMENT) + base = ( + f"{safe_model}{_KEY_SEP}{key.kv_rank:#010x}" + f"{_KEY_SEP}{key.chunk_hash.hex()}" + ) + if key.cache_salt: + return f"{base}{_KEY_SEP}{key.cache_salt}{_FILE_EXT}" + return f"{base}{_FILE_EXT}" + + +def _filename_to_object_key(filename: str) -> Optional[ObjectKey]: + """Reverse :func:`_object_key_to_filename`. + + Args: + filename: Bare filename (no directory component). + + Returns: + :class:`ObjectKey` on success, ``None`` if the filename cannot be + parsed. 
+ """ + if not filename.endswith(_FILE_EXT): + return None + stem = filename[: -len(_FILE_EXT)] + parts = stem.split(_KEY_SEP) + if len(parts) == 3: + safe_model, kv_rank_str, chunk_hash_hex = parts + cache_salt = "" + elif len(parts) == 4: + safe_model, kv_rank_str, chunk_hash_hex, cache_salt = parts + else: + return None + + model_name = safe_model.replace(_PATH_SLASH_REPLACEMENT, "/") + try: + chunk_hash = bytes.fromhex(chunk_hash_hex) + kv_rank = int(kv_rank_str, 16) + return ObjectKey( + chunk_hash=chunk_hash, + model_name=model_name, + kv_rank=kv_rank, + cache_salt=cache_salt, + ) + except ValueError: + return None + + +# --------------------------------------------------------------------------- +# NVMeOF connection helpers +# --------------------------------------------------------------------------- + + +def _run_nvme_connect( + transport: str, + target_addr: str, + target_port: str, + target_nqn: str, +) -> bool: + """Connect to an NVMeOF target using ``nvme connect``. + + Args: + transport: Transport protocol ("rdma", "tcp", or "fc"). + target_addr: Target IP address or hostname. + target_port: Target port number. + target_nqn: Target NVMe Qualified Name. + + Returns: + True if the connection succeeded, False otherwise. 
+ """ + cmd = ["nvme", "connect", "-t", transport, "-a", target_addr, "-s", target_port, "-n", target_nqn] + try: + result = subprocess.run(cmd, capture_output=True, text=True, timeout=30) + if result.returncode == 0: + logger.info( + "NVMeOF connect succeeded: transport=%s addr=%s port=%s nqn=%s", + transport, target_addr, target_port, target_nqn, + ) + return True + logger.error( + "NVMeOF connect failed (rc=%d): %s", + result.returncode, result.stderr.strip(), + ) + return False + except FileNotFoundError: + logger.error("nvme-cli not found; cannot auto-connect to NVMeOF target") + return False + except subprocess.TimeoutExpired: + logger.error("NVMeOF connect timed out after 30 s") + return False + except Exception as exc: + logger.error("NVMeOF connect raised: %s", exc) + return False + + +def _run_nvme_disconnect(target_nqn: str) -> bool: + """Disconnect from an NVMeOF target using ``nvme disconnect``. + + Args: + target_nqn: Target NVMe Qualified Name to disconnect. + + Returns: + True if the disconnection succeeded, False otherwise. 
+ """ + cmd = ["nvme", "disconnect", "-n", target_nqn] + try: + result = subprocess.run(cmd, capture_output=True, text=True, timeout=30) + if result.returncode == 0: + logger.info("NVMeOF disconnect succeeded for nqn=%s", target_nqn) + return True + logger.warning( + "NVMeOF disconnect returned rc=%d: %s", + result.returncode, result.stderr.strip(), + ) + return False + except FileNotFoundError: + logger.warning("nvme-cli not found; skipping disconnect") + return False + except subprocess.TimeoutExpired: + logger.error("NVMeOF disconnect timed out after 30 s") + return False + except Exception as exc: + logger.error("NVMeOF disconnect raised: %s", exc) + return False + + +# --------------------------------------------------------------------------- +# Config +# --------------------------------------------------------------------------- + + +class NVMeOFL2AdapterConfig(L2AdapterConfigBase): + """Configuration for the NVMeOF-backed L2 adapter. + + Fields: + - mount_path: Directory where the NVMeOF block device is mounted. + - transport: NVMeOF transport protocol ("rdma", "tcp", or "fc"). + - target_addr: Target IP address or hostname (required for auto_connect). + - target_port: Target port (default "4420"). + - target_nqn: Target NVMe Qualified Name (required for auto_connect). + - auto_connect: If True, connect via nvme-cli at init time. + - auto_disconnect: If True, disconnect via nvme-cli at close time. + - use_odirect: If True, bypass the OS page cache using O_DIRECT. + """ + + def __init__( + self, + mount_path: str, + transport: str = "tcp", + target_addr: str = "", + target_port: str = "4420", + target_nqn: str = "", + auto_connect: bool = False, + auto_disconnect: bool = False, + use_odirect: bool = False, + ) -> None: + """Initialize NVMeOFL2AdapterConfig. + + Args: + mount_path: Directory where the NVMeOF device is mounted. + transport: NVMeOF transport protocol. + target_addr: Target IP address or hostname. + target_port: Target port number as a string. 
+ target_nqn: Target NVMe Qualified Name. + auto_connect: Connect via nvme-cli at init. + auto_disconnect: Disconnect via nvme-cli at close. + use_odirect: Bypass page cache via O_DIRECT. + """ + self.mount_path = mount_path + self.transport = transport + self.target_addr = target_addr + self.target_port = target_port + self.target_nqn = target_nqn + self.auto_connect = auto_connect + self.auto_disconnect = auto_disconnect + self.use_odirect = use_odirect + + @classmethod + def from_dict(cls, d: dict) -> "NVMeOFL2AdapterConfig": + """Build config from a dict (e.g. parsed JSON). + + Args: + d: Adapter spec dict; must include ``mount_path``. + + Returns: + NVMeOFL2AdapterConfig instance. + + Raises: + ValueError: If required keys are missing or values are invalid. + """ + mount_path = d.get("mount_path") + if not isinstance(mount_path, str) or not mount_path: + raise ValueError("mount_path must be a non-empty string") + + transport = d.get("transport", "tcp") + if not isinstance(transport, str) or transport.lower() not in _VALID_TRANSPORTS: + raise ValueError( + f"transport must be one of {sorted(_VALID_TRANSPORTS)}, got {transport!r}" + ) + + target_addr = d.get("target_addr", "") + if not isinstance(target_addr, str): + raise ValueError("target_addr must be a string") + + target_port = str(d.get("target_port", "4420")) + + target_nqn = d.get("target_nqn", "") + if not isinstance(target_nqn, str): + raise ValueError("target_nqn must be a string") + + auto_connect = d.get("auto_connect", False) + if not isinstance(auto_connect, bool): + raise ValueError("auto_connect must be a boolean") + + auto_disconnect = d.get("auto_disconnect", False) + if not isinstance(auto_disconnect, bool): + raise ValueError("auto_disconnect must be a boolean") + + use_odirect = d.get("use_odirect", False) + if not isinstance(use_odirect, bool): + raise ValueError("use_odirect must be a boolean") + + return cls( + mount_path=mount_path, + transport=transport.lower(), + 
target_addr=target_addr, + target_port=target_port, + target_nqn=target_nqn, + auto_connect=auto_connect, + auto_disconnect=auto_disconnect, + use_odirect=use_odirect, + ) + + @classmethod + def help(cls) -> str: + """Return a help string describing the config fields. + + Returns: + Human-readable description of all config fields. + """ + return ( + "NVMeOF L2 adapter config fields:\n" + "- mount_path (str): mount point of the NVMeOF filesystem (required)\n" + "- transport (str): NVMeOF transport -- 'rdma', 'tcp', or 'fc' (default 'tcp')\n" + "- target_addr (str): target IP/hostname (required when auto_connect=true)\n" + "- target_port (str): target port (default '4420')\n" + "- target_nqn (str): target NQN (required when auto_connect=true)\n" + "- auto_connect (bool): run 'nvme connect' at init (default false)\n" + "- auto_disconnect (bool): run 'nvme disconnect' at close (default false)\n" + "- use_odirect (bool): bypass page cache via O_DIRECT (default false)" + ) + + +# --------------------------------------------------------------------------- +# Adapter +# --------------------------------------------------------------------------- + + +class NVMeOFL2Adapter(L2AdapterInterface): + """NVMeOF-backed L2 adapter using async filesystem I/O via *aiofiles*. + + Each :class:`ObjectKey` maps to a ``.nvmeof`` file under ``mount_path``. + The file name encodes all key fields so it is reversible without an + in-memory index. + + Thread safety: + Task bookkeeping dicts are protected by ``_lock``. All I/O runs on + a dedicated asyncio event loop on a daemon thread. + + Args: + config: NVMeOF adapter configuration. 
+ """ + + def __init__(self, config: NVMeOFL2AdapterConfig) -> None: + super().__init__() + self._config = config + self._mount_path = Path(config.mount_path) + self._mount_path.mkdir(parents=True, exist_ok=True) + + # O_DIRECT settings + self._use_odirect = config.use_odirect + self._os_disk_bs = 0 + if self._use_odirect: + stat = os.statvfs(self._mount_path) + self._os_disk_bs = stat.f_bsize + + # NVMeOF lifecycle + self._auto_disconnect = config.auto_disconnect + self._target_nqn = config.target_nqn + + if config.auto_connect: + if not config.target_addr or not config.target_nqn: + raise ValueError( + "target_addr and target_nqn are required when auto_connect=true" + ) + connected = _run_nvme_connect( + config.transport, + config.target_addr, + config.target_port, + config.target_nqn, + ) + if not connected: + raise RuntimeError( + f"Failed to auto-connect to NVMeOF target {config.target_nqn}" + ) + + # Event fds — each must be distinct (see L2AdapterInterface contract) + self._store_efd = os.eventfd(0, os.EFD_NONBLOCK | os.EFD_CLOEXEC) + self._lookup_efd = os.eventfd(0, os.EFD_NONBLOCK | os.EFD_CLOEXEC) + self._load_efd = os.eventfd(0, os.EFD_NONBLOCK | os.EFD_CLOEXEC) + + # Task bookkeeping + self._next_task_id: L2TaskId = 0 + self._completed_store_tasks: dict[L2TaskId, bool] = {} + self._completed_lookup_tasks: dict[L2TaskId, Bitmap] = {} + self._completed_load_tasks: dict[L2TaskId, Bitmap] = {} + self._lock = threading.Lock() + + # Background asyncio event loop on a daemon thread + self._loop = asyncio.new_event_loop() + self._loop_thread = threading.Thread( + target=self._run_event_loop, daemon=True, name="nvmeof-l2-loop" + ) + self._loop_thread.start() + + logger.info( + "NVMeOFL2Adapter initialised: mount_path=%s transport=%s odirect=%s", + self._mount_path, + config.transport, + self._use_odirect, + ) + + # ------------------------------------------------------------------ + # Event fd interface + # 
------------------------------------------------------------------ + + def get_store_event_fd(self) -> int: + """Return the store-completion event fd. + + Returns: + File descriptor signalled when a store task completes. + """ + return self._store_efd + + def get_lookup_and_lock_event_fd(self) -> int: + """Return the lookup-and-lock-completion event fd. + + Returns: + File descriptor signalled when a lookup task completes. + """ + return self._lookup_efd + + def get_load_event_fd(self) -> int: + """Return the load-completion event fd. + + Returns: + File descriptor signalled when a load task completes. + """ + return self._load_efd + + # ------------------------------------------------------------------ + # Store interface + # ------------------------------------------------------------------ + + def submit_store_task( + self, + keys: list[ObjectKey], + objects: list[MemoryObj], + ) -> L2TaskId: + """Asynchronously write a batch of KV objects to the NVMeOF filesystem. + + Args: + keys: Cache keys for the objects to store. + objects: Memory objects to write; one per key. + + Returns: + Task ID that can be used to poll for completion via + :meth:`pop_completed_store_tasks`. + """ + with self._lock: + task_id = self._get_next_task_id() + + asyncio.run_coroutine_threadsafe( + self._execute_store(keys, objects, task_id), + self._loop, + ) + return task_id + + def pop_completed_store_tasks(self) -> dict[L2TaskId, bool]: + """Pop and return all completed store tasks. + + Returns: + Mapping of task ID to success flag (True = success). + """ + with self._lock: + completed = self._completed_store_tasks + self._completed_store_tasks = {} + return completed + + # ------------------------------------------------------------------ + # Lookup and lock interface + # ------------------------------------------------------------------ + + def submit_lookup_and_lock_task(self, keys: list[ObjectKey]) -> L2TaskId: + """Asynchronously check whether keys exist on the NVMeOF filesystem. 
+ + Args: + keys: Keys to look up. + + Returns: + Task ID for polling via :meth:`query_lookup_and_lock_result`. + """ + with self._lock: + task_id = self._get_next_task_id() + + asyncio.run_coroutine_threadsafe( + self._execute_lookup(keys, task_id), + self._loop, + ) + return task_id + + def query_lookup_and_lock_result(self, task_id: L2TaskId) -> Bitmap | None: + """Non-blockingly retrieve the result of a lookup task. + + Args: + task_id: ID returned by :meth:`submit_lookup_and_lock_task`. + + Returns: + Bitmap (1 = found) or ``None`` if the task is not yet done. + """ + with self._lock: + return self._completed_lookup_tasks.pop(task_id, None) + + def submit_unlock(self, keys: list[ObjectKey]) -> None: + """No-op: NVMeOF filesystem storage does not require locking. + + Args: + keys: Keys to unlock (ignored). + """ + pass + + # ------------------------------------------------------------------ + # Load interface + # ------------------------------------------------------------------ + + def submit_load_task( + self, + keys: list[ObjectKey], + objects: list[MemoryObj], + ) -> L2TaskId: + """Asynchronously read KV objects from the NVMeOF filesystem. + + Args: + keys: Keys to load. + objects: Pre-allocated memory buffers to fill; one per key. + + Returns: + Task ID for polling via :meth:`query_load_result`. + """ + with self._lock: + task_id = self._get_next_task_id() + + asyncio.run_coroutine_threadsafe( + self._execute_load(keys, objects, task_id), + self._loop, + ) + return task_id + + def query_load_result(self, task_id: L2TaskId) -> Bitmap | None: + """Non-blockingly retrieve the result of a load task. + + Args: + task_id: ID returned by :meth:`submit_load_task`. + + Returns: + Bitmap (1 = loaded) or ``None`` if the task is not yet done. 
+ """ + with self._lock: + return self._completed_load_tasks.pop(task_id, None) + + # ------------------------------------------------------------------ + # Eviction interface (no-op — NVMeOF files persist across sessions) + # ------------------------------------------------------------------ + + def delete(self, keys: list[ObjectKey]) -> None: + """Delete files for *keys* from the NVMeOF filesystem. + + Args: + keys: Keys whose files should be removed. + """ + for key in keys: + path = self._key_to_path(key) + try: + path.unlink(missing_ok=True) + except OSError as exc: + logger.warning("NVMeOF delete failed for %s: %s", path, exc) + self._notify_keys_deleted(keys) + + def get_usage(self) -> tuple[float, float]: + """Return storage utilisation (unsupported; returns sentinel values). + + Returns: + ``(-1.0, -1.0)`` — usage tracking is not implemented. + """ + return (-1.0, -1.0) + + # ------------------------------------------------------------------ + # Status interface + # ------------------------------------------------------------------ + + def report_status(self) -> dict: + """Return a status dict for the NVMeOF L2 adapter. + + Returns: + Dict with at least ``is_healthy``, ``type``, ``mount_path``, + and ``use_odirect`` keys. + """ + return { + "is_healthy": self._loop_thread.is_alive(), + "type": "NVMeOFL2Adapter", + "mount_path": str(self._mount_path), + "transport": self._config.transport, + "use_odirect": self._use_odirect, + "event_loop_alive": self._loop_thread.is_alive(), + } + + # ------------------------------------------------------------------ + # Cleanup + # ------------------------------------------------------------------ + + def close(self) -> None: + """Shut down the adapter, optionally disconnect from the NVMeOF target. + + Cancels all pending async tasks, stops the event loop, and closes + the event fds. 
+ """ + async def _stop_tasks() -> None: + tasks = [ + t + for t in asyncio.all_tasks(self._loop) + if t is not asyncio.current_task() + ] + for task in tasks: + task.cancel() + if tasks: + await asyncio.gather(*tasks, return_exceptions=True) + + if self._loop.is_running(): + fut = asyncio.run_coroutine_threadsafe(_stop_tasks(), self._loop) + try: + fut.result(timeout=5) + except Exception: + pass + self._loop.call_soon_threadsafe(self._loop.stop) + + self._loop_thread.join() + self._loop.close() + + os.close(self._store_efd) + os.close(self._lookup_efd) + os.close(self._load_efd) + + if self._auto_disconnect and self._target_nqn: + _run_nvme_disconnect(self._target_nqn) + + logger.info("NVMeOFL2Adapter closed (mount_path=%s)", self._mount_path) + + # ------------------------------------------------------------------ + # Internal helpers + # ------------------------------------------------------------------ + + def _run_event_loop(self) -> None: + """Run the background asyncio event loop (daemon thread target).""" + asyncio.set_event_loop(self._loop) + self._loop.run_forever() + + def _get_next_task_id(self) -> L2TaskId: + """Allocate and return the next sequential task ID. + + Returns: + Monotonically increasing integer task ID. + """ + tid = self._next_task_id + self._next_task_id += 1 + return tid + + def _key_to_path(self, key: ObjectKey) -> Path: + """Map *key* to its absolute file path under the mount point. + + Args: + key: Object key to map. + + Returns: + Absolute :class:`Path` for the NVMeOF file. + """ + return self._mount_path / _object_key_to_filename(key) + + # ---- O_DIRECT helpers ----------------------------------------------- + + def _read_with_odirect( + self, + file_path: Path, + dst_buf: Union[bytearray, memoryview, bytes], + ) -> int: + """Synchronous O_DIRECT read into *dst_buf* (runs in executor). + + Args: + file_path: Source file path. + dst_buf: Pre-allocated destination buffer. + + Returns: + Number of bytes read, or 0 on error. 
+        """
+        fd = -1
+        size = len(dst_buf)
+        try:
+            aligned = self._os_disk_bs > 0 and size % self._os_disk_bs == 0
+            if not aligned:
+                logger.warning("Cannot use O_DIRECT for %s: size not aligned", file_path)
+                with open(file_path, "rb") as f:
+                    return _readinto_full(f, dst_buf)
+
+            fd = os.open(str(file_path), os.O_RDONLY | getattr(os, "O_DIRECT", 0))
+            with os.fdopen(fd, "rb", buffering=0) as fdo:
+                fd = -1
+                return _readinto_full(fdo, dst_buf)
+        except Exception:
+            logger.exception("Failed to O_DIRECT read %s", file_path)
+            return 0
+        finally:
+            if fd >= 0:
+                try:
+                    os.close(fd)
+                except OSError:
+                    pass
+
+    def _write_with_odirect(self, file_path: Path, buf: bytes) -> None:
+        """Synchronous O_DIRECT write of *buf* (runs in executor).
+
+        Args:
+            file_path: Destination file path.
+            buf: Data to write.
+
+        Raises:
+            OSError: If the write fails.
+        """
+        fd = -1
+        try:
+            fd = os.open(
+                str(file_path),
+                os.O_CREAT | os.O_WRONLY | getattr(os, "O_DIRECT", 0),
+                0o644,
+            )
+            # POSIX write() may perform a short write (especially likely with
+            # O_DIRECT); loop until every byte of *buf* has been written so a
+            # partially-written chunk is never silently left on disk.
+            mv = memoryview(buf)
+            written = 0
+            while written < len(mv):
+                written += os.write(fd, mv[written:])
+        except Exception:
+            logger.exception("Failed to O_DIRECT write %s", file_path)
+            raise
+        finally:
+            if fd >= 0:
+                try:
+                    os.close(fd)
+                except OSError:
+                    pass
+
+    # ---- store ----------------------------------------------------------
+
+    async def _execute_store(
+        self,
+        keys: list[ObjectKey],
+        objects: list[MemoryObj],
+        task_id: L2TaskId,
+    ) -> None:
+        """Write each key-object pair to the NVMeOF filesystem.
+
+        Signals the store event fd on completion.
+
+        Args:
+            keys: Keys to store.
+            objects: Memory objects to write.
+            task_id: Task ID to record in the completed-tasks dict.
+ """ + success = True + try: + for key, obj in zip(keys, objects, strict=True): + file_path = self._key_to_path(key) + tmp_path = file_path.with_suffix(".tmp") + + if await aiofiles.os.path.exists(file_path): + continue + + buf = obj.byte_array + size = len(buf) + try: + do_odirect = self._use_odirect + if do_odirect: + aligned = self._os_disk_bs > 0 and size % self._os_disk_bs == 0 + if not aligned: + logger.warning( + "Cannot use O_DIRECT for write of size %d (bs=%d)", + size, self._os_disk_bs, + ) + do_odirect = False + + if do_odirect: + await self._loop.run_in_executor( + None, self._write_with_odirect, tmp_path, buf + ) + else: + async with aiofiles.open(tmp_path, "wb") as f: + await f.write(buf) + + await aiofiles.os.replace(tmp_path, file_path) + logger.debug("NVMeOFL2Adapter stored %s (%d B)", file_path.name, size) + except Exception: + logger.exception("NVMeOFL2Adapter failed to store %s", file_path) + if await aiofiles.os.path.exists(tmp_path): + await aiofiles.os.unlink(tmp_path) + success = False + except Exception: + logger.exception("NVMeOFL2Adapter store task %s failed", task_id) + success = False + + with self._lock: + self._completed_store_tasks[task_id] = success + os.eventfd_write(self._store_efd, 1) + + if success: + self._notify_keys_stored(keys) + + # ---- lookup --------------------------------------------------------- + + async def _execute_lookup( + self, + keys: list[ObjectKey], + task_id: L2TaskId, + ) -> None: + """Check existence of each key's file on the NVMeOF filesystem. + + Signals the lookup event fd on completion. + + Args: + keys: Keys to look up. + task_id: Task ID to record in the completed-tasks dict. 
+ """ + bitmap = Bitmap(len(keys)) + for i, key in enumerate(keys): + path = self._key_to_path(key) + if await aiofiles.os.path.exists(path): + bitmap.set(i) + + with self._lock: + self._completed_lookup_tasks[task_id] = bitmap + os.eventfd_write(self._lookup_efd, 1) + + accessed = [keys[i] for i in range(len(keys)) if bitmap.get(i)] + if accessed: + self._notify_keys_accessed(accessed) + + # ---- load ----------------------------------------------------------- + + async def _execute_load( + self, + keys: list[ObjectKey], + objects: list[MemoryObj], + task_id: L2TaskId, + ) -> None: + """Read each key's file from the NVMeOF filesystem into *objects*. + + Signals the load event fd on completion. + + Args: + keys: Keys to load. + objects: Pre-allocated memory buffers to fill. + task_id: Task ID to record in the completed-tasks dict. + """ + bitmap = Bitmap(len(keys)) + for i, key in enumerate(keys): + file_path = self._key_to_path(key) + try: + dst_buf = objects[i].byte_array + expected = len(dst_buf) + + if self._use_odirect: + num_read = await self._loop.run_in_executor( + None, self._read_with_odirect, file_path, dst_buf + ) + if num_read == expected: + bitmap.set(i) + else: + logger.warning( + "Incomplete O_DIRECT read for %s: expected %d, got %d", + file_path.name, expected, num_read, + ) + continue + + async with aiofiles.open(file_path, "rb") as f: + num_read = await _async_readinto_full(f, dst_buf) + + if num_read == expected: + bitmap.set(i) + logger.debug( + "NVMeOFL2Adapter loaded %s (%d B)", file_path.name, num_read + ) + else: + logger.warning( + "Incomplete read for %s: expected %d, got %d", + file_path.name, expected, num_read, + ) + except FileNotFoundError: + continue + except Exception: + logger.exception("NVMeOFL2Adapter failed to load %s", file_path) + continue + + with self._lock: + self._completed_load_tasks[task_id] = bitmap + os.eventfd_write(self._load_efd, 1) + + +# --------------------------------------------------------------------------- 
+# I/O helpers (module-level so they can be referenced from both class methods +# and the executor) +# --------------------------------------------------------------------------- + + +def _readinto_full( + f, # IO[bytes] + buf: Union[bytearray, memoryview, bytes], +) -> int: + """Loop ``readinto()`` until *buf* is full or EOF. + + Args: + f: Binary file object with a ``readinto`` method. + buf: Destination buffer. + + Returns: + Total bytes read. + """ + mv = memoryview(buf) if not isinstance(buf, memoryview) else buf + total = 0 + while total < len(mv): + n = f.readinto(mv[total:]) + if n is None or n == 0: + break + total += n + return total + + +async def _async_readinto_full( + f, # aiofiles async file handle + buf: Union[bytearray, memoryview, bytes], +) -> int: + """Async version of :func:`_readinto_full`. + + Args: + f: Async file handle with a ``readinto`` coroutine. + buf: Destination buffer. + + Returns: + Total bytes read. + """ + mv = memoryview(buf) if not isinstance(buf, memoryview) else buf + total = 0 + while total < len(mv): + n = await f.readinto(mv[total:]) + if n is None or n == 0: + break + total += n + return total + + +# --------------------------------------------------------------------------- +# Self-registration +# --------------------------------------------------------------------------- + +register_l2_adapter_type("nvmeof", NVMeOFL2AdapterConfig) + + +def _create_nvmeof_adapter( + config: L2AdapterConfigBase, + l1_memory_desc: "Optional[L1MemoryDesc]" = None, +) -> L2AdapterInterface: + """Create a :class:`NVMeOFL2Adapter` from *config*. + + Args: + config: Must be a :class:`NVMeOFL2AdapterConfig` instance. + l1_memory_desc: Unused; present for factory signature compatibility. + + Returns: + A new :class:`NVMeOFL2Adapter`. 
+ """ + return NVMeOFL2Adapter(config) # type: ignore[arg-type] + + +register_l2_adapter_factory("nvmeof", _create_nvmeof_adapter) diff --git a/lmcache/v1/storage_backend/plugins/nvmeof_backend.py b/lmcache/v1/storage_backend/plugins/nvmeof_backend.py new file mode 100644 index 0000000000..16dbf5308e --- /dev/null +++ b/lmcache/v1/storage_backend/plugins/nvmeof_backend.py @@ -0,0 +1,942 @@ +# SPDX-License-Identifier: Apache-2.0 +""" +NVMe-oF (NVMe over Fabrics) storage plugin backend. + +Stores KV cache chunks on an NVMeOF-connected block device that has been +formatted and mounted as a filesystem. Supports optional auto-connect/ +auto-disconnect lifecycle via the ``nvme-cli`` tool, O_DIRECT I/O, and +pluggable LRU/LFU/FIFO/MRU cache eviction policies. + +Configuration is provided via ``extra_config`` in :class:`LMCacheEngineConfig`: + +.. code-block:: yaml + + storage_plugins: ["nvmeof"] + extra_config: + storage_plugin.nvmeof.module_path: >- + lmcache.v1.storage_backend.plugins.nvmeof_backend + storage_plugin.nvmeof.class_name: NVMeOFBackend + nvmeof.mount_path: /mnt/nvmeof + nvmeof.transport: tcp # rdma | tcp | fc + nvmeof.target_addr: 192.168.1.100 + nvmeof.target_port: "4420" + nvmeof.target_nqn: nqn.2023-01.io.example:nvme-sub + nvmeof.auto_connect: false + nvmeof.auto_disconnect: false + nvmeof.use_odirect: false + nvmeof.max_capacity_gb: 100.0 + nvmeof.cache_policy: lru # lru | lfu | fifo | mru +""" + +# Future +from __future__ import annotations + +# Standard +from concurrent.futures import Future +from typing import TYPE_CHECKING, Any, Callable, List, Optional, Sequence +import asyncio +import os +import subprocess +import threading +import time + +# Third Party +import torch + +# First Party +from lmcache.logging import init_logger +from lmcache.observability import LMCStatsMonitor +from lmcache.utils import CacheEngineKey, DiskCacheMetadata +from lmcache.v1.config import LMCacheEngineConfig +from lmcache.v1.memory_management import MemoryFormat, 
MemoryObj +from lmcache.v1.metadata import LMCacheMetadata +from lmcache.v1.storage_backend.abstract_backend import ( + AllocatorBackendInterface, + StoragePluginInterface, +) +from lmcache.v1.storage_backend.cache_policy import get_cache_policy +from lmcache.v1.storage_backend.job_executor.pq_executor import ( + AsyncPQThreadPoolExecutor, +) +from lmcache.v1.storage_backend.local_cpu_backend import LocalCPUBackend + +if TYPE_CHECKING: + pass + +logger = init_logger(__name__) + +# Supported NVMeOF transport protocols +_VALID_TRANSPORTS = {"rdma", "tcp", "fc"} + + +def _run_nvme_connect( + transport: str, + target_addr: str, + target_port: str, + target_nqn: str, +) -> bool: + """Connect to an NVMeOF target using ``nvme connect``. + + Args: + transport: Transport protocol ("rdma", "tcp", or "fc"). + target_addr: Target IP address or hostname. + target_port: Target port number (default NVMeOF port is 4420). + target_nqn: Target NVMe Qualified Name. + + Returns: + True if the connection succeeded, False otherwise. + """ + cmd = [ + "nvme", + "connect", + "-t", + transport, + "-a", + target_addr, + "-s", + target_port, + "-n", + target_nqn, + ] + try: + result = subprocess.run( + cmd, + capture_output=True, + text=True, + timeout=30, + ) + if result.returncode == 0: + logger.info( + "NVMeOF connect succeeded: transport=%s addr=%s port=%s nqn=%s", + transport, + target_addr, + target_port, + target_nqn, + ) + return True + logger.error( + "NVMeOF connect failed (rc=%d): %s", + result.returncode, + result.stderr.strip(), + ) + return False + except FileNotFoundError: + logger.error( + "nvme-cli not found. Install it with 'apt install nvme-cli' " + "or 'dnf install nvme-cli' to use auto_connect." 
+ ) + return False + except subprocess.TimeoutExpired: + logger.error("NVMeOF connect timed out after 30 s") + return False + except Exception as exc: + logger.error("NVMeOF connect raised: %s", exc) + return False + + +def _run_nvme_disconnect(target_nqn: str) -> bool: + """Disconnect from an NVMeOF target using ``nvme disconnect``. + + Args: + target_nqn: Target NVMe Qualified Name to disconnect. + + Returns: + True if the disconnection succeeded, False otherwise. + """ + cmd = ["nvme", "disconnect", "-n", target_nqn] + try: + result = subprocess.run( + cmd, + capture_output=True, + text=True, + timeout=30, + ) + if result.returncode == 0: + logger.info("NVMeOF disconnect succeeded for nqn=%s", target_nqn) + return True + logger.warning( + "NVMeOF disconnect returned rc=%d: %s", + result.returncode, + result.stderr.strip(), + ) + return False + except FileNotFoundError: + logger.warning("nvme-cli not found; skipping disconnect") + return False + except subprocess.TimeoutExpired: + logger.error("NVMeOF disconnect timed out after 30 s") + return False + except Exception as exc: + logger.error("NVMeOF disconnect raised: %s", exc) + return False + + +class _NVMeOFWorker: + """Priority-queue async I/O worker for NVMeOF backend operations. + + Manages the async executor that serialises put (write), prefetch + (read), and delete tasks on the NVMeOF mount point. Task priorities + mirror those of LocalDiskBackend: + + - prefetch → priority 0 (highest) + - delete → priority 1 + - put → priority 2 (lowest) + """ + + def __init__(self, loop: asyncio.AbstractEventLoop) -> None: + """Initialise the worker with the given event loop. + + Args: + loop: Asyncio event loop used for scheduling coroutines. 
+ """ + self._put_lock = threading.Lock() + self._put_tasks: List[CacheEngineKey] = [] + self._executor = AsyncPQThreadPoolExecutor(loop, max_workers=4) + self._loop = loop + self._closed = False + + async def submit_task( + self, + task_type: str, + task: Callable, + *args: Any, + **kwargs: Any, + ) -> Any: + """Schedule a task on the priority-queue executor. + + Args: + task_type: One of "prefetch", "delete", or "put". + task: Callable to execute in the thread pool. + *args: Positional arguments forwarded to *task*. + **kwargs: Keyword arguments forwarded to *task*. + + Returns: + Whatever *task* returns. + + Raises: + ValueError: If *task_type* is not recognised. + """ + if task_type == "prefetch": + priority = 0 + elif task_type == "delete": + priority = 1 + elif task_type == "put": + priority = 2 + else: + raise ValueError(f"Unknown task type: {task_type}") + + return await self._executor.submit_job(task, *args, priority=priority, **kwargs) + + def insert_put_task(self, key: CacheEngineKey) -> None: + """Record that a put task for *key* is in-flight. + + Args: + key: The cache key being written. + """ + with self._put_lock: + self._put_tasks.append(key) + + def remove_put_task(self, key: CacheEngineKey) -> None: + """Remove the in-flight record for *key*. + + Args: + key: The cache key whose write has completed. + """ + with self._put_lock: + if key in self._put_tasks: + self._put_tasks.remove(key) + else: + logger.warning("Key %s not found in put tasks", key) + + def exists_in_put_tasks(self, key: CacheEngineKey) -> bool: + """Return True if a put task for *key* is currently in flight. + + Args: + key: The cache key to check. + + Returns: + True when a concurrent write for *key* is pending. 
+ """ + with self._put_lock: + return key in self._put_tasks + + def close(self) -> None: + """Shut down the executor, waiting for in-flight tasks to drain.""" + if self._closed: + return + self._closed = True + self._executor.shutdown(wait=True) + + +class NVMeOFBackend(StoragePluginInterface): + """KV-cache storage plugin backend for NVMeOF-attached block devices. + + The backend assumes the NVMeOF target has already been connected (either + manually or via ``auto_connect``) and the block device formatted/mounted + at ``nvmeof.mount_path``. Each KV chunk is stored as a separate file + under that mount point. + + Eviction uses the policy selected by ``nvmeof.cache_policy`` (default + LRU) and is bounded by ``nvmeof.max_capacity_gb``. + + Thread safety: + The public interface is thread-safe. Internal bookkeeping is + protected by ``_disk_lock``. Async writes are submitted to the + event-loop thread via :class:`_NVMeOFWorker`. + + Args: + dst_device: Target device for tensors retrieved from this backend. + config: LMCache engine configuration. + metadata: LMCache engine metadata. + local_cpu_backend: CPU staging buffer used during prefetch. + loop: Asyncio event loop for async I/O tasks. 
+ """ + + def __init__( + self, + dst_device: str = "cuda", + config: Optional[LMCacheEngineConfig] = None, + metadata: Optional[LMCacheMetadata] = None, + local_cpu_backend: Optional[LocalCPUBackend] = None, + loop: Optional[asyncio.AbstractEventLoop] = None, + ) -> None: + super().__init__( + dst_device=dst_device if torch.cuda.is_available() else "cpu", + config=config, + metadata=metadata, + local_cpu_backend=local_cpu_backend, + loop=loop, + ) + + if self.loop is None: + raise ValueError("NVMeOFBackend requires an asyncio event loop") + if self.local_cpu_backend is None: + raise ValueError("NVMeOFBackend requires local_cpu_backend") + if self.config is None: + raise ValueError("NVMeOFBackend requires config") + + extra = self.config.extra_config or {} + + # Mount-point is mandatory; the block device must be pre-mounted. + mount_path: Optional[str] = extra.get("nvmeof.mount_path") + if not mount_path: + raise ValueError("extra_config['nvmeof.mount_path'] is required") + os.makedirs(mount_path, exist_ok=True) + self._mount_path: str = mount_path + + # NVMeOF connection parameters (only used when auto_connect is True). + self._transport: str = str(extra.get("nvmeof.transport", "tcp")).lower() + if self._transport not in _VALID_TRANSPORTS: + raise ValueError( + f"nvmeof.transport must be one of {sorted(_VALID_TRANSPORTS)}, " + f"got {self._transport!r}" + ) + self._target_addr: str = str(extra.get("nvmeof.target_addr", "")) + self._target_port: str = str(extra.get("nvmeof.target_port", "4420")) + self._target_nqn: str = str(extra.get("nvmeof.target_nqn", "")) + + # Lifecycle control flags. + self._auto_connect: bool = bool(extra.get("nvmeof.auto_connect", False)) + self._auto_disconnect: bool = bool(extra.get("nvmeof.auto_disconnect", False)) + + # O_DIRECT settings. + self._use_odirect: bool = bool(extra.get("nvmeof.use_odirect", False)) + stat = os.statvfs(self._mount_path) + self._os_disk_bs: int = stat.f_bsize + + # Capacity management. 
+ max_gb: float = float(extra.get("nvmeof.max_capacity_gb", 0.0)) + self._max_cache_size: int = ( + int(max_gb * 1024**3) if max_gb > 0 else 2**62 + ) + self._current_cache_size: float = 0.0 + + # Cache policy and in-memory index. + policy_name: str = str(extra.get("nvmeof.cache_policy", "lru")) + self._cache_policy = get_cache_policy(policy_name) + self._dict = self._cache_policy.init_mutable_mapping() + + self._disk_lock = threading.Lock() + self._keys_in_request: List[CacheEngineKey] = [] + + # Async worker. + self._worker = _NVMeOFWorker(self.loop) + + self._stats_monitor = LMCStatsMonitor.GetOrCreate() + self._usage: int = 0 + + # Optionally connect to the target. + if self._auto_connect: + if not self._target_addr or not self._target_nqn: + raise ValueError( + "nvmeof.target_addr and nvmeof.target_nqn are required " + "when nvmeof.auto_connect is True" + ) + connected = _run_nvme_connect( + self._transport, + self._target_addr, + self._target_port, + self._target_nqn, + ) + if not connected: + raise RuntimeError( + f"Failed to auto-connect to NVMeOF target {self._target_nqn}" + ) + + logger.info( + "NVMeOFBackend initialised: mount_path=%s transport=%s " + "odirect=%s max_cache_gb=%.1f policy=%s", + self._mount_path, + self._transport, + self._use_odirect, + max_gb if max_gb > 0 else float("inf"), + policy_name, + ) + + def __str__(self) -> str: + return "NVMeOFBackend" + + # ------------------------------------------------------------------ + # Key ↔ path mapping + # ------------------------------------------------------------------ + + def _key_to_path(self, key: CacheEngineKey) -> str: + """Map a cache key to an absolute file path under the mount point. + + Args: + key: The cache engine key. + + Returns: + Absolute path string for the file that stores the KV chunk. 
+ """ + return os.path.join( + self._mount_path, + key.to_string().replace("/", "-") + ".nvmeof", + ) + + # ------------------------------------------------------------------ + # StorageBackendInterface + # ------------------------------------------------------------------ + + def contains(self, key: CacheEngineKey, pin: bool = False) -> bool: + """Check whether *key* is stored in this backend. + + Args: + key: The cache engine key to look up. + pin: If True, pin the entry to prevent eviction during retrieval. + + Returns: + True if the key exists in the index. + """ + with self._disk_lock: + if key not in self._dict: + return False + if pin: + self._dict[key].pin() + self._keys_in_request.append(key) + return True + + def touch_cache(self) -> None: + """Update eviction-policy recency for keys accessed during the last request. + + Called by the engine after each request to record which keys were + used, so the LRU (or other) policy can track access order. + """ + with self._disk_lock: + for key in reversed(self._keys_in_request): + self._cache_policy.update_on_hit(key, self._dict) + self._keys_in_request = [] + + def exists_in_put_tasks(self, key: CacheEngineKey) -> bool: + """Return True if *key* has an in-flight write. + + Args: + key: The cache engine key to check. + + Returns: + True when a concurrent write for *key* is pending. + """ + return self._worker.exists_in_put_tasks(key) + + def pin(self, key: CacheEngineKey) -> bool: + """Pin *key* to prevent eviction. + + Args: + key: The cache engine key to pin. + + Returns: + True if the key exists and was pinned; False otherwise. + """ + with self._disk_lock: + if key in self._dict: + self._dict[key].pin() + return True + return False + + def unpin(self, key: CacheEngineKey) -> bool: + """Unpin *key* to allow eviction again. + + Args: + key: The cache engine key to unpin. + + Returns: + True if the key exists and was unpinned; False otherwise. 
+        """
+        with self._disk_lock:
+            if key in self._dict:
+                self._dict[key].unpin()
+                return True
+            return False
+
+    def remove(self, key: CacheEngineKey, force: bool = True) -> bool:
+        """Remove a KV chunk from the index and delete its file.
+
+        Args:
+            key: The cache engine key to remove.
+            force: When True, acquire the internal lock (external eviction);
+                when False, the caller already holds the lock (internal eviction).
+
+        Returns:
+            True if the key existed and was removed; False if it was not found.
+        """
+        if force:
+            self._disk_lock.acquire()
+
+        meta: Optional[DiskCacheMetadata] = self._dict.pop(key, None)
+        if not meta:
+            if force:
+                self._disk_lock.release()
+            return False
+
+        path = meta.path
+        size = meta.size
+        self._usage -= size
+        self._stats_monitor.update_local_storage_usage(self._usage)
+
+        try:
+            os.remove(path)
+        except OSError as exc:
+            logger.warning("Failed to remove NVMeOF file %s: %s", path, exc)
+
+        if force:
+            # External removals must also release the capacity reserved at put
+            # time, otherwise _current_cache_size only ever grows and triggers
+            # spurious evictions.  The internal eviction path (force=False)
+            # already subtracts the size in _submit_put_task before calling us.
+            self._current_cache_size -= size
+            self._cache_policy.update_on_force_evict(key)
+            self._disk_lock.release()
+
+        return True
+
+    def _insert_key(
+        self,
+        key: CacheEngineKey,
+        size: int,
+        shape: torch.Size,
+        dtype: torch.dtype,
+        fmt: MemoryFormat,
+        cached_positions: Optional[torch.Tensor] = None,
+    ) -> None:
+        """Add or refresh an entry in the in-memory index.
+
+        Args:
+            key: The cache engine key.
+            size: File size in bytes.
+            shape: Tensor shape of the stored chunk.
+            dtype: Tensor dtype of the stored chunk.
+            fmt: Memory format of the stored chunk.
+            cached_positions: Optional token-position tensor for the chunk.
+ """ + path = self._key_to_path(key) + with self._disk_lock: + if key in self._dict: + self._cache_policy.update_on_hit(key, self._dict) + else: + self._dict[key] = DiskCacheMetadata( + path, size, shape, dtype, cached_positions, fmt, 0 + ) + + def _submit_put_task( + self, + key: CacheEngineKey, + memory_obj: MemoryObj, + on_complete_callback: Optional[Callable[[CacheEngineKey], None]] = None, + ) -> None: + """Queue a single file-write task on the async worker. + + Eviction is triggered synchronously before the async write if the + backend is at capacity. The function returns immediately after + scheduling the write. + + Args: + key: The cache key for this KV chunk. + memory_obj: Memory object holding the tensor data. + on_complete_callback: Optional callback invoked once per key + after the write completes. Exceptions are caught and logged. + """ + if memory_obj.tensor is None: + return + + if self._worker.exists_in_put_tasks(key): + logger.debug("Put task for %s already in progress; skipping", key) + return + + self._worker.insert_put_task(key) + + required_size = memory_obj.get_physical_size() + evict_success = True + + with self._disk_lock: + while self._current_cache_size + required_size > self._max_cache_size: + evict_keys = self._cache_policy.get_evict_candidates( + self._dict, num_candidates=1 + ) + if not evict_keys: + logger.warning( + "NVMeOFBackend: no eviction candidates; " + "mount_path=%s is under pressure", + self._mount_path, + ) + evict_success = False + break + + for evict_key in evict_keys: + self._current_cache_size -= self._dict[evict_key].size + + self.batched_remove(evict_keys, force=False) + + if evict_success: + self._current_cache_size += required_size + self._cache_policy.update_on_put(key) + + if not evict_success: + self._worker.remove_put_task(key) + return + + memory_obj.ref_count_up() + assert self.loop is not None + asyncio.run_coroutine_threadsafe( + self._worker.submit_task( + "put", + self._write_chunk, + key=key, + 
memory_obj=memory_obj, + on_complete_callback=on_complete_callback, + ), + self.loop, + ) + + def batched_submit_put_task( + self, + keys: Sequence[CacheEngineKey], + objs: List[MemoryObj], + transfer_spec: Any = None, + on_complete_callback: Optional[Callable[[CacheEngineKey], None]] = None, + ) -> None: + """Submit file-write tasks for a batch of KV chunks. + + Args: + keys: Cache keys for the KV chunks. + objs: Memory objects containing the KV data. + transfer_spec: Unused; present for interface compatibility. + on_complete_callback: Optional callback invoked once per key + after that key's write completes. Exceptions are caught + and logged. + """ + for key, obj in zip(keys, objs, strict=False): + self._submit_put_task(key, obj, on_complete_callback=on_complete_callback) + + def get_blocking(self, key: CacheEngineKey) -> Optional[MemoryObj]: + """Synchronously load a KV chunk from the NVMeOF filesystem. + + Args: + key: The cache engine key to load. + + Returns: + A :class:`MemoryObj` with the loaded tensor data, or ``None`` if + the key is not in the index. + """ + self._disk_lock.acquire() + if key not in self._dict: + self._disk_lock.release() + return None + + self._cache_policy.update_on_hit(key, self._dict) + disk_meta = self._dict[key] + path = disk_meta.path + dtype = disk_meta.dtype + shape = disk_meta.shape + fmt = disk_meta.fmt + assert dtype is not None + assert shape is not None + self._disk_lock.release() + + memory_obj = self.local_cpu_backend.allocate(shape, dtype, fmt) + assert memory_obj is not None, "Memory allocation failed during NVMeOF load" + + buf = memory_obj.byte_array + self._read_file(key, buf, path) + + cached_positions = self._dict[key].cached_positions + memory_obj.metadata.cached_positions = cached_positions + + return memory_obj + + async def batched_async_contains( + self, + lookup_id: str, + keys: list[CacheEngineKey], + pin: bool = False, + ) -> int: + """Async batch lookup: return the number of contiguous prefix hits. 
+ + Args: + lookup_id: Caller-provided lookup identifier (unused internally). + keys: Ordered keys to check. + pin: If True, pin each hit to prevent eviction. + + Returns: + Number of leading keys found in the index (prefix count). + """ + hit_count = 0 + with self._disk_lock: + for key in keys: + if key not in self._dict: + return hit_count + if pin: + self._dict[key].pin() + self._keys_in_request.append(key) + hit_count += 1 + return hit_count + + async def batched_get_non_blocking( + self, + lookup_id: str, + keys: list[CacheEngineKey], + transfer_spec: Any = None, + ) -> list[MemoryObj]: + """Async non-blocking batch load from the NVMeOF filesystem. + + Queues a prefetch task on the priority executor and awaits completion. + + Args: + lookup_id: Caller-provided lookup identifier (for logging). + keys: Ordered keys to load (must all be pinned / present). + transfer_spec: Unused; present for interface compatibility. + + Returns: + List of loaded :class:`MemoryObj` for the leading prefix of found + keys. Shorter than *keys* if allocation fails mid-way. 
+ """ + mem_objs: list[MemoryObj] = [] + paths: list[str] = [] + + logger.debug( + "lookup_id=%s; NVMeOFBackend prefetching %d keys", lookup_id, len(keys) + ) + for key in keys: + self._disk_lock.acquire() + assert key in self._dict, f"Key {key} not found after pinning" + + path = self._dict[key].path + dtype = self._dict[key].dtype + shape = self._dict[key].shape + fmt = self._dict[key].fmt + assert dtype is not None + assert shape is not None + + memory_obj = self.local_cpu_backend.allocate( + shape, dtype, fmt, busy_loop=False + ) + if memory_obj is None: + logger.error( + "NVMeOFBackend: memory allocation failed during async load " + "for key %s; CPU staging pool may be exhausted", + key, + ) + self._disk_lock.release() + return mem_objs + + self._dict[key].pin() + self._cache_policy.update_on_hit(key, self._dict) + self._disk_lock.release() + + memory_obj.pin() + mem_objs.append(memory_obj) + paths.append(path) + + return await self._worker.submit_task( + "prefetch", + self._batched_read_files, + paths=paths, + keys=keys, + memory_objs=mem_objs, + ) + + def get_allocator_backend(self) -> AllocatorBackendInterface: + """Return the CPU staging allocator used during loads. + + Returns: + The :class:`LocalCPUBackend` instance provided at construction. + """ + assert self.local_cpu_backend is not None + return self.local_cpu_backend + + def close(self) -> None: + """Shut down the backend and optionally disconnect from the NVMeOF target. + + Waits for in-flight writes to complete before releasing resources. 
+ """ + self._worker.close() + + if self._auto_disconnect and self._target_nqn: + _run_nvme_disconnect(self._target_nqn) + + logger.info("NVMeOFBackend closed (mount_path=%s)", self._mount_path) + + # ------------------------------------------------------------------ + # I/O helpers + # ------------------------------------------------------------------ + + def _write_chunk( + self, + key: CacheEngineKey, + memory_obj: MemoryObj, + on_complete_callback: Optional[Callable[[CacheEngineKey], None]] = None, + ) -> None: + """Write a KV chunk to the NVMeOF filesystem (runs in thread pool). + + Args: + key: Cache key identifying the chunk. + memory_obj: Memory object holding the tensor data. + on_complete_callback: Optional callback invoked after the write. + Exceptions from the callback are caught and logged. + """ + assert memory_obj.tensor is not None + + buf = memory_obj.byte_array + path = self._key_to_path(key) + size = len(buf) + + start = time.time() + self._usage += size + self._stats_monitor.update_local_storage_usage(self._usage) + + self._write_file(buf, path) + + elapsed = time.time() - start + logger.debug( + "NVMeOF write: %d bytes in %.3f s (%.1f MB/s)", + size, + elapsed, + size / max(elapsed, 1e-9) / 1e6, + ) + + shape = memory_obj.metadata.shape + dtype = memory_obj.metadata.dtype + fmt = memory_obj.metadata.fmt + cached_positions = memory_obj.metadata.cached_positions + size_phys = memory_obj.get_physical_size() + memory_obj.ref_count_down() + + self._insert_key(key, size_phys, shape, dtype, fmt, cached_positions) + self._worker.remove_put_task(key) + + if on_complete_callback is not None: + try: + on_complete_callback(key) + except Exception as exc: + logger.warning("on_complete_callback failed for key %s: %s", key, exc) + + def _batched_read_files( + self, + paths: list[str], + keys: list[CacheEngineKey], + memory_objs: list[MemoryObj], + ) -> list[MemoryObj]: + """Read multiple KV chunks from disk into pre-allocated buffers. 
+
+        Args:
+            paths: File paths to read.
+            keys: Cache keys corresponding to each path.
+            memory_objs: Pre-allocated memory objects to fill.
+
+        Returns:
+            The same *memory_objs* list (buffers filled in-place).
+        """
+        for path, key, mem_obj in zip(paths, keys, memory_objs, strict=False):
+            buf = mem_obj.byte_array
+            self._read_file(key, buf, path)
+
+            # _read_file drops the index entry when the file is missing, so
+            # guard the lookup instead of indexing self._dict[key] directly
+            # (which would raise KeyError here).
+            with self._disk_lock:
+                meta = self._dict.get(key)
+                if meta is not None:
+                    mem_obj.metadata.cached_positions = meta.cached_positions
+                    meta.unpin()
+
+        return memory_objs
+
+    def _write_file(self, buffer: Any, path: str) -> None:
+        """Write *buffer* to *path*, using O_DIRECT when configured.
+
+        Args:
+            buffer: Byte-like object to write.
+            path: Destination file path.
+        """
+        size = len(buffer)
+        if self._use_odirect and size % self._os_disk_bs == 0:
+            fd = os.open(path, os.O_CREAT | os.O_WRONLY | os.O_DIRECT, 0o644)
+            try:
+                # POSIX write() may return a short count; loop until the
+                # whole buffer has been written.
+                mv = memoryview(buffer)
+                written = 0
+                while written < len(mv):
+                    written += os.write(fd, mv[written:])
+            finally:
+                os.close(fd)
+        else:
+            if self._use_odirect:
+                logger.debug(
+                    "Skipping O_DIRECT for write (size %d not aligned to %d)",
+                    size,
+                    self._os_disk_bs,
+                )
+            with open(path, "wb") as f:
+                f.write(buffer)
+
+    def _read_file(
+        self,
+        key: CacheEngineKey,
+        buffer: Any,
+        path: str,
+    ) -> None:
+        """Read *path* into *buffer*, using O_DIRECT when configured.
+
+        Args:
+            key: Cache key (used for logging and index cleanup on miss).
+            buffer: Pre-allocated byte buffer to fill.
+            path: Source file path.
+ """ + size = len(buffer) + aligned = size % self._os_disk_bs == 0 + + if self._use_odirect and not aligned: + logger.warning( + "Cannot use O_DIRECT for read of key %s (size %d not aligned)", + key, + size, + ) + + try: + if self._use_odirect and aligned: + fd = os.open(path, os.O_RDONLY | os.O_DIRECT) + try: + with os.fdopen(fd, "rb", buffering=0) as fdo: + fd = -1 + fdo.readinto(buffer) + except Exception: + if fd >= 0: + os.close(fd) + raise + else: + with open(path, "rb") as f: + f.readinto(buffer) + except FileNotFoundError: + logger.warning("NVMeOF file not found: %s; removing index entry", path) + with self._disk_lock: + self._dict.pop(key, None) + except OSError as exc: + logger.error("NVMeOF read error for %s: %s", path, exc) diff --git a/tests/v1/storage_backend/test_nvmeof_backend.py b/tests/v1/storage_backend/test_nvmeof_backend.py new file mode 100644 index 0000000000..991a81d93e --- /dev/null +++ b/tests/v1/storage_backend/test_nvmeof_backend.py @@ -0,0 +1,716 @@ +# SPDX-License-Identifier: Apache-2.0 +""" +Tests for NVMeOFBackend (StoragePluginInterface) and NVMeOFL2Adapter +(L2AdapterInterface). + +Both modules target a mounted NVMeOF filesystem; tests use a local temp +directory to avoid requiring real NVMeOF hardware. 
+""" + +# Standard +import asyncio +import os +from unittest.mock import MagicMock, patch + +# Third Party +import pytest +import torch + +# First Party +from lmcache.utils import CacheEngineKey +from lmcache.v1.config import LMCacheEngineConfig +from lmcache.v1.distributed.api import ObjectKey +from lmcache.v1.distributed.l2_adapters.nvmeof_l2_adapter import ( + NVMeOFL2Adapter, + NVMeOFL2AdapterConfig, + _filename_to_object_key, + _object_key_to_filename, + _run_nvme_connect, + _run_nvme_disconnect, +) +from lmcache.v1.metadata import LMCacheMetadata +from lmcache.v1.storage_backend.plugins.nvmeof_backend import ( # noqa: F401 + NVMeOFBackend, + _run_nvme_connect as backend_connect, + _run_nvme_disconnect as backend_disconnect, +) + + +# --------------------------------------------------------------------------- +# Shared helpers +# --------------------------------------------------------------------------- + + +def _make_object_key(idx: int = 0, salt: str = "") -> ObjectKey: + return ObjectKey( + chunk_hash=bytes([idx % 256] * 16), + model_name="test-model", + kv_rank=idx, + cache_salt=salt, + ) + + +def _make_memory_obj(size: int = 1024) -> MagicMock: + """Return a mock MemoryObj whose byte_array is a real bytearray.""" + buf = bytearray(os.urandom(size)) + obj = MagicMock() + obj.byte_array = buf + obj.tensor = torch.zeros(1) + obj.get_physical_size.return_value = size + obj.metadata = MagicMock() + obj.metadata.shape = torch.Size([1]) + obj.metadata.dtype = torch.float32 + obj.metadata.fmt = MagicMock() + obj.metadata.cached_positions = None + return obj + + +# --------------------------------------------------------------------------- +# Filename helpers +# --------------------------------------------------------------------------- + + +class TestFilenameHelpers: + def test_roundtrip_no_salt(self): + key = _make_object_key(1) + fname = _object_key_to_filename(key) + assert fname.endswith(".nvmeof") + recovered = _filename_to_object_key(fname) + assert 
recovered == key + + def test_roundtrip_with_salt(self): + key = _make_object_key(2, salt="user-abc") + fname = _object_key_to_filename(key) + assert "user-abc" in fname + recovered = _filename_to_object_key(fname) + assert recovered == key + + def test_slash_in_model_name_encoded(self): + key = ObjectKey( + chunk_hash=b"\x00" * 16, + model_name="org/model", + kv_rank=0, + ) + fname = _object_key_to_filename(key) + assert "/" not in fname + recovered = _filename_to_object_key(fname) + assert recovered is not None + assert recovered.model_name == "org/model" + + def test_unrecognised_file_returns_none(self): + assert _filename_to_object_key("random_file.txt") is None + assert _filename_to_object_key("bad@format.nvmeof") is None + + def test_different_keys_produce_different_filenames(self): + a = _object_key_to_filename(_make_object_key(0)) + b = _object_key_to_filename(_make_object_key(1)) + assert a != b + + +# --------------------------------------------------------------------------- +# NVMeOF connection helpers +# --------------------------------------------------------------------------- + + +class TestNVMeConnectHelpers: + def test_connect_success(self): + with patch("subprocess.run") as mock_run: + mock_run.return_value = MagicMock(returncode=0, stderr="") + result = _run_nvme_connect("tcp", "127.0.0.1", "4420", "nqn.test") + assert result is True + + def test_connect_failure(self): + with patch("subprocess.run") as mock_run: + mock_run.return_value = MagicMock(returncode=1, stderr="error") + result = _run_nvme_connect("tcp", "127.0.0.1", "4420", "nqn.test") + assert result is False + + def test_connect_no_nvme_cli(self): + with patch("subprocess.run", side_effect=FileNotFoundError): + result = _run_nvme_connect("tcp", "127.0.0.1", "4420", "nqn.test") + assert result is False + + def test_disconnect_success(self): + with patch("subprocess.run") as mock_run: + mock_run.return_value = MagicMock(returncode=0, stderr="") + result = 
_run_nvme_disconnect("nqn.test") + assert result is True + + def test_disconnect_no_nvme_cli(self): + with patch("subprocess.run", side_effect=FileNotFoundError): + result = _run_nvme_disconnect("nqn.test") + assert result is False + + def test_backend_connect_success(self): + with patch("subprocess.run") as mock_run: + mock_run.return_value = MagicMock(returncode=0, stderr="") + result = backend_connect("rdma", "10.0.0.1", "4420", "nqn.prod") + assert result is True + + def test_backend_disconnect_success(self): + with patch("subprocess.run") as mock_run: + mock_run.return_value = MagicMock(returncode=0, stderr="") + result = backend_disconnect("nqn.prod") + assert result is True + + +# --------------------------------------------------------------------------- +# NVMeOFL2AdapterConfig +# --------------------------------------------------------------------------- + + +class TestNVMeOFL2AdapterConfig: + def test_from_dict_minimal(self): + cfg = NVMeOFL2AdapterConfig.from_dict({"type": "nvmeof", "mount_path": "/mnt/nv"}) + assert cfg.mount_path == "/mnt/nv" + assert cfg.transport == "tcp" + assert cfg.auto_connect is False + assert cfg.auto_disconnect is False + assert cfg.use_odirect is False + + def test_from_dict_full(self): + d = { + "type": "nvmeof", + "mount_path": "/mnt/nv", + "transport": "rdma", + "target_addr": "10.0.0.1", + "target_port": "4420", + "target_nqn": "nqn.test", + "auto_connect": False, + "auto_disconnect": True, + "use_odirect": False, + } + cfg = NVMeOFL2AdapterConfig.from_dict(d) + assert cfg.transport == "rdma" + assert cfg.target_addr == "10.0.0.1" + assert cfg.target_nqn == "nqn.test" + assert cfg.auto_disconnect is True + + def test_from_dict_missing_mount_path_raises(self): + with pytest.raises(ValueError, match="mount_path"): + NVMeOFL2AdapterConfig.from_dict({"type": "nvmeof"}) + + def test_from_dict_invalid_transport_raises(self): + with pytest.raises(ValueError, match="transport"): + NVMeOFL2AdapterConfig.from_dict( + {"type": 
"nvmeof", "mount_path": "/mnt/nv", "transport": "ib"} + ) + + def test_from_dict_invalid_bool_raises(self): + with pytest.raises(ValueError): + NVMeOFL2AdapterConfig.from_dict( + {"type": "nvmeof", "mount_path": "/mnt/nv", "auto_connect": "yes"} + ) + + def test_help_returns_string(self): + h = NVMeOFL2AdapterConfig.help() + assert isinstance(h, str) + assert "mount_path" in h + + +# --------------------------------------------------------------------------- +# NVMeOFL2Adapter — fixtures +# --------------------------------------------------------------------------- + + +@pytest.fixture +def tmp_mount(tmp_path): + """Yield a temp directory that acts as the NVMeOF mount point.""" + mount = tmp_path / "nvmeof_mount" + mount.mkdir() + return mount + + +@pytest.fixture +def adapter_config(tmp_mount): + return NVMeOFL2AdapterConfig(mount_path=str(tmp_mount)) + + +@pytest.fixture +def adapter(adapter_config): + a = NVMeOFL2Adapter(adapter_config) + yield a + a.close() + + +# --------------------------------------------------------------------------- +# NVMeOFL2Adapter — event fd interface +# --------------------------------------------------------------------------- + + +class TestNVMeOFL2AdapterEventFds: + def test_three_distinct_fds(self, adapter): + store_fd = adapter.get_store_event_fd() + lookup_fd = adapter.get_lookup_and_lock_event_fd() + load_fd = adapter.get_load_event_fd() + assert len({store_fd, lookup_fd, load_fd}) == 3 + for fd in (store_fd, lookup_fd, load_fd): + assert isinstance(fd, int) + assert fd >= 0 + + +# --------------------------------------------------------------------------- +# NVMeOFL2Adapter — store +# --------------------------------------------------------------------------- + + +def _wait_for_event(fd: int, timeout: float = 5.0) -> bool: + """Poll *fd* until it becomes readable or *timeout* seconds elapse.""" + import select + r, _, _ = select.select([fd], [], [], timeout) + return bool(r) + + +class TestNVMeOFL2AdapterStore: + def 
test_store_creates_file(self, adapter, tmp_mount): + key = _make_object_key(0) + obj = _make_memory_obj(512) + + task_id = adapter.submit_store_task([key], [obj]) + assert _wait_for_event(adapter.get_store_event_fd()), "store event not signalled" + + completed = adapter.pop_completed_store_tasks() + assert task_id in completed + assert completed[task_id] is True + + expected = tmp_mount / _object_key_to_filename(key) + assert expected.exists() + assert expected.read_bytes() == bytes(obj.byte_array) + + def test_store_idempotent(self, adapter, tmp_mount): + key = _make_object_key(1) + obj = _make_memory_obj(256) + + adapter.submit_store_task([key], [obj]) + _wait_for_event(adapter.get_store_event_fd()) + adapter.pop_completed_store_tasks() + + obj2 = _make_memory_obj(256) + task_id2 = adapter.submit_store_task([key], [obj2]) + _wait_for_event(adapter.get_store_event_fd()) + completed = adapter.pop_completed_store_tasks() + + assert completed[task_id2] is True + # File should still contain the first write + path = tmp_mount / _object_key_to_filename(key) + assert path.read_bytes() == bytes(obj.byte_array) + + def test_store_multiple_keys(self, adapter, tmp_mount): + keys = [_make_object_key(i) for i in range(3)] + objs = [_make_memory_obj(128) for _ in range(3)] + + task_id = adapter.submit_store_task(keys, objs) + _wait_for_event(adapter.get_store_event_fd()) + completed = adapter.pop_completed_store_tasks() + assert completed[task_id] is True + for key in keys: + assert (tmp_mount / _object_key_to_filename(key)).exists() + + def test_pop_clears_completed_tasks(self, adapter): + key = _make_object_key(10) + obj = _make_memory_obj(64) + adapter.submit_store_task([key], [obj]) + _wait_for_event(adapter.get_store_event_fd()) + first = adapter.pop_completed_store_tasks() + assert first + second = adapter.pop_completed_store_tasks() + assert second == {} + + +# --------------------------------------------------------------------------- +# NVMeOFL2Adapter — lookup +# 
--------------------------------------------------------------------------- + + +class TestNVMeOFL2AdapterLookup: + def test_lookup_miss(self, adapter): + key = _make_object_key(99) + task_id = adapter.submit_lookup_and_lock_task([key]) + _wait_for_event(adapter.get_lookup_and_lock_event_fd()) + bitmap = adapter.query_lookup_and_lock_result(task_id) + assert bitmap is not None + assert not bitmap.get(0) + + def test_lookup_hit_after_store(self, adapter): + key = _make_object_key(5) + obj = _make_memory_obj(64) + adapter.submit_store_task([key], [obj]) + _wait_for_event(adapter.get_store_event_fd()) + adapter.pop_completed_store_tasks() + + task_id = adapter.submit_lookup_and_lock_task([key]) + _wait_for_event(adapter.get_lookup_and_lock_event_fd()) + bitmap = adapter.query_lookup_and_lock_result(task_id) + assert bitmap is not None + assert bitmap.get(0) + + def test_lookup_partial_hit(self, adapter): + key_present = _make_object_key(6) + key_absent = _make_object_key(7) + obj = _make_memory_obj(64) + adapter.submit_store_task([key_present], [obj]) + _wait_for_event(adapter.get_store_event_fd()) + adapter.pop_completed_store_tasks() + + task_id = adapter.submit_lookup_and_lock_task([key_present, key_absent]) + _wait_for_event(adapter.get_lookup_and_lock_event_fd()) + bitmap = adapter.query_lookup_and_lock_result(task_id) + assert bitmap is not None + assert bitmap.get(0) + assert not bitmap.get(1) + + def test_query_result_is_consumed(self, adapter): + key = _make_object_key(20) + task_id = adapter.submit_lookup_and_lock_task([key]) + _wait_for_event(adapter.get_lookup_and_lock_event_fd()) + first = adapter.query_lookup_and_lock_result(task_id) + assert first is not None + second = adapter.query_lookup_and_lock_result(task_id) + assert second is None + + def test_submit_unlock_is_noop(self, adapter): + # submit_unlock must not raise + adapter.submit_unlock([_make_object_key(0)]) + + +# --------------------------------------------------------------------------- +# 
NVMeOFL2Adapter — load +# --------------------------------------------------------------------------- + + +class TestNVMeOFL2AdapterLoad: + def test_load_hit(self, adapter): + key = _make_object_key(30) + original = _make_memory_obj(256) + adapter.submit_store_task([key], [original]) + _wait_for_event(adapter.get_store_event_fd()) + adapter.pop_completed_store_tasks() + + dst = _make_memory_obj(256) + dst.byte_array = bytearray(256) + task_id = adapter.submit_load_task([key], [dst]) + _wait_for_event(adapter.get_load_event_fd()) + bitmap = adapter.query_load_result(task_id) + assert bitmap is not None + assert bitmap.get(0) + assert bytes(dst.byte_array) == bytes(original.byte_array) + + def test_load_miss(self, adapter): + key = _make_object_key(31) + dst = _make_memory_obj(64) + dst.byte_array = bytearray(64) + task_id = adapter.submit_load_task([key], [dst]) + _wait_for_event(adapter.get_load_event_fd()) + bitmap = adapter.query_load_result(task_id) + assert bitmap is not None + assert not bitmap.get(0) + + def test_load_result_consumed_once(self, adapter): + key = _make_object_key(32) + dst = _make_memory_obj(64) + dst.byte_array = bytearray(64) + task_id = adapter.submit_load_task([key], [dst]) + _wait_for_event(adapter.get_load_event_fd()) + first = adapter.query_load_result(task_id) + assert first is not None + assert adapter.query_load_result(task_id) is None + + +# --------------------------------------------------------------------------- +# NVMeOFL2Adapter — delete +# --------------------------------------------------------------------------- + + +class TestNVMeOFL2AdapterDelete: + def test_delete_existing_file(self, adapter, tmp_mount): + key = _make_object_key(40) + obj = _make_memory_obj(128) + adapter.submit_store_task([key], [obj]) + _wait_for_event(adapter.get_store_event_fd()) + adapter.pop_completed_store_tasks() + + path = tmp_mount / _object_key_to_filename(key) + assert path.exists() + + adapter.delete([key]) + assert not path.exists() + + def 
test_delete_nonexistent_is_safe(self, adapter): + adapter.delete([_make_object_key(999)]) + + def test_delete_notifies_listeners(self, adapter): + from lmcache.v1.distributed.internal_api import L2AdapterListener + + listener = MagicMock(spec=L2AdapterListener) + adapter.register_listener(listener) + key = _make_object_key(50) + adapter.delete([key]) + listener.on_l2_keys_deleted.assert_called_once_with([key]) + + +# --------------------------------------------------------------------------- +# NVMeOFL2Adapter — status and usage +# --------------------------------------------------------------------------- + + +class TestNVMeOFL2AdapterStatus: + def test_report_status_healthy(self, adapter): + status = adapter.report_status() + assert status["is_healthy"] is True + assert status["type"] == "NVMeOFL2Adapter" + assert "mount_path" in status + + def test_get_usage_returns_sentinel(self, adapter): + assert adapter.get_usage() == (-1.0, -1.0) + + +# --------------------------------------------------------------------------- +# NVMeOFL2Adapter — auto-connect / auto-disconnect +# --------------------------------------------------------------------------- + + +class TestNVMeOFL2AdapterConnect: + def test_auto_connect_called_on_init(self, tmp_mount): + with patch( + "lmcache.v1.distributed.l2_adapters.nvmeof_l2_adapter._run_nvme_connect", + return_value=True, + ) as mock_conn: + cfg = NVMeOFL2AdapterConfig( + mount_path=str(tmp_mount), + transport="tcp", + target_addr="10.0.0.1", + target_port="4420", + target_nqn="nqn.test", + auto_connect=True, + auto_disconnect=False, + ) + a = NVMeOFL2Adapter(cfg) + a.close() + mock_conn.assert_called_once_with("tcp", "10.0.0.1", "4420", "nqn.test") + + def test_auto_connect_failure_raises(self, tmp_mount): + with patch( + "lmcache.v1.distributed.l2_adapters.nvmeof_l2_adapter._run_nvme_connect", + return_value=False, + ): + with pytest.raises(RuntimeError, match="auto-connect"): + NVMeOFL2AdapterConfig( + mount_path=str(tmp_mount), + 
transport="tcp", + target_addr="10.0.0.1", + target_nqn="nqn.test", + auto_connect=True, + ) + + def test_auto_disconnect_called_on_close(self, tmp_mount): + with patch( + "lmcache.v1.distributed.l2_adapters.nvmeof_l2_adapter._run_nvme_disconnect", + return_value=True, + ) as mock_disc: + cfg = NVMeOFL2AdapterConfig( + mount_path=str(tmp_mount), + auto_disconnect=True, + target_nqn="nqn.test", + ) + a = NVMeOFL2Adapter(cfg) + a.close() + mock_disc.assert_called_once_with("nqn.test") + + def test_no_auto_connect_by_default(self, tmp_mount): + with patch( + "lmcache.v1.distributed.l2_adapters.nvmeof_l2_adapter._run_nvme_connect" + ) as mock_conn: + cfg = NVMeOFL2AdapterConfig(mount_path=str(tmp_mount)) + a = NVMeOFL2Adapter(cfg) + a.close() + mock_conn.assert_not_called() + + +# --------------------------------------------------------------------------- +# NVMeOFL2Adapter — registration +# --------------------------------------------------------------------------- + + +class TestNVMeOFL2AdapterRegistration: + def test_registered_as_nvmeof(self): + from lmcache.v1.distributed.l2_adapters.config import ( + _L2_ADAPTER_CONFIG_REGISTRY, + ) + assert "nvmeof" in _L2_ADAPTER_CONFIG_REGISTRY + assert _L2_ADAPTER_CONFIG_REGISTRY["nvmeof"] is NVMeOFL2AdapterConfig + + def test_create_via_factory(self, tmp_mount): + from lmcache.v1.distributed.l2_adapters import create_l2_adapter + + cfg = NVMeOFL2AdapterConfig(mount_path=str(tmp_mount)) + adapter = create_l2_adapter(cfg) + assert isinstance(adapter, NVMeOFL2Adapter) + adapter.close() + + +# --------------------------------------------------------------------------- +# NVMeOFBackend (StoragePluginInterface) — basic smoke tests +# --------------------------------------------------------------------------- + + +@pytest.fixture +def nvmeof_loop(): + loop = asyncio.new_event_loop() + yield loop + loop.close() + + +@pytest.fixture +def nvmeof_backend(tmp_path, nvmeof_loop): + from lmcache.v1.memory_management import MemoryFormat + 
from lmcache.v1.storage_backend.local_cpu_backend import LocalCPUBackend + + mount = tmp_path / "nvmeof_backend" + mount.mkdir() + + config = LMCacheEngineConfig.from_defaults( + chunk_size=256, + extra_config={ + "nvmeof.mount_path": str(mount), + }, + ) + metadata = LMCacheMetadata( + model_name="test_model", + world_size=1, + local_world_size=1, + worker_id=0, + local_worker_id=0, + kv_dtype=torch.bfloat16, + kv_shape=(2, 2, 256, 4, 64), + ) + cpu_backend = LocalCPUBackend(config) + + backend = NVMeOFBackend( + dst_device="cpu", + config=config, + metadata=metadata, + local_cpu_backend=cpu_backend, + loop=nvmeof_loop, + ) + yield backend + backend.close() + cpu_backend.memory_allocator.close() + + +class TestNVMeOFBackendSmoke: + def test_str(self, nvmeof_backend): + assert str(nvmeof_backend) == "NVMeOFBackend" + + def test_contains_missing_key(self, nvmeof_backend): + key = CacheEngineKey( + model_name="test_model", + world_size=1, + worker_id=0, + chunk_hash=0, + dtype=torch.bfloat16, + ) + assert not nvmeof_backend.contains(key) + + def test_exists_in_put_tasks_empty(self, nvmeof_backend): + key = CacheEngineKey( + model_name="test_model", + world_size=1, + worker_id=0, + chunk_hash=1, + dtype=torch.bfloat16, + ) + assert not nvmeof_backend.exists_in_put_tasks(key) + + def test_get_blocking_missing_key(self, nvmeof_backend): + key = CacheEngineKey( + model_name="test_model", + world_size=1, + worker_id=0, + chunk_hash=2, + dtype=torch.bfloat16, + ) + assert nvmeof_backend.get_blocking(key) is None + + def test_pin_unpin_missing_key(self, nvmeof_backend): + key = CacheEngineKey( + model_name="test_model", + world_size=1, + worker_id=0, + chunk_hash=3, + dtype=torch.bfloat16, + ) + assert not nvmeof_backend.pin(key) + assert not nvmeof_backend.unpin(key) + + def test_remove_missing_key(self, nvmeof_backend): + key = CacheEngineKey( + model_name="test_model", + world_size=1, + worker_id=0, + chunk_hash=4, + dtype=torch.bfloat16, + ) + assert not 
nvmeof_backend.remove(key) + + def test_missing_mount_path_raises(self, nvmeof_loop): + from lmcache.v1.storage_backend.local_cpu_backend import LocalCPUBackend + + config = LMCacheEngineConfig.from_defaults(chunk_size=256, extra_config={}) + metadata = LMCacheMetadata( + model_name="m", + world_size=1, + local_world_size=1, + worker_id=0, + local_worker_id=0, + kv_dtype=torch.bfloat16, + kv_shape=(2, 2, 256, 4, 64), + ) + cpu_backend = LocalCPUBackend(config) + with pytest.raises(ValueError, match="mount_path"): + NVMeOFBackend( + dst_device="cpu", + config=config, + metadata=metadata, + local_cpu_backend=cpu_backend, + loop=nvmeof_loop, + ) + cpu_backend.memory_allocator.close() + + def test_auto_connect_calls_nvme(self, tmp_path, nvmeof_loop): + from lmcache.v1.storage_backend.local_cpu_backend import LocalCPUBackend + + mount = tmp_path / "ac" + mount.mkdir() + config = LMCacheEngineConfig.from_defaults( + chunk_size=256, + extra_config={ + "nvmeof.mount_path": str(mount), + "nvmeof.auto_connect": True, + "nvmeof.target_addr": "10.0.0.1", + "nvmeof.target_nqn": "nqn.test", + }, + ) + metadata = LMCacheMetadata( + model_name="m", + world_size=1, + local_world_size=1, + worker_id=0, + local_worker_id=0, + kv_dtype=torch.bfloat16, + kv_shape=(2, 2, 256, 4, 64), + ) + cpu_backend = LocalCPUBackend(config) + with patch( + "lmcache.v1.storage_backend.plugins.nvmeof_backend._run_nvme_connect", + return_value=True, + ) as mock_conn: + b = NVMeOFBackend( + dst_device="cpu", + config=config, + metadata=metadata, + local_cpu_backend=cpu_backend, + loop=nvmeof_loop, + ) + b.close() + mock_conn.assert_called_once() + cpu_backend.memory_allocator.close()