From 70e71e6d6b8bc6278009ec2368627235637d21d5 Mon Sep 17 00:00:00 2001 From: Ivan Podkidyshev Date: Fri, 5 Jun 2026 12:33:24 +0200 Subject: [PATCH 1/3] fix nixlep rank removal --- src/cloudai/workloads/nixl_ep/nixl_ep.py | 19 +++++- .../nixl_ep/slurm_command_gen_strategy.py | 33 ++++++++-- tests/ref_data/nixl-ep-launch.sh | 11 +++- .../test_command_gen_strategy_slurm.py | 60 +++++++++++++++++++ .../test_job_status_retrieval_strategy.py | 45 ++++++++++++++ 5 files changed, 161 insertions(+), 7 deletions(-) diff --git a/src/cloudai/workloads/nixl_ep/nixl_ep.py b/src/cloudai/workloads/nixl_ep/nixl_ep.py index a99020a9e..f5ae1acd6 100644 --- a/src/cloudai/workloads/nixl_ep/nixl_ep.py +++ b/src/cloudai/workloads/nixl_ep/nixl_ep.py @@ -149,10 +149,28 @@ def _primary_launch_exit_error_message(content: str) -> str | None: return f"The primary NIXL EP launch exited before phase {phase} completed." + @staticmethod + def _looks_like_planned_srun_termination(content: str) -> bool: + allowed_patterns = ( + re.compile(r"^srun: error: .+: task \d+: Terminated$"), + re.compile(r"^srun: Terminating StepId=\S+$"), + re.compile(r"^srun: Force Terminated StepId=\S+$"), + ) + lines = [line.strip() for line in content.splitlines() if line.strip()] + return bool(lines) and all(any(pattern.match(line) for pattern in allowed_patterns) for line in lines) + + def _has_planned_rank_removal(self) -> bool: + plans = self.cmd_args.plan if isinstance(self.cmd_args.plan, list) else [self.cmd_args.plan] + return any(rank < 0 for plan in plans for phase in NixlEPCmdArgs._parse_plan(plan) for rank in phase) + def _scan_log_for_failures(self, path: Path) -> JobStatusResult | None: if not path.is_file(): return None + content = path.read_text(encoding="utf-8", errors="ignore") + if self._has_planned_rank_removal() and self._looks_like_planned_srun_termination(content): + return None + launcher_failure_patterns = ( ("python3: can't open file", "The benchmark entrypoint could not be opened."), ("Traceback (most recent call last):", "The benchmark launcher raised a Python traceback."), @@ -164,7 +182,6 @@ def _scan_log_for_failures(self, path: Path) -> JobStatusResult | None: ("srun: error:", "Slurm reported an srun failure."), ("Exited with exit code", "A Slurm step exited with a non-zero status."), ) - content = path.read_text(encoding="utf-8", errors="ignore") primary_launch_error = self._primary_launch_exit_error_message(content) if primary_launch_error is not None: tail = self._tail(path) diff --git a/src/cloudai/workloads/nixl_ep/slurm_command_gen_strategy.py b/src/cloudai/workloads/nixl_ep/slurm_command_gen_strategy.py index e60c8dfab..bad92811d 100644 --- a/src/cloudai/workloads/nixl_ep/slurm_command_gen_strategy.py +++ b/src/cloudai/workloads/nixl_ep/slurm_command_gen_strategy.py @@ -111,6 +111,9 @@ def _new_process_counts_by_phase(self) -> list[int]: self._validate_requested_processes(counts) return counts + def _has_planned_rank_removal(self) -> bool: + return any(rank < 0 for phase in self.tdef.cmd_args.parse_plan() for rank in phase) + def _validate_requested_processes(self, new_process_counts: list[int]) -> None: total_requested_processes = sum(new_process_counts) num_nodes, _ = self.get_cached_nodes_spec() @@ -302,22 +305,42 @@ def _finish_with_rc_lines() -> list[str]: "exit $rc", ] - @classmethod - def _wait_for_workers_lines(cls) -> list[str]: - return [ + def _wait_for_workers_lines(self) -> list[str]: + allow_planned_removal_143 = "1" if self._has_planned_rank_removal() else "0" + final_phase = len(self.tdef.cmd_args.parse_plan()) - 1 + lines = [ "", + f"allow_planned_removal_143={allow_planned_removal_143}", + "ignored_planned_removal_143=0", "rc=0", 'while [ "$active_srun_count" -gt 0 ]; do', " wait -n", " wait_rc=$?", " active_srun_count=$((active_srun_count - 1))", - ' if [ "$wait_rc" -ne 0 ] && [ "$rc" -eq 0 ]; then', + ' if [ "$allow_planned_removal_143" -eq 1 ] && [ "$wait_rc" -eq 143 ]; then', + ' echo "Ignoring provisional NIXL EP planned-rank-removal exit 143"', + " ignored_planned_removal_143=1", + ' elif [ "$wait_rc" -ne 0 ] && [ "$rc" -eq 0 ]; then', " rc=$wait_rc", " fi", "done", "", - *cls._finish_with_rc_lines(), ] + if self._has_planned_rank_removal(): + final_phase_wait = ( + f' wait_for_phase_completion "{final_phase}" "{self.node_log_path(0).absolute()}" "$primary_pid" ' + "|| rc=143" + ) + lines.extend( + [ + 'if [ "$ignored_planned_removal_143" -eq 1 ] && [ "$rc" -eq 0 ]; then', + final_phase_wait, + "fi", + "", + ] + ) + lines.extend(self._finish_with_rc_lines()) + return lines @staticmethod def _has_follower_launches(stages: list[NixlEPStage]) -> bool: diff --git a/tests/ref_data/nixl-ep-launch.sh b/tests/ref_data/nixl-ep-launch.sh index 2dec420a8..89a6fce44 100644 --- a/tests/ref_data/nixl-ep-launch.sh +++ b/tests/ref_data/nixl-ep-launch.sh @@ -107,16 +107,25 @@ echo "Starting launches for phase 3..." srun --export=ALL --mpi=pmix --container-image=docker.io/nvidia/nixl-ep:latest --container-mounts=__OUTPUT_DIR__/output:/cloudai_run_results,__INSTALL_DIR__:/cloudai_install,__OUTPUT_DIR__/output --overlap --nodelist="${nodes_array[2]}" --ntasks-per-node=1 --ntasks=1 -N1 --open-mode=append --output=__OUTPUT_DIR__/output/nixl-ep-node-2.log --error=__OUTPUT_DIR__/output/nixl-ep-node-2.log bash -c "source __OUTPUT_DIR__/output/env_vars.sh; python3 /workspace/nixl/examples/device/ep/tests/elastic/elastic.py --plan __OUTPUT_DIR__/output/nixl-ep-plan.json --num-processes 2 --tcp-server $master_ip --disable-ll-nvlink --hidden-dim 8192 --kineto --num-experts-per-rank 4 --num-tokens 256 --num-topk 6" & active_srun_count=$((active_srun_count + 1)) +allow_planned_removal_143=1 +ignored_planned_removal_143=0 rc=0 while [ "$active_srun_count" -gt 0 ]; do wait -n wait_rc=$? active_srun_count=$((active_srun_count - 1)) - if [ "$wait_rc" -ne 0 ] && [ "$rc" -eq 0 ]; then + if [ "$allow_planned_removal_143" -eq 1 ] && [ "$wait_rc" -eq 143 ]; then + echo "Ignoring provisional NIXL EP planned-rank-removal exit 143" + ignored_planned_removal_143=1 + elif [ "$wait_rc" -ne 0 ] && [ "$rc" -eq 0 ]; then rc=$wait_rc fi done +if [ "$ignored_planned_removal_143" -eq 1 ] && [ "$rc" -eq 0 ]; then + wait_for_phase_completion "3" "__OUTPUT_DIR__/output/nixl-ep-node-0.log" "$primary_pid" || rc=143 +fi + if [ "$rc" -eq 0 ]; then echo "All NIXL EP launches completed successfully" fi diff --git a/tests/workloads/nixl_ep/test_command_gen_strategy_slurm.py b/tests/workloads/nixl_ep/test_command_gen_strategy_slurm.py index 663aa88e4..20e174460 100644 --- a/tests/workloads/nixl_ep/test_command_gen_strategy_slurm.py +++ b/tests/workloads/nixl_ep/test_command_gen_strategy_slurm.py @@ -747,6 +747,66 @@ def test_gen_srun_command_multi_node_public_single_expansion_waits_for_phase_bef assert launcher_script.count("--open-mode=append") == 1 +def test_gen_srun_command_planned_rank_removal_tolerates_143_after_final_phase( + slurm_system: SlurmSystem, +) -> None: + tdef = NixlEPTestDefinition( + name="nixl_ep", + description="NIXL Elastic EP benchmark", + test_template_name="NixlEP", + cmd_args=NixlEPCmdArgs( + docker_image_url="docker.io/nvidia/nixl-ep:latest", + plan=json.dumps([[0, 1], [0, 1, 2, 3], [0, -2, 3], [0, 1, 2, 3]]), + num_processes_per_node=3, + ), + ) + test_run = TestRun( + name="nixl-ep", + num_nodes=2, + nodes=[], + test=tdef, + output_path=slurm_system.output_path, + ) + strategy = NixlEPSlurmCommandGenStrategy(slurm_system, test_run) + + launcher_script = read_launcher_script(strategy) + + assert "allow_planned_removal_143=1" in launcher_script + assert 'if [ "$allow_planned_removal_143" -eq 1 ] && [ "$wait_rc" -eq 143 ]; then' in launcher_script + assert "Ignoring provisional NIXL EP planned-rank-removal exit 143" in launcher_script + assert 'wait_for_phase_completion "3"' in launcher_script + assert "|| rc=143" in launcher_script + + +def test_gen_srun_command_without_planned_rank_removal_keeps_143_fatal( + slurm_system: SlurmSystem, +) -> None: + tdef = NixlEPTestDefinition( + name="nixl_ep", + description="NIXL Elastic EP benchmark", + test_template_name="NixlEP", + cmd_args=NixlEPCmdArgs( + docker_image_url="docker.io/nvidia/nixl-ep:latest", + plan=SINGLE_EXPANSION_PLAN_STR, + num_processes_per_node=4, + ), + ) + test_run = TestRun( + name="nixl-ep", + num_nodes=2, + nodes=[], + test=tdef, + output_path=slurm_system.output_path, + ) + strategy = NixlEPSlurmCommandGenStrategy(slurm_system, test_run) + + launcher_script = read_launcher_script(strategy) + + assert "allow_planned_removal_143=0" in launcher_script + assert "Ignoring provisional NIXL EP planned-rank-removal exit 143" in launcher_script + assert 'wait_for_phase_completion "1"' not in launcher_script + + def test_gen_srun_command_multi_node_single_stage_starts_followers( slurm_system: SlurmSystem, ) -> None: diff --git a/tests/workloads/nixl_ep/test_job_status_retrieval_strategy.py b/tests/workloads/nixl_ep/test_job_status_retrieval_strategy.py index de0dcab6c..20ae9ca48 100644 --- a/tests/workloads/nixl_ep/test_job_status_retrieval_strategy.py +++ b/tests/workloads/nixl_ep/test_job_status_retrieval_strategy.py @@ -69,6 +69,51 @@ def test_successful_job(self, nixl_ep_tr: TestRun) -> None: assert result.is_successful assert result.error_message == "" + def test_planned_srun_termination_is_ignored_when_benchmark_output_exists(self, nixl_ep_tr: TestRun) -> None: + nixl_ep_tr.output_path.mkdir(parents=True, exist_ok=True) + for node_idx in range(num_nodes(nixl_ep_tr)): + (nixl_ep_tr.output_path / f"nixl-ep-node-{node_idx}.log").write_text( + SUCCESSFUL_BANDWIDTH_LINE, encoding="utf-8" + ) + (nixl_ep_tr.output_path / "stderr.txt").write_text( + "\n".join( + [ + "srun: error: node001: task 0: Terminated", + "srun: Terminating StepId=123.4", + "srun: Force Terminated StepId=123.4", + ] + ) + + "\n", + encoding="utf-8", + ) + + result = nixl_ep_tr.test.was_run_successful(nixl_ep_tr) + + assert result.is_successful + + def test_unplanned_srun_termination_is_reported(self, nixl_ep_tr: TestRun) -> None: + nixl_ep_tr.output_path.mkdir(parents=True, exist_ok=True) + test_def = cast(NixlEPTestDefinition, nixl_ep_tr.test) + test_def.cmd_args = NixlEPCmdArgs( + docker_image_url="docker.io/nvidia/nixl-ep:latest", + elastic_script="/workspace/nixl/examples/device/ep/tests/elastic/elastic.py", + plan="[[0, 1], [0, 1, 2, 3]]", + num_processes_per_node=4, + ) + for node_idx in range(num_nodes(nixl_ep_tr)): + (nixl_ep_tr.output_path / f"nixl-ep-node-{node_idx}.log").write_text( + SUCCESSFUL_BANDWIDTH_LINE, encoding="utf-8" + ) + (nixl_ep_tr.output_path / "stderr.txt").write_text( + "srun: error: node001: task 0: Terminated\n", + encoding="utf-8", + ) + + result = nixl_ep_tr.test.was_run_successful(nixl_ep_tr) + + assert not result.is_successful + assert "srun failure" in result.error_message + def test_launcher_path_error_is_reported(self, nixl_ep_tr: TestRun) -> None: nixl_ep_tr.output_path.mkdir(parents=True, exist_ok=True) (nixl_ep_tr.output_path / "nixl-ep-node-0.log").write_text( From f39e0ae99e7412c64e96651fa1bd3f78b7c56395 Mon Sep 17 00:00:00 2001 From: Ivan Podkidyshev Date: Tue, 9 Jun 2026 18:25:05 +0200 Subject: [PATCH 2/3] loosen termination detection check --- src/cloudai/workloads/nixl_ep/nixl_ep.py | 5 ++-- .../test_job_status_retrieval_strategy.py | 25 +++++++++++++++++++ 2 files changed, 28 insertions(+), 2 deletions(-) diff --git a/src/cloudai/workloads/nixl_ep/nixl_ep.py b/src/cloudai/workloads/nixl_ep/nixl_ep.py index f5ae1acd6..7a05a2feb 100644 --- a/src/cloudai/workloads/nixl_ep/nixl_ep.py +++ b/src/cloudai/workloads/nixl_ep/nixl_ep.py @@ -157,7 +157,8 @@ def _looks_like_planned_srun_termination(content: str) -> bool: re.compile(r"^srun: Force Terminated StepId=\S+$"), ) lines = [line.strip() for line in content.splitlines() if line.strip()] - return bool(lines) and all(any(pattern.match(line) for pattern in allowed_patterns) for line in lines) + srun_lines = [line for line in lines if line.startswith("srun:")] + return bool(srun_lines) and all(any(pattern.match(line) for pattern in allowed_patterns) for line in srun_lines) def _has_planned_rank_removal(self) -> bool: plans = self.cmd_args.plan if isinstance(self.cmd_args.plan, list) else [self.cmd_args.plan] @@ -169,7 +170,7 @@ def _scan_log_for_failures(self, path: Path) -> JobStatusResult | None: content = path.read_text(encoding="utf-8", errors="ignore") if self._has_planned_rank_removal() and self._looks_like_planned_srun_termination(content): - return None + content = "\n".join(line for line in content.splitlines() if not line.strip().startswith("srun:")) launcher_failure_patterns = ( ("python3: can't open file", "The benchmark entrypoint could not be opened."), diff --git a/tests/workloads/nixl_ep/test_job_status_retrieval_strategy.py b/tests/workloads/nixl_ep/test_job_status_retrieval_strategy.py index 20ae9ca48..173061573 100644 --- a/tests/workloads/nixl_ep/test_job_status_retrieval_strategy.py +++ b/tests/workloads/nixl_ep/test_job_status_retrieval_strategy.py @@ -79,6 +79,7 @@ def test_planned_srun_termination_is_ignored_when_benchmark_output_exists(self, "\n".join( [ "srun: error: node001: task 0: Terminated", + "Ignoring provisional NIXL EP planned-rank-removal exit 143", "srun: Terminating StepId=123.4", "srun: Force Terminated StepId=123.4", ] @@ -91,6 +92,30 @@ def test_planned_srun_termination_is_ignored_when_benchmark_output_exists(self, assert result.is_successful + def test_planned_srun_termination_still_reports_other_failures(self, nixl_ep_tr: TestRun) -> None: + nixl_ep_tr.output_path.mkdir(parents=True, exist_ok=True) + for node_idx in range(num_nodes(nixl_ep_tr)): + (nixl_ep_tr.output_path / f"nixl-ep-node-{node_idx}.log").write_text( + SUCCESSFUL_BANDWIDTH_LINE, encoding="utf-8" + ) + (nixl_ep_tr.output_path / "stderr.txt").write_text( + "\n".join( + [ + "srun: error: node001: task 0: Terminated", + "Traceback (most recent call last):", + "srun: Terminating StepId=123.4", + "srun: Force Terminated StepId=123.4", + ] + ) + + "\n", + encoding="utf-8", + ) + + result = nixl_ep_tr.test.was_run_successful(nixl_ep_tr) + + assert not result.is_successful + assert "Python traceback" in result.error_message + def test_unplanned_srun_termination_is_reported(self, nixl_ep_tr: TestRun) -> None: nixl_ep_tr.output_path.mkdir(parents=True, exist_ok=True) test_def = cast(NixlEPTestDefinition, nixl_ep_tr.test) From f3f366cc256950294d0952633ba094182d34d149 Mon Sep 17 00:00:00 2001 From: Ivan Podkidyshev Date: Tue, 9 Jun 2026 18:50:01 +0200 Subject: [PATCH 3/3] tightened back planned termination check + tests refactor --- src/cloudai/workloads/nixl_ep/nixl_ep.py | 6 +- .../test_command_gen_strategy_slurm.py | 36 ---- .../test_job_status_retrieval_strategy.py | 182 ++++++++---------- 3 files changed, 88 insertions(+), 136 deletions(-) diff --git a/src/cloudai/workloads/nixl_ep/nixl_ep.py b/src/cloudai/workloads/nixl_ep/nixl_ep.py index 7a05a2feb..0b37a88b7 100644 --- a/src/cloudai/workloads/nixl_ep/nixl_ep.py +++ b/src/cloudai/workloads/nixl_ep/nixl_ep.py @@ -158,7 +158,11 @@ def _looks_like_planned_srun_termination(content: str) -> bool: ) lines = [line.strip() for line in content.splitlines() if line.strip()] srun_lines = [line for line in lines if line.startswith("srun:")] - return bool(srun_lines) and all(any(pattern.match(line) for pattern in allowed_patterns) for line in srun_lines) + return ( + bool(srun_lines) + and all(any(pattern.match(line) for pattern in allowed_patterns) for line in srun_lines) + and all(any(pattern.match(line) for line in srun_lines) for pattern in allowed_patterns) + ) def _has_planned_rank_removal(self) -> bool: plans = self.cmd_args.plan if isinstance(self.cmd_args.plan, list) else [self.cmd_args.plan] diff --git a/tests/workloads/nixl_ep/test_command_gen_strategy_slurm.py b/tests/workloads/nixl_ep/test_command_gen_strategy_slurm.py index 20e174460..c72e13626 100644 --- a/tests/workloads/nixl_ep/test_command_gen_strategy_slurm.py +++ b/tests/workloads/nixl_ep/test_command_gen_strategy_slurm.py @@ -15,9 +15,6 @@ # limitations under the License. import json -import re -from importlib.metadata import version -from pathlib import Path import pytest @@ -102,23 +99,6 @@ def nixl_ep_tr(nixl_ep: NixlEPTestDefinition, slurm_system: SlurmSystem) -> Test ) -def normalize_sbatch(content: str, test_run: TestRun, slurm_system: SlurmSystem) -> str: - normalized = content.replace(str(slurm_system.install_path.absolute()), "__INSTALL_DIR__").replace( - str(test_run.output_path.parent.absolute()), "__OUTPUT_DIR__" - ) - normalized = re.sub( - r"^#SBATCH --job-name=.*$", - "#SBATCH --job-name=__JOB_NAME__", - normalized, - flags=re.MULTILINE, - ) - return normalized.replace(version("cloudai"), "__CLOUDAI_VERSION__") - - -def significant_sbatch_lines(content: str) -> list[str]: - return [line for line in content.splitlines() if line.strip() and not line.lstrip().startswith("echo ")] - - def normalize_stages(strategy: NixlEPSlurmCommandGenStrategy) -> list[tuple[int, tuple[int, ...]]]: num_nodes, _ = strategy.get_cached_nodes_spec() normalized_stages: list[tuple[int, tuple[int, ...]]] = [] @@ -886,19 +866,3 @@ def test_gen_srun_command_single_launch_reports_success( assert 'echo "All NIXL EP launches completed successfully"' in launcher_script assert 'if [ "$rc" -eq 0 ]; then' in launcher_script assert "exit $rc" in launcher_script - - -def test_gen_exec_command_matches_reference(nixl_ep_tr: TestRun, slurm_system: SlurmSystem) -> None: - slurm_system.container_mount_home = True - strategy = NixlEPSlurmCommandGenStrategy(slurm_system, nixl_ep_tr) - - sbatch_cmd = strategy.gen_exec_command() - - assert sbatch_cmd == f"sbatch {nixl_ep_tr.output_path / 'cloudai_sbatch_script.sh'}" - - content = (nixl_ep_tr.output_path / "cloudai_sbatch_script.sh").read_text().strip() - content = normalize_sbatch(content, nixl_ep_tr, slurm_system) - - ref = (Path(__file__).parents[2] / "ref_data" / "nixl-ep.sbatch").read_text().strip() - ref = normalize_sbatch(ref, nixl_ep_tr, slurm_system) - assert significant_sbatch_lines(content) == significant_sbatch_lines(ref) diff --git a/tests/workloads/nixl_ep/test_job_status_retrieval_strategy.py b/tests/workloads/nixl_ep/test_job_status_retrieval_strategy.py index 173061573..d84f19dd4 100644 --- a/tests/workloads/nixl_ep/test_job_status_retrieval_strategy.py +++ b/tests/workloads/nixl_ep/test_job_status_retrieval_strategy.py @@ -24,6 +24,7 @@ EXPANSION_CONTRACTION_PLAN = ( "[[0, 1, 2, 3], [0, 1, 2, 3, 4, 5, 6, 7], [0, 1, 2, 3, 4, -6, 7], [0, 1, 2, 3, 4, 5, 6, 7]]" ) +NO_RANK_REMOVAL_PLAN = "[[0, 1], [0, 1, 2, 3]]" SUCCESSFUL_BANDWIDTH_LINE = ( "[rank 0] Dispatch + combine bandwidth: 12.34 GB/s, avg_t=56.7 us, min_t=50.0 us, max_t=60.0 us\n" ) @@ -36,6 +37,22 @@ def num_nodes(test_run: TestRun) -> int: return cast(int, test_run.num_nodes) +def write_successful_node_logs(test_run: TestRun) -> None: + test_run.output_path.mkdir(parents=True, exist_ok=True) + for node_idx in range(num_nodes(test_run)): + (test_run.output_path / f"nixl-ep-node-{node_idx}.log").write_text(SUCCESSFUL_BANDWIDTH_LINE, encoding="utf-8") + + +def set_plan(test_run: TestRun, plan: str) -> None: + test_def = cast(NixlEPTestDefinition, test_run.test) + test_def.cmd_args = NixlEPCmdArgs( + docker_image_url="docker.io/nvidia/nixl-ep:latest", + elastic_script="/workspace/nixl/examples/device/ep/tests/elastic/elastic.py", + plan=plan, + num_processes_per_node=4, + ) + + @pytest.fixture def nixl_ep_tr(tmp_path) -> TestRun: tdef = NixlEPTestDefinition( @@ -53,91 +70,74 @@ def nixl_ep_tr(tmp_path) -> TestRun: class TestNixlEPStatusCheck: - def test_successful_job(self, nixl_ep_tr: TestRun) -> None: - nixl_ep_tr.output_path.mkdir(parents=True, exist_ok=True) - for node_idx in range(num_nodes(nixl_ep_tr)): - (nixl_ep_tr.output_path / f"nixl-ep-node-{node_idx}.log").write_text( - SUCCESSFUL_BANDWIDTH_LINE, encoding="utf-8" - ) - (nixl_ep_tr.output_path / "slurm-job.toml").write_text( - 'state = "COMPLETED"\nexit_code = "0:0"\n', - encoding="utf-8", - ) - - result = nixl_ep_tr.test.was_run_successful(nixl_ep_tr) - - assert result.is_successful - assert result.error_message == "" - - def test_planned_srun_termination_is_ignored_when_benchmark_output_exists(self, nixl_ep_tr: TestRun) -> None: - nixl_ep_tr.output_path.mkdir(parents=True, exist_ok=True) - for node_idx in range(num_nodes(nixl_ep_tr)): - (nixl_ep_tr.output_path / f"nixl-ep-node-{node_idx}.log").write_text( - SUCCESSFUL_BANDWIDTH_LINE, encoding="utf-8" - ) - (nixl_ep_tr.output_path / "stderr.txt").write_text( - "\n".join( - [ - "srun: error: node001: task 0: Terminated", - "Ignoring provisional NIXL EP planned-rank-removal exit 143", - "srun: Terminating StepId=123.4", - "srun: Force Terminated StepId=123.4", - ] - ) - + "\n", - encoding="utf-8", - ) - - result = nixl_ep_tr.test.was_run_successful(nixl_ep_tr) - - assert result.is_successful - - def test_planned_srun_termination_still_reports_other_failures(self, nixl_ep_tr: TestRun) -> None: - nixl_ep_tr.output_path.mkdir(parents=True, exist_ok=True) - for node_idx in range(num_nodes(nixl_ep_tr)): - (nixl_ep_tr.output_path / f"nixl-ep-node-{node_idx}.log").write_text( - SUCCESSFUL_BANDWIDTH_LINE, encoding="utf-8" - ) - (nixl_ep_tr.output_path / "stderr.txt").write_text( - "\n".join( - [ - "srun: error: node001: task 0: Terminated", - "Traceback (most recent call last):", - "srun: Terminating StepId=123.4", - "srun: Force Terminated StepId=123.4", - ] - ) - + "\n", - encoding="utf-8", - ) - - result = nixl_ep_tr.test.was_run_successful(nixl_ep_tr) - - assert not result.is_successful - assert "Python traceback" in result.error_message - - def test_unplanned_srun_termination_is_reported(self, nixl_ep_tr: TestRun) -> None: - nixl_ep_tr.output_path.mkdir(parents=True, exist_ok=True) - test_def = cast(NixlEPTestDefinition, nixl_ep_tr.test) - test_def.cmd_args = NixlEPCmdArgs( - docker_image_url="docker.io/nvidia/nixl-ep:latest", - elastic_script="/workspace/nixl/examples/device/ep/tests/elastic/elastic.py", - plan="[[0, 1], [0, 1, 2, 3]]", - num_processes_per_node=4, - ) - for node_idx in range(num_nodes(nixl_ep_tr)): - (nixl_ep_tr.output_path / f"nixl-ep-node-{node_idx}.log").write_text( - SUCCESSFUL_BANDWIDTH_LINE, encoding="utf-8" - ) + @pytest.mark.parametrize( + ("plan", "stderr_content", "expected_fragment"), + [ + pytest.param( + EXPANSION_CONTRACTION_PLAN, + ( + "srun: error: node001: task 0: Terminated\n" + "Ignoring provisional NIXL EP planned-rank-removal exit 143\n" + "srun: Terminating StepId=123.4\n" + "srun: Force Terminated StepId=123.4\n" + ), + None, + id="planned-rank-removal-full-srun-signature-is-ignored", + ), + pytest.param( + EXPANSION_CONTRACTION_PLAN, + "srun: error: node001: task 0: Terminated\n", + "srun failure", + id="planned-rank-removal-incomplete-srun-signature-is-reported", + ), + pytest.param( + EXPANSION_CONTRACTION_PLAN, + ( + "srun: error: node001: task 0: Terminated\n" + "Traceback (most recent call last):\n" + "srun: Terminating StepId=123.4\n" + "srun: Force Terminated StepId=123.4\n" + ), + "Python traceback", + id="planned-rank-removal-keeps-non-srun-failures", + ), + pytest.param( + EXPANSION_CONTRACTION_PLAN, + ( + "srun: error: node001: task 0: Terminated\n" + "srun: Terminating StepId=123.4\n" + "srun: Force Terminated StepId=123.4\n" + "srun: error: Unable to allocate resources: Invalid node name\n" + ), + "srun failure", + id="planned-rank-removal-keeps-other-srun-failures", + ), + pytest.param( + NO_RANK_REMOVAL_PLAN, + "srun: error: node001: task 0: Terminated\n", + "srun failure", + id="unplanned-rank-removal-srun-termination-is-reported", + ), + ], + ) + def test_srun_termination_detection( + self, nixl_ep_tr: TestRun, plan: str, stderr_content: str, expected_fragment: str | None + ) -> None: + set_plan(nixl_ep_tr, plan) + write_successful_node_logs(nixl_ep_tr) (nixl_ep_tr.output_path / "stderr.txt").write_text( - "srun: error: node001: task 0: Terminated\n", + stderr_content, encoding="utf-8", ) result = nixl_ep_tr.test.was_run_successful(nixl_ep_tr) - assert not result.is_successful - assert "srun failure" in result.error_message + if expected_fragment is None: + assert result.is_successful + assert result.error_message == "" + else: + assert not result.is_successful + assert expected_fragment in result.error_message def test_launcher_path_error_is_reported(self, nixl_ep_tr: TestRun) -> None: nixl_ep_tr.output_path.mkdir(parents=True, exist_ok=True) @@ -165,11 +165,7 @@ def test_missing_node_logs_is_reported(self, nixl_ep_tr: TestRun) -> None: assert "nixl-ep-node-1.log, nixl-ep-node-2.log" in result.error_message def test_plan_mismatch_is_reported(self, nixl_ep_tr: TestRun) -> None: - nixl_ep_tr.output_path.mkdir(parents=True, exist_ok=True) - for node_idx in range(num_nodes(nixl_ep_tr)): - (nixl_ep_tr.output_path / f"nixl-ep-node-{node_idx}.log").write_text( - SUCCESSFUL_BANDWIDTH_LINE, encoding="utf-8" - ) + write_successful_node_logs(nixl_ep_tr) (nixl_ep_tr.output_path / "nixl-ep-node-1.log").write_text( "Process 0 -> no plan phases were found for rank 9 after phase None, exiting\n", encoding="utf-8", @@ -185,11 +181,7 @@ def test_plan_mismatch_is_reported(self, nixl_ep_tr: TestRun) -> None: assert "never appears in the plan" in result.error_message def test_tcpstore_timeout_is_reported(self, nixl_ep_tr: TestRun) -> None: - nixl_ep_tr.output_path.mkdir(parents=True, exist_ok=True) - for node_idx in range(num_nodes(nixl_ep_tr)): - (nixl_ep_tr.output_path / f"nixl-ep-node-{node_idx}.log").write_text( - SUCCESSFUL_BANDWIDTH_LINE, encoding="utf-8" - ) + write_successful_node_logs(nixl_ep_tr) (nixl_ep_tr.output_path / "nixl-ep-node-2.log").write_text( TCPSTORE_TIMEOUT_LINE, encoding="utf-8", @@ -205,11 +197,7 @@ def test_tcpstore_timeout_is_reported(self, nixl_ep_tr: TestRun) -> None: assert "lost its TCPStore connection" in result.error_message def test_primary_launch_exit_before_phase_completion_is_reported(self, nixl_ep_tr: TestRun) -> None: - nixl_ep_tr.output_path.mkdir(parents=True, exist_ok=True) - for node_idx in range(num_nodes(nixl_ep_tr)): - (nixl_ep_tr.output_path / f"nixl-ep-node-{node_idx}.log").write_text( - SUCCESSFUL_BANDWIDTH_LINE, encoding="utf-8" - ) + write_successful_node_logs(nixl_ep_tr) (nixl_ep_tr.output_path / "nixl-ep-node-1.log").write_text( "Primary NIXL EP launch exited before phase 1 completed\n", encoding="utf-8", @@ -243,11 +231,7 @@ def test_initial_primary_launch_exit_explains_missing_later_node_logs(self, nixl assert "some node logs may be absent" in result.error_message def test_ucx_remote_memory_view_failure_is_reported(self, nixl_ep_tr: TestRun) -> None: - nixl_ep_tr.output_path.mkdir(parents=True, exist_ok=True) - for node_idx in range(num_nodes(nixl_ep_tr)): - (nixl_ep_tr.output_path / f"nixl-ep-node-{node_idx}.log").write_text( - SUCCESSFUL_BANDWIDTH_LINE, encoding="utf-8" - ) + write_successful_node_logs(nixl_ep_tr) (nixl_ep_tr.output_path / "nixl-ep-node-0.log").write_text( "E0319 04:13:25.442619 950677 ucx_backend.cpp:1486] " "Failed to prepare remote memory view: Failed to create device memory list(remote): No such device\n",