diff --git a/py-scripts/real_application_tests/real_browser/lf_interop_real_browser_test.py b/py-scripts/real_application_tests/real_browser/lf_interop_real_browser_test.py index 23f214264..464762841 100644 --- a/py-scripts/real_application_tests/real_browser/lf_interop_real_browser_test.py +++ b/py-scripts/real_application_tests/real_browser/lf_interop_real_browser_test.py @@ -50,6 +50,12 @@ python3 lf_interop_real_browser_test.py --mgr 192.168.207.78 --url "https://google.com" --duration 1m --debug --upstream_port 192.168.200.198 --coordinates 1 --do_robo --robo_ip 192.168.200.140 --rotations 30 --device_list 1.5,1.11 + Example-9: + Command Line Interface to run the Real Browser Test with Robo bandsteering and Device list + python3 lf_interop_real_browser_test.py --mgr 192.168.207.78 --duration 1 --url https://www.google.com --device_list 1.11,1.10 --count 10 + --upstream_port 192.168.204.90 --expected_passfail_value 5 --do_robo --robo_ip 192.168.200.140 --coordinates 3,2,1 --rotations "" + --duration_to_skip 1 --do_bandsteering --cycles 1 --bssids 94:A6:7E:74:26:31,94:A6:7E:74:26:22 + SCRIPT CLASSIFICATION: Test @@ -184,15 +190,20 @@ def __init__(self, selected_groups=None, selected_profiles=None, robo_ip="127.0.0.1", + do_bandsteering=False, + cycles=1, + bssids=None, coordinates_list=None, angles_list=None, do_robo=False, current_cord="", current_angle=None, rotations_enabled=False, + duration_to_skip=None ): super().__init__(lfclient_host=host, lfclient_port=8080) # Initialize attributes with provided parameters + self.original_dir = os.getcwd() self.host = host self.ssid = ssid self.report_ssid = ssid @@ -221,6 +232,7 @@ def __init__(self, self.mac_list = None self.csv_file_names = [] self.stop_signal = False + self.webui_stop_clicked = False self.device_targets = {} self.mac = 0 self.windows = 0 @@ -301,6 +313,7 @@ def __init__(self, self.utility = base.UtilityInteropWifi(host_ip=self.host) self.serial_list = [] self.do_robo = do_robo + self.do_bandsteering = do_bandsteering if self.do_robo: self.robo_ip = robo_ip self.robo_obj = robo_base_class.RobotClass(robo_ip=self.robo_ip, angle_list=angles_list) @@ -309,6 +322,19 @@ def __init__(self, self.current_cord = current_cord self.current_angle = current_angle self.rotations_enabled = rotations_enabled + self.band_csv_files = {} + self.time_to_target = {} + self.test_start_time = None + self.cycles = cycles + self.bandsteering_completed = False + self.robo_obj.time_to_reach = int(duration_to_skip) * 60 + if bssids: + self.bssids = [ + b.strip().strip('"').strip("'").upper() + for b in bssids.split(",") + ] + else: + self.bssids = [] self.robo_csv_files = [] self.robo_mobile_data = {} @@ -895,11 +921,28 @@ def start_flask_server(self): @self.app.route('/stop_rb', methods=['GET']) def stop_rb(): logging.info("Stopping the test through WEB GUI") + self.webui_stop_clicked = True response = jsonify({"message": "Stopping Zoom Test"}) response.status_code = 200 self.stop() def shutdown(): + try: + time.sleep(2) + + if getattr(self, "report_already_generated", False): + return + + if self.do_robo and not self.do_bandsteering: + self.create_robo_report() + else: + self.create_report() + + except Exception as e: + logging.error(f"[WEBUI] Report generation failed: {e}", exc_info=True) + + logging.info("[WEBUI] Report generated, exiting process") + os._exit(0) response.call_on_close(shutdown) return response @@ -1019,6 +1062,11 @@ def handle_duration(self): """ Convert duration string to minutes. """ + if self.do_robo and self.do_bandsteering: + # setting some long duration for bandsteering + self.duration = 24 * 60 + logging.info("[BANDSTEERING] Duration overridden to large value (24h)") + return if isinstance(self.duration, str): if self.duration.endswith(('s', 'S')): self.duration = round(int(self.duration[:-1]) / 60, 2) @@ -1029,6 +1077,543 @@ def handle_duration(self): else: self.duration = int(self.duration) + def get_bssid_map(self): + """ + Build a mapping of device hostnames to their associated BSSID (AP MAC). + + Returns: + dict: {hostname: bssid} mapping. + If data is unavailable, 'NA' is used as default. + """ + + bssid_map = {} + + try: + port_data = self.json_get("/ports/all")["interfaces"] + interfaces_dict = {} + for port in port_data: + interfaces_dict.update(port) + except Exception as e: + logger.error(f"Failed to fetch ports data for BSSID: {e}") + return bssid_map + + # MOBILES + for sta in self.phone_data or []: + try: + eid = self.name_to_eid(sta) + resp = self.json_get(f"/resource/{eid[0]}/{eid[1]}/list?fields=user") + hostname = resp["resource"].get("user", f"mobile_{eid[1]}") + except Exception: + hostname = f"mobile_{sta}" + + bssid_map[hostname] = interfaces_dict.get(sta, {}).get("ap", "NA") + + # LAPTOPS + for sta in self.laptops or []: + try: + eid = self.name_to_eid(sta) + resp = self.json_get(f"/resource/{eid[0]}/{eid[1]}/list?fields=hostname") + hostname = resp["resource"].get("hostname", sta) + except Exception: + hostname = sta + + bssid_map[hostname] = interfaces_dict.get(sta, {}).get("ap", "NA") + + return bssid_map + + def init_bandsteering_csv(self, device_name): + """ + Initialize a CSV file for a specific device to store bandsteering data. + + - Creates a CSV file per device if it does not already exist. + - Writes header columns required for analysis. + Args: + device_name (str): The name of the device for which to initialize the CSV file. + """ + + if device_name in self.band_csv_files: + return + + filename = os.path.join( + self.bandsteering_dir, + f"{device_name}_bandsteering.csv" + ) + + self.band_csv_files[device_name] = filename + logger.info("[BANDSTEERING] Creating CSV for device: %s -> %s", device_name, filename) + headers = [ + "timestamp", + "device_type", + "device_name", + "total_urls", + "uc_min", + "uc_avg", + "uc_max", + "total_err", + "time_to_target_urls", + "cx_name", + "bssid", + "channel", + "from_coordinate", + "to_coordinate", + "robot_x", + "robot_y" + ] + + with open(filename, "w", newline="") as f: + writer = csv.writer(f) + writer.writerow(headers) + + def write_bandsteering_row(self, row): + """ + Append a single row of bandsteering metrics to the device-specific CSV. + + Args: + row (dict): Dictionary containing device metrics such as: + - timestamp + - device_name + - total_urls + - latency stats (uc_min, uc_avg, uc_max) + - errors + - coordinates and robot position + """ + + dev = row["device_name"] + self.init_bandsteering_csv(dev) + + with open(self.band_csv_files[dev], "a", newline="") as f: + writer = csv.writer(f) + writer.writerow([ + row["timestamp"], + row["device_type"], + row["device_name"], + row["total_urls"], + row["uc_min"], + row["uc_avg"], + row["uc_max"], + row["total_err"], + row["time_to_target_urls"], + row["cx_name"], + row["bssid"], + row.get("channel", "NA"), + row["from_coordinate"], + row["to_coordinate"], + row["robot_x"], + row["robot_y"] + ]) + + def collect_bandsteering_values(self): + """ + Collect stats for all devices and RETURN rows. + """ + rows = [] + ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S") + # ROBOT POSE + if self.robo_obj: + robot_x, robot_y, from_cord, to_cord = self.robo_obj.get_robot_pose() + else: + robot_x = robot_y = from_cord = to_cord = None + + bssid_map = self.get_bssid_map() + port_data = self.local_realm.json_get("port/list?fields=channel") + channel_map = {} + for iface in port_data.get("interfaces", []): + channel_map.update({ + k: v.get("channel", "NA") + for k, v in iface.items() + }) + + # LAPTOPS + for sta in self.laptops or []: + try: + eid = self.name_to_eid(sta) + resp = self.local_realm.json_get( + f"/resource/{eid[0]}/{eid[1]}/list?fields=hostname" + ) + hostname = resp["resource"].get("hostname", sta) + except Exception: + hostname = sta + + stats = self.laptop_stats.get(hostname, {}) + + if hostname not in self.time_to_target and stats.get("total_urls", 0) >= self.count: + self.time_to_target[hostname] = ( + datetime.now() - self.test_start_time + ).total_seconds() + + rows.append({ + "timestamp": ts, + "device_type": "laptop", + "device_name": hostname, + "total_urls": stats.get("total_urls", 0), + "uc_min": stats.get("uc_min", 0), + "uc_avg": stats.get("uc_avg", 0), + "uc_max": stats.get("uc_max", 0), + "total_err": stats.get("total_err", 0), + "time_to_target_urls": self.time_to_target.get(hostname, 0), + "cx_name": "NA", + "bssid": bssid_map.get(hostname, "NA"), + "channel": channel_map.get(sta, "NA"), + "from_coordinate": from_cord, + "to_coordinate": to_cord, + "robot_x": robot_x, + "robot_y": robot_y + }) + + # MOBILES + data = self.local_realm.json_get( + "layer4/%s/list?fields=name,total-urls,uc-min,uc-avg,uc-max,total-err" % + ",".join(self.bandsteering_cx_list) + ) + # logger.info("[BANDSTEERING] Using CX list: %s", self.bandsteering_cx_list) + + endpoint_data = data.get("endpoint", []) + + # NORMALIZE FOR SINGLE OR MULTIPLE CX + if isinstance(endpoint_data, dict): + endpoint_data = [endpoint_data] + + for ep in endpoint_data: + if not isinstance(ep, dict): + continue + + # LANforge returns { cx_name : stats } + # HANDLE SINGLE vs MULTI CX + if "name" in ep: + # SINGLE CX case + cx_name = ep.get("name") + v = ep + else: + # MULTI CX case + cx_name = list(ep.keys())[0] + v = ep[cx_name] + + if not isinstance(v, dict): + continue + + # RESOURCE EXTRACTION + res_no = None + m = re.search(r'http(\d+)_l4', cx_name) + if m: + res_no = m.group(1) + else: + logger.error("[BANDSTEERING] Failed to extract resource from CX: %s", cx_name) + continue + + sta = f"1.{res_no}.wlan0" + + hostname = f"mobile_{res_no}" + if res_no: + if res_no not in self.robo_mobile_hostname_cache: + resp = self.local_realm.json_get( + f"resource/1/{res_no}/list?fields=user" + ) + user = resp["resource"].get("user", "").strip() + + if not user: + user = f"mobile_{res_no}" + + self.robo_mobile_hostname_cache[res_no] = user + + hostname = self.robo_mobile_hostname_cache[res_no] + + bssid = bssid_map.get(hostname, "NA") + + if hostname not in self.time_to_target and v.get("total-urls", 0) >= self.count: + self.time_to_target[hostname] = ( + datetime.now() - self.test_start_time + ).total_seconds() + + rows.append({ + "timestamp": ts, + "device_type": "mobile", + "device_name": hostname, + "total_urls": v.get("total-urls", 0), + "uc_min": float(v.get("uc-min", 0)) / 1000, + "uc_avg": float(v.get("uc-avg", 0)) / 1000, + "uc_max": float(v.get("uc-max", 0)) / 1000, + "total_err": v.get("total-err", 0), + "time_to_target_urls": self.time_to_target.get(hostname, 0), + "cx_name": cx_name, + + # ---- ROBOT related + "from_coordinate": from_cord, + "to_coordinate": to_cord, + "robot_x": robot_x, + "robot_y": robot_y, + "channel": channel_map.get(sta, "NA"), + "bssid": bssid + }) + logger.info("[DEBUG] CX BATCH: %s", self.bandsteering_cx_list) + return rows + + def bandsteering_monitor(self): + """ + Called every second while robot is moving. + Collects per-device stats and writes one CSV row. + """ + rows = self.collect_bandsteering_values() + logger.info("[DEBUG] CX BATCH: %s", self.cx_batch) + time.sleep(1) + return rows + + def write_live_webui_csv(self, rows): + """ + REQUIRED for WebUI live graphs. + WebUI reads ONLY real_time_data.csv. + """ + headers = [ + "device_type", + "device_name", + "total_urls", + "uc_min", + "uc_avg", + "uc_max", + "total_err", + "time_to_target_urls", + "cx_name" + ] + + with open("real_time_data.csv", "w", newline="") as f: + writer = csv.writer(f) + writer.writerow(headers) + + for r in rows: + writer.writerow([ + r.get("device_type"), + r.get("device_name"), + r.get("total_urls", 0), + r.get("uc_min", 0), + r.get("uc_avg", 0), + r.get("uc_max", 0), + r.get("total_err", 0), + r.get("time_to_target_urls", 0), + r.get("cx_name", "NA"), + ]) + + def run_robo_bandsteering_test(self, cx_batch): + logging.info("[BANDSTEERING] Starting robo band-steering test") + # build cyclic coordinate list + base_coords = list(self.coordinates_list) + starting_coord = base_coords[0] + self.bandsteering_start_coord = base_coords[0] + self.bandsteering_end_coord = base_coords[-1] + + logging.info( + "[BANDSTEERING] Ensuring robot reaches starting coordinate %s before starting test", + starting_coord + ) + + pause, stopped = self.robo_obj.wait_for_battery() + if stopped: + return + + # check robo current position + try: + curr_x, curr_y, _, _ = self.robo_obj.get_robot_pose() + current_coord = self.robo_obj.current_coordinate + except Exception: + current_coord = None + + logging.info( + "[BANDSTEERING] Current robot coordinate: %s | CLI start coordinate: %s", + current_coord, starting_coord + ) + + # move only if needed + if current_coord != starting_coord: + logging.info( + "[BANDSTEERING] Robot not at starting coordinate. Moving to %s", + starting_coord + ) + + result = self.robo_obj.move_to_coordinate(coord=starting_coord) + + if isinstance(result, tuple): + _, stopped = result + else: + _, stopped = False, True + + if stopped: + logging.error( + "[BANDSTEERING] Failed to reach starting coordinate %s. Aborting test.", + starting_coord + ) + return + # if not reached: + # continue + + self.robo_obj.current_coordinate = starting_coord + else: + logging.info( + "[BANDSTEERING] Robot already at starting coordinate %s. No initial move needed.", + starting_coord + ) + self.robo_obj.current_coordinate = starting_coord + + logging.info( + "[BANDSTEERING] Robot reached starting coordinate %s. Ready to start band-steering.", + starting_coord + ) + + self.prev_coordinate = starting_coord + self.current_cord = starting_coord + + logging.info( + "[BANDSTEERING] Starting CX before first movement from %s", + starting_coord + ) + self.start_specific(cx_batch) + # self.http_profile.start_cx() + self.test_start_time = datetime.now() + cx_started = True + + self.robo_obj.total_cycles = self.cycles + self.robo_obj.coordinate_list = self.coordinates_list + self.coordinates_list = self.robo_obj.get_coordinates_list() + + logging.info( + "[BANDSTEERING] Final coordinate list: %s", + self.coordinates_list + ) + + self.time_to_target = {} + self.band_csv_files = {} + self.robo_mobile_hostname_cache = {} + # self.prev_coordinate = starting_coord + + # Directory to store bandsteering CSVs + self.bandsteering_dir = f"bandsteering_{datetime.now():%Y%m%d_%H%M%S}" + os.makedirs(self.bandsteering_dir, exist_ok=True) + if not hasattr(self, "phone_data") or not self.phone_data: + self.phone_data, _, _, _, _ = self.get_resource_data() + + self.bandsteering_cx_list = cx_batch.copy() + # cx_started = False + + try: + for _idx, coordinate in enumerate(self.coordinates_list): + logging.info(f"[BANDSTEERING] Moving to coordinate {coordinate}") + + self.current_cord = coordinate + pause, stopped = self.robo_obj.wait_for_battery() + if stopped: + break + + def monitor_wrapper(): + rows = self.collect_bandsteering_values() + + for row in rows: + if not row.get("device_name"): + continue + row["from_coordinate"] = self.prev_coordinate + row["to_coordinate"] = self.current_cord + + self.write_bandsteering_row(row) + + name = row["device_name"] + + if name not in self.laptop_stats: + self.laptop_stats[name] = { + "name": name, + "start_time": self.test_start_time.isoformat() + } + + self.laptop_stats[name].update({ + "total_urls": row.get("total_urls", 0), + "uc_min": row.get("uc_min", 0), + "uc_avg": row.get("uc_avg", 0), + "uc_max": row.get("uc_max", 0), + "total_err": row.get("total_err", 0), + "cx_name": row.get("cx_name", "NA"), + "current_cord": self.current_cord + }) + + # CSV FOR WEBUI GRAPHS + self.write_live_webui_csv(rows) + + self.prev_coordinate = self.current_cord + time.sleep(1) + return True + + # time.sleep(10) + result = self.robo_obj.move_to_coordinate( + coord=coordinate, + monitor_function=monitor_wrapper + ) + + # NORMALIZE REAL ROBO RETURN + if isinstance(result, tuple): + moved = result[0] + stopped = result[1] + else: + moved = False + stopped = True + + if stopped: + break + if not moved: + continue + + finally: + logging.info("[BANDSTEERING] Final coordinate reached, stopping CX") + if cx_started: + self.http_profile.stop_cx() + # self.clear_http_cx_data() + self.generic_endps_profile.stop_cx() + self.bandsteering_completed = True + logger.info("[BANDSTEERING] Bandsteering completed, CXs stopped") + + # CREATE NORMAL real_time_data.csv FOR REPORT + if self.bandsteering_completed: + logging.info("[BANDSTEERING] Creating real_time_data.csv for reporting") + + headers = [ + "device_type", + "device_name", + "total_urls", + "uc_min", + "uc_avg", + "uc_max", + "total_err", + "time_to_target_urls", + "cx_name" + ] + + latest_rows = {} + + # Read last row per device from bandsteering CSVs + for csv_file in self.band_csv_files.values(): + try: + df = pd.read_csv(csv_file) + if df.empty: + continue + last = df.iloc[-1] + latest_rows[last["device_name"]] = last + except Exception as e: + logging.warning(f"Failed reading {csv_file}: {e}") + + with open("real_time_data.csv", "w", newline="") as f: + writer = csv.writer(f) + writer.writerow(headers) + + for row in latest_rows.values(): + writer.writerow([ + row["device_type"], + row["device_name"], + row["total_urls"], + row["uc_min"], + row["uc_avg"], + row["uc_max"], + row["total_err"], + row["time_to_target_urls"], + row["cx_name"] + ]) + + self.csv_file_names.append("real_time_data.csv") + + return + def run_test(self, available_resources): """ Runs the test with calculated parameters. @@ -1043,12 +1628,18 @@ def run_test(self, available_resources): cx_order_list = self.calculate_cx_order_list() cx_batch = cx_order_list[0] + if self.do_robo and self.do_bandsteering: + self.run_robo_bandsteering_test(cx_batch) + return - if self.do_robo: + if self.do_robo and not self.do_bandsteering: for coordinate in self.coordinates_list: # self.robo_obj.ensure_battery_for_test(duration_min=self.duration, mins_per_percent=self.mins_per_percent) self.robo_obj.wait_for_battery() - self.robo_obj.move_to_coordinate(coord=coordinate) + matched, abort = self.robo_obj.move_to_coordinate(coord=coordinate) + if not matched: + logging.warning(f"Failed to move to coordinate {coordinate}") + continue self.current_cord = coordinate if self.rotations_enabled: for angle in self.angles_list: @@ -1648,7 +2239,8 @@ def generate_test_setup_info(self): 'URL': self.url, 'Test Duration (min)': self.duration, } - + if self.do_bandsteering and 'Test Duration (min)' in test_setup_info: + del test_setup_info['Test Duration (min)'] return test_setup_info def generate_pass_fail_list(self, device_type_data, device_names, total_urls): @@ -1785,6 +2377,15 @@ def create_report(self, iot_summary=None): test_setup_info = with_iot_params_in_table(test_setup_info, iot_summary) report.test_setup_table( test_setup_data=test_setup_info, value='Test Parameters') + # Ensure real_time_data.csv is included before graph generation + if os.path.isfile("real_time_data.csv") and \ + "real_time_data.csv" not in self.csv_file_names: + self.csv_file_names.insert(0, "real_time_data.csv") + + # Keep only last iteration for normal full run + if not self.webui_stop_clicked and not self.do_bandsteering: + if len(self.csv_file_names) > 1: + self.csv_file_names = [self.csv_file_names[-1]] for i in range(0, len(self.csv_file_names)): @@ -1872,12 +2473,52 @@ def create_report(self, iot_summary=None): total_err_data = data['total_err'].tolist() else: raise ValueError("The 'total_err' column was not found in the CSV file.") + # bandsteering bssid section + if self.do_bandsteering: + self.add_bandsteering_bssid_section(report) + # robot charging timestamps section + if self.do_bandsteering: + charging_ts = getattr(self.robo_obj, "charging_timestamps", []) + + if charging_ts: + report.set_obj_html( + _obj_title="Robot Charging Timestamps", + _obj="" + ) + report.build_objective() + + df = pd.DataFrame( + charging_ts, + columns=[ + "charge_dock_arrival_timestamp", + "charging_completion_timestamp" + ] + ) + + # Add serial number column + df.insert(0, "S.No", range(1, len(df) + 1)) + + report.set_table_dataframe(df) + report.build_table() + else: + report.set_obj_html( + _obj_title="Robot Charging Timestamps", + _obj="Robot did not go to charge during this test" + ) + report.build_objective() report.set_table_title("Final Test Results") report.build_table_title() if self.selected_groups and self.selected_profiles: if self.expected_passfail_value or self.device_csv_name: - pass_fail_list, test_input_list = self.generate_pass_fail_list(device_type_data, device_names, total_urls) + + if self.webui_stop_clicked: + logging.info("[REPORT] WebUI stop detected. Skipping PASS/FAIL evaluation.") + pass_fail_list = ["NA"] * len(device_names) + test_input_list = ["NA"] * len(device_names) + else: + pass_fail_list, test_input_list = self.generate_pass_fail_list( + device_type_data, device_names, total_urls) final_test_results = { @@ -1921,13 +2562,28 @@ def create_report(self, iot_summary=None): continue report.set_table_title(f"{group} Test Results") report.build_table_title() + # Ensure all columns have equal length + max_len = max(len(v) for v in final_test_results.values()) + + for key in final_test_results: + if len(final_test_results[key]) < max_len: + diff = max_len - len(final_test_results[key]) + final_test_results[key].extend(["NA"] * diff) + test_results_df = pd.DataFrame(group_specific_test_results) report.set_table_dataframe(test_results_df) report.build_table() else: if self.expected_passfail_value or self.device_csv_name: - pass_fail_list, test_input_list = self.generate_pass_fail_list(device_type_data, device_names, total_urls) + if self.webui_stop_clicked: + logging.info("[REPORT] WebUI stop detected. Skipping PASS/FAIL evaluation.") + pass_fail_list = ["NA"] * len(device_names) + test_input_list = ["NA"] * len(device_names) + else: + pass_fail_list, test_input_list = self.generate_pass_fail_list( + device_type_data, device_names, total_urls) + final_test_results = { "Device Type": device_type_data, @@ -1964,6 +2620,14 @@ def create_report(self, iot_summary=None): "Link Speed": tx_rate_data, } + # Ensure all columns have equal length + max_len = max(len(v) for v in final_test_results.values()) + + for key in final_test_results: + if len(final_test_results[key]) < max_len: + diff = max_len - len(final_test_results[key]) + final_test_results[key].extend(["NA"] * diff) + test_results_df = pd.DataFrame(final_test_results) report.set_table_dataframe(test_results_df) report.build_table() @@ -1980,18 +2644,32 @@ def create_report(self, iot_summary=None): except Exception as e: logging.error(f"Error in create_report function {e}", exc_info=True) finally: - if not self.dowebgui: - source_dir = "." - destination_dir = self.report_path_date_time - self.csv_file_names.append('real_time_data.csv') + source_dir = os.getcwd() + destination_dir = self.report_path_date_time + + # Only move if report folder exists + if os.path.isdir(destination_dir): + + if 'real_time_data.csv' not in self.csv_file_names: + self.csv_file_names.append('real_time_data.csv') + for filename in self.csv_file_names: source_path = os.path.join(source_dir, filename) destination_path = os.path.join(destination_dir, filename) + if os.path.isfile(source_path): - shutil.move(source_path, destination_path) - logging.info(f"Moved {filename} to {destination_dir}") - else: - logging.info(f"{filename} not found in the current directory") + try: + shutil.move(source_path, destination_path) + logging.info(f"Moved {filename} to {destination_dir}") + except Exception as e: + logging.warning(f"Could not move {filename}: {e}") + + if self.do_bandsteering and hasattr(self, "bandsteering_dir"): + for fname in os.listdir(self.bandsteering_dir): + src = os.path.join(self.bandsteering_dir, fname) + dst = os.path.join(destination_dir, fname) + if os.path.isfile(src): + shutil.move(src, dst) def extract_device_data(self, file_path): # Load the CSV file @@ -2332,16 +3010,30 @@ def create_robo_report(self): test_setup_info = self.generate_test_setup_info() self.report.test_setup_table( test_setup_data=test_setup_info, value='Test Parameters') + # Ensure real_time_data.csv is included before graph generation + if os.path.isfile("real_time_data.csv") and \ + "real_time_data.csv" not in self.csv_file_names: + self.csv_file_names.insert(0, "real_time_data.csv") for coordinate in self.coordinates_list: if self.rotations_enabled: for angle in self.angles_list: csv_file = f"{coordinate}_{angle}_webBrowser.csv" + if not os.path.isfile(csv_file): + logging.warning(f"CSV file {csv_file} does not exist.") + continue self.create_robo_graphs_test_results(csv_file, coordinate, angle) else: csv_file = f"{coordinate}_webBrowser.csv" + if not os.path.isfile(csv_file): + logging.warning(f"CSV file {csv_file} does not exist.") + continue self.create_robo_graphs_test_results(csv_file, coordinate) + # bandsteering bssid section + if self.do_bandsteering: + self.add_bandsteering_bssid_section(self.report) + self.add_live_view_images_to_report() if self.dowebgui: @@ -2470,7 +3162,13 @@ def create_robo_graphs_test_results(self, csv_file, coordinate, angle=None): self.report.set_table_title(f"Final Test Results at coordinate {coordinate}:") self.report.build_table_title() if self.expected_passfail_value or self.device_csv_name: - pass_fail_list, test_input_list = self.generate_pass_fail_list(device_type_data, device_names, total_urls) + if self.webui_stop_clicked: + logging.info("[REPORT] WebUI stop detected. Skipping PASS/FAIL evaluation.") + pass_fail_list = ["NA"] * len(device_names) + test_input_list = ["NA"] * len(device_names) + else: + pass_fail_list, test_input_list = self.generate_pass_fail_list( + device_type_data, device_names, total_urls) final_test_results = { @@ -2514,6 +3212,104 @@ def create_robo_graphs_test_results(self, csv_file, coordinate, angle=None): except Exception as e: logging.error(f"Error in create_robo_graphs_test_results {e}", exc_info=True) + def add_bandsteering_bssid_section(self, report): + # for bssids things in pdf + report.set_table_title("Band Steering – BSSID Transition Analysis") + report.build_table_title() + + if not self.bssids: + return + + # normalize CLI BSSIDs + cli_bssids = [b.strip().upper() for b in self.bssids] + + for csv_file in self.band_csv_files.values(): + + if not os.path.exists(csv_file): + continue + + df = pd.read_csv(csv_file) + + if df.empty or "bssid" not in df.columns: + continue + + device_name = df["device_name"].iloc[0] + + # Normalize CSV + df["bssid"] = df["bssid"].astype(str).str.upper().str.strip() + + # Detect transitions from FULL CSV + df["prev_bssid"] = df["bssid"].shift() + + transition_mask = ( + (df["bssid"] != df["prev_bssid"]) & + (df["bssid"] != "NA") + ) + + transition_rows = df[transition_mask] + + # Count only CLI BSSIDs + bssid_counts = {b: 0 for b in cli_bssids} + transitions = [] + + for _, row in transition_rows.iterrows(): + curr_bssid = row["bssid"] + + if curr_bssid in cli_bssids: + bssid_counts[curr_bssid] += 1 + + transitions.append({ + "BSSID": curr_bssid, + "Timestamp": row.get("timestamp", "NA"), + "From Coordinate": row.get("from_coordinate", "NA"), + "To Coordinate": row.get("to_coordinate", "NA"), + "Channel": row.get("channel", "NA") + }) + + bssid_list = list(bssid_counts.keys()) + count_list = list(bssid_counts.values()) + + report.set_graph_title(f"BSSID Change Count – {device_name}") + report.build_graph_title() + + graph = lf_bar_graph_horizontal( + _data_set=[count_list], + _xaxis_name="Transition Count", + _yaxis_name="BSSID", + _yaxis_label=bssid_list, + _yaxis_categories=bssid_list, + _bar_height=0.25, + _show_bar_value=True, + _figsize=(18, max(4, len(bssid_list))), + _graph_title="BSSID Transitions", + _graph_image_name=f"{device_name}_bssid_transitions", + _label=["Transitions"] + ) + + graph_image = graph.build_bar_graph_horizontal() + report.set_graph_image(graph_image) + report.move_graph_image() + report.build_graph() + + report.set_table_title(f"Band Steering Results for {device_name}") + report.build_table_title() + + if not transitions: + first_row = df.iloc[0] + last_row = df.iloc[-1] + + transitions.append({ + "BSSID": first_row.get("bssid", "NA"), + "Timestamp": last_row.get("timestamp", "NA"), + "From Coordinate": first_row.get("from_coordinate", "NA"), + "To Coordinate": last_row.get("to_coordinate", "NA"), + "Channel": first_row.get("channel", "NA") + }) + + transition_df = pd.DataFrame(transitions) + report.set_table_dataframe(transition_df) + report.build_table() + def clear_http_cx_data(self): """Clears endpoint counters for all created HTTP connections.""" for cx in self.http_profile.created_cx: @@ -2678,6 +3474,7 @@ def main(): optional.add_argument('--iot_increment', type=str, default='', help='Comma-separated list of device counts to incrementally test (e.g., "1,3,5")') # ROBO ARGS + robo.add_argument('--duration_to_skip', help='Robot wait duration in seconds at obstacle', default="1") robo.add_argument('--robo_ip', type=str, help='Specify the robo ip') robo.add_argument( '--coordinates', @@ -2692,6 +3489,24 @@ def main(): '--do_robo', help="Specify this flag to perform the test with robo", action='store_true' ) + robo.add_argument( + "--do_bandsteering", + action="store_true", + help="Enable continuous robo band-steering test" + ) + robo.add_argument( + "--cycles", + type=int, + default=1, + help="Number of cycles to repeat coordinates for band-steering" + ) + robo.add_argument( + '--bssids', + type=str, + default='', + help='bssid values' + ) + args = parser.parse_args() if args.help_summary: print(help_summary) @@ -2760,7 +3575,11 @@ def main(): coordinates_list=args.coordinates, angles_list=args.rotations, do_robo=args.do_robo, + do_bandsteering=args.do_bandsteering, + cycles=args.cycles, + bssids=args.bssids, rotations_enabled=rotations_enabled, + duration_to_skip=args.duration_to_skip ) obj.change_port_to_ip() obj.validate_and_process_args() @@ -2826,7 +3645,11 @@ def main(): logger.error("An exception occurred:\n%s", tb_str) finally: if '--help' not in sys.argv and '-h' not in sys.argv: - if args.do_robo: + if args.do_robo and args.do_bandsteering: + if args.dowebgui: + obj.stop_webui_test() + obj.create_report(iot_summary=iot_summary) + elif args.do_robo: if args.dowebgui: obj.stop_webui_test() obj.create_robo_report()