diff --git a/application/tests/web_main_test.py b/application/tests/web_main_test.py index 9e219b4ce..9302ab9b8 100644 --- a/application/tests/web_main_test.py +++ b/application/tests/web_main_test.py @@ -7,6 +7,7 @@ import json import unittest import tempfile +from types import SimpleNamespace from unittest.mock import patch import redis @@ -688,7 +689,200 @@ def test_standards_from_db(self, node_mock, redis_conn_mock) -> None: headers={"Content-Type": "application/json"}, ) self.assertEqual(200, response.status_code) - self.assertEqual(expected, json.loads(response.data)) + self.assertEqual(expected + ["OpenCRE"], json.loads(response.data)) + + @patch.object(web_main.gap_analysis, "schedule") + @patch.object(db, "Node_collection") + def test_gap_analysis_supports_opencre_as_standard( + self, db_mock, schedule_mock + ) -> None: + shared_cre = defs.CRE(id="170-772", name="Cryptography", description="") + compare = defs.Standard( + name="OWASP Web Security Testing Guide (WSTG)", + section="WSTG-CRYP-04", + ) + compare.add_link( + defs.Link(ltype=defs.LinkTypes.LinkedTo, document=shared_cre.shallow_copy()) + ) + opencre = defs.CRE(id="170-772", name="Cryptography", description="") + opencre.add_link( + defs.Link(ltype=defs.LinkTypes.LinkedTo, document=compare.shallow_copy()) + ) + + db_mock.return_value.get_gap_analysis_result.return_value = None + db_mock.return_value.gap_analysis_exists.return_value = False + db_mock.return_value.get_nodes.side_effect = lambda name=None, **kwargs: ( + [compare] if name == "OWASP Web Security Testing Guide (WSTG)" else [] + ) + db_mock.return_value.session.query.return_value.all.return_value = [ + SimpleNamespace(id="cre-internal-1") + ] + db_mock.return_value.get_CREs.return_value = [opencre] + + with self.app.test_client() as client: + response = client.get( + "/rest/v1/map_analysis?standard=OpenCRE&standard=OWASP%20Web%20Security%20Testing%20Guide%20(WSTG)", + headers={"Content-Type": "application/json"}, + ) + + payload = json.loads(response.data) + self.assertEqual(200, response.status_code) + self.assertIn("result", payload) + self.assertIn(opencre.id, payload["result"]) + self.assertEqual(1, len(payload["result"][opencre.id]["paths"])) + path = next(iter(payload["result"][opencre.id]["paths"].values())) + self.assertEqual(compare.id, path["end"]["id"]) + schedule_mock.assert_not_called() + + @patch.object(web_main.gap_analysis, "schedule") + @patch.object(db, "Node_collection") + def test_gap_analysis_returns_only_direct_opencre_mappings( + self, db_mock, schedule_mock + ) -> None: + compare = defs.Standard( + name="CWE", + sectionID="1004", + section="Sensitive Cookie Without 'HttpOnly' Flag", + ) + direct_cre = defs.CRE( + id="804-220", + name="Set httponly attribute for cookie-based session tokens", + description="", + ) + direct_cre.add_link( + defs.Link(ltype=defs.LinkTypes.LinkedTo, document=compare.shallow_copy()) + ) + auto_linked_cres = [] + for i, cre_id in enumerate( + [ + "117-371", + "166-151", + "284-521", + "368-633", + "612-252", + "664-080", + "801-310", + ], + start=1, + ): + cre = defs.CRE( + id=cre_id, + name=f"Automatically mapped CRE {i}", + description="", + ) + cre.add_link( + defs.Link( + ltype=defs.LinkTypes.AutomaticallyLinkedTo, + document=compare.shallow_copy(), + ) + ) + auto_linked_cres.append(cre) + + opencre_documents = [direct_cre] + auto_linked_cres + internal_ids = [ + SimpleNamespace(id=f"cre-internal-{i}") + for i in range(len(opencre_documents)) + ] + + db_mock.return_value.get_gap_analysis_result.return_value = None + db_mock.return_value.gap_analysis_exists.return_value = False + db_mock.return_value.get_nodes.side_effect = lambda name=None, **kwargs: ( + [compare] if name == "CWE" else [] + ) + db_mock.return_value.session.query.return_value.all.return_value = internal_ids + db_mock.return_value.get_CREs.side_effect = lambda internal_id=None, **kwargs: [ + next( + cre + for index, cre in enumerate(opencre_documents) + if internal_id == f"cre-internal-{index}" + ) + ] + + with self.app.test_client() as client: + response = client.get( + "/rest/v1/map_analysis?standard=CWE&standard=OpenCRE", + headers={"Content-Type": "application/json"}, + ) + + payload = json.loads(response.data) + self.assertEqual(200, response.status_code) + self.assertIn("result", payload) + self.assertEqual([compare.id], list(payload["result"].keys())) + self.assertEqual(1, len(payload["result"][compare.id]["paths"])) + path = next(iter(payload["result"][compare.id]["paths"].values())) + self.assertEqual(compare.id, payload["result"][compare.id]["start"]["id"]) + self.assertEqual(direct_cre.id, path["end"]["id"]) + self.assertEqual(direct_cre.name, path["end"]["name"]) + self.assertEqual("", path["path"][0]["start"]["id"]) + self.assertEqual(direct_cre.id, path["path"][0]["end"]["id"]) + schedule_mock.assert_not_called() + + @patch.object(web_main.gap_analysis, "schedule") + @patch.object(db, "Node_collection") + def test_gap_analysis_returns_only_direct_opencre_mappings_when_opencre_is_left( + self, db_mock, schedule_mock + ) -> None: + compare = defs.Standard( + name="CWE", + sectionID="1004", + section="Sensitive Cookie Without 'HttpOnly' Flag", + ) + direct_cre = defs.CRE( + id="804-220", + name="Set httponly attribute for cookie-based session tokens", + description="", + ) + direct_cre.add_link( + defs.Link(ltype=defs.LinkTypes.LinkedTo, document=compare.shallow_copy()) + ) + indirect_cre = defs.CRE( + id="117-371", + name="Use a centralized access control mechanism", + description="", + ) + indirect_cre.add_link( + defs.Link( + ltype=defs.LinkTypes.AutomaticallyLinkedTo, + document=compare.shallow_copy(), + ) + ) + + opencre_documents = [direct_cre, indirect_cre] + internal_ids = [ + SimpleNamespace(id=f"cre-internal-{i}") + for i in range(len(opencre_documents)) + ] + + db_mock.return_value.get_gap_analysis_result.return_value = None + db_mock.return_value.gap_analysis_exists.return_value = False + db_mock.return_value.get_nodes.side_effect = lambda name=None, **kwargs: ( + [compare] if name == "CWE" else [] + ) + db_mock.return_value.session.query.return_value.all.return_value = internal_ids + db_mock.return_value.get_CREs.side_effect = lambda internal_id=None, **kwargs: [ + next( + cre + for index, cre in enumerate(opencre_documents) + if internal_id == f"cre-internal-{index}" + ) + ] + + with self.app.test_client() as client: + response = client.get( + "/rest/v1/map_analysis?standard=OpenCRE&standard=CWE", + headers={"Content-Type": "application/json"}, + ) + + payload = json.loads(response.data) + self.assertEqual(200, response.status_code) + self.assertEqual([direct_cre.id], list(payload["result"].keys())) + self.assertEqual(1, len(payload["result"][direct_cre.id]["paths"])) + path = next(iter(payload["result"][direct_cre.id]["paths"].values())) + self.assertEqual(direct_cre.id, payload["result"][direct_cre.id]["start"]["id"]) + self.assertEqual(compare.id, path["end"]["id"]) + self.assertEqual(direct_cre.id, path["path"][0]["start"]["id"]) + self.assertEqual(compare.id, path["path"][0]["end"]["id"]) + schedule_mock.assert_not_called() def test_gap_analysis_weak_links_no_cache(self) -> None: with self.app.test_client() as client: diff --git a/application/web/web_main.py b/application/web/web_main.py index 29567470a..54331f2da 100644 --- a/application/web/web_main.py +++ b/application/web/web_main.py @@ -48,6 +48,7 @@ ITEMS_PER_PAGE = 20 +OPENCRE_STANDARD_NAME = "OpenCRE" app = Blueprint( "web", @@ -294,6 +295,116 @@ def find_document_by_tag() -> Any: abort(404, "Tag does not exist") +def _get_opencre_documents(collection: db.Node_collection) -> list[defs.CRE]: + return [ + collection.get_CREs(internal_id=cre.id)[0] + for cre in collection.session.query(db.CRE).all() + ] + + +def _get_map_analysis_documents( + standard: str, collection: db.Node_collection +) -> list[defs.Document]: + if standard == OPENCRE_STANDARD_NAME: + return _get_opencre_documents(collection) + return collection.get_nodes(name=standard) + + +def _build_direct_link_path( + start_document: defs.Document, end_document: defs.Document +) -> dict[str, Any]: + segment_start = start_document.shallow_copy() + # The current gap-analysis popup mutates non-CRE row ids during display + # before it resolves the one-step direct path. Keep this direct-link fast + # path compatible by mirroring that display-only shape in the segment start. + if segment_start.doctype != defs.Credoctypes.CRE.value: + segment_start.id = "" + return { + "end": end_document.shallow_copy(), + "path": [ + { + "start": segment_start, + "end": end_document.shallow_copy(), + "relationship": "LINKED_TO", + "score": 0, + } + ], + "score": 0, + } + + +def _make_direct_link_path_key(end_document: defs.Document) -> str: + return end_document.id + + +def _add_direct_link_result( + grouped_paths: dict[str, dict[str, Any]], + start_document: defs.Document, + end_document: defs.Document, +) -> None: + shared_paths = grouped_paths.setdefault( + start_document.id, + { + "start": start_document.shallow_copy(), + "paths": {}, + "extra": 0, + }, + )["paths"] + shared_paths.setdefault( + _make_direct_link_path_key(end_document), + _build_direct_link_path(start_document, end_document), + ) + + +def _build_direct_cre_overlap_map_analysis( + standards: list[str], + standards_hash: str, + collection: db.Node_collection, +) -> dict[str, Any] | None: + if len(standards) < 2: + return None + + base_standard = standards[0] + compare_standard = standards[1] + base_nodes = _get_map_analysis_documents(base_standard, collection) + compare_nodes = _get_map_analysis_documents(compare_standard, collection) + if not base_nodes or not compare_nodes: + return None + + base_is_opencre = base_standard == OPENCRE_STANDARD_NAME + opencre_nodes = base_nodes if base_is_opencre else compare_nodes + standard_nodes = compare_nodes if base_is_opencre else base_nodes + + standard_nodes_by_id = { + standard_node.id: standard_node for standard_node in standard_nodes + } + direct_pairs: list[tuple[defs.CRE, defs.Document]] = [] + for opencre_node in opencre_nodes: + for link in opencre_node.links: + if link.ltype != defs.LinkTypes.LinkedTo: + continue + standard_node = standard_nodes_by_id.get(link.document.id) + if not standard_node: + continue + direct_pairs.append((opencre_node, standard_node)) + + grouped_paths: dict[str, dict[str, Any]] = {} + for opencre_node, standard_node in direct_pairs: + if base_is_opencre: + _add_direct_link_result(grouped_paths, opencre_node, standard_node) + else: + _add_direct_link_result(grouped_paths, standard_node, opencre_node) + + if not grouped_paths: + return None + + result = {"result": grouped_paths} + collection.add_gap_analysis_result( + cache_key=standards_hash, ga_object=flask_json.dumps(result) + ) + return result + + @app.route("/rest/v1/map_analysis", methods=["GET"]) def map_analysis() -> Any: standards = request.args.getlist("standard") @@ -301,9 +412,16 @@ def map_analysis() -> Any: posthog.capture(f"map_analysis", f"standards:{standards}") database = db.Node_collection() - standards = request.args.getlist("standard") standards_hash = gap_analysis.make_resources_key(standards) + if OPENCRE_STANDARD_NAME in standards: + direct_gap_analysis = _build_direct_cre_overlap_map_analysis( + standards, standards_hash, database + ) + if direct_gap_analysis: + return jsonify(direct_gap_analysis) + abort(404, "No direct overlap found for requested standards") + # First, check if we have cached results in the database if database.gap_analysis_exists(standards_hash): gap_analysis_result = database.get_gap_analysis_result(standards_hash) @@ -438,7 +556,9 @@ def standards() -> Any: posthog.capture(f"standards", "") database = db.Node_collection() - standards = database.standards() + standards = list(database.standards()) + if OPENCRE_STANDARD_NAME not in standards: + standards.append(OPENCRE_STANDARD_NAME) return standards