diff --git a/py-scripts/lf_interop_video_streaming.py b/py-scripts/lf_interop_video_streaming.py index 7f96cf83e..abb860ebd 100755 --- a/py-scripts/lf_interop_video_streaming.py +++ b/py-scripts/lf_interop_video_streaming.py @@ -59,6 +59,13 @@ python3 lf_interop_video_streaming.py --mgr 192.168.214.219 --url "https://test-streams.mux.dev/x36xhzz/x36xhzz.m3u8" --media_source hls --media_quality 1080P --duration 1m --device_list 1.10,1.12 --debug --test_name video_streaming_test --expected_passfail_value 5 + Example-11: Command Line Interface to run the Test along with IOT without device list + python3 -u lf_interop_video_streaming.py --mgr 192.168.242.2 --duration 1m --url https://dash.akamaized.net/akamai/bbb_30fps/bbb_30fps.mpd --test_name videostreaming + --webgui_incremental 1 --media_source dash --media_quality 4k --postcleanup --precleanup --iot_test --iot_testname "IotVideoStreaming" + + Example-12: Command Line Interface to run the Test along with IOT with device list + python3 -u lf_interop_video_streaming.py --mgr 192.168.242.2 --duration 1m --url https://dash.akamaized.net/akamai/bbb_30fps/bbb_30fps.mpd --test_name videostreaming + --webgui_incremental 1 --media_source dash --media_quality 4k --postcleanup --precleanup --iot_test --iot_testname "IotVideoStreaming" --iot_device_list "switch.smart_plug_1_socket_1" SCRIPT CLASSIFICATION: Test @@ -96,6 +103,8 @@ from datetime import datetime, timedelta from lf_graph import lf_bar_graph_horizontal from lf_graph import lf_line_graph +import threading +from collections import OrderedDict if sys.version_info[0] != 3: @@ -106,6 +115,7 @@ base = importlib.import_module('py-scripts.lf_base_interop_profile') lf_csv = importlib.import_module("py-scripts.lf_csv") realm = importlib.import_module("py-json.realm") +LFCliBase = realm.LFCliBase Realm = realm.Realm base_RealDevice = base.RealDevice lf_report = importlib.import_module("py-scripts.lf_report") @@ -121,6 +131,11 @@ PortUtils = port_utils.PortUtils DeviceConfig = importlib.import_module("py-scripts.DeviceConfig") +iot_scripts_path = os.path.abspath(os.path.join(os.path.dirname(__file__), "../../local/interop-webGUI/IoT/scripts/")) +if os.path.exists(iot_scripts_path): + sys.path.insert(0, iot_scripts_path) + from test_automation import Automation # noqa: E402 + class VideoStreamingTest(Realm): def __init__(self, host, ssid, passwd, encryp, media_source, media_quality, suporrted_release=None, max_speed=None, url=None, @@ -1199,7 +1214,7 @@ def handle_passfail_criteria(self, data: dict): 'Average Rx Rate (Mbps)': avg_rx_rate_list } - def generate_report(self, date, iterations_before_test_stopped_by_user, test_setup_info, realtime_dataset, report_path=''): + def generate_report(self, date, iterations_before_test_stopped_by_user, test_setup_info, realtime_dataset, report_path='', iot_summary=None): logging.info("Creating Reports") # Initialize the report object if self.dowebgui and report_path == '': @@ -1219,15 +1234,30 @@ def generate_report(self, date, iterations_before_test_stopped_by_user, test_set keys = list(self.http_profile.created_cx.keys()) # Set report title, date, and build banner - report.set_title("Video Streaming Test") + report.set_title("Video Streaming Test Including IoT Devices" if iot_summary else "Video Streaming Test") report.set_date(date) report.build_banner() - report.set_obj_html("Objective", " The Candela Video streaming test is designed to measure the access point performance and stability by streaming the videos from the local browser" - "or from over the Internet in real clients like android which are connected to the access point," - "this test allows the user to choose the options like video link, type of media source, media quality, number of playbacks." - "Along with the performance other measurements like No of Buffers, Wait-Time, per client Video Bitrate, Video Quality, and more. " - "The expected behavior is for the DUT to be able to handle several stations (within the limitations of the AP specs)" - "and make sure all capable clients can browse the video. ") + if iot_summary: + report.set_obj_html( + "Objective", + "The Candela Video Streaming Test Including IoT Devices is designed to evaluate an Access Point’s " + "performance and stability when handling both Real clients (Android, Windows, Linux, iOS) and IoT devices " + "(controlled via Home Assistant) simultaneously. " + "For Real clients, the test measures video streaming performance by playing user-defined media " + "(from a local browser or the Internet) with configurable options such as video link, media source type, " + "quality, and number of playbacks. Metrics such as buffering events, wait-time, per-client video bitrate, " + "and video quality are captured to validate the AP’s ability to sustain smooth playback across multiple clients. " + "For IoT clients, the test concurrently executes device-specific actions (e.g., camera streaming, switch toggling, " + "lock/unlock) and tracks success rate, latency, and failure rate. The goal is to ensure the AP can reliably support " + "video streaming for multiple real clients while maintaining responsive and consistent performance for IoT devices." + ) + else: + report.set_obj_html("Objective", " The Candela Video streaming test is designed to measure the access point performance and stability by streaming the videos from the local browser" + "or from over the Internet in real clients like android which are connected to the access point," + "this test allows the user to choose the options like video link, type of media source, media quality, number of playbacks." + "Along with the performance other measurements like No of Buffers, Wait-Time, per client Video Bitrate, Video Quality, and more. " + "The expected behavior is for the DUT to be able to handle several stations (within the limitations of the AP specs)" + "and make sure all capable clients can browse the video. ") report.build_objective() report.set_table_title("Input Parameters") report.build_table_title() @@ -1241,7 +1271,8 @@ def generate_report(self, date, iterations_before_test_stopped_by_user, test_set # Create a string by joining the mapped pairs gp_map = ", ".join(f"{group} -> {profile}" for group, profile in gp_pairs) test_setup_info["Configuration"] = gp_map - + if iot_summary: + test_setup_info = with_iot_params_in_table(test_setup_info, iot_summary) report.test_setup_table(value="Test Setup Information", test_setup_data=test_setup_info) device_type = [] @@ -1497,6 +1528,8 @@ def generate_report(self, date, iterations_before_test_stopped_by_user, test_set dataframe3 = pd.DataFrame(dataframe2) report.set_table_dataframe(dataframe3) report.build_table() + if iot_summary: + self.build_iot_report_section(report, iot_summary) report.build_footer() report.write_html() report.write_pdf() @@ -1790,6 +1823,229 @@ def updating_webui_running_json(self): with open(file_path, 'w') as file: json.dump(data, file, indent=4) + def build_iot_report_section(self, report, iot_summary): + """ + Handles all IoT-related charts, tables, and increment-wise reports. + """ + outdir = report.path_date_time + os.makedirs(outdir, exist_ok=True) + + def copy_into_report(raw_path, new_name): + """Resolve and copy image into report dir.""" + if not raw_path: + return None + + abs_src = os.path.abspath(raw_path) + if not os.path.exists(abs_src): + # Search recursively under 'results' if absolute path missing + for root, _, files in os.walk(os.path.join(os.getcwd(), "results")): + if os.path.basename(raw_path) in files: + abs_src = os.path.join(root, os.path.basename(raw_path)) + break + else: + return None + + dst = os.path.join(outdir, new_name) + if os.path.abspath(abs_src) != os.path.abspath(dst): + shutil.copy2(abs_src, dst) + return new_name + + # section header + report.set_custom_html('
') + report.build_custom() + report.set_custom_html('

IoT Results

') + report.build_custom() + + # Statistics + stats_png = copy_into_report(iot_summary.get("statistics_img"), "iot_statistics.png") + if stats_png: + report.build_chart_title("Test Statistics") + report.set_custom_html(f'') + report.build_custom() + + # Request vs Latency + rvl_png = copy_into_report(iot_summary.get("req_vs_latency_img"), "iot_request_vs_latency.png") + if rvl_png: + report.build_chart_title("Request vs Average Latency") + report.set_custom_html(f'') + report.build_custom() + + # Overall results table + ort = iot_summary.get("overall_result_table") or {} + if ort: + rows = [{ + "Device": dev, + "Min Latency (ms)": stats.get("min_latency"), + "Avg Latency (ms)": stats.get("avg_latency"), + "Max Latency (ms)": stats.get("max_latency"), + "Total Iterations": stats.get("total_iterations"), + "Success Iters": stats.get("success_iterations"), + "Failed Iters": stats.get("failed_iterations"), + "No-Response Iters": stats.get("no_response_iterations"), + } for dev, stats in ort.items()] + + df_overall = pd.DataFrame(rows).round(2) + + report.set_custom_html('
') + report.build_custom() + report.set_obj_html(_obj_title="Overall IoT Result Table", _obj=" ") + report.build_objective() + report.set_table_dataframe(df_overall) + report.build_table() + report.set_custom_html('
') + report.build_custom() + + # Increment reports + inc = iot_summary.get("increment_reports") or {} + if inc: + report.set_custom_html('

Reports by Increment Steps

') + report.build_custom() + + for step_name, rep in inc.items(): + + report.set_custom_html(f'

{step_name.replace("_", " ")}

') + report.build_custom() + + # Latency graph + lat_png = copy_into_report(rep.get("latency_graph"), f"iot_{step_name}_latency.png") + if lat_png: + report.build_chart_title("Average Latency") + report.set_custom_html(f'') + report.build_custom() + + # Success count graph + res_png = copy_into_report(rep.get("result_graph"), f"iot_{step_name}_results.png") + if res_png: + report.build_chart_title("Success Count") + report.set_custom_html(f'') + report.build_custom() + + # Tabular data for detailed iteration-level results + data_rows = rep.get("data") or [] + if data_rows: + df = pd.DataFrame(data_rows).rename( + columns={"latency__ms": "Latency_ms", "latency_ms": "Latency_ms"} + ) + if "Latency_ms" in df.columns: + df["Latency_ms"] = pd.to_numeric(df["Latency_ms"], errors="coerce").round(3) + if "Result" in df.columns: + df["Result"] = df["Result"].map(lambda x: "Success" if bool(x) else "Failure") + + desired_cols = ["Iteration", "Device", "Current State", "Latency_ms", "Result"] + df = df[[c for c in desired_cols if c in df.columns]] + + report.set_table_dataframe(df) + report.build_table() + + report.set_custom_html('
') + report.build_custom() + + +def with_iot_params_in_table(base: dict, iot_summary) -> dict: + """ + Append IoT params into the existing Throughput Input Parameters table. + Adds: IoT Test name, IoT Iterations, IoT Delay (s), IoT Increment. + Accepts dict or JSON string. + """ + try: + if not iot_summary: + return base + if isinstance(iot_summary, str): + try: + iot_summary = json.loads(iot_summary) + except Exception: + start = iot_summary.find("{") + end = iot_summary.rfind("}") + if start == -1 or end == -1 or end <= start: + return base + try: + iot_summary = json.loads(iot_summary[start:end + 1]) + except Exception: + return base + + ti = (iot_summary.get("test_input_table") or {}) + out = OrderedDict(base) + out["Iot Device List"] = ti.get("Device List", "") + out["IoT Iterations"] = ti.get("Iterations", "") + out["IoT Delay (s)"] = ti.get("Delay (seconds)", "") + out["IoT Increment"] = ti.get("Increment Pattern", "") + return out + except Exception: + return base + + +def trigger_iot(ip, port, iterations, delay, device_list, testname, increment): + """ + Entry point to start the IoT test in a separate thread. + This function is called from the throughput test script when IoT testing + is enabled. It wraps the asynchronous `run_iot()`. + """ + asyncio.run(run_iot(ip, port, iterations, delay, device_list, testname, increment)) + + +async def run_iot(ip: str = '127.0.0.1', + port: str = '8000', + iterations: int = 1, + delay: int = 5, + device_list: str = '', + testname: str = '', + increment: str = ''): + try: + + if delay < 5: + logger.error('The minimum delay should be 5 seconds.') + exit(1) + + if device_list != '': + device_list = device_list.split(',') + else: + device_list = None + # Parse and validate increment pattern if provided + if increment: + print("the increment is : ", increment) + try: + increment = list(map(int, increment.split(','))) + if any(i < 1 for i in increment): + logger.error('Increment values must be positive integers') + exit(1) + except ValueError: + logger.error('Invalid increment format. Please provide comma-separated integers (e.g., "1,3,5")') + exit(1) + + testname = testname + + # Ensure test name is unique (avoid overwriting previous results) + if testname in os.listdir('../../local/interop-webGUI/IoT/scripts/results/'): + logger.error('Test with same name already existing. Please give a different testname.') + exit(1) + automation = Automation(ip=ip, + port=port, + iterations=iterations, + delay=delay, + device_list=device_list, + testname=testname, + increment=increment) + + # fetch the available iot devices + automation.devices = await automation.fetch_iot_devices() + + # select the iot devices for testing + automation.select_iot_devices() + + # run the iot test on selected devices + automation.run_test() + + # generate the iot report + automation.generate_report() + + except Exception as e: + logger.error(f"Iot Test failed: {str(e)}") + raise + + await automation.session.close() + + logger.info('Iot Test Completed.') + def main(): help_summary = '''\ @@ -1888,7 +2144,7 @@ def main(): """) - + optional = parser.add_argument_group('Optional arguments to run lf_interop_video_streaming.py') parser.add_argument("--host", "--mgr", help='specify the GUI to connect to, assumes port ' '8080') parser.add_argument("--ssid", help='specify ssid on which the test will be running') @@ -1954,6 +2210,40 @@ def main(): type=int, default=0, help='specify the Number of floors there in the house') + # IOT ARGS + parser.add_argument('--iot_test', help="If true will execute script for iot", action='store_true') + optional.add_argument('--iot_ip', + default='127.0.0.1', + help='IP of the server') + + optional.add_argument('--iot_port', + default='8000', + help='Port of the server') + optional.add_argument('--iot_iterations', + type=int, + default=1, + help='Iterations to run the test') + + optional.add_argument('--iot_delay', + type=int, + default=5, + help='Delay in seconds between iterations (min. 5 seconds)') + + optional.add_argument('--iot_device_list', + type=str, + default='', + help='Entity IDs of the devices to include in testing (comma separated)') + + optional.add_argument('--iot_testname', + type=str, + default='', + help='Testname for reporting') + + optional.add_argument('--iot_increment', + type=str, + default='', + help='Comma-separated list of device counts to incrementally test (e.g., "1,3,5")') + args = parser.parse_args() if args.help_summary: @@ -2004,6 +2294,14 @@ def main(): if args.lf_logger_config_json: logger_config.lf_logger_config_json = args.lf_logger_config_json logger_config.load_lf_logger_config() + if args.iot_test: + iot_ip = args.iot_ip + iot_port = args.iot_port + iot_iterations = args.iot_iterations + iot_delay = args.iot_delay + iot_device_list = args.iot_device_list + iot_testname = args.iot_testname + iot_increment = args.iot_increment logger = logging.getLogger(__name__) @@ -2164,7 +2462,27 @@ def main(): elif obj.incremental[-1] < len(available_resources) and len(obj.incremental) > 1: logging.info("Exiting the program as the last incremental value must be equal to selected devices") exit() - + if args.iot_test: + if args.iot_iterations > 1: + thread = threading.Thread(target=trigger_iot, args=(iot_ip, iot_port, iot_iterations, iot_delay, iot_device_list, iot_testname, iot_increment)) + thread.start() + else: + total_secs = int(LFCliBase.parse_time(args.duration).total_seconds()) + iot_iterations = max(1, total_secs // args.iot_delay) + iot_thread = threading.Thread( + target=trigger_iot, + args=( + args.iot_ip, + args.iot_port, + iot_iterations, + args.iot_delay, + args.iot_device_list, + args.iot_testname, + args.iot_increment + ), + daemon=True + ) + iot_thread.start() # To create cx for selected devices obj.build() @@ -2301,12 +2619,19 @@ def main(): break obj.stop() date = str(datetime.now()).split(",")[0].replace(" ", "-").split(".")[0] + iot_summary = None + if args.iot_test and args.iot_testname: + base = os.path.join("results", args.iot_testname) + p = os.path.join(base, "iot_summary.json") + if os.path.exists(p): + with open(p) as f: + iot_summary = json.load(f) # prev_inc_value = 0 if obj.resource_ids and obj.incremental: - obj.generate_report(date, list(set(iterations_before_test_stopped_by_user)), test_setup_info=test_setup_info, realtime_dataset=individual_df) + obj.generate_report(date, list(set(iterations_before_test_stopped_by_user)), test_setup_info=test_setup_info, realtime_dataset=individual_df, iot_summary=iot_summary) elif obj.resource_ids: - obj.generate_report(date, list(set(iterations_before_test_stopped_by_user)), test_setup_info=test_setup_info, realtime_dataset=individual_df) + obj.generate_report(date, list(set(iterations_before_test_stopped_by_user)), test_setup_info=test_setup_info, realtime_dataset=individual_df, iot_summary=iot_summary) # Perform post-cleanup operations if args.postcleanup: