diff --git a/conf/experimental/ai_dynamo/test/sglang.toml b/conf/experimental/ai_dynamo/test/sglang.toml index 67fc999f0..34bc9cbff 100644 --- a/conf/experimental/ai_dynamo/test/sglang.toml +++ b/conf/experimental/ai_dynamo/test/sglang.toml @@ -18,6 +18,7 @@ name = "sglang" description = "sglang backend" test_template_name = "AIDynamo" extra_container_mounts = ["/run/udev:/run/udev"] +dse_excluded_args = ["cmd_args.aiperf_phases"] [cmd_args] docker_image_url = "nvcr.io/nvidia/ai-dynamo/sglang-runtime:1.1.1" @@ -88,6 +89,20 @@ workloads = "aiperf.sh" request-count = 50 synthetic-input-tokens-mean = 300 + [[cmd_args.aiperf_phases]] + name = "round_1" + + [cmd_args.aiperf_phases.args] + concurrency = 2 + request-count = 50 + + [[cmd_args.aiperf_phases]] + name = "round_2" + + [cmd_args.aiperf_phases.args] + concurrency = 4 + request-count = 50 + [cmd_args.aiperf_accuracy] entrypoint = "aiperf profile" setup-cmd = "python -m pip install --break-system-packages --ignore-installed blinker==1.9.0 && python -m pip install --break-system-packages --upgrade aiperf==0.8.0" diff --git a/conf/experimental/ai_dynamo/test/vllm.toml b/conf/experimental/ai_dynamo/test/vllm.toml index 85f7d353f..ea2a4552c 100644 --- a/conf/experimental/ai_dynamo/test/vllm.toml +++ b/conf/experimental/ai_dynamo/test/vllm.toml @@ -18,6 +18,7 @@ name = "vLLM" description = "vLLM backend" test_template_name = "AIDynamo" extra_container_mounts = ["/run/udev:/run/udev"] +dse_excluded_args = ["cmd_args.aiperf_phases"] [cmd_args] docker_image_url = "nvcr.io/nvidia/ai-dynamo/vllm-runtime:1.1.1" @@ -73,13 +74,33 @@ workloads = "aiperf.sh" concurrency = 2 [cmd_args.aiperf] + health-check-between-phases = true + continue-on-phase-failure = false [cmd_args.aiperf.args] concurrency = 2 + endpoint-type = "chat" extra-inputs = '{"min_tokens":10}' output-tokens-mean = 500 request-count = 50 + server-metrics = "auto" + streaming = true synthetic-input-tokens-mean = 300 + [[cmd_args.aiperf_phases]] + name = "round_1" + + [cmd_args.aiperf_phases.args] + concurrency = 2 + request-count = 50 + + [[cmd_args.aiperf_phases]] + name = "round_2" + + [cmd_args.aiperf_phases.args] + concurrency = 4 + request-count = 50 + streaming = false + [cmd_args.aiperf_accuracy] entrypoint = "aiperf profile" setup-cmd = "python -m pip install --break-system-packages --upgrade aiperf==0.8.0" diff --git a/conf/experimental/ai_dynamo/test_scenario/vllm_lmcache.toml b/conf/experimental/ai_dynamo/test_scenario/vllm_lmcache.toml index 564311240..c63319b4e 100644 --- a/conf/experimental/ai_dynamo/test_scenario/vllm_lmcache.toml +++ b/conf/experimental/ai_dynamo/test_scenario/vllm_lmcache.toml @@ -24,7 +24,10 @@ description = "Self-contained AIDynamo scenario wiring vLLM disaggregated infere test_template_name = "AIDynamo" time_limit = "00:10:00" extra_container_mounts = ["/run/udev:/run/udev"] -dse_excluded_args = ["cmd_args.lmcache.lmcache_worker_ports"] +dse_excluded_args = [ + "cmd_args.lmcache.lmcache_worker_ports", + "cmd_args.aiperf_phases", +] [Tests.cmd_args] docker_image_url = "nvcr.io/nvidia/ai-dynamo/vllm-runtime:1.1.1" @@ -90,6 +93,20 @@ dse_excluded_args = ["cmd_args.lmcache.lmcache_worker_ports"] request-count = 50 synthetic-input-tokens-mean = 300 + [[Tests.cmd_args.aiperf_phases]] + name = "round_1" + + [Tests.cmd_args.aiperf_phases.args] + concurrency = 2 + request-count = 50 + + [[Tests.cmd_args.aiperf_phases]] + name = "round_2" + + [Tests.cmd_args.aiperf_phases.args] + concurrency = 4 + request-count = 50 + [Tests.cmd_args.aiperf_accuracy] entrypoint = "aiperf profile" setup-cmd = "python -m pip install --break-system-packages --upgrade aiperf==0.8.0" diff --git a/conf/experimental/ai_dynamo/test_scenario/vllm_slurm.toml b/conf/experimental/ai_dynamo/test_scenario/vllm_slurm.toml index 45031da3a..decfead3d 100644 --- a/conf/experimental/ai_dynamo/test_scenario/vllm_slurm.toml +++ b/conf/experimental/ai_dynamo/test_scenario/vllm_slurm.toml @@ -36,12 +36,29 @@ time_limit = "00:10:00" tensor-parallel-size = 4 pipeline-parallel-size = 1 + [[Tests.cmd_args.aiperf_phases]] + name = "round_1" + [Tests.cmd_args.aiperf_phases.args] + concurrency = 2 + request-count = 50 + server-metrics = "auto" + + [[Tests.cmd_args.aiperf_phases]] + name = "round_2" + [Tests.cmd_args.aiperf_phases.args] + concurrency = 4 + request-count = 50 + [[Tests]] id = "test.disagg.multinode" test_name = "vLLM" time_limit = "00:10:00" [Tests.cmd_args] + [Tests.cmd_args.dynamo.dcgm_exporter] + enabled = true + docker-image-url = "nvcr.io/nvidia/k8s/dcgm-exporter:4.5.2-4.8.1-distroless" + [Tests.cmd_args.dynamo.prefill_worker] num-nodes = 2 [Tests.cmd_args.dynamo.prefill_worker.args] @@ -53,3 +70,17 @@ time_limit = "00:10:00" [Tests.cmd_args.dynamo.decode_worker.args] tensor-parallel-size = 4 pipeline-parallel-size = 1 + + [[Tests.cmd_args.aiperf_phases]] + name = "round_1" + [Tests.cmd_args.aiperf_phases.args] + concurrency = 4 + request-count = 50 + server-metrics = "auto" + + [[Tests.cmd_args.aiperf_phases]] + name = "round_2" + [Tests.cmd_args.aiperf_phases.args] + concurrency = 8 + request-count = 50 + server-metrics = "auto" diff --git a/src/cloudai/workloads/ai_dynamo/__init__.py b/src/cloudai/workloads/ai_dynamo/__init__.py index 5e430068d..fc7e2b376 100644 --- a/src/cloudai/workloads/ai_dynamo/__init__.py +++ b/src/cloudai/workloads/ai_dynamo/__init__.py @@ -22,6 +22,8 @@ AIDynamoTestDefinition, AIPerf, AIPerfAccuracy, + AIPerfPhase, + DCGMExporter, GenAIPerf, LMCacheController, WorkerBaseArgs, @@ -42,6 +44,8 @@ "AIDynamoTestDefinition", "AIPerf", "AIPerfAccuracy", + "AIPerfPhase", + "DCGMExporter", "GenAIPerf", "LMCacheController", "WorkerBaseArgs", diff --git a/src/cloudai/workloads/ai_dynamo/ai_dynamo.py b/src/cloudai/workloads/ai_dynamo/ai_dynamo.py index 7f8da4165..b85b35d9a 100644 --- a/src/cloudai/workloads/ai_dynamo/ai_dynamo.py +++ b/src/cloudai/workloads/ai_dynamo/ai_dynamo.py @@ -140,6 +140,20 @@ class WorkerConfig(BaseModel): ) +class DCGMExporter(BaseModel): + """Optional DCGM exporter launch configuration.""" + + model_config = ConfigDict(extra="forbid", populate_by_name=True) + + enabled: bool = False + docker_image_url: str = Field( + default="nvcr.io/nvidia/k8s/dcgm-exporter:4.5.2-4.8.1-distroless", + serialization_alias="docker-image-url", + validation_alias=AliasChoices("docker-image-url", "docker_image_url", "image-url", "image_url"), + ) + port: int = 9401 + + class AIDynamoArgs(BaseModel): """Arguments for AI Dynamo setup.""" @@ -205,6 +219,7 @@ def validate_connector(cls, v: str | list[str] | None) -> str | list[str] | None serialization_alias="nats-port", validation_alias=AliasChoices("nats-port", "nats_port"), ) + dcgm_exporter: DCGMExporter = Field(default_factory=DCGMExporter) decode_worker: WorkerConfig = WorkerConfig( cmd="python3 -m dynamo.vllm", @@ -264,11 +279,62 @@ class AIPerf(Workload): serialization_alias="report-name", validation_alias=AliasChoices("report-name", "report_name"), ) + artifact_dir_name: str = Field( + default=AIPERF_ARTIFACTS_DIR, + serialization_alias="artifact-dir-name", + validation_alias=AliasChoices("artifact-dir-name", "artifact_dir_name"), + ) + health_check_between_phases: bool = Field( + default=True, + serialization_alias="health-check-between-phases", + validation_alias=AliasChoices("health-check-between-phases", "health_check_between_phases"), + ) + continue_on_phase_failure: bool = Field( + default=False, + serialization_alias="continue-on-phase-failure", + validation_alias=AliasChoices("continue-on-phase-failure", "continue_on_phase_failure"), + ) @property def installables(self) -> list[Installable]: return [self.script] + @model_validator(mode="after") + def validate_extra_args(self) -> "AIPerf": + if isinstance(self.extra_args, list): + raise ValueError("AIPerf extra_args must be a string with explicit CLI syntax") + return self + + +class AIPerfPhase(BaseModel): + """Named AIPerf phase that overrides the base AIPerf configuration.""" + + model_config = ConfigDict(extra="allow", populate_by_name=True) + + name: str = Field(..., min_length=1, pattern=r"^[A-Za-z0-9_.-]+$") + cmd: str | None = None + setup_cmd: str | None = Field( + default=None, + serialization_alias="setup-cmd", + validation_alias=AliasChoices("setup-cmd", "setup_cmd"), + ) + report_name: str | None = Field( + default=None, + serialization_alias="report-name", + validation_alias=AliasChoices("report-name", "report_name"), + ) + artifact_dir_name: str | None = Field( + default=None, + serialization_alias="artifact-dir-name", + validation_alias=AliasChoices("artifact-dir-name", "artifact_dir_name"), + ) + args: Args = Field(default_factory=Args) + extra_args: str | None = Field( + default=None, + serialization_alias="extra-args", + validation_alias=AliasChoices("extra-args", "extra_args"), + ) + class AIPerfAccuracy(BaseModel): """Optional accuracy benchmark configuration.""" @@ -324,6 +390,7 @@ class AIDynamoCmdArgs(CmdArgs): lmcache_controller: LMCacheController | None = None genai_perf: GenAIPerf = Field(default_factory=GenAIPerf) aiperf: AIPerf = Field(default_factory=AIPerf) + aiperf_phases: list[AIPerfPhase] | None = None aiperf_accuracy: AIPerfAccuracy | None = None workloads: str = "genai_perf.sh" @@ -341,6 +408,23 @@ def validate_workloads(cls, v: str) -> str: def workloads_list(self) -> list[str]: return [w.strip() for w in self.workloads.split(",")] + @model_validator(mode="after") + def validate_aiperf_phases(self) -> "AIDynamoCmdArgs": + """Validate AIPerf phases.""" + if not self.aiperf_phases: + return self + + seen = set() + duplicates = set() + for phase in self.aiperf_phases: + if phase.name in seen: + duplicates.add(phase.name) + seen.add(phase.name) + if duplicates: + raise ValueError(f"AIPerf phase names must be unique. Duplicates: {sorted(duplicates)}") + + return self + @property def installables(self) -> list[Installable]: return [ @@ -356,6 +440,7 @@ class AIDynamoTestDefinition(TestDefinition): model_config = ConfigDict(extra="forbid") cmd_args: AIDynamoCmdArgs _docker_image: Optional[DockerImage] = None + _dcgm_exporter_image: Optional[DockerImage] = None script: File = File(Path(__file__).parent.parent / "ai_dynamo/ai_dynamo.sh") repo: GitRepo = GitRepo( url="https://github.com/ai-dynamo/dynamo.git", commit="f7e468c7e8ff0d1426db987564e60572167e8464" @@ -389,6 +474,16 @@ def docker_image(self) -> DockerImage: self._docker_image = DockerImage(url=self.cmd_args.docker_image_url) return self._docker_image + @property + def dcgm_exporter_image(self) -> DockerImage | None: + if not self.cmd_args.dynamo.dcgm_exporter.enabled: + return None + + image_url = self.cmd_args.dynamo.dcgm_exporter.docker_image_url + if not self._dcgm_exporter_image or self._dcgm_exporter_image.url != image_url: + self._dcgm_exporter_image = DockerImage(url=image_url) + return self._dcgm_exporter_image + @property def hf_model(self) -> HFModel: if not self._hf_model: @@ -399,13 +494,16 @@ def hf_model(self) -> HFModel: @property def installables(self) -> list[Installable]: """Get all installables for this test definition.""" - return [ + installables = [ self.docker_image, self.repo, self.script, self.hf_model, *self.cmd_args.installables, ] + if self.dcgm_exporter_image: + installables.append(self.dcgm_exporter_image) + return installables def _has_aiperf_accuracy_results(self, output_path: Path) -> bool: accuracy = parse_aiperf_accuracy(output_path) diff --git a/src/cloudai/workloads/ai_dynamo/ai_dynamo.sh b/src/cloudai/workloads/ai_dynamo/ai_dynamo.sh index 52e975850..5697a78ea 100644 --- a/src/cloudai/workloads/ai_dynamo/ai_dynamo.sh +++ b/src/cloudai/workloads/ai_dynamo/ai_dynamo.sh @@ -60,6 +60,8 @@ dynamo_args["worker-error-pattern"]="zmq.error.ZMQError:.Address.already.in.use| dynamo_args["sgl-http-port"]=9001 dynamo_args["prefill-port"]=30011 dynamo_args["decode-port"]=30021 +dynamo_args["dcgm-exporter-enabled"]="False" +dynamo_args["dcgm-exporter-port"]=9401 function log() { @@ -892,6 +894,39 @@ _query_frontend() { curl -s -X POST "${dynamo_args["url"]}/v1/chat/completions" -H "Content-Type: application/json" -d @$RESULTS_DIR/curl_cmd.json } +_resolve_aiperf_server_metrics_urls() { + local urls="http://${dynamo_args["frontend-node"]}:${dynamo_args["port"]}/metrics" + local base_system_port=${DYN_SYSTEM_PORT:-9090} + local decode_workers_per_node=${decode_config["workers-per-node"]:-1} + local prefill_workers_per_node=${prefill_config["workers-per-node"]:-1} + local IFS_SAVE="$IFS" + local node i + + IFS=',' + for node in ${prefill_config["node-list"]:-}; do + for i in $(seq 0 $(( prefill_workers_per_node - 1 ))); do + urls="${urls},http://${node}:$((base_system_port + i))/metrics" + done + done + + for node in ${decode_config["node-list"]:-}; do + for i in $(seq 0 $(( decode_workers_per_node - 1 ))); do + urls="${urls},http://${node}:$((base_system_port + i))/metrics" + done + done + + if [[ "${dynamo_args["dcgm-exporter-enabled"]}" == "True" || "${dynamo_args["dcgm-exporter-enabled"]}" == "true" ]]; then + local dcgm_nodes="${decode_config["node-list"]:-},${prefill_config["node-list"]:-}" + for node in $dcgm_nodes; do + [[ -z "$node" ]] && continue + urls="${urls},http://${node}:${dynamo_args["dcgm-exporter-port"]}/metrics" + done + fi + IFS="$IFS_SAVE" + + echo "$urls" +} + function setup_cufile() { export CUFILE_ENV_PATH_JSON="$RESULTS_DIR/cufile.json" @@ -1058,6 +1093,11 @@ function launch_workload() local workload_name="${workload_config_ref["--name"]}" local script="${workload_config_ref["--script"]}" + export FRONTEND_URL="${dynamo_args["url"]}" + export AIPERF_MODEL="${dynamo_args["model"]}" + export AIPERF_ENDPOINT="${dynamo_args["endpoint"]}" + export AIPERF_FAILURE_MARKER="${FATAL_ERROR_MARKER}" + export AIPERF_SERVER_METRICS_URLS="$(_resolve_aiperf_server_metrics_urls)" # Build config and workload args as proper bash arrays to preserve # multi-word values (e.g. --cmd "genai-perf profile") through word splitting. diff --git a/src/cloudai/workloads/ai_dynamo/aiperf.sh b/src/cloudai/workloads/ai_dynamo/aiperf.sh index 15cee3a58..22eb0541c 100644 --- a/src/cloudai/workloads/ai_dynamo/aiperf.sh +++ b/src/cloudai/workloads/ai_dynamo/aiperf.sh @@ -2,182 +2,9 @@ # SPDX-FileCopyrightText: NVIDIA CORPORATION & AFFILIATES # Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. # SPDX-License-Identifier: Apache-2.0 -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# -# aiperf.sh — aiperf profile wrapper for ai_dynamo workloads. -# -# Called from ai_dynamo.sh's launch_workload() with: -# bash aiperf.sh --result-dir --model --url --port -# [--cmd ] [--report-name ] [--artifact-dir-name ] [--extra-args ] -# -- ... -# -# Context flags (before --) that are recognised and used: -# --result-dir Directory where artifacts and the final report are written. -# --model HuggingFace model identifier (e.g. Qwen/Qwen3-0.6B). -# --url Base URL of the dynamo.frontend (e.g. http://node01). -# --port HTTP port the dynamo.frontend is listening on. -# --report-name Output CSV name (default: aiperf_report.csv). -# --artifact-dir-name Artifact directory name under --result-dir (default: aiperf_artifacts). -# --cmd Full launch command including subcommand (default: "aiperf profile"). -# --setup-cmd Optional shell command run before launching aiperf. -# --extra-args Raw string appended verbatim after all other flags. -# -# All unrecognised flags (--install-dir, --gpus-per-node, etc.) are silently -# consumed so this script is forward-compatible with launch_workload additions. -# -# Everything after -- is passed directly to the aiperf profile invocation. set -Eeuo pipefail -result_dir="" -model="" -url="http://localhost" -port=8000 -report_name="aiperf_report.csv" -artifact_dir_name="aiperf_artifacts" -cmd="aiperf profile" -setup_cmd="" -declare -a extra_args=() -declare -a profile_args=() - -log() { - echo "[$(date '+%F %T') $(hostname)]: $*" -} - -_parse_aiperf_args() { - while [[ $# -ge 2 ]]; do - case "$1" in - --*) profile_args+=("$1" "$2"); shift 2 ;; - *) shift ;; - esac - done - # Capture a trailing lone boolean flag if present. - # Use if/fi — not [[ ]] && — so set -e does not trigger on a false condition. - if [[ $# -eq 1 && "$1" == --* ]]; then - profile_args+=("$1") - fi -} - -process_args() { - while [[ $# -gt 0 ]]; do - case "$1" in - --result-dir) result_dir="$2"; shift 2 ;; - --model) model="$2"; shift 2 ;; - --url) url="$2"; shift 2 ;; - --port) port="$2"; shift 2 ;; - --report-name) report_name="$2"; shift 2 ;; - --artifact-dir-name) artifact_dir_name="$2"; shift 2 ;; - --cmd) cmd="$2"; shift 2 ;; - --setup-cmd) setup_cmd="$2"; shift 2 ;; - --extra-args) read -ra extra_args <<< "$2"; shift 2 ;; - --) shift; _parse_aiperf_args "$@"; break ;; - --*) if [[ -n "${2:-}" && "${2}" != -* ]]; then shift 2; else shift 1; fi ;; # consume unknown flag; shift 2 only if next arg is a value - *) shift ;; - esac - done - - log "Parsed args: - result_dir: $result_dir - model: $model - url: $url - port: $port - report_name: $report_name - artifact_dir: $artifact_dir_name - cmd: $cmd - setup_cmd: ${setup_cmd:-} - extra_args: ${extra_args[*]:-} - profile_args: ${profile_args[*]:-}" -} - -run_setup_cmd() { - if [[ -z "$setup_cmd" ]]; then - return - fi - - log "Running AIPerf setup command: $setup_cmd" - bash -lc "$setup_cmd" - log "AIPerf setup command complete" -} - -process_results() { - local artifact_dir="$result_dir/$artifact_dir_name" - local csv_path="" - - if [[ -f "$artifact_dir/profile_export_aiperf.csv" ]]; then - csv_path="$artifact_dir/profile_export_aiperf.csv" - else - csv_path=$(find "$artifact_dir" -name "*aiperf*.csv" -print -quit 2>/dev/null || true) - fi - - if [[ -n "$csv_path" ]]; then - cp "$csv_path" "$result_dir/$report_name" - log "aiperf report saved to $result_dir/$report_name" - else - log "ERROR: no CSV found in $artifact_dir — aiperf may not have completed" - exit 1 - fi - -} - -run_aiperf() { - local full_url="$1" - local artifact_dir="$2" - local -a run_cmd=() - read -ra run_cmd <<< "$cmd" - local -a launch_cmd=( - "${run_cmd[@]}" - --model "$model" - --url "$full_url" - --endpoint-type chat - --streaming - --artifact-dir "$artifact_dir" - --no-server-metrics - ) - - log "Launching aiperf: ${run_cmd[*]} --model $model --url $full_url" - - if [[ "${#profile_args[@]}" -gt 0 ]]; then - launch_cmd+=("${profile_args[@]}") - fi - if [[ "${#extra_args[@]}" -gt 0 ]]; then - launch_cmd+=("${extra_args[@]}") - fi - - "${launch_cmd[@]}" - - log "aiperf run complete" -} - -main() { - process_args "$@" - - if [[ -z "$result_dir" ]]; then - log "ERROR: --result-dir is required"; exit 1 - fi - if [[ -z "$model" ]]; then - log "ERROR: --model is required"; exit 1 - fi - - run_setup_cmd - - local full_url="${url}:${port}" - local artifact_dir="$result_dir/$artifact_dir_name" - rm -rf "$artifact_dir" - - run_aiperf "$full_url" "$artifact_dir" - process_results -} - -main "$@" -exit 0 +echo "AIPerf scripts are generated per test run by the AIDynamo Slurm command generator." >&2 +echo "This installable placeholder should be overridden by --aiperf-script /cloudai_run_results/aiperf.sh." >&2 +exit 1 diff --git a/src/cloudai/workloads/ai_dynamo/slurm_command_gen_strategy.py b/src/cloudai/workloads/ai_dynamo/slurm_command_gen_strategy.py index 861a4c469..cc3b51273 100644 --- a/src/cloudai/workloads/ai_dynamo/slurm_command_gen_strategy.py +++ b/src/cloudai/workloads/ai_dynamo/slurm_command_gen_strategy.py @@ -16,16 +16,26 @@ import logging import shlex +import textwrap from pathlib import Path -from typing import List, cast +from typing import Any, List, cast import yaml from pydantic import BaseModel, TypeAdapter, ValidationError +import cloudai.util from cloudai.core import File, GitRepo from cloudai.systems.slurm import SlurmCommandGenStrategy -from .ai_dynamo import LMCACHE_CONFIG_BACKUP_FILE_NAME, LMCACHE_CONFIG_FILE_NAME, AIDynamoTestDefinition +from .ai_dynamo import ( + LMCACHE_CONFIG_BACKUP_FILE_NAME, + LMCACHE_CONFIG_FILE_NAME, + AIDynamoTestDefinition, + AIPerf, + AIPerfPhase, +) + +AIPERF_SCRIPT_FILE_NAME = "aiperf.sh" class AIDynamoSlurmCommandGenStrategy(SlurmCommandGenStrategy): @@ -60,6 +70,19 @@ def image_path(self) -> str | None: return str(self.td.docker_image.installed_path) return None + def _gen_dcgm_srun_prefix(self, image_path: str) -> list[str]: + srun_parts = ["srun", "--export=ALL", f"--mpi={self.mpi}", f"--container-image={image_path}"] + mounts = self.container_mounts() + if mounts: + srun_parts.append(f"--container-mounts={','.join(mounts)}") + if not self.system.container_mount_home: + srun_parts.append("--no-container-mount-home") + if self.system.extra_srun_args: + srun_parts.append(self.system.extra_srun_args) + if self.test_run.extra_srun_args: + srun_parts.append(self.test_run.extra_srun_args) + return srun_parts + def _get_toml_args(self, base_model: BaseModel, prefix: str, exclude: List[str] | None = None) -> List[str]: args = [] exclude = exclude or [] @@ -109,8 +132,211 @@ def _prepare_lmcache_config(self): (self.test_run.output_path / LMCACHE_CONFIG_FILE_NAME).write_text(config) (self.test_run.output_path / LMCACHE_CONFIG_BACKUP_FILE_NAME).write_text(config) + def _render_aiperf_args(self, args: dict[str, Any]) -> str: + parts: list[str] = [] + for key, value in args.items(): + if value is None or value is False: + continue + if isinstance(value, list | dict): + raise ValueError( + f"AIPerf argument {key!r} must be a scalar value. " + "Use a string with AIPerf CLI syntax for multi-value arguments." + ) + + parts.append(f"--{key}") + if value is True: + continue + + parts.append(shlex.quote(str(value))) + return " ".join(parts) + + def _runtime_result_path(self, path: str) -> str: + if Path(path).is_absolute(): + return path + return f"{self.CONTAINER_MOUNT_OUTPUT}/{path}" + + def _aiperf_phase_args(self, resolved_phase: AIPerf, artifact_dir: str) -> dict[str, Any]: + args: dict[str, Any] = { + "model": self.td.cmd_args.dynamo.model, + "endpoint-type": "chat", + "streaming": True, + } + args.update(resolved_phase.args.model_dump(by_alias=True, exclude_none=True)) + args["artifact-dir"] = artifact_dir + + if "server-metrics" not in args and "no-server-metrics" not in args: + args["no-server-metrics"] = True + + return args + + def _render_aiperf_phase_args(self, resolved_phase: AIPerf, artifact_dir: str) -> str: + args = self._aiperf_phase_args(resolved_phase, artifact_dir) + url = args.pop("url", None) + server_metrics_auto = args.get("server-metrics") == "auto" + if server_metrics_auto: + args.pop("server-metrics") + + parts = [] + for key in ("model", "endpoint-type", "streaming"): + if key in args: + parts.append(self._render_aiperf_args({key: args.pop(key)})) + if url is None: + parts.append('--url "$FRONTEND_URL"') + else: + parts.append(self._render_aiperf_args({"url": url})) + parts.append(self._render_aiperf_args(args)) + if server_metrics_auto: + parts.append('--server-metrics "$AIPERF_SERVER_METRICS_URLS"') + return " ".join(part for part in parts if part) + + def _resolve_aiperf_phase(self, phase: AIPerfPhase) -> AIPerf: + base_data = self.td.cmd_args.aiperf.model_dump(by_alias=True, exclude_none=True) + phase_data = phase.model_dump(by_alias=True, exclude_none=True, exclude_unset=True) + single_phase = self.td.cmd_args.aiperf_phases is None or len(self.td.cmd_args.aiperf_phases) == 1 + + if "artifact-dir-name" not in phase_data and not single_phase: + phase_data["artifact-dir-name"] = f"{self.td.cmd_args.aiperf.artifact_dir_name}/{phase.name}" + if "report-name" not in phase_data and not single_phase: + phase_data["report-name"] = f"aiperf_{phase.name}_report.csv" + + return AIPerf.model_validate(cloudai.util.deep_merge(base_data, phase_data)) + + def _render_aiperf_setup_blocks(self, log_message: str, setup_cmd: str | None) -> list[str]: + if not setup_cmd: + return [] + + setup_argv = ["bash", "-lc", setup_cmd] + return [ + textwrap.dedent( + f"""\ + log {shlex.quote(f"{log_message}: {shlex.join(setup_argv)}")} + {shlex.join(setup_argv)} + """ + ).rstrip() + ] + + def _render_aiperf_script(self) -> str: + phases = self.td.cmd_args.aiperf_phases or [AIPerfPhase.model_validate({"name": "aiperf"})] + single_phase = len(phases) == 1 + blocks = [ + textwrap.dedent( + f"""\ + #!/usr/bin/env bash + set -Eeuo pipefail + + log() {{ echo "[$(date +%F\\ %T) $(hostname)]: $*"; }} + + : "${{FRONTEND_URL:?FRONTEND_URL is not set}}" + : "${{AIPERF_MODEL:={self.td.cmd_args.dynamo.model}}}" + : "${{AIPERF_ENDPOINT:={self.td.cmd_args.dynamo.endpoint}}}" + : "${{AIPERF_FAILURE_MARKER:={self.CONTAINER_MOUNT_OUTPUT}/{self.td.failure_marker}}}" + """ + ).rstrip() + ] + + blocks.extend(self._render_aiperf_setup_blocks("Running aiperf setup", self.td.cmd_args.aiperf.setup_cmd)) + + write_phase_logs = not single_phase + for idx, phase in enumerate(phases): + resolved_phase = self._resolve_aiperf_phase(phase) + artifact_dir = self._runtime_result_path(resolved_phase.artifact_dir_name) + report_source = f"{artifact_dir}/profile_export_aiperf.csv" + report_file = self._runtime_result_path(resolved_phase.report_name) + if isinstance(resolved_phase.extra_args, list): + raise ValueError("AIPerf extra_args must be a string with explicit CLI syntax") + cmd_parts = [ + shlex.join(shlex.split(resolved_phase.cmd)), + self._render_aiperf_phase_args(resolved_phase, artifact_dir), + resolved_phase.extra_args or "", + ] + cmd = " ".join(part for part in cmd_parts if part) + if write_phase_logs: + log_file = self._runtime_result_path(f"aiperf_{phase.name}.log") + run_cmd = f"{cmd} > {shlex.quote(log_file)} 2>&1" + else: + run_cmd = cmd + log_message = f"Running {phase.name}: {cmd}" + phase_setup = phase.setup_cmd if "setup_cmd" in phase.model_fields_set else None + phase_lines = self._render_aiperf_setup_blocks(f"Running AIPerf phase setup for {phase.name}", phase_setup) + phase_lines.append( + textwrap.dedent( + f"""\ + rm -rf {shlex.quote(artifact_dir)} + mkdir -p {shlex.quote(artifact_dir)} + log {shlex.quote(log_message)} + phase_status=0 + set +e + {run_cmd} + phase_status=$? + set -e + if [[ "$phase_status" -ne 0 ]]; then + log {shlex.quote(f"AIPerf phase {phase.name} failed")} + """ + ).rstrip() + ) + if not resolved_phase.continue_on_phase_failure: + phase_lines.append(' exit "$phase_status"') + phase_lines.extend( + [ + "fi", + textwrap.dedent( + f"""\ + if [[ "$phase_status" -eq 0 ]]; then + mkdir -p {shlex.quote(str(Path(report_file).parent))} + """ + ).rstrip(), + ] + ) + if report_source != report_file: + phase_lines.append(f" cp {shlex.quote(report_source)} {shlex.quote(report_file)}") + phase_lines.append(f" log {shlex.quote(f'AIPerf report saved to {report_file}')}") + + if not single_phase and idx == len(phases) - 1: + final_report_file = self._runtime_result_path("aiperf_report.csv") + phase_lines.append(f" mkdir -p {shlex.quote(str(Path(final_report_file).parent))}") + if report_file != final_report_file: + phase_lines.append(f" cp {shlex.quote(report_file)} {shlex.quote(final_report_file)}") + phase_lines.append(f" log {shlex.quote(f'Final AIPerf report saved to {final_report_file}')}") + + if not single_phase and idx < len(phases) - 1 and resolved_phase.health_check_between_phases: + health_probe_cmd = ( + ' if ! curl -fsS -X POST "${FRONTEND_URL}/${AIPERF_ENDPOINT}" ' + "-H 'Content-Type: application/json' " + '-d "{\\"model\\":\\"${AIPERF_MODEL}\\",\\"messages\\":[{\\"role\\":\\"user\\",' + '\\"content\\":\\"ping\\"}],\\"stream\\":false,\\"max_tokens\\":1}" ' + ">/dev/null; then" + ) + phase_lines.extend( + [ + ' if [[ -f "$AIPERF_FAILURE_MARKER" ]]; then', + " log 'FATAL: failure marker found between AIPerf phases'", + " exit 1", + " fi", + health_probe_cmd, + " log 'FATAL: frontend health probe failed between AIPerf phases'", + " exit 1", + " fi", + ] + ) + phase_lines.append("fi") + blocks.append("\n".join(phase_lines)) + + return "\n\n".join(blocks) + + def _prepare_aiperf_script(self) -> str | None: + if "aiperf.sh" not in self.td.cmd_args.workloads_list: + return None + + self.test_run.output_path.mkdir(parents=True, exist_ok=True) + + script_path = self.test_run.output_path / AIPERF_SCRIPT_FILE_NAME + script_path.write_text(self._render_aiperf_script() + "\n") + script_path.chmod(0o755) + return f"{self.CONTAINER_MOUNT_OUTPUT}/{AIPERF_SCRIPT_FILE_NAME}" + def _gen_script_args(self, td: AIDynamoTestDefinition) -> List[str]: self._prepare_lmcache_config() + aiperf_script = self._prepare_aiperf_script() if not td.repo.installed_path: raise ValueError("Dynamo repo is not installed") args = [ @@ -136,16 +362,25 @@ def _gen_script_args(self, td: AIDynamoTestDefinition) -> List[str]: exclude=[ "prefill_worker", "decode_worker", + "dcgm_exporter", + "dcgm-exporter", ], ) ) + if td.cmd_args.dynamo.dcgm_exporter.enabled: + args.append('--dynamo-dcgm-exporter-enabled "True"') + args.append(f'--dynamo-dcgm-exporter-port "{td.cmd_args.dynamo.dcgm_exporter.port}"') if td.cmd_args.dynamo.prefill_worker: args.extend(self._get_nested_toml_args(td.cmd_args.dynamo.prefill_worker, "--prefill-")) args.extend(self._get_nested_toml_args(td.cmd_args.dynamo.decode_worker, "--decode-")) args.extend(self._get_nested_toml_args(td.cmd_args.genai_perf, "--genai_perf-")) - args.extend(self._get_nested_toml_args(td.cmd_args.aiperf, "--aiperf-")) + if aiperf_script: + args.append(f'--aiperf-name "{td.cmd_args.aiperf.name}"') + args.append(f"--aiperf-script {aiperf_script}") + else: + args.extend(self._get_nested_toml_args(td.cmd_args.aiperf, "--aiperf-")) if td.cmd_args.aiperf_accuracy is not None: args.extend(self._get_nested_toml_args(td.cmd_args.aiperf_accuracy, "--aiperf_accuracy-")) @@ -172,6 +407,132 @@ def _gen_srun_command(self) -> str: srun_cmd.extend(self._gen_script_args(self.td)) return " \\\n ".join(srun_cmd) + "\n" + def _gen_dcgm_launcher_block(self) -> list[str]: + dcgm_image = self.td.dcgm_exporter_image + if not dcgm_image: + return [] + + num_nodes, node_list = self.get_cached_nodes_spec() + out_dir = self.test_run.output_path.absolute() + port = self.td.cmd_args.dynamo.dcgm_exporter.port + dcgm_cmd = f"DCGM_EXPORTER_LISTEN=:{port} dcgm-exporter" + dcgm_step_name = "cloudai-dcgm-exporter" + srun_parts = [ + *self._gen_dcgm_srun_prefix(str(dcgm_image.installed_path)), + "--overlap", + f"--job-name={dcgm_step_name}", + f"-N{num_nodes}", + *([] if not node_list else [f"--nodelist={','.join(node_list)}"]), + f"--ntasks={num_nodes}", + "--ntasks-per-node=1", + f"--output={out_dir / 'dcgm-node-%n-stdout.txt'}", + f"--error={out_dir / 'dcgm-node-%n-stderr.txt'}", + "bash", + "-lc", + shlex.quote(dcgm_cmd), + ] + + block = [ + "# Start DCGM exporter on each node.", + 'echo "Starting DCGM exporter..."', + " ".join(srun_parts) + " &", + "DCGM_EXPORTER_SRUN_PID=$!", + 'echo "DCGM exporter srun PID: ${DCGM_EXPORTER_SRUN_PID}"', + "DCGM_EXPORTER_STEP_ID=", + "for _ in {1..10}; do", + ' DCGM_EXPORTER_STEP_ID=$(squeue --noheader --steps --job "$SLURM_JOB_ID" ' + f'--format="%i %j" | awk \'$2 == "{dcgm_step_name}" {{ print $1; exit }}\')', + ' if [[ -n "${DCGM_EXPORTER_STEP_ID}" ]]; then break; fi', + " sleep 1", + "done", + 'echo "DCGM exporter step ID: ${DCGM_EXPORTER_STEP_ID:-unknown}"', + "function stop_dcgm_exporter()", + "{", + ' if [[ -n "${DCGM_EXPORTER_STEP_ID:-}" ]]; then', + ' scancel --signal=TERM "${DCGM_EXPORTER_STEP_ID}" 2>/dev/null || true', + " fi", + ' if [[ -n "${DCGM_EXPORTER_SRUN_PID:-}" ]]; then', + ' wait "${DCGM_EXPORTER_SRUN_PID}" 2>/dev/null || true', + " fi", + "}", + "sleep 5", + 'echo "Checking DCGM exporter metrics endpoints..."', + "DCGM_EXPORTER_STARTUP_TIMEOUT=${DCGM_EXPORTER_STARTUP_TIMEOUT:-60}", + ] + if node_list: + block.extend( + [ + "dcgm_nodes=(" + " ".join(shlex.quote(node) for node in node_list) + ")", + ] + ) + else: + block.append('mapfile -t dcgm_nodes < <(scontrol show hostnames "$SLURM_JOB_NODELIST")') + endpoints_file = shlex.quote(str(out_dir / "dcgm_endpoints.txt")) + block.extend( + [ + f": > {endpoints_file}", + "dcgm_failed=0", + 'for node in "${dcgm_nodes[@]}"; do', + f' dcgm_url="http://${{node}}:{port}/metrics"', + f' echo " ${{dcgm_url}}" >> {endpoints_file}', + " deadline=$((SECONDS + DCGM_EXPORTER_STARTUP_TIMEOUT))", + ' until curl -fsS --max-time 2 "${dcgm_url}" >/dev/null; do', + " if (( SECONDS >= deadline )); then", + ' echo "FATAL: DCGM exporter metrics endpoint is unreachable: ${dcgm_url}" >&2', + " dcgm_failed=1", + " break", + " fi", + " sleep 2", + " done", + " if (( dcgm_failed != 0 )); then break; fi", + ' echo "DCGM exporter reachable: ${dcgm_url}"', + "done", + "if (( dcgm_failed != 0 )); then", + " stop_dcgm_exporter", + " exit 1", + "fi", + "", + ] + ) + return block + + def _gen_dcgm_cleanup_command(self) -> str | None: + if not self.td.cmd_args.dynamo.dcgm_exporter.enabled: + return None + + return "stop_dcgm_exporter" + + def gen_exec_command(self) -> str: + srun_command = self._gen_srun_command() + command_list = [] + indent = "" + + if self.test_run.pre_test: + pre_test_command = self.gen_pre_test(self.test_run.pre_test, self.test_run.output_path) + command_list.extend([pre_test_command, "if [ $PRE_TEST_SUCCESS -eq 1 ]; then"]) + indent = " " + + dcgm_block = self._gen_dcgm_launcher_block() + if dcgm_block: + command_list.extend(f"{indent}{line}" for line in dcgm_block) + + command_list.append(f"{indent}{srun_command}") + + dcgm_cleanup = self._gen_dcgm_cleanup_command() + if dcgm_cleanup: + command_list.append(f"{indent}# Stop DCGM exporter when test finishes") + command_list.append(f"{indent}{dcgm_cleanup}") + + if self.test_run.post_test: + post_test_command = self.gen_post_test(self.test_run.post_test, self.test_run.output_path) + command_list.append(f"{indent}{post_test_command}") + + if self.test_run.pre_test: + command_list.append("fi") + + full_command = "\n".join(command_list).strip() + return self._write_sbatch_script(full_command) + def _validate_worker_nodes( self, node_list: list[str], worker_nodes: str | None, num_nodes: int, worker_type: str ) -> None: diff --git a/tests/ref_data/ai-dynamo-aiperf.sh b/tests/ref_data/ai-dynamo-aiperf.sh new file mode 100644 index 000000000..60798ef8b --- /dev/null +++ b/tests/ref_data/ai-dynamo-aiperf.sh @@ -0,0 +1,56 @@ +#!/usr/bin/env bash +set -Eeuo pipefail + +log() { echo "[$(date +%F\ %T) $(hostname)]: $*"; } + +: "${FRONTEND_URL:?FRONTEND_URL is not set}" +: "${AIPERF_MODEL:=model}" +: "${AIPERF_ENDPOINT:=v1/chat/completions}" +: "${AIPERF_FAILURE_MARKER:=/cloudai_run_results/failure-marker.txt}" + +rm -rf /cloudai_run_results/aiperf_artifacts/round_1 +mkdir -p /cloudai_run_results/aiperf_artifacts/round_1 +log 'Running round_1: aiperf profile --model model --endpoint-type chat --streaming --url "$FRONTEND_URL" --concurrency 1 --request-count 50 --synthetic-input-tokens-mean 300 --output-tokens-mean 500 --artifact-dir /cloudai_run_results/aiperf_artifacts/round_1 --server-metrics "$AIPERF_SERVER_METRICS_URLS" --server-metrics-formats json csv' +phase_status=0 +set +e +aiperf profile --model model --endpoint-type chat --streaming --url "$FRONTEND_URL" --concurrency 1 --request-count 50 --synthetic-input-tokens-mean 300 --output-tokens-mean 500 --artifact-dir /cloudai_run_results/aiperf_artifacts/round_1 --server-metrics "$AIPERF_SERVER_METRICS_URLS" --server-metrics-formats json csv > /cloudai_run_results/aiperf_round_1.log 2>&1 +phase_status=$? +set -e +if [[ "$phase_status" -ne 0 ]]; then + log 'AIPerf phase round_1 failed' + exit "$phase_status" +fi +if [[ "$phase_status" -eq 0 ]]; then + mkdir -p /cloudai_run_results + cp /cloudai_run_results/aiperf_artifacts/round_1/profile_export_aiperf.csv /cloudai_run_results/aiperf_round_1_report.csv + log 'AIPerf report saved to /cloudai_run_results/aiperf_round_1_report.csv' + if [[ -f "$AIPERF_FAILURE_MARKER" ]]; then + log 'FATAL: failure marker found between AIPerf phases' + exit 1 + fi + if ! curl -fsS -X POST "${FRONTEND_URL}/${AIPERF_ENDPOINT}" -H 'Content-Type: application/json' -d "{\"model\":\"${AIPERF_MODEL}\",\"messages\":[{\"role\":\"user\",\"content\":\"ping\"}],\"stream\":false,\"max_tokens\":1}" >/dev/null; then + log 'FATAL: frontend health probe failed between AIPerf phases' + exit 1 + fi +fi + +rm -rf /cloudai_run_results/aiperf_artifacts/round_2 +mkdir -p /cloudai_run_results/aiperf_artifacts/round_2 +log 'Running round_2: aiperf profile --model model --endpoint-type chat --streaming --url "$FRONTEND_URL" --concurrency 2 --request-count 10 --synthetic-input-tokens-mean 300 --output-tokens-mean 500 --artifact-dir /cloudai_run_results/aiperf_artifacts/round_2 --server-metrics "$AIPERF_SERVER_METRICS_URLS" --server-metrics-formats json csv' +phase_status=0 +set +e +aiperf profile --model model --endpoint-type chat --streaming --url "$FRONTEND_URL" --concurrency 2 --request-count 10 --synthetic-input-tokens-mean 300 --output-tokens-mean 500 --artifact-dir /cloudai_run_results/aiperf_artifacts/round_2 --server-metrics "$AIPERF_SERVER_METRICS_URLS" --server-metrics-formats json csv > /cloudai_run_results/aiperf_round_2.log 2>&1 +phase_status=$? +set -e +if [[ "$phase_status" -ne 0 ]]; then + log 'AIPerf phase round_2 failed' + exit "$phase_status" +fi +if [[ "$phase_status" -eq 0 ]]; then + mkdir -p /cloudai_run_results + cp /cloudai_run_results/aiperf_artifacts/round_2/profile_export_aiperf.csv /cloudai_run_results/aiperf_round_2_report.csv + log 'AIPerf report saved to /cloudai_run_results/aiperf_round_2_report.csv' + mkdir -p /cloudai_run_results + cp /cloudai_run_results/aiperf_round_2_report.csv /cloudai_run_results/aiperf_report.csv + log 'Final AIPerf report saved to /cloudai_run_results/aiperf_report.csv' +fi diff --git a/tests/ref_data/ai-dynamo.sbatch b/tests/ref_data/ai-dynamo.sbatch index 865444b81..c00906d40 100644 --- a/tests/ref_data/ai-dynamo.sbatch +++ b/tests/ref_data/ai-dynamo.sbatch @@ -14,6 +14,53 @@ srun --export=ALL --mpi=pmix -N2 --container-image=nvcr.io/nvidia/ai-dynamo:24.0 srun --export=ALL --mpi=pmix -N2 --container-image=nvcr.io/nvidia/ai-dynamo:24.09 --container-mounts=__OUTPUT_DIR__/output:/cloudai_run_results,__INSTALL_DIR__:/cloudai_install,__OUTPUT_DIR__/output,__INSTALL_DIR__/huggingface:/cloudai_install/huggingface,/tmp:/tmp --ntasks=2 --ntasks-per-node=1 --output=__OUTPUT_DIR__/output/metadata/node-%N.toml --error=__OUTPUT_DIR__/output/metadata/nodes.err bash /cloudai_install/slurm-metadata.sh +# Start DCGM exporter on each node. +echo "Starting DCGM exporter..." +srun --export=ALL --mpi=pmix --container-image=nvcr.io/nvidia/k8s/dcgm-exporter:4.5.2-4.8.1-distroless --container-mounts=__OUTPUT_DIR__/output:/cloudai_run_results,__INSTALL_DIR__:/cloudai_install,__OUTPUT_DIR__/output,__INSTALL_DIR__/huggingface:/cloudai_install/huggingface,/tmp:/tmp --overlap --job-name=cloudai-dcgm-exporter -N2 --ntasks=2 --ntasks-per-node=1 --output=__OUTPUT_DIR__/output/dcgm-node-%n-stdout.txt --error=__OUTPUT_DIR__/output/dcgm-node-%n-stderr.txt bash -lc 'DCGM_EXPORTER_LISTEN=:9501 dcgm-exporter' & +DCGM_EXPORTER_SRUN_PID=$! +echo "DCGM exporter srun PID: ${DCGM_EXPORTER_SRUN_PID}" +DCGM_EXPORTER_STEP_ID= +for _ in {1..10}; do + DCGM_EXPORTER_STEP_ID=$(squeue --noheader --steps --job "$SLURM_JOB_ID" --format="%i %j" | awk '$2 == "cloudai-dcgm-exporter" { print $1; exit }') + if [[ -n "${DCGM_EXPORTER_STEP_ID}" ]]; then break; fi + sleep 1 +done +echo "DCGM exporter step ID: ${DCGM_EXPORTER_STEP_ID:-unknown}" +function stop_dcgm_exporter() +{ + if [[ -n "${DCGM_EXPORTER_STEP_ID:-}" ]]; then + scancel --signal=TERM "${DCGM_EXPORTER_STEP_ID}" 2>/dev/null || true + fi + if [[ -n "${DCGM_EXPORTER_SRUN_PID:-}" ]]; then + wait "${DCGM_EXPORTER_SRUN_PID}" 2>/dev/null || true + fi +} +sleep 5 +echo "Checking DCGM exporter metrics endpoints..." +DCGM_EXPORTER_STARTUP_TIMEOUT=${DCGM_EXPORTER_STARTUP_TIMEOUT:-60} +mapfile -t dcgm_nodes < <(scontrol show hostnames "$SLURM_JOB_NODELIST") +: > __OUTPUT_DIR__/output/dcgm_endpoints.txt +dcgm_failed=0 +for node in "${dcgm_nodes[@]}"; do + dcgm_url="http://${node}:9501/metrics" + echo " ${dcgm_url}" >> __OUTPUT_DIR__/output/dcgm_endpoints.txt + deadline=$((SECONDS + DCGM_EXPORTER_STARTUP_TIMEOUT)) + until curl -fsS --max-time 2 "${dcgm_url}" >/dev/null; do + if (( SECONDS >= deadline )); then + echo "FATAL: DCGM exporter metrics endpoint is unreachable: ${dcgm_url}" >&2 + dcgm_failed=1 + break + fi + sleep 2 + done + if (( dcgm_failed != 0 )); then break; fi + echo "DCGM exporter reachable: ${dcgm_url}" +done +if (( dcgm_failed != 0 )); then + stop_dcgm_exporter + exit 1 +fi + srun \ --export=ALL \ --mpi=pmix \ @@ -32,7 +79,7 @@ srun \ --results-dir /cloudai_run_results \ --dynamo-repo /cloudai_install/dynamo__f7e468c7e8ff0d1426db987564e60572167e8464 \ --hf-home /cloudai_install/huggingface \ - --workloads genai_perf.sh \ + --workloads aiperf.sh \ --failure-marker /cloudai_run_results/failure-marker.txt \ --success-marker /cloudai_run_results/success-marker.txt \ --storage-cache-dir /tmp \ @@ -47,6 +94,8 @@ srun \ --dynamo-etcd-port "2379" \ --dynamo-nats-cmd "nats-server -js" \ --dynamo-nats-port "4222" \ + --dynamo-dcgm-exporter-enabled "True" \ + --dynamo-dcgm-exporter-port "9501" \ --prefill-cmd "python3 -m dynamo.vllm --is-prefill-worker" \ --prefill-worker-initialized-regex "VllmWorker.*has.been.initialized" \ --prefill-multiple-workers-per-node "False" \ @@ -73,6 +122,7 @@ srun \ --genai_perf-synthetic-input-tokens-mean "550" \ --genai_perf-warmup-request-count "10" \ --aiperf-name "aiperf" \ - --aiperf-cmd "aiperf profile" \ - --aiperf-script "/cloudai_install/aiperf.sh" \ - --aiperf-report-name "aiperf_report.csv" + --aiperf-script /cloudai_run_results/aiperf.sh + +# Stop DCGM exporter when test finishes +stop_dcgm_exporter diff --git a/tests/test_acceptance.py b/tests/test_acceptance.py index d45416595..151c6fb9e 100644 --- a/tests/test_acceptance.py +++ b/tests/test_acceptance.py @@ -34,6 +34,9 @@ AIDynamoArgs, AIDynamoCmdArgs, AIDynamoTestDefinition, + AIPerf, + AIPerfPhase, + DCGMExporter, GenAIPerf, WorkerBaseArgs, WorkerConfig, @@ -493,11 +496,13 @@ def test_req(request, slurm_system: SlurmSystem, partial_tr: partial[TestRun]) - ), cmd_args=AIDynamoCmdArgs( docker_image_url="nvcr.io/nvidia/ai-dynamo:24.09", + workloads="aiperf.sh", dynamo=AIDynamoArgs( model="model", backend="vllm", endpoint="v1/chat/completions", workspace_path="/workspace", + dcgm_exporter=DCGMExporter(enabled=True, port=9501), prefill_worker=WorkerConfig( cmd="python3 -m dynamo.vllm --is-prefill-worker", worker_initialized_regex="VllmWorker.*has.been.initialized", @@ -526,6 +531,22 @@ def test_req(request, slurm_system: SlurmSystem, partial_tr: partial[TestRun]) - "warmup-request-count": 10, } ), + aiperf=AIPerf.model_validate( + { + "extra-args": "--server-metrics-formats json csv", + "args": { + "concurrency": 2, + "request-count": 50, + "synthetic-input-tokens-mean": 300, + "output-tokens-mean": 500, + "server-metrics": "auto", + }, + } + ), + aiperf_phases=[ + AIPerfPhase.model_validate({"name": "round_1", "args": {"concurrency": 1}}), + AIPerfPhase.model_validate({"name": "round_2", "args": {"request-count": 10}}), + ], ), ), ), @@ -745,3 +766,10 @@ def test_sbatch_generation(slurm_system: SlurmSystem, test_req: tuple[TestRun, s "__INSTALL_DIR__", str(slurm_system.install_path.absolute()) ) assert curr_launcher == ref_launcher, "nixl-ep-launch.sh does not match reference" + + if test_req[1] == "ai-dynamo.sbatch": + aiperf_script = slurm_system.output_path / "aiperf.sh" + assert aiperf_script.exists(), "aiperf.sh was not generated" + curr_aiperf = aiperf_script.read_text().strip() + ref_aiperf = (Path(__file__).parent / "ref_data" / "ai-dynamo-aiperf.sh").read_text().strip() + assert curr_aiperf == ref_aiperf, "aiperf.sh does not match reference" diff --git a/tests/workloads/ai_dynamo/test_command_gen_strategy_slurm.py b/tests/workloads/ai_dynamo/test_command_gen_strategy_slurm.py index 544bb064a..46a10906b 100644 --- a/tests/workloads/ai_dynamo/test_command_gen_strategy_slurm.py +++ b/tests/workloads/ai_dynamo/test_command_gen_strategy_slurm.py @@ -33,6 +33,7 @@ AIDynamoTestDefinition, AIPerf, AIPerfAccuracy, + AIPerfPhase, GenAIPerf, LMCacheController, WorkerBaseArgs, @@ -192,9 +193,10 @@ def test_gen_script_args_contains_split_aiperf_accuracy_args(strategy: AIDynamoS result = strategy._gen_script_args(td) - assert '--aiperf-args-request-count "50"' in result - assert '--aiperf-args-synthetic-input-tokens-mean "300"' in result - assert '--aiperf-args-output-tokens-mean "500"' in result + script = (strategy.test_run.output_path / "aiperf.sh").read_text() + assert "--request-count 50" in script + assert "--synthetic-input-tokens-mean 300" in script + assert "--output-tokens-mean 500" in script assert f'--aiperf_accuracy-setup-cmd "{setup_cmd}"' in result assert '--aiperf_accuracy-name "aiperf_accuracy"' in result assert '--aiperf_accuracy-entrypoint "aiperf profile"' in result @@ -218,6 +220,179 @@ def test_gen_script_args_contains_custom_aiperf_accuracy_args(strategy: AIDynamo assert f'--aiperf_accuracy-cli "{cli}"' in result +def test_gen_script_args_writes_resolved_aiperf_script(strategy: AIDynamoSlurmCommandGenStrategy) -> None: + td = cast(AIDynamoTestDefinition, strategy.test_run.test) + td.cmd_args.workloads = "aiperf.sh" + td.cmd_args.aiperf = AIPerf.model_validate( + { + "setup-cmd": "python -m pip install --upgrade aiperf", + "args": { + "concurrency": 2, + "request-count": 50, + "synthetic-input-tokens-mean": 300, + "output-tokens-mean": 500, + }, + } + ) + td.cmd_args.aiperf_phases = [ + AIPerfPhase.model_validate({"name": "round_1", "args": {"concurrency": 1}}), + AIPerfPhase.model_validate( + { + "name": "round_2", + "setup-cmd": "python -m pip install --upgrade another-aiperf-plugin", + "args": {"request-count": 10}, + } + ), + ] + + result = strategy._gen_script_args(td) + + assert f"--aiperf-script {strategy.CONTAINER_MOUNT_OUTPUT}/aiperf.sh" in result + script = (strategy.test_run.output_path / "aiperf.sh").read_text() + assert script.count("Running aiperf setup:") == 1 + assert "bash -lc 'python -m pip install --upgrade aiperf'" in script + assert "Running AIPerf phase setup for round_1" not in script + assert "Running AIPerf phase setup for round_2" in script + assert "bash -lc 'python -m pip install --upgrade another-aiperf-plugin'" in script + assert ': "${FRONTEND_URL:?FRONTEND_URL is not set}"' in script + assert '--url "$FRONTEND_URL"' in script + assert f"--artifact-dir {strategy.CONTAINER_MOUNT_OUTPUT}/aiperf_artifacts/round_1" in script + assert f"--artifact-dir {strategy.CONTAINER_MOUNT_OUTPUT}/aiperf_artifacts/round_2" in script + assert "--concurrency 1 --request-count 50" in script + assert "--concurrency 2 --request-count 10" in script + assert f"{strategy.CONTAINER_MOUNT_OUTPUT}/aiperf_round_1.log" in script + assert f"{strategy.CONTAINER_MOUNT_OUTPUT}/aiperf_round_1_report.csv" in script + assert f"{strategy.CONTAINER_MOUNT_OUTPUT}/aiperf_round_2_report.csv" in script + assert f"{strategy.CONTAINER_MOUNT_OUTPUT}/aiperf_report.csv" in script + + +def test_generated_aiperf_script_supports_core_overrides_and_server_metrics_auto( + strategy: AIDynamoSlurmCommandGenStrategy, +) -> None: + td = cast(AIDynamoTestDefinition, strategy.test_run.test) + td.cmd_args.workloads = "aiperf.sh" + td.cmd_args.aiperf = AIPerf.model_validate( + { + "args": { + "model": "custom-model", + "endpoint-type": "completions", + "streaming": False, + "server-metrics": "auto", + "request-count": 10, + }, + } + ) + + strategy._gen_script_args(td) + + script = (strategy.test_run.output_path / "aiperf.sh").read_text() + assert "--model custom-model" in script + assert "--endpoint-type completions" in script + assert "--streaming" not in script + assert '--server-metrics "$AIPERF_SERVER_METRICS_URLS"' in script + assert "--no-server-metrics" not in script + + +def test_generated_aiperf_script_rejects_list_args(strategy: AIDynamoSlurmCommandGenStrategy) -> None: + td = cast(AIDynamoTestDefinition, strategy.test_run.test) + td.cmd_args.workloads = "aiperf.sh" + td.cmd_args.aiperf = AIPerf.model_validate({"args": {"server-metrics-formats": ["json", "csv"]}}) + + with pytest.raises(ValueError, match="AIPerf argument 'server-metrics-formats' must be a scalar value"): + strategy._gen_script_args(td) + + +def test_aiperf_extra_args_must_be_string() -> None: + with pytest.raises(ValueError): + AIPerf.model_validate({"extra-args": ["--server-metrics-formats", "json"]}) + + with pytest.raises(ValueError): + AIPerfPhase.model_validate({"name": "round_1", "extra-args": ["--server-metrics-formats", "json"]}) + + +def test_dcgm_exporter_generates_launcher_and_runtime_flags(strategy: AIDynamoSlurmCommandGenStrategy) -> None: + td = cast(AIDynamoTestDefinition, strategy.test_run.test) + td.cmd_args.dynamo.dcgm_exporter.enabled = True + td.cmd_args.dynamo.dcgm_exporter.docker_image_url = "nvcr.io/test/dcgm:latest" + td.cmd_args.dynamo.dcgm_exporter.port = 9501 + + args = strategy._gen_script_args(td) + block = strategy._gen_dcgm_launcher_block() + + assert '--dynamo-dcgm-exporter-enabled "True"' in args + assert '--dynamo-dcgm-exporter-port "9501"' in args + assert any("--container-image=nvcr.io/test/dcgm:latest" in line for line in block) + assert any("DCGM_EXPORTER_LISTEN=:9501" in line for line in block) + assert any("DCGM_EXPORTER_STARTUP_TIMEOUT" in line for line in block) + assert any('curl -fsS --max-time 2 "${dcgm_url}"' in line for line in block) + assert any("FATAL: DCGM exporter metrics endpoint is unreachable" in line for line in block) + assert any('scancel --signal=TERM "${DCGM_EXPORTER_STEP_ID}"' in line for line in block) + assert strategy._gen_dcgm_cleanup_command() == "stop_dcgm_exporter" + assert not any("docker run" in line for line in block) + assert not any('kill "${DCGM_EXPORTER_SRUN_PID}"' in line for line in block) + + +def test_dcgm_exporter_adds_configured_docker_image_installable(cmd_args: AIDynamoCmdArgs) -> None: + cmd_args.dynamo.dcgm_exporter.enabled = True + cmd_args.dynamo.dcgm_exporter.docker_image_url = "nvcr.io/test/dcgm:latest" + tdef = AIDynamoTestDefinition( + name="test", + description="desc", + test_template_name="template", + cmd_args=cmd_args, + ) + + assert tdef.dcgm_exporter_image is not None + assert tdef.dcgm_exporter_image.url == "nvcr.io/test/dcgm:latest" + assert tdef.dcgm_exporter_image in tdef.installables + + +def test_aiperf_phase_roundtrip_does_not_emit_default_report_name(strategy: AIDynamoSlurmCommandGenStrategy) -> None: + td = cast(AIDynamoTestDefinition, strategy.test_run.test) + td.cmd_args.workloads = "aiperf.sh" + td.cmd_args.aiperf_phases = [ + AIPerfPhase.model_validate({"name": "round_1"}), + AIPerfPhase.model_validate({"name": "round_2"}), + ] + + roundtripped = AIDynamoTestDefinition.model_validate(td.model_dump()) + strategy.test_run.test = roundtripped + + assert roundtripped.cmd_args.aiperf_phases is not None + assert [phase.report_name for phase in roundtripped.cmd_args.aiperf_phases] == [None, None] + + strategy._gen_script_args(roundtripped) + + script = (strategy.test_run.output_path / "aiperf.sh").read_text() + assert f"{strategy.CONTAINER_MOUNT_OUTPUT}/aiperf_round_1_report.csv" in script + assert f"{strategy.CONTAINER_MOUNT_OUTPUT}/aiperf_round_2_report.csv" in script + + +def test_single_aiperf_phase_keeps_legacy_artifact_defaults(strategy: AIDynamoSlurmCommandGenStrategy) -> None: + td = cast(AIDynamoTestDefinition, strategy.test_run.test) + td.cmd_args.workloads = "aiperf.sh" + td.cmd_args.aiperf_phases = [AIPerfPhase.model_validate({"name": "round_1", "args": {"request-count": 10}})] + + strategy._gen_script_args(td) + + script = (strategy.test_run.output_path / "aiperf.sh").read_text() + assert f"{strategy.CONTAINER_MOUNT_OUTPUT}/aiperf_artifacts" in script + assert f"{strategy.CONTAINER_MOUNT_OUTPUT}/aiperf_report.csv" in script + assert f"{strategy.CONTAINER_MOUNT_OUTPUT}/aiperf_round_1.log" not in script + + +def test_aiperf_phase_names_must_be_unique(cmd_args: AIDynamoCmdArgs) -> None: + with pytest.raises(ValueError, match="AIPerf phase names must be unique"): + AIDynamoCmdArgs( + docker_image_url=cmd_args.docker_image_url, + dynamo=cmd_args.dynamo, + aiperf_phases=[ + AIPerfPhase.model_validate({"name": "round_1"}), + AIPerfPhase.model_validate({"name": "round_1"}), + ], + ) + + def test_gen_script_args_quotes_worker_json_args(strategy: AIDynamoSlurmCommandGenStrategy) -> None: td = cast(AIDynamoTestDefinition, strategy.test_run.test) config = '{"kv_connector":"NixlConnector","kv_role":"kv_both"}'