diff --git a/py-scripts/lf_interop_ping_plotter.py b/py-scripts/lf_interop_ping_plotter.py index 105fd872a..ca5ef7aa7 100755 --- a/py-scripts/lf_interop_ping_plotter.py +++ b/py-scripts/lf_interop_ping_plotter.py @@ -67,6 +67,22 @@ python3 lf_interop_ping_plotter.py --mgr 192.168.204.74 --real --target 192.168.204.66 --ping_interval 1 --ping_duration 1m --group_name grp4,grp5 --profile_name Openz,Openz --file_name g219 --device_csv_name device.csv --server_ip 192.168.204.74 + EXAMPLE-14: + Command Line Interface to run ping plotter test with desired resources at desired points using robo + python3 lf_interop_ping_plotter.py --mgr 192.168.207.78 --real --target 8.8.8.8 --ping_interval 1 --ping_duration 1m --use_default_config + --robot_ip 192.168.204.76 --coordinate 3,4 + + EXAMPLE-15: + Command Line Interface to run ping plotter test with desired resources at desired points with rotations using robo + python3 lf_interop_ping_plotter.py --mgr 192.168.207.78 --real --target 8.8.8.8 --ping_interval 1 --ping_duration 1m --use_default_config + --robot_ip 192.168.204.76 --coordinate 3,4 --rotation 45,90 + + EXAMPLE-16: + Command Line Interface to run ping plotter test with desired resources at desired points with bandsteering using robo + python3 lf_interop_ping_plotter.py --mgr 192.168.207.78 --real --target 8.8.8.8 --ping_interval 1 --ping_duration 1m --use_default_config + --robot_ip 192.168.204.76 --coordinate 3,4 --do_bandsteering --total_cycles 3 --bssids 94:A6:7E:74:26:33,94:A6:7E:74:26:22 + + SCRIPT_CLASSIFICATION : Test SCRIPT_CATEGORIES: Performance, Functional, Report Generation @@ -89,6 +105,7 @@ Copyright (C) 2020-2026 Candela Technologies Inc. ''' + import argparse import time import sys @@ -111,10 +128,12 @@ from lf_base_interop_profile import RealDevice from datetime import datetime, timedelta -from lf_graph import lf_bar_graph_horizontal +from lf_graph import lf_bar_graph_horizontal, lf_bar_graph from lf_report import lf_report from station_profile import StationProfile from typing import List, Optional +from collections import Counter +from lf_base_robo import RobotClass # Importing DeviceConfig to apply device configurations for ADB devices and laptops DeviceConfig = importlib.import_module("py-scripts.DeviceConfig") from LANforge import LFUtils # noqa: E402 @@ -156,7 +175,8 @@ def __init__(self, csv_name=None, wait_time=60, floors=None, - get_live_view=None): + get_live_view=None, robo_ip=None, angle_list=None, coordinate_list=None, rotation_enabled=None, local_lf_report_dir=None, do_bandsteering=False, total_cycles=1, bssids=None, + duration_to_skip=None): super().__init__(lfclient_host=host, lfclient_port=port) self.host = host @@ -204,6 +224,28 @@ def __init__(self, self.floors = floors self.get_live_view = get_live_view + # variables related to robot + self.coordinate_list = coordinate_list if coordinate_list is not None else [] + self.rotation_enabled = rotation_enabled + self.last_rotated_angles = [] + self.robo_ip = robo_ip + self.angle_list = angle_list if rotation_enabled else [0] + self.currentangle = None + self.currentcoordinate = None + if robo_ip is not None: + self.robot = RobotClass(robo_ip=self.robo_ip, angle_list=self.angle_list) + self.robot.time_to_reach = duration_to_skip + self.robot.coordinate_list = coordinate_list + self.robot.total_cycles = total_cycles + self.coordinate_json = {} + self.coordinates_completed = [] + self.starttime_track = {} + self.local_lf_report_dir = local_lf_report_dir + self.pingduration = None + self.do_bandsteering = do_bandsteering + self.total_cycles = total_cycles + self.bssids = bssids if bssids else [] + def change_target_to_ip(self): # checking if target is an IP or a port @@ -214,7 +256,7 @@ def change_target_to_ip(self): try: target_port_ip = self.json_get('/port/{}/{}/{}?fields=ip'.format(shelf, resource, port))['interface']['ip'] self.target = target_port_ip - except BaseException: + except Exception: logging.warning('The target is not an ethernet port. Proceeding with the given target {}.'.format(self.target)) logging.info(self.target) else: @@ -455,7 +497,7 @@ def generate_remarks(self, station_ping_data): return (remarks) - def generate_uptime_graph(self): + def generate_uptime_graph(self, coordinate=None, angle=None): json_data = {} for station in self.result_json: json_data[station] = { @@ -555,13 +597,19 @@ def generate_uptime_graph(self): # Show the plot # plt.show() - plt.savefig("%s.png" % "uptime_graph", dpi=96) + # Generate filename dynamically based on provided coordinate and angle + filename = "uptime_graph.png" + if angle: + filename = "uptime_graph_{}_{}.png".format(coordinate, angle) + elif coordinate: + filename = "uptime_graph_{}.png".format(coordinate) + plt.savefig(filename, dpi=96) plt.close() logger.debug("{}.png".format("uptime_graph")) - return ("%s.png" % "uptime_graph") + return filename - def build_area_graphs(self, report_obj=None): + def build_area_graphs(self, report_obj=None, coordinate=None, angle=None): json_data = self.graph_values device_names = list(json_data.keys()) @@ -651,8 +699,14 @@ def build_area_graphs(self, report_obj=None): # set origin on x-axis if rtts != []: plt.ylim(0, max(rtts)) - plt.savefig("%s.png" % device_name, dpi=96) - graph_name = "%s.png" % device_name + # Generate filename dynamically based on provided coordinate and angle + filename = "{}.png".format(device_name) + if angle: + filename = "{}_{}_{}.png".format(device_name, coordinate, angle) + elif coordinate: + filename = "{}_{}.png".format(device_name, coordinate) + plt.savefig(filename, dpi=96) + graph_name = filename plt.close() logger.debug("{}.png".format(device_name)) @@ -669,6 +723,37 @@ def build_area_graphs(self, report_obj=None): # report.move_csv_file() report_obj.build_graph() + def check_stop_status(self): + """ + Check whether the currently running test has been stopped by the user. + + This function looks for a JSON file that tracks the running status of a test. + The file is expected to be located in the `Running_instances` directory and + named using the pattern: __running.json. + + Returns: + bool: + True -> If the test status exists and is not "Running" (i.e., stopped). + False -> If the file does not exist or the status is still "Running". + """ + test_name = self.ui_report_dir.split("/")[-1] + + file_path = os.path.join( + self.ui_report_dir, + "../../Running_instances/{}_{}_running.json".format(self.host, test_name)) + + if not os.path.exists(file_path): + return False + + with open(file_path, 'r') as f: + run_status = json.load(f) + # Check if 'status' key exists and if the test is no longer running + if 'status' in run_status.keys() and run_status["status"] != "Running": + logging.info("Test is stopped by the user") + return True + + return False + def store_csv(self, data=None): if data is None: data = self.result_json @@ -690,8 +775,17 @@ def store_csv(self, data=None): # for seq,rtt in device_data['rtts'].items(): # new_dict[((int(seq) -1) * interval + self.start_time).strftime("%d/%m/%Y %H:%M:%S")] = rtt data[device]['webui_rtts'] = new_dict - with open(self.ui_report_dir + '/runtime_ping_data.json', 'w') as f: - json.dump(data, f, indent=4) + # save runtime ping data based on robo_ip and bandsteering status + if (self.robo_ip and self.do_bandsteering) or (self.robo_ip is None): + with open(self.ui_report_dir + '/runtime_ping_data.json', 'w') as f: + json.dump(data, f, indent=4) + else: + filename = '{}_runtime_ping_data.json'.format(self.currentcoordinate) + filepath = os.path.join(self.ui_report_dir, filename) + data = self.coordinate_json[self.currentcoordinate] + with open(filepath, 'w') as f: + json.dump(data, f, indent=4) + test_name = self.ui_report_dir.split("/")[-1] with open(self.ui_report_dir + '/../../Running_instances/{}_{}_running.json'.format(self.host, test_name), 'r') as f: run_status = json.load(f) @@ -701,13 +795,28 @@ def store_csv(self, data=None): return True def set_webUI_stop(self): - with open(self.ui_report_dir + '/runtime_ping_data.json', 'r') as f: + if self.robo_ip is None or self.do_bandsteering: + filename = "runtime_ping_data.json" + else: + filename = "{}_runtime_ping_data.json".format(self.currentcoordinate) + + filepath = os.path.join(self.ui_report_dir, filename) + with open(filepath, 'r') as f: data = json.load(f) - if 'status' in data.keys() and data['status'] != 'Aborted': + if self.rotation_enabled: + data_for_lastangle = data[self.currentangle] + if 'status' in data_for_lastangle.keys() and data_for_lastangle['status'] != 'Aborted': + + data_for_lastangle['status'] = 'Completed' + data[self.currentangle] = data_for_lastangle + + with open(filepath, 'w') as f: + json.dump(data, f, indent=4) + elif 'status' in data.keys() and data['status'] != 'Aborted': data['status'] = 'Completed' - with open(self.ui_report_dir + '/runtime_ping_data.json', 'w') as f: + with open(filepath, 'w') as f: json.dump(data, f, indent=4) def copy_reports(self, report_path): @@ -895,6 +1004,12 @@ def generate_report(self, result_json=None, result_dir='Ping_Plotter_Test_Report 'No of Devices': '{} (V:{}, A:{}, W:{}, L:{}, M:{})'.format(len(self.sta_list), len(self.sta_list) - len(self.real_sta_list), self.android, self.windows, self.linux, self.mac), 'Duration': self.duration } + # if bandsteering is enabled + if self.do_bandsteering: + del test_setup_info["Duration"] + test_setup_info["Robot IP"] = self.robo_ip + test_setup_info["Selected Coordinates"] = ",".join(self.coordinate_list) + test_setup_info["no of cycles"] = self.total_cycles # Test setup information table for devices in groups else: group_names = ', '.join(config_devices.keys()) @@ -1017,6 +1132,9 @@ def generate_report(self, result_json=None, result_dir='Ping_Plotter_Test_Report }) report.set_table_dataframe(individual_report_df) report.build_table() + # Extract bandsteering graphs + if self.do_bandsteering: + self.get_bandsteering_stats(report) # packets sent vs received vs dropped report.set_table_title( @@ -1279,6 +1397,1185 @@ def add_ping_packet_images(self, report): report.set_custom_html(f'') report.build_custom() + def store_bandsteeringcsv(self): + """ + Collects band-steering data for the current monitor iteration. + + Iterates over result_json to extract the BSSID and channel for each connected + station, then fetches the robot's current pose to append spatial metadata. + If the robot has not moved (from_coordinate equals to_coordinate), the test + is flagged for termination. + + Returns: + tuple[list, bool]: + - individual_df_data (list): A flat list of values representing one + CSV row, ordered as: [bssid, channel, ...] per station, followed by + [timestamp, robot_x, robot_y, from_coordinate, to_coordinate]. + Returns a partial list (BSSID/channel only) if stop_status is True. + - stop_status (bool): True if the robot has stopped moving, signalling + the band-steering test should terminate. + """ + stop_status = False + individual_df_data = [] + # Collect BSSID and channel for each station from the latest result snapshot. + # Skip the 'status' key as it is metadata, not a station entry. + for key, value in self.result_json.items(): + if key == 'status': + continue + individual_df_data.extend([value['bssid'], value['channel']]) + timestamp = datetime.now().strftime("%d/%m %I:%M:%S %p") + # Fetch the robot's current position and the coordinates it is travelling between + robot_x, robot_y, from_coordinate, to_coordinate = self.robot.get_robot_pose() + if from_coordinate == to_coordinate: + stop_status = True + return individual_df_data, stop_status + individual_df_data.extend([timestamp, robot_x, robot_y, from_coordinate, to_coordinate]) + + return individual_df_data, stop_status + + def monitor(self, individual_df, Devices=None, rtts=None, rtts_list=None, ping_stats=None, + duration=None, coord=None, angle=None, monitor_charge_time=None): + """ + Continuously monitors ping test results for all stations over a given duration. + + This method runs a polling loop that fetches live ping endpoint data, parses + RTT (round-trip time) values and dropped packet counts per station, and updates + the internal result_json. It also handles robot-assisted battery pausing and + optional WebUI-driven test abortion. For band-steering tests, it saves + per-iteration data to a CSV and returns early. + + Args: + individual_df (pd.DataFrame): DataFrame for accumulating band-steering data. + Passed through for band-steering mode; unused in standard ping monitoring. + Devices (RealDevice, optional): Device manager instance for fetching live + device metadata (MAC, IP, BSSID, etc.). Defaults to a new RealDevice + instance scoped to self.host if not provided. + rtts (dict, optional): Pre-initialized dict mapping station -> {seq: rtt}. + Overwritten internally at the start of each monitor call. + rtts_list (list, optional): Flat list of all RTT values seen so far. + Overwritten internally at the start of each monitor call. + ping_stats (dict, optional): Dict mapping station -> {sent, received, dropped} + lists. Overwritten internally at the start of each monitor call. + duration (float, optional): Test duration in seconds. Defaults to + self.pingduration * 60. + coord (tuple, optional): Current robot coordinate (x, y). Used when the + robot needs to re-navigate after a battery pause. + angle (float, optional): Current robot rotation angle in degrees. Used + when re-rotating after a battery pause. + monitor_charge_time (datetime, optional): Timestamp of the last battery + check. Managed internally; resets every 300 seconds. + + Returns: + tuple[pd.DataFrame, bool] or None: + - In band-steering mode: returns (individual_df, stop_status) where + stop_status is True if the WebUI requested an abort. + - In standard mode: returns None implicitly after the loop ends. + """ + logging.info("Monitoringcx") + if Devices is None: + Devices = RealDevice(manager_ip=self.host, selected_bands=[]) + stop_status = False + if duration is None: + duration = self.pingduration * 60 + loop_timer = 0 + # logging.info(self.result_json) + rtts = {} + rtts_list = [] + ping_stats = {} + for station in self.sta_list: + rtts[station] = {} + ping_stats[station] = { + 'sent': [], + 'received': [], + 'dropped': [] + } + monitor_charge_time = datetime.now() + + while (loop_timer <= duration): + + t_init = datetime.now() + try: + result_data = self.get_results() + if isinstance(result_data, dict): + if 'UNKNOWN' in result_data['name']: + raise ValueError("There are no valid generic endpoints to run the test") + else: + keys = [list(d.keys())[0] for d in result_data] + keys = [key for key in keys if 'UNKNOWN' not in key] + if len(keys) == 0: + raise ValueError("There are no valid generic endpoints to run the test") + except ValueError as e: + logger.info(result_data) + logger.error(e) + exit(0) + # logging.info(result_data) + # Robot battery management — check every 300 seconds + if self.robot is not None and not self.do_bandsteering and monitor_charge_time is not None: + if ((datetime.now() - monitor_charge_time).total_seconds() >= 300): + pause_monitor, stop_test = self.robot.wait_for_battery(stop=self.stop_generic) + if stop_test: + break + if pause_monitor: + reached = self.robot.move_to_coordinate(coord) + if not reached: + continue + self.start_generic() + if self.rotation_enabled: + rotation = self.robot.rotate_angle(angle) + if not rotation: + break + monitor_charge_time = datetime.now() + + if self.real: + if isinstance(result_data, dict): + for station in self.real_sta_list: + Devices.get_devices() + current_device_data = Devices.devices_data[station] + # logging.info(current_device_data) + if station in result_data['name']: + # logging.info(result_data['last results'].split('\n')) + if len(result_data['last results']) != 0: + result = result_data['last results'].split('\n') + if len(result) > 1: + last_result = result[-2] + else: + last_result = result[-1] + else: + last_result = "" + + hw_version = current_device_data['hw version'] + if "Win" in hw_version: + os = "Windows" + elif "Linux" in hw_version: + os = "Linux" + elif "Apple" in hw_version: + os = "Mac" + else: + os = "Android" + + self.result_json[station] = { + 'command': result_data['command'], + 'sent': result_data['tx pkts'], + 'recv': result_data['rx pkts'], + 'dropped': result_data['dropped'], + 'mac': current_device_data['mac'], + 'ip': current_device_data['ip'], + 'bssid': current_device_data['ap'], + 'ssid': current_device_data['ssid'], + 'channel': current_device_data['channel'], + 'mode': current_device_data['mode'], + 'name': [current_device_data['user'] if current_device_data['user'] != '' else current_device_data['hostname']][0], + 'os': os, + 'remarks': [], + 'last_result': [last_result][0] + } + ping_stats[station]['sent'].append(result_data['tx pkts']) + ping_stats[station]['received'].append(result_data['rx pkts']) + ping_stats[station]['dropped'].append(result_data['dropped']) + self.result_json[station]['ping_stats'] = ping_stats[station] + if len(result_data['last results']) != 0: + temp_last_results = result_data['last results'].split('\n')[0: len(result_data['last results']) - 1] + drop_count = 0 # let dropped = 0 initially + dropped_packets = [] + # sample result - 64 bytes from 192.168.1.61: icmp_seq=28 time=3.66 ms *** drop: 0 (0, 0.000) rx: 28 fail: 0 bytes: 1792 min/avg/max: 2.160/3.422/5.190 + for result in temp_last_results: + try: + # fetching the first part of the last result e.g., 64 bytes from 192.168.1.61: icmp_seq=28 time=3.66 ms into t_result and the remaining part into t_fail + t_result, t_fail = result.split('***') + except Exception: + continue + t_result = t_result.split() + seq_number = None + rtt = None + if 'icmp_seq=' not in result or 'time=' not in result: + continue + for t_data in t_result: + if 'icmp_seq=' in t_data: + seq_number = int(t_data.strip('icmp_seq=')) + if 'time=' in t_data: + rtt = float(t_data.strip('time=')) + if seq_number is None or rtt is None: + logger.error(f"missing keys | t_result: {t_result} | full result: {result}") + continue + rtts[station][seq_number] = rtt + rtts_list.append(rtt) + + # finding dropped packets + t_fail = t_fail.split() # [' drop:', '0', '(0, 0.000)', 'rx:', '28', 'fail:', '0', 'bytes:', '1792', 'min/avg/max:', '2.160/3.422/5.190'] + t_drop_val = t_fail[1] # t_drop_val = '0' + t_drop_val = int(t_drop_val) # type cast string to int + if t_drop_val != drop_count: + current_drop_packets = t_drop_val - drop_count + drop_count = t_drop_val + for drop_packet in range(1, current_drop_packets + 1): + dropped_packets.append(seq_number - drop_packet) + + if rtts_list == []: + rtts_list = [0] + min_rtt = str(min(rtts_list)) + avg_rtt = str(sum(rtts_list) / len(rtts_list)) + max_rtt = str(max(rtts_list)) + self.result_json[station]['min_rtt'] = min_rtt + self.result_json[station]['avg_rtt'] = avg_rtt + self.result_json[station]['max_rtt'] = max_rtt + if self.result_json[station]['os'] == 'Android' and isinstance(rtts, dict) and rtts != {}: + if list(rtts[station].keys()) == []: + self.result_json[station]['sent'] = str(0) + self.result_json[station]['recv'] = str(0) + self.result_json[station]['dropped'] = str(0) + else: + self.result_json[station]['sent'] = str(max(list(rtts[station].keys()))) + self.result_json[station]['recv'] = str(len(rtts[station].keys())) + self.result_json[station]['dropped'] = str(int(self.result_json[station]['sent']) - int(self.result_json[station]['recv'])) + if len(rtts[station].keys()) != 0: + required_sequence_numbers = list(range(1, max(rtts[station].keys()))) + for seq in required_sequence_numbers: + if seq not in rtts[station].keys(): + if seq in dropped_packets: + rtts[station][seq] = 0 + else: + rtts[station][seq] = 0.11 + self.result_json[station]['rtts'] = rtts[station] + self.result_json[station]['remarks'] = self.generate_remarks(self.result_json[station]) + + else: + for station in self.real_sta_list: + Devices.get_devices() + current_device_data = Devices.devices_data[station] + # print('<<<<<<<<<<<<<<<<<<<', current_device_data) + for ping_device in result_data: + ping_endp, ping_data = list(ping_device.keys())[ + 0], list(ping_device.values())[0] + eid = str(ping_data['eid']) + self.sta_list = list(self.sta_list) + # Removing devices with UNKNOWN CX + if 'UNKNOWN' in ping_endp: + device_id = eid.split('.')[0] + '.' + eid.split('.')[1] + if device_id == station.split('.')[0] + '.' + station.split('.')[1]: + self.sta_list.remove(station) + self.real_sta_list.remove(station) + logger.info(result_data) + logger.info("Excluding {} from report as there is no valid generic endpoint creation during the test(UNKNOWN CX)".format(device_id)) + continue + if station in ping_endp: + if len(ping_data['last results']) != 0: + result = ping_data['last results'].split('\n') + if len(result) > 1: + last_result = result[-2] + else: + last_result = result[-1] + else: + last_result = "" + + hw_version = current_device_data['hw version'] + if "Win" in hw_version: + os = "Windows" + elif "Linux" in hw_version: + os = "Linux" + elif "Apple" in hw_version: + os = "Mac" + else: + os = "Android" + + self.result_json[station] = { + 'command': ping_data['command'], + 'sent': ping_data['tx pkts'], + 'recv': ping_data['rx pkts'], + 'dropped': ping_data['dropped'], + 'mac': current_device_data['mac'], + 'ip': current_device_data['ip'], + 'bssid': current_device_data['ap'], + 'ssid': current_device_data['ssid'], + 'channel': current_device_data['channel'], + 'mode': current_device_data['mode'], + 'name': [current_device_data['user'] if current_device_data['user'] != '' else current_device_data['hostname']][0], + 'os': os, + 'remarks': [], + 'last_result': [last_result][0] + } + ping_stats[station]['sent'].append(ping_data['tx pkts']) + ping_stats[station]['received'].append(ping_data['rx pkts']) + ping_stats[station]['dropped'].append(ping_data['dropped']) + self.result_json[station]['ping_stats'] = ping_stats[station] + if len(ping_data['last results']) != 0: + temp_last_results = ping_data['last results'].split('\n')[0: len(ping_data['last results']) - 1] + drop_count = 0 # let dropped = 0 initially + dropped_packets = [] + for result in temp_last_results: + # sample result - 64 bytes from 192.168.1.61: icmp_seq=28 time=3.66 ms *** drop: 0 (0, 0.000) rx: 28 fail: 0 bytes: 1792 min/avg/max: 2.160/3.422/5.190 + if 'time=' in result: + try: + # fetching the first part of the last result e.g., 64 bytes from 192.168.1.61: icmp_seq=28 time=3.66 ms into t_result and the remaining part into t_fail + t_result, t_fail = result.split('***') + except Exception: + continue + t_result = t_result.split() + seq_number = None + rtt = None + if 'icmp_seq=' not in result and 'time=' not in result: + continue + for t_data in t_result: + if 'icmp_seq=' in t_data: + seq_number = int(t_data.strip('icmp_seq=')) + if 'time=' in t_data: + rtt = float(t_data.strip('time=')) + if seq_number is None or rtt is None: + logger.error(f"missing variables | t_result: {t_result} | full result: {result}") + continue + rtts[station][seq_number] = rtt + rtts_list.append(rtt) + + # finding dropped packets + t_fail = t_fail.split() # [' drop:', '0', '(0, 0.000)', 'rx:', '28', 'fail:', '0', 'bytes:', '1792', 'min/avg/max:', '2.160/3.422/5.190'] + t_drop_val = t_fail[1] # t_drop_val = '0' + t_drop_val = int(t_drop_val) # type cast string to int + if t_drop_val != drop_count: + current_drop_packets = t_drop_val - drop_count + drop_count = t_drop_val + for drop_packet in range(1, current_drop_packets + 1): + dropped_packets.append(seq_number - drop_packet) + + if rtts_list == []: + rtts_list = [0] + min_rtt = str(min(rtts_list)) + avg_rtt = str(sum(rtts_list) / len(rtts_list)) + max_rtt = str(max(rtts_list)) + self.result_json[station]['min_rtt'] = min_rtt + self.result_json[station]['avg_rtt'] = avg_rtt + self.result_json[station]['max_rtt'] = max_rtt + if self.result_json[station]['os'] == 'Android' and isinstance(rtts, dict) and rtts != {}: + if list(rtts[station].keys()) == []: + self.result_json[station]['sent'] = str(0) + self.result_json[station]['recv'] = str(0) + self.result_json[station]['dropped'] = str(0) + else: + self.result_json[station]['sent'] = str(max(list(rtts[station].keys()))) + self.result_json[station]['recv'] = str(len(rtts[station].keys())) + self.result_json[station]['dropped'] = str(int(self.result_json[station]['sent']) - int(self.result_json[station]['recv'])) + if len(rtts[station].keys()) != 0: + required_sequence_numbers = list(range(1, max(rtts[station].keys()))) + for seq in required_sequence_numbers: + if seq not in rtts[station].keys(): + if seq in dropped_packets: + rtts[station][seq] = 0 + else: + rtts[station][seq] = 0.11 + # print(station, rtts[station]) + self.result_json[station]['rtts'] = rtts[station] + self.result_json[station]['remarks'] = self.generate_remarks(self.result_json[station]) + # self.result_json[station]['dropped_packets'] = dropped_packets + + # print("resultttjsonnn",self.result_json) + + # Robot result tracking (non-band-steering mode) + if self.robot is not None and not self.do_bandsteering: + self.track_resultjson() + + # WebUI abort check — stop if the user requested a test abort + if self.do_webUI: + if not self.store_csv(): + logging.info('Aborted test from webUI') + if self.do_bandsteering: + individual_data, stop_status = self.store_bandsteeringcsv() + stop_status = True + individual_df.to_csv('bandsteering_data.csv', index=False) + return individual_df, stop_status + break + + if self.do_bandsteering: + individual_data, stop_status = self.store_bandsteeringcsv() + if not stop_status: + individual_df.loc[len(individual_df)] = individual_data + individual_df.to_csv('bandsteering_data.csv', index=False) + return individual_df, stop_status + + time.sleep(1) + # loop_timer += 1 + # print(loop_timer) + t_end = datetime.now() + # print(t_end, abs(t_init - t_end).total_seconds()) + loop_timer += abs(t_init - t_end).total_seconds() + + def perform_robo(self, Devices, config_devices, group_device_map): + """ + Orchestrates a full robot-assisted ping test across a list of coordinates and angles. + + This method drives the robot through each coordinate in self.coordinate_list, + optionally rotating to each angle in self.angle_list at every coordinate. At each + position/angle combination it starts a generic CX, calls monitor() to collect ping + data for the configured duration, then stops the CX. After all coordinates are + visited, it generates the final test report. + + For band-steering tests, the flow is different: the robot iterates over its own + internal coordinate list (from self.robot.get_coordinates_list()), and monitor() + is called via callback as the robot moves. + + Args: + Devices (RealDevice): Device manager instance providing live device metadata. + config_devices (dict): Device configuration data passed through to + generate_report() when a group_name is set. + group_device_map (dict): Mapping of group name to devices, also forwarded + to generate_report() for grouped reporting. + + Returns: + None. Results are written to the report directory (and optionally copied + to the home directory if do_webUI is True). + """ + logging.info("Performing test using robo") + abort = False + if self.do_webUI: + # To generate nav_data.json in webui folder + base_dir = os.path.dirname(os.path.dirname(self.ui_report_dir)) + nav_data = os.path.join(base_dir, 'nav_data.json') + # Empty the json before test is initiated + with open(nav_data, "w") as file: + json.dump({}, file) + + self.robot.nav_data_path = nav_data + self.robot.runtime_dir = self.ui_report_dir + self.robot.ip = self.host + self.robot.testname = self.ui_report_dir.split("/")[-1] + + initial_duration = self.pingduration + stop_test = False + # Band-steering mode + if self.do_bandsteering: + reached = False + coordinate_list_with_robo = self.robot.get_coordinates_list() + if (len(coordinate_list_with_robo) == 0): + logging.config("Coordinate list is empty") + + self.robot.do_bandsteering = True + columns = [] + self.start_generic() + + for sta in self.real_sta_list: + client = sta.split('.')[0] + '.' + sta.split('.')[1] + columns.extend([f'BSSID {client}', f'Channel {client}']) + + columns.extend(['TIMESTAMP', 'Robot X', 'Robot Y', 'To Coordinate', 'From Coordinate']) + individual_df = pd.DataFrame(columns=columns) + + for coord in coordinate_list_with_robo: + pause, stopped = self.robot.wait_for_battery(lambda: self.monitor(individual_df)) + + if stopped: + break + # Move robot to point; monitor() is called as a callback during movement + matched, abort, all_dataframes = self.robot.move_to_coordinate( + coord, + monitor_function=lambda: self.monitor(individual_df)) + + if not matched: + logging.info("Skipped moving to point {}".format(coord)) + if abort: + break + + self.stop_generic() + + self.generate_report() + if self.do_webUI: + self.copy_reports_to_home_dir() + self.set_webUI_stop() + exit(1) + # Standard robot test — iterate over coordinates and angles + for coord in self.coordinate_list: + # WebUI stop request between coordinates + if self.do_webUI: + if self.check_stop_status(): + break + # Pause if battery is low, then navigate to the next coordinate + pause, stop_test = self.robot.wait_for_battery() + matched, abort = self.robot.move_to_coordinate(coord) + if abort: + break + self.coordinates_completed.append(coord) + self.currentcoordinate = coord + + ports_data_dict = self.json_get('/ports/all/')['interfaces'] + ports_data = {} + for ports in ports_data_dict: + port, port_data = list(ports.keys())[0], list(ports.values())[0] + ports_data[port] = port_data + + logging.info("rotationlist {}".format(self.angle_list)) + for j in range(len(self.angle_list)): + duration = initial_duration * 60 + rtts = {} + rtts_list = [] + ping_stats = {} + for station in self.sta_list: + rtts[station] = {} + ping_stats[station] = { + 'sent': [], + 'received': [], + 'dropped': [], + } + + self.result_json = {} + # Handle rotation if enabled — rotate before starting the CX + if self.rotation_enabled: + if self.do_webUI: + if self.check_stop_status(): + break + pause_angle, stop_test = self.robot.wait_for_battery(stop=self.stop_generic) + if stop_test: + break + if pause_angle: + reached = self.robot.move_to_coordinate(coord) + if not reached: + continue + self.start_generic() + + if self.rotation_enabled: + angle = self.angle_list[j] + rotation = self.robot.rotate_angle(angle) + if not rotation: + break + + self.currentangle = self.angle_list[j] + self.start_time = datetime.now() + self.start_generic() + monitor_charge_time = datetime.now() + # Run the monitoring loop for the full duration at this position/angle + self.monitor( + individual_df=None, + Devices=Devices, + rtts=rtts, + rtts_list=rtts_list, + ping_stats=ping_stats, + duration=duration, + coord=coord, + angle=angle if self.rotation_enabled else None, + monitor_charge_time=monitor_charge_time + ) + # time.sleep(duration * 60) + + logging.info('Stopping the cx') + self.stop_generic() + # self.clear_counter_endp() + + if self.do_webUI: + self.copy_reports_to_home_dir() + self.set_webUI_stop() + # Generate the final report + if self.local_lf_report_dir == "": + if self.group_name: + self.generate_report_robo(config_devices=config_devices, group_device_map=group_device_map) + else: + self.generate_report_robo() + else: + if self.group_name: + self.generate_report_robo(config_devices=config_devices, group_device_map=group_device_map, report_path=self.local_lf_report_dir) + else: + self.generate_report_robo(report_path=self.local_lf_report_dir) + + def track_resultjson(self): + """ + Updates two internal dictionaries — self.coordinate_json and self.starttime_track — + using the robot's current coordinate (and optionally current angle) as keys. This + allows per-position (and per-orientation) ping results to be accumulated across the + full test run and later used for report generation or real-time WebUI updates. + + When rotation is enabled, results are stored at a two-level depth: + coordinate -> angle -> result_json + + When rotation is disabled, results are stored at a single level: + coordinate -> result_json + + The start time for each coordinate/angle slot is recorded only once (on first visit), + so subsequent monitor() iterations at the same position don't overwrite it. + + Args: + None. Reads from self.currentcoordinate, self.currentangle (if rotation enabled), + self.result_json, and self.start_time. + + Returns: + None. Mutates self.coordinate_json and self.starttime_track in place. + """ + # Initialize a slot for this coordinate if it hasn't been visited yet + if self.currentcoordinate not in self.coordinate_json: + self.coordinate_json[self.currentcoordinate] = {} + self.starttime_track[self.currentcoordinate] = {} + if self.rotation_enabled: + if self.currentangle not in self.coordinate_json[self.currentcoordinate]: + self.coordinate_json[self.currentcoordinate][self.currentangle] = {} + self.starttime_track[self.currentcoordinate][self.currentangle] = {} + # Overwrite the stored result for this coordinate with the latest json + self.coordinate_json[self.currentcoordinate][self.currentangle] = self.result_json + if 'starttime' not in self.starttime_track[self.currentcoordinate][self.currentangle]: + self.starttime_track[self.currentcoordinate][self.currentangle]['starttime'] = self.start_time + else: + # No rotation — store results directly under the coordinate key + # Record start time only on first visit to this coordinate + if 'starttime' not in self.starttime_track[self.currentcoordinate]: + self.starttime_track[self.currentcoordinate]['starttime'] = self.start_time + # Overwrite the stored result for this coordinate with the latest json + self.coordinate_json[self.currentcoordinate] = self.result_json + + def generate_report_robo(self, result_json=None, result_dir='Ping_Plotter_Test_Report', report_path='', config_devices='', group_device_map=None): + """ + Generates the final HTML and PDF ping plotter report for a robot test run. + + Iterates over every completed coordinate and angle combination, pulling per-position + ping results from self.coordinate_json, and builds a structured report containing: + - Test setup information table + - Per-coordinate/angle uptime graphs and individual ping plotter graphs + - Packet sent/received/dropped bar charts + - RTT (min/avg/max) bar charts and per-client RTT vs time area plots + - Individual client summary tables with optional pass/fail status + - A notes table for devices that generated remarks + + Supports both individual device and group-based reporting, WebUI live-view image + injection, and optional pass/fail evaluation against expected packet loss thresholds. + + Args: + result_json (dict, optional): Override for self.result_json. If provided, + replaces the instance's result data before report generation. + result_dir (str): Name of the output directory for the report. + Defaults to 'Ping_Plotter_Test_Report'. + report_path (str): Base path where the report directory will be created. + Defaults to the current working directory. + config_devices (dict or str): Device-to-profile configuration map used + when generating group-based test setup info. Pass '' for individual + device mode. + group_device_map (dict, optional): Maps group names to their member devices. + Used to generate per-group client tables. Defaults to {}. + + Returns: + None. Writes interop_ping.html and interop_ping.pdf to the report directory. + """ + if group_device_map is None: + group_device_map = {} + if result_json is not None: + self.result_json = result_json + logging.info('Generating Report') + + report = lf_report(_output_pdf='interop_ping.pdf', + _output_html='interop_ping.html', + _results_dir_name=result_dir, + _path=report_path) + report_path = report.get_path() + report_path_date_time = report.get_path_date_time() + logging.info('path: {}'.format(report_path)) + logging.info('path_date_time: {}'.format(report_path_date_time)) + + # setting report title + report.set_title('Ping Plotter Test Report') + report.build_banner() + + # test setup info + if self.do_webUI: + self.real_sta_list = self.sta_list + for resource in self.real_sta_list: + shelf, r_id, _ = resource.split('.') + url = 'http://{}:{}/resource/{}/{}?fields=hw version'.format(self.host, self.port, shelf, r_id) + hw_version = requests.get(url) + hw_version = hw_version.json() + if 'resource' in hw_version.keys(): + hw_version = hw_version['resource'] + if 'hw version' in hw_version.keys(): + hw_version = hw_version['hw version'] + print(hw_version) + if 'Win' in hw_version: + self.windows += 1 + elif 'Lin' in hw_version: + self.linux += 1 + elif 'Apple' in hw_version: + self.mac += 1 + else: + self.android += 1 + else: + logging.warning('Malformed response for hw version query on resource manager.') + else: + logging.warning('Malformed response for hw version query on resource manager.') + # Test setup information table for devices in device list + if config_devices == '': + test_setup_info = { + 'SSID': [self.ssid if self.ssid else 'TEST CONFIGURED'][0], + 'Security': [self.security if self.ssid else 'TEST CONFIGURED'][0], + 'Website / IP': self.target, + 'No of Devices': '{} (V:{}, A:{}, W:{}, L:{}, M:{})'.format(len(self.sta_list), len(self.sta_list) - len(self.real_sta_list), self.android, self.windows, self.linux, self.mac), + 'Duration': self.duration, + 'Selected Coordinates': ",".join(self.coordinate_list) + } + if self.rotation_enabled: + test_setup_info['Selected Rotations'] = ",".join(self.angle_list) + # Test setup information table for devices in groups + else: + group_names = ', '.join(config_devices.keys()) + profile_names = ', '.join(config_devices.values()) + configmap = "Groups:" + group_names + " -> Profiles:" + profile_names + test_setup_info = { + 'Configuration': configmap, + 'Website / IP': self.target, + 'No of Devices': '{} (V:{}, A:{}, W:{}, L:{}, M:{})'.format(len(self.sta_list), len(self.sta_list) - len(self.real_sta_list), self.android, self.windows, self.linux, self.mac), + 'Duration': self.duration + } + report.test_setup_table( + test_setup_data=test_setup_info, value='Test Setup Information') + + # objective and description + report.set_obj_html(_obj_title='Objective', + _obj='''Candela Ping Plotter Test assesses the network connectivity for specified clients by measuring Round + Trip data packet Travel time. It also detects issues like packet loss, delays, and + response time variations, ensuring effective device communication and identifying + connectivity problems. + ''') + report.build_objective() + + if self.do_webUI and self.get_live_view: + self.add_ping_packet_images(report=report) + + # Main report loop — one section per coordinate + angle combination + last_interation = False + for coord in self.coordinates_completed: + for angle in self.angle_list: + # Detect the final iteration so we can break the outer loop + if coord == len(self.coordinates_completed) - 1 and self.angle_list[angle] == self.currentangle: + last_interation = True + # Set the section title and load the result snapshot for this position + if self.rotation_enabled: + report.set_table_title("Coordinate {} : Angle {}".format(coord, angle)) + report.build_table_title() + self.result_json = self.coordinate_json[coord][angle] + self.start_time = self.starttime_track[coord][angle]['starttime'] + else: + report.set_table_title("Coordinate {}".format(coord)) + report.build_table_title() + self.result_json = self.coordinate_json[coord] + self.start_time = self.starttime_track[coord]['starttime'] + # graph for the above + self.packets_sent = [] + self.packets_received = [] + self.packets_dropped = [] + self.packet_loss_percent = [] + # self.client_unrechability_percent = [] + self.device_names = [] + self.device_modes = [] + self.device_channels = [] + self.device_min = [] + self.device_max = [] + self.device_avg = [] + self.device_mac = [] + self.device_ips = [] + self.device_bssid = [] + self.device_ssid = [] + self.device_names_with_errors = [] + self.devices_with_errors = [] + self.report_names = [] + self.remarks = [] + # packet_count_data = {} + if self.do_webUI and 'status' in self.result_json.keys(): + del self.result_json['status'] + + for device, device_data in self.result_json.items(): + print("deviceeee", device, "deviceedataaa", device_data) + self.packets_sent.append(int(device_data['ping_stats']['sent'][-1])) + self.packets_received.append(int(device_data['ping_stats']['received'][-1])) + self.packets_dropped.append(int(device_data['ping_stats']['dropped'][-1])) + self.device_names.append(device_data['name']) + self.device_modes.append(device_data['mode']) + self.device_channels.append(device_data['channel']) + self.device_mac.append(device_data['mac']) + self.device_ips.append(device_data['ip']) + self.device_bssid.append(device_data['bssid']) + self.device_ssid.append(device_data['ssid']) + if float(device_data['sent']) == 0: + self.packet_loss_percent.append(0) + # self.client_unrechability_percent.append(0) + else: + if device_data['ping_stats']['sent'] == [] or float(device_data['ping_stats']['sent'][-1]) == 0: + self.packet_loss_percent.append(0) + else: + self.packet_loss_percent.append(float(device_data['ping_stats']['dropped'][-1]) / float(device_data['ping_stats']['sent'][-1]) * 100) + # self.client_unrechability_percent.append(float(device_data['dropped']) / (float(self.duration) * 60) * 100) + t_rtt_values = sorted(list(device_data['rtts'].values())) + if t_rtt_values != []: + while (0.11 in t_rtt_values): + t_rtt_values.remove(0.11) + self.device_avg.append(float(sum(t_rtt_values) / len(t_rtt_values))) + self.device_min.append(float(min(t_rtt_values))) + self.device_max.append(float(max(t_rtt_values))) + else: + self.device_avg.append(0) + self.device_min.append(0) + self.device_max.append(0) + # self.device_avg.append(float(sum(t_rtt_values) / len(t_rtt_values))) + # self.device_min.append(float(device_data['min_rtt'].replace(',', ''))) + # self.device_max.append(float(device_data['max_rtt'].replace(',', ''))) + # self.device_avg.append(float(device_data['avg_rtt'].replace(',', ''))) + if device_data['os'] == 'Virtual': + self.report_names.append('{} {}'.format(device, device_data['os'])[0:25]) + else: + self.report_names.append('{} {} {}'.format(device, device_data['os'], device_data['name'])) + if device_data['remarks'] != []: + self.device_names_with_errors.append(device_data['name']) + self.devices_with_errors.append(device) + self.remarks.append(','.join(device_data['remarks'])) + logging.info('{} {} {}'.format(*self.packets_sent, + *self.packets_received, + *self.packets_dropped)) + logging.info('{} {} {}'.format(*self.device_min, + *self.device_max, + *self.device_avg)) + + logging.info('Generating Report') + + # uptime and downtime + report.set_table_title( + 'Individual Ping Plotter Graph for {} duration:'.format(self.duration) + ) + report.build_table_title() + # graph for above + if self.rotation_enabled: + uptime_graph = self.generate_uptime_graph(coordinate=coord, angle=angle) + else: + uptime_graph = self.generate_uptime_graph(coordinate=coord) + logging.info('uptime graph name {}'.format(uptime_graph)) + report.set_graph_image(uptime_graph) + + # need to move the graph image to the results directory + report.move_graph_image() + + # report.set_csv_filename(uptime_graph) + # report.move_csv_file() + report.build_graph() + + # individual client report table + report.set_table_title( + 'Individual client table report:' + ) + report.build_table_title() + if self.real: + # Calculating the pass/fail criteria when either expected_passfail_val or csv_name is provided + if self.expected_passfail_val or self.csv_name: + pass_fail_list, test_input_list = self.get_pass_fail_list() + # When groups are provided a seperate table will be generated for each group using generate_dataframe + if self.group_name: + for key, val in group_device_map.items(): + if self.expected_passfail_val or self.csv_name: + dataframe = self.generate_dataframe( + val, + self.report_names, + self.device_ips, + self.device_mac, + self.device_bssid, + self.device_ssid, + self.device_channels, + self.packets_sent, + self.packets_received, + self.packet_loss_percent, + test_input_list, + self.device_avg, + pass_fail_list) + else: + dataframe = self.generate_dataframe( + val, + self.report_names, + self.device_ips, + self.device_mac, + self.device_bssid, + self.device_ssid, + self.device_channels, + self.packets_sent, + self.packets_received, + self.packet_loss_percent, + [], + self.device_avg, + []) + if dataframe: + report.set_obj_html("", "Group: {}".format(key)) + report.build_objective() + dataframe1 = pd.DataFrame(dataframe) + report.set_table_dataframe(dataframe1) + report.build_table() + else: + individual_report_df = pd.DataFrame({ + 'Wireless Client': self.report_names, + 'IP Address': self.device_ips, + 'MAC': self.device_mac, + 'BSSID': self.device_bssid, + 'SSID': self.device_ssid, + 'Channel': self.device_channels, + 'Packets Sent': self.packets_sent, + 'Packets Received': self.packets_received, + 'Packet Loss %': self.packet_loss_percent, + 'AVG RTT (ms)': self.device_avg, + }) + if self.expected_passfail_val or self.csv_name: + individual_report_df['Expected Packet loss %'] = test_input_list + individual_report_df['Status '] = pass_fail_list + report.set_table_dataframe(individual_report_df) + report.build_table() + else: + individual_report_df = pd.DataFrame({ + 'Wireless Client': self.report_names, + 'IP Address': self.device_ips, + 'MAC': self.device_mac, + 'BSSID': self.device_bssid, + 'SSID': self.device_ssid, + 'Channel': self.device_channels, + 'Packets Sent': self.packets_sent, + 'Packets Received': self.packets_received, + 'Packet Loss %': self.packet_loss_percent, + 'AVG RTT (ms)': self.device_avg, + # 'Client Unrechability %': self.client_unrechability_percent + }) + report.set_table_dataframe(individual_report_df) + report.build_table() + + # packets sent vs received vs dropped + report.set_table_title( + 'Packets sent vs packets received vs packets dropped') + report.build_table_title() + x_fig_size = 20 + y_fig_size = len(self.device_names) * .5 + 4 + graph = lf_bar_graph_horizontal(_data_set=[self.packets_dropped, self.packets_received, self.packets_sent], + _xaxis_name='Packets Count', + _yaxis_name='Wireless Clients', + _label=[ + 'Packets Loss', 'Packets Received', 'Packets Sent'], + _graph_image_name='Packets sent vs received vs dropped', + _yaxis_label=self.report_names, + _yaxis_categories=self.report_names, + _yaxis_step=1, + _yticks_font=8, + _graph_title='Packets sent vs received vs dropped', + _title_size=16, + _color=['lightgrey', + 'orange', 'steelblue'], + _color_edge=['black'], + _bar_height=0.15, + _figsize=(x_fig_size, y_fig_size), + _legend_loc="best", + _legend_box=(1.0, 1.0), + _dpi=96, + _show_bar_value=False, + _enable_csv=True, + _color_name=['lightgrey', 'orange', 'steelblue']) + if self.rotation_enabled: + graph.graph_image_name = 'Packets sent vs received vs dropped_{}_{}'.format(coord, angle) + else: + graph.graph_image_name = 'Packets sent vs received vs dropped_{}'.format(coord) + + graph_png = graph.build_bar_graph_horizontal() + logging.info('graph name {}'.format(graph_png)) + + report.set_graph_image(graph_png) + # need to move the graph image to the results directory + report.move_graph_image() + report.set_csv_filename(graph_png) + report.move_csv_file() + report.build_graph() + + dataframe1 = pd.DataFrame({ + 'Wireless Client': self.device_names, + 'MAC': self.device_mac, + 'Channel': self.device_channels, + 'Mode': self.device_modes, + 'Packets Sent': self.packets_sent, + 'Packets Received': self.packets_received, + 'Packets Loss': self.packets_dropped + }) + report.set_table_dataframe(dataframe1) + report.build_table() + + # packets rtt graph + report.set_table_title('Ping RTT Graph') + report.build_table_title() + + graph = lf_bar_graph_horizontal(_data_set=[self.device_min, self.device_avg, self.device_max], + _xaxis_name='Time (ms)', + _yaxis_name='Wireless Clients', + _label=[ + 'Min RTT (ms)', 'Average RTT (ms)', 'Max RTT (ms)'], + _graph_image_name='Ping RTT per client', + _yaxis_label=self.report_names, + _yaxis_categories=self.report_names, + _yaxis_step=1, + _yticks_font=8, + _graph_title='Ping RTT per client', + _title_size=16, + _color=['lightgrey', + 'orange', 'steelblue'], + _color_edge='black', + _bar_height=0.15, + _figsize=(x_fig_size, y_fig_size), + _legend_loc="best", + _legend_box=(1.0, 1.0), + _dpi=96, + _show_bar_value=False, + _enable_csv=True, + _color_name=['lightgrey', 'orange', 'steelblue']) + + if self.rotation_enabled: + graph.graph_image_name = 'Ping RTT per client_{}_{}'.format(coord, angle) + else: + graph.graph_image_name = 'Ping RTT per client_{}'.format(coord) + + graph_png = graph.build_bar_graph_horizontal() + logging.info('graph name {}'.format(graph_png)) + report.set_graph_image(graph_png) + # need to move the graph image to the results directory + report.move_graph_image() + report.set_csv_filename(graph_png) + report.move_csv_file() + report.build_graph() + + dataframe2 = pd.DataFrame({ + 'Wireless Client': self.device_names, + 'MAC': self.device_mac, + 'Channel': self.device_channels, + 'Mode': self.device_modes, + 'Min RTT (ms)': self.device_min, + 'Average RTT (ms)': self.device_avg, + 'Max RTT (ms)': self.device_max + }) + report.set_table_dataframe(dataframe2) + report.build_table() + + # realtime ping graphs + report.set_table_title('Individual RTT vs Time Plots:') + report.build_table_title() + + # graphs for above + if self.rotation_enabled: + self.build_area_graphs(report_obj=report, coordinate=coord, angle=angle) + else: + self.build_area_graphs(report_obj=report, coordinate=coord) + + # check if there are remarks for any device. If there are remarks, build table else don't + if self.remarks != []: + report.set_table_title('Notes') + report.build_table_title() + dataframe3 = pd.DataFrame({ + 'Wireless Client': self.device_names_with_errors, + 'Port': self.devices_with_errors, + 'Remarks': self.remarks + }) + report.set_table_dataframe(dataframe3) + report.build_table() + + if last_interation: + break + # closing + report.build_footer() + report.write_html() + report.write_pdf() + + if self.do_webUI: + self.copy_reports(report_path_date_time) + + def get_bandsteering_stats(self, report=None): + """ + Retrieves and adds bandsteering statistics to the report. + + This function processes the given dataframe to detect BSSID changes + (transitions) per device, maps them with corresponding channels, + and correlates them with robot movement (coordinates and timestamps). + It generates bar graphs for BSSID change counts and tabular reports + for band steering events. + + Args: + report: Report object used to build graphs and tables. + df (pd.DataFrame): Input dataframe containing timestamp, BSSID, + channel, and coordinate data. + + Returns: + None + """ + df = pd.read_csv("bandsteering_data.csv") + bssid_cols = [c for c in df.columns if c.startswith("BSSID")] + channel_cols = [c for c in df.columns if c.startswith("Channel")] + + bssid_to_channel = { + bssid_col: next( + ch for ch in channel_cols + if ch.replace("Channel", "").strip() == + bssid_col.replace("BSSID", "").strip() + ) + for bssid_col in bssid_cols + } + + # print("bssiddcolss",bssid_cols) + # print("bassidchannelcolss",bssid_to_channel) + for col in bssid_cols: + + channel_col = bssid_to_channel[col] + + # Detect BSSID changes + mask = df[col] != df[col].shift() + filtered_df = df.loc[mask] + + if self.bssids: + filtered_df = df.loc[mask & df[col].isin(self.bssids)] + + bssid_list = filtered_df[col].tolist() + channel_list = filtered_df[channel_col].tolist() + timestamp_list = filtered_df['TIMESTAMP'].tolist() + from_coordinate_list = filtered_df['From Coordinate'].tolist() + to_coordinate_list = filtered_df['To Coordinate'].tolist() + bssid_counts = Counter(bssid_list) + + x_axis = list(bssid_counts.keys()) # BSSID values + y_axis = [[float(i)] for i in list(bssid_counts.values())] + if len(self.bssids) > 0: + x_axis = self.bssids + y_axis = [[float(bssid_counts.get(bssid, 0))] for bssid in self.bssids] + device_name = col.replace('BSSID ', '') + device_name = col.split()[-1] + report.set_obj_html( + _obj_title=f"BSSID change count of the {device_name}", + _obj=" ") + report.build_objective() + graph = lf_bar_graph(_data_set=y_axis, + _xaxis_name="BSSID", + _yaxis_name="Number of Changes", + # _xaxis_categories = [", ".join(x_axis)], + _xaxis_categories=[""], + _xaxis_label=x_axis, + _graph_image_name=f"bssid_change_count_{device_name}", + _label=x_axis, + _xaxis_step=1, + _graph_title=f"BSSID change count – {device_name}", + _title_size=16, + _color_edge='black', + _bar_width=0.15, + _figsize=(18, 6), + _legend_loc="best", + _legend_box=(1.0, 1.0), + _dpi=96, + _show_bar_value=True, + _enable_csv=True, + _color=['orange', 'lightcoral', 'steelblue', 'lightgrey'], + _color_name=['orange', 'lightcoral', 'steelblue', 'lightgrey'], + + ) + + graph_png = graph.build_bar_graph() + report.set_graph_image(graph_png) + # need to move the graph image to the results directory + report.move_graph_image() + report.set_csv_filename(graph_png) + report.move_csv_file() + report.build_graph() + + report.set_obj_html( + _obj_title=f"Band Steering Results for {device_name}", + _obj=" ") + report.build_objective() + table_df = { + "Timestamp": timestamp_list, + "BSSID": bssid_list, + "Channel": channel_list, + "From Coordinate": from_coordinate_list, + "To Coordinate": to_coordinate_list + } + table_df = pd.DataFrame(table_df) + report.set_table_dataframe(table_df) + report.build_table() + def validate_args(args): # input sanity @@ -1414,6 +2711,23 @@ def main(): python3 lf_interop_ping_plotter.py --mgr 192.168.204.74 --real --target 192.168.204.66 --ping_interval 1 --ping_duration 1m --group_name grp4,grp5 --profile_name Openz,Openz --file_name g219 --device_csv_name device.csv --server_ip 192.168.204.74 + EXAMPLE-14: + Command Line Interface to run ping plotter test with desired resources at desired points using robo + python3 lf_interop_ping_plotter.py --mgr 192.168.207.78 --real --target 8.8.8.8 --ping_interval 1 --ping_duration 1m --use_default_config + --robot_ip 192.168.204.76 --coordinate 3,4 + + EXAMPLE-15: + Command Line Interface to run ping plotter test with desired resources at desired points with rotations using robo + python3 lf_interop_ping_plotter.py --mgr 192.168.207.78 --real --target 8.8.8.8 --ping_interval 1 --ping_duration 1m --use_default_config + --robot_ip 192.168.204.76 --coordinate 3,4 --rotation 45,90 + + EXAMPLE-16: + Command Line Interface to run ping plotter test with desired resources at desired points with bandsteering using robo + python3 lf_interop_ping_plotter.py --mgr 192.168.207.78 --real --target 8.8.8.8 --ping_interval 1 --ping_duration 1m --use_default_config + --robot_ip 192.168.204.76 --coordinate 3,4 --do_bandsteering --total_cycles 3 --bssids 94:A6:7E:74:26:33,94:A6:7E:74:26:22 + + + SCRIPT_CLASSIFICATION : Test SCRIPT_CATEGORIES: Performance, Functional, Report Generation @@ -1576,6 +2890,15 @@ def main(): default=0, help='specify the Number of floors there in the house') + # Arguments related to Robot + optional.add_argument('--robot_ip', help='hostname for where Robot server is running') + optional.add_argument('--coordinate', help="The coordinate dictionary consists points and their respective x and y values") + optional.add_argument('--rotation', help="The set of angles to rotate at a particular point") + optional.add_argument('--do_bandsteering', help='Enable bandsteering', action='store_true') + optional.add_argument('--total_cycles', help='Iterations', default="1") + optional.add_argument('--bssids', type=str, help='Comma separated list of BSSIDs to be used for the test', default="") + optional.add_argument("--duration_to_skip", type=int, help='Specify the maximum time in seconds to skip a point if there is an obstacle', default=60) + args = parser.parse_args() if args.help_summary: @@ -1629,6 +2952,12 @@ def main(): pk_passwd = args.pk_passwd pac_file = args.pac_file + # declarations related to robo testcase + robo_ip = args.robot_ip + angle_list = args.rotation.split(",") if args.rotation else [0] + coord_list = args.coordinate.split(",") if args.coordinate else [0] + rotation_enabled = bool(args.rotation) + if 's' in duration or 'S' in duration: if 's' in duration: duration = float(duration.replace('s', '')) / 60 @@ -1662,6 +2991,14 @@ def main(): print('--ui_report_dir argument is required when --do_webUI is specified') exit(0) + if do_webUI: + # To generate nav_data.json in webui folder + base_dir = os.path.dirname(os.path.dirname(ui_report_dir)) + nav_data = os.path.join(base_dir, 'nav_data.json') + # Empty the json before test is initiated + with open(nav_data, "w") as file: + json.dump({}, file) + debug = args.debug if debug: @@ -1685,8 +3022,10 @@ def main(): ping = Ping(host=mgr_ip, port=mgr_port, ssid=ssid, security=security, password=password, radio=radio, lanforge_password=mgr_password, target=target, interval=interval, sta_list=[], virtual=args.virtual, real=args.real, duration=report_duration, do_webUI=do_webUI, debug=debug, ui_report_dir=ui_report_dir, csv_name=args.device_csv_name, expected_passfail_val=args.expected_passfail_value, wait_time=args.wait_time, group_name=group_name, - floors=args.floors, get_live_view=args.get_live_view) - + floors=args.floors, get_live_view=args.get_live_view, robo_ip=robo_ip, rotation_enabled=rotation_enabled, coordinate_list=coord_list, angle_list=angle_list, + local_lf_report_dir=args.local_lf_report_dir, do_bandsteering=args.do_bandsteering, total_cycles=args.total_cycles, bssids=args.bssids.split(",") if args.bssids else [], + duration_to_skip=args.duration_to_skip) + ping.pingduration = duration # creating virtual stations if --virtual flag is specified if args.virtual: @@ -1699,6 +3038,8 @@ def main(): '[', '').replace(']', '').replace('\'', '')) # selecting real clients if --real flag is specified + config_devices = {} + group_device_map = {} if args.real: Devices = RealDevice(manager_ip=mgr_ip, selected_bands=[]) Devices.get_devices() @@ -1801,7 +3142,10 @@ def main(): ping.create_generic_endp() logging.info('{}'.format(*ping.generic_endps_profile.created_cx)) - + # Run robot test + if robo_ip: + ping.perform_robo(Devices, config_devices, group_device_map) + exit(1) # run the test for the given duration logging.info('Running the ping plotter test for {} minutes'.format(duration)) @@ -1886,7 +3230,7 @@ def main(): try: # fetching the first part of the last result e.g., 64 bytes from 192.168.1.61: icmp_seq=28 time=3.66 ms into t_result and the remaining part into t_fail t_result, t_fail = result.split('***') - except BaseException: + except Exception: continue t_result = t_result.split() if 'icmp_seq=' not in result and 'time=' not in result: @@ -1968,7 +3312,7 @@ def main(): try: # fetching the first part of the last result e.g., 64 bytes from 192.168.1.61: icmp_seq=28 time=3.66 ms into t_result and the remaining part into t_fail t_result, t_fail = result.split('***') - except BaseException: + except Exception: continue # first line of ping result t_result = t_result.split() if 'icmp_seq=' not in result and 'time=' not in result: @@ -2031,13 +3375,13 @@ def main(): hw_version = current_device_data['hw version'] if "Win" in hw_version: - os = "Windows" + ostype = "Windows" elif "Linux" in hw_version: - os = "Linux" + ostype = "Linux" elif "Apple" in hw_version: - os = "Mac" + ostype = "Mac" else: - os = "Android" + ostype = "Android" ping.result_json[station] = { 'command': result_data['command'], @@ -2051,7 +3395,7 @@ def main(): 'channel': current_device_data['channel'], 'mode': current_device_data['mode'], 'name': [current_device_data['user'] if current_device_data['user'] != '' else current_device_data['hostname']][0], - 'os': os, + 'os': ostype, 'remarks': [], 'last_result': [last_result][0] } @@ -2068,7 +3412,7 @@ def main(): try: # fetching the first part of the last result e.g., 64 bytes from 192.168.1.61: icmp_seq=28 time=3.66 ms into t_result and the remaining part into t_fail t_result, t_fail = result.split('***') - except BaseException: + except Exception: continue t_result = t_result.split() if 'icmp_seq=' not in result and 'time=' not in result: @@ -2149,13 +3493,13 @@ def main(): hw_version = current_device_data['hw version'] if "Win" in hw_version: - os = "Windows" + ostype = "Windows" elif "Linux" in hw_version: - os = "Linux" + ostype = "Linux" elif "Apple" in hw_version: - os = "Mac" + ostype = "Mac" else: - os = "Android" + ostype = "Android" ping.result_json[station] = { 'command': ping_data['command'], @@ -2169,7 +3513,7 @@ def main(): 'channel': current_device_data['channel'], 'mode': current_device_data['mode'], 'name': [current_device_data['user'] if current_device_data['user'] != '' else current_device_data['hostname']][0], - 'os': os, + 'os': ostype, 'remarks': [], 'last_result': [last_result][0] } @@ -2187,7 +3531,7 @@ def main(): try: # fetching the first part of the last result e.g., 64 bytes from 192.168.1.61: icmp_seq=28 time=3.66 ms into t_result and the remaining part into t_fail t_result, t_fail = result.split('***') - except BaseException: + except Exception: continue t_result = t_result.split() if 'icmp_seq=' not in result and 'time=' not in result: