diff --git a/conf/experimental/sglang/test_scenario/sglang-heavy.toml b/conf/experimental/sglang/test_scenario/sglang-heavy.toml new file mode 100644 index 000000000..bacb72503 --- /dev/null +++ b/conf/experimental/sglang/test_scenario/sglang-heavy.toml @@ -0,0 +1,38 @@ +# SPDX-FileCopyrightText: NVIDIA CORPORATION & AFFILIATES +# Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +name = "sglang-heavy-multinode" + +[[Tests]] +id = "sglang.heavy.disagg.8nodes.4p4d" +test_name = "sglang" +num_nodes = 8 +time_limit = "01:30:00" + + [Tests.cmd_args.prefill] + num_nodes = 4 + gpu_ids = "0,1,2,3" + tp = 16 + mem_fraction_static = 0.75 + + [Tests.cmd_args.decode] + num_nodes = 4 + gpu_ids = "0,1,2,3" + tp = 16 + mem_fraction_static = 0.75 + + [Tests.extra_env_vars] + CUDA_VISIBLE_DEVICES = "0,1,2,3" diff --git a/conf/experimental/sglang/test_scenario/sglang.toml b/conf/experimental/sglang/test_scenario/sglang.toml index b6f96f4e2..6213574c3 100644 --- a/conf/experimental/sglang/test_scenario/sglang.toml +++ b/conf/experimental/sglang/test_scenario/sglang.toml @@ -23,8 +23,12 @@ num_nodes = 2 time_limit = "00:10:00" [Tests.cmd_args.decode] + tensor_parallel_size = 8 mem_fraction_static = 0.75 + [Tests.extra_env_vars] + CUDA_VISIBLE_DEVICES = "0,1,2,3" + [[Tests]] id = "sglang.agg.1node" test_name = "sglang" @@ -32,8 +36,12 @@ num_nodes = 1 time_limit = "00:10:00" [Tests.cmd_args.decode] + tensor_parallel_size = 4 mem_fraction_static = 0.75 + [Tests.extra_env_vars] + CUDA_VISIBLE_DEVICES = "0,1,2,3" + [[Tests]] id = "sglang.disagg.sync" test_name = "sglang" @@ -50,6 +58,9 @@ time_limit = "00:10:00" tensor_parallel_size = 2 mem_fraction_static = 0.75 + [Tests.extra_env_vars] + CUDA_VISIBLE_DEVICES = "0,1,2,3" + [[Tests]] id = "sglang.disagg.async" test_name = "sglang" @@ -66,6 +77,9 @@ time_limit = "00:10:00" tensor_parallel_size = 2 mem_fraction_static = 0.75 + [Tests.extra_env_vars] + CUDA_VISIBLE_DEVICES = "0,1,2,3" + [[Tests]] id = "sglang.disagg.2nodes" test_name = "sglang" @@ -79,3 +93,6 @@ time_limit = "00:10:00" [Tests.cmd_args.decode] tensor_parallel_size = 4 mem_fraction_static = 0.75 + + [Tests.extra_env_vars] + CUDA_VISIBLE_DEVICES = "0,1,2,3" diff --git a/conf/experimental/vllm/test/vllm.toml b/conf/experimental/vllm/test/vllm.toml index a8061099c..8e6581653 100644 --- a/conf/experimental/vllm/test/vllm.toml +++ b/conf/experimental/vllm/test/vllm.toml @@ -25,6 +25,7 @@ mount_as = "/vllm_repo" [cmd_args] docker_image_url = "nvcr.io#nvidia/ai-dynamo/vllm-runtime:1.1.1" +serve_healthcheck = "/health" [semantic_eval_cmd_args] entrypoint = "python3 /vllm_repo/tests/evals/gsm8k/gsm8k_eval.py" diff --git a/conf/experimental/vllm/test_scenario/vllm-heavy.toml b/conf/experimental/vllm/test_scenario/vllm-heavy.toml new file mode 100644 index 000000000..0f1b1f92a --- /dev/null +++ b/conf/experimental/vllm/test_scenario/vllm-heavy.toml @@ -0,0 +1,57 @@ +# SPDX-FileCopyrightText: NVIDIA CORPORATION & AFFILIATES +# Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +name = "vllm-heavy-multinode" + +[[Tests]] +id = "vllm.heavy.disagg.8nodes.4p4d" +test_name = "vllm" +num_nodes = 8 +time_limit = "01:30:00" + + [Tests.cmd_args] + model = "Qwen/Qwen3-8B" + + [Tests.cmd_args.prefill] + num_nodes = 4 + gpu_ids = "0,1,2,3" + enforce_eager = "" + tensor_parallel_size = 16 + max_num_batched_tokens = 8192 + max_model_len = 8192 + + [Tests.cmd_args.prefill.ray_head] + num_gpus = 4 + + [Tests.cmd_args.prefill.ray_worker] + num_gpus = 4 + + [Tests.cmd_args.decode] + num_nodes = 4 + gpu_ids = "0,1,2,3" + enforce_eager = "" + tensor_parallel_size = 16 + max_num_batched_tokens = 8192 + max_model_len = 8192 + + [Tests.cmd_args.decode.ray_head] + num_gpus = 4 + + [Tests.cmd_args.decode.ray_worker] + num_gpus = 4 + + [Tests.extra_env_vars] + CUDA_VISIBLE_DEVICES = "0,1,2,3" diff --git a/conf/experimental/vllm/test_scenario/vllm.toml b/conf/experimental/vllm/test_scenario/vllm.toml index 8e1207221..828c9a815 100644 --- a/conf/experimental/vllm/test_scenario/vllm.toml +++ b/conf/experimental/vllm/test_scenario/vllm.toml @@ -16,6 +16,34 @@ name = "vllm" +[[Tests]] +id = "vllm.agg.1node" +test_name = "vllm" +num_nodes = 1 +time_limit = "00:10:00" + + [Tests.cmd_args.decode] + enforce_eager = "" + tensor_parallel_size = 4 + max_num_batched_tokens = 1024 + + [Tests.extra_env_vars] + CUDA_VISIBLE_DEVICES = "0,1,2,3" + +[[Tests]] +id = "vllm.agg.2nodes" +test_name = "vllm" +num_nodes = 2 +time_limit = "00:30:00" + + [Tests.cmd_args.decode] + enforce_eager = "" + tensor_parallel_size = 8 + max_num_batched_tokens = 1024 + + [Tests.extra_env_vars] + CUDA_VISIBLE_DEVICES = "0,1,2,3" + [[Tests]] id = "vllm.disagg.sync" test_name = "vllm" @@ -24,14 +52,17 @@ time_limit = "00:30:00" [Tests.cmd_args.prefill] enforce_eager = "" - tensor_parallel_size = 2 + tensor_parallel_size = 4 max_num_batched_tokens = 1024 [Tests.cmd_args.decode] enforce_eager = "" - tensor_parallel_size = 2 + tensor_parallel_size = 4 max_num_batched_tokens = 1024 + [Tests.extra_env_vars] + CUDA_VISIBLE_DEVICES = "0,1,2,3" + [[Tests]] id = "vllm.disagg.async" test_name = "vllm" @@ -41,7 +72,7 @@ time_limit = "00:10:00" [Tests.cmd_args.prefill] gpu_ids = "0,1" enforce_eager = "" - tensor_parallel_size = 1 + tensor_parallel_size = 2 max_num_batched_tokens = 1024 [Tests.cmd_args.decode] @@ -49,3 +80,6 @@ time_limit = "00:10:00" enforce_eager = "" tensor_parallel_size = 2 max_num_batched_tokens = 1024 + + [Tests.extra_env_vars] + CUDA_VISIBLE_DEVICES = "0,1,2,3" diff --git a/doc/workloads/sglang.rst b/doc/workloads/sglang.rst index cdbd5cff1..9250e65b0 100644 --- a/doc/workloads/sglang.rst +++ b/doc/workloads/sglang.rst @@ -96,12 +96,26 @@ The ``cli`` string supports ``{model}``, ``{host}``, ``{port}``, ``{url}``, ``{o placeholders. +Readiness health checks +----------------------- +Healthcheck fields: + +- ``healthcheck``: aggregated server and disaggregated router endpoint, default ``/v1/models``. +- ``serve_healthcheck``: optional override for serve, prefill, and decode servers. + +If ``serve_healthcheck`` is omitted, disaggregated prefill/decode servers keep the legacy ``/health`` endpoint. + + Control number of GPUs ---------------------- -The number of GPUs can be controlled using the options below, listed from lowest to highest priority: +GPU selection priority, from lowest to highest: + 1. ``gpus_per_node`` system property (scalar value) -2. ``CUDA_VISIBLE_DEVICES`` environment variable (comma-separated list of GPU IDs) -3. ``gpu_ids`` command argument for ``prefill`` and ``decode`` configurations (comma-separated list of GPU IDs). If disaggregated mode is used (``prefill`` is set), both ``prefill`` and ``decode`` should define ``gpu_ids``, or none of them should set it. +2. ``decode.gpu_ids`` command argument in non-disaggregated mode when ``CUDA_VISIBLE_DEVICES`` is not set +3. ``CUDA_VISIBLE_DEVICES`` environment variable (comma-separated list of GPU IDs) +4. ``gpu_ids`` command argument for both ``prefill`` and ``decode`` configurations in disaggregated mode + +In disaggregated mode, define both ``prefill.gpu_ids`` and ``decode.gpu_ids``, or omit both. Control disaggregation @@ -135,6 +149,39 @@ For more control, one can specify the GPU IDs explicitly in ``prefill`` and ``de In this case ``CUDA_VISIBLE_DEVICES`` will be ignored and only the GPUs specified in ``gpu_ids`` will be used. +Multi-node serving +------------------ +For non-disaggregated ``num_nodes > 1``, CloudAI starts one ``sglang.launch_server`` task per serving node with shared +``--dist-init-addr``, ``--nnodes``, and ``--node-rank "$SLURM_PROCID"``. + +For disaggregated serving over more than two nodes, set explicit role sizes: + +- ``prefill.num_nodes + decode.num_nodes`` must equal the test ``num_nodes``. +- CloudAI assigns contiguous node slices: prefill first, decode second. +- ``tp`` is total per role, not per node. +- ``CUDA_VISIBLE_DEVICES`` and ``gpu_ids`` are local GPU IDs on each serving node. + +Example: four prefill nodes and four decode nodes, each with four visible GPUs: + +.. code-block:: toml + :caption: scenario.toml (multi-node disaggregated serving) + + [[Tests]] + id = "sglang.pd_multi_node" + num_nodes = 8 + test_template_name = "sglang" + + [Tests.cmd_args.prefill] + num_nodes = 4 + tp = 16 + + [Tests.cmd_args.decode] + num_nodes = 4 + tp = 16 + + [Tests.extra_env_vars] + CUDA_VISIBLE_DEVICES = "0,1,2,3" + API Documentation ----------------- diff --git a/doc/workloads/vllm.rst b/doc/workloads/vllm.rst index 57773992f..a57486773 100644 --- a/doc/workloads/vllm.rst +++ b/doc/workloads/vllm.rst @@ -93,11 +93,14 @@ placeholders. Controlling the Number of GPUs ------------------------------- -The number of GPUs can be controlled using the options below, listed from lowest to highest priority: +GPU selection priority, from lowest to highest: + 1. ``gpus_per_node`` system property (scalar value) -2. ``CUDA_VISIBLE_DEVICES`` environment variable (comma-separated list of GPU IDs) -3. ``gpu_ids`` command argument for ``prefill`` and ``decode`` configurations (comma-separated list of GPU IDs). If disaggregated mode is used (``prefill`` is set), both ``prefill`` and ``decode`` should define ``gpu_ids``, or none of them should set it. +2. ``decode.gpu_ids`` command argument in non-disaggregated mode when ``CUDA_VISIBLE_DEVICES`` is not set +3. ``CUDA_VISIBLE_DEVICES`` environment variable (comma-separated list of GPU IDs) +4. ``gpu_ids`` command argument for both ``prefill`` and ``decode`` configurations in disaggregated mode +In disaggregated mode, define both ``prefill.gpu_ids`` and ``decode.gpu_ids``, or omit both. Controlling Disaggregation -------------------------- @@ -133,6 +136,53 @@ For more control, users can specify the GPU IDs explicitly in ``prefill`` and `` In this case ``CUDA_VISIBLE_DEVICES`` will be ignored and only the GPUs specified in ``gpu_ids`` will be used. +Multi-node serving +------------------ +For non-disaggregated ``num_nodes > 1``, CloudAI creates one Ray cluster and starts ``vllm serve`` on the head node with +``--distributed-executor-backend ray``. + +For disaggregated serving over more than two nodes, set explicit role sizes: + +- ``prefill.num_nodes + decode.num_nodes`` must equal the test ``num_nodes``. +- CloudAI assigns contiguous node slices: prefill first, decode second. +- ``tensor_parallel_size`` is total per role, not per node. +- ``CUDA_VISIBLE_DEVICES`` and ``gpu_ids`` are local GPU IDs on each serving node. + +Example: four prefill nodes and four decode nodes, each with four visible GPUs: + +.. code-block:: toml + :caption: scenario.toml (multi-node disaggregated serving) + + [[Tests]] + id = "vllm.pd_multi_node" + num_nodes = 8 + test_template_name = "vllm" + + [Tests.cmd_args.prefill] + num_nodes = 4 + tensor_parallel_size = 16 + + [Tests.cmd_args.decode] + num_nodes = 4 + tensor_parallel_size = 16 + + [Tests.extra_env_vars] + CUDA_VISIBLE_DEVICES = "0,1,2,3" + + +Readiness health checks +----------------------- +Healthcheck fields: + +- ``healthcheck``: aggregated server endpoint, default ``/healthcheck``. +- ``serve_healthcheck``: optional override for serve, prefill, and decode servers. +- ``proxy_healthcheck``: disaggregated proxy/router endpoint, default ``/healthcheck``. + +If ``serve_healthcheck`` is omitted, disaggregated prefill/decode servers keep the legacy ``/health`` endpoint. If a +disaggregated config sets ``healthcheck`` but omits ``proxy_healthcheck``, the proxy/router uses ``healthcheck`` for +backward compatibility. + + Controlling ``proxy_script`` ----------------------------- ``proxy_script`` is used to proxy the requests from the client to the prefill and decode instances. It is ignored for non-disaggregated mode. Default value can be found below. diff --git a/src/cloudai/workloads/common/llm_serving.py b/src/cloudai/workloads/common/llm_serving.py index 30a6943c1..7abe4430b 100644 --- a/src/cloudai/workloads/common/llm_serving.py +++ b/src/cloudai/workloads/common/llm_serving.py @@ -60,8 +60,14 @@ def parse_gpu_ids(gpu_ids: str | list[str] | None) -> list[int]: def all_gpu_ids(tdef: LLMServingTestDefinition[LLMServingCmdArgsT], system_gpus_per_node: int | None) -> list[int]: cuda_devices = str(tdef.extra_env_vars.get("CUDA_VISIBLE_DEVICES", "")) - if (tdef.cmd_args.prefill and tdef.cmd_args.prefill.gpu_ids) and tdef.cmd_args.decode.gpu_ids: - return parse_gpu_ids(tdef.cmd_args.prefill.gpu_ids) + parse_gpu_ids(tdef.cmd_args.decode.gpu_ids) + if tdef.cmd_args.prefill: + if tdef.cmd_args.prefill.gpu_ids and tdef.cmd_args.decode.gpu_ids: + return parse_gpu_ids(tdef.cmd_args.prefill.gpu_ids) + parse_gpu_ids(tdef.cmd_args.decode.gpu_ids) + else: + if cuda_devices: + return parse_gpu_ids(cuda_devices) + if tdef.cmd_args.decode.gpu_ids: + return parse_gpu_ids(tdef.cmd_args.decode.gpu_ids) if cuda_devices: return parse_gpu_ids(cuda_devices) return list(range(system_gpus_per_node or 1)) @@ -78,7 +84,7 @@ def calculate_prefill_gpu_ids( return parse_gpu_ids(tdef.cmd_args.prefill.gpu_ids) gpu_ids = all_gpu_ids(tdef, system_gpus_per_node) - if num_nodes == 2: + if num_nodes > 1 or tdef.cmd_args.prefill.num_nodes is not None: return gpu_ids mid = len(gpu_ids) // 2 return gpu_ids[:mid] @@ -95,7 +101,7 @@ def calculate_decode_gpu_ids( gpu_ids = all_gpu_ids(tdef, system_gpus_per_node) if not tdef.cmd_args.prefill: return gpu_ids - if num_nodes == 2: + if num_nodes > 1 or tdef.cmd_args.decode.num_nodes is not None: return gpu_ids mid = len(gpu_ids) // 2 return gpu_ids[mid:] @@ -107,11 +113,15 @@ class LLMServingArgs(CmdArgs): gpu_ids: str | list[str] | None = Field( default=None, description="Comma-separated GPU IDs. If not set, all available GPUs will be used." ) + num_nodes: int | list[int] | None = Field( + default=None, + description="Number of Slurm nodes assigned to this role in disaggregated serving mode.", + ) @property def serve_args_exclude(self) -> set[str]: """Fields consumed internally and excluded from generic serve args.""" - return {"gpu_ids"} + return {"gpu_ids", "num_nodes"} def serialize_serve_arg(self, key: str, value: Any) -> list[str]: """Serialize a single serve argument to CLI tokens.""" @@ -139,7 +149,11 @@ class LLMServingCmdArgs(CmdArgs, Generic[LLMServingArgsT]): default=None, description="Hostname used by the benchmark client. Defaults to the allocated node hostname.", ) - healthcheck: str = Field(default="") + healthcheck: str = Field(default="/health") + serve_healthcheck: str | None = Field( + default=None, + description="Readiness endpoint for serve, prefill, and decode server processes. Defaults to healthcheck.", + ) serve_wait_seconds: int = 300 prefill: LLMServingArgsT | None = Field(default=None) decode: LLMServingArgsT @@ -369,15 +383,52 @@ def gpu_ids(self) -> list[int]: def is_disaggregated(self) -> bool: return self.tdef.cmd_args.prefill is not None + @staticmethod + def _role_num_nodes(value: int | list[int] | None, role: str) -> int | None: + if isinstance(value, list): + raise ValueError(f"{role}.num_nodes must be a single integer for command generation.") + return value + @property - def is_two_node_disaggregated(self) -> bool: - if not self.is_disaggregated: - return False + def aggregated_node_count(self) -> int: + num_nodes, _ = self.get_cached_nodes_spec() + return num_nodes + + def disaggregated_role_node_counts(self) -> tuple[int, int]: + if not self.is_disaggregated or self.tdef.cmd_args.prefill is None: + return (0, 0) num_nodes, _ = self.get_cached_nodes_spec() - if num_nodes not in (1, 2): - raise ValueError(f"Disaggregated {self.workload_name} supports only 1 or 2 nodes, got {num_nodes}.") - return num_nodes == 2 + prefill_nodes = self._role_num_nodes(self.tdef.cmd_args.prefill.num_nodes, "prefill") + decode_nodes = self._role_num_nodes(self.tdef.cmd_args.decode.num_nodes, "decode") + + if prefill_nodes is None and decode_nodes is None: + if num_nodes in (1, 2): + return (1, 1) + raise ValueError( + f"Disaggregated {self.workload_name} over more than 2 nodes requires both " + "prefill.num_nodes and decode.num_nodes." + ) + if prefill_nodes is None or decode_nodes is None: + raise ValueError("Both prefill.num_nodes and decode.num_nodes must be set or both must be omitted.") + if prefill_nodes <= 0 or decode_nodes <= 0: + raise ValueError("prefill.num_nodes and decode.num_nodes must be positive integers.") + if prefill_nodes + decode_nodes != num_nodes: + raise ValueError( + f"prefill.num_nodes + decode.num_nodes must equal allocated nodes ({num_nodes}), " + f"got {prefill_nodes + decode_nodes}." + ) + return (prefill_nodes, decode_nodes) + + def role_node_count(self, role: str) -> int: + if role == "serve": + return self.aggregated_node_count + prefill_nodes, decode_nodes = self.disaggregated_role_node_counts() + if role == "prefill": + return prefill_nodes + if role == "decode": + return decode_nodes + raise ValueError(f"Unknown serving role: {role}") @property def prefill_gpu_ids(self) -> list[int]: @@ -387,13 +438,22 @@ def prefill_gpu_ids(self) -> list[int]: def decode_gpu_ids(self) -> list[int]: return calculate_decode_gpu_ids(self.tdef, self.test_run.nnodes, self.system.gpus_per_node) - def _disagg_srun_prefix(self, relative: int | None = None) -> str: - srun_command_parts = self.gen_srun_prefix(with_num_nodes=(relative is None)) - srun_command_parts.extend(["--overlap", "--ntasks-per-node=1", "--ntasks=1"]) - if relative is not None: - srun_command_parts.extend([f"--relative={relative}", "-N1"]) + def _role_srun_prefix(self, nodelist_expr: str, node_count: int = 1, task_count: int = 1) -> str: + srun_command_parts = self.gen_srun_prefix(with_num_nodes=False) + srun_command_parts.extend( + [ + "--overlap", + f'--nodelist="{nodelist_expr}"', + f"--nodes={node_count}", + f"--ntasks={task_count}", + "--ntasks-per-node=1", + ] + ) return " ".join(srun_command_parts) + def _single_role_srun_prefix(self, node_var: str) -> str: + return self._role_srun_prefix(f"${{{node_var}}}") + @staticmethod def _with_env(command: list[str], env_vars: dict[str, str]) -> str: if not env_vars: @@ -438,27 +498,49 @@ def bench_host(self) -> str: return "${PREFILL_NODE}" return "${NODE}" - def generate_disaggregated_node_setup(self) -> str: - if not self.is_disaggregated: + def generate_aggregated_node_setup(self, node_count: int) -> str: + if node_count <= 1: return "" - decode_node_check = "" - if self.is_two_node_disaggregated: - decode_node_check = f"""\ -if [ -z "${{NODES[1]}}" ]; then - echo "Expected 2 allocated nodes for disaggregated {self.workload_name}, got: ${{NODES[*]}}" + return f"""\ +NODES=( $(scontrol show hostname $SLURM_JOB_NODELIST) ) +SERVE_NODES=( "${{NODES[@]:0:{node_count}}}" ) +if [ "${{#SERVE_NODES[@]}}" -ne {node_count} ]; then + echo "Expected {node_count} allocated nodes for {self.workload_name}, got: ${{NODES[*]}}" exit 1 fi +export SERVE_NODE=${{SERVE_NODES[0]}} +export NODE=$SERVE_NODE +SERVE_NODELIST=$(IFS=,; echo "${{SERVE_NODES[*]}}") +echo "Node roles: serve=${{SERVE_NODES[*]}}" + """ + + def generate_disaggregated_node_setup(self) -> str: + if not self.is_disaggregated: + return "" + allocated_nodes, _ = self.get_cached_nodes_spec() + prefill_nodes, decode_nodes = self.disaggregated_role_node_counts() + decode_start = 0 if allocated_nodes == 1 and prefill_nodes == 1 and decode_nodes == 1 else prefill_nodes + role_error = ( + f"Expected {prefill_nodes} prefill and {decode_nodes} decode nodes for disaggregated {self.workload_name}" + ) return f"""\ NODES=( $(scontrol show hostname $SLURM_JOB_NODELIST) ) -export PREFILL_NODE=${{NODES[0]}} -export DECODE_NODE=${{NODES[1]:-${{PREFILL_NODE}}}} -if [ -z "$PREFILL_NODE" ]; then +PREFILL_NODES=( "${{NODES[@]:0:{prefill_nodes}}}" ) +DECODE_NODES=( "${{NODES[@]:{decode_start}:{decode_nodes}}}" ) +if [ "${{#PREFILL_NODES[@]}}" -ne {prefill_nodes} ] || [ "${{#DECODE_NODES[@]}}" -ne {decode_nodes} ]; then + echo "{role_error}, got: ${{NODES[*]}}" + exit 1 +fi +export PREFILL_NODE=${{PREFILL_NODES[0]}} +export DECODE_NODE=${{DECODE_NODES[0]}} +PREFILL_NODELIST=$(IFS=,; echo "${{PREFILL_NODES[*]}}") +DECODE_NODELIST=$(IFS=,; echo "${{DECODE_NODES[*]}}") +if [ -z "$PREFILL_NODE" ] || [ -z "$DECODE_NODE" ]; then echo "Failed to resolve allocated nodes for disaggregated {self.workload_name}" exit 1 fi -{decode_node_check}\ -echo "Node roles: prefill=$PREFILL_NODE decode=$DECODE_NODE" +echo "Node roles: prefill=${{PREFILL_NODES[*]}} decode=${{DECODE_NODES[*]}}" """ @@ -483,8 +565,7 @@ def generate_wait_for_health_function(self) -> str: return 1 }}""" - @staticmethod - def generate_cleanup_function(pid_vars: list[str], timeout: int = 15) -> str: + def generate_cleanup_function(self, pid_vars: list[str], timeout: int = 15) -> str: if len(pid_vars) == 1: pid_var = pid_vars[0] return f"""\ @@ -569,6 +650,15 @@ def proxy_router_log_file(self) -> str: """Helper process log file name.""" return f"{self.workload_slug}-{self.proxy_router_name}.log" + @property + def proxy_router_healthcheck(self) -> str: + """Healthcheck endpoint for the helper/proxy process in disaggregated mode.""" + return self.tdef.cmd_args.healthcheck + + def serve_healthcheck(self, role: str) -> str: + """Healthcheck endpoint for serve, prefill, and decode server processes.""" + return self.tdef.cmd_args.serve_healthcheck or self.tdef.cmd_args.healthcheck + @property def bench_log_file(self) -> str: """Benchmark log file name.""" @@ -597,6 +687,15 @@ def serve_port(self) -> int: def disaggregated_script_preamble(self) -> str: return "" + def aggregated_script_preamble(self) -> str: + return "" + + def aggregated_cleanup_pid_vars(self) -> list[str]: + return [self.serve_pid_var] + + def disaggregated_cleanup_pid_vars(self) -> list[str]: + return ["PREFILL_PID", "DECODE_PID", self.proxy_router_pid_var] + def aggregated_serve_env(self) -> dict[str, str]: return {} @@ -619,6 +718,23 @@ def get_semantic_eval_command(self) -> list[str] | None: """Return the optional semantic validation command.""" return None + def render_serve_launch( + self, + role: str, + command_tail: str, + pid_var: str, + log_file: str, + node_count: int, + head_node_var: str, + nodelist_var: str, + ) -> str: + del role, node_count, nodelist_var + return f"""\ +{self._single_role_srun_prefix(head_node_var)} \\ + --output={self.test_run.output_path.absolute()}/{log_file} \\ + {self._with_custom_bash(command_tail)} & +{pid_var}=$!""" + def _expand_semantic_eval_args(self, args: str, *, host: str) -> str: replacements = { "{model}": self.tdef.cmd_args.model, @@ -658,74 +774,117 @@ def _gen_llm_serving_srun_command(self, serve_commands: list[list[str]]) -> str: return self._gen_disaggregated_script(serve_commands, bench_cmd) def _gen_aggregated_script(self, serve_cmd: list[str], bench_cmd: str) -> str: - srun_prefix = " ".join(self.gen_srun_prefix()) + serve_node_count = self.role_node_count("serve") + legacy_single_node = serve_node_count == 1 + srun_prefix = ( + " ".join(self.gen_srun_prefix()) if legacy_single_node else self._single_role_srun_prefix("SERVE_NODE") + ) + host_setup = ( + "" if not legacy_single_node else "NODE=$(scontrol show hostname $SLURM_JOB_NODELIST | head -n 1)\n" + ) serve_cmd_with_env = self._with_env(serve_cmd, self.aggregated_serve_env()) health_func = self.generate_wait_for_health_function() wait_block = self.generate_wait_for_health_block( - self.workload_name, [f"http://${{NODE}}:{self.serve_port}{self.tdef.cmd_args.healthcheck}"] + self.workload_name, + [f"http://${{NODE}}:{self.serve_port}{self.serve_healthcheck('serve')}"], + host_setup=host_setup, ) + node_setup = self.generate_aggregated_node_setup(serve_node_count) + preamble = self.aggregated_script_preamble() + if legacy_single_node: + serve_launch = f"""\ +{srun_prefix} --overlap --ntasks-per-node=1 --ntasks=1 \\ + --output={(self.test_run.output_path / self.serve_log_file).absolute()} \\ + {self._with_custom_bash(serve_cmd_with_env)} & +{self.serve_pid_var}=$!""" + else: + serve_launch = self.render_serve_launch( + "serve", + serve_cmd_with_env, + self.serve_pid_var, + self.serve_log_file, + serve_node_count, + "SERVE_NODE", + "SERVE_NODELIST", + ) + semantic_prefix = ( + f"{srun_prefix} --overlap --ntasks-per-node=1 --ntasks=1" + if legacy_single_node + else self._single_role_srun_prefix("SERVE_NODE") + ) + bench_prefix = semantic_prefix return f"""\ -{self.generate_cleanup_function([self.serve_pid_var])} +{self.generate_cleanup_function(self.aggregated_cleanup_pid_vars())} {health_func} +{preamble}{node_setup}\ echo "Starting {self.workload_name} instances..." -{srun_prefix} --overlap --ntasks-per-node=1 --ntasks=1 \\ - --output={(self.test_run.output_path / self.serve_log_file).absolute()} \\ - {self._with_custom_bash(serve_cmd_with_env)} & -{self.serve_pid_var}=$! +{serve_launch} {wait_block} echo "Running benchmark..." -{srun_prefix} --overlap --ntasks-per-node=1 --ntasks=1 \\ +{bench_prefix} \\ --output={(self.test_run.output_path / self.bench_log_file).absolute()} \\ {self._with_custom_bash(bench_cmd)} -{self._gen_semantic_eval_block(f"{srun_prefix} --overlap --ntasks-per-node=1 --ntasks=1")}""".strip() +{self._gen_semantic_eval_block(semantic_prefix)}""".strip() def _gen_disaggregated_script(self, serve_commands: list[list[str]], bench_cmd: str) -> str: prefill_cmd, decode_cmd = serve_commands health_func = self.generate_wait_for_health_function() prefill_cmd_with_env = self._with_env(prefill_cmd, self.disaggregated_role_env("prefill", self.prefill_gpu_ids)) decode_cmd_with_env = self._with_env(decode_cmd, self.disaggregated_role_env("decode", self.decode_gpu_ids)) - prefill_srun_prefix = self._disagg_srun_prefix(0 if self.is_two_node_disaggregated else None) - decode_srun_prefix = self._disagg_srun_prefix(1 if self.is_two_node_disaggregated else None) + prefill_nodes, decode_nodes = self.disaggregated_role_node_counts() + prefill_srun_prefix = self._single_role_srun_prefix("PREFILL_NODE") helper_cmd = self.get_helper_command() node_setup = self.generate_disaggregated_node_setup() wait_block = self.generate_wait_for_health_block( self.workload_name, [ - f"http://{self.disaggregated_role_host('prefill')}:{self.prefill_port}/health", - f"http://{self.disaggregated_role_host('decode')}:{self.decode_port}/health", + f"http://{self.disaggregated_role_host('prefill')}:{self.prefill_port}{self.serve_healthcheck('prefill')}", + f"http://{self.disaggregated_role_host('decode')}:{self.decode_port}{self.serve_healthcheck('decode')}", ], host_setup="", host_display="$PREFILL_NODE and $DECODE_NODE", ) wait_block_helper = self.generate_wait_for_health_block( self.workload_name, - [f"http://{self.disaggregated_role_host('prefill')}:{self.serve_port}{self.tdef.cmd_args.healthcheck}"], + [f"http://{self.disaggregated_role_host('prefill')}:{self.serve_port}{self.proxy_router_healthcheck}"], host_setup="", host_display="$PREFILL_NODE server", ) preamble = self.disaggregated_script_preamble() + prefill_launch = self.render_serve_launch( + "prefill", + prefill_cmd_with_env, + "PREFILL_PID", + self.prefill_log_file, + prefill_nodes, + "PREFILL_NODE", + "PREFILL_NODELIST", + ) + decode_launch = self.render_serve_launch( + "decode", + decode_cmd_with_env, + "DECODE_PID", + self.decode_log_file, + decode_nodes, + "DECODE_NODE", + "DECODE_NODELIST", + ) return f"""\ -{self.generate_cleanup_function(["PREFILL_PID", "DECODE_PID", self.proxy_router_pid_var])} +{self.generate_cleanup_function(self.disaggregated_cleanup_pid_vars())} {health_func} {preamble}{node_setup}\ echo "Starting {self.workload_name} instances..." -{prefill_srun_prefix} \\ - --output={self.test_run.output_path.absolute()}/{self.prefill_log_file} \\ - {self._with_custom_bash(prefill_cmd_with_env)} & -PREFILL_PID=$! - -{decode_srun_prefix} \\ - --output={self.test_run.output_path.absolute()}/{self.decode_log_file} \\ - {self._with_custom_bash(decode_cmd_with_env)} & -DECODE_PID=$! +{prefill_launch} + +{decode_launch} {wait_block} diff --git a/src/cloudai/workloads/sglang/slurm_command_gen_strategy.py b/src/cloudai/workloads/sglang/slurm_command_gen_strategy.py index 7a7a97d5b..f1e576ac5 100644 --- a/src/cloudai/workloads/sglang/slurm_command_gen_strategy.py +++ b/src/cloudai/workloads/sglang/slurm_command_gen_strategy.py @@ -14,6 +14,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +import shlex from typing import cast from cloudai.workloads.common.llm_serving import LLMServingSlurmCommandGenStrategy @@ -133,5 +134,64 @@ def get_semantic_eval_command(self) -> list[str] | None: cli = self._expand_semantic_eval_args(eval_args.cli, host=host) return [eval_args.entrypoint, cli] if cli else [eval_args.entrypoint] + def serve_healthcheck(self, role: str) -> str: + if self.tdef.cmd_args.serve_healthcheck: + return self.tdef.cmd_args.serve_healthcheck + if role in {"prefill", "decode"}: + return "/health" + return self.tdef.cmd_args.healthcheck + def aggregated_serve_env(self) -> dict[str, str]: return {"CUDA_VISIBLE_DEVICES": ",".join(str(gpu_id) for gpu_id in self.gpu_ids)} + + def _needs_distributed_launch(self, role: str) -> bool: + return self.role_node_count(role) > 1 + + def aggregated_script_preamble(self) -> str: + if not self._needs_distributed_launch("serve"): + return "" + return """\ +export PORT_OFFSET=$((SLURM_JOB_ID % 1000)) +export SERVE_DIST_INIT_PORT=$((20000 + PORT_OFFSET)) + +""" + + def disaggregated_script_preamble(self) -> str: + if not (self._needs_distributed_launch("prefill") or self._needs_distributed_launch("decode")): + return "" + return """\ +export PORT_OFFSET=$((SLURM_JOB_ID % 1000)) +export PREFILL_DIST_INIT_PORT=$((20000 + PORT_OFFSET)) +export DECODE_DIST_INIT_PORT=$((21000 + PORT_OFFSET)) + +""" + + def render_serve_launch( + self, + role: str, + command_tail: str, + pid_var: str, + log_file: str, + node_count: int, + head_node_var: str, + nodelist_var: str, + ) -> str: + if node_count <= 1: + return super().render_serve_launch( + role, command_tail, pid_var, log_file, node_count, head_node_var, nodelist_var + ) + + role_prefix = role.upper() + dist_port_var = f"{role_prefix}_DIST_INIT_PORT" + custom_bash = self._custom_bash_for_command(command_tail) + custom_prefix = f"{custom_bash}; " if custom_bash else "" + dist_command = ( + f'{command_tail} --dist-init-addr "${{{head_node_var}}}:${{{dist_port_var}}}" ' + f'--nnodes {node_count} --node-rank "$SLURM_PROCID"' + ) + task_command = "bash -c " + shlex.quote(f"{custom_prefix}exec {dist_command}") + return f"""\ +{self._role_srun_prefix(f"${{{nodelist_var}}}", node_count, node_count)} \\ + --output={self.test_run.output_path.absolute()}/{log_file}-%N \\ + {task_command} & +{pid_var}=$!""" diff --git a/src/cloudai/workloads/vllm/__init__.py b/src/cloudai/workloads/vllm/__init__.py index bd809ecd0..86216d151 100644 --- a/src/cloudai/workloads/vllm/__init__.py +++ b/src/cloudai/workloads/vllm/__init__.py @@ -24,6 +24,7 @@ VllmArgs, VllmBenchCmdArgs, VllmCmdArgs, + VllmRayStartArgs, VllmSemanticEvalCmdArgs, VllmTestDefinition, ) @@ -40,6 +41,7 @@ "VllmArgs", "VllmBenchCmdArgs", "VllmCmdArgs", + "VllmRayStartArgs", "VllmSemanticEvalCmdArgs", "VllmSlurmCommandGenStrategy", "VllmTestDefinition", diff --git a/src/cloudai/workloads/vllm/slurm_command_gen_strategy.py b/src/cloudai/workloads/vllm/slurm_command_gen_strategy.py index 2f00e95f7..4b8470cd1 100644 --- a/src/cloudai/workloads/vllm/slurm_command_gen_strategy.py +++ b/src/cloudai/workloads/vllm/slurm_command_gen_strategy.py @@ -15,13 +15,16 @@ # limitations under the License. import json +import shlex from typing import Any, cast from cloudai.workloads.common.llm_serving import LLMServingSlurmCommandGenStrategy from .vllm import ( VLLM_BENCH_JSON_FILE, + VllmArgs, VllmCmdArgs, + VllmRayStartArgs, VllmSemanticEvalCmdArgs, VllmTestDefinition, ) @@ -42,42 +45,267 @@ def workload_name(self) -> str: def _to_json_str_arg(config: dict) -> str: return "'" + json.dumps(config, separators=(",", ":")) + "'" + @staticmethod + def _with_ray_backend(command: list[str], enabled: bool) -> list[str]: + if not enabled or "--distributed-executor-backend" in command: + return command + return [*command, "--distributed-executor-backend", "ray"] + + def _needs_ray(self, role: str) -> bool: + return self.role_node_count(role) > 1 + + @staticmethod + def _format_ray_value(value: Any, *, quote_strings: bool = True) -> str: + if isinstance(value, dict): + return shlex.quote(json.dumps(value, separators=(",", ":"))) + if quote_strings and isinstance(value, str): + return shlex.quote(value) + return str(value) + + @classmethod + def _serialize_ray_start_args(cls, args: dict[str, Any], *, quote_strings: bool = True) -> str: + parts: list[str] = [] + for key, value in args.items(): + if value is None: + continue + opt = f"--{key.replace('_', '-')}" + if isinstance(value, bool): + if value: + parts.append(opt) + continue + parts.append(f"{opt}={cls._format_ray_value(value, quote_strings=quote_strings)}") + return " ".join(parts) + + def _role_args(self, role: str) -> VllmArgs: + if role == "prefill": + if self.tdef.cmd_args.prefill is None: + raise ValueError("Prefill role requested for non-disaggregated vLLM.") + return self.tdef.cmd_args.prefill + return self.tdef.cmd_args.decode + + def _ray_start_args(self, role: str, kind: str, generated: dict[str, Any]) -> str: + role_args = self._role_args(role) + ray_args: VllmRayStartArgs | None = role_args.ray_head if kind == "head" else role_args.ray_worker + if ray_args is None: + return self._serialize_ray_start_args(generated, quote_strings=False) + + fields_set = ray_args.model_fields_set + user_args = ray_args.model_dump(exclude_none=True) + generated_args = {key: value for key, value in generated.items() if key not in fields_set} + return " ".join( + part + for part in ( + self._serialize_ray_start_args(generated_args, quote_strings=False), + self._serialize_ray_start_args(user_args), + ) + if part + ) + def get_serve_commands(self) -> list[list[str]]: tdef: VllmTestDefinition = cast(VllmTestDefinition, self.test_run.test) cmd_args: VllmCmdArgs = tdef.cmd_args base_cmd = ["vllm", "serve", cmd_args.model, "--host", self.bind_host] if not tdef.cmd_args.prefill: - return [[*base_cmd, *tdef.cmd_args.decode.serve_args, "--port", str(self.serve_port)]] + return [ + self._with_ray_backend( + [*base_cmd, *tdef.cmd_args.decode.serve_args, "--port", str(self.serve_port)], + self._needs_ray("serve"), + ) + ] commands: list[list[str]] = [] - for port, role, args in [ - (self.prefill_port, "kv_producer", tdef.cmd_args.prefill), - (self.decode_port, "kv_consumer", tdef.cmd_args.decode), + for port, role, kv_role, args in [ + (self.prefill_port, "prefill", "kv_producer", tdef.cmd_args.prefill), + (self.decode_port, "decode", "kv_consumer", tdef.cmd_args.decode), ]: - kv_transfer_config: dict[str, Any] = {"kv_connector": "NixlConnector", "kv_role": role} + kv_transfer_config: dict[str, Any] = {"kv_connector": "NixlConnector", "kv_role": kv_role} if args.nixl_threads is not None: kv_transfer_config["kv_connector_extra_config"] = {"num_threads": cast(int, args.nixl_threads)} commands.append( - [ - *base_cmd, - "--port", - str(port), - "--kv-transfer-config", - self._to_json_str_arg(kv_transfer_config), - *args.serve_args, - ] + self._with_ray_backend( + [ + *base_cmd, + "--port", + str(port), + "--kv-transfer-config", + self._to_json_str_arg(kv_transfer_config), + *args.serve_args, + ], + self._needs_ray(role), + ) ) return commands + def aggregated_script_preamble(self) -> str: + if not self._needs_ray("serve"): + return "" + return """\ +export PORT_OFFSET=$((SLURM_JOB_ID % 1000)) +export SERVE_RAY_PORT=$((6379 + PORT_OFFSET)) +""" + def disaggregated_script_preamble(self) -> str: + ray_preamble = "" + if self._needs_ray("prefill") or self._needs_ray("decode"): + ray_preamble = """\ +export PREFILL_RAY_PORT=$((6379 + PORT_OFFSET)) +export DECODE_RAY_PORT=$((7379 + PORT_OFFSET)) +""" return f"""\ export PORT_OFFSET=$((SLURM_JOB_ID % 1000)) export PREFILL_NIXL_PORT=$((5557 + PORT_OFFSET)) export DECODE_NIXL_PORT=$((5557 + PORT_OFFSET + {len(self.gpu_ids)})) -""" +{ray_preamble}""" + + def aggregated_cleanup_pid_vars(self) -> list[str]: + if not self._needs_ray("serve"): + return super().aggregated_cleanup_pid_vars() + return ["SERVE_RAY_PID", self.serve_pid_var] + + def disaggregated_cleanup_pid_vars(self) -> list[str]: + pid_vars = super().disaggregated_cleanup_pid_vars() + if self._needs_ray("prefill"): + pid_vars.insert(0, "PREFILL_RAY_PID") + if self._needs_ray("decode"): + insert_at = 1 if self._needs_ray("prefill") else 0 + pid_vars.insert(insert_at, "DECODE_RAY_PID") + return pid_vars + + @property + def proxy_router_healthcheck(self) -> str: + fields_set = self.tdef.cmd_args.model_fields_set + if "proxy_healthcheck" not in fields_set and "healthcheck" in fields_set: + return self.tdef.cmd_args.healthcheck + return self.tdef.cmd_args.proxy_healthcheck + + def serve_healthcheck(self, role: str) -> str: + if self.tdef.cmd_args.serve_healthcheck: + return self.tdef.cmd_args.serve_healthcheck + if role in {"prefill", "decode"}: + return "/health" + return self.tdef.cmd_args.healthcheck + + def _ray_stop_cleanup_block(self) -> str: + role_specs: list[tuple[str, str, int]] = [] + if not self.is_disaggregated and self._needs_ray("serve"): + role_specs.append(("SERVE_NODELIST", "serve", self.role_node_count("serve"))) + if self.is_disaggregated: + if self._needs_ray("prefill"): + role_specs.append(("PREFILL_NODELIST", "prefill", self.role_node_count("prefill"))) + if self._needs_ray("decode"): + role_specs.append(("DECODE_NODELIST", "decode", self.role_node_count("decode"))) + + if not role_specs: + return "" + + lines = [' echo "Stopping Ray clusters..."'] + for nodelist_var, role, node_count in role_specs: + stop_prefix = self._role_srun_prefix(f"${{{nodelist_var}}}", node_count, node_count) + stop_command = f"{stop_prefix} bash -lc 'ray stop --force >/dev/null 2>&1 || true' >/dev/null 2>&1 || true" + lines.extend( + [ + f' if [ -n "${{{nodelist_var}:-}}" ]; then', + f" {stop_command}", + " else", + f' echo "Skipping Ray stop for {role}: node list is not set"', + " fi", + ] + ) + return "\n".join(lines) + + def generate_cleanup_function(self, pid_vars: list[str], timeout: int = 15) -> str: + cleanup = super().generate_cleanup_function(pid_vars, timeout) + ray_stop_block = self._ray_stop_cleanup_block() + if not ray_stop_block: + return cleanup + return cleanup.replace("cleanup() {\n", f"cleanup() {{\n{ray_stop_block}\n", 1) + + def render_serve_launch( + self, + role: str, + command_tail: str, + pid_var: str, + log_file: str, + node_count: int, + head_node_var: str, + nodelist_var: str, + ) -> str: + if node_count <= 1: + return super().render_serve_launch( + role, command_tail, pid_var, log_file, node_count, head_node_var, nodelist_var + ) + + role_prefix = role.upper() + ray_pid_var = f"{role_prefix}_RAY_PID" + ray_port_var = f"{role_prefix}_RAY_PORT" + node_array_var = f"{role_prefix}_NODES" + ray_head_log = f"{self.workload_slug}-{role}-ray-head.log" + ray_worker_log = f"{self.workload_slug}-{role}-ray-worker-%N.log" + serve_log = f"{self.test_run.output_path.absolute()}/{log_file}" + head_node_expr = f"${{{head_node_var}}}" + worker_prefix = self._role_srun_prefix("$node") + head_prefix = self._single_role_srun_prefix(head_node_var) + serve_cmd = self._with_custom_bash(f'env RAY_ADDRESS="{head_node_expr}:${{{ray_port_var}}}" {command_tail}') + ray_head_args = self._ray_start_args(role, "head", {"head": True, "port": f'"${{{ray_port_var}}}"'}) + ray_worker_args = self._ray_start_args( + role, + "worker", + {"address": f"{head_node_expr}:${{{ray_port_var}}}", "block": True}, + ) + ray_head_command = shlex.quote( + f"""\ +ray stop --force >/dev/null 2>&1 || true +ray start {ray_head_args} + +active_nodes=0 +for (( i=0; i < {self.tdef.cmd_args.serve_wait_seconds}; i+=5 )); do + active_nodes=$(python3 -c 'import ray; ray.init(); print(sum(node["Alive"] for node in ray.nodes()))') + if [ "$active_nodes" -eq "{node_count}" ]; then + echo "All Ray workers are active: $active_nodes/{node_count}" + ray status || true + exec {serve_cmd} + fi + echo "Waiting for Ray workers: $active_nodes/{node_count} active" + sleep 5 +done + +echo "Waiting for Ray workers timed out: $active_nodes/{node_count} active" +exit 1""" + ) + ray_worker_command = shlex.quote( + f"""\ +ray stop --force >/dev/null 2>&1 || true +for (( i=0; i < {self.tdef.cmd_args.serve_wait_seconds}; i+=5 )); do + if ray start {ray_worker_args}; then + echo "Ray worker connected to {head_node_expr}:${{{ray_port_var}}}" + exit 0 + fi + echo "Waiting until the Ray worker can connect to {head_node_expr}:${{{ray_port_var}}}..." + sleep 5 +done +echo "Ray worker startup timed out for {head_node_expr}:${{{ray_port_var}}}" +exit 1""" + ) + + return f"""\ +( + trap 'kill -TERM $(jobs -pr) 2>/dev/null' TERM EXIT + for node in "${{{node_array_var}[@]:1}}"; do + {worker_prefix} \\ + --output={self.test_run.output_path.absolute()}/{ray_worker_log} \\ + bash -lc {ray_worker_command} & + done + wait +) & +{ray_pid_var}=$! +{head_prefix} \\ + --output={serve_log} \\ + --error={self.test_run.output_path.absolute()}/{ray_head_log} \\ + bash -lc {ray_head_command} & +{pid_var}=$!""" def disaggregated_role_env(self, role: str, gpu_ids: list[int]) -> dict[str, str]: env = super().disaggregated_role_env(role, gpu_ids) diff --git a/src/cloudai/workloads/vllm/vllm.py b/src/cloudai/workloads/vllm/vllm.py index f77039edc..af5b815af 100644 --- a/src/cloudai/workloads/vllm/vllm.py +++ b/src/cloudai/workloads/vllm/vllm.py @@ -48,6 +48,14 @@ class VllmArgs(LLMServingArgs): """Base command arguments for vLLM instances.""" + ray_head: VllmRayStartArgs | None = Field( + default=None, + description="Arguments appended to the Ray head startup command for multi-node vLLM roles.", + ) + ray_worker: VllmRayStartArgs | None = Field( + default=None, + description="Arguments appended to the Ray worker startup command for multi-node vLLM roles.", + ) nixl_threads: int | list[int] | None = Field( default=None, description="Set ``kv_connector_extra_config.num_threads`` for ``--kv-transfer-config`` CLI argument.", @@ -55,7 +63,7 @@ class VllmArgs(LLMServingArgs): @property def serve_args_exclude(self) -> set[str]: - return super().serve_args_exclude | {"nixl_threads"} + return super().serve_args_exclude | {"nixl_threads", "ray_head", "ray_worker"} def serialize_serve_arg(self, key: str, value: object) -> list[str]: opt = f"--{key.replace('_', '-')}" @@ -64,6 +72,17 @@ def serialize_serve_arg(self, key: str, value: object) -> list[str]: return super().serialize_serve_arg(key, value) +class VllmRayStartArgs(CmdArgs): + """Ray startup arguments for vLLM multi-node serving roles.""" + + model_config = ConfigDict(extra="allow") + + head: bool | list[bool] | None = Field(default=None, description="Emit ``--head`` for Ray head startup.") + port: int | str | list[int] | list[str] | None = Field(default=None, description="Ray head port.") + address: str | list[str] | None = Field(default=None, description="Ray head address for worker startup.") + block: bool | list[bool] | None = Field(default=None, description="Emit ``--block`` for Ray worker startup.") + + class VllmCmdArgs(LLMServingCmdArgs[VllmArgs]): """vLLM serve command arguments.""" @@ -71,6 +90,10 @@ class VllmCmdArgs(LLMServingCmdArgs[VllmArgs]): proxy_script: str = "/opt/vllm/tests/v1/kv_connector/nixl_integration/toy_proxy_server.py" healthcheck: str = Field(default="/healthcheck", description="vLLM server healthcheck endpoint.") + proxy_healthcheck: str = Field( + default="/healthcheck", + description="vLLM disaggregated proxy/router healthcheck endpoint.", + ) model: str = "Qwen/Qwen3-0.6B" prefill: VllmArgs | None = Field( @@ -156,22 +179,59 @@ def _validate_vllm_parallelism_constraints(role: str, args: VllmArgs, gpu_count: def constraint_check(self, tr: TestRun, system: Optional[System]) -> bool: system_gpus_per_node = getattr(system, "gpus_per_node", None) if system is not None else None num_nodes = tr.nnodes + local_gpu_count = len(all_gpu_ids(self, system_gpus_per_node)) if self.cmd_args.prefill is None: return self._validate_vllm_parallelism_constraints( role="decode", args=self.cmd_args.decode, - gpu_count=len(all_gpu_ids(self, system_gpus_per_node)), + gpu_count=local_gpu_count * num_nodes, ) + prefill_nodes_value = self.cmd_args.prefill.num_nodes + decode_nodes_value = self.cmd_args.decode.num_nodes + if prefill_nodes_value is None and decode_nodes_value is None: + if num_nodes > 2: + logging.error( + "vLLM disaggregated mode over more than 2 nodes requires both prefill.num_nodes and " + "decode.num_nodes." + ) + return False + prefill_nodes = 1 + decode_nodes = 1 + elif not isinstance(prefill_nodes_value, int) or not isinstance(decode_nodes_value, int): + logging.error("vLLM disaggregated role node counts must both be single integers or both be omitted.") + return False + elif prefill_nodes_value <= 0 or decode_nodes_value <= 0: + logging.error( + "vLLM disaggregated role node counts must be positive integers. prefill=%s decode=%s", + prefill_nodes_value, + decode_nodes_value, + ) + return False + elif num_nodes == 1 and prefill_nodes_value == 1 and decode_nodes_value == 1: + prefill_nodes = 1 + decode_nodes = 1 + elif prefill_nodes_value + decode_nodes_value != num_nodes: + logging.error( + "vLLM disaggregated role node counts must sum to allocated nodes. prefill=%s decode=%s allocated=%s", + prefill_nodes_value, + decode_nodes_value, + num_nodes, + ) + return False + else: + prefill_nodes = prefill_nodes_value + decode_nodes = decode_nodes_value + return self._validate_vllm_parallelism_constraints( role="prefill", args=self.cmd_args.prefill, - gpu_count=len(calculate_prefill_gpu_ids(self, num_nodes, system_gpus_per_node)), + gpu_count=len(calculate_prefill_gpu_ids(self, num_nodes, system_gpus_per_node)) * prefill_nodes, ) and self._validate_vllm_parallelism_constraints( role="decode", args=self.cmd_args.decode, - gpu_count=len(calculate_decode_gpu_ids(self, num_nodes, system_gpus_per_node)), + gpu_count=len(calculate_decode_gpu_ids(self, num_nodes, system_gpus_per_node)) * decode_nodes, ) def was_run_successful(self, tr: TestRun) -> JobStatusResult: diff --git a/tests/ref_data/sglang-disagg-2nodes.sbatch b/tests/ref_data/sglang-disagg-2nodes.sbatch index d7732fc68..8ab9ee091 100644 --- a/tests/ref_data/sglang-disagg-2nodes.sbatch +++ b/tests/ref_data/sglang-disagg-2nodes.sbatch @@ -52,25 +52,29 @@ wait_for_health() { } NODES=( $(scontrol show hostname $SLURM_JOB_NODELIST) ) -export PREFILL_NODE=${NODES[0]} -export DECODE_NODE=${NODES[1]:-${PREFILL_NODE}} -if [ -z "$PREFILL_NODE" ]; then - echo "Failed to resolve allocated nodes for disaggregated SGLang" +PREFILL_NODES=( "${NODES[@]:0:1}" ) +DECODE_NODES=( "${NODES[@]:1:1}" ) +if [ "${#PREFILL_NODES[@]}" -ne 1 ] || [ "${#DECODE_NODES[@]}" -ne 1 ]; then + echo "Expected 1 prefill and 1 decode nodes for disaggregated SGLang, got: ${NODES[*]}" exit 1 fi -if [ -z "${NODES[1]}" ]; then - echo "Expected 2 allocated nodes for disaggregated SGLang, got: ${NODES[*]}" +export PREFILL_NODE=${PREFILL_NODES[0]} +export DECODE_NODE=${DECODE_NODES[0]} +PREFILL_NODELIST=$(IFS=,; echo "${PREFILL_NODES[*]}") +DECODE_NODELIST=$(IFS=,; echo "${DECODE_NODES[*]}") +if [ -z "$PREFILL_NODE" ] || [ -z "$DECODE_NODE" ]; then + echo "Failed to resolve allocated nodes for disaggregated SGLang" exit 1 fi -echo "Node roles: prefill=$PREFILL_NODE decode=$DECODE_NODE" +echo "Node roles: prefill=${PREFILL_NODES[*]} decode=${DECODE_NODES[*]}" echo "Starting SGLang instances..." -srun --export=ALL --mpi=none --container-image=docker.io/lmsysorg/sglang:dev --container-mounts=__OUTPUT_DIR__/output:/cloudai_run_results,__OUTPUT_DIR__/install:/cloudai_install,__OUTPUT_DIR__/output,__INSTALL_DIR__/huggingface:/root/.cache/huggingface --overlap --ntasks-per-node=1 --ntasks=1 --relative=0 -N1 \ +srun --export=ALL --mpi=none --container-image=docker.io/lmsysorg/sglang:dev --container-mounts=__OUTPUT_DIR__/output:/cloudai_run_results,__OUTPUT_DIR__/install:/cloudai_install,__OUTPUT_DIR__/output,__INSTALL_DIR__/huggingface:/root/.cache/huggingface --overlap --nodelist="${PREFILL_NODE}" --nodes=1 --ntasks=1 --ntasks-per-node=1 \ --output=__OUTPUT_DIR__/output/sglang-prefill.log \ env CUDA_VISIBLE_DEVICES="0,1,2,3" python3 -m sglang.launch_server --model-path Qwen/Qwen3-8B --host 0.0.0.0 --port 8400 --disaggregation-mode prefill --disaggregation-transfer-backend nixl & PREFILL_PID=$! -srun --export=ALL --mpi=none --container-image=docker.io/lmsysorg/sglang:dev --container-mounts=__OUTPUT_DIR__/output:/cloudai_run_results,__OUTPUT_DIR__/install:/cloudai_install,__OUTPUT_DIR__/output,__INSTALL_DIR__/huggingface:/root/.cache/huggingface --overlap --ntasks-per-node=1 --ntasks=1 --relative=1 -N1 \ +srun --export=ALL --mpi=none --container-image=docker.io/lmsysorg/sglang:dev --container-mounts=__OUTPUT_DIR__/output:/cloudai_run_results,__OUTPUT_DIR__/install:/cloudai_install,__OUTPUT_DIR__/output,__INSTALL_DIR__/huggingface:/root/.cache/huggingface --overlap --nodelist="${DECODE_NODE}" --nodes=1 --ntasks=1 --ntasks-per-node=1 \ --output=__OUTPUT_DIR__/output/sglang-decode.log \ env CUDA_VISIBLE_DEVICES="0,1,2,3" python3 -m sglang.launch_server --model-path Qwen/Qwen3-8B --host 0.0.0.0 --port 8500 --disaggregation-mode decode --disaggregation-transfer-backend nixl & DECODE_PID=$! @@ -80,7 +84,7 @@ wait_for_health "http://${PREFILL_NODE}:8400/health" || exit 1 wait_for_health "http://${DECODE_NODE}:8500/health" || exit 1 echo "Starting router..." -srun --export=ALL --mpi=none --container-image=docker.io/lmsysorg/sglang:dev --container-mounts=__OUTPUT_DIR__/output:/cloudai_run_results,__OUTPUT_DIR__/install:/cloudai_install,__OUTPUT_DIR__/output,__INSTALL_DIR__/huggingface:/root/.cache/huggingface --overlap --ntasks-per-node=1 --ntasks=1 --relative=0 -N1 \ +srun --export=ALL --mpi=none --container-image=docker.io/lmsysorg/sglang:dev --container-mounts=__OUTPUT_DIR__/output:/cloudai_run_results,__OUTPUT_DIR__/install:/cloudai_install,__OUTPUT_DIR__/output,__INSTALL_DIR__/huggingface:/root/.cache/huggingface --overlap --nodelist="${PREFILL_NODE}" --nodes=1 --ntasks=1 --ntasks-per-node=1 \ --output=__OUTPUT_DIR__/output/sglang-router.log \ python3 -m sglang_router.launch_router --pd-disaggregation --prefill http://${PREFILL_NODE}:8400 --decode http://${DECODE_NODE}:8500 --host 0.0.0.0 --port 8300 & HELPER_PID=$! @@ -89,7 +93,7 @@ echo "Waiting for SGLang on $PREFILL_NODE server to be ready..." wait_for_health "http://${PREFILL_NODE}:8300/v1/models" || exit 1 echo "Running benchmark..." -srun --export=ALL --mpi=none --container-image=docker.io/lmsysorg/sglang:dev --container-mounts=__OUTPUT_DIR__/output:/cloudai_run_results,__OUTPUT_DIR__/install:/cloudai_install,__OUTPUT_DIR__/output,__INSTALL_DIR__/huggingface:/root/.cache/huggingface --overlap --ntasks-per-node=1 --ntasks=1 --relative=0 -N1 \ +srun --export=ALL --mpi=none --container-image=docker.io/lmsysorg/sglang:dev --container-mounts=__OUTPUT_DIR__/output:/cloudai_run_results,__OUTPUT_DIR__/install:/cloudai_install,__OUTPUT_DIR__/output,__INSTALL_DIR__/huggingface:/root/.cache/huggingface --overlap --nodelist="${PREFILL_NODE}" --nodes=1 --ntasks=1 --ntasks-per-node=1 \ --output=__OUTPUT_DIR__/output/sglang-bench.log \ python3 -m sglang.bench_serving --backend sglang --base-url http://${PREFILL_NODE}:8300 --model Qwen/Qwen3-8B --dataset-name random --num-prompts 30 --max-concurrency 16 --random-input 16 --random-output 128 --warmup-requests 2 --random-range-ratio 1.0 --output-file __OUTPUT_DIR__/output/sglang-bench.jsonl --output-details --pd-separated diff --git a/tests/ref_data/sglang-disagg.sbatch b/tests/ref_data/sglang-disagg.sbatch index 048ebe430..1d737f556 100644 --- a/tests/ref_data/sglang-disagg.sbatch +++ b/tests/ref_data/sglang-disagg.sbatch @@ -52,21 +52,29 @@ wait_for_health() { } NODES=( $(scontrol show hostname $SLURM_JOB_NODELIST) ) -export PREFILL_NODE=${NODES[0]} -export DECODE_NODE=${NODES[1]:-${PREFILL_NODE}} -if [ -z "$PREFILL_NODE" ]; then +PREFILL_NODES=( "${NODES[@]:0:1}" ) +DECODE_NODES=( "${NODES[@]:0:1}" ) +if [ "${#PREFILL_NODES[@]}" -ne 1 ] || [ "${#DECODE_NODES[@]}" -ne 1 ]; then + echo "Expected 1 prefill and 1 decode nodes for disaggregated SGLang, got: ${NODES[*]}" + exit 1 +fi +export PREFILL_NODE=${PREFILL_NODES[0]} +export DECODE_NODE=${DECODE_NODES[0]} +PREFILL_NODELIST=$(IFS=,; echo "${PREFILL_NODES[*]}") +DECODE_NODELIST=$(IFS=,; echo "${DECODE_NODES[*]}") +if [ -z "$PREFILL_NODE" ] || [ -z "$DECODE_NODE" ]; then echo "Failed to resolve allocated nodes for disaggregated SGLang" exit 1 fi -echo "Node roles: prefill=$PREFILL_NODE decode=$DECODE_NODE" +echo "Node roles: prefill=${PREFILL_NODES[*]} decode=${DECODE_NODES[*]}" echo "Starting SGLang instances..." -srun --export=ALL --mpi=none -N1 --container-image=docker.io/lmsysorg/sglang:dev --container-mounts=__OUTPUT_DIR__/output:/cloudai_run_results,__OUTPUT_DIR__/install:/cloudai_install,__OUTPUT_DIR__/output,__INSTALL_DIR__/huggingface:/root/.cache/huggingface --overlap --ntasks-per-node=1 --ntasks=1 \ +srun --export=ALL --mpi=none --container-image=docker.io/lmsysorg/sglang:dev --container-mounts=__OUTPUT_DIR__/output:/cloudai_run_results,__OUTPUT_DIR__/install:/cloudai_install,__OUTPUT_DIR__/output,__INSTALL_DIR__/huggingface:/root/.cache/huggingface --overlap --nodelist="${PREFILL_NODE}" --nodes=1 --ntasks=1 --ntasks-per-node=1 \ --output=__OUTPUT_DIR__/output/sglang-prefill.log \ env CUDA_VISIBLE_DEVICES="0,1" python3 -m sglang.launch_server --model-path Qwen/Qwen3-8B --host 0.0.0.0 --port 8400 --disaggregation-mode prefill --disaggregation-transfer-backend nixl & PREFILL_PID=$! -srun --export=ALL --mpi=none -N1 --container-image=docker.io/lmsysorg/sglang:dev --container-mounts=__OUTPUT_DIR__/output:/cloudai_run_results,__OUTPUT_DIR__/install:/cloudai_install,__OUTPUT_DIR__/output,__INSTALL_DIR__/huggingface:/root/.cache/huggingface --overlap --ntasks-per-node=1 --ntasks=1 \ +srun --export=ALL --mpi=none --container-image=docker.io/lmsysorg/sglang:dev --container-mounts=__OUTPUT_DIR__/output:/cloudai_run_results,__OUTPUT_DIR__/install:/cloudai_install,__OUTPUT_DIR__/output,__INSTALL_DIR__/huggingface:/root/.cache/huggingface --overlap --nodelist="${DECODE_NODE}" --nodes=1 --ntasks=1 --ntasks-per-node=1 \ --output=__OUTPUT_DIR__/output/sglang-decode.log \ env CUDA_VISIBLE_DEVICES="2,3" python3 -m sglang.launch_server --model-path Qwen/Qwen3-8B --host 0.0.0.0 --port 8500 --disaggregation-mode decode --disaggregation-transfer-backend nixl & DECODE_PID=$! @@ -76,7 +84,7 @@ wait_for_health "http://${PREFILL_NODE}:8400/health" || exit 1 wait_for_health "http://${DECODE_NODE}:8500/health" || exit 1 echo "Starting router..." -srun --export=ALL --mpi=none -N1 --container-image=docker.io/lmsysorg/sglang:dev --container-mounts=__OUTPUT_DIR__/output:/cloudai_run_results,__OUTPUT_DIR__/install:/cloudai_install,__OUTPUT_DIR__/output,__INSTALL_DIR__/huggingface:/root/.cache/huggingface --overlap --ntasks-per-node=1 --ntasks=1 \ +srun --export=ALL --mpi=none --container-image=docker.io/lmsysorg/sglang:dev --container-mounts=__OUTPUT_DIR__/output:/cloudai_run_results,__OUTPUT_DIR__/install:/cloudai_install,__OUTPUT_DIR__/output,__INSTALL_DIR__/huggingface:/root/.cache/huggingface --overlap --nodelist="${PREFILL_NODE}" --nodes=1 --ntasks=1 --ntasks-per-node=1 \ --output=__OUTPUT_DIR__/output/sglang-router.log \ python3 -m sglang_router.launch_router --pd-disaggregation --prefill http://${PREFILL_NODE}:8400 --decode http://${DECODE_NODE}:8500 --host 0.0.0.0 --port 8300 & HELPER_PID=$! @@ -85,7 +93,7 @@ echo "Waiting for SGLang on $PREFILL_NODE server to be ready..." wait_for_health "http://${PREFILL_NODE}:8300/v1/models" || exit 1 echo "Running benchmark..." -srun --export=ALL --mpi=none -N1 --container-image=docker.io/lmsysorg/sglang:dev --container-mounts=__OUTPUT_DIR__/output:/cloudai_run_results,__OUTPUT_DIR__/install:/cloudai_install,__OUTPUT_DIR__/output,__INSTALL_DIR__/huggingface:/root/.cache/huggingface --overlap --ntasks-per-node=1 --ntasks=1 \ +srun --export=ALL --mpi=none --container-image=docker.io/lmsysorg/sglang:dev --container-mounts=__OUTPUT_DIR__/output:/cloudai_run_results,__OUTPUT_DIR__/install:/cloudai_install,__OUTPUT_DIR__/output,__INSTALL_DIR__/huggingface:/root/.cache/huggingface --overlap --nodelist="${PREFILL_NODE}" --nodes=1 --ntasks=1 --ntasks-per-node=1 \ --output=__OUTPUT_DIR__/output/sglang-bench.log \ python3 -m sglang.bench_serving --backend sglang --base-url http://${PREFILL_NODE}:8300 --model Qwen/Qwen3-8B --dataset-name random --num-prompts 30 --max-concurrency 16 --random-input 16 --random-output 128 --warmup-requests 2 --random-range-ratio 1.0 --output-file __OUTPUT_DIR__/output/sglang-bench.jsonl --output-details --pd-separated diff --git a/tests/ref_data/sglang-multinode.sbatch b/tests/ref_data/sglang-multinode.sbatch new file mode 100644 index 000000000..e58989fbd --- /dev/null +++ b/tests/ref_data/sglang-multinode.sbatch @@ -0,0 +1,75 @@ +#!/bin/bash +# generated by CloudAI@__CLOUDAI_VERSION__ +#SBATCH --job-name=__JOB_NAME__ +#SBATCH --output=__OUTPUT_DIR__/output/stdout.txt +#SBATCH --error=__OUTPUT_DIR__/output/stderr.txt +#SBATCH --partition=main +#SBATCH -N 2 +#SBATCH --gpus-per-node=8 +#SBATCH --gres=gpu:8 + +export SLURM_JOB_MASTER_NODE=$(scontrol show hostname $SLURM_JOB_NODELIST | head -n 1) +export CUDA_VISIBLE_DEVICES=0,1,2,3 +srun --export=ALL --mpi=none -N2 --container-image=docker.io/lmsysorg/sglang:dev --container-mounts=__OUTPUT_DIR__/output:/cloudai_run_results,__OUTPUT_DIR__/install:/cloudai_install,__OUTPUT_DIR__/output,__INSTALL_DIR__/huggingface:/root/.cache/huggingface --output=__OUTPUT_DIR__/output/mapping-stdout.txt --error=__OUTPUT_DIR__/output/mapping-stderr.txt bash -c "echo \$(date): \$(hostname):node \${SLURM_NODEID}:rank \${SLURM_PROCID}." + +srun --export=ALL --mpi=none -N2 --container-image=docker.io/lmsysorg/sglang:dev --container-mounts=__OUTPUT_DIR__/output:/cloudai_run_results,__OUTPUT_DIR__/install:/cloudai_install,__OUTPUT_DIR__/output,__INSTALL_DIR__/huggingface:/root/.cache/huggingface --ntasks=2 --ntasks-per-node=1 --output=__OUTPUT_DIR__/output/metadata/node-%N.toml --error=__OUTPUT_DIR__/output/metadata/nodes.err bash /cloudai_install/slurm-metadata.sh + +cleanup() { + echo "Cleaning up PIDs: SERVE_PID=$SERVE_PID" + kill -TERM "$SERVE_PID" 2>/dev/null + i=0 + while kill -0 "$SERVE_PID" 2>/dev/null; do + [ "$i" -ge 15 ] && echo "PID did not exit in time" && return 1 + sleep 1 + i=$((i+1)) + done +} +trap cleanup EXIT + +wait_for_health() { + local endpoint="$1" + local timeout=300 + local interval=5 + local end_time=$(($(date +%s) + timeout)) + + while [ "$(date +%s)" -lt "$end_time" ]; do + if curl -sf "$endpoint" > /dev/null 2>&1; then + echo "Health check passed: $endpoint" + return 0 + fi + sleep "$interval" + done + + echo "Timeout waiting for: $endpoint" + return 1 +} + +export PORT_OFFSET=$((SLURM_JOB_ID % 1000)) +export SERVE_DIST_INIT_PORT=$((20000 + PORT_OFFSET)) + +NODES=( $(scontrol show hostname $SLURM_JOB_NODELIST) ) +SERVE_NODES=( "${NODES[@]:0:2}" ) +if [ "${#SERVE_NODES[@]}" -ne 2 ]; then + echo "Expected 2 allocated nodes for SGLang, got: ${NODES[*]}" + exit 1 +fi +export SERVE_NODE=${SERVE_NODES[0]} +export NODE=$SERVE_NODE +SERVE_NODELIST=$(IFS=,; echo "${SERVE_NODES[*]}") +echo "Node roles: serve=${SERVE_NODES[*]}" + +echo "Starting SGLang instances..." +srun --export=ALL --mpi=none --container-image=docker.io/lmsysorg/sglang:dev --container-mounts=__OUTPUT_DIR__/output:/cloudai_run_results,__OUTPUT_DIR__/install:/cloudai_install,__OUTPUT_DIR__/output,__INSTALL_DIR__/huggingface:/root/.cache/huggingface --overlap --nodelist="${SERVE_NODELIST}" --nodes=2 --ntasks=2 --ntasks-per-node=1 \ + --output=__OUTPUT_DIR__/output/sglang-serve.log-%N \ + bash -c 'exec env CUDA_VISIBLE_DEVICES="0,1,2,3" python3 -m sglang.launch_server --model-path Qwen/Qwen3-8B --host 0.0.0.0 --port 8300 --tp 8 --dist-init-addr "${SERVE_NODE}:${SERVE_DIST_INIT_PORT}" --nnodes 2 --node-rank "$SLURM_PROCID"' & +SERVE_PID=$! + +echo "Waiting for SGLang on $NODE to be ready..." +wait_for_health "http://${NODE}:8300/v1/models" || exit 1 + +echo "Running benchmark..." +srun --export=ALL --mpi=none --container-image=docker.io/lmsysorg/sglang:dev --container-mounts=__OUTPUT_DIR__/output:/cloudai_run_results,__OUTPUT_DIR__/install:/cloudai_install,__OUTPUT_DIR__/output,__INSTALL_DIR__/huggingface:/root/.cache/huggingface --overlap --nodelist="${SERVE_NODE}" --nodes=1 --ntasks=1 --ntasks-per-node=1 \ + --output=__OUTPUT_DIR__/output/sglang-bench.log \ + python3 -m sglang.bench_serving --backend sglang --base-url http://${NODE}:8300 --model Qwen/Qwen3-8B --dataset-name random --num-prompts 30 --max-concurrency 16 --random-input 16 --random-output 128 --warmup-requests 2 --random-range-ratio 1.0 --output-file __OUTPUT_DIR__/output/sglang-bench.jsonl --output-details + +cleanup diff --git a/tests/ref_data/vllm-disagg-2nodes.sbatch b/tests/ref_data/vllm-disagg-2nodes.sbatch index 82b2fde40..703e48da4 100644 --- a/tests/ref_data/vllm-disagg-2nodes.sbatch +++ b/tests/ref_data/vllm-disagg-2nodes.sbatch @@ -56,25 +56,29 @@ export PREFILL_NIXL_PORT=$((5557 + PORT_OFFSET)) export DECODE_NIXL_PORT=$((5557 + PORT_OFFSET + 4)) NODES=( $(scontrol show hostname $SLURM_JOB_NODELIST) ) -export PREFILL_NODE=${NODES[0]} -export DECODE_NODE=${NODES[1]:-${PREFILL_NODE}} -if [ -z "$PREFILL_NODE" ]; then - echo "Failed to resolve allocated nodes for disaggregated vLLM" +PREFILL_NODES=( "${NODES[@]:0:1}" ) +DECODE_NODES=( "${NODES[@]:1:1}" ) +if [ "${#PREFILL_NODES[@]}" -ne 1 ] || [ "${#DECODE_NODES[@]}" -ne 1 ]; then + echo "Expected 1 prefill and 1 decode nodes for disaggregated vLLM, got: ${NODES[*]}" exit 1 fi -if [ -z "${NODES[1]}" ]; then - echo "Expected 2 allocated nodes for disaggregated vLLM, got: ${NODES[*]}" +export PREFILL_NODE=${PREFILL_NODES[0]} +export DECODE_NODE=${DECODE_NODES[0]} +PREFILL_NODELIST=$(IFS=,; echo "${PREFILL_NODES[*]}") +DECODE_NODELIST=$(IFS=,; echo "${DECODE_NODES[*]}") +if [ -z "$PREFILL_NODE" ] || [ -z "$DECODE_NODE" ]; then + echo "Failed to resolve allocated nodes for disaggregated vLLM" exit 1 fi -echo "Node roles: prefill=$PREFILL_NODE decode=$DECODE_NODE" +echo "Node roles: prefill=${PREFILL_NODES[*]} decode=${DECODE_NODES[*]}" echo "Starting vLLM instances..." -srun --export=ALL --mpi=none --container-image=nvcr.io/nvidia/vllm:latest --container-mounts=__OUTPUT_DIR__/output:/cloudai_run_results,__OUTPUT_DIR__/install:/cloudai_install,__OUTPUT_DIR__/output,__INSTALL_DIR__/huggingface:/root/.cache/huggingface --overlap --ntasks-per-node=1 --ntasks=1 --relative=0 -N1 \ +srun --export=ALL --mpi=none --container-image=nvcr.io/nvidia/vllm:latest --container-mounts=__OUTPUT_DIR__/output:/cloudai_run_results,__OUTPUT_DIR__/install:/cloudai_install,__OUTPUT_DIR__/output,__INSTALL_DIR__/huggingface:/root/.cache/huggingface --overlap --nodelist="${PREFILL_NODE}" --nodes=1 --ntasks=1 --ntasks-per-node=1 \ --output=__OUTPUT_DIR__/output/vllm-prefill.log \ env CUDA_VISIBLE_DEVICES="0,1,2,3" VLLM_NIXL_SIDE_CHANNEL_HOST="${PREFILL_NODE}" VLLM_NIXL_SIDE_CHANNEL_PORT="$PREFILL_NIXL_PORT" vllm serve Qwen/Qwen3-0.6B --host 0.0.0.0 --port 8400 --kv-transfer-config '{"kv_connector":"NixlConnector","kv_role":"kv_producer"}' & PREFILL_PID=$! -srun --export=ALL --mpi=none --container-image=nvcr.io/nvidia/vllm:latest --container-mounts=__OUTPUT_DIR__/output:/cloudai_run_results,__OUTPUT_DIR__/install:/cloudai_install,__OUTPUT_DIR__/output,__INSTALL_DIR__/huggingface:/root/.cache/huggingface --overlap --ntasks-per-node=1 --ntasks=1 --relative=1 -N1 \ +srun --export=ALL --mpi=none --container-image=nvcr.io/nvidia/vllm:latest --container-mounts=__OUTPUT_DIR__/output:/cloudai_run_results,__OUTPUT_DIR__/install:/cloudai_install,__OUTPUT_DIR__/output,__INSTALL_DIR__/huggingface:/root/.cache/huggingface --overlap --nodelist="${DECODE_NODE}" --nodes=1 --ntasks=1 --ntasks-per-node=1 \ --output=__OUTPUT_DIR__/output/vllm-decode.log \ env CUDA_VISIBLE_DEVICES="0,1,2,3" VLLM_NIXL_SIDE_CHANNEL_HOST="${DECODE_NODE}" VLLM_NIXL_SIDE_CHANNEL_PORT="$DECODE_NIXL_PORT" vllm serve Qwen/Qwen3-0.6B --host 0.0.0.0 --port 8500 --kv-transfer-config '{"kv_connector":"NixlConnector","kv_role":"kv_consumer"}' & DECODE_PID=$! @@ -84,7 +88,7 @@ wait_for_health "http://${PREFILL_NODE}:8400/health" || exit 1 wait_for_health "http://${DECODE_NODE}:8500/health" || exit 1 echo "Starting router..." -srun --export=ALL --mpi=none --container-image=nvcr.io/nvidia/vllm:latest --container-mounts=__OUTPUT_DIR__/output:/cloudai_run_results,__OUTPUT_DIR__/install:/cloudai_install,__OUTPUT_DIR__/output,__INSTALL_DIR__/huggingface:/root/.cache/huggingface --overlap --ntasks-per-node=1 --ntasks=1 --relative=0 -N1 \ +srun --export=ALL --mpi=none --container-image=nvcr.io/nvidia/vllm:latest --container-mounts=__OUTPUT_DIR__/output:/cloudai_run_results,__OUTPUT_DIR__/install:/cloudai_install,__OUTPUT_DIR__/output,__INSTALL_DIR__/huggingface:/root/.cache/huggingface --overlap --nodelist="${PREFILL_NODE}" --nodes=1 --ntasks=1 --ntasks-per-node=1 \ --output=__OUTPUT_DIR__/output/vllm-router.log \ python3 /opt/vllm/tests/v1/kv_connector/nixl_integration/toy_proxy_server.py --host 0.0.0.0 --port 8300 --prefiller-hosts ${PREFILL_NODE} --prefiller-ports 8400 --decoder-hosts ${DECODE_NODE} --decoder-ports 8500 & HELPER_PID=$! @@ -93,7 +97,7 @@ echo "Waiting for vLLM on $PREFILL_NODE server to be ready..." wait_for_health "http://${PREFILL_NODE}:8300/healthcheck" || exit 1 echo "Running benchmark..." -srun --export=ALL --mpi=none --container-image=nvcr.io/nvidia/vllm:latest --container-mounts=__OUTPUT_DIR__/output:/cloudai_run_results,__OUTPUT_DIR__/install:/cloudai_install,__OUTPUT_DIR__/output,__INSTALL_DIR__/huggingface:/root/.cache/huggingface --overlap --ntasks-per-node=1 --ntasks=1 --relative=0 -N1 \ +srun --export=ALL --mpi=none --container-image=nvcr.io/nvidia/vllm:latest --container-mounts=__OUTPUT_DIR__/output:/cloudai_run_results,__OUTPUT_DIR__/install:/cloudai_install,__OUTPUT_DIR__/output,__INSTALL_DIR__/huggingface:/root/.cache/huggingface --overlap --nodelist="${PREFILL_NODE}" --nodes=1 --ntasks=1 --ntasks-per-node=1 \ --output=__OUTPUT_DIR__/output/vllm-bench.log \ vllm bench serve --model Qwen/Qwen3-0.6B --base-url http://${PREFILL_NODE}:8300 --random-input-len 16 --random-output-len 128 --max-concurrency 16 --num-prompts 30 --result-dir __OUTPUT_DIR__/output --result-filename vllm-bench.json --save-result diff --git a/tests/ref_data/vllm-disagg.sbatch b/tests/ref_data/vllm-disagg.sbatch index e58bda39e..61ce2503c 100644 --- a/tests/ref_data/vllm-disagg.sbatch +++ b/tests/ref_data/vllm-disagg.sbatch @@ -56,21 +56,29 @@ export PREFILL_NIXL_PORT=$((5557 + PORT_OFFSET)) export DECODE_NIXL_PORT=$((5557 + PORT_OFFSET + 4)) NODES=( $(scontrol show hostname $SLURM_JOB_NODELIST) ) -export PREFILL_NODE=${NODES[0]} -export DECODE_NODE=${NODES[1]:-${PREFILL_NODE}} -if [ -z "$PREFILL_NODE" ]; then +PREFILL_NODES=( "${NODES[@]:0:1}" ) +DECODE_NODES=( "${NODES[@]:0:1}" ) +if [ "${#PREFILL_NODES[@]}" -ne 1 ] || [ "${#DECODE_NODES[@]}" -ne 1 ]; then + echo "Expected 1 prefill and 1 decode nodes for disaggregated vLLM, got: ${NODES[*]}" + exit 1 +fi +export PREFILL_NODE=${PREFILL_NODES[0]} +export DECODE_NODE=${DECODE_NODES[0]} +PREFILL_NODELIST=$(IFS=,; echo "${PREFILL_NODES[*]}") +DECODE_NODELIST=$(IFS=,; echo "${DECODE_NODES[*]}") +if [ -z "$PREFILL_NODE" ] || [ -z "$DECODE_NODE" ]; then echo "Failed to resolve allocated nodes for disaggregated vLLM" exit 1 fi -echo "Node roles: prefill=$PREFILL_NODE decode=$DECODE_NODE" +echo "Node roles: prefill=${PREFILL_NODES[*]} decode=${DECODE_NODES[*]}" echo "Starting vLLM instances..." -srun --export=ALL --mpi=none -N1 --container-image=nvcr.io/nvidia/vllm:latest --container-mounts=__OUTPUT_DIR__/output:/cloudai_run_results,__OUTPUT_DIR__/install:/cloudai_install,__OUTPUT_DIR__/output,__INSTALL_DIR__/huggingface:/root/.cache/huggingface --overlap --ntasks-per-node=1 --ntasks=1 \ +srun --export=ALL --mpi=none --container-image=nvcr.io/nvidia/vllm:latest --container-mounts=__OUTPUT_DIR__/output:/cloudai_run_results,__OUTPUT_DIR__/install:/cloudai_install,__OUTPUT_DIR__/output,__INSTALL_DIR__/huggingface:/root/.cache/huggingface --overlap --nodelist="${PREFILL_NODE}" --nodes=1 --ntasks=1 --ntasks-per-node=1 \ --output=__OUTPUT_DIR__/output/vllm-prefill.log \ env CUDA_VISIBLE_DEVICES="0,1" VLLM_NIXL_SIDE_CHANNEL_HOST="${PREFILL_NODE}" VLLM_NIXL_SIDE_CHANNEL_PORT="$PREFILL_NIXL_PORT" vllm serve Qwen/Qwen3-0.6B --host 0.0.0.0 --port 8400 --kv-transfer-config '{"kv_connector":"NixlConnector","kv_role":"kv_producer"}' & PREFILL_PID=$! -srun --export=ALL --mpi=none -N1 --container-image=nvcr.io/nvidia/vllm:latest --container-mounts=__OUTPUT_DIR__/output:/cloudai_run_results,__OUTPUT_DIR__/install:/cloudai_install,__OUTPUT_DIR__/output,__INSTALL_DIR__/huggingface:/root/.cache/huggingface --overlap --ntasks-per-node=1 --ntasks=1 \ +srun --export=ALL --mpi=none --container-image=nvcr.io/nvidia/vllm:latest --container-mounts=__OUTPUT_DIR__/output:/cloudai_run_results,__OUTPUT_DIR__/install:/cloudai_install,__OUTPUT_DIR__/output,__INSTALL_DIR__/huggingface:/root/.cache/huggingface --overlap --nodelist="${DECODE_NODE}" --nodes=1 --ntasks=1 --ntasks-per-node=1 \ --output=__OUTPUT_DIR__/output/vllm-decode.log \ env CUDA_VISIBLE_DEVICES="2,3" VLLM_NIXL_SIDE_CHANNEL_HOST="${DECODE_NODE}" VLLM_NIXL_SIDE_CHANNEL_PORT="$DECODE_NIXL_PORT" vllm serve Qwen/Qwen3-0.6B --host 0.0.0.0 --port 8500 --kv-transfer-config '{"kv_connector":"NixlConnector","kv_role":"kv_consumer"}' & DECODE_PID=$! @@ -80,7 +88,7 @@ wait_for_health "http://${PREFILL_NODE}:8400/health" || exit 1 wait_for_health "http://${DECODE_NODE}:8500/health" || exit 1 echo "Starting router..." -srun --export=ALL --mpi=none -N1 --container-image=nvcr.io/nvidia/vllm:latest --container-mounts=__OUTPUT_DIR__/output:/cloudai_run_results,__OUTPUT_DIR__/install:/cloudai_install,__OUTPUT_DIR__/output,__INSTALL_DIR__/huggingface:/root/.cache/huggingface --overlap --ntasks-per-node=1 --ntasks=1 \ +srun --export=ALL --mpi=none --container-image=nvcr.io/nvidia/vllm:latest --container-mounts=__OUTPUT_DIR__/output:/cloudai_run_results,__OUTPUT_DIR__/install:/cloudai_install,__OUTPUT_DIR__/output,__INSTALL_DIR__/huggingface:/root/.cache/huggingface --overlap --nodelist="${PREFILL_NODE}" --nodes=1 --ntasks=1 --ntasks-per-node=1 \ --output=__OUTPUT_DIR__/output/vllm-router.log \ python3 /opt/vllm/tests/v1/kv_connector/nixl_integration/toy_proxy_server.py --host 0.0.0.0 --port 8300 --prefiller-hosts ${PREFILL_NODE} --prefiller-ports 8400 --decoder-hosts ${DECODE_NODE} --decoder-ports 8500 & HELPER_PID=$! @@ -89,7 +97,7 @@ echo "Waiting for vLLM on $PREFILL_NODE server to be ready..." wait_for_health "http://${PREFILL_NODE}:8300/healthcheck" || exit 1 echo "Running benchmark..." -srun --export=ALL --mpi=none -N1 --container-image=nvcr.io/nvidia/vllm:latest --container-mounts=__OUTPUT_DIR__/output:/cloudai_run_results,__OUTPUT_DIR__/install:/cloudai_install,__OUTPUT_DIR__/output,__INSTALL_DIR__/huggingface:/root/.cache/huggingface --overlap --ntasks-per-node=1 --ntasks=1 \ +srun --export=ALL --mpi=none --container-image=nvcr.io/nvidia/vllm:latest --container-mounts=__OUTPUT_DIR__/output:/cloudai_run_results,__OUTPUT_DIR__/install:/cloudai_install,__OUTPUT_DIR__/output,__INSTALL_DIR__/huggingface:/root/.cache/huggingface --overlap --nodelist="${PREFILL_NODE}" --nodes=1 --ntasks=1 --ntasks-per-node=1 \ --output=__OUTPUT_DIR__/output/vllm-bench.log \ vllm bench serve --model Qwen/Qwen3-0.6B --base-url http://${PREFILL_NODE}:8300 --random-input-len 16 --random-output-len 128 --max-concurrency 16 --num-prompts 30 --result-dir __OUTPUT_DIR__/output --result-filename vllm-bench.json --save-result diff --git a/tests/ref_data/vllm-multinode.sbatch b/tests/ref_data/vllm-multinode.sbatch new file mode 100644 index 000000000..42df1edef --- /dev/null +++ b/tests/ref_data/vllm-multinode.sbatch @@ -0,0 +1,124 @@ +#!/bin/bash +# generated by CloudAI@__CLOUDAI_VERSION__ +#SBATCH --job-name=__JOB_NAME__ +#SBATCH --output=__OUTPUT_DIR__/output/stdout.txt +#SBATCH --error=__OUTPUT_DIR__/output/stderr.txt +#SBATCH --partition=main +#SBATCH -N 2 +#SBATCH --gpus-per-node=8 +#SBATCH --gres=gpu:8 + +export SLURM_JOB_MASTER_NODE=$(scontrol show hostname $SLURM_JOB_NODELIST | head -n 1) +export CUDA_VISIBLE_DEVICES=0,1,2,3 +srun --export=ALL --mpi=none -N2 --container-image=nvcr.io/nvidia/vllm:latest --container-mounts=__OUTPUT_DIR__/output:/cloudai_run_results,__OUTPUT_DIR__/install:/cloudai_install,__OUTPUT_DIR__/output,__INSTALL_DIR__/huggingface:/root/.cache/huggingface --output=__OUTPUT_DIR__/output/mapping-stdout.txt --error=__OUTPUT_DIR__/output/mapping-stderr.txt bash -c "echo \$(date): \$(hostname):node \${SLURM_NODEID}:rank \${SLURM_PROCID}." + +srun --export=ALL --mpi=none -N2 --container-image=nvcr.io/nvidia/vllm:latest --container-mounts=__OUTPUT_DIR__/output:/cloudai_run_results,__OUTPUT_DIR__/install:/cloudai_install,__OUTPUT_DIR__/output,__INSTALL_DIR__/huggingface:/root/.cache/huggingface --ntasks=2 --ntasks-per-node=1 --output=__OUTPUT_DIR__/output/metadata/node-%N.toml --error=__OUTPUT_DIR__/output/metadata/nodes.err bash /cloudai_install/slurm-metadata.sh + +cleanup() { + echo "Stopping Ray clusters..." + if [ -n "${SERVE_NODELIST:-}" ]; then + srun --export=ALL --mpi=none --container-image=nvcr.io/nvidia/vllm:latest --container-mounts=__OUTPUT_DIR__/output:/cloudai_run_results,__OUTPUT_DIR__/install:/cloudai_install,__OUTPUT_DIR__/output,__INSTALL_DIR__/huggingface:/root/.cache/huggingface --overlap --nodelist="${SERVE_NODELIST}" --nodes=2 --ntasks=2 --ntasks-per-node=1 bash -lc 'ray stop --force >/dev/null 2>&1 || true' >/dev/null 2>&1 || true + else + echo "Skipping Ray stop for serve: node list is not set" + fi + echo "Cleaning up PIDs: SERVE_RAY_PID=$SERVE_RAY_PID SERVE_PID=$SERVE_PID" + + for pid in "$SERVE_RAY_PID" "$SERVE_PID"; do + [ -n "$pid" ] && kill -TERM "$pid" 2>/dev/null + done + + for pid in "$SERVE_RAY_PID" "$SERVE_PID"; do + [ -z "$pid" ] && continue + i=0 + while kill -0 "$pid" 2>/dev/null; do + [ "$i" -ge 15 ] && echo "PID $pid did not exit in time" && return 1 + sleep 1 + i=$((i+1)) + done + done +} +trap cleanup EXIT + +wait_for_health() { + local endpoint="$1" + local timeout=300 + local interval=5 + local end_time=$(($(date +%s) + timeout)) + + while [ "$(date +%s)" -lt "$end_time" ]; do + if curl -sf "$endpoint" > /dev/null 2>&1; then + echo "Health check passed: $endpoint" + return 0 + fi + sleep "$interval" + done + + echo "Timeout waiting for: $endpoint" + return 1 +} + +export PORT_OFFSET=$((SLURM_JOB_ID % 1000)) +export SERVE_RAY_PORT=$((6379 + PORT_OFFSET)) +NODES=( $(scontrol show hostname $SLURM_JOB_NODELIST) ) +SERVE_NODES=( "${NODES[@]:0:2}" ) +if [ "${#SERVE_NODES[@]}" -ne 2 ]; then + echo "Expected 2 allocated nodes for vLLM, got: ${NODES[*]}" + exit 1 +fi +export SERVE_NODE=${SERVE_NODES[0]} +export NODE=$SERVE_NODE +SERVE_NODELIST=$(IFS=,; echo "${SERVE_NODES[*]}") +echo "Node roles: serve=${SERVE_NODES[*]}" + +echo "Starting vLLM instances..." +( + trap 'kill -TERM $(jobs -pr) 2>/dev/null' TERM EXIT + for node in "${SERVE_NODES[@]:1}"; do + srun --export=ALL --mpi=none --container-image=nvcr.io/nvidia/vllm:latest --container-mounts=__OUTPUT_DIR__/output:/cloudai_run_results,__OUTPUT_DIR__/install:/cloudai_install,__OUTPUT_DIR__/output,__INSTALL_DIR__/huggingface:/root/.cache/huggingface --overlap --nodelist="$node" --nodes=1 --ntasks=1 --ntasks-per-node=1 \ + --output=__OUTPUT_DIR__/output/vllm-serve-ray-worker-%N.log \ + bash -lc 'ray stop --force >/dev/null 2>&1 || true +for (( i=0; i < 300; i+=5 )); do + if ray start --address=${SERVE_NODE}:${SERVE_RAY_PORT} --block --num-gpus=4 --num-cpus=64 --disable-usage-stats; then + echo "Ray worker connected to ${SERVE_NODE}:${SERVE_RAY_PORT}" + exit 0 + fi + echo "Waiting until the Ray worker can connect to ${SERVE_NODE}:${SERVE_RAY_PORT}..." + sleep 5 +done +echo "Ray worker startup timed out for ${SERVE_NODE}:${SERVE_RAY_PORT}" +exit 1' & + done + wait +) & +SERVE_RAY_PID=$! +srun --export=ALL --mpi=none --container-image=nvcr.io/nvidia/vllm:latest --container-mounts=__OUTPUT_DIR__/output:/cloudai_run_results,__OUTPUT_DIR__/install:/cloudai_install,__OUTPUT_DIR__/output,__INSTALL_DIR__/huggingface:/root/.cache/huggingface --overlap --nodelist="${SERVE_NODE}" --nodes=1 --ntasks=1 --ntasks-per-node=1 \ + --output=__OUTPUT_DIR__/output/vllm-serve.log \ + --error=__OUTPUT_DIR__/output/vllm-serve-ray-head.log \ + bash -lc 'ray stop --force >/dev/null 2>&1 || true +ray start --head --port="${SERVE_RAY_PORT}" --num-gpus=4 --num-cpus=64 --disable-usage-stats + +active_nodes=0 +for (( i=0; i < 300; i+=5 )); do + active_nodes=$(python3 -c '"'"'import ray; ray.init(); print(sum(node["Alive"] for node in ray.nodes()))'"'"') + if [ "$active_nodes" -eq "2" ]; then + echo "All Ray workers are active: $active_nodes/2" + ray status || true + exec env RAY_ADDRESS="${SERVE_NODE}:${SERVE_RAY_PORT}" vllm serve Qwen/Qwen3-0.6B --host 0.0.0.0 --tensor-parallel-size 8 --port 8300 --distributed-executor-backend ray + fi + echo "Waiting for Ray workers: $active_nodes/2 active" + sleep 5 +done + +echo "Waiting for Ray workers timed out: $active_nodes/2 active" +exit 1' & +SERVE_PID=$! + +echo "Waiting for vLLM on $NODE to be ready..." +wait_for_health "http://${NODE}:8300/healthcheck" || exit 1 + +echo "Running benchmark..." +srun --export=ALL --mpi=none --container-image=nvcr.io/nvidia/vllm:latest --container-mounts=__OUTPUT_DIR__/output:/cloudai_run_results,__OUTPUT_DIR__/install:/cloudai_install,__OUTPUT_DIR__/output,__INSTALL_DIR__/huggingface:/root/.cache/huggingface --overlap --nodelist="${SERVE_NODE}" --nodes=1 --ntasks=1 --ntasks-per-node=1 \ + --output=__OUTPUT_DIR__/output/vllm-bench.log \ + vllm bench serve --model Qwen/Qwen3-0.6B --base-url http://${NODE}:8300 --random-input-len 16 --random-output-len 128 --max-concurrency 16 --num-prompts 30 --result-dir __OUTPUT_DIR__/output --result-filename vllm-bench.json --save-result + +cleanup diff --git a/tests/test_acceptance.py b/tests/test_acceptance.py index 151c6fb9e..ea3fc5e9a 100644 --- a/tests/test_acceptance.py +++ b/tests/test_acceptance.py @@ -83,7 +83,7 @@ TritonInferenceTestDefinition, ) from cloudai.workloads.ucc_test import UCCCmdArgs, UCCTestDefinition -from cloudai.workloads.vllm import VllmArgs, VllmCmdArgs, VllmTestDefinition +from cloudai.workloads.vllm import VllmArgs, VllmCmdArgs, VllmRayStartArgs, VllmTestDefinition SLURM_TEST_SCENARIOS = [ {"path": Path("conf/common/test_scenario/sleep.toml"), "expected_dirs_number": 4, "log_file": "sleep_debug.log"}, @@ -276,9 +276,11 @@ def build_special_test_run( "deepep-benchmark", "osu-bench", "sglang", + "sglang-multinode", "sglang-disagg", "sglang-disagg-2nodes", "vllm", + "vllm-multinode", "vllm-disagg", "vllm-disagg-2nodes", ] @@ -602,6 +604,31 @@ def test_req(request, slurm_system: SlurmSystem, partial_tr: partial[TestRun]) - extra_env_vars={"CUDA_VISIBLE_DEVICES": "0"}, ), ), + "vllm-multinode": lambda: create_test_run( + partial_tr, + "vllm-multinode", + VllmTestDefinition( + name="vllm-multinode", + description="vLLM distributed benchmark on 2 nodes", + test_template_name="Vllm", + cmd_args=VllmCmdArgs( + docker_image_url="nvcr.io/nvidia/vllm:latest", + model="Qwen/Qwen3-0.6B", + decode=VllmArgs.model_validate( + { + "tensor_parallel_size": 8, + "ray_head": VllmRayStartArgs.model_validate( + {"num_gpus": 4, "num_cpus": 64, "disable_usage_stats": True} + ), + "ray_worker": VllmRayStartArgs.model_validate( + {"num_gpus": 4, "num_cpus": 64, "disable_usage_stats": True} + ), + } + ), + ), + extra_env_vars={"CUDA_VISIBLE_DEVICES": "0,1,2,3"}, + ), + ), "sglang": lambda: create_test_run( partial_tr, "sglang", @@ -616,6 +643,21 @@ def test_req(request, slurm_system: SlurmSystem, partial_tr: partial[TestRun]) - extra_env_vars={"CUDA_VISIBLE_DEVICES": "0"}, ), ), + "sglang-multinode": lambda: create_test_run( + partial_tr, + "sglang-multinode", + SglangTestDefinition( + name="sglang-multinode", + description="SGLang distributed benchmark on 2 nodes", + test_template_name="sglang", + cmd_args=SglangCmdArgs( + docker_image_url="docker.io/lmsysorg/sglang:dev", + model="Qwen/Qwen3-8B", + decode=SglangArgs.model_validate({"tp": 8}), + ), + extra_env_vars={"CUDA_VISIBLE_DEVICES": "0,1,2,3"}, + ), + ), "sglang-disagg": lambda: create_test_run( partial_tr, "sglang-disagg", @@ -700,7 +742,7 @@ def test_req(request, slurm_system: SlurmSystem, partial_tr: partial[TestRun]) - tr.num_nodes = 2 if request.param == "deepep-benchmark": tr.num_nodes = 2 - if request.param in {"sglang-disagg-2nodes", "vllm-disagg-2nodes"}: + if request.param in {"sglang-multinode", "sglang-disagg-2nodes", "vllm-multinode", "vllm-disagg-2nodes"}: tr.num_nodes = 2 return tr, f"{request.param}.sbatch", None diff --git a/tests/workloads/common/test_llm_serving.py b/tests/workloads/common/test_llm_serving.py index e2a7e219d..d6657314a 100644 --- a/tests/workloads/common/test_llm_serving.py +++ b/tests/workloads/common/test_llm_serving.py @@ -155,12 +155,20 @@ def test_fallback_to_system_gpu_count(self, llm_tdef: FakeLLMTestDefinition, gpu assert all_gpu_ids(cast(Any, llm_tdef), gpus_per_node) == list(range(gpus_per_node or 1)) - def test_decode_gpu_ids_override_defaults_in_aggregated_mode(self, llm_tdef: FakeLLMTestDefinition) -> None: + def test_cuda_visible_devices_wins_over_decode_gpu_ids_in_aggregated_mode( + self, llm_tdef: FakeLLMTestDefinition + ) -> None: llm_tdef.extra_env_vars = {"CUDA_VISIBLE_DEVICES": "0,1,2,3"} llm_tdef.cmd_args.decode.gpu_ids = "4,5" assert all_gpu_ids(cast(Any, llm_tdef), 8) == [0, 1, 2, 3] + def test_decode_gpu_ids_override_system_gpu_count_in_aggregated_mode(self, llm_tdef: FakeLLMTestDefinition) -> None: + llm_tdef.extra_env_vars = {} + llm_tdef.cmd_args.decode.gpu_ids = "4,5" + + assert all_gpu_ids(cast(Any, llm_tdef), 8) == [4, 5] + def test_prefill_and_decode_gpu_ids_override_cuda_visible_devices(self, llm_tdef: FakeLLMTestDefinition) -> None: llm_tdef.extra_env_vars = {"CUDA_VISIBLE_DEVICES": "0,1,2,3"} llm_tdef.cmd_args.prefill = FakeLLMArgs(gpu_ids="4") @@ -274,8 +282,12 @@ def test_two_node_disagg_uses_shared_gpu_ids_and_role_hosts(self, slurm_system: assert strategy.bench_log_file == "fake-llm-bench.log" assert strategy.serve_log_file == "fake-llm-serve.log" assert strategy.get_helper_command() == ["helper", "${PREFILL_NODE}", "${DECODE_NODE}"] - assert "DECODE_NODE=${NODES[1]:-${PREFILL_NODE}}" in strategy.generate_disaggregated_node_setup() - assert "Expected 2 allocated nodes for disaggregated Fake LLM" in strategy.generate_disaggregated_node_setup() + assert strategy.disaggregated_role_node_counts() == (1, 1) + node_setup = strategy.generate_disaggregated_node_setup() + assert 'PREFILL_NODES=( "${NODES[@]:0:1}" )' in node_setup + assert 'DECODE_NODES=( "${NODES[@]:1:1}" )' in node_setup + assert "PREFILL_NODE=${PREFILL_NODES[0]}" in node_setup + assert "DECODE_NODE=${DECODE_NODES[0]}" in node_setup def test_single_node_disagg_wait_block_uses_role_hosts(self, slurm_system: SlurmSystem, tmp_path) -> None: tdef = make_tdef(create_prefill=True) @@ -298,16 +310,43 @@ def test_single_node_disagg_wait_block_uses_role_hosts(self, slurm_system: Slurm wait_for_health "http://${PREFILL_NODE}:8400/health" || exit 1 wait_for_health "http://${DECODE_NODE}:8500/health" || exit 1""" ) - assert "DECODE_NODE=${NODES[1]:-${PREFILL_NODE}}" in strategy.generate_disaggregated_node_setup() + node_setup = strategy.generate_disaggregated_node_setup() + assert 'PREFILL_NODES=( "${NODES[@]:0:1}" )' in node_setup + assert 'DECODE_NODES=( "${NODES[@]:0:1}" )' in node_setup + + def test_disagg_more_than_two_nodes_requires_role_sizes(self, slurm_system: SlurmSystem, tmp_path) -> None: + tdef = make_tdef(create_prefill=True) + tdef.extra_env_vars = {"CUDA_VISIBLE_DEVICES": "0,1,2,3"} + tr = TestRun(name="llm", test=tdef, num_nodes=3, nodes=[], output_path=tmp_path) + strategy = FakeLLMSlurmStrategy(slurm_system, tr) + + with pytest.raises(ValueError, match=r"requires both prefill\.num_nodes and decode\.num_nodes"): + strategy.disaggregated_role_node_counts() - def test_more_than_two_disagg_nodes_are_rejected(self, slurm_system: SlurmSystem, tmp_path) -> None: + def test_disagg_explicit_role_sizes_plan_contiguous_node_slices(self, slurm_system: SlurmSystem, tmp_path) -> None: tdef = make_tdef(create_prefill=True) + assert tdef.cmd_args.prefill is not None + tdef.cmd_args.prefill.num_nodes = 2 + tdef.cmd_args.decode.num_nodes = 2 tdef.extra_env_vars = {"CUDA_VISIBLE_DEVICES": "0,1,2,3"} + tr = TestRun(name="llm", test=tdef, num_nodes=4, nodes=[], output_path=tmp_path) + strategy = FakeLLMSlurmStrategy(slurm_system, tr) + + assert strategy.disaggregated_role_node_counts() == (2, 2) + node_setup = strategy.generate_disaggregated_node_setup() + assert 'PREFILL_NODES=( "${NODES[@]:0:2}" )' in node_setup + assert 'DECODE_NODES=( "${NODES[@]:2:2}" )' in node_setup + + def test_disagg_role_sizes_must_match_allocation(self, slurm_system: SlurmSystem, tmp_path) -> None: + tdef = make_tdef(create_prefill=True) + assert tdef.cmd_args.prefill is not None + tdef.cmd_args.prefill.num_nodes = 2 + tdef.cmd_args.decode.num_nodes = 2 tr = TestRun(name="llm", test=tdef, num_nodes=3, nodes=[], output_path=tmp_path) strategy = FakeLLMSlurmStrategy(slurm_system, tr) - with pytest.raises(ValueError, match="supports only 1 or 2 nodes"): - _ = strategy.is_two_node_disaggregated + with pytest.raises(ValueError, match=r"must equal allocated nodes \(3\)"): + strategy.disaggregated_role_node_counts() def test_generate_report_uses_shared_table_builder( diff --git a/tests/workloads/sglang/test_command_gen_strategy_slurm.py b/tests/workloads/sglang/test_command_gen_strategy_slurm.py index c07d1771d..3519e5ac2 100644 --- a/tests/workloads/sglang/test_command_gen_strategy_slurm.py +++ b/tests/workloads/sglang/test_command_gen_strategy_slurm.py @@ -28,7 +28,6 @@ SglangSlurmCommandGenStrategy, SglangTestDefinition, ) -from cloudai.workloads.sglang.sglang import SGLANG_BENCH_JSONL_FILE, SGLANG_BENCH_LOG_FILE @pytest.fixture @@ -73,13 +72,14 @@ def test_container_mounts(sglang_cmd_gen_strategy: SglangSlurmCommandGenStrategy class TestGpuDetection: - @pytest.mark.parametrize("cuda_visible_devices", ["0", "0,1,2,3", "0,1,2,3,4,5,6,7"]) - def test_gpu_ids_from_cuda_visible_devices( - self, cuda_visible_devices: str, sglang_tr: TestRun, slurm_system: SlurmSystem - ) -> None: - sglang_tr.test.extra_env_vars = {"CUDA_VISIBLE_DEVICES": cuda_visible_devices} + def test_aggregated_gpu_ids_from_decode_config(self, sglang_tr: TestRun, slurm_system: SlurmSystem) -> None: + tdef = cast(SglangTestDefinition, sglang_tr.test) + tdef.extra_env_vars = {} + tdef.cmd_args.decode.gpu_ids = "0,1,2,3" strategy = SglangSlurmCommandGenStrategy(slurm_system, sglang_tr) - assert strategy.gpu_ids == [int(gpu_id) for gpu_id in cuda_visible_devices.split(",")] + + assert strategy.gpu_ids == [0, 1, 2, 3] + assert 'env CUDA_VISIBLE_DEVICES="0,1,2,3"' in strategy._gen_srun_command() def test_multinode_disagg_uses_shared_gpu_ids_per_role( self, sglang_disagg_2node_tr: TestRun, slurm_system: SlurmSystem @@ -89,60 +89,6 @@ def test_multinode_disagg_uses_shared_gpu_ids_per_role( assert strategy.decode_gpu_ids == [0, 1, 2, 3] -def test_get_sglang_serve_commands_aggregated(sglang_cmd_gen_strategy: SglangSlurmCommandGenStrategy) -> None: - cmd_args = sglang_cmd_gen_strategy.test_run.test.cmd_args - commands = sglang_cmd_gen_strategy.get_serve_commands() - - assert len(commands) == 1 - assert commands[0] == [ - "python3", - "-m", - cmd_args.serve_module, - "--model-path", - cmd_args.model, - "--host", - cmd_args.host, - "--port", - str(cmd_args.port), - ] - - -def test_get_sglang_serve_commands_disagg(sglang_disagg_tr: TestRun, slurm_system: SlurmSystem) -> None: - strategy = SglangSlurmCommandGenStrategy(slurm_system, sglang_disagg_tr) - - commands = strategy.get_serve_commands() - - assert len(commands) == 2 - prefill_cmd, decode_cmd = commands - assert "--disaggregation-mode" in prefill_cmd - assert "prefill" in prefill_cmd - assert str(strategy.prefill_port) in prefill_cmd - - assert "--disaggregation-mode" in decode_cmd - assert "decode" in decode_cmd - assert str(strategy.decode_port) in decode_cmd - - -def test_get_sglang_bench_command_adds_pd_separated_in_disagg( - sglang_disagg_tr: TestRun, slurm_system: SlurmSystem -) -> None: - strategy = SglangSlurmCommandGenStrategy(slurm_system, sglang_disagg_tr) - - command = strategy.get_bench_command() - - assert "--pd-separated" in command - - -def test_get_sglang_bench_command_writes_jsonl( - sglang_cmd_gen_strategy: SglangSlurmCommandGenStrategy, -) -> None: - command = sglang_cmd_gen_strategy.get_bench_command() - output_file_args = [part for part in command if part.startswith("--output-file ")] - assert len(output_file_args) == 1 - assert f"--base-url http://${{NODE}}:{sglang_cmd_gen_strategy.test_run.test.cmd_args.port}" in command - assert output_file_args[0].endswith(f"/{SGLANG_BENCH_JSONL_FILE}") - - def test_get_sglang_semantic_eval_command_defaults(sglang_cmd_gen_strategy: SglangSlurmCommandGenStrategy): sglang_test = cast(SglangTestDefinition, sglang_cmd_gen_strategy.test_run.test) sglang_test.semantic_eval_cmd_args = SglangSemanticEvalCmdArgs() @@ -202,58 +148,52 @@ def test_gen_srun_command_contains_sglang_semantic_eval_in_disagg( assert "python3 -m sglang.test.run_eval --host ${PREFILL_NODE} --port 8000" in srun_command -def test_gen_srun_command_contains_expected_flow(sglang_disagg_tr: TestRun, slurm_system: SlurmSystem) -> None: - strategy = SglangSlurmCommandGenStrategy(slurm_system, sglang_disagg_tr) - - srun_command = strategy._gen_srun_command() - - assert "Starting SGLang instances" in srun_command - assert "Starting router" in srun_command - assert "PREFILL_NODE=${NODES[0]}" in srun_command - assert "DECODE_NODE=${NODES[1]:-${PREFILL_NODE}}" in srun_command - assert 'env CUDA_VISIBLE_DEVICES="0,1"' in srun_command - assert 'env CUDA_VISIBLE_DEVICES="2,3"' in srun_command - assert 'wait_for_health "http://${PREFILL_NODE}:8100/health"' in srun_command - assert 'wait_for_health "http://${DECODE_NODE}:8200/health"' in srun_command - assert "--prefill http://${PREFILL_NODE}:8100" in srun_command - assert "--decode http://${DECODE_NODE}:8200" in srun_command - assert "--base-url http://${PREFILL_NODE}:8000" in srun_command - assert f"--output={strategy.test_run.output_path.absolute()}/{SGLANG_BENCH_LOG_FILE}" in srun_command - - -def test_gen_srun_command_contains_expected_two_node_flow( - sglang_disagg_2node_tr: TestRun, slurm_system: SlurmSystem +def test_disaggregated_server_healthcheck_defaults_to_legacy_health_endpoint( + sglang_disagg_tr: TestRun, slurm_system: SlurmSystem ) -> None: - strategy = SglangSlurmCommandGenStrategy(slurm_system, sglang_disagg_2node_tr) + strategy = SglangSlurmCommandGenStrategy(slurm_system, sglang_disagg_tr) srun_command = strategy._gen_srun_command() - assert "PREFILL_NODE=${NODES[0]}" in srun_command - assert "DECODE_NODE=${NODES[1]:-${PREFILL_NODE}}" in srun_command - assert srun_command.count("--relative=0 -N1") == 3 - assert srun_command.count("--relative=1 -N1") == 1 - assert 'env CUDA_VISIBLE_DEVICES="0,1,2,3"' in srun_command - assert srun_command.count("--host 0.0.0.0") >= 2 assert 'wait_for_health "http://${PREFILL_NODE}:8100/health"' in srun_command assert 'wait_for_health "http://${DECODE_NODE}:8200/health"' in srun_command - assert "--prefill http://${PREFILL_NODE}:8100" in srun_command - assert "--decode http://${DECODE_NODE}:8200" in srun_command - assert "--base-url http://${PREFILL_NODE}:8000" in srun_command + assert 'wait_for_health "http://${PREFILL_NODE}:8000/v1/models"' in srun_command def test_disagg_more_than_two_nodes_is_rejected(sglang_disagg_tr: TestRun, slurm_system: SlurmSystem) -> None: sglang_disagg_tr.num_nodes = 3 strategy = SglangSlurmCommandGenStrategy(slurm_system, sglang_disagg_tr) - with pytest.raises(ValueError, match="supports only 1 or 2 nodes"): + with pytest.raises(ValueError, match=r"requires both prefill\.num_nodes and decode\.num_nodes"): _ = strategy._gen_srun_command() -def test_gen_srun_command_contains_cuda_visible_devices_for_aggregated( - sglang_cmd_gen_strategy: SglangSlurmCommandGenStrategy, +def test_gen_srun_command_disagg_four_nodes_uses_separate_sglang_distributed_launches( + sglang_disagg_tr: TestRun, slurm_system: SlurmSystem ) -> None: - srun_command = sglang_cmd_gen_strategy._gen_srun_command() - assert 'env CUDA_VISIBLE_DEVICES="0"' in srun_command + tdef = cast(SglangTestDefinition, sglang_disagg_tr.test) + assert tdef.cmd_args.prefill is not None + tdef.cmd_args.prefill.num_nodes = 2 + tdef.cmd_args.decode.num_nodes = 2 + sglang_disagg_tr.num_nodes = 4 + strategy = SglangSlurmCommandGenStrategy(slurm_system, sglang_disagg_tr) + + srun_command = strategy._gen_srun_command() + + assert 'PREFILL_NODES=( "${NODES[@]:0:2}" )' in srun_command + assert 'DECODE_NODES=( "${NODES[@]:2:2}" )' in srun_command + assert "export PREFILL_DIST_INIT_PORT=$((20000 + PORT_OFFSET))" in srun_command + assert "export DECODE_DIST_INIT_PORT=$((21000 + PORT_OFFSET))" in srun_command + assert '--nodelist="${PREFILL_NODELIST}" --nodes=2 --ntasks=2 --ntasks-per-node=1' in srun_command + assert '--nodelist="${DECODE_NODELIST}" --nodes=2 --ntasks=2 --ntasks-per-node=1' in srun_command + assert ( + '--dist-init-addr "${PREFILL_NODE}:${PREFILL_DIST_INIT_PORT}" --nnodes 2 --node-rank "$SLURM_PROCID"' + in srun_command + ) + assert ( + '--dist-init-addr "${DECODE_NODE}:${DECODE_DIST_INIT_PORT}" --nnodes 2 --node-rank "$SLURM_PROCID"' + in srun_command + ) def test_custom_bash_string_wraps_aggregated_serve_and_benchmark( diff --git a/tests/workloads/vllm/test_command_gen_strategy_slurm.py b/tests/workloads/vllm/test_command_gen_strategy_slurm.py index 6eb62483c..865f52989 100644 --- a/tests/workloads/vllm/test_command_gen_strategy_slurm.py +++ b/tests/workloads/vllm/test_command_gen_strategy_slurm.py @@ -25,11 +25,11 @@ VllmArgs, VllmBenchCmdArgs, VllmCmdArgs, + VllmRayStartArgs, VllmSemanticEvalCmdArgs, VllmSlurmCommandGenStrategy, VllmTestDefinition, ) -from cloudai.workloads.vllm.vllm import VLLM_BENCH_JSON_FILE, VLLM_BENCH_LOG_FILE, VLLM_SERVE_LOG_FILE @pytest.fixture @@ -148,29 +148,6 @@ def test_nixl_threads( class TestVllmBenchCommand: - def test_get_vllm_bench_command(self, vllm_cmd_gen_strategy: VllmSlurmCommandGenStrategy) -> None: - tdef = cast(VllmTestDefinition, vllm_cmd_gen_strategy.test_run.test) - cmd_args = tdef.cmd_args - bench_args = tdef.bench_cmd_args - - command = vllm_cmd_gen_strategy.get_bench_command() - - expected = [ - "vllm", - "bench", - "serve", - f"--model {cmd_args.model}", - f"--base-url http://${{NODE}}:{cmd_args.port}", - f"--random-input-len {bench_args.random_input_len}", - f"--random-output-len {bench_args.random_output_len}", - f"--max-concurrency {bench_args.max_concurrency}", - f"--num-prompts {bench_args.num_prompts}", - f"--result-dir {vllm_cmd_gen_strategy.test_run.output_path.absolute()}", - f"--result-filename {VLLM_BENCH_JSON_FILE}", - "--save-result", - ] - assert command == expected - def test_get_vllm_bench_command_with_extra_args( self, vllm: VllmTestDefinition, vllm_tr: TestRun, slurm_system: SlurmSystem ) -> None: @@ -251,14 +228,6 @@ def test_gen_srun_command_contains_vllm_semantic_eval_in_disagg( class TestVllmAggregatedMode: """Tests for vLLM non-disaggregated mode with 1 GPU.""" - def test_get_vllm_serve_commands_single_gpu(self, vllm_cmd_gen_strategy: VllmSlurmCommandGenStrategy) -> None: - cmd_args = vllm_cmd_gen_strategy.test_run.test.cmd_args - - commands = vllm_cmd_gen_strategy.get_serve_commands() - - assert len(commands) == 1 - assert commands[0] == ["vllm", "serve", cmd_args.model, "--host", cmd_args.host, "--port", str(cmd_args.port)] - def test_get_vllm_serve_commands_convert_boolean_flags( self, vllm: VllmTestDefinition, vllm_tr: TestRun, slurm_system: SlurmSystem ) -> None: @@ -305,52 +274,6 @@ def test_generate_wait_for_health_function(self, vllm_cmd_gen_strategy: VllmSlur assert func == expected - def test_gen_srun_command_full_flow(self, vllm_cmd_gen_strategy: VllmSlurmCommandGenStrategy) -> None: - tdef = vllm_cmd_gen_strategy.test_run.test - cmd_args = tdef.cmd_args - output_path = vllm_cmd_gen_strategy.test_run.output_path.absolute() - srun_prefix = " ".join(vllm_cmd_gen_strategy.gen_srun_prefix()) - serve_cmd = " ".join(vllm_cmd_gen_strategy.get_serve_commands()[0]) - bench_cmd = " ".join(vllm_cmd_gen_strategy.get_bench_command()) - health_func = vllm_cmd_gen_strategy.generate_wait_for_health_function() - - srun_command = vllm_cmd_gen_strategy._gen_srun_command() - - expected = f"""\ -cleanup() {{ - echo "Cleaning up PIDs: SERVE_PID=$SERVE_PID" - kill -TERM "$SERVE_PID" 2>/dev/null - i=0 - while kill -0 "$SERVE_PID" 2>/dev/null; do - [ "$i" -ge 15 ] && echo "PID did not exit in time" && return 1 - sleep 1 - i=$((i+1)) - done -}} -trap cleanup EXIT - -{health_func} - -echo "Starting vLLM instances..." -{srun_prefix} --overlap --ntasks-per-node=1 --ntasks=1 \\ - --output={output_path}/{VLLM_SERVE_LOG_FILE} \\ - {serve_cmd} & -SERVE_PID=$! - -NODE=$(scontrol show hostname $SLURM_JOB_NODELIST | head -n 1) -echo "Waiting for vLLM on $NODE to be ready..." -wait_for_health "http://${{NODE}}:{cmd_args.port}/healthcheck" || exit 1 - -echo "Running benchmark..." -{srun_prefix} --overlap --ntasks-per-node=1 --ntasks=1 \\ - --output={output_path}/{VLLM_BENCH_LOG_FILE} \\ - {bench_cmd} - -cleanup -""" - - assert srun_command == expected - def test_custom_bash_string_wraps_aggregated_serve_and_benchmark( self, vllm_cmd_gen_strategy: VllmSlurmCommandGenStrategy ) -> None: @@ -375,170 +298,52 @@ def test_custom_bash_regex_can_target_only_aggregated_benchmark( assert "echo bench setup; exec vllm bench serve" in srun_command assert "echo bench setup; exec vllm serve" not in srun_command + def test_custom_healthcheck_endpoints( + self, vllm: VllmTestDefinition, vllm_tr: TestRun, slurm_system: SlurmSystem + ) -> None: + vllm.cmd_args.healthcheck = "/ready" + vllm_tr.test = vllm + aggregated = VllmSlurmCommandGenStrategy(slurm_system, vllm_tr)._gen_srun_command() + assert 'wait_for_health "http://${NODE}:8000/ready"' in aggregated + assert 'wait_for_health "http://${NODE}:8000/ready" "http://${NODE}:8000/healthcheck"' not in aggregated + + vllm.cmd_args.prefill = VllmArgs() + vllm.cmd_args.proxy_healthcheck = "/router-ready" + vllm_tr.num_nodes = 2 + disaggregated = VllmSlurmCommandGenStrategy(slurm_system, vllm_tr)._gen_srun_command() + assert 'wait_for_health "http://${PREFILL_NODE}:8100/health"' in disaggregated + assert 'wait_for_health "http://${DECODE_NODE}:8200/health"' in disaggregated + assert 'wait_for_health "http://${PREFILL_NODE}:8100/ready"' not in disaggregated + assert 'wait_for_health "http://${DECODE_NODE}:8200/ready"' not in disaggregated + assert 'wait_for_health "http://${PREFILL_NODE}:8000/router-ready"' in disaggregated + assert ( + 'wait_for_health "http://${PREFILL_NODE}:8000/router-ready" "http://${PREFILL_NODE}:8000/ready"' + not in disaggregated + ) -class TestVllmDisaggregatedMode: - """Tests for vLLM disaggregated mode with multiple GPUs.""" - - def test_prefill_gpu_ids(self, vllm_disagg_tr: TestRun, slurm_system: SlurmSystem) -> None: - """Prefill gets first half of GPUs.""" - strategy = VllmSlurmCommandGenStrategy(slurm_system, vllm_disagg_tr) - assert strategy.prefill_gpu_ids == [0, 1] - - def test_decode_gpu_ids(self, vllm_disagg_tr: TestRun, slurm_system: SlurmSystem) -> None: - """Decode gets second half of GPUs.""" - strategy = VllmSlurmCommandGenStrategy(slurm_system, vllm_disagg_tr) - assert strategy.decode_gpu_ids == [2, 3] - - def test_get_vllm_serve_commands_returns_two(self, vllm_disagg_tr: TestRun, slurm_system: SlurmSystem) -> None: - """Disagg mode returns prefill and decode commands.""" - strategy = VllmSlurmCommandGenStrategy(slurm_system, vllm_disagg_tr) - cmd_args = vllm_disagg_tr.test.cmd_args - - commands = strategy.get_serve_commands() - - assert len(commands) == 2 - prefill_cmd, decode_cmd = commands - - assert prefill_cmd == [ - "vllm", - "serve", - cmd_args.model, - "--host", - cmd_args.host, - "--port", - str(cmd_args.port + 100), - "--kv-transfer-config", - '\'{"kv_connector":"NixlConnector","kv_role":"kv_producer"}\'', - ] - assert decode_cmd == [ - "vllm", - "serve", - cmd_args.model, - "--host", - cmd_args.host, - "--port", - str(cmd_args.port + 200), - "--kv-transfer-config", - '\'{"kv_connector":"NixlConnector","kv_role":"kv_consumer"}\'', - ] - - def test_get_helper_command(self, vllm_disagg_tr: TestRun, slurm_system: SlurmSystem) -> None: - """Helper command routes to prefill and decode ports.""" - strategy = VllmSlurmCommandGenStrategy(slurm_system, vllm_disagg_tr) - cmd_args = vllm_disagg_tr.test.cmd_args - - command = strategy.get_helper_command() - - assert command == [ - "python3", - cmd_args.proxy_script, - "--host", - cmd_args.host, - "--port", - str(cmd_args.port), - "--prefiller-hosts", - "${PREFILL_NODE}", - "--prefiller-ports", - str(cmd_args.port + 100), - "--decoder-hosts", - "${DECODE_NODE}", - "--decoder-ports", - str(cmd_args.port + 200), - ] + vllm.cmd_args.serve_healthcheck = "/serve-ready" + disaggregated = VllmSlurmCommandGenStrategy(slurm_system, vllm_tr)._gen_srun_command() + assert 'wait_for_health "http://${PREFILL_NODE}:8100/serve-ready"' in disaggregated + assert 'wait_for_health "http://${DECODE_NODE}:8200/serve-ready"' in disaggregated + assert 'wait_for_health "http://${PREFILL_NODE}:8000/router-ready"' in disaggregated - def test_gen_srun_command_disagg_flow(self, vllm_disagg_tr: TestRun, slurm_system: SlurmSystem) -> None: - """Disagg mode starts prefill, decode, and helper, waits for health checks.""" - strategy = VllmSlurmCommandGenStrategy(slurm_system, vllm_disagg_tr) - cmd_args = vllm_disagg_tr.test.cmd_args - output_path = vllm_disagg_tr.output_path.absolute() - srun_prefix = " ".join(strategy.gen_srun_prefix()) - prefill_cmd, decode_cmd = strategy.get_serve_commands() - helper_cmd = strategy.get_helper_command() - bench_cmd = " ".join(strategy.get_bench_command()) - health_func = strategy.generate_wait_for_health_function() - prefill_gpus = ",".join(str(g) for g in strategy.prefill_gpu_ids) - decode_gpus = ",".join(str(g) for g in strategy.decode_gpu_ids) - prefill_env = ( - f'env CUDA_VISIBLE_DEVICES="{prefill_gpus}" ' - 'VLLM_NIXL_SIDE_CHANNEL_HOST="${PREFILL_NODE}" ' - 'VLLM_NIXL_SIDE_CHANNEL_PORT="$PREFILL_NIXL_PORT"' - ) - decode_env = ( - f'env CUDA_VISIBLE_DEVICES="{decode_gpus}" ' - 'VLLM_NIXL_SIDE_CHANNEL_HOST="${DECODE_NODE}" ' - 'VLLM_NIXL_SIDE_CHANNEL_PORT="$DECODE_NIXL_PORT"' - ) + def test_disagg_custom_healthcheck_preserves_legacy_proxy_endpoint( + self, vllm: VllmTestDefinition, vllm_tr: TestRun, slurm_system: SlurmSystem + ) -> None: + vllm.cmd_args.healthcheck = "/legacy-ready" + vllm.cmd_args.prefill = VllmArgs() + vllm_tr.test = vllm + vllm_tr.num_nodes = 2 - srun_command = strategy._gen_srun_command() + disaggregated = VllmSlurmCommandGenStrategy(slurm_system, vllm_tr)._gen_srun_command() - expected = f"""\ -cleanup() {{ - echo "Cleaning up PIDs: PREFILL_PID=$PREFILL_PID DECODE_PID=$DECODE_PID HELPER_PID=$HELPER_PID" + assert 'wait_for_health "http://${PREFILL_NODE}:8100/health"' in disaggregated + assert 'wait_for_health "http://${DECODE_NODE}:8200/health"' in disaggregated + assert 'wait_for_health "http://${PREFILL_NODE}:8000/legacy-ready"' in disaggregated - for pid in "$PREFILL_PID" "$DECODE_PID" "$HELPER_PID"; do - [ -n "$pid" ] && kill -TERM "$pid" 2>/dev/null - done - for pid in "$PREFILL_PID" "$DECODE_PID" "$HELPER_PID"; do - [ -z "$pid" ] && continue - i=0 - while kill -0 "$pid" 2>/dev/null; do - [ "$i" -ge 15 ] && echo "PID $pid did not exit in time" && return 1 - sleep 1 - i=$((i+1)) - done - done -}} -trap cleanup EXIT - -{health_func} - -export PORT_OFFSET=$((SLURM_JOB_ID % 1000)) -export PREFILL_NIXL_PORT=$((5557 + PORT_OFFSET)) -export DECODE_NIXL_PORT=$((5557 + PORT_OFFSET + {len(strategy.gpu_ids)})) - -NODES=( $(scontrol show hostname $SLURM_JOB_NODELIST) ) -export PREFILL_NODE=${{NODES[0]}} -export DECODE_NODE=${{NODES[1]:-${{PREFILL_NODE}}}} -if [ -z "$PREFILL_NODE" ]; then - echo "Failed to resolve allocated nodes for disaggregated vLLM" - exit 1 -fi -echo "Node roles: prefill=$PREFILL_NODE decode=$DECODE_NODE" - -echo "Starting vLLM instances..." -{srun_prefix} --overlap --ntasks-per-node=1 --ntasks=1 \\ - --output={output_path}/vllm-prefill.log \\ - {prefill_env} {" ".join(prefill_cmd)} & -PREFILL_PID=$! - -{srun_prefix} --overlap --ntasks-per-node=1 --ntasks=1 \\ - --output={output_path}/vllm-decode.log \\ - {decode_env} {" ".join(decode_cmd)} & -DECODE_PID=$! - -echo "Waiting for vLLM on $PREFILL_NODE and $DECODE_NODE to be ready..." -wait_for_health "http://${{PREFILL_NODE}}:{cmd_args.port + 100}/health" || exit 1 -wait_for_health "http://${{DECODE_NODE}}:{cmd_args.port + 200}/health" || exit 1 - -echo "Starting router..." -{srun_prefix} --overlap --ntasks-per-node=1 --ntasks=1 \\ - --output={output_path}/vllm-router.log \\ - {" ".join(helper_cmd)} & -HELPER_PID=$! - -echo "Waiting for vLLM on $PREFILL_NODE server to be ready..." -wait_for_health "http://${{PREFILL_NODE}}:{cmd_args.port}/healthcheck" || exit 1 - -echo "Running benchmark..." -{srun_prefix} --overlap --ntasks-per-node=1 --ntasks=1 \\ - --output={output_path}/{VLLM_BENCH_LOG_FILE} \\ - {bench_cmd} - -cleanup -""" - - assert srun_command == expected +class TestVllmDisaggregatedMode: + """Tests for vLLM disaggregated mode with multiple GPUs.""" def test_custom_bash_regex_can_target_disaggregated_commands( self, vllm_disagg_tr: TestRun, slurm_system: SlurmSystem @@ -560,34 +365,68 @@ def test_custom_bash_regex_can_target_disaggregated_commands( assert "echo router setup; exec python3" in srun_command assert "echo bench setup; exec vllm bench serve" in srun_command - def test_gen_srun_command_disagg_two_nodes_flow( - self, vllm_disagg_2node_tr: TestRun, slurm_system: SlurmSystem + def test_disagg_more_than_two_nodes_is_rejected(self, vllm_disagg_tr: TestRun, slurm_system: SlurmSystem) -> None: + vllm_disagg_tr.num_nodes = 3 + strategy = VllmSlurmCommandGenStrategy(slurm_system, vllm_disagg_tr) + + with pytest.raises(ValueError, match=r"requires both prefill\.num_nodes and decode\.num_nodes"): + _ = strategy._gen_srun_command() + + def test_gen_srun_command_disagg_four_nodes_uses_role_ray_clusters( + self, vllm_disagg_tr: TestRun, slurm_system: SlurmSystem ) -> None: - strategy = VllmSlurmCommandGenStrategy(slurm_system, vllm_disagg_2node_tr) + tdef = cast(VllmTestDefinition, vllm_disagg_tr.test) + assert tdef.cmd_args.prefill is not None + tdef.cmd_args.prefill.num_nodes = 2 + tdef.cmd_args.decode.num_nodes = 2 + vllm_disagg_tr.num_nodes = 4 + strategy = VllmSlurmCommandGenStrategy(slurm_system, vllm_disagg_tr) srun_command = strategy._gen_srun_command() - assert "PREFILL_NODE=${NODES[0]}" in srun_command - assert "DECODE_NODE=${NODES[1]:-${PREFILL_NODE}}" in srun_command - assert srun_command.count("--relative=0 -N1") == 3 - assert srun_command.count("--relative=1 -N1") == 1 - assert ( - 'env CUDA_VISIBLE_DEVICES="0,1,2,3" VLLM_NIXL_SIDE_CHANNEL_HOST="${PREFILL_NODE}" ' - 'VLLM_NIXL_SIDE_CHANNEL_PORT="$PREFILL_NIXL_PORT"' - ) in srun_command - assert ( - 'env CUDA_VISIBLE_DEVICES="0,1,2,3" VLLM_NIXL_SIDE_CHANNEL_HOST="${DECODE_NODE}" ' - 'VLLM_NIXL_SIDE_CHANNEL_PORT="$DECODE_NIXL_PORT"' - ) in srun_command - assert 'wait_for_health "http://${PREFILL_NODE}:8100/health"' in srun_command - assert 'wait_for_health "http://${DECODE_NODE}:8200/health"' in srun_command - assert "--prefiller-hosts ${PREFILL_NODE}" in srun_command - assert "--decoder-hosts ${DECODE_NODE}" in srun_command - assert "--base-url http://${PREFILL_NODE}:8000" in srun_command - - def test_disagg_more_than_two_nodes_is_rejected(self, vllm_disagg_tr: TestRun, slurm_system: SlurmSystem) -> None: - vllm_disagg_tr.num_nodes = 3 + assert "--distributed-executor-backend ray" in srun_command + assert "export PREFILL_RAY_PORT=$((6379 + PORT_OFFSET))" in srun_command + assert "export DECODE_RAY_PORT=$((7379 + PORT_OFFSET))" in srun_command + assert 'PREFILL_NODES=( "${NODES[@]:0:2}" )' in srun_command + assert 'DECODE_NODES=( "${NODES[@]:2:2}" )' in srun_command + assert "PREFILL_RAY_PID=$!" in srun_command + assert "DECODE_RAY_PID=$!" in srun_command + assert srun_command.count('sum(node["Alive"] for node in ray.nodes())') == 2 + assert "ray.init(address=" not in srun_command + assert 'env RAY_ADDRESS="${PREFILL_NODE}:${PREFILL_RAY_PORT}"' in srun_command + assert 'env RAY_ADDRESS="${DECODE_NODE}:${DECODE_RAY_PORT}"' in srun_command + + def test_ray_head_and_worker_topology_args_can_be_overridden( + self, vllm_disagg_tr: TestRun, slurm_system: SlurmSystem + ) -> None: + tdef = cast(VllmTestDefinition, vllm_disagg_tr.test) + assert tdef.cmd_args.prefill is not None + tdef.cmd_args.prefill.num_nodes = 2 + tdef.cmd_args.decode.num_nodes = 2 + tdef.cmd_args.prefill.ray_head = VllmRayStartArgs.model_validate( + { + "head": False, + "port": 9123, + "num_gpus": 4, + "dashboard_host": "0.0.0.0", + "temp_dir": "/tmp/ray with spaces", + } + ) + tdef.cmd_args.prefill.ray_worker = VllmRayStartArgs.model_validate( + { + "address": "custom-prefill-head:9123", + "block": False, + "num_gpus": 4, + } + ) + vllm_disagg_tr.num_nodes = 4 strategy = VllmSlurmCommandGenStrategy(slurm_system, vllm_disagg_tr) - with pytest.raises(ValueError, match="supports only 1 or 2 nodes"): - _ = strategy._gen_srun_command() + srun_command = strategy._gen_srun_command() + + assert ( + "ray start --port=9123 --num-gpus=4 --dashboard-host=0.0.0.0 --temp-dir='\"'\"'/tmp/ray with spaces'\"'\"'" + ) in srun_command + assert "ray start --head --port=9123" not in srun_command + assert "ray start --address=custom-prefill-head:9123 --num-gpus=4" in srun_command + assert "ray start --address=custom-prefill-head:9123 --block" not in srun_command diff --git a/tests/workloads/vllm/test_workload.py b/tests/workloads/vllm/test_workload.py index cf6265d54..6d123e56b 100644 --- a/tests/workloads/vllm/test_workload.py +++ b/tests/workloads/vllm/test_workload.py @@ -14,6 +14,8 @@ # See the License for the specific language governing permissions and # limitations under the License. +import pytest + from cloudai.core import GitRepo, TestRun from cloudai.systems.slurm import SlurmSystem from cloudai.workloads.vllm import VllmArgs, VllmCmdArgs, VllmTestDefinition @@ -118,3 +120,102 @@ def test_constraint_check_uses_all_node_gpus_per_role_for_two_node_disagg(tmp_pa slurm_system.gpus_per_node = 4 assert tdef.constraint_check(tr, slurm_system) is True + + +def test_constraint_check_uses_all_allocated_gpus_for_multinode_aggregated(tmp_path, slurm_system: SlurmSystem) -> None: + tdef = VllmTestDefinition( + name="test", + description="test", + test_template_name="vllm", + cmd_args=VllmCmdArgs( + docker_image_url="test_url", + decode=VllmArgs.model_validate({"tensor_parallel_size": 8}), + ), + ) + tr = TestRun(name="vllm", test=tdef, num_nodes=2, nodes=[], output_path=tmp_path) + slurm_system.gpus_per_node = 4 + + assert tdef.constraint_check(tr, slurm_system) is True + + +def test_constraint_check_rejects_explicit_disagg_role_nodes_that_do_not_match_two_node_allocation( + tmp_path, slurm_system: SlurmSystem +) -> None: + tdef = VllmTestDefinition( + name="test", + description="test", + test_template_name="vllm", + cmd_args=VllmCmdArgs( + docker_image_url="test_url", + prefill=VllmArgs.model_validate({"num_nodes": 2}), + decode=VllmArgs.model_validate({"num_nodes": 1}), + ), + ) + tr = TestRun(name="vllm", test=tdef, num_nodes=2, nodes=[], output_path=tmp_path) + slurm_system.gpus_per_node = 4 + + assert tdef.constraint_check(tr, slurm_system) is False + + +def test_constraint_check_allows_explicit_single_node_disagg_role_nodes(tmp_path, slurm_system: SlurmSystem) -> None: + tdef = VllmTestDefinition( + name="test", + description="test", + test_template_name="vllm", + cmd_args=VllmCmdArgs( + docker_image_url="test_url", + prefill=VllmArgs.model_validate({"num_nodes": 1, "gpu_ids": "0,1", "tensor_parallel_size": 2}), + decode=VllmArgs.model_validate({"num_nodes": 1, "gpu_ids": "2,3", "tensor_parallel_size": 2}), + ), + ) + tr = TestRun(name="vllm", test=tdef, num_nodes=1, nodes=[], output_path=tmp_path) + slurm_system.gpus_per_node = 4 + + assert tdef.constraint_check(tr, slurm_system) is True + + +def test_constraint_check_uses_role_nodes_for_multinode_disagg(tmp_path, slurm_system: SlurmSystem) -> None: + tdef = VllmTestDefinition( + name="test", + description="test", + test_template_name="vllm", + cmd_args=VllmCmdArgs( + docker_image_url="test_url", + prefill=VllmArgs.model_validate({"num_nodes": 2, "tensor_parallel_size": 8}), + decode=VllmArgs.model_validate({"num_nodes": 2, "tensor_parallel_size": 8}), + ), + ) + tr = TestRun(name="vllm", test=tdef, num_nodes=4, nodes=[], output_path=tmp_path) + slurm_system.gpus_per_node = 4 + + assert tdef.constraint_check(tr, slurm_system) is True + + +@pytest.mark.parametrize( + ("prefill_nodes", "decode_nodes"), + [ + (None, 2), + (2, None), + (1, 1), + ], +) +def test_constraint_check_rejects_invalid_multinode_disagg_role_nodes( + prefill_nodes: int | None, + decode_nodes: int | None, + tmp_path, + slurm_system: SlurmSystem, +) -> None: + tdef = VllmTestDefinition( + name="test", + description="test", + test_template_name="vllm", + cmd_args=VllmCmdArgs( + docker_image_url="test_url", + prefill=VllmArgs.model_validate({"num_nodes": prefill_nodes}), + decode=VllmArgs.model_validate({"num_nodes": decode_nodes}), + ), + ) + tr = TestRun(name="vllm", test=tdef, num_nodes=4, nodes=[], output_path=tmp_path) + slurm_system.gpus_per_node = 4 + + assert tdef.constraint_check(tr, slurm_system) is False