diff --git a/.agents b/.agents index 74356aac20..c677fd615f 160000 --- a/.agents +++ b/.agents @@ -1 +1 @@ -Subproject commit 74356aac20d8da2ab73f49f1a67a4dbfc82fb435 +Subproject commit c677fd615fbcc4cef329ef12e4e9ad7eea4cff3d diff --git a/inventory/group_vars/all.yml b/inventory/group_vars/all.yml index ddb105e237..6279cd945d 100644 --- a/inventory/group_vars/all.yml +++ b/inventory/group_vars/all.yml @@ -1,5 +1,5 @@ ### general settings -product_version: "9.1.11" +product_version: "9.1.12" ansible_user: "{{ lookup('env', 'USER') }}" ansible_become_method: sudo ansible_python_interpreter: /usr/bin/python3 diff --git a/roles/common/files/fwo-api-calls/report/getManagementForLatestNormalizedConfig.graphql b/roles/common/files/fwo-api-calls/report/getManagementForLatestNormalizedConfig.graphql index a44ddadeee..6044c7e273 100644 --- a/roles/common/files/fwo-api-calls/report/getManagementForLatestNormalizedConfig.graphql +++ b/roles/common/files/fwo-api-calls/report/getManagementForLatestNormalizedConfig.graphql @@ -123,6 +123,8 @@ fragment ruleFragment on rule { rule_custom_fields rule_implied nat_rule + access_rule + xlate_rule uiuser { uiuser_username } @@ -133,6 +135,9 @@ fragment ruleFragment on rule { rule_last_hit } rule_comment + ruleByXlateRule { + rule_uid + } section_header: rule_head_text } diff --git a/roles/common/files/fwo-api-calls/report/getManagementForNormalizedConfig.graphql b/roles/common/files/fwo-api-calls/report/getManagementForNormalizedConfig.graphql index b0ad7066cc..ec4eee9aa7 100644 --- a/roles/common/files/fwo-api-calls/report/getManagementForNormalizedConfig.graphql +++ b/roles/common/files/fwo-api-calls/report/getManagementForNormalizedConfig.graphql @@ -178,6 +178,8 @@ fragment ruleFragment on rule { rule_custom_fields rule_implied nat_rule + access_rule + xlate_rule uiuser { uiuser_username } @@ -188,6 +190,9 @@ fragment ruleFragment on rule { rule_last_hit } rule_comment + ruleByXlateRule { + rule_uid + } section_header: rule_head_text } diff --git a/roles/database/files/sql/creation/fworch-fill-stm.sql b/roles/database/files/sql/creation/fworch-fill-stm.sql index 460049b778..bf8b5939fb 100644 --- a/roles/database/files/sql/creation/fworch-fill-stm.sql +++ b/roles/database/files/sql/creation/fworch-fill-stm.sql @@ -580,6 +580,7 @@ insert into stm_link_type (id, name) VALUES (2, 'ordered'); insert into stm_link_type (id, name) VALUES (3, 'inline'); insert into stm_link_type (id, name) VALUES (4, 'concatenated'); insert into stm_link_type (id, name) VALUES (5, 'domain'); +insert into stm_link_type (id, name) VALUES (6, 'nat'); -- insert into compliance.assessability_issue_type (type_id, type_name) VALUES (1, 'empty group'); -- insert into compliance.assessability_issue_type (type_id, type_name) VALUES (2, 'broadcast address'); diff --git a/roles/database/files/upgrade/9.1.12.sql b/roles/database/files/upgrade/9.1.12.sql new file mode 100644 index 0000000000..b89901f26a --- /dev/null +++ b/roles/database/files/upgrade/9.1.12.sql @@ -0,0 +1 @@ +insert into stm_link_type (id, name) VALUES (6, 'nat') ON CONFLICT DO NOTHING; \ No newline at end of file diff --git a/roles/importer/files/importer/fw_modules/checkpointR8x/cp_getter.py b/roles/importer/files/importer/fw_modules/checkpointR8x/cp_getter.py index 2d12b0c6bc..ba10b457e5 100644 --- a/roles/importer/files/importer/fw_modules/checkpointR8x/cp_getter.py +++ b/roles/importer/files/importer/fw_modules/checkpointR8x/cp_getter.py @@ -348,6 +348,7 @@ def get_rulebases( native_config_domain: dict[str, Any] | None, device_config: dict[str, Any] | None, policy_rulebases_uid_list: list[str], + policy_structure: dict[str, Any], is_global: bool = False, access_type: str = "access", rulebase_uid: str | None = None, @@ -407,6 +408,7 @@ def get_rulebases( show_params_rules, is_global, policy_rulebases_uid_list, + policy_structure=policy_structure, ) @@ -530,6 +532,7 @@ def get_inline_layers_recursively( show_params_rules: dict[str, Any], is_global: bool, policy_rulebases_uid_list: list[str], + policy_structure: dict[str, Any], ) -> list[str]: """ Takes current_rulebase, splits sections into sub-rulebases and searches for layerguards to fetch @@ -569,6 +572,7 @@ def get_inline_layers_recursively( is_global=is_global, access_type="access", rulebase_uid=rule["inline-layer"], + policy_structure=policy_structure, ) return policy_rulebases_uid_list @@ -978,3 +982,30 @@ def get_object_details_from_api(uid_missing_obj: str, sid: str = "", apiurl: str return obj FWOLogger.warning(f"missing nw obj of unexpected type '{obj_type}': {uid_missing_obj}") return {} + + +def get_gateways_and_servers(sid: str = "", apiurl: str = "") -> list[dict[str, Any]]: + """Fetch gateways and servers from the API.""" + current = 0 + total = 1 + + gateways_and_servers: list[dict[str, Any]] = [] + + while current < total: + try: + result = cp_api_call(apiurl, "show-gateways-and-servers", {"details-level": "full", "offset": current}, sid) + except Exception as e: + raise FwoImporterError(f"error while trying to get gateways and servers: {e}") + + if result is None or "objects" not in result: + raise FwoImporterError("no objects received while trying to get gateways and servers") + + gateways_and_servers.extend(result["objects"]) + + if "total" not in result or "to" not in result: + raise FwoImporterError("result does not contain total or to field while trying to get gateways and servers") + + total = result["total"] + current = result["to"] + + return gateways_and_servers diff --git a/roles/importer/files/importer/fw_modules/checkpointR8x/cp_nat.py b/roles/importer/files/importer/fw_modules/checkpointR8x/cp_nat.py new file mode 100644 index 0000000000..4dc3461165 --- /dev/null +++ b/roles/importer/files/importer/fw_modules/checkpointR8x/cp_nat.py @@ -0,0 +1,264 @@ +from typing import Any + +from fw_modules.checkpointR8x.cp_rule import parse_single_rule +from fwo_log import FWOLogger +from models.import_state import ImportState +from models.rulebase import Rulebase + + +def normalize_nat_rules( + native_config: dict[str, Any], + import_state: ImportState, + normalized_config: dict[str, Any], +): + native_nat_rulebases = native_config.get("nat_rulebases", []) + if not native_nat_rulebases: + return + + for gateway in native_config["gateways"]: + parse_native_nat_rulebases(gateway, native_nat_rulebases, import_state, normalized_config, native_config) + + +def get_initial_nat_rulebase_link(gateway: dict[str, Any], normalized_config: dict[str, Any]) -> dict[str, Any] | None: + normalized_gateway = next((gw for gw in normalized_config["gateways"] if gw["Uid"] == gateway["uid"]), None) + + if normalized_gateway is None: + FWOLogger.warning("Could not find normalized gateway for initial NAT rulebase link: " + str(gateway["uid"])) + return None + + initial_gateway_link = next( + ( + link + for link in normalized_gateway["RulebaseLinks"] + if link.get("is_initial") and link.get("link_type") == "ordered" + ), + None, + ) + + if initial_gateway_link is None: + FWOLogger.warning("Could not find initial gateway rulebase link for NAT rulebase: " + str(gateway["uid"])) + return None + + return initial_gateway_link + + +def parse_native_nat_rulebases( + gateway: dict[str, Any], + native_nat_rulebases: list[dict[str, Any]], + import_state: ImportState, + normalized_config: dict[str, Any], + native_config: dict[str, Any], +): + for nat_rulebase in native_nat_rulebases: + if "nat_rule_chunks" not in nat_rulebase: + continue + + normalized_nat_rulebase = insert_parent_nat_rulebase(gateway, import_state, normalized_config) + normalized_gateway = next((gw for gw in normalized_config["gateways"] if gw["Uid"] == gateway["uid"]), None) + + if normalized_gateway is None: + FWOLogger.warning("Could not find normalized gateway for NAT rulebase, skipping: " + str(gateway["uid"])) + continue + + initial_gateway_link = get_initial_nat_rulebase_link(gateway, normalized_config) + + if initial_gateway_link is None: + continue + + initial_to_rulebase_uid = initial_gateway_link.get("to_rulebase_uid") + if not initial_to_rulebase_uid: + FWOLogger.warning( + "Initial gateway rulebase link is missing to_rulebase_uid for NAT rulebase, skipping: " + + str(gateway["uid"]) + ) + continue + + insert_rulebase_link( + from_rulebase_uid=initial_to_rulebase_uid, + to_rulebase_uid=normalized_nat_rulebase.uid, + link_type="nat", + normalized_gateway=normalized_gateway, + ) + + for chunk in nat_rulebase["nat_rule_chunks"]: + parse_nat_rule_chunk( + chunk, + normalized_nat_rulebase, + gateway, + native_config, + import_state, + normalized_config, + normalized_gateway, + ) + + +def insert_parent_nat_rulebase( + gateway: dict[str, Any], + import_state: ImportState, + normalized_config: dict[str, Any], +) -> Rulebase: + nat_rulebase_uid = "nat-rulebase-" + gateway["uid"] + existing_nat_rulebase = next((rb for rb in normalized_config["policies"] if rb.uid == nat_rulebase_uid), None) + + if existing_nat_rulebase is not None: + return existing_nat_rulebase + + normalized_nat_rulebase = Rulebase( + uid=nat_rulebase_uid, + mgm_uid=import_state.mgm_details.uid, + name="NAT", + rules={}, + ) + + normalized_config["policies"].append(normalized_nat_rulebase) + + return normalized_nat_rulebase + + +def insert_rulebase_link( + from_rulebase_uid: str, + to_rulebase_uid: str, + link_type: str, + normalized_gateway: dict[str, Any], +) -> None: + if not any( + link + for link in normalized_gateway["RulebaseLinks"] + if link["to_rulebase_uid"] == to_rulebase_uid + and link["link_type"] == link_type + and link["from_rulebase_uid"] == from_rulebase_uid + ): + normalized_gateway["RulebaseLinks"].append( + { + "from_rulebase_uid": from_rulebase_uid, + "to_rulebase_uid": to_rulebase_uid, + "link_type": link_type, + "is_initial": False, + "is_global": False, + "is_section": False, + } + ) + + +def parse_nat_rulebase( + src_rulebase: dict[str, Any], + normalized_nat_rulebase: Rulebase, + gateway: dict[str, Any], + native_config: dict[str, Any], + import_state: ImportState, + normalized_config: dict[str, Any], + normalized_gateway: dict[str, Any], +): + section_rulebase = Rulebase( + uid=src_rulebase["uid"], + mgm_uid=import_state.mgm_details.uid, + name=src_rulebase["name"], + rules={}, + ) + + if not any(rb for rb in normalized_config["policies"] if rb.uid == section_rulebase.uid): + normalized_config["policies"].append(section_rulebase) + + insert_rulebase_link( + from_rulebase_uid=normalized_nat_rulebase.uid, + to_rulebase_uid=section_rulebase.uid, + link_type="nat", + normalized_gateway=normalized_gateway, + ) + + for rule in src_rulebase["rulebase"]: + parse_nat_rule(rule, section_rulebase, gateway, native_config) + + +def parse_nat_rule( + src_rulebase: dict[str, Any], + rulebase: Rulebase, + gateway: dict[str, Any], + native_config: dict[str, Any], +): + (rule_match, rule_xlate) = parse_nat_rule_transform(src_rulebase) + parse_single_rule( + rule_match, + rulebase, + rulebase.name, + rulebase.uid, + gateway, + native_config["policies"], + ) + parse_single_rule( # do not increase rule_num here (xlate rules do not count) + rule_xlate, + rulebase, + rulebase.name, + rulebase.uid, + gateway, + native_config["policies"], + ) + + +def parse_nat_rule_chunk( + chunk: dict[str, Any], + normalized_nat_rulebase: Rulebase, + gateway: dict[str, Any], + native_config: dict[str, Any], + import_state: ImportState, + normalized_config: dict[str, Any], + normalized_gateway: dict[str, Any], +): + if "rulebase" not in chunk: + return + + for src_rulebase in chunk["rulebase"]: + if "rulebase" in src_rulebase: + parse_nat_rulebase( + src_rulebase, + normalized_nat_rulebase, + gateway, + native_config, + import_state, + normalized_config, + normalized_gateway, + ) + if "rule-number" in src_rulebase: # rulebase is just a single rule (xlate rules do not count) + parse_nat_rule(src_rulebase, normalized_nat_rulebase, gateway, native_config) + + +def parse_nat_rule_transform(nat_rule: dict[str, Any]) -> tuple[dict[str, Any], dict[str, Any]]: + nat_in_rule = { + "uid": nat_rule["uid"], + "source": [nat_rule["original-source"]], + "destination": [nat_rule["original-destination"]], + "service": [nat_rule["original-service"]], + "action": [{"name": "accept", "type": "nat-action", "uid": nat_rule["uid"] + "_original-action"}], + "track": [{"type": "nat", "name": "None", "uid": nat_rule["uid"]}], + "type": "nat", + "rule-number": 0, + "source-negate": False, + "destination-negate": False, + "service-negate": False, + "install-on": nat_rule["install-on"], + "time": nat_rule.get("time", "time"), + "enabled": nat_rule["enabled"], + "comments": nat_rule["comments"], + "nat_rule": True, + "xlate_rule_uid": nat_rule["uid"] + "_translated", + "access_rule": False, + } + nat_out_rule = { + "uid": nat_rule["uid"] + "_translated", + "source": [nat_rule["translated-source"]], + "destination": [nat_rule["translated-destination"]], + "service": [nat_rule["translated-service"]], + "action": [{"name": "accept", "type": "nat-action", "uid": nat_rule["uid"] + "_translated-action"}], + "track": [{"type": "nat", "name": "None", "uid": nat_rule["uid"] + "_translated"}], + "type": "nat", + "rule-number": 0, + "enabled": True, + "source-negate": False, + "destination-negate": False, + "service-negate": False, + "install-on": nat_rule["install-on"], + "time": "time", + "nat_rule": True, + "access_rule": False, + } + return (nat_in_rule, nat_out_rule) diff --git a/roles/importer/files/importer/fw_modules/checkpointR8x/cp_rule.py b/roles/importer/files/importer/fw_modules/checkpointR8x/cp_rule.py index 755d801cea..2e264984a4 100644 --- a/roles/importer/files/importer/fw_modules/checkpointR8x/cp_rule.py +++ b/roles/importer/files/importer/fw_modules/checkpointR8x/cp_rule.py @@ -185,6 +185,10 @@ def parse_rulebase_chunk( policy_structure: list[dict[str, Any]], ): for chunk in rulebase_to_parse["chunks"]: + if "rulebase" not in chunk: + FWOLogger.debug("found unparsable rulebase chunk: " + str(chunk), 9) + continue + for rule in chunk["rulebase"]: if "rule-number" in rule: parse_single_rule(rule, normalized_rulebase, normalized_rulebase.uid, None, gateway, policy_structure) @@ -381,9 +385,13 @@ def parse_single_rule( "last_change_admin": sanitize(last_change_admin), "parent_rule_uid": sanitize(parent_rule_uid), "last_hit": sanitize(last_hit), + "nat_rule": bool(native_rule.get("nat_rule", False)), + "access_rule": bool(native_rule.get("access_rule", True)), } if comments is not None: rule["rule_comment"] = sanitize(comments) + if native_rule.get("xlate_rule_uid") is not None: + rule["xlate_rule_uid"] = sanitize(native_rule["xlate_rule_uid"]) rulebase.rules.update({rule["rule_uid"]: RuleNormalized(**rule)}) @@ -468,7 +476,7 @@ def check_and_add_section_header( src_rulebase: dict[str, Any], target_rulebase: Rulebase, layer_name: str, - import_id: str, + import_id: int, section_header_uids: set[str], ): # TODO: re-implement @@ -479,7 +487,7 @@ def insert_section_header_rule( _target_rulebase: Rulebase, _section_name: str, _layer_name: str, - _import_id: str, + _import_id: int, _src_rulebase_uid: str, _section_header_uids: set[str], _parent_uid: str, diff --git a/roles/importer/files/importer/fw_modules/checkpointR8x/fwcommon.py b/roles/importer/files/importer/fw_modules/checkpointR8x/fwcommon.py index 2d8f2fa3c1..f1ebb68f7d 100644 --- a/roles/importer/files/importer/fw_modules/checkpointR8x/fwcommon.py +++ b/roles/importer/files/importer/fw_modules/checkpointR8x/fwcommon.py @@ -4,7 +4,7 @@ import fwo_const import fwo_globals -from fw_modules.checkpointR8x import cp_const, cp_gateway, cp_getter, cp_network, cp_rule, cp_service +from fw_modules.checkpointR8x import cp_const, cp_gateway, cp_getter, cp_nat, cp_network, cp_rule, cp_service from fwo_base import ConfigAction from fwo_exceptions import FwLoginFailedError, FwoImporterError, ImportInterruptionError from fwo_log import FWOLogger @@ -229,6 +229,7 @@ def normalize_single_manager_config( cp_network.normalize_time_objects(native_config, normalized_config_dict) FWOLogger.info("completed normalizing time objects") cp_gateway.normalize_gateways(native_config, import_state, normalized_config_dict) + cp_rule.normalize_rulebases( native_config, native_config_global, @@ -237,6 +238,7 @@ def normalize_single_manager_config( normalized_config_global, is_global_loop_iteration, ) + cp_nat.normalize_nat_rules(native_config, import_state, normalized_config_dict) if not parsing_config_only: # get config from cp fw mgr cp_getter.logout(import_state.mgm_details.build_fw_api_string(), sid) FWOLogger.info("completed normalizing rulebases") @@ -272,6 +274,7 @@ def get_rules(native_config: dict[str, Any], import_state: ImportState) -> int: manager_details, policy_structure=policy_structure, ) + gateways_and_servers = cp_getter.get_gateways_and_servers(sid, cp_manager_api_base_url) process_devices( manager_details, @@ -287,6 +290,7 @@ def get_rules(native_config: dict[str, Any], import_state: ImportState) -> int: ], # globalSid should not be None but is when the first manager is not supermanager native_config["domains"][0], import_state, + gateways_and_servers, ) native_config["domains"][manager_index].update({"policies": policy_structure}) @@ -336,6 +340,28 @@ def handle_super_manager( return global_assignments, global_policy_structure, global_domain, global_sid +def get_policy_for_device( + device: dict[str, Any], + gateways_and_servers: list[dict[str, Any]], + policy_structure: list[dict[str, Any]], +) -> dict[str, Any] | None: + found_gateway = next((gw for gw in gateways_and_servers if gw["uid"] == device["uid"]), None) + if found_gateway is None: + FWOLogger.warning("Could not find gateway for device, skipping: " + str(device["uid"])) + return None + + if "policy" not in found_gateway: + FWOLogger.warning("Could not find policy in gateway, skipping: " + str(device["uid"])) + return None + + gateway_policy = found_gateway["policy"] + if "access-policy-name" not in gateway_policy: + FWOLogger.warning("Could not find access policy for gateway, skipping: " + str(device["uid"])) + return None + + return next((policy for policy in policy_structure if policy["name"] == gateway_policy["access-policy-name"]), None) + + def process_devices( manager_details: ManagementController, policy_structure: list[dict[str, Any]], @@ -348,16 +374,24 @@ def process_devices( native_config_domain: dict[str, Any], native_config_global_domain: dict[str, Any], import_state: ImportState, + gateways_and_servers: list[dict[str, Any]], ) -> None: for device in manager_details.devices: if device["importDisabled"] and not import_state.force_import: continue + device_config: dict[str, Any] = initialize_device_config(device) if not device_config: continue + policy = get_policy_for_device(device, gateways_and_servers, policy_structure) + if not policy: + FWOLogger.warning("Could not find policy structure for device, skipping: " + str(device["uid"])) + native_config_domain["gateways"].append(device_config) + continue + ordered_layer_uids: list[str] = get_ordered_layer_uids( - policy_structure, device_config, manager_details.get_domain_string() + policy, device_config, manager_details.get_domain_string() ) if not ordered_layer_uids: FWOLogger.warning(f"No ordered layers found for device: {device_config['name']}") @@ -379,7 +413,12 @@ def process_devices( cp_manager_api_base_url, ) else: - define_initial_rulebase(device_config, ordered_layer_uids, is_global=False) + define_initial_rulebase_links( + device_config, + policy, + is_global=False, + native_config_domain=native_config_global_domain, + ) add_ordered_layers_to_native_config( ordered_layer_uids, @@ -390,9 +429,10 @@ def process_devices( device_config, is_global=False, global_ordered_layer_count=global_ordered_layer_count, + policy_structure=policy, ) - handle_nat_rules(device, native_config_domain, sid, import_state) + handle_nat_rules(native_config_domain, sid, import_state, policy) native_config_domain["gateways"].append(device_config) @@ -430,7 +470,7 @@ def handle_global_rulebase_links( continue for global_policy in global_policy_structure: if global_policy["name"] == global_assignment["global-access-policy"]: - global_ordered_layer_uids = get_ordered_layer_uids([global_policy], device_config, global_domain) + global_ordered_layer_uids = get_ordered_layer_uids(global_policy, device_config, global_domain) if not global_ordered_layer_uids: FWOLogger.warning(f"No access layer for global policy: {global_policy['name']}") break @@ -445,13 +485,14 @@ def handle_global_rulebase_links( device_config, is_global=True, global_ordered_layer_count=global_ordered_layer_count, + policy_structure=global_policy, ) define_global_rulebase_link( device_config, - global_ordered_layer_uids, ordered_layer_uids, native_config_global_domain, global_policy_rulebases_uid_list, + global_policy, ) return global_ordered_layer_count @@ -461,15 +502,20 @@ def handle_global_rulebase_links( def define_global_rulebase_link( device_config: dict[str, Any], - global_ordered_layer_uids: list[str], ordered_layer_uids: list[str], native_config_global_domain: dict[str, Any], global_policy_rulebases_uid_list: list[str], + policy: dict[str, Any], ): """ Links initial and placeholder rule for global rulebases """ - define_initial_rulebase(device_config, global_ordered_layer_uids, is_global=True) + define_initial_rulebase_links( + device_config, + policy, + is_global=True, + native_config_domain=native_config_global_domain, + ) # parse global rulebases, find place-holders and link local rulebases placeholder_link_index = 0 @@ -500,12 +546,23 @@ def define_global_rulebase_link( placeholder_link_index += 1 -def define_initial_rulebase(device_config: dict[str, Any], ordered_layer_uids: list[str], is_global: bool): +def define_initial_rulebase_links( + device_config: dict[str, Any], + policy: dict[str, Any], + is_global: bool, + native_config_domain: dict[str, Any] | None, +): + if native_config_domain is None: + native_config_domain = {"rulebases": []} + + if not any(rb["uid"] == policy["uid"] for rb in native_config_domain["rulebases"]): + native_config_domain["rulebases"].append({"uid": policy["uid"], "name": policy["name"], "chunks": []}) + device_config["rulebase_links"].append( { "from_rulebase_uid": None, "from_rule_uid": None, - "to_rulebase_uid": ordered_layer_uids[0], + "to_rulebase_uid": policy["uid"], "type": "ordered", "is_global": is_global, "is_initial": True, @@ -523,25 +580,21 @@ def get_rules_params(import_state: ImportState) -> dict[str, Any]: } -def handle_nat_rules(device: dict[str, Any], native_config_domain: dict[str, Any], sid: str, import_state: ImportState): - if device.get("package_name"): - show_params_rules: dict[str, Any] = { - "limit": import_state.fwo_config.api_fetch_size, - "use-object-dictionary": cp_const.use_object_dictionary, - "details-level": "standard", - "package": device["package_name"], - } - FWOLogger.debug(f"Getting NAT rules for package: {device['package_name']}", 4) - nat_rules = cp_getter.get_nat_rules_from_api_as_dict( - import_state.mgm_details.build_fw_api_string(), - sid, - show_params_rules, - native_config_domain=native_config_domain, - ) - if nat_rules: - native_config_domain["nat_rulebases"].append(nat_rules) - else: - native_config_domain["nat_rulebases"].append({"nat_rule_chunks": []}) +def handle_nat_rules(native_config_domain: dict[str, Any], sid: str, import_state: ImportState, policy: dict[str, Any]): + show_params_rules: dict[str, Any] = { + "limit": import_state.fwo_config.api_fetch_size, + "use-object-dictionary": cp_const.use_object_dictionary, + "package": policy["name"], + } + FWOLogger.debug(f"Getting NAT rules for package: {policy['name']}", 4) + nat_rules = cp_getter.get_nat_rules_from_api_as_dict( + import_state.mgm_details.build_fw_api_string(), + sid, + show_params_rules, + native_config_domain=native_config_domain, + ) + if nat_rules: + native_config_domain["nat_rulebases"].append(nat_rules) else: native_config_domain["nat_rulebases"].append({"nat_rule_chunks": []}) @@ -555,6 +608,7 @@ def add_ordered_layers_to_native_config( device_config: dict[str, Any], is_global: bool, global_ordered_layer_count: int, + policy_structure: dict[str, Any], ) -> list[str]: """ Fetches ordered layers and links them @@ -573,12 +627,18 @@ def add_ordered_layers_to_native_config( is_global=is_global, access_type="access", rulebase_uid=ordered_layer_uid, + policy_structure=policy_structure, ) # link to next ordered layer # in case of mds: domain ordered layers are linked once there is no global ordered layer counterpart - if (is_global or ordered_layer_index >= global_ordered_layer_count - 1) and ( - ordered_layer_index < len(ordered_layer_uids) - 1 + if ( + (is_global or ordered_layer_index >= global_ordered_layer_count - 1) + and (ordered_layer_index < len(ordered_layer_uids) - 1) + and not any( + link["to_rulebase_uid"] == ordered_layer_uids[ordered_layer_index + 1] + for link in device_config["rulebase_links"] + ) ): device_config["rulebase_links"].append( { @@ -595,32 +655,23 @@ def add_ordered_layers_to_native_config( return policy_rulebases_uid_list -def get_ordered_layer_uids( - policy_structure: list[dict[str, Any]], device_config: dict[str, Any], domain: str | None -) -> list[str]: +def get_ordered_layer_uids(policy: dict[str, Any], device_config: dict[str, Any], domain: str | None) -> list[str]: """ Get UIDs of ordered layers for policy of device """ - ordered_layer_uids: list[str] = [] - for policy in policy_structure: - found_target_in_policy = False - for target in policy["targets"]: - if target["uid"] == device_config["uid"] or target["uid"] == "all": - found_target_in_policy = True - if found_target_in_policy: - append_access_layer_uid(policy, domain, ordered_layer_uids) - - return ordered_layer_uids - - -def append_access_layer_uid(policy: dict[str, Any], domain: str | None, ordered_layer_uids: list[str]) -> None: - ordered_layer_uids.extend( - [ - access_layer["uid"] - for access_layer in policy["access-layers"] - if access_layer["domain"] == domain or domain == "" - ] - ) + ordered_layer_uids: list[str] = [policy["uid"]] if "uid" in policy else [] + + is_targeted = any(target["uid"] == device_config["uid"] or target["uid"] == "all" for target in policy["targets"]) + if is_targeted: + ordered_layer_uids.extend( + [ + access_layer["uid"] + for access_layer in policy["access-layers"] + if access_layer["domain"] == domain or domain == "" + ] + ) + + return list(dict.fromkeys(ordered_layer_uids)) def get_objects(native_config_dict: dict[str, Any], import_state: ImportState) -> int: diff --git a/roles/importer/files/importer/fw_modules/fortiadom5ff/fmgr_consts.py b/roles/importer/files/importer/fw_modules/fortiadom5ff/fmgr_consts.py index 6cf9e6f111..45fb305711 100644 --- a/roles/importer/files/importer/fw_modules/fortiadom5ff/fmgr_consts.py +++ b/roles/importer/files/importer/fw_modules/fortiadom5ff/fmgr_consts.py @@ -4,6 +4,7 @@ "firewall/addrgrp", "firewall/addrgrp6", "firewall/ippool", + "firewall/ippool6", "firewall/vip", "system/external-resource", "firewall/wildcard-fqdn/custom", @@ -25,3 +26,5 @@ nat_types = ["central/dnat", "central/dnat6", "firewall/central-snat-map"] user_obj_types = ["user/local", "user/group"] + +EXPECTED_NATIP_LIST_LENGTH = 2 diff --git a/roles/importer/files/importer/fw_modules/fortiadom5ff/fmgr_network.py b/roles/importer/files/importer/fw_modules/fortiadom5ff/fmgr_network.py index 5a37725755..4f8fe46220 100644 --- a/roles/importer/files/importer/fw_modules/fortiadom5ff/fmgr_network.py +++ b/roles/importer/files/importer/fw_modules/fortiadom5ff/fmgr_network.py @@ -1,6 +1,7 @@ import ipaddress from typing import Any +from fw_modules.fortiadom5ff.fmgr_consts import nw_obj_types from fw_modules.fortiadom5ff.fmgr_zone import find_zones_in_normalized_config from fwo_base import sort_and_join_refs from fwo_const import ANY_IP_END, ANY_IP_START, LIST_DELIMITER, NAT_POSTFIX @@ -78,6 +79,13 @@ def exclude_object_types_in_member_ref_search(obj_type: str, current_obj_type: s return skip_member_ref_loop +def get_native_obj_type(object_type: str) -> str: + for native_obj_type in nw_obj_types: + if object_type.endswith(native_obj_type): + return native_obj_type + return "unknown" + + def normalize_network_object( obj_orig: dict[str, Any], nw_objects: list[dict[str, Any]], @@ -142,6 +150,7 @@ def normalize_network_object( obj_orig.get("associated-interface", []), normalized_config_adom, normalized_config_global ) obj.update({"obj_zone": LIST_DELIMITER.join(associated_interfaces)}) + obj.update({"obj_native_type": get_native_obj_type(current_obj_type)}) nw_objects.append(obj) diff --git a/roles/importer/files/importer/fw_modules/fortiadom5ff/fmgr_rule.py b/roles/importer/files/importer/fw_modules/fortiadom5ff/fmgr_rule.py index 526e12a14f..66274569f8 100644 --- a/roles/importer/files/importer/fw_modules/fortiadom5ff/fmgr_rule.py +++ b/roles/importer/files/importer/fw_modules/fortiadom5ff/fmgr_rule.py @@ -1,26 +1,43 @@ import copy import ipaddress +import json from datetime import datetime, timezone from typing import Any from fw_modules.fortiadom5ff import fmgr_getter -from fw_modules.fortiadom5ff.fmgr_consts import nat_types +from fw_modules.fortiadom5ff.fmgr_consts import EXPECTED_NATIP_LIST_LENGTH, nat_types +from fw_modules.fortiadom5ff.fmgr_network import create_network_object +from fw_modules.fortiadom5ff.fmgr_service import create_svc_object from fw_modules.fortiadom5ff.fmgr_zone import find_zones_in_normalized_config -from fwo_const import LIST_DELIMITER -from fwo_exceptions import FwoDeviceWithoutLocalPackageError, FwoImporterErrorInconsistenciesError +from fwo_const import ANY_IP_END, ANY_IP_START, LIST_DELIMITER +from fwo_exceptions import ( + FwoDeviceWithoutLocalPackageError, + FwoImporterErrorInconsistenciesError, +) from fwo_log import FWOLogger from models.rule import RuleAction, RuleNormalized, RuleTrack, RuleType from models.rulebase import Rulebase +from netaddr import IPNetwork NETWORK_OBJECT = "network_object" STRING_PKG = "/pkg/" STRING_PM_CONFIG_GLOBAL_PKG = "/pm/config/global/pkg/" STRING_PM_CONFIG_ADOM = "/pm/config/adom/" -rule_access_scope_v4 = ["rules_global_header_v4", "rules_adom_v4", "rules_global_footer_v4"] -rule_access_scope_v6 = ["rules_global_header_v6", "rules_adom_v6", "rules_global_footer_v6"] +rule_access_scope_v4 = [ + "rules_global_header_v4", + "rules_adom_v4", + "rules_global_footer_v4", +] +rule_access_scope_v6 = [ + "rules_global_header_v6", + "rules_adom_v6", + "rules_global_footer_v6", +] rule_access_scope = rule_access_scope_v6 + rule_access_scope_v4 rule_nat_scope = ["rules_global_nat", "rules_adom_nat"] rule_scope = rule_access_scope + rule_nat_scope +ip_v4_type = 4 +ip_v6_type = 6 def normalize_rulebases( @@ -61,58 +78,72 @@ def normalize_rulebases_for_each_link_destination( normalized_config_adom: dict[str, Any], normalized_config_global: dict[str, Any], ): - for rulebase_link in gateway["rulebase_links"]: - if rulebase_link["to_rulebase_uid"] not in fetched_rulebase_uids and rulebase_link["to_rulebase_uid"] != "": - rulebase_to_parse = find_rulebase_to_parse(native_config["rulebases"], rulebase_link["to_rulebase_uid"]) - # search in global rulebase - found_rulebase_in_global = False - if rulebase_to_parse == {} and not is_global_loop_iteration and native_config_global != {}: - rulebase_to_parse = find_rulebase_to_parse( - native_config_global["rulebases"], rulebase_link["to_rulebase_uid"] - ) - found_rulebase_in_global = True - if rulebase_to_parse == {}: - FWOLogger.warning("found to_rulebase link without rulebase in nativeConfig: " + str(rulebase_link)) - continue + # Iterate over a snapshot because we may append NAT links while processing. + for rulebase_link in list(gateway["rulebase_links"]): + if _should_skip_rulebase_link(rulebase_link, fetched_rulebase_uids): + normalize_nat_rulebase(rulebase_link, native_config, normalized_config_adom, normalized_config_global) + continue + + rulebase_to_parse, found_rulebase_in_global = _find_rulebase_to_parse_for_link( + rulebase_link, + native_config, + native_config_global, + is_global_loop_iteration, + ) + if rulebase_to_parse == {}: + FWOLogger.warning("found to_rulebase link without rulebase in nativeConfig: " + str(rulebase_link)) + continue - normalized_rulebase = initialize_normalized_rulebase(rulebase_to_parse, mgm_uid) - parse_rulebase( - normalized_config_adom, - normalized_config_global, - rulebase_to_parse, - normalized_rulebase, - found_rulebase_in_global, - ) - fetched_rulebase_uids.append(rulebase_link["to_rulebase_uid"]) + normalized_rulebase = initialize_normalized_rulebase(rulebase_to_parse, mgm_uid) + parse_rulebase( + normalized_config_adom, + normalized_config_global, + rulebase_to_parse, + normalized_rulebase, + found_rulebase_in_global, + ) + fetched_rulebase_uids.append(rulebase_link["to_rulebase_uid"]) + _append_normalized_rulebase( + normalized_config_adom, + normalized_config_global, + normalized_rulebase, + found_rulebase_in_global, + ) - if found_rulebase_in_global: - normalized_config_global["policies"].append(normalized_rulebase) - else: - normalized_config_adom["policies"].append(normalized_rulebase) + new_process_nat_rules_for_rulebase( + gateway, + normalized_config_adom, + normalized_config_global, + rulebase_to_parse, + normalized_rulebase, + ) # normalizing nat rulebases is work in progress - # normalize_nat_rulebase(rulebase_link, native_config, normalized_config_adom, normalized_config_global) # noqa: ERA001 + normalize_nat_rulebase(rulebase_link, native_config, normalized_config_adom, normalized_config_global) -def normalize_nat_rulebase( - rulebase_link: dict[str, Any], - native_config: dict[str, Any], - normalized_config_adom: dict[str, Any], - normalized_config_global: dict[str, Any], -): - if not rulebase_link["is_section"]: - for nat_type in nat_types: - nat_type_string = nat_type + "_" + rulebase_link["to_rulebase_uid"] - nat_rulebase = get_native_nat_rulebase(native_config, nat_type_string) - parse_nat_rulebase(nat_rulebase, nat_type_string, normalized_config_adom, normalized_config_global) +def _should_skip_rulebase_link(rulebase_link: dict[str, Any], fetched_rulebase_uids: list[str]) -> bool: + link_type = rulebase_link.get("link_type", rulebase_link.get("type", "ordered")) + if link_type == "nat": + # NAT links are generated during normalization and do not exist in native rulebases. + return True + return not ( + rulebase_link["to_rulebase_uid"] not in fetched_rulebase_uids and rulebase_link["to_rulebase_uid"] != "" + ) -def get_native_nat_rulebase(native_config: dict[str, Any], nat_type_string: str) -> list[dict[str, Any]]: - for nat_rulebase in native_config["nat_rulebases"]: - if nat_type_string == nat_rulebase["type"]: - return nat_rulebase["data"] - FWOLogger.warning("no nat data for " + nat_type_string) - return [] +def _find_rulebase_to_parse_for_link( + rulebase_link: dict[str, Any], + native_config: dict[str, Any], + native_config_global: dict[str, Any], + is_global_loop_iteration: bool, +) -> tuple[dict[str, Any], bool]: + rulebase_to_parse = find_rulebase_to_parse(native_config["rulebases"], rulebase_link["to_rulebase_uid"]) + found_rulebase_in_global = False + if rulebase_to_parse == {} and not is_global_loop_iteration and native_config_global != {}: + rulebase_to_parse = find_rulebase_to_parse(native_config_global["rulebases"], rulebase_link["to_rulebase_uid"]) + found_rulebase_in_global = True + return rulebase_to_parse, found_rulebase_in_global def find_rulebase_to_parse(rulebase_list: list[dict[str, Any]], rulebase_uid: str) -> dict[str, Any]: @@ -122,6 +153,18 @@ def find_rulebase_to_parse(rulebase_list: list[dict[str, Any]], rulebase_uid: st return {} +def _append_normalized_rulebase( + normalized_config_adom: dict[str, Any], + normalized_config_global: dict[str, Any], + normalized_rulebase: Rulebase, + found_rulebase_in_global: bool, +) -> None: + if found_rulebase_in_global: + normalized_config_global["policies"].append(normalized_rulebase) + else: + normalized_config_adom["policies"].append(normalized_rulebase) + + def initialize_normalized_rulebase(rulebase_to_parse: dict[str, Any], mgm_uid: str) -> Rulebase: """ We use 'type' as uid/name since a rulebase may have a v4 and a v6 part @@ -140,13 +183,20 @@ def parse_rulebase( ): """Parses a native Fortinet rulebase into a normalized rulebase.""" for native_rule in rulebase_to_parse["data"]: - parse_single_rule(normalized_config_adom, normalized_config_global, native_rule, normalized_rulebase) + parse_single_rule( + normalized_config_adom, + normalized_config_global, + native_rule, + normalized_rulebase, + ) if not found_rulebase_in_global: add_implicit_deny_rule(normalized_config_adom, normalized_config_global, normalized_rulebase) def add_implicit_deny_rule( - normalized_config_adom: dict[str, Any], normalized_config_global: dict[str, Any], rulebase: Rulebase + normalized_config_adom: dict[str, Any], + normalized_config_global: dict[str, Any], + rulebase: Rulebase, ): deny_rule = { "srcaddr": ["all"], @@ -215,6 +265,8 @@ def parse_single_rule( rulebase: Rulebase, ): """Parses a single native Fortinet rule into a normalized rule and adds it to the given rulebase.""" + is_nat_rule = any(key in native_rule and native_rule[key] == 1 for key in ["nat", "nat46", "nat64"]) + # Extract basic rule information rule_disabled = True # Default to disabled if "status" in native_rule and (native_rule["status"] == 1 or native_rule["status"] == "enable"): @@ -225,10 +277,18 @@ def parse_single_rule( rule_track = rule_parse_tracking_info(native_rule) rule_src_list, rule_src_refs_list = rule_parse_addresses( - native_rule, "src", normalized_config_adom, normalized_config_global, is_nat=False + native_rule, + "src", + normalized_config_adom, + normalized_config_global, + is_nat=is_nat_rule, ) rule_dst_list, rule_dst_refs_list = rule_parse_addresses( - native_rule, "dst", normalized_config_adom, normalized_config_global, is_nat=False + native_rule, + "dst", + normalized_config_adom, + normalized_config_global, + is_nat=is_nat_rule, ) rule_svc_list, rule_svc_refs_list = rule_parse_service(native_rule) @@ -247,7 +307,7 @@ def parse_single_rule( time = rule_parse_time(native_rule) - # Create the normalized rule + # Create the normalized access rule rule_normalized = RuleNormalized( rule_num=0, rule_num_numeric=0, @@ -277,15 +337,16 @@ def parse_single_rule( rule_src_zone=LIST_DELIMITER.join(rule_src_zones), rule_dst_zone=LIST_DELIMITER.join(rule_dst_zones), rule_head_text=None, + access_rule=True, + nat_rule=False, ) + if rule_normalized.rule_uid is None: raise FwoImporterErrorInconsistenciesError("rule_normalized.rule_uid is None when parsing single rule") # Add the rule to the rulebase rulebase.rules[rule_normalized.rule_uid] = rule_normalized - # TODO: handle combined NAT, see handle_combined_nat_rule - def rule_parse_action(native_rule: dict[str, Any]) -> RuleAction: # Extract action - Fortinet uses 0 for deny/drop, 1 for accept @@ -341,14 +402,31 @@ def rule_parse_addresses( addr_ref_list: list[str] = [] if not is_nat: build_addr_list( - native_rule, target, normalized_config_adom, normalized_config_global, addr_list, addr_ref_list, is_v4=True + native_rule, + target, + normalized_config_adom, + normalized_config_global, + addr_list, + addr_ref_list, + is_v4=True, ) build_addr_list( - native_rule, target, normalized_config_adom, normalized_config_global, addr_list, addr_ref_list, is_v4=False + native_rule, + target, + normalized_config_adom, + normalized_config_global, + addr_list, + addr_ref_list, + is_v4=False, ) else: build_nat_addr_list( - native_rule, target, normalized_config_adom, normalized_config_global, addr_list, addr_ref_list + native_rule, + target, + normalized_config_adom, + normalized_config_global, + addr_list, + addr_ref_list, ) return addr_list, addr_ref_list @@ -391,36 +469,98 @@ def build_nat_addr_list( addr_list: list[str], addr_ref_list: list[str], ) -> None: - # so far only ip v4 expected + is_ipv6 = bool(native_rule.get("srcaddr6") or native_rule.get("dstaddr6")) if target == "src": - for addr in sorted(native_rule.get("orig-addr", [])): + source_addrs = native_rule.get("srcaddr6", []) if is_ipv6 else native_rule.get("srcaddr", []) + for addr in sorted(source_addrs): addr_list.append(addr) addr_ref_list.append( find_addr_ref( addr, - is_v4=True, + is_v4=not is_ipv6, normalized_config_adom=normalized_config_adom, normalized_config_global=normalized_config_global, ) ) if target == "dst": - for addr in sorted(native_rule.get("dst-addr", [])): + destination_addrs = native_rule.get("dstaddr6", []) if is_ipv6 else native_rule.get("dstaddr", []) + for addr in sorted(destination_addrs): addr_list.append(addr) addr_ref_list.append( find_addr_ref( addr, - is_v4=True, + is_v4=not is_ipv6, normalized_config_adom=normalized_config_adom, normalized_config_global=normalized_config_global, ) ) +def ensure_original_objects(normalized_config_adom: dict[str, Any], normalized_config_global: dict[str, Any]) -> None: + """ + Ensure that a standard 'Original' network and service object exist in the normalized config. + If missing, create minimal placeholder objects in the ADOM normalized config. + """ + # Ensure lists exist + normalized_config_adom.setdefault("network_objects", []) + normalized_config_adom.setdefault("service_objects", []) + + # Check network objects in ADOM and global + combined_nw = normalized_config_adom["network_objects"] + normalized_config_global.get("network_objects", []) + if not any(obj.get("obj_name") == "Original" for obj in combined_nw): + normalized_config_adom["network_objects"].append( + create_network_object( + name="Original", + obj_type="network", + ip=ANY_IP_START, + ip_end=ANY_IP_END, + uid="Original", + color="black", + comment='"original" network object created by FWO importer for NAT purposes', + zone="global", + ) + ) + + # Check service objects in ADOM and global + combined_svc = normalized_config_adom["service_objects"] + normalized_config_global.get("service_objects", []) + if not any(svc.get("svc_name") == "Original" for svc in combined_svc): + normalized_config_adom["service_objects"].append( + create_svc_object( + name="Original", + proto=0, + color="foreground", + port=None, + comment='"original" service object created by FWO importer for NAT purposes', + ) + ) + + outgoing_interface_ip_name = "Outgoing Interface IP" + + if not any(obj.get("obj_name") == outgoing_interface_ip_name for obj in combined_nw): + normalized_config_adom["network_objects"].append( + create_network_object( + name=outgoing_interface_ip_name, + obj_type="network", + ip=ANY_IP_START, + ip_end=ANY_IP_END, + uid="Outgoing_Interface_IP", + color="black", + comment=f'"{outgoing_interface_ip_name}" network object created by FWO importer for NAT purposes', + zone="global", + ) + ) + + def find_addr_ref( - addr: str, is_v4: bool, normalized_config_adom: dict[str, Any], normalized_config_global: dict[str, Any] + addr: str, + is_v4: bool, + normalized_config_adom: dict[str, Any], + normalized_config_global: dict[str, Any], ) -> str: for nw_obj in normalized_config_adom["network_objects"] + normalized_config_global.get("network_objects", []): - if addr == nw_obj["obj_name"] and ((is_v4 and ip_type(nw_obj) == 4) or (not is_v4 and ip_type(nw_obj) == 6)): # noqa: PLR2004 + if addr == nw_obj["obj_name"] and ( + (is_v4 and ip_type(nw_obj) == ip_v4_type) or (not is_v4 and ip_type(nw_obj) == ip_v6_type) + ): return nw_obj["obj_uid"] raise FwoImporterErrorInconsistenciesError(f"No ref found for '{addr}'.") @@ -498,7 +638,15 @@ def get_access_policy( options = ["extra info", "scope member", "get meta"] previous_rulebase = get_and_link_global_rulebase( - "header", previous_rulebase, global_pkg_name, native_config_global, sid, fm_api_url, options, limit, link_list + "header", + previous_rulebase, + global_pkg_name, + native_config_global, + sid, + fm_api_url, + options, + limit, + link_list, ) previous_rulebase = get_and_link_local_rulebase( @@ -515,7 +663,15 @@ def get_access_policy( ) previous_rulebase = get_and_link_global_rulebase( - "footer", previous_rulebase, global_pkg_name, native_config_global, sid, fm_api_url, options, limit, link_list + "footer", + previous_rulebase, + global_pkg_name, + native_config_global, + sid, + fm_api_url, + options, + limit, + link_list, ) device_config["rulebase_links"].extend(link_list) @@ -535,7 +691,8 @@ def get_and_link_global_rulebase( rulebase_type_prefix = "rules_global_" + header_or_footer if global_pkg_name != "": if not is_rulebase_already_fetched( - native_config_global["rulebases"], rulebase_type_prefix + "_v4_" + global_pkg_name + native_config_global["rulebases"], + rulebase_type_prefix + "_v4_" + global_pkg_name, ): fmgr_getter.update_config_with_fortinet_api_call( native_config_global["rulebases"], @@ -547,7 +704,8 @@ def get_and_link_global_rulebase( limit=limit, ) if not is_rulebase_already_fetched( - native_config_global["rulebases"], rulebase_type_prefix + "_v6_" + global_pkg_name + native_config_global["rulebases"], + rulebase_type_prefix + "_v6_" + global_pkg_name, ): # delete_v: hier auch options=options? fmgr_getter.update_config_with_fortinet_api_call( @@ -611,7 +769,9 @@ def get_and_link_local_rulebase( def find_packages( - adom_device_vdom_policy_package_structure: dict[str, Any], adom_name: str, mgm_details_device: dict[str, Any] + adom_device_vdom_policy_package_structure: dict[str, Any], + adom_name: str, + mgm_details_device: dict[str, Any], ) -> tuple[str, str]: for device in adom_device_vdom_policy_package_structure[adom_name]: for vdom in adom_device_vdom_policy_package_structure[adom_name][device]: @@ -670,7 +830,11 @@ def build_link(previous_rulebase: str | None, full_pkg_name: str, is_global: boo def has_rulebase_data( - rulebases: list[dict[str, Any]], full_pkg_name: str, is_global: bool, version: str, pkg_name: str + rulebases: list[dict[str, Any]], + full_pkg_name: str, + is_global: bool, + version: str, + pkg_name: str, ) -> bool: """Adds name and uid to rulebase and removes empty global rulebases""" has_data = False @@ -693,6 +857,40 @@ def has_rulebase_data( return has_data +def handle_combined_nat_rule( + rule: dict[str, Any], + rule_orig: dict[str, Any], + config2import: dict[str, Any], + nat_rule_number: int, + dev_id: int, +) -> dict[str, Any] | None: + # TODO: see fOS_rule for reference implementation + raise NotImplementedError("handle_combined_nat_rule is not implemented yet") + + +def add_users_to_rule(rule_orig: dict[str, Any], rule: dict[str, Any]) -> None: + if "groups" in rule_orig: + add_users(rule_orig["groups"], rule) + if "users" in rule_orig: + add_users(rule_orig["users"], rule) + + +def add_users(users: list[str], rule: dict[str, Any]) -> None: + for user in users: + rule_src_with_users = [user + "@" + src for src in rule["rule_src"].split(LIST_DELIMITER)] + + rule["rule_src"] = LIST_DELIMITER.join(rule_src_with_users) + + # here user ref is the user name itself + rule_src_refs_with_users = [user + "@" + src for src in rule["rule_src_refs"].split(LIST_DELIMITER)] + rule["rule_src_refs"] = LIST_DELIMITER.join(rule_src_refs_with_users) + + +################### +# NAT STARTS HERE # +################### + + def get_nat_policy( sid: str, fm_api_url: str, @@ -732,13 +930,103 @@ def get_nat_policy( def parse_nat_rulebase( - _nat_rulebase: list[dict[str, Any]], - _nat_type_string: str, - _normalized_config_adom: dict[str, Any], - _normalized_config_global: dict[str, Any], -) -> None: - # this function is not called until it is ready check git commit for reference - return + nat_rulebase: list[dict[str, Any]], + nat_type_string: str, + normalized_config_adom: dict[str, Any], + normalized_config_global: dict[str, Any], +) -> list[RuleNormalized]: + nat_rules: list[RuleNormalized] = [] + rule_number = 0 + for rule_number, rule_orig in enumerate(nat_rulebase): + rule_src_list, rule_src_refs_list = rule_parse_addresses( + rule_orig, "src", normalized_config_adom, normalized_config_global, is_nat=True + ) # because of is_nat = True, this will look for orig-addr + rule_dst_list, rule_dst_refs_list = rule_parse_addresses( + rule_orig, "dst", normalized_config_adom, normalized_config_global, is_nat=True + ) # because of is_nat = True, this will look for dst-addr + + rule_svc_list, rule_svc_refs_list = rule_parse_service(rule_orig) + + rule_src_zones = find_zones_in_normalized_config( + rule_orig.get("srcintf", []), + normalized_config_adom, + normalized_config_global, + ) + rule_dst_zones = find_zones_in_normalized_config( + rule_orig.get("dstintf", []), + normalized_config_adom, + normalized_config_global, + ) + + rule_normalized = RuleNormalized( + rule_num=rule_number, + rule_num_numeric=0, + rule_disabled=False, + rule_src_neg=False, + rule_src=LIST_DELIMITER.join(rule_src_list), + rule_src_refs=LIST_DELIMITER.join(rule_src_refs_list), + rule_dst_neg=False, + rule_dst=LIST_DELIMITER.join(rule_dst_list), + rule_dst_refs=LIST_DELIMITER.join(rule_dst_refs_list), + rule_svc_neg=False, + rule_svc=LIST_DELIMITER.join(rule_svc_list), + rule_svc_refs=LIST_DELIMITER.join(rule_svc_refs_list), + rule_action=RuleAction.DROP, + rule_track=RuleTrack.NONE, + rule_installon=nat_type_string, + rule_time="", # Time-based rules not commonly used in basic Fortinet configs + rule_name=rule_orig.get("name", ""), + rule_uid=rule_orig.get("uuid"), + rule_custom_fields=str({}), + rule_implied=False, + rule_type=RuleType.NAT, + last_change_admin=rule_orig.get("_last-modified-by", ""), + parent_rule_uid=None, + last_hit=rule_parse_last_hit(rule_orig), + rule_comment=rule_orig.get("comments"), + rule_src_zone=LIST_DELIMITER.join(rule_src_zones), + rule_dst_zone=LIST_DELIMITER.join(rule_dst_zones), + rule_head_text=None, + xlate_rule_uid=f"{rule_orig.get('uuid')}_translated" if rule_orig.get("uuid") else None, + nat_rule=True, + ) + + xlate_rule = RuleNormalized( + rule_num=rule_number, + rule_num_numeric=0, + rule_disabled=False, + rule_src_neg=False, + rule_src=LIST_DELIMITER.join(rule_src_list), + rule_src_refs=LIST_DELIMITER.join(rule_src_refs_list), + rule_dst_neg=False, + rule_dst="Original", + rule_dst_refs=LIST_DELIMITER.join(rule_dst_refs_list), + rule_svc_neg=False, + rule_svc=LIST_DELIMITER.join(rule_svc_list), + rule_svc_refs=LIST_DELIMITER.join(rule_svc_refs_list), + rule_action=RuleAction.DROP, + rule_track=RuleTrack.NONE, + rule_installon=nat_type_string, + rule_time="", # Time-based rules not commonly used in basic Fortinet configs + rule_name=rule_orig.get("name", ""), + rule_uid=f"{rule_orig.get('uuid')}_translated" if rule_orig.get("uuid") else None, + rule_custom_fields=str({}), + rule_implied=False, + rule_type=RuleType.NAT, + last_change_admin=rule_orig.get("_last-modified-by", ""), + parent_rule_uid=None, + last_hit=rule_parse_last_hit(rule_orig), + rule_comment=rule_orig.get("comments"), + rule_src_zone=LIST_DELIMITER.join(rule_src_zones), + rule_dst_zone=LIST_DELIMITER.join(rule_dst_zones), + rule_head_text=None, + nat_rule=True, + ) + + nat_rules.append(rule_normalized) + nat_rules.append(xlate_rule) + normalized_config_adom["rules"].extend(nat_rules) + return nat_rules def create_xlate_rule(rule: dict[str, Any]) -> dict[str, Any]: @@ -756,13 +1044,6 @@ def create_xlate_rule(rule: dict[str, Any]) -> dict[str, Any]: return xlate_rule -def handle_combined_nat_rule( - rule: dict[str, Any], rule_orig: dict[str, Any], config2import: dict[str, Any], nat_rule_number: int, dev_id: int -) -> dict[str, Any] | None: - # TODO: see fOS_rule for reference implementation - raise NotImplementedError("handle_combined_nat_rule is not implemented yet") - - def extract_nat_objects(nwobj_list: list[str], all_nwobjects: list[dict[str, str]]) -> list[dict[str, str]]: nat_obj_list: list[dict[str, str]] = [] for obj in nwobj_list: @@ -774,19 +1055,454 @@ def extract_nat_objects(nwobj_list: list[str], all_nwobjects: list[dict[str, str return nat_obj_list -def add_users_to_rule(rule_orig: dict[str, Any], rule: dict[str, Any]) -> None: - if "groups" in rule_orig: - add_users(rule_orig["groups"], rule) - if "users" in rule_orig: - add_users(rule_orig["users"], rule) +def is_nat_rule( + native_rule: dict[str, Any], + normalized_config_adom: dict[str, Any], + normalized_config_global: dict[str, Any], +) -> tuple[bool, bool]: + is_snat = any(key in native_rule and native_rule[key] == 1 for key in ["nat", "nat46", "nat64"]) + dst_addrs = native_rule.get("dstaddr", []) + native_rule.get("dstaddr6", []) -def add_users(users: list[str], rule: dict[str, Any]) -> None: - for user in users: - rule_src_with_users = [user + "@" + src for src in rule["rule_src"].split(LIST_DELIMITER)] + vip_objects: list[dict[str, Any]] = [] + network_objects = normalized_config_adom.get("network_objects", []) + normalized_config_global.get( + "network_objects", [] + ) - rule["rule_src"] = LIST_DELIMITER.join(rule_src_with_users) + for nw_obj in network_objects: + if "firewall/vip" in nw_obj.get("obj_native_type", ""): + vip_objects.extend([nw_obj]) - # here user ref is the user name itself - rule_src_refs_with_users = [user + "@" + src for src in rule["rule_src_refs"].split(LIST_DELIMITER)] - rule["rule_src_refs"] = LIST_DELIMITER.join(rule_src_refs_with_users) + for addr in dst_addrs: + if any(addr == vip_obj.get("obj_name") for vip_obj in vip_objects): + return is_snat, True + + return is_snat, False + + +def parse_nat_ip( + entries: list[str], + native_rule: dict[str, Any], + normalized_config_adom: dict[str, Any], +) -> tuple[list[str], list[str]]: + """ + Example entries: ["1.2.3.4", "255.255.255.255"] + + Creates a network object for + """ + if len(entries) != EXPECTED_NATIP_LIST_LENGTH: + FWOLogger.warning(f"Unexpected number of entries for NAT IP parsing: {len(entries)}. Expected 2.") + return [], [] + + parsed_ip = str(IPNetwork(f"{entries[0]}/{entries[1]}")) + uid = f"{native_rule.get('uuid', 'Translated_IP')}_Translated_IP" + normalized_config_adom["network_objects"].append( + create_network_object( + name=native_rule.get("name", "Translated_IP"), + obj_type="network", + ip=parsed_ip, + ip_end=parsed_ip, + uid=uid, + color="black", + comment="Translated IP network object created by FWO importer for NAT purposes", + zone="global", + ) + ) + + return [parsed_ip], [uid] + + +def prepare_translated_nat_fields( + rule_src_list: list[str], + rule_dst_list: list[str], + rule_svc_list: list[str], + translated_src_list: list[str], + translated_src_refs_list: list[str], + translated_dst_list: list[str], + translated_dst_refs_list: list[str], + translated_svc_list: list[str], + translated_svc_refs_list: list[str], + native_rule: dict[str, Any], + is_snat: bool, + is_dnat: bool, + normalized_config_adom: dict[str, Any], +) -> tuple[list[str], list[str], list[str], list[str], list[str], list[str]]: + translated_dst_list_local = list(translated_dst_list) + translated_dst_refs_list_local = list(translated_dst_refs_list) + translated_svc_list_local = list(translated_svc_list) + translated_svc_refs_list_local = list(translated_svc_refs_list) + + if set(translated_src_list) == set(rule_src_list): + translated_src_list = ["Original"] + translated_src_refs_list = ["Original"] + + if is_snat and native_rule.get("ippool") == 0: + translated_src_list = ["Outgoing Interface IP"] + translated_src_refs_list = ["Outgoing_Interface_IP"] + + if set(translated_dst_list_local) == set(rule_dst_list) and not is_dnat: + translated_dst_list_local = ["Original"] + translated_dst_refs_list_local = ["Original"] + + if set(translated_svc_list_local) == set(rule_svc_list): + translated_svc_list_local = ["Original"] + translated_svc_refs_list_local = ["Original"] + + if native_rule.get("rtp-nat") == 1: + translated_src_list, translated_src_refs_list = parse_nat_ip( + native_rule.get("natip", []), native_rule, normalized_config_adom + ) + + return ( + translated_src_list, + translated_src_refs_list, + translated_dst_list_local, + translated_dst_refs_list_local, + translated_svc_list_local, + translated_svc_refs_list_local, + ) + + +def parse_nat_rules_in_rulebase( + normalized_config_adom: dict[str, Any], + normalized_config_global: dict[str, Any], + rulebase_to_parse: dict[str, Any], + normalized_nat_rulebase: Rulebase, +): + """ + Extracts NAT rules from a rulebase and creates normalized NAT rules. + Creates two RuleNormalized objects per NAT rule (original + translated). + """ + rule_num = 0 + for native_rule in rulebase_to_parse.get("data", []): + # Check if this is a NAT rule + is_snat, is_dnat = is_nat_rule(native_rule, normalized_config_adom, normalized_config_global) + + if not is_snat and not is_dnat: + continue + + rule_disabled = True + if "status" in native_rule and (native_rule["status"] == 1 or native_rule["status"] == "enable"): + rule_disabled = False + + # Parse addresses for original rule + rule_src_list, rule_src_refs_list = rule_parse_addresses( + native_rule, "src", normalized_config_adom, normalized_config_global, is_nat=True + ) + rule_dst_list, rule_dst_refs_list = rule_parse_addresses( + native_rule, "dst", normalized_config_adom, normalized_config_global, is_nat=True + ) + translated_src_list, translated_src_refs_list = get_nat_translated_source( + native_rule, normalized_config_adom, normalized_config_global + ) + + rule_svc_list, rule_svc_refs_list = rule_parse_service(native_rule) + + rule_src_zones = find_zones_in_normalized_config( + native_rule.get("srcintf", []), normalized_config_adom, normalized_config_global + ) + rule_dst_zones = find_zones_in_normalized_config( + native_rule.get("dstintf", []), normalized_config_adom, normalized_config_global + ) + + # Extract NAT config fields + nat_config_fields = extract_nat_config_fields(native_rule) + + rule_uid = native_rule.get("uuid") + if not rule_uid: + FWOLogger.warning("NAT rule without UUID, skipping") + continue + + # Prepare translated fields: if a translated field equals the original, + # replace it with the standard placeholder object "Original". + ensure_original_objects(normalized_config_adom, normalized_config_global) + + translated_dst_list = list(rule_dst_list) + translated_dst_refs_list = list(rule_dst_refs_list) + translated_svc_list = list(rule_svc_list) + translated_svc_refs_list = list(rule_svc_refs_list) + + ( + translated_src_list, + translated_src_refs_list, + translated_dst_list_local, + translated_dst_refs_list_local, + translated_svc_list_local, + translated_svc_refs_list_local, + ) = prepare_translated_nat_fields( + rule_src_list, + rule_dst_list, + rule_svc_list, + translated_src_list, + translated_src_refs_list, + translated_dst_list, + translated_dst_refs_list, + translated_svc_list, + translated_svc_refs_list, + native_rule, + is_snat, + is_dnat, + normalized_config_adom, + ) + + # Create original rule (match phase) + rule_original_uid = f"{rule_uid}-original" + rule_translated_uid = f"{rule_uid}-translated" + + rule_original = RuleNormalized( + rule_num=rule_num, + rule_num_numeric=0, + rule_disabled=rule_disabled, + rule_src_neg=False, + rule_src=LIST_DELIMITER.join(rule_src_list), + rule_src_refs=LIST_DELIMITER.join(rule_src_refs_list), + rule_dst_neg=False, + rule_dst=LIST_DELIMITER.join(rule_dst_list), + rule_dst_refs=LIST_DELIMITER.join(rule_dst_refs_list), + rule_svc_neg=False, + rule_svc=LIST_DELIMITER.join(rule_svc_list), + rule_svc_refs=LIST_DELIMITER.join(rule_svc_refs_list), + rule_action=rule_parse_action(native_rule), + rule_track=rule_parse_tracking_info(native_rule), + rule_installon=rule_parse_installon(native_rule), + rule_time=rule_parse_time(native_rule), + rule_name=native_rule.get("name", ""), + rule_uid=rule_original_uid, + rule_custom_fields=None, + rule_implied=False, + rule_type=RuleType.NAT, + last_change_admin=None, + parent_rule_uid=None, + last_hit=rule_parse_last_hit(native_rule), + rule_comment=native_rule.get("comments"), + rule_src_zone=LIST_DELIMITER.join(rule_src_zones), + rule_dst_zone=LIST_DELIMITER.join(rule_dst_zones), + rule_head_text=None, + access_rule=False, + nat_rule=True, + xlate_rule_uid=rule_translated_uid, + ) + + # Create translated rule (translation phase) + # Keep the original destination and service; translate the source to the NAT pool. + rule_translated = RuleNormalized( + rule_num=rule_num, + rule_num_numeric=0, + rule_disabled=rule_disabled, + rule_src_neg=False, + rule_src=LIST_DELIMITER.join(translated_src_list), + rule_src_refs=LIST_DELIMITER.join(translated_src_refs_list), + rule_dst_neg=False, + rule_dst=LIST_DELIMITER.join(translated_dst_list_local), + rule_dst_refs=LIST_DELIMITER.join(translated_dst_refs_list_local), + rule_svc_neg=False, + rule_svc=LIST_DELIMITER.join(translated_svc_list_local), + rule_svc_refs=LIST_DELIMITER.join(translated_svc_refs_list_local), + rule_action=rule_parse_action(native_rule), + rule_track=rule_parse_tracking_info(native_rule), + rule_installon=rule_parse_installon(native_rule), + rule_time=rule_parse_time(native_rule), + rule_name=native_rule.get("name", ""), + rule_uid=rule_translated_uid, + rule_custom_fields=nat_config_fields, + rule_implied=False, + rule_type=RuleType.NAT, + last_change_admin=None, + parent_rule_uid=None, + last_hit=rule_parse_last_hit(native_rule), + rule_comment=native_rule.get("comments"), + rule_src_zone=LIST_DELIMITER.join(rule_src_zones), + rule_dst_zone=LIST_DELIMITER.join(rule_dst_zones), + rule_head_text=None, + access_rule=False, + nat_rule=True, + xlate_rule_uid=None, + ) + + # Add both rules to the NAT rulebase + if rule_original.rule_uid: + normalized_nat_rulebase.rules[rule_original.rule_uid] = rule_original + if rule_translated.rule_uid: + normalized_nat_rulebase.rules[rule_translated.rule_uid] = rule_translated + + rule_num += 1 + + +def _as_list(val: Any) -> list[Any] | None: # pyright: ignore[reportUnknownParameterType] + if isinstance(val, list) and val: + return val # pyright: ignore[reportUnknownVariableType] + if isinstance(val, str) and val: + return [val] + return None + + +def extract_nat_config_fields(native_rule: dict[str, Any]) -> str: + """ + Extracts NAT-specific configuration fields from a native rule. + Returns a JSON string with NAT translation metadata. + """ + nat_config: dict[str, Any] = {} + + if native_rule.get("ippool") == 1: + nat_config["ippool"] = 1 + poolname6 = _as_list(native_rule.get("poolname6")) + if poolname6 is not None: + nat_config["poolname6"] = poolname6 + + poolname = _as_list(native_rule.get("poolname")) + if poolname is not None: + nat_config["poolname"] = poolname + + if "fixedport" in native_rule: + nat_config["fixedport"] = native_rule.get("fixedport") + + for key, name in (("nat", "nat"), ("nat46", "nat46"), ("nat64", "nat64")): + if native_rule.get(key) == 1: + nat_config["nat_type"] = name + break + + return json.dumps(nat_config, sort_keys=True) if nat_config else "{}" + + +def get_nat_translated_source( + native_rule: dict[str, Any], + normalized_config_adom: dict[str, Any], + normalized_config_global: dict[str, Any], +) -> tuple[list[str], list[str]]: + if native_rule.get("ippool") == 1: + is_ipv6 = "poolname6" in native_rule and native_rule.get("poolname6") not in (None, [], "") + poolname = native_rule.get("poolname6" if is_ipv6 else "poolname", []) + if isinstance(poolname, str): + poolname = [poolname] + translated_src_list = sorted(poolname) + translated_src_refs_list = [ + find_addr_ref( + pool, + is_v4=not is_ipv6, + normalized_config_adom=normalized_config_adom, + normalized_config_global=normalized_config_global, + ) + for pool in translated_src_list + ] + return translated_src_list, translated_src_refs_list + + rule_src_list, rule_src_refs_list = rule_parse_addresses( + native_rule, "src", normalized_config_adom, normalized_config_global, is_nat=True + ) + return rule_src_list, rule_src_refs_list + + +def new_process_nat_rules_for_rulebase( + gateway: dict[str, Any], + normalized_config_adom: dict[str, Any], + normalized_config_global: dict[str, Any], + rulebase_to_parse: dict[str, Any], + normalized_rulebase: Rulebase, +) -> None: + has_nat_rules = any( + any(key in native_rule and native_rule[key] == 1 for key in ["nat", "nat46", "nat64"]) + for native_rule in rulebase_to_parse.get("data", []) + ) + + if not has_nat_rules: + return + + normalized_nat_rulebase = insert_parent_nat_rulebase( + normalized_config_adom, + normalized_config_global, + normalized_rulebase.uid, + normalized_rulebase.mgm_uid, + ) + + insert_nat_rulebase_link( + from_rulebase_uid=normalized_rulebase.uid, + to_rulebase_uid=normalized_nat_rulebase.uid, + gateway=gateway, + ) + + parse_nat_rules_in_rulebase( + normalized_config_adom, + normalized_config_global, + rulebase_to_parse, + normalized_nat_rulebase, + ) + + +def normalize_nat_rulebase( + rulebase_link: dict[str, Any], + native_config: dict[str, Any], + normalized_config_adom: dict[str, Any], + normalized_config_global: dict[str, Any], +): + normalized_config_adom.setdefault("nat_policies", []) + link_type = rulebase_link.get("link_type", rulebase_link.get("type", "ordered")) + if link_type == "nat": + return + + if not rulebase_link["is_section"]: + for nat_type in nat_types: + nat_type_string = nat_type + "_" + rulebase_link["to_rulebase_uid"] + nat_rulebase = get_native_nat_rulebase(native_config, nat_type_string) + parse_nat_rulebase( + nat_rulebase, + nat_type_string, + normalized_config_adom, + normalized_config_global, + ) + + normalized_config_adom["nat_policies"].extend(nat_rulebase) # pyright: ignore[reportUnknownMemberType] + + +def get_native_nat_rulebase(native_config: dict[str, Any], nat_type_string: str) -> list[dict[str, Any]]: + for nat_rulebase in native_config["nat_rulebases"]: + if nat_type_string == nat_rulebase["type"]: + return nat_rulebase["data"] + FWOLogger.warning("no nat data for " + nat_type_string) + return [] + + +def insert_parent_nat_rulebase( + normalized_config_adom: dict[str, Any], + _normalized_config_global: dict[str, Any], + rulebase_uid: str, + mgm_uid: str, +) -> Rulebase: + # Creates a NAT rulebase for the given access rulebase. + nat_rulebase_uid = "nat-rulebase-" + rulebase_uid + normalized_nat_rulebase = Rulebase( + uid=nat_rulebase_uid, + mgm_uid=mgm_uid, + name="NAT", + rules={}, + ) + + # Add to adom policies (avoid duplicates) + if not any(rb for rb in normalized_config_adom["policies"] if rb.uid == normalized_nat_rulebase.uid): + normalized_config_adom["policies"].append(normalized_nat_rulebase) + + return normalized_nat_rulebase + + +def insert_nat_rulebase_link( + from_rulebase_uid: str, + to_rulebase_uid: str, + gateway: dict[str, Any], +) -> None: + # Creates a RulebaseLink with link_type='nat' connecting access rulebase to NAT rulebase. + if not any( + link + for link in gateway["rulebase_links"] + if link.get("to_rulebase_uid") == to_rulebase_uid + and link.get("link_type") == "nat" + and link.get("from_rulebase_uid") == from_rulebase_uid + ): + gateway["rulebase_links"].append( + { + "from_rulebase_uid": from_rulebase_uid, + "to_rulebase_uid": to_rulebase_uid, + "type": "nat", + "is_initial": False, + "is_global": False, + "is_section": False, + } + ) diff --git a/roles/importer/files/importer/fwo_log.py b/roles/importer/files/importer/fwo_log.py index c27aa33df6..190c3165a9 100644 --- a/roles/importer/files/importer/fwo_log.py +++ b/roles/importer/files/importer/fwo_log.py @@ -1,14 +1,21 @@ from __future__ import annotations import logging +import os +import sys import threading import time from contextlib import contextmanager from typing import TYPE_CHECKING, Any, Literal, Protocol, TextIO, TypeAlias, cast -try: - import fcntl as fcntl_module -except ImportError: +# Returns True if the OS is Linux, macOS (Darwin), or BSD-based systems +is_unix = sys.platform in ("linux", "darwin", "freebsd") +if is_unix: + try: + import fcntl as fcntl_module + except ImportError: + fcntl_module = None +else: fcntl_module = None diff --git a/roles/importer/files/importer/model_controllers/check_consistency.py b/roles/importer/files/importer/model_controllers/check_consistency.py index 1329f6c09f..28a40a1d06 100644 --- a/roles/importer/files/importer/model_controllers/check_consistency.py +++ b/roles/importer/files/importer/model_controllers/check_consistency.py @@ -528,7 +528,7 @@ def _check_rule_consistency(self, rulebases: list[Rulebase]) -> tuple[int, list[ if rule.rule_uid is None: rules_missing_uid += 1 continue - if rule.rule_uid in seen_rule_uids: + if rule.rule_uid in seen_rule_uids and rule.rule_type != "nat": duplicate_rule_uids.append(rule.rule_uid) seen_rule_uids.add(rule.rule_uid) if rule.rule_src == "" or rule.rule_dst == "": diff --git a/roles/importer/files/importer/model_controllers/fwconfig_import_rule.py b/roles/importer/files/importer/model_controllers/fwconfig_import_rule.py index 4ac6131df8..7d1268218e 100644 --- a/roles/importer/files/importer/model_controllers/fwconfig_import_rule.py +++ b/roles/importer/files/importer/model_controllers/fwconfig_import_rule.py @@ -116,10 +116,23 @@ def update_rulebase_diffs(self, prev_config: FwConfigNormalized) -> None: { rule_uid: (curr_rules[rule_uid], curr_rule_to_rulebase[rule_uid]) for rule_uid in (added_rule_uids | changed_rule_uids) + if curr_rules[rule_uid].xlate_rule_uid is None } ) - self.uid2id_mapper.add_rule_mappings(inserted_rule_ids) + + # add new NAT rules separately after all non-NAT rules have been added, to ensure that all xlate rules are already in the database + # and can be referenced by their new numeric id in the xlate_rule field of the NAT rules + num_inserted_nat_rules, inserted_nat_rule_ids = self.add_new_rules( + { + rule_uid: (curr_rules[rule_uid], curr_rule_to_rulebase[rule_uid]) + for rule_uid in (added_rule_uids | changed_rule_uids) + if curr_rules[rule_uid].xlate_rule_uid is not None + } + ) + num_inserted_rules += num_inserted_nat_rules + self.uid2id_mapper.add_rule_mappings(inserted_nat_rule_ids) + refs_added = self.add_new_refs(prev_config) num_set_removed_rules, _removed_rule_ids = self.mark_rules_removed(list(removed_rule_uids | changed_rule_uids)) @@ -1107,6 +1120,7 @@ def update_rule_enforced_on_gateway(self, changed_rule_uids: set[str]) -> tuple[ def prepare_rule_for_import(self, rule: RuleNormalized, rulebase_uid: str) -> Rule: rulebase_id = self.uid2id_mapper.get_rulebase_id(rulebase_uid) + xlate_rule_id = self.uid2id_mapper.get_rule_id(rule.xlate_rule_uid) if rule.xlate_rule_uid else None return Rule( mgm_id=self.import_details.state.mgm_details.current_mgm_id, rule_num=rule.rule_num, @@ -1130,8 +1144,8 @@ def prepare_rule_for_import(self, rule: RuleNormalized, rulebase_uid: str) -> Ru rule_comment=rule.rule_comment, rule_src_zone=rule.rule_src_zone, rule_dst_zone=rule.rule_dst_zone, - access_rule=True, - nat_rule=False, + access_rule=rule.access_rule, + nat_rule=rule.nat_rule, is_global=False, rulebase_id=rulebase_id, rule_create=self.import_details.state.import_id, @@ -1141,6 +1155,7 @@ def prepare_rule_for_import(self, rule: RuleNormalized, rulebase_uid: str) -> Ru rule_head_text=rule.rule_head_text, rule_installon=rule.rule_installon, last_change_admin=None, # TODO: get id from rule.last_change_admin + xlate_rule=xlate_rule_id, ) def write_changelog_rules( diff --git a/roles/importer/files/importer/models/rule.py b/roles/importer/files/importer/models/rule.py index 29e6d112b2..163c12b1c4 100644 --- a/roles/importer/files/importer/models/rule.py +++ b/roles/importer/files/importer/models/rule.py @@ -65,6 +65,9 @@ class RuleNormalized(BaseModel): # noqa: PLW1641 rule_src_zone: str | None = None rule_dst_zone: str | None = None rule_head_text: str | None = None + xlate_rule_uid: str | None = None + nat_rule: bool = False + access_rule: bool = True @field_validator("last_hit") @classmethod diff --git a/roles/importer/files/importer/test/test_checkpoint_file_import.py b/roles/importer/files/importer/test/test_checkpoint_file_import.py index 23f773ddc8..b85b82b5b2 100644 --- a/roles/importer/files/importer/test/test_checkpoint_file_import.py +++ b/roles/importer/files/importer/test/test_checkpoint_file_import.py @@ -35,3 +35,18 @@ def test_checkpoint_native_file_import_skips_login(self, import_state_controller _, result = fwcommon.get_config(config_in, import_state) assert result is config_in + + def test_get_ordered_layer_uids_adds_access_layers_once_for_multiple_matching_targets(self): + policy = { + "uid": "policy-uid", + "targets": [{"uid": "device-uid"}, {"uid": "all"}], + "access-layers": [ + {"uid": "layer-1", "domain": "domain-a"}, + {"uid": "layer-2", "domain": "domain-a"}, + ], + } + device_config = {"uid": "device-uid"} + + ordered_layer_uids = fwcommon.get_ordered_layer_uids(policy, device_config, "domain-a") + + assert ordered_layer_uids == ["policy-uid", "layer-1", "layer-2"] diff --git a/roles/importer/files/importer/test/test_cp_nat.py b/roles/importer/files/importer/test/test_cp_nat.py new file mode 100644 index 0000000000..31ce5c2ab5 --- /dev/null +++ b/roles/importer/files/importer/test/test_cp_nat.py @@ -0,0 +1,199 @@ +from typing import Any + +from fw_modules.checkpointR8x.cp_nat import ( + get_initial_nat_rulebase_link, + insert_parent_nat_rulebase, + insert_rulebase_link, + parse_nat_rule_transform, +) +from model_controllers.import_state_controller import ImportStateController +from models.rulebase import Rulebase + + +def _make_nat_rule(uid: str = "rule-uid-1") -> dict[str, Any]: + return { + "uid": uid, + "original-source": {"uid": "src-uid", "type": "host", "name": "OrigSrc"}, + "original-destination": {"uid": "dst-uid", "type": "host", "name": "OrigDst"}, + "original-service": {"uid": "svc-uid", "type": "simple", "name": "OrigSvc"}, + "translated-source": {"uid": "t-src-uid", "type": "host", "name": "TransSrc"}, + "translated-destination": {"uid": "t-dst-uid", "type": "host", "name": "TransDst"}, + "translated-service": {"uid": "t-svc-uid", "type": "simple", "name": "TransSvc"}, + "install-on": [{"uid": "gw-uid", "name": "gw"}], + "time": {"uid": "time-uid", "name": "Any"}, + "enabled": True, + "comments": "a test rule", + } + + +class TestParseNatRuleTransform: + def test_returns_tuple_of_two(self): + result = parse_nat_rule_transform(_make_nat_rule()) + assert len(result) == 2 + + def test_in_rule_maps_original_fields(self): + nat_rule = _make_nat_rule("r1") + in_rule, _ = parse_nat_rule_transform(nat_rule) + + assert in_rule["uid"] == "r1" + assert in_rule["source"] == [nat_rule["original-source"]] + assert in_rule["destination"] == [nat_rule["original-destination"]] + assert in_rule["service"] == [nat_rule["original-service"]] + assert in_rule["type"] == "nat" + assert in_rule["nat_rule"] is True + assert in_rule["access_rule"] is False + + def test_out_rule_maps_translated_fields(self): + nat_rule = _make_nat_rule("r2") + _, out_rule = parse_nat_rule_transform(nat_rule) + + assert out_rule["uid"] == "r2_translated" + assert out_rule["source"] == [nat_rule["translated-source"]] + assert out_rule["destination"] == [nat_rule["translated-destination"]] + assert out_rule["service"] == [nat_rule["translated-service"]] + assert out_rule["nat_rule"] is True + assert out_rule["access_rule"] is False + + def test_xlate_rule_uid_links_in_and_out(self): + in_rule, out_rule = parse_nat_rule_transform(_make_nat_rule("r3")) + assert in_rule["xlate_rule_uid"] == out_rule["uid"] + + def test_enabled_and_comments_propagated_to_in_rule(self): + nat_rule = _make_nat_rule() + nat_rule["enabled"] = False + nat_rule["comments"] = "disabled rule" + in_rule, _ = parse_nat_rule_transform(nat_rule) + + assert in_rule["enabled"] is False + assert in_rule["comments"] == "disabled rule" + + def test_out_rule_always_enabled(self): + nat_rule = _make_nat_rule() + nat_rule["enabled"] = False + _, out_rule = parse_nat_rule_transform(nat_rule) + + assert out_rule["enabled"] is True + + def test_in_rule_rule_number_is_zero(self): + in_rule, out_rule = parse_nat_rule_transform(_make_nat_rule()) + assert in_rule["rule-number"] == 0 + assert out_rule["rule-number"] == 0 + + def test_missing_time_field_defaults_to_empty_string(self): + nat_rule = _make_nat_rule() + del nat_rule["time"] + in_rule, _ = parse_nat_rule_transform(nat_rule) + + assert in_rule["time"] == "time" + + +class TestInsertRulebaseLink: + def _make_gateway(self) -> dict[str, Any]: + return {"RulebaseLinks": []} + + def test_adds_new_link(self): + gateway = self._make_gateway() + insert_rulebase_link("from-rb", "to-rb", "nat", gateway) + + assert len(gateway["RulebaseLinks"]) == 1 + link = gateway["RulebaseLinks"][0] + assert link["from_rulebase_uid"] == "from-rb" + assert link["to_rulebase_uid"] == "to-rb" + assert link["link_type"] == "nat" + assert link["is_initial"] is False + assert link["is_global"] is False + assert link["is_section"] is False + + def test_does_not_add_duplicate_link(self): + gateway = self._make_gateway() + insert_rulebase_link("from-rb", "to-rb", "nat", gateway) + insert_rulebase_link("from-rb", "to-rb", "nat", gateway) + + assert len(gateway["RulebaseLinks"]) == 1 + + def test_adds_different_link_type_separately(self): + gateway = self._make_gateway() + insert_rulebase_link("from-rb", "to-rb", "nat", gateway) + insert_rulebase_link("from-rb", "to-rb", "ordered", gateway) + + assert len(gateway["RulebaseLinks"]) == 2 + + def test_adds_different_from_rulebase_separately(self): + gateway = self._make_gateway() + insert_rulebase_link("from-rb-1", "to-rb", "nat", gateway) + insert_rulebase_link("from-rb-2", "to-rb", "nat", gateway) + + assert len(gateway["RulebaseLinks"]) == 2 + + +class TestInsertParentNatRulebase: + def test_creates_nat_rulebase_when_missing(self, import_state_controller: ImportStateController): + normalized_config: dict[str, Any] = {"policies": []} + gateway = {"uid": "gw-1"} + + result = insert_parent_nat_rulebase(gateway, import_state_controller.state, normalized_config) + + assert result.uid == "nat-rulebase-gw-1" + assert result.name == "NAT" + assert len(normalized_config["policies"]) == 1 + + def test_returns_existing_nat_rulebase_without_duplicate(self, import_state_controller: ImportStateController): + existing = Rulebase(uid="nat-rulebase-gw-2", name="NAT", mgm_uid="mgm-uid-1") + normalized_config = {"policies": [existing]} + gateway = {"uid": "gw-2"} + + result = insert_parent_nat_rulebase(gateway, import_state_controller.state, normalized_config) + + assert result is existing + assert len(normalized_config["policies"]) == 1 + + +class TestGetInitialNatRulebaseLink: + def _make_normalized_config_with_gateway( + self, gateway_uid: str, rulebase_links: list[dict[str, Any]] + ) -> dict[str, Any]: + return { + "gateways": [ + { + "Uid": gateway_uid, + "RulebaseLinks": rulebase_links, + } + ] + } + + def test_returns_initial_ordered_link(self): + gateway = {"uid": "gw-1"} + normalized_config = self._make_normalized_config_with_gateway( + "gw-1", + [ + {"is_initial": True, "link_type": "ordered", "to_rulebase_uid": "rb-access"}, + {"is_initial": False, "link_type": "nat", "to_rulebase_uid": "rb-nat"}, + ], + ) + + result = get_initial_nat_rulebase_link(gateway, normalized_config) + + assert result is not None + assert result["to_rulebase_uid"] == "rb-access" + + def test_returns_none_when_gateway_not_found(self): + gateway = {"uid": "unknown-gw"} + normalized_config = self._make_normalized_config_with_gateway("gw-1", []) + + result = get_initial_nat_rulebase_link(gateway, normalized_config) + + assert result is None + + def test_returns_none_when_no_initial_ordered_link(self): + gateway = {"uid": "gw-1"} + normalized_config = self._make_normalized_config_with_gateway( + "gw-1", + [ + {"is_initial": False, "link_type": "ordered", "to_rulebase_uid": "rb-1"}, + {"is_initial": True, "link_type": "nat", "to_rulebase_uid": "rb-nat"}, + ], + ) + + result = get_initial_nat_rulebase_link(gateway, normalized_config) + + assert result is None diff --git a/roles/importer/files/importer/test/test_fortiadom5ff.py b/roles/importer/files/importer/test/test_fortiadom5ff.py index 477714517c..1e7dbfcb7e 100644 --- a/roles/importer/files/importer/test/test_fortiadom5ff.py +++ b/roles/importer/files/importer/test/test_fortiadom5ff.py @@ -1,9 +1,16 @@ +import json from datetime import datetime, timezone +from typing import Any import pytest -from fw_modules.fortiadom5ff.fmgr_rule import rule_parse_last_hit +from fw_modules.fortiadom5ff.fmgr_rule import ( + extract_nat_config_fields, + parse_nat_rules_in_rulebase, + rule_parse_last_hit, +) from fw_modules.fortiadom5ff.fwcommon import to_time_object from fwo_exceptions import ImportInterruptionError +from models.rulebase import Rulebase from models.time_object import TimeObject from pytest_mock import MockerFixture @@ -152,3 +159,146 @@ def test_rule_parse_last_hit_returns_offset_aware_iso_timestamp(): parsed_time = datetime.fromisoformat(parsed) assert parsed_time.tzinfo is not None assert int(parsed_time.timestamp()) == epoch_seconds + + +def test_extract_nat_config_fields_serializes_poolname_and_fixedport(): + nat_config_fields = extract_nat_config_fields( + { + "nat": 1, + "ippool": 1, + "poolname": ["pool-a", "pool-b"], + "fixedport": 1, + } + ) + + assert json.loads(nat_config_fields) == { + "fixedport": 1, + "ippool": 1, + "nat_type": "nat", + "poolname": ["pool-a", "pool-b"], + } + + +def test_parse_nat_rules_in_rulebase_keeps_translation_metadata_on_translated_rule(): + normalized_config_adom = { + "network_objects": [ + {"obj_name": "src-net", "obj_uid": "src-net-uid", "obj_ip": "10.0.0.0/24"}, + {"obj_name": "dst-net", "obj_uid": "dst-net-uid", "obj_ip": "10.0.1.0/24"}, + {"obj_name": "pool-a", "obj_uid": "pool-a-uid", "obj_ip": "10.0.2.1/32"}, + ], + "zone_objects": [{"zone_name": "inside"}, {"zone_name": "outside"}], + "policies": [], + "rules": [], + } + normalized_config_global: dict[str, list[Any]] = { + "network_objects": [], + "zone_objects": [], + "policies": [], + "rules": [], + } + native_rulebase = { + "data": [ + { + "uuid": "nat-rule-uid", + "name": "nat-rule", + "nat": 1, + "status": 1, + "srcaddr": ["src-net"], + "dstaddr": ["dst-net"], + "service": ["ALL"], + "srcintf": ["inside"], + "dstintf": ["outside"], + "ippool": 1, + "poolname": ["pool-a"], + "fixedport": 1, + } + ] + } + normalized_nat_rulebase = Rulebase(uid="nat-rulebase-test", name="NAT", mgm_uid="mgm", rules={}) + + parse_nat_rules_in_rulebase( + normalized_config_adom, + normalized_config_global, + native_rulebase, + normalized_nat_rulebase, + ) + + assert set(normalized_nat_rulebase.rules) == {"nat-rule-uid-original", "nat-rule-uid-translated"} + + original_rule = normalized_nat_rulebase.rules["nat-rule-uid-original"] + assert original_rule.rule_custom_fields is None + + translated_rule = normalized_nat_rulebase.rules["nat-rule-uid-translated"] + assert translated_rule.rule_src == "pool-a" + assert translated_rule.rule_src_refs == "pool-a-uid" + assert translated_rule.rule_dst == "Original" + assert translated_rule.rule_dst_refs == "Original" + assert translated_rule.rule_src_zone == "inside" + assert translated_rule.rule_dst_zone == "outside" + assert json.loads(translated_rule.rule_custom_fields or "{}") == { + "fixedport": 1, + "ippool": 1, + "nat_type": "nat", + "poolname": ["pool-a"], + } + + +def test_parse_nat_rules_in_rulebase_supports_ipv6_nat_pool_translation(): + normalized_config_adom = { + "network_objects": [ + {"obj_name": "src-net-v6", "obj_uid": "src-net-v6-uid", "obj_ip": "2001:db8::/64"}, + {"obj_name": "dst-net-v6", "obj_uid": "dst-net-v6-uid", "obj_ip": "2001:db8:1::/64"}, + {"obj_name": "pool-v6", "obj_uid": "pool-v6-uid", "obj_ip": "2001:db8:2::1/128"}, + ], + "zone_objects": [{"zone_name": "inside"}, {"zone_name": "outside"}], + "policies": [], + "rules": [], + } + normalized_config_global: dict[str, list[Any]] = { + "network_objects": [], + "zone_objects": [], + "policies": [], + "rules": [], + } + native_rulebase = { + "data": [ + { + "uuid": "nat-rule-v6-uid", + "name": "nat-rule-v6", + "nat": 1, + "status": 1, + "srcaddr6": ["src-net-v6"], + "dstaddr6": ["dst-net-v6"], + "service": ["ALL"], + "srcintf": ["inside"], + "dstintf": ["outside"], + "ippool": 1, + "poolname6": ["pool-v6"], + "fixedport": 1, + } + ] + } + normalized_nat_rulebase = Rulebase(uid="nat-rulebase-v6-test", name="NAT", mgm_uid="mgm", rules={}) + + parse_nat_rules_in_rulebase( + normalized_config_adom, + normalized_config_global, + native_rulebase, + normalized_nat_rulebase, + ) + + assert set(normalized_nat_rulebase.rules) == {"nat-rule-v6-uid-original", "nat-rule-v6-uid-translated"} + + translated_rule = normalized_nat_rulebase.rules["nat-rule-v6-uid-translated"] + assert translated_rule.rule_src == "pool-v6" + assert translated_rule.rule_src_refs == "pool-v6-uid" + assert translated_rule.rule_dst == "Original" + assert translated_rule.rule_dst_refs == "Original" + assert translated_rule.rule_src_zone == "inside" + assert translated_rule.rule_dst_zone == "outside" + assert json.loads(translated_rule.rule_custom_fields or "{}") == { + "fixedport": 1, + "ippool": 1, + "nat_type": "nat", + "poolname6": ["pool-v6"], + } diff --git a/roles/lib/files/FWO.Data/NormalizedRule.cs b/roles/lib/files/FWO.Data/NormalizedRule.cs index d36ff01c14..67021c081c 100644 --- a/roles/lib/files/FWO.Data/NormalizedRule.cs +++ b/roles/lib/files/FWO.Data/NormalizedRule.cs @@ -89,6 +89,15 @@ public class NormalizedRule [JsonProperty("rule_head_text"), JsonPropertyName("rule_head_text")] public string? RuleHeadText { get; set; } + [JsonProperty("xlate_rule_uid"), JsonPropertyName("xlate_rule_uid")] + public string? XlateRule { get; set; } + + [JsonProperty("nat_rule"), JsonPropertyName("nat_rule")] + public bool NatRule { get; set; } + + [JsonProperty("access_rule"), JsonPropertyName("access_rule")] + public bool AccessRule { get; set; } = true; + /// /// Creates a NormalizedRule from a Rule. /// @@ -127,7 +136,10 @@ public static NormalizedRule FromRule(Rule rule) RuleComment = rule.Comment, RuleSrcZone = rule.SourceZone, RuleDstZone = rule.DestinationZone, - RuleHeadText = rule.SectionHeader + RuleHeadText = rule.SectionHeader, + XlateRule = rule.TranslatedRule?.Uid ?? rule.XlateRule, + NatRule = rule.NatRule, + AccessRule = rule.AccessRule }; } } diff --git a/roles/lib/files/FWO.Data/Rule.cs b/roles/lib/files/FWO.Data/Rule.cs index 5a1a815999..2383682ca4 100644 --- a/roles/lib/files/FWO.Data/Rule.cs +++ b/roles/lib/files/FWO.Data/Rule.cs @@ -121,6 +121,9 @@ public class Rule [JsonProperty("nat_rule"), JsonPropertyName("nat_rule")] public bool NatRule { get; set; } + [JsonProperty("access_rule"), JsonPropertyName("access_rule")] + public bool AccessRule { get; set; } = true; + [JsonProperty("rulebase_id"), JsonPropertyName("rulebase_id")] public int RulebaseId { get; set; } @@ -151,9 +154,15 @@ public class Rule [JsonProperty("rule"), JsonPropertyName("rule")] public Rule? ParentRule { get; set; } + [JsonProperty("ruleByXlateRule"), JsonPropertyName("ruleByXlateRule")] + public Rule? TranslatedRule { get; set; } + [JsonProperty("rule_owners"), JsonPropertyName("rule_owners")] public RuleOwner?[] RuleOwner { get; set; } = []; + [JsonProperty("xlate_rule"), JsonPropertyName("xlate_rule")] + public string? XlateRule { get; set; } + [JsonProperty("flow_access_id"), JsonPropertyName("flow_access_id")] public long? FlowAccessId { get; set; } @@ -222,6 +231,7 @@ public Rule(Rule rule) CustomFields = rule.CustomFields; Implied = rule.Implied; NatRule = rule.NatRule; + AccessRule = rule.AccessRule; RulebaseId = rule.RulebaseId; RuleOrderNumber = rule.RuleOrderNumber; EnforcingGateways = rule.EnforcingGateways; @@ -232,6 +242,7 @@ public Rule(Rule rule) Rulebase = rule.Rulebase; LastChangeAdmin = rule.LastChangeAdmin; ParentRule = rule.ParentRule; + TranslatedRule = rule.TranslatedRule; DisplayOrderNumberString = rule.DisplayOrderNumberString; DisplayOrderNumber = rule.DisplayOrderNumber; Certified = rule.Certified; @@ -248,6 +259,7 @@ public Rule(Rule rule) Detailed = rule.Detailed; UnusedSpecialUserObjects = rule.UnusedSpecialUserObjects; UnusedUpdatableObjects = rule.UnusedUpdatableObjects; + XlateRule = rule.XlateRule; } public bool IsDropRule() diff --git a/roles/lib/files/FWO.Report.Filter/DynGraphqlQuery.cs b/roles/lib/files/FWO.Report.Filter/DynGraphqlQuery.cs index 531bdc3a8a..ecaac2d014 100644 --- a/roles/lib/files/FWO.Report.Filter/DynGraphqlQuery.cs +++ b/roles/lib/files/FWO.Report.Filter/DynGraphqlQuery.cs @@ -17,6 +17,7 @@ public class DynGraphqlQuery(string rawInput) public Dictionary QueryVariables { get; set; } = []; public string FullQuery { get; set; } = ""; public string RulebaseLinkWhereStatement { get; set; } = ""; + public string NatRulebaseLinkWhereStatement { get; set; } = ""; public string RuleWhereStatement { get; set; } = ""; public string NwObjWhereStatement { get; set; } = ""; public string SvcObjWhereStatement { get; set; } = ""; @@ -397,24 +398,45 @@ query natRulesReport ({paramString}) management({mgmtWhereString}) {{ id: mgm_id + uid: mgm_uid name: mgm_name devices ({devWhereStringDefault}) {{ id: dev_id name: dev_name - rulebase_links(where: {{ {query.RulebaseLinkWhereStatement} }}) + uid: dev_uid + rulebase_links(where: {{ {query.NatRulebaseLinkWhereStatement} }}) {{ - {query.OpenRulesTable} - {limitOffsetString} - where: {{ nat_rule: {{_eq: true}}, ruleByXlateRule: {{}} {query.RuleWhereStatement} }} - order_by: {{ rule_num_numeric: asc }} ) - {{ - mgm_id: mgm_id - ...{(filter.Detailed ? "natRuleDetails" : "natRuleOverview")} - }} + linkType: stm_link_type {{ + name + id + }} + link_type + is_initial + is_global + is_section + gw_id + from_rule_id + from_rulebase_id + to_rulebase_id + created + removed }} }} - }} + rulebases {{ + name + uid + id + {query.OpenRulesTable} + {limitOffsetString} + where: {{ nat_rule: {{_eq: true}}, ruleByXlateRule: {{}} {query.RuleWhereStatement} }} + order_by: {{ rule_num_numeric: asc }} ) + {{ + mgm_id: mgm_id + ...{(filter.Detailed ? "natRuleDetails" : "natRuleOverview")} + }} + }} + }} }}"; } @@ -800,9 +822,14 @@ private static void SetTimeFilter(ref DynGraphqlQuery query, TimeFilter? timeFil case ReportType.RecertEventReport: query.QueryParameters.Add("$import_id_start: bigint "); query.QueryParameters.Add("$import_id_end: bigint "); + string removedStatement = $"_or: [{{removed: {{_gt: $import_id_start}} }}, {{removed: {{_is_null: true}} }}]"; query.RulebaseLinkWhereStatement += $"created: {{_lte: $import_id_end }}" + - $"_or: [{{removed: {{_gt: $import_id_start}} }}, {{removed: {{_is_null: true}} }}]"; + removedStatement + + $" stm_link_type: {{id: {{_neq: 6}}}}"; // Filter out NAT rulebase links + query.NatRulebaseLinkWhereStatement += + $"created: {{_lte: $import_id_end }}" + + $"_or: [{{is_initial: {{_eq: true}}, {removedStatement}}}, {{link_type: {{_eq: 6}}, {removedStatement}}}]"; query.RuleWhereStatement += $"rule_create: {{_lte: $import_id_end}}" + $"_or: [{{removed: {{_gt: $import_id_start}} }}, {{removed: {{_is_null: true}} }}]"; diff --git a/roles/lib/files/FWO.Report/ReportBase.cs b/roles/lib/files/FWO.Report/ReportBase.cs index ba6fb32f87..c85b63b027 100644 --- a/roles/lib/files/FWO.Report/ReportBase.cs +++ b/roles/lib/files/FWO.Report/ReportBase.cs @@ -163,7 +163,7 @@ public static ReportBase ConstructReport(ReportTemplate reportFilter, UserConfig ReportType.Changes => new ReportChanges(query, userConfig, repType, reportFilter.ReportParams.TimeFilter, reportFilter.ReportParams.IncludeObjects), ReportType.ResolvedChanges => new ReportChanges(query, userConfig, repType, reportFilter.ReportParams.TimeFilter, reportFilter.ReportParams.IncludeObjects), ReportType.ResolvedChangesTech => new ReportChanges(query, userConfig, repType, reportFilter.ReportParams.TimeFilter, reportFilter.ReportParams.IncludeObjects), - ReportType.NatRules => new ReportNatRules(query, userConfig, repType), + ReportType.NatRules => new ReportNatRules(query, userConfig, repType, ruleTreeBuilder), ReportType.Recertification => new ReportRules(query, userConfig, repType, ruleTreeBuilder), ReportType.UnusedRules => new ReportRules(query, userConfig, repType, ruleTreeBuilder), ReportType.Connections => new ReportConnections(query, userConfig, repType), diff --git a/roles/lib/files/FWO.Report/ReportNatRules.cs b/roles/lib/files/FWO.Report/ReportNatRules.cs index 123c3b361b..11cac4395c 100644 --- a/roles/lib/files/FWO.Report/ReportNatRules.cs +++ b/roles/lib/files/FWO.Report/ReportNatRules.cs @@ -2,6 +2,7 @@ using FWO.Config.Api; using FWO.Data; using FWO.Report.Filter; +using FWO.Services.RuleTreeBuilder; using FWO.Ui.Display; using System.Text; @@ -9,7 +10,7 @@ namespace FWO.Report { public class ReportNatRules : ReportRules { - public ReportNatRules(DynGraphqlQuery query, UserConfig userConfig, ReportType reportType) : base(query, userConfig, reportType) { } + public ReportNatRules(DynGraphqlQuery query, UserConfig userConfig, ReportType reportType, IRuleTreeBuilder? ruleTreeBuilder = null) : base(query, userConfig, reportType, ruleTreeBuilder) { } private const int ColumnCount = 12; diff --git a/roles/lib/files/FWO.Services/RuleTreeBuilder/RuleTreeBuilder.cs b/roles/lib/files/FWO.Services/RuleTreeBuilder/RuleTreeBuilder.cs index ae69c506c8..a88e670d63 100644 --- a/roles/lib/files/FWO.Services/RuleTreeBuilder/RuleTreeBuilder.cs +++ b/roles/lib/files/FWO.Services/RuleTreeBuilder/RuleTreeBuilder.cs @@ -140,7 +140,7 @@ public List ProcessLink(RulebaseLink link, List? trail = null) { trail = ProcessSectionLink(link, rulebase, trail); } - else if (link.LinkType == 4) + else if (link.LinkType == 4 || link.LinkType == 6) { trail = ProcessConcatenationLink(link, rulebase, trail); } @@ -336,7 +336,7 @@ private Rule GetUniqueRuleObject(Rule rule) private List EnsureTrailStartsWithZeroForFirstRule(RulebaseLink link, RuleTreeItem lastAddedItem, List trail, int index) { if (index == 0 - && (!(link.LinkType == 4) + && (link.LinkType != 4 && link.LinkType != 6 || lastAddedItem.Parent?.Children.Count() == 1)) { trail = trail.ToList(); @@ -394,7 +394,7 @@ private void SetParentForTreeItem(RuleTreeItem item, RulebaseLink link, RuleTree { SetParentForTreeItem(RuleTree, item); } - else if (link.LinkType == 4 && lastAddedItem.Parent != null) + else if ((link.LinkType == 4 || link.LinkType == 6) && lastAddedItem.Parent != null) { SetParentForTreeItem((RuleTreeItem)lastAddedItem.Parent, item); } diff --git a/roles/tests-unit/files/FWO.Test/ExportTest.cs b/roles/tests-unit/files/FWO.Test/ExportTest.cs index 774d159ef7..7b0de2aa92 100644 --- a/roles/tests-unit/files/FWO.Test/ExportTest.cs +++ b/roles/tests-unit/files/FWO.Test/ExportTest.cs @@ -872,7 +872,7 @@ public void RulesGenerateJson() "\"rule_action\": \"accept\",\"rule_track\": \"none\",\"section_header\": \"\"," + "\"rule_metadatum\": {\"rule_metadata_id\": 0,\"rule_created\": null,\"created_import\": null,\"removed\": null,\"removed_import\": null,\"rule_first_hit\": null,\"rule_last_hit\": \"2022-04-19T00:00:00Z\",\"recertification\": [],\"recert_history\": [],\"rule_uid\": \"\",\"rules\": [],\"Recert\": false}," + "\"translate\": {\"rule_svc_neg\": false,\"rule_svc\": \"\",\"rule_services\": [],\"rule_src_neg\": false,\"rule_src\": \"\",\"rule_froms\": [],\"rule_dst_neg\": false,\"rule_dst\": \"\",\"rule_tos\": []}," + - "\"owner_name\": \"\",\"owner_id\": null,\"matches\": \"\",\"rule_custom_fields\": \"\",\"rule_implied\": false,\"nat_rule\": false,\"rulebase_id\": 0,\"rule_num\": 0,\"rule_enforced_on_gateways\": [],\"rule_installon\": null,\"rule_time\": null,\"rule_times\": [],\"violations\": [],\"rulebase\": {\"id\": 0,\"name\": \"\",\"uid\": \"\",\"mgm_id\": 0,\"is_global\": false,\"created\": 0,\"removed\": 0,\"rules\": []},\"uiuser\": null,\"rule\": null,\"rule_owners\": [],\"flow_access_id\": null,\"flow_access\": null,\"removed\": null,\"ChangeID\": \"\",\"AdoITID\": \"\",\"Compliance\": 0,\"ViolationDetails\": \"\",\"DisplayOrderNumberString\": \"1\",\"DisplayOrderNumber\": 1,\"Certified\": false,\"DeviceName\": \"\",\"RulebaseName\": \"\",\"DisregardedFroms\": [],\"DisregardedTos\": [],\"DisregardedServices\": [],\"ShowDisregarded\": false}," + + "\"owner_name\": \"\",\"owner_id\": null,\"matches\": \"\",\"rule_custom_fields\": \"\",\"rule_implied\": false,\"nat_rule\": false,\"access_rule\": true,\"rulebase_id\": 0,\"rule_num\": 0,\"rule_enforced_on_gateways\": [],\"rule_installon\": null,\"rule_time\": null,\"rule_times\": [],\"violations\": [],\"rulebase\": {\"id\": 0,\"name\": \"\",\"uid\": \"\",\"mgm_id\": 0,\"is_global\": false,\"created\": 0,\"removed\": 0,\"rules\": []},\"uiuser\": null,\"rule\": null,\"ruleByXlateRule\": null,\"rule_owners\": [],\"xlate_rule\": null,\"flow_access_id\": null,\"flow_access\": null,\"removed\": null,\"ChangeID\": \"\",\"AdoITID\": \"\",\"Compliance\": 0,\"ViolationDetails\": \"\",\"DisplayOrderNumberString\": \"1\",\"DisplayOrderNumber\": 1,\"Certified\": false,\"DeviceName\": \"\",\"RulebaseName\": \"\",\"DisregardedFroms\": [],\"DisregardedTos\": [],\"DisregardedServices\": [],\"ShowDisregarded\": false}," + "{\"rule_id\": 0,\"rule_uid\": \"uid2:123\",\"mgm_id\": 0,\"rule_num_numeric\": 0,\"rule_name\": \"TestRule2\",\"rule_comment\": \"comment2\",\"rule_disabled\": false," + "\"rule_services\": [{\"service\": {\"svc_id\": 2,\"svc_name\": \"TestService2\",\"svc_uid\": \"019e44c0-4816-7c9a-9b94-5417d3cbb15f\",\"svc_port\": 6666,\"svc_port_end\": 7777,\"svc_source_port\": null,\"svc_source_port_end\": null,\"svc_code\": \"\",\"svc_timeout\": null,\"svc_typ_id\": null,\"active\": false,\"svc_create\": 0,\"svc_create_time\": {\"time\": \"0001-01-01T00:00:00\"},\"service_type\": {\"name\": \"\"},\"svc_comment\": \"Comment-019e44c0-4816-7c9a-9b94-5417d3cbb15f\",\"svc_color_id\": null,\"stm_color\": null,\"ip_proto_id\": null,\"protocol_name\": {\"id\": 17,\"name\": \"UDP\"},\"svc_member_names\": \"Member-019e44c0-4816-7c9a-9b94-5417d3cbb15f\",\"svc_member_refs\": \"\",\"svcgrps\": [],\"svcgrp_flats\": [],\"svc_rpcnr\": null,\"flow_svcobj_id\": null,\"flow_svcobject\": null,\"flow_svcgrp_id\": null,\"flow_svcgroup\": null,\"flow_active\": false,\"removed\": null}}]," + "\"rule_svc_neg\": true,\"rule_svc\": \"\",\"rule_svc_refs\": \"\",\"rule_src_neg\": true,\"rule_src\": \"\",\"rule_src_refs\": \"\",\"rule_src_zone\": \"\",\"rule_from_zones\": []," + @@ -885,7 +885,7 @@ public void RulesGenerateJson() "\"rule_action\": \"deny\",\"rule_track\": \"none\",\"section_header\": \"\"," + "\"rule_metadatum\": {\"rule_metadata_id\": 0,\"rule_created\": null,\"created_import\": null,\"removed\": null,\"removed_import\": null,\"rule_first_hit\": null,\"rule_last_hit\": null,\"recertification\": [],\"recert_history\": [],\"rule_uid\": \"\",\"rules\": [],\"Recert\": false}," + "\"translate\": {\"rule_svc_neg\": false,\"rule_svc\": \"\",\"rule_services\": [],\"rule_src_neg\": false,\"rule_src\": \"\",\"rule_froms\": [],\"rule_dst_neg\": false,\"rule_dst\": \"\",\"rule_tos\": []}," + - "\"owner_name\": \"\",\"owner_id\": null,\"matches\": \"\",\"rule_custom_fields\": \"\",\"rule_implied\": false,\"nat_rule\": false,\"rulebase_id\": 0,\"rule_num\": 0,\"rule_enforced_on_gateways\": [],\"rule_installon\": null,\"rule_time\": null,\"rule_times\": [],\"violations\": [],\"rulebase\": {\"id\": 0,\"name\": \"\",\"uid\": \"\",\"mgm_id\": 0,\"is_global\": false,\"created\": 0,\"removed\": 0,\"rules\": []},\"uiuser\": null,\"rule\": null,\"rule_owners\": [],\"flow_access_id\": null,\"flow_access\": null,\"removed\": null,\"ChangeID\": \"\",\"AdoITID\": \"\",\"Compliance\": 0,\"ViolationDetails\": \"\",\"DisplayOrderNumberString\": \"\",\"DisplayOrderNumber\": 2,\"Certified\": false,\"DeviceName\": \"\",\"RulebaseName\": \"\",\"DisregardedFroms\": [],\"DisregardedTos\": [],\"DisregardedServices\": [],\"ShowDisregarded\": false}]}]," + + "\"owner_name\": \"\",\"owner_id\": null,\"matches\": \"\",\"rule_custom_fields\": \"\",\"rule_implied\": false,\"nat_rule\": false,\"access_rule\": true,\"rulebase_id\": 0,\"rule_num\": 0,\"rule_enforced_on_gateways\": [],\"rule_installon\": null,\"rule_time\": null,\"rule_times\": [],\"violations\": [],\"rulebase\": {\"id\": 0,\"name\": \"\",\"uid\": \"\",\"mgm_id\": 0,\"is_global\": false,\"created\": 0,\"removed\": 0,\"rules\": []},\"uiuser\": null,\"rule\": null,\"ruleByXlateRule\": null,\"rule_owners\": [],\"xlate_rule\": null,\"flow_access_id\": null,\"flow_access\": null,\"removed\": null,\"ChangeID\": \"\",\"AdoITID\": \"\",\"Compliance\": 0,\"ViolationDetails\": \"\",\"DisplayOrderNumberString\": \"\",\"DisplayOrderNumber\": 2,\"Certified\": false,\"DeviceName\": \"\",\"RulebaseName\": \"\",\"DisregardedFroms\": [],\"DisregardedTos\": [],\"DisregardedServices\": [],\"ShowDisregarded\": false}]}]," + "\"changelog_rules\": null,\"changelog_objects\": null,\"changelog_services\": null,\"changelog_users\": null,\"import\": {\"aggregate\": {\"max\": {\"id\": null}}},\"import_controls\": [],\"RelevantImportId\": null,\"is_super_manager\": false,\"multi_device_manager_id\": null,\"management\": null,\"managementByMultiDeviceManagerId\": [],\"networkObjects\": [],\"serviceObjects\": [],\"userObjects\": [],\"zoneObjects\": []," + "\"reportNetworkObjects\": [{\"obj_id\": 1,\"obj_name\": \"TestIp1\",\"obj_ip\": \"1.2.3.4/32\",\"obj_ip_end\": \"1.2.3.4/32\",\"obj_uid\": \"019e44c0-4816-7c9a-9b94-5417d3cbb15f\",\"zone\": null,\"active\": false,\"obj_create\": 0," + "\"obj_create_time\": {\"time\": \"0001-01-01T00:00:00\"},\"type\": {\"id\": 0,\"name\": \"network\"},\"obj_color\": null,\"obj_comment\": \"Comment-019e44c0-4816-7c9a-9b94-5417d3cbb15f\",\"obj_member_names\": \"Member-019e44c0-4816-7c9a-9b94-5417d3cbb15f\",\"obj_member_refs\": \"\"," + diff --git a/roles/tests-unit/files/FWO.Test/NatRuleDisplayHtmlTest.cs b/roles/tests-unit/files/FWO.Test/NatRuleDisplayHtmlTest.cs new file mode 100644 index 0000000000..2b60a04063 --- /dev/null +++ b/roles/tests-unit/files/FWO.Test/NatRuleDisplayHtmlTest.cs @@ -0,0 +1,137 @@ +using FWO.Basics; +using FWO.Data; +using FWO.Report; +using FWO.Report.Filter; +using FWO.Ui.Display; +using NUnit.Framework; + +namespace FWO.Test +{ + [TestFixture] + [Parallelizable] + internal class NatRuleDisplayHtmlTest + { + private NatRuleDisplayHtml _display = null!; + + [SetUp] + public void SetUp() + { + _display = new NatRuleDisplayHtml(new SimulatedUserConfig()); + } + + private static Rule MakeNatRule(NatData natData) => + new Rule { Id = 1, MgmtId = 1, NatData = natData }; + + // ── DisplayTranslatedSource ────────────────────────────────────────── + + [Test] + public void DisplayTranslatedSource_EmptyFroms_ReturnsEmpty() + { + var rule = MakeNatRule(new NatData { TranslatedFroms = [] }); + + var result = _display.DisplayTranslatedSource(rule, OutputLocation.export); + + Assert.That(result.Trim(), Is.Empty); + } + + [Test] + public void DisplayTranslatedSource_NegatedWithEmptyFroms_ContainsNegatedText() + { + var rule = MakeNatRule(new NatData { TranslatedSourceNegated = true, TranslatedFroms = [] }); + + var result = _display.DisplayTranslatedSource(rule, OutputLocation.export); + + Assert.That(result, Does.Contain("not")); + } + + [Test] + public void DisplayTranslatedSource_NotNegated_DoesNotContainNegatedText() + { + var rule = MakeNatRule(new NatData { TranslatedSourceNegated = false, TranslatedFroms = [] }); + + var result = _display.DisplayTranslatedSource(rule, OutputLocation.export); + + Assert.That(result, Does.Not.Contain("not")); + } + + // ── DisplayTranslatedDestination ───────────────────────────────────── + + [Test] + public void DisplayTranslatedDestination_EmptyTos_ReturnsEmpty() + { + var rule = MakeNatRule(new NatData { TranslatedTos = [] }); + + var result = _display.DisplayTranslatedDestination(rule, OutputLocation.export); + + Assert.That(result.Trim(), Is.Empty); + } + + [Test] + public void DisplayTranslatedDestination_NegatedWithEmptyTos_ContainsNegatedText() + { + var rule = MakeNatRule(new NatData { TranslatedDestinationNegated = true, TranslatedTos = [] }); + + var result = _display.DisplayTranslatedDestination(rule, OutputLocation.export); + + Assert.That(result, Does.Contain("not")); + } + + [Test] + public void DisplayTranslatedDestination_NotNegated_DoesNotContainNegatedText() + { + var rule = MakeNatRule(new NatData { TranslatedDestinationNegated = false, TranslatedTos = [] }); + + var result = _display.DisplayTranslatedDestination(rule, OutputLocation.export); + + Assert.That(result, Does.Not.Contain("not")); + } + + // ── DisplayTranslatedService ───────────────────────────────────────── + + [Test] + public void DisplayTranslatedService_EmptyServices_ReturnsEmpty() + { + var rule = MakeNatRule(new NatData { TranslatedServices = [] }); + + var result = _display.DisplayTranslatedService(rule, OutputLocation.export); + + Assert.That(result.Trim(), Is.Empty); + } + + [Test] + public void DisplayTranslatedService_NegatedWithEmptyServices_ContainsNegatedText() + { + var rule = MakeNatRule(new NatData { TranslatedServiceNegated = true, TranslatedServices = [] }); + + var result = _display.DisplayTranslatedService(rule, OutputLocation.export); + + Assert.That(result, Does.Contain("not")); + } + + [Test] + public void DisplayTranslatedService_NotNegated_DoesNotContainNegatedText() + { + var rule = MakeNatRule(new NatData { TranslatedServiceNegated = false, TranslatedServices = [] }); + + var result = _display.DisplayTranslatedService(rule, OutputLocation.export); + + Assert.That(result, Does.Not.Contain("not")); + } + + [Test] + public void DisplayTranslatedService_WithOneService_ContainsServiceName() + { + var rule = MakeNatRule(new NatData + { + TranslatedServices = + [ + new ServiceWrapper { Content = new NetworkService { Name = "HTTPS", DestinationPort = 443, Protocol = new() { Id = 6, Name = "TCP" } } } + ] + }); + + var result = _display.DisplayTranslatedService(rule, OutputLocation.export); + + Assert.That(result, Does.Contain("HTTPS")); + } + } +} diff --git a/roles/tests-unit/files/FWO.Test/ReportNatRulesTest.cs b/roles/tests-unit/files/FWO.Test/ReportNatRulesTest.cs new file mode 100644 index 0000000000..9ce212002a --- /dev/null +++ b/roles/tests-unit/files/FWO.Test/ReportNatRulesTest.cs @@ -0,0 +1,130 @@ +using FWO.Basics; +using FWO.Data; +using FWO.Data.Report; +using FWO.Report; +using FWO.Report.Filter; +using FWO.Ui.Display; +using NUnit.Framework; + +namespace FWO.Test +{ + [TestFixture] + [Parallelizable] + internal class ReportNatRulesTest + { + private NatRuleDisplayHtml _display = null!; + private ReportNatRules _report = null!; + + [SetUp] + public void SetUp() + { + var userConfig = new SimulatedUserConfig(); + _display = new NatRuleDisplayHtml(userConfig); + _report = new ReportNatRules(new DynGraphqlQuery(""), userConfig, ReportType.NatRules); + } + + [Test] + public void ExportSingleRulebaseToHtml_EmptyRuleArray_ReturnsEmptyString() + { + var result = _report.ExportSingleRulebaseToHtml([], _display, chapterNumber: 1); + + Assert.That(result.Trim(), Is.Empty); + } + + [Test] + public void ExportSingleRulebaseToHtml_NormalRule_ContainsTableRow() + { + var rules = new[] + { + new Rule + { + Id = 1, + Uid = "rule-uid-1", + Name = "TestRule", + SectionHeader = "", + NatData = new NatData() + } + }; + + var result = _report.ExportSingleRulebaseToHtml(rules, _display, chapterNumber: 1); + + Assert.That(result, Does.Contain("")); + Assert.That(result, Does.Contain("")); + } + + [Test] + public void ExportSingleRulebaseToHtml_NormalRule_DoesNotContainColspan() + { + var rules = new[] + { + new Rule + { + Id = 1, + Uid = "rule-uid-1", + SectionHeader = "", + NatData = new NatData() + } + }; + + var result = _report.ExportSingleRulebaseToHtml(rules, _display, chapterNumber: 1); + + Assert.That(result, Does.Not.Contain("colspan")); + } + + [Test] + public void ExportSingleRulebaseToHtml_SectionHeaderRule_ContainsColspan() + { + var rules = new[] + { + new Rule + { + Id = 2, + SectionHeader = "My Section", + NatData = new NatData() + } + }; + + var result = _report.ExportSingleRulebaseToHtml(rules, _display, chapterNumber: 1); + + Assert.That(result, Does.Contain("colspan")); + Assert.That(result, Does.Contain("My Section")); + } + + [Test] + public void ExportSingleRulebaseToHtml_SectionHeaderRule_DoesNotContainRegularRuleCells() + { + var rules = new[] + { + new Rule + { + Id = 3, + SectionHeader = "Section A", + NatData = new NatData() + } + }; + + var result = _report.ExportSingleRulebaseToHtml(rules, _display, chapterNumber: 1); + + // A section row gets a single merged cell, no individual for each column + Assert.That(result, Does.Not.Contain("")); + var tdCount = result.Split("").Length - 1; + Assert.That(openTrCount, Is.EqualTo(3)); + } + } +} diff --git a/roles/tests-unit/files/FWO.Test/RuleDataTest.cs b/roles/tests-unit/files/FWO.Test/RuleDataTest.cs index 0fff0cf304..42ebbd72eb 100644 --- a/roles/tests-unit/files/FWO.Test/RuleDataTest.cs +++ b/roles/tests-unit/files/FWO.Test/RuleDataTest.cs @@ -156,5 +156,44 @@ public void WorkflowRequestFlowLinkFields_AreDeserialized() Assert.That(reqTask.Elements[0].FlowServiceObjectId, Is.EqualTo(7004)); Assert.That(reqTask.Elements[0].FlowServiceGroupId, Is.EqualTo(7005)); } + + [Test] + public void NormalizedRule_FromRule_PreservesNatAndTranslationFlags() + { + Rule rule = new() + { + NatRule = true, + AccessRule = false, + XlateRule = "1366", + TranslatedRule = new Rule { Uid = "translated-rule" }, + RuleOrderNumber = 7, + OrderNumber = 7.0, + Disabled = false, + SourceNegated = false, + Source = "any", + SourceRefs = "", + DestinationNegated = false, + Destination = "any", + DestinationRefs = "", + ServiceNegated = false, + Service = "any", + ServiceRefs = "", + Action = "accept", + Track = "none", + Implied = false, + Metadata = new RuleMetadata() + }; + + NormalizedRule normalizedRule = NormalizedRule.FromRule(rule); + + Assert.That(normalizedRule.NatRule, Is.True); + Assert.That(normalizedRule.AccessRule, Is.False); + Assert.That(normalizedRule.XlateRule, Is.EqualTo("translated-rule")); + + string serialized = JsonConvert.SerializeObject(normalizedRule); + Assert.That(serialized, Does.Contain("\"nat_rule\":true")); + Assert.That(serialized, Does.Contain("\"access_rule\":false")); + Assert.That(serialized, Does.Contain("\"xlate_rule_uid\":\"translated-rule\"")); + } } }