From 3189f972076c9bfd679c8d026e7903affb8bd524 Mon Sep 17 00:00:00 2001 From: bornunique911 Date: Tue, 24 Mar 2026 23:19:36 +0530 Subject: [PATCH 1/5] Add OpenCRE as a map analysis resource --- application/tests/web_main_test.py | 44 ++++++++- application/web/web_main.py | 143 ++++++++++++++++++++++++++++- 2 files changed, 185 insertions(+), 2 deletions(-) diff --git a/application/tests/web_main_test.py b/application/tests/web_main_test.py index 9e219b4ce..f66be64d7 100644 --- a/application/tests/web_main_test.py +++ b/application/tests/web_main_test.py @@ -7,6 +7,7 @@ import json import unittest import tempfile +from types import SimpleNamespace from unittest.mock import patch import redis @@ -688,7 +689,48 @@ def test_standards_from_db(self, node_mock, redis_conn_mock) -> None: headers={"Content-Type": "application/json"}, ) self.assertEqual(200, response.status_code) - self.assertEqual(expected, json.loads(response.data)) + self.assertEqual(expected + ["OpenCRE"], json.loads(response.data)) + + @patch.object(web_main.gap_analysis, "schedule") + @patch.object(db, "Node_collection") + def test_gap_analysis_supports_opencre_as_standard( + self, db_mock, schedule_mock + ) -> None: + shared_cre = defs.CRE(id="170-772", name="Cryptography", description="") + compare = defs.Standard( + name="OWASP Web Security Testing Guide (WSTG)", + section="WSTG-CRYP-04", + ) + compare.add_link( + defs.Link(ltype=defs.LinkTypes.LinkedTo, document=shared_cre.shallow_copy()) + ) + opencre = defs.CRE(id="170-772", name="Cryptography", description="") + opencre.add_link( + defs.Link(ltype=defs.LinkTypes.LinkedTo, document=compare.shallow_copy()) + ) + + db_mock.return_value.get_gap_analysis_result.return_value = None + db_mock.return_value.gap_analysis_exists.return_value = False + db_mock.return_value.get_nodes.side_effect = lambda name=None, **kwargs: ( + [compare] if name == "OWASP Web Security Testing Guide (WSTG)" else [] + ) + db_mock.return_value.session.query.return_value.all.return_value = [ + SimpleNamespace(id="cre-internal-1") + ] + db_mock.return_value.get_CREs.return_value = [opencre] + + with self.app.test_client() as client: + response = client.get( + "/rest/v1/map_analysis?standard=OpenCRE&standard=OWASP%20Web%20Security%20Testing%20Guide%20(WSTG)", + headers={"Content-Type": "application/json"}, + ) + + payload = json.loads(response.data) + self.assertEqual(200, response.status_code) + self.assertIn("result", payload) + self.assertIn(opencre.id, payload["result"]) + self.assertIn(compare.id, payload["result"][opencre.id]["paths"]) + schedule_mock.assert_not_called() def test_gap_analysis_weak_links_no_cache(self) -> None: with self.app.test_client() as client: diff --git a/application/web/web_main.py b/application/web/web_main.py index 29567470a..e322f5445 100644 --- a/application/web/web_main.py +++ b/application/web/web_main.py @@ -48,6 +48,14 @@ ITEMS_PER_PAGE = 20 +OWASP_TOP10_2025_DATA_FILE = ( + pathlib.Path(__file__).resolve().parent.parent + / "utils" + / "external_project_parsers" + / "data" + / "owasp_top10_2025.json" +) +OPENCRE_STANDARD_NAME = "OpenCRE" app = Blueprint( "web", @@ -294,6 +302,129 @@ def find_document_by_tag() -> Any: abort(404, "Tag does not exist") +def _get_opencre_documents(collection: db.Node_collection) -> list[defs.CRE]: + return [ + collection.get_CREs(internal_id=cre.id)[0] + for cre in collection.session.query(db.CRE).all() + ] + + +def _get_map_analysis_documents( + standard: str, collection: db.Node_collection +) -> list[defs.Document]: + if standard == OPENCRE_STANDARD_NAME: + return _get_opencre_documents(collection) + return collection.get_nodes(name=standard) + + +def _get_document_cre_ids(document: defs.Document) -> list[str]: + if document.doctype == defs.Credoctypes.CRE: + return [document.id] + return [ + link.document.id + for link in document.links + if link.document.doctype == defs.Credoctypes.CRE + ] + + +def _build_direct_overlap_path( + base_document: defs.Document, cre_id: str, compare_document: defs.Document +) -> dict[str, Any] | None: + if base_document.doctype == defs.Credoctypes.CRE: + if compare_document.doctype == defs.Credoctypes.CRE: + return None + return { + "end": compare_document.shallow_copy(), + "path": [ + { + "start": base_document.shallow_copy(), + "end": compare_document.shallow_copy(), + "relationship": "LINKED_TO", + "score": 0, + } + ], + "score": 0, + } + + if compare_document.doctype == defs.Credoctypes.CRE: + return { + "end": compare_document.shallow_copy(), + "path": [ + { + "start": base_document.shallow_copy(), + "end": compare_document.shallow_copy(), + "relationship": "LINKED_TO", + "score": 0, + } + ], + "score": 0, + } + + return { + "end": compare_document.shallow_copy(), + "path": [ + { + "start": base_document.shallow_copy(), + "end": defs.CRE(id=cre_id).shallow_copy(), + "relationship": "LINKED_TO", + "score": 0, + }, + { + "start": defs.CRE(id=cre_id).shallow_copy(), + "end": compare_document.shallow_copy(), + "relationship": "LINKED_TO", + "score": 0, + }, + ], + "score": 0, + } + + +def _build_direct_cre_overlap_map_analysis( + standards: list[str], + standards_hash: str, + collection: db.Node_collection, +) -> dict[str, Any] | None: + if len(standards) < 2: + return None + + base_nodes = _get_map_analysis_documents(standards[0], collection) + compare_nodes = _get_map_analysis_documents(standards[1], collection) + if not base_nodes or not compare_nodes: + return None + + compare_nodes_by_cre: dict[str, list[defs.Document]] = {} + for compare_node in compare_nodes: + for cre_id in _get_document_cre_ids(compare_node): + compare_nodes_by_cre.setdefault(cre_id, []).append(compare_node) + + grouped_paths: dict[str, dict[str, Any]] = {} + for base_node in base_nodes: + shared_paths: dict[str, Any] = {} + for cre_id in _get_document_cre_ids(base_node): + for compare_node in compare_nodes_by_cre.get(cre_id, []): + path = _build_direct_overlap_path(base_node, cre_id, compare_node) + if not path: + continue + shared_paths.setdefault(compare_node.id, path) + + if shared_paths: + grouped_paths[base_node.id] = { + "start": base_node.shallow_copy(), + "paths": shared_paths, + "extra": 0, + } + + if not grouped_paths: + return None + + result = {"result": grouped_paths} + collection.add_gap_analysis_result( + cache_key=standards_hash, ga_object=flask_json.dumps(result) + ) + return result + + @app.route("/rest/v1/map_analysis", methods=["GET"]) def map_analysis() -> Any: standards = request.args.getlist("standard") @@ -304,6 +435,14 @@ def map_analysis() -> Any: standards = request.args.getlist("standard") standards_hash = gap_analysis.make_resources_key(standards) + if OPENCRE_STANDARD_NAME in standards: + direct_gap_analysis = _build_direct_cre_overlap_map_analysis( + standards, standards_hash, database + ) + if direct_gap_analysis: + return jsonify(direct_gap_analysis) + abort(404, "No direct overlap found for requested standards") + # First, check if we have cached results in the database if database.gap_analysis_exists(standards_hash): gap_analysis_result = database.get_gap_analysis_result(standards_hash) @@ -438,7 +577,9 @@ def standards() -> Any: posthog.capture(f"standards", "") database = db.Node_collection() - standards = database.standards() + standards = list(database.standards()) + if OPENCRE_STANDARD_NAME not in standards: + standards.append(OPENCRE_STANDARD_NAME) return standards From 73dbd613bf94fae350931461c5ae91bbf4fd4a41 Mon Sep 17 00:00:00 2001 From: bornunique911 Date: Fri, 27 Mar 2026 02:31:02 +0530 Subject: [PATCH 2/5] Preserve multiple OpenCRE map analysis overlaps --- application/tests/web_main_test.py | 65 +++++++++++++++++++++++++++++- application/web/web_main.py | 8 +++- 2 files changed, 71 insertions(+), 2 deletions(-) diff --git a/application/tests/web_main_test.py b/application/tests/web_main_test.py index f66be64d7..fb6e83db8 100644 --- a/application/tests/web_main_test.py +++ b/application/tests/web_main_test.py @@ -729,7 +729,70 @@ def test_gap_analysis_supports_opencre_as_standard( self.assertEqual(200, response.status_code) self.assertIn("result", payload) self.assertIn(opencre.id, payload["result"]) - self.assertIn(compare.id, payload["result"][opencre.id]["paths"]) + self.assertEqual(1, len(payload["result"][opencre.id]["paths"])) + path = next(iter(payload["result"][opencre.id]["paths"].values())) + self.assertEqual(compare.id, path["end"]["id"]) + schedule_mock.assert_not_called() + + @patch.object(web_main.gap_analysis, "schedule") + @patch.object(db, "Node_collection") + def test_gap_analysis_preserves_multiple_opencre_overlaps( + self, db_mock, schedule_mock + ) -> None: + compare = defs.Standard( + name="CWE", + sectionID="1004", + section="Sensitive Cookie Without 'HttpOnly' Flag", + ) + opencre_documents = [] + internal_ids = [] + + for i in range(8): + cre = defs.CRE( + id=f"170-77{i}", + name=f"Cryptography {i}", + description="", + ) + compare.add_link( + defs.Link(ltype=defs.LinkTypes.LinkedTo, document=cre.shallow_copy()) + ) + opencre_documents.append(cre) + internal_ids.append(SimpleNamespace(id=f"cre-internal-{i}")) + + db_mock.return_value.get_gap_analysis_result.return_value = None + db_mock.return_value.gap_analysis_exists.return_value = False + db_mock.return_value.get_nodes.side_effect = lambda name=None, **kwargs: ( + [compare] if name == "CWE" else [] + ) + db_mock.return_value.session.query.return_value.all.return_value = internal_ids + db_mock.return_value.get_CREs.side_effect = lambda internal_id=None, **kwargs: [ + next( + cre + for index, cre in enumerate(opencre_documents) + if internal_id == f"cre-internal-{index}" + ) + ] + + with self.app.test_client() as client: + response = client.get( + "/rest/v1/map_analysis?standard=CWE&standard=OpenCRE", + headers={"Content-Type": "application/json"}, + ) + + payload = json.loads(response.data) + self.assertEqual(200, response.status_code) + self.assertIn("result", payload) + self.assertIn(compare.id, payload["result"]) + self.assertEqual(8, len(payload["result"][compare.id]["paths"])) + self.assertEqual( + 8, + len( + { + path["end"]["id"] + for path in payload["result"][compare.id]["paths"].values() + } + ), + ) schedule_mock.assert_not_called() def test_gap_analysis_weak_links_no_cache(self) -> None: diff --git a/application/web/web_main.py b/application/web/web_main.py index e322f5445..f8bdb989f 100644 --- a/application/web/web_main.py +++ b/application/web/web_main.py @@ -380,6 +380,10 @@ def _build_direct_overlap_path( } +def _make_direct_overlap_path_key(compare_document: defs.Document, cre_id: str) -> str: + return f"{compare_document.id}::{cre_id}" + + def _build_direct_cre_overlap_map_analysis( standards: list[str], standards_hash: str, @@ -406,7 +410,9 @@ def _build_direct_cre_overlap_map_analysis( path = _build_direct_overlap_path(base_node, cre_id, compare_node) if not path: continue - shared_paths.setdefault(compare_node.id, path) + shared_paths.setdefault( + _make_direct_overlap_path_key(compare_node, cre_id), path + ) if shared_paths: grouped_paths[base_node.id] = { From 389641e95e597e879592eb932c7dd761b5db0df2 Mon Sep 17 00:00:00 2001 From: bornunique911 Date: Fri, 27 Mar 2026 05:10:41 +0530 Subject: [PATCH 3/5] Clean issue #469 web_main.py --- application/web/web_main.py | 7 ------- 1 file changed, 7 deletions(-) diff --git a/application/web/web_main.py b/application/web/web_main.py index f8bdb989f..df0917b2d 100644 --- a/application/web/web_main.py +++ b/application/web/web_main.py @@ -48,13 +48,6 @@ ITEMS_PER_PAGE = 20 -OWASP_TOP10_2025_DATA_FILE = ( - pathlib.Path(__file__).resolve().parent.parent - / "utils" - / "external_project_parsers" - / "data" - / "owasp_top10_2025.json" -) OPENCRE_STANDARD_NAME = "OpenCRE" app = Blueprint( From 7f6c5dabacbeefdafbfb8161cc6fb32bf2106c01 Mon Sep 17 00:00:00 2001 From: bornunique911 Date: Fri, 27 Mar 2026 22:23:25 +0530 Subject: [PATCH 4/5] Add direct-only OpenCRE map analysis --- application/tests/web_main_test.py | 129 +++++++++++++++++++++++----- application/web/web_main.py | 132 ++++++++++++----------------- 2 files changed, 161 insertions(+), 100 deletions(-) diff --git a/application/tests/web_main_test.py b/application/tests/web_main_test.py index fb6e83db8..1d55fa3c4 100644 --- a/application/tests/web_main_test.py +++ b/application/tests/web_main_test.py @@ -736,7 +736,7 @@ def test_gap_analysis_supports_opencre_as_standard( @patch.object(web_main.gap_analysis, "schedule") @patch.object(db, "Node_collection") - def test_gap_analysis_preserves_multiple_opencre_overlaps( + def test_gap_analysis_returns_only_direct_opencre_mappings( self, db_mock, schedule_mock ) -> None: compare = defs.Standard( @@ -744,20 +744,45 @@ def test_gap_analysis_preserves_multiple_opencre_overlaps( sectionID="1004", section="Sensitive Cookie Without 'HttpOnly' Flag", ) - opencre_documents = [] - internal_ids = [] - - for i in range(8): + direct_cre = defs.CRE( + id="804-220", + name="Set httponly attribute for cookie-based session tokens", + description="", + ) + direct_cre.add_link( + defs.Link(ltype=defs.LinkTypes.LinkedTo, document=compare.shallow_copy()) + ) + auto_linked_cres = [] + for i, cre_id in enumerate( + [ + "117-371", + "166-151", + "284-521", + "368-633", + "612-252", + "664-080", + "801-310", + ], + start=1, + ): cre = defs.CRE( - id=f"170-77{i}", - name=f"Cryptography {i}", + id=cre_id, + name=f"Automatically mapped CRE {i}", description="", ) - compare.add_link( - defs.Link(ltype=defs.LinkTypes.LinkedTo, document=cre.shallow_copy()) + cre.add_link( + defs.Link( + ltype=defs.LinkTypes.AutomaticallyLinkedTo, + document=compare.shallow_copy(), + ) ) - opencre_documents.append(cre) - internal_ids.append(SimpleNamespace(id=f"cre-internal-{i}")) + auto_linked_cres.append(cre) + + opencre_documents = [direct_cre] + auto_linked_cres + internal_ids = [ + SimpleNamespace(id=f"cre-internal-{i}") + for i in range(len(opencre_documents)) + ] db_mock.return_value.get_gap_analysis_result.return_value = None db_mock.return_value.gap_analysis_exists.return_value = False @@ -782,17 +807,79 @@ def test_gap_analysis_preserves_multiple_opencre_overlaps( payload = json.loads(response.data) self.assertEqual(200, response.status_code) self.assertIn("result", payload) - self.assertIn(compare.id, payload["result"]) - self.assertEqual(8, len(payload["result"][compare.id]["paths"])) - self.assertEqual( - 8, - len( - { - path["end"]["id"] - for path in payload["result"][compare.id]["paths"].values() - } - ), + self.assertEqual([compare.id], list(payload["result"].keys())) + self.assertEqual(1, len(payload["result"][compare.id]["paths"])) + path = next(iter(payload["result"][compare.id]["paths"].values())) + self.assertEqual(direct_cre.id, path["end"]["id"]) + self.assertEqual(direct_cre.name, path["end"]["name"]) + self.assertEqual(compare.id, path["path"][0]["start"]["id"]) + self.assertEqual(direct_cre.id, path["path"][0]["end"]["id"]) + schedule_mock.assert_not_called() + + @patch.object(web_main.gap_analysis, "schedule") + @patch.object(db, "Node_collection") + def test_gap_analysis_returns_only_direct_opencre_mappings_when_opencre_is_left( + self, db_mock, schedule_mock + ) -> None: + compare = defs.Standard( + name="CWE", + sectionID="1004", + section="Sensitive Cookie Without 'HttpOnly' Flag", + ) + direct_cre = defs.CRE( + id="804-220", + name="Set httponly attribute for cookie-based session tokens", + description="", + ) + direct_cre.add_link( + defs.Link(ltype=defs.LinkTypes.LinkedTo, document=compare.shallow_copy()) + ) + indirect_cre = defs.CRE( + id="117-371", + name="Use a centralized access control mechanism", + description="", + ) + indirect_cre.add_link( + defs.Link( + ltype=defs.LinkTypes.AutomaticallyLinkedTo, + document=compare.shallow_copy(), + ) ) + + opencre_documents = [direct_cre, indirect_cre] + internal_ids = [ + SimpleNamespace(id=f"cre-internal-{i}") + for i in range(len(opencre_documents)) + ] + + db_mock.return_value.get_gap_analysis_result.return_value = None + db_mock.return_value.gap_analysis_exists.return_value = False + db_mock.return_value.get_nodes.side_effect = lambda name=None, **kwargs: ( + [compare] if name == "CWE" else [] + ) + db_mock.return_value.session.query.return_value.all.return_value = internal_ids + db_mock.return_value.get_CREs.side_effect = lambda internal_id=None, **kwargs: [ + next( + cre + for index, cre in enumerate(opencre_documents) + if internal_id == f"cre-internal-{index}" + ) + ] + + with self.app.test_client() as client: + response = client.get( + "/rest/v1/map_analysis?standard=OpenCRE&standard=CWE", + headers={"Content-Type": "application/json"}, + ) + + payload = json.loads(response.data) + self.assertEqual(200, response.status_code) + self.assertEqual([direct_cre.id], list(payload["result"].keys())) + self.assertEqual(1, len(payload["result"][direct_cre.id]["paths"])) + path = next(iter(payload["result"][direct_cre.id]["paths"].values())) + self.assertEqual(compare.id, path["end"]["id"]) + self.assertEqual(direct_cre.id, path["path"][0]["start"]["id"]) + self.assertEqual(compare.id, path["path"][0]["end"]["id"]) schedule_mock.assert_not_called() def test_gap_analysis_weak_links_no_cache(self) -> None: diff --git a/application/web/web_main.py b/application/web/web_main.py index df0917b2d..c1d71630f 100644 --- a/application/web/web_main.py +++ b/application/web/web_main.py @@ -310,71 +310,44 @@ def _get_map_analysis_documents( return collection.get_nodes(name=standard) -def _get_document_cre_ids(document: defs.Document) -> list[str]: - if document.doctype == defs.Credoctypes.CRE: - return [document.id] - return [ - link.document.id - for link in document.links - if link.document.doctype == defs.Credoctypes.CRE - ] - - -def _build_direct_overlap_path( - base_document: defs.Document, cre_id: str, compare_document: defs.Document -) -> dict[str, Any] | None: - if base_document.doctype == defs.Credoctypes.CRE: - if compare_document.doctype == defs.Credoctypes.CRE: - return None - return { - "end": compare_document.shallow_copy(), - "path": [ - { - "start": base_document.shallow_copy(), - "end": compare_document.shallow_copy(), - "relationship": "LINKED_TO", - "score": 0, - } - ], - "score": 0, - } - - if compare_document.doctype == defs.Credoctypes.CRE: - return { - "end": compare_document.shallow_copy(), - "path": [ - { - "start": base_document.shallow_copy(), - "end": compare_document.shallow_copy(), - "relationship": "LINKED_TO", - "score": 0, - } - ], - "score": 0, - } - +def _build_direct_link_path( + start_document: defs.Document, end_document: defs.Document +) -> dict[str, Any]: return { - "end": compare_document.shallow_copy(), + "end": end_document.shallow_copy(), "path": [ { - "start": base_document.shallow_copy(), - "end": defs.CRE(id=cre_id).shallow_copy(), + "start": start_document.shallow_copy(), + "end": end_document.shallow_copy(), "relationship": "LINKED_TO", "score": 0, - }, - { - "start": defs.CRE(id=cre_id).shallow_copy(), - "end": compare_document.shallow_copy(), - "relationship": "LINKED_TO", - "score": 0, - }, + } ], "score": 0, } -def _make_direct_overlap_path_key(compare_document: defs.Document, cre_id: str) -> str: - return f"{compare_document.id}::{cre_id}" +def _make_direct_link_path_key(end_document: defs.Document) -> str: + return end_document.id + + +def _add_direct_link_result( + grouped_paths: dict[str, dict[str, Any]], + start_document: defs.Document, + end_document: defs.Document, +) -> None: + shared_paths = grouped_paths.setdefault( + start_document.id, + { + "start": start_document.shallow_copy(), + "paths": {}, + "extra": 0, + }, + )["paths"] + shared_paths.setdefault( + _make_direct_link_path_key(end_document), + _build_direct_link_path(start_document, end_document), + ) def _build_direct_cre_overlap_map_analysis( @@ -385,34 +358,36 @@ def _build_direct_cre_overlap_map_analysis( if len(standards) < 2: return None - base_nodes = _get_map_analysis_documents(standards[0], collection) - compare_nodes = _get_map_analysis_documents(standards[1], collection) + base_standard = standards[0] + compare_standard = standards[1] + base_nodes = _get_map_analysis_documents(base_standard, collection) + compare_nodes = _get_map_analysis_documents(compare_standard, collection) if not base_nodes or not compare_nodes: return None - compare_nodes_by_cre: dict[str, list[defs.Document]] = {} - for compare_node in compare_nodes: - for cre_id in _get_document_cre_ids(compare_node): - compare_nodes_by_cre.setdefault(cre_id, []).append(compare_node) + base_is_opencre = base_standard == OPENCRE_STANDARD_NAME + opencre_nodes = base_nodes if base_is_opencre else compare_nodes + standard_nodes = compare_nodes if base_is_opencre else base_nodes + + standard_nodes_by_id = { + standard_node.id: standard_node for standard_node in standard_nodes + } + direct_pairs: list[tuple[defs.CRE, defs.Document]] = [] + for opencre_node in opencre_nodes: + for link in opencre_node.links: + if link.ltype != defs.LinkTypes.LinkedTo: + continue + standard_node = standard_nodes_by_id.get(link.document.id) + if not standard_node: + continue + direct_pairs.append((opencre_node, standard_node)) grouped_paths: dict[str, dict[str, Any]] = {} - for base_node in base_nodes: - shared_paths: dict[str, Any] = {} - for cre_id in _get_document_cre_ids(base_node): - for compare_node in compare_nodes_by_cre.get(cre_id, []): - path = _build_direct_overlap_path(base_node, cre_id, compare_node) - if not path: - continue - shared_paths.setdefault( - _make_direct_overlap_path_key(compare_node, cre_id), path - ) - - if shared_paths: - grouped_paths[base_node.id] = { - "start": base_node.shallow_copy(), - "paths": shared_paths, - "extra": 0, - } + for opencre_node, standard_node in direct_pairs: + if base_is_opencre: + _add_direct_link_result(grouped_paths, opencre_node, standard_node) + else: + _add_direct_link_result(grouped_paths, standard_node, opencre_node) if not grouped_paths: return None @@ -431,7 +406,6 @@ def map_analysis() -> Any: posthog.capture(f"map_analysis", f"standards:{standards}") database = db.Node_collection() - standards = request.args.getlist("standard") standards_hash = gap_analysis.make_resources_key(standards) if OPENCRE_STANDARD_NAME in standards: From 445cbbd51c78f4267ddac90955fc7b1bbf80cde8 Mon Sep 17 00:00:00 2001 From: bornunique911 Date: Fri, 27 Mar 2026 22:53:08 +0530 Subject: [PATCH 5/5] Fix OpenCRE map analysis hover payload --- application/tests/web_main_test.py | 4 +++- application/web/web_main.py | 8 +++++++- 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/application/tests/web_main_test.py b/application/tests/web_main_test.py index 1d55fa3c4..9302ab9b8 100644 --- a/application/tests/web_main_test.py +++ b/application/tests/web_main_test.py @@ -810,9 +810,10 @@ def test_gap_analysis_returns_only_direct_opencre_mappings( self.assertEqual([compare.id], list(payload["result"].keys())) self.assertEqual(1, len(payload["result"][compare.id]["paths"])) path = next(iter(payload["result"][compare.id]["paths"].values())) + self.assertEqual(compare.id, payload["result"][compare.id]["start"]["id"]) self.assertEqual(direct_cre.id, path["end"]["id"]) self.assertEqual(direct_cre.name, path["end"]["name"]) - self.assertEqual(compare.id, path["path"][0]["start"]["id"]) + self.assertEqual("", path["path"][0]["start"]["id"]) self.assertEqual(direct_cre.id, path["path"][0]["end"]["id"]) schedule_mock.assert_not_called() @@ -877,6 +878,7 @@ def test_gap_analysis_returns_only_direct_opencre_mappings_when_opencre_is_left( self.assertEqual([direct_cre.id], list(payload["result"].keys())) self.assertEqual(1, len(payload["result"][direct_cre.id]["paths"])) path = next(iter(payload["result"][direct_cre.id]["paths"].values())) + self.assertEqual(direct_cre.id, payload["result"][direct_cre.id]["start"]["id"]) self.assertEqual(compare.id, path["end"]["id"]) self.assertEqual(direct_cre.id, path["path"][0]["start"]["id"]) self.assertEqual(compare.id, path["path"][0]["end"]["id"]) diff --git a/application/web/web_main.py b/application/web/web_main.py index c1d71630f..54331f2da 100644 --- a/application/web/web_main.py +++ b/application/web/web_main.py @@ -313,11 +313,17 @@ def _get_map_analysis_documents( def _build_direct_link_path( start_document: defs.Document, end_document: defs.Document ) -> dict[str, Any]: + segment_start = start_document.shallow_copy() + # The current gap-analysis popup mutates non-CRE row ids during display + # before it resolves the one-step direct path. Keep this direct-link fast + # path compatible by mirroring that display-only shape in the segment start. + if segment_start.doctype != defs.Credoctypes.CRE.value: + segment_start.id = "" return { "end": end_document.shallow_copy(), "path": [ { - "start": start_document.shallow_copy(), + "start": segment_start, "end": end_document.shallow_copy(), "relationship": "LINKED_TO", "score": 0,