Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion .github/workflows/run-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -42,4 +42,8 @@ jobs:
TQ_TEST_BACKEND=MooncakeStore pytest tests/e2e/test_e2e_lifecycle_consistency.py
pkill -f "[m]ooncake_master" || true
TQ_TEST_BACKEND=MooncakeStore pytest tests/e2e/test_kv_interface_e2e.py
pkill -f "[m]ooncake_master" || true
pkill -f "[m]ooncake_master" || true
- name: Run Yuanrong Backend Specific E2E Tests
run: |
TQ_TEST_BACKEND=Yuanrong pytest tests/e2e/test_e2e_lifecycle_consistency.py
TQ_TEST_BACKEND=Yuanrong pytest tests/e2e/test_kv_interface_e2e.py
136 changes: 60 additions & 76 deletions docs/storage_backends/openyuanrong_datasystem.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@

# OpenYuanrong-Datasystem Integration for TransferQueue

> Last updated: 01/26/2026
> Last updated: 04/17/2026

## 🎉 Overview

Expand Down Expand Up @@ -58,27 +58,7 @@ pip install openyuanrong-datasystem
dscli -h
```

#### 3. Install etcd

OpenYuanrong-datasystem relies on etcd for cluster coordination.
Download and install etcd from the official releases: [ETCD GitHub Releases](https://github.com/etcd-io/etcd/releases)

```bash
# Example for Linux ARM64 (adjust for your architecture)
# Unpack and install etcd
ETCD_VERSION = "v3.6.5" # Replace with the desired version
tar -xvf etcd-${ETCD_VERSION}-linux-arm64.tar.gz
cd etcd-${ETCD_VERSION}-linux-arm64

# Copy the executable file to the system path
sudo cp etcd etcdctl /usr/local/bin/

# Verify installation
etcd --version
etcdctl version
```

#### 4. (Optional) Install CANN and torch-npu
#### 3. (Optional) Install CANN and torch-npu

If you have NPU devices and want to accelerate the transmission of NPU tensor,
you can install **Ascend-cann-toolkit** and **torch-npu**.
Expand Down Expand Up @@ -106,19 +86,36 @@ pip install torch-npu==2.8.0
Next, we will provide deployment and code examples for single-node scenarios.
For multi-node scenarios, please refer to [Appendix B](#B-deploy-multi-node-datasystem-for-multi-node-training-and-inference-scenarios).

Unlike using TransferQueue with its default backend, integrating OpenYuanrong-Datasystem requires **pre-launching** the datasystem services before running your Python application.

#### Deployment
```bash
# Deploy etcd (for example, port 2379)
etcd --listen-client-urls http://0.0.0.0:2379 \
--advertise-client-urls http://localhost:2379 &

# Deploy datasystem
dscli start -w --worker_address "127.0.0.1:31501" --etcd_address "127.0.0.1:2379"
TransferQueue will automatically initialize the Yuanrong backend when `auto_init: True` is set. TransferQueue will:
- Create placement groups to ensure workers are spread across Ray nodes
- Launch YuanrongWorkerActor on each node to start datasystem workers
- Set up metastore service on the head node

Configuration example:
```yaml
backend:
storage_backend: Yuanrong
Yuanrong:
auto_init: True
worker_port: 31501
metastore_port: 2379
enable_yr_npu_transport: true
worker_args: "--shared_memory_size_mb 8192 --remote_h2d_device_ids 0 --enable_huge_tlb true"
```

Once the datasystem is up, you can run your TransferQueue + Datasystem application.
Configuration options:
- `auto_init`: Whether to automatically initialize Yuanrong backend. Currently only `True` is supported.
- `worker_port`: Port for Yuanrong datasystem worker on each node.
- `metastore_port`: Port for metastore service on the head node.
- `enable_yr_npu_transport`: Enable NPU transport for high-performance device-to-device data transfer.
- `worker_args`: Additional arguments passed to `dscli start` command:
- `--shared_memory_size_mb`: Shared memory size in MB for datasystem worker.
- `--remote_h2d_device_ids`: Enable RH2D (Remote Host-to-Device) for efficient cross-node data transfer. Specify NPU device IDs as comma-separated values (e.g., `0,1,2,3`).
- `--enable_huge_tlb`: Enable huge page memory, required for >21GB shared memory on Ascend 910B.

Once the configuration is set, you can run your TransferQueue + Datasystem application directly.

#### Demo
You can associate `TransferQueueClient` with `YuanrongStorageManager` through the configuration dictionary when initializing the TransferQueue.
Expand All @@ -137,7 +134,7 @@ from transfer_queue import (
config_str = """
manager_type: YuanrongStorageManager
client_name: YuanrongStorageClient
port: 31501
worker_port: 31501
"""
dict_conf = OmegaConf.create(config_str, flags={"allow_objects": True})
```
Expand Down Expand Up @@ -198,13 +195,7 @@ print("output: ", output)


#### Shut down datasystem:
```bash
# shutdown datasystem on the node
dscli stop --worker_address "127.0.0.1:31501"

# shutdown etcd
pkill -f etcd || true
```
TransferQueue automatically handles cleanup when calling `tq.close()`, which stops all Yuanrong datasystem workers gracefully.

### Datasystem Logs

Expand Down Expand Up @@ -251,57 +242,46 @@ If you need to uninstall, execute:
```

### B: Deploy multi-node datasystem for multi-node training and inference scenarios
We can use etcd to connect to a datasystem backend across multiple nodes.
Let's take two nodes (for instance, 10.170.27.24 and 10.170.27.33) as an example.

#### Start etcd on head node

```bash
# For example, using the port 2379 of head node
etcd \
--name etcd-single \
--data-dir /tmp/etcd-data \
--listen-client-urls http://10.170.27.24:2379 \
--advertise-client-urls http://10.170.27.24:2379 \
--listen-peer-urls http://10.170.27.24:2380 \
--initial-advertise-peer-urls http://10.170.27.24:2380 \
--initial-cluster etcd-single=http://10.170.27.24:2380 &
```
TransferQueue automatically initializes Yuanrong datasystem workers across all Ray cluster nodes. Just set `auto_init: True` in the configuration and TransferQueue will handle the multi-node deployment.

Let's take two nodes (for instance, 192.168.0.1 and 192.168.0.2) as an example.

#### Deploy multi-nodes datasystem
On each node, you need to connect to the etcd service on the head node using your local node's IP address.
#### Deploy ray
```bash
#on head node
dscli start -w --worker_address "10.170.27.24:31501" --etcd_address "10.170.27.24:2379"
# on head node
ray start --head --resources='{"node:192.168.0.1": 1}'

# on worker node (assume ray port of head_node is 6379)
ray start --address="192.168.0.1:6379" --resources='{"node:192.168.0.2": 1}'
```

```bash
#on work node
dscli start -w --worker_address "10.170.27.33:31501" --etcd_address "10.170.27.24:2379"
#### Configuration

TransferQueue will detect all Ray nodes and deploy datasystem workers automatically:
```yaml
backend:
storage_backend: Yuanrong
Yuanrong:
auto_init: True
worker_port: 31501
metastore_port: 2379
enable_yr_npu_transport: true
worker_args: "--shared_memory_size_mb 65536 --remote_h2d_device_ids 0 --enable_huge_tlb true"
```
Now you can use datasystem on head-node and work-node.

> For more detailed deployment instructions, please refer to [yuanrong documents](https://gitcode.com/openeuler/yuanrong-datasystem/blob/master/README.md#%E9%83%A8%E7%BD%B2-openyuanrong-datasystem).
> The configuration parameters for deploying the data system can refer [dscli config](https://gitcode.com/openeuler/yuanrong-datasystem/blob/master/docs/source_zh_cn/deployment/dscli.md#%E9%85%8D%E7%BD%AE%E9%A1%B9%E8%AF%B4%E6%98%8E).

There is a demo with multi-node scenarios as fellow.

#### Deploy ray
```bash
# on head node
ray start --head --resources='{"node:10.170.27.24": 1}'

# on worker node (assume ray port of head_node is 6379)
ray start --address="10.170.27.24:6379" --resources='{"node:10.170.27.33": 1}'
```

#### Run demo
In the demo below, we use ray actors to implement distributed deployment of processes.
In the demo below, we use ray actors to implement distributed deployment of processes.
The actor writer writes data to the head node, and the actor reader reads data from the worker nodes.
```python
from omegaconf import OmegaConf
from tensordict import TensorDict
import transfer_queue as tq
from transfer_queue import (
TransferQueueClient,
TransferQueueController,
Expand All @@ -312,10 +292,10 @@ import ray

########################################################################
# Please set up Ray cluster before running this script
# e.g. ray start --head --resources='{"node:127.0.0.1": 1}'
# e.g. ray start --head --resources='{"node:192.168.0.1": 1}'
########################################################################
HEAD_NODE_IP = "10.170.27.24" # Replace with your head node IP
WORKER_NODE_IP = "10.170.27.33" # Replace with your worker node IP
HEAD_NODE_IP = "192.168.0.1" # Replace with your head node IP
WORKER_NODE_IP = "192.168.0.2" # Replace with your worker node IP


def initialize_controller():
Expand Down Expand Up @@ -357,10 +337,12 @@ class TransferQueueClientActor:


def main():
tq.init()

config_str = """
manager_type: YuanrongStorageManager
client_name: YuanrongStorageClient
port: 31501
worker_port: 31501
"""
dict_conf = OmegaConf.create(config_str, flags={"allow_objects": True})
# It is important to pay attention to the controller's lifecycle.
Expand All @@ -387,6 +369,8 @@ def main():
)
output = ray.get(output)

tq.close()

if __name__ == "__main__":
main()

Expand Down
5 changes: 4 additions & 1 deletion scripts/performance_test/README_PERFTEST.md
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,11 @@ backend:
backend:
storage_backend: Yuanrong
Yuanrong:
port: 31501
auto_init: True
worker_port: 31501
metastore_port: 2379
enable_yr_npu_transport: true
worker_args: "--shared_memory_size_mb 65536 --remote_h2d_device_ids 0 --enable_huge_tlb true"
```

For Yuanrong backend, writer runs on the head node and reader runs on the worker node. `--worker_node_ip` is required.
Expand Down
14 changes: 12 additions & 2 deletions scripts/performance_test/perftest_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,17 @@ backend:

# For Yuanrong:
Yuanrong:
# Port of local yuanrong datasystem worker
port: 31501
# Whether to let TQ automatically init yuanrong
auto_init: True
# Datasystem worker port
worker_port: 31501
# Metastore service port
metastore_port: 2379
# If enable npu transport
enable_yr_npu_transport: true
# Additional config for yuanrong worker.
# Recommended options for NPU environments:
# --remote_h2d_device_ids Enable RH2D for efficient cross-node data transfer. Specify NPU device IDs (comma-separated).
# --enable_huge_tlb Enable huge page memory to improve performance. Required for >21GB shared memory on 910B.
# Example: "--shared_memory_size_mb 16384 --remote_h2d_device_ids 0,1,2,3 --enable_huge_tlb true"
worker_args: "--shared_memory_size_mb 65536 --remote_h2d_device_ids 0 --enable_huge_tlb true"
7 changes: 4 additions & 3 deletions tests/e2e/test_e2e_lifecycle_consistency.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,8 @@
"backend": {
"storage_backend": "Yuanrong",
"Yuanrong": {
"host": "127.0.0.1",
"port": 31501,
"worker_port": 31501,
"metastore_port": 2379,
},
},
},
Expand All @@ -102,11 +102,12 @@ def backend_name():
"""Get the backend name from environment variable.

Environment variables:
TQ_TEST_BACKEND: Backend name (SimpleStorage or MooncakeStore)
TQ_TEST_BACKEND: Backend name (SimpleStorage, MooncakeStore, or Yuanrong)

To run tests for a specific backend:
TQ_TEST_BACKEND=SimpleStorage pytest tests/e2e/test_e2e_lifecycle_consistency.py
TQ_TEST_BACKEND=MooncakeStore pytest tests/e2e/test_e2e_lifecycle_consistency.py
TQ_TEST_BACKEND=Yuanrong pytest tests/e2e/test_e2e_lifecycle_consistency.py
"""
return os.environ.get("TQ_TEST_BACKEND", "SimpleStorage")

Expand Down
15 changes: 14 additions & 1 deletion tests/e2e/test_kv_interface_e2e.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,18 @@ def tq_api(request):
},
},
},
"Yuanrong": {
"controller": {
"polling_mode": True,
},
"backend": {
"storage_backend": "Yuanrong",
"Yuanrong": {
"worker_port": 31501,
"metastore_port": 2379,
},
},
},
}


Expand All @@ -112,11 +124,12 @@ def backend_name():
"""Get the backend name from environment variable.

Environment variables:
TQ_TEST_BACKEND: Backend name (SimpleStorage or MooncakeStore)
TQ_TEST_BACKEND: Backend name (SimpleStorage, MooncakeStore, or Yuanrong)

To run tests for a specific backend:
TQ_TEST_BACKEND=SimpleStorage pytest tests/e2e/test_kv_interface_e2e.py
TQ_TEST_BACKEND=MooncakeStore pytest tests/e2e/test_kv_interface_e2e.py
TQ_TEST_BACKEND=Yuanrong pytest tests/e2e/test_kv_interface_e2e.py
"""
return os.environ.get("TQ_TEST_BACKEND", "SimpleStorage")

Expand Down
3 changes: 1 addition & 2 deletions tests/test_kv_storage_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,7 @@ def test_data():
cfg = {
"controller_info": MagicMock(),
"client_name": "YuanrongStorageClient",
"host": "127.0.0.1",
"port": 31501,
"worker_port": 31501,
"device_id": 0,
}
global_indexes = [8, 9, 10]
Expand Down
2 changes: 1 addition & 1 deletion tests/test_storage_client_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@

class Test(unittest.TestCase):
def setUp(self):
self.cfg = {"host": "127.0.0.1", "port": 31501, "device_id": 0}
self.cfg = {"worker_port": 31501, "device_id": 0}

@pytest.mark.skipif(find_spec("datasystem") is None, reason="datasystem is not available")
def test_create_client(self):
Expand Down
2 changes: 1 addition & 1 deletion tests/test_yuanrong_client_zero_copy.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ def mock_kv_client(self, mocker):

@pytest.fixture
def storage_client(self, mock_kv_client):
return GeneralKVClientAdapter({"host": "127.0.0.1", "port": 31501})
return GeneralKVClientAdapter({"worker_port": 31501})

def test_mset_mget_p2p(self, storage_client, mocker):
# Mock serialization/deserialization
Expand Down
2 changes: 1 addition & 1 deletion tests/test_yuanrong_storage_client_e2e.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ def mock_find_reachable_host(port, timeout=1.0):

@pytest.fixture
def config():
return {"host": "127.0.0.1", "port": 12345, "enable_yr_npu_optimization": True}
return {"worker_port": 12345, "enable_yr_npu_optimization": True}


def assert_tensors_equal(a: torch.Tensor, b: torch.Tensor):
Expand Down
19 changes: 13 additions & 6 deletions transfer_queue/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,17 @@ backend:

# For Yuanrong:
Yuanrong:
# Whether to let TQ automatically start etcd and datasystem services
# Whether to let TQ automatically init yuanrong
auto_init: True
# etcd service address (used to start etcd when auto_init=true)
etcd_address: "127.0.0.1:2379"
# datasystem worker host and port (used to start dscli when auto_init=true)
host: "127.0.0.1"
port: 31501
# Datasystem worker port
worker_port: 31501
# Metastore service port
metastore_port: 2379
# If enable npu transport
enable_yr_npu_transport: false
# Additional config for yuanrong worker.
# Recommended options for NPU environments:
# --remote_h2d_device_ids Enable RH2D for efficient cross-node data transfer. Specify NPU device IDs (comma-separated).
# --enable_huge_tlb Enable huge page memory to improve performance. Required for >21GB shared memory on 910B.
# Example: "--shared_memory_size_mb 16384 --remote_h2d_device_ids 0,1,2,3 --enable_huge_tlb true"
worker_args: "--shared_memory_size_mb 8192"
Loading
Loading