18 changes: 8 additions & 10 deletions README.md
@@ -50,7 +50,7 @@ TransferQueue offers **fine-grained, sub-sample-level** data management and **lo

### Control Plane: Panoramic Data Management

In the control plane, `TransferQueueController` tracks the **production status** and **consumption status** of each training sample as metadata. Once all required data fields are ready (i.e., written to the `TransferQueueStorageManager`), the data sample can be consumed by downstream tasks.
In the control plane, `TransferQueueController` tracks the **production status** and **consumption status** of each training sample as metadata. Once all required data fields are ready (i.e., written to the `StorageManager`), the data sample can be consumed by downstream tasks.

We also track the consumption history for each computational task (e.g., `generate_sequences`, `compute_log_prob`, etc.). Therefore, even when different computational tasks require the same data field, they can consume the data independently without interfering with each other.
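
As a rough illustration of the bookkeeping described above, the sketch below records which fields of each sample have been produced and which samples each task has already consumed. `ToyController` and its methods are hypothetical names invented for this example; they are not the `TransferQueueController` API.

```python
from collections import defaultdict


class ToyController:
    """Hypothetical stand-in for the control-plane bookkeeping (not the real API)."""

    def __init__(self) -> None:
        # sample index -> set of field names already written to storage
        self.produced: dict[int, set[str]] = defaultdict(set)
        # task name -> set of sample indexes that task has already consumed
        self.consumed: dict[str, set[int]] = defaultdict(set)

    def mark_produced(self, index: int, field: str) -> None:
        self.produced[index].add(field)

    def ready_for(self, task: str, required: set[str]) -> list[int]:
        """Samples whose required fields exist and that `task` has not consumed yet."""
        return sorted(
            idx
            for idx, fields in self.produced.items()
            if required <= fields and idx not in self.consumed[task]
        )

    def consume(self, task: str, required: set[str]) -> list[int]:
        ready = self.ready_for(task, required)
        self.consumed[task].update(ready)
        return ready


ctrl = ToyController()
ctrl.mark_produced(0, "prompts")
ctrl.mark_produced(1, "prompts")

first = ctrl.consume("generate_sequences", {"prompts"})   # both samples
again = ctrl.consume("generate_sequences", {"prompts"})   # nothing left for this task
other = ctrl.consume("compute_log_prob", {"prompts"})     # unaffected by the other task
```

Because consumption is keyed by task name, `compute_log_prob` still sees both samples after `generate_sequences` has consumed them.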

@@ -66,7 +66,7 @@ To make the data retrieval process more customizable, we provide a `Sampler` cla

In the data plane, we utilize a pluggable design, enabling TransferQueue to integrate with different storage backends based on user requirements.

Specifically, we provide a `TransferQueueStorageManager` abstraction class that defines the core APIs as follows:
Specifically, we provide a `StorageManager` abstraction class that defines the core APIs as follows:

- `async def put_data(self, data: TensorDict, metadata: BatchMeta) -> None`
- `async def get_data(self, metadata: BatchMeta) -> TensorDict`
@@ -298,21 +298,19 @@ The data plane is organized as follows:
│ │── simple_backend.py # Default distributed storage backend (SimpleStorageUnit) by TQ
│ ├── managers/ # Managers are upper level interfaces that encapsulate the interaction logic with TQ system.
│ │ ├── __init__.py
│ │ ├──base.py # TransferQueueStorageManager, KVStorageManager
│ │ ├──simple_backend_manager.py # AsyncSimpleStorageManager
│ │ ├──base.py # StorageManager, KVStorageManager, StorageManagerFactory
│ │ ├──simple_storage_manager.py # AsyncSimpleStorageManager
│ │ ├──yuanrong_manager.py # YuanrongStorageManager
│ │ ├──mooncake_manager.py # MooncakeStorageManager
│ │ └──factory.py # TransferQueueStorageManagerFactory
│ │ └──mooncake_manager.py # MooncakeStorageManager
│ └── clients/ # Clients are lower level interfaces that directly manipulate the target storage backend.
│ │ ├── __init__.py
│ │ ├── base.py # TransferQueueStorageKVClient
│ │ ├── base.py # StorageKVClient, StorageClientFactory
│ │ ├── yuanrong_client.py # YuanrongStorageClient
│ │ ├── mooncake_client.py # MooncakeStorageClient
│ │ ├── ray_storage_client.py # RayStorageClient
│ │ └── factory.py # TransferQueueStorageClientFactory
│ │ └── ray_storage_client.py # RayStorageClient
```

To integrate TransferQueue with a custom storage backend, start by implementing a subclass that inherits from `TransferQueueStorageManager`. This subclass acts as an adapter between the TransferQueue system and the target storage backend. For KV-based storage backends, you can simply inherit from `KVStorageManager`, which can serve as the general manager for all KV-based backends.
To integrate TransferQueue with a custom storage backend, start by implementing a subclass that inherits from `StorageManager`. This subclass acts as an adapter between the TransferQueue system and the target storage backend. For KV-based storage backends, you can simply inherit from `KVStorageManager`, which can serve as the general manager for all KV-based backends.

Distributed storage backends often come with their own native clients serving as the interface of the storage system. In such cases, a low-level adapter for this client can be written, following the examples provided in the `storage/clients` directory.

3 changes: 2 additions & 1 deletion pyproject.toml
@@ -31,6 +31,7 @@ requires-python = ">=3.10"
# Note: While the formatter will attempt to format lines such that they remain within the line-length,
# it isn't a hard upper bound, and formatted lines may exceed the line-length.
line-length = 120
target-version = "py310"

[tool.ruff.lint]
isort = {known-first-party = ["transfer_queue"]}
@@ -60,7 +61,7 @@ ignore = [
# `.log()` statement uses f-string
"G004",
# X | None for type annotations
"UP045",
# "UP045",
# deprecated import
"UP035",
]
2 changes: 1 addition & 1 deletion scripts/put_benchmark.py
@@ -30,7 +30,7 @@

from transfer_queue import TransferQueueClient
from transfer_queue.controller import TransferQueueController
from transfer_queue.storage.simple_backend import SimpleStorageUnit
from transfer_queue.storage.simple_storage import SimpleStorageUnit
from transfer_queue.utils.common import get_placement_group
from transfer_queue.utils.zmq_utils import process_zmq_server_info

40 changes: 19 additions & 21 deletions tests/test_async_simple_storage_manager.py
@@ -24,7 +24,7 @@

from transfer_queue.metadata import BatchMeta
from transfer_queue.storage import AsyncSimpleStorageManager
from transfer_queue.utils.enum_utils import TransferQueueRole
from transfer_queue.utils.enum_utils import Role
from transfer_queue.utils.zmq_utils import ZMQMessage, ZMQRequestType, ZMQServerInfo


@@ -35,13 +35,13 @@ async def mock_async_storage_manager():
# Mock storage unit infos
storage_unit_infos = {
"storage_0": ZMQServerInfo(
role=TransferQueueRole.STORAGE,
role=Role.STORAGE,
id="storage_0",
ip="127.0.0.1",
ports={"put_get_socket": 12345},
),
"storage_1": ZMQServerInfo(
role=TransferQueueRole.STORAGE,
role=Role.STORAGE,
id="storage_1",
ip="127.0.0.1",
ports={"put_get_socket": 12346},
@@ -50,7 +50,7 @@ async def mock_async_storage_manager():

# Mock controller info
controller_info = ZMQServerInfo(
role=TransferQueueRole.CONTROLLER,
role=Role.CONTROLLER,
id="controller_0",
ip="127.0.0.1",
ports={"handshake_socket": 12347, "data_status_update_socket": 12348},
@@ -61,9 +61,7 @@ async def mock_async_storage_manager():
}

# Mock the handshake process entirely to avoid ZMQ complexity
with patch(
"transfer_queue.storage.managers.base.TransferQueueStorageManager._connect_to_controller"
) as mock_connect:
with patch("transfer_queue.storage.managers.base.StorageManager._connect_to_controller") as mock_connect:
# Mock the manager without actually connecting
manager = AsyncSimpleStorageManager.__new__(AsyncSimpleStorageManager)
manager.storage_manager_id = "test_storage_manager"
@@ -148,7 +146,7 @@ async def test_async_storage_manager_error_handling():
# Mock storage unit infos
storage_unit_infos = {
"storage_0": ZMQServerInfo(
role=TransferQueueRole.STORAGE,
role=Role.STORAGE,
id="storage_0",
ip="127.0.0.1",
ports={"put_get_socket": 12345},
@@ -157,7 +155,7 @@ async def test_async_storage_manager_error_handling():

# Mock controller info
controller_info = ZMQServerInfo(
role=TransferQueueRole.CONTROLLER,
role=Role.CONTROLLER,
id="controller_0",
ip="127.0.0.1",
ports={"handshake_socket": 12346, "data_status_update_socket": 12347},
@@ -242,19 +240,19 @@ async def test_get_data_routes_from_hash():
"""get_data should route using global_idx % num_su (hash routing)."""
storage_unit_infos = {
"storage_0": ZMQServerInfo(
role=TransferQueueRole.STORAGE,
role=Role.STORAGE,
id="storage_0",
ip="127.0.0.1",
ports={"put_get_socket": 19010},
),
"storage_1": ZMQServerInfo(
role=TransferQueueRole.STORAGE,
role=Role.STORAGE,
id="storage_1",
ip="127.0.0.1",
ports={"put_get_socket": 19011},
),
}
with patch("transfer_queue.storage.managers.base.TransferQueueStorageManager._connect_to_controller"):
with patch("transfer_queue.storage.managers.base.StorageManager._connect_to_controller"):
manager = AsyncSimpleStorageManager.__new__(AsyncSimpleStorageManager)
manager.storage_manager_id = "test_get"
manager.storage_unit_infos = storage_unit_infos
@@ -295,19 +293,19 @@ async def test_clear_data_routes_from_hash():
"""clear_data should route using global_idx % num_su (hash routing)."""
storage_unit_infos = {
"storage_0": ZMQServerInfo(
role=TransferQueueRole.STORAGE,
role=Role.STORAGE,
id="storage_0",
ip="127.0.0.1",
ports={"put_get_socket": 19020},
),
"storage_1": ZMQServerInfo(
role=TransferQueueRole.STORAGE,
role=Role.STORAGE,
id="storage_1",
ip="127.0.0.1",
ports={"put_get_socket": 19021},
),
}
with patch("transfer_queue.storage.managers.base.TransferQueueStorageManager._connect_to_controller"):
with patch("transfer_queue.storage.managers.base.StorageManager._connect_to_controller"):
manager = AsyncSimpleStorageManager.__new__(AsyncSimpleStorageManager)
manager.storage_manager_id = "test_clear"
manager.storage_unit_infos = storage_unit_infos
@@ -346,19 +344,19 @@ async def test_hash_routing_stable_across_batch_sizes():
"""
storage_unit_infos = {
"storage_0": ZMQServerInfo(
role=TransferQueueRole.STORAGE,
role=Role.STORAGE,
id="storage_0",
ip="127.0.0.1",
ports={"put_get_socket": 19030},
),
"storage_1": ZMQServerInfo(
role=TransferQueueRole.STORAGE,
role=Role.STORAGE,
id="storage_1",
ip="127.0.0.1",
ports={"put_get_socket": 19031},
),
}
with patch("transfer_queue.storage.managers.base.TransferQueueStorageManager._connect_to_controller"):
with patch("transfer_queue.storage.managers.base.StorageManager._connect_to_controller"):
manager = AsyncSimpleStorageManager.__new__(AsyncSimpleStorageManager)
manager.storage_manager_id = "test_hash_batch"
manager.storage_unit_infos = storage_unit_infos
@@ -407,19 +405,19 @@ async def test_hash_routing_stable_reversed_order():
"""
storage_unit_infos = {
"storage_0": ZMQServerInfo(
role=TransferQueueRole.STORAGE,
role=Role.STORAGE,
id="storage_0",
ip="127.0.0.1",
ports={"put_get_socket": 19040},
),
"storage_1": ZMQServerInfo(
role=TransferQueueRole.STORAGE,
role=Role.STORAGE,
id="storage_1",
ip="127.0.0.1",
ports={"put_get_socket": 19041},
),
}
with patch("transfer_queue.storage.managers.base.TransferQueueStorageManager._connect_to_controller"):
with patch("transfer_queue.storage.managers.base.StorageManager._connect_to_controller"):
manager = AsyncSimpleStorageManager.__new__(AsyncSimpleStorageManager)
manager.storage_manager_id = "test_hash_order"
manager.storage_unit_infos = storage_unit_infos
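
The routing rule named in the docstrings above (`global_idx % num_su`) can be sketched as follows. `route_by_hash` is a hypothetical helper written for this illustration; the actual routing code in `AsyncSimpleStorageManager` is not shown in this diff.

```python
from collections import defaultdict


def route_by_hash(global_indexes: list[int], storage_unit_ids: list[str]) -> dict[str, list[int]]:
    """Group sample indexes by storage unit using global_idx % num_su."""
    num_su = len(storage_unit_ids)
    routed: dict[str, list[int]] = defaultdict(list)
    for global_idx in global_indexes:
        routed[storage_unit_ids[global_idx % num_su]].append(global_idx)
    return dict(routed)


units = ["storage_0", "storage_1"]
small = route_by_hash([0, 1, 2, 3], units)
# A larger batch in reversed order still sends each index to the same unit.
large = route_by_hash([5, 4, 3, 2, 1, 0], units)
```

Since the target depends only on the index and the number of storage units, a sample lands on the same unit regardless of batch size or ordering, which is the property the `test_hash_routing_stable_*` tests appear to check.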
10 changes: 5 additions & 5 deletions tests/test_client.py
@@ -24,7 +24,7 @@

from transfer_queue import TransferQueueClient
from transfer_queue.metadata import BatchMeta
from transfer_queue.utils.enum_utils import TransferQueueRole
from transfer_queue.utils.enum_utils import Role
from transfer_queue.utils.zmq_utils import (
ZMQMessage,
ZMQRequestType,
@@ -59,7 +59,7 @@ def __init__(self, controller_id="controller_0"):
self.request_port = self._bind_to_random_port(self.request_socket)

self.zmq_server_info = ZMQServerInfo(
role=TransferQueueRole.CONTROLLER,
role=Role.CONTROLLER,
id=controller_id,
ip="127.0.0.1",
ports={
@@ -300,7 +300,7 @@ def __init__(self, storage_id="storage_0"):
self.data_port = self._bind_to_random_port(self.data_socket)

self.zmq_server_info = ZMQServerInfo(
role=TransferQueueRole.STORAGE,
role=Role.STORAGE,
id=storage_id,
ip="127.0.0.1",
ports={
@@ -409,7 +409,7 @@ def client_setup(mock_controller, mock_storage):

# Mock the storage manager to avoid handshake issues but mock all data operations
with patch(
"transfer_queue.storage.managers.simple_backend_manager.AsyncSimpleStorageManager._connect_to_controller"
"transfer_queue.storage.managers.simple_storage_manager.AsyncSimpleStorageManager._connect_to_controller"
):
config = {
"controller_info": mock_controller.zmq_server_info,
@@ -502,7 +502,7 @@ def test_single_controller_multiple_storages():

# Mock the storage manager to avoid handshake issues but mock all data operations
with patch(
"transfer_queue.storage.managers.simple_backend_manager.AsyncSimpleStorageManager._connect_to_controller"
"transfer_queue.storage.managers.simple_storage_manager.AsyncSimpleStorageManager._connect_to_controller"
):
config = {
"controller_info": controller.zmq_server_info,
2 changes: 1 addition & 1 deletion tests/test_metadata.py
@@ -678,7 +678,7 @@ class TestStorageUnitDataStrict:

def test_put_data_length_mismatch_raises(self):
"""put_data must raise when global_indexes and field values have different lengths."""
from transfer_queue.storage.simple_backend import StorageUnitData
from transfer_queue.storage.simple_storage import StorageUnitData

sud = StorageUnitData(storage_size=10)
# 3 indexes but only 2 values — must raise, not silently drop
13 changes: 6 additions & 7 deletions tests/test_ray_p2p.py
@@ -23,8 +23,7 @@

from transfer_queue.client import TransferQueueClient
from transfer_queue.metadata import BatchMeta
from transfer_queue.storage.managers.base import KVStorageManager
from transfer_queue.storage.managers.factory import TransferQueueStorageManagerFactory
from transfer_queue.storage.managers.base import KVStorageManager, StorageManagerFactory
from transfer_queue.utils.zmq_utils import ZMQServerInfo

TEST_CONFIGS: list[tuple[tuple[int, int], torch.dtype]] = [
@@ -45,18 +44,18 @@

# Step 1: Mock Controller Role
try:
from transfer_queue.role import TransferQueueRole
from transfer_queue.role import Role
except ImportError:
from enum import Enum

class TransferQueueRole(Enum):
class Role(Enum):
CONTROLLER = "controller"
STORAGE = "storage"


def create_mock_controller():
return ZMQServerInfo(
role=TransferQueueRole.CONTROLLER,
role=Role.CONTROLLER,
id="controller_0",
ip="127.0.0.1",
ports={
@@ -71,9 +70,9 @@ def create_mock_controller():
def ensure_mock_storage_manager_registered():
"""Ensure MockKVStorageManager is registered in current process."""

if "KV_MOCK" not in TransferQueueStorageManagerFactory._registry:
if "KV_MOCK" not in StorageManagerFactory._registry:

@TransferQueueStorageManagerFactory.register("KV_MOCK")
@StorageManagerFactory.register("KV_MOCK")
class MockKVStorageManager(KVStorageManager):
def _connect_to_controller(self):
pass
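
The register-if-missing pattern used above can be sketched with a small decorator-based registry. `ToyManagerFactory` and its `create` method are hypothetical and written only for this illustration; the diff shows `StorageManagerFactory.register` and `_registry`, but not how the real factory instantiates managers.

```python
class ToyManagerFactory:
    """Hypothetical registry keyed by backend name (not the real StorageManagerFactory)."""

    _registry: dict[str, type] = {}

    @classmethod
    def register(cls, name: str):
        def decorator(manager_cls: type) -> type:
            cls._registry[name] = manager_cls
            return manager_cls

        return decorator

    @classmethod
    def create(cls, name: str, *args, **kwargs):
        if name not in cls._registry:
            raise ValueError(f"Unknown manager type: {name!r}")
        return cls._registry[name](*args, **kwargs)


def ensure_registered() -> None:
    # Guard so repeated calls in the same process do not re-register the class.
    if "KV_MOCK" not in ToyManagerFactory._registry:

        @ToyManagerFactory.register("KV_MOCK")
        class MockManager:
            def __init__(self, config: dict) -> None:
                self.config = config


ensure_registered()
ensure_registered()  # second call is a no-op
manager = ToyManagerFactory.create("KV_MOCK", {"k": "v"})
```

Defining the class inside the guard keeps registration local to whichever process calls the helper, which matters when workers (for example Ray actors) do not share the importing process's registry.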
4 changes: 2 additions & 2 deletions tests/test_simple_storage_unit.py
@@ -21,7 +21,7 @@
import torch
import zmq

from transfer_queue.storage.simple_backend import SimpleStorageUnit
from transfer_queue.storage.simple_storage import SimpleStorageUnit
from transfer_queue.utils.zmq_utils import ZMQMessage, ZMQRequestType


@@ -420,7 +420,7 @@ def test_storage_unit_data_direct():

def test_storage_unit_data_capacity_uses_active_keys():
"""Capacity check must use _active_keys, not scan field_data."""
from transfer_queue.storage.simple_backend import StorageUnitData
from transfer_queue.storage.simple_storage import StorageUnitData

storage = StorageUnitData(storage_size=3)

2 changes: 1 addition & 1 deletion tests/test_storage_client_factory.py
@@ -19,7 +19,7 @@
import pytest
import torch

from transfer_queue.storage.clients.factory import StorageClientFactory
from transfer_queue.storage.clients.base import StorageClientFactory
from transfer_queue.storage.clients.yuanrong_client import YuanrongStorageClient

