Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
196 changes: 195 additions & 1 deletion application/tests/web_main_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import json
import unittest
import tempfile
from types import SimpleNamespace
from unittest.mock import patch

import redis
Expand Down Expand Up @@ -688,7 +689,200 @@ def test_standards_from_db(self, node_mock, redis_conn_mock) -> None:
headers={"Content-Type": "application/json"},
)
self.assertEqual(200, response.status_code)
self.assertEqual(expected, json.loads(response.data))
self.assertEqual(expected + ["OpenCRE"], json.loads(response.data))

@patch.object(web_main.gap_analysis, "schedule")
@patch.object(db, "Node_collection")
def test_gap_analysis_supports_opencre_as_standard(
self, db_mock, schedule_mock
) -> None:
shared_cre = defs.CRE(id="170-772", name="Cryptography", description="")
compare = defs.Standard(
name="OWASP Web Security Testing Guide (WSTG)",
section="WSTG-CRYP-04",
)
compare.add_link(
defs.Link(ltype=defs.LinkTypes.LinkedTo, document=shared_cre.shallow_copy())
)
opencre = defs.CRE(id="170-772", name="Cryptography", description="")
opencre.add_link(
defs.Link(ltype=defs.LinkTypes.LinkedTo, document=compare.shallow_copy())
)

db_mock.return_value.get_gap_analysis_result.return_value = None
db_mock.return_value.gap_analysis_exists.return_value = False
db_mock.return_value.get_nodes.side_effect = lambda name=None, **kwargs: (
[compare] if name == "OWASP Web Security Testing Guide (WSTG)" else []
)
db_mock.return_value.session.query.return_value.all.return_value = [
SimpleNamespace(id="cre-internal-1")
]
db_mock.return_value.get_CREs.return_value = [opencre]

with self.app.test_client() as client:
response = client.get(
"/rest/v1/map_analysis?standard=OpenCRE&standard=OWASP%20Web%20Security%20Testing%20Guide%20(WSTG)",
headers={"Content-Type": "application/json"},
)

payload = json.loads(response.data)
self.assertEqual(200, response.status_code)
self.assertIn("result", payload)
self.assertIn(opencre.id, payload["result"])
self.assertEqual(1, len(payload["result"][opencre.id]["paths"]))
path = next(iter(payload["result"][opencre.id]["paths"].values()))
self.assertEqual(compare.id, path["end"]["id"])
schedule_mock.assert_not_called()

@patch.object(web_main.gap_analysis, "schedule")
@patch.object(db, "Node_collection")
def test_gap_analysis_returns_only_direct_opencre_mappings(
self, db_mock, schedule_mock
) -> None:
compare = defs.Standard(
name="CWE",
sectionID="1004",
section="Sensitive Cookie Without 'HttpOnly' Flag",
)
direct_cre = defs.CRE(
id="804-220",
name="Set httponly attribute for cookie-based session tokens",
description="",
)
direct_cre.add_link(
defs.Link(ltype=defs.LinkTypes.LinkedTo, document=compare.shallow_copy())
)
auto_linked_cres = []
for i, cre_id in enumerate(
[
"117-371",
"166-151",
"284-521",
"368-633",
"612-252",
"664-080",
"801-310",
],
start=1,
):
cre = defs.CRE(
id=cre_id,
name=f"Automatically mapped CRE {i}",
description="",
)
cre.add_link(
defs.Link(
ltype=defs.LinkTypes.AutomaticallyLinkedTo,
document=compare.shallow_copy(),
)
)
auto_linked_cres.append(cre)

opencre_documents = [direct_cre] + auto_linked_cres
internal_ids = [
SimpleNamespace(id=f"cre-internal-{i}")
for i in range(len(opencre_documents))
]

db_mock.return_value.get_gap_analysis_result.return_value = None
db_mock.return_value.gap_analysis_exists.return_value = False
db_mock.return_value.get_nodes.side_effect = lambda name=None, **kwargs: (
[compare] if name == "CWE" else []
)
db_mock.return_value.session.query.return_value.all.return_value = internal_ids
db_mock.return_value.get_CREs.side_effect = lambda internal_id=None, **kwargs: [
next(
cre
for index, cre in enumerate(opencre_documents)
if internal_id == f"cre-internal-{index}"
)
]

with self.app.test_client() as client:
response = client.get(
"/rest/v1/map_analysis?standard=CWE&standard=OpenCRE",
headers={"Content-Type": "application/json"},
)

payload = json.loads(response.data)
self.assertEqual(200, response.status_code)
self.assertIn("result", payload)
self.assertEqual([compare.id], list(payload["result"].keys()))
self.assertEqual(1, len(payload["result"][compare.id]["paths"]))
path = next(iter(payload["result"][compare.id]["paths"].values()))
self.assertEqual(compare.id, payload["result"][compare.id]["start"]["id"])
self.assertEqual(direct_cre.id, path["end"]["id"])
self.assertEqual(direct_cre.name, path["end"]["name"])
self.assertEqual("", path["path"][0]["start"]["id"])
self.assertEqual(direct_cre.id, path["path"][0]["end"]["id"])
schedule_mock.assert_not_called()

@patch.object(web_main.gap_analysis, "schedule")
@patch.object(db, "Node_collection")
def test_gap_analysis_returns_only_direct_opencre_mappings_when_opencre_is_left(
self, db_mock, schedule_mock
) -> None:
compare = defs.Standard(
name="CWE",
sectionID="1004",
section="Sensitive Cookie Without 'HttpOnly' Flag",
)
direct_cre = defs.CRE(
id="804-220",
name="Set httponly attribute for cookie-based session tokens",
description="",
)
direct_cre.add_link(
defs.Link(ltype=defs.LinkTypes.LinkedTo, document=compare.shallow_copy())
)
indirect_cre = defs.CRE(
id="117-371",
name="Use a centralized access control mechanism",
description="",
)
indirect_cre.add_link(
defs.Link(
ltype=defs.LinkTypes.AutomaticallyLinkedTo,
document=compare.shallow_copy(),
)
)

opencre_documents = [direct_cre, indirect_cre]
internal_ids = [
SimpleNamespace(id=f"cre-internal-{i}")
for i in range(len(opencre_documents))
]

db_mock.return_value.get_gap_analysis_result.return_value = None
db_mock.return_value.gap_analysis_exists.return_value = False
db_mock.return_value.get_nodes.side_effect = lambda name=None, **kwargs: (
[compare] if name == "CWE" else []
)
db_mock.return_value.session.query.return_value.all.return_value = internal_ids
db_mock.return_value.get_CREs.side_effect = lambda internal_id=None, **kwargs: [
next(
cre
for index, cre in enumerate(opencre_documents)
if internal_id == f"cre-internal-{index}"
)
]

with self.app.test_client() as client:
response = client.get(
"/rest/v1/map_analysis?standard=OpenCRE&standard=CWE",
headers={"Content-Type": "application/json"},
)

payload = json.loads(response.data)
self.assertEqual(200, response.status_code)
self.assertEqual([direct_cre.id], list(payload["result"].keys()))
self.assertEqual(1, len(payload["result"][direct_cre.id]["paths"]))
path = next(iter(payload["result"][direct_cre.id]["paths"].values()))
self.assertEqual(direct_cre.id, payload["result"][direct_cre.id]["start"]["id"])
self.assertEqual(compare.id, path["end"]["id"])
self.assertEqual(direct_cre.id, path["path"][0]["start"]["id"])
self.assertEqual(compare.id, path["path"][0]["end"]["id"])
schedule_mock.assert_not_called()

def test_gap_analysis_weak_links_no_cache(self) -> None:
with self.app.test_client() as client:
Expand Down
124 changes: 122 additions & 2 deletions application/web/web_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@


ITEMS_PER_PAGE = 20
OPENCRE_STANDARD_NAME = "OpenCRE"

app = Blueprint(
"web",
Expand Down Expand Up @@ -294,16 +295,133 @@ def find_document_by_tag() -> Any:
abort(404, "Tag does not exist")


def _get_opencre_documents(collection: db.Node_collection) -> list[defs.CRE]:
return [
collection.get_CREs(internal_id=cre.id)[0]
for cre in collection.session.query(db.CRE).all()
]


def _get_map_analysis_documents(
standard: str, collection: db.Node_collection
) -> list[defs.Document]:
if standard == OPENCRE_STANDARD_NAME:
return _get_opencre_documents(collection)
return collection.get_nodes(name=standard)


def _build_direct_link_path(
start_document: defs.Document, end_document: defs.Document
) -> dict[str, Any]:
segment_start = start_document.shallow_copy()
# The current gap-analysis popup mutates non-CRE row ids during display
# before it resolves the one-step direct path. Keep this direct-link fast
# path compatible by mirroring that display-only shape in the segment start.
if segment_start.doctype != defs.Credoctypes.CRE.value:
segment_start.id = ""
return {
"end": end_document.shallow_copy(),
"path": [
{
"start": segment_start,
"end": end_document.shallow_copy(),
"relationship": "LINKED_TO",
"score": 0,
}
],
"score": 0,
}


def _make_direct_link_path_key(end_document: defs.Document) -> str:
return end_document.id


def _add_direct_link_result(
grouped_paths: dict[str, dict[str, Any]],
start_document: defs.Document,
end_document: defs.Document,
) -> None:
shared_paths = grouped_paths.setdefault(
start_document.id,
{
"start": start_document.shallow_copy(),
"paths": {},
"extra": 0,
},
)["paths"]
shared_paths.setdefault(
_make_direct_link_path_key(end_document),
_build_direct_link_path(start_document, end_document),
)


def _build_direct_cre_overlap_map_analysis(
standards: list[str],
standards_hash: str,
collection: db.Node_collection,
) -> dict[str, Any] | None:
if len(standards) < 2:
return None

base_standard = standards[0]
compare_standard = standards[1]
base_nodes = _get_map_analysis_documents(base_standard, collection)
compare_nodes = _get_map_analysis_documents(compare_standard, collection)
if not base_nodes or not compare_nodes:
return None

base_is_opencre = base_standard == OPENCRE_STANDARD_NAME
opencre_nodes = base_nodes if base_is_opencre else compare_nodes
standard_nodes = compare_nodes if base_is_opencre else base_nodes

standard_nodes_by_id = {
standard_node.id: standard_node for standard_node in standard_nodes
}
direct_pairs: list[tuple[defs.CRE, defs.Document]] = []
for opencre_node in opencre_nodes:
for link in opencre_node.links:
if link.ltype != defs.LinkTypes.LinkedTo:
continue
standard_node = standard_nodes_by_id.get(link.document.id)
if not standard_node:
continue
direct_pairs.append((opencre_node, standard_node))

grouped_paths: dict[str, dict[str, Any]] = {}
for opencre_node, standard_node in direct_pairs:
if base_is_opencre:
_add_direct_link_result(grouped_paths, opencre_node, standard_node)
else:
_add_direct_link_result(grouped_paths, standard_node, opencre_node)

if not grouped_paths:
return None

result = {"result": grouped_paths}
collection.add_gap_analysis_result(
cache_key=standards_hash, ga_object=flask_json.dumps(result)
)
return result


@app.route("/rest/v1/map_analysis", methods=["GET"])
def map_analysis() -> Any:
standards = request.args.getlist("standard")
if posthog:
posthog.capture(f"map_analysis", f"standards:{standards}")

database = db.Node_collection()
standards = request.args.getlist("standard")
standards_hash = gap_analysis.make_resources_key(standards)

if OPENCRE_STANDARD_NAME in standards:
direct_gap_analysis = _build_direct_cre_overlap_map_analysis(
standards, standards_hash, database
)
if direct_gap_analysis:
return jsonify(direct_gap_analysis)
abort(404, "No direct overlap found for requested standards")

# First, check if we have cached results in the database
if database.gap_analysis_exists(standards_hash):
gap_analysis_result = database.get_gap_analysis_result(standards_hash)
Expand Down Expand Up @@ -438,7 +556,9 @@ def standards() -> Any:
posthog.capture(f"standards", "")

database = db.Node_collection()
standards = database.standards()
standards = list(database.standards())
if OPENCRE_STANDARD_NAME not in standards:
standards.append(OPENCRE_STANDARD_NAME)
return standards


Expand Down