diff --git a/documentation/rework-2025/rulebase-links.md b/documentation/rework-2025/rulebase-links.md new file mode 100644 index 0000000000..592a7840c7 --- /dev/null +++ b/documentation/rework-2025/rulebase-links.md @@ -0,0 +1,80 @@ +# Rulebase Links + +## Concept + +- each firewall device defines one directed tree for its rulebases +- rulebases are the tree verticies and rulebase links are tree edges +- rulebases contain a (possibly empty) list of rules and a uid/id +- in context of database we use ids, in importer context uids, here we default to ids +- each rulebase tree is rooted, this is defined with the initial rulebase link +- rulebase links have many fields, each is explained in this document + +## Rulebase Link Fields + +### From and To Rulebase Id + +- **from_rulebase_id** and **to_rulebase_id** constitute the directed tree edges +- both fields are not NULL, with one exception, from_rulebase_id is empty for the initial rulebase link +- sometimes rulebases need to be linked from a specific rule (e.g. inline-layer, domain) +- this is done with **from_rule_id** +- from _rule_id is a rule id that is in the from_rulebase_id + +### Boolean Flags + +- **is_initial** is true, if to_rulebase_id is the first rulebase for the device +- **is_global** is true, if to_rulebase_id belongs to the super manager +- **is_section** is true, if to_rulebase_id gets a section header in reporting +- sections get a section header in reporting and are collapsible + +### Types + +Each link has a **type** that explains +- how the rulebase tree is traversed by incoming traffic requests +- how the rules report should look like +- how rule numbers are created + +#### Ordered + +- high level rulebase concept +- ordered rulebases are a chain +- each ordered rulebase needs one accept rule to allow traffic request +- first ordered rulebase gets number 1 and so on +- rules are numbered in dot notation, 2.13 is the 13th rule in the 2nd ordered rulebase +- if only one (initial) ordered rulebase exists the number 1. is ommited for all rules +- each ordered rulebase gets a header that is collapsible in reporting + +#### Domain + +- from_rulebase_id is a global rulebase, to_rulebase_id is a local rulebase +- from_rule_id is defined and named placeholder rule +- traffic request traverses from_rulebase_id up to and including placeholder rule +- then domain rules are traversed and after that the remaining global rules +- if traffic is droped in domain rules, it is droped overall +- numbering is in dot notation, each domain rule gets the number of placeholder rule as prefix +- 2.3.38 is the 38th rule in the local rulebase, placeholder rule is the 3rd global rule in the 2nd global ordered rulebase +- in reporting to_rulebase_id is collapsible with a the placeholder rule as banner + +#### Concatenated + +- from_rulebase_id and to_rulebase_id are the same rulebase in context of a traffic request +- first from_rulebase_id is traversed for an accept rule and if none is found, then to_rulebase_id +- rule numbers are continued, if rulebase 1 ends with rule 16, then rulebase 2 starts with rule 17 +- concatenated rulebases are not collapsible in reporting by default, but could be if is_section is true + +#### Inline + +- from_rulebase_id is interrupted by to_rulebase_id +- from_rule_id is defined and named layer guard +- if layer guard is reached by traffic traversal and allows traffic, then all to_rulebase_id are traversed +- if to_rulebase_id allows traffic it is allowed overall, else from_rulebase_id keeps getting traversed after layer guard +- inline rulebases may be nested +- numbering is in dot notation, each inline rule gets the number of from_rule_id as prefix +- 2.3.18 is the 18th rule in the nested inline rulebase, its layerguard is the 3rd rule in the inline rulebase with layer guard 2 in the ordered layer +- in reporting to_rulebase_id is collapsible with a the layer guard rule as banner + +#### NAT + +- each ordered from_rulebase_id may have a nat to_rulebase_id +- nat rulebases do not contain access rules but nat rules +- nat rules are not important for numbering or traffic traversal +- nat rules get their own report diff --git a/roles/database/files/sql/creation/fworch-fill-stm.sql b/roles/database/files/sql/creation/fworch-fill-stm.sql index 6c121ac3f4..6186e4b43b 100644 --- a/roles/database/files/sql/creation/fworch-fill-stm.sql +++ b/roles/database/files/sql/creation/fworch-fill-stm.sql @@ -543,6 +543,7 @@ insert into stm_link_type (id, name) VALUES (2, 'ordered'); insert into stm_link_type (id, name) VALUES (3, 'inline'); insert into stm_link_type (id, name) VALUES (4, 'concatenated'); insert into stm_link_type (id, name) VALUES (5, 'domain'); +insert into stm_link_type (id, name) VALUES (6, 'nat'); -- insert into compliance.assessability_issue_type (type_id, type_name) VALUES (1, 'empty group'); -- insert into compliance.assessability_issue_type (type_id, type_name) VALUES (2, 'broadcast address'); diff --git a/roles/database/files/upgrade/9.0.sql b/roles/database/files/upgrade/9.0.sql index 4a65dff012..d72944b3e8 100644 --- a/roles/database/files/upgrade/9.0.sql +++ b/roles/database/files/upgrade/9.0.sql @@ -789,6 +789,7 @@ insert into stm_link_type (id, name) VALUES (2, 'ordered') ON CONFLICT DO NOTHIN insert into stm_link_type (id, name) VALUES (3, 'inline') ON CONFLICT DO NOTHING; insert into stm_link_type (id, name) VALUES (4, 'concatenated') ON CONFLICT DO NOTHING; insert into stm_link_type (id, name) VALUES (5, 'domain') ON CONFLICT DO NOTHING; +insert into stm_link_type (id, name) VALUES (6, 'nat') ON CONFLICT DO NOTHING; delete from stm_link_type where name in ('initial','global','local','section'); -- initial and global/local are additional flags now -- TODO delete all rule.parent_rule_id and rule.parent_rule_type, always = None so far diff --git a/roles/importer/files/importer/fw_modules/checkpointR8x/cp_getter.py b/roles/importer/files/importer/fw_modules/checkpointR8x/cp_getter.py index 7853faf8b3..e3b37e7027 100644 --- a/roles/importer/files/importer/fw_modules/checkpointR8x/cp_getter.py +++ b/roles/importer/files/importer/fw_modules/checkpointR8x/cp_getter.py @@ -326,8 +326,9 @@ def get_global_assignments(api_v_url: str, sid: str, show_params_policy_structur # parse global assignments for assignment in assignments["objects"]: - global_assignment = parse_global_assignment(assignment) - global_assignments.append(global_assignment) + if "global-access-policy" in assignment: + global_assignment = parse_global_assignment(assignment) + global_assignments.append(global_assignment) return global_assignments @@ -344,7 +345,7 @@ def get_rulebases( rulebase_uid: str | None = None, rulebase_name: str | None = None, ) -> list[str]: - # i access_type : access / nat + # access_type is either "access" or "nat" native_config_rulebase_key = "rulebases" current_rulebase = {} @@ -635,11 +636,14 @@ def assign_placeholder_uids( def get_nat_rules_from_api_as_dict( - api_v_url: str, sid: str, show_params_rules: dict[str, Any], native_config_domain: dict[str, Any] | None = None -): - if native_config_domain is None: - native_config_domain = {} - nat_rules: dict[str, list[Any]] = {"nat_rule_chunks": []} + policy_dict: dict[str, Any], + api_v_url: str, + sid: str, + show_params_rules: dict[str, Any], + native_config_domain: dict[str, Any], +) -> dict[str, Any]: + """Gets NAT rulebases, uid and name are augmented with _nat for uniquenes of rulebase_links""" + nat_rules: dict[str, Any] = {"uid": policy_dict["uid"] + "_nat", "name": policy_dict["name"] + "_nat", "chunks": []} current = 0 total = current + 1 while current < total: @@ -661,7 +665,7 @@ def get_nat_rules_from_api_as_dict( ]: resolve_ref_list_from_object_dictionary(rulebase, rule_field, native_config_domain=native_config_domain) - nat_rules["nat_rule_chunks"].append(rulebase) + nat_rules["chunks"].append(rulebase) if "total" in rulebase: total = rulebase["total"] else: diff --git a/roles/importer/files/importer/fw_modules/checkpointR8x/cp_rule.py b/roles/importer/files/importer/fw_modules/checkpointR8x/cp_rule.py index a0aa82a2d7..e1b75f20f8 100644 --- a/roles/importer/files/importer/fw_modules/checkpointR8x/cp_rule.py +++ b/roles/importer/files/importer/fw_modules/checkpointR8x/cp_rule.py @@ -1,5 +1,6 @@ import ast import json +from copy import deepcopy from typing import Any from fwo_base import sanitize, sort_and_join_refs @@ -28,7 +29,7 @@ def normalize_rulebases( native_config_global: dict[str, Any] | None, import_state: ImportState, normalized_config_dict: dict[str, Any], - normalized_config_global: dict[str, Any] | None, + normalized_config_global: dict[str, Any], is_global_loop_iteration: bool, ): normalized_config_dict["policies"] = [] @@ -38,7 +39,7 @@ def normalize_rulebases( uid_to_name_map[nw_obj["obj_uid"]] = nw_obj["obj_name"] fetched_rulebase_uids: list[str] = [] - if normalized_config_global is not None and normalized_config_global != {}: + if normalized_config_global != {}: fetched_rulebase_uids.extend( [normalized_rulebase_global.uid for normalized_rulebase_global in normalized_config_global["policies"]] ) @@ -51,7 +52,7 @@ def normalize_rulebases( is_global_loop_iteration, import_state, normalized_config_dict, - normalized_config_global, # type: ignore # TODO: check if normalized_config_global can be None, I am pretty sure it cannot be None here # noqa: PGH003 + normalized_config_global, ) # TODO: parse nat rulebase here @@ -66,7 +67,7 @@ def normalize_rulebases_for_each_link_destination( normalized_config_global: dict[str, Any], ): for rulebase_link in gateway["rulebase_links"]: - if rulebase_link["to_rulebase_uid"] not in fetched_rulebase_uids and rulebase_link["to_rulebase_uid"] != "": + if rulebase_link["to_rulebase_uid"] not in fetched_rulebase_uids: rulebase_to_parse, is_section, is_placeholder = find_rulebase_to_parse( native_config["rulebases"], rulebase_link["to_rulebase_uid"] ) @@ -77,6 +78,11 @@ def normalize_rulebases_for_each_link_destination( native_config_global["rulebases"], rulebase_link["to_rulebase_uid"] ) found_rulebase_in_global = True + # search in nat rulebases + if rulebase_to_parse == {}: + rulebase_to_parse, is_section, is_placeholder = find_rulebase_to_parse( + native_config["nat_rulebases"], rulebase_link["to_rulebase_uid"] + ) if rulebase_to_parse == {}: FWOLogger.warning("found to_rulebase link without rulebase in nativeConfig: " + str(rulebase_link)) continue @@ -86,10 +92,21 @@ def normalize_rulebases_for_each_link_destination( ) fetched_rulebase_uids.append(rulebase_link["to_rulebase_uid"]) - if found_rulebase_in_global: - normalized_config_global["policies"].append(normalized_rulebase) - else: - normalized_config_dict["policies"].append(normalized_rulebase) + append_normalized_rulebase( + normalized_config_dict, normalized_config_global, normalized_rulebase, found_rulebase_in_global + ) + + +def append_normalized_rulebase( + normalized_config_dict: dict[str, Any], + normalized_config_global: dict[str, Any], + normalized_rulebase: Rulebase, + found_rulebase_in_global: bool, +): + if found_rulebase_in_global: + normalized_config_global["policies"].append(normalized_rulebase) + else: + normalized_config_dict["policies"].append(normalized_rulebase) def find_rulebase_to_parse(rulebase_list: list[dict[str, Any]], rulebase_uid: str) -> tuple[dict[str, Any], bool, bool]: @@ -166,14 +183,11 @@ def parse_rulebase( ): if is_section: for rule in rulebase_to_parse["rulebase"]: - # delte_v sind import_id, parent_uid, config2import wirklich egal? Dann können wir diese argumente löschen - NAT ACHTUNG - parse_single_rule(rule, normalized_rulebase, normalized_rulebase.uid, None, gateway, policy_structure) + parse_single_rule(rule, normalized_rulebase, normalized_rulebase.uid, gateway, policy_structure) FWOLogger.debug("parsed rulebase " + normalized_rulebase.uid, 4) elif is_placeholder: - parse_single_rule( - rulebase_to_parse, normalized_rulebase, normalized_rulebase.uid, None, gateway, policy_structure - ) + parse_single_rule(rulebase_to_parse, normalized_rulebase, normalized_rulebase.uid, gateway, policy_structure) else: parse_rulebase_chunk(rulebase_to_parse, normalized_rulebase, gateway, policy_structure) @@ -187,7 +201,7 @@ def parse_rulebase_chunk( for chunk in rulebase_to_parse["chunks"]: for rule in chunk["rulebase"]: if "rule-number" in rule: - parse_single_rule(rule, normalized_rulebase, normalized_rulebase.uid, None, gateway, policy_structure) + parse_single_rule(rule, normalized_rulebase, normalized_rulebase.uid, gateway, policy_structure) else: FWOLogger.debug("found unparsable rulebase: " + str(rulebase_to_parse), 9) @@ -214,6 +228,7 @@ def accept_malformed_parts(objects: dict[str, Any] | list[dict[str, Any]], part: return {} +# TODO: chunking in object dicts should not be neccessary if limit is high enough def parse_rule_part( objects: dict[str, Any] | list[dict[str, Any] | None] | None, part: str = "source" ) -> dict[str, Any]: @@ -297,7 +312,6 @@ def parse_single_rule( native_rule: dict[str, Any], rulebase: Rulebase, layer_name: str, - parent_uid: str | None, gateway: dict[str, Any], policy_structure: list[dict[str, Any]], ): @@ -306,99 +320,115 @@ def parse_single_rule( "type" in native_rule and native_rule["type"] != "place-holder" and "rule-number" in native_rule ): # standard rule, no section header return - # the following objects might come in chunks: - source_objects: dict[str, str] = parse_rule_part(native_rule["source"], "source") - rule_src_ref, rule_src_name = sort_and_join_refs(list(source_objects.items())) - - dst_objects: dict[str, str] = parse_rule_part(native_rule["destination"], "destination") - rule_dst_ref, rule_dst_name = sort_and_join_refs(list(dst_objects.items())) - svc_objects: dict[str, str] = parse_rule_part(native_rule["service"], "service") - rule_svc_ref, rule_svc_name = sort_and_join_refs(list(svc_objects.items())) + + # start with values for both access and nat rules + last_change_admin = native_rule.get("meta-info", {}).get("last-modifier", None) + rule_name = native_rule.get("name") rule_enforced_on_gateways = parse_rule_enforced_on_gateway(gateway, policy_structure, native_rule=native_rule) list_of_gw_uids = sorted({enforceEntry.dev_uid for enforceEntry in rule_enforced_on_gateways}) str_list_of_gw_uids = LIST_DELIMITER.join(list_of_gw_uids) if list_of_gw_uids else None - - rule_track = _parse_track(native_rule=native_rule) - - action_objects = parse_rule_part(native_rule["action"], "action") - if action_objects is not None: # type: ignore # TODO: this should be never None # noqa: PGH003 - rule_action = LIST_DELIMITER.join(action_objects.values()) # expecting only a single action - else: - rule_action = None - FWOLogger.warning("found rule without action: " + str(native_rule)) - - time_objects = parse_rule_part(native_rule["time"], "time") - rule_time = LIST_DELIMITER.join(time_objects.values()) if time_objects else None - - # starting with the non-chunk objects - rule_name = native_rule.get("name") - - # new in v8.0.3: - rule_custom_fields = native_rule.get("custom-fields") - - # we leave out all last_admin info for now - last_change_admin = None - - parent_rule_uid = _parse_parent_rule_uid(parent_uid, native_rule=native_rule) - - # new in v5.5.1: - rule_type = native_rule.get("rule_type", "access") - comments = native_rule.get("comments") if comments == "": comments = None - + rule_custom_fields = native_rule.get("custom-fields") if "hits" in native_rule and "last-date" in native_rule["hits"] and "iso-8601" in native_rule["hits"]["last-date"]: last_hit = native_rule["hits"]["last-date"]["iso-8601"] else: last_hit = None + # objects only used in access rules + if native_rule["type"] == "access-rule": + source_negate = bool(native_rule["source-negate"]) + destination_negate = bool(native_rule["destination-negate"]) + service_negate = bool(native_rule["service-negate"]) + source_objects: dict[str, str] = parse_rule_part(native_rule["source"], "source") + rule_src_ref, rule_src_name = sort_and_join_refs(list(source_objects.items())) + dst_objects: dict[str, str] = parse_rule_part(native_rule["destination"], "destination") + rule_dst_ref, rule_dst_name = sort_and_join_refs(list(dst_objects.items())) + svc_objects: dict[str, str] = parse_rule_part(native_rule["service"], "service") + rule_svc_ref, rule_svc_name = sort_and_join_refs(list(svc_objects.items())) + action_objects = parse_rule_part(native_rule["action"], "action") + rule_action = LIST_DELIMITER.join(action_objects.values()) + rule_track = _parse_track(native_rule) + time_objects = parse_rule_part(native_rule["time"], "time") + rule_time = LIST_DELIMITER.join(time_objects.values()) if time_objects else None + rule_type = "access" + # objects only used in nat rules + elif native_rule["type"] == "nat-rule": + source_negate = False + destination_negate = False + service_negate = False + source_objects: dict[str, str] = parse_rule_part(native_rule["original-source"], "original-source") + rule_src_ref, rule_src_name = sort_and_join_refs(list(source_objects.items())) + dst_objects: dict[str, str] = parse_rule_part(native_rule["original-destination"], "original-destination") + rule_dst_ref, rule_dst_name = sort_and_join_refs(list(dst_objects.items())) + svc_objects: dict[str, str] = parse_rule_part(native_rule["original-service"], "original-service") + rule_svc_ref, rule_svc_name = sort_and_join_refs(list(svc_objects.items())) + rule_action = "drop" + rule_track = "none" + rule_time = None + rule_type = "nat" + else: + raise FwoImporterErrorInconsistenciesError( + "Rule " + native_rule["uid"] + " is neither access-rule nor nat-rule" + ) + rule: dict[str, Any] = { + "last_change_admin": sanitize(last_change_admin), + "rule_name": sanitize(rule_name), + "parent_rule_uid": None, "rule_num": 0, "rule_num_numeric": 0, - "rulebase_name": sanitize(layer_name), + "rule_uid": sanitize(native_rule["uid"]), "rule_disabled": not bool(native_rule["enabled"]), - "rule_src_neg": bool(native_rule["source-negate"]), + "rule_src_neg": source_negate, + "rule_dst_neg": destination_negate, + "rule_svc_neg": service_negate, "rule_src": sanitize(rule_src_name), - "rule_src_refs": sanitize(rule_src_ref), - "rule_dst_neg": bool(native_rule["destination-negate"]), "rule_dst": sanitize(rule_dst_name), - "rule_dst_refs": sanitize(rule_dst_ref), - "rule_svc_neg": bool(native_rule["service-negate"]), "rule_svc": sanitize(rule_svc_name), + "rule_src_refs": sanitize(rule_src_ref), + "rule_dst_refs": sanitize(rule_dst_ref), "rule_svc_refs": sanitize(rule_svc_ref), "rule_action": sanitize(rule_action, lower=True), "rule_track": sanitize(rule_track, lower=True), "rule_installon": sanitize(str_list_of_gw_uids), "rule_time": sanitize(rule_time), - "rule_name": sanitize(rule_name), - "rule_uid": sanitize(native_rule["uid"]), - "rule_custom_fields": sanitize(rule_custom_fields), + "rule_comment": sanitize(comments), "rule_implied": False, + "rule_custom_fields": sanitize(rule_custom_fields), "rule_type": sanitize(rule_type), - "last_change_admin": sanitize(last_change_admin), - "parent_rule_uid": sanitize(parent_rule_uid), + "rulebase_name": sanitize(layer_name), "last_hit": sanitize(last_hit), } - if comments is not None: - rule["rule_comment"] = sanitize(comments) - rulebase.rules.update({rule["rule_uid"]: RuleNormalized(**rule)}) - -def _parse_parent_rule_uid(parent_uid: str | None, native_rule: dict[str, Any]) -> str | None: - # new in v5.1.17: - if "parent_rule_uid" in native_rule: - FWOLogger.debug( - "found rule (uid=" + native_rule["uid"] + ") with parent_rule_uid set: " + native_rule["parent_rule_uid"] + if native_rule["type"] == "nat-rule": + rule.update({"xlate_rule": sanitize(native_rule["uid"] + "_translated")}) + source_objects: dict[str, str] = parse_rule_part(native_rule["translated-source"], "translated-source") + rule_src_ref, rule_src_name = sort_and_join_refs(list(source_objects.items())) + dst_objects: dict[str, str] = parse_rule_part(native_rule["translated-destination"], "translated-destination") + rule_dst_ref, rule_dst_name = sort_and_join_refs(list(dst_objects.items())) + svc_objects: dict[str, str] = parse_rule_part(native_rule["translated-service"], "translated-service") + rule_svc_ref, rule_svc_name = sort_and_join_refs(list(svc_objects.items())) + + xlate_rule: dict[str, Any] = deepcopy(rule) + xlate_rule.update( + { + "rule_uid": sanitize(native_rule["uid"] + "_translated"), + "rule_src_neg": source_negate, + "rule_dst_neg": destination_negate, + "rule_svc_neg": service_negate, + "rule_src": sanitize(rule_src_name), + "rule_dst": sanitize(rule_dst_name), + "rule_svc": sanitize(rule_svc_name), + "rule_src_refs": sanitize(rule_src_ref), + "rule_dst_refs": sanitize(rule_dst_ref), + "rule_svc_refs": sanitize(rule_svc_ref), + } ) - parent_rule_uid = native_rule["parent_rule_uid"] - else: - parent_rule_uid = parent_uid + rulebase.rules.update({rule["rule_uid"]: RuleNormalized(**xlate_rule)}) - if parent_rule_uid == "": - parent_rule_uid = None - - return parent_rule_uid + rulebase.rules.update({rule["rule_uid"]: RuleNormalized(**rule)}) def _parse_track(native_rule: dict[str, Any]) -> str: @@ -461,31 +491,6 @@ def resolve_nwobj_uid_to_name(nw_obj_uid: str) -> str: return "" -# delete_v: left here only for nat case -def check_and_add_section_header( - src_rulebase: dict[str, Any], - target_rulebase: Rulebase, - layer_name: str, - import_id: str, - section_header_uids: set[str], -): - # TODO: re-implement - raise NotImplementedError("check_and_add_section_header is not implemented yet.") - - -def insert_section_header_rule( - _target_rulebase: Rulebase, - _section_name: str, - _layer_name: str, - _import_id: str, - _src_rulebase_uid: str, - _section_header_uids: set[str], - _parent_uid: str, -): - # TODO: re-implement - return - - def ensure_json(raw: str) -> Any: """ Tries to parse the given string as valid JSON. diff --git a/roles/importer/files/importer/fw_modules/checkpointR8x/fwcommon.py b/roles/importer/files/importer/fw_modules/checkpointR8x/fwcommon.py index 617630d2b8..a96257332a 100644 --- a/roles/importer/files/importer/fw_modules/checkpointR8x/fwcommon.py +++ b/roles/importer/files/importer/fw_modules/checkpointR8x/fwcommon.py @@ -133,7 +133,7 @@ def normalize_config( # in case of mds, first nativ config domain is global is_global_loop_iteration = False - native_config_global: dict[str, Any] = {} + native_config_global: dict[str, Any] | None = None normalized_config_global = {} if config_in.native_config["domains"][0]["is-super-manager"]: native_config_global = config_in.native_config["domains"][0] @@ -189,7 +189,7 @@ def normalize_config( def normalize_single_manager_config( native_config: dict[str, Any], - native_config_global: dict[str, Any], + native_config_global: dict[str, Any] | None, normalized_config_dict: dict[str, Any], normalized_config_global: dict[str, Any], import_state: ImportState, @@ -278,10 +278,10 @@ def create_ordered_manager_list(import_state: ImportState) -> list[ManagementCon def handle_super_manager( manager_details: ManagementController, cp_manager_api_base_url: str, show_params_policy_structure: dict[str, Any] -) -> tuple[list[Any], None, Any | None, str]: +) -> tuple[list[Any], list[Any] | None, Any | None, str]: # global assignments are fetched from mds domain mds_sid: str = cp_getter.login(manager_details) - global_policy_structure = None + global_policy_structure: list[Any] | None = [] global_domain = None global_assignments = cp_getter.get_global_assignments( cp_manager_api_base_url, mds_sid, show_params_policy_structure @@ -306,6 +306,9 @@ def handle_super_manager( else: raise FwoImporterError(f"Unexpected global assignments: {global_assignments!s}") + if len(global_policy_structure) == 0: + global_policy_structure = None + return global_assignments, global_policy_structure, global_domain, global_sid @@ -322,15 +325,17 @@ def process_devices( native_config_global_domain: dict[str, Any], import_state: ImportState, ) -> None: + fetched_nat_rulebases: list[str] = [] + fetched_but_empty_nat_rulebases: list[str] = [] for device in manager_details.devices: - device_config: dict[str, Any] = initialize_device_config(device) + device_config = initialize_device_config(device) if not device_config: continue - ordered_layer_uids: list[str] = get_ordered_layer_uids( + ordered_layer_uids, policy_dict = get_ordered_layer_uids( policy_structure, device_config, manager_details.get_domain_string() ) - if not ordered_layer_uids: + if not ordered_layer_uids or not policy_dict: FWOLogger.warning(f"No ordered layers found for device: {device_config['name']}") native_config_domain["gateways"].append(device_config) continue @@ -363,7 +368,15 @@ def process_devices( global_ordered_layer_count=global_ordered_layer_count, ) - handle_nat_rules(device, native_config_domain, sid, import_state) + handle_nat_rules( + policy_dict, + device_config, + native_config_domain, + sid, + import_state, + fetched_nat_rulebases, + fetched_but_empty_nat_rulebases, + ) native_config_domain["gateways"].append(device_config) @@ -401,7 +414,10 @@ def handle_global_rulebase_links( continue for global_policy in global_policy_structure: if global_policy["name"] == global_assignment["global-access-policy"]: - global_ordered_layer_uids = get_ordered_layer_uids([global_policy], device_config, global_domain) + # no global NAT, so global_policy_dict not used + global_ordered_layer_uids, _global_policy_dict = get_ordered_layer_uids( + [global_policy], device_config, global_domain + ) if not global_ordered_layer_uids: FWOLogger.warning(f"No access layer for global policy: {global_policy['name']}") break @@ -445,7 +461,6 @@ def define_global_rulebase_link( # parse global rulebases, find place-holders and link local rulebases placeholder_link_index = 0 for global_rulebase_uid in global_policy_rulebases_uid_list: - placeholder_rule_uid = "" for rulebase in native_config_global_domain["rulebases"]: if rulebase["uid"] == global_rulebase_uid: placeholder_rule_uid, placeholder_rulebase_uid = cp_getter.get_placeholder_in_rulebase(rulebase) @@ -459,7 +474,7 @@ def define_global_rulebase_link( device_config["rulebase_links"].append( { "from_rulebase_uid": placeholder_rulebase_uid, - "from_rule_uid": None, + "from_rule_uid": placeholder_rule_uid, "to_rulebase_uid": ordered_layer_uid, "type": "domain", "is_global": False, @@ -494,27 +509,84 @@ def get_rules_params(import_state: ImportState) -> dict[str, Any]: } -def handle_nat_rules(device: dict[str, Any], native_config_domain: dict[str, Any], sid: str, import_state: ImportState): - if device.get("package_name"): +def handle_nat_rules( + policy_dict: dict[str, Any], + device_config: dict[str, Any], + native_config_domain: dict[str, Any], + sid: str, + import_state: ImportState, + fetched_nat_rulebases: list[str], + fetched_but_empty_nat_rulebases: list[str], +): + """Get nat rulebases, name and uid get _nat prefix and link to access rulebase""" + if policy_dict["uid"] not in fetched_nat_rulebases + fetched_but_empty_nat_rulebases: show_params_rules: dict[str, Any] = { "limit": import_state.fwo_config.api_fetch_size, "use-object-dictionary": cp_const.use_object_dictionary, "details-level": "standard", - "package": device["package_name"], + "package": policy_dict["name"], } - FWOLogger.debug(f"Getting NAT rules for package: {device['package_name']}", 4) + FWOLogger.debug(f"Getting NAT rules for package: {policy_dict['name']}", 4) nat_rules = cp_getter.get_nat_rules_from_api_as_dict( + policy_dict, import_state.mgm_details.build_fw_api_string(), sid, show_params_rules, - native_config_domain=native_config_domain, + native_config_domain, ) - if nat_rules: + if nat_rules["chunks"]: native_config_domain["nat_rulebases"].append(nat_rules) + fetched_nat_rulebases.append(policy_dict["uid"]) # uid without _nat postfix else: - native_config_domain["nat_rulebases"].append({"nat_rule_chunks": []}) - else: - native_config_domain["nat_rulebases"].append({"nat_rule_chunks": []}) + fetched_but_empty_nat_rulebases.append(policy_dict["uid"]) # uid without _nat postfix + + if policy_dict["uid"] in fetched_nat_rulebases: + link_nat_rulebase_sections(policy_dict["uid"], native_config_domain["nat_rulebases"], device_config) + + +def link_nat_rulebase_sections(policy_dict_uid: str, nat_rulebases: list[Any], device_config: dict[str, Any]): + current_nat_rulebase_uid = policy_dict_uid + "_nat" + # link nat rulebase to access rulebase + device_config["rulebase_links"].append( + { + "from_rulebase_uid": policy_dict_uid, + "from_rule_uid": None, + "to_rulebase_uid": current_nat_rulebase_uid, + "type": "nat", + "is_global": False, + "is_initial": False, + "is_section": False, + } + ) + # link all nat sections concatenated + for nat_rulebase in nat_rulebases: + if current_nat_rulebase_uid == nat_rulebase["uid"]: + for nat_rulebase_chunk in nat_rulebase["chunks"]: + if "rulebase" in nat_rulebase_chunk: + current_nat_rulebase_uid = define_nat_section_chain( + current_nat_rulebase_uid, nat_rulebase_chunk, device_config + ) + break + + +def define_nat_section_chain( + current_nat_rulebase_uid: str, nat_rulebase_chunk: dict[str, Any], device_config: dict[str, Any] +) -> str: + for nat_section in nat_rulebase_chunk["rulebase"]: + if nat_section["type"] == "nat-section": + device_config["rulebase_links"].append( + { + "from_rulebase_uid": current_nat_rulebase_uid, + "from_rule_uid": None, + "to_rulebase_uid": nat_section["uid"], + "type": "concatenated", + "is_global": False, + "is_initial": False, + "is_section": True, + } + ) + current_nat_rulebase_uid = nat_section["uid"] + return current_nat_rulebase_uid def add_ordered_layers_to_native_config( @@ -568,20 +640,30 @@ def add_ordered_layers_to_native_config( def get_ordered_layer_uids( policy_structure: list[dict[str, Any]], device_config: dict[str, Any], domain: str | None -) -> list[str]: +) -> tuple[list[str], dict[str, Any]]: """ Get UIDs of ordered layers for policy of device """ ordered_layer_uids: list[str] = [] + policy_dict: dict[str, str] = {} + failsafe_multiple_policies_per_device = False for policy in policy_structure: found_target_in_policy = False for target in policy["targets"]: if target["uid"] == device_config["uid"] or target["uid"] == "all": found_target_in_policy = True + check_if_multiple_policies_per_device(failsafe_multiple_policies_per_device, device_config["uid"]) + failsafe_multiple_policies_per_device = True if found_target_in_policy: append_access_layer_uid(policy, domain, ordered_layer_uids) + policy_dict = policy + + return ordered_layer_uids, policy_dict + - return ordered_layer_uids +def check_if_multiple_policies_per_device(failsafe_multiple_policies_per_device: bool, device_config_uid: str): + if failsafe_multiple_policies_per_device: + raise FwoImporterError("multiple policies for device " + device_config_uid) def append_access_layer_uid(policy: dict[str, Any], domain: str | None, ordered_layer_uids: list[str]) -> None: diff --git a/roles/importer/files/importer/fw_modules/fortiadom5ff/fmgr_rule.py b/roles/importer/files/importer/fw_modules/fortiadom5ff/fmgr_rule.py index 73fe4b3fe2..d144f04b74 100644 --- a/roles/importer/files/importer/fw_modules/fortiadom5ff/fmgr_rule.py +++ b/roles/importer/files/importer/fw_modules/fortiadom5ff/fmgr_rule.py @@ -537,7 +537,7 @@ def get_and_link_global_rulebase( if not is_rulebase_already_fetched( native_config_global["rulebases"], rulebase_type_prefix + "_v6_" + global_pkg_name ): - # delete_v: hier auch options=options? + # TODO: should try options=options here fmgr_getter.update_config_with_fortinet_api_call( native_config_global["rulebases"], sid, @@ -715,7 +715,7 @@ def get_nat_policy( ) -# delete_v: ab hier kann sehr viel weg, ich lasses vorerst zB für die nat +# TODO: delte all unused code here after nat is working again # pure nat rules diff --git a/roles/importer/files/importer/fw_modules/fortiadom5ff/fwcommon.py b/roles/importer/files/importer/fw_modules/fortiadom5ff/fwcommon.py index cc3fbe1c59..8d7f8d514e 100644 --- a/roles/importer/files/importer/fw_modules/fortiadom5ff/fwcommon.py +++ b/roles/importer/files/importer/fw_modules/fortiadom5ff/fwcommon.py @@ -45,7 +45,7 @@ def get_config( config_in.native_config["domains"].append(native_config_global) # type: ignore #TYPING: None or not None this is the question # noqa: PGH003 adom_list = build_adom_list(import_state.state) adom_device_vdom_structure = build_adom_device_vdom_structure(adom_list, sid, fm_api_url) - # delete_v: das geht schief für unschöne adoms + # TODO: get_arbitrary_vdom may fail for pre-defined forti managers arbitrary_vdom_for_updateable_objects = get_arbitrary_vdom(adom_device_vdom_structure) adom_device_vdom_policy_package_structure = add_policy_package_to_vdoms( adom_device_vdom_structure, sid, fm_api_url