diff --git a/py-scripts/lf_interop_qos.py b/py-scripts/lf_interop_qos.py
index 249bfe091..e213f5449 100755
--- a/py-scripts/lf_interop_qos.py
+++ b/py-scripts/lf_interop_qos.py
@@ -73,6 +73,15 @@
./lf_interop_qos.py --mgr 192.168.207.78 --upstream_port eth1 --security None --ssid "NETGEAR_2G_wpa2" --passwd "" --traffic_type lf_tcp --download 10000000
--upload 0 --test_duration 1m --tos VO,VI,BE,BK --device_list 1.13 --iot_test --iot_ip 127.0.0.1 --iot_port 8000 --iot_iterations 1
--iot_delay 5 --iot_device_list "switch.smart_plug_1_socket_1" --iot_testname "QosWithIot" --iot_increment ""
+
+ # Command Line Interface to run the QOS Test along with Robot by enabling rotation at the specified coordinates
+ ./lf_interop_qos.py --ap_name Cisco --mgr 192.168.207.78 --test_duration 1m --upstream_port eth1 --upload 1000000 --mgr_port 8080 --traffic_type lf_udp
+ --tos "VI,VO,BE,BK" --ssid DLI-LPC992 --passwd Password@123 --security wpa2 --robot_ip 192.168.204.101 --rotation 30,90 --coordinate 3,4 --robot_test
+
+ # Command Line Interface to run the QOS Test along with Robot at the specified coordinates without any rotation
+ ./lf_interop_qos.py --ap_name Cisco --mgr 192.168.207.78 --test_duration 1m --upstream_port eth1 --upload 1000000 --mgr_port 8080 --traffic_type lf_udp
+ --tos "VI,VO,BE,BK" --ssid DLI-LPC992 --passwd Password@123 --security wpa2 --robot_ip 192.168.204.101 --coordinate 3,4 --robot_test
+
SCRIPT_CLASSIFICATION:
Test
@@ -90,6 +99,7 @@
Copyright 2025 Candela Technologies Inc
"""
+from lf_base_robo import RobotClass
import time
import argparse
import sys
@@ -204,7 +214,13 @@ def __init__(self,
csv_name=None,
wait_time=60,
get_live_view=False,
- total_floors=0):
+ total_floors=0,
+ robot_test=False,
+ robot_ip=None,
+ coordinate=None,
+ rotation=None,
+ rotation_enabled=None,
+ angle_list=None):
super().__init__(lfclient_host=host,
lfclient_port=port)
self.ssid_list = []
@@ -291,6 +307,25 @@ def __init__(self,
self.config = config
self.get_live_view = get_live_view
self.total_floors = total_floors
+ self.qos_data = {}
+ # Initializing robot test parameters
+ self.robot_test = robot_test
+ if robot_test:
+ if self.dowebgui:
+ self.get_live_view = True
+ self.robot_ip = robot_ip
+ self.coordinate = coordinate
+ self.rotation = rotation
+ self.test_stopped_by_user = False
+ self.coordinate_list = coordinate.split(',')
+ self.rotation_list = rotation.split(',')
+ self.current_coordinate = None
+ self.current_angle = None
+ self.angle_list = angle_list
+ self.rotation_enabled = rotation_enabled
+ self.robot = RobotClass(robo_ip=self.robot_ip, angle_list=self.angle_list)
+ self.last_rotated_angles = []
+ self.charge_point_name = None
def os_type(self):
response = self.json_get("/resource/all")
@@ -664,10 +699,12 @@ def monitor_cx(self):
self.mac_id_list = list(self.mac_id_list)
self.num_stations = len(self.real_client_list)
- def monitor(self):
+ def monitor(self, curr_coordinate=None, curr_rotation=None, monitor_charge_time=None):
# TODO: Fix this. This is poor style
throughput, upload, download, upload_throughput, download_throughput, connections_upload, connections_download, avg_upload, avg_download, avg_upload_throughput, avg_download_throughput, connections_download_avg, connections_upload_avg, avg_drop_a, avg_drop_b, dropa_connections, dropb_connections = { # noqa: E501
}, [], [], [], [], {}, {}, [], [], [], [], {}, {}, [], [], {}, {}
+ if self.robot_test:
+ curr_coordinate = self.current_coordinate
# Initialized seperate variables for average values for report changes
drop_a, drop_a_per, drop_b, drop_b_per, avg_drop_b_per, avg_drop_a_per = [], [], [], [], [], []
if (self.test_duration is None) or (int(self.test_duration) <= 1):
@@ -680,8 +717,9 @@ def monitor(self):
print("Test started at: ", test_start_time)
print("Monitoring cx and endpoints")
end_time = start_time + timedelta(seconds=int(self.test_duration))
- self.overall = []
- self.df_for_webui = []
+ if not self.robot_test:
+ self.overall = []
+ self.df_for_webui = []
index = -1
connections_upload = dict.fromkeys(list(self.cx_profile.created_cx.keys()), float(0))
connections_download = dict.fromkeys(list(self.cx_profile.created_cx.keys()), float(0))
@@ -743,6 +781,35 @@ def monitor(self):
columns = ['bps rx a', 'bps rx b']
individual_device_data[cx] = pd.DataFrame(columns=columns)
while datetime.now() < end_time or getattr(self, "background_run", None):
+ if self.robot_test:
+ if self.rotation_enabled:
+ if (datetime.now() - monitor_charge_time).total_seconds() >= 300:
+ logger.info("Checking battery status (5-minute interval)...")
+ pause_start = datetime.now()
+ pause = False
+ # Check battery level: if below 20% robot charges fully before resuming
+ pause, test_stopped_by_user = self.robot.wait_for_battery(stop=self.stop)
+ if test_stopped_by_user:
+ break
+ if pause:
+ # Return robot to the last saved coordinate after charging
+ reached = self.robot.move_to_coordinate(curr_coordinate)
+ if not reached:
+ test_stopped_by_user = True
+ break
+ if self.rotation_enabled:
+ # Restore robot's previous orientation before resuming the test
+ rotation_moni = self.robot.rotate_angle(curr_rotation)
+ if not rotation_moni:
+ test_stopped_by_user = True
+ break
+ self.start(False, False)
+ pause_end = datetime.now()
+ charge_pause = pause_end - pause_start
+ end_time += charge_pause
+ # overall_end_time += charge_pause
+ previous_time = datetime.now()
+ monitor_charge_time = datetime.now()
index += 1
current_time = datetime.now()
# removed the fields query from endp so that the cx names will be given in the reponse as keys instead of cx_ids
@@ -837,6 +904,9 @@ def monitor(self):
total_hours) != 0 or int(remaining_minutes) != 0 else '<1 min'][0],
'status': 'Running'
})
+ # Appending current angle if rotation is enabled for the robot test
+ if self.robot_test and self.rotation_enabled:
+ self.overall[-1]["angle"] = self.current_angle
# Appending latest rx_rate, tx_rate, and signal (RSSI) values to the most recent self.overall entry
for col_keys, col_values in rates_data.items():
self.overall[-1].update({
@@ -864,6 +934,9 @@ def monitor(self):
total_hours) != 0 or int(remaining_minutes) != 0 else '<1 min'][0],
'status': 'Running'
})
+ # Appending current angle if rotation is enabled for the robot test
+ if self.robot_test and self.rotation_enabled:
+ self.overall[-1]["angle"] = self.current_angle
# Appending latest rx_rate, tx_rate, and signal (RSSI) values to the most recent self.overall entry
for col_keys, col_values in rates_data.items():
self.overall[-1].update({
@@ -890,6 +963,9 @@ def monitor(self):
total_hours) != 0 or int(remaining_minutes) != 0 else '<1 min'][0],
'status': 'Running'
})
+ # Appending current angle if rotation is enabled for the robot test
+ if self.robot_test and self.rotation_enabled:
+ self.overall[-1]["angle"] = self.current_angle
# Appending latest rx_rate, tx_rate, and signal (RSSI) values to the most recent self.overall entry
for col_keys, col_values in rates_data.items():
self.overall[-1].update({
@@ -905,11 +981,14 @@ def monitor(self):
for port, df in individual_device_data.items():
df.to_csv(f"{runtime_dir}/{port}.csv", index=False)
df1 = pd.DataFrame(self.df_for_webui)
- df1.to_csv('{}/overall_throughput.csv'.format(runtime_dir), index=False)
-
+ if not self.robot_test:
+ df1.to_csv('{}/overall_throughput.csv'.format(runtime_dir), index=False)
+ else:
+ df1.to_csv('{}/overall_throughput_{}.csv'.format(runtime_dir, curr_coordinate), index=False)
with open(runtime_dir + "/../../Running_instances/{}_{}_running.json".format(self.ip, self.test_name), 'r') as file:
data = json.load(file)
if data["status"] != "Running":
+ self.test_stopped_by_user = True
logger.warning('Test is stopped by the user')
break
# Adjust time_gap based on elapsed time since start (for webui)
@@ -953,6 +1032,17 @@ def monitor(self):
avg_download[ind].append(throughput[ind][0])
avg_drop_a[ind].append(throughput[ind][2])
avg_drop_b[ind].append(throughput[ind][3])
+
+ if self.robot_test and self.dowebgui:
+ last_entry = self.df_for_webui[-1].copy()
+ last_entry["status"] = "Stopped"
+ last_entry["timestamp"] = datetime.now().strftime("%d/%m %I:%M:%S %p")
+ last_entry["remaining_time"] = "0"
+ last_entry["end_time"] = last_entry["timestamp"]
+ self.df_for_webui.append(last_entry)
+ df1 = pd.DataFrame(self.df_for_webui)
+ df1.to_csv('{}/overall_throughput_{}.csv'.format(runtime_dir, curr_coordinate), index=False)
+
# # rx_rate list is calculated
for index, _key in enumerate(throughput):
upload[index].append(throughput[index][1])
@@ -1466,10 +1556,12 @@ def get_live_view_images(self, multicast_exists=False):
"""
image_paths_by_tos = {} # { "BE": [img1, img2, ...], "VO": [...], ... }
rssi_image_paths_by_floor = {} if not multicast_exists else {} # Empty if skipping RSSI
-
+ # Robot currently supports single-floor testing only, So setting the floors value to 1
+ if self.robot_test:
+ self.total_floors = 1
for floor in range(int(self.total_floors)):
for tos in self.tos:
- timeout = 60 # seconds
+ timeout = 200 # seconds
throughput_image_path = os.path.join(self.result_dir, "live_view_images", f"{self.test_name}_throughput_{tos}_{floor + 1}.png")
@@ -1486,7 +1578,7 @@ def get_live_view_images(self, multicast_exists=False):
break
if time.time() - start_time > timeout:
- print(f"Timeout: Images for TOS '{tos}' on Floor {floor + 1} not found within 60 seconds.")
+ print(f"Timeout: Images for TOS '{tos}' on Floor {floor + 1} not found within 200 seconds.")
break
time.sleep(1)
@@ -1499,7 +1591,7 @@ def get_live_view_images(self, multicast_exists=False):
return image_paths_by_tos, rssi_image_paths_by_floor
- def generate_individual_graph(self, res, report, connections_download_avg, connections_upload_avg, avg_drop_a, avg_drop_b, totalfloors=None, multicast_exists=False):
+ def generate_individual_graph(self, res, report, connections_download_avg, connections_upload_avg, avg_drop_a, avg_drop_b, totalfloors=None, multicast_exists=False, graph_no=''):
# Required when generate_individual_graph() called explicitly from mixed traffic
if totalfloors is not None:
self.total_floors = totalfloors
@@ -1669,7 +1761,7 @@ def generate_individual_graph(self, res, report, connections_download_avg, conne
_color_name=colors,
_show_bar_value=True,
_enable_csv=True,
- _graph_image_name="bk_{}".format(self.direction), _color_edge=['black'],
+ _graph_image_name="bk_{}{}".format(self.direction, graph_no), _color_edge=['black'],
_color=colors)
graph_png = graph.build_bar_graph_horizontal()
print("graph name {}".format(graph_png))
@@ -1680,11 +1772,12 @@ def generate_individual_graph(self, res, report, connections_download_avg, conne
report.move_csv_file()
report.build_graph()
if (self.dowebgui and self.get_live_view) or multicast_exists:
- for image_path in tos_images['BK']:
- report.set_custom_html('
')
- report.build_custom()
- report.set_custom_html(f'
')
- report.build_custom()
+ if not self.robot_test:
+ for image_path in tos_images['BK']:
+ report.set_custom_html('')
+ report.build_custom()
+ report.set_custom_html(f'
')
+ report.build_custom()
individual_avgupload_list = []
individual_avgdownload_list = []
for i in range(len(individual_upload_list)):
@@ -1798,7 +1891,7 @@ def generate_individual_graph(self, res, report, connections_download_avg, conne
_color_name=colors,
_show_bar_value=True,
_enable_csv=True,
- _graph_image_name="be_{}".format(self.direction), _color_edge=['black'],
+ _graph_image_name="be_{}{}".format(self.direction, graph_no), _color_edge=['black'],
_color=colors)
graph_png = graph.build_bar_graph_horizontal()
print("graph name {}".format(graph_png))
@@ -1809,11 +1902,12 @@ def generate_individual_graph(self, res, report, connections_download_avg, conne
report.move_csv_file()
report.build_graph()
if (self.dowebgui and self.get_live_view) or multicast_exists:
- for image_path in tos_images['BE']:
- report.set_custom_html('')
- report.build_custom()
- report.set_custom_html(f'
')
- report.build_custom()
+ if not self.robot_test:
+ for image_path in tos_images['BE']:
+ report.set_custom_html('')
+ report.build_custom()
+ report.set_custom_html(f'
')
+ report.build_custom()
individual_avgupload_list = []
individual_avgdownload_list = []
for i in range(len(individual_upload_list)):
@@ -1924,7 +2018,7 @@ def generate_individual_graph(self, res, report, connections_download_avg, conne
_show_bar_value=True,
_color_name=colors,
_enable_csv=True,
- _graph_image_name="video_{}".format(self.direction),
+ _graph_image_name="video_{}{}".format(self.direction, graph_no),
_color_edge=['black'],
_color=colors)
graph_png = graph.build_bar_graph_horizontal()
@@ -1936,11 +2030,12 @@ def generate_individual_graph(self, res, report, connections_download_avg, conne
report.move_csv_file()
report.build_graph()
if (self.dowebgui and self.get_live_view) or multicast_exists:
- for image_path in tos_images['VI']:
- report.set_custom_html('')
- report.build_custom()
- report.set_custom_html(f'
')
- report.build_custom()
+ if not self.robot_test:
+ for image_path in tos_images['VI']:
+ report.set_custom_html('')
+ report.build_custom()
+ report.set_custom_html(f'
')
+ report.build_custom()
individual_avgupload_list = []
individual_avgdownload_list = []
for i in range(len(individual_upload_list)):
@@ -2051,7 +2146,7 @@ def generate_individual_graph(self, res, report, connections_download_avg, conne
_show_bar_value=True,
_color_name=colors,
_enable_csv=True,
- _graph_image_name="voice_{}".format(self.direction),
+ _graph_image_name="voice_{}{}".format(self.direction, graph_no),
_color_edge=['black'],
_color=colors)
graph_png = graph.build_bar_graph_horizontal()
@@ -2063,11 +2158,12 @@ def generate_individual_graph(self, res, report, connections_download_avg, conne
report.move_csv_file()
report.build_graph()
if (self.dowebgui and self.get_live_view) or multicast_exists:
- for image_path in tos_images['VO']:
- report.set_custom_html('')
- report.build_custom()
- report.set_custom_html(f'
')
- report.build_custom()
+ if not self.robot_test:
+ for image_path in tos_images['VO']:
+ report.set_custom_html('')
+ report.build_custom()
+ report.set_custom_html(f'
')
+ report.build_custom()
individual_avgupload_list = []
individual_avgdownload_list = []
for i in range(len(individual_upload_list)):
@@ -2139,25 +2235,26 @@ def generate_individual_graph(self, res, report, connections_download_avg, conne
report.build_table()
logger.info("Graph and table for VO tos are built")
if self.dowebgui and self.get_live_view and not multicast_exists:
- for _floor, rssi_image_path in rssi_images.items():
- if os.path.exists(rssi_image_path):
- report.set_custom_html('')
- report.build_custom()
- report.set_custom_html(f'
')
- report.build_custom()
+ if not self.robot_test:
+ for _floor, rssi_image_path in rssi_images.items():
+ if os.path.exists(rssi_image_path):
+ report.set_custom_html('')
+ report.build_custom()
+ report.set_custom_html(f'
')
+ report.build_custom()
else:
print("No individual graph to generate.")
# storing overall throughput CSV in the report directory
logger.info('Storing real time values in a CSV')
df1 = pd.DataFrame(self.overall)
- df1.to_csv('{}/overall_throughput.csv'.format(report.path_date_time))
+ df1.to_csv('{}/overall_throughput{}.csv'.format(report.path_date_time, graph_no))
# storing real time data for CXs in seperate CSVs
for cx in self.real_time_data:
for tos in self.real_time_data[cx]:
if tos in self.tos and len(self.real_time_data[cx][tos]['time']) != 0:
try:
cx_df = pd.DataFrame(self.real_time_data[cx][tos])
- cx_df.to_csv('{}/{}_{}_realtime_data.csv'.format(report.path_date_time, cx, tos), index=False)
+ cx_df.to_csv('{}/{}_{}_realtime_data{}.csv'.format(report.path_date_time, cx, tos, graph_no), index=False)
except Exception:
logger.info(f'failed cx {cx} tos {tos}')
logger.info(f"overall Data {self.real_time_data}")
@@ -2225,6 +2322,376 @@ def copy_reports_to_home_dir(self):
os.makedirs(test_name_dir)
shutil.copytree(curr_path, test_name_dir, dirs_exist_ok=True)
+ def generate_individual_coordinate(self, report, data, connections_download_avg, connections_upload_avg, avg_drop_a, avg_drop_b, coordinate, angle):
+ """
+ Processes QoS data and generates a report (table and bar graph) for a
+ specific robot coordinate and rotation angle.
+ """
+ res = self.set_report_data(data)
+ data_set, load, res = self.generate_graph_data_set(data)
+ report.set_table_title(
+ f"Overall {self.direction} Throughput for all TOS i.e BK | BE | Video (VI) | Voice (VO)")
+ report.build_table_title()
+ df_throughput = pd.DataFrame(res["throughput_table_df"])
+ report.set_table_dataframe(df_throughput)
+ report.build_table()
+ for _key in res["graph_df"]:
+ report.set_obj_html(
+ _obj_title=f"Overall {self.direction} throughput for {len(self.input_devices_list)} clients with different TOS.",
+ _obj=f"The below graph represents overall {self.direction} throughput for all "
+ "connected stations running BK, BE, VO, VI traffic with different "
+ f"intended loads{load} per tos")
+ report.build_objective()
+ if self.rotation_enabled:
+ graph_image_name = f"tos_{_key}_coord{self.coordinate_list[coordinate]}_angle{self.rotation_list[angle]}Hz"
+ graph_title = f"Overall {self.direction} throughput – BK,BE,VO,VI traffic streams at Coordinate: {self.coordinate_list[coordinate]} | Rotation Angle: {self.rotation_list[angle]}°"
+ graph_no = "_{}_{}".format(self.coordinate_list[coordinate], self.rotation_list[angle])
+ else:
+ graph_image_name = f"tos_{_key}_coord{self.coordinate_list[coordinate]}Hz"
+ graph_title = f"Overall {self.direction} throughput – BK,BE,VO,VI traffic streams at Coordinate: {self.coordinate_list[coordinate]}"
+ graph_no = "_{}".format(self.coordinate_list[coordinate])
+ graph = lf_bar_graph(_data_set=data_set,
+ _xaxis_name="Load per Type of Service",
+ _yaxis_name="Throughput (Mbps)",
+ _xaxis_categories=["BK,BE,VI,VO"],
+ _xaxis_label=['1 Mbps', '2 Mbps', '3 Mbps', '4 Mbps', '5 Mbps'],
+ _graph_image_name=graph_image_name,
+ _label=["BK", "BE", "VI", "VO"],
+ _xaxis_step=1,
+ _graph_title=graph_title,
+ _title_size=16,
+ _color=['orange', 'lightcoral', 'steelblue', 'lightgrey'],
+ _color_edge='black',
+ _bar_width=0.15,
+ _figsize=(18, 6),
+ _legend_loc="best",
+ _legend_box=(1.0, 1.0),
+ _dpi=96,
+ _show_bar_value=True,
+ _enable_csv=True,
+ _color_name=['orange', 'lightcoral', 'steelblue', 'lightgrey'])
+ graph_png = graph.build_bar_graph()
+ print("graph name {}".format(graph_png))
+ report.set_graph_image(graph_png)
+ # need to move the graph image to the results directory
+ report.move_graph_image()
+ report.set_csv_filename(graph_png)
+ report.move_csv_file()
+ report.build_graph()
+ self.generate_individual_graph(res, report, connections_download_avg, connections_upload_avg, avg_drop_a, avg_drop_b, graph_no=graph_no)
+
+ def generate_report_for_robo(self, coordinate_list=None, angle_list=None, passed_coordinates=None):
+ """
+ Creates the final PDF and HTML report. It combines the results from every robot
+ coordinate into one document.
+ """
+ self.ssid_list = self.get_ssid_list(self.input_devices_list)
+ load = ''
+ rate_down = str(str(int(self.cx_profile.side_b_min_bps) / 1000000) + ' ' + 'Mbps')
+ rate_up = str(str(int(self.cx_profile.side_a_min_bps) / 1000000) + ' ' + 'Mbps')
+ if self.direction == 'Upload':
+ load = rate_up
+ else:
+ if self.direction == "Download":
+ load = rate_down
+ # res = self.set_report_data(data)
+ if self.direction == "Bi-direction":
+ load = 'Upload' + ':' + rate_up + ',' + 'Download' + ':' + rate_down
+ # if selected_real_clients_names is not None:
+ # self.num_stations = selected_real_clients_names
+ # data_set, load, res = self.generate_graph_data_set(data)
+ report = lf_report(_output_pdf="interop_qos.pdf", _output_html="interop_qos.html", _path=self.result_dir,
+ _results_dir_name="Qos_Test_report")
+ report_path = report.get_path()
+ report_path_date_time = report.get_path_date_time()
+ print("path: {}".format(report_path))
+ print("path_date_time: {}".format(report_path_date_time))
+ report.set_title("Interop QOS")
+ report.build_banner()
+ # objective title and description
+ report.set_obj_html(_obj_title="Objective",
+ _obj="The objective of the QoS (Quality of Service) traffic throughput test is to measure the maximum"
+ " achievable throughput of a network under specific QoS settings and conditions.By conducting"
+ " this test, we aim to assess the capacity of network to handle high volumes of traffic while"
+ " maintaining acceptable performance levels,ensuring that the network meets the required QoS"
+ " standards and can adequately support the expected user demands.")
+ report.build_objective()
+ # Initialize counts and lists for device types
+ android_devices, windows_devices, linux_devices, ios_devices, ios_mob_devices = 0, 0, 0, 0, 0
+ all_devices_names = []
+ device_type = []
+ total_devices = ""
+ for i in self.real_client_list:
+ split_device_name = i.split(" ")
+ if 'android' in split_device_name:
+ all_devices_names.append(split_device_name[2] + ("(Android)"))
+ device_type.append("Android")
+ android_devices += 1
+ elif 'Win' in split_device_name:
+ all_devices_names.append(split_device_name[2] + ("(Windows)"))
+ device_type.append("Windows")
+ windows_devices += 1
+ elif 'Lin' in split_device_name:
+ all_devices_names.append(split_device_name[2] + ("(Linux)"))
+ device_type.append("Linux")
+ linux_devices += 1
+ elif 'Mac' in split_device_name:
+ all_devices_names.append(split_device_name[2] + ("(Mac)"))
+ device_type.append("Mac")
+ ios_devices += 1
+ elif 'iOS' in split_device_name:
+ all_devices_names.append(split_device_name[2] + ("(iOS)"))
+ device_type.append("iOS")
+ ios_mob_devices += 1
+
+ # Build total_devices string based on counts
+ if android_devices > 0:
+ total_devices += f" Android({android_devices})"
+ if windows_devices > 0:
+ total_devices += f" Windows({windows_devices})"
+ if linux_devices > 0:
+ total_devices += f" Linux({linux_devices})"
+ if ios_devices > 0:
+ total_devices += f" Mac({ios_devices})"
+ if ios_mob_devices > 0:
+ total_devices += f" iOS({ios_mob_devices})"
+
+ # Test setup information table for devices in device list
+ if self.qos_data["configuration"] == "" or self.qos_data["configuration"] == {}:
+ test_setup_info = {
+ "Device List": ", ".join(all_devices_names),
+ "Number of Stations": "Total" + f"({self.num_stations})" + total_devices,
+ "AP Model": self.ap_name,
+ "SSID": self.ssid,
+ "Traffic Duration in hours": round(int(self.test_duration) / 3600, 2),
+ "Security": self.security,
+ "Protocol": (self.traffic_type.strip("lf_")).upper(),
+ "Traffic Direction": self.direction,
+ "TOS": self.tos,
+ "Per TOS Load in Mbps": load,
+ "Coordinates": self.coordinate_list
+ }
+ if self.rotation_enabled:
+ test_setup_info["Rotations"] = self.rotation_list
+ # Test setup information table for devices in groups
+ else:
+ group_names = ', '.join(self.qos_data["configuration"].keys())
+ profile_names = ', '.join(self.qos_data["configuration"].values())
+ configmap = "Groups:" + group_names + " -> Profiles:" + profile_names
+ test_setup_info = {
+ "AP Model": self.ap_name,
+ 'Configuration': configmap,
+ "Traffic Duration in hours": round(int(self.test_duration) / 3600, 2),
+ "Security": self.security,
+ "Protocol": (self.traffic_type.strip("lf_")).upper(),
+ "Traffic Direction": self.direction,
+ "TOS": self.tos,
+ "Per TOS Load in Mbps": load
+ }
+ report.test_setup_table(test_setup_data=test_setup_info, value="Test Configuration")
+ if self.dowebgui:
+ tos_for_report = self.tos
+ tos_images, rssi_images = self.get_live_view_images()
+ for tos_val in tos_for_report:
+ for image_path in tos_images[tos_val]:
+ report.set_custom_html('')
+ report.build_custom()
+ report.set_custom_html(f'
')
+ report.build_custom()
+ for _floor, rssi_image_path in rssi_images.items():
+ if os.path.exists(rssi_image_path):
+ report.set_custom_html('')
+ report.build_custom()
+ report.set_custom_html(f'
')
+ report.build_custom()
+
+ for coordinate in range(len(passed_coordinates)):
+ if self.rotation_enabled:
+ for angle in range(len(self.rotation_list)):
+ report.set_obj_html(_obj_title=f"Coordinate: {self.coordinate_list[coordinate]} | Rotation Angle: {self.rotation_list[angle]}°",
+ _obj="")
+ report.build_objective()
+ data = self.qos_data[self.coordinate_list[coordinate]][self.rotation_list[angle]]["data"]
+ connections_download_avg = self.qos_data[self.coordinate_list[coordinate]][self.rotation_list[angle]]["connections_download_avg"]
+ connections_upload_avg = self.qos_data[self.coordinate_list[coordinate]][self.rotation_list[angle]]["connections_upload_avg"]
+ avg_drop_a = self.qos_data[self.coordinate_list[coordinate]][self.rotation_list[angle]]["avg_drop_a"]
+ avg_drop_b = self.qos_data[self.coordinate_list[coordinate]][self.rotation_list[angle]]["avg_drop_b"]
+ self.generate_individual_coordinate(report, data, connections_download_avg, connections_upload_avg, avg_drop_a, avg_drop_b, coordinate, angle)
+ else:
+ report.set_obj_html(_obj_title=f"Coordinate: {self.coordinate_list[coordinate]}",
+ _obj="")
+ report.build_objective()
+ data = self.qos_data[self.coordinate_list[coordinate]]["data"]
+ connections_download_avg = self.qos_data[self.coordinate_list[coordinate]]["connections_download_avg"]
+ connections_upload_avg = self.qos_data[self.coordinate_list[coordinate]]["connections_upload_avg"]
+ avg_drop_a = self.qos_data[self.coordinate_list[coordinate]]["avg_drop_a"]
+ avg_drop_b = self.qos_data[self.coordinate_list[coordinate]]["avg_drop_b"]
+ self.generate_individual_coordinate(report, data, connections_download_avg, connections_upload_avg, avg_drop_a, avg_drop_b, coordinate, None)
+ input_setup_info = {
+ "contact": "support@candelatech.com"
+ }
+ report.test_setup_table(test_setup_data=input_setup_info, value="Information")
+ report.build_footer()
+ report.write_html()
+ report.write_pdf()
+
+ def perform_robo(self):
+ """
+ It handles moving the robot to each coordinate,
+ checking battery levels, rotating at each spot, and collecting Wi-Fi
+ performance data to build the final report.
+ """
+ if (self.rotation_list[0] != ""):
+ self.rotation_enabled = True
+ # coordinate list to track coordinates where the test needs to be triggered
+ coord_list = []
+ test_stopped_by_user = False
+ if self.coordinate:
+ coord_list = self.coordinate_list
+ if self.dowebgui:
+ base_dir = os.path.dirname(os.path.dirname(self.result_dir))
+ nav_data = os.path.join(base_dir, 'nav_data.json') # To generate nav_data.json in webgui folder
+ with open(nav_data, "w") as file:
+ json.dump({}, file)
+ self.robot.nav_data_path = nav_data
+ self.robot.runtime_dir = self.result_dir
+ self.robot.ip = self.host
+ self.robot.testname = self.test_name
+ passed_coord_list = []
+ abort = False
+ for coordinate in coord_list:
+ if self.robot_ip:
+ if self.test_stopped_by_user:
+ break
+ # Before moving to next coordinate, check if battery is sufficient
+ pause_coord, test_stopped_by_user = self.robot.wait_for_battery()
+ if test_stopped_by_user:
+ break
+ matched, abort = self.robot.move_to_coordinate(coordinate)
+
+ if matched:
+ logger.info("Reached the coordinate {}".format(coordinate))
+ if abort:
+ break
+ passed_coord_list.append(coordinate)
+
+ if matched:
+ self.overall = []
+ self.df_for_webui = []
+ # If rotations are not allowed
+ if not self.rotation_enabled:
+ test_results = {'test_results': []}
+ data = {}
+ input_setup_info = {
+ "contact": "support@candelatech.com"
+ }
+ self.current_coordinate = coordinate
+ self.start(False, False)
+ time.sleep(10)
+ connections_download, connections_upload, drop_a_per, drop_b_per, connections_download_avg, connections_upload_avg, avg_drop_a, avg_drop_b = self.monitor(
+ curr_coordinate=coordinate)
+ logger.info("connections download {}".format(connections_download))
+ logger.info("connections upload {}".format(connections_upload))
+ self.stop()
+ time.sleep(5)
+ test_results['test_results'].append(self.evaluate_qos(connections_download, connections_upload, drop_a_per, drop_b_per))
+ data.update(test_results)
+ params = {
+ "data": None,
+ "input_setup_info": None,
+ "connections_download_avg": None,
+ "connections_upload_avg": None,
+ "avg_drop_a": None,
+ "avg_drop_b": None,
+ "report_path": "",
+ "result_dir_name": "Qos_Test_report",
+ "selected_real_clients_names": None,
+ "config_devices": ""
+ }
+
+ params.update({
+ "data": data,
+ "input_setup_info": input_setup_info,
+ "report_path": (
+ self.result_dir
+ if self.dowebgui else ""
+ ),
+ "connections_upload_avg": connections_upload_avg,
+ "connections_download_avg": connections_download_avg,
+ "avg_drop_a": avg_drop_a,
+ "avg_drop_b": avg_drop_b
+ })
+ self.qos_data[coordinate] = params
+
+ # If rotations are enabled
+ else:
+ exit_from_monitor = False
+ for angle in range(len(self.rotation_list)):
+ test_results = {'test_results': []}
+ data = {}
+ input_setup_info = {
+ "contact": "support@candelatech.com"
+ }
+ self.last_rotated_angles = []
+ self.current_coordinate = coordinate
+ self.current_angle = self.angle_list[angle]
+ pause_angle, test_stopped_by_user = self.robot.wait_for_battery(stop=self.stop)
+ if test_stopped_by_user:
+ break
+ if pause_angle:
+ reached = self.robot.move_to_coordinate(coordinate)
+ if not reached:
+ test_stopped_by_user = True
+ break
+ final_angle = self.robot.angle_list[angle]
+ rotation = self.robot.rotate_angle(self.current_angle)
+ if not rotation:
+ exit_from_monitor = True
+ if exit_from_monitor:
+ break
+ if final_angle not in self.last_rotated_angles:
+ self.last_rotated_angles.append(final_angle)
+ self.start(False, False)
+ monitor_charge_time = datetime.now()
+ connections_download, connections_upload, drop_a_per, drop_b_per, connections_download_avg, connections_upload_avg, avg_drop_a, avg_drop_b = self.monitor(
+ curr_coordinate=coordinate, curr_rotation=self.current_angle, monitor_charge_time=monitor_charge_time)
+ logger.info("connections download {}".format(connections_download))
+ logger.info("connections upload {}".format(connections_upload))
+ self.stop()
+ time.sleep(5)
+ test_results['test_results'].append(self.evaluate_qos(connections_download, connections_upload, drop_a_per, drop_b_per))
+ data.update(test_results)
+ params = {
+ "data": None,
+ "input_setup_info": None,
+ "connections_download_avg": None,
+ "connections_upload_avg": None,
+ "avg_drop_a": None,
+ "avg_drop_b": None,
+ "report_path": "",
+ "result_dir_name": "Qos_Test_report",
+ "selected_real_clients_names": None,
+ "config_devices": ""
+ }
+
+ params.update({
+ "data": data,
+ "input_setup_info": input_setup_info,
+ "report_path": (
+ self.result_dir
+ if self.dowebgui else ""
+ ),
+ "connections_upload_avg": connections_upload_avg,
+ "connections_download_avg": connections_download_avg,
+ "avg_drop_a": avg_drop_a,
+ "avg_drop_b": avg_drop_b
+ })
+ if coordinate not in self.qos_data:
+ self.qos_data[coordinate] = {}
+ self.qos_data[coordinate][self.rotation_list[angle]] = params
+
+ self.generate_report_for_robo(coordinate_list=coord_list, angle_list=self.rotation_list, passed_coordinates=passed_coord_list)
+
def build_iot_report_section(self, report, iot_summary):
"""
Handles all IoT-related charts, tables, and increment-wise reports.
@@ -2666,6 +3133,11 @@ def main():
optional.add_argument("--config", action="store_true", help="Specify for configuring the devices")
optional.add_argument('--get_live_view', help="If true will heatmap will be generated from testhouse automation WebGui ", action='store_true')
optional.add_argument('--total_floors', help="Total floors from testhouse automation WebGui ", default="0")
+ # Args for robot testing
+ optional.add_argument("--robot_test", help='to trigger robot test', action='store_true')
+ optional.add_argument('--robot_ip', type=str, default='localhost', help='hostname for where Robot server is running')
+ optional.add_argument('--coordinate', type=str, default='', help="The coordinate contains list of coordinates to be ")
+ optional.add_argument('--rotation', type=str, default='', help="The set of angles to rotate at a particular point")
# IOT ARGS
parser.add_argument('--iot_test', help="If true will execute script for iot", action='store_true')
optional.add_argument('--iot_ip',
@@ -2716,6 +3188,12 @@ def main():
loads = {}
data = {}
+ rotation_enabled = False
+ angle_list = []
+
+ if args.rotation:
+ angle_list = args.rotation.split(',')
+ rotation_enabled = True
if args.download and args.upload:
loads = {'upload': str(args.upload).split(","), 'download': str(args.download).split(",")}
@@ -2804,10 +3282,17 @@ def main():
wait_time=args.wait_time,
config=args.config,
get_live_view=args.get_live_view,
- total_floors=args.total_floors
+ total_floors=args.total_floors,
+ robot_test=args.robot_test,
+ robot_ip=args.robot_ip,
+ coordinate=args.coordinate,
+ rotation=args.rotation,
+ rotation_enabled=rotation_enabled,
+ angle_list=angle_list
)
throughput_qos.os_type()
_, configured_device, _, configuration = throughput_qos.phantom_check()
+ throughput_qos.qos_data["configuration"] = configuration
if args.dowebgui and args.group_name:
if len(configured_device) == 0:
logger.warning("No device is available to run the test")
@@ -2866,6 +3351,9 @@ def main():
raise ValueError("Aborting the test....")
throughput_qos.build()
throughput_qos.monitor_cx()
+ if args.robot_test:
+ throughput_qos.perform_robo()
+ exit(1)
throughput_qos.start(False, False)
time.sleep(10)
connections_download, connections_upload, drop_a_per, drop_b_per, connections_download_avg, connections_upload_avg, avg_drop_a, avg_drop_b = throughput_qos.monitor()