@patch.object(requests, "get")
def test_register_CWE_inherits_mappings_transitively(self, mock_requests) -> None:
    """Check that imported CWEs pick up a CRE through related weaknesses.

    A CRE ("089-089") is stored and linked to CWE 89, the fixture archive
    built from ``self.CWE_transitive_xml`` is served through a fake
    ``requests.get`` response, and the parser output is expected to carry
    that CRE as the first link of both CWE 2001 and CWE 2002.

    NOTE(review): the fixture is presumed to chain 2002 -> 2001 -> 89;
    confirm against ``CWE_transitive_xml``.
    """
    # Local import: the module-level imports of this test file are not
    # guaranteed to include shutil.
    import shutil

    tmpdir = mkdtemp()
    # mkdtemp() leaves deletion to the caller; without this every test run
    # leaves a directory with the XML and zip fixtures behind.
    self.addCleanup(shutil.rmtree, tmpdir, ignore_errors=True)

    tmpFile = os.path.join(tmpdir, "cwe.xml")
    tmpzip = os.path.join(tmpdir, "cwe.zip")
    with open(tmpFile, "w") as cx:
        cx.write(self.CWE_transitive_xml)
    with zipfile.ZipFile(tmpzip, "w", zipfile.ZIP_DEFLATED) as zipf:
        zipf.write(tmpFile, arcname="cwe.xml")

    class fakeRequest:
        # Stands in for the streamed HTTP response: the whole zip archive is
        # handed back as a single chunk.
        def iter_content(self, chunk_size=None):
            with open(tmpzip, "rb") as zipf:
                return [zipf.read()]

    mock_requests.return_value = fakeRequest()

    # Seed the database with the one mapping the other CWEs should inherit.
    cre = defs.CRE(id="089-089", name="CRE-Injection")
    dbcre = self.collection.add_cre(cre=cre)
    dbcwe = self.collection.add_node(defs.Standard(name="CWE", sectionID="89"))
    self.collection.add_link(dbcre, dbcwe, defs.LinkTypes.LinkedTo)

    entries = cwe.CWE().parse(
        cache=self.collection,
        ph=prompt_client.PromptHandler(database=self.collection),
    )
    imported_cwes = {node.sectionID: node for node in entries.results["CWE"]}

    self.assertEqual(imported_cwes["2001"].links[0].document.todict(), cre.todict())
    self.assertEqual(imported_cwes["2002"].links[0].document.todict(), cre.todict())
fakeRequest: + def iter_content(self, chunk_size=None): + with open(tmpzip, "rb") as zipf: + return [zipf.read()] + + mock_requests.return_value = fakeRequest() + + injection_cre = defs.CRE(id="760-764", name="Injection protection") + xss_cre = defs.CRE(id="760-765", name="XSS protection") + xxe_cre = defs.CRE(id="764-507", name="Restrict XML parsing (against XXE)") + auth_cre = defs.CRE( + id="117-371", name="Use a centralized access control mechanism" + ) + authn_cre = defs.CRE( + id="113-133", name="Use centralized authentication mechanism" + ) + csrf_cre = defs.CRE(id="028-727", name="CSRF protection") + ssrf_cre = defs.CRE(id="028-728", name="SSRF protection") + hardcoded_secret_cre = defs.CRE( + id="774-888", name="Do not store secrets in the code" + ) + password_storage_cre = defs.CRE( + id="622-203", name="Store passwords salted and hashed" + ) + credential_storage_cre = defs.CRE( + id="881-321", name="Store credentials securely" + ) + session_management_cre = defs.CRE(id="177-260", name="Session management") + secure_cookie_cre = defs.CRE( + id="688-081", name='Set "secure" attribute for cookie-based session tokens' + ) + deserialization_cre = defs.CRE(id="836-068", name="Deserialization Prevention") + self.collection.add_cre(cre=injection_cre) + self.collection.add_cre(cre=xss_cre) + self.collection.add_cre(cre=xxe_cre) + self.collection.add_cre(cre=auth_cre) + self.collection.add_cre(cre=authn_cre) + self.collection.add_cre(cre=csrf_cre) + self.collection.add_cre(cre=ssrf_cre) + self.collection.add_cre(cre=hardcoded_secret_cre) + self.collection.add_cre(cre=password_storage_cre) + self.collection.add_cre(cre=credential_storage_cre) + self.collection.add_cre(cre=session_management_cre) + self.collection.add_cre(cre=secure_cookie_cre) + self.collection.add_cre(cre=deserialization_cre) + + entries = cwe.CWE().parse( + cache=self.collection, + ph=prompt_client.PromptHandler(database=self.collection), + ) + imported_cwes = {node.sectionID: node for node in 
entries.results["CWE"]} + + self.assertEqual( + imported_cwes["89"].links[0].document.todict(), injection_cre.todict() + ) + self.assertEqual( + imported_cwes["79"].links[0].document.todict(), xss_cre.todict() + ) + self.assertEqual( + imported_cwes["611"].links[0].document.todict(), xxe_cre.todict() + ) + self.assertEqual( + imported_cwes["612"].links[0].document.todict(), auth_cre.todict() + ) + self.assertEqual( + imported_cwes["287"].links[0].document.todict(), authn_cre.todict() + ) + self.assertEqual( + imported_cwes["352"].links[0].document.todict(), csrf_cre.todict() + ) + self.assertEqual( + imported_cwes["918"].links[0].document.todict(), ssrf_cre.todict() + ) + self.assertEqual( + imported_cwes["798"].links[0].document.todict(), + hardcoded_secret_cre.todict(), + ) + self.assertEqual( + imported_cwes["321"].links[0].document.todict(), + hardcoded_secret_cre.todict(), + ) + self.assertEqual( + imported_cwes["256"].links[0].document.todict(), + password_storage_cre.todict(), + ) + self.assertEqual( + imported_cwes["257"].links[0].document.todict(), + password_storage_cre.todict(), + ) + self.assertEqual( + imported_cwes["258"].links[0].document.todict(), + credential_storage_cre.todict(), + ) + self.assertEqual( + imported_cwes["260"].links[0].document.todict(), + credential_storage_cre.todict(), + ) + self.assertEqual( + imported_cwes["384"].links[0].document.todict(), + session_management_cre.todict(), + ) + self.assertEqual( + imported_cwes["614"].links[0].document.todict(), + secure_cookie_cre.todict(), + ) + self.assertEqual( + imported_cwes["502"].links[0].document.todict(), + deserialization_cre.todict(), + ) + CWE_xml = """ """ + + CWE_transitive_xml = """ + + + + + + + + + + + + + + Padding entry so xmltodict returns a list of Weakness elements. + + + +""" + + CWE_fallback_xml = """ + + + + XSS entry. + + + SQL injection entry. + + + XXE entry. + + + Authorization entry. + + + Authentication entry. + + + CSRF entry. 
+ + + Hard-coded credentials entry. + + + Hard-coded key entry. + + + Password storage entry. + + + Recoverable password entry. + + + Password in config entry. + + + Password in config entry. + + + Session fixation entry. + + + Cookie secure attribute entry. + + + Deserialization entry. + + + SSRF entry. + + + +""" diff --git a/application/tests/web_main_test.py b/application/tests/web_main_test.py index 9e219b4ce..9302ab9b8 100644 --- a/application/tests/web_main_test.py +++ b/application/tests/web_main_test.py @@ -7,6 +7,7 @@ import json import unittest import tempfile +from types import SimpleNamespace from unittest.mock import patch import redis @@ -688,7 +689,200 @@ def test_standards_from_db(self, node_mock, redis_conn_mock) -> None: headers={"Content-Type": "application/json"}, ) self.assertEqual(200, response.status_code) - self.assertEqual(expected, json.loads(response.data)) + self.assertEqual(expected + ["OpenCRE"], json.loads(response.data)) + + @patch.object(web_main.gap_analysis, "schedule") + @patch.object(db, "Node_collection") + def test_gap_analysis_supports_opencre_as_standard( + self, db_mock, schedule_mock + ) -> None: + shared_cre = defs.CRE(id="170-772", name="Cryptography", description="") + compare = defs.Standard( + name="OWASP Web Security Testing Guide (WSTG)", + section="WSTG-CRYP-04", + ) + compare.add_link( + defs.Link(ltype=defs.LinkTypes.LinkedTo, document=shared_cre.shallow_copy()) + ) + opencre = defs.CRE(id="170-772", name="Cryptography", description="") + opencre.add_link( + defs.Link(ltype=defs.LinkTypes.LinkedTo, document=compare.shallow_copy()) + ) + + db_mock.return_value.get_gap_analysis_result.return_value = None + db_mock.return_value.gap_analysis_exists.return_value = False + db_mock.return_value.get_nodes.side_effect = lambda name=None, **kwargs: ( + [compare] if name == "OWASP Web Security Testing Guide (WSTG)" else [] + ) + db_mock.return_value.session.query.return_value.all.return_value = [ + 
SimpleNamespace(id="cre-internal-1") + ] + db_mock.return_value.get_CREs.return_value = [opencre] + + with self.app.test_client() as client: + response = client.get( + "/rest/v1/map_analysis?standard=OpenCRE&standard=OWASP%20Web%20Security%20Testing%20Guide%20(WSTG)", + headers={"Content-Type": "application/json"}, + ) + + payload = json.loads(response.data) + self.assertEqual(200, response.status_code) + self.assertIn("result", payload) + self.assertIn(opencre.id, payload["result"]) + self.assertEqual(1, len(payload["result"][opencre.id]["paths"])) + path = next(iter(payload["result"][opencre.id]["paths"].values())) + self.assertEqual(compare.id, path["end"]["id"]) + schedule_mock.assert_not_called() + + @patch.object(web_main.gap_analysis, "schedule") + @patch.object(db, "Node_collection") + def test_gap_analysis_returns_only_direct_opencre_mappings( + self, db_mock, schedule_mock + ) -> None: + compare = defs.Standard( + name="CWE", + sectionID="1004", + section="Sensitive Cookie Without 'HttpOnly' Flag", + ) + direct_cre = defs.CRE( + id="804-220", + name="Set httponly attribute for cookie-based session tokens", + description="", + ) + direct_cre.add_link( + defs.Link(ltype=defs.LinkTypes.LinkedTo, document=compare.shallow_copy()) + ) + auto_linked_cres = [] + for i, cre_id in enumerate( + [ + "117-371", + "166-151", + "284-521", + "368-633", + "612-252", + "664-080", + "801-310", + ], + start=1, + ): + cre = defs.CRE( + id=cre_id, + name=f"Automatically mapped CRE {i}", + description="", + ) + cre.add_link( + defs.Link( + ltype=defs.LinkTypes.AutomaticallyLinkedTo, + document=compare.shallow_copy(), + ) + ) + auto_linked_cres.append(cre) + + opencre_documents = [direct_cre] + auto_linked_cres + internal_ids = [ + SimpleNamespace(id=f"cre-internal-{i}") + for i in range(len(opencre_documents)) + ] + + db_mock.return_value.get_gap_analysis_result.return_value = None + db_mock.return_value.gap_analysis_exists.return_value = False + 
db_mock.return_value.get_nodes.side_effect = lambda name=None, **kwargs: ( + [compare] if name == "CWE" else [] + ) + db_mock.return_value.session.query.return_value.all.return_value = internal_ids + db_mock.return_value.get_CREs.side_effect = lambda internal_id=None, **kwargs: [ + next( + cre + for index, cre in enumerate(opencre_documents) + if internal_id == f"cre-internal-{index}" + ) + ] + + with self.app.test_client() as client: + response = client.get( + "/rest/v1/map_analysis?standard=CWE&standard=OpenCRE", + headers={"Content-Type": "application/json"}, + ) + + payload = json.loads(response.data) + self.assertEqual(200, response.status_code) + self.assertIn("result", payload) + self.assertEqual([compare.id], list(payload["result"].keys())) + self.assertEqual(1, len(payload["result"][compare.id]["paths"])) + path = next(iter(payload["result"][compare.id]["paths"].values())) + self.assertEqual(compare.id, payload["result"][compare.id]["start"]["id"]) + self.assertEqual(direct_cre.id, path["end"]["id"]) + self.assertEqual(direct_cre.name, path["end"]["name"]) + self.assertEqual("", path["path"][0]["start"]["id"]) + self.assertEqual(direct_cre.id, path["path"][0]["end"]["id"]) + schedule_mock.assert_not_called() + + @patch.object(web_main.gap_analysis, "schedule") + @patch.object(db, "Node_collection") + def test_gap_analysis_returns_only_direct_opencre_mappings_when_opencre_is_left( + self, db_mock, schedule_mock + ) -> None: + compare = defs.Standard( + name="CWE", + sectionID="1004", + section="Sensitive Cookie Without 'HttpOnly' Flag", + ) + direct_cre = defs.CRE( + id="804-220", + name="Set httponly attribute for cookie-based session tokens", + description="", + ) + direct_cre.add_link( + defs.Link(ltype=defs.LinkTypes.LinkedTo, document=compare.shallow_copy()) + ) + indirect_cre = defs.CRE( + id="117-371", + name="Use a centralized access control mechanism", + description="", + ) + indirect_cre.add_link( + defs.Link( + 
ltype=defs.LinkTypes.AutomaticallyLinkedTo, + document=compare.shallow_copy(), + ) + ) + + opencre_documents = [direct_cre, indirect_cre] + internal_ids = [ + SimpleNamespace(id=f"cre-internal-{i}") + for i in range(len(opencre_documents)) + ] + + db_mock.return_value.get_gap_analysis_result.return_value = None + db_mock.return_value.gap_analysis_exists.return_value = False + db_mock.return_value.get_nodes.side_effect = lambda name=None, **kwargs: ( + [compare] if name == "CWE" else [] + ) + db_mock.return_value.session.query.return_value.all.return_value = internal_ids + db_mock.return_value.get_CREs.side_effect = lambda internal_id=None, **kwargs: [ + next( + cre + for index, cre in enumerate(opencre_documents) + if internal_id == f"cre-internal-{index}" + ) + ] + + with self.app.test_client() as client: + response = client.get( + "/rest/v1/map_analysis?standard=OpenCRE&standard=CWE", + headers={"Content-Type": "application/json"}, + ) + + payload = json.loads(response.data) + self.assertEqual(200, response.status_code) + self.assertEqual([direct_cre.id], list(payload["result"].keys())) + self.assertEqual(1, len(payload["result"][direct_cre.id]["paths"])) + path = next(iter(payload["result"][direct_cre.id]["paths"].values())) + self.assertEqual(direct_cre.id, payload["result"][direct_cre.id]["start"]["id"]) + self.assertEqual(compare.id, path["end"]["id"]) + self.assertEqual(direct_cre.id, path["path"][0]["start"]["id"]) + self.assertEqual(compare.id, path["path"][0]["end"]["id"]) + schedule_mock.assert_not_called() def test_gap_analysis_weak_links_no_cache(self) -> None: with self.app.test_client() as client: diff --git a/application/utils/external_project_parsers/data/cwe_fallback_mappings.json b/application/utils/external_project_parsers/data/cwe_fallback_mappings.json new file mode 100644 index 000000000..11d9d1ff8 --- /dev/null +++ b/application/utils/external_project_parsers/data/cwe_fallback_mappings.json @@ -0,0 +1,102 @@ +[ + { + "keywords": [ + "xml 
external entity", + "xxe" + ], + "cre_id": "764-507" + }, + { + "keywords": [ + "cross-site scripting", + " xss", + "(xss)" + ], + "cre_id": "760-765" + }, + { + "keywords": [ + "authorization", + "access control" + ], + "cre_id": "117-371" + }, + { + "keywords": [ + "improper authentication", + "missing authentication", + "authentication bypass" + ], + "cre_id": "113-133" + }, + { + "keywords": [ + "cross-site request forgery", + "(csrf)", + "csrf" + ], + "cre_id": "028-727" + }, + { + "keywords": [ + "server-side request forgery", + "(ssrf)", + "ssrf" + ], + "cre_id": "028-728" + }, + { + "keywords": [ + "plaintext storage of a password", + "storing passwords in a recoverable format" + ], + "cre_id": "622-203" + }, + { + "keywords": [ + "empty password in configuration file", + "password in configuration file" + ], + "cre_id": "881-321" + }, + { + "keywords": [ + "hard-coded password", + "hardcoded password", + "hard-coded credentials", + "hardcoded credentials", + "hard-coded credential", + "hardcoded credential", + "hard-coded cryptographic key", + "hardcoded cryptographic key", + "hard-coded key", + "hardcoded key" + ], + "cre_id": "774-888" + }, + { + "keywords": [ + "session fixation" + ], + "cre_id": "177-260" + }, + { + "keywords": [ + "sensitive cookie in https session without 'secure' attribute" + ], + "cre_id": "688-081" + }, + { + "keywords": [ + "deserialization of untrusted data" + ], + "cre_id": "836-068" + }, + { + "keywords": [ + "injection", + "query logic" + ], + "cre_id": "760-764" + } +] diff --git a/application/utils/external_project_parsers/parsers/cwe.py b/application/utils/external_project_parsers/parsers/cwe.py index b0821aba5..2de35a8bb 100644 --- a/application/utils/external_project_parsers/parsers/cwe.py +++ b/application/utils/external_project_parsers/parsers/cwe.py @@ -1,8 +1,10 @@ import logging import os import tempfile +import json +from pathlib import Path import requests -from typing import Dict +from typing import Dict, List 
def load_fallback_cre_mappings(self) -> List[tuple[tuple[str, ...], str]]:
    """Load the keyword -> CRE id fallback table from the bundled JSON file.

    The file at ``self.fallback_mapping_path`` holds a list of objects with
    a ``keywords`` list and a ``cre_id`` string. Keywords are lower-cased so
    they can be matched against lower-cased CWE titles.

    Returns:
        One ``(keywords, cre_id)`` pair per usable entry, in file order.

    Raises:
        OSError: the mapping file cannot be opened.
        json.JSONDecodeError: the mapping file is not valid JSON.
        KeyError: an entry lacks ``keywords``, or a usable entry lacks
            ``cre_id``.
    """
    with self.fallback_mapping_path.open("r", encoding="utf-8") as mapping_file:
        raw_mappings = json.load(mapping_file)

    mappings = []
    for entry in raw_mappings:
        raw_keywords = entry["keywords"]
        if isinstance(raw_keywords, str):
            # A bare string would otherwise be iterated character by
            # character, and single characters match almost every title.
            raw_keywords = [raw_keywords]

        # Matching is a substring test, and "" is a substring of every
        # title: a blank keyword would map every unmapped CWE to this CRE.
        # Only blank keywords are dropped; surrounding whitespace is kept
        # because entries such as " xss" rely on it.
        keywords = tuple(
            keyword.lower() for keyword in raw_keywords if keyword.strip()
        )
        if not keywords:
            continue
        mappings.append((keywords, entry["cre_id"]))
    return mappings
def apply_fallback_cre_mapping(
    self, cwe: defs.Standard, cache: db.Node_collection
) -> defs.Standard:
    """Attach a keyword-derived CRE to a CWE that has no CRE link yet.

    The CWE is returned untouched when any of its links already points at a
    CRE. Otherwise the lower-cased ``cwe.section`` text is checked against
    each keyword family in ``self.fallback_cre_by_match``, in order. The
    first family that has a keyword hit and whose CRE id resolves through
    ``cache.get_CREs`` is linked as ``AutomaticallyLinkedTo``. Families
    whose CRE is absent from the cache are passed over.

    Args:
        cwe: the CWE entry to enrich (modified in place).
        cache: collection used to resolve CRE ids.

    Returns:
        The same ``cwe`` object, with at most one link added.
    """
    for existing in cwe.links:
        if existing.document.doctype == defs.Credoctypes.CRE:
            return cwe

    title = (cwe.section or "").lower()
    for family_keywords, family_cre_id in self.fallback_cre_by_match:
        keyword_hit = False
        for needle in family_keywords:
            if needle in title:
                keyword_hit = True
                break
        if not keyword_hit:
            continue

        candidates = cache.get_CREs(external_id=family_cre_id)
        if candidates:
            link = defs.Link(
                document=candidates[0], ltype=defs.LinkTypes.AutomaticallyLinkedTo
            )
            if not cwe.has_link(link):
                cwe.add_link(link)
            # Only the first resolvable family is applied.
            break

    return cwe
def _get_opencre_documents(collection: db.Node_collection) -> list[defs.CRE]:
    """Return every CRE in the database as a ``defs.CRE`` document.

    Each ``db.CRE`` row is resolved individually through
    ``collection.get_CREs(internal_id=...)``, and the first document of each
    lookup is kept.

    Args:
        collection: database facade providing ``session`` and ``get_CREs``.

    Returns:
        One document per resolvable row, in query order.
    """
    resolved = (
        collection.get_CREs(internal_id=row.id)
        for row in collection.session.query(db.CRE).all()
    )
    # A lookup that yields nothing is skipped: indexing [0] on it would
    # raise IndexError and turn the whole map-analysis request into a 500.
    return [matches[0] for matches in resolved if matches]
def _build_direct_cre_overlap_map_analysis(
    standards: list[str],
    standards_hash: str,
    collection: db.Node_collection,
) -> dict[str, Any] | None:
    """Build a map analysis between OpenCRE and one standard from direct links.

    Only the first two entries of ``standards`` are compared. Every CRE link
    of type ``LinkedTo`` whose target id belongs to the other standard
    becomes a one-step path. Results are grouped by the document of the
    first (base) standard, so the orientation follows the request order.

    Args:
        standards: requested standard names; exactly one of the first two
            must be ``OPENCRE_STANDARD_NAME``.
        standards_hash: cache key under which a non-empty result is stored.
        collection: database facade used to load documents and store the
            result.

    Returns:
        ``{"result": grouped_paths}``, or ``None`` when there are fewer than
        two standards, the first two are not one OpenCRE plus one other
        standard, either side has no documents, or no direct link exists.
    """
    if len(standards) < 2:
        return None

    base_standard, compare_standard = standards[0], standards[1]
    base_is_opencre = base_standard == OPENCRE_STANDARD_NAME
    compare_is_opencre = compare_standard == OPENCRE_STANDARD_NAME
    # The side selection below is only meaningful with exactly one OpenCRE
    # side. Bail out before loading two full document sets that would
    # otherwise be compared on a wrong assumption.
    if base_is_opencre == compare_is_opencre:
        return None

    base_nodes = _get_map_analysis_documents(base_standard, collection)
    compare_nodes = _get_map_analysis_documents(compare_standard, collection)
    if not base_nodes or not compare_nodes:
        return None

    opencre_nodes = base_nodes if base_is_opencre else compare_nodes
    standard_nodes = compare_nodes if base_is_opencre else base_nodes
    standard_nodes_by_id = {
        standard_node.id: standard_node for standard_node in standard_nodes
    }

    grouped_paths: dict[str, dict[str, Any]] = {}
    for opencre_node in opencre_nodes:
        for link in opencre_node.links:
            # Automatically derived links are deliberately excluded.
            if link.ltype != defs.LinkTypes.LinkedTo:
                continue
            standard_node = standard_nodes_by_id.get(link.document.id)
            if not standard_node:
                continue
            if base_is_opencre:
                _add_direct_link_result(grouped_paths, opencre_node, standard_node)
            else:
                _add_direct_link_result(grouped_paths, standard_node, opencre_node)

    if not grouped_paths:
        return None

    result = {"result": grouped_paths}
    collection.add_gap_analysis_result(
        cache_key=standards_hash, ga_object=flask_json.dumps(result)
    )
    return result
#!/usr/bin/env bash
# Print the size, per-table row counts and the 15 most frequent node names of
# the SQLite standards cache.
#
# Usage: show-db-stats.sh [path/to/database.sqlite]
# Defaults to standards_cache.sqlite in the repository root.
set -euo pipefail

ROOT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")/.." && pwd)"
DB_PATH="${1:-$ROOT_DIR/standards_cache.sqlite}"

if [[ ! -f "$DB_PATH" ]]; then
  echo "Database not found: $DB_PATH" >&2
  exit 1
fi

# The embedded report only uses the Python standard library, so the project
# virtualenv is preferred but not required.
PYTHON_BIN="$ROOT_DIR/venv/bin/python"
if [[ ! -x "$PYTHON_BIN" ]]; then
  if ! PYTHON_BIN="$(command -v python3)"; then
    echo "No Python interpreter found (tried $ROOT_DIR/venv/bin/python and python3)" >&2
    exit 1
  fi
fi

echo "Database: $DB_PATH"
du -h "$DB_PATH"

"$PYTHON_BIN" - "$DB_PATH" <<'PY'
import os
import sqlite3
import sys

db_path = sys.argv[1]
conn = sqlite3.connect(db_path)
cur = conn.cursor()

print(f"size_bytes {os.path.getsize(db_path)}")

tables = [
    "node",
    "cre",
    "cre_links",
    "cre_node_links",
    "embeddings",
]

# A missing table is reported instead of aborting the whole report.
for table in tables:
    try:
        count = cur.execute(f"select count(*) from {table}").fetchone()[0]
        print(f"{table}_count {count}")
    except sqlite3.Error as exc:
        print(f"{table}_count unavailable ({exc})")

try:
    standards = cur.execute(
        """
        select name, count(*)
        from node
        where name is not null
        group by name
        order by count(*) desc, name asc
        limit 15
        """
    ).fetchall()
    print("top_standards")
    for name, count in standards:
        print(f"{name}\t{count}")
except sqlite3.Error as exc:
    print(f"top_standards unavailable ({exc})")

conn.close()
PY
python -c "import requests" >/dev/null 2>&1; then + echo "Installing Python dependencies" + pip install -r "$ROOT_DIR/requirements.txt" +fi + +if [[ -f "$CACHE_FILE" ]]; then + cp "$CACHE_FILE" "$BACKUP_FILE" + echo "Backed up database to $BACKUP_FILE" +fi + +export CRE_NO_NEO4J="${CRE_NO_NEO4J:-1}" +export CRE_NO_GEN_EMBEDDINGS="${CRE_NO_GEN_EMBEDDINGS:-1}" + +echo "Importing latest MITRE CWE data into $CACHE_FILE" +exec python "$ROOT_DIR/cre.py" --cwe_in --cache_file "$CACHE_FILE"