From 5027a135123192a5c71e8b9e5412008ca0c74d6f Mon Sep 17 00:00:00 2001 From: daohu527 Date: Wed, 11 Feb 2026 00:38:09 +0800 Subject: [PATCH 1/8] feat: refractor auto test tool --- modules/tools/whl-can/auto_test/README.md | 20 ++ modules/tools/whl-can/auto_test/cli.py | 159 +++++++++ .../whl-can/auto_test/config/__init__.py | 0 .../whl-can/auto_test/config/default.yaml | 17 + .../tools/whl-can/auto_test/config/manager.py | 31 ++ .../tools/whl-can/auto_test/core/__init__.py | 0 .../tools/whl-can/auto_test/core/reporter.py | 80 +++++ .../tools/whl-can/auto_test/core/runner.py | 239 ++++++++++++++ modules/tools/whl-can/auto_test/core/ui.py | 121 +++++++ .../tools/whl-can/auto_test/tests/__init__.py | 0 .../whl-can/auto_test/tests/l1_static.py | 301 ++++++++++++++++++ .../whl-can/auto_test/tests/l2_dynamic_low.py | 294 +++++++++++++++++ .../auto_test/tests/l3_dynamic_high.py | 181 +++++++++++ modules/tools/whl-can/auto_test/util.py | 66 ++++ 14 files changed, 1509 insertions(+) create mode 100644 modules/tools/whl-can/auto_test/README.md create mode 100644 modules/tools/whl-can/auto_test/cli.py create mode 100644 modules/tools/whl-can/auto_test/config/__init__.py create mode 100644 modules/tools/whl-can/auto_test/config/default.yaml create mode 100644 modules/tools/whl-can/auto_test/config/manager.py create mode 100644 modules/tools/whl-can/auto_test/core/__init__.py create mode 100644 modules/tools/whl-can/auto_test/core/reporter.py create mode 100644 modules/tools/whl-can/auto_test/core/runner.py create mode 100644 modules/tools/whl-can/auto_test/core/ui.py create mode 100644 modules/tools/whl-can/auto_test/tests/__init__.py create mode 100644 modules/tools/whl-can/auto_test/tests/l1_static.py create mode 100644 modules/tools/whl-can/auto_test/tests/l2_dynamic_low.py create mode 100644 modules/tools/whl-can/auto_test/tests/l3_dynamic_high.py create mode 100644 modules/tools/whl-can/auto_test/util.py diff --git a/modules/tools/whl-can/auto_test/README.md b/modules/tools/whl-can/auto_test/README.md new file mode 100644 index 00000000..98b01f71 --- /dev/null +++ b/modules/tools/whl-can/auto_test/README.md @@ -0,0 +1,20 @@ +# Quickstart + +Autonomous driving chassis automatic testing tool + +--- + +## Prerequisites + +Before using automated testing tools, it's best to perform a check using the whl-can tool, and then launch the automated testing tool. + +--- + +## How to Run + +```bash +python3 cli.py --level 1 --config config/default.yaml +``` + +This will start the program with non-blocking keyboard listening and send +control commands in real time. diff --git a/modules/tools/whl-can/auto_test/cli.py b/modules/tools/whl-can/auto_test/cli.py new file mode 100644 index 00000000..525f2cc5 --- /dev/null +++ b/modules/tools/whl-can/auto_test/cli.py @@ -0,0 +1,159 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +""" +A robust test runner for validating Apollo's chassis control interface. +Implements V-Model testing tiers (Static -> Dynamic Low -> Dynamic High). + +Key Features: +- 100Hz Command Heartbeat (prevents chassis timeouts) +- Safety Watchdog (monitors thread health & chassis errors) +- Tiered Test Execution (L1/L2/L3) +- Real-time Curses UI +""" + +import argparse +import time +import curses +import sys + +try: + from cyber.python.cyber_py3 import cyber +except ImportError: + print("FATAL: Apollo Cyber RT python modules not found.") + print("Please ensure you are in the Apollo docker environment.") + sys.exit(1) + +from core.runner import TestRunner +from core.ui import UIHandler +from config.manager import ConfigManager + +from tests.l1_static import ( + test_l1_driving_mode_transition, + test_l1_steering_performance_static, + test_l1_static_max_steering_rate, + test_l1_static_brake_consistency, + test_l1_epb_toggle, + test_l1_static_gear_shift, + test_l1_signal_control, + test_l1_sport_mode_toggle, +) +from tests.l2_dynamic_low import ( + test_l2_speed_control_loop, + test_l2_throttle_linearity, + test_l2_brake_linearity, + test_l2_throttle_response_time, + test_l2_brake_response_time, + test_l2_gear_protection, + test_l2_mode_protection, +) +from tests.l3_dynamic_high import ( + test_l3_staged_acceleration, + test_l3_staged_braking_performance, + test_l3_emergency_brake, +) + + +def main(stdscr): + # 1. Parse Arguments + parser = argparse.ArgumentParser(description="Apollo Chassis Auto-Tester") + parser.add_argument( + "--level", + type=int, + default=1, + choices=[1, 2, 3], + help="Test Level (1: Static, 2: Low-Speed, 3: High-Dynamic)", + ) + parser.add_argument( + "--config", + type=str, + default="config/default.yaml", + help="Path to test configuration file", + ) + # Curses wrapper passes extra args awkwardly, so we parse known args only + # In a real CLI, we might handle argv differently before curses.wrapper + args, _ = parser.parse_known_args() + + # 2. Initialize Components + config = ConfigManager(args.config) + ui = UIHandler(stdscr) + runner = TestRunner(ui, config, args.level) + + # 3. Register Test Cases + # --- Level 1: Static / Functional --- + runner.register_test_case( + test_l1_driving_mode_transition, "TC-FUNC-01: Mode Transition", 1 + ) + runner.register_test_case( + test_l1_steering_performance_static, "TC-CTRL-07: Static Steer Step", 1 + ) + runner.register_test_case( + test_l1_static_max_steering_rate, "TC-CTRL-08: Static Max Steer Test", 1 + ) + runner.register_test_case( + test_l1_static_brake_consistency, "TC-CTRL-04-S: Static Brake Consistency", 1 + ) + runner.register_test_case(test_l1_epb_toggle, "TC-FUNC-04: EPB Toggle", 1) + runner.register_test_case( + test_l1_static_gear_shift, "TC-FUNC-02: Static Gear Shift", 1 + ) + runner.register_test_case( + test_l1_sport_mode_toggle, "TC-FUNC-05: Sport/Mode Toggle", 1 + ) + runner.register_test_case(test_l1_signal_control, "TC-SIG-ALL: Body Signals", 1) + + # --- Level 2: Dynamic Low Speed --- + runner.register_test_case( + test_l2_speed_control_loop, "TC-CTRL-09: Low Speed Loop", 2 + ) + runner.register_test_case( + test_l2_throttle_linearity, "TC-CTRL-01: Throttle Linearity", 2 + ) + runner.register_test_case(test_l2_brake_linearity, "TC-CTRL-04: Brake Linearity", 2) + runner.register_test_case( + test_l2_throttle_response_time, "TC-CTRL-02: Throttle Resp.", 2 + ) + runner.register_test_case(test_l2_brake_response_time, "TC-CTRL-05: Brake Resp.", 2) + runner.register_test_case(test_l2_gear_protection, "TC-FUNC-03: Gear Protection", 2) + runner.register_test_case(test_l2_mode_protection, "TC-FUNC-06: Mode Protection", 2) + + # --- Level 3: Dynamic High Performance --- + runner.register_test_case( + test_l3_staged_acceleration, "TC-CTRL-10: Staged Accel", 3 + ) + runner.register_test_case( + test_l3_staged_braking_performance, "TC-CTRL-06: Staged Brake", 3 + ) + runner.register_test_case( + test_l3_emergency_brake, "TC-SAFETY-01: Emergency Brake", 3 + ) + + # 4. Execution Loop + try: + runner.start() # Starts Heartbeat & Watchdog + runner.run_sequence() + except KeyboardInterrupt: + ui.log("User interrupted (Ctrl+C). Stopping...", "RED") + except Exception as e: + ui.log(f"CRITICAL ERROR: {str(e)}", "RED") + import traceback + + ui.log(traceback.format_exc(), "RED") + time.sleep(3) # Give user time to see the error + finally: + ui.log("Shutting down...", "CYAN") + runner.cleanup() + + +if __name__ == "__main__": + # Environment Check + if not cyber.ok(): + print("Initializing Cyber RT...") + cyber.init() + + try: + # Using curses.wrapper to handle terminal setup/teardown automatically + curses.wrapper(main) + except Exception as e: + print(f"Failed to initialize UI: {e}") + cyber.shutdown() diff --git a/modules/tools/whl-can/auto_test/config/__init__.py b/modules/tools/whl-can/auto_test/config/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/modules/tools/whl-can/auto_test/config/default.yaml b/modules/tools/whl-can/auto_test/config/default.yaml new file mode 100644 index 00000000..e7bbe408 --- /dev/null +++ b/modules/tools/whl-can/auto_test/config/default.yaml @@ -0,0 +1,17 @@ +# config/default.yaml +topics: + control: "/apollo/control" + chassis: "/apollo/chassis" + +limits: + max_speed_mps: 20.0 + max_throttle_pct: 100.0 + max_brake_pct: 100.0 + max_steering_pct: 100.0 + emergency_brake_val: 60.0 + +thresholds: + latency_ms: 200 + steering_steady_error: 2.0 + speed_steady_error: 0.5 + chassis_wait_sec: 5.0 diff --git a/modules/tools/whl-can/auto_test/config/manager.py b/modules/tools/whl-can/auto_test/config/manager.py new file mode 100644 index 00000000..1e3b777b --- /dev/null +++ b/modules/tools/whl-can/auto_test/config/manager.py @@ -0,0 +1,31 @@ +import yaml +import os + + +class ConfigManager: + def __init__(self, path): + self.data = { + "topics": {"control": "/apollo/control", "chassis": "/apollo/chassis"}, + "limits": {"emergency_brake_val": 50.0}, + "thresholds": {"steady_error": 2.0, "chassis_wait_sec": 5.0}, + } + if os.path.exists(path): + try: + with open(path, "r") as f: + user_config = yaml.safe_load(f) + if user_config: + self.data.update(user_config) + except Exception as e: + print(f"Warning: Failed to load config {path}: {e}") + + @property + def topics(self): + return self.data["topics"] + + @property + def limits(self): + return self.data["limits"] + + @property + def thresholds(self): + return self.data["thresholds"] diff --git a/modules/tools/whl-can/auto_test/core/__init__.py b/modules/tools/whl-can/auto_test/core/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/modules/tools/whl-can/auto_test/core/reporter.py b/modules/tools/whl-can/auto_test/core/reporter.py new file mode 100644 index 00000000..e55325dc --- /dev/null +++ b/modules/tools/whl-can/auto_test/core/reporter.py @@ -0,0 +1,80 @@ +import os +import csv +import time +import threading +import matplotlib.pyplot as plt +from datetime import datetime +from typing import List, Tuple + + +class DataRecorder: + """ + Responsible for high-frequency data collection, CSV storage, and plotting. + Implements decoupling of data and logic. + """ + + def __init__(self, test_name: str, output_dir: str = "test_results"): + self.test_name = test_name + self.output_dir = os.path.join( + output_dir, datetime.now().strftime("%Y%m%d_%H%M%S") + ) + if not os.path.exists(self.output_dir): + os.makedirs(self.output_dir) + + # Data buffer: (timestamp, cmd_val, feedback_val, speed_mps) + self.data: List[Tuple[float, float, float, float]] = [] + self.is_recording = False + self._lock = threading.Lock() + self.start_time = 0.0 + + def start(self): + with self._lock: + self.data = [] + self.start_time = time.time() + self.is_recording = True + + def record(self, cmd_val: float, feedback_val: float, speed_mps: float = 0.0): + """Call this method in the test loop to record data points.""" + if not self.is_recording: + return + with self._lock: + t = time.time() - self.start_time + self.data.append((t, cmd_val, feedback_val, speed_mps)) + + def stop(self): + with self._lock: + self.is_recording = False + + def save_and_plot(self, title: str, ylabel: str, save_name: str): + """Save the CSV file and generate a PNG image.""" + if not self.data: + return + + # 1. Save CSV + csv_path = os.path.join(self.output_dir, f"{save_name}.csv") + with open(csv_path, "w", newline="") as f: + writer = csv.writer(f) + writer.writerow(["Time(s)", "Command", "Feedback", "Speed(m/s)"]) + writer.writerows(self.data) + + # 2. Plot (Backend Agg for headless environments) + plt.switch_backend("Agg") + times = [d[0] for d in self.data] + cmds = [d[1] for d in self.data] + fdbks = [d[2] for d in self.data] + + plt.figure(figsize=(10, 6)) + plt.plot(times, cmds, "r--", label="Command", linewidth=1.5) + plt.plot(times, fdbks, "b-", label="Feedback", linewidth=2.0) + + plt.title(f"{self.test_name}: {title}") + plt.xlabel("Time (s)") + plt.ylabel(ylabel) + plt.legend() + plt.grid(True) + + png_path = os.path.join(self.output_dir, f"{save_name}.png") + plt.savefig(png_path) + plt.close() + + return png_path diff --git a/modules/tools/whl-can/auto_test/core/runner.py b/modules/tools/whl-can/auto_test/core/runner.py new file mode 100644 index 00000000..dd188bf5 --- /dev/null +++ b/modules/tools/whl-can/auto_test/core/runner.py @@ -0,0 +1,239 @@ +import threading +import time +import logging +from typing import Optional, List, Dict, Callable + +from cyber.python.cyber_py3 import cyber +from modules.common_msgs.control_msgs import control_cmd_pb2 +from modules.common_msgs.chassis_msgs import chassis_pb2 + +from core.ui import UIHandler +from config.manager import ConfigManager +from util import create_base_command + + +class CommandHeartbeat(threading.Thread): + """Sends control commands at fixed 100Hz frequency.""" + + def __init__(self, writer, interval=0.01): + super().__init__(name="CmdHeartbeat", daemon=True) + self.writer = writer + self.interval = interval + self.active = True + self.current_cmd = control_cmd_pb2.ControlCommand() + self.lock = threading.Lock() + self.seq_num = 0 + + def update(self, cmd: control_cmd_pb2.ControlCommand): + with self.lock: + self.current_cmd.CopyFrom(cmd) + + def run(self): + while self.active: + with self.lock: + self.seq_num += 1 + self.current_cmd.header.sequence_num = self.seq_num + self.current_cmd.header.timestamp_sec = cyber.time.Time.now().to_sec() + self.current_cmd.header.module_name = "chassis_tester" + self.writer.write(self.current_cmd) + time.sleep(self.interval) + + def stop(self): + self.active = False + + +class TestRunner: + def __init__(self, ui: UIHandler, config: ConfigManager, level: int): + self.ui = ui + self.config = config + self.run_level = level + + self.node = cyber.Node("chassis_tester_node") + self.writer = self.node.create_writer( + config.topics["control"], control_cmd_pb2.ControlCommand + ) + self.node.create_reader( + config.topics["chassis"], chassis_pb2.Chassis, self._chassis_cb + ) + + self._chassis_msg = None + self._chassis_lock = threading.Lock() + self._last_chassis_ts = 0.0 + + self.heartbeat = CommandHeartbeat(self.writer) + self.estop_triggered = threading.Event() + + self.test_cases = [] + + def _chassis_cb(self, msg): + with self._chassis_lock: + self._chassis_msg = msg + self._last_chassis_ts = time.time() + + def register_test_case(self, func, name, level): + self.test_cases.append({"func": func, "name": name, "level": level}) + + def start(self): + self.ui.draw_header("Apollo Chassis Auto-Tester", self.run_level) + self.heartbeat.start() + # Start safety monitoring thread + self.safety_thread = threading.Thread(target=self._safety_monitor, daemon=True) + self.safety_thread.start() + + self.ui.log("System Initialized.", "CYAN") + if not self.wait_for_chassis( + self.config.thresholds.get("chassis_wait_sec", 5.0) + ): + self.ui.log("No chassis feedback detected. Check /apollo/chassis.", "RED") + else: + self.ui.log("Chassis feedback OK.", "GREEN") + self.ui.log("Press [Enter] to begin testing...", "YELLOW") + self.ui.wait_for_enter() + + def update_cmd(self, cmd): + """Update the command being sent by the heartbeat.""" + self.heartbeat.update(cmd) + + def update_command(self, cmd): + """Compatibility alias for test cases.""" + self.update_cmd(cmd) + + def get_chassis(self) -> Optional[chassis_pb2.Chassis]: + with self._chassis_lock: + return self._chassis_msg + + def get_latest_chassis(self) -> Optional[chassis_pb2.Chassis]: + return self.get_chassis() + + def wait_for_chassis(self, timeout_sec: float) -> bool: + start = time.time() + while time.time() - start < timeout_sec: + if self.get_chassis() is not None: + return True + time.sleep(0.05) + return False + + def wait_for_condition( + self, condition_fn: Callable[[], bool], timeout_sec: float, desc: str + ) -> bool: + start = time.time() + while time.time() - start < timeout_sec: + if self.estop_triggered.is_set(): + self.ui.log(f"Aborted: {desc}", "RED") + return False + try: + if condition_fn(): + return True + except Exception: + pass + time.sleep(0.01) + self.ui.log(f"Timeout: {desc}", "YELLOW") + return False + + def prepare_for_drive(self) -> bool: + """Request AUTO + Drive with parking brake released in a safe sequence.""" + if not self.wait_for_chassis( + self.config.thresholds.get("chassis_wait_sec", 5.0) + ): + return False + + cmd = create_base_command() + cmd.brake = 20.0 + cmd.parking_brake = True + cmd.gear_location = chassis_pb2.Chassis.GEAR_NEUTRAL + self.update_cmd(cmd) + + if not self.wait_for_condition( + lambda: self.get_latest_chassis() + and self.get_latest_chassis().driving_mode + == chassis_pb2.Chassis.COMPLETE_AUTO_DRIVE, + 5.0, + "Enter Auto Mode", + ): + return False + + cmd.parking_brake = False + cmd.gear_location = chassis_pb2.Chassis.GEAR_DRIVE + cmd.brake = 0.0 + self.update_cmd(cmd) + + return True + + def _safety_monitor(self): + """Watchdog: Checks for Spacebar press and Chassis Errors.""" + while not self.estop_triggered.is_set(): + # 1. Check Keyboard + key = self.ui.get_input() + if key == ord(" "): + self.trigger_estop("User requested Emergency Stop (Spacebar)") + + # 2. Check Chassis Error (Optional: Configurable) + # msg = self.get_chassis() + # if msg and msg.error_code != ... + + time.sleep(0.05) + + def trigger_estop(self, reason): + if self.estop_triggered.is_set(): + return + self.estop_triggered.set() + self.ui.log(f"!!! ESTOP TRIGGERED: {reason} !!!", "RED") + + # Construct Safe Stop Command + cmd = create_base_command() + cmd.throttle = 0.0 + cmd.brake = self.config.limits.get("emergency_brake_val", 50.0) + cmd.parking_brake = True + cmd.driving_mode = chassis_pb2.Chassis.COMPLETE_MANUAL + + self.update_cmd(cmd) + + def reset_to_safe_state(self): + self.ui.log("Resetting to safe state (N, Brake)...", "CYAN") + cmd = create_base_command() + cmd.gear_location = chassis_pb2.Chassis.GEAR_NEUTRAL + cmd.brake = 20.0 + cmd.parking_brake = True + self.update_cmd(cmd) + time.sleep(1.0) + + def run_sequence(self): + tests = [t for t in self.test_cases if t["level"] <= self.run_level] + + if not tests: + self.ui.log("No tests found for this level.", "YELLOW") + return + + for i, case in enumerate(tests): + if self.estop_triggered.is_set(): + break + + self.reset_to_safe_state() + self.ui.log(f"[{i+1}/{len(tests)}] Running: {case['name']}", "DEFAULT") + + try: + # Pass self (runner) to the test function + result = case["func"](self) + + status = "PASS" if result.get("pass") else "FAIL" + color = "GREEN" if result.get("pass") else "RED" + self.ui.log(f" -> {status}: {result.get('details')}", color) + + except Exception as e: + self.ui.log(f" -> EXCEPTION: {e}", "RED") + + time.sleep(2) + + if self.estop_triggered.is_set(): + self.ui.log("Sequence aborted due to ESTOP.", "RED") + else: + self.ui.log("All tests completed.", "GREEN") + self.ui.log("Press any key to exit.", "CYAN") + self.ui.wait_for_enter() + + def cleanup(self): + self.heartbeat.stop() + self.estop_triggered.set() + if hasattr(self, "safety_thread"): + self.safety_thread.join(timeout=1.0) + cyber.shutdown() diff --git a/modules/tools/whl-can/auto_test/core/ui.py b/modules/tools/whl-can/auto_test/core/ui.py new file mode 100644 index 00000000..3494849e --- /dev/null +++ b/modules/tools/whl-can/auto_test/core/ui.py @@ -0,0 +1,121 @@ +import curses +import threading +from datetime import datetime + + +class UIHandler: + """ + Thread-safe abstraction for Curses UI. + Splits screen into Header, Log Area, and Status Footer. + """ + + # Color Pairs + PAIR_DEFAULT = 1 + PAIR_GREEN = 2 + PAIR_RED = 3 + PAIR_YELLOW = 4 + PAIR_CYAN = 5 + + COLOR_MAP = { + "DEFAULT": PAIR_DEFAULT, + "GREEN": PAIR_GREEN, + "RED": PAIR_RED, + "YELLOW": PAIR_YELLOW, + "CYAN": PAIR_CYAN, + } + + def __init__(self, stdscr): + self.stdscr = stdscr + self.lock = threading.Lock() + self._setup_curses() + + def _setup_curses(self): + curses.curs_set(0) # Hide cursor + self.stdscr.nodelay(True) # Non-blocking input + curses.start_color() + curses.use_default_colors() + + # Init pairs + curses.init_pair(self.PAIR_DEFAULT, curses.COLOR_WHITE, -1) + curses.init_pair(self.PAIR_GREEN, curses.COLOR_GREEN, -1) + curses.init_pair(self.PAIR_RED, curses.COLOR_RED, -1) + curses.init_pair(self.PAIR_YELLOW, curses.COLOR_YELLOW, -1) + curses.init_pair(self.PAIR_CYAN, curses.COLOR_CYAN, -1) + + def clear(self): + with self.lock: + self.stdscr.clear() + self.stdscr.refresh() + + def get_input(self): + """Non-blocking input retrieval""" + try: + return self.stdscr.getch() + except curses.error: + return -1 + + def wait_for_enter(self): + """Blocking wait for Enter key""" + self.stdscr.nodelay(False) + while True: + key = self.stdscr.getch() + if key == 10 or key == 13: # Enter + break + self.stdscr.nodelay(True) + + def draw_header(self, title, level): + with self.lock: + h, w = self.stdscr.getmaxyx() + header_text = f"{title} | Level: {level}" + self.stdscr.addstr( + 0, + 0, + header_text[: w - 1], + curses.color_pair(self.PAIR_CYAN) | curses.A_BOLD, + ) + self.stdscr.addstr( + 1, 0, "-" * (w - 1), curses.color_pair(self.PAIR_DEFAULT) + ) + + # Safety instruction always visible + safety_msg = "🚨 PRESS [SPACEBAR] FOR EMERGENCY STOP 🚨" + self.stdscr.addstr( + h - 1, + 0, + safety_msg[: w - 1], + curses.color_pair(self.PAIR_RED) | curses.A_REVERSE, + ) + self.stdscr.refresh() + + def log(self, message: str, color: str = "DEFAULT"): + """ + Scrollable log output. + """ + with self.lock: + timestamp = datetime.now().strftime("%H:%M:%S.%f")[:-3] + full_msg = f"[{timestamp}] {message}" + + h, w = self.stdscr.getmaxyx() + # Log area is between header (lines 0-1) and footer (line h-1) + # Simplistic approach: just append and let curses scroll or handle manually + # For robustness, we usually just write to the line above footer and scroll up + + y, x = self.stdscr.getyx() + + # If at bottom of log area, clear and reset (simple ring buffer visual) + if y >= h - 2: + self.stdscr.move(2, 0) + self.stdscr.clrtobot() + # Redraw footer + safety_msg = "🚨 PRESS [SPACEBAR] FOR EMERGENCY STOP 🚨" + self.stdscr.addstr( + h - 1, + 0, + safety_msg[: w - 1], + curses.color_pair(self.PAIR_RED) | curses.A_REVERSE, + ) + self.stdscr.move(2, 0) + + pair = self.COLOR_MAP.get(color.upper(), self.PAIR_DEFAULT) + self.stdscr.addstr(full_msg + "\n", curses.color_pair(pair)) + self.stdscr.refresh() diff --git a/modules/tools/whl-can/auto_test/tests/__init__.py b/modules/tools/whl-can/auto_test/tests/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/modules/tools/whl-can/auto_test/tests/l1_static.py b/modules/tools/whl-can/auto_test/tests/l1_static.py new file mode 100644 index 00000000..b23ce6dd --- /dev/null +++ b/modules/tools/whl-can/auto_test/tests/l1_static.py @@ -0,0 +1,301 @@ +import time +from util import success, fail, create_base_command +from modules.common_msgs.chassis_msgs import chassis_pb2 + + +def test_l1_driving_mode_transition(runner): + """TC-FUNC-01: Driving Mode Transition""" + runner.ui.log("Step 1: Requesting AUTO...", "CYAN") + cmd = create_base_command() + runner.update_command(cmd) + + if not runner.wait_for_condition( + lambda: runner.get_latest_chassis().driving_mode + == chassis_pb2.Chassis.COMPLETE_AUTO_DRIVE, + 5.0, + "Enter Auto", + ): + return fail("Failed to enter Auto Mode") + + runner.ui.log("Step 2: Press Brake to disengage...", "YELLOW") + if not runner.wait_for_condition( + lambda: runner.get_latest_chassis().driving_mode + == chassis_pb2.Chassis.COMPLETE_MANUAL, + 15.0, + "Manual Disengagement", + ): + return fail("Failed to disengage") + + return success("Mode transition verified") + + +def test_l1_steering_performance_static(runner): + """TC-CTRL-07: Static Steering Tracking""" + cmd = create_base_command() + cmd.gear_location = chassis_pb2.Chassis.GEAR_DRIVE + cmd.parking_brake = False + runner.update_command(cmd) + + target = 40.0 + runner.ui.log(f"Commanding Steering {target}%...", "CYAN") + + cmd.steering_target = target + runner.update_command(cmd) + + # Wait for steady state + time.sleep(3.0) + + chassis = runner.get_latest_chassis() + if not chassis: + return fail("No chassis feedback") + actual = chassis.steering_percentage + err = abs(actual - target) + + if err < 2.0: + return success(f"Target: {target}%, Actual: {actual:.1f}%") + else: + return fail(f"Error {err:.1f}% too high") + + +def test_l1_static_max_steering_rate(runner): + """ + TC-CTRL-08: Static Max Steering Rate (Dry Steering) + Verify the system stability under extreme load conditions when turning in place + (without overheating or current overload protection). + """ + cmd = create_base_command() + cmd.gear_location = chassis_pb2.Chassis.GEAR_DRIVE + cmd.parking_brake = False + runner.update_command(cmd) + + # 1. Get back on track first + runner.ui.log("Centering Steering...", "CYAN") + cmd.steering_target = 0.0 + runner.update_command(cmd) + time.sleep(3.0) + + # 2. Full sweep from full left to full right + max_rate_check_points = [-100.0, 100.0] + + for target in max_rate_check_points: + if runner.estop_triggered.is_set(): + return fail("Aborted") + + runner.ui.log(f"High Speed Turn to {target}%...", "YELLOW") + + # Record start time + start_time = time.time() + + cmd.steering_target = target + # Assuming protocol supports setting steering rate, set to 100% + # cmd.steering_rate = 100.0 + runner.update_command(cmd) + + # Allow sufficient timeout + timeout = 5.0 + success_flag = False + + while time.time() - start_time < timeout: + chassis = runner.get_latest_chassis() + actual = chassis.steering_percentage + + # Check for error codes (such as EPS overheating). + if chassis.error_code != chassis_pb2.Chassis.NO_ERROR: + return fail(f"Chassis Error during turn: {chassis.error_code}") + + # Arrival determination: Error < 1% + if abs(actual - target) < 1.0: + duration = time.time() - start_time + runner.ui.log(f" -> Reached in {duration:.2f}s", "GREEN") + success_flag = True + break + + time.sleep(0.1) + + if not success_flag: + return fail(f"Timeout: Steering failed to reach {target}% under load") + + # 3. Return to center + cmd.steering_target = 0.0 + runner.update_command(cmd) + + return success("Max Steering Rate Stability OK") + + +def test_l1_static_brake_consistency(runner): + """ + TC-CTRL-04-S: Static Brake Command Consistency + Verify if the drive-by-wire brake accurately responds to commands when the vehicle is stationary. + """ + # 1. Safety Initialization: P gear + EPB released (testing Service Brake) + cmd = create_base_command() + cmd.gear_location = chassis_pb2.Chassis.GEAR_PARKING + cmd.parking_brake = False + runner.update_command(cmd) + + runner.ui.log("Setup: Park Gear, EPB Released. Testing Service Brake...", "YELLOW") + time.sleep(2.0) + + # 2. Test points: cover low, medium, and high ranges + test_points = [0, 30, 70, 100] + tolerance = 1.0 # Allow 1% error + try: + for target in test_points: + # Emergency stop check + if runner.estop_triggered.is_set(): + return fail("User Aborted") + + runner.ui.log(f"Commanding Brake: {target}%...", "CYAN") + cmd.brake = float(target) + runner.update_command(cmd) + + # Wait for hydraulic build-up to stabilize (usually quick when stationary, 2s is sufficient) + time.sleep(2.0) + + # Get feedback + chassis = runner.get_latest_chassis() + if not chassis: + return fail("No chassis feedback") + + actual = chassis.brake_percentage + diff = abs(actual - target) + + # 3. Verify consistency + runner.ui.log(f" -> Feedback: {actual:.1f}% (Diff: {diff:.1f}%)", "WHITE") + + if diff > tolerance: + return fail(f"Mismatch at {target}%: Cmd={target}, Act={actual:.1f}") + + finally: + # Restore safe state + runner.reset_to_safe_state() + + return success("Static Brake Consistency OK") + + +def test_l1_static_gear_shift(runner): + """TC-FUNC-02: Static Gear Shift""" + cmd = create_base_command() + cmd.brake = 50.0 + + for gear in [ + chassis_pb2.Chassis.GEAR_REVERSE, + chassis_pb2.Chassis.GEAR_DRIVE, + chassis_pb2.Chassis.GEAR_PARKING, + ]: + cmd.gear_location = gear + runner.update_command(cmd) + if not runner.wait_for_condition( + lambda: runner.get_latest_chassis().gear_location == gear, + 3.0, + f"Shift Gear {gear}", + ): + return fail(f"Failed to shift to Gear {gear}") + return success("Gear shift OK") + + +def test_l1_epb_toggle(runner): + """TC-FUNC-04: EPB Toggle""" + cmd = create_base_command() + + cmd.parking_brake = True + runner.update_command(cmd) + if not runner.wait_for_condition( + lambda: runner.get_latest_chassis().parking_brake, 3.0, "EPB Engage" + ): + return fail("EPB Engage Failed") + + cmd.parking_brake = False + runner.update_command(cmd) + if not runner.wait_for_condition( + lambda: not runner.get_latest_chassis().parking_brake, 3.0, "EPB Release" + ): + return fail("EPB Release Failed") + return success("EPB OK") + + +def test_l1_signal_control(runner): + """TC-SIG-01~04: Turn Signals, Beams, Hazard, Horn""" + cmd = create_base_command() + + # TC-SIG-01: Turn Signals + cmd.signal.turn_signal = 1 # LEFT + runner.update_command(cmd) + if not runner.wait_for_condition( + lambda: runner.get_latest_chassis() + and runner.get_latest_chassis().signal.turn_signal == 1, + 1.0, + "Left turn signal", + ): + return fail("Left Turn Signal Failed") + + cmd.signal.turn_signal = 2 # RIGHT + runner.update_command(cmd) + if not runner.wait_for_condition( + lambda: runner.get_latest_chassis() + and runner.get_latest_chassis().signal.turn_signal == 2, + 1.0, + "Right turn signal", + ): + return fail("Right Turn Signal Failed") + + cmd.signal.turn_signal = 3 # NONE + runner.update_command(cmd) + + # TC-SIG-02: High/Low Beam + cmd.signal.high_beam = True + runner.update_command(cmd) + if not runner.wait_for_condition( + lambda: runner.get_latest_chassis() + and runner.get_latest_chassis().signal.high_beam, + 1.0, + "High beam", + ): + return fail("High Beam Failed") + cmd.signal.high_beam = False + cmd.signal.low_beam = True + runner.update_command(cmd) + if not runner.wait_for_condition( + lambda: runner.get_latest_chassis() + and runner.get_latest_chassis().signal.low_beam, + 1.0, + "Low beam", + ): + return fail("Low Beam Failed") + cmd.signal.low_beam = False + runner.update_command(cmd) + + # TC-SIG-03: Hazard + cmd.signal.emergency_light = True + runner.update_command(cmd) + if not runner.wait_for_condition( + lambda: runner.get_latest_chassis() + and runner.get_latest_chassis().signal.emergency_light, + 1.0, + "Emergency light", + ): + return fail("Emergency Light Failed") + cmd.signal.emergency_light = False + runner.update_command(cmd) + + # TC-SIG-04: Horn + cmd.signal.horn = True + runner.update_command(cmd) + if not runner.wait_for_condition( + lambda: runner.get_latest_chassis() and runner.get_latest_chassis().signal.horn, + 1.0, + "Horn ON", + ): + return fail("Horn Failed") + + cmd.signal.horn = False + runner.update_command(cmd) + if not runner.wait_for_condition( + lambda: runner.get_latest_chassis() + and not runner.get_latest_chassis().signal.horn, + 1.0, + "Horn OFF", + ): + return fail("Horn Release Failed") + + return success("Signals OK") diff --git a/modules/tools/whl-can/auto_test/tests/l2_dynamic_low.py b/modules/tools/whl-can/auto_test/tests/l2_dynamic_low.py new file mode 100644 index 00000000..8bb85055 --- /dev/null +++ b/modules/tools/whl-can/auto_test/tests/l2_dynamic_low.py @@ -0,0 +1,294 @@ +import time +from util import success, fail, create_base_command +from core.reporter import DataRecorder + +from modules.common_msgs.chassis_msgs import chassis_pb2 + + +def test_l2_throttle_linearity(runner): + """TC-CTRL-01: Throttle Linearity (0-30%)""" + if not runner.prepare_for_drive(): + return fail("Prep Failed") + + recorder = DataRecorder("L2_Throttle_Linearity") + recorder.start() + + cmd = create_base_command() + cmd.parking_brake = False + cmd.gear_location = chassis_pb2.Chassis.GEAR_DRIVE + + try: + for throttle in [0, 10, 20, 30]: + if runner.estop_triggered.is_set(): + return fail("Aborted") + + cmd.throttle = float(throttle) + runner.update_command(cmd) + + # Hold for 3s, recording at high freq + start_hold = time.time() + while time.time() - start_hold < 3.0: + chassis = runner.get_latest_chassis() + if chassis: + recorder.record( + throttle, chassis.throttle_percentage, chassis.speed_mps + ) + time.sleep(0.01) + + actual = runner.get_latest_chassis().throttle_percentage + if abs(actual - throttle) > 3.0: + return fail(f"Linearity error at {throttle}%") + + finally: + recorder.stop() + path = recorder.save_and_plot("Linearity Test", "Throttle %", "throttle_lin") + runner.ui.log(f"Plot saved: {path}", "CYAN") + + return success("Linearity OK") + + +def test_l2_brake_linearity(runner): + """TC-CTRL-04: Brake Linearity (0-30%)""" + if not runner.prepare_for_drive(): + return fail("Prep Failed") + + recorder = DataRecorder("L2_Brake_Linearity") + recorder.start() + + cmd = create_base_command() + cmd.parking_brake = False + cmd.gear_location = chassis_pb2.Chassis.GEAR_DRIVE + + # Build minimal speed before brake steps + cmd.throttle = 10.0 + runner.update_command(cmd) + time.sleep(2.0) + cmd.throttle = 0.0 + runner.update_command(cmd) + + try: + for brake in [0, 10, 20, 30]: + if runner.estop_triggered.is_set(): + return fail("Aborted") + + cmd.brake = float(brake) + runner.update_command(cmd) + + start_hold = time.time() + while time.time() - start_hold < 3.0: + chassis = runner.get_latest_chassis() + if chassis: + recorder.record(brake, chassis.brake_percentage, chassis.speed_mps) + time.sleep(0.01) + + chassis = runner.get_latest_chassis() + if not chassis: + return fail("No chassis feedback") + + actual = chassis.brake_percentage + if abs(actual - brake) > 3.0: + return fail(f"Linearity error at {brake}%") + + finally: + recorder.stop() + path = recorder.save_and_plot("Linearity Test", "Brake %", "brake_lin") + runner.ui.log(f"Plot saved: {path}", "CYAN") + + return success("Linearity OK") + + +def test_l2_brake_response_time(runner): + """TC-CTRL-05: Brake Response Time (Step 0->20%)""" + cmd = create_base_command() + cmd.parking_brake = False + runner.update_command(cmd) + time.sleep(1.0) + + recorder = DataRecorder("L2_Brake_Response") + recorder.start() + + target = 20.0 + cmd.brake = target + start_time = time.time() + runner.update_command(cmd) + + t90 = -1 + + try: + while time.time() - start_time < 2.0: + chassis = runner.get_latest_chassis() + if chassis: + recorder.record(target, chassis.brake_percentage) + if t90 < 0 and chassis.brake_percentage >= target * 0.9: + t90 = (time.time() - start_time) * 1000 + time.sleep(0.01) + finally: + recorder.stop() + path = recorder.save_and_plot("Step Response", "Brake %", "brake_resp") + + if t90 < 0: + return fail("Response Timeout") + if t90 > 400: + return fail(f"Response too slow: {t90:.0f}ms") + + return success(f"Response Time: {t90:.0f}ms", plot=path) + + +def test_l2_speed_control_loop(runner): + """TC-CTRL-09: Low Speed Loop (2 m/s)""" + if not runner.prepare_for_drive(): + return fail("Prep Failed") + + recorder = DataRecorder("L2_Speed_Loop") + recorder.start() + + target_speed = 2.0 + cmd = create_base_command() + cmd.gear_location = chassis_pb2.Chassis.GEAR_DRIVE + cmd.parking_brake = False + cmd.speed = target_speed + runner.update_command(cmd) + + start = time.time() + try: + while time.time() - start < 10.0: + chassis = runner.get_latest_chassis() + if chassis: + recorder.record(target_speed, chassis.speed_mps) + if runner.estop_triggered.is_set(): + break + time.sleep(0.01) + + chassis = runner.get_latest_chassis() + if not chassis: + return fail("No chassis feedback") + final_speed = chassis.speed_mps + err = abs(final_speed - target_speed) + + # Stop + cmd.speed = 0 + cmd.brake = 20.0 + runner.update_command(cmd) + + finally: + recorder.stop() + recorder.save_and_plot("Speed Control", "Speed (m/s)", "speed_loop") + + if err > 0.3: + return fail(f"Speed Error {err:.2f} m/s") + return success(f"Stable Speed: {final_speed:.2f} m/s") + + +def test_l2_gear_protection(runner): + """TC-FUNC-03: Gear Protection (Reverse while moving)""" + if not runner.prepare_for_drive(): + return fail("Prep Failed") + + # 1. Move forward > 1m/s + cmd = create_base_command() + cmd.parking_brake = False + cmd.gear_location = chassis_pb2.Chassis.GEAR_DRIVE + cmd.speed = 1.5 + runner.update_command(cmd) + + if not runner.wait_for_condition( + lambda: runner.get_latest_chassis().speed_mps > 1.0, 5.0, "Speed > 1m/s" + ): + return fail("Could not build speed") + + # 2. Try Reverse + runner.ui.log("Attempting Shift to REVERSE...", "YELLOW") + cmd.gear_location = chassis_pb2.Chassis.GEAR_REVERSE + runner.update_command(cmd) + time.sleep(2.0) + + final_gear = runner.get_latest_chassis().gear_location + + # Stop + cmd.speed = 0 + cmd.brake = 50.0 + cmd.gear_location = chassis_pb2.Chassis.GEAR_NEUTRAL + runner.update_command(cmd) + + if final_gear == chassis_pb2.Chassis.GEAR_REVERSE: + return fail("Safety Failure: Shifted to R while moving!") + return success("Gear Protection Active") + + +def test_l2_mode_protection(runner): + """TC-FUNC-06: Motion Mode Protection (Park while moving)""" + if not runner.prepare_for_drive(): + return fail("Prep Failed") + + # 1. Move forward + cmd = create_base_command() + cmd.parking_brake = False + cmd.gear_location = chassis_pb2.Chassis.GEAR_DRIVE + cmd.speed = 2.0 + runner.update_command(cmd) + + if not runner.wait_for_condition( + lambda: runner.get_latest_chassis().speed_mps > 1.5, 5.0, "Speed > 1.5m/s" + ): + return fail("Could not build speed") + + # 2. Try Shift to PARK + runner.ui.log("Attempting Shift to PARK while moving...", "YELLOW") + cmd.gear_location = chassis_pb2.Chassis.GEAR_PARKING + runner.update_command(cmd) + time.sleep(2.0) + + final_gear = runner.get_latest_chassis().gear_location + + # Cleanup + cmd.speed = 0 + cmd.brake = 50.0 + cmd.gear_location = chassis_pb2.Chassis.GEAR_NEUTRAL + runner.update_command(cmd) + + if final_gear == chassis_pb2.Chassis.GEAR_PARKING: + return fail("Safety Failure: Shifted to PARK while moving!") + + return success("Park Inhibition Active") + + +def test_l2_throttle_response_time(runner): + """TC-CTRL-02: Throttle Response Time (Step 0->20%)""" + if not runner.prepare_for_drive(): + return fail("Prep Failed") + + cmd = create_base_command() + cmd.parking_brake = False + cmd.gear_location = chassis_pb2.Chassis.GEAR_DRIVE + cmd.throttle = 0.0 + runner.update_command(cmd) + time.sleep(0.5) + + recorder = DataRecorder("L2_Throttle_Response") + recorder.start() + + target = 20.0 + cmd.throttle = target + start_time = time.time() + runner.update_command(cmd) + + t90 = -1 + + try: + while time.time() - start_time < 2.0: + chassis = runner.get_latest_chassis() + if chassis: + recorder.record(target, chassis.throttle_percentage, chassis.speed_mps) + if t90 < 0 and chassis.throttle_percentage >= target * 0.9: + t90 = (time.time() - start_time) * 1000 + time.sleep(0.01) + finally: + recorder.stop() + path = recorder.save_and_plot("Step Response", "Throttle %", "throttle_resp") + + if t90 < 0: + return fail("Response Timeout") + if t90 > 400: + return fail(f"Response too slow: {t90:.0f}ms") + + return success(f"Response Time: {t90:.0f}ms", plot=path) diff --git a/modules/tools/whl-can/auto_test/tests/l3_dynamic_high.py b/modules/tools/whl-can/auto_test/tests/l3_dynamic_high.py new file mode 100644 index 00000000..fb33f109 --- /dev/null +++ b/modules/tools/whl-can/auto_test/tests/l3_dynamic_high.py @@ -0,0 +1,181 @@ +import time +from util import success, fail, create_base_command +from core.reporter import DataRecorder +from modules.common_msgs.chassis_msgs import chassis_pb2 + + +def run_staged_accel(runner, target_kph, acc_limit_mps2): + """Helper for TC-CTRL-10""" + target_mps = target_kph / 3.6 + runner.ui.log( + f"--- Accel Test: 0-{target_kph} km/h @ {acc_limit_mps2} m/s^2 ---", "CYAN" + ) + + if not runner.prepare_for_drive(): + return fail("Prep Failed") + + recorder = DataRecorder(f"L3_Accel_0_{target_kph}") + recorder.start() + + cmd = create_base_command() + cmd.parking_brake = False + cmd.gear_location = chassis_pb2.Chassis.GEAR_DRIVE + cmd.speed = target_mps + 5.0 # Set target slightly higher to ensure limit is hit + cmd.acceleration = acc_limit_mps2 + + start_t = time.time() + runner.update_command(cmd) + + reached = False + duration = 0 + + try: + # Timeout allows for slow acceleration + timeout = 20.0 if target_kph < 60 else 40.0 + while time.time() - start_t < timeout: + chassis = runner.get_latest_chassis() + if chassis: + recorder.record( + acc_limit_mps2, + chassis.instantaneous_acceleration, + chassis.speed_mps, + ) + + if chassis.speed_mps >= target_mps: + duration = time.time() - start_t + reached = True + break + + if runner.estop_triggered.is_set(): + return fail("Aborted") + time.sleep(0.01) + + # Safe Stop + cmd.speed = 0 + cmd.acceleration = 0 + cmd.brake = 30.0 + runner.update_command(cmd) + while True: + chassis = runner.get_latest_chassis() + if not chassis or chassis.speed_mps <= 0.1: + break + time.sleep(0.01) + + finally: + recorder.stop() + recorder.save_and_plot( + f"Accel 0-{target_kph}", "Accel (m/s^2)", f"accel_0_{target_kph}" + ) + + if not reached: + return fail(f"Failed to reach {target_kph} km/h") + return success(f"0-{target_kph} km/h Time: {duration:.2f}s") + + +def run_staged_brake(runner, start_kph, brake_pct): + """Helper for TC-CTRL-06""" + start_mps = start_kph / 3.6 + runner.ui.log( + f"--- Brake Test: {start_kph}-0 km/h @ {brake_pct}% Brake ---", "CYAN" + ) + + # 1. Accelerate to start speed + if not runner.prepare_for_drive(): + return fail("Prep Failed") + cmd = create_base_command() + cmd.parking_brake = False + cmd.gear_location = chassis_pb2.Chassis.GEAR_DRIVE + cmd.speed = start_mps + cmd.acceleration = 2.0 + runner.update_command(cmd) + + if not runner.wait_for_condition( + lambda: runner.get_latest_chassis().speed_mps >= start_mps * 0.95, + 30.0, + f"Reach {start_kph} km/h", + ): + return fail("Failed to reach start speed") + + # 2. Apply Brake + recorder = DataRecorder(f"L3_Brake_{start_kph}_0") + recorder.start() + + cmd.speed = 0 + cmd.acceleration = 0 + cmd.brake = brake_pct + trigger_time = time.time() + runner.update_command(cmd) + + stop_time = 0 + stopped = False + + try: + while time.time() - trigger_time < 15.0: + chassis = runner.get_latest_chassis() + if chassis: + recorder.record(brake_pct, chassis.brake_percentage, chassis.speed_mps) + + if chassis.speed_mps < 0.1: + stop_time = time.time() - trigger_time + stopped = True + break + time.sleep(0.01) + finally: + recorder.stop() + recorder.save_and_plot( + f"Brake {start_kph}-0", "Speed (m/s)", f"brake_{start_kph}_0" + ) + + if not stopped: + return fail("Failed to stop") + return success(f"Stop Time: {stop_time:.2f}s") + + +# --- Test Cases Exported --- + + +def test_l3_staged_acceleration(runner): + """TC-CTRL-10: Staged Acceleration (Low/Mid/High)""" + # 1. Low Speed + res1 = run_staged_accel(runner, 30, 1.5) + if not res1["pass"]: + return res1 + time.sleep(2) + + # 2. Mid Speed + res2 = run_staged_accel(runner, 60, 2.0) + if not res2["pass"]: + return res2 + time.sleep(2) + + # 3. High Speed (Run only if safe!) + # res3 = run_staged_accel(runner, 100, 2.5) + + return success("Staged Acceleration Passed (Low/Mid)") + + +def test_l3_staged_braking(runner): + """TC-CTRL-06: Staged Braking (30/60/100)""" + # 1. 30-0 + res1 = run_staged_brake(runner, 30, 30.0) + if not res1["pass"]: + return res1 + time.sleep(3) + + # 2. 60-0 + res2 = run_staged_brake(runner, 60, 50.0) + if not res2["pass"]: + return res2 + + return success("Staged Braking Passed") + + +def test_l3_staged_braking_performance(runner): + """TC-CTRL-06: Staged Braking (30/60/100)""" + return test_l3_staged_braking(runner) + + +def test_l3_emergency_brake(runner): + """TC-CTRL-06: Emergency Brake (100km/h -> 0)""" + # This is high risk, ensure safety + return run_staged_brake(runner, 100, 100.0) diff --git a/modules/tools/whl-can/auto_test/util.py b/modules/tools/whl-can/auto_test/util.py new file mode 100644 index 00000000..de16d26e --- /dev/null +++ b/modules/tools/whl-can/auto_test/util.py @@ -0,0 +1,66 @@ +# util.py +from modules.common_msgs.control_msgs import control_cmd_pb2 +from modules.common_msgs.chassis_msgs import chassis_pb2 +from typing import Dict, Any + + +def fail(details: str, **kwargs) -> Dict[str, Any]: + return {"pass": False, "details": details, **kwargs} + + +def success(details: str, **kwargs) -> Dict[str, Any]: + return {"pass": True, "details": details, **kwargs} + + +def create_base_command() -> control_cmd_pb2.ControlCommand: + """Creates a base command with safe defaults (AUTO mode, Neutral, Brake).""" + cmd = control_cmd_pb2.ControlCommand() + cmd.pad_msg.driving_mode = chassis_pb2.Chassis.COMPLETE_AUTO_DRIVE + cmd.pad_msg.action = 2 # START + cmd.throttle = 0.0 + cmd.brake = 0.0 + cmd.steering_target = 0.0 + cmd.speed = 0.0 + cmd.acceleration = 0.0 + cmd.parking_brake = True + cmd.gear_location = chassis_pb2.Chassis.GEAR_NEUTRAL + return cmd + + +def calculate_response_metrics(start_time, data_stream, target): + """Simple metric calc. data_stream: [(t, val), ...]""" + if not data_stream: + return { + "response_time_ms": 0, + "overshoot_percent": 0, + "steady_state_error_percent": 0, + } + + # Filter after start + valid_data = [d for d in data_stream if d[0] >= start_time] + if not valid_data: + return { + "response_time_ms": 0, + "overshoot_percent": 0, + "steady_state_error_percent": 0, + } + + # Steady state (last 0.5s) + end_time = valid_data[-1][0] + steady_vals = [v for t, v in valid_data if t > end_time - 0.5] + avg = sum(steady_vals) / len(steady_vals) if steady_vals else 0 + + err_pct = abs(avg - target) + + # Response time (simplistic t90) + t90 = start_time + for t, v in valid_data: + if abs(v) >= abs(target * 0.9): + t90 = t + break + + return { + "response_time_ms": int((t90 - start_time) * 1000), + "overshoot_percent": 0.0, # Simplification + "steady_state_error_percent": round(err_pct, 2), + } From 892e00cb7894ee4c374bc48ff541aa3a91e072b4 Mon Sep 17 00:00:00 2001 From: zero Date: Wed, 11 Feb 2026 14:55:30 +0800 Subject: [PATCH 2/8] Update modules/tools/whl-can/auto_test/tests/l2_dynamic_low.py Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- modules/tools/whl-can/auto_test/tests/l2_dynamic_low.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/modules/tools/whl-can/auto_test/tests/l2_dynamic_low.py b/modules/tools/whl-can/auto_test/tests/l2_dynamic_low.py index 8bb85055..d03e880c 100644 --- a/modules/tools/whl-can/auto_test/tests/l2_dynamic_low.py +++ b/modules/tools/whl-can/auto_test/tests/l2_dynamic_low.py @@ -35,7 +35,11 @@ def test_l2_throttle_linearity(runner): ) time.sleep(0.01) - actual = runner.get_latest_chassis().throttle_percentage + chassis = runner.get_latest_chassis() + if not chassis: + return fail("No chassis feedback") + + actual = chassis.throttle_percentage if abs(actual - throttle) > 3.0: return fail(f"Linearity error at {throttle}%") From 9dd16ee009018e30c77974dcbf529c33eb602b2c Mon Sep 17 00:00:00 2001 From: zero Date: Wed, 11 Feb 2026 14:56:46 +0800 Subject: [PATCH 3/8] Update modules/tools/whl-can/auto_test/tests/l2_dynamic_low.py Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- .../whl-can/auto_test/tests/l2_dynamic_low.py | 43 +++++++++++-------- 1 file changed, 24 insertions(+), 19 deletions(-) diff --git a/modules/tools/whl-can/auto_test/tests/l2_dynamic_low.py b/modules/tools/whl-can/auto_test/tests/l2_dynamic_low.py index d03e880c..1a1da33e 100644 --- a/modules/tools/whl-can/auto_test/tests/l2_dynamic_low.py +++ b/modules/tools/whl-can/auto_test/tests/l2_dynamic_low.py @@ -195,28 +195,33 @@ def test_l2_gear_protection(runner): cmd.speed = 1.5 runner.update_command(cmd) - if not runner.wait_for_condition( - lambda: runner.get_latest_chassis().speed_mps > 1.0, 5.0, "Speed > 1m/s" - ): - return fail("Could not build speed") - - # 2. Try Reverse - runner.ui.log("Attempting Shift to REVERSE...", "YELLOW") - cmd.gear_location = chassis_pb2.Chassis.GEAR_REVERSE - runner.update_command(cmd) - time.sleep(2.0) + try: + if not runner.wait_for_condition( + lambda: runner.get_latest_chassis().speed_mps > 1.0, 5.0, "Speed > 1m/s" + ): + return fail("Could not build speed") + + # 2. Try Reverse + runner.ui.log("Attempting Shift to REVERSE...", "YELLOW") + cmd.gear_location = chassis_pb2.Chassis.GEAR_REVERSE + runner.update_command(cmd) + time.sleep(2.0) - final_gear = runner.get_latest_chassis().gear_location + chassis = runner.get_latest_chassis() + if not chassis: + return fail("No chassis feedback") - # Stop - cmd.speed = 0 - cmd.brake = 50.0 - cmd.gear_location = chassis_pb2.Chassis.GEAR_NEUTRAL - runner.update_command(cmd) + final_gear = chassis.gear_location - if final_gear == chassis_pb2.Chassis.GEAR_REVERSE: - return fail("Safety Failure: Shifted to R while moving!") - return success("Gear Protection Active") + if final_gear == chassis_pb2.Chassis.GEAR_REVERSE: + return fail("Safety Failure: Shifted to R while moving!") + return success("Gear Protection Active") + finally: + # Stop + cmd.speed = 0 + cmd.brake = 50.0 + cmd.gear_location = chassis_pb2.Chassis.GEAR_NEUTRAL + runner.update_command(cmd) def test_l2_mode_protection(runner): From cd8817023f8094de56b9889db1bb8bf8cbdecac0 Mon Sep 17 00:00:00 2001 From: zero Date: Wed, 11 Feb 2026 14:57:54 +0800 Subject: [PATCH 4/8] Update modules/tools/whl-can/auto_test/core/runner.py Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- modules/tools/whl-can/auto_test/core/runner.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modules/tools/whl-can/auto_test/core/runner.py b/modules/tools/whl-can/auto_test/core/runner.py index dd188bf5..9730c535 100644 --- a/modules/tools/whl-can/auto_test/core/runner.py +++ b/modules/tools/whl-can/auto_test/core/runner.py @@ -228,7 +228,7 @@ def run_sequence(self): self.ui.log("Sequence aborted due to ESTOP.", "RED") else: self.ui.log("All tests completed.", "GREEN") - self.ui.log("Press any key to exit.", "CYAN") + self.ui.log("Press [Enter] to exit.", "CYAN") self.ui.wait_for_enter() def cleanup(self): From 03ca1669b3342196cfa4037629e253c468eb366f Mon Sep 17 00:00:00 2001 From: zero Date: Wed, 11 Feb 2026 14:58:29 +0800 Subject: [PATCH 5/8] Update modules/tools/whl-can/auto_test/tests/l2_dynamic_low.py Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- .../tools/whl-can/auto_test/tests/l2_dynamic_low.py | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/modules/tools/whl-can/auto_test/tests/l2_dynamic_low.py b/modules/tools/whl-can/auto_test/tests/l2_dynamic_low.py index 1a1da33e..12dc3193 100644 --- a/modules/tools/whl-can/auto_test/tests/l2_dynamic_low.py +++ b/modules/tools/whl-can/auto_test/tests/l2_dynamic_low.py @@ -247,8 +247,16 @@ def test_l2_mode_protection(runner): runner.update_command(cmd) time.sleep(2.0) - final_gear = runner.get_latest_chassis().gear_location + chassis = runner.get_latest_chassis() + if chassis is None: + # Cleanup before failing to keep runner in a safe state + cmd.speed = 0 + cmd.brake = 50.0 + cmd.gear_location = chassis_pb2.Chassis.GEAR_NEUTRAL + runner.update_command(cmd) + return fail("No chassis feedback available") + final_gear = chassis.gear_location # Cleanup cmd.speed = 0 cmd.brake = 50.0 From 88da41a70c64e27d0003e4889b35d0070659ce84 Mon Sep 17 00:00:00 2001 From: zero Date: Wed, 11 Feb 2026 14:58:55 +0800 Subject: [PATCH 6/8] Update modules/tools/whl-can/auto_test/core/runner.py Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- modules/tools/whl-can/auto_test/core/runner.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modules/tools/whl-can/auto_test/core/runner.py b/modules/tools/whl-can/auto_test/core/runner.py index 9730c535..e1998b96 100644 --- a/modules/tools/whl-can/auto_test/core/runner.py +++ b/modules/tools/whl-can/auto_test/core/runner.py @@ -1,7 +1,7 @@ import threading import time import logging -from typing import Optional, List, Dict, Callable +from typing import Optional, Callable from cyber.python.cyber_py3 import cyber from modules.common_msgs.control_msgs import control_cmd_pb2 From 81745e14ab9435cb58f6a144a7573910f2353a73 Mon Sep 17 00:00:00 2001 From: zero Date: Wed, 11 Feb 2026 14:59:10 +0800 Subject: [PATCH 7/8] Update modules/tools/whl-can/auto_test/core/runner.py Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- modules/tools/whl-can/auto_test/core/runner.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/modules/tools/whl-can/auto_test/core/runner.py b/modules/tools/whl-can/auto_test/core/runner.py index e1998b96..8de4ce80 100644 --- a/modules/tools/whl-can/auto_test/core/runner.py +++ b/modules/tools/whl-can/auto_test/core/runner.py @@ -124,8 +124,9 @@ def wait_for_condition( try: if condition_fn(): return True - except Exception: - pass + except Exception as exc: + # Log and continue polling so transient condition errors don't abort the test. + logging.exception("Error while evaluating wait_for_condition '%s': %s", desc, exc) time.sleep(0.01) self.ui.log(f"Timeout: {desc}", "YELLOW") return False From 8e427362667e3415f6f36d2875709f28610d83ed Mon Sep 17 00:00:00 2001 From: zero Date: Wed, 11 Feb 2026 14:59:49 +0800 Subject: [PATCH 8/8] Update modules/tools/whl-can/auto_test/tests/l1_static.py Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- modules/tools/whl-can/auto_test/tests/l1_static.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modules/tools/whl-can/auto_test/tests/l1_static.py b/modules/tools/whl-can/auto_test/tests/l1_static.py index b23ce6dd..1b2fe842 100644 --- a/modules/tools/whl-can/auto_test/tests/l1_static.py +++ b/modules/tools/whl-can/auto_test/tests/l1_static.py @@ -161,7 +161,7 @@ def test_l1_static_brake_consistency(runner): diff = abs(actual - target) # 3. Verify consistency - runner.ui.log(f" -> Feedback: {actual:.1f}% (Diff: {diff:.1f}%)", "WHITE") + runner.ui.log(f" -> Feedback: {actual:.1f}% (Diff: {diff:.1f}%)", "CYAN") if diff > tolerance: return fail(f"Mismatch at {target}%: Cmd={target}, Act={actual:.1f}")