Skip to content

Commit 074733f

Browse files
committed
Add OpenCRE as a map analysis resource
1 parent ea15578 commit 074733f

File tree

2 files changed

+178
-2
lines changed

2 files changed

+178
-2
lines changed

application/tests/web_main_test.py

Lines changed: 43 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import json
88
import unittest
99
import tempfile
10+
from types import SimpleNamespace
1011
from unittest.mock import patch
1112

1213
import redis
@@ -1116,7 +1117,48 @@ def test_standards_from_db(self, node_mock, redis_conn_mock) -> None:
11161117
headers={"Content-Type": "application/json"},
11171118
)
11181119
self.assertEqual(200, response.status_code)
1119-
self.assertEqual(expected, json.loads(response.data))
1120+
self.assertEqual(expected + ["OpenCRE"], json.loads(response.data))
1121+
1122+
@patch.object(web_main.gap_analysis, "schedule")
1123+
@patch.object(db, "Node_collection")
1124+
def test_gap_analysis_supports_opencre_as_standard(
1125+
self, db_mock, schedule_mock
1126+
) -> None:
1127+
shared_cre = defs.CRE(id="170-772", name="Cryptography", description="")
1128+
compare = defs.Standard(
1129+
name="OWASP Web Security Testing Guide (WSTG)",
1130+
section="WSTG-CRYP-04",
1131+
)
1132+
compare.add_link(
1133+
defs.Link(ltype=defs.LinkTypes.LinkedTo, document=shared_cre.shallow_copy())
1134+
)
1135+
opencre = defs.CRE(id="170-772", name="Cryptography", description="")
1136+
opencre.add_link(
1137+
defs.Link(ltype=defs.LinkTypes.LinkedTo, document=compare.shallow_copy())
1138+
)
1139+
1140+
db_mock.return_value.get_gap_analysis_result.return_value = None
1141+
db_mock.return_value.gap_analysis_exists.return_value = False
1142+
db_mock.return_value.get_nodes.side_effect = lambda name=None, **kwargs: (
1143+
[compare] if name == "OWASP Web Security Testing Guide (WSTG)" else []
1144+
)
1145+
db_mock.return_value.session.query.return_value.all.return_value = [
1146+
SimpleNamespace(id="cre-internal-1")
1147+
]
1148+
db_mock.return_value.get_CREs.return_value = [opencre]
1149+
1150+
with self.app.test_client() as client:
1151+
response = client.get(
1152+
"/rest/v1/map_analysis?standard=OpenCRE&standard=OWASP%20Web%20Security%20Testing%20Guide%20(WSTG)",
1153+
headers={"Content-Type": "application/json"},
1154+
)
1155+
1156+
payload = json.loads(response.data)
1157+
self.assertEqual(200, response.status_code)
1158+
self.assertIn("result", payload)
1159+
self.assertIn(opencre.id, payload["result"])
1160+
self.assertIn(compare.id, payload["result"][opencre.id]["paths"])
1161+
schedule_mock.assert_not_called()
11201162

11211163
def test_gap_analysis_weak_links_no_cache(self) -> None:
11221164
with self.app.test_client() as client:

application/web/web_main.py

Lines changed: 135 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@
5555
/ "data"
5656
/ "owasp_top10_2025.json"
5757
)
58+
OPENCRE_STANDARD_NAME = "OpenCRE"
5859

5960
app = Blueprint(
6061
"web",
@@ -618,6 +619,129 @@ def find_document_by_tag() -> Any:
618619
abort(404, "Tag does not exist")
619620

620621

622+
def _get_opencre_documents(collection: db.Node_collection) -> list[defs.CRE]:
623+
return [
624+
collection.get_CREs(internal_id=cre.id)[0]
625+
for cre in collection.session.query(db.CRE).all()
626+
]
627+
628+
629+
def _get_map_analysis_documents(
630+
standard: str, collection: db.Node_collection
631+
) -> list[defs.Document]:
632+
if standard == OPENCRE_STANDARD_NAME:
633+
return _get_opencre_documents(collection)
634+
return collection.get_nodes(name=standard)
635+
636+
637+
def _get_document_cre_ids(document: defs.Document) -> list[str]:
638+
if document.doctype == defs.Credoctypes.CRE:
639+
return [document.id]
640+
return [
641+
link.document.id
642+
for link in document.links
643+
if link.document.doctype == defs.Credoctypes.CRE
644+
]
645+
646+
647+
def _build_direct_overlap_path(
648+
base_document: defs.Document, cre_id: str, compare_document: defs.Document
649+
) -> dict[str, Any] | None:
650+
if base_document.doctype == defs.Credoctypes.CRE:
651+
if compare_document.doctype == defs.Credoctypes.CRE:
652+
return None
653+
return {
654+
"end": compare_document.shallow_copy(),
655+
"path": [
656+
{
657+
"start": base_document.shallow_copy(),
658+
"end": compare_document.shallow_copy(),
659+
"relationship": "LINKED_TO",
660+
"score": 0,
661+
}
662+
],
663+
"score": 0,
664+
}
665+
666+
if compare_document.doctype == defs.Credoctypes.CRE:
667+
return {
668+
"end": compare_document.shallow_copy(),
669+
"path": [
670+
{
671+
"start": base_document.shallow_copy(),
672+
"end": compare_document.shallow_copy(),
673+
"relationship": "LINKED_TO",
674+
"score": 0,
675+
}
676+
],
677+
"score": 0,
678+
}
679+
680+
return {
681+
"end": compare_document.shallow_copy(),
682+
"path": [
683+
{
684+
"start": base_document.shallow_copy(),
685+
"end": defs.CRE(id=cre_id).shallow_copy(),
686+
"relationship": "LINKED_TO",
687+
"score": 0,
688+
},
689+
{
690+
"start": defs.CRE(id=cre_id).shallow_copy(),
691+
"end": compare_document.shallow_copy(),
692+
"relationship": "LINKED_TO",
693+
"score": 0,
694+
},
695+
],
696+
"score": 0,
697+
}
698+
699+
700+
def _build_direct_cre_overlap_map_analysis(
701+
standards: list[str],
702+
standards_hash: str,
703+
collection: db.Node_collection,
704+
) -> dict[str, Any] | None:
705+
if len(standards) < 2:
706+
return None
707+
708+
base_nodes = _get_map_analysis_documents(standards[0], collection)
709+
compare_nodes = _get_map_analysis_documents(standards[1], collection)
710+
if not base_nodes or not compare_nodes:
711+
return None
712+
713+
compare_nodes_by_cre: dict[str, list[defs.Document]] = {}
714+
for compare_node in compare_nodes:
715+
for cre_id in _get_document_cre_ids(compare_node):
716+
compare_nodes_by_cre.setdefault(cre_id, []).append(compare_node)
717+
718+
grouped_paths: dict[str, dict[str, Any]] = {}
719+
for base_node in base_nodes:
720+
shared_paths: dict[str, Any] = {}
721+
for cre_id in _get_document_cre_ids(base_node):
722+
for compare_node in compare_nodes_by_cre.get(cre_id, []):
723+
path = _build_direct_overlap_path(base_node, cre_id, compare_node)
724+
if not path:
725+
continue
726+
shared_paths.setdefault(compare_node.id, path)
727+
728+
if shared_paths:
729+
grouped_paths[base_node.id] = {
730+
"start": base_node.shallow_copy(),
731+
"paths": shared_paths,
732+
"extra": 0,
733+
}
734+
735+
if not grouped_paths:
736+
return None
737+
738+
result = {"result": grouped_paths}
739+
collection.add_gap_analysis_result(
740+
cache_key=standards_hash, ga_object=flask_json.dumps(result)
741+
)
742+
return result
743+
744+
621745
@app.route("/rest/v1/map_analysis", methods=["GET"])
622746
def map_analysis() -> Any:
623747
standards = [_normalize_standard_name(s) for s in request.args.getlist("standard")]
@@ -628,6 +752,14 @@ def map_analysis() -> Any:
628752
standards_hash = gap_analysis.make_resources_key(standards)
629753
owasp_top10_comparison = _build_owasp_top10_comparison(standards, database)
630754

755+
if OPENCRE_STANDARD_NAME in standards:
756+
direct_gap_analysis = _build_direct_cre_overlap_map_analysis(
757+
standards, standards_hash, database
758+
)
759+
if direct_gap_analysis:
760+
return jsonify(direct_gap_analysis)
761+
abort(404, "No direct overlap found for requested standards")
762+
631763
# First, check if we have cached results in the database
632764
if database.gap_analysis_exists(standards_hash):
633765
gap_analysis_result = database.get_gap_analysis_result(standards_hash)
@@ -875,7 +1007,9 @@ def standards() -> Any:
8751007
posthog.capture(f"standards", "")
8761008

8771009
database = db.Node_collection()
878-
standards = database.standards()
1010+
standards = list(database.standards())
1011+
if OPENCRE_STANDARD_NAME not in standards:
1012+
standards.append(OPENCRE_STANDARD_NAME)
8791013
return standards
8801014

8811015

0 commit comments

Comments
 (0)