feat(experimental): integrate Ray RDT for weight syncing#1305

Open
KaisennHu wants to merge 1 commit into
areal-project:mainfrom
KaisennHu:feat/integrate-rdt

@KaisennHu KaisennHu commented May 6, 2026

Description

This PR implements the RDT (Ray Direct Transport) weight-syncing backend.

Core changes:

  • IW Scheduler Bridge (rdt_scheduler.py): TransferPlan shard selection + Ray RPC weight pull
  • TW Adapter (rdt/fsdp_adapter.py): FSDP weight metadata extraction + actor handle serialization
  • HTTP Endpoints: TW Flask blueprint + IW FastAPI endpoints
  • Gateway (app.py): RDT mode /connect and /update_weights flow
  • Tensor Transport (ray_rpc_server.py): @ray.method(tensor_transport="YR"|"NIXL") decorated methods

Key features:

  • Supports YR (NPU) and NIXL (GPU) one-sided tensor transport
  • Independent implementation (RDT prefix), no coupling with awex classes
  • Uses TransferPlan.inter_operations for correct TW shard selection

Related Issue

Fixes #1243

Type of Change

  • ✨ New feature

Checklist

  • I have read the Contributing Guide
  • Pre-commit hooks pass (pre-commit run --all-files)
  • Relevant tests pass; new tests added for new functionality
  • Documentation updated (if applicable; built with ./docs/build_all.sh)
  • Branch is up to date with main
  • Self-reviewed via /review-pr command
  • This PR was created by a coding agent via /create-pr
  • This PR is a breaking change

Contributor

@gemini-code-assist gemini-code-assist Bot left a comment

Code Review

This pull request introduces the Ray Direct Transport (RDT) weight update backend, which utilizes one-sided RDMA (YR for NPU, NIXL for GPU) for weight synchronization between training and inference workers. Key additions include new HTTP endpoints for both services, a scheduler bridge for the inference service, and an FSDP adapter for the training service. Feedback highlights a potential bug in response handling within the gateway, opportunities to reduce code duplication in parameter unfusing logic, and a suggestion to make an internal dispatch method private to prevent API misuse.

Comment thread areal/experimental/weight_update/gateway/app.py
Comment thread areal/experimental/inference_service/sglang/rdt_scheduler.py Outdated
Comment thread areal/infra/rpc/ray_rpc_server.py Outdated
@KaisennHu KaisennHu force-pushed the feat/integrate-rdt branch 2 times, most recently from 770271d to d3547aa Compare May 10, 2026 16:10
@KaisennHu KaisennHu force-pushed the feat/integrate-rdt branch 22 times, most recently from 980b657 to 44cbb72 Compare May 14, 2026 08:46
@KaisennHu KaisennHu force-pushed the feat/integrate-rdt branch 16 times, most recently from 50217db to b380f11 Compare May 20, 2026 03:07
@KaisennHu
Author

Hi @garrett4wade, I have verified test_rdt_integration.py, test_sglang_integration.py, and test_wu_controller.py. Would you mind taking a look when you get a chance? Thanks.

@KaisennHu KaisennHu force-pushed the feat/integrate-rdt branch 10 times, most recently from e963618 to c0ff6b3 Compare May 25, 2026 08:24
)

WEIGHT_UPDATE_BACKEND_ENV = "AREAL_WEIGHT_UPDATE_BACKEND"
BACKEND_AWEX = "awex"
Collaborator

This environment variable is currently read only in scheduler.py, while the other call sites still hard-code the backend. I recommend unifying them.
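
One way to unify the lookups is a single helper that every call site imports. The following is a minimal sketch, not the PR's actual code: `get_weight_update_backend`, `BACKEND_RDT`, the `"rdt"` value (inferred from the `mode=rdt` log lines in this PR), and the `awex` default are all assumptions for illustration.

```python
import os

# These two names come from the diff under review.
WEIGHT_UPDATE_BACKEND_ENV = "AREAL_WEIGHT_UPDATE_BACKEND"
BACKEND_AWEX = "awex"

# Assumed constant: the PR's logs show mode=rdt, but this name is hypothetical.
BACKEND_RDT = "rdt"

_VALID_BACKENDS = (BACKEND_AWEX, BACKEND_RDT)


def get_weight_update_backend(default: str = BACKEND_AWEX) -> str:
    """Return the backend selected via the environment variable.

    Hypothetical helper: falls back to `default` when the variable is
    unset, and rejects unknown values instead of silently ignoring them.
    """
    value = os.environ.get(WEIGHT_UPDATE_BACKEND_ENV, default).strip().lower()
    if value not in _VALID_BACKENDS:
        raise ValueError(
            f"{WEIGHT_UPDATE_BACKEND_ENV}={value!r} is not one of {_VALID_BACKENDS}"
        )
    return value
```

Call sites would then branch on `get_weight_update_backend() == BACKEND_RDT` rather than reading the variable or comparing string literals themselves.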

Author

Fixed. Thanks for reviewing.

Signed-off-by: Haichuan Hu <kaisennhu@gmail.com>
@KaisennHu
Author

KaisennHu commented May 27, 2026

Hi @garrett4wade @sitabulaixizawaluduo, I benchmarked one weight update on the AWEX backend (118 ms) and on the RDT backend (307 ms):

(AReaL) 20260521-16:46:57.646 WeightUpdateController INFO: Connected pair 'test_megatron_dp_e2e' (mode=awex, colocate=False)
(AReaL) 20260521-16:46:57.762 AwexSGLangAdapter INFO: [Awex-IW-Timing] prep=0.0ms | get_params=1.9ms | build_ops=3.0ms | nccl_recv=103.2ms | copy_non_contiguous=0.0ms | barrier=0.1ms | total=108.4ms
(AReaL) 20260521-16:46:57.762 AwexMegatronAdapter INFO: [Awex-TW-Timing] prep=0.0ms | get_params=89.5ms | build_ops=5.3ms | nccl_send=13.3ms | barrier=0.2ms | total=108.4ms
(AReaL) 20260521-16:46:57.767 WeightUpdateGateway INFO: Weight update completed for pair 'test_megatron_dp_e2e' v1 (118.0ms)

(AReaL) 20260526-09:51:14.894 WeightUpdateGateway INFO: Connected RDT pair 'test_rdt_megatron_dp_e2e'
(AReaL) 20260526-09:51:14.895 WeightUpdateController INFO: Connected pair 'test_rdt_megatron_dp_e2e' (mode=rdt, colocate=False)
(WeightTransportActor pid=4050813) 2026-05-26 09:51:11 NIXL INFO    _api.py:369 Backend UCX was instantiated
(WeightTransportActor pid=4050813) 2026-05-26 09:51:11 NIXL INFO    _api.py:247 Initialized NIXL agent: a8fba89883cffc63ebcb4ca924000000
(AReaL) 20260526-09:51:14.987 RDTTWBlueprint INFO: [RDT-TW-Timing] get_params=34.7ms | slice_ipc=10.7ms | store_handles=36.2ms | total=81.7ms
(AReaL) 20260526-09:51:14.987 RDTTWBlueprint INFO: [RDT-TW] Prepared weights for pair 'test_rdt_megatron_dp_e2e' v1
(AReaL) 20260526-09:51:14.993 RDTSGLangAdapter INFO: TransferPlan: send_ranks=[1], tw_indices=[0], infer_world_size=1
(AReaL) 20260526-09:51:14.993 RDTSGLangAdapter INFO: [RDT-IW] Pulling from TW shards [0] for pair 'test_rdt_megatron_dp_e2e' v1
(AReaL) 20260526-09:51:14.996 RDTSGLangAdapter INFO: [RDT-IW] Submitted 1 RPCs, calling ray.get...
(AReaL) 20260526-09:51:15.169 RDTSGLangAdapter INFO: [RDT-IW] Unpacked 1 buffers, total tensors=310, total_bytes=1136.9MB, unpack_time=4.3ms
(AReaL) 20260526-09:51:15.179 RDTSGLangAdapter INFO: Applied TransferPlan: 0 non-contiguous pairs handled
(AReaL) 20260526-09:51:15.198 RDTSGLangAdapter INFO: [RDT-IW-Timing] prep=0.2ms | rpc_submit=2.6ms | ray_get=168.5ms | unpack=4.3ms | apply_model=11.0ms | cleanup=18.9ms | total=205.5ms
(AReaL) 20260526-09:51:15.202 WeightUpdateGateway INFO: RDT timing breakdown: tw_prepare_ipc_handles=87.3ms, iw_pull_weights=211.9ms
(AReaL) 20260526-09:51:15.206 WeightUpdateGateway INFO: Weight update completed for pair 'test_rdt_megatron_dp_e2e' v1 (307.4ms)

RDT currently underperforms because TW and IW run as standalone processes rather than native Ray Actors, introducing the following overhead:

  1. Extra Actor: Requires creating a dedicated Ray Actor on the TW side for transmission.
  2. Extra Copy: Weights must be exposed to the actor via CUDA IPC and then copied once more for the NIXL transfer, since NIXL cannot use borrowed tensors.

RDT offers plug-and-play flexibility for dynamic node scaling without extra communication groups, provided that IW and TW are implemented as Ray Actors.
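
The timing lines quoted above can be compared stage by stage with a short script. This is only a sketch over the numbers already in the logs: `parse_timing` is a hypothetical helper, and treating the two gateway phases as strictly sequential is an inference from their sum being close to the end-to-end total, not something the logs state.

```python
import re


def parse_timing(line: str) -> dict[str, float]:
    """Extract 'name=12.3ms' pairs from a timing log line (values in ms)."""
    return {name: float(ms) for name, ms in re.findall(r"(\w+)=([\d.]+)ms", line)}


# Stage timings copied verbatim from the log lines in this comment.
rdt_iw = parse_timing(
    "prep=0.2ms | rpc_submit=2.6ms | ray_get=168.5ms | unpack=4.3ms | "
    "apply_model=11.0ms | cleanup=18.9ms | total=205.5ms"
)
rdt_gateway = parse_timing("tw_prepare_ipc_handles=87.3ms, iw_pull_weights=211.9ms")

# End-to-end totals reported by WeightUpdateGateway.
awex_total_ms = 118.0
rdt_total_ms = 307.4

# How much slower RDT is end to end.
slowdown = rdt_total_ms / awex_total_ms

# Sum of the two gateway phases (assumed to run back to back).
rdt_serial_ms = sum(rdt_gateway.values())

# Fraction of the IW-side time spent waiting in ray.get.
ray_get_share = rdt_iw["ray_get"] / rdt_iw["total"]

print(f"slowdown: {slowdown:.2f}x")
print(f"gateway phases: {rdt_serial_ms:.1f} ms of {rdt_total_ms:.1f} ms")
print(f"ray_get share of IW time: {ray_get_share:.0%}")
```

On these numbers RDT is roughly 2.6x slower end to end, the two gateway phases sum to 299.2 ms of the 307.4 ms total, and ray_get accounts for about 82% of the IW-side time.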

Development

Successfully merging this pull request may close these issues.

[RFC] Integrate Ray Core RDT for Weight Syncing

2 participants