From 6829336978bf45021d0d8a120c81b9defb4d3023 Mon Sep 17 00:00:00 2001 From: kapil27 Date: Thu, 19 Mar 2026 10:57:38 +0530 Subject: [PATCH] Enhance failure scenario notebooks and tests by adding environment variable checks, improving logging, and integrating a Kubeflow installation script. Update training runtime resolution and ensure proper handling of disconnected environments. --- .../trainer/resources/failure_scenarios.ipynb | 544 +++++++++++++----- tests/trainer/resources/rhai_features.ipynb | 68 +-- .../sdk_tests/failure_traininghub_tests.go | 65 ++- .../trainer/sdk_tests/fashion_mnist_tests.go | 4 + .../trainer/sdk_tests/rhai_features_tests.go | 22 +- 5 files changed, 530 insertions(+), 173 deletions(-) diff --git a/tests/trainer/resources/failure_scenarios.ipynb b/tests/trainer/resources/failure_scenarios.ipynb index fce80dc78..42e46b089 100644 --- a/tests/trainer/resources/failure_scenarios.ipynb +++ b/tests/trainer/resources/failure_scenarios.ipynb @@ -1,132 +1,416 @@ { - "cells": [ - { - "cell_type": "code", - "id": "cell-pip-install", - "metadata": {}, - "outputs": [], - "source": [ - "# pip Install kubeflow SDK from main branch for testing\n", - "%pip install git+https://github.com/opendatahub-io/kubeflow-sdk.git@main" - ], - "execution_count": null + "cells": [ + { + "cell_type": "code", + "execution_count": null, + "id": "cell-pip-install", + "metadata": {}, + "outputs": [], + "source": [ + "# pip Install kubeflow SDK from main branch for testing\n", + "%pip install git+https://github.com/opendatahub-io/kubeflow-sdk.git@main" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "cell-k8s-setup", + "metadata": {}, + "outputs": [], + "source": [ + "import os\n", + "import urllib3\n", + "from kubernetes import client as k8s\n", + "\n", + "# Suppress InsecureRequestWarning since we use verify_ssl = False\n", + "urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)\n", + "\n", + "api_server = os.getenv(\"OPENSHIFT_API_URL\")\n", + "token = os.getenv(\"NOTEBOOK_USER_TOKEN\")\n", + "if not api_server or not token:\n", + " raise RuntimeError(\"OPENSHIFT_API_URL and NOTEBOOK_USER_TOKEN environment variables are required\")\n", + "\n", + "configuration = k8s.Configuration()\n", + "configuration.host = api_server\n", + "configuration.verify_ssl = False\n", + "configuration.api_key = {\"authorization\": f\"Bearer {token}\"}\n", + "api_client = k8s.ApiClient(configuration)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "cell-trainer-client", + "metadata": {}, + "outputs": [], + "source": [ + "from kubeflow.trainer import TrainerClient\n", + "from kubeflow.common.types import KubernetesBackendConfig\n", + "\n", + "backend_cfg = KubernetesBackendConfig(\n", + " client_configuration=api_client.configuration,\n", + ")\n", + "\n", + "client = TrainerClient(backend_cfg)\n", + "print(client)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "cell-find-runtime", + "metadata": {}, + "outputs": [], + "source": [ + "training_runtime_name = os.getenv(\"TRAINING_RUNTIME\")\n", + "if not training_runtime_name:\n", + " raise RuntimeError(\"TRAINING_RUNTIME environment variable is required\")\n", + "\n", + "th_runtime = None\n", + "for runtime in client.list_runtimes():\n", + " if runtime.name == training_runtime_name:\n", + " th_runtime = runtime\n", + " print(\"Found runtime: \" + str(th_runtime))\n", + " break\n", + "\n", + "if th_runtime is None:\n", + " raise RuntimeError(f\"Required runtime '{training_runtime_name}' not found\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "cell-helper", + "metadata": {}, + "outputs": [], + "source": [ + "import time\n", + "\n", + "from kubeflow.trainer.rhai import TrainingHubAlgorithms, TrainingHubTrainer\n", + "\n", + "NAMESPACE = os.getenv(\"NOTEBOOK_NAMESPACE\")\n", + "print(f\"Using namespace: {NAMESPACE}\")\n", + "\n", + "\n", + "def check_job_failure(client, job_name):\n", + " \"\"\"Check if a TrainJob has failed using SDK APIs.\n", + "\n", + " Returns (failure_confirmed, details) where failure is confirmed if:\n", + " - get_job().status == \"Failed\", OR\n", + " - get_job_logs() already contains a terminal framework/runtime error\n", + " \"\"\"\n", + " failure_confirmed = False\n", + " details = []\n", + "\n", + " try:\n", + " job = client.get_job(name=job_name)\n", + " status = getattr(job, \"status\", \"\")\n", + " if status == \"Failed\":\n", + " failure_confirmed = True\n", + " details.append(\"job.status=Failed\")\n", + " except Exception as e:\n", + " print(f\" get_job() error: {e}\")\n", + "\n", + " try:\n", + " log_lines = list(client.get_job_logs(job_name, follow=False))\n", + " log_text = \"\\n\".join(str(line) for line in log_lines)\n", + " terminal_error_markers = [\n", + " \"Traceback (most recent call last)\",\n", + " \"RuntimeError:\",\n", + " \"ValueError:\",\n", + " \"FileNotFoundError:\",\n", + " \"AssertionError:\",\n", + " \"ChildFailedError\",\n", + " \"OutOfMemoryError\",\n", + " ]\n", + " for marker in terminal_error_markers:\n", + " if marker in log_text:\n", + " failure_confirmed = True\n", + " details.append(f\"log marker={marker}\")\n", + " break\n", + " except Exception as e:\n", + " print(f\" get_job_logs() error while checking failure: {e}\")\n", + "\n", + " return failure_confirmed, details\n", + "\n", + "\n", + "def run_failure_scenario(client, scenario_name, training_params, expected_error, runtime, algorithm=TrainingHubAlgorithms.SFT):\n", + " \"\"\"Submit a TrainJob expected to fail, verify the expected error in logs\n", + " and that the job failure is detected via SDK-visible state or terminal logs.\"\"\"\n", + " print(f\"\\n{'='*60}\")\n", + " print(f\"Scenario: {scenario_name}\")\n", + " print(f\"Expected error: {expected_error}\")\n", + " print(f\"{'='*60}\")\n", + "\n", + " job_name = None\n", + "\n", + " try:\n", + " job_name = client.train(\n", + " trainer=TrainingHubTrainer(\n", + " algorithm=algorithm,\n", + " func_args=training_params,\n", + " resources_per_node={\n", + " \"cpu\": 4,\n", + " \"memory\": \"16Gi\",\n", + " },\n", + " ),\n", + " runtime=runtime,\n", + " )\n", + " print(f\"TrainJob created: {job_name}\")\n", + "\n", + " # Wait for the job to start running\n", + " try:\n", + " client.wait_for_job_status(name=job_name, status={\"Running\", \"Failed\"}, timeout=300)\n", + " except Exception as e:\n", + " print(f\"Wait for Running/Failed raised: {e}\")\n", + "\n", + " # Poll until we see: (1) the expected error in logs, and (2) failure via SDK APIs\n", + " error_found = False\n", + " failure_confirmed = False\n", + " deadline = time.time() + 300 # 5 minute timeout\n", + "\n", + " while time.time() < deadline:\n", + " # Check logs for the expected error\n", + " if not error_found:\n", + " try:\n", + " log_lines = list(client.get_job_logs(job_name, follow=False))\n", + " log_text = \"\\n\".join(str(line) for line in log_lines)\n", + " if expected_error in log_text:\n", + " error_found = True\n", + " print(f\"Found expected error in logs: '{expected_error}'\")\n", + " except Exception as e:\n", + " print(f\"Log fetch error (retrying): {e}\")\n", + "\n", + " # Check job failure via get_job() and terminal log markers\n", + " if not failure_confirmed:\n", + " failure_confirmed, details = check_job_failure(client, job_name)\n", + " if failure_confirmed:\n", + " print(f\"Job failure confirmed via SDK/logs: {'; '.join(details)}\")\n", + "\n", + " if error_found and failure_confirmed:\n", + " break\n", + "\n", + " time.sleep(15)\n", + "\n", + " # Verify results\n", + " assert error_found, f\"Expected error '{expected_error}' not found in logs\"\n", + " assert failure_confirmed, (\n", + " f\"Job failure not confirmed via SDK/logs \"\n", + " f\"— final job status: {client.get_job(name=job_name).status}\"\n", + " )\n", + "\n", + " print(f\"PASSED: {scenario_name}\")\n", + " result = True\n", + "\n", + " except Exception as e:\n", + " print(f\"FAILED: {scenario_name} - {e}\")\n", + " result = False\n", + "\n", + " finally:\n", + " # Cleanup\n", + " if job_name:\n", + " try:\n", + " client.delete_job(job_name)\n", + " print(f\"Deleted job: {job_name}\")\n", + " except Exception as e:\n", + " print(f\"Warning: failed to delete job {job_name}: {e}\")\n", + "\n", + " return result" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "cell-scenario-invalid-dataset", + "metadata": {}, + "outputs": [], + "source": [ + "# Scenario 1: Invalid Dataset Path\n", + "result_1 = run_failure_scenario(\n", + " client=client,\n", + " scenario_name=\"Invalid Dataset Path\",\n", + " training_params={\n", + " \"model_path\": \"/opt/app-root/src\",\n", + " \"data_path\": \"/nonexistent/dataset.jsonl\",\n", + " \"ckpt_output_dir\": \"/opt/app-root/src/checkpoints\",\n", + " \"data_output_dir\": \"/opt/app-root/src/sft-data\",\n", + " \"effective_batch_size\": 128,\n", + " \"num_epochs\": 1,\n", + " \"max_seq_len\": 4096,\n", + " \"max_batch_len\": 10000,\n", + " \"learning_rate\": 5e-6,\n", + " },\n", + " expected_error=\"FileNotFoundError\",\n", + " runtime=th_runtime,\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "cell-scenario-invalid-model", + "metadata": {}, + "outputs": [], + "source": [ + "# Scenario 2: Invalid Model Path\n", + "result_2 = run_failure_scenario(\n", + " client=client,\n", + " scenario_name=\"Invalid Model Path\",\n", + " training_params={\n", + " \"model_path\": \"/nonexistent/model\",\n", + " \"data_path\": \"/opt/app-root/src/data.jsonl\",\n", + " \"ckpt_output_dir\": \"/opt/app-root/src/checkpoints\",\n", + " \"data_output_dir\": \"/opt/app-root/src/sft-data\",\n", + " \"effective_batch_size\": 128,\n", + " \"num_epochs\": 1,\n", + " \"max_seq_len\": 4096,\n", + " \"max_batch_len\": 10000,\n", + " \"learning_rate\": 5e-6,\n", + " },\n", + " expected_error=\"FileNotFoundError\",\n", + " runtime=th_runtime,\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "cell-scenario-invalid-hyperparams", + "metadata": {}, + "outputs": [], + "source": [ + "# Scenario 3: Invalid Training Hyperparameters\n", + "# Uses a string value for max_batch_len where an int is expected,\n", + "# which triggers a Pydantic ValidationError during config parsing.\n", + "result_3 = run_failure_scenario(\n", + " client=client,\n", + " scenario_name=\"Invalid Training Hyperparameters\",\n", + " training_params={\n", + " \"model_path\": \"/opt/app-root/src\",\n", + " \"data_path\": \"/opt/app-root/src/data.jsonl\",\n", + " \"ckpt_output_dir\": \"/opt/app-root/src/checkpoints\",\n", + " \"data_output_dir\": \"/opt/app-root/src/sft-data\",\n", + " \"effective_batch_size\": 128,\n", + " \"num_epochs\": 1,\n", + " \"max_seq_len\": 4096,\n", + " \"max_batch_len\": \"not_a_number\",\n", + " \"learning_rate\": 5e-6,\n", + " },\n", + " expected_error=\"ValidationError\",\n", + " runtime=th_runtime,\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "cc9br1onzf", + "metadata": {}, + "outputs": [], + "source": [ + "# Scenario 4: OSFT — Invalid unfreeze_rank_ratio\n", + "# The OSFT algorithm validates unfreeze_rank_ratio with a manual range check\n", + "# (0.0 <= val <= 1.0), so passing a string triggers a TypeError.\n", + "result_4 = run_failure_scenario(\n", + " client=client,\n", + " scenario_name=\"OSFT Invalid Unfreeze Rank Ratio\",\n", + " training_params={\n", + " \"model_path\": \"/opt/app-root/src\",\n", + " \"data_path\": \"/opt/app-root/src/data.jsonl\",\n", + " \"ckpt_output_dir\": \"/opt/app-root/src/checkpoints\",\n", + " \"data_output_path\": \"/opt/app-root/src/osft-data\",\n", + " \"effective_batch_size\": 128,\n", + " \"num_epochs\": 1,\n", + " \"max_seq_len\": 2048,\n", + " \"max_tokens_per_gpu\": 32000,\n", + " \"learning_rate\": 5e-6,\n", + " \"unfreeze_rank_ratio\": \"not_a_number\", # Invalid: string instead of float\n", + " },\n", + " expected_error=\"'<=' not supported between instances of 'float' and 'str'\",\n", + " runtime=th_runtime,\n", + " algorithm=TrainingHubAlgorithms.OSFT,\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "qj6u4whtjg", + "metadata": {}, + "outputs": [], + "source": [ + "# Scenario 5: LORA — Invalid model path\n", + "# LORA uses Unsloth's FastLanguageModel which raises a RuntimeError when\n", + "# the model path doesn't exist, before any LoRA-specific validation runs.\n", + "result_5 = run_failure_scenario(\n", + " client=client,\n", + " scenario_name=\"LORA Invalid Model Path\",\n", + " training_params={\n", + " \"model_path\": \"/nonexistent/model\",\n", + " \"data_path\": \"/opt/app-root/src/data.jsonl\",\n", + " \"ckpt_output_dir\": \"/opt/app-root/src/checkpoints\",\n", + " \"data_output_dir\": \"/opt/app-root/src/sft-data\",\n", + " \"micro_batch_size\": 16,\n", + " \"num_epochs\": 1,\n", + " \"max_seq_len\": 1024,\n", + " \"learning_rate\": 2e-4,\n", + " \"lora_r\": 16,\n", + " \"lora_alpha\": 32,\n", + " \"lora_dropout\": 0.0,\n", + " \"dataset_type\": \"chat_template\",\n", + " \"field_messages\": \"messages\",\n", + " },\n", + " expected_error=\"No config file found\",\n", + " runtime=th_runtime,\n", + " algorithm=TrainingHubAlgorithms.LORA_SFT,\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "cell-summary", + "metadata": {}, + "outputs": [], + "source": [ + "# Summary\n", + "results = {\n", + " \"Invalid Dataset Path\": result_1,\n", + " \"Invalid Model Path\": result_2,\n", + " \"Invalid Training Hyperparameters\": result_3,\n", + " \"OSFT Invalid Unfreeze Rank Ratio\": result_4,\n", + " \"LORA Invalid Model Path\": result_5,\n", + "}\n", + "\n", + "print(\"\\n\" + \"=\" * 60)\n", + "print(\"FAILURE SCENARIOS TEST SUMMARY\")\n", + "print(\"=\" * 60)\n", + "for name, passed in results.items():\n", + " status = \"PASSED\" if passed else \"FAILED\"\n", + " print(f\" {name}: {status}\")\n", + "\n", + "all_passed = all(results.values())\n", + "print(\"=\" * 60)\n", + "if all_passed:\n", + " print(\"NOTEBOOK_STATUS: SUCCESS\")\n", + "else:\n", + " failed = [name for name, passed in results.items() if not passed]\n", + " print(\"NOTEBOOK_STATUS: FAILURE\")\n", + " raise RuntimeError(f\"Failed scenarios: {', '.join(failed)}\")" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3 (ipykernel)", + "language": "python", + "name": "python3" + }, + "language_info": { + "name": "python", + "version": "3.12.0" + } }, - { - "cell_type": "code", - "id": "cell-k8s-setup", - "metadata": {}, - "outputs": [], - "source": "import os\nimport urllib3\nfrom kubernetes import client as k8s\n\n# Suppress InsecureRequestWarning since we use verify_ssl = False\nurllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)\n\napi_server = os.getenv(\"OPENSHIFT_API_URL\")\ntoken = os.getenv(\"NOTEBOOK_USER_TOKEN\")\nif not api_server or not token:\n raise RuntimeError(\"OPENSHIFT_API_URL and NOTEBOOK_USER_TOKEN environment variables are required\")\n\nconfiguration = k8s.Configuration()\nconfiguration.host = api_server\nconfiguration.verify_ssl = False\nconfiguration.api_key = {\"authorization\": f\"Bearer {token}\"}\napi_client = k8s.ApiClient(configuration)", - "execution_count": null - }, - { - "cell_type": "code", - "id": "cell-trainer-client", - "metadata": {}, - "outputs": [], - "source": [ - "from kubeflow.trainer import TrainerClient\n", - "from kubeflow.common.types import KubernetesBackendConfig\n", - "\n", - "backend_cfg = KubernetesBackendConfig(\n", - " client_configuration=api_client.configuration,\n", - ")\n", - "\n", - "client = TrainerClient(backend_cfg)\n", - "print(client)" - ], - "execution_count": null - }, - { - "cell_type": "code", - "id": "cell-find-runtime", - "metadata": {}, - "outputs": [], - "source": [ - "training_runtime_name = os.getenv(\"TRAINING_RUNTIME\")\n", - "if not training_runtime_name:\n", - " raise RuntimeError(\"TRAINING_RUNTIME environment variable is required\")\n", - "\n", - "th_runtime = None\n", - "for runtime in client.list_runtimes():\n", - " if runtime.name == training_runtime_name:\n", - " th_runtime = runtime\n", - " print(\"Found runtime: \" + str(th_runtime))\n", - " break\n", - "\n", - "if th_runtime is None:\n", - " raise RuntimeError(f\"Required runtime '{training_runtime_name}' not found\")" - ], - "execution_count": null - }, - { - "cell_type": "code", - "id": "cell-helper", - "metadata": {}, - "outputs": [], - "source": "import time\n\nfrom kubeflow.trainer.rhai import TrainingHubAlgorithms, TrainingHubTrainer\n\nNAMESPACE = os.getenv(\"NOTEBOOK_NAMESPACE\")\nprint(f\"Using namespace: {NAMESPACE}\")\n\n# Event reasons that indicate a container crash or failure\nFAILURE_EVENT_REASONS = {\"BackOff\", \"CrashLoopBackOff\", \"Failed\", \"OOMKilled\", \"OOMKilling\", \"Killing\"}\n\n\ndef check_job_failure(client, job_name):\n \"\"\"Check if a TrainJob has failed using SDK APIs (get_job and get_job_events).\n\n Returns (failure_confirmed, details) where failure is confirmed if:\n - get_job().status == \"Failed\", OR\n - get_job_events() contains crash-related events (BackOff, OOMKilled, etc.)\n \"\"\"\n failure_confirmed = False\n details = []\n\n # Check job status via get_job()\n try:\n job = client.get_job(name=job_name)\n if job.status == \"Failed\":\n failure_confirmed = True\n details.append(f\"job.status=Failed\")\n except Exception as e:\n print(f\" get_job() error: {e}\")\n\n # Check events via get_job_events()\n try:\n events = client.get_job_events(name=job_name)\n for event in events:\n reason = getattr(event, \"reason\", \"\") or \"\"\n if reason in FAILURE_EVENT_REASONS:\n failure_confirmed = True\n message = getattr(event, \"message\", \"\") or \"\"\n details.append(f\"event: reason={reason} message={message[:100]}\")\n except Exception as e:\n print(f\" get_job_events() error: {e}\")\n\n return failure_confirmed, details\n\n\ndef run_failure_scenario(client, scenario_name, training_params, expected_error, runtime, algorithm=TrainingHubAlgorithms.SFT):\n \"\"\"Submit a TrainJob expected to fail, verify the expected error in logs\n and that the job failure is detected via get_job()/get_job_events().\"\"\"\n print(f\"\\n{'='*60}\")\n print(f\"Scenario: {scenario_name}\")\n print(f\"Expected error: {expected_error}\")\n print(f\"{'='*60}\")\n\n job_name = None\n\n try:\n job_name = client.train(\n trainer=TrainingHubTrainer(\n algorithm=algorithm,\n func_args=training_params,\n resources_per_node={\n \"cpu\": 4,\n \"memory\": \"16Gi\",\n },\n ),\n runtime=runtime,\n )\n print(f\"TrainJob created: {job_name}\")\n\n # Wait for the job to start running\n try:\n client.wait_for_job_status(name=job_name, status={\"Running\", \"Failed\"}, timeout=300)\n except Exception as e:\n print(f\"Wait for Running/Failed raised: {e}\")\n\n # Poll until we see: (1) the expected error in logs, and (2) failure via SDK APIs\n error_found = False\n failure_confirmed = False\n deadline = time.time() + 300 # 5 minute timeout\n\n while time.time() < deadline:\n # Check logs for the expected error\n if not error_found:\n try:\n log_lines = list(client.get_job_logs(job_name, follow=False))\n log_text = \"\\n\".join(str(line) for line in log_lines)\n if expected_error in log_text:\n error_found = True\n print(f\"Found expected error in logs: '{expected_error}'\")\n except Exception as e:\n print(f\"Log fetch error (retrying): {e}\")\n\n # Check job failure via get_job() and get_job_events()\n if not failure_confirmed:\n failure_confirmed, details = check_job_failure(client, job_name)\n if failure_confirmed:\n print(f\"Job failure confirmed via SDK: {'; '.join(details)}\")\n\n if error_found and failure_confirmed:\n break\n\n time.sleep(15)\n\n # Verify results\n assert error_found, f\"Expected error '{expected_error}' not found in logs\"\n assert failure_confirmed, (\n f\"Job failure not confirmed via get_job()/get_job_events() \"\n f\"— final job status: {client.get_job(name=job_name).status}\"\n )\n\n print(f\"PASSED: {scenario_name}\")\n result = True\n\n except Exception as e:\n print(f\"FAILED: {scenario_name} - {e}\")\n result = False\n\n finally:\n # Cleanup\n if job_name:\n try:\n client.delete_job(job_name)\n print(f\"Deleted job: {job_name}\")\n except Exception as e:\n print(f\"Warning: failed to delete job {job_name}: {e}\")\n\n return result", - "execution_count": null - }, - { - "cell_type": "code", - "id": "cell-scenario-invalid-dataset", - "metadata": {}, - "outputs": [], - "source": "# Scenario 1: Invalid Dataset Path\nresult_1 = run_failure_scenario(\n client=client,\n scenario_name=\"Invalid Dataset Path\",\n training_params={\n \"model_path\": \"/opt/app-root/src\",\n \"data_path\": \"/nonexistent/dataset.jsonl\",\n \"ckpt_output_dir\": \"/opt/app-root/src/checkpoints\",\n \"data_output_dir\": \"/opt/app-root/src/sft-data\",\n \"effective_batch_size\": 128,\n \"num_epochs\": 1,\n \"max_seq_len\": 4096,\n \"max_batch_len\": 10000,\n \"learning_rate\": 5e-6,\n },\n expected_error=\"FileNotFoundError\",\n runtime=th_runtime,\n)", - "execution_count": null - }, - { - "cell_type": "code", - "id": "cell-scenario-invalid-model", - "metadata": {}, - "outputs": [], - "source": "# Scenario 2: Invalid Model Path\nresult_2 = run_failure_scenario(\n client=client,\n scenario_name=\"Invalid Model Path\",\n training_params={\n \"model_path\": \"/nonexistent/model\",\n \"data_path\": \"/opt/app-root/src/data.jsonl\",\n \"ckpt_output_dir\": \"/opt/app-root/src/checkpoints\",\n \"data_output_dir\": \"/opt/app-root/src/sft-data\",\n \"effective_batch_size\": 128,\n \"num_epochs\": 1,\n \"max_seq_len\": 4096,\n \"max_batch_len\": 10000,\n \"learning_rate\": 5e-6,\n },\n expected_error=\"FileNotFoundError\",\n runtime=th_runtime,\n)", - "execution_count": null - }, - { - "cell_type": "code", - "id": "cell-scenario-invalid-hyperparams", - "metadata": {}, - "outputs": [], - "source": "# Scenario 3: Invalid Training Hyperparameters\n# Uses a string value for max_batch_len where an int is expected,\n# which triggers a Pydantic ValidationError during config parsing.\nresult_3 = run_failure_scenario(\n client=client,\n scenario_name=\"Invalid Training Hyperparameters\",\n training_params={\n \"model_path\": \"/opt/app-root/src\",\n \"data_path\": \"/opt/app-root/src/data.jsonl\",\n \"ckpt_output_dir\": \"/opt/app-root/src/checkpoints\",\n \"data_output_dir\": \"/opt/app-root/src/sft-data\",\n \"effective_batch_size\": 128,\n \"num_epochs\": 1,\n \"max_seq_len\": 4096,\n \"max_batch_len\": \"not_a_number\",\n \"learning_rate\": 5e-6,\n },\n expected_error=\"ValidationError\",\n runtime=th_runtime,\n)", - "execution_count": null - }, - { - "cell_type": "code", - "id": "cc9br1onzf", - "source": "# Scenario 4: OSFT — Invalid unfreeze_rank_ratio\n# The OSFT algorithm validates unfreeze_rank_ratio with a manual range check\n# (0.0 <= val <= 1.0), so passing a string triggers a TypeError.\nresult_4 = run_failure_scenario(\n client=client,\n scenario_name=\"OSFT Invalid Unfreeze Rank Ratio\",\n training_params={\n \"model_path\": \"/opt/app-root/src\",\n \"data_path\": \"/opt/app-root/src/data.jsonl\",\n \"ckpt_output_dir\": \"/opt/app-root/src/checkpoints\",\n \"data_output_path\": \"/opt/app-root/src/osft-data\",\n \"effective_batch_size\": 128,\n \"num_epochs\": 1,\n \"max_seq_len\": 2048,\n \"max_tokens_per_gpu\": 32000,\n \"learning_rate\": 5e-6,\n \"unfreeze_rank_ratio\": \"not_a_number\", # Invalid: string instead of float\n },\n expected_error=\"'<=' not supported between instances of 'float' and 'str'\",\n runtime=th_runtime,\n algorithm=TrainingHubAlgorithms.OSFT,\n)", - "metadata": {}, - "execution_count": null, - "outputs": [] - }, - { - "cell_type": "code", - "id": "qj6u4whtjg", - "source": "# Scenario 5: LORA — Invalid model path\n# LORA uses Unsloth's FastLanguageModel which raises a RuntimeError when\n# the model path doesn't exist, before any LoRA-specific validation runs.\nresult_5 = run_failure_scenario(\n client=client,\n scenario_name=\"LORA Invalid Model Path\",\n training_params={\n \"model_path\": \"/nonexistent/model\",\n \"data_path\": \"/opt/app-root/src/data.jsonl\",\n \"ckpt_output_dir\": \"/opt/app-root/src/checkpoints\",\n \"data_output_dir\": \"/opt/app-root/src/sft-data\",\n \"micro_batch_size\": 16,\n \"num_epochs\": 1,\n \"max_seq_len\": 1024,\n \"learning_rate\": 2e-4,\n \"lora_r\": 16,\n \"lora_alpha\": 32,\n \"lora_dropout\": 0.0,\n \"dataset_type\": \"chat_template\",\n \"field_messages\": \"messages\",\n },\n expected_error=\"No config file found\",\n runtime=th_runtime,\n algorithm=TrainingHubAlgorithms.LORA_SFT,\n)", - "metadata": {}, - "execution_count": null, - "outputs": [] - }, - { - "cell_type": "code", - "id": "cell-summary", - "metadata": {}, - "outputs": [], - "source": "# Summary\nresults = {\n \"Invalid Dataset Path\": result_1,\n \"Invalid Model Path\": result_2,\n \"Invalid Training Hyperparameters\": result_3,\n \"OSFT Invalid Unfreeze Rank Ratio\": result_4,\n \"LORA Invalid Model Path\": result_5,\n}\n\nprint(\"\\n\" + \"=\" * 60)\nprint(\"FAILURE SCENARIOS TEST SUMMARY\")\nprint(\"=\" * 60)\nfor name, passed in results.items():\n status = \"PASSED\" if passed else \"FAILED\"\n print(f\" {name}: {status}\")\n\nall_passed = all(results.values())\nprint(\"=\" * 60)\nif all_passed:\n print(\"NOTEBOOK_STATUS: SUCCESS\")\nelse:\n failed = [name for name, passed in results.items() if not passed]\n print(\"NOTEBOOK_STATUS: FAILURE\")\n raise RuntimeError(f\"Failed scenarios: {', '.join(failed)}\")", - "execution_count": null - } - ], - "metadata": { - "kernelspec": { - "display_name": "Python 3 (ipykernel)", - "language": "python", - "name": "python3" - }, - "language_info": { - "name": "python", - "version": "3.12.0" - } - }, - "nbformat": 4, - "nbformat_minor": 5 -} \ No newline at end of file + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/tests/trainer/resources/rhai_features.ipynb b/tests/trainer/resources/rhai_features.ipynb index 19380fd0b..635c5c3ec 100644 --- a/tests/trainer/resources/rhai_features.ipynb +++ b/tests/trainer/resources/rhai_features.ipynb @@ -13,9 +13,7 @@ }, { "cell_type": "code", - "execution_count": null, "metadata": {}, - "outputs": [], "source": [ "def train_bloom():\n", " \"\"\"Training function for distributed training.\n", @@ -199,13 +197,13 @@ " if rank == 0:\n", " print(\"Training is finished\")\n", " dist.destroy_process_group()" - ] + ], + "execution_count": null, + "outputs": [] }, { "cell_type": "code", - "execution_count": null, "metadata": {}, - "outputs": [], "source": [ "import os\n", "import warnings\n", @@ -242,13 +240,13 @@ "\n", "trainer_client = TrainerClient(backend_cfg)\n", "print(\"TrainerClient initialized\")" - ] + ], + "execution_count": null, + "outputs": [] }, { "cell_type": "code", - "execution_count": null, "metadata": {}, - "outputs": [], "source": [ "training_runtime_name = os.getenv(\"TRAINING_RUNTIME\")\n", "if not training_runtime_name:\n", @@ -258,13 +256,13 @@ "if torch_runtime is None:\n", " raise RuntimeError(f\"Required runtime '{training_runtime_name}' not found\")\n", "print(f\"Got runtime: {torch_runtime.name}\")" - ] + ], + "execution_count": null, + "outputs": [] }, { "cell_type": "code", - "execution_count": null, "metadata": {}, - "outputs": [], "source": [ "# Pre-download model/dataset from S3 to shared PVC (disconnected environments only)\n", "# Training pods will then load from these local paths\n", @@ -283,7 +281,7 @@ "if os.path.exists(checkpoints_path):\n", " print(f\"Cleaning up old checkpoints at {checkpoints_path}\")\n", " shutil.rmtree(checkpoints_path)\n", - " print(\" \u2705 Old checkpoints removed\")\n", + " print(\" ✅ Old checkpoints removed\")\n", "\n", "s3_endpoint = os.getenv(\"AWS_DEFAULT_ENDPOINT\", \"\")\n", "s3_access_key = os.getenv(\"AWS_ACCESS_KEY_ID\", \"\")\n", @@ -342,7 +340,7 @@ " os.makedirs(os.path.dirname(local_file), exist_ok=True)\n", " s3_client.download_file(s3_bucket, key, local_file)\n", " count += 1\n", - " print(f\" \u2705 Downloaded {count} files to {local_path}\")\n", + " print(f\" ✅ Downloaded {count} files to {local_path}\")\n", " \n", " # Download model if not already present\n", " if not os.path.exists(os.path.join(model_local_path, \"config.json\")):\n", @@ -356,16 +354,16 @@ " else:\n", " print(f\" Dataset already exists at {dataset_local_path}, skipping\")\n", " \n", - " print(\"\u2705 S3 download complete - training pods will load from shared PVC\")\n", + " print(\"✅ S3 download complete - training pods will load from shared PVC\")\n", "else:\n", " print(\"HuggingFace mode: training pods will download directly from HF Hub\")\n" - ] + ], + "execution_count": null, + "outputs": [] }, { "cell_type": "code", - "execution_count": null, "metadata": {}, - "outputs": [], "source": [ "from kubeflow.trainer.rhai.transformers import TransformersTrainer\n", "from kubeflow.trainer.options import PodTemplateOverrides, PodTemplateOverride, PodSpecOverride, ContainerOverride\n", @@ -397,14 +395,16 @@ "# Configure resources - GPU label tells k8s to schedule on GPU node\n", "if gpu_resource_label:\n", " resources_per_node = {\n", - " \"cpu\": 2, \n", + " \"cpu\": 2,\n", " \"memory\": \"12Gi\",\n", " gpu_resource_label: num_gpus_per_node # e.g., \"nvidia.com/gpu\": 2\n", " }\n", " print(f\"GPU mode: requesting {gpu_resource_label}: {num_gpus_per_node}\")\n", "else:\n", - " resources_per_node = {\"cpu\": 2, \"memory\": \"8Gi\"}\n", - " print(\"CPU mode: no GPU requested\")\n", + " # CPU mode: SDK infers nproc_per_node from CPU count.\n", + " # For single-process CPU tests, keep cpu=1 so checkpoint resume does not fan out local ranks.\n", + " resources_per_node = {\"cpu\": num_gpus_per_node, \"memory\": \"8Gi\"}\n", + " print(f\"CPU mode: {num_gpus_per_node} processes per node (WORLD_SIZE = {num_nodes} × {num_gpus_per_node} = {num_nodes * num_gpus_per_node})\")\n", "\n", "# Build env vars to pass to training pods (GPU_TYPE tells train_bloom() to force CPU mode)\n", "training_env = {\"GPU_TYPE\": gpu_type}\n", @@ -464,13 +464,13 @@ " ]\n", ")\n", "print(f\"TRAINJOB_NAME: {job_name}\")" - ] + ], + "execution_count": null, + "outputs": [] }, { "cell_type": "code", - "execution_count": null, "metadata": {}, - "outputs": [], "source": [ "# Wait for job completion\n", "trainer_client.wait_for_job_status(name=job_name, status={\"Running\"}, timeout=600)\n", @@ -478,39 +478,41 @@ "\n", "job = trainer_client.get_job(name=job_name)\n", "print(f\"Training job final status: {job.status}\")" - ] + ], + "execution_count": null, + "outputs": [] }, { "cell_type": "code", - "execution_count": null, "metadata": {}, - "outputs": [], "source": [ "# Print job steps\n", "for step in trainer_client.get_job(name=job_name).steps:\n", " print(f\"Step: {step.name}, Status: {step.status}, Devices: {step.device} x {step.device_count}\")" - ] + ], + "execution_count": null, + "outputs": [] }, { "cell_type": "code", - "execution_count": null, "metadata": {}, - "outputs": [], "source": [ "# Print job logs\n", "for logline in trainer_client.get_job_logs(job_name, follow=False):\n", " print(logline)" - ] + ], + "execution_count": null, + "outputs": [] }, { "cell_type": "code", - "execution_count": null, "metadata": {}, - "outputs": [], "source": [ "# Notebook completed - Go test handles assertions\n", "print(\"NOTEBOOK_STATUS: SUCCESS\")" - ] + ], + "execution_count": null, + "outputs": [] } ], "metadata": { diff --git a/tests/trainer/sdk_tests/failure_traininghub_tests.go b/tests/trainer/sdk_tests/failure_traininghub_tests.go index e568de876..4c3a3d6d8 100644 --- a/tests/trainer/sdk_tests/failure_traininghub_tests.go +++ b/tests/trainer/sdk_tests/failure_traininghub_tests.go @@ -24,6 +24,7 @@ import ( . "github.com/onsi/gomega" corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" common "github.com/opendatahub-io/distributed-workloads/tests/common" support "github.com/opendatahub-io/distributed-workloads/tests/common/support" @@ -35,8 +36,23 @@ const ( failureNotebookPath = "resources/" + failureNotebookName torchrunFailureNotebookName = "torchrun_failure.ipynb" torchrunFailureNotebookPath = "resources/" + torchrunFailureNotebookName + failureInstallScriptPath = "resources/disconnected_env/install_kubeflow.py" + failureInstallScriptName = "install_kubeflow.py" ) +func resolveTrainingHubRuntime(test support.Test, preferred, fallback string) string { + test.T().Helper() + + if _, err := test.Client().Trainer().TrainerV1alpha1().ClusterTrainingRuntimes().Get( + test.Ctx(), preferred, metav1.GetOptions{}, + ); err == nil { + return preferred + } + + test.T().Logf("ClusterTrainingRuntime %q not found, falling back to %q", preferred, fallback) + return fallback +} + // RunTrainingFailureScenariosTest verifies that training failures are properly // propagated — when a training pod fails, the TrainJob should report a Failed // condition, and the SDK client should surface that failure correctly. @@ -55,11 +71,27 @@ func RunTrainingFailureScenariosTest(t *testing.T) { support.CreateUserRoleBindingWithClusterRole(test, userName, namespace.Name, "admin") trainerutils.CreateUserClusterRoleBindingForTrainerRuntimes(test, userName) - // Create ConfigMap with notebook + // Create ConfigMap with notebook and kubeflow install script localPath := failureNotebookPath nb, err := os.ReadFile(localPath) test.Expect(err).NotTo(HaveOccurred(), fmt.Sprintf("failed to read notebook: %s", localPath)) - cm := support.CreateConfigMap(test, namespace.Name, map[string][]byte{failureNotebookName: nb}) + installScript, err := os.ReadFile(failureInstallScriptPath) + test.Expect(err).NotTo(HaveOccurred(), fmt.Sprintf("failed to read install script: %s", failureInstallScriptPath)) + endpoint, endpointOK := support.GetStorageBucketDefaultEndpoint() + accessKey, _ := support.GetStorageBucketAccessKeyId() + secretKey, _ := support.GetStorageBucketSecretKey() + bucket, bucketOK := support.GetStorageBucketName() + if !endpointOK { + endpoint = "" + } + if !bucketOK { + bucket = "" + } + trainingRuntime := resolveTrainingHubRuntime(test, trainerutils.DefaultTrainingHubRuntimeCPU, trainerutils.DefaultTrainingHubRuntimeCUDA) + cm := support.CreateConfigMap(test, namespace.Name, map[string][]byte{ + failureNotebookName: nb, + failureInstallScriptName: installScript, + }) // Create RWX PVC required by the notebook pod template storageClass, err := support.GetRWXStorageClass(test) @@ -76,12 +108,20 @@ func RunTrainingFailureScenariosTest(t *testing.T) { "set -e; "+ "export OPENSHIFT_API_URL='%s'; export NOTEBOOK_USER_TOKEN='%s'; "+ "export NOTEBOOK_NAMESPACE='%s'; "+ + "export AWS_DEFAULT_ENDPOINT='%s'; export AWS_ACCESS_KEY_ID='%s'; "+ + "export AWS_SECRET_ACCESS_KEY='%s'; export AWS_STORAGE_BUCKET='%s'; "+ "export TRAINING_RUNTIME='%s'; "+ - "python -m pip install --quiet --no-cache-dir --break-system-packages ipykernel papermill && "+ + "export GPU_TYPE='cpu'; "+ + "%s"+ + "python -m pip install --quiet --no-cache-dir --break-system-packages ipykernel papermill boto3==1.34.162 && "+ + "python /opt/app-root/notebooks/%s && "+ "if python -m papermill -k python3 /opt/app-root/notebooks/%s /opt/app-root/src/out.ipynb --log-output; "+ "then echo 'NOTEBOOK_STATUS: SUCCESS'; else echo 'NOTEBOOK_STATUS: FAILURE'; fi; sleep infinity", support.GetOpenShiftApiUrl(test), userToken, namespace.Name, - trainerutils.DefaultTrainingHubRuntimeCPU, + endpoint, accessKey, secretKey, bucket, + trainingRuntime, + buildKubeflowInstallExports(), + failureInstallScriptName, failureNotebookName, ) command := []string{"/bin/sh", "-c", shellCmd} @@ -122,11 +162,16 @@ func RunTorchrunTrainingFailureTest(t *testing.T) { support.CreateUserRoleBindingWithClusterRole(test, userName, namespace.Name, "admin") trainerutils.CreateUserClusterRoleBindingForTrainerRuntimes(test, userName) - // Create ConfigMap with notebook + // Create ConfigMap with notebook and kubeflow install script localPath := torchrunFailureNotebookPath nb, err := os.ReadFile(localPath) test.Expect(err).NotTo(HaveOccurred(), fmt.Sprintf("failed to read notebook: %s", localPath)) - cm := support.CreateConfigMap(test, namespace.Name, map[string][]byte{torchrunFailureNotebookName: nb}) + installScript, err := os.ReadFile(failureInstallScriptPath) + test.Expect(err).NotTo(HaveOccurred(), fmt.Sprintf("failed to read install script: %s", failureInstallScriptPath)) + cm := support.CreateConfigMap(test, namespace.Name, map[string][]byte{ + torchrunFailureNotebookName: nb, + failureInstallScriptName: installScript, + }) // S3 configuration for model and dataset download endpoint, endpointOK := support.GetStorageBucketDefaultEndpoint() @@ -140,6 +185,7 @@ func RunTorchrunTrainingFailureTest(t *testing.T) { if !bucketOK { bucket = "" } + trainingRuntime := resolveTrainingHubRuntime(test, trainerutils.DefaultTrainingHubRuntimeCUDA, trainerutils.DefaultTrainingHubRuntimeCUDA) // Create RWX PVC for shared dataset and model storageClass, err := support.GetRWXStorageClass(test) @@ -162,12 +208,17 @@ func RunTorchrunTrainingFailureTest(t *testing.T) { "export AWS_STORAGE_BUCKET='%s'; "+ "export AWS_STORAGE_BUCKET_SFT_DIR='%s'; "+ "export TRAINING_RUNTIME='%s'; "+ + "export GPU_TYPE='nvidia'; "+ + "%s"+ "python -m pip install --quiet --no-cache-dir --break-system-packages ipykernel papermill boto3==1.34.162 && "+ + "python /opt/app-root/notebooks/%s && "+ "if python -m papermill -k python3 /opt/app-root/notebooks/%s /opt/app-root/src/out.ipynb --log-output; "+ "then echo 'NOTEBOOK_STATUS: SUCCESS'; else echo 'NOTEBOOK_STATUS: FAILURE'; fi; sleep infinity", support.GetOpenShiftApiUrl(test), userToken, namespace.Name, rwxPvc.Name, endpoint, accessKey, secretKey, bucket, prefix, - trainerutils.DefaultTrainingHubRuntimeCUDA, + trainingRuntime, + buildKubeflowInstallExports(), + failureInstallScriptName, torchrunFailureNotebookName, ) command := []string{"/bin/sh", "-c", shellCmd} diff --git a/tests/trainer/sdk_tests/fashion_mnist_tests.go b/tests/trainer/sdk_tests/fashion_mnist_tests.go index 02f69fa1f..432f896c1 100644 --- a/tests/trainer/sdk_tests/fashion_mnist_tests.go +++ b/tests/trainer/sdk_tests/fashion_mnist_tests.go @@ -101,6 +101,7 @@ func RunFashionMnistCpuDistributedTraining(t *testing.T) { "export AWS_STORAGE_BUCKET_MNIST_DIR='%s'; "+ "export TRAINING_RUNTIME='%s'; "+ "export GPU_TYPE='cpu'; "+ + "%s"+ "python -m pip install --quiet --no-cache-dir ipykernel papermill boto3==1.34.162 && "+ "python /opt/app-root/notebooks/%s && "+ "if python -m papermill -k python3 /opt/app-root/notebooks/%s /opt/app-root/src/out.ipynb --log-output; "+ @@ -108,6 +109,7 @@ func RunFashionMnistCpuDistributedTraining(t *testing.T) { support.GetOpenShiftApiUrl(test), userToken, namespace.Name, rwxPvc.Name, endpoint, accessKey, secretKey, bucket, prefix, trainerutils.DefaultClusterTrainingRuntimeCUDA, + buildKubeflowInstallExports(), installKubeflowScript, notebookName, ) @@ -229,6 +231,7 @@ func RunFashionMnistKueueCpuDistributedTraining(t *testing.T) { "export TRAINING_RUNTIME='%s'; "+ "export GPU_TYPE='cpu'; "+ "export KUEUE_QUEUE_NAME='%s'; "+ + "%s"+ "python -m pip install --quiet --no-cache-dir ipykernel papermill boto3==1.34.162 && "+ "python /opt/app-root/notebooks/%s && "+ "if python -m papermill -k python3 /opt/app-root/notebooks/%s /opt/app-root/src/out.ipynb --log-output; "+ @@ -237,6 +240,7 @@ func RunFashionMnistKueueCpuDistributedTraining(t *testing.T) { endpoint, accessKey, secretKey, bucket, prefix, trainerutils.DefaultClusterTrainingRuntimeCUDA, customLocalQueue.Name, + buildKubeflowInstallExports(), installKubeflowScript, notebookName, ) diff --git a/tests/trainer/sdk_tests/rhai_features_tests.go b/tests/trainer/sdk_tests/rhai_features_tests.go index 0069ec8cb..c3b51f960 100644 --- a/tests/trainer/sdk_tests/rhai_features_tests.go +++ b/tests/trainer/sdk_tests/rhai_features_tests.go @@ -62,6 +62,21 @@ func boolStr(b bool) string { return "false" } +// buildKubeflowInstallExports forwards optional disconnected-install overrides +// into the notebook shell so install_kubeflow.py can use a staged wheel. +func buildKubeflowInstallExports() string { + var exports strings.Builder + + if value, ok := os.LookupEnv("KUBEFLOW_REQUIRED_VERSION"); ok && value != "" { + exports.WriteString(fmt.Sprintf("export KUBEFLOW_REQUIRED_VERSION='%s'; ", value)) + } + if value, ok := os.LookupEnv("KUBEFLOW_WHEEL_S3_KEY"); ok && value != "" { + exports.WriteString(fmt.Sprintf("export KUBEFLOW_WHEEL_S3_KEY='%s'; ", value)) + } + + return exports.String() +} + // RhaiFeatureConfig holds configuration for RHAI feature tests type RhaiFeatureConfig struct { EnableProgressionTracking bool @@ -399,8 +414,9 @@ func runRhaiFeaturesTestWithConfig(t *testing.T, config RhaiFeatureConfig) { // install_kubeflow.py uses GPU_TYPE to select the correct index (cpu/cuda/rocm) test.T().Logf("Using Red Hat PyPI index for %s (kubeflow not on public PyPI)", gpuType) - // Build pip exports - GPU_TYPE tells install_kubeflow.py which Red Hat index to use - pipExports := fmt.Sprintf("export GPU_TYPE='%s'; ", gpuType) + // Build pip exports - GPU_TYPE tells install_kubeflow.py which Red Hat index to use. + // Optional KUBEFLOW_* exports let disconnected runs point at a staged wheel. + pipExports := fmt.Sprintf("export GPU_TYPE='%s'; %s", gpuType, buildKubeflowInstallExports()) pipInstallFlags := "" // Set defaults for num_nodes and num_gpus_per_node if not specified @@ -733,7 +749,7 @@ func verifyCheckpoints(test Test, namespace, trainJobName, checkpointDir string, test.T().Log("Waiting for training to complete at least 2 epochs (checking logs)...") test.Eventually(func() bool { return hasCompletedEpochFromLogs(test, namespace, trainJobName, 2) - }, TestTimeoutMedium, 5*time.Second).Should(BeTrue(), "Training should complete at least 2 epochs before suspension") + }, timeout, 5*time.Second).Should(BeTrue(), "Training should complete at least 2 epochs before suspension") test.T().Log("At least 2 epochs completed - ready to suspend") // Verify cloud checkpoint upload is working (only for cloud storage mode, not PVC)