From f86b092d1bb9cb89cb33f3d974d8279d9f72cf9d Mon Sep 17 00:00:00 2001 From: alf-cactus Date: Wed, 17 Dec 2025 14:06:21 +0100 Subject: [PATCH 01/10] cp nat --- .../fw_modules/checkpointR8x/cp_getter.py | 14 ++-- .../fw_modules/checkpointR8x/cp_rule.py | 3 +- .../fw_modules/checkpointR8x/fwcommon.py | 73 +++++++++++-------- 3 files changed, 52 insertions(+), 38 deletions(-) diff --git a/roles/importer/files/importer/fw_modules/checkpointR8x/cp_getter.py b/roles/importer/files/importer/fw_modules/checkpointR8x/cp_getter.py index b80a28f63b..930f9f1f4e 100644 --- a/roles/importer/files/importer/fw_modules/checkpointR8x/cp_getter.py +++ b/roles/importer/files/importer/fw_modules/checkpointR8x/cp_getter.py @@ -308,15 +308,19 @@ def get_global_assignments(api_v_url: str, sid: str, show_params_policy_structur # parse global assignments for assignment in assignments['objects']: - global_assignment = parse_global_assignment(assignment) - global_assignments.append(global_assignment) + if "global-access-policy" in assignment: + global_assignment = parse_global_assignment(assignment) + global_assignments.append(global_assignment) return global_assignments -def get_rulebases(api_v_url: str, sid: str | None, show_params_rules: dict[str, Any], native_config_domain: dict[str, Any] | None, - device_config: dict[str, Any] | None, policy_rulebases_uid_list: list[str], is_global: bool = False, - access_type: str = 'access', rulebase_uid: str | None = None, rulebase_name: str | None = None) -> list[str]: +def get_rulebases(api_v_url: str, sid: str | None, show_params_rules: dict[str, Any], + native_config_domain: dict[str, Any] | None, + device_config: dict[str, Any] | None, + policy_rulebases_uid_list: list[str], is_global: bool = False, + access_type: str = 'access', rulebase_uid: str | None = None, + rulebase_name: str | None = None) -> list[str]: # access_type: access / nat native_config_rulebase_key = 'rulebases' diff --git a/roles/importer/files/importer/fw_modules/checkpointR8x/cp_rule.py b/roles/importer/files/importer/fw_modules/checkpointR8x/cp_rule.py index 85a29e7657..95dc37786c 100644 --- a/roles/importer/files/importer/fw_modules/checkpointR8x/cp_rule.py +++ b/roles/importer/files/importer/fw_modules/checkpointR8x/cp_rule.py @@ -2,6 +2,7 @@ from typing import Any import ast +from fw_modules.checkpointR8x import cp_const from fwo_log import FWOLogger from fwo_const import LIST_DELIMITER, DEFAULT_SECTION_HEADER_TEXT from fwo_base import sanitize, sort_and_join_refs @@ -402,7 +403,7 @@ def resolve_nwobj_uid_to_name(nw_obj_uid: str) -> str: else: FWOLogger.warning("could not resolve network object with uid " + nw_obj_uid) return "" - + def check_and_add_section_header(src_rulebase: dict[str, Any], target_rulebase: Rulebase, layer_name: str, import_id: str, section_header_uids: set[str]): # if current rulebase starts a new section, add section header, but only if it does not exist yet (can happen by chunking a section) # if 'type' in src_rulebase and src_rulebase['type'] == 'access-section' and 'uid' in src_rulebase: # and not src_rulebase['uid'] in section_header_uids: diff --git a/roles/importer/files/importer/fw_modules/checkpointR8x/fwcommon.py b/roles/importer/files/importer/fw_modules/checkpointR8x/fwcommon.py index dc79fd77a2..48b2a73bc9 100644 --- a/roles/importer/files/importer/fw_modules/checkpointR8x/fwcommon.py +++ b/roles/importer/files/importer/fw_modules/checkpointR8x/fwcommon.py @@ -41,7 +41,7 @@ def get_config(self, config_in: FwConfigManagerListController, import_state: Imp def get_config(config_in: FwConfigManagerListController, import_state: ImportStateController) -> tuple[int, FwConfigManagerListController]: - FWOLogger.debug ( "starting checkpointR8x/get_config" ) + FWOLogger.debug( "starting checkpointR8x/get_config" ) if config_in.has_empty_config(): # no native config was passed in, so getting it from FW-Manager parsing_config_only = False @@ -53,7 +53,7 @@ def get_config(config_in: FwConfigManagerListController, import_state: ImportSta initialize_native_config(config_in, import_state) start_time_temp = int(time.time()) - FWOLogger.debug ( "checkpointR8x/get_config/getting objects ...") + FWOLogger.debug( "checkpointR8x/get_config/getting objects ...") if config_in.native_config is None: raise FwoImporterError("native_config is None in get_config") @@ -62,17 +62,17 @@ def get_config(config_in: FwConfigManagerListController, import_state: ImportSta result_get_objects = get_objects(config_in.native_config, import_state) if result_get_objects>0: raise FwLoginFailed( "checkpointR8x/get_config/error while gettings objects") - FWOLogger.debug ( "checkpointR8x/get_config/fetched objects in " + str(int(time.time()) - start_time_temp) + "s") + FWOLogger.debug( "checkpointR8x/get_config/fetched objects in " + str(int(time.time()) - start_time_temp) + "s") start_time_temp = int(time.time()) - FWOLogger.debug ( "checkpointR8x/get_config/getting rules ...") - result_get_rules = get_rules (config_in.native_config, import_state) + FWOLogger.debug( "checkpointR8x/get_config/getting rules ...") + result_get_rules = get_rules(config_in.native_config, import_state) if result_get_rules>0: raise FwLoginFailed( "checkpointR8x/get_config/error while gettings rules") - FWOLogger.debug ( "checkpointR8x/get_config/fetched rules in " + str(int(time.time()) - start_time_temp) + "s") + FWOLogger.debug( "checkpointR8x/get_config/fetched rules in " + str(int(time.time()) - start_time_temp) + "s") duration = int(time.time()) - starttime - FWOLogger.debug ( "checkpointR8x/get_config - fetch duration: " + str(duration) + "s" ) + FWOLogger.debug( "checkpointR8x/get_config - fetch duration: " + str(duration) + "s" ) if config_in.contains_only_native(): sid: str = cp_getter.login(import_state.mgm_details) @@ -255,12 +255,12 @@ def process_devices( native_config_global_domain: dict[str, Any], import_state: ImportStateController ) -> None: for device in manager_details.devices: - device_config: dict[str,Any] = initialize_device_config(device) + device_config = initialize_device_config(device) if not device_config: continue - ordered_layer_uids: list[str] = get_ordered_layer_uids(policy_structure, device_config, manager_details.getDomainString()) - if not ordered_layer_uids: + ordered_layer_uids, policy_name = get_ordered_layer_uids(policy_structure, device_config, manager_details.getDomainString()) + if not ordered_layer_uids or policy_name is None: FWOLogger.warning(f"No ordered layers found for device: {device_config['name']}") native_config_domain['gateways'].append(device_config) continue @@ -278,7 +278,7 @@ def process_devices( get_rules_params(import_state), cp_manager_api_base_url, sid, native_config_domain, device_config, False, global_ordered_layer_count) - handle_nat_rules(device, native_config_domain, sid, import_state) + handle_nat_rules(policy_name, native_config_domain, sid, import_state) native_config_domain['gateways'].append(device_config) @@ -308,7 +308,8 @@ def handle_global_rulebase_links( continue for global_policy in global_policy_structure: if global_policy['name'] == global_assignment['global-access-policy']: - global_ordered_layer_uids = get_ordered_layer_uids([global_policy], device_config, global_domain) + # no global NAT, so global_policy_name not used + global_ordered_layer_uids, _global_policy_name = get_ordered_layer_uids([global_policy], device_config, global_domain) if not global_ordered_layer_uids: FWOLogger.warning(f"No access layer for global policy: {global_policy['name']}") break @@ -378,23 +379,21 @@ def get_rules_params(import_state: ImportStateController) -> dict[str, Any]: } -def handle_nat_rules(device: dict[str, Any], native_config_domain: dict[str, Any], sid: str, import_state: ImportStateController): - if 'package_name' in device and device['package_name']: - show_params_rules: dict[str, Any] = { - 'limit': import_state.fwo_config.api_fetch_size, - 'use-object-dictionary': cp_const.use_object_dictionary, - 'details-level': 'standard', - 'package': device['package_name'] - } - FWOLogger.debug(f"Getting NAT rules for package: {device['package_name']}", 4) - nat_rules = cp_getter.get_nat_rules_from_api_as_dict( - import_state.mgm_details.buildFwApiString(), sid, show_params_rules, - native_config_domain=native_config_domain - ) - if nat_rules: - native_config_domain['nat_rulebases'].append(nat_rules) - else: - native_config_domain['nat_rulebases'].append({"nat_rule_chunks": []}) +def handle_nat_rules(policy_name: str, native_config_domain: dict[str, Any], sid: str, import_state: ImportStateController): + + show_params_rules: dict[str, Any] = { + 'limit': import_state.fwo_config.api_fetch_size, + 'use-object-dictionary': cp_const.use_object_dictionary, + 'details-level': 'standard', + 'package': policy_name + } + FWOLogger.debug(f"Getting NAT rules for package: {policy_name}", 4) + nat_rules = cp_getter.get_nat_rules_from_api_as_dict( + import_state.mgm_details.buildFwApiString(), sid, show_params_rules, + native_config_domain=native_config_domain + ) + if nat_rules: + native_config_domain['nat_rulebases'].append(nat_rules) else: native_config_domain['nat_rulebases'].append({"nat_rule_chunks": []}) @@ -435,20 +434,30 @@ def add_ordered_layers_to_native_config(ordered_layer_uids: list[str], show_para return policy_rulebases_uid_list -def get_ordered_layer_uids(policy_structure: list[dict[str, Any]], device_config: dict[str, Any], domain: str | None) -> list[str]: +def get_ordered_layer_uids(policy_structure: list[dict[str, Any]], device_config: dict[str, Any], domain: str | None) -> tuple[list[str], str | None]: """Get UIDs of ordered layers for policy of device """ ordered_layer_uids: list[str] = [] + policy_name: str | None = None + failsafe_multiple_policies_per_device = False for policy in policy_structure: found_target_in_policy = False for target in policy['targets']: if target['uid'] == device_config['uid'] or target['uid'] == 'all': - found_target_in_policy = True + found_target_in_policy = True + check_if_multiple_policies_per_device(failsafe_multiple_policies_per_device, device_config['uid']) + failsafe_multiple_policies_per_device = True if found_target_in_policy: append_access_layer_uid(policy, domain, ordered_layer_uids) + policy_name = policy['name'] + + return ordered_layer_uids, policy_name + - return ordered_layer_uids +def check_if_multiple_policies_per_device(failsafe_multiple_policies_per_device: bool, device_config_uid: str): + if failsafe_multiple_policies_per_device: + raise FwoImporterError('multiple policies for device ' + device_config_uid) def append_access_layer_uid(policy: dict[str, Any], domain: str | None, ordered_layer_uids: list[str]) -> None: From de62b23c0cab93680a7abe753b7d49b6e9ddd0f9 Mon Sep 17 00:00:00 2001 From: alf-cactus Date: Wed, 17 Dec 2025 15:43:29 +0100 Subject: [PATCH 02/10] cp nat --- .../fw_modules/checkpointR8x/cp_getter.py | 9 ++- .../fw_modules/checkpointR8x/fwcommon.py | 66 ++++++++++++------- 2 files changed, 47 insertions(+), 28 deletions(-) diff --git a/roles/importer/files/importer/fw_modules/checkpointR8x/cp_getter.py b/roles/importer/files/importer/fw_modules/checkpointR8x/cp_getter.py index 930f9f1f4e..2f85f8b53d 100644 --- a/roles/importer/files/importer/fw_modules/checkpointR8x/cp_getter.py +++ b/roles/importer/files/importer/fw_modules/checkpointR8x/cp_getter.py @@ -560,8 +560,11 @@ def assign_placeholder_uids(rulebase: dict[str, Any], section: dict[str, Any], r return placeholder_rule_uid, placeholder_rulebase_uid -def get_nat_rules_from_api_as_dict (api_v_url: str, sid: str, show_params_rules: dict[str, Any], native_config_domain: dict[str, Any]={}): - nat_rules: dict[str, list[Any]] = { "nat_rule_chunks": [] } +def get_nat_rules_from_api_as_dict(policy_dict: dict[str, Any], api_v_url: str, sid: str, show_params_rules: dict[str, Any], native_config_domain: dict[str, Any]={}) -> dict[str, Any]: + """Gets NAT rulebases, uid and name are augmented with _nat for uniquenes of rulebase_links""" + nat_rules: dict[str, Any] = { "uid": policy_dict['uid'] + "_nat", + "name": policy_dict['name'] + "_nat", + "chunks": []} current=0 total=current+1 while (current dict[str, Any]: } -def handle_nat_rules(policy_name: str, native_config_domain: dict[str, Any], sid: str, import_state: ImportStateController): +def handle_nat_rules(policy_dict: dict[str, Any], device_config: dict[str, Any], native_config_domain: dict[str, Any], sid: str, import_state: ImportStateController, fetched_nat_rulebases: list[str], fetched_but_empty_nat_rulebases: list[str]): - show_params_rules: dict[str, Any] = { - 'limit': import_state.fwo_config.api_fetch_size, - 'use-object-dictionary': cp_const.use_object_dictionary, - 'details-level': 'standard', - 'package': policy_name - } - FWOLogger.debug(f"Getting NAT rules for package: {policy_name}", 4) - nat_rules = cp_getter.get_nat_rules_from_api_as_dict( - import_state.mgm_details.buildFwApiString(), sid, show_params_rules, - native_config_domain=native_config_domain - ) - if nat_rules: - native_config_domain['nat_rulebases'].append(nat_rules) - else: - native_config_domain['nat_rulebases'].append({"nat_rule_chunks": []}) + if policy_dict["uid"] not in fetched_nat_rulebases + fetched_but_empty_nat_rulebases: + show_params_rules: dict[str, Any] = { + 'limit': import_state.fwo_config.api_fetch_size, + 'use-object-dictionary': cp_const.use_object_dictionary, + 'details-level': 'standard', + 'package': policy_dict + } + FWOLogger.debug(f"Getting NAT rules for package: {policy_dict['name']}", 4) + nat_rules = cp_getter.get_nat_rules_from_api_as_dict( + policy_dict, + import_state.mgm_details.buildFwApiString(), sid, show_params_rules, + native_config_domain=native_config_domain + ) + if nat_rules["chunks"]: + native_config_domain['nat_rulebases'].append(nat_rules) + fetched_nat_rulebases.append(policy_dict["uid"]) + else: + fetched_but_empty_nat_rulebases.append(policy_dict["uid"]) + + if policy_dict["uid"] in fetched_nat_rulebases: + device_config['rulebase_links'].append({ + 'from_rulebase_uid': policy_dict["uid"], + 'from_rule_uid': None, + 'to_rulebase_uid': policy_dict["uid"] + "_nat", + 'type': 'nat', + 'is_global': False, + 'is_initial': False, + 'is_section': False + }) def add_ordered_layers_to_native_config(ordered_layer_uids: list[str], show_params_rules: dict[str, Any], @@ -434,12 +450,12 @@ def add_ordered_layers_to_native_config(ordered_layer_uids: list[str], show_para return policy_rulebases_uid_list -def get_ordered_layer_uids(policy_structure: list[dict[str, Any]], device_config: dict[str, Any], domain: str | None) -> tuple[list[str], str | None]: +def get_ordered_layer_uids(policy_structure: list[dict[str, Any]], device_config: dict[str, Any], domain: str | None) -> tuple[list[str], dict[str, Any]]: """Get UIDs of ordered layers for policy of device """ ordered_layer_uids: list[str] = [] - policy_name: str | None = None + policy_dict: dict[str, str] = {} failsafe_multiple_policies_per_device = False for policy in policy_structure: found_target_in_policy = False @@ -450,9 +466,9 @@ def get_ordered_layer_uids(policy_structure: list[dict[str, Any]], device_config failsafe_multiple_policies_per_device = True if found_target_in_policy: append_access_layer_uid(policy, domain, ordered_layer_uids) - policy_name = policy['name'] + policy_dict = policy - return ordered_layer_uids, policy_name + return ordered_layer_uids, policy_dict def check_if_multiple_policies_per_device(failsafe_multiple_policies_per_device: bool, device_config_uid: str): From b0684876fe2ce1463fb1fdb05047acf74a1114cf Mon Sep 17 00:00:00 2001 From: alf-cactus Date: Mon, 22 Dec 2025 15:06:03 +0100 Subject: [PATCH 03/10] global_policy_structure none error --- .../files/importer/fw_modules/checkpointR8x/fwcommon.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/roles/importer/files/importer/fw_modules/checkpointR8x/fwcommon.py b/roles/importer/files/importer/fw_modules/checkpointR8x/fwcommon.py index d6b36bd3c2..774b595241 100644 --- a/roles/importer/files/importer/fw_modules/checkpointR8x/fwcommon.py +++ b/roles/importer/files/importer/fw_modules/checkpointR8x/fwcommon.py @@ -278,10 +278,10 @@ def create_ordered_manager_list(import_state: ImportState) -> list[ManagementCon def handle_super_manager( manager_details: ManagementController, cp_manager_api_base_url: str, show_params_policy_structure: dict[str, Any] -) -> tuple[list[Any], None, Any | None, str]: +) -> tuple[list[Any], list[Any] | None, Any | None, str]: # global assignments are fetched from mds domain mds_sid: str = cp_getter.login(manager_details) - global_policy_structure = None + global_policy_structure: list[Any] | None = [] global_domain = None global_assignments = cp_getter.get_global_assignments( cp_manager_api_base_url, mds_sid, show_params_policy_structure @@ -306,6 +306,9 @@ def handle_super_manager( else: raise FwoImporterError(f"Unexpected global assignments: {global_assignments!s}") + if len(global_policy_structure) == 0: + global_policy_structure = None + return global_assignments, global_policy_structure, global_domain, global_sid From dcd7f68faefaca7edd58513503b2b1e1091ea4d0 Mon Sep 17 00:00:00 2001 From: alf-cactus Date: Tue, 23 Dec 2025 11:39:12 +0100 Subject: [PATCH 04/10] cp nat wip --- .../fw_modules/checkpointR8x/cp_rule.py | 11 ++-- .../fw_modules/checkpointR8x/fwcommon.py | 62 ++++++++++++++----- 2 files changed, 53 insertions(+), 20 deletions(-) diff --git a/roles/importer/files/importer/fw_modules/checkpointR8x/cp_rule.py b/roles/importer/files/importer/fw_modules/checkpointR8x/cp_rule.py index a0aa82a2d7..1ca167d80b 100644 --- a/roles/importer/files/importer/fw_modules/checkpointR8x/cp_rule.py +++ b/roles/importer/files/importer/fw_modules/checkpointR8x/cp_rule.py @@ -28,7 +28,7 @@ def normalize_rulebases( native_config_global: dict[str, Any] | None, import_state: ImportState, normalized_config_dict: dict[str, Any], - normalized_config_global: dict[str, Any] | None, + normalized_config_global: dict[str, Any], is_global_loop_iteration: bool, ): normalized_config_dict["policies"] = [] @@ -38,7 +38,7 @@ def normalize_rulebases( uid_to_name_map[nw_obj["obj_uid"]] = nw_obj["obj_name"] fetched_rulebase_uids: list[str] = [] - if normalized_config_global is not None and normalized_config_global != {}: + if normalized_config_global != {}: fetched_rulebase_uids.extend( [normalized_rulebase_global.uid for normalized_rulebase_global in normalized_config_global["policies"]] ) @@ -51,7 +51,7 @@ def normalize_rulebases( is_global_loop_iteration, import_state, normalized_config_dict, - normalized_config_global, # type: ignore # TODO: check if normalized_config_global can be None, I am pretty sure it cannot be None here # noqa: PGH003 + normalized_config_global, ) # TODO: parse nat rulebase here @@ -66,7 +66,7 @@ def normalize_rulebases_for_each_link_destination( normalized_config_global: dict[str, Any], ): for rulebase_link in gateway["rulebase_links"]: - if rulebase_link["to_rulebase_uid"] not in fetched_rulebase_uids and rulebase_link["to_rulebase_uid"] != "": + if rulebase_link["to_rulebase_uid"] not in fetched_rulebase_uids and rulebase_link["type"] != "nat": rulebase_to_parse, is_section, is_placeholder = find_rulebase_to_parse( native_config["rulebases"], rulebase_link["to_rulebase_uid"] ) @@ -91,6 +91,9 @@ def normalize_rulebases_for_each_link_destination( else: normalized_config_dict["policies"].append(normalized_rulebase) + elif rulebase_link["to_rulebase_uid"] not in fetched_rulebase_uids and rulebase_link["type"] == "nat": + pass + def find_rulebase_to_parse(rulebase_list: list[dict[str, Any]], rulebase_uid: str) -> tuple[dict[str, Any], bool, bool]: """ diff --git a/roles/importer/files/importer/fw_modules/checkpointR8x/fwcommon.py b/roles/importer/files/importer/fw_modules/checkpointR8x/fwcommon.py index 774b595241..014d2229cd 100644 --- a/roles/importer/files/importer/fw_modules/checkpointR8x/fwcommon.py +++ b/roles/importer/files/importer/fw_modules/checkpointR8x/fwcommon.py @@ -133,7 +133,7 @@ def normalize_config( # in case of mds, first nativ config domain is global is_global_loop_iteration = False - native_config_global: dict[str, Any] = {} + native_config_global: dict[str, Any] | None = None normalized_config_global = {} if config_in.native_config["domains"][0]["is-super-manager"]: native_config_global = config_in.native_config["domains"][0] @@ -189,7 +189,7 @@ def normalize_config( def normalize_single_manager_config( native_config: dict[str, Any], - native_config_global: dict[str, Any], + native_config_global: dict[str, Any] | None, normalized_config_dict: dict[str, Any], normalized_config_global: dict[str, Any], import_state: ImportState, @@ -519,12 +519,13 @@ def handle_nat_rules( fetched_nat_rulebases: list[str], fetched_but_empty_nat_rulebases: list[str], ): + """Get nat rulebases, name and uid get _nat prefix and link to access rulebase""" if policy_dict["uid"] not in fetched_nat_rulebases + fetched_but_empty_nat_rulebases: show_params_rules: dict[str, Any] = { "limit": import_state.fwo_config.api_fetch_size, "use-object-dictionary": cp_const.use_object_dictionary, "details-level": "standard", - "package": policy_dict, + "package": policy_dict["name"], } FWOLogger.debug(f"Getting NAT rules for package: {policy_dict['name']}", 4) nat_rules = cp_getter.get_nat_rules_from_api_as_dict( @@ -536,22 +537,51 @@ def handle_nat_rules( ) if nat_rules["chunks"]: native_config_domain["nat_rulebases"].append(nat_rules) - fetched_nat_rulebases.append(policy_dict["uid"]) + fetched_nat_rulebases.append(policy_dict["uid"]) # uid without _nat postfix else: - fetched_but_empty_nat_rulebases.append(policy_dict["uid"]) + fetched_but_empty_nat_rulebases.append(policy_dict["uid"]) # uid without _nat postfix if policy_dict["uid"] in fetched_nat_rulebases: - device_config["rulebase_links"].append( - { - "from_rulebase_uid": policy_dict["uid"], - "from_rule_uid": None, - "to_rulebase_uid": policy_dict["uid"] + "_nat", - "type": "nat", - "is_global": False, - "is_initial": False, - "is_section": False, - } - ) + link_nat_rulebase_sections(policy_dict["uid"], native_config_domain["nat_rulebases"], device_config) + + +def link_nat_rulebase_sections(policy_dict_uid: str, nat_rulebases: list[Any], device_config: dict[str, Any]): + current_nat_rulebase_uid = policy_dict_uid + "_nat" + device_config["rulebase_links"].append( + { + "from_rulebase_uid": policy_dict_uid, + "from_rule_uid": None, + "to_rulebase_uid": current_nat_rulebase_uid, + "type": "nat", + "is_global": False, + "is_initial": False, + "is_section": False, + } + ) + for nat_rulebase in nat_rulebases: + if current_nat_rulebase_uid == nat_rulebase["uid"]: + for nat_rulebase_chunk in nat_rulebase["chunks"]: + if "rulebase" in nat_rulebase_chunk: + define_nat_section_chain(current_nat_rulebase_uid, nat_rulebase_chunk, device_config) + + +def define_nat_section_chain( + current_nat_rulebase_uid: str, nat_rulebase_chunk: dict[str, Any], device_config: dict[str, Any] +): + for nat_section in nat_rulebase_chunk["rulebase"]: + if nat_section["type"] == "nat-section": + device_config["rulebase_links"].append( + { + "from_rulebase_uid": current_nat_rulebase_uid, + "from_rule_uid": None, + "to_rulebase_uid": nat_section["uid"], + "type": "concatenated", + "is_global": False, + "is_initial": False, + "is_section": True, + } + ) + current_nat_rulebase_uid = nat_section["uid"] def add_ordered_layers_to_native_config( From 6a5720ab2839207b6ed747780d202594d5a10ccc Mon Sep 17 00:00:00 2001 From: alf-cactus Date: Tue, 23 Dec 2025 15:31:40 +0100 Subject: [PATCH 05/10] nat wip --- .../importer/fw_modules/checkpointR8x/cp_rule.py | 12 +++++++----- .../importer/fw_modules/checkpointR8x/fwcommon.py | 14 ++++++++++---- 2 files changed, 17 insertions(+), 9 deletions(-) diff --git a/roles/importer/files/importer/fw_modules/checkpointR8x/cp_rule.py b/roles/importer/files/importer/fw_modules/checkpointR8x/cp_rule.py index 1ca167d80b..974a7caf5a 100644 --- a/roles/importer/files/importer/fw_modules/checkpointR8x/cp_rule.py +++ b/roles/importer/files/importer/fw_modules/checkpointR8x/cp_rule.py @@ -66,7 +66,7 @@ def normalize_rulebases_for_each_link_destination( normalized_config_global: dict[str, Any], ): for rulebase_link in gateway["rulebase_links"]: - if rulebase_link["to_rulebase_uid"] not in fetched_rulebase_uids and rulebase_link["type"] != "nat": + if rulebase_link["to_rulebase_uid"] not in fetched_rulebase_uids: rulebase_to_parse, is_section, is_placeholder = find_rulebase_to_parse( native_config["rulebases"], rulebase_link["to_rulebase_uid"] ) @@ -77,6 +77,11 @@ def normalize_rulebases_for_each_link_destination( native_config_global["rulebases"], rulebase_link["to_rulebase_uid"] ) found_rulebase_in_global = True + # search in nat rulebases + if rulebase_to_parse == {}: + rulebase_to_parse, is_section, is_placeholder = find_rulebase_to_parse( + native_config["nat_rulebases"], rulebase_link["to_rulebase_uid"] + ) if rulebase_to_parse == {}: FWOLogger.warning("found to_rulebase link without rulebase in nativeConfig: " + str(rulebase_link)) continue @@ -91,9 +96,6 @@ def normalize_rulebases_for_each_link_destination( else: normalized_config_dict["policies"].append(normalized_rulebase) - elif rulebase_link["to_rulebase_uid"] not in fetched_rulebase_uids and rulebase_link["type"] == "nat": - pass - def find_rulebase_to_parse(rulebase_list: list[dict[str, Any]], rulebase_uid: str) -> tuple[dict[str, Any], bool, bool]: """ @@ -169,7 +171,7 @@ def parse_rulebase( ): if is_section: for rule in rulebase_to_parse["rulebase"]: - # delte_v sind import_id, parent_uid, config2import wirklich egal? Dann können wir diese argumente löschen - NAT ACHTUNG + # delete_v ist parent_uid wirklich egal? Dann können wir dieses argument löschen - NAT ACHTUNG parse_single_rule(rule, normalized_rulebase, normalized_rulebase.uid, None, gateway, policy_structure) FWOLogger.debug("parsed rulebase " + normalized_rulebase.uid, 4) diff --git a/roles/importer/files/importer/fw_modules/checkpointR8x/fwcommon.py b/roles/importer/files/importer/fw_modules/checkpointR8x/fwcommon.py index 014d2229cd..bf555e8441 100644 --- a/roles/importer/files/importer/fw_modules/checkpointR8x/fwcommon.py +++ b/roles/importer/files/importer/fw_modules/checkpointR8x/fwcommon.py @@ -325,6 +325,8 @@ def process_devices( native_config_global_domain: dict[str, Any], import_state: ImportState, ) -> None: + fetched_nat_rulebases: list[str] = [] + fetched_but_empty_nat_rulebases: list[str] = [] for device in manager_details.devices: device_config = initialize_device_config(device) if not device_config: @@ -366,8 +368,6 @@ def process_devices( global_ordered_layer_count=global_ordered_layer_count, ) - fetched_nat_rulebases: list[str] = [] - fetched_but_empty_nat_rulebases: list[str] = [] handle_nat_rules( policy_dict, device_config, @@ -547,6 +547,7 @@ def handle_nat_rules( def link_nat_rulebase_sections(policy_dict_uid: str, nat_rulebases: list[Any], device_config: dict[str, Any]): current_nat_rulebase_uid = policy_dict_uid + "_nat" + # link nat rulebase to access rulebase device_config["rulebase_links"].append( { "from_rulebase_uid": policy_dict_uid, @@ -558,16 +559,20 @@ def link_nat_rulebase_sections(policy_dict_uid: str, nat_rulebases: list[Any], d "is_section": False, } ) + # link all nat sections concatenated for nat_rulebase in nat_rulebases: if current_nat_rulebase_uid == nat_rulebase["uid"]: for nat_rulebase_chunk in nat_rulebase["chunks"]: if "rulebase" in nat_rulebase_chunk: - define_nat_section_chain(current_nat_rulebase_uid, nat_rulebase_chunk, device_config) + current_nat_rulebase_uid = define_nat_section_chain( + current_nat_rulebase_uid, nat_rulebase_chunk, device_config + ) + break def define_nat_section_chain( current_nat_rulebase_uid: str, nat_rulebase_chunk: dict[str, Any], device_config: dict[str, Any] -): +) -> str: for nat_section in nat_rulebase_chunk["rulebase"]: if nat_section["type"] == "nat-section": device_config["rulebase_links"].append( @@ -582,6 +587,7 @@ def define_nat_section_chain( } ) current_nat_rulebase_uid = nat_section["uid"] + return current_nat_rulebase_uid def add_ordered_layers_to_native_config( From d3c5af1694487c23ba4e3d92cefa6e680bab81a2 Mon Sep 17 00:00:00 2001 From: alf-cactus Date: Mon, 29 Dec 2025 07:04:33 +0100 Subject: [PATCH 06/10] cp nat missing xlate specific changes --- .../fw_modules/checkpointR8x/cp_rule.py | 168 ++++++++++-------- 1 file changed, 97 insertions(+), 71 deletions(-) diff --git a/roles/importer/files/importer/fw_modules/checkpointR8x/cp_rule.py b/roles/importer/files/importer/fw_modules/checkpointR8x/cp_rule.py index 974a7caf5a..5a7c624e87 100644 --- a/roles/importer/files/importer/fw_modules/checkpointR8x/cp_rule.py +++ b/roles/importer/files/importer/fw_modules/checkpointR8x/cp_rule.py @@ -171,14 +171,11 @@ def parse_rulebase( ): if is_section: for rule in rulebase_to_parse["rulebase"]: - # delete_v ist parent_uid wirklich egal? Dann können wir dieses argument löschen - NAT ACHTUNG - parse_single_rule(rule, normalized_rulebase, normalized_rulebase.uid, None, gateway, policy_structure) + parse_single_rule(rule, normalized_rulebase, normalized_rulebase.uid, gateway, policy_structure) FWOLogger.debug("parsed rulebase " + normalized_rulebase.uid, 4) elif is_placeholder: - parse_single_rule( - rulebase_to_parse, normalized_rulebase, normalized_rulebase.uid, None, gateway, policy_structure - ) + parse_single_rule(rulebase_to_parse, normalized_rulebase, normalized_rulebase.uid, gateway, policy_structure) else: parse_rulebase_chunk(rulebase_to_parse, normalized_rulebase, gateway, policy_structure) @@ -192,7 +189,7 @@ def parse_rulebase_chunk( for chunk in rulebase_to_parse["chunks"]: for rule in chunk["rulebase"]: if "rule-number" in rule: - parse_single_rule(rule, normalized_rulebase, normalized_rulebase.uid, None, gateway, policy_structure) + parse_single_rule(rule, normalized_rulebase, normalized_rulebase.uid, gateway, policy_structure) else: FWOLogger.debug("found unparsable rulebase: " + str(rulebase_to_parse), 9) @@ -219,6 +216,7 @@ def accept_malformed_parts(objects: dict[str, Any] | list[dict[str, Any]], part: return {} +# delete_v: mit tim reden, das sieht alles legacy aus. kann chunking jemals in regel-dicts/lists auftreten, evtl bei kleinem limit? def parse_rule_part( objects: dict[str, Any] | list[dict[str, Any] | None] | None, part: str = "source" ) -> dict[str, Any]: @@ -302,7 +300,6 @@ def parse_single_rule( native_rule: dict[str, Any], rulebase: Rulebase, layer_name: str, - parent_uid: str | None, gateway: dict[str, Any], policy_structure: list[dict[str, Any]], ): @@ -311,99 +308,128 @@ def parse_single_rule( "type" in native_rule and native_rule["type"] != "place-holder" and "rule-number" in native_rule ): # standard rule, no section header return - # the following objects might come in chunks: - source_objects: dict[str, str] = parse_rule_part(native_rule["source"], "source") - rule_src_ref, rule_src_name = sort_and_join_refs(list(source_objects.items())) - - dst_objects: dict[str, str] = parse_rule_part(native_rule["destination"], "destination") - rule_dst_ref, rule_dst_name = sort_and_join_refs(list(dst_objects.items())) - svc_objects: dict[str, str] = parse_rule_part(native_rule["service"], "service") - rule_svc_ref, rule_svc_name = sort_and_join_refs(list(svc_objects.items())) + + # start with values for both access and nat rules + last_change_admin = native_rule.get("meta-info", {}).get("last-modifier", None) + rule_name = native_rule.get("name") rule_enforced_on_gateways = parse_rule_enforced_on_gateway(gateway, policy_structure, native_rule=native_rule) list_of_gw_uids = sorted({enforceEntry.dev_uid for enforceEntry in rule_enforced_on_gateways}) str_list_of_gw_uids = LIST_DELIMITER.join(list_of_gw_uids) if list_of_gw_uids else None - - rule_track = _parse_track(native_rule=native_rule) - - action_objects = parse_rule_part(native_rule["action"], "action") - if action_objects is not None: # type: ignore # TODO: this should be never None # noqa: PGH003 - rule_action = LIST_DELIMITER.join(action_objects.values()) # expecting only a single action - else: - rule_action = None - FWOLogger.warning("found rule without action: " + str(native_rule)) - - time_objects = parse_rule_part(native_rule["time"], "time") - rule_time = LIST_DELIMITER.join(time_objects.values()) if time_objects else None - - # starting with the non-chunk objects - rule_name = native_rule.get("name") - - # new in v8.0.3: - rule_custom_fields = native_rule.get("custom-fields") - - # we leave out all last_admin info for now - last_change_admin = None - - parent_rule_uid = _parse_parent_rule_uid(parent_uid, native_rule=native_rule) - - # new in v5.5.1: - rule_type = native_rule.get("rule_type", "access") - comments = native_rule.get("comments") if comments == "": comments = None - + rule_custom_fields = native_rule.get("custom-fields") if "hits" in native_rule and "last-date" in native_rule["hits"] and "iso-8601" in native_rule["hits"]["last-date"]: last_hit = native_rule["hits"]["last-date"]["iso-8601"] else: last_hit = None + # objects only used in access rules + if native_rule["type"] == "access-rule": + source_negate = bool(native_rule["source-negate"]) + destination_negate = bool(native_rule["destination-negate"]) + service_negate = bool(native_rule["service-negate"]) + source_objects: dict[str, str] = parse_rule_part(native_rule["source"], "source") + rule_src_ref, rule_src_name = sort_and_join_refs(list(source_objects.items())) + dst_objects: dict[str, str] = parse_rule_part(native_rule["destination"], "destination") + rule_dst_ref, rule_dst_name = sort_and_join_refs(list(dst_objects.items())) + svc_objects: dict[str, str] = parse_rule_part(native_rule["service"], "service") + rule_svc_ref, rule_svc_name = sort_and_join_refs(list(svc_objects.items())) + action_objects = parse_rule_part(native_rule["action"], "action") + rule_action = LIST_DELIMITER.join(action_objects.values()) + rule_track = _parse_track(native_rule) + time_objects = parse_rule_part(native_rule["time"], "time") + rule_time = LIST_DELIMITER.join(time_objects.values()) if time_objects else None + rule_type = "access" + # objects only used in nat rules + elif native_rule["type"] == "nat-rule": + source_negate = False + destination_negate = False + service_negate = False + source_objects: dict[str, str] = parse_rule_part(native_rule["original-source"], "original-source") + rule_src_ref, rule_src_name = sort_and_join_refs(list(source_objects.items())) + dst_objects: dict[str, str] = parse_rule_part(native_rule["original-destination"], "original-destination") + rule_dst_ref, rule_dst_name = sort_and_join_refs(list(dst_objects.items())) + svc_objects: dict[str, str] = parse_rule_part(native_rule["original-service"], "original-service") + rule_svc_ref, rule_svc_name = sort_and_join_refs(list(svc_objects.items())) + rule_action = None # delete_v nochmal prüfen + rule_track = None # delete_v ist not null in db + rule_time = None + rule_type = "nat" + else: + raise FwoImporterErrorInconsistenciesError( + "Rule " + native_rule["uid"] + " is neither access-rule nor nat-rule" + ) + rule: dict[str, Any] = { + "last_change_admin": sanitize(last_change_admin), + "rule_name": sanitize(rule_name), + "parent_rule_uid": None, "rule_num": 0, "rule_num_numeric": 0, - "rulebase_name": sanitize(layer_name), + "rule_uid": sanitize(native_rule["uid"]), "rule_disabled": not bool(native_rule["enabled"]), - "rule_src_neg": bool(native_rule["source-negate"]), + "rule_src_neg": source_negate, + "rule_dst_neg": destination_negate, + "rule_svc_neg": service_negate, "rule_src": sanitize(rule_src_name), - "rule_src_refs": sanitize(rule_src_ref), - "rule_dst_neg": bool(native_rule["destination-negate"]), "rule_dst": sanitize(rule_dst_name), - "rule_dst_refs": sanitize(rule_dst_ref), - "rule_svc_neg": bool(native_rule["service-negate"]), "rule_svc": sanitize(rule_svc_name), + "rule_src_refs": sanitize(rule_src_ref), + "rule_dst_refs": sanitize(rule_dst_ref), "rule_svc_refs": sanitize(rule_svc_ref), "rule_action": sanitize(rule_action, lower=True), "rule_track": sanitize(rule_track, lower=True), "rule_installon": sanitize(str_list_of_gw_uids), "rule_time": sanitize(rule_time), - "rule_name": sanitize(rule_name), - "rule_uid": sanitize(native_rule["uid"]), - "rule_custom_fields": sanitize(rule_custom_fields), + "rule_comment": sanitize(comments), "rule_implied": False, + "rule_custom_fields": sanitize(rule_custom_fields), "rule_type": sanitize(rule_type), - "last_change_admin": sanitize(last_change_admin), - "parent_rule_uid": sanitize(parent_rule_uid), + "rulebase_name": sanitize(layer_name), "last_hit": sanitize(last_hit), } - if comments is not None: - rule["rule_comment"] = sanitize(comments) - rulebase.rules.update({rule["rule_uid"]: RuleNormalized(**rule)}) - -def _parse_parent_rule_uid(parent_uid: str | None, native_rule: dict[str, Any]) -> str | None: - # new in v5.1.17: - if "parent_rule_uid" in native_rule: - FWOLogger.debug( - "found rule (uid=" + native_rule["uid"] + ") with parent_rule_uid set: " + native_rule["parent_rule_uid"] - ) - parent_rule_uid = native_rule["parent_rule_uid"] - else: - parent_rule_uid = parent_uid + if native_rule["type"] == "nat-rule": + rule.update({"xlate_rule": "delete_v new xlate_rule uid"}) + source_objects: dict[str, str] = parse_rule_part(native_rule["translated-source"], "translated-source") + rule_src_ref, rule_src_name = sort_and_join_refs(list(source_objects.items())) + dst_objects: dict[str, str] = parse_rule_part(native_rule["translated-destination"], "translated-destination") + rule_dst_ref, rule_dst_name = sort_and_join_refs(list(dst_objects.items())) + svc_objects: dict[str, str] = parse_rule_part(native_rule["translated-service"], "translated-service") + rule_svc_ref, rule_svc_name = sort_and_join_refs(list(svc_objects.items())) + + xlate_rule: dict[str, Any] = { + "last_change_admin": sanitize(last_change_admin), + "rule_name": sanitize(rule_name), # delete_v + "parent_rule_uid": None, + "rule_num": 0, + "rule_num_numeric": 0, + "rule_uid": sanitize(native_rule["uid"]), # delete_v + "rule_disabled": not bool(native_rule["enabled"]), + "rule_src_neg": source_negate, + "rule_dst_neg": destination_negate, + "rule_svc_neg": service_negate, + "rule_src": sanitize(rule_src_name), + "rule_dst": sanitize(rule_dst_name), + "rule_svc": sanitize(rule_svc_name), + "rule_src_refs": sanitize(rule_src_ref), + "rule_dst_refs": sanitize(rule_dst_ref), + "rule_svc_refs": sanitize(rule_svc_ref), + "rule_action": sanitize(rule_action, lower=True), + "rule_track": sanitize(rule_track, lower=True), + "rule_installon": sanitize(str_list_of_gw_uids), + "rule_time": sanitize(rule_time), + "rule_comment": sanitize(comments), + "rule_implied": False, + "rule_custom_fields": sanitize(rule_custom_fields), + "rule_type": sanitize(rule_type), + "rulebase_name": sanitize(layer_name), + "last_hit": sanitize(last_hit), + } + rulebase.rules.update({rule["rule_uid"]: RuleNormalized(**xlate_rule)}) - if parent_rule_uid == "": - parent_rule_uid = None - - return parent_rule_uid + rulebase.rules.update({rule["rule_uid"]: RuleNormalized(**rule)}) def _parse_track(native_rule: dict[str, Any]) -> str: From 2edeb070efa3addf2bea6c12d64706e3dc4c804a Mon Sep 17 00:00:00 2001 From: alf-cactus Date: Wed, 31 Dec 2025 13:29:24 +0100 Subject: [PATCH 07/10] documentation and placeholder rule --- documentation/rework-2025/rulebase-links.md | 80 +++++++++++++++++++ .../files/sql/creation/fworch-fill-stm.sql | 1 + roles/database/files/upgrade/9.0.sql | 1 + .../fw_modules/checkpointR8x/cp_rule.py | 10 +-- .../fw_modules/checkpointR8x/fwcommon.py | 3 +- 5 files changed, 88 insertions(+), 7 deletions(-) create mode 100644 documentation/rework-2025/rulebase-links.md diff --git a/documentation/rework-2025/rulebase-links.md b/documentation/rework-2025/rulebase-links.md new file mode 100644 index 0000000000..592a7840c7 --- /dev/null +++ b/documentation/rework-2025/rulebase-links.md @@ -0,0 +1,80 @@ +# Rulebase Links + +## Concept + +- each firewall device defines one directed tree for its rulebases +- rulebases are the tree verticies and rulebase links are tree edges +- rulebases contain a (possibly empty) list of rules and a uid/id +- in context of database we use ids, in importer context uids, here we default to ids +- each rulebase tree is rooted, this is defined with the initial rulebase link +- rulebase links have many fields, each is explained in this document + +## Rulebase Link Fields + +### From and To Rulebase Id + +- **from_rulebase_id** and **to_rulebase_id** constitute the directed tree edges +- both fields are not NULL, with one exception, from_rulebase_id is empty for the initial rulebase link +- sometimes rulebases need to be linked from a specific rule (e.g. inline-layer, domain) +- this is done with **from_rule_id** +- from _rule_id is a rule id that is in the from_rulebase_id + +### Boolean Flags + +- **is_initial** is true, if to_rulebase_id is the first rulebase for the device +- **is_global** is true, if to_rulebase_id belongs to the super manager +- **is_section** is true, if to_rulebase_id gets a section header in reporting +- sections get a section header in reporting and are collapsible + +### Types + +Each link has a **type** that explains +- how the rulebase tree is traversed by incoming traffic requests +- how the rules report should look like +- how rule numbers are created + +#### Ordered + +- high level rulebase concept +- ordered rulebases are a chain +- each ordered rulebase needs one accept rule to allow traffic request +- first ordered rulebase gets number 1 and so on +- rules are numbered in dot notation, 2.13 is the 13th rule in the 2nd ordered rulebase +- if only one (initial) ordered rulebase exists the number 1. is ommited for all rules +- each ordered rulebase gets a header that is collapsible in reporting + +#### Domain + +- from_rulebase_id is a global rulebase, to_rulebase_id is a local rulebase +- from_rule_id is defined and named placeholder rule +- traffic request traverses from_rulebase_id up to and including placeholder rule +- then domain rules are traversed and after that the remaining global rules +- if traffic is droped in domain rules, it is droped overall +- numbering is in dot notation, each domain rule gets the number of placeholder rule as prefix +- 2.3.38 is the 38th rule in the local rulebase, placeholder rule is the 3rd global rule in the 2nd global ordered rulebase +- in reporting to_rulebase_id is collapsible with a the placeholder rule as banner + +#### Concatenated + +- from_rulebase_id and to_rulebase_id are the same rulebase in context of a traffic request +- first from_rulebase_id is traversed for an accept rule and if none is found, then to_rulebase_id +- rule numbers are continued, if rulebase 1 ends with rule 16, then rulebase 2 starts with rule 17 +- concatenated rulebases are not collapsible in reporting by default, but could be if is_section is true + +#### Inline + +- from_rulebase_id is interrupted by to_rulebase_id +- from_rule_id is defined and named layer guard +- if layer guard is reached by traffic traversal and allows traffic, then all to_rulebase_id are traversed +- if to_rulebase_id allows traffic it is allowed overall, else from_rulebase_id keeps getting traversed after layer guard +- inline rulebases may be nested +- numbering is in dot notation, each inline rule gets the number of from_rule_id as prefix +- 2.3.18 is the 18th rule in the nested inline rulebase, its layerguard is the 3rd rule in the inline rulebase with layer guard 2 in the ordered layer +- in reporting to_rulebase_id is collapsible with a the layer guard rule as banner + +#### NAT + +- each ordered from_rulebase_id may have a nat to_rulebase_id +- nat rulebases do not contain access rules but nat rules +- nat rules are not important for numbering or traffic traversal +- nat rules get their own report diff --git a/roles/database/files/sql/creation/fworch-fill-stm.sql b/roles/database/files/sql/creation/fworch-fill-stm.sql index 6c121ac3f4..6186e4b43b 100644 --- a/roles/database/files/sql/creation/fworch-fill-stm.sql +++ b/roles/database/files/sql/creation/fworch-fill-stm.sql @@ -543,6 +543,7 @@ insert into stm_link_type (id, name) VALUES (2, 'ordered'); insert into stm_link_type (id, name) VALUES (3, 'inline'); insert into stm_link_type (id, name) VALUES (4, 'concatenated'); insert into stm_link_type (id, name) VALUES (5, 'domain'); +insert into stm_link_type (id, name) VALUES (6, 'nat'); -- insert into compliance.assessability_issue_type (type_id, type_name) VALUES (1, 'empty group'); -- insert into compliance.assessability_issue_type (type_id, type_name) VALUES (2, 'broadcast address'); diff --git a/roles/database/files/upgrade/9.0.sql b/roles/database/files/upgrade/9.0.sql index 4a65dff012..d72944b3e8 100644 --- a/roles/database/files/upgrade/9.0.sql +++ b/roles/database/files/upgrade/9.0.sql @@ -789,6 +789,7 @@ insert into stm_link_type (id, name) VALUES (2, 'ordered') ON CONFLICT DO NOTHIN insert into stm_link_type (id, name) VALUES (3, 'inline') ON CONFLICT DO NOTHING; insert into stm_link_type (id, name) VALUES (4, 'concatenated') ON CONFLICT DO NOTHING; insert into stm_link_type (id, name) VALUES (5, 'domain') ON CONFLICT DO NOTHING; +insert into stm_link_type (id, name) VALUES (6, 'nat') ON CONFLICT DO NOTHING; delete from stm_link_type where name in ('initial','global','local','section'); -- initial and global/local are additional flags now -- TODO delete all rule.parent_rule_id and rule.parent_rule_type, always = None so far diff --git a/roles/importer/files/importer/fw_modules/checkpointR8x/cp_rule.py b/roles/importer/files/importer/fw_modules/checkpointR8x/cp_rule.py index 5a7c624e87..7b025ca5b5 100644 --- a/roles/importer/files/importer/fw_modules/checkpointR8x/cp_rule.py +++ b/roles/importer/files/importer/fw_modules/checkpointR8x/cp_rule.py @@ -352,8 +352,8 @@ def parse_single_rule( rule_dst_ref, rule_dst_name = sort_and_join_refs(list(dst_objects.items())) svc_objects: dict[str, str] = parse_rule_part(native_rule["original-service"], "original-service") rule_svc_ref, rule_svc_name = sort_and_join_refs(list(svc_objects.items())) - rule_action = None # delete_v nochmal prüfen - rule_track = None # delete_v ist not null in db + rule_action = "drop" + rule_track = "none" rule_time = None rule_type = "nat" else: @@ -391,7 +391,7 @@ def parse_single_rule( } if native_rule["type"] == "nat-rule": - rule.update({"xlate_rule": "delete_v new xlate_rule uid"}) + rule.update({"xlate_rule": sanitize(native_rule["uid"] + "_translated")}) source_objects: dict[str, str] = parse_rule_part(native_rule["translated-source"], "translated-source") rule_src_ref, rule_src_name = sort_and_join_refs(list(source_objects.items())) dst_objects: dict[str, str] = parse_rule_part(native_rule["translated-destination"], "translated-destination") @@ -401,11 +401,11 @@ def parse_single_rule( xlate_rule: dict[str, Any] = { "last_change_admin": sanitize(last_change_admin), - "rule_name": sanitize(rule_name), # delete_v + "rule_name": sanitize(rule_name), "parent_rule_uid": None, "rule_num": 0, "rule_num_numeric": 0, - "rule_uid": sanitize(native_rule["uid"]), # delete_v + "rule_uid": sanitize(native_rule["uid"] + "_translated"), "rule_disabled": not bool(native_rule["enabled"]), "rule_src_neg": source_negate, "rule_dst_neg": destination_negate, diff --git a/roles/importer/files/importer/fw_modules/checkpointR8x/fwcommon.py b/roles/importer/files/importer/fw_modules/checkpointR8x/fwcommon.py index bf555e8441..a96257332a 100644 --- a/roles/importer/files/importer/fw_modules/checkpointR8x/fwcommon.py +++ b/roles/importer/files/importer/fw_modules/checkpointR8x/fwcommon.py @@ -461,7 +461,6 @@ def define_global_rulebase_link( # parse global rulebases, find place-holders and link local rulebases placeholder_link_index = 0 for global_rulebase_uid in global_policy_rulebases_uid_list: - placeholder_rule_uid = "" for rulebase in native_config_global_domain["rulebases"]: if rulebase["uid"] == global_rulebase_uid: placeholder_rule_uid, placeholder_rulebase_uid = cp_getter.get_placeholder_in_rulebase(rulebase) @@ -475,7 +474,7 @@ def define_global_rulebase_link( device_config["rulebase_links"].append( { "from_rulebase_uid": placeholder_rulebase_uid, - "from_rule_uid": None, + "from_rule_uid": placeholder_rule_uid, "to_rulebase_uid": ordered_layer_uid, "type": "domain", "is_global": False, From f1a5b2a1173e24ef5c4631e377f9692b02a0b02f Mon Sep 17 00:00:00 2001 From: alf-cactus Date: Wed, 31 Dec 2025 13:39:32 +0100 Subject: [PATCH 08/10] code comments --- .../fw_modules/checkpointR8x/cp_rule.py | 27 +------------------ .../fw_modules/fortiadom5ff/fmgr_rule.py | 4 +-- .../fw_modules/fortiadom5ff/fwcommon.py | 2 +- 3 files changed, 4 insertions(+), 29 deletions(-) diff --git a/roles/importer/files/importer/fw_modules/checkpointR8x/cp_rule.py b/roles/importer/files/importer/fw_modules/checkpointR8x/cp_rule.py index 7b025ca5b5..d941f58cbe 100644 --- a/roles/importer/files/importer/fw_modules/checkpointR8x/cp_rule.py +++ b/roles/importer/files/importer/fw_modules/checkpointR8x/cp_rule.py @@ -216,7 +216,7 @@ def accept_malformed_parts(objects: dict[str, Any] | list[dict[str, Any]], part: return {} -# delete_v: mit tim reden, das sieht alles legacy aus. kann chunking jemals in regel-dicts/lists auftreten, evtl bei kleinem limit? +# TODO: chunking in object dicts should not be neccessary if limit is high enough def parse_rule_part( objects: dict[str, Any] | list[dict[str, Any] | None] | None, part: str = "source" ) -> dict[str, Any]: @@ -492,31 +492,6 @@ def resolve_nwobj_uid_to_name(nw_obj_uid: str) -> str: return "" -# delete_v: left here only for nat case -def check_and_add_section_header( - src_rulebase: dict[str, Any], - target_rulebase: Rulebase, - layer_name: str, - import_id: str, - section_header_uids: set[str], -): - # TODO: re-implement - raise NotImplementedError("check_and_add_section_header is not implemented yet.") - - -def insert_section_header_rule( - _target_rulebase: Rulebase, - _section_name: str, - _layer_name: str, - _import_id: str, - _src_rulebase_uid: str, - _section_header_uids: set[str], - _parent_uid: str, -): - # TODO: re-implement - return - - def ensure_json(raw: str) -> Any: """ Tries to parse the given string as valid JSON. diff --git a/roles/importer/files/importer/fw_modules/fortiadom5ff/fmgr_rule.py b/roles/importer/files/importer/fw_modules/fortiadom5ff/fmgr_rule.py index 73fe4b3fe2..d144f04b74 100644 --- a/roles/importer/files/importer/fw_modules/fortiadom5ff/fmgr_rule.py +++ b/roles/importer/files/importer/fw_modules/fortiadom5ff/fmgr_rule.py @@ -537,7 +537,7 @@ def get_and_link_global_rulebase( if not is_rulebase_already_fetched( native_config_global["rulebases"], rulebase_type_prefix + "_v6_" + global_pkg_name ): - # delete_v: hier auch options=options? + # TODO: should try options=options here fmgr_getter.update_config_with_fortinet_api_call( native_config_global["rulebases"], sid, @@ -715,7 +715,7 @@ def get_nat_policy( ) -# delete_v: ab hier kann sehr viel weg, ich lasses vorerst zB für die nat +# TODO: delte all unused code here after nat is working again # pure nat rules diff --git a/roles/importer/files/importer/fw_modules/fortiadom5ff/fwcommon.py b/roles/importer/files/importer/fw_modules/fortiadom5ff/fwcommon.py index cc3fbe1c59..8d7f8d514e 100644 --- a/roles/importer/files/importer/fw_modules/fortiadom5ff/fwcommon.py +++ b/roles/importer/files/importer/fw_modules/fortiadom5ff/fwcommon.py @@ -45,7 +45,7 @@ def get_config( config_in.native_config["domains"].append(native_config_global) # type: ignore #TYPING: None or not None this is the question # noqa: PGH003 adom_list = build_adom_list(import_state.state) adom_device_vdom_structure = build_adom_device_vdom_structure(adom_list, sid, fm_api_url) - # delete_v: das geht schief für unschöne adoms + # TODO: get_arbitrary_vdom may fail for pre-defined forti managers arbitrary_vdom_for_updateable_objects = get_arbitrary_vdom(adom_device_vdom_structure) adom_device_vdom_policy_package_structure = add_policy_package_to_vdoms( adom_device_vdom_structure, sid, fm_api_url From 00dd9807d866f56dcb7cedd403047faec8edd70b Mon Sep 17 00:00:00 2001 From: alf-cactus Date: Wed, 31 Dec 2025 13:53:06 +0100 Subject: [PATCH 09/10] refactor --- .../fw_modules/checkpointR8x/cp_rule.py | 19 +++++++++++++++---- 1 file changed, 15 insertions(+), 4 deletions(-) diff --git a/roles/importer/files/importer/fw_modules/checkpointR8x/cp_rule.py b/roles/importer/files/importer/fw_modules/checkpointR8x/cp_rule.py index d941f58cbe..6c1e644f9d 100644 --- a/roles/importer/files/importer/fw_modules/checkpointR8x/cp_rule.py +++ b/roles/importer/files/importer/fw_modules/checkpointR8x/cp_rule.py @@ -91,10 +91,21 @@ def normalize_rulebases_for_each_link_destination( ) fetched_rulebase_uids.append(rulebase_link["to_rulebase_uid"]) - if found_rulebase_in_global: - normalized_config_global["policies"].append(normalized_rulebase) - else: - normalized_config_dict["policies"].append(normalized_rulebase) + append_normalized_rulebase( + normalized_config_dict, normalized_config_global, normalized_rulebase, found_rulebase_in_global + ) + + +def append_normalized_rulebase( + normalized_config_dict: dict[str, Any], + normalized_config_global: dict[str, Any], + normalized_rulebase: Rulebase, + found_rulebase_in_global: bool, +): + if found_rulebase_in_global: + normalized_config_global["policies"].append(normalized_rulebase) + else: + normalized_config_dict["policies"].append(normalized_rulebase) def find_rulebase_to_parse(rulebase_list: list[dict[str, Any]], rulebase_uid: str) -> tuple[dict[str, Any], bool, bool]: From 05f6bb5d8e69ee9d654004b3e9122c0fe6f18407 Mon Sep 17 00:00:00 2001 From: alf-cactus Date: Wed, 31 Dec 2025 14:24:29 +0100 Subject: [PATCH 10/10] refactor --- .../fw_modules/checkpointR8x/cp_rule.py | 44 +++++++------------ 1 file changed, 16 insertions(+), 28 deletions(-) diff --git a/roles/importer/files/importer/fw_modules/checkpointR8x/cp_rule.py b/roles/importer/files/importer/fw_modules/checkpointR8x/cp_rule.py index 6c1e644f9d..e1b75f20f8 100644 --- a/roles/importer/files/importer/fw_modules/checkpointR8x/cp_rule.py +++ b/roles/importer/files/importer/fw_modules/checkpointR8x/cp_rule.py @@ -1,5 +1,6 @@ import ast import json +from copy import deepcopy from typing import Any from fwo_base import sanitize, sort_and_join_refs @@ -410,34 +411,21 @@ def parse_single_rule( svc_objects: dict[str, str] = parse_rule_part(native_rule["translated-service"], "translated-service") rule_svc_ref, rule_svc_name = sort_and_join_refs(list(svc_objects.items())) - xlate_rule: dict[str, Any] = { - "last_change_admin": sanitize(last_change_admin), - "rule_name": sanitize(rule_name), - "parent_rule_uid": None, - "rule_num": 0, - "rule_num_numeric": 0, - "rule_uid": sanitize(native_rule["uid"] + "_translated"), - "rule_disabled": not bool(native_rule["enabled"]), - "rule_src_neg": source_negate, - "rule_dst_neg": destination_negate, - "rule_svc_neg": service_negate, - "rule_src": sanitize(rule_src_name), - "rule_dst": sanitize(rule_dst_name), - "rule_svc": sanitize(rule_svc_name), - "rule_src_refs": sanitize(rule_src_ref), - "rule_dst_refs": sanitize(rule_dst_ref), - "rule_svc_refs": sanitize(rule_svc_ref), - "rule_action": sanitize(rule_action, lower=True), - "rule_track": sanitize(rule_track, lower=True), - "rule_installon": sanitize(str_list_of_gw_uids), - "rule_time": sanitize(rule_time), - "rule_comment": sanitize(comments), - "rule_implied": False, - "rule_custom_fields": sanitize(rule_custom_fields), - "rule_type": sanitize(rule_type), - "rulebase_name": sanitize(layer_name), - "last_hit": sanitize(last_hit), - } + xlate_rule: dict[str, Any] = deepcopy(rule) + xlate_rule.update( + { + "rule_uid": sanitize(native_rule["uid"] + "_translated"), + "rule_src_neg": source_negate, + "rule_dst_neg": destination_negate, + "rule_svc_neg": service_negate, + "rule_src": sanitize(rule_src_name), + "rule_dst": sanitize(rule_dst_name), + "rule_svc": sanitize(rule_svc_name), + "rule_src_refs": sanitize(rule_src_ref), + "rule_dst_refs": sanitize(rule_dst_ref), + "rule_svc_refs": sanitize(rule_svc_ref), + } + ) rulebase.rules.update({rule["rule_uid"]: RuleNormalized(**xlate_rule)}) rulebase.rules.update({rule["rule_uid"]: RuleNormalized(**rule)})