-
Notifications
You must be signed in to change notification settings - Fork 21
feat: Checkpoint & Forti NAT support #4547
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
Laennart
wants to merge
81
commits into
CactuseSecurity:develop
Choose a base branch
from
weichwaren-schmiede:feature/checkpoint-nat-support
base: develop
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from all commits
Commits
Show all changes
81 commits
Select commit
Hold shift + click to select a range
b08ece2
feat: added standard layer
ErikPre 64da640
feat: stuff
ErikPre af68600
chore: Add TODO
Laennart 5ffdaff
fix: Policy registration
Laennart 10ecb5b
wip: Nat
Laennart a3d79be
fix: duplicate rulebase
Laennart 032920c
wip: Broken with two policies
Laennart d46c0a4
Revert "chore: Add TODO"
Laennart a8fab50
Merge remote-tracking branch 'upstream/develop' into feature/checkpoi…
Laennart 32029e6
fix: Checkpoint Policy Import
Laennart 08188e4
Merge remote-tracking branch 'upstream/develop' into feature/checkpoi…
Laennart eb60980
feat: Get NAT rulebases
Laennart a4dedce
wip: NAT import
Laennart 0f818a7
Merge remote-tracking branch 'upstream/develop' into feature/checkpoi…
Laennart dd5a7d3
fix: Database duplication errors
Laennart b2485c8
Merge remote-tracking branch 'upstream/develop' into feature/checkpoi…
Laennart 2f1e37b
fix: malformed object warning
ErikPre 0514aef
Merge remote-tracking branch 'upstream/develop' into feature/checkpoi…
Laennart 78bcf2f
feat: Add stm_link_type
Laennart b0fdcd1
wip: Proper policy mapping & nat import
Laennart 13f6ffc
Merge remote-tracking branch 'upstream/develop' into feature/checkpoi…
Laennart 3e359bd
fix: multiple NAT objects
ErikPre b1c1d0d
wip: NAT rules working as rules
ErikPre d1f7bcb
Merge remote-tracking branch 'upstream/develop' into feature/checkpoi…
Laennart 37ba8d2
chore: Revert sql upgrade script change
Laennart a47b191
fix: NAT query
Laennart 5ac8a55
wip: NAT frontend
Laennart 1c836b3
Merge remote-tracking branch 'upstream/develop' into feature/checkpoi…
Laennart bce860f
wip: NAT report
Laennart 6d373ab
wip: NAT import
Laennart 2f4f3ee
fix: NAT report
Laennart 3de8cdd
Merge remote-tracking branch 'upstream/develop' into feature/checkpoi…
Laennart f694343
fix: Revert change
Laennart 8d15525
fix: NAT report
Laennart 57db4cf
fix: Nat rules set as access rule
Laennart ed5e471
refactor: Revert changes and remove unused attributes
Laennart d813b4d
wip: Fix NAT Rulebases missing
Laennart efecb08
fix: Rulebase order
Laennart b202e53
fix: Format
Laennart 67add83
Merge remote-tracking branch 'upstream/develop' into feature/checkpoi…
Laennart 6429a85
refactor: Satisfy Sonarqube
Laennart acf5cd8
feat: copilot comment, initial check
ErikPre 9bc72fa
feat: set simpler standard
ErikPre e58a152
fix: pagination of gateways and servers
ErikPre 56d7116
Merge remote-tracking branch 'upstream/develop' into feature/checkpoi…
Laennart f396510
wip: Review comments & new link type
Laennart ae594cc
Merge remote-tracking branch 'upstream/develop' into feature/checkpoi…
Laennart 97103f9
feat: redo rule_type to type
ErikPre 268a02b
fix: Query
Laennart 4f36413
wip: Rule tree builder
Laennart 074856c
fix: Format
Laennart deab96d
Merge remote-tracking branch 'upstream/develop' into feature/checkpoi…
Laennart 4f10be6
fix: Review comments
Laennart 8e8e7f7
fix: Review comments
Laennart 0a86461
Merge branch 'develop' into feature/checkpoint-nat-support
Laennart cde5db7
feat: Forti NAT Support
Laennart 53f72a3
feat: simplify function
ErikPre 9ec613b
fix: Copilot issues
ErikPre 6e5386f
Merge remote-tracking branch 'upstream/develop' into feature/checkpoi…
ErikPre 899baa2
fix: made function easier
ErikPre 7ce4132
Merge branch 'develop' into feature/checkpoint-nat-support
Y4nnikH 6075cd9
Merge remote-tracking branch 'upstream/develop' into feature/checkpoi…
ErikPre 258e190
refactor: linting
ErikPre cf3ed5d
fix: use long for bigint from database
ErikPre aa7e16c
fix: overwrite default egde-case
ErikPre f58da15
fix: long string
ErikPre 46a6c1e
Merge remote-tracking branch 'upstream/develop' into feature/checkpoi…
ErikPre 4767bbc
fix: making sure that fcntl is not throwing an exception
ErikPre 40028f9
fix: tests for Original
ErikPre 4bf27cb
feat: ipv6 support
ErikPre 5416d30
feat: implemented all nat types
ErikPre cf3e19b
fix: made function easier
ErikPre ad37dec
fix: more sonarqube issues
ErikPre 9c04889
Merge remote-tracking branch 'upstream/develop' into feature/checkpoi…
ErikPre 9cf926a
fix: make import safer
ErikPre 7083c2c
Merge branch 'develop' into feature/checkpoint-nat-support
ErikPre 3ffda47
feat: bumped up version
ErikPre 38ff70a
refactor: set standard time object to time
ErikPre 970f884
refactor: time object
ErikPre 3277863
feat: added NAT tests
ErikPre 36e2c40
fix: cs tests
ErikPre File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
Submodule .agents
updated
from 74356a to c677fd
Submodule agents
added at
a5ed17
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1 @@ | ||
| insert into stm_link_type (id, name) VALUES (6, 'nat') ON CONFLICT DO NOTHING; | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
264 changes: 264 additions & 0 deletions
264
roles/importer/files/importer/fw_modules/checkpointR8x/cp_nat.py
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,264 @@ | ||
| from typing import Any | ||
|
|
||
| from fw_modules.checkpointR8x.cp_rule import parse_single_rule | ||
| from fwo_log import FWOLogger | ||
| from models.import_state import ImportState | ||
| from models.rulebase import Rulebase | ||
|
|
||
|
|
||
| def normalize_nat_rules( | ||
| native_config: dict[str, Any], | ||
| import_state: ImportState, | ||
| normalized_config: dict[str, Any], | ||
| ): | ||
| native_nat_rulebases = native_config.get("nat_rulebases", []) | ||
| if not native_nat_rulebases: | ||
| return | ||
|
|
||
| for gateway in native_config["gateways"]: | ||
| parse_native_nat_rulebases(gateway, native_nat_rulebases, import_state, normalized_config, native_config) | ||
|
|
||
|
ErikPre marked this conversation as resolved.
Laennart marked this conversation as resolved.
|
||
|
|
||
| def get_initial_nat_rulebase_link(gateway: dict[str, Any], normalized_config: dict[str, Any]) -> dict[str, Any] | None: | ||
| normalized_gateway = next((gw for gw in normalized_config["gateways"] if gw["Uid"] == gateway["uid"]), None) | ||
|
|
||
| if normalized_gateway is None: | ||
| FWOLogger.warning("Could not find normalized gateway for initial NAT rulebase link: " + str(gateway["uid"])) | ||
| return None | ||
|
|
||
| initial_gateway_link = next( | ||
| ( | ||
| link | ||
| for link in normalized_gateway["RulebaseLinks"] | ||
| if link.get("is_initial") and link.get("link_type") == "ordered" | ||
| ), | ||
| None, | ||
| ) | ||
|
|
||
| if initial_gateway_link is None: | ||
| FWOLogger.warning("Could not find initial gateway rulebase link for NAT rulebase: " + str(gateway["uid"])) | ||
| return None | ||
|
|
||
| return initial_gateway_link | ||
|
|
||
|
|
||
| def parse_native_nat_rulebases( | ||
| gateway: dict[str, Any], | ||
| native_nat_rulebases: list[dict[str, Any]], | ||
| import_state: ImportState, | ||
| normalized_config: dict[str, Any], | ||
| native_config: dict[str, Any], | ||
| ): | ||
| for nat_rulebase in native_nat_rulebases: | ||
| if "nat_rule_chunks" not in nat_rulebase: | ||
| continue | ||
|
|
||
| normalized_nat_rulebase = insert_parent_nat_rulebase(gateway, import_state, normalized_config) | ||
| normalized_gateway = next((gw for gw in normalized_config["gateways"] if gw["Uid"] == gateway["uid"]), None) | ||
|
|
||
| if normalized_gateway is None: | ||
| FWOLogger.warning("Could not find normalized gateway for NAT rulebase, skipping: " + str(gateway["uid"])) | ||
| continue | ||
|
|
||
| initial_gateway_link = get_initial_nat_rulebase_link(gateway, normalized_config) | ||
|
|
||
| if initial_gateway_link is None: | ||
| continue | ||
|
|
||
| initial_to_rulebase_uid = initial_gateway_link.get("to_rulebase_uid") | ||
| if not initial_to_rulebase_uid: | ||
| FWOLogger.warning( | ||
| "Initial gateway rulebase link is missing to_rulebase_uid for NAT rulebase, skipping: " | ||
| + str(gateway["uid"]) | ||
| ) | ||
| continue | ||
|
|
||
| insert_rulebase_link( | ||
| from_rulebase_uid=initial_to_rulebase_uid, | ||
| to_rulebase_uid=normalized_nat_rulebase.uid, | ||
| link_type="nat", | ||
| normalized_gateway=normalized_gateway, | ||
| ) | ||
|
ErikPre marked this conversation as resolved.
|
||
|
|
||
| for chunk in nat_rulebase["nat_rule_chunks"]: | ||
| parse_nat_rule_chunk( | ||
| chunk, | ||
| normalized_nat_rulebase, | ||
| gateway, | ||
| native_config, | ||
| import_state, | ||
| normalized_config, | ||
| normalized_gateway, | ||
| ) | ||
|
|
||
|
|
||
| def insert_parent_nat_rulebase( | ||
| gateway: dict[str, Any], | ||
| import_state: ImportState, | ||
| normalized_config: dict[str, Any], | ||
| ) -> Rulebase: | ||
| nat_rulebase_uid = "nat-rulebase-" + gateway["uid"] | ||
| existing_nat_rulebase = next((rb for rb in normalized_config["policies"] if rb.uid == nat_rulebase_uid), None) | ||
|
|
||
| if existing_nat_rulebase is not None: | ||
| return existing_nat_rulebase | ||
|
|
||
| normalized_nat_rulebase = Rulebase( | ||
| uid=nat_rulebase_uid, | ||
| mgm_uid=import_state.mgm_details.uid, | ||
| name="NAT", | ||
| rules={}, | ||
| ) | ||
|
|
||
| normalized_config["policies"].append(normalized_nat_rulebase) | ||
|
|
||
| return normalized_nat_rulebase | ||
|
|
||
|
|
||
| def insert_rulebase_link( | ||
| from_rulebase_uid: str, | ||
| to_rulebase_uid: str, | ||
| link_type: str, | ||
| normalized_gateway: dict[str, Any], | ||
| ) -> None: | ||
| if not any( | ||
| link | ||
| for link in normalized_gateway["RulebaseLinks"] | ||
| if link["to_rulebase_uid"] == to_rulebase_uid | ||
| and link["link_type"] == link_type | ||
| and link["from_rulebase_uid"] == from_rulebase_uid | ||
| ): | ||
| normalized_gateway["RulebaseLinks"].append( | ||
| { | ||
| "from_rulebase_uid": from_rulebase_uid, | ||
| "to_rulebase_uid": to_rulebase_uid, | ||
| "link_type": link_type, | ||
| "is_initial": False, | ||
| "is_global": False, | ||
| "is_section": False, | ||
| } | ||
| ) | ||
|
|
||
|
|
||
| def parse_nat_rulebase( | ||
| src_rulebase: dict[str, Any], | ||
| normalized_nat_rulebase: Rulebase, | ||
| gateway: dict[str, Any], | ||
| native_config: dict[str, Any], | ||
| import_state: ImportState, | ||
| normalized_config: dict[str, Any], | ||
| normalized_gateway: dict[str, Any], | ||
| ): | ||
| section_rulebase = Rulebase( | ||
| uid=src_rulebase["uid"], | ||
| mgm_uid=import_state.mgm_details.uid, | ||
| name=src_rulebase["name"], | ||
| rules={}, | ||
| ) | ||
|
|
||
| if not any(rb for rb in normalized_config["policies"] if rb.uid == section_rulebase.uid): | ||
| normalized_config["policies"].append(section_rulebase) | ||
|
|
||
| insert_rulebase_link( | ||
| from_rulebase_uid=normalized_nat_rulebase.uid, | ||
| to_rulebase_uid=section_rulebase.uid, | ||
| link_type="nat", | ||
| normalized_gateway=normalized_gateway, | ||
| ) | ||
|
|
||
| for rule in src_rulebase["rulebase"]: | ||
| parse_nat_rule(rule, section_rulebase, gateway, native_config) | ||
|
|
||
|
|
||
| def parse_nat_rule( | ||
| src_rulebase: dict[str, Any], | ||
| rulebase: Rulebase, | ||
| gateway: dict[str, Any], | ||
| native_config: dict[str, Any], | ||
| ): | ||
| (rule_match, rule_xlate) = parse_nat_rule_transform(src_rulebase) | ||
| parse_single_rule( | ||
| rule_match, | ||
| rulebase, | ||
| rulebase.name, | ||
| rulebase.uid, | ||
| gateway, | ||
| native_config["policies"], | ||
| ) | ||
| parse_single_rule( # do not increase rule_num here (xlate rules do not count) | ||
| rule_xlate, | ||
| rulebase, | ||
| rulebase.name, | ||
| rulebase.uid, | ||
| gateway, | ||
| native_config["policies"], | ||
| ) | ||
|
|
||
|
|
||
| def parse_nat_rule_chunk( | ||
| chunk: dict[str, Any], | ||
| normalized_nat_rulebase: Rulebase, | ||
| gateway: dict[str, Any], | ||
| native_config: dict[str, Any], | ||
| import_state: ImportState, | ||
| normalized_config: dict[str, Any], | ||
| normalized_gateway: dict[str, Any], | ||
| ): | ||
| if "rulebase" not in chunk: | ||
| return | ||
|
|
||
| for src_rulebase in chunk["rulebase"]: | ||
| if "rulebase" in src_rulebase: | ||
| parse_nat_rulebase( | ||
| src_rulebase, | ||
| normalized_nat_rulebase, | ||
| gateway, | ||
| native_config, | ||
| import_state, | ||
| normalized_config, | ||
| normalized_gateway, | ||
| ) | ||
| if "rule-number" in src_rulebase: # rulebase is just a single rule (xlate rules do not count) | ||
| parse_nat_rule(src_rulebase, normalized_nat_rulebase, gateway, native_config) | ||
|
|
||
|
|
||
| def parse_nat_rule_transform(nat_rule: dict[str, Any]) -> tuple[dict[str, Any], dict[str, Any]]: | ||
| nat_in_rule = { | ||
| "uid": nat_rule["uid"], | ||
| "source": [nat_rule["original-source"]], | ||
| "destination": [nat_rule["original-destination"]], | ||
| "service": [nat_rule["original-service"]], | ||
| "action": [{"name": "accept", "type": "nat-action", "uid": nat_rule["uid"] + "_original-action"}], | ||
| "track": [{"type": "nat", "name": "None", "uid": nat_rule["uid"]}], | ||
| "type": "nat", | ||
| "rule-number": 0, | ||
| "source-negate": False, | ||
| "destination-negate": False, | ||
| "service-negate": False, | ||
| "install-on": nat_rule["install-on"], | ||
| "time": nat_rule.get("time", "time"), | ||
| "enabled": nat_rule["enabled"], | ||
| "comments": nat_rule["comments"], | ||
|
ErikPre marked this conversation as resolved.
|
||
| "nat_rule": True, | ||
| "xlate_rule_uid": nat_rule["uid"] + "_translated", | ||
| "access_rule": False, | ||
| } | ||
|
Laennart marked this conversation as resolved.
ErikPre marked this conversation as resolved.
|
||
| nat_out_rule = { | ||
| "uid": nat_rule["uid"] + "_translated", | ||
| "source": [nat_rule["translated-source"]], | ||
| "destination": [nat_rule["translated-destination"]], | ||
| "service": [nat_rule["translated-service"]], | ||
| "action": [{"name": "accept", "type": "nat-action", "uid": nat_rule["uid"] + "_translated-action"}], | ||
| "track": [{"type": "nat", "name": "None", "uid": nat_rule["uid"] + "_translated"}], | ||
| "type": "nat", | ||
| "rule-number": 0, | ||
| "enabled": True, | ||
| "source-negate": False, | ||
| "destination-negate": False, | ||
| "service-negate": False, | ||
| "install-on": nat_rule["install-on"], | ||
| "time": "time", | ||
| "nat_rule": True, | ||
| "access_rule": False, | ||
| } | ||
| return (nat_in_rule, nat_out_rule) | ||
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.