diff --git a/.github/workflows/pre-commit.yml b/.github/workflows/pre-commit.yml index 505ef2c..2e0bf15 100644 --- a/.github/workflows/pre-commit.yml +++ b/.github/workflows/pre-commit.yml @@ -4,9 +4,6 @@ name: pre-commit # No need to avoid / cancel lightweight pre-commit jobs on: pull_request: - push: - branches: - - master # Declare permissions just read content. permissions: diff --git a/README.md b/README.md index 583289c..5372cfc 100644 --- a/README.md +++ b/README.md @@ -38,12 +38,6 @@ For performance benchmarks, see the [Performance Benchmark Report](./docs/develo - torch == 2.7.1, torch-npu == 2.7.1.post1 - Ray (same version as ray-ascend) -## Version - -| Version | Release Type | Doc | -| --------- | ------------------------ | --- | -| 0.54.0rc1 | Latest Release Candidate | | - ## Quick Start ### Installation @@ -57,9 +51,14 @@ pip install "ray-ascend[yr]" ```python import ray from ray.util import collective -from ray_ascend.collective import HCCLGroup +from ray_ascend import register_hccl_collective_backend + +register_hccl_collective_backend() -ray.register_collective_backend("HCCL", HCCLGroup) +@ray.remote(resources={"NPU": 1}) +class RayActor: + def __init__(self): + register_hccl_collective_backend() collective.create_collective_group( actors, @@ -79,16 +78,15 @@ collective.broadcast(tensor, src_rank=0, group_name="my_group") import ray import torch from ray.util.collective import create_collective_group -from ray.experimental import register_tensor_transport -from ray_ascend.collective import HCCLGroup -from ray_ascend.direct_transport import HCCLTensorTransport - -ray.register_collective_backend("HCCL", HCCLGroup) -register_tensor_transport("HCCL", ["npu"], HCCLTensorTransport, torch.Tensor) +from ray_ascend import register_hccl_tensor_transport +register_hccl_tensor_transport() @ray.remote(resources={"NPU": 1}) class RayActor: + def __init__(self): + register_hccl_tensor_transport() + @ray.method(tensor_transport="HCCL") def random_tensor(self): 
return torch.zeros(1024, device="npu") @@ -135,6 +133,13 @@ npu_tensor = ray.get(sender.transfer_npu_tensor_via_hccs.remote()) cpu_tensor = ray.get(sender.transfer_cpu_tensor_via_rdma.remote()) ``` +## Ray Version Compatibility + +| Ray Version | YR Transport | HCCL Collective | HCCL Tensor Transport (RDT) | +|-------------|-------------|-----------------|-----------------------------| +| >=2.55, <2.56 | ✅ | ❌ | ❌ | +| >= 2.56 | ✅ | ✅ | ✅ | + ## Contributing See [CONTRIBUTING](./CONTRIBUTING.md) and [developer guide](https://ascend.github.io/ray-ascend/developer_guide/) for more details—a step-by-step guide to help diff --git a/docs/user_guide/api_reference.md b/docs/user_guide/api_reference.md index c700fb8..39883b0 100644 --- a/docs/user_guide/api_reference.md +++ b/docs/user_guide/api_reference.md @@ -1,66 +1,48 @@ # API Reference -> _Last updated: 03/24/2026_ +> _Last updated: 05/30/2026_ -## HCCLGroup +## register_hccl_collective_backend -The main class for HCCL collective communication. - -### Constructor +Register HCCL collective backend for Ray. Requires Ray >= 2.56. 
```python -HCCLGroup(world_size: int, rank: int, group_name: str) -``` +from ray_ascend import register_hccl_collective_backend -### Methods - -| Method | Description | -| ----------------------------------------------------------- | ------------------------------- | -| `broadcast(tensor, broadcast_options)` | Broadcast tensor from root rank | -| `allreduce(tensor, allreduce_options)` | All-reduce tensor across group | -| `allgather(tensor_list, tensor, allgather_options)` | Gather tensors from all ranks | -| `reduce(tensor, reduce_options)` | Reduce tensor to root rank | -| `reducescatter(tensor, tensor_list, reducescatter_options)` | Reduce and scatter | -| `send(tensor, send_options)` | Send tensor to peer | -| `recv(tensor, recv_options)` | Receive tensor from peer | -| `barrier(barrier_options)` | Synchronize all ranks | -| `destroy_group()` | Clean up communicator resources | +register_hccl_collective_backend() +``` -## YRTensorTransport +Must be called in both the driver process and each actor's `__init__`. -The main class for YR direct tensor transport. +## register_hccl_tensor_transport -### Constructor +Register HCCL backend and tensor transport for Ray. Requires Ray >= 2.56. 
```python -YRTensorTransport() -``` - -### Environment Variables +from ray_ascend import register_hccl_tensor_transport -| Variable | Description | -| ------------------- | --------------------------------------------------- | -| `YR_DS_WORKER_HOST` | Host address of the YR DataSystem worker (required) | -| `YR_DS_WORKER_PORT` | Port of the YR DataSystem worker (required) | - -### Methods +register_hccl_tensor_transport() +``` -| Method | Description | -| --------------------------------------------------------------------------------- | ----------------------------------------- | -| `tensor_transport_backend()` | Returns "YR" | -| `is_one_sided()` | Returns True (one-sided communication) | -| `get_ds_client(device_type)` | Get or create the DataSystem client | -| `actor_has_tensor_transport(actor)` | Check if actor has YR transport available | -| `extract_tensor_transport_metadata(obj_id, gpu_object)` | Extract metadata for transport | -| `recv_multiple_tensors(obj_id, tensor_transport_metadata, communicator_metadata)` | Receive tensors | -| `garbage_collect(obj_id, tensor_transport_meta)` | Clean up resources | +Must be called in both the driver process and each actor's `__init__`. -### Registration +## register_yr_tensor_transport -Register YR tensor transport with: +Register YR tensor transport for Ray and initialize YR backend. ```python from ray_ascend import register_yr_tensor_transport register_yr_tensor_transport(["npu", "cpu"]) ``` + +Must be called in both the driver process and each actor's `__init__`. 
+ +### Environment Variables + +| Variable | Default | Description | +| ---------------------- | ----------- | ------------------------------------------- | +| `YR_DS_INIT_MODE` | `metastore` | Initialization mode (`metastore` or `etcd`) | +| `YR_DS_WORKER_PORT` | `31501` | YR DS worker port | +| `YR_DS_METASTORE_PORT` | `2379` | Metastore service port | +| `YR_DS_ETCD_ADDRESS` | - | Etcd address (required for etcd mode) | diff --git a/docs/user_guide/best_practices.md b/docs/user_guide/best_practices.md index 3c35ea3..d57704e 100644 --- a/docs/user_guide/best_practices.md +++ b/docs/user_guide/best_practices.md @@ -9,7 +9,7 @@ 1. **Device Consistency**: Ensure all tensors used in collective operations reside on the same NPU device that was used during communicator initialization. -1. **Group Cleanup**: Always call `destroy_group()` when done to free communicator +1. **Group Cleanup**: Clean up collective group resources when done to free communicator resources. 1. **Rank Coordination**: All ranks must participate in collective operations in the @@ -47,7 +47,7 @@ **Problem**: "Collective ops must use the same device as communicator initialization" **Solution**: Ensure the tensor you're passing is on the same NPU device that was -current when the `HCCLGroup` was created. +current when the collective group was created. ### YR Transport Issues diff --git a/docs/user_guide/hccl_collective.md b/docs/user_guide/hccl_collective.md index 6d42072..3bf849c 100644 --- a/docs/user_guide/hccl_collective.md +++ b/docs/user_guide/hccl_collective.md @@ -1,10 +1,12 @@ # HCCL Collective Communication -> _Last updated: 03/24/2026_ +> _Last updated: 05/30/2026_ ray-ascend provides HCCL (Huawei Collective Communication Library) support for distributed collective operations across Ray actors. +> **Note**: HCCL collective backend requires Ray >= 2.56. 
+ ## Available Collective Operations - **broadcast**: Send data from one rank to all ranks @@ -15,44 +17,30 @@ distributed collective operations across Ray actors. - **send/recv**: Point-to-point communication - **barrier**: Synchronize all ranks -## Quick Example: HCCL Collective Group +## Quick Example ```python import ray +import torch from ray.util import collective -from ray_ascend.collective import HCCLGroup +from ray_ascend import register_hccl_collective_backend -# Initialize Ray ray.init() +register_hccl_collective_backend() -# Register the HCCL backend -ray.register_collective_backend("HCCL", HCCLGroup) - -# Create actors with NPU resources @ray.remote(resources={"NPU": 1}) class Worker: def __init__(self): - import torch - import torch_npu - self.device = torch.npu.current_device() - - def setup_group(self, world_size, rank, group_name): - self.group = HCCLGroup(world_size, rank, group_name) + register_hccl_collective_backend() def do_allreduce(self, data): - import torch tensor = torch.tensor(data, dtype=torch.float32).npu() - self.group.allreduce(tensor) + collective.allreduce(tensor, group_name="my_hccl_group") return tensor.cpu().tolist() - def destroy(self): - self.group.destroy_group() - -# Create workers world_size = 2 actors = [Worker.remote() for _ in range(world_size)] -# Create collective group collective.create_collective_group( actors, world_size, @@ -61,42 +49,59 @@ collective.create_collective_group( group_name="my_hccl_group", ) -# Perform allreduce results = ray.get([ actors[i].do_allreduce.remote([1.0 * (i + 1), 2.0 * (i + 1)]) for i in range(world_size) ]) -print("Allreduce results:", results) # Both should show [3.0, 6.0] +print("Allreduce results:", results) -# Cleanup -ray.get([actor.destroy.remote() for actor in actors]) ray.shutdown() ``` -## Using Ray's Collective API +## Point-to-Point Communication -You can also use Ray's high-level collective API: +HCCL supports send/recv operations between specific ranks in a collective 
group: ```python import ray +import torch from ray.util import collective -from ray_ascend.collective import HCCLGroup +from ray_ascend import register_hccl_collective_backend ray.init() -ray.register_collective_backend("HCCL", HCCLGroup) +register_hccl_collective_backend() @ray.remote(resources={"NPU": 1}) class Worker: - def broadcast_tensor(self, src_rank=0): - import torch - tensor = torch.ones(10).npu() if self.rank == src_rank else torch.zeros(10).npu() - collective.broadcast(tensor, src_rank=src_rank, group_name="my_hccl_group") + def __init__(self): + register_hccl_collective_backend() + + def send_tensor(self, data, dst_rank): + tensor = torch.tensor(data, dtype=torch.float32).npu() + collective.send(tensor, dst_rank=dst_rank, group_name="p2p_group") + + def recv_tensor(self, shape, src_rank): + tensor = torch.zeros(shape, dtype=torch.float32).npu() + collective.recv(tensor, src_rank=src_rank, group_name="p2p_group") return tensor.cpu().tolist() -# Create and setup group... +world_size = 2 +actors = [Worker.remote() for _ in range(world_size)] -# Each actor broadcasts in SPMD manner -results = ray.get([actor.broadcast_tensor.remote() for actor in actors]) +collective.create_collective_group( + actors, + world_size, + list(range(world_size)), + backend="HCCL", + group_name="p2p_group", +) + +# Rank 0 sends to rank 1; submit both calls before waiting so neither side blocks +send_ref = actors[0].send_tensor.remote([7.0, 8.0, 9.0], dst_rank=1) +_, result = ray.get([send_ref, actors[1].recv_tensor.remote((3,), src_rank=0)]) +print("Received:", result) # [7.0, 8.0, 9.0] + +ray.shutdown() ``` ## Supported Tensor Types diff --git a/docs/user_guide/hccl_transport.md b/docs/user_guide/hccl_transport.md new file mode 100644 index 0000000..1e93cb0 --- /dev/null +++ b/docs/user_guide/hccl_transport.md @@ -0,0 +1,56 @@ +# HCCL Tensor Transport + +> _Last updated: 05/30/2026_ + +HCCL tensor transport enables zero-copy transfer of NPU tensors between Ray actors via +HCCS (Huawei Cache Coherence System). 
+ +> **Note**: HCCL tensor transport requires Ray >= 2.56. + +## Quick Example + +```python +import ray +import torch +from ray.experimental.collective import create_collective_group +from ray_ascend import register_hccl_tensor_transport + +ray.init() +register_hccl_tensor_transport() + +@ray.remote(resources={"NPU": 1}) +class RayActor: + def __init__(self): + register_hccl_tensor_transport() + + @ray.method(tensor_transport="HCCL") + def random_tensor(self): + return torch.zeros(1024, device="npu") + + def sum(self, tensor: torch.Tensor): + return torch.sum(tensor) + +sender, receiver = RayActor.remote(), RayActor.remote() +group = create_collective_group([sender, receiver], backend="HCCL") + +tensor = sender.random_tensor.remote() +result = receiver.sum.remote(tensor) +print(ray.get(result)) + +ray.shutdown() +``` + +## How It Works + +`register_hccl_tensor_transport()` registers both the HCCL collective backend and the +HCCL tensor transport. It must be called in the driver process and in each actor's +`__init__`. + +Under the hood, HCCL tensor transport uses Ray's `CollectiveTensorTransport` +infrastructure, which reuses the HCCL collective communicator for point-to-point tensor +transfers. A collective group must be created between the sender and receiver actors +before using `@ray.method(tensor_transport="HCCL")`. 
+ +## Supported Device Types + +- **NPU**: Tensors on Ascend NPU devices (via HCCS) diff --git a/docs/user_guide/index.md b/docs/user_guide/index.md index 3209f14..2233f91 100644 --- a/docs/user_guide/index.md +++ b/docs/user_guide/index.md @@ -38,7 +38,8 @@ pip install "ray-ascend[yr]" - [Installation](installation.md): Detailed installation and setup instructions - [HCCL Collective Communication](hccl_collective.md): Collective operations guide -- [YR Direct Transport](yr_transport.md): Tensor transport guide +- [HCCL Tensor Transport](hccl_transport.md): NPU tensor transport via HCCS +- [YR Direct Transport](yr_transport.md): CPU/NPU tensor transport via RDMA/HCCS - [API Reference](api_reference.md): Complete API documentation - [Best Practices](best_practices.md): Best practices, troubleshooting, and FAQ diff --git a/docs/user_guide/installation.md b/docs/user_guide/installation.md index 0e8ee91..4b886ad 100644 --- a/docs/user_guide/installation.md +++ b/docs/user_guide/installation.md @@ -6,6 +6,8 @@ ### Basic Installation (HCCL Only) +> **Note**: HCCL collective and tensor transport features require Ray >= 2.56. 
+ Install the base package with HCCL collective communication support: ```bash diff --git a/docs/user_guide/yr_transport.md b/docs/user_guide/yr_transport.md index 0c96137..599cca1 100644 --- a/docs/user_guide/yr_transport.md +++ b/docs/user_guide/yr_transport.md @@ -206,52 +206,6 @@ print("CPU tensor shape:", cpu_tensor.shape) ray.shutdown() ``` -## Combined HCCL + YR Transport - -For advanced use cases combining HCCL collective communication with YR direct transport: - -```python -import os -import ray -from ray.util.collective import create_collective_group -from ray_ascend import register_yr_tensor_transport -from ray_ascend.collective import HCCLGroup - -ray.init(address="auto") - -# Register both backends -ray.register_collective_backend("HCCL", HCCLGroup) -register_yr_tensor_transport(["npu", "cpu"]) - -@ray.remote(resources={"NPU": 1}) -class RayActor: - def __init__(self): - import torch - import torch_npu - register_yr_tensor_transport(["npu", "cpu"]) - - @ray.method(tensor_transport="YR") - def random_tensor(self): - """Return an NPU tensor via YR transport.""" - import torch - return torch.zeros(1024, device="npu") - - def sum(self, tensor): - """Process a received tensor.""" - return tensor.sum() - -# Create actors -sender, receiver = RayActor.remote(), RayActor.remote() - -# Create HCCL collective group -group = create_collective_group([sender, receiver], backend="HCCL") - -# Use YR transport for tensor transfer -tensor = sender.random_tensor.remote() -result = receiver.sum.remote(tensor) -print(ray.get(result)) -``` - ## Decorator Usage Use `@ray.method(tensor_transport="YR")` on actor methods that return or receive tensors diff --git a/ray_ascend/__init__.py b/ray_ascend/__init__.py index 2dcdd6d..2bb2603 100644 --- a/ray_ascend/__init__.py +++ b/ray_ascend/__init__.py @@ -9,6 +9,8 @@ "__version__", "__commit__", "register_yr_tensor_transport", + "register_hccl_collective_backend", + "register_hccl_tensor_transport", ] __commit__ = _version.commit 
@@ -86,3 +88,79 @@ def transfer_cpu_tensor_via_rdma(): # Register tensor transport register_tensor_transport("YR", devices, YRTensorTransport, torch.Tensor) + + +def register_hccl_collective_backend() -> None: + """ + Register HCCL collective backend for Ray. + + Requires Ray >= 2.56. For older Ray versions, this API is not available. + + This function must be called in each Ray worker/actor process + before using HCCL collective operations. + + Example: + from ray.util import collective + from ray_ascend import register_hccl_collective_backend + + register_hccl_collective_backend() + + @ray.remote(resources={"NPU": 1}) + class RayActor: + def __init__(self): + register_hccl_collective_backend() + + collective.create_collective_group( + actors, + len(actors), + list(range(0, len(actors))), + backend="HCCL", + group_name="my_group", + ) + """ + try: + from ray.util.collective.backend_registry import register_collective_backend + except ImportError as e: + import ray + + raise RuntimeError( + f"register_hccl_collective_backend requires Ray >= 2.56, " + f"but Ray {ray.__version__} is installed. " + f"Please upgrade: pip install 'ray>=2.56'" + ) from e + + from .collective.hccl_collective_group import HCCLGroup + + register_collective_backend("HCCL", HCCLGroup) + + +def register_hccl_tensor_transport() -> None: + """ + Register HCCL backend and tensor transport for Ray. + + Requires Ray >= 2.56. For older Ray versions, this API is not available. + + This function must be called in each Ray worker/actor process + before using HCCL collective operations or tensor transport. 
+ + Example: + from ray_ascend import register_hccl_tensor_transport + + register_hccl_tensor_transport() + + @ray.remote(resources={"NPU": 1}) + class RayActor: + def __init__(self): + register_hccl_tensor_transport() + + @ray.method(tensor_transport="HCCL") + def transfer_npu_tensor_via_hccs(self): + return torch.tensor([1, 2, 3]).npu() + """ + register_hccl_collective_backend()  # surface the Ray >= 2.56 error first + + import torch + from ray.experimental import register_tensor_transport + + from .direct_transport.hccl_tensor_transport import HCCLTensorTransport + register_tensor_transport("HCCL", ["npu"], HCCLTensorTransport, torch.Tensor) diff --git a/ray_ascend/collective/__init__.py b/ray_ascend/collective/__init__.py deleted file mode 100644 index d2626ab..0000000 --- a/ray_ascend/collective/__init__.py +++ /dev/null @@ -1,7 +0,0 @@ -from .hccl_collective_group import ( - HCCLGroup, -) - -__all__ = [ - "HCCLGroup", -] diff --git a/ray_ascend/collective/hccl_collective_group.py b/ray_ascend/collective/hccl_collective_group.py index 93a20d3..f79bb8d 100644 --- a/ray_ascend/collective/hccl_collective_group.py +++ b/ray_ascend/collective/hccl_collective_group.py @@ -2,7 +2,7 @@ import datetime import logging import time -from typing import Any, List, Optional, Sequence, Tuple +from typing import Any, List, Optional, Sequence, Tuple, Union import ray import torch @@ -23,6 +23,18 @@ logger = logging.getLogger(__name__) +try: + import importlib.util + + if importlib.util.find_spec("torch_npu") is None: + raise ImportError("torch_npu not found") + ctypes.CDLL("libhccl.so") + _HCCL_AVAILABLE = True + _LOG_HCCL_WARNING = False +except (ImportError, OSError): + _HCCL_AVAILABLE = False + _LOG_HCCL_WARNING = True + class HcclRootInfo(ctypes.Structure): _fields_ = [("internal", ctypes.c_byte * 4108)] @@ -154,6 +166,20 @@ def backend(cls) -> Backend: """Return the backend type for this group.""" return Backend.HCCL + @classmethod + def check_backend_availability(cls) -> bool: + """Check if the backend is 
available.""" + global _HCCL_AVAILABLE, _LOG_HCCL_WARNING + if not _HCCL_AVAILABLE and _LOG_HCCL_WARNING: + logger.warning( + "HCCL seems unavailable. Please install torch_npu " + "following the guide at: " + "https://gitcode.com/Ascend/pytorch and " + "ensure libhccl.so is available." + ) + _LOG_HCCL_WARNING = False + return _HCCL_AVAILABLE + def broadcast( self, tensor: torch.Tensor, @@ -232,6 +258,10 @@ def collective_fn( current_stream.wait_event(event) logger.debug(f"HcclAllGather execute result : {exec_result}") + # Handle case where tensor_list is wrapped in another list by Ray's collective.py + if tensor_list and isinstance(tensor_list[0], list): + tensor_list = tensor_list[0] + output_flattened = [_flatten_for_scatter_gather(tensor_list, copy=False)] input_tensor = self._validate_tensor(tensor) @@ -592,12 +622,18 @@ def _init_collective_communicator(self) -> None: self._stream = stream self._device = device - def _validate_tensor(self, tensor: torch.Tensor) -> torch.Tensor: - """Validate a single tensor and return it. + def _validate_tensor( + self, tensor: Union[torch.Tensor, List[torch.Tensor]] + ) -> torch.Tensor: + """Validate a tensor and return it. - Enforces the single-device constraint and checks that the tensor device - matches the communicator's initialization device. + Accepts a Tensor or a single-element list (unwrapped automatically). + Enforces single-device constraint against the communicator's device. """ + # If the input is a list of tensors, we only support single tensor list for now. + # We will extract the single tensor out for validation. 
+ if isinstance(tensor, list) and len(tensor) == 1: + tensor = tensor[0] if not isinstance(tensor, torch.Tensor): raise RuntimeError("Collective ops require torch.Tensor inputs.") device = get_tensor_device(tensor) diff --git a/ray_ascend/direct_transport/__init__.py b/ray_ascend/direct_transport/__init__.py deleted file mode 100644 index 793add2..0000000 --- a/ray_ascend/direct_transport/__init__.py +++ /dev/null @@ -1,7 +0,0 @@ -from .yr_tensor_transport import ( - YRTensorTransport, -) - -__all__ = [ - "YRTensorTransport", -] diff --git a/ray_ascend/direct_transport/hccl_tensor_transport.py b/ray_ascend/direct_transport/hccl_tensor_transport.py new file mode 100644 index 0000000..2b34472 --- /dev/null +++ b/ray_ascend/direct_transport/hccl_tensor_transport.py @@ -0,0 +1,27 @@ +from typing import List, Optional + +import torch +from ray.experimental.rdt.collective_tensor_transport import ( + CollectiveTensorTransport, +) +from ray.experimental.rdt.tensor_transport_manager import ( + CommunicatorMetadata, + TensorTransportMetadata, +) + + +class HCCLTensorTransport(CollectiveTensorTransport): + def tensor_transport_backend(self) -> str: + return "HCCL" + + def recv_multiple_tensors( + self, + obj_id: str, + tensor_transport_metadata: TensorTransportMetadata, + communicator_metadata: CommunicatorMetadata, + target_buffers: Optional[List["torch.Tensor"]] = None, + ) -> List["torch.Tensor"]: + torch.npu.set_device(0) + return super().recv_multiple_tensors( # type: ignore[no-any-return] + obj_id, tensor_transport_metadata, communicator_metadata, target_buffers + ) diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/collective/test_hccl_group.py b/tests/collective/test_hccl_via_registry.py similarity index 57% rename from tests/collective/test_hccl_group.py rename to tests/collective/test_hccl_via_registry.py index 7d8bf34..b3d6a3a 100644 --- a/tests/collective/test_hccl_group.py +++ 
b/tests/collective/test_hccl_via_registry.py @@ -1,42 +1,41 @@ import pytest + +pytest.importorskip( + "ray.util.collective.backend_registry", + reason="HCCL collective backend requires Ray >= 2.56", +) + import ray import torch -from ray.util.collective.types import ( - AllGatherOptions, - BroadcastOptions, - RecvOptions, - ReduceOp, - ReduceOptions, - ReduceScatterOptions, - SendOptions, +from ray.experimental.collective import create_collective_group +from ray.util.collective import ( + allgather, + allreduce, + broadcast, + recv, + reduce, + reducescatter, + send, ) +from ray.util.collective.types import ReduceOp -from ray_ascend.collective.hccl_collective_group import HCCLGroup +from ray_ascend import register_hccl_collective_backend @pytest.fixture(scope="session") -def ray_cluster(): +def actors(ray_cluster_with_npu): + register_hccl_collective_backend() world_size = 2 - if torch.npu.device_count() < world_size: - pytest.skip("Not enough NPU devices for HcclGroup tests") - if not ray.is_initialized(): - try: - ray.init(ignore_reinit_error=True, resources={"NPU": world_size}) - except ValueError: - # Likely connecting to an existing cluster; do not pass resources. 
- ray.init(ignore_reinit_error=True) - yield - if ray.is_initialized(): - ray.shutdown() + group_name = "hccl_group" + actors = [HCCLRegistryTestActor.remote(group_name) for _ in range(world_size)] + # Create collective group using Ray's experimental interface + create_collective_group( + actors=actors, + backend="HCCL", + name=group_name, + ) -@pytest.fixture(scope="session") -def actors(ray_cluster): - world_size = 2 - group_name = "hccl_group" - actors = [ - HCCLTestActor.remote(rank, world_size, group_name) for rank in range(world_size) - ] yield actors for actor in actors: @@ -48,82 +47,74 @@ def actors(ray_cluster): @ray.remote(resources={"NPU": 1}) -class HCCLTestActor: - def __init__(self, rank, world_size, group_name="test_group"): - self.rank = rank - self.world_size = world_size - self.group = HCCLGroup(world_size, rank, group_name) +class HCCLRegistryTestActor: + def __init__(self, group_name): + register_hccl_collective_backend() + self.group_name = group_name def destroy(self): - self.group.destroy_group() + pass def get_rank(self): - return self.rank + return ray.get_gpu_ids()[0] if ray.get_gpu_ids() else None def test_allreduce(self, tensor_data): """Test allreduce operation.""" - tensor = torch.tensor(tensor_data, dtype=torch.float32).npu() - self.group.allreduce(tensor) + device = torch.npu.current_device() + tensor = torch.tensor(tensor_data, dtype=torch.float32, device=device) + allreduce(tensor, group_name=self.group_name, op=ReduceOp.SUM) return tensor.cpu().tolist() - def test_broadcast(self, tensor_data, root_rank=0): + def test_broadcast(self, tensor_data, src_rank=0): """Test broadcast operation.""" - tensor = torch.tensor(tensor_data, dtype=torch.float32).npu() - broadcast_options = BroadcastOptions() - broadcast_options.root_rank = root_rank - broadcast_options.root_tensor = 0 - self.group.broadcast(tensor, broadcast_options=broadcast_options) + device = torch.npu.current_device() + tensor = torch.tensor(tensor_data, dtype=torch.float32, 
device=device) + broadcast(tensor, src_rank=src_rank, group_name=self.group_name) return tensor.cpu().tolist() def test_send(self, tensor_data, dst_rank): """Test send operation.""" - tensor = torch.tensor(tensor_data, dtype=torch.float32).npu() - send_options = SendOptions() - send_options.dst_rank = dst_rank - send_options.dst_gpu_index = 0 - self.group.send(tensor, send_options=send_options) + device = torch.npu.current_device() + tensor = torch.tensor(tensor_data, dtype=torch.float32, device=device) + send(tensor, dst_rank=dst_rank, group_name=self.group_name) def test_recv(self, tensor_shape, src_rank): """Test recv operation.""" - tensor = torch.zeros(tensor_shape, dtype=torch.float32).npu() - recv_options = RecvOptions() - recv_options.src_rank = src_rank - recv_options.src_gpu_index = 0 - self.group.recv(tensor, recv_options=recv_options) + device = torch.npu.current_device() + tensor = torch.zeros(tensor_shape, dtype=torch.float32, device=device) + recv(tensor, src_rank=src_rank, group_name=self.group_name) return tensor.cpu().tolist() def test_allgather(self, tensor_data): """Test allgather operation.""" - tensor = torch.tensor(tensor_data, dtype=torch.float32).npu() - # Create output tensors list for each rank - output_tensors = [torch.zeros_like(tensor) for _ in range(self.world_size)] - allgather_options = AllGatherOptions() - self.group.allgather( - output_tensors, tensor, allgather_options=allgather_options - ) - return [t.cpu().tolist() for t in output_tensors] - - def test_reduce(self, tensor_data, root_rank=0): + device = torch.npu.current_device() + tensor = torch.tensor(tensor_data, dtype=torch.float32, device=device) + # Create output tensor list for each rank + tensor_list = [torch.zeros_like(tensor) for _ in range(2)] + allgather(tensor_list=tensor_list, tensor=tensor, group_name=self.group_name) + return [t.cpu().tolist() for t in tensor_list] + + def test_reduce(self, tensor_data, dst_rank=0): """Test reduce operation.""" - tensor = 
torch.tensor(tensor_data, dtype=torch.float32).npu() - reduce_options = ReduceOptions() - reduce_options.root_rank = root_rank - reduce_options.root_tensor = 0 - reduce_options.reduceOp = ReduceOp.SUM - self.group.reduce(tensor, reduce_options=reduce_options) + device = torch.npu.current_device() + tensor = torch.tensor(tensor_data, dtype=torch.float32, device=device) + reduce(tensor, dst_rank=dst_rank, group_name=self.group_name, op=ReduceOp.SUM) return tensor.cpu().tolist() def test_reducescatter(self, tensor_data_list): """Test reducescatter operation.""" + device = torch.npu.current_device() # tensor_data_list is a list of tensors (one per rank) - input_tensors = [ - torch.tensor(data, dtype=torch.float32).npu() for data in tensor_data_list + tensor_list = [ + torch.tensor(data, dtype=torch.float32, device=device) + for data in tensor_data_list ] - output_tensor = torch.zeros_like(input_tensors[0]) - reducescatter_options = ReduceScatterOptions() - reducescatter_options.reduceOp = ReduceOp.SUM - self.group.reducescatter( - output_tensor, input_tensors, reducescatter_options=reducescatter_options + output_tensor = torch.zeros_like(tensor_list[0]) + reducescatter( + tensor=output_tensor, + tensor_list=tensor_list, + group_name=self.group_name, + op=ReduceOp.SUM, ) return output_tensor.cpu().tolist() @@ -153,7 +144,7 @@ def test_broadcast(actors): root_tensor = [10.0, 20.0] results = ray.get( - [actor.test_broadcast.remote(root_tensor, root_rank=0) for actor in actors] + [actor.test_broadcast.remote(root_tensor, src_rank=0) for actor in actors] ) for result in results: assert result == root_tensor, f"Broadcast failed: {result} != {root_tensor}" @@ -193,8 +184,8 @@ def test_reduce(actors): rank1_data = [4.0, 5.0, 6.0] results = ray.get( [ - actors[0].test_reduce.remote(rank0_data, root_rank=0), - actors[1].test_reduce.remote(rank1_data, root_rank=0), + actors[0].test_reduce.remote(rank0_data, dst_rank=0), + actors[1].test_reduce.remote(rank1_data, dst_rank=0), ] ) 
expected_root = [5.0, 7.0, 9.0] diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 0000000..6897acc --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,37 @@ +import pytest +import ray +import torch + +# Default NPU resource count for HCCL tests +DEFAULT_NPU_COUNT = 2 + + +@pytest.fixture(scope="session") +def ray_cluster_with_npu(): + """ + Initialize Ray cluster with NPU resources for HCCL tests. + + This fixture provides a Ray cluster initialized with NPU resources + required for HCCL (Huawei Collective Communication Library) operations. + Raises an error if torch_npu is not available or insufficient NPU devices. + """ + if not hasattr(torch, "npu"): + raise RuntimeError( + "torch.npu is not available. " + "Please install torch_npu following the guide at: " + "https://gitcode.com/Ascend/pytorch" + ) + if torch.npu.device_count() < DEFAULT_NPU_COUNT: + raise RuntimeError( + f"Not enough NPU devices for HCCL tests. " + f"Required: {DEFAULT_NPU_COUNT}, Available: {torch.npu.device_count()}" + ) + if not ray.is_initialized(): + try: + ray.init(ignore_reinit_error=True, resources={"NPU": DEFAULT_NPU_COUNT}) + except ValueError: + # Likely connecting to an existing cluster; do not pass resources. 
+ ray.init(ignore_reinit_error=True) + yield + if ray.is_initialized(): + ray.shutdown() diff --git a/tests/direct_transport/test_hccl_tensor_transport.py b/tests/direct_transport/test_hccl_tensor_transport.py new file mode 100644 index 0000000..b11b183 --- /dev/null +++ b/tests/direct_transport/test_hccl_tensor_transport.py @@ -0,0 +1,64 @@ +import pytest + +pytest.importorskip( + "ray.util.collective.backend_registry", + reason="HCCL tensor transport requires Ray >= 2.56", +) + +import ray +import torch +from ray.experimental.collective import create_collective_group + +from ray_ascend import register_hccl_tensor_transport + +register_hccl_tensor_transport() + + +@pytest.fixture(scope="session") +def actors(ray_cluster_with_npu): + """Create actors with HCCL tensor transport registered.""" + world_size = 2 + group_name = "hccl_transport_group" + actors = [ + HCCLTensorTransportTestActor.remote(group_name) for _ in range(world_size) + ] + + # Create collective group using Ray's experimental interface + create_collective_group( + actors=actors, + backend="HCCL", + name=group_name, + ) + + yield actors + + +@ray.remote(resources={"NPU": 1}) +class HCCLTensorTransportTestActor: + """Test actor for HCCL tensor transport.""" + + def __init__(self, group_name): + register_hccl_tensor_transport() + self.group_name = group_name + + @ray.method(tensor_transport="HCCL") + def create_tensor(self): + """Return the fixed tensor [1, 2, 3] on NPU.""" + return torch.tensor([1, 2, 3]).npu() + + def sum(self, tensor: torch.Tensor): + """Return sum of tensor elements.""" + return torch.sum(tensor) + + +def test_hccl_tensor_transport(actors): + """Test basic tensor transport between actors.""" + sender, receiver = actors[0], actors[1] + + # Send tensor from sender to receiver + tensor = sender.create_tensor.remote() + result = receiver.sum.remote(tensor) + + expected_sum = 6 # 1 + 2 + 3 + actual_sum = ray.get(result) + assert actual_sum == expected_sum, f"Expected sum {expected_sum}, got 
{actual_sum}"