diff --git a/common/library/module_utils/input_validation/validation_flows/provision_validation.py b/common/library/module_utils/input_validation/validation_flows/provision_validation.py index cc6b4d8e76..4ba1515129 100644 --- a/common/library/module_utils/input_validation/validation_flows/provision_validation.py +++ b/common/library/module_utils/input_validation/validation_flows/provision_validation.py @@ -225,6 +225,52 @@ def validate_duplicate_service_tags_in_mapping_file(pxe_mapping_file_path): raise ValueError(f"Duplicate SERVICE_TAG found in PXE mapping file: {'; '.join(duplicates)}") +def validate_duplicate_admin_ips_in_mapping_file(pxe_mapping_file_path): + """Validates that ADMIN_IP values in the mapping file are unique.""" + if not pxe_mapping_file_path or not os.path.isfile(pxe_mapping_file_path): + raise ValueError(f"PXE mapping file not found: {pxe_mapping_file_path}") + + with open(pxe_mapping_file_path, "r", encoding="utf-8") as fh: + raw_lines = fh.readlines() + + non_comment_lines = [ln for ln in raw_lines if ln.strip()] + reader = csv.DictReader(non_comment_lines) + + fieldname_map = {fn.strip().upper(): fn for fn in reader.fieldnames} + admin_ip_col = fieldname_map.get("ADMIN_IP") + hostname_col = fieldname_map.get("HOSTNAME") + + if not admin_ip_col: + raise ValueError("ADMIN_IP column not found in PXE mapping file") + + seen_admin_ips = {} + duplicates = [] + + for row_idx, row in enumerate(reader, start=2): + admin_ip = row.get(admin_ip_col, "").strip() if row.get(admin_ip_col) else "" + hostname = "" + if hostname_col: + hostname = row.get(hostname_col, "").strip() if row.get(hostname_col) else "" + + if not admin_ip: + continue + + if admin_ip in seen_admin_ips: + first_row = seen_admin_ips[admin_ip]["row"] + first_host = seen_admin_ips[admin_ip]["hostname"] + dup_host = hostname or "" + first_host_disp = first_host or "" + duplicates.append( + f"'{admin_ip}' at CSV rows {first_row} ({first_host_disp}) and {row_idx} ({dup_host})" + ) + continue + + seen_admin_ips[admin_ip] = {"row": row_idx, "hostname": hostname} + + if duplicates: + raise ValueError(f"Duplicate ADMIN_IP found in PXE mapping file: {'; '.join(duplicates)}") + + def validate_group_parent_service_tag_consistency_in_mapping_file(pxe_mapping_file_path): """Validates that GROUP_NAME has a consistent PARENT_SERVICE_TAG across the mapping file.""" if not pxe_mapping_file_path or not os.path.isfile(pxe_mapping_file_path): @@ -740,6 +786,7 @@ def validate_provision_config( validate_functional_groups_in_mapping_file(pxe_mapping_file_path) validate_duplicate_service_tags_in_mapping_file(pxe_mapping_file_path) validate_duplicate_hostnames_in_mapping_file(pxe_mapping_file_path) + validate_duplicate_admin_ips_in_mapping_file(pxe_mapping_file_path) validate_group_parent_service_tag_consistency_in_mapping_file(pxe_mapping_file_path) validate_functional_groups_separation(pxe_mapping_file_path) validate_parent_service_tag_hierarchy(pxe_mapping_file_path)