From 1ed0bf197ada26989f09a7363700b36df5bb4bf1 Mon Sep 17 00:00:00 2001 From: Haichuan Hu Date: Thu, 5 Mar 2026 18:36:46 +0800 Subject: [PATCH 1/8] [feat]: Support hccl tensor transport Signed-off-by: Haichuan Hu --- .github/workflows/pre-commit.yml | 3 - README.md | 20 +- docs/user_guide/hccl_collective.md | 4 +- docs/user_guide/yr_transport.md | 2 +- ray_ascend/__init__.py | 65 +++++ ray_ascend/collective/__init__.py | 7 - .../collective/hccl_collective_group.py | 36 ++- ray_ascend/direct_transport/__init__.py | 7 - .../direct_transport/hccl_tensor_transport.py | 27 +++ tests/__init__.py | 0 tests/collective/test_hccl_group.py | 18 +- tests/collective/test_hccl_via_registry.py | 228 ++++++++++++++++++ tests/conftest.py | 37 +++ .../test_hccl_tensor_transport.py | 58 +++++ 14 files changed, 466 insertions(+), 46 deletions(-) delete mode 100644 ray_ascend/collective/__init__.py delete mode 100644 ray_ascend/direct_transport/__init__.py create mode 100644 ray_ascend/direct_transport/hccl_tensor_transport.py create mode 100644 tests/__init__.py create mode 100644 tests/collective/test_hccl_via_registry.py create mode 100644 tests/conftest.py create mode 100644 tests/direct_transport/test_hccl_tensor_transport.py diff --git a/.github/workflows/pre-commit.yml b/.github/workflows/pre-commit.yml index 505ef2c..2e0bf15 100644 --- a/.github/workflows/pre-commit.yml +++ b/.github/workflows/pre-commit.yml @@ -4,9 +4,6 @@ name: pre-commit # No need to avoid / cancel lightweight pre-commit jobs on: pull_request: - push: - branches: - - master # Declare permissions just read content. permissions: diff --git a/README.md b/README.md index 583289c..101a8d5 100644 --- a/README.md +++ b/README.md @@ -57,9 +57,14 @@ pip install "ray-ascend[yr]" ```python import ray from ray.util import collective -from ray_ascend.collective import HCCLGroup +from ray_ascend import register_hccl_collective_backend -ray.register_collective_backend("HCCL", HCCLGroup) +register_hccl_collective_backend() + +@ray.remote(resources={"NPU": 1}) +class RayActor: + def __init__(self): + register_hccl_collective_backend() collective.create_collective_group( actors, @@ -79,16 +84,15 @@ collective.broadcast(tensor, src_rank=0, group_name="my_group") import ray import torch from ray.util.collective import create_collective_group -from ray.experimental import register_tensor_transport -from ray_ascend.collective import HCCLGroup -from ray_ascend.direct_transport import HCCLTensorTransport - -ray.register_collective_backend("HCCL", HCCLGroup) -register_tensor_transport("HCCL", ["npu"], HCCLTensorTransport, torch.Tensor) +from ray_ascend import register_hccl_tensor_transport +register_hccl_tensor_transport() @ray.remote(resources={"NPU": 1}) class RayActor: + def __init__(self): + register_hccl_tensor_transport() + @ray.method(tensor_transport="HCCL") def random_tensor(self): return torch.zeros(1024, device="npu") diff --git a/docs/user_guide/hccl_collective.md b/docs/user_guide/hccl_collective.md index 6d42072..cccae90 100644 --- a/docs/user_guide/hccl_collective.md +++ b/docs/user_guide/hccl_collective.md @@ -20,7 +20,7 @@ distributed collective operations across Ray actors. ```python import ray from ray.util import collective -from ray_ascend.collective import HCCLGroup +from ray_ascend.collective.hccl_collective_group import HCCLGroup # Initialize Ray ray.init() @@ -80,7 +80,7 @@ You can also use Ray's high-level collective API: ```python import ray from ray.util import collective -from ray_ascend.collective import HCCLGroup +from ray_ascend.collective.hccl_collective_group import HCCLGroup ray.init() ray.register_collective_backend("HCCL", HCCLGroup) diff --git a/docs/user_guide/yr_transport.md b/docs/user_guide/yr_transport.md index 0c96137..c3bd3c1 100644 --- a/docs/user_guide/yr_transport.md +++ b/docs/user_guide/yr_transport.md @@ -215,7 +215,7 @@ import os import ray from ray.util.collective import create_collective_group from ray_ascend import register_yr_tensor_transport -from ray_ascend.collective import HCCLGroup +from ray_ascend.collective.hccl_collective_group import HCCLGroup ray.init(address="auto") diff --git a/ray_ascend/__init__.py b/ray_ascend/__init__.py index 2dcdd6d..0e4a1d7 100644 --- a/ray_ascend/__init__.py +++ b/ray_ascend/__init__.py @@ -9,6 +9,8 @@ "__version__", "__commit__", "register_yr_tensor_transport", + "register_hccl_collective_backend", + "register_hccl_tensor_transport", ] __commit__ = _version.commit @@ -86,3 +88,66 @@ def transfer_cpu_tensor_via_rdma(): # Register tensor transport register_tensor_transport("YR", devices, YRTensorTransport, torch.Tensor) + + +def register_hccl_collective_backend() -> None: + """ + Register HCCL collective backend for Ray. + + This function must be called in each Ray worker/actor process + before using HCCL collective operations. + + Example: + from ray.util import collective + from ray_ascend import register_hccl_collective_backend + + register_hccl_collective_backend() + + @ray.remote(resources={"NPU": 1}) + class RayActor: + def __init__(self): + register_hccl_collective_backend() + + collective.create_collective_group( + actors, + len(actors), + list(range(0, len(actors))), + backend="HCCL", + group_name="my_group", + ) + """ + from ray.util.collective.backend_registry import register_collective_backend + + from .collective.hccl_collective_group import HCCLGroup + + register_collective_backend("HCCL", HCCLGroup) + + +def register_hccl_tensor_transport() -> None: + """ + Register HCCL backend and tensor transport for Ray. + + This function must be called in each Ray worker/actor process + before using HCCL collective operations or tensor transport. + + Example: + from ray_ascend import register_hccl_tensor_transport + + register_hccl_tensor_transport() + + @ray.remote(resources={"NPU": 1}) + class RayActor: + def __init__(self): + register_hccl_tensor_transport() + + @ray.method(tensor_transport="HCCL") + def transfer_npu_tensor_via_hccs(self): + return torch.tensor([1, 2, 3]).npu() + """ + import torch + from ray.experimental import register_tensor_transport + + from .direct_transport.hccl_tensor_transport import HCCLTensorTransport + + register_hccl_collective_backend() + register_tensor_transport("HCCL", ["npu"], HCCLTensorTransport, torch.Tensor) diff --git a/ray_ascend/collective/__init__.py b/ray_ascend/collective/__init__.py deleted file mode 100644 index d2626ab..0000000 --- a/ray_ascend/collective/__init__.py +++ /dev/null @@ -1,7 +0,0 @@ -from .hccl_collective_group import ( - HCCLGroup, -) - -__all__ = [ - "HCCLGroup", -] diff --git a/ray_ascend/collective/hccl_collective_group.py b/ray_ascend/collective/hccl_collective_group.py index 93a20d3..6fb538b 100644 --- a/ray_ascend/collective/hccl_collective_group.py +++ b/ray_ascend/collective/hccl_collective_group.py @@ -23,6 +23,18 @@ logger = logging.getLogger(__name__) +try: + import importlib.util + + if importlib.util.find_spec("torch_npu") is None: + raise ImportError("torch_npu not found") + ctypes.CDLL("libhccl.so") + _HCCL_AVAILABLE = True + _LOG_HCCL_WARNING = False +except (ImportError, OSError): + _HCCL_AVAILABLE = False + _LOG_HCCL_WARNING = True + class HcclRootInfo(ctypes.Structure): _fields_ = [("internal", ctypes.c_byte * 4108)] @@ -152,7 +164,21 @@ def destroy_group(self) -> None: @classmethod def backend(cls) -> Backend: """Return the backend type for this group.""" - return Backend.HCCL + return "HCCL" + + @classmethod + def check_backend_availability(cls) -> bool: + """Check if the backend is available.""" + global _HCCL_AVAILABLE, _LOG_HCCL_WARNING + if not _HCCL_AVAILABLE and _LOG_HCCL_WARNING: + logger.warning( + "HCCL seems unavailable. Please install torch_npu " + "following the guide at: " + "https://gitcode.com/Ascend/pytorch and " + "ensure libhccl.so is available." + ) + _LOG_HCCL_WARNING = False + return _HCCL_AVAILABLE def broadcast( self, @@ -232,6 +258,10 @@ def collective_fn( current_stream.wait_event(event) logger.debug(f"HcclAllGather execute result : {exec_result}") + # Handle case where tensor_list is wrapped in another list by Ray's collective.py + if tensor_list and isinstance(tensor_list[0], list): + tensor_list = tensor_list[0] + output_flattened = [_flatten_for_scatter_gather(tensor_list, copy=False)] input_tensor = self._validate_tensor(tensor) @@ -598,6 +628,10 @@ def _validate_tensor(self, tensor: torch.Tensor) -> torch.Tensor: Enforces the single-device constraint and checks that the tensor device matches the communicator's initialization device. """ + # If the input is a list of tensors, we only support single tensor list for now. + # We will extract the single tensor out for validation. + if isinstance(tensor, list) and len(tensor) == 1: + tensor = tensor[0] if not isinstance(tensor, torch.Tensor): raise RuntimeError("Collective ops require torch.Tensor inputs.") device = get_tensor_device(tensor) diff --git a/ray_ascend/direct_transport/__init__.py b/ray_ascend/direct_transport/__init__.py deleted file mode 100644 index 793add2..0000000 --- a/ray_ascend/direct_transport/__init__.py +++ /dev/null @@ -1,7 +0,0 @@ -from .yr_tensor_transport import ( - YRTensorTransport, -) - -__all__ = [ - "YRTensorTransport", -] diff --git a/ray_ascend/direct_transport/hccl_tensor_transport.py b/ray_ascend/direct_transport/hccl_tensor_transport.py new file mode 100644 index 0000000..2b34472 --- /dev/null +++ b/ray_ascend/direct_transport/hccl_tensor_transport.py @@ -0,0 +1,27 @@ +from typing import List, Optional + +import torch +from ray.experimental.rdt.collective_tensor_transport import ( + CollectiveTensorTransport, +) +from ray.experimental.rdt.tensor_transport_manager import ( + CommunicatorMetadata, + TensorTransportMetadata, +) + + +class HCCLTensorTransport(CollectiveTensorTransport): + def tensor_transport_backend(self) -> str: + return "HCCL" + + def recv_multiple_tensors( + self, + obj_id: str, + tensor_transport_metadata: TensorTransportMetadata, + communicator_metadata: CommunicatorMetadata, + target_buffers: Optional[List["torch.Tensor"]] = None, + ) -> List["torch.Tensor"]: + torch.npu.set_device(0) + return super().recv_multiple_tensors( # type: ignore[no-any-return] + obj_id, tensor_transport_metadata, communicator_metadata, target_buffers + ) diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/collective/test_hccl_group.py b/tests/collective/test_hccl_group.py index 7d8bf34..ee7e515 100644 --- a/tests/collective/test_hccl_group.py +++ b/tests/collective/test_hccl_group.py @@ -15,23 +15,7 @@ @pytest.fixture(scope="session") -def ray_cluster(): - world_size = 2 - if torch.npu.device_count() < world_size: - pytest.skip("Not enough NPU devices for HcclGroup tests") - if not ray.is_initialized(): - try: - ray.init(ignore_reinit_error=True, resources={"NPU": world_size}) - except ValueError: - # Likely connecting to an existing cluster; do not pass resources. - ray.init(ignore_reinit_error=True) - yield - if ray.is_initialized(): - ray.shutdown() - - -@pytest.fixture(scope="session") -def actors(ray_cluster): +def actors(ray_cluster_with_npu): world_size = 2 group_name = "hccl_group" actors = [ diff --git a/tests/collective/test_hccl_via_registry.py b/tests/collective/test_hccl_via_registry.py new file mode 100644 index 0000000..ed2daf6 --- /dev/null +++ b/tests/collective/test_hccl_via_registry.py @@ -0,0 +1,228 @@ +import pytest +import ray +import torch +from ray.experimental.collective import create_collective_group +from ray.util.collective import ( + allgather, + allreduce, + broadcast, + recv, + reduce, + reducescatter, + send, +) +from ray.util.collective.types import ReduceOp + +from ray_ascend import register_hccl_collective_backend + + +@pytest.fixture(scope="session") +def actors(ray_cluster_with_npu): + register_hccl_collective_backend() + world_size = 2 + group_name = "hccl_group" + actors = [HCCLRegistryTestActor.remote(group_name) for _ in range(world_size)] + + # Create collective group using Ray's experimental interface + create_collective_group( + actors=actors, + backend="HCCL", + name=group_name, + ) + + yield actors + + for actor in actors: + try: + ray.get(actor.destroy.remote()) + except Exception: + # Best-effort cleanup; rely on Ray shutdown for process teardown. + pass + + +@ray.remote(resources={"NPU": 1}) +class HCCLRegistryTestActor: + def __init__(self, group_name): + register_hccl_collective_backend() + self.group_name = group_name + + def destroy(self): + pass + + def get_rank(self): + return ray.get_gpu_ids()[0] if ray.get_gpu_ids() else None + + def test_allreduce(self, tensor_data): + """Test allreduce operation.""" + device = torch.npu.current_device() + tensor = torch.tensor(tensor_data, dtype=torch.float32, device=device) + allreduce(tensor, group_name=self.group_name, op=ReduceOp.SUM) + return tensor.cpu().tolist() + + def test_broadcast(self, tensor_data, src_rank=0): + """Test broadcast operation.""" + device = torch.npu.current_device() + tensor = torch.tensor(tensor_data, dtype=torch.float32, device=device) + broadcast(tensor, src_rank=src_rank, group_name=self.group_name) + return tensor.cpu().tolist() + + def test_send(self, tensor_data, dst_rank): + """Test send operation.""" + device = torch.npu.current_device() + tensor = torch.tensor(tensor_data, dtype=torch.float32, device=device) + send(tensor, dst_rank=dst_rank, group_name=self.group_name) + + def test_recv(self, tensor_shape, src_rank): + """Test recv operation.""" + device = torch.npu.current_device() + tensor = torch.zeros(tensor_shape, dtype=torch.float32, device=device) + recv(tensor, src_rank=src_rank, group_name=self.group_name) + return tensor.cpu().tolist() + + def test_allgather(self, tensor_data): + """Test allgather operation.""" + device = torch.npu.current_device() + tensor = torch.tensor(tensor_data, dtype=torch.float32, device=device) + # Create output tensor list for each rank + tensor_list = [torch.zeros_like(tensor) for _ in range(2)] + allgather(tensor_list=tensor_list, tensor=tensor, group_name=self.group_name) + return [t.cpu().tolist() for t in tensor_list] + + def test_reduce(self, tensor_data, dst_rank=0): + """Test reduce operation.""" + device = torch.npu.current_device() + tensor = torch.tensor(tensor_data, dtype=torch.float32, device=device) + reduce(tensor, dst_rank=dst_rank, group_name=self.group_name, op=ReduceOp.SUM) + return tensor.cpu().tolist() + + def test_reducescatter(self, tensor_data_list): + """Test reducescatter operation.""" + device = torch.npu.current_device() + # tensor_data_list is a list of tensors (one per rank) + tensor_list = [ + torch.tensor(data, dtype=torch.float32, device=device) + for data in tensor_data_list + ] + output_tensor = torch.zeros_like(tensor_list[0]) + reducescatter( + tensor=output_tensor, + tensor_list=tensor_list, + group_name=self.group_name, + op=ReduceOp.SUM, + ) + return output_tensor.cpu().tolist() + + +def test_allreduce(actors): + """Test allreduce collective communication.""" + world_size = 2 + assert len(actors) == world_size, f"Expected {world_size} actors, got {len(actors)}" + + rank0_data = [1.0, 2.0, 3.0] + rank1_data = [4.0, 5.0, 6.0] + results = ray.get( + [ + actors[0].test_allreduce.remote(rank0_data), + actors[1].test_allreduce.remote(rank1_data), + ] + ) + expected = [5.0, 7.0, 9.0] + for result in results: + assert result == expected, f"Allreduce failed: {result} != {expected}" + + +def test_broadcast(actors): + """Test broadcast collective communication.""" + world_size = 2 + assert len(actors) == world_size, f"Expected {world_size} actors, got {len(actors)}" + + root_tensor = [10.0, 20.0] + results = ray.get( + [actor.test_broadcast.remote(root_tensor, src_rank=0) for actor in actors] + ) + for result in results: + assert result == root_tensor, f"Broadcast failed: {result} != {root_tensor}" + + +def test_allgather(actors): + """Test allgather collective communication.""" + world_size = 2 + assert len(actors) == world_size, f"Expected {world_size} actors, got {len(actors)}" + + rank0_data = [1.0, 2.0] + rank1_data = [3.0, 4.0] + results = ray.get( + [ + actors[0].test_allgather.remote(rank0_data), + actors[1].test_allgather.remote(rank1_data), + ] + ) + for i, result in enumerate(results): + result_flattened = [item for sublist in result for item in sublist] + all_values = sorted(result_flattened) + expected_values = sorted([1.0, 2.0, 3.0, 4.0]) + assert ( + all_values == expected_values + ), f"Allgather failed for rank {i}: {all_values} != {expected_values}" + assert ( + len(result) == 2 + ), f"Allgather failed for rank {i}: expected 2 gathered tensors, got {len(result)}" + + +def test_reduce(actors): + """Test reduce collective communication.""" + world_size = 2 + assert len(actors) == world_size, f"Expected {world_size} actors, got {len(actors)}" + + rank0_data = [1.0, 2.0, 3.0] + rank1_data = [4.0, 5.0, 6.0] + results = ray.get( + [ + actors[0].test_reduce.remote(rank0_data, dst_rank=0), + actors[1].test_reduce.remote(rank1_data, dst_rank=0), + ] + ) + expected_root = [5.0, 7.0, 9.0] + assert ( + results[0] == expected_root + ), f"Reduce failed for root rank: {results[0]} != {expected_root}" + + +def test_reducescatter(actors): + """Test reducescatter collective communication.""" + world_size = 2 + assert len(actors) == world_size, f"Expected {world_size} actors, got {len(actors)}" + + rank0_data = [1.0, 2.0, 3.0] + rank1_data = [4.0, 5.0, 6.0] + results = ray.get( + [ + actors[0].test_reducescatter.remote([rank0_data, rank1_data]), + actors[1].test_reducescatter.remote([rank0_data, rank1_data]), + ] + ) + expected_rank0 = [2.0, 4.0, 6.0] + expected_rank1 = [8.0, 10.0, 12.0] + assert ( + results[0] == expected_rank0 + ), f"Reducescatter failed for rank 0: {results[0]} != {expected_rank0}" + assert ( + results[1] == expected_rank1 + ), f"Reducescatter failed for rank 1: {results[1]} != {expected_rank1}" + + +def test_send_recv(actors): + """Test send/recv point-to-point communication.""" + world_size = 2 + assert len(actors) == world_size, f"Expected {world_size} actors, got {len(actors)}" + + tensor_data = [7.0, 8.0, 9.0] + tensor_shape = (3,) + + send_task = actors[0].test_send.remote(tensor_data, dst_rank=1) + recv_task = actors[1].test_recv.remote(tensor_shape, src_rank=0) + + ray.get(send_task) + result = ray.get(recv_task) + + assert result == tensor_data, f"Send/recv failed: {result} != {tensor_data}" diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 0000000..6897acc --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,37 @@ +import pytest +import ray +import torch + +# Default NPU resource count for HCCL tests +DEFAULT_NPU_COUNT = 2 + + +@pytest.fixture(scope="session") +def ray_cluster_with_npu(): + """ + Initialize Ray cluster with NPU resources for HCCL tests. + + This fixture provides a Ray cluster initialized with NPU resources + required for HCCL (Huawei Collective Communication Library) operations. + Raises an error if torch_npu is not available or insufficient NPU devices. + """ + if not hasattr(torch, "npu"): + raise RuntimeError( + "torch.npu is not available. " + "Please install torch_npu following the guide at: " + "https://gitcode.com/Ascend/pytorch" + ) + if torch.npu.device_count() < DEFAULT_NPU_COUNT: + raise RuntimeError( + f"Not enough NPU devices for HCCL tests. " + f"Required: {DEFAULT_NPU_COUNT}, Available: {torch.npu.device_count()}" + ) + if not ray.is_initialized(): + try: + ray.init(ignore_reinit_error=True, resources={"NPU": DEFAULT_NPU_COUNT}) + except ValueError: + # Likely connecting to an existing cluster; do not pass resources. + ray.init(ignore_reinit_error=True) + yield + if ray.is_initialized(): + ray.shutdown() diff --git a/tests/direct_transport/test_hccl_tensor_transport.py b/tests/direct_transport/test_hccl_tensor_transport.py new file mode 100644 index 0000000..76eb249 --- /dev/null +++ b/tests/direct_transport/test_hccl_tensor_transport.py @@ -0,0 +1,58 @@ +import pytest +import ray +import torch +from ray.experimental.collective import create_collective_group + +from ray_ascend import register_hccl_tensor_transport + +register_hccl_tensor_transport() + + +@pytest.fixture(scope="session") +def actors(ray_cluster_with_npu): + """Create actors with HCCL tensor transport registered.""" + world_size = 2 + group_name = "hccl_transport_group" + actors = [ + HCCLTensorTransportTestActor.remote(group_name) for _ in range(world_size) + ] + + # Create collective group using Ray's experimental interface + create_collective_group( + actors=actors, + backend="HCCL", + name=group_name, + ) + + yield actors + + +@ray.remote(resources={"NPU": 1}) +class HCCLTensorTransportTestActor: + """Test actor for HCCL tensor transport.""" + + def __init__(self, group_name): + register_hccl_tensor_transport() + self.group_name = group_name + + @ray.method(tensor_transport="HCCL") + def create_tensor(self): + """Return a random tensor on NPU.""" + return torch.tensor([1, 2, 3]).npu() + + def sum(self, tensor: torch.Tensor): + """Return sum of tensor elements.""" + return torch.sum(tensor) + + +def test_hccl_tensor_transport(actors): + """Test basic tensor transport between actors.""" + sender, receiver = actors[0], actors[1] + + # Send tensor from sender to receiver + tensor = sender.create_tensor.remote() + result = receiver.sum.remote(tensor) + + expected_sum = 6 # 1 + 2 + 3 + actual_sum = ray.get(result) + assert actual_sum == expected_sum, f"Expected sum {expected_sum}, got {actual_sum}" From 1cb149c58198605f8f094fc32defd14072556451 Mon Sep 17 00:00:00 2001 From: Haichuan Hu Date: Sat, 30 May 2026 15:04:49 +0800 Subject: [PATCH 2/8] [feat]: Support hccl tensor transport Signed-off-by: Haichuan Hu --- README.md | 7 +++++++ docs/user_guide/hccl_collective.md | 4 +++- docs/user_guide/installation.md | 2 ++ ray_ascend/__init__.py | 15 ++++++++++++++- tests/collective/test_hccl_via_registry.py | 6 ++++++ .../test_hccl_tensor_transport.py | 6 ++++++ 6 files changed, 38 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 101a8d5..f1f7eb7 100644 --- a/README.md +++ b/README.md @@ -139,6 +139,13 @@ npu_tensor = ray.get(sender.transfer_npu_tensor_via_hccs.remote()) cpu_tensor = ray.get(sender.transfer_cpu_tensor_via_rdma.remote()) ``` +## Ray Version Compatibility + +| Ray Version | YR Transport | HCCL Collective | HCCL Tensor Transport (RDT) | +|-------------|-------------|-----------------|-----------------------------| +| 2.55 | ✅ | ❌ | ❌ | +| >= 2.56 | ✅ | ✅ | ✅ | + ## Contributing See [CONTRIBUTING](./CONTRIBUTING.md) and [developer guide](https://ascend.github.io/ray-ascend/developer_guide/) for more details—a step-by-step guide to help diff --git a/docs/user_guide/hccl_collective.md b/docs/user_guide/hccl_collective.md index cccae90..717a196 100644 --- a/docs/user_guide/hccl_collective.md +++ b/docs/user_guide/hccl_collective.md @@ -1,10 +1,12 @@ # HCCL Collective Communication -> _Last updated: 03/24/2026_ +> _Last updated: 05/30/2026_ ray-ascend provides HCCL (Huawei Collective Communication Library) support for distributed collective operations across Ray actors. +> **Note**: HCCL collective backend requires Ray >= 2.56. + ## Available Collective Operations - **broadcast**: Send data from one rank to all ranks diff --git a/docs/user_guide/installation.md b/docs/user_guide/installation.md index 0e8ee91..4b886ad 100644 --- a/docs/user_guide/installation.md +++ b/docs/user_guide/installation.md @@ -6,6 +6,8 @@ ### Basic Installation (HCCL Only) +> **Note**: HCCL collective and tensor transport features require Ray >= 2.56. + Install the base package with HCCL collective communication support: ```bash diff --git a/ray_ascend/__init__.py b/ray_ascend/__init__.py index 0e4a1d7..2bb2603 100644 --- a/ray_ascend/__init__.py +++ b/ray_ascend/__init__.py @@ -94,6 +94,8 @@ def register_hccl_collective_backend() -> None: """ Register HCCL collective backend for Ray. + Requires Ray >= 2.56. For older Ray versions, this API is not available. + This function must be called in each Ray worker/actor process before using HCCL collective operations. @@ -116,7 +118,16 @@ def __init__(self): group_name="my_group", ) """ - from ray.util.collective.backend_registry import register_collective_backend + try: + from ray.util.collective.backend_registry import register_collective_backend + except ImportError as e: + import ray + + raise RuntimeError( + f"register_hccl_collective_backend requires Ray >= 2.56, " + f"but Ray {ray.__version__} is installed. " + f"Please upgrade: pip install 'ray>=2.56'" + ) from e from .collective.hccl_collective_group import HCCLGroup @@ -127,6 +138,8 @@ def register_hccl_tensor_transport() -> None: """ Register HCCL backend and tensor transport for Ray. + Requires Ray >= 2.56. For older Ray versions, this API is not available. + This function must be called in each Ray worker/actor process before using HCCL collective operations or tensor transport. diff --git a/tests/collective/test_hccl_via_registry.py b/tests/collective/test_hccl_via_registry.py index ed2daf6..b3d6a3a 100644 --- a/tests/collective/test_hccl_via_registry.py +++ b/tests/collective/test_hccl_via_registry.py @@ -1,4 +1,10 @@ import pytest + +pytest.importorskip( + "ray.util.collective.backend_registry", + reason="HCCL collective backend requires Ray >= 2.56", +) + import ray import torch from ray.experimental.collective import create_collective_group diff --git a/tests/direct_transport/test_hccl_tensor_transport.py b/tests/direct_transport/test_hccl_tensor_transport.py index 76eb249..b11b183 100644 --- a/tests/direct_transport/test_hccl_tensor_transport.py +++ b/tests/direct_transport/test_hccl_tensor_transport.py @@ -1,4 +1,10 @@ import pytest + +pytest.importorskip( + "ray.util.collective.backend_registry", + reason="HCCL tensor transport requires Ray >= 2.56", +) + import ray import torch from ray.experimental.collective import create_collective_group From 4e2acbf9d5ea4acb664a621401032180fe67baf7 Mon Sep 17 00:00:00 2001 From: Haichuan Hu Date: Sat, 30 May 2026 16:08:48 +0800 Subject: [PATCH 3/8] [feat]: Support hccl tensor transport Signed-off-by: Haichuan Hu --- README.md | 2 +- docs/user_guide/hccl_collective.md | 57 +++---------------- .../collective/hccl_collective_group.py | 2 +- 3 files changed, 9 insertions(+), 52 deletions(-) diff --git a/README.md b/README.md index f1f7eb7..14f90a8 100644 --- a/README.md +++ b/README.md @@ -143,7 +143,7 @@ cpu_tensor = ray.get(sender.transfer_cpu_tensor_via_rdma.remote()) | Ray Version | YR Transport | HCCL Collective | HCCL Tensor Transport (RDT) | |-------------|-------------|-----------------|-----------------------------| -| 2.55 | ✅ | ❌ | ❌ | +| >=2.55, <2.56 | ✅ | ❌ | ❌ | | >= 2.56 | ✅ | ✅ | ✅ | ## Contributing diff --git a/docs/user_guide/hccl_collective.md b/docs/user_guide/hccl_collective.md index 717a196..ef3e3c0 100644 --- a/docs/user_guide/hccl_collective.md +++ b/docs/user_guide/hccl_collective.md @@ -17,44 +17,30 @@ distributed collective operations across Ray actors. - **send/recv**: Point-to-point communication - **barrier**: Synchronize all ranks -## Quick Example: HCCL Collective Group +## Quick Example ```python import ray +import torch from ray.util import collective -from ray_ascend.collective.hccl_collective_group import HCCLGroup +from ray_ascend import register_hccl_collective_backend -# Initialize Ray ray.init() +register_hccl_collective_backend() -# Register the HCCL backend -ray.register_collective_backend("HCCL", HCCLGroup) - -# Create actors with NPU resources @ray.remote(resources={"NPU": 1}) class Worker: def __init__(self): - import torch - import torch_npu - self.device = torch.npu.current_device() - - def setup_group(self, world_size, rank, group_name): - self.group = HCCLGroup(world_size, rank, group_name) + register_hccl_collective_backend() def do_allreduce(self, data): - import torch tensor = torch.tensor(data, dtype=torch.float32).npu() - self.group.allreduce(tensor) + collective.allreduce(tensor, group_name="my_hccl_group") return tensor.cpu().tolist() - def destroy(self): - self.group.destroy_group() - -# Create workers world_size = 2 actors = [Worker.remote() for _ in range(world_size)] -# Create collective group collective.create_collective_group( actors, world_size, @@ -63,44 +49,15 @@ collective.create_collective_group( group_name="my_hccl_group", ) -# Perform allreduce results = ray.get([ actors[i].do_allreduce.remote([1.0 * (i + 1), 2.0 * (i + 1)]) for i in range(world_size) ]) -print("Allreduce results:", results) # Both should show [3.0, 6.0] +print("Allreduce results:", results) -# Cleanup -ray.get([actor.destroy.remote() for actor in actors]) ray.shutdown() ``` -## Using Ray's Collective API - -You can also use Ray's high-level collective API: - -```python -import ray -from ray.util import collective -from ray_ascend.collective.hccl_collective_group import HCCLGroup - -ray.init() -ray.register_collective_backend("HCCL", HCCLGroup) - -@ray.remote(resources={"NPU": 1}) -class Worker: - def broadcast_tensor(self, src_rank=0): - import torch - tensor = torch.ones(10).npu() if self.rank == src_rank else torch.zeros(10).npu() - collective.broadcast(tensor, src_rank=src_rank, group_name="my_hccl_group") - return tensor.cpu().tolist() - -# Create and setup group... - -# Each actor broadcasts in SPMD manner -results = ray.get([actor.broadcast_tensor.remote() for actor in actors]) -``` - ## Supported Tensor Types HCCL supports common PyTorch types: diff --git a/ray_ascend/collective/hccl_collective_group.py b/ray_ascend/collective/hccl_collective_group.py index 6fb538b..a5b5b04 100644 --- a/ray_ascend/collective/hccl_collective_group.py +++ b/ray_ascend/collective/hccl_collective_group.py @@ -164,7 +164,7 @@ def destroy_group(self) -> None: @classmethod def backend(cls) -> Backend: """Return the backend type for this group.""" - return "HCCL" + return Backend.HCCL @classmethod def check_backend_availability(cls) -> bool: From aad41e2eb3675df20cd3e05de5b97514291b32ac Mon Sep 17 00:00:00 2001 From: Haichuan Hu Date: Sat, 30 May 2026 16:14:01 +0800 Subject: [PATCH 4/8] [feat]: Support hccl tensor transport Signed-off-by: Haichuan Hu --- tests/collective/test_hccl_group.py | 227 ---------------------------- 1 file changed, 227 deletions(-) delete mode 100644 tests/collective/test_hccl_group.py diff --git a/tests/collective/test_hccl_group.py b/tests/collective/test_hccl_group.py deleted file mode 100644 index ee7e515..0000000 --- a/tests/collective/test_hccl_group.py +++ /dev/null @@ -1,227 +0,0 @@ -import pytest -import ray -import torch -from ray.util.collective.types import ( - AllGatherOptions, - BroadcastOptions, - RecvOptions, - ReduceOp, - ReduceOptions, - ReduceScatterOptions, - SendOptions, -) - -from ray_ascend.collective.hccl_collective_group import HCCLGroup - - -@pytest.fixture(scope="session") -def actors(ray_cluster_with_npu): - world_size = 2 - group_name = "hccl_group" - actors = [ - HCCLTestActor.remote(rank, world_size, group_name) for rank in range(world_size) - ] - yield actors - - for actor in actors: - try: - ray.get(actor.destroy.remote()) - except Exception: - # Best-effort cleanup; rely on Ray shutdown for process teardown. - pass - - -@ray.remote(resources={"NPU": 1}) -class HCCLTestActor: - def __init__(self, rank, world_size, group_name="test_group"): - self.rank = rank - self.world_size = world_size - self.group = HCCLGroup(world_size, rank, group_name) - - def destroy(self): - self.group.destroy_group() - - def get_rank(self): - return self.rank - - def test_allreduce(self, tensor_data): - """Test allreduce operation.""" - tensor = torch.tensor(tensor_data, dtype=torch.float32).npu() - self.group.allreduce(tensor) - return tensor.cpu().tolist() - - def test_broadcast(self, tensor_data, root_rank=0): - """Test broadcast operation.""" - tensor = torch.tensor(tensor_data, dtype=torch.float32).npu() - broadcast_options = BroadcastOptions() - broadcast_options.root_rank = root_rank - broadcast_options.root_tensor = 0 - self.group.broadcast(tensor, broadcast_options=broadcast_options) - return tensor.cpu().tolist() - - def test_send(self, tensor_data, dst_rank): - """Test send operation.""" - tensor = torch.tensor(tensor_data, dtype=torch.float32).npu() - send_options = SendOptions() - send_options.dst_rank = dst_rank - send_options.dst_gpu_index = 0 - self.group.send(tensor, send_options=send_options) - - def test_recv(self, tensor_shape, src_rank): - """Test recv operation.""" - tensor = torch.zeros(tensor_shape, dtype=torch.float32).npu() - recv_options = RecvOptions() - recv_options.src_rank = src_rank - recv_options.src_gpu_index = 0 - self.group.recv(tensor, recv_options=recv_options) - return tensor.cpu().tolist() - - def test_allgather(self, tensor_data): - """Test allgather operation.""" - tensor = torch.tensor(tensor_data, dtype=torch.float32).npu() - # Create output tensors list for each rank - output_tensors = [torch.zeros_like(tensor) for _ in range(self.world_size)] - allgather_options = AllGatherOptions() - self.group.allgather( - output_tensors, tensor, allgather_options=allgather_options - ) - return [t.cpu().tolist() for t in output_tensors] - - def test_reduce(self, tensor_data, root_rank=0): - """Test reduce operation.""" - tensor = torch.tensor(tensor_data, dtype=torch.float32).npu() - reduce_options = ReduceOptions() - reduce_options.root_rank = root_rank - reduce_options.root_tensor = 0 - reduce_options.reduceOp = ReduceOp.SUM - self.group.reduce(tensor, reduce_options=reduce_options) - return tensor.cpu().tolist() - - def test_reducescatter(self, tensor_data_list): - """Test reducescatter operation.""" - # tensor_data_list is a list of tensors (one per rank) - input_tensors = [ - torch.tensor(data, dtype=torch.float32).npu() for data in tensor_data_list - ] - output_tensor = torch.zeros_like(input_tensors[0]) - reducescatter_options = ReduceScatterOptions() - reducescatter_options.reduceOp = ReduceOp.SUM - self.group.reducescatter( - output_tensor, input_tensors, reducescatter_options=reducescatter_options - ) - return output_tensor.cpu().tolist() - - -def test_allreduce(actors): - """Test allreduce collective communication.""" - world_size = 2 - assert len(actors) == world_size, f"Expected {world_size} actors, got {len(actors)}" - - rank0_data = [1.0, 2.0, 3.0] - rank1_data = [4.0, 5.0, 6.0] - results = ray.get( - [ - actors[0].test_allreduce.remote(rank0_data), - actors[1].test_allreduce.remote(rank1_data), - ] - ) - expected = [5.0, 7.0, 9.0] - for result in results: - assert result == expected, f"Allreduce failed: {result} != {expected}" - - -def test_broadcast(actors): - """Test broadcast collective communication.""" - world_size = 2 - assert len(actors) == world_size, f"Expected {world_size} actors, got {len(actors)}" - - root_tensor = [10.0, 20.0] - results = ray.get( - [actor.test_broadcast.remote(root_tensor, root_rank=0) for actor in actors] - ) - for result in results: - assert result == root_tensor, f"Broadcast failed: {result} != {root_tensor}" - - -def test_allgather(actors): - """Test allgather collective communication.""" - world_size = 2 - assert len(actors) == world_size, f"Expected {world_size} actors, got {len(actors)}" - - rank0_data = [1.0, 2.0] - rank1_data = [3.0, 4.0] - results = ray.get( - [ - actors[0].test_allgather.remote(rank0_data), - actors[1].test_allgather.remote(rank1_data), - ] - ) - for i, result in enumerate(results): - result_flattened = [item for sublist in result for item in sublist] - all_values = sorted(result_flattened) - expected_values = sorted([1.0, 2.0, 3.0, 4.0]) - assert ( - all_values == expected_values - ), f"Allgather failed for rank {i}: {all_values} != {expected_values}" - assert ( - len(result) == 2 - ), f"Allgather failed for rank {i}: expected 2 gathered tensors, got {len(result)}" - - -def test_reduce(actors): - """Test reduce collective communication.""" - world_size = 2 - assert len(actors) == world_size, f"Expected {world_size} actors, got {len(actors)}" - - rank0_data = [1.0, 2.0, 3.0] - rank1_data = [4.0, 5.0, 6.0] - results = ray.get( - [ - actors[0].test_reduce.remote(rank0_data, root_rank=0), - actors[1].test_reduce.remote(rank1_data, root_rank=0), - ] - ) - expected_root = [5.0, 7.0, 9.0] - assert ( - results[0] == expected_root - ), f"Reduce failed for root rank: {results[0]} != {expected_root}" - - -def test_reducescatter(actors): - """Test reducescatter collective communication.""" - world_size = 2 - assert len(actors) == world_size, f"Expected {world_size} actors, got {len(actors)}" - - rank0_data = [1.0, 2.0, 3.0] - rank1_data = [4.0, 5.0, 6.0] - results = ray.get( - [ - actors[0].test_reducescatter.remote([rank0_data, rank1_data]), - actors[1].test_reducescatter.remote([rank0_data, rank1_data]), - ] - ) - expected_rank0 = [2.0, 4.0, 6.0] - expected_rank1 = [8.0, 10.0, 12.0] - assert ( - results[0] == expected_rank0 - ), f"Reducescatter failed for rank 0: {results[0]} != {expected_rank0}" - assert ( - results[1] == expected_rank1 - ), f"Reducescatter failed for rank 1: {results[1]} != {expected_rank1}" - - -def test_send_recv(actors): - """Test send/recv point-to-point communication.""" - world_size = 2 - assert len(actors) == world_size, f"Expected {world_size} actors, got {len(actors)}" - - tensor_data = [7.0, 8.0, 9.0] - tensor_shape = (3,) - - send_task = actors[0].test_send.remote(tensor_data, dst_rank=1) - recv_task = actors[1].test_recv.remote(tensor_shape, src_rank=0) - - ray.get(send_task) - result = ray.get(recv_task) - - assert result == tensor_data, f"Send/recv failed: {result} != {tensor_data}" From 0fb0560dea19ee03efcb3d065e27ee9b826eab35 Mon Sep 17 00:00:00 2001 From: Haichuan Hu Date: Sat, 30 May 2026 16:37:44 +0800 Subject: [PATCH 5/8] [feat]: Support hccl tensor transport Signed-off-by: Haichuan Hu --- docs/user_guide/hccl_collective.md | 46 ++++++++++++++++++++++++ docs/user_guide/hccl_transport.md | 56 ++++++++++++++++++++++++++++++ docs/user_guide/index.md | 3 +- docs/user_guide/yr_transport.md | 46 ------------------------ 4 files changed, 104 insertions(+), 47 deletions(-) create mode 100644 docs/user_guide/hccl_transport.md diff --git a/docs/user_guide/hccl_collective.md b/docs/user_guide/hccl_collective.md index ef3e3c0..3bf849c 100644 --- a/docs/user_guide/hccl_collective.md +++ b/docs/user_guide/hccl_collective.md @@ -58,6 +58,52 @@ print("Allreduce results:", results) ray.shutdown() ``` +## Point-to-Point Communication + +HCCL supports send/recv operations between specific ranks in a collective group: + +```python +import ray +import torch +from ray.util import collective +from ray_ascend import register_hccl_collective_backend + +ray.init() +register_hccl_collective_backend() + +@ray.remote(resources={"NPU": 1}) +class Worker: + def __init__(self): + register_hccl_collective_backend() + + def send_tensor(self, data, dst_rank): + tensor = torch.tensor(data, dtype=torch.float32).npu() + collective.send(tensor, dst_rank=dst_rank, group_name="p2p_group") + + def recv_tensor(self, shape, src_rank): + tensor = torch.zeros(shape, dtype=torch.float32).npu() + collective.recv(tensor, src_rank=src_rank, group_name="p2p_group") + return tensor.cpu().tolist() + +world_size = 2 +actors = [Worker.remote() for _ in range(world_size)] + +collective.create_collective_group( + actors, + world_size, + list(range(world_size)), + backend="HCCL", + group_name="p2p_group", +) + +# Rank 0 sends to rank 1 +ray.get(actors[0].send_tensor.remote([7.0, 8.0, 9.0], dst_rank=1)) +result = ray.get(actors[1].recv_tensor.remote((3,), src_rank=0)) +print("Received:", result) # [7.0, 8.0, 9.0] + +ray.shutdown() +``` + ## Supported Tensor Types HCCL supports common PyTorch types: diff --git a/docs/user_guide/hccl_transport.md b/docs/user_guide/hccl_transport.md new file mode 100644 index 0000000..1e93cb0 --- /dev/null +++ b/docs/user_guide/hccl_transport.md @@ -0,0 +1,56 @@ +# HCCL Tensor Transport + +> _Last updated: 05/30/2026_ + +HCCL tensor transport enables zero-copy transfer of NPU tensors between Ray actors via +HCCS (Huawei Cache Coherence System). + +> **Note**: HCCL tensor transport requires Ray >= 2.56. + +## Quick Example + +```python +import ray +import torch +from ray.util.collective import create_collective_group +from ray_ascend import register_hccl_tensor_transport + +ray.init() +register_hccl_tensor_transport() + +@ray.remote(resources={"NPU": 1}) +class RayActor: + def __init__(self): + register_hccl_tensor_transport() + + @ray.method(tensor_transport="HCCL") + def random_tensor(self): + return torch.zeros(1024, device="npu") + + def sum(self, tensor: torch.Tensor): + return torch.sum(tensor) + +sender, receiver = RayActor.remote(), RayActor.remote() +group = create_collective_group([sender, receiver], backend="HCCL") + +tensor = sender.random_tensor.remote() +result = receiver.sum.remote(tensor) +print(ray.get(result)) + +ray.shutdown() +``` + +## How It Works + +`register_hccl_tensor_transport()` registers both the HCCL collective backend and the +HCCL tensor transport. It must be called in the driver process and in each actor's +`__init__`. + +Under the hood, HCCL tensor transport uses Ray's `CollectiveTensorTransport` +infrastructure, which reuses the HCCL collective communicator for point-to-point tensor +transfers. A collective group must be created between the sender and receiver actors +before using `@ray.method(tensor_transport="HCCL")`. + +## Supported Device Types + +- **NPU**: Tensors on Ascend NPU devices (via HCCS) diff --git a/docs/user_guide/index.md b/docs/user_guide/index.md index 3209f14..2233f91 100644 --- a/docs/user_guide/index.md +++ b/docs/user_guide/index.md @@ -38,7 +38,8 @@ pip install "ray-ascend[yr]" - [Installation](installation.md): Detailed installation and setup instructions - [HCCL Collective Communication](hccl_collective.md): Collective operations guide -- [YR Direct Transport](yr_transport.md): Tensor transport guide +- [HCCL Tensor Transport](hccl_transport.md): NPU tensor transport via HCCS +- [YR Direct Transport](yr_transport.md): CPU/NPU tensor transport via RDMA/HCCS - [API Reference](api_reference.md): Complete API documentation - [Best Practices](best_practices.md): Best practices, troubleshooting, and FAQ diff --git a/docs/user_guide/yr_transport.md b/docs/user_guide/yr_transport.md index c3bd3c1..599cca1 100644 --- a/docs/user_guide/yr_transport.md +++ b/docs/user_guide/yr_transport.md @@ -206,52 +206,6 @@ print("CPU tensor shape:", cpu_tensor.shape) ray.shutdown() ``` -## Combined HCCL + YR Transport - -For advanced use cases combining HCCL collective communication with YR direct transport: - -```python -import os -import ray -from ray.util.collective import create_collective_group -from ray_ascend import register_yr_tensor_transport -from ray_ascend.collective.hccl_collective_group import HCCLGroup - -ray.init(address="auto") - -# Register both backends -ray.register_collective_backend("HCCL", HCCLGroup) -register_yr_tensor_transport(["npu", "cpu"]) - -@ray.remote(resources={"NPU": 1}) -class RayActor: - def __init__(self): - import torch - import torch_npu - register_yr_tensor_transport(["npu", "cpu"]) - - @ray.method(tensor_transport="YR") - def random_tensor(self): - """Return an NPU tensor via YR transport.""" - import torch - return torch.zeros(1024, device="npu") - - def sum(self, tensor): - """Process a received tensor.""" - return tensor.sum() - -# Create actors -sender, receiver = RayActor.remote(), RayActor.remote() - -# Create HCCL collective group -group = create_collective_group([sender, receiver], backend="HCCL") - -# Use YR transport for tensor transfer -tensor = sender.random_tensor.remote() -result = receiver.sum.remote(tensor) -print(ray.get(result)) -``` - ## Decorator Usage Use `@ray.method(tensor_transport="YR")` on actor methods that return or receive tensors From 58ee47b0a77b7d18fcedb53b12bdae1d31a51921 Mon Sep 17 00:00:00 2001 From: Haichuan Hu Date: Sat, 30 May 2026 17:47:52 +0800 Subject: [PATCH 6/8] [feat]: Support hccl tensor transport Signed-off-by: Haichuan Hu --- README.md | 6 ------ 1 file changed, 6 deletions(-) diff --git a/README.md b/README.md index 14f90a8..5372cfc 100644 --- a/README.md +++ b/README.md @@ -38,12 +38,6 @@ For performance benchmarks, see the [Performance Benchmark Report](./docs/develo - torch == 2.7.1, torch-npu == 2.7.1.post1 - Ray (same version as ray-ascend) -## Version - -| Version | Release Type | Doc | -| --------- | ------------------------ | --- | -| 0.54.0rc1 | Latest Release Candidate | | - ## Quick Start ### Installation From 35bfcd1c2cb18e1e3fb05862b524196dfb383b05 Mon Sep 17 00:00:00 2001 From: Haichuan Hu Date: Mon, 1 Jun 2026 11:00:49 +0800 Subject: [PATCH 7/8] [feat]: Support hccl tensor transport Signed-off-by: Haichuan Hu --- ray_ascend/collective/hccl_collective_group.py | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/ray_ascend/collective/hccl_collective_group.py b/ray_ascend/collective/hccl_collective_group.py index a5b5b04..f79bb8d 100644 --- a/ray_ascend/collective/hccl_collective_group.py +++ b/ray_ascend/collective/hccl_collective_group.py @@ -2,7 +2,7 @@ import datetime import logging import time -from typing import Any, List, Optional, Sequence, Tuple +from typing import Any, List, Optional, Sequence, Tuple, Union import ray import torch @@ -622,11 +622,13 @@ def _init_collective_communicator(self) -> None: self._stream = stream self._device = device - def _validate_tensor(self, tensor: torch.Tensor) -> torch.Tensor: - """Validate a single tensor and return it. + def _validate_tensor( + self, tensor: Union[torch.Tensor, List[torch.Tensor]] + ) -> torch.Tensor: + """Validate a tensor and return it. - Enforces the single-device constraint and checks that the tensor device - matches the communicator's initialization device. + Accepts a Tensor or a single-element list (unwrapped automatically). + Enforces single-device constraint against the communicator's device. """ # If the input is a list of tensors, we only support single tensor list for now. # We will extract the single tensor out for validation. From 0b925de13075620779a82cdfd61010b7df629750 Mon Sep 17 00:00:00 2001 From: Haichuan Hu Date: Mon, 1 Jun 2026 16:05:34 +0800 Subject: [PATCH 8/8] [feat]: Support hccl tensor transport Signed-off-by: Haichuan Hu --- docs/user_guide/api_reference.md | 70 ++++++++++++------------------- docs/user_guide/best_practices.md | 4 +- 2 files changed, 28 insertions(+), 46 deletions(-) diff --git a/docs/user_guide/api_reference.md b/docs/user_guide/api_reference.md index c700fb8..39883b0 100644 --- a/docs/user_guide/api_reference.md +++ b/docs/user_guide/api_reference.md @@ -1,66 +1,48 @@ # API Reference -> _Last updated: 03/24/2026_ +> _Last updated: 05/30/2026_ -## HCCLGroup +## register_hccl_collective_backend -The main class for HCCL collective communication. - -### Constructor +Register HCCL collective backend for Ray. Requires Ray >= 2.56. ```python -HCCLGroup(world_size: int, rank: int, group_name: str) -``` +from ray_ascend import register_hccl_collective_backend -### Methods - -| Method | Description | -| ----------------------------------------------------------- | ------------------------------- | -| `broadcast(tensor, broadcast_options)` | Broadcast tensor from root rank | -| `allreduce(tensor, allreduce_options)` | All-reduce tensor across group | -| `allgather(tensor_list, tensor, allgather_options)` | Gather tensors from all ranks | -| `reduce(tensor, reduce_options)` | Reduce tensor to root rank | -| `reducescatter(tensor, tensor_list, reducescatter_options)` | Reduce and scatter | -| `send(tensor, send_options)` | Send tensor to peer | -| `recv(tensor, recv_options)` | Receive tensor from peer | -| `barrier(barrier_options)` | Synchronize all ranks | -| `destroy_group()` | Clean up communicator resources | +register_hccl_collective_backend() +``` -## YRTensorTransport +Must be called in both the driver process and each actor's `__init__`. -The main class for YR direct tensor transport. +## register_hccl_tensor_transport -### Constructor +Register HCCL backend and tensor transport for Ray. Requires Ray >= 2.56. ```python -YRTensorTransport() -``` - -### Environment Variables +from ray_ascend import register_hccl_tensor_transport -| Variable | Description | -| ------------------- | --------------------------------------------------- | -| `YR_DS_WORKER_HOST` | Host address of the YR DataSystem worker (required) | -| `YR_DS_WORKER_PORT` | Port of the YR DataSystem worker (required) | - -### Methods +register_hccl_tensor_transport() +``` -| Method | Description | -| --------------------------------------------------------------------------------- | ----------------------------------------- | -| `tensor_transport_backend()` | Returns "YR" | -| `is_one_sided()` | Returns True (one-sided communication) | -| `get_ds_client(device_type)` | Get or create the DataSystem client | -| `actor_has_tensor_transport(actor)` | Check if actor has YR transport available | -| `extract_tensor_transport_metadata(obj_id, gpu_object)` | Extract metadata for transport | -| `recv_multiple_tensors(obj_id, tensor_transport_metadata, communicator_metadata)` | Receive tensors | -| `garbage_collect(obj_id, tensor_transport_meta)` | Clean up resources | +Must be called in both the driver process and each actor's `__init__`. -### Registration +## register_yr_tensor_transport -Register YR tensor transport with: +Register YR tensor transport for Ray and initialize YR backend. ```python from ray_ascend import register_yr_tensor_transport register_yr_tensor_transport(["npu", "cpu"]) ``` + +Must be called in both the driver process and each actor's `__init__`. + +### Environment Variables + +| Variable | Default | Description | +| ---------------------- | ----------- | ------------------------------------------- | +| `YR_DS_INIT_MODE` | `metastore` | Initialization mode (`metastore` or `etcd`) | +| `YR_DS_WORKER_PORT` | `31501` | YR DS worker port | +| `YR_DS_METASTORE_PORT` | `2379` | Metastore service port | +| `YR_DS_ETCD_ADDRESS` | - | Etcd address (required for etcd mode) | diff --git a/docs/user_guide/best_practices.md b/docs/user_guide/best_practices.md index 3c35ea3..d57704e 100644 --- a/docs/user_guide/best_practices.md +++ b/docs/user_guide/best_practices.md @@ -9,7 +9,7 @@ 1. **Device Consistency**: Ensure all tensors used in collective operations reside on the same NPU device that was used during communicator initialization. -1. **Group Cleanup**: Always call `destroy_group()` when done to free communicator +1. **Group Cleanup**: Clean up collective group resources when done to free communicator resources. 1. **Rank Coordination**: All ranks must participate in collective operations in the @@ -47,7 +47,7 @@ **Problem**: "Collective ops must use the same device as communicator initialization" **Solution**: Ensure the tensor you're passing is on the same NPU device that was -current when the `HCCLGroup` was created. +current when the collective group was created. ### YR Transport Issues