diff --git a/src/cloudai/registration.py b/src/cloudai/registration.py index da14df78d..e31bc7273 100644 --- a/src/cloudai/registration.py +++ b/src/cloudai/registration.py @@ -136,6 +136,7 @@ def register_all(): NIXLBenchTestDefinition, ) from cloudai.workloads.nixl_ep import ( + NixlEPComparisonReport, NixlEPReportGenerationStrategy, NixlEPSlurmCommandGenStrategy, NixlEPTestDefinition, @@ -321,6 +322,11 @@ def register_all(): NIXLBenchComparisonReport, ComparisonReportConfig(enable=True, group_by=["backend", "op_type"]), ) + Registry().add_scenario_report( + "nixl_ep_comparison", + NixlEPComparisonReport, + ComparisonReportConfig(enable=True), + ) Registry().add_scenario_report( "nccl_comparison", NcclComparisonReport, ComparisonReportConfig(enable=True, group_by=["subtest_name"]) ) diff --git a/src/cloudai/workloads/nixl_ep/__init__.py b/src/cloudai/workloads/nixl_ep/__init__.py index f13454e26..8dd1ee9c7 100644 --- a/src/cloudai/workloads/nixl_ep/__init__.py +++ b/src/cloudai/workloads/nixl_ep/__init__.py @@ -15,11 +15,13 @@ # limitations under the License. from .nixl_ep import NixlEPCmdArgs, NixlEPTestDefinition +from .nixl_ep_comparison_report import NixlEPComparisonReport from .report_generation_strategy import NixlEPReportGenerationStrategy from .slurm_command_gen_strategy import NixlEPSlurmCommandGenStrategy __all__ = [ "NixlEPCmdArgs", + "NixlEPComparisonReport", "NixlEPReportGenerationStrategy", "NixlEPSlurmCommandGenStrategy", "NixlEPTestDefinition", diff --git a/src/cloudai/workloads/nixl_ep/log_parsing.py b/src/cloudai/workloads/nixl_ep/log_parsing.py index f64ff5b8a..8182134d1 100644 --- a/src/cloudai/workloads/nixl_ep/log_parsing.py +++ b/src/cloudai/workloads/nixl_ep/log_parsing.py @@ -31,8 +31,10 @@ ) _KINETO_BW_RE = re.compile( rf"\[rank (?P\d+)\] Dispatch bandwidth: " - rf"(?P{_FLOAT_RE}) GB/s \| " + rf"(?P{_FLOAT_RE}) GB/s" + rf"(?:, avg_t={_FLOAT_RE} us)? \| " rf"Combine bandwidth: (?P{_FLOAT_RE}) GB/s" + rf"(?:, avg_t={_FLOAT_RE} us)?" 
) diff --git a/src/cloudai/workloads/nixl_ep/nixl_ep_comparison_report.py b/src/cloudai/workloads/nixl_ep/nixl_ep_comparison_report.py new file mode 100644 index 000000000..ddc614ffe --- /dev/null +++ b/src/cloudai/workloads/nixl_ep/nixl_ep_comparison_report.py @@ -0,0 +1,226 @@ +# SPDX-FileCopyrightText: NVIDIA CORPORATION & AFFILIATES +# Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +from __future__ import annotations + +import itertools +import pathlib +from typing import TYPE_CHECKING, Any, ClassVar, cast + +import rich.table + +import cloudai.core +import cloudai.report_generator.comparison_report +import cloudai.report_generator.groups +import cloudai.workloads.nixl_ep.log_parsing as log_parsing +import cloudai.workloads.nixl_ep.nixl_ep as nixl_ep +from cloudai.util.lazy_imports import lazy + +if TYPE_CHECKING: + import bokeh.plotting as bk + import pandas as pd + + +class NixlEPComparisonReport(cloudai.report_generator.comparison_report.ComparisonReport): + """Comparison report for NIXL EP benchmark runs.""" + + NODE_COLUMN: ClassVar[str] = "Node" + DISPATCH_COMBINE_BW_COLUMN: ClassVar[str] = "Dispatch+Combine BW (GB/s)" + AVG_TIME_COLUMN: ClassVar[str] = "Avg Time (us)" + MIN_TIME_COLUMN: ClassVar[str] = "Min Time (us)" + MAX_TIME_COLUMN: ClassVar[str] = "Max Time (us)" + DISPATCH_BW_COLUMN: ClassVar[str] = "Dispatch BW (GB/s)" + COMBINE_BW_COLUMN: ClassVar[str] = "Combine BW (GB/s)" + + METRIC_COLUMNS: ClassVar[tuple[tuple[str, str], ...]] = ( + ("dispatch_combine_bandwidth_gbps", DISPATCH_COMBINE_BW_COLUMN), + ("avg_time_us", AVG_TIME_COLUMN), + ("min_time_us", MIN_TIME_COLUMN), + ("max_time_us", MAX_TIME_COLUMN), + ("dispatch_bandwidth_gbps", DISPATCH_BW_COLUMN), + ("combine_bandwidth_gbps", COMBINE_BW_COLUMN), + ) + BANDWIDTH_COLUMNS: ClassVar[tuple[str, ...]] = ( + DISPATCH_COMBINE_BW_COLUMN, + DISPATCH_BW_COLUMN, + COMBINE_BW_COLUMN, + ) + TIME_COLUMNS: ClassVar[tuple[str, ...]] = (AVG_TIME_COLUMN, MIN_TIME_COLUMN, MAX_TIME_COLUMN) + + def __init__( + self, + system: cloudai.core.System, + test_scenario: cloudai.core.TestScenario, + results_root: pathlib.Path, + config: cloudai.report_generator.comparison_report.ComparisonReportConfig, + ) -> None: + super().__init__(system, test_scenario, results_root, config) + self.report_file_name = "nixl_ep_comparison.html" + + def load_test_runs(self) -> None: + super().load_test_runs() + 
self.trs = [tr for tr in self.trs if isinstance(tr.test, nixl_ep.NixlEPTestDefinition)] + + def comparison_values(self, tr: cloudai.core.TestRun) -> dict[str, object]: + return {"case": tr.name.removeprefix("NIXL.EP."), "NUM_NODES": tr.nnodes} + + @staticmethod + def _mean(values: list[float]) -> float | None: + return sum(values) / len(values) if values else None + + @staticmethod + def _metric_value(value: float | None) -> float | str: + if value is None: + return "n/a" + return round(value, 4) + + @classmethod + def _empty_df(cls) -> pd.DataFrame: + return lazy.pd.DataFrame( + { + cls.NODE_COLUMN: lazy.pd.Series([], dtype=int), + **{column: lazy.pd.Series([], dtype=object) for _, column in cls.METRIC_COLUMNS}, + } + ) + + def extract_data_as_df(self, tr: cloudai.core.TestRun) -> pd.DataFrame: + rows: list[dict[str, object]] = [] + for node_idx in range(tr.nnodes): + samples = log_parsing.parse_nixl_ep_bandwidth_samples(tr.output_path / f"nixl-ep-node-{node_idx}.log") + row: dict[str, object] = {self.NODE_COLUMN: node_idx} + for sample_attr, column in self.METRIC_COLUMNS: + values = [value for sample in samples if (value := getattr(sample, sample_attr)) is not None] + row[column] = self._metric_value(self._mean(values)) + rows.append(row) + + if not rows: + return self._empty_df() + + return lazy.pd.DataFrame(rows) + + @staticmethod + def _has_metric(dfs: list[pd.DataFrame], column: str) -> bool: + for df in dfs: + if column not in df.columns: + continue + if any(isinstance(value, (int, float)) for value in df[column].tolist()): + return True + return False + + def _available_columns(self, dfs: list[pd.DataFrame], columns: tuple[str, ...]) -> list[str]: + return [column for column in columns if self._has_metric(dfs, column)] + + def create_tables( + self, cmp_groups: list[cloudai.report_generator.groups.GroupedTestRuns] + ) -> list[rich.table.Table]: + tables: list[rich.table.Table] = [] + for group in cmp_groups: + dfs = [self.extract_data_as_df(item.tr) for 
item in group.items] + bandwidth_columns = self._available_columns(dfs, self.BANDWIDTH_COLUMNS) + time_columns = self._available_columns(dfs, self.TIME_COLUMNS) + + for bandwidth_column in bandwidth_columns: + tables.append( + self.create_table( + group, + dfs=dfs, + title=bandwidth_column, + info_columns=[self.NODE_COLUMN], + data_columns=[bandwidth_column], + ) + ) + for time_column in time_columns: + tables.append( + self.create_table( + group, + dfs=dfs, + title=time_column, + info_columns=[self.NODE_COLUMN], + data_columns=[time_column], + ) + ) + + return tables + + def _create_metric_bar_chart( + self, + group: cloudai.report_generator.groups.GroupedTestRuns, + dfs: list[pd.DataFrame], + metric_column: str, + y_axis_label: str, + ) -> bk.figure: + factors: list[tuple[str, str]] = [] + values: list[float] = [] + nodes: list[str] = [] + runs: list[str] = [] + colors: list[str] = [] + color_cycler = itertools.cycle(["#1f77b4", "#17becf", "#2ca02c", "#bcbd22", "#ff7f0e"]) + color_by_run = {item.name: next(color_cycler) for item in group.items} + + for df, item in zip(dfs, group.items, strict=True): + if metric_column not in df.columns: + continue + for _, row in df.iterrows(): + value = row[metric_column] + if not isinstance(value, (int, float)): + continue + node = f"Node {row[self.NODE_COLUMN]}" + factors.append((node, item.name)) + values.append(float(value)) + nodes.append(node) + runs.append(item.name) + colors.append(color_by_run[item.name]) + + x_range = lazy.bokeh_models.FactorRange(*factors) + cast(Any, x_range).range_padding = 0.1 + plot = lazy.bokeh_plotting.figure( + title=f"{metric_column}: {group.name}", + x_range=x_range, + y_axis_label=y_axis_label, + width=800, + height=500, + tools="save,reset", + ) + hover = lazy.bokeh_models.HoverTool(tooltips=[("Node", "@node"), ("Run", "@run"), ("Value", "@value{0.0000}")]) + plot.add_tools(hover) + + if not values: + return plot + + source = lazy.bokeh_models.ColumnDataSource( + data={ + "x": factors, + 
"node": nodes, + "run": runs, + "value": values, + "color": colors, + } + ) + plot.vbar(x="x", top="value", width=0.8, fill_color="color", line_color="color", source=source) + plot.xaxis.major_label_orientation = 0.8 + y_max = max(values) + plot.y_range = lazy.bokeh_models.Range1d(start=0, end=y_max * 1.1 if y_max > 0 else 1) + return plot + + def create_charts(self, cmp_groups: list[cloudai.report_generator.groups.GroupedTestRuns]) -> list[bk.figure]: + charts: list[bk.figure] = [] + for group in cmp_groups: + dfs = [self.extract_data_as_df(item.tr) for item in group.items] + for column in self._available_columns(dfs, self.BANDWIDTH_COLUMNS): + charts.append(self._create_metric_bar_chart(group, dfs, column, "Bandwidth (GB/s)")) + for column in self._available_columns(dfs, self.TIME_COLUMNS): + charts.append(self._create_metric_bar_chart(group, dfs, column, "Time (us)")) + return charts diff --git a/tests/test_init.py b/tests/test_init.py index 1fbab93f3..47b486110 100644 --- a/tests/test_init.py +++ b/tests/test_init.py @@ -80,7 +80,7 @@ NIXLBenchSlurmCommandGenStrategy, NIXLBenchTestDefinition, ) -from cloudai.workloads.nixl_ep import NixlEPSlurmCommandGenStrategy, NixlEPTestDefinition +from cloudai.workloads.nixl_ep import NixlEPComparisonReport, NixlEPSlurmCommandGenStrategy, NixlEPTestDefinition from cloudai.workloads.nixl_kvbench import NIXLKVBenchSlurmCommandGenStrategy, NIXLKVBenchTestDefinition from cloudai.workloads.nixl_perftest import NixlPerftestSlurmCommandGenStrategy, NixlPerftestTestDefinition from cloudai.workloads.osu_bench import ( @@ -272,6 +272,7 @@ def test_scenario_reports(): "dse", "tarball", "nixl_bench_summary", + "nixl_ep_comparison", "nccl_comparison", "osu_bench_comparison", "vllm_comparison", @@ -283,6 +284,7 @@ def test_scenario_reports(): DSEReporter, TarballReporter, NIXLBenchComparisonReport, + NixlEPComparisonReport, NcclComparisonReport, OSUBenchComparisonReport, VLLMComparisonReport, @@ -298,6 +300,7 @@ def 
test_report_configs(): "dse", "tarball", "nixl_bench_summary", + "nixl_ep_comparison", "nccl_comparison", "osu_bench_comparison", "vllm_comparison", diff --git a/tests/workloads/nixl_ep/test_comparison_report.py b/tests/workloads/nixl_ep/test_comparison_report.py new file mode 100644 index 000000000..ddd482439 --- /dev/null +++ b/tests/workloads/nixl_ep/test_comparison_report.py @@ -0,0 +1,94 @@ +# SPDX-FileCopyrightText: NVIDIA CORPORATION & AFFILIATES +# Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
import pathlib

import cloudai.core
import cloudai.report_generator.comparison_report
import cloudai.systems.slurm
from cloudai.workloads.nixl_ep import NixlEPCmdArgs, NixlEPComparisonReport, NixlEPTestDefinition


def _nixl_ep_tr(name: str, num_processes_per_node: int) -> cloudai.core.TestRun:
    """Build a two-node NIXL EP test run named ``name`` with no nodes assigned."""
    cmd_args = NixlEPCmdArgs(
        docker_image_url="fake://nixl",
        plan="[[0, 1]]",
        num_processes_per_node=num_processes_per_node,
    )
    test_definition = NixlEPTestDefinition(
        name="nixl_ep",
        description="NIXL EP benchmark",
        test_template_name="NixlEP",
        cmd_args=cmd_args,
    )
    return cloudai.core.TestRun(name=name, test=test_definition, num_nodes=2, nodes=[])


def _write_node_log(
    run_dir: pathlib.Path, node_idx: int, combined_bw: float, dispatch_bw: float, combine_bw: float
) -> None:
    """Write a two-line ``nixl-ep-node-<node_idx>.log`` into ``run_dir``.

    The first line reports the combined bandwidth with fixed timings
    (avg 10 us, min 8 us, max 12 us); the second reports the separate
    dispatch and combine bandwidths.
    """
    run_dir.mkdir(parents=True, exist_ok=True)
    combined_line = (
        f"[rank {node_idx}] Dispatch + combine bandwidth: {combined_bw} GB/s, "
        "avg_t=10 us, min_t=8 us, max_t=12 us"
    )
    split_line = f"[rank {node_idx}] Dispatch bandwidth: {dispatch_bw} GB/s | Combine bandwidth: {combine_bw} GB/s"
    log_file = run_dir / f"nixl-ep-node-{node_idx}.log"
    log_file.write_text("\n".join((combined_line, split_line)), encoding="utf-8")


def test_nixl_ep_comparison_report_generates_html(slurm_system: cloudai.systems.slurm.SlurmSystem) -> None:
    """Two NIXL EP runs yield per-metric tables and an HTML report file."""
    tr1 = _nixl_ep_tr("NIXL.EP.No-expansion", 1)
    tr2 = _nixl_ep_tr("NIXL.EP.Single-expansion", 2)

    # Per run: one (combined, dispatch, combine) bandwidth triple per node index.
    node_bandwidths = [
        (tr1, [(100.0, 40.0, 45.0), (120.0, 50.0, 55.0)]),
        (tr2, [(130.0, 60.0, 65.0), (150.0, 70.0, 75.0)]),
    ]
    for tr, per_node in node_bandwidths:
        run_dir = slurm_system.output_path / tr.name / "0"
        for node_idx, (combined_bw, dispatch_bw, combine_bw) in enumerate(per_node):
            _write_node_log(
                run_dir, node_idx, combined_bw=combined_bw, dispatch_bw=dispatch_bw, combine_bw=combine_bw
            )

    scenario = cloudai.core.TestScenario(name="nixl-ep-comparison", test_runs=[tr1, tr2])
    config = cloudai.report_generator.comparison_report.ComparisonReportConfig(enable=True)
    report = NixlEPComparisonReport(slurm_system, scenario, slurm_system.output_path, config)

    report.load_test_runs()
    assert len(report.trs) == 2

    tables = report.create_tables(report.group_test_runs())
    # Indices 0 and 1 are bandwidth tables; index 3 is the first time table.
    combined_table = tables[0]
    dispatch_table = tables[1]
    avg_time_table = tables[3]

    assert "case=No-expansion" in str(combined_table.columns[1].header)
    assert "case=Single-expansion" in str(combined_table.columns[2].header)
    assert list(combined_table.columns[0].cells) == ["0", "1"]
    assert list(combined_table.columns[1].cells) == ["100.0", "120.0"]
    assert list(dispatch_table.columns[1].cells) == ["40.0", "50.0"]
    assert list(avg_time_table.columns[1].cells) == ["10.0", "10.0"]

    report.generate()

    expected_report = slurm_system.output_path / "nixl_ep_comparison.html"
    assert expected_report.exists()